diff --git a/plugins/reaction-plugin/src/lib.rs b/plugins/reaction-plugin/src/lib.rs index 2ce95dc..38a91d1 100644 --- a/plugins/reaction-plugin/src/lib.rs +++ b/plugins/reaction-plugin/src/lib.rs @@ -1,6 +1,6 @@ //! This crate defines the API between reaction's core and plugins. //! -//! Plugins must be written in Rust. +//! Plugins must be written in Rust, for now. //! //! This documentation assumes the reader has some knowledge of Rust. //! However, if you find that something is unclear, don't hesitate to @@ -10,6 +10,14 @@ //! the entrypoint for a plugin. //! It permits to define `0` to `n` custom stream and action types. //! +//! ## Note on reaction-plugin API stability +//! +//! This is the v1 of reaction's plugin interface. +//! It's quite efficient and complete, but it has the big drawback of being Rust-only and [`tokio`]-only. +//! +//! In the future, I'd like to define a language-agnostic interface, which will be a major breaking change in the API. +//! However, I'll try my best to reduce the necessary code changes for plugins that use this v1. +//! //! ## Naming & calling conventions //! //! Your plugin should be named `reaction-plugin-$NAME`, eg. `reaction-plugin-postgresql`. @@ -20,26 +28,38 @@ //! This can be useful if you want to provide CLI functionnality to your users, //! so you can distinguish between a human user and reaction. //! +//! ### State directory +//! //! It will be executed in its own directory, in which it should have write access. //! The directory is `$reaction_state_directory/plugin_data/$NAME`. //! reaction's [state_directory](https://reaction.ppom.me/reference.html#state_directory) -//! defaults to its working directory. +//! defaults to its working directory, which is `/var/lib/reaction` in most setups. +//! +//! So your plugin directory should most often be `/var/lib/reaction/plugin_data/$NAME`, +//! but the plugin shouldn't expect that and use the current working directory instead. //! //! ## Communication //! //! Communication between the plugin and reaction is based on [`remoc`], which permits to multiplex channels and remote objects/functions/trait //! calls over a single transport channel. -//! The channels is made of stdin and stdout, so don't use them for something else. +//! The channels read and write channels are stdin and stdout, so you shouldn't use them for something else. //! -//! [`remoc`] build upon [`tokio`], so you'll need to use tokio too. +//! [`remoc`] builds upon [`tokio`], so you'll need to use tokio too. //! //! ### Errors //! -//! Errors can be printed to stderr. +//! Errors during: +//! - config loading in [`PluginInfo::load_config`] +//! - startup in [`PluginInfo::start`] +//! +//! should be returned to reaction by the function's return value, permitting reaction to abort startup. +//! +//! During normal runtime, after the plugin has loaded its config and started, and before reaction is quitting, there is no *rusty* way to send errors to reaction. +//! Then errors can be printed to stderr. //! They'll be captured line by line and re-printed by reaction, with the plugin name prepended. //! //! A line can start with `DEBUG `, `INFO `, `WARN `, `ERROR `. -//! If the starts with none of the above, the line is assumed to be an error. +//! If it starts with none of the above, the line is assumed to be an error. //! //! Example: //! Those lines: @@ -53,27 +73,31 @@ //! ERROR plugin test: Freeeee errrooooorrr //! ``` //! -//! ## Helpers +//! Plugins should not exit when there is an error: reaction quits only when told to do so, +//! or if all its streams exit, and won't retry starting a failing plugin or stream. +//! Please only exit if you're in a 100% failing state. +//! It's considered better to continue operating in a degraded state than exiting. //! -//! Those helpers permits to easily maintain similar configuration interfaces accross plugins: +//! ## Getting started //! -//! - [`line::PatternLine`], to permit users to use templated lines (ie. "\ bad password"). -//! - [`time::parse_duration`] to parse durations (ie. "6h", "3 days"). +//! If you don't have Rust already installed, follow their [*Getting Started* documentation](https://rust-lang.org/learn/get-started/) +//! to get rust build tools and learn about editor support. //! -//! Those helpers solve common issues for reaction plugins: -//! -//! - The [`shutdown`] module provides structures to ease the quitting process when having multiple tokio tasks. -//! -//! ## Starting template +//! Then create a new repository with cargo: //! //! ```bash //! cargo new reaction-plugin-$NAME //! cd reaction-plugin-$NAME -//! cargo add reaction-plugin tokio -//! vim src/main.rs //! ``` //! -//! `src/main.rs` +//! Add required dependencies: +//! +//! ```bash +//! cargo add reaction-plugin tokio +//! ``` +//! +//! Replace `src/main.rs` with those contents: +//! //! ```ignore //! use reaction_plugin::PluginInfo; //! @@ -86,15 +110,20 @@ //! #[derive(Default)] //! struct MyPlugin {} //! -//! impl PluginInfo for Plugin { -//! // ... -//! // Your IDE should propose to implement missing members of the `Plugin` trait +//! impl PluginInfo for MyPlugin { //! // ... //! } //! ``` //! +//! Your IDE should now propose to implement missing members of the [`PluginInfo`] trait. +//! Your journey starts! +//! +//! ## Examples +//! //! Core plugins can be found here: . -//! The "virtual" plugin is the simplest and can serve as a good complete example. +//! +//! - The "virtual" plugin is the simplest and can serve as a good complete example that links custom stream types and custom action types. +//! - The "ipset" plugin is a good example of an action-only plugin. use std::{ collections::{BTreeMap, BTreeSet}, @@ -116,18 +145,35 @@ pub mod line; pub mod shutdown; pub mod time; -/// This is the only trait that **must** be implemented by a plugin. +/// The only trait that **must** be implemented by a plugin. /// It provides lists of stream, filter and action types implemented by a dynamic plugin. #[rtc::remote] pub trait PluginInfo { /// Return the manifest of the plugin. + /// This should not be dynamic, and return always the same manifest. + /// + /// Example implementation: + /// ``` + /// Ok(Manifest { + /// hello: Hello::new(), + /// streams: BTreeSet::from(["mystreamtype".into()]), + /// actions: BTreeSet::from(["myactiontype".into()]), + /// }) + /// ``` + /// + /// First function called. async fn manifest(&mut self) -> Result; - /// Load all plugin stream and action configurations, - /// Errors if config is invalid. + /// Load all plugin stream and action configurations. + /// Must error if config is invalid. /// /// The plugin should not start running mutable commands here: /// It should be ok to quit without cleanup for now. + /// + /// Each [`StreamConfig`] from the `streams` arg should result in a corresponding [`StreamImpl`] returned, in the same order. + /// Each [`ActionConfig`] from the `actions` arg should result in a corresponding [`ActionImpl`] returned, in the same order. + /// + /// Function called after [`PluginInfo::manifest`]. async fn load_config( &mut self, streams: Vec, @@ -136,14 +182,62 @@ pub trait PluginInfo { /// Notify the plugin that setup is finished, permitting a last occasion to report an error that'll make reaction exit. /// All initialization (opening remote connections, starting streams, etc) should happen here. + /// + /// Function called after [`PluginInfo::load_config`]. async fn start(&mut self) -> RemoteResult<()>; /// Notify the plugin that reaction is quitting and that the plugin should quit too. /// A few seconds later, the plugin will receive SIGTERM. /// A few seconds later, the plugin will receive SIGKILL. + /// + /// Function called after [`PluginInfo::start`], when reaction is quitting. async fn close(mut self) -> RemoteResult<()>; } +/// The config for one Stream of a type advertised by this plugin. +/// +/// For example this user config: +/// ```jsonnet +/// { +/// streams: { +/// mystream: { +/// type: "mystreamtype", +/// options: { +/// key: "value", +/// num: 3, +/// }, +/// // filters: ... +/// }, +/// }, +/// } +/// ``` +/// +/// would result in the following `StreamConfig`: +/// +/// ``` +/// StreamConfig { +/// stream_name: "mystream", +/// stream_type: "mystreamtype", +/// config: Value::Object(BTreeMap::from([ +/// ("key", Value::String("value")), +/// ("num", Value::Integer(3)), +/// ])), +/// } +/// ``` +/// +/// Don't hesitate to take advantage of [`serde_json::from_value`], to deserialize the [`Value`] into a Rust struct: +/// +/// ``` +/// #[derive(Deserialize)] +/// struct MyStreamOptions { +/// key: String, +/// num: i64, +/// } +/// +/// fn validate_config(stream_config: Value) -> Result { +/// serde_json::from_value(stream_config.into()) +/// } +/// ``` #[derive(Serialize, Deserialize, Clone)] pub struct StreamConfig { pub stream_name: String, @@ -151,6 +245,59 @@ pub struct StreamConfig { pub config: Value, } +/// The config for one Stream of a type advertised by this plugin. +/// +/// For example this user config: +/// ```jsonnet +/// { +/// streams: { +/// mystream: { +/// // ... +/// filters: { +/// myfilter: { +/// // ... +/// actions: { +/// myaction: { +/// type: "myactiontype", +/// options: { +/// boolean: true, +/// array: ["item"], +/// }, +/// }, +/// }, +/// }, +/// }, +/// }, +/// }, +/// } +/// ``` +/// +/// would result in the following `ActionConfig`: +/// +/// ```rust +/// ActionConfig { +/// action_name: "myaction", +/// action_type: "myactiontype", +/// config: Value::Object(BTreeMap::from([ +/// ("boolean", Value::Boolean(true)), +/// ("array", Value::Array([Value::String("item")])), +/// ])), +/// } +/// ``` +/// +/// Don't hesitate to take advantage of [`serde_json::from_value`], to deserialize the [`Value`] into a Rust struct: +/// +/// ```rust +/// #[derive(Deserialize)] +/// struct MyActionOptions { +/// boolean: bool, +/// array: Vec, +/// } +/// +/// fn validate_config(action_config: Value) -> Result { +/// serde_json::from_value(action_config.into()) +/// } +/// ``` #[derive(Serialize, Deserialize, Clone)] pub struct ActionConfig { pub stream_name: String, @@ -161,11 +308,14 @@ pub struct ActionConfig { pub patterns: Vec, } +/// Mandatory announcement of a plugin's protocol version, stream and action types. #[derive(Serialize, Deserialize)] pub struct Manifest { - // Protocol version. available as the [`hello!`] macro. + // Protocol version. + // Just use the [`Hello::new`] constructor that uses this crate's current version. pub hello: Hello, - /// stream types that should be made available to reaction users + /// Stream types that should be made available to reaction users + /// /// ```jsonnet /// { /// streams: { @@ -177,7 +327,26 @@ pub struct Manifest { /// } /// ``` pub streams: BTreeSet, - /// All action types that should be made available to reaction users + /// Action types that should be made available to reaction users + /// + /// ```jsonnet + /// { + /// streams: { + /// mystream: { + /// filters: { + /// myfilter: { + /// actions: { + /// myaction: { + /// type: "myactiontype", + /// # ↑ all those exposed types + /// }, + /// }, + /// }, + /// }, + /// }, + /// }, + /// } + /// ``` pub actions: BTreeSet, } @@ -192,6 +361,8 @@ pub struct Hello { } impl Hello { + /// Constructor that fills a [`Hello`] struct with [`crate`]'s version. + /// You should use this in your plugin [`Manifest`]. pub fn new() -> Hello { Hello { version_major: env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(), @@ -199,6 +370,9 @@ impl Hello { } } + /// Used by the reaction daemon. Permits to check compatibility between two versions. + /// Major versions must be the same between the daemon and plugin. + /// Minor version of the daemon must be greater than or equal minor version of the plugin. pub fn is_compatible(server: &Hello, plugin: &Hello) -> std::result::Result<(), String> { if server.version_major == plugin.version_major && server.version_minor >= plugin.version_minor @@ -215,8 +389,8 @@ impl Hello { } } -/// A clone of [`serde_json::Value`] -/// Implements From & Into [`serde_json::Value`] +/// A clone of [`serde_json::Value`]. +/// Implements From & Into [`serde_json::Value`]. #[derive(Serialize, Deserialize, Clone)] pub enum Value { Null, @@ -263,14 +437,18 @@ impl Into for Value { } } -pub type Line = (String, Duration); - +/// Represents a Stream handled by a plugin on reaction core's side. +/// +/// During [`PluginInfo::load_config`], the plugin should create a [`remoc::rch::mpsc::channel`] of [`Line`]. +/// It will keep the sending side for itself and put the receiving side in a [`StreamImpl`]. +/// +/// The plugin should start sending [`Line`]s in the channel only after [`PluginInfo::start`] has been called by reaction core. #[derive(Debug, Serialize, Deserialize)] pub struct StreamImpl { pub stream: rch::mpsc::Receiver, - /// Whether this stream works standalone, or if it needs other streams to be fed. + /// Whether this stream works standalone, or if it needs other streams or actions to be fed. /// Defaults to true. - /// When false, reaction will exit if it's the last one standing. + /// When `false`, reaction will exit if it's the last one standing. #[serde(default = "_true")] pub standalone: bool, } @@ -279,22 +457,37 @@ fn _true() -> bool { true } +/// Messages passed from the [`StreamImpl`] of a plugin to reaction core +pub type Line = (String, Duration); + +// // Filters +// // For now, plugins can't handle custom filter implementations. // #[derive(Serialize, Deserialize)] // pub struct FilterImpl { // pub stream: rch::lr::Sender, // } - // #[derive(Serialize, Deserialize)] // pub struct Match { // pub match_: String, // pub result: rch::oneshot::Sender, // } +/// Represents an Action handled by a plugin on reaction core's side. +/// +/// During [`PluginInfo::load_config`], the plugin should create a [`remoc::rch::mpsc::channel`] of [`Exec`]. +/// It will keep the receiving side for itself and put the sending side in a [`ActionImpl`]. +/// +/// The plugin will start receiving [`Exec`]s in the channel from reaction only after [`PluginInfo::start`] has been called by reaction core. #[derive(Clone, Serialize, Deserialize)] pub struct ActionImpl { pub tx: rch::mpsc::Sender, } +/// A [trigger](https://reaction.ppom.me/reference.html#trigger) of the Action, sent by reaction core to the plugin. +/// +/// The plugin should perform the configured action for each received [`Exec`]. +/// +/// Any error during its execution should be logged to stderr, see [`crate#Errors`] for error handling recommandations. #[derive(Serialize, Deserialize)] pub struct Exec { pub match_: Vec, @@ -303,6 +496,9 @@ pub struct Exec { /// The main loop for a plugin. /// +/// Bootstraps the communication with reaction core on the process' stdin and stdout, +/// then holds the connection and maintains the plugin in a server state. +/// /// Your main function should only create a struct that implements [`PluginInfo`] /// and then call [`main_loop`]: /// ```ignore @@ -344,11 +540,18 @@ pub async fn main_loop(plugin_info: T) { pub type RemoteResult = Result; -/// A Plugin Error -/// It's either a connection error or a free String for plugin-specific errors +/// reaction-plugin's Error type. #[derive(Debug, Serialize, Deserialize)] pub enum RemoteError { + /// A connection error that origins from [`remoc`], the crate used for communication on the plugin's `stdin`/`stdout`. + /// + /// You should not instantiate this type of error yourself. Remoc(rtc::CallError), + /// A free String for application-specific errors. + /// + /// You should only instantiate this type of error yourself, for any error that you encounter at startup and shutdown. + /// + /// Otherwise, any error during the plugin's runtime should be logged to stderr, see [`crate#Errors`] for error handling recommandations. Plugin(String), } diff --git a/plugins/reaction-plugin/src/line.rs b/plugins/reaction-plugin/src/line.rs index bf91a8c..7b315d0 100644 --- a/plugins/reaction-plugin/src/line.rs +++ b/plugins/reaction-plugin/src/line.rs @@ -1,3 +1,9 @@ +//! Helper module that permits to use templated lines (ie. `bad password for `), like in Stream's and Action's `cmd`. +//! +//! Corresponding reaction core settings: +//! - [Stream's `cmd`](https://reaction.ppom.me/reference.html#cmd) +//! - [Action's `cmd`](https://reaction.ppom.me/reference.html#cmd-1) +//! #[derive(Debug, PartialEq, Eq)] enum SendItem { Index(usize), @@ -55,7 +61,7 @@ pub struct PatternLine { impl PatternLine { /// Construct [`PatternLine`] from a template line and the list of patterns of the underlying [Filter](https://reaction.ppom.me/reference.html#filter). /// - /// This list of patterns comes from [`PluginInfo::action_impl`]. + /// This list of patterns comes from [`super::ActionConfig`]. pub fn new(template: String, patterns: Vec) -> Self { let line = Self::_from(patterns, Vec::from([SendItem::Str(template)])); Self { diff --git a/plugins/reaction-plugin/src/shutdown.rs b/plugins/reaction-plugin/src/shutdown.rs index f3c6c96..cc9ee4f 100644 --- a/plugins/reaction-plugin/src/shutdown.rs +++ b/plugins/reaction-plugin/src/shutdown.rs @@ -1,10 +1,49 @@ +//! Helper module that provides structures to ease the quitting process when having multiple tokio tasks. +//! +//! It defines a [`ShutdownController`], that permits to keep track of ongoing tasks, ask them to shutdown and wait for all of them to quit. +//! +//! You can have it as an attribute of your plugin struct. +//! ``` +//! struct MyPlugin { +//! shutdown: ShutdownController +//! } +//! ``` +//! +//! You can then give a [`ShutdownToken`] to other tasks when creating them: +//! +//! ``` +//! impl PluginInfo for MyPlugin { +//! async fn start(&mut self) -> RemoteResult<()> { +//! let token = self.shutdown.token(); +//! +//! tokio::spawn(async move { +//! token.wait().await; +//! eprintln!("DEBUG shutdown asked to quit, now quitting") +//! }) +//! } +//! } +//! ``` +//! +//! On closing, calling [`ShutdownController::ask_shutdown`] will inform all tasks waiting on [`ShutdownToken::wait`] that it's time to leave. +//! Then we can wait for [`ShutdownController::wait_all_task_shutdown`] to complete. +//! +//! ``` +//! impl PluginInfo for MyPlugin { +//! async fn close(self) -> RemoteResult<()> { +//! self.shutdown.ask_shutdown(); +//! self.shutdown.wait_all_task_shutdown().await; +//! Ok(()) +//! } +//! } +//! ``` + use tokio_util::{ sync::{CancellationToken, WaitForCancellationFuture}, task::task_tracker::{TaskTracker, TaskTrackerToken}, }; -/// Permits to keep track of ongoing tasks, ask them to shutdown and for all of them to quit. -/// Stupid wrapper around [`tokio_util::CancellationToken`] and [`tokio_util::task_tracker::TaskTracker`]. +/// Permits to keep track of ongoing tasks, ask them to shutdown and wait for all of them to quit. +/// Stupid wrapper around [`tokio_util::sync::CancellationToken`] and [`tokio_util::task::task_tracker::TaskTracker`]. #[derive(Default, Clone)] pub struct ShutdownController { shutdown_notifyer: CancellationToken, @@ -12,12 +51,8 @@ pub struct ShutdownController { } impl ShutdownController { - #[allow(clippy::new_without_default)] pub fn new() -> Self { - Self { - shutdown_notifyer: CancellationToken::new(), - task_tracker: TaskTracker::new(), - } + Self::default() } /// Ask for all tasks to quit @@ -66,7 +101,7 @@ impl ShutdownDelegate { /// /// - Wait for a shutdown request to happen with [`Self::wait`] /// - Keep track of the current task. While this token is held, -/// [`ShutdownController::wait_shutdown`] will block. +/// [`ShutdownController::wait_all_task_shutdown`] will block. #[derive(Clone)] pub struct ShutdownToken { shutdown_notifyer: CancellationToken, @@ -81,6 +116,7 @@ impl ShutdownToken { } } + /// Returns underlying [`CancellationToken`] and [`TaskTrackerToken`], consuming self. pub fn split(self) -> (CancellationToken, TaskTrackerToken) { (self.shutdown_notifyer, self._task_tracker_token) } diff --git a/plugins/reaction-plugin/src/time.rs b/plugins/reaction-plugin/src/time.rs index d682429..60f5914 100644 --- a/plugins/reaction-plugin/src/time.rs +++ b/plugins/reaction-plugin/src/time.rs @@ -1,3 +1,9 @@ +//! This module provides [`parse_duration`], which parses duration in reaction's format (ie. `6h`, `3 days`) +//! +//! Like in those reaction core settings: +//! - [Filters' `retryperiod`](https://reaction.ppom.me/reference.html#retryperiod) +//! - [Actions' `after`](https://reaction.ppom.me/reference.html#after). + use std::time::Duration; /// Parses the &str argument as a Duration