diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 57 |
1 files changed, 33 insertions, 24 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 33e4343cc22..36e6d9a5c90 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -610,7 +610,8 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size tuplelen; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple); } @@ -656,7 +657,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { data = XLogRecGetBlockData(r, 0, &datalen); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); DecodeXLogTuple(data, datalen, change->data.tp.newtuple); } @@ -667,7 +669,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) data = XLogRecGetData(r) + SizeOfHeapUpdate; datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate; - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); } @@ -717,13 +720,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* old primary key stored */ if (xlrec->flags & XLH_DELETE_CONTAINS_OLD) { + Size len = XLogRecGetDataLen(r) - SizeOfHeapDelete; + Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, len); DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, - XLogRecGetDataLen(r) - SizeOfHeapDelete, - change->data.tp.oldtuple); + len, change->data.tp.oldtuple); } change->data.tp.clear_toast_afterwards = true; @@ -783,35 +788,39 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) { - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + HeapTupleHeader header; + + xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); + data = ((char *) xlhdr) + SizeOfMultiInsertTuple; + datalen = xlhdr->datalen; + + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); tuple = change->data.tp.newtuple; + header = tuple->tuple.t_data; /* not a disk based tuple */ ItemPointerSetInvalid(&tuple->tuple.t_self); - xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); - data = ((char *) xlhdr) + SizeOfMultiInsertTuple; - datalen = xlhdr->datalen; - /* * We can only figure this out after reassembling the * transactions. */ tuple->tuple.t_tableOid = InvalidOid; - tuple->tuple.t_data = &tuple->t_data.header; + tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; - memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader); + memset(header, 0, SizeofHeapTupleHeader); - memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader, + memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader, (char *) data, datalen); data += datalen; - tuple->t_data.header.t_infomask = xlhdr->t_infomask; - tuple->t_data.header.t_infomask2 = xlhdr->t_infomask2; - tuple->t_data.header.t_hoff = xlhdr->t_hoff; + header->t_infomask = xlhdr->t_infomask; + header->t_infomask2 = xlhdr->t_infomask2; + header->t_hoff = xlhdr->t_hoff; } /* @@ -877,31 +886,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) { xl_heap_header xlhdr; int datalen = len - SizeOfHeapHeader; + HeapTupleHeader header; Assert(datalen >= 0); - Assert(datalen <= MaxHeapTupleSize); tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; + header = tuple->tuple.t_data; /* not a disk based tuple */ ItemPointerSetInvalid(&tuple->tuple.t_self); /* we can only figure this out after reassembling the transactions */ tuple->tuple.t_tableOid = InvalidOid; - tuple->tuple.t_data = &tuple->t_data.header; /* data is not stored aligned, copy to aligned storage */ memcpy((char *) &xlhdr, data, SizeOfHeapHeader); - memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader); + memset(header, 0, SizeofHeapTupleHeader); - memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader, + memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, data + SizeOfHeapHeader, datalen); - tuple->t_data.header.t_infomask = xlhdr.t_infomask; - tuple->t_data.header.t_infomask2 = xlhdr.t_infomask2; - tuple->t_data.header.t_hoff = xlhdr.t_hoff; + header->t_infomask = xlhdr.t_infomask; + header->t_infomask2 = xlhdr.t_infomask2; + header->t_hoff = xlhdr.t_hoff; } |