plugin: Remove action oneshot response

This commit is contained in:
ppom 2025-12-07 12:00:00 +01:00
commit c595552504
No known key found for this signature in database
5 changed files with 1 additions and 33 deletions

View file

@ -141,10 +141,6 @@ impl ActionInit {
eprintln!("ERROR while queueing message to be sent to cluster nodes: {err}");
};
}
if let Err(err) = m.result.send(Ok(())) {
eprintln!("ERROR while responding to reaction action: {err}");
}
}
}
}

View file

@ -197,11 +197,7 @@ impl VirtualAction {
async fn serve(&mut self) {
while let Ok(Some(exec)) = self.rx.recv().await {
let line = self.send.line(exec.match_);
let result = match self.to.tx.send((line, exec.time)).await {
Ok(_) => Ok(()),
Err(err) => Err(format!("{err}")),
};
exec.result.send(result).unwrap();
self.to.tx.send((line, exec.time)).await.unwrap();
}
}
}

View file

@ -1,7 +1,6 @@
use std::time::{SystemTime, UNIX_EPOCH};
use reaction_plugin::{Exec, PluginInfo, Value};
use remoc::rch::oneshot;
use serde_json::json;
use crate::Plugin;
@ -179,14 +178,12 @@ async fn run_simple() {
assert!(plugin.finish_setup().await.is_ok());
for m in ["test1", "test2", "test3", " a a a aa a a"] {
let (tx, rx) = oneshot::channel();
let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
assert!(
action
.tx
.send(Exec {
match_: vec![m.into()],
result: tx,
time,
})
.await
@ -196,7 +193,6 @@ async fn run_simple() {
stream.stream.recv().await.unwrap().unwrap(),
(format!("message {m}"), time),
);
assert!(rx.await.is_ok());
}
}
@ -237,13 +233,11 @@ async fn run_two_actions() {
let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let (tx, rx1) = oneshot::channel();
assert!(
action1
.tx
.send(Exec {
match_: vec!["aa".into(), "bb".into()],
result: tx,
time,
})
.await
@ -254,13 +248,11 @@ async fn run_two_actions() {
("send aa".into(), time),
);
let (tx, rx2) = oneshot::channel();
assert!(
action2
.tx
.send(Exec {
match_: vec!["aa".into(), "bb".into()],
result: tx,
time,
})
.await
@ -270,7 +262,4 @@ async fn run_two_actions() {
stream.stream.recv().await.unwrap().unwrap(),
("bb send".into(), time),
);
assert!(rx1.await.is_ok());
assert!(rx2.await.is_ok());
}

View file

@ -293,7 +293,6 @@ pub struct ActionImpl {
#[derive(Serialize, Deserialize)]
pub struct Exec {
pub match_: Vec<String>,
pub result: rch::oneshot::Sender<Result<(), String>>,
pub time: Duration,
}

View file

@ -416,12 +416,10 @@ fn exec_now(
);
// Sending action
let (response_tx, response_rx) = remoc::rch::oneshot::channel();
if let Err(err) = action_impl
.tx
.send(reaction_plugin::Exec {
match_: m,
result: response_tx,
time: t.into(),
})
.await
@ -429,16 +427,6 @@ fn exec_now(
error!("{action}: communication with plugin failed: {err}");
return;
}
// Receiving response
match response_rx.await {
Ok(Ok(())) => (),
Ok(Err(err)) => error!(
"{action}: run {}: {err}",
action.action_type.clone().unwrap_or_default(),
),
Err(err) => error!("{action}: communication with plugin failed: {err}"),
}
}
None => {
// Wait for semaphore's permission, if it is Some