Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r--  src/backend/commands/async.c  181
1 file changed, 103 insertions(+), 78 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index f26269b5eae..ee01df589f2 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -75,8 +75,10 @@
* list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
* to every listening backend (we don't know which backend is listening on
* which channel so we must signal them all). We can exclude backends that
- * are already up to date, though. We don't bother with a self-signal
- * either, but just process the queue directly.
+ * are already up to date, though, and we can also exclude backends that
+ * are in other databases (unless they are way behind and should be kicked
+ * to make them advance their pointers). We don't bother with a
+ * self-signal either, but just process the queue directly.
*
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
* sets the process's latch, which triggers the event to be processed
@@ -89,13 +91,14 @@
* Inbound-notify processing consists of reading all of the notifications
* that have arrived since scanning last time. We read every notification
* until we reach either a notification from an uncommitted transaction or
- * the head pointer's position. Then we check if we were the laziest
- * backend: if our pointer is set to the same position as the global tail
- * pointer is set, then we move the global tail pointer ahead to where the
- * second-laziest backend is (in general, we take the MIN of the current
- * head position and all active backends' new tail pointers). Whenever we
- * move the global tail pointer we also truncate now-unused pages (i.e.,
- * delete files in pg_notify/ that are no longer used).
+ * the head pointer's position.
+ *
+ * 6. To avoid SLRU wraparound and limit disk space consumption, the tail
+ * pointer needs to be advanced so that old pages can be truncated.
+ * This is relatively expensive (notably, it requires an exclusive lock),
+ * so we don't want to do it often. We make sending backends do this work
+ * if they advanced the queue head into a new page, but only once every
+ * QUEUE_CLEANUP_DELAY pages.
*
* An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
@@ -212,6 +215,19 @@ typedef struct QueuePosition
(x).offset > (y).offset ? (x) : (y))
/*
+ * Parameter determining how often we try to advance the tail pointer:
+ * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
+ * also the distance by which a backend in another database needs to be
+ * behind before we'll decide we need to wake it up to advance its pointer.
+ *
+ * Resist the temptation to make this really large. While that would save
+ * work in some places, it would add cost in others. In particular, this
+ * should likely be less than NUM_ASYNC_BUFFERS, to ensure that backends
+ * catch up before the pages they'll need to read fall out of SLRU cache.
+ */
+#define QUEUE_CLEANUP_DELAY 4
+
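
As a rough illustration (not part of the patch), the standalone snippet below mimics the check the patch adds in asyncQueueAddEntries() further down: with QUEUE_CLEANUP_DELAY = 4, a notifying backend that advances the queue head onto page 8 or 12 remembers to attempt a tail advance later, while pages 9 through 11 trigger no cleanup attempt. The TOY_CLEANUP_DELAY name is made up for the example.

#include <assert.h>
#include <stdbool.h>

#define TOY_CLEANUP_DELAY 4     /* stand-in for QUEUE_CLEANUP_DELAY */

/* Same test the patch applies when the head moves onto a new queue page. */
static bool
schedules_tail_advance(int new_head_page)
{
    return (new_head_page % TOY_CLEANUP_DELAY) == 0;
}

int
main(void)
{
    assert(schedules_tail_advance(8));      /* multiple of 4: schedule cleanup */
    assert(!schedules_tail_advance(9));     /* pages 9..11: no cleanup attempt */
    assert(schedules_tail_advance(12));     /* next attempt when page 12 starts */
    return 0;
}
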
+/*
* Struct describing a listening backend's status
*/
typedef struct QueueBackendStatus
@@ -252,8 +268,8 @@ typedef struct QueueBackendStatus
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
- QueuePosition tail; /* the global tail is equivalent to the pos of
- * the "slowest" backend */
+ QueuePosition tail; /* tail must be <= the queue position of every
+ * listening backend */
BackendId firstListener; /* id of first listener, or InvalidBackendId */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
@@ -402,10 +418,14 @@ static bool amRegisteredListener = false;
/* has this backend sent notifications in the current transaction? */
static bool backendHasSentNotifications = false;
+/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
+static bool backendTryAdvanceTail = false;
+
/* GUC parameter */
bool Trace_notify = false;
/* local function prototypes */
+static int asyncQueuePageDiff(int p, int q);
static bool asyncQueuePagePrecedes(int p, int q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
@@ -421,7 +441,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
-static bool SignalBackends(void);
+static void SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
@@ -436,10 +456,11 @@ static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
/*
- * We will work on the page range of 0..QUEUE_MAX_PAGE.
+ * Compute the difference between two queue page numbers (i.e., p - q),
+ * accounting for wraparound.
*/
-static bool
-asyncQueuePagePrecedes(int p, int q)
+static int
+asyncQueuePageDiff(int p, int q)
{
int diff;
@@ -455,7 +476,14 @@ asyncQueuePagePrecedes(int p, int q)
diff -= QUEUE_MAX_PAGE + 1;
else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
diff += QUEUE_MAX_PAGE + 1;
- return diff < 0;
+ return diff;
+}
+
+/* Is p < q, accounting for wraparound? */
+static bool
+asyncQueuePagePrecedes(int p, int q)
+{
+ return asyncQueuePageDiff(p, q) < 0;
}
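
A worked example of the wraparound arithmetic, for illustration only: the toy below uses a made-up eight-page range (TOY_MAX_PAGE = 7) instead of the real QUEUE_MAX_PAGE, but folds the difference into the same half-open range the code above does.

#include <assert.h>

#define TOY_MAX_PAGE 7          /* made-up small page range 0..7 */

/* Same folding as asyncQueuePageDiff(), applied to the toy range. */
static int
toy_page_diff(int p, int q)
{
    int diff = p - q;

    if (diff >= ((TOY_MAX_PAGE + 1) / 2))
        diff -= TOY_MAX_PAGE + 1;
    else if (diff < -((TOY_MAX_PAGE + 1) / 2))
        diff += TOY_MAX_PAGE + 1;
    return diff;
}

int
main(void)
{
    assert(toy_page_diff(5, 3) == 2);   /* ordinary case: no wraparound */
    assert(toy_page_diff(1, 6) == 3);   /* head wrapped past page 7, so 1 is ahead of 6 */
    assert(toy_page_diff(6, 1) == -3);  /* and page 6 now precedes page 1 */
    return 0;
}
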
/*
@@ -1051,8 +1079,6 @@ Exec_ListenPreCommit(void)
* notification to the frontend. Also, although our transaction might
* have executed NOTIFY, those message(s) aren't queued yet so we can't
* see them in the queue.
- *
- * This will also advance the global tail pointer if possible.
*/
if (!QUEUE_POS_EQUAL(max, head))
asyncQueueReadAllNotifications();
@@ -1138,6 +1164,8 @@ Exec_UnlistenAllCommit(void)
* of a transaction. If we issued any notifications in the just-completed
* transaction, send signals to other backends to process them, and also
* process the queue ourselves to send messages to our own frontend.
+ * Also, if we filled enough queue pages with new notifies, try to advance
+ * the queue tail pointer.
*
* The reason that this is not done in AtCommit_Notify is that there is
* a nonzero chance of errors here (for example, encoding conversion errors
@@ -1156,7 +1184,6 @@ void
ProcessCompletedNotifies(void)
{
MemoryContext caller_context;
- bool signalled;
/* Nothing to do if we didn't send any notifications */
if (!backendHasSentNotifications)
@@ -1185,23 +1212,20 @@ ProcessCompletedNotifies(void)
StartTransactionCommand();
/* Send signals to other backends */
- signalled = SignalBackends();
+ SignalBackends();
if (listenChannels != NIL)
{
/* Read the queue ourselves, and send relevant stuff to the frontend */
asyncQueueReadAllNotifications();
}
- else if (!signalled)
+
+ /*
+ * If it's time to try to advance the global tail pointer, do that.
+ */
+ if (backendTryAdvanceTail)
{
- /*
- * If we found no other listening backends, and we aren't listening
- * ourselves, then we must execute asyncQueueAdvanceTail to flush the
- * queue, because ain't nobody else gonna do it. This prevents queue
- * overflow when we're sending useless notifies to nobody. (A new
- * listener could have joined since we looked, but if so this is
- * harmless.)
- */
+ backendTryAdvanceTail = false;
asyncQueueAdvanceTail();
}
@@ -1242,8 +1266,6 @@ IsListeningOn(const char *channel)
static void
asyncQueueUnregister(void)
{
- bool advanceTail;
-
Assert(listenChannels == NIL); /* else caller error */
if (!amRegisteredListener) /* nothing to do */
@@ -1253,10 +1275,7 @@ asyncQueueUnregister(void)
* Need exclusive lock here to manipulate list links.
*/
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
- /* check if entry is valid and oldest ... */
- advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
- QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
- /* ... then mark it invalid */
+ /* Mark our entry as invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
/* and remove it from the list */
@@ -1278,10 +1297,6 @@ asyncQueueUnregister(void)
/* mark ourselves as no longer listed in the global array */
amRegisteredListener = false;
-
- /* If we were the laziest backend, try to advance the tail pointer */
- if (advanceTail)
- asyncQueueAdvanceTail();
}
/*
@@ -1467,6 +1482,15 @@ asyncQueueAddEntries(ListCell *nextNotify)
* page without overrunning the queue.
*/
slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
+
+ /*
+ * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
+ * set flag to remember that we should try to advance the tail
+ * pointer (we don't want to actually do that right here).
+ */
+ if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
+ backendTryAdvanceTail = true;
+
/* And exit the loop */
break;
}
@@ -1570,31 +1594,30 @@ asyncQueueFillWarning(void)
}
/*
- * Send signals to all listening backends (except our own).
+ * Send signals to listening backends.
*
- * Returns true if we sent at least one signal.
+ * We never signal our own process; that should be handled by our caller.
*
- * Since we need EXCLUSIVE lock anyway we also check the position of the other
- * backends and in case one is already up-to-date we don't signal it.
- * This can happen if concurrent notifying transactions have sent a signal and
- * the signaled backend has read the other notifications and ours in the same
- * step.
+ * Normally we signal only backends in our own database, since only those
+ * backends could be interested in notifies we send. However, if there's
+ * notify traffic in our database but no traffic in another database that
+ * does have listener(s), those listeners will fall further and further
+ * behind. Waken them anyway if they're far enough behind, so that they'll
+ * advance their queue position pointers, allowing the global tail to advance.
*
* Since we know the BackendId and the Pid the signalling is quite cheap.
*/
-static bool
+static void
SignalBackends(void)
{
- bool signalled = false;
int32 *pids;
BackendId *ids;
int count;
- int32 pid;
/*
- * Identify all backends that are listening and not already up-to-date. We
- * don't want to send signals while holding the AsyncQueueLock, so we just
- * build a list of target PIDs.
+ * Identify backends that we need to signal. We don't want to send
+ * signals while holding the AsyncQueueLock, so this loop just builds a
+ * list of target PIDs.
*
* XXX in principle these pallocs could fail, which would be bad. Maybe
* preallocate the arrays? But in practice this is only run in trivial
@@ -1607,26 +1630,43 @@ SignalBackends(void)
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
{
- pid = QUEUE_BACKEND_PID(i);
+ int32 pid = QUEUE_BACKEND_PID(i);
+ QueuePosition pos;
+
Assert(pid != InvalidPid);
- if (pid != MyProcPid)
+ if (pid == MyProcPid)
+ continue; /* never signal self */
+ pos = QUEUE_BACKEND_POS(i);
+ if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
{
- QueuePosition pos = QUEUE_BACKEND_POS(i);
-
- if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
- {
- pids[count] = pid;
- ids[count] = i;
- count++;
- }
+ /*
+ * Always signal listeners in our own database, unless they're
+ * already caught up (unlikely, but possible).
+ */
+ if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+ continue;
+ }
+ else
+ {
+ /*
+ * Listeners in other databases should be signaled only if they
+ * are far behind.
+ */
+ if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
+ QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
+ continue;
}
+ /* OK, need to signal this one */
+ pids[count] = pid;
+ ids[count] = i;
+ count++;
}
LWLockRelease(AsyncQueueLock);
/* Now send signals */
for (int i = 0; i < count; i++)
{
- pid = pids[i];
+ int32 pid = pids[i];
/*
* Note: assuming things aren't broken, a signal failure here could
@@ -1636,14 +1676,10 @@ SignalBackends(void)
*/
if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
- else
- signalled = true;
}
pfree(pids);
pfree(ids);
-
- return signalled;
}
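
To give a concrete feel for the threshold used above (illustration only, ignoring page wraparound): with QUEUE_CLEANUP_DELAY = 4 and the head on page 13, a listener in another database sitting on page 9 gets signaled, while one on page 10 is left alone. The wake_other_db_listener helper is invented for this sketch.

#include <assert.h>
#include <stdbool.h>

#define TOY_CLEANUP_DELAY 4     /* stand-in for QUEUE_CLEANUP_DELAY */

/* Simplified "far enough behind" test for listeners in other databases. */
static bool
wake_other_db_listener(int head_page, int listener_page)
{
    return (head_page - listener_page) >= TOY_CLEANUP_DELAY;
}

int
main(void)
{
    assert(!wake_other_db_listener(13, 10));    /* 3 pages behind: skip */
    assert(wake_other_db_listener(13, 9));      /* 4 pages behind: signal */
    return 0;
}
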
/*
@@ -1844,7 +1880,6 @@ asyncQueueReadAllNotifications(void)
QueuePosition oldpos;
QueuePosition head;
Snapshot snapshot;
- bool advanceTail;
/* page_buffer must be adequately aligned, so use a union */
union
@@ -1966,13 +2001,8 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
- advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock);
- /* If we were the laziest backend, try to advance the tail pointer */
- if (advanceTail)
- asyncQueueAdvanceTail();
-
PG_RE_THROW();
}
PG_END_TRY();
@@ -1980,13 +2010,8 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
- advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock);
- /* If we were the laziest backend, try to advance the tail pointer */
- if (advanceTail)
- asyncQueueAdvanceTail();
-
/* Done with snapshot */
UnregisterSnapshot(snapshot);
}