diff options
author | Andres Freund <andres@anarazel.de> | 2016-03-05 18:02:20 -0800 |
---|---|---|
committer | Andres Freund <andres@anarazel.de> | 2016-03-05 18:02:20 -0800 |
commit | c8f621c43a599b35dc004ee09627bf4688cbbb84 (patch) | |
tree | 0d8ecce142f1bc0aad939f73926c9646a86d59d7 /src | |
parent | 0bda14d54cf24dedcd2011559a53cc62702e421b (diff) | |
download | postgresql-c8f621c43a599b35dc004ee09627bf4688cbbb84.tar.gz postgresql-c8f621c43a599b35dc004ee09627bf4688cbbb84.zip |
logical decoding: Fix handling of large old tuples with replica identity full.
When decoding the old version of an UPDATE or DELETE change, and if that
tuple was bigger than MaxHeapTupleSize, we either Assert'ed out, or
failed in more subtle ways in non-assert builds. Normally individual
tuples aren't bigger than MaxHeapTupleSize, with big datums toasted.
But that's not the case for the old version of a tuple for logical
decoding; the replica identity is logged as one piece. With the default
replica identity btree limits that to small tuples, but that's not the
case for FULL.
Change the tuple buffer infrastructure to separate allocate over-large
tuples, instead of always going through the slab cache.
This unfortunately requires changing the ReorderBufferTupleBuf
definition, we need to store the allocated size someplace. To avoid
requiring output plugins to recompile, don't store HeapTupleHeaderData
directly after HeapTupleData, but point to it via t_data; that leaves
rooms for the allocated size. As there's no reason for an output plugin
to look at ReorderBufferTupleBuf->t_data.header, remove the field. It
was just a minor convenience having it directly accessible.
Reported-By: Adam DratwiĆski
Discussion: CAKg6ypLd7773AOX4DiOGRwQk1TVOQKhNwjYiVjJnpq8Wo+i62Q@mail.gmail.com
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/decode.c | 57 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 114 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 19 |
3 files changed, 127 insertions, 63 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 33e4343cc22..36e6d9a5c90 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -610,7 +610,8 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size tuplelen; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple); } @@ -656,7 +657,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { data = XLogRecGetBlockData(r, 0, &datalen); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); DecodeXLogTuple(data, datalen, change->data.tp.newtuple); } @@ -667,7 +669,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) data = XLogRecGetData(r) + SizeOfHeapUpdate; datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate; - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); } @@ -717,13 +720,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* old primary key stored */ if (xlrec->flags & XLH_DELETE_CONTAINS_OLD) { + Size len = XLogRecGetDataLen(r) - SizeOfHeapDelete; + Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, len); DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, - XLogRecGetDataLen(r) - SizeOfHeapDelete, - change->data.tp.oldtuple); + len, change->data.tp.oldtuple); } change->data.tp.clear_toast_afterwards = true; @@ -783,35 +788,39 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) { - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + HeapTupleHeader header; + + xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); + data = ((char *) xlhdr) + SizeOfMultiInsertTuple; + datalen = xlhdr->datalen; + + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); tuple = change->data.tp.newtuple; + header = tuple->tuple.t_data; /* not a disk based tuple */ ItemPointerSetInvalid(&tuple->tuple.t_self); - xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); - data = ((char *) xlhdr) + SizeOfMultiInsertTuple; - datalen = xlhdr->datalen; - /* * We can only figure this out after reassembling the * transactions. */ tuple->tuple.t_tableOid = InvalidOid; - tuple->tuple.t_data = &tuple->t_data.header; + tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; - memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader); + memset(header, 0, SizeofHeapTupleHeader); - memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader, + memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader, (char *) data, datalen); data += datalen; - tuple->t_data.header.t_infomask = xlhdr->t_infomask; - tuple->t_data.header.t_infomask2 = xlhdr->t_infomask2; - tuple->t_data.header.t_hoff = xlhdr->t_hoff; + header->t_infomask = xlhdr->t_infomask; + header->t_infomask2 = xlhdr->t_infomask2; + header->t_hoff = xlhdr->t_hoff; } /* @@ -877,31 +886,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) { xl_heap_header xlhdr; int datalen = len - SizeOfHeapHeader; + HeapTupleHeader header; Assert(datalen >= 0); - Assert(datalen <= MaxHeapTupleSize); tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; + header = tuple->tuple.t_data; /* not a disk based tuple */ ItemPointerSetInvalid(&tuple->tuple.t_self); /* we can only figure this out after reassembling the transactions */ tuple->tuple.t_tableOid = InvalidOid; - tuple->tuple.t_data = &tuple->t_data.header; /* data is not stored aligned, copy to aligned storage */ memcpy((char *) &xlhdr, data, SizeOfHeapHeader); - memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader); + memset(header, 0, SizeofHeapTupleHeader); - memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader, + memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, data + SizeOfHeapHeader, datalen); - tuple->t_data.header.t_infomask = xlhdr.t_infomask; - tuple->t_data.header.t_infomask2 = xlhdr.t_infomask2; - tuple->t_data.header.t_hoff = xlhdr.t_hoff; + header->t_infomask = xlhdr.t_infomask; + header->t_infomask2 = xlhdr.t_infomask2; + header->t_hoff = xlhdr.t_hoff; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index e20c5114e2a..570400ffb73 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -444,27 +444,48 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) /* - * Get an unused, possibly preallocated, ReorderBufferTupleBuf + * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at + * least a tuple of size tuple_len (excluding header overhead). */ ReorderBufferTupleBuf * -ReorderBufferGetTupleBuf(ReorderBuffer *rb) +ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len) { ReorderBufferTupleBuf *tuple; + Size alloc_len; - /* check the slab cache */ - if (rb->nr_cached_tuplebufs) + alloc_len = tuple_len + SizeofHeapTupleHeader; + + /* + * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for + * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples + * tuples generated for oldtuples can be bigger, as they don't have + * out-of-line toast columns. + */ + if (alloc_len < MaxHeapTupleSize) + alloc_len = MaxHeapTupleSize; + + + /* if small enough, check the slab cache */ + if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs) { rb->nr_cached_tuplebufs--; tuple = slist_container(ReorderBufferTupleBuf, node, slist_pop_head_node(&rb->cached_tuplebufs)); #ifdef USE_ASSERT_CHECKING - memset(tuple, 0xa9, sizeof(ReorderBufferTupleBuf)); + memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData)); +#endif + tuple->tuple.t_data = ReorderBufferTupleBufData(tuple); +#ifdef USE_ASSERT_CHECKING + memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size); #endif } else { tuple = (ReorderBufferTupleBuf *) - MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf)); + MemoryContextAlloc(rb->context, + sizeof(ReorderBufferTupleBuf) + alloc_len); + tuple->alloc_tuple_size = alloc_len; + tuple->tuple.t_data = ReorderBufferTupleBufData(tuple); } return tuple; @@ -479,13 +500,16 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb) void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple) { - /* check whether to put into the slab cache */ - if (rb->nr_cached_tuplebufs < max_cached_tuplebufs) + /* check whether to put into the slab cache, oversized tuples never are */ + if (tuple->alloc_tuple_size == MaxHeapTupleSize && + rb->nr_cached_tuplebufs < max_cached_tuplebufs) { rb->nr_cached_tuplebufs++; slist_push_head(&rb->cached_tuplebufs, &tuple->node); + VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size); VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf)); VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node)); + VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size)); } else { @@ -2092,15 +2116,18 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, newtup = change->data.tp.newtuple; if (oldtup) - oldlen = offsetof(ReorderBufferTupleBuf, t_data) + - oldtup->tuple.t_len; + { + sz += sizeof(HeapTupleData); + oldlen = oldtup->tuple.t_len; + sz += oldlen; + } if (newtup) - newlen = offsetof(ReorderBufferTupleBuf, t_data) + - newtup->tuple.t_len; - - sz += oldlen; - sz += newlen; + { + sz += sizeof(HeapTupleData); + newlen = newtup->tuple.t_len; + sz += newlen; + } /* make sure we have enough space */ ReorderBufferSerializeReserve(rb, sz); @@ -2111,14 +2138,20 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, if (oldlen) { - memcpy(data, oldtup, oldlen); + memcpy(data, &oldtup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + memcpy(data, oldtup->tuple.t_data, oldlen); data += oldlen; } if (newlen) { - memcpy(data, newtup, newlen); - data += newlen; + memcpy(data, &newtup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + memcpy(data, newtup->tuple.t_data, newlen); + data += oldlen; } break; } @@ -2337,27 +2370,46 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: if (change->data.tp.oldtuple) { - Size len = offsetof(ReorderBufferTupleBuf, t_data) + - ((ReorderBufferTupleBuf *) data)->tuple.t_len; + Size tuplelen = ((HeapTuple) data)->t_len; + + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb); - memcpy(change->data.tp.oldtuple, data, len); + /* restore ->tuple */ + memcpy(&change->data.tp.oldtuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ change->data.tp.oldtuple->tuple.t_data = - &change->data.tp.oldtuple->t_data.header; - data += len; + ReorderBufferTupleBufData(change->data.tp.oldtuple); + + /* restore tuple data itself */ + memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen); + data += tuplelen; } if (change->data.tp.newtuple) { - Size len = offsetof(ReorderBufferTupleBuf, t_data) + - ((ReorderBufferTupleBuf *) data)->tuple.t_len; + Size tuplelen = ((HeapTuple) data)->t_len; + + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb); - memcpy(change->data.tp.newtuple, data, len); + /* restore ->tuple */ + memcpy(&change->data.tp.newtuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ change->data.tp.newtuple->tuple.t_data = - &change->data.tp.newtuple->t_data.header; - data += len; + ReorderBufferTupleBufData(change->data.tp.newtuple); + + /* restore tuple data itself */ + memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen); + data += tuplelen; } + break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { @@ -2734,7 +2786,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, */ tmphtup = heap_form_tuple(desc, attrs, isnull); Assert(newtup->tuple.t_len <= MaxHeapTupleSize); - Assert(&newtup->t_data.header == newtup->tuple.t_data); + Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data); memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len); newtup->tuple.t_len = tmphtup->t_len; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index b25eae832ae..b52d06af928 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -23,16 +23,19 @@ typedef struct ReorderBufferTupleBuf /* position in preallocated list */ slist_node node; - /* tuple, stored sequentially */ + /* tuple header, the interesting bit for users of logical decoding */ HeapTupleData tuple; - union - { - HeapTupleHeaderData header; - char data[MaxHeapTupleSize]; - double align_it; /* ensure t_data is MAXALIGN'd */ - } t_data; + + /* pre-allocated size of tuple buffer, different from tuple size */ + Size alloc_tuple_size; + + /* actual tuple data follows */ } ReorderBufferTupleBuf; +/* pointer to the data stored in a TupleBuf */ +#define ReorderBufferTupleBufData(p) \ + ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf))) + /* * Types of the change passed to a 'change' callback. * @@ -341,7 +344,7 @@ struct ReorderBuffer ReorderBuffer *ReorderBufferAllocate(void); void ReorderBufferFree(ReorderBuffer *); -ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *); +ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len); void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); |