aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/syncrep.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2016-03-29 21:16:12 -0400
committerRobert Haas <rhaas@postgresql.org>2016-03-29 21:29:49 -0400
commit314cbfc5da988eff8998655158f84c9815ecfbcd (patch)
tree94415b7fed00b4a70f337e403c1d5fe7e811ec70 /src/backend/replication/syncrep.c
parenta898b409f66f956e99694710f537829db02652c0 (diff)
downloadpostgresql-314cbfc5da988eff8998655158f84c9815ecfbcd.tar.gz
postgresql-314cbfc5da988eff8998655158f84c9815ecfbcd.zip
Add new replication mode synchronous_commit = 'remote_apply'.
In this mode, the master waits for the transaction to be applied on the remote side, not just written to disk. That means that you can count on a transaction started on the standby to see all commits previously acknowledged by the master. To make this work, the standby sends a reply after replaying each commit record generated with synchronous_commit >= 'remote_apply'. This introduces a small inefficiency: the extra replies will be sent even by standbys that aren't the current synchronous standby. But previously-existing synchronous_commit levels make no attempt at all to optimize which replies are sent based on what the primary cares about, so this is no worse, and at least avoids any extra replies for people not using the feature at all. Thomas Munro, reviewed by Michael Paquier and by me. Some additional tweaks by me.
Diffstat (limited to 'src/backend/replication/syncrep.c')
-rw-r--r--src/backend/replication/syncrep.c35
1 files changed, 28 insertions, 7 deletions
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 92faf4e8e0d..2da9cba5dc7 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -91,13 +91,24 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
* to the wait queue. During SyncRepWakeQueue() a WALSender changes
* the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
* This backend then resets its state to SYNC_REP_NOT_WAITING.
+ *
+ * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
+ * represents a commit record. If it doesn't, then we wait only for the WAL
+ * to be flushed if synchronous_commit is set to the higher level of
+ * remote_apply, because only commit records provide apply feedback.
*/
void
-SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
+SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{
char *new_status = NULL;
const char *old_status;
- int mode = SyncRepWaitMode;
+ int mode;
+
+ /* Cap the level for anything other than commit to remote flush only. */
+ if (commit)
+ mode = SyncRepWaitMode;
+ else
+ mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
/*
* Fast exit if user has not requested sync replication, or there are no
@@ -122,7 +133,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* to be a low cost check.
*/
if (!WalSndCtl->sync_standbys_defined ||
- XactCommitLSN <= WalSndCtl->lsn[mode])
+ lsn <= WalSndCtl->lsn[mode])
{
LWLockRelease(SyncRepLock);
return;
@@ -132,7 +143,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* Set our waitLSN so WALSender will know when to wake us, and add
* ourselves to the queue.
*/
- MyProc->waitLSN = XactCommitLSN;
+ MyProc->waitLSN = lsn;
MyProc->syncRepState = SYNC_REP_WAITING;
SyncRepQueueInsert(mode);
Assert(SyncRepQueueIsOrderedByLSN(mode));
@@ -147,7 +158,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
new_status = (char *) palloc(len + 32 + 1);
memcpy(new_status, old_status, len);
sprintf(new_status + len, " waiting for %X/%X",
- (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
+ (uint32) (lsn >> 32), (uint32) lsn);
set_ps_display(new_status, false);
new_status[len] = '\0'; /* truncate off " waiting ..." */
}
@@ -416,6 +427,7 @@ SyncRepReleaseWaiters(void)
WalSnd *syncWalSnd;
int numwrite = 0;
int numflush = 0;
+ int numapply = 0;
/*
* If this WALSender is serving a standby that is not on the list of
@@ -462,12 +474,18 @@ SyncRepReleaseWaiters(void)
walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
+ if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+ {
+ walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+ numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+ }
LWLockRelease(SyncRepLock);
- elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
+ elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x",
numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
- numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
+ numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush,
+ numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply);
/*
* If we are managing the highest priority standby, though we weren't
@@ -728,6 +746,9 @@ assign_synchronous_commit(int newval, void *extra)
case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
break;
+ case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+ SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+ break;
default:
SyncRepWaitMode = SYNC_REP_NO_WAIT;
break;