diff options
author | Amit Kapila <akapila@postgresql.org> | 2020-12-30 16:17:26 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2020-12-30 16:17:26 +0530 |
commit | 0aa8a01d04c8fe200b7a106878eebc3d0af9105c (patch) | |
tree | 79fe885496f4d3493ae327156c0baf1aa0e1e43a /doc/src | |
parent | fa744697c79189a661f802d9a979d959b4454df0 (diff) | |
download | postgresql-0aa8a01d04c8fe200b7a106878eebc3d0af9105c.tar.gz postgresql-0aa8a01d04c8fe200b7a106878eebc3d0af9105c.zip |
Extend the output plugin API to allow decoding of prepared xacts.
This adds six methods to the output plugin API, adding support for
streaming changes of two-phase transactions at prepare time.
* begin_prepare
* filter_prepare
* prepare
* commit_prepared
* rollback_prepared
* stream_prepare
Most of this is a simple extension of the existing methods, with the
semantic difference that the transaction is not yet committed and maybe
aborted later.
Until now two-phase transactions were translated into regular transactions
on the subscriber, and the GID was not forwarded to it. None of the
two-phase commands were communicated to the subscriber.
This patch provides the infrastructure for logical decoding plugins to be
informed of two-phase commands Like PREPARE TRANSACTION, COMMIT PREPARED
and ROLLBACK PREPARED commands with the corresponding GID.
This also extends the 'test_decoding' plugin, implementing these new
methods.
This commit simply adds these new APIs and the upcoming patch to "allow
the decoding at prepare time in ReorderBuffer" will use these APIs.
Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, and Dilip Kumar
Discussion:
https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
Diffstat (limited to 'doc/src')
-rw-r--r-- | doc/src/sgml/logicaldecoding.sgml | 172 |
1 files changed, 165 insertions, 7 deletions
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index ca78a81e9c5..d63f90ff282 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -389,9 +389,15 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodeBeginPrepareCB begin_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeRollbackPreparedCB rollback_prepared_cb; LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; @@ -413,10 +419,20 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); An output plugin may also define functions to support streaming of large, in-progress transactions. The <function>stream_start_cb</function>, <function>stream_stop_cb</function>, <function>stream_abort_cb</function>, - <function>stream_commit_cb</function> and <function>stream_change_cb</function> + <function>stream_commit_cb</function>, <function>stream_change_cb</function>, + and <function>stream_prepare_cb</function> are required, while <function>stream_message_cb</function> and <function>stream_truncate_cb</function> are optional. </para> + + <para> + An output plugin may also define functions to support two-phase commits, + which allows actions to be decoded on the <command>PREPARE TRANSACTION</command>. + The <function>begin_prepare_cb</function>, <function>prepare_cb</function>, + <function>stream_prepare_cb</function>, + <function>commit_prepared_cb</function> and <function>rollback_prepared_cb</function> + callbacks are required, while <function>filter_prepare_cb</function> is optional. + </para> </sect2> <sect2 id="logicaldecoding-capabilities"> @@ -477,7 +493,15 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); never get decoded. Successful savepoints are folded into the transaction containing them in the order they were - executed within that transaction. + executed within that transaction. A transaction that is prepared for + a two-phase commit using <command>PREPARE TRANSACTION</command> will + also be decoded if the output plugin callbacks needed for decoding + them are provided. It is possible that the current transaction which + is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command> + command. In that case, the logical decoding of this transaction will + be aborted too. We will skip all the changes of such a transaction once + the abort is detected and abort the transaction when we read WAL for + <command>ROLLBACK PREPARED</command>. </para> <note> @@ -587,7 +611,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, an <command>INSERT</command>, <command>UPDATE</command>, or <command>DELETE</command>. Even if the original command modified several rows at once the callback will be called individually for each - row. + row. The <function>change_cb</function> callback may access system or + user catalog tables to aid in the process of outputting the row + modification details. In case of decoding a prepared (but yet + uncommitted) transaction or decoding of an uncommitted transaction, this + change callback might also error out due to simultaneous rollback of + this very same transaction. In that case, the logical decoding of this + aborted transaction is stopped gracefully. <programlisting> typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -685,7 +715,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, non-transactional and the XID was not assigned yet in the transaction which logged the message. The <parameter>lsn</parameter> has WAL location of the message. The <parameter>transactional</parameter> says - if the message was sent as transactional or not. + if the message was sent as transactional or not. Similar to the change + callback, in case of decoding a prepared (but yet uncommitted) + transaction or decoding of an uncommitted transaction, this message + callback might also error out due to simultaneous rollback of + this very same transaction. In that case, the logical decoding of this + aborted transaction is stopped gracefully. + The <parameter>prefix</parameter> is arbitrary null-terminated prefix which can be used for identifying interesting messages for the current plugin. And finally the <parameter>message</parameter> parameter holds @@ -698,6 +734,111 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, </para> </sect3> + <sect3 id="logicaldecoding-output-plugin-filter-prepare"> + <title>Prepare Filter Callback</title> + + <para> + The optional <function>filter_prepare_cb</function> callback + is called to determine whether data that is part of the current + two-phase commit transaction should be considered for decode + at this prepare stage or as a regular one-phase transaction at + <command>COMMIT PREPARED</command> time later. To signal that + decoding should be skipped, return <literal>true</literal>; + <literal>false</literal> otherwise. When the callback is not + defined, <literal>false</literal> is assumed (i.e. nothing is + filtered). +<programlisting> +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + const char *gid); +</programlisting> + The <parameter>ctx</parameter> parameter has the same contents as for the + other callbacks. The <parameter>gid</parameter> is the identifier that later + identifies this transaction for <command>COMMIT PREPARED</command> or + <command>ROLLBACK PREPARED</command>. + </para> + <para> + The callback has to provide the same static answer for a given + <parameter>gid</parameter> every time it is called. + </para> + </sect3> + + <sect3 id="logicaldecoding-output-plugin-begin-prepare"> + <title>Transaction Begin Prepare Callback</title> + + <para> + The required <function>begin_prepare_cb</function> callback is called + whenever the start of a prepared transaction has been decoded. The + <parameter>gid</parameter> field, which is part of the + <parameter>txn</parameter> parameter can be used in this callback to + check if the plugin has already received this prepare in which case it + can skip the remaining changes of the transaction. This can only happen + if the user restarts the decoding after receiving the prepare for a + transaction but before receiving the commit prepared say because of some + error. + <programlisting> + typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + </programlisting> + </para> + </sect3> + + <sect3 id="logicaldecoding-output-plugin-prepare"> + <title>Transaction Prepare Callback</title> + + <para> + The required <function>prepare_cb</function> callback is called whenever + a transaction which is prepared for two-phase commit has been + decoded. The <function>change_cb</function> callback for all modified + rows will have been called before this, if there have been any modified + rows. The <parameter>gid</parameter> field, which is part of the + <parameter>txn</parameter> parameter can be used in this callback. + <programlisting> + typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + </programlisting> + </para> + </sect3> + + <sect3 id="logicaldecoding-output-plugin-commit-prepared"> + <title>Transaction Commit Prepared Callback</title> + + <para> + The required <function>commit_prepared_cb</function> callback is called + whenever a transaction commit prepared has been decoded. The + <parameter>gid</parameter> field, which is part of the + <parameter>txn</parameter> parameter can be used in this callback. + <programlisting> + typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + </programlisting> + </para> + </sect3> + + <sect3 id="logicaldecoding-output-plugin-rollback-prepared"> + <title>Transaction Rollback Prepared Callback</title> + + <para> + The required <function>rollback_prepared_cb</function> callback is called + whenever a transaction rollback prepared has been decoded. The + <parameter>gid</parameter> field, which is part of the + <parameter>txn</parameter> parameter can be used in this callback. The + parameters <parameter>prepare_end_lsn</parameter> and + <parameter>prepare_time</parameter> can be used to check if the plugin + has received this prepare transaction in which case it can apply the + rollback, otherwise, it can skip the rollback operation. The + <parameter>gid</parameter> alone is not sufficient because the downstream + node can have prepared transaction with same identifier. + <programlisting> + typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr preapre_end_lsn, + TimestampTz prepare_time); + </programlisting> + </para> + </sect3> + <sect3 id="logicaldecoding-output-plugin-stream-start"> <title>Stream Start Callback</title> <para> @@ -735,6 +876,19 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, </para> </sect3> + <sect3 id="logicaldecoding-output-plugin-stream-prepare"> + <title>Stream Prepare Callback</title> + <para> + The <function>stream_prepare_cb</function> callback is called to prepare + a previously streamed transaction as part of a two-phase commit. +<programlisting> +typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +</programlisting> + </para> + </sect3> + <sect3 id="logicaldecoding-output-plugin-stream-commit"> <title>Stream Commit Callback</title> <para> @@ -913,9 +1067,13 @@ OutputPluginWrite(ctx, true); When streaming an in-progress transaction, the changes (and messages) are streamed in blocks demarcated by <function>stream_start_cb</function> and <function>stream_stop_cb</function> callbacks. Once all the decoded - changes are transmitted, the transaction is committed using the - <function>stream_commit_cb</function> callback (or possibly aborted using - the <function>stream_abort_cb</function> callback). + changes are transmitted, the transaction can be committed using the + the <function>stream_commit_cb</function> callback + (or possibly aborted using the <function>stream_abort_cb</function> callback). + If two-phase commits are supported, the transaction can be prepared using the + <function>stream_prepare_cb</function> callback, commit prepared using the + <function>commit_prepared_cb</function> callback or aborted using the + <function>rollback_prepared_cb</function>. </para> <para> |