From 8b3bde456e5a26aa7e35d60945f97b3dc1355168 Mon Sep 17 00:00:00 2001 From: ppom Date: Sun, 14 Dec 2025 12:00:00 +0100 Subject: [PATCH] cluster: UTC: no need for conversions, as Time already is UTC-aware --- plugins/reaction-plugin-cluster/src/cluster.rs | 1 - plugins/reaction-plugin-cluster/src/connection.rs | 15 +++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/plugins/reaction-plugin-cluster/src/cluster.rs b/plugins/reaction-plugin-cluster/src/cluster.rs index 29a13e6..24904ac 100644 --- a/plugins/reaction-plugin-cluster/src/cluster.rs +++ b/plugins/reaction-plugin-cluster/src/cluster.rs @@ -142,7 +142,6 @@ impl ActionInit { eprintln!("ERROR while queueing message to be sent to own cluster stream: {err}"); } - // TODO to_utc let line = (Arc::new(line), now); for result in join_all(nodes_tx.iter().map(|tx| tx.send(line.clone()))).await { if let Err(err) = result { diff --git a/plugins/reaction-plugin-cluster/src/connection.rs b/plugins/reaction-plugin-cluster/src/connection.rs index c9da512..dbb656b 100644 --- a/plugins/reaction-plugin-cluster/src/connection.rs +++ b/plugins/reaction-plugin-cluster/src/connection.rs @@ -29,8 +29,7 @@ const CLOSE_RECV: (u32, &[u8]) = (1, b"error receiving from your stream"); const CLOSE_CLOSED: (u32, &[u8]) = (2, b"you closed your stream"); const CLOSE_SEND: (u32, &[u8]) = (3, b"could not send a message to your channel so I quit"); -type RemoteLine = (String, Time); -type MaybeRemoteLine = Result, IoError>; +type MaybeRemoteLine = Result, IoError>; enum Event { LocalMessageReceived(Option), @@ -70,7 +69,10 @@ impl OwnConnection { } } - /// Send a line to peer + /// Send a line to peer. + /// + /// Time is a std::time::Duration since UNIX_EPOCH, which is defined as UTC + /// So it's safe to use between nodes using different timezones async fn send_line(&mut self, line: &String, time: &Time) -> Result<(), std::io::Error> { self.line_tx.write_u64(time.as_secs()).await?; self.line_tx.write_u32(time.subsec_nanos()).await?; @@ -82,7 +84,7 @@ impl OwnConnection { /// Cancel-safe function that returns next line from peer /// Returns None if we don't have all data yet. - async fn recv_line(&mut self) -> Result, std::io::Error> { + async fn recv_line(&mut self) -> MaybeRemoteLine { if self.next_time_secs.is_none() { self.next_time_secs = Some(self.line_rx.read_u64().await?); } @@ -387,11 +389,9 @@ impl ConnectionManager { self.close_connection(CLOSE_CLOSED).await; } Ok(Some(line)) => { - // TODO from_utc - let local_time = line.1; if let Err(err) = self .own_cluster_tx - .send((line.0.clone(), local_time.into())) + .send((line.0.clone(), line.1.into())) .await { eprintln!( @@ -428,7 +428,6 @@ impl ConnectionManager { } Some(message) => match &mut self.connection { Some(connection) => { - // TODO to_utc if let Err(err) = connection.send_line(&message.0, &message.1).await { eprintln!( "INFO cluster {}: connection with node {} failed: {err}",