From 474774918b4b55e774d2fcc1d7e94c8c632fadef Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Tue, 17 Jul 2007 05:02:03 +0000 Subject: [PATCH] Implement CREATE TABLE LIKE ... INCLUDING INDEXES. Patch from NikhilS, based in part on an earlier patch from Trevor Hardcastle, and reviewed by myself. --- doc/src/sgml/ref/create_table.sgml | 13 +- src/backend/bootstrap/bootparse.y | 6 +- src/backend/commands/indexcmds.c | 17 +- src/backend/commands/tablecmds.c | 3 +- src/backend/nodes/copyfuncs.c | 3 +- src/backend/nodes/equalfuncs.c | 3 +- src/backend/nodes/outfuncs.c | 3 +- src/backend/parser/parse_utilcmd.c | 640 +++++++++++++++++++------- src/backend/tcop/utility.c | 3 +- src/backend/utils/adt/ruleutils.c | 36 +- src/include/commands/defrem.h | 3 +- src/include/nodes/parsenodes.h | 3 +- src/include/utils/builtins.h | 4 +- src/test/regress/expected/inherit.out | 20 + src/test/regress/sql/inherit.sql | 15 + 15 files changed, 574 insertions(+), 198 deletions(-) diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml index 064769cee0a..68e8f045e6a 100644 --- a/doc/src/sgml/ref/create_table.sgml +++ b/doc/src/sgml/ref/create_table.sgml @@ -1,5 +1,5 @@ @@ -23,7 +23,7 @@ PostgreSQL documentation CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } ] TABLE table_name ( [ { column_name data_type [ DEFAULT default_expr ] [ column_constraint [ ... ] ] | table_constraint - | LIKE parent_table [ { INCLUDING | EXCLUDING } { DEFAULTS | CONSTRAINTS } ] ... } + | LIKE parent_table [ { INCLUDING | EXCLUDING } { DEFAULTS | CONSTRAINTS | INDEXES } ] ... } [, ... ] ] ) [ INHERITS ( parent_table [, ... ] ) ] @@ -237,7 +237,7 @@ and table_constraint is: - LIKE parent_table [ { INCLUDING | EXCLUDING } { DEFAULTS | CONSTRAINTS } ] + LIKE parent_table [ { INCLUDING | EXCLUDING } { DEFAULTS | CONSTRAINTS | INDEXES } ] The LIKE clause specifies a table from which @@ -265,11 +265,16 @@ and table_constraint is: column constraints and table constraints — when constraints are requested, all check constraints are copied. + + Any indexes on the original table will not be created on the new + table, unless the INCLUDING INDEXES clause is + specified. + Note also that unlike INHERITS, copied columns and constraints are not merged with similarly named columns and constraints. If the same name is specified explicitly or in another - LIKE clause an error is signalled. + LIKE clause, an error is signalled. diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y index ff2f7f70c3c..3fd29b7e971 100644 --- a/src/backend/bootstrap/bootparse.y +++ b/src/backend/bootstrap/bootparse.y @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/bootstrap/bootparse.y,v 1.88 2007/03/13 00:33:39 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/bootstrap/bootparse.y,v 1.89 2007/07/17 05:02:00 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -252,7 +252,7 @@ Boot_DeclareIndexStmt: LexIDStr($8), NULL, $10, - NULL, NIL, + NULL, NIL, NULL, false, false, false, false, false, true, false, false); do_end(); @@ -270,7 +270,7 @@ Boot_DeclareUniqueIndexStmt: LexIDStr($9), NULL, $11, - NULL, NIL, + NULL, NIL, NULL, true, false, false, false, false, true, false, false); do_end(); diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 98dad737133..943662f8f8b 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.160 2007/06/23 22:12:50 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.161 2007/07/17 05:02:00 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -79,6 +79,8 @@ static bool relationHasPrimaryKey(Relation rel); * to index on. * 'predicate': the partial-index condition, or NULL if none. * 'options': reloptions from WITH (in list-of-DefElem form). + * 'src_options': reloptions from the source index, if this is a cloned + * index produced by CREATE TABLE LIKE ... INCLUDING INDEXES * 'unique': make the index enforce uniqueness. * 'primary': mark the index as a primary key in the catalogs. * 'isconstraint': index is for a PRIMARY KEY or UNIQUE constraint, @@ -100,6 +102,7 @@ DefineIndex(RangeVar *heapRelation, List *attributeList, Expr *predicate, List *options, + char *src_options, bool unique, bool primary, bool isconstraint, @@ -392,9 +395,17 @@ DefineIndex(RangeVar *heapRelation, } /* - * Parse AM-specific options, convert to text array form, validate. + * Parse AM-specific options, convert to text array form, + * validate. The src_options introduced due to using indexes + * via the "CREATE LIKE INCLUDING INDEXES" statement also need to + * be merged here */ - reloptions = transformRelOptions((Datum) 0, options, false, false); + if (src_options) + reloptions = unflatten_reloptions(src_options); + else + reloptions = (Datum) 0; + + reloptions = transformRelOptions(reloptions, options, false, false); (void) index_reloptions(amoptions, reloptions, true); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 4bc2a25fcdd..07e56620428 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.229 2007/07/03 01:30:36 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.230 2007/07/17 05:02:00 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -3794,6 +3794,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, stmt->indexParams, /* parameters */ (Expr *) stmt->whereClause, stmt->options, + stmt->src_options, stmt->unique, stmt->primary, stmt->isconstraint, diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index ec3d6168897..3bc6afe1df8 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -15,7 +15,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.379 2007/06/11 22:22:40 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.380 2007/07/17 05:02:01 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -2192,6 +2192,7 @@ _copyIndexStmt(IndexStmt *from) COPY_STRING_FIELD(tableSpace); COPY_NODE_FIELD(indexParams); COPY_NODE_FIELD(options); + COPY_STRING_FIELD(src_options); COPY_NODE_FIELD(whereClause); COPY_SCALAR_FIELD(unique); COPY_SCALAR_FIELD(primary); diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 114550f17da..317a5a29959 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -18,7 +18,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.310 2007/06/11 22:22:40 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.311 2007/07/17 05:02:01 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -1044,6 +1044,7 @@ _equalIndexStmt(IndexStmt *a, IndexStmt *b) COMPARE_STRING_FIELD(tableSpace); COMPARE_NODE_FIELD(indexParams); COMPARE_NODE_FIELD(options); + COMPARE_STRING_FIELD(src_options); COMPARE_NODE_FIELD(whereClause); COMPARE_SCALAR_FIELD(unique); COMPARE_SCALAR_FIELD(primary); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index d5d81eaae59..2d2b229c9e8 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.312 2007/07/17 01:21:43 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.313 2007/07/17 05:02:01 neilc Exp $ * * NOTES * Every node type that can appear in stored rules' parsetrees *must* @@ -1541,6 +1541,7 @@ _outIndexStmt(StringInfo str, IndexStmt *node) WRITE_STRING_FIELD(tableSpace); WRITE_NODE_FIELD(indexParams); WRITE_NODE_FIELD(options); + WRITE_STRING_FIELD(src_options); WRITE_NODE_FIELD(whereClause); WRITE_BOOL_FIELD(unique); WRITE_BOOL_FIELD(primary); diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index 37822251943..f17ad480212 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -19,20 +19,23 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.1 2007/06/23 22:12:51 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.2 2007/07/17 05:02:02 neilc Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" +#include "access/genam.h" #include "access/heapam.h" #include "catalog/heap.h" #include "catalog/index.h" #include "catalog/namespace.h" +#include "catalog/pg_opclass.h" #include "catalog/pg_type.h" #include "commands/defrem.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "optimizer/clauses.h" @@ -47,6 +50,7 @@ #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "utils/relcache.h" #include "utils/syscache.h" @@ -63,6 +67,7 @@ typedef struct List *ckconstraints; /* CHECK constraints */ List *fkconstraints; /* FOREIGN KEY constraints */ List *ixconstraints; /* index-creating constraints */ + List *inh_indexes; /* cloned indexes from INCLUDING INDEXES */ List *blist; /* "before list" of things to do before * creating the table */ List *alist; /* "after list" of things to do after creating @@ -93,8 +98,13 @@ static void transformTableConstraint(ParseState *pstate, Constraint *constraint); static void transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, InhRelation *inhrelation); +static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt, + Relation parent_index, AttrNumber *attmap); +static List *get_opclass(Oid opclass, Oid actual_datatype); static void transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt); +static IndexStmt *transformIndexConstraint(Constraint *constraint, + CreateStmtContext *cxt); static void transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt, bool skipValidation, @@ -146,6 +156,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) cxt.ckconstraints = NIL; cxt.fkconstraints = NIL; cxt.ixconstraints = NIL; + cxt.inh_indexes = NIL; cxt.blist = NIL; cxt.alist = NIL; cxt.pkey = NULL; @@ -555,11 +566,6 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, } } - if (including_indexes) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("LIKE INCLUDING INDEXES is not implemented"))); - /* * Insert the copied attributes into the cxt for the new table * definition. @@ -657,6 +663,35 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, } } + if (including_indexes && relation->rd_rel->relhasindex) + { + AttrNumber *attmap; + List *parent_indexes; + ListCell *l; + + attmap = varattnos_map_schema(tupleDesc, cxt->columns); + parent_indexes = RelationGetIndexList(relation); + + foreach(l, parent_indexes) + { + Oid parent_index_oid = lfirst_oid(l); + Relation parent_index; + IndexStmt *index_stmt; + + parent_index = index_open(parent_index_oid, AccessShareLock); + + /* Build CREATE INDEX statement to recreate the parent_index */ + index_stmt = generateClonedIndexStmt(cxt, parent_index, + attmap); + + /* Add the new IndexStmt to the create context */ + cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt); + + /* Keep our lock on the index till xact commit */ + index_close(parent_index, NoLock); + } + } + /* * Close the parent rel, but keep our AccessShareLock on it until xact * commit. That will prevent someone else from deleting or ALTERing the @@ -665,189 +700,255 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, heap_close(relation, NoLock); } +/* + * Generate an IndexStmt entry using information from an already + * existing index "source_idx". + * + * Note: Much of this functionality is cribbed from pg_get_indexdef. + */ +static IndexStmt * +generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, + AttrNumber *attmap) +{ + HeapTuple ht_idx; + HeapTuple ht_idxrel; + HeapTuple ht_am; + Form_pg_index idxrec; + Form_pg_class idxrelrec; + Form_pg_am amrec; + List *indexprs = NIL; + ListCell *indexpr_item; + Oid indrelid; + Oid source_relid; + int keyno; + Oid keycoltype; + Datum indclassDatum; + Datum indoptionDatum; + bool isnull; + oidvector *indclass; + int2vector *indoption; + IndexStmt *index; + Datum reloptions; + + source_relid = RelationGetRelid(source_idx); + + /* Fetch pg_index tuple for source index */ + ht_idx = SearchSysCache(INDEXRELID, + ObjectIdGetDatum(source_relid), + 0, 0, 0); + if (!HeapTupleIsValid(ht_idx)) + elog(ERROR, "cache lookup failed for index %u", source_relid); + idxrec = (Form_pg_index) GETSTRUCT(ht_idx); + + Assert(source_relid == idxrec->indexrelid); + indrelid = idxrec->indrelid; + + index = makeNode(IndexStmt); + index->unique = idxrec->indisunique; + index->concurrent = false; + index->primary = idxrec->indisprimary; + index->relation = cxt->relation; + index->isconstraint = false; + + /* + * We don't try to preserve the name of the source index; instead, just + * let DefineIndex() choose a reasonable name. + */ + index->idxname = NULL; + + /* Must get indclass and indoption the hard way */ + indclassDatum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indclass, &isnull); + Assert(!isnull); + indclass = (oidvector *) DatumGetPointer(indclassDatum); + indoptionDatum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indoption, &isnull); + Assert(!isnull); + indoption = (int2vector *) DatumGetPointer(indoptionDatum); + + /* Fetch pg_class tuple of source index */ + ht_idxrel = SearchSysCache(RELOID, + ObjectIdGetDatum(source_relid), + 0, 0, 0); + if (!HeapTupleIsValid(ht_idxrel)) + elog(ERROR, "cache lookup failed for relation %u", source_relid); + + /* + * Store the reloptions for later use by this new index + */ + reloptions = SysCacheGetAttr(RELOID, ht_idxrel, + Anum_pg_class_reloptions, &isnull); + if (!isnull) + index->src_options = flatten_reloptions(source_relid); + + idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel); + + /* Fetch pg_am tuple for the index's access method */ + ht_am = SearchSysCache(AMOID, + ObjectIdGetDatum(idxrelrec->relam), + 0, 0, 0); + if (!HeapTupleIsValid(ht_am)) + elog(ERROR, "cache lookup failed for access method %u", + idxrelrec->relam); + amrec = (Form_pg_am) GETSTRUCT(ht_am); + index->accessMethod = pstrdup(NameStr(amrec->amname)); + + /* Get the index expressions, if any */ + if (!heap_attisnull(ht_idx, Anum_pg_index_indexprs)) + { + Datum exprsDatum; + bool isnull; + char *exprsString; + + exprsDatum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indexprs, &isnull); + exprsString = DatumGetCString(DirectFunctionCall1(textout, + exprsDatum)); + Assert(!isnull); + indexprs = (List *) stringToNode(exprsString); + } + + indexpr_item = list_head(indexprs); + + for (keyno = 0; keyno < idxrec->indnatts; keyno++) + { + IndexElem *iparam; + AttrNumber attnum = idxrec->indkey.values[keyno]; + int16 opt = indoption->values[keyno]; + + iparam = makeNode(IndexElem); + + if (AttributeNumberIsValid(attnum)) + { + /* Simple index column */ + char *attname; + + attname = get_relid_attribute_name(indrelid, attnum); + keycoltype = get_atttype(indrelid, attnum); + + iparam->name = attname; + iparam->expr = NULL; + } + else + { + /* Expressional index */ + Node *indexkey; + + if (indexpr_item == NULL) + elog(ERROR, "too few entries in indexprs list"); + indexkey = (Node *) lfirst(indexpr_item); + change_varattnos_of_a_node(indexkey, attmap); + iparam->name = NULL; + iparam->expr = indexkey; + + indexpr_item = lnext(indexpr_item); + keycoltype = exprType(indexkey); + } + + /* Add the operator class name, if non-default */ + iparam->opclass = get_opclass(indclass->values[keyno], keycoltype); + + iparam->ordering = SORTBY_DEFAULT; + iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; + + /* Adjust options if necessary */ + if (amrec->amcanorder) + { + /* If it supports sort ordering, report DESC and NULLS opts */ + if (opt & INDOPTION_DESC) + iparam->ordering = SORTBY_DESC; + if (opt & INDOPTION_NULLS_FIRST) + iparam->nulls_ordering = SORTBY_NULLS_FIRST; + } + + index->indexParams = lappend(index->indexParams, iparam); + } + + /* Use the same tablespace as the source index */ + index->tableSpace = get_tablespace_name(source_idx->rd_node.spcNode); + + /* If it's a partial index, decompile and append the predicate */ + if (!heap_attisnull(ht_idx, Anum_pg_index_indpred)) + { + Datum pred_datum; + bool isnull; + char *pred_str; + + /* Convert text string to node tree */ + pred_datum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indpred, &isnull); + Assert(!isnull); + pred_str = DatumGetCString(DirectFunctionCall1(textout, + pred_datum)); + index->whereClause = (Node *) stringToNode(pred_str); + change_varattnos_of_a_node(index->whereClause, attmap); + } + + /* Clean up */ + ReleaseSysCache(ht_idx); + ReleaseSysCache(ht_idxrel); + ReleaseSysCache(ht_am); + + return index; +} + +/* + * get_opclass - fetch name of an index operator class + * + * If the opclass is the default for the given actual_datatype, then + * the return value is NIL. + */ +static List * +get_opclass(Oid opclass, Oid actual_datatype) +{ + HeapTuple ht_opc; + Form_pg_opclass opc_rec; + List *result = NIL; + + ht_opc = SearchSysCache(CLAOID, + ObjectIdGetDatum(opclass), + 0, 0, 0); + if (!HeapTupleIsValid(ht_opc)) + elog(ERROR, "cache lookup failed for opclass %u", opclass); + opc_rec = (Form_pg_opclass) GETSTRUCT(ht_opc); + + if (!OidIsValid(actual_datatype) || + GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass) + { + char *nsp_name = get_namespace_name(opc_rec->opcnamespace); + char *opc_name = NameStr(opc_rec->opcname); + + result = list_make2(makeString(nsp_name), makeString(opc_name)); + } + + ReleaseSysCache(ht_opc); + return result; +} + + /* * transformIndexConstraints - * Handle UNIQUE and PRIMARY KEY constraints, which create indexes + * Handle UNIQUE and PRIMARY KEY constraints, which create + * indexes. We also merge index definitions arising from + * LIKE ... INCLUDING INDEXES. */ static void transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) { IndexStmt *index; List *indexlist = NIL; - ListCell *listptr; - ListCell *l; + ListCell *lc; /* * Run through the constraints that need to generate an index. For PRIMARY * KEY, mark each column as NOT NULL and create an index. For UNIQUE, * create an index as for PRIMARY KEY, but do not insist on NOT NULL. */ - foreach(listptr, cxt->ixconstraints) + foreach(lc, cxt->ixconstraints) { - Constraint *constraint = lfirst(listptr); - ListCell *keys; - IndexElem *iparam; - - Assert(IsA(constraint, Constraint)); - Assert((constraint->contype == CONSTR_PRIMARY) - || (constraint->contype == CONSTR_UNIQUE)); - - index = makeNode(IndexStmt); - - index->unique = true; - index->primary = (constraint->contype == CONSTR_PRIMARY); - if (index->primary) - { - if (cxt->pkey != NULL) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("multiple primary keys for table \"%s\" are not allowed", - cxt->relation->relname))); - cxt->pkey = index; - - /* - * In ALTER TABLE case, a primary index might already exist, but - * DefineIndex will check for it. - */ - } - index->isconstraint = true; - - if (constraint->name != NULL) - index->idxname = pstrdup(constraint->name); - else - index->idxname = NULL; /* DefineIndex will choose name */ - - index->relation = cxt->relation; - index->accessMethod = DEFAULT_INDEX_TYPE; - index->options = constraint->options; - index->tableSpace = constraint->indexspace; - index->indexParams = NIL; - index->whereClause = NULL; - index->concurrent = false; - - /* - * Make sure referenced keys exist. If we are making a PRIMARY KEY - * index, also make sure they are NOT NULL, if possible. (Although we - * could leave it to DefineIndex to mark the columns NOT NULL, it's - * more efficient to get it right the first time.) - */ - foreach(keys, constraint->keys) - { - char *key = strVal(lfirst(keys)); - bool found = false; - ColumnDef *column = NULL; - ListCell *columns; - - foreach(columns, cxt->columns) - { - column = (ColumnDef *) lfirst(columns); - Assert(IsA(column, ColumnDef)); - if (strcmp(column->colname, key) == 0) - { - found = true; - break; - } - } - if (found) - { - /* found column in the new table; force it to be NOT NULL */ - if (constraint->contype == CONSTR_PRIMARY) - column->is_not_null = TRUE; - } - else if (SystemAttributeByName(key, cxt->hasoids) != NULL) - { - /* - * column will be a system column in the new table, so accept - * it. System columns can't ever be null, so no need to worry - * about PRIMARY/NOT NULL constraint. - */ - found = true; - } - else if (cxt->inhRelations) - { - /* try inherited tables */ - ListCell *inher; - - foreach(inher, cxt->inhRelations) - { - RangeVar *inh = (RangeVar *) lfirst(inher); - Relation rel; - int count; - - Assert(IsA(inh, RangeVar)); - rel = heap_openrv(inh, AccessShareLock); - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("inherited relation \"%s\" is not a table", - inh->relname))); - for (count = 0; count < rel->rd_att->natts; count++) - { - Form_pg_attribute inhattr = rel->rd_att->attrs[count]; - char *inhname = NameStr(inhattr->attname); - - if (inhattr->attisdropped) - continue; - if (strcmp(key, inhname) == 0) - { - found = true; - - /* - * We currently have no easy way to force an - * inherited column to be NOT NULL at creation, if - * its parent wasn't so already. We leave it to - * DefineIndex to fix things up in this case. - */ - break; - } - } - heap_close(rel, NoLock); - if (found) - break; - } - } - - /* - * In the ALTER TABLE case, don't complain about index keys not - * created in the command; they may well exist already. - * DefineIndex will complain about them if not, and will also take - * care of marking them NOT NULL. - */ - if (!found && !cxt->isalter) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" named in key does not exist", - key))); - - /* Check for PRIMARY KEY(foo, foo) */ - foreach(columns, index->indexParams) - { - iparam = (IndexElem *) lfirst(columns); - if (iparam->name && strcmp(key, iparam->name) == 0) - { - if (index->primary) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_COLUMN), - errmsg("column \"%s\" appears twice in primary key constraint", - key))); - else - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_COLUMN), - errmsg("column \"%s\" appears twice in unique constraint", - key))); - } - } - - /* OK, add it to the index definition */ - iparam = makeNode(IndexElem); - iparam->name = pstrdup(key); - iparam->expr = NULL; - iparam->opclass = NIL; - iparam->ordering = SORTBY_DEFAULT; - iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; - index->indexParams = lappend(index->indexParams, iparam); - } + Constraint *constraint = (Constraint *) lfirst(lc); + index = transformIndexConstraint(constraint, cxt); indexlist = lappend(indexlist, index); } @@ -867,12 +968,12 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) cxt->alist = list_make1(cxt->pkey); } - foreach(l, indexlist) + foreach(lc, indexlist) { bool keep = true; ListCell *k; - index = lfirst(l); + index = lfirst(lc); /* if it's pkey, it's already in cxt->alist */ if (index == cxt->pkey) @@ -900,6 +1001,194 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) if (keep) cxt->alist = lappend(cxt->alist, index); } + + /* Copy indexes defined by LIKE ... INCLUDING INDEXES */ + foreach(lc, cxt->inh_indexes) + { + index = (IndexStmt *) lfirst(lc); + + if (index->primary) + { + if (cxt->pkey) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("multiple primary keys for table \"%s\" are not allowed", + cxt->relation->relname))); + + cxt->pkey = index; + } + + cxt->alist = lappend(cxt->alist, index); + } +} + +static IndexStmt * +transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) +{ + IndexStmt *index; + ListCell *keys; + IndexElem *iparam; + + Assert(constraint->contype == CONSTR_PRIMARY || + constraint->contype == CONSTR_UNIQUE); + + index = makeNode(IndexStmt); + index->unique = true; + index->primary = (constraint->contype == CONSTR_PRIMARY); + + if (index->primary) + { + if (cxt->pkey != NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("multiple primary keys for table \"%s\" are not allowed", + cxt->relation->relname))); + cxt->pkey = index; + + /* + * In ALTER TABLE case, a primary index might already exist, but + * DefineIndex will check for it. + */ + } + index->isconstraint = true; + + if (constraint->name != NULL) + index->idxname = pstrdup(constraint->name); + else + index->idxname = NULL; /* DefineIndex will choose name */ + + index->relation = cxt->relation; + index->accessMethod = DEFAULT_INDEX_TYPE; + index->options = constraint->options; + index->tableSpace = constraint->indexspace; + index->indexParams = NIL; + index->whereClause = NULL; + index->concurrent = false; + + /* + * Make sure referenced keys exist. If we are making a PRIMARY KEY + * index, also make sure they are NOT NULL, if possible. (Although we + * could leave it to DefineIndex to mark the columns NOT NULL, it's + * more efficient to get it right the first time.) + */ + foreach(keys, constraint->keys) + { + char *key = strVal(lfirst(keys)); + bool found = false; + ColumnDef *column = NULL; + ListCell *columns; + + foreach(columns, cxt->columns) + { + column = (ColumnDef *) lfirst(columns); + Assert(IsA(column, ColumnDef)); + if (strcmp(column->colname, key) == 0) + { + found = true; + break; + } + } + if (found) + { + /* found column in the new table; force it to be NOT NULL */ + if (constraint->contype == CONSTR_PRIMARY) + column->is_not_null = TRUE; + } + else if (SystemAttributeByName(key, cxt->hasoids) != NULL) + { + /* + * column will be a system column in the new table, so accept + * it. System columns can't ever be null, so no need to worry + * about PRIMARY/NOT NULL constraint. + */ + found = true; + } + else if (cxt->inhRelations) + { + /* try inherited tables */ + ListCell *inher; + + foreach(inher, cxt->inhRelations) + { + RangeVar *inh = (RangeVar *) lfirst(inher); + Relation rel; + int count; + + Assert(IsA(inh, RangeVar)); + rel = heap_openrv(inh, AccessShareLock); + if (rel->rd_rel->relkind != RELKIND_RELATION) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("inherited relation \"%s\" is not a table", + inh->relname))); + for (count = 0; count < rel->rd_att->natts; count++) + { + Form_pg_attribute inhattr = rel->rd_att->attrs[count]; + char *inhname = NameStr(inhattr->attname); + + if (inhattr->attisdropped) + continue; + if (strcmp(key, inhname) == 0) + { + found = true; + + /* + * We currently have no easy way to force an + * inherited column to be NOT NULL at creation, if + * its parent wasn't so already. We leave it to + * DefineIndex to fix things up in this case. + */ + break; + } + } + heap_close(rel, NoLock); + if (found) + break; + } + } + + /* + * In the ALTER TABLE case, don't complain about index keys not + * created in the command; they may well exist already. + * DefineIndex will complain about them if not, and will also take + * care of marking them NOT NULL. + */ + if (!found && !cxt->isalter) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" named in key does not exist", + key))); + + /* Check for PRIMARY KEY(foo, foo) */ + foreach(columns, index->indexParams) + { + iparam = (IndexElem *) lfirst(columns); + if (iparam->name && strcmp(key, iparam->name) == 0) + { + if (index->primary) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_COLUMN), + errmsg("column \"%s\" appears twice in primary key constraint", + key))); + else + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_COLUMN), + errmsg("column \"%s\" appears twice in unique constraint", + key))); + } + } + + /* OK, add it to the index definition */ + iparam = makeNode(IndexElem); + iparam->name = pstrdup(key); + iparam->expr = NULL; + iparam->opclass = NIL; + iparam->ordering = SORTBY_DEFAULT; + iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; + index->indexParams = lappend(index->indexParams, iparam); + } + + return index; } /* @@ -1376,6 +1665,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) cxt.ckconstraints = NIL; cxt.fkconstraints = NIL; cxt.ixconstraints = NIL; + cxt.inh_indexes = NIL; cxt.blist = NIL; cxt.alist = NIL; cxt.pkey = NULL; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index ec9aa9d2637..77e40674df9 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.283 2007/07/03 01:30:37 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.284 2007/07/17 05:02:02 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -886,6 +886,7 @@ ProcessUtility(Node *parsetree, stmt->indexParams, /* parameters */ (Expr *) stmt->whereClause, stmt->options, + stmt->src_options, stmt->unique, stmt->primary, stmt->isconstraint, diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index df5dbec6078..4870209d46b 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/adt/ruleutils.c,v 1.262 2007/06/18 21:40:58 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/utils/adt/ruleutils.c,v 1.263 2007/07/17 05:02:02 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -193,7 +193,6 @@ static char *generate_relation_name(Oid relid); static char *generate_function_name(Oid funcid, int nargs, Oid *argtypes); static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2); static text *string_to_text(char *str); -static char *flatten_reloptions(Oid relid); #define only_marker(rte) ((rte)->inh ? "" : "ONLY ") @@ -763,8 +762,7 @@ pg_get_indexdef_worker(Oid indexrelid, int colno, int prettyFlags) /* Add the operator class name */ if (!colno) - get_opclass_name(indclass->values[keyno], keycoltype, - &buf); + get_opclass_name(indclass->values[keyno], keycoltype, &buf); /* Add options if relevant */ if (amrec->amcanorder) @@ -5417,7 +5415,7 @@ string_to_text(char *str) /* * Generate a C string representing a relation's reloptions, or NULL if none. */ -static char * +char * flatten_reloptions(Oid relid) { char *result = NULL; @@ -5453,3 +5451,31 @@ flatten_reloptions(Oid relid) return result; } + +/* + * Generate an Array Datum representing a relation's reloptions using + * a C string + */ +Datum +unflatten_reloptions(char *reloptstring) +{ + Datum result = (Datum) 0; + + if (reloptstring) + { + Datum sep, relopts; + + /* + * We want to use text_to_array(reloptstring, ', ') --- but + * DirectFunctionCall2(text_to_array) does not work, because + * text_to_array() relies on fcinfo to be valid. So use + * OidFunctionCall2. + */ + sep = DirectFunctionCall1(textin, CStringGetDatum(", ")); + relopts = DirectFunctionCall1(textin, CStringGetDatum(reloptstring)); + + result = OidFunctionCall2(F_TEXT_TO_ARRAY, relopts, sep); + } + + return result; +} diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h index 5bb94a24f25..cf74692208b 100644 --- a/src/include/commands/defrem.h +++ b/src/include/commands/defrem.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/commands/defrem.h,v 1.81 2007/03/13 00:33:43 tgl Exp $ + * $PostgreSQL: pgsql/src/include/commands/defrem.h,v 1.82 2007/07/17 05:02:02 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -26,6 +26,7 @@ extern void DefineIndex(RangeVar *heapRelation, List *attributeList, Expr *predicate, List *options, + char *src_options, bool unique, bool primary, bool isconstraint, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 50bb6c2048f..a108759b760 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.349 2007/06/23 22:12:52 tgl Exp $ + * $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.350 2007/07/17 05:02:02 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -1501,6 +1501,7 @@ typedef struct IndexStmt char *tableSpace; /* tablespace, or NULL to use parent's */ List *indexParams; /* a list of IndexElem */ List *options; /* options from WITH clause */ + char *src_options; /* relopts inherited from source index */ Node *whereClause; /* qualification (partial-index predicate) */ bool unique; /* is index unique? */ bool primary; /* is index on primary key? */ diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index e35a287718b..3c3a9ed2d42 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/utils/builtins.h,v 1.297 2007/06/26 16:48:09 alvherre Exp $ + * $PostgreSQL: pgsql/src/include/utils/builtins.h,v 1.298 2007/07/17 05:02:02 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -560,6 +560,8 @@ extern List *deparse_context_for_plan(Node *outer_plan, Node *inner_plan, extern const char *quote_identifier(const char *ident); extern char *quote_qualified_identifier(const char *namespace, const char *ident); +extern char *flatten_reloptions(Oid relid); +extern Datum unflatten_reloptions(char *reloptstring); /* tid.c */ extern Datum tidin(PG_FUNCTION_ARGS); diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out index fa97f019b1d..40dfaeda902 100644 --- a/src/test/regress/expected/inherit.out +++ b/src/test/regress/expected/inherit.out @@ -633,6 +633,26 @@ SELECT * FROM inhg; /* Two records with three columns in order x=x, xx=text, y=y (2 rows) DROP TABLE inhg; +CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */ +INSERT INTO inhg VALUES (5, 10); +INSERT INTO inhg VALUES (20, 10); -- should fail +ERROR: duplicate key value violates unique constraint "inhg_pkey" +DROP TABLE inhg; +/* Multiple primary keys creation should fail */ +CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, PRIMARY KEY(x)); /* fails */ +ERROR: multiple primary keys for table "inhg" are not allowed +CREATE TABLE inhz (xx text DEFAULT 'text', yy int UNIQUE); +NOTICE: CREATE TABLE / UNIQUE will create implicit index "inhz_yy_key" for table "inhz" +CREATE UNIQUE INDEX inhz_xx_idx on inhz (xx) WHERE xx <> 'test'; +/* Ok to create multiple unique indexes */ +CREATE TABLE inhg (x text UNIQUE, LIKE inhz INCLUDING INDEXES); +NOTICE: CREATE TABLE / UNIQUE will create implicit index "inhg_x_key" for table "inhg" +INSERT INTO inhg (xx, yy, x) VALUES ('test', 5, 10); +INSERT INTO inhg (xx, yy, x) VALUES ('test', 10, 15); +INSERT INTO inhg (xx, yy, x) VALUES ('foo', 10, 15); -- should fail +ERROR: duplicate key value violates unique constraint "inhg_x_key" +DROP TABLE inhg; +DROP TABLE inhz; -- Test changing the type of inherited columns insert into d values('test','one','two','three'); alter table a alter column aa type integer using bit_length(aa); diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql index cd4221f899d..b0499a64928 100644 --- a/src/test/regress/sql/inherit.sql +++ b/src/test/regress/sql/inherit.sql @@ -156,6 +156,21 @@ INSERT INTO inhg VALUES ('x', 'foo', 'y'); /* fails due to constraint */ SELECT * FROM inhg; /* Two records with three columns in order x=x, xx=text, y=y */ DROP TABLE inhg; +CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */ +INSERT INTO inhg VALUES (5, 10); +INSERT INTO inhg VALUES (20, 10); -- should fail +DROP TABLE inhg; +/* Multiple primary keys creation should fail */ +CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, PRIMARY KEY(x)); /* fails */ +CREATE TABLE inhz (xx text DEFAULT 'text', yy int UNIQUE); +CREATE UNIQUE INDEX inhz_xx_idx on inhz (xx) WHERE xx <> 'test'; +/* Ok to create multiple unique indexes */ +CREATE TABLE inhg (x text UNIQUE, LIKE inhz INCLUDING INDEXES); +INSERT INTO inhg (xx, yy, x) VALUES ('test', 5, 10); +INSERT INTO inhg (xx, yy, x) VALUES ('test', 10, 15); +INSERT INTO inhg (xx, yy, x) VALUES ('foo', 10, 15); -- should fail +DROP TABLE inhg; +DROP TABLE inhz; -- Test changing the type of inherited columns insert into d values('test','one','two','three');