aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/ipc/sinvaladt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/ipc/sinvaladt.c')
-rw-r--r--src/backend/storage/ipc/sinvaladt.c198
1 files changed, 155 insertions, 43 deletions
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index a13918e9dc0..7c4956ae600 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.66 2008/01/01 19:45:51 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.67 2008/03/16 19:47:33 alvherre Exp $
*
*-------------------------------------------------------------------------
*/
@@ -24,7 +24,82 @@
#include "storage/sinvaladt.h"
-SISeg *shmInvalBuffer;
+/*
+ * Conceptually, the shared cache invalidation messages are stored in an
+ * infinite array, where maxMsgNum is the next array subscript to store a
+ * submitted message in, minMsgNum is the smallest array subscript containing a
+ * message not yet read by all backends, and we always have maxMsgNum >=
+ * minMsgNum. (They are equal when there are no messages pending.) For each
+ * active backend, there is a nextMsgNum pointer indicating the next message it
+ * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
+ * backend.
+ *
+ * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
+ * entries. We translate MsgNum values into circular-buffer indexes by
+ * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
+ * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
+ * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
+ * in the buffer. If the buffer does overflow, we reset it to empty and
+ * force each backend to "reset", ie, discard all its invalidatable state.
+ *
+ * We would have problems if the MsgNum values overflow an integer, so
+ * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
+ * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
+ * large so that we don't need to do this often. It must be a multiple of
+ * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
+ * to be moved when we do it.
+ */
+
+
+/*
+ * Configurable parameters.
+ *
+ * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
+ * Must be a power of 2 for speed.
+ *
+ * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
+ * Must be a multiple of MAXNUMMESSAGES. Should be large.
+ */
+
+#define MAXNUMMESSAGES 4096
+#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
+
+
+/* Shared cache invalidation memory segment */
+typedef struct SISeg
+{
+ /*
+ * General state information
+ */
+ int minMsgNum; /* oldest message still needed */
+ int maxMsgNum; /* next message number to be assigned */
+ int lastBackend; /* index of last active procState entry, +1 */
+ int maxBackends; /* size of procState array */
+ int freeBackends; /* number of empty procState slots */
+
+ /*
+ * Next LocalTransactionId to use for each idle backend slot. We keep
+ * this here because it is indexed by BackendId and it is convenient to
+ * copy the value to and from local memory when MyBackendId is set.
+ */
+ LocalTransactionId *nextLXID; /* array of maxBackends entries */
+
+ /*
+ * Circular buffer holding shared-inval messages
+ */
+ SharedInvalidationMessage buffer[MAXNUMMESSAGES];
+
+ /*
+ * Per-backend state info.
+ *
+ * We declare procState as 1 entry because C wants a fixed-size array, but
+ * actually it is maxBackends entries long.
+ */
+ ProcState procState[1]; /* reflects the invalidation state */
+} SISeg;
+
+static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
+
static LocalTransactionId nextLocalTransactionId;
@@ -49,13 +124,12 @@ SInvalShmemSize(void)
}
/*
- * SIBufferInit
- * Create and initialize a new SI message buffer
+ * SharedInvalBufferInit
+ * Create and initialize the SI message buffer
*/
void
-SIBufferInit(void)
+CreateSharedInvalidationState(void)
{
- SISeg *segP;
Size size;
int i;
bool found;
@@ -64,49 +138,43 @@ SIBufferInit(void)
size = offsetof(SISeg, procState);
size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
- shmInvalBuffer = segP = (SISeg *)
+ shmInvalBuffer = (SISeg *)
ShmemInitStruct("shmInvalBuffer", size, &found);
if (found)
return;
- segP->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends);
+ shmInvalBuffer->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends);
/* Clear message counters, save size of procState array */
- segP->minMsgNum = 0;
- segP->maxMsgNum = 0;
- segP->lastBackend = 0;
- segP->maxBackends = MaxBackends;
- segP->freeBackends = MaxBackends;
+ shmInvalBuffer->minMsgNum = 0;
+ shmInvalBuffer->maxMsgNum = 0;
+ shmInvalBuffer->lastBackend = 0;
+ shmInvalBuffer->maxBackends = MaxBackends;
+ shmInvalBuffer->freeBackends = MaxBackends;
/* The buffer[] array is initially all unused, so we need not fill it */
/* Mark all backends inactive, and initialize nextLXID */
- for (i = 0; i < segP->maxBackends; i++)
+ for (i = 0; i < shmInvalBuffer->maxBackends; i++)
{
- segP->procState[i].nextMsgNum = -1; /* inactive */
- segP->procState[i].resetState = false;
- segP->nextLXID[i] = InvalidLocalTransactionId;
+ shmInvalBuffer->procState[i].nextMsgNum = -1; /* inactive */
+ shmInvalBuffer->procState[i].resetState = false;
+ shmInvalBuffer->nextLXID[i] = InvalidLocalTransactionId;
}
}
/*
- * SIBackendInit
+ * SharedInvalBackendInit
* Initialize a new backend to operate on the sinval buffer
- *
- * Returns:
- * >0 A-OK
- * 0 Failed to find a free procState slot (ie, MaxBackends exceeded)
- * <0 Some other failure (not currently used)
- *
- * NB: this routine, and all following ones, must be executed with the
- * SInvalLock lock held, since there may be multiple backends trying
- * to access the buffer.
*/
-int
-SIBackendInit(SISeg *segP)
+void
+SharedInvalBackendInit(void)
{
int index;
ProcState *stateP = NULL;
+ SISeg *segP = shmInvalBuffer;
+
+ LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
/* Look for a free entry in the procState array */
for (index = 0; index < segP->lastBackend; index++)
@@ -128,9 +196,14 @@ SIBackendInit(SISeg *segP)
}
else
{
- /* out of procState slots */
+ /*
+ * out of procState slots: MaxBackends exceeded -- report normally
+ */
MyBackendId = InvalidBackendId;
- return 0;
+ LWLockRelease(SInvalLock);
+ ereport(FATAL,
+ (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
+ errmsg("sorry, too many clients already")));
}
}
@@ -153,10 +226,10 @@ SIBackendInit(SISeg *segP)
stateP->nextMsgNum = segP->maxMsgNum;
stateP->resetState = false;
+ LWLockRelease(SInvalLock);
+
/* register exit routine to mark my entry inactive at exit */
on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
-
- return 1;
}
/*
@@ -210,9 +283,16 @@ CleanupInvalidationState(int status, Datum arg)
* Returns true for normal successful insertion, false if had to reset.
*/
bool
-SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
+SIInsertDataEntry(SharedInvalidationMessage *data)
{
- int numMsgs = segP->maxMsgNum - segP->minMsgNum;
+ int numMsgs;
+ bool signal_postmaster = false;
+ SISeg *segP;
+
+ LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+
+ segP = shmInvalBuffer;
+ numMsgs = segP->maxMsgNum - segP->minMsgNum;
/* Is the buffer full? */
if (numMsgs >= MAXNUMMESSAGES)
@@ -222,12 +302,13 @@ SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
* messages but not yet have done SIDelExpiredDataEntries() to advance
* minMsgNum. So, make sure minMsgNum is up-to-date.
*/
- SIDelExpiredDataEntries(segP);
+ SIDelExpiredDataEntries(true);
numMsgs = segP->maxMsgNum - segP->minMsgNum;
if (numMsgs >= MAXNUMMESSAGES)
{
/* Yup, it's definitely full, no choice but to reset */
SISetProcStateInvalid(segP);
+ LWLockRelease(SInvalLock);
return false;
}
}
@@ -246,7 +327,7 @@ SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
IsUnderPostmaster)
{
elog(DEBUG4, "SI table is 70%% full, signaling postmaster");
- SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
+ signal_postmaster = true;
}
/*
@@ -255,6 +336,11 @@ SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
segP->maxMsgNum++;
+ LWLockRelease(SInvalLock);
+
+ if (signal_postmaster)
+ SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
+
return true;
}
@@ -293,14 +379,23 @@ SISetProcStateInvalid(SISeg *segP)
* -1: SI reset message extracted
*
* NB: this can run in parallel with other instances of SIGetDataEntry
- * executing on behalf of other backends. See comments in sinval.c in
- * ReceiveSharedInvalidMessages().
+ * executing on behalf of other backends, since each instance will modify only
+ * fields of its own backend's ProcState, and no instance will look at fields
+ * of other backends' ProcStates. We express this by grabbing SInvalLock in
+ * shared mode. Note that this is not exactly the normal (read-only)
+ * interpretation of a shared lock! Look closely at the interactions before
+ * allowing SInvalLock to be grabbed in shared mode for any other reason!
*/
int
-SIGetDataEntry(SISeg *segP, int backendId,
- SharedInvalidationMessage *data)
+SIGetDataEntry(int backendId, SharedInvalidationMessage *data)
{
- ProcState *stateP = &segP->procState[backendId - 1];
+ ProcState *stateP;
+ SISeg *segP;
+
+ LWLockAcquire(SInvalLock, LW_SHARED);
+
+ segP = shmInvalBuffer;
+ stateP = &segP->procState[backendId - 1];
if (stateP->resetState)
{
@@ -310,11 +405,15 @@ SIGetDataEntry(SISeg *segP, int backendId,
*/
stateP->resetState = false;
stateP->nextMsgNum = segP->maxMsgNum;
+ LWLockRelease(SInvalLock);
return -1;
}
if (stateP->nextMsgNum >= segP->maxMsgNum)
+ {
+ LWLockRelease(SInvalLock);
return 0; /* nothing to read */
+ }
/*
* Retrieve message and advance my counter.
@@ -327,6 +426,8 @@ SIGetDataEntry(SISeg *segP, int backendId,
* delete it here. SIDelExpiredDataEntries() should be called to remove
* dead messages.
*/
+
+ LWLockRelease(SInvalLock);
return 1; /* got a message */
}
@@ -335,15 +436,23 @@ SIGetDataEntry(SISeg *segP, int backendId,
* Remove messages that have been consumed by all active backends
*/
void
-SIDelExpiredDataEntries(SISeg *segP)
+SIDelExpiredDataEntries(bool locked)
{
+ SISeg *segP = shmInvalBuffer;
int min,
i,
h;
+ if (!locked)
+ LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+
min = segP->maxMsgNum;
if (min == segP->minMsgNum)
+ {
+ if (!locked)
+ LWLockRelease(SInvalLock);
return; /* fast path if no messages exist */
+ }
/* Recompute minMsgNum = minimum of all backends' nextMsgNum */
@@ -372,6 +481,9 @@ SIDelExpiredDataEntries(SISeg *segP)
segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
}
}
+
+ if (!locked)
+ LWLockRelease(SInvalLock);
}