aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/walsender.c56
1 files changed, 29 insertions, 27 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 370429d746c..2683385ca6e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -211,7 +211,7 @@ typedef struct
#define LAG_TRACKER_BUFFER_SIZE 8192
/* A mechanism for tracking replication lag. */
-static struct
+typedef struct
{
XLogRecPtr last_lsn;
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
@@ -220,6 +220,8 @@ static struct
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
} LagTracker;
+static LagTracker *lag_tracker;
+
/* Signal handlers */
static void WalSndLastCycleHandler(SIGNAL_ARGS);
@@ -282,7 +284,7 @@ InitWalSender(void)
SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
/* Initialize empty timestamp buffer for lag tracking. */
- memset(&LagTracker, 0, sizeof(LagTracker));
+ lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
}
/*
@@ -3439,9 +3441,9 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
* If the lsn hasn't advanced since last time, then do nothing. This way
* we only record a new sample when new WAL has been written.
*/
- if (LagTracker.last_lsn == lsn)
+ if (lag_tracker->last_lsn == lsn)
return;
- LagTracker.last_lsn = lsn;
+ lag_tracker->last_lsn = lsn;
/*
* If advancing the write head of the circular buffer would crash into any
@@ -3449,11 +3451,11 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
* slowest reader (presumably apply) is the one that controls the release
* of space.
*/
- new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
+ new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
buffer_full = false;
for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
{
- if (new_write_head == LagTracker.read_heads[i])
+ if (new_write_head == lag_tracker->read_heads[i])
buffer_full = true;
}
@@ -3464,17 +3466,17 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
*/
if (buffer_full)
{
- new_write_head = LagTracker.write_head;
- if (LagTracker.write_head > 0)
- LagTracker.write_head--;
+ new_write_head = lag_tracker->write_head;
+ if (lag_tracker->write_head > 0)
+ lag_tracker->write_head--;
else
- LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
+ lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
}
/* Store a sample at the current write head position. */
- LagTracker.buffer[LagTracker.write_head].lsn = lsn;
- LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
- LagTracker.write_head = new_write_head;
+ lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
+ lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
+ lag_tracker->write_head = new_write_head;
}
/*
@@ -3496,14 +3498,14 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
TimestampTz time = 0;
/* Read all unread samples up to this LSN or end of buffer. */
- while (LagTracker.read_heads[head] != LagTracker.write_head &&
- LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
+ while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
+ lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
{
- time = LagTracker.buffer[LagTracker.read_heads[head]].time;
- LagTracker.last_read[head] =
- LagTracker.buffer[LagTracker.read_heads[head]];
- LagTracker.read_heads[head] =
- (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
+ time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
+ lag_tracker->last_read[head] =
+ lag_tracker->buffer[lag_tracker->read_heads[head]];
+ lag_tracker->read_heads[head] =
+ (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
}
/*
@@ -3513,8 +3515,8 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
* interpolation at the beginning of the next burst of WAL after a period
* of idleness.
*/
- if (LagTracker.read_heads[head] == LagTracker.write_head)
- LagTracker.last_read[head].time = 0;
+ if (lag_tracker->read_heads[head] == lag_tracker->write_head)
+ lag_tracker->last_read[head].time = 0;
if (time > now)
{
@@ -3532,17 +3534,17 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
* eventually start moving again and cross one of our samples before
* we can show the lag increasing.
*/
- if (LagTracker.read_heads[head] == LagTracker.write_head)
+ if (lag_tracker->read_heads[head] == lag_tracker->write_head)
{
/* There are no future samples, so we can't interpolate. */
return -1;
}
- else if (LagTracker.last_read[head].time != 0)
+ else if (lag_tracker->last_read[head].time != 0)
{
/* We can interpolate between last_read and the next sample. */
double fraction;
- WalTimeSample prev = LagTracker.last_read[head];
- WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
+ WalTimeSample prev = lag_tracker->last_read[head];
+ WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
if (lsn < prev.lsn)
{
@@ -3579,7 +3581,7 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
* standby reaches the future sample the best we can do is report
* the hypothetical lag if that sample were to be replayed now.
*/
- time = LagTracker.buffer[LagTracker.read_heads[head]].time;
+ time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
}
}