aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c114
1 files changed, 83 insertions, 31 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index e20c5114e2a..570400ffb73 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -444,27 +444,48 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
/*
- * Get an unused, possibly preallocated, ReorderBufferTupleBuf
+ * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
+ * least a tuple of size tuple_len (excluding header overhead).
*/
ReorderBufferTupleBuf *
-ReorderBufferGetTupleBuf(ReorderBuffer *rb)
+ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
{
ReorderBufferTupleBuf *tuple;
+ Size alloc_len;
- /* check the slab cache */
- if (rb->nr_cached_tuplebufs)
+ alloc_len = tuple_len + SizeofHeapTupleHeader;
+
+ /*
+ * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
+ * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
+ * tuples generated for oldtuples can be bigger, as they don't have
+ * out-of-line toast columns.
+ */
+ if (alloc_len < MaxHeapTupleSize)
+ alloc_len = MaxHeapTupleSize;
+
+
+ /* if small enough, check the slab cache */
+ if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
{
rb->nr_cached_tuplebufs--;
tuple = slist_container(ReorderBufferTupleBuf, node,
slist_pop_head_node(&rb->cached_tuplebufs));
#ifdef USE_ASSERT_CHECKING
- memset(tuple, 0xa9, sizeof(ReorderBufferTupleBuf));
+ memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
+#endif
+ tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
+#ifdef USE_ASSERT_CHECKING
+ memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
#endif
}
else
{
tuple = (ReorderBufferTupleBuf *)
- MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf));
+ MemoryContextAlloc(rb->context,
+ sizeof(ReorderBufferTupleBuf) + alloc_len);
+ tuple->alloc_tuple_size = alloc_len;
+ tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
}
return tuple;
@@ -479,13 +500,16 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb)
void
ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
{
- /* check whether to put into the slab cache */
- if (rb->nr_cached_tuplebufs < max_cached_tuplebufs)
+ /* check whether to put into the slab cache, oversized tuples never are */
+ if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
+ rb->nr_cached_tuplebufs < max_cached_tuplebufs)
{
rb->nr_cached_tuplebufs++;
slist_push_head(&rb->cached_tuplebufs, &tuple->node);
+ VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
+ VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
}
else
{
@@ -2092,15 +2116,18 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
newtup = change->data.tp.newtuple;
if (oldtup)
- oldlen = offsetof(ReorderBufferTupleBuf, t_data) +
- oldtup->tuple.t_len;
+ {
+ sz += sizeof(HeapTupleData);
+ oldlen = oldtup->tuple.t_len;
+ sz += oldlen;
+ }
if (newtup)
- newlen = offsetof(ReorderBufferTupleBuf, t_data) +
- newtup->tuple.t_len;
-
- sz += oldlen;
- sz += newlen;
+ {
+ sz += sizeof(HeapTupleData);
+ newlen = newtup->tuple.t_len;
+ sz += newlen;
+ }
/* make sure we have enough space */
ReorderBufferSerializeReserve(rb, sz);
@@ -2111,14 +2138,20 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (oldlen)
{
- memcpy(data, oldtup, oldlen);
+ memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ memcpy(data, oldtup->tuple.t_data, oldlen);
data += oldlen;
}
if (newlen)
{
- memcpy(data, newtup, newlen);
- data += newlen;
+ memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ memcpy(data, newtup->tuple.t_data, newlen);
+ data += oldlen;
}
break;
}
@@ -2337,27 +2370,46 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
if (change->data.tp.oldtuple)
{
- Size len = offsetof(ReorderBufferTupleBuf, t_data) +
- ((ReorderBufferTupleBuf *) data)->tuple.t_len;
+ Size tuplelen = ((HeapTuple) data)->t_len;
+
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb);
- memcpy(change->data.tp.oldtuple, data, len);
+ /* restore ->tuple */
+ memcpy(&change->data.tp.oldtuple->tuple, data,
+ sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ /* reset t_data pointer into the new tuplebuf */
change->data.tp.oldtuple->tuple.t_data =
- &change->data.tp.oldtuple->t_data.header;
- data += len;
+ ReorderBufferTupleBufData(change->data.tp.oldtuple);
+
+ /* restore tuple data itself */
+ memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
+ data += tuplelen;
}
if (change->data.tp.newtuple)
{
- Size len = offsetof(ReorderBufferTupleBuf, t_data) +
- ((ReorderBufferTupleBuf *) data)->tuple.t_len;
+ Size tuplelen = ((HeapTuple) data)->t_len;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb);
- memcpy(change->data.tp.newtuple, data, len);
+ /* restore ->tuple */
+ memcpy(&change->data.tp.newtuple->tuple, data,
+ sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ /* reset t_data pointer into the new tuplebuf */
change->data.tp.newtuple->tuple.t_data =
- &change->data.tp.newtuple->t_data.header;
- data += len;
+ ReorderBufferTupleBufData(change->data.tp.newtuple);
+
+ /* restore tuple data itself */
+ memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
+ data += tuplelen;
}
+
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
@@ -2734,7 +2786,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/
tmphtup = heap_form_tuple(desc, attrs, isnull);
Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
- Assert(&newtup->t_data.header == newtup->tuple.t_data);
+ Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
newtup->tuple.t_len = tmphtup->t_len;