diff options
author | Amit Kapila <akapila@postgresql.org> | 2024-04-03 14:04:59 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2024-04-03 14:04:59 +0530 |
commit | 2ec005b4e29740f0d36e6646d149af192328b2ff (patch) | |
tree | 666945f7acefb7bf88adb1a84ef22ce368581ae6 /src/backend/replication/logical/logical.c | |
parent | e37662f22158c29bc55eda4eda1757f444cf701a (diff) | |
download | postgresql-2ec005b4e29740f0d36e6646d149af192328b2ff.tar.gz postgresql-2ec005b4e29740f0d36e6646d149af192328b2ff.zip |
Ensure that the sync slots reach a consistent state after promotion without losing data.
We were directly copying the LSN locations while syncing the slots on the
standby. Now, it is possible that at some particular restart_lsn there are
some running xacts, which means if we start reading the WAL from that
location after promotion, we won't reach a consistent snapshot state at
that point. However, on the primary, we would have already been in a
consistent snapshot state at that restart_lsn so we would have just
serialized the existing snapshot.
To avoid this problem we will use the advance_slot functionality unless
the snapshot already exists at the synced restart_lsn location. This will
help us to ensure that snapbuilder/slot statuses are updated properly
without generating any changes. Note that the synced slot will remain as
RS_TEMPORARY till the decoding from corresponding restart_lsn can reach a
consistent snapshot state after which they will be marked as
RS_PERSISTENT.
Per buildfarm
Author: Hou Zhijie
Reviewed-by: Bertrand Drouvot, Shveta Malik, Bharath Rupireddy, Amit Kapila
Discussion: https://postgr.es/m/OS0PR01MB5716B3942AE49F3F725ACA92943B2@OS0PR01MB5716.jpnprd01.prod.outlook.com
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 147 |
1 files changed, 143 insertions, 4 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 51ffb623c01..97a4d99c4e7 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -36,6 +36,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" +#include "replication/slotsync.h" #include "replication/snapbuild.h" #include "storage/proc.h" #include "storage/procarray.h" @@ -516,17 +517,23 @@ CreateDecodingContext(XLogRecPtr start_lsn, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot use physical replication slot for logical decoding"))); - if (slot->data.database != MyDatabaseId) + /* + * We need to access the system tables during decoding to build the + * logical changes unless we are in fast_forward mode where no changes are + * generated. + */ + if (slot->data.database != MyDatabaseId && !fast_forward) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("replication slot \"%s\" was not created in this database", NameStr(slot->data.name)))); /* - * Do not allow consumption of a "synchronized" slot until the standby - * gets promoted. + * The slots being synced from the primary can't be used for decoding as + * they are used after failover. However, we do allow advancing the LSNs + * during the synchronization of slots. See update_local_synced_slot. */ - if (RecoveryInProgress() && slot->data.synced) + if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots()) ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot use replication slot \"%s\" for logical decoding", @@ -2034,3 +2041,135 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal) return has_pending_wal; } + +/* + * Helper function for advancing our logical replication slot forward. + * + * The slot's restart_lsn is used as start point for reading records, while + * confirmed_flush is used as base point for the decoding context. + * + * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush, + * because we need to digest WAL to advance restart_lsn allowing to recycle + * WAL and removal of old catalog tuples. As decoding is done in fast_forward + * mode, no changes are generated anyway. + * + * *found_consistent_snapshot will be true if the initial decoding snapshot has + * been built; Otherwise, it will be false. + */ +XLogRecPtr +LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, + bool *found_consistent_snapshot) +{ + LogicalDecodingContext *ctx; + ResourceOwner old_resowner = CurrentResourceOwner; + XLogRecPtr retlsn; + + Assert(moveto != InvalidXLogRecPtr); + + if (found_consistent_snapshot) + *found_consistent_snapshot = false; + + PG_TRY(); + { + /* + * Create our decoding context in fast_forward mode, passing start_lsn + * as InvalidXLogRecPtr, so that we start processing from my slot's + * confirmed_flush. + */ + ctx = CreateDecodingContext(InvalidXLogRecPtr, + NIL, + true, /* fast_forward */ + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); + + /* + * Wait for specified streaming replication standby servers (if any) + * to confirm receipt of WAL up to moveto lsn. + */ + WaitForStandbyConfirmation(moveto); + + /* + * Start reading at the slot's restart_lsn, which we know to point to + * a valid record. + */ + XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn); + + /* invalidate non-timetravel entries */ + InvalidateSystemCaches(); + + /* Decode records until we reach the requested target */ + while (ctx->reader->EndRecPtr < moveto) + { + char *errm = NULL; + XLogRecord *record; + + /* + * Read records. No changes are generated in fast_forward mode, + * but snapbuilder/slot statuses are updated properly. + */ + record = XLogReadRecord(ctx->reader, &errm); + if (errm) + elog(ERROR, "could not find record while advancing replication slot: %s", + errm); + + /* + * Process the record. Storage-level changes are ignored in + * fast_forward mode, but other modules (such as snapbuilder) + * might still have critical updates to do. + */ + if (record) + LogicalDecodingProcessRecord(ctx, ctx->reader); + + CHECK_FOR_INTERRUPTS(); + } + + if (found_consistent_snapshot && DecodingContextReady(ctx)) + *found_consistent_snapshot = true; + + /* + * Logical decoding could have clobbered CurrentResourceOwner during + * transaction management, so restore the executor's value. (This is + * a kluge, but it's not worth cleaning up right now.) + */ + CurrentResourceOwner = old_resowner; + + if (ctx->reader->EndRecPtr != InvalidXLogRecPtr) + { + LogicalConfirmReceivedLocation(moveto); + + /* + * If only the confirmed_flush LSN has changed the slot won't get + * marked as dirty by the above. Callers on the walsender + * interface are expected to keep track of their own progress and + * don't need it written out. But SQL-interface users cannot + * specify their own start positions and it's harder for them to + * keep track of their progress, so we should make more of an + * effort to save it for them. + * + * Dirty the slot so it is written out at the next checkpoint. The + * LSN position advanced to may still be lost on a crash but this + * makes the data consistent after a clean shutdown. + */ + ReplicationSlotMarkDirty(); + } + + retlsn = MyReplicationSlot->data.confirmed_flush; + + /* free context, call shutdown callback */ + FreeDecodingContext(ctx); + + InvalidateSystemCaches(); + } + PG_CATCH(); + { + /* clear all timetravel entries */ + InvalidateSystemCaches(); + + PG_RE_THROW(); + } + PG_END_TRY(); + + return retlsn; +} |