Fix daemon startup: all subsystems will cleanly exit

Regardless of which startup error makes reaction exit.

Also made plugin stderr task exit when the ShutdownToken asks for it.
Also updated Rust edition to 2024.
This commit is contained in:
ppom 2025-10-30 12:00:00 +01:00
commit 20921be07d
No known key found for this signature in database
5 changed files with 158 additions and 113 deletions

View file

@ -1,7 +1,7 @@
[package]
name = "reaction"
version = "2.2.1"
edition = "2021"
edition = "2024"
authors = ["ppom <reaction@ppom.me>"]
license = "AGPL-3.0"
description = "Scan logs and take action"

View file

@ -2,9 +2,10 @@ use std::{
collections::HashMap,
error::Error,
path::PathBuf,
process::exit,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
atomic::{AtomicBool, Ordering},
},
};
@ -12,10 +13,10 @@ use chrono::Local;
use futures::future::join_all;
use tokio::{
select,
signal::unix::{signal, SignalKind},
signal::unix::{SignalKind, signal},
sync::Semaphore,
};
use tracing::{debug, info};
use tracing::{debug, error, info};
use crate::{concepts::Config, treedb::Database};
use filter::FilterManager;
@ -35,19 +36,87 @@ mod socket;
mod stream;
mod utils;
pub async fn daemon(
config_path: PathBuf,
socket: PathBuf,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?));
pub async fn daemon(config_path: PathBuf, socket: PathBuf) {
// Load config or quit
let config: &'static Config = Box::leak(Box::new(match Config::from_path(&config_path) {
Ok(config) => config,
Err(err) => {
error!("{err}");
return;
}
}));
// Cancellation Token
let shutdown = ShutdownController::new();
let mut plugins = Plugins::new(config, shutdown.token()).await?;
// Cancel when we receive a quit signal
let signal_received = Arc::new(AtomicBool::new(false));
if let Err(err) = handle_signals(shutdown.delegate(), signal_received.clone()) {
error!("{err}");
return;
}
let mut db = None;
let mut config_started = false;
let mut daemon_err = false;
// Start the real daemon 👹
if let Err(err) = daemon_start(
config,
socket,
shutdown.token(),
&mut db,
&mut config_started,
)
.await
{
error!("{err}");
daemon_err = true;
}
// Release last db's sender
let mut db_status = None;
if let Some(db) = db {
db_status = Some(db.quit());
}
debug!("Asking for all tasks to quit...");
shutdown.ask_shutdown();
debug!("Waiting for all tasks to quit...");
shutdown.wait_shutdown().await;
let mut stop_ok = true;
if config_started {
stop_ok = config.stop();
}
if daemon_err || !stop_ok {
exit(1);
} else if let Some(mut db_status) = db_status
&& let Ok(Err(err)) = db_status.try_recv()
{
error!("database error: {}", err);
exit(1);
} else if !signal_received.load(Ordering::SeqCst) {
error!("quitting because all streams finished");
exit(1);
} else {
exit(0);
}
}
async fn daemon_start(
config: &'static Config,
socket: PathBuf,
shutdown: ShutdownToken,
db: &mut Option<Database>,
config_started: &mut bool,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut plugins = Plugins::new(config, shutdown.clone()).await?;
// Open Database
let mut db = Database::open(config, shutdown.token()).await?;
*db = Some(Database::open(config, shutdown.clone()).await?);
let (state, stream_managers) = {
// Semaphore limiting action execution concurrency
@ -66,8 +135,8 @@ pub async fn daemon(
let manager = FilterManager::new(
filter,
exec_limit.clone(),
shutdown.token(),
&mut db,
shutdown.clone(),
db.as_mut().unwrap(),
&mut plugins,
now,
)
@ -77,7 +146,7 @@ pub async fn daemon(
state.insert(stream, filter_managers.clone());
stream_managers.push(
StreamManager::new(stream, filter_managers, shutdown.token(), &mut plugins).await?,
StreamManager::new(stream, filter_managers, shutdown.clone(), &mut plugins).await?,
);
}
(state, stream_managers)
@ -89,9 +158,10 @@ pub async fn daemon(
// Open socket and run task
let socket = Socket::open(socket).await?;
socket.manager(config, state, shutdown.token());
socket.manager(config, state, shutdown.clone());
// reaction won't abort on startup anymore, we can run start commands
*config_started = true;
if !config.start() {
return Err("a start command failed, exiting.".into());
}
@ -101,40 +171,12 @@ pub async fn daemon(
let standalone = stream_manager.is_standalone();
let handle = tokio::spawn(async move { stream_manager.start().await });
// Only wait for standalone streams
if standalone {
Some(handle)
} else {
None
}
if standalone { Some(handle) } else { None }
});
// Close streams when we receive a quit signal
let signal_received = Arc::new(AtomicBool::new(false));
handle_signals(shutdown.delegate(), signal_received.clone())?;
// Wait for all streams to quit
join_all(stream_task_handles).await;
// Release last db's sender
let mut db_status = db.quit();
debug!("Asking for all tasks to quit...");
shutdown.ask_shutdown();
debug!("Waiting for all tasks to quit...");
shutdown.wait_shutdown().await;
let stop_ok = config.stop();
if let Ok(Err(err)) = db_status.try_recv() {
Err(format!("database error: {}", err).into())
} else if !signal_received.load(Ordering::SeqCst) {
Err("quitting because all streams finished".into())
} else if !stop_ok {
Err("while executing stop command".into())
} else {
Ok(())
}
Ok(())
}
fn handle_signals(

View file

@ -4,7 +4,7 @@ use std::{
time::Duration,
};
use futures::{future::join_all, FutureExt, StreamExt};
use futures::{FutureExt, StreamExt, future::join_all};
use reaction_plugin::{ActionImpl, Hello, PluginInfo, PluginInfoClient, StreamImpl};
use remoc::Connect;
use serde_json::Value;
@ -16,7 +16,7 @@ use tracing::error;
use crate::{
concepts::{Config, Plugin},
daemon::{stream::reader_to_stream, utils::kill_child, ShutdownToken},
daemon::{ShutdownToken, stream::reader_to_stream, utils::kill_child},
};
pub struct PluginManager {
@ -92,8 +92,10 @@ impl PluginManager {
return Err(format!(
"reaction can't handle plugin {} with incompatible version {}.{}: current version: {}.{}. {}",
plugin.name,
manifest.hello.version_major, manifest.hello.version_minor,
my_hello.version_major, my_hello.version_minor,
manifest.hello.version_major,
manifest.hello.version_minor,
my_hello.version_major,
my_hello.version_minor,
hint
));
}
@ -133,13 +135,17 @@ impl PluginManager {
}
}
async fn handle_stderr(stderr: ChildStderr, plugin_name: String, _shutdown: ShutdownToken) {
async fn handle_stderr(stderr: ChildStderr, plugin_name: String, shutdown: ShutdownToken) {
let lines = reader_to_stream(stderr);
tokio::pin!(lines);
loop {
match lines.next().await {
let event = tokio::select! {
line = lines.next() => line,
_ = shutdown.wait() => None,
};
match event {
Some(Ok(line)) => {
// dumb: I can't factorize this because tracing::event!
// sad: I can't factorize this because the tracing::event! macro
// requires its log level to be a constant.
if line.starts_with("DEBUG ") {
tracing::debug!("plugin {plugin_name}: {}", line.split_at(5).1)
@ -150,7 +156,7 @@ async fn handle_stderr(stderr: ChildStderr, plugin_name: String, _shutdown: Shut
} else if line.starts_with("ERROR ") {
tracing::error!("plugin {plugin_name}: {}", line.split_at(5).1)
} else {
// If there is no log level, we suppose it's an error (panic or something)
// If there is no log level, we suppose it's an error (may be a panic or something)
tracing::error!("plugin {plugin_name}: {}", line)
}
}

View file

@ -7,7 +7,6 @@ use reaction::{
daemon::daemon,
protocol::Order,
};
use tracing::{error, Level};
#[tokio::main]
async fn main() {
@ -28,68 +27,64 @@ async fn main() {
let cli = Cli::parse();
let (is_daemon, level) = if let SubCommand::Start { loglevel, .. } = cli.command {
(true, loglevel)
} else {
(false, Level::DEBUG)
};
if is_daemon {
// Set log level
if let SubCommand::Start {
loglevel,
config,
socket,
} = cli.command
{
if let Err(err) = tracing_subscriber::fmt::fmt()
.without_time()
.with_target(false)
.with_ansi(std::io::stdout().is_terminal())
.with_max_level(level)
.with_max_level(loglevel)
// .with_max_level(Level::TRACE)
.try_init()
{
eprintln!("ERROR could not initialize logging: {err}");
exit(1);
}
}
let result = match cli.command {
SubCommand::Start { config, socket, .. } => daemon(config, socket).await,
SubCommand::Show {
socket,
format,
limit,
patterns,
} => request(socket, format, limit, patterns, Order::Show).await,
SubCommand::Flush {
socket,
format,
limit,
patterns,
} => request(socket, format, limit, patterns, Order::Flush).await,
SubCommand::Trigger {
socket,
limit,
patterns,
} => request(socket, Format::JSON, Some(limit), patterns, Order::Trigger).await,
SubCommand::TestRegex {
config,
regex,
line,
} => test_regex(config, regex, line),
SubCommand::TestConfig {
config,
format,
verbose,
} => test_config(config, format, verbose),
};
match result {
Ok(()) => {
exit(0);
}
Err(err) => {
if is_daemon {
error!("{err}");
} else {
eprintln!("ERROR {err}");
daemon(config, socket).await;
} else {
let result = match cli.command {
SubCommand::Show {
socket,
format,
limit,
patterns,
} => request(socket, format, limit, patterns, Order::Show).await,
SubCommand::Flush {
socket,
format,
limit,
patterns,
} => request(socket, format, limit, patterns, Order::Flush).await,
SubCommand::Trigger {
socket,
limit,
patterns,
} => request(socket, Format::JSON, Some(limit), patterns, Order::Trigger).await,
SubCommand::TestRegex {
config,
regex,
line,
} => test_regex(config, regex, line),
SubCommand::TestConfig {
config,
format,
verbose,
} => test_config(config, format, verbose),
// Can't be daemon
_ => Ok(()),
};
match result {
Ok(()) => {
exit(0);
}
Err(err) => {
eprintln!("ERROR {err}");
exit(1);
}
exit(1);
}
}
}

View file

@ -18,12 +18,12 @@ use std::{
};
use chrono::{Local, TimeDelta};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_json::Value;
use tokio::{
fs::{rename, File},
fs::{File, rename},
sync::{mpsc, oneshot},
time::{interval, MissedTickBehavior},
time::{MissedTickBehavior, interval},
};
use crate::{
@ -76,10 +76,12 @@ impl Config {
}
}
pub type DatabaseErrorReceiver = oneshot::Receiver<Result<(), String>>;
/// Public-facing API for a treedb Database
pub struct Database {
entry_tx: Option<mpsc::Sender<Order>>,
error_rx: oneshot::Receiver<Result<(), String>>,
error_rx: DatabaseErrorReceiver,
}
impl Database {
@ -101,7 +103,7 @@ impl Database {
/// Permit to close DB's channel.
/// Without this function manually called, the DB can't close.
pub fn quit(self) -> oneshot::Receiver<Result<(), String>> {
pub fn quit(self) -> DatabaseErrorReceiver {
self.error_rx
}
}
@ -279,7 +281,7 @@ async fn rotate_db(
// No need to rotate the database when it is new,
// we return here
(true, ErrorKind::NotFound) => {
return Ok((WriteDB::new(File::create(path).await?), HashMap::default()))
return Ok((WriteDB::new(File::create(path).await?), HashMap::default()));
}
(_, _) => return Err(err),
},
@ -480,13 +482,13 @@ mod tests {
use chrono::{Local, TimeDelta};
use serde_json::Value;
use tempfile::{NamedTempFile, TempDir};
use tokio::fs::{write, File};
use tokio::fs::{File, write};
use crate::{concepts::Config, daemon::ShutdownController};
use super::{
helpers::*, raw::WriteDB, rotate_db, Database, DatabaseManager, Entry, KeyType, LoadedDB,
Tree, ValueType, DB_NAME,
DB_NAME, Database, DatabaseManager, Entry, KeyType, LoadedDB, Tree, ValueType, helpers::*,
raw::WriteDB, rotate_db,
};
impl DatabaseManager {