pgindent new GIST index code, per request from Tom.

This commit is contained in:
Bruce Momjian 2005-09-22 20:44:36 +00:00
parent 08817bdb76
commit b3364fc81b
7 changed files with 1531 additions and 1194 deletions

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.125 2005/06/30 17:52:13 teodor Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.126 2005/09/22 20:44:36 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@ -92,8 +92,8 @@ gistbuild(PG_FUNCTION_ARGS)
Buffer buffer;
/*
* We expect to be called exactly once for any index relation. If
* that's not the case, big trouble's what we have.
* We expect to be called exactly once for any index relation. If that's
* not the case, big trouble's what we have.
*/
if (RelationGetNumberOfBlocks(index) != 0)
elog(ERROR, "index \"%s\" already contains data",
@ -105,7 +105,8 @@ gistbuild(PG_FUNCTION_ARGS)
/* initialize the root page */
buffer = gistNewBuffer(index);
GISTInitBuffer(buffer, F_LEAF);
if ( !index->rd_istemp ) {
if (!index->rd_istemp)
{
XLogRecPtr recptr;
XLogRecData rdata;
Page page;
@ -124,7 +125,8 @@ gistbuild(PG_FUNCTION_ARGS)
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
} else
}
else
PageSetLSN(BufferGetPage(buffer), XLogRecPtrForTemp);
LockBuffer(buffer, GIST_UNLOCK);
WriteBuffer(buffer);
@ -132,9 +134,10 @@ gistbuild(PG_FUNCTION_ARGS)
/* build the index */
buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
buildstate.indtuples = 0;
/*
* create a temporary memory context that is reset once for each
* tuple inserted into the index
* create a temporary memory context that is reset once for each tuple
* inserted into the index
*/
buildstate.tmpCtx = createTempGistContext();
@ -195,11 +198,11 @@ gistbuildCallback(Relation index,
itup->t_tid = htup->t_self;
/*
* Since we already have the index relation locked, we call
* gistdoinsert directly. Normal access method calls dispatch through
* gistinsert, which locks the relation for write. This is the right
* thing to do if you're inserting single tups, but not when you're
* initializing the whole index at once.
* Since we already have the index relation locked, we call gistdoinsert
* directly. Normal access method calls dispatch through gistinsert,
* which locks the relation for write. This is the right thing to do if
* you're inserting single tups, but not when you're initializing the
* whole index at once.
*/
gistdoinsert(index, itup, &buildstate->giststate);
@ -221,6 +224,7 @@ gistinsert(PG_FUNCTION_ARGS)
Datum *values = (Datum *) PG_GETARG_POINTER(1);
bool *isnull = (bool *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
#ifdef NOT_USED
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
bool checkUnique = PG_GETARG_BOOL(5);
@ -296,17 +300,18 @@ gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate)
}
static bool
gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
{
bool is_splitted = false;
bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
if (!is_leaf)
/*
* This node's key has been modified, either because a child
* split occurred or because we needed to adjust our key for
* an insert in a child node. Therefore, remove the old
* version of this node's key.
* This node's key has been modified, either because a child split
* occurred or because we needed to adjust our key for an insert in a
* child node. Therefore, remove the old version of this node's key.
*/
PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
@ -316,8 +321,10 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
/* no space for insertion */
IndexTuple *itvec,
*newitup;
int tlen,olen;
SplitedPageLayout *dist=NULL, *ptr;
int tlen,
olen;
SplitedPageLayout *dist = NULL,
*ptr;
is_splitted = true;
itvec = gistextractbuffer(state->stack->buffer, &tlen);
@ -325,7 +332,8 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
if ( !state->r->rd_istemp ) {
if (!state->r->rd_istemp)
{
XLogRecPtr recptr;
XLogRecData *rdata;
@ -336,16 +344,20 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
ptr = dist;
while(ptr) {
while (ptr)
{
PageSetLSN(BufferGetPage(ptr->buffer), recptr);
PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
ptr = ptr->next;
}
END_CRIT_SECTION();
} else {
}
else
{
ptr = dist;
while(ptr) {
while (ptr)
{
PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
ptr = ptr->next;
}
@ -354,12 +366,15 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
state->itup = newitup;
state->ituplen = tlen; /* now tlen >= 2 */
if ( state->stack->blkno == GIST_ROOT_BLKNO ) {
if (state->stack->blkno == GIST_ROOT_BLKNO)
{
gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
state->needInsertComplete = false;
ptr = dist;
while(ptr) {
while (ptr)
{
Page page = (Page) BufferGetPage(ptr->buffer);
GistPageGetOpaque(page)->rightlink = (ptr->next) ?
ptr->next->block.blkno : InvalidBlockNumber;
GistPageGetOpaque(page)->nsn = PageGetLSN(page);
@ -367,7 +382,9 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
WriteBuffer(ptr->buffer);
ptr = ptr->next;
}
} else {
}
else
{
Page page;
BlockNumber rightrightlink = InvalidBlockNumber;
SplitedPageLayout *ourpage = NULL;
@ -375,10 +392,13 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
GISTPageOpaque opaque;
/* move origpage to first in chain */
if ( dist->block.blkno != state->stack->blkno ) {
if (dist->block.blkno != state->stack->blkno)
{
ptr = dist;
while(ptr->next) {
if ( ptr->next->block.blkno == state->stack->blkno ) {
while (ptr->next)
{
if (ptr->next->block.blkno == state->stack->blkno)
{
ourpage = ptr->next;
ptr->next = ptr->next->next;
ourpage->next = dist;
@ -388,7 +408,8 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
ptr = ptr->next;
}
Assert(ourpage != NULL);
} else
}
else
ourpage = dist;
@ -400,11 +421,13 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
opaque->nsn = PageGetLSN(page);
opaque->rightlink = ourpage->next->block.blkno;
/* fill and write all new pages.
They aren't linked into the tree yet */
/*
* fill and write all new pages. They aren't linked into the tree yet
*/
ptr = ourpage->next;
while(ptr) {
while (ptr)
{
page = (Page) BufferGetPage(ptr->buffer);
GistPageGetOpaque(page)->rightlink = (ptr->next) ?
ptr->next->block.blkno : rightrightlink;
@ -427,12 +450,15 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
oldlsn = PageGetLSN(state->stack->page);
if ( !state->r->rd_istemp ) {
OffsetNumber noffs=0, offs[ MAXALIGN( sizeof(OffsetNumber) ) / sizeof(OffsetNumber) ];
if (!state->r->rd_istemp)
{
OffsetNumber noffs = 0,
offs[MAXALIGN(sizeof(OffsetNumber)) / sizeof(OffsetNumber)];
XLogRecPtr recptr;
XLogRecData *rdata;
if ( !is_leaf ) {
if (!is_leaf)
{
/* only on inner page we should delete previous version */
offs[0] = state->stack->childoffnum;
noffs = 1;
@ -449,30 +475,38 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
PageSetTLI(state->stack->page, ThisTimeLineID);
END_CRIT_SECTION();
} else
}
else
PageSetLSN(state->stack->page, XLogRecPtrForTemp);
if (state->stack->blkno == GIST_ROOT_BLKNO)
state->needInsertComplete = false;
WriteNoReleaseBuffer(state->stack->buffer);
if (!is_leaf) /* small optimization: inform scan about deleting... */
if (!is_leaf) /* small optimization: inform scan about
* deleting... */
gistadjscans(state->r, GISTOP_DEL, state->stack->blkno,
state->stack->childoffnum, PageGetLSN(state->stack->page), oldlsn);
if (state->ituplen > 1)
{ /* previous is_splitted==true */
/*
* child was split, so we must form a union for insertion in the
* parent
*/
IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno);
state->itup[0] = newtup;
state->ituplen = 1;
} else if (is_leaf) {
/* itup[0] store key to adjust parent, we set it to valid
to correct check by GistTupleIsInvalid macro in gistgetadjusted() */
}
else if (is_leaf)
{
/*
* itup[0] store key to adjust parent, we set it to valid to
* correct check by GistTupleIsInvalid macro in gistgetadjusted()
*/
ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno);
GistTupleSetValid(state->itup[0]);
}
@ -492,10 +526,13 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
IndexTuple idxtuple;
GISTPageOpaque opaque;
/* walk down, We don't lock page for a long time, but so
we should be ready to recheck path in a bad case...
We remember, that page->lsn should never be invalid. */
while( true ) {
/*
* walk down, We don't lock page for a long time, but so we should be
* ready to recheck path in a bad case... We remember, that page->lsn
* should never be invalid.
*/
while (true)
{
if (XLogRecPtrIsInvalid(state->stack->lsn))
state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
@ -508,8 +545,12 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
Assert(state->r->rd_istemp || !XLogRecPtrIsInvalid(state->stack->lsn));
if (state->stack->blkno != GIST_ROOT_BLKNO &&
XLByteLT( state->stack->parent->lsn, opaque->nsn) ) {
/* caused split non-root page is detected, go up to parent to choose best child */
XLByteLT(state->stack->parent->lsn, opaque->nsn))
{
/*
* caused split non-root page is detected, go up to parent to
* choose best child
*/
LockBuffer(state->stack->buffer, GIST_UNLOCK);
ReleaseBuffer(state->stack->buffer);
state->stack = state->stack->parent;
@ -520,13 +561,12 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
if (!GistPageIsLeaf(state->stack->page))
{
/*
* This is an internal page, so continue to walk down the
* tree. We find the child node that has the minimum insertion
* penalty and recursively invoke ourselves to modify that
* node. Once the recursive call returns, we may need to
* adjust the parent node for two reasons: the child node
* split, or the key in this node needs to be adjusted for the
* newly inserted key below us.
* This is an internal page, so continue to walk down the tree. We
* find the child node that has the minimum insertion penalty and
* recursively invoke ourselves to modify that node. Once the
* recursive call returns, we may need to adjust the parent node
* for two reasons: the child node split, or the key in this node
* needs to be adjusted for the newly inserted key below us.
*/
GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
@ -542,28 +582,43 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
if (state->stack)
state->stack->child = item;
state->stack = item;
} else {
}
else
{
/* be careful: the page may change during unlock/lock... */
LockBuffer(state->stack->buffer, GIST_UNLOCK);
LockBuffer(state->stack->buffer, GIST_EXCLUSIVE);
state->stack->page = (Page) BufferGetPage(state->stack->buffer);
opaque = GistPageGetOpaque(state->stack->page);
if ( state->stack->blkno == GIST_ROOT_BLKNO ) {
/* the only page can become inner instead of leaf is a root page,
so for root we should recheck it */
if ( !GistPageIsLeaf(state->stack->page) ) {
/* very rarely situation: during unlock/lock index
with number of pages = 1 was increased */
if (state->stack->blkno == GIST_ROOT_BLKNO)
{
/*
* the only page can become inner instead of leaf is a root
* page, so for root we should recheck it
*/
if (!GistPageIsLeaf(state->stack->page))
{
/*
* very rarely situation: during unlock/lock index with
* number of pages = 1 was increased
*/
LockBuffer(state->stack->buffer, GIST_UNLOCK);
continue;
}
/* we don't need to check root split, because checking
leaf/inner is enough to recognize split for root */
} else if ( XLByteLT( state->stack->parent->lsn, opaque->nsn) ) {
/* detecting split during unlock/lock, so we should
find better child on parent*/
/*
* we don't need to check root split, because checking
* leaf/inner is enough to recognize split for root
*/
}
else if (XLByteLT(state->stack->parent->lsn, opaque->nsn))
{
/*
* detecting split during unlock/lock, so we should find
* better child on parent
*/
/* forget buffer */
LockBuffer(state->stack->buffer, GIST_UNLOCK);
@ -587,8 +642,10 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
* Should have the same interface as XLogReadBuffer
*/
static Buffer
gistReadAndLockBuffer( Relation r, BlockNumber blkno ) {
gistReadAndLockBuffer(Relation r, BlockNumber blkno)
{
Buffer buffer = ReadBuffer(r, blkno);
LockBuffer(buffer, GIST_SHARE);
return buffer;
}
@ -601,23 +658,29 @@ gistReadAndLockBuffer( Relation r, BlockNumber blkno ) {
* returns from the beginning of closest parent;
*/
GISTInsertStack *
gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(Relation, BlockNumber) ) {
gistFindPath(Relation r, BlockNumber child, Buffer (*myReadBuffer) (Relation, BlockNumber))
{
Page page;
Buffer buffer;
OffsetNumber i, maxoff;
OffsetNumber i,
maxoff;
ItemId iid;
IndexTuple idxtuple;
GISTInsertStack *top, *tail, *ptr;
GISTInsertStack *top,
*tail,
*ptr;
BlockNumber blkno;
top = tail = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
top->blkno = GIST_ROOT_BLKNO;
while( top && top->blkno != child ) {
while (top && top->blkno != child)
{
buffer = myReadBuffer(r, top->blkno); /* buffer locked */
page = (Page) BufferGetPage(buffer);
if ( GistPageIsLeaf(page) ) {
if (GistPageIsLeaf(page))
{
/* we can safely stop here; only leaf pages follow */
LockBuffer(buffer, GIST_UNLOCK);
ReleaseBuffer(buffer);
@ -627,7 +690,8 @@ gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(Relation, B
top->lsn = PageGetLSN(page);
if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */) {
GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
{
/* page was split while we were thinking... */
ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
ptr->blkno = GistPageGetOpaque(page)->rightlink;
@ -640,25 +704,32 @@ gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(Relation, B
maxoff = PageGetMaxOffsetNumber(page);
for(i = FirstOffsetNumber; i<= maxoff; i = OffsetNumberNext(i)) {
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
iid = PageGetItemId(page, i);
idxtuple = (IndexTuple) PageGetItem(page, iid);
blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
if ( blkno == child ) {
if (blkno == child)
{
OffsetNumber poff = InvalidOffsetNumber;
/* make childs links */
ptr = top;
while( ptr->parent ) {
while (ptr->parent)
{
/* set child link */
ptr->parent->child = ptr;
/* move childoffnum.. */
if ( ptr == top ) {
if (ptr == top)
{
/* first iteration */
poff = ptr->parent->childoffnum;
ptr->parent->childoffnum = ptr->childoffnum;
} else {
}
else
{
OffsetNumber tmp = ptr->parent->childoffnum;
ptr->parent->childoffnum = poff;
poff = tmp;
}
@ -668,11 +739,14 @@ gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(Relation, B
LockBuffer(buffer, GIST_UNLOCK);
ReleaseBuffer(buffer);
return top;
} else {
}
else
{
/* Install next inner page to the end of stack */
ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
ptr->blkno = blkno;
ptr->childoffnum = i; /* set offsetnumber of child to child !!! */
ptr->childoffnum = i; /* set offsetnumber of child to child
* !!! */
ptr->parent = top;
ptr->next = NULL;
tail->next = ptr;
@ -694,7 +768,8 @@ gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(Relation, B
*/
static void
gistFindCorrectParent( Relation r, GISTInsertStack *child ) {
gistFindCorrectParent(Relation r, GISTInsertStack *child)
{
GISTInsertStack *parent = child->parent;
LockBuffer(parent->buffer, GIST_EXCLUSIVE);
@ -702,19 +777,24 @@ gistFindCorrectParent( Relation r, GISTInsertStack *child ) {
/* here we don't need to distinguish between split and page update */
if ( parent->childoffnum == InvalidOffsetNumber || !XLByteEQ( parent->lsn, PageGetLSN(parent->page) ) ) {
if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page)))
{
/* parent is changed, look child in right links until found */
OffsetNumber i, maxoff;
OffsetNumber i,
maxoff;
ItemId iid;
IndexTuple idxtuple;
GISTInsertStack *ptr;
while(true) {
while (true)
{
maxoff = PageGetMaxOffsetNumber(parent->page);
for(i = FirstOffsetNumber; i<= maxoff; i = OffsetNumberNext(i)) {
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
iid = PageGetItemId(parent->page, i);
idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
if ( ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno ) {
if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
{
/* yes!!, found */
parent->childoffnum = i;
return;
@ -725,19 +805,26 @@ gistFindCorrectParent( Relation r, GISTInsertStack *child ) {
LockBuffer(parent->buffer, GIST_UNLOCK);
ReleaseBuffer(parent->buffer);
if (parent->blkno == InvalidBlockNumber)
/* end of chain and still haven't found the parent;
this is the very rare situation where the root was split */
/*
* end of chain and still haven't found the parent; this is the very
* rare situation where the root was split
*/
break;
parent->buffer = ReadBuffer(r, parent->blkno);
LockBuffer(parent->buffer, GIST_EXCLUSIVE);
parent->page = (Page) BufferGetPage(parent->buffer);
}
/* awful!!, we need search tree to find parent ... ,
but before we should release all old parent */
/*
* awful!!, we need search tree to find parent ... , but before we
* should release all old parent
*/
ptr = child->parent->parent; /* child->parent already released above */
while(ptr) {
ptr = child->parent->parent; /* child->parent already released
* above */
while (ptr)
{
ReleaseBuffer(ptr->buffer);
ptr = ptr->parent;
}
@ -747,7 +834,8 @@ gistFindCorrectParent( Relation r, GISTInsertStack *child ) {
Assert(ptr != NULL);
/* read all buffers as supposed in caller */
while( ptr ) {
while (ptr)
{
ptr->buffer = ReadBuffer(r, ptr->blkno);
ptr->page = (Page) BufferGetPage(ptr->buffer);
ptr = ptr->parent;
@ -765,22 +853,28 @@ gistFindCorrectParent( Relation r, GISTInsertStack *child ) {
}
void
gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
{
int is_splitted;
ItemId iid;
IndexTuple oldtup, newtup;
IndexTuple oldtup,
newtup;
/* walk up */
while( true ) {
while (true)
{
/*
* After this call: 1. if child page was splited, then itup
* contains keys for each page 2. if child page wasn't splited,
* then itup contains additional for adjustment of current key
* After this call: 1. if child page was splited, then itup contains
* keys for each page 2. if child page wasn't splited, then itup
* contains additional for adjustment of current key
*/
if ( state->stack->parent ) {
/* X-lock parent page before proceed child,
gistFindCorrectParent should find and lock it */
if (state->stack->parent)
{
/*
* X-lock parent page before proceed child, gistFindCorrectParent
* should find and lock it
*/
gistFindCorrectParent(state->r, state->stack);
}
is_splitted = gistplacetopage(state, giststate);
@ -796,7 +890,10 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
if (!state->stack)
break;
/* child did not split, so we can check is it needed to update parent tuple */
/*
* child did not split, so we can check is it needed to update parent
* tuple
*/
if (!is_splitted)
{
/* parent's tuple */
@ -804,7 +901,8 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);
if (!newtup) { /* not need to update key */
if (!newtup)
{ /* not need to update key */
LockBuffer(state->stack->buffer, GIST_UNLOCK);
break;
}
@ -814,7 +912,8 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
} /* while */
/* release all parent buffers */
while( state->stack ) {
while (state->stack)
{
ReleaseBuffer(state->stack->buffer);
state->stack = state->stack->parent;
}
@ -825,7 +924,8 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
}
static void
gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset) {
gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset)
{
int i;
for (i = 0; i < len; i++)
@ -856,7 +956,8 @@ gistSplit(Relation r,
GISTPageOpaque opaque;
GIST_SPLITVEC v;
GistEntryVector *entryvec;
int i, fakeoffset,
int i,
fakeoffset,
nlen;
OffsetNumber *realoffset;
IndexTuple *cleaneditup = itup;
@ -867,8 +968,8 @@ gistSplit(Relation r,
/*
* The root of the tree is the first block in the relation. If we're
* about to split the root, we need to do some hocus-pocus to enforce
* this guarantee.
* about to split the root, we need to do some hocus-pocus to enforce this
* guarantee.
*/
if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO)
{
@ -901,7 +1002,8 @@ gistSplit(Relation r,
Datum datum;
bool IsNull;
if (!GistPageIsLeaf(p) && GistTupleIsInvalid( itup[i - 1] )) {
if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup[i - 1]))
{
entryvec->n--;
/* remember position of invalid tuple */
realoffset[entryvec->n] = i;
@ -918,30 +1020,34 @@ gistSplit(Relation r,
}
/*
* if it was invalid tuple then we need special processing. If
* it's possible, we move all invalid tuples on right page.
* We should remember, that union with invalid tuples
* is a invalid tuple.
* if it was invalid tuple then we need special processing. If it's
* possible, we move all invalid tuples on right page. We should remember,
* that union with invalid tuples is a invalid tuple.
*/
if ( entryvec->n != *len + 1 ) {
if (entryvec->n != *len + 1)
{
lencleaneditup = entryvec->n - 1;
cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple));
for (i = 1; i < entryvec->n; i++)
cleaneditup[i - 1] = itup[realoffset[i] - 1];
if ( gistnospace( left, cleaneditup, lencleaneditup ) ) {
if (gistnospace(left, cleaneditup, lencleaneditup))
{
/* no space on left to put all good tuples, so picksplit */
gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
v.spl_leftvalid = true;
v.spl_rightvalid = false;
gistToRealOffset(v.spl_left, v.spl_nleft, realoffset);
gistToRealOffset(v.spl_right, v.spl_nright, realoffset);
} else {
}
else
{
/* we can try to store all valid tuples on one page */
v.spl_right = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
if ( lencleaneditup==0 ) {
if (lencleaneditup == 0)
{
/* all tuples are invalid, so moves half of its to right */
v.spl_leftvalid = v.spl_rightvalid = false;
v.spl_nright = 0;
@ -951,9 +1057,13 @@ gistSplit(Relation r,
v.spl_left[v.spl_nleft++] = i;
else
v.spl_right[v.spl_nright++] = i;
} else {
/* we will not call gistUserPicksplit, just put good
tuples on left and invalid on right */
}
else
{
/*
* we will not call gistUserPicksplit, just put good tuples on
* left and invalid on right
*/
v.spl_nleft = lencleaneditup;
v.spl_nright = 0;
for (i = 1; i < entryvec->n; i++)
@ -968,7 +1078,9 @@ gistSplit(Relation r,
v.spl_rightvalid = false;
}
}
} else {
}
else
{
/* there are no invalid tuples, so do the usual processing */
gistUserPicksplit(r, entryvec, &v, itup, *len, giststate);
v.spl_leftvalid = v.spl_rightvalid = true;
@ -986,7 +1098,8 @@ gistSplit(Relation r,
rvectup[i] = itup[v.spl_right[i] - 1];
/* place invalid tuples on right page if it isn't done yet */
for (fakeoffset = entryvec->n; fakeoffset < *len+1 && lencleaneditup; fakeoffset++) {
for (fakeoffset = entryvec->n; fakeoffset < *len + 1 && lencleaneditup; fakeoffset++)
{
rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
}
@ -1008,7 +1121,8 @@ gistSplit(Relation r,
(*dist)->block.num = v.spl_nright;
(*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
ptr = (char *) ((*dist)->list);
for(i=0;i<v.spl_nright;i++) {
for (i = 0; i < v.spl_nright; i++)
{
memcpy(ptr, rvectup[i], IndexTupleSize(rvectup[i]));
ptr += IndexTupleSize(rvectup[i]);
}
@ -1043,7 +1157,8 @@ gistSplit(Relation r,
(*dist)->block.num = v.spl_nleft;
(*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
ptr = (char *) ((*dist)->list);
for(i=0;i<v.spl_nleft;i++) {
for (i = 0; i < v.spl_nleft; i++)
{
memcpy(ptr, lvectup[i], IndexTupleSize(lvectup[i]));
ptr += IndexTupleSize(lvectup[i]);
}
@ -1076,7 +1191,8 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke
GISTInitBuffer(buffer, 0);
gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
if ( !r->rd_istemp ) {
if (!r->rd_istemp)
{
XLogRecPtr recptr;
XLogRecData *rdata;
@ -1090,7 +1206,8 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
} else
}
else
PageSetLSN(page, XLogRecPtrForTemp);
}
@ -1136,4 +1253,3 @@ freeGISTstate(GISTSTATE *giststate)
{
/* no work */
}

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.50 2005/06/27 12:45:22 teodor Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.51 2005/09/22 20:44:36 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@ -26,18 +26,22 @@ static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan,
OffsetNumber offset);
static void
killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr) {
killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr)
{
Buffer buffer = so->curbuf;
for(;;) {
for (;;)
{
Page p;
BlockNumber blkno;
OffsetNumber offset, maxoff;
OffsetNumber offset,
maxoff;
LockBuffer(buffer, GIST_SHARE);
p = (Page) BufferGetPage(buffer);
if ( buffer == so->curbuf && XLByteEQ( so->stack->lsn, PageGetLSN(p) ) ) {
if (buffer == so->curbuf && XLByteEQ(so->stack->lsn, PageGetLSN(p)))
{
/* page unchanged, so all is simple */
offset = ItemPointerGetOffsetNumber(iptr);
PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
@ -48,10 +52,12 @@ killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr) {
maxoff = PageGetMaxOffsetNumber(p);
for(offset = FirstOffsetNumber; offset<= maxoff; offset = OffsetNumberNext(offset)) {
for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
{
IndexTuple ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset));
if ( ItemPointerEquals( &(ituple->t_tid), iptr ) ) {
if (ItemPointerEquals(&(ituple->t_tid), iptr))
{
/* found */
PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
SetBufferCommitInfoNeedsSave(buffer);
@ -63,9 +69,10 @@ killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr) {
}
/* follow right link */
/*
* ??? is it good? if tuple dropped by concurrent vacuum,
* we will read all leaf pages...
* ??? is it good? if tuple dropped by concurrent vacuum, we will read
* all leaf pages...
*/
blkno = GistPageGetOpaque(p)->rightlink;
LockBuffer(buffer, GIST_UNLOCK);
@ -94,16 +101,16 @@ gistgettuple(PG_FUNCTION_ARGS)
so = (GISTScanOpaque) scan->opaque;
/*
* If we have produced an index tuple in the past and the executor
* has informed us we need to mark it as "killed", do so now.
* If we have produced an index tuple in the past and the executor has
* informed us we need to mark it as "killed", do so now.
*/
if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData)))
killtuple(scan->indexRelation, so, &(scan->currentItemData));
/*
* Get the next tuple that matches the search key. If asked to
* skip killed tuples, continue looping until we find a non-killed
* tuple that matches the search key.
* Get the next tuple that matches the search key. If asked to skip killed
* tuples, continue looping until we find a non-killed tuple that matches
* the search key.
*/
res = (gistnext(scan, dir, &tid, 1, scan->ignore_killed_tuples)) ? true : false;
@ -154,11 +161,14 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
stk->next = NULL;
stk->block = GIST_ROOT_BLKNO;
} else if ( so->curbuf == InvalidBuffer ) {
}
else if (so->curbuf == InvalidBuffer)
{
return 0;
}
for(;;) {
for (;;)
{
/* First of all, we need lock buffer */
Assert(so->curbuf != InvalidBuffer);
LockBuffer(so->curbuf, GIST_SHARE);
@ -166,7 +176,8 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
opaque = GistPageGetOpaque(p);
resetoffset = false;
if ( XLogRecPtrIsInvalid( so->stack->lsn ) || !XLByteEQ( so->stack->lsn, PageGetLSN(p) ) ) {
if (XLogRecPtrIsInvalid(so->stack->lsn) || !XLByteEQ(so->stack->lsn, PageGetLSN(p)))
{
/* page changed from last visit or visit first time , reset offset */
so->stack->lsn = PageGetLSN(p);
resetoffset = true;
@ -175,7 +186,9 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
if (!XLogRecPtrIsInvalid(so->stack->parentlsn) &&
XLByteLT(so->stack->parentlsn, opaque->nsn) &&
opaque->rightlink != InvalidBlockNumber /* sanity check */ &&
(so->stack->next==NULL || so->stack->next->block != opaque->rightlink) /* check if already added */) {
(so->stack->next == NULL || so->stack->next->block != opaque->rightlink) /* check if already
added */ )
{
/* detect page split, follow right link to add pages */
stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
@ -188,13 +201,15 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
}
/* if page is empty, then just skip it */
if ( PageIsEmpty(p) ) {
if (PageIsEmpty(p))
{
LockBuffer(so->curbuf, GIST_UNLOCK);
stk = so->stack->next;
pfree(so->stack);
so->stack = stk;
if (so->stack == NULL) {
if (so->stack == NULL)
{
ReleaseBuffer(so->curbuf);
so->curbuf = InvalidBuffer;
return ntids;
@ -231,9 +246,9 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
if (!OffsetNumberIsValid(n))
{
/*
* We ran out of matching index entries on the current
* page, so pop the top stack entry and use it to continue
* the search.
* We ran out of matching index entries on the current page,
* so pop the top stack entry and use it to continue the
* search.
*/
LockBuffer(so->curbuf, GIST_UNLOCK);
stk = so->stack->next;
@ -259,19 +274,21 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
{
/*
* We've found a matching index entry in a leaf page, so
* return success. Note that we keep "curbuf" pinned so
* that we can efficiently resume the index scan later.
* return success. Note that we keep "curbuf" pinned so that
* we can efficiently resume the index scan later.
*/
ItemPointerSet(&(scan->currentItemData),
BufferGetBlockNumber(so->curbuf), n);
if ( ! ( ignore_killed_tuples && ItemIdDeleted(PageGetItemId(p, n)) ) ) {
if (!(ignore_killed_tuples && ItemIdDeleted(PageGetItemId(p, n))))
{
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
tids[ntids] = scan->xs_ctup.t_self = it->t_tid;
ntids++;
if ( ntids == maxtids ) {
if (ntids == maxtids)
{
LockBuffer(so->curbuf, GIST_UNLOCK);
return ntids;
}
@ -334,7 +351,8 @@ gistindex_keytest(IndexTuple tuple,
IncrIndexProcessed();
/*
* Tuple doesn't restore after crash recovery because of incomplete insert
* Tuple doesn't restore after crash recovery because of incomplete
* insert
*/
if (!GistPageIsLeaf(p) && GistTupleIsInvalid(tuple))
return true;
@ -366,13 +384,12 @@ gistindex_keytest(IndexTuple tuple,
FALSE, isNull);
/*
* Call the Consistent function to evaluate the test. The
* arguments are the index datum (as a GISTENTRY*), the comparison
* datum, and the comparison operator's strategy number and
* subtype from pg_amop.
* Call the Consistent function to evaluate the test. The arguments
* are the index datum (as a GISTENTRY*), the comparison datum, and
* the comparison operator's strategy number and subtype from pg_amop.
*
* (Presently there's no need to pass the subtype since it'll always
* be zero, but might as well pass it for possible future use.)
* (Presently there's no need to pass the subtype since it'll always be
* zero, but might as well pass it for possible future use.)
*/
test = FunctionCall4(&key->sk_func,
PointerGetDatum(&de),
@ -410,15 +427,15 @@ gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir)
maxoff = PageGetMaxOffsetNumber(p);
/*
* Make sure we're in a short-lived memory context when we invoke
* a user-supplied GiST method in gistindex_keytest(), so we don't
* leak memory
* Make sure we're in a short-lived memory context when we invoke a
* user-supplied GiST method in gistindex_keytest(), so we don't leak
* memory
*/
oldcxt = MemoryContextSwitchTo(so->tempCxt);
/*
* If we modified the index during the scan, we may have a pointer to
* a ghost tuple, before the scan. If this is the case, back up one.
* If we modified the index during the scan, we may have a pointer to a
* ghost tuple, before the scan. If this is the case, back up one.
*/
if (so->flags & GS_CURBEFORE)
{
@ -442,9 +459,8 @@ gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir)
MemoryContextReset(so->tempCxt);
/*
* If we found a matching entry, return its offset; otherwise
* return InvalidOffsetNumber to inform the caller to go to the
* next page.
* If we found a matching entry, return its offset; otherwise return
* InvalidOffsetNumber to inform the caller to go to the next page.
*/
if (n >= FirstOffsetNumber && n <= maxoff)
return n;

View File

@ -10,7 +10,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistproc.c,v 1.1 2005/07/01 19:19:02 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistproc.c,v 1.2 2005/09/22 20:44:36 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@ -621,8 +621,8 @@ gist_poly_consistent(PG_FUNCTION_ARGS)
/*
* Since the operators are marked lossy anyway, we can just use
* rtree_internal_consistent even at leaf nodes. (This works
* in part because the index entries are bounding boxes not polygons.)
* rtree_internal_consistent even at leaf nodes. (This works in part
* because the index entries are bounding boxes not polygons.)
*/
result = rtree_internal_consistent(DatumGetBoxP(entry->key),
&(query->boundbox), strategy);
@ -693,8 +693,8 @@ gist_circle_consistent(PG_FUNCTION_ARGS)
/*
* Since the operators are marked lossy anyway, we can just use
* rtree_internal_consistent even at leaf nodes. (This works
* in part because the index entries are bounding boxes not circles.)
* rtree_internal_consistent even at leaf nodes. (This works in part
* because the index entries are bounding boxes not circles.)
*/
bbox.high.x = query->center.x + query->radius;
bbox.low.x = query->center.x - query->radius;

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.60 2005/09/22 18:49:45 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.61 2005/09/22 20:44:36 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@ -120,11 +120,11 @@ gistrescan(PG_FUNCTION_ARGS)
scan->numberOfKeys * sizeof(ScanKeyData));
/*
* Modify the scan key so that the Consistent method is
* called for all comparisons. The original operator is passed
* to the Consistent function in the form of its strategy
* number, which is available from the sk_strategy field, and
* its subtype from the sk_subtype field.
* Modify the scan key so that the Consistent method is called for
* all comparisons. The original operator is passed to the Consistent
* function in the form of its strategy number, which is available
* from the sk_strategy field, and its subtype from the sk_subtype
* field.
*/
for (i = 0; i < scan->numberOfKeys; i++)
scan->keyData[i].sk_func = so->giststate->consistentFn[scan->keyData[i].sk_attno - 1];
@ -308,9 +308,9 @@ ReleaseResources_gist(void)
GISTScanList next;
/*
* Note: this should be a no-op during normal query shutdown. However,
* in an abort situation ExecutorEnd is not called and so there may be
* open index scans to clean up.
* Note: this should be a no-op during normal query shutdown. However, in
* an abort situation ExecutorEnd is not called and so there may be open
* index scans to clean up.
*/
prev = NULL;
@ -399,7 +399,8 @@ adjustiptr(IndexScanDesc scan,
{
case GISTOP_DEL:
/* back up one if we need to */
if (curoff >= offnum && XLByteEQ(stk->lsn, oldlsn) ) /* the same vesrion of page */
if (curoff >= offnum && XLByteEQ(stk->lsn, oldlsn)) /* the same vesrion of
* page */
{
if (curoff > FirstOffsetNumber)
{
@ -409,8 +410,7 @@ adjustiptr(IndexScanDesc scan,
else
{
/*
* remember that we're before the current
* tuple
* remember that we're before the current tuple
*/
ItemPointerSet(iptr, blkno, FirstOffsetNumber);
if (iptr == &(scan->currentItemData))
@ -435,6 +435,7 @@ gistfreestack(GISTSearchStack *s)
while (s != NULL)
{
GISTSearchStack *p = s->next;
pfree(s);
s = p;
}

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.6 2005/09/22 18:49:45 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.7 2005/09/22 20:44:36 momjian Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
@ -47,8 +47,7 @@
} while(0);
static void
gistpenalty(GISTSTATE *giststate, int attno,
static void gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2, float *penalty);
@ -155,6 +154,7 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
for (j = 0; j < len; j++)
{
bool IsNull;
datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
if (IsNull)
continue;
@ -402,8 +402,8 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
int curid = 1;
/*
* first key is always not null (see gistinsert), so we need not check
* for nulls
* first key is always not null (see gistinsert), so we need not check for
* nulls
*/
for (i = 0; i < spl->spl_nleft; i++)
{
@ -555,8 +555,7 @@ gistadjsubkey(Relation r,
}
/*
* add
* XXX: refactor this to avoid duplicating code
* add XXX: refactor this to avoid duplicating code
*/
if (lpenalty < rpenalty)
{
@ -644,7 +643,8 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
int j;
IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
if ( !GistPageIsLeaf(p) && GistTupleIsInvalid(itup) ) {
if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup))
{
ereport(LOG,
(errmsg("index \"%s\" needs VACUUM or REINDEX to finish crash recovery",
RelationGetRelationName(r))));
@ -776,6 +776,7 @@ gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
for (i = 0; i < r->rd_att->natts; i++)
{
Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
gistdentryinit(giststate, i, &attdata[i],
datum, r, p, o,
ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]),
@ -816,10 +817,11 @@ GISTInitBuffer(Buffer b, uint32 f)
void
gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
IndexTuple *itup, int len, GISTSTATE *giststate) {
IndexTuple *itup, int len, GISTSTATE *giststate)
{
/*
* now let the user-defined picksplit function set up the split
* vector; entryvec contains no null values!!
* now let the user-defined picksplit function set up the split vector;
* entryvec contains no null values!!
*/
FunctionCall2(&giststate->picksplitFn[0],
PointerGetDatum(entryvec),
@ -837,8 +839,8 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
v->spl_risnull[0] = false;
/*
* if index is multikey, then we must try to get a smaller bounding box
* for subkey(s)
* if index is multikey, then we must try to get a smaller bounding box for
* subkey(s)
*/
if (r->rd_att->natts > 1)
{
@ -854,8 +856,8 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
gistunionsubkey(r, giststate, itup, v, false);
/*
* if possible, we insert equivalent tuples with control by
* penalty for a subkey(s)
* if possible, we insert equivalent tuples with control by penalty
* for a subkey(s)
*/
if (MaxGrpId > 1)
gistadjsubkey(r, itup, len, v, giststate);
@ -863,22 +865,29 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
}
Buffer
gistNewBuffer(Relation r) {
gistNewBuffer(Relation r)
{
Buffer buffer = InvalidBuffer;
bool needLock;
while(true) {
while (true)
{
BlockNumber blkno = GetFreeIndexPage(&r->rd_node);
if (blkno == InvalidBlockNumber)
break;
buffer = ReadBuffer(r, blkno);
if ( ConditionalLockBuffer(buffer) ) {
if (ConditionalLockBuffer(buffer))
{
Page page = BufferGetPage(buffer);
if ( GistPageIsDeleted( page ) ) {
if (GistPageIsDeleted(page))
{
GistPageSetNonDeleted(page);
return buffer;
} else
}
else
LockBuffer(buffer, GIST_UNLOCK);
}

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.8 2005/09/22 18:49:45 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.9 2005/09/22 20:44:36 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@ -29,47 +29,60 @@
static bool needFullVacuum = false;
typedef struct {
typedef struct
{
GISTSTATE giststate;
Relation index;
MemoryContext opCtx;
IndexBulkDeleteResult *result;
} GistVacuum;
typedef struct {
typedef struct
{
IndexTuple *itup;
int ituplen;
bool emptypage;
} ArrayTuple;
static ArrayTuple
gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
{
ArrayTuple res = {NULL, 0, false};
Buffer buffer;
Page page;
OffsetNumber i, maxoff;
OffsetNumber i,
maxoff;
ItemId iid;
int lenaddon=4, curlenaddon=0, ntodelete=0;
IndexTuple idxtuple, *addon=NULL;
int lenaddon = 4,
curlenaddon = 0,
ntodelete = 0;
IndexTuple idxtuple,
*addon = NULL;
bool needwrite = false;
OffsetNumber todelete[MaxOffsetNumber];
ItemPointerData *completed = NULL;
int ncompleted=0, lencompleted=16;
int ncompleted = 0,
lencompleted = 16;
buffer = ReadBuffer(gv->index, blkno);
page = (Page) BufferGetPage(buffer);
maxoff = PageGetMaxOffsetNumber(page);
if ( GistPageIsLeaf(page) ) {
if ( GistTuplesDeleted(page) ) {
if (GistPageIsLeaf(page))
{
if (GistTuplesDeleted(page))
{
needunion = needwrite = true;
GistClearTuplesDeleted(page);
}
} else {
}
else
{
completed = (ItemPointerData *) palloc(sizeof(ItemPointerData) * lencompleted);
addon = (IndexTuple *) palloc(sizeof(IndexTuple) * lenaddon);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
ArrayTuple chldtuple;
bool needchildunion;
@ -83,14 +96,18 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
chldtuple = gistVacuumUpdate(gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid)),
needchildunion);
if ( chldtuple.ituplen || chldtuple.emptypage ) {
if (chldtuple.ituplen || chldtuple.emptypage)
{
PageIndexTupleDelete(page, i);
todelete[ntodelete++] = i;
i--; maxoff--;
i--;
maxoff--;
needwrite = needunion = true;
if ( chldtuple.ituplen ) {
while( curlenaddon + chldtuple.ituplen >= lenaddon ) {
if (chldtuple.ituplen)
{
while (curlenaddon + chldtuple.ituplen >= lenaddon)
{
lenaddon *= 2;
addon = (IndexTuple *) repalloc(addon, sizeof(IndexTuple) * lenaddon);
}
@ -99,15 +116,21 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
curlenaddon += chldtuple.ituplen;
if ( chldtuple.ituplen > 1 ) {
/* child was split, so we need to mark the insert (split) as complete */
if (chldtuple.ituplen > 1)
{
/*
* child was split, so we need to mark the insert (split) as
* complete
*/
int j;
while( ncompleted + chldtuple.ituplen > lencompleted ) {
while (ncompleted + chldtuple.ituplen > lencompleted)
{
lencompleted *= 2;
completed = (ItemPointerData *) repalloc(completed, sizeof(ItemPointerData) * lencompleted);
}
for(j=0;j<chldtuple.ituplen;j++) {
for (j = 0; j < chldtuple.ituplen; j++)
{
ItemPointerCopy(&(chldtuple.itup[j]->t_tid), completed + ncompleted);
ncompleted++;
}
@ -117,12 +140,15 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
}
}
if ( curlenaddon ) {
if (curlenaddon)
{
/* insert updated tuples */
if (gistnospace(page, addon, curlenaddon)) {
if (gistnospace(page, addon, curlenaddon))
{
/* there is no space on page to insert tuples */
IndexTuple *vec;
SplitedPageLayout *dist=NULL,*ptr;
SplitedPageLayout *dist = NULL,
*ptr;
int i;
MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
@ -132,16 +158,19 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
MemoryContextSwitchTo(oldCtx);
vec = (IndexTuple *) palloc(sizeof(IndexTuple) * res.ituplen);
for(i=0;i<res.ituplen;i++) {
for (i = 0; i < res.ituplen; i++)
{
vec[i] = (IndexTuple) palloc(IndexTupleSize(res.itup[i]));
memcpy(vec[i], res.itup[i], IndexTupleSize(res.itup[i]));
}
res.itup = vec;
if ( !gv->index->rd_istemp ) {
if (!gv->index->rd_istemp)
{
XLogRecPtr recptr;
XLogRecData *rdata;
ItemPointerData key; /* set key for incomplete insert */
ItemPointerData key; /* set key for incomplete
* insert */
char *xlinfo;
ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
@ -154,7 +183,8 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
ptr = dist;
while(ptr) {
while (ptr)
{
PageSetLSN(BufferGetPage(ptr->buffer), recptr);
PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
ptr = ptr->next;
@ -163,24 +193,30 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
END_CRIT_SECTION();
pfree(xlinfo);
pfree(rdata);
} else {
}
else
{
ptr = dist;
while(ptr) {
while (ptr)
{
PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
ptr = ptr->next;
}
}
ptr = dist;
while(ptr) {
while (ptr)
{
if (BufferGetBlockNumber(ptr->buffer) != blkno)
LockBuffer(ptr->buffer, GIST_UNLOCK);
WriteBuffer(ptr->buffer);
ptr = ptr->next;
}
if ( blkno == GIST_ROOT_BLKNO ) {
ItemPointerData key; /* set key for incomplete insert */
if (blkno == GIST_ROOT_BLKNO)
{
ItemPointerData key; /* set key for incomplete
* insert */
ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
@ -196,27 +232,37 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
MemoryContextReset(gv->opCtx);
needunion = false; /* gistSplit already forms unions */
} else {
}
else
{
/* enough free space */
gistfillbuffer(gv->index, page, addon, curlenaddon, InvalidOffsetNumber);
}
}
}
if ( needunion ) {
if (needunion)
{
/* forms union for page or check empty */
if ( PageIsEmpty(page) ) {
if ( blkno == GIST_ROOT_BLKNO ) {
if (PageIsEmpty(page))
{
if (blkno == GIST_ROOT_BLKNO)
{
needwrite = true;
GistPageSetLeaf(page);
} else {
}
else
{
needwrite = true;
res.emptypage = true;
GistPageSetDeleted(page);
gv->result->pages_deleted++;
}
} else {
IndexTuple *vec, tmp;
}
else
{
IndexTuple *vec,
tmp;
int veclen = 0;
MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
@ -236,8 +282,10 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
}
}
if ( needwrite ) {
if ( !gv->index->rd_istemp ) {
if (needwrite)
{
if (!gv->index->rd_istemp)
{
XLogRecData *rdata;
XLogRecPtr recptr;
char *xlinfo;
@ -254,10 +302,12 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
pfree(xlinfo);
pfree(rdata);
} else
}
else
PageSetLSN(page, XLogRecPtrForTemp);
WriteBuffer(buffer);
} else
}
else
ReleaseBuffer(buffer);
if (ncompleted && !gv->index->rd_istemp)
@ -265,8 +315,10 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
for (i = 0; i < curlenaddon; i++)
pfree(addon[i]);
if (addon) pfree(addon);
if (completed) pfree(completed);
if (addon)
pfree(addon);
if (completed)
pfree(completed);
return res;
}
@ -278,17 +330,23 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
*/
Datum
gistvacuumcleanup(PG_FUNCTION_ARGS) {
gistvacuumcleanup(PG_FUNCTION_ARGS)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
IndexVacuumCleanupInfo *info = (IndexVacuumCleanupInfo *) PG_GETARG_POINTER(1);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(2);
BlockNumber npages, blkno;
BlockNumber nFreePages, *freePages, maxFreePages;
BlockNumber lastBlock = GIST_ROOT_BLKNO, lastFilledBlock = GIST_ROOT_BLKNO;
BlockNumber npages,
blkno;
BlockNumber nFreePages,
*freePages,
maxFreePages;
BlockNumber lastBlock = GIST_ROOT_BLKNO,
lastFilledBlock = GIST_ROOT_BLKNO;
bool needLock;
/* gistVacuumUpdate may cause hard work */
if ( info->vacuum_full ) {
if (info->vacuum_full)
{
GistVacuum gv;
ArrayTuple res;
@ -302,15 +360,18 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) {
/* walk through the entire index for update tuples */
res = gistVacuumUpdate(&gv, GIST_ROOT_BLKNO, false);
/* cleanup */
if (res.itup) {
if (res.itup)
{
int i;
for (i = 0; i < res.ituplen; i++)
pfree(res.itup[i]);
pfree(res.itup);
}
freeGISTstate(&(gv.giststate));
MemoryContextDelete(gv.opCtx);
} else if (needFullVacuum)
}
else if (needFullVacuum)
ereport(NOTICE,
(errmsg("index \"%s\" needs VACUUM FULL or REINDEX to finish crash recovery",
RelationGetRelationName(rel))));
@ -334,29 +395,36 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) {
nFreePages = 0;
freePages = (BlockNumber *) palloc(sizeof(BlockNumber) * maxFreePages);
for(blkno=GIST_ROOT_BLKNO+1;blkno<npages;blkno++) {
for (blkno = GIST_ROOT_BLKNO + 1; blkno < npages; blkno++)
{
Buffer buffer = ReadBuffer(rel, blkno);
Page page;
LockBuffer(buffer, GIST_SHARE);
page = (Page) BufferGetPage(buffer);
if ( GistPageIsDeleted(page) ) {
if (nFreePages < maxFreePages) {
if (GistPageIsDeleted(page))
{
if (nFreePages < maxFreePages)
{
freePages[nFreePages] = blkno;
nFreePages++;
}
} else
}
else
lastFilledBlock = blkno;
LockBuffer(buffer, GIST_UNLOCK);
ReleaseBuffer(buffer);
}
lastBlock = npages - 1;
if ( info->vacuum_full && nFreePages>0 ) { /* try to truncate index */
if (info->vacuum_full && nFreePages > 0)
{ /* try to truncate index */
int i;
for (i = 0; i < nFreePages; i++)
if ( freePages[i] >= lastFilledBlock ) {
if (freePages[i] >= lastFilledBlock)
{
nFreePages = i;
break;
}
@ -383,22 +451,26 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) {
PG_RETURN_POINTER(stats);
}
typedef struct GistBDItem {
typedef struct GistBDItem
{
GistNSN parentlsn;
BlockNumber blkno;
struct GistBDItem *next;
} GistBDItem;
static void
pushStackIfSplited(Page page, GistBDItem *stack) {
pushStackIfSplited(Page page, GistBDItem *stack)
{
GISTPageOpaque opaque = GistPageGetOpaque(page);
if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) &&
XLByteLT(stack->parentlsn, opaque->nsn) &&
opaque->rightlink != InvalidBlockNumber /* sanity check */ ) {
opaque->rightlink != InvalidBlockNumber /* sanity check */ )
{
/* split page detected, install right link to the stack */
GistBDItem *ptr = (GistBDItem *) palloc(sizeof(GistBDItem));
ptr->blkno = opaque->rightlink;
ptr->parentlsn = stack->parentlsn;
ptr->next = stack->next;
@ -416,12 +488,14 @@ pushStackIfSplited(Page page, GistBDItem *stack) {
* Result: a palloc'd struct containing statistical info for VACUUM displays.
*/
Datum
gistbulkdelete(PG_FUNCTION_ARGS) {
gistbulkdelete(PG_FUNCTION_ARGS)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
void *callback_state = (void *) PG_GETARG_POINTER(2);
IndexBulkDeleteResult *result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
GistBDItem *stack, *ptr;
GistBDItem *stack,
*ptr;
bool needLock;
stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
@ -429,17 +503,20 @@ gistbulkdelete(PG_FUNCTION_ARGS) {
stack->blkno = GIST_ROOT_BLKNO;
needFullVacuum = false;
while( stack ) {
while (stack)
{
Buffer buffer = ReadBuffer(rel, stack->blkno);
Page page;
OffsetNumber i, maxoff;
OffsetNumber i,
maxoff;
IndexTuple idxtuple;
ItemId iid;
LockBuffer(buffer, GIST_SHARE);
page = (Page) BufferGetPage(buffer);
if ( GistPageIsLeaf(page) ) {
if (GistPageIsLeaf(page))
{
OffsetNumber todelete[MaxOffsetNumber];
int ntodelete = 0;
@ -447,7 +524,8 @@ gistbulkdelete(PG_FUNCTION_ARGS) {
LockBuffer(buffer, GIST_EXCLUSIVE);
page = (Page) BufferGetPage(buffer);
if ( stack->blkno==GIST_ROOT_BLKNO && !GistPageIsLeaf(page) ) {
if (stack->blkno == GIST_ROOT_BLKNO && !GistPageIsLeaf(page))
{
/* only the root can become non-leaf during relock */
LockBuffer(buffer, GIST_UNLOCK);
ReleaseBuffer(buffer);
@ -455,30 +533,39 @@ gistbulkdelete(PG_FUNCTION_ARGS) {
continue;
}
/* check for a split that happened after we looked at the parent;
we should check for it after relocking */
/*
* check for a split that happened after we looked at the parent; we
* should check for it after relocking
*/
pushStackIfSplited(page, stack);
maxoff = PageGetMaxOffsetNumber(page);
for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) {
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
iid = PageGetItemId(page, i);
idxtuple = (IndexTuple) PageGetItem(page, iid);
if ( callback(&(idxtuple->t_tid), callback_state) ) {
if (callback(&(idxtuple->t_tid), callback_state))
{
PageIndexTupleDelete(page, i);
todelete[ntodelete] = i;
i--; maxoff--; ntodelete++;
i--;
maxoff--;
ntodelete++;
result->tuples_removed += 1;
Assert(maxoff == PageGetMaxOffsetNumber(page));
} else
}
else
result->num_index_tuples += 1;
}
if ( ntodelete ) {
if (ntodelete)
{
GistMarkTuplesDeleted(page);
if (!rel->rd_istemp ) {
if (!rel->rd_istemp)
{
XLogRecData *rdata;
XLogRecPtr recptr;
gistxlogEntryUpdate *xlinfo;
@ -495,17 +582,21 @@ gistbulkdelete(PG_FUNCTION_ARGS) {
pfree(xlinfo);
pfree(rdata);
} else
}
else
PageSetLSN(page, XLogRecPtrForTemp);
WriteNoReleaseBuffer(buffer);
}
} else {
}
else
{
/* check for a split that happened after we looked at the parent */
pushStackIfSplited(page, stack);
maxoff = PageGetMaxOffsetNumber(page);
for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) {
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
iid = PageGetItemId(page, i);
idxtuple = (IndexTuple) PageGetItem(page, iid);
@ -541,4 +632,3 @@ gistbulkdelete(PG_FUNCTION_ARGS) {
PG_RETURN_POINTER(result);
}

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.8 2005/09/22 18:49:45 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.9 2005/09/22 20:44:36 momjian Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
@ -23,26 +23,30 @@
#include "utils/memutils.h"
typedef struct {
typedef struct
{
gistxlogEntryUpdate *data;
int len;
IndexTuple *itup;
OffsetNumber *todelete;
} EntryUpdateRecord;
typedef struct {
typedef struct
{
gistxlogPage *header;
IndexTuple *itup;
} NewPage;
typedef struct {
typedef struct
{
gistxlogPageSplit *data;
NewPage *page;
} PageSplitRecord;
/* track for incomplete inserts, idea was taken from nbtxlog.c */
typedef struct gistIncompleteInsert {
typedef struct gistIncompleteInsert
{
RelFileNode node;
BlockNumber origblkno; /* for splits */
ItemPointerData key;
@ -68,7 +72,8 @@ static List *incomplete_inserts;
static void
pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
BlockNumber *blkno, int lenblk,
PageSplitRecord *xlinfo /* to extract blkno info */ ) {
PageSplitRecord *xlinfo /* to extract blkno info */ )
{
MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
gistIncompleteInsert *ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
@ -76,12 +81,15 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
ninsert->key = key;
ninsert->lsn = lsn;
if ( lenblk && blkno ) {
if (lenblk && blkno)
{
ninsert->lenblk = lenblk;
ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk);
ninsert->origblkno = *blkno;
} else {
}
else
{
int i;
Assert(xlinfo);
@ -98,13 +106,16 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
}
static void
forgetIncompleteInsert(RelFileNode node, ItemPointerData key) {
forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
{
ListCell *l;
foreach(l, incomplete_inserts) {
foreach(l, incomplete_inserts)
{
gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
if ( RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) {
if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
{
/* found */
pfree(insert->blkno);
@ -116,21 +127,27 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key) {
}
static void
decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
char *begin = XLogRecGetData(record), *ptr;
int i=0, addpath=0;
decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record)
{
char *begin = XLogRecGetData(record),
*ptr;
int i = 0,
addpath = 0;
decoded->data = (gistxlogEntryUpdate *) begin;
if ( decoded->data->ntodelete ) {
if (decoded->data->ntodelete)
{
decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogEntryUpdate) + addpath);
addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
} else
}
else
decoded->todelete = NULL;
decoded->len = 0;
ptr = begin + sizeof(gistxlogEntryUpdate) + addpath;
while( ptr - begin < record->xl_len ) {
while (ptr - begin < record->xl_len)
{
decoded->len++;
ptr += IndexTupleSize((IndexTuple) ptr);
}
@ -138,7 +155,8 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);
ptr = begin + sizeof(gistxlogEntryUpdate) + addpath;
while( ptr - begin < record->xl_len ) {
while (ptr - begin < record->xl_len)
{
decoded->itup[i] = (IndexTuple) ptr;
ptr += IndexTupleSize(decoded->itup[i]);
i++;
@ -149,7 +167,8 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
* redo any page update (except page split)
*/
static void
gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
{
EntryUpdateRecord xlrec;
Relation reln;
Buffer buffer;
@ -165,23 +184,29 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
elog(PANIC, "block %u unfound", xlrec.data->blkno);
page = (Page) BufferGetPage(buffer);
if ( isnewroot ) {
if ( !PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)) ) {
if (isnewroot)
{
if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
} else {
}
else
{
if (PageIsNew((PageHeader) page))
elog(PANIC, "uninitialized page %u", xlrec.data->blkno);
if (XLByteLE(lsn, PageGetLSN(page))) {
if (XLByteLE(lsn, PageGetLSN(page)))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
}
if ( xlrec.data->isemptypage ) {
if (xlrec.data->isemptypage)
{
while (!PageIsEmpty(page))
PageIndexTupleDelete(page, FirstOffsetNumber);
@ -189,11 +214,15 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
GistPageSetLeaf(page);
else
GistPageSetDeleted(page);
} else {
}
else
{
if (isnewroot)
GISTInitBuffer(buffer, 0);
else if ( xlrec.data->ntodelete ) {
else if (xlrec.data->ntodelete)
{
int i;
for (i = 0; i < xlrec.data->ntodelete; i++)
PageIndexTupleDelete(page, xlrec.todelete[i]);
if (GistPageIsLeaf(page))
@ -204,8 +233,10 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
if (xlrec.len > 0)
gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
/* special case: leafpage, nothing to insert, nothing to delete, then
vacuum marks page */
/*
* special case: leafpage, nothing to insert, nothing to delete, then
* vacuum marks page
*/
if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
GistClearTuplesDeleted(page);
}
@ -216,7 +247,8 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
if (ItemPointerIsValid(&(xlrec.data->key)))
{
if (incomplete_inserts != NIL)
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
@ -228,15 +260,19 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
}
static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
char *begin = XLogRecGetData(record), *ptr;
int j,i=0;
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
{
char *begin = XLogRecGetData(record),
*ptr;
int j,
i = 0;
decoded->data = (gistxlogPageSplit *) begin;
decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
ptr = begin + sizeof(gistxlogPageSplit);
for(i=0;i<decoded->data->npage;i++) {
for (i = 0; i < decoded->data->npage; i++)
{
Assert(ptr - begin < record->xl_len);
decoded->page[i].header = (gistxlogPage *) ptr;
ptr += sizeof(gistxlogPage);
@ -244,7 +280,8 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
decoded->page[i].itup = (IndexTuple *)
palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
j = 0;
while(j<decoded->page[i].header->num) {
while (j < decoded->page[i].header->num)
{
Assert(ptr - begin < record->xl_len);
decoded->page[i].itup[j] = (IndexTuple) ptr;
ptr += IndexTupleSize((IndexTuple) ptr);
@ -254,7 +291,8 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
}
static void
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
{
PageSplitRecord xlrec;
Relation reln;
Buffer buffer;
@ -280,7 +318,8 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
ReleaseBuffer(buffer);
/* loop around all pages */
for(i=0;i<xlrec.data->npage;i++) {
for (i = 0; i < xlrec.data->npage; i++)
{
NewPage *newpage = xlrec.page + i;
bool isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false;
@ -289,7 +328,8 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
elog(PANIC, "block %u unfound", newpage->header->blkno);
page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page))) {
if (XLByteLE(lsn, PageGetLSN(page)))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
continue;
@ -307,7 +347,8 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
WriteBuffer(buffer);
}
if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
if (ItemPointerIsValid(&(xlrec.data->key)))
{
if (incomplete_inserts != NIL)
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
@ -318,7 +359,8 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
}
static void
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
{
RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
Relation reln;
Buffer buffer;
@ -332,7 +374,8 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
elog(PANIC, "root block unfound");
page = (Page) BufferGetPage(buffer);
if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page))) {
if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
@ -347,14 +390,17 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
}
static void
gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) {
char *begin = XLogRecGetData(record), *ptr;
gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record)
{
char *begin = XLogRecGetData(record),
*ptr;
gistxlogInsertComplete *xlrec;
xlrec = (gistxlogInsertComplete *) begin;
ptr = begin + sizeof(gistxlogInsertComplete);
while( ptr - begin < record->xl_len ) {
while (ptr - begin < record->xl_len)
{
Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData));
forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr));
ptr += sizeof(ItemPointerData);
@ -367,8 +413,10 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
uint8 info = record->xl_info & ~XLR_INFO_MASK;
MemoryContext oldCxt;
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info) {
switch (info)
{
case XLOG_GIST_ENTRY_UPDATE:
case XLOG_GIST_ENTRY_DELETE:
gistRedoEntryUpdateRecord(lsn, record, false);
@ -403,14 +451,16 @@ out_target(char *buf, RelFileNode node, ItemPointerData key)
}
static void
out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) {
out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec)
{
out_target(buf, xlrec->node, xlrec->key);
sprintf(buf + strlen(buf), "; block number %u",
xlrec->blkno);
}
static void
out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) {
out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec)
{
strcat(buf, "page_split: ");
out_target(buf, xlrec->node, xlrec->key);
sprintf(buf + strlen(buf), "; block number %u splits to %d pages",
@ -422,7 +472,8 @@ gist_desc(char *buf, uint8 xl_info, char *rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
switch (info) {
switch (info)
{
case XLOG_GIST_ENTRY_UPDATE:
strcat(buf, "entry_update: ");
out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec);
@ -456,9 +507,12 @@ gist_desc(char *buf, uint8 xl_info, char *rec)
}
IndexTuple
gist_form_invalid_tuple(BlockNumber blkno) {
/* we don't alloc space for null's bitmap, this is invalid tuple,
be careful in read and write code */
gist_form_invalid_tuple(BlockNumber blkno)
{
/*
* we don't alloc space for null's bitmap, this is invalid tuple, be
* careful in read and write code
*/
Size size = IndexInfoFindDataOffset(0);
IndexTuple tuple = (IndexTuple) palloc0(size);
@ -471,8 +525,10 @@ gist_form_invalid_tuple(BlockNumber blkno) {
}
static Buffer
gistXLogReadAndLockBuffer( Relation r, BlockNumber blkno ) {
gistXLogReadAndLockBuffer(Relation r, BlockNumber blkno)
{
Buffer buffer = XLogReadBuffer(false, r, blkno);
if (!BufferIsValid(buffer))
elog(PANIC, "block %u unfound", blkno);
if (PageIsNew((PageHeader) (BufferGetPage(buffer))))
@ -483,16 +539,20 @@ gistXLogReadAndLockBuffer( Relation r, BlockNumber blkno ) {
static void
gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) {
gixtxlogFindPath(Relation index, gistIncompleteInsert *insert)
{
GISTInsertStack *top;
insert->pathlen = 0;
insert->path = NULL;
if ( (top=gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL ) {
if ((top = gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL)
{
int i;
GISTInsertStack *ptr = top;
while(ptr) {
while (ptr)
{
insert->pathlen++;
ptr = ptr->parent;
}
@ -501,12 +561,14 @@ gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) {
i = 0;
ptr = top;
while(ptr) {
while (ptr)
{
insert->path[i] = ptr->blkno;
i++;
ptr = ptr->parent;
}
} else
}
else
elog(LOG, "lost parent for block %u", insert->origblkno);
}
@ -520,27 +582,34 @@ gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) {
* less than the stored lsn then the changes in the parent have not been done yet.
*/
static void
gistContinueInsert(gistIncompleteInsert *insert) {
gistContinueInsert(gistIncompleteInsert *insert)
{
IndexTuple *itup;
int i, lenitup;
int i,
lenitup;
Relation index;
index = XLogOpenRelation(insert->node);
if (!RelationIsValid(index))
return;
/* needed vector itup never will be more than initial lenblkno+2,
because during this processing Indextuple can be only smaller */
/*
* needed vector itup never will be more than initial lenblkno+2, because
* during this processing Indextuple can be only smaller
*/
lenitup = insert->lenblk;
itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ ));
for (i = 0; i < insert->lenblk; i++)
itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
if ( insert->origblkno==GIST_ROOT_BLKNO ) {
/* the root was split, so we only need to make a new root.
it can't be a simple insert into the root; see the call to
pushIncompleteInsert in gistRedoPageSplitRecord */
if (insert->origblkno == GIST_ROOT_BLKNO)
{
/*
* the root was split, so we only need to make a new root. It can't be a
* simple insert into the root; see the call to pushIncompleteInsert in
* gistRedoPageSplitRecord
*/
Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
Page page;
@ -548,7 +617,8 @@ gistContinueInsert(gistIncompleteInsert *insert) {
elog(PANIC, "root block unfound");
page = BufferGetPage(buffer);
if (XLByteLE(insert->lsn, PageGetLSN(page))) {
if (XLByteLE(insert->lsn, PageGetLSN(page)))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
@ -561,7 +631,9 @@ gistContinueInsert(gistIncompleteInsert *insert) {
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
} else {
}
else
{
Buffer *buffers;
Page *pages;
int numbuffer;
@ -574,8 +646,12 @@ gistContinueInsert(gistIncompleteInsert *insert) {
buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
for(i=0;i<insert->pathlen;i++) {
int j, k, pituplen=0, childfound=0;
for (i = 0; i < insert->pathlen; i++)
{
int j,
k,
pituplen = 0,
childfound = 0;
numbuffer = 1;
buffers[numbuffer - 1] = XLogReadBuffer(false, index, insert->path[i]);
@ -585,7 +661,8 @@ gistContinueInsert(gistIncompleteInsert *insert) {
if (PageIsNew((PageHeader) (pages[numbuffer - 1])))
elog(PANIC, "uninitialized page %u", insert->path[i]);
if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer-1]))) {
if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1])))
{
LockBuffer(buffers[numbuffer - 1], BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffers[numbuffer - 1]);
return;
@ -594,7 +671,8 @@ gistContinueInsert(gistIncompleteInsert *insert) {
pituplen = PageGetMaxOffsetNumber(pages[numbuffer - 1]);
/* remove old IndexTuples */
for(j=0;j<pituplen && childfound<lenitup;j++) {
for (j = 0; j < pituplen && childfound < lenitup; j++)
{
BlockNumber blkno;
ItemId iid = PageGetItemId(pages[numbuffer - 1], j + FirstOffsetNumber);
IndexTuple idxtup = (IndexTuple) PageGetItem(pages[numbuffer - 1], iid);
@ -602,15 +680,18 @@ gistContinueInsert(gistIncompleteInsert *insert) {
blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
for (k = 0; k < lenitup; k++)
if ( ItemPointerGetBlockNumber( &(itup[k]->t_tid) ) == blkno ) {
if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
{
PageIndexTupleDelete(pages[numbuffer - 1], j + FirstOffsetNumber);
j--; pituplen--;
j--;
pituplen--;
childfound++;
break;
}
}
if ( gistnospace(pages[numbuffer-1], itup, lenitup) ) {
if (gistnospace(pages[numbuffer - 1], itup, lenitup))
{
/* no space left on page, so we should split */
buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
if (!BufferIsValid(buffers[numbuffer]))
@ -620,10 +701,14 @@ gistContinueInsert(gistIncompleteInsert *insert) {
gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber);
numbuffer++;
if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) {
if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
{
IndexTuple *parentitup;
/* we split root, just copy tuples from old root to new page */
/*
* we split root, just copy tuples from old root to new
* page
*/
parentitup = gistextractbuffer(buffers[numbuffer - 1], &pituplen);
/* sanity check */
@ -642,8 +727,10 @@ gistContinueInsert(gistIncompleteInsert *insert) {
/* fill root page */
GISTInitBuffer(buffers[0], 0);
for(j=1;j<numbuffer;j++) {
for (j = 1; j < numbuffer; j++)
{
IndexTuple tuple = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
if (PageAddItem(pages[0],
(Item) tuple,
IndexTupleSize(tuple),
@ -653,11 +740,13 @@ gistContinueInsert(gistIncompleteInsert *insert) {
RelationGetRelationName(index));
}
}
} else
}
else
gistfillbuffer(index, pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber);
lenitup = numbuffer;
for(j=0;j<numbuffer;j++) {
for (j = 0; j < numbuffer; j++)
{
itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
PageSetLSN(pages[j], insert->lsn);
PageSetTLI(pages[j], ThisTimeLineID);
@ -675,7 +764,8 @@ gistContinueInsert(gistIncompleteInsert *insert) {
}
void
gist_xlog_startup(void) {
gist_xlog_startup(void)
{
incomplete_inserts = NIL;
insertCtx = AllocSetContextCreate(CurrentMemoryContext,
"GiST recovery temporary context",
@ -686,7 +776,8 @@ gist_xlog_startup(void) {
}
void
gist_xlog_cleanup(void) {
gist_xlog_cleanup(void)
{
ListCell *l;
List *reverse = NIL;
MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
@ -697,8 +788,10 @@ gist_xlog_cleanup(void) {
reverse = lappend(reverse, lfirst(l));
MemoryContextSwitchTo(opCtx);
foreach(l, reverse) {
foreach(l, reverse)
{
gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
gistContinueInsert(insert);
MemoryContextReset(opCtx);
}
@ -711,15 +804,18 @@ gist_xlog_cleanup(void) {
XLogRecData *
formSplitRdata(RelFileNode node, BlockNumber blkno,
ItemPointer key, SplitedPageLayout *dist ) {
ItemPointer key, SplitedPageLayout *dist)
{
XLogRecData *rdata;
gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
SplitedPageLayout *ptr;
int npage = 0, cur=1;
int npage = 0,
cur = 1;
ptr = dist;
while( ptr ) {
while (ptr)
{
npage++;
ptr = ptr->next;
}
@ -740,7 +836,8 @@ formSplitRdata(RelFileNode node, BlockNumber blkno,
rdata[0].next = NULL;
ptr = dist;
while(ptr) {
while (ptr)
{
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char *) &(ptr->block);
rdata[cur].len = sizeof(gistxlogPage);
@ -763,7 +860,8 @@ formSplitRdata(RelFileNode node, BlockNumber blkno,
XLogRecData *
formUpdateRdata(RelFileNode node, BlockNumber blkno,
OffsetNumber *todelete, int ntodelete, bool emptypage,
IndexTuple *itup, int ituplen, ItemPointer key ) {
IndexTuple *itup, int ituplen, ItemPointer key)
{
XLogRecData *rdata;
gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate *) palloc(sizeof(gistxlogEntryUpdate));
@ -774,7 +872,8 @@ formUpdateRdata(RelFileNode node, BlockNumber blkno,
else
ItemPointerSetInvalid(&(xlrec->key));
if ( emptypage ) {
if (emptypage)
{
xlrec->isemptypage = true;
xlrec->ntodelete = 0;
@ -783,8 +882,11 @@ formUpdateRdata(RelFileNode node, BlockNumber blkno,
rdata->data = (char *) xlrec;
rdata->len = sizeof(gistxlogEntryUpdate);
rdata->next = NULL;
} else {
int cur=1,i;
}
else
{
int cur = 1,
i;
xlrec->isemptypage = false;
xlrec->ntodelete = ntodelete;
@ -796,7 +898,8 @@ formUpdateRdata(RelFileNode node, BlockNumber blkno,
rdata->len = sizeof(gistxlogEntryUpdate);
rdata->next = NULL;
if ( ntodelete ) {
if (ntodelete)
{
rdata[cur - 1].next = &(rdata[cur]);
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char *) todelete;
@ -806,7 +909,8 @@ formUpdateRdata(RelFileNode node, BlockNumber blkno,
}
/* new tuples */
for(i=0;i<ituplen;i++) {
for (i = 0; i < ituplen; i++)
{
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char *) (itup[i]);
rdata[cur].len = IndexTupleSize(itup[i]);
@ -820,7 +924,8 @@ formUpdateRdata(RelFileNode node, BlockNumber blkno,
}
XLogRecPtr
gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) {
gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len)
{
gistxlogInsertComplete xlrec;
XLogRecData rdata[2];
XLogRecPtr recptr;