use std::{ env, fs::File, io::{IsTerminal, Read, Write}, time::Duration, }; use tempfile::TempDir; use tracing::Level; use reaction::{cli::Format, client::request, daemon::daemon, protocol::Order}; use tokio::time::sleep; fn file_with_contents(path: &str, contents: &str) { let mut file = File::create(path).unwrap(); file.write_all(contents.as_bytes()).unwrap(); } fn config_with_cmd(config_path: &str, cmd: &str) { file_with_contents( config_path, &(" { concurrency: 0, patterns: { ip: { type: 'ip', ipv6mask: 64, }, }, streams: { stream1: { cmd: ['sh', '-c', '" .to_owned() + cmd + "'], filters: { filter1: { regex: ['ip '], retry: 2, retryperiod: '2s', duplicate: 'rerun', actions: { // Don't mix code and data at home! // You may permit arbitrary execution from vilains, // if your regex is permissive enough. // This is OK only for testing purposes. ipv4_1: { cmd: ['sh', '-c', 'echo >> ./ipv4.txt'], ipv4only: true, }, ipv4_2: { cmd: ['sh', '-c', 'echo del >> ./ipv4.txt'], ipv4only: true, after: '30s', onexit: false, }, ipv6_1: { cmd: ['sh', '-c', 'echo >> ./ipv6.txt'], ipv6only: true, }, ipv6_2: { cmd: ['sh', '-c', 'echo del >> ./ipv6.txt'], ipv6only: true, after: '30s', onexit: false, }, all_1: { cmd: ['sh', '-c', 'echo >> ./out.txt'], }, all_2: { cmd: ['sh', '-c', 'echo del >> ./out.txt'], after: '30s', onexit: false, }, } } } } } }"), ); } fn get_file_content(path: &str) -> String { let mut out_txt = File::open(path).unwrap(); let mut contents = String::new(); out_txt.read_to_string(&mut contents).unwrap(); contents } #[tokio::test] async fn ip() { let dir = TempDir::new().unwrap(); env::set_current_dir(&dir).unwrap(); let config_path = "config.jsonnet"; let out_path = "./out.txt"; let ipv4_path = "./ipv4.txt"; let ipv6_path = "./ipv6.txt"; let socket_path = "./reaction.sock"; config_with_cmd( config_path, "for i in 1.2.3.4 204:31::1 5.5.5.5 1.2.3.4 204:31::1 5.5.5.5; do echo ip $i; sleep 0.01; done; sleep 0.15", ); file_with_contents(out_path, ""); file_with_contents(ipv4_path, ""); file_with_contents(ipv6_path, ""); // Set the logger before running any code from the crate tracing_subscriber::fmt::fmt() .without_time() .with_target(false) .with_ansi(std::io::stdout().is_terminal()) .with_max_level(Level::DEBUG) .try_init() .unwrap(); // Run the daemon let handle = tokio::spawn(async move { daemon(config_path.into(), socket_path.into()).await }); // Run the flushes // We sleep for the time the echoes are finished + a bit (100ms) let handle2 = tokio::spawn(async move { sleep(Duration::from_millis(160)).await; request( socket_path.into(), Format::JSON, None, vec![("ip".into(), "1.2.3.4".into())], Order::Flush, ) .await }); let handle3 = tokio::spawn(async move { sleep(Duration::from_millis(180)).await; request( socket_path.into(), Format::JSON, None, vec![("ip".into(), "204:31::/64".into())], Order::Flush, ) .await }); let (daemon_exit, flush1, flush2) = tokio::join!(handle, handle2, handle3); assert!(daemon_exit.is_ok()); assert!(flush1.is_ok()); assert!(flush2.is_ok()); // tokio::time::sleep(Duration::from_secs(100)).await; assert_eq!( get_file_content(out_path).trim(), [ "1.2.3.4", "204:31::/64", "5.5.5.5", "del 1.2.3.4", "del 204:31::/64" ] .join("\n") ); assert_eq!( get_file_content(ipv4_path).trim(), ["1.2.3.4", "5.5.5.5", "del 1.2.3.4"].join("\n") ); assert_eq!( get_file_content(ipv6_path).trim(), ["204:31::/64", "del 204:31::/64"].join("\n") ); }