Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,7 @@ fn run_capture_pipeline(
capture_result?;

if let Some(err) = pipe.aggregator.take_fatal_error() {
return Err(std::io::Error::new(std::io::ErrorKind::Other, err).into());
return Err(std::io::Error::other(err).into());
}

// Print summary
Expand Down
8 changes: 4 additions & 4 deletions src/pipeline/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,10 @@ fn handle_event(
output_sinks: &mut OutputSinks,
mode: DrainMode,
) -> Result<(), std::io::Error> {
if let DrainMode::ShutdownOnly = mode {
if !matches!(event, WorkerEvent::Shutdown(_)) {
return Ok(());
}
if let DrainMode::ShutdownOnly = mode
&& !matches!(event, WorkerEvent::Shutdown(_))
{
return Ok(());
}

match event {
Expand Down
6 changes: 6 additions & 0 deletions src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ pub struct VlanStack {
truncated: bool,
}

impl Default for VlanStack {
fn default() -> Self {
Self::new()
}
}

impl VlanStack {
pub fn new() -> Self {
Self {
Expand Down
36 changes: 13 additions & 23 deletions src/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,9 @@ impl ExpiredFlowSinks {
break;
}
}
if !disable_jsonl {
if let Err(err) = sink.flush() {
tracing::warn!(error = %err, "expired flows jsonl disabled after flush error");
disable_jsonl = true;
}
if !disable_jsonl && let Err(err) = sink.flush() {
tracing::warn!(error = %err, "expired flows jsonl disabled after flush error");
disable_jsonl = true;
}
}
if disable_jsonl {
Expand All @@ -92,11 +90,9 @@ impl ExpiredFlowSinks {
break;
}
}
if !disable_csv {
if let Err(err) = sink.flush() {
tracing::warn!(error = %err, "expired flows csv disabled after flush error");
disable_csv = true;
}
if !disable_csv && let Err(err) = sink.flush() {
tracing::warn!(error = %err, "expired flows csv disabled after flush error");
disable_csv = true;
}
}
if disable_csv {
Expand Down Expand Up @@ -151,11 +147,11 @@ impl OutputSinks {
description: &str,
) -> Result<(), std::io::Error> {
let mut disable_alerts = false;
if let Some(sink) = self.alerts_jsonl.as_mut() {
if let Err(err) = sink.write_alert(ts, kind, description) {
tracing::warn!(error = %err, "alerts jsonl disabled after write error");
disable_alerts = true;
}
if let Some(sink) = self.alerts_jsonl.as_mut()
&& let Err(err) = sink.write_alert(ts, kind, description)
{
tracing::warn!(error = %err, "alerts jsonl disabled after write error");
disable_alerts = true;
}
if disable_alerts {
self.alerts_jsonl = None;
Expand All @@ -171,10 +167,7 @@ impl OutputSinks {
}
}

fn open_optional_jsonl_sink(
path: Option<&Path>,
label: &str,
) -> Option<JsonlSink> {
fn open_optional_jsonl_sink(path: Option<&Path>, label: &str) -> Option<JsonlSink> {
match path {
Some(path) => match JsonlSink::new(path) {
Ok(sink) => Some(sink),
Expand All @@ -192,10 +185,7 @@ fn open_optional_jsonl_sink(
}
}

fn open_optional_csv_sink(
path: Option<&Path>,
label: &str,
) -> Option<ExpiredFlowCsvSink> {
fn open_optional_csv_sink(path: Option<&Path>, label: &str) -> Option<ExpiredFlowCsvSink> {
match path {
Some(path) => match ExpiredFlowCsvSink::new(path) {
Ok(sink) => Some(sink),
Expand Down
11 changes: 7 additions & 4 deletions src/web/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ fn should_serve_spa(path: &str) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use crate::web::messages::StatsTick;
use axum::{
body::Body,
http::{Request, StatusCode, header},
Expand All @@ -601,7 +602,6 @@ mod tests {
use std::time::Duration;
use tokio_tungstenite::tungstenite::Message as WsMessage;
use tower::util::ServiceExt;
use crate::web::messages::StatsTick;

fn test_state(auth: Option<BasicAuthCredentials>) -> Arc<AppState> {
let (broadcast_tx, _) = broadcast::channel::<BroadcastFrame>(16);
Expand Down Expand Up @@ -820,8 +820,7 @@ mod tests {
WsMessage::Text(t) => t,
other => panic!("unexpected: {other:?}"),
};
let hello_json: serde_json::Value =
serde_json::from_str(&hello_text).expect("hello json");
let hello_json: serde_json::Value = serde_json::from_str(&hello_text).expect("hello json");
assert_eq!(hello_json["type"], "hello");

server_handle.abort();
Expand Down Expand Up @@ -933,7 +932,11 @@ mod tests {
.await
.expect("reconnect");

let hello2 = socket2.next().await.expect("hello after reconnect").expect("msg");
let hello2 = socket2
.next()
.await
.expect("hello after reconnect")
.expect("msg");
let hello2_text = match hello2 {
WsMessage::Text(t) => t,
other => panic!("unexpected: {other:?}"),
Expand Down
Loading