Separate treedb into its own crate

This commit is contained in:
ppom 2025-12-14 12:00:00 +01:00
commit f414245168
No known key found for this signature in database
15 changed files with 214 additions and 192 deletions

16
Cargo.lock generated
View file

@ -2841,6 +2841,7 @@ dependencies = [
"tokio-util",
"tracing",
"tracing-subscriber",
"treedb",
]
[[package]]
@ -3962,6 +3963,21 @@ dependencies = [
"tracing-log",
]
[[package]]
name = "treedb"
version = "1.0.0"
dependencies = [
"chrono",
"futures",
"serde",
"serde_json",
"tempfile",
"thiserror 1.0.69",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "try-lock"
version = "0.2.5"

View file

@ -34,7 +34,7 @@ assets = [
[dependencies]
# Time types
chrono = { workspace = true }
chrono.workspace = true
# CLI parsing
clap = { version = "4.5.4", features = ["derive"] }
# Unix interfaces
@ -48,17 +48,19 @@ serde_json.workspace = true
serde_yaml = "0.9.34"
jrsonnet-evaluator = "0.4.2"
# Error macro
thiserror = "1.0.63"
thiserror.workspace = true
# Async runtime & helpers
futures = { workspace = true }
tokio = { workspace = true, features = ["full", "tracing"] }
tokio-util = { workspace = true, features = ["codec"] }
# Async logging
tracing = "0.1.40"
tracing.workspace = true
tracing-subscriber = "0.3.18"
# Database
treedb.workspace = true
# Reaction plugin system
remoc = { workspace = true }
reaction-plugin = { workspace = true }
remoc.workspace = true
reaction-plugin.workspace = true
[build-dependencies]
clap = { version = "4.5.4", features = ["derive"] }
@ -69,13 +71,20 @@ tracing = "0.1.40"
[dev-dependencies]
rand = "0.8.5"
tempfile = "3.12.0"
treedb.workspace = true
treedb.features = ["test"]
tempfile.workspace = true
assert_fs.workspace = true
assert_cmd = "2.0.17"
predicates = "3.1.3"
[workspace]
members = ["plugins/reaction-plugin", "plugins/reaction-plugin-cluster", "plugins/reaction-plugin-virtual"]
members = [
"crates/treedb",
"plugins/reaction-plugin",
"plugins/reaction-plugin-cluster",
"plugins/reaction-plugin-virtual"
]
[workspace.dependencies]
assert_fs = "1.1.3"
@ -84,6 +93,10 @@ futures = "0.3.30"
remoc = { version = "0.18.3" }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = { version = "1.0.117", features = ["arbitrary_precision"] }
tempfile = "3.12.0"
thiserror = "1.0.63"
tokio = { version = "1.40.0" }
tokio-util = { version = "0.7.12" }
tracing = "0.1.40"
reaction-plugin = { path = "plugins/reaction-plugin" }
treedb = { path = "crates/treedb" }

23
crates/treedb/Cargo.toml Normal file
View file

@ -0,0 +1,23 @@
[package]
name = "treedb"
version = "1.0.0"
edition = "2024"
[features]
test = []
[dependencies]
chrono.workspace = true
futures.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio.features = ["rt-multi-thread", "macros", "io-util", "time", "fs", "tracing"]
tokio-util.workspace = true
tokio-util.features = ["rt"]
tracing.workspace = true
[dev-dependencies]
tempfile.workspace = true

View file

@ -6,7 +6,7 @@ use std::{
use chrono::DateTime;
use serde_json::Value;
use crate::concepts::{Match, MatchTime, Time};
use crate::time::Time;
/// Tries to convert a [`Value`] into a [`String`]
pub fn to_string(val: &Value) -> Result<String, String> {
@ -50,8 +50,8 @@ pub fn to_time(val: &Value) -> Result<Time, String> {
string_to_time(val.as_str().ok_or("not a string number")?)
}
/// Tries to convert a [`Value`] into a [`Match`]
pub fn to_match(val: &Value) -> Result<Match, String> {
/// Tries to convert a [`Value`] into a [`Vec<String>`]
pub fn to_match(val: &Value) -> Result<Vec<String>, String> {
val.as_array()
.ok_or("not an array")?
.iter()
@ -59,15 +59,6 @@ pub fn to_match(val: &Value) -> Result<Match, String> {
.collect()
}
/// Tries to convert a [`Value`] into a [`MatchTime`]
pub fn to_matchtime(val: &Value) -> Result<MatchTime, String> {
let map = val.as_object().ok_or("not an object")?;
Ok(MatchTime {
m: to_match(map.get("m").ok_or("no m in object")?)?,
t: to_time(map.get("t").ok_or("no t in object")?)?,
})
}
/// Tries to convert a [`Value`] into a [`BTreeSet<Time>`]
pub fn to_timeset(val: &Value) -> Result<BTreeSet<Time>, String> {
val.as_array()
@ -90,8 +81,6 @@ pub fn to_timemap(val: &Value) -> Result<BTreeMap<Time, u64>, String> {
mod tests {
use std::collections::BTreeMap;
use serde_json::Map;
use super::*;
#[test]
@ -165,62 +154,6 @@ mod tests {
assert!(to_timeset(&(None::<String>.into())).is_err());
}
#[test]
fn test_to_matchtime() {
assert_eq!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([
("m".into(), ["plip", "ploup"].into()),
("t".into(), "12345678".into()),
])
.into_iter()
))),
Ok(MatchTime {
m: vec!["plip".into(), "ploup".into()],
t: Time::from_nanos(12345678),
})
);
assert!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([("m".into(), ["plip", "ploup"].into()),]).into_iter()
)))
.is_err()
);
assert!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([("t".into(), 12345678.into()),]).into_iter()
)))
.is_err()
);
assert!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([("m".into(), "ploup".into()), ("t".into(), 12345678.into()),])
.into_iter()
)))
.is_err()
);
assert!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([
("m".into(), ["plip", "ploup"].into()),
("t".into(), [1234567].into()),
])
.into_iter()
)))
.is_err()
);
assert!(to_timeset(&([""].into())).is_err());
assert!(to_timeset(&(["ploup"].into())).is_err());
assert!(to_timeset(&(true.into())).is_err());
assert!(to_timeset(&(8.into())).is_err());
assert!(to_timeset(&(None::<String>.into())).is_err());
}
#[test]
fn test_to_timemap() {
let time1 = 1234567;

View file

@ -17,7 +17,6 @@ use std::{
time::Duration,
};
use reaction_plugin::shutdown::ShutdownToken;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_json::Value;
use tokio::{
@ -25,16 +24,16 @@ use tokio::{
sync::{mpsc, oneshot},
time::{MissedTickBehavior, interval},
};
use crate::concepts::{Config, Time, now};
pub mod helpers;
use tokio_util::{sync::CancellationToken, task::task_tracker::TaskTrackerToken};
// Database
use raw::{ReadDB, WriteDB};
use time::{Time, now};
pub mod helpers;
mod raw;
pub mod time;
/// Any order the Database can receive
enum Order {
@ -63,13 +62,11 @@ pub type LoadedDB = HashMap<String, LoadedTree>;
const DB_NAME: &str = "reaction.db";
const DB_NEW_NAME: &str = "reaction.new.db";
impl Config {
fn path_of(&self, name: &str) -> PathBuf {
if self.state_directory.is_empty() {
fn path_of(state_directory: &Path, name: &str) -> PathBuf {
if state_directory.as_os_str().is_empty() {
name.into()
} else {
PathBuf::from(&self.state_directory).join(name)
}
PathBuf::from(state_directory).join(name)
}
}
@ -90,9 +87,13 @@ impl Database {
/// to have the Database properly quit.
///
/// You can wait for [`Self::quit`] returned channel to know how it went.
pub async fn open(config: &Config, shutdown: ShutdownToken) -> Result<Database, IoError> {
let (manager, entry_tx) = DatabaseManager::open(config).await?;
let error_rx = manager.manager(shutdown);
pub async fn open(
path_directory: &Path,
cancellation_token: CancellationToken,
task_tracker_token: TaskTrackerToken,
) -> Result<Database, IoError> {
let (manager, entry_tx) = DatabaseManager::open(path_directory).await?;
let error_rx = manager.manager(cancellation_token, task_tracker_token);
Ok(Self {
entry_tx: Some(entry_tx),
error_rx,
@ -133,9 +134,11 @@ struct DatabaseManager {
}
impl DatabaseManager {
pub async fn open(config: &Config) -> Result<(DatabaseManager, mpsc::Sender<Order>), IoError> {
let path = config.path_of(DB_NAME);
let new_path = config.path_of(DB_NEW_NAME);
pub async fn open(
path_directory: &Path,
) -> Result<(DatabaseManager, mpsc::Sender<Order>), IoError> {
let path = path_of(path_directory, DB_NAME);
let new_path = path_of(path_directory, DB_NEW_NAME);
let (write_db, loaded_db) = rotate_db(&path, &new_path, true).await?;
@ -156,7 +159,11 @@ impl DatabaseManager {
))
}
pub fn manager(mut self, shutdown: ShutdownToken) -> oneshot::Receiver<Result<(), String>> {
pub fn manager(
mut self,
cancellation_token: CancellationToken,
_task_tracker_token: TaskTrackerToken,
) -> oneshot::Receiver<Result<(), String>> {
let (error_tx, error_rx) = oneshot::channel();
tokio::spawn(async move {
let mut interval = interval(self.flush_every);
@ -168,17 +175,17 @@ impl DatabaseManager {
tokio::select! {
order = self.entry_rx.recv() => {
if let Err(err) = self.handle_order(order).await {
shutdown.ask_shutdown();
cancellation_token.cancel();
break err;
}
}
_ = interval.tick() => {
if let Err(err) = self.flush().await {
shutdown.ask_shutdown();
cancellation_token.cancel();
break Some(err);
}
}
_ = shutdown.wait() => break None
_ = cancellation_token.cancelled() => break None
};
};
@ -467,6 +474,35 @@ impl<K: KeyType, V: ValueType> Tree<K, V> {
self.tree.remove(&key)
}
}
#[cfg(any(test, feature = "test"))]
pub fn tree(&self) -> &BTreeMap<K, V> {
&self.tree
}
}
#[cfg(any(test, feature = "test"))]
impl DatabaseManager {
pub fn set_loaded_db(&mut self, loaded_db: LoadedDB) {
self.loaded_db = loaded_db;
}
}
#[cfg(any(test, feature = "test"))]
impl Database {
pub async fn from_dir(dir_path: &Path, loaded_db: Option<LoadedDB>) -> Result<Self, IoError> {
use tokio_util::task::TaskTracker;
let (mut manager, entry_tx) = DatabaseManager::open(dir_path).await?;
if let Some(loaded_db) = loaded_db {
manager.set_loaded_db(loaded_db)
}
let error_rx = manager.manager(CancellationToken::new(), TaskTracker::new().token());
Ok(Self {
entry_tx: Some(entry_tx),
error_rx,
})
}
}
#[cfg(test)]
@ -474,75 +510,14 @@ mod tests {
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
io::Error as IoError,
path::Path,
time::Duration,
};
use reaction_plugin::shutdown::ShutdownController;
use serde_json::Value;
use tempfile::{NamedTempFile, TempDir};
use tokio::fs::{File, write};
use tokio::fs::File;
use crate::concepts::{Config, Time, now};
use super::{
DB_NAME, Database, DatabaseManager, Entry, KeyType, LoadedDB, Tree, ValueType, helpers::*,
raw::WriteDB, rotate_db,
};
impl DatabaseManager {
pub fn set_loaded_db(&mut self, loaded_db: LoadedDB) {
self.loaded_db = loaded_db;
}
}
impl Database {
pub async fn from_dir(
dir_path: &Path,
loaded_db: Option<LoadedDB>,
) -> Result<Self, IoError> {
let config_path = dir_path.join("reaction.jsonnet");
write(
&config_path,
format!(
"
{{
state_directory: {dir_path:?},
patterns: {{ pattern: {{ regex: \"prout\" }} }},
streams: {{ dummy: {{
cmd: [\"dummy\"],
filters: {{ dummy: {{
regex: [\"dummy\"],
actions: {{ dummy: {{
cmd: [\"dummy\"]
}} }}
}} }}
}} }}
}}
"
),
)
.await?;
let config = Config::from_path(&config_path).unwrap();
let (mut manager, entry_tx) = DatabaseManager::open(&config).await?;
if let Some(loaded_db) = loaded_db {
manager.set_loaded_db(loaded_db)
}
let error_rx = manager.manager(ShutdownController::new().token());
Ok(Self {
entry_tx: Some(entry_tx),
error_rx,
})
}
}
impl<K: KeyType, V: ValueType> Tree<K, V> {
pub fn tree(&self) -> &BTreeMap<K, V> {
&self.tree
}
}
use super::{DB_NAME, Database, Entry, Time, helpers::*, now, raw::WriteDB, rotate_db};
#[tokio::test]
async fn test_rotate_db() {

View file

@ -13,7 +13,7 @@ use tokio::{
};
use tracing::error;
use crate::concepts::Time;
use crate::time::Time;
use super::{Entry, LoadedDB};
@ -265,11 +265,9 @@ mod tests {
use tokio::fs::{File, read, write};
use crate::{
concepts::{Time, now},
treedb::{
Entry,
raw::{DB_TREE_ID, DatabaseError, ReadDB, WriteDB},
},
time::{Time, now},
};
#[tokio::test]

View file

@ -81,6 +81,10 @@ impl ShutdownToken {
}
}
pub fn split(self) -> (CancellationToken, TaskTrackerToken) {
(self.shutdown_notifyer, self._task_tracker_token)
}
/// Returns a future that will resolve only when a shutdown request happened.
pub fn wait(&self) -> WaitForCancellationFuture<'_> {
self.shutdown_notifyer.cancelled()

View file

@ -1,5 +1,5 @@
use std::{
collections::{btree_map::Entry, BTreeMap},
collections::{BTreeMap, btree_map::Entry},
fs::File,
io,
path::Path,
@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{debug, error, info, warn};
use super::{merge_attrs, Pattern, Plugin, Stream};
use super::{Pattern, Plugin, Stream, merge_attrs};
pub type Patterns = BTreeMap<String, Arc<Pattern>>;
@ -55,7 +55,10 @@ impl Config {
e.insert(pattern);
}
Entry::Occupied(e) => {
return Err(format!("pattern {} is already defined. pattern definitions can't be spread across multiple files.", e.key()));
return Err(format!(
"pattern {} is already defined. pattern definitions can't be spread across multiple files.",
e.key()
));
}
}
}
@ -328,7 +331,7 @@ enum ConfigError {
mod jsonnet {
use std::path::Path;
use jrsonnet_evaluator::{error::LocError, EvaluationState, FileImportResolver};
use jrsonnet_evaluator::{EvaluationState, FileImportResolver, error::LocError};
use super::ConfigError;

View file

@ -4,7 +4,6 @@ mod filter;
mod pattern;
mod plugin;
mod stream;
mod time;
use std::fmt::Debug;
@ -17,7 +16,7 @@ pub use filter::{Duplicate, Filter};
pub use pattern::{Pattern, PatternType};
pub use plugin::Plugin;
pub use stream::Stream;
pub use time::{Time, now};
pub use treedb::time::{Time, now};
pub type Match = Vec<String>;

View file

@ -15,8 +15,8 @@ use crate::{
concepts::{Action, Duplicate, Filter, Match, Pattern, Time},
daemon::plugin::Plugins,
protocol::{Order, PatternStatus},
treedb::Database,
};
use treedb::Database;
use state::State;

View file

@ -1,12 +1,9 @@
use std::collections::{BTreeMap, BTreeSet};
use crate::{
concepts::{Filter, Match, MatchTime, Time},
treedb::{
Database, Tree,
helpers::{to_match, to_matchtime, to_time, to_timemap, to_u64},
},
};
use serde_json::Value;
use treedb::{Database, Tree, helpers::*};
use crate::concepts::{Filter, Match, MatchTime, Time};
pub fn filter_ordered_times_db_name(filter: &Filter) -> String {
format!(
@ -241,18 +238,30 @@ impl State {
}
}
/// Tries to convert a [`Value`] into a [`MatchTime`]
pub fn to_matchtime(val: &Value) -> Result<MatchTime, String> {
let map = val.as_object().ok_or("not an object")?;
Ok(MatchTime {
m: to_match(map.get("m").ok_or("no m in object")?)?,
t: to_time(map.get("t").ok_or("no t in object")?)?,
})
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, HashMap};
use serde_json::json;
use serde_json::{Map, Value, json};
use crate::{
concepts::{Action, Duplicate, Filter, Pattern, Time, filter_tests::ok_filter, now},
daemon::filter::state::State,
concepts::{
Action, Duplicate, Filter, MatchTime, Pattern, Time, filter_tests::ok_filter, now,
},
tests::TempDatabase,
};
use super::{State, to_matchtime};
// Tests `new`, `clear_past_matches` and `load_matches_from_ordered_times`
#[tokio::test]
async fn state_new() {
@ -593,4 +602,54 @@ mod tests {
state.remove_trigger(&one).await;
assert!(state.triggers.tree().is_empty());
}
#[test]
fn test_to_matchtime() {
assert_eq!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([
("m".into(), ["plip", "ploup"].into()),
("t".into(), "12345678".into()),
])
.into_iter()
))),
Ok(MatchTime {
m: vec!["plip".into(), "ploup".into()],
t: Time::from_nanos(12345678),
})
);
assert!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([("m".into(), ["plip", "ploup"].into()),]).into_iter()
)))
.is_err()
);
assert!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([("t".into(), 12345678.into()),]).into_iter()
)))
.is_err()
);
assert!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([("m".into(), "ploup".into()), ("t".into(), 12345678.into()),])
.into_iter()
)))
.is_err()
);
assert!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([
("m".into(), ["plip", "ploup"].into()),
("t".into(), [1234567].into()),
])
.into_iter()
)))
.is_err()
);
}
}

View file

@ -16,11 +16,9 @@ use tokio::{
sync::Semaphore,
};
use tracing::{debug, error, info};
use treedb::Database;
use crate::{
concepts::{Config, now},
treedb::Database,
};
use crate::concepts::{Config, now};
use filter::FilterManager;
pub use filter::React;
use plugin::Plugins;
@ -116,7 +114,9 @@ async fn daemon_start(
let mut plugins = Plugins::new(config, shutdown.clone()).await?;
// Open Database
*db = Some(Database::open(config, shutdown.clone()).await?);
let (cancellation, task_tracker) = shutdown.clone().split();
let path = PathBuf::from(config.state_directory.clone());
*db = Some(Database::open(&path, cancellation, task_tracker).await?);
let (state, stream_managers) = {
// Semaphore limiting action execution concurrency

View file

@ -9,4 +9,3 @@ pub mod concepts;
pub mod daemon;
pub mod protocol;
pub mod tests;
pub mod treedb;

View file

@ -9,7 +9,7 @@ use std::{
use tempfile::TempDir;
use crate::treedb::{Database, LoadedDB};
use treedb::{Database, LoadedDB};
pub struct Fixture {
path: PathBuf,