Bruce Momjian 92288a1cf9 Change made to elog:
o  Change all current CVS messages of NOTICE to WARNING.  We were going
to do this just before 7.3 beta but it has to be done now, as you will
see below.

o Change current INFO messages that should be controlled by
client_min_messages to NOTICE.

o Force remaining INFO messages, like from EXPLAIN, VACUUM VERBOSE, etc.
to always go to the client.

o Remove INFO from the client_min_messages options and add NOTICE.

Seems we do need three non-ERROR elog levels to handle the various
behaviors we need for these messages.

Regression passed.
2002-03-06 06:10:59 +00:00

935 lines
23 KiB
C

/*-------------------------------------------------------------------------
*
* sequence.c
* PostgreSQL sequences support code.
*
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.70 2002/03/06 06:09:35 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <ctype.h>
#include "access/heapam.h"
#include "commands/creatinh.h"
#include "commands/sequence.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/int8.h"
#ifdef MULTIBYTE
#include "mb/pg_wchar.h"
#endif
/*
 * Magic number stored in the special space of a sequence's page.
 * It is written when the page is initialized and verified each time
 * the sequence tuple is read.
 */
#define SEQ_MAGIC 0x1717
/*
 * Largest value a sequence may take.  Normally this is the maximum signed
 * 64-bit value; the LL suffix is used only when the compiler supports it.
 * When INT64_IS_BUSTED is defined the limit falls back to the maximum
 * signed 32-bit value.
 */
#ifndef INT64_IS_BUSTED
#ifdef HAVE_LL_CONSTANTS
#define SEQ_MAXVALUE ((int64) 0x7FFFFFFFFFFFFFFFLL)
#else
#define SEQ_MAXVALUE ((int64) 0x7FFFFFFFFFFFFFFF)
#endif
#else /* INT64_IS_BUSTED */
#define SEQ_MAXVALUE ((int64) 0x7FFFFFFF)
#endif /* INT64_IS_BUSTED */
/*
 * Smallest value a sequence may take.  Note this is the negation of
 * SEQ_MAXVALUE, i.e. one greater than the most negative representable
 * integer of that width.
 */
#define SEQ_MINVALUE (-SEQ_MAXVALUE)
/*
 * We don't want to log each fetching of a value from a sequence,
 * so we pre-log a few fetches in advance.  In the event of a
 * crash we can lose as much as we pre-logged.
 *
 * SEQ_LOG_VALS is the number of extra fetches pre-logged each time
 * nextval() has to write a WAL record.
 */
#define SEQ_LOG_VALS 32
/*
 * Contents of the special space of a sequence page: just the magic
 * number (SEQ_MAGIC) used to recognize a properly initialized page.
 */
typedef struct sequence_magic
{
uint32 magic;
} sequence_magic;
/*
 * Per-backend cache entry for a sequence that has been referenced in this
 * session.  Entries are chained in a singly linked list headed by seqtab
 * and are looked up by name.
 */
typedef struct SeqTableData
{
char *name; /* sequence name; lookup key for the list */
Oid relid; /* relation OID, used to detect re-creation */
Relation rel; /* NULL if rel is not open in cur xact */
int64 cached; /* last value fetched (pre-allocated) for this backend */
int64 last; /* last value returned to the caller */
int64 increment; /* copy of increment_by; 0 until read_info() has run */
struct SeqTableData *next; /* next entry in list, or NULL */
} SeqTableData;
typedef SeqTableData *SeqTable;
/* Head of the list of sequences known to this backend (NULL if none yet) */
static SeqTable seqtab = NULL;
/* Local routines; see the comments at each definition */
static char *get_seq_name(text *seqin);
static SeqTable init_sequence(char *caller, char *name);
static Form_pg_sequence read_info(char *caller, SeqTable elm, Buffer *buf);
static void init_params(CreateSeqStmt *seq, Form_pg_sequence new);
static int64 get_param(DefElem *def);
static void do_setval(char *seqname, int64 next, bool iscalled);
/*
 * DefineSequence
 *		Creates a new sequence relation.
 *
 * Steps performed:
 *	1. Validate the statement's options and compute the initial parameter
 *	   values with init_params().
 *	2. Build a CreateStmt with one column per SEQ_COL_* attribute and
 *	   create the relation via DefineRelation(..., RELKIND_SEQUENCE).
 *	3. Allocate the first page, initialize it with room for a
 *	   sequence_magic in the special space, and store SEQ_MAGIC there.
 *	4. Form and insert the single tuple that holds the sequence state.
 *	5. Freeze the tuple's xmin and emit an XLOG_SEQ_LOG record (see the
 *	   "Two special hacks" comment below for why).
 *
 * seq: the CREATE SEQUENCE parse node; seqname, istemp and options are used.
 */
void
DefineSequence(CreateSeqStmt *seq)
{
FormData_pg_sequence new;
CreateStmt *stmt = makeNode(CreateStmt);
ColumnDef *coldef;
TypeName *typnam;
Relation rel;
Buffer buf;
PageHeader page;
sequence_magic *sm;
HeapTuple tuple;
TupleDesc tupDesc;
/* value[]/null[] are indexed by (column number - 1) */
Datum value[SEQ_COL_LASTCOL];
char null[SEQ_COL_LASTCOL];
int i;
NameData name;
/* Check and set values */
init_params(seq, &new);
/*
 * Create relation (and fill *null & *value)
 *
 * Each iteration builds the column definition for attribute i and, at
 * the same time, the Datum that will go into the initial tuple.
 */
stmt->tableElts = NIL;
for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
{
typnam = makeNode(TypeName);
typnam->setof = FALSE;
typnam->arrayBounds = NULL;
typnam->typmod = -1;
coldef = makeNode(ColumnDef);
coldef->typename = typnam;
coldef->raw_default = NULL;
coldef->cooked_default = NULL;
coldef->is_not_null = false;
/* NOTE(review): ' ' presumably marks "not null" for heap_formtuple -- confirm */
null[i - 1] = ' ';
switch (i)
{
case SEQ_COL_NAME:
typnam->name = "name";
coldef->colname = "sequence_name";
namestrcpy(&name, seq->seqname);
value[i - 1] = NameGetDatum(&name);
break;
case SEQ_COL_LASTVAL:
typnam->name = "int8";
coldef->colname = "last_value";
value[i - 1] = Int64GetDatumFast(new.last_value);
break;
case SEQ_COL_INCBY:
typnam->name = "int8";
coldef->colname = "increment_by";
value[i - 1] = Int64GetDatumFast(new.increment_by);
break;
case SEQ_COL_MAXVALUE:
typnam->name = "int8";
coldef->colname = "max_value";
value[i - 1] = Int64GetDatumFast(new.max_value);
break;
case SEQ_COL_MINVALUE:
typnam->name = "int8";
coldef->colname = "min_value";
value[i - 1] = Int64GetDatumFast(new.min_value);
break;
case SEQ_COL_CACHE:
typnam->name = "int8";
coldef->colname = "cache_value";
value[i - 1] = Int64GetDatumFast(new.cache_value);
break;
case SEQ_COL_LOG:
/* on-disk log_cnt starts at 1; the WAL copy below uses 0 instead */
typnam->name = "int8";
coldef->colname = "log_cnt";
value[i - 1] = Int64GetDatum((int64) 1);
break;
case SEQ_COL_CYCLE:
typnam->name = "bool";
coldef->colname = "is_cycled";
value[i - 1] = BoolGetDatum(new.is_cycled);
break;
case SEQ_COL_CALLED:
/* on-disk is_called starts false; the WAL copy below uses true instead */
typnam->name = "bool";
coldef->colname = "is_called";
value[i - 1] = BoolGetDatum(false);
break;
}
stmt->tableElts = lappend(stmt->tableElts, coldef);
}
/* Fill in the rest of the CreateStmt and create the relation */
stmt->relname = seq->seqname;
stmt->inhRelnames = NIL;
stmt->constraints = NIL;
stmt->istemp = seq->istemp;
stmt->hasoids = false;
DefineRelation(stmt, RELKIND_SEQUENCE);
/* Re-open the new relation by name, taking an exclusive lock */
rel = heap_openr(seq->seqname, AccessExclusiveLock);
tupDesc = RelationGetDescr(rel);
/* Initialize first page of relation with special magic number */
buf = ReadBuffer(rel, P_NEW);
if (!BufferIsValid(buf))
elog(ERROR, "DefineSequence: ReadBuffer failed");
Assert(BufferGetBlockNumber(buf) == 0);
page = (PageHeader) BufferGetPage(buf);
PageInit((Page) page, BufferGetPageSize(buf), sizeof(sequence_magic));
sm = (sequence_magic *) PageGetSpecialPointer(page);
sm->magic = SEQ_MAGIC;
/* hack: ensure heap_insert will insert on the just-created page */
rel->rd_targblock = 0;
/* Now form & insert sequence tuple */
tuple = heap_formtuple(tupDesc, value, null);
heap_insert(rel, tuple);
/* The sequence tuple must be the first item on the page */
Assert(ItemPointerGetOffsetNumber(&(tuple->t_self)) == FirstOffsetNumber);
/*
 * Two special hacks here:
 *
 * 1. Since VACUUM does not process sequences, we have to force the tuple
 * to have xmin = FrozenTransactionId now. Otherwise it would become
 * invisible to SELECTs after 2G transactions. It is okay to do this
 * because if the current transaction aborts, no other xact will ever
 * examine the sequence tuple anyway.
 *
 * 2. Even though heap_insert emitted a WAL log record, we have to emit
 * an XLOG_SEQ_LOG record too, since (a) the heap_insert record will
 * not have the right xmin, and (b) REDO of the heap_insert record
 * would re-init page and sequence magic number would be lost. This
 * means two log records instead of one :-(
 */
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
START_CRIT_SECTION();
{
/*
 * Note that the "tuple" structure is still just a local tuple record
 * created by heap_formtuple; its t_data pointer doesn't point at the
 * disk buffer. To scribble on the disk buffer we need to fetch the
 * item pointer. But do the same to the local tuple, since that will
 * be the source for the WAL log record, below.
 */
ItemId itemId;
Item item;
itemId = PageGetItemId((Page) page, FirstOffsetNumber);
item = PageGetItem((Page) page, itemId);
((HeapTupleHeader) item)->t_xmin = FrozenTransactionId;
((HeapTupleHeader) item)->t_infomask |= HEAP_XMIN_COMMITTED;
tuple->t_data->t_xmin = FrozenTransactionId;
tuple->t_data->t_infomask |= HEAP_XMIN_COMMITTED;
}
{
/*
 * Build the XLOG_SEQ_LOG record: a fixed xl_seq_rec header identifying
 * the relation, followed by the (locally modified) tuple image.
 */
xl_seq_rec xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
Form_pg_sequence newseq = (Form_pg_sequence) GETSTRUCT(tuple);
/* We do not log first nextval call, so "advance" sequence here */
/* Note we are scribbling on local tuple, not the disk buffer */
newseq->is_called = true;
newseq->log_cnt = 0;
xlrec.node = rel->rd_node;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_seq_rec);
rdata[0].next = &(rdata[1]);
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) tuple->t_data;
rdata[1].len = tuple->t_len;
rdata[1].next = NULL;
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);
/* Stamp the page with the position of the record just written */
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
}
END_CRIT_SECTION();
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
WriteBuffer(buf);
/* Close the relation but keep the lock (NoLock = do not release) */
heap_close(rel, NoLock);
}
/*
 * nextval
 *		SQL-callable function: advance the named sequence and return the
 *		new value.
 *
 * Argument 0 is the sequence name as text.  The caller needs ACL_UPDATE
 * permission on the sequence.
 *
 * If this backend still holds values fetched by an earlier call
 * (elm->last != elm->cached), the next one is handed out without touching
 * the sequence's page.  Otherwise the sequence tuple is read and locked,
 * up to cache_value values are fetched for this backend, and -- when the
 * number of pre-logged fetches (log_cnt) is insufficient -- additional
 * SEQ_LOG_VALS fetches are pre-logged in a single XLOG_SEQ_LOG record.
 *
 * Raises ERROR if the sequence has reached its MAXVALUE/MINVALUE and is
 * not a cycling sequence.
 *
 * Local variables of note:
 *	log		remaining pre-logged fetches (starts from seq->log_cnt)
 *	fetch	number of values still to advance through in this call
 *	next	running value while advancing
 *	last	last value fetched into this backend's cache
 *	result	value returned to the caller
 *	rescnt	number of values obtained for the cache so far
 *	logit	true if a WAL record must be written
 */
Datum
nextval(PG_FUNCTION_ARGS)
{
text *seqin = PG_GETARG_TEXT_P(0);
char *seqname = get_seq_name(seqin);
SeqTable elm;
Buffer buf;
Form_pg_sequence seq;
int64 incby,
maxv,
minv,
cache,
log,
fetch,
last;
int64 result,
next,
rescnt = 0;
bool logit = false;
if (pg_aclcheck(seqname, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s",
seqname, seqname);
/* open and AccessShareLock sequence */
elm = init_sequence("nextval", seqname);
/* from here on the name is taken from elm->name */
pfree(seqname);
if (elm->last != elm->cached) /* some numbers were cached */
{
/* fast path: hand out the next cached value without reading the page */
elm->last += elm->increment;
PG_RETURN_INT64(elm->last);
}
seq = read_info("nextval", elm, &buf); /* lock page's buffer and read tuple */
last = next = result = seq->last_value;
incby = seq->increment_by;
maxv = seq->max_value;
minv = seq->min_value;
fetch = cache = seq->cache_value;
log = seq->log_cnt;
if (!seq->is_called)
{
/* the stored last_value has never been returned: it is the first result */
rescnt++; /* last_value if not called */
fetch--;
log--;
}
if (log < fetch)
{
/*
 * Fewer fetches are pre-logged than we need: fetch the shortfall plus
 * SEQ_LOG_VALS extra values and write a WAL record covering them.
 */
fetch = log = fetch - log + SEQ_LOG_VALS;
logit = true;
}
while (fetch) /* try to fetch cache [+ log ] numbers */
{
/*
 * Check MAXVALUE for ascending sequences and MINVALUE for
 * descending sequences.  The comparisons are arranged so the bound
 * test does not itself compute a value beyond the bound.
 */
if (incby > 0)
{
/* ascending sequence */
if ((maxv >= 0 && next > maxv - incby) ||
(maxv < 0 && next + incby > maxv))
{
/* limit hit: return what we have, else error out or wrap around */
if (rescnt > 0)
break; /* stop fetching */
if (!seq->is_cycled)
elog(ERROR, "%s.nextval: reached MAXVALUE (" INT64_FORMAT ")",
elm->name, maxv);
next = minv;
}
else
next += incby;
}
else
{
/* descending sequence */
if ((minv < 0 && next < minv - incby) ||
(minv >= 0 && next + incby < minv))
{
/* limit hit: return what we have, else error out or wrap around */
if (rescnt > 0)
break; /* stop fetching */
if (!seq->is_cycled)
elog(ERROR, "%s.nextval: reached MINVALUE (" INT64_FORMAT ")",
elm->name, minv);
next = maxv;
}
else
next += incby;
}
fetch--;
/*
 * Only the first "cache" values become results for this backend; any
 * further iterations just advance "next" for pre-logging purposes.
 */
if (rescnt < cache)
{
log--;
rescnt++;
last = next;
if (rescnt == 1) /* if it's first result - */
result = next; /* it's what to return */
}
}
/* save info in local cache */
elm->last = result; /* last returned number */
elm->cached = last; /* last fetched number */
START_CRIT_SECTION();
if (logit)
{
xl_seq_rec xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
Page page = BufferGetPage(buf);
xlrec.node = elm->rel->rd_node;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_seq_rec);
rdata[0].next = &(rdata[1]);
/*
 * Temporarily set the tuple to the furthest value advanced to ("next"),
 * with is_called = true and log_cnt = 0; that is the image placed in
 * the WAL record.  The real on-disk values are stored below.
 */
seq->last_value = next;
seq->is_called = true;
seq->log_cnt = 0;
/* log the page contents between pd_upper and pd_special */
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
rdata[1].len = ((PageHeader) page)->pd_special -
((PageHeader) page)->pd_upper;
rdata[1].next = NULL;
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
if (fetch) /* not all numbers were fetched */
log -= fetch;
}
/* update on-disk data */
seq->last_value = last; /* last fetched number */
seq->is_called = true;
Assert(log >= 0);
seq->log_cnt = log; /* how much is logged */
END_CRIT_SECTION();
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
if (WriteBuffer(buf) == STATUS_ERROR)
elog(ERROR, "%s.nextval: WriteBuffer failed", elm->name);
PG_RETURN_INT64(result);
}
/*
 * currval
 *      SQL-callable function: return the value most recently handed out
 *      for the named sequence by this backend.
 *
 * Argument 0 is the sequence name as text.  The caller needs ACL_SELECT
 * permission.  Raises ERROR if no value has been established for the
 * sequence in this session (the cached increment is still zero).
 */
Datum
currval(PG_FUNCTION_ARGS)
{
    char       *seqname = get_seq_name(PG_GETARG_TEXT_P(0));
    SeqTable    entry;
    int64       current;

    if (ACLCHECK_OK != pg_aclcheck(seqname, GetUserId(), ACL_SELECT))
    {
        elog(ERROR, "%s.currval: you don't have permissions to read sequence %s",
             seqname, seqname);
    }

    /* Open the sequence (AccessShareLock) and find its session entry */
    entry = init_sequence("currval", seqname);

    /* increment stays 0 until nextval/read_info has run for this entry */
    if (entry->increment == 0)
    {
        elog(ERROR, "%s.currval is not yet defined in this session",
             seqname);
    }

    current = entry->last;

    pfree(seqname);

    PG_RETURN_INT64(current);
}
/*
 * Main internal procedure that handles 2 & 3 arg forms of SETVAL.
 *
 * Note that the 3 arg version (which sets the is_called flag) is
 * only for use in pg_dump, and setting the is_called flag may not
 * work if multiple users are attached to the database and referencing
 * the sequence (unlikely if pg_dump is restoring it).
 *
 * It is necessary to have the 3 arg version so that pg_dump can
 * restore the state of a sequence exactly during data-only restores -
 * it is the only way to clear the is_called flag in an existing
 * sequence.
 *
 * seqname:  sequence name as produced by get_seq_name(); it is pfree'd
 *           here on the normal (non-error) exit path.
 * next:     new last_value; must lie within [min_value, max_value] or
 *           ERROR is raised.
 * iscalled: value to store in the tuple's is_called flag.
 *
 * The caller needs ACL_UPDATE permission on the sequence.  Any values this
 * backend had cached are forgotten (elm->last and elm->cached both become
 * "next").  An XLOG_SEQ_LOG record is always written.
 */
static void
do_setval(char *seqname, int64 next, bool iscalled)
{
SeqTable elm;
Buffer buf;
Form_pg_sequence seq;
if (pg_aclcheck(seqname, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
elog(ERROR, "%s.setval: you don't have permissions to set sequence %s",
seqname, seqname);
/* open and AccessShareLock sequence */
elm = init_sequence("setval", seqname);
seq = read_info("setval", elm, &buf); /* lock page's buffer and read tuple */
if ((next < seq->min_value) || (next > seq->max_value))
elog(ERROR, "%s.setval: value " INT64_FORMAT " is out of bounds (" INT64_FORMAT "," INT64_FORMAT ")",
seqname, next, seq->min_value, seq->max_value);
/* save info in local cache */
elm->last = next; /* last returned number */
elm->cached = next; /* last cached number (forget cached values) */
START_CRIT_SECTION();
{
xl_seq_rec xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
Page page = BufferGetPage(buf);
xlrec.node = elm->rel->rd_node;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_seq_rec);
rdata[0].next = &(rdata[1]);
/*
 * The image written to WAL always has is_called = true and
 * log_cnt = 0, regardless of "iscalled"; the values actually kept
 * on the page are stored after the record is inserted.
 */
seq->last_value = next;
seq->is_called = true;
seq->log_cnt = 0;
/* log the page contents between pd_upper and pd_special */
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
rdata[1].len = ((PageHeader) page)->pd_special -
((PageHeader) page)->pd_upper;
rdata[1].next = NULL;
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
}
/* save info in sequence relation */
seq->last_value = next; /* last fetched number */
seq->is_called = iscalled;
/* when is_called is cleared, log_cnt is set to 1, matching a new sequence */
seq->log_cnt = (iscalled) ? 0 : 1;
END_CRIT_SECTION();
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
if (WriteBuffer(buf) == STATUS_ERROR)
elog(ERROR, "%s.setval: WriteBuffer failed", seqname);
pfree(seqname);
}
/*
 * setval (2-argument form)
 *      SQL-callable: set the named sequence to the given value, with the
 *      is_called flag set to true.  Returns the value that was set.
 *
 * All the real work, including permission and bounds checks, is done by
 * do_setval(), which also frees the extracted name.
 */
Datum
setval(PG_FUNCTION_ARGS)
{
    text       *rawarg = PG_GETARG_TEXT_P(0);
    int64       newval = PG_GETARG_INT64(1);

    do_setval(get_seq_name(rawarg), newval, true);

    PG_RETURN_INT64(newval);
}
/*
 * setval (3-argument form)
 *      SQL-callable: set the named sequence to the given value and store
 *      the caller-supplied is_called flag.  Returns the value that was set.
 *
 * All the real work, including permission and bounds checks, is done by
 * do_setval(), which also frees the extracted name.
 */
Datum
setval_and_iscalled(PG_FUNCTION_ARGS)
{
    text       *rawarg = PG_GETARG_TEXT_P(0);
    int64       newval = PG_GETARG_INT64(1);
    bool        called = PG_GETARG_BOOL(2);

    do_setval(get_seq_name(rawarg), newval, called);

    PG_RETURN_INT64(newval);
}
/*
 * get_seq_name
 *      Given a 'text' parameter to a sequence function, extract the actual
 *      sequence name.
 *
 * If the input is wrapped in double quotes, the quotes are stripped and
 * the case is preserved; otherwise the name is folded to lower case.  In
 * both cases the name is truncated if it does not fit in NAMEDATALEN - 1
 * bytes (on a multibyte character boundary when MULTIBYTE is defined).
 *
 * Returns a freshly allocated NUL-terminated string; callers release it
 * with pfree().
 *
 * This is a kluge, really --- should be able to write nextval(seqrel).
 */
static char *
get_seq_name(text *seqin)
{
    char       *rawname = DatumGetCString(DirectFunctionCall1(textout,
                                                    PointerGetDatum(seqin)));
    int         rawlen = strlen(rawname);
    char       *seqname;
    int         seqlen;

    if (rawlen >= 2 &&
        rawname[0] == '\"' && rawname[rawlen - 1] == '\"')
    {
        /* strip off quotes, keep case */
        rawname[rawlen - 1] = '\0';
        seqname = pstrdup(rawname + 1);
        pfree(rawname);
    }
    else
    {
        char       *cp;

        seqname = rawname;

        /*
         * It's important that this match the identifier downcasing code
         * used by backend/parser/scan.l.
         */
        for (cp = seqname; *cp; cp++)
        {
            if (isupper((unsigned char) *cp))
                *cp = tolower((unsigned char) *cp);
        }
    }

    /*
     * Truncate name if it's overlength; again, should match scan.l.
     *
     * Measure the final name rather than reusing rawlen: after quote
     * stripping the name is two bytes shorter than the raw input, so
     * rawlen would overstate the buffer length handed to pg_mbcliplen.
     */
    seqlen = strlen(seqname);
    if (seqlen >= NAMEDATALEN)
    {
#ifdef MULTIBYTE
        int         len;

        len = pg_mbcliplen(seqname, seqlen, NAMEDATALEN - 1);
        seqname[len] = '\0';
#else
        seqname[NAMEDATALEN - 1] = '\0';
#endif
    }

    return seqname;
}
/*
 * read_info
 *      Read block 0 of the sequence, lock its buffer exclusively, verify
 *      the page's magic number, and return a pointer to the sequence data
 *      inside the buffer.
 *
 * caller: name of the calling SQL function, used only in error messages.
 * elm:    session entry for the sequence; elm->rel must be open.  As a side
 *         effect elm->increment is refreshed from the tuple.
 * buf:    output; receives the buffer, which is left exclusively locked.
 *
 * Raises ERROR if the relation has more than one block, the buffer cannot
 * be read, or the magic number does not match SEQ_MAGIC.
 */
static Form_pg_sequence
read_info(char *caller, SeqTable elm, Buffer *buf)
{
    PageHeader  seqpage;
    sequence_magic *special;
    ItemId      itemid;
    HeapTupleData seqtuple;
    Form_pg_sequence seqdata;

    /* A sequence relation is expected to consist of a single block */
    if (elm->rel->rd_nblocks > 1)
    {
        elog(ERROR, "%s.%s: invalid number of blocks in sequence",
             elm->name, caller);
    }

    *buf = ReadBuffer(elm->rel, 0);
    if (!BufferIsValid(*buf))
    {
        elog(ERROR, "%s.%s: ReadBuffer failed", elm->name, caller);
    }

    LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

    /* Validate the magic number kept in the page's special space */
    seqpage = (PageHeader) BufferGetPage(*buf);
    special = (sequence_magic *) PageGetSpecialPointer(seqpage);
    if (special->magic != SEQ_MAGIC)
    {
        elog(ERROR, "%s.%s: bad magic (%08X)", elm->name, caller, special->magic);
    }

    /* The sequence tuple is the first item on the page */
    itemid = PageGetItemId(seqpage, FirstOffsetNumber);
    Assert(ItemIdIsUsed(itemid));
    seqtuple.t_data = (HeapTupleHeader) PageGetItem((Page) seqpage, itemid);

    seqdata = (Form_pg_sequence) GETSTRUCT(&seqtuple);

    /* Remember the increment so cached values can be handed out later */
    elm->increment = seqdata->increment_by;

    return seqdata;
}
/*
 * init_sequence
 *      Find or create this backend's SeqTable entry for the named sequence
 *      and make sure the relation is open (AccessShareLock) in the current
 *      transaction.
 *
 * caller: name of the calling SQL function, used only in messages.
 * name:   sequence name; it is copied if a new entry is created, so the
 *         caller keeps ownership of its string.
 *
 * Returns the entry, with entry->rel set to the open relation.
 *
 * Raises ERROR if the relation is not a sequence or memory is exhausted.
 * Emits a WARNING and resets the cached state if an entry left over from a
 * previous transaction now refers to a relation with a different OID.
 */
static SeqTable
init_sequence(char *caller, char *name)
{
    SeqTable    elm,
                prev = (SeqTable) NULL;
    Relation    seqrel;

    /* Look to see if we already have a seqtable entry for name */
    for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
    {
        if (strcmp(elm->name, name) == 0)
            break;
        prev = elm;             /* ends up as the list tail if not found */
    }

    /* If so, and if it's already been opened in this xact, just return it */
    if (elm != (SeqTable) NULL && elm->rel != (Relation) NULL)
        return elm;

    /* Else open and check it */
    seqrel = heap_openr(name, AccessShareLock);
    if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
        elog(ERROR, "%s.%s: %s is not a sequence", name, caller, name);

    if (elm != (SeqTable) NULL)
    {
        /*
         * We are using a seqtable entry left over from a previous xact;
         * must check for relid change.
         */
        elm->rel = seqrel;
        if (RelationGetRelid(seqrel) != elm->relid)
        {
            elog(WARNING, "%s.%s: sequence was re-created",
                 name, caller);
            elm->relid = RelationGetRelid(seqrel);
            elm->cached = elm->last = elm->increment = 0;
        }
    }
    else
    {
        /*
         * Time to make a new seqtable entry.  These entries live as long
         * as the backend does, so we use plain malloc for them.
         */
        elm = (SeqTable) malloc(sizeof(SeqTableData));
        if (elm == NULL)
            elog(ERROR, "Memory exhausted in init_sequence");
        elm->name = strdup(name);
        if (elm->name == NULL)
        {
            /*
             * The entry is not linked into seqtab yet and malloc'd memory
             * is not reclaimed by error recovery, so release it here or it
             * would be leaked.
             */
            free(elm);
            elog(ERROR, "Memory exhausted in init_sequence");
        }
        elm->rel = seqrel;
        elm->relid = RelationGetRelid(seqrel);
        elm->cached = elm->last = elm->increment = 0;
        elm->next = (SeqTable) NULL;

        /* Append to the end of the list only once fully initialized */
        if (seqtab == (SeqTable) NULL)
            seqtab = elm;
        else
            prev->next = elm;
    }

    return elm;
}
/*
 * CloseSequences
 *      is called by xact mgr at commit/abort.
 *
 * Walks this backend's sequence list and closes every relation that was
 * opened in the current transaction, releasing its AccessShareLock.  The
 * list entries themselves are kept; only their rel pointers are cleared.
 */
void
CloseSequences(void)
{
    SeqTable    entry = seqtab;

    while (entry != (SeqTable) NULL)
    {
        Relation    openrel = entry->rel;

        if (openrel != (Relation) NULL) /* opened in current xact */
        {
            /* forget the pointer first, then close the relation */
            entry->rel = (Relation) NULL;
            heap_close(openrel, AccessShareLock);
        }

        entry = entry->next;
    }
}
/*
 * init_params
 *      Translate the option list of a CREATE SEQUENCE statement into the
 *      fields of *new, applying defaults and validating the result.
 *
 * seq: the statement; only seq->options is examined.
 * new: output; is_cycled, increment_by, max_value, min_value, last_value
 *      and cache_value are filled in.
 *
 * Defaults: INCREMENT 1; MAXVALUE SEQ_MAXVALUE (ascending) or -1
 * (descending); MINVALUE 1 (ascending) or SEQ_MINVALUE (descending);
 * START at MINVALUE (ascending) or MAXVALUE (descending); CACHE 1.
 *
 * Raises ERROR for an unrecognized option, CYCLE with an argument, a zero
 * increment, MINVALUE >= MAXVALUE, a START value outside the bounds, or a
 * CACHE value <= 0.
 */
static void
init_params(CreateSeqStmt *seq, Form_pg_sequence new)
{
    DefElem    *startOpt = NULL;
    DefElem    *incrOpt = NULL;
    DefElem    *maxOpt = NULL;
    DefElem    *minOpt = NULL;
    DefElem    *cacheOpt = NULL;
    List       *cell;

    new->is_cycled = false;

    /* Pass 1: sort the options into their slots (the last one seen wins) */
    foreach(cell, seq->options)
    {
        DefElem    *opt = (DefElem *) lfirst(cell);

        if (strcmp(opt->defname, "increment") == 0)
        {
            incrOpt = opt;
        }
        else if (strcmp(opt->defname, "start") == 0)
        {
            startOpt = opt;
        }
        else if (strcmp(opt->defname, "maxvalue") == 0)
        {
            maxOpt = opt;
        }
        else if (strcmp(opt->defname, "minvalue") == 0)
        {
            minOpt = opt;
        }
        else if (strcmp(opt->defname, "cache") == 0)
        {
            cacheOpt = opt;
        }
        else if (strcmp(opt->defname, "cycle") == 0)
        {
            /* CYCLE takes no argument */
            if (opt->arg != (Node *) NULL)
                elog(ERROR, "DefineSequence: CYCLE ??");
            new->is_cycled = true;
        }
        else
        {
            elog(ERROR, "DefineSequence: option \"%s\" not recognized",
                 opt->defname);
        }
    }

    /* INCREMENT BY */
    if (incrOpt != (DefElem *) NULL)
    {
        new->increment_by = get_param(incrOpt);
        if (new->increment_by == 0)
            elog(ERROR, "DefineSequence: can't INCREMENT by 0");
    }
    else
    {
        new->increment_by = 1;
    }

    /* MAXVALUE: default depends on the direction of the sequence */
    if (maxOpt != (DefElem *) NULL)
        new->max_value = get_param(maxOpt);
    else
        new->max_value = (new->increment_by > 0) ? SEQ_MAXVALUE : -1;

    /* MINVALUE: default depends on the direction of the sequence */
    if (minOpt != (DefElem *) NULL)
        new->min_value = get_param(minOpt);
    else
        new->min_value = (new->increment_by > 0) ? 1 : SEQ_MINVALUE;

    if (new->min_value >= new->max_value)
    {
        elog(ERROR, "DefineSequence: MINVALUE (" INT64_FORMAT ") can't be >= MAXVALUE (" INT64_FORMAT ")",
             new->min_value, new->max_value);
    }

    /* START WITH: default is the bound the sequence moves away from */
    if (startOpt != (DefElem *) NULL)
        new->last_value = get_param(startOpt);
    else
        new->last_value = (new->increment_by > 0) ? new->min_value : new->max_value;

    if (new->last_value < new->min_value)
    {
        elog(ERROR, "DefineSequence: START value (" INT64_FORMAT ") can't be < MINVALUE (" INT64_FORMAT ")",
             new->last_value, new->min_value);
    }
    if (new->last_value > new->max_value)
    {
        elog(ERROR, "DefineSequence: START value (" INT64_FORMAT ") can't be > MAXVALUE (" INT64_FORMAT ")",
             new->last_value, new->max_value);
    }

    /* CACHE */
    if (cacheOpt != (DefElem *) NULL)
    {
        new->cache_value = get_param(cacheOpt);
        if (new->cache_value <= 0)
            elog(ERROR, "DefineSequence: CACHE (" INT64_FORMAT ") can't be <= 0",
                 new->cache_value);
    }
    else
    {
        new->cache_value = 1;
    }
}
/*
 * get_param
 *      Extract the numeric argument of a CREATE SEQUENCE option as int64.
 *
 * def: the option; def->arg must be an Integer node, or a Float node whose
 *      string form is a valid int8 literal.
 *
 * Raises ERROR if the argument is missing or of any other node type
 * (int8in reports its own error for an invalid int8 string).
 */
static int64
get_param(DefElem *def)
{
    Node       *arg = def->arg;

    if (arg == (Node *) NULL)
    {
        elog(ERROR, "DefineSequence: \"%s\" value unspecified", def->defname);
    }

    if (IsA(arg, Integer))
    {
        return (int64) intVal(arg);
    }

    /*
     * Values too large for int4 will be represented as Float constants by
     * the lexer.  Accept these if they are valid int8 strings.
     */
    if (IsA(arg, Float))
    {
        Datum       parsed = DirectFunctionCall1(int8in,
                                                 CStringGetDatum(strVal(arg)));

        return DatumGetInt64(parsed);
    }

    /* Shouldn't get here unless parser messed up */
    elog(ERROR, "DefineSequence: \"%s\" value must be integer", def->defname);
    return 0;                   /* not reached; keep compiler quiet */
}
/*
 * seq_redo
 *      Replay an XLOG_SEQ_LOG record: rebuild block 0 of the sequence from
 *      scratch and install the tuple image carried in the record.
 *
 * lsn:    WAL position of the record; stamped onto the rebuilt page.
 * record: the WAL record; its data is an xl_seq_rec header followed by the
 *         tuple image.
 *
 * Returns silently if the relation cannot be opened.  Any other failure
 * (unknown op code, unreadable block, item does not fit) is a PANIC.
 */
void
seq_redo(XLogRecPtr lsn, XLogRecord *record)
{
    uint8       opcode = record->xl_info & ~XLR_INFO_MASK;
    xl_seq_rec *seqrec = (xl_seq_rec *) XLogRecGetData(record);
    Relation    seqrel;
    Buffer      seqbuf;
    Page        seqpage;
    sequence_magic *special;
    char       *tupimage;
    Size        tuplen;

    if (opcode != XLOG_SEQ_LOG)
    {
        elog(PANIC, "seq_redo: unknown op code %u", opcode);
    }

    seqrel = XLogOpenRelation(true, RM_SEQ_ID, seqrec->node);
    if (!RelationIsValid(seqrel))
    {
        return;
    }

    seqbuf = XLogReadBuffer(true, seqrel, 0);
    if (!BufferIsValid(seqbuf))
    {
        elog(PANIC, "seq_redo: can't read block of %u/%u",
             seqrec->node.tblNode, seqrec->node.relNode);
    }

    seqpage = (Page) BufferGetPage(seqbuf);

    /* Always reinit the page and reinstall the magic number */
    /* See comments in DefineSequence */
    PageInit((Page) seqpage, BufferGetPageSize(seqbuf), sizeof(sequence_magic));
    special = (sequence_magic *) PageGetSpecialPointer(seqpage);
    special->magic = SEQ_MAGIC;

    /* The tuple image follows the fixed-size header in the record data */
    tupimage = (char *) seqrec + sizeof(xl_seq_rec);
    tuplen = record->xl_len - sizeof(xl_seq_rec);
    tuplen = MAXALIGN(tuplen);

    if (PageAddItem(seqpage, (Item) tupimage, tuplen,
                    FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
    {
        elog(PANIC, "seq_redo: failed to add item to page");
    }

    PageSetLSN(seqpage, lsn);
    PageSetSUI(seqpage, ThisStartUpID);
    UnlockAndWriteBuffer(seqbuf);
}
/*
 * seq_undo
 *		Counterpart of seq_redo for undoing a sequence WAL record.
 *
 * The body is empty: no action is taken for sequence records here, and
 * both arguments are unused.
 * NOTE(review): presumably required as the resource manager's undo
 * callback -- confirm against the rmgr table.
 */
void
seq_undo(XLogRecPtr lsn, XLogRecord *record)
{
}
/*
 * seq_desc
 *      Append a human-readable description of a sequence WAL record to buf.
 *
 * buf:     NUL-terminated string to append to; the caller must provide
 *          enough room, since no length is passed in.
 * xl_info: the record's info byte; the XLR_INFO_MASK bits are ignored.
 * rec:     the record data, interpreted as an xl_seq_rec.
 *
 * For XLOG_SEQ_LOG this appends "log: node <tblNode>/<relNode>"; for any
 * other op code it appends "UNKNOWN" and nothing else.
 */
void
seq_desc(char *buf, uint8 xl_info, char *rec)
{
    uint8       opcode = xl_info & ~XLR_INFO_MASK;
    xl_seq_rec *seqrec;

    if (opcode != XLOG_SEQ_LOG)
    {
        strcat(buf, "UNKNOWN");
        return;
    }

    strcat(buf, "log: ");

    seqrec = (xl_seq_rec *) rec;
    sprintf(buf + strlen(buf), "node %u/%u",
            seqrec->node.tblNode, seqrec->node.relNode);
}