diff options
Diffstat (limited to 'src/backend/access/transam/slru.c')
-rw-r--r-- | src/backend/access/transam/slru.c | 112 |
1 files changed, 100 insertions, 12 deletions
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index e68ed7e331e..bf3990bc299 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -41,7 +41,7 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.40 2007/01/05 22:19:23 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.41 2007/08/01 22:45:07 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -140,6 +140,8 @@ static SlruErrorCause slru_errcause; static int slru_errno; +static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno); +static void SimpleLruWaitIO(SlruCtl ctl, int slotno); static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno); static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata); @@ -152,7 +154,7 @@ static int SlruSelectLRUPage(SlruCtl ctl, int pageno); */ Size -SimpleLruShmemSize(int nslots) +SimpleLruShmemSize(int nslots, int nlsns) { Size sz; @@ -165,18 +167,21 @@ SimpleLruShmemSize(int nslots) sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */ sz += MAXALIGN(nslots * sizeof(LWLockId)); /* buffer_locks[] */ + if (nlsns > 0) + sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */ + return BUFFERALIGN(sz) + BLCKSZ * nslots; } void -SimpleLruInit(SlruCtl ctl, const char *name, int nslots, +SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLockId ctllock, const char *subdir) { SlruShared shared; bool found; shared = (SlruShared) ShmemInitStruct(name, - SimpleLruShmemSize(nslots), + SimpleLruShmemSize(nslots, nlsns), &found); if (!IsUnderPostmaster) @@ -193,6 +198,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, shared->ControlLock = ctllock; shared->num_slots = nslots; + shared->lsn_groups_per_page = nlsns; shared->cur_lru_count = 0; @@ -212,8 +218,14 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, offset += MAXALIGN(nslots * sizeof(int)); shared->buffer_locks = (LWLockId *) (ptr + offset); offset += MAXALIGN(nslots * sizeof(LWLockId)); - ptr += BUFFERALIGN(offset); + if (nlsns > 0) + { + shared->group_lsn = (XLogRecPtr *) (ptr + offset); + offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); + } + + ptr += BUFFERALIGN(offset); for (slotno = 0; slotno < nslots; slotno++) { shared->page_buffer[slotno] = ptr; @@ -266,6 +278,9 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno) /* Set the buffer to zeroes */ MemSet(shared->page_buffer[slotno], 0, BLCKSZ); + /* Set the LSNs for this new page to zero */ + SimpleLruZeroLSNs(ctl, slotno); + /* Assume this page is now the latest active page */ shared->latest_page_number = pageno; @@ -273,8 +288,27 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno) } /* + * Zero all the LSNs we store for this slru page. + * + * This should be called each time we create a new page, and each time we read + * in a page from disk into an existing buffer. (Such an old page cannot + * have any interesting LSNs, since we'd have flushed them before writing + * the page in the first place.) + */ +static void +SimpleLruZeroLSNs(SlruCtl ctl, int slotno) +{ + SlruShared shared = ctl->shared; + + if (shared->lsn_groups_per_page > 0) + MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0, + shared->lsn_groups_per_page * sizeof(XLogRecPtr)); +} + +/* * Wait for any active I/O on a page slot to finish. (This does not - * guarantee that new I/O hasn't been started before we return, though.) + * guarantee that new I/O hasn't been started before we return, though. + * In fact the slot might not even contain the same page anymore.) * * Control lock must be held at entry, and will be held at exit. */ @@ -305,8 +339,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno) /* indeed, the I/O must have failed */ if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS) shared->page_status[slotno] = SLRU_PAGE_EMPTY; - else - /* write_in_progress */ + else /* write_in_progress */ { shared->page_status[slotno] = SLRU_PAGE_VALID; shared->page_dirty[slotno] = true; @@ -320,6 +353,11 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno) * Find a page in a shared buffer, reading it in if necessary. * The page number must correspond to an already-initialized page. * + * If write_ok is true then it is OK to return a page that is in + * WRITE_IN_PROGRESS state; it is the caller's responsibility to be sure + * that modification of the page is safe. If write_ok is false then we + * will not return the page until it is not undergoing active I/O. + * * The passed-in xid is used only for error reporting, and may be * InvalidTransactionId if no specific xid is associated with the action. * @@ -329,7 +367,8 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno) * Control lock must be held at entry, and will be held at exit. */ int -SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid) +SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, + TransactionId xid) { SlruShared shared = ctl->shared; @@ -346,8 +385,13 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid) if (shared->page_number[slotno] == pageno && shared->page_status[slotno] != SLRU_PAGE_EMPTY) { - /* If page is still being read in, we must wait for I/O */ - if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS) + /* + * If page is still being read in, we must wait for I/O. Likewise + * if the page is being written and the caller said that's not OK. + */ + if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS || + (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS && + !write_ok)) { SimpleLruWaitIO(ctl, slotno); /* Now we must recheck state from the top */ @@ -383,6 +427,9 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid) /* Do the read */ ok = SlruPhysicalReadPage(ctl, pageno, slotno); + /* Set the LSNs for this newly read-in page to zero */ + SimpleLruZeroLSNs(ctl, slotno); + /* Re-acquire control lock and update page state */ LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); @@ -443,7 +490,7 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid) LWLockRelease(shared->ControlLock); LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); - return SimpleLruReadPage(ctl, pageno, xid); + return SimpleLruReadPage(ctl, pageno, true, xid); } /* @@ -622,6 +669,47 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata) int fd = -1; /* + * Honor the write-WAL-before-data rule, if appropriate, so that we do + * not write out data before associated WAL records. This is the same + * action performed during FlushBuffer() in the main buffer manager. + */ + if (shared->group_lsn != NULL) + { + /* + * We must determine the largest async-commit LSN for the page. + * This is a bit tedious, but since this entire function is a slow + * path anyway, it seems better to do this here than to maintain + * a per-page LSN variable (which'd need an extra comparison in the + * transaction-commit path). + */ + XLogRecPtr max_lsn; + int lsnindex, lsnoff; + + lsnindex = slotno * shared->lsn_groups_per_page; + max_lsn = shared->group_lsn[lsnindex++]; + for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++) + { + XLogRecPtr this_lsn = shared->group_lsn[lsnindex++]; + + if (XLByteLT(max_lsn, this_lsn)) + max_lsn = this_lsn; + } + + if (!XLogRecPtrIsInvalid(max_lsn)) + { + /* + * As noted above, elog(ERROR) is not acceptable here, so if + * XLogFlush were to fail, we must PANIC. This isn't much of + * a restriction because XLogFlush is just about all critical + * section anyway, but let's make sure. + */ + START_CRIT_SECTION(); + XLogFlush(max_lsn); + END_CRIT_SECTION(); + } + } + + /* * During a Flush, we may already have the desired file open. */ if (fdata) |