aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/expected/replorigin.out56
-rw-r--r--contrib/test_decoding/sql/replorigin.sql22
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c19
-rw-r--r--src/include/replication/pgoutput.h2
4 files changed, 90 insertions, 9 deletions
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index 49ffaeea2da..c85e1a01b23 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -267,3 +267,59 @@ SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn
(1 row)
+-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE PUBLICATION pub FOR TABLE target_tbl;
+SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+ pg_replication_origin_create
+------------------------------
+ 1
+(1 row)
+
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
+ pg_replication_origin_session_setup
+-------------------------------------
+
+(1 row)
+
+INSERT INTO target_tbl(data) VALUES ('test data');
+-- The replayed change will be filtered.
+SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
+ ?column?
+----------
+ t
+(1 row)
+
+-- The replayed change will be output if the origin value is not specified.
+SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
+ ?column?
+----------
+ t
+(1 row)
+
+-- Clean up
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset
+-------------------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
+ pg_replication_origin_drop
+----------------------------
+
+(1 row)
+
+DROP PUBLICATION pub;
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index db06541f565..e71ee02d050 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -124,3 +124,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL
SELECT pg_replication_origin_session_reset();
SELECT pg_drop_replication_slot('regression_slot_no_lsn');
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
+
+-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
+CREATE PUBLICATION pub FOR TABLE target_tbl;
+SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
+
+INSERT INTO target_tbl(data) VALUES ('test data');
+
+-- The replayed change will be filtered.
+SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
+
+-- The replayed change will be output if the origin value is not specified.
+SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
+
+-- Clean up
+SELECT pg_replication_origin_session_reset();
+SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
+DROP PUBLICATION pub;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 3d2becb45cf..251ba46da5e 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
static bool publications_valid;
static bool in_streaming;
-static bool publish_no_origin;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -381,21 +380,23 @@ parse_output_parameters(List *options, PGOutputData *data)
}
else if (strcmp(defel->defname, "origin") == 0)
{
+ char *origin;
+
if (origin_option_given)
ereport(ERROR,
errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options"));
origin_option_given = true;
- data->origin = defGetString(defel);
- if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
- publish_no_origin = true;
- else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
- publish_no_origin = false;
+ origin = defGetString(defel);
+ if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
+ data->publish_no_origin = true;
+ else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
+ data->publish_no_origin = false;
else
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("unrecognized origin value: \"%s\"", data->origin));
+ errmsg("unrecognized origin value: \"%s\"", origin));
}
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
@@ -1673,7 +1674,9 @@ static bool
pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
{
- if (publish_no_origin && origin_id != InvalidRepOriginId)
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+ if (data->publish_no_origin && origin_id != InvalidRepOriginId)
return true;
return false;
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index b4a8015403b..b3f9a016293 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -29,7 +29,7 @@ typedef struct PGOutputData
char streaming;
bool messages;
bool two_phase;
- char *origin;
+ bool publish_no_origin;
} PGOutputData;
#endif /* PGOUTPUT_H */