diff options
author | Thomas Munro <tmunro@postgresql.org> | 2021-03-12 19:07:27 +1300 |
---|---|---|
committer | Thomas Munro <tmunro@postgresql.org> | 2021-03-12 19:45:42 +1300 |
commit | de829ddf23f69190efb4e0178704c4c4228e17cd (patch) | |
tree | daf6a6291626971ffa700db0b2b2d1e4b457eaab /src | |
parent | 600f2f50b7a57c8481276450c9019fa7b3656411 (diff) | |
download | postgresql-de829ddf23f69190efb4e0178704c4c4228e17cd.tar.gz postgresql-de829ddf23f69190efb4e0178704c4c4228e17cd.zip |
Add condition variable for walreceiver shutdown.
Use this new CV to wait for walreceiver shutdown without a sleep/poll
loop, while also benefiting from standard postmaster death handling.
Discussion: https://postgr.es/m/CA%2BhUKGK1607VmtrDUHQXrsooU%3Dap4g4R2yaoByWOOA3m8xevUQ%40mail.gmail.com
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/postmaster/pgstat.c | 3 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 3 | ||||
-rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 41 | ||||
-rw-r--r-- | src/include/pgstat.h | 1 | ||||
-rw-r--r-- | src/include/replication/walreceiver.h | 2 |
5 files changed, 37 insertions, 13 deletions
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 68eefb97227..b1e2d94951d 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4124,6 +4124,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_SYNC_REP: event_name = "SyncRep"; break; + case WAIT_EVENT_WALRCV_EXIT: + event_name = "WalrcvExit"; + break; case WAIT_EVENT_XACT_GROUP_UPDATE: event_name = "XactGroupUpdate"; break; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index e5f8a06fea0..8532296f26c 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -207,6 +207,7 @@ WalReceiverMain(void) case WALRCV_STOPPED: SpinLockRelease(&walrcv->mutex); + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); proc_exit(1); break; @@ -784,6 +785,8 @@ WalRcvDie(int code, Datum arg) walrcv->latch = NULL; SpinLockRelease(&walrcv->mutex); + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); + /* Terminate the connection gracefully. */ if (wrconn != NULL) walrcv_disconnect(wrconn); diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 63e60478ea6..fff6c54c45d 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -23,6 +23,7 @@ #include <signal.h> #include "access/xlog_internal.h" +#include "pgstat.h" #include "postmaster/startup.h" #include "replication/walreceiver.h" #include "storage/pmsignal.h" @@ -62,6 +63,7 @@ WalRcvShmemInit(void) /* First time through, so initialize */ MemSet(WalRcv, 0, WalRcvShmemSize()); WalRcv->walRcvState = WALRCV_STOPPED; + ConditionVariableInit(&WalRcv->walRcvStoppedCV); SpinLockInit(&WalRcv->mutex); pg_atomic_init_u64(&WalRcv->writtenUpto, 0); WalRcv->latch = NULL; @@ -95,12 +97,18 @@ WalRcvRunning(void) if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) { - SpinLockAcquire(&walrcv->mutex); + bool stopped = false; + SpinLockAcquire(&walrcv->mutex); if (walrcv->walRcvState == WALRCV_STARTING) + { state = walrcv->walRcvState = WALRCV_STOPPED; - + stopped = true; + } SpinLockRelease(&walrcv->mutex); + + if (stopped) + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); } } @@ -140,12 +148,18 @@ WalRcvStreaming(void) if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) { - SpinLockAcquire(&walrcv->mutex); + bool stopped = false; + SpinLockAcquire(&walrcv->mutex); if (walrcv->walRcvState == WALRCV_STARTING) + { state = walrcv->walRcvState = WALRCV_STOPPED; - + stopped = true; + } SpinLockRelease(&walrcv->mutex); + + if (stopped) + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); } } @@ -165,6 +179,7 @@ ShutdownWalRcv(void) { WalRcvData *walrcv = WalRcv; pid_t walrcvpid = 0; + bool stopped = false; /* * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED @@ -178,6 +193,7 @@ ShutdownWalRcv(void) break; case WALRCV_STARTING: walrcv->walRcvState = WALRCV_STOPPED; + stopped = true; break; case WALRCV_STREAMING: @@ -191,6 +207,10 @@ ShutdownWalRcv(void) } SpinLockRelease(&walrcv->mutex); + /* Unnecessary but consistent. */ + if (stopped) + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); + /* * Signal walreceiver process if it was still running. */ @@ -201,16 +221,11 @@ ShutdownWalRcv(void) * Wait for walreceiver to acknowledge its death by setting state to * WALRCV_STOPPED. */ + ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV); while (WalRcvRunning()) - { - /* - * This possibly-long loop needs to handle interrupts of startup - * process. - */ - HandleStartupProcInterrupts(); - - pg_usleep(100000); /* 100ms */ - } + ConditionVariableSleep(&walrcv->walRcvStoppedCV, + WAIT_EVENT_WALRCV_EXIT); + ConditionVariableCancelSleep(); } /* diff --git a/src/include/pgstat.h b/src/include/pgstat.h index f9166b86558..be43c048028 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -1009,6 +1009,7 @@ typedef enum WAIT_EVENT_REPLICATION_SLOT_DROP, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP, + WAIT_EVENT_WALRCV_EXIT, WAIT_EVENT_XACT_GROUP_UPDATE } WaitEventIPC; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index a97a59a6a30..4fd7c25ea74 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -19,6 +19,7 @@ #include "port/atomics.h" #include "replication/logicalproto.h" #include "replication/walsender.h" +#include "storage/condition_variable.h" #include "storage/latch.h" #include "storage/spin.h" #include "utils/tuplestore.h" @@ -62,6 +63,7 @@ typedef struct */ pid_t pid; WalRcvState walRcvState; + ConditionVariable walRcvStoppedCV; pg_time_t startTime; /* |