/*------------------------------------------------------------------------- * * sinvaladt.c-- * POSTGRES shared cache invalidation segment definitions. * * Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.1.1.1 1996/07/09 06:21:54 scrappy Exp $ * *------------------------------------------------------------------------- */ #include "storage/ipc.h" #include "storage/sinvaladt.h" #include "storage/lmgr.h" #include "utils/elog.h" #include "utils/palloc.h" /* ---------------- * global variable notes * * SharedInvalidationSemaphore * * shmInvalBuffer * the shared buffer segment, set by SISegmentAttach() * * MyBackendId * might be removed later, used only for * debugging in debug routines (end of file) * * SIDbId * identification of buffer (disappears) * * SIRelId \ * SIDummyOid \ identification of buffer * SIXidData / * SIXid / * * XXX This file really needs to be cleaned up. We switched to using * spinlocks to protect critical sections (as opposed to using fake * relations and going through the lock manager) and some of the old * cruft was 'ifdef'ed out, while other parts (now unused) are still * compiled into the system. -mer 5/24/92 * ---------------- */ #ifdef HAS_TEST_AND_SET int SharedInvalidationLockId; #else IpcSemaphoreId SharedInvalidationSemaphore; #endif SISeg *shmInvalBuffer; extern BackendId MyBackendId; static void CleanupInvalidationState(int status, SISeg *segInOutP); static BackendId SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag); static int SIGetNumEntries(SISeg *segP); /************************************************************************/ /* SISetActiveProcess(segP, backendId) set the backend status active */ /* should be called only by the postmaster when creating a backend */ /************************************************************************/ /* XXX I suspect that the segP parameter is extraneous. -hirohama */ static void SISetActiveProcess(SISeg *segInOutP, BackendId backendId) { /* mark all messages as read */ /* Assert(segP->procState[backendId - 1].tag == MyBackendTag); */ segInOutP->procState[backendId - 1].resetState = false; segInOutP->procState[backendId - 1].limit = SIGetNumEntries(segInOutP); } /****************************************************************************/ /* SIBackendInit() initializes a backend to operate on the buffer */ /****************************************************************************/ int SIBackendInit(SISeg *segInOutP) { LRelId LtCreateRelId(); TransactionId LMITransactionIdCopy(); Assert(MyBackendTag > 0); MyBackendId = SIAssignBackendId(segInOutP, MyBackendTag); if (MyBackendId == InvalidBackendTag) return 0; #ifdef INVALIDDEBUG elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.", MyBackendTag, MyBackendId); #endif /* INVALIDDEBUG */ SISetActiveProcess(segInOutP, MyBackendId); on_exitpg(CleanupInvalidationState, (caddr_t)segInOutP); return 1; } /* ---------------- * SIAssignBackendId * ---------------- */ static BackendId SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag) { Index index; ProcState *stateP; stateP = NULL; for (index = 0; index < MaxBackendId; index += 1) { if (segInOutP->procState[index].tag == InvalidBackendTag || segInOutP->procState[index].tag == backendTag) { stateP = &segInOutP->procState[index]; break; } if (!PointerIsValid(stateP) || (segInOutP->procState[index].resetState && (!stateP->resetState || stateP->tag < backendTag)) || (!stateP->resetState && (segInOutP->procState[index].limit < stateP->limit || stateP->tag < backendTag))) { stateP = &segInOutP->procState[index]; } } /* verify that all "procState" entries checked for matching tags */ for (index += 1; index < MaxBackendId; index += 1) { if (segInOutP->procState[index].tag == backendTag) { elog (FATAL, "SIAssignBackendId: tag %d found twice", backendTag); } } if (stateP->tag != InvalidBackendTag) { if (stateP->tag == backendTag) { elog(NOTICE, "SIAssignBackendId: reusing tag %d", backendTag); } else { elog(NOTICE, "SIAssignBackendId: discarding tag %d", stateP->tag); return InvalidBackendTag; } } stateP->tag = backendTag; return (1 + stateP - &segInOutP->procState[0]); } /************************************************************************/ /* The following function should be called only by the postmaster !! */ /************************************************************************/ /************************************************************************/ /* SISetDeadProcess(segP, backendId) set the backend status DEAD */ /* should be called only by the postmaster when a backend died */ /************************************************************************/ static void SISetDeadProcess(SISeg *segP, int backendId) { /* XXX call me.... */ segP->procState[backendId - 1].resetState = false; segP->procState[backendId - 1].limit = -1; segP->procState[backendId - 1].tag = InvalidBackendTag; } /* * CleanupInvalidationState -- * Note: * This is a temporary hack. ExitBackend should call this instead * of exit (via on_exitpg). */ static void CleanupInvalidationState(int status, /* XXX */ SISeg *segInOutP) /* XXX style */ { Assert(PointerIsValid(segInOutP)); SISetDeadProcess(segInOutP, MyBackendId); } /************************************************************************/ /* SIComputeSize() - retuns the size of a buffer segment */ /************************************************************************/ static SISegOffsets * SIComputeSize(int *segSize) { int A, B, a, b, totalSize; SISegOffsets *oP; A = 0; a = SizeSISeg; /* offset to first data entry */ b = SizeOfOneSISegEntry * MAXNUMMESSAGES; B = A + a + b; totalSize = B - A; *segSize = totalSize; oP = (SISegOffsets *) palloc(sizeof(SISegOffsets)); oP->startSegment = A; oP->offsetToFirstEntry = a; /* relatiove to A */ oP->offsetToEndOfSegemnt = totalSize; /* relative to A */ return(oP); } /************************************************************************/ /* SISetStartEntrySection(segP, offset) - sets the offset */ /************************************************************************/ static void SISetStartEntrySection(SISeg *segP, Offset offset) { segP->startEntrySection = offset; } /************************************************************************/ /* SIGetStartEntrySection(segP) - returnss the offset */ /************************************************************************/ static Offset SIGetStartEntrySection(SISeg *segP) { return(segP->startEntrySection); } /************************************************************************/ /* SISetEndEntrySection(segP, offset) - sets the offset */ /************************************************************************/ static void SISetEndEntrySection(SISeg *segP, Offset offset) { segP->endEntrySection = offset; } /************************************************************************/ /* SISetEndEntryChain(segP, offset) - sets the offset */ /************************************************************************/ static void SISetEndEntryChain(SISeg *segP, Offset offset) { segP->endEntryChain = offset; } /************************************************************************/ /* SIGetEndEntryChain(segP) - returnss the offset */ /************************************************************************/ static Offset SIGetEndEntryChain(SISeg *segP) { return(segP->endEntryChain); } /************************************************************************/ /* SISetStartEntryChain(segP, offset) - sets the offset */ /************************************************************************/ static void SISetStartEntryChain(SISeg *segP, Offset offset) { segP->startEntryChain = offset; } /************************************************************************/ /* SIGetStartEntryChain(segP) - returns the offset */ /************************************************************************/ static Offset SIGetStartEntryChain(SISeg *segP) { return(segP->startEntryChain); } /************************************************************************/ /* SISetNumEntries(segP, num) sets the current nuber of entries */ /************************************************************************/ static bool SISetNumEntries(SISeg *segP, int num) { if ( num <= MAXNUMMESSAGES) { segP->numEntries = num; return(true); } else { return(false); /* table full */ } } /************************************************************************/ /* SIGetNumEntries(segP) - returns the current nuber of entries */ /************************************************************************/ static int SIGetNumEntries(SISeg *segP) { return(segP->numEntries); } /************************************************************************/ /* SISetMaxNumEntries(segP, num) sets the maximal number of entries */ /************************************************************************/ static bool SISetMaxNumEntries(SISeg *segP, int num) { if ( num <= MAXNUMMESSAGES) { segP->maxNumEntries = num; return(true); } else { return(false); /* wrong number */ } } /************************************************************************/ /* SIGetProcStateLimit(segP, i) returns the limit of read messages */ /************************************************************************/ static int SIGetProcStateLimit(SISeg *segP, int i) { return(segP->procState[i].limit); } /************************************************************************/ /* SIIncNumEntries(segP, num) increments the current nuber of entries */ /************************************************************************/ static bool SIIncNumEntries(SISeg *segP, int num) { if ((segP->numEntries + num) <= MAXNUMMESSAGES) { segP->numEntries = segP->numEntries + num; return(true); } else { return(false); /* table full */ } } /************************************************************************/ /* SIDecNumEntries(segP, num) decrements the current nuber of entries */ /************************************************************************/ static bool SIDecNumEntries(SISeg *segP, int num) { if ((segP->numEntries - num) >= 0) { segP->numEntries = segP->numEntries - num; return(true); } else { return(false); /* not enough entries in table */ } } /************************************************************************/ /* SISetStartFreeSpace(segP, offset) - sets the offset */ /************************************************************************/ static void SISetStartFreeSpace(SISeg *segP, Offset offset) { segP->startFreeSpace = offset; } /************************************************************************/ /* SIGetStartFreeSpace(segP) - returns the offset */ /************************************************************************/ static Offset SIGetStartFreeSpace(SISeg *segP) { return(segP->startFreeSpace); } /************************************************************************/ /* SIGetFirstDataEntry(segP) returns first data entry */ /************************************************************************/ static SISegEntry * SIGetFirstDataEntry(SISeg *segP) { SISegEntry *eP; Offset startChain; startChain = SIGetStartEntryChain(segP); if (startChain == InvalidOffset) return(NULL); eP = (SISegEntry *) ((Pointer) segP + SIGetStartEntrySection(segP) + startChain ); return(eP); } /************************************************************************/ /* SIGetLastDataEntry(segP) returns last data entry in the chain */ /************************************************************************/ static SISegEntry * SIGetLastDataEntry(SISeg *segP) { SISegEntry *eP; Offset endChain; endChain = SIGetEndEntryChain(segP); if (endChain == InvalidOffset) return(NULL); eP = (SISegEntry *) ((Pointer) segP + SIGetStartEntrySection(segP) + endChain ); return(eP); } /************************************************************************/ /* SIGetNextDataEntry(segP, offset) returns next data entry */ /************************************************************************/ static SISegEntry * SIGetNextDataEntry(SISeg *segP, Offset offset) { SISegEntry *eP; if (offset == InvalidOffset) return(NULL); eP = (SISegEntry *) ((Pointer) segP + SIGetStartEntrySection(segP) + offset); return(eP); } /************************************************************************/ /* SIGetNthDataEntry(segP, n) returns the n-th data entry in chain */ /************************************************************************/ static SISegEntry * SIGetNthDataEntry(SISeg *segP, int n) /* must range from 1 to MaxMessages */ { SISegEntry *eP; int i; if (n <= 0) return(NULL); eP = SIGetFirstDataEntry(segP); for (i = 1; i < n; i++) { /* skip one and get the next */ eP = SIGetNextDataEntry(segP, eP->next); } return(eP); } /************************************************************************/ /* SIEntryOffset(segP, entryP) returns the offset for an pointer */ /************************************************************************/ static Offset SIEntryOffset(SISeg *segP, SISegEntry *entryP) { /* relative to B !! */ return ((Offset) ((Pointer) entryP - (Pointer) segP - SIGetStartEntrySection(segP) )); } /************************************************************************/ /* SISetDataEntry(segP, data) - sets a message in the segemnt */ /************************************************************************/ bool SISetDataEntry(SISeg *segP, SharedInvalidData *data) { Offset offsetToNewData; SISegEntry *eP, *lastP; bool SISegFull(); Offset SIEntryOffset(); Offset SIGetStartFreeSpace(); SISegEntry *SIGetFirstDataEntry(); SISegEntry *SIGetNextDataEntry(); SISegEntry *SIGetLastDataEntry(); if (!SIIncNumEntries(segP, 1)) return(false); /* no space */ /* get a free entry */ offsetToNewData = SIGetStartFreeSpace(segP); eP = SIGetNextDataEntry(segP, offsetToNewData); /* it's a free one */ SISetStartFreeSpace(segP, eP->next); /* fill it up */ eP->entryData = *data; eP->isfree = false; eP->next = InvalidOffset; /* handle insertion point at the end of the chain !!*/ lastP = SIGetLastDataEntry(segP); if (lastP == NULL) { /* there is no chain, insert the first entry */ SISetStartEntryChain(segP, SIEntryOffset(segP, eP)); } else { /* there is a last entry in the chain */ lastP->next = SIEntryOffset(segP, eP); } SISetEndEntryChain(segP, SIEntryOffset(segP, eP)); return(true); } /************************************************************************/ /* SIDecProcLimit(segP, num) decrements all process limits */ /************************************************************************/ static void SIDecProcLimit(SISeg *segP, int num) { int i; for (i=0; i < MaxBackendId; i++) { /* decrement only, if there is a limit > 0 */ if (segP->procState[i].limit > 0) { segP->procState[i].limit = segP->procState[i].limit - num; if (segP->procState[i].limit < 0) { /* limit was not high enough, reset to zero */ /* negative means it's a dead backend */ segP->procState[i].limit = 0; } } } } /************************************************************************/ /* SIDelDataEntry(segP) - free the FIRST entry */ /************************************************************************/ bool SIDelDataEntry(SISeg *segP) { SISegEntry *e1P; SISegEntry *SIGetFirstDataEntry(); if (!SIDecNumEntries(segP, 1)) { /* no entries in buffer */ return(false); } e1P = SIGetFirstDataEntry(segP); SISetStartEntryChain(segP, e1P->next); if (SIGetStartEntryChain(segP) == InvalidOffset) { /* it was the last entry */ SISetEndEntryChain(segP, InvalidOffset); } /* free the entry */ e1P->isfree = true; e1P->next = SIGetStartFreeSpace(segP); SISetStartFreeSpace(segP, SIEntryOffset(segP, e1P)); SIDecProcLimit(segP, 1); return(true); } /************************************************************************/ /* SISetProcStateInvalid(segP) checks and marks a backends state as */ /* invalid */ /************************************************************************/ void SISetProcStateInvalid(SISeg *segP) { int i; for (i=0; i < MaxBackendId; i++) { if (segP->procState[i].limit == 0) { /* backend i didn't read any message */ segP->procState[i].resetState = true; /*XXX signal backend that it has to reset its internal cache ? */ } } } /************************************************************************/ /* SIReadEntryData(segP, backendId, function) */ /* - marks messages to be read by id */ /* and executes function */ /************************************************************************/ void SIReadEntryData(SISeg *segP, int backendId, void (*invalFunction)(), void (*resetFunction)()) { int i = 0; SISegEntry *data; Assert(segP->procState[backendId - 1].tag == MyBackendTag); if (!segP->procState[backendId - 1].resetState) { /* invalidate data, but only those, you have not seen yet !!*/ /* therefore skip read messages */ data = SIGetNthDataEntry(segP, SIGetProcStateLimit(segP, backendId - 1) + 1); while (data != NULL) { i++; segP->procState[backendId - 1].limit++; /* one more message read */ invalFunction(data->entryData.cacheId, data->entryData.hashIndex, &data->entryData.pointerData); data = SIGetNextDataEntry(segP, data->next); } /* SIDelExpiredDataEntries(segP); */ } else { /*backend must not read messages, its own state has to be reset */ elog(NOTICE, "SIMarkEntryData: cache state reset"); resetFunction(); /* XXXX call it here, parameters? */ /* new valid state--mark all messages "read" */ segP->procState[backendId - 1].resetState = false; segP->procState[backendId - 1].limit = SIGetNumEntries(segP); } /* check whether we can remove dead messages */ if (i > MAXNUMMESSAGES) { elog(FATAL, "SIReadEntryData: Invalid segment state"); } } /************************************************************************/ /* SIDelExpiredDataEntries (segP) - removes irrelevant messages */ /************************************************************************/ void SIDelExpiredDataEntries(SISeg *segP) { int min, i, h; min = 9999999; for (i = 0; i < MaxBackendId; i++) { h = SIGetProcStateLimit(segP, i); if (h >= 0) { /* backend active */ if (h < min ) min = h; } } if (min != 9999999) { /* we can remove min messages */ for (i = 1; i <= min; i++) { /* this adjusts also the state limits!*/ if (!SIDelDataEntry(segP)) { elog(FATAL, "SIDelExpiredDataEntries: Invalid segment state"); } } } } /************************************************************************/ /* SISegInit(segP) - initializes the segment */ /************************************************************************/ static void SISegInit(SISeg *segP) { SISegOffsets *oP; int segSize, i; SISegEntry *eP; oP = SIComputeSize(&segSize); /* set sempahore ids in the segment */ /* XXX */ SISetStartEntrySection(segP, oP->offsetToFirstEntry); SISetEndEntrySection(segP, oP->offsetToEndOfSegemnt); SISetStartFreeSpace(segP, 0); SISetStartEntryChain(segP, InvalidOffset); SISetEndEntryChain(segP, InvalidOffset); (void) SISetNumEntries(segP, 0); (void) SISetMaxNumEntries(segP, MAXNUMMESSAGES); for (i = 0; i < MaxBackendId; i++) { segP->procState[i].limit = -1; /* no backend active !!*/ segP->procState[i].resetState = false; segP->procState[i].tag = InvalidBackendTag; } /* construct a chain of free entries */ for (i = 1; i < MAXNUMMESSAGES; i++) { eP = (SISegEntry *) ((Pointer) segP + SIGetStartEntrySection(segP) + (i - 1) * sizeof(SISegEntry)); eP->isfree = true; eP->next = i * sizeof(SISegEntry); /* relative to B */ } /* handle the last free entry separate */ eP = (SISegEntry *) ((Pointer) segP + SIGetStartEntrySection(segP) + (MAXNUMMESSAGES - 1) * sizeof(SISegEntry)); eP->isfree = true; eP->next = InvalidOffset; /* it's the end of the chain !! */ /* * Be tidy */ pfree(oP); } /************************************************************************/ /* SISegmentKill(key) - kill any segment */ /************************************************************************/ static void SISegmentKill(int key) /* the corresponding key for the segment */ { IpcMemoryKill(key); } /************************************************************************/ /* SISegmentGet(key, size) - get a shared segment of size */ /* returns a segment id */ /************************************************************************/ static IpcMemoryId SISegmentGet(int key, /* the corresponding key for the segment */ int size, /* size of segment in bytes */ bool create) { IpcMemoryId shmid; if (create) { shmid = IpcMemoryCreate(key, size, IPCProtection); } else { shmid = IpcMemoryIdGet(key, size); } return(shmid); } /************************************************************************/ /* SISegmentAttach(shmid) - attach a shared segment with id shmid */ /************************************************************************/ static void SISegmentAttach(IpcMemoryId shmid) { shmInvalBuffer = (struct SISeg *) IpcMemoryAttach(shmid); if (shmInvalBuffer == IpcMemAttachFailed) { /* XXX use validity function */ elog(NOTICE, "SISegmentAttach: Could not attach segment"); elog(FATAL, "SISegmentAttach: %m"); } } /************************************************************************/ /* SISegmentInit(killExistingSegment, key) initialize segment */ /************************************************************************/ int SISegmentInit(bool killExistingSegment, IPCKey key) { SISegOffsets *oP; int segSize; IpcMemoryId shmId; bool create; if (killExistingSegment) { /* Kill existing segment */ /* set semaphore */ SISegmentKill(key); /* Get a shared segment */ oP = SIComputeSize(&segSize); /* * Be tidy */ pfree(oP); create = true; shmId = SISegmentGet(key,segSize, create); if (shmId < 0) { perror("SISegmentGet: failed"); return(-1); /* an error */ } /* Attach the shared cache invalidation segment */ /* sets the global variable shmInvalBuffer */ SISegmentAttach(shmId); /* Init shared memory table */ SISegInit(shmInvalBuffer); } else { /* use an existing segment */ create = false; shmId = SISegmentGet(key, 0, create); if (shmId < 0) { perror("SISegmentGet: getting an existent segment failed"); return(-1); /* an error */ } /* Attach the shared cache invalidation segment */ SISegmentAttach(shmId); } return(1); }