diff options
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 6146c5acdb3..f68348dcf45 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx, static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +static void pgoutput_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, @@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; + cb->message_cb = pgoutput_message; cb->commit_cb = pgoutput_commit_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -152,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_abort_cb = pgoutput_stream_abort; cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; + cb->stream_message_cb = pgoutput_message; cb->stream_truncate_cb = pgoutput_truncate; } @@ -162,10 +168,12 @@ parse_output_parameters(List *options, PGOutputData *data) bool protocol_version_given = false; bool publication_names_given = false; bool binary_option_given = false; + bool messages_option_given = false; bool streaming_given = false; data->binary = false; data->streaming = false; + data->messages = false; foreach(lc, options) { @@ -221,6 +229,16 @@ parse_output_parameters(List *options, PGOutputData *data) data->binary = defGetBoolean(defel); } + else if (strcmp(defel->defname, "messages") == 0) + { + if (messages_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + messages_option_given = true; + + data->messages = defGetBoolean(defel); + } else if (strcmp(defel->defname, "streaming") == 0) { if (streaming_given) @@ -689,6 +707,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContextReset(data->context); } +static void +pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, + const char *message) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + TransactionId xid = InvalidTransactionId; + + if (!data->messages) + return; + + /* + * Remember the xid for the message in streaming mode. See + * pgoutput_change. + */ + if (in_streaming) + xid = txn->xid; + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_message(ctx->out, + xid, + message_lsn, + transactional, + prefix, + sz, + message); + OutputPluginWrite(ctx, true); +} + /* * Currently we always forward. */ |