aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2020-07-28 08:06:44 +0530
committerAmit Kapila <akapila@postgresql.org>2020-07-28 08:09:44 +0530
commit45fdc9738b36d1068d3ad8fdb06436d6fd14436b (patch)
tree1168c9368d1d3c0e7daa47f82c5f8531ed0197c5 /src
parent13838740f61fc455aa4196d257efc0b761daba1f (diff)
downloadpostgresql-45fdc9738b36d1068d3ad8fdb06436d6fd14436b.tar.gz
postgresql-45fdc9738b36d1068d3ad8fdb06436d6fd14436b.zip
Extend the logical decoding output plugin API with stream methods.
This adds seven methods to the output plugin API, adding support for streaming changes of large in-progress transactions. * stream_start * stream_stop * stream_abort * stream_commit * stream_change * stream_message * stream_truncate Most of this is a simple extension of the existing methods, with the semantic difference that the transaction (or subtransaction) is incomplete and may be aborted later (which is something the regular API does not really need to deal with). This also extends the 'test_decoding' plugin, implementing these new stream methods. The stream_start/start_stop are used to demarcate a chunk of changes streamed for a particular toplevel transaction. This commit simply adds these new APIs and the upcoming patch to "allow the streaming mode in ReorderBuffer" will use these APIs. Author: Tomas Vondra, Dilip Kumar, Amit Kapila Reviewed-by: Amit Kapila Tested-by: Neha Sharma and Mahendra Singh Thalor Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/logical.c351
-rw-r--r--src/include/replication/logical.h5
-rw-r--r--src/include/replication/output_plugin.h69
-rw-r--r--src/include/replication/reorderbuffer.h59
4 files changed, 484 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 61902be3b0e..05d24b93da0 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -65,6 +65,23 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+/* streaming callbacks */
+static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr first_lsn);
+static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr last_lsn);
+static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ Relation relation, ReorderBufferChange *change);
+static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size message_size, const char *message);
+static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[], ReorderBufferChange *change);
+
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
/*
@@ -189,6 +206,39 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
+ /*
+ * To support streaming, we require start/stop/abort/commit/change
+ * callbacks. The message and truncate callbacks are optional, similar to
+ * regular output plugins. We however enable streaming when at least one
+ * of the methods is enabled so that we can easily identify missing
+ * methods.
+ *
+ * We decide it here, but only check it later in the wrappers.
+ */
+ ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
+ (ctx->callbacks.stream_stop_cb != NULL) ||
+ (ctx->callbacks.stream_abort_cb != NULL) ||
+ (ctx->callbacks.stream_commit_cb != NULL) ||
+ (ctx->callbacks.stream_change_cb != NULL) ||
+ (ctx->callbacks.stream_message_cb != NULL) ||
+ (ctx->callbacks.stream_truncate_cb != NULL);
+
+ /*
+ * streaming callbacks
+ *
+ * stream_message and stream_truncate callbacks are optional, so we do not
+ * fail with ERROR when missing, but the wrappers simply do nothing. We
+ * must set the ReorderBuffer callbacks to something, otherwise the calls
+ * from there will crash (we don't want to move the checks there).
+ */
+ ctx->reorder->stream_start = stream_start_cb_wrapper;
+ ctx->reorder->stream_stop = stream_stop_cb_wrapper;
+ ctx->reorder->stream_abort = stream_abort_cb_wrapper;
+ ctx->reorder->stream_commit = stream_commit_cb_wrapper;
+ ctx->reorder->stream_change = stream_change_cb_wrapper;
+ ctx->reorder->stream_message = stream_message_cb_wrapper;
+ ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
+
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
@@ -866,6 +916,307 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr first_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_start";
+ state.report_location = first_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * report this message's lsn so replies from clients can give an up2date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = first_lsn;
+
+ /* in streaming mode, stream_start_cb is required */
+ if (ctx->callbacks.stream_start_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_start_cb callback")));
+
+ ctx->callbacks.stream_start_cb(ctx, txn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr last_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_stop";
+ state.report_location = last_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * report this message's lsn so replies from clients can give an up2date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = last_lsn;
+
+ /* in streaming mode, stream_stop_cb is required */
+ if (ctx->callbacks.stream_stop_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_stop_cb callback")));
+
+ ctx->callbacks.stream_stop_cb(ctx, txn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_abort";
+ state.report_location = abort_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = abort_lsn;
+
+ /* in streaming mode, stream_abort_cb is required */
+ if (ctx->callbacks.stream_abort_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_abort_cb callback")));
+
+ ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_commit";
+ state.report_location = txn->final_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn;
+
+ /* in streaming mode, stream_abort_cb is required */
+ if (ctx->callbacks.stream_commit_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_commit_cb callback")));
+
+ ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ Relation relation, ReorderBufferChange *change)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_change";
+ state.report_location = change->lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * report this change's lsn so replies from clients can give an up2date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = change->lsn;
+
+ /* in streaming mode, stream_change_cb is required */
+ if (ctx->callbacks.stream_change_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_change_cb callback")));
+
+ ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size message_size, const char *message)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* this callback is optional */
+ if (ctx->callbacks.stream_message_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_message";
+ state.report_location = message_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = message_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
+ message_size, message);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[],
+ ReorderBufferChange *change)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* this callback is optional */
+ if (!ctx->callbacks.stream_truncate_cb)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_truncate";
+ state.report_location = change->lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * report this change's lsn so replies from clients can give an up2date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = change->lsn;
+
+ ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
/*
* Set the required catalog xmin horizon for historic snapshots in the current
* replication slot.
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2f2475e5d3..deef31825d6 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -80,6 +80,11 @@ typedef struct LogicalDecodingContext
void *output_writer_private;
/*
+ * Does the output plugin support streaming, and is it enabled?
+ */
+ bool streaming;
+
+ /*
* State for writing output.
*/
bool accept_writes;
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 3dd9236c576..b78c796450a 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -100,6 +100,67 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
/*
+ * Called when starting to stream a block of changes from in-progress
+ * transaction (may be called repeatedly, if it's streamed in multiple
+ * chunks).
+ */
+typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called when stopping to stream a block of changes from in-progress
+ * transaction to a remote node (may be called repeatedly, if it's streamed
+ * in multiple chunks).
+ */
+typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called to discard changes streamed to remote node from in-progress
+ * transaction.
+ */
+typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+/*
+ * Called to apply changes streamed to remote node from in-progress
+ * transaction.
+ */
+typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Callback for streaming individual changes from in-progress transactions.
+ */
+typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/*
+ * Callback for streaming generic logical decoding messages from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+
+/*
+ * Callback for streaming truncates from in-progress transactions.
+ */
+typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
+/*
* Output plugin callbacks
*/
typedef struct OutputPluginCallbacks
@@ -112,6 +173,14 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
+ /* streaming of changes */
+ LogicalDecodeStreamStartCB stream_start_cb;
+ LogicalDecodeStreamStopCB stream_stop_cb;
+ LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamCommitCB stream_commit_cb;
+ LogicalDecodeStreamChangeCB stream_change_cb;
+ LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
/* Functions in replication/logical/logical.c */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1055e99e2e1..42bc8176487 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -348,6 +348,54 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
const char *prefix, Size sz,
const char *message);
+/* start streaming transaction callback signature */
+typedef void (*ReorderBufferStreamStartCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr first_lsn);
+
+/* stop streaming transaction callback signature */
+typedef void (*ReorderBufferStreamStopCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr last_lsn);
+
+/* discard streamed transaction callback signature */
+typedef void (*ReorderBufferStreamAbortCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+/* commit streamed transaction callback signature */
+typedef void (*ReorderBufferStreamCommitCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* stream change callback signature */
+typedef void (*ReorderBufferStreamChangeCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/* stream message callback signature */
+typedef void (*ReorderBufferStreamMessageCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix, Size sz,
+ const char *message);
+
+/* stream truncate callback signature */
+typedef void (*ReorderBufferStreamTruncateCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
struct ReorderBuffer
{
/*
@@ -387,6 +435,17 @@ struct ReorderBuffer
ReorderBufferMessageCB message;
/*
+ * Callbacks to be called when streaming a transaction.
+ */
+ ReorderBufferStreamStartCB stream_start;
+ ReorderBufferStreamStopCB stream_stop;
+ ReorderBufferStreamAbortCB stream_abort;
+ ReorderBufferStreamCommitCB stream_commit;
+ ReorderBufferStreamChangeCB stream_change;
+ ReorderBufferStreamMessageCB stream_message;
+ ReorderBufferStreamTruncateCB stream_truncate;
+
+ /*
* Pointer that will be passed untouched to the callbacks.
*/
void *private_data;