fmt, clippy, tests, fix some tests after startup refactor

This commit is contained in:
ppom 2025-10-30 12:00:00 +01:00
commit 58180fe609
No known key found for this signature in database
20 changed files with 166 additions and 175 deletions

View file

@ -83,7 +83,7 @@ struct ActionInit {
impl PluginInfo for Plugin {
async fn manifest(&mut self) -> Result<Manifest, rtc::CallError> {
Ok(Manifest {
hello: Hello::hello(),
hello: Hello::new(),
streams: BTreeSet::from(["cluster".into()]),
actions: BTreeSet::from(["cluster_send".into()]),
})

View file

@ -21,7 +21,7 @@ struct Plugin {
impl PluginInfo for Plugin {
async fn manifest(&mut self) -> Result<Manifest, rtc::CallError> {
Ok(Manifest {
hello: Hello::hello(),
hello: Hello::new(),
streams: BTreeSet::from(["virtual".into()]),
actions: BTreeSet::from(["virtual".into()]),
})

View file

@ -143,21 +143,11 @@ pub struct Hello {
}
impl Hello {
pub fn hello() -> Hello {
pub fn new() -> Hello {
let mut version = env!("CARGO_PKG_VERSION").split(".");
Hello {
version_major: env!("CARGO_PKG_VERSION")
.split(".")
.next()
.unwrap()
.parse()
.unwrap(),
version_minor: env!("CARGO_PKG_VERSION")
.split(".")
.skip(1)
.next()
.unwrap()
.parse()
.unwrap(),
version_major: version.next().unwrap().parse().unwrap(),
version_minor: version.next().unwrap().parse().unwrap(),
}
}
@ -166,15 +156,13 @@ impl Hello {
&& server.version_minor >= plugin.version_minor
{
Ok(())
} else if plugin.version_major > server.version_major
|| (plugin.version_major == server.version_major
&& plugin.version_minor > server.version_minor)
{
Err("consider upgrading reaction".into())
} else {
if plugin.version_major > server.version_major
|| (plugin.version_major == server.version_major
&& plugin.version_minor > server.version_minor)
{
Err("consider upgrading reaction".into())
} else {
Err("consider upgrading the plugin".into())
}
Err("consider upgrading the plugin".into())
}
}
}

View file

@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::process::Command;
use super::{null_value, parse_duration::*, Match, Pattern, PatternType};
use super::{Match, Pattern, PatternType, null_value, parse_duration::*};
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
@ -102,10 +102,8 @@ impl Action {
if self.cmd[0].is_empty() {
return Err("cmd's first item is empty".into());
}
} else {
if !self.cmd.is_empty() {
return Err("can't define a cmd and a plugin type".into());
}
} else if !self.cmd.is_empty() {
return Err("can't define a cmd and a plugin type".into());
}
if let Some(after) = &self.after {

View file

@ -10,7 +10,7 @@ use chrono::TimeDelta;
use regex::Regex;
use serde::{Deserialize, Serialize};
use super::{parse_duration, Action, Match, Pattern, PatternType, Patterns};
use super::{Action, Match, Pattern, PatternType, Patterns, parse_duration};
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
pub enum Duplicate {
@ -159,9 +159,9 @@ impl Filter {
}
} else if !first && new_patterns.contains(pattern) {
return Err(format!(
"pattern {} is present in the first regex but is not present in a following regex. all regexes should contain the same set of regexes",
&pattern.name_with_braces()
));
"pattern {} is present in the first regex but is not present in a following regex. all regexes should contain the same set of regexes",
&pattern.name_with_braces()
));
}
regex_buf = regex_buf.replacen(pattern.name_with_braces(), &pattern.regex, 1);
}
@ -202,12 +202,12 @@ impl Filter {
for pattern in self.patterns.as_ref() {
// if the pattern is in an optional part of the regex,
// there may be no captured group for it.
if let Some(match_) = matches.name(&pattern.name) {
if !pattern.is_ignore(match_.as_str()) {
let mut match_ = match_.as_str().to_string();
pattern.normalize(&mut match_);
result.push(match_);
}
if let Some(match_) = matches.name(&pattern.name)
&& !pattern.is_ignore(match_.as_str())
{
let mut match_ = match_.as_str().to_string();
pattern.normalize(&mut match_);
result.push(match_);
}
}
if result.len() == self.patterns.len() {
@ -408,10 +408,10 @@ impl Filter {
#[cfg(test)]
pub mod tests {
use crate::concepts::action::tests::{ok_action, ok_action_with_after};
use crate::concepts::pattern::PatternIp;
use crate::concepts::pattern::tests::{
boubou_pattern_with_ignore, default_pattern, number_pattern, ok_pattern_with_ignore,
};
use crate::concepts::pattern::PatternIp;
use super::*;
@ -707,24 +707,32 @@ pub mod tests {
Ok(vec!("b".into()))
);
// Doesn't match
assert!(filter
.get_match_from_patterns(BTreeMap::from([(pattern.clone(), "abc".into())]))
.is_err());
assert!(
filter
.get_match_from_patterns(BTreeMap::from([(pattern.clone(), "abc".into())]))
.is_err()
);
// Ignored match
assert!(filter
.get_match_from_patterns(BTreeMap::from([(pattern.clone(), "a".into())]))
.is_err());
assert!(
filter
.get_match_from_patterns(BTreeMap::from([(pattern.clone(), "a".into())]))
.is_err()
);
// Bad pattern
assert!(filter
.get_match_from_patterns(BTreeMap::from([(boubou.clone(), "bou".into())]))
.is_err());
assert!(
filter
.get_match_from_patterns(BTreeMap::from([(boubou.clone(), "bou".into())]))
.is_err()
);
// Bad number of patterns
assert!(filter
.get_match_from_patterns(BTreeMap::from([
(pattern.clone(), "b".into()),
(boubou.clone(), "bou".into()),
]))
.is_err());
assert!(
filter
.get_match_from_patterns(BTreeMap::from([
(pattern.clone(), "b".into()),
(boubou.clone(), "bou".into()),
]))
.is_err()
);
// Bad number of patterns
assert!(filter.get_match_from_patterns(BTreeMap::from([])).is_err());
@ -752,34 +760,42 @@ pub mod tests {
Ok(vec!("bou".into(), "b".into()))
);
// Doesn't match
assert!(filter
.get_match_from_patterns(BTreeMap::from([
(pattern.clone(), "abc".into()),
(boubou.clone(), "bou".into()),
]))
.is_err());
assert!(
filter
.get_match_from_patterns(BTreeMap::from([
(pattern.clone(), "abc".into()),
(boubou.clone(), "bou".into()),
]))
.is_err()
);
// Ignored match
assert!(filter
.get_match_from_patterns(BTreeMap::from([
(pattern.clone(), "b".into()),
(boubou.clone(), "boubou".into()),
]))
.is_err());
assert!(
filter
.get_match_from_patterns(BTreeMap::from([
(pattern.clone(), "b".into()),
(boubou.clone(), "boubou".into()),
]))
.is_err()
);
// Bad pattern
assert!(filter
.get_match_from_patterns(BTreeMap::from([
(pattern.clone(), "b".into()),
(number_pattern.clone(), "1".into()),
]))
.is_err());
assert!(
filter
.get_match_from_patterns(BTreeMap::from([
(pattern.clone(), "b".into()),
(number_pattern.clone(), "1".into()),
]))
.is_err()
);
// Bad number of patterns
assert!(filter
.get_match_from_patterns(BTreeMap::from([
(pattern.clone(), "b".into()),
(boubou.clone(), "bou".into()),
(number_pattern.clone(), "1".into()),
]))
.is_err());
assert!(
filter
.get_match_from_patterns(BTreeMap::from([
(pattern.clone(), "b".into()),
(boubou.clone(), "bou".into()),
(number_pattern.clone(), "1".into()),
]))
.is_err()
);
// Bad number of patterns
assert!(filter.get_match_from_patterns(BTreeMap::from([])).is_err());

View file

@ -43,7 +43,7 @@ fn merge_attrs<A: Default + Debug + PartialEq + Eq + Clone>(
if this == default {
return Ok(other);
}
return Ok(this);
Ok(this)
}
fn null_value() -> Value {

View file

@ -1,9 +1,4 @@
use std::{
collections::BTreeMap,
io::{Error, ErrorKind},
os::linux::fs::MetadataExt,
process::Stdio,
};
use std::{collections::BTreeMap, io::Error, os::linux::fs::MetadataExt, process::Stdio};
use serde::{Deserialize, Serialize};
use tokio::{
@ -50,10 +45,15 @@ impl Plugin {
// Only when testing, make relative paths absolute
#[cfg(debug_assertions)]
if !self.path.starts_with("/") {
self.path = std::fs::canonicalize(&self.path)
.unwrap()
.to_string_lossy()
.to_string();
use std::env::current_dir;
self.path = format!(
"{}/{}",
current_dir()
.map_err(|err| format!("error on working directory: {err}"))?
.to_string_lossy(),
self.path
);
}
// Disallow relative paths
@ -79,10 +79,7 @@ impl Plugin {
let stat = fs::metadata(path).await?;
if stat.st_uid() != 0 {
return Err(Error::new(
ErrorKind::Other,
"plugin file is not owned by root",
));
return Err(Error::other("plugin file is not owned by root"));
}
}

View file

@ -3,7 +3,7 @@ use std::{cmp::Ordering, collections::BTreeMap, hash::Hash};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use super::{merge_attrs, null_value, Filter, Patterns};
use super::{Filter, Patterns, merge_attrs, null_value};
#[derive(Clone, Debug, Deserialize, Serialize)]
#[cfg_attr(test, derive(Default))]
@ -36,7 +36,10 @@ impl Stream {
for (key, filter) in other.filters.into_iter() {
if self.filters.insert(key.clone(), filter).is_some() {
return Err(format!("filter {} is already defined. filter definitions can't be spread accross multiple files.", key));
return Err(format!(
"filter {} is already defined. filter definitions can't be spread accross multiple files.",
key
));
}
}
@ -71,10 +74,8 @@ impl Stream {
if self.cmd[0].is_empty() {
return Err("cmd's first item is empty".into());
}
} else {
if !self.cmd.is_empty() {
return Err("can't define cmd and a plugin type".into());
}
} else if !self.cmd.is_empty() {
return Err("can't define cmd and a plugin type".into());
}
if self.filters.is_empty() {
@ -118,10 +119,11 @@ mod tests {
use crate::concepts::filter::tests::ok_filter;
fn ok_stream() -> Stream {
let mut stream = Stream::default();
stream.cmd = vec!["command".into()];
stream.filters.insert("name".into(), ok_filter());
stream
Stream {
cmd: vec!["command".into()],
filters: BTreeMap::from([("name".into(), ok_filter())]),
..Default::default()
}
}
#[test]

View file

@ -2,7 +2,6 @@ use std::{
collections::HashMap,
error::Error,
path::PathBuf,
process::exit,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
@ -36,13 +35,13 @@ mod socket;
mod stream;
mod utils;
pub async fn daemon(config_path: PathBuf, socket: PathBuf) {
pub async fn daemon(config_path: PathBuf, socket: PathBuf) -> i32 {
// Load config or quit
let config: &'static Config = Box::leak(Box::new(match Config::from_path(&config_path) {
Ok(config) => config,
Err(err) => {
error!("{err}");
return;
return 1;
}
}));
@ -53,7 +52,7 @@ pub async fn daemon(config_path: PathBuf, socket: PathBuf) {
let signal_received = Arc::new(AtomicBool::new(false));
if let Err(err) = handle_signals(shutdown.delegate(), signal_received.clone()) {
error!("{err}");
return;
return 1;
}
let mut db = None;
@ -92,17 +91,17 @@ pub async fn daemon(config_path: PathBuf, socket: PathBuf) {
}
if daemon_err || !stop_ok {
exit(1);
return 1;
} else if let Some(mut db_status) = db_status
&& let Ok(Err(err)) = db_status.try_recv()
{
error!("database error: {}", err);
exit(1);
return 1;
} else if !signal_received.load(Ordering::SeqCst) {
error!("quitting because all streams finished");
exit(1);
return 1;
} else {
exit(0);
return 0;
}
}

View file

@ -86,7 +86,7 @@ impl PluginManager {
.await
.map_err(|err| format!("error while getting plugin {} manifest: {err}", plugin.name))?;
let my_hello = Hello::hello();
let my_hello = Hello::new();
if let Err(hint) = Hello::is_compatible(&my_hello, &manifest.hello) {
return Err(format!(
@ -182,7 +182,7 @@ impl Plugins {
for plugin in config.plugins.values() {
let name = plugin.name.clone();
this.load_plugin(&plugin, &config.state_directory, shutdown.clone())
this.load_plugin(plugin, &config.state_directory, shutdown.clone())
.await
.map_err(|err| format!("plugin {name}: {err}]"))?;
}
@ -200,7 +200,7 @@ impl Plugins {
let manager = PluginManager::new(plugin, state_directory, shutdown).await?;
for stream in &manager.streams {
if let Some(name) = self.streams.insert(stream.clone().into(), name.clone()) {
if let Some(name) = self.streams.insert(stream.clone(), name.clone()) {
return Err(format!(
"plugin {name} already exposed a stream with type name '{stream}'",
));
@ -208,7 +208,7 @@ impl Plugins {
}
for action in &manager.actions {
if let Some(name) = self.actions.insert(action.clone().into(), name.clone()) {
if let Some(name) = self.actions.insert(action.clone(), name.clone()) {
return Err(format!(
"plugin {name} already exposed a action with type name '{action}'",
));
@ -276,10 +276,9 @@ impl Plugins {
// Convert Vec<Result<Result>> into Result
.into_iter()
.zip(self.plugins.values())
.map(|(result, plugin_manager)| {
.try_for_each(|(result, plugin_manager)| {
result.map_err(|err| format!("plugin {} error: {err}", plugin_manager.plugin.name))
})
.collect::<Result<(), String>>()
}
pub fn manager(self) {

View file

@ -4,7 +4,7 @@ use std::{
};
use chrono::Local;
use futures::{future::join_all, FutureExt, Stream as AsyncStream, StreamExt};
use futures::{FutureExt, Stream as AsyncStream, StreamExt, future::join_all};
use reaction_plugin::StreamImpl;
use regex::RegexSet;
use tokio::{

View file

@ -1,10 +1,4 @@
#![warn(
clippy::panic,
clippy::todo,
clippy::unimplemented,
clippy::unwrap_used,
unsafe_code
)]
#![warn(clippy::panic, clippy::todo, clippy::unimplemented, unsafe_code)]
#![allow(clippy::upper_case_acronyms, clippy::mutable_key_type)]
// Allow unwrap in tests
#![cfg_attr(test, allow(clippy::unwrap_used))]

View file

@ -44,7 +44,7 @@ async fn main() {
eprintln!("ERROR could not initialize logging: {err}");
exit(1);
}
daemon(config, socket).await;
exit(daemon(config, socket).await);
} else {
let result = match cli.command {
SubCommand::Show {

View file

@ -89,6 +89,7 @@ impl Database {
/// You'll have to:
/// - drop all [`Tree`]s,
/// - call [`Self::quit`],
///
/// to have the Database properly quit.
///
/// You can wait for [`Self::quit`] returned channel to know how it went.

View file

@ -84,7 +84,7 @@ fn kill_stream_on_exit() -> Result<(), Box<dyn Error>> {
let _ = signal::kill(pid, signal::SIGKILL);
let _ = child.wait();
assert!(false, "Test timed out");
panic!("Test timed out");
}
}
@ -173,7 +173,7 @@ fn manualy_trigger_filter() -> Result<(), Box<dyn Error>> {
if elapsed > Duration::from_secs(1) {
let _ = daemon.kill();
let _ = daemon.wait();
assert!(false, "Daemon did not create socket");
panic!("Daemon did not create socket");
}
}
@ -204,7 +204,7 @@ fn manualy_trigger_filter() -> Result<(), Box<dyn Error>> {
if elapsed > Duration::from_secs(2) {
let _ = daemon.kill();
let _ = daemon.wait();
assert!(false, "Daemon did not exit");
panic!("Daemon did not exit");
}
}

View file

@ -31,6 +31,6 @@ fn plugin_virtual() {
let output = [
"a0 1", "a0 2", "a0 3", "a0 4", "b0 1", "b0 2", "b0 3", "b0 4", "",
];
tmp_dir.child("log").assert(&output.join("\n"));
tmp_dir.child("log").assert(output.join("\n"));
tmp_dir.child("log").write_str("").unwrap();
}

View file

@ -129,8 +129,11 @@ async fn simple() {
let (daemon_exit, flush1, flush2) = tokio::join!(handle, handle2, handle3);
assert!(daemon_exit.is_ok());
assert!(daemon_exit.unwrap() == 1);
assert!(flush1.is_ok());
assert!(flush1.unwrap().is_ok());
assert!(flush2.is_ok());
assert!(flush2.unwrap().is_ok());
assert_eq!(
// 24 is encountered for the second time, then
@ -160,11 +163,7 @@ async fn simple() {
file_with_contents(oneshot_path, "");
let daemon_exit = daemon(config_path.into(), socket_path.into()).await;
assert!(daemon_exit.is_err());
assert_eq!(
daemon_exit.unwrap_err().to_string(),
"quitting because all streams finished"
);
assert!(daemon_exit == 1);
// 36 trigger from DB
// 12 trigger from DB

View file

@ -1,7 +1,7 @@
use std::{path::Path, time::Duration};
use assert_cmd::Command;
use assert_fs::{prelude::*, TempDir};
use assert_fs::{TempDir, prelude::*};
use predicates::prelude::predicate;
#[test]
@ -23,7 +23,7 @@ fn start_stop() {
"stop 2",
"",
];
tmp_dir.child("log").assert(&output.join("\n"));
tmp_dir.child("log").assert(output.join("\n"));
tmp_dir.child("log").write_str("").unwrap();
println!(

View file

@ -10,17 +10,15 @@
plugins: {
cluster: {
path: "./target/debug/reaction-plugin-cluster",
path: './target/debug/reaction-plugin-cluster',
check_root: false,
systemd_options: {
DynamicUser: ["false"],
},
options: {
clusters: {
DynamicUser: ['false'],
options: {
org1: {
listen_port: 9000,
bootstrap_nodes: {
"public_key": ["127.0.0.1:9001"],
public_key: ['127.0.0.1:9001'],
},
},
},
@ -30,36 +28,36 @@
streams: {
s0: {
cmd: ["bash", "-c", "for i in $(seq 4); do echo $i; sleep 0.1; done; sleep 1.2"],
cmd: ['bash', '-c', 'for i in $(seq 4); do echo $i; sleep 0.1; done; sleep 1.2'],
filters: {
f0: {
regex: ["^<num>$"],
regex: ['^<num>$'],
actions: {
a0: {
type: "virtual",
type: 'virtual',
options: {
send: "a0 <num>",
to: "s1",
}
send: 'a0 <num>',
to: 's1',
},
},
b0: {
type: "virtual",
type: 'virtual',
options: {
send: "b0 <num>",
to: "s1",
send: 'b0 <num>',
to: 's1',
},
after: "600ms",
after: '600ms',
},
},
},
},
},
s1: {
type: "cluster",
type: 'cluster',
options: {},
filters: {
f1: {
regex: ["^<all>$"],
regex: ['^<all>$'],
actions: {
a1: {
cmd: ['sh', '-c', 'echo <all> >>./log'],

View file

@ -4,52 +4,52 @@
regex: @"[0-9]+",
},
all: {
regex: @".*"
}
regex: @".*",
},
},
plugins: {
virtual: {
path: "./target/debug/reaction-plugin-virtual",
path: './target/debug/reaction-plugin-virtual',
check_root: false,
systemd_options: {
DynamicUser: ["false"],
}
}
DynamicUser: ['false'],
},
},
},
streams: {
s0: {
cmd: ["bash", "-c", "for i in $(seq 4); do echo $i; sleep 0.1; done; sleep 1.2"],
cmd: ['bash', '-c', 'for i in $(seq 4); do echo $i; sleep 0.1; done; sleep 1.2'],
filters: {
f0: {
regex: ["^<num>$"],
regex: ['^<num>$'],
actions: {
a0: {
type: "virtual",
type: 'virtual',
options: {
send: "a0 <num>",
to: "s1",
}
send: 'a0 <num>',
to: 's1',
},
},
b0: {
type: "virtual",
type: 'virtual',
options: {
send: "b0 <num>",
to: "s1",
send: 'b0 <num>',
to: 's1',
},
after: "600ms",
after: '600ms',
},
},
},
},
},
s1: {
type: "virtual",
type: 'virtual',
options: {},
filters: {
f1: {
regex: ["^<all>$"],
regex: ['^<all>$'],
actions: {
a1: {
cmd: ['sh', '-c', 'echo <all> >>./log'],