aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2016-03-29 21:16:12 -0400
committerRobert Haas <rhaas@postgresql.org>2016-03-29 21:29:49 -0400
commit314cbfc5da988eff8998655158f84c9815ecfbcd (patch)
tree94415b7fed00b4a70f337e403c1d5fe7e811ec70 /src/backend
parenta898b409f66f956e99694710f537829db02652c0 (diff)
downloadpostgresql-314cbfc5da988eff8998655158f84c9815ecfbcd.tar.gz
postgresql-314cbfc5da988eff8998655158f84c9815ecfbcd.zip
Add new replication mode synchronous_commit = 'remote_apply'.
In this mode, the master waits for the transaction to be applied on the remote side, not just written to disk. That means that you can count on a transaction started on the standby to see all commits previously acknowledged by the master. To make this work, the standby sends a reply after replaying each commit record generated with synchronous_commit >= 'remote_apply'. This introduces a small inefficiency: the extra replies will be sent even by standbys that aren't the current synchronous standby. But previously-existing synchronous_commit levels make no attempt at all to optimize which replies are sent based on what the primary cares about, so this is no worse, and at least avoids any extra replies for people not using the feature at all. Thomas Munro, reviewed by Michael Paquier and by me. Some additional tweaks by me.
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/transam/twophase.c6
-rw-r--r--src/backend/access/transam/xact.c16
-rw-r--r--src/backend/access/transam/xlog.c23
-rw-r--r--src/backend/replication/README18
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c26
-rw-r--r--src/backend/replication/syncrep.c35
-rw-r--r--src/backend/replication/walreceiver.c85
-rw-r--r--src/backend/utils/misc/guc.c5
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample2
9 files changed, 163 insertions, 53 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index e7234c87bbc..a65048b683b 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1107,7 +1107,7 @@ EndPrepare(GlobalTransaction gxact)
* Note that at this stage we have marked the prepare, but still show as
* running in the procarray (twice!) and continue to hold locks.
*/
- SyncRepWaitForLSN(gxact->prepare_end_lsn);
+ SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
records.tail = records.head = NULL;
records.num_chunks = 0;
@@ -2103,7 +2103,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
* Note that at this stage we have marked clog, but still show as running
* in the procarray and continue to hold locks.
*/
- SyncRepWaitForLSN(recptr);
+ SyncRepWaitForLSN(recptr, true);
}
/*
@@ -2156,5 +2156,5 @@ RecordTransactionAbortPrepared(TransactionId xid,
* Note that at this stage we have marked clog, but still show as running
* in the procarray and continue to hold locks.
*/
- SyncRepWaitForLSN(recptr);
+ SyncRepWaitForLSN(recptr, false);
}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e31540548e0..7e373316139 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1324,7 +1324,7 @@ RecordTransactionCommit(void)
* in the procarray and continue to hold locks.
*/
if (wrote_xlog && markXidCommitted)
- SyncRepWaitForLSN(XactLastRecEnd);
+ SyncRepWaitForLSN(XactLastRecEnd, true);
/* remember end of last commit record */
XactLastCommitEnd = XactLastRecEnd;
@@ -5123,6 +5123,13 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
/*
+ * Check if the caller would like to ask standbys for immediate feedback
+ * once this commit is applied.
+ */
+ if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
+ xl_xinfo.xinfo |= XACT_COMPLETION_APPLY_FEEDBACK;
+
+ /*
* Relcache invalidations requires information about the current database
* and so does logical decoding.
*/
@@ -5459,6 +5466,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
if (XactCompletionForceSyncCommit(parsed->xinfo))
XLogFlush(lsn);
+ /*
+ * If asked by the primary (because someone is waiting for a synchronous
+ * commit = remote_apply), we will need to ask walreceiver to send a
+ * reply immediately.
+ */
+ if (XactCompletionApplyFeedback(parsed->xinfo))
+ XLogRequestWalReceiverReply();
}
/*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b119a47e4e0..06cefe2efeb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -345,6 +345,9 @@ static XLogRecPtr RedoRecPtr;
*/
static bool doPageWrites;
+/* Has the recovery code requested a walreceiver wakeup? */
+static bool doRequestWalReceiverReply;
+
/*
* RedoStartLSN points to the checkpoint's REDO location which is specified
* in a backup label file, backup history file or control file. In standby
@@ -6879,6 +6882,17 @@ StartupXLOG(void)
XLogCtl->lastReplayedTLI = ThisTimeLineID;
SpinLockRelease(&XLogCtl->info_lck);
+ /*
+ * If rm_redo called XLogRequestWalReceiverReply, then we
+ * wake up the receiver so that it notices the updated
+ * lastReplayedEndRecPtr and sends a reply to the master.
+ */
+ if (doRequestWalReceiverReply)
+ {
+ doRequestWalReceiverReply = false;
+ WalRcvForceReply();
+ }
+
/* Remember this record as the last-applied one */
LastRec = ReadRecPtr;
@@ -11594,3 +11608,12 @@ SetWalWriterSleeping(bool sleeping)
XLogCtl->WalWriterSleeping = sleeping;
SpinLockRelease(&XLogCtl->info_lck);
}
+
+/*
+ * Schedule a walreceiver wakeup in the main recovery loop.
+ */
+void
+XLogRequestWalReceiverReply(void)
+{
+ doRequestWalReceiverReply = true;
+}
diff --git a/src/backend/replication/README b/src/backend/replication/README
index 8e5bf0d2b8a..419a2d74d73 100644
--- a/src/backend/replication/README
+++ b/src/backend/replication/README
@@ -16,14 +16,16 @@ bool walrcv_connect(char *conninfo, XLogRecPtr startpoint)
Establish connection to the primary, and starts streaming from 'startpoint'.
Returns true on success.
-bool walrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
-
-Retrieve any message available through the connection, blocking for
-maximum of 'timeout' ms. If a message was successfully read, returns true,
-otherwise false. On success, a pointer to the message payload is stored in
-*buffer, length in *len, and the type of message received in *type. The
-returned buffer is valid until the next call to walrcv_* functions, the
-caller should not attempt freeing it.
+int walrcv_receive(char **buffer, int *wait_fd)
+
+Retrieve any message available without blocking through the
+connection. If a message was successfully read, returns its
+length. If the connection is closed, returns -1. Otherwise returns 0
+to indicate that no data is available, and sets *wait_fd to a file
+descriptor which can be waited on before trying again. On success, a
+pointer to the message payload is stored in *buffer. The returned
+buffer is valid until the next call to walrcv_* functions, and the
+caller should not attempt to free it.
void walrcv_send(const char *buffer, int nbytes)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 4ee4d7106da..a3bec498fa0 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -52,7 +52,7 @@ static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, ch
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
char *slotname);
static void libpqrcv_endstreaming(TimeLineID *next_tli);
-static int libpqrcv_receive(int timeout, char **buffer);
+static int libpqrcv_receive(char **buffer, int *wait_fd);
static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
@@ -463,8 +463,7 @@ libpqrcv_disconnect(void)
}
/*
- * Receive a message available from XLOG stream, blocking for
- * maximum of 'timeout' ms.
+ * Receive a message available from XLOG stream.
*
* Returns:
*
@@ -472,15 +471,15 @@ libpqrcv_disconnect(void)
* point to a buffer holding the received message. The buffer is only valid
* until the next libpqrcv_* call.
*
- * 0 if no data was available within timeout, or wait was interrupted
- * by signal.
+ * If no data was available immediately, returns 0, and *wait_fd is set to a
+ * file descriptor which can be waited on before trying again.
*
* -1 if the server ended the COPY.
*
* ereports on error.
*/
static int
-libpqrcv_receive(int timeout, char **buffer)
+libpqrcv_receive(char **buffer, int *wait_fd)
{
int rawlen;
@@ -492,16 +491,7 @@ libpqrcv_receive(int timeout, char **buffer)
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
{
- /*
- * No data available yet. If the caller requested to block, wait for
- * more data to arrive.
- */
- if (timeout > 0)
- {
- if (!libpq_select(timeout))
- return 0;
- }
-
+ /* Try consuming some data. */
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
@@ -510,7 +500,11 @@ libpqrcv_receive(int timeout, char **buffer)
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
+ {
+ /* Tell caller to try again when our socket is ready. */
+ *wait_fd = PQsocket(streamConn);
return 0;
+ }
}
if (rawlen == -1) /* end-of-streaming or error */
{
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 92faf4e8e0d..2da9cba5dc7 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -91,13 +91,24 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
* to the wait queue. During SyncRepWakeQueue() a WALSender changes
* the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
* This backend then resets its state to SYNC_REP_NOT_WAITING.
+ *
+ * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
+ * represents a commit record. If it doesn't, then we wait only for the WAL
+ * to be flushed if synchronous_commit is set to the higher level of
+ * remote_apply, because only commit records provide apply feedback.
*/
void
-SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
+SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{
char *new_status = NULL;
const char *old_status;
- int mode = SyncRepWaitMode;
+ int mode;
+
+ /* Cap the level for anything other than commit to remote flush only. */
+ if (commit)
+ mode = SyncRepWaitMode;
+ else
+ mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
/*
* Fast exit if user has not requested sync replication, or there are no
@@ -122,7 +133,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* to be a low cost check.
*/
if (!WalSndCtl->sync_standbys_defined ||
- XactCommitLSN <= WalSndCtl->lsn[mode])
+ lsn <= WalSndCtl->lsn[mode])
{
LWLockRelease(SyncRepLock);
return;
@@ -132,7 +143,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* Set our waitLSN so WALSender will know when to wake us, and add
* ourselves to the queue.
*/
- MyProc->waitLSN = XactCommitLSN;
+ MyProc->waitLSN = lsn;
MyProc->syncRepState = SYNC_REP_WAITING;
SyncRepQueueInsert(mode);
Assert(SyncRepQueueIsOrderedByLSN(mode));
@@ -147,7 +158,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
new_status = (char *) palloc(len + 32 + 1);
memcpy(new_status, old_status, len);
sprintf(new_status + len, " waiting for %X/%X",
- (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
+ (uint32) (lsn >> 32), (uint32) lsn);
set_ps_display(new_status, false);
new_status[len] = '\0'; /* truncate off " waiting ..." */
}
@@ -416,6 +427,7 @@ SyncRepReleaseWaiters(void)
WalSnd *syncWalSnd;
int numwrite = 0;
int numflush = 0;
+ int numapply = 0;
/*
* If this WALSender is serving a standby that is not on the list of
@@ -462,12 +474,18 @@ SyncRepReleaseWaiters(void)
walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
+ if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+ {
+ walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+ numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+ }
LWLockRelease(SyncRepLock);
- elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
+ elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x",
numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
- numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
+ numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush,
+ numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply);
/*
* If we are managing the highest priority standby, though we weren't
@@ -728,6 +746,9 @@ assign_synchronous_commit(int newval, void *extra)
case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
break;
+ case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+ SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+ break;
default:
SyncRepWaitMode = SYNC_REP_NO_WAIT;
break;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7b36e02faa9..057c250793d 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -352,8 +352,6 @@ WalReceiverMain(void)
if (walrcv_startstreaming(startpointTLI, startpoint,
slotname[0] != '\0' ? slotname : NULL))
{
- bool endofwal = false;
-
if (first_stream)
ereport(LOG,
(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
@@ -376,18 +374,13 @@ WalReceiverMain(void)
ping_sent = false;
/* Loop until end-of-streaming or error */
- while (!endofwal)
+ for (;;)
{
char *buf;
int len;
-
- /*
- * Emergency bailout if postmaster has died. This is to avoid
- * the necessity for manual cleanup of all postmaster
- * children.
- */
- if (!PostmasterIsAlive())
- exit(1);
+ bool endofwal = false;
+ int wait_fd = PGINVALID_SOCKET;
+ int rc;
/*
* Exit walreceiver if we're not in recovery. This should not
@@ -407,8 +400,8 @@ WalReceiverMain(void)
XLogWalRcvSendHSFeedback(true);
}
- /* Wait a while for data to arrive */
- len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
+ /* See if we can read data immediately */
+ len = walrcv_receive(&buf, &wait_fd);
if (len != 0)
{
/*
@@ -439,7 +432,7 @@ WalReceiverMain(void)
endofwal = true;
break;
}
- len = walrcv_receive(0, &buf);
+ len = walrcv_receive(&buf, &wait_fd);
}
/* Let the master know that we received some data. */
@@ -452,7 +445,54 @@ WalReceiverMain(void)
*/
XLogWalRcvFlush(false);
}
- else
+
+ /* Check if we need to exit the streaming loop. */
+ if (endofwal)
+ break;
+
+ /*
+ * Ideally we would reuse a WaitEventSet object repeatedly
+ * here to avoid the overheads of WaitLatchOrSocket on epoll
+ * systems, but we can't be sure that libpq (or any other
+ * walreceiver implementation) has the same socket (even if
+ * the fd is the same number, it may have been closed and
+ * reopened since the last time). In future, if there is a
+ * function for removing sockets from WaitEventSet, then we
+ * could add and remove just the socket each time, potentially
+ * avoiding some system calls.
+ */
+ Assert(wait_fd != PGINVALID_SOCKET);
+ rc = WaitLatchOrSocket(&walrcv->latch,
+ WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+ WL_TIMEOUT | WL_LATCH_SET,
+ wait_fd,
+ NAPTIME_PER_CYCLE);
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(&walrcv->latch);
+ if (walrcv->force_reply)
+ {
+ /*
+ * The recovery process has asked us to send apply
+ * feedback now. Make sure the flag is really set to
+ * false in shared memory before sending the reply,
+ * so we don't miss a new request for a reply.
+ */
+ walrcv->force_reply = false;
+ pg_memory_barrier();
+ XLogWalRcvSendReply(true, false);
+ }
+ }
+ if (rc & WL_POSTMASTER_DEATH)
+ {
+ /*
+ * Emergency bailout if postmaster has died. This is to
+ * avoid the necessity for manual cleanup of all
+ * postmaster children.
+ */
+ exit(1);
+ }
+ if (rc & WL_TIMEOUT)
{
/*
* We didn't receive anything new. If we haven't heard
@@ -1222,6 +1262,21 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
}
/*
+ * Wake up the walreceiver main loop.
+ *
+ * This is called by the startup process whenever interesting xlog records
+ * are applied, so that walreceiver can check if it needs to send an apply
+ * notification back to the master which may be waiting in a COMMIT with
+ * synchronous_commit = remote_apply.
+ */
+void
+WalRcvForceReply(void)
+{
+ WalRcv->force_reply = true;
+ SetLatch(&WalRcv->latch);
+}
+
+/*
* Return a string constant representing the state. This is used
* in system functions and views, and should *not* be translated.
*/
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 65a6cd4404f..06cb1660eb6 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -345,12 +345,13 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
};
/*
- * Although only "on", "off", "remote_write", and "local" are documented, we
- * accept all the likely variants of "on" and "off".
+ * Although only "on", "off", "remote_apply", "remote_write", and "local" are
+ * documented, we accept all the likely variants of "on" and "off".
*/
static const struct config_enum_entry synchronous_commit_options[] = {
{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
{"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false},
+ {"remote_apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false},
{"on", SYNCHRONOUS_COMMIT_ON, false},
{"off", SYNCHRONOUS_COMMIT_OFF, false},
{"true", SYNCHRONOUS_COMMIT_ON, true},
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 5536012e050..ec4427f2d88 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -177,7 +177,7 @@
# (change requires restart)
#fsync = on # turns forced synchronization on or off
#synchronous_commit = on # synchronization level;
- # off, local, remote_write, or on
+ # off, local, remote_write, remote_apply, or on
#wal_sync_method = fsync # the default is the first option
# supported by the operating system:
# open_datasync