From a7b63b69a8a6577c9ac28c81e287c7e0b30dc2a6 Mon Sep 17 00:00:00 2001 From: ppom Date: Fri, 8 Aug 2025 12:00:00 +0200 Subject: [PATCH] Database: finish writing entries when quitting --- src/treedb/mod.rs | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/treedb/mod.rs b/src/treedb/mod.rs index 02086f9..fc0c71d 100644 --- a/src/treedb/mod.rs +++ b/src/treedb/mod.rs @@ -82,7 +82,7 @@ pub struct Database { entry_rx: mpsc::Receiver, /// The sender on [`Tree`] write operations. /// Only used to clone new senders for new Trees. - entry_tx: mpsc::Sender, + entry_tx: Option>, /// The interval at which the database must be flushed to kernel flush_every: Duration, /// The maximum bytes that must be written until the database is rotated @@ -106,7 +106,7 @@ impl Database { path, new_path, entry_rx, - entry_tx, + entry_tx: Some(entry_tx), flush_every: Duration::from_secs(2), max_bytes: 20 * 1024 * 1024, // 20 MiB bytes_written: 0, @@ -121,7 +121,7 @@ impl Database { // flush_every for the next tick, resulting in a relaxed interval. // Hoping this will smooth IO pressure when under heavy load. interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - let status = loop { + let mut status = loop { tokio::select! { entry = self.entry_rx.recv() => { if let Err(err) = self.handle_entry(entry).await { @@ -139,6 +139,18 @@ impl Database { }; }; + // Finish consuming received entries when shutdown asked + if status.is_none() { + self.entry_tx = None; + loop { + let entry = self.entry_rx.recv().await; + if let Err(err) = self.handle_entry(entry).await { + status = err; + break; + } + } + } + // Shutdown let close_status = self .close() @@ -277,6 +289,11 @@ impl Database { where F: Fn((Value, Value)) -> Result<(K, V), String>, { + // Get a clone of the channel sender + let tx = self + .entry_tx + .clone() + .ok_or("Database is closing".to_string())?; // Load the tree from its JSON let tree = if let Some(json_tree) = self.loaded_db.remove(&name) { json_tree @@ -290,7 +307,7 @@ impl Database { id: name, entry_timeout, tree, - tx: self.entry_tx.clone(), + tx, }) }