aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/async.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r--src/backend/commands/async.c39
1 files changed, 29 insertions, 10 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index bacc08eb84f..a93c81bca28 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -137,7 +137,9 @@
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
+#include "utils/snapmgr.h"
#include "utils/timestamp.h"
+#include "utils/tqual.h"
/*
@@ -387,7 +389,8 @@ static bool SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
- char *page_buffer);
+ char *page_buffer,
+ Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
@@ -798,7 +801,7 @@ PreCommit_Notify(void)
}
}
- /* Queue any pending notifies */
+ /* Queue any pending notifies (must happen after the above) */
if (pendingNotifies)
{
ListCell *nextNotify;
@@ -987,7 +990,9 @@ Exec_ListenPreCommit(void)
* have already committed before we started to LISTEN.
*
* Note that we are not yet listening on anything, so we won't deliver any
- * notification to the frontend.
+ * notification to the frontend. Also, although our transaction might
+ * have executed NOTIFY, those message(s) aren't queued yet so we can't
+ * see them in the queue.
*
* This will also advance the global tail pointer if possible.
*/
@@ -1744,6 +1749,7 @@ asyncQueueReadAllNotifications(void)
volatile QueuePosition pos;
QueuePosition oldpos;
QueuePosition head;
+ Snapshot snapshot;
bool advanceTail;
/* page_buffer must be adequately aligned, so use a union */
@@ -1767,6 +1773,9 @@ asyncQueueReadAllNotifications(void)
return;
}
+ /* Get snapshot we'll use to decide which xacts are still in progress */
+ snapshot = RegisterSnapshot(GetLatestSnapshot());
+
/*----------
* Note that we deliver everything that we see in the queue and that
* matches our _current_ listening state.
@@ -1854,7 +1863,8 @@ asyncQueueReadAllNotifications(void)
* while sending the notifications to the frontend.
*/
reachedStop = asyncQueueProcessPageEntries(&pos, head,
- page_buffer.buf);
+ page_buffer.buf,
+ snapshot);
} while (!reachedStop);
}
PG_CATCH();
@@ -1882,6 +1892,9 @@ asyncQueueReadAllNotifications(void)
/* If we were the laziest backend, try to advance the tail pointer */
if (advanceTail)
asyncQueueAdvanceTail();
+
+ /* Done with snapshot */
+ UnregisterSnapshot(snapshot);
}
/*
@@ -1903,7 +1916,8 @@ asyncQueueReadAllNotifications(void)
static bool
asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
- char *page_buffer)
+ char *page_buffer,
+ Snapshot snapshot)
{
bool reachedStop = false;
bool reachedEndOfPage;
@@ -1928,7 +1942,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
/* Ignore messages destined for other databases */
if (qe->dboid == MyDatabaseId)
{
- if (TransactionIdIsInProgress(qe->xid))
+ if (XidInMVCCSnapshot(qe->xid, snapshot))
{
/*
* The source transaction is still in progress, so we can't
@@ -1939,10 +1953,15 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
* this advance-then-back-up behavior when dealing with an
* uncommitted message.)
*
- * Note that we must test TransactionIdIsInProgress before we
- * test TransactionIdDidCommit, else we might return a message
- * from a transaction that is not yet visible to snapshots;
- * compare the comments at the head of tqual.c.
+ * Note that we must test XidInMVCCSnapshot before we test
+ * TransactionIdDidCommit, else we might return a message from
+ * a transaction that is not yet visible to snapshots; compare
+ * the comments at the head of tqual.c.
+ *
+ * Also, while our own xact won't be listed in the snapshot,
+ * we need not check for TransactionIdIsCurrentTransactionId
+ * because our transaction cannot (yet) have queued any
+ * messages.
*/
*current = thisentry;
reachedStop = true;