aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/test_decoding.c176
-rw-r--r--doc/src/sgml/logicaldecoding.sgml218
-rw-r--r--src/backend/replication/logical/logical.c351
-rw-r--r--src/include/replication/logical.h5
-rw-r--r--src/include/replication/output_plugin.h69
-rw-r--r--src/include/replication/reorderbuffer.h59
6 files changed, 878 insertions, 0 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 93c948856e7..dbef52a3af4 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -62,6 +62,28 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_stream_start(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void pg_decode_stream_change(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+static void pg_decode_stream_message(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+ bool transactional, const char *prefix,
+ Size sz, const char *message);
+static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations, Relation relations[],
+ ReorderBufferChange *change);
void
_PG_init(void)
@@ -83,6 +105,13 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
+ cb->stream_start_cb = pg_decode_stream_start;
+ cb->stream_stop_cb = pg_decode_stream_stop;
+ cb->stream_abort_cb = pg_decode_stream_abort;
+ cb->stream_commit_cb = pg_decode_stream_commit;
+ cb->stream_change_cb = pg_decode_stream_change;
+ cb->stream_message_cb = pg_decode_stream_message;
+ cb->stream_truncate_cb = pg_decode_stream_truncate;
}
@@ -540,3 +569,150 @@ pg_decode_message(LogicalDecodingContext *ctx,
appendBinaryStringInfo(ctx->out, message, sz);
OutputPluginWrite(ctx, true);
}
+
+/*
+ * Stream-start callback: emit the "opening a streamed block" marker. Empty
+ * xacts are never streamed, so skip_empty_xacts needs no special handling.
+ */
+static void
+pg_decode_stream_start(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
+ else
+ appendStringInfo(ctx->out, "opening a streamed block for transaction");
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Stream-stop callback: emit the "closing a streamed block" marker. Empty
+ * xacts are never streamed, so skip_empty_xacts needs no special handling.
+ */
+static void
+pg_decode_stream_stop(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
+ else
+ appendStringInfo(ctx->out, "closing a streamed block for transaction");
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Stream-abort callback: report an aborted streamed (sub)transaction. Empty
+ * xacts are never streamed, so skip_empty_xacts needs no special handling.
+ */
+static void
+pg_decode_stream_abort(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
+ else
+ appendStringInfo(ctx->out, "aborting streamed (sub)transaction");
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Stream-commit callback: report the commit (optionally with its timestamp).
+ * Empty xacts are never streamed, so skip_empty_xacts needs no handling here.
+ */
+static void
+pg_decode_stream_commit(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
+ else
+ appendStringInfo(ctx->out, "committing streamed transaction");
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Stream-change callback: emit only a "streaming change" marker. The change
+ * itself is not displayed, as the transaction can still abort later and users
+ * should not see its changes until it is committed.
+ */
+static void
+pg_decode_stream_change(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
+ else
+ appendStringInfo(ctx->out, "streaming change for transaction");
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Stream-message callback. The contents of transactional messages are not
+ * displayed, as the transaction can still abort later; only non-transactional
+ * messages have their content appended to the output.
+ */
+static void
+pg_decode_stream_message(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
+ const char *prefix, Size sz, const char *message)
+{
+ OutputPluginPrepareWrite(ctx, true);
+
+ if (transactional)
+ {
+ appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
+ transactional, prefix, sz);
+ }
+ else
+ {
+ appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
+ transactional, prefix, sz);
+ appendBinaryStringInfo(ctx->out, message, sz);
+ }
+
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Stream-truncate callback: emit only a "streaming truncate" marker, without
+ * the truncated relations, for the reasons given at pg_decode_stream_change.
+ */
+static void
+pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[],
+ ReorderBufferChange *change)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
+ else
+ appendStringInfo(ctx->out, "streaming truncate for transaction");
+ OutputPluginWrite(ctx, true);
+}
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index c89f93cf6bb..791a62b57c9 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -389,6 +389,13 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
+ LogicalDecodeStreamStartCB stream_start_cb;
+ LogicalDecodeStreamStopCB stream_stop_cb;
+ LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamCommitCB stream_commit_cb;
+ LogicalDecodeStreamChangeCB stream_change_cb;
+ LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
@@ -401,6 +408,15 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
If <function>truncate_cb</function> is not set but a
<command>TRUNCATE</command> is to be decoded, the action will be ignored.
</para>
+
+ <para>
+ An output plugin may also define functions to support streaming of large,
+ in-progress transactions. The <function>stream_start_cb</function>,
+ <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
+ <function>stream_commit_cb</function> and <function>stream_change_cb</function>
+ are required, while <function>stream_message_cb</function> and
+ <function>stream_truncate_cb</function> are optional.
+ </para>
</sect2>
<sect2 id="logicaldecoding-capabilities">
@@ -679,6 +695,117 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
</para>
</sect3>
+ <sect3 id="logicaldecoding-output-plugin-stream-start">
+ <title>Stream Start Callback</title>
+ <para>
+ The <function>stream_start_cb</function> callback is called when opening
+ a block of streamed changes from an in-progress transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-stop">
+ <title>Stream Stop Callback</title>
+ <para>
+ The <function>stream_stop_cb</function> callback is called when closing
+ a block of streamed changes from an in-progress transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-abort">
+ <title>Stream Abort Callback</title>
+ <para>
+ The <function>stream_abort_cb</function> callback is called to abort
+ a previously streamed transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-commit">
+ <title>Stream Commit Callback</title>
+ <para>
+ The <function>stream_commit_cb</function> callback is called to commit
+ a previously streamed transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-change">
+ <title>Stream Change Callback</title>
+ <para>
+ The <function>stream_change_cb</function> callback is called when sending
+ a change in a block of streamed changes (demarcated by
+ <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
+ The actual changes are not displayed as the transaction can abort at a later
+ point in time and we don't decode changes for aborted transactions.
+<programlisting>
+typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-message">
+ <title>Stream Message Callback</title>
+ <para>
+ The <function>stream_message_cb</function> callback is called when sending
+ a generic message in a block of streamed changes (demarcated by
+ <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
+ The message contents for transactional messages are not displayed as the transaction
+ can abort at a later point in time and we don't decode changes for aborted
+ transactions.
+<programlisting>
+typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-truncate">
+ <title>Stream Truncate Callback</title>
+ <para>
+ The <function>stream_truncate_cb</function> callback is called for a
+ <command>TRUNCATE</command> command in a block of streamed changes
+ (demarcated by <function>stream_start_cb</function> and
+ <function>stream_stop_cb</function> calls).
+<programlisting>
+typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+</programlisting>
+ The parameters are analogous to the <function>stream_change_cb</function>
+ callback. However, because <command>TRUNCATE</command> actions on
+ tables connected by foreign keys need to be executed together, this
+ callback receives an array of relations instead of just a single one.
+ See the description of the <xref linkend="sql-truncate"/> statement for
+ details.
+ </para>
+ </sect3>
+
</sect2>
<sect2 id="logicaldecoding-output-plugin-output">
@@ -747,4 +874,95 @@ OutputPluginWrite(ctx, true);
</para>
</note>
</sect1>
+
+ <sect1 id="logicaldecoding-streaming">
+ <title>Streaming of Large Transactions for Logical Decoding</title>
+
+ <para>
+ The basic output plugin callbacks (e.g. <function>begin_cb</function>,
+ <function>change_cb</function>, <function>commit_cb</function> and
+ <function>message_cb</function>) are only invoked when the transaction
+ actually commits. The changes are still decoded from the transaction
+ log, but are only passed to the output plugin at commit (and discarded
+ if the transaction aborts).
+ </para>
+
+ <para>
+ This means that while the decoding happens incrementally, and may spill
+ to disk to keep memory usage under control, all the decoded changes have
+ to be transmitted when the transaction finally commits (or more precisely,
+ when the commit is decoded from the transaction log). Depending on the
+ size of the transaction and network bandwidth, the transfer time may
+ significantly increase the apply lag.
+ </para>
+
+ <para>
+ To reduce the apply lag caused by large transactions, an output plugin
+ may provide additional callbacks to support incremental streaming of
+ in-progress transactions. There are multiple required streaming callbacks
+ (<function>stream_start_cb</function>, <function>stream_stop_cb</function>,
+ <function>stream_abort_cb</function>, <function>stream_commit_cb</function>
+ and <function>stream_change_cb</function>) and two optional callbacks
+ (<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
+ </para>
+
+ <para>
+ When streaming an in-progress transaction, the changes (and messages) are
+ streamed in blocks demarcated by <function>stream_start_cb</function>
+ and <function>stream_stop_cb</function> callbacks. Once all the decoded
+ changes are transmitted, the transaction is committed using the
+ <function>stream_commit_cb</function> callback (or possibly aborted using
+ the <function>stream_abort_cb</function> callback).
+ </para>
+
+ <para>
+ One example sequence of streaming callback calls for one transaction may
+ look like this:
+<programlisting>
+stream_start_cb(...); &lt;-- start of first block of changes
+ stream_change_cb(...);
+ stream_change_cb(...);
+ stream_message_cb(...);
+ stream_change_cb(...);
+ ...
+ stream_change_cb(...);
+stream_stop_cb(...); &lt;-- end of first block of changes
+
+stream_start_cb(...); &lt;-- start of second block of changes
+ stream_change_cb(...);
+ stream_change_cb(...);
+ stream_change_cb(...);
+ ...
+ stream_message_cb(...);
+ stream_change_cb(...);
+stream_stop_cb(...); &lt;-- end of second block of changes
+
+stream_commit_cb(...); &lt;-- commit of the streamed transaction
+</programlisting>
+ </para>
+
+ <para>
+ The actual sequence of callback calls may be more complicated, of course.
+ There may be blocks for multiple streamed transactions, some of the
+ transactions may get aborted, etc.
+ </para>
+
+ <para>
+ Similar to spill-to-disk behavior, streaming is triggered when the total
+ amount of changes decoded from the WAL (for all in-progress transactions)
+ exceeds the limit defined by the <varname>logical_decoding_work_mem</varname> setting.
+ At that point the largest top-level transaction (measured by the amount of memory
+ currently used for decoded changes) is selected and streamed. However, in
+ some cases we still have to spill to disk even if streaming is enabled,
+ because we may cross the memory limit before having decoded a complete
+ tuple, e.g., when only the toast table insert has been decoded but not the
+ main table insert.
+ </para>
+
+ <para>
+ Even when streaming large transactions, the changes are still applied in
+ commit order, preserving the same guarantees as the non-streaming mode.
+ </para>
+
+ </sect1>
</chapter>
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 61902be3b0e..05d24b93da0 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -65,6 +65,23 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+/* streaming callbacks */
+static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr first_lsn);
+static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr last_lsn);
+static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ Relation relation, ReorderBufferChange *change);
+static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size message_size, const char *message);
+static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[], ReorderBufferChange *change);
+
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
/*
@@ -189,6 +206,39 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
+ /*
+ * To support streaming, we require start/stop/abort/commit/change
+ * callbacks. The message and truncate callbacks are optional, similar to
+ * regular output plugins. We however enable streaming when at least one
+ * of the methods is enabled so that we can easily identify missing
+ * methods.
+ *
+ * We decide it here, but only check it later in the wrappers.
+ */
+ ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
+ (ctx->callbacks.stream_stop_cb != NULL) ||
+ (ctx->callbacks.stream_abort_cb != NULL) ||
+ (ctx->callbacks.stream_commit_cb != NULL) ||
+ (ctx->callbacks.stream_change_cb != NULL) ||
+ (ctx->callbacks.stream_message_cb != NULL) ||
+ (ctx->callbacks.stream_truncate_cb != NULL);
+
+ /*
+ * streaming callbacks
+ *
+ * stream_message and stream_truncate callbacks are optional, so we do not
+ * fail with ERROR when missing, but the wrappers simply do nothing. We
+ * must set the ReorderBuffer callbacks to something, otherwise the calls
+ * from there will crash (we don't want to move the checks there).
+ */
+ ctx->reorder->stream_start = stream_start_cb_wrapper;
+ ctx->reorder->stream_stop = stream_stop_cb_wrapper;
+ ctx->reorder->stream_abort = stream_abort_cb_wrapper;
+ ctx->reorder->stream_commit = stream_commit_cb_wrapper;
+ ctx->reorder->stream_change = stream_change_cb_wrapper;
+ ctx->reorder->stream_message = stream_message_cb_wrapper;
+ ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
+
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
@@ -866,6 +916,307 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr first_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback name and first_lsn on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_start";
+ state.report_location = first_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state: plugin writes are attributed to this xid */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * Report first_lsn as the write location so replies from clients can give
+ * an up-to-date answer. This won't ever be enough (and shouldn't be!) to
+ * confirm receipt of this transaction, but it might allow another
+ * transaction's commit to be confirmed with one message.
+ */
+ ctx->write_location = first_lsn;
+
+ /* stream_start_cb is mandatory for streaming; raise an ERROR if missing */
+ if (ctx->callbacks.stream_start_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_start_cb callback")));
+
+ ctx->callbacks.stream_start_cb(ctx, txn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr last_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback name and last_lsn on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_stop";
+ state.report_location = last_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state: plugin writes are attributed to this xid */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * Report last_lsn as the write location so replies from clients can give
+ * an up-to-date answer. This won't ever be enough (and shouldn't be!) to
+ * confirm receipt of this transaction, but it might allow another
+ * transaction's commit to be confirmed with one message.
+ */
+ ctx->write_location = last_lsn;
+
+ /* stream_stop_cb is mandatory for streaming; raise an ERROR if missing */
+ if (ctx->callbacks.stream_stop_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_stop_cb callback")));
+
+ ctx->callbacks.stream_stop_cb(ctx, txn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback name and abort_lsn on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_abort";
+ state.report_location = abort_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state; abort_lsn is also used as the write location */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = abort_lsn;
+
+ /* stream_abort_cb is mandatory for streaming; raise an ERROR if missing */
+ if (ctx->callbacks.stream_abort_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_abort_cb callback")));
+
+ ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback name and the txn's final_lsn on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_commit";
+ state.report_location = txn->final_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state; the write location is the transaction's end_lsn */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn;
+
+ /* in streaming mode, stream_commit_cb is required */
+ if (ctx->callbacks.stream_commit_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_commit_cb callback")));
+
+ ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ Relation relation, ReorderBufferChange *change)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback name and the change's lsn on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_change";
+ state.report_location = change->lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state: plugin writes are attributed to this xid */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * Report this change's lsn as the write location so replies from clients
+ * can give an up-to-date answer. This won't ever be enough (and shouldn't
+ * be!) to confirm receipt of this transaction, but it might allow another
+ * transaction's commit to be confirmed with one message.
+ */
+ ctx->write_location = change->lsn;
+
+ /* stream_change_cb is mandatory for streaming; raise an ERROR if missing */
+ if (ctx->callbacks.stream_change_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_change_cb callback")));
+
+ ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size message_size, const char *message)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* this callback is optional; do nothing if the plugin did not set it */
+ if (ctx->callbacks.stream_message_cb == NULL)
+ return;
+
+ /* Push callback name and message_lsn on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_message";
+ state.report_location = message_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state; a NULL txn yields InvalidTransactionId as the xid */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = message_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
+ message_size, message);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[],
+ ReorderBufferChange *change)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* this callback is optional; do nothing if the plugin did not set it */
+ if (!ctx->callbacks.stream_truncate_cb)
+ return;
+
+ /* Push callback name and the change's lsn on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_truncate";
+ state.report_location = change->lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state: plugin writes are attributed to this xid */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * Report this change's lsn as the write location so replies from clients
+ * can give an up-to-date answer. This won't ever be enough (and shouldn't
+ * be!) to confirm receipt of this transaction, but it might allow another
+ * transaction's commit to be confirmed with one message.
+ */
+ ctx->write_location = change->lsn;
+
+ ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
/*
* Set the required catalog xmin horizon for historic snapshots in the current
* replication slot.
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2f2475e5d3..deef31825d6 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -80,6 +80,11 @@ typedef struct LogicalDecodingContext
void *output_writer_private;
/*
+ * Does the output plugin support streaming, and is it enabled?
+ */
+ bool streaming;
+
+ /*
* State for writing output.
*/
bool accept_writes;
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 3dd9236c576..b78c796450a 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -100,6 +100,67 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
/*
+ * Called when starting to stream a block of changes from an in-progress
+ * transaction (may be called repeatedly, if it's streamed in multiple
+ * chunks).
+ */
+typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called when stopping the streaming of a block of changes from an in-progress
+ * transaction to a remote node (may be called repeatedly, if it's streamed
+ * in multiple chunks).
+ */
+typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called to discard changes streamed to a remote node from an in-progress
+ * transaction.
+ */
+typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+/*
+ * Called to apply changes streamed to a remote node from an in-progress
+ * transaction.
+ */
+typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Callback for streaming individual changes from in-progress transactions.
+ */
+typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/*
+ * Callback for streaming generic logical decoding messages from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+
+/*
+ * Callback for streaming truncates from in-progress transactions.
+ */
+typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
+/*
* Output plugin callbacks
*/
typedef struct OutputPluginCallbacks
@@ -112,6 +173,14 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
+ /* streaming of changes */
+ LogicalDecodeStreamStartCB stream_start_cb;
+ LogicalDecodeStreamStopCB stream_stop_cb;
+ LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamCommitCB stream_commit_cb;
+ LogicalDecodeStreamChangeCB stream_change_cb;
+ LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
/* Functions in replication/logical/logical.c */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1055e99e2e1..42bc8176487 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -348,6 +348,54 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
const char *prefix, Size sz,
const char *message);
+/* start streaming transaction callback signature */
+typedef void (*ReorderBufferStreamStartCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr first_lsn);
+
+/* stop streaming transaction callback signature */
+typedef void (*ReorderBufferStreamStopCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr last_lsn);
+
+/* discard streamed transaction callback signature */
+typedef void (*ReorderBufferStreamAbortCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+/* commit streamed transaction callback signature */
+typedef void (*ReorderBufferStreamCommitCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* stream change callback signature */
+typedef void (*ReorderBufferStreamChangeCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/* stream message callback signature */
+typedef void (*ReorderBufferStreamMessageCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix, Size sz,
+ const char *message);
+
+/* stream truncate callback signature */
+typedef void (*ReorderBufferStreamTruncateCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
struct ReorderBuffer
{
/*
@@ -387,6 +435,17 @@ struct ReorderBuffer
ReorderBufferMessageCB message;
/*
+ * Callbacks to be called when streaming a transaction.
+ */
+ ReorderBufferStreamStartCB stream_start;
+ ReorderBufferStreamStopCB stream_stop;
+ ReorderBufferStreamAbortCB stream_abort;
+ ReorderBufferStreamCommitCB stream_commit;
+ ReorderBufferStreamChangeCB stream_change;
+ ReorderBufferStreamMessageCB stream_message;
+ ReorderBufferStreamTruncateCB stream_truncate;
+
+ /*
* Pointer that will be passed untouched to the callbacks.
*/
void *private_data;