aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c44
1 files changed, 32 insertions, 12 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 537eba7875c..6eb0d5527e0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -88,6 +88,9 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
* call ReorderBufferProcessXid for each record type by default, because
* e.g. empty xacts can be handled more efficiently if there's no previous
* state for them.
+ *
+ * We also support the ability to fast forward thru records, skipping some
+ * record types completely - see individual record types for details.
*/
void
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
@@ -332,8 +335,10 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_invalidations *invalidations =
(xl_invalidations *) XLogRecGetData(r);
- ReorderBufferImmediateInvalidation(
- ctx->reorder, invalidations->nmsgs, invalidations->msgs);
+ if (!ctx->fast_forward)
+ ReorderBufferImmediateInvalidation(ctx->reorder,
+ invalidations->nmsgs,
+ invalidations->msgs);
}
break;
default:
@@ -353,14 +358,19 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
- /* no point in doing anything yet */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding changes.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
return;
switch (info)
{
case XLOG_HEAP2_MULTI_INSERT:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (!ctx->fast_forward &&
+ SnapBuildProcessChange(builder, xid, buf->origptr))
DecodeMultiInsert(ctx, buf);
break;
case XLOG_HEAP2_NEW_CID:
@@ -408,8 +418,12 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
- /* no point in doing anything yet */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding data changes.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
return;
switch (info)
@@ -501,8 +515,12 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
- /* No point in doing anything yet. */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding messages.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
return;
message = (xl_logical_message *) XLogRecGetData(r);
@@ -554,8 +572,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
*/
if (parsed->nmsgs > 0)
{
- ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
- parsed->nmsgs, parsed->msgs);
+ if (!ctx->fast_forward)
+ ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+ parsed->nmsgs, parsed->msgs);
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
}
@@ -574,6 +593,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* are restarting or if we haven't assembled a consistent snapshot yet.
* 2) The transaction happened in another database.
* 3) The output plugin is not interested in the origin.
+ * 4) We are doing fast-forwarding
*
* We can't just use ReorderBufferAbort() here, because we need to execute
* the transaction's invalidations. This currently won't be needed if
@@ -589,7 +609,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
*/
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
- FilterByOrigin(ctx, origin_id))
+ ctx->fast_forward || FilterByOrigin(ctx, origin_id))
{
for (i = 0; i < parsed->nsubxacts; i++)
{