diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 880e83035d..e8709df3c8 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3747,14 +3747,36 @@ include_dir 'conf.d' are sent to archive storage by setting or . In addition to off, - to disable, there are two modes: on, and - always. During normal operation, there is no - difference between the two modes, but when set to always - the WAL archiver is enabled also during archive recovery or standby - mode. In always mode, all files restored from the archive - or streamed with streaming replication will be archived (again). See - for details. + to disable, there are three modes: on, shared, + and always. During normal operation as a primary, there is no + difference between the three modes, but they differ during archive recovery or + standby mode: + + + + on: Archives WAL only when running as a primary. + + + + + shared: Coordinates archiving between primary and standby. + The standby defers WAL archival and deletion until the primary confirms + archival via streaming replication. This prevents WAL history loss during + standby promotion in high availability setups. Upon promotion, the standby + automatically starts archiving any remaining unarchived WAL. This mode works + with cascading replication, where each standby coordinates with its immediate + upstream server. See for details. + + + + + always: Archives all WAL independently, even during recovery. + All files restored from the archive or streamed with streaming physical + replication will be archived (again), regardless of their source. + + + archive_mode is a separate setting from archive_command and diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index acf3ac0601..fc81deb542 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1385,35 +1385,61 @@ synchronous_standby_names = 'ANY 2 (s1, s2, s3)' - When continuous WAL archiving is used in a standby, there are two - different scenarios: the WAL archive can be shared between the primary - and the standby, or the standby can have its own WAL archive. When - the standby has its own WAL archive, set archive_mode + When continuous WAL archiving is used in a standby, there are three + different scenarios: the standby can have its own independent WAL archive, + the WAL archive can be shared between the primary and standby, or archiving + can be coordinated between them. + + + + For an independent archive, set archive_mode to always, and the standby will call the archive command for every WAL segment it receives, whether it's by restoring - from the archive or by streaming replication. The shared archive can - be handled similarly, but the archive_command or archive_library must - test if the file being archived exists already, and if the existing file - has identical contents. This requires more care in the - archive_command or archive_library, as it must - be careful to not overwrite an existing file with different contents, - but return success if the exactly same file is archived twice. And - all that must be done free of race conditions, if two servers attempt - to archive the same file at the same time. + from the archive or by streaming replication. + + + + For a shared archive where both primary and standby can write, use + always mode as well, but the archive_command + or archive_library must test if the file being archived + exists already, and if the existing file has identical contents. This requires + more care in the archive_command or archive_library, + as it must be careful to not overwrite an existing file with different contents, + but return success if the exactly same file is archived twice. And all that must + be done free of race conditions, if two servers attempt to archive the same file + at the same time. + + + + For coordinated archiving in high availability setups, use + archive_mode=shared. In this mode, only + the primary archives WAL segments. The standby creates .ready + files for received segments but defers actual archiving. The primary periodically + sends archival status updates to the standby via streaming replication, informing + it which segments have been archived. The standby then marks these as archived + and allows them to be recycled. Upon promotion, the standby automatically starts + archiving any remaining WAL segments that weren't confirmed as archived by the + former primary. This prevents WAL history loss during failover while avoiding + the complexity of coordinating concurrent archiving. This mode works with cascading + replication, where each standby coordinates with its immediate upstream server. If archive_mode is set to on, the - archiver is not enabled during recovery or standby mode. If the standby - server is promoted, it will start archiving after the promotion, but - will not archive any WAL or timeline history files that - it did not generate itself. To get a complete - series of WAL files in the archive, you must ensure that all WAL is - archived, before it reaches the standby. This is inherently true with - file-based log shipping, as the standby can only restore files that - are found in the archive, but not if streaming replication is enabled. - When a server is not in recovery mode, there is no difference between - on and always modes. + archiver is not enabled during recovery or standby mode, and this setting + cannot be used on a standby. If a standby with archive_mode + set to on is promoted, it will start archiving after the + promotion, but will not archive any WAL or timeline history files that it did + not generate itself. To get a complete series of WAL files in the archive, you + must ensure that all WAL is archived before it reaches the standby. This is + inherently true with file-based log shipping, as the standby can only restore + files that are found in the archive, but not if streaming replication is enabled. + + + + When a server is not in recovery mode, on, + shared, and always modes all behave + identically, archiving completed WAL segments. diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 948cb1fc70..838be68f51 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -118,6 +118,7 @@ int wal_keep_size_mb = 0; int XLOGbuffers = -1; int XLogArchiveTimeout = 0; int XLogArchiveMode = ARCHIVE_MODE_OFF; +bool ycmdb_shared_archive = false; /* makes archive_mode=on act as shared */ char *XLogArchiveCommand = NULL; bool EnableHotStandby = false; bool fullPageWrites = true; @@ -193,6 +194,7 @@ const struct config_enum_entry archive_mode_options[] = { {"always", ARCHIVE_MODE_ALWAYS, false}, {"on", ARCHIVE_MODE_ON, false}, {"off", ARCHIVE_MODE_OFF, false}, + {"shared", ARCHIVE_MODE_SHARED, false}, {"true", ARCHIVE_MODE_ON, true}, {"false", ARCHIVE_MODE_OFF, true}, {"yes", ARCHIVE_MODE_ON, true}, diff --git a/src/backend/postmaster/pgarch.c b/src/backend/postmaster/pgarch.c index 02f91431f5..42050c3314 100644 --- a/src/backend/postmaster/pgarch.c +++ b/src/backend/postmaster/pgarch.c @@ -381,6 +381,15 @@ pgarch_ArchiverCopyLoop(void) { char xlog[MAX_XFN_CHARS + 1]; + /* + * In shared archive mode during recovery, the archiver doesn't archive + * files. The primary is responsible for archiving, and the walreceiver + * marks files as .done when the primary confirms archival. After + * promotion, the archiver starts working normally. + */ + if (EffectiveArchiveModeIsShared() && RecoveryInProgress()) + return; + /* force directory scan in the first call to pgarch_readyXlog() */ arch_files->arch_files_size = 0; @@ -471,10 +480,10 @@ pgarch_ArchiverCopyLoop(void) continue; } - if (pgarch_archiveXlog(xlog)) - { - /* successful */ - pgarch_archiveDone(xlog); + if (pgarch_archiveXlog(xlog)) + { + /* successful */ + pgarch_archiveDone(xlog); /* * Tell the cumulative stats system about the WAL file that we diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index acda5f68d9..d4900df98e 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -130,6 +130,19 @@ static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]; static StringInfoData reply_message; +/* Last archived WAL segment file reported by the primary */ +static char primary_last_archived[MAX_XFN_CHARS + 1]; +static TimeLineID primary_last_archived_tli = 0; +static XLogSegNo primary_last_archived_segno = 0; + +/* + * Last segment we successfully marked as .done. Used to optimize + * ProcessArchivalReport() by generating expected filenames instead + * of scanning the archive_status directory. + */ +static TimeLineID last_processed_tli = 0; +static XLogSegNo last_processed_segno = 0; + /* Prototypes for private functions */ static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); @@ -143,6 +156,7 @@ static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); +static void ProcessArchivalReport(void); static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now); /* @@ -895,6 +909,30 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) XLogWalRcvSendReply(true, false); break; } + case PqReplMsg_ArchiveStatusReport: + { + /* Check that the filename looks valid */ + if (len >= sizeof(primary_last_archived)) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid archival report message with length %d", + (int) len))); + + memcpy(primary_last_archived, buf, len); + primary_last_archived[len] = '\0'; + + /* Verify it contains only valid characters */ + if (strspn(buf, VALID_XFN_CHARS) != len) + { + primary_last_archived[0] = '\0'; + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("unexpected character in primary's last archived filename"))); + } + + ProcessArchivalReport(); + break; + } default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1073,12 +1111,44 @@ XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli) /* * Create .done file forcibly to prevent the streamed segment from being - * archived later. + * archived later, unless archive_mode is 'always' or 'shared'. + * + * In 'always' mode, the standby archives independently. + * + * In 'shared' mode, we optimize by checking if this segment is already + * covered by the last archival report from the primary. If so, create + * .done directly. Otherwise, create .ready and wait for the next report. */ - if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) - XLogArchiveForceDone(xlogfname); - else + if (XLogArchiveMode == ARCHIVE_MODE_ALWAYS) + { XLogArchiveNotify(xlogfname); + } + else if (EffectiveArchiveModeIsShared()) + { + /* + * In shared mode, check if this segment is already archived on primary. + * If we're on the same timeline and this segment is <= last archived, + * mark it .done immediately. Otherwise create .ready. + * + * We don't check ancestor timeline cases here to avoid reading timeline + * history files on every segment close. ProcessArchivalReport() will + * handle marking ancestor timeline segments as .done when it scans + * the archive_status directory. + */ + if (primary_last_archived_tli == recvFileTLI && + recvSegNo <= primary_last_archived_segno) + { + XLogArchiveForceDone(xlogfname); + } + else + { + XLogArchiveNotify(xlogfname); + } + } + else + { + XLogArchiveForceDone(xlogfname); + } recvFile = -1; } @@ -1255,6 +1325,187 @@ XLogWalRcvSendHSFeedback(bool immed) primary_has_standby_xmin = false; } +/* + * Process archival report from primary. + * + * The primary sends us the last WAL segment it has archived. We scan the + * archive_status directory for .ready files and mark segments on the same + * timeline as .done if they're <= the reported segment. + */ +static void +ProcessArchivalReport(void) +{ + TimeLineID reported_tli; + XLogSegNo reported_segno; + char status_path[MAXPGPATH]; + bool use_direct_check = false; + XLogSegNo start_segno; + + elog(DEBUG2, "received archival report from primary: %s", + primary_last_archived); + + /* Parse the reported WAL filename */ + if (!IsXLogFileName(primary_last_archived)) + { + elog(DEBUG2, "invalid WAL filename in archival report: %s", + primary_last_archived); + return; + } + + XLogFromFileName(primary_last_archived, &reported_tli, &reported_segno, + wal_segment_size); + + /* Remember the last archived segment for XLogWalRcvClose() */ + primary_last_archived_tli = reported_tli; + primary_last_archived_segno = reported_segno; + + /* + * Optimization: If the new report is on the same timeline as the last + * processed segment and moves forward, we can directly check for .ready + * files for segments between last_processed_segno and reported_segno + * instead of scanning the entire archive_status directory. + * + * Fall back to directory scan if: + * - Timeline changed (need to handle ancestor timelines) + * - This is the first report (last_processed_tli == 0) + * - Reported segment is not ahead (nothing new to process) + */ + if (last_processed_tli == reported_tli && + last_processed_tli != 0 && + reported_segno > last_processed_segno) + { + use_direct_check = true; + start_segno = last_processed_segno + 1; + } + + if (use_direct_check) + { + /* + * Direct check: generate filenames for expected segments. + * XLogArchiveForceDone() will handle the case where .ready doesn't + * exist or .done already exists, so no need to stat() first. + */ + XLogSegNo segno; + + for (segno = start_segno; segno <= reported_segno; segno++) + { + char walfile[MAXFNAMELEN]; + + /* Generate WAL filename and mark as archived */ + XLogFileName(walfile, reported_tli, segno, wal_segment_size); + XLogArchiveForceDone(walfile); + elog(DEBUG3, "marked WAL segment %s as archived (primary archived up to %s)", + walfile, primary_last_archived); + + /* Track the last segment we processed */ + last_processed_tli = reported_tli; + last_processed_segno = segno; + } + } + else + { + /* + * Directory scan: needed when timeline changed or first report. + * This handles both same-timeline and ancestor-timeline cases. + */ + DIR *status_dir; + struct dirent *status_de; + List *tli_history = NIL; + + snprintf(status_path, MAXPGPATH, XLOGDIR "/archive_status"); + status_dir = AllocateDir(status_path); + if (status_dir == NULL) + { + elog(DEBUG2, "could not open archive_status directory: %m"); + return; + } + + while ((status_de = ReadDir(status_dir, status_path)) != NULL) + { + char *ready_suffix; + char walfile[MAXPGPATH]; + TimeLineID file_tli; + XLogSegNo file_segno; + + /* Look for .ready files only */ + ready_suffix = strstr(status_de->d_name, ".ready"); + if (ready_suffix == NULL || ready_suffix[6] != '\0') + continue; + + /* Extract WAL filename (remove .ready suffix) */ + strlcpy(walfile, status_de->d_name, ready_suffix - status_de->d_name + 1); + + /* Parse the WAL filename */ + if (!IsXLogFileName(walfile)) + continue; + + XLogFromFileName(walfile, &file_tli, &file_segno, wal_segment_size); + + /* + * Mark as .done if: + * 1. Same timeline and segment <= reported segment, OR + * 2. Ancestor timeline and segment is before the timeline switch point + * + * For ancestor timelines: if primary archived segment X on timeline T, + * then all segments on ancestor timelines before the switch to T must + * have been archived (they're required to reach timeline T). + */ + if (file_tli == reported_tli && file_segno <= reported_segno) + { + /* Same timeline, segment already archived */ + XLogArchiveForceDone(walfile); + elog(DEBUG3, "marked WAL segment %s as archived (primary archived up to %s)", + walfile, primary_last_archived); + } + else if (file_tli != reported_tli) + { + /* + * Different timeline - check if it's an ancestor and if this + * segment is before the timeline switch point. Only read timeline + * history if we haven't already (lazy loading). + * + * Note: Timelines form a tree structure, not a linear sequence, + * so we can't use < or > to compare them. + */ + if (tli_history == NIL) + tli_history = readTimeLineHistory(reported_tli); + + if (tliInHistory(file_tli, tli_history)) + { + XLogRecPtr switchpoint; + XLogSegNo switchpoint_segno; + + /* Get the point where we switched away from this timeline */ + switchpoint = tliSwitchPoint(file_tli, tli_history, NULL); + + /* + * If the segment is at or before the switch point, it must have + * been archived (it's required to reach the reported timeline). + * The segment containing the switch point belongs to the old + * timeline up to the switch point and should be archived. + */ + XLByteToSeg(switchpoint, switchpoint_segno, wal_segment_size); + if (file_segno <= switchpoint_segno) + { + XLogArchiveForceDone(walfile); + elog(DEBUG3, "marked ancestor timeline segment %s as archived (before switch to timeline %u)", + walfile, reported_tli); + } + } + } + } + + FreeDir(status_dir); + + /* + * After a full directory scan following a timeline change, update + * our tracking to the newly reported position for future optimizations. + */ + last_processed_tli = reported_tli; + last_processed_segno = reported_segno; + } +} + /* * Update shared memory status upon receiving a message from primary. * diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 49998f08b1..ab7760ecfb 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -83,6 +83,7 @@ #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/proc.h" +#include "storage/procarray.h" #include "tcop/dest.h" #include "tcop/tcopprot.h" #include "utils/acl.h" @@ -192,6 +193,17 @@ static TimestampTz last_reply_timestamp = 0; /* Have we sent a heartbeat message asking for reply, since last reply? */ static bool waiting_for_ping_response = false; +/* + * Last archived WAL file. This is fetched from pgstat periodically and sent + * to the standby. last_archival_report_timestamp tracks when we last sent + * the report to avoid excessive pgstat access. + */ +static char last_archived_wal[MAX_XFN_CHARS + 1]; +static TimestampTz last_archival_report_timestamp = 0; + +/* Interval for sending archival reports (10 seconds) */ +#define ARCHIVAL_REPORT_INTERVAL 10000 + /* * While streaming WAL in Copy mode, streamingDoneSending is set to true * after we have sent CopyDone. We should not send any more CopyData messages @@ -292,6 +304,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); +static void WalSndArchivalReport(void); static void ProcessRepliesIfAny(void); static void ProcessPendingWrites(void); static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr); @@ -2746,6 +2759,84 @@ ProcessStandbyHSFeedbackMessage(void) } } +/* + * Send archival status report to standby. + * + * This is called periodically during physical replication to inform the + * standby about the last WAL segment archived by the primary. The standby + * can then mark segments up to that point as .done, allowing them to be + * recycled. This prevents WAL loss during standby promotion. + */ +static void +WalSndArchivalReport(void) +{ + PgStat_ArchiverStats *archiver_stats; + TimestampTz now; + char *last_archived; + + /* Only send reports when shared archive is active */ + if (!EffectiveArchiveModeIsShared()) + return; + + /* Only send reports during physical streaming replication, not during backup */ + if (MyWalSnd->kind != REPLICATION_KIND_PHYSICAL) + return; + if (MyWalSnd->state != WALSNDSTATE_CATCHUP && + MyWalSnd->state != WALSNDSTATE_STREAMING) + return; + + /* + * Don't send to temporary replication slots (used by pg_basebackup). + * Connections without slots (regular standbys) are OK. + */ + if (MyReplicationSlot != NULL && + MyReplicationSlot->data.persistency == RS_TEMPORARY) + return; + + now = GetCurrentTimestamp(); + + /* + * Send report at most once per ARCHIVAL_REPORT_INTERVAL (10 seconds). + * This avoids excessive pgstat access. + */ + if (now < TimestampTzPlusMilliseconds(last_archival_report_timestamp, + ARCHIVAL_REPORT_INTERVAL)) + return; + last_archival_report_timestamp = now; + /* + * Get archiver statistics. We use non-blocking access to avoid delaying + * replication if stats collector is slow. If stats are unavailable or + * stale, we'll just try again at the next interval. + */ + archiver_stats = pgstat_fetch_stat_archiver(); + if (archiver_stats == NULL) + return; + + last_archived = archiver_stats->last_archived_wal; + /* + * Only send a report if the last archived WAL has changed. This is both + * an optimization and ensures we don't send empty reports on startup. + */ + if (strcmp(last_archived, last_archived_wal) == 0) + return; + + /* Only send reports for WAL segments, not backup history files or other archived files */ + if (!IsXLogFileName(last_archived)) + return; + + elog(DEBUG2, "sending archival report: %s", last_archived); + + /* Remember what we sent */ + strlcpy(last_archived_wal, last_archived, sizeof(last_archived_wal)); + + /* Construct the message... */ + resetStringInfo(&output_message); + pq_sendbyte(&output_message, PqReplMsg_ArchiveStatusReport); + pq_sendbytes(&output_message, last_archived, strlen(last_archived)); + /* ... and send it wrapped in CopyData */ + pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); +} + /* * Compute how long send/receive loops should sleep. * @@ -4167,6 +4258,9 @@ WalSndKeepaliveIfNecessary(void) if (pq_flush_if_writable() != 0) WalSndShutdown(); } + + /* Send archival status report if needed */ + WalSndArchivalReport(); } /* diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index c25ff52a58..9f7de099aa 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -31,6 +31,7 @@ #include "access/slru.h" #include "access/toast_compression.h" #include "access/twophase.h" +#include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogprefetcher.h" #include "access/xlogrecovery.h" @@ -1190,6 +1191,17 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"ycmdb.shared_archive", PGC_POSTMASTER, WAL_ARCHIVING, + gettext_noop("Makes archive_mode=on behave as shared (for managed service compatibility)."), + gettext_noop("When true, archive_mode=on is treated as archive_mode=shared. Does not affect archive_mode=off or archive_mode=always. Used when control plane cannot configure archive_mode=shared directly."), + GUC_NOT_IN_SAMPLE + }, + &ycmdb_shared_archive, + false, + NULL, NULL, NULL + }, + { {"wal_init_zero", PGC_SUSET, WAL_SETTINGS, gettext_noop("Writes zeroes to new WAL files before first use."), diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 3d8bc98886..c28eeb88e3 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -63,8 +63,18 @@ typedef enum ArchiveMode ARCHIVE_MODE_OFF = 0, /* disabled */ ARCHIVE_MODE_ON, /* enabled while server is running normally */ ARCHIVE_MODE_ALWAYS, /* enabled always (even during recovery) */ + ARCHIVE_MODE_SHARED, /* shared archive between primary and standby */ } ArchiveMode; extern PGDLLIMPORT int XLogArchiveMode; +extern PGDLLIMPORT bool ycmdb_shared_archive; + +/* + * True when shared archive behavior is active: either archive_mode=shared + * or archive_mode=on with ycmdb.shared_archive=true (managed service). + */ +#define EffectiveArchiveModeIsShared() \ + (XLogArchiveMode == ARCHIVE_MODE_SHARED || \ + (XLogArchiveMode == ARCHIVE_MODE_ON && ycmdb_shared_archive)) /* WAL levels */ typedef enum WalLevel diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h index b71add1ec1..20b0871e90 100644 --- a/src/include/libpq/protocol.h +++ b/src/include/libpq/protocol.h @@ -69,6 +69,28 @@ #define PqMsg_Progress 'P' +/* Replication codes sent by the primary (wrapped in CopyData messages). */ + +#define PqReplMsg_ArchiveStatusReport 'a' +#define PqReplMsg_Keepalive 'k' +#define PqReplMsg_PrimaryStatusUpdate 's' +#define PqReplMsg_WALData 'w' + + +/* Replication codes sent by the standby (wrapped in CopyData messages). */ + +#define PqReplMsg_HotStandbyFeedback 'h' +#define PqReplMsg_PrimaryStatusRequest 'p' +#define PqReplMsg_StandbyStatusUpdate 'r' + + +/* Codes used for backups via COPY OUT (wrapped in CopyData messages). */ + +#define PqBackupMsg_Manifest 'm' +#define PqBackupMsg_NewArchive 'n' +#define PqBackupMsg_ProgressReport 'p' + + /* These are the authentication request codes sent by the backend. */ #define AUTH_REQ_OK 0 /* User is authenticated */ diff --git a/src/test/recovery/t/050_archive_shared.pl b/src/test/recovery/t/050_archive_shared.pl new file mode 100644 index 0000000000..397b71ad79 --- /dev/null +++ b/src/test/recovery/t/050_archive_shared.pl @@ -0,0 +1,270 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +# Test archive_mode=shared for coordinated WAL archiving between primary and standby +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; +use File::Path qw(rmtree); + +# Initialize primary node with archiving +my $archive_dir = PostgreSQL::Test::Utils::tempdir(); +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(has_archiving => 1, allows_streaming => 1); +$primary->append_conf('postgresql.conf', " +archive_mode = shared +archive_command = 'cp %p \"$archive_dir\"/%f' +wal_keep_size = 128MB +"); +$primary->start; + +# Create a test table and generate some WAL +$primary->safe_psql('postgres', 'CREATE TABLE test_table (id int, data text);'); +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'data' || i FROM generate_series(1, 500) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'data' || i FROM generate_series(501, 1000) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Wait for archiver to archive segments +$primary->poll_query_until('postgres', + "SELECT archived_count > 0 FROM pg_stat_archiver") + or die "Timed out waiting for archiver to start"; + +my $archived_count = () = glob("$archive_dir/*"); +ok($archived_count > 0, "primary has archived WAL files to shared archive"); +note("Primary archived $archived_count files"); + +# Take backup for standby +my $backup_name = 'standby_backup'; +$primary->backup($backup_name); + +# Exclude possible race condition when backup WAL is last archived +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'data' || i FROM generate_series(501, 1000) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Set up standby with archive_mode=shared +my $standby = PostgreSQL::Test::Cluster->new('standby'); +$standby->init_from_backup($primary, $backup_name, has_streaming => 1); +$standby->append_conf('postgresql.conf', " +archive_mode = shared +archive_command = 'cp %p \"$archive_dir\"/%f' +wal_receiver_status_interval = 1s +"); +$standby->start; + +# Wait for standby to catch up +$primary->wait_for_catchup($standby); + +# Generate more WAL on primary (these are new segments not yet archived) +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'data' || i FROM generate_series(1001, 1500) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'data' || i FROM generate_series(1501, 2000) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Wait for standby to receive the new WAL +$primary->wait_for_catchup($standby); + +# Check that standby has .ready or .done files for the newly received segments. +# Normally they should be .ready (not yet archived by primary), but in rare cases +# the archiver could be very fast and an archive report sent immediately, creating +# .done files instead. Both are correct behavior - the key is that files exist. +my $standby_archive_status = $standby->data_dir . '/pg_wal/archive_status'; +my $status_count = 0; +if (opendir(my $dh, $standby_archive_status)) +{ + my @files = grep { /\.(ready|done)$/ } readdir($dh); + $status_count = scalar(@files); + my $ready_count = scalar(grep { /\.ready$/ } @files); + my $done_count = scalar(grep { /\.done$/ } @files); + note("Standby has $ready_count .ready files and $done_count .done files"); + closedir($dh); +} +cmp_ok($status_count, '>', 0, "standby creates archive status files for received WAL"); + +# Generate more WAL and wait for archiving on primary +my $initial_archived = $primary->safe_psql('postgres', 'SELECT archived_count FROM pg_stat_archiver'); +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'more-data' || i FROM generate_series(2001, 2500) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'more-data2' || i FROM generate_series(2501, 3000) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Wait for primary to archive the new segments +$primary->poll_query_until('postgres', + "SELECT archived_count > $initial_archived FROM pg_stat_archiver") + or die "Timed out waiting for primary to archive new segments"; + +# Wait for standby to catch up (archive status is sent during replication) +$primary->wait_for_catchup($standby); + +# Wait for primary to send archival status updates and standby to process them +# The standby should mark segments as .done after receiving archive status from primary +my $done_count = 0; +for (my $i = 0; $i < $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $done_count = 0; + if (opendir(my $dh, $standby_archive_status)) + { + $done_count = scalar(grep { /\.done$/ } readdir($dh)); + closedir($dh); + } + last if $done_count > 0; + sleep(1); +} +ok($done_count > 0, "standby marked segments as .done after primary's archival report"); +note("Standby has $done_count .done files"); + +############################################################################### +# Test 2: Standby promotion - verify archiver activates +############################################################################### + +# Before promotion, verify archiver is not running on standby (shared mode during recovery) +# In shared mode, the standby's archiver should not be archiving during recovery +my $archived_before = $standby->safe_psql('postgres', + "SELECT archived_count FROM pg_stat_archiver"); +is($archived_before, '0', + "archiver not active on standby before promotion (archived_count=0)"); + +# Verify standby is still in recovery before promoting +my $in_recovery = $standby->safe_psql('postgres', "SELECT pg_is_in_recovery();"); +is($in_recovery, 't', "standby is in recovery before promotion"); + +# Promote the standby +$standby->promote; +$standby->poll_query_until('postgres', "SELECT NOT pg_is_in_recovery();"); + +# Generate WAL on new primary (former standby) +$standby->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'post-promotion' || i FROM generate_series(2001, 2500) i;"); +$standby->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Wait for archiver to activate and archive the new WAL +# Check pg_stat_archiver to verify archiving is happening +$standby->poll_query_until('postgres', + "SELECT archived_count > 0 FROM pg_stat_archiver") + or die "Timed out waiting for promoted standby to start archiving"; +pass("promoted standby started archiving"); + +# Verify data integrity +my $count = $standby->safe_psql('postgres', 'SELECT COUNT(*) FROM test_table;'); +ok($count >= 2500, "promoted standby has all data (got $count rows)"); + +############################################################################### +# Test 3: Cascading replication +############################################################################### + +# Take a backup from the promoted standby (now the new primary) +my $promoted_backup = 'promoted_backup'; +$standby->backup($promoted_backup); + +# Set up second-level standby (cascading from first standby, now promoted) +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup($standby, $promoted_backup, has_streaming => 1); +$standby2->append_conf('postgresql.conf', " +archive_mode = shared +archive_command = 'cp %p \"$archive_dir\"/%f' +wal_receiver_status_interval = 1s +"); +$standby2->start; + +# Generate WAL on promoted standby (now primary for standby2) +my $cascading_archived_before = $standby->safe_psql('postgres', 'SELECT archived_count FROM pg_stat_archiver'); +$standby->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'cascading' || i FROM generate_series(2501, 3000) i;"); +$standby->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Wait for the promoted standby (acting as primary) to archive the new segment +$standby->poll_query_until('postgres', + "SELECT archived_count > $cascading_archived_before FROM pg_stat_archiver") + or die "Timed out waiting for primary to archive segment in cascading test"; + +# Wait for cascading standby to catch up +$standby->wait_for_catchup($standby2); + +# Wait for cascading standby to receive archive status and mark segments as .done +my $standby2_archive_status = $standby2->data_dir . '/pg_wal/archive_status'; +my $standby2_done_count = 0; +for (my $i = 0; $i < $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $standby2_done_count = 0; + if (opendir(my $dh, $standby2_archive_status)) + { + $standby2_done_count = scalar(grep { /\.done$/ } readdir($dh)); + closedir($dh); + } + last if $standby2_done_count > 0; + sleep(1); +} +ok($standby2_done_count > 0, "cascading standby marks segments as .done"); +note("Cascading standby has $standby2_done_count .done files"); + +# Verify cascading standby has all data +my $standby2_count = $standby2->safe_psql('postgres', 'SELECT COUNT(*) FROM test_table;'); +ok($standby2_count >= 3000, "cascading standby has all data (got $standby2_count rows)"); + +############################################################################### +# Test 4: Multiple standbys from same primary +############################################################################### + +# Create third standby from promoted standby (current primary) +my $standby3 = PostgreSQL::Test::Cluster->new('standby3'); +my $backup2 = 'multi_standby_backup'; +$standby->backup($backup2); +$standby3->init_from_backup($standby, $backup2, has_streaming => 1); +$standby3->append_conf('postgresql.conf', " +archive_mode = shared +archive_command = 'cp %p \"$archive_dir\"/%f' +wal_receiver_status_interval = 1s +"); +$standby3->start; + +# Generate WAL and ensure both standbys receive it +my $standby_archived_before = $standby->safe_psql('postgres', 'SELECT archived_count FROM pg_stat_archiver'); +$standby->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'multi' || i FROM generate_series(3001, 3500) i;"); +$standby->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Wait for the promoted standby (acting as primary) to archive the new segment +$standby->poll_query_until('postgres', + "SELECT archived_count > $standby_archived_before FROM pg_stat_archiver") + or die "Timed out waiting for primary to archive segment in multi-standby test"; + +$standby->wait_for_catchup($standby2); +$standby->wait_for_catchup($standby3); + +# Verify both standbys eventually mark segments as .done +my $standby3_archive_status = $standby3->data_dir . '/pg_wal/archive_status'; + +for (my $i = 0; $i < $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $standby2_done_count = 0; + if (opendir(my $dh, $standby2_archive_status)) + { + $standby2_done_count = scalar(grep { /\.done$/ } readdir($dh)); + closedir($dh); + } + last if $standby2_done_count > 0; + sleep(1); +} + +my $standby3_done_count = 0; +for (my $i = 0; $i < $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $standby3_done_count = 0; + if (opendir(my $dh, $standby3_archive_status)) + { + $standby3_done_count = scalar(grep { /\.done$/ } readdir($dh)); + closedir($dh); + } + last if $standby3_done_count > 0; + sleep(1); +} + +ok($standby2_done_count > 0, "standby2 marks segments as .done"); +ok($standby3_done_count > 0, "standby3 marks segments as .done"); +note("standby2 has $standby2_done_count .done files, standby3 has $standby3_done_count .done files"); + +# Verify both standbys have all data +$standby2_count = $standby2->safe_psql('postgres', 'SELECT COUNT(*) FROM test_table;'); +my $standby3_count = $standby3->safe_psql('postgres', 'SELECT COUNT(*) FROM test_table;'); +ok($standby2_count >= 3500, "standby2 has all data (got $standby2_count rows)"); +ok($standby3_count >= 3500, "standby3 has all data (got $standby3_count rows)"); + +done_testing();