diff options
Diffstat (limited to 'src/backend/access/transam/xlogreader.c')
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 931 |
1 files changed, 565 insertions, 366 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 42738eb940c..02257768ec8 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -36,11 +36,11 @@ static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3); static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); -static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, - int reqLen); +static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, + int reqLen, bool header_inclusive); static void XLogReaderInvalReadState(XLogReaderState *state); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); + XLogRecPtr PrevRecPtr, XLogRecord *record); static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr); static void ResetDecoder(XLogReaderState *state); @@ -73,7 +73,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) 
*/ XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, - XLogReaderRoutine *routine, void *private_data) + WALSegmentCleanupCB cleanup_cb) { XLogReaderState *state; @@ -84,7 +84,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, return NULL; /* initialize caller-provided support functions */ - state->routine = *routine; + state->cleanup_cb = cleanup_cb; state->max_block_id = -1; @@ -107,9 +107,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, waldir); - /* system_identifier initialized to zeroes above */ - state->private_data = private_data; - /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ + /* ReadRecPtr, EndRecPtr, reqLen and readLen initialized to zeroes above */ state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1, MCXT_ALLOC_NO_OOM); if (!state->errormsg_buf) @@ -140,8 +138,8 @@ XLogReaderFree(XLogReaderState *state) { int block_id; - if (state->seg.ws_file != -1) - state->routine.segment_close(state); + if (state->seg.ws_file >= 0) + state->cleanup_cb(state); for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) { @@ -246,6 +244,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) /* Begin at the passed-in record pointer. */ state->EndRecPtr = RecPtr; state->ReadRecPtr = InvalidXLogRecPtr; + state->readRecordState = XLREAD_NEXT_RECORD; } /* @@ -254,303 +253,456 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * XLogBeginRead() or XLogFindNextRecord() must be called before the first call * to XLogReadRecord(). * - * If the page_read callback fails to read the requested data, NULL is - * returned. The callback is expected to have reported the error; errormsg - * is set to NULL. + * This function may return XLREAD_NEED_DATA several times before returning a + * result record. The caller shall read in some new data then call this + * function again with the same parameters. 
* - * If the reading fails for some other reason, NULL is also returned, and - * *errormsg is set to a string with details of the failure. + * When a record is successfully read, returns XLREAD_SUCCESS with result + * record being stored in *record. Otherwise *record is NULL. * - * The returned pointer (or *errormsg) points to an internal buffer that's - * valid until the next call to XLogReadRecord. + * Returns XLREAD_NEED_DATA if more data is needed to finish decoding the + * current record. In that case, state->readPagePtr and state->reqLen inform + * the desired position and minimum length of data needed. The caller shall + * read in the requested data and set state->readBuf to point to a buffer + * containing it. The caller must also set state->seg.ws_tli and + * state->readLen to indicate the timeline that it was read from, and the + * length of data that is now available (which must be >= given reqLen), + * respectively. + * + * If invalid data is encountered, returns XLREAD_FAIL and sets *record to + * NULL. *errormsg is set to a string with details of the failure. The + * returned pointer (or *errormsg) points to an internal buffer that's valid + * until the next call to XLogReadRecord. + * + * + * This function runs a state machine consisting of the following states. + * + * XLREAD_NEXT_RECORD: + * The initial state. If the reader was just positioned (ReadRecPtr is + * invalid), try to read a record at EndRecPtr. Otherwise try to read the + * record just after the last one read. The next state is XLREAD_TOT_LEN. + * + * XLREAD_TOT_LEN: + * Examining record header. Ends after reading record length. + * recordRemainLen and recordGotLen are initialized. The next state is + * XLREAD_FIRST_FRAGMENT. + * + * XLREAD_FIRST_FRAGMENT: + * Reading the first fragment. Goes to XLREAD_NEXT_RECORD if that's all or + * XLREAD_CONTINUATION if we need more data. + * + * XLREAD_CONTINUATION: + * Reading continuation of record.
If the whole record is now decoded, goes + * to XLREAD_NEXT_RECORD. During this state, recordRemainLen indicates how + * much is left. + * + * If invalid data is found in any state, the state machine stays at the + * current state. This behavior allows us to continue reading a record + * after switching to a different source, during streaming replication. */ -XLogRecord * -XLogReadRecord(XLogReaderState *state, char **errormsg) +XLogReadRecordResult +XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) { - XLogRecPtr RecPtr; - XLogRecord *record; - XLogRecPtr targetPagePtr; - bool randAccess; - uint32 len, - total_len; - uint32 targetRecOff; - uint32 pageHeaderSize; - bool gotheader; - int readOff; + XLogRecord *prec; - /* - * randAccess indicates whether to verify the previous-record pointer of - * the record we're reading. We only do this if we're reading - * sequentially, which is what we initially assume. - */ - randAccess = false; + *record = NULL; /* reset error state */ *errormsg = NULL; state->errormsg_buf[0] = '\0'; - ResetDecoder(state); + switch (state->readRecordState) + { + case XLREAD_NEXT_RECORD: + ResetDecoder(state); - RecPtr = state->EndRecPtr; + if (state->ReadRecPtr != InvalidXLogRecPtr) + { + /* read the record after the one we just read */ - if (state->ReadRecPtr != InvalidXLogRecPtr) - { - /* read the record after the one we just read */ + /* + * EndRecPtr is pointing to end+1 of the previous WAL record. + * If we're at a page boundary, no more records can fit on the + * current page. We must skip over the page header, but we + * can't do that until we've read in the page, since the + * header size is variable. + */ + state->PrevRecPtr = state->ReadRecPtr; + state->ReadRecPtr = state->EndRecPtr; + } + else + { + /* + * Caller supplied a position to start at. + * + * In this case, EndRecPtr should already be pointing to a + * valid record starting position. 
+ */ + Assert(XRecOffIsValid(state->EndRecPtr)); + state->ReadRecPtr = state->EndRecPtr; - /* - * EndRecPtr is pointing to end+1 of the previous WAL record. If - * we're at a page boundary, no more records can fit on the current - * page. We must skip over the page header, but we can't do that until - * we've read in the page, since the header size is variable. - */ - } - else - { - /* - * Caller supplied a position to start at. - * - * In this case, EndRecPtr should already be pointing to a valid - * record starting position. - */ - Assert(XRecOffIsValid(RecPtr)); - randAccess = true; - } + /* + * We cannot verify the previous-record pointer when we're + * seeking to a particular record. Reset PrevRecPtr so that we + * won't try doing that. + */ + state->PrevRecPtr = InvalidXLogRecPtr; + state->EndRecPtr = InvalidXLogRecPtr; /* to be tidy */ + } - state->currRecPtr = RecPtr; + state->record_verified = false; + state->readRecordState = XLREAD_TOT_LEN; + /* fall through */ - targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); - targetRecOff = RecPtr % XLOG_BLCKSZ; + case XLREAD_TOT_LEN: + { + uint32 total_len; + uint32 pageHeaderSize; + XLogRecPtr targetPagePtr; + uint32 targetRecOff; + XLogPageHeader pageHeader; - /* - * Read the page containing the record into state->readBuf. Request enough - * byte to cover the whole record header, or at least the part of it that - * fits on the same page. - */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); - if (readOff < 0) - goto err; + targetPagePtr = + state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); + targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; - /* - * ReadPageInternal always returns at least the page header, so we can - * examine it now. - */ - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - if (targetRecOff == 0) - { - /* - * At page start, so skip over page header. 
- */ - RecPtr += pageHeaderSize; - targetRecOff = pageHeaderSize; - } - else if (targetRecOff < pageHeaderSize) - { - report_invalid_record(state, "invalid record offset at %X/%X", - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + /* + * Check if we have enough data. For the first record in the + * page, the requesting length doesn't contain page header. + */ + if (XLogNeedData(state, targetPagePtr, + Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ), + targetRecOff != 0)) + return XLREAD_NEED_DATA; - if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && - targetRecOff == pageHeaderSize) - { - report_invalid_record(state, "contrecord is requested by %X/%X", - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + /* error out if caller supplied bogus page */ + if (!state->page_verified) + goto err; - /* ReadPageInternal has verified the page header */ - Assert(pageHeaderSize <= readOff); + /* examine page header now. */ + pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); + if (targetRecOff == 0) + { + /* At page start, so skip over page header. */ + state->ReadRecPtr += pageHeaderSize; + targetRecOff = pageHeaderSize; + } + else if (targetRecOff < pageHeaderSize) + { + report_invalid_record(state, "invalid record offset at %X/%X", + LSN_FORMAT_ARGS(state->ReadRecPtr)); + goto err; + } - /* - * Read the record length. - * - * NB: Even though we use an XLogRecord pointer here, the whole record - * header might not fit on this page. xl_tot_len is the first field of the - * struct, so it must be on this page (the records are MAXALIGNed), but we - * cannot access any other fields until we've verified that we got the - * whole header. 
- */ - record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); - total_len = record->xl_tot_len; + pageHeader = (XLogPageHeader) state->readBuf; + if ((pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD) && + targetRecOff == pageHeaderSize) + { + report_invalid_record(state, "contrecord is requested by %X/%X", + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } - /* - * If the whole record header is on this page, validate it immediately. - * Otherwise do just a basic sanity check on xl_tot_len, and validate the - * rest of the header after reading it from the next page. The xl_tot_len - * check is necessary here to ensure that we enter the "Need to reassemble - * record" code path below; otherwise we might fail to apply - * ValidXLogRecordHeader at all. - */ - if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) - { - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record, - randAccess)) - goto err; - gotheader = true; - } - else - { - /* XXX: more validation should be done here */ - if (total_len < SizeOfXLogRecord) - { - report_invalid_record(state, - "invalid record length at %X/%X: wanted %u, got %u", - LSN_FORMAT_ARGS(RecPtr), - (uint32) SizeOfXLogRecord, total_len); - goto err; - } - gotheader = false; - } + /* XLogNeedData has verified the page header */ + Assert(pageHeaderSize <= state->readLen); - len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; - if (total_len > len) - { - /* Need to reassemble record */ - char *contdata; - XLogPageHeader pageHeader; - char *buffer; - uint32 gotlen; + /* + * Read the record length. + * + * NB: Even though we use an XLogRecord pointer here, the + * whole record header might not fit on this page. xl_tot_len + * is the first field of the struct, so it must be on this + * page (the records are MAXALIGNed), but we cannot access any + * other fields until we've verified that we got the whole + * header. 
+ */ + prec = (XLogRecord *) (state->readBuf + + state->ReadRecPtr % XLOG_BLCKSZ); + total_len = prec->xl_tot_len; - /* - * Enlarge readRecordBuf as needed. - */ - if (total_len > state->readRecordBufSize && - !allocate_recordbuf(state, total_len)) - { - /* We treat this as a "bogus data" condition */ - report_invalid_record(state, "record length %u at %X/%X too long", - total_len, LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + /* + * If the whole record header is on this page, validate it + * immediately. Otherwise do just a basic sanity check on + * xl_tot_len, and validate the rest of the header after + * reading it from the next page. The xl_tot_len check is + * necessary here to ensure that we enter the + * XLREAD_CONTINUATION state below; otherwise we might fail to + * apply ValidXLogRecordHeader at all. + */ + if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) + { + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, prec)) + goto err; - /* Copy the first fragment of the record from the first page. */ - memcpy(state->readRecordBuf, - state->readBuf + RecPtr % XLOG_BLCKSZ, len); - buffer = state->readRecordBuf + len; - gotlen = len; + state->record_verified = true; + } + else + { + /* XXX: more validation should be done here */ + if (total_len < SizeOfXLogRecord) + { + report_invalid_record(state, + "invalid record length at %X/%X: wanted %u, got %u", + LSN_FORMAT_ARGS(state->ReadRecPtr), + (uint32) SizeOfXLogRecord, total_len); + goto err; + } + } - do - { - /* Calculate pointer to beginning of next page */ - targetPagePtr += XLOG_BLCKSZ; + /* + * Wait for the rest of the record, or the part of the record + * that fit on the first page if crossed a page boundary, to + * become available. 
+ */ + state->recordGotLen = 0; + state->recordRemainLen = total_len; + state->readRecordState = XLREAD_FIRST_FRAGMENT; + } + /* fall through */ - /* Wait for the next page to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(total_len - gotlen + SizeOfXLogShortPHD, - XLOG_BLCKSZ)); + case XLREAD_FIRST_FRAGMENT: + { + uint32 total_len = state->recordRemainLen; + uint32 request_len; + uint32 record_len; + XLogRecPtr targetPagePtr; + uint32 targetRecOff; - if (readOff < 0) - goto err; + /* + * Wait for the rest of the record on the first page to become + * available + */ + targetPagePtr = + state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); + targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; - Assert(SizeOfXLogShortPHD <= readOff); + request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ); + record_len = request_len - targetRecOff; - /* Check that the continuation on next page looks valid */ - pageHeader = (XLogPageHeader) state->readBuf; - if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) - { - report_invalid_record(state, - "there is no contrecord flag at %X/%X", - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + /* ReadRecPtr contains page header */ + Assert(targetRecOff != 0); + if (XLogNeedData(state, targetPagePtr, request_len, true)) + return XLREAD_NEED_DATA; - /* - * Cross-check that xlp_rem_len agrees with how much of the record - * we expect there to be left. 
- */ - if (pageHeader->xlp_rem_len == 0 || - total_len != (pageHeader->xlp_rem_len + gotlen)) - { - report_invalid_record(state, - "invalid contrecord length %u (expected %lld) at %X/%X", - pageHeader->xlp_rem_len, - ((long long) total_len) - gotlen, - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + /* error out if caller supplied bogus page */ + if (!state->page_verified) + goto err; - /* Append the continuation from this page to the buffer */ - pageHeaderSize = XLogPageHeaderSize(pageHeader); + prec = (XLogRecord *) (state->readBuf + targetRecOff); - if (readOff < pageHeaderSize) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize); + /* validate record header if not yet */ + if (!state->record_verified && record_len >= SizeOfXLogRecord) + { + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, prec)) + goto err; - Assert(pageHeaderSize <= readOff); + state->record_verified = true; + } - contdata = (char *) state->readBuf + pageHeaderSize; - len = XLOG_BLCKSZ - pageHeaderSize; - if (pageHeader->xlp_rem_len < len) - len = pageHeader->xlp_rem_len; - if (readOff < pageHeaderSize + len) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize + len); + if (total_len == record_len) + { + /* Record does not cross a page boundary */ + Assert(state->record_verified); - memcpy(buffer, (char *) contdata, len); - buffer += len; - gotlen += len; + if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + goto err; - /* If we just reassembled the record header, validate it. */ - if (!gotheader) - { - record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, - record, randAccess)) + state->record_verified = true; /* to be tidy */ + + /* We already checked the header earlier */ + state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len); + + *record = prec; + state->readRecordState = XLREAD_NEXT_RECORD; + break; + } + + /* + * The record continues on the next page. 
Need to reassemble + * record + */ + Assert(total_len > record_len); + + /* Enlarge readRecordBuf as needed. */ + if (total_len > state->readRecordBufSize && + !allocate_recordbuf(state, total_len)) + { + /* We treat this as a "bogus data" condition */ + report_invalid_record(state, + "record length %u at %X/%X too long", + total_len, + LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; - gotheader = true; + } + + /* Copy the first fragment of the record from the first page. */ + memcpy(state->readRecordBuf, state->readBuf + targetRecOff, + record_len); + state->recordGotLen += record_len; + state->recordRemainLen -= record_len; + + /* Calculate pointer to beginning of next page */ + state->recordContRecPtr = state->ReadRecPtr + record_len; + Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0); + + state->readRecordState = XLREAD_CONTINUATION; } - } while (gotlen < total_len); + /* fall through */ - Assert(gotheader); + case XLREAD_CONTINUATION: + { + XLogPageHeader pageHeader = NULL; + uint32 pageHeaderSize; + XLogRecPtr targetPagePtr = InvalidXLogRecPtr; - record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecord(state, record, RecPtr)) - goto err; + /* + * we enter this state only if we haven't read the whole + * record. 
+ */ + Assert(state->recordRemainLen > 0); - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - state->ReadRecPtr = RecPtr; - state->EndRecPtr = targetPagePtr + pageHeaderSize - + MAXALIGN(pageHeader->xlp_rem_len); - } - else - { - /* Wait for the record data to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + total_len, XLOG_BLCKSZ)); - if (readOff < 0) - goto err; + while (state->recordRemainLen > 0) + { + char *contdata; + uint32 request_len PG_USED_FOR_ASSERTS_ONLY; + uint32 record_len; + + /* Wait for the next page to become available */ + targetPagePtr = state->recordContRecPtr; + + /* this request contains page header */ + Assert(targetPagePtr != 0); + if (XLogNeedData(state, targetPagePtr, + Min(state->recordRemainLen, XLOG_BLCKSZ), + false)) + return XLREAD_NEED_DATA; + + if (!state->page_verified) + goto err; + + Assert(SizeOfXLogShortPHD <= state->readLen); + + /* Check that the continuation on next page looks valid */ + pageHeader = (XLogPageHeader) state->readBuf; + if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) + { + report_invalid_record( + state, + "there is no contrecord flag at %X/%X reading %X/%X", + (uint32) (state->recordContRecPtr >> 32), + (uint32) state->recordContRecPtr, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } + + /* + * Cross-check that xlp_rem_len agrees with how much of + * the record we expect there to be left. 
+ */ + if (pageHeader->xlp_rem_len == 0 || + pageHeader->xlp_rem_len != state->recordRemainLen) + { + report_invalid_record( + state, + "invalid contrecord length %u at %X/%X reading %X/%X, expected %u", + pageHeader->xlp_rem_len, + (uint32) (state->recordContRecPtr >> 32), + (uint32) state->recordContRecPtr, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr, + state->recordRemainLen); + goto err; + } + + /* Append the continuation from this page to the buffer */ + pageHeaderSize = XLogPageHeaderSize(pageHeader); + + /* + * XLogNeedData should have ensured that the whole page + * header was read + */ + Assert(pageHeaderSize <= state->readLen); + + contdata = (char *) state->readBuf + pageHeaderSize; + record_len = XLOG_BLCKSZ - pageHeaderSize; + if (pageHeader->xlp_rem_len < record_len) + record_len = pageHeader->xlp_rem_len; + + request_len = record_len + pageHeaderSize; + + /* + * XLogNeedData should have ensured all needed data was + * read + */ + Assert(request_len <= state->readLen); + + memcpy(state->readRecordBuf + state->recordGotLen, + (char *) contdata, record_len); + state->recordGotLen += record_len; + state->recordRemainLen -= record_len; + + /* If we just reassembled the record header, validate it. 
*/ + if (!state->record_verified) + { + Assert(state->recordGotLen >= SizeOfXLogRecord); + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, + (XLogRecord *) state->readRecordBuf)) + goto err; + + state->record_verified = true; + } + + /* + * Calculate pointer to beginning of next page, and + * continue + */ + state->recordContRecPtr += XLOG_BLCKSZ; + } - /* Record does not cross a page boundary */ - if (!ValidXLogRecord(state, record, RecPtr)) - goto err; + /* targetPagePtr is pointing the last-read page here */ + prec = (XLogRecord *) state->readRecordBuf; + if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + goto err; - state->EndRecPtr = RecPtr + MAXALIGN(total_len); + pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); + state->EndRecPtr = targetPagePtr + pageHeaderSize + + MAXALIGN(pageHeader->xlp_rem_len); - state->ReadRecPtr = RecPtr; + *record = prec; + state->readRecordState = XLREAD_NEXT_RECORD; + break; + } } /* * Special processing if it's an XLOG SWITCH record */ - if (record->xl_rmid == RM_XLOG_ID && - (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) + if ((*record)->xl_rmid == RM_XLOG_ID && + ((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ state->EndRecPtr += state->segcxt.ws_segsize - 1; state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); } - if (DecodeXLogRecord(state, record, errormsg)) - return record; - else - return NULL; + if (DecodeXLogRecord(state, *record, errormsg)) + return XLREAD_SUCCESS; + + *record = NULL; + return XLREAD_FAIL; err: /* - * Invalidate the read state. We might read from a different source after + * Invalidate the read page. We might read from a different source after * failure. 
*/ XLogReaderInvalReadState(state); @@ -558,113 +710,141 @@ err: if (state->errormsg_buf[0] != '\0') *errormsg = state->errormsg_buf; - return NULL; + *record = NULL; + return XLREAD_FAIL; } /* - * Read a single xlog page including at least [pageptr, reqLen] of valid data - * via the page_read() callback. + * Checks that the xlog page loaded in state->readBuf includes at least + * [pageptr, reqLen] and that the page is valid. header_inclusive indicates + * that reqLen was calculated including the page header length. + * + * Returns false if the buffer already contains the requested data, or if an + * error was found. state->page_verified is set to true for the former and + * false for the latter. * - * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the page_read callback). + * Otherwise returns true and requests that data be loaded into state->readBuf, + * as described by state->readPagePtr and state->reqLen. The caller shall call + * this function again after filling the buffer with at least that much data + * and setting state->readLen to the length of the data actually loaded. * - * We fetch the page from a reader-local cache if we know we have the required - * data and if there hasn't been any error since caching the data. + * If header_inclusive is false, corrects reqLen internally by adding the + * actual page header length, and may ask the caller for new data. */ -static int -ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) +static bool +XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, + bool header_inclusive) { - int readLen; uint32 targetPageOff; XLogSegNo targetSegNo; - XLogPageHeader hdr; + uint32 addLen = 0; - Assert((pageptr % XLOG_BLCKSZ) == 0); + /* Some data is loaded, but page header is not verified yet.
*/ + if (!state->page_verified && + !XLogRecPtrIsInvalid(state->readPagePtr) && state->readLen >= 0) + { + uint32 pageHeaderSize; - XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); - targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize); + /* just loaded new data so needs to verify page header */ - /* check whether we have all the requested data already */ - if (targetSegNo == state->seg.ws_segno && - targetPageOff == state->segoff && reqLen <= state->readLen) - return state->readLen; + /* The caller must have loaded at least page header */ + Assert(state->readLen >= SizeOfXLogShortPHD); - /* - * Data is not in our buffer. - * - * Every time we actually read the segment, even if we looked at parts of - * it before, we need to do verification as the page_read callback might - * now be rereading data from a different source. - * - * Whenever switching to a new WAL segment, we read the first page of the - * file and validate its header, even if that's not where the target - * record is. This is so that we can check the additional identification - * info that is present in the first page's "long" header. - */ - if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) - { - XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; + /* + * We have enough data to check the header length. Recheck the loaded + * length against the actual header length. + */ + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ, - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; + /* Request more data if we don't have the full header. */ + if (state->readLen < pageHeaderSize) + { + state->reqLen = pageHeaderSize; + return true; + } - /* we can be sure to have enough WAL available, we scrolled back */ - Assert(readLen == XLOG_BLCKSZ); + /* Now that we know we have the full header, validate it. 
*/ + if (!XLogReaderValidatePageHeader(state, state->readPagePtr, + (char *) state->readBuf)) + { + /* That's bad. Force reading the page again. */ + XLogReaderInvalReadState(state); - if (!XLogReaderValidatePageHeader(state, targetSegmentPtr, - state->readBuf)) - goto err; + return false; + } + + state->page_verified = true; + + XLByteToSeg(state->readPagePtr, state->seg.ws_segno, + state->segcxt.ws_segsize); } /* - * First, read the requested data length, but at least a short page header - * so that we can validate it. + * The loaded page may not be the one caller is supposing to read when we + * are verifying the first page of new segment. In that case, skip further + * verification and immediately load the target page. */ - readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; + if (state->page_verified && pageptr == state->readPagePtr) + { + /* + * calculate additional length for page header keeping the total + * length within the block size. + */ + if (!header_inclusive) + { + uint32 pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); - Assert(readLen <= XLOG_BLCKSZ); + addLen = pageHeaderSize; + if (reqLen + pageHeaderSize <= XLOG_BLCKSZ) + addLen = pageHeaderSize; + else + addLen = XLOG_BLCKSZ - reqLen; + } - /* Do we have enough data to check the header length? */ - if (readLen <= SizeOfXLogShortPHD) - goto err; + /* Return if we already have it. */ + if (reqLen + addLen <= state->readLen) + return false; + } - Assert(readLen >= reqLen); + /* Data is not in our buffer, request the caller for it. 
*/ + XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); + targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize); + Assert((pageptr % XLOG_BLCKSZ) == 0); - hdr = (XLogPageHeader) state->readBuf; + /* + * Every time we request to load new data of a page to the caller, even if + * we looked at a part of it before, we need to do verification on the + * next invocation as the caller might now be rereading data from a + * different source. + */ + state->page_verified = false; - /* still not enough */ - if (readLen < XLogPageHeaderSize(hdr)) + /* + * Whenever switching to a new WAL segment, we read the first page of the + * file and validate its header, even if that's not where the target + * record is. This is so that we can check the additional identification + * info that is present in the first page's "long" header. Don't do this + * if the caller requested the first page in the segment. + */ + if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) { - readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr), - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; + /* + * Then we'll see that the targetSegNo now matches the ws_segno, and + * will not come back here, but will request the actual target page. + */ + state->readPagePtr = pageptr - targetPageOff; + state->reqLen = XLOG_BLCKSZ; + return true; } /* - * Now that we know we have the full header, validate it. + * Request the caller to load the page. We need at least a short page + * header so that we can validate it. 
*/ - if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr)) - goto err; - - /* update read state information */ - state->seg.ws_segno = targetSegNo; - state->segoff = targetPageOff; - state->readLen = readLen; - - return readLen; - -err: - XLogReaderInvalReadState(state); - return -1; + state->readPagePtr = pageptr; + state->reqLen = Max(reqLen + addLen, SizeOfXLogShortPHD); + return true; } /* @@ -673,9 +853,7 @@ err: static void XLogReaderInvalReadState(XLogReaderState *state) { - state->seg.ws_segno = 0; - state->segoff = 0; - state->readLen = 0; + state->readPagePtr = InvalidXLogRecPtr; } /* @@ -683,11 +861,12 @@ XLogReaderInvalReadState(XLogReaderState *state) * * This is just a convenience subroutine to avoid duplicated code in * XLogReadRecord. It's not intended for use from anywhere else. + * + * If PrevRecPtr is valid, xl_prev is cross-checked with it. */ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, - bool randAccess) + XLogRecPtr PrevRecPtr, XLogRecord *record) { if (record->xl_tot_len < SizeOfXLogRecord) { @@ -704,7 +883,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, record->xl_rmid, LSN_FORMAT_ARGS(RecPtr)); return false; } - if (randAccess) + if (PrevRecPtr == InvalidXLogRecPtr) { /* * We can't exactly verify the prev-link, but surely it should be less @@ -922,6 +1101,22 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, * here. */ +XLogFindNextRecordState * +InitXLogFindNextRecord(XLogReaderState *reader_state, XLogRecPtr start_ptr) +{ + XLogFindNextRecordState *state = (XLogFindNextRecordState *) + palloc_extended(sizeof(XLogFindNextRecordState), + MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO); + if (!state) + return NULL; + + state->reader_state = reader_state; + state->targetRecPtr = start_ptr; + state->currRecPtr = start_ptr; + + return state; +} + /* * Find the first record with an lsn >= RecPtr.
* @@ -933,27 +1128,25 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, * This positions the reader, like XLogBeginRead(), so that the next call to * XLogReadRecord() will read the next valid record. */ -XLogRecPtr -XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) +bool +XLogFindNextRecord(XLogFindNextRecordState *state) { - XLogRecPtr tmpRecPtr; - XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; + XLogRecord *record; + XLogReadRecordResult result; char *errormsg; - Assert(!XLogRecPtrIsInvalid(RecPtr)); + Assert(!XLogRecPtrIsInvalid(state->currRecPtr)); /* * skip over potential continuation data, keeping in mind that it may span * multiple pages */ - tmpRecPtr = RecPtr; while (true) { XLogRecPtr targetPagePtr; int targetRecOff; uint32 pageHeaderSize; - int readLen; /* * Compute targetRecOff. It should typically be equal or greater than @@ -961,27 +1154,27 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * that, except when caller has explicitly specified the offset that * falls somewhere there or when we are skipping multi-page * continuation record. 
It doesn't matter though because - * ReadPageInternal() is prepared to handle that and will read at - * least short page-header worth of data + * XLogNeedData() is prepared to handle that and will read at least + * short page-header worth of data */ - targetRecOff = tmpRecPtr % XLOG_BLCKSZ; + targetRecOff = state->currRecPtr % XLOG_BLCKSZ; /* scroll back to page boundary */ - targetPagePtr = tmpRecPtr - targetRecOff; + targetPagePtr = state->currRecPtr - targetRecOff; - /* Read the page containing the record */ - readLen = ReadPageInternal(state, targetPagePtr, targetRecOff); - if (readLen < 0) + if (XLogNeedData(state->reader_state, targetPagePtr, targetRecOff, + targetRecOff != 0)) + return true; + + if (!state->reader_state->page_verified) goto err; - header = (XLogPageHeader) state->readBuf; + header = (XLogPageHeader) state->reader_state->readBuf; pageHeaderSize = XLogPageHeaderSize(header); - /* make sure we have enough data for the page header */ - readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize); - if (readLen < 0) - goto err; + /* we should have read the page header */ + Assert(state->reader_state->readLen >= pageHeaderSize); /* skip over potential continuation data */ if (header->xlp_info & XLP_FIRST_IS_CONTRECORD) @@ -996,21 +1189,21 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * Note that record headers are MAXALIGN'ed */ if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize)) - tmpRecPtr = targetPagePtr + XLOG_BLCKSZ; + state->currRecPtr = targetPagePtr + XLOG_BLCKSZ; else { /* * The previous continuation record ends in this page. 
Set - * tmpRecPtr to point to the first valid record + * state->currRecPtr to point to the first valid record */ - tmpRecPtr = targetPagePtr + pageHeaderSize + state->currRecPtr = targetPagePtr + pageHeaderSize + MAXALIGN(header->xlp_rem_len); break; } } else { - tmpRecPtr = targetPagePtr + pageHeaderSize; + state->currRecPtr = targetPagePtr + pageHeaderSize; break; } } @@ -1020,31 +1213,36 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * because either we're at the first record after the beginning of a page * or we just jumped over the remaining data of a continuation. */ - XLogBeginRead(state, tmpRecPtr); - while (XLogReadRecord(state, &errormsg) != NULL) + XLogBeginRead(state->reader_state, state->currRecPtr); + while ((result = XLogReadRecord(state->reader_state, &record, &errormsg)) != + XLREAD_FAIL) { + if (result == XLREAD_NEED_DATA) + return true; + /* past the record we've found, break out */ - if (RecPtr <= state->ReadRecPtr) + if (state->targetRecPtr <= state->reader_state->ReadRecPtr) { /* Rewind the reader to the beginning of the last record. */ - found = state->ReadRecPtr; - XLogBeginRead(state, found); - return found; + state->currRecPtr = state->reader_state->ReadRecPtr; + XLogBeginRead(state->reader_state, state->currRecPtr); + return false; } } err: - XLogReaderInvalReadState(state); + XLogReaderInvalReadState(state->reader_state); - return InvalidXLogRecPtr; + state->currRecPtr = InvalidXLogRecPtr;; + return false; } #endif /* FRONTEND */ /* - * Helper function to ease writing of XLogRoutine->page_read callbacks. - * If this function is used, caller must supply a segment_open callback in - * 'state', as that is used here. + * Helper function to ease writing of routines that read raw WAL data. + * If this function is used, caller must supply a segment_open callback and + * segment_close callback, as both are used here. * * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * fetched from timeline 'tli'. 
@@ -1057,6 +1255,7 @@ err: */ bool WALRead(XLogReaderState *state, + WALSegmentOpenCB segopenfn, WALSegmentCloseCB segclosefn, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo) { @@ -1088,10 +1287,10 @@ WALRead(XLogReaderState *state, XLogSegNo nextSegNo; if (state->seg.ws_file >= 0) - state->routine.segment_close(state); + segclosefn(state); XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize); - state->routine.segment_open(state, nextSegNo, &tli); + segopenfn(state, nextSegNo, &tli); /* This shouldn't happen -- indicates a bug in segment_open */ Assert(state->seg.ws_file >= 0); |