aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c24
1 files changed, 21 insertions, 3 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 63a818140bd..c6c196b2fab 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1482,14 +1482,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
{
static TimestampTz sendTime = 0;
TimestampTz now = GetCurrentTimestamp();
+ bool pending_writes = false;
+ bool end_xact = ctx->end_xact;
/*
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
* avoid flooding the lag tracker when we commit frequently.
+ *
+ * We don't have a mechanism to get the ack for any LSN other than end
+ * xact LSN from the downstream. So, we track lag only for end of
+ * transaction LSN.
*/
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
- if (TimestampDifferenceExceeds(sendTime, now,
- WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+ if (end_xact && TimestampDifferenceExceeds(sendTime, now,
+ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
{
LagTrackerWrite(lsn, now);
sendTime = now;
@@ -1515,8 +1521,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
/* If we have pending write here, make sure it's actually flushed */
if (pq_is_send_pending())
- ProcessPendingWrites();
+ pending_writes = true;
}
+
+ /*
+ * Process pending writes if any or try to send a keepalive if required.
+ * We don't need to try sending keep alive messages at the transaction end
+ * as that will be done at a later point in time. This is required only
+ * for large transactions where we don't send any changes to the
+ * downstream and the receiver can timeout due to that.
+ */
+ if (pending_writes || (!end_xact &&
+ now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
+ wal_sender_timeout / 2)))
+ ProcessPendingWrites();
}
/*