aboutsummaryrefslogtreecommitdiff
path: root/doc/src
diff options
context:
space:
mode:
Diffstat (limited to 'doc/src')
-rw-r--r--doc/src/sgml/logicaldecoding.sgml218
1 files changed, 218 insertions, 0 deletions
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index c89f93cf6bb..791a62b57c9 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -389,6 +389,13 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
+ LogicalDecodeStreamStartCB stream_start_cb;
+ LogicalDecodeStreamStopCB stream_stop_cb;
+ LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamCommitCB stream_commit_cb;
+ LogicalDecodeStreamChangeCB stream_change_cb;
+ LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
@@ -401,6 +408,15 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
If <function>truncate_cb</function> is not set but a
<command>TRUNCATE</command> is to be decoded, the action will be ignored.
</para>
+
+ <para>
+ An output plugin may also define functions to support streaming of large,
+ in-progress transactions. The <function>stream_start_cb</function>,
+ <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
+ <function>stream_commit_cb</function> and <function>stream_change_cb</function>
+ are required, while <function>stream_message_cb</function> and
+ <function>stream_truncate_cb</function> are optional.
+ </para>
</sect2>
<sect2 id="logicaldecoding-capabilities">
@@ -679,6 +695,117 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
</para>
</sect3>
+ <sect3 id="logicaldecoding-output-plugin-stream-start">
+ <title>Stream Start Callback</title>
+ <para>
+ The <function>stream_start_cb</function> callback is called when opening
+ a block of streamed changes from an in-progress transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-stop">
+ <title>Stream Stop Callback</title>
+ <para>
+ The <function>stream_stop_cb</function> callback is called when closing
+ a block of streamed changes from an in-progress transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-abort">
+ <title>Stream Abort Callback</title>
+ <para>
+ The <function>stream_abort_cb</function> callback is called to abort
+ a previously streamed transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-commit">
+ <title>Stream Commit Callback</title>
+ <para>
+ The <function>stream_commit_cb</function> callback is called to commit
+ a previously streamed transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-change">
+ <title>Stream Change Callback</title>
+ <para>
+ The <function>stream_change_cb</function> callback is called when sending
+ a change in a block of streamed changes (demarcated by
+ <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
+ The actual changes are not displayed as the transaction can abort at a later
+ point in time and we don't decode changes for aborted transactions.
+<programlisting>
+typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-message">
+ <title>Stream Message Callback</title>
+ <para>
+ The <function>stream_message_cb</function> callback is called when sending
+ a generic message in a block of streamed changes (demarcated by
+ <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
+ The message contents for transactional messages are not displayed as the transaction
+ can abort at a later point in time and we don't decode changes for aborted
+ transactions.
+<programlisting>
+typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-truncate">
+ <title>Stream Truncate Callback</title>
+ <para>
+ The <function>stream_truncate_cb</function> callback is called for a
+ <command>TRUNCATE</command> command in a block of streamed changes
+ (demarcated by <function>stream_start_cb</function> and
+ <function>stream_stop_cb</function> calls).
+<programlisting>
+typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+</programlisting>
+ The parameters are analogous to the <function>stream_change_cb</function>
+ callback. However, because <command>TRUNCATE</command> actions on
+ tables connected by foreign keys need to be executed together, this
+ callback receives an array of relations instead of just a single one.
+ See the description of the <xref linkend="sql-truncate"/> statement for
+ details.
+ </para>
+ </sect3>
+
</sect2>
<sect2 id="logicaldecoding-output-plugin-output">
@@ -747,4 +874,95 @@ OutputPluginWrite(ctx, true);
</para>
</note>
</sect1>
+
+ <sect1 id="logicaldecoding-streaming">
+ <title>Streaming of Large Transactions for Logical Decoding</title>
+
+ <para>
+ The basic output plugin callbacks (e.g. <function>begin_cb</function>,
+ <function>change_cb</function>, <function>commit_cb</function> and
+ <function>message_cb</function>) are only invoked when the transaction
+ actually commits. The changes are still decoded from the transaction
+ log, but are only passed to the output plugin at commit (and discarded
+ if the transaction aborts).
+ </para>
+
+ <para>
+ This means that while the decoding happens incrementally, and may spill
+ to disk to keep memory usage under control, all the decoded changes have
+ to be transmitted when the transaction finally commits (or more precisely,
+ when the commit is decoded from the transaction log). Depending on the
+ size of the transaction and network bandwidth, the transfer time may
+ significantly increase the apply lag.
+ </para>
+
+ <para>
+ To reduce the apply lag caused by large transactions, an output plugin
+ may provide additional callbacks to support incremental streaming of
+ in-progress transactions. There are multiple required streaming callbacks
+ (<function>stream_start_cb</function>, <function>stream_stop_cb</function>,
+ <function>stream_abort_cb</function>, <function>stream_commit_cb</function>
+ and <function>stream_change_cb</function>) and two optional callbacks
+ (<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
+ </para>
+
+ <para>
+ When streaming an in-progress transaction, the changes (and messages) are
+ streamed in blocks demarcated by <function>stream_start_cb</function>
+ and <function>stream_stop_cb</function> callbacks. Once all the decoded
+ changes are transmitted, the transaction is committed using the
+ <function>stream_commit_cb</function> callback (or possibly aborted using
+ the <function>stream_abort_cb</function> callback).
+ </para>
+
+ <para>
+ One example sequence of streaming callback calls for one transaction may
+ look like this:
+<programlisting>
+stream_start_cb(...); &lt;-- start of first block of changes
+ stream_change_cb(...);
+ stream_change_cb(...);
+ stream_message_cb(...);
+ stream_change_cb(...);
+ ...
+ stream_change_cb(...);
+stream_stop_cb(...); &lt;-- end of first block of changes
+
+stream_start_cb(...); &lt;-- start of second block of changes
+ stream_change_cb(...);
+ stream_change_cb(...);
+ stream_change_cb(...);
+ ...
+ stream_message_cb(...);
+ stream_change_cb(...);
+stream_stop_cb(...); &lt;-- end of second block of changes
+
+stream_commit_cb(...); &lt;-- commit of the streamed transaction
+</programlisting>
+ </para>
+
+ <para>
+ The actual sequence of callback calls may be more complicated, of course.
+ There may be blocks for multiple streamed transactions, some of the
+ transactions may get aborted, etc.
+ </para>
+
+ <para>
+ Similar to spill-to-disk behavior, streaming is triggered when the total
+ amount of changes decoded from the WAL (for all in-progress transactions)
+ exceeds the limit defined by the <varname>logical_decoding_work_mem</varname> setting.
+ At that point the largest toplevel transaction (measured by amount of memory
+ currently used for decoded changes) is selected and streamed. However, in
+ some cases we still have to spill to disk even if streaming is enabled,
+ because the memory limit may be crossed before a complete tuple has been
+ decoded, e.g., only the toast table insert but not the main table insert
+ has been decoded.
+ </para>
+
+ <para>
+ Even when streaming large transactions, the changes are still applied in
+ commit order, preserving the same guarantees as the non-streaming mode.
+ </para>
+
+ </sect1>
</chapter>