diff options
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r-- | src/backend/commands/async.c | 831 |
1 files changed, 435 insertions, 396 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 8a1e6d59b57..42d440a8676 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -1,35 +1,35 @@ /*------------------------------------------------------------------------- * * async.c-- - * Asynchronous notification + * Asynchronous notification * * Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.17 1997/08/19 21:30:42 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.18 1997/09/07 04:40:35 momjian Exp $ * *------------------------------------------------------------------------- */ /* New Async Notification Model: * 1. Multiple backends on same machine. Multiple backends listening on - * one relation. + * one relation. * * 2. One of the backend does a 'notify <relname>'. For all backends that - * are listening to this relation (all notifications take place at the - * end of commit), - * 2.a If the process is the same as the backend process that issued - * notification (we are notifying something that we are listening), - * signal the corresponding frontend over the comm channel using the - * out-of-band channel. - * 2.b For all other listening processes, we send kill(2) to wake up - * the listening backend. + * are listening to this relation (all notifications take place at the + * end of commit), + * 2.a If the process is the same as the backend process that issued + * notification (we are notifying something that we are listening), + * signal the corresponding frontend over the comm channel using the + * out-of-band channel. + * 2.b For all other listening processes, we send kill(2) to wake up + * the listening backend. * 3. Upon receiving a kill(2) signal from another backend process notifying - * that one of the relation that we are listening is being notified, - * we can be in either of two following states: - * 3.a We are sleeping, wake up and signal our frontend. - * 3.b We are in middle of another transaction, wait until the end of - * of the current transaction and signal our frontend. + * that one of the relation that we are listening is being notified, + * we can be in either of two following states: + * 3.a We are sleeping, wake up and signal our frontend. + * 3.b We are in middle of another transaction, wait until the end of + * of the current transaction and signal our frontend. * 4. Each frontend receives this notification and prcesses accordingly. * * -- jw, 12/28/93 @@ -42,16 +42,16 @@ * Model is: * 1. Multiple backends on same machine. * - * 2. Query on one backend sends stuff over an asynchronous portal by - * appending to a relation, and then doing an async. notification - * (which takes place after commit) to all listeners on this relation. + * 2. Query on one backend sends stuff over an asynchronous portal by + * appending to a relation, and then doing an async. notification + * (which takes place after commit) to all listeners on this relation. * - * 3. Async. notification results in all backends listening on relation - * to be woken up, by a process signal kill(2), with name of relation - * passed in shared memory. + * 3. Async. notification results in all backends listening on relation + * to be woken up, by a process signal kill(2), with name of relation + * passed in shared memory. * * 4. Each backend notifies its respective frontend over the comm - * channel using the out-of-band channel. + * channel using the out-of-band channel. * * 5. Each frontend receives this notification and processes accordingly. * @@ -62,7 +62,7 @@ #include <signal.h> #include <string.h> #include <errno.h> -#include <sys/types.h> /* Needed by in.h on Ultrix */ +#include <sys/types.h> /* Needed by in.h on Ultrix */ #include <netinet/in.h> #include <postgres.h> @@ -75,546 +75,585 @@ #include <catalog/pg_proc.h> #include <catalog/catname.h> #include <catalog/pg_listener.h> -#include <access/heapam.h> +#include <access/heapam.h> #include <storage/bufmgr.h> #include <nodes/memnodes.h> #include <utils/mcxt.h> #include <commands/async.h> #include <libpq/libpq.h> -#include <port-protos.h> /* for strdup() */ +#include <port-protos.h> /* for strdup() */ #include <storage/lmgr.h> -static int notifyFrontEndPending = 0; -static int notifyIssued = 0; -static Dllist *pendingNotifies = NULL; +static int notifyFrontEndPending = 0; +static int notifyIssued = 0; +static Dllist *pendingNotifies = NULL; -static int AsyncExistsPendingNotify(char *); -static void ClearPendingNotify(void); -static void Async_NotifyFrontEnd(void); -static void Async_Unlisten(char *relname, int pid); -static void Async_UnlistenOnExit(int code, char *relname); - +static int AsyncExistsPendingNotify(char *); +static void ClearPendingNotify(void); +static void Async_NotifyFrontEnd(void); +static void Async_Unlisten(char *relname, int pid); +static void Async_UnlistenOnExit(int code, char *relname); + /* *-------------------------------------------------------------- * Async_NotifyHandler -- * - * This is the signal handler for SIGUSR2. When the backend - * is signaled, the backend can be in two states. - * 1. If the backend is in the middle of another transaction, - * we set the flag, notifyFrontEndPending, and wait until - * the end of the transaction to notify the front end. - * 2. If the backend is not in the middle of another transaction, - * we notify the front end immediately. + * This is the signal handler for SIGUSR2. When the backend + * is signaled, the backend can be in two states. + * 1. If the backend is in the middle of another transaction, + * we set the flag, notifyFrontEndPending, and wait until + * the end of the transaction to notify the front end. + * 2. If the backend is not in the middle of another transaction, + * we notify the front end immediately. * - * -- jw, 12/28/93 + * -- jw, 12/28/93 * Results: - * none + * none * * Side effects: - * none + * none */ void Async_NotifyHandler(SIGNAL_ARGS) { - extern TransactionState CurrentTransactionState; - - if ((CurrentTransactionState->state == TRANS_DEFAULT) && - (CurrentTransactionState->blockState == TRANS_DEFAULT)) { + extern TransactionState CurrentTransactionState; + + if ((CurrentTransactionState->state == TRANS_DEFAULT) && + (CurrentTransactionState->blockState == TRANS_DEFAULT)) + { #ifdef ASYNC_DEBUG - elog(DEBUG, "Waking up sleeping backend process"); + elog(DEBUG, "Waking up sleeping backend process"); #endif - Async_NotifyFrontEnd(); + Async_NotifyFrontEnd(); - }else { + } + else + { #ifdef ASYNC_DEBUG - elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d", - CurrentTransactionState->state, - CurrentTransactionState->blockState); + elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d", + CurrentTransactionState->state, + CurrentTransactionState->blockState); #endif - notifyFrontEndPending = 1; - } + notifyFrontEndPending = 1; + } } /* *-------------------------------------------------------------- * Async_Notify -- * - * Adds the relation to the list of pending notifies. - * All notification happens at end of commit. - * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + * Adds the relation to the list of pending notifies. + * All notification happens at end of commit. + * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * - * All notification of backend processes happens here, - * then each backend notifies its corresponding front end at - * the end of commit. + * All notification of backend processes happens here, + * then each backend notifies its corresponding front end at + * the end of commit. * - * This correspond to 'notify <relname>' command - * -- jw, 12/28/93 + * This correspond to 'notify <relname>' command + * -- jw, 12/28/93 * * Results: - * XXX + * XXX * * Side effects: - * All tuples for relname in pg_listener are updated. + * All tuples for relname in pg_listener are updated. * *-------------------------------------------------------------- */ void Async_Notify(char *relname) { - - HeapTuple lTuple, rTuple; - Relation lRel; - HeapScanDesc sRel; - TupleDesc tdesc; - ScanKeyData key; - Buffer b; - Datum d, value[3]; - bool isnull; - char repl[3], nulls[3]; - - char *notifyName; - + + HeapTuple lTuple, + rTuple; + Relation lRel; + HeapScanDesc sRel; + TupleDesc tdesc; + ScanKeyData key; + Buffer b; + Datum d, + value[3]; + bool isnull; + char repl[3], + nulls[3]; + + char *notifyName; + #ifdef ASYNC_DEBUG - elog(DEBUG,"Async_Notify: %s",relname); + elog(DEBUG, "Async_Notify: %s", relname); #endif - - if (!pendingNotifies) - pendingNotifies = DLNewList(); - - /* - * Allocate memory from the global malloc pool because it needs to be - * referenced also when the transaction is finished. DZ - 26-08-1996 - */ - notifyName = strdup(relname); - DLAddHead(pendingNotifies, DLNewElem(notifyName)); - - ScanKeyEntryInitialize(&key, 0, - Anum_pg_listener_relname, - NameEqualRegProcedure, - PointerGetDatum(notifyName)); - - lRel = heap_openr(ListenerRelationName); - tdesc = RelationGetTupleDescriptor(lRel); - RelationSetLockForWrite(lRel); - sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key); - - nulls[0] = nulls[1] = nulls[2] = ' '; - repl[0] = repl[1] = repl[2] = ' '; - repl[Anum_pg_listener_notify - 1] = 'r'; - value[0] = value[1] = value[2] = (Datum) 0; - value[Anum_pg_listener_notify - 1] = Int32GetDatum(1); - - while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b))) { - d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_notify, - tdesc, &isnull); - if (!DatumGetInt32(d)) { - rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl); - heap_replace(lRel, &lTuple->t_ctid, rTuple); + + if (!pendingNotifies) + pendingNotifies = DLNewList(); + + /* + * Allocate memory from the global malloc pool because it needs to be + * referenced also when the transaction is finished. DZ - 26-08-1996 + */ + notifyName = strdup(relname); + DLAddHead(pendingNotifies, DLNewElem(notifyName)); + + ScanKeyEntryInitialize(&key, 0, + Anum_pg_listener_relname, + NameEqualRegProcedure, + PointerGetDatum(notifyName)); + + lRel = heap_openr(ListenerRelationName); + tdesc = RelationGetTupleDescriptor(lRel); + RelationSetLockForWrite(lRel); + sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key); + + nulls[0] = nulls[1] = nulls[2] = ' '; + repl[0] = repl[1] = repl[2] = ' '; + repl[Anum_pg_listener_notify - 1] = 'r'; + value[0] = value[1] = value[2] = (Datum) 0; + value[Anum_pg_listener_notify - 1] = Int32GetDatum(1); + + while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b))) + { + d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_notify, + tdesc, &isnull); + if (!DatumGetInt32(d)) + { + rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl); + heap_replace(lRel, &lTuple->t_ctid, rTuple); + } + ReleaseBuffer(b); } - ReleaseBuffer(b); - } - heap_endscan(sRel); - RelationUnsetLockForWrite(lRel); - heap_close(lRel); - notifyIssued = 1; + heap_endscan(sRel); + RelationUnsetLockForWrite(lRel); + heap_close(lRel); + notifyIssued = 1; } /* *-------------------------------------------------------------- * Async_NotifyAtCommit -- * - * Signal our corresponding frontend process on relations that - * were notified. Signal all other backend process that - * are listening also. + * Signal our corresponding frontend process on relations that + * were notified. Signal all other backend process that + * are listening also. * - * -- jw, 12/28/93 + * -- jw, 12/28/93 * * Results: - * XXX + * XXX * * Side effects: - * Tuples in pg_listener that has our listenerpid are updated so - * that the notification is 0. We do not want to notify frontend - * more than once. + * Tuples in pg_listener that has our listenerpid are updated so + * that the notification is 0. We do not want to notify frontend + * more than once. * - * -- jw, 12/28/93 + * -- jw, 12/28/93 * *-------------------------------------------------------------- */ void Async_NotifyAtCommit() { - HeapTuple lTuple; - Relation lRel; - HeapScanDesc sRel; - TupleDesc tdesc; - ScanKeyData key; - Datum d; - int ourpid; - bool isnull; - Buffer b; - extern TransactionState CurrentTransactionState; - - if (!pendingNotifies) - pendingNotifies = DLNewList(); - - if ((CurrentTransactionState->state == TRANS_DEFAULT) && - (CurrentTransactionState->blockState == TRANS_DEFAULT)) { - - if (notifyIssued) { /* 'notify <relname>' issued by us */ - notifyIssued = 0; - StartTransactionCommand(); + HeapTuple lTuple; + Relation lRel; + HeapScanDesc sRel; + TupleDesc tdesc; + ScanKeyData key; + Datum d; + int ourpid; + bool isnull; + Buffer b; + extern TransactionState CurrentTransactionState; + + if (!pendingNotifies) + pendingNotifies = DLNewList(); + + if ((CurrentTransactionState->state == TRANS_DEFAULT) && + (CurrentTransactionState->blockState == TRANS_DEFAULT)) + { + + if (notifyIssued) + { /* 'notify <relname>' issued by us */ + notifyIssued = 0; + StartTransactionCommand(); #ifdef ASYNC_DEBUG - elog(DEBUG, "Async_NotifyAtCommit."); + elog(DEBUG, "Async_NotifyAtCommit."); #endif - ScanKeyEntryInitialize(&key, 0, - Anum_pg_listener_notify, - Integer32EqualRegProcedure, - Int32GetDatum(1)); - lRel = heap_openr(ListenerRelationName); - RelationSetLockForWrite(lRel); - sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key); - tdesc = RelationGetTupleDescriptor(lRel); - ourpid = getpid(); - - while (HeapTupleIsValid(lTuple = heap_getnext(sRel,0, &b))) { - d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname, - tdesc, &isnull); - - if (AsyncExistsPendingNotify((char *) DatumGetPointer(d))) { - d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_pid, - tdesc, &isnull); - - if (ourpid == DatumGetInt32(d)) { + ScanKeyEntryInitialize(&key, 0, + Anum_pg_listener_notify, + Integer32EqualRegProcedure, + Int32GetDatum(1)); + lRel = heap_openr(ListenerRelationName); + RelationSetLockForWrite(lRel); + sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key); + tdesc = RelationGetTupleDescriptor(lRel); + ourpid = getpid(); + + while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b))) + { + d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname, + tdesc, &isnull); + + if (AsyncExistsPendingNotify((char *) DatumGetPointer(d))) + { + d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_pid, + tdesc, &isnull); + + if (ourpid == DatumGetInt32(d)) + { #ifdef ASYNC_DEBUG - elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1"); + elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1"); #endif - notifyFrontEndPending = 1; - } else { + notifyFrontEndPending = 1; + } + else + { #ifdef ASYNC_DEBUG - elog(DEBUG, "Notifying others"); + elog(DEBUG, "Notifying others"); #endif #ifdef HAVE_KILL - if (kill(DatumGetInt32(d), SIGUSR2) < 0) { - if (errno == ESRCH) { - heap_delete(lRel, &lTuple->t_ctid); - } + if (kill(DatumGetInt32(d), SIGUSR2) < 0) + { + if (errno == ESRCH) + { + heap_delete(lRel, &lTuple->t_ctid); + } + } +#endif + } + } + ReleaseBuffer(b); } -#endif - } + heap_endscan(sRel); + RelationUnsetLockForWrite(lRel); + heap_close(lRel); + + CommitTransactionCommand(); + ClearPendingNotify(); } - ReleaseBuffer(b); - } - heap_endscan(sRel); - RelationUnsetLockForWrite(lRel); - heap_close(lRel); - CommitTransactionCommand(); - ClearPendingNotify(); - } - - if (notifyFrontEndPending) { /* we need to notify the frontend of - all pending notifies. */ - notifyFrontEndPending = 1; - Async_NotifyFrontEnd(); + if (notifyFrontEndPending) + { /* we need to notify the frontend of all + * pending notifies. */ + notifyFrontEndPending = 1; + Async_NotifyFrontEnd(); + } } - } } /* *-------------------------------------------------------------- * Async_NotifyAtAbort -- * - * Gets rid of pending notifies. List elements are automatically - * freed through memory context. - * + * Gets rid of pending notifies. List elements are automatically + * freed through memory context. + * * * Results: - * XXX + * XXX * * Side effects: - * XXX + * XXX * *-------------------------------------------------------------- */ void Async_NotifyAtAbort() { - extern TransactionState CurrentTransactionState; - - if (notifyIssued) { - ClearPendingNotify(); - } - notifyIssued = 0; - if (pendingNotifies) - DLFreeList(pendingNotifies); - pendingNotifies = DLNewList(); - - if ((CurrentTransactionState->state == TRANS_DEFAULT) && - (CurrentTransactionState->blockState == TRANS_DEFAULT)) { - if (notifyFrontEndPending) { /* don't forget to notify front end */ - Async_NotifyFrontEnd(); + extern TransactionState CurrentTransactionState; + + if (notifyIssued) + { + ClearPendingNotify(); + } + notifyIssued = 0; + if (pendingNotifies) + DLFreeList(pendingNotifies); + pendingNotifies = DLNewList(); + + if ((CurrentTransactionState->state == TRANS_DEFAULT) && + (CurrentTransactionState->blockState == TRANS_DEFAULT)) + { + if (notifyFrontEndPending) + { /* don't forget to notify front end */ + Async_NotifyFrontEnd(); + } } - } } /* *-------------------------------------------------------------- * Async_Listen -- * - * Register a backend (identified by its Unix PID) as listening - * on the specified relation. + * Register a backend (identified by its Unix PID) as listening + * on the specified relation. * - * This corresponds to the 'listen <relation>' command in SQL + * This corresponds to the 'listen <relation>' command in SQL * - * One listener per relation, pg_listener relation is keyed - * on (relname,pid) to provide multiple listeners in future. + * One listener per relation, pg_listener relation is keyed + * on (relname,pid) to provide multiple listeners in future. * * Results: - * pg_listeners is updated. + * pg_listeners is updated. * * Side effects: - * XXX + * XXX * *-------------------------------------------------------------- */ void Async_Listen(char *relname, int pid) { - Datum values[Natts_pg_listener]; - char nulls[Natts_pg_listener]; - TupleDesc tdesc; - HeapScanDesc s; - HeapTuple htup,tup; - Relation lDesc; - Buffer b; - Datum d; - int i; - bool isnull; - int alreadyListener = 0; - int ourPid = getpid(); - char *relnamei; - TupleDesc tupDesc; - + Datum values[Natts_pg_listener]; + char nulls[Natts_pg_listener]; + TupleDesc tdesc; + HeapScanDesc s; + HeapTuple htup, + tup; + Relation lDesc; + Buffer b; + Datum d; + int i; + bool isnull; + int alreadyListener = 0; + int ourPid = getpid(); + char *relnamei; + TupleDesc tupDesc; + #ifdef ASYNC_DEBUG - elog(DEBUG,"Async_Listen: %s",relname); + elog(DEBUG, "Async_Listen: %s", relname); #endif - for (i = 0 ; i < Natts_pg_listener; i++) { - nulls[i] = ' '; - values[i] = PointerGetDatum(NULL); - } - - i = 0; - values[i++] = (Datum) relname; - values[i++] = (Datum) pid; - values[i++] = (Datum) 0; /* no notifies pending */ - - lDesc = heap_openr(ListenerRelationName); - RelationSetLockForWrite(lDesc); - - /* is someone already listening. One listener per relation */ - tdesc = RelationGetTupleDescriptor(lDesc); - s = heap_beginscan(lDesc,0,NowTimeQual,0,(ScanKey)NULL); - while (HeapTupleIsValid(htup = heap_getnext(s,0,&b))) { - d = (Datum) heap_getattr(htup,b,Anum_pg_listener_relname,tdesc, - &isnull); - relnamei = DatumGetPointer(d); - if (!strncmp(relnamei,relname, NAMEDATALEN)) { - d = (Datum) heap_getattr(htup,b,Anum_pg_listener_pid,tdesc,&isnull); - pid = DatumGetInt32(d); - if (pid == ourPid) { - alreadyListener = 1; - } + for (i = 0; i < Natts_pg_listener; i++) + { + nulls[i] = ' '; + values[i] = PointerGetDatum(NULL); } - ReleaseBuffer(b); - } - heap_endscan(s); - - if (alreadyListener) { - elog(NOTICE, "Async_Listen: We are already listening on %s", - relname); - return; - } - - tupDesc = lDesc->rd_att; - tup = heap_formtuple(tupDesc, - values, - nulls); - heap_insert(lDesc, tup); - - pfree(tup); - /* if (alreadyListener) { - elog(NOTICE,"Async_Listen: already one listener on %s (possibly dead)",relname); - }*/ - - RelationUnsetLockForWrite(lDesc); - heap_close(lDesc); - - /* - * now that we are listening, we should make a note to ourselves - * to unlisten prior to dying. - */ - relnamei = malloc(NAMEDATALEN); /* persists to process exit */ - strNcpy(relnamei, relname, NAMEDATALEN-1); - on_exitpg(Async_UnlistenOnExit, (caddr_t) relnamei); + + i = 0; + values[i++] = (Datum) relname; + values[i++] = (Datum) pid; + values[i++] = (Datum) 0; /* no notifies pending */ + + lDesc = heap_openr(ListenerRelationName); + RelationSetLockForWrite(lDesc); + + /* is someone already listening. One listener per relation */ + tdesc = RelationGetTupleDescriptor(lDesc); + s = heap_beginscan(lDesc, 0, NowTimeQual, 0, (ScanKey) NULL); + while (HeapTupleIsValid(htup = heap_getnext(s, 0, &b))) + { + d = (Datum) heap_getattr(htup, b, Anum_pg_listener_relname, tdesc, + &isnull); + relnamei = DatumGetPointer(d); + if (!strncmp(relnamei, relname, NAMEDATALEN)) + { + d = (Datum) heap_getattr(htup, b, Anum_pg_listener_pid, tdesc, &isnull); + pid = DatumGetInt32(d); + if (pid == ourPid) + { + alreadyListener = 1; + } + } + ReleaseBuffer(b); + } + heap_endscan(s); + + if (alreadyListener) + { + elog(NOTICE, "Async_Listen: We are already listening on %s", + relname); + return; + } + + tupDesc = lDesc->rd_att; + tup = heap_formtuple(tupDesc, + values, + nulls); + heap_insert(lDesc, tup); + + pfree(tup); + + /* + * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one + * listener on %s (possibly dead)",relname); } + */ + + RelationUnsetLockForWrite(lDesc); + heap_close(lDesc); + + /* + * now that we are listening, we should make a note to ourselves to + * unlisten prior to dying. + */ + relnamei = malloc(NAMEDATALEN); /* persists to process exit */ + strNcpy(relnamei, relname, NAMEDATALEN - 1); + on_exitpg(Async_UnlistenOnExit, (caddr_t) relnamei); } /* *-------------------------------------------------------------- * Async_Unlisten -- * - * Remove the backend from the list of listening backends - * for the specified relation. - * - * This would correspond to the 'unlisten <relation>' - * command, but there isn't one yet. + * Remove the backend from the list of listening backends + * for the specified relation. + * + * This would correspond to the 'unlisten <relation>' + * command, but there isn't one yet. * * Results: - * pg_listeners is updated. + * pg_listeners is updated. * * Side effects: - * XXX + * XXX * *-------------------------------------------------------------- */ static void Async_Unlisten(char *relname, int pid) { - Relation lDesc; - HeapTuple lTuple; - - lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname), - Int32GetDatum(pid), - 0,0); - lDesc = heap_openr(ListenerRelationName); - RelationSetLockForWrite(lDesc); - - if (lTuple != NULL) { - heap_delete(lDesc,&lTuple->t_ctid); - } - - RelationUnsetLockForWrite(lDesc); - heap_close(lDesc); + Relation lDesc; + HeapTuple lTuple; + + lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname), + Int32GetDatum(pid), + 0, 0); + lDesc = heap_openr(ListenerRelationName); + RelationSetLockForWrite(lDesc); + + if (lTuple != NULL) + { + heap_delete(lDesc, &lTuple->t_ctid); + } + + RelationUnsetLockForWrite(lDesc); + heap_close(lDesc); } static void Async_UnlistenOnExit(int code, /* from exitpg */ - char *relname) + char *relname) { - Async_Unlisten((char *) relname, getpid()); + Async_Unlisten((char *) relname, getpid()); } /* * -------------------------------------------------------------- * Async_NotifyFrontEnd -- * - * Perform an asynchronous notification to front end over - * portal comm channel. The name of the relation which contains the - * data is sent to the front end. + * Perform an asynchronous notification to front end over + * portal comm channel. The name of the relation which contains the + * data is sent to the front end. * - * We remove the notification flag from the pg_listener tuple - * associated with our process. + * We remove the notification flag from the pg_listener tuple + * associated with our process. * * Results: - * XXX + * XXX * * Side effects: * - * We make use of the out-of-band channel to transmit the - * notification to the front end. The actual data transfer takes - * place at the front end's request. + * We make use of the out-of-band channel to transmit the + * notification to the front end. The actual data transfer takes + * place at the front end's request. * * -------------------------------------------------------------- */ -GlobalMemory notifyContext = NULL; +GlobalMemory notifyContext = NULL; static void Async_NotifyFrontEnd() { - extern CommandDest whereToSendOutput; - HeapTuple lTuple, rTuple; - Relation lRel; - HeapScanDesc sRel; - TupleDesc tdesc; - ScanKeyData key[2]; - Datum d, value[3]; - char repl[3], nulls[3]; - Buffer b; - int ourpid; - bool isnull; - - notifyFrontEndPending = 0; - + extern CommandDest whereToSendOutput; + HeapTuple lTuple, + rTuple; + Relation lRel; + HeapScanDesc sRel; + TupleDesc tdesc; + ScanKeyData key[2]; + Datum d, + value[3]; + char repl[3], + nulls[3]; + Buffer b; + int ourpid; + bool isnull; + + notifyFrontEndPending = 0; + #ifdef ASYNC_DEBUG - elog(DEBUG, "Async_NotifyFrontEnd: notifying front end."); + elog(DEBUG, "Async_NotifyFrontEnd: notifying front end."); #endif - - StartTransactionCommand(); - ourpid = getpid(); - ScanKeyEntryInitialize(&key[0], 0, - Anum_pg_listener_notify, - Integer32EqualRegProcedure, - Int32GetDatum(1)); - ScanKeyEntryInitialize(&key[1], 0, - Anum_pg_listener_pid, - Integer32EqualRegProcedure, - Int32GetDatum(ourpid)); - lRel = heap_openr(ListenerRelationName); - RelationSetLockForWrite(lRel); - tdesc = RelationGetTupleDescriptor(lRel); - sRel = heap_beginscan(lRel, 0, NowTimeQual, 2, key); - - nulls[0] = nulls[1] = nulls[2] = ' '; - repl[0] = repl[1] = repl[2] = ' '; - repl[Anum_pg_listener_notify - 1] = 'r'; - value[0] = value[1] = value[2] = (Datum) 0; - value[Anum_pg_listener_notify - 1] = Int32GetDatum(0); - - while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0,&b))) { - d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname, - tdesc, &isnull); - rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl); - heap_replace(lRel, &lTuple->t_ctid, rTuple); - - /* notifying the front end */ - - if (whereToSendOutput == Remote) { - pq_putnchar("A", 1); - pq_putint(ourpid, 4); - pq_putstr(DatumGetName(d)->data); - pq_flush(); - } else { - elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions"); + + StartTransactionCommand(); + ourpid = getpid(); + ScanKeyEntryInitialize(&key[0], 0, + Anum_pg_listener_notify, + Integer32EqualRegProcedure, + Int32GetDatum(1)); + ScanKeyEntryInitialize(&key[1], 0, + Anum_pg_listener_pid, + Integer32EqualRegProcedure, + Int32GetDatum(ourpid)); + lRel = heap_openr(ListenerRelationName); + RelationSetLockForWrite(lRel); + tdesc = RelationGetTupleDescriptor(lRel); + sRel = heap_beginscan(lRel, 0, NowTimeQual, 2, key); + + nulls[0] = nulls[1] = nulls[2] = ' '; + repl[0] = repl[1] = repl[2] = ' '; + repl[Anum_pg_listener_notify - 1] = 'r'; + value[0] = value[1] = value[2] = (Datum) 0; + value[Anum_pg_listener_notify - 1] = Int32GetDatum(0); + + while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b))) + { + d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname, + tdesc, &isnull); + rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl); + heap_replace(lRel, &lTuple->t_ctid, rTuple); + + /* notifying the front end */ + + if (whereToSendOutput == Remote) + { + pq_putnchar("A", 1); + pq_putint(ourpid, 4); + pq_putstr(DatumGetName(d)->data); + pq_flush(); + } + else + { + elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions"); + } + ReleaseBuffer(b); } - ReleaseBuffer(b); - } - CommitTransactionCommand(); + CommitTransactionCommand(); } static int AsyncExistsPendingNotify(char *relname) { - Dlelem* p; - for (p = DLGetHead(pendingNotifies); - p != NULL; - p = DLGetSucc(p)) { - /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */ - if (!strncmp(DLE_VAL(p), relname, NAMEDATALEN)) - return 1; - } - - return 0; + Dlelem *p; + + for (p = DLGetHead(pendingNotifies); + p != NULL; + p = DLGetSucc(p)) + { + /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */ + if (!strncmp(DLE_VAL(p), relname, NAMEDATALEN)) + return 1; + } + + return 0; } static void ClearPendingNotify() { - Dlelem* p; - while ( (p = DLRemHead(pendingNotifies)) != NULL) - free(DLE_VAL(p)); -} + Dlelem *p; + while ((p = DLRemHead(pendingNotifies)) != NULL) + free(DLE_VAL(p)); +} |