Jeff Davis 4b74ebf726 When creating materialized views, use REFRESH to load data.
Previously, CREATE MATERIALIZED VIEW ... WITH DATA populated the MV
the same way as CREATE TABLE ... AS.

Instead, reuse the REFRESH logic, which locks down security-restricted
operations and restricts the search_path. This reduces the chance that
a subsequent refresh will fail.

Reported-by: Noah Misch
Backpatch-through: 17
Discussion: https://postgr.es/m/20240630222344.db.nmisch@google.com
2024-07-16 15:41:29 -07:00

971 lines
31 KiB
C

/*-------------------------------------------------------------------------
*
* matview.c
* materialized view support
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/commands/matview.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/multixact.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_am.h"
#include "catalog/pg_opclass.h"
#include "commands/cluster.h"
#include "commands/matview.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
Oid transientoid; /* OID of new heap into which to store */
/* These fields are filled by transientrel_startup: */
Relation transientrel; /* relation to write to */
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
} DR_transientrel;
static int matview_maintenance_depth = 0;
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self);
static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
const char *queryString);
static char *make_temptable_name_n(char *tempname, int n);
static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
int save_sec_context);
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
static bool is_usable_unique_index(Relation indexRel);
static void OpenMatViewIncrementalMaintenance(void);
static void CloseMatViewIncrementalMaintenance(void);
/*
* SetMatViewPopulatedState
* Mark a materialized view as populated, or not.
*
* NOTE: caller must be holding an appropriate lock on the relation.
*/
void
SetMatViewPopulatedState(Relation relation, bool newstate)
{
Relation pgrel;
HeapTuple tuple;
Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
/*
* Update relation's pg_class entry. Crucial side-effect: other backends
* (and this one too!) are sent SI message to make them rebuild relcache
* entries.
*/
pgrel = table_open(RelationRelationId, RowExclusiveLock);
tuple = SearchSysCacheCopy1(RELOID,
ObjectIdGetDatum(RelationGetRelid(relation)));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u",
RelationGetRelid(relation));
((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
heap_freetuple(tuple);
table_close(pgrel, RowExclusiveLock);
/*
* Advance command counter to make the updated pg_class row locally
* visible.
*/
CommandCounterIncrement();
}
/*
* ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
*
* If WITH NO DATA was specified, this is effectively like a TRUNCATE;
* otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
* statement associated with the materialized view. The statement node's
* skipData field shows whether the clause was used.
*/
ObjectAddress
ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
ParamListInfo params, QueryCompletion *qc)
{
Oid matviewOid;
LOCKMODE lockmode;
/* Determine strength of lock needed. */
lockmode = stmt->concurrent ? ExclusiveLock : AccessExclusiveLock;
/*
* Get a lock until end of transaction.
*/
matviewOid = RangeVarGetRelidExtended(stmt->relation,
lockmode, 0,
RangeVarCallbackMaintainsTable,
NULL);
return RefreshMatViewByOid(matviewOid, stmt->skipData, stmt->concurrent,
queryString, params, qc);
}
/*
* RefreshMatViewByOid -- refresh materialized view by OID
*
* This refreshes the materialized view by creating a new table and swapping
* the relfilenumbers of the new table and the old materialized view, so the OID
* of the original materialized view is preserved. Thus we do not lose GRANT
* nor references to this materialized view.
*
* If skipData is true, this is effectively like a TRUNCATE; otherwise it is
* like a TRUNCATE followed by an INSERT using the SELECT statement associated
* with the materialized view.
*
* Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
* the new heap, it's better to create the indexes afterwards than to fill them
* incrementally while we load.
*
* The matview's "populated" state is changed based on whether the contents
* reflect the result set of the materialized view's query.
*/
ObjectAddress
RefreshMatViewByOid(Oid matviewOid, bool skipData, bool concurrent,
const char *queryString, ParamListInfo params,
QueryCompletion *qc)
{
Relation matviewRel;
RewriteRule *rule;
List *actions;
Query *dataQuery;
Oid tableSpace;
Oid relowner;
Oid OIDNewHeap;
DestReceiver *dest;
uint64 processed = 0;
char relpersistence;
Oid save_userid;
int save_sec_context;
int save_nestlevel;
ObjectAddress address;
matviewRel = table_open(matviewOid, NoLock);
relowner = matviewRel->rd_rel->relowner;
/*
* Switch to the owner's userid, so that any functions are run as that
* user. Also lock down security-restricted operations and arrange to
* make GUC variable changes local to this command.
*/
GetUserIdAndSecContext(&save_userid, &save_sec_context);
SetUserIdAndSecContext(relowner,
save_sec_context | SECURITY_RESTRICTED_OPERATION);
save_nestlevel = NewGUCNestLevel();
RestrictSearchPath();
/* Make sure it is a materialized view. */
if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" is not a materialized view",
RelationGetRelationName(matviewRel))));
/* Check that CONCURRENTLY is not specified if not populated. */
if (concurrent && !RelationIsPopulated(matviewRel))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
/* Check that conflicting options have not been specified. */
if (concurrent && skipData)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s and %s options cannot be used together",
"CONCURRENTLY", "WITH NO DATA")));
/*
* Check that everything is correct for a refresh. Problems at this point
* are internal errors, so elog is sufficient.
*/
if (matviewRel->rd_rel->relhasrules == false ||
matviewRel->rd_rules->numLocks < 1)
elog(ERROR,
"materialized view \"%s\" is missing rewrite information",
RelationGetRelationName(matviewRel));
if (matviewRel->rd_rules->numLocks > 1)
elog(ERROR,
"materialized view \"%s\" has too many rules",
RelationGetRelationName(matviewRel));
rule = matviewRel->rd_rules->rules[0];
if (rule->event != CMD_SELECT || !(rule->isInstead))
elog(ERROR,
"the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
RelationGetRelationName(matviewRel));
actions = rule->actions;
if (list_length(actions) != 1)
elog(ERROR,
"the rule for materialized view \"%s\" is not a single action",
RelationGetRelationName(matviewRel));
/*
* Check that there is a unique index with no WHERE clause on one or more
* columns of the materialized view if CONCURRENTLY is specified.
*/
if (concurrent)
{
List *indexoidlist = RelationGetIndexList(matviewRel);
ListCell *indexoidscan;
bool hasUniqueIndex = false;
foreach(indexoidscan, indexoidlist)
{
Oid indexoid = lfirst_oid(indexoidscan);
Relation indexRel;
indexRel = index_open(indexoid, AccessShareLock);
hasUniqueIndex = is_usable_unique_index(indexRel);
index_close(indexRel, AccessShareLock);
if (hasUniqueIndex)
break;
}
list_free(indexoidlist);
if (!hasUniqueIndex)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot refresh materialized view \"%s\" concurrently",
quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
RelationGetRelationName(matviewRel))),
errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
}
/*
* The stored query was rewritten at the time of the MV definition, but
* has not been scribbled on by the planner.
*/
dataQuery = linitial_node(Query, actions);
/*
* Check for active uses of the relation in the current transaction, such
* as open scans.
*
* NB: We count on this to protect us against problems with refreshing the
* data using TABLE_INSERT_FROZEN.
*/
CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
/*
* Tentatively mark the matview as populated or not (this will roll back
* if we fail later).
*/
SetMatViewPopulatedState(matviewRel, !skipData);
/* Concurrent refresh builds new data in temp tablespace, and does diff. */
if (concurrent)
{
tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
relpersistence = RELPERSISTENCE_TEMP;
}
else
{
tableSpace = matviewRel->rd_rel->reltablespace;
relpersistence = matviewRel->rd_rel->relpersistence;
}
/*
* Create the transient table that will receive the regenerated data. Lock
* it against access by any other process until commit (by which time it
* will be gone).
*/
OIDNewHeap = make_new_heap(matviewOid, tableSpace,
matviewRel->rd_rel->relam,
relpersistence, ExclusiveLock);
LockRelationOid(OIDNewHeap, AccessExclusiveLock);
dest = CreateTransientRelDestReceiver(OIDNewHeap);
/* Generate the data, if wanted. */
if (!skipData)
processed = refresh_matview_datafill(dest, dataQuery, queryString);
/* Make the matview match the newly generated data. */
if (concurrent)
{
int old_depth = matview_maintenance_depth;
PG_TRY();
{
refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
save_sec_context);
}
PG_CATCH();
{
matview_maintenance_depth = old_depth;
PG_RE_THROW();
}
PG_END_TRY();
Assert(matview_maintenance_depth == old_depth);
}
else
{
refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
/*
* Inform cumulative stats system about our activity: basically, we
* truncated the matview and inserted some new data. (The concurrent
* code path above doesn't need to worry about this because the
* inserts and deletes it issues get counted by lower-level code.)
*/
pgstat_count_truncate(matviewRel);
if (!skipData)
pgstat_count_heap_insert(matviewRel, processed);
}
table_close(matviewRel, NoLock);
/* Roll back any GUC changes */
AtEOXact_GUC(false, save_nestlevel);
/* Restore userid and security context */
SetUserIdAndSecContext(save_userid, save_sec_context);
ObjectAddressSet(address, RelationRelationId, matviewOid);
/*
* Save the rowcount so that pg_stat_statements can track the total number
* of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
* still don't display the rowcount in the command completion tag output,
* i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
* command tag is left false in cmdtaglist.h. Otherwise, the change of
* completion tag output might break applications using it.
*/
if (qc)
SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed);
return address;
}
/*
* refresh_matview_datafill
*
* Execute the given query, sending result rows to "dest" (which will
* insert them into the target matview).
*
* Returns number of rows inserted.
*/
static uint64
refresh_matview_datafill(DestReceiver *dest, Query *query,
const char *queryString)
{
List *rewritten;
PlannedStmt *plan;
QueryDesc *queryDesc;
Query *copied_query;
uint64 processed;
/* Lock and rewrite, using a copy to preserve the original query. */
copied_query = copyObject(query);
AcquireRewriteLocks(copied_query, true, false);
rewritten = QueryRewrite(copied_query);
/* SELECT should never rewrite to more or less than one SELECT query */
if (list_length(rewritten) != 1)
elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
query = (Query *) linitial(rewritten);
/* Check for user-requested abort. */
CHECK_FOR_INTERRUPTS();
/* Plan the query which will generate data for the refresh. */
plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only matter if
* the planner executed an allegedly-stable function that changed the
* database contents, but let's do it anyway to be safe.)
*/
PushCopiedSnapshot(GetActiveSnapshot());
UpdateActiveSnapshotCommandId();
/* Create a QueryDesc, redirecting output to our tuple receiver */
queryDesc = CreateQueryDesc(plan, queryString,
GetActiveSnapshot(), InvalidSnapshot,
dest, NULL, NULL, 0);
/* call ExecutorStart to prepare the plan for execution */
ExecutorStart(queryDesc, 0);
/* run the plan */
ExecutorRun(queryDesc, ForwardScanDirection, 0, true);
processed = queryDesc->estate->es_processed;
/* and clean up */
ExecutorFinish(queryDesc);
ExecutorEnd(queryDesc);
FreeQueryDesc(queryDesc);
PopActiveSnapshot();
return processed;
}
DestReceiver *
CreateTransientRelDestReceiver(Oid transientoid)
{
DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
self->pub.receiveSlot = transientrel_receive;
self->pub.rStartup = transientrel_startup;
self->pub.rShutdown = transientrel_shutdown;
self->pub.rDestroy = transientrel_destroy;
self->pub.mydest = DestTransientRel;
self->transientoid = transientoid;
return (DestReceiver *) self;
}
/*
* transientrel_startup --- executor startup
*/
static void
transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
DR_transientrel *myState = (DR_transientrel *) self;
Relation transientrel;
transientrel = table_open(myState->transientoid, NoLock);
/*
* Fill private fields of myState for use by later routines
*/
myState->transientrel = transientrel;
myState->output_cid = GetCurrentCommandId(true);
myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
myState->bistate = GetBulkInsertState();
/*
* Valid smgr_targblock implies something already wrote to the relation.
* This may be harmless, but this function hasn't planned for it.
*/
Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
}
/*
* transientrel_receive --- receive one tuple
*/
static bool
transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
/*
* Note that the input slot might not be of the type of the target
* relation. That's supported by table_tuple_insert(), but slightly less
* efficient than inserting with the right slot - but the alternative
* would be to copy into a slot of the right type, which would not be
* cheap either. This also doesn't allow accessing per-AM data (say a
* tuple's xmin), but since we don't do that here...
*/
table_tuple_insert(myState->transientrel,
slot,
myState->output_cid,
myState->ti_options,
myState->bistate);
/* We know this is a newly created relation, so there are no indexes */
return true;
}
/*
* transientrel_shutdown --- executor end
*/
static void
transientrel_shutdown(DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
FreeBulkInsertState(myState->bistate);
table_finish_bulk_insert(myState->transientrel, myState->ti_options);
/* close transientrel, but keep lock until commit */
table_close(myState->transientrel, NoLock);
myState->transientrel = NULL;
}
/*
* transientrel_destroy --- release DestReceiver object
*/
static void
transientrel_destroy(DestReceiver *self)
{
pfree(self);
}
/*
* Given a qualified temporary table name, append an underscore followed by
* the given integer, to make a new table name based on the old one.
* The result is a palloc'd string.
*
* As coded, this would fail to make a valid SQL name if the given name were,
* say, "FOO"."BAR". Currently, the table name portion of the input will
* never be double-quoted because it's of the form "pg_temp_NNN", cf
* make_new_heap(). But we might have to work harder someday.
*/
static char *
make_temptable_name_n(char *tempname, int n)
{
StringInfoData namebuf;
initStringInfo(&namebuf);
appendStringInfoString(&namebuf, tempname);
appendStringInfo(&namebuf, "_%d", n);
return namebuf.data;
}
/*
* refresh_by_match_merge
*
* Refresh a materialized view with transactional semantics, while allowing
* concurrent reads.
*
* This is called after a new version of the data has been created in a
* temporary table. It performs a full outer join against the old version of
* the data, producing "diff" results. This join cannot work if there are any
* duplicated rows in either the old or new versions, in the sense that every
* column would compare as equal between the two rows. It does work correctly
* in the face of rows which have at least one NULL value, with all non-NULL
* columns equal. The behavior of NULLs on equality tests and on UNIQUE
* indexes turns out to be quite convenient here; the tests we need to make
* are consistent with default behavior. If there is at least one UNIQUE
* index on the materialized view, we have exactly the guarantee we need.
*
* The temporary table used to hold the diff results contains just the TID of
* the old record (if matched) and the ROW from the new table as a single
* column of complex record type (if matched).
*
* Once we have the diff table, we perform set-based DELETE and INSERT
* operations against the materialized view, and discard both temporary
* tables.
*
* Everything from the generation of the new data to applying the differences
* takes place under cover of an ExclusiveLock, since it seems as though we
* would want to prohibit not only concurrent REFRESH operations, but also
* incremental maintenance. It also doesn't seem reasonable or safe to allow
* SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
* this command.
*/
static void
refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
int save_sec_context)
{
StringInfoData querybuf;
Relation matviewRel;
Relation tempRel;
char *matviewname;
char *tempname;
char *diffname;
TupleDesc tupdesc;
bool foundUniqueIndex;
List *indexoidlist;
ListCell *indexoidscan;
int16 relnatts;
Oid *opUsedForQual;
initStringInfo(&querybuf);
matviewRel = table_open(matviewOid, NoLock);
matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
RelationGetRelationName(matviewRel));
tempRel = table_open(tempOid, NoLock);
tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
RelationGetRelationName(tempRel));
diffname = make_temptable_name_n(tempname, 2);
relnatts = RelationGetNumberOfAttributes(matviewRel);
/* Open SPI context. */
if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "SPI_connect failed");
/* Analyze the temp table with the new contents. */
appendStringInfo(&querybuf, "ANALYZE %s", tempname);
if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
/*
* We need to ensure that there are not duplicate rows without NULLs in
* the new data set before we can count on the "diff" results. Check for
* that in a way that allows showing the first duplicated row found. Even
* after we pass this test, a unique index on the materialized view may
* find a duplicate key problem.
*
* Note: here and below, we use "tablename.*::tablerowtype" as a hack to
* keep ".*" from being expanded into multiple columns in a SELECT list.
* Compare ruleutils.c's get_variable().
*/
resetStringInfo(&querybuf);
appendStringInfo(&querybuf,
"SELECT newdata.*::%s FROM %s newdata "
"WHERE newdata.* IS NOT NULL AND EXISTS "
"(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
"AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
"AND newdata2.ctid OPERATOR(pg_catalog.<>) "
"newdata.ctid)",
tempname, tempname, tempname);
if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
if (SPI_processed > 0)
{
/*
* Note that this ereport() is returning data to the user. Generally,
* we would want to make sure that the user has been granted access to
* this data. However, REFRESH MAT VIEW is only able to be run by the
* owner of the mat view (or a superuser) and therefore there is no
* need to check for access to data in the mat view.
*/
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
RelationGetRelationName(matviewRel)),
errdetail("Row: %s",
SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
}
/*
* Create the temporary "diff" table.
*
* Temporarily switch out of the SECURITY_RESTRICTED_OPERATION context,
* because you cannot create temp tables in SRO context. For extra
* paranoia, add the composite type column only after switching back to
* SRO context.
*/
SetUserIdAndSecContext(relowner,
save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
resetStringInfo(&querybuf);
appendStringInfo(&querybuf,
"CREATE TEMP TABLE %s (tid pg_catalog.tid)",
diffname);
if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
SetUserIdAndSecContext(relowner,
save_sec_context | SECURITY_RESTRICTED_OPERATION);
resetStringInfo(&querybuf);
appendStringInfo(&querybuf,
"ALTER TABLE %s ADD COLUMN newdata %s",
diffname, tempname);
if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
/* Start building the query for populating the diff table. */
resetStringInfo(&querybuf);
appendStringInfo(&querybuf,
"INSERT INTO %s "
"SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
"FROM %s mv FULL JOIN %s newdata ON (",
diffname, tempname, matviewname, tempname);
/*
* Get the list of index OIDs for the table from the relcache, and look up
* each one in the pg_index syscache. We will test for equality on all
* columns present in all unique indexes which only reference columns and
* include all rows.
*/
tupdesc = matviewRel->rd_att;
opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
foundUniqueIndex = false;
indexoidlist = RelationGetIndexList(matviewRel);
foreach(indexoidscan, indexoidlist)
{
Oid indexoid = lfirst_oid(indexoidscan);
Relation indexRel;
indexRel = index_open(indexoid, RowExclusiveLock);
if (is_usable_unique_index(indexRel))
{
Form_pg_index indexStruct = indexRel->rd_index;
int indnkeyatts = indexStruct->indnkeyatts;
oidvector *indclass;
Datum indclassDatum;
int i;
/* Must get indclass the hard way. */
indclassDatum = SysCacheGetAttrNotNull(INDEXRELID,
indexRel->rd_indextuple,
Anum_pg_index_indclass);
indclass = (oidvector *) DatumGetPointer(indclassDatum);
/* Add quals for all columns from this index. */
for (i = 0; i < indnkeyatts; i++)
{
int attnum = indexStruct->indkey.values[i];
Oid opclass = indclass->values[i];
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
Oid attrtype = attr->atttypid;
HeapTuple cla_ht;
Form_pg_opclass cla_tup;
Oid opfamily;
Oid opcintype;
Oid op;
const char *leftop;
const char *rightop;
/*
* Identify the equality operator associated with this index
* column. First we need to look up the column's opclass.
*/
cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
if (!HeapTupleIsValid(cla_ht))
elog(ERROR, "cache lookup failed for opclass %u", opclass);
cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
Assert(cla_tup->opcmethod == BTREE_AM_OID);
opfamily = cla_tup->opcfamily;
opcintype = cla_tup->opcintype;
ReleaseSysCache(cla_ht);
op = get_opfamily_member(opfamily, opcintype, opcintype,
BTEqualStrategyNumber);
if (!OidIsValid(op))
elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
BTEqualStrategyNumber, opcintype, opcintype, opfamily);
/*
* If we find the same column with the same equality semantics
* in more than one index, we only need to emit the equality
* clause once.
*
* Since we only remember the last equality operator, this
* code could be fooled into emitting duplicate clauses given
* multiple indexes with several different opclasses ... but
* that's so unlikely it doesn't seem worth spending extra
* code to avoid.
*/
if (opUsedForQual[attnum - 1] == op)
continue;
opUsedForQual[attnum - 1] = op;
/*
* Actually add the qual, ANDed with any others.
*/
if (foundUniqueIndex)
appendStringInfoString(&querybuf, " AND ");
leftop = quote_qualified_identifier("newdata",
NameStr(attr->attname));
rightop = quote_qualified_identifier("mv",
NameStr(attr->attname));
generate_operator_clause(&querybuf,
leftop, attrtype,
op,
rightop, attrtype);
foundUniqueIndex = true;
}
}
/* Keep the locks, since we're about to run DML which needs them. */
index_close(indexRel, NoLock);
}
list_free(indexoidlist);
/*
* There must be at least one usable unique index on the matview.
*
* ExecRefreshMatView() checks that after taking the exclusive lock on the
* matview. So at least one unique index is guaranteed to exist here
* because the lock is still being held. (One known exception is if a
* function called as part of refreshing the matview drops the index.
* That's a pretty silly thing to do.)
*/
if (!foundUniqueIndex)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("could not find suitable unique index on materialized view"));
appendStringInfoString(&querybuf,
" AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
"WHERE newdata.* IS NULL OR mv.* IS NULL "
"ORDER BY tid");
/* Populate the temporary "diff" table. */
if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
/*
* We have no further use for data from the "full-data" temp table, but we
* must keep it around because its type is referenced from the diff table.
*/
/* Analyze the diff table. */
resetStringInfo(&querybuf);
appendStringInfo(&querybuf, "ANALYZE %s", diffname);
if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
OpenMatViewIncrementalMaintenance();
/* Deletes must come before inserts; do them first. */
resetStringInfo(&querybuf);
appendStringInfo(&querybuf,
"DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
"(SELECT diff.tid FROM %s diff "
"WHERE diff.tid IS NOT NULL "
"AND diff.newdata IS NULL)",
matviewname, diffname);
if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
/* Inserts go last. */
resetStringInfo(&querybuf);
appendStringInfo(&querybuf,
"INSERT INTO %s SELECT (diff.newdata).* "
"FROM %s diff WHERE tid IS NULL",
matviewname, diffname);
if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
/* We're done maintaining the materialized view. */
CloseMatViewIncrementalMaintenance();
table_close(tempRel, NoLock);
table_close(matviewRel, NoLock);
/* Clean up temp tables. */
resetStringInfo(&querybuf);
appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
/* Close SPI context. */
if (SPI_finish() != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed");
}
/*
* Swap the physical files of the target and transient tables, then rebuild
* the target's indexes and throw away the transient table. Security context
* swapping is handled by the called function, so it is not needed here.
*/
static void
refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
{
finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
RecentXmin, ReadNextMultiXactId(), relpersistence);
}
/*
* Check whether specified index is usable for match merge.
*/
static bool
is_usable_unique_index(Relation indexRel)
{
Form_pg_index indexStruct = indexRel->rd_index;
/*
* Must be unique, valid, immediate, non-partial, and be defined over
* plain user columns (not expressions). We also require it to be a
* btree. Even if we had any other unique index kinds, we'd not know how
* to identify the corresponding equality operator, nor could we be sure
* that the planner could implement the required FULL JOIN with non-btree
* operators.
*/
if (indexStruct->indisunique &&
indexStruct->indimmediate &&
indexRel->rd_rel->relam == BTREE_AM_OID &&
indexStruct->indisvalid &&
RelationGetIndexPredicate(indexRel) == NIL &&
indexStruct->indnatts > 0)
{
/*
* The point of groveling through the index columns individually is to
* reject both index expressions and system columns. Currently,
* matviews couldn't have OID columns so there's no way to create an
* index on a system column; but maybe someday that wouldn't be true,
* so let's be safe.
*/
int numatts = indexStruct->indnatts;
int i;
for (i = 0; i < numatts; i++)
{
int attnum = indexStruct->indkey.values[i];
if (attnum <= 0)
return false;
}
return true;
}
return false;
}
/*
* This should be used to test whether the backend is in a context where it is
* OK to allow DML statements to modify materialized views. We only want to
* allow that for internal code driven by the materialized view definition,
* not for arbitrary user-supplied code.
*
* While the function names reflect the fact that their main intended use is
* incremental maintenance of materialized views (in response to changes to
* the data in referenced relations), they are initially used to allow REFRESH
* without blocking concurrent reads.
*/
bool
MatViewIncrementalMaintenanceIsEnabled(void)
{
return matview_maintenance_depth > 0;
}
static void
OpenMatViewIncrementalMaintenance(void)
{
matview_maintenance_depth++;
}
static void
CloseMatViewIncrementalMaintenance(void)
{
matview_maintenance_depth--;
Assert(matview_maintenance_depth >= 0);
}