diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 35 |
1 files changed, 28 insertions, 7 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fc475d144d3..0e933228fc9 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2375,14 +2375,16 @@ InitWalSenderSlot(void) * Found a free slot. Reserve it for us. */ walsnd->pid = MyProcPid; + walsnd->state = WALSNDSTATE_STARTUP; walsnd->sentPtr = InvalidXLogRecPtr; + walsnd->needreload = false; walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->writeLag = -1; walsnd->flushLag = -1; walsnd->applyLag = -1; - walsnd->state = WALSNDSTATE_STARTUP; + walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; walsnd->spillTxns = 0; @@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - List *sync_standbys; + SyncRepStandbyData *sync_standbys; + int num_standbys; int i; /* check to see if caller supports us returning a tuplestore */ @@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); /* - * Get the currently active synchronous standbys. + * Get the currently active synchronous standbys. This could be out of + * date before we're done, but we'll use the data anyway. */ - LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standbys = SyncRepGetSyncStandbys(NULL); - LWLockRelease(SyncRepLock); + num_standbys = SyncRepGetCandidateStandbys(&sync_standbys); for (i = 0; i < max_wal_senders; i++) { @@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int64 spillTxns; int64 spillCount; int64 spillBytes; + bool is_sync_standby; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + int j; + /* Collect data from shared memory */ SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) { @@ -3311,6 +3316,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) spillBytes = walsnd->spillBytes; SpinLockRelease(&walsnd->mutex); + /* + * Detect whether walsender is/was considered synchronous. We can + * provide some protection against stale data by checking the PID + * along with walsnd_index. + */ + is_sync_standby = false; + for (j = 0; j < num_standbys; j++) + { + if (sync_standbys[j].walsnd_index == i && + sync_standbys[j].pid == pid) + { + is_sync_standby = true; + break; + } + } + memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(pid); @@ -3380,7 +3401,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) values[10] = CStringGetTextDatum("async"); - else if (list_member_int(sync_standbys, i)) + else if (is_sync_standby) values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else |