diff --git a/Cargo.lock b/Cargo.lock index d4eb582..85db0af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2847,6 +2847,7 @@ dependencies = [ name = "reaction-plugin" version = "1.0.0" dependencies = [ + "chrono", "remoc", "serde", "serde_json", @@ -2857,10 +2858,10 @@ dependencies = [ name = "reaction-plugin-cluster" version = "0.1.0" dependencies = [ + "chrono", "data-encoding", "iroh", "rand 0.9.2", - "reaction", "reaction-plugin", "remoc", "serde", diff --git a/Cargo.toml b/Cargo.toml index 1a0aaeb..9fd0298 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ assets = [ [dependencies] # Time types -chrono = { version = "0.4.38", features = ["std", "clock", "serde"] } +chrono = { workspace = true } # CLI parsing clap = { version = "4.5.4", features = ["derive"] } # Unix interfaces @@ -83,3 +83,4 @@ serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.117" tokio = { version = "1.40.0" } reaction-plugin = { path = "plugins/reaction-plugin" } +chrono = { version = "0.4.38", features = ["std", "clock", "serde"] } diff --git a/plugins/reaction-plugin-cluster/Cargo.toml b/plugins/reaction-plugin-cluster/Cargo.toml index 2c5d69e..42c3171 100644 --- a/plugins/reaction-plugin-cluster/Cargo.toml +++ b/plugins/reaction-plugin-cluster/Cargo.toml @@ -4,9 +4,9 @@ version = "0.1.0" edition = "2024" [dependencies] -reaction.path = "../../" reaction-plugin.workspace = true +chrono.workspace = true remoc.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/plugins/reaction-plugin-cluster/src/cluster.rs b/plugins/reaction-plugin-cluster/src/cluster.rs index c32d21a..87b283e 100644 --- a/plugins/reaction-plugin-cluster/src/cluster.rs +++ b/plugins/reaction-plugin-cluster/src/cluster.rs @@ -1,12 +1,11 @@ use std::{ - collections::{BTreeMap, VecDeque}, + collections::VecDeque, net::{SocketAddrV4, SocketAddrV6}, sync::Arc, - time::Instant, }; -use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::Connection}; -use reaction_plugin::Exec; +use chrono::{DateTime, Local}; +use iroh::{Endpoint, EndpointAddr, endpoint::Connection}; use tokio::sync::{mpsc, oneshot}; use crate::{ActionInit, StreamInit, endpoint::EndpointManager}; @@ -100,7 +99,7 @@ impl ActionInit { pub struct TimeMessage { pub message: Arc, - pub timeout: Instant, + pub timeout: DateTime, } pub struct ConnectionManager { diff --git a/plugins/reaction-plugin-cluster/src/endpoint.rs b/plugins/reaction-plugin-cluster/src/endpoint.rs index cd803e3..851166c 100644 --- a/plugins/reaction-plugin-cluster/src/endpoint.rs +++ b/plugins/reaction-plugin-cluster/src/endpoint.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::time::Duration; use iroh::endpoint::Incoming; diff --git a/plugins/reaction-plugin-cluster/src/main.rs b/plugins/reaction-plugin-cluster/src/main.rs index fcfda25..0bd5117 100644 --- a/plugins/reaction-plugin-cluster/src/main.rs +++ b/plugins/reaction-plugin-cluster/src/main.rs @@ -1,12 +1,13 @@ use std::{ collections::{BTreeMap, BTreeSet}, net::{Ipv4Addr, Ipv6Addr, SocketAddr}, - time::Duration, }; +use chrono::TimeDelta; use iroh::{EndpointAddr, PublicKey, SecretKey, TransportAddr}; use reaction_plugin::{ ActionImpl, Exec, Hello, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, main_loop, + parse_duration, }; use remoc::{rch::mpsc, rtc}; use serde::{Deserialize, Serialize}; @@ -16,11 +17,9 @@ mod cluster; mod endpoint; mod secret_key; -use reaction::concepts::parse_duration; - use secret_key::{key_b64_to_bytes, key_bytes_to_b64, secret_key}; -use crate::cluster::Cluster; +use crate::cluster::{bind, cluster_tasks}; #[tokio::main] async fn main() { @@ -68,7 +67,7 @@ struct StreamInit { bind_ipv4: Option, bind_ipv6: Option, secret_key: SecretKey, - message_timeout: Duration, + message_timeout: TimeDelta, nodes: BTreeMap, tx: mpsc::Sender, } @@ -213,12 +212,14 @@ impl PluginInfo for Plugin { while let Some((stream_name, stream)) = self.streams.pop_first() { let (tx, rx) = oneshot::channel(); self.cluster_shutdown.push(tx); - Cluster::new( + + let endpoint = bind(&stream).await?; + cluster_tasks( + endpoint, stream, self.actions.remove(&stream_name).unwrap_or_default(), rx, - ) - .await?; + ); } // Check there is no action left if !self.actions.is_empty() { diff --git a/plugins/reaction-plugin/Cargo.lock b/plugins/reaction-plugin/Cargo.lock deleted file mode 100644 index a4701f3..0000000 --- a/plugins/reaction-plugin/Cargo.lock +++ /dev/null @@ -1,319 +0,0 @@ -# This file is automatically @generated by Cargo. -# It is not intended for manual editing. -version = 4 - -[[package]] -name = "cfg-if" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" - -[[package]] -name = "equivalent" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" - -[[package]] -name = "getrandom" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" -dependencies = [ - "cfg-if", - "libc", - "wasi", -] - -[[package]] -name = "hashbrown" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" - -[[package]] -name = "indexmap" -version = "2.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5" -dependencies = [ - "equivalent", - "hashbrown", -] - -[[package]] -name = "libc" -version = "0.2.176" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" - -[[package]] -name = "libloading" -version = "0.8.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" -dependencies = [ - "cfg-if", - "windows-link", -] - -[[package]] -name = "memchr" -version = "2.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" - -[[package]] -name = "ppv-lite86" -version = "0.2.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" -dependencies = [ - "zerocopy", -] - -[[package]] -name = "proc-macro-crate" -version = "3.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" -dependencies = [ - "toml_edit", -] - -[[package]] -name = "proc-macro2" -version = "1.0.101" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" -dependencies = [ - "unicode-ident", -] - -[[package]] -name = "quote" -version = "1.0.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" -dependencies = [ - "proc-macro2", -] - -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha", - "rand_core", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom", -] - -[[package]] -name = "reaction-plugin" -version = "0.1.0" -dependencies = [ - "stabby", -] - -[[package]] -name = "rustc_version" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" -dependencies = [ - "semver", -] - -[[package]] -name = "rustversion" -version = "1.0.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" - -[[package]] -name = "semver" -version = "1.0.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" - -[[package]] -name = "serde_core" -version = "1.0.226" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba2ba63999edb9dac981fb34b3e5c0d111a69b0924e253ed29d83f7c99e966a4" -dependencies = [ - "serde_derive", -] - -[[package]] -name = "serde_derive" -version = "1.0.226" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8db53ae22f34573731bafa1db20f04027b2d25e02d8205921b569171699cdb33" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.106", -] - -[[package]] -name = "sha2-const-stable" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f179d4e11094a893b82fff208f74d448a7512f99f5a0acbd5c679b705f83ed9" - -[[package]] -name = "stabby" -version = "72.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976399a0c48ea769ef7f5dc303bb88240ab8d84008647a6b2303eced3dab3945" -dependencies = [ - "libloading", - "rustversion", - "stabby-abi", -] - -[[package]] -name = "stabby-abi" -version = "72.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7b54832a9a1f92a0e55e74a5c0332744426edc515bb3fbad82f10b874a87f0d" -dependencies = [ - "rustc_version", - "rustversion", - "sha2-const-stable", - "stabby-macros", -] - -[[package]] -name = "stabby-macros" -version = "72.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a768b1e51e4dbfa4fa52ae5c01241c0a41e2938fdffbb84add0c8238092f9091" -dependencies = [ - "proc-macro-crate", - "proc-macro2", - "quote", - "rand", - "syn 1.0.109", -] - -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - -[[package]] -name = "syn" -version = "2.0.106" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - -[[package]] -name = "toml_datetime" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32f1085dec27c2b6632b04c80b3bb1b4300d6495d1e129693bdda7d91e72eec1" -dependencies = [ - "serde_core", -] - -[[package]] -name = "toml_edit" -version = "0.23.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3effe7c0e86fdff4f69cdd2ccc1b96f933e24811c5441d44904e8683e27184b" -dependencies = [ - "indexmap", - "toml_datetime", - "toml_parser", - "winnow", -] - -[[package]] -name = "toml_parser" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cf893c33be71572e0e9aa6dd15e6677937abd686b066eac3f8cd3531688a627" -dependencies = [ - "winnow", -] - -[[package]] -name = "unicode-ident" -version = "1.0.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" - -[[package]] -name = "wasi" -version = "0.11.1+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" - -[[package]] -name = "windows-link" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45e46c0661abb7180e7b9c281db115305d49ca1709ab8242adf09666d2173c65" - -[[package]] -name = "winnow" -version = "0.7.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21a0236b59786fed61e2a80582dd500fe61f18b5dca67a4a067d0bc9039339cf" -dependencies = [ - "memchr", -] - -[[package]] -name = "zerocopy" -version = "0.8.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0894878a5fa3edfd6da3f88c4805f4c8558e2b996227a3d864f47fe11e38282c" -dependencies = [ - "zerocopy-derive", -] - -[[package]] -name = "zerocopy-derive" -version = "0.8.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.106", -] diff --git a/plugins/reaction-plugin/Cargo.toml b/plugins/reaction-plugin/Cargo.toml index 4a23e2c..ec53c7d 100644 --- a/plugins/reaction-plugin/Cargo.toml +++ b/plugins/reaction-plugin/Cargo.toml @@ -4,11 +4,10 @@ version = "1.0.0" edition = "2024" [dependencies] +chrono.workspace = true remoc.workspace = true - serde.workspace = true - serde_json.workspace = true - tokio.workspace = true tokio.features = ["io-std"] + diff --git a/plugins/reaction-plugin/src/lib.rs b/plugins/reaction-plugin/src/lib.rs index 1f2eecc..c03be3c 100644 --- a/plugins/reaction-plugin/src/lib.rs +++ b/plugins/reaction-plugin/src/lib.rs @@ -100,6 +100,9 @@ use serde::{Deserialize, Serialize}; use serde_json::{Number, Value as JValue}; use tokio::io::{stdin, stdout}; +mod time; +pub use time::parse_duration; + /// This is the only trait that **must** be implemented by a plugin. /// It provides lists of stream, filter and action types implemented by a dynamic plugin. #[rtc::remote] diff --git a/src/concepts/parse_duration.rs b/plugins/reaction-plugin/src/time.rs similarity index 54% rename from src/concepts/parse_duration.rs rename to plugins/reaction-plugin/src/time.rs index c2b2562..9a2694b 100644 --- a/src/concepts/parse_duration.rs +++ b/plugins/reaction-plugin/src/time.rs @@ -1,6 +1,6 @@ -use std::time::Duration; +use chrono::TimeDelta; -/// Parses the &str argument as a Duration +/// Parses the &str argument as a TimeDelta /// Returns Ok(TimeDelta) if successful, or Err(String). /// /// Format is defined as follows: ` ` @@ -12,26 +12,26 @@ use std::time::Duration; /// - `m` / `min` / `mins` / `minute` / `minutes` /// - `h` / `hour` / `hours` /// - `d` / `day` / `days` -pub fn parse_duration(d: &str) -> Result { +pub fn parse_duration(d: &str) -> Result { let d_trimmed = d.trim(); let chars = d_trimmed.as_bytes(); let mut value = 0; let mut i = 0; while i < chars.len() && chars[i].is_ascii_digit() { - value = value * 10 + (chars[i] - b'0') as u64; + value = value * 10 + (chars[i] - b'0') as u32; i += 1; } if i == 0 { return Err(format!("duration '{}' doesn't start with digits", d)); } - let ok_as = |func: fn(u64) -> Duration| -> Result<_, String> { Ok(func(value as u64)) }; + let ok_as = |func: fn(i64) -> TimeDelta| -> Result<_, String> { Ok(func(value as i64)) }; match d_trimmed[i..].trim() { - "ms" | "millis" | "millisecond" | "milliseconds" => ok_as(Duration::from_millis), - "s" | "sec" | "secs" | "second" | "seconds" => ok_as(Duration::from_secs), - "m" | "min" | "mins" | "minute" | "minutes" => ok_as(|n| Duration::from_secs(n * 60)), - "h" | "hour" | "hours" => ok_as(|n| Duration::from_secs(n * 3600)), - "d" | "day" | "days" => ok_as(|n| Duration::from_secs(n * 3600 * 24)), + "ms" | "millis" | "millisecond" | "milliseconds" => ok_as(TimeDelta::milliseconds), + "s" | "sec" | "secs" | "second" | "seconds" => ok_as(TimeDelta::seconds), + "m" | "min" | "mins" | "minute" | "minutes" => ok_as(TimeDelta::minutes), + "h" | "hour" | "hours" => ok_as(TimeDelta::hours), + "d" | "day" | "days" => ok_as(TimeDelta::days), unit => Err(format!( "unit {} not recognised. must be one of s/sec/seconds, m/min/minutes, h/hours, d/days", unit @@ -42,6 +42,8 @@ pub fn parse_duration(d: &str) -> Result { #[cfg(test)] mod tests { + use chrono::TimeDelta; + use super::*; #[test] @@ -51,22 +53,13 @@ mod tests { #[test] fn parse_duration_test() { - assert_eq!(parse_duration("1s"), Ok(Duration::from_secs(1))); - assert_eq!(parse_duration("12s"), Ok(Duration::from_secs(12))); - assert_eq!(parse_duration(" 12 secs "), Ok(Duration::from_secs(12))); - assert_eq!(parse_duration("2m"), Ok(Duration::from_secs(2 * 60))); - assert_eq!( - parse_duration("6 hours"), - Ok(Duration::from_secs(6 * 60 * 60)) - ); - assert_eq!( - parse_duration("1d"), - Ok(Duration::from_secs(1 * 24 * 60 * 60)) - ); - assert_eq!( - parse_duration("365d"), - Ok(Duration::from_secs(365 * 24 * 60 * 60)) - ); + assert_eq!(parse_duration("1s"), Ok(TimeDelta::seconds(1))); + assert_eq!(parse_duration("12s"), Ok(TimeDelta::seconds(12))); + assert_eq!(parse_duration(" 12 secs "), Ok(TimeDelta::seconds(12))); + assert_eq!(parse_duration("2m"), Ok(TimeDelta::minutes(2))); + assert_eq!(parse_duration("6 hours"), Ok(TimeDelta::hours(6))); + assert_eq!(parse_duration("1d"), Ok(TimeDelta::days(1))); + assert_eq!(parse_duration("365d"), Ok(TimeDelta::days(365))); assert!(parse_duration("d 3").is_err()); assert!(parse_duration("d3").is_err()); diff --git a/src/concepts/action.rs b/src/concepts/action.rs index fcf5c63..c2bf976 100644 --- a/src/concepts/action.rs +++ b/src/concepts/action.rs @@ -2,11 +2,12 @@ use std::{cmp::Ordering, collections::BTreeSet, fmt::Display, sync::Arc}; use chrono::TimeDelta; +use reaction_plugin::parse_duration; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::process::Command; -use super::{Match, Pattern, PatternType, null_value, parse_duration::*}; +use super::{Match, Pattern, PatternType, null_value}; #[derive(Clone, Debug, Default, Deserialize, Serialize)] #[serde(deny_unknown_fields)] @@ -108,11 +109,8 @@ impl Action { if let Some(after) = &self.after { self.after_duration = Some( - TimeDelta::from_std( - parse_duration(after) - .map_err(|err| format!("failed to parse after time: {}", err))?, - ) - .map_err(|err| format!("too big after time: {err}"))?, + parse_duration(after) + .map_err(|err| format!("failed to parse after time: {}", err))?, ); self.after = None; } else if self.on_exit { diff --git a/src/concepts/filter.rs b/src/concepts/filter.rs index b6708b9..b1a0d7e 100644 --- a/src/concepts/filter.rs +++ b/src/concepts/filter.rs @@ -7,10 +7,11 @@ use std::{ }; use chrono::TimeDelta; +use reaction_plugin::parse_duration; use regex::Regex; use serde::{Deserialize, Serialize}; -use super::{Action, Match, Pattern, PatternType, Patterns, parse_duration}; +use super::{Action, Match, Pattern, PatternType, Patterns}; #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)] pub enum Duplicate { @@ -124,11 +125,8 @@ impl Filter { if let Some(retry_period) = &self.retry_period { self.retry_duration = Some( - TimeDelta::from_std( - parse_duration(retry_period) - .map_err(|err| format!("failed to parse retry period: {}", err))?, - ) - .map_err(|err| format!("too big retry period: {err}"))?, + parse_duration(retry_period) + .map_err(|err| format!("failed to parse retry period: {}", err))?, ); self.retry_period = None; } diff --git a/src/concepts/mod.rs b/src/concepts/mod.rs index 4b78070..215be2b 100644 --- a/src/concepts/mod.rs +++ b/src/concepts/mod.rs @@ -1,7 +1,6 @@ mod action; mod config; mod filter; -mod parse_duration; mod pattern; mod plugin; mod stream; @@ -11,7 +10,6 @@ use std::fmt::Debug; pub use action::Action; pub use config::{Config, Patterns}; pub use filter::{Duplicate, Filter}; -pub use parse_duration::parse_duration; pub use pattern::{Pattern, PatternType}; pub use plugin::Plugin; use serde::{Deserialize, Serialize};