diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/logical.c | 25 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 12 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 25 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 4 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 1 | ||||
-rw-r--r-- | src/backend/storage/ipc/procarray.c | 14 | ||||
-rw-r--r-- | src/include/replication/logical.h | 1 | ||||
-rw-r--r-- | src/include/storage/procarray.h | 2 |
8 files changed, 66 insertions, 18 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5529ac8fb4f..032e91c3710 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -210,6 +210,7 @@ StartupDecodingContext(List *output_plugin_options, LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, + bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write) @@ -267,23 +268,31 @@ CreateInitDecodingContext(char *plugin, * the slot machinery about the new limit. Once that's done the * ProcArrayLock can be released as the slot machinery now is * protecting against vacuum. + * + * Note that, temporarily, the data, not just the catalog, xmin has to be + * reserved if a data snapshot is to be exported. Otherwise the initial + * data snapshot created here is not guaranteed to be valid. After that + * the data xmin doesn't need to be managed anymore and the global xmin + * should be recomputed. As we are fine with losing the pegged data xmin + * after crash - no chance a snapshot would get exported anymore - we can + * get away with just setting the slot's + * effective_xmin. ReplicationSlotRelease will reset it again. + * * ---- */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId(); - slot->data.catalog_xmin = slot->effective_catalog_xmin; + xmin_horizon = GetOldestSafeDecodingTransactionId(need_full_snapshot); + + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + if (need_full_snapshot) + slot->effective_xmin = xmin_horizon; ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); - /* - * tell the snapshot builder to only assemble snapshot once reaching the - * running_xact's record with the respective xmin. - */ - xmin_horizon = slot->data.catalog_xmin; - ReplicationSlotMarkDirty(); ReplicationSlotSave(); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 358ec289321..5d15e25fb5f 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -533,6 +533,18 @@ SnapBuildInitialSnapshot(SnapBuild *builder) * mechanism. Due to that we can do this without locks, we're only * changing our own value. */ +#ifdef USE_ASSERT_CHECKING + { + TransactionId safeXid; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + safeXid = GetOldestSafeDecodingTransactionId(true); + LWLockRelease(ProcArrayLock); + + Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin)); + } +#endif + MyPgXact->xmin = snap->xmin; /* allocate in transaction context */ diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index e8ad0f7b394..5f63d0484a0 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -398,6 +398,22 @@ ReplicationSlotRelease(void) SpinLockRelease(&slot->mutex); } + + /* + * If slot needed to temporarily restrain both data and catalog xmin to + * create the catalog snapshot, remove that temporary constraint. + * Snapshots can only be exported while the initial snapshot is still + * acquired. + */ + if (!TransactionIdIsValid(slot->data.xmin) && + TransactionIdIsValid(slot->effective_xmin)) + { + SpinLockAcquire(&slot->mutex); + slot->effective_xmin = InvalidTransactionId; + SpinLockRelease(&slot->mutex); + ReplicationSlotsComputeRequiredXmin(false); + } + MyReplicationSlot = NULL; /* might not have been set when we've been a plain slot */ @@ -612,6 +628,9 @@ ReplicationSlotPersist(void) /* * Compute the oldest xmin across all slots and store it in the ProcArray. + * + * If already_locked is true, ProcArrayLock has already been acquired + * exclusively. */ void ReplicationSlotsComputeRequiredXmin(bool already_locked) @@ -622,8 +641,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) Assert(ReplicationSlotCtl != NULL); - if (!already_locked) - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -652,8 +670,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) agg_catalog_xmin = effective_catalog_xmin; } - if (!already_locked) - LWLockRelease(ReplicationSlotControlLock); + LWLockRelease(ReplicationSlotControlLock); ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked); } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 7104c94795b..6ee1e68819a 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -131,8 +131,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) /* * Create logical decoding context, to build the initial snapshot. */ - ctx = CreateInitDecodingContext( - NameStr(*plugin), NIL, + ctx = CreateInitDecodingContext(NameStr(*plugin), NIL, + false, /* do not build snapshot */ logical_read_local_xlog_page, NULL, NULL); /* build initial snapshot, might take a while */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 064cf5ee28b..43c8a73f3e1 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -909,6 +909,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) } ctx = CreateInitDecodingContext(cmd->plugin, NIL, + true, /* build snapshot */ logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData); diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 4976bb03c7f..8a715367918 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2151,7 +2151,7 @@ GetOldestActiveTransactionId(void) * that the caller will immediately use the xid to peg the xmin horizon. */ TransactionId -GetOldestSafeDecodingTransactionId(void) +GetOldestSafeDecodingTransactionId(bool catalogOnly) { ProcArrayStruct *arrayP = procArray; TransactionId oldestSafeXid; @@ -2174,9 +2174,17 @@ GetOldestSafeDecodingTransactionId(void) /* * If there's already a slot pegging the xmin horizon, we can start with * that value, it's guaranteed to be safe since it's computed by this - * routine initially and has been enforced since. + * routine initially and has been enforced since. We can always use the + * slot's general xmin horizon, but the catalog horizon is only usable + * when we only catalog data is going to be looked at. */ - if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) && + if (TransactionIdIsValid(procArray->replication_slot_xmin) && + TransactionIdPrecedes(procArray->replication_slot_xmin, + oldestSafeXid)) + oldestSafeXid = procArray->replication_slot_xmin; + + if (catalogOnly && + TransactionIdIsValid(procArray->replication_slot_catalog_xmin) && TransactionIdPrecedes(procArray->replication_slot_catalog_xmin, oldestSafeXid)) oldestSafeXid = procArray->replication_slot_catalog_xmin; diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index f012735f14e..d0b2e0bbaef 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -82,6 +82,7 @@ extern void CheckLogicalDecodingRequirements(void); extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, + bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write); diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 9b42e495243..805ecd25eca 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -88,7 +88,7 @@ extern bool TransactionIdIsInProgress(TransactionId xid); extern bool TransactionIdIsActive(TransactionId xid); extern TransactionId GetOldestXmin(Relation rel, int flags); extern TransactionId GetOldestActiveTransactionId(void); -extern TransactionId GetOldestSafeDecodingTransactionId(void); +extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly); extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids); extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids); |