diff options
author | Andres Freund <andres@anarazel.de> | 2016-03-05 18:02:20 -0800 |
---|---|---|
committer | Andres Freund <andres@anarazel.de> | 2016-03-05 18:02:20 -0800 |
commit | d9e903f3cbbd00c7ba7d4974e6852c3d2cbf4447 (patch) | |
tree | 3eaa1e855ad224b0f5386a1e7f5ec722bcd42623 /src/backend/replication/logical/decode.c | |
parent | dc7d70ea05deca9dfc6a25043d406b57cc8f6c30 (diff) | |
download | postgresql-d9e903f3cbbd00c7ba7d4974e6852c3d2cbf4447.tar.gz postgresql-d9e903f3cbbd00c7ba7d4974e6852c3d2cbf4447.zip |
logical decoding: Tell reorderbuffer about all xids.
Logical decoding's reorderbuffer keeps transactions in an LSN ordered
list for efficiency. To make that's efficiently possible upper-level
xids are forced to be logged before nested subtransaction xids. That
only works though if these records are all looked at: Unfortunately we
didn't do so for e.g. row level locks, which are otherwise uninteresting
for logical decoding.
This could lead to errors like:
"ERROR: subxact logged without previous toplevel record".
It's not sufficient to just look at row locking records, the xid could
appear first due to a lot of other types of records (which will trigger
the transaction to be marked logged with MarkCurrentTransactionIdLoggedIfAny).
So invent infrastructure to tell reorderbuffer about xids seen, when
they'd otherwise not pass through reorderbuffer.c.
Reported-By: Jarred Ward
Bug: #13844
Discussion: 20160105033249.1087.66040@wrigleys.postgresql.org
Backpatch: 9.4, where logical decoding was added
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 28 |
1 files changed, 27 insertions, 1 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 56be1ed554b..33e4343cc22 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -78,6 +78,14 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); * Take every XLogReadRecord()ed record and perform the actions required to * decode it using the output plugin already setup in the logical decoding * context. + * + * NB: Note that every record's xid needs to be processed by reorderbuffer + * (xids contained in the content of records are not relevant for this rule). + * That means that for records which'd otherwise not go through the + * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to + * call ReorderBufferProcessXid for each record type by default, because + * e.g. empty xacts can be handled more efficiently if there's no previous + * state for them. */ void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) @@ -135,6 +143,9 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_BRIN_ID: case RM_COMMIT_TS_ID: case RM_REPLORIGIN_ID: + /* just deal with xid, and done */ + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), + buf.origptr); break; case RM_NEXT_ID: elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); @@ -150,6 +161,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SnapBuild *builder = ctx->snapshot_builder; uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), + buf->origptr); + switch (info) { /* this is also used in END_OF_RECOVERY checkpoints */ @@ -191,7 +205,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogReaderState *r = buf->record; uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK; - /* no point in doing anything yet, data could not be decoded anyway */ + /* + * No point in doing anything yet, data could not be decoded anyway. It's + * ok not to call ReorderBufferProcessXid() in that case, except in the + * assignment case there'll not be any later records with the same xid; + * and in the assignment case we'll not decode those xacts. + */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; @@ -260,6 +279,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * transactions in the changestream allowing for a kind of * distributed 2PC. */ + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); break; default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); @@ -276,6 +296,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogReaderState *r = buf->record; uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + switch (info) { case XLOG_RUNNING_XACTS: @@ -313,6 +335,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); + /* no point in doing anything yet */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; @@ -366,6 +390,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); + /* no point in doing anything yet */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; |