mirror of
https://github.com/postgres/postgres.git
synced 2025-07-31 00:01:41 -04:00
Compare commits
4 Commits
a3a836fb5e
...
a9a47fb6d9
Author | SHA1 | Date | |
---|---|---|---|
|
a9a47fb6d9 | ||
|
7329240437 | ||
|
08e6344fd6 | ||
|
50b797dc99 |
@ -640,7 +640,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
appendStringInfoString(ctx->out, " (no-tuple-data)");
|
||||
else
|
||||
tuple_to_stringinfo(ctx->out, tupdesc,
|
||||
&change->data.tp.newtuple->tuple,
|
||||
change->data.tp.newtuple,
|
||||
false);
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||
@ -649,7 +649,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
{
|
||||
appendStringInfoString(ctx->out, " old-key:");
|
||||
tuple_to_stringinfo(ctx->out, tupdesc,
|
||||
&change->data.tp.oldtuple->tuple,
|
||||
change->data.tp.oldtuple,
|
||||
true);
|
||||
appendStringInfoString(ctx->out, " new-tuple:");
|
||||
}
|
||||
@ -658,7 +658,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
appendStringInfoString(ctx->out, " (no-tuple-data)");
|
||||
else
|
||||
tuple_to_stringinfo(ctx->out, tupdesc,
|
||||
&change->data.tp.newtuple->tuple,
|
||||
change->data.tp.newtuple,
|
||||
false);
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_DELETE:
|
||||
@ -670,7 +670,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
/* In DELETE, only the replica identity is present; display that */
|
||||
else
|
||||
tuple_to_stringinfo(ctx->out, tupdesc,
|
||||
&change->data.tp.oldtuple->tuple,
|
||||
change->data.tp.oldtuple,
|
||||
true);
|
||||
break;
|
||||
default:
|
||||
|
@ -2060,6 +2060,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
|
||||
<listitem>
|
||||
<para>
|
||||
If true, the slot is enabled to be synced to the standbys.
|
||||
The default is false.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
</variablelist>
|
||||
|
||||
<para>
|
||||
@ -2124,6 +2134,46 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry id="protocol-replication-alter-replication-slot" xreflabel="ALTER_REPLICATION_SLOT">
|
||||
<term><literal>ALTER_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ( <replaceable class="parameter">option</replaceable> [, ...] )
|
||||
<indexterm><primary>ALTER_REPLICATION_SLOT</primary></indexterm>
|
||||
</term>
|
||||
<listitem>
|
||||
<para>
|
||||
Change the definition of a replication slot.
|
||||
See <xref linkend="streaming-replication-slots"/> for more about
|
||||
replication slots. This command is currently only supported for logical
|
||||
replication slots.
|
||||
</para>
|
||||
|
||||
<variablelist>
|
||||
<varlistentry>
|
||||
<term><replaceable class="parameter">slot_name</replaceable></term>
|
||||
<listitem>
|
||||
<para>
|
||||
The name of the slot to alter. Must be a valid replication slot
|
||||
name (see <xref linkend="streaming-replication-slots-manipulation"/>).
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
</variablelist>
|
||||
|
||||
<para>The following option is supported:</para>
|
||||
|
||||
<variablelist>
|
||||
<varlistentry>
|
||||
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
|
||||
<listitem>
|
||||
<para>
|
||||
If true, the slot is enabled to be synced to the standbys.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
</variablelist>
|
||||
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry id="protocol-replication-read-replication-slot">
|
||||
<term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
|
||||
<indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
|
||||
|
@ -807,7 +807,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
|
||||
twophase_enabled = true;
|
||||
|
||||
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
|
||||
CRS_NOEXPORT_SNAPSHOT, NULL);
|
||||
false, CRS_NOEXPORT_SNAPSHOT, NULL);
|
||||
|
||||
if (twophase_enabled)
|
||||
UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
|
||||
|
@ -1093,7 +1093,7 @@ DropRole(DropRoleStmt *stmt)
|
||||
Relation pg_authid_rel,
|
||||
pg_auth_members_rel;
|
||||
ListCell *item;
|
||||
List *role_addresses = NIL;
|
||||
List *role_oids = NIL;
|
||||
|
||||
if (!have_createrole_privilege())
|
||||
ereport(ERROR,
|
||||
@ -1119,7 +1119,6 @@ DropRole(DropRoleStmt *stmt)
|
||||
ScanKeyData scankey;
|
||||
SysScanDesc sscan;
|
||||
Oid roleid;
|
||||
ObjectAddress *role_address;
|
||||
|
||||
if (rolspec->roletype != ROLESPEC_CSTRING)
|
||||
ereport(ERROR,
|
||||
@ -1260,21 +1259,16 @@ DropRole(DropRoleStmt *stmt)
|
||||
*/
|
||||
CommandCounterIncrement();
|
||||
|
||||
/* Looks tentatively OK, add it to the list. */
|
||||
role_address = palloc(sizeof(ObjectAddress));
|
||||
role_address->classId = AuthIdRelationId;
|
||||
role_address->objectId = roleid;
|
||||
role_address->objectSubId = 0;
|
||||
role_addresses = lappend(role_addresses, role_address);
|
||||
/* Looks tentatively OK, add it to the list if not there yet. */
|
||||
role_oids = list_append_unique_oid(role_oids, roleid);
|
||||
}
|
||||
|
||||
/*
|
||||
* Second pass over the roles to be removed.
|
||||
*/
|
||||
foreach(item, role_addresses)
|
||||
foreach(item, role_oids)
|
||||
{
|
||||
ObjectAddress *role_address = lfirst(item);
|
||||
Oid roleid = role_address->objectId;
|
||||
Oid roleid = lfirst_oid(item);
|
||||
HeapTuple tuple;
|
||||
Form_pg_authid roleform;
|
||||
char *detail;
|
||||
|
@ -73,8 +73,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
|
||||
const char *slotname,
|
||||
bool temporary,
|
||||
bool two_phase,
|
||||
bool failover,
|
||||
CRSSnapshotAction snapshot_action,
|
||||
XLogRecPtr *lsn);
|
||||
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
|
||||
bool failover);
|
||||
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
|
||||
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
|
||||
const char *query,
|
||||
@ -95,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
|
||||
.walrcv_receive = libpqrcv_receive,
|
||||
.walrcv_send = libpqrcv_send,
|
||||
.walrcv_create_slot = libpqrcv_create_slot,
|
||||
.walrcv_alter_slot = libpqrcv_alter_slot,
|
||||
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
|
||||
.walrcv_exec = libpqrcv_exec,
|
||||
.walrcv_disconnect = libpqrcv_disconnect
|
||||
@ -938,8 +942,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
|
||||
*/
|
||||
static char *
|
||||
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
|
||||
bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
|
||||
XLogRecPtr *lsn)
|
||||
bool temporary, bool two_phase, bool failover,
|
||||
CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
|
||||
{
|
||||
PGresult *res;
|
||||
StringInfoData cmd;
|
||||
@ -969,6 +973,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
|
||||
appendStringInfoChar(&cmd, ' ');
|
||||
}
|
||||
|
||||
if (failover)
|
||||
{
|
||||
appendStringInfoString(&cmd, "FAILOVER");
|
||||
if (use_new_options_syntax)
|
||||
appendStringInfoString(&cmd, ", ");
|
||||
else
|
||||
appendStringInfoChar(&cmd, ' ');
|
||||
}
|
||||
|
||||
if (use_new_options_syntax)
|
||||
{
|
||||
switch (snapshot_action)
|
||||
@ -1037,6 +1050,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
/*
|
||||
* Change the definition of the replication slot.
|
||||
*/
|
||||
static void
|
||||
libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
|
||||
bool failover)
|
||||
{
|
||||
StringInfoData cmd;
|
||||
PGresult *res;
|
||||
|
||||
initStringInfo(&cmd);
|
||||
appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
|
||||
quote_identifier(slotname),
|
||||
failover ? "true" : "false");
|
||||
|
||||
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
|
||||
pfree(cmd.data);
|
||||
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||
errmsg("could not alter replication slot \"%s\": %s",
|
||||
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
|
||||
|
||||
PQclear(res);
|
||||
}
|
||||
|
||||
/*
|
||||
* Return PID of remote backend process.
|
||||
*/
|
||||
|
@ -62,7 +62,7 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
||||
|
||||
|
||||
/* common function to decode tuples */
|
||||
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
|
||||
static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
|
||||
|
||||
/* helper functions for decoding transactions */
|
||||
static inline bool FilterPrepare(LogicalDecodingContext *ctx,
|
||||
@ -1152,7 +1152,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
ReorderBufferChange *change;
|
||||
xl_multi_insert_tuple *xlhdr;
|
||||
int datalen;
|
||||
ReorderBufferTupleBuf *tuple;
|
||||
HeapTuple tuple;
|
||||
HeapTupleHeader header;
|
||||
|
||||
change = ReorderBufferGetChange(ctx->reorder);
|
||||
@ -1169,21 +1169,21 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
ReorderBufferGetTupleBuf(ctx->reorder, datalen);
|
||||
|
||||
tuple = change->data.tp.newtuple;
|
||||
header = tuple->tuple.t_data;
|
||||
header = tuple->t_data;
|
||||
|
||||
/* not a disk based tuple */
|
||||
ItemPointerSetInvalid(&tuple->tuple.t_self);
|
||||
ItemPointerSetInvalid(&tuple->t_self);
|
||||
|
||||
/*
|
||||
* We can only figure this out after reassembling the transactions.
|
||||
*/
|
||||
tuple->tuple.t_tableOid = InvalidOid;
|
||||
tuple->t_tableOid = InvalidOid;
|
||||
|
||||
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
|
||||
tuple->t_len = datalen + SizeofHeapTupleHeader;
|
||||
|
||||
memset(header, 0, SizeofHeapTupleHeader);
|
||||
|
||||
memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
|
||||
memcpy((char *) tuple->t_data + SizeofHeapTupleHeader,
|
||||
(char *) data,
|
||||
datalen);
|
||||
header->t_infomask = xlhdr->t_infomask;
|
||||
@ -1253,7 +1253,7 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
* computed outside as they are record specific.
|
||||
*/
|
||||
static void
|
||||
DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
|
||||
DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
|
||||
{
|
||||
xl_heap_header xlhdr;
|
||||
int datalen = len - SizeOfHeapHeader;
|
||||
@ -1261,14 +1261,14 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
|
||||
|
||||
Assert(datalen >= 0);
|
||||
|
||||
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
|
||||
header = tuple->tuple.t_data;
|
||||
tuple->t_len = datalen + SizeofHeapTupleHeader;
|
||||
header = tuple->t_data;
|
||||
|
||||
/* not a disk based tuple */
|
||||
ItemPointerSetInvalid(&tuple->tuple.t_self);
|
||||
ItemPointerSetInvalid(&tuple->t_self);
|
||||
|
||||
/* we can only figure this out after reassembling the transactions */
|
||||
tuple->tuple.t_tableOid = InvalidOid;
|
||||
tuple->t_tableOid = InvalidOid;
|
||||
|
||||
/* data is not stored aligned, copy to aligned storage */
|
||||
memcpy((char *) &xlhdr,
|
||||
@ -1277,7 +1277,7 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
|
||||
|
||||
memset(header, 0, SizeofHeapTupleHeader);
|
||||
|
||||
memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
|
||||
memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
|
||||
data + SizeOfHeapHeader,
|
||||
datalen);
|
||||
|
||||
|
@ -498,13 +498,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
|
||||
if (change->data.tp.newtuple)
|
||||
{
|
||||
ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
|
||||
ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
|
||||
change->data.tp.newtuple = NULL;
|
||||
}
|
||||
|
||||
if (change->data.tp.oldtuple)
|
||||
{
|
||||
ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
|
||||
ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
|
||||
change->data.tp.oldtuple = NULL;
|
||||
}
|
||||
break;
|
||||
@ -547,32 +547,29 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
|
||||
}
|
||||
|
||||
/*
|
||||
* Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
|
||||
* tuple_len (excluding header overhead).
|
||||
* Get a fresh HeapTuple fitting a tuple of size tuple_len (excluding header
|
||||
* overhead).
|
||||
*/
|
||||
ReorderBufferTupleBuf *
|
||||
HeapTuple
|
||||
ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
|
||||
{
|
||||
ReorderBufferTupleBuf *tuple;
|
||||
HeapTuple tuple;
|
||||
Size alloc_len;
|
||||
|
||||
alloc_len = tuple_len + SizeofHeapTupleHeader;
|
||||
|
||||
tuple = (ReorderBufferTupleBuf *)
|
||||
MemoryContextAlloc(rb->tup_context,
|
||||
sizeof(ReorderBufferTupleBuf) +
|
||||
MAXIMUM_ALIGNOF + alloc_len);
|
||||
tuple->alloc_tuple_size = alloc_len;
|
||||
tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
|
||||
tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
|
||||
HEAPTUPLESIZE + alloc_len);
|
||||
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
|
||||
|
||||
return tuple;
|
||||
}
|
||||
|
||||
/*
|
||||
* Free a ReorderBufferTupleBuf.
|
||||
* Free a HeapTuple returned by ReorderBufferGetTupleBuf().
|
||||
*/
|
||||
void
|
||||
ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
|
||||
ReorderBufferReturnTupleBuf(HeapTuple tuple)
|
||||
{
|
||||
pfree(tuple);
|
||||
}
|
||||
@ -3759,8 +3756,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
|
||||
{
|
||||
char *data;
|
||||
ReorderBufferTupleBuf *oldtup,
|
||||
*newtup;
|
||||
HeapTuple oldtup,
|
||||
newtup;
|
||||
Size oldlen = 0;
|
||||
Size newlen = 0;
|
||||
|
||||
@ -3770,14 +3767,14 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
if (oldtup)
|
||||
{
|
||||
sz += sizeof(HeapTupleData);
|
||||
oldlen = oldtup->tuple.t_len;
|
||||
oldlen = oldtup->t_len;
|
||||
sz += oldlen;
|
||||
}
|
||||
|
||||
if (newtup)
|
||||
{
|
||||
sz += sizeof(HeapTupleData);
|
||||
newlen = newtup->tuple.t_len;
|
||||
newlen = newtup->t_len;
|
||||
sz += newlen;
|
||||
}
|
||||
|
||||
@ -3790,19 +3787,19 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
|
||||
if (oldlen)
|
||||
{
|
||||
memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
|
||||
memcpy(data, oldtup, sizeof(HeapTupleData));
|
||||
data += sizeof(HeapTupleData);
|
||||
|
||||
memcpy(data, oldtup->tuple.t_data, oldlen);
|
||||
memcpy(data, oldtup->t_data, oldlen);
|
||||
data += oldlen;
|
||||
}
|
||||
|
||||
if (newlen)
|
||||
{
|
||||
memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
|
||||
memcpy(data, newtup, sizeof(HeapTupleData));
|
||||
data += sizeof(HeapTupleData);
|
||||
|
||||
memcpy(data, newtup->tuple.t_data, newlen);
|
||||
memcpy(data, newtup->t_data, newlen);
|
||||
data += newlen;
|
||||
}
|
||||
break;
|
||||
@ -4118,8 +4115,8 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
|
||||
case REORDER_BUFFER_CHANGE_DELETE:
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
|
||||
{
|
||||
ReorderBufferTupleBuf *oldtup,
|
||||
*newtup;
|
||||
HeapTuple oldtup,
|
||||
newtup;
|
||||
Size oldlen = 0;
|
||||
Size newlen = 0;
|
||||
|
||||
@ -4129,14 +4126,14 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
|
||||
if (oldtup)
|
||||
{
|
||||
sz += sizeof(HeapTupleData);
|
||||
oldlen = oldtup->tuple.t_len;
|
||||
oldlen = oldtup->t_len;
|
||||
sz += oldlen;
|
||||
}
|
||||
|
||||
if (newtup)
|
||||
{
|
||||
sz += sizeof(HeapTupleData);
|
||||
newlen = newtup->tuple.t_len;
|
||||
newlen = newtup->t_len;
|
||||
sz += newlen;
|
||||
}
|
||||
|
||||
@ -4365,16 +4362,16 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
|
||||
|
||||
/* restore ->tuple */
|
||||
memcpy(&change->data.tp.oldtuple->tuple, data,
|
||||
memcpy(change->data.tp.oldtuple, data,
|
||||
sizeof(HeapTupleData));
|
||||
data += sizeof(HeapTupleData);
|
||||
|
||||
/* reset t_data pointer into the new tuplebuf */
|
||||
change->data.tp.oldtuple->tuple.t_data =
|
||||
ReorderBufferTupleBufData(change->data.tp.oldtuple);
|
||||
change->data.tp.oldtuple->t_data =
|
||||
(HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
|
||||
|
||||
/* restore tuple data itself */
|
||||
memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
|
||||
memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
|
||||
data += tuplelen;
|
||||
}
|
||||
|
||||
@ -4390,16 +4387,16 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
|
||||
|
||||
/* restore ->tuple */
|
||||
memcpy(&change->data.tp.newtuple->tuple, data,
|
||||
memcpy(change->data.tp.newtuple, data,
|
||||
sizeof(HeapTupleData));
|
||||
data += sizeof(HeapTupleData);
|
||||
|
||||
/* reset t_data pointer into the new tuplebuf */
|
||||
change->data.tp.newtuple->tuple.t_data =
|
||||
ReorderBufferTupleBufData(change->data.tp.newtuple);
|
||||
change->data.tp.newtuple->t_data =
|
||||
(HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
|
||||
|
||||
/* restore tuple data itself */
|
||||
memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
|
||||
memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
|
||||
data += tuplelen;
|
||||
}
|
||||
|
||||
@ -4646,7 +4643,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change)
|
||||
{
|
||||
ReorderBufferToastEnt *ent;
|
||||
ReorderBufferTupleBuf *newtup;
|
||||
HeapTuple newtup;
|
||||
bool found;
|
||||
int32 chunksize;
|
||||
bool isnull;
|
||||
@ -4661,9 +4658,9 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
Assert(IsToastRelation(relation));
|
||||
|
||||
newtup = change->data.tp.newtuple;
|
||||
chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
|
||||
chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
|
||||
Assert(!isnull);
|
||||
chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
|
||||
chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
|
||||
Assert(!isnull);
|
||||
|
||||
ent = (ReorderBufferToastEnt *)
|
||||
@ -4686,7 +4683,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
|
||||
chunk_seq, chunk_id, ent->last_chunk_seq + 1);
|
||||
|
||||
chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
|
||||
chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
|
||||
Assert(!isnull);
|
||||
|
||||
/* calculate size so we can allocate the right size at once later */
|
||||
@ -4737,7 +4734,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
Relation toast_rel;
|
||||
TupleDesc toast_desc;
|
||||
MemoryContext oldcontext;
|
||||
ReorderBufferTupleBuf *newtup;
|
||||
HeapTuple newtup;
|
||||
Size old_size;
|
||||
|
||||
/* no toast tuples changed */
|
||||
@ -4777,7 +4774,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
|
||||
newtup = change->data.tp.newtuple;
|
||||
|
||||
heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
|
||||
heap_deform_tuple(newtup, desc, attrs, isnull);
|
||||
|
||||
for (natt = 0; natt < desc->natts; natt++)
|
||||
{
|
||||
@ -4842,12 +4839,12 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
{
|
||||
bool cisnull;
|
||||
ReorderBufferChange *cchange;
|
||||
ReorderBufferTupleBuf *ctup;
|
||||
HeapTuple ctup;
|
||||
Pointer chunk;
|
||||
|
||||
cchange = dlist_container(ReorderBufferChange, node, it.cur);
|
||||
ctup = cchange->data.tp.newtuple;
|
||||
chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &cisnull));
|
||||
chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
|
||||
|
||||
Assert(!cisnull);
|
||||
Assert(!VARATT_IS_EXTERNAL(chunk));
|
||||
@ -4882,11 +4879,11 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
* the tuplebuf because attrs[] will point back into the current content.
|
||||
*/
|
||||
tmphtup = heap_form_tuple(desc, attrs, isnull);
|
||||
Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
|
||||
Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
|
||||
Assert(newtup->t_len <= MaxHeapTupleSize);
|
||||
Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
|
||||
|
||||
memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
|
||||
newtup->tuple.t_len = tmphtup->t_len;
|
||||
memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
|
||||
newtup->t_len = tmphtup->t_len;
|
||||
|
||||
/*
|
||||
* free resources we won't further need, more persistent stuff will be
|
||||
|
@ -1430,6 +1430,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
|
||||
*/
|
||||
walrcv_create_slot(LogRepWorkerWalRcvConn,
|
||||
slotname, false /* permanent */ , false /* two_phase */ ,
|
||||
false,
|
||||
CRS_USE_SNAPSHOT, origin_startpos);
|
||||
|
||||
/*
|
||||
|
@ -1473,7 +1473,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
if (change->data.tp.oldtuple)
|
||||
{
|
||||
old_slot = relentry->old_slot;
|
||||
ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
|
||||
ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
|
||||
|
||||
/* Convert tuple if needed. */
|
||||
if (relentry->attrmap)
|
||||
@ -1488,7 +1488,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
if (change->data.tp.newtuple)
|
||||
{
|
||||
new_slot = relentry->new_slot;
|
||||
ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
|
||||
ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
|
||||
|
||||
/* Convert tuple if needed. */
|
||||
if (relentry->attrmap)
|
||||
|
@ -64,6 +64,7 @@ Node *replication_parse_result;
|
||||
%token K_START_REPLICATION
|
||||
%token K_CREATE_REPLICATION_SLOT
|
||||
%token K_DROP_REPLICATION_SLOT
|
||||
%token K_ALTER_REPLICATION_SLOT
|
||||
%token K_TIMELINE_HISTORY
|
||||
%token K_WAIT
|
||||
%token K_TIMELINE
|
||||
@ -80,8 +81,9 @@ Node *replication_parse_result;
|
||||
|
||||
%type <node> command
|
||||
%type <node> base_backup start_replication start_logical_replication
|
||||
create_replication_slot drop_replication_slot identify_system
|
||||
read_replication_slot timeline_history show upload_manifest
|
||||
create_replication_slot drop_replication_slot
|
||||
alter_replication_slot identify_system read_replication_slot
|
||||
timeline_history show upload_manifest
|
||||
%type <list> generic_option_list
|
||||
%type <defelt> generic_option
|
||||
%type <uintval> opt_timeline
|
||||
@ -112,6 +114,7 @@ command:
|
||||
| start_logical_replication
|
||||
| create_replication_slot
|
||||
| drop_replication_slot
|
||||
| alter_replication_slot
|
||||
| read_replication_slot
|
||||
| timeline_history
|
||||
| show
|
||||
@ -259,6 +262,18 @@ drop_replication_slot:
|
||||
}
|
||||
;
|
||||
|
||||
/* ALTER_REPLICATION_SLOT slot (options) */
|
||||
alter_replication_slot:
|
||||
K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')'
|
||||
{
|
||||
AlterReplicationSlotCmd *cmd;
|
||||
cmd = makeNode(AlterReplicationSlotCmd);
|
||||
cmd->slotname = $2;
|
||||
cmd->options = $4;
|
||||
$$ = (Node *) cmd;
|
||||
}
|
||||
;
|
||||
|
||||
/*
|
||||
* START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
|
||||
*/
|
||||
@ -410,6 +425,7 @@ ident_or_keyword:
|
||||
| K_START_REPLICATION { $$ = "start_replication"; }
|
||||
| K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; }
|
||||
| K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; }
|
||||
| K_ALTER_REPLICATION_SLOT { $$ = "alter_replication_slot"; }
|
||||
| K_TIMELINE_HISTORY { $$ = "timeline_history"; }
|
||||
| K_WAIT { $$ = "wait"; }
|
||||
| K_TIMELINE { $$ = "timeline"; }
|
||||
|
@ -125,6 +125,7 @@ TIMELINE { return K_TIMELINE; }
|
||||
START_REPLICATION { return K_START_REPLICATION; }
|
||||
CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
|
||||
DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
|
||||
ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; }
|
||||
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
|
||||
PHYSICAL { return K_PHYSICAL; }
|
||||
RESERVE_WAL { return K_RESERVE_WAL; }
|
||||
@ -302,6 +303,7 @@ replication_scanner_is_replication_command(void)
|
||||
case K_START_REPLICATION:
|
||||
case K_CREATE_REPLICATION_SLOT:
|
||||
case K_DROP_REPLICATION_SLOT:
|
||||
case K_ALTER_REPLICATION_SLOT:
|
||||
case K_READ_REPLICATION_SLOT:
|
||||
case K_TIMELINE_HISTORY:
|
||||
case K_UPLOAD_MANIFEST:
|
||||
|
@ -465,10 +465,7 @@ retry:
|
||||
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
|
||||
/*
|
||||
* Search for the slot with the specified name if the slot to acquire is
|
||||
* not given. If the slot is not found, we either return -1 or error out.
|
||||
*/
|
||||
/* Check if the slot exits with the given name. */
|
||||
s = SearchNamedReplicationSlot(name, false);
|
||||
if (s == NULL || !s->in_use)
|
||||
{
|
||||
@ -683,6 +680,31 @@ ReplicationSlotDrop(const char *name, bool nowait)
|
||||
ReplicationSlotDropAcquired();
|
||||
}
|
||||
|
||||
/*
|
||||
* Change the definition of the slot identified by the specified name.
|
||||
*/
|
||||
void
|
||||
ReplicationSlotAlter(const char *name, bool failover)
|
||||
{
|
||||
Assert(MyReplicationSlot == NULL);
|
||||
|
||||
ReplicationSlotAcquire(name, false);
|
||||
|
||||
if (SlotIsPhysical(MyReplicationSlot))
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot use %s with a physical replication slot",
|
||||
"ALTER_REPLICATION_SLOT"));
|
||||
|
||||
SpinLockAcquire(&MyReplicationSlot->mutex);
|
||||
MyReplicationSlot->data.failover = failover;
|
||||
SpinLockRelease(&MyReplicationSlot->mutex);
|
||||
|
||||
ReplicationSlotMarkDirty();
|
||||
ReplicationSlotSave();
|
||||
ReplicationSlotRelease();
|
||||
}
|
||||
|
||||
/*
|
||||
* Permanently drop the currently acquired replication slot.
|
||||
*/
|
||||
|
@ -387,7 +387,7 @@ WalReceiverMain(void)
|
||||
"pg_walreceiver_%lld",
|
||||
(long long int) walrcv_get_backend_pid(wrconn));
|
||||
|
||||
walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
|
||||
walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
|
||||
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
|
||||
|
@ -1126,12 +1126,13 @@ static void
|
||||
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
|
||||
bool *reserve_wal,
|
||||
CRSSnapshotAction *snapshot_action,
|
||||
bool *two_phase)
|
||||
bool *two_phase, bool *failover)
|
||||
{
|
||||
ListCell *lc;
|
||||
bool snapshot_action_given = false;
|
||||
bool reserve_wal_given = false;
|
||||
bool two_phase_given = false;
|
||||
bool failover_given = false;
|
||||
|
||||
/* Parse options */
|
||||
foreach(lc, cmd->options)
|
||||
@ -1181,6 +1182,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
|
||||
two_phase_given = true;
|
||||
*two_phase = defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "failover") == 0)
|
||||
{
|
||||
if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
failover_given = true;
|
||||
*failover = defGetBoolean(defel);
|
||||
}
|
||||
else
|
||||
elog(ERROR, "unrecognized option: %s", defel->defname);
|
||||
}
|
||||
@ -1197,6 +1207,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||
char *slot_name;
|
||||
bool reserve_wal = false;
|
||||
bool two_phase = false;
|
||||
bool failover = false;
|
||||
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
|
||||
DestReceiver *dest;
|
||||
TupOutputState *tstate;
|
||||
@ -1206,7 +1217,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||
|
||||
Assert(!MyReplicationSlot);
|
||||
|
||||
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
|
||||
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
|
||||
&failover);
|
||||
|
||||
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
|
||||
{
|
||||
@ -1243,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||
*/
|
||||
ReplicationSlotCreate(cmd->slotname, true,
|
||||
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
|
||||
two_phase, false);
|
||||
two_phase, failover);
|
||||
|
||||
/*
|
||||
* Do options check early so that we can bail before calling the
|
||||
@ -1398,6 +1410,43 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
|
||||
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
|
||||
}
|
||||
|
||||
/*
|
||||
* Process extra options given to ALTER_REPLICATION_SLOT.
|
||||
*/
|
||||
static void
|
||||
ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
|
||||
{
|
||||
bool failover_given = false;
|
||||
|
||||
/* Parse options */
|
||||
foreach_ptr(DefElem, defel, cmd->options)
|
||||
{
|
||||
if (strcmp(defel->defname, "failover") == 0)
|
||||
{
|
||||
if (failover_given)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
failover_given = true;
|
||||
*failover = defGetBoolean(defel);
|
||||
}
|
||||
else
|
||||
elog(ERROR, "unrecognized option: %s", defel->defname);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Change the definition of a replication slot.
|
||||
*/
|
||||
static void
|
||||
AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
|
||||
{
|
||||
bool failover = false;
|
||||
|
||||
ParseAlterReplSlotOptions(cmd, &failover);
|
||||
ReplicationSlotAlter(cmd->slotname, failover);
|
||||
}
|
||||
|
||||
/*
|
||||
* Load previously initiated logical slot and prepare for sending data (via
|
||||
* WalSndLoop).
|
||||
@ -1971,6 +2020,13 @@ exec_replication_command(const char *cmd_string)
|
||||
EndReplicationCommand(cmdtag);
|
||||
break;
|
||||
|
||||
case T_AlterReplicationSlotCmd:
|
||||
cmdtag = "ALTER_REPLICATION_SLOT";
|
||||
set_ps_display(cmdtag);
|
||||
AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
|
||||
EndReplicationCommand(cmdtag);
|
||||
break;
|
||||
|
||||
case T_StartReplicationCmd:
|
||||
{
|
||||
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
|
||||
|
@ -72,6 +72,18 @@ typedef struct DropReplicationSlotCmd
|
||||
} DropReplicationSlotCmd;
|
||||
|
||||
|
||||
/* ----------------------
|
||||
* ALTER_REPLICATION_SLOT command
|
||||
* ----------------------
|
||||
*/
|
||||
typedef struct AlterReplicationSlotCmd
|
||||
{
|
||||
NodeTag type;
|
||||
char *slotname;
|
||||
List *options;
|
||||
} AlterReplicationSlotCmd;
|
||||
|
||||
|
||||
/* ----------------------
|
||||
* START_REPLICATION command
|
||||
* ----------------------
|
||||
|
@ -28,25 +28,6 @@ typedef enum
|
||||
DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE,
|
||||
} DebugLogicalRepStreamingMode;
|
||||
|
||||
/* an individual tuple, stored in one chunk of memory */
|
||||
typedef struct ReorderBufferTupleBuf
|
||||
{
|
||||
/* position in preallocated list */
|
||||
slist_node node;
|
||||
|
||||
/* tuple header, the interesting bit for users of logical decoding */
|
||||
HeapTupleData tuple;
|
||||
|
||||
/* pre-allocated size of tuple buffer, different from tuple size */
|
||||
Size alloc_tuple_size;
|
||||
|
||||
/* actual tuple data follows */
|
||||
} ReorderBufferTupleBuf;
|
||||
|
||||
/* pointer to the data stored in a TupleBuf */
|
||||
#define ReorderBufferTupleBufData(p) \
|
||||
((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
|
||||
|
||||
/*
|
||||
* Types of the change passed to a 'change' callback.
|
||||
*
|
||||
@ -114,9 +95,9 @@ typedef struct ReorderBufferChange
|
||||
bool clear_toast_afterwards;
|
||||
|
||||
/* valid for DELETE || UPDATE */
|
||||
ReorderBufferTupleBuf *oldtuple;
|
||||
HeapTuple oldtuple;
|
||||
/* valid for INSERT || UPDATE */
|
||||
ReorderBufferTupleBuf *newtuple;
|
||||
HeapTuple newtuple;
|
||||
} tp;
|
||||
|
||||
/*
|
||||
@ -678,10 +659,10 @@ struct ReorderBuffer
|
||||
extern ReorderBuffer *ReorderBufferAllocate(void);
|
||||
extern void ReorderBufferFree(ReorderBuffer *rb);
|
||||
|
||||
extern ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *rb,
|
||||
Size tuple_len);
|
||||
extern void ReorderBufferReturnTupleBuf(ReorderBuffer *rb,
|
||||
ReorderBufferTupleBuf *tuple);
|
||||
extern HeapTuple ReorderBufferGetTupleBuf(ReorderBuffer *rb,
|
||||
Size tuple_len);
|
||||
extern void ReorderBufferReturnTupleBuf(HeapTuple tuple);
|
||||
|
||||
extern ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *rb);
|
||||
extern void ReorderBufferReturnChange(ReorderBuffer *rb,
|
||||
ReorderBufferChange *change, bool upd_mem);
|
||||
|
@ -227,6 +227,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
|
||||
bool two_phase, bool failover);
|
||||
extern void ReplicationSlotPersist(void);
|
||||
extern void ReplicationSlotDrop(const char *name, bool nowait);
|
||||
extern void ReplicationSlotAlter(const char *name, bool failover);
|
||||
|
||||
extern void ReplicationSlotAcquire(const char *name, bool nowait);
|
||||
extern void ReplicationSlotRelease(void);
|
||||
|
@ -355,9 +355,20 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
|
||||
const char *slotname,
|
||||
bool temporary,
|
||||
bool two_phase,
|
||||
bool failover,
|
||||
CRSSnapshotAction snapshot_action,
|
||||
XLogRecPtr *lsn);
|
||||
|
||||
/*
|
||||
* walrcv_alter_slot_fn
|
||||
*
|
||||
* Change the definition of a replication slot. Currently, it only supports
|
||||
* changing the failover property of the slot.
|
||||
*/
|
||||
typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
|
||||
const char *slotname,
|
||||
bool failover);
|
||||
|
||||
/*
|
||||
* walrcv_get_backend_pid_fn
|
||||
*
|
||||
@ -399,6 +410,7 @@ typedef struct WalReceiverFunctionsType
|
||||
walrcv_receive_fn walrcv_receive;
|
||||
walrcv_send_fn walrcv_send;
|
||||
walrcv_create_slot_fn walrcv_create_slot;
|
||||
walrcv_alter_slot_fn walrcv_alter_slot;
|
||||
walrcv_get_backend_pid_fn walrcv_get_backend_pid;
|
||||
walrcv_exec_fn walrcv_exec;
|
||||
walrcv_disconnect_fn walrcv_disconnect;
|
||||
@ -428,8 +440,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
|
||||
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
|
||||
#define walrcv_send(conn, buffer, nbytes) \
|
||||
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
|
||||
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
|
||||
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
|
||||
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
|
||||
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
|
||||
#define walrcv_alter_slot(conn, slotname, failover) \
|
||||
WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover)
|
||||
#define walrcv_get_backend_pid(conn) \
|
||||
WalReceiverFunctions->walrcv_get_backend_pid(conn)
|
||||
#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
|
||||
|
@ -251,7 +251,8 @@ DROP INDEX tenant_idx;
|
||||
DROP TABLE tenant_table;
|
||||
DROP VIEW tenant_view;
|
||||
DROP SCHEMA regress_tenant2_schema;
|
||||
DROP ROLE regress_tenant;
|
||||
-- check for duplicated drop
|
||||
DROP ROLE regress_tenant, regress_tenant;
|
||||
DROP ROLE regress_tenant2;
|
||||
DROP ROLE regress_rolecreator;
|
||||
DROP ROLE regress_role_admin;
|
||||
|
@ -206,7 +206,8 @@ DROP INDEX tenant_idx;
|
||||
DROP TABLE tenant_table;
|
||||
DROP VIEW tenant_view;
|
||||
DROP SCHEMA regress_tenant2_schema;
|
||||
DROP ROLE regress_tenant;
|
||||
-- check for duplicated drop
|
||||
DROP ROLE regress_tenant, regress_tenant;
|
||||
DROP ROLE regress_tenant2;
|
||||
DROP ROLE regress_rolecreator;
|
||||
DROP ROLE regress_role_admin;
|
||||
|
@ -85,6 +85,7 @@ AlterOwnerStmt
|
||||
AlterPolicyStmt
|
||||
AlterPublicationAction
|
||||
AlterPublicationStmt
|
||||
AlterReplicationSlotCmd
|
||||
AlterRoleSetStmt
|
||||
AlterRoleStmt
|
||||
AlterSeqStmt
|
||||
@ -2351,7 +2352,6 @@ ReorderBufferStreamTruncateCB
|
||||
ReorderBufferTXN
|
||||
ReorderBufferTXNByIdEnt
|
||||
ReorderBufferToastEnt
|
||||
ReorderBufferTupleBuf
|
||||
ReorderBufferTupleCidEnt
|
||||
ReorderBufferTupleCidKey
|
||||
ReorderBufferUpdateProgressTxnCB
|
||||
@ -3880,6 +3880,7 @@ varattrib_1b_e
|
||||
varattrib_4b
|
||||
vbits
|
||||
verifier_context
|
||||
walrcv_alter_slot_fn
|
||||
walrcv_check_conninfo_fn
|
||||
walrcv_connect_fn
|
||||
walrcv_create_slot_fn
|
||||
|
Loading…
x
Reference in New Issue
Block a user