aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2024-08-21 09:22:32 +0530
committerAmit Kapila <akapila@postgresql.org>2024-08-21 09:22:32 +0530
commit3f28b2fcac33fb352d261fac298cfe68c3899d32 (patch)
tree9c0bf70251848546b145cb070a701b01fed0a02f /src/backend/replication/logical/worker.c
parenta95ff1fe2eb4926b13e0940ad1f37d048704bdb0 (diff)
downloadpostgresql-3f28b2fcac33fb352d261fac298cfe68c3899d32.tar.gz
postgresql-3f28b2fcac33fb352d261fac298cfe68c3899d32.zip
Don't advance origin during apply failure.
We advance origin progress during abort on successful streaming and application of ROLLBACK in parallel streaming mode. But the origin shouldn't be advanced during an error or unsuccessful apply due to shutdown. Otherwise, it will result in a transaction loss as such a transaction won't be sent again by the server. Reported-by: Hou Zhijie Author: Hayato Kuroda and Shveta Malik Reviewed-by: Amit Kapila Backpatch-through: 16 Discussion: https://postgr.es/m/TYAPR01MB5692FAC23BE40C69DA8ED4AFF5B92@TYAPR01MB5692.jpnprd01.prod.outlook.com
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c35
1 files changed, 35 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cdea6295d8a..38c28953078 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4713,6 +4713,17 @@ InitializeLogRepWorker(void)
CommitTransactionCommand();
}
+/*
+ * Reset the origin state.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;
+}
+
/* Common function to setup the leader apply or tablesync worker. */
void
SetupApplyOrSyncWorker(int worker_slot)
@@ -4741,6 +4752,19 @@ SetupApplyOrSyncWorker(int worker_slot)
InitializeLogRepWorker();
+ /*
+ * Register a callback to reset the origin state before aborting any
+ * pending transaction during shutdown (see ShutdownPostgres()). This will
+ * avoid origin advancement for an in-complete transaction which could
+ * otherwise lead to its loss as such a transaction won't be sent by the
+ * server again.
+ *
+ * Note that even a LOG or DEBUG statement placed after setting the origin
+ * state may process a shutdown signal before committing the current apply
+ * operation. So, it is important to register such a callback here.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);
+
/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);
@@ -4967,12 +4991,23 @@ void
apply_error_callback(void *arg)
{
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
+ int elevel;
if (apply_error_callback_arg.command == 0)
return;
Assert(errarg->origin_name);
+ elevel = geterrlevel();
+
+ /*
+ * Reset the origin state to prevent the advancement of origin progress if
+ * we fail to apply. Otherwise, this will result in transaction loss as
+ * that transaction won't be sent again by the server.
+ */
+ if (elevel >= ERROR)
+ replorigin_reset(0, (Datum) 0);
+
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))