diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/logical.c | 297 | ||||
-rw-r--r-- | src/include/replication/logical.h | 6 | ||||
-rw-r--r-- | src/include/replication/output_plugin.h | 56 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 41 | ||||
-rw-r--r-- | src/tools/pgindent/typedefs.list | 12 |
5 files changed, 412 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index f1f4df7d70f..6e3de92a67c 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -59,6 +59,13 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx); static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); +static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, TimestampTz prepare_time); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -74,6 +81,8 @@ static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr last_lsn); static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -237,11 +246,37 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->stream_start = stream_start_cb_wrapper; ctx->reorder->stream_stop = stream_stop_cb_wrapper; ctx->reorder->stream_abort = stream_abort_cb_wrapper; + ctx->reorder->stream_prepare = stream_prepare_cb_wrapper; ctx->reorder->stream_commit = stream_commit_cb_wrapper; ctx->reorder->stream_change = stream_change_cb_wrapper; ctx->reorder->stream_message = stream_message_cb_wrapper; ctx->reorder->stream_truncate = stream_truncate_cb_wrapper; + + /* + * To support two-phase logical decoding, we require + * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The + * filter_prepare callback is optional. We however enable two-phase + * logical decoding when at least one of the methods is enabled so that we + * can easily identify missing methods. + * + * We decide it here, but only check it later in the wrappers. + */ + ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) || + (ctx->callbacks.prepare_cb != NULL) || + (ctx->callbacks.commit_prepared_cb != NULL) || + (ctx->callbacks.rollback_prepared_cb != NULL) || + (ctx->callbacks.stream_prepare_cb != NULL) || + (ctx->callbacks.filter_prepare_cb != NULL); + + /* + * Callback to support decoding at prepare time. + */ + ctx->reorder->begin_prepare = begin_prepare_cb_wrapper; + ctx->reorder->prepare = prepare_cb_wrapper; + ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; + ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper; + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -782,6 +817,186 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +/* + * The functionality of begin_prepare is quite similar to begin with the + * exception that this will have gid (global transaction id) information which + * can be used by plugin. Now, we thought about extending the existing begin + * but that would break the replication protocol and additionally this looks + * cleaner. + */ +static void +begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when two-phase commits are supported */ + Assert(ctx->twophase); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "begin_prepare"; + state.report_location = txn->first_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->first_lsn; + + /* + * If the plugin supports two-phase commits then begin prepare callback is + * mandatory + */ + if (ctx->callbacks.begin_prepare_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication at prepare time requires begin_prepare_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.begin_prepare_cb(ctx, txn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when two-phase commits are supported */ + Assert(ctx->twophase); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "prepare"; + state.report_location = txn->final_lsn; /* beginning of prepare record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* + * If the plugin supports two-phase commits then prepare callback is + * mandatory + */ + if (ctx->callbacks.prepare_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication at prepare time requires prepare_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when two-phase commits are supported */ + Assert(ctx->twophase); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "commit_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* + * If the plugin support two-phase commits then commit prepared callback + * is mandatory + */ + if (ctx->callbacks.commit_prepared_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication at prepare time requires commit_prepared_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when two-phase commits are supported */ + Assert(ctx->twophase); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "rollback_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* + * If the plugin support two-phase commits then rollback prepared callback + * is mandatory + */ + if (ctx->callbacks.rollback_prepared_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication at prepare time requires rollback_prepared_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn, + prepare_time); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) @@ -860,6 +1075,45 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } bool +filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid) +{ + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + bool ret; + + Assert(!ctx->fast_forward); + + /* + * Skip if decoding of two-phase transactions at PREPARE time is not + * enabled. In that case, all two-phase transactions are considered + * filtered out and will be applied as regular transactions at COMMIT + * PREPARED. + */ + if (!ctx->twophase) + return true; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "filter_prepare"; + state.report_location = InvalidXLogRecPtr; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + + /* do the actual work: call callback */ + ret = ctx->callbacks.filter_prepare_cb(ctx, gid); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + + return ret; +} + +bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) { LogicalErrorCallbackState state; @@ -1057,6 +1311,49 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void +stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* + * We're only supposed to call this when streaming and two-phase commits + * are supported. + */ + Assert(ctx->streaming); + Assert(ctx->twophase); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_prepare"; + state.report_location = txn->final_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; + + /* in streaming mode with two-phase commits, stream_prepare_cb is required */ + if (ctx->callbacks.stream_prepare_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming at prepare time requires a stream_prepare_cb callback"))); + + ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 40bab7ee02d..28c9c1f474e 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -85,6 +85,11 @@ typedef struct LogicalDecodingContext bool streaming; /* + * Does the output plugin support two-phase decoding, and is it enabled? + */ + bool twophase; + + /* * State for writing output. */ bool accept_writes; @@ -120,6 +125,7 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); +extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); extern void ResetLogicalStreamingState(void); extern void UpdateDecodingStats(LogicalDecodingContext *ctx); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index b78c796450a..89e1dc3517d 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -100,6 +100,45 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx); /* + * Called before decoding of PREPARE record to decide whether this + * transaction should be decoded with separate calls to prepare and + * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED + * and sent as usual transaction. + */ +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + const char *gid); + +/* + * Callback called for every BEGIN of a prepared trnsaction. + */ +typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Called for PREPARE record unless it was filtered by filter_prepare() + * callback. + */ +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* + * Called for COMMIT PREPARED. + */ +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called for ROLLBACK PREPARED. + */ +typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); + + +/* * Called when starting to stream a block of changes from in-progress * transaction (may be called repeatedly, if it's streamed in multiple * chunks). @@ -124,6 +163,14 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, XLogRecPtr abort_lsn); /* + * Called to prepare changes streamed to remote node from in-progress + * transaction. This is called as part of a two-phase commit. + */ +typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* * Called to apply changes streamed to remote node from in-progress * transaction. */ @@ -173,10 +220,19 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; + + /* streaming of changes at prepare time */ + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodeBeginPrepareCB begin_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeRollbackPreparedCB rollback_prepared_cb; + /* streaming of changes */ LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index bd9dd7ec676..1e60afe70f4 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -245,6 +245,12 @@ typedef struct ReorderBufferTXN TransactionId toplevel_xid; /* + * Global transaction id required for identification of prepared + * transactions. + */ + char *gid; + + /* * LSN of the first data carrying, WAL record with knowledge about this * xid. This is allowed to *not* be first record adorned with this xid, if * the previous records aren't relevant for logical decoding. @@ -418,6 +424,26 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* begin prepare callback signature */ +typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn); + +/* prepare callback signature */ +typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* commit prepared callback signature */ +typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* rollback prepared callback signature */ +typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); + /* start streaming transaction callback signature */ typedef void (*ReorderBufferStreamStartCB) ( ReorderBuffer *rb, @@ -436,6 +462,12 @@ typedef void (*ReorderBufferStreamAbortCB) ( ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +/* prepare streamed transaction callback signature */ +typedef void (*ReorderBufferStreamPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + /* commit streamed transaction callback signature */ typedef void (*ReorderBufferStreamCommitCB) ( ReorderBuffer *rb, @@ -505,11 +537,20 @@ struct ReorderBuffer ReorderBufferMessageCB message; /* + * Callbacks to be called when streaming a transaction at prepare time. + */ + ReorderBufferBeginCB begin_prepare; + ReorderBufferPrepareCB prepare; + ReorderBufferCommitPreparedCB commit_prepared; + ReorderBufferRollbackPreparedCB rollback_prepared; + + /* * Callbacks to be called when streaming a transaction. */ ReorderBufferStreamStartCB stream_start; ReorderBufferStreamStopCB stream_stop; ReorderBufferStreamAbortCB stream_abort; + ReorderBufferStreamPrepareCB stream_prepare; ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index bca37c536ee..9cd047ba25e 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1315,9 +1315,21 @@ LogStmtLevel LogicalDecodeBeginCB LogicalDecodeChangeCB LogicalDecodeCommitCB +LogicalDecodeFilterPrepareCB +LogicalDecodeBeginPrepareCB +LogicalDecodePrepareCB +LogicalDecodeCommitPreparedCB +LogicalDecodeRollbackPreparedCB LogicalDecodeFilterByOriginCB LogicalDecodeMessageCB LogicalDecodeShutdownCB +LogicalDecodeStreamStartCB +LogicalDecodeStreamStopCB +LogicalDecodeStreamAbortCB +LogicalDecodeStreamPrepareCB +LogicalDecodeStreamCommitCB +LogicalDecodeStreamChangeCB +LogicalDecodeStreamMessageCB LogicalDecodeStartupCB LogicalDecodeTruncateCB LogicalDecodingContext |