diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 44 |
1 files changed, 32 insertions, 12 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 537eba7875c..6eb0d5527e0 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -88,6 +88,9 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); * call ReorderBufferProcessXid for each record type by default, because * e.g. empty xacts can be handled more efficiently if there's no previous * state for them. + * + * We also support the ability to fast forward thru records, skipping some + * record types completely - see individual record types for details. */ void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) @@ -332,8 +335,10 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_invalidations *invalidations = (xl_invalidations *) XLogRecGetData(r); - ReorderBufferImmediateInvalidation( - ctx->reorder, invalidations->nmsgs, invalidations->msgs); + if (!ctx->fast_forward) + ReorderBufferImmediateInvalidation(ctx->reorder, + invalidations->nmsgs, + invalidations->msgs); } break; default: @@ -353,14 +358,19 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); - /* no point in doing anything yet */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + /* + * If we don't have snapshot or we are just fast-forwarding, there is no + * point in decoding changes. + */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) return; switch (info) { case XLOG_HEAP2_MULTI_INSERT: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (!ctx->fast_forward && + SnapBuildProcessChange(builder, xid, buf->origptr)) DecodeMultiInsert(ctx, buf); break; case XLOG_HEAP2_NEW_CID: @@ -408,8 +418,12 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); - /* no point in doing anything yet */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + /* + * If we don't have snapshot or we are just fast-forwarding, there is no + * point in decoding data changes. + */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) return; switch (info) @@ -501,8 +515,12 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); - /* No point in doing anything yet. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + /* + * If we don't have snapshot or we are just fast-forwarding, there is no + * point in decoding messages. + */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) return; message = (xl_logical_message *) XLogRecGetData(r); @@ -554,8 +572,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, */ if (parsed->nmsgs > 0) { - ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, - parsed->nmsgs, parsed->msgs); + if (!ctx->fast_forward) + ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, + parsed->nmsgs, parsed->msgs); ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); } @@ -574,6 +593,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * are restarting or if we haven't assembled a consistent snapshot yet. * 2) The transaction happened in another database. * 3) The output plugin is not interested in the origin. + * 4) We are doing fast-forwarding * * We can't just use ReorderBufferAbort() here, because we need to execute * the transaction's invalidations. This currently won't be needed if @@ -589,7 +609,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, */ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || - FilterByOrigin(ctx, origin_id)) + ctx->fast_forward || FilterByOrigin(ctx, origin_id)) { for (i = 0; i < parsed->nsubxacts; i++) { |