Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/slot_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ use crate::types::SlotName;
/// Checks if an error indicates a replication slot has been invalidated.
///
/// Postgres returns error code 55000 (OBJECT_NOT_IN_PREREQUISITE_STATE) with the message
/// "can no longer get changes from replication slot" when a slot is invalidated.
/// "can no longer get changes from replication slot" or
/// "cannot read from logical replication slot" when a slot is invalidated.
#[must_use]
pub fn is_slot_invalidation_error(error: &EtlError) -> bool {
let msg = error.to_string().to_lowercase();
msg.contains("can no longer get changes from replication slot")
|| msg.contains("cannot read from logical replication slot")
|| msg.contains("has been invalidated because it exceeded the maximum reserved size")
}

/// Handles recovery from an invalidated replication slot.
Expand Down Expand Up @@ -180,6 +183,24 @@ mod tests {
assert!(is_slot_invalidation_error(&error));
}

#[test]
fn test_is_slot_invalidation_error_logical_slot_read_message() {
let error = etl::etl_error!(
etl::error::ErrorKind::InvalidState,
"db error: ERROR: cannot read from logical replication slot \"test_slot\""
);
assert!(is_slot_invalidation_error(&error));
}

#[test]
fn test_is_slot_invalidation_error_maximum_reserved_size_message() {
let error = etl::etl_error!(
etl::error::ErrorKind::InvalidState,
"DETAIL: This slot has been invalidated because it exceeded the maximum reserved size"
);
assert!(is_slot_invalidation_error(&error));
}

#[test]
fn test_is_slot_invalidation_error_no_match() {
let error = etl::etl_error!(etl::error::ErrorKind::InvalidState, "connection refused");
Expand Down
78 changes: 51 additions & 27 deletions tests/slot_recovery_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use std::time::Duration;
use etl::store::both::postgres::PostgresStore;
use postgres_stream::migrations::migrate_etl;
use postgres_stream::sink::memory::MemorySink;
use postgres_stream::slot_recovery::handle_slot_recovery;
use postgres_stream::slot_recovery::{handle_slot_recovery, is_slot_invalidation_error};
use postgres_stream::stream::PgStream;
use postgres_stream::test_utils::{
TestDatabase, acquire_exclusive_test_lock, test_stream_config_with_id, unique_pipeline_id,
};

/// Test that a pipeline fails when attempting to use an invalidated slot.
/// Test startup with an inactive, invalidated slot still triggers recovery flow.
#[tokio::test(flavor = "multi_thread")]
async fn test_pipeline_fails_on_invalidated_slot() {
async fn test_start_with_inactive_invalidated_slot_triggers_recovery() {
// Acquire exclusive lock since we modify system settings
let _lock = acquire_exclusive_test_lock().await;

Expand Down Expand Up @@ -81,6 +81,19 @@ async fn test_pipeline_fails_on_invalidated_slot() {

assert!(slot_exists, "Slot should persist after graceful shutdown");

// Verify slot is inactive before invalidation and restart.
let slot_active: bool = sqlx::query_scalar(&format!(
"select active from pg_replication_slots where slot_name = '{slot_name}'"
))
.fetch_one(&db.pool)
.await
.unwrap();

assert!(
!slot_active,
"Slot should be inactive after pipeline shutdown"
);

// Step 4: Invalidate the slot by generating WAL
sqlx::query("alter system set max_slot_wal_keep_size = '1MB'")
.execute(&db.pool)
Expand Down Expand Up @@ -123,18 +136,22 @@ async fn test_pipeline_fails_on_invalidated_slot() {
.unwrap();
sqlx::query("checkpoint").execute(&db.pool).await.unwrap();

// Verify slot is now invalidated
let wal_status: String = sqlx::query_scalar(&format!(
"select wal_status from pg_replication_slots where slot_name = '{slot_name}'"
// Verify slot is now invalidated and still inactive
let slot_state: (String, bool) = sqlx::query_as(&format!(
"select wal_status, active from pg_replication_slots where slot_name = '{slot_name}'"
))
.fetch_one(&db.pool)
.await
.unwrap();

assert_eq!(
wal_status, "lost",
slot_state.0, "lost",
"Slot should be invalidated (wal_status=lost)"
);
assert!(
!slot_state.1,
"Slot should still be inactive when restart begins"
);

// Verify confirmed_flush_lsn is preserved for recovery
let confirmed_lsn: Option<String> = sqlx::query_scalar(&format!(
Expand Down Expand Up @@ -172,8 +189,8 @@ async fn test_pipeline_fails_on_invalidated_slot() {
.await
.unwrap();

// Assert the expected behavior
match start_result {
// Assert startup fails with a slot invalidation error.
let slot_error = match start_result {
Ok(()) => {
// If start succeeded, wait a bit and check if it fails during operation
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -190,13 +207,7 @@ async fn test_pipeline_fails_on_invalidated_slot() {
"Pipeline should have failed due to invalidated slot, but completed successfully"
);
}
Ok(Err(e)) => {
let error_msg = e.to_string().to_lowercase();
assert!(
error_msg.contains("slot") || error_msg.contains("replication"),
"Expected slot-related error, got: {e}"
);
}
Ok(Err(e)) => e,
Err(_) => {
// Timeout - pipeline is still running, need to shut it down
let _ = shutdown_tx.shutdown();
Expand All @@ -206,17 +217,30 @@ async fn test_pipeline_fails_on_invalidated_slot() {
}
}
}
Err(e) => {
let error_msg = e.to_string().to_lowercase();
// This is expected - the pipeline should fail because the slot is invalidated
assert!(
error_msg.contains("slot")
|| error_msg.contains("replication")
|| error_msg.contains("can no longer"),
"Expected slot-related error, got: {e}"
);
}
}
Err(e) => e,
};

assert!(
is_slot_invalidation_error(&slot_error),
"Expected slot invalidation error, got: {slot_error}"
);

// Simulate the startup recovery loop trigger path from core.rs.
handle_slot_recovery(&db.pool, pipeline_id)
.await
.expect("Slot recovery should succeed for inactive invalidated slot");

let old_slot_dropped: bool = sqlx::query_scalar(&format!(
"SELECT NOT EXISTS(SELECT 1 FROM pg_replication_slots WHERE slot_name = '{slot_name}')"
))
.fetch_one(&db.pool)
.await
.unwrap();

assert!(
old_slot_dropped,
"Recovery should drop invalidated inactive slot"
);
}

/// Test that handle_slot_recovery sets up failover checkpoint and allows pipeline restart.
Expand Down