aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_subscription.c1
-rw-r--r--src/backend/catalog/system_views.sql2
-rw-r--r--src/backend/commands/subscriptioncmds.c73
-rw-r--r--src/backend/parser/gram.y9
-rw-r--r--src/backend/replication/logical/worker.c233
5 files changed, 310 insertions, 8 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a6304f5f81a..0ff0982f7b2 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -70,6 +70,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->stream = subform->substream;
sub->twophasestate = subform->subtwophasestate;
sub->disableonerr = subform->subdisableonerr;
+ sub->skiplsn = subform->subskiplsn;
/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index bb1ac30cd19..bd48ee7bd25 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1261,7 +1261,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
-- All columns of pg_subscription except subconninfo are publicly readable.
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
- substream, subtwophasestate, subdisableonerr, subslotname,
+ substream, subtwophasestate, subdisableonerr, subskiplsn, subslotname,
subsynccommit, subpublications)
ON pg_subscription TO public;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3922658bbca..e16f04626de 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -45,6 +45,7 @@
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
#include "utils/syscache.h"
/*
@@ -62,6 +63,7 @@
#define SUBOPT_STREAMING 0x00000100
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
#define SUBOPT_DISABLE_ON_ERR 0x00000400
+#define SUBOPT_LSN 0x00000800
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -84,6 +86,7 @@ typedef struct SubOpts
bool streaming;
bool twophase;
bool disableonerr;
+ XLogRecPtr lsn;
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -262,6 +265,33 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
opts->disableonerr = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_LSN) &&
+ strcmp(defel->defname, "lsn") == 0)
+ {
+ char *lsn_str = defGetString(defel);
+ XLogRecPtr lsn;
+
+ if (IsSet(opts->specified_opts, SUBOPT_LSN))
+ errorConflictingDefElem(defel, pstate);
+
+ /* Setting lsn = NONE is treated as resetting LSN */
+ if (strcmp(lsn_str, "none") == 0)
+ lsn = InvalidXLogRecPtr;
+ else
+ {
+ /* Parse the argument as LSN */
+ lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(lsn_str)));
+
+ if (XLogRecPtrIsInvalid(lsn))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid WAL location (LSN): %s", lsn_str)));
+ }
+
+ opts->specified_opts |= SUBOPT_LSN;
+ opts->lsn = lsn;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -479,6 +509,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
LOGICALREP_TWOPHASE_STATE_PENDING :
LOGICALREP_TWOPHASE_STATE_DISABLED);
values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
+ values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -1106,6 +1137,48 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
break;
}
+ case ALTER_SUBSCRIPTION_SKIP:
+ {
+ parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
+
+ /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
+ Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
+
+ if (!superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to skip transaction")));
+
+ /*
+ * If the user sets subskiplsn, we do a sanity check to make
+ * sure that the specified LSN is a probable value.
+ */
+ if (!XLogRecPtrIsInvalid(opts.lsn))
+ {
+ RepOriginId originid;
+ char originname[NAMEDATALEN];
+ XLogRecPtr remote_lsn;
+
+ snprintf(originname, sizeof(originname), "pg_%u", subid);
+ originid = replorigin_by_name(originname, false);
+ remote_lsn = replorigin_get_progress(originid, false);
+
+ /* Check the given LSN is at least a future LSN */
+ if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
+ LSN_FORMAT_ARGS(opts.lsn),
+ LSN_FORMAT_ARGS(remote_lsn))));
+ }
+
+ values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
+ replaces[Anum_pg_subscription_subskiplsn - 1] = true;
+
+ update_tuple = true;
+ break;
+ }
+
default:
elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
stmt->kind);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a03b33b53bd..0036c2f9e2d 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9983,6 +9983,15 @@ AlterSubscriptionStmt:
(Node *)makeBoolean(false), @1));
$$ = (Node *)n;
}
+ | ALTER SUBSCRIPTION name SKIP definition
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+ n->kind = ALTER_SUBSCRIPTION_SKIP;
+ n->subname = $3;
+ n->options = $5;
+ $$ = (Node *)n;
+ }
;
/*****************************************************************************
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 03e069c7cdd..82dcffc2db8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -136,6 +136,7 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
+#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
@@ -189,6 +190,7 @@
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/syscache.h"
@@ -259,6 +261,21 @@ static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
+/*
+ * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
+ * the subscription if the remote transaction's finish LSN matches the subskiplsn.
+ * Once we start skipping changes, we don't stop it until we skip all changes of
+ * the transaction even if pg_subscription is updated and MySubscription->skiplsn
+ * gets changed or reset during that. Also, in streaming transaction cases, we
+ * don't skip receiving and spooling the changes since we decide whether or not
+ * to skip applying the changes when starting to apply changes. The subskiplsn is
+ * cleared after successfully skipping the transaction or applying non-empty
+ * transaction. The latter prevents the mistakenly specified subskiplsn from
+ * being left.
+ */
+static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
+#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
+
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
@@ -336,6 +353,11 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int
/* Common streaming function to apply all the spooled messages */
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
+/* Functions for skipping changes */
+static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
+static void stop_skipping_changes(void);
+static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
+
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
@@ -795,6 +817,8 @@ apply_handle_begin(StringInfo s)
remote_final_lsn = begin_data.final_lsn;
+ maybe_start_skipping_changes(begin_data.final_lsn);
+
in_remote_transaction = true;
pgstat_report_activity(STATE_RUNNING, NULL);
@@ -847,6 +871,8 @@ apply_handle_begin_prepare(StringInfo s)
remote_final_lsn = begin_data.prepare_lsn;
+ maybe_start_skipping_changes(begin_data.prepare_lsn);
+
in_remote_transaction = true;
pgstat_report_activity(STATE_RUNNING, NULL);
@@ -905,9 +931,9 @@ apply_handle_prepare(StringInfo s)
/*
* Unlike commit, here, we always prepare the transaction even though no
- * change has happened in this transaction. It is done this way because at
- * commit prepared time, we won't know whether we have skipped preparing a
- * transaction because of no change.
+ * change has happened in this transaction or all changes are skipped. It
+ * is done this way because at commit prepared time, we won't know whether
+ * we have skipped preparing a transaction because of those reasons.
*
* XXX, We can optimize such that at commit prepared time, we first check
* whether we have prepared the transaction or not but that doesn't seem
@@ -928,6 +954,15 @@ apply_handle_prepare(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
+ /*
+ * Since we have already prepared the transaction, in a case where the
+ * server crashes before clearing the subskiplsn, it will be left but the
+ * transaction won't be resent. But that's okay because it's a rare case
+ * and the subskiplsn will be cleared when finishing the next transaction.
+ */
+ stop_skipping_changes();
+ clear_subscription_skip_lsn(prepare_data.prepare_lsn);
+
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
}
@@ -969,6 +1004,8 @@ apply_handle_commit_prepared(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
+ clear_subscription_skip_lsn(prepare_data.end_lsn);
+
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
}
@@ -1010,6 +1047,8 @@ apply_handle_rollback_prepared(StringInfo s)
FinishPreparedTransaction(gid, false);
end_replication_step();
CommitTransactionCommand();
+
+ clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
}
pgstat_report_stat(false);
@@ -1072,6 +1111,13 @@ apply_handle_stream_prepare(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
+ /*
+ * Similar to prepare case, the subskiplsn could be left in a case of
+ * server crash but it's okay. See the comments in apply_handle_prepare().
+ */
+ stop_skipping_changes();
+ clear_subscription_skip_lsn(prepare_data.prepare_lsn);
+
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
@@ -1311,6 +1357,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
MemoryContext oldcxt;
BufFile *fd;
+ maybe_start_skipping_changes(lsn);
+
/* Make sure we have an open transaction */
begin_replication_step();
@@ -1455,9 +1503,27 @@ apply_handle_stream_commit(StringInfo s)
static void
apply_handle_commit_internal(LogicalRepCommitData *commit_data)
{
+ if (is_skipping_changes())
+ {
+ stop_skipping_changes();
+
+ /*
+ * Start a new transaction to clear the subskiplsn, if not started
+ * yet.
+ */
+ if (!IsTransactionState())
+ StartTransactionCommand();
+ }
+
if (IsTransactionState())
{
/*
+ * The transaction is either non-empty or skipped, so we clear the
+ * subskiplsn.
+ */
+ clear_subscription_skip_lsn(commit_data->commit_lsn);
+
+ /*
* Update origin state so we can restart streaming from correct
* position in case of crash.
*/
@@ -1583,7 +1649,12 @@ apply_handle_insert(StringInfo s)
TupleTableSlot *remoteslot;
MemoryContext oldctx;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+ /*
+ * Quick return if we are skipping data modification changes or handling
+ * streamed transactions.
+ */
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
return;
begin_replication_step();
@@ -1710,7 +1781,12 @@ apply_handle_update(StringInfo s)
RangeTblEntry *target_rte;
MemoryContext oldctx;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
+ /*
+ * Quick return if we are skipping data modification changes or handling
+ * streamed transactions.
+ */
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
return;
begin_replication_step();
@@ -1874,7 +1950,12 @@ apply_handle_delete(StringInfo s)
TupleTableSlot *remoteslot;
MemoryContext oldctx;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
+ /*
+ * Quick return if we are skipping data modification changes or handling
+ * streamed transactions.
+ */
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
return;
begin_replication_step();
@@ -2261,7 +2342,12 @@ apply_handle_truncate(StringInfo s)
ListCell *lc;
LOCKMODE lockmode = AccessExclusiveLock;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
+ /*
+ * Quick return if we are skipping data modification changes or handling
+ * streamed transactions.
+ */
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
return;
begin_replication_step();
@@ -3738,6 +3824,139 @@ IsLogicalWorker(void)
return MyLogicalRepWorker != NULL;
}
+/*
+ * Start skipping changes of the transaction if the given LSN matches the
+ * LSN specified by subscription's skiplsn.
+ */
+static void
+maybe_start_skipping_changes(XLogRecPtr finish_lsn)
+{
+ Assert(!is_skipping_changes());
+ Assert(!in_remote_transaction);
+ Assert(!in_streamed_transaction);
+
+ /*
+ * Quick return if it's not requested to skip this transaction. This
+ * function is called for every remote transaction and we assume that
+ * skipping the transaction is not used often.
+ */
+ if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
+ MySubscription->skiplsn != finish_lsn))
+ return;
+
+ /* Start skipping all changes of this transaction */
+ skip_xact_finish_lsn = finish_lsn;
+
+ ereport(LOG,
+ errmsg("start skipping logical replication transaction finished at %X/%X",
+ LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
+}
+
+/*
+ * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
+ */
+static void
+stop_skipping_changes(void)
+{
+ if (!is_skipping_changes())
+ return;
+
+ ereport(LOG,
+ (errmsg("done skipping logical replication transaction finished at %X/%X",
+ LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
+
+ /* Stop skipping changes */
+ skip_xact_finish_lsn = InvalidXLogRecPtr;
+}
+
+/*
+ * Clear subskiplsn of pg_subscription catalog.
+ *
+ * finish_lsn is the transaction's finish LSN that is used to check if the
+ * subskiplsn matches it. If not matched, we raise a warning when clearing the
+ * subskiplsn in order to inform users for cases e.g., where the user mistakenly
+ * specified the wrong subskiplsn.
+ */
+static void
+clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
+{
+ Relation rel;
+ Form_pg_subscription subform;
+ HeapTuple tup;
+ XLogRecPtr myskiplsn = MySubscription->skiplsn;
+ bool started_tx = false;
+
+ if (likely(XLogRecPtrIsInvalid(myskiplsn)))
+ return;
+
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ /*
+ * Protect subskiplsn of pg_subscription from being concurrently updated
+ * while clearing it.
+ */
+ LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+ AccessShareLock);
+
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+ /* Fetch the existing tuple. */
+ tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+ ObjectIdGetDatum(MySubscription->oid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
+
+ subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+ /*
+ * Clear the subskiplsn. If the user has already changed subskiplsn before
+ * clearing it we don't update the catalog and the replication origin
+ * state won't get advanced. So in the worst case, if the server crashes
+ * before sending an acknowledgment of the flush position the transaction
+ * will be sent again and the user needs to set subskiplsn again. We can
+ * reduce the possibility by logging a replication origin WAL record to
+ * advance the origin LSN instead but there is no way to advance the
+ * origin timestamp and it doesn't seem to be worth doing anything about
+ * it since it's a very rare case.
+ */
+ if (subform->subskiplsn == myskiplsn)
+ {
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ /* reset subskiplsn */
+ values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
+ replaces[Anum_pg_subscription_subskiplsn - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ if (myskiplsn != finish_lsn)
+ ereport(WARNING,
+ errmsg("skip-LSN of logical replication subscription \"%s\" cleared", MySubscription->name),
+ errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X",
+ LSN_FORMAT_ARGS(finish_lsn),
+ LSN_FORMAT_ARGS(myskiplsn)));
+ }
+
+ heap_freetuple(tup);
+ table_close(rel, NoLock);
+
+ if (started_tx)
+ CommitTransactionCommand();
+}
+
/* Error callback to give more context info about the change being applied */
static void
apply_error_callback(void *arg)