diff --git a/rust/cloud-storage/email_service/src/pubsub/backfill/increment_counters.rs b/rust/cloud-storage/email_service/src/pubsub/backfill/increment_counters.rs index b3da8175a7..c9fde06859 100644 --- a/rust/cloud-storage/email_service/src/pubsub/backfill/increment_counters.rs +++ b/rust/cloud-storage/email_service/src/pubsub/backfill/increment_counters.rs @@ -98,7 +98,7 @@ pub async fn incr_completed_messages( /// performs actions when all threads and messages have been backfilled for the user. #[tracing::instrument(skip(ctx))] -async fn handle_job_completed( +pub(super) async fn handle_job_completed( ctx: &PubSubContext, link: &Link, job_id: Uuid, diff --git a/rust/cloud-storage/email_service/src/pubsub/backfill/init.rs b/rust/cloud-storage/email_service/src/pubsub/backfill/init.rs index d1756e35d1..2467a65b0a 100644 --- a/rust/cloud-storage/email_service/src/pubsub/backfill/init.rs +++ b/rust/cloud-storage/email_service/src/pubsub/backfill/init.rs @@ -1,3 +1,4 @@ +use super::increment_counters; use crate::pubsub::context::PubSubContext; use crate::pubsub::util::{CheckGmailRateLimitArgs, check_gmail_rate_limit}; use crate::util::process_pre_insert::sync_labels::sync_labels; @@ -91,6 +92,14 @@ pub async fn init_backfill( }) })?; + // With no threads to list, no per-thread message is ever enqueued, so the + // per-thread completion path that finalizes the job never runs. Complete + // the job directly instead. + if total_threads == 0 { + increment_counters::handle_job_completed(ctx, link, scope.job_id).await?; + return Ok(()); + } + ctx.redis_client .init_backfill_job_progress(scope.job_id, total_threads) .await diff --git a/rust/cloud-storage/email_service/src/pubsub/backfill/list_threads.rs b/rust/cloud-storage/email_service/src/pubsub/backfill/list_threads.rs index db9aa4671d..fd9295e122 100644 --- a/rust/cloud-storage/email_service/src/pubsub/backfill/list_threads.rs +++ b/rust/cloud-storage/email_service/src/pubsub/backfill/list_threads.rs @@ -1,3 +1,4 @@ +use super::increment_counters; use crate::pubsub::context::PubSubContext; use crate::pubsub::util::{CheckGmailRateLimitArgs, check_gmail_rate_limit}; use models_email::email::service::backfill::{ @@ -63,12 +64,13 @@ pub async fn list_threads( // pass along token if it exists for fetching next batch of thread_ids let next_page_token = thread_list.next_page_token.clone(); + let threads = thread_list.threads; // add the threads we just discovered to the job counter email_db_client::backfill::job::update::update_job_threads_retrieved_count( &ctx.db, scope.job_id, - thread_list.threads.len() as i32, + threads.len() as i32, ) .await .map_err(|e| { @@ -79,7 +81,7 @@ pub async fn list_threads( })?; // send a pubsub message for each discovered thread - for thread in thread_list.threads { + for thread in &threads { let thread_sqs_msg = BackfillPubsubMessage { backfill_operation: BackfillOperation::BackfillThread(JobScopedPayload { link_id: scope.link_id, @@ -120,6 +122,18 @@ pub async fn list_threads( source: e.context("Failed to enqueue list threads message".to_string()), }) })?; + } else if threads.is_empty() && threads_retrieved_count == 0 { + // A truly empty mailbox is completed in init_backfill (total_threads + // is 0 there). Reaching here means the profile reported threads but + // the listing returned none, so no per-thread message was ever + // enqueued and the per-thread completion path will never finalize the + // job. Complete it here. + increment_counters::handle_job_completed(ctx, link, scope.job_id).await?; + let _ = ctx + .redis_client + .delete_backfill_job_progress(scope.job_id) + .await + .inspect_err(|e| tracing::error!(error = ?e, "Failed to delete backfill job progress")); } Ok(())