aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-20 17:56:26 +0200
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-20 18:46:41 +0200
commit2c03216d831160bedd72d45f712601b6f7d03f1c (patch)
treeab6a03d031ffa605d848b0b7067add15e56e2207 /src/backend/replication
parent8dc626defec23016dd5988208d8704b858b9d21d (diff)
downloadpostgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.tar.gz
postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.zip
Revamp the WAL record format.
Each WAL record now carries information about the modified relation and block(s) in a standardized format. That makes it easier to write tools that need that information, like pg_rewind, prefetching the blocks to speed up recovery, etc. There's a whole new API for building WAL records, replacing the XLogRecData chains used previously. The new API consists of XLogRegister* functions, which are called for each buffer and chunk of data that is added to the record. The new API also gives more control over when a full-page image is written, by passing flags to the XLogRegisterBuffer function. This also simplifies the XLogReadBufferForRedo() calls. The function can dig the relation and block number from the WAL record, so they no longer need to be passed as arguments. For the convenience of redo routines, XLogReader now disects each WAL record after reading it, copying the main data part and the per-block data into MAXALIGNed buffers. The data chunks are not aligned within the WAL record, but the redo routines can assume that the pointers returned by XLogRecGet* functions are. Redo routines are now passed the XLogReaderState, which contains the record in the already-disected format, instead of the plain XLogRecord. The new record format also makes the fixed size XLogRecord header smaller, by removing the xl_len field. The length of the "main data" portion is now stored at the end of the WAL record, and there's a separate header after XLogRecord for it. The alignment padding at the end of XLogRecord is also removed. This compansates for the fact that the new format would otherwise be more bulky than the old format. Reviewed by Andres Freund, Amit Kapila, Michael Paquier, Alvaro Herrera, Fujii Masao.
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/decode.c158
-rw-r--r--src/backend/replication/logical/logical.c7
-rw-r--r--src/backend/replication/logical/logicalfuncs.c4
-rw-r--r--src/backend/replication/logical/reorderbuffer.c1
-rw-r--r--src/backend/replication/logical/snapbuild.c2
-rw-r--r--src/backend/replication/walsender.c2
6 files changed, 82 insertions, 92 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8e78aafda7c..1c7dac38fc9 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -31,7 +31,9 @@
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
#include "access/xlogreader.h"
+#include "access/xlogrecord.h"
#include "catalog/pg_control.h"
@@ -46,8 +48,7 @@ typedef struct XLogRecordBuffer
{
XLogRecPtr origptr;
XLogRecPtr endptr;
- XLogRecord record;
- char *record_data;
+ XLogReaderState *record;
} XLogRecordBuffer;
/* RMGR Handlers */
@@ -79,17 +80,16 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
* context.
*/
void
-LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
+LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
{
XLogRecordBuffer buf;
buf.origptr = ctx->reader->ReadRecPtr;
buf.endptr = ctx->reader->EndRecPtr;
- buf.record = *record;
- buf.record_data = XLogRecGetData(record);
+ buf.record = record;
/* cast so we get a warning when new rmgrs are added */
- switch ((RmgrIds) buf.record.xl_rmid)
+ switch ((RmgrIds) XLogRecGetRmid(record))
{
/*
* Rmgrs we care about for logical decoding. Add new rmgrs in
@@ -135,7 +135,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
case RM_BRIN_ID:
break;
case RM_NEXT_ID:
- elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) buf.record.xl_rmid);
+ elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
}
}
@@ -146,7 +146,7 @@ static void
DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
- uint8 info = buf->record.xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
switch (info)
{
@@ -185,8 +185,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
ReorderBuffer *reorder = ctx->reorder;
- XLogRecord *r = &buf->record;
- uint8 info = r->xl_info & ~XLR_INFO_MASK;
+ XLogReaderState *r = buf->record;
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
/* no point in doing anything yet, data could not be decoded anyway */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
@@ -200,12 +200,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId *subxacts = NULL;
SharedInvalidationMessage *invals = NULL;
- xlrec = (xl_xact_commit *) buf->record_data;
+ xlrec = (xl_xact_commit *) XLogRecGetData(r);
subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
- DecodeCommit(ctx, buf, r->xl_xid, xlrec->dbId,
+ DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId,
xlrec->xact_time,
xlrec->nsubxacts, subxacts,
xlrec->nmsgs, invals);
@@ -220,7 +220,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SharedInvalidationMessage *invals = NULL;
/* Prepared commits contain a normal commit record... */
- prec = (xl_xact_commit_prepared *) buf->record_data;
+ prec = (xl_xact_commit_prepared *) XLogRecGetData(r);
xlrec = &prec->crec;
subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
@@ -237,9 +237,9 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
xl_xact_commit_compact *xlrec;
- xlrec = (xl_xact_commit_compact *) buf->record_data;
+ xlrec = (xl_xact_commit_compact *) XLogRecGetData(r);
- DecodeCommit(ctx, buf, r->xl_xid, InvalidOid,
+ DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid,
xlrec->xact_time,
xlrec->nsubxacts, xlrec->subxacts,
0, NULL);
@@ -250,11 +250,11 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_xact_abort *xlrec;
TransactionId *sub_xids;
- xlrec = (xl_xact_abort *) buf->record_data;
+ xlrec = (xl_xact_abort *) XLogRecGetData(r);
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- DecodeAbort(ctx, buf->origptr, r->xl_xid,
+ DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r),
sub_xids, xlrec->nsubxacts);
break;
}
@@ -265,7 +265,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId *sub_xids;
/* prepared abort contain a normal commit abort... */
- prec = (xl_xact_abort_prepared *) buf->record_data;
+ prec = (xl_xact_abort_prepared *) XLogRecGetData(r);
xlrec = &prec->arec;
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
@@ -282,7 +282,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
int i;
TransactionId *sub_xid;
- xlrec = (xl_xact_assignment *) buf->record_data;
+ xlrec = (xl_xact_assignment *) XLogRecGetData(r);
sub_xid = &xlrec->xsub[0];
@@ -316,14 +316,14 @@ static void
DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
- XLogRecord *r = &buf->record;
- uint8 info = r->xl_info & ~XLR_INFO_MASK;
+ XLogReaderState *r = buf->record;
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
switch (info)
{
case XLOG_RUNNING_XACTS:
{
- xl_running_xacts *running = (xl_running_xacts *) buf->record_data;
+ xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
SnapBuildProcessRunningXacts(builder, buf->origptr, running);
@@ -352,8 +352,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void
DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- uint8 info = buf->record.xl_info & XLOG_HEAP_OPMASK;
- TransactionId xid = buf->record.xl_xid;
+ uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
+ TransactionId xid = XLogRecGetXid(buf->record);
SnapBuild *builder = ctx->snapshot_builder;
/* no point in doing anything yet */
@@ -370,7 +370,7 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
xl_heap_new_cid *xlrec;
- xlrec = (xl_heap_new_cid *) buf->record_data;
+ xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
break;
@@ -405,8 +405,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void
DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- uint8 info = buf->record.xl_info & XLOG_HEAP_OPMASK;
- TransactionId xid = buf->record.xl_xid;
+ uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
+ TransactionId xid = XLogRecGetXid(buf->record);
SnapBuild *builder = ctx->snapshot_builder;
/* no point in doing anything yet */
@@ -576,34 +576,35 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
static void
DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- XLogRecord *r = &buf->record;
+ XLogReaderState *r = buf->record;
xl_heap_insert *xlrec;
ReorderBufferChange *change;
+ RelFileNode target_node;
- xlrec = (xl_heap_insert *) buf->record_data;
+ xlrec = (xl_heap_insert *) XLogRecGetData(r);
/* only interested in our database */
- if (xlrec->target.node.dbNode != ctx->slot->data.database)
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
return;
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
- memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
+ memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{
- Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader));
+ Size tuplelen;
+ char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
- DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert,
- r->xl_len - SizeOfHeapInsert,
- change->data.tp.newtuple);
+ DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple);
}
change->data.tp.clear_toast_afterwards = true;
- ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change);
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
}
/*
@@ -615,62 +616,47 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void
DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- XLogRecord *r = &buf->record;
+ XLogReaderState *r = buf->record;
xl_heap_update *xlrec;
- xl_heap_header_len xlhdr;
ReorderBufferChange *change;
char *data;
+ Size datalen;
+ RelFileNode target_node;
- xlrec = (xl_heap_update *) buf->record_data;
+ xlrec = (xl_heap_update *) XLogRecGetData(r);
/* only interested in our database */
- if (xlrec->target.node.dbNode != ctx->slot->data.database)
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
return;
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_UPDATE;
- memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
-
- /* caution, remaining data in record is not aligned */
- data = buf->record_data + SizeOfHeapUpdate;
+ memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{
- Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen));
-
- memcpy(&xlhdr, data, sizeof(xlhdr));
- data += offsetof(xl_heap_header_len, header);
+ data = XLogRecGetBlockData(r, 0, &datalen);
change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
- DecodeXLogTuple(data,
- xlhdr.t_len + SizeOfHeapHeader,
- change->data.tp.newtuple);
- /* skip over the rest of the tuple header */
- data += SizeOfHeapHeader;
- /* skip over the tuple data */
- data += xlhdr.t_len;
+ DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
}
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
{
- memcpy(&xlhdr, data, sizeof(xlhdr));
- data += offsetof(xl_heap_header_len, header);
+ /* caution, remaining data in record is not aligned */
+ data = XLogRecGetData(r) + SizeOfHeapUpdate;
+ datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
- DecodeXLogTuple(data,
- xlhdr.t_len + SizeOfHeapHeader,
- change->data.tp.oldtuple);
-#ifdef NOT_USED
- data += SizeOfHeapHeader;
- data += xlhdr.t_len;
-#endif
+ DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;
- ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change);
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
}
/*
@@ -681,36 +667,38 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void
DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- XLogRecord *r = &buf->record;
+ XLogReaderState *r = buf->record;
xl_heap_delete *xlrec;
ReorderBufferChange *change;
+ RelFileNode target_node;
- xlrec = (xl_heap_delete *) buf->record_data;
+ xlrec = (xl_heap_delete *) XLogRecGetData(r);
/* only interested in our database */
- if (xlrec->target.node.dbNode != ctx->slot->data.database)
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
return;
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_DELETE;
- memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
+ memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
/* old primary key stored */
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
{
- Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader));
+ Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
- r->xl_len - SizeOfHeapDelete,
+ XLogRecGetDataLen(r) - SizeOfHeapDelete,
change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;
- ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change);
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
}
/*
@@ -721,27 +709,24 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void
DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- XLogRecord *r = &buf->record;
+ XLogReaderState *r = buf->record;
xl_heap_multi_insert *xlrec;
int i;
char *data;
- bool isinit = (r->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
+ char *tupledata;
+ Size tuplelen;
+ RelFileNode rnode;
- xlrec = (xl_heap_multi_insert *) buf->record_data;
+ xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
/* only interested in our database */
- if (xlrec->node.dbNode != ctx->slot->data.database)
+ XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL);
+ if (rnode.dbNode != ctx->slot->data.database)
return;
- data = buf->record_data + SizeOfHeapMultiInsert;
-
- /*
- * OffsetNumbers (which are not of interest to us) are stored when
- * XLOG_HEAP_INIT_PAGE is not set -- skip over them.
- */
- if (!isinit)
- data += sizeof(OffsetNumber) * xlrec->ntuples;
+ tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
+ data = tupledata;
for (i = 0; i < xlrec->ntuples; i++)
{
ReorderBufferChange *change;
@@ -751,7 +736,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
- memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+ memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
/*
* CONTAINS_NEW_TUPLE will always be set currently as multi_insert
@@ -806,9 +791,10 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else
change->data.tp.clear_toast_afterwards = false;
- ReorderBufferQueueChange(ctx->reorder, r->xl_xid,
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
buf->origptr, change);
}
+ Assert(data == tupledata + tuplelen);
}
/*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 875b89a6288..8c318cd4b51 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -34,6 +34,7 @@
#include "miscadmin.h"
#include "access/xact.h"
+#include "access/xlog_internal.h"
#include "replication/decode.h"
#include "replication/logical.h"
@@ -455,12 +456,12 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
record = XLogReadRecord(ctx->reader, startptr, &err);
if (err)
elog(ERROR, "%s", err);
-
- Assert(record);
+ if (!record)
+ elog(ERROR, "no record found"); /* shouldn't happen */
startptr = InvalidXLogRecPtr;
- LogicalDecodingProcessRecord(ctx, record);
+ LogicalDecodingProcessRecord(ctx, ctx->reader);
/* only continue till we found a consistent spot */
if (DecodingContextReady(ctx))
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 3a5ec2f61d9..1977f098c79 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -21,6 +21,8 @@
#include "funcapi.h"
#include "miscadmin.h"
+#include "access/xlog_internal.h"
+
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
@@ -431,7 +433,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
* store the description into our tuplestore.
*/
if (record != NULL)
- LogicalDecodingProcessRecord(ctx, record);
+ LogicalDecodingProcessRecord(ctx, ctx->reader);
/* check limits */
if (upto_lsn != InvalidXLogRecPtr &&
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7d8f40738d4..6e75398eabe 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -54,6 +54,7 @@
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
+#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 200b54d7c2a..20f9b04adfa 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -699,7 +699,7 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
- xlrec->target.node, xlrec->target.tid,
+ xlrec->target_node, xlrec->target_tid,
xlrec->cmin, xlrec->cmax,
xlrec->combocid);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 385d18ba1bb..addae8f6ce5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2444,7 +2444,7 @@ XLogSendLogical(void)
if (record != NULL)
{
- LogicalDecodingProcessRecord(logical_decoding_ctx, record);
+ LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
sentPtr = logical_decoding_ctx->reader->EndRecPtr;
}