LoadedDB internal to Database, open_tree uses LoadedDB. More doc!

This commit is contained in:
ppom 2025-05-25 12:00:00 +02:00
commit c93f4e34c1
No known key found for this signature in database
4 changed files with 335 additions and 29 deletions

View file

@ -55,3 +55,14 @@ This code has async code, to handle input streams and communication with clients
- `stream.rs`: Stream managers: start the stream `cmd` and dispatch its stdout lines to its Filter managers.
- `filter.rs`: Filter managers: handle lines, persistence, store matches and trigger actions. This is the main piece of runtime logic.
- `socket.rs`: The socket task, responsible for communication with clients.
### `src/waltree`
Persistence layer.
This is a database highly adapted to reaction workload, making reaction faster than when used with general purpose key-value databases
(heed, sled and fjall crates have been tested).
Its design is explained in the comments of its files:
- `mod.rs`: main database code, with its two API structs: Tree and Database.
- `raw.rs`: low-level part, directly interacting with de/serialization and files.

106
src/waltree/helpers.rs Normal file
View file

@ -0,0 +1,106 @@
use std::collections::BTreeSet;
use chrono::{Local, TimeZone};
use serde_json::Value;
use crate::concepts::{Match, Time};
/// Tries to convert a [`Value`] into a [`String`]
///
/// Fails with an error message when the value is not a JSON string.
pub fn to_string(val: &Value) -> Result<String, String> {
    match val.as_str() {
        Some(s) => Ok(s.to_owned()),
        None => Err("not a string".into()),
    }
}
/// Tries to convert a [`Value`] into a [`Time`]
///
/// Expects a JSON number holding a unix timestamp in milliseconds;
/// fails when the value is not a number or does not fit chrono's range.
pub fn to_time(val: &Value) -> Result<Time, String> {
    let millis = val.as_i64().ok_or("not a number")?;
    Local
        .timestamp_millis_opt(millis)
        .single()
        .ok_or_else(|| "not a valid timestamp".to_string())
}
/// Tries to convert a [`Value`] into a [`Match`]
pub fn to_match(val: &Value) -> Result<Match, String> {
val.as_array()
.ok_or("not an array")?
.into_iter()
.map(|v| to_string(v))
.collect()
}
/// Tries to convert a [`Value`] into a [`BTreeSet<Time>`]
pub fn to_timeset(val: &Value) -> Result<BTreeSet<Time>, String> {
val.as_array()
.ok_or("not an array")?
.into_iter()
.map(|v| to_time(v))
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
// `to_string` accepts any JSON string (including the empty one) and
// rejects every other JSON type (array, bool, number, null).
#[test]
fn test_to_string() {
assert_eq!(to_string(&("".into())), Ok("".into()));
assert_eq!(to_string(&("ploup".into())), Ok("ploup".into()));
assert!(to_string(&(["ploup"].into())).is_err());
assert!(to_string(&(true.into())).is_err());
assert!(to_string(&(8.into())).is_err());
assert!(to_string(&(None::<String>.into())).is_err());
}
// `to_time` accepts i64 millisecond timestamps. u64::MAX does not fit
// in an i64, so it is rejected; numeric strings are NOT coerced.
#[test]
fn test_to_time() {
assert_eq!(
to_time(&(123456.into())).unwrap(),
Local.timestamp_millis_opt(123456).unwrap(),
);
assert!(to_time(&(u64::MAX.into())).is_err());
assert!(to_time(&(["ploup"].into())).is_err());
assert!(to_time(&(true.into())).is_err());
assert!(to_time(&("12345".into())).is_err());
assert!(to_time(&(None::<String>.into())).is_err());
}
// `to_match` accepts arrays of strings only; one non-string element
// poisons the whole conversion.
#[test]
fn test_to_match() {
assert_eq!(to_match(&([""].into())), Ok(vec!["".into()]));
assert_eq!(
to_match(&(["plip", "ploup"].into())),
Ok(vec!["plip".into(), "ploup".into()])
);
assert!(to_match(&[Value::from("plip"), Value::from(10)].into()).is_err());
assert!(to_match(&("ploup".into())).is_err());
assert!(to_match(&(true.into())).is_err());
assert!(to_match(&(8.into())).is_err());
assert!(to_match(&(None::<String>.into())).is_err());
}
// `to_timeset` accepts arrays of millisecond timestamps; one invalid
// element (e.g. a string) poisons the whole conversion.
#[test]
fn test_to_timeset() {
assert_eq!(
to_timeset(&([1234567].into())),
Ok(BTreeSet::from([Local
.timestamp_millis_opt(1234567)
.unwrap()]))
);
assert_eq!(
to_timeset(&([8, 123456].into())),
Ok(BTreeSet::from([
Local.timestamp_millis_opt(8).unwrap(),
Local.timestamp_millis_opt(123456).unwrap()
]))
);
assert!(to_timeset(&[Value::from("plip"), Value::from(10)].into()).is_err());
assert!(to_timeset(&([""].into())).is_err());
assert!(to_timeset(&(["ploup"].into())).is_err());
assert!(to_timeset(&(true.into())).is_err());
assert!(to_timeset(&(8.into())).is_err());
assert!(to_timeset(&(None::<String>.into())).is_err());
}
}

View file

@ -1,3 +1,14 @@
/// This module implements an asynchronously persisted BTreeMap named [`Tree`],
/// via a unique "Write Behind Log" (as opposed to a WAL, "Write Ahead Log").
///
/// This allows RAM-speed read & write operations, while still eventually
/// persisting them. The log is flushed to the kernel every 2 seconds.
///
/// Operations stored in the log have a timeout configured at the Tree level.
/// All operations are then stored for this lifetime.
///
/// Data is stored as JSONL. Each line stores a (key, value), plus the tree id,
/// and an expiry timestamp in milliseconds.
use std::{
collections::{BTreeMap, HashMap},
io::{Error as IoError, ErrorKind},
@ -18,6 +29,8 @@ use tracing::error;
use crate::concepts::{Config, Time};
pub mod helpers;
// Database
use raw::{ReadDB, WriteDB};
@ -33,7 +46,7 @@ pub struct Entry {
pub expiry: Time,
}
pub type LoadedDb = HashMap<String, HashMap<Value, Value>>;
pub type LoadedDB = HashMap<String, HashMap<Value, Value>>;
const DB_NAME: &str = "reaction.db";
const DB_NEW_NAME: &str = "reaction.new.db";
@ -48,11 +61,16 @@ impl Config {
}
}
// TODO rotate_db at a regular interval instead of every N bytes?
// This would make more sense, as actual garbage collection is time-based
/// A [`Database`] logs all write operations on [`Tree`]s in a single file.
/// Logs are written asynchronously, so the write operations in RAM will never block.
pub struct Database {
/// Inner database
write_db: WriteDB,
/// [`Tree`]s loaded from disk
loaded_db: LoadedDB,
/// Path for the "normal" database
path: PathBuf,
/// Path for the "new" database, when rotating database.
@ -72,7 +90,7 @@ pub struct Database {
}
impl Database {
pub async fn open(config: Config) -> Result<(Database, LoadedDb), IoError> {
pub async fn open(config: &Config) -> Result<Database, IoError> {
let path = config.path_of(DB_NAME);
let new_path = config.path_of(DB_NEW_NAME);
@ -80,19 +98,17 @@ impl Database {
let (tx, rx) = mpsc::channel(1000);
Ok((
Database {
write_db,
path,
new_path,
rx,
tx,
flush_every: Duration::from_secs(2),
max_bytes: 20 * 1024 * 1024, // 20 MiB
bytes_written: 0,
},
Ok(Database {
write_db,
loaded_db,
))
path,
new_path,
rx,
tx,
flush_every: Duration::from_secs(2),
max_bytes: 20 * 1024 * 1024, // 20 MiB
bytes_written: 0,
})
}
pub async fn task(&mut self) {
@ -175,7 +191,7 @@ async fn rotate_db(
path: &Path,
new_path: &Path,
startup: bool,
) -> Result<(WriteDB, LoadedDb), IoError> {
) -> Result<(WriteDB, LoadedDB), IoError> {
let file = match File::open(&path).await {
Ok(file) => file,
Err(err) => match (startup, err.kind()) {
@ -231,18 +247,40 @@ pub struct Tree<K: KeyType, V: ValueType> {
impl Database {
/// Creates a new Tree with the given name and entry timeout.
pub fn open_tree<K: KeyType, V: ValueType>(
/// Takes a closure (or regular function) that converts (Value, Value) JSON entries
/// into (K, V) typed entries.
/// Helpers for this closure can be found in the [`helpers`] module.
pub fn open_tree<K: KeyType, V: ValueType, F>(
&mut self,
name: String,
entry_timeout: TimeDelta,
) -> Tree<K, V> {
// TODO load from LoadedDb
Tree {
map_f: F,
) -> Result<Tree<K, V>, String>
where
F: Fn((Value, Value)) -> Result<(K, V), String>,
{
// Load the tree from its JSON
let tree = if let Some(json_tree) = self.loaded_db.remove(&name) {
json_tree
.into_iter()
.map(map_f)
.collect::<Result<BTreeMap<K, V>, String>>()
.unwrap()
} else {
BTreeMap::default()
};
Ok(Tree {
id: name,
entry_timeout,
tree: BTreeMap::new(),
tree,
tx: self.tx.clone(),
}
})
}
// TODO keep only tree names, and use it for next db rotation to remove associated entries
/// Drops Trees that have not been loaded already
pub fn drop_trees(&mut self) {
self.loaded_db = HashMap::default();
}
}
@ -322,13 +360,17 @@ impl<K: KeyType, V: ValueType> Tree<K, V> {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use chrono::{Local, TimeDelta};
use tempfile::NamedTempFile;
use tokio::fs::File;
use chrono::{Local, TimeDelta, TimeZone};
use serde_json::Value;
use tempfile::{NamedTempFile, TempDir};
use tokio::fs::{read, write, File};
use crate::waltree::{raw::WriteDB, rotate_db, Entry};
use crate::{
concepts::Config,
waltree::{helpers::*, raw::WriteDB, rotate_db, Database, Entry, DB_NAME},
};
#[tokio::test]
async fn test_rotate_db() {
@ -430,4 +472,151 @@ mod tests {
let (_, loaded_db) = rotate_db(&path, &new_path, false).await.unwrap();
assert_eq!(loaded_db, HashMap::default());
}
// End-to-end check of Database::open + open_tree: entries are written to
// disk with the raw WriteDB, loaded back through a real Config, and each
// tree is materialized via a (Value, Value) -> (K, V) conversion closure.
#[tokio::test]
async fn test_open_tree() {
// Round-trip the timestamps through milliseconds, since that is the
// precision actually stored in the database.
let now = Local::now();
let now2 = now + TimeDelta::milliseconds(2);
let now3 = now + TimeDelta::milliseconds(3);
let now_ms = now.timestamp_millis();
let now = Local.timestamp_millis_opt(now_ms).unwrap();
let now2_ms = now2.timestamp_millis();
let now2 = Local.timestamp_millis_opt(now2_ms).unwrap();
let now3_ms = now3.timestamp_millis();
let now3 = Local.timestamp_millis_opt(now3_ms).unwrap();
let valid = now + TimeDelta::seconds(2);
let ip127 = vec!["127.0.0.1".to_string()];
let ip1 = vec!["1.1.1.1".to_string()];
// Fixture: two trees. "time-match" contains a duplicate key (now2_ms
// appears twice); "match-timeset" overwrites the ip1 key, so the last
// written value is the one expected after loading.
let entries = [
Entry {
tree: "time-match".into(),
key: now_ms.into(),
value: Some(ip127.clone().into()),
expiry: valid,
},
Entry {
tree: "time-match".into(),
key: now2_ms.into(),
value: Some(ip127.clone().into()),
expiry: valid,
},
Entry {
tree: "time-match".into(),
key: now3_ms.into(),
value: Some(ip127.clone().into()),
expiry: valid,
},
Entry {
tree: "time-match".into(),
key: now2_ms.into(),
value: Some(ip127.clone().into()),
expiry: valid,
},
Entry {
tree: "match-timeset".into(),
key: ip127.clone().into(),
value: Some([Value::Number(now_ms.into())].into()),
expiry: valid,
},
Entry {
tree: "match-timeset".into(),
key: ip1.clone().into(),
value: Some([Value::Number(now2_ms.into())].into()),
expiry: valid,
},
Entry {
tree: "match-timeset".into(),
key: ip1.clone().into(),
value: Some([Value::Number(now2_ms.into()), now3_ms.into()].into()),
expiry: valid,
},
];
// Write the fixture straight to the database file with the raw layer,
// bypassing Database entirely.
let dir = TempDir::new().unwrap();
let dir_path = dir.path();
let db_path = dir_path.join(DB_NAME);
let mut write_db = WriteDB::new(File::create(&db_path).await.unwrap());
for entry in entries {
write_db.write_entry(&entry).await.unwrap();
}
write_db.close().await.unwrap();
drop(write_db);
// Minimal valid config whose state_directory points at the temp dir,
// so Database::open finds the file written above.
let config_path = dir_path.join("reaction.jsonnet");
write(
&config_path,
format!(
"
{{
state_directory: {dir_path:?},
patterns: {{ pattern: {{ regex: \"prout\" }} }},
streams: {{ dummy: {{
cmd: [\"dummy\"],
filters: {{ dummy: {{
regex: [\"dummy\"],
actions: {{ dummy: {{
cmd: [\"dummy\"]
}} }}
}} }}
}} }}
}}
"
),
)
.await
.unwrap();
// Echo the generated config so a failing run is easy to debug.
println!(
"{}",
String::from_utf8(read(&config_path).await.unwrap()).unwrap()
);
let config = Config::from_file(&config_path).unwrap();
let mut database = Database::open(&config).await.unwrap();
// The duplicate now2_ms entry collapses into a single key.
let time_match = database
.open_tree(
"time-match".into(),
TimeDelta::seconds(2),
|(key, value)| Ok((to_time(&key)?, to_match(&value)?)),
)
.unwrap();
assert_eq!(
time_match.tree,
BTreeMap::from([
(now, ip127.clone()),
(now2, ip127.clone()),
(now3, ip127.clone())
])
);
// For ip1 the later write wins: the set {now2, now3} is expected.
let match_timeset = database
.open_tree(
"match-timeset".into(),
TimeDelta::hours(2),
|(key, value)| Ok((to_match(&key)?, to_timeset(&value)?)),
)
.unwrap();
assert_eq!(
match_timeset.tree,
BTreeMap::from([
(ip127.clone(), BTreeSet::from([now])),
(ip1.clone(), BTreeSet::from([now2, now3])),
])
);
// Opening a tree absent from the file yields an empty map, not an error.
let unknown_tree = database
.open_tree(
"unknown_tree".into(),
TimeDelta::hours(2),
|(key, value)| Ok((to_match(&key)?, to_timeset(&value)?)),
)
.unwrap();
assert_eq!(unknown_tree.tree, BTreeMap::default());
}
}

View file

@ -10,7 +10,7 @@ use tokio::{
};
use tracing::error;
use super::env::{Entry, LoadedDb};
use super::{Entry, LoadedDB};
const DB_TREE_ID: u64 = 0;
// const DB_TREE_NAME: &str = "tree_names";
@ -159,7 +159,7 @@ impl ReadDB {
&mut self,
write_db: &mut WriteDB,
load_db: bool,
) -> tokio::io::Result<LoadedDb> {
) -> tokio::io::Result<LoadedDB> {
let mut data_maps = HashMap::new();
loop {
@ -259,8 +259,8 @@ mod tests {
use tokio::fs::{read, write, File};
use crate::waltree::{
env::Entry,
raw::{DatabaseError, ReadDB, WriteDB, DB_TREE_ID},
Entry,
};
#[tokio::test]