aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
authorSimon Riggs <simon@2ndQuadrant.com>2016-04-06 10:05:41 +0100
committerSimon Riggs <simon@2ndQuadrant.com>2016-04-06 10:05:41 +0100
commit3fe3511d05127cc024b221040db2eeb352e7d716 (patch)
treeb17a084bec318a70a1c0fcd755596b771871bce7 /src/backend/replication/logical/logical.c
parent989be0810dffd08b54e1caecec0677608211c339 (diff)
downloadpostgresql-3fe3511d05127cc024b221040db2eeb352e7d716.tar.gz
postgresql-3fe3511d05127cc024b221040db2eeb352e7d716.zip
Generic Messages for Logical Decoding
API and mechanism to allow generic messages to be inserted into WAL that are intended to be read by logical decoding plugins. This commit adds an optional new callback to the logical decoding API. Messages are either text or bytea. Messages can be transactional, or not, and are identified by a prefix to allow multiple concurrent decoding plugins. (Not to be confused with Generic WAL records, which are intended to allow crash recovery of extensible objects.) Author: Petr Jelinek and Andres Freund Reviewers: Artur Zakirov, Tomas Vondra, Simon Riggs Discussion: 5685F999.6010202@2ndquadrant.com
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c38
1 files changed, 38 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2e6d3f9203c..c06b2fa2859 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -62,6 +62,9 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
+static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size message_size, const char *message);
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
@@ -178,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->begin = begin_cb_wrapper;
ctx->reorder->apply_change = change_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
+ ctx->reorder->message = message_cb_wrapper;
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
@@ -702,6 +706,40 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
return ret;
}
+static void
+message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size message_size, const char *message)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ if (ctx->callbacks.message_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "message";
+ state.report_location = message_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = message_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
+ message_size, message);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
/*
* Set the required catalog xmin horizon for historic snapshots in the current
* replication slot.