diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 28 |
1 files changed, 27 insertions, 1 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 56be1ed554b..33e4343cc22 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -78,6 +78,14 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); * Take every XLogReadRecord()ed record and perform the actions required to * decode it using the output plugin already setup in the logical decoding * context. + * + * NB: Note that every record's xid needs to be processed by reorderbuffer + * (xids contained in the content of records are not relevant for this rule). + * That means that for records which'd otherwise not go through the + * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to + * call ReorderBufferProcessXid for each record type by default, because + * e.g. empty xacts can be handled more efficiently if there's no previous + * state for them. */ void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) @@ -135,6 +143,9 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_BRIN_ID: case RM_COMMIT_TS_ID: case RM_REPLORIGIN_ID: + /* just deal with xid, and done */ + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), + buf.origptr); break; case RM_NEXT_ID: elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); @@ -150,6 +161,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SnapBuild *builder = ctx->snapshot_builder; uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), + buf->origptr); + switch (info) { /* this is also used in END_OF_RECOVERY checkpoints */ @@ -191,7 +205,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogReaderState *r = buf->record; uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK; - /* no point in doing anything yet, data could not be decoded anyway */ + /* + * No point in doing anything yet, data could not be decoded anyway. It's + * ok not to call ReorderBufferProcessXid() in that case, except in the + * assignment case there'll not be any later records with the same xid; + * and in the assignment case we'll not decode those xacts. + */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; @@ -260,6 +279,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * transactions in the changestream allowing for a kind of * distributed 2PC. */ + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); break; default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); @@ -276,6 +296,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogReaderState *r = buf->record; uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + switch (info) { case XLOG_RUNNING_XACTS: @@ -313,6 +335,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); + /* no point in doing anything yet */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; @@ -366,6 +390,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); + /* no point in doing anything yet */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; |