aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/async.c
diff options
context:
space:
mode:
authorMarc G. Fournier <scrappy@hub.org>1998-08-30 21:05:27 +0000
committerMarc G. Fournier <scrappy@hub.org>1998-08-30 21:05:27 +0000
commit6c4982851a7ce1585fb89adc2747c8f848183d1b (patch)
tree75ec012134c37f3a9aff83220d8dd21a1bfb900d /src/backend/commands/async.c
parent6f3de1bb6673b2e8d4ca83a17a482c5c546cd71e (diff)
downloadpostgresql-6c4982851a7ce1585fb89adc2747c8f848183d1b.tar.gz
postgresql-6c4982851a7ce1585fb89adc2747c8f848183d1b.zip
From: Massimo Dal Zotto <dz@cs.unitn.it>
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r--src/backend/commands/async.c325
1 files changed, 226 insertions, 99 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 75d0e9d4a09..03e5a4ca046 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.37 1998/08/19 02:01:39 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.38 1998/08/30 21:04:43 scrappy Exp $
*
*-------------------------------------------------------------------------
*/
@@ -34,29 +34,7 @@
* -- jw, 12/28/93
*
*/
-/*
- * The following is the old model which does not work.
- */
-/*
- * Model is:
- * 1. Multiple backends on same machine.
- *
- * 2. Query on one backend sends stuff over an asynchronous portal by
- * appending to a relation, and then doing an async. notification
- * (which takes place after commit) to all listeners on this relation.
- *
- * 3. Async. notification results in all backends listening on relation
- * to be woken up, by a process signal kill(SIGUSR2), with name of relation
- * passed in shared memory.
- *
- * 4. Each backend notifies its respective frontend over the comm
- * channel using the out-of-band channel.
- *
- * 5. Each frontend receives this notification and processes accordingly.
- *
- * #4,#5 are changing soon with pending rewrite of portal/protocol.
- *
- */
+
#include <unistd.h>
#include <signal.h>
#include <string.h>
@@ -82,17 +60,28 @@
#include "tcop/dest.h"
#include "utils/mcxt.h"
#include "utils/syscache.h"
+#include <utils/trace.h>
+#include <utils/ps_status.h>
+
+#define NotifyUnlock pg_options[OPT_NOTIFYUNLOCK]
+#define NotifyHack pg_options[OPT_NOTIFYHACK]
+
+extern TransactionState CurrentTransactionState;
+extern CommandDest whereToSendOutput;
+
+GlobalMemory notifyContext = NULL;
static int notifyFrontEndPending = 0;
static int notifyIssued = 0;
static Dllist *pendingNotifies = NULL;
-
static int AsyncExistsPendingNotify(char *);
static void ClearPendingNotify(void);
static void Async_NotifyFrontEnd(void);
+static void Async_NotifyFrontEnd_Aux(void);
void Async_Unlisten(char *relname, int pid);
static void Async_UnlistenOnExit(int code, char *relname);
+static void Async_UnlistenAll(void);
/*
*--------------------------------------------------------------
@@ -116,33 +105,36 @@ static void Async_UnlistenOnExit(int code, char *relname);
void
Async_NotifyHandler(SIGNAL_ARGS)
{
- extern TransactionState CurrentTransactionState;
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler");
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
-
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Waking up sleeping backend process");
-#endif
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
+ "waking up sleeping backend process");
+ PS_SET_STATUS("async_notify");
Async_NotifyFrontEnd();
-
+ PS_SET_STATUS("idle");
}
else
{
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
- CurrentTransactionState->state,
- CurrentTransactionState->blockState);
-#endif
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
+ "process in middle of transaction, state=%d, blockstate=%d",
+ CurrentTransactionState->state,
+ CurrentTransactionState->blockState);
notifyFrontEndPending = 1;
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending");
}
+
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
}
/*
*--------------------------------------------------------------
* Async_Notify --
*
+ * This is executed by the SQL notify command.
+ *
* Adds the relation to the list of pending notifies.
* All notification happens at end of commit.
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -151,7 +143,6 @@ Async_NotifyHandler(SIGNAL_ARGS)
* then each backend notifies its corresponding front end at
* the end of commit.
*
- * This correspond to 'notify <relname>' command
* -- jw, 12/28/93
*
* Results:
@@ -180,9 +171,7 @@ Async_Notify(char *relname)
char *notifyName;
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Async_Notify: %s", relname);
-#endif
+ TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
if (!pendingNotifies)
pendingNotifies = DLNewList();
@@ -217,18 +206,32 @@ Async_Notify(char *relname)
{
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
heap_replace(lRel, &lTuple->t_ctid, rTuple);
+ /* notify is really issued only if a tuple has been changed */
+ notifyIssued = 1;
}
}
heap_endscan(sRel);
- RelationUnsetLockForWrite(lRel);
+
+ /*
+ * Note: if the write lock is unset we can get multiple tuples
+ * with same oid if other backends notify the same relation.
+ * Use this option at your own risk.
+ */
+ if (NotifyUnlock) {
+ RelationUnsetLockForWrite(lRel);
+ }
+
heap_close(lRel);
- notifyIssued = 1;
+
+ TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname);
}
/*
*--------------------------------------------------------------
* Async_NotifyAtCommit --
*
+ * This is called at transaction commit.
+ *
* Signal our corresponding frontend process on relations that
* were notified. Signal all other backend process that
* are listening also.
@@ -265,14 +268,12 @@ Async_NotifyAtCommit()
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
-
if (notifyIssued)
- { /* 'notify <relname>' issued by us */
+ {
+ /* 'notify <relname>' issued by us */
notifyIssued = 0;
StartTransactionCommand();
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Async_NotifyAtCommit.");
-#endif
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit");
ScanKeyEntryInitialize(&key, 0,
Anum_pg_listener_notify,
F_INT4EQ,
@@ -294,16 +295,15 @@ Async_NotifyAtCommit()
if (MyProcPid == DatumGetInt32(d))
{
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
-#endif
notifyFrontEndPending = 1;
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyAtCommit: notifying self");
}
else
{
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Notifying others");
-#endif
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyAtCommit: notifying pid %d",
+ DatumGetInt32(d));
#ifdef HAVE_KILL
if (kill(DatumGetInt32(d), SIGUSR2) < 0)
{
@@ -315,19 +315,35 @@ Async_NotifyAtCommit()
}
}
heap_endscan(sRel);
- RelationUnsetLockForWrite(lRel);
heap_close(lRel);
+ /*
+ * Notify the frontend inside the current transaction while
+ * we still have a valid write lock on pg_listeners. This
+ * avoid waiting until all other backends have finished
+ * with pg_listener.
+ */
+ if (notifyFrontEndPending) {
+ /* The aux version is called inside transaction */
+ Async_NotifyFrontEnd_Aux();
+ }
+
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit: done");
CommitTransactionCommand();
- ClearPendingNotify();
}
-
- if (notifyFrontEndPending)
- { /* we need to notify the frontend of all
- * pending notifies. */
- notifyFrontEndPending = 1;
- Async_NotifyFrontEnd();
+ else
+ {
+ /*
+ * No notifies issued by us. If notifyFrontEndPending has been set
+ * by Async_NotifyHandler notify the frontend of pending notifies
+ * from other backends.
+ */
+ if (notifyFrontEndPending) {
+ Async_NotifyFrontEnd();
+ }
}
+
+ ClearPendingNotify();
}
}
@@ -335,6 +351,8 @@ Async_NotifyAtCommit()
*--------------------------------------------------------------
* Async_NotifyAtAbort --
*
+ * This is called at transaction commit.
+ *
* Gets rid of pending notifies. List elements are automatically
* freed through memory context.
*
@@ -350,20 +368,19 @@ Async_NotifyAtCommit()
void
Async_NotifyAtAbort()
{
- extern TransactionState CurrentTransactionState;
-
- if (notifyIssued)
+ if (pendingNotifies) {
ClearPendingNotify();
- notifyIssued = 0;
- if (pendingNotifies)
DLFreeList(pendingNotifies);
+ }
pendingNotifies = DLNewList();
+ notifyIssued = 0;
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
+ /* don't forget to notify front end */
if (notifyFrontEndPending)
- { /* don't forget to notify front end */
+ {
Async_NotifyFrontEnd();
}
}
@@ -373,11 +390,11 @@ Async_NotifyAtAbort()
*--------------------------------------------------------------
* Async_Listen --
*
+ * This is executed by the SQL listen command.
+ *
* Register a backend (identified by its Unix PID) as listening
* on the specified relation.
*
- * This corresponds to the 'listen <relation>' command in SQL
- *
* One listener per relation, pg_listener relation is keyed
* on (relname,pid) to provide multiple listeners in future.
*
@@ -406,9 +423,13 @@ Async_Listen(char *relname, int pid)
char *relnamei;
TupleDesc tupDesc;
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Async_Listen: %s", relname);
-#endif
+ if (whereToSendOutput != Remote) {
+ elog(NOTICE, "Async_Listen: "
+ "listen not available on interactive sessions");
+ return;
+ }
+
+ TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
for (i = 0; i < Natts_pg_listener; i++)
{
nulls[i] = ' ';
@@ -438,6 +459,10 @@ Async_Listen(char *relname, int pid)
if (pid == MyProcPid)
alreadyListener = 1;
}
+ if (alreadyListener) {
+ /* No need to scan the rest of the table */
+ break;
+ }
}
heap_endscan(scan);
@@ -445,15 +470,14 @@ Async_Listen(char *relname, int pid)
{
elog(NOTICE, "Async_Listen: We are already listening on %s",
relname);
+ RelationUnsetLockForWrite(lDesc);
+ heap_close(lDesc);
return;
}
tupDesc = lDesc->rd_att;
- newtup = heap_formtuple(tupDesc,
- values,
- nulls);
+ newtup = heap_formtuple(tupDesc, values, nulls);
heap_insert(lDesc, newtup);
-
pfree(newtup);
/*
@@ -477,12 +501,11 @@ Async_Listen(char *relname, int pid)
*--------------------------------------------------------------
* Async_Unlisten --
*
+ * This is executed by the SQL unlisten command.
+ *
* Remove the backend from the list of listening backends
* for the specified relation.
*
- * This would correspond to the 'unlisten <relation>'
- * command, but there isn't one yet.
- *
* Results:
* pg_listeners is updated.
*
@@ -497,20 +520,81 @@ Async_Unlisten(char *relname, int pid)
Relation lDesc;
HeapTuple lTuple;
- lTuple = SearchSysCacheTuple(LISTENREL,
- PointerGetDatum(relname),
+ /* Handle specially the `unlisten "*"' command */
+ if ((!relname) || (*relname == '\0') || (strcmp(relname,"*")==0)) {
+ Async_UnlistenAll();
+ return;
+ }
+
+ TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
+
+ lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
Int32GetDatum(pid),
0, 0);
- lDesc = heap_openr(ListenerRelationName);
- RelationSetLockForWrite(lDesc);
-
if (lTuple != NULL)
+ {
+ lDesc = heap_openr(ListenerRelationName);
+ RelationSetLockForWrite(lDesc);
heap_delete(lDesc, &lTuple->t_ctid);
-
- RelationUnsetLockForWrite(lDesc);
- heap_close(lDesc);
+ RelationUnsetLockForWrite(lDesc);
+ heap_close(lDesc);
+ }
}
+/*
+ *--------------------------------------------------------------
+ * Async_UnlistenAll --
+ *
+ * Unlisten all relations for this backend.
+ *
+ * Results:
+ * pg_listeners is updated.
+ *
+ * Side effects:
+ * XXX
+ *
+ *--------------------------------------------------------------
+ */
+static void
+Async_UnlistenAll()
+{
+ HeapTuple lTuple;
+ Relation lRel;
+ HeapScanDesc sRel;
+ TupleDesc tdesc;
+ ScanKeyData key[1];
+
+ TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
+ ScanKeyEntryInitialize(&key[0], 0,
+ Anum_pg_listener_pid,
+ F_INT4EQ,
+ Int32GetDatum(MyProcPid));
+ lRel = heap_openr(ListenerRelationName);
+ RelationSetLockForWrite(lRel);
+ tdesc = RelationGetTupleDescriptor(lRel);
+ sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
+
+ while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+ {
+ heap_delete(lRel, &lTuple->t_ctid);
+ }
+ heap_endscan(sRel);
+ RelationUnsetLockForWrite(lRel);
+ heap_close(lRel);
+ TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll: done");
+}
+
+/*
+ * --------------------------------------------------------------
+ * Async_UnlistenOnExit --
+ *
+ * This is called at backend exit for each registered listen.
+ *
+ * Results:
+ * XXX
+ *
+ * --------------------------------------------------------------
+ */
static void
Async_UnlistenOnExit(int code, /* from exitpg */
char *relname)
@@ -522,6 +606,25 @@ Async_UnlistenOnExit(int code, /* from exitpg */
* --------------------------------------------------------------
* Async_NotifyFrontEnd --
*
+ * This is called outside transactions. The real work is done
+ * by Async_NotifyFrontEnd_Aux().
+ *
+ * --------------------------------------------------------------
+ */
+static void
+Async_NotifyFrontEnd()
+{
+ StartTransactionCommand();
+ Async_NotifyFrontEnd_Aux();
+ CommitTransactionCommand();
+}
+
+/*
+ * --------------------------------------------------------------
+ * Async_NotifyFrontEnd_Aux --
+ *
+ * This must be called inside a transaction block.
+ *
* Perform an asynchronous notification to front end over
* portal comm channel. The name of the relation which contains the
* data is sent to the front end.
@@ -534,12 +637,9 @@ Async_UnlistenOnExit(int code, /* from exitpg */
*
* --------------------------------------------------------------
*/
-GlobalMemory notifyContext = NULL;
-
static void
-Async_NotifyFrontEnd()
+Async_NotifyFrontEnd_Aux()
{
- extern CommandDest whereToSendOutput;
HeapTuple lTuple,
rTuple;
Relation lRel;
@@ -552,12 +652,15 @@ Async_NotifyFrontEnd()
nulls[3];
bool isnull;
- notifyFrontEndPending = 0;
+#define MAX_DONE 64
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
-#endif
+ char *done[MAX_DONE];
+ int ndone = 0;
+ int i;
+
+ notifyFrontEndPending = 0;
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd");
StartTransactionCommand();
ScanKeyEntryInitialize(&key[0], 0,
Anum_pg_listener_notify,
@@ -580,11 +683,35 @@ Async_NotifyFrontEnd()
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
{
- d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
+ d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc,
+ &isnull);
+
+ /*
+ * This hack deletes duplicate tuples which can be left
+ * in the table if the NotifyUnlock option is set.
+ * I'm further investigating this. -- dz
+ */
+ if (NotifyHack) {
+ for (i=0; i<ndone; i++) {
+ if (strcmp(DatumGetName(d)->data, done[i]) == 0) {
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyFrontEnd: duplicate %s",
+ DatumGetName(d)->data);
+ heap_delete(lRel, &lTuple->t_ctid);
+ continue;
+ }
+ }
+ if (ndone < MAX_DONE) {
+ done[ndone++] = pstrdup(DatumGetName(d)->data);
+ }
+ }
+
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
heap_replace(lRel, &lTuple->t_ctid, rTuple);
/* notifying the front end */
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: notifying %s",
+ DatumGetName(d)->data);
if (whereToSendOutput == Remote)
{
@@ -593,12 +720,12 @@ Async_NotifyFrontEnd()
pq_putstr(DatumGetName(d)->data);
pq_flush();
}
- else
- elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
}
heap_endscan(sRel);
+ RelationUnsetLockForWrite(lRel);
heap_close(lRel);
- CommitTransactionCommand();
+
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: done");
}
static int