Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r--  src/backend/storage/buffer/bufmgr.c | 193
1 file changed, 188 insertions(+), 5 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index e8e0825eb0c..5b9192ed450 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -83,6 +83,14 @@ bool track_io_timing = false;
int effective_io_concurrency = 0;
/*
+ * GUC variables that control triggering of kernel writeback for buffers
+ * written; OS-dependent defaults are set via the GUC mechanism.
+ */
+int checkpoint_flush_after = 0;
+int bgwriter_flush_after = 0;
+int backend_flush_after = 0;
+
+/*
* How many buffers PrefetchBuffer callers should try to stay ahead of their
* ReadBuffer calls by. This is maintained by the assign hook for
* effective_io_concurrency. Zero means "never prefetch". This value is
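The registration of these three GUCs, with their OS-dependent defaults, lives in guc.c and is not part of this file's diff. For orientation only, a ConfigureNamesInt entry for one of them could look roughly like the sketch below; the description string and the DEFAULT_CHECKPOINT_FLUSH_AFTER macro are assumptions here, not quoted from the patch.

    {
        {"checkpoint_flush_after", PGC_SIGHUP, WAL_CHECKPOINTS,
            gettext_noop("Number of pages after which previously performed writes are flushed to disk."),
            NULL,
            GUC_UNIT_BLOCKS
        },
        &checkpoint_flush_after,
        DEFAULT_CHECKPOINT_FLUSH_AFTER, 0, WRITEBACK_MAX_PENDING_FLUSHES,
        NULL, NULL, NULL
    },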
@@ -399,7 +407,7 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
static void PinBuffer_Locked(BufferDesc *buf);
static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
static void BufferSync(int flags);
-static int SyncOneBuffer(int buf_id, bool skip_recently_used);
+static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context);
static void WaitIO(BufferDesc *buf);
static bool StartBufferIO(BufferDesc *buf, bool forInput);
static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
@@ -416,6 +424,7 @@ static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
static void AtProcExit_Buffers(int code, Datum arg);
static void CheckForBufferLeaks(void);
static int rnode_comparator(const void *p1, const void *p2);
+static int buffertag_comparator(const void *p1, const void *p2);
/*
@@ -818,6 +827,13 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
MemSet((char *) bufBlock, 0, BLCKSZ);
/* don't set checksum for all-zero page */
smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
+
+ /*
+ * NB: we're *not* doing a ScheduleBufferTagForWriteback here, even
+ * though we're essentially performing a write. At least on Linux,
+ * doing so defeats the 'delayed allocation' mechanism, leading to
+ * increased file fragmentation.
+ */
}
else
{
@@ -1084,6 +1100,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
FlushBuffer(buf, NULL);
LWLockRelease(BufferDescriptorGetContentLock(buf));
+ ScheduleBufferTagForWriteback(&BackendWritebackContext,
+ &buf->tag);
+
TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
smgr->smgr_rnode.node.spcNode,
smgr->smgr_rnode.node.dbNode,
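BackendWritebackContext, used here for writes done by ordinary backends, is not initialized in this file's diff. Presumably it is set up once during buffer-pool access initialization and tied to the backend_flush_after GUC; a minimal sketch, assuming that call site:

    /* sketch; the actual initialization happens elsewhere in the patch */
    WritebackContextInit(&BackendWritebackContext, &backend_flush_after);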
@@ -1642,6 +1661,7 @@ BufferSync(int flags)
int num_to_write;
int num_written;
int mask = BM_DIRTY;
+ WritebackContext wb_context;
/* Make sure we can handle the pin inside SyncOneBuffer */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
@@ -1694,6 +1714,8 @@ BufferSync(int flags)
if (num_to_write == 0)
return; /* nothing to do */
+ WritebackContextInit(&wb_context, &checkpoint_flush_after);
+
TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
/*
@@ -1725,7 +1747,7 @@ BufferSync(int flags)
*/
if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
{
- if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
+ if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
{
TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
BgWriterStats.m_buf_written_checkpoints++;
@@ -1756,6 +1778,9 @@ BufferSync(int flags)
buf_id = 0;
}
+ /* issue all pending flushes */
+ IssuePendingWritebacks(&wb_context);
+
/*
* Update checkpoint statistics. As noted above, this doesn't include
* buffers written by other backends or bgwriter scan.
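Taken together, the checkpoint path now follows an init/schedule/issue pattern: initialize a context bound to checkpoint_flush_after, let each buffer write schedule its tag, and force out whatever is still queued at the end. A condensed sketch using only the names visible in this diff:

    WritebackContext wb_context;

    WritebackContextInit(&wb_context, &checkpoint_flush_after);

    /* for each dirty buffer written during the checkpoint scan */
    if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
        BgWriterStats.m_buf_written_checkpoints++;

    /* after the scan, issue whatever writeback requests are still pending */
    IssuePendingWritebacks(&wb_context);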
@@ -1777,7 +1802,7 @@ BufferSync(int flags)
* bgwriter_lru_maxpages to 0.)
*/
bool
-BgBufferSync(void)
+BgBufferSync(WritebackContext *wb_context)
{
/* info obtained from freelist.c */
int strategy_buf_id;
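With the new parameter, BgBufferSync no longer owns any writeback state; its caller keeps the context alive across scan cycles, so flush requests can span invocations. The background writer's main loop (in bgwriter.c, outside this diff) presumably does something along these lines:

    WritebackContext wb_context;
    bool        can_hibernate;

    WritebackContextInit(&wb_context, &bgwriter_flush_after);

    for (;;)
    {
        /* ... absorb fsync requests, handle signals, etc. ... */
        can_hibernate = BgBufferSync(&wb_context);
        /* ... sleep, hibernating if there was nothing to do ... */
    }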
@@ -2002,7 +2027,8 @@ BgBufferSync(void)
/* Execute the LRU scan */
while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
{
- int buffer_state = SyncOneBuffer(next_to_clean, true);
+ int buffer_state = SyncOneBuffer(next_to_clean, true,
+ wb_context);
if (++next_to_clean >= NBuffers)
{
@@ -2079,10 +2105,11 @@ BgBufferSync(void)
* Note: caller must have done ResourceOwnerEnlargeBuffers.
*/
static int
-SyncOneBuffer(int buf_id, bool skip_recently_used)
+SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
{
BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
int result = 0;
+ BufferTag tag;
ReservePrivateRefCountEntry();
@@ -2123,8 +2150,13 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
FlushBuffer(bufHdr, NULL);
LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+
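+ /*
+ * Copy the buffer tag while we still hold the pin; once the buffer is
+ * unpinned it can be recycled for a different page, changing its tag.
+ */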
+ tag = bufHdr->tag;
+
UnpinBuffer(bufHdr, true);
+ ScheduleBufferTagForWriteback(wb_context, &tag);
+
return result | BUF_WRITTEN;
}
@@ -3729,3 +3761,154 @@ rnode_comparator(const void *p1, const void *p2)
else
return 0;
}
+
+/*
+ * BufferTag comparator.
+ */
+static int
+buffertag_comparator(const void *a, const void *b)
+{
+ const BufferTag *ba = (const BufferTag *) a;
+ const BufferTag *bb = (const BufferTag *) b;
+ int ret;
+
+ ret = rnode_comparator(&ba->rnode, &bb->rnode);
+
+ if (ret != 0)
+ return ret;
+
+ if (ba->forkNum < bb->forkNum)
+ return -1;
+ if (ba->forkNum > bb->forkNum)
+ return 1;
+
+ if (ba->blockNum < bb->blockNum)
+ return -1;
+ if (ba->blockNum > bb->blockNum)
+ return 1;
+
+ return 0;
+}
+
+/*
+ * Initialize a writeback context, discarding potential previous state.
+ *
+ * *max_pending is a pointer to a variable containing the current maximum
+ * number of writeback requests that may accumulate before they are issued
+ * (and, where possible, coalesced into bigger ones). A value <= 0 means
+ * that no writeback control will be performed. max_pending is a pointer
+ * instead of an immediate value, so the limit can easily be changed by the
+ * GUC mechanism, and so calling code does not have to check the current
+ * configuration.
+ */
+void
+WritebackContextInit(WritebackContext *context, int *max_pending)
+{
+ Assert(*max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
+
+ context->max_pending = max_pending;
+ context->nr_pending = 0;
+}
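The WritebackContext and PendingWriteback types are declared elsewhere in the patch (buf_internals.h). From the way this file uses them, their shape is roughly the following sketch; field comments are inferred, not quoted:

    typedef struct PendingWriteback
    {
        BufferTag   tag;        /* page to pass on to the kernel */
    } PendingWriteback;

    typedef struct WritebackContext
    {
        int        *max_pending;    /* points at a GUC; <= 0 disables control */
        int         nr_pending;     /* number of valid entries below */
        PendingWriteback pending_writebacks[WRITEBACK_MAX_PENDING_FLUSHES];
    } WritebackContext;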
+
+/*
+ * Add buffer to list of pending writeback requests.
+ */
+void
+ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag)
+{
+ PendingWriteback *pending;
+
+ /*
+ * Add buffer to the pending writeback array, unless writeback control is
+ * disabled.
+ */
+ if (*context->max_pending > 0)
+ {
+ Assert(*context->max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
+
+ pending = &context->pending_writebacks[context->nr_pending++];
+
+ pending->tag = *tag;
+ }
+
+ /*
+ * Perform pending flushes if the writeback limit is exceeded. This
+ * includes the case where an item was added earlier, but writeback
+ * control has since been disabled.
+ */
+ if (context->nr_pending >= *context->max_pending)
+ IssuePendingWritebacks(context);
+}
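Because the function flushes as soon as the limit is reached, callers can simply schedule every buffer they write and issue one explicit flush at the very end. A hypothetical usage sketch (flush_after, ntags and tags[] are illustrative; real callers point max_pending at a GUC variable):

    WritebackContext ctx;
    int         flush_after = 32;
    int         i;

    WritebackContextInit(&ctx, &flush_after);

    for (i = 0; i < ntags; i++)
        ScheduleBufferTagForWriteback(&ctx, &tags[i]);  /* auto-flushes every 32 */

    IssuePendingWritebacks(&ctx);   /* write back the remainder */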
+
+/*
+ * Issue all pending writeback requests, previously scheduled with
+ * ScheduleBufferTagForWriteback, to the OS.
+ *
+ * Because this is only used to improve the OS's I/O scheduling, we try
+ * never to error out - it's just a hint.
+ */
+void
+IssuePendingWritebacks(WritebackContext *context)
+{
+ int i;
+
+ if (context->nr_pending == 0)
+ return;
+
+ /*
+ * Executing the writes in order can make them a lot faster, and allows us
+ * to merge writeback requests for consecutive blocks into larger writebacks.
+ */
+ qsort(&context->pending_writebacks, context->nr_pending,
+ sizeof(PendingWriteback), buffertag_comparator);
+
+ /*
+ * Coalesce neighbouring writes, but nothing else. For that we iterate
+ * through the now-sorted array of pending flushes, looking ahead to find
+ * all neighbouring (or identical) writes.
+ */
+ for (i = 0; i < context->nr_pending; i++)
+ {
+ PendingWriteback *cur;
+ PendingWriteback *next;
+ SMgrRelation reln;
+ int ahead;
+ BufferTag tag;
+ Size nblocks = 1;
+
+ cur = &context->pending_writebacks[i];
+ tag = cur->tag;
+
+ /*
+ * Peek ahead into the following writeback requests to see if they can
+ * be combined with the current one.
+ */
+ for (ahead = 0; i + ahead + 1 < context->nr_pending; ahead++)
+ {
+ next = &context->pending_writebacks[i + ahead + 1];
+
+ /* different file, stop */
+ if (!RelFileNodeEquals(cur->tag.rnode, next->tag.rnode) ||
+ cur->tag.forkNum != next->tag.forkNum)
+ break;
+
+ /* ok, block queued twice, skip */
+ if (cur->tag.blockNum == next->tag.blockNum)
+ continue;
+
+ /* only merge consecutive writes */
+ if (cur->tag.blockNum + 1 != next->tag.blockNum)
+ break;
+
+ nblocks++;
+ cur = next;
+ }
+
+ i += ahead;
+
+ /* and finally tell the kernel to write the data to storage */
+ reln = smgropen(tag.rnode, InvalidBackendId);
+ smgrwriteback(reln, tag.forkNum, tag.blockNum, nblocks);
+ }
+
+ context->nr_pending = 0;
+}
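To make the coalescing concrete, a hypothetical example: suppose that after sorting the pending array holds blocks 10, 11, 11, 12 of relation A's main fork, followed by block 3 of relation B. The loop above then issues just two kernel hints, with the duplicate entry for block 11 skipped rather than inflating nblocks:

    smgrwriteback(relA, MAIN_FORKNUM, 10, 3);   /* blocks 10..12 as one request */
    smgrwriteback(relB, MAIN_FORKNUM, 3, 1);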