diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/Makefile | 2 | ||||
-rw-r--r-- | src/backend/replication/syncrep.c | 289 | ||||
-rw-r--r-- | src/backend/replication/syncrep_gram.y | 14 | ||||
-rw-r--r-- | src/backend/replication/syncrep_scanner.l | 3 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 12 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 3 |
6 files changed, 267 insertions, 56 deletions
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index c99717e0aee..da8bcf0471c 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -26,7 +26,7 @@ repl_gram.o: repl_scanner.c # syncrep_scanner is complied as part of syncrep_gram syncrep_gram.o: syncrep_scanner.c -syncrep_scanner.c: FLEXFLAGS = -CF -p +syncrep_scanner.c: FLEXFLAGS = -CF -p -i syncrep_scanner.c: FLEX_NO_BACKUP=yes # repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ce2009882d9..9143c47f92d 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -30,23 +30,34 @@ * searching the through all waiters each time we receive a reply. * * In 9.5 or before only a single standby could be considered as - * synchronous. In 9.6 we support multiple synchronous standbys. - * The number of synchronous standbys that transactions must wait for - * replies from is specified in synchronous_standby_names. - * This parameter also specifies a list of standby names, - * which determines the priority of each standby for being chosen as - * a synchronous standby. The standbys whose names appear earlier - * in the list are given higher priority and will be considered as - * synchronous. Other standby servers appearing later in this list - * represent potential synchronous standbys. If any of the current - * synchronous standbys disconnects for whatever reason, it will be - * replaced immediately with the next-highest-priority standby. + * synchronous. In 9.6 we support a priority-based multiple synchronous + * standbys. In 10.0 a quorum-based multiple synchronous standbys is also + * supported. The number of synchronous standbys that transactions + * must wait for replies from is specified in synchronous_standby_names. + * This parameter also specifies a list of standby names and the method + * (FIRST and ANY) to choose synchronous standbys from the listed ones. + * + * The method FIRST specifies a priority-based synchronous replication + * and makes transaction commits wait until their WAL records are + * replicated to the requested number of synchronous standbys chosen based + * on their priorities. The standbys whose names appear earlier in the list + * are given higher priority and will be considered as synchronous. + * Other standby servers appearing later in this list represent potential + * synchronous standbys. If any of the current synchronous standbys + * disconnects for whatever reason, it will be replaced immediately with + * the next-highest-priority standby. + * + * The method ANY specifies a quorum-based synchronous replication + * and makes transaction commits wait until their WAL records are + * replicated to at least the requested number of synchronous standbys + * in the list. All the standbys appearing in the list are considered as + * candidates for quorum synchronous standbys. * * Before the standbys chosen from synchronous_standby_names can * become the synchronous standbys they must have caught up with * the primary; that may take some time. Once caught up, - * the current higher priority standbys which are considered as - * synchronous at that moment will release waiters from the queue. + * the standbys which are considered as synchronous at that moment + * will release waiters from the queue. * * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group * @@ -79,18 +90,29 @@ char *SyncRepStandbyNames; static bool announce_next_takeover = true; -static SyncRepConfigData *SyncRepConfig = NULL; +SyncRepConfigData *SyncRepConfig = NULL; static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); static int SyncRepWakeQueue(bool all, int mode); -static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, - XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, - bool *am_sync); +static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + bool *am_sync); +static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + List *sync_standbys); +static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + List *sync_standbys, uint8 nth); static int SyncRepGetStandbyPriority(void); +static List *SyncRepGetSyncStandbysPriority(bool *am_sync); +static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); +static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING static bool SyncRepQueueIsOrderedByLSN(int mode); @@ -386,7 +408,7 @@ SyncRepReleaseWaiters(void) XLogRecPtr writePtr; XLogRecPtr flushPtr; XLogRecPtr applyPtr; - bool got_oldest; + bool got_recptr; bool am_sync; int numwrite = 0; int numflush = 0; @@ -413,11 +435,10 @@ SyncRepReleaseWaiters(void) LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* - * Check whether we are a sync standby or not, and calculate the oldest + * Check whether we are a sync standby or not, and calculate the synced * positions among all sync standbys. */ - got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr, - &applyPtr, &am_sync); + got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync); /* * If we are managing a sync standby, though we weren't prior to this, @@ -426,16 +447,22 @@ SyncRepReleaseWaiters(void) if (announce_next_takeover && am_sync) { announce_next_takeover = false; - ereport(LOG, - (errmsg("standby \"%s\" is now a synchronous standby with priority %u", - application_name, MyWalSnd->sync_standby_priority))); + + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + ereport(LOG, + (errmsg("standby \"%s\" is now a synchronous standby with priority %u", + application_name, MyWalSnd->sync_standby_priority))); + else + ereport(LOG, + (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby", + application_name))); } /* * If the number of sync standbys is less than requested or we aren't * managing a sync standby then just leave. */ - if (!got_oldest || !am_sync) + if (!got_recptr || !am_sync) { LWLockRelease(SyncRepLock); announce_next_takeover = !am_sync; @@ -471,21 +498,20 @@ SyncRepReleaseWaiters(void) } /* - * Calculate the oldest Write, Flush and Apply positions among sync standbys. + * Calculate the synced Write, Flush and Apply positions among sync standbys. * * Return false if the number of sync standbys is less than * synchronous_standby_names specifies. Otherwise return true and - * store the oldest positions into *writePtr, *flushPtr and *applyPtr. + * store the positions into *writePtr, *flushPtr and *applyPtr. * * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. */ static bool -SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, +SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync) { List *sync_standbys; - ListCell *cell; *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; @@ -508,12 +534,49 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, } /* - * Scan through all sync standbys and calculate the oldest Write, Flush - * and Apply positions. + * In a priority-based sync replication, the synced positions are the + * oldest ones among sync standbys. In a quorum-based, they are the Nth + * latest ones. + * + * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest positions. + * But we use SyncRepGetOldestSyncRecPtr() for that calculation because + * it's a bit more efficient. + * + * XXX If the numbers of current and requested sync standbys are the same, + * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced + * positions even in a quorum-based sync replication. + */ + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + { + SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, + sync_standbys); + } + else + { + SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, + sync_standbys, SyncRepConfig->num_sync); + } + + list_free(sync_standbys); + return true; +} + +/* + * Calculate the oldest Write, Flush and Apply positions among sync standbys. + */ +static void +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, List *sync_standbys) +{ + ListCell *cell; + + /* + * Scan through all sync standbys and calculate the oldest + * Write, Flush and Apply positions. */ - foreach(cell, sync_standbys) + foreach (cell, sync_standbys) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; @@ -531,23 +594,163 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply) *applyPtr = apply; } +} - list_free(sync_standbys); - return true; +/* + * Calculate the Nth latest Write, Flush and Apply positions among sync + * standbys. + */ +static void +SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) +{ + ListCell *cell; + XLogRecPtr *write_array; + XLogRecPtr *flush_array; + XLogRecPtr *apply_array; + int len; + int i = 0; + + len = list_length(sync_standbys); + write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + + foreach (cell, sync_standbys) + { + WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + + SpinLockAcquire(&walsnd->mutex); + write_array[i] = walsnd->write; + flush_array[i] = walsnd->flush; + apply_array[i] = walsnd->apply; + SpinLockRelease(&walsnd->mutex); + + i++; + } + + qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + + /* Get Nth latest Write, Flush, Apply positions */ + *writePtr = write_array[nth - 1]; + *flushPtr = flush_array[nth - 1]; + *applyPtr = apply_array[nth - 1]; + + pfree(write_array); + pfree(flush_array); + pfree(apply_array); +} + +/* + * Compare lsn in order to sort array in descending order. + */ +static int +cmp_lsn(const void *a, const void *b) +{ + XLogRecPtr lsn1 = *((const XLogRecPtr *) a); + XLogRecPtr lsn2 = *((const XLogRecPtr *) b); + + if (lsn1 > lsn2) + return -1; + else if (lsn1 == lsn2) + return 0; + else + return 1; } /* * Return the list of sync standbys, or NIL if no sync standby is connected. * - * If there are multiple standbys with the same priority, - * the first one found is selected preferentially. * The caller must hold SyncRepLock. * * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. */ List * -SyncRepGetSyncStandbys(bool *am_sync) +SyncRepGetSyncStandbys(bool *am_sync) +{ + /* Set default result */ + if (am_sync != NULL) + *am_sync = false; + + /* Quick exit if sync replication is not requested */ + if (SyncRepConfig == NULL) + return NIL; + + return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? + SyncRepGetSyncStandbysPriority(am_sync) : + SyncRepGetSyncStandbysQuorum(am_sync); +} + +/* + * Return the list of all the candidates for quorum sync standbys, + * or NIL if no such standby is connected. + * + * The caller must hold SyncRepLock. This function must be called only in + * a quorum-based sync replication. + * + * On return, *am_sync is set to true if this walsender is connecting to + * sync standby. Otherwise it's set to false. + */ +static List * +SyncRepGetSyncStandbysQuorum(bool *am_sync) +{ + List *result = NIL; + int i; + volatile WalSnd *walsnd; /* Use volatile pointer to prevent code + * rearrangement */ + + Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM); + + for (i = 0; i < max_wal_senders; i++) + { + walsnd = &WalSndCtl->walsnds[i]; + + /* Must be active */ + if (walsnd->pid == 0) + continue; + + /* Must be streaming */ + if (walsnd->state != WALSNDSTATE_STREAMING) + continue; + + /* Must be synchronous */ + if (walsnd->sync_standby_priority == 0) + continue; + + /* Must have a valid flush position */ + if (XLogRecPtrIsInvalid(walsnd->flush)) + continue; + + /* + * Consider this standby as a candidate for quorum sync standbys + * and append it to the result. + */ + result = lappend_int(result, i); + if (am_sync != NULL && walsnd == MyWalSnd) + *am_sync = true; + } + + return result; +} + +/* + * Return the list of sync standbys chosen based on their priorities, + * or NIL if no sync standby is connected. + * + * If there are multiple standbys with the same priority, + * the first one found is selected preferentially. + * + * The caller must hold SyncRepLock. This function must be called only in + * a priority-based sync replication. + * + * On return, *am_sync is set to true if this walsender is connecting to + * sync standby. Otherwise it's set to false. + */ +static List * +SyncRepGetSyncStandbysPriority(bool *am_sync) { List *result = NIL; List *pending = NIL; @@ -560,13 +763,7 @@ SyncRepGetSyncStandbys(bool *am_sync) volatile WalSnd *walsnd; /* Use volatile pointer to prevent code * rearrangement */ - /* Set default result */ - if (am_sync != NULL) - *am_sync = false; - - /* Quick exit if sync replication is not requested */ - if (SyncRepConfig == NULL) - return NIL; + Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); lowest_priority = SyncRepConfig->nmembers; next_highest_priority = lowest_priority + 1; diff --git a/src/backend/replication/syncrep_gram.y b/src/backend/replication/syncrep_gram.y index 35c27760d12..281edc6f285 100644 --- a/src/backend/replication/syncrep_gram.y +++ b/src/backend/replication/syncrep_gram.y @@ -21,7 +21,7 @@ SyncRepConfigData *syncrep_parse_result; char *syncrep_parse_error_msg; static SyncRepConfigData *create_syncrep_config(const char *num_sync, - List *members); + List *members, uint8 syncrep_method); /* * Bison doesn't allocate anything that needs to live across parser calls, @@ -46,7 +46,7 @@ static SyncRepConfigData *create_syncrep_config(const char *num_sync, SyncRepConfigData *config; } -%token <str> NAME NUM JUNK +%token <str> NAME NUM JUNK ANY FIRST %type <config> result standby_config %type <list> standby_list @@ -60,8 +60,10 @@ result: ; standby_config: - standby_list { $$ = create_syncrep_config("1", $1); } - | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3); } + standby_list { $$ = create_syncrep_config("1", $1, SYNC_REP_PRIORITY); } + | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3, SYNC_REP_PRIORITY); } + | ANY NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); } + | FIRST NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_PRIORITY); } ; standby_list: @@ -75,9 +77,8 @@ standby_name: ; %% - static SyncRepConfigData * -create_syncrep_config(const char *num_sync, List *members) +create_syncrep_config(const char *num_sync, List *members, uint8 syncrep_method) { SyncRepConfigData *config; int size; @@ -98,6 +99,7 @@ create_syncrep_config(const char *num_sync, List *members) config->config_size = size; config->num_sync = atoi(num_sync); + config->syncrep_method = syncrep_method; config->nmembers = list_length(members); ptr = config->member_names; foreach(lc, members) diff --git a/src/backend/replication/syncrep_scanner.l b/src/backend/replication/syncrep_scanner.l index d20662ed038..261b30e976a 100644 --- a/src/backend/replication/syncrep_scanner.l +++ b/src/backend/replication/syncrep_scanner.l @@ -64,6 +64,9 @@ xdinside [^"]+ %% {space}+ { /* ignore */ } +ANY { return ANY; } +FIRST { return FIRST; } + {xdstart} { initStringInfo(&xdbuf); BEGIN(xd); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d80bcc00a13..5cdb8a0ad68 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2868,12 +2868,20 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) /* * More easily understood version of standby state. This is purely - * informational, not different from priority. + * informational. + * + * In quorum-based sync replication, the role of each standby + * listed in synchronous_standby_names can be changing very + * frequently. Any standbys considered as "sync" at one moment can + * be switched to "potential" ones at the next moment. So, it's + * basically useless to report "sync" or "potential" as their sync + * states. We report just "quorum" for them. */ if (priority == 0) values[7] = CStringGetTextDatum("async"); else if (list_member_int(sync_standbys, i)) - values[7] = CStringGetTextDatum("sync"); + values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? + CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else values[7] = CStringGetTextDatum("potential"); } diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 7f9acfda067..2c638b2c097 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -245,7 +245,8 @@ # These settings are ignored on a standby server. #synchronous_standby_names = '' # standby servers that provide sync rep - # number of sync standbys and comma-separated list of application_name + # method to choose sync standbys, number of sync standbys + # and comma-separated list of application_name # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed |