aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlogreader.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlogreader.c')
-rw-r--r--src/backend/access/transam/xlogreader.c645
1 files changed, 524 insertions, 121 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index b7c06da2557..e437c429920 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -45,6 +45,7 @@ static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
int reqLen);
static void XLogReaderInvalReadState(XLogReaderState *state);
+static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool non_blocking);
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
@@ -57,6 +58,12 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
#define MAX_ERRORMSG_LEN 1000
/*
+ * Default size; large enough that typical users of XLogReader won't often need
+ * to use the 'oversized' memory allocation code path.
+ */
+#define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024)
+
+/*
* Construct a string in state->errormsg_buf explaining what's wrong with
* the current record being read.
*/
@@ -70,6 +77,24 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
va_start(args, fmt);
vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
va_end(args);
+
+ state->errormsg_deferred = true;
+}
+
+/*
+ * Set the size of the decoding buffer. A pointer to a caller supplied memory
+ * region may also be passed in, in which case non-oversized records will be
+ * decoded there.
+ */
+void
+XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size)
+{
+ Assert(state->decode_buffer == NULL);
+
+ state->decode_buffer = buffer;
+ state->decode_buffer_size = size;
+ state->decode_buffer_tail = buffer;
+ state->decode_buffer_head = buffer;
}
/*
@@ -92,8 +117,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
/* initialize caller-provided support functions */
state->routine = *routine;
- state->max_block_id = -1;
-
/*
* Permanently allocate readBuf. We do it this way, rather than just
* making a static array, for two reasons: (1) no need to waste the
@@ -144,18 +167,11 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
void
XLogReaderFree(XLogReaderState *state)
{
- int block_id;
-
if (state->seg.ws_file != -1)
state->routine.segment_close(state);
- for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
- {
- if (state->blocks[block_id].data)
- pfree(state->blocks[block_id].data);
- }
- if (state->main_data)
- pfree(state->main_data);
+ if (state->decode_buffer && state->free_decode_buffer)
+ pfree(state->decode_buffer);
pfree(state->errormsg_buf);
if (state->readRecordBuf)
@@ -251,7 +267,133 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
/* Begin at the passed-in record pointer. */
state->EndRecPtr = RecPtr;
+ state->NextRecPtr = RecPtr;
state->ReadRecPtr = InvalidXLogRecPtr;
+ state->DecodeRecPtr = InvalidXLogRecPtr;
+}
+
+/*
+ * See if we can release the last record that was returned by
+ * XLogNextRecord(), if any, to free up space.
+ */
+void
+XLogReleasePreviousRecord(XLogReaderState *state)
+{
+ DecodedXLogRecord *record;
+
+ if (!state->record)
+ return;
+
+ /*
+ * Remove it from the decoded record queue. It must be the oldest item
+ * decoded, decode_queue_head.
+ */
+ record = state->record;
+ Assert(record == state->decode_queue_head);
+ state->record = NULL;
+ state->decode_queue_head = record->next;
+
+ /* It might also be the newest item decoded, decode_queue_tail. */
+ if (state->decode_queue_tail == record)
+ state->decode_queue_tail = NULL;
+
+ /* Release the space. */
+ if (unlikely(record->oversized))
+ {
+ /* It's not in the decode buffer, so free it to release space. */
+ pfree(record);
+ }
+ else
+ {
+ /* It must be the head (oldest) record in the decode buffer. */
+ Assert(state->decode_buffer_head == (char *) record);
+
+ /*
+ * We need to update head to point to the next record that is in the
+ * decode buffer, if any, being careful to skip oversized ones
+ * (they're not in the decode buffer).
+ */
+ record = record->next;
+ while (unlikely(record && record->oversized))
+ record = record->next;
+
+ if (record)
+ {
+ /* Adjust head to release space up to the next record. */
+ state->decode_buffer_head = (char *) record;
+ }
+ else
+ {
+ /*
+ * Otherwise we might as well just reset head and tail to the
+ * start of the buffer space, because we're empty. This means
+ * we'll keep overwriting the same piece of memory if we're not
+ * doing any prefetching.
+ */
+ state->decode_buffer_head = state->decode_buffer;
+ state->decode_buffer_tail = state->decode_buffer;
+ }
+ }
+}
+
+/*
+ * Attempt to read an XLOG record.
+ *
+ * XLogBeginRead() or XLogFindNextRecord() and then XLogReadAhead() must be
+ * called before the first call to XLogNextRecord(). This function returns
+ * records and errors that were put into an internal queue by XLogReadAhead().
+ *
+ * On success, a record is returned.
+ *
+ * The returned record (or *errormsg) points to an internal buffer that's
+ * valid until the next call to XLogNextRecord.
+ */
+DecodedXLogRecord *
+XLogNextRecord(XLogReaderState *state, char **errormsg)
+{
+ /* Release the last record returned by XLogNextRecord(). */
+ XLogReleasePreviousRecord(state);
+
+ if (state->decode_queue_head == NULL)
+ {
+ *errormsg = NULL;
+ if (state->errormsg_deferred)
+ {
+ if (state->errormsg_buf[0] != '\0')
+ *errormsg = state->errormsg_buf;
+ state->errormsg_deferred = false;
+ }
+
+ /*
+ * state->EndRecPtr is expected to have been set by the last call to
+ * XLogBeginRead() or XLogNextRecord(), and is the location of the
+ * error.
+ */
+ Assert(!XLogRecPtrIsInvalid(state->EndRecPtr));
+
+ return NULL;
+ }
+
+ /*
+ * Record this as the most recent record returned, so that we'll release
+ * it next time. This also exposes it to the traditional
+ * XLogRecXXX(xlogreader) macros, which work with the decoder rather than
+ * the record for historical reasons.
+ */
+ state->record = state->decode_queue_head;
+
+ /*
+ * Update the pointers to the beginning and one-past-the-end of this
+ * record, again for the benefit of historical code that expected the
+ * decoder to track this rather than accessing these fields of the record
+ * itself.
+ */
+ state->ReadRecPtr = state->record->lsn;
+ state->EndRecPtr = state->record->next_lsn;
+
+ *errormsg = NULL;
+
+ return state->record;
}
/*
@@ -273,6 +415,119 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
XLogRecord *
XLogReadRecord(XLogReaderState *state, char **errormsg)
{
+ DecodedXLogRecord *decoded;
+
+ /*
+ * Release last returned record, if there is one. We need to do this so
+ * that we can check for empty decode queue accurately.
+ */
+ XLogReleasePreviousRecord(state);
+
+ /*
+ * Call XLogReadAhead() in blocking mode to make sure there is something
+ * in the queue, though we don't use the result.
+ */
+ if (!XLogReaderHasQueuedRecordOrError(state))
+ XLogReadAhead(state, false /* nonblocking */ );
+
+ /* Consume the head record or error. */
+ decoded = XLogNextRecord(state, errormsg);
+ if (decoded)
+ {
+ /*
+ * This function returns a pointer to the record's header, not the
+ * actual decoded record. The caller will access the decoded record
+ * through the XLogRecGetXXX() macros, which reach the decoded
+ * record as xlogreader->record.
+ */
+ Assert(state->record == decoded);
+ return &decoded->header;
+ }
+
+ return NULL;
+}
+
+/*
+ * Allocate space for a decoded record. The only member of the returned
+ * object that is initialized is the 'oversized' flag, indicating that the
+ * decoded record wouldn't fit in the decode buffer and must eventually be
+ * freed explicitly.
+ *
+ * The caller is responsible for adjusting decode_buffer_tail with the real
+ * size after successfully decoding a record into this space. This way, if
+ * decoding fails, then there is nothing to undo unless the 'oversized' flag
+ * was set and pfree() must be called.
+ *
+ * Return NULL if there is no space in the decode buffer and allow_oversized
+ * is false, or if memory allocation fails for an oversized buffer.
+ */
+static DecodedXLogRecord *
+XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
+{
+ size_t required_space = DecodeXLogRecordRequiredSpace(xl_tot_len);
+ DecodedXLogRecord *decoded = NULL;
+
+ /* Allocate a circular decode buffer if we don't have one already. */
+ if (unlikely(state->decode_buffer == NULL))
+ {
+ if (state->decode_buffer_size == 0)
+ state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE;
+ state->decode_buffer = palloc(state->decode_buffer_size);
+ state->decode_buffer_head = state->decode_buffer;
+ state->decode_buffer_tail = state->decode_buffer;
+ state->free_decode_buffer = true;
+ }
+
+ /* Try to allocate space in the circular decode buffer. */
+ if (state->decode_buffer_tail >= state->decode_buffer_head)
+ {
+ /* Empty, or tail is to the right of head. */
+ if (state->decode_buffer_tail + required_space <=
+ state->decode_buffer + state->decode_buffer_size)
+ {
+ /* There is space between tail and end. */
+ decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
+ decoded->oversized = false;
+ return decoded;
+ }
+ else if (state->decode_buffer + required_space <
+ state->decode_buffer_head)
+ {
+ /* There is space between start and head. */
+ decoded = (DecodedXLogRecord *) state->decode_buffer;
+ decoded->oversized = false;
+ return decoded;
+ }
+ }
+ else
+ {
+ /* Tail is to the left of head. */
+ if (state->decode_buffer_tail + required_space <
+ state->decode_buffer_head)
+ {
+ /* There is space between tail and head. */
+ decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
+ decoded->oversized = false;
+ return decoded;
+ }
+ }
+
+ /* Not enough space in the decode buffer. Are we allowed to allocate? */
+ if (allow_oversized)
+ {
+ decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM);
+ if (decoded == NULL)
+ return NULL;
+ decoded->oversized = true;
+ return decoded;
+ }
+
+ return NULL;
+}
+
+static XLogPageReadResult
+XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
+{
XLogRecPtr RecPtr;
XLogRecord *record;
XLogRecPtr targetPagePtr;
@@ -284,6 +539,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
bool assembled;
bool gotheader;
int readOff;
+ DecodedXLogRecord *decoded;
+ char *errormsg; /* not used */
/*
* randAccess indicates whether to verify the previous-record pointer of
@@ -293,21 +550,20 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
randAccess = false;
/* reset error state */
- *errormsg = NULL;
state->errormsg_buf[0] = '\0';
+ decoded = NULL;
- ResetDecoder(state);
state->abortedRecPtr = InvalidXLogRecPtr;
state->missingContrecPtr = InvalidXLogRecPtr;
- RecPtr = state->EndRecPtr;
+ RecPtr = state->NextRecPtr;
- if (state->ReadRecPtr != InvalidXLogRecPtr)
+ if (state->DecodeRecPtr != InvalidXLogRecPtr)
{
/* read the record after the one we just read */
/*
- * EndRecPtr is pointing to end+1 of the previous WAL record. If
+ * NextRecPtr is pointing to end+1 of the previous WAL record. If
* we're at a page boundary, no more records can fit on the current
* page. We must skip over the page header, but we can't do that until
* we've read in the page, since the header size is variable.
@@ -318,7 +574,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
/*
* Caller supplied a position to start at.
*
- * In this case, EndRecPtr should already be pointing to a valid
+ * In this case, NextRecPtr should already be pointing to a valid
* record starting position.
*/
Assert(XRecOffIsValid(RecPtr));
@@ -326,6 +582,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
}
restart:
+ state->nonblocking = nonblocking;
state->currRecPtr = RecPtr;
assembled = false;
@@ -339,7 +596,9 @@ restart:
*/
readOff = ReadPageInternal(state, targetPagePtr,
Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
- if (readOff < 0)
+ if (readOff == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readOff < 0)
goto err;
/*
@@ -395,7 +654,7 @@ restart:
*/
if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
{
- if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
+ if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record,
randAccess))
goto err;
gotheader = true;
@@ -414,6 +673,31 @@ restart:
gotheader = false;
}
+ /*
+ * Find space to decode this record. Don't allow oversized allocation if
+ * the caller requested nonblocking. Otherwise, we *have* to try to
+ * decode the record now because the caller has nothing else to do, so
+ * allow an oversized record to be palloc'd if that turns out to be
+ * necessary.
+ */
+ decoded = XLogReadRecordAlloc(state,
+ total_len,
+ !nonblocking /* allow_oversized */ );
+ if (decoded == NULL)
+ {
+ /*
+ * There is no space in the decode buffer. The caller should help
+ * with that problem by consuming some records.
+ */
+ if (nonblocking)
+ return XLREAD_WOULDBLOCK;
+
+ /* We failed to allocate memory for an oversized record. */
+ report_invalid_record(state,
+ "out of memory while trying to decode a record of length %u", total_len);
+ goto err;
+ }
+
len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
if (total_len > len)
{
@@ -453,7 +737,9 @@ restart:
Min(total_len - gotlen + SizeOfXLogShortPHD,
XLOG_BLCKSZ));
- if (readOff < 0)
+ if (readOff == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readOff < 0)
goto err;
Assert(SizeOfXLogShortPHD <= readOff);
@@ -471,7 +757,6 @@ restart:
if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
{
state->overwrittenRecPtr = RecPtr;
- ResetDecoder(state);
RecPtr = targetPagePtr;
goto restart;
}
@@ -526,7 +811,7 @@ restart:
if (!gotheader)
{
record = (XLogRecord *) state->readRecordBuf;
- if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
+ if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr,
record, randAccess))
goto err;
gotheader = true;
@@ -540,8 +825,8 @@ restart:
goto err;
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
- state->ReadRecPtr = RecPtr;
- state->EndRecPtr = targetPagePtr + pageHeaderSize
+ state->DecodeRecPtr = RecPtr;
+ state->NextRecPtr = targetPagePtr + pageHeaderSize
+ MAXALIGN(pageHeader->xlp_rem_len);
}
else
@@ -549,16 +834,18 @@ restart:
/* Wait for the record data to become available */
readOff = ReadPageInternal(state, targetPagePtr,
Min(targetRecOff + total_len, XLOG_BLCKSZ));
- if (readOff < 0)
+ if (readOff == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readOff < 0)
goto err;
/* Record does not cross a page boundary */
if (!ValidXLogRecord(state, record, RecPtr))
goto err;
- state->EndRecPtr = RecPtr + MAXALIGN(total_len);
+ state->NextRecPtr = RecPtr + MAXALIGN(total_len);
- state->ReadRecPtr = RecPtr;
+ state->DecodeRecPtr = RecPtr;
}
/*
@@ -568,14 +855,40 @@ restart:
(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
{
/* Pretend it extends to end of segment */
- state->EndRecPtr += state->segcxt.ws_segsize - 1;
- state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
+ state->NextRecPtr += state->segcxt.ws_segsize - 1;
+ state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
}
- if (DecodeXLogRecord(state, record, errormsg))
- return record;
+ if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg))
+ {
+ /* Record the location of the next record. */
+ decoded->next_lsn = state->NextRecPtr;
+
+ /*
+ * If it's in the decode buffer, mark the decode buffer space as
+ * occupied.
+ */
+ if (!decoded->oversized)
+ {
+ /* The new decode buffer tail must be MAXALIGNed. */
+ Assert(decoded->size == MAXALIGN(decoded->size));
+ if ((char *) decoded == state->decode_buffer)
+ state->decode_buffer_tail = state->decode_buffer + decoded->size;
+ else
+ state->decode_buffer_tail += decoded->size;
+ }
+
+ /* Insert it into the queue of decoded records. */
+ Assert(state->decode_queue_tail != decoded);
+ if (state->decode_queue_tail)
+ state->decode_queue_tail->next = decoded;
+ state->decode_queue_tail = decoded;
+ if (!state->decode_queue_head)
+ state->decode_queue_head = decoded;
+ return XLREAD_SUCCESS;
+ }
else
- return NULL;
+ return XLREAD_FAIL;
err:
if (assembled)
@@ -593,14 +906,46 @@ err:
state->missingContrecPtr = targetPagePtr;
}
+ if (decoded && decoded->oversized)
+ pfree(decoded);
+
/*
* Invalidate the read state. We might read from a different source after
* failure.
*/
XLogReaderInvalReadState(state);
- if (state->errormsg_buf[0] != '\0')
- *errormsg = state->errormsg_buf;
+ /*
+ * If an error was written to errormsg_buf, it'll be returned to the caller
+ * of XLogReadRecord() after all successfully decoded records from the
+ * read queue.
+ */
+
+ return XLREAD_FAIL;
+}
+
+/*
+ * Try to decode the next available record, and return it. The record will
+ * also be returned to XLogNextRecord(), which must be called to 'consume'
+ * each record.
+ *
+ * If nonblocking is true, may return NULL due to lack of data or WAL decoding
+ * space.
+ */
+DecodedXLogRecord *
+XLogReadAhead(XLogReaderState *state, bool nonblocking)
+{
+ XLogPageReadResult result;
+
+ if (state->errormsg_deferred)
+ return NULL;
+
+ result = XLogDecodeNextRecord(state, nonblocking);
+ if (result == XLREAD_SUCCESS)
+ {
+ Assert(state->decode_queue_tail != NULL);
+ return state->decode_queue_tail;
+ }
return NULL;
}
@@ -609,8 +954,14 @@ err:
* Read a single xlog page including at least [pageptr, reqLen] of valid data
* via the page_read() callback.
*
- * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the page_read callback).
+ * Returns XLREAD_FAIL if the required page cannot be read for some
+ * reason; errormsg_buf is set in that case (unless the error occurs in the
+ * page_read callback).
+ *
+ * Returns XLREAD_WOULDBLOCK if the requested data can't be read without
+ * waiting. This can be returned only if the installed page_read callback
+ * respects the state->nonblocking flag, and cannot read the requested data
+ * immediately.
*
* We fetch the page from a reader-local cache if we know we have the required
* data and if there hasn't been any error since caching the data.
@@ -652,7 +1003,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
state->currRecPtr,
state->readBuf);
- if (readLen < 0)
+ if (readLen == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readLen < 0)
goto err;
/* we can be sure to have enough WAL available, we scrolled back */
@@ -670,7 +1023,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
state->currRecPtr,
state->readBuf);
- if (readLen < 0)
+ if (readLen == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readLen < 0)
goto err;
Assert(readLen <= XLOG_BLCKSZ);
@@ -689,7 +1044,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
state->currRecPtr,
state->readBuf);
- if (readLen < 0)
+ if (readLen == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readLen < 0)
goto err;
}
@@ -707,8 +1064,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
return readLen;
err:
- XLogReaderInvalReadState(state);
- return -1;
+ if (state->errormsg_buf[0] != '\0')
+ {
+ state->errormsg_deferred = true;
+ XLogReaderInvalReadState(state);
+ }
+ return XLREAD_FAIL;
}
/*
@@ -987,6 +1348,9 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
Assert(!XLogRecPtrIsInvalid(RecPtr));
+ /* Make sure ReadPageInternal() can't return XLREAD_WOULDBLOCK. */
+ state->nonblocking = false;
+
/*
* skip over potential continuation data, keeping in mind that it may span
* multiple pages
@@ -1187,34 +1551,83 @@ WALRead(XLogReaderState *state,
* ----------------------------------------
*/
-/* private function to reset the state between records */
+/*
+ * Private function to reset the state, forgetting all decoded records, if we
+ * are asked to move to a new read position.
+ */
static void
ResetDecoder(XLogReaderState *state)
{
- int block_id;
+ DecodedXLogRecord *r;
- state->decoded_record = NULL;
-
- state->main_data_len = 0;
-
- for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ /* Reset the decoded record queue, freeing any oversized records. */
+ while ((r = state->decode_queue_head) != NULL)
{
- state->blocks[block_id].in_use = false;
- state->blocks[block_id].has_image = false;
- state->blocks[block_id].has_data = false;
- state->blocks[block_id].apply_image = false;
+ state->decode_queue_head = r->next;
+ if (r->oversized)
+ pfree(r);
}
- state->max_block_id = -1;
+ state->decode_queue_tail = NULL;
+ state->decode_queue_head = NULL;
+ state->record = NULL;
+
+ /* Reset the decode buffer to empty. */
+ state->decode_buffer_tail = state->decode_buffer;
+ state->decode_buffer_head = state->decode_buffer;
+
+ /* Clear error state. */
+ state->errormsg_buf[0] = '\0';
+ state->errormsg_deferred = false;
}
/*
- * Decode the previously read record.
+ * Compute the maximum amount of output buffer space that could be required
+ * to decode a record, given xl_tot_len from the record's header, including
+ * worst-case alignment padding. We might not finish up using all of the
+ * space.
+ *
+ * This computation is pessimistic and assumes the maximum possible number of
+ * blocks, due to lack of better information.
+ */
+size_t
+DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
+{
+ size_t size = 0;
+
+ /* Account for the fixed size part of the decoded record struct. */
+ size += offsetof(DecodedXLogRecord, blocks[0]);
+ /* Account for the flexible blocks array of maximum possible size. */
+ size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1);
+ /* Account for all the raw main and block data. */
+ size += xl_tot_len;
+ /* We might insert padding before main_data. */
+ size += (MAXIMUM_ALIGNOF - 1);
+ /* We might insert padding before each block's data. */
+ size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1);
+ /* We might insert padding at the end. */
+ size += (MAXIMUM_ALIGNOF - 1);
+
+ return size;
+}
+
+/*
+ * Decode a record. "decoded" must point to a MAXALIGNed memory area that has
+ * space for at least DecodeXLogRecordRequiredSpace(record->xl_tot_len)
+ * bytes. On success, decoded->size contains the actual space occupied by
+ * the decoded record, which may turn out to be less.
+ *
+ * Only decoded->oversized member must be initialized already, and will not be
+ * modified. Other members will be initialized as required.
*
* On error, a human-readable error message is returned in *errormsg, and
* the return value is false.
*/
bool
-DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
+DecodeXLogRecord(XLogReaderState *state,
+ DecodedXLogRecord *decoded,
+ XLogRecord *record,
+ XLogRecPtr lsn,
+ char **errormsg)
{
/*
* read next _size bytes from record buffer, but check for overrun first.
@@ -1229,17 +1642,20 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
} while(0)
char *ptr;
+ char *out;
uint32 remaining;
uint32 datatotal;
RelFileNode *rnode = NULL;
uint8 block_id;
- ResetDecoder(state);
-
- state->decoded_record = record;
- state->record_origin = InvalidRepOriginId;
- state->toplevel_xid = InvalidTransactionId;
-
+ decoded->header = *record;
+ decoded->lsn = lsn;
+ decoded->next = NULL;
+ decoded->record_origin = InvalidRepOriginId;
+ decoded->toplevel_xid = InvalidTransactionId;
+ decoded->main_data = NULL;
+ decoded->main_data_len = 0;
+ decoded->max_block_id = -1;
ptr = (char *) record;
ptr += SizeOfXLogRecord;
remaining = record->xl_tot_len - SizeOfXLogRecord;
@@ -1257,7 +1673,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
- state->main_data_len = main_data_len;
+ decoded->main_data_len = main_data_len;
datatotal += main_data_len;
break; /* by convention, the main data fragment is
* always last */
@@ -1268,18 +1684,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
uint32 main_data_len;
COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
- state->main_data_len = main_data_len;
+ decoded->main_data_len = main_data_len;
datatotal += main_data_len;
break; /* by convention, the main data fragment is
* always last */
}
else if (block_id == XLR_BLOCK_ID_ORIGIN)
{
- COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
+ COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId));
}
else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
{
- COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
+ COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId));
}
else if (block_id <= XLR_MAX_BLOCK_ID)
{
@@ -1287,7 +1703,11 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
DecodedBkpBlock *blk;
uint8 fork_flags;
- if (block_id <= state->max_block_id)
+ /* mark any intervening block IDs as not in use */
+ for (int i = decoded->max_block_id + 1; i < block_id; ++i)
+ decoded->blocks[i].in_use = false;
+
+ if (block_id <= decoded->max_block_id)
{
report_invalid_record(state,
"out-of-order block_id %u at %X/%X",
@@ -1295,9 +1715,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
LSN_FORMAT_ARGS(state->ReadRecPtr));
goto err;
}
- state->max_block_id = block_id;
+ decoded->max_block_id = block_id;
- blk = &state->blocks[block_id];
+ blk = &decoded->blocks[block_id];
blk->in_use = true;
blk->apply_image = false;
@@ -1440,17 +1860,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
/*
* Ok, we've parsed the fragment headers, and verified that the total
* length of the payload in the fragments is equal to the amount of data
- * left. Copy the data of each fragment to a separate buffer.
- *
- * We could just set up pointers into readRecordBuf, but we want to align
- * the data for the convenience of the callers. Backup images are not
- * copied, however; they don't need alignment.
+ * left. Copy the data of each fragment to contiguous space after the
+ * blocks array, inserting alignment padding before the data fragments so
+ * they can be cast to struct pointers by REDO routines.
*/
+ out = ((char *) decoded) +
+ offsetof(DecodedXLogRecord, blocks) +
+ sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1);
/* block data first */
- for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ for (block_id = 0; block_id <= decoded->max_block_id; block_id++)
{
- DecodedBkpBlock *blk = &state->blocks[block_id];
+ DecodedBkpBlock *blk = &decoded->blocks[block_id];
if (!blk->in_use)
continue;
@@ -1459,58 +1880,37 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
if (blk->has_image)
{
- blk->bkp_image = ptr;
+ /* no need to align image */
+ blk->bkp_image = out;
+ memcpy(out, ptr, blk->bimg_len);
ptr += blk->bimg_len;
+ out += blk->bimg_len;
}
if (blk->has_data)
{
- if (!blk->data || blk->data_len > blk->data_bufsz)
- {
- if (blk->data)
- pfree(blk->data);
-
- /*
- * Force the initial request to be BLCKSZ so that we don't
- * waste time with lots of trips through this stanza as a
- * result of WAL compression.
- */
- blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
- blk->data = palloc(blk->data_bufsz);
- }
+ out = (char *) MAXALIGN(out);
+ blk->data = out;
memcpy(blk->data, ptr, blk->data_len);
ptr += blk->data_len;
+ out += blk->data_len;
}
}
/* and finally, the main data */
- if (state->main_data_len > 0)
+ if (decoded->main_data_len > 0)
{
- if (!state->main_data || state->main_data_len > state->main_data_bufsz)
- {
- if (state->main_data)
- pfree(state->main_data);
-
- /*
- * main_data_bufsz must be MAXALIGN'ed. In many xlog record
- * types, we omit trailing struct padding on-disk to save a few
- * bytes; but compilers may generate accesses to the xlog struct
- * that assume that padding bytes are present. If the palloc
- * request is not large enough to include such padding bytes then
- * we'll get valgrind complaints due to otherwise-harmless fetches
- * of the padding bytes.
- *
- * In addition, force the initial request to be reasonably large
- * so that we don't waste time with lots of trips through this
- * stanza. BLCKSZ / 2 seems like a good compromise choice.
- */
- state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
- BLCKSZ / 2));
- state->main_data = palloc(state->main_data_bufsz);
- }
- memcpy(state->main_data, ptr, state->main_data_len);
- ptr += state->main_data_len;
+ out = (char *) MAXALIGN(out);
+ decoded->main_data = out;
+ memcpy(decoded->main_data, ptr, decoded->main_data_len);
+ ptr += decoded->main_data_len;
+ out += decoded->main_data_len;
}
+ /* Report the actual size we used. */
+ decoded->size = MAXALIGN(out - (char *) decoded);
+ Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >=
+ decoded->size);
+
return true;
shortdata_err:
@@ -1536,10 +1936,11 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
{
DecodedBkpBlock *bkpb;
- if (!record->blocks[block_id].in_use)
+ if (block_id > record->record->max_block_id ||
+ !record->record->blocks[block_id].in_use)
return false;
- bkpb = &record->blocks[block_id];
+ bkpb = &record->record->blocks[block_id];
if (rnode)
*rnode = bkpb->rnode;
if (forknum)
@@ -1559,10 +1960,11 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
{
DecodedBkpBlock *bkpb;
- if (!record->blocks[block_id].in_use)
+ if (block_id > record->record->max_block_id ||
+ !record->record->blocks[block_id].in_use)
return NULL;
- bkpb = &record->blocks[block_id];
+ bkpb = &record->record->blocks[block_id];
if (!bkpb->has_data)
{
@@ -1590,12 +1992,13 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
char *ptr;
PGAlignedBlock tmp;
- if (!record->blocks[block_id].in_use)
+ if (block_id > record->record->max_block_id ||
+ !record->record->blocks[block_id].in_use)
return false;
- if (!record->blocks[block_id].has_image)
+ if (!record->record->blocks[block_id].has_image)
return false;
- bkpb = &record->blocks[block_id];
+ bkpb = &record->record->blocks[block_id];
ptr = bkpb->bkp_image;
if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))