diff options
Diffstat (limited to 'src/backend/access/transam/xlogreader.c')
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 744 |
1 files changed, 596 insertions, 148 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 02257768ec8..f66592482a4 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -38,6 +38,9 @@ static void report_invalid_record(XLogReaderState *state, const char *fmt,...) static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, bool header_inclusive); +size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len); +static XLogReadRecordResult XLogDecodeOneRecord(XLogReaderState *state, + bool allow_oversized); static void XLogReaderInvalReadState(XLogReaderState *state); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record); @@ -50,6 +53,8 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, /* size of the buffer allocated for error message. */ #define MAX_ERRORMSG_LEN 1000 +#define DEFAULT_DECODE_BUFFER_SIZE 0x10000 + /* * Construct a string in state->errormsg_buf explaining what's wrong with * the current record being read. @@ -64,6 +69,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) va_start(args, fmt); vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args); va_end(args); + + state->errormsg_deferred = true; } /* @@ -86,8 +93,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, /* initialize caller-provided support functions */ state->cleanup_cb = cleanup_cb; - state->max_block_id = -1; - /* * Permanently allocate readBuf. 
We do it this way, rather than just * making a static array, for two reasons: (1) no need to waste the @@ -136,18 +141,11 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, void XLogReaderFree(XLogReaderState *state) { - int block_id; - if (state->seg.ws_file >= 0) state->cleanup_cb(state); - for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) - { - if (state->blocks[block_id].data) - pfree(state->blocks[block_id].data); - } - if (state->main_data) - pfree(state->main_data); + if (state->decode_buffer && state->free_decode_buffer) + pfree(state->decode_buffer); pfree(state->errormsg_buf); if (state->readRecordBuf) @@ -157,6 +155,22 @@ XLogReaderFree(XLogReaderState *state) } /* + * Set the size of the decoding buffer. A pointer to a caller supplied memory + * region may also be passed in, in which case non-oversized records will be + * decoded there. + */ +void +XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size) +{ + Assert(state->decode_buffer == NULL); + + state->decode_buffer = buffer; + state->decode_buffer_size = size; + state->decode_buffer_head = buffer; + state->decode_buffer_tail = buffer; +} + +/* * Allocate readRecordBuf to fit a record of at least the given length. * Returns true if successful, false if out of memory. * @@ -243,22 +257,123 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) /* Begin at the passed-in record pointer. */ state->EndRecPtr = RecPtr; + state->NextRecPtr = RecPtr; state->ReadRecPtr = InvalidXLogRecPtr; + state->DecodeRecPtr = InvalidXLogRecPtr; state->readRecordState = XLREAD_NEXT_RECORD; } /* - * Attempt to read an XLOG record. - * - * XLogBeginRead() or XLogFindNextRecord() must be called before the first call - * to XLogReadRecord(). + * See if we can release the last record that was returned by + * XLogReadRecord(), to free up space. 
+ */ +static void +XLogReleasePreviousRecord(XLogReaderState *state) +{ + DecodedXLogRecord *record; + + /* + * Remove it from the decoded record queue. It must be the oldest + * item decoded, decode_queue_tail. + */ + record = state->record; + Assert(record == state->decode_queue_tail); + state->record = NULL; + state->decode_queue_tail = record->next; + + /* It might also be the newest item decoded, decode_queue_head. */ + if (state->decode_queue_head == record) + state->decode_queue_head = NULL; + + /* Release the space. */ + if (unlikely(record->oversized)) + { + /* It's not in the decode buffer, so free it to release space. */ + pfree(record); + } + else + { + /* It must be the tail record in the decode buffer. */ + Assert(state->decode_buffer_tail == (char *) record); + + /* + * We need to update tail to point to the next record that is in the + * decode buffer, if any, being careful to skip oversized ones + * (they're not in the decode buffer). + */ + record = record->next; + while (unlikely(record && record->oversized)) + record = record->next; + + if (record) + { + /* Adjust tail to release space up to the next record. */ + state->decode_buffer_tail = (char *) record; + } + else if (state->decoding && !state->decoding->oversized) + { + /* + * We're releasing the last fully decoded record in + * XLogReadRecord(), but some time earlier we partially decoded a + * record in XLogReadAhead() and were unable to complete the job. + * We'll set the buffer head and tail to point to the record we + * started working on, so that we can continue (perhaps from a + * different source). + */ + state->decode_buffer_tail = (char *) state->decoding; + state->decode_buffer_head = (char *) state->decoding; + } + else + { + /* + * Otherwise we might as well just reset head and tail to the + * start of the buffer space, because we're empty. This means + * we'll keep overwriting the same piece of memory if we're not + * doing any prefetching. 
+ */ + state->decode_buffer_tail = state->decode_buffer; + state->decode_buffer_head = state->decode_buffer; + } + } +} + +/* + * Similar to XLogNextRecord(), but this traditional interface is for code + * that just wants the header, not the decoded record. Callers can access the + * decoded record through the XLogRecGetXXX() macros. + */ +XLogReadRecordResult +XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) +{ + XLogReadRecordResult result; + DecodedXLogRecord *decoded; + + /* Consume the next decoded record. */ + result = XLogNextRecord(state, &decoded, errormsg); + if (result == XLREAD_SUCCESS) + { + /* + * The traditional interface just returns the header, not the decoded + * record. The caller will access the decoded record through the + * XLogRecGetXXX() macros. + */ + *record = &decoded->header; + } + else + *record = NULL; + return result; +} + +/* + * Consume the next record. XLogBeginRead() or XLogFindNextRecord() must be + * called before the first call to XLogNextRecord(). * * This function may return XLREAD_NEED_DATA several times before returning a * result record. The caller shall read in some new data then call this * function again with the same parameters. * * When a record is successfully read, returns XLREAD_SUCCESS with result - * record being stored in *record. Otherwise *record is NULL. + * record being stored in *record. Otherwise *record is set to NULL. * * Returns XLREAD_NEED_DATA if more data is needed to finish decoding the * current record. In that case, state->readPagePtr and state->reqLen inform @@ -269,11 +384,249 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * length of data that is now available (which must be >= given reqLen), * respectively. * - * If invalid data is encountered, returns XLREAD_FAIL and sets *record to - * NULL. *errormsg is set to a string with details of the failure. The + * Returns XLREAD_FULL if allow_oversized is false, and no space is available. 
+ * This is intended for readahead. + * + * If invalid data is encountered, returns XLREAD_FAIL with *record being set + * to NULL. *errormsg is set to a string with details of the failure. The * returned pointer (or *errormsg) points to an internal buffer that's valid * until the next call to XLogReadRecord. * + */ +XLogReadRecordResult +XLogNextRecord(XLogReaderState *state, + DecodedXLogRecord **record, + char **errormsg) +{ + /* Release the space occupied by the last record we returned. */ + if (state->record) + XLogReleasePreviousRecord(state); + + for (;;) + { + XLogReadRecordResult result; + + /* We can now return the oldest item in the queue, if there is one. */ + if (state->decode_queue_tail) + { + /* + * Record this as the most recent record returned, so that we'll + * release it next time. This also exposes it to the + * XLogRecXXX(decoder) macros, which pass in the decoder rather + * than the record for historical reasons. + */ + state->record = state->decode_queue_tail; + + /* + * It should be immediately after the last record returned by + * XLogReadRecord(), or at the position set by XLogBeginRead() if + * XLogReadRecord() hasn't been called yet. It may be after a + * page header, though. + */ + Assert(state->record->lsn == state->EndRecPtr || + (state->EndRecPtr % XLOG_BLCKSZ == 0 && + (state->record->lsn == state->EndRecPtr + SizeOfXLogShortPHD || + state->record->lsn == state->EndRecPtr + SizeOfXLogLongPHD))); + + /* + * Set ReadRecPtr and EndRecPtr to correspond to that + * record. + * + * Calling code could access these through the returned decoded + * record, but for now we'll update them directly here, for the + * benefit of all the existing code that accesses these variables + * directly. 
+ */ + state->ReadRecPtr = state->record->lsn; + state->EndRecPtr = state->record->next_lsn; + + *errormsg = NULL; + *record = state->record; + + return XLREAD_SUCCESS; + } + else if (state->errormsg_deferred) + { + /* + * If we've run out of records, but we have a deferred error, now + * is the time to report it. + */ + state->errormsg_deferred = false; + if (state->errormsg_buf[0] != '\0') + *errormsg = state->errormsg_buf; + else + *errormsg = NULL; + *record = NULL; + state->EndRecPtr = state->DecodeRecPtr; + + return XLREAD_FAIL; + } + + /* We need to get a decoded record into our queue first. */ + result = XLogDecodeOneRecord(state, true /* allow_oversized */ ); + switch(result) + { + case XLREAD_NEED_DATA: + *errormsg = NULL; + *record = NULL; + return result; + case XLREAD_SUCCESS: + Assert(state->decode_queue_tail != NULL); + break; + case XLREAD_FULL: + /* Not expected because we passed allow_oversized = true */ + Assert(false); + break; + case XLREAD_FAIL: + /* + * If that produced neither a queued record nor a queued error, + * then we're at the end (for example, archive recovery with no + * more files available). + */ + Assert(state->decode_queue_tail == NULL); + if (!state->errormsg_deferred) + { + state->EndRecPtr = state->DecodeRecPtr; + *errormsg = NULL; + *record = NULL; + return result; + } + break; + } + } + + /* unreachable */ + return XLREAD_FAIL; +} + +/* + * Try to decode the next available record. The next record will also be + * returned by XLogReadRecord(). + * + * In addition to the values that XLogReadRecord() can return, XLogReadAhead() + * can also return XLREAD_FULL to indicate that further readahead is not + * possible yet due to lack of space. + */ +XLogReadRecordResult +XLogReadAhead(XLogReaderState *state, DecodedXLogRecord **record, char **errormsg) +{ + XLogReadRecordResult result; + + /* We stop trying after encountering an error. 
*/ + if (unlikely(state->errormsg_deferred)) + { + /* We only report the error message the first time, see below. */ + *errormsg = NULL; + return XLREAD_FAIL; + } + + /* + * Try to decode one more record, if we have space. Pass allow_oversized + * = false, so that this call returns fast if the decode buffer is full. + */ + result = XLogDecodeOneRecord(state, false); + switch (result) + { + case XLREAD_SUCCESS: + /* New record at head of decode record queue. */ + Assert(state->decode_queue_head != NULL); + *record = state->decode_queue_head; + return result; + case XLREAD_FULL: + /* No space in circular decode buffer. */ + return result; + case XLREAD_NEED_DATA: + /* The caller needs to insert more data. */ + return result; + case XLREAD_FAIL: + /* Report the error. XLogReadRecord() will also report it. */ + Assert(state->errormsg_deferred); + if (state->errormsg_buf[0] != '\0') + *errormsg = state->errormsg_buf; + return result; + } + + /* Unreachable. */ + return XLREAD_FAIL; +} + +/* + * Allocate space for a decoded record. The only member of the returned + * object that is initialized is the 'oversized' flag, indicating that the + * decoded record wouldn't fit in the decode buffer and must eventually be + * freed explicitly. + * + * Return NULL if there is no space in the decode buffer and allow_oversized + * is false, or if memory allocation fails for an oversized buffer. + */ +static DecodedXLogRecord * +XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized) +{ + size_t required_space = DecodeXLogRecordRequiredSpace(xl_tot_len); + DecodedXLogRecord *decoded = NULL; + + /* Allocate a circular decode buffer if we don't have one already. 
*/ + if (unlikely(state->decode_buffer == NULL)) + { + if (state->decode_buffer_size == 0) + state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE; + state->decode_buffer = palloc(state->decode_buffer_size); + state->decode_buffer_head = state->decode_buffer; + state->decode_buffer_tail = state->decode_buffer; + state->free_decode_buffer = true; + } + if (state->decode_buffer_head >= state->decode_buffer_tail) + { + /* Empty, or head is to the right of tail. */ + if (state->decode_buffer_head + required_space <= + state->decode_buffer + state->decode_buffer_size) + { + /* There is space between head and end. */ + decoded = (DecodedXLogRecord *) state->decode_buffer_head; + decoded->oversized = false; + return decoded; + } + else if (state->decode_buffer + required_space < + state->decode_buffer_tail) + { + /* There is space between start and tail. */ + decoded = (DecodedXLogRecord *) state->decode_buffer; + decoded->oversized = false; + return decoded; + } + } + else + { + /* Head is to the left of tail. */ + if (state->decode_buffer_head + required_space < + state->decode_buffer_tail) + { + /* There is space between head and tail. */ + decoded = (DecodedXLogRecord *) state->decode_buffer_head; + decoded->oversized = false; + return decoded; + } + } + + /* Not enough space in the decode buffer. Are we allowed to allocate? */ + if (allow_oversized) + { + decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM); + if (decoded == NULL) + return NULL; + decoded->oversized = true; + return decoded; + } + + return decoded; +} + +/* + * Try to read and decode the next record and add it to the head of the + * decoded record queue. If 'allow_oversized' is false, then XLREAD_FULL can + * be returned to indicate the decoding buffer is full. XLogBeginRead() or + * XLogFindNextRecord() must be called before the first call to + * XLogReadRecord(). * * This function runs a state machine consisting of the following states. 
* @@ -300,35 +653,35 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * current state. This behavior allows us to continue reading a record * after switching to a different source, during streaming replication. */ -XLogReadRecordResult -XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) +static XLogReadRecordResult +XLogDecodeOneRecord(XLogReaderState *state, bool allow_oversized) { + XLogRecord *record; + char *errormsg; /* not used */ XLogRecord *prec; - *record = NULL; - /* reset error state */ - *errormsg = NULL; state->errormsg_buf[0] = '\0'; + record = NULL; switch (state->readRecordState) { case XLREAD_NEXT_RECORD: - ResetDecoder(state); + Assert(!state->decoding); - if (state->ReadRecPtr != InvalidXLogRecPtr) + if (state->DecodeRecPtr != InvalidXLogRecPtr) { /* read the record after the one we just read */ /* - * EndRecPtr is pointing to end+1 of the previous WAL record. + * NextRecPtr is pointing to end+1 of the previous WAL record. * If we're at a page boundary, no more records can fit on the * current page. We must skip over the page header, but we * can't do that until we've read in the page, since the * header size is variable. */ - state->PrevRecPtr = state->ReadRecPtr; - state->ReadRecPtr = state->EndRecPtr; + state->PrevRecPtr = state->DecodeRecPtr; + state->DecodeRecPtr = state->NextRecPtr; } else { @@ -338,8 +691,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) * In this case, EndRecPtr should already be pointing to a * valid record starting position. */ - Assert(XRecOffIsValid(state->EndRecPtr)); - state->ReadRecPtr = state->EndRecPtr; + Assert(XRecOffIsValid(state->NextRecPtr)); + state->DecodeRecPtr = state->NextRecPtr; /* * We cannot verify the previous-record pointer when we're @@ -347,7 +700,6 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) * won't try doing that. 
*/ state->PrevRecPtr = InvalidXLogRecPtr; - state->EndRecPtr = InvalidXLogRecPtr; /* to be tidy */ } state->record_verified = false; @@ -362,9 +714,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) uint32 targetRecOff; XLogPageHeader pageHeader; + Assert(!state->decoding); + targetPagePtr = - state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); - targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; + state->DecodeRecPtr - (state->DecodeRecPtr % XLOG_BLCKSZ); + targetRecOff = state->DecodeRecPtr % XLOG_BLCKSZ; /* * Check if we have enough data. For the first record in the @@ -385,13 +739,13 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) if (targetRecOff == 0) { /* At page start, so skip over page header. */ - state->ReadRecPtr += pageHeaderSize; + state->DecodeRecPtr += pageHeaderSize; targetRecOff = pageHeaderSize; } else if (targetRecOff < pageHeaderSize) { report_invalid_record(state, "invalid record offset at %X/%X", - LSN_FORMAT_ARGS(state->ReadRecPtr)); + LSN_FORMAT_ARGS(state->DecodeRecPtr)); goto err; } @@ -400,8 +754,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) targetRecOff == pageHeaderSize) { report_invalid_record(state, "contrecord is requested by %X/%X", - (uint32) (state->ReadRecPtr >> 32), - (uint32) state->ReadRecPtr); + (uint32) (state->DecodeRecPtr >> 32), + (uint32) state->DecodeRecPtr); goto err; } @@ -419,9 +773,26 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) * header. */ prec = (XLogRecord *) (state->readBuf + - state->ReadRecPtr % XLOG_BLCKSZ); + state->DecodeRecPtr % XLOG_BLCKSZ); total_len = prec->xl_tot_len; + /* Find space to decode this record. */ + Assert(state->decoding == NULL); + state->decoding = XLogReadRecordAlloc(state, total_len, + allow_oversized); + if (state->decoding == NULL) + { + /* + * We couldn't get space. If allow_oversized was true, + * then palloc() must have failed. 
Otherwise, report that + * our decoding buffer is full. This means that we are + * trying to read too far ahead. + */ + if (allow_oversized) + goto err; + return XLREAD_FULL; + } + /* * If the whole record header is on this page, validate it * immediately. Otherwise do just a basic sanity check on @@ -433,7 +804,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) */ if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) { - if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + if (!ValidXLogRecordHeader(state, state->DecodeRecPtr, state->PrevRecPtr, prec)) goto err; @@ -446,7 +817,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) { report_invalid_record(state, "invalid record length at %X/%X: wanted %u, got %u", - LSN_FORMAT_ARGS(state->ReadRecPtr), + LSN_FORMAT_ARGS(state->DecodeRecPtr), (uint32) SizeOfXLogRecord, total_len); goto err; } @@ -471,13 +842,15 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) XLogRecPtr targetPagePtr; uint32 targetRecOff; + Assert(state->decoding); + /* * Wait for the rest of the record on the first page to become * available */ targetPagePtr = - state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); - targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; + state->DecodeRecPtr - (state->DecodeRecPtr % XLOG_BLCKSZ); + targetRecOff = state->DecodeRecPtr % XLOG_BLCKSZ; request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ); record_len = request_len - targetRecOff; @@ -496,7 +869,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) /* validate record header if not yet */ if (!state->record_verified && record_len >= SizeOfXLogRecord) { - if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + if (!ValidXLogRecordHeader(state, state->DecodeRecPtr, state->PrevRecPtr, prec)) goto err; @@ -509,15 +882,15 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) /* Record does not cross a page boundary */ 
Assert(state->record_verified); - if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + if (!ValidXLogRecord(state, prec, state->DecodeRecPtr)) goto err; state->record_verified = true; /* to be tidy */ /* We already checked the header earlier */ - state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len); + state->NextRecPtr = state->DecodeRecPtr + MAXALIGN(record_len); - *record = prec; + record = prec; state->readRecordState = XLREAD_NEXT_RECORD; break; } @@ -536,7 +909,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) report_invalid_record(state, "record length %u at %X/%X too long", total_len, - LSN_FORMAT_ARGS(state->ReadRecPtr)); + LSN_FORMAT_ARGS(state->DecodeRecPtr)); goto err; } @@ -547,7 +920,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) state->recordRemainLen -= record_len; /* Calculate pointer to beginning of next page */ - state->recordContRecPtr = state->ReadRecPtr + record_len; + state->recordContRecPtr = state->DecodeRecPtr + record_len; Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0); state->readRecordState = XLREAD_CONTINUATION; @@ -564,6 +937,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) * we enter this state only if we haven't read the whole * record. 
*/ + Assert(state->decoding); Assert(state->recordRemainLen > 0); while (state->recordRemainLen > 0) @@ -583,7 +957,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) return XLREAD_NEED_DATA; if (!state->page_verified) - goto err; + goto err_continue; Assert(SizeOfXLogShortPHD <= state->readLen); @@ -596,8 +970,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) "there is no contrecord flag at %X/%X reading %X/%X", (uint32) (state->recordContRecPtr >> 32), (uint32) state->recordContRecPtr, - (uint32) (state->ReadRecPtr >> 32), - (uint32) state->ReadRecPtr); + (uint32) (state->DecodeRecPtr >> 32), + (uint32) state->DecodeRecPtr); goto err; } @@ -614,8 +988,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) pageHeader->xlp_rem_len, (uint32) (state->recordContRecPtr >> 32), (uint32) state->recordContRecPtr, - (uint32) (state->ReadRecPtr >> 32), - (uint32) state->ReadRecPtr, + (uint32) (state->DecodeRecPtr >> 32), + (uint32) state->DecodeRecPtr, state->recordRemainLen); goto err; } @@ -651,7 +1025,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) if (!state->record_verified) { Assert(state->recordGotLen >= SizeOfXLogRecord); - if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + if (!ValidXLogRecordHeader(state, state->DecodeRecPtr, state->PrevRecPtr, (XLogRecord *) state->readRecordBuf)) goto err; @@ -668,16 +1042,17 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) /* targetPagePtr is pointing the last-read page here */ prec = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + if (!ValidXLogRecord(state, prec, state->DecodeRecPtr)) goto err; pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - state->EndRecPtr = targetPagePtr + pageHeaderSize + state->NextRecPtr = targetPagePtr + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len); - *record = prec; + 
record = prec; state->readRecordState = XLREAD_NEXT_RECORD; + break; } } @@ -685,32 +1060,65 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) /* * Special processing if it's an XLOG SWITCH record */ - if ((*record)->xl_rmid == RM_XLOG_ID && - ((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) + if (record->xl_rmid == RM_XLOG_ID && + (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ - state->EndRecPtr += state->segcxt.ws_segsize - 1; - state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); + state->NextRecPtr += state->segcxt.ws_segsize - 1; + state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize); } - if (DecodeXLogRecord(state, *record, errormsg)) - return XLREAD_SUCCESS; + Assert(!record || state->readLen >= 0); + if (DecodeXLogRecord(state, state->decoding, record, state->DecodeRecPtr, &errormsg)) + { + /* Record the location of the next record. */ + state->decoding->next_lsn = state->NextRecPtr; - *record = NULL; - return XLREAD_FAIL; + /* + * If it's in the decode buffer (not an "oversized" record allocated + * with palloc()), mark the decode buffer space as occupied. + */ + if (!state->decoding->oversized) + { + /* The new decode buffer head must be MAXALIGNed. */ + Assert(state->decoding->size == MAXALIGN(state->decoding->size)); + if ((char *) state->decoding == state->decode_buffer) + state->decode_buffer_head = state->decode_buffer + + state->decoding->size; + else + state->decode_buffer_head += state->decoding->size; + } + + /* Insert it into the queue of decoded records. 
*/ + Assert(state->decode_queue_head != state->decoding); + if (state->decode_queue_head) + state->decode_queue_head->next = state->decoding; + state->decode_queue_head = state->decoding; + if (!state->decode_queue_tail) + state->decode_queue_tail = state->decoding; + state->decoding = NULL; + + return XLREAD_SUCCESS; + } err: + if (state->decoding && state->decoding->oversized) + pfree(state->decoding); + state->decoding = NULL; +err_continue: /* * Invalidate the read page. We might read from a different source after * failure. */ XLogReaderInvalReadState(state); - if (state->errormsg_buf[0] != '\0') - *errormsg = state->errormsg_buf; + /* + * If an error was written to errormsg_buf, it'll be returned to the caller + * of XLogReadRecord() after all successfully decoded records from the + * read queue. + */ - *record = NULL; return XLREAD_FAIL; } @@ -1342,34 +1750,84 @@ WALRead(XLogReaderState *state, * ---------------------------------------- */ -/* private function to reset the state between records */ +/* + * Private function to reset the state, forgetting all decoded records, if we + * are asked to move to a new read position. + */ static void ResetDecoder(XLogReaderState *state) { - int block_id; - - state->decoded_record = NULL; + DecodedXLogRecord *r; - state->main_data_len = 0; - - for (block_id = 0; block_id <= state->max_block_id; block_id++) + /* Reset the decoded record queue, freeing any oversized records. */ + while ((r = state->decode_queue_tail)) { - state->blocks[block_id].in_use = false; - state->blocks[block_id].has_image = false; - state->blocks[block_id].has_data = false; - state->blocks[block_id].apply_image = false; + state->decode_queue_tail = r->next; + if (r->oversized) + pfree(r); } - state->max_block_id = -1; + state->decode_queue_head = NULL; + state->decode_queue_tail = NULL; + state->record = NULL; + state->decoding = NULL; + + /* Reset the decode buffer to empty. 
*/ + state->decode_buffer_head = state->decode_buffer; + state->decode_buffer_tail = state->decode_buffer; + + /* Clear error state. */ + state->errormsg_buf[0] = '\0'; + state->errormsg_deferred = false; +} + +/* + * Compute the maximum possible amount of padding that could be required to + * decode a record, given xl_tot_len from the record's header. This is the + * amount of output buffer space that we need to decode a record, though we + * might not finish up using it all. + * + * This computation is pessimistic and assumes the maximum possible number of + * blocks, due to lack of better information. + */ +size_t +DecodeXLogRecordRequiredSpace(size_t xl_tot_len) +{ + size_t size = 0; + + /* Account for the fixed size part of the decoded record struct. */ + size += offsetof(DecodedXLogRecord, blocks[0]); + /* Account for the flexible blocks array of maximum possible size. */ + size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1); + /* Account for all the raw main and block data. */ + size += xl_tot_len; + /* We might insert padding before main_data. */ + size += (MAXIMUM_ALIGNOF - 1); + /* We might insert padding before each block's data. */ + size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1); + /* We might insert padding at the end. */ + size += (MAXIMUM_ALIGNOF - 1); + + return size; } /* - * Decode the previously read record. + * Decode a record. "decoded" must point to a MAXALIGNed memory area that has + * space for at least DecodeXLogRecordRequiredSpace(record) bytes. On + * success, decoded->size contains the actual space occupied by the decoded + * record, which may turn out to be less. + * + * Only decoded->oversized member must be initialized already, and will not be + * modified. Other members will be initialized as required. * * On error, a human-readable error message is returned in *errormsg, and * the return value is false. 
*/ bool -DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) +DecodeXLogRecord(XLogReaderState *state, + DecodedXLogRecord *decoded, + XLogRecord *record, + XLogRecPtr lsn, + char **errormsg) { /* * read next _size bytes from record buffer, but check for overrun first. @@ -1384,17 +1842,20 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) } while(0) char *ptr; + char *out; uint32 remaining; uint32 datatotal; RelFileNode *rnode = NULL; uint8 block_id; - ResetDecoder(state); - - state->decoded_record = record; - state->record_origin = InvalidRepOriginId; - state->toplevel_xid = InvalidTransactionId; - + decoded->header = *record; + decoded->lsn = lsn; + decoded->next = NULL; + decoded->record_origin = InvalidRepOriginId; + decoded->toplevel_xid = InvalidTransactionId; + decoded->main_data = NULL; + decoded->main_data_len = 0; + decoded->max_block_id = -1; ptr = (char *) record; ptr += SizeOfXLogRecord; remaining = record->xl_tot_len - SizeOfXLogRecord; @@ -1412,7 +1873,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) COPY_HEADER_FIELD(&main_data_len, sizeof(uint8)); - state->main_data_len = main_data_len; + decoded->main_data_len = main_data_len; datatotal += main_data_len; break; /* by convention, the main data fragment is * always last */ @@ -1423,18 +1884,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) uint32 main_data_len; COPY_HEADER_FIELD(&main_data_len, sizeof(uint32)); - state->main_data_len = main_data_len; + decoded->main_data_len = main_data_len; datatotal += main_data_len; break; /* by convention, the main data fragment is * always last */ } else if (block_id == XLR_BLOCK_ID_ORIGIN) { - COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); + COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId)); } else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) { - COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId)); 
+ COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId)); } else if (block_id <= XLR_MAX_BLOCK_ID) { @@ -1442,7 +1903,11 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) DecodedBkpBlock *blk; uint8 fork_flags; - if (block_id <= state->max_block_id) + /* mark any intervening block IDs as not in use */ + for (int i = decoded->max_block_id + 1; i < block_id; ++i) + decoded->blocks[i].in_use = false; + + if (block_id <= decoded->max_block_id) { report_invalid_record(state, "out-of-order block_id %u at %X/%X", @@ -1450,9 +1915,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; } - state->max_block_id = block_id; + decoded->max_block_id = block_id; - blk = &state->blocks[block_id]; + blk = &decoded->blocks[block_id]; blk->in_use = true; blk->apply_image = false; @@ -1596,17 +2061,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) /* * Ok, we've parsed the fragment headers, and verified that the total * length of the payload in the fragments is equal to the amount of data - * left. Copy the data of each fragment to a separate buffer. - * - * We could just set up pointers into readRecordBuf, but we want to align - * the data for the convenience of the callers. Backup images are not - * copied, however; they don't need alignment. + * left. Copy the data of each fragment to contiguous space after the + * blocks array, inserting alignment padding before the data fragments so + * they can be cast to struct pointers by REDO routines. 
*/ + out = ((char *) decoded) + + offsetof(DecodedXLogRecord, blocks) + + sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1); /* block data first */ - for (block_id = 0; block_id <= state->max_block_id; block_id++) + for (block_id = 0; block_id <= decoded->max_block_id; block_id++) { - DecodedBkpBlock *blk = &state->blocks[block_id]; + DecodedBkpBlock *blk = &decoded->blocks[block_id]; if (!blk->in_use) continue; @@ -1615,58 +2081,37 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) if (blk->has_image) { - blk->bkp_image = ptr; + /* no need to align image */ + blk->bkp_image = out; + memcpy(out, ptr, blk->bimg_len); ptr += blk->bimg_len; + out += blk->bimg_len; } if (blk->has_data) { - if (!blk->data || blk->data_len > blk->data_bufsz) - { - if (blk->data) - pfree(blk->data); - - /* - * Force the initial request to be BLCKSZ so that we don't - * waste time with lots of trips through this stanza as a - * result of WAL compression. - */ - blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ)); - blk->data = palloc(blk->data_bufsz); - } + out = (char *) MAXALIGN(out); + blk->data = out; memcpy(blk->data, ptr, blk->data_len); ptr += blk->data_len; + out += blk->data_len; } } /* and finally, the main data */ - if (state->main_data_len > 0) + if (decoded->main_data_len > 0) { - if (!state->main_data || state->main_data_len > state->main_data_bufsz) - { - if (state->main_data) - pfree(state->main_data); - - /* - * main_data_bufsz must be MAXALIGN'ed. In many xlog record - * types, we omit trailing struct padding on-disk to save a few - * bytes; but compilers may generate accesses to the xlog struct - * that assume that padding bytes are present. If the palloc - * request is not large enough to include such padding bytes then - * we'll get valgrind complaints due to otherwise-harmless fetches - * of the padding bytes. 
- * - * In addition, force the initial request to be reasonably large - * so that we don't waste time with lots of trips through this - * stanza. BLCKSZ / 2 seems like a good compromise choice. - */ - state->main_data_bufsz = MAXALIGN(Max(state->main_data_len, - BLCKSZ / 2)); - state->main_data = palloc(state->main_data_bufsz); - } - memcpy(state->main_data, ptr, state->main_data_len); - ptr += state->main_data_len; + out = (char *) MAXALIGN(out); + decoded->main_data = out; + memcpy(decoded->main_data, ptr, decoded->main_data_len); + ptr += decoded->main_data_len; + out += decoded->main_data_len; } + /* Report the actual size we used. */ + decoded->size = MAXALIGN(out - (char *) decoded); + Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >= + decoded->size); + return true; shortdata_err: @@ -1692,10 +2137,11 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, { DecodedBkpBlock *bkpb; - if (!record->blocks[block_id].in_use) + if (block_id > record->record->max_block_id || + !record->record->blocks[block_id].in_use) return false; - bkpb = &record->blocks[block_id]; + bkpb = &record->record->blocks[block_id]; if (rnode) *rnode = bkpb->rnode; if (forknum) @@ -1715,10 +2161,11 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len) { DecodedBkpBlock *bkpb; - if (!record->blocks[block_id].in_use) + if (block_id > record->record->max_block_id || + !record->record->blocks[block_id].in_use) return NULL; - bkpb = &record->blocks[block_id]; + bkpb = &record->record->blocks[block_id]; if (!bkpb->has_data) { @@ -1746,12 +2193,13 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) char *ptr; PGAlignedBlock tmp; - if (!record->blocks[block_id].in_use) + if (block_id > record->record->max_block_id || + !record->record->blocks[block_id].in_use) return false; - if (!record->blocks[block_id].has_image) + if (!record->record->blocks[block_id].has_image) return false; - bkpb = &record->blocks[block_id]; + bkpb = 
&record->record->blocks[block_id]; ptr = bkpb->bkp_image; if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED) |