Compare commits

...

4 Commits

Author SHA1 Message Date
Amit Kapila
a9a47fb6d9 Fix comments in ReplicationSlotAcquire().
They were incorrectly referring to a slot parameter in
ReplicationSlotAcquire() which is not passed to the API.

Author: Wang Wei
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/OS3PR01MB6275E3CE4DC15FF8B8B80D3A9E7A2@OS3PR01MB6275.jpnprd01.prod.outlook.com
2024-01-29 10:12:58 +05:30
Amit Kapila
7329240437 Allow setting failover property in the replication command.
This commit implements a new replication command called
ALTER_REPLICATION_SLOT and a corresponding walreceiver API function named
walrcv_alter_slot. Additionally, the CREATE_REPLICATION_SLOT command has
been extended to support the failover option.

These new additions allow the modification of the failover property of a
replication slot on the publisher. A subsequent commit will make use of
these commands in subscription commands and will add the tests as well to
cover the functionality added/changed by this commit.

Author: Hou Zhijie, Shveta Malik
Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda, Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
2024-01-29 09:37:23 +05:30
Masahiko Sawada
08e6344fd6 Remove ReorderBufferTupleBuf structure.
Since commit a4ccc1cef, the 'node' and 'alloc_tuple_size' fields of
the ReorderBufferTupleBuf structure are no longer used. This leaves
only the 'tuple' field in the structure. Since keeping a single-field
structure makes little sense, the ReorderBufferTupleBuf is removed
entirely. The code is refactored accordingly.

No back-patching since these are ABI changes in an exposed structure
and functions, and there would be some risk of breaking extensions.

Author: Aleksander Alekseev
Reviewed-by: Amit Kapila, Masahiko Sawada, Reid Thompson
Discussion: https://postgr.es/m/CAD21AoCvnuxiXXfRecp7g9+CeC35POQfhuQeJFr7_9u_Q5jc_Q@mail.gmail.com
2024-01-29 10:37:16 +09:00
Michael Paquier
50b797dc99 Fix DROP ROLE when specifying duplicated roles
This commit fixes failures with "tuple already updated by self" when
listing twice the same role and in a DROP ROLE query.

This is an oversight in 6566133c5f52, that has introduced a two-phase
logic in DropRole() where dependencies of all the roles to drop are
removed in a first phase, with the roles themselves removed from
pg_authid in a second phase.

The code is simplified to not rely on a List of ObjectAddress built in
the first phase used to remove the pg_authid entries in the second
phase, switching to a list of OIDs.  Duplicated OIDs can be simply
avoided in the first phase thanks to that.  Using ObjectAddress was not
necessary for the roles as they are not used for anything specific to
dependency.c, building all the ObjectAddress in the List with
AuthIdRelationId as class ID.

In 15 and older versions, where a single phase is used, DROP ROLE with
duplicated role names would fail on "role \"blah\" does not exist" for
the second entry after the CCI() done by the first deletion.  This is
not really incorrect, but it does not seem worth changing based on a
lack of complaints.

Reported-by: Alexander Lakhin
Reviewed-by: Tender Wang
Discussion: https://postgr.es/m/18310-1eb233c5908189c8@postgresql.org
Backpatch-through: 16
2024-01-29 08:05:59 +09:00
21 changed files with 308 additions and 119 deletions

View File

@ -640,7 +640,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
appendStringInfoString(ctx->out, " (no-tuple-data)");
else
tuple_to_stringinfo(ctx->out, tupdesc,
&change->data.tp.newtuple->tuple,
change->data.tp.newtuple,
false);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
@ -649,7 +649,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
appendStringInfoString(ctx->out, " old-key:");
tuple_to_stringinfo(ctx->out, tupdesc,
&change->data.tp.oldtuple->tuple,
change->data.tp.oldtuple,
true);
appendStringInfoString(ctx->out, " new-tuple:");
}
@ -658,7 +658,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
appendStringInfoString(ctx->out, " (no-tuple-data)");
else
tuple_to_stringinfo(ctx->out, tupdesc,
&change->data.tp.newtuple->tuple,
change->data.tp.newtuple,
false);
break;
case REORDER_BUFFER_CHANGE_DELETE:
@ -670,7 +670,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* In DELETE, only the replica identity is present; display that */
else
tuple_to_stringinfo(ctx->out, tupdesc,
&change->data.tp.oldtuple->tuple,
change->data.tp.oldtuple,
true);
break;
default:

View File

@ -2060,6 +2060,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
If true, the slot is enabled to be synced to the standbys.
The default is false.
</para>
</listitem>
</varlistentry>
</variablelist>
<para>
@ -2124,6 +2134,46 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</listitem>
</varlistentry>
<varlistentry id="protocol-replication-alter-replication-slot" xreflabel="ALTER_REPLICATION_SLOT">
<term><literal>ALTER_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ( <replaceable class="parameter">option</replaceable> [, ...] )
<indexterm><primary>ALTER_REPLICATION_SLOT</primary></indexterm>
</term>
<listitem>
<para>
Change the definition of a replication slot.
See <xref linkend="streaming-replication-slots"/> for more about
replication slots. This command is currently only supported for logical
replication slots.
</para>
<variablelist>
<varlistentry>
<term><replaceable class="parameter">slot_name</replaceable></term>
<listitem>
<para>
The name of the slot to alter. Must be a valid replication slot
name (see <xref linkend="streaming-replication-slots-manipulation"/>).
</para>
</listitem>
</varlistentry>
</variablelist>
<para>The following option is supported:</para>
<variablelist>
<varlistentry>
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
If true, the slot is enabled to be synced to the standbys.
</para>
</listitem>
</varlistentry>
</variablelist>
</listitem>
</varlistentry>
<varlistentry id="protocol-replication-read-replication-slot">
<term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
<indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>

View File

@ -807,7 +807,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
twophase_enabled = true;
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
CRS_NOEXPORT_SNAPSHOT, NULL);
false, CRS_NOEXPORT_SNAPSHOT, NULL);
if (twophase_enabled)
UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);

View File

@ -1093,7 +1093,7 @@ DropRole(DropRoleStmt *stmt)
Relation pg_authid_rel,
pg_auth_members_rel;
ListCell *item;
List *role_addresses = NIL;
List *role_oids = NIL;
if (!have_createrole_privilege())
ereport(ERROR,
@ -1119,7 +1119,6 @@ DropRole(DropRoleStmt *stmt)
ScanKeyData scankey;
SysScanDesc sscan;
Oid roleid;
ObjectAddress *role_address;
if (rolspec->roletype != ROLESPEC_CSTRING)
ereport(ERROR,
@ -1260,21 +1259,16 @@ DropRole(DropRoleStmt *stmt)
*/
CommandCounterIncrement();
/* Looks tentatively OK, add it to the list. */
role_address = palloc(sizeof(ObjectAddress));
role_address->classId = AuthIdRelationId;
role_address->objectId = roleid;
role_address->objectSubId = 0;
role_addresses = lappend(role_addresses, role_address);
/* Looks tentatively OK, add it to the list if not there yet. */
role_oids = list_append_unique_oid(role_oids, roleid);
}
/*
* Second pass over the roles to be removed.
*/
foreach(item, role_addresses)
foreach(item, role_oids)
{
ObjectAddress *role_address = lfirst(item);
Oid roleid = role_address->objectId;
Oid roleid = lfirst_oid(item);
HeapTuple tuple;
Form_pg_authid roleform;
char *detail;

View File

@ -73,8 +73,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
const char *slotname,
bool temporary,
bool two_phase,
bool failover,
CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn);
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
bool failover);
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
const char *query,
@ -95,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
.walrcv_receive = libpqrcv_receive,
.walrcv_send = libpqrcv_send,
.walrcv_create_slot = libpqrcv_create_slot,
.walrcv_alter_slot = libpqrcv_alter_slot,
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
.walrcv_exec = libpqrcv_exec,
.walrcv_disconnect = libpqrcv_disconnect
@ -938,8 +942,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
*/
static char *
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn)
bool temporary, bool two_phase, bool failover,
CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
{
PGresult *res;
StringInfoData cmd;
@ -969,6 +973,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
appendStringInfoChar(&cmd, ' ');
}
if (failover)
{
appendStringInfoString(&cmd, "FAILOVER");
if (use_new_options_syntax)
appendStringInfoString(&cmd, ", ");
else
appendStringInfoChar(&cmd, ' ');
}
if (use_new_options_syntax)
{
switch (snapshot_action)
@ -1037,6 +1050,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
return snapshot;
}
/*
* Change the definition of the replication slot.
*/
static void
libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
bool failover)
{
StringInfoData cmd;
PGresult *res;
initStringInfo(&cmd);
appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
quote_identifier(slotname),
failover ? "true" : "false");
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not alter replication slot \"%s\": %s",
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
PQclear(res);
}
/*
* Return PID of remote backend process.
*/

View File

@ -62,7 +62,7 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
/* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@ -1152,7 +1152,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferChange *change;
xl_multi_insert_tuple *xlhdr;
int datalen;
ReorderBufferTupleBuf *tuple;
HeapTuple tuple;
HeapTupleHeader header;
change = ReorderBufferGetChange(ctx->reorder);
@ -1169,21 +1169,21 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferGetTupleBuf(ctx->reorder, datalen);
tuple = change->data.tp.newtuple;
header = tuple->tuple.t_data;
header = tuple->t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
ItemPointerSetInvalid(&tuple->t_self);
/*
* We can only figure this out after reassembling the transactions.
*/
tuple->tuple.t_tableOid = InvalidOid;
tuple->t_tableOid = InvalidOid;
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
tuple->t_len = datalen + SizeofHeapTupleHeader;
memset(header, 0, SizeofHeapTupleHeader);
memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
memcpy((char *) tuple->t_data + SizeofHeapTupleHeader,
(char *) data,
datalen);
header->t_infomask = xlhdr->t_infomask;
@ -1253,7 +1253,7 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* computed outside as they are record specific.
*/
static void
DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
{
xl_heap_header xlhdr;
int datalen = len - SizeOfHeapHeader;
@ -1261,14 +1261,14 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
Assert(datalen >= 0);
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
header = tuple->tuple.t_data;
tuple->t_len = datalen + SizeofHeapTupleHeader;
header = tuple->t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
ItemPointerSetInvalid(&tuple->t_self);
/* we can only figure this out after reassembling the transactions */
tuple->tuple.t_tableOid = InvalidOid;
tuple->t_tableOid = InvalidOid;
/* data is not stored aligned, copy to aligned storage */
memcpy((char *) &xlhdr,
@ -1277,7 +1277,7 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
memset(header, 0, SizeofHeapTupleHeader);
memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
data + SizeOfHeapHeader,
datalen);

View File

@ -498,13 +498,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
if (change->data.tp.newtuple)
{
ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
change->data.tp.newtuple = NULL;
}
if (change->data.tp.oldtuple)
{
ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
change->data.tp.oldtuple = NULL;
}
break;
@ -547,32 +547,29 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
}
/*
* Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
* tuple_len (excluding header overhead).
* Get a fresh HeapTuple fitting a tuple of size tuple_len (excluding header
* overhead).
*/
ReorderBufferTupleBuf *
HeapTuple
ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
{
ReorderBufferTupleBuf *tuple;
HeapTuple tuple;
Size alloc_len;
alloc_len = tuple_len + SizeofHeapTupleHeader;
tuple = (ReorderBufferTupleBuf *)
MemoryContextAlloc(rb->tup_context,
sizeof(ReorderBufferTupleBuf) +
MAXIMUM_ALIGNOF + alloc_len);
tuple->alloc_tuple_size = alloc_len;
tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
HEAPTUPLESIZE + alloc_len);
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
return tuple;
}
/*
* Free a ReorderBufferTupleBuf.
* Free a HeapTuple returned by ReorderBufferGetTupleBuf().
*/
void
ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
ReorderBufferReturnTupleBuf(HeapTuple tuple)
{
pfree(tuple);
}
@ -3759,8 +3756,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
{
char *data;
ReorderBufferTupleBuf *oldtup,
*newtup;
HeapTuple oldtup,
newtup;
Size oldlen = 0;
Size newlen = 0;
@ -3770,14 +3767,14 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (oldtup)
{
sz += sizeof(HeapTupleData);
oldlen = oldtup->tuple.t_len;
oldlen = oldtup->t_len;
sz += oldlen;
}
if (newtup)
{
sz += sizeof(HeapTupleData);
newlen = newtup->tuple.t_len;
newlen = newtup->t_len;
sz += newlen;
}
@ -3790,19 +3787,19 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (oldlen)
{
memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
memcpy(data, oldtup, sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
memcpy(data, oldtup->tuple.t_data, oldlen);
memcpy(data, oldtup->t_data, oldlen);
data += oldlen;
}
if (newlen)
{
memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
memcpy(data, newtup, sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
memcpy(data, newtup->tuple.t_data, newlen);
memcpy(data, newtup->t_data, newlen);
data += newlen;
}
break;
@ -4118,8 +4115,8 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
case REORDER_BUFFER_CHANGE_DELETE:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
{
ReorderBufferTupleBuf *oldtup,
*newtup;
HeapTuple oldtup,
newtup;
Size oldlen = 0;
Size newlen = 0;
@ -4129,14 +4126,14 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
if (oldtup)
{
sz += sizeof(HeapTupleData);
oldlen = oldtup->tuple.t_len;
oldlen = oldtup->t_len;
sz += oldlen;
}
if (newtup)
{
sz += sizeof(HeapTupleData);
newlen = newtup->tuple.t_len;
newlen = newtup->t_len;
sz += newlen;
}
@ -4365,16 +4362,16 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
/* restore ->tuple */
memcpy(&change->data.tp.oldtuple->tuple, data,
memcpy(change->data.tp.oldtuple, data,
sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
/* reset t_data pointer into the new tuplebuf */
change->data.tp.oldtuple->tuple.t_data =
ReorderBufferTupleBufData(change->data.tp.oldtuple);
change->data.tp.oldtuple->t_data =
(HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
/* restore tuple data itself */
memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
data += tuplelen;
}
@ -4390,16 +4387,16 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
/* restore ->tuple */
memcpy(&change->data.tp.newtuple->tuple, data,
memcpy(change->data.tp.newtuple, data,
sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
/* reset t_data pointer into the new tuplebuf */
change->data.tp.newtuple->tuple.t_data =
ReorderBufferTupleBufData(change->data.tp.newtuple);
change->data.tp.newtuple->t_data =
(HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
/* restore tuple data itself */
memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
data += tuplelen;
}
@ -4646,7 +4643,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
ReorderBufferToastEnt *ent;
ReorderBufferTupleBuf *newtup;
HeapTuple newtup;
bool found;
int32 chunksize;
bool isnull;
@ -4661,9 +4658,9 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
Assert(IsToastRelation(relation));
newtup = change->data.tp.newtuple;
chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
Assert(!isnull);
chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
Assert(!isnull);
ent = (ReorderBufferToastEnt *)
@ -4686,7 +4683,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
chunk_seq, chunk_id, ent->last_chunk_seq + 1);
chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
Assert(!isnull);
/* calculate size so we can allocate the right size at once later */
@ -4737,7 +4734,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
Relation toast_rel;
TupleDesc toast_desc;
MemoryContext oldcontext;
ReorderBufferTupleBuf *newtup;
HeapTuple newtup;
Size old_size;
/* no toast tuples changed */
@ -4777,7 +4774,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
newtup = change->data.tp.newtuple;
heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
heap_deform_tuple(newtup, desc, attrs, isnull);
for (natt = 0; natt < desc->natts; natt++)
{
@ -4842,12 +4839,12 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
{
bool cisnull;
ReorderBufferChange *cchange;
ReorderBufferTupleBuf *ctup;
HeapTuple ctup;
Pointer chunk;
cchange = dlist_container(ReorderBufferChange, node, it.cur);
ctup = cchange->data.tp.newtuple;
chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &cisnull));
chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
Assert(!cisnull);
Assert(!VARATT_IS_EXTERNAL(chunk));
@ -4882,11 +4879,11 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
* the tuplebuf because attrs[] will point back into the current content.
*/
tmphtup = heap_form_tuple(desc, attrs, isnull);
Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
Assert(newtup->t_len <= MaxHeapTupleSize);
Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
newtup->tuple.t_len = tmphtup->t_len;
memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
newtup->t_len = tmphtup->t_len;
/*
* free resources we won't further need, more persistent stuff will be

View File

@ -1430,6 +1430,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
*/
walrcv_create_slot(LogRepWorkerWalRcvConn,
slotname, false /* permanent */ , false /* two_phase */ ,
false,
CRS_USE_SNAPSHOT, origin_startpos);
/*

View File

@ -1473,7 +1473,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (change->data.tp.oldtuple)
{
old_slot = relentry->old_slot;
ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
/* Convert tuple if needed. */
if (relentry->attrmap)
@ -1488,7 +1488,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (change->data.tp.newtuple)
{
new_slot = relentry->new_slot;
ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
/* Convert tuple if needed. */
if (relentry->attrmap)

View File

@ -64,6 +64,7 @@ Node *replication_parse_result;
%token K_START_REPLICATION
%token K_CREATE_REPLICATION_SLOT
%token K_DROP_REPLICATION_SLOT
%token K_ALTER_REPLICATION_SLOT
%token K_TIMELINE_HISTORY
%token K_WAIT
%token K_TIMELINE
@ -80,8 +81,9 @@ Node *replication_parse_result;
%type <node> command
%type <node> base_backup start_replication start_logical_replication
create_replication_slot drop_replication_slot identify_system
read_replication_slot timeline_history show upload_manifest
create_replication_slot drop_replication_slot
alter_replication_slot identify_system read_replication_slot
timeline_history show upload_manifest
%type <list> generic_option_list
%type <defelt> generic_option
%type <uintval> opt_timeline
@ -112,6 +114,7 @@ command:
| start_logical_replication
| create_replication_slot
| drop_replication_slot
| alter_replication_slot
| read_replication_slot
| timeline_history
| show
@ -259,6 +262,18 @@ drop_replication_slot:
}
;
/* ALTER_REPLICATION_SLOT slot (options) */
alter_replication_slot:
K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')'
{
AlterReplicationSlotCmd *cmd;
cmd = makeNode(AlterReplicationSlotCmd);
cmd->slotname = $2;
cmd->options = $4;
$$ = (Node *) cmd;
}
;
/*
* START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
*/
@ -410,6 +425,7 @@ ident_or_keyword:
| K_START_REPLICATION { $$ = "start_replication"; }
| K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; }
| K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; }
| K_ALTER_REPLICATION_SLOT { $$ = "alter_replication_slot"; }
| K_TIMELINE_HISTORY { $$ = "timeline_history"; }
| K_WAIT { $$ = "wait"; }
| K_TIMELINE { $$ = "timeline"; }

View File

@ -125,6 +125,7 @@ TIMELINE { return K_TIMELINE; }
START_REPLICATION { return K_START_REPLICATION; }
CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; }
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
PHYSICAL { return K_PHYSICAL; }
RESERVE_WAL { return K_RESERVE_WAL; }
@ -302,6 +303,7 @@ replication_scanner_is_replication_command(void)
case K_START_REPLICATION:
case K_CREATE_REPLICATION_SLOT:
case K_DROP_REPLICATION_SLOT:
case K_ALTER_REPLICATION_SLOT:
case K_READ_REPLICATION_SLOT:
case K_TIMELINE_HISTORY:
case K_UPLOAD_MANIFEST:

View File

@ -465,10 +465,7 @@ retry:
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
/*
* Search for the slot with the specified name if the slot to acquire is
* not given. If the slot is not found, we either return -1 or error out.
*/
/* Check if the slot exits with the given name. */
s = SearchNamedReplicationSlot(name, false);
if (s == NULL || !s->in_use)
{
@ -683,6 +680,31 @@ ReplicationSlotDrop(const char *name, bool nowait)
ReplicationSlotDropAcquired();
}
/*
* Change the definition of the slot identified by the specified name.
*/
void
ReplicationSlotAlter(const char *name, bool failover)
{
Assert(MyReplicationSlot == NULL);
ReplicationSlotAcquire(name, false);
if (SlotIsPhysical(MyReplicationSlot))
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use %s with a physical replication slot",
"ALTER_REPLICATION_SLOT"));
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.failover = failover;
SpinLockRelease(&MyReplicationSlot->mutex);
ReplicationSlotMarkDirty();
ReplicationSlotSave();
ReplicationSlotRelease();
}
/*
* Permanently drop the currently acquired replication slot.
*/

View File

@ -387,7 +387,7 @@ WalReceiverMain(void)
"pg_walreceiver_%lld",
(long long int) walrcv_get_backend_pid(wrconn));
walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
SpinLockAcquire(&walrcv->mutex);
strlcpy(walrcv->slotname, slotname, NAMEDATALEN);

View File

@ -1126,12 +1126,13 @@ static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
bool *reserve_wal,
CRSSnapshotAction *snapshot_action,
bool *two_phase)
bool *two_phase, bool *failover)
{
ListCell *lc;
bool snapshot_action_given = false;
bool reserve_wal_given = false;
bool two_phase_given = false;
bool failover_given = false;
/* Parse options */
foreach(lc, cmd->options)
@ -1181,6 +1182,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
two_phase_given = true;
*two_phase = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "failover") == 0)
{
if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
failover_given = true;
*failover = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
@ -1197,6 +1207,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
char *slot_name;
bool reserve_wal = false;
bool two_phase = false;
bool failover = false;
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
DestReceiver *dest;
TupOutputState *tstate;
@ -1206,7 +1217,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Assert(!MyReplicationSlot);
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
&failover);
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
{
@ -1243,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
two_phase, false);
two_phase, failover);
/*
* Do options check early so that we can bail before calling the
@ -1398,6 +1410,43 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
}
/*
* Process extra options given to ALTER_REPLICATION_SLOT.
*/
static void
ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
{
bool failover_given = false;
/* Parse options */
foreach_ptr(DefElem, defel, cmd->options)
{
if (strcmp(defel->defname, "failover") == 0)
{
if (failover_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
failover_given = true;
*failover = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
}
/*
* Change the definition of a replication slot.
*/
static void
AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
{
bool failover = false;
ParseAlterReplSlotOptions(cmd, &failover);
ReplicationSlotAlter(cmd->slotname, failover);
}
/*
* Load previously initiated logical slot and prepare for sending data (via
* WalSndLoop).
@ -1971,6 +2020,13 @@ exec_replication_command(const char *cmd_string)
EndReplicationCommand(cmdtag);
break;
case T_AlterReplicationSlotCmd:
cmdtag = "ALTER_REPLICATION_SLOT";
set_ps_display(cmdtag);
AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
EndReplicationCommand(cmdtag);
break;
case T_StartReplicationCmd:
{
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;

View File

@ -72,6 +72,18 @@ typedef struct DropReplicationSlotCmd
} DropReplicationSlotCmd;
/* ----------------------
* ALTER_REPLICATION_SLOT command
* ----------------------
*/
typedef struct AlterReplicationSlotCmd
{
NodeTag type;
char *slotname;
List *options;
} AlterReplicationSlotCmd;
/* ----------------------
* START_REPLICATION command
* ----------------------

View File

@ -28,25 +28,6 @@ typedef enum
DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE,
} DebugLogicalRepStreamingMode;
/* an individual tuple, stored in one chunk of memory */
typedef struct ReorderBufferTupleBuf
{
/* position in preallocated list */
slist_node node;
/* tuple header, the interesting bit for users of logical decoding */
HeapTupleData tuple;
/* pre-allocated size of tuple buffer, different from tuple size */
Size alloc_tuple_size;
/* actual tuple data follows */
} ReorderBufferTupleBuf;
/* pointer to the data stored in a TupleBuf */
#define ReorderBufferTupleBufData(p) \
((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
/*
* Types of the change passed to a 'change' callback.
*
@ -114,9 +95,9 @@ typedef struct ReorderBufferChange
bool clear_toast_afterwards;
/* valid for DELETE || UPDATE */
ReorderBufferTupleBuf *oldtuple;
HeapTuple oldtuple;
/* valid for INSERT || UPDATE */
ReorderBufferTupleBuf *newtuple;
HeapTuple newtuple;
} tp;
/*
@ -678,10 +659,10 @@ struct ReorderBuffer
extern ReorderBuffer *ReorderBufferAllocate(void);
extern void ReorderBufferFree(ReorderBuffer *rb);
extern ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *rb,
Size tuple_len);
extern void ReorderBufferReturnTupleBuf(ReorderBuffer *rb,
ReorderBufferTupleBuf *tuple);
extern HeapTuple ReorderBufferGetTupleBuf(ReorderBuffer *rb,
Size tuple_len);
extern void ReorderBufferReturnTupleBuf(HeapTuple tuple);
extern ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *rb);
extern void ReorderBufferReturnChange(ReorderBuffer *rb,
ReorderBufferChange *change, bool upd_mem);

View File

@ -227,6 +227,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
bool two_phase, bool failover);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
extern void ReplicationSlotAlter(const char *name, bool failover);
extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void);

View File

@ -355,9 +355,20 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname,
bool temporary,
bool two_phase,
bool failover,
CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn);
/*
* walrcv_alter_slot_fn
*
* Change the definition of a replication slot. Currently, it only supports
* changing the failover property of the slot.
*/
typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
const char *slotname,
bool failover);
/*
* walrcv_get_backend_pid_fn
*
@ -399,6 +410,7 @@ typedef struct WalReceiverFunctionsType
walrcv_receive_fn walrcv_receive;
walrcv_send_fn walrcv_send;
walrcv_create_slot_fn walrcv_create_slot;
walrcv_alter_slot_fn walrcv_alter_slot;
walrcv_get_backend_pid_fn walrcv_get_backend_pid;
walrcv_exec_fn walrcv_exec;
walrcv_disconnect_fn walrcv_disconnect;
@ -428,8 +440,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_alter_slot(conn, slotname, failover) \
WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover)
#define walrcv_get_backend_pid(conn) \
WalReceiverFunctions->walrcv_get_backend_pid(conn)
#define walrcv_exec(conn, exec, nRetTypes, retTypes) \

View File

@ -251,7 +251,8 @@ DROP INDEX tenant_idx;
DROP TABLE tenant_table;
DROP VIEW tenant_view;
DROP SCHEMA regress_tenant2_schema;
DROP ROLE regress_tenant;
-- check for duplicated drop
DROP ROLE regress_tenant, regress_tenant;
DROP ROLE regress_tenant2;
DROP ROLE regress_rolecreator;
DROP ROLE regress_role_admin;

View File

@ -206,7 +206,8 @@ DROP INDEX tenant_idx;
DROP TABLE tenant_table;
DROP VIEW tenant_view;
DROP SCHEMA regress_tenant2_schema;
DROP ROLE regress_tenant;
-- check for duplicated drop
DROP ROLE regress_tenant, regress_tenant;
DROP ROLE regress_tenant2;
DROP ROLE regress_rolecreator;
DROP ROLE regress_role_admin;

View File

@ -85,6 +85,7 @@ AlterOwnerStmt
AlterPolicyStmt
AlterPublicationAction
AlterPublicationStmt
AlterReplicationSlotCmd
AlterRoleSetStmt
AlterRoleStmt
AlterSeqStmt
@ -2351,7 +2352,6 @@ ReorderBufferStreamTruncateCB
ReorderBufferTXN
ReorderBufferTXNByIdEnt
ReorderBufferToastEnt
ReorderBufferTupleBuf
ReorderBufferTupleCidEnt
ReorderBufferTupleCidKey
ReorderBufferUpdateProgressTxnCB
@ -3880,6 +3880,7 @@ varattrib_1b_e
varattrib_4b
vbits
verifier_context
walrcv_alter_slot_fn
walrcv_check_conninfo_fn
walrcv_connect_fn
walrcv_create_slot_fn