aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c57
1 files changed, 33 insertions, 24 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 33e4343cc22..36e6d9a5c90 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -610,7 +610,8 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
Size tuplelen;
char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple);
}
@@ -656,7 +657,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
data = XLogRecGetBlockData(r, 0, &datalen);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
}
@@ -667,7 +669,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
data = XLogRecGetData(r) + SizeOfHeapUpdate;
datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
}
@@ -717,13 +720,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/* old primary key stored */
if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
{
+ Size len = XLogRecGetDataLen(r) - SizeOfHeapDelete;
+
Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, len);
DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
- XLogRecGetDataLen(r) - SizeOfHeapDelete,
- change->data.tp.oldtuple);
+ len, change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;
@@ -783,35 +788,39 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
{
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ HeapTupleHeader header;
+
+ xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
+ data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+ datalen = xlhdr->datalen;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
tuple = change->data.tp.newtuple;
+ header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
- xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
- data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
- datalen = xlhdr->datalen;
-
/*
* We can only figure this out after reassembling the
* transactions.
*/
tuple->tuple.t_tableOid = InvalidOid;
- tuple->tuple.t_data = &tuple->t_data.header;
+
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
- memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader);
+ memset(header, 0, SizeofHeapTupleHeader);
- memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader,
+ memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
(char *) data,
datalen);
data += datalen;
- tuple->t_data.header.t_infomask = xlhdr->t_infomask;
- tuple->t_data.header.t_infomask2 = xlhdr->t_infomask2;
- tuple->t_data.header.t_hoff = xlhdr->t_hoff;
+ header->t_infomask = xlhdr->t_infomask;
+ header->t_infomask2 = xlhdr->t_infomask2;
+ header->t_hoff = xlhdr->t_hoff;
}
/*
@@ -877,31 +886,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
{
xl_heap_header xlhdr;
int datalen = len - SizeOfHeapHeader;
+ HeapTupleHeader header;
Assert(datalen >= 0);
- Assert(datalen <= MaxHeapTupleSize);
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+ header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
/* we can only figure this out after reassembling the transactions */
tuple->tuple.t_tableOid = InvalidOid;
- tuple->tuple.t_data = &tuple->t_data.header;
/* data is not stored aligned, copy to aligned storage */
memcpy((char *) &xlhdr,
data,
SizeOfHeapHeader);
- memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader);
+ memset(header, 0, SizeofHeapTupleHeader);
- memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader,
+ memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
data + SizeOfHeapHeader,
datalen);
- tuple->t_data.header.t_infomask = xlhdr.t_infomask;
- tuple->t_data.header.t_infomask2 = xlhdr.t_infomask2;
- tuple->t_data.header.t_hoff = xlhdr.t_hoff;
+ header->t_infomask = xlhdr.t_infomask;
+ header->t_infomask2 = xlhdr.t_infomask2;
+ header->t_hoff = xlhdr.t_hoff;
}