aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlogreader.c
diff options
context:
space:
mode:
authorThomas Munro <tmunro@postgresql.org>2021-04-08 23:03:23 +1200
committerThomas Munro <tmunro@postgresql.org>2021-04-08 23:20:42 +1200
commit323cbe7c7ddcf18aaf24b7f6d682a45a61d4e31b (patch)
tree5290af3834511b9bd1773841b1068e485ba52fe6 /src/backend/access/transam/xlogreader.c
parent5ac9c4307337313bedeafc21dbbab93ba809241c (diff)
downloadpostgresql-323cbe7c7ddcf18aaf24b7f6d682a45a61d4e31b.tar.gz
postgresql-323cbe7c7ddcf18aaf24b7f6d682a45a61d4e31b.zip
Remove read_page callback from XLogReader.
Previously, the XLogReader module would fetch new input data using a callback function. Redesign the interface so that it tells the caller to insert more data with a special return value instead. This API suits later patches for prefetching, encryption and maybe other future projects that would otherwise require continually extending the callback interface. As incidental cleanup work, move global variables readOff, readLen and readSegNo inside XlogReaderState. Author: Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> Author: Heikki Linnakangas <hlinnaka@iki.fi> (parts of earlier version) Reviewed-by: Antonin Houska <ah@cybertec.at> Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com> Reviewed-by: Takashi Menjo <takashi.menjo@gmail.com> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Thomas Munro <thomas.munro@gmail.com> Discussion: https://postgr.es/m/20190418.210257.43726183.horiguchi.kyotaro%40lab.ntt.co.jp
Diffstat (limited to 'src/backend/access/transam/xlogreader.c')
-rw-r--r--src/backend/access/transam/xlogreader.c931
1 files changed, 565 insertions, 366 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 42738eb940c..02257768ec8 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -36,11 +36,11 @@
static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
pg_attribute_printf(2, 3);
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
-static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
- int reqLen);
+static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr,
+ int reqLen, bool header_inclusive);
static void XLogReaderInvalReadState(XLogReaderState *state);
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
- XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
+ XLogRecPtr PrevRecPtr, XLogRecord *record);
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
XLogRecPtr recptr);
static void ResetDecoder(XLogReaderState *state);
@@ -73,7 +73,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
*/
XLogReaderState *
XLogReaderAllocate(int wal_segment_size, const char *waldir,
- XLogReaderRoutine *routine, void *private_data)
+ WALSegmentCleanupCB cleanup_cb)
{
XLogReaderState *state;
@@ -84,7 +84,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
return NULL;
/* initialize caller-provided support functions */
- state->routine = *routine;
+ state->cleanup_cb = cleanup_cb;
state->max_block_id = -1;
@@ -107,9 +107,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
waldir);
- /* system_identifier initialized to zeroes above */
- state->private_data = private_data;
- /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
+ /* ReadRecPtr, EndRecPtr, reqLen and readLen initialized to zeroes above */
state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
MCXT_ALLOC_NO_OOM);
if (!state->errormsg_buf)
@@ -140,8 +138,8 @@ XLogReaderFree(XLogReaderState *state)
{
int block_id;
- if (state->seg.ws_file != -1)
- state->routine.segment_close(state);
+ if (state->seg.ws_file >= 0)
+ state->cleanup_cb(state);
for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
{
@@ -246,6 +244,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
/* Begin at the passed-in record pointer. */
state->EndRecPtr = RecPtr;
state->ReadRecPtr = InvalidXLogRecPtr;
+ state->readRecordState = XLREAD_NEXT_RECORD;
}
/*
@@ -254,303 +253,456 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
* XLogBeginRead() or XLogFindNextRecord() must be called before the first call
* to XLogReadRecord().
*
- * If the page_read callback fails to read the requested data, NULL is
- * returned. The callback is expected to have reported the error; errormsg
- * is set to NULL.
+ * This function may return XLREAD_NEED_DATA several times before returning a
+ * result record. The caller shall read in some new data then call this
+ * function again with the same parameters.
*
- * If the reading fails for some other reason, NULL is also returned, and
- * *errormsg is set to a string with details of the failure.
+ * When a record is successfully read, returns XLREAD_SUCCESS with result
+ * record being stored in *record. Otherwise *record is NULL.
*
- * The returned pointer (or *errormsg) points to an internal buffer that's
- * valid until the next call to XLogReadRecord.
+ * Returns XLREAD_NEED_DATA if more data is needed to finish decoding the
+ * current record. In that case, state->readPagePtr and state->reqLen inform
+ * the desired position and minimum length of data needed. The caller shall
+ * read in the requested data and set state->readBuf to point to a buffer
+ * containing it. The caller must also set state->seg->ws_tli and
+ * state->readLen to indicate the timeline that it was read from, and the
+ * length of data that is now available (which must be >= given reqLen),
+ * respectively.
+ *
+ * If invalid data is encountered, returns XLREAD_FAIL and sets *record to
+ * NULL. *errormsg is set to a string with details of the failure. The
+ * returned pointer (or *errormsg) points to an internal buffer that's valid
+ * until the next call to XLogReadRecord.
+ *
+ *
+ * This function runs a state machine consisting of the following states.
+ *
+ * XLREAD_NEXT_RECORD:
+ * The initial state. If called with a valid XLogRecPtr, try to read a
+ * record at that position. If invalid RecPtr is given try to read a record
+ * just after the last one read. The next state is XLREAD_TOT_LEN.
+ *
+ * XLREAD_TOT_LEN:
+ * Examining record header. Ends after reading record length.
+ * recordRemainLen and recordGotLen are initialized. The next state is
+ * XLREAD_FIRST_FRAGMENT.
+ *
+ * XLREAD_FIRST_FRAGMENT:
+ * Reading the first fragment. Goes to XLREAD_NEXT_RECORD if that's all or
+ * XLREAD_CONTINUATION if we need more data.
+
+ * XLREAD_CONTINUATION:
+ * Reading continuation of record. If the whole record is now decoded, goes
+ * to XLREAD_NEXT_RECORD. During this state, recordRemainLen indicates how
+ * much is left.
+ *
+ * If invalid data is found in any state, the state machine stays at the
+ * current state. This behavior allows us to continue reading a record
+ * after switching to a different source, during streaming replication.
*/
-XLogRecord *
-XLogReadRecord(XLogReaderState *state, char **errormsg)
+XLogReadRecordResult
+XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
{
- XLogRecPtr RecPtr;
- XLogRecord *record;
- XLogRecPtr targetPagePtr;
- bool randAccess;
- uint32 len,
- total_len;
- uint32 targetRecOff;
- uint32 pageHeaderSize;
- bool gotheader;
- int readOff;
+ XLogRecord *prec;
- /*
- * randAccess indicates whether to verify the previous-record pointer of
- * the record we're reading. We only do this if we're reading
- * sequentially, which is what we initially assume.
- */
- randAccess = false;
+ *record = NULL;
/* reset error state */
*errormsg = NULL;
state->errormsg_buf[0] = '\0';
- ResetDecoder(state);
+ switch (state->readRecordState)
+ {
+ case XLREAD_NEXT_RECORD:
+ ResetDecoder(state);
- RecPtr = state->EndRecPtr;
+ if (state->ReadRecPtr != InvalidXLogRecPtr)
+ {
+ /* read the record after the one we just read */
- if (state->ReadRecPtr != InvalidXLogRecPtr)
- {
- /* read the record after the one we just read */
+ /*
+ * EndRecPtr is pointing to end+1 of the previous WAL record.
+ * If we're at a page boundary, no more records can fit on the
+ * current page. We must skip over the page header, but we
+ * can't do that until we've read in the page, since the
+ * header size is variable.
+ */
+ state->PrevRecPtr = state->ReadRecPtr;
+ state->ReadRecPtr = state->EndRecPtr;
+ }
+ else
+ {
+ /*
+ * Caller supplied a position to start at.
+ *
+ * In this case, EndRecPtr should already be pointing to a
+ * valid record starting position.
+ */
+ Assert(XRecOffIsValid(state->EndRecPtr));
+ state->ReadRecPtr = state->EndRecPtr;
- /*
- * EndRecPtr is pointing to end+1 of the previous WAL record. If
- * we're at a page boundary, no more records can fit on the current
- * page. We must skip over the page header, but we can't do that until
- * we've read in the page, since the header size is variable.
- */
- }
- else
- {
- /*
- * Caller supplied a position to start at.
- *
- * In this case, EndRecPtr should already be pointing to a valid
- * record starting position.
- */
- Assert(XRecOffIsValid(RecPtr));
- randAccess = true;
- }
+ /*
+ * We cannot verify the previous-record pointer when we're
+ * seeking to a particular record. Reset PrevRecPtr so that we
+ * won't try doing that.
+ */
+ state->PrevRecPtr = InvalidXLogRecPtr;
+ state->EndRecPtr = InvalidXLogRecPtr; /* to be tidy */
+ }
- state->currRecPtr = RecPtr;
+ state->record_verified = false;
+ state->readRecordState = XLREAD_TOT_LEN;
+ /* fall through */
- targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
- targetRecOff = RecPtr % XLOG_BLCKSZ;
+ case XLREAD_TOT_LEN:
+ {
+ uint32 total_len;
+ uint32 pageHeaderSize;
+ XLogRecPtr targetPagePtr;
+ uint32 targetRecOff;
+ XLogPageHeader pageHeader;
- /*
- * Read the page containing the record into state->readBuf. Request enough
- * byte to cover the whole record header, or at least the part of it that
- * fits on the same page.
- */
- readOff = ReadPageInternal(state, targetPagePtr,
- Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
- if (readOff < 0)
- goto err;
+ targetPagePtr =
+ state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
+ targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
- /*
- * ReadPageInternal always returns at least the page header, so we can
- * examine it now.
- */
- pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
- if (targetRecOff == 0)
- {
- /*
- * At page start, so skip over page header.
- */
- RecPtr += pageHeaderSize;
- targetRecOff = pageHeaderSize;
- }
- else if (targetRecOff < pageHeaderSize)
- {
- report_invalid_record(state, "invalid record offset at %X/%X",
- LSN_FORMAT_ARGS(RecPtr));
- goto err;
- }
+ /*
+ * Check if we have enough data. For the first record in the
+ * page, the requesting length doesn't contain page header.
+ */
+ if (XLogNeedData(state, targetPagePtr,
+ Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ),
+ targetRecOff != 0))
+ return XLREAD_NEED_DATA;
- if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
- targetRecOff == pageHeaderSize)
- {
- report_invalid_record(state, "contrecord is requested by %X/%X",
- LSN_FORMAT_ARGS(RecPtr));
- goto err;
- }
+ /* error out if caller supplied bogus page */
+ if (!state->page_verified)
+ goto err;
- /* ReadPageInternal has verified the page header */
- Assert(pageHeaderSize <= readOff);
+ /* examine page header now. */
+ pageHeaderSize =
+ XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+ if (targetRecOff == 0)
+ {
+ /* At page start, so skip over page header. */
+ state->ReadRecPtr += pageHeaderSize;
+ targetRecOff = pageHeaderSize;
+ }
+ else if (targetRecOff < pageHeaderSize)
+ {
+ report_invalid_record(state, "invalid record offset at %X/%X",
+ LSN_FORMAT_ARGS(state->ReadRecPtr));
+ goto err;
+ }
- /*
- * Read the record length.
- *
- * NB: Even though we use an XLogRecord pointer here, the whole record
- * header might not fit on this page. xl_tot_len is the first field of the
- * struct, so it must be on this page (the records are MAXALIGNed), but we
- * cannot access any other fields until we've verified that we got the
- * whole header.
- */
- record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
- total_len = record->xl_tot_len;
+ pageHeader = (XLogPageHeader) state->readBuf;
+ if ((pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
+ targetRecOff == pageHeaderSize)
+ {
+ report_invalid_record(state, "contrecord is requested by %X/%X",
+ (uint32) (state->ReadRecPtr >> 32),
+ (uint32) state->ReadRecPtr);
+ goto err;
+ }
- /*
- * If the whole record header is on this page, validate it immediately.
- * Otherwise do just a basic sanity check on xl_tot_len, and validate the
- * rest of the header after reading it from the next page. The xl_tot_len
- * check is necessary here to ensure that we enter the "Need to reassemble
- * record" code path below; otherwise we might fail to apply
- * ValidXLogRecordHeader at all.
- */
- if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
- {
- if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
- randAccess))
- goto err;
- gotheader = true;
- }
- else
- {
- /* XXX: more validation should be done here */
- if (total_len < SizeOfXLogRecord)
- {
- report_invalid_record(state,
- "invalid record length at %X/%X: wanted %u, got %u",
- LSN_FORMAT_ARGS(RecPtr),
- (uint32) SizeOfXLogRecord, total_len);
- goto err;
- }
- gotheader = false;
- }
+ /* XLogNeedData has verified the page header */
+ Assert(pageHeaderSize <= state->readLen);
- len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
- if (total_len > len)
- {
- /* Need to reassemble record */
- char *contdata;
- XLogPageHeader pageHeader;
- char *buffer;
- uint32 gotlen;
+ /*
+ * Read the record length.
+ *
+ * NB: Even though we use an XLogRecord pointer here, the
+ * whole record header might not fit on this page. xl_tot_len
+ * is the first field of the struct, so it must be on this
+ * page (the records are MAXALIGNed), but we cannot access any
+ * other fields until we've verified that we got the whole
+ * header.
+ */
+ prec = (XLogRecord *) (state->readBuf +
+ state->ReadRecPtr % XLOG_BLCKSZ);
+ total_len = prec->xl_tot_len;
- /*
- * Enlarge readRecordBuf as needed.
- */
- if (total_len > state->readRecordBufSize &&
- !allocate_recordbuf(state, total_len))
- {
- /* We treat this as a "bogus data" condition */
- report_invalid_record(state, "record length %u at %X/%X too long",
- total_len, LSN_FORMAT_ARGS(RecPtr));
- goto err;
- }
+ /*
+ * If the whole record header is on this page, validate it
+ * immediately. Otherwise do just a basic sanity check on
+ * xl_tot_len, and validate the rest of the header after
+ * reading it from the next page. The xl_tot_len check is
+ * necessary here to ensure that we enter the
+ * XLREAD_CONTINUATION state below; otherwise we might fail to
+ * apply ValidXLogRecordHeader at all.
+ */
+ if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+ {
+ if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
+ state->PrevRecPtr, prec))
+ goto err;
- /* Copy the first fragment of the record from the first page. */
- memcpy(state->readRecordBuf,
- state->readBuf + RecPtr % XLOG_BLCKSZ, len);
- buffer = state->readRecordBuf + len;
- gotlen = len;
+ state->record_verified = true;
+ }
+ else
+ {
+ /* XXX: more validation should be done here */
+ if (total_len < SizeOfXLogRecord)
+ {
+ report_invalid_record(state,
+ "invalid record length at %X/%X: wanted %u, got %u",
+ LSN_FORMAT_ARGS(state->ReadRecPtr),
+ (uint32) SizeOfXLogRecord, total_len);
+ goto err;
+ }
+ }
- do
- {
- /* Calculate pointer to beginning of next page */
- targetPagePtr += XLOG_BLCKSZ;
+ /*
+ * Wait for the rest of the record, or the part of the record
+ * that fit on the first page if crossed a page boundary, to
+ * become available.
+ */
+ state->recordGotLen = 0;
+ state->recordRemainLen = total_len;
+ state->readRecordState = XLREAD_FIRST_FRAGMENT;
+ }
+ /* fall through */
- /* Wait for the next page to become available */
- readOff = ReadPageInternal(state, targetPagePtr,
- Min(total_len - gotlen + SizeOfXLogShortPHD,
- XLOG_BLCKSZ));
+ case XLREAD_FIRST_FRAGMENT:
+ {
+ uint32 total_len = state->recordRemainLen;
+ uint32 request_len;
+ uint32 record_len;
+ XLogRecPtr targetPagePtr;
+ uint32 targetRecOff;
- if (readOff < 0)
- goto err;
+ /*
+ * Wait for the rest of the record on the first page to become
+ * available
+ */
+ targetPagePtr =
+ state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
+ targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
- Assert(SizeOfXLogShortPHD <= readOff);
+ request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ);
+ record_len = request_len - targetRecOff;
- /* Check that the continuation on next page looks valid */
- pageHeader = (XLogPageHeader) state->readBuf;
- if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
- {
- report_invalid_record(state,
- "there is no contrecord flag at %X/%X",
- LSN_FORMAT_ARGS(RecPtr));
- goto err;
- }
+ /* ReadRecPtr contains page header */
+ Assert(targetRecOff != 0);
+ if (XLogNeedData(state, targetPagePtr, request_len, true))
+ return XLREAD_NEED_DATA;
- /*
- * Cross-check that xlp_rem_len agrees with how much of the record
- * we expect there to be left.
- */
- if (pageHeader->xlp_rem_len == 0 ||
- total_len != (pageHeader->xlp_rem_len + gotlen))
- {
- report_invalid_record(state,
- "invalid contrecord length %u (expected %lld) at %X/%X",
- pageHeader->xlp_rem_len,
- ((long long) total_len) - gotlen,
- LSN_FORMAT_ARGS(RecPtr));
- goto err;
- }
+ /* error out if caller supplied bogus page */
+ if (!state->page_verified)
+ goto err;
- /* Append the continuation from this page to the buffer */
- pageHeaderSize = XLogPageHeaderSize(pageHeader);
+ prec = (XLogRecord *) (state->readBuf + targetRecOff);
- if (readOff < pageHeaderSize)
- readOff = ReadPageInternal(state, targetPagePtr,
- pageHeaderSize);
+ /* validate record header if not yet */
+ if (!state->record_verified && record_len >= SizeOfXLogRecord)
+ {
+ if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
+ state->PrevRecPtr, prec))
+ goto err;
- Assert(pageHeaderSize <= readOff);
+ state->record_verified = true;
+ }
- contdata = (char *) state->readBuf + pageHeaderSize;
- len = XLOG_BLCKSZ - pageHeaderSize;
- if (pageHeader->xlp_rem_len < len)
- len = pageHeader->xlp_rem_len;
- if (readOff < pageHeaderSize + len)
- readOff = ReadPageInternal(state, targetPagePtr,
- pageHeaderSize + len);
+ if (total_len == record_len)
+ {
+ /* Record does not cross a page boundary */
+ Assert(state->record_verified);
- memcpy(buffer, (char *) contdata, len);
- buffer += len;
- gotlen += len;
+ if (!ValidXLogRecord(state, prec, state->ReadRecPtr))
+ goto err;
- /* If we just reassembled the record header, validate it. */
- if (!gotheader)
- {
- record = (XLogRecord *) state->readRecordBuf;
- if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
- record, randAccess))
+ state->record_verified = true; /* to be tidy */
+
+ /* We already checked the header earlier */
+ state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len);
+
+ *record = prec;
+ state->readRecordState = XLREAD_NEXT_RECORD;
+ break;
+ }
+
+ /*
+ * The record continues on the next page. Need to reassemble
+ * record
+ */
+ Assert(total_len > record_len);
+
+ /* Enlarge readRecordBuf as needed. */
+ if (total_len > state->readRecordBufSize &&
+ !allocate_recordbuf(state, total_len))
+ {
+ /* We treat this as a "bogus data" condition */
+ report_invalid_record(state,
+ "record length %u at %X/%X too long",
+ total_len,
+ LSN_FORMAT_ARGS(state->ReadRecPtr));
goto err;
- gotheader = true;
+ }
+
+ /* Copy the first fragment of the record from the first page. */
+ memcpy(state->readRecordBuf, state->readBuf + targetRecOff,
+ record_len);
+ state->recordGotLen += record_len;
+ state->recordRemainLen -= record_len;
+
+ /* Calculate pointer to beginning of next page */
+ state->recordContRecPtr = state->ReadRecPtr + record_len;
+ Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0);
+
+ state->readRecordState = XLREAD_CONTINUATION;
}
- } while (gotlen < total_len);
+ /* fall through */
- Assert(gotheader);
+ case XLREAD_CONTINUATION:
+ {
+ XLogPageHeader pageHeader = NULL;
+ uint32 pageHeaderSize;
+ XLogRecPtr targetPagePtr = InvalidXLogRecPtr;
- record = (XLogRecord *) state->readRecordBuf;
- if (!ValidXLogRecord(state, record, RecPtr))
- goto err;
+ /*
+ * we enter this state only if we haven't read the whole
+ * record.
+ */
+ Assert(state->recordRemainLen > 0);
- pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
- state->ReadRecPtr = RecPtr;
- state->EndRecPtr = targetPagePtr + pageHeaderSize
- + MAXALIGN(pageHeader->xlp_rem_len);
- }
- else
- {
- /* Wait for the record data to become available */
- readOff = ReadPageInternal(state, targetPagePtr,
- Min(targetRecOff + total_len, XLOG_BLCKSZ));
- if (readOff < 0)
- goto err;
+ while (state->recordRemainLen > 0)
+ {
+ char *contdata;
+ uint32 request_len PG_USED_FOR_ASSERTS_ONLY;
+ uint32 record_len;
+
+ /* Wait for the next page to become available */
+ targetPagePtr = state->recordContRecPtr;
+
+ /* this request contains page header */
+ Assert(targetPagePtr != 0);
+ if (XLogNeedData(state, targetPagePtr,
+ Min(state->recordRemainLen, XLOG_BLCKSZ),
+ false))
+ return XLREAD_NEED_DATA;
+
+ if (!state->page_verified)
+ goto err;
+
+ Assert(SizeOfXLogShortPHD <= state->readLen);
+
+ /* Check that the continuation on next page looks valid */
+ pageHeader = (XLogPageHeader) state->readBuf;
+ if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
+ {
+ report_invalid_record(
+ state,
+ "there is no contrecord flag at %X/%X reading %X/%X",
+ (uint32) (state->recordContRecPtr >> 32),
+ (uint32) state->recordContRecPtr,
+ (uint32) (state->ReadRecPtr >> 32),
+ (uint32) state->ReadRecPtr);
+ goto err;
+ }
+
+ /*
+ * Cross-check that xlp_rem_len agrees with how much of
+ * the record we expect there to be left.
+ */
+ if (pageHeader->xlp_rem_len == 0 ||
+ pageHeader->xlp_rem_len != state->recordRemainLen)
+ {
+ report_invalid_record(
+ state,
+ "invalid contrecord length %u at %X/%X reading %X/%X, expected %u",
+ pageHeader->xlp_rem_len,
+ (uint32) (state->recordContRecPtr >> 32),
+ (uint32) state->recordContRecPtr,
+ (uint32) (state->ReadRecPtr >> 32),
+ (uint32) state->ReadRecPtr,
+ state->recordRemainLen);
+ goto err;
+ }
+
+ /* Append the continuation from this page to the buffer */
+ pageHeaderSize = XLogPageHeaderSize(pageHeader);
+
+ /*
+ * XLogNeedData should have ensured that the whole page
+ * header was read
+ */
+ Assert(pageHeaderSize <= state->readLen);
+
+ contdata = (char *) state->readBuf + pageHeaderSize;
+ record_len = XLOG_BLCKSZ - pageHeaderSize;
+ if (pageHeader->xlp_rem_len < record_len)
+ record_len = pageHeader->xlp_rem_len;
+
+ request_len = record_len + pageHeaderSize;
+
+ /*
+ * XLogNeedData should have ensured all needed data was
+ * read
+ */
+ Assert(request_len <= state->readLen);
+
+ memcpy(state->readRecordBuf + state->recordGotLen,
+ (char *) contdata, record_len);
+ state->recordGotLen += record_len;
+ state->recordRemainLen -= record_len;
+
+ /* If we just reassembled the record header, validate it. */
+ if (!state->record_verified)
+ {
+ Assert(state->recordGotLen >= SizeOfXLogRecord);
+ if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
+ state->PrevRecPtr,
+ (XLogRecord *) state->readRecordBuf))
+ goto err;
+
+ state->record_verified = true;
+ }
+
+ /*
+ * Calculate pointer to beginning of next page, and
+ * continue
+ */
+ state->recordContRecPtr += XLOG_BLCKSZ;
+ }
- /* Record does not cross a page boundary */
- if (!ValidXLogRecord(state, record, RecPtr))
- goto err;
+ /* targetPagePtr is pointing the last-read page here */
+ prec = (XLogRecord *) state->readRecordBuf;
+ if (!ValidXLogRecord(state, prec, state->ReadRecPtr))
+ goto err;
- state->EndRecPtr = RecPtr + MAXALIGN(total_len);
+ pageHeaderSize =
+ XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+ state->EndRecPtr = targetPagePtr + pageHeaderSize
+ + MAXALIGN(pageHeader->xlp_rem_len);
- state->ReadRecPtr = RecPtr;
+ *record = prec;
+ state->readRecordState = XLREAD_NEXT_RECORD;
+ break;
+ }
}
/*
* Special processing if it's an XLOG SWITCH record
*/
- if (record->xl_rmid == RM_XLOG_ID &&
- (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
+ if ((*record)->xl_rmid == RM_XLOG_ID &&
+ ((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
{
/* Pretend it extends to end of segment */
state->EndRecPtr += state->segcxt.ws_segsize - 1;
state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
}
- if (DecodeXLogRecord(state, record, errormsg))
- return record;
- else
- return NULL;
+ if (DecodeXLogRecord(state, *record, errormsg))
+ return XLREAD_SUCCESS;
+
+ *record = NULL;
+ return XLREAD_FAIL;
err:
/*
- * Invalidate the read state. We might read from a different source after
+ * Invalidate the read page. We might read from a different source after
* failure.
*/
XLogReaderInvalReadState(state);
@@ -558,113 +710,141 @@ err:
if (state->errormsg_buf[0] != '\0')
*errormsg = state->errormsg_buf;
- return NULL;
+ *record = NULL;
+ return XLREAD_FAIL;
}
/*
- * Read a single xlog page including at least [pageptr, reqLen] of valid data
- * via the page_read() callback.
+ * Checks that an xlog page loaded in state->readBuf is including at least
+ * [pageptr, reqLen] and the page is valid. header_inclusive indicates that
+ * reqLen is calculated including page header length.
+ *
+ * Returns false if the buffer already contains the requested data, or found
+ * error. state->page_verified is set to true for the former and false for the
+ * latter.
*
- * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the page_read callback).
+ * Otherwise returns true and requests data loaded onto state->readBuf by
+ * state->readPagePtr and state->readLen. The caller shall call this function
+ * again after filling the buffer at least with that portion of data and set
+ * state->readLen to the length of actually loaded data.
*
- * We fetch the page from a reader-local cache if we know we have the required
- * data and if there hasn't been any error since caching the data.
+ * If header_inclusive is false, corrects reqLen internally by adding the
+ * actual page header length and may request caller for new data.
*/
-static int
-ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
+static bool
+XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen,
+ bool header_inclusive)
{
- int readLen;
uint32 targetPageOff;
XLogSegNo targetSegNo;
- XLogPageHeader hdr;
+ uint32 addLen = 0;
- Assert((pageptr % XLOG_BLCKSZ) == 0);
+ /* Some data is loaded, but page header is not verified yet. */
+ if (!state->page_verified &&
+ !XLogRecPtrIsInvalid(state->readPagePtr) && state->readLen >= 0)
+ {
+ uint32 pageHeaderSize;
- XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
- targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
+ /* just loaded new data so needs to verify page header */
- /* check whether we have all the requested data already */
- if (targetSegNo == state->seg.ws_segno &&
- targetPageOff == state->segoff && reqLen <= state->readLen)
- return state->readLen;
+ /* The caller must have loaded at least page header */
+ Assert(state->readLen >= SizeOfXLogShortPHD);
- /*
- * Data is not in our buffer.
- *
- * Every time we actually read the segment, even if we looked at parts of
- * it before, we need to do verification as the page_read callback might
- * now be rereading data from a different source.
- *
- * Whenever switching to a new WAL segment, we read the first page of the
- * file and validate its header, even if that's not where the target
- * record is. This is so that we can check the additional identification
- * info that is present in the first page's "long" header.
- */
- if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
- {
- XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
+ /*
+ * We have enough data to check the header length. Recheck the loaded
+ * length against the actual header length.
+ */
+ pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
- readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
- state->currRecPtr,
- state->readBuf);
- if (readLen < 0)
- goto err;
+ /* Request more data if we don't have the full header. */
+ if (state->readLen < pageHeaderSize)
+ {
+ state->reqLen = pageHeaderSize;
+ return true;
+ }
- /* we can be sure to have enough WAL available, we scrolled back */
- Assert(readLen == XLOG_BLCKSZ);
+ /* Now that we know we have the full header, validate it. */
+ if (!XLogReaderValidatePageHeader(state, state->readPagePtr,
+ (char *) state->readBuf))
+ {
+ /* That's bad. Force reading the page again. */
+ XLogReaderInvalReadState(state);
- if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
- state->readBuf))
- goto err;
+ return false;
+ }
+
+ state->page_verified = true;
+
+ XLByteToSeg(state->readPagePtr, state->seg.ws_segno,
+ state->segcxt.ws_segsize);
}
/*
- * First, read the requested data length, but at least a short page header
- * so that we can validate it.
+ * The loaded page may not be the one caller is supposing to read when we
+ * are verifying the first page of new segment. In that case, skip further
+ * verification and immediately load the target page.
*/
- readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
- state->currRecPtr,
- state->readBuf);
- if (readLen < 0)
- goto err;
+ if (state->page_verified && pageptr == state->readPagePtr)
+ {
+ /*
+ * calculate additional length for page header keeping the total
+ * length within the block size.
+ */
+ if (!header_inclusive)
+ {
+ uint32 pageHeaderSize =
+ XLogPageHeaderSize((XLogPageHeader) state->readBuf);
- Assert(readLen <= XLOG_BLCKSZ);
+ addLen = pageHeaderSize;
+ if (reqLen + pageHeaderSize <= XLOG_BLCKSZ)
+ addLen = pageHeaderSize;
+ else
+ addLen = XLOG_BLCKSZ - reqLen;
+ }
- /* Do we have enough data to check the header length? */
- if (readLen <= SizeOfXLogShortPHD)
- goto err;
+ /* Return if we already have it. */
+ if (reqLen + addLen <= state->readLen)
+ return false;
+ }
- Assert(readLen >= reqLen);
+ /* Data is not in our buffer, request the caller for it. */
+ XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
+ targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
+ Assert((pageptr % XLOG_BLCKSZ) == 0);
- hdr = (XLogPageHeader) state->readBuf;
+ /*
+ * Every time we request to load new data of a page to the caller, even if
+ * we looked at a part of it before, we need to do verification on the
+ * next invocation as the caller might now be rereading data from a
+ * different source.
+ */
+ state->page_verified = false;
- /* still not enough */
- if (readLen < XLogPageHeaderSize(hdr))
+ /*
+ * Whenever switching to a new WAL segment, we read the first page of the
+ * file and validate its header, even if that's not where the target
+ * record is. This is so that we can check the additional identification
+ * info that is present in the first page's "long" header. Don't do this
+ * if the caller requested the first page in the segment.
+ */
+ if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
{
- readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
- state->currRecPtr,
- state->readBuf);
- if (readLen < 0)
- goto err;
+ /*
+ * Then we'll see that the targetSegNo now matches the ws_segno, and
+ * will not come back here, but will request the actual target page.
+ */
+ state->readPagePtr = pageptr - targetPageOff;
+ state->reqLen = XLOG_BLCKSZ;
+ return true;
}
/*
- * Now that we know we have the full header, validate it.
+ * Request the caller to load the page. We need at least a short page
+ * header so that we can validate it.
*/
- if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
- goto err;
-
- /* update read state information */
- state->seg.ws_segno = targetSegNo;
- state->segoff = targetPageOff;
- state->readLen = readLen;
-
- return readLen;
-
-err:
- XLogReaderInvalReadState(state);
- return -1;
+ state->readPagePtr = pageptr;
+ state->reqLen = Max(reqLen + addLen, SizeOfXLogShortPHD);
+ return true;
}
/*
@@ -673,9 +853,7 @@ err:
static void
XLogReaderInvalReadState(XLogReaderState *state)
{
- state->seg.ws_segno = 0;
- state->segoff = 0;
- state->readLen = 0;
+ state->readPagePtr = InvalidXLogRecPtr;
}
/*
@@ -683,11 +861,12 @@ XLogReaderInvalReadState(XLogReaderState *state)
*
* This is just a convenience subroutine to avoid duplicated code in
* XLogReadRecord. It's not intended for use from anywhere else.
+ *
+ * If PrevRecPtr is valid, the xl_prev is is cross-checked with it.
*/
static bool
ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
- XLogRecPtr PrevRecPtr, XLogRecord *record,
- bool randAccess)
+ XLogRecPtr PrevRecPtr, XLogRecord *record)
{
if (record->xl_tot_len < SizeOfXLogRecord)
{
@@ -704,7 +883,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
return false;
}
- if (randAccess)
+ if (PrevRecPtr == InvalidXLogRecPtr)
{
/*
* We can't exactly verify the prev-link, but surely it should be less
@@ -922,6 +1101,22 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
* here.
*/
+XLogFindNextRecordState *
+InitXLogFindNextRecord(XLogReaderState *reader_state, XLogRecPtr start_ptr)
+{
+ XLogFindNextRecordState *state = (XLogFindNextRecordState *)
+ palloc_extended(sizeof(XLogFindNextRecordState),
+ MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
+ if (!state)
+ return NULL;
+
+ state->reader_state = reader_state;
+ state->targetRecPtr = start_ptr;
+ state->currRecPtr = start_ptr;
+
+ return state;
+}
+
/*
* Find the first record with an lsn >= RecPtr.
*
@@ -933,27 +1128,25 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
* This positions the reader, like XLogBeginRead(), so that the next call to
* XLogReadRecord() will read the next valid record.
*/
-XLogRecPtr
-XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
+bool
+XLogFindNextRecord(XLogFindNextRecordState *state)
{
- XLogRecPtr tmpRecPtr;
- XLogRecPtr found = InvalidXLogRecPtr;
XLogPageHeader header;
+ XLogRecord *record;
+ XLogReadRecordResult result;
char *errormsg;
- Assert(!XLogRecPtrIsInvalid(RecPtr));
+ Assert(!XLogRecPtrIsInvalid(state->currRecPtr));
/*
* skip over potential continuation data, keeping in mind that it may span
* multiple pages
*/
- tmpRecPtr = RecPtr;
while (true)
{
XLogRecPtr targetPagePtr;
int targetRecOff;
uint32 pageHeaderSize;
- int readLen;
/*
* Compute targetRecOff. It should typically be equal or greater than
@@ -961,27 +1154,27 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
* that, except when caller has explicitly specified the offset that
* falls somewhere there or when we are skipping multi-page
* continuation record. It doesn't matter though because
- * ReadPageInternal() is prepared to handle that and will read at
- * least short page-header worth of data
+ * XLogNeedData() is prepared to handle that and will read at least
+ * short page-header worth of data
*/
- targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
+ targetRecOff = state->currRecPtr % XLOG_BLCKSZ;
/* scroll back to page boundary */
- targetPagePtr = tmpRecPtr - targetRecOff;
+ targetPagePtr = state->currRecPtr - targetRecOff;
- /* Read the page containing the record */
- readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
- if (readLen < 0)
+ if (XLogNeedData(state->reader_state, targetPagePtr, targetRecOff,
+ targetRecOff != 0))
+ return true;
+
+ if (!state->reader_state->page_verified)
goto err;
- header = (XLogPageHeader) state->readBuf;
+ header = (XLogPageHeader) state->reader_state->readBuf;
pageHeaderSize = XLogPageHeaderSize(header);
- /* make sure we have enough data for the page header */
- readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
- if (readLen < 0)
- goto err;
+ /* we should have read the page header */
+ Assert(state->reader_state->readLen >= pageHeaderSize);
/* skip over potential continuation data */
if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
@@ -996,21 +1189,21 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
* Note that record headers are MAXALIGN'ed
*/
if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
- tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
+ state->currRecPtr = targetPagePtr + XLOG_BLCKSZ;
else
{
/*
* The previous continuation record ends in this page. Set
- * tmpRecPtr to point to the first valid record
+ * state->currRecPtr to point to the first valid record
*/
- tmpRecPtr = targetPagePtr + pageHeaderSize
+ state->currRecPtr = targetPagePtr + pageHeaderSize
+ MAXALIGN(header->xlp_rem_len);
break;
}
}
else
{
- tmpRecPtr = targetPagePtr + pageHeaderSize;
+ state->currRecPtr = targetPagePtr + pageHeaderSize;
break;
}
}
@@ -1020,31 +1213,36 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
* because either we're at the first record after the beginning of a page
* or we just jumped over the remaining data of a continuation.
*/
- XLogBeginRead(state, tmpRecPtr);
- while (XLogReadRecord(state, &errormsg) != NULL)
+ XLogBeginRead(state->reader_state, state->currRecPtr);
+ while ((result = XLogReadRecord(state->reader_state, &record, &errormsg)) !=
+ XLREAD_FAIL)
{
+ if (result == XLREAD_NEED_DATA)
+ return true;
+
/* past the record we've found, break out */
- if (RecPtr <= state->ReadRecPtr)
+ if (state->targetRecPtr <= state->reader_state->ReadRecPtr)
{
/* Rewind the reader to the beginning of the last record. */
- found = state->ReadRecPtr;
- XLogBeginRead(state, found);
- return found;
+ state->currRecPtr = state->reader_state->ReadRecPtr;
+ XLogBeginRead(state->reader_state, state->currRecPtr);
+ return false;
}
}
err:
- XLogReaderInvalReadState(state);
+ XLogReaderInvalReadState(state->reader_state);
- return InvalidXLogRecPtr;
+ state->currRecPtr = InvalidXLogRecPtr;;
+ return false;
}
#endif /* FRONTEND */
/*
- * Helper function to ease writing of XLogRoutine->page_read callbacks.
- * If this function is used, caller must supply a segment_open callback in
- * 'state', as that is used here.
+ * Helper function to ease writing of routines that read raw WAL data.
+ * If this function is used, caller must supply a segment_open callback and
+ * segment_close callback as that is used here.
*
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
@@ -1057,6 +1255,7 @@ err:
*/
bool
WALRead(XLogReaderState *state,
+ WALSegmentOpenCB segopenfn, WALSegmentCloseCB segclosefn,
char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
WALReadError *errinfo)
{
@@ -1088,10 +1287,10 @@ WALRead(XLogReaderState *state,
XLogSegNo nextSegNo;
if (state->seg.ws_file >= 0)
- state->routine.segment_close(state);
+ segclosefn(state);
XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
- state->routine.segment_open(state, nextSegNo, &tli);
+ segopenfn(state, nextSegNo, &tli);
/* This shouldn't happen -- indicates a bug in segment_open */
Assert(state->seg.ws_file >= 0);