aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c31
1 files changed, 23 insertions, 8 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d536a5f3ba3..d61ef4cfada 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
- need_full_snapshot, slot->data.initial_consistent_point);
+ need_full_snapshot, slot->data.two_phase_at);
ctx->reorder->private_data = ctx;
@@ -432,10 +432,12 @@ CreateInitDecodingContext(const char *plugin,
MemoryContextSwitchTo(old_context);
/*
- * We allow decoding of prepared transactions iff the two_phase option is
- * enabled at the time of slot creation.
+ * We allow decoding of prepared transactions when the two_phase is
+ * enabled at the time of slot creation, or when the two_phase option is
+ * given at the streaming start, provided the plugin supports all the
+ * callbacks for two-phase.
*/
- ctx->twophase &= MyReplicationSlot->data.two_phase;
+ ctx->twophase &= slot->data.two_phase;
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
@@ -538,10 +540,22 @@ CreateDecodingContext(XLogRecPtr start_lsn,
MemoryContextSwitchTo(old_context);
/*
- * We allow decoding of prepared transactions iff the two_phase option is
- * enabled at the time of slot creation.
+ * We allow decoding of prepared transactions when the two_phase is
+ * enabled at the time of slot creation, or when the two_phase option is
+ * given at the streaming start, provided the plugin supports all the
+ * callbacks for two-phase.
*/
- ctx->twophase &= MyReplicationSlot->data.two_phase;
+ ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
+
+ /* Mark slot to allow two_phase decoding if not already marked */
+ if (ctx->twophase && !slot->data.two_phase)
+ {
+ slot->data.two_phase = true;
+ slot->data.two_phase_at = start_lsn;
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
+ }
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
@@ -602,7 +616,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
SpinLockAcquire(&slot->mutex);
slot->data.confirmed_flush = ctx->reader->EndRecPtr;
- slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
+ if (slot->data.two_phase)
+ slot->data.two_phase_at = ctx->reader->EndRecPtr;
SpinLockRelease(&slot->mutex);
}