aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c233
1 files changed, 226 insertions, 7 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 03e069c7cdd..82dcffc2db8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -136,6 +136,7 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
+#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
@@ -189,6 +190,7 @@
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/syscache.h"
@@ -259,6 +261,21 @@ static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
+/*
+ * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
+ * the subscription if the remote transaction's finish LSN matches the subskiplsn.
+ * Once we start skipping changes, we don't stop it until we skip all changes of
+ * the transaction even if pg_subscription is updated and MySubscription->skiplsn
+ * gets changed or reset during that. Also, in streaming transaction cases, we
+ * don't skip receiving and spooling the changes since we decide whether or not
+ * to skip applying the changes when starting to apply changes. The subskiplsn is
+ * cleared after successfully skipping the transaction or applying non-empty
+ * transaction. The latter prevents the mistakenly specified subskiplsn from
+ * being left.
+ */
+static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
+#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
+
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
@@ -336,6 +353,11 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int
/* Common streaming function to apply all the spooled messages */
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
+/* Functions for skipping changes */
+static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
+static void stop_skipping_changes(void);
+static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
+
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
@@ -795,6 +817,8 @@ apply_handle_begin(StringInfo s)
remote_final_lsn = begin_data.final_lsn;
+ maybe_start_skipping_changes(begin_data.final_lsn);
+
in_remote_transaction = true;
pgstat_report_activity(STATE_RUNNING, NULL);
@@ -847,6 +871,8 @@ apply_handle_begin_prepare(StringInfo s)
remote_final_lsn = begin_data.prepare_lsn;
+ maybe_start_skipping_changes(begin_data.prepare_lsn);
+
in_remote_transaction = true;
pgstat_report_activity(STATE_RUNNING, NULL);
@@ -905,9 +931,9 @@ apply_handle_prepare(StringInfo s)
/*
* Unlike commit, here, we always prepare the transaction even though no
- * change has happened in this transaction. It is done this way because at
- * commit prepared time, we won't know whether we have skipped preparing a
- * transaction because of no change.
+ * change has happened in this transaction or all changes are skipped. It
+ * is done this way because at commit prepared time, we won't know whether
+ * we have skipped preparing a transaction because of those reasons.
*
* XXX, We can optimize such that at commit prepared time, we first check
* whether we have prepared the transaction or not but that doesn't seem
@@ -928,6 +954,15 @@ apply_handle_prepare(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
+ /*
+ * Since we have already prepared the transaction, in a case where the
+ * server crashes before clearing the subskiplsn, it will be left but the
+ * transaction won't be resent. But that's okay because it's a rare case
+ * and the subskiplsn will be cleared when finishing the next transaction.
+ */
+ stop_skipping_changes();
+ clear_subscription_skip_lsn(prepare_data.prepare_lsn);
+
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
}
@@ -969,6 +1004,8 @@ apply_handle_commit_prepared(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
+ clear_subscription_skip_lsn(prepare_data.end_lsn);
+
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
}
@@ -1010,6 +1047,8 @@ apply_handle_rollback_prepared(StringInfo s)
FinishPreparedTransaction(gid, false);
end_replication_step();
CommitTransactionCommand();
+
+ clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
}
pgstat_report_stat(false);
@@ -1072,6 +1111,13 @@ apply_handle_stream_prepare(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
+ /*
+ * Similar to prepare case, the subskiplsn could be left in a case of
+ * server crash but it's okay. See the comments in apply_handle_prepare().
+ */
+ stop_skipping_changes();
+ clear_subscription_skip_lsn(prepare_data.prepare_lsn);
+
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
@@ -1311,6 +1357,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
MemoryContext oldcxt;
BufFile *fd;
+ maybe_start_skipping_changes(lsn);
+
/* Make sure we have an open transaction */
begin_replication_step();
@@ -1455,9 +1503,27 @@ apply_handle_stream_commit(StringInfo s)
static void
apply_handle_commit_internal(LogicalRepCommitData *commit_data)
{
+ if (is_skipping_changes())
+ {
+ stop_skipping_changes();
+
+ /*
+ * Start a new transaction to clear the subskiplsn, if not started
+ * yet.
+ */
+ if (!IsTransactionState())
+ StartTransactionCommand();
+ }
+
if (IsTransactionState())
{
/*
+ * The transaction is either non-empty or skipped, so we clear the
+ * subskiplsn.
+ */
+ clear_subscription_skip_lsn(commit_data->commit_lsn);
+
+ /*
* Update origin state so we can restart streaming from correct
* position in case of crash.
*/
@@ -1583,7 +1649,12 @@ apply_handle_insert(StringInfo s)
TupleTableSlot *remoteslot;
MemoryContext oldctx;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+ /*
+ * Quick return if we are skipping data modification changes or handling
+ * streamed transactions.
+ */
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
return;
begin_replication_step();
@@ -1710,7 +1781,12 @@ apply_handle_update(StringInfo s)
RangeTblEntry *target_rte;
MemoryContext oldctx;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
+ /*
+ * Quick return if we are skipping data modification changes or handling
+ * streamed transactions.
+ */
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
return;
begin_replication_step();
@@ -1874,7 +1950,12 @@ apply_handle_delete(StringInfo s)
TupleTableSlot *remoteslot;
MemoryContext oldctx;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
+ /*
+ * Quick return if we are skipping data modification changes or handling
+ * streamed transactions.
+ */
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
return;
begin_replication_step();
@@ -2261,7 +2342,12 @@ apply_handle_truncate(StringInfo s)
ListCell *lc;
LOCKMODE lockmode = AccessExclusiveLock;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
+ /*
+ * Quick return if we are skipping data modification changes or handling
+ * streamed transactions.
+ */
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
return;
begin_replication_step();
@@ -3738,6 +3824,139 @@ IsLogicalWorker(void)
return MyLogicalRepWorker != NULL;
}
+/*
+ * Start skipping changes of the transaction if the given LSN matches the
+ * LSN specified by subscription's skiplsn.
+ */
+static void
+maybe_start_skipping_changes(XLogRecPtr finish_lsn)
+{
+ Assert(!is_skipping_changes());
+ Assert(!in_remote_transaction);
+ Assert(!in_streamed_transaction);
+
+ /*
+ * Quick return if it's not requested to skip this transaction. This
+ * function is called for every remote transaction and we assume that
+ * skipping the transaction is not used often.
+ */
+ if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
+ MySubscription->skiplsn != finish_lsn))
+ return;
+
+ /* Start skipping all changes of this transaction */
+ skip_xact_finish_lsn = finish_lsn;
+
+ ereport(LOG,
+ errmsg("start skipping logical replication transaction finished at %X/%X",
+ LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
+}
+
+/*
+ * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
+ */
+static void
+stop_skipping_changes(void)
+{
+ if (!is_skipping_changes())
+ return;
+
+ ereport(LOG,
+ (errmsg("done skipping logical replication transaction finished at %X/%X",
+ LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
+
+ /* Stop skipping changes */
+ skip_xact_finish_lsn = InvalidXLogRecPtr;
+}
+
+/*
+ * Clear subskiplsn of pg_subscription catalog.
+ *
+ * finish_lsn is the transaction's finish LSN that is used to check if the
+ * subskiplsn matches it. If not matched, we raise a warning when clearing the
+ * subskiplsn in order to inform users for cases e.g., where the user mistakenly
+ * specified the wrong subskiplsn.
+ */
+static void
+clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
+{
+ Relation rel;
+ Form_pg_subscription subform;
+ HeapTuple tup;
+ XLogRecPtr myskiplsn = MySubscription->skiplsn;
+ bool started_tx = false;
+
+ if (likely(XLogRecPtrIsInvalid(myskiplsn)))
+ return;
+
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ /*
+ * Protect subskiplsn of pg_subscription from being concurrently updated
+ * while clearing it.
+ */
+ LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+ AccessShareLock);
+
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+ /* Fetch the existing tuple. */
+ tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+ ObjectIdGetDatum(MySubscription->oid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
+
+ subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+ /*
+ * Clear the subskiplsn. If the user has already changed subskiplsn before
+ * clearing it we don't update the catalog and the replication origin
+ * state won't get advanced. So in the worst case, if the server crashes
+ * before sending an acknowledgment of the flush position the transaction
+ * will be sent again and the user needs to set subskiplsn again. We can
+ * reduce the possibility by logging a replication origin WAL record to
+ * advance the origin LSN instead but there is no way to advance the
+ * origin timestamp and it doesn't seem to be worth doing anything about
+ * it since it's a very rare case.
+ */
+ if (subform->subskiplsn == myskiplsn)
+ {
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ /* reset subskiplsn */
+ values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
+ replaces[Anum_pg_subscription_subskiplsn - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ if (myskiplsn != finish_lsn)
+ ereport(WARNING,
+ errmsg("skip-LSN of logical replication subscription \"%s\" cleared", MySubscription->name),
+ errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X",
+ LSN_FORMAT_ARGS(finish_lsn),
+ LSN_FORMAT_ARGS(myskiplsn)));
+ }
+
+ heap_freetuple(tup);
+ table_close(rel, NoLock);
+
+ if (started_tx)
+ CommitTransactionCommand();
+}
+
/* Error callback to give more context info about the change being applied */
static void
apply_error_callback(void *arg)