aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c57
-rw-r--r--src/backend/replication/logical/reorderbuffer.c114
-rw-r--r--src/include/replication/reorderbuffer.h19
3 files changed, 127 insertions, 63 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 33e4343cc22..36e6d9a5c90 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -610,7 +610,8 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
Size tuplelen;
char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple);
}
@@ -656,7 +657,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
data = XLogRecGetBlockData(r, 0, &datalen);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
}
@@ -667,7 +669,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
data = XLogRecGetData(r) + SizeOfHeapUpdate;
datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
}
@@ -717,13 +720,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/* old primary key stored */
if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
{
+ Size len = XLogRecGetDataLen(r) - SizeOfHeapDelete;
+
Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, len);
DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
- XLogRecGetDataLen(r) - SizeOfHeapDelete,
- change->data.tp.oldtuple);
+ len, change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;
@@ -783,35 +788,39 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
{
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ HeapTupleHeader header;
+
+ xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
+ data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+ datalen = xlhdr->datalen;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
tuple = change->data.tp.newtuple;
+ header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
- xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
- data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
- datalen = xlhdr->datalen;
-
/*
* We can only figure this out after reassembling the
* transactions.
*/
tuple->tuple.t_tableOid = InvalidOid;
- tuple->tuple.t_data = &tuple->t_data.header;
+
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
- memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader);
+ memset(header, 0, SizeofHeapTupleHeader);
- memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader,
+ memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
(char *) data,
datalen);
data += datalen;
- tuple->t_data.header.t_infomask = xlhdr->t_infomask;
- tuple->t_data.header.t_infomask2 = xlhdr->t_infomask2;
- tuple->t_data.header.t_hoff = xlhdr->t_hoff;
+ header->t_infomask = xlhdr->t_infomask;
+ header->t_infomask2 = xlhdr->t_infomask2;
+ header->t_hoff = xlhdr->t_hoff;
}
/*
@@ -877,31 +886,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
{
xl_heap_header xlhdr;
int datalen = len - SizeOfHeapHeader;
+ HeapTupleHeader header;
Assert(datalen >= 0);
- Assert(datalen <= MaxHeapTupleSize);
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+ header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
/* we can only figure this out after reassembling the transactions */
tuple->tuple.t_tableOid = InvalidOid;
- tuple->tuple.t_data = &tuple->t_data.header;
/* data is not stored aligned, copy to aligned storage */
memcpy((char *) &xlhdr,
data,
SizeOfHeapHeader);
- memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader);
+ memset(header, 0, SizeofHeapTupleHeader);
- memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader,
+ memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
data + SizeOfHeapHeader,
datalen);
- tuple->t_data.header.t_infomask = xlhdr.t_infomask;
- tuple->t_data.header.t_infomask2 = xlhdr.t_infomask2;
- tuple->t_data.header.t_hoff = xlhdr.t_hoff;
+ header->t_infomask = xlhdr.t_infomask;
+ header->t_infomask2 = xlhdr.t_infomask2;
+ header->t_hoff = xlhdr.t_hoff;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index e20c5114e2a..570400ffb73 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -444,27 +444,48 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
/*
- * Get an unused, possibly preallocated, ReorderBufferTupleBuf
+ * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
+ * least a tuple of size tuple_len (excluding header overhead).
*/
ReorderBufferTupleBuf *
-ReorderBufferGetTupleBuf(ReorderBuffer *rb)
+ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
{
ReorderBufferTupleBuf *tuple;
+ Size alloc_len;
- /* check the slab cache */
- if (rb->nr_cached_tuplebufs)
+ alloc_len = tuple_len + SizeofHeapTupleHeader;
+
+ /*
+ * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
+ * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
+ * tuples generated for oldtuples can be bigger, as they don't have
+ * out-of-line toast columns.
+ */
+ if (alloc_len < MaxHeapTupleSize)
+ alloc_len = MaxHeapTupleSize;
+
+
+ /* if small enough, check the slab cache */
+ if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
{
rb->nr_cached_tuplebufs--;
tuple = slist_container(ReorderBufferTupleBuf, node,
slist_pop_head_node(&rb->cached_tuplebufs));
#ifdef USE_ASSERT_CHECKING
- memset(tuple, 0xa9, sizeof(ReorderBufferTupleBuf));
+ memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
+#endif
+ tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
+#ifdef USE_ASSERT_CHECKING
+ memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
#endif
}
else
{
tuple = (ReorderBufferTupleBuf *)
- MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf));
+ MemoryContextAlloc(rb->context,
+ sizeof(ReorderBufferTupleBuf) + alloc_len);
+ tuple->alloc_tuple_size = alloc_len;
+ tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
}
return tuple;
@@ -479,13 +500,16 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb)
void
ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
{
- /* check whether to put into the slab cache */
- if (rb->nr_cached_tuplebufs < max_cached_tuplebufs)
+ /* check whether to put into the slab cache, oversized tuples never are */
+ if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
+ rb->nr_cached_tuplebufs < max_cached_tuplebufs)
{
rb->nr_cached_tuplebufs++;
slist_push_head(&rb->cached_tuplebufs, &tuple->node);
+ VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
+ VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
}
else
{
@@ -2092,15 +2116,18 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
newtup = change->data.tp.newtuple;
if (oldtup)
- oldlen = offsetof(ReorderBufferTupleBuf, t_data) +
- oldtup->tuple.t_len;
+ {
+ sz += sizeof(HeapTupleData);
+ oldlen = oldtup->tuple.t_len;
+ sz += oldlen;
+ }
if (newtup)
- newlen = offsetof(ReorderBufferTupleBuf, t_data) +
- newtup->tuple.t_len;
-
- sz += oldlen;
- sz += newlen;
+ {
+ sz += sizeof(HeapTupleData);
+ newlen = newtup->tuple.t_len;
+ sz += newlen;
+ }
/* make sure we have enough space */
ReorderBufferSerializeReserve(rb, sz);
@@ -2111,14 +2138,20 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (oldlen)
{
- memcpy(data, oldtup, oldlen);
+ memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ memcpy(data, oldtup->tuple.t_data, oldlen);
data += oldlen;
}
if (newlen)
{
- memcpy(data, newtup, newlen);
- data += newlen;
+ memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ memcpy(data, newtup->tuple.t_data, newlen);
+ data += oldlen;
}
break;
}
@@ -2337,27 +2370,46 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
if (change->data.tp.oldtuple)
{
- Size len = offsetof(ReorderBufferTupleBuf, t_data) +
- ((ReorderBufferTupleBuf *) data)->tuple.t_len;
+ Size tuplelen = ((HeapTuple) data)->t_len;
+
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb);
- memcpy(change->data.tp.oldtuple, data, len);
+ /* restore ->tuple */
+ memcpy(&change->data.tp.oldtuple->tuple, data,
+ sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ /* reset t_data pointer into the new tuplebuf */
change->data.tp.oldtuple->tuple.t_data =
- &change->data.tp.oldtuple->t_data.header;
- data += len;
+ ReorderBufferTupleBufData(change->data.tp.oldtuple);
+
+ /* restore tuple data itself */
+ memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
+ data += tuplelen;
}
if (change->data.tp.newtuple)
{
- Size len = offsetof(ReorderBufferTupleBuf, t_data) +
- ((ReorderBufferTupleBuf *) data)->tuple.t_len;
+ Size tuplelen = ((HeapTuple) data)->t_len;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb);
- memcpy(change->data.tp.newtuple, data, len);
+ /* restore ->tuple */
+ memcpy(&change->data.tp.newtuple->tuple, data,
+ sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ /* reset t_data pointer into the new tuplebuf */
change->data.tp.newtuple->tuple.t_data =
- &change->data.tp.newtuple->t_data.header;
- data += len;
+ ReorderBufferTupleBufData(change->data.tp.newtuple);
+
+ /* restore tuple data itself */
+ memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
+ data += tuplelen;
}
+
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
@@ -2734,7 +2786,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/
tmphtup = heap_form_tuple(desc, attrs, isnull);
Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
- Assert(&newtup->t_data.header == newtup->tuple.t_data);
+ Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
newtup->tuple.t_len = tmphtup->t_len;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b25eae832ae..b52d06af928 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -23,16 +23,19 @@ typedef struct ReorderBufferTupleBuf
/* position in preallocated list */
slist_node node;
- /* tuple, stored sequentially */
+ /* tuple header, the interesting bit for users of logical decoding */
HeapTupleData tuple;
- union
- {
- HeapTupleHeaderData header;
- char data[MaxHeapTupleSize];
- double align_it; /* ensure t_data is MAXALIGN'd */
- } t_data;
+
+ /* pre-allocated size of tuple buffer, different from tuple size */
+ Size alloc_tuple_size;
+
+ /* actual tuple data follows */
} ReorderBufferTupleBuf;
+/* pointer to the data stored in a TupleBuf */
+#define ReorderBufferTupleBufData(p) \
+ ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
+
/*
* Types of the change passed to a 'change' callback.
*
@@ -341,7 +344,7 @@ struct ReorderBuffer
ReorderBuffer *ReorderBufferAllocate(void);
void ReorderBufferFree(ReorderBuffer *);
-ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *);
+ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);