mirror of
https://github.com/postgres/postgres.git
synced 2025-12-17 00:03:29 -05:00
Test failure on buildfarm member prion: The test failed due to an unexpected LOCATION: line appearing between the WARNING and ERROR messages. This occurred because the prion machine uses log_error_verbosity = verbose, which includes additional context in error messages. The test was originally checking for both WARNING and ERROR messages in sequence sync, but the extra LOCATION: line disrupted this pattern. To make the test robust across different verbosity settings, it now only checks for the presence of the WARNING message after the test, which is sufficient to validate the intended behavior. Failure to sync sequences with quoted names: The previous implementation did not correctly quote sequence names when querying remote information, leading to failures when quoted sequence names were used. This fix ensures that sequence names are properly quoted during remote queries, allowing sequences with quoted identifiers to be synced correctly. Author: Vignesh C <vignesh21@gmail.com> Author: Shinya Kato <shinya11.kato@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Discussion: https://postgr.es/m/CALDaNm0WcdSCoNPiE-5ek4J2dMJ5o111GPTzKCYj9G5i=ONYtQ@mail.gmail.com Discussion: https://postgr.es/m/CAOzEurQOSN=Zcp9uVnatNbAy=2WgMTJn_DYszYjv0KUeQX_e_A@mail.gmail.com
753 lines
22 KiB
C
753 lines
22 KiB
C
/*-------------------------------------------------------------------------
 * sequencesync.c
 *	  PostgreSQL logical replication: sequence synchronization
 *
 * Copyright (c) 2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/sequencesync.c
 *
 * NOTES
 *	  This file contains code for sequence synchronization for
 *	  logical replication.
 *
 * Sequences requiring synchronization are tracked in the pg_subscription_rel
 * catalog.
 *
 * Sequences to be synchronized will be added with state INIT when either of
 * the following commands is executed:
 *	  CREATE SUBSCRIPTION
 *	  ALTER SUBSCRIPTION ... REFRESH PUBLICATION
 *
 * Executing the following command resets all sequences in the subscription to
 * state INIT, triggering re-synchronization:
 *	  ALTER SUBSCRIPTION ... REFRESH SEQUENCES
 *
 * The apply worker periodically scans pg_subscription_rel for sequences in
 * INIT state. When such sequences are found, it spawns a sequencesync worker
 * to handle synchronization.
 *
 * A single sequencesync worker is responsible for synchronizing all sequences.
 * It begins by retrieving the list of sequences that are flagged for
 * synchronization, i.e., those in the INIT state. These sequences are then
 * processed in batches, allowing multiple entries to be synchronized within a
 * single transaction. The worker fetches the current sequence values and page
 * LSNs from the remote publisher, updates the corresponding sequences on the
 * local subscriber, and finally marks each sequence as READY upon successful
 * synchronization.
 *
 * Sequence state transitions follow this pattern:
 *	  INIT -> READY
 *
 * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
 * sequences are synchronized per transaction. The locks on the sequence
 * relation will be periodically released at each transaction commit.
 *
 * XXX: We didn't choose launcher process to maintain the launch of sequencesync
 * worker as it didn't have database connection to access the sequences from the
 * pg_subscription_rel system catalog that need to be synchronized.
 *-------------------------------------------------------------------------
 */
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "access/table.h"
|
|
#include "catalog/pg_sequence.h"
|
|
#include "catalog/pg_subscription_rel.h"
|
|
#include "commands/sequence.h"
|
|
#include "pgstat.h"
|
|
#include "postmaster/interrupt.h"
|
|
#include "replication/logicalworker.h"
|
|
#include "replication/worker_internal.h"
|
|
#include "utils/acl.h"
|
|
#include "utils/builtins.h"
|
|
#include "utils/fmgroids.h"
|
|
#include "utils/guc.h"
|
|
#include "utils/inval.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/pg_lsn.h"
|
|
#include "utils/syscache.h"
|
|
#include "utils/usercontext.h"
|
|
|
|
#define REMOTE_SEQ_COL_COUNT 10
|
|
|
|
typedef enum CopySeqResult
|
|
{
|
|
COPYSEQ_SUCCESS,
|
|
COPYSEQ_MISMATCH,
|
|
COPYSEQ_INSUFFICIENT_PERM,
|
|
COPYSEQ_SKIPPED
|
|
} CopySeqResult;
|
|
|
|
static List *seqinfos = NIL;
|
|
|
|
/*
|
|
* Apply worker determines if sequence synchronization is needed.
|
|
*
|
|
* Start a sequencesync worker if one is not already running. The active
|
|
* sequencesync worker will handle all pending sequence synchronization. If any
|
|
* sequences remain unsynchronized after it exits, a new worker can be started
|
|
* in the next iteration.
|
|
*/
|
|
void
|
|
ProcessSequencesForSync(void)
|
|
{
|
|
LogicalRepWorker *sequencesync_worker;
|
|
int nsyncworkers;
|
|
bool has_pending_sequences;
|
|
bool started_tx;
|
|
|
|
FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
|
|
|
|
if (started_tx)
|
|
{
|
|
CommitTransactionCommand();
|
|
pgstat_report_stat(true);
|
|
}
|
|
|
|
if (!has_pending_sequences)
|
|
return;
|
|
|
|
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
|
|
|
/* Check if there is a sequencesync worker already running? */
|
|
sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
|
|
MyLogicalRepWorker->subid,
|
|
InvalidOid, true);
|
|
if (sequencesync_worker)
|
|
{
|
|
LWLockRelease(LogicalRepWorkerLock);
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* Count running sync workers for this subscription, while we have the
|
|
* lock.
|
|
*/
|
|
nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
|
|
LWLockRelease(LogicalRepWorkerLock);
|
|
|
|
/*
|
|
* It is okay to read/update last_seqsync_start_time here in apply worker
|
|
* as we have already ensured that sync worker doesn't exist.
|
|
*/
|
|
launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
|
|
&MyLogicalRepWorker->last_seqsync_start_time);
|
|
}
|
|
|
|
/*
|
|
* get_sequences_string
|
|
*
|
|
* Build a comma-separated string of schema-qualified sequence names
|
|
* for the given list of sequence indexes.
|
|
*/
|
|
static void
|
|
get_sequences_string(List *seqindexes, StringInfo buf)
|
|
{
|
|
resetStringInfo(buf);
|
|
foreach_int(seqidx, seqindexes)
|
|
{
|
|
LogicalRepSequenceInfo *seqinfo =
|
|
(LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
|
|
|
|
if (buf->len > 0)
|
|
appendStringInfoString(buf, ", ");
|
|
|
|
appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* report_sequence_errors
|
|
*
|
|
* Report discrepancies found during sequence synchronization between
|
|
* the publisher and subscriber. Emits warnings for:
|
|
* a) mismatched definitions or concurrent rename
|
|
* b) insufficient privileges
|
|
* c) missing sequences on the subscriber
|
|
* Then raises an ERROR to indicate synchronization failure.
|
|
*/
|
|
static void
|
|
report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
|
|
List *missing_seqs_idx)
|
|
{
|
|
StringInfo seqstr;
|
|
|
|
/* Quick exit if there are no errors to report */
|
|
if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
|
|
return;
|
|
|
|
seqstr = makeStringInfo();
|
|
|
|
if (mismatched_seqs_idx)
|
|
{
|
|
get_sequences_string(mismatched_seqs_idx, seqstr);
|
|
ereport(WARNING,
|
|
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
|
|
"mismatched or renamed sequences on subscriber (%s)",
|
|
list_length(mismatched_seqs_idx),
|
|
seqstr->data));
|
|
}
|
|
|
|
if (insuffperm_seqs_idx)
|
|
{
|
|
get_sequences_string(insuffperm_seqs_idx, seqstr);
|
|
ereport(WARNING,
|
|
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg_plural("insufficient privileges on sequence (%s)",
|
|
"insufficient privileges on sequences (%s)",
|
|
list_length(insuffperm_seqs_idx),
|
|
seqstr->data));
|
|
}
|
|
|
|
if (missing_seqs_idx)
|
|
{
|
|
get_sequences_string(missing_seqs_idx, seqstr);
|
|
ereport(WARNING,
|
|
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg_plural("missing sequence on publisher (%s)",
|
|
"missing sequences on publisher (%s)",
|
|
list_length(missing_seqs_idx),
|
|
seqstr->data));
|
|
}
|
|
|
|
ereport(ERROR,
|
|
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
|
|
MySubscription->name));
|
|
}
|
|
|
|
/*
|
|
* get_and_validate_seq_info
|
|
*
|
|
* Extracts remote sequence information from the tuple slot received from the
|
|
* publisher, and validates it against the corresponding local sequence
|
|
* definition.
|
|
*/
|
|
static CopySeqResult
|
|
get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
|
|
LogicalRepSequenceInfo **seqinfo, int *seqidx)
|
|
{
|
|
bool isnull;
|
|
int col = 0;
|
|
Oid remote_typid;
|
|
int64 remote_start;
|
|
int64 remote_increment;
|
|
int64 remote_min;
|
|
int64 remote_max;
|
|
bool remote_cycle;
|
|
CopySeqResult result = COPYSEQ_SUCCESS;
|
|
HeapTuple tup;
|
|
Form_pg_sequence local_seq;
|
|
LogicalRepSequenceInfo *seqinfo_local;
|
|
|
|
*seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
|
|
Assert(!isnull);
|
|
|
|
/* Identify the corresponding local sequence for the given index. */
|
|
*seqinfo = seqinfo_local =
|
|
(LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
|
|
|
|
seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
|
|
Assert(!isnull);
|
|
|
|
seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
|
|
Assert(!isnull);
|
|
|
|
seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
|
|
Assert(!isnull);
|
|
|
|
remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
|
|
Assert(!isnull);
|
|
|
|
remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
|
|
Assert(!isnull);
|
|
|
|
remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
|
|
Assert(!isnull);
|
|
|
|
remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
|
|
Assert(!isnull);
|
|
|
|
remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
|
|
Assert(!isnull);
|
|
|
|
remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
|
|
Assert(!isnull);
|
|
|
|
/* Sanity check */
|
|
Assert(col == REMOTE_SEQ_COL_COUNT);
|
|
|
|
seqinfo_local->found_on_pub = true;
|
|
|
|
*sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
|
|
|
|
/* Sequence was concurrently dropped? */
|
|
if (!*sequence_rel)
|
|
return COPYSEQ_SKIPPED;
|
|
|
|
tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
|
|
|
|
/* Sequence was concurrently dropped? */
|
|
if (!HeapTupleIsValid(tup))
|
|
elog(ERROR, "cache lookup failed for sequence %u",
|
|
seqinfo_local->localrelid);
|
|
|
|
local_seq = (Form_pg_sequence) GETSTRUCT(tup);
|
|
|
|
/* Sequence parameters for remote/local are the same? */
|
|
if (local_seq->seqtypid != remote_typid ||
|
|
local_seq->seqstart != remote_start ||
|
|
local_seq->seqincrement != remote_increment ||
|
|
local_seq->seqmin != remote_min ||
|
|
local_seq->seqmax != remote_max ||
|
|
local_seq->seqcycle != remote_cycle)
|
|
result = COPYSEQ_MISMATCH;
|
|
|
|
/* Sequence was concurrently renamed? */
|
|
if (strcmp(seqinfo_local->nspname,
|
|
get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
|
|
strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
|
|
result = COPYSEQ_MISMATCH;
|
|
|
|
ReleaseSysCache(tup);
|
|
return result;
|
|
}
|
|
|
|
/*
|
|
* Apply remote sequence state to local sequence and mark it as
|
|
* synchronized (READY).
|
|
*/
|
|
static CopySeqResult
|
|
copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
|
|
{
|
|
UserContext ucxt;
|
|
AclResult aclresult;
|
|
bool run_as_owner = MySubscription->runasowner;
|
|
Oid seqoid = seqinfo->localrelid;
|
|
|
|
/*
|
|
* If the user did not opt to run as the owner of the subscription
|
|
* ('run_as_owner'), then copy the sequence as the owner of the sequence.
|
|
*/
|
|
if (!run_as_owner)
|
|
SwitchToUntrustedUser(seqowner, &ucxt);
|
|
|
|
aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
|
|
|
|
if (aclresult != ACLCHECK_OK)
|
|
{
|
|
if (!run_as_owner)
|
|
RestoreUserContext(&ucxt);
|
|
|
|
return COPYSEQ_INSUFFICIENT_PERM;
|
|
}
|
|
|
|
/*
|
|
* The log counter (log_cnt) tracks how many sequence values are still
|
|
* unused locally. It is only relevant to the local node and managed
|
|
* internally by nextval() when allocating new ranges. Since log_cnt does
|
|
* not affect the visible sequence state (like last_value or is_called)
|
|
* and is only used for local caching, it need not be copied to the
|
|
* subscriber during synchronization.
|
|
*/
|
|
SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
|
|
|
|
if (!run_as_owner)
|
|
RestoreUserContext(&ucxt);
|
|
|
|
/*
|
|
* Record the remote sequence's LSN in pg_subscription_rel and mark the
|
|
* sequence as READY.
|
|
*/
|
|
UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
|
|
seqinfo->page_lsn, false);
|
|
|
|
return COPYSEQ_SUCCESS;
|
|
}
|
|
|
|
/*
|
|
* Copy existing data of sequences from the publisher.
|
|
*/
|
|
static void
|
|
copy_sequences(WalReceiverConn *conn)
|
|
{
|
|
int cur_batch_base_index = 0;
|
|
int n_seqinfos = list_length(seqinfos);
|
|
List *mismatched_seqs_idx = NIL;
|
|
List *missing_seqs_idx = NIL;
|
|
List *insuffperm_seqs_idx = NIL;
|
|
StringInfo seqstr = makeStringInfo();
|
|
StringInfo cmd = makeStringInfo();
|
|
MemoryContext oldctx;
|
|
|
|
#define MAX_SEQUENCES_SYNC_PER_BATCH 100
|
|
|
|
elog(DEBUG1,
|
|
"logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
|
|
MySubscription->name, n_seqinfos);
|
|
|
|
while (cur_batch_base_index < n_seqinfos)
|
|
{
|
|
Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
|
|
BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
|
|
int batch_size = 0;
|
|
int batch_succeeded_count = 0;
|
|
int batch_mismatched_count = 0;
|
|
int batch_skipped_count = 0;
|
|
int batch_insuffperm_count = 0;
|
|
int batch_missing_count;
|
|
Relation sequence_rel;
|
|
|
|
WalRcvExecResult *res;
|
|
TupleTableSlot *slot;
|
|
|
|
StartTransactionCommand();
|
|
|
|
for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
|
|
{
|
|
char *nspname_literal;
|
|
char *seqname_literal;
|
|
|
|
LogicalRepSequenceInfo *seqinfo =
|
|
(LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
|
|
|
|
if (seqstr->len > 0)
|
|
appendStringInfoString(seqstr, ", ");
|
|
|
|
nspname_literal = quote_literal_cstr(seqinfo->nspname);
|
|
seqname_literal = quote_literal_cstr(seqinfo->seqname);
|
|
|
|
appendStringInfo(seqstr, "(%s, %s, %d)",
|
|
nspname_literal, seqname_literal, idx);
|
|
|
|
if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
|
|
break;
|
|
}
|
|
|
|
/*
|
|
* We deliberately avoid acquiring a local lock on the sequence before
|
|
* querying the publisher to prevent potential distributed deadlocks
|
|
* in bi-directional replication setups.
|
|
*
|
|
* Example scenario:
|
|
*
|
|
* - On each node, a background worker acquires a lock on a sequence
|
|
* as part of a sync operation.
|
|
*
|
|
* - Concurrently, a user transaction attempts to alter the same
|
|
* sequence, waiting on the background worker's lock.
|
|
*
|
|
* - Meanwhile, a query from the other node tries to access metadata
|
|
* that depends on the completion of the alter operation.
|
|
*
|
|
* - This creates a circular wait across nodes:
|
|
*
|
|
* Node-1: Query -> waits on Alter -> waits on Sync Worker
|
|
*
|
|
* Node-2: Query -> waits on Alter -> waits on Sync Worker
|
|
*
|
|
* Since each node only sees part of the wait graph, the deadlock may
|
|
* go undetected, leading to indefinite blocking.
|
|
*
|
|
* Note: Each entry in VALUES includes an index 'seqidx' that
|
|
* represents the sequence's position in the local 'seqinfos' list.
|
|
* This index is propagated to the query results and later used to
|
|
* directly map the fetched publisher sequence rows back to their
|
|
* corresponding local entries without relying on result order or name
|
|
* matching.
|
|
*/
|
|
appendStringInfo(cmd,
|
|
"SELECT s.seqidx, ps.*, seq.seqtypid,\n"
|
|
" seq.seqstart, seq.seqincrement, seq.seqmin,\n"
|
|
" seq.seqmax, seq.seqcycle\n"
|
|
"FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
|
|
"JOIN pg_namespace n ON n.nspname = s.schname\n"
|
|
"JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
|
|
"JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
|
|
"JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
|
|
seqstr->data);
|
|
|
|
res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
|
|
if (res->status != WALRCV_OK_TUPLES)
|
|
ereport(ERROR,
|
|
errcode(ERRCODE_CONNECTION_FAILURE),
|
|
errmsg("could not fetch sequence information from the publisher: %s",
|
|
res->err));
|
|
|
|
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
|
|
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
|
|
{
|
|
CopySeqResult sync_status;
|
|
LogicalRepSequenceInfo *seqinfo;
|
|
int seqidx;
|
|
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
if (ConfigReloadPending)
|
|
{
|
|
ConfigReloadPending = false;
|
|
ProcessConfigFile(PGC_SIGHUP);
|
|
}
|
|
|
|
sync_status = get_and_validate_seq_info(slot, &sequence_rel,
|
|
&seqinfo, &seqidx);
|
|
if (sync_status == COPYSEQ_SUCCESS)
|
|
sync_status = copy_sequence(seqinfo,
|
|
sequence_rel->rd_rel->relowner);
|
|
|
|
switch (sync_status)
|
|
{
|
|
case COPYSEQ_SUCCESS:
|
|
elog(DEBUG1,
|
|
"logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
|
|
MySubscription->name, seqinfo->nspname,
|
|
seqinfo->seqname);
|
|
batch_succeeded_count++;
|
|
break;
|
|
case COPYSEQ_MISMATCH:
|
|
|
|
/*
|
|
* Remember mismatched sequences in a long-lived memory
|
|
* context since these will be used after the transaction
|
|
* is committed.
|
|
*/
|
|
oldctx = MemoryContextSwitchTo(ApplyContext);
|
|
mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
|
|
seqidx);
|
|
MemoryContextSwitchTo(oldctx);
|
|
batch_mismatched_count++;
|
|
break;
|
|
case COPYSEQ_INSUFFICIENT_PERM:
|
|
|
|
/*
|
|
* Remember sequences with insufficient privileges in a
|
|
* long-lived memory context since these will be used
|
|
* after the transaction is committed.
|
|
*/
|
|
oldctx = MemoryContextSwitchTo(ApplyContext);
|
|
insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
|
|
seqidx);
|
|
MemoryContextSwitchTo(oldctx);
|
|
batch_insuffperm_count++;
|
|
break;
|
|
case COPYSEQ_SKIPPED:
|
|
ereport(LOG,
|
|
errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
|
|
seqinfo->nspname,
|
|
seqinfo->seqname));
|
|
batch_skipped_count++;
|
|
break;
|
|
}
|
|
|
|
if (sequence_rel)
|
|
table_close(sequence_rel, NoLock);
|
|
}
|
|
|
|
ExecDropSingleTupleTableSlot(slot);
|
|
walrcv_clear_result(res);
|
|
resetStringInfo(seqstr);
|
|
resetStringInfo(cmd);
|
|
|
|
batch_missing_count = batch_size - (batch_succeeded_count +
|
|
batch_mismatched_count +
|
|
batch_insuffperm_count +
|
|
batch_skipped_count);
|
|
|
|
elog(DEBUG1,
|
|
"logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
|
|
MySubscription->name,
|
|
(cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
|
|
batch_size, batch_succeeded_count, batch_mismatched_count,
|
|
batch_insuffperm_count, batch_missing_count, batch_skipped_count);
|
|
|
|
/* Commit this batch, and prepare for next batch */
|
|
CommitTransactionCommand();
|
|
|
|
if (batch_missing_count)
|
|
{
|
|
for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
|
|
{
|
|
LogicalRepSequenceInfo *seqinfo =
|
|
(LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
|
|
|
|
/* If the sequence was not found on publisher, record it */
|
|
if (!seqinfo->found_on_pub)
|
|
missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* cur_batch_base_index is not incremented sequentially because some
|
|
* sequences may be missing, and the number of fetched rows may not
|
|
* match the batch size.
|
|
*/
|
|
cur_batch_base_index += batch_size;
|
|
}
|
|
|
|
/* Report mismatches, permission issues, or missing sequences */
|
|
report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
|
|
missing_seqs_idx);
|
|
}
|
|
|
|
/*
|
|
* Identifies sequences that require synchronization and initiates the
|
|
* synchronization process.
|
|
*/
|
|
static void
|
|
LogicalRepSyncSequences(void)
|
|
{
|
|
char *err;
|
|
bool must_use_password;
|
|
Relation rel;
|
|
HeapTuple tup;
|
|
ScanKeyData skey[2];
|
|
SysScanDesc scan;
|
|
Oid subid = MyLogicalRepWorker->subid;
|
|
StringInfoData app_name;
|
|
|
|
StartTransactionCommand();
|
|
|
|
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
|
|
|
|
ScanKeyInit(&skey[0],
|
|
Anum_pg_subscription_rel_srsubid,
|
|
BTEqualStrategyNumber, F_OIDEQ,
|
|
ObjectIdGetDatum(subid));
|
|
|
|
ScanKeyInit(&skey[1],
|
|
Anum_pg_subscription_rel_srsubstate,
|
|
BTEqualStrategyNumber, F_CHAREQ,
|
|
CharGetDatum(SUBREL_STATE_INIT));
|
|
|
|
scan = systable_beginscan(rel, InvalidOid, false,
|
|
NULL, 2, skey);
|
|
while (HeapTupleIsValid(tup = systable_getnext(scan)))
|
|
{
|
|
Form_pg_subscription_rel subrel;
|
|
LogicalRepSequenceInfo *seq;
|
|
Relation sequence_rel;
|
|
MemoryContext oldctx;
|
|
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
|
|
|
|
sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
|
|
|
|
/* Skip if sequence was dropped concurrently */
|
|
if (!sequence_rel)
|
|
continue;
|
|
|
|
/* Skip if the relation is not a sequence */
|
|
if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
|
|
{
|
|
table_close(sequence_rel, NoLock);
|
|
continue;
|
|
}
|
|
|
|
/*
|
|
* Worker needs to process sequences across transaction boundary, so
|
|
* allocate them under long-lived context.
|
|
*/
|
|
oldctx = MemoryContextSwitchTo(ApplyContext);
|
|
|
|
seq = palloc0_object(LogicalRepSequenceInfo);
|
|
seq->localrelid = subrel->srrelid;
|
|
seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
|
|
seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
|
|
seqinfos = lappend(seqinfos, seq);
|
|
|
|
MemoryContextSwitchTo(oldctx);
|
|
|
|
table_close(sequence_rel, NoLock);
|
|
}
|
|
|
|
/* Cleanup */
|
|
systable_endscan(scan);
|
|
table_close(rel, AccessShareLock);
|
|
|
|
CommitTransactionCommand();
|
|
|
|
/*
|
|
* Exit early if no catalog entries found, likely due to concurrent drops.
|
|
*/
|
|
if (!seqinfos)
|
|
return;
|
|
|
|
/* Is the use of a password mandatory? */
|
|
must_use_password = MySubscription->passwordrequired &&
|
|
!MySubscription->ownersuperuser;
|
|
|
|
initStringInfo(&app_name);
|
|
appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
|
|
MySubscription->oid, GetSystemIdentifier());
|
|
|
|
/*
|
|
* Establish the connection to the publisher for sequence synchronization.
|
|
*/
|
|
LogRepWorkerWalRcvConn =
|
|
walrcv_connect(MySubscription->conninfo, true, true,
|
|
must_use_password,
|
|
app_name.data, &err);
|
|
if (LogRepWorkerWalRcvConn == NULL)
|
|
ereport(ERROR,
|
|
errcode(ERRCODE_CONNECTION_FAILURE),
|
|
errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
|
|
MySubscription->name, err));
|
|
|
|
pfree(app_name.data);
|
|
|
|
copy_sequences(LogRepWorkerWalRcvConn);
|
|
}
|
|
|
|
/*
|
|
* Execute the initial sync with error handling. Disable the subscription,
|
|
* if required.
|
|
*
|
|
* Note that we don't handle FATAL errors which are probably because of system
|
|
* resource error and are not repeatable.
|
|
*/
|
|
static void
|
|
start_sequence_sync()
|
|
{
|
|
Assert(am_sequencesync_worker());
|
|
|
|
PG_TRY();
|
|
{
|
|
/* Call initial sync. */
|
|
LogicalRepSyncSequences();
|
|
}
|
|
PG_CATCH();
|
|
{
|
|
if (MySubscription->disableonerr)
|
|
DisableSubscriptionAndExit();
|
|
else
|
|
{
|
|
/*
|
|
* Report the worker failed during sequence synchronization. Abort
|
|
* the current transaction so that the stats message is sent in an
|
|
* idle state.
|
|
*/
|
|
AbortOutOfAnyTransaction();
|
|
PG_RE_THROW();
|
|
}
|
|
}
|
|
PG_END_TRY();
|
|
}
|
|
|
|
/* Logical Replication sequencesync worker entry point */
|
|
void
|
|
SequenceSyncWorkerMain(Datum main_arg)
|
|
{
|
|
int worker_slot = DatumGetInt32(main_arg);
|
|
|
|
SetupApplyOrSyncWorker(worker_slot);
|
|
|
|
start_sequence_sync();
|
|
|
|
FinishSyncWorker();
|
|
}
|