mirror of
https://github.com/postgres/postgres.git
synced 2025-05-28 00:03:23 -04:00
Extend pg_publication_tables to display column list and row filter.
Commit 923def9a53 and 52e4f0cd47 allowed to specify column lists and row filters for publication tables. This commit extends the pg_publication_tables view and pg_get_publication_tables function to display that information. This information will be useful to users and we also need this for the later commit that prohibits combining multiple publications with different column lists for the same table. Author: Hou Zhijie Reviewed By: Amit Kapila, Alvaro Herrera, Shi Yu, Takamichi Osumi Discussion: https://postgr.es/m/202204251548.mudq7jbqnh7r@alvherre.pgsql
This commit is contained in:
parent
62221ef187
commit
0ff20288e1
@ -9691,7 +9691,7 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l
|
|||||||
|
|
||||||
<row>
|
<row>
|
||||||
<entry><link linkend="view-pg-publication-tables"><structname>pg_publication_tables</structname></link></entry>
|
<entry><link linkend="view-pg-publication-tables"><structname>pg_publication_tables</structname></link></entry>
|
||||||
<entry>publications and their associated tables</entry>
|
<entry>publications and information of their associated tables</entry>
|
||||||
</row>
|
</row>
|
||||||
|
|
||||||
<row>
|
<row>
|
||||||
@ -11635,8 +11635,8 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
|
|||||||
|
|
||||||
<para>
|
<para>
|
||||||
The view <structname>pg_publication_tables</structname> provides
|
The view <structname>pg_publication_tables</structname> provides
|
||||||
information about the mapping between publications and the tables they
|
information about the mapping between publications and information of
|
||||||
contain. Unlike the underlying catalog
|
tables they contain. Unlike the underlying catalog
|
||||||
<link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>,
|
<link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>,
|
||||||
this view expands publications defined as <literal>FOR ALL TABLES</literal>
|
this view expands publications defined as <literal>FOR ALL TABLES</literal>
|
||||||
and <literal>FOR ALL TABLES IN SCHEMA</literal>, so for such publications
|
and <literal>FOR ALL TABLES IN SCHEMA</literal>, so for such publications
|
||||||
@ -11687,6 +11687,27 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
|
|||||||
Name of table
|
Name of table
|
||||||
</para></entry>
|
</para></entry>
|
||||||
</row>
|
</row>
|
||||||
|
|
||||||
|
<row>
|
||||||
|
<entry role="catalog_table_entry"><para role="column_definition">
|
||||||
|
<structfield>attnames</structfield> <type>name[]</type>
|
||||||
|
(references <link linkend="catalog-pg-attribute"><structname>pg_attribute</structname></link>.<structfield>attname</structfield>)
|
||||||
|
</para>
|
||||||
|
<para>
|
||||||
|
Names of table columns included in the publication. This contains all
|
||||||
|
the columns of the table when the user didn't specify the column list
|
||||||
|
for the table.
|
||||||
|
</para></entry>
|
||||||
|
</row>
|
||||||
|
|
||||||
|
<row>
|
||||||
|
<entry role="catalog_table_entry"><para role="column_definition">
|
||||||
|
<structfield>rowfilter</structfield> <type>text</type>
|
||||||
|
</para>
|
||||||
|
<para>
|
||||||
|
Expression for the table's publication qualifying condition
|
||||||
|
</para></entry>
|
||||||
|
</row>
|
||||||
</tbody>
|
</tbody>
|
||||||
</tgroup>
|
</tgroup>
|
||||||
</table>
|
</table>
|
||||||
|
@ -1077,11 +1077,12 @@ get_publication_name(Oid pubid, bool missing_ok)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Returns Oids of tables in a publication.
|
* Returns information of tables in a publication.
|
||||||
*/
|
*/
|
||||||
Datum
|
Datum
|
||||||
pg_get_publication_tables(PG_FUNCTION_ARGS)
|
pg_get_publication_tables(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
|
#define NUM_PUBLICATOIN_TABLES_ELEM 3
|
||||||
FuncCallContext *funcctx;
|
FuncCallContext *funcctx;
|
||||||
char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
|
char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
|
||||||
Publication *publication;
|
Publication *publication;
|
||||||
@ -1090,6 +1091,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
|
|||||||
/* stuff done only on the first call of the function */
|
/* stuff done only on the first call of the function */
|
||||||
if (SRF_IS_FIRSTCALL())
|
if (SRF_IS_FIRSTCALL())
|
||||||
{
|
{
|
||||||
|
TupleDesc tupdesc;
|
||||||
MemoryContext oldcontext;
|
MemoryContext oldcontext;
|
||||||
|
|
||||||
/* create a function context for cross-call persistence */
|
/* create a function context for cross-call persistence */
|
||||||
@ -1136,6 +1138,16 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
|
|||||||
tables = filter_partitions(tables);
|
tables = filter_partitions(tables);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Construct a tuple descriptor for the result rows. */
|
||||||
|
tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATOIN_TABLES_ELEM);
|
||||||
|
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "relid",
|
||||||
|
OIDOID, -1, 0);
|
||||||
|
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "attrs",
|
||||||
|
INT2VECTOROID, -1, 0);
|
||||||
|
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "qual",
|
||||||
|
PG_NODE_TREEOID, -1, 0);
|
||||||
|
|
||||||
|
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
|
||||||
funcctx->user_fctx = (void *) tables;
|
funcctx->user_fctx = (void *) tables;
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldcontext);
|
MemoryContextSwitchTo(oldcontext);
|
||||||
@ -1147,9 +1159,47 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
|
|||||||
|
|
||||||
if (funcctx->call_cntr < list_length(tables))
|
if (funcctx->call_cntr < list_length(tables))
|
||||||
{
|
{
|
||||||
|
HeapTuple pubtuple = NULL;
|
||||||
|
HeapTuple rettuple;
|
||||||
Oid relid = list_nth_oid(tables, funcctx->call_cntr);
|
Oid relid = list_nth_oid(tables, funcctx->call_cntr);
|
||||||
|
Datum values[NUM_PUBLICATOIN_TABLES_ELEM];
|
||||||
|
bool nulls[NUM_PUBLICATOIN_TABLES_ELEM];
|
||||||
|
|
||||||
SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
|
/*
|
||||||
|
* Form tuple with appropriate data.
|
||||||
|
*/
|
||||||
|
MemSet(nulls, 0, sizeof(nulls));
|
||||||
|
MemSet(values, 0, sizeof(values));
|
||||||
|
|
||||||
|
publication = GetPublicationByName(pubname, false);
|
||||||
|
|
||||||
|
values[0] = ObjectIdGetDatum(relid);
|
||||||
|
|
||||||
|
pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
|
||||||
|
ObjectIdGetDatum(relid),
|
||||||
|
ObjectIdGetDatum(publication->oid));
|
||||||
|
|
||||||
|
if (HeapTupleIsValid(pubtuple))
|
||||||
|
{
|
||||||
|
/* Lookup the column list attribute. */
|
||||||
|
values[1] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
|
||||||
|
Anum_pg_publication_rel_prattrs,
|
||||||
|
&(nulls[1]));
|
||||||
|
|
||||||
|
/* Null indicates no filter. */
|
||||||
|
values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
|
||||||
|
Anum_pg_publication_rel_prqual,
|
||||||
|
&(nulls[2]));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
nulls[1] = true;
|
||||||
|
nulls[2] = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
|
||||||
|
|
||||||
|
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
|
||||||
}
|
}
|
||||||
|
|
||||||
SRF_RETURN_DONE(funcctx);
|
SRF_RETURN_DONE(funcctx);
|
||||||
|
@ -368,7 +368,15 @@ CREATE VIEW pg_publication_tables AS
|
|||||||
SELECT
|
SELECT
|
||||||
P.pubname AS pubname,
|
P.pubname AS pubname,
|
||||||
N.nspname AS schemaname,
|
N.nspname AS schemaname,
|
||||||
C.relname AS tablename
|
C.relname AS tablename,
|
||||||
|
( SELECT array_agg(a.attname ORDER BY a.attnum)
|
||||||
|
FROM unnest(CASE WHEN GPT.attrs IS NOT NULL THEN GPT.attrs
|
||||||
|
ELSE (SELECT array_agg(g) FROM generate_series(1, C.relnatts) g)
|
||||||
|
END) k
|
||||||
|
JOIN pg_attribute a
|
||||||
|
ON (a.attrelid = GPT.relid AND a.attnum = k)
|
||||||
|
) AS attnames,
|
||||||
|
pg_get_expr(GPT.qual, GPT.relid) AS rowfilter
|
||||||
FROM pg_publication P,
|
FROM pg_publication P,
|
||||||
LATERAL pg_get_publication_tables(P.pubname) GPT,
|
LATERAL pg_get_publication_tables(P.pubname) GPT,
|
||||||
pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
|
pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
|
||||||
|
@ -795,15 +795,12 @@ fetch_remote_table_info(char *nspname, char *relname,
|
|||||||
resetStringInfo(&cmd);
|
resetStringInfo(&cmd);
|
||||||
appendStringInfo(&cmd,
|
appendStringInfo(&cmd,
|
||||||
"SELECT DISTINCT unnest"
|
"SELECT DISTINCT unnest"
|
||||||
" FROM pg_publication p"
|
" FROM pg_publication p,"
|
||||||
" LEFT OUTER JOIN pg_publication_rel pr"
|
|
||||||
" ON (p.oid = pr.prpubid AND pr.prrelid = %u)"
|
|
||||||
" LEFT OUTER JOIN unnest(pr.prattrs) ON TRUE,"
|
|
||||||
" LATERAL pg_get_publication_tables(p.pubname) gpt"
|
" LATERAL pg_get_publication_tables(p.pubname) gpt"
|
||||||
|
" LEFT OUTER JOIN unnest(gpt.attrs) ON TRUE"
|
||||||
" WHERE gpt.relid = %u"
|
" WHERE gpt.relid = %u"
|
||||||
" AND p.pubname IN ( %s )",
|
" AND p.pubname IN ( %s )",
|
||||||
lrel->remoteid,
|
lrel->remoteid,
|
||||||
lrel->remoteid,
|
|
||||||
pub_names.data);
|
pub_names.data);
|
||||||
|
|
||||||
pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
|
pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
|
||||||
@ -965,15 +962,12 @@ fetch_remote_table_info(char *nspname, char *relname,
|
|||||||
/* Check for row filters. */
|
/* Check for row filters. */
|
||||||
resetStringInfo(&cmd);
|
resetStringInfo(&cmd);
|
||||||
appendStringInfo(&cmd,
|
appendStringInfo(&cmd,
|
||||||
"SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)"
|
"SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
|
||||||
" FROM pg_publication p"
|
" FROM pg_publication p,"
|
||||||
" LEFT OUTER JOIN pg_publication_rel pr"
|
|
||||||
" ON (p.oid = pr.prpubid AND pr.prrelid = %u),"
|
|
||||||
" LATERAL pg_get_publication_tables(p.pubname) gpt"
|
" LATERAL pg_get_publication_tables(p.pubname) gpt"
|
||||||
" WHERE gpt.relid = %u"
|
" WHERE gpt.relid = %u"
|
||||||
" AND p.pubname IN ( %s )",
|
" AND p.pubname IN ( %s )",
|
||||||
lrel->remoteid,
|
lrel->remoteid,
|
||||||
lrel->remoteid,
|
|
||||||
pub_names.data);
|
pub_names.data);
|
||||||
|
|
||||||
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
|
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
|
||||||
|
@ -53,6 +53,6 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
/* yyyymmddN */
|
/* yyyymmddN */
|
||||||
#define CATALOG_VERSION_NO 202205131
|
#define CATALOG_VERSION_NO 202205191
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -11673,11 +11673,11 @@
|
|||||||
prosrc => 'pg_show_replication_origin_status' },
|
prosrc => 'pg_show_replication_origin_status' },
|
||||||
|
|
||||||
# publications
|
# publications
|
||||||
{ oid => '6119', descr => 'get OIDs of tables in a publication',
|
{ oid => '6119', descr => 'get information of tables in a publication',
|
||||||
proname => 'pg_get_publication_tables', prorows => '1000', proretset => 't',
|
proname => 'pg_get_publication_tables', prorows => '1000', proretset => 't',
|
||||||
provolatile => 's', prorettype => 'oid', proargtypes => 'text',
|
provolatile => 's', prorettype => 'record', proargtypes => 'text',
|
||||||
proallargtypes => '{text,oid}', proargmodes => '{i,o}',
|
proallargtypes => '{text,oid,int2vector,pg_node_tree}', proargmodes => '{i,o,o,o}',
|
||||||
proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' },
|
proargnames => '{pubname,relid,attrs,qual}', prosrc => 'pg_get_publication_tables' },
|
||||||
{ oid => '6121',
|
{ oid => '6121',
|
||||||
descr => 'returns whether a relation can be part of a publication',
|
descr => 'returns whether a relation can be part of a publication',
|
||||||
proname => 'pg_relation_is_publishable', provolatile => 's',
|
proname => 'pg_relation_is_publishable', provolatile => 's',
|
||||||
|
@ -1585,52 +1585,52 @@ CREATE TABLE sch2.tbl1_part1 PARTITION OF sch1.tbl1 FOR VALUES FROM (1) to (10);
|
|||||||
-- Schema publication that does not include the schema that has the parent table
|
-- Schema publication that does not include the schema that has the parent table
|
||||||
CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
|
CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
|
||||||
SELECT * FROM pg_publication_tables;
|
SELECT * FROM pg_publication_tables;
|
||||||
pubname | schemaname | tablename
|
pubname | schemaname | tablename | attnames | rowfilter
|
||||||
---------+------------+------------
|
---------+------------+------------+----------+-----------
|
||||||
pub | sch2 | tbl1_part1
|
pub | sch2 | tbl1_part1 | {a} |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP PUBLICATION pub;
|
DROP PUBLICATION pub;
|
||||||
-- Table publication that does not include the parent table
|
-- Table publication that does not include the parent table
|
||||||
CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
|
CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
|
||||||
SELECT * FROM pg_publication_tables;
|
SELECT * FROM pg_publication_tables;
|
||||||
pubname | schemaname | tablename
|
pubname | schemaname | tablename | attnames | rowfilter
|
||||||
---------+------------+------------
|
---------+------------+------------+----------+-----------
|
||||||
pub | sch2 | tbl1_part1
|
pub | sch2 | tbl1_part1 | {a} |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Table publication that includes both the parent table and the child table
|
-- Table publication that includes both the parent table and the child table
|
||||||
ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
|
ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
|
||||||
SELECT * FROM pg_publication_tables;
|
SELECT * FROM pg_publication_tables;
|
||||||
pubname | schemaname | tablename
|
pubname | schemaname | tablename | attnames | rowfilter
|
||||||
---------+------------+-----------
|
---------+------------+-----------+----------+-----------
|
||||||
pub | sch1 | tbl1
|
pub | sch1 | tbl1 | {a} |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP PUBLICATION pub;
|
DROP PUBLICATION pub;
|
||||||
-- Schema publication that does not include the schema that has the parent table
|
-- Schema publication that does not include the schema that has the parent table
|
||||||
CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=0);
|
CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=0);
|
||||||
SELECT * FROM pg_publication_tables;
|
SELECT * FROM pg_publication_tables;
|
||||||
pubname | schemaname | tablename
|
pubname | schemaname | tablename | attnames | rowfilter
|
||||||
---------+------------+------------
|
---------+------------+------------+----------+-----------
|
||||||
pub | sch2 | tbl1_part1
|
pub | sch2 | tbl1_part1 | {a} |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP PUBLICATION pub;
|
DROP PUBLICATION pub;
|
||||||
-- Table publication that does not include the parent table
|
-- Table publication that does not include the parent table
|
||||||
CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=0);
|
CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=0);
|
||||||
SELECT * FROM pg_publication_tables;
|
SELECT * FROM pg_publication_tables;
|
||||||
pubname | schemaname | tablename
|
pubname | schemaname | tablename | attnames | rowfilter
|
||||||
---------+------------+------------
|
---------+------------+------------+----------+-----------
|
||||||
pub | sch2 | tbl1_part1
|
pub | sch2 | tbl1_part1 | {a} |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Table publication that includes both the parent table and the child table
|
-- Table publication that includes both the parent table and the child table
|
||||||
ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
|
ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
|
||||||
SELECT * FROM pg_publication_tables;
|
SELECT * FROM pg_publication_tables;
|
||||||
pubname | schemaname | tablename
|
pubname | schemaname | tablename | attnames | rowfilter
|
||||||
---------+------------+------------
|
---------+------------+------------+----------+-----------
|
||||||
pub | sch2 | tbl1_part1
|
pub | sch2 | tbl1_part1 | {a} |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP PUBLICATION pub;
|
DROP PUBLICATION pub;
|
||||||
@ -1643,9 +1643,9 @@ CREATE TABLE sch1.tbl1_part3 (a int) PARTITION BY RANGE(a);
|
|||||||
ALTER TABLE sch1.tbl1 ATTACH PARTITION sch1.tbl1_part3 FOR VALUES FROM (20) to (30);
|
ALTER TABLE sch1.tbl1 ATTACH PARTITION sch1.tbl1_part3 FOR VALUES FROM (20) to (30);
|
||||||
CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch1 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
|
CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch1 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
|
||||||
SELECT * FROM pg_publication_tables;
|
SELECT * FROM pg_publication_tables;
|
||||||
pubname | schemaname | tablename
|
pubname | schemaname | tablename | attnames | rowfilter
|
||||||
---------+------------+-----------
|
---------+------------+-----------+----------+-----------
|
||||||
pub | sch1 | tbl1
|
pub | sch1 | tbl1 | {a} |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
@ -1437,9 +1437,18 @@ pg_prepared_xacts| SELECT p.transaction,
|
|||||||
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
|
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
|
||||||
pg_publication_tables| SELECT p.pubname,
|
pg_publication_tables| SELECT p.pubname,
|
||||||
n.nspname AS schemaname,
|
n.nspname AS schemaname,
|
||||||
c.relname AS tablename
|
c.relname AS tablename,
|
||||||
|
( SELECT array_agg(a.attname ORDER BY a.attnum) AS array_agg
|
||||||
|
FROM (unnest(
|
||||||
|
CASE
|
||||||
|
WHEN (gpt.attrs IS NOT NULL) THEN (gpt.attrs)::integer[]
|
||||||
|
ELSE ( SELECT array_agg(g.g) AS array_agg
|
||||||
|
FROM generate_series(1, (c.relnatts)::integer) g(g))
|
||||||
|
END) k(k)
|
||||||
|
JOIN pg_attribute a ON (((a.attrelid = gpt.relid) AND (a.attnum = k.k))))) AS attnames,
|
||||||
|
pg_get_expr(gpt.qual, gpt.relid) AS rowfilter
|
||||||
FROM pg_publication p,
|
FROM pg_publication p,
|
||||||
LATERAL pg_get_publication_tables((p.pubname)::text) gpt(relid),
|
LATERAL pg_get_publication_tables((p.pubname)::text) gpt(relid, attrs, qual),
|
||||||
(pg_class c
|
(pg_class c
|
||||||
JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
|
JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
|
||||||
WHERE (c.oid = gpt.relid);
|
WHERE (c.oid = gpt.relid);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user