aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/syncrep.c112
-rw-r--r--src/backend/replication/walsender.c3
-rw-r--r--src/backend/utils/misc/guc.c5
-rw-r--r--src/include/access/xact.h1
-rw-r--r--src/include/replication/syncrep.h13
-rw-r--r--src/include/replication/walsender_private.h8
6 files changed, 98 insertions, 44 deletions
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 6bf69f0d35b..1273a8b9ebf 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -20,8 +20,8 @@
* per-transaction state information.
*
* Replication is either synchronous or not synchronous (async). If it is
- * async, we just fastpath out of here. If it is sync, then in 9.1 we wait
- * for the flush location on the standby before releasing the waiting backend.
+ * async, we just fastpath out of here. If it is sync, then we wait for
+ * the write or flush location on the standby before releasing the waiting backend.
* Further complexity in that interaction is expected in later releases.
*
* The best performing way to manage the waiting backends is to have a
@@ -67,13 +67,15 @@ char *SyncRepStandbyNames;
static bool announce_next_takeover = true;
-static void SyncRepQueueInsert(void);
+static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
+
+static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int SyncRepGetStandbyPriority(void);
#ifdef USE_ASSERT_CHECKING
-static bool SyncRepQueueIsOrderedByLSN(void);
+static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif
/*
@@ -120,7 +122,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* be a low cost check.
*/
if (!WalSndCtl->sync_standbys_defined ||
- XLByteLE(XactCommitLSN, WalSndCtl->lsn))
+ XLByteLE(XactCommitLSN, WalSndCtl->lsn[SyncRepWaitMode]))
{
LWLockRelease(SyncRepLock);
return;
@@ -132,8 +134,8 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
*/
MyProc->waitLSN = XactCommitLSN;
MyProc->syncRepState = SYNC_REP_WAITING;
- SyncRepQueueInsert();
- Assert(SyncRepQueueIsOrderedByLSN());
+ SyncRepQueueInsert(SyncRepWaitMode);
+ Assert(SyncRepQueueIsOrderedByLSN(SyncRepWaitMode));
LWLockRelease(SyncRepLock);
/* Alter ps display to show waiting for sync rep. */
@@ -267,18 +269,19 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
}
/*
- * Insert MyProc into SyncRepQueue, maintaining sorted invariant.
+ * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
*
* Usually we will go at tail of queue, though it's possible that we arrive
* here out of order, so start at tail and work back to insertion point.
*/
static void
-SyncRepQueueInsert(void)
+SyncRepQueueInsert(int mode)
{
PGPROC *proc;
- proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
- &(WalSndCtl->SyncRepQueue),
+ Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+ proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
+ &(WalSndCtl->SyncRepQueue[mode]),
offsetof(PGPROC, syncRepLinks));
while (proc)
@@ -290,7 +293,7 @@ SyncRepQueueInsert(void)
if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
break;
- proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
}
@@ -298,7 +301,7 @@ SyncRepQueueInsert(void)
if (proc)
SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
else
- SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks));
+ SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
}
/*
@@ -368,7 +371,8 @@ SyncRepReleaseWaiters(void)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
volatile WalSnd *syncWalSnd = NULL;
- int numprocs = 0;
+ int numwrite = 0;
+ int numflush = 0;
int priority = 0;
int i;
@@ -419,20 +423,28 @@ SyncRepReleaseWaiters(void)
return;
}
- if (XLByteLT(walsndctl->lsn, MyWalSnd->flush))
+ /*
+ * Set the lsn first so that when we wake backends they will release
+ * up to this location.
+ */
+ if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], MyWalSnd->write))
{
- /*
- * Set the lsn first so that when we wake backends they will release
- * up to this location.
- */
- walsndctl->lsn = MyWalSnd->flush;
- numprocs = SyncRepWakeQueue(false);
+ walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
+ numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
+ }
+ if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], MyWalSnd->flush))
+ {
+ walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
+ numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
LWLockRelease(SyncRepLock);
- elog(DEBUG3, "released %d procs up to %X/%X",
- numprocs,
+ elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
+ numwrite,
+ MyWalSnd->write.xlogid,
+ MyWalSnd->write.xrecoff,
+ numflush,
MyWalSnd->flush.xlogid,
MyWalSnd->flush.xrecoff);
@@ -507,24 +519,26 @@ SyncRepGetStandbyPriority(void)
}
/*
- * Walk queue from head. Set the state of any backends that need to be woken,
- * remove them from the queue, and then wake them. Pass all = true to wake
- * whole queue; otherwise, just wake up to the walsender's LSN.
+ * Walk the specified queue from head. Set the state of any backends that
+ * need to be woken, remove them from the queue, and then wake them.
+ * Pass all = true to wake whole queue; otherwise, just wake up to
+ * the walsender's LSN.
*
* Must hold SyncRepLock.
*/
int
-SyncRepWakeQueue(bool all)
+SyncRepWakeQueue(bool all, int mode)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
PGPROC *proc = NULL;
PGPROC *thisproc = NULL;
int numprocs = 0;
- Assert(SyncRepQueueIsOrderedByLSN());
+ Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+ Assert(SyncRepQueueIsOrderedByLSN(mode));
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
- &(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
+ &(WalSndCtl->SyncRepQueue[mode]),
offsetof(PGPROC, syncRepLinks));
while (proc)
@@ -532,7 +546,7 @@ SyncRepWakeQueue(bool all)
/*
* Assume the queue is ordered by LSN
*/
- if (!all && XLByteLT(walsndctl->lsn, proc->waitLSN))
+ if (!all && XLByteLT(walsndctl->lsn[mode], proc->waitLSN))
return numprocs;
/*
@@ -540,7 +554,7 @@ SyncRepWakeQueue(bool all)
* thisproc is valid, proc may be NULL after this.
*/
thisproc = proc;
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
@@ -588,7 +602,12 @@ SyncRepUpdateSyncStandbysDefined(void)
* wants synchronous replication, we'd better wake them up.
*/
if (!sync_standbys_defined)
- SyncRepWakeQueue(true);
+ {
+ int i;
+
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ SyncRepWakeQueue(true, i);
+ }
/*
* Only allow people to join the queue when there are synchronous
@@ -605,16 +624,18 @@ SyncRepUpdateSyncStandbysDefined(void)
#ifdef USE_ASSERT_CHECKING
static bool
-SyncRepQueueIsOrderedByLSN(void)
+SyncRepQueueIsOrderedByLSN(int mode)
{
PGPROC *proc = NULL;
XLogRecPtr lastLSN;
+ Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+
lastLSN.xlogid = 0;
lastLSN.xrecoff = 0;
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
- &(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
+ &(WalSndCtl->SyncRepQueue[mode]),
offsetof(PGPROC, syncRepLinks));
while (proc)
@@ -628,7 +649,7 @@ SyncRepQueueIsOrderedByLSN(void)
lastLSN = proc->waitLSN;
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
}
@@ -675,3 +696,20 @@ check_synchronous_standby_names(char **newval, void **extra, GucSource source)
return true;
}
+
+void
+assign_synchronous_commit(int newval, void *extra)
+{
+ switch (newval)
+ {
+ case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
+ SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
+ break;
+ case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
+ SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
+ break;
+ default:
+ SyncRepWaitMode = SYNC_REP_NO_WAIT;
+ break;
+ }
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3611713434a..5f938124e72 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1410,7 +1410,8 @@ WalSndShmemInit(void)
/* First time through, so initialize */
MemSet(WalSndCtl, 0, WalSndShmemSize());
- SHMQueueInit(&(WalSndCtl->SyncRepQueue));
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
for (i = 0; i < max_wal_senders; i++)
{
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 9fc96b2126a..ec8f2f2309b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -370,11 +370,12 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
};
/*
- * Although only "on", "off", and "local" are documented, we
+ * Although only "on", "off", "write", and "local" are documented, we
* accept all the likely variants of "on" and "off".
*/
static const struct config_enum_entry synchronous_commit_options[] = {
{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
+ {"write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false},
{"on", SYNCHRONOUS_COMMIT_ON, false},
{"off", SYNCHRONOUS_COMMIT_OFF, false},
{"true", SYNCHRONOUS_COMMIT_ON, true},
@@ -3164,7 +3165,7 @@ static struct config_enum ConfigureNamesEnum[] =
},
&synchronous_commit,
SYNCHRONOUS_COMMIT_ON, synchronous_commit_options,
- NULL, NULL, NULL
+ NULL, assign_synchronous_commit, NULL
},
{
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 5f063a9f452..20e344e5b73 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -55,6 +55,7 @@ typedef enum
{
SYNCHRONOUS_COMMIT_OFF, /* asynchronous commit */
SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */
+ SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote write */
SYNCHRONOUS_COMMIT_REMOTE_FLUSH /* wait for local and remote flush */
} SyncCommitLevel;
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 91446a8bebe..74820cbbb46 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -15,6 +15,16 @@
#include "utils/guc.h"
+#define SyncRepRequested() \
+ (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
+
+/* SyncRepWaitMode */
+#define SYNC_REP_NO_WAIT -1
+#define SYNC_REP_WAIT_WRITE 0
+#define SYNC_REP_WAIT_FLUSH 1
+
+#define NUM_SYNC_REP_WAIT_MODE 2
+
/* syncRepState */
#define SYNC_REP_NOT_WAITING 0
#define SYNC_REP_WAITING 1
@@ -37,8 +47,9 @@ extern void SyncRepReleaseWaiters(void);
extern void SyncRepUpdateSyncStandbysDefined(void);
/* called by various procs */
-extern int SyncRepWakeQueue(bool all);
+extern int SyncRepWakeQueue(bool all, int mode);
extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
+extern void assign_synchronous_commit(int newval, void *extra);
#endif /* _SYNCREP_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 89666d73833..f6cae84a6d4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -14,6 +14,7 @@
#include "access/xlog.h"
#include "nodes/nodes.h"
+#include "replication/syncrep.h"
#include "storage/latch.h"
#include "storage/shmem.h"
#include "storage/spin.h"
@@ -68,15 +69,16 @@ extern WalSnd *MyWalSnd;
typedef struct
{
/*
- * Synchronous replication queue. Protected by SyncRepLock.
+ * Synchronous replication queue with one queue per request type.
+ * Protected by SyncRepLock.
*/
- SHM_QUEUE SyncRepQueue;
+ SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
/*
* Current location of the head of the queue. All waiters should have a
* waitLSN that follows this value. Protected by SyncRepLock.
*/
- XLogRecPtr lsn;
+ XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];
/*
* Are any sync standbys defined? Waiting backends can't reload the