diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/decode.c | 57 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 114 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 19 |
3 files changed, 127 insertions, 63 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 33e4343cc22..36e6d9a5c90 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -610,7 +610,8 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size tuplelen; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple); } @@ -656,7 +657,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { data = XLogRecGetBlockData(r, 0, &datalen); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); DecodeXLogTuple(data, datalen, change->data.tp.newtuple); } @@ -667,7 +669,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) data = XLogRecGetData(r) + SizeOfHeapUpdate; datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate; - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); } @@ -717,13 +720,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* old primary key stored */ if (xlrec->flags & XLH_DELETE_CONTAINS_OLD) { + Size len = XLogRecGetDataLen(r) - SizeOfHeapDelete; + Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, len); DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, - XLogRecGetDataLen(r) - SizeOfHeapDelete, - change->data.tp.oldtuple); + len, change->data.tp.oldtuple); } change->data.tp.clear_toast_afterwards = true; @@ -783,35 +788,39 @@ 
DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) { - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + HeapTupleHeader header; + + xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); + data = ((char *) xlhdr) + SizeOfMultiInsertTuple; + datalen = xlhdr->datalen; + + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); tuple = change->data.tp.newtuple; + header = tuple->tuple.t_data; /* not a disk based tuple */ ItemPointerSetInvalid(&tuple->tuple.t_self); - xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); - data = ((char *) xlhdr) + SizeOfMultiInsertTuple; - datalen = xlhdr->datalen; - /* * We can only figure this out after reassembling the * transactions. */ tuple->tuple.t_tableOid = InvalidOid; - tuple->tuple.t_data = &tuple->t_data.header; + tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; - memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader); + memset(header, 0, SizeofHeapTupleHeader); - memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader, + memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader, (char *) data, datalen); data += datalen; - tuple->t_data.header.t_infomask = xlhdr->t_infomask; - tuple->t_data.header.t_infomask2 = xlhdr->t_infomask2; - tuple->t_data.header.t_hoff = xlhdr->t_hoff; + header->t_infomask = xlhdr->t_infomask; + header->t_infomask2 = xlhdr->t_infomask2; + header->t_hoff = xlhdr->t_hoff; } /* @@ -877,31 +886,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) { xl_heap_header xlhdr; int datalen = len - SizeOfHeapHeader; + HeapTupleHeader header; Assert(datalen >= 0); - Assert(datalen <= MaxHeapTupleSize); tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; + header = tuple->tuple.t_data; /* not a disk based tuple */ ItemPointerSetInvalid(&tuple->tuple.t_self); /* we can only figure this out after reassembling the transactions */ tuple->tuple.t_tableOid = InvalidOid; - 
tuple->tuple.t_data = &tuple->t_data.header; /* data is not stored aligned, copy to aligned storage */ memcpy((char *) &xlhdr, data, SizeOfHeapHeader); - memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader); + memset(header, 0, SizeofHeapTupleHeader); - memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader, + memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, data + SizeOfHeapHeader, datalen); - tuple->t_data.header.t_infomask = xlhdr.t_infomask; - tuple->t_data.header.t_infomask2 = xlhdr.t_infomask2; - tuple->t_data.header.t_hoff = xlhdr.t_hoff; + header->t_infomask = xlhdr.t_infomask; + header->t_infomask2 = xlhdr.t_infomask2; + header->t_hoff = xlhdr.t_hoff; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index e20c5114e2a..570400ffb73 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -444,27 +444,48 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) /* - * Get an unused, possibly preallocated, ReorderBufferTupleBuf + * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at + * least a tuple of size tuple_len (excluding header overhead). */ ReorderBufferTupleBuf * -ReorderBufferGetTupleBuf(ReorderBuffer *rb) +ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len) { ReorderBufferTupleBuf *tuple; + Size alloc_len; - /* check the slab cache */ - if (rb->nr_cached_tuplebufs) + alloc_len = tuple_len + SizeofHeapTupleHeader; + + /* + * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for + * those. Thus always allocate at least MaxHeapTupleSize. Note that + * tuples generated for oldtuples can be bigger, as they don't have + * out-of-line toast columns. 
+ */ + if (alloc_len < MaxHeapTupleSize) + alloc_len = MaxHeapTupleSize; + + + /* if small enough, check the slab cache */ + if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs) { rb->nr_cached_tuplebufs--; tuple = slist_container(ReorderBufferTupleBuf, node, slist_pop_head_node(&rb->cached_tuplebufs)); #ifdef USE_ASSERT_CHECKING - memset(tuple, 0xa9, sizeof(ReorderBufferTupleBuf)); + memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData)); +#endif + tuple->tuple.t_data = ReorderBufferTupleBufData(tuple); +#ifdef USE_ASSERT_CHECKING + memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size); #endif } else { tuple = (ReorderBufferTupleBuf *) - MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf)); + MemoryContextAlloc(rb->context, + sizeof(ReorderBufferTupleBuf) + alloc_len); + tuple->alloc_tuple_size = alloc_len; + tuple->tuple.t_data = ReorderBufferTupleBufData(tuple); } return tuple; @@ -479,13 +500,16 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb) void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple) { - /* check whether to put into the slab cache */ - if (rb->nr_cached_tuplebufs < max_cached_tuplebufs) + /* check whether to put into the slab cache, oversized tuples never are */ + if (tuple->alloc_tuple_size == MaxHeapTupleSize && + rb->nr_cached_tuplebufs < max_cached_tuplebufs) { rb->nr_cached_tuplebufs++; slist_push_head(&rb->cached_tuplebufs, &tuple->node); + VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size); VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf)); VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node)); + VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size)); } else { @@ -2092,15 +2116,18 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, newtup = change->data.tp.newtuple; if (oldtup) - oldlen = offsetof(ReorderBufferTupleBuf, t_data) + - oldtup->tuple.t_len; + { + sz += sizeof(HeapTupleData); + oldlen = 
oldtup->tuple.t_len; + sz += oldlen; + } if (newtup) - newlen = offsetof(ReorderBufferTupleBuf, t_data) + - newtup->tuple.t_len; - - sz += oldlen; - sz += newlen; + { + sz += sizeof(HeapTupleData); + newlen = newtup->tuple.t_len; + sz += newlen; + } /* make sure we have enough space */ ReorderBufferSerializeReserve(rb, sz); @@ -2111,14 +2138,20 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, if (oldlen) { - memcpy(data, oldtup, oldlen); + memcpy(data, &oldtup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + memcpy(data, oldtup->tuple.t_data, oldlen); data += oldlen; } if (newlen) { - memcpy(data, newtup, newlen); - data += newlen; + memcpy(data, &newtup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + memcpy(data, newtup->tuple.t_data, newlen); + data += oldlen; } break; } @@ -2337,27 +2370,46 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: if (change->data.tp.oldtuple) { - Size len = offsetof(ReorderBufferTupleBuf, t_data) + - ((ReorderBufferTupleBuf *) data)->tuple.t_len; + Size tuplelen = ((HeapTuple) data)->t_len; + + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb); - memcpy(change->data.tp.oldtuple, data, len); + /* restore ->tuple */ + memcpy(&change->data.tp.oldtuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ change->data.tp.oldtuple->tuple.t_data = - &change->data.tp.oldtuple->t_data.header; - data += len; + ReorderBufferTupleBufData(change->data.tp.oldtuple); + + /* restore tuple data itself */ + memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen); + data += tuplelen; } if (change->data.tp.newtuple) { - Size len = offsetof(ReorderBufferTupleBuf, t_data) + - ((ReorderBufferTupleBuf *) data)->tuple.t_len; + Size tuplelen = 
((HeapTuple) data)->t_len; + + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb); - memcpy(change->data.tp.newtuple, data, len); + /* restore ->tuple */ + memcpy(&change->data.tp.newtuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ change->data.tp.newtuple->tuple.t_data = - &change->data.tp.newtuple->t_data.header; - data += len; + ReorderBufferTupleBufData(change->data.tp.newtuple); + + /* restore tuple data itself */ + memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen); + data += tuplelen; } + break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { @@ -2734,7 +2786,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, */ tmphtup = heap_form_tuple(desc, attrs, isnull); Assert(newtup->tuple.t_len <= MaxHeapTupleSize); - Assert(&newtup->t_data.header == newtup->tuple.t_data); + Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data); memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len); newtup->tuple.t_len = tmphtup->t_len; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index b25eae832ae..b52d06af928 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -23,16 +23,19 @@ typedef struct ReorderBufferTupleBuf /* position in preallocated list */ slist_node node; - /* tuple, stored sequentially */ + /* tuple header, the interesting bit for users of logical decoding */ HeapTupleData tuple; - union - { - HeapTupleHeaderData header; - char data[MaxHeapTupleSize]; - double align_it; /* ensure t_data is MAXALIGN'd */ - } t_data; + + /* pre-allocated size of tuple buffer, different from tuple size */ + Size alloc_tuple_size; + + /* actual tuple data follows */ } ReorderBufferTupleBuf; +/* pointer to the data stored in a TupleBuf */ +#define 
ReorderBufferTupleBufData(p) \ + ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf))) + /* * Types of the change passed to a 'change' callback. * @@ -341,7 +344,7 @@ struct ReorderBuffer ReorderBuffer *ReorderBufferAllocate(void); void ReorderBufferFree(ReorderBuffer *); -ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *); +ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len); void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); |