aboutsummaryrefslogtreecommitdiff
path: root/doc/src
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2020-12-30 16:17:26 +0530
committerAmit Kapila <akapila@postgresql.org>2020-12-30 16:17:26 +0530
commit0aa8a01d04c8fe200b7a106878eebc3d0af9105c (patch)
tree79fe885496f4d3493ae327156c0baf1aa0e1e43a /doc/src
parentfa744697c79189a661f802d9a979d959b4454df0 (diff)
downloadpostgresql-0aa8a01d04c8fe200b7a106878eebc3d0af9105c.tar.gz
postgresql-0aa8a01d04c8fe200b7a106878eebc3d0af9105c.zip
Extend the output plugin API to allow decoding of prepared xacts.
This adds six methods to the output plugin API, adding support for streaming changes of two-phase transactions at prepare time. * begin_prepare * filter_prepare * prepare * commit_prepared * rollback_prepared * stream_prepare Most of this is a simple extension of the existing methods, with the semantic difference that the transaction is not yet committed and maybe aborted later. Until now two-phase transactions were translated into regular transactions on the subscriber, and the GID was not forwarded to it. None of the two-phase commands were communicated to the subscriber. This patch provides the infrastructure for logical decoding plugins to be informed of two-phase commands Like PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED commands with the corresponding GID. This also extends the 'test_decoding' plugin, implementing these new methods. This commit simply adds these new APIs and the upcoming patch to "allow the decoding at prepare time in ReorderBuffer" will use these APIs. Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, and Dilip Kumar Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
Diffstat (limited to 'doc/src')
-rw-r--r--doc/src/sgml/logicaldecoding.sgml172
1 files changed, 165 insertions, 7 deletions
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index ca78a81e9c5..d63f90ff282 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -389,9 +389,15 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
+ LogicalDecodeFilterPrepareCB filter_prepare_cb;
+ LogicalDecodeBeginPrepareCB begin_prepare_cb;
+ LogicalDecodePrepareCB prepare_cb;
+ LogicalDecodeCommitPreparedCB commit_prepared_cb;
+ LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamPrepareCB stream_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
@@ -413,10 +419,20 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
An output plugin may also define functions to support streaming of large,
in-progress transactions. The <function>stream_start_cb</function>,
<function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
- <function>stream_commit_cb</function> and <function>stream_change_cb</function>
+ <function>stream_commit_cb</function>, <function>stream_change_cb</function>,
+ and <function>stream_prepare_cb</function>
are required, while <function>stream_message_cb</function> and
<function>stream_truncate_cb</function> are optional.
</para>
+
+ <para>
+ An output plugin may also define functions to support two-phase commits,
+ which allows actions to be decoded on the <command>PREPARE TRANSACTION</command>.
+ The <function>begin_prepare_cb</function>, <function>prepare_cb</function>,
+ <function>stream_prepare_cb</function>,
+ <function>commit_prepared_cb</function> and <function>rollback_prepared_cb</function>
+ callbacks are required, while <function>filter_prepare_cb</function> is optional.
+ </para>
</sect2>
<sect2 id="logicaldecoding-capabilities">
@@ -477,7 +493,15 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
never get
decoded. Successful savepoints are
folded into the transaction containing them in the order they were
- executed within that transaction.
+ executed within that transaction. A transaction that is prepared for
+ a two-phase commit using <command>PREPARE TRANSACTION</command> will
+ also be decoded if the output plugin callbacks needed for decoding
+ them are provided. It is possible that the current transaction which
+ is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
+ command. In that case, the logical decoding of this transaction will
+ be aborted too. We will skip all the changes of such a transaction once
+ the abort is detected and abort the transaction when we read WAL for
+ <command>ROLLBACK PREPARED</command>.
</para>
<note>
@@ -587,7 +611,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
an <command>INSERT</command>, <command>UPDATE</command>,
or <command>DELETE</command>. Even if the original command modified
several rows at once the callback will be called individually for each
- row.
+ row. The <function>change_cb</function> callback may access system or
+ user catalog tables to aid in the process of outputting the row
+ modification details. In case of decoding a prepared (but yet
+ uncommitted) transaction or decoding of an uncommitted transaction, this
+ change callback might also error out due to simultaneous rollback of
+ this very same transaction. In that case, the logical decoding of this
+ aborted transaction is stopped gracefully.
<programlisting>
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
@@ -685,7 +715,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
non-transactional and the XID was not assigned yet in the transaction
which logged the message. The <parameter>lsn</parameter> has WAL
location of the message. The <parameter>transactional</parameter> says
- if the message was sent as transactional or not.
+ if the message was sent as transactional or not. Similar to the change
+ callback, in case of decoding a prepared (but yet uncommitted)
+ transaction or decoding of an uncommitted transaction, this message
+ callback might also error out due to simultaneous rollback of
+ this very same transaction. In that case, the logical decoding of this
+ aborted transaction is stopped gracefully.
+
The <parameter>prefix</parameter> is arbitrary null-terminated prefix
which can be used for identifying interesting messages for the current
plugin. And finally the <parameter>message</parameter> parameter holds
@@ -698,6 +734,111 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
</para>
</sect3>
+ <sect3 id="logicaldecoding-output-plugin-filter-prepare">
+ <title>Prepare Filter Callback</title>
+
+ <para>
+ The optional <function>filter_prepare_cb</function> callback
+ is called to determine whether data that is part of the current
+ two-phase commit transaction should be considered for decode
+ at this prepare stage or as a regular one-phase transaction at
+ <command>COMMIT PREPARED</command> time later. To signal that
+ decoding should be skipped, return <literal>true</literal>;
+ <literal>false</literal> otherwise. When the callback is not
+ defined, <literal>false</literal> is assumed (i.e. nothing is
+ filtered).
+<programlisting>
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ const char *gid);
+</programlisting>
+ The <parameter>ctx</parameter> parameter has the same contents as for the
+ other callbacks. The <parameter>gid</parameter> is the identifier that later
+ identifies this transaction for <command>COMMIT PREPARED</command> or
+ <command>ROLLBACK PREPARED</command>.
+ </para>
+ <para>
+ The callback has to provide the same static answer for a given
+ <parameter>gid</parameter> every time it is called.
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-begin-prepare">
+ <title>Transaction Begin Prepare Callback</title>
+
+ <para>
+ The required <function>begin_prepare_cb</function> callback is called
+ whenever the start of a prepared transaction has been decoded. The
+ <parameter>gid</parameter> field, which is part of the
+ <parameter>txn</parameter> parameter can be used in this callback to
+ check if the plugin has already received this prepare in which case it
+ can skip the remaining changes of the transaction. This can only happen
+ if the user restarts the decoding after receiving the prepare for a
+ transaction but before receiving the commit prepared say because of some
+ error.
+ <programlisting>
+ typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+ </programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-prepare">
+ <title>Transaction Prepare Callback</title>
+
+ <para>
+ The required <function>prepare_cb</function> callback is called whenever
+ a transaction which is prepared for two-phase commit has been
+ decoded. The <function>change_cb</function> callback for all modified
+ rows will have been called before this, if there have been any modified
+ rows. The <parameter>gid</parameter> field, which is part of the
+ <parameter>txn</parameter> parameter can be used in this callback.
+ <programlisting>
+ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+ </programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-commit-prepared">
+ <title>Transaction Commit Prepared Callback</title>
+
+ <para>
+ The required <function>commit_prepared_cb</function> callback is called
+ whenever a transaction commit prepared has been decoded. The
+ <parameter>gid</parameter> field, which is part of the
+ <parameter>txn</parameter> parameter can be used in this callback.
+ <programlisting>
+ typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+ </programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-rollback-prepared">
+ <title>Transaction Rollback Prepared Callback</title>
+
+ <para>
+ The required <function>rollback_prepared_cb</function> callback is called
+ whenever a transaction rollback prepared has been decoded. The
+ <parameter>gid</parameter> field, which is part of the
+ <parameter>txn</parameter> parameter can be used in this callback. The
+ parameters <parameter>prepare_end_lsn</parameter> and
+ <parameter>prepare_time</parameter> can be used to check if the plugin
+ has received this prepare transaction in which case it can apply the
+ rollback, otherwise, it can skip the rollback operation. The
+ <parameter>gid</parameter> alone is not sufficient because the downstream
+ node can have prepared transaction with same identifier.
+ <programlisting>
+ typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr preapre_end_lsn,
+ TimestampTz prepare_time);
+ </programlisting>
+ </para>
+ </sect3>
+
<sect3 id="logicaldecoding-output-plugin-stream-start">
<title>Stream Start Callback</title>
<para>
@@ -735,6 +876,19 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
</para>
</sect3>
+ <sect3 id="logicaldecoding-output-plugin-stream-prepare">
+ <title>Stream Prepare Callback</title>
+ <para>
+ The <function>stream_prepare_cb</function> callback is called to prepare
+ a previously streamed transaction as part of a two-phase commit.
+<programlisting>
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
<sect3 id="logicaldecoding-output-plugin-stream-commit">
<title>Stream Commit Callback</title>
<para>
@@ -913,9 +1067,13 @@ OutputPluginWrite(ctx, true);
When streaming an in-progress transaction, the changes (and messages) are
streamed in blocks demarcated by <function>stream_start_cb</function>
and <function>stream_stop_cb</function> callbacks. Once all the decoded
- changes are transmitted, the transaction is committed using the
- <function>stream_commit_cb</function> callback (or possibly aborted using
- the <function>stream_abort_cb</function> callback).
+ changes are transmitted, the transaction can be committed using the
+ the <function>stream_commit_cb</function> callback
+ (or possibly aborted using the <function>stream_abort_cb</function> callback).
+ If two-phase commits are supported, the transaction can be prepared using the
+ <function>stream_prepare_cb</function> callback, commit prepared using the
+ <function>commit_prepared_cb</function> callback or aborted using the
+ <function>rollback_prepared_cb</function>.
</para>
<para>