diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 1999-09-06 19:37:38 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 1999-09-06 19:37:38 +0000 |
commit | 22fe3d4b0425fee1af0147a57e05aad70c3badd7 (patch) | |
tree | b55fbc7554a35c18eb465b7c64675e836cfc74ee /src/backend/storage/ipc/sinvaladt.c | |
parent | cc8b67a5c32a0cfc10be243e56d5bb10fa40f92d (diff) | |
download | postgresql-22fe3d4b0425fee1af0147a57e05aad70c3badd7.tar.gz postgresql-22fe3d4b0425fee1af0147a57e05aad70c3badd7.zip |
I finally understood what sinvaladt.c is doing --- and it
offended my aesthestic sensibility that there was so much unreadable code
doing so little. Rewritten code is about half the size, faster, and
(I hope) much more intelligible.
Diffstat (limited to 'src/backend/storage/ipc/sinvaladt.c')
-rw-r--r-- | src/backend/storage/ipc/sinvaladt.c | 921 |
1 files changed, 230 insertions, 691 deletions
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c index 2e64d027f31..99426693cd1 100644 --- a/src/backend/storage/ipc/sinvaladt.c +++ b/src/backend/storage/ipc/sinvaladt.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.24 1999/09/04 18:36:45 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.25 1999/09/06 19:37:38 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -16,648 +16,313 @@ #include "postgres.h" +#include "miscadmin.h" #include "storage/backendid.h" #include "storage/lmgr.h" +#include "storage/sinvaladt.h" #include "utils/trace.h" -/* ---------------- - * global variable notes - * - * SharedInvalidationSemaphore - * - * shmInvalBuffer - * the shared buffer segment, set by SISegmentAttach() - * - * MyBackendId - * might be removed later, used only for - * debugging in debug routines (end of file) - * - * SIDbId - * identification of buffer (disappears) - * - * SIRelId \ - * SIDummyOid \ identification of buffer - * SIXidData / - * SIXid / - * - * XXX This file really needs to be cleaned up. We switched to using - * spinlocks to protect critical sections (as opposed to using fake - * relations and going through the lock manager) and some of the old - * cruft was 'ifdef'ed out, while other parts (now unused) are still - * compiled into the system. -mer 5/24/92 - * ---------------- - */ -#ifdef HAS_TEST_AND_SET -int SharedInvalidationLockId; - -#else -IpcSemaphoreId SharedInvalidationSemaphore; - -#endif - SISeg *shmInvalBuffer; -extern BackendId MyBackendId; -static void CleanupInvalidationState(int status, SISeg *segInOutP); -static BackendId SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag); -static int SIGetNumEntries(SISeg *segP); +static void SISegmentAttach(IpcMemoryId shmid); +static void SISegInit(SISeg *segP, int maxBackends); +static void CleanupInvalidationState(int status, SISeg *segP); +static void SISetProcStateInvalid(SISeg *segP); -/************************************************************************/ -/* SISetActiveProcess(segP, backendId) set the backend status active */ -/* should be called only by the postmaster when creating a backend */ -/************************************************************************/ -/* XXX I suspect that the segP parameter is extraneous. -hirohama */ -static void -SISetActiveProcess(SISeg *segInOutP, BackendId backendId) -{ - /* mark all messages as read */ - - /* Assert(segP->procState[backendId - 1].tag == MyBackendTag); */ - - segInOutP->procState[backendId - 1].resetState = false; - segInOutP->procState[backendId - 1].limit = SIGetNumEntries(segInOutP); -} - -/****************************************************************************/ -/* SIBackendInit() initializes a backend to operate on the buffer */ -/****************************************************************************/ +/* + * SISegmentInit + * Create a new SI memory segment, or attach to an existing one + * + * This is called with createNewSegment = true by the postmaster (or by + * a standalone backend), and subsequently with createNewSegment = false + * by backends started by the postmaster. + * + * Note: maxBackends param is only valid when createNewSegment is true + */ int -SIBackendInit(SISeg *segInOutP) +SISegmentInit(bool createNewSegment, IPCKey key, int maxBackends) { - LockRelId LtCreateRelId(); - TransactionId LMITransactionIdCopy(); - - Assert(MyBackendTag > 0); - - MyBackendId = SIAssignBackendId(segInOutP, MyBackendTag); - if (MyBackendId == InvalidBackendTag) - return 0; - -#ifdef INVALIDDEBUG - elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.", - MyBackendTag, MyBackendId); -#endif /* INVALIDDEBUG */ + int segSize; + IpcMemoryId shmId; - SISetActiveProcess(segInOutP, MyBackendId); - on_shmem_exit(CleanupInvalidationState, (caddr_t) segInOutP); - return 1; -} + if (createNewSegment) + { + /* Kill existing segment, if any */ + IpcMemoryKill(key); -/* ---------------- - * SIAssignBackendId - * ---------------- - */ -static BackendId -SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag) -{ - Index index; - ProcState *stateP = NULL; + /* Figure space needed. + * Note sizeof(SISeg) includes the first ProcState entry. + */ + segSize = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1); - for (index = 0; index < segInOutP->maxBackends; index++) - { - if (segInOutP->procState[index].tag == InvalidBackendTag || - segInOutP->procState[index].tag == backendTag) + /* Get a shared segment */ + shmId = IpcMemoryCreate(key, segSize, IPCProtection); + if (shmId < 0) { - stateP = &segInOutP->procState[index]; - break; + perror("SISegmentInit: segment create failed"); + return -1; /* an error */ } - if (!PointerIsValid(stateP) || - (segInOutP->procState[index].resetState && - (!stateP->resetState || - stateP->tag < backendTag)) || - (!stateP->resetState && - (segInOutP->procState[index].limit < - stateP->limit || - stateP->tag < backendTag))) - stateP = &segInOutP->procState[index]; - } - - /* verify that all "procState" entries checked for matching tags */ + /* Attach to the shared cache invalidation segment */ + /* sets the global variable shmInvalBuffer */ + SISegmentAttach(shmId); - for (index++; index < segInOutP->maxBackends; index++) - { - if (segInOutP->procState[index].tag == backendTag) - elog(FATAL, "SIAssignBackendId: tag %d found twice", backendTag); + /* Init shared memory contents */ + SISegInit(shmInvalBuffer, maxBackends); } - - Assert(stateP); - - if (stateP->tag != InvalidBackendTag) + else { - if (stateP->tag == backendTag) - elog(NOTICE, "SIAssignBackendId: reusing tag %d", backendTag); - else + /* find existing segment */ + shmId = IpcMemoryIdGet(key, 0); + if (shmId < 0) { - elog(NOTICE, "SIAssignBackendId: discarding tag %d", stateP->tag); - return InvalidBackendTag; + perror("SISegmentInit: segment get failed"); + return -1; /* an error */ } - } - - stateP->tag = backendTag; - - return 1 + stateP - &segInOutP->procState[0]; -} - - -/************************************************************************/ -/* The following function should be called only by the postmaster !! */ -/************************************************************************/ -/************************************************************************/ -/* SISetDeadProcess(segP, backendId) set the backend status DEAD */ -/* should be called only by the postmaster when a backend died */ -/************************************************************************/ -static void -SISetDeadProcess(SISeg *segP, int backendId) -{ - /* XXX call me.... */ - - segP->procState[backendId - 1].resetState = false; - segP->procState[backendId - 1].limit = -1; - segP->procState[backendId - 1].tag = InvalidBackendTag; + /* Attach to the shared cache invalidation segment */ + /* sets the global variable shmInvalBuffer */ + SISegmentAttach(shmId); + } + return 1; } /* - * CleanupInvalidationState - * Note: - * This is a temporary hack. ExitBackend should call this instead - * of exit (via on_shmem_exit). + * SISegmentAttach + * Attach to specified shared memory segment */ static void -CleanupInvalidationState(int status, /* XXX */ - SISeg *segInOutP) /* XXX style */ -{ - Assert(PointerIsValid(segInOutP)); - - SISetDeadProcess(segInOutP, MyBackendId); -} - - -/************************************************************************/ -/* SIComputeSize() - compute size and offsets for SI segment */ -/************************************************************************/ -static void -SIComputeSize(SISegOffsets *oP, int maxBackends) -{ - int A, - B, - a, - b, - totalSize; - - A = 0; - /* sizeof(SISeg) includes the first ProcState entry */ - a = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1); - a = MAXALIGN(a); /* offset to first data entry */ - b = sizeof(SISegEntry) * MAXNUMMESSAGES; - B = A + a + b; - B = MAXALIGN(B); - totalSize = B - A; - - oP->startSegment = A; - oP->offsetToFirstEntry = a; /* relative to A */ - oP->offsetToEndOfSegment = totalSize; /* relative to A */ -} - - -/************************************************************************/ -/* SISetStartEntrySection(segP, offset) - sets the offset */ -/************************************************************************/ -static void -SISetStartEntrySection(SISeg *segP, Offset offset) -{ - segP->startEntrySection = offset; -} - -/************************************************************************/ -/* SIGetStartEntrySection(segP) - returnss the offset */ -/************************************************************************/ -static Offset -SIGetStartEntrySection(SISeg *segP) +SISegmentAttach(IpcMemoryId shmid) { - return segP->startEntrySection; -} + shmInvalBuffer = (SISeg *) IpcMemoryAttach(shmid); - -/************************************************************************/ -/* SISetEndEntrySection(segP, offset) - sets the offset */ -/************************************************************************/ -static void -SISetEndEntrySection(SISeg *segP, Offset offset) -{ - segP->endEntrySection = offset; + if (shmInvalBuffer == IpcMemAttachFailed) + { + /* XXX use validity function */ + elog(FATAL, "SISegmentAttach: Could not attach segment: %m"); + } } -/************************************************************************/ -/* SISetEndEntryChain(segP, offset) - sets the offset */ -/************************************************************************/ +/* + * SISegInit + * Initialize contents of a new shared memory sinval segment + */ static void -SISetEndEntryChain(SISeg *segP, Offset offset) +SISegInit(SISeg *segP, int maxBackends) { - segP->endEntryChain = offset; -} - -/************************************************************************/ -/* SIGetEndEntryChain(segP) - returnss the offset */ -/************************************************************************/ -static Offset -SIGetEndEntryChain(SISeg *segP) -{ - return segP->endEntryChain; -} + int i; -/************************************************************************/ -/* SISetStartEntryChain(segP, offset) - sets the offset */ -/************************************************************************/ -static void -SISetStartEntryChain(SISeg *segP, Offset offset) -{ - segP->startEntryChain = offset; -} + /* Clear message counters, save size of procState array */ + segP->minMsgNum = 0; + segP->maxMsgNum = 0; + segP->maxBackends = maxBackends; -/************************************************************************/ -/* SIGetStartEntryChain(segP) - returns the offset */ -/************************************************************************/ -static Offset -SIGetStartEntryChain(SISeg *segP) -{ - return segP->startEntryChain; -} + /* The buffer[] array is initially all unused, so we need not fill it */ -/************************************************************************/ -/* SISetNumEntries(segP, num) sets the current nuber of entries */ -/************************************************************************/ -static bool -SISetNumEntries(SISeg *segP, int num) -{ - if (num <= MAXNUMMESSAGES) - { - segP->numEntries = num; - return true; - } - else + /* Mark all backends inactive */ + for (i = 0; i < maxBackends; i++) { - return false; /* table full */ + segP->procState[i].nextMsgNum = -1; /* inactive */ + segP->procState[i].resetState = false; + segP->procState[i].tag = InvalidBackendTag; } } -/************************************************************************/ -/* SIGetNumEntries(segP) - returns the current nuber of entries */ -/************************************************************************/ -static int -SIGetNumEntries(SISeg *segP) -{ - return segP->numEntries; -} - - -/************************************************************************/ -/* SISetMaxNumEntries(segP, num) sets the maximal number of entries */ -/************************************************************************/ -static bool -SISetMaxNumEntries(SISeg *segP, int num) +/* + * SIBackendInit + * Initialize a new backend to operate on the sinval buffer + * + * NB: this routine, and all following ones, must be executed with the + * SInvalLock spinlock held, since there may be multiple backends trying + * to access the buffer. + */ +int +SIBackendInit(SISeg *segP) { - if (num <= MAXNUMMESSAGES) - { - segP->maxNumEntries = num; - return true; - } - else - { - return false; /* wrong number */ - } -} - - -/************************************************************************/ -/* SIGetProcStateLimit(segP, i) returns the limit of read messages */ -/************************************************************************/ - -#define SIGetProcStateLimit(segP,i) \ - ((segP)->procState[i].limit) + Index index; + ProcState *stateP = NULL; -/************************************************************************/ -/* SIIncNumEntries(segP, num) increments the current nuber of entries */ -/************************************************************************/ -static bool -SIIncNumEntries(SISeg *segP, int num) -{ + Assert(MyBackendTag > 0); - /* - * Try to prevent table overflow. When the table is 70% full send a - * SIGUSR2 to the postmaster which will send it back to all the - * backends. This will be handled by Async_NotifyHandler() with a - * StartTransactionCommand() which will flush unread SI entries for - * each backend. dz - 27 Jan 1998 - */ - if (segP->numEntries == (MAXNUMMESSAGES * 70 / 100)) + /* Check for duplicate backend tags (should never happen) */ + for (index = 0; index < segP->maxBackends; index++) { - TPRINTF(TRACE_VERBOSE, - "SIIncNumEntries: table is 70%% full, signaling postmaster"); - kill(getppid(), SIGUSR2); + if (segP->procState[index].tag == MyBackendTag) + elog(FATAL, "SIBackendInit: tag %d already in use", MyBackendTag); } - if ((segP->numEntries + num) <= MAXNUMMESSAGES) - { - segP->numEntries = segP->numEntries + num; - return true; - } - else + /* Look for a free entry in the procState array */ + for (index = 0; index < segP->maxBackends; index++) { - return false; /* table full */ + if (segP->procState[index].tag == InvalidBackendTag) + { + stateP = &segP->procState[index]; + break; + } } -} -/************************************************************************/ -/* SIDecNumEntries(segP, num) decrements the current nuber of entries */ -/************************************************************************/ -static bool -SIDecNumEntries(SISeg *segP, int num) -{ - if ((segP->numEntries - num) >= 0) - { - segP->numEntries = segP->numEntries - num; - return true; - } - else + /* elog() with spinlock held is probably not too cool, but these + * conditions should never happen anyway. + */ + if (stateP == NULL) { - return false; /* not enough entries in table */ + elog(NOTICE, "SIBackendInit: no free procState slot available"); + MyBackendId = InvalidBackendTag; + return 0; } -} -/************************************************************************/ -/* SISetStartFreeSpace(segP, offset) - sets the offset */ -/************************************************************************/ -static void -SISetStartFreeSpace(SISeg *segP, Offset offset) -{ - segP->startFreeSpace = offset; -} - -/************************************************************************/ -/* SIGetStartFreeSpace(segP) - returns the offset */ -/************************************************************************/ -static Offset -SIGetStartFreeSpace(SISeg *segP) -{ - return segP->startFreeSpace; -} + MyBackendId = (stateP - &segP->procState[0]) + 1; +#ifdef INVALIDDEBUG + elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.", + MyBackendTag, MyBackendId); +#endif /* INVALIDDEBUG */ + /* mark myself active, with all extant messages already read */ + stateP->tag = MyBackendTag; + stateP->resetState = false; + stateP->nextMsgNum = segP->maxMsgNum; -/************************************************************************/ -/* SIGetFirstDataEntry(segP) returns first data entry */ -/************************************************************************/ -static SISegEntry * -SIGetFirstDataEntry(SISeg *segP) -{ - SISegEntry *eP; - Offset startChain; - - startChain = SIGetStartEntryChain(segP); - - if (startChain == InvalidOffset) - return NULL; - - eP = (SISegEntry *) ((Pointer) segP + - SIGetStartEntrySection(segP) + - startChain); - return eP; -} - - -/************************************************************************/ -/* SIGetLastDataEntry(segP) returns last data entry in the chain */ -/************************************************************************/ -static SISegEntry * -SIGetLastDataEntry(SISeg *segP) -{ - SISegEntry *eP; - Offset endChain; - - endChain = SIGetEndEntryChain(segP); - - if (endChain == InvalidOffset) - return NULL; - - eP = (SISegEntry *) ((Pointer) segP + - SIGetStartEntrySection(segP) + - endChain); - return eP; -} - -/************************************************************************/ -/* SIGetNextDataEntry(segP, offset) returns next data entry */ -/************************************************************************/ -#define SIGetNextDataEntry(segP,offset) \ - (((offset) == InvalidOffset) ? (SISegEntry *) NULL : \ - (SISegEntry *) ((Pointer) (segP) + \ - (segP)->startEntrySection + \ - (Offset) (offset))) - -/************************************************************************/ -/* SIGetNthDataEntry(segP, n) returns the n-th data entry in chain */ -/************************************************************************/ -static SISegEntry * -SIGetNthDataEntry(SISeg *segP, - int n) /* must range from 1 to MaxMessages */ -{ - SISegEntry *eP; - int i; - - if (n <= 0) - return NULL; - - eP = SIGetFirstDataEntry(segP); - for (i = 1; i < n; i++) - { - /* skip one and get the next */ - eP = SIGetNextDataEntry(segP, eP->next); - } - - return eP; -} - -/************************************************************************/ -/* SIEntryOffset(segP, entryP) returns the offset for an pointer */ -/************************************************************************/ -static Offset -SIEntryOffset(SISeg *segP, SISegEntry *entryP) -{ - /* relative to B !! */ - return ((Offset) ((Pointer) entryP - - (Pointer) segP - - SIGetStartEntrySection(segP))); -} - + /* register exit routine to mark my entry inactive at exit */ + on_shmem_exit(CleanupInvalidationState, (caddr_t) segP); -/************************************************************************/ -/* SISetDataEntry(segP, data) - sets a message in the segemnt */ -/************************************************************************/ -bool -SISetDataEntry(SISeg *segP, SharedInvalidData *data) -{ - Offset offsetToNewData; - SISegEntry *eP, - *lastP; - - if (!SIIncNumEntries(segP, 1)) - return false; /* no space */ - - /* get a free entry */ - offsetToNewData = SIGetStartFreeSpace(segP); - eP = SIGetNextDataEntry(segP, offsetToNewData); /* it's a free one */ - SISetStartFreeSpace(segP, eP->next); - /* fill it up */ - eP->entryData = *data; - eP->isfree = false; - eP->next = InvalidOffset; - - /* handle insertion point at the end of the chain !! */ - lastP = SIGetLastDataEntry(segP); - if (lastP == NULL) - { - /* there is no chain, insert the first entry */ - SISetStartEntryChain(segP, SIEntryOffset(segP, eP)); - } - else - { - /* there is a last entry in the chain */ - lastP->next = SIEntryOffset(segP, eP); - } - SISetEndEntryChain(segP, SIEntryOffset(segP, eP)); - return true; + return 1; } - -/************************************************************************/ -/* SIDecProcLimit(segP, num) decrements all process limits */ -/************************************************************************/ +/* + * CleanupInvalidationState + * Mark the current backend as no longer active. + * + * This function is called via on_shmem_exit() during backend shutdown. + */ static void -SIDecProcLimit(SISeg *segP, int num) +CleanupInvalidationState(int status, + SISeg *segP) { - int i; + Assert(PointerIsValid(segP)); - for (i = 0; i < segP->maxBackends; i++) - { - /* decrement only, if there is a limit > 0 */ - if (segP->procState[i].limit > 0) - { - segP->procState[i].limit = segP->procState[i].limit - num; - if (segP->procState[i].limit < 0) - { - /* limit was not high enough, reset to zero */ - /* negative means it's a dead backend */ - segP->procState[i].limit = 0; - } - } - } -} + /* XXX we probably oughta grab the SInval spinlock for this... + * but I think it is safe not to. + */ + segP->procState[MyBackendId - 1].nextMsgNum = -1; + segP->procState[MyBackendId - 1].resetState = false; + segP->procState[MyBackendId - 1].tag = InvalidBackendTag; +} -/************************************************************************/ -/* SIDelDataEntries(segP, n) - free the FIRST n entries */ -/************************************************************************/ +/* + * SIInsertDataEntry + * Add a new invalidation message to the buffer. + * + * If we are unable to insert the message because the buffer is full, + * then clear the buffer and assert the "reset" flag to each backend. + * This will cause all the backends to discard *all* invalidatable state. + * + * Returns true for normal successful insertion, false if had to reset. + */ bool -SIDelDataEntries(SISeg *segP, int n) +SIInsertDataEntry(SISeg *segP, SharedInvalidData *data) { - int i; - - if (n <= 0) - return false; + int numMsgs = segP->maxMsgNum - segP->minMsgNum; - if (!SIDecNumEntries(segP, n)) + /* Is the buffer full? */ + if (numMsgs >= MAXNUMMESSAGES) { - /* not that many entries in buffer */ + /* Yes, so force reset */ + SISetProcStateInvalid(segP); return false; } - for (i = 1; i <= n; i++) + /* + * Try to prevent table overflow. When the table is 70% full send a + * SIGUSR2 (ordinarily a NOTIFY signal) to the postmaster, which will + * send it back to all the backends. This will force idle backends to + * execute a transaction to look through pg_listener for NOTIFY messages, + * and as a byproduct of the transaction start they will read SI entries. + * + * This should never happen if all the backends are actively executing + * queries, but if a backend is sitting idle then it won't be starting + * transactions and so won't be reading SI entries. + * + * dz - 27 Jan 1998 + */ + if (numMsgs == (MAXNUMMESSAGES * 70 / 100) && + IsUnderPostmaster) { - SISegEntry *e1P = SIGetFirstDataEntry(segP); - SISetStartEntryChain(segP, e1P->next); - if (SIGetStartEntryChain(segP) == InvalidOffset) - { - /* it was the last entry */ - SISetEndEntryChain(segP, InvalidOffset); - } - /* free the entry */ - e1P->isfree = true; - e1P->next = SIGetStartFreeSpace(segP); - SISetStartFreeSpace(segP, SIEntryOffset(segP, e1P)); + TPRINTF(TRACE_VERBOSE, + "SIInsertDataEntry: table is 70%% full, signaling postmaster"); + kill(getppid(), SIGUSR2); } - SIDecProcLimit(segP, n); + /* + * Insert new message into proper slot of circular buffer + */ + segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data; + segP->maxMsgNum++; + return true; } - - -/************************************************************************/ -/* SISetProcStateInvalid(segP) checks and marks a backends state as */ -/* invalid */ -/************************************************************************/ -void +/* + * SISetProcStateInvalid + * Flush pending messages from buffer, assert reset flag for each backend + * + * This is used only to recover from SI buffer overflow. + */ +static void SISetProcStateInvalid(SISeg *segP) { int i; + segP->minMsgNum = 0; + segP->maxMsgNum = 0; + for (i = 0; i < segP->maxBackends; i++) { - if (segP->procState[i].limit == 0) + if (segP->procState[i].nextMsgNum >= 0) /* active backend? */ { - /* backend i didn't read any message */ segP->procState[i].resetState = true; - - /* - * XXX signal backend that it has to reset its internal cache - * ? - */ + segP->procState[i].nextMsgNum = 0; } } } -/************************************************************************/ -/* SIGetDataEntry(segP, backendId, data) */ -/* get next SI message for specified backend, if there is one */ -/* */ -/* Possible return values: */ -/* 0: no SI message available */ -/* 1: next SI message has been extracted into *data */ -/* (there may be more messages available after this one!) */ -/* -1: SI reset message extracted */ -/************************************************************************/ +/* + * SIGetDataEntry + * get next SI message for specified backend, if there is one + * + * Possible return values: + * 0: no SI message available + * 1: next SI message has been extracted into *data + * (there may be more messages available after this one!) + * -1: SI reset message extracted + */ int SIGetDataEntry(SISeg *segP, int backendId, SharedInvalidData *data) { - SISegEntry *msg; + ProcState *stateP = & segP->procState[backendId - 1]; - Assert(segP->procState[backendId - 1].tag == MyBackendTag); + Assert(stateP->tag == MyBackendTag); - if (segP->procState[backendId - 1].resetState) + if (stateP->resetState) { - /* new valid state--mark all messages "read" */ - segP->procState[backendId - 1].resetState = false; - segP->procState[backendId - 1].limit = SIGetNumEntries(segP); + /* Force reset. We can say we have dealt with any messages added + * since the reset, as well... + */ + stateP->resetState = false; + stateP->nextMsgNum = segP->maxMsgNum; return -1; } - /* Get next message for this backend, if any */ - - /* This is fairly inefficient if there are many messages, - * but normally there should not be... - */ - msg = SIGetNthDataEntry(segP, - SIGetProcStateLimit(segP, backendId - 1) + 1); - - if (msg == NULL) + if (stateP->nextMsgNum >= segP->maxMsgNum) return 0; /* nothing to read */ - *data = msg->entryData; /* return contents of message */ - - segP->procState[backendId - 1].limit++; /* one more message read */ + /* + * Retrieve message and advance my counter. + */ + *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES]; + stateP->nextMsgNum++; /* There may be other backends that haven't read the message, * so we cannot delete it here. @@ -666,9 +331,10 @@ SIGetDataEntry(SISeg *segP, int backendId, return 1; /* got a message */ } -/************************************************************************/ -/* SIDelExpiredDataEntries (segP) - removes irrelevant messages */ -/************************************************************************/ +/* + * SIDelExpiredDataEntries + * Remove messages that have been consumed by all active backends + */ void SIDelExpiredDataEntries(SISeg *segP) { @@ -676,161 +342,34 @@ SIDelExpiredDataEntries(SISeg *segP) i, h; - min = 9999999; + min = segP->maxMsgNum; + if (min == segP->minMsgNum) + return; /* fast path if no messages exist */ + + /* Recompute minMsgNum = minimum of all backends' nextMsgNum */ + for (i = 0; i < segP->maxBackends; i++) { - h = SIGetProcStateLimit(segP, i); + h = segP->procState[i].nextMsgNum; if (h >= 0) { /* backend active */ if (h < min) min = h; } } - if (min < 9999999 && min > 0) - { - /* we can remove min messages */ - /* this adjusts also the state limits! */ - if (!SIDelDataEntries(segP, min)) - elog(FATAL, "SIDelExpiredDataEntries: Invalid segment state"); - } -} - - - -/************************************************************************/ -/* SISegInit(segP) - initializes the segment */ -/************************************************************************/ -static void -SISegInit(SISeg *segP, SISegOffsets *oP, int maxBackends) -{ - int i; - SISegEntry *eP; - - /* set semaphore ids in the segment */ - /* XXX */ - SISetStartEntrySection(segP, oP->offsetToFirstEntry); - SISetEndEntrySection(segP, oP->offsetToEndOfSegment); - SISetStartFreeSpace(segP, 0); - SISetStartEntryChain(segP, InvalidOffset); - SISetEndEntryChain(segP, InvalidOffset); - SISetNumEntries(segP, 0); - SISetMaxNumEntries(segP, MAXNUMMESSAGES); - segP->maxBackends = maxBackends; - for (i = 0; i < segP->maxBackends; i++) - { - segP->procState[i].limit = -1; /* no backend active !! */ - segP->procState[i].resetState = false; - segP->procState[i].tag = InvalidBackendTag; - } - /* construct a chain of free entries */ - for (i = 1; i < MAXNUMMESSAGES; i++) - { - eP = (SISegEntry *) ((Pointer) segP + - SIGetStartEntrySection(segP) + - (i - 1) * sizeof(SISegEntry)); - eP->isfree = true; - eP->next = i * sizeof(SISegEntry); /* relative to B */ - } - /* handle the last free entry separate */ - eP = (SISegEntry *) ((Pointer) segP + - SIGetStartEntrySection(segP) + - (MAXNUMMESSAGES - 1) * sizeof(SISegEntry)); - eP->isfree = true; - eP->next = InvalidOffset; /* it's the end of the chain !! */ -} - - - -/************************************************************************/ -/* SISegmentKill(key) - kill any segment */ -/************************************************************************/ -static void -SISegmentKill(int key) /* the corresponding key for the segment */ -{ - IpcMemoryKill(key); -} - - -/************************************************************************/ -/* SISegmentGet(key, size) - get a shared segment of size <size> */ -/* returns a segment id */ -/************************************************************************/ -static IpcMemoryId -SISegmentGet(int key, /* the corresponding key for the segment */ - int size, /* size of segment in bytes */ - bool create) -{ - IpcMemoryId shmid; - - if (create) - shmid = IpcMemoryCreate(key, size, IPCProtection); - else - shmid = IpcMemoryIdGet(key, size); - return shmid; -} - -/************************************************************************/ -/* SISegmentAttach(shmid) - attach a shared segment with id shmid */ -/************************************************************************/ -static void -SISegmentAttach(IpcMemoryId shmid) -{ - shmInvalBuffer = (struct SISeg *) IpcMemoryAttach(shmid); - if (shmInvalBuffer == IpcMemAttachFailed) - { - /* XXX use validity function */ - elog(FATAL, "SISegmentAttach: Could not attach segment: %m"); - } -} - - -/************************************************************************/ -/* SISegmentInit() initialize SI segment */ -/* */ -/* NB: maxBackends param is only valid when killExistingSegment is true */ -/************************************************************************/ -int -SISegmentInit(bool killExistingSegment, IPCKey key, int maxBackends) -{ - SISegOffsets offsets; - IpcMemoryId shmId; - bool create; - - if (killExistingSegment) - { - /* Kill existing segment */ - /* set semaphore */ - SISegmentKill(key); - - /* Get a shared segment */ - SIComputeSize(&offsets, maxBackends); - create = true; - shmId = SISegmentGet(key, offsets.offsetToEndOfSegment, create); - if (shmId < 0) - { - perror("SISegmentGet: failed"); - return -1; /* an error */ - } + segP->minMsgNum = min; - /* Attach the shared cache invalidation segment */ - /* sets the global variable shmInvalBuffer */ - SISegmentAttach(shmId); - - /* Init shared memory table */ - SISegInit(shmInvalBuffer, &offsets, maxBackends); - } - else + /* When minMsgNum gets really large, decrement all message counters + * so as to forestall overflow of the counters. + */ + if (min >= MSGNUMWRAPAROUND) { - /* use an existing segment */ - create = false; - shmId = SISegmentGet(key, 0, create); - if (shmId < 0) + segP->minMsgNum -= MSGNUMWRAPAROUND; + segP->maxMsgNum -= MSGNUMWRAPAROUND; + for (i = 0; i < segP->maxBackends; i++) { - perror("SISegmentGet: getting an existent segment failed"); - return -1; /* an error */ + if (segP->procState[i].nextMsgNum >= 0) + segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; } - /* Attach the shared cache invalidation segment */ - SISegmentAttach(shmId); } - return 1; } |