aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/parallel.c4
-rw-r--r--src/backend/libpq/pqmq.c4
-rw-r--r--src/backend/postmaster/bgworker.c4
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c13
-rw-r--r--src/backend/replication/logical/launcher.c35
-rw-r--r--src/backend/replication/logical/tablesync.c12
-rw-r--r--src/backend/replication/logical/worker.c10
-rw-r--r--src/backend/storage/lmgr/condition_variable.c6
-rw-r--r--src/test/modules/worker_spi/worker_spi.c2
9 files changed, 56 insertions, 34 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index cb221742706..16a0bee6103 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -527,9 +527,9 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
if (!anyone_alive)
break;
- WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1,
+ WaitLatch(MyLatch, WL_LATCH_SET, -1,
WAIT_EVENT_PARALLEL_FINISH);
- ResetLatch(&MyProc->procLatch);
+ ResetLatch(MyLatch);
}
if (pcxt->toc != NULL)
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 96939327c38..8fbc03819d9 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -172,9 +172,9 @@ mq_putmessage(char msgtype, const char *s, size_t len)
if (result != SHM_MQ_WOULD_BLOCK)
break;
- WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0,
+ WaitLatch(MyLatch, WL_LATCH_SET, 0,
WAIT_EVENT_MQ_PUT_MESSAGE);
- ResetLatch(&MyProc->procLatch);
+ ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index c3454276bfa..712d700481d 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -1144,7 +1144,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
if (status == BGWH_STOPPED)
break;
- rc = WaitLatch(&MyProc->procLatch,
+ rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
WAIT_EVENT_BGWORKER_SHUTDOWN);
@@ -1154,7 +1154,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
break;
}
- ResetLatch(&MyProc->procLatch);
+ ResetLatch(MyLatch);
}
return status;
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 726d1b5bd81..89c34b82252 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -176,7 +176,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
? WL_SOCKET_READABLE
: WL_SOCKET_WRITEABLE);
- rc = WaitLatchOrSocket(&MyProc->procLatch,
+ rc = WaitLatchOrSocket(MyLatch,
WL_POSTMASTER_DEATH |
WL_LATCH_SET | io_flag,
PQsocket(conn->streamConn),
@@ -190,7 +190,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
/* Interrupted? */
if (rc & WL_LATCH_SET)
{
- ResetLatch(&MyProc->procLatch);
+ ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
@@ -574,21 +574,22 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
* the signal arrives in the middle of establishment of
* replication connection.
*/
- ResetLatch(&MyProc->procLatch);
- rc = WaitLatchOrSocket(&MyProc->procLatch,
+ rc = WaitLatchOrSocket(MyLatch,
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
WL_LATCH_SET,
PQsocket(streamConn),
0,
WAIT_EVENT_LIBPQWALRECEIVER);
+
+ /* Emergency bailout? */
if (rc & WL_POSTMASTER_DEATH)
exit(1);
- /* interrupted */
+ /* Interrupted? */
if (rc & WL_LATCH_SET)
{
+ ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
- continue;
}
if (PQconsumeInput(streamConn) == 0)
return NULL; /* trouble */
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 5aaf24bfe4f..5a3274b2c23 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -208,10 +208,15 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_BGWORKER_STARTUP);
+ /* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
- ResetLatch(MyLatch);
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
}
return;
@@ -440,10 +445,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
LWLockRelease(LogicalRepWorkerLock);
- CHECK_FOR_INTERRUPTS();
-
/* Wait for signal. */
- rc = WaitLatch(&MyProc->procLatch,
+ rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_BGWORKER_STARTUP);
@@ -451,7 +454,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
- ResetLatch(&MyProc->procLatch);
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
/* Check worker status. */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
@@ -492,7 +499,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
CHECK_FOR_INTERRUPTS();
/* Wait for more work. */
- rc = WaitLatch(&MyProc->procLatch,
+ rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_BGWORKER_SHUTDOWN);
@@ -500,7 +507,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
- ResetLatch(&MyProc->procLatch);
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
}
}
@@ -876,7 +887,7 @@ ApplyLauncherMain(Datum main_arg)
}
/* Wait for more work. */
- rc = WaitLatch(&MyProc->procLatch,
+ rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
wait_time,
WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
@@ -885,13 +896,17 @@ ApplyLauncherMain(Datum main_arg)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+
if (got_SIGHUP)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
-
- ResetLatch(&MyProc->procLatch);
}
LogicalRepCtx->launcher_pid = 0;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index ed66602935f..6e55d2d6069 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -191,7 +191,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
if (!worker)
return false;
- rc = WaitLatch(&MyProc->procLatch,
+ rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
@@ -199,7 +199,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
- ResetLatch(&MyProc->procLatch);
+ ResetLatch(MyLatch);
}
return false;
@@ -236,7 +236,7 @@ wait_for_worker_state_change(char expected_state)
if (MyLogicalRepWorker->relstate == expected_state)
return true;
- rc = WaitLatch(&MyProc->procLatch,
+ rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
@@ -244,7 +244,7 @@ wait_for_worker_state_change(char expected_state)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
- ResetLatch(&MyProc->procLatch);
+ ResetLatch(MyLatch);
}
return false;
@@ -604,7 +604,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
/*
* Wait for more data or latch.
*/
- rc = WaitLatchOrSocket(&MyProc->procLatch,
+ rc = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
WL_TIMEOUT | WL_POSTMASTER_DEATH,
fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
@@ -613,7 +613,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
- ResetLatch(&MyProc->procLatch);
+ ResetLatch(MyLatch);
}
return bytesread;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 51a64487cd9..999d627c872 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1146,7 +1146,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/*
* Wait for more data or latch.
*/
- rc = WaitLatchOrSocket(&MyProc->procLatch,
+ rc = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
WL_TIMEOUT | WL_POSTMASTER_DEATH,
fd, NAPTIME_PER_CYCLE,
@@ -1156,6 +1156,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+
if (got_SIGHUP)
{
got_SIGHUP = false;
@@ -1209,8 +1215,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
send_feedback(last_received, requestReply, requestReply);
}
-
- ResetLatch(&MyProc->procLatch);
}
}
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 5afb21121b6..b4b7d28dd5d 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -68,14 +68,14 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
{
cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1);
AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
- &MyProc->procLatch, NULL);
+ MyLatch, NULL);
}
/*
* Reset my latch before adding myself to the queue and before entering
* the caller's predicate loop.
*/
- ResetLatch(&MyProc->procLatch);
+ ResetLatch(MyLatch);
/* Add myself to the wait queue. */
SpinLockAcquire(&cv->mutex);
@@ -135,7 +135,7 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
WaitEventSetWait(cv_wait_event_set, -1, &event, 1, wait_event_info);
/* Reset latch before testing whether we can return. */
- ResetLatch(&MyProc->procLatch);
+ ResetLatch(MyLatch);
/*
* If this process has been taken out of the wait list, then we know
diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c
index 9abfc714a99..553baf00454 100644
--- a/src/test/modules/worker_spi/worker_spi.c
+++ b/src/test/modules/worker_spi/worker_spi.c
@@ -235,6 +235,8 @@ worker_spi_main(Datum main_arg)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
+ CHECK_FOR_INTERRUPTS();
+
/*
* In case of a SIGHUP, just reload the configuration.
*/