diff options
Diffstat (limited to 'src/test')
-rw-r--r-- | src/test/Makefile | 2 | ||||
-rw-r--r-- | src/test/perl/PostgresNode.pm | 13 | ||||
-rw-r--r-- | src/test/regress/expected/publication.out | 156 | ||||
-rw-r--r-- | src/test/regress/expected/rules.out | 18 | ||||
-rw-r--r-- | src/test/regress/expected/sanity_check.out | 3 | ||||
-rw-r--r-- | src/test/regress/expected/subscription.out | 66 | ||||
-rw-r--r-- | src/test/regress/parallel_schedule | 3 | ||||
-rw-r--r-- | src/test/regress/serial_schedule | 2 | ||||
-rw-r--r-- | src/test/regress/sql/publication.sql | 82 | ||||
-rw-r--r-- | src/test/regress/sql/subscription.sql | 44 | ||||
-rw-r--r-- | src/test/subscription/.gitignore | 2 | ||||
-rw-r--r-- | src/test/subscription/Makefile | 22 | ||||
-rw-r--r-- | src/test/subscription/README | 16 | ||||
-rw-r--r-- | src/test/subscription/t/001_rep_changes.pl | 188 | ||||
-rw-r--r-- | src/test/subscription/t/002_types.pl | 539 |
15 files changed, 1154 insertions, 2 deletions
diff --git a/src/test/Makefile b/src/test/Makefile index 6b40cf50ed2..3c2215849ed 100644 --- a/src/test/Makefile +++ b/src/test/Makefile @@ -12,7 +12,7 @@ subdir = src/test top_builddir = ../.. include $(top_builddir)/src/Makefile.global -SUBDIRS = perl regress isolation modules recovery +SUBDIRS = perl regress isolation modules recovery subscription # We don't build or execute examples/, locale/, or thread/ by default, # but we do want "make clean" etc to recurse into them. Likewise for ssl/, diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 932478183a8..18d5d12454b 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -380,7 +380,9 @@ WAL archiving can be enabled on this node by passing the keyword parameter has_archiving => 1. This is disabled by default. postgresql.conf can be set up for replication by passing the keyword -parameter allows_streaming => 1. This is disabled by default. +parameter allows_streaming => 'logical' or 'physical' (passing 1 will also +suffice for physical replication) depending on type of replication that +should be enabled. This is disabled by default. The new node is set up in a fast but unsafe configuration where fsync is disabled. @@ -415,7 +417,16 @@ sub init if ($params{allows_streaming}) { + if ($params{allows_streaming} eq "logical") + { + print $conf "wal_level = logical\n"; + } + else + { + print $conf "wal_level = replica\n"; + } print $conf "max_wal_senders = 5\n"; + print $conf "max_replication_slots = 5\n"; print $conf "wal_keep_segments = 20\n"; print $conf "max_wal_size = 128MB\n"; print $conf "shared_buffers = 1MB\n"; diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out new file mode 100644 index 00000000000..47b04f1a57a --- /dev/null +++ b/src/test/regress/expected/publication.out @@ -0,0 +1,156 @@ +-- +-- PUBLICATION +-- +CREATE ROLE regress_publication_user LOGIN SUPERUSER; +SET SESSION AUTHORIZATION 'regress_publication_user'; +CREATE PUBLICATION testpub_default; +CREATE PUBLICATION testpib_ins_trunct WITH (nopublish delete, nopublish update); +ALTER PUBLICATION testpub_default WITH (nopublish insert, nopublish delete); +\dRp + List of publications + Name | Owner | Inserts | Updates | Deletes +--------------------+--------------------------+---------+---------+--------- + testpib_ins_trunct | regress_publication_user | t | f | f + testpub_default | regress_publication_user | f | t | f +(2 rows) + +ALTER PUBLICATION testpub_default WITH (publish insert, publish delete); +\dRp + List of publications + Name | Owner | Inserts | Updates | Deletes +--------------------+--------------------------+---------+---------+--------- + testpib_ins_trunct | regress_publication_user | t | f | f + testpub_default | regress_publication_user | t | t | t +(2 rows) + +--- adding tables +CREATE SCHEMA pub_test; +CREATE TABLE testpub_tbl1 (id serial primary key, data text); +CREATE TABLE pub_test.testpub_nopk (foo int, bar int); +CREATE VIEW testpub_view AS SELECT 1; +CREATE PUBLICATION testpub_foralltables FOR ALL TABLES WITH (nopublish delete, nopublish update); +ALTER PUBLICATION testpub_foralltables WITH (publish update); +CREATE TABLE testpub_tbl2 (id serial primary key, data text); +-- fail - can't add to for all tables publication +ALTER PUBLICATION testpub_foralltables ADD TABLE testpub_tbl2; +ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES +DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications. +-- fail - can't drop from all tables publication +ALTER PUBLICATION testpub_foralltables DROP TABLE testpub_tbl2; +ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES +DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications. +-- fail - can't add to for all tables publication +ALTER PUBLICATION testpub_foralltables SET TABLE pub_test.testpub_nopk; +ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES +DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications. +SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_foralltables'; + pubname | puballtables +----------------------+-------------- + testpub_foralltables | t +(1 row) + +\d+ testpub_tbl2 + Table "public.testpub_tbl2" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+------------------------------------------+----------+--------------+------------- + id | integer | | not null | nextval('testpub_tbl2_id_seq'::regclass) | plain | | + data | text | | | | extended | | +Indexes: + "testpub_tbl2_pkey" PRIMARY KEY, btree (id) +Publications: + "testpub_foralltables" + +DROP TABLE testpub_tbl2; +DROP PUBLICATION testpub_foralltables; +-- fail - view +CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; +ERROR: "testpub_view" is not a table +DETAIL: Only tables can be added to publications. +CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1, pub_test.testpub_nopk; +-- fail - already added +ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_tbl1; +ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl" +-- fail - already added +CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; +ERROR: publication "testpub_fortbl" already exists +\dRp+ testpub_fortbl + Publication testpub_fortbl + Inserts | Updates | Deletes +---------+---------+--------- + t | t | t +Tables: + "pub_test.testpub_nopk" + "public.testpub_tbl1" + +-- fail - view +ALTER PUBLICATION testpub_default ADD TABLE testpub_view; +ERROR: "testpub_view" is not a table +DETAIL: Only tables can be added to publications. +ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; +ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; +ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk; +ALTER PUBLICATION testpib_ins_trunct ADD TABLE pub_test.testpub_nopk, testpub_tbl1; +\d+ pub_test.testpub_nopk + Table "pub_test.testpub_nopk" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + foo | integer | | | | plain | | + bar | integer | | | | plain | | +Publications: + "testpib_ins_trunct" + "testpub_default" + "testpub_fortbl" + +\d+ testpub_tbl1 + Table "public.testpub_tbl1" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+------------------------------------------+----------+--------------+------------- + id | integer | | not null | nextval('testpub_tbl1_id_seq'::regclass) | plain | | + data | text | | | | extended | | +Indexes: + "testpub_tbl1_pkey" PRIMARY KEY, btree (id) +Publications: + "testpib_ins_trunct" + "testpub_default" + "testpub_fortbl" + +\dRp+ testpub_default + Publication testpub_default + Inserts | Updates | Deletes +---------+---------+--------- + t | t | t +Tables: + "pub_test.testpub_nopk" + "public.testpub_tbl1" + +ALTER PUBLICATION testpub_default DROP TABLE testpub_tbl1, pub_test.testpub_nopk; +-- fail - nonexistent +ALTER PUBLICATION testpub_default DROP TABLE pub_test.testpub_nopk; +ERROR: relation "testpub_nopk" is not part of the publication +\d+ testpub_tbl1 + Table "public.testpub_tbl1" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+------------------------------------------+----------+--------------+------------- + id | integer | | not null | nextval('testpub_tbl1_id_seq'::regclass) | plain | | + data | text | | | | extended | | +Indexes: + "testpub_tbl1_pkey" PRIMARY KEY, btree (id) +Publications: + "testpib_ins_trunct" + "testpub_fortbl" + +DROP VIEW testpub_view; +DROP TABLE testpub_tbl1; +\dRp+ testpub_default + Publication testpub_default + Inserts | Updates | Deletes +---------+---------+--------- + t | t | t +(1 row) + +DROP PUBLICATION testpub_default; +DROP PUBLICATION testpib_ins_trunct; +DROP SCHEMA pub_test CASCADE; +NOTICE: drop cascades to table pub_test.testpub_nopk +RESET SESSION AUTHORIZATION; +DROP ROLE regress_publication_user; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index add6adc8719..60abcad1017 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1417,6 +1417,14 @@ pg_prepared_xacts| SELECT p.transaction, FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid) LEFT JOIN pg_authid u ON ((p.ownerid = u.oid))) LEFT JOIN pg_database d ON ((p.dbid = d.oid))); +pg_publication_tables| SELECT p.pubname, + n.nspname AS schemaname, + c.relname AS tablename + FROM pg_publication p, + (pg_class c + JOIN pg_namespace n ON ((n.oid = c.relnamespace))) + WHERE (c.oid IN ( SELECT pg_get_publication_tables.relid + FROM pg_get_publication_tables((p.pubname)::text) pg_get_publication_tables(relid))); pg_replication_origin_status| SELECT pg_show_replication_origin_status.local_id, pg_show_replication_origin_status.external_id, pg_show_replication_origin_status.remote_lsn, @@ -1822,6 +1830,16 @@ pg_stat_ssl| SELECT s.pid, s.sslcompression AS compression, s.sslclientdn AS clientdn FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn); +pg_stat_subscription| SELECT su.oid AS subid, + su.subname, + st.pid, + st.received_lsn, + st.last_msg_send_time, + st.last_msg_receipt_time, + st.latest_end_lsn, + st.latest_end_time + FROM (pg_subscription su + LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index 7ad68c745b0..0af013f8a23 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -124,6 +124,8 @@ pg_partitioned_table|t pg_pltemplate|t pg_policy|t pg_proc|t +pg_publication|t +pg_publication_rel|t pg_range|t pg_replication_origin|t pg_rewrite|t @@ -133,6 +135,7 @@ pg_shdepend|t pg_shdescription|t pg_shseclabel|t pg_statistic|t +pg_subscription|t pg_tablespace|t pg_transform|t pg_trigger|t diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out new file mode 100644 index 00000000000..2ccec98b158 --- /dev/null +++ b/src/test/regress/expected/subscription.out @@ -0,0 +1,66 @@ +-- +-- SUBSCRIPTION +-- +CREATE ROLE regress_subscription_user LOGIN SUPERUSER; +SET SESSION AUTHORIZATION 'regress_subscription_user'; +-- fail - no publications +CREATE SUBSCRIPTION testsub CONNECTION 'foo'; +ERROR: syntax error at or near ";" +LINE 1: CREATE SUBSCRIPTION testsub CONNECTION 'foo'; + ^ +-- fail - no connection +CREATE SUBSCRIPTION testsub PUBLICATION foo; +ERROR: syntax error at or near "PUBLICATION" +LINE 1: CREATE SUBSCRIPTION testsub PUBLICATION foo; + ^ +set client_min_messages to error; +CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub; +ERROR: invalid connection string syntax: missing "=" after "testconn" in connection info string + +CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT); +reset client_min_messages; +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Conninfo +---------+---------------------------+---------+-------------+--------------------- + testsub | regress_subscription_user | f | {testpub} | dbname=doesnotexist +(1 row) + +ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3; +\dRs + List of subscriptions + Name | Owner | Enabled | Publication +---------+---------------------------+---------+--------------------- + testsub | regress_subscription_user | f | {testpub2,testpub3} +(1 row) + +ALTER SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist2'; +ALTER SUBSCRIPTION testsub SET PUBLICATION testpub, testpub1; +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Conninfo +---------+---------------------------+---------+--------------------+---------------------- + testsub | regress_subscription_user | f | {testpub,testpub1} | dbname=doesnotexist2 +(1 row) + +BEGIN; +ALTER SUBSCRIPTION testsub ENABLE; +\dRs + List of subscriptions + Name | Owner | Enabled | Publication +---------+---------------------------+---------+-------------------- + testsub | regress_subscription_user | t | {testpub,testpub1} +(1 row) + +ALTER SUBSCRIPTION testsub DISABLE; +\dRs + List of subscriptions + Name | Owner | Enabled | Publication +---------+---------------------------+---------+-------------------- + testsub | regress_subscription_user | f | {testpub,testpub1} +(1 row) + +COMMIT; +DROP SUBSCRIPTION testsub NODROP SLOT; +RESET SESSION AUTHORIZATION; +DROP ROLE regress_subscription_user; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 8641769351a..e9b2bad6fd2 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -97,6 +97,9 @@ test: rules psql_crosstab amutils # run by itself so it can run parallel workers test: select_parallel +# no relation related tests can be put in this group +test: publication subscription + # ---------- # Another group of parallel tests # ---------- diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index 835cf3556cc..7cdc0f6a69c 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -127,6 +127,8 @@ test: tsrf test: rules test: psql_crosstab test: select_parallel +test: publication +test: subscription test: amutils test: select_views test: portals_p2 diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql new file mode 100644 index 00000000000..89a31672fa8 --- /dev/null +++ b/src/test/regress/sql/publication.sql @@ -0,0 +1,82 @@ +-- +-- PUBLICATION +-- +CREATE ROLE regress_publication_user LOGIN SUPERUSER; +SET SESSION AUTHORIZATION 'regress_publication_user'; + +CREATE PUBLICATION testpub_default; + +CREATE PUBLICATION testpib_ins_trunct WITH (nopublish delete, nopublish update); + +ALTER PUBLICATION testpub_default WITH (nopublish insert, nopublish delete); + +\dRp + +ALTER PUBLICATION testpub_default WITH (publish insert, publish delete); + +\dRp + +--- adding tables +CREATE SCHEMA pub_test; +CREATE TABLE testpub_tbl1 (id serial primary key, data text); +CREATE TABLE pub_test.testpub_nopk (foo int, bar int); +CREATE VIEW testpub_view AS SELECT 1; + +CREATE PUBLICATION testpub_foralltables FOR ALL TABLES WITH (nopublish delete, nopublish update); +ALTER PUBLICATION testpub_foralltables WITH (publish update); + +CREATE TABLE testpub_tbl2 (id serial primary key, data text); +-- fail - can't add to for all tables publication +ALTER PUBLICATION testpub_foralltables ADD TABLE testpub_tbl2; +-- fail - can't drop from all tables publication +ALTER PUBLICATION testpub_foralltables DROP TABLE testpub_tbl2; +-- fail - can't add to for all tables publication +ALTER PUBLICATION testpub_foralltables SET TABLE pub_test.testpub_nopk; + +SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_foralltables'; +\d+ testpub_tbl2 + +DROP TABLE testpub_tbl2; +DROP PUBLICATION testpub_foralltables; + +-- fail - view +CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; +CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1, pub_test.testpub_nopk; +-- fail - already added +ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_tbl1; +-- fail - already added +CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; + +\dRp+ testpub_fortbl + +-- fail - view +ALTER PUBLICATION testpub_default ADD TABLE testpub_view; + +ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; +ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; +ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk; + +ALTER PUBLICATION testpib_ins_trunct ADD TABLE pub_test.testpub_nopk, testpub_tbl1; + +\d+ pub_test.testpub_nopk +\d+ testpub_tbl1 +\dRp+ testpub_default + +ALTER PUBLICATION testpub_default DROP TABLE testpub_tbl1, pub_test.testpub_nopk; +-- fail - nonexistent +ALTER PUBLICATION testpub_default DROP TABLE pub_test.testpub_nopk; + +\d+ testpub_tbl1 + +DROP VIEW testpub_view; +DROP TABLE testpub_tbl1; + +\dRp+ testpub_default + +DROP PUBLICATION testpub_default; +DROP PUBLICATION testpib_ins_trunct; + +DROP SCHEMA pub_test CASCADE; + +RESET SESSION AUTHORIZATION; +DROP ROLE regress_publication_user; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql new file mode 100644 index 00000000000..68c17d5cfda --- /dev/null +++ b/src/test/regress/sql/subscription.sql @@ -0,0 +1,44 @@ +-- +-- SUBSCRIPTION +-- + +CREATE ROLE regress_subscription_user LOGIN SUPERUSER; +SET SESSION AUTHORIZATION 'regress_subscription_user'; + +-- fail - no publications +CREATE SUBSCRIPTION testsub CONNECTION 'foo'; + +-- fail - no connection +CREATE SUBSCRIPTION testsub PUBLICATION foo; + +set client_min_messages to error; +CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub; +CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT); +reset client_min_messages; + +\dRs+ + +ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3; + +\dRs + +ALTER SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist2'; +ALTER SUBSCRIPTION testsub SET PUBLICATION testpub, testpub1; + +\dRs+ + +BEGIN; +ALTER SUBSCRIPTION testsub ENABLE; + +\dRs + +ALTER SUBSCRIPTION testsub DISABLE; + +\dRs + +COMMIT; + +DROP SUBSCRIPTION testsub NODROP SLOT; + +RESET SESSION AUTHORIZATION; +DROP ROLE regress_subscription_user; diff --git a/src/test/subscription/.gitignore b/src/test/subscription/.gitignore new file mode 100644 index 00000000000..871e943d50e --- /dev/null +++ b/src/test/subscription/.gitignore @@ -0,0 +1,2 @@ +# Generated by test suite +/tmp_check/ diff --git a/src/test/subscription/Makefile b/src/test/subscription/Makefile new file mode 100644 index 00000000000..bb9795453a8 --- /dev/null +++ b/src/test/subscription/Makefile @@ -0,0 +1,22 @@ +#------------------------------------------------------------------------- +# +# Makefile for src/test/subscription +# +# Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group +# Portions Copyright (c) 1994, Regents of the University of California +# +# src/test/subscription/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/test/subscription +top_builddir = ../../.. +include $(top_builddir)/src/Makefile.global + +EXTRA_INSTALL = contrib/hstore + +check: + $(prove_check) + +clean distclean maintainer-clean: + rm -rf tmp_check diff --git a/src/test/subscription/README b/src/test/subscription/README new file mode 100644 index 00000000000..e9e93755b71 --- /dev/null +++ b/src/test/subscription/README @@ -0,0 +1,16 @@ +src/test/subscription/README + +Regression tests for subscription/logical replication +===================================================== + +This directory contains a test suite for subscription/logical replication. + +Running the tests +================= + + make check + +NOTE: This creates a temporary installation, and some tests may +create one or multiple nodes, for the purpose of the tests. + +NOTE: This requires the --enable-tap-tests argument to configure. diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl new file mode 100644 index 00000000000..b51740bcd4d --- /dev/null +++ b/src/test/subscription/t/001_rep_changes.pl @@ -0,0 +1,188 @@ +# Basic logical replication test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 11; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_ins (a int)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_notrep (a int)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_ins (a int)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full (a int)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_ins_only WITH (nopublish delete, nopublish update)"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub, tap_pub_ins_only"); + +# Wait for subscriber to finish initialization +my $caughtup_query = +"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';"; +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep"); +is($result, qq(0), 'check non-replicated table is empty on subscriber'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab_ins WHERE a > 20"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_ins SET a = -a"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab_rep WHERE a > 20"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_rep SET a = -a"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(50|1|50), 'check replicated inserts on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_rep"); +is($result, qq(20|-20|-1), 'check replicated changes on subscriber'); + +# insert some duplicate rows +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full SELECT generate_series(1,10)"); + +# add REPLICA IDENTITY FULL so we can update +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_full REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_full REPLICA IDENTITY FULL"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_ins REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_ins REPLICA IDENTITY FULL"); + +# and do the update +$node_publisher->safe_psql('postgres', + "UPDATE tab_full SET a = a * a"); + +# Wait for subscription to catch up +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full"); +is($result, qq(10|1|100), 'update works with REPLICA IDENTITY FULL and duplicate tuples'); + +# check that change of connection string and/or publication list causes +# restart of subscription workers. Not all of these are registered as tests +# as we need to poll for a change but the test suite will fail none the less +# when something goes wrong. +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONNECTION 'application_name=$appname $publisher_connstr'"); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';") + or die "Timed out while waiting for apply to restart"; + +$oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only"); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';") + or die "Timed out while waiting for apply to restart"; + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT generate_series(1001,1100)"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab_rep"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(150|1|1100), 'check replicated inserts after subscription publication change'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_rep"); +is($result, qq(20|-20|-1), 'check changes skipped after subscription publication change'); + +# check alter publication (relcache invalidation etc) +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only WITH (publish delete)"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab_ins WHERE a > 0"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full VALUES(0)"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# note that data are different on provider and subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(50|1|50), 'check replicated deletes after alter publication'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full"); +is($result, qq(11|0|100), 'check replicated insert after alter publication'); + +# check all the cleanup +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber'); + +$result = + $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl new file mode 100644 index 00000000000..9064eb4c6de --- /dev/null +++ b/src/test/subscription/t/002_types.pl @@ -0,0 +1,539 @@ +# This tests that more complex datatypes are replicated correctly +# by logical replication +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +my $ddl = qq( + CREATE EXTENSION hstore WITH SCHEMA public; + CREATE TABLE public.tst_one_array ( + a INTEGER PRIMARY KEY, + b INTEGER[] + ); + CREATE TABLE public.tst_arrays ( + a INTEGER[] PRIMARY KEY, + b TEXT[], + c FLOAT[], + d INTERVAL[] + ); + + CREATE TYPE public.tst_enum_t AS ENUM ('a', 'b', 'c', 'd', 'e'); + CREATE TABLE public.tst_one_enum ( + a INTEGER PRIMARY KEY, + b public.tst_enum_t + ); + CREATE TABLE public.tst_enums ( + a public.tst_enum_t PRIMARY KEY, + b public.tst_enum_t[] + ); + + CREATE TYPE public.tst_comp_basic_t AS (a FLOAT, b TEXT, c INTEGER); + CREATE TYPE public.tst_comp_enum_t AS (a FLOAT, b public.tst_enum_t, c INTEGER); + CREATE TYPE public.tst_comp_enum_array_t AS (a FLOAT, b public.tst_enum_t[], c INTEGER); + CREATE TABLE public.tst_one_comp ( + a INTEGER PRIMARY KEY, + b public.tst_comp_basic_t + ); + CREATE TABLE public.tst_comps ( + a public.tst_comp_basic_t PRIMARY KEY, + b public.tst_comp_basic_t[] + ); + CREATE TABLE public.tst_comp_enum ( + a INTEGER PRIMARY KEY, + b public.tst_comp_enum_t + ); + CREATE TABLE public.tst_comp_enum_array ( + a public.tst_comp_enum_t PRIMARY KEY, + b public.tst_comp_enum_t[] + ); + CREATE TABLE public.tst_comp_one_enum_array ( + a INTEGER PRIMARY KEY, + b public.tst_comp_enum_array_t + ); + CREATE TABLE public.tst_comp_enum_what ( + a public.tst_comp_enum_array_t PRIMARY KEY, + b public.tst_comp_enum_array_t[] + ); + + CREATE TYPE public.tst_comp_mix_t AS ( + a public.tst_comp_basic_t, + b public.tst_comp_basic_t[], + c public.tst_enum_t, + d public.tst_enum_t[] + ); + CREATE TABLE public.tst_comp_mix_array ( + a public.tst_comp_mix_t PRIMARY KEY, + b public.tst_comp_mix_t[] + ); + CREATE TABLE public.tst_range ( + a INTEGER PRIMARY KEY, + b int4range + ); + CREATE TABLE public.tst_range_array ( + a INTEGER PRIMARY KEY, + b TSTZRANGE, + c int8range[] + ); + CREATE TABLE public.tst_hstore ( + a INTEGER PRIMARY KEY, + b public.hstore + );); + +# Setup structure on both nodes +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (SLOT NAME = tap_sub_slot)"); + +# Wait for subscriber to finish initialization +my $caughtup_query = +"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';"; +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Insert initial test data +$node_publisher->safe_psql('postgres', qq( + -- test_tbl_one_array_col + INSERT INTO tst_one_array (a, b) VALUES + (1, '{1, 2, 3}'), + (2, '{2, 3, 1}'), + (3, '{3, 2, 1}'), + (4, '{4, 3, 2}'), + (5, '{5, NULL, 3}'); + + -- test_tbl_arrays + INSERT INTO tst_arrays (a, b, c, d) VALUES + ('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'), + ('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}'), + ('{3, 1, 2}', '{"c", "a", "b"}', '{3.3, 1.1, 2.2}', '{"3 years", "1 year", "2 years"}'), + ('{4, 1, 2}', '{"d", "a", "b"}', '{4.4, 1.1, 2.2}', '{"4 years", "1 year", "2 years"}'), + ('{5, NULL, NULL}', '{"e", NULL, "b"}', '{5.5, 1.1, NULL}', '{"5 years", NULL, NULL}'); + + -- test_tbl_single_enum + INSERT INTO tst_one_enum (a, b) VALUES + (1, 'a'), + (2, 'b'), + (3, 'c'), + (4, 'd'), + (5, NULL); + + -- test_tbl_enums + INSERT INTO tst_enums (a, b) VALUES + ('a', '{b, c}'), + ('b', '{c, a}'), + ('c', '{b, a}'), + ('d', '{c, b}'), + ('e', '{d, NULL}'); + + -- test_tbl_single_composites + INSERT INTO tst_one_comp (a, b) VALUES + (1, ROW(1.0, 'a', 1)), + (2, ROW(2.0, 'b', 2)), + (3, ROW(3.0, 'c', 3)), + (4, ROW(4.0, 'd', 4)), + (5, ROW(NULL, NULL, 5)); + + -- test_tbl_composites + INSERT INTO tst_comps (a, b) VALUES + (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]), + (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]), + (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_basic_t]), + (ROW(4.0, 'd', 4), ARRAY[ROW(4, 'd', 3)::tst_comp_basic_t]), + (ROW(5.0, 'e', NULL), ARRAY[NULL, ROW(5, NULL, 5)::tst_comp_basic_t]); + + -- test_tbl_composite_with_enums + INSERT INTO tst_comp_enum (a, b) VALUES + (1, ROW(1.0, 'a', 1)), + (2, ROW(2.0, 'b', 2)), + (3, ROW(3.0, 'c', 3)), + (4, ROW(4.0, 'd', 4)), + (5, ROW(NULL, 'e', NULL)); + + -- test_tbl_composite_with_enums_array + INSERT INTO tst_comp_enum_array (a, b) VALUES + (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]), + (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]), + (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_enum_t]), + (ROW(4.0, 'd', 3), ARRAY[ROW(3, 'd', 3)::tst_comp_enum_t]), + (ROW(5.0, 'e', 3), ARRAY[ROW(3, 'e', 3)::tst_comp_enum_t, NULL]); + + -- test_tbl_composite_with_single_enums_array_in_composite + INSERT INTO tst_comp_one_enum_array (a, b) VALUES + (1, ROW(1.0, '{a, b, c}', 1)), + (2, ROW(2.0, '{a, b, c}', 2)), + (3, ROW(3.0, '{a, b, c}', 3)), + (4, ROW(4.0, '{c, b, d}', 4)), + (5, ROW(5.0, '{NULL, e, NULL}', 5)); + + -- test_tbl_composite_with_enums_array_in_composite + INSERT INTO tst_comp_enum_what (a, b) VALUES + (ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]), + (ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]), + (ROW(3.0, '{c, a, b}', 1), ARRAY[ROW(3, '{c, a, b}', 1)::tst_comp_enum_array_t]), + (ROW(4.0, '{c, b, d}', 4), ARRAY[ROW(4, '{c, b, d}', 4)::tst_comp_enum_array_t]), + (ROW(5.0, '{c, NULL, b}', NULL), ARRAY[ROW(5, '{c, e, b}', 1)::tst_comp_enum_array_t]); + + -- test_tbl_mixed_composites + INSERT INTO tst_comp_mix_array (a, b) VALUES + (ROW( + ROW(1,'a',1), + ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t], + 'a', + '{a,b,NULL,c}'), + ARRAY[ + ROW( + ROW(1,'a',1), + ARRAY[ + ROW(1,'a',1)::tst_comp_basic_t, + ROW(2,'b',2)::tst_comp_basic_t, + NULL + ], + 'a', + '{a,b,c}' + )::tst_comp_mix_t + ] + ); + + -- test_tbl_range + INSERT INTO tst_range (a, b) VALUES + (1, '[1, 10]'), + (2, '[2, 20]'), + (3, '[3, 30]'), + (4, '[4, 40]'), + (5, '[5, 50]'); + + -- test_tbl_range_array + INSERT INTO tst_range_array (a, b, c) VALUES + (1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'), + (2, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz - interval '2 days', 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}'), + (3, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz - interval '3 days', 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[3,4]"}'), + (4, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz - interval '4 days', 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[4,5]", NULL, "[40,50]"}'), + (5, NULL, NULL); + + -- tst_hstore + INSERT INTO tst_hstore (a, b) VALUES + (1, '"a"=>"1"'), + (2, '"zzz"=>"foo"'), + (3, '"123"=>"321"'), + (4, '"yellow horse"=>"moaned"'); +)); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Check the data on subscriber +my $result = $node_subscriber->safe_psql('postgres', qq( + SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a; +)); + +is($result, '1|{1,2,3} +2|{2,3,1} +3|{3,2,1} +4|{4,3,2} +5|{5,NULL,3} +{1,2,3}|{a,b,c}|{1.1,2.2,3.3}|{"1 day","2 days","3 days"} +{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00} +{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"} +{4,1,2}|{d,a,b}|{4.4,1.1,2.2}|{"4 years","1 year","2 years"} +{5,NULL,NULL}|{e,NULL,b}|{5.5,1.1,NULL}|{"5 years",NULL,NULL} +1|a +2|b +3|c +4|d +5| +a|{b,c} +b|{c,a} +c|{b,a} +d|{c,b} +e|{d,NULL} +1|(1,a,1) +2|(2,b,2) +3|(3,c,3) +4|(4,d,4) +5|(,,5) +(1,a,1)|{"(1,a,1)"} +(2,b,2)|{"(2,b,2)"} +(3,c,3)|{"(3,c,3)"} +(4,d,4)|{"(4,d,3)"} +(5,e,)|{NULL,"(5,,5)"} +1|(1,a,1) +2|(2,b,2) +3|(3,c,3) +4|(4,d,4) +5|(,e,) +(1,a,1)|{"(1,a,1)"} +(2,b,2)|{"(2,b,2)"} +(3,c,3)|{"(3,c,3)"} +(4,d,3)|{"(3,d,3)"} +(5,e,3)|{"(3,e,3)",NULL} +1|(1,"{a,b,c}",1) +2|(2,"{a,b,c}",2) +3|(3,"{a,b,c}",3) +4|(4,"{c,b,d}",4) +5|(5,"{NULL,e,NULL}",5) +(1,"{a,b,c}",1)|{"(1,\"{a,b,c}\",1)"} +(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"} +(3,"{c,a,b}",1)|{"(3,\"{c,a,b}\",1)"} +(4,"{c,b,d}",4)|{"(4,\"{c,b,d}\",4)"} +(5,"{c,NULL,b}",)|{"(5,\"{c,e,b}\",1)"} +("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"} +1|[1,11) +2|[2,21) +3|[3,31) +4|[4,41) +5|[5,51) +1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"} +2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"} +3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"} +4|["2014-07-31 00:00:00+02","2014-08-04 00:00:00+02")|{"[4,6)",NULL,"[40,51)"} +5|| +1|"a"=>"1" +2|"zzz"=>"foo" +3|"123"=>"321" +4|"yellow horse"=>"moaned"', +'check replicated inserts on subscriber'); + +# Run batch of updates +$node_publisher->safe_psql('postgres', qq( + UPDATE tst_one_array SET b = '{4, 5, 6}' WHERE a = 1; + UPDATE tst_one_array SET b = '{4, 5, 6, 1}' WHERE a > 3; + UPDATE tst_arrays SET b = '{"1a", "2b", "3c"}', c = '{1.0, 2.0, 3.0}', d = '{"1 day 1 second", "2 days 2 seconds", "3 days 3 second"}' WHERE a = '{1, 2, 3}'; + UPDATE tst_arrays SET b = '{"c", "d", "e"}', c = '{3.0, 4.0, 5.0}', d = '{"3 day 1 second", "4 days 2 seconds", "5 days 3 second"}' WHERE a[1] > 3; + UPDATE tst_one_enum SET b = 'c' WHERE a = 1; + UPDATE tst_one_enum SET b = NULL WHERE a > 3; + UPDATE tst_enums SET b = '{e, NULL}' WHERE a = 'a'; + UPDATE tst_enums SET b = '{e, d}' WHERE a > 'c'; + UPDATE tst_one_comp SET b = ROW(1.0, 'A', 1) WHERE a = 1; + UPDATE tst_one_comp SET b = ROW(NULL, 'x', -1) WHERE a > 3; + UPDATE tst_comps SET b = ARRAY[ROW(9, 'x', -1)::tst_comp_basic_t] WHERE (a).a = 1.0; + UPDATE tst_comps SET b = ARRAY[NULL, ROW(9, 'x', NULL)::tst_comp_basic_t] WHERE (a).a > 3.9; + UPDATE tst_comp_enum SET b = ROW(1.0, NULL, NULL) WHERE a = 1; + UPDATE tst_comp_enum SET b = ROW(4.0, 'd', 44) WHERE a > 3; + UPDATE tst_comp_enum_array SET b = ARRAY[NULL, ROW(3, 'd', 3)::tst_comp_enum_t] WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t; + UPDATE tst_comp_enum_array SET b = ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t, ROW(2, 'b', 2)::tst_comp_enum_t] WHERE (a).a > 3; + UPDATE tst_comp_one_enum_array SET b = ROW(1.0, '{a, e, c}', NULL) WHERE a = 1; + UPDATE tst_comp_one_enum_array SET b = ROW(4.0, '{c, b, d}', 4) WHERE a > 3; + UPDATE tst_comp_enum_what SET b = ARRAY[NULL, ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t, ROW(NULL, '{a, e, c}', 2)::tst_comp_enum_array_t] WHERE (a).a = 1; + UPDATE tst_comp_enum_what SET b = ARRAY[ROW(5, '{a, b, c}', 5)::tst_comp_enum_array_t] WHERE (a).a > 3; + UPDATE tst_comp_mix_array SET b[2] = NULL WHERE ((a).a).a = 1; + UPDATE tst_range SET b = '[100, 1000]' WHERE a = 1; + UPDATE tst_range SET b = '(1, 90)' WHERE a > 3; + UPDATE tst_range_array SET c = '{"[100, 1000]"}' WHERE a = 1; + UPDATE tst_range_array SET b = tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), c = '{NULL, "[11,9999999]"}' WHERE a > 3; + UPDATE tst_hstore SET b = '"updated"=>"value"' WHERE a < 3; + UPDATE tst_hstore SET b = '"also"=>"updated"' WHERE a = 3; +)); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Check the data on subscriber +$result = $node_subscriber->safe_psql('postgres', qq( + SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a; +)); + +is($result, '1|{4,5,6} +2|{2,3,1} +3|{3,2,1} +4|{4,5,6,1} +5|{4,5,6,1} +{1,2,3}|{1a,2b,3c}|{1,2,3}|{"1 day 00:00:01","2 days 00:00:02","3 days 00:00:03"} +{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00} +{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"} +{4,1,2}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"} +{5,NULL,NULL}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"} +1|c +2|b +3|c +4| +5| +a|{e,NULL} +b|{c,a} +c|{b,a} +d|{e,d} +e|{e,d} +1|(1,A,1) +2|(2,b,2) +3|(3,c,3) +4|(,x,-1) +5|(,x,-1) +(1,a,1)|{"(9,x,-1)"} +(2,b,2)|{"(2,b,2)"} +(3,c,3)|{"(3,c,3)"} +(4,d,4)|{NULL,"(9,x,)"} +(5,e,)|{NULL,"(9,x,)"} +1|(1,,) +2|(2,b,2) +3|(3,c,3) +4|(4,d,44) +5|(4,d,44) +(1,a,1)|{NULL,"(3,d,3)"} +(2,b,2)|{"(2,b,2)"} +(3,c,3)|{"(3,c,3)"} +(4,d,3)|{"(1,a,1)","(2,b,2)"} +(5,e,3)|{"(1,a,1)","(2,b,2)"} +1|(1,"{a,e,c}",) +2|(2,"{a,b,c}",2) +3|(3,"{a,b,c}",3) +4|(4,"{c,b,d}",4) +5|(4,"{c,b,d}",4) +(1,"{a,b,c}",1)|{NULL,"(1,\"{a,b,c}\",1)","(,\"{a,e,c}\",2)"} +(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"} +(3,"{c,a,b}",1)|{"(3,\"{c,a,b}\",1)"} +(4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"} +(5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"} +("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")",NULL} +1|[100,1001) +2|[2,21) +3|[3,31) +4|[2,90) +5|[2,90) +1|["2014-08-04 00:00:00+02",infinity)|{"[100,1001)"} +2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"} +3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"} +4|["2014-08-04 00:00:00+02",infinity)|{NULL,"[11,10000000)"} +5|["2014-08-04 00:00:00+02",infinity)|{NULL,"[11,10000000)"} +1|"updated"=>"value" +2|"updated"=>"value" +3|"also"=>"updated" +4|"yellow horse"=>"moaned"', +'check replicated updates on subscriber'); + +# Run batch of deletes +$node_publisher->safe_psql('postgres', qq( + DELETE FROM tst_one_array WHERE a = 1; + DELETE FROM tst_one_array WHERE b = '{2, 3, 1}'; + DELETE FROM tst_arrays WHERE a = '{1, 2, 3}'; + DELETE FROM tst_arrays WHERE a[1] = 2; + DELETE FROM tst_one_enum WHERE a = 1; + DELETE FROM tst_one_enum WHERE b = 'b'; + DELETE FROM tst_enums WHERE a = 'a'; + DELETE FROM tst_enums WHERE b[1] = 'b'; + DELETE FROM tst_one_comp WHERE a = 1; + DELETE FROM tst_one_comp WHERE (b).a = 2.0; + DELETE FROM tst_comps WHERE (a).b = 'a'; + DELETE FROM tst_comps WHERE ROW(3, 'c', 3)::tst_comp_basic_t = ANY(b); + DELETE FROM tst_comp_enum WHERE a = 1; + DELETE FROM tst_comp_enum WHERE (b).a = 2.0; + DELETE FROM tst_comp_enum_array WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t; + DELETE FROM tst_comp_enum_array WHERE ROW(3, 'c', 3)::tst_comp_enum_t = ANY(b); + DELETE FROM tst_comp_one_enum_array WHERE a = 1; + DELETE FROM tst_comp_one_enum_array WHERE 'a' = ANY((b).b); + DELETE FROM tst_comp_enum_what WHERE (a).a = 1; + DELETE FROM tst_comp_enum_what WHERE (b[1]).b = '{c, a, b}'; + DELETE FROM tst_comp_mix_array WHERE ((a).a).a = 1; + DELETE FROM tst_range WHERE a = 1; + DELETE FROM tst_range WHERE '[10,20]' && b; + DELETE FROM tst_range_array WHERE a = 1; + DELETE FROM tst_range_array WHERE tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 05 00:00:00 2014 CEST'::timestamptz) && b; + DELETE FROM tst_hstore WHERE a = 1; +)); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Check the data on subscriber +$result = $node_subscriber->safe_psql('postgres', qq( + SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a; +)); + +is($result, '3|{3,2,1} +4|{4,5,6,1} +5|{4,5,6,1} +{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"} +{4,1,2}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"} +{5,NULL,NULL}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"} +3|c +4| +5| +b|{c,a} +d|{e,d} +e|{e,d} +3|(3,c,3) +4|(,x,-1) +5|(,x,-1) +(2,b,2)|{"(2,b,2)"} +(4,d,4)|{NULL,"(9,x,)"} +(5,e,)|{NULL,"(9,x,)"} +3|(3,c,3) +4|(4,d,44) +5|(4,d,44) +(2,b,2)|{"(2,b,2)"} +(4,d,3)|{"(1,a,1)","(2,b,2)"} +(5,e,3)|{"(1,a,1)","(2,b,2)"} +4|(4,"{c,b,d}",4) +5|(4,"{c,b,d}",4) +(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"} +(4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"} +(5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"} +2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"} +3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"} +2|"updated"=>"value" +3|"also"=>"updated" +4|"yellow horse"=>"moaned"', +'check replicated deletes on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); |