diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 718 |
1 files changed, 395 insertions, 323 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 2a7d60d3aac..09605cf9476 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -14,12 +14,14 @@ void UpdateControlFile(void); int XLOGShmemSize(void); +void XLOGShmemInit(void); void BootStrapXLOG(void); void StartupXLOG(void); +void ShutdownXLOG(void); void CreateCheckPoint(bool shutdown); -char *XLogDir = NULL; -char *ControlFilePath = NULL; +char XLogDir[MAXPGPATH+1]; +char ControlFilePath[MAXPGPATH+1]; uint32 XLOGbuffers = 0; XLogRecPtr MyLastRecPtr = {0, 0}; bool StopIfError = false; @@ -81,7 +83,8 @@ static XLogCtlData *XLogCtl = NULL; typedef enum DBState { - DB_SHUTDOWNED = 1, + DB_STARTUP = 0, + DB_SHUTDOWNED, DB_SHUTDOWNING, DB_IN_RECOVERY, DB_IN_PRODUCTION @@ -114,9 +117,9 @@ typedef struct CheckPoint } CheckPoint; /* - * We break each log file in 64Mb segments + * We break each log file in 16Mb segments */ -#define XLogSegSize (64*1024*1024) +#define XLogSegSize (16*1024*1024) #define XLogLastSeg (0xffffffff / XLogSegSize) #define XLogFileSize (XLogLastSeg * XLogSegSize) @@ -166,6 +169,7 @@ static void XLogWrite(char *buffer); static int XLogFileInit(uint32 log, uint32 seg); static int XLogFileOpen(uint32 log, uint32 seg, bool econt); static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, char *buffer); +static char *str_time(time_t tnow); static XLgwrResult LgwrResult = {{0, 0}, {0, 0}}; static XLgwrRqst LgwrRqst = {{0, 0}, {0, 0}}; @@ -173,14 +177,14 @@ static XLgwrRqst LgwrRqst = {{0, 0}, {0, 0}}; static int logFile = -1; static uint32 logId = 0; static uint32 logSeg = 0; -static off_t logOff = 0; +static uint32 logOff = 0; static XLogRecPtr ReadRecPtr; static XLogRecPtr EndRecPtr; static int readFile = -1; static uint32 readId = 0; static uint32 readSeg = 0; -static off_t readOff = (off_t) -1; +static uint32 readOff = 0; static char readBuf[BLCKSZ]; static XLogRecord *nextRecord = NULL; @@ -262,7 +266,13 @@ XLogInsert(RmgrId rmid, char *hdr, uint32 hdrlen, char *buf, uint32 buflen) freespace -= SizeOfXLogRecord; record = (XLogRecord*) Insert->currpos; record->xl_prev = Insert->PrevRecord; - record->xl_xact_prev = MyLastRecPtr; + if (rmid != RM_XLOG_ID) + record->xl_xact_prev = MyLastRecPtr; + else + { + record->xl_xact_prev.xlogid = 0; + record->xl_xact_prev.xrecoff = 0; + } record->xl_xid = GetCurrentTransactionId(); record->xl_len = (len > freespace) ? freespace : len; record->xl_info = (len > freespace) ? XLR_TO_BE_CONTINUED : 0; @@ -271,7 +281,7 @@ XLogInsert(RmgrId rmid, char *hdr, uint32 hdrlen, char *buf, uint32 buflen) RecPtr.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + Insert->currpos - ((char*) Insert->currpage); - if (MyLastRecPtr.xrecoff == 0) + if (MyLastRecPtr.xrecoff == 0 && rmid != RM_XLOG_ID) { SpinAcquire(SInvalLock); MyProc->logRec = RecPtr; @@ -489,7 +499,7 @@ XLogFlush(XLogRecPtr record) { logId = LgwrResult.Write.xlogid; logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize; - logOff = (off_t) 0; + logOff = 0; logFile = XLogFileOpen(logId, logSeg, false); } @@ -612,7 +622,7 @@ XLogWrite(char *buffer) } logId = LgwrResult.Write.xlogid; logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize; - logOff = (off_t) 0; + logOff = 0; logFile = XLogFileInit(logId, logSeg); SpinAcquire(ControlFileLockId); ControlFile->logId = logId; @@ -626,14 +636,14 @@ XLogWrite(char *buffer) { logId = LgwrResult.Write.xlogid; logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize; - logOff = (off_t) 0; + logOff = 0; logFile = XLogFileOpen(logId, logSeg, false); } if (logOff != (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize) { logOff = (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize; - if (lseek(logFile, logOff, SEEK_SET) < 0) + if (lseek(logFile, (off_t)logOff, SEEK_SET) < 0) elog(STOP, "Lseek(logfile %u seg %u off %u) failed: %d", logId, logSeg, logOff, errno); } @@ -717,6 +727,10 @@ tryAgain: elog(STOP, "Fsync(logfile %u seg %u) failed: %d", logId, logSeg, errno); + if (lseek(fd, 0, SEEK_SET) < 0) + elog(STOP, "Lseek(logfile %u seg %u off %u) failed: %d", + log, seg, 0, errno); + return(fd); } @@ -753,6 +767,292 @@ tryAgain: return(fd); } +static XLogRecord* +ReadRecord(XLogRecPtr *RecPtr, char *buffer) +{ + XLogRecord *record; + XLogRecPtr tmpRecPtr = EndRecPtr; + bool nextmode = (RecPtr == NULL); + int emode = (nextmode) ? LOG : STOP; + bool noBlck = false; + + if (nextmode) + { + RecPtr = &tmpRecPtr; + if (nextRecord != NULL) + { + record = nextRecord; + goto got_record; + } + if (tmpRecPtr.xrecoff % BLCKSZ != 0) + tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ); + if (tmpRecPtr.xrecoff >= XLogFileSize) + { + (tmpRecPtr.xlogid)++; + tmpRecPtr.xrecoff = 0; + } + tmpRecPtr.xrecoff += SizeOfXLogPHD; + } + else if (!XRecOffIsValid(RecPtr->xrecoff)) + elog(STOP, "ReadRecord: invalid record offset in (%u, %u)", + RecPtr->xlogid, RecPtr->xrecoff); + + if (readFile >= 0 && (RecPtr->xlogid != readId || + RecPtr->xrecoff / XLogSegSize != readSeg)) + { + close(readFile); + readFile = -1; + } + readId = RecPtr->xlogid; + readSeg = RecPtr->xrecoff / XLogSegSize; + if (readFile < 0) + { + noBlck = true; + readFile = XLogFileOpen(readId, readSeg, nextmode); + if (readFile < 0) + goto next_record_is_invalid; + } + + if (noBlck || readOff != (RecPtr->xrecoff % XLogSegSize) / BLCKSZ) + { + readOff = (RecPtr->xrecoff % XLogSegSize) / BLCKSZ; + if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0) + elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + if (read(readFile, readBuf, BLCKSZ) != BLCKSZ) + elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC) + { + elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u", + ((XLogPageHeader)readBuf)->xlp_magic, + readId, readSeg, readOff); + goto next_record_is_invalid; + } + } + if ((((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD) && + RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD) + { + elog(emode, "ReadRecord: subrecord is requested by (%u, %u)", + RecPtr->xlogid, RecPtr->xrecoff); + goto next_record_is_invalid; + } + record = (XLogRecord*)((char*) readBuf + RecPtr->xrecoff % BLCKSZ); + +got_record:; + if (record->xl_len == 0 || record->xl_len > + (BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord)) + { + elog(emode, "ReadRecord: invalid record len %u in (%u, %u)", + record->xl_len, RecPtr->xlogid, RecPtr->xrecoff); + goto next_record_is_invalid; + } + if (record->xl_rmid > RM_MAX_ID) + { + elog(emode, "ReadRecord: invalid resource managed id %u in (%u, %u)", + record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff); + goto next_record_is_invalid; + } + nextRecord = NULL; + if (record->xl_info & XLR_TO_BE_CONTINUED) + { + XLogSubRecord *subrecord; + uint32 len = record->xl_len; + + if (record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord != BLCKSZ) + { + elog(emode, "ReadRecord: invalid fragmented record len %u in (%u, %u)", + record->xl_len, RecPtr->xlogid, RecPtr->xrecoff); + goto next_record_is_invalid; + } + memcpy(buffer, record, record->xl_len + SizeOfXLogRecord); + record = (XLogRecord*) buffer; + buffer += record->xl_len + SizeOfXLogRecord; + for ( ; ; ) + { + readOff++; + if (readOff == XLogSegSize / BLCKSZ) + { + readSeg++; + if (readSeg == XLogLastSeg) + { + readSeg = 0; + readId++; + } + close(readFile); + readOff = 0; + readFile = XLogFileOpen(readId, readSeg, nextmode); + if (readFile < 0) + goto next_record_is_invalid; + } + if (read(readFile, readBuf, BLCKSZ) != BLCKSZ) + elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC) + { + elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u", + ((XLogPageHeader)readBuf)->xlp_magic, + readId, readSeg, readOff); + goto next_record_is_invalid; + } + if (!(((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD)) + { + elog(emode, "ReadRecord: there is no subrecord flag in logfile %u seg %u off %u", + readId, readSeg, readOff); + goto next_record_is_invalid; + } + subrecord = (XLogSubRecord*)((char*) readBuf + SizeOfXLogPHD); + if (subrecord->xl_len == 0 || subrecord->xl_len > + (BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord)) + { + elog(emode, "ReadRecord: invalid subrecord len %u in logfile %u seg %u off %u", + subrecord->xl_len, readId, readSeg, readOff); + goto next_record_is_invalid; + } + len += subrecord->xl_len; + if (len > MAXLOGRECSZ) + { + elog(emode, "ReadRecord: too long record len %u in (%u, %u)", + len, RecPtr->xlogid, RecPtr->xrecoff); + goto next_record_is_invalid; + } + memcpy(buffer, (char*)subrecord + SizeOfXLogSubRecord, subrecord->xl_len); + buffer += subrecord->xl_len; + if (subrecord->xl_info & XLR_TO_BE_CONTINUED) + { + if (subrecord->xl_len + + SizeOfXLogPHD + SizeOfXLogSubRecord != BLCKSZ) + { + elog(emode, "ReadRecord: invalid fragmented subrecord len %u in logfile %u seg %u off %u", + subrecord->xl_len, readId, readSeg, readOff); + goto next_record_is_invalid; + } + continue; + } + break; + } + if (BLCKSZ - SizeOfXLogRecord >= + subrecord->xl_len + SizeOfXLogPHD + SizeOfXLogSubRecord) + { + nextRecord = (XLogRecord*) + ((char*)subrecord + subrecord->xl_len + SizeOfXLogSubRecord); + } + EndRecPtr.xlogid = readId; + EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ + + SizeOfXLogPHD + SizeOfXLogSubRecord + subrecord->xl_len; + ReadRecPtr = *RecPtr; + return(record); + } + if (BLCKSZ - SizeOfXLogRecord >= + record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord) + { + nextRecord = (XLogRecord*)((char*)record + record->xl_len + SizeOfXLogRecord); + } + EndRecPtr.xlogid = RecPtr->xlogid; + EndRecPtr.xrecoff = RecPtr->xrecoff + record->xl_len + SizeOfXLogRecord; + ReadRecPtr = *RecPtr; + + return(record); + +next_record_is_invalid:; + close(readFile); + readFile = -1; + nextRecord = NULL; + memset(buffer, 0, SizeOfXLogRecord); + record = (XLogRecord*) buffer; + /* + * If we assumed that next record began on the same page where + * previous one ended - zero end of page. + */ + if (XLByteEQ(tmpRecPtr, EndRecPtr)) + { + Assert (EndRecPtr.xrecoff % BLCKSZ > (SizeOfXLogPHD + SizeOfXLogSubRecord) && + BLCKSZ - EndRecPtr.xrecoff % BLCKSZ >= SizeOfXLogRecord); + readId = EndRecPtr.xlogid; + readSeg = EndRecPtr.xrecoff / XLogSegSize; + readOff = (EndRecPtr.xrecoff % XLogSegSize) / BLCKSZ; + elog(LOG, "Formating logfile %u seg %u block %u at offset %u", + readId, readSeg, readOff, EndRecPtr.xrecoff % BLCKSZ); + readFile = XLogFileOpen(readId, readSeg, false); + if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0) + elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + if (read(readFile, readBuf, BLCKSZ) != BLCKSZ) + elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + memset(readBuf + EndRecPtr.xrecoff % BLCKSZ, 0, + BLCKSZ - EndRecPtr.xrecoff % BLCKSZ); + if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0) + elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + if (write(readFile, readBuf, BLCKSZ) != BLCKSZ) + elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + readOff++; + } + else + { + Assert (EndRecPtr.xrecoff % BLCKSZ == 0 || + BLCKSZ - EndRecPtr.xrecoff % BLCKSZ < SizeOfXLogRecord); + readId = tmpRecPtr.xlogid; + readSeg = tmpRecPtr.xrecoff / XLogSegSize; + readOff = (tmpRecPtr.xrecoff % XLogSegSize) / BLCKSZ; + Assert(readOff > 0); + } + if (readOff > 0) + { + if (!XLByteEQ(tmpRecPtr, EndRecPtr)) + elog(LOG, "Formating logfile %u seg %u block %u at offset 0", + readId, readSeg, readOff); + readOff *= BLCKSZ; + memset(readBuf, 0, BLCKSZ); + readFile = XLogFileOpen(readId, readSeg, false); + if (lseek(readFile, (off_t)readOff, SEEK_SET) < 0) + elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + while (readOff < XLogSegSize) + { + if (write(readFile, readBuf, BLCKSZ) != BLCKSZ) + elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + readOff += BLCKSZ; + } + } + if (readFile >= 0) + { + if (fsync(readFile) < 0) + elog(STOP, "ReadRecord: fsync(logfile %u seg %u) failed: %d", + readId, readSeg, errno); + close(readFile); + readFile = -1; + } + + readId = EndRecPtr.xlogid; + readSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize + 1; + elog(LOG, "The last logId/logSeg is (%u, %u)", readId, readSeg - 1); + if (ControlFile->logId != readId || ControlFile->logSeg != readSeg) + { + elog(LOG, "Set logId/logSeg in control file"); + ControlFile->logId = readId; + ControlFile->logSeg = readSeg; + ControlFile->time = time(NULL); + UpdateControlFile(); + } + if (readSeg == XLogLastSeg) + { + readSeg = 0; + readId++; + } + { + char path[MAXPGPATH+1]; + + XLogFileName(path, readId, readSeg); + unlink(path); + } + + return(record); +} + void UpdateControlFile() { @@ -792,6 +1092,23 @@ XLOGShmemSize() sizeof(XLogRecPtr) * XLOGbuffers + BLCKSZ); } +void +XLOGShmemInit(void) +{ + bool found; + + if (XLOGbuffers < MinXLOGbuffers) + XLOGbuffers = MinXLOGbuffers; + + ControlFile = (ControlFileData*) + ShmemInitStruct("Control File", BLCKSZ, &found); + Assert(!found); + XLogCtl = (XLogCtlData*) + ShmemInitStruct("XLOG Ctl", sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers + + sizeof(XLogRecPtr) * XLOGbuffers, &found); + Assert(!found); +} + /* * This func must be called ONCE on system install */ @@ -806,7 +1123,8 @@ BootStrapXLOG() fd = open(ControlFilePath, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR); if (fd < 0) - elog(STOP, "BootStrapXLOG failed to create control file: %d", errno); + elog(STOP, "BootStrapXLOG failed to create control file (%s): %d", + ControlFilePath, errno); logFile = XLogFileInit(0, 0); @@ -857,35 +1175,35 @@ BootStrapXLOG() } +static char* +str_time(time_t tnow) +{ + char *result = ctime(&tnow); + char *p = strchr(result, '\n'); + + if (p != NULL) + *p = 0; + + return(result); +} + /* * This func must be called ONCE on system startup */ void StartupXLOG() { - XLogCtlInsert *Insert = &XLogCtl->Insert; + XLogCtlInsert *Insert; CheckPoint checkPoint; XLogRecPtr RecPtr, LastRec; XLogRecord *record; char buffer[MAXLOGRECSZ+SizeOfXLogRecord]; int fd; - bool found; - bool recovery = false; + int recovery = 0; bool sie_saved = false; - elog(LOG, "Starting up XLOG manager..."); - - if (XLOGbuffers < MinXLOGbuffers) - XLOGbuffers = MinXLOGbuffers; - - ControlFile = (ControlFileData*) - ShmemInitStruct("Control File", BLCKSZ, &found); - Assert(!found); - XLogCtl = (XLogCtlData*) - ShmemInitStruct("XLOG Ctl", sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers + - sizeof(XLogRecPtr) * XLOGbuffers, &found); - Assert(!found); + elog(LOG, "Data Base System is starting up at %s", str_time(time(NULL))); XLogCtl->xlblocks = (XLogRecPtr*) (((char *)XLogCtl) + sizeof(XLogCtlData)); XLogCtl->pages = ((char *)XLogCtl->xlblocks + sizeof(XLogRecPtr) * XLOGbuffers); @@ -899,6 +1217,9 @@ StartupXLOG() XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages); XLogCtl->Write.LgwrResult = LgwrResult; XLogCtl->Write.curridx = 0; + S_INIT_LOCK(&(XLogCtl->insert_lck)); + S_INIT_LOCK(&(XLogCtl->info_lck)); + S_INIT_LOCK(&(XLogCtl->lgwr_lck)); /* * Open/read Control file @@ -925,26 +1246,25 @@ tryAgain: ControlFile->time <= 0 || ControlFile->state < DB_SHUTDOWNED || ControlFile->state > DB_IN_PRODUCTION || - ControlFile->checkPoint.xlogid == 0 || - ControlFile->checkPoint.xrecoff == 0) + !XRecOffIsValid(ControlFile->checkPoint.xrecoff)) elog(STOP, "Control file context is broken"); if (ControlFile->state == DB_SHUTDOWNED) - elog(LOG, "Data Base System was properly shutdowned at %s", - ctime(&(ControlFile->time))); + elog(LOG, "Data Base System was shutdowned at %s", + str_time(ControlFile->time)); else if (ControlFile->state == DB_SHUTDOWNING) - elog(LOG, "Data Base System was interrupted while shutting down at %s", - ctime(&(ControlFile->time))); + elog(LOG, "Data Base System was interrupted when shutting down at %s", + str_time(ControlFile->time)); else if (ControlFile->state == DB_IN_RECOVERY) { elog(LOG, "Data Base System was interrupted being in recovery at %s\n" - "This propably means that some data blocks are corrupted\n" - "And you will have to use last backup for recovery", - ctime(&(ControlFile->time))); + "\tThis propably means that some data blocks are corrupted\n" + "\tAnd you will have to use last backup for recovery", + str_time(ControlFile->time)); } else if (ControlFile->state == DB_IN_PRODUCTION) elog(LOG, "Data Base System was interrupted being in production at %s", - ctime(&(ControlFile->time))); + str_time(ControlFile->time)); LastRec = RecPtr = ControlFile->checkPoint; if (!XRecOffIsValid(RecPtr.xrecoff)) @@ -961,14 +1281,20 @@ tryAgain: elog(LOG, "Redo record at (%u, %u); Undo record at (%u, %u)", checkPoint.redo.xlogid, checkPoint.redo.xrecoff, checkPoint.undo.xlogid, checkPoint.undo.xrecoff); - elog(LOG, "NextTransactionId: %u; NextOid: %u)", + elog(LOG, "NextTransactionId: %u; NextOid: %u", checkPoint.nextXid, checkPoint.nextOid); if (checkPoint.nextXid < FirstTransactionId || checkPoint.nextOid < BootstrapObjectIdData) +#ifdef XLOG + elog(STOP, "Invalid NextTransactionId/NextOid"); +#else elog(LOG, "Invalid NextTransactionId/NextOid"); +#endif +#ifdef XLOG ShmemVariableCache->nextXid = checkPoint.nextXid; ShmemVariableCache->nextOid = checkPoint.nextOid; +#endif if (XLByteLT(RecPtr, checkPoint.redo)) elog(STOP, "Invalid redo in checkPoint record"); @@ -981,15 +1307,15 @@ tryAgain: { if (ControlFile->state == DB_SHUTDOWNED) elog(STOP, "Invalid Redo/Undo record in Shutdowned state"); - recovery = true; + recovery = 2; } else if (ControlFile->state != DB_SHUTDOWNED) - recovery = true; + recovery = 2; - if (recovery) + if (recovery > 0) { elog(LOG, "The DataBase system was not properly shutdowned\n" - "Automatic recovery is in progress..."); + "\tAutomatic recovery is in progress..."); ControlFile->state = DB_IN_RECOVERY; ControlFile->time = time(NULL); UpdateControlFile(); @@ -1010,8 +1336,10 @@ tryAgain: ReadRecPtr.xlogid, ReadRecPtr.xrecoff); do { +#ifdef XLOG if (record->xl_xid >= ShmemVariableCache->nextXid) ShmemVariableCache->nextXid = record->xl_xid + 1; +#endif RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record); record = ReadRecord(NULL, buffer); } while (record->xl_len != 0); @@ -1020,7 +1348,11 @@ tryAgain: LastRec = ReadRecPtr; } else + { elog(LOG, "Redo is not required"); + recovery--; + } + /* UNDO */ RecPtr = ReadRecPtr; if (XLByteLT(checkPoint.undo, RecPtr)) @@ -1039,7 +1371,10 @@ tryAgain: ReadRecPtr.xlogid, ReadRecPtr.xrecoff); } else + { elog(LOG, "Undo is not required"); + recovery--; + } } /* Init xlog buffer cache */ @@ -1051,10 +1386,13 @@ tryAgain: XLogCtl->xlblocks[0].xlogid = logId; XLogCtl->xlblocks[0].xrecoff = ((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ; + Insert = &XLogCtl->Insert; + memcpy((char*)(Insert->currpage), readBuf, BLCKSZ); Insert->currpos = ((char*) Insert->currpage) + (EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff); + Insert->PrevRecord = ControlFile->checkPoint; - if (recovery) + if (recovery > 0) { int i; @@ -1071,290 +1409,23 @@ tryAgain: ControlFile->time = time(NULL); UpdateControlFile(); + elog(LOG, "Data Base System is in production state at %s", str_time(time(NULL))); + return; } -static XLogRecord* -ReadRecord(XLogRecPtr *RecPtr, char *buffer) +/* + * This func must be called ONCE on system shutdown + */ +void +ShutdownXLOG() { - XLogRecord *record; - XLogRecPtr tmpRecPtr = EndRecPtr; - bool nextmode = (RecPtr == NULL); - int emode = (nextmode) ? LOG : STOP; - - if (nextmode) - { - RecPtr = &tmpRecPtr; - if (nextRecord != NULL) - { - record = nextRecord; - goto got_record; - } - if (tmpRecPtr.xrecoff % BLCKSZ != 0) - tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ); - if (tmpRecPtr.xrecoff >= XLogFileSize) - { - (tmpRecPtr.xlogid)++; - tmpRecPtr.xrecoff = 0; - } - tmpRecPtr.xrecoff += SizeOfXLogPHD; - } - else if (!XRecOffIsValid(RecPtr->xrecoff)) - elog(STOP, "ReadRecord: invalid record offset in (%u, %u)", - RecPtr->xlogid, RecPtr->xrecoff); - - if (readFile >= 0 && (RecPtr->xlogid != readId || - RecPtr->xrecoff / XLogSegSize != readSeg)) - { - close(readFile); - readFile = -1; - } - readId = RecPtr->xlogid; - readSeg = RecPtr->xrecoff / XLogSegSize; - if (readFile < 0) - { - readOff = (off_t) -1; - readFile = XLogFileOpen(readId, readSeg, nextmode); - if (readFile < 0) - goto next_record_is_invalid; - } - - if (readOff < 0 || readOff != (RecPtr->xrecoff % XLogSegSize) / BLCKSZ) - { - readOff = (RecPtr->xrecoff % XLogSegSize) / BLCKSZ; - if (lseek(readFile, readOff * BLCKSZ, SEEK_SET) < 0) - elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", - readId, readSeg, readOff, errno); - if (read(readFile, readBuf, BLCKSZ) != BLCKSZ) - elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", - readId, readSeg, readOff, errno); - if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC) - { - elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u", - ((XLogPageHeader)readBuf)->xlp_magic, - readId, readSeg, readOff); - goto next_record_is_invalid; - } - } - if ((((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD) && - RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD) - { - elog(emode, "ReadRecord: subrecord is requested by (%u, %u)", - RecPtr->xlogid, RecPtr->xrecoff); - goto next_record_is_invalid; - } - record = (XLogRecord*)((char*) readBuf + RecPtr->xrecoff % BLCKSZ); - -got_record:; - if (record->xl_len == 0 || record->xl_len > - (BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord)) - { - elog(emode, "ReadRecord: invalid record len %u in (%u, %u)", - record->xl_len, RecPtr->xlogid, RecPtr->xrecoff); - goto next_record_is_invalid; - } - if (record->xl_rmid > RM_MAX_ID) - { - elog(emode, "ReadRecord: invalid resource managed id %u in (%u, %u)", - record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff); - goto next_record_is_invalid; - } - nextRecord = NULL; - if (record->xl_info & XLR_TO_BE_CONTINUED) - { - XLogSubRecord *subrecord; - uint32 len = record->xl_len; - - if (record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord != BLCKSZ) - { - elog(emode, "ReadRecord: invalid fragmented record len %u in (%u, %u)", - record->xl_len, RecPtr->xlogid, RecPtr->xrecoff); - goto next_record_is_invalid; - } - memcpy(buffer, record, record->xl_len + SizeOfXLogRecord); - record = (XLogRecord*) buffer; - buffer += record->xl_len + SizeOfXLogRecord; - for ( ; ; ) - { - readOff++; - if (readOff == XLogSegSize / BLCKSZ) - { - readSeg++; - if (readSeg == XLogLastSeg) - { - readSeg = 0; - readId++; - } - close(readFile); - readOff = (off_t) 0; - readFile = XLogFileOpen(readId, readSeg, nextmode); - if (readFile < 0) - goto next_record_is_invalid; - } - if (read(readFile, readBuf, BLCKSZ) != BLCKSZ) - elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", - readId, readSeg, readOff, errno); - if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC) - { - elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u", - ((XLogPageHeader)readBuf)->xlp_magic, - readId, readSeg, readOff); - goto next_record_is_invalid; - } - if (!(((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD)) - { - elog(emode, "ReadRecord: there is no subrecord flag in logfile %u seg %u off %u", - readId, readSeg, readOff); - goto next_record_is_invalid; - } - subrecord = (XLogSubRecord*)((char*) readBuf + SizeOfXLogPHD); - if (subrecord->xl_len == 0 || subrecord->xl_len > - (BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord)) - { - elog(emode, "ReadRecord: invalid subrecord len %u in logfile %u seg %u off %u", - subrecord->xl_len, readId, readSeg, readOff); - goto next_record_is_invalid; - } - len += subrecord->xl_len; - if (len > MAXLOGRECSZ) - { - elog(emode, "ReadRecord: too long record len %u in (%u, %u)", - len, RecPtr->xlogid, RecPtr->xrecoff); - goto next_record_is_invalid; - } - memcpy(buffer, (char*)subrecord + SizeOfXLogSubRecord, subrecord->xl_len); - buffer += subrecord->xl_len; - if (subrecord->xl_info & XLR_TO_BE_CONTINUED) - { - if (subrecord->xl_len + - SizeOfXLogPHD + SizeOfXLogSubRecord != BLCKSZ) - { - elog(emode, "ReadRecord: invalid fragmented subrecord len %u in logfile %u seg %u off %u", - subrecord->xl_len, readId, readSeg, readOff); - goto next_record_is_invalid; - } - continue; - } - break; - } - if (BLCKSZ - SizeOfXLogRecord >= - subrecord->xl_len + SizeOfXLogPHD + SizeOfXLogSubRecord) - { - nextRecord = (XLogRecord*) - ((char*)subrecord + subrecord->xl_len + SizeOfXLogSubRecord); - } - EndRecPtr.xlogid = readId; - EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ + - SizeOfXLogPHD + SizeOfXLogSubRecord + subrecord->xl_len; - ReadRecPtr = *RecPtr; - return(record); - } - if (BLCKSZ - SizeOfXLogRecord >= - record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord) - { - nextRecord = (XLogRecord*)((char*)record + record->xl_len + SizeOfXLogRecord); - } - EndRecPtr.xlogid = RecPtr->xlogid; - EndRecPtr.xrecoff = RecPtr->xrecoff + record->xl_len + SizeOfXLogRecord; - ReadRecPtr = *RecPtr; - - return(record); - -next_record_is_invalid:; - close(readFile); - readFile = -1; - nextRecord = NULL; - memset(buffer, 0, SizeOfXLogRecord); - record = (XLogRecord*) buffer; - /* - * If we assumed that next record began on the same page where - * previous one ended - zero end of page. - */ - if (XLByteEQ(tmpRecPtr, EndRecPtr)) - { - Assert (EndRecPtr.xrecoff % BLCKSZ > (SizeOfXLogPHD + SizeOfXLogSubRecord) && - BLCKSZ - EndRecPtr.xrecoff % BLCKSZ >= SizeOfXLogRecord); - readId = EndRecPtr.xlogid; - readSeg = EndRecPtr.xrecoff / XLogSegSize; - readOff = (EndRecPtr.xrecoff % XLogSegSize) / BLCKSZ; - elog(LOG, "Formating logfile %u seg %u block %u at offset %u", - readId, readSeg, readOff, EndRecPtr.xrecoff % BLCKSZ); - readFile = XLogFileOpen(readId, readSeg, false); - if (lseek(readFile, readOff * BLCKSZ, SEEK_SET) < 0) - elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", - readId, readSeg, readOff, errno); - if (read(readFile, readBuf, BLCKSZ) != BLCKSZ) - elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", - readId, readSeg, readOff, errno); - memset(readBuf + EndRecPtr.xrecoff % BLCKSZ, 0, - BLCKSZ - EndRecPtr.xrecoff % BLCKSZ); - if (lseek(readFile, readOff * BLCKSZ, SEEK_SET) < 0) - elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", - readId, readSeg, readOff, errno); - if (write(readFile, readBuf, BLCKSZ) != BLCKSZ) - elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d", - readId, readSeg, readOff, errno); - readOff++; - } - else - { - Assert (EndRecPtr.xrecoff % BLCKSZ == 0 || - BLCKSZ - EndRecPtr.xrecoff % BLCKSZ < SizeOfXLogRecord); - readId = tmpRecPtr.xlogid; - readSeg = tmpRecPtr.xrecoff / XLogSegSize; - readOff = (tmpRecPtr.xrecoff % XLogSegSize) / BLCKSZ; - } - if (readOff > 0) - { - elog(LOG, "Formating logfile %u seg %u block %u at offset 0", - readId, readSeg, readOff); - readOff *= BLCKSZ; - memset(readBuf, 0, BLCKSZ); - readFile = XLogFileOpen(readId, readSeg, false); - if (lseek(readFile, readOff, SEEK_SET) < 0) - elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", - readId, readSeg, readOff, errno); - while (readOff < XLogSegSize) - { - if (write(readFile, readBuf, BLCKSZ) != BLCKSZ) - elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d", - readId, readSeg, readOff, errno); - readOff += BLCKSZ; - } - } - if (readFile >= 0) - { - if (fsync(readFile) < 0) - elog(STOP, "ReadRecord: fsync(logfile %u seg %u) failed: %d", - readId, readSeg, errno); - close(readFile); - readFile = -1; - } - readId = EndRecPtr.xlogid; - readSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize + 1; - elog(LOG, "The last logId/logSeg is (%u, %u)", readId, readSeg - 1); - if (ControlFile->logId != readId || ControlFile->logSeg != readSeg) - { - elog(LOG, "Set logId/logSeg in control file"); - ControlFile->logId = readId; - ControlFile->logSeg = readSeg; - ControlFile->time = time(NULL); - UpdateControlFile(); - } - if (readSeg == XLogLastSeg) - { - readSeg = 0; - readId++; - } - { - char path[MAXPGPATH+1]; + elog(LOG, "Data Base System is shutting down at %s", str_time(time(NULL))); - XLogFileName(path, readId, readSeg); - unlink(path); - } + CreateCheckPoint(true); - return(record); + elog(LOG, "Data Base System is shutdowned at %s", str_time(time(NULL))); } void @@ -1375,7 +1446,7 @@ CreateCheckPoint(bool shutdown) } /* Get REDO record ptr */ - while (!TAS(&(XLogCtl->insert_lck))) + while (TAS(&(XLogCtl->insert_lck))) { struct timeval delay = {0, 5000}; @@ -1410,6 +1481,7 @@ CreateCheckPoint(bool shutdown) FlushBufferPool(); /* Get UNDO record ptr */ + checkPoint.undo.xrecoff = 0; if (shutdown && checkPoint.undo.xrecoff != 0) elog(STOP, "Active transaction while data base is shutting down"); |