aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/Makefile4
-rw-r--r--src/backend/access/transam/rmgr.c4
-rw-r--r--src/backend/access/transam/xlog.c1434
-rw-r--r--src/backend/postmaster/postmaster.c22
-rw-r--r--src/backend/storage/file/fd.c16
-rw-r--r--src/backend/tcop/utility.c6
-rw-r--r--src/include/access/rmgr.h34
-rw-r--r--src/include/access/xlog.h70
-rw-r--r--src/include/storage/proc.h5
-rw-r--r--src/include/utils/elog.h4
10 files changed, 1583 insertions, 16 deletions
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 69d507b3d96..3c941a323dc 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -4,7 +4,7 @@
# Makefile for access/transam
#
# IDENTIFICATION
-# $Header: /cvsroot/pgsql/src/backend/access/transam/Makefile,v 1.6 1998/04/06 00:21:52 momjian Exp $
+# $Header: /cvsroot/pgsql/src/backend/access/transam/Makefile,v 1.7 1999/09/27 15:47:37 vadim Exp $
#
#-------------------------------------------------------------------------
@@ -13,7 +13,7 @@ include ../../../Makefile.global
CFLAGS += -I../..
-OBJS = transam.o transsup.o varsup.o xact.o xid.o
+OBJS = transam.o transsup.o varsup.o xact.o xid.o xlog.o rmgr.o
all: SUBSYS.o
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
new file mode 100644
index 00000000000..36d50f08c0c
--- /dev/null
+++ b/src/backend/access/transam/rmgr.c
@@ -0,0 +1,4 @@
+#include "postgres.h"
+#include "access/rmgr.h"
+
+/*
+ * Table of resource managers. It is defined as NULL here; xlog.c indexes
+ * it by a record's xl_rmid and calls rm_redo/rm_undo through it during
+ * recovery.
+ * NOTE(review): nothing visible in this file assigns a real table - confirm
+ * it is populated before StartupXLOG can enter recovery.
+ */
+RmgrData *RmgrTable = NULL;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
new file mode 100644
index 00000000000..f1b80d40270
--- /dev/null
+++ b/src/backend/access/transam/xlog.c
@@ -0,0 +1,1434 @@
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/stat.h>
+
+#include "postgres.h"
+#include "access/xlog.h"
+#include "access/xact.h"
+#include "storage/sinval.h"
+#include "storage/proc.h"
+#include "storage/spin.h"
+#include "storage/s_lock.h"
+
+/* Entry points defined in this file (CreateCheckPoint is only called here). */
+void UpdateControlFile(void);
+int XLOGShmemSize(void);
+void BootStrapXLOG(void);
+void StartupXLOG(void);
+void CreateCheckPoint(bool shutdown);
+
+/* Directory prefix used by XLogFileName() to build log segment paths */
+char *XLogDir = NULL;
+/* Path of the control file opened by BootStrapXLOG/StartupXLOG/UpdateControlFile */
+char *ControlFilePath = NULL;
+/* Number of BLCKSZ pages in the log cache; raised to MinXLOGbuffers if smaller */
+uint32 XLOGbuffers = 0;
+/* Start of the last record this backend inserted (stored as xl_xact_prev) */
+XLogRecPtr MyLastRecPtr = {0, 0};
+/* Forced to true by StartupXLOG while recovery runs, then restored */
+bool StopIfError = false;
+
+/* Spinlock taken around control file updates in XLogWrite */
+SPINLOCK ControlFileLockId;
+/* Not referenced by the code visible in this file */
+SPINLOCK XidGenLockId;
+
+/*
+ * Called when open() fails with EMFILE/ENFILE; a false result is reported
+ * as "no one data file can be closed".
+ */
+extern bool ReleaseDataFile(void);
+
+/* Receives nextXid/nextOid from the checkpoint record in StartupXLOG */
+extern VariableCache ShmemVariableCache;
+
+/* Lower bound applied to XLOGbuffers */
+#define MinXLOGbuffers 4
+
+/*
+ * Log positions that somebody asked to have written / flushed.
+ */
+typedef struct XLgwrRqst
+{
+ XLogRecPtr Write; /* byte (1-based) to write out */
+ XLogRecPtr Flush; /* byte (1-based) to flush */
+} XLgwrRqst;
+
+/*
+ * Log positions that have actually been written / fsync'ed.
+ */
+typedef struct XLgwrResult
+{
+ XLogRecPtr Write; /* bytes written out */
+ XLogRecPtr Flush; /* bytes flushed */
+} XLgwrResult;
+
+/*
+ * Insertion state; the code in this file touches it while holding
+ * XLogCtl->insert_lck.
+ */
+typedef struct XLogCtlInsert
+{
+ XLgwrResult LgwrResult; /* copy saved by GetFreeXLBuffer */
+ XLogRecPtr PrevRecord; /* copied into xl_prev of each new record */
+ uint16 curridx; /* current block index in cache */
+ XLogPageHeader currpage; /* page in the cache being filled */
+ char *currpos; /* next free byte inside currpage */
+} XLogCtlInsert;
+
+/*
+ * Writer state; the code in this file touches it while holding
+ * XLogCtl->lgwr_lck.
+ */
+typedef struct XLogCtlWrite
+{
+ XLgwrResult LgwrResult; /* writer's own copy of the result */
+ uint16 curridx; /* index of next block to write */
+} XLogCtlWrite;
+
+/*
+ * Shared-memory control structure. StartupXLOG lays out the xlblocks array
+ * and then the page cache directly behind this struct.
+ */
+typedef struct XLogCtlData
+{
+ XLogCtlInsert Insert;
+ XLgwrRqst LgwrRqst; /* read/updated under info_lck */
+ XLgwrResult LgwrResult; /* read/updated under info_lck */
+ XLogCtlWrite Write;
+ char *pages; /* XLOGbuffers pages of BLCKSZ bytes each */
+ XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */
+ uint32 XLogCacheByte; /* BLCKSZ * XLOGbuffers */
+ uint32 XLogCacheBlck; /* highest valid page index (XLOGbuffers - 1) */
+#ifdef HAS_TEST_AND_SET
+ slock_t insert_lck; /* guards Insert */
+ slock_t info_lck; /* guards LgwrRqst/LgwrResult above */
+ slock_t lgwr_lck; /* guards Write and the actual file writes */
+#endif
+} XLogCtlData;
+
+static XLogCtlData *XLogCtl = NULL;
+
+/* System state recorded in the control file; values start at 1 */
+typedef enum DBState
+{
+ DB_SHUTDOWNED = 1,
+ DB_SHUTDOWNING,
+ DB_IN_RECOVERY,
+ DB_IN_PRODUCTION
+} DBState;
+
+/*
+ * Contents of the control file. The file is read and written as one
+ * BLCKSZ-sized block, of which this struct is the beginning.
+ */
+typedef struct ControlFileData
+{
+ uint32 logId; /* current log file id */
+ uint32 logSeg; /* current log file segment (1-based) */
+ XLogRecPtr checkPoint; /* last check point record ptr */
+ time_t time; /* time stamp of last modification */
+ DBState state; /* */
+ /* MORE DATA FOLLOWS AT THE END OF THIS STRUCTURE
+ * - locations of data dirs
+ */
+} ControlFileData;
+
+static ControlFileData *ControlFile = NULL;
+
+/* Payload of a checkpoint log record (rmid RM_XLOG_ID) */
+typedef struct CheckPoint
+{
+ XLogRecPtr redo; /* next RecPtr available when we */
+ /* began to create CheckPoint */
+ /* (i.e. REDO start point) */
+ XLogRecPtr undo; /* first record of oldest in-progress */
+ /* transaction when we started */
+ /* (i.e. UNDO end point) */
+ TransactionId nextXid; /* loaded into ShmemVariableCache at startup */
+ Oid nextOid; /* loaded into ShmemVariableCache at startup */
+} CheckPoint;
+
+/*
+ * We break each log file in 64Mb segments
+ *
+ * XLogLastSeg is the number of whole segments that fit below 0xffffffff
+ * (63 with the 64Mb segment size) and XLogFileSize is the number of bytes
+ * they cover; offsets within one log id do not go beyond XLogFileSize.
+ */
+#define XLogSegSize (64*1024*1024)
+#define XLogLastSeg (0xffffffff / XLogSegSize)
+#define XLogFileSize (XLogLastSeg * XLogSegSize)
+
+/*
+ * Build the path of a log segment: XLogDir, a separator, then log id and
+ * segment number as two 8-digit upper-case hex fields.
+ * NOTE(review): up to MAXPGPATH bytes of XLogDir plus 17 more characters
+ * are written with sprintf; callers pass a MAXPGPATH+1 byte buffer, so a
+ * very long XLogDir would overflow it - confirm XLogDir is bounded.
+ */
+#define XLogFileName(path, log, seg) \
+ sprintf(path, "%.*s%c%08X%08X", \
+ MAXPGPATH, XLogDir, SEP_CHAR, log, seg)
+
+/* Previous / next page index in the circular log cache */
+#define PrevBufIdx(curridx) \
+ ((curridx == 0) ? XLogCtl->XLogCacheBlck : (curridx - 1))
+
+#define NextBufIdx(curridx) \
+ ((curridx == XLogCtl->XLogCacheBlck) ? 0 : (curridx + 1))
+
+/*
+ * Comparisons of two log positions (XLogRecPtr values, not pointers):
+ * ordered first by xlogid, then by xrecoff.
+ *
+ * Arguments are parenthesized so that any lvalue expression (for example
+ * *ptr) can be passed; each argument is evaluated more than once, so it
+ * must not have side effects.
+ */
+#define XLByteLT(left, right) \
+	((right).xlogid > (left).xlogid || \
+	 ((right).xlogid == (left).xlogid && (right).xrecoff > (left).xrecoff))
+
+#define XLByteLE(left, right) \
+	((right).xlogid > (left).xlogid || \
+	 ((right).xlogid == (left).xlogid && (right).xrecoff >= (left).xrecoff))
+
+#define XLByteEQ(left, right) \
+	((right).xlogid == (left).xlogid && (right).xrecoff == (left).xrecoff)
+
+/*
+ * Make cache page curridx the current insert page.
+ *
+ * The new page's end pointer in xlblocks is the end pointer of the page
+ * that was current so far plus BLCKSZ; if that page ended exactly at
+ * XLogFileSize the new page instead ends at offset BLCKSZ of the next
+ * log id. Only after both fields are computed is Insert->curridx switched,
+ * then currpage/currpos are pointed at the new page and its header is
+ * initialized.
+ *
+ * Relies on a variable named Insert (XLogCtlInsert *) being in scope at
+ * the point of use, and evaluates its argument several times.
+ */
+#define InitXLBuffer(curridx) (\
+ XLogCtl->xlblocks[curridx].xrecoff = \
+ (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \
+ BLCKSZ : (XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ), \
+ XLogCtl->xlblocks[curridx].xlogid = \
+ (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \
+ (XLogCtl->xlblocks[Insert->curridx].xlogid + 1) : \
+ XLogCtl->xlblocks[Insert->curridx].xlogid, \
+ Insert->curridx = curridx, \
+ Insert->currpage = (XLogPageHeader) (XLogCtl->pages + curridx * BLCKSZ), \
+ Insert->currpos = \
+ ((char*) Insert->currpage) + SizeOfXLogPHD, \
+ Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC, \
+ Insert->currpage->xlp_info = 0 \
+ )
+
+/*
+ * True if a record may start at this offset: its position within the page
+ * is past the page header and leaves room for at least a record header.
+ * The argument is not parenthesized, so pass a simple expression.
+ */
+#define XRecOffIsValid(xrecoff) \
+ (xrecoff % BLCKSZ >= SizeOfXLogPHD && \
+ (BLCKSZ - xrecoff % BLCKSZ) >= SizeOfXLogRecord)
+
+/* File-local helpers, defined below */
+static void GetFreeXLBuffer(void);
+static void XLogWrite(char *buffer);
+static int XLogFileInit(uint32 log, uint32 seg);
+static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
+static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, char *buffer);
+
+/* This backend's private copies of the shared request/result pointers */
+static XLgwrResult LgwrResult = {{0, 0}, {0, 0}};
+static XLgwrRqst LgwrRqst = {{0, 0}, {0, 0}};
+
+/* Log segment currently open for writing/fsync, and the offset tracked in it */
+static int logFile = -1;
+static uint32 logId = 0;
+static uint32 logSeg = 0;
+static off_t logOff = 0;
+
+/* State of the record reader (ReadRecord) */
+static XLogRecPtr ReadRecPtr;
+static XLogRecPtr EndRecPtr;
+static int readFile = -1;
+static uint32 readId = 0;
+static uint32 readSeg = 0;
+static off_t readOff = (off_t) -1; /* page number within the segment; -1 = none read */
+static char readBuf[BLCKSZ]; /* the page most recently read */
+static XLogRecord *nextRecord = NULL;
+
+/*
+ * XLogInsert
+ *
+ * Append one log record for resource manager rmid to the shared log cache.
+ * The payload is hdr[0..hdrlen) immediately followed by buf[0..buflen).
+ * Whatever does not fit on the current page is continued on the following
+ * cache pages as XLogSubRecord fragments, each such page being flagged
+ * XLP_FIRST_IS_SUBRECORD.
+ *
+ * Stops with elog(STOP) if hdrlen + buflen is 0 or exceeds MAXLOGRECSZ.
+ *
+ * Side effects: MyLastRecPtr is set to the start of the new record; if
+ * MyLastRecPtr.xrecoff was 0 beforehand, that start is also stored in
+ * MyProc->logRec under SInvalLock. When a page was filled, the shared
+ * write request XLogCtl->LgwrRqst.Write is advanced.
+ *
+ * Returns the RecPtr computed below from the end of the data written
+ * (NOTE(review): the arithmetic adds xl_len but not the record/subrecord
+ * header size - confirm what callers expect this pointer to mean).
+ */
+XLogRecPtr
+XLogInsert(RmgrId rmid, char *hdr, uint32 hdrlen, char *buf, uint32 buflen)
+{
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+ XLogRecord *record;
+ XLogSubRecord *subrecord;
+ XLogRecPtr RecPtr;
+ uint32 len = hdrlen + buflen,
+ freespace,
+ wlen;
+ uint16 curridx;
+ bool updrqst = false;
+
+ if (len == 0 || len > MAXLOGRECSZ)
+ elog(STOP, "XLogInsert: invalid record len %u", len);
+
+ /* obtain xlog insert lock */
+ if (TAS(&(XLogCtl->insert_lck))) /* busy */
+ {
+ bool do_lgwr = true;
+ unsigned i = 0;
+
+ for ( ; ; )
+ {
+ /* try to read LgwrResult while waiting for insert lock */
+ if (!TAS(&(XLogCtl->info_lck)))
+ {
+ LgwrRqst = XLogCtl->LgwrRqst;
+ LgwrResult = XLogCtl->LgwrResult;
+ S_UNLOCK(&(XLogCtl->info_lck));
+ /*
+ * If cache is half filled then try to acquire lgwr lock
+ * and do LGWR work, but only once.
+ */
+ if (do_lgwr &&
+ (LgwrRqst.Write.xlogid != LgwrResult.Write.xlogid ||
+ (LgwrRqst.Write.xrecoff - LgwrResult.Write.xrecoff >=
+ XLogCtl->XLogCacheByte / 2)))
+ {
+ if (!TAS(&(XLogCtl->lgwr_lck)))
+ {
+ /* refresh both pointers now that we are the writer */
+ LgwrResult = XLogCtl->Write.LgwrResult;
+ if (!TAS(&(XLogCtl->info_lck)))
+ {
+ LgwrRqst = XLogCtl->LgwrRqst;
+ S_UNLOCK(&(XLogCtl->info_lck));
+ }
+ if (XLByteLT(LgwrResult.Write, LgwrRqst.Write))
+ {
+ XLogWrite(NULL);
+ do_lgwr = false;
+ }
+ S_UNLOCK(&(XLogCtl->lgwr_lck));
+ }
+ }
+ }
+ s_lock_sleep(i++);
+ if (!TAS(&(XLogCtl->insert_lck)))
+ break;
+ }
+ }
+
+ /* a record header must fit on the current page, else move to the next one */
+ freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
+ if (freespace < SizeOfXLogRecord)
+ {
+ curridx = NextBufIdx(Insert->curridx);
+ /* reuse the next page directly if it is already written out */
+ if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
+ InitXLBuffer(curridx);
+ else
+ GetFreeXLBuffer();
+ freespace = BLCKSZ - SizeOfXLogPHD;
+ }
+ else
+ curridx = Insert->curridx;
+
+ /* fill in the record header; xl_len is only the part stored on this page */
+ freespace -= SizeOfXLogRecord;
+ record = (XLogRecord*) Insert->currpos;
+ record->xl_prev = Insert->PrevRecord;
+ record->xl_xact_prev = MyLastRecPtr;
+ record->xl_xid = GetCurrentTransactionId();
+ record->xl_len = (len > freespace) ? freespace : len;
+ record->xl_info = (len > freespace) ? XLR_TO_BE_CONTINUED : 0;
+ record->xl_rmid = rmid;
+ /* xlblocks[] holds the page END pointer, hence the "- BLCKSZ" */
+ RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid;
+ RecPtr.xrecoff =
+ XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ +
+ Insert->currpos - ((char*) Insert->currpage);
+ if (MyLastRecPtr.xrecoff == 0)
+ {
+ SpinAcquire(SInvalLock);
+ MyProc->logRec = RecPtr;
+ SpinRelease(SInvalLock);
+ }
+ MyLastRecPtr = RecPtr;
+ RecPtr.xrecoff += record->xl_len;
+ Insert->currpos += SizeOfXLogRecord;
+ /* copy as much of hdr, then buf, as fits behind the header */
+ if (freespace > 0)
+ {
+ wlen = (hdrlen > freespace) ? freespace : hdrlen;
+ memcpy(Insert->currpos, hdr, wlen);
+ freespace -= wlen;
+ hdrlen -= wlen;
+ hdr += wlen;
+ Insert->currpos += wlen;
+ if (buflen > 0 && freespace > 0)
+ {
+ wlen = (buflen > freespace) ? freespace : buflen;
+ memcpy(Insert->currpos, buf, wlen);
+ freespace -= wlen;
+ buflen -= wlen;
+ buf += wlen;
+ Insert->currpos += wlen;
+ }
+ Insert->currpos = ((char*)Insert->currpage) +
+ DOUBLEALIGN(Insert->currpos - ((char*)Insert->currpage));
+ len = hdrlen + buflen;
+ }
+
+ /* len is now what is still to be stored; continue on following pages */
+ if (len != 0)
+ {
+nbuf:
+ curridx = NextBufIdx(curridx);
+ if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
+ {
+ InitXLBuffer(curridx);
+ updrqst = true;
+ }
+ else
+ {
+ /* GetFreeXLBuffer publishes the write request itself */
+ GetFreeXLBuffer();
+ updrqst = false;
+ }
+ freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord;
+ Insert->currpage->xlp_info |= XLP_FIRST_IS_SUBRECORD;
+ subrecord = (XLogSubRecord*) Insert->currpos;
+ Insert->currpos += SizeOfXLogSubRecord;
+ if (hdrlen > freespace)
+ {
+ /* the rest of hdr alone fills this page: take another one */
+ subrecord->xl_len = freespace;
+ subrecord->xl_info = XLR_TO_BE_CONTINUED;
+ memcpy(Insert->currpos, hdr, freespace);
+ hdrlen -= freespace;
+ hdr += freespace;
+ goto nbuf;
+ }
+ else if (hdrlen > 0)
+ {
+ subrecord->xl_len = hdrlen;
+ memcpy(Insert->currpos, hdr, hdrlen);
+ Insert->currpos += hdrlen;
+ freespace -= hdrlen;
+ hdrlen = 0;
+ }
+ else
+ subrecord->xl_len = 0;
+ if (buflen > freespace)
+ {
+ /* buf fills the remainder of this page: take another one */
+ subrecord->xl_len += freespace;
+ subrecord->xl_info = XLR_TO_BE_CONTINUED;
+ memcpy(Insert->currpos, buf, freespace);
+ buflen -= freespace;
+ buf += freespace;
+ goto nbuf;
+ }
+ else if (buflen > 0)
+ {
+ subrecord->xl_len += buflen;
+ memcpy(Insert->currpos, buf, buflen);
+ Insert->currpos += buflen;
+ }
+ /* this is the last fragment */
+ subrecord->xl_info = 0;
+ RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid;
+ RecPtr.xrecoff = XLogCtl->xlblocks[curridx].xrecoff -
+ BLCKSZ + SizeOfXLogPHD + subrecord->xl_len;
+ Insert->currpos = ((char*)Insert->currpage) +
+ DOUBLEALIGN(Insert->currpos - ((char*)Insert->currpage));
+ }
+ freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
+ /*
+ * All done! Update global LgwrRqst if some block was filled up.
+ */
+ if (freespace < SizeOfXLogRecord)
+ updrqst = true; /* curridx is filled and available for writing out */
+ else
+ curridx = PrevBufIdx(curridx); /* only the page before is complete */
+ LgwrRqst.Write = XLogCtl->xlblocks[curridx];
+
+ S_UNLOCK(&(XLogCtl->insert_lck));
+
+ if (updrqst)
+ {
+ unsigned i = 0;
+
+ /* publish the request, but never move the shared pointer backwards */
+ for ( ; ; )
+ {
+ if (!TAS(&(XLogCtl->info_lck)))
+ {
+ if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrRqst.Write))
+ XLogCtl->LgwrRqst.Write = LgwrRqst.Write;
+ S_UNLOCK(&(XLogCtl->info_lck));
+ break;
+ }
+ s_lock_sleep(i++);
+ }
+ }
+
+ return (RecPtr);
+}
+
+/*
+ * XLogFlush
+ *
+ * Return only once the log is fsync'ed at least up to "record".
+ *
+ * Returns at once if the local or the shared LgwrResult.Flush already
+ * covers record. Otherwise it registers the flush request in shared memory
+ * and loops, trying the locks without blocking, until it either
+ *  - sees that somebody else flushed far enough,
+ *  - becomes the writer and writes+flushes via XLogWrite (elog(STOP) if the
+ *    request is still not satisfied afterwards), or
+ *  - becomes the writer, finds everything already written, and only has to
+ *    fsync the segment holding the last written byte (code after the loop).
+ */
+void
+XLogFlush(XLogRecPtr record)
+{
+ XLogRecPtr WriteRqst;
+ char buffer[BLCKSZ]; /* private image of a partially filled page */
+ char *usebuf = NULL; /* buffer when that image is to be written, else NULL */
+ unsigned i = 0;
+ bool force_lgwr = false;
+
+ if (XLByteLE(record, LgwrResult.Flush))
+ return;
+ WriteRqst = LgwrRqst.Write;
+ for ( ; ; )
+ {
+ /* try to read LgwrResult */
+ if (!TAS(&(XLogCtl->info_lck)))
+ {
+ LgwrResult = XLogCtl->LgwrResult;
+ if (XLByteLE(record, LgwrResult.Flush))
+ {
+ S_UNLOCK(&(XLogCtl->info_lck));
+ return;
+ }
+ if (XLByteLT(XLogCtl->LgwrRqst.Flush, record))
+ XLogCtl->LgwrRqst.Flush = record;
+ /* a newer shared write request supersedes any private page image */
+ if (XLByteLT(WriteRqst, XLogCtl->LgwrRqst.Write))
+ {
+ WriteRqst = XLogCtl->LgwrRqst.Write;
+ usebuf = NULL;
+ }
+ S_UNLOCK(&(XLogCtl->info_lck));
+ }
+ /* if something was added to log cache then try to flush this too */
+ if (!TAS(&(XLogCtl->insert_lck)))
+ {
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+ uint32 freespace =
+ ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
+
+ if (freespace < SizeOfXLogRecord) /* buffer is full */
+ {
+ usebuf = NULL;
+ LgwrRqst.Write = WriteRqst = XLogCtl->xlblocks[Insert->curridx];
+ }
+ else
+ {
+ /* snapshot the used part of the page, zero-fill the rest */
+ usebuf = buffer;
+ memcpy(usebuf, Insert->currpage, BLCKSZ - freespace);
+ memset(usebuf + BLCKSZ - freespace, 0, freespace);
+ /* request a write up to the current insert position */
+ WriteRqst = XLogCtl->xlblocks[Insert->curridx];
+ WriteRqst.xrecoff = WriteRqst.xrecoff - BLCKSZ +
+ Insert->currpos - ((char*) Insert->currpage);
+ }
+ S_UNLOCK(&(XLogCtl->insert_lck));
+ force_lgwr = true;
+ }
+ /*
+ * Try to become the writer once we have looked at the insert state,
+ * or when the write request is in a later log id or at least a
+ * block beyond record.
+ */
+ if (force_lgwr || WriteRqst.xlogid > record.xlogid ||
+ (WriteRqst.xlogid == record.xlogid &&
+ WriteRqst.xrecoff >= record.xrecoff + BLCKSZ))
+ {
+ if (!TAS(&(XLogCtl->lgwr_lck)))
+ {
+ LgwrResult = XLogCtl->Write.LgwrResult;
+ if (XLByteLE(record, LgwrResult.Flush))
+ {
+ S_UNLOCK(&(XLogCtl->lgwr_lck));
+ return;
+ }
+ if (XLByteLT(LgwrResult.Write, WriteRqst))
+ {
+ LgwrRqst.Flush = LgwrRqst.Write = WriteRqst;
+ XLogWrite(usebuf);
+ S_UNLOCK(&(XLogCtl->lgwr_lck));
+ if (XLByteLT(LgwrResult.Flush, record))
+ elog(STOP, "XLogFlush: request is not satisfyed");
+ return;
+ }
+ /* already written, just not flushed: leave loop holding lgwr_lck */
+ break;
+ }
+ }
+ s_lock_sleep(i++);
+ }
+
+ /* make sure logFile is the segment containing the last written byte */
+ if (logFile >= 0 && (LgwrResult.Write.xlogid != logId ||
+ (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg))
+ {
+ if (close(logFile) != 0)
+ elog(STOP, "Close(logfile %u seg %u) failed: %d",
+ logId, logSeg, errno);
+ logFile = -1;
+ }
+
+ if (logFile < 0)
+ {
+ logId = LgwrResult.Write.xlogid;
+ logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
+ logOff = (off_t) 0;
+ logFile = XLogFileOpen(logId, logSeg, false);
+ }
+
+ if (fsync(logFile) != 0)
+ elog(STOP, "Fsync(logfile %u seg %u) failed: %d",
+ logId, logSeg, errno);
+ LgwrResult.Flush = LgwrResult.Write;
+
+ /* publish the new result; here we wait for info_lck */
+ for (i = 0; ; )
+ {
+ if (!TAS(&(XLogCtl->info_lck)))
+ {
+ XLogCtl->LgwrResult = LgwrResult;
+ if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write))
+ XLogCtl->LgwrRqst.Write = LgwrResult.Write;
+ S_UNLOCK(&(XLogCtl->info_lck));
+ break;
+ }
+ s_lock_sleep(i++);
+ }
+ XLogCtl->Write.LgwrResult = LgwrResult;
+
+ S_UNLOCK(&(XLogCtl->lgwr_lck));
+ return;
+
+}
+
+/*
+ * GetFreeXLBuffer
+ *
+ * Make the cache page following the current insert page usable and switch
+ * the insert state to it (InitXLBuffer). Called by XLogInsert, with the
+ * insert lock held, when that page is not known to be written out yet.
+ *
+ * It publishes a write request for everything up to the end of the current
+ * page and then alternates between checking the shared result (info_lck)
+ * and trying to become the writer itself (lgwr_lck). Both locks are only
+ * tried, never waited for, and the loop does not sleep between rounds.
+ */
+static void
+GetFreeXLBuffer()
+{
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+ XLogCtlWrite *Write = &XLogCtl->Write;
+ uint16 curridx = NextBufIdx(Insert->curridx);
+
+ LgwrRqst.Write = XLogCtl->xlblocks[Insert->curridx];
+ for ( ; ; )
+ {
+ if (!TAS(&(XLogCtl->info_lck)))
+ {
+ LgwrResult = XLogCtl->LgwrResult;
+ XLogCtl->LgwrRqst.Write = LgwrRqst.Write;
+ S_UNLOCK(&(XLogCtl->info_lck));
+ /* somebody else has written the wanted page in the meantime */
+ if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
+ {
+ Insert->LgwrResult = LgwrResult;
+ InitXLBuffer(curridx);
+ return;
+ }
+ }
+ /*
+ * LgwrResult lock is busy or un-updated. Try to acquire lgwr lock
+ * and write full blocks.
+ */
+ if (!TAS(&(XLogCtl->lgwr_lck)))
+ {
+ LgwrResult = Write->LgwrResult;
+ if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
+ {
+ S_UNLOCK(&(XLogCtl->lgwr_lck));
+ Insert->LgwrResult = LgwrResult;
+ InitXLBuffer(curridx);
+ return;
+ }
+ /*
+ * Have to write buffers while holding insert lock -
+ * not good...
+ */
+ XLogWrite(NULL);
+ S_UNLOCK(&(XLogCtl->lgwr_lck));
+ Insert->LgwrResult = LgwrResult;
+ InitXLBuffer(curridx);
+ return;
+ }
+ }
+
+ return;
+}
+
+/*
+ * XLogWrite
+ *
+ * Write cache pages, starting with Write->curridx, to the log files until
+ * the local LgwrResult.Write has reached the local LgwrRqst.Write, then
+ * fsync if the flush request is covered and publish the result in shared
+ * memory. Every caller in this file holds lgwr_lck around the call.
+ *
+ * buffer, if not NULL, is a private image of the page the request ends in;
+ * it is written in place of the shared page when the request ends before
+ * the end of that page, and in that case the write index is not advanced.
+ *
+ * When the page to write belongs to another log id/segment than the open
+ * file, the old file is fsync'ed (if anything was written to it here) and
+ * closed, a new segment is created with XLogFileInit, and the control file
+ * is updated under ControlFileLockId.
+ *
+ * Any I/O failure, and being called with nothing to write, is elog(STOP).
+ * NOTE(review): the lseek/write messages print logOff (an off_t) with %u -
+ * confirm this matches on all supported platforms.
+ */
+static void
+XLogWrite(char *buffer)
+{
+ XLogCtlWrite *Write = &XLogCtl->Write;
+ char *from;
+ uint32 wcnt = 0; /* number of pages written by this call */
+ int i = 0;
+
+ for ( ; XLByteLT(LgwrResult.Write, LgwrRqst.Write); )
+ {
+ /* xlblocks[] is the END pointer of the page about to be written */
+ LgwrResult.Write = XLogCtl->xlblocks[Write->curridx];
+ if (LgwrResult.Write.xlogid != logId ||
+ (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg)
+ {
+ if (wcnt > 0)
+ {
+ if (fsync(logFile) != 0)
+ elog(STOP, "Fsync(logfile %u seg %u) failed: %d",
+ logId, logSeg, errno);
+ /* everything up to the end of the old segment is now flushed */
+ if (LgwrResult.Write.xlogid != logId)
+ LgwrResult.Flush.xrecoff = XLogFileSize;
+ else
+ LgwrResult.Flush.xrecoff = LgwrResult.Write.xrecoff - BLCKSZ;
+ LgwrResult.Flush.xlogid = logId;
+ /* publish only if info_lck is free right now; no retry here */
+ if (!TAS(&(XLogCtl->info_lck)))
+ {
+ XLogCtl->LgwrResult.Flush = LgwrResult.Flush;
+ XLogCtl->LgwrResult.Write = LgwrResult.Flush;
+ if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Flush))
+ XLogCtl->LgwrRqst.Write = LgwrResult.Flush;
+ if (XLByteLT(XLogCtl->LgwrRqst.Flush, LgwrResult.Flush))
+ XLogCtl->LgwrRqst.Flush = LgwrResult.Flush;
+ S_UNLOCK(&(XLogCtl->info_lck));
+ }
+ }
+ if (logFile >= 0)
+ {
+ if (close(logFile) != 0)
+ elog(STOP, "Close(logfile %u seg %u) failed: %d",
+ logId, logSeg, errno);
+ logFile = -1;
+ }
+ logId = LgwrResult.Write.xlogid;
+ logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
+ logOff = (off_t) 0;
+ logFile = XLogFileInit(logId, logSeg);
+ /* ControlFile->logSeg is 1-based, logSeg here is 0-based */
+ SpinAcquire(ControlFileLockId);
+ ControlFile->logId = logId;
+ ControlFile->logSeg = logSeg + 1;
+ ControlFile->time = time(NULL);
+ UpdateControlFile();
+ SpinRelease(ControlFileLockId);
+ }
+
+ if (logFile < 0)
+ {
+ logId = LgwrResult.Write.xlogid;
+ logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
+ logOff = (off_t) 0;
+ logFile = XLogFileOpen(logId, logSeg, false);
+ }
+
+ /* seek only when the tracked offset is not where this page starts */
+ if (logOff != (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
+ {
+ logOff = (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
+ if (lseek(logFile, logOff, SEEK_SET) < 0)
+ elog(STOP, "Lseek(logfile %u seg %u off %u) failed: %d",
+ logId, logSeg, logOff, errno);
+ }
+
+ /* request ends inside this page: use the caller's private image */
+ if (buffer != NULL && XLByteLT(LgwrRqst.Write, LgwrResult.Write))
+ from = buffer;
+ else
+ from = XLogCtl->pages + Write->curridx * BLCKSZ;
+
+ if (write(logFile, from, BLCKSZ) != BLCKSZ)
+ elog(STOP, "Write(logfile %u seg %u off %u) failed: %d",
+ logId, logSeg, logOff, errno);
+
+ wcnt++;
+ logOff += BLCKSZ;
+
+ if (from != buffer)
+ Write->curridx = NextBufIdx(Write->curridx);
+ else
+ LgwrResult.Write = LgwrRqst.Write; /* partial page: ends the loop */
+ }
+ if (wcnt == 0)
+ elog(STOP, "XLogWrite: nothing written");
+
+ /* fsync only if a flush is pending and what we wrote covers it */
+ if (XLByteLT(LgwrResult.Flush, LgwrRqst.Flush) &&
+ XLByteLE(LgwrRqst.Flush, LgwrResult.Write))
+ {
+ if (fsync(logFile) != 0)
+ elog(STOP, "Fsync(logfile %u seg %u) failed: %d",
+ logId, logSeg, errno);
+ LgwrResult.Flush = LgwrResult.Write;
+ }
+
+ /* publish the result, waiting for info_lck this time */
+ for ( ; ; )
+ {
+ if (!TAS(&(XLogCtl->info_lck)))
+ {
+ XLogCtl->LgwrResult = LgwrResult;
+ if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write))
+ XLogCtl->LgwrRqst.Write = LgwrResult.Write;
+ S_UNLOCK(&(XLogCtl->info_lck));
+ break;
+ }
+ s_lock_sleep(i++);
+ }
+ Write->LgwrResult = LgwrResult;
+}
+
+/*
+ * XLogFileInit
+ *
+ * Create log segment "seg" of log file "log" and return a descriptor open
+ * for reading and writing. Any existing file of that name is unlinked
+ * first. The new file is extended to XLogSegSize bytes by writing a single
+ * byte at its last offset, and is fsync'ed before returning.
+ *
+ * If open() fails for lack of file descriptors (EMFILE/ENFILE) a data file
+ * is released via ReleaseDataFile() and the open is retried. Every other
+ * failure is reported with elog(STOP).
+ *
+ * Error messages name the file being created (log/seg), not the backend's
+ * currently open write segment (logId/logSeg), which can be a different one.
+ */
+static int
+XLogFileInit(uint32 log, uint32 seg)
+{
+	char		path[MAXPGPATH+1];
+	int			fd;
+
+	XLogFileName(path, log, seg);
+	unlink(path);
+
+tryAgain:
+	fd = open(path, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
+	if (fd < 0 && (errno == EMFILE || errno == ENFILE))
+	{
+		/* keep errno for the message; ReleaseDataFile may change it */
+		fd = errno;
+		if (!ReleaseDataFile())
+			elog(STOP, "Create(logfile %u seg %u) failed: %d (and no one data file can be closed)",
+				 log, seg, fd);
+		goto tryAgain;
+	}
+	if (fd < 0)
+		elog(STOP, "Init(logfile %u seg %u) failed: %d",
+			 log, seg, errno);
+
+	/* writing the last byte gives the file its full size */
+	if (lseek(fd, XLogSegSize - 1, SEEK_SET) != (off_t) (XLogSegSize - 1))
+		elog(STOP, "Lseek(logfile %u seg %u) failed: %d",
+			 log, seg, errno);
+
+	if (write(fd, "", 1) != 1)
+		elog(STOP, "Init(logfile %u seg %u) failed: %d",
+			 log, seg, errno);
+
+	if (fsync(fd) != 0)
+		elog(STOP, "Fsync(logfile %u seg %u) failed: %d",
+			 log, seg, errno);
+
+	return(fd);
+}
+
+/*
+ * XLogFileOpen
+ *
+ * Open the existing log segment "seg" of log file "log" for reading and
+ * writing and return its descriptor.
+ *
+ * If open() fails for lack of file descriptors (EMFILE/ENFILE) a data file
+ * is released via ReleaseDataFile() and the open is retried.
+ *
+ * econt ("error: continue") selects what happens when the file does not
+ * exist: if true, the fact is logged at LOG level and the negative
+ * descriptor is returned to the caller; if false it is elog(STOP) like any
+ * other failure.
+ *
+ * Error messages name the file being opened (log/seg), not the backend's
+ * current write segment (logId/logSeg); the reader opens other segments.
+ */
+static int
+XLogFileOpen(uint32 log, uint32 seg, bool econt)
+{
+	char		path[MAXPGPATH+1];
+	int			fd;
+
+	XLogFileName(path, log, seg);
+
+tryAgain:
+	fd = open(path, O_RDWR);
+	if (fd < 0 && (errno == EMFILE || errno == ENFILE))
+	{
+		/* keep errno for the message; ReleaseDataFile may change it */
+		fd = errno;
+		if (!ReleaseDataFile())
+			elog(STOP, "Open(logfile %u seg %u) failed: %d (and no one data file can be closed)",
+				 log, seg, fd);
+		goto tryAgain;
+	}
+	if (fd < 0)
+	{
+		if (econt && errno == ENOENT)
+		{
+			elog(LOG, "Open(logfile %u seg %u) failed: file doesn't exist",
+				 log, seg);
+			return (fd);
+		}
+		elog(STOP, "Open(logfile %u seg %u) failed: %d",
+			 log, seg, errno);
+	}
+
+	return(fd);
+}
+
+/*
+ * UpdateControlFile
+ *
+ * Write the in-memory control block (BLCKSZ bytes at ControlFile) to the
+ * file named by ControlFilePath and fsync it. Running out of file
+ * descriptors is handled by releasing a data file and retrying the open;
+ * every other failure is elog(STOP).
+ */
+void
+UpdateControlFile()
+{
+	int			fd;
+
+	for ( ; ; )
+	{
+		int			open_errno;
+
+		fd = open(ControlFilePath, O_RDWR);
+		if (fd >= 0)
+			break;
+
+		if (errno != EMFILE && errno != ENFILE)
+		{
+			elog(STOP, "Open(cntlfile) failed: %d", errno);
+			break;
+		}
+
+		/* out of descriptors: give one back and try again */
+		open_errno = errno;
+		if (!ReleaseDataFile())
+			elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)",
+				 open_errno);
+	}
+
+	if (write(fd, ControlFile, BLCKSZ) != BLCKSZ)
+		elog(STOP, "Write(cntlfile) failed: %d", errno);
+
+	if (fsync(fd) != 0)
+		elog(STOP, "Fsync(cntlfile) failed: %d", errno);
+
+	close(fd);
+}
+
+/*
+ * XLOGShmemSize
+ *
+ * Number of bytes of shared memory this module asks for: the control
+ * struct, one page and one XLogRecPtr per log buffer, plus one more BLCKSZ
+ * block. As a side effect XLOGbuffers is raised to MinXLOGbuffers when it
+ * is smaller.
+ */
+int
+XLOGShmemSize()
+{
+	XLOGbuffers = (XLOGbuffers < MinXLOGbuffers) ? MinXLOGbuffers : XLOGbuffers;
+
+	return(sizeof(XLogCtlData) +
+		   BLCKSZ * XLOGbuffers +
+		   sizeof(XLogRecPtr) * XLOGbuffers +
+		   BLCKSZ);
+}
+
+/*
+ * This func must be called ONCE on system install
+ *
+ * Creates the control file (fails with elog(STOP) if it already exists,
+ * because of O_EXCL) and log segment 0 of log file 0. The first page of
+ * that segment receives a checkpoint record placed right behind the page
+ * header, whose redo and undo pointers both point at the record itself.
+ * The control file is then written with state DB_SHUTDOWNED and that
+ * checkpoint position.
+ *
+ * NOTE(review): the global ControlFile is left pointing at the local
+ * "buffer" when this function returns - confirm nothing dereferences it
+ * afterwards in the bootstrap process.
+ */
+void
+BootStrapXLOG()
+{
+ int fd;
+ char buffer[BLCKSZ];
+ XLogPageHeader page = (XLogPageHeader)buffer;
+ CheckPoint checkPoint;
+ XLogRecord *record;
+
+ fd = open(ControlFilePath, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
+ if (fd < 0)
+ elog(STOP, "BootStrapXLOG failed to create control file: %d", errno);
+
+ logFile = XLogFileInit(0, 0);
+
+ /* the very first record: log id 0, just past the first page header */
+ checkPoint.redo.xlogid = 0;
+ checkPoint.redo.xrecoff = SizeOfXLogPHD;
+ checkPoint.undo = checkPoint.redo;
+ checkPoint.nextXid = FirstTransactionId;
+ checkPoint.nextOid = BootstrapObjectIdData;
+
+ /* build the first log page in the local buffer */
+ memset(buffer, 0, BLCKSZ);
+ page->xlp_magic = XLOG_PAGE_MAGIC;
+ page->xlp_info = 0;
+ record = (XLogRecord*) ((char*)page + SizeOfXLogPHD);
+ record->xl_prev.xlogid = 0; record->xl_prev.xrecoff = 0;
+ record->xl_xact_prev = record->xl_prev;
+ record->xl_xid = InvalidTransactionId;
+ record->xl_len = sizeof(checkPoint);
+ record->xl_info = 0;
+ record->xl_rmid = RM_XLOG_ID;
+ memcpy((char*)record + SizeOfXLogRecord, &checkPoint, sizeof(checkPoint));
+
+ if (write(logFile, buffer, BLCKSZ) != BLCKSZ)
+ elog(STOP, "BootStrapXLOG failed to write logfile: %d", errno);
+
+ if (fsync(logFile) != 0)
+ elog(STOP, "BootStrapXLOG failed to fsync logfile: %d", errno);
+
+ close(logFile);
+ logFile = -1;
+
+ /* reuse the same buffer for the control file image */
+ memset(buffer, 0, BLCKSZ);
+ ControlFile = (ControlFileData*) buffer;
+ ControlFile->logId = 0;
+ ControlFile->logSeg = 1;
+ ControlFile->checkPoint = checkPoint.redo;
+ ControlFile->time = time(NULL);
+ ControlFile->state = DB_SHUTDOWNED;
+
+ if (write(fd, buffer, BLCKSZ) != BLCKSZ)
+ elog(STOP, "BootStrapXLOG failed to write control file: %d", errno);
+
+ if (fsync(fd) != 0)
+ elog(STOP, "BootStrapXLOG failed to fsync control file: %d", errno);
+
+ close(fd);
+
+ return;
+
+}
+
+/*
+ * This func must be called ONCE on system startup
+ *
+ * Steps:
+ *	1. Allocate and initialize the shared control block and log cache.
+ *	2. Read the control file and sanity-check it.
+ *	3. Read the last checkpoint record and load nextXid/nextOid from it.
+ *	4. Decide whether recovery is needed (state is not DB_SHUTDOWNED, or
+ *	   the checkpoint's redo/undo pointers lie before the checkpoint).
+ *	   If so: mark the control file DB_IN_RECOVERY, replay records forward
+ *	   through rm_redo, then walk backwards through xl_prev calling rm_undo
+ *	   for records of transactions that did not commit.
+ *	5. Position the insert state behind the last valid record.
+ *	6. After recovery, tell every resource manager that recovery is done
+ *	   (rm_redo with a NULL record) and create a checkpoint.
+ *	7. Mark the control file DB_IN_PRODUCTION.
+ *
+ * Any inconsistency found on the way is reported with elog(STOP).
+ *
+ * Log id 0 is a valid checkpoint location - BootStrapXLOG puts the initial
+ * checkpoint there - so only a zero record offset is treated as a broken
+ * control file.
+ */
+void
+StartupXLOG()
+{
+	XLogCtlInsert  *Insert = &XLogCtl->Insert;
+	CheckPoint		checkPoint;
+	XLogRecPtr		RecPtr,
+					LastRec;
+	XLogRecord	   *record;
+	char			buffer[MAXLOGRECSZ+SizeOfXLogRecord];
+	int				fd;
+	bool			found;
+	bool			recovery = false;
+	bool			sie_saved = false;
+
+	elog(LOG, "Starting up XLOG manager...");
+
+	if (XLOGbuffers < MinXLOGbuffers)
+		XLOGbuffers = MinXLOGbuffers;
+
+	ControlFile = (ControlFileData*)
+		ShmemInitStruct("Control File", BLCKSZ, &found);
+	Assert(!found);
+	XLogCtl = (XLogCtlData*)
+		ShmemInitStruct("XLOG Ctl", sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers +
+						sizeof(XLogRecPtr) * XLOGbuffers, &found);
+	Assert(!found);
+
+	/* layout: XLogCtlData, then the xlblocks array, then the pages */
+	XLogCtl->xlblocks = (XLogRecPtr*) (((char *)XLogCtl) + sizeof(XLogCtlData));
+	XLogCtl->pages = ((char *)XLogCtl->xlblocks + sizeof(XLogRecPtr) * XLOGbuffers);
+	XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
+	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
+	memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
+	XLogCtl->LgwrRqst = LgwrRqst;
+	XLogCtl->LgwrResult = LgwrResult;
+	XLogCtl->Insert.LgwrResult = LgwrResult;
+	XLogCtl->Insert.curridx = 0;
+	XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
+	XLogCtl->Write.LgwrResult = LgwrResult;
+	XLogCtl->Write.curridx = 0;
+
+	/*
+	 * Open/read Control file
+	 */
+tryAgain:
+	fd = open(ControlFilePath, O_RDWR);
+	if (fd < 0 && (errno == EMFILE || errno == ENFILE))
+	{
+		fd = errno;
+		if (!ReleaseDataFile())
+			elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)",
+				 fd);
+		goto tryAgain;
+	}
+	if (fd < 0)
+		elog(STOP, "Open(cntlfile) failed: %d", errno);
+
+	if (read(fd, ControlFile, BLCKSZ) != BLCKSZ)
+		elog(STOP, "Read(cntlfile) failed: %d", errno);
+
+	close(fd);
+
+	/*
+	 * checkPoint.xlogid is deliberately not tested: it is 0 for every
+	 * checkpoint in the first log file, including the bootstrap one.
+	 */
+	if (ControlFile->logSeg == 0 ||
+		ControlFile->time <= 0 ||
+		ControlFile->state < DB_SHUTDOWNED ||
+		ControlFile->state > DB_IN_PRODUCTION ||
+		ControlFile->checkPoint.xrecoff == 0)
+		elog(STOP, "Control file context is broken");
+
+	if (ControlFile->state == DB_SHUTDOWNED)
+		elog(LOG, "Data Base System was properly shutdowned at %s",
+			 ctime(&(ControlFile->time)));
+	else if (ControlFile->state == DB_SHUTDOWNING)
+		elog(LOG, "Data Base System was interrupted while shutting down at %s",
+			 ctime(&(ControlFile->time)));
+	else if (ControlFile->state == DB_IN_RECOVERY)
+	{
+		elog(LOG, "Data Base System was interrupted being in recovery at %s\n"
+			 "This propably means that some data blocks are corrupted\n"
+			 "And you will have to use last backup for recovery",
+			 ctime(&(ControlFile->time)));
+	}
+	else if (ControlFile->state == DB_IN_PRODUCTION)
+		elog(LOG, "Data Base System was interrupted being in production at %s",
+			 ctime(&(ControlFile->time)));
+
+	/*
+	 * Read the checkpoint record
+	 */
+	LastRec = RecPtr = ControlFile->checkPoint;
+	if (!XRecOffIsValid(RecPtr.xrecoff))
+		elog(STOP, "Invalid checkPoint in control file");
+	elog(LOG, "CheckPoint record at (%u, %u)", RecPtr.xlogid, RecPtr.xrecoff);
+
+	record = ReadRecord(&RecPtr, buffer);
+	if (record->xl_rmid != RM_XLOG_ID)
+		elog(STOP, "Invalid RMID in checkPoint record");
+	if (record->xl_len != sizeof(checkPoint))
+		elog(STOP, "Invalid length of checkPoint record");
+	checkPoint = *((CheckPoint*)((char*)record + SizeOfXLogRecord));
+
+	elog(LOG, "Redo record at (%u, %u); Undo record at (%u, %u)",
+		 checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
+		 checkPoint.undo.xlogid, checkPoint.undo.xrecoff);
+	elog(LOG, "NextTransactionId: %u; NextOid: %u)",
+		 checkPoint.nextXid, checkPoint.nextOid);
+	if (checkPoint.nextXid < FirstTransactionId ||
+		checkPoint.nextOid < BootstrapObjectIdData)
+		elog(LOG, "Invalid NextTransactionId/NextOid");
+
+	ShmemVariableCache->nextXid = checkPoint.nextXid;
+	ShmemVariableCache->nextOid = checkPoint.nextOid;
+
+	/* redo/undo pointers may not lie behind the checkpoint itself */
+	if (XLByteLT(RecPtr, checkPoint.redo))
+		elog(STOP, "Invalid redo in checkPoint record");
+	if (checkPoint.undo.xrecoff == 0)
+		checkPoint.undo = RecPtr;
+	if (XLByteLT(RecPtr, checkPoint.undo))
+		elog(STOP, "Invalid undo in checkPoint record");
+
+	if (XLByteLT(checkPoint.undo, RecPtr) || XLByteLT(checkPoint.redo, RecPtr))
+	{
+		if (ControlFile->state == DB_SHUTDOWNED)
+			elog(STOP, "Invalid Redo/Undo record in Shutdowned state");
+		recovery = true;
+	}
+	else if (ControlFile->state != DB_SHUTDOWNED)
+		recovery = true;
+
+	if (recovery)
+	{
+		elog(LOG, "The DataBase system was not properly shutdowned\n"
+			 "Automatic recovery is in progress...");
+		ControlFile->state = DB_IN_RECOVERY;
+		ControlFile->time = time(NULL);
+		UpdateControlFile();
+
+		sie_saved = StopIfError;
+		StopIfError = true;
+
+		/* Is REDO required ? */
+		if (XLByteLT(checkPoint.redo, RecPtr))
+			record = ReadRecord(&(checkPoint.redo), buffer);
+		else	/* read past CheckPoint record */
+			record = ReadRecord(NULL, buffer);
+
+		/* REDO */
+		if (record->xl_len != 0)
+		{
+			elog(LOG, "Redo starts at (%u, %u)",
+				 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
+			do
+			{
+				/* keep the xid counter ahead of every xid seen in the log */
+				if (record->xl_xid >= ShmemVariableCache->nextXid)
+					ShmemVariableCache->nextXid = record->xl_xid + 1;
+				RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
+				record = ReadRecord(NULL, buffer);
+			} while (record->xl_len != 0);
+			elog(LOG, "Redo done at (%u, %u)",
+				 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
+			LastRec = ReadRecPtr;
+		}
+		else
+			elog(LOG, "Redo is not required");
+
+		/* UNDO */
+		RecPtr = ReadRecPtr;
+		if (XLByteLT(checkPoint.undo, RecPtr))
+		{
+			elog(LOG, "Undo starts at (%u, %u)",
+				 RecPtr.xlogid, RecPtr.xrecoff);
+			do
+			{
+				record = ReadRecord(&RecPtr, buffer);
+				if (TransactionIdIsValid(record->xl_xid) &&
+					!TransactionIdDidCommit(record->xl_xid))
+					RmgrTable[record->xl_rmid].rm_undo(record);
+				RecPtr = record->xl_prev;
+			} while (XLByteLE(checkPoint.undo, RecPtr));
+			elog(LOG, "Undo done at (%u, %u)",
+				 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
+		}
+		else
+			elog(LOG, "Undo is not required");
+	}
+
+	/* Init xlog buffer cache */
+	record = ReadRecord(&LastRec, buffer);
+	logId = EndRecPtr.xlogid;
+	logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
+	logOff = 0;
+	logFile = XLogFileOpen(logId, logSeg, false);
+	/* xlblocks[0] is the end of the page that contains EndRecPtr */
+	XLogCtl->xlblocks[0].xlogid = logId;
+	XLogCtl->xlblocks[0].xrecoff =
+		((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
+	Insert->currpos = ((char*) Insert->currpage) +
+		(EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
+
+	if (recovery)
+	{
+		int			i;
+
+		/*
+		 * Let resource managers know that recovery is done: each one,
+		 * not just the manager of the last record read.
+		 */
+		for (i = 0; i <= RM_MAX_ID; i++)
+			RmgrTable[i].rm_redo(ReadRecPtr, NULL);
+		CreateCheckPoint(true);
+		StopIfError = sie_saved;
+	}
+
+	ControlFile->state = DB_IN_PRODUCTION;
+	ControlFile->time = time(NULL);
+	UpdateControlFile();
+
+	return;
+}
+
+/*
+ * ReadRecord -- fetch one XLOG record.
+ *
+ * RecPtr: location of the record to read, or NULL to read the record that
+ *		follows the previously returned one (found through nextRecord when
+ *		it is still in readBuf, otherwise derived from EndRecPtr).
+ * buffer: caller-supplied work area.  A record that continues on following
+ *		pages is reassembled here, and the all-zero header returned for an
+ *		invalid record is built here too.
+ *		NOTE(review): judging by the length check in the subrecord loop the
+ *		buffer needs room for SizeOfXLogRecord + MAXLOGRECSZ bytes; confirm
+ *		against the callers.
+ *
+ * Returns a pointer to the record: into readBuf when the record fits on one
+ * page, or 'buffer' when it had to be reassembled.  On success ReadRecPtr
+ * and EndRecPtr are set to the start and the end of the record.
+ *
+ * Validation failures are reported with elog(emode), where emode is STOP
+ * when an explicit RecPtr was given and LOG in next-record mode.  When
+ * control reaches next_record_is_invalid the log is treated as ending at
+ * EndRecPtr: the remainder of the segment is zeroed and fsync'ed, the
+ * control file is updated if needed, the following segment file is unlinked
+ * and a zeroed record header (in 'buffer') is returned.
+ */
+static XLogRecord*
+ReadRecord(XLogRecPtr *RecPtr, char *buffer)
+{
+	XLogRecord *record;
+	XLogRecPtr	tmpRecPtr = EndRecPtr;
+	bool		nextmode = (RecPtr == NULL);
+	int			emode = (nextmode) ? LOG : STOP;
+
+	if (nextmode)
+	{
+		RecPtr = &tmpRecPtr;
+		/* the previous call already located the next record in readBuf */
+		if (nextRecord != NULL)
+		{
+			record = nextRecord;
+			goto got_record;
+		}
+		/* otherwise look just past the header of the next page */
+		if (tmpRecPtr.xrecoff % BLCKSZ != 0)
+			tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ);
+		if (tmpRecPtr.xrecoff >= XLogFileSize)
+		{
+			(tmpRecPtr.xlogid)++;
+			tmpRecPtr.xrecoff = 0;
+		}
+		tmpRecPtr.xrecoff += SizeOfXLogPHD;
+	}
+	else if (!XRecOffIsValid(RecPtr->xrecoff))
+		elog(STOP, "ReadRecord: invalid record offset in (%u, %u)",
+			 RecPtr->xlogid, RecPtr->xrecoff);
+
+	/* switch to another segment file if the cached one is not the right one */
+	if (readFile >= 0 && (RecPtr->xlogid != readId ||
+		RecPtr->xrecoff / XLogSegSize != readSeg))
+	{
+		close(readFile);
+		readFile = -1;
+	}
+	readId = RecPtr->xlogid;
+	readSeg = RecPtr->xrecoff / XLogSegSize;
+	if (readFile < 0)
+	{
+		readOff = (off_t) -1;
+		readFile = XLogFileOpen(readId, readSeg, nextmode);
+		if (readFile < 0)
+			goto next_record_is_invalid;
+	}
+
+	/* (re)load the page into readBuf unless it is the one already there */
+	if (readOff < 0 || readOff != (RecPtr->xrecoff % XLogSegSize) / BLCKSZ)
+	{
+		readOff = (RecPtr->xrecoff % XLogSegSize) / BLCKSZ;
+		if (lseek(readFile, readOff * BLCKSZ, SEEK_SET) < 0)
+			elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d",
+				 readId, readSeg, readOff, errno);
+		if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
+			elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d",
+				 readId, readSeg, readOff, errno);
+		if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC)
+		{
+			elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u",
+				 ((XLogPageHeader)readBuf)->xlp_magic,
+				 readId, readSeg, readOff);
+			goto next_record_is_invalid;
+		}
+	}
+	/* a page that starts with a subrecord has no record right after its header */
+	if ((((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD) &&
+		RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD)
+	{
+		elog(emode, "ReadRecord: subrecord is requested by (%u, %u)",
+			 RecPtr->xlogid, RecPtr->xrecoff);
+		goto next_record_is_invalid;
+	}
+	record = (XLogRecord*)((char*) readBuf + RecPtr->xrecoff % BLCKSZ);
+
+got_record:;
+	/* the part of the record stored on this page must fit on this page */
+	if (record->xl_len == 0 || record->xl_len >
+		(BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord))
+	{
+		elog(emode, "ReadRecord: invalid record len %u in (%u, %u)",
+			 record->xl_len, RecPtr->xlogid, RecPtr->xrecoff);
+		goto next_record_is_invalid;
+	}
+	if (record->xl_rmid > RM_MAX_ID)
+	{
+		elog(emode, "ReadRecord: invalid resource managed id %u in (%u, %u)",
+			 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff);
+		goto next_record_is_invalid;
+	}
+	nextRecord = NULL;
+	if (record->xl_info & XLR_TO_BE_CONTINUED)
+	{
+		XLogSubRecord *subrecord;
+		uint32		len = record->xl_len;
+
+		/* a continued record must fill its page exactly to the end */
+		if (record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord != BLCKSZ)
+		{
+			elog(emode, "ReadRecord: invalid fragmented record len %u in (%u, %u)",
+				 record->xl_len, RecPtr->xlogid, RecPtr->xrecoff);
+			goto next_record_is_invalid;
+		}
+		/* from here on the record lives in the caller's buffer */
+		memcpy(buffer, record, record->xl_len + SizeOfXLogRecord);
+		record = (XLogRecord*) buffer;
+		buffer += record->xl_len + SizeOfXLogRecord;
+		for ( ; ; )
+		{
+			/* advance to the next page, crossing into the next segment if needed */
+			readOff++;
+			if (readOff == XLogSegSize / BLCKSZ)
+			{
+				readSeg++;
+				if (readSeg == XLogLastSeg)
+				{
+					readSeg = 0;
+					readId++;
+				}
+				close(readFile);
+				readOff = (off_t) 0;
+				readFile = XLogFileOpen(readId, readSeg, nextmode);
+				if (readFile < 0)
+					goto next_record_is_invalid;
+			}
+			/* sequential read: the file position is already at this page */
+			if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
+				elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d",
+					 readId, readSeg, readOff, errno);
+			if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC)
+			{
+				elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u",
+					 ((XLogPageHeader)readBuf)->xlp_magic,
+					 readId, readSeg, readOff);
+				goto next_record_is_invalid;
+			}
+			if (!(((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD))
+			{
+				elog(emode, "ReadRecord: there is no subrecord flag in logfile %u seg %u off %u",
+					 readId, readSeg, readOff);
+				goto next_record_is_invalid;
+			}
+			subrecord = (XLogSubRecord*)((char*) readBuf + SizeOfXLogPHD);
+			if (subrecord->xl_len == 0 || subrecord->xl_len >
+				(BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord))
+			{
+				elog(emode, "ReadRecord: invalid subrecord len %u in logfile %u seg %u off %u",
+					 subrecord->xl_len, readId, readSeg, readOff);
+				goto next_record_is_invalid;
+			}
+			/* check the running total before copying, to bound writes to buffer */
+			len += subrecord->xl_len;
+			if (len > MAXLOGRECSZ)
+			{
+				elog(emode, "ReadRecord: too long record len %u in (%u, %u)",
+					 len, RecPtr->xlogid, RecPtr->xrecoff);
+				goto next_record_is_invalid;
+			}
+			memcpy(buffer, (char*)subrecord + SizeOfXLogSubRecord, subrecord->xl_len);
+			buffer += subrecord->xl_len;
+			if (subrecord->xl_info & XLR_TO_BE_CONTINUED)
+			{
+				/* a continued subrecord must occupy the whole page */
+				if (subrecord->xl_len +
+					SizeOfXLogPHD + SizeOfXLogSubRecord != BLCKSZ)
+				{
+					elog(emode, "ReadRecord: invalid fragmented subrecord len %u in logfile %u seg %u off %u",
+						 subrecord->xl_len, readId, readSeg, readOff);
+					goto next_record_is_invalid;
+				}
+				continue;
+			}
+			break;
+		}
+		/* remember the following record if a header still fits on this page */
+		if (BLCKSZ - SizeOfXLogRecord >=
+			subrecord->xl_len + SizeOfXLogPHD + SizeOfXLogSubRecord)
+		{
+			nextRecord = (XLogRecord*)
+				((char*)subrecord + subrecord->xl_len + SizeOfXLogSubRecord);
+		}
+		EndRecPtr.xlogid = readId;
+		EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ +
+			SizeOfXLogPHD + SizeOfXLogSubRecord + subrecord->xl_len;
+		ReadRecPtr = *RecPtr;
+		return(record);
+	}
+	/* remember the following record if a header still fits on this page */
+	if (BLCKSZ - SizeOfXLogRecord >=
+		record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord)
+	{
+		nextRecord = (XLogRecord*)((char*)record + record->xl_len + SizeOfXLogRecord);
+	}
+	EndRecPtr.xlogid = RecPtr->xlogid;
+	EndRecPtr.xrecoff = RecPtr->xrecoff + record->xl_len + SizeOfXLogRecord;
+	ReadRecPtr = *RecPtr;
+
+	return(record);
+
+next_record_is_invalid:;
+	/* readFile is still -1 when we got here because XLogFileOpen failed */
+	if (readFile >= 0)
+		close(readFile);
+	readFile = -1;
+	nextRecord = NULL;
+	memset(buffer, 0, SizeOfXLogRecord);
+	record = (XLogRecord*) buffer;
+	/*
+	 * If we assumed that next record began on the same page where
+	 * previous one ended - zero end of page.
+	 */
+	if (XLByteEQ(tmpRecPtr, EndRecPtr))
+	{
+		Assert (EndRecPtr.xrecoff % BLCKSZ > (SizeOfXLogPHD + SizeOfXLogSubRecord) &&
+				BLCKSZ - EndRecPtr.xrecoff % BLCKSZ >= SizeOfXLogRecord);
+		readId = EndRecPtr.xlogid;
+		readSeg = EndRecPtr.xrecoff / XLogSegSize;
+		readOff = (EndRecPtr.xrecoff % XLogSegSize) / BLCKSZ;
+		elog(LOG, "Formating logfile %u seg %u block %u at offset %u",
+			 readId, readSeg, readOff, EndRecPtr.xrecoff % BLCKSZ);
+		readFile = XLogFileOpen(readId, readSeg, false);
+		if (lseek(readFile, readOff * BLCKSZ, SEEK_SET) < 0)
+			elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d",
+				 readId, readSeg, readOff, errno);
+		if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
+			elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d",
+				 readId, readSeg, readOff, errno);
+		memset(readBuf + EndRecPtr.xrecoff % BLCKSZ, 0,
+			   BLCKSZ - EndRecPtr.xrecoff % BLCKSZ);
+		if (lseek(readFile, readOff * BLCKSZ, SEEK_SET) < 0)
+			elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d",
+				 readId, readSeg, readOff, errno);
+		if (write(readFile, readBuf, BLCKSZ) != BLCKSZ)
+			elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d",
+				 readId, readSeg, readOff, errno);
+		/* the rest of the segment is zeroed starting with the next block */
+		readOff++;
+	}
+	else
+	{
+		Assert (EndRecPtr.xrecoff % BLCKSZ == 0 ||
+				BLCKSZ - EndRecPtr.xrecoff % BLCKSZ < SizeOfXLogRecord);
+		readId = tmpRecPtr.xlogid;
+		readSeg = tmpRecPtr.xrecoff / XLogSegSize;
+		readOff = (tmpRecPtr.xrecoff % XLogSegSize) / BLCKSZ;
+	}
+	if (readOff > 0)
+	{
+		elog(LOG, "Formating logfile %u seg %u block %u at offset 0",
+			 readId, readSeg, readOff);
+		/* from here on readOff is a byte offset, not a block number */
+		readOff *= BLCKSZ;
+		memset(readBuf, 0, BLCKSZ);
+		/*
+		 * The segment is already open when the tail of a block was
+		 * formatted above; reuse that descriptor instead of opening the
+		 * file a second time and leaking the first descriptor.
+		 */
+		if (readFile < 0)
+			readFile = XLogFileOpen(readId, readSeg, false);
+		if (lseek(readFile, readOff, SEEK_SET) < 0)
+			elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d",
+				 readId, readSeg, readOff, errno);
+		while (readOff < XLogSegSize)
+		{
+			if (write(readFile, readBuf, BLCKSZ) != BLCKSZ)
+				elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d",
+					 readId, readSeg, readOff, errno);
+			readOff += BLCKSZ;
+		}
+	}
+	if (readFile >= 0)
+	{
+		if (fsync(readFile) < 0)
+			elog(STOP, "ReadRecord: fsync(logfile %u seg %u) failed: %d",
+				 readId, readSeg, errno);
+		close(readFile);
+		readFile = -1;
+	}
+
+	/* readSeg becomes the segment following the one that holds EndRecPtr */
+	readId = EndRecPtr.xlogid;
+	readSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize + 1;
+	elog(LOG, "The last logId/logSeg is (%u, %u)", readId, readSeg - 1);
+	if (ControlFile->logId != readId || ControlFile->logSeg != readSeg)
+	{
+		elog(LOG, "Set logId/logSeg in control file");
+		ControlFile->logId = readId;
+		ControlFile->logSeg = readSeg;
+		ControlFile->time = time(NULL);
+		UpdateControlFile();
+	}
+	if (readSeg == XLogLastSeg)
+	{
+		readSeg = 0;
+		readId++;
+	}
+	/* remove the segment file that follows the end of the log; result ignored */
+	{
+		char		path[MAXPGPATH+1];
+
+		XLogFileName(path, readId, readSeg);
+		unlink(path);
+	}
+
+	return(record);
+}
+
+/*
+ * CreateCheckPoint -- write a checkpoint record to the log and remember its
+ * location in the control file.
+ *
+ * shutdown: true when called while the database is shutting down.  In that
+ *		case the control file state is set to DB_SHUTDOWNING first and to
+ *		DB_SHUTDOWNED at the end, and a busy insert lock, a nonzero undo
+ *		pointer or log activity between taking the redo pointer and
+ *		inserting the checkpoint record are all reported with elog(STOP).
+ *
+ * Steps: take the insert lock and compute the redo pointer from the current
+ * insert position; copy nextXid/nextOid from ShmemVariableCache; call
+ * FlushBufferPool; insert the CheckPoint record with XLogInsert and flush
+ * the log up to it; finally store MyLastRecPtr as ControlFile->checkPoint
+ * under ControlFileLockId.
+ */
+void
+CreateCheckPoint(bool shutdown)
+{
+	CheckPoint	checkPoint;
+	XLogRecPtr	recptr;
+	XLogCtlInsert *Insert = &XLogCtl->Insert;
+	uint32		freespace;
+	uint16		curridx;
+
+	memset(&checkPoint, 0, sizeof(checkPoint));
+	if (shutdown)
+	{
+		ControlFile->state = DB_SHUTDOWNING;
+		ControlFile->time = time(NULL);
+		UpdateControlFile();
+	}
+
+	/* Get REDO record ptr */
+
+	/*
+	 * TAS() yields nonzero when the lock is already held by someone else,
+	 * so loop while it is busy.  (The test used to be negated, which ran
+	 * the "lock is busy" path exactly when the lock had been acquired.)
+	 */
+	while (TAS(&(XLogCtl->insert_lck)))
+	{
+		struct timeval delay;
+
+		if (shutdown)
+			elog(STOP, "XLog insert lock is busy while data base is shutting down");
+		/* NOTE(review): a zero timeout makes this select() return at once */
+		delay.tv_sec = 0;
+		delay.tv_usec = 0;
+		(void) select(0, NULL, NULL, NULL, &delay);
+	}
+	freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
+	if (freespace < SizeOfXLogRecord)
+	{
+		/* no room for a record header on the current page: use the next buffer */
+		curridx = NextBufIdx(Insert->curridx);
+		if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
+			InitXLBuffer(curridx);
+		else
+			GetFreeXLBuffer();
+		freespace = BLCKSZ - SizeOfXLogPHD;
+	}
+	else
+		curridx = Insert->curridx;
+	/* xlblocks[curridx] minus BLCKSZ is taken as the page start; add the insert offset */
+	checkPoint.redo.xlogid = XLogCtl->xlblocks[curridx].xlogid;
+	checkPoint.redo.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ +
+		Insert->currpos - ((char*) Insert->currpage);
+	S_UNLOCK(&(XLogCtl->insert_lck));
+
+	SpinAcquire(XidGenLockId);
+	checkPoint.nextXid = ShmemVariableCache->nextXid;
+	SpinRelease(XidGenLockId);
+	SpinAcquire(OidGenLockId);
+	checkPoint.nextOid = ShmemVariableCache->nextOid;
+	SpinRelease(OidGenLockId);
+
+	FlushBufferPool(false);
+
+	/* Get UNDO record ptr */
+	/* (nothing computes it yet: checkPoint.undo keeps the zero from memset) */
+
+	if (shutdown && checkPoint.undo.xrecoff != 0)
+		elog(STOP, "Active transaction while data base is shutting down");
+
+	recptr = XLogInsert(RM_XLOG_ID, (char*)&checkPoint, sizeof(checkPoint), NULL, 0);
+
+	/* at shutdown the checkpoint record must sit exactly at the redo pointer */
+	if (shutdown && !XLByteEQ(checkPoint.redo, MyLastRecPtr))
+		elog(STOP, "XLog concurrent activity while data base is shutting down");
+
+	XLogFlush(recptr);
+
+	SpinAcquire(ControlFileLockId);
+	if (shutdown)
+		ControlFile->state = DB_SHUTDOWNED;
+	ControlFile->checkPoint = MyLastRecPtr;
+	ControlFile->time = time(NULL);
+	UpdateControlFile();
+	SpinRelease(ControlFileLockId);
+
+	return;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a44fe726bfd..32f5aa253cc 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.117 1999/09/27 03:13:05 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.118 1999/09/27 15:47:43 vadim Exp $
*
* NOTES
*
@@ -204,9 +204,10 @@ static int SendStop = false;
static bool NetServer = false; /* if not zero, postmaster listen for
* non-local connections */
+#ifdef USE_SSL
static bool SecureNetServer = false; /* if not zero, postmaster listens for only SSL
* non-local connections */
-
+#endif
/*
* GH: For !HAVE_SIGPROCMASK (NEXTSTEP), TRH implemented an
@@ -990,12 +991,17 @@ readStartupPacket(void *arg, PacketLen len, void *pkt)
/* Could add additional special packet types here */
- /* Any SSL negotiation must have taken place here, so drop the connection
- * ASAP if we require SSL */
- if (SecureNetServer && !port->ssl) {
- PacketSendError(&port->pktInfo, "Backend requires secure connection.");
- return STATUS_OK;
+#ifdef USE_SSL
+ /*
+ * Any SSL negotiation must have taken place here, so drop the connection
+ * ASAP if we require SSL
+ */
+ if (SecureNetServer && !port->ssl)
+ {
+ PacketSendError(&port->pktInfo, "Backend requires secure connection.");
+ return STATUS_OK;
}
+#endif
/* Check we can handle the protocol the frontend is using. */
@@ -1832,6 +1838,7 @@ CountChildren(void)
}
+#ifdef USE_SSL
/*
* Initialize SSL library and structures
*/
@@ -1860,3 +1867,4 @@ static void InitSSL(void) {
exit(1);
}
}
+#endif
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index c279528974c..4cdb638819e 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -6,7 +6,7 @@
* Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Id: fd.c,v 1.47 1999/07/17 20:17:42 momjian Exp $
+ * $Id: fd.c,v 1.48 1999/09/27 15:47:49 vadim Exp $
*
* NOTES:
*
@@ -49,6 +49,7 @@
#include "miscadmin.h"
#include "storage/fd.h"
+bool ReleaseDataFile(void);
/*
* Problem: Postgres does a system(ld...) to do dynamic loading.
* This will open several extra files in addition to those used by
@@ -410,6 +411,19 @@ ReleaseLruFile()
LruDelete(VfdCache[0].lruMoreRecently);
}
+/*
+ * ReleaseDataFile
+ *		If any file is counted as open (nfile > 0), pass the entry found at
+ *		VfdCache[0].lruMoreRecently to LruDelete and report true.
+ *		Returns false, without touching the cache, when nfile <= 0.
+ */
+bool
+ReleaseDataFile()
+{
+	bool		released = false;
+
+	DO_DB(elog(DEBUG, "ReleaseDataFile. Opened %d", nfile));
+
+	if (nfile > 0)
+	{
+		Assert(VfdCache[0].lruMoreRecently != 0);
+		LruDelete(VfdCache[0].lruMoreRecently);
+		released = true;
+	}
+
+	return(released);
+}
+}
+
static File
AllocateVfd()
{
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 26735a5d806..9a5b7935db8 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.66 1999/09/23 17:02:52 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.67 1999/09/27 15:47:54 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -220,13 +220,13 @@ ProcessUtility(Node *parsetree,
relname);
}
- rel = heap_openr(relname);
+ rel = heap_openr(relname, AccessExclusiveLock);
if (RelationIsValid(rel)) {
if (rel->rd_rel->relkind == RELKIND_SEQUENCE) {
elog(ERROR, "TRUNCATE cannot be used on sequences. '%s' is a sequence",
relname);
}
- heap_close(rel);
+ heap_close(rel, NoLock);
}
#ifndef NO_SECURITY
if (!pg_ownercheck(userName, relname, RELNAME)) {
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
new file mode 100644
index 00000000000..d8bde166c2c
--- /dev/null
+++ b/src/include/access/rmgr.h
@@ -0,0 +1,34 @@
+/*
+ *
+ * rmgr.h
+ *
+ * Resource managers description table
+ *
+ */
+#ifndef RMGR_H
+#define RMGR_H
+
+typedef uint8 RmgrId;
+
+typedef struct RmgrData /* one resource manager: its name and its REDO/UNDO callbacks */
+{
+ char *rm_name; /* name of the resource manager */
+ char *(*rm_redo) (); /* REDO(XLogRecPtr rptr) */
+ char *(*rm_undo) (); /* UNDO(XLogRecPtr rptr) */
+} RmgrData;
+
+extern RmgrData *RmgrTable;
+
+/*
+ * Built-in resource managers
+ */
+#define RM_XLOG_ID 0 /* passed to XLogInsert by CreateCheckPoint for checkpoint records */
+#define RM_XACT_ID 1
+#define RM_HEAP_ID 2
+#define RM_BTREE_ID 3
+#define RM_HASH_ID 4
+#define RM_RTREE_ID 5
+#define RM_GIST_ID 6
+#define RM_MAX_ID RM_GIST_ID /* highest valid id; ReadRecord rejects records whose xl_rmid exceeds it */
+
+#endif /* RMGR_H */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
new file mode 100644
index 00000000000..5c8c0754246
--- /dev/null
+++ b/src/include/access/xlog.h
@@ -0,0 +1,70 @@
+/*
+ *
+ * xlog.h
+ *
+ * Postgres transaction log manager
+ *
+ */
+#ifndef XLOG_H
+#define XLOG_H
+
+#include "access/rmgr.h"
+#include "access/transam.h"
+
+typedef struct XLogRecPtr /* location in the transaction log: log file number plus offset in that file */
+{
+ uint32 xlogid; /* log file #, 0 based */
+ uint32 xrecoff; /* offset of record in log file */
+} XLogRecPtr;
+
+typedef struct XLogRecord /* header stored in front of every log record */
+{
+ XLogRecPtr xl_prev; /* ptr to previous record in log */
+ XLogRecPtr xl_xact_prev; /* ptr to previous record of this xact */
+ TransactionId xl_xid; /* xact id */
+ uint16 xl_len; /* len of record on this page */
+ uint8 xl_info; /* flag bits; ReadRecord tests XLR_TO_BE_CONTINUED here */
+ RmgrId xl_rmid; /* resource manager inserted this record */
+
+ /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
+
+} XLogRecord;
+
+#define SizeOfXLogRecord DOUBLEALIGN(sizeof(XLogRecord)) /* record header size after DOUBLEALIGN; data starts this far past the header start */
+#define MAXLOGRECSZ (2 * BLCKSZ) /* ReadRecord rejects a continued record whose summed xl_len exceeds this */
+/*
+ * When there is no space on current page we continue on the next
+ * page with subrecord.
+ */
+typedef struct XLogSubRecord /* header of the continuation of a record on a following page */
+{
+ uint16 xl_len; /* number of data bytes following this header on this page */
+ uint8 xl_info; /* flag bits; XLR_TO_BE_CONTINUED means the record goes on to yet another page */
+
+ /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
+
+} XLogSubRecord;
+
+#define SizeOfXLogSubRecord DOUBLEALIGN(sizeof(XLogSubRecord))
+
+#define XLR_TO_BE_CONTINUED 0x01
+
+#define XLOG_PAGE_MAGIC 0x17345168
+
+typedef struct XLogPageHeaderData /* header found at the start of every log page (block) */
+{
+ uint32 xlp_magic; /* ReadRecord requires this to equal XLOG_PAGE_MAGIC */
+ uint16 xlp_info; /* flag bits, see XLP_FIRST_IS_SUBRECORD */
+} XLogPageHeaderData;
+
+#define SizeOfXLogPHD DOUBLEALIGN(sizeof(XLogPageHeaderData))
+
+typedef XLogPageHeaderData *XLogPageHeader;
+
+#define XLP_FIRST_IS_SUBRECORD 0x0001
+
+extern XLogRecPtr XLogInsert(RmgrId rmid, char *hdr, uint32 hdrlen,
+ char *buf, uint32 buflen);
+extern void XLogFlush(XLogRecPtr RecPtr);
+
+#endif /* XLOG_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index d28e936b33f..44a5fbd313b 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -6,13 +6,14 @@
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: proc.h,v 1.26 1999/09/24 00:25:27 tgl Exp $
+ * $Id: proc.h,v 1.27 1999/09/27 15:48:06 vadim Exp $
*
*-------------------------------------------------------------------------
*/
#ifndef _PROC_H_
#define _PROC_H_
+#include "access/xlog.h"
#include "storage/lock.h"
typedef struct
@@ -47,7 +48,7 @@ typedef struct proc
TransactionId xmin; /* minimal running XID as it was when we
* were starting our xact: vacuum must not
* remove tuples deleted by xid >= xmin ! */
-
+ XLogRecPtr logRec;
LOCK *waitLock; /* Lock we're sleeping on ... */
int token; /* type of lock we sleeping for */
int holdLock; /* while holding these locks */
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 5f64a6b1c77..e69ef11d7a3 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -6,7 +6,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: elog.h,v 1.12 1999/09/11 19:06:25 tgl Exp $
+ * $Id: elog.h,v 1.13 1999/09/27 15:48:12 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -17,7 +17,9 @@
#define ERROR (-1) /* user error - return to known state */
#define FATAL 1 /* fatal error - abort process */
#define REALLYFATAL 2 /* take down the other backends with me */
+#define STOP REALLYFATAL
#define DEBUG (-2) /* debug message */
+#define LOG DEBUG
#define NOIND (-3) /* debug message, don't indent as far */
extern void elog(int lev, const char *fmt, ...);