WIP fjall

This commit is contained in:
ppom 2025-05-14 12:00:00 +02:00
commit 2facac9fbd
No known key found for this signature in database
3 changed files with 313 additions and 45 deletions

204
Cargo.lock generated
View file

@ -169,6 +169,12 @@ version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3"
[[package]]
name = "byteview"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6236364b88b9b6d0bc181ba374cf1ab55ba3ef97a1cb6f8cddad48a273767fb5"
[[package]]
name = "cc"
version = "1.2.19"
@ -270,6 +276,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
[[package]]
name = "compare"
version = "0.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7"
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
@ -303,12 +315,36 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-skiplist"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core 0.9.10",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
@ -320,6 +356,12 @@ dependencies = [
"syn 2.0.82",
]
[[package]]
name = "double-ended-peekable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57"
[[package]]
name = "doxygen-rs"
version = "0.4.2"
@ -329,6 +371,18 @@ dependencies = [
"phf",
]
[[package]]
name = "enum_dispatch"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd"
dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.82",
]
[[package]]
name = "equivalent"
version = "1.0.1"
@ -351,6 +405,23 @@ version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "fjall"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b929b3db7be7d7b4d4df67fb016fc446b8f57507b48a82e69d2a30610e460f28"
dependencies = [
"byteorder",
"byteview",
"dashmap",
"log",
"lsm-tree",
"path-absolutize",
"std-semaphore",
"tempfile",
"xxhash-rust",
]
[[package]]
name = "form_urlencoded"
version = "1.2.1"
@ -485,6 +556,18 @@ version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "guardian"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17e2ac29387b1aa07a1e448f7bb4f35b500787971e965b02842b900afa5c8f6f"
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "hashbrown"
version = "0.15.0"
@ -710,7 +793,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
dependencies = [
"equivalent",
"hashbrown",
"hashbrown 0.15.0",
]
[[package]]
@ -722,6 +805,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "interval-heap"
version = "0.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6"
dependencies = [
"compare",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
@ -750,7 +842,7 @@ dependencies = [
"jrsonnet-types",
"md5",
"pathdiff",
"rustc-hash",
"rustc-hash 1.1.0",
"serde",
"serde_json",
"thiserror",
@ -784,7 +876,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ff75843e778244f3476800e6f492950a6ecee1d9308019764983d311620bf9"
dependencies = [
"jrsonnet-gc",
"rustc-hash",
"rustc-hash 1.1.0",
"serde",
]
@ -877,6 +969,36 @@ version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "lsm-tree"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0d03b764a7e3009cc4d314bfce42ce28b4a2c458fc7149b57817cbed7898f43"
dependencies = [
"byteorder",
"crossbeam-skiplist",
"double-ended-peekable",
"enum_dispatch",
"guardian",
"interval-heap",
"log",
"lz4_flex",
"path-absolutize",
"quick_cache",
"rustc-hash 2.1.1",
"self_cell",
"tempfile",
"value-log",
"varint-rs",
"xxhash-rust",
]
[[package]]
name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
[[package]]
name = "md5"
version = "0.7.0"
@ -1030,6 +1152,24 @@ dependencies = [
"windows-targets",
]
[[package]]
name = "path-absolutize"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4af381fe79fa195b4909485d99f73a80792331df0625188e707854f0b3383f5"
dependencies = [
"path-dedot",
]
[[package]]
name = "path-dedot"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07ba0ad7e047712414213ff67533e6dd477af0a4e1d14fb52343e53d30ea9397"
dependencies = [
"once_cell",
]
[[package]]
name = "pathdiff"
version = "0.2.2"
@ -1141,6 +1281,16 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "quick_cache"
version = "0.6.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b450dad8382b1b95061d5ca1eb792081fb082adf48c678791fe917509596d5f"
dependencies = [
"equivalent",
"hashbrown 0.15.0",
]
[[package]]
name = "quote"
version = "1.0.37"
@ -1189,6 +1339,7 @@ dependencies = [
"clap",
"clap_complete",
"clap_mangen",
"fjall",
"futures",
"heed",
"jrsonnet-evaluator",
@ -1274,6 +1425,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc-hash"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustix"
version = "0.38.37"
@ -1299,6 +1456,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "self_cell"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749"
[[package]]
name = "serde"
version = "1.0.219"
@ -1421,6 +1584,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "std-semaphore"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e"
[[package]]
name = "strsim"
version = "0.11.1"
@ -1708,6 +1877,29 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "value-log"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62fc7c4ce161f049607ecea654dca3f2d727da5371ae85e2e4f14ce2b98ed67c"
dependencies = [
"byteorder",
"byteview",
"interval-heap",
"log",
"path-absolutize",
"rustc-hash 2.1.1",
"tempfile",
"varint-rs",
"xxhash-rust",
]
[[package]]
name = "varint-rs"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
@ -1894,6 +2086,12 @@ version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51"
[[package]]
name = "xxhash-rust"
version = "0.8.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
[[package]]
name = "yansi-term"
version = "0.1.2"

View file

@ -47,6 +47,7 @@ tokio-util = { version = "0.7.12", features = ["codec"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
sled = "0.34.7"
fjall = "2.10.0"
[build-dependencies]
clap = { version = "4.5.4", features = ["derive"] }

View file

@ -1,11 +1,7 @@
use std::{collections::BTreeSet, fs::create_dir, io::ErrorKind};
use std::{collections::BTreeSet, fs::create_dir, io::ErrorKind, marker::PhantomData};
use heed::{
byteorder::LittleEndian,
types::{DecodeIgnore, SerdeBincode, Str, U64},
Database, DatabaseFlags, Env, EnvFlags, Result,
};
use serde::{Deserialize, Serialize};
use fjall::{Result, TransactionalKeyspace, WriteTransaction};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use crate::concepts::{Config, Filter, Match, Time};
@ -15,10 +11,10 @@ pub struct MatchTime {
pub t: Time,
}
pub fn open_db(config: &Config) -> std::result::Result<Env, String> {
let lmdb_dir = format!("{}/lmdb", config.state_directory());
pub fn open_db(config: &Config) -> std::result::Result<TransactionalKeyspace, String> {
let fjall_dir = format!("{}/fjall", config.state_directory());
match create_dir(&lmdb_dir) {
match create_dir(&fjall_dir) {
Ok(_) => Ok(()),
Err(err) => match err.kind() {
ErrorKind::AlreadyExists => Ok(()),
@ -30,43 +26,25 @@ pub fn open_db(config: &Config) -> std::result::Result<Env, String> {
},
}?;
let max_dbs = config
.streams()
.values()
.map(|stream| stream.filters().len())
// We have 3 databases per filter + 1 unnamed db
.fold(1, |acc, elt| acc + 3 * elt);
let env = fjall::Config::new(fjall_dir)
.fsync_ms(Some(1000))
.open_transactional()
.map_err(|err| {
format!(
"while opening database at {}: {}",
config.state_directory(),
err
)
})?;
// We have no choice but to use unsafe, as the library exposes unsafe code
#[allow(unsafe_code)]
let env = unsafe {
heed::EnvOpenOptions::new()
// safety: if you have more than 1.4 billion filters,
// you searched for this overflow :smirk:
.max_dbs(max_dbs as u32)
.map_size(100000 * 4096)
.flags(EnvFlags::NO_SYNC)
.open(lmdb_dir)
}
.map_err(|err| {
env.cleanup_unused_dbs(config).map_err(|err| {
format!(
"while opening database at {}: {}",
"while cleaning database on startup at {}: {}",
config.state_directory(),
err
)
})?;
// FIXME not cleaning databases because it would make us open too much dbs,
// as we can't delete them nor close them for now.
// See FIXME below for more information.
// env.cleanup_unused_dbs(config).map_err(|err| {
// format!(
// "while cleaning database on startup at {}: {}",
// config.state_directory(),
// err
// )
// })?;
Ok(env)
}
@ -108,7 +86,7 @@ fn filter_triggers_db_name(filter: &Filter) -> String {
format!("filter_triggers_{}.{}", filter.stream_name(), filter.name())
}
impl EnvExt for heed::Env {
impl EnvExt for TransactionalKeyspace {
fn open_filter_matches_db(
&self,
filter: &Filter,
@ -194,3 +172,94 @@ impl EnvExt for heed::Env {
Ok(())
}
}
/// Typed wrapper around a [`fjall::TxPartitionHandle`].
///
/// It lets callers work with typed keys and values, so that business logic
/// does not have to handle (de)serialization itself: the methods of this type
/// encode and decode keys and values with `bincode`.
///
/// Key and value types must implement [`serde::Serialize`] and
/// [`serde::de::DeserializeOwned`].
// NOTE(review): `#[derive(Clone)]` adds `K: Clone` and `V: Clone` bounds to the
// generated impl, although only the handle and zero-sized markers are cloned.
#[derive(Clone)]
pub struct Tree<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned> {
// Underlying untyped handle; every method of `Tree` goes through it.
tree: fjall::TxPartitionHandle,
// Zero-sized markers that tie `K` and `V` to the struct without storing values.
_k_marker: PhantomData<K>,
_v_marker: PhantomData<V>,
}
// Every `bincode` (de)serialization result in this impl is unwrapped, so an
// encoding or decoding failure panics instead of being returned to the caller.
// Errors returned by the underlying handle are propagated with `?` where the
// method returns a `Result`.
//
// NOTE(review): `first`, `pop_min`, `fetch_and_update` and an `insert` that
// returns a value look like the API of another key-value store — TODO confirm
// that `fjall::TxPartitionHandle` exposes methods with these names and shapes.
#[allow(clippy::unwrap_used)]
impl<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned> Tree<K, V> {
/// Wraps an untyped partition handle into a typed `Tree`.
fn new(tree: fjall::TxPartitionHandle) -> Self {
Self {
tree,
_k_marker: PhantomData::<K>,
_v_marker: PhantomData::<V>,
}
}
/// Looks up the value stored under `k`.
///
/// The key is serialized with `bincode` before the lookup and the stored
/// bytes, if any, are deserialized into `V`.
///
/// # Errors
/// Returns the error of the underlying `get` call.
///
/// # Panics
/// Panics if the key cannot be serialized or the value cannot be deserialized.
pub fn get(&self, k: &K) -> Result<Option<V>> {
let k = bincode::serialize(k).unwrap();
Ok(self.tree.get(k)?.map(|v| bincode::deserialize(&v).unwrap()))
}
/// Serializes `key` and asks the write transaction `wtxn` to remove it.
///
/// # Panics
/// Panics if the key cannot be serialized.
// NOTE(review): `self` (the typed wrapper) is passed to `wtxn.remove`, not
// `self.tree` — presumably the transaction expects the underlying partition
// handle; TODO confirm against the fjall `WriteTransaction` API.
// NOTE(review): `wtxn` is taken by value and never committed here, so it is
// dropped when this function returns — confirm the removal is not lost.
pub fn remove(&self, key: &K, wtxn: WriteTransaction) {
let key = bincode::serialize(key).unwrap();
wtxn.remove(self, key);
}
/// Returns the entry yielded by the underlying `first` call, decoded into
/// `(K, V)`, or `None` when that call yields nothing.
///
/// Presumably this is the entry with the smallest key — verify against the
/// underlying handle.
///
/// # Errors
/// Returns the error of the underlying `first` call.
///
/// # Panics
/// Panics if the key or the value cannot be deserialized.
pub fn first(&self) -> Result<Option<(K, V)>> {
let option = self.tree.first()?;
match option {
None => Ok(None),
Some((k, v)) => {
let k: K = bincode::deserialize(&k).unwrap();
let v: V = bincode::deserialize(&v).unwrap();
Ok(Some((k, v)))
}
}
}
/// Returns the entry yielded by the underlying `pop_min` call, decoded into
/// `(K, V)`, or `None` when that call yields nothing.
///
/// Presumably this removes and returns the entry with the smallest key —
/// verify against the underlying handle.
///
/// # Errors
/// Returns the error of the underlying `pop_min` call.
///
/// # Panics
/// Panics if the key or the value cannot be deserialized.
pub fn pop_min(&self) -> Result<Option<(K, V)>> {
let option = self.tree.pop_min()?;
match option {
None => Ok(None),
Some((k, v)) => {
let k: K = bincode::deserialize(&k).unwrap();
let v: V = bincode::deserialize(&v).unwrap();
Ok(Some((k, v)))
}
}
}
/// Runs the typed update function `f` on the value stored under `k`.
///
/// `f` is wrapped in a byte-level closure: the current bytes, if any, are
/// deserialized into `V`, handed to `f`, and the `Option<V>` it returns is
/// serialized again for the underlying `fetch_and_update` call. The value
/// returned by that call is deserialized and returned (presumably the value
/// stored before the update — verify against the underlying handle).
///
/// # Errors
/// Returns the error of the underlying `fetch_and_update` call.
///
/// # Panics
/// Panics if any serialization or deserialization fails.
pub fn fetch_and_update<F>(&self, k: &K, mut f: F) -> Result<Option<V>>
where
F: FnMut(Option<V>) -> Option<V>,
{
let k = bincode::serialize(&k).unwrap();
// Adapter between the byte-level callback and the typed `f`.
let f = |v: Option<&[u8]>| -> Option<Vec<u8>> {
let v = v.map(|v| bincode::deserialize(v).unwrap());
f(v).map(|v| bincode::serialize(&v).unwrap())
};
Ok(self
.tree
.fetch_and_update(k, f)?
.map(|v| bincode::deserialize::<V>(&v).unwrap()))
}
/// Serializes `k` and `v` and inserts them through the underlying handle.
///
/// The value returned by the underlying `insert` call, if any, is
/// deserialized and returned (presumably the previously stored value —
/// verify against the underlying handle).
///
/// # Errors
/// Returns the error of the underlying `insert` call.
///
/// # Panics
/// Panics if the key or value cannot be serialized, or if the returned
/// value cannot be deserialized.
pub fn insert(&self, k: &K, v: &V) -> Result<Option<V>> {
let k = bincode::serialize(k).unwrap();
let v = bincode::serialize(v).unwrap();
Ok(self
.tree
.insert(k, v)?
.map(|v| bincode::deserialize(&v).unwrap()))
}
/// Iterates over the entries of the underlying handle, decoded into `(K, V)`.
///
/// # Panics
/// The returned iterator panics while iterating if an element is an error
/// or if a key or value cannot be deserialized.
// The two lifetime annotations are meant to decouple the lifetime of `self`
// from the lifetime of the returned iterator.
#[allow(clippy::needless_lifetimes)] // I find this clearer with 2 lifetimes
pub fn iter<'a, 'b>(&'a self) -> impl Iterator<Item = (K, V)> + 'b {
self.tree.iter().map(|elt| {
// Each element is a `Result`; a failure here panics.
let (k, v) = elt.unwrap();
let k: K = bincode::deserialize(&k).unwrap();
let v: V = bincode::deserialize(&v).unwrap();
(k, v)
})
}
}