diff --git a/plugins/reaction-plugin-virtual/src/main.rs b/plugins/reaction-plugin-virtual/src/main.rs index 4372725..aa20f26 100644 --- a/plugins/reaction-plugin-virtual/src/main.rs +++ b/plugins/reaction-plugin-virtual/src/main.rs @@ -1,8 +1,8 @@ use std::collections::{BTreeMap, BTreeSet}; use reaction_plugin::{ - ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, - line::PatternLine, + ActionConfig, ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamConfig, + StreamImpl, Value, line::PatternLine, }; use remoc::{rch::mpsc, rtc}; use serde::{Deserialize, Serialize}; @@ -17,10 +17,7 @@ async fn main() { } #[derive(Default)] -struct Plugin { - streams: BTreeMap, - actions_init: Vec, -} +struct Plugin {} impl PluginInfo for Plugin { async fn manifest(&mut self) -> Result { @@ -31,75 +28,69 @@ impl PluginInfo for Plugin { }) } - async fn stream_impl( + async fn load_config( &mut self, - stream_name: String, - stream_type: String, - config: Value, - ) -> RemoteResult { - if stream_type != "virtual" { - return Err("This plugin can't handle other stream types than virtual".into()); - } + streams: Vec, + actions: Vec, + ) -> RemoteResult<(Vec, Vec)> { + let mut ret_streams = Vec::with_capacity(streams.len()); + let mut ret_actions = Vec::with_capacity(actions.len()); - let (virtual_stream, receiver) = VirtualStream::new(config)?; + let mut local_streams = BTreeMap::new(); - if let Some(_) = self.streams.insert(stream_name, virtual_stream) { - return Err("this virtual stream has already been initialized".into()); - } - - Ok(StreamImpl { - stream: receiver, - standalone: false, - }) - } - - async fn action_impl( - &mut self, - stream_name: String, - filter_name: String, - action_name: String, - action_type: String, - config: Value, - patterns: Vec, - ) -> RemoteResult { - if &action_type != "virtual" { - return Err("This plugin can't handle other action types than virtual".into()); - } - - let (virtual_action_init, tx) = - VirtualActionInit::new(stream_name, filter_name, action_name, config, patterns)?; - - self.actions_init.push(virtual_action_init); - Ok(ActionImpl { tx }) - } - - async fn finish_setup(&mut self) -> RemoteResult<()> { - while let Some(action_init) = self.actions_init.pop() { - match self.streams.get(&action_init.to) { - Some(virtual_stream) => { - let virtual_stream = virtual_stream.clone(); - tokio::spawn(async move { - VirtualAction::from(action_init, virtual_stream) - .serve() - .await - }); - } - None => { - return Err(format!( - "action {}.{}.{}: send \"{}\" matches no stream name", - action_init.stream_name, - action_init.filter_name, - action_init.action_name, - action_init.to - ) - .into()); - } + for StreamConfig { + stream_name, + stream_type, + config, + } in streams + { + if stream_type != "virtual" { + return Err("This plugin can't handle other stream types than virtual".into()); } - } - // Free containers - self.streams = Default::default(); - self.actions_init = Default::default(); + let (virtual_stream, receiver) = VirtualStream::new(config)?; + + if let Some(_) = local_streams.insert(stream_name, virtual_stream) { + return Err("this virtual stream has already been initialized".into()); + } + + ret_streams.push(StreamImpl { + stream: receiver, + standalone: false, + }); + } + + for ActionConfig { + stream_name, + filter_name, + action_name, + action_type, + config, + patterns, + } in actions + { + if &action_type != "virtual" { + return Err("This plugin can't handle other action types than virtual".into()); + } + + let (mut virtual_action, tx) = VirtualAction::new( + stream_name, + filter_name, + action_name, + config, + patterns, + &local_streams, + )?; + + tokio::spawn(async move { virtual_action.serve().await }); + + ret_actions.push(ActionImpl { tx }); + } + + Ok((ret_streams, ret_actions)) + } + + async fn start(&mut self) -> RemoteResult<()> { Ok(()) } @@ -140,44 +131,6 @@ struct ActionOptions { to: String, } -struct VirtualActionInit { - stream_name: String, - filter_name: String, - action_name: String, - rx: mpsc::Receiver, - patterns: Vec, - send: String, - to: String, -} - -impl VirtualActionInit { - fn new( - stream_name: String, - filter_name: String, - action_name: String, - config: Value, - patterns: Vec, - ) -> Result<(Self, mpsc::Sender), String> { - let options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| { - format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}") - })?; - - let (tx, rx) = mpsc::channel(1); - Ok(( - Self { - stream_name, - filter_name, - action_name, - rx, - patterns, - send: options.send, - to: options.to, - }, - tx, - )) - } -} - struct VirtualAction { rx: mpsc::Receiver, send: PatternLine, @@ -185,13 +138,36 @@ struct VirtualAction { } impl VirtualAction { - fn from(action_init: VirtualActionInit, to: VirtualStream) -> VirtualAction { - let send = PatternLine::new(action_init.send, action_init.patterns); - VirtualAction { - rx: action_init.rx, - send, - to, - } + fn new( + stream_name: String, + filter_name: String, + action_name: String, + config: Value, + patterns: Vec, + streams: &BTreeMap, + ) -> Result<(Self, mpsc::Sender), String> { + let options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| { + format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}") + })?; + + let send = PatternLine::new(options.send, patterns); + + let stream = streams.get(&options.to).ok_or_else(|| { + format!( + "action {}.{}.{}: send \"{}\" matches no stream name", + stream_name, filter_name, action_name, options.to + ) + })?; + + let (tx, rx) = mpsc::channel(1); + Ok(( + Self { + rx, + send: send, + to: stream.clone(), + }, + tx, + )) } async fn serve(&mut self) { diff --git a/plugins/reaction-plugin-virtual/src/tests.rs b/plugins/reaction-plugin-virtual/src/tests.rs index b08f416..32df952 100644 --- a/plugins/reaction-plugin-virtual/src/tests.rs +++ b/plugins/reaction-plugin-virtual/src/tests.rs @@ -1,6 +1,6 @@ use std::time::{SystemTime, UNIX_EPOCH}; -use reaction_plugin::{Exec, PluginInfo, Value}; +use reaction_plugin::{ActionConfig, Exec, PluginInfo, StreamConfig, Value}; use serde_json::json; use crate::Plugin; @@ -10,26 +10,42 @@ async fn conf_stream() { // Invalid type assert!( Plugin::default() - .stream_impl("stream".into(), "virtu".into(), Value::Null) + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "virtu".into(), + config: Value::Null + }], + vec![] + ) .await .is_err() ); assert!( Plugin::default() - .stream_impl("stream".into(), "virtual".into(), Value::Null) + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "virtual".into(), + config: Value::Null + }], + vec![] + ) .await .is_ok() ); - eprintln!( - "err: {:?}", - Plugin::default() - .stream_impl("stream".into(), "virtual".into(), json!({}).into()) - .await - ); + assert!( Plugin::default() - .stream_impl("stream".into(), "virtual".into(), json!({}).into()) + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "virtual".into(), + config: json!({}).into(), + }], + vec![] + ) .await .is_ok() ); @@ -37,10 +53,13 @@ async fn conf_stream() { // Invalid conf: must be empty assert!( Plugin::default() - .stream_impl( - "stream".into(), - "virtual".into(), - json!({"key": "value" }).into() + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "virtual".into(), + config: json!({"key": "value" }).into(), + }], + vec![] ) .await .is_err() @@ -49,6 +68,12 @@ async fn conf_stream() { #[tokio::test] async fn conf_action() { + let streams = vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "virtual".into(), + config: Value::Null, + }]; + let valid_conf = json!({ "send": "message", "to": "stream" }); let missing_send_conf = json!({ "to": "stream" }); @@ -60,26 +85,32 @@ async fn conf_action() { // Invalid type assert!( Plugin::default() - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "virtu".into(), - Value::Null, - patterns.clone() + .load_config( + streams.clone(), + vec![ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "virtu".into(), + config: Value::Null, + patterns: patterns.clone(), + }] ) .await .is_err() ); assert!( Plugin::default() - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "virtual".into(), - valid_conf.into(), - patterns.clone() + .load_config( + streams.clone(), + vec![ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "virtual".into(), + config: valid_conf.into(), + patterns: patterns.clone() + }] ) .await .is_ok() @@ -88,13 +119,16 @@ async fn conf_action() { for conf in [missing_send_conf, missing_to_conf, extra_attr_conf] { assert!( Plugin::default() - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "virtual".into(), - conf.clone().into(), - patterns.clone() + .load_config( + streams.clone(), + vec![ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "virtual".into(), + config: conf.clone().into(), + patterns: patterns.clone() + }] ) .await .is_err(), @@ -107,42 +141,48 @@ async fn conf_action() { #[tokio::test] async fn conf_send() { // Valid to: option - let mut plugin = Plugin::default(); - plugin - .stream_impl("stream".into(), "virtual".into(), Value::Null) - .await - .unwrap(); - plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "virtual".into(), - json!({ "send": "message", "to": "stream" }).into(), - Vec::default(), - ) - .await - .unwrap(); - assert!(plugin.finish_setup().await.is_ok()); + assert!( + Plugin::default() + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "virtual".into(), + config: Value::Null, + }], + vec![ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "virtual".into(), + config: json!({ "send": "message", "to": "stream" }).into(), + patterns: vec![], + }] + ) + .await + .is_ok(), + ); // Invalid to: option - let mut plugin = Plugin::default(); - plugin - .stream_impl("stream".into(), "virtual".into(), Value::Null) - .await - .unwrap(); - plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "virtual".into(), - json!({ "send": "message", "to": "stream1" }).into(), - Vec::default(), - ) - .await - .unwrap(); - assert!(plugin.finish_setup().await.is_err()); + assert!( + Plugin::default() + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "virtual".into(), + config: Value::Null, + }], + vec![ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "virtual".into(), + config: json!({ "send": "message", "to": "stream1" }).into(), + patterns: vec![], + }] + ) + .await + .is_err(), + ); } // Let's allow empty streams for now. @@ -150,35 +190,46 @@ async fn conf_send() { // // #[tokio::test] // async fn conf_empty_stream() { -// let mut plugin = Plugin::default(); -// plugin -// .stream_impl("stream".into(), "virtual".into(), Value::Null) -// .await -// .unwrap(); -// assert!(plugin.finish_setup().await.is_err()); +// assert!( +// Plugin::default() +// .load_config( +// vec![StreamConfig { +// stream_name: "stream".into(), +// stream_type: "virtual".into(), +// config: Value::Null, +// }], +// vec![], +// ) +// .await +// .is_err(), +// ); // } #[tokio::test] async fn run_simple() { let mut plugin = Plugin::default(); - let mut stream = plugin - .stream_impl("stream".into(), "virtual".into(), Value::Null) - .await - .unwrap(); - assert!(!stream.standalone); - - let action = plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "virtual".into(), - json!({ "send": "message ", "to": "stream" }).into(), - vec!["test".into()], + let (mut streams, mut actions) = plugin + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "virtual".into(), + config: Value::Null, + }], + vec![ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "virtual".into(), + config: json!({ "send": "message ", "to": "stream" }).into(), + patterns: vec!["test".into()], + }], ) .await .unwrap(); - assert!(plugin.finish_setup().await.is_ok()); + + let mut stream = streams.pop().unwrap(); + let action = actions.pop().unwrap(); + assert!(!stream.standalone); for m in ["test1", "test2", "test3", " a a a aa a a"] { let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); @@ -202,37 +253,40 @@ async fn run_simple() { #[tokio::test] async fn run_two_actions() { let mut plugin = Plugin::default(); - let mut stream = plugin - .stream_impl("stream".into(), "virtual".into(), Value::Null) + let (mut streams, mut actions) = plugin + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "virtual".into(), + config: Value::Null, + }], + vec![ + ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "virtual".into(), + config: json!({ "send": "send ", "to": "stream" }).into(), + patterns: vec!["a".into(), "b".into()], + }, + ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "virtual".into(), + config: json!({ "send": " send", "to": "stream" }).into(), + patterns: vec!["a".into(), "b".into()], + }, + ], + ) .await .unwrap(); + + let mut stream = streams.pop().unwrap(); assert!(!stream.standalone); - let action1 = plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "virtual".into(), - json!({ "send": "send ", "to": "stream" }).into(), - vec!["a".into(), "b".into()], - ) - .await - .unwrap(); - - let action2 = plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "virtual".into(), - json!({ "send": " send", "to": "stream" }).into(), - vec!["a".into(), "b".into()], - ) - .await - .unwrap(); - - assert!(plugin.finish_setup().await.is_ok()); + let action2 = actions.pop().unwrap(); + let action1 = actions.pop().unwrap(); let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();