Database: finish writing entries when quitting

This commit is contained in:
ppom 2025-08-08 12:00:00 +02:00
commit a7b63b69a8
No known key found for this signature in database

View file

@ -82,7 +82,7 @@ pub struct Database {
entry_rx: mpsc::Receiver<Entry>,
/// The sender on [`Tree`] write operations.
/// Only used to clone new senders for new Trees.
entry_tx: mpsc::Sender<Entry>,
entry_tx: Option<mpsc::Sender<Entry>>,
/// The interval at which the database must be flushed to kernel
flush_every: Duration,
/// The maximum bytes that must be written until the database is rotated
@ -106,7 +106,7 @@ impl Database {
path,
new_path,
entry_rx,
entry_tx,
entry_tx: Some(entry_tx),
flush_every: Duration::from_secs(2),
max_bytes: 20 * 1024 * 1024, // 20 MiB
bytes_written: 0,
@ -121,7 +121,7 @@ impl Database {
// flush_every for the next tick, resulting in a relaxed interval.
// Hoping this will smooth IO pressure when under heavy load.
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let status = loop {
let mut status = loop {
tokio::select! {
entry = self.entry_rx.recv() => {
if let Err(err) = self.handle_entry(entry).await {
@ -139,6 +139,18 @@ impl Database {
};
};
// Finish consuming received entries when shutdown asked
if status.is_none() {
self.entry_tx = None;
loop {
let entry = self.entry_rx.recv().await;
if let Err(err) = self.handle_entry(entry).await {
status = err;
break;
}
}
}
// Shutdown
let close_status = self
.close()
@ -277,6 +289,11 @@ impl Database {
where
F: Fn((Value, Value)) -> Result<(K, V), String>,
{
// Get a clone of the channel sender
let tx = self
.entry_tx
.clone()
.ok_or("Database is closing".to_string())?;
// Load the tree from its JSON
let tree = if let Some(json_tree) = self.loaded_db.remove(&name) {
json_tree
@ -290,7 +307,7 @@ impl Database {
id: name,
entry_timeout,
tree,
tx: self.entry_tx.clone(),
tx,
})
}