diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/syncrep.c | 112 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 3 | ||||
-rw-r--r-- | src/backend/utils/misc/guc.c | 5 | ||||
-rw-r--r-- | src/include/access/xact.h | 1 | ||||
-rw-r--r-- | src/include/replication/syncrep.h | 13 | ||||
-rw-r--r-- | src/include/replication/walsender_private.h | 8 |
6 files changed, 98 insertions, 44 deletions
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 6bf69f0d35b..1273a8b9ebf 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -20,8 +20,8 @@ * per-transaction state information. * * Replication is either synchronous or not synchronous (async). If it is - * async, we just fastpath out of here. If it is sync, then in 9.1 we wait - * for the flush location on the standby before releasing the waiting backend. + * async, we just fastpath out of here. If it is sync, then we wait for + * the write or flush location on the standby before releasing the waiting backend. * Further complexity in that interaction is expected in later releases. * * The best performing way to manage the waiting backends is to have a @@ -67,13 +67,15 @@ char *SyncRepStandbyNames; static bool announce_next_takeover = true; -static void SyncRepQueueInsert(void); +static int SyncRepWaitMode = SYNC_REP_NO_WAIT; + +static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); static int SyncRepGetStandbyPriority(void); #ifdef USE_ASSERT_CHECKING -static bool SyncRepQueueIsOrderedByLSN(void); +static bool SyncRepQueueIsOrderedByLSN(int mode); #endif /* @@ -120,7 +122,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * be a low cost check. */ if (!WalSndCtl->sync_standbys_defined || - XLByteLE(XactCommitLSN, WalSndCtl->lsn)) + XLByteLE(XactCommitLSN, WalSndCtl->lsn[SyncRepWaitMode])) { LWLockRelease(SyncRepLock); return; @@ -132,8 +134,8 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) */ MyProc->waitLSN = XactCommitLSN; MyProc->syncRepState = SYNC_REP_WAITING; - SyncRepQueueInsert(); - Assert(SyncRepQueueIsOrderedByLSN()); + SyncRepQueueInsert(SyncRepWaitMode); + Assert(SyncRepQueueIsOrderedByLSN(SyncRepWaitMode)); LWLockRelease(SyncRepLock); /* Alter ps display to show waiting for sync rep. */ @@ -267,18 +269,19 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) } /* - * Insert MyProc into SyncRepQueue, maintaining sorted invariant. + * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant. * * Usually we will go at tail of queue, though it's possible that we arrive * here out of order, so start at tail and work back to insertion point. */ static void -SyncRepQueueInsert(void) +SyncRepQueueInsert(int mode) { PGPROC *proc; - proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue), - &(WalSndCtl->SyncRepQueue), + Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); + proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), + &(WalSndCtl->SyncRepQueue[mode]), offsetof(PGPROC, syncRepLinks)); while (proc) @@ -290,7 +293,7 @@ SyncRepQueueInsert(void) if (XLByteLT(proc->waitLSN, MyProc->waitLSN)) break; - proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue), + proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), &(proc->syncRepLinks), offsetof(PGPROC, syncRepLinks)); } @@ -298,7 +301,7 @@ SyncRepQueueInsert(void) if (proc) SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks)); else - SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks)); + SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks)); } /* @@ -368,7 +371,8 @@ SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; volatile WalSnd *syncWalSnd = NULL; - int numprocs = 0; + int numwrite = 0; + int numflush = 0; int priority = 0; int i; @@ -419,20 +423,28 @@ SyncRepReleaseWaiters(void) return; } - if (XLByteLT(walsndctl->lsn, MyWalSnd->flush)) + /* + * Set the lsn first so that when we wake backends they will release + * up to this location. + */ + if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], MyWalSnd->write)) { - /* - * Set the lsn first so that when we wake backends they will release - * up to this location. - */ - walsndctl->lsn = MyWalSnd->flush; - numprocs = SyncRepWakeQueue(false); + walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; + numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); + } + if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], MyWalSnd->flush)) + { + walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; + numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to %X/%X", - numprocs, + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", + numwrite, + MyWalSnd->write.xlogid, + MyWalSnd->write.xrecoff, + numflush, MyWalSnd->flush.xlogid, MyWalSnd->flush.xrecoff); @@ -507,24 +519,26 @@ SyncRepGetStandbyPriority(void) } /* - * Walk queue from head. Set the state of any backends that need to be woken, - * remove them from the queue, and then wake them. Pass all = true to wake - * whole queue; otherwise, just wake up to the walsender's LSN. + * Walk the specified queue from head. Set the state of any backends that + * need to be woken, remove them from the queue, and then wake them. + * Pass all = true to wake whole queue; otherwise, just wake up to + * the walsender's LSN. * * Must hold SyncRepLock. */ int -SyncRepWakeQueue(bool all) +SyncRepWakeQueue(bool all, int mode) { volatile WalSndCtlData *walsndctl = WalSndCtl; PGPROC *proc = NULL; PGPROC *thisproc = NULL; int numprocs = 0; - Assert(SyncRepQueueIsOrderedByLSN()); + Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); + Assert(SyncRepQueueIsOrderedByLSN(mode)); - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), - &(WalSndCtl->SyncRepQueue), + proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), + &(WalSndCtl->SyncRepQueue[mode]), offsetof(PGPROC, syncRepLinks)); while (proc) @@ -532,7 +546,7 @@ SyncRepWakeQueue(bool all) /* * Assume the queue is ordered by LSN */ - if (!all && XLByteLT(walsndctl->lsn, proc->waitLSN)) + if (!all && XLByteLT(walsndctl->lsn[mode], proc->waitLSN)) return numprocs; /* @@ -540,7 +554,7 @@ SyncRepWakeQueue(bool all) * thisproc is valid, proc may be NULL after this. */ thisproc = proc; - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), + proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), &(proc->syncRepLinks), offsetof(PGPROC, syncRepLinks)); @@ -588,7 +602,12 @@ SyncRepUpdateSyncStandbysDefined(void) * wants synchronous replication, we'd better wake them up. */ if (!sync_standbys_defined) - SyncRepWakeQueue(true); + { + int i; + + for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) + SyncRepWakeQueue(true, i); + } /* * Only allow people to join the queue when there are synchronous @@ -605,16 +624,18 @@ SyncRepUpdateSyncStandbysDefined(void) #ifdef USE_ASSERT_CHECKING static bool -SyncRepQueueIsOrderedByLSN(void) +SyncRepQueueIsOrderedByLSN(int mode) { PGPROC *proc = NULL; XLogRecPtr lastLSN; + Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); + lastLSN.xlogid = 0; lastLSN.xrecoff = 0; - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), - &(WalSndCtl->SyncRepQueue), + proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), + &(WalSndCtl->SyncRepQueue[mode]), offsetof(PGPROC, syncRepLinks)); while (proc) @@ -628,7 +649,7 @@ SyncRepQueueIsOrderedByLSN(void) lastLSN = proc->waitLSN; - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), + proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), &(proc->syncRepLinks), offsetof(PGPROC, syncRepLinks)); } @@ -675,3 +696,20 @@ check_synchronous_standby_names(char **newval, void **extra, GucSource source) return true; } + +void +assign_synchronous_commit(int newval, void *extra) +{ + switch (newval) + { + case SYNCHRONOUS_COMMIT_REMOTE_WRITE: + SyncRepWaitMode = SYNC_REP_WAIT_WRITE; + break; + case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: + SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; + break; + default: + SyncRepWaitMode = SYNC_REP_NO_WAIT; + break; + } +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3611713434a..5f938124e72 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1410,7 +1410,8 @@ WalSndShmemInit(void) /* First time through, so initialize */ MemSet(WalSndCtl, 0, WalSndShmemSize()); - SHMQueueInit(&(WalSndCtl->SyncRepQueue)); + for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) + SHMQueueInit(&(WalSndCtl->SyncRepQueue[i])); for (i = 0; i < max_wal_senders; i++) { diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 9fc96b2126a..ec8f2f2309b 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -370,11 +370,12 @@ static const struct config_enum_entry constraint_exclusion_options[] = { }; /* - * Although only "on", "off", and "local" are documented, we + * Although only "on", "off", "write", and "local" are documented, we * accept all the likely variants of "on" and "off". */ static const struct config_enum_entry synchronous_commit_options[] = { {"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false}, + {"write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false}, {"on", SYNCHRONOUS_COMMIT_ON, false}, {"off", SYNCHRONOUS_COMMIT_OFF, false}, {"true", SYNCHRONOUS_COMMIT_ON, true}, @@ -3164,7 +3165,7 @@ static struct config_enum ConfigureNamesEnum[] = }, &synchronous_commit, SYNCHRONOUS_COMMIT_ON, synchronous_commit_options, - NULL, NULL, NULL + NULL, assign_synchronous_commit, NULL }, { diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 5f063a9f452..20e344e5b73 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -55,6 +55,7 @@ typedef enum { SYNCHRONOUS_COMMIT_OFF, /* asynchronous commit */ SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */ + SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote write */ SYNCHRONOUS_COMMIT_REMOTE_FLUSH /* wait for local and remote flush */ } SyncCommitLevel; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 91446a8bebe..74820cbbb46 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -15,6 +15,16 @@ #include "utils/guc.h" +#define SyncRepRequested() \ + (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH) + +/* SyncRepWaitMode */ +#define SYNC_REP_NO_WAIT -1 +#define SYNC_REP_WAIT_WRITE 0 +#define SYNC_REP_WAIT_FLUSH 1 + +#define NUM_SYNC_REP_WAIT_MODE 2 + /* syncRepState */ #define SYNC_REP_NOT_WAITING 0 #define SYNC_REP_WAITING 1 @@ -37,8 +47,9 @@ extern void SyncRepReleaseWaiters(void); extern void SyncRepUpdateSyncStandbysDefined(void); /* called by various procs */ -extern int SyncRepWakeQueue(bool all); +extern int SyncRepWakeQueue(bool all, int mode); extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); +extern void assign_synchronous_commit(int newval, void *extra); #endif /* _SYNCREP_H */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 89666d73833..f6cae84a6d4 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -14,6 +14,7 @@ #include "access/xlog.h" #include "nodes/nodes.h" +#include "replication/syncrep.h" #include "storage/latch.h" #include "storage/shmem.h" #include "storage/spin.h" @@ -68,15 +69,16 @@ extern WalSnd *MyWalSnd; typedef struct { /* - * Synchronous replication queue. Protected by SyncRepLock. + * Synchronous replication queue with one queue per request type. + * Protected by SyncRepLock. */ - SHM_QUEUE SyncRepQueue; + SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]; /* * Current location of the head of the queue. All waiters should have a * waitLSN that follows this value. Protected by SyncRepLock. */ - XLogRecPtr lsn; + XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]; /* * Are any sync standbys defined? Waiting backends can't reload the |