aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Riggs <simon@2ndQuadrant.com>2017-05-12 10:50:56 +0100
committerSimon Riggs <simon@2ndQuadrant.com>2017-05-12 10:50:56 +0100
commit024711bb544645c8b1061e9f02b261e2e336981d (patch)
tree8c0a7b85c0260c260ddd6cb89cd4ee578b56ab12
parentefa2c18f4e8a8ccc74d9005d960f4c1a2bf05ea9 (diff)
downloadpostgresql-024711bb544645c8b1061e9f02b261e2e336981d.tar.gz
postgresql-024711bb544645c8b1061e9f02b261e2e336981d.zip
Lag tracking for logical replication
Lag tracking is called for each commit, but we introduce a pacing delay to ensure we don't swamp the lag tracker. Author: Petr Jelinek, with minor pacing delay code from me
-rw-r--r--src/backend/replication/logical/logical.c34
-rw-r--r--src/backend/replication/logical/logicalfuncs.c2
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c2
-rw-r--r--src/backend/replication/slotfuncs.c3
-rw-r--r--src/backend/replication/walsender.c45
-rw-r--r--src/include/replication/logical.h15
-rw-r--r--src/include/replication/output_plugin.h1
7 files changed, 79 insertions, 23 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ab963c53456..7409e5ce3de 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
bool need_full_snapshot,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
- LogicalOutputPluginWriterWrite do_write)
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress)
{
ReplicationSlot *slot;
MemoryContext context,
@@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
+ ctx->update_progress = update_progress;
ctx->output_plugin_options = output_plugin_options;
@@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
*
* plugin contains the name of the output plugin
* output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write are callbacks that have to be filled to
- * perform the use-case dependent, actual, work.
+ * read_page, prepare_write, do_write, update_progress
+ * callbacks that have to be filled to perform the use-case dependent,
+ * actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
* as the decoding context because further memory contexts will be created
@@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
bool need_full_snapshot,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
- LogicalOutputPluginWriterWrite do_write)
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress)
{
TransactionId xmin_horizon = InvalidTransactionId;
ReplicationSlot *slot;
@@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin,
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
need_full_snapshot, read_page, prepare_write,
- do_write);
+ do_write, update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
* output_plugin_options
* contains options passed to the output plugin.
*
- * read_page, prepare_write, do_write
+ * read_page, prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent,
* actual work.
*
@@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
- LogicalOutputPluginWriterWrite do_write)
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress)
{
LogicalDecodingContext *ctx;
ReplicationSlot *slot;
@@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- read_page, prepare_write, do_write);
+ read_page, prepare_write, do_write,
+ update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -504,6 +510,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
}
/*
+ * Pass ctx->write_location and ctx->write_xid to update_progress, if set.
+ */
+void
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+{
+ if (!ctx->update_progress) /* callback is optional and may be NULL */
+ return;
+
+ ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+}
+
+/*
* Load the output plugin, lookup its output plugin init function, and check
* that it provides the required callbacks.
*/
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index c251b92f57b..27164de093d 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
options,
logical_read_local_xlog_page,
LogicalOutputPrepareWrite,
- LogicalOutputWrite);
+ LogicalOutputWrite, NULL);
MemoryContextSwitchTo(oldcontext);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f3eaccffd5b..4ddfbf7a98b 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -244,6 +244,8 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
+ OutputPluginUpdateProgress(ctx);
+
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6ee1e68819a..56a9ca96517 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -133,7 +133,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
*/
ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
false, /* do not build snapshot */
- logical_read_local_xlog_page, NULL, NULL);
+ logical_read_local_xlog_page, NULL, NULL,
+ NULL);
/* build initial snapshot, might take a while */
DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45d027803ab..e4e5337d549 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -245,7 +245,9 @@ static void WalSndCheckTimeOut(TimestampTz now);
static long WalSndComputeSleeptime(TimestampTz now);
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
@@ -923,7 +925,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
logical_read_xlog_page,
- WalSndPrepareWrite, WalSndWriteData);
+ WalSndPrepareWrite, WalSndWriteData,
+ WalSndUpdateProgress);
/*
* Signal that we don't need the timeout mechanism. We're just
@@ -1077,10 +1080,11 @@ StartLogicalReplication(StartReplicationCmd *cmd)
* Initialize position to the last ack'ed one, then the xlog records begin
* to be shipped from that position.
*/
- logical_decoding_ctx = CreateDecodingContext(
- cmd->startpoint, cmd->options,
+ logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
logical_read_xlog_page,
- WalSndPrepareWrite, WalSndWriteData);
+ WalSndPrepareWrite,
+ WalSndWriteData,
+ WalSndUpdateProgress);
/* Start reading WAL from the oldest required WAL. */
logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1240,6 +1244,30 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
}
/*
+ * LogicalDecodingContext 'update_progress' callback.
+ *
+ * Write the current position to the lag tracker (see XLogSendPhysical).
+ */
+static void
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+ static TimestampTz sendTime = 0; /* when we last called LagTrackerWrite() */
+ TimestampTz now = GetCurrentTimestamp();
+
+ /*
+ * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+ * to avoid flooding the lag tracker when we commit frequently.
+ */
+#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
+ if (!TimestampDifferenceExceeds(sendTime, now,
+ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+ return;
+
+ LagTrackerWrite(lsn, now);
+ sendTime = now;
+}
+
+/*
* Wait till WAL < loc is flushed to disk so it can be safely read.
*/
static XLogRecPtr
@@ -2730,9 +2758,9 @@ XLogSendLogical(void)
if (record != NULL)
{
/*
- * Note the lack of any call to LagTrackerWrite() which is the responsibility
- * of the logical decoding plugin. Response messages are handled normally,
- * so this responsibility does not extend to needing to call LagTrackerRead().
+ * Note the lack of any call to LagTrackerWrite() here: that is handled
+ * by WalSndUpdateProgress(), which the output plugin invokes through
+ * the logical decoding write API (OutputPluginUpdateProgress).
*/
LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
@@ -3328,9 +3356,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
* LagTrackerRead can compute the elapsed time (lag) when this WAL position is
* eventually reported to have been written, flushed and applied by the
* standby in a reply message.
- * Exported to allow logical decoding plugins to call this when they choose.
*/
-void
+static void
LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
{
bool buffer_full;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index d0b2e0bbaef..090f9c82680 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,6 +26,12 @@ typedef void (*LogicalOutputPluginWriterWrite) (
typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
+typedef void (*LogicalOutputPluginWriterUpdateProgress) (
+ struct LogicalDecodingContext *lr,
+ XLogRecPtr Ptr, /* OutputPluginUpdateProgress passes ctx->write_location */
+ TransactionId xid /* OutputPluginUpdateProgress passes ctx->write_xid */
+);
+
typedef struct LogicalDecodingContext
{
/* memory context this is all allocated in */
@@ -52,6 +58,7 @@ typedef struct LogicalDecodingContext
*/
LogicalOutputPluginWriterPrepareWrite prepare_write;
LogicalOutputPluginWriterWrite write;
+ LogicalOutputPluginWriterUpdateProgress update_progress;
/*
* Output buffer.
@@ -85,13 +92,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
bool need_full_snapshot,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
- LogicalOutputPluginWriterWrite do_write);
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress);
extern LogicalDecodingContext *CreateDecodingContext(
XLogRecPtr start_lsn,
List *output_plugin_options,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
- LogicalOutputPluginWriterWrite do_write);
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress);
extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
extern bool DecodingContextReady(LogicalDecodingContext *ctx);
extern void FreeDecodingContext(LogicalDecodingContext *ctx);
@@ -101,8 +110,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
-extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
-
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
#endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 08e962d0c0c..2435e2be2d2 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,5 +106,6 @@ typedef struct OutputPluginCallbacks
/* Functions in replication/logical/logical.c */
extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
#endif /* OUTPUT_PLUGIN_H */