Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 56 additions & 47 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,57 +1,66 @@
# --- Rust build artifacts ---
/target/
**/*.rs.bk
```
# Compiled and build artifacts
*.o
*.obj
*.exe
*.dll
*.so
*.a
*.out
target/

# --- Dependency lockfile ---
# Keep Cargo.lock tracked for binaries/apps.
# Uncomment the next line only if this repository is intended to be a library crate.
# Cargo.lock
# Dependencies
**/node_modules/
**/venv/
**/.venv/
**/__pycache__/
**/.mypy_cache/
**/.pytest_cache/
**/.gradle/

# --- Local configuration / secrets ---
.env
.env.*
!.env.example
!.env.sample
!.env.template
*.local

# --- Logs and runtime output ---
# Logs and temp files
*.log
logs/
netscope.log

# --- Temporary files ---
/tmp/
*.tmp
*.temp
*.swp
*.swo

# --- Coverage, profiling, and analysis outputs ---
*.profraw
*.profdata
*.gcda
*.gcno
*.lcov
*.coverage
coverage/

# --- Benchmark outputs ---
criterion/
# Environment
.env
.env.local
.env.*

# --- IDE / editor ---
# Editors
.vscode/
!.vscode/extensions.json
!.vscode/settings.json
.idea/
*.iml
*.swp
*.swo

# --- OS files ---
.DS_Store
._*
.Spotlight-V100
.Trashes
# Coverage
coverage/
htmlcov/
.coverage

# --- Misc caches ---
.cache/
.sass-cache/
node_modules/
feature list(internal).md
# Compressed files
*.zip
*.gz
*.tar
*.tgz
*.bz2
*.xz
*.7z
*.rar
*.zst
*.lz4
*.lzh
*.cab
*.arj
*.rpm
*.deb
*.Z
*.lz
*.lzo
*.tar.gz
*.tar.bz2
*.tar.xz
*.tar.zst
```
27 changes: 17 additions & 10 deletions src/pipeline/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,20 @@ impl AggregatorHandle {

/// Take the latest aggregated tick (returns `None` if no new tick since last call).
pub fn take_tick(&self) -> Option<AggregatedTick> {
self.inner.lock().unwrap().latest_tick.take()
self.inner.lock().ok()?.latest_tick.take()
}

/// Current alert count.
pub fn alert_count(&self) -> u64 {
self.inner.lock().unwrap().alert_count
self.inner.lock().ok()?.alert_count
}

/// Collect all final flow snapshots (call after pipeline shutdown).
pub fn take_final_snapshots(&self) -> Vec<FlowSnapshot> {
let mut state = self.inner.lock().unwrap();
let mut state = match self.inner.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
let mut all: Vec<FlowSnapshot> = state
.shard_snapshots
.drain(..)
Expand All @@ -125,7 +128,7 @@ impl AggregatorHandle {

/// Take the fatal error, if any.
pub fn take_fatal_error(&self) -> Option<String> {
self.inner.lock().unwrap().fatal_error.take()
self.inner.lock().ok()?.fatal_error.take()
}
}

Expand All @@ -142,7 +145,11 @@ pub fn run(rx: Receiver<WorkerEvent>, handle: AggregatorHandle, config: Aggregat
running,
} = config;

let num_workers = handle.inner.lock().unwrap().num_workers;
let num_workers = handle
.inner
.lock()
.map(|g| g.num_workers)
.unwrap_or_else(|e| e.into_inner().num_workers);
let frame_seq = AtomicU64::new(0);

// Accumulate partial shard ticks, then merge once all shards have reported.
Expand Down Expand Up @@ -191,7 +198,7 @@ pub fn run(rx: Receiver<WorkerEvent>, handle: AggregatorHandle, config: Aggregat

// Store for CLI consumption.
{
let mut state = handle.inner.lock().unwrap();
let mut state = handle.inner.lock().unwrap_or_else(|e| e.into_inner());
state.latest_tick = Some(merged);
}

Expand Down Expand Up @@ -245,7 +252,7 @@ pub fn run(rx: Receiver<WorkerEvent>, handle: AggregatorHandle, config: Aggregat
}

{
let mut state = handle.inner.lock().unwrap();
let mut state = handle.inner.lock().unwrap_or_else(|e| e.into_inner());
state.latest_tick = Some(merged);
}

Expand Down Expand Up @@ -313,12 +320,12 @@ fn handle_event(

match event {
WorkerEvent::Shutdown(shutdown) => {
let mut state = handle.inner.lock().unwrap();
let mut state = handle.inner.lock().unwrap_or_else(|e| e.into_inner());
record_shutdown_snapshot(&mut state, shutdown);
}
WorkerEvent::Alert(alert) => {
{
let mut state = handle.inner.lock().unwrap();
let mut state = handle.inner.lock().unwrap_or_else(|e| e.into_inner());
state.alert_count += 1;
}
output_sinks.write_alert(alert.ts, &alert.kind, &alert.description)?;
Expand Down Expand Up @@ -387,7 +394,7 @@ fn handle_fatal_error(
context: &str,
) {
tracing::error!(error = %err, context, "fatal pipeline error");
let mut state = handle.inner.lock().unwrap();
let mut state = handle.inner.lock().unwrap_or_else(|e| e.into_inner());
if state.fatal_error.is_none() {
state.fatal_error = Some(format!("{}: {}", context, err));
}
Expand Down
38 changes: 31 additions & 7 deletions src/web/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,20 @@ fn constant_time_eq(left: &[u8], right: &[u8]) -> bool {
}

fn unauthorized_response() -> Response {
Response::builder()
match Response::builder()
.status(StatusCode::UNAUTHORIZED)
.header(header::WWW_AUTHENTICATE, "Basic realm=\"NetScope\"")
.body(axum::body::Body::from("unauthorized"))
.unwrap()
{
Ok(response) => response,
Err(err) => {
tracing::error!(error = %err, "failed to build unauthorized response");
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(axum::body::Body::from("internal error"))
.expect("response builder failed")
}
}
}

async fn handle_ws(mut socket: WebSocket, state: Arc<AppState>) {
Expand Down Expand Up @@ -552,7 +561,10 @@ async fn static_handler(uri: Uri) -> impl IntoResponse {
return Response::builder()
.status(StatusCode::NOT_FOUND)
.body(axum::body::Body::from("not found"))
.unwrap();
.unwrap_or_else(|err| {
tracing::error!(error = %err, "failed to build not found response");
Response::new(axum::body::Body::from("not found"))
});
}

// Try the exact path first, then fall back to index.html (SPA)
Expand All @@ -562,27 +574,39 @@ async fn static_handler(uri: Uri) -> impl IntoResponse {
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, mime.as_ref())
.body(axum::body::Body::from(content.data.to_vec()))
.unwrap()
.unwrap_or_else(|err| {
tracing::error!(error = %err, "failed to build static file response");
Response::new(axum::body::Body::from("internal error"))
})
} else if should_serve_spa(path) {
let content = match Assets::get("index.html") {
Some(content) => content,
None => {
return Response::builder()
.status(StatusCode::NOT_FOUND)
.body(axum::body::Body::from("not found"))
.unwrap();
.unwrap_or_else(|err| {
tracing::error!(error = %err, "failed to build not found response");
Response::new(axum::body::Body::from("not found"))
});
}
};
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "text/html; charset=utf-8")
.body(axum::body::Body::from(content.data.to_vec()))
.unwrap()
.unwrap_or_else(|err| {
tracing::error!(error = %err, "failed to build index.html response");
Response::new(axum::body::Body::from("internal error"))
})
} else {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(axum::body::Body::from("not found"))
.unwrap()
.unwrap_or_else(|err| {
tracing::error!(error = %err, "failed to build not found response");
Response::new(axum::body::Body::from("not found"))
})
}
}

Expand Down
Loading