diff --git a/src/slot_recovery.rs b/src/slot_recovery.rs index e823784..cac9d61 100644 --- a/src/slot_recovery.rs +++ b/src/slot_recovery.rs @@ -27,11 +27,14 @@ use crate::types::SlotName; /// Checks if an error indicates a replication slot has been invalidated. /// /// Postgres returns error code 55000 (OBJECT_NOT_IN_PREREQUISITE_STATE) with the message -/// "can no longer get changes from replication slot" when a slot is invalidated. +/// "can no longer get changes from replication slot" or "cannot read from logical replication slot", or the +/// detail "has been invalidated because it exceeded the maximum reserved size", when a slot is invalidated. #[must_use] pub fn is_slot_invalidation_error(error: &EtlError) -> bool { let msg = error.to_string().to_lowercase(); msg.contains("can no longer get changes from replication slot") + || msg.contains("cannot read from logical replication slot") + || msg.contains("has been invalidated because it exceeded the maximum reserved size") } /// Handles recovery from an invalidated replication slot. @@ -180,6 +183,24 @@ mod tests { assert!(is_slot_invalidation_error(&error)); } + #[test] + fn test_is_slot_invalidation_error_logical_slot_read_message() { + let error = etl::etl_error!( + etl::error::ErrorKind::InvalidState, + "db error: ERROR: cannot read from logical replication slot \"test_slot\"" + ); + assert!(is_slot_invalidation_error(&error)); + } + + #[test] + fn test_is_slot_invalidation_error_maximum_reserved_size_message() { + let error = etl::etl_error!( + etl::error::ErrorKind::InvalidState, + "DETAIL: This slot has been invalidated because it exceeded the maximum reserved size" + ); + assert!(is_slot_invalidation_error(&error)); + } + #[test] fn test_is_slot_invalidation_error_no_match() { let error = etl::etl_error!(etl::error::ErrorKind::InvalidState, "connection refused"); diff --git a/tests/slot_recovery_tests.rs b/tests/slot_recovery_tests.rs index cd7806f..960e00b 100644 --- a/tests/slot_recovery_tests.rs +++ b/tests/slot_recovery_tests.rs @@ -12,15 +12,15 @@ use std::time::Duration; use 
etl::store::both::postgres::PostgresStore; use postgres_stream::migrations::migrate_etl; use postgres_stream::sink::memory::MemorySink; -use postgres_stream::slot_recovery::handle_slot_recovery; +use postgres_stream::slot_recovery::{handle_slot_recovery, is_slot_invalidation_error}; use postgres_stream::stream::PgStream; use postgres_stream::test_utils::{ TestDatabase, acquire_exclusive_test_lock, test_stream_config_with_id, unique_pipeline_id, }; -/// Test that a pipeline fails when attempting to use an invalidated slot. +/// Test that startup with an inactive, invalidated slot still triggers the recovery flow. #[tokio::test(flavor = "multi_thread")] -async fn test_pipeline_fails_on_invalidated_slot() { +async fn test_start_with_inactive_invalidated_slot_triggers_recovery() { // Acquire exclusive lock since we modify system settings let _lock = acquire_exclusive_test_lock().await; @@ -81,6 +81,19 @@ async fn test_pipeline_fails_on_invalidated_slot() { assert!(slot_exists, "Slot should persist after graceful shutdown"); + // Verify slot is inactive before invalidation and restart. 
+ let slot_active: bool = sqlx::query_scalar(&format!( + "select active from pg_replication_slots where slot_name = '{slot_name}'" + )) + .fetch_one(&db.pool) + .await + .unwrap(); + + assert!( + !slot_active, + "Slot should be inactive after pipeline shutdown" + ); + // Step 4: Invalidate the slot by generating WAL sqlx::query("alter system set max_slot_wal_keep_size = '1MB'") .execute(&db.pool) @@ -123,18 +136,22 @@ async fn test_pipeline_fails_on_invalidated_slot() { .unwrap(); sqlx::query("checkpoint").execute(&db.pool).await.unwrap(); - // Verify slot is now invalidated - let wal_status: String = sqlx::query_scalar(&format!( - "select wal_status from pg_replication_slots where slot_name = '{slot_name}'" + // Verify slot is now invalidated and still inactive + let slot_state: (String, bool) = sqlx::query_as(&format!( + "select wal_status, active from pg_replication_slots where slot_name = '{slot_name}'" )) .fetch_one(&db.pool) .await .unwrap(); assert_eq!( - wal_status, "lost", + slot_state.0, "lost", "Slot should be invalidated (wal_status=lost)" ); + assert!( + !slot_state.1, + "Slot should still be inactive when restart begins" + ); // Verify confirmed_flush_lsn is preserved for recovery let confirmed_lsn: Option = sqlx::query_scalar(&format!( @@ -172,8 +189,8 @@ async fn test_pipeline_fails_on_invalidated_slot() { .await .unwrap(); - // Assert the expected behavior - match start_result { + // Assert startup fails with a slot invalidation error. 
+ let slot_error = match start_result { Ok(()) => { // If start succeeded, wait a bit and check if it fails during operation tokio::time::sleep(Duration::from_secs(1)).await; @@ -190,13 +207,7 @@ async fn test_pipeline_fails_on_invalidated_slot() { "Pipeline should have failed due to invalidated slot, but completed successfully" ); } - Ok(Err(e)) => { - let error_msg = e.to_string().to_lowercase(); - assert!( - error_msg.contains("slot") || error_msg.contains("replication"), - "Expected slot-related error, got: {e}" - ); - } + Ok(Err(e)) => e, Err(_) => { // Timeout - pipeline is still running, need to shut it down let _ = shutdown_tx.shutdown(); @@ -206,17 +217,30 @@ async fn test_pipeline_fails_on_invalidated_slot() { } } } - Err(e) => { - let error_msg = e.to_string().to_lowercase(); - // This is expected - the pipeline should fail because the slot is invalidated - assert!( - error_msg.contains("slot") - || error_msg.contains("replication") - || error_msg.contains("can no longer"), - "Expected slot-related error, got: {e}" - ); - } - } + Err(e) => e, + }; + + assert!( + is_slot_invalidation_error(&slot_error), + "Expected slot invalidation error, got: {slot_error}" + ); + + // Simulate the startup recovery loop trigger path from core.rs. + handle_slot_recovery(&db.pool, pipeline_id) + .await + .expect("Slot recovery should succeed for inactive invalidated slot"); + + let old_slot_dropped: bool = sqlx::query_scalar(&format!( + "SELECT NOT EXISTS(SELECT 1 FROM pg_replication_slots WHERE slot_name = '{slot_name}')" + )) + .fetch_one(&db.pool) + .await + .unwrap(); + + assert!( + old_slot_dropped, + "Recovery should drop invalidated inactive slot" + ); } /// Test that handle_slot_recovery sets up failover checkpoint and allows pipeline restart.