From 0aa8a01d04c8fe200b7a106878eebc3d0af9105c Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Wed, 30 Dec 2020 16:17:26 +0530 Subject: Extend the output plugin API to allow decoding of prepared xacts. This adds six methods to the output plugin API, adding support for streaming changes of two-phase transactions at prepare time. * begin_prepare * filter_prepare * prepare * commit_prepared * rollback_prepared * stream_prepare Most of this is a simple extension of the existing methods, with the semantic difference that the transaction is not yet committed and maybe aborted later. Until now two-phase transactions were translated into regular transactions on the subscriber, and the GID was not forwarded to it. None of the two-phase commands were communicated to the subscriber. This patch provides the infrastructure for logical decoding plugins to be informed of two-phase commands Like PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED commands with the corresponding GID. This also extends the 'test_decoding' plugin, implementing these new methods. This commit simply adds these new APIs and the upcoming patch to "allow the decoding at prepare time in ReorderBuffer" will use these APIs. Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, and Dilip Kumar Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com --- doc/src/sgml/logicaldecoding.sgml | 172 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 165 insertions(+), 7 deletions(-) (limited to 'doc/src') diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index ca78a81e9c5..d63f90ff282 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -389,9 +389,15 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodeBeginPrepareCB begin_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeRollbackPreparedCB rollback_prepared_cb; LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; @@ -413,10 +419,20 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); An output plugin may also define functions to support streaming of large, in-progress transactions. The stream_start_cb, stream_stop_cb, stream_abort_cb, - stream_commit_cb and stream_change_cb + stream_commit_cb, stream_change_cb, + and stream_prepare_cb are required, while stream_message_cb and stream_truncate_cb are optional. + + + An output plugin may also define functions to support two-phase commits, + which allows actions to be decoded on the PREPARE TRANSACTION. + The begin_prepare_cb, prepare_cb, + stream_prepare_cb, + commit_prepared_cb and rollback_prepared_cb + callbacks are required, while filter_prepare_cb is optional. + @@ -477,7 +493,15 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); never get decoded. Successful savepoints are folded into the transaction containing them in the order they were - executed within that transaction. + executed within that transaction. A transaction that is prepared for + a two-phase commit using PREPARE TRANSACTION will + also be decoded if the output plugin callbacks needed for decoding + them are provided. It is possible that the current transaction which + is being decoded is aborted concurrently via a ROLLBACK PREPARED + command. In that case, the logical decoding of this transaction will + be aborted too. We will skip all the changes of such a transaction once + the abort is detected and abort the transaction when we read WAL for + ROLLBACK PREPARED. @@ -587,7 +611,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, an INSERT, UPDATE, or DELETE. Even if the original command modified several rows at once the callback will be called individually for each - row. + row. The change_cb callback may access system or + user catalog tables to aid in the process of outputting the row + modification details. In case of decoding a prepared (but yet + uncommitted) transaction or decoding of an uncommitted transaction, this + change callback might also error out due to simultaneous rollback of + this very same transaction. In that case, the logical decoding of this + aborted transaction is stopped gracefully. typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -685,7 +715,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, non-transactional and the XID was not assigned yet in the transaction which logged the message. The lsn has WAL location of the message. The transactional says - if the message was sent as transactional or not. + if the message was sent as transactional or not. Similar to the change + callback, in case of decoding a prepared (but yet uncommitted) + transaction or decoding of an uncommitted transaction, this message + callback might also error out due to simultaneous rollback of + this very same transaction. In that case, the logical decoding of this + aborted transaction is stopped gracefully. + The prefix is arbitrary null-terminated prefix which can be used for identifying interesting messages for the current plugin. And finally the message parameter holds @@ -698,6 +734,111 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, + + Prepare Filter Callback + + + The optional filter_prepare_cb callback + is called to determine whether data that is part of the current + two-phase commit transaction should be considered for decode + at this prepare stage or as a regular one-phase transaction at + COMMIT PREPARED time later. To signal that + decoding should be skipped, return true; + false otherwise. When the callback is not + defined, false is assumed (i.e. nothing is + filtered). + +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + const char *gid); + + The ctx parameter has the same contents as for the + other callbacks. The gid is the identifier that later + identifies this transaction for COMMIT PREPARED or + ROLLBACK PREPARED. + + + The callback has to provide the same static answer for a given + gid every time it is called. + + + + + Transaction Begin Prepare Callback + + + The required begin_prepare_cb callback is called + whenever the start of a prepared transaction has been decoded. The + gid field, which is part of the + txn parameter can be used in this callback to + check if the plugin has already received this prepare in which case it + can skip the remaining changes of the transaction. This can only happen + if the user restarts the decoding after receiving the prepare for a + transaction but before receiving the commit prepared say because of some + error. + + typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + + + + + + Transaction Prepare Callback + + + The required prepare_cb callback is called whenever + a transaction which is prepared for two-phase commit has been + decoded. The change_cb callback for all modified + rows will have been called before this, if there have been any modified + rows. The gid field, which is part of the + txn parameter can be used in this callback. + + typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + + + + + + Transaction Commit Prepared Callback + + + The required commit_prepared_cb callback is called + whenever a transaction commit prepared has been decoded. The + gid field, which is part of the + txn parameter can be used in this callback. + + typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + + + + + + Transaction Rollback Prepared Callback + + + The required rollback_prepared_cb callback is called + whenever a transaction rollback prepared has been decoded. The + gid field, which is part of the + txn parameter can be used in this callback. The + parameters prepare_end_lsn and + prepare_time can be used to check if the plugin + has received this prepare transaction in which case it can apply the + rollback, otherwise, it can skip the rollback operation. The + gid alone is not sufficient because the downstream + node can have prepared transaction with same identifier. + + typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr preapre_end_lsn, + TimestampTz prepare_time); + + + + Stream Start Callback @@ -735,6 +876,19 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, + + Stream Prepare Callback + + The stream_prepare_cb callback is called to prepare + a previously streamed transaction as part of a two-phase commit. + +typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + + + + Stream Commit Callback @@ -913,9 +1067,13 @@ OutputPluginWrite(ctx, true); When streaming an in-progress transaction, the changes (and messages) are streamed in blocks demarcated by stream_start_cb and stream_stop_cb callbacks. Once all the decoded - changes are transmitted, the transaction is committed using the - stream_commit_cb callback (or possibly aborted using - the stream_abort_cb callback). + changes are transmitted, the transaction can be committed using the + the stream_commit_cb callback + (or possibly aborted using the stream_abort_cb callback). + If two-phase commits are supported, the transaction can be prepared using the + stream_prepare_cb callback, commit prepared using the + commit_prepared_cb callback or aborted using the + rollback_prepared_cb. -- cgit v1.2.3