From fe5cd70f7a30ff2d31bba0a964337adb6b9f450f Mon Sep 17 00:00:00 2001 From: ppom Date: Sun, 25 May 2025 12:00:00 +0200 Subject: [PATCH] use tokio::time::interval instead of sleep, for guaranteed flushes --- src/waltree/mod.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/waltree/mod.rs b/src/waltree/mod.rs index 5765da9..e639acf 100644 --- a/src/waltree/mod.rs +++ b/src/waltree/mod.rs @@ -23,7 +23,7 @@ use serde_json::Value; use tokio::{ fs::{rename, File}, sync::mpsc, - time::sleep, + time::{interval, sleep, MissedTickBehavior}, }; use tracing::error; @@ -112,8 +112,11 @@ impl Database { } pub async fn task(&mut self) { - // FIXME this never flushes if entries keep coming, as receiving an entry cancels the sleep - // replace by tokio::time::Interval::Skip + let mut interval = interval(self.flush_every); + // If we missed a tick, it will tick immediately, then wait + // flush_every for the next tick, resulting in a relaxed interval. + // Hoping this will smooth IO pressure when under heavy load. + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { tokio::select! { entry = self.rx.recv() => { @@ -121,7 +124,7 @@ impl Database { break; } } - _ = sleep(self.flush_every) => { + _ = interval.tick() => { if !self.flush().await { break; }