aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c35
1 files changed, 28 insertions, 7 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fc475d144d3..0e933228fc9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2375,14 +2375,16 @@ InitWalSenderSlot(void)
* Found a free slot. Reserve it for us.
*/
walsnd->pid = MyProcPid;
+ walsnd->state = WALSNDSTATE_STARTUP;
walsnd->sentPtr = InvalidXLogRecPtr;
+ walsnd->needreload = false;
walsnd->write = InvalidXLogRecPtr;
walsnd->flush = InvalidXLogRecPtr;
walsnd->apply = InvalidXLogRecPtr;
walsnd->writeLag = -1;
walsnd->flushLag = -1;
walsnd->applyLag = -1;
- walsnd->state = WALSNDSTATE_STARTUP;
+ walsnd->sync_standby_priority = 0;
walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0;
walsnd->spillTxns = 0;
@@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
- List *sync_standbys;
+ SyncRepStandbyData *sync_standbys;
+ int num_standbys;
int i;
/* check to see if caller supports us returning a tuplestore */
@@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldcontext);
/*
- * Get the currently active synchronous standbys.
+ * Get the currently active synchronous standbys. This could be out of
+ * date before we're done, but we'll use the data anyway.
*/
- LWLockAcquire(SyncRepLock, LW_SHARED);
- sync_standbys = SyncRepGetSyncStandbys(NULL);
- LWLockRelease(SyncRepLock);
+ num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
for (i = 0; i < max_wal_senders; i++)
{
@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
int64 spillTxns;
int64 spillCount;
int64 spillBytes;
+ bool is_sync_standby;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+ int j;
+ /* Collect data from shared memory */
SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0)
{
@@ -3311,6 +3316,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
spillBytes = walsnd->spillBytes;
SpinLockRelease(&walsnd->mutex);
+ /*
+ * Detect whether walsender is/was considered synchronous. We can
+ * provide some protection against stale data by checking the PID
+ * along with walsnd_index.
+ */
+ is_sync_standby = false;
+ for (j = 0; j < num_standbys; j++)
+ {
+ if (sync_standbys[j].walsnd_index == i &&
+ sync_standbys[j].pid == pid)
+ {
+ is_sync_standby = true;
+ break;
+ }
+ }
+
memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(pid);
@@ -3380,7 +3401,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*/
if (priority == 0)
values[10] = CStringGetTextDatum("async");
- else if (list_member_int(sync_standbys, i))
+ else if (is_sync_standby)
values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
else