diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/logical.c | 351 | ||||
-rw-r--r-- | src/include/replication/logical.h | 5 | ||||
-rw-r--r-- | src/include/replication/output_plugin.h | 69 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 59 |
4 files changed, 484 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 61902be3b0e..05d24b93da0 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -65,6 +65,23 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +/* streaming callbacks */ +static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr first_lsn); +static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr last_lsn); +static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); +static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size message_size, const char *message); +static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change); + static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); /* @@ -189,6 +206,39 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + /* + * To support streaming, we require start/stop/abort/commit/change + * callbacks. The message and truncate callbacks are optional, similar to + * regular output plugins. We however enable streaming when at least one + * of the methods is enabled so that we can easily identify missing + * methods. + * + * We decide it here, but only check it later in the wrappers. + */ + ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) || + (ctx->callbacks.stream_stop_cb != NULL) || + (ctx->callbacks.stream_abort_cb != NULL) || + (ctx->callbacks.stream_commit_cb != NULL) || + (ctx->callbacks.stream_change_cb != NULL) || + (ctx->callbacks.stream_message_cb != NULL) || + (ctx->callbacks.stream_truncate_cb != NULL); + + /* + * streaming callbacks + * + * stream_message and stream_truncate callbacks are optional, so we do not + * fail with ERROR when missing, but the wrappers simply do nothing. We + * must set the ReorderBuffer callbacks to something, otherwise the calls + * from there will crash (we don't want to move the checks there). + */ + ctx->reorder->stream_start = stream_start_cb_wrapper; + ctx->reorder->stream_stop = stream_stop_cb_wrapper; + ctx->reorder->stream_abort = stream_abort_cb_wrapper; + ctx->reorder->stream_commit = stream_commit_cb_wrapper; + ctx->reorder->stream_change = stream_change_cb_wrapper; + ctx->reorder->stream_message = stream_message_cb_wrapper; + ctx->reorder->stream_truncate = stream_truncate_cb_wrapper; + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -866,6 +916,307 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr first_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_start"; + state.report_location = first_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this message's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = first_lsn; + + /* in streaming mode, stream_start_cb is required */ + if (ctx->callbacks.stream_start_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming requires a stream_start_cb callback"))); + + ctx->callbacks.stream_start_cb(ctx, txn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr last_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_stop"; + state.report_location = last_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this message's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = last_lsn; + + /* in streaming mode, stream_stop_cb is required */ + if (ctx->callbacks.stream_stop_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming requires a stream_stop_cb callback"))); + + ctx->callbacks.stream_stop_cb(ctx, txn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_abort"; + state.report_location = abort_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = abort_lsn; + + /* in streaming mode, stream_abort_cb is required */ + if (ctx->callbacks.stream_abort_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming requires a stream_abort_cb callback"))); + + ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_commit"; + state.report_location = txn->final_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; + + /* in streaming mode, stream_abort_cb is required */ + if (ctx->callbacks.stream_commit_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming requires a stream_commit_cb callback"))); + + ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_change"; + state.report_location = change->lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = change->lsn; + + /* in streaming mode, stream_change_cb is required */ + if (ctx->callbacks.stream_change_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming requires a stream_change_cb callback"))); + + ctx->callbacks.stream_change_cb(ctx, txn, relation, change); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size message_size, const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* this callback is optional */ + if (ctx->callbacks.stream_message_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_message"; + state.report_location = message_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix, + message_size, message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* this callback is optional */ + if (!ctx->callbacks.stream_truncate_cb) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_truncate"; + state.report_location = change->lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = change->lsn; + + ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + /* * Set the required catalog xmin horizon for historic snapshots in the current * replication slot. diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index c2f2475e5d3..deef31825d6 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -80,6 +80,11 @@ typedef struct LogicalDecodingContext void *output_writer_private; /* + * Does the output plugin support streaming, and is it enabled? + */ + bool streaming; + + /* * State for writing output. */ bool accept_writes; diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 3dd9236c576..b78c796450a 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -100,6 +100,67 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx); /* + * Called when starting to stream a block of changes from in-progress + * transaction (may be called repeatedly, if it's streamed in multiple + * chunks). + */ +typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Called when stopping to stream a block of changes from in-progress + * transaction to a remote node (may be called repeatedly, if it's streamed + * in multiple chunks). + */ +typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Called to discard changes streamed to remote node from in-progress + * transaction. + */ +typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* + * Called to apply changes streamed to remote node from in-progress + * transaction. + */ +typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Callback for streaming individual changes from in-progress transactions. + */ +typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* + * Callback for streaming generic logical decoding messages from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message); + +/* + * Callback for streaming truncates from in-progress transactions. + */ +typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + +/* * Output plugin callbacks */ typedef struct OutputPluginCallbacks @@ -112,6 +173,14 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; + /* streaming of changes */ + LogicalDecodeStreamStartCB stream_start_cb; + LogicalDecodeStreamStopCB stream_stop_cb; + LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamCommitCB stream_commit_cb; + LogicalDecodeStreamChangeCB stream_change_cb; + LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; /* Functions in replication/logical/logical.c */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1055e99e2e1..42bc8176487 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -348,6 +348,54 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* start streaming transaction callback signature */ +typedef void (*ReorderBufferStreamStartCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr first_lsn); + +/* stop streaming transaction callback signature */ +typedef void (*ReorderBufferStreamStopCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr last_lsn); + +/* discard streamed transaction callback signature */ +typedef void (*ReorderBufferStreamAbortCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* commit streamed transaction callback signature */ +typedef void (*ReorderBufferStreamCommitCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* stream change callback signature */ +typedef void (*ReorderBufferStreamChangeCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* stream message callback signature */ +typedef void (*ReorderBufferStreamMessageCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, Size sz, + const char *message); + +/* stream truncate callback signature */ +typedef void (*ReorderBufferStreamTruncateCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + struct ReorderBuffer { /* @@ -387,6 +435,17 @@ struct ReorderBuffer ReorderBufferMessageCB message; /* + * Callbacks to be called when streaming a transaction. + */ + ReorderBufferStreamStartCB stream_start; + ReorderBufferStreamStopCB stream_stop; + ReorderBufferStreamAbortCB stream_abort; + ReorderBufferStreamCommitCB stream_commit; + ReorderBufferStreamChangeCB stream_change; + ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamTruncateCB stream_truncate; + + /* * Pointer that will be passed untouched to the callbacks. */ void *private_data; |