From ea1bc033c5754036bc199dd51ef88db5dd5babfb Mon Sep 17 00:00:00 2001 From: ppom Date: Mon, 5 May 2025 12:00:00 +0200 Subject: [PATCH] WIP --- NOTE | 3 +++ src/daemon/filter.rs | 18 +++++++++--------- 2 files changed, 12 insertions(+), 9 deletions(-) create mode 100644 NOTE diff --git a/NOTE b/NOTE new file mode 100644 index 0000000..a04e4f9 --- /dev/null +++ b/NOTE @@ -0,0 +1,3 @@ +j'essaie de créer un wtxn dans une autre wtxn et je deadlock. +donc il faudrait que j'envoie la wtxn parente à l'enfant pour soit qu'il l'utilise direct, soit qu'il crée une nested_wtxn. +je galère un peu avec les types mais je vais m'en sortir. diff --git a/src/daemon/filter.rs b/src/daemon/filter.rs index 62d3b31..5d5b4ce 100644 --- a/src/daemon/filter.rs +++ b/src/daemon/filter.rs @@ -10,11 +10,11 @@ use std::{ use heed::{ byteorder::LittleEndian, types::{SerdeBincode, U64}, - Database, + Database, RwTxn, }; use regex::Regex; use tokio::sync::Semaphore; -use tracing::{error, info}; +use tracing::{debug, error, info}; use crate::{ concepts::{Action, Filter, Match, Pattern, Time}, @@ -93,7 +93,7 @@ impl FilterManager { if exec { self.remove_match(&m); self.add_trigger(m.clone(), now); - self.schedule_exec(m, now, now); + self.schedule_exec(m, now, now, None); } } @@ -205,13 +205,13 @@ impl FilterManager { /// Schedule execution for a given Action and Match. /// We check first if the trigger is still here /// because pending actions can be flushed. - fn schedule_exec(&self, m: Match, t: Time, now: Time) { + fn schedule_exec(&self, m: Match, t: Time, now: Time, wtxn: Option<&mut RwTxn>) { for action in self.filter.actions().values() { let exec_time = t + action.after_duration().unwrap_or_default(); let m = m.clone(); if exec_time <= now { - if self.decrement_trigger(&m, t) { + if self.decrement_trigger(&m, t, &mut wtxn) { self.exec_now(action, m); } } else { @@ -229,7 +229,7 @@ impl FilterManager { _ = this.shutdown.wait() => true, }; // Exec action if triggered hasn't been already flushed - if (!exiting || action.on_exit()) && this.decrement_trigger(&m, t) { + if (!exiting || action.on_exit()) && this.decrement_trigger(&m, t, &mut wtxn) { this.exec_now(action, m); } }); @@ -291,11 +291,11 @@ impl FilterManager { } /// Returns whether we should still execute an action for this (Match, Time) trigger - fn decrement_trigger(&self, m: &Match, t: Time) -> bool { + fn decrement_trigger(&self, m: &Match, t: Time, wtxn: &mut RwTxn) -> bool { // We record triggered filters only when there is an action with an `after` directive if self.has_after { let mut exec_needed = false; - let mut wtxn = self.env.write_txn().unwrap(); + let mut wtxn = self.env.nested_write_txn(wtxn).unwrap(); let mt = MatchTime { m: m.clone(), t }; let count = self.triggers.get(&wtxn, &mt).unwrap(); if let Some(count) = count { @@ -359,7 +359,7 @@ impl FilterManager { .put(&mut wtxn, &mt, &(number_of_actions as u64)) .unwrap(); // Schedule the upcoming times - self.schedule_exec(mt.m, mt.t, now); + self.schedule_exec(mt.m, mt.t, now, Some(&mut wtxn)); } else { self.triggers.delete(&mut wtxn, &mt).unwrap(); }