aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/replication/Makefile2
-rw-r--r--src/backend/replication/syncrep.c289
-rw-r--r--src/backend/replication/syncrep_gram.y14
-rw-r--r--src/backend/replication/syncrep_scanner.l3
-rw-r--r--src/backend/replication/walsender.c12
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample3
6 files changed, 267 insertions, 56 deletions
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index c99717e0aee..da8bcf0471c 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -26,7 +26,7 @@ repl_gram.o: repl_scanner.c
# syncrep_scanner is complied as part of syncrep_gram
syncrep_gram.o: syncrep_scanner.c
-syncrep_scanner.c: FLEXFLAGS = -CF -p
+syncrep_scanner.c: FLEXFLAGS = -CF -p -i
syncrep_scanner.c: FLEX_NO_BACKUP=yes
# repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ce2009882d9..9143c47f92d 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -30,23 +30,34 @@
* searching the through all waiters each time we receive a reply.
*
* In 9.5 or before only a single standby could be considered as
- * synchronous. In 9.6 we support multiple synchronous standbys.
- * The number of synchronous standbys that transactions must wait for
- * replies from is specified in synchronous_standby_names.
- * This parameter also specifies a list of standby names,
- * which determines the priority of each standby for being chosen as
- * a synchronous standby. The standbys whose names appear earlier
- * in the list are given higher priority and will be considered as
- * synchronous. Other standby servers appearing later in this list
- * represent potential synchronous standbys. If any of the current
- * synchronous standbys disconnects for whatever reason, it will be
- * replaced immediately with the next-highest-priority standby.
+ * synchronous. In 9.6 we support a priority-based multiple synchronous
+ * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
+ * supported. The number of synchronous standbys that transactions
+ * must wait for replies from is specified in synchronous_standby_names.
+ * This parameter also specifies a list of standby names and the method
+ * (FIRST and ANY) to choose synchronous standbys from the listed ones.
+ *
+ * The method FIRST specifies a priority-based synchronous replication
+ * and makes transaction commits wait until their WAL records are
+ * replicated to the requested number of synchronous standbys chosen based
+ * on their priorities. The standbys whose names appear earlier in the list
+ * are given higher priority and will be considered as synchronous.
+ * Other standby servers appearing later in this list represent potential
+ * synchronous standbys. If any of the current synchronous standbys
+ * disconnects for whatever reason, it will be replaced immediately with
+ * the next-highest-priority standby.
+ *
+ * The method ANY specifies a quorum-based synchronous replication
+ * and makes transaction commits wait until their WAL records are
+ * replicated to at least the requested number of synchronous standbys
+ * in the list. All the standbys appearing in the list are considered as
+ * candidates for quorum synchronous standbys.
*
* Before the standbys chosen from synchronous_standby_names can
* become the synchronous standbys they must have caught up with
* the primary; that may take some time. Once caught up,
- * the current higher priority standbys which are considered as
- * synchronous at that moment will release waiters from the queue.
+ * the standbys which are considered as synchronous at that moment
+ * will release waiters from the queue.
*
* Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
*
@@ -79,18 +90,29 @@ char *SyncRepStandbyNames;
static bool announce_next_takeover = true;
-static SyncRepConfigData *SyncRepConfig = NULL;
+SyncRepConfigData *SyncRepConfig = NULL;
static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int SyncRepWakeQueue(bool all, int mode);
-static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
- XLogRecPtr *flushPtr,
- XLogRecPtr *applyPtr,
- bool *am_sync);
+static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
+ XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr,
+ bool *am_sync);
+static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+ XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr,
+ List *sync_standbys);
+static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+ XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr,
+ List *sync_standbys, uint8 nth);
static int SyncRepGetStandbyPriority(void);
+static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
+static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int cmp_lsn(const void *a, const void *b);
#ifdef USE_ASSERT_CHECKING
static bool SyncRepQueueIsOrderedByLSN(int mode);
@@ -386,7 +408,7 @@ SyncRepReleaseWaiters(void)
XLogRecPtr writePtr;
XLogRecPtr flushPtr;
XLogRecPtr applyPtr;
- bool got_oldest;
+ bool got_recptr;
bool am_sync;
int numwrite = 0;
int numflush = 0;
@@ -413,11 +435,10 @@ SyncRepReleaseWaiters(void)
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
/*
- * Check whether we are a sync standby or not, and calculate the oldest
+ * Check whether we are a sync standby or not, and calculate the synced
* positions among all sync standbys.
*/
- got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr,
- &applyPtr, &am_sync);
+ got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
/*
* If we are managing a sync standby, though we weren't prior to this,
@@ -426,16 +447,22 @@ SyncRepReleaseWaiters(void)
if (announce_next_takeover && am_sync)
{
announce_next_takeover = false;
- ereport(LOG,
- (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
- application_name, MyWalSnd->sync_standby_priority)));
+
+ if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+ ereport(LOG,
+ (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
+ application_name, MyWalSnd->sync_standby_priority)));
+ else
+ ereport(LOG,
+ (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
+ application_name)));
}
/*
* If the number of sync standbys is less than requested or we aren't
* managing a sync standby then just leave.
*/
- if (!got_oldest || !am_sync)
+ if (!got_recptr || !am_sync)
{
LWLockRelease(SyncRepLock);
announce_next_takeover = !am_sync;
@@ -471,21 +498,20 @@ SyncRepReleaseWaiters(void)
}
/*
- * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ * Calculate the synced Write, Flush and Apply positions among sync standbys.
*
* Return false if the number of sync standbys is less than
* synchronous_standby_names specifies. Otherwise return true and
- * store the oldest positions into *writePtr, *flushPtr and *applyPtr.
+ * store the positions into *writePtr, *flushPtr and *applyPtr.
*
* On return, *am_sync is set to true if this walsender is connecting to
* sync standby. Otherwise it's set to false.
*/
static bool
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
+SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr, bool *am_sync)
{
List *sync_standbys;
- ListCell *cell;
*writePtr = InvalidXLogRecPtr;
*flushPtr = InvalidXLogRecPtr;
@@ -508,12 +534,49 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
}
/*
- * Scan through all sync standbys and calculate the oldest Write, Flush
- * and Apply positions.
+ * In a priority-based sync replication, the synced positions are the
+ * oldest ones among sync standbys. In a quorum-based, they are the Nth
+ * latest ones.
+ *
+ * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest positions.
+ * But we use SyncRepGetOldestSyncRecPtr() for that calculation because
+ * it's a bit more efficient.
+ *
+ * XXX If the numbers of current and requested sync standbys are the same,
+ * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
+ * positions even in a quorum-based sync replication.
+ */
+ if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+ {
+ SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
+ sync_standbys);
+ }
+ else
+ {
+ SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
+ sync_standbys, SyncRepConfig->num_sync);
+ }
+
+ list_free(sync_standbys);
+ return true;
+}
+
+/*
+ * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ */
+static void
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr, List *sync_standbys)
+{
+ ListCell *cell;
+
+ /*
+ * Scan through all sync standbys and calculate the oldest
+ * Write, Flush and Apply positions.
*/
- foreach(cell, sync_standbys)
+ foreach (cell, sync_standbys)
{
- WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+ WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
@@ -531,23 +594,163 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
*applyPtr = apply;
}
+}
- list_free(sync_standbys);
- return true;
+/*
+ * Calculate the Nth latest Write, Flush and Apply positions among sync
+ * standbys.
+ */
+static void
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+{
+ ListCell *cell;
+ XLogRecPtr *write_array;
+ XLogRecPtr *flush_array;
+ XLogRecPtr *apply_array;
+ int len;
+ int i = 0;
+
+ len = list_length(sync_standbys);
+ write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+ flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+ apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+
+ foreach (cell, sync_standbys)
+ {
+ WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+
+ SpinLockAcquire(&walsnd->mutex);
+ write_array[i] = walsnd->write;
+ flush_array[i] = walsnd->flush;
+ apply_array[i] = walsnd->apply;
+ SpinLockRelease(&walsnd->mutex);
+
+ i++;
+ }
+
+ qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
+ qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
+ qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+
+ /* Get Nth latest Write, Flush, Apply positions */
+ *writePtr = write_array[nth - 1];
+ *flushPtr = flush_array[nth - 1];
+ *applyPtr = apply_array[nth - 1];
+
+ pfree(write_array);
+ pfree(flush_array);
+ pfree(apply_array);
+}
+
+/*
+ * Compare lsn in order to sort array in descending order.
+ */
+static int
+cmp_lsn(const void *a, const void *b)
+{
+ XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
+ XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
+
+ if (lsn1 > lsn2)
+ return -1;
+ else if (lsn1 == lsn2)
+ return 0;
+ else
+ return 1;
}
/*
* Return the list of sync standbys, or NIL if no sync standby is connected.
*
- * If there are multiple standbys with the same priority,
- * the first one found is selected preferentially.
* The caller must hold SyncRepLock.
*
* On return, *am_sync is set to true if this walsender is connecting to
* sync standby. Otherwise it's set to false.
*/
List *
-SyncRepGetSyncStandbys(bool *am_sync)
+SyncRepGetSyncStandbys(bool *am_sync)
+{
+ /* Set default result */
+ if (am_sync != NULL)
+ *am_sync = false;
+
+ /* Quick exit if sync replication is not requested */
+ if (SyncRepConfig == NULL)
+ return NIL;
+
+ return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
+ SyncRepGetSyncStandbysPriority(am_sync) :
+ SyncRepGetSyncStandbysQuorum(am_sync);
+}
+
+/*
+ * Return the list of all the candidates for quorum sync standbys,
+ * or NIL if no such standby is connected.
+ *
+ * The caller must hold SyncRepLock. This function must be called only in
+ * a quorum-based sync replication.
+ *
+ * On return, *am_sync is set to true if this walsender is connecting to
+ * sync standby. Otherwise it's set to false.
+ */
+static List *
+SyncRepGetSyncStandbysQuorum(bool *am_sync)
+{
+ List *result = NIL;
+ int i;
+ volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
+ * rearrangement */
+
+ Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ walsnd = &WalSndCtl->walsnds[i];
+
+ /* Must be active */
+ if (walsnd->pid == 0)
+ continue;
+
+ /* Must be streaming */
+ if (walsnd->state != WALSNDSTATE_STREAMING)
+ continue;
+
+ /* Must be synchronous */
+ if (walsnd->sync_standby_priority == 0)
+ continue;
+
+ /* Must have a valid flush position */
+ if (XLogRecPtrIsInvalid(walsnd->flush))
+ continue;
+
+ /*
+ * Consider this standby as a candidate for quorum sync standbys
+ * and append it to the result.
+ */
+ result = lappend_int(result, i);
+ if (am_sync != NULL && walsnd == MyWalSnd)
+ *am_sync = true;
+ }
+
+ return result;
+}
+
+/*
+ * Return the list of sync standbys chosen based on their priorities,
+ * or NIL if no sync standby is connected.
+ *
+ * If there are multiple standbys with the same priority,
+ * the first one found is selected preferentially.
+ *
+ * The caller must hold SyncRepLock. This function must be called only in
+ * a priority-based sync replication.
+ *
+ * On return, *am_sync is set to true if this walsender is connecting to
+ * sync standby. Otherwise it's set to false.
+ */
+static List *
+SyncRepGetSyncStandbysPriority(bool *am_sync)
{
List *result = NIL;
List *pending = NIL;
@@ -560,13 +763,7 @@ SyncRepGetSyncStandbys(bool *am_sync)
volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
* rearrangement */
- /* Set default result */
- if (am_sync != NULL)
- *am_sync = false;
-
- /* Quick exit if sync replication is not requested */
- if (SyncRepConfig == NULL)
- return NIL;
+ Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
lowest_priority = SyncRepConfig->nmembers;
next_highest_priority = lowest_priority + 1;
diff --git a/src/backend/replication/syncrep_gram.y b/src/backend/replication/syncrep_gram.y
index 35c27760d12..281edc6f285 100644
--- a/src/backend/replication/syncrep_gram.y
+++ b/src/backend/replication/syncrep_gram.y
@@ -21,7 +21,7 @@ SyncRepConfigData *syncrep_parse_result;
char *syncrep_parse_error_msg;
static SyncRepConfigData *create_syncrep_config(const char *num_sync,
- List *members);
+ List *members, uint8 syncrep_method);
/*
* Bison doesn't allocate anything that needs to live across parser calls,
@@ -46,7 +46,7 @@ static SyncRepConfigData *create_syncrep_config(const char *num_sync,
SyncRepConfigData *config;
}
-%token <str> NAME NUM JUNK
+%token <str> NAME NUM JUNK ANY FIRST
%type <config> result standby_config
%type <list> standby_list
@@ -60,8 +60,10 @@ result:
;
standby_config:
- standby_list { $$ = create_syncrep_config("1", $1); }
- | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3); }
+ standby_list { $$ = create_syncrep_config("1", $1, SYNC_REP_PRIORITY); }
+ | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3, SYNC_REP_PRIORITY); }
+ | ANY NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); }
+ | FIRST NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_PRIORITY); }
;
standby_list:
@@ -75,9 +77,8 @@ standby_name:
;
%%
-
static SyncRepConfigData *
-create_syncrep_config(const char *num_sync, List *members)
+create_syncrep_config(const char *num_sync, List *members, uint8 syncrep_method)
{
SyncRepConfigData *config;
int size;
@@ -98,6 +99,7 @@ create_syncrep_config(const char *num_sync, List *members)
config->config_size = size;
config->num_sync = atoi(num_sync);
+ config->syncrep_method = syncrep_method;
config->nmembers = list_length(members);
ptr = config->member_names;
foreach(lc, members)
diff --git a/src/backend/replication/syncrep_scanner.l b/src/backend/replication/syncrep_scanner.l
index d20662ed038..261b30e976a 100644
--- a/src/backend/replication/syncrep_scanner.l
+++ b/src/backend/replication/syncrep_scanner.l
@@ -64,6 +64,9 @@ xdinside [^"]+
%%
{space}+ { /* ignore */ }
+ANY { return ANY; }
+FIRST { return FIRST; }
+
{xdstart} {
initStringInfo(&xdbuf);
BEGIN(xd);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d80bcc00a13..5cdb8a0ad68 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2868,12 +2868,20 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
/*
* More easily understood version of standby state. This is purely
- * informational, not different from priority.
+ * informational.
+ *
+ * In quorum-based sync replication, the role of each standby
+ * listed in synchronous_standby_names can be changing very
+ * frequently. Any standbys considered as "sync" at one moment can
+ * be switched to "potential" ones at the next moment. So, it's
+ * basically useless to report "sync" or "potential" as their sync
+ * states. We report just "quorum" for them.
*/
if (priority == 0)
values[7] = CStringGetTextDatum("async");
else if (list_member_int(sync_standbys, i))
- values[7] = CStringGetTextDatum("sync");
+ values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
+ CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
else
values[7] = CStringGetTextDatum("potential");
}
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 7f9acfda067..2c638b2c097 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -245,7 +245,8 @@
# These settings are ignored on a standby server.
#synchronous_standby_names = '' # standby servers that provide sync rep
- # number of sync standbys and comma-separated list of application_name
+ # method to choose sync standbys, number of sync standbys
+ # and comma-separated list of application_name
# from standby(s); '*' = all
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed