Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r--   src/backend/storage/buffer/bufmgr.c | 193
1 file changed, 188 insertions(+), 5 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index e8e0825eb0c..5b9192ed450 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -83,6 +83,14 @@ bool		track_io_timing = false;
 int			effective_io_concurrency = 0;
 
 /*
+ * GUC variables about triggering kernel writeback for buffers written; OS
+ * dependent defaults are set via the GUC mechanism.
+ */
+int			checkpoint_flush_after = 0;
+int			bgwriter_flush_after = 0;
+int			backend_flush_after = 0;
+
+/*
  * How many buffers PrefetchBuffer callers should try to stay ahead of their
  * ReadBuffer calls by. This is maintained by the assign hook for
  * effective_io_concurrency. Zero means "never prefetch". This value is
@@ -399,7 +407,7 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
 static void PinBuffer_Locked(BufferDesc *buf);
 static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
 static void BufferSync(int flags);
-static int	SyncOneBuffer(int buf_id, bool skip_recently_used);
+static int	SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *flush_context);
 static void WaitIO(BufferDesc *buf);
 static bool StartBufferIO(BufferDesc *buf, bool forInput);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
@@ -416,6 +424,7 @@ static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
 static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 static int	rnode_comparator(const void *p1, const void *p2);
+static int	buffertag_comparator(const void *p1, const void *p2);
 
 
 /*
@@ -818,6 +827,13 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 			MemSet((char *) bufBlock, 0, BLCKSZ);
 			/* don't set checksum for all-zero page */
 			smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
+
+			/*
+			 * NB: we're *not* doing a ScheduleBufferTagForWriteback here,
+			 * although we're essentially performing a write. At least on
+			 * Linux, doing so defeats the 'delayed allocation' mechanism,
+			 * leading to increased file fragmentation.
+			 */
 		}
 		else
 		{
@@ -1084,6 +1100,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 			FlushBuffer(buf, NULL);
 			LWLockRelease(BufferDescriptorGetContentLock(buf));
 
+			ScheduleBufferTagForWriteback(&BackendWritebackContext,
+										  &buf->tag);
+
 			TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
 											smgr->smgr_rnode.node.spcNode,
 											smgr->smgr_rnode.node.dbNode,
@@ -1642,6 +1661,7 @@ BufferSync(int flags)
 	int			num_to_write;
 	int			num_written;
 	int			mask = BM_DIRTY;
+	WritebackContext wb_context;
 
 	/* Make sure we can handle the pin inside SyncOneBuffer */
 	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
@@ -1694,6 +1714,8 @@ BufferSync(int flags)
 	if (num_to_write == 0)
 		return;					/* nothing to do */
 
+	WritebackContextInit(&wb_context, &checkpoint_flush_after);
+
 	TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
 
 	/*
@@ -1725,7 +1747,7 @@ BufferSync(int flags)
 		 */
 		if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
 		{
-			if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
+			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
 			{
 				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
 				BgWriterStats.m_buf_written_checkpoints++;
@@ -1756,6 +1778,9 @@ BufferSync(int flags)
 			buf_id = 0;
 	}
 
+	/* issue all pending flushes */
+	IssuePendingWritebacks(&wb_context);
+
 	/*
 	 * Update checkpoint statistics. As noted above, this doesn't include
 	 * buffers written by other backends or bgwriter scan.
@@ -1777,7 +1802,7 @@ BufferSync(int flags)
  * bgwriter_lru_maxpages to 0.)
  */
 bool
-BgBufferSync(void)
+BgBufferSync(WritebackContext *wb_context)
 {
 	/* info obtained from freelist.c */
 	int			strategy_buf_id;
@@ -2002,7 +2027,8 @@ BgBufferSync(void)
 	/* Execute the LRU scan */
 	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
 	{
-		int			buffer_state = SyncOneBuffer(next_to_clean, true);
+		int			buffer_state = SyncOneBuffer(next_to_clean, true,
+												 wb_context);
 
 		if (++next_to_clean >= NBuffers)
 		{
@@ -2079,10 +2105,11 @@ BgBufferSync(void)
  * Note: caller must have done ResourceOwnerEnlargeBuffers.
 */
 static int
-SyncOneBuffer(int buf_id, bool skip_recently_used)
+SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 {
 	BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
 	int			result = 0;
+	BufferTag	tag;
 
 	ReservePrivateRefCountEntry();
 
@@ -2123,8 +2150,13 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
 	FlushBuffer(bufHdr, NULL);
 
 	LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+
+	tag = bufHdr->tag;
+
 	UnpinBuffer(bufHdr, true);
+	ScheduleBufferTagForWriteback(wb_context, &tag);
+
 	return result | BUF_WRITTEN;
 }
@@ -3729,3 +3761,154 @@ rnode_comparator(const void *p1, const void *p2)
 	else
 		return 0;
 }
+
+/*
+ * BufferTag comparator.
+ */
+static int
+buffertag_comparator(const void *a, const void *b)
+{
+	const BufferTag *ba = (const BufferTag *) a;
+	const BufferTag *bb = (const BufferTag *) b;
+	int			ret;
+
+	ret = rnode_comparator(&ba->rnode, &bb->rnode);
+
+	if (ret != 0)
+		return ret;
+
+	if (ba->forkNum < bb->forkNum)
+		return -1;
+	if (ba->forkNum > bb->forkNum)
+		return 1;
+
+	if (ba->blockNum < bb->blockNum)
+		return -1;
+	if (ba->blockNum > bb->blockNum)
+		return 1;
+
+	return 0;
+}
+
+/*
+ * Initialize a writeback context, discarding potential previous state.
+ *
+ * *max_pending is a pointer to a variable containing the current maximum
+ * number of writeback requests that will be coalesced into a bigger one. A
+ * value <= 0 means that no writeback control will be performed. max_pending
+ * is a pointer instead of an immediate value, so the coalesce limit can
+ * easily be changed by the GUC mechanism, and calling code does not have to
+ * check the current configuration.
+ */
+void
+WritebackContextInit(WritebackContext *context, int *max_pending)
+{
+	Assert(*max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
+
+	context->max_pending = max_pending;
+	context->nr_pending = 0;
+}
+
+/*
+ * Add buffer to list of pending writeback requests.
+ */
+void
+ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag)
+{
+	PendingWriteback *pending;
+
+	/*
+	 * Add buffer to the pending writeback array, unless writeback control is
+	 * disabled.
+	 */
+	if (*context->max_pending > 0)
+	{
+		Assert(*context->max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
+
+		pending = &context->pending_writebacks[context->nr_pending++];
+
+		pending->tag = *tag;
+	}
+
+	/*
+	 * Perform pending flushes if the writeback limit is exceeded. This
+	 * includes the case where previously an item has been added, but control
+	 * is now disabled.
+	 */
+	if (context->nr_pending >= *context->max_pending)
+		IssuePendingWritebacks(context);
+}
+
+/*
+ * Issue all pending writeback requests, previously scheduled with
+ * ScheduleBufferTagForWriteback, to the OS.
+ *
+ * Because this is only used to improve the OS's I/O scheduling, we try to
+ * never error out - it's just a hint.
+ */
+void
+IssuePendingWritebacks(WritebackContext *context)
+{
+	int			i;
+
+	if (context->nr_pending == 0)
+		return;
+
+	/*
+	 * Executing the writes in order can make them a lot faster, and allows
+	 * writeback requests for consecutive blocks to be merged into larger
+	 * writebacks.
+	 */
+	qsort(&context->pending_writebacks, context->nr_pending,
+		  sizeof(PendingWriteback), buffertag_comparator);
+
+	/*
+	 * Coalesce neighbouring writes, but nothing else. For that we iterate
+	 * through the now-sorted array of pending flushes and look ahead to find
+	 * all neighbouring (or identical) writes.
+	 */
+	for (i = 0; i < context->nr_pending; i++)
+	{
+		PendingWriteback *cur;
+		PendingWriteback *next;
+		SMgrRelation reln;
+		int			ahead;
+		BufferTag	tag;
+		Size		nblocks = 1;
+
+		cur = &context->pending_writebacks[i];
+		tag = cur->tag;
+
+		/*
+		 * Peek ahead into the following writeback requests to see if they
+		 * can be combined with the current one.
+		 */
+		for (ahead = 0; i + ahead + 1 < context->nr_pending; ahead++)
+		{
+			next = &context->pending_writebacks[i + ahead + 1];
+
+			/* different file, stop */
+			if (!RelFileNodeEquals(cur->tag.rnode, next->tag.rnode) ||
+				cur->tag.forkNum != next->tag.forkNum)
+				break;
+
+			/* ok, block queued twice, skip */
+			if (cur->tag.blockNum == next->tag.blockNum)
+				continue;
+
+			/* only merge consecutive writes */
+			if (cur->tag.blockNum + 1 != next->tag.blockNum)
+				break;
+
+			nblocks++;
+			cur = next;
+		}
+
+		i += ahead;
+
+		/* and finally tell the kernel to write the data to storage */
+		reln = smgropen(tag.rnode, InvalidBackendId);
+		smgrwriteback(reln, tag.forkNum, tag.blockNum, nblocks);
+	}
+
+	context->nr_pending = 0;
+}
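
For readers who want to experiment with the coalescing strategy outside of PostgreSQL, below is a minimal, self-contained C sketch of the sort-and-merge pattern that IssuePendingWritebacks() uses. The PendingTag struct, tag_cmp(), and issue_pending() are illustrative names invented for this sketch, and the printf stands in for smgrwriteback(); only the ordering and merging logic mirrors the patch.

/*
 * Standalone sketch of the sort-and-coalesce writeback pattern: sort the
 * pending requests, then merge runs of duplicate or directly consecutive
 * block numbers within the same file into a single "writeback" call.
 */
#include <stdio.h>
#include <stdlib.h>

typedef struct
{
    int     file;               /* stand-in for relfilenode + fork identity */
    int     block;              /* block number within that file */
} PendingTag;

/* Sort by file first, then by block number, so neighbours become adjacent. */
static int
tag_cmp(const void *a, const void *b)
{
    const PendingTag *ta = (const PendingTag *) a;
    const PendingTag *tb = (const PendingTag *) b;

    if (ta->file != tb->file)
        return (ta->file < tb->file) ? -1 : 1;
    if (ta->block != tb->block)
        return (ta->block < tb->block) ? -1 : 1;
    return 0;
}

/* Sort pending tags, coalesce consecutive blocks, "write back" each run. */
static void
issue_pending(PendingTag *tags, int n)
{
    int     i;

    qsort(tags, n, sizeof(PendingTag), tag_cmp);

    for (i = 0; i < n; i++)
    {
        PendingTag  start = tags[i];
        PendingTag  cur = start;
        int         nblocks = 1;

        /* absorb duplicates and directly following blocks of the same file */
        while (i + 1 < n && tags[i + 1].file == cur.file)
        {
            if (tags[i + 1].block == cur.block)
                ;                       /* queued twice: skip, run unchanged */
            else if (tags[i + 1].block == cur.block + 1)
                nblocks++;              /* consecutive: extend the run */
            else
                break;                  /* gap: flush what we have so far */
            cur = tags[++i];
        }

        /* here PostgreSQL would call smgrwriteback(reln, fork, block, nblocks) */
        printf("writeback file %d: blocks %d..%d (%d blocks)\n",
               start.file, start.block, start.block + nblocks - 1, nblocks);
    }
}

int
main(void)
{
    /* out-of-order requests, with one duplicate and one gap */
    PendingTag  pending[] = {
        {1, 12}, {1, 10}, {2, 5}, {1, 11}, {1, 11}, {1, 20}
    };

    issue_pending(pending, (int) (sizeof(pending) / sizeof(pending[0])));
    return 0;
}

With the sample input above, the sketch emits one writeback for blocks 10..12 of file 1, one for block 20, and one for block 5 of file 2, showing how sorted, consecutive requests collapse into a single writeback call.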