aboutsummaryrefslogtreecommitdiff
path: root/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'src/test')
-rw-r--r--src/test/Makefile2
-rw-r--r--src/test/perl/PostgresNode.pm13
-rw-r--r--src/test/regress/expected/publication.out156
-rw-r--r--src/test/regress/expected/rules.out18
-rw-r--r--src/test/regress/expected/sanity_check.out3
-rw-r--r--src/test/regress/expected/subscription.out66
-rw-r--r--src/test/regress/parallel_schedule3
-rw-r--r--src/test/regress/serial_schedule2
-rw-r--r--src/test/regress/sql/publication.sql82
-rw-r--r--src/test/regress/sql/subscription.sql44
-rw-r--r--src/test/subscription/.gitignore2
-rw-r--r--src/test/subscription/Makefile22
-rw-r--r--src/test/subscription/README16
-rw-r--r--src/test/subscription/t/001_rep_changes.pl188
-rw-r--r--src/test/subscription/t/002_types.pl539
15 files changed, 1154 insertions, 2 deletions
diff --git a/src/test/Makefile b/src/test/Makefile
index 6b40cf50ed2..3c2215849ed 100644
--- a/src/test/Makefile
+++ b/src/test/Makefile
@@ -12,7 +12,7 @@ subdir = src/test
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
-SUBDIRS = perl regress isolation modules recovery
+SUBDIRS = perl regress isolation modules recovery subscription
# We don't build or execute examples/, locale/, or thread/ by default,
# but we do want "make clean" etc to recurse into them. Likewise for ssl/,
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 932478183a8..18d5d12454b 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -380,7 +380,9 @@ WAL archiving can be enabled on this node by passing the keyword parameter
has_archiving => 1. This is disabled by default.
postgresql.conf can be set up for replication by passing the keyword
-parameter allows_streaming => 1. This is disabled by default.
+parameter allows_streaming => 'logical' or 'physical' (passing 1 will also
+suffice for physical replication) depending on type of replication that
+should be enabled. This is disabled by default.
The new node is set up in a fast but unsafe configuration where fsync is
disabled.
@@ -415,7 +417,16 @@ sub init
if ($params{allows_streaming})
{
+ if ($params{allows_streaming} eq "logical")
+ {
+ print $conf "wal_level = logical\n";
+ }
+ else
+ {
+ print $conf "wal_level = replica\n";
+ }
print $conf "max_wal_senders = 5\n";
+ print $conf "max_replication_slots = 5\n";
print $conf "wal_keep_segments = 20\n";
print $conf "max_wal_size = 128MB\n";
print $conf "shared_buffers = 1MB\n";
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
new file mode 100644
index 00000000000..47b04f1a57a
--- /dev/null
+++ b/src/test/regress/expected/publication.out
@@ -0,0 +1,156 @@
+--
+-- PUBLICATION
+--
+CREATE ROLE regress_publication_user LOGIN SUPERUSER;
+SET SESSION AUTHORIZATION 'regress_publication_user';
+CREATE PUBLICATION testpub_default;
+CREATE PUBLICATION testpib_ins_trunct WITH (nopublish delete, nopublish update);
+ALTER PUBLICATION testpub_default WITH (nopublish insert, nopublish delete);
+\dRp
+ List of publications
+ Name | Owner | Inserts | Updates | Deletes
+--------------------+--------------------------+---------+---------+---------
+ testpib_ins_trunct | regress_publication_user | t | f | f
+ testpub_default | regress_publication_user | f | t | f
+(2 rows)
+
+ALTER PUBLICATION testpub_default WITH (publish insert, publish delete);
+\dRp
+ List of publications
+ Name | Owner | Inserts | Updates | Deletes
+--------------------+--------------------------+---------+---------+---------
+ testpib_ins_trunct | regress_publication_user | t | f | f
+ testpub_default | regress_publication_user | t | t | t
+(2 rows)
+
+--- adding tables
+CREATE SCHEMA pub_test;
+CREATE TABLE testpub_tbl1 (id serial primary key, data text);
+CREATE TABLE pub_test.testpub_nopk (foo int, bar int);
+CREATE VIEW testpub_view AS SELECT 1;
+CREATE PUBLICATION testpub_foralltables FOR ALL TABLES WITH (nopublish delete, nopublish update);
+ALTER PUBLICATION testpub_foralltables WITH (publish update);
+CREATE TABLE testpub_tbl2 (id serial primary key, data text);
+-- fail - can't add to for all tables publication
+ALTER PUBLICATION testpub_foralltables ADD TABLE testpub_tbl2;
+ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES
+DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications.
+-- fail - can't drop from all tables publication
+ALTER PUBLICATION testpub_foralltables DROP TABLE testpub_tbl2;
+ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES
+DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications.
+-- fail - can't add to for all tables publication
+ALTER PUBLICATION testpub_foralltables SET TABLE pub_test.testpub_nopk;
+ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES
+DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications.
+SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_foralltables';
+ pubname | puballtables
+----------------------+--------------
+ testpub_foralltables | t
+(1 row)
+
+\d+ testpub_tbl2
+ Table "public.testpub_tbl2"
+ Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
+--------+---------+-----------+----------+------------------------------------------+----------+--------------+-------------
+ id | integer | | not null | nextval('testpub_tbl2_id_seq'::regclass) | plain | |
+ data | text | | | | extended | |
+Indexes:
+ "testpub_tbl2_pkey" PRIMARY KEY, btree (id)
+Publications:
+ "testpub_foralltables"
+
+DROP TABLE testpub_tbl2;
+DROP PUBLICATION testpub_foralltables;
+-- fail - view
+CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
+ERROR: "testpub_view" is not a table
+DETAIL: Only tables can be added to publications.
+CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1, pub_test.testpub_nopk;
+-- fail - already added
+ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_tbl1;
+ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl"
+-- fail - already added
+CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
+ERROR: publication "testpub_fortbl" already exists
+\dRp+ testpub_fortbl
+ Publication testpub_fortbl
+ Inserts | Updates | Deletes
+---------+---------+---------
+ t | t | t
+Tables:
+ "pub_test.testpub_nopk"
+ "public.testpub_tbl1"
+
+-- fail - view
+ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
+ERROR: "testpub_view" is not a table
+DETAIL: Only tables can be added to publications.
+ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
+ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
+ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk;
+ALTER PUBLICATION testpib_ins_trunct ADD TABLE pub_test.testpub_nopk, testpub_tbl1;
+\d+ pub_test.testpub_nopk
+ Table "pub_test.testpub_nopk"
+ Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
+--------+---------+-----------+----------+---------+---------+--------------+-------------
+ foo | integer | | | | plain | |
+ bar | integer | | | | plain | |
+Publications:
+ "testpib_ins_trunct"
+ "testpub_default"
+ "testpub_fortbl"
+
+\d+ testpub_tbl1
+ Table "public.testpub_tbl1"
+ Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
+--------+---------+-----------+----------+------------------------------------------+----------+--------------+-------------
+ id | integer | | not null | nextval('testpub_tbl1_id_seq'::regclass) | plain | |
+ data | text | | | | extended | |
+Indexes:
+ "testpub_tbl1_pkey" PRIMARY KEY, btree (id)
+Publications:
+ "testpib_ins_trunct"
+ "testpub_default"
+ "testpub_fortbl"
+
+\dRp+ testpub_default
+ Publication testpub_default
+ Inserts | Updates | Deletes
+---------+---------+---------
+ t | t | t
+Tables:
+ "pub_test.testpub_nopk"
+ "public.testpub_tbl1"
+
+ALTER PUBLICATION testpub_default DROP TABLE testpub_tbl1, pub_test.testpub_nopk;
+-- fail - nonexistent
+ALTER PUBLICATION testpub_default DROP TABLE pub_test.testpub_nopk;
+ERROR: relation "testpub_nopk" is not part of the publication
+\d+ testpub_tbl1
+ Table "public.testpub_tbl1"
+ Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
+--------+---------+-----------+----------+------------------------------------------+----------+--------------+-------------
+ id | integer | | not null | nextval('testpub_tbl1_id_seq'::regclass) | plain | |
+ data | text | | | | extended | |
+Indexes:
+ "testpub_tbl1_pkey" PRIMARY KEY, btree (id)
+Publications:
+ "testpib_ins_trunct"
+ "testpub_fortbl"
+
+DROP VIEW testpub_view;
+DROP TABLE testpub_tbl1;
+\dRp+ testpub_default
+ Publication testpub_default
+ Inserts | Updates | Deletes
+---------+---------+---------
+ t | t | t
+(1 row)
+
+DROP PUBLICATION testpub_default;
+DROP PUBLICATION testpib_ins_trunct;
+DROP SCHEMA pub_test CASCADE;
+NOTICE: drop cascades to table pub_test.testpub_nopk
+RESET SESSION AUTHORIZATION;
+DROP ROLE regress_publication_user;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index add6adc8719..60abcad1017 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1417,6 +1417,14 @@ pg_prepared_xacts| SELECT p.transaction,
FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_publication_tables| SELECT p.pubname,
+ n.nspname AS schemaname,
+ c.relname AS tablename
+ FROM pg_publication p,
+ (pg_class c
+ JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
+ WHERE (c.oid IN ( SELECT pg_get_publication_tables.relid
+ FROM pg_get_publication_tables((p.pubname)::text) pg_get_publication_tables(relid)));
pg_replication_origin_status| SELECT pg_show_replication_origin_status.local_id,
pg_show_replication_origin_status.external_id,
pg_show_replication_origin_status.remote_lsn,
@@ -1822,6 +1830,16 @@ pg_stat_ssl| SELECT s.pid,
s.sslcompression AS compression,
s.sslclientdn AS clientdn
FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn);
+pg_stat_subscription| SELECT su.oid AS subid,
+ su.subname,
+ st.pid,
+ st.received_lsn,
+ st.last_msg_send_time,
+ st.last_msg_receipt_time,
+ st.latest_end_lsn,
+ st.latest_end_time
+ FROM (pg_subscription su
+ LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
pg_stat_all_indexes.indexrelid,
pg_stat_all_indexes.schemaname,
diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out
index 7ad68c745b0..0af013f8a23 100644
--- a/src/test/regress/expected/sanity_check.out
+++ b/src/test/regress/expected/sanity_check.out
@@ -124,6 +124,8 @@ pg_partitioned_table|t
pg_pltemplate|t
pg_policy|t
pg_proc|t
+pg_publication|t
+pg_publication_rel|t
pg_range|t
pg_replication_origin|t
pg_rewrite|t
@@ -133,6 +135,7 @@ pg_shdepend|t
pg_shdescription|t
pg_shseclabel|t
pg_statistic|t
+pg_subscription|t
pg_tablespace|t
pg_transform|t
pg_trigger|t
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
new file mode 100644
index 00000000000..2ccec98b158
--- /dev/null
+++ b/src/test/regress/expected/subscription.out
@@ -0,0 +1,66 @@
+--
+-- SUBSCRIPTION
+--
+CREATE ROLE regress_subscription_user LOGIN SUPERUSER;
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+-- fail - no publications
+CREATE SUBSCRIPTION testsub CONNECTION 'foo';
+ERROR: syntax error at or near ";"
+LINE 1: CREATE SUBSCRIPTION testsub CONNECTION 'foo';
+ ^
+-- fail - no connection
+CREATE SUBSCRIPTION testsub PUBLICATION foo;
+ERROR: syntax error at or near "PUBLICATION"
+LINE 1: CREATE SUBSCRIPTION testsub PUBLICATION foo;
+ ^
+set client_min_messages to error;
+CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub;
+ERROR: invalid connection string syntax: missing "=" after "testconn" in connection info string
+
+CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT);
+reset client_min_messages;
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Conninfo
+---------+---------------------------+---------+-------------+---------------------
+ testsub | regress_subscription_user | f | {testpub} | dbname=doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3;
+\dRs
+ List of subscriptions
+ Name | Owner | Enabled | Publication
+---------+---------------------------+---------+---------------------
+ testsub | regress_subscription_user | f | {testpub2,testpub3}
+(1 row)
+
+ALTER SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist2';
+ALTER SUBSCRIPTION testsub SET PUBLICATION testpub, testpub1;
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Conninfo
+---------+---------------------------+---------+--------------------+----------------------
+ testsub | regress_subscription_user | f | {testpub,testpub1} | dbname=doesnotexist2
+(1 row)
+
+BEGIN;
+ALTER SUBSCRIPTION testsub ENABLE;
+\dRs
+ List of subscriptions
+ Name | Owner | Enabled | Publication
+---------+---------------------------+---------+--------------------
+ testsub | regress_subscription_user | t | {testpub,testpub1}
+(1 row)
+
+ALTER SUBSCRIPTION testsub DISABLE;
+\dRs
+ List of subscriptions
+ Name | Owner | Enabled | Publication
+---------+---------------------------+---------+--------------------
+ testsub | regress_subscription_user | f | {testpub,testpub1}
+(1 row)
+
+COMMIT;
+DROP SUBSCRIPTION testsub NODROP SLOT;
+RESET SESSION AUTHORIZATION;
+DROP ROLE regress_subscription_user;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 8641769351a..e9b2bad6fd2 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -97,6 +97,9 @@ test: rules psql_crosstab amutils
# run by itself so it can run parallel workers
test: select_parallel
+# no relation related tests can be put in this group
+test: publication subscription
+
# ----------
# Another group of parallel tests
# ----------
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 835cf3556cc..7cdc0f6a69c 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -127,6 +127,8 @@ test: tsrf
test: rules
test: psql_crosstab
test: select_parallel
+test: publication
+test: subscription
test: amutils
test: select_views
test: portals_p2
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
new file mode 100644
index 00000000000..89a31672fa8
--- /dev/null
+++ b/src/test/regress/sql/publication.sql
@@ -0,0 +1,82 @@
+--
+-- PUBLICATION
+--
+CREATE ROLE regress_publication_user LOGIN SUPERUSER;
+SET SESSION AUTHORIZATION 'regress_publication_user';
+
+CREATE PUBLICATION testpub_default;
+
+CREATE PUBLICATION testpib_ins_trunct WITH (nopublish delete, nopublish update);
+
+ALTER PUBLICATION testpub_default WITH (nopublish insert, nopublish delete);
+
+\dRp
+
+ALTER PUBLICATION testpub_default WITH (publish insert, publish delete);
+
+\dRp
+
+--- adding tables
+CREATE SCHEMA pub_test;
+CREATE TABLE testpub_tbl1 (id serial primary key, data text);
+CREATE TABLE pub_test.testpub_nopk (foo int, bar int);
+CREATE VIEW testpub_view AS SELECT 1;
+
+CREATE PUBLICATION testpub_foralltables FOR ALL TABLES WITH (nopublish delete, nopublish update);
+ALTER PUBLICATION testpub_foralltables WITH (publish update);
+
+CREATE TABLE testpub_tbl2 (id serial primary key, data text);
+-- fail - can't add to for all tables publication
+ALTER PUBLICATION testpub_foralltables ADD TABLE testpub_tbl2;
+-- fail - can't drop from all tables publication
+ALTER PUBLICATION testpub_foralltables DROP TABLE testpub_tbl2;
+-- fail - can't add to for all tables publication
+ALTER PUBLICATION testpub_foralltables SET TABLE pub_test.testpub_nopk;
+
+SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_foralltables';
+\d+ testpub_tbl2
+
+DROP TABLE testpub_tbl2;
+DROP PUBLICATION testpub_foralltables;
+
+-- fail - view
+CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
+CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1, pub_test.testpub_nopk;
+-- fail - already added
+ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_tbl1;
+-- fail - already added
+CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
+
+\dRp+ testpub_fortbl
+
+-- fail - view
+ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
+
+ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
+ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
+ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk;
+
+ALTER PUBLICATION testpib_ins_trunct ADD TABLE pub_test.testpub_nopk, testpub_tbl1;
+
+\d+ pub_test.testpub_nopk
+\d+ testpub_tbl1
+\dRp+ testpub_default
+
+ALTER PUBLICATION testpub_default DROP TABLE testpub_tbl1, pub_test.testpub_nopk;
+-- fail - nonexistent
+ALTER PUBLICATION testpub_default DROP TABLE pub_test.testpub_nopk;
+
+\d+ testpub_tbl1
+
+DROP VIEW testpub_view;
+DROP TABLE testpub_tbl1;
+
+\dRp+ testpub_default
+
+DROP PUBLICATION testpub_default;
+DROP PUBLICATION testpib_ins_trunct;
+
+DROP SCHEMA pub_test CASCADE;
+
+RESET SESSION AUTHORIZATION;
+DROP ROLE regress_publication_user;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
new file mode 100644
index 00000000000..68c17d5cfda
--- /dev/null
+++ b/src/test/regress/sql/subscription.sql
@@ -0,0 +1,44 @@
+--
+-- SUBSCRIPTION
+--
+
+CREATE ROLE regress_subscription_user LOGIN SUPERUSER;
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+
+-- fail - no publications
+CREATE SUBSCRIPTION testsub CONNECTION 'foo';
+
+-- fail - no connection
+CREATE SUBSCRIPTION testsub PUBLICATION foo;
+
+set client_min_messages to error;
+CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub;
+CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT);
+reset client_min_messages;
+
+\dRs+
+
+ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3;
+
+\dRs
+
+ALTER SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist2';
+ALTER SUBSCRIPTION testsub SET PUBLICATION testpub, testpub1;
+
+\dRs+
+
+BEGIN;
+ALTER SUBSCRIPTION testsub ENABLE;
+
+\dRs
+
+ALTER SUBSCRIPTION testsub DISABLE;
+
+\dRs
+
+COMMIT;
+
+DROP SUBSCRIPTION testsub NODROP SLOT;
+
+RESET SESSION AUTHORIZATION;
+DROP ROLE regress_subscription_user;
diff --git a/src/test/subscription/.gitignore b/src/test/subscription/.gitignore
new file mode 100644
index 00000000000..871e943d50e
--- /dev/null
+++ b/src/test/subscription/.gitignore
@@ -0,0 +1,2 @@
+# Generated by test suite
+/tmp_check/
diff --git a/src/test/subscription/Makefile b/src/test/subscription/Makefile
new file mode 100644
index 00000000000..bb9795453a8
--- /dev/null
+++ b/src/test/subscription/Makefile
@@ -0,0 +1,22 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/test/subscription
+#
+# Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# src/test/subscription/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/test/subscription
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+EXTRA_INSTALL = contrib/hstore
+
+check:
+ $(prove_check)
+
+clean distclean maintainer-clean:
+ rm -rf tmp_check
diff --git a/src/test/subscription/README b/src/test/subscription/README
new file mode 100644
index 00000000000..e9e93755b71
--- /dev/null
+++ b/src/test/subscription/README
@@ -0,0 +1,16 @@
+src/test/subscription/README
+
+Regression tests for subscription/logical replication
+=====================================================
+
+This directory contains a test suite for subscription/logical replication.
+
+Running the tests
+=================
+
+ make check
+
+NOTE: This creates a temporary installation, and some tests may
+create one or multiple nodes, for the purpose of the tests.
+
+NOTE: This requires the --enable-tap-tests argument to configure.
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
new file mode 100644
index 00000000000..b51740bcd4d
--- /dev/null
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -0,0 +1,188 @@
+# Basic logical replication test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 11;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_ins (a int)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_rep (a int primary key)");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_notrep (a int)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_ins (a int)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_full (a int)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_rep (a int primary key)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_ins_only WITH (nopublish delete, nopublish update)");
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full");
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub, tap_pub_ins_only");
+
+# Wait for subscriber to finish initialization
+my $caughtup_query =
+"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
+is($result, qq(0), 'check non-replicated table is empty on subscriber');
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_ins SELECT generate_series(1,50)");
+$node_publisher->safe_psql('postgres',
+ "DELETE FROM tab_ins WHERE a > 20");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_ins SET a = -a");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_rep SELECT generate_series(1,50)");
+$node_publisher->safe_psql('postgres',
+ "DELETE FROM tab_rep WHERE a > 20");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_rep SET a = -a");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(50|1|50), 'check replicated inserts on subscriber');
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_rep");
+is($result, qq(20|-20|-1), 'check replicated changes on subscriber');
+
+# insert some duplicate rows
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_full SELECT generate_series(1,10)");
+
+# add REPLICA IDENTITY FULL so we can update
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab_full REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE tab_full REPLICA IDENTITY FULL");
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab_ins REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE tab_ins REPLICA IDENTITY FULL");
+
+# and do the update
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_full SET a = a * a");
+
+# Wait for subscription to catch up
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full");
+is($result, qq(10|1|100), 'update works with REPLICA IDENTITY FULL and duplicate tuples');
+
+# check that change of connection string and/or publication list causes
+# restart of subscription workers. Not all of these are registered as tests
+# as we need to poll for a change but the test suite will fail none the less
+# when something goes wrong.
+my $oldpid = $node_publisher->safe_psql('postgres',
+ "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';");
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub CONNECTION 'application_name=$appname $publisher_connstr'");
+$node_publisher->poll_query_until('postgres',
+ "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';")
+ or die "Timed out while waiting for apply to restart";
+
+$oldpid = $node_publisher->safe_psql('postgres',
+ "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';");
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only");
+$node_publisher->poll_query_until('postgres',
+ "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';")
+ or die "Timed out while waiting for apply to restart";
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_ins SELECT generate_series(1001,1100)");
+$node_publisher->safe_psql('postgres',
+ "DELETE FROM tab_rep");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(150|1|1100), 'check replicated inserts after subscription publication change');
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_rep");
+is($result, qq(20|-20|-1), 'check changes skipped after subscription publication change');
+
+# check alter publication (relcache invalidation etc)
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_ins_only WITH (publish delete)");
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full");
+$node_publisher->safe_psql('postgres',
+ "DELETE FROM tab_ins WHERE a > 0");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_full VALUES(0)");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+# note that data are different on provider and subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(50|1|50), 'check replicated deletes after alter publication');
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full");
+is($result, qq(11|0|100), 'check replicated insert after alter publication');
+
+# check all the cleanup
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result =
+ $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl
new file mode 100644
index 00000000000..9064eb4c6de
--- /dev/null
+++ b/src/test/subscription/t/002_types.pl
@@ -0,0 +1,539 @@
+# This tests that more complex datatypes are replicated correctly
+# by logical replication
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+my $ddl = qq(
+ CREATE EXTENSION hstore WITH SCHEMA public;
+ CREATE TABLE public.tst_one_array (
+ a INTEGER PRIMARY KEY,
+ b INTEGER[]
+ );
+ CREATE TABLE public.tst_arrays (
+ a INTEGER[] PRIMARY KEY,
+ b TEXT[],
+ c FLOAT[],
+ d INTERVAL[]
+ );
+
+ CREATE TYPE public.tst_enum_t AS ENUM ('a', 'b', 'c', 'd', 'e');
+ CREATE TABLE public.tst_one_enum (
+ a INTEGER PRIMARY KEY,
+ b public.tst_enum_t
+ );
+ CREATE TABLE public.tst_enums (
+ a public.tst_enum_t PRIMARY KEY,
+ b public.tst_enum_t[]
+ );
+
+ CREATE TYPE public.tst_comp_basic_t AS (a FLOAT, b TEXT, c INTEGER);
+ CREATE TYPE public.tst_comp_enum_t AS (a FLOAT, b public.tst_enum_t, c INTEGER);
+ CREATE TYPE public.tst_comp_enum_array_t AS (a FLOAT, b public.tst_enum_t[], c INTEGER);
+ CREATE TABLE public.tst_one_comp (
+ a INTEGER PRIMARY KEY,
+ b public.tst_comp_basic_t
+ );
+ CREATE TABLE public.tst_comps (
+ a public.tst_comp_basic_t PRIMARY KEY,
+ b public.tst_comp_basic_t[]
+ );
+ CREATE TABLE public.tst_comp_enum (
+ a INTEGER PRIMARY KEY,
+ b public.tst_comp_enum_t
+ );
+ CREATE TABLE public.tst_comp_enum_array (
+ a public.tst_comp_enum_t PRIMARY KEY,
+ b public.tst_comp_enum_t[]
+ );
+ CREATE TABLE public.tst_comp_one_enum_array (
+ a INTEGER PRIMARY KEY,
+ b public.tst_comp_enum_array_t
+ );
+ CREATE TABLE public.tst_comp_enum_what (
+ a public.tst_comp_enum_array_t PRIMARY KEY,
+ b public.tst_comp_enum_array_t[]
+ );
+
+ CREATE TYPE public.tst_comp_mix_t AS (
+ a public.tst_comp_basic_t,
+ b public.tst_comp_basic_t[],
+ c public.tst_enum_t,
+ d public.tst_enum_t[]
+ );
+ CREATE TABLE public.tst_comp_mix_array (
+ a public.tst_comp_mix_t PRIMARY KEY,
+ b public.tst_comp_mix_t[]
+ );
+ CREATE TABLE public.tst_range (
+ a INTEGER PRIMARY KEY,
+ b int4range
+ );
+ CREATE TABLE public.tst_range_array (
+ a INTEGER PRIMARY KEY,
+ b TSTZRANGE,
+ c int8range[]
+ );
+ CREATE TABLE public.tst_hstore (
+ a INTEGER PRIMARY KEY,
+ b public.hstore
+ ););
+
+# Setup structure on both nodes
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR ALL TABLES");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (SLOT NAME = tap_sub_slot)");
+
+# Wait for subscriber to finish initialization
+my $caughtup_query =
+"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+# Insert initial test data
+$node_publisher->safe_psql('postgres', qq(
+ -- test_tbl_one_array_col
+ INSERT INTO tst_one_array (a, b) VALUES
+ (1, '{1, 2, 3}'),
+ (2, '{2, 3, 1}'),
+ (3, '{3, 2, 1}'),
+ (4, '{4, 3, 2}'),
+ (5, '{5, NULL, 3}');
+
+ -- test_tbl_arrays
+ INSERT INTO tst_arrays (a, b, c, d) VALUES
+ ('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'),
+ ('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}'),
+ ('{3, 1, 2}', '{"c", "a", "b"}', '{3.3, 1.1, 2.2}', '{"3 years", "1 year", "2 years"}'),
+ ('{4, 1, 2}', '{"d", "a", "b"}', '{4.4, 1.1, 2.2}', '{"4 years", "1 year", "2 years"}'),
+ ('{5, NULL, NULL}', '{"e", NULL, "b"}', '{5.5, 1.1, NULL}', '{"5 years", NULL, NULL}');
+
+ -- test_tbl_single_enum
+ INSERT INTO tst_one_enum (a, b) VALUES
+ (1, 'a'),
+ (2, 'b'),
+ (3, 'c'),
+ (4, 'd'),
+ (5, NULL);
+
+ -- test_tbl_enums
+ INSERT INTO tst_enums (a, b) VALUES
+ ('a', '{b, c}'),
+ ('b', '{c, a}'),
+ ('c', '{b, a}'),
+ ('d', '{c, b}'),
+ ('e', '{d, NULL}');
+
+ -- test_tbl_single_composites
+ INSERT INTO tst_one_comp (a, b) VALUES
+ (1, ROW(1.0, 'a', 1)),
+ (2, ROW(2.0, 'b', 2)),
+ (3, ROW(3.0, 'c', 3)),
+ (4, ROW(4.0, 'd', 4)),
+ (5, ROW(NULL, NULL, 5));
+
+ -- test_tbl_composites
+ INSERT INTO tst_comps (a, b) VALUES
+ (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]),
+ (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]),
+ (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_basic_t]),
+ (ROW(4.0, 'd', 4), ARRAY[ROW(4, 'd', 3)::tst_comp_basic_t]),
+ (ROW(5.0, 'e', NULL), ARRAY[NULL, ROW(5, NULL, 5)::tst_comp_basic_t]);
+
+ -- test_tbl_composite_with_enums
+ INSERT INTO tst_comp_enum (a, b) VALUES
+ (1, ROW(1.0, 'a', 1)),
+ (2, ROW(2.0, 'b', 2)),
+ (3, ROW(3.0, 'c', 3)),
+ (4, ROW(4.0, 'd', 4)),
+ (5, ROW(NULL, 'e', NULL));
+
+ -- test_tbl_composite_with_enums_array
+ INSERT INTO tst_comp_enum_array (a, b) VALUES
+ (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]),
+ (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]),
+ (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_enum_t]),
+ (ROW(4.0, 'd', 3), ARRAY[ROW(3, 'd', 3)::tst_comp_enum_t]),
+ (ROW(5.0, 'e', 3), ARRAY[ROW(3, 'e', 3)::tst_comp_enum_t, NULL]);
+
+ -- test_tbl_composite_with_single_enums_array_in_composite
+ INSERT INTO tst_comp_one_enum_array (a, b) VALUES
+ (1, ROW(1.0, '{a, b, c}', 1)),
+ (2, ROW(2.0, '{a, b, c}', 2)),
+ (3, ROW(3.0, '{a, b, c}', 3)),
+ (4, ROW(4.0, '{c, b, d}', 4)),
+ (5, ROW(5.0, '{NULL, e, NULL}', 5));
+
+ -- test_tbl_composite_with_enums_array_in_composite
+ INSERT INTO tst_comp_enum_what (a, b) VALUES
+ (ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]),
+ (ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]),
+ (ROW(3.0, '{c, a, b}', 1), ARRAY[ROW(3, '{c, a, b}', 1)::tst_comp_enum_array_t]),
+ (ROW(4.0, '{c, b, d}', 4), ARRAY[ROW(4, '{c, b, d}', 4)::tst_comp_enum_array_t]),
+ (ROW(5.0, '{c, NULL, b}', NULL), ARRAY[ROW(5, '{c, e, b}', 1)::tst_comp_enum_array_t]);
+
+ -- test_tbl_mixed_composites
+ INSERT INTO tst_comp_mix_array (a, b) VALUES
+ (ROW(
+ ROW(1,'a',1),
+ ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t],
+ 'a',
+ '{a,b,NULL,c}'),
+ ARRAY[
+ ROW(
+ ROW(1,'a',1),
+ ARRAY[
+ ROW(1,'a',1)::tst_comp_basic_t,
+ ROW(2,'b',2)::tst_comp_basic_t,
+ NULL
+ ],
+ 'a',
+ '{a,b,c}'
+ )::tst_comp_mix_t
+ ]
+ );
+
+ -- test_tbl_range
+ INSERT INTO tst_range (a, b) VALUES
+ (1, '[1, 10]'),
+ (2, '[2, 20]'),
+ (3, '[3, 30]'),
+ (4, '[4, 40]'),
+ (5, '[5, 50]');
+
+ -- test_tbl_range_array
+ INSERT INTO tst_range_array (a, b, c) VALUES
+ (1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'),
+ (2, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz - interval '2 days', 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}'),
+ (3, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz - interval '3 days', 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[3,4]"}'),
+ (4, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz - interval '4 days', 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[4,5]", NULL, "[40,50]"}'),
+ (5, NULL, NULL);
+
+ -- tst_hstore
+ INSERT INTO tst_hstore (a, b) VALUES
+ (1, '"a"=>"1"'),
+ (2, '"zzz"=>"foo"'),
+ (3, '"123"=>"321"'),
+ (4, '"yellow horse"=>"moaned"');
+));
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+# Check the data on subscriber
+my $result = $node_subscriber->safe_psql('postgres', qq(
+ SET timezone = '+2';
+ SELECT a, b FROM tst_one_array ORDER BY a;
+ SELECT a, b, c, d FROM tst_arrays ORDER BY a;
+ SELECT a, b FROM tst_one_enum ORDER BY a;
+ SELECT a, b FROM tst_enums ORDER BY a;
+ SELECT a, b FROM tst_one_comp ORDER BY a;
+ SELECT a, b FROM tst_comps ORDER BY a;
+ SELECT a, b FROM tst_comp_enum ORDER BY a;
+ SELECT a, b FROM tst_comp_enum_array ORDER BY a;
+ SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
+ SELECT a, b FROM tst_comp_enum_what ORDER BY a;
+ SELECT a, b FROM tst_comp_mix_array ORDER BY a;
+ SELECT a, b FROM tst_range ORDER BY a;
+ SELECT a, b, c FROM tst_range_array ORDER BY a;
+ SELECT a, b FROM tst_hstore ORDER BY a;
+));
+
+is($result, '1|{1,2,3}
+2|{2,3,1}
+3|{3,2,1}
+4|{4,3,2}
+5|{5,NULL,3}
+{1,2,3}|{a,b,c}|{1.1,2.2,3.3}|{"1 day","2 days","3 days"}
+{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00}
+{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"}
+{4,1,2}|{d,a,b}|{4.4,1.1,2.2}|{"4 years","1 year","2 years"}
+{5,NULL,NULL}|{e,NULL,b}|{5.5,1.1,NULL}|{"5 years",NULL,NULL}
+1|a
+2|b
+3|c
+4|d
+5|
+a|{b,c}
+b|{c,a}
+c|{b,a}
+d|{c,b}
+e|{d,NULL}
+1|(1,a,1)
+2|(2,b,2)
+3|(3,c,3)
+4|(4,d,4)
+5|(,,5)
+(1,a,1)|{"(1,a,1)"}
+(2,b,2)|{"(2,b,2)"}
+(3,c,3)|{"(3,c,3)"}
+(4,d,4)|{"(4,d,3)"}
+(5,e,)|{NULL,"(5,,5)"}
+1|(1,a,1)
+2|(2,b,2)
+3|(3,c,3)
+4|(4,d,4)
+5|(,e,)
+(1,a,1)|{"(1,a,1)"}
+(2,b,2)|{"(2,b,2)"}
+(3,c,3)|{"(3,c,3)"}
+(4,d,3)|{"(3,d,3)"}
+(5,e,3)|{"(3,e,3)",NULL}
+1|(1,"{a,b,c}",1)
+2|(2,"{a,b,c}",2)
+3|(3,"{a,b,c}",3)
+4|(4,"{c,b,d}",4)
+5|(5,"{NULL,e,NULL}",5)
+(1,"{a,b,c}",1)|{"(1,\"{a,b,c}\",1)"}
+(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
+(3,"{c,a,b}",1)|{"(3,\"{c,a,b}\",1)"}
+(4,"{c,b,d}",4)|{"(4,\"{c,b,d}\",4)"}
+(5,"{c,NULL,b}",)|{"(5,\"{c,e,b}\",1)"}
+("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
+1|[1,11)
+2|[2,21)
+3|[3,31)
+4|[4,41)
+5|[5,51)
+1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
+3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"}
+4|["2014-07-31 00:00:00+02","2014-08-04 00:00:00+02")|{"[4,6)",NULL,"[40,51)"}
+5||
+1|"a"=>"1"
+2|"zzz"=>"foo"
+3|"123"=>"321"
+4|"yellow horse"=>"moaned"',
+'check replicated inserts on subscriber');
+
+# Run batch of updates
+$node_publisher->safe_psql('postgres', qq(
+ UPDATE tst_one_array SET b = '{4, 5, 6}' WHERE a = 1;
+ UPDATE tst_one_array SET b = '{4, 5, 6, 1}' WHERE a > 3;
+ UPDATE tst_arrays SET b = '{"1a", "2b", "3c"}', c = '{1.0, 2.0, 3.0}', d = '{"1 day 1 second", "2 days 2 seconds", "3 days 3 second"}' WHERE a = '{1, 2, 3}';
+ UPDATE tst_arrays SET b = '{"c", "d", "e"}', c = '{3.0, 4.0, 5.0}', d = '{"3 day 1 second", "4 days 2 seconds", "5 days 3 second"}' WHERE a[1] > 3;
+ UPDATE tst_one_enum SET b = 'c' WHERE a = 1;
+ UPDATE tst_one_enum SET b = NULL WHERE a > 3;
+ UPDATE tst_enums SET b = '{e, NULL}' WHERE a = 'a';
+ UPDATE tst_enums SET b = '{e, d}' WHERE a > 'c';
+ UPDATE tst_one_comp SET b = ROW(1.0, 'A', 1) WHERE a = 1;
+ UPDATE tst_one_comp SET b = ROW(NULL, 'x', -1) WHERE a > 3;
+ UPDATE tst_comps SET b = ARRAY[ROW(9, 'x', -1)::tst_comp_basic_t] WHERE (a).a = 1.0;
+ UPDATE tst_comps SET b = ARRAY[NULL, ROW(9, 'x', NULL)::tst_comp_basic_t] WHERE (a).a > 3.9;
+ UPDATE tst_comp_enum SET b = ROW(1.0, NULL, NULL) WHERE a = 1;
+ UPDATE tst_comp_enum SET b = ROW(4.0, 'd', 44) WHERE a > 3;
+ UPDATE tst_comp_enum_array SET b = ARRAY[NULL, ROW(3, 'd', 3)::tst_comp_enum_t] WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t;
+ UPDATE tst_comp_enum_array SET b = ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t, ROW(2, 'b', 2)::tst_comp_enum_t] WHERE (a).a > 3;
+ UPDATE tst_comp_one_enum_array SET b = ROW(1.0, '{a, e, c}', NULL) WHERE a = 1;
+ UPDATE tst_comp_one_enum_array SET b = ROW(4.0, '{c, b, d}', 4) WHERE a > 3;
+ UPDATE tst_comp_enum_what SET b = ARRAY[NULL, ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t, ROW(NULL, '{a, e, c}', 2)::tst_comp_enum_array_t] WHERE (a).a = 1;
+ UPDATE tst_comp_enum_what SET b = ARRAY[ROW(5, '{a, b, c}', 5)::tst_comp_enum_array_t] WHERE (a).a > 3;
+ UPDATE tst_comp_mix_array SET b[2] = NULL WHERE ((a).a).a = 1;
+ UPDATE tst_range SET b = '[100, 1000]' WHERE a = 1;
+ UPDATE tst_range SET b = '(1, 90)' WHERE a > 3;
+ UPDATE tst_range_array SET c = '{"[100, 1000]"}' WHERE a = 1;
+ UPDATE tst_range_array SET b = tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), c = '{NULL, "[11,9999999]"}' WHERE a > 3;
+ UPDATE tst_hstore SET b = '"updated"=>"value"' WHERE a < 3;
+ UPDATE tst_hstore SET b = '"also"=>"updated"' WHERE a = 3;
+));
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql('postgres', qq(
+ SET timezone = '+2';
+ SELECT a, b FROM tst_one_array ORDER BY a;
+ SELECT a, b, c, d FROM tst_arrays ORDER BY a;
+ SELECT a, b FROM tst_one_enum ORDER BY a;
+ SELECT a, b FROM tst_enums ORDER BY a;
+ SELECT a, b FROM tst_one_comp ORDER BY a;
+ SELECT a, b FROM tst_comps ORDER BY a;
+ SELECT a, b FROM tst_comp_enum ORDER BY a;
+ SELECT a, b FROM tst_comp_enum_array ORDER BY a;
+ SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
+ SELECT a, b FROM tst_comp_enum_what ORDER BY a;
+ SELECT a, b FROM tst_comp_mix_array ORDER BY a;
+ SELECT a, b FROM tst_range ORDER BY a;
+ SELECT a, b, c FROM tst_range_array ORDER BY a;
+ SELECT a, b FROM tst_hstore ORDER BY a;
+));
+
+is($result, '1|{4,5,6}
+2|{2,3,1}
+3|{3,2,1}
+4|{4,5,6,1}
+5|{4,5,6,1}
+{1,2,3}|{1a,2b,3c}|{1,2,3}|{"1 day 00:00:01","2 days 00:00:02","3 days 00:00:03"}
+{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00}
+{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"}
+{4,1,2}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"}
+{5,NULL,NULL}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"}
+1|c
+2|b
+3|c
+4|
+5|
+a|{e,NULL}
+b|{c,a}
+c|{b,a}
+d|{e,d}
+e|{e,d}
+1|(1,A,1)
+2|(2,b,2)
+3|(3,c,3)
+4|(,x,-1)
+5|(,x,-1)
+(1,a,1)|{"(9,x,-1)"}
+(2,b,2)|{"(2,b,2)"}
+(3,c,3)|{"(3,c,3)"}
+(4,d,4)|{NULL,"(9,x,)"}
+(5,e,)|{NULL,"(9,x,)"}
+1|(1,,)
+2|(2,b,2)
+3|(3,c,3)
+4|(4,d,44)
+5|(4,d,44)
+(1,a,1)|{NULL,"(3,d,3)"}
+(2,b,2)|{"(2,b,2)"}
+(3,c,3)|{"(3,c,3)"}
+(4,d,3)|{"(1,a,1)","(2,b,2)"}
+(5,e,3)|{"(1,a,1)","(2,b,2)"}
+1|(1,"{a,e,c}",)
+2|(2,"{a,b,c}",2)
+3|(3,"{a,b,c}",3)
+4|(4,"{c,b,d}",4)
+5|(4,"{c,b,d}",4)
+(1,"{a,b,c}",1)|{NULL,"(1,\"{a,b,c}\",1)","(,\"{a,e,c}\",2)"}
+(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
+(3,"{c,a,b}",1)|{"(3,\"{c,a,b}\",1)"}
+(4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"}
+(5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"}
+("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")",NULL}
+1|[100,1001)
+2|[2,21)
+3|[3,31)
+4|[2,90)
+5|[2,90)
+1|["2014-08-04 00:00:00+02",infinity)|{"[100,1001)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
+3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"}
+4|["2014-08-04 00:00:00+02",infinity)|{NULL,"[11,10000000)"}
+5|["2014-08-04 00:00:00+02",infinity)|{NULL,"[11,10000000)"}
+1|"updated"=>"value"
+2|"updated"=>"value"
+3|"also"=>"updated"
+4|"yellow horse"=>"moaned"',
+'check replicated updates on subscriber');
+
+# Run batch of deletes
+$node_publisher->safe_psql('postgres', qq(
+ DELETE FROM tst_one_array WHERE a = 1;
+ DELETE FROM tst_one_array WHERE b = '{2, 3, 1}';
+ DELETE FROM tst_arrays WHERE a = '{1, 2, 3}';
+ DELETE FROM tst_arrays WHERE a[1] = 2;
+ DELETE FROM tst_one_enum WHERE a = 1;
+ DELETE FROM tst_one_enum WHERE b = 'b';
+ DELETE FROM tst_enums WHERE a = 'a';
+ DELETE FROM tst_enums WHERE b[1] = 'b';
+ DELETE FROM tst_one_comp WHERE a = 1;
+ DELETE FROM tst_one_comp WHERE (b).a = 2.0;
+ DELETE FROM tst_comps WHERE (a).b = 'a';
+ DELETE FROM tst_comps WHERE ROW(3, 'c', 3)::tst_comp_basic_t = ANY(b);
+ DELETE FROM tst_comp_enum WHERE a = 1;
+ DELETE FROM tst_comp_enum WHERE (b).a = 2.0;
+ DELETE FROM tst_comp_enum_array WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t;
+ DELETE FROM tst_comp_enum_array WHERE ROW(3, 'c', 3)::tst_comp_enum_t = ANY(b);
+ DELETE FROM tst_comp_one_enum_array WHERE a = 1;
+ DELETE FROM tst_comp_one_enum_array WHERE 'a' = ANY((b).b);
+ DELETE FROM tst_comp_enum_what WHERE (a).a = 1;
+ DELETE FROM tst_comp_enum_what WHERE (b[1]).b = '{c, a, b}';
+ DELETE FROM tst_comp_mix_array WHERE ((a).a).a = 1;
+ DELETE FROM tst_range WHERE a = 1;
+ DELETE FROM tst_range WHERE '[10,20]' && b;
+ DELETE FROM tst_range_array WHERE a = 1;
+ DELETE FROM tst_range_array WHERE tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 05 00:00:00 2014 CEST'::timestamptz) && b;
+ DELETE FROM tst_hstore WHERE a = 1;
+));
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql('postgres', qq(
+ SET timezone = '+2';
+ SELECT a, b FROM tst_one_array ORDER BY a;
+ SELECT a, b, c, d FROM tst_arrays ORDER BY a;
+ SELECT a, b FROM tst_one_enum ORDER BY a;
+ SELECT a, b FROM tst_enums ORDER BY a;
+ SELECT a, b FROM tst_one_comp ORDER BY a;
+ SELECT a, b FROM tst_comps ORDER BY a;
+ SELECT a, b FROM tst_comp_enum ORDER BY a;
+ SELECT a, b FROM tst_comp_enum_array ORDER BY a;
+ SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
+ SELECT a, b FROM tst_comp_enum_what ORDER BY a;
+ SELECT a, b FROM tst_comp_mix_array ORDER BY a;
+ SELECT a, b FROM tst_range ORDER BY a;
+ SELECT a, b, c FROM tst_range_array ORDER BY a;
+ SELECT a, b FROM tst_hstore ORDER BY a;
+));
+
+is($result, '3|{3,2,1}
+4|{4,5,6,1}
+5|{4,5,6,1}
+{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"}
+{4,1,2}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"}
+{5,NULL,NULL}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"}
+3|c
+4|
+5|
+b|{c,a}
+d|{e,d}
+e|{e,d}
+3|(3,c,3)
+4|(,x,-1)
+5|(,x,-1)
+(2,b,2)|{"(2,b,2)"}
+(4,d,4)|{NULL,"(9,x,)"}
+(5,e,)|{NULL,"(9,x,)"}
+3|(3,c,3)
+4|(4,d,44)
+5|(4,d,44)
+(2,b,2)|{"(2,b,2)"}
+(4,d,3)|{"(1,a,1)","(2,b,2)"}
+(5,e,3)|{"(1,a,1)","(2,b,2)"}
+4|(4,"{c,b,d}",4)
+5|(4,"{c,b,d}",4)
+(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
+(4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"}
+(5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
+3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"}
+2|"updated"=>"value"
+3|"also"=>"updated"
+4|"yellow horse"=>"moaned"',
+'check replicated deletes on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');