aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/slru.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/slru.c')
-rw-r--r--src/backend/access/transam/slru.c112
1 files changed, 100 insertions, 12 deletions
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c
index e68ed7e331e..bf3990bc299 100644
--- a/src/backend/access/transam/slru.c
+++ b/src/backend/access/transam/slru.c
@@ -41,7 +41,7 @@
* Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.40 2007/01/05 22:19:23 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.41 2007/08/01 22:45:07 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -140,6 +140,8 @@ static SlruErrorCause slru_errcause;
static int slru_errno;
+static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno);
+static void SimpleLruWaitIO(SlruCtl ctl, int slotno);
static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno);
static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
SlruFlush fdata);
@@ -152,7 +154,7 @@ static int SlruSelectLRUPage(SlruCtl ctl, int pageno);
*/
Size
-SimpleLruShmemSize(int nslots)
+SimpleLruShmemSize(int nslots, int nlsns)
{
Size sz;
@@ -165,18 +167,21 @@ SimpleLruShmemSize(int nslots)
sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */
sz += MAXALIGN(nslots * sizeof(LWLockId)); /* buffer_locks[] */
+ if (nlsns > 0)
+ sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */
+
return BUFFERALIGN(sz) + BLCKSZ * nslots;
}
void
-SimpleLruInit(SlruCtl ctl, const char *name, int nslots,
+SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
LWLockId ctllock, const char *subdir)
{
SlruShared shared;
bool found;
shared = (SlruShared) ShmemInitStruct(name,
- SimpleLruShmemSize(nslots),
+ SimpleLruShmemSize(nslots, nlsns),
&found);
if (!IsUnderPostmaster)
@@ -193,6 +198,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots,
shared->ControlLock = ctllock;
shared->num_slots = nslots;
+ shared->lsn_groups_per_page = nlsns;
shared->cur_lru_count = 0;
@@ -212,8 +218,14 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots,
offset += MAXALIGN(nslots * sizeof(int));
shared->buffer_locks = (LWLockId *) (ptr + offset);
offset += MAXALIGN(nslots * sizeof(LWLockId));
- ptr += BUFFERALIGN(offset);
+ if (nlsns > 0)
+ {
+ shared->group_lsn = (XLogRecPtr *) (ptr + offset);
+ offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
+ }
+
+ ptr += BUFFERALIGN(offset);
for (slotno = 0; slotno < nslots; slotno++)
{
shared->page_buffer[slotno] = ptr;
@@ -266,6 +278,9 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno)
/* Set the buffer to zeroes */
MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
+ /* Set the LSNs for this new page to zero */
+ SimpleLruZeroLSNs(ctl, slotno);
+
/* Assume this page is now the latest active page */
shared->latest_page_number = pageno;
@@ -273,8 +288,27 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno)
}
/*
+ * Zero all the LSNs we store for this slru page.
+ *
+ * This should be called each time we create a new page, and each time we read
+ * in a page from disk into an existing buffer. (Such an old page cannot
+ * have any interesting LSNs, since we'd have flushed them before writing
+ * the page in the first place.)
+ */
+static void
+SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
+{
+ SlruShared shared = ctl->shared;
+
+ if (shared->lsn_groups_per_page > 0)
+ MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0,
+ shared->lsn_groups_per_page * sizeof(XLogRecPtr));
+}
+
+/*
* Wait for any active I/O on a page slot to finish. (This does not
- * guarantee that new I/O hasn't been started before we return, though.)
+ * guarantee that new I/O hasn't been started before we return, though.
+ * In fact the slot might not even contain the same page anymore.)
*
* Control lock must be held at entry, and will be held at exit.
*/
@@ -305,8 +339,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
/* indeed, the I/O must have failed */
if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
shared->page_status[slotno] = SLRU_PAGE_EMPTY;
- else
- /* write_in_progress */
+ else /* write_in_progress */
{
shared->page_status[slotno] = SLRU_PAGE_VALID;
shared->page_dirty[slotno] = true;
@@ -320,6 +353,11 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
* Find a page in a shared buffer, reading it in if necessary.
* The page number must correspond to an already-initialized page.
*
+ * If write_ok is true then it is OK to return a page that is in
+ * WRITE_IN_PROGRESS state; it is the caller's responsibility to be sure
+ * that modification of the page is safe. If write_ok is false then we
+ * will not return the page until it is not undergoing active I/O.
+ *
* The passed-in xid is used only for error reporting, and may be
* InvalidTransactionId if no specific xid is associated with the action.
*
@@ -329,7 +367,8 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
* Control lock must be held at entry, and will be held at exit.
*/
int
-SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid)
+SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
+ TransactionId xid)
{
SlruShared shared = ctl->shared;
@@ -346,8 +385,13 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid)
if (shared->page_number[slotno] == pageno &&
shared->page_status[slotno] != SLRU_PAGE_EMPTY)
{
- /* If page is still being read in, we must wait for I/O */
- if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
+ /*
+ * If page is still being read in, we must wait for I/O. Likewise
+ * if the page is being written and the caller said that's not OK.
+ */
+ if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
+ (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
+ !write_ok))
{
SimpleLruWaitIO(ctl, slotno);
/* Now we must recheck state from the top */
@@ -383,6 +427,9 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid)
/* Do the read */
ok = SlruPhysicalReadPage(ctl, pageno, slotno);
+ /* Set the LSNs for this newly read-in page to zero */
+ SimpleLruZeroLSNs(ctl, slotno);
+
/* Re-acquire control lock and update page state */
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
@@ -443,7 +490,7 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
LWLockRelease(shared->ControlLock);
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
- return SimpleLruReadPage(ctl, pageno, xid);
+ return SimpleLruReadPage(ctl, pageno, true, xid);
}
/*
@@ -622,6 +669,47 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
int fd = -1;
/*
+ * Honor the write-WAL-before-data rule, if appropriate, so that we do
+ * not write out data before associated WAL records. This is the same
+ * action performed during FlushBuffer() in the main buffer manager.
+ */
+ if (shared->group_lsn != NULL)
+ {
+ /*
+ * We must determine the largest async-commit LSN for the page.
+ * This is a bit tedious, but since this entire function is a slow
+ * path anyway, it seems better to do this here than to maintain
+ * a per-page LSN variable (which'd need an extra comparison in the
+ * transaction-commit path).
+ */
+ XLogRecPtr max_lsn;
+ int lsnindex, lsnoff;
+
+ lsnindex = slotno * shared->lsn_groups_per_page;
+ max_lsn = shared->group_lsn[lsnindex++];
+ for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)
+ {
+ XLogRecPtr this_lsn = shared->group_lsn[lsnindex++];
+
+ if (XLByteLT(max_lsn, this_lsn))
+ max_lsn = this_lsn;
+ }
+
+ if (!XLogRecPtrIsInvalid(max_lsn))
+ {
+ /*
+ * As noted above, elog(ERROR) is not acceptable here, so if
+ * XLogFlush were to fail, we must PANIC. This isn't much of
+ * a restriction because XLogFlush is just about all critical
+ * section anyway, but let's make sure.
+ */
+ START_CRIT_SECTION();
+ XLogFlush(max_lsn);
+ END_CRIT_SECTION();
+ }
+ }
+
+ /*
* During a Flush, we may already have the desired file open.
*/
if (fdata)