From fd2f2fd9f77ea01dcd8f23296265e53df41432cb Mon Sep 17 00:00:00 2001 From: dkay Date: Fri, 5 Jun 2026 19:16:30 -0500 Subject: [PATCH] sandbox,server: surface per-path L7 escalations as fresh draft chunks Three pieces that together let post-approval L7 denials reach a reviewer instead of vanishing: - sandbox: wire L7 relay denials into the denial aggregator. L7EvalContext gains a denial_tx channel; every L7 deny (request-log and forward paths) emits a DenialEvent carrying the observed method/path, feeding the same observation-driven analysis as connect-stage denials so mechanistic proposals can be path-aware. - server persistence: clear dedup_key when a chunk is decided (sqlite + postgres). New observations for the same host|port|binary surface as a fresh pending chunk instead of silently folding their hit_count into a row the reviewer already acted on. - server: make the post-approval mechanistic self-reject sweep L7-evidence-aware. A resubmit asking for nothing beyond the union of the approved grants for that endpoint still self-rejects (straggler-flush noise suppression); a submission carrying method/path asks OUTSIDE the approved grants is the agent asking for more than was granted and stays pending for review. Path coverage uses a conservative glob matcher (* = one segment, ** only as trailing segment, unknown shapes fall back to exact equality) so ambiguity errs toward surfacing a card rather than swallowing an escalation. Co-Authored-By: Claude Opus 4.8 --- crates/openshell-sandbox/src/l7/graphql.rs | 1 + crates/openshell-sandbox/src/l7/relay.rs | 81 +++++ crates/openshell-sandbox/src/l7/websocket.rs | 1 + crates/openshell-sandbox/src/proxy.rs | 5 + crates/openshell-server/src/grpc/policy.rs | 322 +++++++++++++++++- .../src/persistence/postgres.rs | 7 +- .../src/persistence/sqlite.rs | 7 +- 7 files changed, 410 insertions(+), 14 deletions(-) diff --git a/crates/openshell-sandbox/src/l7/graphql.rs b/crates/openshell-sandbox/src/l7/graphql.rs index 2ff502d1c..20a457fe9 100644 --- a/crates/openshell-sandbox/src/l7/graphql.rs +++ b/crates/openshell-sandbox/src/l7/graphql.rs @@ -802,6 +802,7 @@ network_policies: cmdline_paths: Vec::new(), secret_resolver: None, activity_tx: None, + denial_tx: None, }; let request_info = crate::l7::L7RequestInfo { action: req.action, diff --git a/crates/openshell-sandbox/src/l7/relay.rs b/crates/openshell-sandbox/src/l7/relay.rs index 9efa7ca9f..c60de3462 100644 --- a/crates/openshell-sandbox/src/l7/relay.rs +++ b/crates/openshell-sandbox/src/l7/relay.rs @@ -8,6 +8,7 @@ //! and either forwards or denies the request. use crate::activity_aggregator::{ActivitySender, try_record_activity}; +use crate::denial_aggregator::DenialEvent; use crate::l7::provider::{L7Provider, RelayOutcome}; use crate::l7::rest::WebSocketExtensionMode; use crate::l7::{EnforcementMode, L7EndpointConfig, L7Protocol, L7RequestInfo}; @@ -40,6 +41,10 @@ pub struct L7EvalContext { pub(crate) secret_resolver: Option>, /// Anonymous activity counter channel. pub(crate) activity_tx: Option, + /// Denial aggregator channel. L7 request denials feed the same + /// observation-driven policy analysis as connect-stage denials, carrying + /// the observed method/path so proposals can be path-aware. + pub(crate) denial_tx: Option>, } #[derive(Default)] @@ -453,6 +458,28 @@ fn emit_l7_request_log( .build(); ocsf_emit!(event); emit_activity(ctx, decision_str == "deny", "l7_policy"); + if decision_str == "deny" { + emit_l7_denial(ctx, request_info, redacted_target, reason); + } +} + +/// Feed an L7 request denial to the denial aggregator (if configured) so the +/// observation-driven analysis can propose path-aware rules. The target is +/// already redacted (no query string / credentials), matching what the OCSF +/// log records. +fn emit_l7_denial(ctx: &L7EvalContext, request_info: &L7RequestInfo, path: &str, reason: &str) { + if let Some(tx) = &ctx.denial_tx { + let _ = tx.send(DenialEvent { + host: ctx.host.clone(), + port: ctx.port, + binary: ctx.binary_path.clone(), + ancestors: ctx.ancestors.clone(), + deny_reason: reason.to_string(), + denial_stage: "l7".to_string(), + l7_method: Some(request_info.action.clone()), + l7_path: Some(path.to_string()), + }); + } } fn emit_activity(ctx: &L7EvalContext, denied: bool, deny_group: &'static str) { @@ -763,6 +790,9 @@ where )) .build(); ocsf_emit!(event); + if decision_str == "deny" { + emit_l7_denial(ctx, &request_info, &redacted_target, &reason); + } } // Store the resolved target for the deny response redaction @@ -1309,6 +1339,51 @@ mod tests { const TEST_POLICY: &str = include_str!("../../data/sandbox-policy.rego"); + /// An L7 deny must feed the denial aggregator with the observed + /// method/path so observation-driven analysis can propose path-aware + /// rules; allows must not. + #[test] + fn l7_deny_emits_denial_event_with_method_and_path() { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let ctx = L7EvalContext { + host: "api.example.test".into(), + port: 443, + policy_name: "rest_api".into(), + binary_path: "/usr/bin/gh".into(), + ancestors: vec!["/bin/bash".into()], + cmdline_paths: vec![], + secret_resolver: None, + activity_tx: None, + denial_tx: Some(tx), + }; + let request = L7RequestInfo { + action: "GET".into(), + target: "/user".into(), + query_params: std::collections::HashMap::new(), + graphql: None, + }; + + emit_l7_request_log( + &ctx, + &request, + "/user", + "deny", + "l7", + "GET /user not permitted by policy", + None, + ); + let event = rx.try_recv().expect("deny must emit a denial event"); + assert_eq!(event.host, "api.example.test"); + assert_eq!(event.port, 443); + assert_eq!(event.binary, "/usr/bin/gh"); + assert_eq!(event.denial_stage, "l7"); + assert_eq!(event.l7_method.as_deref(), Some("GET")); + assert_eq!(event.l7_path.as_deref(), Some("/user")); + + emit_l7_request_log(&ctx, &request, "/user", "allow", "l7", "", None); + assert!(rx.try_recv().is_err(), "allow must not emit a denial event"); + } + #[test] fn parse_rejection_detail_adds_l7_hint_for_encoded_slash() { let detail = parse_rejection_detail( @@ -1383,6 +1458,7 @@ network_policies: cmdline_paths: vec![], secret_resolver: None, activity_tx: None, + denial_tx: None, }; let request = L7RequestInfo { action: "WEBSOCKET_TEXT".into(), @@ -1439,6 +1515,7 @@ network_policies: cmdline_paths: vec![], secret_resolver: None, activity_tx: None, + denial_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -1544,6 +1621,7 @@ network_policies: cmdline_paths: vec![], secret_resolver: resolver.map(Arc::new), activity_tx: None, + denial_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -1662,6 +1740,7 @@ network_policies: cmdline_paths: vec![], secret_resolver: resolver.map(Arc::new), activity_tx: None, + denial_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -1833,6 +1912,7 @@ network_policies: cmdline_paths: vec![], secret_resolver: None, activity_tx: None, + denial_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -1921,6 +2001,7 @@ network_policies: cmdline_paths: vec![], secret_resolver: None, activity_tx: None, + denial_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); diff --git a/crates/openshell-sandbox/src/l7/websocket.rs b/crates/openshell-sandbox/src/l7/websocket.rs index 89a6e6c51..3ace2b0fc 100644 --- a/crates/openshell-sandbox/src/l7/websocket.rs +++ b/crates/openshell-sandbox/src/l7/websocket.rs @@ -1271,6 +1271,7 @@ network_policies: cmdline_paths: vec![], secret_resolver: None, activity_tx: None, + denial_tx: None, }; let (mut client_write, mut relay_read) = tokio::io::duplex(MAX_TEXT_MESSAGE_BYTES + 1024); let (mut relay_write, mut upstream_read) = tokio::io::duplex(MAX_TEXT_MESSAGE_BYTES + 1024); diff --git a/crates/openshell-sandbox/src/proxy.rs b/crates/openshell-sandbox/src/proxy.rs index ae100d734..3bf035fa3 100644 --- a/crates/openshell-sandbox/src/proxy.rs +++ b/crates/openshell-sandbox/src/proxy.rs @@ -953,6 +953,7 @@ async fn handle_tcp_connection( .collect(), secret_resolver: secret_resolver.clone(), activity_tx: activity_tx.clone(), + denial_tx: denial_tx.clone(), }; if effective_tls_skip { @@ -3120,6 +3121,7 @@ async fn handle_forward_proxy( .collect(), secret_resolver: secret_resolver.clone(), activity_tx: activity_tx.cloned(), + denial_tx: denial_tx.cloned(), }; let mut l7_activity_pending = false; @@ -4177,6 +4179,7 @@ mod tests { cmdline_paths: vec![], secret_resolver: None, activity_tx: None, + denial_tx: None, }; (config, tunnel_engine, ctx) } @@ -4343,6 +4346,7 @@ mod tests { cmdline_paths: vec![], secret_resolver: resolver, activity_tx: None, + denial_tx: None, }; let query_params = std::collections::HashMap::new(); @@ -4384,6 +4388,7 @@ mod tests { cmdline_paths: vec![], secret_resolver: None, activity_tx: None, + denial_tx: None, }; let query_params = std::collections::HashMap::new(); let config = websocket_l7_config(crate::l7::L7Protocol::Rest, false); diff --git a/crates/openshell-server/src/grpc/policy.rs b/crates/openshell-server/src/grpc/policy.rs index 380671f10..402429e71 100644 --- a/crates/openshell-server/src/grpc/policy.rs +++ b/crates/openshell-server/src/grpc/policy.rs @@ -758,9 +758,15 @@ async fn supersede_other_pending_chunks_for_endpoint( } /// If the just-submitted mechanistic chunk targets a `(host, port, binary)` -/// already covered by an approved `agent_authored` chunk, auto-reject the -/// mechanistic chunk on arrival. The agent has already handled this access -/// decision; the mechanistic draft would only add approval-queue noise. +/// already covered by an approved chunk, auto-reject the mechanistic chunk +/// on arrival — but only when the approved grants actually cover what it +/// asks for. A connect-level resubmit (no L7 asks) is the classic +/// straggler-flush noise case: denials recorded just before an approval +/// hot-reloaded, flushed just after. Rejecting those keeps the approval +/// queue clean. A submission carrying L7 method/path evidence OUTSIDE the +/// union of the approved chunks' allow rules is different in kind: it is +/// the agent asking for MORE than was granted, and it must surface as a +/// fresh pending chunk for review, never vanish. /// /// `agent_authored` submissions are NEVER self-rejected — that path remains /// open for refinement. Only the mechanistic side is asymmetric. @@ -771,6 +777,7 @@ async fn self_reject_mechanistic_if_already_covered( host: &str, port: i32, binary: &str, + proposed_rule: &NetworkPolicyRule, ) { if host.is_empty() || port == 0 || binary.is_empty() { return; @@ -792,18 +799,46 @@ async fn self_reject_mechanistic_if_already_covered( } }; - // If any approved chunk for this sandbox already targets the same - // (host, port, binary), the mechanistic submission is redundant. - let covered_by = approved + let covering: Vec<_> = approved .iter() - .find(|c| c.host == host && c.port == port && c.binary == binary); - let Some(covering) = covered_by else { + .filter(|c| c.host == host && c.port == port && c.binary == binary) + .collect(); + if covering.is_empty() { return; - }; + } + + // Compare asks against the union of every approved grant for this + // endpoint, not just the first match: a ghost resubmit of the SECOND + // approval must be recognized as covered even though the first approved + // chunk knows nothing about its paths. + let new_asks = l7_asks(proposed_rule); + if !new_asks.is_empty() { + let granted: Vec<(String, String)> = covering + .iter() + .filter_map(|c| NetworkPolicyRule::decode(c.proposed_rule.as_slice()).ok()) + .flat_map(|r| l7_asks(&r)) + .collect(); + let uncovered = new_asks.iter().any(|(method, path)| { + !granted + .iter() + .any(|(gm, gp)| l7_method_covers(gm, method) && l7_path_covers(gp, path)) + }); + if uncovered { + info!( + sandbox_id = %sandbox_id, + chunk_id = %new_chunk_id, + host = %host, + port = port, + binary = %binary, + "Mechanistic chunk carries L7 evidence beyond the approved grant; kept pending for review" + ); + return; + } + } let reason = format!( - "already covered by approved chunk {} (agent_authored or prior auto-approval)", - covering.id + "already covered by approved chunk {} (no method/path evidence beyond the existing grant)", + covering[0].id ); match state .store @@ -819,7 +854,7 @@ async fn self_reject_mechanistic_if_already_covered( info!( sandbox_id = %sandbox_id, chunk_id = %new_chunk_id, - covering_chunk = %covering.id, + covering_chunk = %covering[0].id, host = %host, port = port, binary = %binary, @@ -836,6 +871,49 @@ async fn self_reject_mechanistic_if_already_covered( } } +/// Extract the L7 `(method, path)` asks from a proposed rule's allow rules. +/// Empty for a connect-level rule (no L7 inspection evidence). +fn l7_asks(rule: &NetworkPolicyRule) -> Vec<(String, String)> { + rule.endpoints + .iter() + .flat_map(|ep| ep.rules.iter()) + .filter_map(|r| r.allow.as_ref()) + .filter(|a| !a.method.is_empty() || !a.path.is_empty()) + .map(|a| (a.method.clone(), a.path.clone())) + .collect() +} + +/// Does a granted method pattern cover an asked method? +fn l7_method_covers(granted: &str, asked: &str) -> bool { + granted == "*" || granted.eq_ignore_ascii_case(asked) +} + +/// Does a granted path glob cover an asked path? +/// +/// Segment-wise: `*` matches exactly one segment, `**` matches any +/// remainder but is only honoured as the FINAL segment — the only form the +/// mechanistic mapper and provider profiles emit. Any unrecognized pattern +/// shape falls back to exact string equality, which errs toward surfacing a +/// reviewable card rather than silently swallowing an escalation. +fn l7_path_covers(granted: &str, asked: &str) -> bool { + if granted == "**" || granted == asked { + return true; + } + let g: Vec<&str> = granted.split('/').collect(); + let a: Vec<&str> = asked.split('/').collect(); + // `**` anywhere but the tail is an unsupported shape; exact equality + // already failed above, so treat as not covered. + if g[..g.len().saturating_sub(1)].contains(&"**") { + return false; + } + if g.last() == Some(&"**") { + let prefix = &g[..g.len() - 1]; + return a.len() >= prefix.len() + && prefix.iter().zip(&a).all(|(gs, asg)| *gs == "*" || gs == asg); + } + g.len() == a.len() && g.iter().zip(&a).all(|(gs, asg)| *gs == "*" || gs == asg) +} + /// Internally approve a chunk on the auto-approval path: merge into the /// active policy, flip status to "approved", notify watchers, and emit a /// `CONFIG:APPROVED` audit event carrying `auto=true`, `source=`, @@ -2354,6 +2432,7 @@ pub(super) async fn handle_submit_policy_analysis( &record.host, record.port, &record.binary, + chunk.proposed_rule.as_ref().expect("checked above"), ) .await; } @@ -7566,6 +7645,225 @@ mod tests { ); } + /// Build a test sandbox + a submit closure for the post-decision tests. + /// The closure submits one mechanistic chunk with the given rule and + /// returns the response. + async fn setup_post_decision_sandbox( + sandbox_id: &str, + sandbox_name: &str, + ) -> Arc { + use openshell_core::proto::{SandboxPhase, SandboxSpec}; + + let state = test_server_state().await; + let mut sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: sandbox_id.to_string(), + name: sandbox_name.to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + resource_version: 0, + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + ..Default::default() + }; + sandbox.set_phase(SandboxPhase::Ready as i32); + state.store.put_message(&sandbox).await.unwrap(); + state + } + + /// Connect-level rule for `/usr/bin/curl -> api.example.com:443`, with + /// optional L7 method/path allow rules attached to the endpoint. + fn post_decision_rule(l7: &[(&str, &str)]) -> NetworkPolicyRule { + use openshell_core::proto::{L7Allow, NetworkBinary, NetworkEndpoint}; + + NetworkPolicyRule { + name: "allow_example".to_string(), + endpoints: vec![NetworkEndpoint { + host: "api.example.com".to_string(), + port: 443, + rules: l7 + .iter() + .map(|(method, path)| L7Rule { + allow: Some(L7Allow { + method: (*method).to_string(), + path: (*path).to_string(), + ..Default::default() + }), + }) + .collect(), + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + } + } + + async fn submit_mechanistic( + state: &Arc, + sandbox_name: &str, + rule: NetworkPolicyRule, + ) -> SubmitPolicyAnalysisResponse { + handle_submit_policy_analysis( + state, + with_user(Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.to_string(), + analysis_mode: "mechanistic".to_string(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_example".to_string(), + proposed_rule: Some(rule), + ..Default::default() + }], + ..Default::default() + })), + ) + .await + .unwrap() + .into_inner() + } + + /// A decided chunk must stop absorbing new mechanistic submissions for + /// the same endpoint. Once the reviewer approves the connect-level + /// proposal, later denials for that host|port|binary carrying L7 + /// method/path evidence beyond the approved grant must surface as a + /// fresh PENDING chunk for review — not fold their hit_count invisibly + /// into the decided row (dedup_key cleared on decision), and not be + /// swallowed by the post-approval self-reject sweep (which only fires + /// when the approved grants cover the new asks). + #[tokio::test] + async fn mechanistic_submit_after_decision_creates_fresh_pending_chunk() { + let sandbox_name = "mechanistic-post-decision"; + let state = setup_post_decision_sandbox("sb-mech-post-decision", sandbox_name).await; + + let first = submit_mechanistic(&state, sandbox_name, post_decision_rule(&[])).await; + assert_eq!(first.accepted_chunk_ids.len(), 1); + let first_id = first.accepted_chunk_ids[0].clone(); + + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.to_string(), + chunk_id: first_id.clone(), + }), + ) + .await + .unwrap(); + + // Post-approval L7 denial evidence: the agent asked for a write the + // connect-level approval never granted. + let second = submit_mechanistic( + &state, + sandbox_name, + post_decision_rule(&[("POST", "/repos/*/issues")]), + ) + .await; + assert_eq!(second.accepted_chunk_ids.len(), 1); + let second_id = second.accepted_chunk_ids[0].clone(); + assert_ne!( + first_id, second_id, + "a submit after the decision must create a fresh chunk, not fold into the approved one" + ); + + let pending = handle_get_draft_policy( + &state, + with_user(Request::new(GetDraftPolicyRequest { + name: sandbox_name.to_string(), + status_filter: "pending".to_string(), + })), + ) + .await + .unwrap() + .into_inner(); + assert_eq!( + pending.chunks.len(), + 1, + "the post-decision submit with uncovered L7 evidence must be reviewable as a pending chunk" + ); + assert_eq!(pending.chunks[0].id, second_id); + } + + /// The flip side of the previous test: a post-approval mechanistic + /// resubmit that asks for NOTHING beyond the approved grant is straggler + /// noise (denials recorded just before the approval hot-reloaded, + /// flushed just after) and must be self-rejected — no ghost card for the + /// reviewer. Covers both the identical connect-level resubmit and an L7 + /// ask that is a subset of what was already approved. + #[tokio::test] + async fn mechanistic_resubmit_covered_by_approved_grant_self_rejects() { + let sandbox_name = "mechanistic-covered-resubmit"; + let state = setup_post_decision_sandbox("sb-mech-covered", sandbox_name).await; + + let first = submit_mechanistic( + &state, + sandbox_name, + post_decision_rule(&[("GET", "/user")]), + ) + .await; + let first_id = first.accepted_chunk_ids[0].clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.to_string(), + chunk_id: first_id, + }), + ) + .await + .unwrap(); + + // Identical L7 ask (subset of the grant) and a connect-level + // resubmit: both are covered, both must self-reject. + for rule in [ + post_decision_rule(&[("GET", "/user")]), + post_decision_rule(&[]), + ] { + submit_mechanistic(&state, sandbox_name, rule).await; + } + + let pending = handle_get_draft_policy( + &state, + with_user(Request::new(GetDraftPolicyRequest { + name: sandbox_name.to_string(), + status_filter: "pending".to_string(), + })), + ) + .await + .unwrap() + .into_inner(); + assert!( + pending.chunks.is_empty(), + "covered resubmits must self-reject, not pile up as ghost cards: {:?}", + pending.chunks + ); + } + + /// Glob coverage semantics for the self-reject sweep: `*` is one + /// segment, `**` only as the trailing segment, unknown shapes fall back + /// to exact equality (conservative: surface a card rather than swallow). + #[test] + fn l7_path_covers_glob_semantics() { + // Exact and universal. + assert!(l7_path_covers("/user", "/user")); + assert!(l7_path_covers("**", "/anything/at/all")); + // Single-segment wildcard. + assert!(l7_path_covers("/repos/*/issues", "/repos/myorg/issues")); + assert!(!l7_path_covers("/repos/*/issues", "/repos/myorg/pulls")); + assert!(!l7_path_covers("/repos/*/issues", "/repos/a/b/issues")); + // Trailing `**` covers any remainder, including generalized asks. + assert!(l7_path_covers("/v1/models/**", "/v1/models/abc123")); + assert!(l7_path_covers("/v1/models/**", "/v1/models/*")); + assert!(l7_path_covers("/v1/**", "/v1/models/abc/def")); + assert!(!l7_path_covers("/v1/models/**", "/v2/models/abc")); + // Mid-pattern `**` is an unsupported shape: exact match only. + assert!(!l7_path_covers("/a/**/z", "/a/b/z")); + assert!(l7_path_covers("/a/**/z", "/a/**/z")); + // A grant for a specific id does not cover a generalized ask. + assert!(!l7_path_covers("/v1/models/abc", "/v1/models/*")); + } + /// Undo of an approve must clear any `rejection_reason` left over from a /// prior reject. Without this, the in-sandbox agent reading chunks via /// `policy.local` cannot tell "pending and never rejected" from "pending diff --git a/crates/openshell-server/src/persistence/postgres.rs b/crates/openshell-server/src/persistence/postgres.rs index 529bc38be..8487dcaeb 100644 --- a/crates/openshell-server/src/persistence/postgres.rs +++ b/crates/openshell-server/src/persistence/postgres.rs @@ -736,10 +736,15 @@ ORDER BY created_at_ms DESC } let payload = draft_chunk_payload_from_record(&record)?; + // Clear the dedup target once a chunk is decided: new observations for + // the same host|port|binary must surface as a fresh pending chunk + // (possibly carrying new L7 evidence) instead of silently folding + // their hit_count into a row the reviewer already acted on. let result = sqlx::query( r" UPDATE objects -SET status = $3, payload = $4, updated_at_ms = $5 +SET status = $3, payload = $4, updated_at_ms = $5, + dedup_key = CASE WHEN $3 = 'pending' THEN dedup_key ELSE NULL END WHERE object_type = $1 AND id = $2 ", ) diff --git a/crates/openshell-server/src/persistence/sqlite.rs b/crates/openshell-server/src/persistence/sqlite.rs index 1958b3232..b85d986b8 100644 --- a/crates/openshell-server/src/persistence/sqlite.rs +++ b/crates/openshell-server/src/persistence/sqlite.rs @@ -758,10 +758,15 @@ ORDER BY "created_at_ms" DESC } let payload = draft_chunk_payload_from_record(&record)?; + // Clear the dedup target once a chunk is decided: new observations for + // the same host|port|binary must surface as a fresh pending chunk + // (possibly carrying new L7 evidence) instead of silently folding + // their hit_count into a row the reviewer already acted on. let result = sqlx::query( r#" UPDATE "objects" -SET "status" = ?3, "payload" = ?4, "updated_at_ms" = ?5 +SET "status" = ?3, "payload" = ?4, "updated_at_ms" = ?5, + "dedup_key" = CASE WHEN ?3 = 'pending' THEN "dedup_key" ELSE NULL END WHERE "object_type" = ?1 AND "id" = ?2 "#, )