diff options
author | Amit Kapila <akapila@postgresql.org> | 2020-07-23 08:19:07 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2020-07-23 08:34:48 +0530 |
commit | c55040ccd017962b7b8d3fbcdc184aa90c722a21 (patch) | |
tree | 8d15aa2f834cf0d0ac59391ee6f202577eeaf058 /src/backend/replication/logical/decode.c | |
parent | 38f60f174e3279069b5547b5f4015eb4a8492037 (diff) | |
download | postgresql-c55040ccd017962b7b8d3fbcdc184aa90c722a21.tar.gz postgresql-c55040ccd017962b7b8d3fbcdc184aa90c722a21.zip |
WAL Log invalidations at command end with wal_level=logical.
When wal_level=logical, write invalidations at command end into WAL so
that decoding can use this information.
This patch is required to allow the streaming of in-progress transactions
in logical decoding. The actual work to allow streaming will be committed
as a separate patch.
We still add the invalidations to the cache and write them to WAL at
commit time in RecordTransactionCommit(). This uses the existing
XLOG_INVALIDATIONS xlog record type, from the RM_STANDBY_ID resource
manager (see LogStandbyInvalidations for details).
So existing code relying on those invalidations (e.g. redo) does not need
to be changed.
The invalidations written at command end uses a new xlog record type
XLOG_XACT_INVALIDATIONS, from RM_XACT_ID resource manager. See
LogLogicalInvalidations for details.
These new xlog records are ignored by existing redo procedures, which
still rely on the invalidations written to commit records.
The invalidations are decoded and accumulated in top-transaction, and then
executed during replay. This obviates the need to decode the
invalidations as part of a commit record.
Bump XLOG_PAGE_MAGIC, since this introduces XLOG_XACT_INVALIDATIONS.
Author: Dilip Kumar, Tomas Vondra, Amit Kapila
Reviewed-by: Amit Kapila
Tested-by: Neha Sharma and Mahendra Singh Thalor
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 58 |
1 files changed, 35 insertions, 23 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 0c0c3717391..f3a1c31a292 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -278,10 +278,39 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * We assign subxact to the toplevel xact while processing each - * record if required. So, we don't need to do anything here. - * See LogicalDecodingProcessRecord. + * record if required. So, we don't need to do anything here. See + * LogicalDecodingProcessRecord. */ break; + case XLOG_XACT_INVALIDATIONS: + { + TransactionId xid; + xl_xact_invals *invals; + + xid = XLogRecGetXid(r); + invals = (xl_xact_invals *) XLogRecGetData(r); + + /* + * Execute the invalidations for xid-less transactions, + * otherwise, accumulate them so that they can be processed at + * the commit time. + */ + if (TransactionIdIsValid(xid)) + { + if (!ctx->fast_forward) + ReorderBufferAddInvalidations(reorder, xid, + buf->origptr, + invals->nmsgs, + invals->msgs); + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, + buf->origptr); + } + else if ((!ctx->fast_forward)) + ReorderBufferImmediateInvalidation(ctx->reorder, + invals->nmsgs, + invals->msgs); + } + break; case XLOG_XACT_PREPARE: /* @@ -334,15 +363,11 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) case XLOG_STANDBY_LOCK: break; case XLOG_INVALIDATIONS: - { - xl_invalidations *invalidations = - (xl_invalidations *) XLogRecGetData(r); - if (!ctx->fast_forward) - ReorderBufferImmediateInvalidation(ctx->reorder, - invalidations->nmsgs, - invalidations->msgs); - } + /* + * We are processing the invalidations at the command level via + * XLOG_XACT_INVALIDATIONS. So we don't need to do anything here. + */ break; default: elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info); @@ -573,19 +598,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, commit_time = parsed->origin_timestamp; } - /* - * Process invalidation messages, even if we're not interested in the - * transaction's contents, since the various caches need to always be - * consistent. - */ - if (parsed->nmsgs > 0) - { - if (!ctx->fast_forward) - ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, - parsed->nmsgs, parsed->msgs); - ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); - } - SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, parsed->nsubxacts, parsed->subxacts); |