diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 344 |
1 files changed, 199 insertions, 145 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 03feb145514..a144fbb1a2c 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -653,7 +653,9 @@ static void CleanupBackupHistory(void); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt); static void CheckRecoveryConsistency(void); -static bool ValidXLOGHeader(XLogPageHeader hdr, int emode); +static bool ValidXLogPageHeader(XLogPageHeader hdr, int emode); +static bool ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, + int emode, bool randAccess); static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt); static List *readTimeLineHistory(TimeLineID targetTLI); static bool existsTimeLineHistory(TimeLineID probeTLI); @@ -695,7 +697,6 @@ XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) { XLogCtlInsert *Insert = &XLogCtl->Insert; - XLogRecord *record; XLogRecPtr RecPtr; XLogRecPtr WriteRqst; uint32 freespace; @@ -709,6 +710,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS]; XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS]; XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS]; + XLogRecData hdr_rdt; pg_crc32 rdata_crc; uint32 len, write_len; @@ -717,6 +719,15 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) bool doPageWrites; bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH); uint8 info_orig = info; + static XLogRecord *rechdr; + + if (rechdr == NULL) + { + rechdr = malloc(SizeOfXLogRecord); + if (rechdr == NULL) + elog(ERROR, "out of memory"); + MemSet(rechdr, 0, SizeOfXLogRecord); + } /* cross-check on whether we should be here or not */ if (!XLogInsertAllowed()) @@ -903,6 +914,22 @@ begin:; for (rdt = rdata; rdt != NULL; rdt = rdt->next) COMP_CRC32(rdata_crc, rdt->data, rdt->len); + /* + * Construct record header (prev-link and CRC are filled in later), and + * make that the first chunk in the chain. + */ + rechdr->xl_xid = GetCurrentTransactionIdIfAny(); + rechdr->xl_tot_len = SizeOfXLogRecord + write_len; + rechdr->xl_len = len; /* doesn't include backup blocks */ + rechdr->xl_info = info; + rechdr->xl_rmid = rmid; + + hdr_rdt.next = rdata; + hdr_rdt.data = (char *) rechdr; + hdr_rdt.len = SizeOfXLogRecord; + + write_len += SizeOfXLogRecord; + START_CRIT_SECTION(); /* Now wait to get insert lock */ @@ -962,12 +989,12 @@ begin:; } /* - * If there isn't enough space on the current XLOG page for a record - * header, advance to the next page (leaving the unused space as zeroes). + * If the current page is completely full, the record goes to the next + * page, right after the page header. */ updrqst = false; freespace = INSERT_FREESPACE(Insert); - if (freespace < SizeOfXLogRecord) + if (freespace == 0) { updrqst = AdvanceXLInsertBuffer(false); freespace = INSERT_FREESPACE(Insert); @@ -1009,21 +1036,13 @@ begin:; return RecPtr; } - /* Insert record header */ - - record = (XLogRecord *) Insert->currpos; - record->xl_prev = Insert->PrevRecord; - record->xl_xid = GetCurrentTransactionIdIfAny(); - record->xl_tot_len = SizeOfXLogRecord + write_len; - record->xl_len = len; /* doesn't include backup blocks */ - record->xl_info = info; - record->xl_rmid = rmid; + /* Finish the record header */ + rechdr->xl_prev = Insert->PrevRecord; /* Now we can finish computing the record's CRC */ - COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32), - SizeOfXLogRecord - sizeof(pg_crc32)); + COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc)); FIN_CRC32(rdata_crc); - record->xl_crc = rdata_crc; + rechdr->xl_crc = rdata_crc; #ifdef WAL_DEBUG if (XLOG_DEBUG) @@ -1033,11 +1052,11 @@ begin:; initStringInfo(&buf); appendStringInfo(&buf, "INSERT @ %X/%X: ", RecPtr.xlogid, RecPtr.xrecoff); - xlog_outrec(&buf, record); + xlog_outrec(&buf, rechdr); if (rdata->data != NULL) { appendStringInfo(&buf, " - "); - RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data); + RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data); } elog(LOG, "%s", buf.data); pfree(buf.data); @@ -1048,12 +1067,10 @@ begin:; ProcLastRecPtr = RecPtr; Insert->PrevRecord = RecPtr; - Insert->currpos += SizeOfXLogRecord; - freespace -= SizeOfXLogRecord; - /* * Append the data, including backup blocks if any */ + rdata = &hdr_rdt; while (write_len) { while (rdata->data == NULL) @@ -1171,7 +1188,7 @@ begin:; /* normal case, ie not xlog switch */ /* Need to update shared LogwrtRqst if some block was filled up */ - if (freespace < SizeOfXLogRecord) + if (freespace == 0) { /* curridx is filled and available for writing out */ updrqst = true; @@ -2090,7 +2107,7 @@ XLogFlush(XLogRecPtr record) XLogCtlInsert *Insert = &XLogCtl->Insert; uint32 freespace = INSERT_FREESPACE(Insert); - if (freespace < SizeOfXLogRecord) /* buffer is full */ + if (freespace == 0) /* buffer is full */ WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx]; else { @@ -3705,8 +3722,7 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) } /* Finally include the record header */ - COMP_CRC32(crc, (char *) record + sizeof(pg_crc32), - SizeOfXLogRecord - sizeof(pg_crc32)); + COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc)); FIN_CRC32(crc); if (!EQ_CRC32(record->xl_crc, crc)) @@ -3736,13 +3752,13 @@ static XLogRecord * ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) { XLogRecord *record; - char *buffer; XLogRecPtr tmpRecPtr = EndRecPtr; bool randAccess = false; uint32 len, total_len; uint32 targetRecOff; uint32 pageHeaderSize; + bool gotheader; if (readBuf == NULL) { @@ -3762,17 +3778,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) RecPtr = &tmpRecPtr; /* - * RecPtr is pointing to end+1 of the previous WAL record. We must - * advance it if necessary to where the next record starts. First, - * align to next page if no more records can fit on the current page. - */ - if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord) - NextLogPage(*RecPtr); - - /* - * If at page start, we must skip over the page header. But we can't - * do that until we've read in the page, since the header size is - * variable. + * RecPtr is pointing to end+1 of the previous WAL record. If + * we're at a page boundary, no more records can fit on the current + * page. We must skip over the page header, but we can't do that + * until we've read in the page, since the header size is variable. */ } else @@ -3793,7 +3802,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) * to go backwards (but we can't reset that variable right here, since * we might not change files at all). */ - lastPageTLI = 0; /* see comment in ValidXLOGHeader */ + lastPageTLI = 0; /* see comment in ValidXLogPageHeader */ randAccess = true; /* allow curFileTLI to go backwards too */ } @@ -3833,76 +3842,15 @@ retry: RecPtr->xlogid, RecPtr->xrecoff))); goto next_record_is_invalid; } - record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ); /* - * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is - * required. + * NB: Even though we use an XLogRecord pointer here, the whole record + * header might not fit on this page. xl_tot_len is the first field in + * struct, so it must be on this page, but we cannot safely access any + * other fields yet. */ - if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH) - { - if (record->xl_len != 0) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid xlog switch record at %X/%X", - RecPtr->xlogid, RecPtr->xrecoff))); - goto next_record_is_invalid; - } - } - else if (record->xl_len == 0) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("record with zero length at %X/%X", - RecPtr->xlogid, RecPtr->xrecoff))); - goto next_record_is_invalid; - } - if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len || - record->xl_tot_len > SizeOfXLogRecord + record->xl_len + - XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ)) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid record length at %X/%X", - RecPtr->xlogid, RecPtr->xrecoff))); - goto next_record_is_invalid; - } - if (record->xl_rmid > RM_MAX_ID) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid resource manager ID %u at %X/%X", - record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff))); - goto next_record_is_invalid; - } - if (randAccess) - { - /* - * We can't exactly verify the prev-link, but surely it should be less - * than the record's own address. - */ - if (!XLByteLT(record->xl_prev, *RecPtr)) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("record with incorrect prev-link %X/%X at %X/%X", - record->xl_prev.xlogid, record->xl_prev.xrecoff, - RecPtr->xlogid, RecPtr->xrecoff))); - goto next_record_is_invalid; - } - } - else - { - /* - * Record's prev-link should exactly match our previous location. This - * check guards against torn WAL pages where a stale but valid-looking - * WAL record starts on a sector boundary. - */ - if (!XLByteEQ(record->xl_prev, ReadRecPtr)) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("record with incorrect prev-link %X/%X at %X/%X", - record->xl_prev.xlogid, record->xl_prev.xrecoff, - RecPtr->xlogid, RecPtr->xrecoff))); - goto next_record_is_invalid; - } - } + record = (XLogRecord *) (readBuf + RecPtr->xrecoff % XLOG_BLCKSZ); + total_len = record->xl_tot_len; /* * Allocate or enlarge readRecordBuf as needed. To avoid useless small @@ -3911,7 +3859,6 @@ retry: * enough for all "normal" records, but very large commit or abort records * might need more space.) */ - total_len = record->xl_tot_len; if (total_len > readRecordBufSize) { uint32 newSize = total_len; @@ -3933,7 +3880,19 @@ retry: readRecordBufSize = newSize; } - buffer = readRecordBuf; + /* + * If we got the whole header already, validate it immediately. Otherwise + * we validate it after reading the rest of the header from the next page. + */ + if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) + { + if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess)) + goto next_record_is_invalid; + gotheader = true; + } + else + gotheader = false; + len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ; if (total_len > len) { @@ -3941,16 +3900,19 @@ retry: char *contrecord; XLogPageHeader pageHeader; XLogRecPtr pagelsn; - uint32 gotlen = len; + char *buffer; + uint32 gotlen; /* Initialize pagelsn to the beginning of the page this record is on */ pagelsn = *RecPtr; pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ; - memcpy(buffer, record, len); - record = (XLogRecord *) buffer; - buffer += len; - for (;;) + /* Copy the first fragment of the record from the first page. */ + memcpy(readRecordBuf, readBuf + RecPtr->xrecoff % XLOG_BLCKSZ, len); + buffer = readRecordBuf + len; + gotlen = len; + + do { /* Calculate pointer to beginning of next page */ XLByteAdvance(pagelsn, XLOG_BLCKSZ); @@ -3958,8 +3920,9 @@ retry: if (!XLogPageRead(&pagelsn, emode, false, false)) return NULL; - /* Check that the continuation record looks valid */ - if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD)) + /* Check that the continuation on next page looks valid */ + pageHeader = (XLogPageHeader) readBuf; + if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) { ereport(emode_for_corrupt_record(emode, *RecPtr), (errmsg("there is no contrecord flag in log segment %s, offset %u", @@ -3967,14 +3930,13 @@ retry: readOff))); goto next_record_is_invalid; } - pageHeader = (XLogPageHeader) readBuf; - pageHeaderSize = XLogPageHeaderSize(pageHeader); - contrecord = (char *) readBuf + pageHeaderSize; + /* + * Cross-check that xlp_rem_len agrees with how much of the record + * we expect there to be left. + */ if (pageHeader->xlp_rem_len == 0 || total_len != (pageHeader->xlp_rem_len + gotlen)) { - char fname[MAXFNAMELEN]; - XLogFileName(fname, curFileTLI, readSegNo); ereport(emode_for_corrupt_record(emode, *RecPtr), (errmsg("invalid contrecord length %u in log segment %s, offset %u", pageHeader->xlp_rem_len, @@ -3982,17 +3944,28 @@ retry: readOff))); goto next_record_is_invalid; } + + /* Append the continuation from this page to the buffer */ + pageHeaderSize = XLogPageHeaderSize(pageHeader); + contrecord = (char *) readBuf + pageHeaderSize; len = XLOG_BLCKSZ - pageHeaderSize; - if (pageHeader->xlp_rem_len > len) + if (pageHeader->xlp_rem_len < len) + len = pageHeader->xlp_rem_len; + memcpy(buffer, (char *) contrecord, len); + buffer += len; + gotlen += len; + + /* If we just reassembled the record header, validate it. */ + if (!gotheader) { - memcpy(buffer, (char *) contrecord, len); - gotlen += len; - buffer += len; - continue; + record = (XLogRecord *) readRecordBuf; + if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess)) + goto next_record_is_invalid; + gotheader = true; } - memcpy(buffer, (char *) contrecord, pageHeader->xlp_rem_len); - break; - } + } while (pageHeader->xlp_rem_len > len); + + record = (XLogRecord *) readRecordBuf; if (!RecordIsValid(record, *RecPtr, emode)) goto next_record_is_invalid; pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); @@ -4001,18 +3974,18 @@ retry: readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len), EndRecPtr); ReadRecPtr = *RecPtr; - /* needn't worry about XLOG SWITCH, it can't cross page boundaries */ - return record; } + else + { + /* Record does not cross a page boundary */ + if (!RecordIsValid(record, *RecPtr, emode)) + goto next_record_is_invalid; + EndRecPtr.xlogid = RecPtr->xlogid; + EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len); - /* Record does not cross a page boundary */ - if (!RecordIsValid(record, *RecPtr, emode)) - goto next_record_is_invalid; - EndRecPtr.xlogid = RecPtr->xlogid; - EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len); - - ReadRecPtr = *RecPtr; - memcpy(buffer, record, total_len); + ReadRecPtr = *RecPtr; + memcpy(readRecordBuf, record, total_len); + } /* * Special processing if it's an XLOG SWITCH record @@ -4030,7 +4003,7 @@ retry: */ readOff = XLogSegSize - XLOG_BLCKSZ; } - return (XLogRecord *) buffer; + return record; next_record_is_invalid: failedSources |= readSource; @@ -4055,7 +4028,7 @@ next_record_is_invalid: * ReadRecord. It's not intended for use from anywhere else. */ static bool -ValidXLOGHeader(XLogPageHeader hdr, int emode) +ValidXLogPageHeader(XLogPageHeader hdr, int emode) { XLogRecPtr recaddr; @@ -4174,6 +4147,88 @@ ValidXLOGHeader(XLogPageHeader hdr, int emode) } /* + * Validate an XLOG record header. + * + * This is just a convenience subroutine to avoid duplicated code in + * ReadRecord. It's not intended for use from anywhere else. + */ +static bool +ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode, + bool randAccess) +{ + /* + * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is + * required. + */ + if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH) + { + if (record->xl_len != 0) + { + ereport(emode_for_corrupt_record(emode, *RecPtr), + (errmsg("invalid xlog switch record at %X/%X", + RecPtr->xlogid, RecPtr->xrecoff))); + return false; + } + } + else if (record->xl_len == 0) + { + ereport(emode_for_corrupt_record(emode, *RecPtr), + (errmsg("record with zero length at %X/%X", + RecPtr->xlogid, RecPtr->xrecoff))); + return false; + } + if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len || + record->xl_tot_len > SizeOfXLogRecord + record->xl_len + + XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ)) + { + ereport(emode_for_corrupt_record(emode, *RecPtr), + (errmsg("invalid record length at %X/%X", + RecPtr->xlogid, RecPtr->xrecoff))); + return false; + } + if (record->xl_rmid > RM_MAX_ID) + { + ereport(emode_for_corrupt_record(emode, *RecPtr), + (errmsg("invalid resource manager ID %u at %X/%X", + record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff))); + return false; + } + if (randAccess) + { + /* + * We can't exactly verify the prev-link, but surely it should be less + * than the record's own address. + */ + if (!XLByteLT(record->xl_prev, *RecPtr)) + { + ereport(emode_for_corrupt_record(emode, *RecPtr), + (errmsg("record with incorrect prev-link %X/%X at %X/%X", + record->xl_prev.xlogid, record->xl_prev.xrecoff, + RecPtr->xlogid, RecPtr->xrecoff))); + return false; + } + } + else + { + /* + * Record's prev-link should exactly match our previous location. This + * check guards against torn WAL pages where a stale but valid-looking + * WAL record starts on a sector boundary. + */ + if (!XLByteEQ(record->xl_prev, ReadRecPtr)) + { + ereport(emode_for_corrupt_record(emode, *RecPtr), + (errmsg("record with incorrect prev-link %X/%X at %X/%X", + record->xl_prev.xlogid, record->xl_prev.xrecoff, + RecPtr->xlogid, RecPtr->xrecoff))); + return false; + } + } + + return true; +} + +/* * Try to read a timeline's history file. * * If successful, return the list of component TLIs (the given TLI followed by @@ -5182,8 +5237,7 @@ BootStrapXLOG(void) INIT_CRC32(crc); COMP_CRC32(crc, &checkPoint, sizeof(checkPoint)); - COMP_CRC32(crc, (char *) record + sizeof(pg_crc32), - SizeOfXLogRecord - sizeof(pg_crc32)); + COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc)); FIN_CRC32(crc); record->xl_crc = crc; @@ -7720,7 +7774,7 @@ CreateCheckPoint(int flags) * checkpoint, even though physically before it. Got that? */ freespace = INSERT_FREESPACE(Insert); - if (freespace < SizeOfXLogRecord) + if (freespace == 0) { (void) AdvanceXLInsertBuffer(false); /* OK to ignore update return flag, since we will do flush anyway */ @@ -10285,7 +10339,7 @@ retry: fname, readOff))); goto next_record_is_invalid; } - if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) + if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode)) goto next_record_is_invalid; } @@ -10311,7 +10365,7 @@ retry: fname, readOff))); goto next_record_is_invalid; } - if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) + if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode)) goto next_record_is_invalid; Assert(targetSegNo == readSegNo); |