aboutsummaryrefslogtreecommitdiff
path: root/contrib/test_decoding/test_decoding.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r--contrib/test_decoding/test_decoding.c176
1 files changed, 176 insertions, 0 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 93c948856e7..dbef52a3af4 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -62,6 +62,28 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_stream_start(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void pg_decode_stream_change(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+static void pg_decode_stream_message(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+ bool transactional, const char *prefix,
+ Size sz, const char *message);
+static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations, Relation relations[],
+ ReorderBufferChange *change);
void
_PG_init(void)
@@ -83,6 +105,13 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
+ cb->stream_start_cb = pg_decode_stream_start;
+ cb->stream_stop_cb = pg_decode_stream_stop;
+ cb->stream_abort_cb = pg_decode_stream_abort;
+ cb->stream_commit_cb = pg_decode_stream_commit;
+ cb->stream_change_cb = pg_decode_stream_change;
+ cb->stream_message_cb = pg_decode_stream_message;
+ cb->stream_truncate_cb = pg_decode_stream_truncate;
}
@@ -540,3 +569,150 @@ pg_decode_message(LogicalDecodingContext *ctx,
appendBinaryStringInfo(ctx->out, message, sz);
OutputPluginWrite(ctx, true);
}
+
+/*
+ * We never try to stream any empty xact so we don't need any special handling
+ * for skip_empty_xacts in streaming mode APIs.
+ */
+static void
+pg_decode_stream_start(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
+ else
+ appendStringInfo(ctx->out, "opening a streamed block for transaction");
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * We never try to stream any empty xact so we don't need any special handling
+ * for skip_empty_xacts in streaming mode APIs.
+ */
+static void
+pg_decode_stream_stop(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
+ else
+ appendStringInfo(ctx->out, "closing a streamed block for transaction");
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * We never try to stream any empty xact so we don't need any special handling
+ * for skip_empty_xacts in streaming mode APIs.
+ */
+static void
+pg_decode_stream_abort(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
+ else
+ appendStringInfo(ctx->out, "aborting streamed (sub)transaction");
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * We never try to stream any empty xact so we don't need any special handling
+ * for skip_empty_xacts in streaming mode APIs.
+ */
+static void
+pg_decode_stream_commit(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
+ else
+ appendStringInfo(ctx->out, "committing streamed transaction");
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * In streaming mode, we don't display the changes as the transaction can abort
+ * at a later point in time. We don't want users to see the changes until the
+ * transaction is committed.
+ */
+static void
+pg_decode_stream_change(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
+ else
+ appendStringInfo(ctx->out, "streaming change for transaction");
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * In streaming mode, we don't display the contents for transactional messages
+ * as the transaction can abort at a later point in time. We don't want users to
+ * see the message contents until the transaction is committed.
+ */
+static void
+pg_decode_stream_message(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
+ const char *prefix, Size sz, const char *message)
+{
+ OutputPluginPrepareWrite(ctx, true);
+
+ if (transactional)
+ {
+ appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
+ transactional, prefix, sz);
+ }
+ else
+ {
+ appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
+ transactional, prefix, sz);
+ appendBinaryStringInfo(ctx->out, message, sz);
+ }
+
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * In streaming mode, we don't display the detailed information of Truncate.
+ * See pg_decode_stream_change.
+ */
+static void
+pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[],
+ ReorderBufferChange *change)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
+ else
+ appendStringInfo(ctx->out, "streaming truncate for transaction");
+ OutputPluginWrite(ctx, true);
+}