diff options
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 93c948856e7..dbef52a3af4 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -62,6 +62,28 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_stream_start(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pg_decode_stream_stop(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pg_decode_stream_abort(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +static void pg_decode_stream_commit(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void pg_decode_stream_change(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); +static void pg_decode_stream_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); +static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change); void _PG_init(void) @@ -83,6 +105,13 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + cb->stream_start_cb = pg_decode_stream_start; + cb->stream_stop_cb = pg_decode_stream_stop; + cb->stream_abort_cb = pg_decode_stream_abort; + cb->stream_commit_cb = pg_decode_stream_commit; + cb->stream_change_cb = pg_decode_stream_change; + cb->stream_message_cb = pg_decode_stream_message; + cb->stream_truncate_cb = pg_decode_stream_truncate; } @@ -540,3 +569,150 @@ pg_decode_message(LogicalDecodingContext *ctx, appendBinaryStringInfo(ctx->out, message, sz); OutputPluginWrite(ctx, true); } + +/* + * We never try to stream any empty xact so we don't need any special handling + * for skip_empty_xacts in streaming mode APIs. + */ +static void +pg_decode_stream_start(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "opening a streamed block for transaction"); + OutputPluginWrite(ctx, true); +} + +/* + * We never try to stream any empty xact so we don't need any special handling + * for skip_empty_xacts in streaming mode APIs. + */ +static void +pg_decode_stream_stop(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "closing a streamed block for transaction"); + OutputPluginWrite(ctx, true); +} + +/* + * We never try to stream any empty xact so we don't need any special handling + * for skip_empty_xacts in streaming mode APIs. + */ +static void +pg_decode_stream_abort(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "aborting streamed (sub)transaction"); + OutputPluginWrite(ctx, true); +} + +/* + * We never try to stream any empty xact so we don't need any special handling + * for skip_empty_xacts in streaming mode APIs. + */ +static void +pg_decode_stream_commit(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + + if (data->include_xids) + appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "committing streamed transaction"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* + * In streaming mode, we don't display the changes as the transaction can abort + * at a later point in time. We don't want users to see the changes until the + * transaction is committed. + */ +static void +pg_decode_stream_change(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "streaming change for transaction"); + OutputPluginWrite(ctx, true); +} + +/* + * In streaming mode, we don't display the contents for transactional messages + * as the transaction can abort at a later point in time. We don't want users to + * see the message contents until the transaction is committed. + */ +static void +pg_decode_stream_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + const char *prefix, Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + + if (transactional) + { + appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu", + transactional, prefix, sz); + } + else + { + appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:", + transactional, prefix, sz); + appendBinaryStringInfo(ctx->out, message, sz); + } + + OutputPluginWrite(ctx, true); +} + +/* + * In streaming mode, we don't display the detailed information of Truncate. + * See pg_decode_stream_change. + */ +static void +pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "streaming truncate for transaction"); + OutputPluginWrite(ctx, true); +} |