Tom Lane 72a3902a66 Create an internal semaphore API that is not tied to SysV semaphores.
As proof of concept, provide an alternate implementation based on POSIX
semaphores.  Also push the SysV shared-memory implementation into a
separate file so that it can be replaced conveniently.
2002-05-05 00:03:29 +00:00

350 lines
8.6 KiB
C

/*-------------------------------------------------------------------------
*
* sinvaladt.c
* POSTGRES shared cache invalidation segment definitions.
*
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.46 2002/05/05 00:03:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/sinvaladt.h"
SISeg *shmInvalBuffer;
static void CleanupInvalidationState(int status, Datum arg);
static void SISetProcStateInvalid(SISeg *segP);
/*
* SInvalShmemSize --- return shared-memory space needed
*/
int
SInvalShmemSize(int maxBackends)
{
/*
* Figure space needed. Note sizeof(SISeg) includes the first
* ProcState entry.
*/
return sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
}
/*
* SIBufferInit
* Create and initialize a new SI message buffer
*/
void
SIBufferInit(int maxBackends)
{
int segSize;
SISeg *segP;
int i;
/* Allocate space in shared memory */
segSize = SInvalShmemSize(maxBackends);
shmInvalBuffer = segP = (SISeg *) ShmemAlloc(segSize);
/* Clear message counters, save size of procState array */
segP->minMsgNum = 0;
segP->maxMsgNum = 0;
segP->lastBackend = 0;
segP->maxBackends = maxBackends;
/* The buffer[] array is initially all unused, so we need not fill it */
/* Mark all backends inactive */
for (i = 0; i < maxBackends; i++)
{
segP->procState[i].nextMsgNum = -1; /* inactive */
segP->procState[i].resetState = false;
segP->procState[i].procStruct = INVALID_OFFSET;
}
}
/*
* SIBackendInit
* Initialize a new backend to operate on the sinval buffer
*
* Returns:
* >0 A-OK
* 0 Failed to find a free procState slot (ie, MaxBackends exceeded)
* <0 Some other failure (not currently used)
*
* NB: this routine, and all following ones, must be executed with the
* SInvalLock lock held, since there may be multiple backends trying
* to access the buffer.
*/
int
SIBackendInit(SISeg *segP)
{
int index;
ProcState *stateP = NULL;
/* Look for a free entry in the procState array */
for (index = 0; index < segP->lastBackend; index++)
{
if (segP->procState[index].nextMsgNum < 0) /* inactive slot? */
{
stateP = &segP->procState[index];
break;
}
}
if (stateP == NULL)
{
if (segP->lastBackend < segP->maxBackends)
{
stateP = &segP->procState[segP->lastBackend];
Assert(stateP->nextMsgNum < 0);
segP->lastBackend++;
}
else
{
/* out of procState slots */
MyBackendId = InvalidBackendId;
return 0;
}
}
MyBackendId = (stateP - &segP->procState[0]) + 1;
#ifdef INVALIDDEBUG
elog(DEBUG1, "SIBackendInit: backend id %d", MyBackendId);
#endif /* INVALIDDEBUG */
/* mark myself active, with all extant messages already read */
stateP->nextMsgNum = segP->maxMsgNum;
stateP->resetState = false;
stateP->procStruct = MAKE_OFFSET(MyProc);
/* register exit routine to mark my entry inactive at exit */
on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
return 1;
}
/*
* CleanupInvalidationState
* Mark the current backend as no longer active.
*
* This function is called via on_shmem_exit() during backend shutdown,
* so the caller has NOT acquired the lock for us.
*
* arg is really of type "SISeg*".
*/
static void
CleanupInvalidationState(int status, Datum arg)
{
SISeg *segP = (SISeg *) DatumGetPointer(arg);
int i;
Assert(PointerIsValid(segP));
LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
/* Mark myself inactive */
segP->procState[MyBackendId - 1].nextMsgNum = -1;
segP->procState[MyBackendId - 1].resetState = false;
segP->procState[MyBackendId - 1].procStruct = INVALID_OFFSET;
/* Recompute index of last active backend */
for (i = segP->lastBackend; i > 0; i--)
{
if (segP->procState[i - 1].nextMsgNum >= 0)
break;
}
segP->lastBackend = i;
LWLockRelease(SInvalLock);
}
/*
* SIInsertDataEntry
* Add a new invalidation message to the buffer.
*
* If we are unable to insert the message because the buffer is full,
* then clear the buffer and assert the "reset" flag to each backend.
* This will cause all the backends to discard *all* invalidatable state.
*
* Returns true for normal successful insertion, false if had to reset.
*/
bool
SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
{
int numMsgs = segP->maxMsgNum - segP->minMsgNum;
/* Is the buffer full? */
if (numMsgs >= MAXNUMMESSAGES)
{
/*
* Don't panic just yet: slowest backend might have consumed some
* messages but not yet have done SIDelExpiredDataEntries() to
* advance minMsgNum. So, make sure minMsgNum is up-to-date.
*/
SIDelExpiredDataEntries(segP);
numMsgs = segP->maxMsgNum - segP->minMsgNum;
if (numMsgs >= MAXNUMMESSAGES)
{
/* Yup, it's definitely full, no choice but to reset */
SISetProcStateInvalid(segP);
return false;
}
}
/*
* Try to prevent table overflow. When the table is 70% full send a
* WAKEN_CHILDREN request to the postmaster. The postmaster will send
* a SIGUSR2 signal (ordinarily a NOTIFY signal) to all the backends.
* This will force idle backends to execute a transaction to look
* through pg_listener for NOTIFY messages, and as a byproduct of the
* transaction start they will read SI entries.
*
* This should never happen if all the backends are actively executing
* queries, but if a backend is sitting idle then it won't be starting
* transactions and so won't be reading SI entries.
*
* dz - 27 Jan 1998
*/
if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
IsUnderPostmaster)
{
elog(DEBUG3, "SIInsertDataEntry: table is 70%% full, signaling postmaster");
SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
}
/*
* Insert new message into proper slot of circular buffer
*/
segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
segP->maxMsgNum++;
return true;
}
/*
* SISetProcStateInvalid
* Flush pending messages from buffer, assert reset flag for each backend
*
* This is used only to recover from SI buffer overflow.
*/
static void
SISetProcStateInvalid(SISeg *segP)
{
int i;
segP->minMsgNum = 0;
segP->maxMsgNum = 0;
for (i = 0; i < segP->lastBackend; i++)
{
if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
{
segP->procState[i].resetState = true;
segP->procState[i].nextMsgNum = 0;
}
}
}
/*
* SIGetDataEntry
* get next SI message for specified backend, if there is one
*
* Possible return values:
* 0: no SI message available
* 1: next SI message has been extracted into *data
* (there may be more messages available after this one!)
* -1: SI reset message extracted
*
* NB: this can run in parallel with other instances of SIGetDataEntry
* executing on behalf of other backends. See comments in sinval.c in
* ReceiveSharedInvalidMessages().
*/
int
SIGetDataEntry(SISeg *segP, int backendId,
SharedInvalidationMessage *data)
{
ProcState *stateP = &segP->procState[backendId - 1];
if (stateP->resetState)
{
/*
* Force reset. We can say we have dealt with any messages added
* since the reset, as well...
*/
stateP->resetState = false;
stateP->nextMsgNum = segP->maxMsgNum;
return -1;
}
if (stateP->nextMsgNum >= segP->maxMsgNum)
return 0; /* nothing to read */
/*
* Retrieve message and advance my counter.
*/
*data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
stateP->nextMsgNum++;
/*
* There may be other backends that haven't read the message, so we
* cannot delete it here. SIDelExpiredDataEntries() should be called
* to remove dead messages.
*/
return 1; /* got a message */
}
/*
* SIDelExpiredDataEntries
* Remove messages that have been consumed by all active backends
*/
void
SIDelExpiredDataEntries(SISeg *segP)
{
int min,
i,
h;
min = segP->maxMsgNum;
if (min == segP->minMsgNum)
return; /* fast path if no messages exist */
/* Recompute minMsgNum = minimum of all backends' nextMsgNum */
for (i = 0; i < segP->lastBackend; i++)
{
h = segP->procState[i].nextMsgNum;
if (h >= 0)
{ /* backend active */
if (h < min)
min = h;
}
}
segP->minMsgNum = min;
/*
* When minMsgNum gets really large, decrement all message counters so
* as to forestall overflow of the counters.
*/
if (min >= MSGNUMWRAPAROUND)
{
segP->minMsgNum -= MSGNUMWRAPAROUND;
segP->maxMsgNum -= MSGNUMWRAPAROUND;
for (i = 0; i < segP->lastBackend; i++)
{
if (segP->procState[i].nextMsgNum >= 0)
segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
}
}
}