diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f3bfcca434c..97594cd9b18 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -657,7 +657,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); - snprintf(originname, sizeof(originname), "pg_%u", subid); + ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); /* @@ -946,8 +946,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * origin and by this time the origin might be already * removed. For these reasons, passing missing_ok = true. */ - ReplicationOriginNameForTablesync(sub->oid, relid, originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(sub->oid, relid, originname, + sizeof(originname)); replorigin_drop_by_name(originname, true, false); } @@ -1315,7 +1315,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, char originname[NAMEDATALEN]; XLogRecPtr remote_lsn; - snprintf(originname, sizeof(originname), "pg_%u", subid); + ReplicationOriginNameForLogicalRep(subid, InvalidOid, + originname, sizeof(originname)); originid = replorigin_by_name(originname, false); remote_lsn = replorigin_get_progress(originid, false); @@ -1521,8 +1522,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * worker so passing missing_ok = true. This can happen for the states * before SUBREL_STATE_FINISHEDCOPY. */ - ReplicationOriginNameForTablesync(subid, relid, originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(subid, relid, originname, + sizeof(originname)); replorigin_drop_by_name(originname, true, false); } @@ -1533,7 +1534,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) RemoveSubscriptionRel(subid, InvalidOid); /* Remove the origin tracking if exists. */ - snprintf(originname, sizeof(originname), "pg_%u", subid); + ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_drop_by_name(originname, true, false); /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index b4a7b4b7f6e..94e813ac53c 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -353,10 +353,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ StartTransactionCommand(); - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); /* * Resetting the origin session removes the ownership of the slot. @@ -505,10 +505,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * error while dropping we won't restart it to drop the * origin. So passing missing_ok = true. */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - rstate->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, + rstate->relid, + originname, + sizeof(originname)); replorigin_drop_by_name(originname, true, false); /* @@ -1193,18 +1193,6 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid, relid, GetSystemIdentifier()); } -/* - * Form the origin name for tablesync. - * - * Return the name in the supplied buffer. - */ -void -ReplicationOriginNameForTablesync(Oid suboid, Oid relid, - char *originname, Size szorgname) -{ - snprintf(originname, szorgname, "pg_%u_%u", suboid, relid); -} - /* * Start syncing the table in the sync worker. * @@ -1274,10 +1262,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY); /* Assign the origin tracking record name. */ - ReplicationOriginNameForTablesync(MySubscription->oid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC) { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 207a5805ba7..5250ae7f54c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -364,6 +364,30 @@ static void apply_error_callback(void *arg); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); static inline void reset_apply_error_context_info(void); +/* + * Form the origin name for the subscription. + * + * This is a common function for tablesync and other workers. Tablesync workers + * must pass a valid relid. Other callers must pass relid = InvalidOid. + * + * Return the name in the supplied buffer. + */ +void +ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, + char *originname, Size szoriginname) +{ + if (OidIsValid(relid)) + { + /* Replication origin name for tablesync workers. */ + snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid); + } + else + { + /* Replication origin name for non-tablesync workers. */ + snprintf(originname, szoriginname, "pg_%u", suboid); + } +} + /* * Should this worker apply changes for given relation. * @@ -3679,10 +3703,10 @@ ApplyWorkerMain(Datum main_arg) * Allocate the origin name in long-lived context for error context * message. */ - ReplicationOriginNameForTablesync(MySubscription->oid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, originname); } @@ -3707,7 +3731,8 @@ ApplyWorkerMain(Datum main_arg) /* Setup replication origin tracking. */ StartTransactionCommand(); - snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid); + ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, + originname, sizeof(originname)); originid = replorigin_by_name(originname, true); if (!OidIsValid(originid)) originid = replorigin_create(originname); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index f82bc518c32..2b7114ff6d9 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -92,8 +92,8 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); -extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, - char *originname, Size szorgname); +extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, + char *originname, Size szoriginname); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); extern bool AllTablesyncsReady(void);