diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 233 |
1 files changed, 226 insertions, 7 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 03e069c7cdd..82dcffc2db8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -136,6 +136,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" @@ -189,6 +190,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/rel.h" #include "utils/rls.h" #include "utils/syscache.h" @@ -259,6 +261,21 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; +/* + * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for + * the subscription if the remote transaction's finish LSN matches the subskiplsn. + * Once we start skipping changes, we don't stop it until we skip all changes of + * the transaction even if pg_subscription is updated and MySubscription->skiplsn + * gets changed or reset during that. Also, in streaming transaction cases, we + * don't skip receiving and spooling the changes since we decide whether or not + * to skip applying the changes when starting to apply changes. The subskiplsn is + * cleared after successfully skipping the transaction or applying non-empty + * transaction. The latter prevents the mistakenly specified subskiplsn from + * being left. + */ +static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; +#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn))) + /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -336,6 +353,11 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int /* Common streaming function to apply all the spooled messages */ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); +/* Functions for skipping changes */ +static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); +static void stop_skipping_changes(void); +static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn); + /* Functions for apply error callback */ static void apply_error_callback(void *arg); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); @@ -795,6 +817,8 @@ apply_handle_begin(StringInfo s) remote_final_lsn = begin_data.final_lsn; + maybe_start_skipping_changes(begin_data.final_lsn); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -847,6 +871,8 @@ apply_handle_begin_prepare(StringInfo s) remote_final_lsn = begin_data.prepare_lsn; + maybe_start_skipping_changes(begin_data.prepare_lsn); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -905,9 +931,9 @@ apply_handle_prepare(StringInfo s) /* * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction. It is done this way because at - * commit prepared time, we won't know whether we have skipped preparing a - * transaction because of no change. + * change has happened in this transaction or all changes are skipped. It + * is done this way because at commit prepared time, we won't know whether + * we have skipped preparing a transaction because of those reasons. * * XXX, We can optimize such that at commit prepared time, we first check * whether we have prepared the transaction or not but that doesn't seem @@ -928,6 +954,15 @@ apply_handle_prepare(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); + /* + * Since we have already prepared the transaction, in a case where the + * server crashes before clearing the subskiplsn, it will be left but the + * transaction won't be resent. But that's okay because it's a rare case + * and the subskiplsn will be cleared when finishing the next transaction. + */ + stop_skipping_changes(); + clear_subscription_skip_lsn(prepare_data.prepare_lsn); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); } @@ -969,6 +1004,8 @@ apply_handle_commit_prepared(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); + clear_subscription_skip_lsn(prepare_data.end_lsn); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); } @@ -1010,6 +1047,8 @@ apply_handle_rollback_prepared(StringInfo s) FinishPreparedTransaction(gid, false); end_replication_step(); CommitTransactionCommand(); + + clear_subscription_skip_lsn(rollback_data.rollback_end_lsn); } pgstat_report_stat(false); @@ -1072,6 +1111,13 @@ apply_handle_stream_prepare(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); + /* + * Similar to prepare case, the subskiplsn could be left in a case of + * server crash but it's okay. See the comments in apply_handle_prepare(). + */ + stop_skipping_changes(); + clear_subscription_skip_lsn(prepare_data.prepare_lsn); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -1311,6 +1357,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) MemoryContext oldcxt; BufFile *fd; + maybe_start_skipping_changes(lsn); + /* Make sure we have an open transaction */ begin_replication_step(); @@ -1455,9 +1503,27 @@ apply_handle_stream_commit(StringInfo s) static void apply_handle_commit_internal(LogicalRepCommitData *commit_data) { + if (is_skipping_changes()) + { + stop_skipping_changes(); + + /* + * Start a new transaction to clear the subskiplsn, if not started + * yet. + */ + if (!IsTransactionState()) + StartTransactionCommand(); + } + if (IsTransactionState()) { /* + * The transaction is either non-empty or skipped, so we clear the + * subskiplsn. + */ + clear_subscription_skip_lsn(commit_data->commit_lsn); + + /* * Update origin state so we can restart streaming from correct * position in case of crash. */ @@ -1583,7 +1649,12 @@ apply_handle_insert(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; - if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) return; begin_replication_step(); @@ -1710,7 +1781,12 @@ apply_handle_update(StringInfo s) RangeTblEntry *target_rte; MemoryContext oldctx; - if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) return; begin_replication_step(); @@ -1874,7 +1950,12 @@ apply_handle_delete(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; - if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) return; begin_replication_step(); @@ -2261,7 +2342,12 @@ apply_handle_truncate(StringInfo s) ListCell *lc; LOCKMODE lockmode = AccessExclusiveLock; - if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) return; begin_replication_step(); @@ -3738,6 +3824,139 @@ IsLogicalWorker(void) return MyLogicalRepWorker != NULL; } +/* + * Start skipping changes of the transaction if the given LSN matches the + * LSN specified by subscription's skiplsn. + */ +static void +maybe_start_skipping_changes(XLogRecPtr finish_lsn) +{ + Assert(!is_skipping_changes()); + Assert(!in_remote_transaction); + Assert(!in_streamed_transaction); + + /* + * Quick return if it's not requested to skip this transaction. This + * function is called for every remote transaction and we assume that + * skipping the transaction is not used often. + */ + if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) || + MySubscription->skiplsn != finish_lsn)) + return; + + /* Start skipping all changes of this transaction */ + skip_xact_finish_lsn = finish_lsn; + + ereport(LOG, + errmsg("start skipping logical replication transaction finished at %X/%X", + LSN_FORMAT_ARGS(skip_xact_finish_lsn))); +} + +/* + * Stop skipping changes by resetting skip_xact_finish_lsn if enabled. + */ +static void +stop_skipping_changes(void) +{ + if (!is_skipping_changes()) + return; + + ereport(LOG, + (errmsg("done skipping logical replication transaction finished at %X/%X", + LSN_FORMAT_ARGS(skip_xact_finish_lsn)))); + + /* Stop skipping changes */ + skip_xact_finish_lsn = InvalidXLogRecPtr; +} + +/* + * Clear subskiplsn of pg_subscription catalog. + * + * finish_lsn is the transaction's finish LSN that is used to check if the + * subskiplsn matches it. If not matched, we raise a warning when clearing the + * subskiplsn in order to inform users for cases e.g., where the user mistakenly + * specified the wrong subskiplsn. + */ +static void +clear_subscription_skip_lsn(XLogRecPtr finish_lsn) +{ + Relation rel; + Form_pg_subscription subform; + HeapTuple tup; + XLogRecPtr myskiplsn = MySubscription->skiplsn; + bool started_tx = false; + + if (likely(XLogRecPtrIsInvalid(myskiplsn))) + return; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* + * Protect subskiplsn of pg_subscription from being concurrently updated + * while clearing it. + */ + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, + AccessShareLock); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + + /* Fetch the existing tuple. */ + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + subform = (Form_pg_subscription) GETSTRUCT(tup); + + /* + * Clear the subskiplsn. If the user has already changed subskiplsn before + * clearing it we don't update the catalog and the replication origin + * state won't get advanced. So in the worst case, if the server crashes + * before sending an acknowledgment of the flush position the transaction + * will be sent again and the user needs to set subskiplsn again. We can + * reduce the possibility by logging a replication origin WAL record to + * advance the origin LSN instead but there is no way to advance the + * origin timestamp and it doesn't seem to be worth doing anything about + * it since it's a very rare case. + */ + if (subform->subskiplsn == myskiplsn) + { + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* reset subskiplsn */ + values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); + replaces[Anum_pg_subscription_subskiplsn - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + if (myskiplsn != finish_lsn) + ereport(WARNING, + errmsg("skip-LSN of logical replication subscription \"%s\" cleared", MySubscription->name), + errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X", + LSN_FORMAT_ARGS(finish_lsn), + LSN_FORMAT_ARGS(myskiplsn))); + } + + heap_freetuple(tup); + table_close(rel, NoLock); + + if (started_tx) + CommitTransactionCommand(); +} + /* Error callback to give more context info about the change being applied */ static void apply_error_callback(void *arg) |