aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c51
1 files changed, 34 insertions, 17 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4151a4b2a96..c5fb627aa56 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -109,13 +109,6 @@
* If ever a user needs to be aware of the tri-state value, they can fetch it
* from the pg_subscription catalog (see column subtwophasestate).
*
- * We don't allow to toggle two_phase option of a subscription because it can
- * lead to an inconsistent replica. Consider, initially, it was on and we have
- * received some prepare then we turn it off, now at commit time the server
- * will send the entire transaction data along with the commit. With some more
- * analysis, we can allow changing this option from off to on but not sure if
- * that alone would be useful.
- *
* Finally, to avoid problems mentioned in previous paragraphs from any
* subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
* to 'off' and then again back to 'on') there is a restriction for
@@ -1023,7 +1016,7 @@ apply_handle_commit(StringInfo s)
if (commit_data.commit_lsn != remote_final_lsn)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+ errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
LSN_FORMAT_ARGS(commit_data.commit_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
@@ -1115,7 +1108,7 @@ apply_handle_prepare(StringInfo s)
if (prepare_data.prepare_lsn != remote_final_lsn)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
+ errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
@@ -3910,7 +3903,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
pq_sendint64(reply_message, now); /* sendTime */
pq_sendbyte(reply_message, requestReply); /* replyRequested */
- elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+ elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
force,
LSN_FORMAT_ARGS(recvpos),
LSN_FORMAT_ARGS(writepos),
@@ -4626,8 +4619,16 @@ run_apply_worker()
walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
StartTransactionCommand();
+
+ /*
+ * Updating pg_subscription might involve TOAST table access, so
+ * ensure we have a valid snapshot.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+ PopActiveSnapshot();
CommitTransactionCommand();
}
else
@@ -4843,7 +4844,15 @@ DisableSubscriptionAndExit(void)
/* Disable the subscription */
StartTransactionCommand();
+
+ /*
+ * Updating pg_subscription might involve TOAST table access, so ensure we
+ * have a valid snapshot.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
DisableSubscription(MySubscription->oid);
+ PopActiveSnapshot();
CommitTransactionCommand();
/* Ensure we remove no-longer-useful entry for worker's start time */
@@ -4900,7 +4909,7 @@ maybe_start_skipping_changes(XLogRecPtr finish_lsn)
skip_xact_finish_lsn = finish_lsn;
ereport(LOG,
- errmsg("logical replication starts skipping transaction at LSN %X/%X",
+ errmsg("logical replication starts skipping transaction at LSN %X/%08X",
LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
}
@@ -4914,8 +4923,8 @@ stop_skipping_changes(void)
return;
ereport(LOG,
- (errmsg("logical replication completed skipping transaction at LSN %X/%X",
- LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
+ errmsg("logical replication completed skipping transaction at LSN %X/%08X",
+ LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
/* Stop skipping changes */
skip_xact_finish_lsn = InvalidXLogRecPtr;
@@ -4948,6 +4957,12 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
}
/*
+ * Updating pg_subscription might involve TOAST table access, so ensure we
+ * have a valid snapshot.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ /*
* Protect subskiplsn of pg_subscription from being concurrently updated
* while clearing it.
*/
@@ -4997,7 +5012,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
if (myskiplsn != finish_lsn)
ereport(WARNING,
errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
- errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
+ errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
LSN_FORMAT_ARGS(finish_lsn),
LSN_FORMAT_ARGS(myskiplsn)));
}
@@ -5005,6 +5020,8 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
heap_freetuple(tup);
table_close(rel, NoLock);
+ PopActiveSnapshot();
+
if (started_tx)
CommitTransactionCommand();
}
@@ -5032,7 +5049,7 @@ apply_error_callback(void *arg)
logicalrep_message_type(errarg->command),
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid,
@@ -5050,7 +5067,7 @@ apply_error_callback(void *arg)
errarg->rel->remoterel.relname,
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
@@ -5069,7 +5086,7 @@ apply_error_callback(void *arg)
errarg->rel->remoterel.attnames[errarg->remote_attnum],
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,