WIP async db

Fixes inherent problem on sync db, which spawns a new task for
persistance. This makes the log unordered, which can cause inconsistence
issues.
This commit is contained in:
ppom 2025-09-03 12:00:00 +02:00
commit 582889f71e
No known key found for this signature in database
6 changed files with 104 additions and 96 deletions

View file

@ -54,7 +54,7 @@ pub enum React {
#[allow(clippy::unwrap_used)]
impl FilterManager {
pub fn new(
pub async fn new(
filter: &'static Filter,
exec_limit: Option<Arc<Semaphore>>,
shutdown: ShutdownToken,
@ -65,15 +65,15 @@ impl FilterManager {
filter,
exec_limit,
shutdown,
state: Arc::new(Mutex::new(State::new(filter, db, now)?)),
state: Arc::new(Mutex::new(State::new(filter, db, now).await?)),
};
this.clear_past_triggers_and_schedule_future_actions(now);
Ok(this)
}
pub fn handle_line(&self, line: &str, now: Time) -> React {
pub async fn handle_line(&self, line: &str, now: Time) -> React {
if let Some(match_) = self.filter.get_match(line) {
if self.handle_match(match_, now) {
if self.handle_match(match_, now).await {
React::Trigger
} else {
React::Match
@ -83,7 +83,7 @@ impl FilterManager {
}
}
fn handle_match(&self, m: Match, now: Time) -> bool {
async fn handle_match(&self, m: Match, now: Time) -> bool {
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
state.clear_past_matches(now);
@ -100,7 +100,7 @@ impl FilterManager {
Some(retry) => {
state.add_match(m.clone(), now);
// Number of stored times for this match >= configured retry for this filter
state.get_times(&m) >= retry as usize
state.get_times(&m).await >= retry as usize
}
};
@ -109,7 +109,7 @@ impl FilterManager {
let actions_left = if Duplicate::Extend == self.filter.duplicate {
// Get number of actions left from last trigger
state
.remove_trigger(&m)
.remove_trigger(&m).await
// Only one entry in the map because Duplicate::Extend
.and_then(|map| map.first_key_value().map(|(_, n)| n.clone()))
} else {
@ -122,7 +122,7 @@ impl FilterManager {
trigger
}
pub fn handle_trigger(
pub async fn handle_trigger(
&self,
patterns: BTreeMap<Arc<Pattern>, String>,
now: Time,
@ -138,7 +138,7 @@ impl FilterManager {
Ok(())
}
pub fn handle_order(
pub async fn handle_order(
&self,
patterns: &BTreeMap<Arc<Pattern>, Regex>,
order: Order,
@ -241,12 +241,12 @@ impl FilterManager {
/// Schedule execution for a given Match.
/// We check first if the trigger is still here
/// because pending actions can be flushed.
fn schedule_exec(
async fn schedule_exec<'a>(
&self,
m: Match,
t: Time,
now: Time,
state: &mut MutexGuard<State>,
state: &'a mut MutexGuard<'_, State>,
startup: bool,
actions_left: Option<u64>,
) {
@ -268,7 +268,7 @@ impl FilterManager {
let m = m.clone();
if exec_time <= now {
if state.decrement_trigger(&m, t, false) {
if state.decrement_trigger(&m, t, false).await {
exec_now(&self.exec_limit, self.shutdown.clone(), action, m);
}
} else {
@ -289,7 +289,7 @@ impl FilterManager {
if !exiting || action.on_exit {
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = this.state.lock().unwrap();
if state.decrement_trigger(&m, t, exiting) {
if state.decrement_trigger(&m, t, exiting).await {
exec_now(&this.exec_limit, this.shutdown, action, m);
}
}
@ -298,7 +298,7 @@ impl FilterManager {
}
}
fn clear_past_triggers_and_schedule_future_actions(&self, now: Time) {
async fn clear_past_triggers_and_schedule_future_actions(&self, now: Time) {
let longuest_action_duration = self.filter.longuest_action_duration;
let number_of_actions = self
.filter
@ -349,7 +349,7 @@ impl FilterManager {
}
}
fn exec_now(
async fn exec_now(
exec_limit: &Option<Arc<Semaphore>>,
shutdown: ShutdownToken,
action: &'static Action,

View file

@ -59,7 +59,11 @@ pub struct State {
}
impl State {
pub fn new(filter: &'static Filter, db: &mut Database, now: Time) -> Result<Self, String> {
pub async fn new(
filter: &'static Filter,
db: &mut Database,
now: Time,
) -> Result<Self, String> {
let ordered_times = db.open_tree(
filter_ordered_times_db_name(filter),
filter.retry_duration.unwrap_or_default(),
@ -100,13 +104,13 @@ impl State {
Ok(this)
}
pub fn add_match(&mut self, m: Match, t: Time) {
pub async fn add_match(&mut self, m: Match, t: Time) {
let set = self.matches.entry(m.clone()).or_default();
set.insert(t);
self.ordered_times.insert(t, m);
}
pub fn add_trigger(&mut self, m: Match, t: Time, action_count: Option<u64>) {
pub async fn add_trigger(&mut self, m: Match, t: Time, action_count: Option<u64>) {
// We record triggered filters only when there is an action with an `after` directive
if self.has_after {
// Add the (Match, Time) to the triggers map
@ -125,7 +129,7 @@ impl State {
}
// Completely remove a Match from the matches
pub fn remove_match(&mut self, m: &Match) {
pub async fn remove_match(&mut self, m: &Match) {
if let Some(set) = self.matches.get(m) {
for t in set {
self.ordered_times.remove(t);
@ -135,12 +139,12 @@ impl State {
}
/// Completely remove a Match from the triggers
pub fn remove_trigger(&mut self, m: &Match) -> Option<BTreeMap<Time, u64>> {
self.triggers.remove(m)
pub async fn remove_trigger(&mut self, m: &Match) -> Option<BTreeMap<Time, u64>> {
self.triggers.remove(m).await
}
/// Returns whether we should still execute an action for this (Match, Time) trigger
pub fn decrement_trigger(&mut self, m: &Match, t: Time, exiting: bool) -> bool {
pub async fn decrement_trigger(&mut self, m: &Match, t: Time, exiting: bool) -> bool {
// We record triggered filters only when there is an action with an `after` directive
if self.has_after {
let mut exec_needed = false;
@ -186,7 +190,7 @@ impl State {
}
}
pub fn clear_past_matches(&mut self, now: Time) {
pub async fn clear_past_matches(&mut self, now: Time) {
let retry_duration = self.filter.retry_duration.unwrap_or_default();
while self
.ordered_times
@ -212,14 +216,14 @@ impl State {
}
}
pub fn get_times(&self, m: &Match) -> usize {
pub async fn get_times(&self, m: &Match) -> usize {
match self.matches.get(m) {
Some(vec) => vec.len(),
None => 0,
}
}
fn load_matches_from_ordered_times(&mut self) {
async fn load_matches_from_ordered_times(&mut self) {
for (t, m) in self.ordered_times.iter() {
let set = self.matches.entry(m.clone()).or_default();
set.insert(*t);
@ -342,7 +346,7 @@ mod tests {
trigger_db,
]));
let state = State::new(filter, &mut db, now).unwrap();
let state = State::new(filter, &mut db, now).await.unwrap();
assert_eq!(
state.ordered_times.tree(),
@ -386,7 +390,7 @@ mod tests {
let now_less_4s = now - TimeDelta::seconds(4);
let mut db = TempDatabase::default().await;
let mut state = State::new(filter, &mut db, now).unwrap();
let mut state = State::new(filter, &mut db, now).await.unwrap();
assert!(state.ordered_times.tree().is_empty());
assert!(state.matches.is_empty());
@ -427,7 +431,7 @@ mod tests {
let now = Local::now();
let mut db = TempDatabase::default().await;
let mut state = State::new(filter, &mut db, now).unwrap();
let mut state = State::new(filter, &mut db, now).await.unwrap();
assert!(state.triggers.tree().is_empty());
@ -437,7 +441,7 @@ mod tests {
assert!(state.triggers.tree().is_empty());
// Will be called immediately after, it returns true
assert!(state.decrement_trigger(&one, now, false));
assert!(state.decrement_trigger(&one, now, false).await);
}
#[tokio::test]
@ -481,7 +485,7 @@ mod tests {
let now_plus_1s = now + TimeDelta::seconds(1);
let mut db = TempDatabase::default().await;
let mut state = State::new(filter, &mut db, now).unwrap();
let mut state = State::new(filter, &mut db, now).await.unwrap();
assert!(state.triggers.tree().is_empty());
@ -492,22 +496,22 @@ mod tests {
&BTreeMap::from([(one.clone(), [(now, 3)].into())])
);
// Decrement → true
assert!(state.decrement_trigger(&one, now, false));
assert!(state.decrement_trigger(&one, now, false).await);
assert_eq!(
state.triggers.tree(),
&BTreeMap::from([(one.clone(), [(now, 2)].into())])
);
// Decrement → true
assert!(state.decrement_trigger(&one, now, false));
assert!(state.decrement_trigger(&one, now, false).await);
assert_eq!(
state.triggers.tree(),
&BTreeMap::from([(one.clone(), [(now, 1)].into())])
);
// Decrement → true
assert!(state.decrement_trigger(&one, now, false));
assert!(state.decrement_trigger(&one, now, false).await);
assert!(state.triggers.tree().is_empty());
// Decrement → false
assert!(!state.decrement_trigger(&one, now, false));
assert!(!state.decrement_trigger(&one, now, false).await);
// Add unique trigger (but decrement exiting-like)
state.add_trigger(one.clone(), now, None);
@ -516,28 +520,28 @@ mod tests {
&BTreeMap::from([(one.clone(), [(now, 3)].into())])
);
// Decrement → true
assert!(state.decrement_trigger(&one, now, true));
assert!(state.decrement_trigger(&one, now, true).await);
assert_eq!(
state.triggers.tree(),
&BTreeMap::from([(one.clone(), [(now, 2)].into())])
);
// Decrement → true
assert!(state.decrement_trigger(&one, now, true));
assert!(state.decrement_trigger(&one, now, true).await);
assert_eq!(
state.triggers.tree(),
&BTreeMap::from([(one.clone(), [(now, 1)].into())])
);
// Decrement but exiting → true, does nothing
assert!(state.decrement_trigger(&one, now, true));
assert!(state.decrement_trigger(&one, now, true).await);
assert_eq!(
state.triggers.tree(),
&BTreeMap::from([(one.clone(), [(now, 1)].into())])
);
// Decrement → true
assert!(state.decrement_trigger(&one, now, false));
assert!(state.decrement_trigger(&one, now, false).await);
assert!(state.triggers.tree().is_empty());
// Decrement → false
assert!(!state.decrement_trigger(&one, now, false));
assert!(!state.decrement_trigger(&one, now, false).await);
// Add trigger with neighbour
state.add_trigger(one.clone(), now, None);
@ -547,25 +551,25 @@ mod tests {
&BTreeMap::from([(one.clone(), [(now_plus_1s, 3), (now, 3)].into())])
);
// Decrement → true
assert!(state.decrement_trigger(&one, now, false));
assert!(state.decrement_trigger(&one, now, false).await);
assert_eq!(
state.triggers.tree(),
&BTreeMap::from([(one.clone(), [(now_plus_1s, 3), (now, 2)].into())])
);
// Decrement → true
assert!(state.decrement_trigger(&one, now, false));
assert!(state.decrement_trigger(&one, now, false).await);
assert_eq!(
state.triggers.tree(),
&BTreeMap::from([(one.clone(), [(now_plus_1s, 3), (now, 1)].into())])
);
// Decrement → true
assert!(state.decrement_trigger(&one, now, false));
assert!(state.decrement_trigger(&one, now, false).await);
assert_eq!(
state.triggers.tree(),
&BTreeMap::from([(one.clone(), [(now_plus_1s, 3)].into())])
);
// Decrement → false
assert!(!state.decrement_trigger(&one, now, false));
assert!(!state.decrement_trigger(&one, now, false).await);
// Remove neighbour
state.remove_trigger(&one);
assert!(state.triggers.tree().is_empty());

View file

@ -66,7 +66,7 @@ pub async fn daemon(
let mut filter_managers = HashMap::new();
for filter in stream.filters.values() {
let manager =
FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now)?;
FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now).await?;
filter_managers.insert(filter, manager);
}
state.insert(stream, filter_managers.clone());

View file

@ -51,7 +51,7 @@ async fn open_socket(path: PathBuf) -> Result<UnixListener, String> {
err_str!(UnixListener::bind(path))
}
fn handle_trigger_order(
async fn handle_trigger_order(
stream_name: Option<String>,
filter_name: Option<String>,
patterns: BTreeMap<Arc<Pattern>, String>,
@ -100,13 +100,13 @@ fn handle_trigger_order(
};
let now = Local::now();
match filter_manager.handle_trigger(patterns, now) {
match filter_manager.handle_trigger(patterns, now).await {
Ok(()) => DaemonResponse::Ok(()),
Err(err) => DaemonResponse::Err(err),
}
}
fn handle_show_or_flush_order(
async fn handle_show_or_flush_order(
stream_name: Option<String>,
filter_name: Option<String>,
patterns: BTreeMap<Arc<Pattern>, Regex>,
@ -114,40 +114,39 @@ fn handle_show_or_flush_order(
shared_state: &HashMap<&'static Stream, HashMap<&'static Filter, FilterManager>>,
) -> DaemonResponse {
let now = Local::now();
let cs: ClientStatus = shared_state
let iter = shared_state
.iter()
// stream filtering
.filter(|(stream, _)| {
stream_name.is_none() || stream_name.clone().is_some_and(|name| name == stream.name)
})
.fold(BTreeMap::new(), |mut acc, (stream, filter_manager)| {
let inner_map = filter_manager
.iter()
// filter filtering
.filter(|(filter, _)| {
filter_name.is_none()
|| filter_name.clone().is_some_and(|name| name == filter.name)
})
// pattern filtering
.filter(|(filter, _)| {
patterns
.iter()
.all(|(pattern, _)| filter.patterns.get(pattern).is_some())
})
.map(|(filter, manager)| {
(
filter.name.to_owned(),
manager.handle_order(&patterns, order, now),
)
})
.collect();
acc.insert(stream.name.to_owned(), inner_map);
acc
});
let mut cs = ClientStatus::new();
for (stream, filter_manager) in iter {
let iter = filter_manager
.iter()
// filter filtering
.filter(|(filter, _)| {
filter_name.is_none() || filter_name.clone().is_some_and(|name| name == filter.name)
})
// pattern filtering
.filter(|(filter, _)| {
patterns
.iter()
.all(|(pattern, _)| filter.patterns.get(pattern).is_some())
});
let mut inner_map = BTreeMap::new();
for (filter, manager) in iter {
inner_map.insert(
filter.name.to_owned(),
manager.handle_order(&patterns, order, now).await,
);
}
cs.insert(stream.name.to_owned(), inner_map);
}
DaemonResponse::Order(cs)
}
fn answer_order(
async fn answer_order(
config: &'static Config,
shared_state: &HashMap<&'static Stream, HashMap<&'static Filter, FilterManager>>,
options: ClientRequest,
@ -182,7 +181,7 @@ fn answer_order(
};
if let Order::Trigger = options.order {
handle_trigger_order(stream_name, filter_name, patterns, shared_state)
handle_trigger_order(stream_name, filter_name, patterns, shared_state).await
} else {
let patterns = match patterns
.into_iter()
@ -206,6 +205,7 @@ fn answer_order(
options.order,
shared_state,
)
.await
}
}
@ -264,7 +264,7 @@ impl Socket {
serde_json::from_slice(&encoded_request)
);
// Process
let response = answer_order(config, &shared_state, request);
let response = answer_order(config, &shared_state, request).await;
// Encode
let encoded_response =
or_next!("failed to serialize response", serde_json::to_string::<DaemonResponse>(&response));

View file

@ -173,7 +173,7 @@ impl StreamManager {
Some(Ok(line)) => {
let now = Local::now();
for manager in self.matching_filters(&line) {
manager.handle_line(&line, now);
manager.handle_line(&line, now).await;
}
}
Some(Err(err)) => {

View file

@ -330,7 +330,7 @@ impl<K: KeyType, V: ValueType> Deref for Tree<K, V> {
// Reimplement write functions
impl<K: KeyType, V: ValueType> Tree<K, V> {
/// Log an [`Entry`] to the [`Database`]
fn log(&mut self, k: &K, v: Option<&V>) {
async fn log(&mut self, k: &K, v: Option<&V>) {
let e = Entry {
tree: self.id.clone(),
key: serde_json::to_value(k).expect("could not serialize key"),
@ -339,36 +339,40 @@ impl<K: KeyType, V: ValueType> Tree<K, V> {
};
let tx = self.tx.clone();
// FIXME what if send fails?
tokio::spawn(async move {
let _ = tx.send(e).await;
});
let _ = tx.send(e).await;
}
/// Asynchronously persisted version of [`BTreeMap::insert`]
pub fn insert(&mut self, key: K, value: V) -> Option<V> {
self.log(&key, Some(&value));
pub async fn insert(&mut self, key: K, value: V) -> Option<V> {
self.log(&key, Some(&value)).await;
self.tree.insert(key, value)
}
/// Asynchronously persisted version of [`BTreeMap::pop_first`]
pub fn pop_first(&mut self) -> Option<(K, V)> {
self.tree.pop_first().map(|(key, value)| {
self.log(&key, None);
(key, value)
})
pub async fn pop_first(&mut self) -> Option<(K, V)> {
match self.tree.pop_first() {
Some((key, value)) => {
self.log(&key, None).await;
Some((key, value))
}
None => None,
}
}
/// Asynchronously persisted version of [`BTreeMap::pop_last`]
pub fn pop_last(&mut self) -> Option<(K, V)> {
self.tree.pop_last().map(|(key, value)| {
self.log(&key, None);
(key, value)
})
pub async fn pop_last(&mut self) -> Option<(K, V)> {
match self.tree.pop_last() {
Some((key, value)) => {
self.log(&key, None).await;
Some((key, value))
}
None => None,
}
}
/// Asynchronously persisted version of [`BTreeMap::remove`]
pub fn remove(&mut self, key: &K) -> Option<V> {
self.log(key, None);
pub async fn remove(&mut self, key: &K) -> Option<V> {
self.log(key, None).await;
self.tree.remove(key)
}
@ -376,14 +380,14 @@ impl<K: KeyType, V: ValueType> Tree<K, V> {
/// Returning None removes the item if it existed before.
/// Asynchronously persisted.
/// *API design borrowed from [`fjall::WriteTransaction::fetch_update`].*
pub fn fetch_update<F: FnMut(Option<V>) -> Option<V>>(
pub async fn fetch_update<F: FnMut(Option<V>) -> Option<V>>(
&mut self,
key: K,
mut f: F,
) -> Option<V> {
let old_value = self.get(&key).map(|v| v.to_owned());
let new_value = f(old_value);
self.log(&key, new_value.as_ref());
self.log(&key, new_value.as_ref()).await;
if let Some(new_value) = new_value {
self.tree.insert(key, new_value)
} else {