aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2020-12-30 16:17:26 +0530
committerAmit Kapila <akapila@postgresql.org>2020-12-30 16:17:26 +0530
commit0aa8a01d04c8fe200b7a106878eebc3d0af9105c (patch)
tree79fe885496f4d3493ae327156c0baf1aa0e1e43a
parentfa744697c79189a661f802d9a979d959b4454df0 (diff)
downloadpostgresql-0aa8a01d04c8fe200b7a106878eebc3d0af9105c.tar.gz
postgresql-0aa8a01d04c8fe200b7a106878eebc3d0af9105c.zip
Extend the output plugin API to allow decoding of prepared xacts.
This adds six methods to the output plugin API, adding support for streaming changes of two-phase transactions at prepare time. * begin_prepare * filter_prepare * prepare * commit_prepared * rollback_prepared * stream_prepare Most of this is a simple extension of the existing methods, with the semantic difference that the transaction is not yet committed and maybe aborted later. Until now two-phase transactions were translated into regular transactions on the subscriber, and the GID was not forwarded to it. None of the two-phase commands were communicated to the subscriber. This patch provides the infrastructure for logical decoding plugins to be informed of two-phase commands Like PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED commands with the corresponding GID. This also extends the 'test_decoding' plugin, implementing these new methods. This commit simply adds these new APIs and the upcoming patch to "allow the decoding at prepare time in ReorderBuffer" will use these APIs. Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, and Dilip Kumar Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
-rw-r--r--contrib/test_decoding/test_decoding.c167
-rw-r--r--doc/src/sgml/logicaldecoding.sgml172
-rw-r--r--src/backend/replication/logical/logical.c297
-rw-r--r--src/include/replication/logical.h6
-rw-r--r--src/include/replication/output_plugin.h56
-rw-r--r--src/include/replication/reorderbuffer.h41
-rw-r--r--src/tools/pgindent/typedefs.list12
7 files changed, 744 insertions, 7 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index e12278beb58..05763553a40 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -76,6 +76,20 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+ const char *gid);
+static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_output_stream_start(LogicalDecodingContext *ctx,
@@ -87,6 +101,9 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
+static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
@@ -123,9 +140,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
+ cb->filter_prepare_cb = pg_decode_filter_prepare;
+ cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
+ cb->prepare_cb = pg_decode_prepare_txn;
+ cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+ cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
cb->stream_start_cb = pg_decode_stream_start;
cb->stream_stop_cb = pg_decode_stream_stop;
cb->stream_abort_cb = pg_decode_stream_abort;
+ cb->stream_prepare_cb = pg_decode_stream_prepare;
cb->stream_commit_cb = pg_decode_stream_commit;
cb->stream_change_cb = pg_decode_stream_change;
cb->stream_message_cb = pg_decode_stream_message;
@@ -141,6 +164,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
ListCell *option;
TestDecodingData *data;
bool enable_streaming = false;
+ bool enable_twophase = false;
data = palloc0(sizeof(TestDecodingData));
data->context = AllocSetContextCreate(ctx->context,
@@ -241,6 +265,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "two-phase-commit") == 0)
+ {
+ if (elem->arg == NULL)
+ continue;
+ else if (!parse_bool(strVal(elem->arg), &enable_twophase))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -252,6 +286,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
}
ctx->streaming &= enable_streaming;
+ ctx->twophase &= enable_twophase;
}
/* cleanup this plugin's resources */
@@ -320,6 +355,111 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+/* BEGIN PREPARE callback */
+static void
+pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+ txndata->xact_wrote_changes = false;
+ txn->output_plugin_private = txndata;
+
+ if (data->skip_empty_xacts)
+ return;
+
+ pg_output_begin(ctx, data, txn, true);
+}
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+ quote_literal_cstr(txn->gid));
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, ", txid %u", txn->xid);
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+ quote_literal_cstr(txn->gid));
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, ", txid %u", txn->xid);
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+/* ROLLBACK PREPARED callback */
+static void
+pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
+ quote_literal_cstr(txn->gid));
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, ", txid %u", txn->xid);
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Filter out two-phase transactions.
+ *
+ * Each plugin can implement its own filtering logic. Here we demonstrate a
+ * simple logic by checking the GID. If the GID contains the "_nodecode"
+ * substring, then we filter it out.
+ */
+static bool
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+{
+ if (strstr(gid, "_nodecode") != NULL)
+ return true;
+
+ return false;
+}
+
static bool
pg_decode_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
@@ -702,6 +842,33 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
}
static void
+pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
+ quote_literal_cstr(txn->gid), txn->xid);
+ else
+ appendStringInfo(ctx->out, "preparing streamed transaction %s",
+ quote_literal_cstr(txn->gid));
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+static void
pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index ca78a81e9c5..d63f90ff282 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -389,9 +389,15 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
+ LogicalDecodeFilterPrepareCB filter_prepare_cb;
+ LogicalDecodeBeginPrepareCB begin_prepare_cb;
+ LogicalDecodePrepareCB prepare_cb;
+ LogicalDecodeCommitPreparedCB commit_prepared_cb;
+ LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamPrepareCB stream_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
@@ -413,10 +419,20 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
An output plugin may also define functions to support streaming of large,
in-progress transactions. The <function>stream_start_cb</function>,
<function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
- <function>stream_commit_cb</function> and <function>stream_change_cb</function>
+ <function>stream_commit_cb</function>, <function>stream_change_cb</function>,
+ and <function>stream_prepare_cb</function>
are required, while <function>stream_message_cb</function> and
<function>stream_truncate_cb</function> are optional.
</para>
+
+ <para>
+ An output plugin may also define functions to support two-phase commits,
+ which allows actions to be decoded on the <command>PREPARE TRANSACTION</command>.
+ The <function>begin_prepare_cb</function>, <function>prepare_cb</function>,
+ <function>stream_prepare_cb</function>,
+ <function>commit_prepared_cb</function> and <function>rollback_prepared_cb</function>
+ callbacks are required, while <function>filter_prepare_cb</function> is optional.
+ </para>
</sect2>
<sect2 id="logicaldecoding-capabilities">
@@ -477,7 +493,15 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
never get
decoded. Successful savepoints are
folded into the transaction containing them in the order they were
- executed within that transaction.
+ executed within that transaction. A transaction that is prepared for
+ a two-phase commit using <command>PREPARE TRANSACTION</command> will
+ also be decoded if the output plugin callbacks needed for decoding
+ them are provided. It is possible that the current transaction which
+ is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
+ command. In that case, the logical decoding of this transaction will
+ be aborted too. We will skip all the changes of such a transaction once
+ the abort is detected and abort the transaction when we read WAL for
+ <command>ROLLBACK PREPARED</command>.
</para>
<note>
@@ -587,7 +611,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
an <command>INSERT</command>, <command>UPDATE</command>,
or <command>DELETE</command>. Even if the original command modified
several rows at once the callback will be called individually for each
- row.
+ row. The <function>change_cb</function> callback may access system or
+ user catalog tables to aid in the process of outputting the row
+ modification details. In case of decoding a prepared (but yet
+ uncommitted) transaction or decoding of an uncommitted transaction, this
+ change callback might also error out due to simultaneous rollback of
+ this very same transaction. In that case, the logical decoding of this
+ aborted transaction is stopped gracefully.
<programlisting>
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
@@ -685,7 +715,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
non-transactional and the XID was not assigned yet in the transaction
which logged the message. The <parameter>lsn</parameter> has WAL
location of the message. The <parameter>transactional</parameter> says
- if the message was sent as transactional or not.
+ if the message was sent as transactional or not. Similar to the change
+ callback, in case of decoding a prepared (but yet uncommitted)
+ transaction or decoding of an uncommitted transaction, this message
+ callback might also error out due to simultaneous rollback of
+ this very same transaction. In that case, the logical decoding of this
+ aborted transaction is stopped gracefully.
+
The <parameter>prefix</parameter> is arbitrary null-terminated prefix
which can be used for identifying interesting messages for the current
plugin. And finally the <parameter>message</parameter> parameter holds
@@ -698,6 +734,111 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
</para>
</sect3>
+ <sect3 id="logicaldecoding-output-plugin-filter-prepare">
+ <title>Prepare Filter Callback</title>
+
+ <para>
+ The optional <function>filter_prepare_cb</function> callback
+ is called to determine whether data that is part of the current
+ two-phase commit transaction should be considered for decode
+ at this prepare stage or as a regular one-phase transaction at
+ <command>COMMIT PREPARED</command> time later. To signal that
+ decoding should be skipped, return <literal>true</literal>;
+ <literal>false</literal> otherwise. When the callback is not
+ defined, <literal>false</literal> is assumed (i.e. nothing is
+ filtered).
+<programlisting>
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ const char *gid);
+</programlisting>
+ The <parameter>ctx</parameter> parameter has the same contents as for the
+ other callbacks. The <parameter>gid</parameter> is the identifier that later
+ identifies this transaction for <command>COMMIT PREPARED</command> or
+ <command>ROLLBACK PREPARED</command>.
+ </para>
+ <para>
+ The callback has to provide the same static answer for a given
+ <parameter>gid</parameter> every time it is called.
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-begin-prepare">
+ <title>Transaction Begin Prepare Callback</title>
+
+ <para>
+ The required <function>begin_prepare_cb</function> callback is called
+ whenever the start of a prepared transaction has been decoded. The
+ <parameter>gid</parameter> field, which is part of the
+ <parameter>txn</parameter> parameter can be used in this callback to
+ check if the plugin has already received this prepare in which case it
+ can skip the remaining changes of the transaction. This can only happen
+ if the user restarts the decoding after receiving the prepare for a
+ transaction but before receiving the commit prepared say because of some
+ error.
+ <programlisting>
+ typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+ </programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-prepare">
+ <title>Transaction Prepare Callback</title>
+
+ <para>
+ The required <function>prepare_cb</function> callback is called whenever
+ a transaction which is prepared for two-phase commit has been
+ decoded. The <function>change_cb</function> callback for all modified
+ rows will have been called before this, if there have been any modified
+ rows. The <parameter>gid</parameter> field, which is part of the
+ <parameter>txn</parameter> parameter can be used in this callback.
+ <programlisting>
+ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+ </programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-commit-prepared">
+ <title>Transaction Commit Prepared Callback</title>
+
+ <para>
+ The required <function>commit_prepared_cb</function> callback is called
+ whenever a transaction commit prepared has been decoded. The
+ <parameter>gid</parameter> field, which is part of the
+ <parameter>txn</parameter> parameter can be used in this callback.
+ <programlisting>
+ typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+ </programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-rollback-prepared">
+ <title>Transaction Rollback Prepared Callback</title>
+
+ <para>
+ The required <function>rollback_prepared_cb</function> callback is called
+ whenever a transaction rollback prepared has been decoded. The
+ <parameter>gid</parameter> field, which is part of the
+ <parameter>txn</parameter> parameter can be used in this callback. The
+ parameters <parameter>prepare_end_lsn</parameter> and
+ <parameter>prepare_time</parameter> can be used to check if the plugin
+ has received this prepare transaction in which case it can apply the
+ rollback, otherwise, it can skip the rollback operation. The
+ <parameter>gid</parameter> alone is not sufficient because the downstream
+ node can have prepared transaction with same identifier.
+ <programlisting>
+ typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr preapre_end_lsn,
+ TimestampTz prepare_time);
+ </programlisting>
+ </para>
+ </sect3>
+
<sect3 id="logicaldecoding-output-plugin-stream-start">
<title>Stream Start Callback</title>
<para>
@@ -735,6 +876,19 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
</para>
</sect3>
+ <sect3 id="logicaldecoding-output-plugin-stream-prepare">
+ <title>Stream Prepare Callback</title>
+ <para>
+ The <function>stream_prepare_cb</function> callback is called to prepare
+ a previously streamed transaction as part of a two-phase commit.
+<programlisting>
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
<sect3 id="logicaldecoding-output-plugin-stream-commit">
<title>Stream Commit Callback</title>
<para>
@@ -913,9 +1067,13 @@ OutputPluginWrite(ctx, true);
When streaming an in-progress transaction, the changes (and messages) are
streamed in blocks demarcated by <function>stream_start_cb</function>
and <function>stream_stop_cb</function> callbacks. Once all the decoded
- changes are transmitted, the transaction is committed using the
- <function>stream_commit_cb</function> callback (or possibly aborted using
- the <function>stream_abort_cb</function> callback).
+ changes are transmitted, the transaction can be committed using the
+ the <function>stream_commit_cb</function> callback
+ (or possibly aborted using the <function>stream_abort_cb</function> callback).
+ If two-phase commits are supported, the transaction can be prepared using the
+ <function>stream_prepare_cb</function> callback, commit prepared using the
+ <function>commit_prepared_cb</function> callback or aborted using the
+ <function>rollback_prepared_cb</function>.
</para>
<para>
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f1f4df7d70f..6e3de92a67c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -59,6 +59,13 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -74,6 +81,8 @@ static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr last_lsn);
static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
+static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -237,11 +246,37 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->stream_start = stream_start_cb_wrapper;
ctx->reorder->stream_stop = stream_stop_cb_wrapper;
ctx->reorder->stream_abort = stream_abort_cb_wrapper;
+ ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
ctx->reorder->stream_commit = stream_commit_cb_wrapper;
ctx->reorder->stream_change = stream_change_cb_wrapper;
ctx->reorder->stream_message = stream_message_cb_wrapper;
ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
+
+ /*
+ * To support two-phase logical decoding, we require
+ * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
+ * filter_prepare callback is optional. We however enable two-phase
+ * logical decoding when at least one of the methods is enabled so that we
+ * can easily identify missing methods.
+ *
+ * We decide it here, but only check it later in the wrappers.
+ */
+ ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
+ (ctx->callbacks.prepare_cb != NULL) ||
+ (ctx->callbacks.commit_prepared_cb != NULL) ||
+ (ctx->callbacks.rollback_prepared_cb != NULL) ||
+ (ctx->callbacks.stream_prepare_cb != NULL) ||
+ (ctx->callbacks.filter_prepare_cb != NULL);
+
+ /*
+ * Callback to support decoding at prepare time.
+ */
+ ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
+ ctx->reorder->prepare = prepare_cb_wrapper;
+ ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+ ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
+
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
@@ -782,6 +817,186 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+/*
+ * The functionality of begin_prepare is quite similar to begin with the
+ * exception that this will have gid (global transaction id) information which
+ * can be used by plugin. Now, we thought about extending the existing begin
+ * but that would break the replication protocol and additionally this looks
+ * cleaner.
+ */
+static void
+begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when two-phase commits are supported */
+ Assert(ctx->twophase);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "begin_prepare";
+ state.report_location = txn->first_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->first_lsn;
+
+ /*
+ * If the plugin supports two-phase commits then begin prepare callback is
+ * mandatory
+ */
+ if (ctx->callbacks.begin_prepare_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication at prepare time requires begin_prepare_cb callback")));
+
+ /* do the actual work: call callback */
+ ctx->callbacks.begin_prepare_cb(ctx, txn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when two-phase commits are supported */
+ Assert(ctx->twophase);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "prepare";
+ state.report_location = txn->final_lsn; /* beginning of prepare record */
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+ /*
+ * If the plugin supports two-phase commits then prepare callback is
+ * mandatory
+ */
+ if (ctx->callbacks.prepare_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication at prepare time requires prepare_cb callback")));
+
+ /* do the actual work: call callback */
+ ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when two-phase commits are supported */
+ Assert(ctx->twophase);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "commit_prepared";
+ state.report_location = txn->final_lsn; /* beginning of commit record */
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+ /*
+ * If the plugin support two-phase commits then commit prepared callback
+ * is mandatory
+ */
+ if (ctx->callbacks.commit_prepared_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication at prepare time requires commit_prepared_cb callback")));
+
+ /* do the actual work: call callback */
+ ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when two-phase commits are supported */
+ Assert(ctx->twophase);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "rollback_prepared";
+ state.report_location = txn->final_lsn; /* beginning of commit record */
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+ /*
+ * If the plugin support two-phase commits then rollback prepared callback
+ * is mandatory
+ */
+ if (ctx->callbacks.rollback_prepared_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication at prepare time requires rollback_prepared_cb callback")));
+
+ /* do the actual work: call callback */
+ ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
+ prepare_time);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
@@ -860,6 +1075,45 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
}
bool
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+{
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+ bool ret;
+
+ Assert(!ctx->fast_forward);
+
+ /*
+ * Skip if decoding of two-phase transactions at PREPARE time is not
+ * enabled. In that case, all two-phase transactions are considered
+ * filtered out and will be applied as regular transactions at COMMIT
+ * PREPARED.
+ */
+ if (!ctx->twophase)
+ return true;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "filter_prepare";
+ state.report_location = InvalidXLogRecPtr;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = false;
+
+ /* do the actual work: call callback */
+ ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+
+ return ret;
+}
+
+bool
filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
LogicalErrorCallbackState state;
@@ -1057,6 +1311,49 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
}
static void
+stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /*
+ * We're only supposed to call this when streaming and two-phase commits
+ * are supported.
+ */
+ Assert(ctx->streaming);
+ Assert(ctx->twophase);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_prepare";
+ state.report_location = txn->final_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn;
+
+ /* in streaming mode with two-phase commits, stream_prepare_cb is required */
+ if (ctx->callbacks.stream_prepare_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming at prepare time requires a stream_prepare_cb callback")));
+
+ ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 40bab7ee02d..28c9c1f474e 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -85,6 +85,11 @@ typedef struct LogicalDecodingContext
bool streaming;
/*
+ * Does the output plugin support two-phase decoding, and is it enabled?
+ */
+ bool twophase;
+
+ /*
* State for writing output.
*/
bool accept_writes;
@@ -120,6 +125,7 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index b78c796450a..89e1dc3517d 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -100,6 +100,45 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
/*
+ * Called before decoding of PREPARE record to decide whether this
+ * transaction should be decoded with separate calls to prepare and
+ * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED
+ * and sent as usual transaction.
+ */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ const char *gid);
+
+/*
+ * Callback called for every BEGIN of a prepared trnsaction.
+ */
+typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+
+
+/*
* Called when starting to stream a block of changes from in-progress
* transaction (may be called repeatedly, if it's streamed in multiple
* chunks).
@@ -124,6 +163,14 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
XLogRecPtr abort_lsn);
/*
+ * Called to prepare changes streamed to remote node from in-progress
+ * transaction. This is called as part of a two-phase commit.
+ */
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/*
* Called to apply changes streamed to remote node from in-progress
* transaction.
*/
@@ -173,10 +220,19 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
+
+ /* streaming of changes at prepare time */
+ LogicalDecodeFilterPrepareCB filter_prepare_cb;
+ LogicalDecodeBeginPrepareCB begin_prepare_cb;
+ LogicalDecodePrepareCB prepare_cb;
+ LogicalDecodeCommitPreparedCB commit_prepared_cb;
+ LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
+
/* streaming of changes */
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamPrepareCB stream_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bd9dd7ec676..1e60afe70f4 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -245,6 +245,12 @@ typedef struct ReorderBufferTXN
TransactionId toplevel_xid;
/*
+ * Global transaction id required for identification of prepared
+ * transactions.
+ */
+ char *gid;
+
+ /*
* LSN of the first data carrying, WAL record with knowledge about this
* xid. This is allowed to *not* be first record adorned with this xid, if
* the previous records aren't relevant for logical decoding.
@@ -418,6 +424,26 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
const char *prefix, Size sz,
const char *message);
+/* begin prepare callback signature */
+typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* rollback prepared callback signature */
+typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+
/* start streaming transaction callback signature */
typedef void (*ReorderBufferStreamStartCB) (
ReorderBuffer *rb,
@@ -436,6 +462,12 @@ typedef void (*ReorderBufferStreamAbortCB) (
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamPrepareCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
/* commit streamed transaction callback signature */
typedef void (*ReorderBufferStreamCommitCB) (
ReorderBuffer *rb,
@@ -505,11 +537,20 @@ struct ReorderBuffer
ReorderBufferMessageCB message;
/*
+ * Callbacks to be called when streaming a transaction at prepare time.
+ */
+ ReorderBufferBeginCB begin_prepare;
+ ReorderBufferPrepareCB prepare;
+ ReorderBufferCommitPreparedCB commit_prepared;
+ ReorderBufferRollbackPreparedCB rollback_prepared;
+
+ /*
* Callbacks to be called when streaming a transaction.
*/
ReorderBufferStreamStartCB stream_start;
ReorderBufferStreamStopCB stream_stop;
ReorderBufferStreamAbortCB stream_abort;
+ ReorderBufferStreamPrepareCB stream_prepare;
ReorderBufferStreamCommitCB stream_commit;
ReorderBufferStreamChangeCB stream_change;
ReorderBufferStreamMessageCB stream_message;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index bca37c536ee..9cd047ba25e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1315,9 +1315,21 @@ LogStmtLevel
LogicalDecodeBeginCB
LogicalDecodeChangeCB
LogicalDecodeCommitCB
+LogicalDecodeFilterPrepareCB
+LogicalDecodeBeginPrepareCB
+LogicalDecodePrepareCB
+LogicalDecodeCommitPreparedCB
+LogicalDecodeRollbackPreparedCB
LogicalDecodeFilterByOriginCB
LogicalDecodeMessageCB
LogicalDecodeShutdownCB
+LogicalDecodeStreamStartCB
+LogicalDecodeStreamStopCB
+LogicalDecodeStreamAbortCB
+LogicalDecodeStreamPrepareCB
+LogicalDecodeStreamCommitCB
+LogicalDecodeStreamChangeCB
+LogicalDecodeStreamMessageCB
LogicalDecodeStartupCB
LogicalDecodeTruncateCB
LogicalDecodingContext