Compare commits

..

No commits in common. "a9a47fb6d99785d5d272b408d251dca28b0879e3" and "a3a836fb5e51183eae624d43225279306c2285b8" have entirely different histories.

21 changed files with 119 additions and 308 deletions

View File

@ -640,7 +640,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
appendStringInfoString(ctx->out, " (no-tuple-data)"); appendStringInfoString(ctx->out, " (no-tuple-data)");
else else
tuple_to_stringinfo(ctx->out, tupdesc, tuple_to_stringinfo(ctx->out, tupdesc,
change->data.tp.newtuple, &change->data.tp.newtuple->tuple,
false); false);
break; break;
case REORDER_BUFFER_CHANGE_UPDATE: case REORDER_BUFFER_CHANGE_UPDATE:
@ -649,7 +649,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{ {
appendStringInfoString(ctx->out, " old-key:"); appendStringInfoString(ctx->out, " old-key:");
tuple_to_stringinfo(ctx->out, tupdesc, tuple_to_stringinfo(ctx->out, tupdesc,
change->data.tp.oldtuple, &change->data.tp.oldtuple->tuple,
true); true);
appendStringInfoString(ctx->out, " new-tuple:"); appendStringInfoString(ctx->out, " new-tuple:");
} }
@ -658,7 +658,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
appendStringInfoString(ctx->out, " (no-tuple-data)"); appendStringInfoString(ctx->out, " (no-tuple-data)");
else else
tuple_to_stringinfo(ctx->out, tupdesc, tuple_to_stringinfo(ctx->out, tupdesc,
change->data.tp.newtuple, &change->data.tp.newtuple->tuple,
false); false);
break; break;
case REORDER_BUFFER_CHANGE_DELETE: case REORDER_BUFFER_CHANGE_DELETE:
@ -670,7 +670,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* In DELETE, only the replica identity is present; display that */ /* In DELETE, only the replica identity is present; display that */
else else
tuple_to_stringinfo(ctx->out, tupdesc, tuple_to_stringinfo(ctx->out, tupdesc,
change->data.tp.oldtuple, &change->data.tp.oldtuple->tuple,
true); true);
break; break;
default: default:

View File

@ -2060,16 +2060,6 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
If true, the slot is enabled to be synced to the standbys.
The default is false.
</para>
</listitem>
</varlistentry>
</variablelist> </variablelist>
<para> <para>
@ -2134,46 +2124,6 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="protocol-replication-alter-replication-slot" xreflabel="ALTER_REPLICATION_SLOT">
<term><literal>ALTER_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ( <replaceable class="parameter">option</replaceable> [, ...] )
<indexterm><primary>ALTER_REPLICATION_SLOT</primary></indexterm>
</term>
<listitem>
<para>
Change the definition of a replication slot.
See <xref linkend="streaming-replication-slots"/> for more about
replication slots. This command is currently only supported for logical
replication slots.
</para>
<variablelist>
<varlistentry>
<term><replaceable class="parameter">slot_name</replaceable></term>
<listitem>
<para>
The name of the slot to alter. Must be a valid replication slot
name (see <xref linkend="streaming-replication-slots-manipulation"/>).
</para>
</listitem>
</varlistentry>
</variablelist>
<para>The following option is supported:</para>
<variablelist>
<varlistentry>
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
If true, the slot is enabled to be synced to the standbys.
</para>
</listitem>
</varlistentry>
</variablelist>
</listitem>
</varlistentry>
<varlistentry id="protocol-replication-read-replication-slot"> <varlistentry id="protocol-replication-read-replication-slot">
<term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
<indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm> <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>

View File

@ -807,7 +807,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
twophase_enabled = true; twophase_enabled = true;
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
false, CRS_NOEXPORT_SNAPSHOT, NULL); CRS_NOEXPORT_SNAPSHOT, NULL);
if (twophase_enabled) if (twophase_enabled)
UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);

View File

@ -1093,7 +1093,7 @@ DropRole(DropRoleStmt *stmt)
Relation pg_authid_rel, Relation pg_authid_rel,
pg_auth_members_rel; pg_auth_members_rel;
ListCell *item; ListCell *item;
List *role_oids = NIL; List *role_addresses = NIL;
if (!have_createrole_privilege()) if (!have_createrole_privilege())
ereport(ERROR, ereport(ERROR,
@ -1119,6 +1119,7 @@ DropRole(DropRoleStmt *stmt)
ScanKeyData scankey; ScanKeyData scankey;
SysScanDesc sscan; SysScanDesc sscan;
Oid roleid; Oid roleid;
ObjectAddress *role_address;
if (rolspec->roletype != ROLESPEC_CSTRING) if (rolspec->roletype != ROLESPEC_CSTRING)
ereport(ERROR, ereport(ERROR,
@ -1259,16 +1260,21 @@ DropRole(DropRoleStmt *stmt)
*/ */
CommandCounterIncrement(); CommandCounterIncrement();
/* Looks tentatively OK, add it to the list if not there yet. */ /* Looks tentatively OK, add it to the list. */
role_oids = list_append_unique_oid(role_oids, roleid); role_address = palloc(sizeof(ObjectAddress));
role_address->classId = AuthIdRelationId;
role_address->objectId = roleid;
role_address->objectSubId = 0;
role_addresses = lappend(role_addresses, role_address);
} }
/* /*
* Second pass over the roles to be removed. * Second pass over the roles to be removed.
*/ */
foreach(item, role_oids) foreach(item, role_addresses)
{ {
Oid roleid = lfirst_oid(item); ObjectAddress *role_address = lfirst(item);
Oid roleid = role_address->objectId;
HeapTuple tuple; HeapTuple tuple;
Form_pg_authid roleform; Form_pg_authid roleform;
char *detail; char *detail;

View File

@ -73,11 +73,8 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
const char *slotname, const char *slotname,
bool temporary, bool temporary,
bool two_phase, bool two_phase,
bool failover,
CRSSnapshotAction snapshot_action, CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn); XLogRecPtr *lsn);
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
bool failover);
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
const char *query, const char *query,
@ -98,7 +95,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
.walrcv_receive = libpqrcv_receive, .walrcv_receive = libpqrcv_receive,
.walrcv_send = libpqrcv_send, .walrcv_send = libpqrcv_send,
.walrcv_create_slot = libpqrcv_create_slot, .walrcv_create_slot = libpqrcv_create_slot,
.walrcv_alter_slot = libpqrcv_alter_slot,
.walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_get_backend_pid = libpqrcv_get_backend_pid,
.walrcv_exec = libpqrcv_exec, .walrcv_exec = libpqrcv_exec,
.walrcv_disconnect = libpqrcv_disconnect .walrcv_disconnect = libpqrcv_disconnect
@ -942,8 +938,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
*/ */
static char * static char *
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
bool temporary, bool two_phase, bool failover, bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
CRSSnapshotAction snapshot_action, XLogRecPtr *lsn) XLogRecPtr *lsn)
{ {
PGresult *res; PGresult *res;
StringInfoData cmd; StringInfoData cmd;
@ -973,15 +969,6 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
appendStringInfoChar(&cmd, ' '); appendStringInfoChar(&cmd, ' ');
} }
if (failover)
{
appendStringInfoString(&cmd, "FAILOVER");
if (use_new_options_syntax)
appendStringInfoString(&cmd, ", ");
else
appendStringInfoChar(&cmd, ' ');
}
if (use_new_options_syntax) if (use_new_options_syntax)
{ {
switch (snapshot_action) switch (snapshot_action)
@ -1050,33 +1037,6 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
return snapshot; return snapshot;
} }
/*
* Change the definition of the replication slot.
*/
static void
libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
bool failover)
{
StringInfoData cmd;
PGresult *res;
initStringInfo(&cmd);
appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
quote_identifier(slotname),
failover ? "true" : "false");
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not alter replication slot \"%s\": %s",
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
PQclear(res);
}
/* /*
* Return PID of remote backend process. * Return PID of remote backend process.
*/ */

View File

@ -62,7 +62,7 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
/* common function to decode tuples */ /* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple); static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
/* helper functions for decoding transactions */ /* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx, static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@ -1152,7 +1152,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferChange *change; ReorderBufferChange *change;
xl_multi_insert_tuple *xlhdr; xl_multi_insert_tuple *xlhdr;
int datalen; int datalen;
HeapTuple tuple; ReorderBufferTupleBuf *tuple;
HeapTupleHeader header; HeapTupleHeader header;
change = ReorderBufferGetChange(ctx->reorder); change = ReorderBufferGetChange(ctx->reorder);
@ -1169,21 +1169,21 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferGetTupleBuf(ctx->reorder, datalen); ReorderBufferGetTupleBuf(ctx->reorder, datalen);
tuple = change->data.tp.newtuple; tuple = change->data.tp.newtuple;
header = tuple->t_data; header = tuple->tuple.t_data;
/* not a disk based tuple */ /* not a disk based tuple */
ItemPointerSetInvalid(&tuple->t_self); ItemPointerSetInvalid(&tuple->tuple.t_self);
/* /*
* We can only figure this out after reassembling the transactions. * We can only figure this out after reassembling the transactions.
*/ */
tuple->t_tableOid = InvalidOid; tuple->tuple.t_tableOid = InvalidOid;
tuple->t_len = datalen + SizeofHeapTupleHeader; tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
memset(header, 0, SizeofHeapTupleHeader); memset(header, 0, SizeofHeapTupleHeader);
memcpy((char *) tuple->t_data + SizeofHeapTupleHeader, memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
(char *) data, (char *) data,
datalen); datalen);
header->t_infomask = xlhdr->t_infomask; header->t_infomask = xlhdr->t_infomask;
@ -1253,7 +1253,7 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* computed outside as they are record specific. * computed outside as they are record specific.
*/ */
static void static void
DecodeXLogTuple(char *data, Size len, HeapTuple tuple) DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
{ {
xl_heap_header xlhdr; xl_heap_header xlhdr;
int datalen = len - SizeOfHeapHeader; int datalen = len - SizeOfHeapHeader;
@ -1261,14 +1261,14 @@ DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
Assert(datalen >= 0); Assert(datalen >= 0);
tuple->t_len = datalen + SizeofHeapTupleHeader; tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
header = tuple->t_data; header = tuple->tuple.t_data;
/* not a disk based tuple */ /* not a disk based tuple */
ItemPointerSetInvalid(&tuple->t_self); ItemPointerSetInvalid(&tuple->tuple.t_self);
/* we can only figure this out after reassembling the transactions */ /* we can only figure this out after reassembling the transactions */
tuple->t_tableOid = InvalidOid; tuple->tuple.t_tableOid = InvalidOid;
/* data is not stored aligned, copy to aligned storage */ /* data is not stored aligned, copy to aligned storage */
memcpy((char *) &xlhdr, memcpy((char *) &xlhdr,
@ -1277,7 +1277,7 @@ DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
memset(header, 0, SizeofHeapTupleHeader); memset(header, 0, SizeofHeapTupleHeader);
memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader, memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
data + SizeOfHeapHeader, data + SizeOfHeapHeader,
datalen); datalen);

View File

@ -498,13 +498,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
if (change->data.tp.newtuple) if (change->data.tp.newtuple)
{ {
ReorderBufferReturnTupleBuf(change->data.tp.newtuple); ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
change->data.tp.newtuple = NULL; change->data.tp.newtuple = NULL;
} }
if (change->data.tp.oldtuple) if (change->data.tp.oldtuple)
{ {
ReorderBufferReturnTupleBuf(change->data.tp.oldtuple); ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
change->data.tp.oldtuple = NULL; change->data.tp.oldtuple = NULL;
} }
break; break;
@ -547,29 +547,32 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
} }
/* /*
* Get a fresh HeapTuple fitting a tuple of size tuple_len (excluding header * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
* overhead). * tuple_len (excluding header overhead).
*/ */
HeapTuple ReorderBufferTupleBuf *
ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len) ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
{ {
HeapTuple tuple; ReorderBufferTupleBuf *tuple;
Size alloc_len; Size alloc_len;
alloc_len = tuple_len + SizeofHeapTupleHeader; alloc_len = tuple_len + SizeofHeapTupleHeader;
tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context, tuple = (ReorderBufferTupleBuf *)
HEAPTUPLESIZE + alloc_len); MemoryContextAlloc(rb->tup_context,
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE); sizeof(ReorderBufferTupleBuf) +
MAXIMUM_ALIGNOF + alloc_len);
tuple->alloc_tuple_size = alloc_len;
tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
return tuple; return tuple;
} }
/* /*
* Free a HeapTuple returned by ReorderBufferGetTupleBuf(). * Free a ReorderBufferTupleBuf.
*/ */
void void
ReorderBufferReturnTupleBuf(HeapTuple tuple) ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
{ {
pfree(tuple); pfree(tuple);
} }
@ -3756,8 +3759,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
{ {
char *data; char *data;
HeapTuple oldtup, ReorderBufferTupleBuf *oldtup,
newtup; *newtup;
Size oldlen = 0; Size oldlen = 0;
Size newlen = 0; Size newlen = 0;
@ -3767,14 +3770,14 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (oldtup) if (oldtup)
{ {
sz += sizeof(HeapTupleData); sz += sizeof(HeapTupleData);
oldlen = oldtup->t_len; oldlen = oldtup->tuple.t_len;
sz += oldlen; sz += oldlen;
} }
if (newtup) if (newtup)
{ {
sz += sizeof(HeapTupleData); sz += sizeof(HeapTupleData);
newlen = newtup->t_len; newlen = newtup->tuple.t_len;
sz += newlen; sz += newlen;
} }
@ -3787,19 +3790,19 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (oldlen) if (oldlen)
{ {
memcpy(data, oldtup, sizeof(HeapTupleData)); memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
data += sizeof(HeapTupleData); data += sizeof(HeapTupleData);
memcpy(data, oldtup->t_data, oldlen); memcpy(data, oldtup->tuple.t_data, oldlen);
data += oldlen; data += oldlen;
} }
if (newlen) if (newlen)
{ {
memcpy(data, newtup, sizeof(HeapTupleData)); memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
data += sizeof(HeapTupleData); data += sizeof(HeapTupleData);
memcpy(data, newtup->t_data, newlen); memcpy(data, newtup->tuple.t_data, newlen);
data += newlen; data += newlen;
} }
break; break;
@ -4115,8 +4118,8 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
case REORDER_BUFFER_CHANGE_DELETE: case REORDER_BUFFER_CHANGE_DELETE:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
{ {
HeapTuple oldtup, ReorderBufferTupleBuf *oldtup,
newtup; *newtup;
Size oldlen = 0; Size oldlen = 0;
Size newlen = 0; Size newlen = 0;
@ -4126,14 +4129,14 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
if (oldtup) if (oldtup)
{ {
sz += sizeof(HeapTupleData); sz += sizeof(HeapTupleData);
oldlen = oldtup->t_len; oldlen = oldtup->tuple.t_len;
sz += oldlen; sz += oldlen;
} }
if (newtup) if (newtup)
{ {
sz += sizeof(HeapTupleData); sz += sizeof(HeapTupleData);
newlen = newtup->t_len; newlen = newtup->tuple.t_len;
sz += newlen; sz += newlen;
} }
@ -4362,16 +4365,16 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
/* restore ->tuple */ /* restore ->tuple */
memcpy(change->data.tp.oldtuple, data, memcpy(&change->data.tp.oldtuple->tuple, data,
sizeof(HeapTupleData)); sizeof(HeapTupleData));
data += sizeof(HeapTupleData); data += sizeof(HeapTupleData);
/* reset t_data pointer into the new tuplebuf */ /* reset t_data pointer into the new tuplebuf */
change->data.tp.oldtuple->t_data = change->data.tp.oldtuple->tuple.t_data =
(HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE); ReorderBufferTupleBufData(change->data.tp.oldtuple);
/* restore tuple data itself */ /* restore tuple data itself */
memcpy(change->data.tp.oldtuple->t_data, data, tuplelen); memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
data += tuplelen; data += tuplelen;
} }
@ -4387,16 +4390,16 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
/* restore ->tuple */ /* restore ->tuple */
memcpy(change->data.tp.newtuple, data, memcpy(&change->data.tp.newtuple->tuple, data,
sizeof(HeapTupleData)); sizeof(HeapTupleData));
data += sizeof(HeapTupleData); data += sizeof(HeapTupleData);
/* reset t_data pointer into the new tuplebuf */ /* reset t_data pointer into the new tuplebuf */
change->data.tp.newtuple->t_data = change->data.tp.newtuple->tuple.t_data =
(HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE); ReorderBufferTupleBufData(change->data.tp.newtuple);
/* restore tuple data itself */ /* restore tuple data itself */
memcpy(change->data.tp.newtuple->t_data, data, tuplelen); memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
data += tuplelen; data += tuplelen;
} }
@ -4643,7 +4646,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) Relation relation, ReorderBufferChange *change)
{ {
ReorderBufferToastEnt *ent; ReorderBufferToastEnt *ent;
HeapTuple newtup; ReorderBufferTupleBuf *newtup;
bool found; bool found;
int32 chunksize; int32 chunksize;
bool isnull; bool isnull;
@ -4658,9 +4661,9 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
Assert(IsToastRelation(relation)); Assert(IsToastRelation(relation));
newtup = change->data.tp.newtuple; newtup = change->data.tp.newtuple;
chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull)); chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
Assert(!isnull); Assert(!isnull);
chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull)); chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
Assert(!isnull); Assert(!isnull);
ent = (ReorderBufferToastEnt *) ent = (ReorderBufferToastEnt *)
@ -4683,7 +4686,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d", elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
chunk_seq, chunk_id, ent->last_chunk_seq + 1); chunk_seq, chunk_id, ent->last_chunk_seq + 1);
chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull)); chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
Assert(!isnull); Assert(!isnull);
/* calculate size so we can allocate the right size at once later */ /* calculate size so we can allocate the right size at once later */
@ -4734,7 +4737,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
Relation toast_rel; Relation toast_rel;
TupleDesc toast_desc; TupleDesc toast_desc;
MemoryContext oldcontext; MemoryContext oldcontext;
HeapTuple newtup; ReorderBufferTupleBuf *newtup;
Size old_size; Size old_size;
/* no toast tuples changed */ /* no toast tuples changed */
@ -4774,7 +4777,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
newtup = change->data.tp.newtuple; newtup = change->data.tp.newtuple;
heap_deform_tuple(newtup, desc, attrs, isnull); heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
for (natt = 0; natt < desc->natts; natt++) for (natt = 0; natt < desc->natts; natt++)
{ {
@ -4839,12 +4842,12 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
{ {
bool cisnull; bool cisnull;
ReorderBufferChange *cchange; ReorderBufferChange *cchange;
HeapTuple ctup; ReorderBufferTupleBuf *ctup;
Pointer chunk; Pointer chunk;
cchange = dlist_container(ReorderBufferChange, node, it.cur); cchange = dlist_container(ReorderBufferChange, node, it.cur);
ctup = cchange->data.tp.newtuple; ctup = cchange->data.tp.newtuple;
chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull)); chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &cisnull));
Assert(!cisnull); Assert(!cisnull);
Assert(!VARATT_IS_EXTERNAL(chunk)); Assert(!VARATT_IS_EXTERNAL(chunk));
@ -4879,11 +4882,11 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
* the tuplebuf because attrs[] will point back into the current content. * the tuplebuf because attrs[] will point back into the current content.
*/ */
tmphtup = heap_form_tuple(desc, attrs, isnull); tmphtup = heap_form_tuple(desc, attrs, isnull);
Assert(newtup->t_len <= MaxHeapTupleSize); Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE)); Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len); memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
newtup->t_len = tmphtup->t_len; newtup->tuple.t_len = tmphtup->t_len;
/* /*
* free resources we won't further need, more persistent stuff will be * free resources we won't further need, more persistent stuff will be

View File

@ -1430,7 +1430,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
*/ */
walrcv_create_slot(LogRepWorkerWalRcvConn, walrcv_create_slot(LogRepWorkerWalRcvConn,
slotname, false /* permanent */ , false /* two_phase */ , slotname, false /* permanent */ , false /* two_phase */ ,
false,
CRS_USE_SNAPSHOT, origin_startpos); CRS_USE_SNAPSHOT, origin_startpos);
/* /*

View File

@ -1473,7 +1473,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (change->data.tp.oldtuple) if (change->data.tp.oldtuple)
{ {
old_slot = relentry->old_slot; old_slot = relentry->old_slot;
ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false); ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
/* Convert tuple if needed. */ /* Convert tuple if needed. */
if (relentry->attrmap) if (relentry->attrmap)
@ -1488,7 +1488,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (change->data.tp.newtuple) if (change->data.tp.newtuple)
{ {
new_slot = relentry->new_slot; new_slot = relentry->new_slot;
ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false); ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
/* Convert tuple if needed. */ /* Convert tuple if needed. */
if (relentry->attrmap) if (relentry->attrmap)

View File

@ -64,7 +64,6 @@ Node *replication_parse_result;
%token K_START_REPLICATION %token K_START_REPLICATION
%token K_CREATE_REPLICATION_SLOT %token K_CREATE_REPLICATION_SLOT
%token K_DROP_REPLICATION_SLOT %token K_DROP_REPLICATION_SLOT
%token K_ALTER_REPLICATION_SLOT
%token K_TIMELINE_HISTORY %token K_TIMELINE_HISTORY
%token K_WAIT %token K_WAIT
%token K_TIMELINE %token K_TIMELINE
@ -81,9 +80,8 @@ Node *replication_parse_result;
%type <node> command %type <node> command
%type <node> base_backup start_replication start_logical_replication %type <node> base_backup start_replication start_logical_replication
create_replication_slot drop_replication_slot create_replication_slot drop_replication_slot identify_system
alter_replication_slot identify_system read_replication_slot read_replication_slot timeline_history show upload_manifest
timeline_history show upload_manifest
%type <list> generic_option_list %type <list> generic_option_list
%type <defelt> generic_option %type <defelt> generic_option
%type <uintval> opt_timeline %type <uintval> opt_timeline
@ -114,7 +112,6 @@ command:
| start_logical_replication | start_logical_replication
| create_replication_slot | create_replication_slot
| drop_replication_slot | drop_replication_slot
| alter_replication_slot
| read_replication_slot | read_replication_slot
| timeline_history | timeline_history
| show | show
@ -262,18 +259,6 @@ drop_replication_slot:
} }
; ;
/* ALTER_REPLICATION_SLOT slot (options) */
alter_replication_slot:
K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')'
{
AlterReplicationSlotCmd *cmd;
cmd = makeNode(AlterReplicationSlotCmd);
cmd->slotname = $2;
cmd->options = $4;
$$ = (Node *) cmd;
}
;
/* /*
* START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d] * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
*/ */
@ -425,7 +410,6 @@ ident_or_keyword:
| K_START_REPLICATION { $$ = "start_replication"; } | K_START_REPLICATION { $$ = "start_replication"; }
| K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; } | K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; }
| K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; } | K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; }
| K_ALTER_REPLICATION_SLOT { $$ = "alter_replication_slot"; }
| K_TIMELINE_HISTORY { $$ = "timeline_history"; } | K_TIMELINE_HISTORY { $$ = "timeline_history"; }
| K_WAIT { $$ = "wait"; } | K_WAIT { $$ = "wait"; }
| K_TIMELINE { $$ = "timeline"; } | K_TIMELINE { $$ = "timeline"; }

View File

@ -125,7 +125,6 @@ TIMELINE { return K_TIMELINE; }
START_REPLICATION { return K_START_REPLICATION; } START_REPLICATION { return K_START_REPLICATION; }
CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; } CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; }
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
PHYSICAL { return K_PHYSICAL; } PHYSICAL { return K_PHYSICAL; }
RESERVE_WAL { return K_RESERVE_WAL; } RESERVE_WAL { return K_RESERVE_WAL; }
@ -303,7 +302,6 @@ replication_scanner_is_replication_command(void)
case K_START_REPLICATION: case K_START_REPLICATION:
case K_CREATE_REPLICATION_SLOT: case K_CREATE_REPLICATION_SLOT:
case K_DROP_REPLICATION_SLOT: case K_DROP_REPLICATION_SLOT:
case K_ALTER_REPLICATION_SLOT:
case K_READ_REPLICATION_SLOT: case K_READ_REPLICATION_SLOT:
case K_TIMELINE_HISTORY: case K_TIMELINE_HISTORY:
case K_UPLOAD_MANIFEST: case K_UPLOAD_MANIFEST:

View File

@ -465,7 +465,10 @@ retry:
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
/* Check if the slot exits with the given name. */ /*
* Search for the slot with the specified name if the slot to acquire is
* not given. If the slot is not found, we either return -1 or error out.
*/
s = SearchNamedReplicationSlot(name, false); s = SearchNamedReplicationSlot(name, false);
if (s == NULL || !s->in_use) if (s == NULL || !s->in_use)
{ {
@ -680,31 +683,6 @@ ReplicationSlotDrop(const char *name, bool nowait)
ReplicationSlotDropAcquired(); ReplicationSlotDropAcquired();
} }
/*
* Change the definition of the slot identified by the specified name.
*/
void
ReplicationSlotAlter(const char *name, bool failover)
{
Assert(MyReplicationSlot == NULL);
ReplicationSlotAcquire(name, false);
if (SlotIsPhysical(MyReplicationSlot))
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use %s with a physical replication slot",
"ALTER_REPLICATION_SLOT"));
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.failover = failover;
SpinLockRelease(&MyReplicationSlot->mutex);
ReplicationSlotMarkDirty();
ReplicationSlotSave();
ReplicationSlotRelease();
}
/* /*
* Permanently drop the currently acquired replication slot. * Permanently drop the currently acquired replication slot.
*/ */

View File

@ -387,7 +387,7 @@ WalReceiverMain(void)
"pg_walreceiver_%lld", "pg_walreceiver_%lld",
(long long int) walrcv_get_backend_pid(wrconn)); (long long int) walrcv_get_backend_pid(wrconn));
walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL); walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
SpinLockAcquire(&walrcv->mutex); SpinLockAcquire(&walrcv->mutex);
strlcpy(walrcv->slotname, slotname, NAMEDATALEN); strlcpy(walrcv->slotname, slotname, NAMEDATALEN);

View File

@ -1126,13 +1126,12 @@ static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
bool *reserve_wal, bool *reserve_wal,
CRSSnapshotAction *snapshot_action, CRSSnapshotAction *snapshot_action,
bool *two_phase, bool *failover) bool *two_phase)
{ {
ListCell *lc; ListCell *lc;
bool snapshot_action_given = false; bool snapshot_action_given = false;
bool reserve_wal_given = false; bool reserve_wal_given = false;
bool two_phase_given = false; bool two_phase_given = false;
bool failover_given = false;
/* Parse options */ /* Parse options */
foreach(lc, cmd->options) foreach(lc, cmd->options)
@ -1182,15 +1181,6 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
two_phase_given = true; two_phase_given = true;
*two_phase = defGetBoolean(defel); *two_phase = defGetBoolean(defel);
} }
else if (strcmp(defel->defname, "failover") == 0)
{
if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
failover_given = true;
*failover = defGetBoolean(defel);
}
else else
elog(ERROR, "unrecognized option: %s", defel->defname); elog(ERROR, "unrecognized option: %s", defel->defname);
} }
@ -1207,7 +1197,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
char *slot_name; char *slot_name;
bool reserve_wal = false; bool reserve_wal = false;
bool two_phase = false; bool two_phase = false;
bool failover = false;
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
DestReceiver *dest; DestReceiver *dest;
TupOutputState *tstate; TupOutputState *tstate;
@ -1217,8 +1206,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Assert(!MyReplicationSlot); Assert(!MyReplicationSlot);
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase, parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
&failover);
if (cmd->kind == REPLICATION_KIND_PHYSICAL) if (cmd->kind == REPLICATION_KIND_PHYSICAL)
{ {
@ -1255,7 +1243,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/ */
ReplicationSlotCreate(cmd->slotname, true, ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
two_phase, failover); two_phase, false);
/* /*
* Do options check early so that we can bail before calling the * Do options check early so that we can bail before calling the
@ -1410,43 +1398,6 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
ReplicationSlotDrop(cmd->slotname, !cmd->wait); ReplicationSlotDrop(cmd->slotname, !cmd->wait);
} }
/*
* Process extra options given to ALTER_REPLICATION_SLOT.
*/
static void
ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
{
bool failover_given = false;
/* Parse options */
foreach_ptr(DefElem, defel, cmd->options)
{
if (strcmp(defel->defname, "failover") == 0)
{
if (failover_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
failover_given = true;
*failover = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
}
/*
* Change the definition of a replication slot.
*/
static void
AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
{
bool failover = false;
ParseAlterReplSlotOptions(cmd, &failover);
ReplicationSlotAlter(cmd->slotname, failover);
}
/* /*
* Load previously initiated logical slot and prepare for sending data (via * Load previously initiated logical slot and prepare for sending data (via
* WalSndLoop). * WalSndLoop).
@ -2020,13 +1971,6 @@ exec_replication_command(const char *cmd_string)
EndReplicationCommand(cmdtag); EndReplicationCommand(cmdtag);
break; break;
case T_AlterReplicationSlotCmd:
cmdtag = "ALTER_REPLICATION_SLOT";
set_ps_display(cmdtag);
AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
EndReplicationCommand(cmdtag);
break;
case T_StartReplicationCmd: case T_StartReplicationCmd:
{ {
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;

View File

@ -72,18 +72,6 @@ typedef struct DropReplicationSlotCmd
} DropReplicationSlotCmd; } DropReplicationSlotCmd;
/* ----------------------
* ALTER_REPLICATION_SLOT command
* ----------------------
*/
typedef struct AlterReplicationSlotCmd
{
NodeTag type;
char *slotname;
List *options;
} AlterReplicationSlotCmd;
/* ---------------------- /* ----------------------
* START_REPLICATION command * START_REPLICATION command
* ---------------------- * ----------------------

View File

@ -28,6 +28,25 @@ typedef enum
DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE,
} DebugLogicalRepStreamingMode; } DebugLogicalRepStreamingMode;
/* an individual tuple, stored in one chunk of memory */
typedef struct ReorderBufferTupleBuf
{
/* position in preallocated list */
slist_node node;
/* tuple header, the interesting bit for users of logical decoding */
HeapTupleData tuple;
/* pre-allocated size of tuple buffer, different from tuple size */
Size alloc_tuple_size;
/* actual tuple data follows */
} ReorderBufferTupleBuf;
/* pointer to the data stored in a TupleBuf */
#define ReorderBufferTupleBufData(p) \
((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
/* /*
* Types of the change passed to a 'change' callback. * Types of the change passed to a 'change' callback.
* *
@ -95,9 +114,9 @@ typedef struct ReorderBufferChange
bool clear_toast_afterwards; bool clear_toast_afterwards;
/* valid for DELETE || UPDATE */ /* valid for DELETE || UPDATE */
HeapTuple oldtuple; ReorderBufferTupleBuf *oldtuple;
/* valid for INSERT || UPDATE */ /* valid for INSERT || UPDATE */
HeapTuple newtuple; ReorderBufferTupleBuf *newtuple;
} tp; } tp;
/* /*
@ -659,10 +678,10 @@ struct ReorderBuffer
extern ReorderBuffer *ReorderBufferAllocate(void); extern ReorderBuffer *ReorderBufferAllocate(void);
extern void ReorderBufferFree(ReorderBuffer *rb); extern void ReorderBufferFree(ReorderBuffer *rb);
extern HeapTuple ReorderBufferGetTupleBuf(ReorderBuffer *rb, extern ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *rb,
Size tuple_len); Size tuple_len);
extern void ReorderBufferReturnTupleBuf(HeapTuple tuple); extern void ReorderBufferReturnTupleBuf(ReorderBuffer *rb,
ReorderBufferTupleBuf *tuple);
extern ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *rb); extern ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *rb);
extern void ReorderBufferReturnChange(ReorderBuffer *rb, extern void ReorderBufferReturnChange(ReorderBuffer *rb,
ReorderBufferChange *change, bool upd_mem); ReorderBufferChange *change, bool upd_mem);

View File

@ -227,7 +227,6 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
bool two_phase, bool failover); bool two_phase, bool failover);
extern void ReplicationSlotPersist(void); extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait); extern void ReplicationSlotDrop(const char *name, bool nowait);
extern void ReplicationSlotAlter(const char *name, bool failover);
extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void); extern void ReplicationSlotRelease(void);

View File

@ -355,20 +355,9 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname, const char *slotname,
bool temporary, bool temporary,
bool two_phase, bool two_phase,
bool failover,
CRSSnapshotAction snapshot_action, CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn); XLogRecPtr *lsn);
/*
* walrcv_alter_slot_fn
*
* Change the definition of a replication slot. Currently, it only supports
* changing the failover property of the slot.
*/
typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
const char *slotname,
bool failover);
/* /*
* walrcv_get_backend_pid_fn * walrcv_get_backend_pid_fn
* *
@ -410,7 +399,6 @@ typedef struct WalReceiverFunctionsType
walrcv_receive_fn walrcv_receive; walrcv_receive_fn walrcv_receive;
walrcv_send_fn walrcv_send; walrcv_send_fn walrcv_send;
walrcv_create_slot_fn walrcv_create_slot; walrcv_create_slot_fn walrcv_create_slot;
walrcv_alter_slot_fn walrcv_alter_slot;
walrcv_get_backend_pid_fn walrcv_get_backend_pid; walrcv_get_backend_pid_fn walrcv_get_backend_pid;
walrcv_exec_fn walrcv_exec; walrcv_exec_fn walrcv_exec;
walrcv_disconnect_fn walrcv_disconnect; walrcv_disconnect_fn walrcv_disconnect;
@ -440,10 +428,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \ #define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \ #define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
#define walrcv_alter_slot(conn, slotname, failover) \
WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover)
#define walrcv_get_backend_pid(conn) \ #define walrcv_get_backend_pid(conn) \
WalReceiverFunctions->walrcv_get_backend_pid(conn) WalReceiverFunctions->walrcv_get_backend_pid(conn)
#define walrcv_exec(conn, exec, nRetTypes, retTypes) \ #define walrcv_exec(conn, exec, nRetTypes, retTypes) \

View File

@ -251,8 +251,7 @@ DROP INDEX tenant_idx;
DROP TABLE tenant_table; DROP TABLE tenant_table;
DROP VIEW tenant_view; DROP VIEW tenant_view;
DROP SCHEMA regress_tenant2_schema; DROP SCHEMA regress_tenant2_schema;
-- check for duplicated drop DROP ROLE regress_tenant;
DROP ROLE regress_tenant, regress_tenant;
DROP ROLE regress_tenant2; DROP ROLE regress_tenant2;
DROP ROLE regress_rolecreator; DROP ROLE regress_rolecreator;
DROP ROLE regress_role_admin; DROP ROLE regress_role_admin;

View File

@ -206,8 +206,7 @@ DROP INDEX tenant_idx;
DROP TABLE tenant_table; DROP TABLE tenant_table;
DROP VIEW tenant_view; DROP VIEW tenant_view;
DROP SCHEMA regress_tenant2_schema; DROP SCHEMA regress_tenant2_schema;
-- check for duplicated drop DROP ROLE regress_tenant;
DROP ROLE regress_tenant, regress_tenant;
DROP ROLE regress_tenant2; DROP ROLE regress_tenant2;
DROP ROLE regress_rolecreator; DROP ROLE regress_rolecreator;
DROP ROLE regress_role_admin; DROP ROLE regress_role_admin;

View File

@ -85,7 +85,6 @@ AlterOwnerStmt
AlterPolicyStmt AlterPolicyStmt
AlterPublicationAction AlterPublicationAction
AlterPublicationStmt AlterPublicationStmt
AlterReplicationSlotCmd
AlterRoleSetStmt AlterRoleSetStmt
AlterRoleStmt AlterRoleStmt
AlterSeqStmt AlterSeqStmt
@ -2352,6 +2351,7 @@ ReorderBufferStreamTruncateCB
ReorderBufferTXN ReorderBufferTXN
ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
ReorderBufferToastEnt ReorderBufferToastEnt
ReorderBufferTupleBuf
ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
ReorderBufferTupleCidKey ReorderBufferTupleCidKey
ReorderBufferUpdateProgressTxnCB ReorderBufferUpdateProgressTxnCB
@ -3880,7 +3880,6 @@ varattrib_1b_e
varattrib_4b varattrib_4b
vbits vbits
verifier_context verifier_context
walrcv_alter_slot_fn
walrcv_check_conninfo_fn walrcv_check_conninfo_fn
walrcv_connect_fn walrcv_connect_fn
walrcv_create_slot_fn walrcv_create_slot_fn