aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/logical.c50
-rw-r--r--src/backend/replication/logical/reorderbuffer.c20
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c54
-rw-r--r--src/include/replication/reorderbuffer.h12
-rw-r--r--src/tools/pgindent/typedefs.list1
5 files changed, 89 insertions, 48 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1a58dd76497..c3ec97a0a62 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change);
+/* callback to update txn's progress */
+static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
+ ReorderBufferTXN *txn,
+ XLogRecPtr lsn);
+
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
/*
@@ -278,6 +283,12 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
+ /*
+ * Callback to support updating progress during sending data of a
+ * transaction (and its subtransactions) to the output plugin.
+ */
+ ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
+
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
@@ -1584,6 +1595,45 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "update_progress_txn";
+ state.report_location = lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = false;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * Report this change's lsn so replies from clients can give an up-to-date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = lsn;
+
+ ctx->end_xact = false;
+
+ OutputPluginUpdateProgress(ctx, false);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
/*
* Set the required catalog xmin horizon for historic snapshots in the current
* replication slot.
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 0468d12936f..d5f90a5f5d2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2100,6 +2100,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
PG_TRY();
{
ReorderBufferChange *change;
+ int changes_count = 0; /* used to accumulate the number of
+ * changes */
if (using_subtxn)
BeginInternalSubTransaction(streaming ? "stream" : "replay");
@@ -2440,6 +2442,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
elog(ERROR, "tuplecid value in changequeue");
break;
}
+
+ /*
+ * It is possible that the data is not sent to downstream for a
+ * long time either because the output plugin filtered it or there
+ * is a DDL that generates a lot of data that is not processed by
+ * the plugin. So, in such cases, the downstream can timeout. To
+ * avoid that we try to send a keepalive message if required.
+ * Trying to send a keepalive message after every change has some
+ * overhead, but testing showed there is no noticeable overhead if
+ * we do it after every ~100 changes.
+ */
+#define CHANGES_THRESHOLD 100
+
+ if (++changes_count >= CHANGES_THRESHOLD)
+ {
+ rb->update_progress_txn(rb, txn, change->lsn);
+ changes_count = 0;
+ }
}
/* speculative insertion record must be freed by now */
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e4938d8888f..73b080060da 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -92,8 +92,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
static void send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
-static void update_replication_progress(LogicalDecodingContext *ctx,
- bool skipped_xact);
/*
* Only 3 publication actions are used for row filtering ("insert", "update",
@@ -586,7 +584,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* from this transaction has been sent to the downstream.
*/
sent_begin_txn = txndata->sent_begin_txn;
- update_replication_progress(ctx, !sent_begin_txn);
+ OutputPluginUpdateProgress(ctx, !sent_begin_txn);
pfree(txndata);
txn->output_plugin_private = NULL;
@@ -625,7 +623,7 @@ static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
- update_replication_progress(ctx, false);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -639,7 +637,7 @@ static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
- update_replication_progress(ctx, false);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -655,7 +653,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time)
{
- update_replication_progress(ctx, false);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1401,8 +1399,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleTableSlot *old_slot = NULL;
TupleTableSlot *new_slot = NULL;
- update_replication_progress(ctx, false);
-
if (!is_publishable_relation(relation))
return;
@@ -1637,8 +1633,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Oid *relids;
TransactionId xid = InvalidTransactionId;
- update_replication_progress(ctx, false);
-
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
if (in_streaming)
xid = change->txn->xid;
@@ -1702,8 +1696,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
TransactionId xid = InvalidTransactionId;
- update_replication_progress(ctx, false);
-
if (!data->messages)
return;
@@ -1903,7 +1895,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));
- update_replication_progress(ctx, false);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1924,7 +1916,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
{
Assert(rbtxn_is_streamed(txn));
- update_replication_progress(ctx, false);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
@@ -2424,37 +2416,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
}
}
}
-
-/*
- * Try to update progress and send a keepalive message if too many changes were
- * processed.
- *
- * For a large transaction, if we don't send any change to the downstream for a
- * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
- * This can happen when all or most of the changes are either not published or
- * got filtered out.
- */
-static void
-update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
-{
- static int changes_count = 0;
-
- /*
- * We don't want to try sending a keepalive message after processing each
- * change as that can have overhead. Tests revealed that there is no
- * noticeable overhead in doing it after continuously processing 100 or so
- * changes.
- */
-#define CHANGES_THRESHOLD 100
-
- /*
- * If we are at the end of transaction LSN, update progress tracking.
- * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
- * try to send a keepalive message if required.
- */
- if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
- {
- OutputPluginUpdateProgress(ctx, skipped_xact);
- changes_count = 0;
- }
-}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index e5db041df18..215d1494e90 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -526,6 +526,12 @@ typedef void (*ReorderBufferStreamTruncateCB) (
Relation relations[],
ReorderBufferChange *change);
+/* update progress txn callback signature */
+typedef void (*ReorderBufferUpdateProgressTxnCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr lsn);
+
struct ReorderBuffer
{
/*
@@ -590,6 +596,12 @@ struct ReorderBuffer
ReorderBufferStreamTruncateCB stream_truncate;
/*
+ * Callback to be called when updating progress during sending data of a
+ * transaction (and its subtransactions) to the output plugin.
+ */
+ ReorderBufferUpdateProgressTxnCB update_progress_txn;
+
+ /*
* Pointer that will be passed untouched to the callbacks.
*/
void *private_data;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 07fbb7ccf6e..d3224dfc36e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2311,6 +2311,7 @@ ReorderBufferToastEnt
ReorderBufferTupleBuf
ReorderBufferTupleCidEnt
ReorderBufferTupleCidKey
+ReorderBufferUpdateProgressTxnCB
ReorderTuple
RepOriginId
ReparameterizeForeignPathByChild_function