diff options
Diffstat (limited to 'src/backend/storage/ipc/sinvaladt.c')
-rw-r--r-- | src/backend/storage/ipc/sinvaladt.c | 198 |
1 files changed, 155 insertions, 43 deletions
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c index a13918e9dc0..7c4956ae600 100644 --- a/src/backend/storage/ipc/sinvaladt.c +++ b/src/backend/storage/ipc/sinvaladt.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.66 2008/01/01 19:45:51 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.67 2008/03/16 19:47:33 alvherre Exp $ * *------------------------------------------------------------------------- */ @@ -24,7 +24,82 @@ #include "storage/sinvaladt.h" -SISeg *shmInvalBuffer; +/* + * Conceptually, the shared cache invalidation messages are stored in an + * infinite array, where maxMsgNum is the next array subscript to store a + * submitted message in, minMsgNum is the smallest array subscript containing a + * message not yet read by all backends, and we always have maxMsgNum >= + * minMsgNum. (They are equal when there are no messages pending.) For each + * active backend, there is a nextMsgNum pointer indicating the next message it + * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every + * backend. + * + * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES + * entries. We translate MsgNum values into circular-buffer indexes by + * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as + * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum + * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space + * in the buffer. If the buffer does overflow, we reset it to empty and + * force each backend to "reset", ie, discard all its invalidatable state. + * + * We would have problems if the MsgNum values overflow an integer, so + * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND + * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be + * large so that we don't need to do this often. It must be a multiple of + * MAXNUMMESSAGES so that the existing circular-buffer entries don't need + * to be moved when we do it. + */ + + +/* + * Configurable parameters. + * + * MAXNUMMESSAGES: max number of shared-inval messages we can buffer. + * Must be a power of 2 for speed. + * + * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow. + * Must be a multiple of MAXNUMMESSAGES. Should be large. + */ + +#define MAXNUMMESSAGES 4096 +#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096) + + +/* Shared cache invalidation memory segment */ +typedef struct SISeg +{ + /* + * General state information + */ + int minMsgNum; /* oldest message still needed */ + int maxMsgNum; /* next message number to be assigned */ + int lastBackend; /* index of last active procState entry, +1 */ + int maxBackends; /* size of procState array */ + int freeBackends; /* number of empty procState slots */ + + /* + * Next LocalTransactionId to use for each idle backend slot. We keep + * this here because it is indexed by BackendId and it is convenient to + * copy the value to and from local memory when MyBackendId is set. + */ + LocalTransactionId *nextLXID; /* array of maxBackends entries */ + + /* + * Circular buffer holding shared-inval messages + */ + SharedInvalidationMessage buffer[MAXNUMMESSAGES]; + + /* + * Per-backend state info. + * + * We declare procState as 1 entry because C wants a fixed-size array, but + * actually it is maxBackends entries long. + */ + ProcState procState[1]; /* reflects the invalidation state */ +} SISeg; + +static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */ + static LocalTransactionId nextLocalTransactionId; @@ -49,13 +124,12 @@ SInvalShmemSize(void) } /* - * SIBufferInit - * Create and initialize a new SI message buffer + * SharedInvalBufferInit + * Create and initialize the SI message buffer */ void -SIBufferInit(void) +CreateSharedInvalidationState(void) { - SISeg *segP; Size size; int i; bool found; @@ -64,49 +138,43 @@ SIBufferInit(void) size = offsetof(SISeg, procState); size = add_size(size, mul_size(sizeof(ProcState), MaxBackends)); - shmInvalBuffer = segP = (SISeg *) + shmInvalBuffer = (SISeg *) ShmemInitStruct("shmInvalBuffer", size, &found); if (found) return; - segP->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends); + shmInvalBuffer->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends); /* Clear message counters, save size of procState array */ - segP->minMsgNum = 0; - segP->maxMsgNum = 0; - segP->lastBackend = 0; - segP->maxBackends = MaxBackends; - segP->freeBackends = MaxBackends; + shmInvalBuffer->minMsgNum = 0; + shmInvalBuffer->maxMsgNum = 0; + shmInvalBuffer->lastBackend = 0; + shmInvalBuffer->maxBackends = MaxBackends; + shmInvalBuffer->freeBackends = MaxBackends; /* The buffer[] array is initially all unused, so we need not fill it */ /* Mark all backends inactive, and initialize nextLXID */ - for (i = 0; i < segP->maxBackends; i++) + for (i = 0; i < shmInvalBuffer->maxBackends; i++) { - segP->procState[i].nextMsgNum = -1; /* inactive */ - segP->procState[i].resetState = false; - segP->nextLXID[i] = InvalidLocalTransactionId; + shmInvalBuffer->procState[i].nextMsgNum = -1; /* inactive */ + shmInvalBuffer->procState[i].resetState = false; + shmInvalBuffer->nextLXID[i] = InvalidLocalTransactionId; } } /* - * SIBackendInit + * SharedInvalBackendInit * Initialize a new backend to operate on the sinval buffer - * - * Returns: - * >0 A-OK - * 0 Failed to find a free procState slot (ie, MaxBackends exceeded) - * <0 Some other failure (not currently used) - * - * NB: this routine, and all following ones, must be executed with the - * SInvalLock lock held, since there may be multiple backends trying - * to access the buffer. */ -int -SIBackendInit(SISeg *segP) +void +SharedInvalBackendInit(void) { int index; ProcState *stateP = NULL; + SISeg *segP = shmInvalBuffer; + + LWLockAcquire(SInvalLock, LW_EXCLUSIVE); /* Look for a free entry in the procState array */ for (index = 0; index < segP->lastBackend; index++) @@ -128,9 +196,14 @@ SIBackendInit(SISeg *segP) } else { - /* out of procState slots */ + /* + * out of procState slots: MaxBackends exceeded -- report normally + */ MyBackendId = InvalidBackendId; - return 0; + LWLockRelease(SInvalLock); + ereport(FATAL, + (errcode(ERRCODE_TOO_MANY_CONNECTIONS), + errmsg("sorry, too many clients already"))); } } @@ -153,10 +226,10 @@ SIBackendInit(SISeg *segP) stateP->nextMsgNum = segP->maxMsgNum; stateP->resetState = false; + LWLockRelease(SInvalLock); + /* register exit routine to mark my entry inactive at exit */ on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP)); - - return 1; } /* @@ -210,9 +283,16 @@ CleanupInvalidationState(int status, Datum arg) * Returns true for normal successful insertion, false if had to reset. */ bool -SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data) +SIInsertDataEntry(SharedInvalidationMessage *data) { - int numMsgs = segP->maxMsgNum - segP->minMsgNum; + int numMsgs; + bool signal_postmaster = false; + SISeg *segP; + + LWLockAcquire(SInvalLock, LW_EXCLUSIVE); + + segP = shmInvalBuffer; + numMsgs = segP->maxMsgNum - segP->minMsgNum; /* Is the buffer full? */ if (numMsgs >= MAXNUMMESSAGES) @@ -222,12 +302,13 @@ SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data) * messages but not yet have done SIDelExpiredDataEntries() to advance * minMsgNum. So, make sure minMsgNum is up-to-date. */ - SIDelExpiredDataEntries(segP); + SIDelExpiredDataEntries(true); numMsgs = segP->maxMsgNum - segP->minMsgNum; if (numMsgs >= MAXNUMMESSAGES) { /* Yup, it's definitely full, no choice but to reset */ SISetProcStateInvalid(segP); + LWLockRelease(SInvalLock); return false; } } @@ -246,7 +327,7 @@ SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data) IsUnderPostmaster) { elog(DEBUG4, "SI table is 70%% full, signaling postmaster"); - SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN); + signal_postmaster = true; } /* @@ -255,6 +336,11 @@ SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data) segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data; segP->maxMsgNum++; + LWLockRelease(SInvalLock); + + if (signal_postmaster) + SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN); + return true; } @@ -293,14 +379,23 @@ SISetProcStateInvalid(SISeg *segP) * -1: SI reset message extracted * * NB: this can run in parallel with other instances of SIGetDataEntry - * executing on behalf of other backends. See comments in sinval.c in - * ReceiveSharedInvalidMessages(). + * executing on behalf of other backends, since each instance will modify only + * fields of its own backend's ProcState, and no instance will look at fields + * of other backends' ProcStates. We express this by grabbing SInvalLock in + * shared mode. Note that this is not exactly the normal (read-only) + * interpretation of a shared lock! Look closely at the interactions before + * allowing SInvalLock to be grabbed in shared mode for any other reason! */ int -SIGetDataEntry(SISeg *segP, int backendId, - SharedInvalidationMessage *data) +SIGetDataEntry(int backendId, SharedInvalidationMessage *data) { - ProcState *stateP = &segP->procState[backendId - 1]; + ProcState *stateP; + SISeg *segP; + + LWLockAcquire(SInvalLock, LW_SHARED); + + segP = shmInvalBuffer; + stateP = &segP->procState[backendId - 1]; if (stateP->resetState) { @@ -310,11 +405,15 @@ SIGetDataEntry(SISeg *segP, int backendId, */ stateP->resetState = false; stateP->nextMsgNum = segP->maxMsgNum; + LWLockRelease(SInvalLock); return -1; } if (stateP->nextMsgNum >= segP->maxMsgNum) + { + LWLockRelease(SInvalLock); return 0; /* nothing to read */ + } /* * Retrieve message and advance my counter. @@ -327,6 +426,8 @@ SIGetDataEntry(SISeg *segP, int backendId, * delete it here. SIDelExpiredDataEntries() should be called to remove * dead messages. */ + + LWLockRelease(SInvalLock); return 1; /* got a message */ } @@ -335,15 +436,23 @@ SIGetDataEntry(SISeg *segP, int backendId, * Remove messages that have been consumed by all active backends */ void -SIDelExpiredDataEntries(SISeg *segP) +SIDelExpiredDataEntries(bool locked) { + SISeg *segP = shmInvalBuffer; int min, i, h; + if (!locked) + LWLockAcquire(SInvalLock, LW_EXCLUSIVE); + min = segP->maxMsgNum; if (min == segP->minMsgNum) + { + if (!locked) + LWLockRelease(SInvalLock); return; /* fast path if no messages exist */ + } /* Recompute minMsgNum = minimum of all backends' nextMsgNum */ @@ -372,6 +481,9 @@ SIDelExpiredDataEntries(SISeg *segP) segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; } } + + if (!locked) + LWLockRelease(SInvalLock); } |