diff options
author | Amit Kapila <akapila@postgresql.org> | 2021-07-14 07:33:50 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2021-07-14 07:33:50 +0530 |
commit | a8fd13cab0ba815e9925dc9676e6309f699b5f72 (patch) | |
tree | bfebac6bfc2d32a9212e33f9090bd700b0316fae /src/backend/replication/logical | |
parent | 6c9c2831668345122fd0f92280b30f3bbe2dd4e6 (diff) | |
download | postgresql-a8fd13cab0ba815e9925dc9676e6309f699b5f72.tar.gz postgresql-a8fd13cab0ba815e9925dc9676e6309f699b5f72.zip |
Add support for prepared transactions to built-in logical replication.
To add support for streaming transactions at prepare time into the
built-in logical replication, we need to do the following things:
* Modify the output plugin (pgoutput) to implement the new two-phase API
callbacks, by leveraging the extended replication protocol.
* Modify the replication apply worker, to properly handle two-phase
transactions by replaying them on prepare.
* Add a new SUBSCRIPTION option "two_phase" to allow users to enable
two-phase transactions. We enable the two_phase once the initial data sync
is over.
We however must explicitly disable replication of two-phase transactions
during replication slot creation, even if the plugin supports it. We
don't need to replicate the changes accumulated during this phase,
and moreover, we don't have a replication connection open so we don't know
where to send the data anyway.
The streaming option is not allowed with this new two_phase option. This
can be done as a separate patch.
We don't allow to toggle two_phase option of a subscription because it can
lead to an inconsistent replica. For the same reason, we don't allow to
refresh the publication once the two_phase is enabled for a subscription
unless copy_data option is false.
Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow
Tested-By: Haiying Tang
Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r-- | src/backend/replication/logical/decode.c | 11 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 31 | ||||
-rw-r--r-- | src/backend/replication/logical/origin.c | 7 | ||||
-rw-r--r-- | src/backend/replication/logical/proto.c | 217 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 25 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 33 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 197 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 347 |
8 files changed, 787 insertions, 81 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 453efc51e16..2874dc06122 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -374,11 +374,10 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * * XXX Now, this can even lead to a deadlock if the prepare * transaction is waiting to get it logically replicated for - * distributed 2PC. Currently, we don't have an in-core - * implementation of prepares for distributed 2PC but some - * out-of-core logical replication solution can have such an - * implementation. They need to inform users to not have locks - * on catalog tables in such transactions. + * distributed 2PC. This can be avoided by disallowing + * preparing transactions that have locked [user] catalog + * tables exclusively but as of now, we ask users not to do + * such an operation. */ DecodePrepare(ctx, buf, &parsed); break; @@ -735,7 +734,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, if (two_phase) { ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, - SnapBuildInitialConsistentPoint(ctx->snapshot_builder), + SnapBuildGetTwoPhaseAt(ctx->snapshot_builder), commit_time, origin_id, origin_lsn, parsed->twophase_gid, true); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index d536a5f3ba3..d61ef4cfada 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, - need_full_snapshot, slot->data.initial_consistent_point); + need_full_snapshot, slot->data.two_phase_at); ctx->reorder->private_data = ctx; @@ -432,10 +432,12 @@ CreateInitDecodingContext(const char *plugin, MemoryContextSwitchTo(old_context); /* - * We allow decoding of prepared transactions iff the two_phase option is - * enabled at the time of slot creation. + * We allow decoding of prepared transactions when the two_phase is + * enabled at the time of slot creation, or when the two_phase option is + * given at the streaming start, provided the plugin supports all the + * callbacks for two-phase. */ - ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->twophase &= slot->data.two_phase; ctx->reorder->output_rewrites = ctx->options.receive_rewrites; @@ -538,10 +540,22 @@ CreateDecodingContext(XLogRecPtr start_lsn, MemoryContextSwitchTo(old_context); /* - * We allow decoding of prepared transactions iff the two_phase option is - * enabled at the time of slot creation. + * We allow decoding of prepared transactions when the two_phase is + * enabled at the time of slot creation, or when the two_phase option is + * given at the streaming start, provided the plugin supports all the + * callbacks for two-phase. */ - ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given); + + /* Mark slot to allow two_phase decoding if not already marked */ + if (ctx->twophase && !slot->data.two_phase) + { + slot->data.two_phase = true; + slot->data.two_phase_at = start_lsn; + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn); + } ctx->reorder->output_rewrites = ctx->options.receive_rewrites; @@ -602,7 +616,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) SpinLockAcquire(&slot->mutex); slot->data.confirmed_flush = ctx->reader->EndRecPtr; - slot->data.initial_consistent_point = ctx->reader->EndRecPtr; + if (slot->data.two_phase) + slot->data.two_phase_at = ctx->reader->EndRecPtr; SpinLockRelease(&slot->mutex); } diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index cb42fcb34d1..2c191dec045 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -973,8 +973,11 @@ replorigin_advance(RepOriginId node, /* * Due to - harmless - race conditions during a checkpoint we could see - * values here that are older than the ones we already have in memory. - * Don't overwrite those. + * values here that are older than the ones we already have in memory. We + * could also see older values for prepared transactions when the prepare + * is sent at a later point of time along with commit prepared and there + * are other transactions commits between prepare and commit prepared. See + * ReorderBufferFinishPrepared. Don't overwrite those. */ if (go_backward || replication_state->remote_lsn < remote_commit) replication_state->remote_lsn = remote_commit; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 1cf59e0fb0f..13c8c3bd5bb 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -49,7 +49,7 @@ logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) /* fixed fields */ pq_sendint64(out, txn->final_lsn); - pq_sendint64(out, txn->commit_time); + pq_sendint64(out, txn->xact_time.commit_time); pq_sendint32(out, txn->xid); } @@ -85,7 +85,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, /* send fields */ pq_sendint64(out, commit_lsn); pq_sendint64(out, txn->end_lsn); - pq_sendint64(out, txn->commit_time); + pq_sendint64(out, txn->xact_time.commit_time); } /* @@ -107,6 +107,217 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) } /* + * Write BEGIN PREPARE to the output stream. + */ +void +logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE); + + /* fixed fields */ + pq_sendint64(out, txn->final_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->xact_time.prepare_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction BEGIN PREPARE from the stream. + */ +void +logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data) +{ + /* read fields */ + begin_data->prepare_lsn = pq_getmsgint64(in); + if (begin_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_lsn not set in begin prepare message"); + begin_data->end_lsn = pq_getmsgint64(in); + if (begin_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn not set in begin prepare message"); + begin_data->prepare_time = pq_getmsgint64(in); + begin_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(begin_data->gid, pq_getmsgstring(in)); +} + +/* + * Write PREPARE to the output stream. + */ +void +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE); + + /* + * This should only ever happen for two-phase commit transactions, in + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + Assert(rbtxn_prepared(txn)); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->xact_time.prepare_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction PREPARE from the stream. + */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in prepare message", flags); + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + if (prepare_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_lsn is not set in prepare message"); + prepare_data->end_lsn = pq_getmsgint64(in); + if (prepare_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn is not set in prepare message"); + prepare_data->prepare_time = pq_getmsgint64(in); + prepare_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); +} + +/* + * Write COMMIT PREPARED to the output stream. + */ +void +logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED); + + /* + * This should only ever happen for two-phase commit transactions, in + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, commit_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->xact_time.commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction COMMIT PREPARED from the stream. + */ +void +logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in commit prepared message", flags); + + /* read fields */ + prepare_data->commit_lsn = pq_getmsgint64(in); + if (prepare_data->commit_lsn == InvalidXLogRecPtr) + elog(ERROR, "commit_lsn is not set in commit prepared message"); + prepare_data->end_lsn = pq_getmsgint64(in); + if (prepare_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn is not set in commit prepared message"); + prepare_data->commit_time = pq_getmsgint64(in); + prepare_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); +} + +/* + * Write ROLLBACK PREPARED to the output stream. + */ +void +logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED); + + /* + * This should only ever happen for two-phase commit transactions, in + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_end_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, prepare_time); + pq_sendint64(out, txn->xact_time.commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction ROLLBACK PREPARED from the stream. + */ +void +logicalrep_read_rollback_prepared(StringInfo in, + LogicalRepRollbackPreparedTxnData *rollback_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in rollback prepared message", flags); + + /* read fields */ + rollback_data->prepare_end_lsn = pq_getmsgint64(in); + if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_end_lsn is not set in rollback prepared message"); + rollback_data->rollback_end_lsn = pq_getmsgint64(in); + if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "rollback_end_lsn is not set in rollback prepared message"); + rollback_data->prepare_time = pq_getmsgint64(in); + rollback_data->rollback_time = pq_getmsgint64(in); + rollback_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(rollback_data->gid, pq_getmsgstring(in)); +} + +/* * Write ORIGIN to the output stream. */ void @@ -841,7 +1052,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, /* send fields */ pq_sendint64(out, commit_lsn); pq_sendint64(out, txn->end_lsn); - pq_sendint64(out, txn->commit_time); + pq_sendint64(out, txn->xact_time.commit_time); } /* diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1b4f4a528aa..7378beb684d 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2576,7 +2576,7 @@ ReorderBufferReplay(ReorderBufferTXN *txn, txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; - txn->commit_time = commit_time; + txn->xact_time.commit_time = commit_time; txn->origin_id = origin_id; txn->origin_lsn = origin_lsn; @@ -2667,7 +2667,7 @@ ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, */ txn->final_lsn = prepare_lsn; txn->end_lsn = end_lsn; - txn->commit_time = prepare_time; + txn->xact_time.prepare_time = prepare_time; txn->origin_id = origin_id; txn->origin_lsn = origin_lsn; @@ -2714,7 +2714,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, Assert(txn->final_lsn != InvalidXLogRecPtr); ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, - txn->commit_time, txn->origin_id, txn->origin_lsn); + txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn); /* * We send the prepare for the concurrently aborted xacts so that later @@ -2734,7 +2734,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - XLogRecPtr initial_consistent_point, + XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit) { @@ -2753,19 +2753,20 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, * be later used for rollback. */ prepare_end_lsn = txn->end_lsn; - prepare_time = txn->commit_time; + prepare_time = txn->xact_time.prepare_time; /* add the gid in the txn */ txn->gid = pstrdup(gid); /* * It is possible that this transaction is not decoded at prepare time - * either because by that time we didn't have a consistent snapshot or it - * was decoded earlier but we have restarted. We only need to send the - * prepare if it was not decoded earlier. We don't need to decode the xact - * for aborts if it is not done already. + * either because by that time we didn't have a consistent snapshot, or + * two_phase was not enabled, or it was decoded earlier but we have + * restarted. We only need to send the prepare if it was not decoded + * earlier. We don't need to decode the xact for aborts if it is not done + * already. */ - if ((txn->final_lsn < initial_consistent_point) && is_commit) + if ((txn->final_lsn < two_phase_at) && is_commit) { txn->txn_flags |= RBTXN_PREPARE; @@ -2783,12 +2784,12 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, * prepared after the restart. */ ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, - txn->commit_time, txn->origin_id, txn->origin_lsn); + txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn); } txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; - txn->commit_time = commit_time; + txn->xact_time.commit_time = commit_time; txn->origin_id = origin_id; txn->origin_lsn = origin_lsn; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 04f3355f602..a14a3d69005 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -165,15 +165,15 @@ struct SnapBuild XLogRecPtr start_decoding_at; /* - * LSN at which we found a consistent point at the time of slot creation. - * This is also the point where we have exported a snapshot for the - * initial copy. + * LSN at which two-phase decoding was enabled or LSN at which we found a + * consistent point at the time of slot creation. * - * The prepared transactions that are not covered by initial snapshot - * needs to be sent later along with commit prepared and they must be - * before this point. + * The prepared transactions, that were skipped because previously + * two-phase was not enabled or are not covered by initial snapshot, need + * to be sent later along with commit prepared and they must be before + * this point. */ - XLogRecPtr initial_consistent_point; + XLogRecPtr two_phase_at; /* * Don't start decoding WAL until the "xl_running_xacts" information @@ -281,7 +281,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, - XLogRecPtr initial_consistent_point) + XLogRecPtr two_phase_at) { MemoryContext context; MemoryContext oldcontext; @@ -309,7 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; builder->building_full_snapshot = need_full_snapshot; - builder->initial_consistent_point = initial_consistent_point; + builder->two_phase_at = two_phase_at; MemoryContextSwitchTo(oldcontext); @@ -370,12 +370,21 @@ SnapBuildCurrentState(SnapBuild *builder) } /* - * Return the LSN at which the snapshot was exported + * Return the LSN at which the two-phase decoding was first enabled. */ XLogRecPtr -SnapBuildInitialConsistentPoint(SnapBuild *builder) +SnapBuildGetTwoPhaseAt(SnapBuild *builder) { - return builder->initial_consistent_point; + return builder->two_phase_at; +} + +/* + * Set the LSN at which two-phase decoding is enabled. + */ +void +SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr) +{ + builder->two_phase_at = ptr; } /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 682c107e747..f07983a43cb 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -96,6 +96,7 @@ #include "access/table.h" #include "access/xact.h" +#include "catalog/indexing.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" @@ -114,8 +115,11 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" static bool table_states_valid = false; +static List *table_states_not_ready = NIL; +static bool FetchTableStates(bool *started_tx); StringInfo copybuf = NULL; @@ -362,7 +366,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Oid relid; TimestampTz last_start_time; }; - static List *table_states = NIL; static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; @@ -370,42 +373,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - if (!table_states_valid) - { - MemoryContext oldctx; - List *rstates; - ListCell *lc; - SubscriptionRelState *rstate; - - /* Clean the old list. */ - list_free_deep(table_states); - table_states = NIL; - - StartTransactionCommand(); - started_tx = true; - - /* Fetch all non-ready tables. */ - rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); - - /* Allocate the tracking info in a permanent memory context. */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) - { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states = lappend(table_states, rstate); - } - MemoryContextSwitchTo(oldctx); - - table_states_valid = true; - } + FetchTableStates(&started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid * immediate restarts. We don't need it if there are no tables that need * syncing. */ - if (table_states && !last_start_times) + if (table_states_not_ready && !last_start_times) { HASHCTL ctl; @@ -419,16 +394,38 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * Clean up the hash table when we're done with all tables (just to * release the bit of memory). */ - else if (!table_states && last_start_times) + else if (!table_states_not_ready && last_start_times) { hash_destroy(last_start_times); last_start_times = NULL; } /* + * Even when the two_phase mode is requested by the user, it remains as + * 'pending' until all tablesyncs have reached READY state. + * + * When this happens, we restart the apply worker and (if the conditions + * are still ok) then the two_phase tri-state will become 'enabled' at + * that time. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", + MySubscription->name))); + + proc_exit(0); + } + + /* * Process all tables that are being synchronized. */ - foreach(lc, table_states) + foreach(lc, table_states_not_ready) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -1071,7 +1068,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * slot leading to a dangling slot on the server. */ HOLD_INTERRUPTS(); - walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , + walrcv_create_slot(LogRepWorkerWalRcvConn, + slotname, false /* permanent */ , false /* two_phase */ , CRS_USE_SNAPSHOT, origin_startpos); RESUME_INTERRUPTS(); @@ -1158,3 +1156,134 @@ copy_table_done: wait_for_worker_state_change(SUBREL_STATE_CATCHUP); return slotname; } + +/* + * Common code to fetch the up-to-date sync state info into the static lists. + * + * Returns true if subscription has 1 or more tables, else false. + * + * Note: If this function started the transaction (indicated by the parameter) + * then it is the caller's responsibility to commit it. + */ +static bool +FetchTableStates(bool *started_tx) +{ + static bool has_subrels = false; + + *started_tx = false; + + if (!table_states_valid) + { + MemoryContext oldctx; + List *rstates; + ListCell *lc; + SubscriptionRelState *rstate; + + /* Clean the old lists. */ + list_free_deep(table_states_not_ready); + table_states_not_ready = NIL; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + *started_tx = true; + } + + /* Fetch all non-ready tables. */ + rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + foreach(lc, rstates) + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, rstate); + } + MemoryContextSwitchTo(oldctx); + + /* + * Does the subscription have tables? + * + * If there were not-READY relations found then we know it does. But + * if table_state_not_ready was empty we still need to check again to + * see if there are 0 tables. + */ + has_subrels = (list_length(table_states_not_ready) > 0) || + HasSubscriptionRelations(MySubscription->oid); + + table_states_valid = true; + } + + return has_subrels; +} + +/* + * If the subscription has no tables then return false. + * + * Otherwise, are all tablesyncs READY? + * + * Note: This function is not suitable to be called from outside of apply or + * tablesync workers because MySubscription needs to be already initialized. + */ +bool +AllTablesyncsReady(void) +{ + bool started_tx = false; + bool has_subrels = false; + + /* We need up-to-date sync state info for subscription tables here. */ + has_subrels = FetchTableStates(&started_tx); + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(false); + } + + /* + * Return false when there are no tables in subscription or not all tables + * are in ready state; true otherwise. + */ + return has_subrels && list_length(table_states_not_ready) == 0; +} + +/* + * Update the two_phase state of the specified subscription in pg_subscription. + */ +void +UpdateTwoPhaseState(Oid suboid, char new_state) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED || + new_state == LOGICALREP_TWOPHASE_STATE_PENDING || + new_state == LOGICALREP_TWOPHASE_STATE_ENABLED); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, + "cache lookup failed for subscription oid %u", + suboid); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* And update/set two_phase state */ + values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), + values, nulls, replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + heap_freetuple(tup); + table_close(rel, RowExclusiveLock); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5fc620c7f19..b9a7a7ffbb3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -49,6 +49,79 @@ * a new way to pass filenames to BufFile APIs so that we are allowed to open * the file we desired across multiple stream-open calls for the same * transaction. + * + * TWO_PHASE TRANSACTIONS + * ---------------------- + * Two phase transactions are replayed at prepare and then committed or + * rolled back at commit prepared and rollback prepared respectively. It is + * possible to have a prepared transaction that arrives at the apply worker + * when the tablesync is busy doing the initial copy. In this case, the apply + * worker skips all the prepared operations [e.g. inserts] while the tablesync + * is still busy (see the condition of should_apply_changes_for_rel). The + * tablesync worker might not get such a prepared transaction because say it + * was prior to the initial consistent point but might have got some later + * commits. Now, the tablesync worker will exit without doing anything for the + * prepared transaction skipped by the apply worker as the sync location for it + * will be already ahead of the apply worker's current location. This would lead + * to an "empty prepare", because later when the apply worker does the commit + * prepare, there is nothing in it (the inserts were skipped earlier). + * + * To avoid this, and similar prepare confusions the subscription's two_phase + * commit is enabled only after the initial sync is over. The two_phase option + * has been implemented as a tri-state with values DISABLED, PENDING, and + * ENABLED. + * + * Even if the user specifies they want a subscription with two_phase = on, + * internally it will start with a tri-state of PENDING which only becomes + * ENABLED after all tablesync initializations are completed - i.e. when all + * tablesync workers have reached their READY state. In other words, the value + * PENDING is only a temporary state for subscription start-up. + * + * Until the two_phase is properly available (ENABLED) the subscription will + * behave as if two_phase = off. When the apply worker detects that all + * tablesyncs have become READY (while the tri-state was PENDING) it will + * restart the apply worker process. This happens in + * process_syncing_tables_for_apply. + * + * When the (re-started) apply worker finds that all tablesyncs are READY for a + * two_phase tri-state of PENDING it start streaming messages with the + * two_phase option which in turn enables the decoding of two-phase commits at + * the publisher. Then, it updates the tri-state value from PENDING to ENABLED. + * Now, it is possible that during the time we have not enabled two_phase, the + * publisher (replication server) would have skipped some prepares but we + * ensure that such prepares are sent along with commit prepare, see + * ReorderBufferFinishPrepared. + * + * If the subscription has no tables then a two_phase tri-state PENDING is + * left unchanged. This lets the user still do an ALTER TABLE REFRESH + * PUBLICATION which might otherwise be disallowed (see below). + * + * If ever a user needs to be aware of the tri-state value, they can fetch it + * from the pg_subscription catalog (see column subtwophasestate). + * + * We don't allow to toggle two_phase option of a subscription because it can + * lead to an inconsistent replica. Consider, initially, it was on and we have + * received some prepare then we turn it off, now at commit time the server + * will send the entire transaction data along with the commit. With some more + * analysis, we can allow changing this option from off to on but not sure if + * that alone would be useful. + * + * Finally, to avoid problems mentioned in previous paragraphs from any + * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on' + * to 'off' and then again back to 'on') there is a restriction for + * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when + * the two_phase tri-state is ENABLED, except when copy_data = false. + * + * We can get prepare of the same GID more than once for the genuine cases + * where we have defined multiple subscriptions for publications on the same + * server and prepared transaction has operations on tables subscribed to those + * subscriptions. For such cases, if we use the GID sent by publisher one of + * the prepares will be successful and others will fail, in which case the + * server will send them again. Now, this can lead to a deadlock if user has + * set synchronous_standby_names for all the subscriptions on subscriber. To + * avoid such deadlocks, we generate a unique GID (consisting of the + * subscription oid and the xid of the prepared transaction) for each prepare + * transaction on the subscriber. *------------------------------------------------------------------------- */ @@ -59,6 +132,7 @@ #include "access/table.h" #include "access/tableam.h" +#include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" @@ -256,6 +330,10 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, LogicalRepTupleData *newtup, CmdType operation); +/* Compute GID for two_phase transactions */ +static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); + + /* * Should this worker apply changes for given relation. * @@ -784,6 +862,185 @@ apply_handle_commit(StringInfo s) } /* + * Handle BEGIN PREPARE message. + */ +static void +apply_handle_begin_prepare(StringInfo s) +{ + LogicalRepPreparedTxnData begin_data; + + /* Tablesync should never receive prepare. */ + if (am_tablesync_worker()) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); + + logicalrep_read_begin_prepare(s, &begin_data); + + remote_final_lsn = begin_data.prepare_lsn; + + in_remote_transaction = true; + + pgstat_report_activity(STATE_RUNNING, NULL); +} + +/* + * Handle PREPARE message. + */ +static void +apply_handle_prepare(StringInfo s) +{ + LogicalRepPreparedTxnData prepare_data; + char gid[GIDSIZE]; + + logicalrep_read_prepare(s, &prepare_data); + + if (prepare_data.prepare_lsn != remote_final_lsn) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)", + LSN_FORMAT_ARGS(prepare_data.prepare_lsn), + LSN_FORMAT_ARGS(remote_final_lsn)))); + + /* + * Compute unique GID for two_phase transactions. We don't use GID of + * prepared transaction sent by server as that can lead to deadlock when + * we have multiple subscriptions from same node point to publications on + * the same node. See comments atop worker.c + */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, + gid, sizeof(gid)); + + /* + * Unlike commit, here, we always prepare the transaction even though no + * change has happened in this transaction. It is done this way because at + * commit prepared time, we won't know whether we have skipped preparing a + * transaction because of no change. + * + * XXX, We can optimize such that at commit prepared time, we first check + * whether we have prepared the transaction or not but that doesn't seem + * worthwhile because such cases shouldn't be common. + */ + begin_replication_step(); + + /* + * BeginTransactionBlock is necessary to balance the EndTransactionBlock + * called within the PrepareTransactionBlock below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); /* Completes the preceding Begin command. */ + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.prepare_time; + + PrepareTransactionBlock(gid); + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle a COMMIT PREPARED of a previously PREPARED transaction. + */ +static void +apply_handle_commit_prepared(StringInfo s) +{ + LogicalRepCommitPreparedTxnData prepare_data; + char gid[GIDSIZE]; + + logicalrep_read_commit_prepared(s, &prepare_data); + + /* Compute GID for two_phase transactions. */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, + gid, sizeof(gid)); + + /* There is no transaction when COMMIT PREPARED is called */ + begin_replication_step(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.commit_time; + + FinishPreparedTransaction(gid, true); + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION. + */ +static void +apply_handle_rollback_prepared(StringInfo s) +{ + LogicalRepRollbackPreparedTxnData rollback_data; + char gid[GIDSIZE]; + + logicalrep_read_rollback_prepared(s, &rollback_data); + + /* Compute GID for two_phase transactions. */ + TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, + gid, sizeof(gid)); + + /* + * It is possible that we haven't received prepare because it occurred + * before walsender reached a consistent point or the two_phase was still + * not enabled by that time, so in such cases, we need to skip rollback + * prepared. + */ + if (LookupGXact(gid, rollback_data.prepare_end_lsn, + rollback_data.prepare_time)) + { + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = rollback_data.rollback_end_lsn; + replorigin_session_origin_timestamp = rollback_data.rollback_time; + + /* There is no transaction when ABORT/ROLLBACK PREPARED is called */ + begin_replication_step(); + FinishPreparedTransaction(gid, false); + end_replication_step(); + CommitTransactionCommand(); + } + + pgstat_report_stat(false); + + store_flush_position(rollback_data.rollback_end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(rollback_data.rollback_end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle ORIGIN message. * * TODO, support tracking of multiple origins @@ -2060,6 +2317,22 @@ apply_dispatch(StringInfo s) case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); return; + + case LOGICAL_REP_MSG_BEGIN_PREPARE: + apply_handle_begin_prepare(s); + return; + + case LOGICAL_REP_MSG_PREPARE: + apply_handle_prepare(s); + return; + + case LOGICAL_REP_MSG_COMMIT_PREPARED: + apply_handle_commit_prepared(s); + return; + + case LOGICAL_REP_MSG_ROLLBACK_PREPARED: + apply_handle_rollback_prepared(s); + return; } ereport(ERROR, @@ -2539,6 +2812,9 @@ maybe_reread_subscription(void) /* !slotname should never happen when enabled is true. */ Assert(newsub->slotname); + /* two-phase should not be altered */ + Assert(newsub->twophasestate == MySubscription->twophasestate); + /* * Exit if any parameter that affects the remote connection was changed. * The launcher will start a new worker. @@ -3040,6 +3316,24 @@ cleanup_subxact_info() subxact_data.nsubxacts_max = 0; } +/* + * Form the prepared transaction GID for two_phase transactions. + * + * Return the GID in the supplied buffer. + */ +static void +TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid) +{ + Assert(subid != InvalidRepOriginId); + + if (!TransactionIdIsValid(xid)) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid two-phase transaction ID"))); + + snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid); +} + /* Logical Replication Apply worker entry point */ void ApplyWorkerMain(Datum main_arg) @@ -3050,6 +3344,7 @@ ApplyWorkerMain(Datum main_arg) XLogRecPtr origin_startpos; char *myslotname; WalRcvStreamOptions options; + int server_version; /* Attach to slot */ logicalrep_worker_attach(worker_slot); @@ -3208,15 +3503,59 @@ ApplyWorkerMain(Datum main_arg) options.logical = true; options.startpoint = origin_startpos; options.slotname = myslotname; + + server_version = walrcv_server_version(LogRepWorkerWalRcvConn); options.proto.logical.proto_version = - walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ? - LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM; + server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : + server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : + LOGICALREP_PROTO_VERSION_NUM; + options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; + options.proto.logical.twophase = false; + + if (!am_tablesync_worker()) + { + /* + * Even when the two_phase mode is requested by the user, it remains + * as the tri-state PENDING until all tablesyncs have reached READY + * state. Only then, can it become ENABLED. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + /* Start streaming with two_phase enabled */ + options.proto.logical.twophase = true; + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); - /* Start normal logical streaming replication. */ - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + StartTransactionCommand(); + UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); + MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + CommitTransactionCommand(); + } + else + { + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + } + + ereport(DEBUG1, + (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.", + MySubscription->name, + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : + "?"))); + } + else + { + /* Start normal logical streaming replication. */ + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + } /* Run the main loop. */ LogicalRepApplyLoop(origin_startpos); |