aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-07-14 07:33:50 +0530
committerAmit Kapila <akapila@postgresql.org>2021-07-14 07:33:50 +0530
commita8fd13cab0ba815e9925dc9676e6309f699b5f72 (patch)
treebfebac6bfc2d32a9212e33f9090bd700b0316fae /src/backend/replication/logical
parent6c9c2831668345122fd0f92280b30f3bbe2dd4e6 (diff)
downloadpostgresql-a8fd13cab0ba815e9925dc9676e6309f699b5f72.tar.gz
postgresql-a8fd13cab0ba815e9925dc9676e6309f699b5f72.zip
Add support for prepared transactions to built-in logical replication.
To add support for streaming transactions at prepare time into the built-in logical replication, we need to do the following things: * Modify the output plugin (pgoutput) to implement the new two-phase API callbacks, by leveraging the extended replication protocol. * Modify the replication apply worker, to properly handle two-phase transactions by replaying them on prepare. * Add a new SUBSCRIPTION option "two_phase" to allow users to enable two-phase transactions. We enable the two_phase once the initial data sync is over. We however must explicitly disable replication of two-phase transactions during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover, we don't have a replication connection open so we don't know where to send the data anyway. The streaming option is not allowed with this new two_phase option. This can be done as a separate patch. We don't allow to toggle two_phase option of a subscription because it can lead to an inconsistent replica. For the same reason, we don't allow to refresh the publication once the two_phase is enabled for a subscription unless copy_data option is false. Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow Tested-By: Haiying Tang Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/decode.c11
-rw-r--r--src/backend/replication/logical/logical.c31
-rw-r--r--src/backend/replication/logical/origin.c7
-rw-r--r--src/backend/replication/logical/proto.c217
-rw-r--r--src/backend/replication/logical/reorderbuffer.c25
-rw-r--r--src/backend/replication/logical/snapbuild.c33
-rw-r--r--src/backend/replication/logical/tablesync.c197
-rw-r--r--src/backend/replication/logical/worker.c347
8 files changed, 787 insertions, 81 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 453efc51e16..2874dc06122 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -374,11 +374,10 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*
* XXX Now, this can even lead to a deadlock if the prepare
* transaction is waiting to get it logically replicated for
- * distributed 2PC. Currently, we don't have an in-core
- * implementation of prepares for distributed 2PC but some
- * out-of-core logical replication solution can have such an
- * implementation. They need to inform users to not have locks
- * on catalog tables in such transactions.
+ * distributed 2PC. This can be avoided by disallowing
+ * preparing transactions that have locked [user] catalog
+ * tables exclusively but as of now, we ask users not to do
+ * such an operation.
*/
DecodePrepare(ctx, buf, &parsed);
break;
@@ -735,7 +734,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
if (two_phase)
{
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
- SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
+ SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
commit_time, origin_id, origin_lsn,
parsed->twophase_gid, true);
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d536a5f3ba3..d61ef4cfada 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
- need_full_snapshot, slot->data.initial_consistent_point);
+ need_full_snapshot, slot->data.two_phase_at);
ctx->reorder->private_data = ctx;
@@ -432,10 +432,12 @@ CreateInitDecodingContext(const char *plugin,
MemoryContextSwitchTo(old_context);
/*
- * We allow decoding of prepared transactions iff the two_phase option is
- * enabled at the time of slot creation.
+ * We allow decoding of prepared transactions when the two_phase is
+ * enabled at the time of slot creation, or when the two_phase option is
+ * given at the streaming start, provided the plugin supports all the
+ * callbacks for two-phase.
*/
- ctx->twophase &= MyReplicationSlot->data.two_phase;
+ ctx->twophase &= slot->data.two_phase;
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
@@ -538,10 +540,22 @@ CreateDecodingContext(XLogRecPtr start_lsn,
MemoryContextSwitchTo(old_context);
/*
- * We allow decoding of prepared transactions iff the two_phase option is
- * enabled at the time of slot creation.
+ * We allow decoding of prepared transactions when the two_phase is
+ * enabled at the time of slot creation, or when the two_phase option is
+ * given at the streaming start, provided the plugin supports all the
+ * callbacks for two-phase.
*/
- ctx->twophase &= MyReplicationSlot->data.two_phase;
+ ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
+
+ /* Mark slot to allow two_phase decoding if not already marked */
+ if (ctx->twophase && !slot->data.two_phase)
+ {
+ slot->data.two_phase = true;
+ slot->data.two_phase_at = start_lsn;
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
+ }
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
@@ -602,7 +616,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
SpinLockAcquire(&slot->mutex);
slot->data.confirmed_flush = ctx->reader->EndRecPtr;
- slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
+ if (slot->data.two_phase)
+ slot->data.two_phase_at = ctx->reader->EndRecPtr;
SpinLockRelease(&slot->mutex);
}
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index cb42fcb34d1..2c191dec045 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -973,8 +973,11 @@ replorigin_advance(RepOriginId node,
/*
* Due to - harmless - race conditions during a checkpoint we could see
- * values here that are older than the ones we already have in memory.
- * Don't overwrite those.
+ * values here that are older than the ones we already have in memory. We
+ * could also see older values for prepared transactions when the prepare
+ * is sent at a later point of time along with commit prepared and there
+ * are other transactions commits between prepare and commit prepared. See
+ * ReorderBufferFinishPrepared. Don't overwrite those.
*/
if (go_backward || replication_state->remote_lsn < remote_commit)
replication_state->remote_lsn = remote_commit;
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1cf59e0fb0f..13c8c3bd5bb 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -49,7 +49,7 @@ logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
/* fixed fields */
pq_sendint64(out, txn->final_lsn);
- pq_sendint64(out, txn->commit_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
pq_sendint32(out, txn->xid);
}
@@ -85,7 +85,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
- pq_sendint64(out, txn->commit_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
}
/*
@@ -107,6 +107,217 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
}
/*
+ * Write BEGIN PREPARE to the output stream.
+ */
+void
+logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
+{
+ pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
+
+ /* fixed fields */
+ pq_sendint64(out, txn->final_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->xact_time.prepare_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction BEGIN PREPARE from the stream.
+ */
+void
+logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
+{
+ /* read fields */
+ begin_data->prepare_lsn = pq_getmsgint64(in);
+ if (begin_data->prepare_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_lsn not set in begin prepare message");
+ begin_data->end_lsn = pq_getmsgint64(in);
+ if (begin_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn not set in begin prepare message");
+ begin_data->prepare_time = pq_getmsgint64(in);
+ begin_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(begin_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write PREPARE to the output stream.
+ */
+void
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE);
+
+ /*
+ * This should only ever happen for two-phase commit transactions, in
+ * which case we expect to have a valid GID.
+ */
+ Assert(txn->gid != NULL);
+ Assert(rbtxn_prepared(txn));
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, prepare_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->xact_time.prepare_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in prepare message", flags);
+
+ /* read fields */
+ prepare_data->prepare_lsn = pq_getmsgint64(in);
+ if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_lsn is not set in prepare message");
+ prepare_data->end_lsn = pq_getmsgint64(in);
+ if (prepare_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn is not set in prepare message");
+ prepare_data->prepare_time = pq_getmsgint64(in);
+ prepare_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(prepare_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write COMMIT PREPARED to the output stream.
+ */
+void
+logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
+
+ /*
+ * This should only ever happen for two-phase commit transactions, in
+ * which case we expect to have a valid GID.
+ */
+ Assert(txn->gid != NULL);
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, commit_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->xact_time.commit_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction COMMIT PREPARED from the stream.
+ */
+void
+logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
+
+ /* read fields */
+ prepare_data->commit_lsn = pq_getmsgint64(in);
+ if (prepare_data->commit_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "commit_lsn is not set in commit prepared message");
+ prepare_data->end_lsn = pq_getmsgint64(in);
+ if (prepare_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn is not set in commit prepared message");
+ prepare_data->commit_time = pq_getmsgint64(in);
+ prepare_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(prepare_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write ROLLBACK PREPARED to the output stream.
+ */
+void
+logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
+ /*
+ * This should only ever happen for two-phase commit transactions, in
+ * which case we expect to have a valid GID.
+ */
+ Assert(txn->gid != NULL);
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, prepare_end_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, prepare_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction ROLLBACK PREPARED from the stream.
+ */
+void
+logicalrep_read_rollback_prepared(StringInfo in,
+ LogicalRepRollbackPreparedTxnData *rollback_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
+
+ /* read fields */
+ rollback_data->prepare_end_lsn = pq_getmsgint64(in);
+ if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
+ rollback_data->rollback_end_lsn = pq_getmsgint64(in);
+ if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
+ rollback_data->prepare_time = pq_getmsgint64(in);
+ rollback_data->rollback_time = pq_getmsgint64(in);
+ rollback_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(rollback_data->gid, pq_getmsgstring(in));
+}
+
+/*
* Write ORIGIN to the output stream.
*/
void
@@ -841,7 +1052,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
- pq_sendint64(out, txn->commit_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
}
/*
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 1b4f4a528aa..7378beb684d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2576,7 +2576,7 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
- txn->commit_time = commit_time;
+ txn->xact_time.commit_time = commit_time;
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
@@ -2667,7 +2667,7 @@ ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
*/
txn->final_lsn = prepare_lsn;
txn->end_lsn = end_lsn;
- txn->commit_time = prepare_time;
+ txn->xact_time.prepare_time = prepare_time;
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
@@ -2714,7 +2714,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
Assert(txn->final_lsn != InvalidXLogRecPtr);
ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
- txn->commit_time, txn->origin_id, txn->origin_lsn);
+ txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
/*
* We send the prepare for the concurrently aborted xacts so that later
@@ -2734,7 +2734,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
void
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- XLogRecPtr initial_consistent_point,
+ XLogRecPtr two_phase_at,
TimestampTz commit_time, RepOriginId origin_id,
XLogRecPtr origin_lsn, char *gid, bool is_commit)
{
@@ -2753,19 +2753,20 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
* be later used for rollback.
*/
prepare_end_lsn = txn->end_lsn;
- prepare_time = txn->commit_time;
+ prepare_time = txn->xact_time.prepare_time;
/* add the gid in the txn */
txn->gid = pstrdup(gid);
/*
* It is possible that this transaction is not decoded at prepare time
- * either because by that time we didn't have a consistent snapshot or it
- * was decoded earlier but we have restarted. We only need to send the
- * prepare if it was not decoded earlier. We don't need to decode the xact
- * for aborts if it is not done already.
+ * either because by that time we didn't have a consistent snapshot, or
+ * two_phase was not enabled, or it was decoded earlier but we have
+ * restarted. We only need to send the prepare if it was not decoded
+ * earlier. We don't need to decode the xact for aborts if it is not done
+ * already.
*/
- if ((txn->final_lsn < initial_consistent_point) && is_commit)
+ if ((txn->final_lsn < two_phase_at) && is_commit)
{
txn->txn_flags |= RBTXN_PREPARE;
@@ -2783,12 +2784,12 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
* prepared after the restart.
*/
ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
- txn->commit_time, txn->origin_id, txn->origin_lsn);
+ txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
}
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
- txn->commit_time = commit_time;
+ txn->xact_time.commit_time = commit_time;
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 04f3355f602..a14a3d69005 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -165,15 +165,15 @@ struct SnapBuild
XLogRecPtr start_decoding_at;
/*
- * LSN at which we found a consistent point at the time of slot creation.
- * This is also the point where we have exported a snapshot for the
- * initial copy.
+ * LSN at which two-phase decoding was enabled or LSN at which we found a
+ * consistent point at the time of slot creation.
*
- * The prepared transactions that are not covered by initial snapshot
- * needs to be sent later along with commit prepared and they must be
- * before this point.
+ * The prepared transactions, that were skipped because previously
+ * two-phase was not enabled or are not covered by initial snapshot, need
+ * to be sent later along with commit prepared and they must be before
+ * this point.
*/
- XLogRecPtr initial_consistent_point;
+ XLogRecPtr two_phase_at;
/*
* Don't start decoding WAL until the "xl_running_xacts" information
@@ -281,7 +281,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
TransactionId xmin_horizon,
XLogRecPtr start_lsn,
bool need_full_snapshot,
- XLogRecPtr initial_consistent_point)
+ XLogRecPtr two_phase_at)
{
MemoryContext context;
MemoryContext oldcontext;
@@ -309,7 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn;
builder->building_full_snapshot = need_full_snapshot;
- builder->initial_consistent_point = initial_consistent_point;
+ builder->two_phase_at = two_phase_at;
MemoryContextSwitchTo(oldcontext);
@@ -370,12 +370,21 @@ SnapBuildCurrentState(SnapBuild *builder)
}
/*
- * Return the LSN at which the snapshot was exported
+ * Return the LSN at which the two-phase decoding was first enabled.
*/
XLogRecPtr
-SnapBuildInitialConsistentPoint(SnapBuild *builder)
+SnapBuildGetTwoPhaseAt(SnapBuild *builder)
{
- return builder->initial_consistent_point;
+ return builder->two_phase_at;
+}
+
+/*
+ * Set the LSN at which two-phase decoding is enabled.
+ */
+void
+SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
+{
+ builder->two_phase_at = ptr;
}
/*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 682c107e747..f07983a43cb 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -96,6 +96,7 @@
#include "access/table.h"
#include "access/xact.h"
+#include "catalog/indexing.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
@@ -114,8 +115,11 @@
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
+#include "utils/syscache.h"
static bool table_states_valid = false;
+static List *table_states_not_ready = NIL;
+static bool FetchTableStates(bool *started_tx);
StringInfo copybuf = NULL;
@@ -362,7 +366,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Oid relid;
TimestampTz last_start_time;
};
- static List *table_states = NIL;
static HTAB *last_start_times = NULL;
ListCell *lc;
bool started_tx = false;
@@ -370,42 +373,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Assert(!IsTransactionState());
/* We need up-to-date sync state info for subscription tables here. */
- if (!table_states_valid)
- {
- MemoryContext oldctx;
- List *rstates;
- ListCell *lc;
- SubscriptionRelState *rstate;
-
- /* Clean the old list. */
- list_free_deep(table_states);
- table_states = NIL;
-
- StartTransactionCommand();
- started_tx = true;
-
- /* Fetch all non-ready tables. */
- rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
-
- /* Allocate the tracking info in a permanent memory context. */
- oldctx = MemoryContextSwitchTo(CacheMemoryContext);
- foreach(lc, rstates)
- {
- rstate = palloc(sizeof(SubscriptionRelState));
- memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
- table_states = lappend(table_states, rstate);
- }
- MemoryContextSwitchTo(oldctx);
-
- table_states_valid = true;
- }
+ FetchTableStates(&started_tx);
/*
* Prepare a hash table for tracking last start times of workers, to avoid
* immediate restarts. We don't need it if there are no tables that need
* syncing.
*/
- if (table_states && !last_start_times)
+ if (table_states_not_ready && !last_start_times)
{
HASHCTL ctl;
@@ -419,16 +394,38 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* Clean up the hash table when we're done with all tables (just to
* release the bit of memory).
*/
- else if (!table_states && last_start_times)
+ else if (!table_states_not_ready && last_start_times)
{
hash_destroy(last_start_times);
last_start_times = NULL;
}
/*
+ * Even when the two_phase mode is requested by the user, it remains as
+ * 'pending' until all tablesyncs have reached READY state.
+ *
+ * When this happens, we restart the apply worker and (if the conditions
+ * are still ok) then the two_phase tri-state will become 'enabled' at
+ * that time.
+ *
+ * Note: If the subscription has no tables then leave the state as
+ * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+ * work.
+ */
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
+ MySubscription->name)));
+
+ proc_exit(0);
+ }
+
+ /*
* Process all tables that are being synchronized.
*/
- foreach(lc, table_states)
+ foreach(lc, table_states_not_ready)
{
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
@@ -1071,7 +1068,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* slot leading to a dangling slot on the server.
*/
HOLD_INTERRUPTS();
- walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
+ walrcv_create_slot(LogRepWorkerWalRcvConn,
+ slotname, false /* permanent */ , false /* two_phase */ ,
CRS_USE_SNAPSHOT, origin_startpos);
RESUME_INTERRUPTS();
@@ -1158,3 +1156,134 @@ copy_table_done:
wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
return slotname;
}
+
+/*
+ * Common code to fetch the up-to-date sync state info into the static lists.
+ *
+ * Returns true if subscription has 1 or more tables, else false.
+ *
+ * Note: If this function started the transaction (indicated by the parameter)
+ * then it is the caller's responsibility to commit it.
+ */
+static bool
+FetchTableStates(bool *started_tx)
+{
+ static bool has_subrels = false;
+
+ *started_tx = false;
+
+ if (!table_states_valid)
+ {
+ MemoryContext oldctx;
+ List *rstates;
+ ListCell *lc;
+ SubscriptionRelState *rstate;
+
+ /* Clean the old lists. */
+ list_free_deep(table_states_not_ready);
+ table_states_not_ready = NIL;
+
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ *started_tx = true;
+ }
+
+ /* Fetch all non-ready tables. */
+ rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
+
+ /* Allocate the tracking info in a permanent memory context. */
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ foreach(lc, rstates)
+ {
+ rstate = palloc(sizeof(SubscriptionRelState));
+ memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
+ table_states_not_ready = lappend(table_states_not_ready, rstate);
+ }
+ MemoryContextSwitchTo(oldctx);
+
+ /*
+ * Does the subscription have tables?
+ *
+ * If there were not-READY relations found then we know it does. But
+ * if table_state_not_ready was empty we still need to check again to
+ * see if there are 0 tables.
+ */
+ has_subrels = (list_length(table_states_not_ready) > 0) ||
+ HasSubscriptionRelations(MySubscription->oid);
+
+ table_states_valid = true;
+ }
+
+ return has_subrels;
+}
+
+/*
+ * If the subscription has no tables then return false.
+ *
+ * Otherwise, are all tablesyncs READY?
+ *
+ * Note: This function is not suitable to be called from outside of apply or
+ * tablesync workers because MySubscription needs to be already initialized.
+ */
+bool
+AllTablesyncsReady(void)
+{
+ bool started_tx = false;
+ bool has_subrels = false;
+
+ /* We need up-to-date sync state info for subscription tables here. */
+ has_subrels = FetchTableStates(&started_tx);
+
+ if (started_tx)
+ {
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+ }
+
+ /*
+ * Return false when there are no tables in subscription or not all tables
+ * are in ready state; true otherwise.
+ */
+ return has_subrels && list_length(table_states_not_ready) == 0;
+}
+
+/*
+ * Update the two_phase state of the specified subscription in pg_subscription.
+ */
+void
+UpdateTwoPhaseState(Oid suboid, char new_state)
+{
+ Relation rel;
+ HeapTuple tup;
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+
+ Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
+ new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
+ new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
+
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+ tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR,
+ "cache lookup failed for subscription oid %u",
+ suboid);
+
+ /* Form a new tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ /* And update/set two_phase state */
+ values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
+ replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel),
+ values, nulls, replaces);
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ heap_freetuple(tup);
+ table_close(rel, RowExclusiveLock);
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5fc620c7f19..b9a7a7ffbb3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -49,6 +49,79 @@
* a new way to pass filenames to BufFile APIs so that we are allowed to open
* the file we desired across multiple stream-open calls for the same
* transaction.
+ *
+ * TWO_PHASE TRANSACTIONS
+ * ----------------------
+ * Two phase transactions are replayed at prepare and then committed or
+ * rolled back at commit prepared and rollback prepared respectively. It is
+ * possible to have a prepared transaction that arrives at the apply worker
+ * when the tablesync is busy doing the initial copy. In this case, the apply
+ * worker skips all the prepared operations [e.g. inserts] while the tablesync
+ * is still busy (see the condition of should_apply_changes_for_rel). The
+ * tablesync worker might not get such a prepared transaction because say it
+ * was prior to the initial consistent point but might have got some later
+ * commits. Now, the tablesync worker will exit without doing anything for the
+ * prepared transaction skipped by the apply worker as the sync location for it
+ * will be already ahead of the apply worker's current location. This would lead
+ * to an "empty prepare", because later when the apply worker does the commit
+ * prepare, there is nothing in it (the inserts were skipped earlier).
+ *
+ * To avoid this, and similar prepare confusions the subscription's two_phase
+ * commit is enabled only after the initial sync is over. The two_phase option
+ * has been implemented as a tri-state with values DISABLED, PENDING, and
+ * ENABLED.
+ *
+ * Even if the user specifies they want a subscription with two_phase = on,
+ * internally it will start with a tri-state of PENDING which only becomes
+ * ENABLED after all tablesync initializations are completed - i.e. when all
+ * tablesync workers have reached their READY state. In other words, the value
+ * PENDING is only a temporary state for subscription start-up.
+ *
+ * Until the two_phase is properly available (ENABLED) the subscription will
+ * behave as if two_phase = off. When the apply worker detects that all
+ * tablesyncs have become READY (while the tri-state was PENDING) it will
+ * restart the apply worker process. This happens in
+ * process_syncing_tables_for_apply.
+ *
+ * When the (re-started) apply worker finds that all tablesyncs are READY for a
+ * two_phase tri-state of PENDING it start streaming messages with the
+ * two_phase option which in turn enables the decoding of two-phase commits at
+ * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
+ * Now, it is possible that during the time we have not enabled two_phase, the
+ * publisher (replication server) would have skipped some prepares but we
+ * ensure that such prepares are sent along with commit prepare, see
+ * ReorderBufferFinishPrepared.
+ *
+ * If the subscription has no tables then a two_phase tri-state PENDING is
+ * left unchanged. This lets the user still do an ALTER TABLE REFRESH
+ * PUBLICATION which might otherwise be disallowed (see below).
+ *
+ * If ever a user needs to be aware of the tri-state value, they can fetch it
+ * from the pg_subscription catalog (see column subtwophasestate).
+ *
+ * We don't allow to toggle two_phase option of a subscription because it can
+ * lead to an inconsistent replica. Consider, initially, it was on and we have
+ * received some prepare then we turn it off, now at commit time the server
+ * will send the entire transaction data along with the commit. With some more
+ * analysis, we can allow changing this option from off to on but not sure if
+ * that alone would be useful.
+ *
+ * Finally, to avoid problems mentioned in previous paragraphs from any
+ * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
+ * to 'off' and then again back to 'on') there is a restriction for
+ * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
+ * the two_phase tri-state is ENABLED, except when copy_data = false.
+ *
+ * We can get prepare of the same GID more than once for the genuine cases
+ * where we have defined multiple subscriptions for publications on the same
+ * server and prepared transaction has operations on tables subscribed to those
+ * subscriptions. For such cases, if we use the GID sent by publisher one of
+ * the prepares will be successful and others will fail, in which case the
+ * server will send them again. Now, this can lead to a deadlock if user has
+ * set synchronous_standby_names for all the subscriptions on subscriber. To
+ * avoid such deadlocks, we generate a unique GID (consisting of the
+ * subscription oid and the xid of the prepared transaction) for each prepare
+ * transaction on the subscriber.
*-------------------------------------------------------------------------
*/
@@ -59,6 +132,7 @@
#include "access/table.h"
#include "access/tableam.h"
+#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
@@ -256,6 +330,10 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
LogicalRepTupleData *newtup,
CmdType operation);
+/* Compute GID for two_phase transactions */
+static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
+
+
/*
* Should this worker apply changes for given relation.
*
@@ -784,6 +862,185 @@ apply_handle_commit(StringInfo s)
}
/*
+ * Handle BEGIN PREPARE message.
+ */
+static void
+apply_handle_begin_prepare(StringInfo s)
+{
+ LogicalRepPreparedTxnData begin_data;
+
+ /* Tablesync should never receive prepare. */
+ if (am_tablesync_worker())
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
+
+ logicalrep_read_begin_prepare(s, &begin_data);
+
+ remote_final_lsn = begin_data.prepare_lsn;
+
+ in_remote_transaction = true;
+
+ pgstat_report_activity(STATE_RUNNING, NULL);
+}
+
+/*
+ * Handle PREPARE message.
+ */
+static void
+apply_handle_prepare(StringInfo s)
+{
+ LogicalRepPreparedTxnData prepare_data;
+ char gid[GIDSIZE];
+
+ logicalrep_read_prepare(s, &prepare_data);
+
+ if (prepare_data.prepare_lsn != remote_final_lsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
+ LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
+ LSN_FORMAT_ARGS(remote_final_lsn))));
+
+ /*
+ * Compute unique GID for two_phase transactions. We don't use GID of
+ * prepared transaction sent by server as that can lead to deadlock when
+ * we have multiple subscriptions from same node point to publications on
+ * the same node. See comments atop worker.c
+ */
+ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
+ gid, sizeof(gid));
+
+ /*
+ * Unlike commit, here, we always prepare the transaction even though no
+ * change has happened in this transaction. It is done this way because at
+ * commit prepared time, we won't know whether we have skipped preparing a
+ * transaction because of no change.
+ *
+ * XXX, We can optimize such that at commit prepared time, we first check
+ * whether we have prepared the transaction or not but that doesn't seem
+ * worthwhile because such cases shouldn't be common.
+ */
+ begin_replication_step();
+
+ /*
+ * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+ * called within the PrepareTransactionBlock below.
+ */
+ BeginTransactionBlock();
+ CommitTransactionCommand(); /* Completes the preceding Begin command. */
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = prepare_data.end_lsn;
+ replorigin_session_origin_timestamp = prepare_data.prepare_time;
+
+ PrepareTransactionBlock(gid);
+ end_replication_step();
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(prepare_data.end_lsn);
+
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(prepare_data.end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle a COMMIT PREPARED of a previously PREPARED transaction.
+ */
+static void
+apply_handle_commit_prepared(StringInfo s)
+{
+ LogicalRepCommitPreparedTxnData prepare_data;
+ char gid[GIDSIZE];
+
+ logicalrep_read_commit_prepared(s, &prepare_data);
+
+ /* Compute GID for two_phase transactions. */
+ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
+ gid, sizeof(gid));
+
+ /* There is no transaction when COMMIT PREPARED is called */
+ begin_replication_step();
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = prepare_data.end_lsn;
+ replorigin_session_origin_timestamp = prepare_data.commit_time;
+
+ FinishPreparedTransaction(gid, true);
+ end_replication_step();
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(prepare_data.end_lsn);
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(prepare_data.end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
+ */
+static void
+apply_handle_rollback_prepared(StringInfo s)
+{
+ LogicalRepRollbackPreparedTxnData rollback_data;
+ char gid[GIDSIZE];
+
+ logicalrep_read_rollback_prepared(s, &rollback_data);
+
+ /* Compute GID for two_phase transactions. */
+ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
+ gid, sizeof(gid));
+
+ /*
+ * It is possible that we haven't received prepare because it occurred
+ * before walsender reached a consistent point or the two_phase was still
+ * not enabled by that time, so in such cases, we need to skip rollback
+ * prepared.
+ */
+ if (LookupGXact(gid, rollback_data.prepare_end_lsn,
+ rollback_data.prepare_time))
+ {
+ /*
+ * Update origin state so we can restart streaming from correct
+ * position in case of crash.
+ */
+ replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
+ replorigin_session_origin_timestamp = rollback_data.rollback_time;
+
+ /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
+ begin_replication_step();
+ FinishPreparedTransaction(gid, false);
+ end_replication_step();
+ CommitTransactionCommand();
+ }
+
+ pgstat_report_stat(false);
+
+ store_flush_position(rollback_data.rollback_end_lsn);
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(rollback_data.rollback_end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
* Handle ORIGIN message.
*
* TODO, support tracking of multiple origins
@@ -2060,6 +2317,22 @@ apply_dispatch(StringInfo s)
case LOGICAL_REP_MSG_STREAM_COMMIT:
apply_handle_stream_commit(s);
return;
+
+ case LOGICAL_REP_MSG_BEGIN_PREPARE:
+ apply_handle_begin_prepare(s);
+ return;
+
+ case LOGICAL_REP_MSG_PREPARE:
+ apply_handle_prepare(s);
+ return;
+
+ case LOGICAL_REP_MSG_COMMIT_PREPARED:
+ apply_handle_commit_prepared(s);
+ return;
+
+ case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+ apply_handle_rollback_prepared(s);
+ return;
}
ereport(ERROR,
@@ -2539,6 +2812,9 @@ maybe_reread_subscription(void)
/* !slotname should never happen when enabled is true. */
Assert(newsub->slotname);
+ /* two-phase should not be altered */
+ Assert(newsub->twophasestate == MySubscription->twophasestate);
+
/*
* Exit if any parameter that affects the remote connection was changed.
* The launcher will start a new worker.
@@ -3040,6 +3316,24 @@ cleanup_subxact_info()
subxact_data.nsubxacts_max = 0;
}
+/*
+ * Form the prepared transaction GID for two_phase transactions.
+ *
+ * Return the GID in the supplied buffer.
+ */
+static void
+TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
+{
+ Assert(subid != InvalidRepOriginId);
+
+ if (!TransactionIdIsValid(xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid two-phase transaction ID")));
+
+ snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
+}
+
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
@@ -3050,6 +3344,7 @@ ApplyWorkerMain(Datum main_arg)
XLogRecPtr origin_startpos;
char *myslotname;
WalRcvStreamOptions options;
+ int server_version;
/* Attach to slot */
logicalrep_worker_attach(worker_slot);
@@ -3208,15 +3503,59 @@ ApplyWorkerMain(Datum main_arg)
options.logical = true;
options.startpoint = origin_startpos;
options.slotname = myslotname;
+
+ server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
options.proto.logical.proto_version =
- walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
- LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
+ server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
+ server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
+ LOGICALREP_PROTO_VERSION_NUM;
+
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
+ options.proto.logical.twophase = false;
+
+ if (!am_tablesync_worker())
+ {
+ /*
+ * Even when the two_phase mode is requested by the user, it remains
+ * as the tri-state PENDING until all tablesyncs have reached READY
+ * state. Only then, can it become ENABLED.
+ *
+ * Note: If the subscription has no tables then leave the state as
+ * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+ * work.
+ */
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())
+ {
+ /* Start streaming with two_phase enabled */
+ options.proto.logical.twophase = true;
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
- /* Start normal logical streaming replication. */
- walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+ StartTransactionCommand();
+ UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+ MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+ CommitTransactionCommand();
+ }
+ else
+ {
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+ }
+
+ ereport(DEBUG1,
+ (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.",
+ MySubscription->name,
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+ "?")));
+ }
+ else
+ {
+ /* Start normal logical streaming replication. */
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+ }
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);