aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/slru.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/slru.c')
-rw-r--r--src/backend/access/transam/slru.c278
1 files changed, 193 insertions, 85 deletions
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c
index 57dcd2b3379..58798d0f07f 100644
--- a/src/backend/access/transam/slru.c
+++ b/src/backend/access/transam/slru.c
@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.15 2004/05/29 22:48:18 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.16 2004/05/31 03:47:54 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -17,6 +17,7 @@
#include <unistd.h>
#include "access/slru.h"
+#include "access/clog.h" /* only for NUM_CLOG_BUFFERS */
#include "postmaster/bgwriter.h"
#include "storage/fd.h"
#include "storage/lwlock.h"
@@ -100,6 +101,8 @@ typedef enum
*/
typedef struct SlruSharedData
{
+ LWLockId ControlLock;
+
/*
* Info for each buffer slot. Page number is undefined when status is
* EMPTY. lru_count is essentially the number of page switches since
@@ -110,6 +113,7 @@ typedef struct SlruSharedData
SlruPageStatus page_status[NUM_CLOG_BUFFERS];
int page_number[NUM_CLOG_BUFFERS];
unsigned int page_lru_count[NUM_CLOG_BUFFERS];
+ LWLockId BufferLocks[NUM_CLOG_BUFFERS]; /* Per-buffer I/O locks */
/*
* latest_page_number is the page number of the current end of the
@@ -118,13 +122,25 @@ typedef struct SlruSharedData
*/
int latest_page_number;
} SlruSharedData;
-typedef SlruSharedData *SlruShared;
-
#define SlruFileName(ctl, path, seg) \
snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
/*
+ * During SimpleLruFlush(), we will usually not need to write/fsync more
+ * than one or two physical files, but we may need to write several pages
+ * per file. We can consolidate the I/O requests by leaving files open
+ * until control returns to SimpleLruFlush(). This data structure remembers
+ * which files are open.
+ */
+typedef struct SlruFlushData
+{
+ int num_files; /* # files actually open */
+ int fd[NUM_CLOG_BUFFERS]; /* their FD's */
+ int segno[NUM_CLOG_BUFFERS]; /* their clog seg#s */
+} SlruFlushData;
+
+/*
* Macro to mark a buffer slot "most recently used".
*/
#define SlruRecentlyUsed(shared, slotno) \
@@ -145,14 +161,17 @@ typedef enum
SLRU_SEEK_FAILED,
SLRU_READ_FAILED,
SLRU_WRITE_FAILED,
+ SLRU_FSYNC_FAILED,
SLRU_CLOSE_FAILED
} SlruErrorCause;
+
static SlruErrorCause slru_errcause;
static int slru_errno;
static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno);
-static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno);
+static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
+ SlruFlush fdata);
static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid);
static int SlruSelectLRUPage(SlruCtl ctl, int pageno);
static bool SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions);
@@ -165,24 +184,16 @@ static bool SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions);
int
SimpleLruShmemSize(void)
{
- return MAXALIGN(sizeof(SlruSharedData))
- + BLCKSZ * NUM_CLOG_BUFFERS
- + MAXALIGN(sizeof(SlruLockData))
- ;
+ return MAXALIGN(sizeof(SlruSharedData)) + BLCKSZ * NUM_CLOG_BUFFERS;
}
void
SimpleLruInit(SlruCtl ctl, const char *name, const char *subdir)
{
- bool found;
- char *ptr;
SlruShared shared;
- SlruLock locks;
+ bool found;
- ptr = ShmemInitStruct(name, SimpleLruShmemSize(), &found);
- shared = (SlruShared) ptr;
- locks = (SlruLock) (ptr + MAXALIGN(sizeof(SlruSharedData)) +
- BLCKSZ * NUM_CLOG_BUFFERS);
+ shared = (SlruShared) ShmemInitStruct(name, SimpleLruShmemSize(), &found);
if (!IsUnderPostmaster)
{
@@ -192,18 +203,18 @@ SimpleLruInit(SlruCtl ctl, const char *name, const char *subdir)
Assert(!found);
- locks->ControlLock = LWLockAssign();
-
memset(shared, 0, sizeof(SlruSharedData));
+ shared->ControlLock = LWLockAssign();
+
bufptr = (char *) shared + MAXALIGN(sizeof(SlruSharedData));
for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
{
- locks->BufferLocks[slotno] = LWLockAssign();
shared->page_buffer[slotno] = bufptr;
shared->page_status[slotno] = SLRU_PAGE_EMPTY;
shared->page_lru_count[slotno] = 1;
+ shared->BufferLocks[slotno] = LWLockAssign();
bufptr += BLCKSZ;
}
@@ -213,10 +224,10 @@ SimpleLruInit(SlruCtl ctl, const char *name, const char *subdir)
Assert(found);
/* Initialize the unshared control struct */
- ctl->locks = locks;
ctl->shared = shared;
+ ctl->ControlLock = shared->ControlLock;
- /* Init directory path */
+ /* Initialize unshared copy of directory path */
snprintf(ctl->Dir, MAXPGPATH, "%s/%s", DataDir, subdir);
}
@@ -232,7 +243,7 @@ int
SimpleLruZeroPage(SlruCtl ctl, int pageno)
{
int slotno;
- SlruShared shared = (SlruShared) ctl->shared;
+ SlruShared shared = ctl->shared;
/* Find a suitable buffer slot for the page */
slotno = SlruSelectLRUPage(ctl, pageno);
@@ -270,7 +281,7 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno)
char *
SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid, bool forwrite)
{
- SlruShared shared = (SlruShared) ctl->shared;
+ SlruShared shared = ctl->shared;
/* Outer loop handles restart if we lose the buffer to someone else */
for (;;)
@@ -313,8 +324,8 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid, bool forwrite)
SlruRecentlyUsed(shared, slotno);
/* Release shared lock, grab per-buffer lock instead */
- LWLockRelease(ctl->locks->ControlLock);
- LWLockAcquire(ctl->locks->BufferLocks[slotno], LW_EXCLUSIVE);
+ LWLockRelease(shared->ControlLock);
+ LWLockAcquire(shared->BufferLocks[slotno], LW_EXCLUSIVE);
/*
* Check to see if someone else already did the read, or took the
@@ -323,8 +334,8 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid, bool forwrite)
if (shared->page_number[slotno] != pageno ||
shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS)
{
- LWLockRelease(ctl->locks->BufferLocks[slotno]);
- LWLockAcquire(ctl->locks->ControlLock, LW_EXCLUSIVE);
+ LWLockRelease(shared->BufferLocks[slotno]);
+ LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
continue;
}
@@ -332,14 +343,14 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid, bool forwrite)
ok = SlruPhysicalReadPage(ctl, pageno, slotno);
/* Re-acquire shared control lock and update page state */
- LWLockAcquire(ctl->locks->ControlLock, LW_EXCLUSIVE);
+ LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
Assert(shared->page_number[slotno] == pageno &&
shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS);
shared->page_status[slotno] = ok ? SLRU_PAGE_CLEAN : SLRU_PAGE_EMPTY;
- LWLockRelease(ctl->locks->BufferLocks[slotno]);
+ LWLockRelease(shared->BufferLocks[slotno]);
/* Now it's okay to ereport if we failed */
if (!ok)
@@ -364,11 +375,11 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid, bool forwrite)
* Control lock must be held at entry, and will be held at exit.
*/
void
-SimpleLruWritePage(SlruCtl ctl, int slotno)
+SimpleLruWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
{
int pageno;
bool ok;
- SlruShared shared = (SlruShared) ctl->shared;
+ SlruShared shared = ctl->shared;
/* Do nothing if page does not need writing */
if (shared->page_status[slotno] != SLRU_PAGE_DIRTY &&
@@ -378,8 +389,8 @@ SimpleLruWritePage(SlruCtl ctl, int slotno)
pageno = shared->page_number[slotno];
/* Release shared lock, grab per-buffer lock instead */
- LWLockRelease(ctl->locks->ControlLock);
- LWLockAcquire(ctl->locks->BufferLocks[slotno], LW_EXCLUSIVE);
+ LWLockRelease(shared->ControlLock);
+ LWLockAcquire(shared->BufferLocks[slotno], LW_EXCLUSIVE);
/*
* Check to see if someone else already did the write, or took the
@@ -392,8 +403,8 @@ SimpleLruWritePage(SlruCtl ctl, int slotno)
(shared->page_status[slotno] != SLRU_PAGE_DIRTY &&
shared->page_status[slotno] != SLRU_PAGE_WRITE_IN_PROGRESS))
{
- LWLockRelease(ctl->locks->BufferLocks[slotno]);
- LWLockAcquire(ctl->locks->ControlLock, LW_EXCLUSIVE);
+ LWLockRelease(shared->BufferLocks[slotno]);
+ LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
return;
}
@@ -412,10 +423,19 @@ SimpleLruWritePage(SlruCtl ctl, int slotno)
shared->page_status[slotno] = SLRU_PAGE_WRITE_IN_PROGRESS;
/* Okay, do the write */
- ok = SlruPhysicalWritePage(ctl, pageno, slotno);
+ ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);
+
+ /* If we failed, and we're in a flush, better close the files */
+ if (!ok && fdata)
+ {
+ int i;
+
+ for (i = 0; i < fdata->num_files; i++)
+ close(fdata->fd[i]);
+ }
/* Re-acquire shared control lock and update page state */
- LWLockAcquire(ctl->locks->ControlLock, LW_EXCLUSIVE);
+ LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
Assert(shared->page_number[slotno] == pageno &&
(shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS ||
@@ -425,7 +445,7 @@ SimpleLruWritePage(SlruCtl ctl, int slotno)
if (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS)
shared->page_status[slotno] = ok ? SLRU_PAGE_CLEAN : SLRU_PAGE_DIRTY;
- LWLockRelease(ctl->locks->BufferLocks[slotno]);
+ LWLockRelease(shared->BufferLocks[slotno]);
/* Now it's okay to ereport if we failed */
if (!ok)
@@ -445,7 +465,7 @@ SimpleLruWritePage(SlruCtl ctl, int slotno)
static bool
SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
{
- SlruShared shared = (SlruShared) ctl->shared;
+ SlruShared shared = ctl->shared;
int segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
int offset = rpageno * BLCKSZ;
@@ -482,6 +502,7 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
{
slru_errcause = SLRU_SEEK_FAILED;
slru_errno = errno;
+ close(fd);
return false;
}
@@ -490,6 +511,7 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
{
slru_errcause = SLRU_READ_FAILED;
slru_errno = errno;
+ close(fd);
return false;
}
@@ -511,50 +533,80 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
* info in static variables to let SlruReportIOError make the report.
*
* For now, assume it's not worth keeping a file pointer open across
- * read/write operations. We could cache one virtual file pointer ...
+ * independent read/write operations. We do batch operations during
+ * SimpleLruFlush, though.
+ *
+ * fdata is NULL for a standalone write, pointer to open-file info during
+ * SimpleLruFlush.
*/
static bool
-SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno)
+SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
{
- SlruShared shared = (SlruShared) ctl->shared;
+ SlruShared shared = ctl->shared;
int segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
int offset = rpageno * BLCKSZ;
char path[MAXPGPATH];
- int fd;
-
- SlruFileName(ctl, path, segno);
+ int fd = -1;
/*
- * If the file doesn't already exist, we should create it. It is
- * possible for this to need to happen when writing a page that's not
- * first in its segment; we assume the OS can cope with that. (Note:
- * it might seem that it'd be okay to create files only when
- * SimpleLruZeroPage is called for the first page of a segment.
- * However, if after a crash and restart the REDO logic elects to
- * replay the log from a checkpoint before the latest one, then it's
- * possible that we will get commands to set transaction status of
- * transactions that have already been truncated from the commit log.
- * Easiest way to deal with that is to accept references to
- * nonexistent files here and in SlruPhysicalReadPage.)
+ * During a Flush, we may already have the desired file open.
*/
- fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
- if (fd < 0)
+ if (fdata)
{
- if (errno != ENOENT)
+ int i;
+
+ for (i = 0; i < fdata->num_files; i++)
{
- slru_errcause = SLRU_OPEN_FAILED;
- slru_errno = errno;
- return false;
+ if (fdata->segno[i] == segno)
+ {
+ fd = fdata->fd[i];
+ break;
+ }
}
+ }
- fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
- S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ {
+ /*
+ * If the file doesn't already exist, we should create it. It is
+ * possible for this to need to happen when writing a page that's not
+ * first in its segment; we assume the OS can cope with that.
+ * (Note: it might seem that it'd be okay to create files only when
+ * SimpleLruZeroPage is called for the first page of a segment.
+ * However, if after a crash and restart the REDO logic elects to
+ * replay the log from a checkpoint before the latest one, then it's
+ * possible that we will get commands to set transaction status of
+ * transactions that have already been truncated from the commit log.
+ * Easiest way to deal with that is to accept references to
+ * nonexistent files here and in SlruPhysicalReadPage.)
+ */
+ SlruFileName(ctl, path, segno);
+ fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0)
{
- slru_errcause = SLRU_CREATE_FAILED;
- slru_errno = errno;
- return false;
+ if (errno != ENOENT)
+ {
+ slru_errcause = SLRU_OPEN_FAILED;
+ slru_errno = errno;
+ return false;
+ }
+
+ fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
+ S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ {
+ slru_errcause = SLRU_CREATE_FAILED;
+ slru_errno = errno;
+ return false;
+ }
+ }
+
+ if (fdata)
+ {
+ fdata->fd[fdata->num_files] = fd;
+ fdata->segno[fdata->num_files] = segno;
+ fdata->num_files++;
}
}
@@ -562,6 +614,8 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno)
{
slru_errcause = SLRU_SEEK_FAILED;
slru_errno = errno;
+ if (!fdata)
+ close(fd);
return false;
}
@@ -573,14 +627,31 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno)
errno = ENOSPC;
slru_errcause = SLRU_WRITE_FAILED;
slru_errno = errno;
+ if (!fdata)
+ close(fd);
return false;
}
- if (close(fd))
+ /*
+ * If not part of Flush, need to fsync now. We assume this happens
+ * infrequently enough that it's not a performance issue.
+ */
+ if (!fdata)
{
- slru_errcause = SLRU_CLOSE_FAILED;
- slru_errno = errno;
- return false;
+ if (pg_fsync(fd))
+ {
+ slru_errcause = SLRU_FSYNC_FAILED;
+ slru_errno = errno;
+ close(fd);
+ return false;
+ }
+
+ if (close(fd))
+ {
+ slru_errcause = SLRU_CLOSE_FAILED;
+ slru_errno = errno;
+ return false;
+ }
}
return true;
@@ -637,6 +708,13 @@ SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
errdetail("could not write to file \"%s\" at offset %u: %m",
path, offset)));
break;
+ case SLRU_FSYNC_FAILED:
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not access status of transaction %u", xid),
+ errdetail("could not fsync file \"%s\": %m",
+ path)));
+ break;
case SLRU_CLOSE_FAILED:
ereport(ERROR,
(errcode_for_file_access(),
@@ -668,7 +746,7 @@ SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
static int
SlruSelectLRUPage(SlruCtl ctl, int pageno)
{
- SlruShared shared = (SlruShared) ctl->shared;
+ SlruShared shared = ctl->shared;
/* Outer loop handles restart after I/O */
for (;;)
@@ -717,7 +795,7 @@ SlruSelectLRUPage(SlruCtl ctl, int pageno)
(void) SimpleLruReadPage(ctl, shared->page_number[bestslot],
InvalidTransactionId, false);
else
- SimpleLruWritePage(ctl, bestslot);
+ SimpleLruWritePage(ctl, bestslot, NULL);
/*
* Now loop back and try again. This is the easiest way of
@@ -733,7 +811,7 @@ SlruSelectLRUPage(SlruCtl ctl, int pageno)
void
SimpleLruSetLatestPage(SlruCtl ctl, int pageno)
{
- SlruShared shared = (SlruShared) ctl->shared;
+ SlruShared shared = ctl->shared;
shared->latest_page_number = pageno;
}
@@ -744,16 +822,20 @@ SimpleLruSetLatestPage(SlruCtl ctl, int pageno)
void
SimpleLruFlush(SlruCtl ctl, bool checkpoint)
{
-#ifdef USE_ASSERT_CHECKING /* only used in Assert() */
- SlruShared shared = (SlruShared) ctl->shared;
-#endif
+ SlruShared shared = ctl->shared;
+ SlruFlushData fdata;
int slotno;
+ int pageno = 0;
+ int i;
+ bool ok;
+
+ fdata.num_files = 0;
- LWLockAcquire(ctl->locks->ControlLock, LW_EXCLUSIVE);
+ LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
{
- SimpleLruWritePage(ctl, slotno);
+ SimpleLruWritePage(ctl, slotno, &fdata);
/*
* When called during a checkpoint, we cannot assert that the slot
@@ -765,7 +847,32 @@ SimpleLruFlush(SlruCtl ctl, bool checkpoint)
shared->page_status[slotno] == SLRU_PAGE_CLEAN);
}
- LWLockRelease(ctl->locks->ControlLock);
+ LWLockRelease(shared->ControlLock);
+
+ /*
+ * Now fsync and close any files that were open
+ */
+ ok = true;
+ for (i = 0; i < fdata.num_files; i++)
+ {
+ if (pg_fsync(fdata.fd[i]))
+ {
+ slru_errcause = SLRU_FSYNC_FAILED;
+ slru_errno = errno;
+ pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
+ ok = false;
+ }
+
+ if (close(fdata.fd[i]))
+ {
+ slru_errcause = SLRU_CLOSE_FAILED;
+ slru_errno = errno;
+ pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
+ ok = false;
+ }
+ }
+ if (!ok)
+ SlruReportIOError(ctl, pageno, InvalidTransactionId);
}
/*
@@ -786,7 +893,7 @@ void
SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
{
int slotno;
- SlruShared shared = (SlruShared) ctl->shared;
+ SlruShared shared = ctl->shared;
/*
* The cutoff point is the start of the segment containing cutoffPage.
@@ -805,7 +912,7 @@ SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
* have been flushed already during the checkpoint, we're just being
* extra careful here.)
*/
- LWLockAcquire(ctl->locks->ControlLock, LW_EXCLUSIVE);
+ LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
restart:;
@@ -817,7 +924,7 @@ restart:;
*/
if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
{
- LWLockRelease(ctl->locks->ControlLock);
+ LWLockRelease(shared->ControlLock);
ereport(LOG,
(errmsg("could not truncate directory \"%s\": apparent wraparound",
ctl->Dir)));
@@ -849,11 +956,11 @@ restart:;
(void) SimpleLruReadPage(ctl, shared->page_number[slotno],
InvalidTransactionId, false);
else
- SimpleLruWritePage(ctl, slotno);
+ SimpleLruWritePage(ctl, slotno, NULL);
goto restart;
}
- LWLockRelease(ctl->locks->ControlLock);
+ LWLockRelease(shared->ControlLock);
/* Now we can remove the old segment(s) */
(void) SlruScanDirectory(ctl, cutoffPage, true);
@@ -878,7 +985,8 @@ SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions)
if (cldir == NULL)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not open directory \"%s\": %m", ctl->Dir)));
+ errmsg("could not open directory \"%s\": %m",
+ ctl->Dir)));
errno = 0;
while ((clde = readdir(cldir)) != NULL)