aboutsummaryrefslogtreecommitdiff
path: root/src/test/subscription/t/024_add_drop_pub.pl
blob: e995d8b3839016695d25db63f393b92b7cae9139 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Copyright (c) 2021-2025, PostgreSQL Global Development Group

# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and
# ensures that creating a publication associated with a subscription at a later
# point of time does not break logical replication.
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Publisher: a fresh cluster initialized with allows_streaming => 'logical'
# and then started.
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start();

# Subscriber: a second cluster, initialized with default options and started.
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init();
$node_subscriber->start();

# tab_1 exists on both nodes, but only the publisher's copy is populated
# (values 1..10); the subscriber's copy starts out empty.
$node_publisher->safe_psql('postgres', q{CREATE TABLE tab_1 (a int)});
$node_publisher->safe_psql('postgres',
	q{INSERT INTO tab_1 SELECT generate_series(1,10)});

$node_subscriber->safe_psql('postgres', q{CREATE TABLE tab_1 (a int)});

# Set up logical replication: tap_pub_1 publishes tab_1, tap_pub_2 is created
# without any table, and tap_sub subscribes to both of them.
my $publisher_connstr = $node_publisher->connstr;
$publisher_connstr .= ' dbname=postgres';

$node_publisher->safe_psql('postgres',
	q{CREATE PUBLICATION tap_pub_1 FOR TABLE tab_1});
$node_publisher->safe_psql('postgres', q{CREATE PUBLICATION tap_pub_2});

$node_subscriber->safe_psql('postgres',
	qq{CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub_1, tap_pub_2}
);

# Block until the initial table sync of tap_sub has finished
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');

# The subscriber must now hold all ten rows, spanning the values 1 through 10
my $result = $node_subscriber->safe_psql('postgres',
	q{SELECT count(*), min(a), max(a) FROM tab_1});
is($result, '10|1|10', 'check initial data is copied to subscriber');

# Create a new table on publisher and populate it (values 1..10) before it is
# added to any publication
$node_publisher->safe_psql('postgres', "CREATE TABLE tab_2 (a int)");
$node_publisher->safe_psql('postgres',
	"INSERT INTO tab_2 SELECT generate_series(1,10)");

# Create a new, empty table of the same shape on subscriber
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_2 (a int)");

# Add the table to tap_pub_2, which tap_sub already subscribes to
$node_publisher->safe_psql('postgres',
	"ALTER PUBLICATION tap_pub_2 ADD TABLE tab_2");

# Dropping tap_pub_1 will refresh the entire publication list, which is what
# lets the subscriber notice tab_2 in tap_pub_2
$node_subscriber->safe_psql('postgres',
	"ALTER SUBSCRIPTION tap_sub DROP PUBLICATION tap_pub_1");

# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');

# Check the initial data of tab_2 was copied to subscriber. The test name
# mentions the table so that a failure here can be told apart from the other
# initial-copy checks in this file.
$result = $node_subscriber->safe_psql('postgres',
	"SELECT count(*), min(a), max(a) FROM tab_2");
is($result, qq(10|1|10),
	'check initial data of tab_2 is copied to subscriber');

# Re-adding tap_pub_1 will refresh the entire publication list
$node_subscriber->safe_psql('postgres',
	"ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_1");

# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');

# Check the initial data of tab_1 was copied to subscriber again. The
# subscriber still holds the 10 rows from the first sync, so a second copy of
# the publisher's 10 rows gives 20 rows with unchanged min/max. The test name
# mentions the table and the re-copy so that a failure here can be told apart
# from the other initial-copy checks in this file.
$result = $node_subscriber->safe_psql('postgres',
	"SELECT count(*), min(a), max(a) FROM tab_1");
is($result, qq(20|1|10),
	'check initial data of tab_1 is copied to subscriber again');

# Ensure that setting a missing publication to the subscription does not
# disrupt existing logical replication. Instead, it should log a warning
# while allowing replication to continue. Additionally, verify that replication
# resumes after the missing publication is created for the publication table.

# Create table on publisher and subscriber
$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)");
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)");

# Remember the pid of the walsender currently streaming to tap_sub, so that
# its replacement can be recognized below.
my $oldpid = $node_publisher->safe_psql('postgres',
	"SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';"
);

# $oldpid is interpolated into the polling query below. Anything other than a
# single pid (no row, or several rows) would turn that query into invalid SQL,
# so fail right away with a message that names the real problem.
$oldpid =~ /\A\d+\z/
  or die
  "expected exactly one streaming walsender for tap_sub, got \"$oldpid\"";

# Set the subscription with a missing publication
$node_subscriber->safe_psql('postgres',
	"ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_3");

# Wait for the walsender to restart after altering the subscription
$node_publisher->poll_query_until('postgres',
	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';"
  )
  or die
  "Timed out while waiting for walsender to restart after altering the subscription";

# Record the current size of the publisher's log file; it is passed to
# wait_for_log below as the offset to search from.
my $offset = -s $node_publisher->logfile;

$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(1)");

# Verify that a warning is logged. The optional group in the pattern allows an
# upper-case alphanumeric token (presumably an SQLSTATE code) to appear between
# the severity and the message text.
$node_publisher->wait_for_log(
	qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/,
	$offset);

# Now create the publication that was missing and let the subscriber pick it up
$node_publisher->safe_psql('postgres',
	"CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3");

$node_subscriber->safe_psql('postgres',
	"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");

$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');

$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)");

$node_publisher->wait_for_catchup('tap_sub');

# Verify that the insert operation gets replicated to subscriber after
# publication is created. Row 1 was inserted before tap_pub_3 existed and is
# expected here as well (presumably via the table copy done on refresh).
# ORDER BY makes the multi-row comparison independent of physical row order.
$result = $node_subscriber->safe_psql('postgres',
	"SELECT * FROM tab_3 ORDER BY a");
is($result, "1\n2",
	'check that the incremental data is replicated after the publication is created'
);

# Stop both clusters in 'fast' mode, subscriber first and publisher second,
# then tell Test::More that all tests have been run.
foreach my $node ($node_subscriber, $node_publisher)
{
	$node->stop('fast');
}

done_testing;