aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c48
1 files changed, 39 insertions, 9 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 24b712aa667..1237118e84f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -600,12 +600,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
- /*
- * If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding messages.
- */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ /* If we don't have snapshot, there is no point in decoding messages */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
message = (xl_logical_message *) XLogRecGetData(r);
@@ -623,6 +619,26 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
return;
/*
+ * We also skip decoding in fast_forward mode. This check must be last
+ * because we don't want to set the processing_required flag unless we
+ * have a decodable message.
+ */
+ if (ctx->fast_forward)
+ {
+ /*
+ * We need to set processing_required flag to notify the message's
+ * existence to the caller. Usually, the flag is set when either the
+ * COMMIT or ABORT records are decoded, but this must be turned on
+ * here because the non-transactional logical message is decoded
+ * without waiting for these records.
+ */
+ if (!message->transactional)
+ ctx->processing_required = true;
+
+ return;
+ }
+
+ /*
* If this is a non-transactional change, get the snapshot we're expected
* to use. We only get here when the snapshot is consistent, and the
* change is not meant to be skipped.
@@ -1286,7 +1302,21 @@ static bool
DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
Oid txn_dbid, RepOriginId origin_id)
{
- return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
- ctx->fast_forward || FilterByOrigin(ctx, origin_id));
+ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+ (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
+ FilterByOrigin(ctx, origin_id))
+ return true;
+
+ /*
+ * We also skip decoding in fast_forward mode. In passing set the
+ * processing_required flag to indicate that if it were not for
+ * fast_forward mode, processing would have been required.
+ */
+ if (ctx->fast_forward)
+ {
+ ctx->processing_required = true;
+ return true;
+ }
+
+ return false;
}