mirror of
https://github.com/postgres/postgres.git
synced 2025-05-17 00:03:56 -04:00
1. Rewritten libpq to allow asynchronous clients. 2. Implemented client side of cancel protocol in library, and patched psql.c to send a cancel request upon SIGINT. The backend doesn't notice it yet :-( 3. Implemented 'Z' protocol message addition and renaming of copy in/out start messages. These are implemented conditionally, ie, the client protocol version is checked; so the code should still work with 1.0 clients. 4. Revised protocol and libpq sgml documents (don't have an SGML compiler, though, so there may be some markup glitches here). What remains to be done: 1. Implement addition of atttypmod field to RowDescriptor messages. The client-side code is there but ifdef'd out. I have no idea what to change on the backend side. The field should be sent only if protocol >= 2.0, of course. 2. Implement backend response to cancel requests received as OOB messages. (This prolly need not be conditional on protocol version; just do it if you get SIGURG.) 3. Update libpq.3. (I'm hoping this can be generated mechanically from libpq.sgml... if not, will do it by hand.) Is there any other doco to fix? 4. Update non-libpq interfaces as necessary. I patched libpgtcl so that it would compile, but haven't tested it. Dunno what needs to be done with the other interfaces. Have at it! Tom Lane
646 lines
16 KiB
C
646 lines
16 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* async.c--
|
|
* Asynchronous notification
|
|
*
|
|
* Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
*
|
|
* IDENTIFICATION
|
|
* $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.32 1998/05/06 23:49:52 momjian Exp $
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
/* New Async Notification Model:
|
|
* 1. Multiple backends on same machine. Multiple backends listening on
|
|
* one relation.
|
|
*
|
|
* 2. One of the backend does a 'notify <relname>'. For all backends that
|
|
* are listening to this relation (all notifications take place at the
|
|
* end of commit),
|
|
* 2.a If the process is the same as the backend process that issued
|
|
* notification (we are notifying something that we are listening),
|
|
* signal the corresponding frontend over the comm channel.
|
|
* 2.b For all other listening processes, we send kill(2) to wake up
|
|
* the listening backend.
|
|
* 3. Upon receiving a kill(2) signal from another backend process notifying
|
|
* that one of the relation that we are listening is being notified,
|
|
* we can be in either of two following states:
|
|
* 3.a We are sleeping, wake up and signal our frontend.
|
|
* 3.b We are in middle of another transaction, wait until the end of
|
|
* of the current transaction and signal our frontend.
|
|
* 4. Each frontend receives this notification and processes accordingly.
|
|
*
|
|
* -- jw, 12/28/93
|
|
*
|
|
*/
|
|
/*
|
|
* The following is the old model which does not work.
|
|
*/
|
|
/*
|
|
* Model is:
|
|
* 1. Multiple backends on same machine.
|
|
*
|
|
* 2. Query on one backend sends stuff over an asynchronous portal by
|
|
* appending to a relation, and then doing an async. notification
|
|
* (which takes place after commit) to all listeners on this relation.
|
|
*
|
|
* 3. Async. notification results in all backends listening on relation
|
|
* to be woken up, by a process signal kill(2), with name of relation
|
|
* passed in shared memory.
|
|
*
|
|
* 4. Each backend notifies its respective frontend over the comm
|
|
* channel using the out-of-band channel.
|
|
*
|
|
* 5. Each frontend receives this notification and processes accordingly.
|
|
*
|
|
* #4,#5 are changing soon with pending rewrite of portal/protocol.
|
|
*
|
|
*/
|
|
#include <unistd.h>
|
|
#include <signal.h>
|
|
#include <string.h>
|
|
#include <errno.h>
|
|
#include <sys/types.h> /* Needed by in.h on Ultrix */
|
|
#include <netinet/in.h>
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "access/heapam.h"
|
|
#include "access/relscan.h"
|
|
#include "access/xact.h"
|
|
#include "catalog/catname.h"
|
|
#include "catalog/pg_listener.h"
|
|
#include "commands/async.h"
|
|
#include "fmgr.h"
|
|
#include "lib/dllist.h"
|
|
#include "libpq/libpq.h"
|
|
#include "miscadmin.h"
|
|
#include "nodes/memnodes.h"
|
|
#include "storage/bufmgr.h"
|
|
#include "storage/lmgr.h"
|
|
#include "tcop/dest.h"
|
|
#include "utils/mcxt.h"
|
|
#include "utils/syscache.h"
|
|
|
|
static int notifyFrontEndPending = 0;
|
|
static int notifyIssued = 0;
|
|
static Dllist *pendingNotifies = NULL;
|
|
|
|
|
|
static int AsyncExistsPendingNotify(char *);
|
|
static void ClearPendingNotify(void);
|
|
static void Async_NotifyFrontEnd(void);
|
|
void Async_Unlisten(char *relname, int pid);
|
|
static void Async_UnlistenOnExit(int code, char *relname);
|
|
|
|
/*
|
|
*--------------------------------------------------------------
|
|
* Async_NotifyHandler --
|
|
*
|
|
* This is the signal handler for SIGUSR2. When the backend
|
|
* is signaled, the backend can be in two states.
|
|
* 1. If the backend is in the middle of another transaction,
|
|
* we set the flag, notifyFrontEndPending, and wait until
|
|
* the end of the transaction to notify the front end.
|
|
* 2. If the backend is not in the middle of another transaction,
|
|
* we notify the front end immediately.
|
|
*
|
|
* -- jw, 12/28/93
|
|
* Results:
|
|
* none
|
|
*
|
|
* Side effects:
|
|
* none
|
|
*/
|
|
void
|
|
Async_NotifyHandler(SIGNAL_ARGS)
|
|
{
|
|
extern TransactionState CurrentTransactionState;
|
|
|
|
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
|
|
(CurrentTransactionState->blockState == TRANS_DEFAULT))
|
|
{
|
|
|
|
#ifdef ASYNC_DEBUG
|
|
elog(DEBUG, "Waking up sleeping backend process");
|
|
#endif
|
|
Async_NotifyFrontEnd();
|
|
|
|
}
|
|
else
|
|
{
|
|
#ifdef ASYNC_DEBUG
|
|
elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
|
|
CurrentTransactionState->state,
|
|
CurrentTransactionState->blockState);
|
|
#endif
|
|
notifyFrontEndPending = 1;
|
|
}
|
|
}
|
|
|
|
/*
|
|
*--------------------------------------------------------------
|
|
* Async_Notify --
|
|
*
|
|
* Adds the relation to the list of pending notifies.
|
|
* All notification happens at end of commit.
|
|
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
|
*
|
|
* All notification of backend processes happens here,
|
|
* then each backend notifies its corresponding front end at
|
|
* the end of commit.
|
|
*
|
|
* This correspond to 'notify <relname>' command
|
|
* -- jw, 12/28/93
|
|
*
|
|
* Results:
|
|
* XXX
|
|
*
|
|
* Side effects:
|
|
* All tuples for relname in pg_listener are updated.
|
|
*
|
|
*--------------------------------------------------------------
|
|
*/
|
|
void
|
|
Async_Notify(char *relname)
|
|
{
|
|
|
|
HeapTuple lTuple,
|
|
rTuple;
|
|
Relation lRel;
|
|
HeapScanDesc sRel;
|
|
TupleDesc tdesc;
|
|
ScanKeyData key;
|
|
Buffer b;
|
|
Datum d,
|
|
value[3];
|
|
bool isnull;
|
|
char repl[3],
|
|
nulls[3];
|
|
|
|
char *notifyName;
|
|
|
|
#ifdef ASYNC_DEBUG
|
|
elog(DEBUG, "Async_Notify: %s", relname);
|
|
#endif
|
|
|
|
if (!pendingNotifies)
|
|
pendingNotifies = DLNewList();
|
|
|
|
/*
|
|
* Allocate memory from the global malloc pool because it needs to be
|
|
* referenced also when the transaction is finished. DZ - 26-08-1996
|
|
*/
|
|
notifyName = strdup(relname);
|
|
DLAddHead(pendingNotifies, DLNewElem(notifyName));
|
|
|
|
ScanKeyEntryInitialize(&key, 0,
|
|
Anum_pg_listener_relname,
|
|
F_NAMEEQ,
|
|
PointerGetDatum(notifyName));
|
|
|
|
lRel = heap_openr(ListenerRelationName);
|
|
tdesc = RelationGetTupleDescriptor(lRel);
|
|
RelationSetLockForWrite(lRel);
|
|
sRel = heap_beginscan(lRel, 0, false, 1, &key);
|
|
|
|
nulls[0] = nulls[1] = nulls[2] = ' ';
|
|
repl[0] = repl[1] = repl[2] = ' ';
|
|
repl[Anum_pg_listener_notify - 1] = 'r';
|
|
value[0] = value[1] = value[2] = (Datum) 0;
|
|
value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
|
|
|
|
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
|
|
{
|
|
d = heap_getattr(lTuple, Anum_pg_listener_notify,
|
|
tdesc, &isnull);
|
|
if (!DatumGetInt32(d))
|
|
{
|
|
rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
|
|
heap_replace(lRel, &lTuple->t_ctid, rTuple);
|
|
}
|
|
ReleaseBuffer(b);
|
|
}
|
|
heap_endscan(sRel);
|
|
RelationUnsetLockForWrite(lRel);
|
|
heap_close(lRel);
|
|
notifyIssued = 1;
|
|
}
|
|
|
|
/*
|
|
*--------------------------------------------------------------
|
|
* Async_NotifyAtCommit --
|
|
*
|
|
* Signal our corresponding frontend process on relations that
|
|
* were notified. Signal all other backend process that
|
|
* are listening also.
|
|
*
|
|
* -- jw, 12/28/93
|
|
*
|
|
* Results:
|
|
* XXX
|
|
*
|
|
* Side effects:
|
|
* Tuples in pg_listener that has our listenerpid are updated so
|
|
* that the notification is 0. We do not want to notify frontend
|
|
* more than once.
|
|
*
|
|
* -- jw, 12/28/93
|
|
*
|
|
*--------------------------------------------------------------
|
|
*/
|
|
void
|
|
Async_NotifyAtCommit()
|
|
{
|
|
HeapTuple lTuple;
|
|
Relation lRel;
|
|
HeapScanDesc sRel;
|
|
TupleDesc tdesc;
|
|
ScanKeyData key;
|
|
Datum d;
|
|
bool isnull;
|
|
Buffer b;
|
|
extern TransactionState CurrentTransactionState;
|
|
|
|
if (!pendingNotifies)
|
|
pendingNotifies = DLNewList();
|
|
|
|
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
|
|
(CurrentTransactionState->blockState == TRANS_DEFAULT))
|
|
{
|
|
|
|
if (notifyIssued)
|
|
{ /* 'notify <relname>' issued by us */
|
|
notifyIssued = 0;
|
|
StartTransactionCommand();
|
|
#ifdef ASYNC_DEBUG
|
|
elog(DEBUG, "Async_NotifyAtCommit.");
|
|
#endif
|
|
ScanKeyEntryInitialize(&key, 0,
|
|
Anum_pg_listener_notify,
|
|
F_INT4EQ,
|
|
Int32GetDatum(1));
|
|
lRel = heap_openr(ListenerRelationName);
|
|
RelationSetLockForWrite(lRel);
|
|
sRel = heap_beginscan(lRel, 0, false, 1, &key);
|
|
tdesc = RelationGetTupleDescriptor(lRel);
|
|
|
|
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
|
|
{
|
|
d = heap_getattr(lTuple, Anum_pg_listener_relname,
|
|
tdesc, &isnull);
|
|
|
|
if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
|
|
{
|
|
d = heap_getattr(lTuple, Anum_pg_listener_pid,
|
|
tdesc, &isnull);
|
|
|
|
if (MyProcPid == DatumGetInt32(d))
|
|
{
|
|
#ifdef ASYNC_DEBUG
|
|
elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
|
|
#endif
|
|
notifyFrontEndPending = 1;
|
|
}
|
|
else
|
|
{
|
|
#ifdef ASYNC_DEBUG
|
|
elog(DEBUG, "Notifying others");
|
|
#endif
|
|
#ifdef HAVE_KILL
|
|
if (kill(DatumGetInt32(d), SIGUSR2) < 0)
|
|
{
|
|
if (errno == ESRCH)
|
|
{
|
|
heap_delete(lRel, &lTuple->t_ctid);
|
|
}
|
|
}
|
|
#endif
|
|
}
|
|
}
|
|
ReleaseBuffer(b);
|
|
}
|
|
heap_endscan(sRel);
|
|
RelationUnsetLockForWrite(lRel);
|
|
heap_close(lRel);
|
|
|
|
CommitTransactionCommand();
|
|
ClearPendingNotify();
|
|
}
|
|
|
|
if (notifyFrontEndPending)
|
|
{ /* we need to notify the frontend of all
|
|
* pending notifies. */
|
|
notifyFrontEndPending = 1;
|
|
Async_NotifyFrontEnd();
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
*--------------------------------------------------------------
|
|
* Async_NotifyAtAbort --
|
|
*
|
|
* Gets rid of pending notifies. List elements are automatically
|
|
* freed through memory context.
|
|
*
|
|
*
|
|
* Results:
|
|
* XXX
|
|
*
|
|
* Side effects:
|
|
* XXX
|
|
*
|
|
*--------------------------------------------------------------
|
|
*/
|
|
void
|
|
Async_NotifyAtAbort()
|
|
{
|
|
extern TransactionState CurrentTransactionState;
|
|
|
|
if (notifyIssued)
|
|
{
|
|
ClearPendingNotify();
|
|
}
|
|
notifyIssued = 0;
|
|
if (pendingNotifies)
|
|
DLFreeList(pendingNotifies);
|
|
pendingNotifies = DLNewList();
|
|
|
|
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
|
|
(CurrentTransactionState->blockState == TRANS_DEFAULT))
|
|
{
|
|
if (notifyFrontEndPending)
|
|
{ /* don't forget to notify front end */
|
|
Async_NotifyFrontEnd();
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
*--------------------------------------------------------------
|
|
* Async_Listen --
|
|
*
|
|
* Register a backend (identified by its Unix PID) as listening
|
|
* on the specified relation.
|
|
*
|
|
* This corresponds to the 'listen <relation>' command in SQL
|
|
*
|
|
* One listener per relation, pg_listener relation is keyed
|
|
* on (relname,pid) to provide multiple listeners in future.
|
|
*
|
|
* Results:
|
|
* pg_listeners is updated.
|
|
*
|
|
* Side effects:
|
|
* XXX
|
|
*
|
|
*--------------------------------------------------------------
|
|
*/
|
|
void
|
|
Async_Listen(char *relname, int pid)
|
|
{
|
|
Datum values[Natts_pg_listener];
|
|
char nulls[Natts_pg_listener];
|
|
TupleDesc tdesc;
|
|
HeapScanDesc s;
|
|
HeapTuple htup,
|
|
tup;
|
|
Relation lDesc;
|
|
Buffer b;
|
|
Datum d;
|
|
int i;
|
|
bool isnull;
|
|
int alreadyListener = 0;
|
|
char *relnamei;
|
|
TupleDesc tupDesc;
|
|
|
|
#ifdef ASYNC_DEBUG
|
|
elog(DEBUG, "Async_Listen: %s", relname);
|
|
#endif
|
|
for (i = 0; i < Natts_pg_listener; i++)
|
|
{
|
|
nulls[i] = ' ';
|
|
values[i] = PointerGetDatum(NULL);
|
|
}
|
|
|
|
i = 0;
|
|
values[i++] = (Datum) relname;
|
|
values[i++] = (Datum) pid;
|
|
values[i++] = (Datum) 0; /* no notifies pending */
|
|
|
|
lDesc = heap_openr(ListenerRelationName);
|
|
RelationSetLockForWrite(lDesc);
|
|
|
|
/* is someone already listening. One listener per relation */
|
|
tdesc = RelationGetTupleDescriptor(lDesc);
|
|
s = heap_beginscan(lDesc, 0, false, 0, (ScanKey) NULL);
|
|
while (HeapTupleIsValid(htup = heap_getnext(s, 0, &b)))
|
|
{
|
|
d = heap_getattr(htup, Anum_pg_listener_relname, tdesc,
|
|
&isnull);
|
|
relnamei = DatumGetPointer(d);
|
|
if (!strncmp(relnamei, relname, NAMEDATALEN))
|
|
{
|
|
d = heap_getattr(htup, Anum_pg_listener_pid, tdesc, &isnull);
|
|
pid = DatumGetInt32(d);
|
|
if (pid == MyProcPid)
|
|
{
|
|
alreadyListener = 1;
|
|
}
|
|
}
|
|
ReleaseBuffer(b);
|
|
}
|
|
heap_endscan(s);
|
|
|
|
if (alreadyListener)
|
|
{
|
|
elog(NOTICE, "Async_Listen: We are already listening on %s",
|
|
relname);
|
|
return;
|
|
}
|
|
|
|
tupDesc = lDesc->rd_att;
|
|
tup = heap_formtuple(tupDesc,
|
|
values,
|
|
nulls);
|
|
heap_insert(lDesc, tup);
|
|
|
|
pfree(tup);
|
|
|
|
/*
|
|
* if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
|
|
* listener on %s (possibly dead)",relname); }
|
|
*/
|
|
|
|
RelationUnsetLockForWrite(lDesc);
|
|
heap_close(lDesc);
|
|
|
|
/*
|
|
* now that we are listening, we should make a note to ourselves to
|
|
* unlisten prior to dying.
|
|
*/
|
|
relnamei = malloc(NAMEDATALEN); /* persists to process exit */
|
|
StrNCpy(relnamei, relname, NAMEDATALEN);
|
|
on_exitpg(Async_UnlistenOnExit, (caddr_t) relnamei);
|
|
}
|
|
|
|
/*
|
|
*--------------------------------------------------------------
|
|
* Async_Unlisten --
|
|
*
|
|
* Remove the backend from the list of listening backends
|
|
* for the specified relation.
|
|
*
|
|
* This would correspond to the 'unlisten <relation>'
|
|
* command, but there isn't one yet.
|
|
*
|
|
* Results:
|
|
* pg_listeners is updated.
|
|
*
|
|
* Side effects:
|
|
* XXX
|
|
*
|
|
*--------------------------------------------------------------
|
|
*/
|
|
void
|
|
Async_Unlisten(char *relname, int pid)
|
|
{
|
|
Relation lDesc;
|
|
HeapTuple lTuple;
|
|
|
|
lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
|
|
Int32GetDatum(pid),
|
|
0, 0);
|
|
lDesc = heap_openr(ListenerRelationName);
|
|
RelationSetLockForWrite(lDesc);
|
|
|
|
if (lTuple != NULL)
|
|
{
|
|
heap_delete(lDesc, &lTuple->t_ctid);
|
|
}
|
|
|
|
RelationUnsetLockForWrite(lDesc);
|
|
heap_close(lDesc);
|
|
}
|
|
|
|
static void
|
|
Async_UnlistenOnExit(int code, /* from exitpg */
|
|
char *relname)
|
|
{
|
|
Async_Unlisten((char *) relname, MyProcPid);
|
|
}
|
|
|
|
/*
|
|
* --------------------------------------------------------------
|
|
* Async_NotifyFrontEnd --
|
|
*
|
|
* Perform an asynchronous notification to front end over
|
|
* portal comm channel. The name of the relation which contains the
|
|
* data is sent to the front end.
|
|
*
|
|
* We remove the notification flag from the pg_listener tuple
|
|
* associated with our process.
|
|
*
|
|
* Results:
|
|
* XXX
|
|
*
|
|
* --------------------------------------------------------------
|
|
*/
|
|
GlobalMemory notifyContext = NULL;
|
|
|
|
static void
|
|
Async_NotifyFrontEnd()
|
|
{
|
|
extern CommandDest whereToSendOutput;
|
|
HeapTuple lTuple,
|
|
rTuple;
|
|
Relation lRel;
|
|
HeapScanDesc sRel;
|
|
TupleDesc tdesc;
|
|
ScanKeyData key[2];
|
|
Datum d,
|
|
value[3];
|
|
char repl[3],
|
|
nulls[3];
|
|
Buffer b;
|
|
bool isnull;
|
|
|
|
notifyFrontEndPending = 0;
|
|
|
|
#ifdef ASYNC_DEBUG
|
|
elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
|
|
#endif
|
|
|
|
StartTransactionCommand();
|
|
ScanKeyEntryInitialize(&key[0], 0,
|
|
Anum_pg_listener_notify,
|
|
F_INT4EQ,
|
|
Int32GetDatum(1));
|
|
ScanKeyEntryInitialize(&key[1], 0,
|
|
Anum_pg_listener_pid,
|
|
F_INT4EQ,
|
|
Int32GetDatum(MyProcPid));
|
|
lRel = heap_openr(ListenerRelationName);
|
|
RelationSetLockForWrite(lRel);
|
|
tdesc = RelationGetTupleDescriptor(lRel);
|
|
sRel = heap_beginscan(lRel, 0, false, 2, key);
|
|
|
|
nulls[0] = nulls[1] = nulls[2] = ' ';
|
|
repl[0] = repl[1] = repl[2] = ' ';
|
|
repl[Anum_pg_listener_notify - 1] = 'r';
|
|
value[0] = value[1] = value[2] = (Datum) 0;
|
|
value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
|
|
|
|
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
|
|
{
|
|
d = heap_getattr(lTuple, Anum_pg_listener_relname,
|
|
tdesc, &isnull);
|
|
rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
|
|
heap_replace(lRel, &lTuple->t_ctid, rTuple);
|
|
|
|
/* notifying the front end */
|
|
|
|
if (whereToSendOutput == Remote)
|
|
{
|
|
pq_putnchar("A", 1);
|
|
pq_putint((int32) MyProcPid, sizeof(int32));
|
|
pq_putstr(DatumGetName(d)->data);
|
|
pq_flush();
|
|
}
|
|
else
|
|
{
|
|
elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
|
|
}
|
|
ReleaseBuffer(b);
|
|
}
|
|
CommitTransactionCommand();
|
|
}
|
|
|
|
static int
|
|
AsyncExistsPendingNotify(char *relname)
|
|
{
|
|
Dlelem *p;
|
|
|
|
for (p = DLGetHead(pendingNotifies);
|
|
p != NULL;
|
|
p = DLGetSucc(p))
|
|
{
|
|
/* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */
|
|
if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
ClearPendingNotify()
|
|
{
|
|
Dlelem *p;
|
|
|
|
while ((p = DLRemHead(pendingNotifies)) != NULL)
|
|
free(DLE_VAL(p));
|
|
}
|