diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/README | 18 | ||||
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 26 | ||||
-rw-r--r-- | src/backend/replication/syncrep.c | 35 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 85 |
4 files changed, 118 insertions, 46 deletions
diff --git a/src/backend/replication/README b/src/backend/replication/README index 8e5bf0d2b8a..419a2d74d73 100644 --- a/src/backend/replication/README +++ b/src/backend/replication/README @@ -16,14 +16,16 @@ bool walrcv_connect(char *conninfo, XLogRecPtr startpoint) Establish connection to the primary, and starts streaming from 'startpoint'. Returns true on success. -bool walrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) - -Retrieve any message available through the connection, blocking for -maximum of 'timeout' ms. If a message was successfully read, returns true, -otherwise false. On success, a pointer to the message payload is stored in -*buffer, length in *len, and the type of message received in *type. The -returned buffer is valid until the next call to walrcv_* functions, the -caller should not attempt freeing it. +int walrcv_receive(char **buffer, int *wait_fd) + +Retrieve any message available without blocking through the +connection. If a message was successfully read, returns its +length. If the connection is closed, returns -1. Otherwise returns 0 +to indicate that no data is available, and sets *wait_fd to a file +descriptor which can be waited on before trying again. On success, a +pointer to the message payload is stored in *buffer. The returned +buffer is valid until the next call to walrcv_* functions, and the +caller should not attempt to free it. void walrcv_send(const char *buffer, int nbytes) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 4ee4d7106da..a3bec498fa0 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -52,7 +52,7 @@ static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, ch static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname); static void libpqrcv_endstreaming(TimeLineID *next_tli); -static int libpqrcv_receive(int timeout, char **buffer); +static int libpqrcv_receive(char **buffer, int *wait_fd); static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); @@ -463,8 +463,7 @@ libpqrcv_disconnect(void) } /* - * Receive a message available from XLOG stream, blocking for - * maximum of 'timeout' ms. + * Receive a message available from XLOG stream. * * Returns: * @@ -472,15 +471,15 @@ libpqrcv_disconnect(void) * point to a buffer holding the received message. The buffer is only valid * until the next libpqrcv_* call. * - * 0 if no data was available within timeout, or wait was interrupted - * by signal. + * If no data was available immediately, returns 0, and *wait_fd is set to a + * file descriptor which can be waited on before trying again. * * -1 if the server ended the COPY. * * ereports on error. */ static int -libpqrcv_receive(int timeout, char **buffer) +libpqrcv_receive(char **buffer, int *wait_fd) { int rawlen; @@ -492,16 +491,7 @@ libpqrcv_receive(int timeout, char **buffer) rawlen = PQgetCopyData(streamConn, &recvBuf, 1); if (rawlen == 0) { - /* - * No data available yet. If the caller requested to block, wait for - * more data to arrive. - */ - if (timeout > 0) - { - if (!libpq_select(timeout)) - return 0; - } - + /* Try consuming some data. */ if (PQconsumeInput(streamConn) == 0) ereport(ERROR, (errmsg("could not receive data from WAL stream: %s", @@ -510,7 +500,11 @@ libpqrcv_receive(int timeout, char **buffer) /* Now that we've consumed some input, try again */ rawlen = PQgetCopyData(streamConn, &recvBuf, 1); if (rawlen == 0) + { + /* Tell caller to try again when our socket is ready. */ + *wait_fd = PQsocket(streamConn); return 0; + } } if (rawlen == -1) /* end-of-streaming or error */ { diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 92faf4e8e0d..2da9cba5dc7 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -91,13 +91,24 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); * to the wait queue. During SyncRepWakeQueue() a WALSender changes * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed. * This backend then resets its state to SYNC_REP_NOT_WAITING. + * + * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN + * represents a commit record. If it doesn't, then we wait only for the WAL + * to be flushed if synchronous_commit is set to the higher level of + * remote_apply, because only commit records provide apply feedback. */ void -SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) +SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) { char *new_status = NULL; const char *old_status; - int mode = SyncRepWaitMode; + int mode; + + /* Cap the level for anything other than commit to remote flush only. */ + if (commit) + mode = SyncRepWaitMode; + else + mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH); /* * Fast exit if user has not requested sync replication, or there are no @@ -122,7 +133,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * to be a low cost check. */ if (!WalSndCtl->sync_standbys_defined || - XactCommitLSN <= WalSndCtl->lsn[mode]) + lsn <= WalSndCtl->lsn[mode]) { LWLockRelease(SyncRepLock); return; @@ -132,7 +143,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * Set our waitLSN so WALSender will know when to wake us, and add * ourselves to the queue. */ - MyProc->waitLSN = XactCommitLSN; + MyProc->waitLSN = lsn; MyProc->syncRepState = SYNC_REP_WAITING; SyncRepQueueInsert(mode); Assert(SyncRepQueueIsOrderedByLSN(mode)); @@ -147,7 +158,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) new_status = (char *) palloc(len + 32 + 1); memcpy(new_status, old_status, len); sprintf(new_status + len, " waiting for %X/%X", - (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN); + (uint32) (lsn >> 32), (uint32) lsn); set_ps_display(new_status, false); new_status[len] = '\0'; /* truncate off " waiting ..." */ } @@ -416,6 +427,7 @@ SyncRepReleaseWaiters(void) WalSnd *syncWalSnd; int numwrite = 0; int numflush = 0; + int numapply = 0; /* * If this WALSender is serving a standby that is not on the list of @@ -462,12 +474,18 @@ SyncRepReleaseWaiters(void) walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } + if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply) + { + walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply; + numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); + } LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x", numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush, + numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply); /* * If we are managing the highest priority standby, though we weren't @@ -728,6 +746,9 @@ assign_synchronous_commit(int newval, void *extra) case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; break; + case SYNCHRONOUS_COMMIT_REMOTE_APPLY: + SyncRepWaitMode = SYNC_REP_WAIT_APPLY; + break; default: SyncRepWaitMode = SYNC_REP_NO_WAIT; break; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 7b36e02faa9..057c250793d 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -352,8 +352,6 @@ WalReceiverMain(void) if (walrcv_startstreaming(startpointTLI, startpoint, slotname[0] != '\0' ? slotname : NULL)) { - bool endofwal = false; - if (first_stream) ereport(LOG, (errmsg("started streaming WAL from primary at %X/%X on timeline %u", @@ -376,18 +374,13 @@ WalReceiverMain(void) ping_sent = false; /* Loop until end-of-streaming or error */ - while (!endofwal) + for (;;) { char *buf; int len; - - /* - * Emergency bailout if postmaster has died. This is to avoid - * the necessity for manual cleanup of all postmaster - * children. - */ - if (!PostmasterIsAlive()) - exit(1); + bool endofwal = false; + int wait_fd = PGINVALID_SOCKET; + int rc; /* * Exit walreceiver if we're not in recovery. This should not @@ -407,8 +400,8 @@ WalReceiverMain(void) XLogWalRcvSendHSFeedback(true); } - /* Wait a while for data to arrive */ - len = walrcv_receive(NAPTIME_PER_CYCLE, &buf); + /* See if we can read data immediately */ + len = walrcv_receive(&buf, &wait_fd); if (len != 0) { /* @@ -439,7 +432,7 @@ WalReceiverMain(void) endofwal = true; break; } - len = walrcv_receive(0, &buf); + len = walrcv_receive(&buf, &wait_fd); } /* Let the master know that we received some data. */ @@ -452,7 +445,54 @@ WalReceiverMain(void) */ XLogWalRcvFlush(false); } - else + + /* Check if we need to exit the streaming loop. */ + if (endofwal) + break; + + /* + * Ideally we would reuse a WaitEventSet object repeatedly + * here to avoid the overheads of WaitLatchOrSocket on epoll + * systems, but we can't be sure that libpq (or any other + * walreceiver implementation) has the same socket (even if + * the fd is the same number, it may have been closed and + * reopened since the last time). In future, if there is a + * function for removing sockets from WaitEventSet, then we + * could add and remove just the socket each time, potentially + * avoiding some system calls. + */ + Assert(wait_fd != PGINVALID_SOCKET); + rc = WaitLatchOrSocket(&walrcv->latch, + WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | + WL_TIMEOUT | WL_LATCH_SET, + wait_fd, + NAPTIME_PER_CYCLE); + if (rc & WL_LATCH_SET) + { + ResetLatch(&walrcv->latch); + if (walrcv->force_reply) + { + /* + * The recovery process has asked us to send apply + * feedback now. Make sure the flag is really set to + * false in shared memory before sending the reply, + * so we don't miss a new request for a reply. + */ + walrcv->force_reply = false; + pg_memory_barrier(); + XLogWalRcvSendReply(true, false); + } + } + if (rc & WL_POSTMASTER_DEATH) + { + /* + * Emergency bailout if postmaster has died. This is to + * avoid the necessity for manual cleanup of all + * postmaster children. + */ + exit(1); + } + if (rc & WL_TIMEOUT) { /* * We didn't receive anything new. If we haven't heard @@ -1222,6 +1262,21 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) } /* + * Wake up the walreceiver main loop. + * + * This is called by the startup process whenever interesting xlog records + * are applied, so that walreceiver can check if it needs to send an apply + * notification back to the master which may be waiting in a COMMIT with + * synchronous_commit = remote_apply. + */ +void +WalRcvForceReply(void) +{ + WalRcv->force_reply = true; + SetLatch(&WalRcv->latch); +} + +/* * Return a string constant representing the state. This is used * in system functions and views, and should *not* be translated. */ |