This commit is contained in:
ppom 2025-05-05 12:00:00 +02:00
commit ea1bc033c5
No known key found for this signature in database
2 changed files with 12 additions and 9 deletions

3
NOTE Normal file
View file

@ -0,0 +1,3 @@
j'essaie de créer un wtxn dans une autre wtxn et je deadlock.
donc il faudrait que j'envoie la wtxn parente à l'enfant pour soit qu'il l'utilise direct, soit qu'il crée une nested_wtxn.
je galère un peu avec les types mais je vais m'en sortir.

View file

@ -10,11 +10,11 @@ use std::{
use heed::{
byteorder::LittleEndian,
types::{SerdeBincode, U64},
Database,
Database, RwTxn,
};
use regex::Regex;
use tokio::sync::Semaphore;
use tracing::{error, info};
use tracing::{debug, error, info};
use crate::{
concepts::{Action, Filter, Match, Pattern, Time},
@ -93,7 +93,7 @@ impl FilterManager {
if exec {
self.remove_match(&m);
self.add_trigger(m.clone(), now);
self.schedule_exec(m, now, now);
self.schedule_exec(m, now, now, None);
}
}
@ -205,13 +205,13 @@ impl FilterManager {
/// Schedule execution for a given Action and Match.
/// We check first if the trigger is still here
/// because pending actions can be flushed.
fn schedule_exec(&self, m: Match, t: Time, now: Time) {
fn schedule_exec(&self, m: Match, t: Time, now: Time, wtxn: Option<&mut RwTxn>) {
for action in self.filter.actions().values() {
let exec_time = t + action.after_duration().unwrap_or_default();
let m = m.clone();
if exec_time <= now {
if self.decrement_trigger(&m, t) {
if self.decrement_trigger(&m, t, &mut wtxn) {
self.exec_now(action, m);
}
} else {
@ -229,7 +229,7 @@ impl FilterManager {
_ = this.shutdown.wait() => true,
};
// Exec action if triggered hasn't been already flushed
if (!exiting || action.on_exit()) && this.decrement_trigger(&m, t) {
if (!exiting || action.on_exit()) && this.decrement_trigger(&m, t, &mut wtxn) {
this.exec_now(action, m);
}
});
@ -291,11 +291,11 @@ impl FilterManager {
}
/// Returns whether we should still execute an action for this (Match, Time) trigger
fn decrement_trigger(&self, m: &Match, t: Time) -> bool {
fn decrement_trigger(&self, m: &Match, t: Time, wtxn: &mut RwTxn) -> bool {
// We record triggered filters only when there is an action with an `after` directive
if self.has_after {
let mut exec_needed = false;
let mut wtxn = self.env.write_txn().unwrap();
let mut wtxn = self.env.nested_write_txn(wtxn).unwrap();
let mt = MatchTime { m: m.clone(), t };
let count = self.triggers.get(&wtxn, &mt).unwrap();
if let Some(count) = count {
@ -359,7 +359,7 @@ impl FilterManager {
.put(&mut wtxn, &mt, &(number_of_actions as u64))
.unwrap();
// Schedule the upcoming times
self.schedule_exec(mt.m, mt.t, now);
self.schedule_exec(mt.m, mt.t, now, Some(&mut wtxn));
} else {
self.triggers.delete(&mut wtxn, &mt).unwrap();
}