diff options
Diffstat (limited to 'src/backend/replication/slotfuncs.c')
-rw-r--r-- | src/backend/replication/slotfuncs.c | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index b02df593e9c..93d2e20f760 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -17,11 +17,14 @@ #include "miscadmin.h" #include "access/htup_details.h" +#include "replication/decode.h" #include "replication/slot.h" #include "replication/logical.h" #include "replication/logicalfuncs.h" #include "utils/builtins.h" +#include "utils/inval.h" #include "utils/pg_lsn.h" +#include "utils/resowner.h" static void check_permissions(void) @@ -312,3 +315,200 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) return (Datum) 0; } + +/* + * Helper function for advancing physical replication slot forward. + */ +static XLogRecPtr +pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) +{ + XLogRecPtr retlsn = InvalidXLogRecPtr; + + SpinLockAcquire(&MyReplicationSlot->mutex); + if (MyReplicationSlot->data.restart_lsn < moveto) + { + MyReplicationSlot->data.restart_lsn = moveto; + retlsn = moveto; + } + SpinLockRelease(&MyReplicationSlot->mutex); + + return retlsn; +} + +/* + * Helper function for advancing logical replication slot forward. + */ +static XLogRecPtr +pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) +{ + LogicalDecodingContext *ctx; + ResourceOwner old_resowner = CurrentResourceOwner; + XLogRecPtr retlsn = InvalidXLogRecPtr; + + PG_TRY(); + { + /* restart at slot's confirmed_flush */ + ctx = CreateDecodingContext(InvalidXLogRecPtr, + NIL, + true, + logical_read_local_xlog_page, + NULL, NULL, NULL); + + CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, + "logical decoding"); + + /* invalidate non-timetravel entries */ + InvalidateSystemCaches(); + + /* Decode until we run out of records */ + while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) || + (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto)) + { + XLogRecord *record; + char *errm = NULL; + + record = XLogReadRecord(ctx->reader, startlsn, &errm); + if (errm) + elog(ERROR, "%s", errm); + + /* + * Now that we've set up the xlog reader state, subsequent calls + * pass InvalidXLogRecPtr to say "continue from last record" + */ + startlsn = InvalidXLogRecPtr; + + /* + * The {begin_txn,change,commit_txn}_wrapper callbacks above will + * store the description into our tuplestore. + */ + if (record != NULL) + LogicalDecodingProcessRecord(ctx, ctx->reader); + + /* check limits */ + if (moveto <= ctx->reader->EndRecPtr) + break; + + CHECK_FOR_INTERRUPTS(); + } + + CurrentResourceOwner = old_resowner; + + if (ctx->reader->EndRecPtr != InvalidXLogRecPtr) + { + LogicalConfirmReceivedLocation(moveto); + + /* + * If only the confirmed_flush_lsn has changed the slot won't get + * marked as dirty by the above. Callers on the walsender + * interface are expected to keep track of their own progress and + * don't need it written out. But SQL-interface users cannot + * specify their own start positions and it's harder for them to + * keep track of their progress, so we should make more of an + * effort to save it for them. + * + * Dirty the slot so it's written out at the next checkpoint. + * We'll still lose its position on crash, as documented, but it's + * better than always losing the position even on clean restart. + */ + ReplicationSlotMarkDirty(); + } + + retlsn = MyReplicationSlot->data.confirmed_flush; + + /* free context, call shutdown callback */ + FreeDecodingContext(ctx); + + InvalidateSystemCaches(); + } + PG_CATCH(); + { + /* clear all timetravel entries */ + InvalidateSystemCaches(); + + PG_RE_THROW(); + } + PG_END_TRY(); + + return retlsn; +} + +/* + * SQL function for moving the position in a replication slot. + */ +Datum +pg_replication_slot_advance(PG_FUNCTION_ARGS) +{ + Name slotname = PG_GETARG_NAME(0); + XLogRecPtr moveto = PG_GETARG_LSN(1); + XLogRecPtr endlsn; + XLogRecPtr startlsn; + TupleDesc tupdesc; + Datum values[2]; + bool nulls[2]; + HeapTuple tuple; + Datum result; + + Assert(!MyReplicationSlot); + + check_permissions(); + + if (XLogRecPtrIsInvalid(moveto)) + ereport(ERROR, + (errmsg("invalid target wal lsn"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + /* + * We can't move slot past what's been flushed/replayed so clamp the + * target possition accordingly. + */ + if (!RecoveryInProgress()) + moveto = Min(moveto, GetFlushRecPtr()); + else + moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID)); + + /* Acquire the slot so we "own" it */ + ReplicationSlotAcquire(NameStr(*slotname), true); + + startlsn = MyReplicationSlot->data.confirmed_flush; + if (moveto < startlsn) + { + ReplicationSlotRelease(); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot move slot to %X/%X, minimum is %X/%X", + (uint32) (moveto >> 32), (uint32) moveto, + (uint32) (MyReplicationSlot->data.confirmed_flush >> 32), + (uint32) (MyReplicationSlot->data.confirmed_flush)))); + } + + if (OidIsValid(MyReplicationSlot->data.database)) + endlsn = pg_logical_replication_slot_advance(startlsn, moveto); + else + endlsn = pg_physical_replication_slot_advance(startlsn, moveto); + + values[0] = NameGetDatum(&MyReplicationSlot->data.name); + nulls[0] = false; + + /* Update the on disk state when lsn was updated. */ + if (XLogRecPtrIsInvalid(endlsn)) + { + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + ReplicationSlotSave(); + } + + ReplicationSlotRelease(); + + /* Return the reached position. */ + values[1] = LSNGetDatum(endlsn); + nulls[1] = false; + + tuple = heap_form_tuple(tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + + PG_RETURN_DATUM(result); +} |