diff options
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 38 |
1 files changed, 38 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2e6d3f9203c..c06b2fa2859 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -62,6 +62,9 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); +static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size message_size, const char *message); static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); @@ -178,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->begin = begin_cb_wrapper; ctx->reorder->apply_change = change_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; + ctx->reorder->message = message_cb_wrapper; ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; @@ -702,6 +706,40 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) return ret; } +static void +message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size message_size, const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + if (ctx->callbacks.message_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "message"; + state.report_location = message_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix, + message_size, message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + /* * Set the required catalog xmin horizon for historic snapshots in the current * replication slot. |