diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 608193b3070..46e8a0b7467 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -383,6 +383,79 @@ make prefix=/usr/local/pgsql.new install
+
+ Prepare for publisher upgrades
+
+
+ pg_upgrade attempts to migrate logical
+ slots. This helps avoid the need for manually defining the same
+ logical slots on the new publisher. Migration of logical slots is
+ only supported when the old cluster is version 17.0 or later.
+ Logical slots on clusters before version 17.0 will silently be
+ ignored.
+
+
+
+ Before you start upgrading the publisher cluster, ensure that the
+ subscription is temporarily disabled, by executing
+ ALTER SUBSCRIPTION ... DISABLE.
+ Re-enable the subscription after the upgrade.
+
+
+
+ There are some prerequisites for pg_upgrade to
+ be able to upgrade the logical slots. If these are not met, an error
+ will be reported.
+
+
+
+
+
+ The new cluster must have
+ wal_level as
+ logical.
+
+
+
+
+ The new cluster must have
+ max_replication_slots
+ configured to a value greater than or equal to the number of slots
+ present in the old cluster.
+
+
+
+
+ The output plugins referenced by the slots on the old cluster must be
+ installed in the new PostgreSQL executable directory.
+
+
+
+
+ The old cluster must have replicated all the transactions and logical
+ decoding messages to subscribers.
+
+
+
+
+ All slots on the old cluster must be usable, i.e., there are no slots
+ whose
+ pg_replication_slots.conflicting
+ is true.
+
+
+
+
+ The new cluster must not have permanent logical slots, i.e.,
+ there must be no slots where
+ pg_replication_slots.temporary
+ is false.
+
+
+
+
+
+
Stop both servers
@@ -650,8 +723,9 @@ rsync --archive --delete --hard-links --size-only --no-inc-recursive /vol1/pg_tb
Configure the servers for log shipping. (You do not need to run
pg_backup_start() and pg_backup_stop()
or take a file system backup as the standbys are still synchronized
- with the primary.) Replication slots are not copied and must
- be recreated.
+ with the primary.) Only logical slots on the primary are copied to the
+ new standby, but other slots on the old standby are not copied, so they
+ must be recreated manually.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 24b712aa667..1237118e84f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -600,12 +600,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
- /*
- * If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding messages.
- */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ /* If we don't have snapshot, there is no point in decoding messages */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
message = (xl_logical_message *) XLogRecGetData(r);
@@ -622,6 +618,26 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SnapBuildXactNeedsSkip(builder, buf->origptr)))
return;
+ /*
+ * We also skip decoding in fast_forward mode. This check must be last
+ * because we don't want to set the processing_required flag unless we
+ * have a decodable message.
+ */
+ if (ctx->fast_forward)
+ {
+ /*
+ * We need to set processing_required flag to notify the message's
+ * existence to the caller. Usually, the flag is set when either the
+ * COMMIT or ABORT records are decoded, but this must be turned on
+ * here because the non-transactional logical message is decoded
+ * without waiting for these records.
+ */
+ if (!message->transactional)
+ ctx->processing_required = true;
+
+ return;
+ }
+
/*
* If this is a non-transactional change, get the snapshot we're expected
* to use. We only get here when the snapshot is consistent, and the
@@ -1286,7 +1302,21 @@ static bool
DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
Oid txn_dbid, RepOriginId origin_id)
{
- return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
- ctx->fast_forward || FilterByOrigin(ctx, origin_id));
+ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+ (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
+ FilterByOrigin(ctx, origin_id))
+ return true;
+
+ /*
+ * We also skip decoding in fast_forward mode. In passing set the
+ * processing_required flag to indicate that if it were not for
+ * fast_forward mode, processing would have been required.
+ */
+ if (ctx->fast_forward)
+ {
+ ctx->processing_required = true;
+ return true;
+ }
+
+ return false;
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 41243d0187a..8288da5277f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -29,6 +29,7 @@
#include "postgres.h"
#include "access/xact.h"
+#include "access/xlogutils.h"
#include "access/xlog_internal.h"
#include "fmgr.h"
#include "miscadmin.h"
@@ -41,6 +42,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "utils/inval.h"
#include "utils/memutils.h"
/* data for errcontext callback */
@@ -1949,3 +1951,76 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
rb->totalTxns = 0;
rb->totalBytes = 0;
}
+
+/*
+ * Read up to the end of WAL starting from the decoding slot's restart_lsn.
+ * Return true if any meaningful/decodable WAL records are encountered,
+ * otherwise false.
+ */
+bool
+LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
+{
+ bool has_pending_wal = false;
+
+ Assert(MyReplicationSlot);
+
+ PG_TRY();
+ {
+ LogicalDecodingContext *ctx;
+
+ /*
+ * Create our decoding context in fast_forward mode, passing start_lsn
+ * as InvalidXLogRecPtr, so that we start processing from the slot's
+ * confirmed_flush.
+ */
+ ctx = CreateDecodingContext(InvalidXLogRecPtr,
+ NIL,
+ true, /* fast_forward */
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
+ NULL, NULL, NULL);
+
+ /*
+ * Start reading at the slot's restart_lsn, which we know points to a
+ * valid record.
+ */
+ XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+ /* Invalidate non-timetravel entries */
+ InvalidateSystemCaches();
+
+ /* Loop until the end of WAL or some changes are processed */
+ while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
+ {
+ XLogRecord *record;
+ char *errm = NULL;
+
+ record = XLogReadRecord(ctx->reader, &errm);
+
+ if (errm)
+ elog(ERROR, "could not find record for logical decoding: %s", errm);
+
+ if (record != NULL)
+ LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+ has_pending_wal = ctx->processing_required;
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /* Clean up */
+ FreeDecodingContext(ctx);
+ InvalidateSystemCaches();
+ }
+ PG_CATCH();
+ {
+ /* clear all timetravel entries */
+ InvalidateSystemCaches();
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ return has_pending_wal;
+}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 7e5ec500d89..99823df3c7d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1423,6 +1423,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
SpinLockRelease(&s->mutex);
+ /*
+ * The logical replication slots shouldn't be invalidated as
+ * max_slot_wal_keep_size GUC is set to -1 during the upgrade.
+ *
+ * The following is just a sanity check.
+ */
+ if (*invalidated && SlotIsLogical(s) && IsBinaryUpgrade)
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slots must not be invalidated during the upgrade"),
+ errhint("\"max_slot_wal_keep_size\" must be set to -1 during the upgrade"));
+ }
+
if (active_pid != 0)
{
/*
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 0186636d9f8..2f6fc86c3df 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -17,6 +17,7 @@
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "miscadmin.h"
+#include "replication/logical.h"
#include "utils/array.h"
#include "utils/builtins.h"
@@ -261,3 +262,46 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+
+/*
+ * Verify the given slot has already consumed all the WAL changes.
+ *
+ * Returns true if there are no decodable WAL records after the
+ * confirmed_flush_lsn. Otherwise false.
+ *
+ * This is a special purpose function to ensure that the given slot can be
+ * upgraded without data loss.
+ */
+Datum
+binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
+{
+ Name slot_name;
+ XLogRecPtr end_of_wal;
+ bool found_pending_wal;
+
+ CHECK_IS_BINARY_UPGRADE;
+
+ /* We must check before dereferencing the argument */
+ if (PG_ARGISNULL(0))
+ elog(ERROR, "null argument to binary_upgrade_logical_slot_has_caught_up is not allowed");
+
+ CheckSlotPermissions();
+
+ slot_name = PG_GETARG_NAME(0);
+
+ /* Acquire the given slot */
+ ReplicationSlotAcquire(NameStr(*slot_name), true);
+
+ Assert(SlotIsLogical(MyReplicationSlot));
+
+ /* Slots must be valid as otherwise we won't be able to scan the WAL */
+ Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
+
+ end_of_wal = GetFlushRecPtr(NULL);
+ found_pending_wal = LogicalReplicationSlotHasPendingWal(end_of_wal);
+
+ /* Clean up */
+ ReplicationSlotRelease();
+
+ PG_RETURN_BOOL(!found_pending_wal);
+}
diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index 5834513add4..05e92996544 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -3,6 +3,9 @@
PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
PGAPPICON = win32
+# required for 003_upgrade_logical_replication_slots.pl
+EXTRA_INSTALL=contrib/test_decoding
+
subdir = src/bin/pg_upgrade
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 21a0ff9e42d..179f85ae8a8 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -33,6 +33,8 @@ static void check_for_jsonb_9_4_usage(ClusterInfo *cluster);
static void check_for_pg_role_prefix(ClusterInfo *cluster);
static void check_for_new_tablespace_dir(void);
static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
+static void check_new_cluster_logical_replication_slots(void);
+static void check_old_cluster_for_valid_slots(bool live_check);
/*
@@ -89,8 +91,11 @@ check_and_dump_old_cluster(bool live_check)
if (!live_check)
start_postmaster(&old_cluster, true);
- /* Extract a list of databases and tables from the old cluster */
- get_db_and_rel_infos(&old_cluster);
+ /*
+ * Extract a list of databases, tables, and logical replication slots from
+ * the old cluster.
+ */
+ get_db_rel_and_slot_infos(&old_cluster, live_check);
init_tablespaces();
@@ -107,6 +112,13 @@ check_and_dump_old_cluster(bool live_check)
check_for_reg_data_type_usage(&old_cluster);
check_for_isn_and_int8_passing_mismatch(&old_cluster);
+ /*
+ * Logical replication slots can be migrated since PG17. See comments atop
+ * get_old_cluster_logical_slot_infos().
+ */
+ if (GET_MAJOR_VERSION(old_cluster.major_version) >= 1700)
+ check_old_cluster_for_valid_slots(live_check);
+
/*
* PG 16 increased the size of the 'aclitem' type, which breaks the
* on-disk format for existing data.
@@ -200,7 +212,7 @@ check_and_dump_old_cluster(bool live_check)
void
check_new_cluster(void)
{
- get_db_and_rel_infos(&new_cluster);
+ get_db_rel_and_slot_infos(&new_cluster, false);
check_new_cluster_is_empty();
@@ -223,6 +235,8 @@ check_new_cluster(void)
check_for_prepared_transactions(&new_cluster);
check_for_new_tablespace_dir();
+
+ check_new_cluster_logical_replication_slots();
}
@@ -1451,3 +1465,151 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
else
check_ok();
}
+
+/*
+ * check_new_cluster_logical_replication_slots()
+ *
+ * Verify that there are no logical replication slots on the new cluster and
+ * that the parameter settings necessary for creating slots are sufficient.
+ */
+static void
+check_new_cluster_logical_replication_slots(void)
+{
+ PGresult *res;
+ PGconn *conn;
+ int nslots_on_old;
+ int nslots_on_new;
+ int max_replication_slots;
+ char *wal_level;
+
+ /* Logical slots can be migrated since PG17. */
+ if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
+ return;
+
+ nslots_on_old = count_old_cluster_logical_slots();
+
+ /* Quick return if there are no logical slots to be migrated. */
+ if (nslots_on_old == 0)
+ return;
+
+ conn = connectToServer(&new_cluster, "template1");
+
+ prep_status("Checking for new cluster logical replication slots");
+
+ res = executeQueryOrDie(conn, "SELECT count(*) "
+ "FROM pg_catalog.pg_replication_slots "
+ "WHERE slot_type = 'logical' AND "
+ "temporary IS FALSE;");
+
+ if (PQntuples(res) != 1)
+ pg_fatal("could not count the number of logical replication slots");
+
+ nslots_on_new = atoi(PQgetvalue(res, 0, 0));
+
+ if (nslots_on_new)
+ pg_fatal("Expected 0 logical replication slots but found %d.",
+ nslots_on_new);
+
+ PQclear(res);
+
+ res = executeQueryOrDie(conn, "SELECT setting FROM pg_settings "
+ "WHERE name IN ('wal_level', 'max_replication_slots') "
+ "ORDER BY name DESC;");
+
+ if (PQntuples(res) != 2)
+ pg_fatal("could not determine parameter settings on new cluster");
+
+ wal_level = PQgetvalue(res, 0, 0);
+
+ if (strcmp(wal_level, "logical") != 0)
+ pg_fatal("wal_level must be \"logical\", but is set to \"%s\"",
+ wal_level);
+
+ max_replication_slots = atoi(PQgetvalue(res, 1, 0));
+
+ if (nslots_on_old > max_replication_slots)
+ pg_fatal("max_replication_slots (%d) must be greater than or equal to the number of "
+ "logical replication slots (%d) on the old cluster",
+ max_replication_slots, nslots_on_old);
+
+ PQclear(res);
+ PQfinish(conn);
+
+ check_ok();
+}
+
+/*
+ * check_old_cluster_for_valid_slots()
+ *
+ * Verify that all the logical slots are valid and have consumed all the WAL
+ * before shutdown.
+ */
+static void
+check_old_cluster_for_valid_slots(bool live_check)
+{
+ char output_path[MAXPGPATH];
+ FILE *script = NULL;
+
+ prep_status("Checking for valid logical replication slots");
+
+ snprintf(output_path, sizeof(output_path), "%s/%s",
+ log_opts.basedir,
+ "invalid_logical_replication_slots.txt");
+
+ for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+ {
+ LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
+
+ for (int slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+ {
+ LogicalSlotInfo *slot = &slot_arr->slots[slotnum];
+
+ /* Is the slot usable? */
+ if (slot->invalid)
+ {
+ if (script == NULL &&
+ (script = fopen_priv(output_path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %s",
+ output_path, strerror(errno));
+
+ fprintf(script, "The slot \"%s\" is invalid\n",
+ slot->slotname);
+
+ continue;
+ }
+
+ /*
+ * Do additional check to ensure that all logical replication
+ * slots have consumed all the WAL before shutdown.
+ *
+ * Note: This can be satisfied only when the old cluster has been
+ * shut down, so we skip this for live checks.
+ */
+ if (!live_check && !slot->caught_up)
+ {
+ if (script == NULL &&
+ (script = fopen_priv(output_path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %s",
+ output_path, strerror(errno));
+
+ fprintf(script,
+ "The slot \"%s\" has not consumed the WAL yet\n",
+ slot->slotname);
+ }
+ }
+ }
+
+ if (script)
+ {
+ fclose(script);
+
+ pg_log(PG_REPORT, "fatal");
+ pg_fatal("Your installation contains logical replication slots that can't be upgraded.\n"
+ "You can remove invalid slots and/or consume the pending WAL for other slots,\n"
+ "and then restart the upgrade.\n"
+ "A list of the problematic slots is in the file:\n"
+ " %s", output_path);
+ }
+
+ check_ok();
+}
diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index dc8800c7cde..5af936bd458 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -46,7 +46,9 @@ library_name_compare(const void *p1, const void *p2)
/*
* get_loadable_libraries()
*
- * Fetch the names of all old libraries containing C-language functions.
+ * Fetch the names of all old libraries containing C-language functions, as
+ * well as the libraries corresponding to logical replication output plugins.
+ *
* We will later check that they all exist in the new installation.
*/
void
@@ -55,6 +57,7 @@ get_loadable_libraries(void)
PGresult **ress;
int totaltups;
int dbnum;
+ int n_libinfos;
ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
totaltups = 0;
@@ -81,7 +84,12 @@ get_loadable_libraries(void)
PQfinish(conn);
}
- os_info.libraries = (LibraryInfo *) pg_malloc(totaltups * sizeof(LibraryInfo));
+ /*
+ * Allocate memory for required libraries and logical replication output
+ * plugins.
+ */
+ n_libinfos = totaltups + count_old_cluster_logical_slots();
+ os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos);
totaltups = 0;
for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
@@ -89,6 +97,7 @@ get_loadable_libraries(void)
PGresult *res = ress[dbnum];
int ntups;
int rowno;
+ LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
ntups = PQntuples(res);
for (rowno = 0; rowno < ntups; rowno++)
@@ -101,6 +110,23 @@ get_loadable_libraries(void)
totaltups++;
}
PQclear(res);
+
+ /*
+ * Store the names of output plugins as well. There is a possibility
+ * that duplicated plugins are set, but the consumer function
+ * check_loadable_libraries() will avoid checking the same library, so
+ * we do not have to consider their uniqueness here.
+ */
+ for (int slotno = 0; slotno < slot_arr->nslots; slotno++)
+ {
+ if (slot_arr->slots[slotno].invalid)
+ continue;
+
+ os_info.libraries[totaltups].name = pg_strdup(slot_arr->slots[slotno].plugin);
+ os_info.libraries[totaltups].dbnum = dbnum;
+
+ totaltups++;
+ }
}
pg_free(ress);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index aa5faca4d61..7f21d26fd23 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -26,6 +26,8 @@ static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
static void free_rel_infos(RelInfoArr *rel_arr);
static void print_db_infos(DbInfoArr *db_arr);
static void print_rel_infos(RelInfoArr *rel_arr);
+static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
+static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check);
/*
@@ -266,13 +268,15 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
}
/*
- * get_db_and_rel_infos()
+ * get_db_rel_and_slot_infos()
*
* higher level routine to generate dbinfos for the database running
* on the given "port". Assumes that server is already running.
+ *
+ * live_check would be used only when the target is the old cluster.
*/
void
-get_db_and_rel_infos(ClusterInfo *cluster)
+get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
{
int dbnum;
@@ -283,7 +287,17 @@ get_db_and_rel_infos(ClusterInfo *cluster)
get_db_infos(cluster);
for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
- get_rel_infos(cluster, &cluster->dbarr.dbs[dbnum]);
+ {
+ DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum];
+
+ get_rel_infos(cluster, pDbInfo);
+
+ /*
+ * Retrieve the logical replication slots infos for the old cluster.
+ */
+ if (cluster == &old_cluster)
+ get_old_cluster_logical_slot_infos(pDbInfo, live_check);
+ }
if (cluster == &old_cluster)
pg_log(PG_VERBOSE, "\nsource databases:");
@@ -600,6 +614,125 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
dbinfo->rel_arr.nrels = num_rels;
}
+/*
+ * get_old_cluster_logical_slot_infos()
+ *
+ * Gets the LogicalSlotInfos for all the logical replication slots of the
+ * database referred to by "dbinfo". The status of each logical slot is gotten
+ * here, but they are used at the checking phase. See
+ * check_old_cluster_for_valid_slots().
+ *
+ * Note: This function will not do anything if the old cluster is pre-PG17.
+ * This is because before that the logical slots are not saved at shutdown, so
+ * there is no guarantee that the latest confirmed_flush_lsn is saved to disk
+ * which can lead to data loss. It is still not guaranteed for manually created
+ * slots in PG17, so subsequent checks done in
+ * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
+ * are included.
+ */
+static void
+get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
+{
+ PGconn *conn;
+ PGresult *res;
+ LogicalSlotInfo *slotinfos = NULL;
+ int num_slots = 0;
+
+ /* Logical slots can be migrated since PG17. */
+ if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
+ {
+ dbinfo->slot_arr.slots = slotinfos;
+ dbinfo->slot_arr.nslots = num_slots;
+ return;
+ }
+
+ conn = connectToServer(&old_cluster, dbinfo->db_name);
+
+ /*
+ * Fetch the logical replication slot information. The check whether the
+ * slot is considered caught up is done by an upgrade function. This
+ * regards the slot as caught up if we don't find any decodable changes.
+ * See binary_upgrade_logical_slot_has_caught_up().
+ *
+ * Note that we can't ensure whether the slot is caught up during
+ * live_check as the new WAL records could be generated.
+ *
+ * We intentionally skip checking the WALs for invalidated slots as the
+ * corresponding WALs could have been removed for such slots.
+ *
+ * The temporary slots are explicitly ignored while checking because such
+ * slots cannot exist after the upgrade. During the upgrade, clusters are
+ * started and stopped several times causing any temporary slots to be
+ * removed.
+ */
+ res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, "
+ "%s as caught_up, conflicting as invalid "
+ "FROM pg_catalog.pg_replication_slots "
+ "WHERE slot_type = 'logical' AND "
+ "database = current_database() AND "
+ "temporary IS FALSE;",
+ live_check ? "FALSE" :
+ "(CASE WHEN conflicting THEN FALSE "
+ "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+ "END)");
+
+ num_slots = PQntuples(res);
+
+ if (num_slots)
+ {
+ int i_slotname;
+ int i_plugin;
+ int i_twophase;
+ int i_caught_up;
+ int i_invalid;
+
+ slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots);
+
+ i_slotname = PQfnumber(res, "slot_name");
+ i_plugin = PQfnumber(res, "plugin");
+ i_twophase = PQfnumber(res, "two_phase");
+ i_caught_up = PQfnumber(res, "caught_up");
+ i_invalid = PQfnumber(res, "invalid");
+
+ for (int slotnum = 0; slotnum < num_slots; slotnum++)
+ {
+ LogicalSlotInfo *curr = &slotinfos[slotnum];
+
+ curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
+ curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
+ curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
+ curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0);
+ curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0);
+ }
+ }
+
+ PQclear(res);
+ PQfinish(conn);
+
+ dbinfo->slot_arr.slots = slotinfos;
+ dbinfo->slot_arr.nslots = num_slots;
+}
+
+
+/*
+ * count_old_cluster_logical_slots()
+ *
+ * Returns the number of logical replication slots for all databases.
+ *
+ * Note: this function always returns 0 if the old_cluster is PG16 or prior
+ * because we gather slot information only for cluster versions greater than or
+ * equal to PG17. See get_old_cluster_logical_slot_infos().
+ */
+int
+count_old_cluster_logical_slots(void)
+{
+ int slot_count = 0;
+
+ for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+ slot_count += old_cluster.dbarr.dbs[dbnum].slot_arr.nslots;
+
+ return slot_count;
+}
static void
free_db_and_rel_infos(DbInfoArr *db_arr)
@@ -642,8 +775,11 @@ print_db_infos(DbInfoArr *db_arr)
for (dbnum = 0; dbnum < db_arr->ndbs; dbnum++)
{
- pg_log(PG_VERBOSE, "Database: \"%s\"", db_arr->dbs[dbnum].db_name);
- print_rel_infos(&db_arr->dbs[dbnum].rel_arr);
+ DbInfo *pDbInfo = &db_arr->dbs[dbnum];
+
+ pg_log(PG_VERBOSE, "Database: \"%s\"", pDbInfo->db_name);
+ print_rel_infos(&pDbInfo->rel_arr);
+ print_slot_infos(&pDbInfo->slot_arr);
}
}
@@ -660,3 +796,23 @@ print_rel_infos(RelInfoArr *rel_arr)
rel_arr->rels[relnum].reloid,
rel_arr->rels[relnum].tablespace);
}
+
+static void
+print_slot_infos(LogicalSlotInfoArr *slot_arr)
+{
+ /* Quick return if there are no logical slots. */
+ if (slot_arr->nslots == 0)
+ return;
+
+ pg_log(PG_VERBOSE, "Logical replication slots within the database:");
+
+ for (int slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+ {
+ LogicalSlotInfo *slot_info = &slot_arr->slots[slotnum];
+
+ pg_log(PG_VERBOSE, "slot_name: \"%s\", plugin: \"%s\", two_phase: %s",
+ slot_info->slotname,
+ slot_info->plugin,
+ slot_info->two_phase ? "true" : "false");
+ }
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e29..2c4f38d865b 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
'tests': [
't/001_basic.pl',
't/002_pg_upgrade.pl',
+ 't/003_upgrade_logical_replication_slots.pl',
],
'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
},
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 96bfb67167f..3960af40368 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -59,6 +59,7 @@ static void copy_xact_xlog_xid(void);
static void set_frozenxids(bool minmxid_only);
static void make_outputdirs(char *pgdata);
static void setup(char *argv0, bool *live_check);
+static void create_logical_replication_slots(void);
ClusterInfo old_cluster,
new_cluster;
@@ -188,6 +189,21 @@ main(int argc, char **argv)
new_cluster.pgdata);
check_ok();
+ /*
+ * Migrate the logical slots to the new cluster. Note that we need to do
+ * this after resetting WAL because otherwise the required WAL would be
+ * removed and slots would become unusable. There is a possibility that
+ * background processes might generate some WAL before we could create the
+ * slots in the new cluster but we can ignore that WAL as that won't be
+ * required downstream.
+ */
+ if (count_old_cluster_logical_slots())
+ {
+ start_postmaster(&new_cluster, true);
+ create_logical_replication_slots();
+ stop_postmaster(false);
+ }
+
if (user_opts.do_sync)
{
prep_status("Sync data directory to disk");
@@ -593,7 +609,7 @@ create_new_objects(void)
set_frozenxids(true);
/* update new_cluster info now that we have objects in the databases */
- get_db_and_rel_infos(&new_cluster);
+ get_db_rel_and_slot_infos(&new_cluster, false);
}
/*
@@ -862,3 +878,59 @@ set_frozenxids(bool minmxid_only)
check_ok();
}
+
+/*
+ * create_logical_replication_slots()
+ *
+ * Similar to create_new_objects() but only restores logical replication slots.
+ */
+static void
+create_logical_replication_slots(void)
+{
+ prep_status_progress("Restoring logical replication slots in the new cluster");
+
+ for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+ {
+ DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum];
+ LogicalSlotInfoArr *slot_arr = &old_db->slot_arr;
+ PGconn *conn;
+ PQExpBuffer query;
+
+ /* Skip this database if there are no slots */
+ if (slot_arr->nslots == 0)
+ continue;
+
+ conn = connectToServer(&new_cluster, old_db->db_name);
+ query = createPQExpBuffer();
+
+ pg_log(PG_STATUS, "%s", old_db->db_name);
+
+ for (int slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+ {
+ LogicalSlotInfo *slot_info = &slot_arr->slots[slotnum];
+
+ /* Constructs a query for creating logical replication slots */
+ appendPQExpBuffer(query,
+ "SELECT * FROM "
+ "pg_catalog.pg_create_logical_replication_slot(");
+ appendStringLiteralConn(query, slot_info->slotname, conn);
+ appendPQExpBuffer(query, ", ");
+ appendStringLiteralConn(query, slot_info->plugin, conn);
+ appendPQExpBuffer(query, ", false, %s);",
+ slot_info->two_phase ? "true" : "false");
+
+ PQclear(executeQueryOrDie(conn, "%s", query->data));
+
+ resetPQExpBuffer(query);
+ }
+
+ PQfinish(conn);
+
+ destroyPQExpBuffer(query);
+ }
+
+ end_progress_output();
+ check_ok();
+
+ return;
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 842f3b6cd37..ba8129d1354 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -150,6 +150,24 @@ typedef struct
int nrels;
} RelInfoArr;
+/*
+ * Structure to store logical replication slot information.
+ */
+typedef struct
+{
+ char *slotname; /* slot name */
+ char *plugin; /* plugin */
+ bool two_phase; /* can the slot decode 2PC? */
+ bool caught_up; /* has the slot caught up to latest changes? */
+ bool invalid; /* if true, the slot is unusable */
+} LogicalSlotInfo;
+
+typedef struct
+{
+ int nslots; /* number of logical slot infos */
+ LogicalSlotInfo *slots; /* array of logical slot infos */
+} LogicalSlotInfoArr;
+
/*
* The following structure represents a relation mapping.
*/
@@ -176,6 +194,7 @@ typedef struct
char db_tablespace[MAXPGPATH]; /* database default tablespace
* path */
RelInfoArr rel_arr; /* array of all user relinfos */
+ LogicalSlotInfoArr slot_arr; /* array of all LogicalSlotInfo */
} DbInfo;
/*
@@ -400,7 +419,8 @@ void check_loadable_libraries(void);
FileNameMap *gen_db_file_maps(DbInfo *old_db,
DbInfo *new_db, int *nmaps, const char *old_pgdata,
const char *new_pgdata);
-void get_db_and_rel_infos(ClusterInfo *cluster);
+void get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check);
+int count_old_cluster_logical_slots(void);
/* option.c */
diff --git a/src/bin/pg_upgrade/server.c b/src/bin/pg_upgrade/server.c
index 0bc3d2806b8..d7f6c268ef4 100644
--- a/src/bin/pg_upgrade/server.c
+++ b/src/bin/pg_upgrade/server.c
@@ -201,6 +201,7 @@ start_postmaster(ClusterInfo *cluster, bool report_and_exit_on_error)
PGconn *conn;
bool pg_ctl_return = false;
char socket_string[MAXPGPATH + 200];
+ PQExpBufferData pgoptions;
static bool exit_hook_registered = false;
@@ -227,23 +228,41 @@ start_postmaster(ClusterInfo *cluster, bool report_and_exit_on_error)
cluster->sockdir);
#endif
+ initPQExpBuffer(&pgoptions);
+
/*
- * Use -b to disable autovacuum.
+ * Construct a parameter string which is passed to the server process.
*
* Turn off durability requirements to improve object creation speed, and
* we only modify the new cluster, so only use it there. If there is a
* crash, the new cluster has to be recreated anyway. fsync=off is a big
* win on ext4.
*/
+ if (cluster == &new_cluster)
+ appendPQExpBufferStr(&pgoptions, " -c synchronous_commit=off -c fsync=off -c full_page_writes=off");
+
+ /*
+ * Use max_slot_wal_keep_size as -1 to prevent the WAL removal by the
+ * checkpointer process. If WALs required by logical replication slots
+ * are removed, the slots are unusable. This setting prevents the
+ * invalidation of slots during the upgrade. We set this option when
+ * cluster is PG17 or later because logical replication slots can only be
+ * migrated since then. Besides, max_slot_wal_keep_size is added in PG13.
+ */
+ if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
+ appendPQExpBufferStr(&pgoptions, " -c max_slot_wal_keep_size=-1");
+
+ /* Use -b to disable autovacuum. */
snprintf(cmd, sizeof(cmd),
"\"%s/pg_ctl\" -w -l \"%s/%s\" -D \"%s\" -o \"-p %d -b%s %s%s\" start",
cluster->bindir,
log_opts.logdir,
SERVER_LOG_FILE, cluster->pgconfig, cluster->port,
- (cluster == &new_cluster) ?
- " -c synchronous_commit=off -c fsync=off -c full_page_writes=off" : "",
+ pgoptions.data,
cluster->pgopts ? cluster->pgopts : "", socket_string);
+ termPQExpBuffer(&pgoptions);
+
/*
* Don't throw an error right away, let connecting throw the error because
* it might supply a reason for the failure.
diff --git a/src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl
new file mode 100644
index 00000000000..5e416f553da
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl
@@ -0,0 +1,192 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Tests for upgrading logical replication slots
+
+use strict;
+use warnings;
+
+use File::Find qw(find);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize old cluster
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
+
+# Initialize new cluster
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 'logical');
+
+# Set up a common pg_upgrade command to be reused by all the tests below.
+my @pg_upgrade_cmd = (
+ 'pg_upgrade', '--no-sync',
+ '-d', $old_publisher->data_dir,
+ '-D', $new_publisher->data_dir,
+ '-b', $old_publisher->config_data('--bindir'),
+ '-B', $new_publisher->config_data('--bindir'),
+ '-s', $new_publisher->host,
+ '-p', $old_publisher->port,
+ '-P', $new_publisher->port,
+ $mode);
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when the new cluster has wrong GUC values
+
+# Preparations for the subsequent test:
+# 1. Create two slots on the old cluster
+$old_publisher->start;
+$old_publisher->safe_psql(
+ 'postgres', qq[
+ SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding');
+ SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding');
+]);
+$old_publisher->stop();
+
+# 2. Set 'max_replication_slots' to be less than the number of slots (2)
+# present on the old cluster.
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
+
+# pg_upgrade will fail because the new cluster has insufficient
+# max_replication_slots
+command_checks_all(
+ [@pg_upgrade_cmd],
+ 1,
+ [
+ qr/max_replication_slots \(1\) must be greater than or equal to the number of logical replication slots \(2\) on the old cluster/
+ ],
+ [qr//],
+ 'run of pg_upgrade where the new cluster has insufficient max_replication_slots'
+);
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+ "pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Set 'max_replication_slots' to match the number of slots (2) present on the
+# old cluster. Both slots will be used for subsequent tests.
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 2");
+
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records
+
+# Preparations for the subsequent test:
+# 1. Generate extra WAL records. At this point neither test_slot1 nor
+# test_slot2 has consumed them.
+#
+# 2. Advance the slot test_slot2 up to the current WAL location, but test_slot1
+# still has unconsumed WAL records.
+#
+# 3. Emit a non-transactional message. This will cause test_slot2 to detect the
+# unconsumed WAL record.
+$old_publisher->start;
+$old_publisher->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
+ SELECT pg_replication_slot_advance('test_slot2', pg_current_wal_lsn());
+ SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message');
+]);
+$old_publisher->stop;
+
+# pg_upgrade will fail because there are slots still having unconsumed WAL
+# records
+command_checks_all(
+ [@pg_upgrade_cmd],
+ 1,
+ [
+ qr/Your installation contains logical replication slots that can't be upgraded./
+ ],
+ [qr//],
+ 'run of pg_upgrade of old cluster with slots having unconsumed WAL records'
+);
+
+# Verify the reason why the logical replication slot cannot be upgraded
+my $slots_filename;
+
+# Find a txt file that contains a list of logical replication slots that cannot
+# be upgraded. We cannot predict the file's path because the output directory
+# contains a milliseconds timestamp. File::Find::find must be used.
+find(
+ sub {
+ if ($File::Find::name =~ m/invalid_logical_replication_slots\.txt/)
+ {
+ $slots_filename = $File::Find::name;
+ }
+ },
+ $new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# Check the file content. Both slots should be reporting that they have
+# unconsumed WAL records.
+like(
+ slurp_file($slots_filename),
+ qr/The slot \"test_slot1\" has not consumed the WAL yet/m,
+ 'the previous test failed due to unconsumed WALs');
+like(
+ slurp_file($slots_filename),
+ qr/The slot \"test_slot2\" has not consumed the WAL yet/m,
+ 'the previous test failed due to unconsumed WALs');
+
+
+# ------------------------------
+# TEST: Successful upgrade
+
+# Preparations for the subsequent test:
+# 1. Setup logical replication (first, cleanup slots from the previous tests)
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+
+$old_publisher->start;
+$old_publisher->safe_psql(
+ 'postgres', qq[
+ SELECT * FROM pg_drop_replication_slot('test_slot1');
+ SELECT * FROM pg_drop_replication_slot('test_slot2');
+ CREATE PUBLICATION regress_pub FOR ALL TABLES;
+]);
+
+# Initialize subscriber cluster
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init();
+
+$subscriber->start;
+$subscriber->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE tbl (a int);
+ CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true')
+]);
+$subscriber->wait_for_subscription_sync($old_publisher, 'regress_sub');
+
+# 2. Temporarily disable the subscription
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub DISABLE");
+$old_publisher->stop;
+
+# pg_upgrade should be successful
+command_ok([@pg_upgrade_cmd], 'run of pg_upgrade of old cluster');
+
+# Check that the slot 'regress_sub' has migrated to the new cluster
+$new_publisher->start;
+my $result = $new_publisher->safe_psql('postgres',
+ "SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(regress_sub|t), 'check the slot exists on new cluster');
+
+# Update the connection
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql(
+ 'postgres', qq[
+ ALTER SUBSCRIPTION regress_sub CONNECTION '$new_connstr';
+ ALTER SUBSCRIPTION regress_sub ENABLE;
+]);
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+ "INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('regress_sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are replicated to the subscriber');
+
+# Clean up
+$subscriber->stop();
+$new_publisher->stop();
+
+done_testing();
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 2f46fdc7391..f9e1c0e5351 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202310181
+#define CATALOG_VERSION_NO 202310261
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index c92d0631a01..06435e8b925 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11379,6 +11379,11 @@
proname => 'binary_upgrade_set_next_pg_tablespace_oid', provolatile => 'v',
proparallel => 'u', prorettype => 'void', proargtypes => 'oid',
prosrc => 'binary_upgrade_set_next_pg_tablespace_oid' },
+{ oid => '8046', descr => 'for use by pg_upgrade',
+ proname => 'binary_upgrade_logical_slot_has_caught_up', proisstrict => 'f',
+ provolatile => 'v', proparallel => 'u', prorettype => 'bool',
+ proargtypes => 'name',
+ prosrc => 'binary_upgrade_logical_slot_has_caught_up' },
# conversion functions
{ oid => '4302',
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5f49554ea05..dffc0d15648 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -109,6 +109,9 @@ typedef struct LogicalDecodingContext
TransactionId write_xid;
/* Are we processing the end LSN of a transaction? */
bool end_xact;
+
+ /* Do we need to process any change in fast_forward mode? */
+ bool processing_required;
} LogicalDecodingContext;
@@ -145,4 +148,6 @@ extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal);
+
#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 085d0d7e548..87c1aee379f 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1504,6 +1504,8 @@ LogicalRepTyp
LogicalRepWorker
LogicalRepWorkerType
LogicalRewriteMappingData
+LogicalSlotInfo
+LogicalSlotInfoArr
LogicalTape
LogicalTapeSet
LsnReadQueue