aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-03-03 07:28:43 +0530
committerAmit Kapila <akapila@postgresql.org>2021-03-03 07:34:11 +0530
commit19890a064ebf53dedcefed0d8339ed3d449b06e6 (patch)
tree76adf39f2d69ce5d6f8bda9efc48c34c0879e6bc /src/backend/replication/logical/logical.c
parentee28cacf619f4d9c23af5a80e1171a5adae97381 (diff)
downloadpostgresql-19890a064ebf53dedcefed0d8339ed3d449b06e6.tar.gz
postgresql-19890a064ebf53dedcefed0d8339ed3d449b06e6.zip
Add option to enable two_phase commits via pg_create_logical_replication_slot.
Commit 0aa8a01d04 extends the output plugin API to allow decoding of prepared xacts and allowed the user to enable/disable the two-phase option via pg_logical_slot_get_changes(). This can lead to a problem such that the first time when it gets changes via pg_logical_slot_get_changes() without two_phase option enabled it will not get the prepared even though prepare is after consistent snapshot. Now next time during getting changes, if the two_phase option is enabled it can skip prepare because by that time start decoding point has been moved. So the user will only get commit prepared. Allow to enable/disable this option at the create slot time and default will be false. It will break the existing slots which is fine in a major release. Author: Ajin Cherian Reviewed-by: Amit Kapila and Vignesh C Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c12
1 files changed, 12 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3f6d723d096..37b75deb728 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -431,6 +431,12 @@ CreateInitDecodingContext(const char *plugin,
startup_cb_wrapper(ctx, &ctx->options, true);
MemoryContextSwitchTo(old_context);
+ /*
+ * We allow decoding of prepared transactions iff the two_phase option is
+ * enabled at the time of slot creation.
+ */
+ ctx->twophase &= MyReplicationSlot->data.two_phase;
+
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
return ctx;
@@ -531,6 +537,12 @@ CreateDecodingContext(XLogRecPtr start_lsn,
startup_cb_wrapper(ctx, &ctx->options, false);
MemoryContextSwitchTo(old_context);
+ /*
+ * We allow decoding of prepared transactions iff the two_phase option is
+ * enabled at the time of slot creation.
+ */
+ ctx->twophase &= MyReplicationSlot->data.two_phase;
+
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
ereport(LOG,