diff options
Diffstat (limited to 'doc/src')
-rw-r--r-- | doc/src/sgml/logicaldecoding.sgml | 218 |
1 files changed, 218 insertions, 0 deletions
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index c89f93cf6bb..791a62b57c9 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -389,6 +389,13 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; + LogicalDecodeStreamStartCB stream_start_cb; + LogicalDecodeStreamStopCB stream_stop_cb; + LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamCommitCB stream_commit_cb; + LogicalDecodeStreamChangeCB stream_change_cb; + LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); @@ -401,6 +408,15 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); If <function>truncate_cb</function> is not set but a <command>TRUNCATE</command> is to be decoded, the action will be ignored. </para> + + <para> + An output plugin may also define functions to support streaming of large, + in-progress transactions. The <function>stream_start_cb</function>, + <function>stream_stop_cb</function>, <function>stream_abort_cb</function>, + <function>stream_commit_cb</function> and <function>stream_change_cb</function> + are required, while <function>stream_message_cb</function> and + <function>stream_truncate_cb</function> are optional. + </para> </sect2> <sect2 id="logicaldecoding-capabilities"> @@ -679,6 +695,117 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, </para> </sect3> + <sect3 id="logicaldecoding-output-plugin-stream-start"> + <title>Stream Start Callback</title> + <para> + The <function>stream_start_cb</function> callback is called when opening + a block of streamed changes from an in-progress transaction. +<programlisting> +typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +</programlisting> + </para> + </sect3> + + <sect3 id="logicaldecoding-output-plugin-stream-stop"> + <title>Stream Stop Callback</title> + <para> + The <function>stream_stop_cb</function> callback is called when closing + a block of streamed changes from an in-progress transaction. +<programlisting> +typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +</programlisting> + </para> + </sect3> + + <sect3 id="logicaldecoding-output-plugin-stream-abort"> + <title>Stream Abort Callback</title> + <para> + The <function>stream_abort_cb</function> callback is called to abort + a previously streamed transaction. +<programlisting> +typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +</programlisting> + </para> + </sect3> + + <sect3 id="logicaldecoding-output-plugin-stream-commit"> + <title>Stream Commit Callback</title> + <para> + The <function>stream_commit_cb</function> callback is called to commit + a previously streamed transaction. +<programlisting> +typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +</programlisting> + </para> + </sect3> + + <sect3 id="logicaldecoding-output-plugin-stream-change"> + <title>Stream Change Callback</title> + <para> + The <function>stream_change_cb</function> callback is called when sending + a change in a block of streamed changes (demarcated by + <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls). + The actual changes are not displayed as the transaction can abort at a later + point in time and we don't decode changes for aborted transactions. +<programlisting> +typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); +</programlisting> + </para> + </sect3> + + <sect3 id="logicaldecoding-output-plugin-stream-message"> + <title>Stream Message Callback</title> + <para> + The <function>stream_message_cb</function> callback is called when sending + a generic message in a block of streamed changes (demarcated by + <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls). + The message contents for transactional messages are not displayed as the transaction + can abort at a later point in time and we don't decode changes for aborted + transactions. +<programlisting> +typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message); +</programlisting> + </para> + </sect3> + + <sect3 id="logicaldecoding-output-plugin-stream-truncate"> + <title>Stream Truncate Callback</title> + <para> + The <function>stream_truncate_cb</function> callback is called for a + <command>TRUNCATE</command> command in a block of streamed changes + (demarcated by <function>stream_start_cb</function> and + <function>stream_stop_cb</function> calls). +<programlisting> +typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); +</programlisting> + The parameters are analogous to the <function>stream_change_cb</function> + callback. However, because <command>TRUNCATE</command> actions on + tables connected by foreign keys need to be executed together, this + callback receives an array of relations instead of just a single one. + See the description of the <xref linkend="sql-truncate"/> statement for + details. + </para> + </sect3> + </sect2> <sect2 id="logicaldecoding-output-plugin-output"> @@ -747,4 +874,95 @@ OutputPluginWrite(ctx, true); </para> </note> </sect1> + + <sect1 id="logicaldecoding-streaming"> + <title>Streaming of Large Transactions for Logical Decoding</title> + + <para> + The basic output plugin callbacks (e.g. <function>begin_cb</function>, + <function>change_cb</function>, <function>commit_cb</function> and + <function>message_cb</function>) are only invoked when the transaction + actually commits. The changes are still decoded from the transaction + log, but are only passed to the output plugin at commit (and discarded + if the transaction aborts). + </para> + + <para> + This means that while the decoding happens incrementally, and may spill + to disk to keep memory usage under control, all the decoded changes have + to be transmitted when the transaction finally commits (or more precisely, + when the commit is decoded from the transaction log). Depending on the + size of the transaction and network bandwidth, the transfer time may + significantly increase the apply lag. + </para> + + <para> + To reduce the apply lag caused by large transactions, an output plugin + may provide additional callback to support incremental streaming of + in-progress transactions. There are multiple required streaming callbacks + (<function>stream_start_cb</function>, <function>stream_stop_cb</function>, + <function>stream_abort_cb</function>, <function>stream_commit_cb</function> + and <function>stream_change_cb</function>) and two optional callbacks + (<function>stream_message_cb</function>) and (<function>stream_truncate_cb</function>). + </para> + + <para> + When streaming an in-progress transaction, the changes (and messages) are + streamed in blocks demarcated by <function>stream_start_cb</function> + and <function>stream_stop_cb</function> callbacks. Once all the decoded + changes are transmitted, the transaction is committed using the + <function>stream_commit_cb</function> callback (or possibly aborted using + the <function>stream_abort_cb</function> callback). + </para> + + <para> + One example sequence of streaming callback calls for one transaction may + look like this: +<programlisting> +stream_start_cb(...); <-- start of first block of changes + stream_change_cb(...); + stream_change_cb(...); + stream_message_cb(...); + stream_change_cb(...); + ... + stream_change_cb(...); +stream_stop_cb(...); <-- end of first block of changes + +stream_start_cb(...); <-- start of second block of changes + stream_change_cb(...); + stream_change_cb(...); + stream_change_cb(...); + ... + stream_message_cb(...); + stream_change_cb(...); +stream_stop_cb(...); <-- end of second block of changes + +stream_commit_cb(...); <-- commit of the streamed transaction +</programlisting> + </para> + + <para> + The actual sequence of callback calls may be more complicated, of course. + There may be blocks for multiple streamed transactions, some of the + transactions may get aborted, etc. + </para> + + <para> + Similar to spill-to-disk behavior, streaming is triggered when the total + amount of changes decoded from the WAL (for all in-progress transactions) + exceeds limit defined by <varname>logical_decoding_work_mem</varname> setting. + At that point the largest toplevel transaction (measured by amount of memory + currently used for decoded changes) is selected and streamed. However, in + some cases we still have to spill to the disk even if streaming is enabled + because if we cross the memory limit but we still have not decoded the + complete tuple e.g. only decoded toast table insert but not the main table + insert. + </para> + + <para> + Even when streaming large transactions, the changes are still applied in + commit order, preserving the same guarantees as the non-streaming mode. + </para> + + </sect1> </chapter> |