Peter Eisentraut 1f605b82ba Change argument of appendBinaryStringInfo from char * to void *
There is some code that uses this function to assemble some kind of
packed binary layout, which requires a bunch of casts because of this.
Functions taking binary data plus length should take void * instead,
like memcpy() for example.

Discussion: https://www.postgresql.org/message-id/flat/a0086cfc-ff0f-2827-20fe-52b591d2666c%40enterprisedb.com
2022-12-30 11:05:09 +01:00

709 lines
18 KiB
C

/*-------------------------------------------------------------------------
* xid8funcs.c
*
* Export internal transaction IDs to user level.
*
* Note that only top-level transaction IDs are exposed to user sessions.
* This is important because xid8s frequently persist beyond the global
* xmin horizon, or may even be shipped to other machines, so we cannot
* rely on being able to correlate subtransaction IDs with their parents
* via functions such as SubTransGetTopmostTransaction().
*
* These functions are used to support the txid_XXX functions and the newer
* pg_current_xact, pg_current_snapshot and related fmgr functions, since the
* only difference between them is whether they expose xid8 or int8 values to
* users. The txid_XXX variants should eventually be dropped.
*
*
* Copyright (c) 2003-2022, PostgreSQL Global Development Group
* Author: Jan Wieck, Afilias USA INC.
* 64-bit txids: Marko Kreen, Skype Technologies
*
* src/backend/utils/adt/xid8funcs.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/clog.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "funcapi.h"
#include "lib/qunique.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "postmaster/postmaster.h"
#include "storage/lwlock.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/xid8.h"
/*
* If defined, use bsearch() function for searching for xid8s in snapshots
* that have more than the specified number of values.
*/
#define USE_BSEARCH_IF_NXIP_GREATER 30
/*
* Snapshot containing FullTransactionIds.
*/
typedef struct
{
/*
* 4-byte length hdr, should not be touched directly.
*
* Explicit embedding is ok as we want always correct alignment anyway.
*/
int32 __varsz;
uint32 nxip; /* number of fxids in xip array */
FullTransactionId xmin;
FullTransactionId xmax;
/* in-progress fxids, xmin <= xip[i] < xmax: */
FullTransactionId xip[FLEXIBLE_ARRAY_MEMBER];
} pg_snapshot;
#define PG_SNAPSHOT_SIZE(nxip) \
(offsetof(pg_snapshot, xip) + sizeof(FullTransactionId) * (nxip))
#define PG_SNAPSHOT_MAX_NXIP \
((MaxAllocSize - offsetof(pg_snapshot, xip)) / sizeof(FullTransactionId))
/*
* Compile-time limits on the procarray (MAX_BACKENDS processes plus
* MAX_BACKENDS prepared transactions) guarantee nxip won't be too large.
*/
StaticAssertDecl(MAX_BACKENDS * 2 <= PG_SNAPSHOT_MAX_NXIP,
"possible overflow in pg_current_snapshot()");
/*
* Helper to get a TransactionId from a 64-bit xid with wraparound detection.
*
* It is an ERROR if the xid is in the future. Otherwise, returns true if
* the transaction is still new enough that we can determine whether it
* committed and false otherwise. If *extracted_xid is not NULL, it is set
* to the low 32 bits of the transaction ID (i.e. the actual XID, without the
* epoch).
*
* The caller must hold XactTruncationLock since it's dealing with arbitrary
* XIDs, and must continue to hold it until it's done with any clog lookups
* relating to those XIDs.
*/
static bool
TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid)
{
uint32 xid_epoch = EpochFromFullTransactionId(fxid);
TransactionId xid = XidFromFullTransactionId(fxid);
uint32 now_epoch;
TransactionId now_epoch_next_xid;
FullTransactionId now_fullxid;
now_fullxid = ReadNextFullTransactionId();
now_epoch_next_xid = XidFromFullTransactionId(now_fullxid);
now_epoch = EpochFromFullTransactionId(now_fullxid);
if (extracted_xid != NULL)
*extracted_xid = xid;
if (!TransactionIdIsValid(xid))
return false;
/* For non-normal transaction IDs, we can ignore the epoch. */
if (!TransactionIdIsNormal(xid))
return true;
/* If the transaction ID is in the future, throw an error. */
if (!FullTransactionIdPrecedes(fxid, now_fullxid))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("transaction ID %llu is in the future",
(unsigned long long) U64FromFullTransactionId(fxid))));
/*
* ShmemVariableCache->oldestClogXid is protected by XactTruncationLock,
* but we don't acquire that lock here. Instead, we require the caller to
* acquire it, because the caller is presumably going to look up the
* returned XID. If we took and released the lock within this function, a
* CLOG truncation could occur before the caller finished with the XID.
*/
Assert(LWLockHeldByMe(XactTruncationLock));
/*
* If the transaction ID has wrapped around, it's definitely too old to
* determine the commit status. Otherwise, we can compare it to
* ShmemVariableCache->oldestClogXid to determine whether the relevant
* CLOG entry is guaranteed to still exist.
*/
if (xid_epoch + 1 < now_epoch
|| (xid_epoch + 1 == now_epoch && xid < now_epoch_next_xid)
|| TransactionIdPrecedes(xid, ShmemVariableCache->oldestClogXid))
return false;
return true;
}
/*
* Convert a TransactionId obtained from a snapshot held by the caller to a
* FullTransactionId. Use next_fxid as a reference FullTransactionId, so that
* we can compute the high order bits. It must have been obtained by the
* caller with ReadNextFullTransactionId() after the snapshot was created.
*/
static FullTransactionId
widen_snapshot_xid(TransactionId xid, FullTransactionId next_fxid)
{
TransactionId next_xid = XidFromFullTransactionId(next_fxid);
uint32 epoch = EpochFromFullTransactionId(next_fxid);
/* Special transaction ID. */
if (!TransactionIdIsNormal(xid))
return FullTransactionIdFromEpochAndXid(0, xid);
/*
* The 64 bit result must be <= next_fxid, since next_fxid hadn't been
* issued yet when the snapshot was created. Every TransactionId in the
* snapshot must therefore be from the same epoch as next_fxid, or the
* epoch before. We know this because next_fxid is never allow to get
* more than one epoch ahead of the TransactionIds in any snapshot.
*/
if (xid > next_xid)
epoch--;
return FullTransactionIdFromEpochAndXid(epoch, xid);
}
/*
* txid comparator for qsort/bsearch
*/
static int
cmp_fxid(const void *aa, const void *bb)
{
FullTransactionId a = *(const FullTransactionId *) aa;
FullTransactionId b = *(const FullTransactionId *) bb;
if (FullTransactionIdPrecedes(a, b))
return -1;
if (FullTransactionIdPrecedes(b, a))
return 1;
return 0;
}
/*
* Sort a snapshot's txids, so we can use bsearch() later. Also remove
* any duplicates.
*
* For consistency of on-disk representation, we always sort even if bsearch
* will not be used.
*/
static void
sort_snapshot(pg_snapshot *snap)
{
if (snap->nxip > 1)
{
qsort(snap->xip, snap->nxip, sizeof(FullTransactionId), cmp_fxid);
snap->nxip = qunique(snap->xip, snap->nxip, sizeof(FullTransactionId),
cmp_fxid);
}
}
/*
* check fxid visibility.
*/
static bool
is_visible_fxid(FullTransactionId value, const pg_snapshot *snap)
{
if (FullTransactionIdPrecedes(value, snap->xmin))
return true;
else if (!FullTransactionIdPrecedes(value, snap->xmax))
return false;
#ifdef USE_BSEARCH_IF_NXIP_GREATER
else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
{
void *res;
res = bsearch(&value, snap->xip, snap->nxip, sizeof(FullTransactionId),
cmp_fxid);
/* if found, transaction is still in progress */
return (res) ? false : true;
}
#endif
else
{
uint32 i;
for (i = 0; i < snap->nxip; i++)
{
if (FullTransactionIdEquals(value, snap->xip[i]))
return false;
}
return true;
}
}
/*
* helper functions to use StringInfo for pg_snapshot creation.
*/
static StringInfo
buf_init(FullTransactionId xmin, FullTransactionId xmax)
{
pg_snapshot snap;
StringInfo buf;
snap.xmin = xmin;
snap.xmax = xmax;
snap.nxip = 0;
buf = makeStringInfo();
appendBinaryStringInfo(buf, &snap, PG_SNAPSHOT_SIZE(0));
return buf;
}
static void
buf_add_txid(StringInfo buf, FullTransactionId fxid)
{
pg_snapshot *snap = (pg_snapshot *) buf->data;
/* do this before possible realloc */
snap->nxip++;
appendBinaryStringInfo(buf, &fxid, sizeof(fxid));
}
static pg_snapshot *
buf_finalize(StringInfo buf)
{
pg_snapshot *snap = (pg_snapshot *) buf->data;
SET_VARSIZE(snap, buf->len);
/* buf is not needed anymore */
buf->data = NULL;
pfree(buf);
return snap;
}
/*
* parse snapshot from cstring
*/
static pg_snapshot *
parse_snapshot(const char *str, Node *escontext)
{
FullTransactionId xmin;
FullTransactionId xmax;
FullTransactionId last_val = InvalidFullTransactionId;
FullTransactionId val;
const char *str_start = str;
char *endp;
StringInfo buf;
xmin = FullTransactionIdFromU64(strtou64(str, &endp, 10));
if (*endp != ':')
goto bad_format;
str = endp + 1;
xmax = FullTransactionIdFromU64(strtou64(str, &endp, 10));
if (*endp != ':')
goto bad_format;
str = endp + 1;
/* it should look sane */
if (!FullTransactionIdIsValid(xmin) ||
!FullTransactionIdIsValid(xmax) ||
FullTransactionIdPrecedes(xmax, xmin))
goto bad_format;
/* allocate buffer */
buf = buf_init(xmin, xmax);
/* loop over values */
while (*str != '\0')
{
/* read next value */
val = FullTransactionIdFromU64(strtou64(str, &endp, 10));
str = endp;
/* require the input to be in order */
if (FullTransactionIdPrecedes(val, xmin) ||
FullTransactionIdFollowsOrEquals(val, xmax) ||
FullTransactionIdPrecedes(val, last_val))
goto bad_format;
/* skip duplicates */
if (!FullTransactionIdEquals(val, last_val))
buf_add_txid(buf, val);
last_val = val;
if (*str == ',')
str++;
else if (*str != '\0')
goto bad_format;
}
return buf_finalize(buf);
bad_format:
ereturn(escontext, NULL,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"pg_snapshot", str_start)));
}
/*
* pg_current_xact_id() returns xid8
*
* Return the current toplevel full transaction ID.
* If the current transaction does not have one, one is assigned.
*/
Datum
pg_current_xact_id(PG_FUNCTION_ARGS)
{
/*
* Must prevent during recovery because if an xid is not assigned we try
* to assign one, which would fail. Programs already rely on this function
* to always return a valid current xid, so we should not change this to
* return NULL or similar invalid xid.
*/
PreventCommandDuringRecovery("pg_current_xact_id()");
PG_RETURN_FULLTRANSACTIONID(GetTopFullTransactionId());
}
/*
* Same as pg_current_xact_id() but doesn't assign a new xid if there
* isn't one yet.
*/
Datum
pg_current_xact_id_if_assigned(PG_FUNCTION_ARGS)
{
FullTransactionId topfxid = GetTopFullTransactionIdIfAny();
if (!FullTransactionIdIsValid(topfxid))
PG_RETURN_NULL();
PG_RETURN_FULLTRANSACTIONID(topfxid);
}
/*
* pg_current_snapshot() returns pg_snapshot
*
* Return current snapshot
*
* Note that only top-transaction XIDs are included in the snapshot.
*/
Datum
pg_current_snapshot(PG_FUNCTION_ARGS)
{
pg_snapshot *snap;
uint32 nxip,
i;
Snapshot cur;
FullTransactionId next_fxid = ReadNextFullTransactionId();
cur = GetActiveSnapshot();
if (cur == NULL)
elog(ERROR, "no active snapshot set");
/* allocate */
nxip = cur->xcnt;
snap = palloc(PG_SNAPSHOT_SIZE(nxip));
/* fill */
snap->xmin = widen_snapshot_xid(cur->xmin, next_fxid);
snap->xmax = widen_snapshot_xid(cur->xmax, next_fxid);
snap->nxip = nxip;
for (i = 0; i < nxip; i++)
snap->xip[i] = widen_snapshot_xid(cur->xip[i], next_fxid);
/*
* We want them guaranteed to be in ascending order. This also removes
* any duplicate xids. Normally, an XID can only be assigned to one
* backend, but when preparing a transaction for two-phase commit, there
* is a transient state when both the original backend and the dummy
* PGPROC entry reserved for the prepared transaction hold the same XID.
*/
sort_snapshot(snap);
/* set size after sorting, because it may have removed duplicate xips */
SET_VARSIZE(snap, PG_SNAPSHOT_SIZE(snap->nxip));
PG_RETURN_POINTER(snap);
}
/*
* pg_snapshot_in(cstring) returns pg_snapshot
*
* input function for type pg_snapshot
*/
Datum
pg_snapshot_in(PG_FUNCTION_ARGS)
{
char *str = PG_GETARG_CSTRING(0);
pg_snapshot *snap;
snap = parse_snapshot(str, fcinfo->context);
PG_RETURN_POINTER(snap);
}
/*
* pg_snapshot_out(pg_snapshot) returns cstring
*
* output function for type pg_snapshot
*/
Datum
pg_snapshot_out(PG_FUNCTION_ARGS)
{
pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
StringInfoData str;
uint32 i;
initStringInfo(&str);
appendStringInfo(&str, UINT64_FORMAT ":",
U64FromFullTransactionId(snap->xmin));
appendStringInfo(&str, UINT64_FORMAT ":",
U64FromFullTransactionId(snap->xmax));
for (i = 0; i < snap->nxip; i++)
{
if (i > 0)
appendStringInfoChar(&str, ',');
appendStringInfo(&str, UINT64_FORMAT,
U64FromFullTransactionId(snap->xip[i]));
}
PG_RETURN_CSTRING(str.data);
}
/*
* pg_snapshot_recv(internal) returns pg_snapshot
*
* binary input function for type pg_snapshot
*
* format: int4 nxip, int8 xmin, int8 xmax, int8 xip
*/
Datum
pg_snapshot_recv(PG_FUNCTION_ARGS)
{
StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
pg_snapshot *snap;
FullTransactionId last = InvalidFullTransactionId;
int nxip;
int i;
FullTransactionId xmin;
FullTransactionId xmax;
/* load and validate nxip */
nxip = pq_getmsgint(buf, 4);
if (nxip < 0 || nxip > PG_SNAPSHOT_MAX_NXIP)
goto bad_format;
xmin = FullTransactionIdFromU64((uint64) pq_getmsgint64(buf));
xmax = FullTransactionIdFromU64((uint64) pq_getmsgint64(buf));
if (!FullTransactionIdIsValid(xmin) ||
!FullTransactionIdIsValid(xmax) ||
FullTransactionIdPrecedes(xmax, xmin))
goto bad_format;
snap = palloc(PG_SNAPSHOT_SIZE(nxip));
snap->xmin = xmin;
snap->xmax = xmax;
for (i = 0; i < nxip; i++)
{
FullTransactionId cur =
FullTransactionIdFromU64((uint64) pq_getmsgint64(buf));
if (FullTransactionIdPrecedes(cur, last) ||
FullTransactionIdPrecedes(cur, xmin) ||
FullTransactionIdPrecedes(xmax, cur))
goto bad_format;
/* skip duplicate xips */
if (FullTransactionIdEquals(cur, last))
{
i--;
nxip--;
continue;
}
snap->xip[i] = cur;
last = cur;
}
snap->nxip = nxip;
SET_VARSIZE(snap, PG_SNAPSHOT_SIZE(nxip));
PG_RETURN_POINTER(snap);
bad_format:
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("invalid external pg_snapshot data")));
PG_RETURN_POINTER(NULL); /* keep compiler quiet */
}
/*
* pg_snapshot_send(pg_snapshot) returns bytea
*
* binary output function for type pg_snapshot
*
* format: int4 nxip, u64 xmin, u64 xmax, u64 xip...
*/
Datum
pg_snapshot_send(PG_FUNCTION_ARGS)
{
pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
StringInfoData buf;
uint32 i;
pq_begintypsend(&buf);
pq_sendint32(&buf, snap->nxip);
pq_sendint64(&buf, (int64) U64FromFullTransactionId(snap->xmin));
pq_sendint64(&buf, (int64) U64FromFullTransactionId(snap->xmax));
for (i = 0; i < snap->nxip; i++)
pq_sendint64(&buf, (int64) U64FromFullTransactionId(snap->xip[i]));
PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
}
/*
* pg_visible_in_snapshot(xid8, pg_snapshot) returns bool
*
* is txid visible in snapshot ?
*/
Datum
pg_visible_in_snapshot(PG_FUNCTION_ARGS)
{
FullTransactionId value = PG_GETARG_FULLTRANSACTIONID(0);
pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(1);
PG_RETURN_BOOL(is_visible_fxid(value, snap));
}
/*
* pg_snapshot_xmin(pg_snapshot) returns xid8
*
* return snapshot's xmin
*/
Datum
pg_snapshot_xmin(PG_FUNCTION_ARGS)
{
pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
PG_RETURN_FULLTRANSACTIONID(snap->xmin);
}
/*
* pg_snapshot_xmax(pg_snapshot) returns xid8
*
* return snapshot's xmax
*/
Datum
pg_snapshot_xmax(PG_FUNCTION_ARGS)
{
pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
PG_RETURN_FULLTRANSACTIONID(snap->xmax);
}
/*
* pg_snapshot_xip(pg_snapshot) returns setof xid8
*
* return in-progress xid8s in snapshot.
*/
Datum
pg_snapshot_xip(PG_FUNCTION_ARGS)
{
FuncCallContext *fctx;
pg_snapshot *snap;
FullTransactionId value;
/* on first call initialize fctx and get copy of snapshot */
if (SRF_IS_FIRSTCALL())
{
pg_snapshot *arg = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
fctx = SRF_FIRSTCALL_INIT();
/* make a copy of user snapshot */
snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
memcpy(snap, arg, VARSIZE(arg));
fctx->user_fctx = snap;
}
/* return values one-by-one */
fctx = SRF_PERCALL_SETUP();
snap = fctx->user_fctx;
if (fctx->call_cntr < snap->nxip)
{
value = snap->xip[fctx->call_cntr];
SRF_RETURN_NEXT(fctx, FullTransactionIdGetDatum(value));
}
else
{
SRF_RETURN_DONE(fctx);
}
}
/*
* Report the status of a recent transaction ID, or null for wrapped,
* truncated away or otherwise too old XIDs.
*
* The passed epoch-qualified xid is treated as a normal xid, not a
* multixact id.
*
* If it points to a committed subxact the result is the subxact status even
* though the parent xact may still be in progress or may have aborted.
*/
Datum
pg_xact_status(PG_FUNCTION_ARGS)
{
const char *status;
FullTransactionId fxid = PG_GETARG_FULLTRANSACTIONID(0);
TransactionId xid;
/*
* We must protect against concurrent truncation of clog entries to avoid
* an I/O error on SLRU lookup.
*/
LWLockAcquire(XactTruncationLock, LW_SHARED);
if (TransactionIdInRecentPast(fxid, &xid))
{
Assert(TransactionIdIsValid(xid));
/*
* Like when doing visiblity checks on a row, check whether the
* transaction is still in progress before looking into the CLOG.
* Otherwise we would incorrectly return "committed" for a transaction
* that is committing and has already updated the CLOG, but hasn't
* removed its XID from the proc array yet. (See comment on that race
* condition at the top of heapam_visibility.c)
*/
if (TransactionIdIsInProgress(xid))
status = "in progress";
else if (TransactionIdDidCommit(xid))
status = "committed";
else
{
/* it must have aborted or crashed */
status = "aborted";
}
}
else
{
status = NULL;
}
LWLockRelease(XactTruncationLock);
if (status == NULL)
PG_RETURN_NULL();
else
PG_RETURN_TEXT_P(cstring_to_text(status));
}