aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/README18
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c26
-rw-r--r--src/backend/replication/syncrep.c35
-rw-r--r--src/backend/replication/walreceiver.c85
4 files changed, 118 insertions, 46 deletions
diff --git a/src/backend/replication/README b/src/backend/replication/README
index 8e5bf0d2b8a..419a2d74d73 100644
--- a/src/backend/replication/README
+++ b/src/backend/replication/README
@@ -16,14 +16,16 @@ bool walrcv_connect(char *conninfo, XLogRecPtr startpoint)
Establish connection to the primary, and starts streaming from 'startpoint'.
Returns true on success.
-bool walrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
-
-Retrieve any message available through the connection, blocking for
-maximum of 'timeout' ms. If a message was successfully read, returns true,
-otherwise false. On success, a pointer to the message payload is stored in
-*buffer, length in *len, and the type of message received in *type. The
-returned buffer is valid until the next call to walrcv_* functions, the
-caller should not attempt freeing it.
+int walrcv_receive(char **buffer, int *wait_fd)
+
+Retrieve any message available without blocking through the
+connection. If a message was successfully read, returns its
+length. If the connection is closed, returns -1. Otherwise returns 0
+to indicate that no data is available, and sets *wait_fd to a file
+descriptor which can be waited on before trying again. On success, a
+pointer to the message payload is stored in *buffer. The returned
+buffer is valid until the next call to walrcv_* functions, and the
+caller should not attempt to free it.
void walrcv_send(const char *buffer, int nbytes)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 4ee4d7106da..a3bec498fa0 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -52,7 +52,7 @@ static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, ch
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
char *slotname);
static void libpqrcv_endstreaming(TimeLineID *next_tli);
-static int libpqrcv_receive(int timeout, char **buffer);
+static int libpqrcv_receive(char **buffer, int *wait_fd);
static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
@@ -463,8 +463,7 @@ libpqrcv_disconnect(void)
}
/*
- * Receive a message available from XLOG stream, blocking for
- * maximum of 'timeout' ms.
+ * Receive a message available from XLOG stream.
*
* Returns:
*
@@ -472,15 +471,15 @@ libpqrcv_disconnect(void)
* point to a buffer holding the received message. The buffer is only valid
* until the next libpqrcv_* call.
*
- * 0 if no data was available within timeout, or wait was interrupted
- * by signal.
+ * If no data was available immediately, returns 0, and *wait_fd is set to a
+ * file descriptor which can be waited on before trying again.
*
* -1 if the server ended the COPY.
*
* ereports on error.
*/
static int
-libpqrcv_receive(int timeout, char **buffer)
+libpqrcv_receive(char **buffer, int *wait_fd)
{
int rawlen;
@@ -492,16 +491,7 @@ libpqrcv_receive(int timeout, char **buffer)
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
{
- /*
- * No data available yet. If the caller requested to block, wait for
- * more data to arrive.
- */
- if (timeout > 0)
- {
- if (!libpq_select(timeout))
- return 0;
- }
-
+ /* Try consuming some data. */
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
@@ -510,7 +500,11 @@ libpqrcv_receive(int timeout, char **buffer)
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
+ {
+ /* Tell caller to try again when our socket is ready. */
+ *wait_fd = PQsocket(streamConn);
return 0;
+ }
}
if (rawlen == -1) /* end-of-streaming or error */
{
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 92faf4e8e0d..2da9cba5dc7 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -91,13 +91,24 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
* to the wait queue. During SyncRepWakeQueue() a WALSender changes
* the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
* This backend then resets its state to SYNC_REP_NOT_WAITING.
+ *
+ * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
+ * represents a commit record. If it doesn't, then we wait only for the WAL
+ * to be flushed if synchronous_commit is set to the higher level of
+ * remote_apply, because only commit records provide apply feedback.
*/
void
-SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
+SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{
char *new_status = NULL;
const char *old_status;
- int mode = SyncRepWaitMode;
+ int mode;
+
+ /* Cap the level for anything other than commit to remote flush only. */
+ if (commit)
+ mode = SyncRepWaitMode;
+ else
+ mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
/*
* Fast exit if user has not requested sync replication, or there are no
@@ -122,7 +133,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* to be a low cost check.
*/
if (!WalSndCtl->sync_standbys_defined ||
- XactCommitLSN <= WalSndCtl->lsn[mode])
+ lsn <= WalSndCtl->lsn[mode])
{
LWLockRelease(SyncRepLock);
return;
@@ -132,7 +143,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* Set our waitLSN so WALSender will know when to wake us, and add
* ourselves to the queue.
*/
- MyProc->waitLSN = XactCommitLSN;
+ MyProc->waitLSN = lsn;
MyProc->syncRepState = SYNC_REP_WAITING;
SyncRepQueueInsert(mode);
Assert(SyncRepQueueIsOrderedByLSN(mode));
@@ -147,7 +158,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
new_status = (char *) palloc(len + 32 + 1);
memcpy(new_status, old_status, len);
sprintf(new_status + len, " waiting for %X/%X",
- (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
+ (uint32) (lsn >> 32), (uint32) lsn);
set_ps_display(new_status, false);
new_status[len] = '\0'; /* truncate off " waiting ..." */
}
@@ -416,6 +427,7 @@ SyncRepReleaseWaiters(void)
WalSnd *syncWalSnd;
int numwrite = 0;
int numflush = 0;
+ int numapply = 0;
/*
* If this WALSender is serving a standby that is not on the list of
@@ -462,12 +474,18 @@ SyncRepReleaseWaiters(void)
walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
+ if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+ {
+ walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+ numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+ }
LWLockRelease(SyncRepLock);
- elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
+ elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x",
numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
- numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
+ numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush,
+ numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply);
/*
* If we are managing the highest priority standby, though we weren't
@@ -728,6 +746,9 @@ assign_synchronous_commit(int newval, void *extra)
case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
break;
+ case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+ SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+ break;
default:
SyncRepWaitMode = SYNC_REP_NO_WAIT;
break;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7b36e02faa9..057c250793d 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -352,8 +352,6 @@ WalReceiverMain(void)
if (walrcv_startstreaming(startpointTLI, startpoint,
slotname[0] != '\0' ? slotname : NULL))
{
- bool endofwal = false;
-
if (first_stream)
ereport(LOG,
(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
@@ -376,18 +374,13 @@ WalReceiverMain(void)
ping_sent = false;
/* Loop until end-of-streaming or error */
- while (!endofwal)
+ for (;;)
{
char *buf;
int len;
-
- /*
- * Emergency bailout if postmaster has died. This is to avoid
- * the necessity for manual cleanup of all postmaster
- * children.
- */
- if (!PostmasterIsAlive())
- exit(1);
+ bool endofwal = false;
+ int wait_fd = PGINVALID_SOCKET;
+ int rc;
/*
* Exit walreceiver if we're not in recovery. This should not
@@ -407,8 +400,8 @@ WalReceiverMain(void)
XLogWalRcvSendHSFeedback(true);
}
- /* Wait a while for data to arrive */
- len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
+ /* See if we can read data immediately */
+ len = walrcv_receive(&buf, &wait_fd);
if (len != 0)
{
/*
@@ -439,7 +432,7 @@ WalReceiverMain(void)
endofwal = true;
break;
}
- len = walrcv_receive(0, &buf);
+ len = walrcv_receive(&buf, &wait_fd);
}
/* Let the master know that we received some data. */
@@ -452,7 +445,54 @@ WalReceiverMain(void)
*/
XLogWalRcvFlush(false);
}
- else
+
+ /* Check if we need to exit the streaming loop. */
+ if (endofwal)
+ break;
+
+ /*
+ * Ideally we would reuse a WaitEventSet object repeatedly
+ * here to avoid the overheads of WaitLatchOrSocket on epoll
+ * systems, but we can't be sure that libpq (or any other
+ * walreceiver implementation) has the same socket (even if
+ * the fd is the same number, it may have been closed and
+ * reopened since the last time). In future, if there is a
+ * function for removing sockets from WaitEventSet, then we
+ * could add and remove just the socket each time, potentially
+ * avoiding some system calls.
+ */
+ Assert(wait_fd != PGINVALID_SOCKET);
+ rc = WaitLatchOrSocket(&walrcv->latch,
+ WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+ WL_TIMEOUT | WL_LATCH_SET,
+ wait_fd,
+ NAPTIME_PER_CYCLE);
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(&walrcv->latch);
+ if (walrcv->force_reply)
+ {
+ /*
+ * The recovery process has asked us to send apply
+ * feedback now. Make sure the flag is really set to
+ * false in shared memory before sending the reply,
+ * so we don't miss a new request for a reply.
+ */
+ walrcv->force_reply = false;
+ pg_memory_barrier();
+ XLogWalRcvSendReply(true, false);
+ }
+ }
+ if (rc & WL_POSTMASTER_DEATH)
+ {
+ /*
+ * Emergency bailout if postmaster has died. This is to
+ * avoid the necessity for manual cleanup of all
+ * postmaster children.
+ */
+ exit(1);
+ }
+ if (rc & WL_TIMEOUT)
{
/*
* We didn't receive anything new. If we haven't heard
@@ -1222,6 +1262,21 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
}
/*
+ * Wake up the walreceiver main loop.
+ *
+ * This is called by the startup process whenever interesting xlog records
+ * are applied, so that walreceiver can check if it needs to send an apply
+ * notification back to the master which may be waiting in a COMMIT with
+ * synchronous_commit = remote_apply.
+ */
+void
+WalRcvForceReply(void)
+{
+ WalRcv->force_reply = true;
+ SetLatch(&WalRcv->latch);
+}
+
+/*
* Return a string constant representing the state. This is used
* in system functions and views, and should *not* be translated.
*/