From e8fdc861ce923b66d0037914793332b10183aee0 Mon Sep 17 00:00:00 2001 From: Elkin Cruz Date: Thu, 7 May 2026 22:34:56 +0000 Subject: [PATCH 1/2] fix: Group API submitted patches by submission ID Patches submitted via the API were being split into multiple incomplete patchsets due to timestamp differences and distinct authors. This change adds a submission ID and a message source to the RawMboxSubmitted event. The API passes its synthetic ID (or, for fetched threads, the requested message ID) as the submission ID, and the main application uses it as the article ID to group all patches in the batch into a single thread. It also relaxes strict author checking for multi-patch API submissions to allow patches from different authors in the same series; single-patch submissions remain strict. Signed-off-by: Elkin Cruz --- src/api.rs | 23 ++++++++++++-- src/db.rs | 17 ++-------- src/events.rs | 14 +++++++++ src/fetcher.rs | 7 ++++- src/main.rs | 85 +++++++++++++++++++++++++++++++++++++++----------- 5 files changed, 110 insertions(+), 36 deletions(-) diff --git a/src/api.rs b/src/api.rs index 356d1590..ef30344b 100644 --- a/src/api.rs +++ b/src/api.rs @@ -13,7 +13,7 @@ // limitations under the License. use crate::db::Database; -use crate::events::Event; +use crate::events::{Event, MessageSource}; use crate::fetcher::FetchRequest; use crate::settings::ServerSettings; use axum::{ @@ -305,7 +305,7 @@ fn generate_synthetic_id(prefix: &str) -> String { .expect("Time went backwards"); // e.g. sashiko-local-1715890000-12345 format!( - "sashiko-{}-{}-{}", + "sashiko-{}-{}-{}@sashiko.local", prefix, since_the_epoch.as_secs(), fastrand::u32(..) 
@@ -346,6 +346,8 @@ async fn submit_patch( let event = Event::RawMboxSubmitted { raw, + submission_id: id.clone(), + source: MessageSource::ApiInject, group: "api-submit".to_string(), baseline: base_commit, skip_subjects, @@ -385,7 +387,7 @@ async fn submit_patch( if let Err(e) = state .db .create_fetching_patchset( - &id, + &format!("{}@sashiko.local", id), &format!("Fetching {} from {}...", &sha, repo_display), skip_subjects.as_ref(), only_subjects.as_ref(), @@ -444,6 +446,7 @@ async fn submit_patch( .send(Event::IngestionFailed { article_id: msgid_clone.clone(), error: format!("Failed to fetch thread: {}", e), + source: MessageSource::ApiFetchThread, }) .await; } @@ -487,6 +490,8 @@ async fn fetch_and_inject_thread( let event = Event::RawMboxSubmitted { raw, + submission_id: msgid.to_string(), + source: MessageSource::ApiFetchThread, group: "api-submit".to_string(), baseline: None, skip_subjects: None, @@ -1008,3 +1013,15 @@ async fn rerun_patch( Ok(Json(serde_json::json!({ "status": "accepted" }))) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_generate_synthetic_id_format() { + let id = generate_synthetic_id("test"); + assert!(id.starts_with("sashiko-test-")); + assert!(id.ends_with("@sashiko.local")); + } +} diff --git a/src/db.rs b/src/db.rs index a5ef50aa..494f7b08 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3296,7 +3296,7 @@ impl Database { pub async fn create_fetching_patchset( &self, - article_id: &str, + root_msg_id: &str, subject: &str, skip_filters: Option<&Vec>, only_filters: Option<&Vec>, @@ -3305,13 +3305,7 @@ impl Database { .duration_since(std::time::UNIX_EPOCH)? 
.as_secs() as i64; - let root_msg_id = if article_id.contains('@') { - article_id.to_string() - } else { - format!("{}@sashiko.local", article_id) - }; - - let clid_candidates = vec![article_id.to_string(), root_msg_id.clone()]; + let clid_candidates = vec![root_msg_id.to_string()]; let skip_filters_json = skip_filters.map(|f| serde_json::to_string(f).unwrap_or_default()); let only_filters_json = only_filters.map(|f| serde_json::to_string(f).unwrap_or_default()); @@ -3360,12 +3354,7 @@ impl Database { Err(anyhow::anyhow!("Failed to get patchset ID")) } } - pub async fn update_patchset_error(&self, article_id: &str, error: &str) -> Result<()> { - let root_msg_id = if article_id.contains('@') { - article_id.to_string() - } else { - format!("{}@sashiko.local", article_id) - }; + pub async fn update_patchset_error(&self, root_msg_id: &str, error: &str) -> Result<()> { self.conn .execute( "UPDATE patchsets SET status = 'Failed', failed_reason = ? WHERE cover_letter_message_id = ?", diff --git a/src/events.rs b/src/events.rs index 2b7f1c71..3a96ce67 100644 --- a/src/events.rs +++ b/src/events.rs @@ -14,6 +14,16 @@ use crate::patch::{Patch, PatchsetMetadata}; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MessageSource { + Nntp, + ApiInject, + ApiFetchThread, + GitFetch, + GitImport, + GitArchive, +} + #[derive(Debug)] #[allow(dead_code)] pub enum Event { @@ -39,6 +49,8 @@ pub enum Event { }, RawMboxSubmitted { raw: String, + submission_id: String, + source: MessageSource, group: String, baseline: Option, skip_subjects: Option>, @@ -47,6 +59,7 @@ pub enum Event { IngestionFailed { article_id: String, error: String, + source: MessageSource, }, } @@ -54,6 +67,7 @@ pub enum Event { pub struct ParsedArticle { pub group: String, pub article_id: String, + pub source: MessageSource, pub metadata: Option, pub patch: Option, pub baseline: Option, diff --git a/src/fetcher.rs b/src/fetcher.rs index c24b8d29..fff3fdcb 100644 --- a/src/fetcher.rs +++ b/src/fetcher.rs @@ -12,7 
+12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::events::Event; +use crate::events::{Event, MessageSource}; use crate::utils::redact_secret; use anyhow::{Result, anyhow}; use std::collections::{HashMap, HashSet}; @@ -134,6 +134,7 @@ impl FetchAgent { .send(Event::IngestionFailed { article_id: commit.clone(), error: format!("Failed to set up remote {}: {}", url, e), + source: MessageSource::GitFetch, }) .await; } @@ -155,6 +156,7 @@ impl FetchAgent { .send(Event::IngestionFailed { article_id: commit.clone(), error: format!("Failed to fetch from {}: {}", url, e), + source: MessageSource::GitFetch, }) .await; } @@ -185,6 +187,7 @@ impl FetchAgent { .send(Event::IngestionFailed { article_id: range.clone(), error: format!("Failed to resolve git range: {}", e), + source: MessageSource::GitFetch, }) .await; continue; @@ -225,6 +228,7 @@ impl FetchAgent { .send(Event::IngestionFailed { article_id: commit_or_range.clone(), error: format!("Failed to resolve SHA: {}", e), + source: MessageSource::GitFetch, }) .await; continue; @@ -252,6 +256,7 @@ impl FetchAgent { .send(Event::IngestionFailed { article_id: commit_or_range, error: format!("Failed to extract patch: {}", e), + source: MessageSource::GitFetch, }) .await; } diff --git a/src/main.rs b/src/main.rs index 0bc425b5..105b8a34 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,7 @@ use clap::{Parser, Subcommand}; use sashiko::db::Database; -use sashiko::events::{Event, ParsedArticle}; +use sashiko::events::{Event, ParsedArticle, MessageSource}; use sashiko::ingestor::Ingestor; use sashiko::reviewer::Reviewer; use sashiko::settings::Settings; @@ -235,11 +235,12 @@ async fn main() -> Result<(), Box> { let _permit = permit; // Hold permit until task completion match event { - Event::IngestionFailed { article_id, error } => { + Event::IngestionFailed { article_id, error, source } => { if let Err(e) = tx .send(ParsedArticle { group: 
"error".to_string(), article_id, + source, metadata: None, patch: None, baseline: None, @@ -298,10 +299,17 @@ async fn main() -> Result<(), Box> { part_index: index, }); + let source = if group.starts_with("git-import") { + MessageSource::GitImport + } else { + MessageSource::GitFetch + }; + if let Err(e) = tx .send(ParsedArticle { group, article_id, + source, metadata: Some(metadata), patch, baseline: base_commit, @@ -316,6 +324,8 @@ async fn main() -> Result<(), Box> { } Event::RawMboxSubmitted { raw, + submission_id, + source, group, baseline, skip_subjects, @@ -350,17 +360,14 @@ async fn main() -> Result<(), Box> { match parse_result { Ok(Ok((metadata, patch_opt))) => { - // Override group "api-submit" -> "manual" to avoid synthetic ID logic - let effective_group = if group_clone == "api-submit" { - "manual".to_string() - } else { - group_clone - }; + // Do not override group "api-submit" to allow grouping logic to trigger + let effective_group = group_clone; if let Err(e) = tx_clone .send(ParsedArticle { group: effective_group, - article_id: msg_id, + article_id: submission_id.clone(), + source, metadata: Some(metadata), patch: patch_opt, baseline: baseline_clone, @@ -415,6 +422,7 @@ async fn main() -> Result<(), Box> { .send(ParsedArticle { group, article_id, + source: MessageSource::Nntp, metadata: Some(metadata), patch: patch_opt, baseline, @@ -593,6 +601,7 @@ async fn process_parsed_article( let ParsedArticle { group, article_id, + source, metadata, patch, baseline, @@ -601,10 +610,12 @@ async fn process_parsed_article( only_filters, } = article; + let root_msg_id = resolve_root_msg_id(source, &article_id); + // Handle ingestion failure if let Some(err) = failed_error { info!("Handling ingestion failure for {}: {}", article_id, err); - if let Err(e) = worker_db.update_patchset_error(&article_id, &err).await { + if let Err(e) = worker_db.update_patchset_error(&root_msg_id, &err).await { error!("Failed to update patchset error in DB: {}", e); } return 
ProcessStatus::Ingested; // Successfully handled the failure event @@ -675,12 +686,6 @@ async fn process_parsed_article( } else if group == "git-fetch" || group == "api-submit" { // Group these by article_id (which is the range or single SHA/local_id) // For singletons, the message itself is the root. - let root_msg_id = if metadata.total == 1 { - metadata.message_id.clone() - } else { - format!("{}@sashiko.local", article_id) - }; - match worker_db .ensure_thread_for_message(&root_msg_id, metadata.date) .await @@ -820,7 +825,7 @@ async fn process_parsed_article( ); */ - let root_msg_id = format!("{}@sashiko.local", article_id); + let cover_letter_id = if group == "git-fetch" || group == "api-submit" { if metadata.total == 1 { Some(metadata.message_id.as_str()) @@ -854,7 +859,7 @@ async fn process_parsed_article( metadata.subject.clone(), metadata.author.clone(), metadata.total, - !group.starts_with("git-import"), + is_strict_author(source, metadata.total), ) }; @@ -1105,6 +1110,26 @@ fn calculate_embargo_hours( } } +fn resolve_root_msg_id(source: MessageSource, article_id: &str) -> String { + match source { + MessageSource::Nntp | MessageSource::ApiFetchThread | MessageSource::GitArchive | MessageSource::ApiInject => { + article_id.to_string() + } + MessageSource::GitFetch | MessageSource::GitImport => { + format!("{}@sashiko.local", article_id) + } + } +} + +fn is_strict_author(source: MessageSource, total_parts: u32) -> bool { + match source { + MessageSource::GitImport | MessageSource::GitArchive => false, + MessageSource::ApiInject if total_parts > 1 => false, + MessageSource::ApiFetchThread if total_parts > 1 => false, + _ => true, + } +} + fn identify_subsystems(to: &str, cc: &str) -> Vec<(String, String)> { let mut subsystems = Vec::new(); let mut all_recipients = String::new(); @@ -1312,4 +1337,28 @@ mod tests { 0 ); } + + #[test] + fn test_resolve_root_msg_id() { + assert_eq!(resolve_root_msg_id(MessageSource::Nntp, "foo@bar.com"), "foo@bar.com"); + 
assert_eq!(resolve_root_msg_id(MessageSource::ApiFetchThread, "foo@bar.com"), "foo@bar.com"); + assert_eq!(resolve_root_msg_id(MessageSource::GitArchive, "foo@bar.com"), "foo@bar.com"); + assert_eq!(resolve_root_msg_id(MessageSource::ApiInject, "sashiko-123"), "sashiko-123"); + assert_eq!(resolve_root_msg_id(MessageSource::GitFetch, "abc123_sha"), "abc123_sha@sashiko.local"); + assert_eq!(resolve_root_msg_id(MessageSource::GitImport, "range_a_b"), "range_a_b@sashiko.local"); + } + + #[test] + fn test_is_strict_author() { + assert!(is_strict_author(MessageSource::Nntp, 1)); + assert!(is_strict_author(MessageSource::Nntp, 6)); + assert!(!is_strict_author(MessageSource::ApiFetchThread, 6)); // Lenient for series + assert!(is_strict_author(MessageSource::GitFetch, 6)); + + assert!(!is_strict_author(MessageSource::GitImport, 6)); + assert!(!is_strict_author(MessageSource::GitArchive, 6)); + + assert!(is_strict_author(MessageSource::ApiInject, 1)); // Strict for singleton + assert!(!is_strict_author(MessageSource::ApiInject, 6)); // Lenient for series + } } From 6ce60513660741b2bb358283f1f33db6d885b7bc Mon Sep 17 00:00:00 2001 From: Elkin Cruz Date: Tue, 28 Apr 2026 21:25:54 +0000 Subject: [PATCH 2/2] db: Fix linter issues (cargo clippy and cargo fmt) Removes a redundant reference operator from root_msg_id in the call to ensure_thread_for_message, as it is already a reference, and fixes formatting errors in src/main.rs :) Signed-off-by: Elkin Cruz --- src/db.rs | 2 +- src/main.rs | 46 ++++++++++++++++++++++++++++++++++------------ 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/src/db.rs b/src/db.rs index 494f7b08..fdc3b723 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3337,7 +3337,7 @@ impl Database { } // 2. Ensure a placeholder thread and message exist to satisfy Foreign Key constraints - let thread_id = self.ensure_thread_for_message(&root_msg_id, now).await?; + let thread_id = self.ensure_thread_for_message(root_msg_id, now).await?; // 3. 
Create the fetching patchset let mut rows = self.conn diff --git a/src/main.rs b/src/main.rs index 105b8a34..a1d70bdd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,7 @@ use clap::{Parser, Subcommand}; use sashiko::db::Database; -use sashiko::events::{Event, ParsedArticle, MessageSource}; +use sashiko::events::{Event, MessageSource, ParsedArticle}; use sashiko::ingestor::Ingestor; use sashiko::reviewer::Reviewer; use sashiko::settings::Settings; @@ -235,7 +235,11 @@ async fn main() -> Result<(), Box> { let _permit = permit; // Hold permit until task completion match event { - Event::IngestionFailed { article_id, error, source } => { + Event::IngestionFailed { + article_id, + error, + source, + } => { if let Err(e) = tx .send(ParsedArticle { group: "error".to_string(), @@ -825,7 +829,6 @@ async fn process_parsed_article( ); */ - let cover_letter_id = if group == "git-fetch" || group == "api-submit" { if metadata.total == 1 { Some(metadata.message_id.as_str()) @@ -1112,9 +1115,10 @@ fn calculate_embargo_hours( fn resolve_root_msg_id(source: MessageSource, article_id: &str) -> String { match source { - MessageSource::Nntp | MessageSource::ApiFetchThread | MessageSource::GitArchive | MessageSource::ApiInject => { - article_id.to_string() - } + MessageSource::Nntp + | MessageSource::ApiFetchThread + | MessageSource::GitArchive + | MessageSource::ApiInject => article_id.to_string(), MessageSource::GitFetch | MessageSource::GitImport => { format!("{}@sashiko.local", article_id) } @@ -1340,12 +1344,30 @@ mod tests { #[test] fn test_resolve_root_msg_id() { - assert_eq!(resolve_root_msg_id(MessageSource::Nntp, "foo@bar.com"), "foo@bar.com"); - assert_eq!(resolve_root_msg_id(MessageSource::ApiFetchThread, "foo@bar.com"), "foo@bar.com"); - assert_eq!(resolve_root_msg_id(MessageSource::GitArchive, "foo@bar.com"), "foo@bar.com"); - assert_eq!(resolve_root_msg_id(MessageSource::ApiInject, "sashiko-123"), "sashiko-123"); - 
assert_eq!(resolve_root_msg_id(MessageSource::GitFetch, "abc123_sha"), "abc123_sha@sashiko.local"); - assert_eq!(resolve_root_msg_id(MessageSource::GitImport, "range_a_b"), "range_a_b@sashiko.local"); + assert_eq!( + resolve_root_msg_id(MessageSource::Nntp, "foo@bar.com"), + "foo@bar.com" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::ApiFetchThread, "foo@bar.com"), + "foo@bar.com" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::GitArchive, "foo@bar.com"), + "foo@bar.com" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::ApiInject, "sashiko-123"), + "sashiko-123" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::GitFetch, "abc123_sha"), + "abc123_sha@sashiko.local" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::GitImport, "range_a_b"), + "range_a_b@sashiko.local" + ); } #[test]