diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 48 |
1 files changed, 39 insertions, 9 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 24b712aa667..1237118e84f 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -600,12 +600,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); - /* - * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding messages. - */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) + /* If we don't have snapshot, there is no point in decoding messages */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; message = (xl_logical_message *) XLogRecGetData(r); @@ -623,6 +619,26 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* + * We also skip decoding in fast_forward mode. This check must be last + * because we don't want to set the processing_required flag unless we + * have a decodable message. + */ + if (ctx->fast_forward) + { + /* + * We need to set processing_required flag to notify the message's + * existence to the caller. Usually, the flag is set when either the + * COMMIT or ABORT records are decoded, but this must be turned on + * here because the non-transactional logical message is decoded + * without waiting for these records. + */ + if (!message->transactional) + ctx->processing_required = true; + + return; + } + + /* * If this is a non-transactional change, get the snapshot we're expected * to use. We only get here when the snapshot is consistent, and the * change is not meant to be skipped. @@ -1286,7 +1302,21 @@ static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, Oid txn_dbid, RepOriginId origin_id) { - return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || - (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || - ctx->fast_forward || FilterByOrigin(ctx, origin_id)); + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || + FilterByOrigin(ctx, origin_id)) + return true; + + /* + * We also skip decoding in fast_forward mode. In passing set the + * processing_required flag to indicate that if it were not for + * fast_forward mode, processing would have been required. + */ + if (ctx->fast_forward) + { + ctx->processing_required = true; + return true; + } + + return false; } |