aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/storage/buffer/bufmgr.c147
-rw-r--r--src/include/storage/bufmgr.h1
2 files changed, 101 insertions, 47 deletions
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 14fc1bd1248..323382dcfa8 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1252,12 +1252,13 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
Buffer *buffers,
BlockNumber blockNum,
int *nblocks,
- int flags)
+ int flags,
+ bool allow_forwarding)
{
int actual_nblocks = *nblocks;
- int io_buffers_len = 0;
int maxcombine = 0;
+ Assert(*nblocks == 1 || allow_forwarding);
Assert(*nblocks > 0);
Assert(*nblocks <= MAX_IO_COMBINE_LIMIT);
@@ -1265,30 +1266,81 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
{
bool found;
- buffers[i] = PinBufferForBlock(operation->rel,
- operation->smgr,
- operation->persistence,
- operation->forknum,
- blockNum + i,
- operation->strategy,
- &found);
+ if (allow_forwarding && buffers[i] != InvalidBuffer)
+ {
+ BufferDesc *bufHdr;
+
+ /*
+ * This is a buffer that was pinned by an earlier call to
+ * StartReadBuffers(), but couldn't be handled in one operation at
+ * that time. The operation was split, and the caller has passed
+ * an already pinned buffer back to us to handle the rest of the
+ * operation. It must continue at the expected block number.
+ */
+ Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i);
+
+ /*
+ * It might be an already valid buffer (a hit) that followed the
+ * final contiguous block of an earlier I/O (a miss) marking the
+ * end of it, or a buffer that some other backend has since made
+ * valid by performing the I/O for us, in which case we can handle
+ * it as a hit now. It is safe to check for a BM_VALID flag with
+ * a relaxed load, because we got a fresh view of it while pinning
+ * it in the previous call.
+ *
+ * On the other hand if we don't see BM_VALID yet, it must be an
+ * I/O that was split by the previous call and we need to try to
+ * start a new I/O from this block. We're also racing against any
+ * other backend that might start the I/O or even manage to mark
+ * it BM_VALID after this check, but StartBufferIO() will handle
+ * those cases.
+ */
+ if (BufferIsLocal(buffers[i]))
+ bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1);
+ else
+ bufHdr = GetBufferDescriptor(buffers[i] - 1);
+ Assert(pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID);
+ found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID;
+ }
+ else
+ {
+ buffers[i] = PinBufferForBlock(operation->rel,
+ operation->smgr,
+ operation->persistence,
+ operation->forknum,
+ blockNum + i,
+ operation->strategy,
+ &found);
+ }
if (found)
{
/*
- * Terminate the read as soon as we get a hit. It could be a
- * single buffer hit, or it could be a hit that follows a readable
- * range. We don't want to create more than one readable range,
- * so we stop here.
+ * We have a hit. If it's the first block in the requested range,
+ * we can return it immediately and report that WaitReadBuffers()
+ * does not need to be called. If the initial value of *nblocks
+ * was larger, the caller will have to call again for the rest.
*/
- actual_nblocks = i + 1;
+ if (i == 0)
+ {
+ *nblocks = 1;
+ return false;
+ }
+
+ /*
+ * Otherwise we already have an I/O to perform, but this block
+ * can't be included as it is already valid. Split the I/O here.
+ * There may or may not be more blocks requiring I/O after this
+ * one, we haven't checked, but they can't be contiguous with this
+ * one in the way. We'll leave this buffer pinned, forwarding it
+ * to the next call, avoiding the need to unpin it here and re-pin
+ * it in the next call.
+ */
+ actual_nblocks = i;
break;
}
else
{
- /* Extend the readable range to cover this block. */
- io_buffers_len++;
-
/*
* Check how many blocks we can cover with the same IO. The smgr
* implementation might e.g. be limited due to a segment boundary.
@@ -1309,15 +1361,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
}
*nblocks = actual_nblocks;
- if (likely(io_buffers_len == 0))
- return false;
-
/* Populate information needed for I/O. */
operation->buffers = buffers;
operation->blocknum = blockNum;
operation->flags = flags;
operation->nblocks = actual_nblocks;
- operation->io_buffers_len = io_buffers_len;
if (flags & READ_BUFFERS_ISSUE_ADVICE)
{
@@ -1332,7 +1380,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
smgrprefetch(operation->smgr,
operation->forknum,
blockNum,
- operation->io_buffers_len);
+ actual_nblocks);
}
/* Indicate that WaitReadBuffers() should be called. */
@@ -1341,16 +1389,26 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
/*
* Begin reading a range of blocks beginning at blockNum and extending for
- * *nblocks. On return, up to *nblocks pinned buffers holding those blocks
- * are written into the buffers array, and *nblocks is updated to contain the
- * actual number, which may be fewer than requested. Caller sets some of the
- * members of operation; see struct definition.
- *
- * If false is returned, no I/O is necessary. If true is returned, one I/O
- * has been started, and WaitReadBuffers() must be called with the same
- * operation object before the buffers are accessed. Along with the operation
- * object, the caller-supplied array of buffers must remain valid until
- * WaitReadBuffers() is called.
+ * *nblocks. *nblocks and the buffers array are in/out parameters. On entry,
+ * the buffers elements covered by *nblocks must hold either InvalidBuffer or
+ * buffers forwarded by an earlier call to StartReadBuffers() that was split
+ * and is now being continued. On return, *nblocks holds the number of blocks
+ * accepted by this operation. If it is less than the original number then
+ * this operation has been split, but buffer elements up to the original
+ * requested size may hold forwarded buffers to be used for a continuing
+ * operation. The caller must either start a new I/O beginning at the block
+ * immediately following the blocks accepted by this call and pass those
+ * buffers back in, or release them if it chooses not to. It shouldn't make
+ * any other use of or assumptions about forwarded buffers.
+ *
+ * If false is returned, no I/O is necessary and the buffers covered by
+ * *nblocks on exit are valid and ready to be accessed. If true is returned,
+ * an I/O has been started, and WaitReadBuffers() must be called with the same
+ * operation object before the buffers covered by *nblocks on exit can be
+ * accessed. Along with the operation object, the caller-supplied array of
+ * buffers must remain valid until WaitReadBuffers() is called, and any
+ * forwarded buffers must also be preserved for a continuing call unless
+ * they are explicitly released.
*
* Currently the I/O is only started with optional operating system advice if
* requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O
@@ -1364,13 +1422,17 @@ StartReadBuffers(ReadBuffersOperation *operation,
int *nblocks,
int flags)
{
- return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags);
+ return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags,
+ true /* expect forwarded buffers */ );
}
/*
* Single block version of the StartReadBuffers(). This might save a few
* instructions when called from another translation unit, because it is
* specialized for nblocks == 1.
+ *
+ * This version does not support "forwarded" buffers: they cannot be created
+ * by reading only one block and *buffer is ignored on entry.
*/
bool
StartReadBuffer(ReadBuffersOperation *operation,
@@ -1381,7 +1443,8 @@ StartReadBuffer(ReadBuffersOperation *operation,
int nblocks = 1;
bool result;
- result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags);
+ result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags,
+ false /* single block, no forwarding */ );
Assert(nblocks == 1); /* single block can't be short */
return result;
@@ -1407,24 +1470,16 @@ WaitReadBuffers(ReadBuffersOperation *operation)
IOObject io_object;
char persistence;
- /*
- * Currently operations are only allowed to include a read of some range,
- * with an optional extra buffer that is already pinned at the end. So
- * nblocks can be at most one more than io_buffers_len.
- */
- Assert((operation->nblocks == operation->io_buffers_len) ||
- (operation->nblocks == operation->io_buffers_len + 1));
-
/* Find the range of the physical read we need to perform. */
- nblocks = operation->io_buffers_len;
- if (nblocks == 0)
- return; /* nothing to do */
-
+ nblocks = operation->nblocks;
buffers = &operation->buffers[0];
blocknum = operation->blocknum;
forknum = operation->forknum;
persistence = operation->persistence;
+ Assert(nblocks > 0);
+ Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
+
if (persistence == RELPERSISTENCE_TEMP)
{
io_context = IOCONTEXT_NORMAL;
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index ecc1c3a909b..538b890a51d 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -130,7 +130,6 @@ struct ReadBuffersOperation
BlockNumber blocknum;
int flags;
int16 nblocks;
- int16 io_buffers_len;
};
typedef struct ReadBuffersOperation ReadBuffersOperation;