2001-06-01 20:07:16 +00:00

439 lines
11 KiB
C

/*-------------------------------------------------------------------------
*
* sinval.c
* POSTGRES shared cache invalidation communication code.
*
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.32 2001/06/01 20:07:16 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <sys/types.h>
#include "storage/backendid.h"
#include "storage/proc.h"
#include "storage/sinval.h"
#include "storage/sinvaladt.h"
#include "utils/tqual.h"
/*
 * Spinlock taken around accesses to the shared invalidation buffer and its
 * per-backend ProcState array by the routines in this file (the one
 * exception, CountActiveBackends, deliberately skips it for speed).
 * It is only defined here; it must already have been initialized during
 * spinlock init before any of these routines run.
 */
SPINLOCK SInvalLock = (SPINLOCK) NULL;
/*
 * CreateSharedInvalidationState
 *      Initialize the shared invalidation (SI) buffer by handing
 *      maxBackends to SIBufferInit().
 *
 * Should be called only by the postmaster.
 */
void
CreateSharedInvalidationState(int maxBackends)
{
    /* SInvalLock is not created here: spinlock init has already done it. */
    SIBufferInit(maxBackends);
}
/*
* InitBackendSharedInvalidationState
* Initialize new backend's state info in buffer segment.
*/
void
InitBackendSharedInvalidationState(void)
{
SpinAcquire(SInvalLock);
if (!SIBackendInit(shmInvalBuffer))
{
SpinRelease(SInvalLock);
elog(FATAL, "Backend cache invalidation initialization failed");
}
SpinRelease(SInvalLock);
}
/*
 * RegisterSharedInvalid
 *      Append one shared-cache-invalidation message to the global SI
 *      message queue.
 *
 * Two kinds of message are accepted (this may be generalized some day):
 *
 *  (1) cacheId   = system cache id
 *      hashIndex = system cache hash index for a (possibly) cached tuple
 *      pointer   = item pointer of the (possibly) cached tuple
 *
 *  (2) cacheId   = special non-syscache id
 *      hashIndex = object id contained in a (possibly) cached relation
 *                  descriptor
 *      pointer   = null
 *
 * Note:
 *      Assumes hash index is valid.
 *      Assumes item pointer is valid.
 *
 * If the queue cannot take the message, only a DEBUG log entry is made.
 */
void
RegisterSharedInvalid(int cacheId,      /* XXX */
                      Index hashIndex,
                      ItemPointer pointer)
{
    SharedInvalidData msg;
    bool        queued;

    /* Build the message locally before touching shared memory. */
    msg.cacheId = cacheId;
    msg.hashIndex = hashIndex;
    if (!ItemPointerIsValid(pointer))
    {
        /* case (2): no tuple pointer to carry along */
        ItemPointerSetInvalid(&msg.pointerData);
    }
    else
    {
        ItemPointerCopy(pointer, &msg.pointerData);
    }

    /* Only the actual enqueue happens under the lock. */
    SpinAcquire(SInvalLock);
    queued = SIInsertDataEntry(shmInvalBuffer, &msg);
    SpinRelease(SInvalLock);

    if (!queued)
        elog(DEBUG, "RegisterSharedInvalid: SI buffer overflow");
}
/*
 * InvalidateSharedInvalid
 *      Drain the shared-cache-invalidation messages pending for this
 *      backend.
 *
 * Each call to SIGetDataEntry() is made under SInvalLock, but the lock is
 * released before the message is acted on:
 *      result == 0  : queue is empty, stop
 *      result <  0  : reset message; resetFunction() is called
 *      result >  0  : ordinary message; invalFunction() is called with the
 *                     message's cacheId, hashIndex and item pointer
 *
 * If at least one message was consumed, SIDelExpiredDataEntries() is run
 * (again under the lock) to try to release dead messages.
 */
void
InvalidateSharedInvalid(void (*invalFunction) (),
                        void (*resetFunction) ())
{
    SharedInvalidData msg;
    int         status;
    bool        consumedAny = false;

    for (;;)
    {
        /* Fetch the next message; callbacks run without the lock held. */
        SpinAcquire(SInvalLock);
        status = SIGetDataEntry(shmInvalBuffer, MyBackendId, &msg);
        SpinRelease(SInvalLock);

        if (status == 0)
            break;              /* queue drained */

        consumedAny = true;

        if (status > 0)
        {
            /* ordinary data message */
            invalFunction(msg.cacheId,
                          msg.hashIndex,
                          &msg.pointerData);
        }
        else
        {
            /* reset message */
            elog(DEBUG, "InvalidateSharedInvalid: cache state reset");
            resetFunction();
        }
    }

    /* Having read something, try to release dead messages. */
    if (consumedAny)
    {
        SpinAcquire(SInvalLock);
        SIDelExpiredDataEntries(shmInvalBuffer);
        SpinRelease(SInvalLock);
    }
}
/****************************************************************************/
/* Functions that need to scan the PROC structures of all running backends. */
/* It's a bit strange to keep these in sinval.c, since they don't have any */
/* direct relationship to shared-cache invalidation. But the procState */
/* array in the SI segment is the only place in the system where we have */
/* an array of per-backend data, so it is the most convenient place to keep */
/* pointers to the backends' PROC structures. We used to implement these */
/* functions with a slow, ugly search through the ShmemIndex hash table --- */
/* now they are simple loops over the SI ProcState array. */
/****************************************************************************/
/*
 * DatabaseHasActiveBackends -- is any backend running in the given DB?
 *
 * When 'ignoreMyself' is TRUE the calling backend's own PROC entry is not
 * counted.
 *
 * DROP DATABASE uses this as an interlock: removing a database that still
 * has active backends would be a Bad Thing.  A backend that has only just
 * started and is still trying to connect to the doomed database cannot be
 * detected here, so backend startup needs additional interlocking.
 *
 * The ProcState array is scanned under SInvalLock.
 */
bool
DatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself)
{
    bool        found = false;
    SISeg      *segP = shmInvalBuffer;
    ProcState  *procStates = segP->procState;
    int         i;

    SpinAcquire(SInvalLock);

    for (i = 0; i < segP->lastBackend; i++)
    {
        SHMEM_OFFSET procOffset = procStates[i].procStruct;
        PROC       *proc;

        if (procOffset == INVALID_OFFSET)
            continue;           /* slot has no PROC attached */

        proc = (PROC *) MAKE_PTR(procOffset);

        if (proc->databaseId != databaseId)
            continue;           /* some other database */

        if (ignoreMyself && proc == MyProc)
            continue;           /* caller asked not to count itself */

        found = true;
        break;
    }

    SpinRelease(SInvalLock);

    return found;
}
/*
 * TransactionIdIsInProgress -- is the given transaction being run by some
 * backend?
 *
 * Scans the ProcState array under SInvalLock and returns true as soon as a
 * PROC whose xid equals 'xid' is found; returns false otherwise.
 */
bool
TransactionIdIsInProgress(TransactionId xid)
{
    bool        running = false;
    SISeg      *segP = shmInvalBuffer;
    ProcState  *procStates = segP->procState;
    int         i;

    SpinAcquire(SInvalLock);

    /* The loop ends at the first match or when the array is exhausted. */
    for (i = 0; !running && i < segP->lastBackend; i++)
    {
        SHMEM_OFFSET procOffset = procStates[i].procStruct;

        if (procOffset != INVALID_OFFSET)
        {
            PROC       *proc = (PROC *) MAKE_PTR(procOffset);

            running = (proc->xid == xid);
        }
    }

    SpinRelease(SInvalLock);

    return running;
}
/*
 * GetXmaxRecent -- returns the oldest transaction that was running when
 *                  all current transactions were started.
 *
 * Used by vacuum to decide which deleted tuples must be preserved in a
 * table.
 *
 * *XmaxRecent starts out as the caller's current transaction id and is
 * lowered to the smallest xmin (>= FirstTransactionId) advertised by any
 * other backend's PROC entry.
 */
void
GetXmaxRecent(TransactionId *XmaxRecent)
{
    SISeg      *segP = shmInvalBuffer;
    ProcState  *procStates = segP->procState;
    int         i;

    *XmaxRecent = GetCurrentTransactionId();

    SpinAcquire(SInvalLock);

    for (i = 0; i < segP->lastBackend; i++)
    {
        SHMEM_OFFSET procOffset = procStates[i].procStruct;
        PROC       *proc;
        TransactionId procXmin;

        if (procOffset == INVALID_OFFSET)
            continue;

        proc = (PROC *) MAKE_PTR(procOffset);

        /*
         * Read xmin exactly once into a local: AbortTransaction() changes
         * it without taking the spinlock.
         */
        procXmin = proc->xmin;

        if (proc != MyProc &&
            procXmin >= FirstTransactionId &&
            procXmin < *XmaxRecent)
            *XmaxRecent = procXmin;
    }

    SpinRelease(SInvalLock);
}
/*
 * GetSnapshotData -- returns information about running transactions.
 *
 * The result is a malloc'd SnapshotData with these fields filled in:
 *      xmax  the value obtained from ReadNewTransactionId()
 *      xip   malloc'd array of the xids found in other backends' PROC
 *            entries that are >= FirstTransactionId and < xmax
 *      xcnt  number of valid entries in xip
 *      xmin  the smallest xid stored in xip, or our own current
 *            transaction id if none is smaller
 *
 * If 'serializable' is true, MyProc->xmin is set to the snapshot's xmin.
 *
 * Running out of memory raises elog(ERROR).  Should the xip array
 * allocation fail, the snapshot header allocated just before it is freed
 * first so that it is not leaked.
 */
Snapshot
GetSnapshotData(bool serializable)
{
    Snapshot    snapshot = (Snapshot) malloc(sizeof(SnapshotData));
    SISeg      *segP = shmInvalBuffer;
    ProcState  *stateP = segP->procState;
    int         index;
    int         count = 0;

    if (snapshot == NULL)
        elog(ERROR, "Memory exhausted in GetSnapshotData");

    snapshot->xmin = GetCurrentTransactionId();

    SpinAcquire(SInvalLock);

    /*
     * There can be no more than lastBackend active transactions, so this
     * is enough space:
     */
    snapshot->xip = (TransactionId *)
        malloc(segP->lastBackend * sizeof(TransactionId));
    if (snapshot->xip == NULL)
    {
        SpinRelease(SInvalLock);
        /* Nobody else holds a pointer to the header yet; don't leak it. */
        free(snapshot);
        elog(ERROR, "Memory exhausted in GetSnapshotData");
    }

    /*
     * Unfortunately, we have to call ReadNewTransactionId() after
     * acquiring SInvalLock above. It's not good because
     * ReadNewTransactionId() does SpinAcquire(XidGenLockId) but
     * _necessary_.
     */
    ReadNewTransactionId(&(snapshot->xmax));

    for (index = 0; index < segP->lastBackend; index++)
    {
        SHMEM_OFFSET pOffset = stateP[index].procStruct;

        if (pOffset != INVALID_OFFSET)
        {
            PROC       *proc = (PROC *) MAKE_PTR(pOffset);
            TransactionId xid;

            /*
             * We don't use spin-locking when changing proc->xid in
             * GetNewTransactionId() and in AbortTransaction() !..
             * Hence read it once into a local and use only that copy.
             */
            xid = proc->xid;

            if (proc == MyProc ||
                xid < FirstTransactionId || xid >= snapshot->xmax)
            {
                /*--------
                 * Seems that there is no sense to store
                 *      xid >= snapshot->xmax
                 * (what we got from ReadNewTransactionId above)
                 * in snapshot->xip.  We just assume that all xacts
                 * with such xid-s are running and may be ignored.
                 *--------
                 */
                continue;
            }

            if (xid < snapshot->xmin)
                snapshot->xmin = xid;
            snapshot->xip[count] = xid;
            count++;
        }
    }

    if (serializable)
        MyProc->xmin = snapshot->xmin;
    /* Serializable snapshot must be computed before any other... */
    Assert(MyProc->xmin != InvalidTransactionId);

    SpinRelease(SInvalLock);

    snapshot->xcnt = count;
    return snapshot;
}
/*
 * CountActiveBackends --- count backends (other than myself) that are in
 *      active transactions.  The result serves as a heuristic for deciding
 *      whether a pre-XLOG-flush delay is worthwhile during commit.
 *
 * "Active" means the transaction has written at least one XLOG record;
 * read-only transactions are not counted.  Backends blocked waiting for a
 * lock are not counted either, since they will not get to run until
 * someone else commits.
 */
int
CountActiveBackends(void)
{
    SISeg      *segP = shmInvalBuffer;
    ProcState  *procStates = segP->procState;
    int         nActive = 0;
    int         i;

    /*
     * For speed, SInvalLock is deliberately not taken here.  That is a
     * little bit bogus, but xrecoff is only tested for zero versus nonzero
     * and the answer is merely a heuristic, so it should be OK.
     */
    for (i = 0; i < segP->lastBackend; i++)
    {
        SHMEM_OFFSET procOffset = procStates[i].procStruct;
        PROC       *proc;

        if (procOffset == INVALID_OFFSET)
            continue;

        proc = (PROC *) MAKE_PTR(procOffset);

        if (proc != MyProc &&               /* never count myself */
            proc->logRec.xrecoff != 0 &&    /* must be in a transaction */
            proc->waitLock == NULL)         /* and not blocked on a lock */
            nActive++;
    }

    return nActive;
}
/*
 * GetUndoRecPtr -- returns the oldest PROC->logRec.
 *
 * Scans the ProcState array under SInvalLock.  PROC entries whose
 * logRec.xrecoff is zero are skipped; among the rest, the record pointer
 * that is not later (per XLByteLT) than any already seen is kept.  If no
 * entry qualifies the result is {0, 0}.
 */
XLogRecPtr
GetUndoRecPtr(void)
{
    SISeg      *segP = shmInvalBuffer;
    ProcState  *procStates = segP->procState;
    XLogRecPtr  oldest = {0, 0};
    int         i;

    SpinAcquire(SInvalLock);

    for (i = 0; i < segP->lastBackend; i++)
    {
        SHMEM_OFFSET procOffset = procStates[i].procStruct;
        PROC       *proc;
        XLogRecPtr  candidate;

        if (procOffset == INVALID_OFFSET)
            continue;

        proc = (PROC *) MAKE_PTR(procOffset);
        candidate = proc->logRec;

        /*
         * Take the candidate if it is set and either nothing has been
         * chosen yet or the current choice is not strictly older.
         */
        if (candidate.xrecoff != 0 &&
            (oldest.xrecoff == 0 || !XLByteLT(oldest, candidate)))
            oldest = candidate;
    }

    SpinRelease(SInvalLock);

    return oldest;
}