aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c351
1 files changed, 351 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 61902be3b0e..05d24b93da0 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -65,6 +65,23 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+/* streaming callbacks */
+static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr first_lsn);
+static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr last_lsn);
+static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ Relation relation, ReorderBufferChange *change);
+static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size message_size, const char *message);
+static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[], ReorderBufferChange *change);
+
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
/*
@@ -189,6 +206,39 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
+ /*
+ * To support streaming, we require start/stop/abort/commit/change
+ * callbacks. The message and truncate callbacks are optional, similar to
+ * regular output plugins. We however enable streaming when at least one
+ * of the methods is enabled so that we can easily identify missing
+ * methods.
+ *
+ * We decide it here, but only check it later in the wrappers.
+ */
+ ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
+ (ctx->callbacks.stream_stop_cb != NULL) ||
+ (ctx->callbacks.stream_abort_cb != NULL) ||
+ (ctx->callbacks.stream_commit_cb != NULL) ||
+ (ctx->callbacks.stream_change_cb != NULL) ||
+ (ctx->callbacks.stream_message_cb != NULL) ||
+ (ctx->callbacks.stream_truncate_cb != NULL);
+
+ /*
+ * streaming callbacks
+ *
+ * stream_message and stream_truncate callbacks are optional, so we do not
+ * fail with ERROR when missing, but the wrappers simply do nothing. We
+ * must set the ReorderBuffer callbacks to something, otherwise the calls
+ * from there will crash (we don't want to move the checks there).
+ */
+ ctx->reorder->stream_start = stream_start_cb_wrapper;
+ ctx->reorder->stream_stop = stream_stop_cb_wrapper;
+ ctx->reorder->stream_abort = stream_abort_cb_wrapper;
+ ctx->reorder->stream_commit = stream_commit_cb_wrapper;
+ ctx->reorder->stream_change = stream_change_cb_wrapper;
+ ctx->reorder->stream_message = stream_message_cb_wrapper;
+ ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
+
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
@@ -866,6 +916,307 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr first_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_start";
+ state.report_location = first_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * report this message's lsn so replies from clients can give an up2date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = first_lsn;
+
+ /* in streaming mode, stream_start_cb is required */
+ if (ctx->callbacks.stream_start_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_start_cb callback")));
+
+ ctx->callbacks.stream_start_cb(ctx, txn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr last_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_stop";
+ state.report_location = last_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * report this message's lsn so replies from clients can give an up2date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = last_lsn;
+
+ /* in streaming mode, stream_stop_cb is required */
+ if (ctx->callbacks.stream_stop_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_stop_cb callback")));
+
+ ctx->callbacks.stream_stop_cb(ctx, txn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_abort";
+ state.report_location = abort_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = abort_lsn;
+
+ /* in streaming mode, stream_abort_cb is required */
+ if (ctx->callbacks.stream_abort_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_abort_cb callback")));
+
+ ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_commit";
+ state.report_location = txn->final_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn;
+
+ /* in streaming mode, stream_abort_cb is required */
+ if (ctx->callbacks.stream_commit_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_commit_cb callback")));
+
+ ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ Relation relation, ReorderBufferChange *change)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_change";
+ state.report_location = change->lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * report this change's lsn so replies from clients can give an up2date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = change->lsn;
+
+ /* in streaming mode, stream_change_cb is required */
+ if (ctx->callbacks.stream_change_cb == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical streaming requires a stream_change_cb callback")));
+
+ ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size message_size, const char *message)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* this callback is optional */
+ if (ctx->callbacks.stream_message_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_message";
+ state.report_location = message_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = message_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
+ message_size, message);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[],
+ ReorderBufferChange *change)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* this callback is optional */
+ if (!ctx->callbacks.stream_truncate_cb)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_truncate";
+ state.report_location = change->lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * report this change's lsn so replies from clients can give an up2date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = change->lsn;
+
+ ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
/*
* Set the required catalog xmin horizon for historic snapshots in the current
* replication slot.