Improve reaction-plugin developer documentation

This commit is contained in:
ppom 2026-02-11 12:00:00 +01:00
commit b07b5064e9
No known key found for this signature in database
4 changed files with 296 additions and 45 deletions

View file

@ -1,6 +1,6 @@
//! This crate defines the API between reaction's core and plugins.
//!
//! Plugins must be written in Rust.
//! Plugins must be written in Rust, for now.
//!
//! This documentation assumes the reader has some knowledge of Rust.
//! However, if you find that something is unclear, don't hesitate to
@ -10,6 +10,14 @@
//! the entrypoint for a plugin.
//! It permits to define `0` to `n` custom stream and action types.
//!
//! ## Note on reaction-plugin API stability
//!
//! This is the v1 of reaction's plugin interface.
//! It's quite efficient and complete, but it has the big drawback of being Rust-only and [`tokio`]-only.
//!
//! In the future, I'd like to define a language-agnostic interface, which will be a major breaking change in the API.
//! However, I'll try my best to reduce the necessary code changes for plugins that use this v1.
//!
//! ## Naming & calling conventions
//!
//! Your plugin should be named `reaction-plugin-$NAME`, eg. `reaction-plugin-postgresql`.
@ -20,26 +28,38 @@
//! This can be useful if you want to provide CLI functionnality to your users,
//! so you can distinguish between a human user and reaction.
//!
//! ### State directory
//!
//! It will be executed in its own directory, in which it should have write access.
//! The directory is `$reaction_state_directory/plugin_data/$NAME`.
//! reaction's [state_directory](https://reaction.ppom.me/reference.html#state_directory)
//! defaults to its working directory.
//! defaults to its working directory, which is `/var/lib/reaction` in most setups.
//!
//! So your plugin directory should most often be `/var/lib/reaction/plugin_data/$NAME`,
//! but the plugin shouldn't expect that and use the current working directory instead.
//!
//! ## Communication
//!
//! Communication between the plugin and reaction is based on [`remoc`], which permits to multiplex channels and remote objects/functions/trait
//! calls over a single transport channel.
//! The channels is made of stdin and stdout, so don't use them for something else.
//! The channels read and write channels are stdin and stdout, so you shouldn't use them for something else.
//!
//! [`remoc`] build upon [`tokio`], so you'll need to use tokio too.
//! [`remoc`] builds upon [`tokio`], so you'll need to use tokio too.
//!
//! ### Errors
//!
//! Errors can be printed to stderr.
//! Errors during:
//! - config loading in [`PluginInfo::load_config`]
//! - startup in [`PluginInfo::start`]
//!
//! should be returned to reaction by the function's return value, permitting reaction to abort startup.
//!
//! During normal runtime, after the plugin has loaded its config and started, and before reaction is quitting, there is no *rusty* way to send errors to reaction.
//! Then errors can be printed to stderr.
//! They'll be captured line by line and re-printed by reaction, with the plugin name prepended.
//!
//! A line can start with `DEBUG `, `INFO `, `WARN `, `ERROR `.
//! If the starts with none of the above, the line is assumed to be an error.
//! If it starts with none of the above, the line is assumed to be an error.
//!
//! Example:
//! Those lines:
@ -53,27 +73,31 @@
//! ERROR plugin test: Freeeee errrooooorrr
//! ```
//!
//! ## Helpers
//! Plugins should not exit when there is an error: reaction quits only when told to do so,
//! or if all its streams exit, and won't retry starting a failing plugin or stream.
//! Please only exit if you're in a 100% failing state.
//! It's considered better to continue operating in a degraded state than exiting.
//!
//! Those helpers permits to easily maintain similar configuration interfaces accross plugins:
//! ## Getting started
//!
//! - [`line::PatternLine`], to permit users to use templated lines (ie. "\<ip> bad password").
//! - [`time::parse_duration`] to parse durations (ie. "6h", "3 days").
//! If you don't have Rust already installed, follow their [*Getting Started* documentation](https://rust-lang.org/learn/get-started/)
//! to get rust build tools and learn about editor support.
//!
//! Those helpers solve common issues for reaction plugins:
//!
//! - The [`shutdown`] module provides structures to ease the quitting process when having multiple tokio tasks.
//!
//! ## Starting template
//! Then create a new repository with cargo:
//!
//! ```bash
//! cargo new reaction-plugin-$NAME
//! cd reaction-plugin-$NAME
//! cargo add reaction-plugin tokio
//! vim src/main.rs
//! ```
//!
//! `src/main.rs`
//! Add required dependencies:
//!
//! ```bash
//! cargo add reaction-plugin tokio
//! ```
//!
//! Replace `src/main.rs` with those contents:
//!
//! ```ignore
//! use reaction_plugin::PluginInfo;
//!
@ -86,15 +110,20 @@
//! #[derive(Default)]
//! struct MyPlugin {}
//!
//! impl PluginInfo for Plugin {
//! // ...
//! // Your IDE should propose to implement missing members of the `Plugin` trait
//! impl PluginInfo for MyPlugin {
//! // ...
//! }
//! ```
//!
//! Your IDE should now propose to implement missing members of the [`PluginInfo`] trait.
//! Your journey starts!
//!
//! ## Examples
//!
//! Core plugins can be found here: <https://framagit.org/ppom/reaction/-/tree/main/plugins>.
//! The "virtual" plugin is the simplest and can serve as a good complete example.
//!
//! - The "virtual" plugin is the simplest and can serve as a good complete example that links custom stream types and custom action types.
//! - The "ipset" plugin is a good example of an action-only plugin.
use std::{
collections::{BTreeMap, BTreeSet},
@ -116,18 +145,35 @@ pub mod line;
pub mod shutdown;
pub mod time;
/// This is the only trait that **must** be implemented by a plugin.
/// The only trait that **must** be implemented by a plugin.
/// It provides lists of stream, filter and action types implemented by a dynamic plugin.
#[rtc::remote]
pub trait PluginInfo {
/// Return the manifest of the plugin.
/// This should not be dynamic, and return always the same manifest.
///
/// Example implementation:
/// ```
/// Ok(Manifest {
/// hello: Hello::new(),
/// streams: BTreeSet::from(["mystreamtype".into()]),
/// actions: BTreeSet::from(["myactiontype".into()]),
/// })
/// ```
///
/// First function called.
async fn manifest(&mut self) -> Result<Manifest, rtc::CallError>;
/// Load all plugin stream and action configurations,
/// Errors if config is invalid.
/// Load all plugin stream and action configurations.
/// Must error if config is invalid.
///
/// The plugin should not start running mutable commands here:
/// It should be ok to quit without cleanup for now.
///
/// Each [`StreamConfig`] from the `streams` arg should result in a corresponding [`StreamImpl`] returned, in the same order.
/// Each [`ActionConfig`] from the `actions` arg should result in a corresponding [`ActionImpl`] returned, in the same order.
///
/// Function called after [`PluginInfo::manifest`].
async fn load_config(
&mut self,
streams: Vec<StreamConfig>,
@ -136,14 +182,62 @@ pub trait PluginInfo {
/// Notify the plugin that setup is finished, permitting a last occasion to report an error that'll make reaction exit.
/// All initialization (opening remote connections, starting streams, etc) should happen here.
///
/// Function called after [`PluginInfo::load_config`].
async fn start(&mut self) -> RemoteResult<()>;
/// Notify the plugin that reaction is quitting and that the plugin should quit too.
/// A few seconds later, the plugin will receive SIGTERM.
/// A few seconds later, the plugin will receive SIGKILL.
///
/// Function called after [`PluginInfo::start`], when reaction is quitting.
async fn close(mut self) -> RemoteResult<()>;
}
/// The config for one Stream of a type advertised by this plugin.
///
/// For example this user config:
/// ```jsonnet
/// {
/// streams: {
/// mystream: {
/// type: "mystreamtype",
/// options: {
/// key: "value",
/// num: 3,
/// },
/// // filters: ...
/// },
/// },
/// }
/// ```
///
/// would result in the following `StreamConfig`:
///
/// ```
/// StreamConfig {
/// stream_name: "mystream",
/// stream_type: "mystreamtype",
/// config: Value::Object(BTreeMap::from([
/// ("key", Value::String("value")),
/// ("num", Value::Integer(3)),
/// ])),
/// }
/// ```
///
/// Don't hesitate to take advantage of [`serde_json::from_value`], to deserialize the [`Value`] into a Rust struct:
///
/// ```
/// #[derive(Deserialize)]
/// struct MyStreamOptions {
/// key: String,
/// num: i64,
/// }
///
/// fn validate_config(stream_config: Value) -> Result<MyStreamOptions, serde_json::Error> {
/// serde_json::from_value(stream_config.into())
/// }
/// ```
#[derive(Serialize, Deserialize, Clone)]
pub struct StreamConfig {
pub stream_name: String,
@ -151,6 +245,59 @@ pub struct StreamConfig {
pub config: Value,
}
/// The config for one Stream of a type advertised by this plugin.
///
/// For example this user config:
/// ```jsonnet
/// {
/// streams: {
/// mystream: {
/// // ...
/// filters: {
/// myfilter: {
/// // ...
/// actions: {
/// myaction: {
/// type: "myactiontype",
/// options: {
/// boolean: true,
/// array: ["item"],
/// },
/// },
/// },
/// },
/// },
/// },
/// },
/// }
/// ```
///
/// would result in the following `ActionConfig`:
///
/// ```rust
/// ActionConfig {
/// action_name: "myaction",
/// action_type: "myactiontype",
/// config: Value::Object(BTreeMap::from([
/// ("boolean", Value::Boolean(true)),
/// ("array", Value::Array([Value::String("item")])),
/// ])),
/// }
/// ```
///
/// Don't hesitate to take advantage of [`serde_json::from_value`], to deserialize the [`Value`] into a Rust struct:
///
/// ```rust
/// #[derive(Deserialize)]
/// struct MyActionOptions {
/// boolean: bool,
/// array: Vec<String>,
/// }
///
/// fn validate_config(action_config: Value) -> Result<MyActionOptions, serde_json::Error> {
/// serde_json::from_value(action_config.into())
/// }
/// ```
#[derive(Serialize, Deserialize, Clone)]
pub struct ActionConfig {
pub stream_name: String,
@ -161,11 +308,14 @@ pub struct ActionConfig {
pub patterns: Vec<String>,
}
/// Mandatory announcement of a plugin's protocol version, stream and action types.
#[derive(Serialize, Deserialize)]
pub struct Manifest {
// Protocol version. available as the [`hello!`] macro.
// Protocol version.
// Just use the [`Hello::new`] constructor that uses this crate's current version.
pub hello: Hello,
/// stream types that should be made available to reaction users
/// Stream types that should be made available to reaction users
///
/// ```jsonnet
/// {
/// streams: {
@ -177,7 +327,26 @@ pub struct Manifest {
/// }
/// ```
pub streams: BTreeSet<String>,
/// All action types that should be made available to reaction users
/// Action types that should be made available to reaction users
///
/// ```jsonnet
/// {
/// streams: {
/// mystream: {
/// filters: {
/// myfilter: {
/// actions: {
/// myaction: {
/// type: "myactiontype",
/// # ↑ all those exposed types
/// },
/// },
/// },
/// },
/// },
/// },
/// }
/// ```
pub actions: BTreeSet<String>,
}
@ -192,6 +361,8 @@ pub struct Hello {
}
impl Hello {
/// Constructor that fills a [`Hello`] struct with [`crate`]'s version.
/// You should use this in your plugin [`Manifest`].
pub fn new() -> Hello {
Hello {
version_major: env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
@ -199,6 +370,9 @@ impl Hello {
}
}
/// Used by the reaction daemon. Permits to check compatibility between two versions.
/// Major versions must be the same between the daemon and plugin.
/// Minor version of the daemon must be greater than or equal minor version of the plugin.
pub fn is_compatible(server: &Hello, plugin: &Hello) -> std::result::Result<(), String> {
if server.version_major == plugin.version_major
&& server.version_minor >= plugin.version_minor
@ -215,8 +389,8 @@ impl Hello {
}
}
/// A clone of [`serde_json::Value`]
/// Implements From & Into [`serde_json::Value`]
/// A clone of [`serde_json::Value`].
/// Implements From & Into [`serde_json::Value`].
#[derive(Serialize, Deserialize, Clone)]
pub enum Value {
Null,
@ -263,14 +437,18 @@ impl Into<JValue> for Value {
}
}
pub type Line = (String, Duration);
/// Represents a Stream handled by a plugin on reaction core's side.
///
/// During [`PluginInfo::load_config`], the plugin should create a [`remoc::rch::mpsc::channel`] of [`Line`].
/// It will keep the sending side for itself and put the receiving side in a [`StreamImpl`].
///
/// The plugin should start sending [`Line`]s in the channel only after [`PluginInfo::start`] has been called by reaction core.
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamImpl {
pub stream: rch::mpsc::Receiver<Line>,
/// Whether this stream works standalone, or if it needs other streams to be fed.
/// Whether this stream works standalone, or if it needs other streams or actions to be fed.
/// Defaults to true.
/// When false, reaction will exit if it's the last one standing.
/// When `false`, reaction will exit if it's the last one standing.
#[serde(default = "_true")]
pub standalone: bool,
}
@ -279,22 +457,37 @@ fn _true() -> bool {
true
}
/// Messages passed from the [`StreamImpl`] of a plugin to reaction core
pub type Line = (String, Duration);
// // Filters
// // For now, plugins can't handle custom filter implementations.
// #[derive(Serialize, Deserialize)]
// pub struct FilterImpl {
// pub stream: rch::lr::Sender<Exec>,
// }
// #[derive(Serialize, Deserialize)]
// pub struct Match {
// pub match_: String,
// pub result: rch::oneshot::Sender<bool>,
// }
/// Represents an Action handled by a plugin on reaction core's side.
///
/// During [`PluginInfo::load_config`], the plugin should create a [`remoc::rch::mpsc::channel`] of [`Exec`].
/// It will keep the receiving side for itself and put the sending side in a [`ActionImpl`].
///
/// The plugin will start receiving [`Exec`]s in the channel from reaction only after [`PluginInfo::start`] has been called by reaction core.
#[derive(Clone, Serialize, Deserialize)]
pub struct ActionImpl {
pub tx: rch::mpsc::Sender<Exec>,
}
/// A [trigger](https://reaction.ppom.me/reference.html#trigger) of the Action, sent by reaction core to the plugin.
///
/// The plugin should perform the configured action for each received [`Exec`].
///
/// Any error during its execution should be logged to stderr, see [`crate#Errors`] for error handling recommandations.
#[derive(Serialize, Deserialize)]
pub struct Exec {
pub match_: Vec<String>,
@ -303,6 +496,9 @@ pub struct Exec {
/// The main loop for a plugin.
///
/// Bootstraps the communication with reaction core on the process' stdin and stdout,
/// then holds the connection and maintains the plugin in a server state.
///
/// Your main function should only create a struct that implements [`PluginInfo`]
/// and then call [`main_loop`]:
/// ```ignore
@ -344,11 +540,18 @@ pub async fn main_loop<T: PluginInfo + Send + Sync + 'static>(plugin_info: T) {
pub type RemoteResult<T> = Result<T, RemoteError>;
/// A Plugin Error
/// It's either a connection error or a free String for plugin-specific errors
/// reaction-plugin's Error type.
#[derive(Debug, Serialize, Deserialize)]
pub enum RemoteError {
/// A connection error that origins from [`remoc`], the crate used for communication on the plugin's `stdin`/`stdout`.
///
/// You should not instantiate this type of error yourself.
Remoc(rtc::CallError),
/// A free String for application-specific errors.
///
/// You should only instantiate this type of error yourself, for any error that you encounter at startup and shutdown.
///
/// Otherwise, any error during the plugin's runtime should be logged to stderr, see [`crate#Errors`] for error handling recommandations.
Plugin(String),
}

View file

@ -1,3 +1,9 @@
//! Helper module that permits to use templated lines (ie. `bad password for <ip>`), like in Stream's and Action's `cmd`.
//!
//! Corresponding reaction core settings:
//! - [Stream's `cmd`](https://reaction.ppom.me/reference.html#cmd)
//! - [Action's `cmd`](https://reaction.ppom.me/reference.html#cmd-1)
//!
#[derive(Debug, PartialEq, Eq)]
enum SendItem {
Index(usize),
@ -55,7 +61,7 @@ pub struct PatternLine {
impl PatternLine {
/// Construct [`PatternLine`] from a template line and the list of patterns of the underlying [Filter](https://reaction.ppom.me/reference.html#filter).
///
/// This list of patterns comes from [`PluginInfo::action_impl`].
/// This list of patterns comes from [`super::ActionConfig`].
pub fn new(template: String, patterns: Vec<String>) -> Self {
let line = Self::_from(patterns, Vec::from([SendItem::Str(template)]));
Self {

View file

@ -1,10 +1,49 @@
//! Helper module that provides structures to ease the quitting process when having multiple tokio tasks.
//!
//! It defines a [`ShutdownController`], that permits to keep track of ongoing tasks, ask them to shutdown and wait for all of them to quit.
//!
//! You can have it as an attribute of your plugin struct.
//! ```
//! struct MyPlugin {
//! shutdown: ShutdownController
//! }
//! ```
//!
//! You can then give a [`ShutdownToken`] to other tasks when creating them:
//!
//! ```
//! impl PluginInfo for MyPlugin {
//! async fn start(&mut self) -> RemoteResult<()> {
//! let token = self.shutdown.token();
//!
//! tokio::spawn(async move {
//! token.wait().await;
//! eprintln!("DEBUG shutdown asked to quit, now quitting")
//! })
//! }
//! }
//! ```
//!
//! On closing, calling [`ShutdownController::ask_shutdown`] will inform all tasks waiting on [`ShutdownToken::wait`] that it's time to leave.
//! Then we can wait for [`ShutdownController::wait_all_task_shutdown`] to complete.
//!
//! ```
//! impl PluginInfo for MyPlugin {
//! async fn close(self) -> RemoteResult<()> {
//! self.shutdown.ask_shutdown();
//! self.shutdown.wait_all_task_shutdown().await;
//! Ok(())
//! }
//! }
//! ```
use tokio_util::{
sync::{CancellationToken, WaitForCancellationFuture},
task::task_tracker::{TaskTracker, TaskTrackerToken},
};
/// Permits to keep track of ongoing tasks, ask them to shutdown and for all of them to quit.
/// Stupid wrapper around [`tokio_util::CancellationToken`] and [`tokio_util::task_tracker::TaskTracker`].
/// Permits to keep track of ongoing tasks, ask them to shutdown and wait for all of them to quit.
/// Stupid wrapper around [`tokio_util::sync::CancellationToken`] and [`tokio_util::task::task_tracker::TaskTracker`].
#[derive(Default, Clone)]
pub struct ShutdownController {
shutdown_notifyer: CancellationToken,
@ -12,12 +51,8 @@ pub struct ShutdownController {
}
impl ShutdownController {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self {
shutdown_notifyer: CancellationToken::new(),
task_tracker: TaskTracker::new(),
}
Self::default()
}
/// Ask for all tasks to quit
@ -66,7 +101,7 @@ impl ShutdownDelegate {
///
/// - Wait for a shutdown request to happen with [`Self::wait`]
/// - Keep track of the current task. While this token is held,
/// [`ShutdownController::wait_shutdown`] will block.
/// [`ShutdownController::wait_all_task_shutdown`] will block.
#[derive(Clone)]
pub struct ShutdownToken {
shutdown_notifyer: CancellationToken,
@ -81,6 +116,7 @@ impl ShutdownToken {
}
}
/// Returns underlying [`CancellationToken`] and [`TaskTrackerToken`], consuming self.
pub fn split(self) -> (CancellationToken, TaskTrackerToken) {
(self.shutdown_notifyer, self._task_tracker_token)
}

View file

@ -1,3 +1,9 @@
//! This module provides [`parse_duration`], which parses duration in reaction's format (ie. `6h`, `3 days`)
//!
//! Like in those reaction core settings:
//! - [Filters' `retryperiod`](https://reaction.ppom.me/reference.html#retryperiod)
//! - [Actions' `after`](https://reaction.ppom.me/reference.html#after).
use std::time::Duration;
/// Parses the &str argument as a Duration