aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/logical.c297
-rw-r--r--src/include/replication/logical.h6
-rw-r--r--src/include/replication/output_plugin.h56
-rw-r--r--src/include/replication/reorderbuffer.h41
-rw-r--r--src/tools/pgindent/typedefs.list12
5 files changed, 412 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f1f4df7d70f..6e3de92a67c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -59,6 +59,13 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -74,6 +81,8 @@ static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr last_lsn);
static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
+static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -237,11 +246,37 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->stream_start = stream_start_cb_wrapper;
ctx->reorder->stream_stop = stream_stop_cb_wrapper;
ctx->reorder->stream_abort = stream_abort_cb_wrapper;
+ ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
ctx->reorder->stream_commit = stream_commit_cb_wrapper;
ctx->reorder->stream_change = stream_change_cb_wrapper;
ctx->reorder->stream_message = stream_message_cb_wrapper;
ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
+
+ /*
+ * To support two-phase logical decoding, we require
+ * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
+ * filter_prepare callback is optional. We however enable two-phase
+ * logical decoding when at least one of the methods is enabled so that we
+ * can easily identify missing methods.
+ *
+ * We decide it here, but only check it later in the wrappers.
+ */
+ ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
+ (ctx->callbacks.prepare_cb != NULL) ||
+ (ctx->callbacks.commit_prepared_cb != NULL) ||
+ (ctx->callbacks.rollback_prepared_cb != NULL) ||
+ (ctx->callbacks.stream_prepare_cb != NULL) ||
+ (ctx->callbacks.filter_prepare_cb != NULL);
+
+ /*
+ * Callback to support decoding at prepare time.
+ */
+ ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
+ ctx->reorder->prepare = prepare_cb_wrapper;
+ ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+ ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
+
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
@@ -782,6 +817,186 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+/*
+ * The functionality of begin_prepare is quite similar to begin with the
+ * exception that this will have gid (global transaction id) information which
+ * can be used by plugin. Now, we thought about extending the existing begin
+ * but that would break the replication protocol and additionally this looks
+ * cleaner.
+ */
+static void
+begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when two-phase commits are supported */
+ Assert(ctx->twophase);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "begin_prepare";
+ state.report_location = txn->first_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->first_lsn;
+
+ /*
+ * If the plugin supports two-phase commits then begin prepare callback is
+ * mandatory
+ */
+ if (ctx->callbacks.begin_prepare_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication at prepare time requires begin_prepare_cb callback")));
+
+ /* do the actual work: call callback */
+ ctx->callbacks.begin_prepare_cb(ctx, txn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when two-phase commits are supported */
+ Assert(ctx->twophase);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "prepare";
+ state.report_location = txn->final_lsn; /* beginning of prepare record */
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+ /*
+ * If the plugin supports two-phase commits then prepare callback is
+ * mandatory
+ */
+ if (ctx->callbacks.prepare_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication at prepare time requires prepare_cb callback")));
+
+ /* do the actual work: call callback */
+ ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when two-phase commits are supported */
+ Assert(ctx->twophase);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "commit_prepared";
+ state.report_location = txn->final_lsn; /* beginning of commit record */
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+ /*
+ * If the plugin support two-phase commits then commit prepared callback
+ * is mandatory
+ */
+ if (ctx->callbacks.commit_prepared_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication at prepare time requires commit_prepared_cb callback")));
+
+ /* do the actual work: call callback */
+ ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when two-phase commits are supported */
+ Assert(ctx->twophase);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "rollback_prepared";
+ state.report_location = txn->final_lsn; /* beginning of commit record */
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+ /*
+ * If the plugin support two-phase commits then rollback prepared callback
+ * is mandatory
+ */
+ if (ctx->callbacks.rollback_prepared_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication at prepare time requires rollback_prepared_cb callback")));
+
+ /* do the actual work: call callback */
+ ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
+ prepare_time);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
@@ -860,6 +1075,45 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
}
bool
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+{
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+ bool ret;
+
+ Assert(!ctx->fast_forward);
+
+ /*
+ * Skip if decoding of two-phase transactions at PREPARE time is not
+ * enabled. In that case, all two-phase transactions are considered
+ * filtered out and will be applied as regular transactions at COMMIT
+ * PREPARED.
+ */
+ if (!ctx->twophase)
+ return true;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "filter_prepare";
+ state.report_location = InvalidXLogRecPtr;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = false;
+
+ /* do the actual work: call callback */
+ ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+
+ return ret;
+}
+
+bool
filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
LogicalErrorCallbackState state;
@@ -1057,6 +1311,49 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
}
static void
+stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /*
+ * We're only supposed to call this when streaming and two-phase commits
+ * are supported.
+ */
+ Assert(ctx->streaming);
+ Assert(ctx->twophase);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_prepare";
+ state.report_location = txn->final_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn;
+
+ /* in streaming mode with two-phase commits, stream_prepare_cb is required */
+ if (ctx->callbacks.stream_prepare_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming at prepare time requires a stream_prepare_cb callback")));
+
+ ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 40bab7ee02d..28c9c1f474e 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -85,6 +85,11 @@ typedef struct LogicalDecodingContext
bool streaming;
/*
+ * Does the output plugin support two-phase decoding, and is it enabled?
+ */
+ bool twophase;
+
+ /*
* State for writing output.
*/
bool accept_writes;
@@ -120,6 +125,7 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index b78c796450a..89e1dc3517d 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -100,6 +100,45 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
/*
+ * Called before decoding of PREPARE record to decide whether this
+ * transaction should be decoded with separate calls to prepare and
+ * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED
+ * and sent as usual transaction.
+ */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ const char *gid);
+
+/*
+ * Callback called for every BEGIN of a prepared trnsaction.
+ */
+typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+
+
+/*
* Called when starting to stream a block of changes from in-progress
* transaction (may be called repeatedly, if it's streamed in multiple
* chunks).
@@ -124,6 +163,14 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
XLogRecPtr abort_lsn);
/*
+ * Called to prepare changes streamed to remote node from in-progress
+ * transaction. This is called as part of a two-phase commit.
+ */
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/*
* Called to apply changes streamed to remote node from in-progress
* transaction.
*/
@@ -173,10 +220,19 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
+
+ /* streaming of changes at prepare time */
+ LogicalDecodeFilterPrepareCB filter_prepare_cb;
+ LogicalDecodeBeginPrepareCB begin_prepare_cb;
+ LogicalDecodePrepareCB prepare_cb;
+ LogicalDecodeCommitPreparedCB commit_prepared_cb;
+ LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
+
/* streaming of changes */
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamPrepareCB stream_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bd9dd7ec676..1e60afe70f4 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -245,6 +245,12 @@ typedef struct ReorderBufferTXN
TransactionId toplevel_xid;
/*
+ * Global transaction id required for identification of prepared
+ * transactions.
+ */
+ char *gid;
+
+ /*
* LSN of the first data carrying, WAL record with knowledge about this
* xid. This is allowed to *not* be first record adorned with this xid, if
* the previous records aren't relevant for logical decoding.
@@ -418,6 +424,26 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
const char *prefix, Size sz,
const char *message);
+/* begin prepare callback signature */
+typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* rollback prepared callback signature */
+typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+
/* start streaming transaction callback signature */
typedef void (*ReorderBufferStreamStartCB) (
ReorderBuffer *rb,
@@ -436,6 +462,12 @@ typedef void (*ReorderBufferStreamAbortCB) (
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamPrepareCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
/* commit streamed transaction callback signature */
typedef void (*ReorderBufferStreamCommitCB) (
ReorderBuffer *rb,
@@ -505,11 +537,20 @@ struct ReorderBuffer
ReorderBufferMessageCB message;
/*
+ * Callbacks to be called when streaming a transaction at prepare time.
+ */
+ ReorderBufferBeginCB begin_prepare;
+ ReorderBufferPrepareCB prepare;
+ ReorderBufferCommitPreparedCB commit_prepared;
+ ReorderBufferRollbackPreparedCB rollback_prepared;
+
+ /*
* Callbacks to be called when streaming a transaction.
*/
ReorderBufferStreamStartCB stream_start;
ReorderBufferStreamStopCB stream_stop;
ReorderBufferStreamAbortCB stream_abort;
+ ReorderBufferStreamPrepareCB stream_prepare;
ReorderBufferStreamCommitCB stream_commit;
ReorderBufferStreamChangeCB stream_change;
ReorderBufferStreamMessageCB stream_message;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index bca37c536ee..9cd047ba25e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1315,9 +1315,21 @@ LogStmtLevel
LogicalDecodeBeginCB
LogicalDecodeChangeCB
LogicalDecodeCommitCB
+LogicalDecodeFilterPrepareCB
+LogicalDecodeBeginPrepareCB
+LogicalDecodePrepareCB
+LogicalDecodeCommitPreparedCB
+LogicalDecodeRollbackPreparedCB
LogicalDecodeFilterByOriginCB
LogicalDecodeMessageCB
LogicalDecodeShutdownCB
+LogicalDecodeStreamStartCB
+LogicalDecodeStreamStopCB
+LogicalDecodeStreamAbortCB
+LogicalDecodeStreamPrepareCB
+LogicalDecodeStreamCommitCB
+LogicalDecodeStreamChangeCB
+LogicalDecodeStreamMessageCB
LogicalDecodeStartupCB
LogicalDecodeTruncateCB
LogicalDecodingContext