Fix #124: discard invalid utf8 sequences from input streams

This commit is contained in:
Baptiste Careil 2025-06-21 17:13:04 +02:00 committed by ppom
commit d12a61c14a
2 changed files with 71 additions and 7 deletions

View file

@ -3,7 +3,7 @@ use std::{collections::HashMap, process::Stdio, task::Poll, time::Duration};
use chrono::Local;
use futures::{FutureExt, StreamExt};
use tokio::{
io::{AsyncBufReadExt, BufReader, Lines},
io::{AsyncBufReadExt, BufReader},
pin,
process::{Child, ChildStderr, ChildStdout, Command},
time::sleep,
@ -17,16 +17,52 @@ use crate::{
use super::shutdown::ShutdownToken;
/** Converts bytes to string, discarding invalid utf8 sequences
*/
fn to_string(data: &[u8]) -> String {
let res = String::from_utf8_lossy(data);
res.to_string()
.replace(std::char::REPLACEMENT_CHARACTER, "")
}
#[allow(clippy::type_complexity)]
fn lines_to_stream<T: tokio::io::AsyncBufRead + Unpin>(
mut lines: Lines<T>,
fn lines_to_stream<T: tokio::io::AsyncRead + Unpin>(
mut lines: BufReader<T>,
) -> futures::stream::PollFn<
impl FnMut(&mut std::task::Context) -> Poll<Option<Result<String, std::io::Error>>>,
> {
let mut at_eof = false;
let mut buffer = vec![];
futures::stream::poll_fn(move |cx| {
let nl = lines.next_line();
if at_eof {
// reached EOF earlier, avoid calling read again
return Poll::Ready(None);
}
// Try to read until LF or EOF. If interrupted, buffer might contain data, in which case
// new data will be happened to it
let nl = lines.read_until(0x0a, &mut buffer);
pin!(nl);
futures::Future::poll(nl, cx).map(Result::transpose)
match futures::Future::poll(nl, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(0)) => {
if buffer.is_empty() {
// at eof
Poll::Ready(None)
} else {
// reached eof with data in the buffer
at_eof = true;
let line = to_string(&buffer);
buffer.clear();
Poll::Ready(Some(Ok(line)))
}
}
Poll::Ready(Ok(_)) => {
let line = to_string(&buffer);
buffer.clear();
Poll::Ready(Some(Ok(line)))
}
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
}
})
}
@ -119,8 +155,8 @@ async fn handle_io(
child_stderr: ChildStderr,
filter_managers: HashMap<&'static Filter, FilterManager>,
) {
let lines_stdout = lines_to_stream(BufReader::new(child_stdout).lines());
let lines_stderr = lines_to_stream(BufReader::new(child_stderr).lines());
let lines_stdout = lines_to_stream(BufReader::new(child_stdout));
let lines_stderr = lines_to_stream(BufReader::new(child_stderr));
// aggregate outputs, will end when both streams end
let mut lines = futures::stream::select(lines_stdout, lines_stderr);

View file

@ -0,0 +1,28 @@
/* Test that non-utf8 characters are stripped from the commands' output
*
* ASCII characters such as \x1b, \x05 are kept as is.
*/
{
patterns: {
id: {
regex: @'.+',
},
},
streams: {
binary: {
cmd: ['sh', '-c', 'for n in 123 456 987; do printf "\\n\\x1b$n\\xe2 \\x05"; sleep 0.5; done; printf "\\n"; sleep 0.2'],
filters: {
filt1: {
regex: [
@'<id>',
],
actions: {
act: {
cmd: ['echo', 'received "<id>"'],
},
},
},
},
},
},
}