aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c47
1 files changed, 47 insertions, 0 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6146c5acdb3..f68348dcf45 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx,
static void pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, int nrelations, Relation relations[],
ReorderBufferChange *change);
+static void pgoutput_message(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+ bool transactional, const char *prefix,
+ Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
@@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->begin_cb = pgoutput_begin_txn;
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
+ cb->message_cb = pgoutput_message;
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
@@ -152,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_abort_cb = pgoutput_stream_abort;
cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change;
+ cb->stream_message_cb = pgoutput_message;
cb->stream_truncate_cb = pgoutput_truncate;
}
@@ -162,10 +168,12 @@ parse_output_parameters(List *options, PGOutputData *data)
bool protocol_version_given = false;
bool publication_names_given = false;
bool binary_option_given = false;
+ bool messages_option_given = false;
bool streaming_given = false;
data->binary = false;
data->streaming = false;
+ data->messages = false;
foreach(lc, options)
{
@@ -221,6 +229,16 @@ parse_output_parameters(List *options, PGOutputData *data)
data->binary = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "messages") == 0)
+ {
+ if (messages_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ messages_option_given = true;
+
+ data->messages = defGetBoolean(defel);
+ }
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
@@ -689,6 +707,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextReset(data->context);
}
+static void
+pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
+ const char *message)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ TransactionId xid = InvalidTransactionId;
+
+ if (!data->messages)
+ return;
+
+ /*
+ * Remember the xid for the message in streaming mode. See
+ * pgoutput_change.
+ */
+ if (in_streaming)
+ xid = txn->xid;
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_message(ctx->out,
+ xid,
+ message_lsn,
+ transactional,
+ prefix,
+ sz,
+ message);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* Currently we always forward.
*/