aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
authorAndres Freund <andres@anarazel.de>2016-04-13 17:38:54 -0700
committerAndres Freund <andres@anarazel.de>2016-04-13 17:38:54 -0700
commitbe65eddd80093a923b091dc60776aa6f966d1f07 (patch)
treee8e58ef17ea0fddbf173b032039c99dd6ac92f09 /src/backend/replication/logical/decode.c
parent80abbeba23d466b6541cf95082a9e1f36704424e (diff)
downloadpostgresql-be65eddd80093a923b091dc60776aa6f966d1f07.tar.gz
postgresql-be65eddd80093a923b091dc60776aa6f966d1f07.zip
Add required database and origin filtering for logical messages.
Logical messages, added in 3fe3511d05, during decoding failed to filter messages emitted in other databases and messages emitted "under" a replication origin the output plugin isn't interested in. Add tests to verify that both types of filtering actually work. While touching message.sql remove hunk obsoleted by d25379e. Bump XLOG_PAGE_MAGIC because xl_logical_message changed and because 3fe3511d05 had omitted doing so. 3fe3511d05 additionally didn't bump catversion, but 7a542700d has done so since. Author: Petr Jelinek Reported-By: Andres Freund Discussion: 20160406142513.wotqy3ba3kanr423@alap3.anarazel.de
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c23
1 files changed, 14 insertions, 9 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 3e80c4a0d86..0cdb0b8a92b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -464,6 +464,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
}
+static inline bool
+FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+ if (ctx->callbacks.filter_by_origin_cb == NULL)
+ return false;
+
+ return filter_by_origin_cb_wrapper(ctx, origin_id);
+}
+
/*
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
*/
@@ -474,6 +483,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
XLogReaderState *r = buf->record;
TransactionId xid = XLogRecGetXid(r);
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ RepOriginId origin_id = XLogRecGetOrigin(r);
Snapshot snapshot;
xl_logical_message *message;
@@ -488,6 +498,10 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
message = (xl_logical_message *) XLogRecGetData(r);
+ if (message->dbId != ctx->slot->data.database ||
+ FilterByOrigin(ctx, origin_id))
+ return;
+
if (message->transactional &&
!SnapBuildProcessChange(builder, xid, buf->origptr))
return;
@@ -504,15 +518,6 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
message->message + message->prefix_size);
}
-static inline bool
-FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
-{
- if (ctx->callbacks.filter_by_origin_cb == NULL)
- return false;
-
- return filter_by_origin_cb_wrapper(ctx, origin_id);
-}
-
/*
* Consolidated commit record handling between the different form of commit
* records.