diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_subscription.c | 1 | ||||
-rw-r--r-- | src/backend/catalog/system_views.sql | 2 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 73 | ||||
-rw-r--r-- | src/backend/parser/gram.y | 9 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 233 |
5 files changed, 310 insertions, 8 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index a6304f5f81a..0ff0982f7b2 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -70,6 +70,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->skiplsn = subform->subskiplsn; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index bb1ac30cd19..bd48ee7bd25 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1261,7 +1261,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary, - substream, subtwophasestate, subdisableonerr, subslotname, + substream, subtwophasestate, subdisableonerr, subskiplsn, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3922658bbca..e16f04626de 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -45,6 +45,7 @@ #include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/syscache.h" /* @@ -62,6 +63,7 @@ #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 #define SUBOPT_DISABLE_ON_ERR 0x00000400 +#define SUBOPT_LSN 0x00000800 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -84,6 +86,7 @@ typedef struct SubOpts bool streaming; bool twophase; bool disableonerr; + XLogRecPtr lsn; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -262,6 +265,33 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_DISABLE_ON_ERR; opts->disableonerr = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_LSN) && + strcmp(defel->defname, "lsn") == 0) + { + char *lsn_str = defGetString(defel); + XLogRecPtr lsn; + + if (IsSet(opts->specified_opts, SUBOPT_LSN)) + errorConflictingDefElem(defel, pstate); + + /* Setting lsn = NONE is treated as resetting LSN */ + if (strcmp(lsn_str, "none") == 0) + lsn = InvalidXLogRecPtr; + else + { + /* Parse the argument as LSN */ + lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, + CStringGetDatum(lsn_str))); + + if (XLogRecPtrIsInvalid(lsn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid WAL location (LSN): %s", lsn_str))); + } + + opts->specified_opts |= SUBOPT_LSN; + opts->lsn = lsn; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -479,6 +509,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1106,6 +1137,48 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_SKIP: + { + parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts); + + /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */ + Assert(IsSet(opts.specified_opts, SUBOPT_LSN)); + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to skip transaction"))); + + /* + * If the user sets subskiplsn, we do a sanity check to make + * sure that the specified LSN is a probable value. + */ + if (!XLogRecPtrIsInvalid(opts.lsn)) + { + RepOriginId originid; + char originname[NAMEDATALEN]; + XLogRecPtr remote_lsn; + + snprintf(originname, sizeof(originname), "pg_%u", subid); + originid = replorigin_by_name(originname, false); + remote_lsn = replorigin_get_progress(originid, false); + + /* Check the given LSN is at least a future LSN */ + if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X", + LSN_FORMAT_ARGS(opts.lsn), + LSN_FORMAT_ARGS(remote_lsn)))); + } + + values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn); + replaces[Anum_pg_subscription_subskiplsn - 1] = true; + + update_tuple = true; + break; + } + default: elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", stmt->kind); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index a03b33b53bd..0036c2f9e2d 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9983,6 +9983,15 @@ AlterSubscriptionStmt: (Node *)makeBoolean(false), @1)); $$ = (Node *)n; } + | ALTER SUBSCRIPTION name SKIP definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_SKIP; + n->subname = $3; + n->options = $5; + $$ = (Node *)n; + } ; /***************************************************************************** diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 03e069c7cdd..82dcffc2db8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -136,6 +136,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" @@ -189,6 +190,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/rel.h" #include "utils/rls.h" #include "utils/syscache.h" @@ -259,6 +261,21 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; +/* + * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for + * the subscription if the remote transaction's finish LSN matches the subskiplsn. + * Once we start skipping changes, we don't stop it until we skip all changes of + * the transaction even if pg_subscription is updated and MySubscription->skiplsn + * gets changed or reset during that. Also, in streaming transaction cases, we + * don't skip receiving and spooling the changes since we decide whether or not + * to skip applying the changes when starting to apply changes. The subskiplsn is + * cleared after successfully skipping the transaction or applying non-empty + * transaction. The latter prevents the mistakenly specified subskiplsn from + * being left. + */ +static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; +#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn))) + /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -336,6 +353,11 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int /* Common streaming function to apply all the spooled messages */ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); +/* Functions for skipping changes */ +static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); +static void stop_skipping_changes(void); +static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn); + /* Functions for apply error callback */ static void apply_error_callback(void *arg); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); @@ -795,6 +817,8 @@ apply_handle_begin(StringInfo s) remote_final_lsn = begin_data.final_lsn; + maybe_start_skipping_changes(begin_data.final_lsn); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -847,6 +871,8 @@ apply_handle_begin_prepare(StringInfo s) remote_final_lsn = begin_data.prepare_lsn; + maybe_start_skipping_changes(begin_data.prepare_lsn); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -905,9 +931,9 @@ apply_handle_prepare(StringInfo s) /* * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction. It is done this way because at - * commit prepared time, we won't know whether we have skipped preparing a - * transaction because of no change. + * change has happened in this transaction or all changes are skipped. It + * is done this way because at commit prepared time, we won't know whether + * we have skipped preparing a transaction because of those reasons. * * XXX, We can optimize such that at commit prepared time, we first check * whether we have prepared the transaction or not but that doesn't seem @@ -928,6 +954,15 @@ apply_handle_prepare(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); + /* + * Since we have already prepared the transaction, in a case where the + * server crashes before clearing the subskiplsn, it will be left but the + * transaction won't be resent. But that's okay because it's a rare case + * and the subskiplsn will be cleared when finishing the next transaction. + */ + stop_skipping_changes(); + clear_subscription_skip_lsn(prepare_data.prepare_lsn); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); } @@ -969,6 +1004,8 @@ apply_handle_commit_prepared(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); + clear_subscription_skip_lsn(prepare_data.end_lsn); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); } @@ -1010,6 +1047,8 @@ apply_handle_rollback_prepared(StringInfo s) FinishPreparedTransaction(gid, false); end_replication_step(); CommitTransactionCommand(); + + clear_subscription_skip_lsn(rollback_data.rollback_end_lsn); } pgstat_report_stat(false); @@ -1072,6 +1111,13 @@ apply_handle_stream_prepare(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); + /* + * Similar to prepare case, the subskiplsn could be left in a case of + * server crash but it's okay. See the comments in apply_handle_prepare(). + */ + stop_skipping_changes(); + clear_subscription_skip_lsn(prepare_data.prepare_lsn); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -1311,6 +1357,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) MemoryContext oldcxt; BufFile *fd; + maybe_start_skipping_changes(lsn); + /* Make sure we have an open transaction */ begin_replication_step(); @@ -1455,9 +1503,27 @@ apply_handle_stream_commit(StringInfo s) static void apply_handle_commit_internal(LogicalRepCommitData *commit_data) { + if (is_skipping_changes()) + { + stop_skipping_changes(); + + /* + * Start a new transaction to clear the subskiplsn, if not started + * yet. + */ + if (!IsTransactionState()) + StartTransactionCommand(); + } + if (IsTransactionState()) { /* + * The transaction is either non-empty or skipped, so we clear the + * subskiplsn. + */ + clear_subscription_skip_lsn(commit_data->commit_lsn); + + /* * Update origin state so we can restart streaming from correct * position in case of crash. */ @@ -1583,7 +1649,12 @@ apply_handle_insert(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; - if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) return; begin_replication_step(); @@ -1710,7 +1781,12 @@ apply_handle_update(StringInfo s) RangeTblEntry *target_rte; MemoryContext oldctx; - if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) return; begin_replication_step(); @@ -1874,7 +1950,12 @@ apply_handle_delete(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; - if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) return; begin_replication_step(); @@ -2261,7 +2342,12 @@ apply_handle_truncate(StringInfo s) ListCell *lc; LOCKMODE lockmode = AccessExclusiveLock; - if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) return; begin_replication_step(); @@ -3738,6 +3824,139 @@ IsLogicalWorker(void) return MyLogicalRepWorker != NULL; } +/* + * Start skipping changes of the transaction if the given LSN matches the + * LSN specified by subscription's skiplsn. + */ +static void +maybe_start_skipping_changes(XLogRecPtr finish_lsn) +{ + Assert(!is_skipping_changes()); + Assert(!in_remote_transaction); + Assert(!in_streamed_transaction); + + /* + * Quick return if it's not requested to skip this transaction. This + * function is called for every remote transaction and we assume that + * skipping the transaction is not used often. + */ + if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) || + MySubscription->skiplsn != finish_lsn)) + return; + + /* Start skipping all changes of this transaction */ + skip_xact_finish_lsn = finish_lsn; + + ereport(LOG, + errmsg("start skipping logical replication transaction finished at %X/%X", + LSN_FORMAT_ARGS(skip_xact_finish_lsn))); +} + +/* + * Stop skipping changes by resetting skip_xact_finish_lsn if enabled. + */ +static void +stop_skipping_changes(void) +{ + if (!is_skipping_changes()) + return; + + ereport(LOG, + (errmsg("done skipping logical replication transaction finished at %X/%X", + LSN_FORMAT_ARGS(skip_xact_finish_lsn)))); + + /* Stop skipping changes */ + skip_xact_finish_lsn = InvalidXLogRecPtr; +} + +/* + * Clear subskiplsn of pg_subscription catalog. + * + * finish_lsn is the transaction's finish LSN that is used to check if the + * subskiplsn matches it. If not matched, we raise a warning when clearing the + * subskiplsn in order to inform users for cases e.g., where the user mistakenly + * specified the wrong subskiplsn. + */ +static void +clear_subscription_skip_lsn(XLogRecPtr finish_lsn) +{ + Relation rel; + Form_pg_subscription subform; + HeapTuple tup; + XLogRecPtr myskiplsn = MySubscription->skiplsn; + bool started_tx = false; + + if (likely(XLogRecPtrIsInvalid(myskiplsn))) + return; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* + * Protect subskiplsn of pg_subscription from being concurrently updated + * while clearing it. + */ + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, + AccessShareLock); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + + /* Fetch the existing tuple. */ + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + subform = (Form_pg_subscription) GETSTRUCT(tup); + + /* + * Clear the subskiplsn. If the user has already changed subskiplsn before + * clearing it we don't update the catalog and the replication origin + * state won't get advanced. So in the worst case, if the server crashes + * before sending an acknowledgment of the flush position the transaction + * will be sent again and the user needs to set subskiplsn again. We can + * reduce the possibility by logging a replication origin WAL record to + * advance the origin LSN instead but there is no way to advance the + * origin timestamp and it doesn't seem to be worth doing anything about + * it since it's a very rare case. + */ + if (subform->subskiplsn == myskiplsn) + { + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* reset subskiplsn */ + values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); + replaces[Anum_pg_subscription_subskiplsn - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + if (myskiplsn != finish_lsn) + ereport(WARNING, + errmsg("skip-LSN of logical replication subscription \"%s\" cleared", MySubscription->name), + errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X", + LSN_FORMAT_ARGS(finish_lsn), + LSN_FORMAT_ARGS(myskiplsn))); + } + + heap_freetuple(tup); + table_close(rel, NoLock); + + if (started_tx) + CommitTransactionCommand(); +} + /* Error callback to give more context info about the change being applied */ static void apply_error_callback(void *arg) |