diff options
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 28 |
1 files changed, 28 insertions, 0 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 963d5df9dae..bca03ee21b4 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -21,6 +21,7 @@ #include "replication/output_plugin.h" #include "replication/logical.h" +#include "replication/origin.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -43,6 +44,7 @@ typedef struct bool include_timestamp; bool skip_empty_xacts; bool xact_wrote_changes; + bool only_local; } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -59,6 +61,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); +static bool pg_decode_filter(LogicalDecodingContext *ctx, + RepOriginId origin_id); void _PG_init(void) @@ -76,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pg_decode_begin_txn; cb->change_cb = pg_decode_change; cb->commit_cb = pg_decode_commit_txn; + cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; } @@ -97,6 +102,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_xids = true; data->include_timestamp = false; data->skip_empty_xacts = false; + data->only_local = false; ctx->output_plugin_private = data; @@ -155,6 +161,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "only-local") == 0) + { + + if (elem->arg == NULL) + data->only_local = true; + else if (!parse_bool(strVal(elem->arg), &data->only_local)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -223,6 +240,17 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +static bool +pg_decode_filter(LogicalDecodingContext *ctx, + RepOriginId origin_id) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->only_local && origin_id != InvalidRepOriginId) + return true; + return false; +} + /* * Print literal `outputstr' already represented as string of type `typid' * into stringbuf `s'. |