Move parse_duration to reaction-plugin and fix dependency tree

This commit is contained in:
ppom 2025-11-15 12:00:00 +01:00
commit a70b45ba2d
No known key found for this signature in database
13 changed files with 51 additions and 379 deletions

3
Cargo.lock generated
View file

@ -2847,6 +2847,7 @@ dependencies = [
name = "reaction-plugin"
version = "1.0.0"
dependencies = [
"chrono",
"remoc",
"serde",
"serde_json",
@ -2857,10 +2858,10 @@ dependencies = [
name = "reaction-plugin-cluster"
version = "0.1.0"
dependencies = [
"chrono",
"data-encoding",
"iroh",
"rand 0.9.2",
"reaction",
"reaction-plugin",
"remoc",
"serde",

View file

@ -34,7 +34,7 @@ assets = [
[dependencies]
# Time types
chrono = { version = "0.4.38", features = ["std", "clock", "serde"] }
chrono = { workspace = true }
# CLI parsing
clap = { version = "4.5.4", features = ["derive"] }
# Unix interfaces
@ -83,3 +83,4 @@ serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
tokio = { version = "1.40.0" }
reaction-plugin = { path = "plugins/reaction-plugin" }
chrono = { version = "0.4.38", features = ["std", "clock", "serde"] }

View file

@ -4,9 +4,9 @@ version = "0.1.0"
edition = "2024"
[dependencies]
reaction.path = "../../"
reaction-plugin.workspace = true
chrono.workspace = true
remoc.workspace = true
serde.workspace = true
serde_json.workspace = true

View file

@ -1,12 +1,11 @@
use std::{
collections::{BTreeMap, VecDeque},
collections::VecDeque,
net::{SocketAddrV4, SocketAddrV6},
sync::Arc,
time::Instant,
};
use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::Connection};
use reaction_plugin::Exec;
use chrono::{DateTime, Local};
use iroh::{Endpoint, EndpointAddr, endpoint::Connection};
use tokio::sync::{mpsc, oneshot};
use crate::{ActionInit, StreamInit, endpoint::EndpointManager};
@ -100,7 +99,7 @@ impl ActionInit {
pub struct TimeMessage {
pub message: Arc<String>,
pub timeout: Instant,
pub timeout: DateTime<Local>,
}
pub struct ConnectionManager {

View file

@ -1,4 +1,4 @@
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeMap;
use std::time::Duration;
use iroh::endpoint::Incoming;

View file

@ -1,12 +1,13 @@
use std::{
collections::{BTreeMap, BTreeSet},
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
time::Duration,
};
use chrono::TimeDelta;
use iroh::{EndpointAddr, PublicKey, SecretKey, TransportAddr};
use reaction_plugin::{
ActionImpl, Exec, Hello, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, main_loop,
parse_duration,
};
use remoc::{rch::mpsc, rtc};
use serde::{Deserialize, Serialize};
@ -16,11 +17,9 @@ mod cluster;
mod endpoint;
mod secret_key;
use reaction::concepts::parse_duration;
use secret_key::{key_b64_to_bytes, key_bytes_to_b64, secret_key};
use crate::cluster::Cluster;
use crate::cluster::{bind, cluster_tasks};
#[tokio::main]
async fn main() {
@ -68,7 +67,7 @@ struct StreamInit {
bind_ipv4: Option<Ipv4Addr>,
bind_ipv6: Option<Ipv6Addr>,
secret_key: SecretKey,
message_timeout: Duration,
message_timeout: TimeDelta,
nodes: BTreeMap<PublicKey, EndpointAddr>,
tx: mpsc::Sender<String>,
}
@ -213,12 +212,14 @@ impl PluginInfo for Plugin {
while let Some((stream_name, stream)) = self.streams.pop_first() {
let (tx, rx) = oneshot::channel();
self.cluster_shutdown.push(tx);
Cluster::new(
let endpoint = bind(&stream).await?;
cluster_tasks(
endpoint,
stream,
self.actions.remove(&stream_name).unwrap_or_default(),
rx,
)
.await?;
);
}
// Check there is no action left
if !self.actions.is_empty() {

View file

@ -1,319 +0,0 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "cfg-if"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9"
[[package]]
name = "equivalent"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
[[package]]
name = "getrandom"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "hashbrown"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d"
[[package]]
name = "indexmap"
version = "2.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5"
dependencies = [
"equivalent",
"hashbrown",
]
[[package]]
name = "libc"
version = "0.2.176"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174"
[[package]]
name = "libloading"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55"
dependencies = [
"cfg-if",
"windows-link",
]
[[package]]
name = "memchr"
version = "2.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
[[package]]
name = "ppv-lite86"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
dependencies = [
"zerocopy",
]
[[package]]
name = "proc-macro-crate"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983"
dependencies = [
"toml_edit",
]
[[package]]
name = "proc-macro2"
version = "1.0.101"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "reaction-plugin"
version = "0.1.0"
dependencies = [
"stabby",
]
[[package]]
name = "rustc_version"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
dependencies = [
"semver",
]
[[package]]
name = "rustversion"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "semver"
version = "1.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2"
[[package]]
name = "serde_core"
version = "1.0.226"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba2ba63999edb9dac981fb34b3e5c0d111a69b0924e253ed29d83f7c99e966a4"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.226"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8db53ae22f34573731bafa1db20f04027b2d25e02d8205921b569171699cdb33"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "sha2-const-stable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f179d4e11094a893b82fff208f74d448a7512f99f5a0acbd5c679b705f83ed9"
[[package]]
name = "stabby"
version = "72.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "976399a0c48ea769ef7f5dc303bb88240ab8d84008647a6b2303eced3dab3945"
dependencies = [
"libloading",
"rustversion",
"stabby-abi",
]
[[package]]
name = "stabby-abi"
version = "72.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7b54832a9a1f92a0e55e74a5c0332744426edc515bb3fbad82f10b874a87f0d"
dependencies = [
"rustc_version",
"rustversion",
"sha2-const-stable",
"stabby-macros",
]
[[package]]
name = "stabby-macros"
version = "72.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a768b1e51e4dbfa4fa52ae5c01241c0a41e2938fdffbb84add0c8238092f9091"
dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"rand",
"syn 1.0.109",
]
[[package]]
name = "syn"
version = "1.0.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "syn"
version = "2.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "toml_datetime"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32f1085dec27c2b6632b04c80b3bb1b4300d6495d1e129693bdda7d91e72eec1"
dependencies = [
"serde_core",
]
[[package]]
name = "toml_edit"
version = "0.23.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3effe7c0e86fdff4f69cdd2ccc1b96f933e24811c5441d44904e8683e27184b"
dependencies = [
"indexmap",
"toml_datetime",
"toml_parser",
"winnow",
]
[[package]]
name = "toml_parser"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cf893c33be71572e0e9aa6dd15e6677937abd686b066eac3f8cd3531688a627"
dependencies = [
"winnow",
]
[[package]]
name = "unicode-ident"
version = "1.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d"
[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
[[package]]
name = "windows-link"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45e46c0661abb7180e7b9c281db115305d49ca1709ab8242adf09666d2173c65"
[[package]]
name = "winnow"
version = "0.7.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21a0236b59786fed61e2a80582dd500fe61f18b5dca67a4a067d0bc9039339cf"
dependencies = [
"memchr",
]
[[package]]
name = "zerocopy"
version = "0.8.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0894878a5fa3edfd6da3f88c4805f4c8558e2b996227a3d864f47fe11e38282c"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]

View file

@ -4,11 +4,10 @@ version = "1.0.0"
edition = "2024"
[dependencies]
chrono.workspace = true
remoc.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tokio.features = ["io-std"]

View file

@ -100,6 +100,9 @@ use serde::{Deserialize, Serialize};
use serde_json::{Number, Value as JValue};
use tokio::io::{stdin, stdout};
mod time;
pub use time::parse_duration;
/// This is the only trait that **must** be implemented by a plugin.
/// It provides lists of stream, filter and action types implemented by a dynamic plugin.
#[rtc::remote]

View file

@ -1,6 +1,6 @@
use std::time::Duration;
use chrono::TimeDelta;
/// Parses the &str argument as a Duration
/// Parses the &str argument as a TimeDelta
/// Returns Ok(TimeDelta) if successful, or Err(String).
///
/// Format is defined as follows: `<integer> <unit>`
@ -12,26 +12,26 @@ use std::time::Duration;
/// - `m` / `min` / `mins` / `minute` / `minutes`
/// - `h` / `hour` / `hours`
/// - `d` / `day` / `days`
pub fn parse_duration(d: &str) -> Result<Duration, String> {
pub fn parse_duration(d: &str) -> Result<TimeDelta, String> {
let d_trimmed = d.trim();
let chars = d_trimmed.as_bytes();
let mut value = 0;
let mut i = 0;
while i < chars.len() && chars[i].is_ascii_digit() {
value = value * 10 + (chars[i] - b'0') as u64;
value = value * 10 + (chars[i] - b'0') as u32;
i += 1;
}
if i == 0 {
return Err(format!("duration '{}' doesn't start with digits", d));
}
let ok_as = |func: fn(u64) -> Duration| -> Result<_, String> { Ok(func(value as u64)) };
let ok_as = |func: fn(i64) -> TimeDelta| -> Result<_, String> { Ok(func(value as i64)) };
match d_trimmed[i..].trim() {
"ms" | "millis" | "millisecond" | "milliseconds" => ok_as(Duration::from_millis),
"s" | "sec" | "secs" | "second" | "seconds" => ok_as(Duration::from_secs),
"m" | "min" | "mins" | "minute" | "minutes" => ok_as(|n| Duration::from_secs(n * 60)),
"h" | "hour" | "hours" => ok_as(|n| Duration::from_secs(n * 3600)),
"d" | "day" | "days" => ok_as(|n| Duration::from_secs(n * 3600 * 24)),
"ms" | "millis" | "millisecond" | "milliseconds" => ok_as(TimeDelta::milliseconds),
"s" | "sec" | "secs" | "second" | "seconds" => ok_as(TimeDelta::seconds),
"m" | "min" | "mins" | "minute" | "minutes" => ok_as(TimeDelta::minutes),
"h" | "hour" | "hours" => ok_as(TimeDelta::hours),
"d" | "day" | "days" => ok_as(TimeDelta::days),
unit => Err(format!(
"unit {} not recognised. must be one of s/sec/seconds, m/min/minutes, h/hours, d/days",
unit
@ -42,6 +42,8 @@ pub fn parse_duration(d: &str) -> Result<Duration, String> {
#[cfg(test)]
mod tests {
use chrono::TimeDelta;
use super::*;
#[test]
@ -51,22 +53,13 @@ mod tests {
#[test]
fn parse_duration_test() {
assert_eq!(parse_duration("1s"), Ok(Duration::from_secs(1)));
assert_eq!(parse_duration("12s"), Ok(Duration::from_secs(12)));
assert_eq!(parse_duration(" 12 secs "), Ok(Duration::from_secs(12)));
assert_eq!(parse_duration("2m"), Ok(Duration::from_secs(2 * 60)));
assert_eq!(
parse_duration("6 hours"),
Ok(Duration::from_secs(6 * 60 * 60))
);
assert_eq!(
parse_duration("1d"),
Ok(Duration::from_secs(1 * 24 * 60 * 60))
);
assert_eq!(
parse_duration("365d"),
Ok(Duration::from_secs(365 * 24 * 60 * 60))
);
assert_eq!(parse_duration("1s"), Ok(TimeDelta::seconds(1)));
assert_eq!(parse_duration("12s"), Ok(TimeDelta::seconds(12)));
assert_eq!(parse_duration(" 12 secs "), Ok(TimeDelta::seconds(12)));
assert_eq!(parse_duration("2m"), Ok(TimeDelta::minutes(2)));
assert_eq!(parse_duration("6 hours"), Ok(TimeDelta::hours(6)));
assert_eq!(parse_duration("1d"), Ok(TimeDelta::days(1)));
assert_eq!(parse_duration("365d"), Ok(TimeDelta::days(365)));
assert!(parse_duration("d 3").is_err());
assert!(parse_duration("d3").is_err());

View file

@ -2,11 +2,12 @@ use std::{cmp::Ordering, collections::BTreeSet, fmt::Display, sync::Arc};
use chrono::TimeDelta;
use reaction_plugin::parse_duration;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::process::Command;
use super::{Match, Pattern, PatternType, null_value, parse_duration::*};
use super::{Match, Pattern, PatternType, null_value};
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
@ -108,11 +109,8 @@ impl Action {
if let Some(after) = &self.after {
self.after_duration = Some(
TimeDelta::from_std(
parse_duration(after)
.map_err(|err| format!("failed to parse after time: {}", err))?,
)
.map_err(|err| format!("too big after time: {err}"))?,
parse_duration(after)
.map_err(|err| format!("failed to parse after time: {}", err))?,
);
self.after = None;
} else if self.on_exit {

View file

@ -7,10 +7,11 @@ use std::{
};
use chrono::TimeDelta;
use reaction_plugin::parse_duration;
use regex::Regex;
use serde::{Deserialize, Serialize};
use super::{Action, Match, Pattern, PatternType, Patterns, parse_duration};
use super::{Action, Match, Pattern, PatternType, Patterns};
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
pub enum Duplicate {
@ -124,11 +125,8 @@ impl Filter {
if let Some(retry_period) = &self.retry_period {
self.retry_duration = Some(
TimeDelta::from_std(
parse_duration(retry_period)
.map_err(|err| format!("failed to parse retry period: {}", err))?,
)
.map_err(|err| format!("too big retry period: {err}"))?,
parse_duration(retry_period)
.map_err(|err| format!("failed to parse retry period: {}", err))?,
);
self.retry_period = None;
}

View file

@ -1,7 +1,6 @@
mod action;
mod config;
mod filter;
mod parse_duration;
mod pattern;
mod plugin;
mod stream;
@ -11,7 +10,6 @@ use std::fmt::Debug;
pub use action::Action;
pub use config::{Config, Patterns};
pub use filter::{Duplicate, Filter};
pub use parse_duration::parse_duration;
pub use pattern::{Pattern, PatternType};
pub use plugin::Plugin;
use serde::{Deserialize, Serialize};