diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index d4bd981dfa..0fbfc96198 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3542,14 +3542,36 @@ include_dir 'conf.d' When archive_mode is enabled, completed WAL segments are sent to archive storage by setting . In addition to off, - to disable, there are two modes: on, and - always. During normal operation, there is no - difference between the two modes, but when set to always - the WAL archiver is enabled also during archive recovery or standby - mode. In always mode, all files restored from the archive - or streamed with streaming replication will be archived (again). See - for details. + to disable, there are three modes: on, shared, + and always. During normal operation as a primary, there is no + difference between the three modes, but they differ during archive recovery or + standby mode: + + + + on: Archives WAL only when running as a primary. + + + + + shared: Coordinates archiving between primary and standby. + The standby defers WAL archival and deletion until the primary confirms + archival via streaming replication. This prevents WAL history loss during + standby promotion in high availability setups. Upon promotion, the standby + automatically starts archiving any remaining unarchived WAL. This mode works + with cascading replication, where each standby coordinates with its immediate + upstream server. See for details. + + + + + always: Archives all WAL independently, even during recovery. + All files restored from the archive or streamed with streaming physical + replication will be archived (again), regardless of their source. + + + archive_mode and archive_command are separate variables so that archive_command can be diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index 1a81feb5c6..e4d823d3a2 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1379,35 +1379,60 @@ synchronous_standby_names = 'ANY 2 (s1, s2, s3)' - When continuous WAL archiving is used in a standby, there are two - different scenarios: the WAL archive can be shared between the primary - and the standby, or the standby can have its own WAL archive. When - the standby has its own WAL archive, set archive_mode + When continuous WAL archiving is used in a standby, there are three + different scenarios: the standby can have its own independent WAL archive, + the WAL archive can be shared between the primary and standby, or archiving + can be coordinated between them. + + + + For an independent archive, set archive_mode to always, and the standby will call the archive command for every WAL segment it receives, whether it's by restoring - from the archive or by streaming replication. The shared archive can - be handled similarly, but the archive_command must - test if the file being archived exists already, and if the existing file + from the archive or by streaming replication. + + + + For a shared archive where both primary and standby can write, use + always mode as well, but the archive_command + must test if the file being archived exists already, and if the existing file has identical contents. This requires more care in the - archive_command, as it must - be careful to not overwrite an existing file with different contents, - but return success if the exactly same file is archived twice. And - all that must be done free of race conditions, if two servers attempt - to archive the same file at the same time. + archive_command, as it must be careful to not overwrite an + existing file with different contents, but return success if the exactly same + file is archived twice. And all that must be done free of race conditions, if + two servers attempt to archive the same file at the same time. + + + + For coordinated archiving in high availability setups, use + archive_mode=shared. In this mode, only + the primary archives WAL segments. The standby creates .ready + files for received segments but defers actual archiving. The primary periodically + sends archival status updates to the standby via streaming replication, informing + it which segments have been archived. The standby then marks these as archived + and allows them to be recycled. Upon promotion, the standby automatically starts + archiving any remaining WAL segments that weren't confirmed as archived by the + former primary. This prevents WAL history loss during failover while avoiding + the complexity of coordinating concurrent archiving. This mode works with cascading + replication, where each standby coordinates with its immediate upstream server. If archive_mode is set to on, the - archiver is not enabled during recovery or standby mode. If the standby - server is promoted, it will start archiving after the promotion, but - will not archive any WAL or timeline history files that - it did not generate itself. To get a complete - series of WAL files in the archive, you must ensure that all WAL is - archived, before it reaches the standby. This is inherently true with - file-based log shipping, as the standby can only restore files that - are found in the archive, but not if streaming replication is enabled. - When a server is not in recovery mode, there is no difference between - on and always modes. + archiver is not enabled during recovery or standby mode, and this setting + cannot be used on a standby. If a standby with archive_mode + set to on is promoted, it will start archiving after the + promotion, but will not archive any WAL or timeline history files that it did + not generate itself. To get a complete series of WAL files in the archive, you + must ensure that all WAL is archived before it reaches the standby. This is + inherently true with file-based log shipping, as the standby can only restore + files that are found in the archive, but not if streaming replication is enabled. + + + + When a server is not in recovery mode, on, + shared, and always modes all behave + identically, archiving completed WAL segments. diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index a4908cce42..373ee0034e 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -96,6 +96,7 @@ int wal_keep_size_mb = 0; int XLOGbuffers = -1; int XLogArchiveTimeout = 0; int XLogArchiveMode = ARCHIVE_MODE_OFF; +bool ycmdb_shared_archive = false; /* makes archive_mode=on act as shared */ char *XLogArchiveCommand = NULL; bool EnableHotStandby = false; bool fullPageWrites = true; @@ -166,6 +167,7 @@ const struct config_enum_entry archive_mode_options[] = { {"always", ARCHIVE_MODE_ALWAYS, false}, {"on", ARCHIVE_MODE_ON, false}, {"off", ARCHIVE_MODE_OFF, false}, + {"shared", ARCHIVE_MODE_SHARED, false}, {"true", ARCHIVE_MODE_ON, true}, {"false", ARCHIVE_MODE_OFF, true}, {"yes", ARCHIVE_MODE_ON, true}, diff --git a/src/backend/postmaster/pgarch.c b/src/backend/postmaster/pgarch.c index 6b99be85a9..865ec535fc 100644 --- a/src/backend/postmaster/pgarch.c +++ b/src/backend/postmaster/pgarch.c @@ -377,6 +377,15 @@ pgarch_ArchiverCopyLoop(void) /* force directory scan in the first call to pgarch_readyXlog() */ arch_files->arch_files_size = 0; + /* + * In shared archive mode during recovery, the archiver doesn't archive + * files. The primary is responsible for archiving, and the walreceiver + * marks files as .done when the primary confirms archival. After + * promotion, the archiver starts working normally. + */ + if (EffectiveArchiveModeIsShared() && RecoveryInProgress()) + return; + /* * loop through all xlogs with archive_status of .ready and archive * them...mostly we expect this to be a single file, though it is possible @@ -458,10 +467,10 @@ pgarch_ArchiverCopyLoop(void) continue; } - if (pgarch_archiveXlog(xlog)) - { - /* successful */ - pgarch_archiveDone(xlog); + if (pgarch_archiveXlog(xlog)) + { + /* successful */ + pgarch_archiveDone(xlog); /* * Tell the collector about the WAL file that we successfully diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 7ae04b9682..fbdcc7353b 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -61,6 +61,7 @@ #include "common/ip.h" #include "funcapi.h" #include "libpq/pqformat.h" +#include "libpq/protocol.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" @@ -118,6 +119,19 @@ static struct static StringInfoData reply_message; static StringInfoData incoming_message; +/* Last archived WAL segment file reported by the primary */ +static char primary_last_archived[MAX_XFN_CHARS + 1]; +static TimeLineID primary_last_archived_tli = 0; +static XLogSegNo primary_last_archived_segno = 0; + +/* + * Last segment we successfully marked as .done. Used to optimize + * ProcessArchivalReport() by generating expected filenames instead + * of scanning the archive_status directory. + */ +static TimeLineID last_processed_tli = 0; +static XLogSegNo last_processed_segno = 0; + /* Prototypes for private functions */ static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); @@ -129,6 +143,7 @@ static void XLogWalRcvClose(XLogRecPtr recptr); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); +static void ProcessArchivalReport(void); /* * Process any interrupts the walreceiver process may have received. @@ -863,6 +878,30 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) XLogWalRcvSendReply(true, false); break; } + case PqReplMsg_ArchiveStatusReport: + { + /* Check that the filename looks valid */ + if (len >= sizeof(primary_last_archived)) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid archival report message with length %d", + (int) len))); + + memcpy(primary_last_archived, buf, len); + primary_last_archived[len] = '\0'; + + /* Verify it contains only valid characters */ + if (strspn(buf, VALID_XFN_CHARS) != len) + { + primary_last_archived[0] = '\0'; + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("unexpected character in primary's last archived filename"))); + } + + ProcessArchivalReport(); + break; + } default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1036,12 +1075,44 @@ XLogWalRcvClose(XLogRecPtr recptr) /* * Create .done file forcibly to prevent the streamed segment from being - * archived later. + * archived later, unless archive_mode is 'always' or 'shared'. + * + * In 'always' mode, the standby archives independently. + * + * In 'shared' mode, we optimize by checking if this segment is already + * covered by the last archival report from the primary. If so, create + * .done directly. Otherwise, create .ready and wait for the next report. */ - if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) - XLogArchiveForceDone(xlogfname); - else + if (XLogArchiveMode == ARCHIVE_MODE_ALWAYS) + { XLogArchiveNotify(xlogfname); + } + else if (EffectiveArchiveModeIsShared()) + { + /* + * In shared mode, check if this segment is already archived on primary. + * If we're on the same timeline and this segment is <= last archived, + * mark it .done immediately. Otherwise create .ready. + * + * We don't check ancestor timeline cases here to avoid reading timeline + * history files on every segment close. ProcessArchivalReport() will + * handle marking ancestor timeline segments as .done when it scans + * the archive_status directory. + */ + if (primary_last_archived_tli == recvFileTLI && + recvSegNo <= primary_last_archived_segno) + { + XLogArchiveForceDone(xlogfname); + } + else + { + XLogArchiveNotify(xlogfname); + } + } + else + { + XLogArchiveForceDone(xlogfname); + } recvFile = -1; } @@ -1223,6 +1294,187 @@ XLogWalRcvSendHSFeedback(bool immed) primary_has_standby_xmin = false; } +/* + * Process archival report from primary. + * + * The primary sends us the last WAL segment it has archived. We scan the + * archive_status directory for .ready files and mark segments on the same + * timeline as .done if they're <= the reported segment. + */ +static void +ProcessArchivalReport(void) +{ + TimeLineID reported_tli; + XLogSegNo reported_segno; + char status_path[MAXPGPATH]; + bool use_direct_check = false; + XLogSegNo start_segno; + + elog(DEBUG2, "received archival report from primary: %s", + primary_last_archived); + + /* Parse the reported WAL filename */ + if (!IsXLogFileName(primary_last_archived)) + { + elog(DEBUG2, "invalid WAL filename in archival report: %s", + primary_last_archived); + return; + } + + XLogFromFileName(primary_last_archived, &reported_tli, &reported_segno, + wal_segment_size); + + /* Remember the last archived segment for XLogWalRcvClose() */ + primary_last_archived_tli = reported_tli; + primary_last_archived_segno = reported_segno; + + /* + * Optimization: If the new report is on the same timeline as the last + * processed segment and moves forward, we can directly check for .ready + * files for segments between last_processed_segno and reported_segno + * instead of scanning the entire archive_status directory. + * + * Fall back to directory scan if: + * - Timeline changed (need to handle ancestor timelines) + * - This is the first report (last_processed_tli == 0) + * - Reported segment is not ahead (nothing new to process) + */ + if (last_processed_tli == reported_tli && + last_processed_tli != 0 && + reported_segno > last_processed_segno) + { + use_direct_check = true; + start_segno = last_processed_segno + 1; + } + + if (use_direct_check) + { + /* + * Direct check: generate filenames for expected segments. + * XLogArchiveForceDone() will handle the case where .ready doesn't + * exist or .done already exists, so no need to stat() first. + */ + XLogSegNo segno; + + for (segno = start_segno; segno <= reported_segno; segno++) + { + char walfile[MAXFNAMELEN]; + + /* Generate WAL filename and mark as archived */ + XLogFileName(walfile, reported_tli, segno, wal_segment_size); + XLogArchiveForceDone(walfile); + elog(DEBUG3, "marked WAL segment %s as archived (primary archived up to %s)", + walfile, primary_last_archived); + + /* Track the last segment we processed */ + last_processed_tli = reported_tli; + last_processed_segno = segno; + } + } + else + { + /* + * Directory scan: needed when timeline changed or first report. + * This handles both same-timeline and ancestor-timeline cases. + */ + DIR *status_dir; + struct dirent *status_de; + List *tli_history = NIL; + + snprintf(status_path, MAXPGPATH, XLOGDIR "/archive_status"); + status_dir = AllocateDir(status_path); + if (status_dir == NULL) + { + elog(DEBUG2, "could not open archive_status directory: %m"); + return; + } + + while ((status_de = ReadDir(status_dir, status_path)) != NULL) + { + char *ready_suffix; + char walfile[MAXPGPATH]; + TimeLineID file_tli; + XLogSegNo file_segno; + + /* Look for .ready files only */ + ready_suffix = strstr(status_de->d_name, ".ready"); + if (ready_suffix == NULL || ready_suffix[6] != '\0') + continue; + + /* Extract WAL filename (remove .ready suffix) */ + strlcpy(walfile, status_de->d_name, ready_suffix - status_de->d_name + 1); + + /* Parse the WAL filename */ + if (!IsXLogFileName(walfile)) + continue; + + XLogFromFileName(walfile, &file_tli, &file_segno, wal_segment_size); + + /* + * Mark as .done if: + * 1. Same timeline and segment <= reported segment, OR + * 2. Ancestor timeline and segment is before the timeline switch point + * + * For ancestor timelines: if primary archived segment X on timeline T, + * then all segments on ancestor timelines before the switch to T must + * have been archived (they're required to reach timeline T). + */ + if (file_tli == reported_tli && file_segno <= reported_segno) + { + /* Same timeline, segment already archived */ + XLogArchiveForceDone(walfile); + elog(DEBUG3, "marked WAL segment %s as archived (primary archived up to %s)", + walfile, primary_last_archived); + } + else if (file_tli != reported_tli) + { + /* + * Different timeline - check if it's an ancestor and if this + * segment is before the timeline switch point. Only read timeline + * history if we haven't already (lazy loading). + * + * Note: Timelines form a tree structure, not a linear sequence, + * so we can't use < or > to compare them. + */ + if (tli_history == NIL) + tli_history = readTimeLineHistory(reported_tli); + + if (tliInHistory(file_tli, tli_history)) + { + XLogRecPtr switchpoint; + XLogSegNo switchpoint_segno; + + /* Get the point where we switched away from this timeline */ + switchpoint = tliSwitchPoint(file_tli, tli_history, NULL); + + /* + * If the segment is at or before the switch point, it must have + * been archived (it's required to reach the reported timeline). + * The segment containing the switch point belongs to the old + * timeline up to the switch point and should be archived. + */ + XLByteToSeg(switchpoint, switchpoint_segno, wal_segment_size); + if (file_segno <= switchpoint_segno) + { + XLogArchiveForceDone(walfile); + elog(DEBUG3, "marked ancestor timeline segment %s as archived (before switch to timeline %u)", + walfile, reported_tli); + } + } + } + } + + FreeDir(status_dir); + + /* + * After a full directory scan following a timeline change, update + * our tracking to the newly reported position for future optimizations. + */ + last_processed_tli = reported_tli; + last_processed_segno = reported_segno; + } +} + /* * Update shared memory status upon receiving a message from primary. * diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 014ef7e208..764d78e0b3 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -62,6 +62,7 @@ #include "commands/defrem.h" #include "funcapi.h" #include "libpq/libpq.h" +#include "libpq/protocol.h" #include "libpq/pqformat.h" #include "miscadmin.h" #include "nodes/replnodes.h" @@ -174,6 +175,17 @@ static TimestampTz last_reply_timestamp = 0; /* Have we sent a heartbeat message asking for reply, since last reply? */ static bool waiting_for_ping_response = false; +/* + * Last archived WAL file. This is fetched from pgstat periodically and sent + * to the standby. last_archival_report_timestamp tracks when we last sent + * the report to avoid excessive pgstat access. + */ +static char last_archived_wal[MAX_XFN_CHARS + 1]; +static TimestampTz last_archival_report_timestamp = 0; + +/* Interval for sending archival reports (10 seconds) */ +#define ARCHIVAL_REPORT_INTERVAL 10000 + /* * While streaming WAL in Copy mode, streamingDoneSending is set to true * after we have sent CopyDone. We should not send any more CopyData messages @@ -291,6 +303,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); +static void WalSndArchivalReport(void); static void ProcessRepliesIfAny(void); static void ProcessPendingWrites(void); static void WalSndKeepalive(bool requestReply); @@ -2279,6 +2292,84 @@ ProcessStandbyHSFeedbackMessage(void) } } +/* + * Send archival status report to standby. + * + * This is called periodically during physical replication to inform the + * standby about the last WAL segment archived by the primary. The standby + * can then mark segments up to that point as .done, allowing them to be + * recycled. This prevents WAL loss during standby promotion. + */ +static void +WalSndArchivalReport(void) +{ + PgStat_ArchiverStats *archiver_stats; + TimestampTz now; + char *last_archived; + + /* Only send reports when shared archive is active */ + if (!EffectiveArchiveModeIsShared()) + return; + + /* Only send reports during physical streaming replication, not logical or backup */ + if (am_db_walsender) + return; + if (MyWalSnd->state != WALSNDSTATE_CATCHUP && + MyWalSnd->state != WALSNDSTATE_STREAMING) + return; + + /* + * Don't send to temporary replication slots (used by pg_basebackup). + * Connections without slots (regular standbys) are OK. + */ + if (MyReplicationSlot != NULL && + MyReplicationSlot->data.persistency == RS_TEMPORARY) + return; + + now = GetCurrentTimestamp(); + + /* + * Send report at most once per ARCHIVAL_REPORT_INTERVAL (10 seconds). + * This avoids excessive pgstat access. + */ + if (now < TimestampTzPlusMilliseconds(last_archival_report_timestamp, + ARCHIVAL_REPORT_INTERVAL)) + return; + last_archival_report_timestamp = now; + /* + * Get archiver statistics. We use non-blocking access to avoid delaying + * replication if stats collector is slow. If stats are unavailable or + * stale, we'll just try again at the next interval. + */ + archiver_stats = pgstat_fetch_stat_archiver(); + if (archiver_stats == NULL) + return; + + last_archived = archiver_stats->last_archived_wal; + /* + * Only send a report if the last archived WAL has changed. This is both + * an optimization and ensures we don't send empty reports on startup. + */ + if (strcmp(last_archived, last_archived_wal) == 0) + return; + + /* Only send reports for WAL segments, not backup history files or other archived files */ + if (!IsXLogFileName(last_archived)) + return; + + elog(DEBUG2, "sending archival report: %s", last_archived); + + /* Remember what we sent */ + strlcpy(last_archived_wal, last_archived, sizeof(last_archived_wal)); + + /* Construct the message... */ + resetStringInfo(&output_message); + pq_sendbyte(&output_message, PqReplMsg_ArchiveStatusReport); + pq_sendbytes(&output_message, last_archived, strlen(last_archived)); + /* ... and send it wrapped in CopyData */ + pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); +} + /* * Compute how long send/receive loops should sleep. * @@ -3622,6 +3713,9 @@ WalSndKeepaliveIfNecessary(void) if (pq_flush_if_writable() != 0) WalSndShutdown(); } + + /* Send archival status report if needed */ + WalSndArchivalReport(); } /* diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 9ed8cb10ae..1e001ca74e 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -41,6 +41,7 @@ #include "access/twophase.h" #include "access/xact.h" #include "access/yc_checker.h" +#include "access/xlog.h" #include "access/xlog_internal.h" #include "catalog/namespace.h" #include "catalog/pg_authid.h" @@ -1326,6 +1327,17 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"ycmdb.shared_archive", PGC_POSTMASTER, WAL_ARCHIVING, + gettext_noop("Makes archive_mode=on behave as shared (for managed service compatibility)."), + gettext_noop("When true, archive_mode=on is treated as archive_mode=shared. Does not affect archive_mode=off or archive_mode=always. Used when control plane cannot configure archive_mode=shared directly."), + GUC_NOT_IN_SAMPLE + }, + &ycmdb_shared_archive, + false, + NULL, NULL, NULL + }, + { {"wal_init_zero", PGC_SUSET, WAL_SETTINGS, gettext_noop("Writes zeroes to new WAL files before first use."), diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index a0706c06a7..4bda330e57 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -155,9 +155,19 @@ typedef enum ArchiveMode { ARCHIVE_MODE_OFF = 0, /* disabled */ ARCHIVE_MODE_ON, /* enabled while server is running normally */ - ARCHIVE_MODE_ALWAYS /* enabled always (even during recovery) */ + ARCHIVE_MODE_ALWAYS, /* enabled always (even during recovery) */ + ARCHIVE_MODE_SHARED, /* shared archive between primary and standby */ } ArchiveMode; -extern int XLogArchiveMode; +extern PGDLLIMPORT int XLogArchiveMode; +extern PGDLLIMPORT bool ycmdb_shared_archive; + +/* + * True when shared archive behavior is active: either archive_mode=shared + * or archive_mode=on with ycmdb.shared_archive=true (managed service). + */ +#define EffectiveArchiveModeIsShared() \ + (XLogArchiveMode == ARCHIVE_MODE_SHARED || \ + (XLogArchiveMode == ARCHIVE_MODE_ON && ycmdb_shared_archive)) /* WAL levels */ typedef enum WalLevel diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h new file mode 100644 index 0000000000..925c7568ea --- /dev/null +++ b/src/include/libpq/protocol.h @@ -0,0 +1,111 @@ +/*------------------------------------------------------------------------- + * + * protocol.h + * Definitions of the request/response codes for the wire protocol. + * + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/libpq/protocol.h + * + *------------------------------------------------------------------------- + */ +#ifndef PROTOCOL_H +#define PROTOCOL_H + +/* These are the request codes sent by the frontend. */ + +#define PqMsg_Bind 'B' +#define PqMsg_Close 'C' +#define PqMsg_Describe 'D' +#define PqMsg_Execute 'E' +#define PqMsg_FunctionCall 'F' +#define PqMsg_Flush 'H' +#define PqMsg_Parse 'P' +#define PqMsg_Query 'Q' +#define PqMsg_Sync 'S' +#define PqMsg_Terminate 'X' +#define PqMsg_CopyFail 'f' +#define PqMsg_GSSResponse 'p' +#define PqMsg_PasswordMessage 'p' +#define PqMsg_SASLInitialResponse 'p' +#define PqMsg_SASLResponse 'p' + + +/* These are the response codes sent by the backend. */ + +#define PqMsg_ParseComplete '1' +#define PqMsg_BindComplete '2' +#define PqMsg_CloseComplete '3' +#define PqMsg_NotificationResponse 'A' +#define PqMsg_CommandComplete 'C' +#define PqMsg_DataRow 'D' +#define PqMsg_ErrorResponse 'E' +#define PqMsg_CopyInResponse 'G' +#define PqMsg_CopyOutResponse 'H' +#define PqMsg_EmptyQueryResponse 'I' +#define PqMsg_BackendKeyData 'K' +#define PqMsg_NoticeResponse 'N' +#define PqMsg_AuthenticationRequest 'R' +#define PqMsg_ParameterStatus 'S' +#define PqMsg_RowDescription 'T' +#define PqMsg_FunctionCallResponse 'V' +#define PqMsg_CopyBothResponse 'W' +#define PqMsg_ReadyForQuery 'Z' +#define PqMsg_NoData 'n' +#define PqMsg_PortalSuspended 's' +#define PqMsg_ParameterDescription 't' +#define PqMsg_NegotiateProtocolVersion 'v' + + +/* These are the codes sent by both the frontend and backend. */ + +#define PqMsg_CopyDone 'c' +#define PqMsg_CopyData 'd' + + +/* These are the codes sent by parallel workers to leader processes. */ +#define PqMsg_Progress 'P' + + +/* Replication codes sent by the primary (wrapped in CopyData messages). */ + +#define PqReplMsg_ArchiveStatusReport 'a' +#define PqReplMsg_Keepalive 'k' +#define PqReplMsg_PrimaryStatusUpdate 's' +#define PqReplMsg_WALData 'w' + + +/* Replication codes sent by the standby (wrapped in CopyData messages). */ + +#define PqReplMsg_HotStandbyFeedback 'h' +#define PqReplMsg_PrimaryStatusRequest 'p' +#define PqReplMsg_StandbyStatusUpdate 'r' + + +/* Codes used for backups via COPY OUT (wrapped in CopyData messages). */ + +#define PqBackupMsg_Manifest 'm' +#define PqBackupMsg_NewArchive 'n' +#define PqBackupMsg_ProgressReport 'p' + + +/* These are the authentication request codes sent by the backend. */ + +#define AUTH_REQ_OK 0 /* User is authenticated */ +#define AUTH_REQ_KRB4 1 /* Kerberos V4. Not supported any more. */ +#define AUTH_REQ_KRB5 2 /* Kerberos V5. Not supported any more. */ +#define AUTH_REQ_PASSWORD 3 /* Password */ +#define AUTH_REQ_CRYPT 4 /* crypt password. Not supported any more. */ +#define AUTH_REQ_MD5 5 /* md5 password */ +/* 6 is available. It was used for SCM creds, not supported any more. */ +#define AUTH_REQ_GSS 7 /* GSSAPI without wrap() */ +#define AUTH_REQ_GSS_CONT 8 /* Continue GSS exchanges */ +#define AUTH_REQ_SSPI 9 /* SSPI negotiate without wrap() */ +#define AUTH_REQ_SASL 10 /* Begin SASL authentication */ +#define AUTH_REQ_SASL_CONT 11 /* Continue SASL authentication */ +#define AUTH_REQ_SASL_FIN 12 /* Final SASL message */ +#define AUTH_REQ_MAX AUTH_REQ_SASL_FIN /* maximum AUTH_REQ_* value */ + +#endif /* PROTOCOL_H */ diff --git a/src/test/recovery/t/050_archive_shared.pl b/src/test/recovery/t/050_archive_shared.pl new file mode 100644 index 0000000000..397b71ad79 --- /dev/null +++ b/src/test/recovery/t/050_archive_shared.pl @@ -0,0 +1,270 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +# Test archive_mode=shared for coordinated WAL archiving between primary and standby +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; +use File::Path qw(rmtree); + +# Initialize primary node with archiving +my $archive_dir = PostgreSQL::Test::Utils::tempdir(); +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(has_archiving => 1, allows_streaming => 1); +$primary->append_conf('postgresql.conf', " +archive_mode = shared +archive_command = 'cp %p \"$archive_dir\"/%f' +wal_keep_size = 128MB +"); +$primary->start; + +# Create a test table and generate some WAL +$primary->safe_psql('postgres', 'CREATE TABLE test_table (id int, data text);'); +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'data' || i FROM generate_series(1, 500) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'data' || i FROM generate_series(501, 1000) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Wait for archiver to archive segments +$primary->poll_query_until('postgres', + "SELECT archived_count > 0 FROM pg_stat_archiver") + or die "Timed out waiting for archiver to start"; + +my $archived_count = () = glob("$archive_dir/*"); +ok($archived_count > 0, "primary has archived WAL files to shared archive"); +note("Primary archived $archived_count files"); + +# Take backup for standby +my $backup_name = 'standby_backup'; +$primary->backup($backup_name); + +# Exclude possible race condition when backup WAL is last archived +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'data' || i FROM generate_series(501, 1000) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Set up standby with archive_mode=shared +my $standby = PostgreSQL::Test::Cluster->new('standby'); +$standby->init_from_backup($primary, $backup_name, has_streaming => 1); +$standby->append_conf('postgresql.conf', " +archive_mode = shared +archive_command = 'cp %p \"$archive_dir\"/%f' +wal_receiver_status_interval = 1s +"); +$standby->start; + +# Wait for standby to catch up +$primary->wait_for_catchup($standby); + +# Generate more WAL on primary (these are new segments not yet archived) +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'data' || i FROM generate_series(1001, 1500) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'data' || i FROM generate_series(1501, 2000) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Wait for standby to receive the new WAL +$primary->wait_for_catchup($standby); + +# Check that standby has .ready or .done files for the newly received segments. +# Normally they should be .ready (not yet archived by primary), but in rare cases +# the archiver could be very fast and an archive report sent immediately, creating +# .done files instead. Both are correct behavior - the key is that files exist. +my $standby_archive_status = $standby->data_dir . '/pg_wal/archive_status'; +my $status_count = 0; +if (opendir(my $dh, $standby_archive_status)) +{ + my @files = grep { /\.(ready|done)$/ } readdir($dh); + $status_count = scalar(@files); + my $ready_count = scalar(grep { /\.ready$/ } @files); + my $done_count = scalar(grep { /\.done$/ } @files); + note("Standby has $ready_count .ready files and $done_count .done files"); + closedir($dh); +} +cmp_ok($status_count, '>', 0, "standby creates archive status files for received WAL"); + +# Generate more WAL and wait for archiving on primary +my $initial_archived = $primary->safe_psql('postgres', 'SELECT archived_count FROM pg_stat_archiver'); +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'more-data' || i FROM generate_series(2001, 2500) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); +$primary->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'more-data2' || i FROM generate_series(2501, 3000) i;"); +$primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Wait for primary to archive the new segments +$primary->poll_query_until('postgres', + "SELECT archived_count > $initial_archived FROM pg_stat_archiver") + or die "Timed out waiting for primary to archive new segments"; + +# Wait for standby to catch up (archive status is sent during replication) +$primary->wait_for_catchup($standby); + +# Wait for primary to send archival status updates and standby to process them +# The standby should mark segments as .done after receiving archive status from primary +my $done_count = 0; +for (my $i = 0; $i < $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $done_count = 0; + if (opendir(my $dh, $standby_archive_status)) + { + $done_count = scalar(grep { /\.done$/ } readdir($dh)); + closedir($dh); + } + last if $done_count > 0; + sleep(1); +} +ok($done_count > 0, "standby marked segments as .done after primary's archival report"); +note("Standby has $done_count .done files"); + +############################################################################### +# Test 2: Standby promotion - verify archiver activates +############################################################################### + +# Before promotion, verify archiver is not running on standby (shared mode during recovery) +# In shared mode, the standby's archiver should not be archiving during recovery +my $archived_before = $standby->safe_psql('postgres', + "SELECT archived_count FROM pg_stat_archiver"); +is($archived_before, '0', + "archiver not active on standby before promotion (archived_count=0)"); + +# Verify standby is still in recovery before promoting +my $in_recovery = $standby->safe_psql('postgres', "SELECT pg_is_in_recovery();"); +is($in_recovery, 't', "standby is in recovery before promotion"); + +# Promote the standby +$standby->promote; +$standby->poll_query_until('postgres', "SELECT NOT pg_is_in_recovery();"); + +# Generate WAL on new primary (former standby) +$standby->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'post-promotion' || i FROM generate_series(2001, 2500) i;"); +$standby->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Wait for archiver to activate and archive the new WAL +# Check pg_stat_archiver to verify archiving is happening +$standby->poll_query_until('postgres', + "SELECT archived_count > 0 FROM pg_stat_archiver") + or die "Timed out waiting for promoted standby to start archiving"; +pass("promoted standby started archiving"); + +# Verify data integrity +my $count = $standby->safe_psql('postgres', 'SELECT COUNT(*) FROM test_table;'); +ok($count >= 2500, "promoted standby has all data (got $count rows)"); + +############################################################################### +# Test 3: Cascading replication +############################################################################### + +# Take a backup from the promoted standby (now the new primary) +my $promoted_backup = 'promoted_backup'; +$standby->backup($promoted_backup); + +# Set up second-level standby (cascading from first standby, now promoted) +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup($standby, $promoted_backup, has_streaming => 1); +$standby2->append_conf('postgresql.conf', " +archive_mode = shared +archive_command = 'cp %p \"$archive_dir\"/%f' +wal_receiver_status_interval = 1s +"); +$standby2->start; + +# Generate WAL on promoted standby (now primary for standby2) +my $cascading_archived_before = $standby->safe_psql('postgres', 'SELECT archived_count FROM pg_stat_archiver'); +$standby->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'cascading' || i FROM generate_series(2501, 3000) i;"); +$standby->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Wait for the promoted standby (acting as primary) to archive the new segment +$standby->poll_query_until('postgres', + "SELECT archived_count > $cascading_archived_before FROM pg_stat_archiver") + or die "Timed out waiting for primary to archive segment in cascading test"; + +# Wait for cascading standby to catch up +$standby->wait_for_catchup($standby2); + +# Wait for cascading standby to receive archive status and mark segments as .done +my $standby2_archive_status = $standby2->data_dir . '/pg_wal/archive_status'; +my $standby2_done_count = 0; +for (my $i = 0; $i < $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $standby2_done_count = 0; + if (opendir(my $dh, $standby2_archive_status)) + { + $standby2_done_count = scalar(grep { /\.done$/ } readdir($dh)); + closedir($dh); + } + last if $standby2_done_count > 0; + sleep(1); +} +ok($standby2_done_count > 0, "cascading standby marks segments as .done"); +note("Cascading standby has $standby2_done_count .done files"); + +# Verify cascading standby has all data +my $standby2_count = $standby2->safe_psql('postgres', 'SELECT COUNT(*) FROM test_table;'); +ok($standby2_count >= 3000, "cascading standby has all data (got $standby2_count rows)"); + +############################################################################### +# Test 4: Multiple standbys from same primary +############################################################################### + +# Create third standby from promoted standby (current primary) +my $standby3 = PostgreSQL::Test::Cluster->new('standby3'); +my $backup2 = 'multi_standby_backup'; +$standby->backup($backup2); +$standby3->init_from_backup($standby, $backup2, has_streaming => 1); +$standby3->append_conf('postgresql.conf', " +archive_mode = shared +archive_command = 'cp %p \"$archive_dir\"/%f' +wal_receiver_status_interval = 1s +"); +$standby3->start; + +# Generate WAL and ensure both standbys receive it +my $standby_archived_before = $standby->safe_psql('postgres', 'SELECT archived_count FROM pg_stat_archiver'); +$standby->safe_psql('postgres', "INSERT INTO test_table SELECT i, 'multi' || i FROM generate_series(3001, 3500) i;"); +$standby->safe_psql('postgres', 'SELECT pg_switch_wal();'); + +# Wait for the promoted standby (acting as primary) to archive the new segment +$standby->poll_query_until('postgres', + "SELECT archived_count > $standby_archived_before FROM pg_stat_archiver") + or die "Timed out waiting for primary to archive segment in multi-standby test"; + +$standby->wait_for_catchup($standby2); +$standby->wait_for_catchup($standby3); + +# Verify both standbys eventually mark segments as .done +my $standby3_archive_status = $standby3->data_dir . '/pg_wal/archive_status'; + +for (my $i = 0; $i < $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $standby2_done_count = 0; + if (opendir(my $dh, $standby2_archive_status)) + { + $standby2_done_count = scalar(grep { /\.done$/ } readdir($dh)); + closedir($dh); + } + last if $standby2_done_count > 0; + sleep(1); +} + +my $standby3_done_count = 0; +for (my $i = 0; $i < $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $standby3_done_count = 0; + if (opendir(my $dh, $standby3_archive_status)) + { + $standby3_done_count = scalar(grep { /\.done$/ } readdir($dh)); + closedir($dh); + } + last if $standby3_done_count > 0; + sleep(1); +} + +ok($standby2_done_count > 0, "standby2 marks segments as .done"); +ok($standby3_done_count > 0, "standby3 marks segments as .done"); +note("standby2 has $standby2_done_count .done files, standby3 has $standby3_done_count .done files"); + +# Verify both standbys have all data +$standby2_count = $standby2->safe_psql('postgres', 'SELECT COUNT(*) FROM test_table;'); +my $standby3_count = $standby3->safe_psql('postgres', 'SELECT COUNT(*) FROM test_table;'); +ok($standby2_count >= 3500, "standby2 has all data (got $standby2_count rows)"); +ok($standby3_count >= 3500, "standby3 has all data (got $standby3_count rows)"); + +done_testing();