diff options
Diffstat (limited to 'src/backend/access/transam/xlogreader.c')
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 645 |
1 files changed, 524 insertions, 121 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index b7c06da2557..e437c429920 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -45,6 +45,7 @@ static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen); static void XLogReaderInvalReadState(XLogReaderState *state); +static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool non_blocking); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, @@ -57,6 +58,12 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, #define MAX_ERRORMSG_LEN 1000 /* + * Default size; large enough that typical users of XLogReader won't often need + * to use the 'oversized' memory allocation code path. + */ +#define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024) + +/* * Construct a string in state->errormsg_buf explaining what's wrong with * the current record being read. */ @@ -70,6 +77,24 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) va_start(args, fmt); vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args); va_end(args); + + state->errormsg_deferred = true; +} + +/* + * Set the size of the decoding buffer. A pointer to a caller supplied memory + * region may also be passed in, in which case non-oversized records will be + * decoded there. + */ +void +XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size) +{ + Assert(state->decode_buffer == NULL); + + state->decode_buffer = buffer; + state->decode_buffer_size = size; + state->decode_buffer_tail = buffer; + state->decode_buffer_head = buffer; } /* @@ -92,8 +117,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, /* initialize caller-provided support functions */ state->routine = *routine; - state->max_block_id = -1; - /* * Permanently allocate readBuf. We do it this way, rather than just * making a static array, for two reasons: (1) no need to waste the @@ -144,18 +167,11 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, void XLogReaderFree(XLogReaderState *state) { - int block_id; - if (state->seg.ws_file != -1) state->routine.segment_close(state); - for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) - { - if (state->blocks[block_id].data) - pfree(state->blocks[block_id].data); - } - if (state->main_data) - pfree(state->main_data); + if (state->decode_buffer && state->free_decode_buffer) + pfree(state->decode_buffer); pfree(state->errormsg_buf); if (state->readRecordBuf) @@ -251,7 +267,133 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) /* Begin at the passed-in record pointer. */ state->EndRecPtr = RecPtr; + state->NextRecPtr = RecPtr; state->ReadRecPtr = InvalidXLogRecPtr; + state->DecodeRecPtr = InvalidXLogRecPtr; +} + +/* + * See if we can release the last record that was returned by + * XLogNextRecord(), if any, to free up space. + */ +void +XLogReleasePreviousRecord(XLogReaderState *state) +{ + DecodedXLogRecord *record; + + if (!state->record) + return; + + /* + * Remove it from the decoded record queue. It must be the oldest item + * decoded, decode_queue_head. + */ + record = state->record; + Assert(record == state->decode_queue_head); + state->record = NULL; + state->decode_queue_head = record->next; + + /* It might also be the newest item decoded, decode_queue_tail. */ + if (state->decode_queue_tail == record) + state->decode_queue_tail = NULL; + + /* Release the space. */ + if (unlikely(record->oversized)) + { + /* It's not in the the decode buffer, so free it to release space. */ + pfree(record); + } + else + { + /* It must be the head (oldest) record in the decode buffer. */ + Assert(state->decode_buffer_head == (char *) record); + + /* + * We need to update head to point to the next record that is in the + * decode buffer, if any, being careful to skip oversized ones + * (they're not in the decode buffer). + */ + record = record->next; + while (unlikely(record && record->oversized)) + record = record->next; + + if (record) + { + /* Adjust head to release space up to the next record. */ + state->decode_buffer_head = (char *) record; + } + else + { + /* + * Otherwise we might as well just reset head and tail to the + * start of the buffer space, because we're empty. This means + * we'll keep overwriting the same piece of memory if we're not + * doing any prefetching. + */ + state->decode_buffer_head = state->decode_buffer; + state->decode_buffer_tail = state->decode_buffer; + } + } +} + +/* + * Attempt to read an XLOG record. + * + * XLogBeginRead() or XLogFindNextRecord() and then XLogReadAhead() must be + * called before the first call to XLogNextRecord(). This functions returns + * records and errors that were put into an internal queue by XLogReadAhead(). + * + * On success, a record is returned. + * + * The returned record (or *errormsg) points to an internal buffer that's + * valid until the next call to XLogNextRecord. + */ +DecodedXLogRecord * +XLogNextRecord(XLogReaderState *state, char **errormsg) +{ + /* Release the last record returned by XLogNextRecord(). */ + XLogReleasePreviousRecord(state); + + if (state->decode_queue_head == NULL) + { + *errormsg = NULL; + if (state->errormsg_deferred) + { + if (state->errormsg_buf[0] != '\0') + *errormsg = state->errormsg_buf; + state->errormsg_deferred = false; + } + + /* + * state->EndRecPtr is expected to have been set by the last call to + * XLogBeginRead() or XLogNextRecord(), and is the location of the + * error. + */ + Assert(!XLogRecPtrIsInvalid(state->EndRecPtr)); + + return NULL; + } + + /* + * Record this as the most recent record returned, so that we'll release + * it next time. This also exposes it to the traditional + * XLogRecXXX(xlogreader) macros, which work with the decoder rather than + * the record for historical reasons. + */ + state->record = state->decode_queue_head; + + /* + * Update the pointers to the beginning and one-past-the-end of this + * record, again for the benefit of historical code that expected the + * decoder to track this rather than accessing these fields of the record + * itself. + */ + state->ReadRecPtr = state->record->lsn; + state->EndRecPtr = state->record->next_lsn; + + *errormsg = NULL; + + return state->record; } /* @@ -273,6 +415,119 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg) { + DecodedXLogRecord *decoded; + + /* + * Release last returned record, if there is one. We need to do this so + * that we can check for empty decode queue accurately. + */ + XLogReleasePreviousRecord(state); + + /* + * Call XLogReadAhead() in blocking mode to make sure there is something + * in the queue, though we don't use the result. + */ + if (!XLogReaderHasQueuedRecordOrError(state)) + XLogReadAhead(state, false /* nonblocking */ ); + + /* Consume the head record or error. */ + decoded = XLogNextRecord(state, errormsg); + if (decoded) + { + /* + * This function returns a pointer to the record's header, not the + * actual decoded record. The caller will access the decoded record + * through the XLogRecGetXXX() macros, which reach the decoded + * recorded as xlogreader->record. + */ + Assert(state->record == decoded); + return &decoded->header; + } + + return NULL; +} + +/* + * Allocate space for a decoded record. The only member of the returned + * object that is initialized is the 'oversized' flag, indicating that the + * decoded record wouldn't fit in the decode buffer and must eventually be + * freed explicitly. + * + * The caller is responsible for adjusting decode_buffer_tail with the real + * size after successfully decoding a record into this space. This way, if + * decoding fails, then there is nothing to undo unless the 'oversized' flag + * was set and pfree() must be called. + * + * Return NULL if there is no space in the decode buffer and allow_oversized + * is false, or if memory allocation fails for an oversized buffer. + */ +static DecodedXLogRecord * +XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized) +{ + size_t required_space = DecodeXLogRecordRequiredSpace(xl_tot_len); + DecodedXLogRecord *decoded = NULL; + + /* Allocate a circular decode buffer if we don't have one already. */ + if (unlikely(state->decode_buffer == NULL)) + { + if (state->decode_buffer_size == 0) + state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE; + state->decode_buffer = palloc(state->decode_buffer_size); + state->decode_buffer_head = state->decode_buffer; + state->decode_buffer_tail = state->decode_buffer; + state->free_decode_buffer = true; + } + + /* Try to allocate space in the circular decode buffer. */ + if (state->decode_buffer_tail >= state->decode_buffer_head) + { + /* Empty, or tail is to the right of head. */ + if (state->decode_buffer_tail + required_space <= + state->decode_buffer + state->decode_buffer_size) + { + /* There is space between tail and end. */ + decoded = (DecodedXLogRecord *) state->decode_buffer_tail; + decoded->oversized = false; + return decoded; + } + else if (state->decode_buffer + required_space < + state->decode_buffer_head) + { + /* There is space between start and head. */ + decoded = (DecodedXLogRecord *) state->decode_buffer; + decoded->oversized = false; + return decoded; + } + } + else + { + /* Tail is to the left of head. */ + if (state->decode_buffer_tail + required_space < + state->decode_buffer_head) + { + /* There is space between tail and head. */ + decoded = (DecodedXLogRecord *) state->decode_buffer_tail; + decoded->oversized = false; + return decoded; + } + } + + /* Not enough space in the decode buffer. Are we allowed to allocate? */ + if (allow_oversized) + { + decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM); + if (decoded == NULL) + return NULL; + decoded->oversized = true; + return decoded; + } + + return NULL; +} + +static XLogPageReadResult +XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) +{ XLogRecPtr RecPtr; XLogRecord *record; XLogRecPtr targetPagePtr; @@ -284,6 +539,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) bool assembled; bool gotheader; int readOff; + DecodedXLogRecord *decoded; + char *errormsg; /* not used */ /* * randAccess indicates whether to verify the previous-record pointer of @@ -293,21 +550,20 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) randAccess = false; /* reset error state */ - *errormsg = NULL; state->errormsg_buf[0] = '\0'; + decoded = NULL; - ResetDecoder(state); state->abortedRecPtr = InvalidXLogRecPtr; state->missingContrecPtr = InvalidXLogRecPtr; - RecPtr = state->EndRecPtr; + RecPtr = state->NextRecPtr; - if (state->ReadRecPtr != InvalidXLogRecPtr) + if (state->DecodeRecPtr != InvalidXLogRecPtr) { /* read the record after the one we just read */ /* - * EndRecPtr is pointing to end+1 of the previous WAL record. If + * NextRecPtr is pointing to end+1 of the previous WAL record. If * we're at a page boundary, no more records can fit on the current * page. We must skip over the page header, but we can't do that until * we've read in the page, since the header size is variable. @@ -318,7 +574,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) /* * Caller supplied a position to start at. * - * In this case, EndRecPtr should already be pointing to a valid + * In this case, NextRecPtr should already be pointing to a valid * record starting position. */ Assert(XRecOffIsValid(RecPtr)); @@ -326,6 +582,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) } restart: + state->nonblocking = nonblocking; state->currRecPtr = RecPtr; assembled = false; @@ -339,7 +596,9 @@ restart: */ readOff = ReadPageInternal(state, targetPagePtr, Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); - if (readOff < 0) + if (readOff == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readOff < 0) goto err; /* @@ -395,7 +654,7 @@ restart: */ if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) { - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record, + if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record, randAccess)) goto err; gotheader = true; @@ -414,6 +673,31 @@ restart: gotheader = false; } + /* + * Find space to decode this record. Don't allow oversized allocation if + * the caller requested nonblocking. Otherwise, we *have* to try to + * decode the record now because the caller has nothing else to do, so + * allow an oversized record to be palloc'd if that turns out to be + * necessary. + */ + decoded = XLogReadRecordAlloc(state, + total_len, + !nonblocking /* allow_oversized */ ); + if (decoded == NULL) + { + /* + * There is no space in the decode buffer. The caller should help + * with that problem by consuming some records. + */ + if (nonblocking) + return XLREAD_WOULDBLOCK; + + /* We failed to allocate memory for an oversized record. */ + report_invalid_record(state, + "out of memory while trying to decode a record of length %u", total_len); + goto err; + } + len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; if (total_len > len) { @@ -453,7 +737,9 @@ restart: Min(total_len - gotlen + SizeOfXLogShortPHD, XLOG_BLCKSZ)); - if (readOff < 0) + if (readOff == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readOff < 0) goto err; Assert(SizeOfXLogShortPHD <= readOff); @@ -471,7 +757,6 @@ restart: if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD) { state->overwrittenRecPtr = RecPtr; - ResetDecoder(state); RecPtr = targetPagePtr; goto restart; } @@ -526,7 +811,7 @@ restart: if (!gotheader) { record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, + if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record, randAccess)) goto err; gotheader = true; @@ -540,8 +825,8 @@ restart: goto err; pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - state->ReadRecPtr = RecPtr; - state->EndRecPtr = targetPagePtr + pageHeaderSize + state->DecodeRecPtr = RecPtr; + state->NextRecPtr = targetPagePtr + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len); } else @@ -549,16 +834,18 @@ restart: /* Wait for the record data to become available */ readOff = ReadPageInternal(state, targetPagePtr, Min(targetRecOff + total_len, XLOG_BLCKSZ)); - if (readOff < 0) + if (readOff == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readOff < 0) goto err; /* Record does not cross a page boundary */ if (!ValidXLogRecord(state, record, RecPtr)) goto err; - state->EndRecPtr = RecPtr + MAXALIGN(total_len); + state->NextRecPtr = RecPtr + MAXALIGN(total_len); - state->ReadRecPtr = RecPtr; + state->DecodeRecPtr = RecPtr; } /* @@ -568,14 +855,40 @@ restart: (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ - state->EndRecPtr += state->segcxt.ws_segsize - 1; - state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); + state->NextRecPtr += state->segcxt.ws_segsize - 1; + state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize); } - if (DecodeXLogRecord(state, record, errormsg)) - return record; + if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg)) + { + /* Record the location of the next record. */ + decoded->next_lsn = state->NextRecPtr; + + /* + * If it's in the decode buffer, mark the decode buffer space as + * occupied. + */ + if (!decoded->oversized) + { + /* The new decode buffer head must be MAXALIGNed. */ + Assert(decoded->size == MAXALIGN(decoded->size)); + if ((char *) decoded == state->decode_buffer) + state->decode_buffer_tail = state->decode_buffer + decoded->size; + else + state->decode_buffer_tail += decoded->size; + } + + /* Insert it into the queue of decoded records. */ + Assert(state->decode_queue_tail != decoded); + if (state->decode_queue_tail) + state->decode_queue_tail->next = decoded; + state->decode_queue_tail = decoded; + if (!state->decode_queue_head) + state->decode_queue_head = decoded; + return XLREAD_SUCCESS; + } else - return NULL; + return XLREAD_FAIL; err: if (assembled) @@ -593,14 +906,46 @@ err: state->missingContrecPtr = targetPagePtr; } + if (decoded && decoded->oversized) + pfree(decoded); + /* * Invalidate the read state. We might read from a different source after * failure. */ XLogReaderInvalReadState(state); - if (state->errormsg_buf[0] != '\0') - *errormsg = state->errormsg_buf; + /* + * If an error was written to errmsg_buf, it'll be returned to the caller + * of XLogReadRecord() after all successfully decoded records from the + * read queue. + */ + + return XLREAD_FAIL; +} + +/* + * Try to decode the next available record, and return it. The record will + * also be returned to XLogNextRecord(), which must be called to 'consume' + * each record. + * + * If nonblocking is true, may return NULL due to lack of data or WAL decoding + * space. + */ +DecodedXLogRecord * +XLogReadAhead(XLogReaderState *state, bool nonblocking) +{ + XLogPageReadResult result; + + if (state->errormsg_deferred) + return NULL; + + result = XLogDecodeNextRecord(state, nonblocking); + if (result == XLREAD_SUCCESS) + { + Assert(state->decode_queue_tail != NULL); + return state->decode_queue_tail; + } return NULL; } @@ -609,8 +954,14 @@ err: * Read a single xlog page including at least [pageptr, reqLen] of valid data * via the page_read() callback. * - * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the page_read callback). + * Returns XLREAD_FAIL if the required page cannot be read for some + * reason; errormsg_buf is set in that case (unless the error occurs in the + * page_read callback). + * + * Returns XLREAD_WOULDBLOCK if the requested data can't be read without + * waiting. This can be returned only if the installed page_read callback + * respects the state->nonblocking flag, and cannot read the requested data + * immediately. * * We fetch the page from a reader-local cache if we know we have the required * data and if there hasn't been any error since caching the data. @@ -652,7 +1003,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ, state->currRecPtr, state->readBuf); - if (readLen < 0) + if (readLen == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readLen < 0) goto err; /* we can be sure to have enough WAL available, we scrolled back */ @@ -670,7 +1023,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), state->currRecPtr, state->readBuf); - if (readLen < 0) + if (readLen == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readLen < 0) goto err; Assert(readLen <= XLOG_BLCKSZ); @@ -689,7 +1044,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr), state->currRecPtr, state->readBuf); - if (readLen < 0) + if (readLen == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readLen < 0) goto err; } @@ -707,8 +1064,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) return readLen; err: - XLogReaderInvalReadState(state); - return -1; + if (state->errormsg_buf[0] != '\0') + { + state->errormsg_deferred = true; + XLogReaderInvalReadState(state); + } + return XLREAD_FAIL; } /* @@ -987,6 +1348,9 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) Assert(!XLogRecPtrIsInvalid(RecPtr)); + /* Make sure ReadPageInternal() can't return XLREAD_WOULDBLOCK. */ + state->nonblocking = false; + /* * skip over potential continuation data, keeping in mind that it may span * multiple pages @@ -1187,34 +1551,83 @@ WALRead(XLogReaderState *state, * ---------------------------------------- */ -/* private function to reset the state between records */ +/* + * Private function to reset the state, forgetting all decoded records, if we + * are asked to move to a new read position. + */ static void ResetDecoder(XLogReaderState *state) { - int block_id; + DecodedXLogRecord *r; - state->decoded_record = NULL; - - state->main_data_len = 0; - - for (block_id = 0; block_id <= state->max_block_id; block_id++) + /* Reset the decoded record queue, freeing any oversized records. */ + while ((r = state->decode_queue_head) != NULL) { - state->blocks[block_id].in_use = false; - state->blocks[block_id].has_image = false; - state->blocks[block_id].has_data = false; - state->blocks[block_id].apply_image = false; + state->decode_queue_head = r->next; + if (r->oversized) + pfree(r); } - state->max_block_id = -1; + state->decode_queue_tail = NULL; + state->decode_queue_head = NULL; + state->record = NULL; + + /* Reset the decode buffer to empty. */ + state->decode_buffer_tail = state->decode_buffer; + state->decode_buffer_head = state->decode_buffer; + + /* Clear error state. */ + state->errormsg_buf[0] = '\0'; + state->errormsg_deferred = false; } /* - * Decode the previously read record. + * Compute the maximum possible amount of padding that could be required to + * decode a record, given xl_tot_len from the record's header. This is the + * amount of output buffer space that we need to decode a record, though we + * might not finish up using it all. + * + * This computation is pessimistic and assumes the maximum possible number of + * blocks, due to lack of better information. + */ +size_t +DecodeXLogRecordRequiredSpace(size_t xl_tot_len) +{ + size_t size = 0; + + /* Account for the fixed size part of the decoded record struct. */ + size += offsetof(DecodedXLogRecord, blocks[0]); + /* Account for the flexible blocks array of maximum possible size. */ + size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1); + /* Account for all the raw main and block data. */ + size += xl_tot_len; + /* We might insert padding before main_data. */ + size += (MAXIMUM_ALIGNOF - 1); + /* We might insert padding before each block's data. */ + size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1); + /* We might insert padding at the end. */ + size += (MAXIMUM_ALIGNOF - 1); + + return size; +} + +/* + * Decode a record. "decoded" must point to a MAXALIGNed memory area that has + * space for at least DecodeXLogRecordRequiredSpace(record) bytes. On + * success, decoded->size contains the actual space occupied by the decoded + * record, which may turn out to be less. + * + * Only decoded->oversized member must be initialized already, and will not be + * modified. Other members will be initialized as required. * * On error, a human-readable error message is returned in *errormsg, and * the return value is false. */ bool -DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) +DecodeXLogRecord(XLogReaderState *state, + DecodedXLogRecord *decoded, + XLogRecord *record, + XLogRecPtr lsn, + char **errormsg) { /* * read next _size bytes from record buffer, but check for overrun first. @@ -1229,17 +1642,20 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) } while(0) char *ptr; + char *out; uint32 remaining; uint32 datatotal; RelFileNode *rnode = NULL; uint8 block_id; - ResetDecoder(state); - - state->decoded_record = record; - state->record_origin = InvalidRepOriginId; - state->toplevel_xid = InvalidTransactionId; - + decoded->header = *record; + decoded->lsn = lsn; + decoded->next = NULL; + decoded->record_origin = InvalidRepOriginId; + decoded->toplevel_xid = InvalidTransactionId; + decoded->main_data = NULL; + decoded->main_data_len = 0; + decoded->max_block_id = -1; ptr = (char *) record; ptr += SizeOfXLogRecord; remaining = record->xl_tot_len - SizeOfXLogRecord; @@ -1257,7 +1673,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) COPY_HEADER_FIELD(&main_data_len, sizeof(uint8)); - state->main_data_len = main_data_len; + decoded->main_data_len = main_data_len; datatotal += main_data_len; break; /* by convention, the main data fragment is * always last */ @@ -1268,18 +1684,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) uint32 main_data_len; COPY_HEADER_FIELD(&main_data_len, sizeof(uint32)); - state->main_data_len = main_data_len; + decoded->main_data_len = main_data_len; datatotal += main_data_len; break; /* by convention, the main data fragment is * always last */ } else if (block_id == XLR_BLOCK_ID_ORIGIN) { - COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); + COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId)); } else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) { - COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId)); + COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId)); } else if (block_id <= XLR_MAX_BLOCK_ID) { @@ -1287,7 +1703,11 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) DecodedBkpBlock *blk; uint8 fork_flags; - if (block_id <= state->max_block_id) + /* mark any intervening block IDs as not in use */ + for (int i = decoded->max_block_id + 1; i < block_id; ++i) + decoded->blocks[i].in_use = false; + + if (block_id <= decoded->max_block_id) { report_invalid_record(state, "out-of-order block_id %u at %X/%X", @@ -1295,9 +1715,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; } - state->max_block_id = block_id; + decoded->max_block_id = block_id; - blk = &state->blocks[block_id]; + blk = &decoded->blocks[block_id]; blk->in_use = true; blk->apply_image = false; @@ -1440,17 +1860,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) /* * Ok, we've parsed the fragment headers, and verified that the total * length of the payload in the fragments is equal to the amount of data - * left. Copy the data of each fragment to a separate buffer. - * - * We could just set up pointers into readRecordBuf, but we want to align - * the data for the convenience of the callers. Backup images are not - * copied, however; they don't need alignment. + * left. Copy the data of each fragment to contiguous space after the + * blocks array, inserting alignment padding before the data fragments so + * they can be cast to struct pointers by REDO routines. */ + out = ((char *) decoded) + + offsetof(DecodedXLogRecord, blocks) + + sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1); /* block data first */ - for (block_id = 0; block_id <= state->max_block_id; block_id++) + for (block_id = 0; block_id <= decoded->max_block_id; block_id++) { - DecodedBkpBlock *blk = &state->blocks[block_id]; + DecodedBkpBlock *blk = &decoded->blocks[block_id]; if (!blk->in_use) continue; @@ -1459,58 +1880,37 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) if (blk->has_image) { - blk->bkp_image = ptr; + /* no need to align image */ + blk->bkp_image = out; + memcpy(out, ptr, blk->bimg_len); ptr += blk->bimg_len; + out += blk->bimg_len; } if (blk->has_data) { - if (!blk->data || blk->data_len > blk->data_bufsz) - { - if (blk->data) - pfree(blk->data); - - /* - * Force the initial request to be BLCKSZ so that we don't - * waste time with lots of trips through this stanza as a - * result of WAL compression. - */ - blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ)); - blk->data = palloc(blk->data_bufsz); - } + out = (char *) MAXALIGN(out); + blk->data = out; memcpy(blk->data, ptr, blk->data_len); ptr += blk->data_len; + out += blk->data_len; } } /* and finally, the main data */ - if (state->main_data_len > 0) + if (decoded->main_data_len > 0) { - if (!state->main_data || state->main_data_len > state->main_data_bufsz) - { - if (state->main_data) - pfree(state->main_data); - - /* - * main_data_bufsz must be MAXALIGN'ed. In many xlog record - * types, we omit trailing struct padding on-disk to save a few - * bytes; but compilers may generate accesses to the xlog struct - * that assume that padding bytes are present. If the palloc - * request is not large enough to include such padding bytes then - * we'll get valgrind complaints due to otherwise-harmless fetches - * of the padding bytes. - * - * In addition, force the initial request to be reasonably large - * so that we don't waste time with lots of trips through this - * stanza. BLCKSZ / 2 seems like a good compromise choice. - */ - state->main_data_bufsz = MAXALIGN(Max(state->main_data_len, - BLCKSZ / 2)); - state->main_data = palloc(state->main_data_bufsz); - } - memcpy(state->main_data, ptr, state->main_data_len); - ptr += state->main_data_len; + out = (char *) MAXALIGN(out); + decoded->main_data = out; + memcpy(decoded->main_data, ptr, decoded->main_data_len); + ptr += decoded->main_data_len; + out += decoded->main_data_len; } + /* Report the actual size we used. */ + decoded->size = MAXALIGN(out - (char *) decoded); + Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >= + decoded->size); + return true; shortdata_err: @@ -1536,10 +1936,11 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, { DecodedBkpBlock *bkpb; - if (!record->blocks[block_id].in_use) + if (block_id > record->record->max_block_id || + !record->record->blocks[block_id].in_use) return false; - bkpb = &record->blocks[block_id]; + bkpb = &record->record->blocks[block_id]; if (rnode) *rnode = bkpb->rnode; if (forknum) @@ -1559,10 +1960,11 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len) { DecodedBkpBlock *bkpb; - if (!record->blocks[block_id].in_use) + if (block_id > record->record->max_block_id || + !record->record->blocks[block_id].in_use) return NULL; - bkpb = &record->blocks[block_id]; + bkpb = &record->record->blocks[block_id]; if (!bkpb->has_data) { @@ -1590,12 +1992,13 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) char *ptr; PGAlignedBlock tmp; - if (!record->blocks[block_id].in_use) + if (block_id > record->record->max_block_id || + !record->record->blocks[block_id].in_use) return false; - if (!record->blocks[block_id].has_image) + if (!record->record->blocks[block_id].has_image) return false; - bkpb = &record->blocks[block_id]; + bkpb = &record->record->blocks[block_id]; ptr = bkpb->bkp_image; if (BKPIMAGE_COMPRESSED(bkpb->bimg_info)) |