aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/slotfuncs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/slotfuncs.c')
-rw-r--r--src/backend/replication/slotfuncs.c200
1 files changed, 200 insertions, 0 deletions
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index b02df593e9c..93d2e20f760 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -17,11 +17,14 @@
#include "miscadmin.h"
#include "access/htup_details.h"
+#include "replication/decode.h"
#include "replication/slot.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "utils/builtins.h"
+#include "utils/inval.h"
#include "utils/pg_lsn.h"
+#include "utils/resowner.h"
static void
check_permissions(void)
@@ -312,3 +315,200 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
return (Datum) 0;
}
+
+/*
+ * Helper function for advancing physical replication slot forward.
+ */
+static XLogRecPtr
+pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+{
+ XLogRecPtr retlsn = InvalidXLogRecPtr;
+
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ if (MyReplicationSlot->data.restart_lsn < moveto)
+ {
+ MyReplicationSlot->data.restart_lsn = moveto;
+ retlsn = moveto;
+ }
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ return retlsn;
+}
+
+/*
+ * Helper function for advancing logical replication slot forward.
+ */
+static XLogRecPtr
+pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+{
+ LogicalDecodingContext *ctx;
+ ResourceOwner old_resowner = CurrentResourceOwner;
+ XLogRecPtr retlsn = InvalidXLogRecPtr;
+
+ PG_TRY();
+ {
+ /* restart at slot's confirmed_flush */
+ ctx = CreateDecodingContext(InvalidXLogRecPtr,
+ NIL,
+ true,
+ logical_read_local_xlog_page,
+ NULL, NULL, NULL);
+
+ CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
+ "logical decoding");
+
+ /* invalidate non-timetravel entries */
+ InvalidateSystemCaches();
+
+ /* Decode until we run out of records */
+ while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
+ (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+ {
+ XLogRecord *record;
+ char *errm = NULL;
+
+ record = XLogReadRecord(ctx->reader, startlsn, &errm);
+ if (errm)
+ elog(ERROR, "%s", errm);
+
+ /*
+ * Now that we've set up the xlog reader state, subsequent calls
+ * pass InvalidXLogRecPtr to say "continue from last record"
+ */
+ startlsn = InvalidXLogRecPtr;
+
+ /*
+ * The {begin_txn,change,commit_txn}_wrapper callbacks above will
+ * store the description into our tuplestore.
+ */
+ if (record != NULL)
+ LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+ /* check limits */
+ if (moveto <= ctx->reader->EndRecPtr)
+ break;
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ CurrentResourceOwner = old_resowner;
+
+ if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+ {
+ LogicalConfirmReceivedLocation(moveto);
+
+ /*
+ * If only the confirmed_flush_lsn has changed the slot won't get
+ * marked as dirty by the above. Callers on the walsender
+ * interface are expected to keep track of their own progress and
+ * don't need it written out. But SQL-interface users cannot
+ * specify their own start positions and it's harder for them to
+ * keep track of their progress, so we should make more of an
+ * effort to save it for them.
+ *
+ * Dirty the slot so it's written out at the next checkpoint.
+ * We'll still lose its position on crash, as documented, but it's
+ * better than always losing the position even on clean restart.
+ */
+ ReplicationSlotMarkDirty();
+ }
+
+ retlsn = MyReplicationSlot->data.confirmed_flush;
+
+ /* free context, call shutdown callback */
+ FreeDecodingContext(ctx);
+
+ InvalidateSystemCaches();
+ }
+ PG_CATCH();
+ {
+ /* clear all timetravel entries */
+ InvalidateSystemCaches();
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ return retlsn;
+}
+
+/*
+ * SQL function for moving the position in a replication slot.
+ */
+Datum
+pg_replication_slot_advance(PG_FUNCTION_ARGS)
+{
+ Name slotname = PG_GETARG_NAME(0);
+ XLogRecPtr moveto = PG_GETARG_LSN(1);
+ XLogRecPtr endlsn;
+ XLogRecPtr startlsn;
+ TupleDesc tupdesc;
+ Datum values[2];
+ bool nulls[2];
+ HeapTuple tuple;
+ Datum result;
+
+ Assert(!MyReplicationSlot);
+
+ check_permissions();
+
+ if (XLogRecPtrIsInvalid(moveto))
+ ereport(ERROR,
+ (errmsg("invalid target wal lsn")));
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ /*
+ * We can't move slot past what's been flushed/replayed so clamp the
+ * target possition accordingly.
+ */
+ if (!RecoveryInProgress())
+ moveto = Min(moveto, GetFlushRecPtr());
+ else
+ moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
+
+ /* Acquire the slot so we "own" it */
+ ReplicationSlotAcquire(NameStr(*slotname), true);
+
+ startlsn = MyReplicationSlot->data.confirmed_flush;
+ if (moveto < startlsn)
+ {
+ ReplicationSlotRelease();
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot move slot to %X/%X, minimum is %X/%X",
+ (uint32) (moveto >> 32), (uint32) moveto,
+ (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
+ (uint32) (MyReplicationSlot->data.confirmed_flush))));
+ }
+
+ if (OidIsValid(MyReplicationSlot->data.database))
+ endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
+ else
+ endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
+
+ values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+ nulls[0] = false;
+
+ /* Update the on disk state when lsn was updated. */
+ if (XLogRecPtrIsInvalid(endlsn))
+ {
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredLSN();
+ ReplicationSlotSave();
+ }
+
+ ReplicationSlotRelease();
+
+ /* Return the reached position. */
+ values[1] = LSNGetDatum(endlsn);
+ nulls[1] = false;
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+
+ PG_RETURN_DATUM(result);
+}