WIP impl of socket, daemon-side

This commit is contained in:
ppom 2024-09-26 12:00:00 +02:00
commit 6170f2dcd2
12 changed files with 308 additions and 42 deletions

View file

@ -43,6 +43,14 @@ impl Action {
self.on_exit
}
/// Builds the fully-qualified identifier of this action as an owned
/// `(stream name, filter name, action name)` tuple.
pub fn full_name(&self) -> (String, String, String) {
    let stream = self.stream_name.clone();
    let filter = self.filter_name.clone();
    let action = self.name.clone();
    (stream, filter, action)
}
pub fn setup(
&mut self,
stream_name: &str,

View file

@ -71,7 +71,6 @@ impl Filter {
self.longuest_action_duration
}
#[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
pub fn patterns(&self) -> &BTreeSet<Arc<Pattern>> {
&self.patterns
}
@ -122,7 +121,6 @@ impl Filter {
return Err("no regex configured".into());
}
#[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
let mut new_patterns = BTreeSet::new();
let mut first = true;
for regex in &self.regex {
@ -204,14 +202,25 @@ impl Filter {
None
}
pub fn send_actions(&'static self, m: &Match, t: Time, tx: &Sender<ExecsManagerInput>) {
pub fn send_actions(
&'static self,
m: &Match,
t: Time,
tx: &Sender<ExecsManagerInput>,
exec: bool,
) {
for action in self.actions.values() {
#[allow(clippy::unwrap_used)] // propagating panics is ok
tx.send(ExecsManagerInput::Exec(MAT {
let mat = MAT {
m: m.clone(),
a: action,
t: t + action.after_duration().unwrap_or_default(),
}))
};
#[allow(clippy::unwrap_used)] // propagating panics is ok
tx.send(if exec {
ExecsManagerInput::Exec(mat)
} else {
ExecsManagerInput::Flush(mat)
})
.unwrap();
}
}

View file

@ -3,6 +3,7 @@ mod config;
mod filter;
mod messages;
mod pattern;
mod socket_messages;
mod stream;
pub use action::*;
@ -10,4 +11,5 @@ pub use config::*;
pub use filter::*;
pub use messages::*;
pub use pattern::*;
pub use socket_messages::*;
pub use stream::*;

View file

@ -0,0 +1,33 @@
use std::collections::{BTreeMap, BTreeSet};

use super::Match;
use serde::{Deserialize, Serialize};

// We don't need protocol versioning here because
// client and daemon are the same binary

/// Message sent by the CLI client to the daemon over the unix socket.
#[derive(Clone, Serialize, Deserialize)]
pub enum ClientRequest {
    /// Request a snapshot of current matches and pending executions.
    Info,
    /// Ask the daemon to flush a match: drop it and run its pending actions.
    Flush(FlushOpts),
}

/// Parameters of a [`ClientRequest::Flush`].
#[derive(Clone, Serialize, Deserialize)]
pub struct FlushOpts {
    /// The match to flush.
    pub m: Match,
    /// `(stream name, filter name)` pair identifying the filter.
    pub f: (String, String),
}

/// Daemon's answer to a [`ClientRequest`].
#[derive(Clone, Serialize, Deserialize)]
pub enum DaemonResponse {
    /// Snapshot answering [`ClientRequest::Info`].
    Info(InfoRes),
    /// Acknowledges a successful [`ClientRequest::Flush`].
    Flush,
    /// Human-readable error message.
    Err(String),
}

/// Snapshot of daemon state, keyed by names and unix timestamps so it
/// serializes without referencing daemon-internal types.
#[derive(Clone, Serialize, Deserialize)]
pub struct InfoRes {
    /// `(stream, filter)` -> match -> set of unix timestamps.
    pub matches: BTreeMap<(String, String), BTreeMap<Match, BTreeSet<i64>>>,
    /// `(stream, filter, action)` -> match -> set of unix timestamps.
    pub execs: BTreeMap<(String, String, String), BTreeMap<Match, BTreeSet<i64>>>,
}

View file

@ -11,7 +11,10 @@ use chrono::{DateTime, Local};
use log::{debug, error, warn};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use crate::concepts::{Config, Filter, LogEntry, Match};
use crate::{
concepts::{Config, Filter, LogEntry, Match},
utils::{bincode_options, BincodeOptions},
};
use super::DBError;
@ -23,20 +26,12 @@ type DatabaseHeader = BTreeMap<usize, (String, String)>;
type ReadHeader = BTreeMap<usize, &'static Filter>;
type WriteHeader = BTreeMap<&'static Filter, usize>;
type BinOptions = bincode::config::WithOtherIntEncoding<
bincode::config::DefaultOptions,
bincode::config::VarintEncoding,
>;
fn bin_options() -> BinOptions {
bincode::DefaultOptions::new().with_varint_encoding()
}
const DB_SIGNATURE: &str = "reaction-db-v01";
pub struct ReadDB {
f: BufReader<File>,
h: ReadHeader,
bin: BinOptions,
bin: BincodeOptions,
}
impl ReadDB {
@ -60,12 +55,12 @@ impl ReadDB {
let mut ret = ReadDB {
f: BufReader::new(file),
h: BTreeMap::default(),
bin: bin_options(),
bin: bincode_options(),
};
match ret.read::<String>() {
Ok(signature) => {
if DB_SIGNATURE == &signature {
if DB_SIGNATURE == signature {
Ok(())
} else {
Err(DBError::Error("database is not a reaction database".into()))
@ -115,7 +110,7 @@ impl Iterator for ReadDB {
pub struct WriteDB {
f: BufWriter<File>,
h: WriteHeader,
bin: BinOptions,
bin: BincodeOptions,
}
impl WriteDB {
@ -131,7 +126,7 @@ impl WriteDB {
let mut ret = WriteDB {
f: BufWriter::new(file),
h: BTreeMap::default(),
bin: bin_options(),
bin: bincode_options(),
};
if let Err(err) = ret._write(DB_SIGNATURE) {
@ -187,7 +182,6 @@ struct ComputedLogEntry {
}
impl ComputedLogEntry {
#[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
fn from(value: LogEntry, header: &WriteHeader) -> Result<Self, DBError> {
match header.get(&value.f) {
Some(f) => Ok(ComputedLogEntry {

View file

@ -40,7 +40,6 @@ pub enum DBError {
#[derive(Clone)]
pub enum DatabaseManagerInput {
Log(LogEntry),
#[allow(dead_code)]
Flush(LogEntry),
}
@ -192,7 +191,6 @@ fn __rotate_db(
let mut millisecond_disambiguation_counter: u32 = 0;
// Read flushes
#[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
let mut flushes: BTreeMap<&'static Filter, BTreeMap<Match, Time>> = BTreeMap::new();
for flush_entry in flush_read_db {
match flush_entry {

View file

@ -1,15 +1,15 @@
use std::{
collections::{BTreeMap, BTreeSet},
process::Stdio,
sync::mpsc::{Receiver, Sender},
sync::mpsc::{Receiver, Sender, SyncSender},
};
use chrono::{DateTime, Local};
use chrono::Local;
use log::{error, info};
use timer::MessageTimer;
use crate::{
concepts::{Action, Config, Match, MAT},
concepts::{Action, Config, Match, Time, MAT},
utils::ThreadPool,
};
@ -17,16 +17,17 @@ use crate::{
pub enum ExecsManagerInput {
Exec(MAT),
ExecPending(MAT),
#[allow(dead_code)]
Flush(MAT),
Gimme(SyncSender<ExecsMap>),
Stop,
}
type ExecsMap = BTreeMap<&'static Action, BTreeMap<Match, BTreeSet<DateTime<Local>>>>;
type ExecsMap = BTreeMap<&'static Action, BTreeMap<Match, BTreeSet<Time>>>;
trait ExecsMapTrait {
fn add(&mut self, mat: &MAT);
fn rm(&mut self, mat: &MAT);
fn rm(&mut self, mat: &MAT) -> bool;
fn rm_times(&mut self, mat: &MAT) -> Option<BTreeSet<Time>>;
}
impl ExecsMapTrait for ExecsMap {
fn add(&mut self, mat: &MAT) {
@ -35,10 +36,12 @@ impl ExecsMapTrait for ExecsMap {
inner_set.insert(mat.t);
}
fn rm(&mut self, mat: &MAT) {
fn rm(&mut self, mat: &MAT) -> bool {
let mut removed = false;
if let Some(inner_map) = self.get_mut(&mat.a) {
if let Some(inner_set) = inner_map.get_mut(&mat.m) {
inner_set.remove(&mat.t);
removed = true;
if inner_set.is_empty() {
inner_map.remove(&mat.m);
}
@ -47,6 +50,18 @@ impl ExecsMapTrait for ExecsMap {
self.remove(&mat.a);
}
}
removed
}
/// Removes and returns every pending time for `mat`'s (action, match)
/// pair, dropping the action's entry entirely once it holds no matches.
fn rm_times(&mut self, mat: &MAT) -> Option<BTreeSet<Time>> {
    let inner_map = self.get_mut(&mat.a)?;
    let times = inner_map.remove(&mat.m);
    if inner_map.is_empty() {
        self.remove(&mat.a);
    }
    times
}
}
@ -91,7 +106,6 @@ pub fn execs_manager(
}
};
#[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
let mut execs: ExecsMap = BTreeMap::new();
let timer = MessageTimer::new(exec_tx);
@ -110,11 +124,19 @@ pub fn execs_manager(
}
}
ExecsManagerInput::ExecPending(mat) => {
execs.rm(&mat);
exec_now(mat);
if execs.rm(&mat) {
exec_now(mat);
}
}
#[allow(clippy::todo)]
ExecsManagerInput::Flush(_mat) => todo!(),
ExecsManagerInput::Flush(mat) => {
if let Some(set) = execs.rm_times(&mat) {
for _ in set {
exec_now(mat.clone());
}
}
}
#[allow(clippy::unwrap_used)] // propagating panics is ok
ExecsManagerInput::Gimme(tx) => tx.send(execs.clone()).unwrap(),
ExecsManagerInput::Stop => {
for (action, inner_map) in execs {
if action.on_exit() {

View file

@ -16,8 +16,8 @@ use crate::{
pub enum MatchManagerInput {
Match(MFT),
Unmatch(MFT),
#[allow(dead_code)]
Flush(MFT),
Gimme(SyncSender<MatchesMap>),
EndOfStartup,
Stop,
}
@ -75,7 +75,6 @@ pub fn matches_manager(
action_tx: Sender<ExecsManagerInput>,
log_tx: SyncSender<DatabaseManagerInput>,
) {
#[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
let mut matches: MatchesMap = BTreeMap::new();
let timer = MessageTimer::new(match_tx);
@ -114,7 +113,7 @@ pub fn matches_manager(
if mft.f.retry().is_some() {
matches.rm_times(&mft);
}
mft.f.send_actions(&mft.m, mft.t, &action_tx);
mft.f.send_actions(&mft.m, mft.t, &action_tx, true);
}
if !startup {
@ -131,7 +130,22 @@ pub fn matches_manager(
}
MatchManagerInput::Unmatch(mft) => matches.rm(&mft),
#[allow(clippy::todo)]
MatchManagerInput::Flush(_) => todo!(), // TODO handle flushes
MatchManagerInput::Flush(mft) => {
// remove from matches
matches.rm_times(&mft);
// send to DB
#[allow(clippy::unwrap_used)] // propagating panics is ok
log_tx
.send(DatabaseManagerInput::Flush(LogEntry {
exec: false,
m: mft.m,
f: mft.f,
t: mft.t,
}))
.unwrap();
}
#[allow(clippy::unwrap_used)] // propagating panics is ok
MatchManagerInput::Gimme(tx) => tx.send(matches.clone()).unwrap(),
MatchManagerInput::Stop => break,
}
}

View file

@ -1,4 +1,5 @@
use std::{
fs,
path::Path,
process::exit,
sync::{
@ -10,6 +11,7 @@ use std::{
};
use log::{error, info, Level};
use socket::socket_manager;
use crate::{concepts::Config, utils::SimpleLogger};
use database::database_manager;
@ -22,6 +24,7 @@ pub use matches::MatchManagerInput;
mod database;
mod execs;
mod matches;
mod socket;
#[allow(unused_variables)]
pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) {
@ -70,6 +73,13 @@ pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) {
database_manager(config, log_rx, match_tx_database)
};
{
let match_tx_socket = match_tx.clone();
let exec_tx_socket = exec_tx.clone();
let socket = socket.to_owned();
thread::spawn(move || socket_manager(config, socket, match_tx_socket, exec_tx_socket));
}
for stream in config.streams().values() {
let match_tx = match_tx.clone();
let (child_tx, child_rx) = sync_channel(0);
@ -116,7 +126,10 @@ pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) {
let stop_ok = config.stop();
// TODO remove socket
// not waiting for the socket_manager to finish, sorry
if let Err(err) = fs::remove_file(socket) {
error!("failed to remove socket: {}", err);
}
exit(match !signal_received.load(Ordering::SeqCst) && stop_ok {
true => 0,

164
rust/src/daemon/socket.rs Normal file
View file

@ -0,0 +1,164 @@
use std::{
fs, io,
os::unix::net::{UnixListener, UnixStream},
path::PathBuf,
process::exit,
sync::mpsc::{sync_channel, Sender},
};
use bincode::Options;
use chrono::Local;
use log::{error, warn};
use crate::{
concepts::{ClientRequest, Config, DaemonResponse, InfoRes, MFT},
utils::bincode_options,
};
use super::{ExecsManagerInput, MatchManagerInput};
// Converts any error into its string representation so heterogeneous
// error types can be propagated through a `Result<_, String>`.
macro_rules! err_str {
    ($expression:expr) => {
        $expression.map_err(|err| err.to_string())
    };
}
fn open_socket(path: PathBuf) -> Result<UnixListener, String> {
// First create all directories to the file
let dir = path
.parent()
.ok_or(format!("socket {path:?} has no parent directory"))?;
err_str!(fs::create_dir_all(dir))?;
// Test if file exists
match fs::metadata(&path) {
Ok(meta) => {
if meta.file_type().is_dir() {
Err(format!("socket {path:?} is already a directory"))
} else {
warn!("socket {path:?} already exists: is the daemon already running? deleting.");
err_str!(fs::remove_file(&path))
}
}
Err(err) => err_str!(match err.kind() {
io::ErrorKind::NotFound => Ok(()),
_ => Err(err),
}),
}?;
// Open socket
err_str!(UnixListener::bind(path))
}
// Evaluates `$expression`; on error, logs it prefixed with `$msg` and
// `continue`s to the next client connection. Only usable inside the
// connection-accept loop.
macro_rules! or_next {
    ($msg:expr, $expression:expr) => {
        match $expression {
            Ok(x) => x,
            Err(err) => {
                error!("failed to answer client: {}, {}", $msg, err);
                continue;
            }
        }
    };
}
/// Accepts client connections on the control socket and answers their
/// bincode-encoded [`ClientRequest`]s, one connection at a time.
///
/// `Info` gathers snapshots from the matches and execs managers;
/// `Flush` triggers the pending actions of a match and removes it.
/// Exits the process if the socket cannot be created; per-connection
/// errors are only logged and the loop moves on.
pub fn socket_manager(
    config: &'static Config,
    socket: PathBuf,
    match_tx: Sender<MatchManagerInput>,
    exec_tx: Sender<ExecsManagerInput>,
) {
    let listener = match open_socket(socket) {
        Ok(l) => l,
        Err(err) => {
            error!("while creating communication socket: {err}");
            exit(1);
        }
    };
    let bin = bincode_options();
    for try_conn in listener.incoming() {
        match try_conn {
            Ok(conn) => {
                // Second handle on the stream: `conn` is consumed by
                // deserialize_from, `conn2` is used to write the response.
                let conn2 = or_next!("failed to clone stream", conn.try_clone());
                // read request
                let request = or_next!(
                    "invalid message received: ",
                    bin.deserialize_from::<UnixStream, ClientRequest>(conn)
                );
                let response = match request {
                    ClientRequest::Info => {
                        // ask for matches clone
                        // (rendezvous channel: recv blocks until the
                        // manager thread sends its snapshot)
                        let (m_tx, m_rx) = sync_channel(0);
                        #[allow(clippy::unwrap_used)] // propagating panics is ok
                        match_tx.send(MatchManagerInput::Gimme(m_tx)).unwrap();
                        #[allow(clippy::unwrap_used)] // propagating panics is ok
                        let matches = m_rx.recv().unwrap();
                        // ask for execs clone
                        let (e_tx, e_rx) = sync_channel(0);
                        #[allow(clippy::unwrap_used)] // propagating panics is ok
                        exec_tx.send(ExecsManagerInput::Gimme(e_tx)).unwrap();
                        #[allow(clippy::unwrap_used)] // propagating panics is ok
                        let execs = e_rx.recv().unwrap();
                        // Transform structures: convert the in-memory maps
                        // into the name/unix-timestamp maps of InfoRes.
                        macro_rules! map_map {
                            ($map:expr) => {
                                $map.into_iter()
                                    .map(|(object, inner_map)| {
                                        (
                                            object.full_name(),
                                            inner_map
                                                .into_iter()
                                                .map(|(key, set)| {
                                                    (
                                                        key,
                                                        set.into_iter()
                                                            .map(|time| time.timestamp())
                                                            .collect(),
                                                    )
                                                })
                                                .collect(),
                                        )
                                    })
                                    .collect()
                            };
                        }
                        DaemonResponse::Info(InfoRes {
                            matches: map_map!(matches),
                            execs: map_map!(execs),
                        })
                    }
                    ClientRequest::Flush(flush) => {
                        match config.get_filter(&flush.f) {
                            Some(filter) => {
                                let now = Local::now();
                                // Flush actions
                                filter.send_actions(&flush.m, now, &exec_tx, false);
                                // Flush filters
                                #[allow(clippy::unwrap_used)] // propagating panics is ok
                                match_tx
                                    .send(MatchManagerInput::Flush(MFT {
                                        m: flush.m,
                                        f: filter,
                                        t: now,
                                    }))
                                    .unwrap();
                                DaemonResponse::Flush
                            }
                            None => DaemonResponse::Err(format!(
                                "no filter with name {}.{}",
                                flush.f.0, flush.f.1
                            )),
                        }
                    }
                };
                or_next!(
                    "failed to send response:",
                    bin.serialize_into(conn2, &response)
                );
            }
            Err(err) => error!("failed to open connection from cli: {err}"),
        }
    }
}

View file

@ -3,9 +3,9 @@
clippy::panic,
clippy::todo,
clippy::unimplemented,
clippy::unwrap_used,
clippy::unwrap_used
)]
#![allow(clippy::upper_case_acronyms)]
#![allow(clippy::upper_case_acronyms, clippy::mutable_key_type)]
#![forbid(unsafe_code)]
//! TODO document a bit

View file

@ -3,6 +3,15 @@ pub mod logger;
mod parse_duration;
mod threadpool;
use bincode::Options;
pub use logger::SimpleLogger;
pub use parse_duration::parse_duration;
pub use threadpool::ThreadPool;
/// Bincode configuration shared across the crate (database files and
/// control socket): default options with variable-length integer encoding.
pub type BincodeOptions = bincode::config::WithOtherIntEncoding<
    bincode::config::DefaultOptions,
    bincode::config::VarintEncoding,
>;

/// Builds the canonical [`BincodeOptions`] instance.
pub fn bincode_options() -> BincodeOptions {
    bincode::DefaultOptions::new().with_varint_encoding()
}