# Copyright (c) 2023-2025, PostgreSQL Global Development Group # Test for pg_upgrade of logical subscription. Note that after the successful # upgrade test, we can't use the old cluster to prevent failing in --link mode. use strict; use warnings FATAL => 'all'; use File::Find qw(find); use File::Path qw(rmtree); use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; # Can be changed to test the other modes. my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy'; # Initialize publisher node my $publisher = PostgreSQL::Test::Cluster->new('publisher'); $publisher->init(allows_streaming => 'logical'); $publisher->start; # Initialize the old subscriber node my $old_sub = PostgreSQL::Test::Cluster->new('old_sub'); $old_sub->init; $old_sub->start; my $oldbindir = $old_sub->config_data('--bindir'); # Initialize the new subscriber my $new_sub = PostgreSQL::Test::Cluster->new('new_sub'); $new_sub->init; my $newbindir = $new_sub->config_data('--bindir'); # In a VPATH build, we'll be started in the source directory, but we want # to run pg_upgrade in the build directory so that any files generated finish # in it, like delete_old_cluster.{sh,bat}. chdir ${PostgreSQL::Test::Utils::tmp_check}; # Remember a connection string for the publisher node. It would be used # several times. my $connstr = $publisher->connstr . ' dbname=postgres'; # ------------------------------------------------------ # Check that pg_upgrade fails when max_active_replication_origins configured # in the new cluster is less than the number of subscriptions in the old # cluster. # ------------------------------------------------------ # It is sufficient to use disabled subscription to test upgrade failure. $publisher->safe_psql('postgres', "CREATE PUBLICATION regress_pub1"); $old_sub->safe_psql('postgres', "CREATE SUBSCRIPTION regress_sub1 CONNECTION '$connstr' PUBLICATION regress_pub1 WITH (enabled = false)" ); $old_sub->stop; $new_sub->append_conf('postgresql.conf', "max_active_replication_origins = 0"); # pg_upgrade will fail because the new cluster has insufficient # max_active_replication_origins. command_checks_all( [ 'pg_upgrade', '--no-sync', '--old-datadir' => $old_sub->data_dir, '--new-datadir' => $new_sub->data_dir, '--old-bindir' => $oldbindir, '--new-bindir' => $newbindir, '--socketdir' => $new_sub->host, '--old-port' => $old_sub->port, '--new-port' => $new_sub->port, $mode, '--check', ], 1, [ qr/"max_active_replication_origins" \(0\) must be greater than or equal to the number of subscriptions \(1\) on the old cluster/ ], [qr//], 'run of pg_upgrade where the new cluster has insufficient max_active_replication_origins' ); # Reset max_active_replication_origins $new_sub->append_conf('postgresql.conf', "max_active_replication_origins = 10"); # Cleanup $publisher->safe_psql('postgres', "DROP PUBLICATION regress_pub1"); $old_sub->start; $old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;"); # ------------------------------------------------------ # Check that pg_upgrade refuses to run if: # a) there's a subscription with tables in a state other than 'r' (ready) or # 'i' (init) and/or # b) the subscription has no replication origin. # ------------------------------------------------------ $publisher->safe_psql( 'postgres', qq[ CREATE TABLE tab_primary_key(id serial PRIMARY KEY); INSERT INTO tab_primary_key values(1); CREATE PUBLICATION regress_pub2 FOR TABLE tab_primary_key; ]); # Insert the same value that is already present in publisher to the primary key # column of subscriber so that the table sync will fail. $old_sub->safe_psql( 'postgres', qq[ CREATE TABLE tab_primary_key(id serial PRIMARY KEY); INSERT INTO tab_primary_key values(1); CREATE SUBSCRIPTION regress_sub2 CONNECTION '$connstr' PUBLICATION regress_pub2; ]); # Table will be in 'd' (data is being copied) state as table sync will fail # because of primary key constraint error. my $started_query = "SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'd'"; $old_sub->poll_query_until('postgres', $started_query) or die "Timed out while waiting for the table state to become 'd' (datasync)"; # Setup another logical replication and drop the subscription's replication # origin. $publisher->safe_psql('postgres', "CREATE PUBLICATION regress_pub3"); $old_sub->safe_psql('postgres', "CREATE SUBSCRIPTION regress_sub3 CONNECTION '$connstr' PUBLICATION regress_pub3 WITH (enabled = false)" ); my $sub_oid = $old_sub->safe_psql('postgres', "SELECT oid FROM pg_subscription WHERE subname = 'regress_sub3'"); my $reporigin = 'pg_' . qq($sub_oid); $old_sub->safe_psql('postgres', "SELECT pg_replication_origin_drop('$reporigin')"); $old_sub->stop; command_checks_all( [ 'pg_upgrade', '--no-sync', '--old-datadir' => $old_sub->data_dir, '--new-datadir' => $new_sub->data_dir, '--old-bindir' => $oldbindir, '--new-bindir' => $newbindir, '--socketdir' => $new_sub->host, '--old-port' => $old_sub->port, '--new-port' => $new_sub->port, $mode, '--check', ], 1, [ qr/\QYour installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state\E/ ], [], 'run of pg_upgrade --check for old instance with relation in \'d\' datasync(invalid) state and missing replication origin' ); # Verify the reason why the subscriber cannot be upgraded my $sub_relstate_filename; # Find a txt file that contains a list of tables that cannot be upgraded. We # cannot predict the file's path because the output directory contains a # milliseconds timestamp. File::Find::find must be used. find( sub { if ($File::Find::name =~ m/subs_invalid\.txt/) { $sub_relstate_filename = $File::Find::name; } }, $new_sub->data_dir . "/pg_upgrade_output.d"); # Check the file content which should have tab_primary_key table in an invalid # state. like( slurp_file($sub_relstate_filename), qr/The table sync state \"d\" is not allowed for database:\"postgres\" subscription:\"regress_sub2\" schema:\"public\" relation:\"tab_primary_key\"/m, 'the previous test failed due to subscription table in invalid state'); # Check the file content which should have regress_sub3 subscription. like( slurp_file($sub_relstate_filename), qr/The replication origin is missing for database:\"postgres\" subscription:\"regress_sub3\"/m, 'the previous test failed due to missing replication origin'); # Cleanup $old_sub->start; $publisher->safe_psql( 'postgres', qq[ DROP PUBLICATION regress_pub2; DROP PUBLICATION regress_pub3; DROP TABLE tab_primary_key; ]); $old_sub->safe_psql( 'postgres', qq[ DROP SUBSCRIPTION regress_sub2; DROP SUBSCRIPTION regress_sub3; DROP TABLE tab_primary_key; ]); rmtree($new_sub->data_dir . "/pg_upgrade_output.d"); # Verify that the upgrade should be successful with tables in 'ready'/'init' # state along with retaining the replication origin's remote lsn, subscription's # running status, and failover option. $publisher->safe_psql( 'postgres', qq[ CREATE TABLE tab_upgraded1(id int); CREATE PUBLICATION regress_pub4 FOR TABLE tab_upgraded1; ]); $old_sub->safe_psql( 'postgres', qq[ CREATE TABLE tab_upgraded1(id int); CREATE SUBSCRIPTION regress_sub4 CONNECTION '$connstr' PUBLICATION regress_pub4 WITH (failover = true); ]); # Wait till the table tab_upgraded1 reaches 'ready' state my $synced_query = "SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'r'"; $old_sub->poll_query_until('postgres', $synced_query) or die "Timed out while waiting for the table to reach ready state"; $publisher->safe_psql('postgres', "INSERT INTO tab_upgraded1 VALUES (generate_series(1,50))"); $publisher->wait_for_catchup('regress_sub4'); # Change configuration to prepare a subscription table in init state $old_sub->append_conf('postgresql.conf', "max_logical_replication_workers = 0"); $old_sub->restart; # Setup another logical replication $publisher->safe_psql( 'postgres', qq[ CREATE TABLE tab_upgraded2(id int); CREATE PUBLICATION regress_pub5 FOR TABLE tab_upgraded2; ]); $old_sub->safe_psql( 'postgres', qq[ CREATE TABLE tab_upgraded2(id int); CREATE SUBSCRIPTION regress_sub5 CONNECTION '$connstr' PUBLICATION regress_pub5; ]); # The table tab_upgraded2 will be in the init state as the subscriber's # configuration for max_logical_replication_workers is set to 0. my $result = $old_sub->safe_psql('postgres', "SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'i'"); is($result, qq(t), "Check that the table is in init state"); # Get the replication origin's remote_lsn of the old subscriber my $remote_lsn = $old_sub->safe_psql('postgres', "SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub4'" ); # Have the subscription in disabled state before upgrade $old_sub->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub5 DISABLE"); my $tab_upgraded1_oid = $old_sub->safe_psql('postgres', "SELECT oid FROM pg_class WHERE relname = 'tab_upgraded1'"); my $tab_upgraded2_oid = $old_sub->safe_psql('postgres', "SELECT oid FROM pg_class WHERE relname = 'tab_upgraded2'"); $old_sub->stop; # Change configuration so that initial table sync does not get started # automatically $new_sub->append_conf('postgresql.conf', "max_logical_replication_workers = 0"); # ------------------------------------------------------ # Check that pg_upgrade is successful when all tables are in ready or in # init state (tab_upgraded1 table is in ready state and tab_upgraded2 table is # in init state) along with retaining the replication origin's remote lsn, # subscription's running status, and failover option. # ------------------------------------------------------ command_ok( [ 'pg_upgrade', '--no-sync', '--old-datadir' => $old_sub->data_dir, '--new-datadir' => $new_sub->data_dir, '--old-bindir' => $oldbindir, '--new-bindir' => $newbindir, '--socketdir' => $new_sub->host, '--old-port' => $old_sub->port, '--new-port' => $new_sub->port, $mode ], 'run of pg_upgrade for old instance when the subscription tables are in init/ready state' ); ok( !-d $new_sub->data_dir . "/pg_upgrade_output.d", "pg_upgrade_output.d/ removed after successful pg_upgrade"); # ------------------------------------------------------ # Check that the data inserted to the publisher when the new subscriber is down # will be replicated once it is started. Also check that the old subscription # states and relations origins are all preserved. # ------------------------------------------------------ $publisher->safe_psql( 'postgres', qq[ INSERT INTO tab_upgraded1 VALUES(51); INSERT INTO tab_upgraded2 VALUES(1); ]); $new_sub->start; # The subscription's running status and failover option should be preserved # in the upgraded instance. So regress_sub4 should still have subenabled and # subfailover set to true, while regress_sub5 should have both set to false. $result = $new_sub->safe_psql('postgres', "SELECT subname, subenabled, subfailover FROM pg_subscription ORDER BY subname" ); is( $result, qq(regress_sub4|t|t regress_sub5|f|f), "check that the subscription's running status and failover are preserved" ); # Subscription relations should be preserved $result = $new_sub->safe_psql('postgres', "SELECT srrelid, srsubstate FROM pg_subscription_rel ORDER BY srrelid"); is( $result, qq($tab_upgraded1_oid|r $tab_upgraded2_oid|i), "there should be 2 rows in pg_subscription_rel(representing tab_upgraded1 and tab_upgraded2)" ); # The replication origin's remote_lsn should be preserved $sub_oid = $new_sub->safe_psql('postgres', "SELECT oid FROM pg_subscription WHERE subname = 'regress_sub4'"); $result = $new_sub->safe_psql('postgres', "SELECT remote_lsn FROM pg_replication_origin_status WHERE external_id = 'pg_' || $sub_oid" ); is($result, qq($remote_lsn), "remote_lsn should have been preserved"); # Resume the initial sync and wait until all tables of subscription # 'regress_sub5' are synchronized $new_sub->append_conf('postgresql.conf', "max_logical_replication_workers = 10"); $new_sub->restart; $new_sub->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub5 ENABLE"); $new_sub->wait_for_subscription_sync($publisher, 'regress_sub5'); # wait for regress_sub4 to catchup as well $publisher->wait_for_catchup('regress_sub4'); # Rows on tab_upgraded1 and tab_upgraded2 should have been replicated $result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM tab_upgraded1"); is($result, qq(51), "check replicated inserts on new subscriber"); $result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM tab_upgraded2"); is($result, qq(1), "check the data is synced after enabling the subscription for the table that was in init state" ); done_testing();