virtual: adapt to plugin interface change

This commit is contained in:
ppom 2026-02-09 12:00:00 +01:00
commit 57d6da5377
No known key found for this signature in database
2 changed files with 242 additions and 212 deletions

View file

@ -1,8 +1,8 @@
use std::collections::{BTreeMap, BTreeSet};
use reaction_plugin::{
ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamImpl, Value,
line::PatternLine,
ActionConfig, ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamConfig,
StreamImpl, Value, line::PatternLine,
};
use remoc::{rch::mpsc, rtc};
use serde::{Deserialize, Serialize};
@ -17,10 +17,7 @@ async fn main() {
}
#[derive(Default)]
struct Plugin {
streams: BTreeMap<String, VirtualStream>,
actions_init: Vec<VirtualActionInit>,
}
struct Plugin {}
impl PluginInfo for Plugin {
async fn manifest(&mut self) -> Result<Manifest, rtc::CallError> {
@ -31,75 +28,69 @@ impl PluginInfo for Plugin {
})
}
async fn stream_impl(
async fn load_config(
&mut self,
stream_name: String,
stream_type: String,
config: Value,
) -> RemoteResult<StreamImpl> {
if stream_type != "virtual" {
return Err("This plugin can't handle other stream types than virtual".into());
}
streams: Vec<StreamConfig>,
actions: Vec<ActionConfig>,
) -> RemoteResult<(Vec<StreamImpl>, Vec<ActionImpl>)> {
let mut ret_streams = Vec::with_capacity(streams.len());
let mut ret_actions = Vec::with_capacity(actions.len());
let (virtual_stream, receiver) = VirtualStream::new(config)?;
let mut local_streams = BTreeMap::new();
if let Some(_) = self.streams.insert(stream_name, virtual_stream) {
return Err("this virtual stream has already been initialized".into());
}
Ok(StreamImpl {
stream: receiver,
standalone: false,
})
}
async fn action_impl(
&mut self,
stream_name: String,
filter_name: String,
action_name: String,
action_type: String,
config: Value,
patterns: Vec<String>,
) -> RemoteResult<ActionImpl> {
if &action_type != "virtual" {
return Err("This plugin can't handle other action types than virtual".into());
}
let (virtual_action_init, tx) =
VirtualActionInit::new(stream_name, filter_name, action_name, config, patterns)?;
self.actions_init.push(virtual_action_init);
Ok(ActionImpl { tx })
}
async fn finish_setup(&mut self) -> RemoteResult<()> {
while let Some(action_init) = self.actions_init.pop() {
match self.streams.get(&action_init.to) {
Some(virtual_stream) => {
let virtual_stream = virtual_stream.clone();
tokio::spawn(async move {
VirtualAction::from(action_init, virtual_stream)
.serve()
.await
});
}
None => {
return Err(format!(
"action {}.{}.{}: send \"{}\" matches no stream name",
action_init.stream_name,
action_init.filter_name,
action_init.action_name,
action_init.to
)
.into());
}
for StreamConfig {
stream_name,
stream_type,
config,
} in streams
{
if stream_type != "virtual" {
return Err("This plugin can't handle other stream types than virtual".into());
}
}
// Free containers
self.streams = Default::default();
self.actions_init = Default::default();
let (virtual_stream, receiver) = VirtualStream::new(config)?;
if let Some(_) = local_streams.insert(stream_name, virtual_stream) {
return Err("this virtual stream has already been initialized".into());
}
ret_streams.push(StreamImpl {
stream: receiver,
standalone: false,
});
}
for ActionConfig {
stream_name,
filter_name,
action_name,
action_type,
config,
patterns,
} in actions
{
if &action_type != "virtual" {
return Err("This plugin can't handle other action types than virtual".into());
}
let (mut virtual_action, tx) = VirtualAction::new(
stream_name,
filter_name,
action_name,
config,
patterns,
&local_streams,
)?;
tokio::spawn(async move { virtual_action.serve().await });
ret_actions.push(ActionImpl { tx });
}
Ok((ret_streams, ret_actions))
}
async fn start(&mut self) -> RemoteResult<()> {
Ok(())
}
@ -140,44 +131,6 @@ struct ActionOptions {
to: String,
}
struct VirtualActionInit {
stream_name: String,
filter_name: String,
action_name: String,
rx: mpsc::Receiver<Exec>,
patterns: Vec<String>,
send: String,
to: String,
}
impl VirtualActionInit {
fn new(
stream_name: String,
filter_name: String,
action_name: String,
config: Value,
patterns: Vec<String>,
) -> Result<(Self, mpsc::Sender<Exec>), String> {
let options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| {
format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}")
})?;
let (tx, rx) = mpsc::channel(1);
Ok((
Self {
stream_name,
filter_name,
action_name,
rx,
patterns,
send: options.send,
to: options.to,
},
tx,
))
}
}
struct VirtualAction {
rx: mpsc::Receiver<Exec>,
send: PatternLine,
@ -185,13 +138,36 @@ struct VirtualAction {
}
impl VirtualAction {
fn from(action_init: VirtualActionInit, to: VirtualStream) -> VirtualAction {
let send = PatternLine::new(action_init.send, action_init.patterns);
VirtualAction {
rx: action_init.rx,
send,
to,
}
fn new(
stream_name: String,
filter_name: String,
action_name: String,
config: Value,
patterns: Vec<String>,
streams: &BTreeMap<String, VirtualStream>,
) -> Result<(Self, mpsc::Sender<Exec>), String> {
let options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| {
format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}")
})?;
let send = PatternLine::new(options.send, patterns);
let stream = streams.get(&options.to).ok_or_else(|| {
format!(
"action {}.{}.{}: send \"{}\" matches no stream name",
stream_name, filter_name, action_name, options.to
)
})?;
let (tx, rx) = mpsc::channel(1);
Ok((
Self {
rx,
send: send,
to: stream.clone(),
},
tx,
))
}
async fn serve(&mut self) {

View file

@ -1,6 +1,6 @@
use std::time::{SystemTime, UNIX_EPOCH};
use reaction_plugin::{Exec, PluginInfo, Value};
use reaction_plugin::{ActionConfig, Exec, PluginInfo, StreamConfig, Value};
use serde_json::json;
use crate::Plugin;
@ -10,26 +10,42 @@ async fn conf_stream() {
// Invalid type
assert!(
Plugin::default()
.stream_impl("stream".into(), "virtu".into(), Value::Null)
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "virtu".into(),
config: Value::Null
}],
vec![]
)
.await
.is_err()
);
assert!(
Plugin::default()
.stream_impl("stream".into(), "virtual".into(), Value::Null)
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "virtual".into(),
config: Value::Null
}],
vec![]
)
.await
.is_ok()
);
eprintln!(
"err: {:?}",
Plugin::default()
.stream_impl("stream".into(), "virtual".into(), json!({}).into())
.await
);
assert!(
Plugin::default()
.stream_impl("stream".into(), "virtual".into(), json!({}).into())
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "virtual".into(),
config: json!({}).into(),
}],
vec![]
)
.await
.is_ok()
);
@ -37,10 +53,13 @@ async fn conf_stream() {
// Invalid conf: must be empty
assert!(
Plugin::default()
.stream_impl(
"stream".into(),
"virtual".into(),
json!({"key": "value" }).into()
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "virtual".into(),
config: json!({"key": "value" }).into(),
}],
vec![]
)
.await
.is_err()
@ -49,6 +68,12 @@ async fn conf_stream() {
#[tokio::test]
async fn conf_action() {
let streams = vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "virtual".into(),
config: Value::Null,
}];
let valid_conf = json!({ "send": "message", "to": "stream" });
let missing_send_conf = json!({ "to": "stream" });
@ -60,26 +85,32 @@ async fn conf_action() {
// Invalid type
assert!(
Plugin::default()
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"virtu".into(),
Value::Null,
patterns.clone()
.load_config(
streams.clone(),
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "virtu".into(),
config: Value::Null,
patterns: patterns.clone(),
}]
)
.await
.is_err()
);
assert!(
Plugin::default()
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"virtual".into(),
valid_conf.into(),
patterns.clone()
.load_config(
streams.clone(),
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "virtual".into(),
config: valid_conf.into(),
patterns: patterns.clone()
}]
)
.await
.is_ok()
@ -88,13 +119,16 @@ async fn conf_action() {
for conf in [missing_send_conf, missing_to_conf, extra_attr_conf] {
assert!(
Plugin::default()
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"virtual".into(),
conf.clone().into(),
patterns.clone()
.load_config(
streams.clone(),
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "virtual".into(),
config: conf.clone().into(),
patterns: patterns.clone()
}]
)
.await
.is_err(),
@ -107,42 +141,48 @@ async fn conf_action() {
#[tokio::test]
async fn conf_send() {
// Valid to: option
let mut plugin = Plugin::default();
plugin
.stream_impl("stream".into(), "virtual".into(), Value::Null)
.await
.unwrap();
plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"virtual".into(),
json!({ "send": "message", "to": "stream" }).into(),
Vec::default(),
)
.await
.unwrap();
assert!(plugin.finish_setup().await.is_ok());
assert!(
Plugin::default()
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "virtual".into(),
config: Value::Null,
}],
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "virtual".into(),
config: json!({ "send": "message", "to": "stream" }).into(),
patterns: vec![],
}]
)
.await
.is_ok(),
);
// Invalid to: option
let mut plugin = Plugin::default();
plugin
.stream_impl("stream".into(), "virtual".into(), Value::Null)
.await
.unwrap();
plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"virtual".into(),
json!({ "send": "message", "to": "stream1" }).into(),
Vec::default(),
)
.await
.unwrap();
assert!(plugin.finish_setup().await.is_err());
assert!(
Plugin::default()
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "virtual".into(),
config: Value::Null,
}],
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "virtual".into(),
config: json!({ "send": "message", "to": "stream1" }).into(),
patterns: vec![],
}]
)
.await
.is_err(),
);
}
// Let's allow empty streams for now.
@ -150,35 +190,46 @@ async fn conf_send() {
//
// #[tokio::test]
// async fn conf_empty_stream() {
// let mut plugin = Plugin::default();
// plugin
// .stream_impl("stream".into(), "virtual".into(), Value::Null)
// .await
// .unwrap();
// assert!(plugin.finish_setup().await.is_err());
// assert!(
// Plugin::default()
// .load_config(
// vec![StreamConfig {
// stream_name: "stream".into(),
// stream_type: "virtual".into(),
// config: Value::Null,
// }],
// vec![],
// )
// .await
// .is_err(),
// );
// }
#[tokio::test]
async fn run_simple() {
let mut plugin = Plugin::default();
let mut stream = plugin
.stream_impl("stream".into(), "virtual".into(), Value::Null)
.await
.unwrap();
assert!(!stream.standalone);
let action = plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"virtual".into(),
json!({ "send": "message <test>", "to": "stream" }).into(),
vec!["test".into()],
let (mut streams, mut actions) = plugin
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "virtual".into(),
config: Value::Null,
}],
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "virtual".into(),
config: json!({ "send": "message <test>", "to": "stream" }).into(),
patterns: vec!["test".into()],
}],
)
.await
.unwrap();
assert!(plugin.finish_setup().await.is_ok());
let mut stream = streams.pop().unwrap();
let action = actions.pop().unwrap();
assert!(!stream.standalone);
for m in ["test1", "test2", "test3", " a a a aa a a"] {
let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
@ -202,37 +253,40 @@ async fn run_simple() {
#[tokio::test]
async fn run_two_actions() {
let mut plugin = Plugin::default();
let mut stream = plugin
.stream_impl("stream".into(), "virtual".into(), Value::Null)
let (mut streams, mut actions) = plugin
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "virtual".into(),
config: Value::Null,
}],
vec![
ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "virtual".into(),
config: json!({ "send": "send <a>", "to": "stream" }).into(),
patterns: vec!["a".into(), "b".into()],
},
ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "virtual".into(),
config: json!({ "send": "<b> send", "to": "stream" }).into(),
patterns: vec!["a".into(), "b".into()],
},
],
)
.await
.unwrap();
let mut stream = streams.pop().unwrap();
assert!(!stream.standalone);
let action1 = plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"virtual".into(),
json!({ "send": "send <a>", "to": "stream" }).into(),
vec!["a".into(), "b".into()],
)
.await
.unwrap();
let action2 = plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"virtual".into(),
json!({ "send": "<b> send", "to": "stream" }).into(),
vec!["a".into(), "b".into()],
)
.await
.unwrap();
assert!(plugin.finish_setup().await.is_ok());
let action2 = actions.pop().unwrap();
let action1 = actions.pop().unwrap();
let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();