diff options
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 85 |
1 files changed, 70 insertions, 15 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7b36e02faa9..057c250793d 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -352,8 +352,6 @@ WalReceiverMain(void)
 		if (walrcv_startstreaming(startpointTLI, startpoint,
 								  slotname[0] != '\0' ? slotname : NULL))
 		{
-			bool endofwal = false;
-
 			if (first_stream)
 				ereport(LOG,
 						(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
@@ -376,18 +374,13 @@ WalReceiverMain(void)
 			ping_sent = false;
 
 			/* Loop until end-of-streaming or error */
-			while (!endofwal)
+			for (;;)
 			{
 				char *buf;
 				int len;
-
-				/*
-				 * Emergency bailout if postmaster has died. This is to avoid
-				 * the necessity for manual cleanup of all postmaster
-				 * children.
-				 */
-				if (!PostmasterIsAlive())
-					exit(1);
+				bool endofwal = false;
+				int wait_fd = PGINVALID_SOCKET;
+				int rc;
 
 				/*
 				 * Exit walreceiver if we're not in recovery. This should not
@@ -407,8 +400,8 @@ WalReceiverMain(void)
 					XLogWalRcvSendHSFeedback(true);
 				}
 
-				/* Wait a while for data to arrive */
-				len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
+				/* See if we can read data immediately */
+				len = walrcv_receive(&buf, &wait_fd);
 				if (len != 0)
 				{
 					/*
@@ -439,7 +432,7 @@ WalReceiverMain(void)
 							endofwal = true;
 							break;
 						}
-						len = walrcv_receive(0, &buf);
+						len = walrcv_receive(&buf, &wait_fd);
 					}
 
 					/* Let the master know that we received some data. */
@@ -452,7 +445,54 @@ WalReceiverMain(void)
 					 */
 					XLogWalRcvFlush(false);
 				}
-				else
+
+				/* Check if we need to exit the streaming loop. */
+				if (endofwal)
+					break;
+
+				/*
+				 * Ideally we would reuse a WaitEventSet object repeatedly
+				 * here to avoid the overheads of WaitLatchOrSocket on epoll
+				 * systems, but we can't be sure that libpq (or any other
+				 * walreceiver implementation) has the same socket (even if
+				 * the fd is the same number, it may have been closed and
+				 * reopened since the last time). In future, if there is a
+				 * function for removing sockets from WaitEventSet, then we
+				 * could add and remove just the socket each time, potentially
+				 * avoiding some system calls.
+				 */
+				Assert(wait_fd != PGINVALID_SOCKET);
+				rc = WaitLatchOrSocket(&walrcv->latch,
+									   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+									   WL_TIMEOUT | WL_LATCH_SET,
+									   wait_fd,
+									   NAPTIME_PER_CYCLE);
+				if (rc & WL_LATCH_SET)
+				{
+					ResetLatch(&walrcv->latch);
+					if (walrcv->force_reply)
+					{
+						/*
+						 * The recovery process has asked us to send apply
+						 * feedback now. Make sure the flag is really set to
+						 * false in shared memory before sending the reply,
+						 * so we don't miss a new request for a reply.
+						 */
+						walrcv->force_reply = false;
+						pg_memory_barrier();
+						XLogWalRcvSendReply(true, false);
+					}
+				}
+				if (rc & WL_POSTMASTER_DEATH)
+				{
+					/*
+					 * Emergency bailout if postmaster has died. This is to
+					 * avoid the necessity for manual cleanup of all
+					 * postmaster children.
+					 */
+					exit(1);
+				}
+				if (rc & WL_TIMEOUT)
 				{
 					/*
 					 * We didn't receive anything new. If we haven't heard
@@ -1222,6 +1262,21 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 }
 
 /*
+ * Wake up the walreceiver main loop.
+ *
+ * This is called by the startup process whenever interesting xlog records
+ * are applied, so that walreceiver can check if it needs to send an apply
+ * notification back to the master which may be waiting in a COMMIT with
+ * synchronous_commit = remote_apply.
+ */
+void
+WalRcvForceReply(void)
+{
+	WalRcv->force_reply = true;
+	SetLatch(&WalRcv->latch);
+}
+
+/*
  * Return a string constant representing the state. This is used
  * in system functions and views, and should *not* be translated.
  */
|