diff options
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 31 |
1 files changed, 23 insertions, 8 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index d536a5f3ba3..d61ef4cfada 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, - need_full_snapshot, slot->data.initial_consistent_point); + need_full_snapshot, slot->data.two_phase_at); ctx->reorder->private_data = ctx; @@ -432,10 +432,12 @@ CreateInitDecodingContext(const char *plugin, MemoryContextSwitchTo(old_context); /* - * We allow decoding of prepared transactions iff the two_phase option is - * enabled at the time of slot creation. + * We allow decoding of prepared transactions when the two_phase is + * enabled at the time of slot creation, or when the two_phase option is + * given at the streaming start, provided the plugin supports all the + * callbacks for two-phase. */ - ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->twophase &= slot->data.two_phase; ctx->reorder->output_rewrites = ctx->options.receive_rewrites; @@ -538,10 +540,22 @@ CreateDecodingContext(XLogRecPtr start_lsn, MemoryContextSwitchTo(old_context); /* - * We allow decoding of prepared transactions iff the two_phase option is - * enabled at the time of slot creation. + * We allow decoding of prepared transactions when the two_phase is + * enabled at the time of slot creation, or when the two_phase option is + * given at the streaming start, provided the plugin supports all the + * callbacks for two-phase. */ - ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given); + + /* Mark slot to allow two_phase decoding if not already marked */ + if (ctx->twophase && !slot->data.two_phase) + { + slot->data.two_phase = true; + slot->data.two_phase_at = start_lsn; + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn); + } ctx->reorder->output_rewrites = ctx->options.receive_rewrites; @@ -602,7 +616,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) SpinLockAcquire(&slot->mutex); slot->data.confirmed_flush = ctx->reader->EndRecPtr; - slot->data.initial_consistent_point = ctx->reader->EndRecPtr; + if (slot->data.two_phase) + slot->data.two_phase_at = ctx->reader->EndRecPtr; SpinLockRelease(&slot->mutex); } |