diff options
author | Amit Kapila <akapila@postgresql.org> | 2024-08-21 09:22:32 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2024-08-21 09:22:32 +0530 |
commit | 3f28b2fcac33fb352d261fac298cfe68c3899d32 (patch) | |
tree | 9c0bf70251848546b145cb070a701b01fed0a02f /src/backend/replication/logical/worker.c | |
parent | a95ff1fe2eb4926b13e0940ad1f37d048704bdb0 (diff) | |
download | postgresql-3f28b2fcac33fb352d261fac298cfe68c3899d32.tar.gz postgresql-3f28b2fcac33fb352d261fac298cfe68c3899d32.zip |
Don't advance origin during apply failure.
We advance origin progress during abort on successful streaming and
application of ROLLBACK in parallel streaming mode. But the origin
shouldn't be advanced during an error or unsuccessful apply due to
shutdown. Otherwise, it will result in a transaction loss as such a
transaction won't be sent again by the server.
Reported-by: Hou Zhijie
Author: Hayato Kuroda and Shveta Malik
Reviewed-by: Amit Kapila
Backpatch-through: 16
Discussion: https://postgr.es/m/TYAPR01MB5692FAC23BE40C69DA8ED4AFF5B92@TYAPR01MB5692.jpnprd01.prod.outlook.com
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 35 |
1 files changed, 35 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index cdea6295d8a..38c28953078 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4713,6 +4713,17 @@ InitializeLogRepWorker(void) CommitTransactionCommand(); } +/* + * Reset the origin state. + */ +static void +replorigin_reset(int code, Datum arg) +{ + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; +} + /* Common function to setup the leader apply or tablesync worker. */ void SetupApplyOrSyncWorker(int worker_slot) @@ -4741,6 +4752,19 @@ SetupApplyOrSyncWorker(int worker_slot) InitializeLogRepWorker(); + /* + * Register a callback to reset the origin state before aborting any + * pending transaction during shutdown (see ShutdownPostgres()). This will + * avoid origin advancement for an in-complete transaction which could + * otherwise lead to its loss as such a transaction won't be sent by the + * server again. + * + * Note that even a LOG or DEBUG statement placed after setting the origin + * state may process a shutdown signal before committing the current apply + * operation. So, it is important to register such a callback here. + */ + before_shmem_exit(replorigin_reset, (Datum) 0); + /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); @@ -4967,12 +4991,23 @@ void apply_error_callback(void *arg) { ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; + int elevel; if (apply_error_callback_arg.command == 0) return; Assert(errarg->origin_name); + elevel = geterrlevel(); + + /* + * Reset the origin state to prevent the advancement of origin progress if + * we fail to apply. Otherwise, this will result in transaction loss as + * that transaction won't be sent again by the server. + */ + if (elevel >= ERROR) + replorigin_reset(0, (Datum) 0); + if (errarg->rel == NULL) { if (!TransactionIdIsValid(errarg->remote_xid)) |