aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/postmaster/postmaster.c12
-rw-r--r--src/backend/storage/ipc/sinval.c106
-rw-r--r--src/backend/storage/ipc/sinvaladt.c469
-rw-r--r--src/backend/utils/cache/inval.c43
-rw-r--r--src/include/storage/lwlock.h5
-rw-r--r--src/include/storage/pmsignal.h3
-rw-r--r--src/include/storage/sinval.h5
-rw-r--r--src/include/storage/sinvaladt.h14
8 files changed, 413 insertions, 244 deletions
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 751bb8244d3..73d6dae56d8 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -37,7 +37,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.558 2008/06/06 22:35:22 alvherre Exp $
+ * $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.559 2008/06/19 21:32:56 tgl Exp $
*
* NOTES
*
@@ -3829,16 +3829,6 @@ sigusr1_handler(SIGNAL_ARGS)
load_role();
}
- if (CheckPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN))
- {
- /*
- * Send SIGUSR1 to all children (triggers CatchupInterruptHandler).
- * See storage/ipc/sinval[adt].c for the use of this.
- */
- if (Shutdown <= SmartShutdown)
- SignalChildren(SIGUSR1);
- }
-
if (CheckPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER) &&
PgArchPID != 0)
{
diff --git a/src/backend/storage/ipc/sinval.c b/src/backend/storage/ipc/sinval.c
index 4b8a8f1afbd..e2c6ca2aec9 100644
--- a/src/backend/storage/ipc/sinval.c
+++ b/src/backend/storage/ipc/sinval.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.85 2008/03/17 11:50:26 alvherre Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.86 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -17,9 +17,7 @@
#include "access/xact.h"
#include "commands/async.h"
#include "miscadmin.h"
-#include "storage/backendid.h"
#include "storage/ipc.h"
-#include "storage/proc.h"
#include "storage/sinvaladt.h"
#include "utils/inval.h"
@@ -27,9 +25,9 @@
/*
* Because backends sitting idle will not be reading sinval events, we
* need a way to give an idle backend a swift kick in the rear and make
- * it catch up before the sinval queue overflows and forces everyone
- * through a cache reset exercise. This is done by broadcasting SIGUSR1
- * to all backends when the queue is threatening to become full.
+ * it catch up before the sinval queue overflows and forces it to go
+ * through a cache reset exercise. This is done by sending SIGUSR1
+ * to any backend that gets too far behind.
*
* State for catchup events consists of two flags: one saying whether
* the signal handler is currently allowed to call ProcessCatchupEvent
@@ -47,67 +45,101 @@ static void ProcessCatchupEvent(void);
/*
- * SendSharedInvalidMessage
- * Add a shared-cache-invalidation message to the global SI message queue.
+ * SendSharedInvalidMessages
+ * Add shared-cache-invalidation message(s) to the global SI message queue.
*/
void
-SendSharedInvalidMessage(SharedInvalidationMessage *msg)
+SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
{
- bool insertOK;
-
- insertOK = SIInsertDataEntry(msg);
- if (!insertOK)
- elog(DEBUG4, "SI buffer overflow");
+ SIInsertDataEntries(msgs, n);
}
/*
* ReceiveSharedInvalidMessages
* Process shared-cache-invalidation messages waiting for this backend
*
+ * We guarantee to process all messages that had been queued before the
+ * routine was entered. It is of course possible for more messages to get
+ * queued right after our last SIGetDataEntries call.
+ *
* NOTE: it is entirely possible for this routine to be invoked recursively
* as a consequence of processing inside the invalFunction or resetFunction.
- * Hence, we must be holding no SI resources when we call them. The only
- * bad side-effect is that SIDelExpiredDataEntries might be called extra
- * times on the way out of a nested call.
+ * Furthermore, such a recursive call must guarantee that all outstanding
+ * inval messages have been processed before it exits. This is the reason
+ * for the strange-looking choice to use a statically allocated buffer array
+ * and counters; it's so that a recursive call can process messages already
+ * sucked out of sinvaladt.c.
*/
void
ReceiveSharedInvalidMessages(
void (*invalFunction) (SharedInvalidationMessage *msg),
void (*resetFunction) (void))
{
- SharedInvalidationMessage data;
- int getResult;
- bool gotMessage = false;
+#define MAXINVALMSGS 32
+ static SharedInvalidationMessage messages[MAXINVALMSGS];
+ /*
+ * We use volatile here to prevent bugs if a compiler doesn't realize
+ * that recursion is a possibility ...
+ */
+ static volatile int nextmsg = 0;
+ static volatile int nummsgs = 0;
- for (;;)
+ /* Deal with any messages still pending from an outer recursion */
+ while (nextmsg < nummsgs)
{
- /*
- * We can discard any pending catchup event, since we will not exit
- * this loop until we're fully caught up.
- */
- catchupInterruptOccurred = 0;
+ SharedInvalidationMessage *msg = &messages[nextmsg++];
- getResult = SIGetDataEntry(MyBackendId, &data);
+ invalFunction(msg);
+ }
+
+ do
+ {
+ int getResult;
+
+ nextmsg = nummsgs = 0;
+
+ /* Try to get some more messages */
+ getResult = SIGetDataEntries(messages, MAXINVALMSGS);
- if (getResult == 0)
- break; /* nothing more to do */
if (getResult < 0)
{
/* got a reset message */
elog(DEBUG4, "cache state reset");
resetFunction();
+ break; /* nothing more to do */
}
- else
+
+ /* Process them, being wary that a recursive call might eat some */
+ nextmsg = 0;
+ nummsgs = getResult;
+
+ while (nextmsg < nummsgs)
{
- /* got a normal data message */
- invalFunction(&data);
+ SharedInvalidationMessage *msg = &messages[nextmsg++];
+
+ invalFunction(msg);
}
- gotMessage = true;
- }
- /* If we got any messages, try to release dead messages */
- if (gotMessage)
- SIDelExpiredDataEntries(false);
+ /*
+ * We only need to loop if the last SIGetDataEntries call (which
+ * might have been within a recursive call) returned a full buffer.
+ */
+ } while (nummsgs == MAXINVALMSGS);
+
+ /*
+ * We are now caught up. If we received a catchup signal, reset that
+ * flag, and call SICleanupQueue(). This is not so much because we
+ * need to flush dead messages right now, as that we want to pass on
+ * the catchup signal to the next slowest backend. "Daisy chaining" the
+ * catchup signal this way avoids creating spikes in system load for
+ * what should be just a background maintenance activity.
+ */
+ if (catchupInterruptOccurred)
+ {
+ catchupInterruptOccurred = 0;
+ elog(DEBUG4, "sinval catchup complete, cleaning queue");
+ SICleanupQueue(false, 0);
+ }
}
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index ddbc08ef55f..0befc4a9341 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -1,24 +1,25 @@
/*-------------------------------------------------------------------------
*
* sinvaladt.c
- * POSTGRES shared cache invalidation segment definitions.
+ * POSTGRES shared cache invalidation data manager.
*
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.70 2008/06/17 20:07:08 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.71 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <signal.h>
+#include <unistd.h>
+
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
-#include "storage/lwlock.h"
-#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
@@ -27,20 +28,44 @@
/*
* Conceptually, the shared cache invalidation messages are stored in an
* infinite array, where maxMsgNum is the next array subscript to store a
- * submitted message in, minMsgNum is the smallest array subscript containing a
- * message not yet read by all backends, and we always have maxMsgNum >=
+ * submitted message in, minMsgNum is the smallest array subscript containing
+ * a message not yet read by all backends, and we always have maxMsgNum >=
* minMsgNum. (They are equal when there are no messages pending.) For each
* active backend, there is a nextMsgNum pointer indicating the next message it
* needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
* backend.
*
+ * (In the current implementation, minMsgNum is a lower bound for the
+ * per-process nextMsgNum values, but it isn't rigorously kept equal to the
+ * smallest nextMsgNum --- it may lag behind. We only update it when
+ * SICleanupQueue is called, and we try not to do that often.)
+ *
* In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
* entries. We translate MsgNum values into circular-buffer indexes by
* computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
* MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
* doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
- * in the buffer. If the buffer does overflow, we reset it to empty and
- * force each backend to "reset", ie, discard all its invalidatable state.
+ * in the buffer. If the buffer does overflow, we recover by setting the
+ * "reset" flag for each backend that has fallen too far behind. A backend
+ * that is in "reset" state is ignored while determining minMsgNum. When
+ * it does finally attempt to receive inval messages, it must discard all
+ * its invalidatable state, since it won't know what it missed.
+ *
+ * To reduce the probability of needing resets, we send a "catchup" interrupt
+ * to any backend that seems to be falling unreasonably far behind. The
+ * normal behavior is that at most one such interrupt is in flight at a time;
+ * when a backend completes processing a catchup interrupt, it executes
+ * SICleanupQueue, which will signal the next-furthest-behind backend if
+ * needed. This avoids undue contention from multiple backends all trying
+ * to catch up at once. However, the furthest-back backend might be stuck
+ * in a state where it can't catch up. Eventually it will get reset, so it
+ * won't cause any more problems for anyone but itself. But we don't want
+ * to find that a bunch of other backends are now too close to the reset
+ * threshold to be saved. So SICleanupQueue is designed to occasionally
+ * send extra catchup interrupts as the queue gets fuller, to backends that
+ * are far behind and haven't gotten one yet. As long as there aren't a lot
+ * of "stuck" backends, we won't need a lot of extra interrupts, since ones
+ * that aren't stuck will propagate their interrupts to the next guy.
*
* We would have problems if the MsgNum values overflow an integer, so
* whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
@@ -48,6 +73,21 @@
* large so that we don't need to do this often. It must be a multiple of
* MAXNUMMESSAGES so that the existing circular-buffer entries don't need
* to be moved when we do it.
+ *
+ * Access to the shared sinval array is protected by two locks, SInvalReadLock
+ * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
+ * authorizes them to modify their own ProcState but not to modify or even
+ * look at anyone else's. When we need to perform array-wide updates,
+ * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
+ * lock out all readers. Writers take SInvalWriteLock (always in exclusive
+ * mode) to serialize adding messages to the queue. Note that a writer
+ * can operate in parallel with one or more readers, because the writer
+ * has no need to touch anyone's ProcState, except in the infrequent cases
+ * when SICleanupQueue is needed. The only point of overlap is that
+ * the writer might change maxMsgNum while readers are looking at it.
+ * This should be okay: we are assuming that fetching or storing an int
+ * is atomic, an assumption also made elsewhere in Postgres. However
+ * readers mustn't assume that maxMsgNum isn't changing under them.
*/
@@ -59,17 +99,46 @@
*
* MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
* Must be a multiple of MAXNUMMESSAGES. Should be large.
+ *
+ * CLEANUP_MIN: the minimum number of messages that must be in the buffer
+ * before we bother to call SICleanupQueue.
+ *
+ * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
+ * we exceed CLEANUP_MIN. Should be a power of 2 for speed.
+ *
+ * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
+ * behind before we'll send it SIGUSR1.
+ *
+ * WRITE_QUANTUM: the max number of messages to push into the buffer per
+ * iteration of SIInsertDataEntries. Noncritical but should be less than
+ * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
+ * per iteration.
*/
#define MAXNUMMESSAGES 4096
-#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
+#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
+#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
+#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
+#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
+#define WRITE_QUANTUM 64
/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
- /* nextMsgNum is -1 in an inactive ProcState array entry. */
- int nextMsgNum; /* next message number to read, or -1 */
- bool resetState; /* true, if backend has to reset its state */
+ /* procPid is zero in an inactive ProcState array entry. */
+ pid_t procPid; /* PID of backend, for signaling */
+ /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
+ int nextMsgNum; /* next message number to read */
+ bool resetState; /* backend needs to reset its state */
+ bool signaled; /* backend has been sent catchup signal */
+
+ /*
+ * Next LocalTransactionId to use for each idle backend slot. We keep
+ * this here because it is indexed by BackendId and it is convenient to
+ * copy the value to and from local memory when MyBackendId is set.
+ * It's meaningless in an active ProcState entry.
+ */
+ LocalTransactionId nextLXID;
} ProcState;
/* Shared cache invalidation memory segment */
@@ -80,17 +149,11 @@ typedef struct SISeg
*/
int minMsgNum; /* oldest message still needed */
int maxMsgNum; /* next message number to be assigned */
+ int nextThreshold; /* # of messages to call SICleanupQueue */
int lastBackend; /* index of last active procState entry, +1 */
int maxBackends; /* size of procState array */
/*
- * Next LocalTransactionId to use for each idle backend slot. We keep
- * this here because it is indexed by BackendId and it is convenient to
- * copy the value to and from local memory when MyBackendId is set.
- */
- LocalTransactionId *nextLXID; /* array of maxBackends entries */
-
- /*
* Circular buffer holding shared-inval messages
*/
SharedInvalidationMessage buffer[MAXNUMMESSAGES];
@@ -110,7 +173,6 @@ static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
static LocalTransactionId nextLocalTransactionId;
static void CleanupInvalidationState(int status, Datum arg);
-static void SISetProcStateInvalid(SISeg *segP);
/*
@@ -124,8 +186,6 @@ SInvalShmemSize(void)
size = offsetof(SISeg, procState);
size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
- size = add_size(size, mul_size(sizeof(LocalTransactionId), MaxBackends));
-
return size;
}
@@ -149,11 +209,10 @@ CreateSharedInvalidationState(void)
if (found)
return;
- shmInvalBuffer->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends);
-
/* Clear message counters, save size of procState array */
shmInvalBuffer->minMsgNum = 0;
shmInvalBuffer->maxMsgNum = 0;
+ shmInvalBuffer->nextThreshold = CLEANUP_MIN;
shmInvalBuffer->lastBackend = 0;
shmInvalBuffer->maxBackends = MaxBackends;
@@ -162,9 +221,11 @@ CreateSharedInvalidationState(void)
/* Mark all backends inactive, and initialize nextLXID */
for (i = 0; i < shmInvalBuffer->maxBackends; i++)
{
- shmInvalBuffer->procState[i].nextMsgNum = -1; /* inactive */
+ shmInvalBuffer->procState[i].procPid = 0; /* inactive */
+ shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
shmInvalBuffer->procState[i].resetState = false;
- shmInvalBuffer->nextLXID[i] = InvalidLocalTransactionId;
+ shmInvalBuffer->procState[i].signaled = false;
+ shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
}
}
@@ -179,12 +240,19 @@ SharedInvalBackendInit(void)
ProcState *stateP = NULL;
SISeg *segP = shmInvalBuffer;
- LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+ /*
+ * This can run in parallel with read operations, and for that matter
+ * with write operations; but not in parallel with additions and removals
+ * of backends, nor in parallel with SICleanupQueue. It doesn't seem
+ * worth having a third lock, so we choose to use SInvalWriteLock to
+ * serialize additions/removals.
+ */
+ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
/* Look for a free entry in the procState array */
for (index = 0; index < segP->lastBackend; index++)
{
- if (segP->procState[index].nextMsgNum < 0) /* inactive slot? */
+ if (segP->procState[index].procPid == 0) /* inactive slot? */
{
stateP = &segP->procState[index];
break;
@@ -196,7 +264,7 @@ SharedInvalBackendInit(void)
if (segP->lastBackend < segP->maxBackends)
{
stateP = &segP->procState[segP->lastBackend];
- Assert(stateP->nextMsgNum < 0);
+ Assert(stateP->procPid == 0);
segP->lastBackend++;
}
else
@@ -205,7 +273,7 @@ SharedInvalBackendInit(void)
* out of procState slots: MaxBackends exceeded -- report normally
*/
MyBackendId = InvalidBackendId;
- LWLockRelease(SInvalLock);
+ LWLockRelease(SInvalWriteLock);
ereport(FATAL,
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("sorry, too many clients already")));
@@ -214,21 +282,21 @@ SharedInvalBackendInit(void)
MyBackendId = (stateP - &segP->procState[0]) + 1;
-#ifdef INVALIDDEBUG
- elog(DEBUG2, "my backend id is %d", MyBackendId);
-#endif /* INVALIDDEBUG */
+ elog(DEBUG4, "my backend id is %d", MyBackendId);
/* Advertise assigned backend ID in MyProc */
MyProc->backendId = MyBackendId;
/* Fetch next local transaction ID into local memory */
- nextLocalTransactionId = segP->nextLXID[MyBackendId - 1];
+ nextLocalTransactionId = stateP->nextLXID;
/* mark myself active, with all extant messages already read */
+ stateP->procPid = MyProcPid;
stateP->nextMsgNum = segP->maxMsgNum;
stateP->resetState = false;
+ stateP->signaled = false;
- LWLockRelease(SInvalLock);
+ LWLockRelease(SInvalWriteLock);
/* register exit routine to mark my entry inactive at exit */
on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
@@ -238,8 +306,7 @@ SharedInvalBackendInit(void)
* CleanupInvalidationState
* Mark the current backend as no longer active.
*
- * This function is called via on_shmem_exit() during backend shutdown,
- * so the caller has NOT acquired the lock for us.
+ * This function is called via on_shmem_exit() during backend shutdown.
*
* arg is really of type "SISeg*".
*/
@@ -247,227 +314,247 @@ static void
CleanupInvalidationState(int status, Datum arg)
{
SISeg *segP = (SISeg *) DatumGetPointer(arg);
+ ProcState *stateP;
int i;
Assert(PointerIsValid(segP));
- LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+
+ stateP = &segP->procState[MyBackendId - 1];
/* Update next local transaction ID for next holder of this backendID */
- segP->nextLXID[MyBackendId - 1] = nextLocalTransactionId;
+ stateP->nextLXID = nextLocalTransactionId;
/* Mark myself inactive */
- segP->procState[MyBackendId - 1].nextMsgNum = -1;
- segP->procState[MyBackendId - 1].resetState = false;
+ stateP->procPid = 0;
+ stateP->nextMsgNum = 0;
+ stateP->resetState = false;
+ stateP->signaled = false;
/* Recompute index of last active backend */
for (i = segP->lastBackend; i > 0; i--)
{
- if (segP->procState[i - 1].nextMsgNum >= 0)
+ if (segP->procState[i - 1].procPid != 0)
break;
}
segP->lastBackend = i;
- LWLockRelease(SInvalLock);
+ LWLockRelease(SInvalWriteLock);
}
/*
- * SIInsertDataEntry
- * Add a new invalidation message to the buffer.
- *
- * If we are unable to insert the message because the buffer is full,
- * then clear the buffer and assert the "reset" flag to each backend.
- * This will cause all the backends to discard *all* invalidatable state.
- *
- * Returns true for normal successful insertion, false if had to reset.
+ * SIInsertDataEntries
+ * Add new invalidation message(s) to the buffer.
*/
-bool
-SIInsertDataEntry(SharedInvalidationMessage *data)
+void
+SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
- int numMsgs;
- bool signal_postmaster = false;
- SISeg *segP;
+ SISeg *segP = shmInvalBuffer;
+
+ /*
+ * N can be arbitrarily large. We divide the work into groups of no more
+ * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
+ * an unreasonably long time. (This is not so much because we care about
+ * letting in other writers, as that some just-caught-up backend might be
+ * trying to do SICleanupQueue to pass on its signal, and we don't want it
+ * to have to wait a long time.) Also, we need to consider calling
+ * SICleanupQueue every so often.
+ */
+ while (n > 0)
+ {
+ int nthistime = Min(n, WRITE_QUANTUM);
+ int numMsgs;
- LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+ n -= nthistime;
- segP = shmInvalBuffer;
- numMsgs = segP->maxMsgNum - segP->minMsgNum;
+ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
- /* Is the buffer full? */
- if (numMsgs >= MAXNUMMESSAGES)
- {
/*
- * Don't panic just yet: slowest backend might have consumed some
- * messages but not yet have done SIDelExpiredDataEntries() to advance
- * minMsgNum. So, make sure minMsgNum is up-to-date.
+ * If the buffer is full, we *must* acquire some space. Clean the
+ * queue and reset anyone who is preventing space from being freed.
+ * Otherwise, clean the queue only when it's exceeded the next
+ * fullness threshold.
*/
- SIDelExpiredDataEntries(true);
numMsgs = segP->maxMsgNum - segP->minMsgNum;
- if (numMsgs >= MAXNUMMESSAGES)
+ if (numMsgs + nthistime > MAXNUMMESSAGES)
{
- /* Yup, it's definitely full, no choice but to reset */
- SISetProcStateInvalid(segP);
- LWLockRelease(SInvalLock);
- return false;
+ SICleanupQueue(true, nthistime);
+ Assert((segP->maxMsgNum - segP->minMsgNum + nthistime) <= MAXNUMMESSAGES);
}
- }
-
- /*
- * Try to prevent table overflow. When the table is 70% full send a
- * WAKEN_CHILDREN request to the postmaster. The postmaster will send a
- * SIGUSR1 signal to all the backends, which will cause sinval.c to read
- * any pending SI entries.
- *
- * This should never happen if all the backends are actively executing
- * queries, but if a backend is sitting idle then it won't be starting
- * transactions and so won't be reading SI entries.
- */
- if (numMsgs == (MAXNUMMESSAGES * 70 / 100) && IsUnderPostmaster)
- signal_postmaster = true;
-
- /*
- * Insert new message into proper slot of circular buffer
- */
- segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
- segP->maxMsgNum++;
-
- LWLockRelease(SInvalLock);
-
- if (signal_postmaster)
- {
- elog(DEBUG4, "SI table is 70%% full, signaling postmaster");
- SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
- }
-
- return true;
-}
-
-/*
- * SISetProcStateInvalid
- * Flush pending messages from buffer, assert reset flag for each backend
- *
- * This is used only to recover from SI buffer overflow.
- */
-static void
-SISetProcStateInvalid(SISeg *segP)
-{
- int i;
-
- segP->minMsgNum = 0;
- segP->maxMsgNum = 0;
+ else if (numMsgs >= segP->nextThreshold)
+ SICleanupQueue(true, 0);
- for (i = 0; i < segP->lastBackend; i++)
- {
- if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
+ /*
+ * Insert new message(s) into proper slot of circular buffer
+ */
+ while (nthistime-- > 0)
{
- segP->procState[i].resetState = true;
- segP->procState[i].nextMsgNum = 0;
+ segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data++;
+ segP->maxMsgNum++;
}
+
+ LWLockRelease(SInvalWriteLock);
}
}
/*
- * SIGetDataEntry
- * get next SI message for specified backend, if there is one
+ * SIGetDataEntries
+ * get next SI message(s) for current backend, if there are any
*
* Possible return values:
- * 0: no SI message available
- * 1: next SI message has been extracted into *data
- * (there may be more messages available after this one!)
- * -1: SI reset message extracted
+ * 0: no SI message available
+ * n>0: next n SI messages have been extracted into data[]
+ * -1: SI reset message extracted
+ *
+ * If the return value is less than the array size "datasize", the caller
+ * can assume that there are no more SI messages after the one(s) returned.
+ * Otherwise, another call is needed to collect more messages.
*
- * NB: this can run in parallel with other instances of SIGetDataEntry
+ * NB: this can run in parallel with other instances of SIGetDataEntries
* executing on behalf of other backends, since each instance will modify only
* fields of its own backend's ProcState, and no instance will look at fields
- * of other backends' ProcStates. We express this by grabbing SInvalLock in
- * shared mode. Note that this is not exactly the normal (read-only)
+ * of other backends' ProcStates. We express this by grabbing SInvalReadLock
+ * in shared mode. Note that this is not exactly the normal (read-only)
* interpretation of a shared lock! Look closely at the interactions before
- * allowing SInvalLock to be grabbed in shared mode for any other reason!
+ * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
+ *
+ * NB: this can also run in parallel with SIInsertDataEntries. It is not
+ * guaranteed that we will return any messages added after the routine is
+ * entered.
+ *
+ * Note: we assume that "datasize" is not so large that it might be important
+ * to break our hold on SInvalReadLock into segments.
*/
int
-SIGetDataEntry(int backendId, SharedInvalidationMessage *data)
+SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
- ProcState *stateP;
SISeg *segP;
+ ProcState *stateP;
+ int n;
- LWLockAcquire(SInvalLock, LW_SHARED);
+ LWLockAcquire(SInvalReadLock, LW_SHARED);
segP = shmInvalBuffer;
- stateP = &segP->procState[backendId - 1];
+ stateP = &segP->procState[MyBackendId - 1];
if (stateP->resetState)
{
/*
* Force reset. We can say we have dealt with any messages added
- * since the reset, as well...
+ * since the reset, as well; and that means we should clear the
+ * signaled flag, too.
*/
- stateP->resetState = false;
stateP->nextMsgNum = segP->maxMsgNum;
- LWLockRelease(SInvalLock);
+ stateP->resetState = false;
+ stateP->signaled = false;
+ LWLockRelease(SInvalReadLock);
return -1;
}
- if (stateP->nextMsgNum >= segP->maxMsgNum)
- {
- LWLockRelease(SInvalLock);
- return 0; /* nothing to read */
- }
-
/*
- * Retrieve message and advance my counter.
+ * Retrieve messages and advance backend's counter, until data array is
+ * full or there are no more messages.
+ *
+ * There may be other backends that haven't read the message(s), so we
+ * cannot delete them here. SICleanupQueue() will eventually remove them
+ * from the queue.
+ *
+ * Note: depending on the compiler, we might read maxMsgNum only once
+ * here, or each time through the loop. It doesn't matter (as long as
+ * each fetch is atomic).
*/
- *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
- stateP->nextMsgNum++;
+ n = 0;
+ while (n < datasize && stateP->nextMsgNum < segP->maxMsgNum)
+ {
+ data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
+ stateP->nextMsgNum++;
+ }
/*
- * There may be other backends that haven't read the message, so we cannot
- * delete it here. SIDelExpiredDataEntries() should be called to remove
- * dead messages.
+ * Reset our "signaled" flag whenever we have caught up completely.
*/
+ if (stateP->nextMsgNum >= segP->maxMsgNum)
+ stateP->signaled = false;
- LWLockRelease(SInvalLock);
- return 1; /* got a message */
+ LWLockRelease(SInvalReadLock);
+ return n;
}
/*
- * SIDelExpiredDataEntries
+ * SICleanupQueue
* Remove messages that have been consumed by all active backends
+ *
+ * callerHasWriteLock is TRUE if caller is holding SInvalWriteLock.
+ * minFree is the minimum number of free message slots required at completion.
+ *
+ * Possible side effects of this routine include marking one or more
+ * backends as "reset" in the array, and sending a catchup interrupt (SIGUSR1)
+ * to some backend that seems to be getting too far behind. We signal at
+ * most one backend at a time, for reasons explained at the top of the file.
*/
void
-SIDelExpiredDataEntries(bool locked)
+SICleanupQueue(bool callerHasWriteLock, int minFree)
{
SISeg *segP = shmInvalBuffer;
int min,
- i,
- h;
+ minsig,
+ lowbound,
+ numMsgs,
+ i;
+ ProcState *needSig = NULL;
- if (!locked)
- LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+ /* Lock out all writers and readers */
+ if (!callerHasWriteLock)
+ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+ LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
+ /*
+ * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify
+ * the furthest-back backend that needs signaling (if any), and reset
+ * any backends that are too far back.
+ */
min = segP->maxMsgNum;
- if (min == segP->minMsgNum)
- {
- if (!locked)
- LWLockRelease(SInvalLock);
- return; /* fast path if no messages exist */
- }
-
- /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
+ minsig = min - SIG_THRESHOLD;
+ lowbound = min - MAXNUMMESSAGES + minFree;
for (i = 0; i < segP->lastBackend; i++)
{
- h = segP->procState[i].nextMsgNum;
- if (h >= 0)
- { /* backend active */
- if (h < min)
- min = h;
+ ProcState *stateP = &segP->procState[i];
+ int n = stateP->nextMsgNum;
+
+ /* Ignore if inactive or already in reset state */
+ if (stateP->procPid == 0 || stateP->resetState)
+ continue;
+
+ /*
+ * If we must free some space and this backend is preventing it,
+ * force him into reset state and then ignore until he catches up.
+ */
+ if (n < lowbound)
+ {
+ stateP->resetState = true;
+ /* no point in signaling him ... */
+ continue;
+ }
+
+ /* Track the global minimum nextMsgNum */
+ if (n < min)
+ min = n;
+
+ /* Also see who's furthest back of the unsignaled backends */
+ if (n < minsig && !stateP->signaled)
+ {
+ minsig = n;
+ needSig = stateP;
}
}
segP->minMsgNum = min;
/*
* When minMsgNum gets really large, decrement all message counters so as
- * to forestall overflow of the counters.
+ * to forestall overflow of the counters. This happens seldom enough
+ * that folding it into the previous loop would be a loser.
*/
if (min >= MSGNUMWRAPAROUND)
{
@@ -475,13 +562,43 @@ SIDelExpiredDataEntries(bool locked)
segP->maxMsgNum -= MSGNUMWRAPAROUND;
for (i = 0; i < segP->lastBackend; i++)
{
- if (segP->procState[i].nextMsgNum >= 0)
- segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
+ /* we don't bother skipping inactive entries here */
+ segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
}
}
- if (!locked)
- LWLockRelease(SInvalLock);
+ /*
+ * Determine how many messages are still in the queue, and set the
+ * threshold at which we should repeat SICleanupQueue().
+ */
+ numMsgs = segP->maxMsgNum - segP->minMsgNum;
+ if (numMsgs < CLEANUP_MIN)
+ segP->nextThreshold = CLEANUP_MIN;
+ else
+ segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
+
+ /*
+ * Lastly, signal anyone who needs a catchup interrupt. Since kill()
+ * might not be fast, we don't want to hold locks while executing it.
+ */
+ if (needSig)
+ {
+ pid_t his_pid = needSig->procPid;
+
+ needSig->signaled = true;
+ LWLockRelease(SInvalReadLock);
+ LWLockRelease(SInvalWriteLock);
+ elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
+ kill(his_pid, SIGUSR1);
+ if (callerHasWriteLock)
+ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+ }
+ else
+ {
+ LWLockRelease(SInvalReadLock);
+ if (!callerHasWriteLock)
+ LWLockRelease(SInvalWriteLock);
+ }
}
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 50e27923566..050d7cc88de 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -80,7 +80,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.85 2008/06/19 00:46:05 alvherre Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.86 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -203,7 +203,7 @@ AddInvalidationMessage(InvalidationChunk **listHdr,
if (chunk == NULL)
{
/* First time through; create initial chunk */
-#define FIRSTCHUNKSIZE 16
+#define FIRSTCHUNKSIZE 32
chunk = (InvalidationChunk *)
MemoryContextAlloc(CurTransactionContext,
sizeof(InvalidationChunk) +
@@ -275,6 +275,23 @@ AppendInvalidationMessageList(InvalidationChunk **destHdr,
} \
} while (0)
+/*
+ * Process a list of invalidation messages group-wise.
+ *
+ * As above, but the code fragment can handle an array of messages.
+ * The fragment should refer to the messages as msgs[], with n entries.
+ */
+#define ProcessMessageListMulti(listHdr, codeFragment) \
+ do { \
+ InvalidationChunk *_chunk; \
+ for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \
+ { \
+ SharedInvalidationMessage *msgs = _chunk->msgs; \
+ int n = _chunk->nitems; \
+ codeFragment; \
+ } \
+ } while (0)
+
/* ----------------------------------------------------------------
* Invalidation set support functions
@@ -371,6 +388,18 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr,
ProcessMessageList(hdr->rclist, func(msg));
}
+/*
+ * As above, but the function is able to process an array of messages
+ * rather than just one at a time.
+ */
+static void
+ProcessInvalidationMessagesMulti(InvalidationListHeader *hdr,
+ void (*func) (const SharedInvalidationMessage *msgs, int n))
+{
+ ProcessMessageListMulti(hdr->cclist, func(msgs, n));
+ ProcessMessageListMulti(hdr->rclist, func(msgs, n));
+}
+
/* ----------------------------------------------------------------
* private support functions
* ----------------------------------------------------------------
@@ -792,7 +821,7 @@ inval_twophase_postcommit(TransactionId xid, uint16 info,
case TWOPHASE_INFO_MSG:
msg = (SharedInvalidationMessage *) recdata;
Assert(len == sizeof(SharedInvalidationMessage));
- SendSharedInvalidMessage(msg);
+ SendSharedInvalidMessages(msg, 1);
break;
case TWOPHASE_INFO_FILE_BEFORE:
RelationCacheInitFileInvalidate(true);
@@ -850,8 +879,8 @@ AtEOXact_Inval(bool isCommit)
AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
&transInvalInfo->CurrentCmdInvalidMsgs);
- ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
- SendSharedInvalidMessage);
+ ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs,
+ SendSharedInvalidMessages);
if (transInvalInfo->RelcacheInitFileInval)
RelationCacheInitFileInvalidate(false);
@@ -1033,8 +1062,8 @@ EndNonTransactionalInvalidation(void)
/* Send out the invals */
ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
LocalExecuteInvalidationMessage);
- ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
- SendSharedInvalidMessage);
+ ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+ SendSharedInvalidMessages);
/* Clean up and release memory */
for (chunk = transInvalInfo->CurrentCmdInvalidMsgs.cclist;
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index baccfbf5a68..b1088fcd33d 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.38 2008/01/01 19:45:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.39 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -43,7 +43,8 @@ typedef enum LWLockId
OidGenLock,
XidGenLock,
ProcArrayLock,
- SInvalLock,
+ SInvalReadLock,
+ SInvalWriteLock,
FreeSpaceLock,
WALInsertLock,
WALWriteLock,
diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h
index c02593e5a86..94f1770ffce 100644
--- a/src/include/storage/pmsignal.h
+++ b/src/include/storage/pmsignal.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.19 2008/01/01 19:45:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.20 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -23,7 +23,6 @@
typedef enum
{
PMSIGNAL_PASSWORD_CHANGE, /* pg_auth file has changed */
- PMSIGNAL_WAKEN_CHILDREN, /* send a SIGUSR1 signal to all backends */
PMSIGNAL_WAKEN_ARCHIVER, /* send a NOTIFY signal to xlog archiver */
PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 343c8d94bdb..3601216f1b6 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/storage/sinval.h,v 1.47 2008/03/16 19:47:34 alvherre Exp $
+ * $PostgreSQL: pgsql/src/include/storage/sinval.h,v 1.48 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -83,7 +83,8 @@ typedef union
} SharedInvalidationMessage;
-extern void SendSharedInvalidMessage(SharedInvalidationMessage *msg);
+extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs,
+ int n);
extern void ReceiveSharedInvalidMessages(
void (*invalFunction) (SharedInvalidationMessage *msg),
void (*resetFunction) (void));
diff --git a/src/include/storage/sinvaladt.h b/src/include/storage/sinvaladt.h
index 8535cba0f06..1748f8821b4 100644
--- a/src/include/storage/sinvaladt.h
+++ b/src/include/storage/sinvaladt.h
@@ -1,12 +1,13 @@
/*-------------------------------------------------------------------------
*
* sinvaladt.h
- * POSTGRES shared cache invalidation segment definitions.
+ * POSTGRES shared cache invalidation data manager.
*
* The shared cache invalidation manager is responsible for transmitting
* invalidation messages between backends. Any message sent by any backend
* must be delivered to all already-running backends before it can be
- * forgotten.
+ * forgotten. (If we run out of space, we instead deliver a "RESET"
+ * message to backends that have fallen too far behind.)
*
* The struct type SharedInvalidationMessage, defining the contents of
* a single message, is defined in sinval.h.
@@ -14,7 +15,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/storage/sinvaladt.h,v 1.47 2008/03/17 11:50:27 alvherre Exp $
+ * $PostgreSQL: pgsql/src/include/storage/sinvaladt.h,v 1.48 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -23,7 +24,6 @@
#include "storage/sinval.h"
-
/*
* prototypes for functions in sinvaladt.c
*/
@@ -31,9 +31,9 @@ extern Size SInvalShmemSize(void);
extern void CreateSharedInvalidationState(void);
extern void SharedInvalBackendInit(void);
-extern bool SIInsertDataEntry(SharedInvalidationMessage *data);
-extern int SIGetDataEntry(int backendId, SharedInvalidationMessage *data);
-extern void SIDelExpiredDataEntries(bool locked);
+extern void SIInsertDataEntries(const SharedInvalidationMessage *data, int n);
+extern int SIGetDataEntries(SharedInvalidationMessage *data, int datasize);
+extern void SICleanupQueue(bool callerHasWriteLock, int minFree);
extern LocalTransactionId GetNextLocalTransactionId(void);