aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2024-04-03 14:04:59 +0530
committerAmit Kapila <akapila@postgresql.org>2024-04-03 14:04:59 +0530
commit2ec005b4e29740f0d36e6646d149af192328b2ff (patch)
tree666945f7acefb7bf88adb1a84ef22ce368581ae6 /src/backend/replication/logical/logical.c
parente37662f22158c29bc55eda4eda1757f444cf701a (diff)
downloadpostgresql-2ec005b4e29740f0d36e6646d149af192328b2ff.tar.gz
postgresql-2ec005b4e29740f0d36e6646d149af192328b2ff.zip
Ensure that the sync slots reach a consistent state after promotion without losing data.
We were directly copying the LSN locations while syncing the slots on the standby. Now, it is possible that at some particular restart_lsn there are some running xacts, which means if we start reading the WAL from that location after promotion, we won't reach a consistent snapshot state at that point. However, on the primary, we would have already been in a consistent snapshot state at that restart_lsn so we would have just serialized the existing snapshot. To avoid this problem we will use the advance_slot functionality unless the snapshot already exists at the synced restart_lsn location. This will help us to ensure that snapbuilder/slot statuses are updated properly without generating any changes. Note that the synced slot will remain as RS_TEMPORARY till the decoding from corresponding restart_lsn can reach a consistent snapshot state after which they will be marked as RS_PERSISTENT. Per buildfarm Author: Hou Zhijie Reviewed-by: Bertrand Drouvot, Shveta Malik, Bharath Rupireddy, Amit Kapila Discussion: https://postgr.es/m/OS0PR01MB5716B3942AE49F3F725ACA92943B2@OS0PR01MB5716.jpnprd01.prod.outlook.com
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c147
1 files changed, 143 insertions, 4 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 51ffb623c01..97a4d99c4e7 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -36,6 +36,7 @@
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
+#include "replication/slotsync.h"
#include "replication/snapbuild.h"
#include "storage/proc.h"
#include "storage/procarray.h"
@@ -516,17 +517,23 @@ CreateDecodingContext(XLogRecPtr start_lsn,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use physical replication slot for logical decoding")));
- if (slot->data.database != MyDatabaseId)
+ /*
+ * We need to access the system tables during decoding to build the
+ * logical changes unless we are in fast_forward mode where no changes are
+ * generated.
+ */
+ if (slot->data.database != MyDatabaseId && !fast_forward)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("replication slot \"%s\" was not created in this database",
NameStr(slot->data.name))));
/*
- * Do not allow consumption of a "synchronized" slot until the standby
- * gets promoted.
+ * The slots being synced from the primary can't be used for decoding as
+ * they are used after failover. However, we do allow advancing the LSNs
+ * during the synchronization of slots. See update_local_synced_slot.
*/
- if (RecoveryInProgress() && slot->data.synced)
+ if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use replication slot \"%s\" for logical decoding",
@@ -2034,3 +2041,135 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
return has_pending_wal;
}
+
+/*
+ * Helper function for advancing our logical replication slot forward.
+ *
+ * The slot's restart_lsn is used as start point for reading records, while
+ * confirmed_flush is used as base point for the decoding context.
+ *
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
+ * because we need to digest WAL to advance restart_lsn allowing to recycle
+ * WAL and removal of old catalog tuples. As decoding is done in fast_forward
+ * mode, no changes are generated anyway.
+ *
+ * *found_consistent_snapshot will be true if the initial decoding snapshot has
+ * been built; Otherwise, it will be false.
+ */
+XLogRecPtr
+LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
+ bool *found_consistent_snapshot)
+{
+ LogicalDecodingContext *ctx;
+ ResourceOwner old_resowner = CurrentResourceOwner;
+ XLogRecPtr retlsn;
+
+ Assert(moveto != InvalidXLogRecPtr);
+
+ if (found_consistent_snapshot)
+ *found_consistent_snapshot = false;
+
+ PG_TRY();
+ {
+ /*
+ * Create our decoding context in fast_forward mode, passing start_lsn
+ * as InvalidXLogRecPtr, so that we start processing from my slot's
+ * confirmed_flush.
+ */
+ ctx = CreateDecodingContext(InvalidXLogRecPtr,
+ NIL,
+ true, /* fast_forward */
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
+ NULL, NULL, NULL);
+
+ /*
+ * Wait for specified streaming replication standby servers (if any)
+ * to confirm receipt of WAL up to moveto lsn.
+ */
+ WaitForStandbyConfirmation(moveto);
+
+ /*
+ * Start reading at the slot's restart_lsn, which we know to point to
+ * a valid record.
+ */
+ XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+ /* invalidate non-timetravel entries */
+ InvalidateSystemCaches();
+
+ /* Decode records until we reach the requested target */
+ while (ctx->reader->EndRecPtr < moveto)
+ {
+ char *errm = NULL;
+ XLogRecord *record;
+
+ /*
+ * Read records. No changes are generated in fast_forward mode,
+ * but snapbuilder/slot statuses are updated properly.
+ */
+ record = XLogReadRecord(ctx->reader, &errm);
+ if (errm)
+ elog(ERROR, "could not find record while advancing replication slot: %s",
+ errm);
+
+ /*
+ * Process the record. Storage-level changes are ignored in
+ * fast_forward mode, but other modules (such as snapbuilder)
+ * might still have critical updates to do.
+ */
+ if (record)
+ LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ if (found_consistent_snapshot && DecodingContextReady(ctx))
+ *found_consistent_snapshot = true;
+
+ /*
+ * Logical decoding could have clobbered CurrentResourceOwner during
+ * transaction management, so restore the executor's value. (This is
+ * a kluge, but it's not worth cleaning up right now.)
+ */
+ CurrentResourceOwner = old_resowner;
+
+ if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+ {
+ LogicalConfirmReceivedLocation(moveto);
+
+ /*
+ * If only the confirmed_flush LSN has changed the slot won't get
+ * marked as dirty by the above. Callers on the walsender
+ * interface are expected to keep track of their own progress and
+ * don't need it written out. But SQL-interface users cannot
+ * specify their own start positions and it's harder for them to
+ * keep track of their progress, so we should make more of an
+ * effort to save it for them.
+ *
+ * Dirty the slot so it is written out at the next checkpoint. The
+ * LSN position advanced to may still be lost on a crash but this
+ * makes the data consistent after a clean shutdown.
+ */
+ ReplicationSlotMarkDirty();
+ }
+
+ retlsn = MyReplicationSlot->data.confirmed_flush;
+
+ /* free context, call shutdown callback */
+ FreeDecodingContext(ctx);
+
+ InvalidateSystemCaches();
+ }
+ PG_CATCH();
+ {
+ /* clear all timetravel entries */
+ InvalidateSystemCaches();
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ return retlsn;
+}