diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 17 | ||||
-rw-r--r-- | src/test/subscription/t/014_binary.pl | 180 |
2 files changed, 186 insertions, 11 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 07eea504ba8..fb6d5474d00 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -101,6 +101,7 @@ #include "catalog/pg_type.h" #include "commands/copy.h" #include "miscadmin.h" +#include "nodes/makefuncs.h" #include "parser/parse_relation.h" #include "pgstat.h" #include "replication/logicallauncher.h" @@ -1090,6 +1091,7 @@ copy_table(Relation rel) CopyFromState cstate; List *attnamelist; ParseState *pstate; + List *options = NIL; /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), @@ -1168,6 +1170,19 @@ copy_table(Relation rel) appendStringInfoString(&cmd, ") TO STDOUT"); } + + /* + * Prior to v16, initial table synchronization will use text format even + * if the binary option is enabled for a subscription. + */ + if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 && + MySubscription->binary) + { + appendStringInfoString(&cmd, " WITH (FORMAT binary)"); + options = list_make1(makeDefElem("format", + (Node *) makeString("binary"), -1)); + } + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) @@ -1184,7 +1199,7 @@ copy_table(Relation rel) NULL, false, false); attnamelist = make_copy_attnamelist(relmapentry); - cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL); + cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options); /* Do the copy */ (void) CopyFrom(cstate); diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl index e53e23da3ee..feefbe734e4 100644 --- a/src/test/subscription/t/014_binary.pl +++ b/src/test/subscription/t/014_binary.pl @@ -40,35 +40,72 @@ $node_subscriber->safe_psql('postgres', $ddl); $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tpub FOR ALL TABLES"); +# ------------------------------------------------------ +# Ensure binary mode also executes COPY in binary format +# ------------------------------------------------------ + +# Insert some content before creating a subscription +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO public.test_numerical (a, b, c, d) VALUES + (1, 1.2, 1.3, 10), + (2, 2.2, 2.3, 20); + INSERT INTO public.test_arrays (a, b, c) VALUES + ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'), + ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}'); + )); + my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres'; $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' " . "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)"); +# Ensure the COPY command is executed in binary format on the publisher +$node_publisher->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT WITH \(FORMAT binary\)/ +); + # Ensure nodes are in sync with each other $node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub'); +my $sync_check = qq( + SELECT a, b, c, d FROM test_numerical ORDER BY a; + SELECT a, b, c FROM test_arrays ORDER BY a; +); + +# Check the synced data on the subscriber +my $result = $node_subscriber->safe_psql('postgres', $sync_check); + +is( $result, '1|1.2|1.3|10 +2|2.2|2.3|20 +{1,2,3}|{1.1,1.2,1.3}|{one,two,three} +{3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check synced data on subscriber'); + +# ---------------------------------- +# Ensure apply works in binary mode +# ---------------------------------- + # Insert some content and make sure it's replicated across $node_publisher->safe_psql( 'postgres', qq( INSERT INTO public.test_arrays (a, b, c) VALUES - ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'), - ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}'); + ('{2,1,3}', '{1.2, 1.1, 1.3}', '{"two", "one", "three"}'), + ('{1,3,2}', '{1.1, 1.3, 1.2}', '{"one", "three", "two"}'); INSERT INTO public.test_numerical (a, b, c, d) VALUES - (1, 1.2, 1.3, 10), - (2, 2.2, 2.3, 20), - (3, 3.2, 3.3, 30); + (3, 3.2, 3.3, 30), + (4, 4.2, 4.3, 40); )); $node_publisher->wait_for_catchup('tsub'); -my $result = $node_subscriber->safe_psql('postgres', +$result = $node_subscriber->safe_psql('postgres', "SELECT a, b, c, d FROM test_numerical ORDER BY a"); is( $result, '1|1.2|1.3|10 2|2.2|2.3|20 -3|3.2|3.3|30', 'check replicated data on subscriber'); +3|3.2|3.3|30 +4|4.2|4.3|40', 'check replicated data on subscriber'); # Test updates as well $node_publisher->safe_psql( @@ -83,6 +120,8 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT a, b, c FROM test_arrays ORDER BY a"); is( $result, '{1,2,3}|{42,1.2,1.3}| +{1,3,2}|{42,1.3,1.2}| +{2,1,3}|{42,1.1,1.3}| {3,1,2}|{42,1.1,1.2}|', 'check updated replicated data on subscriber'); $result = $node_subscriber->safe_psql('postgres', @@ -90,7 +129,12 @@ $result = $node_subscriber->safe_psql('postgres', is( $result, '1|42||10 2|42||20 -3|42||30', 'check updated replicated data on subscriber'); +3|42||30 +4|42||40', 'check updated replicated data on subscriber'); + +# ------------------------------------------------------------------------------ +# Use ALTER SUBSCRIPTION to change to text format and then back to binary format +# ------------------------------------------------------------------------------ # Test to reset back to text formatting, and then to binary again $node_subscriber->safe_psql('postgres', @@ -99,7 +143,7 @@ $node_subscriber->safe_psql('postgres', $node_publisher->safe_psql( 'postgres', qq( INSERT INTO public.test_numerical (a, b, c, d) VALUES - (4, 4.2, 4.3, 40); + (5, 5.2, 5.3, 50); )); $node_publisher->wait_for_catchup('tsub'); @@ -110,7 +154,8 @@ $result = $node_subscriber->safe_psql('postgres', is( $result, '1|42||10 2|42||20 3|42||30 -4|4.2|4.3|40', 'check replicated data on subscriber'); +4|42||40 +5|5.2|5.3|50', 'check replicated data on subscriber'); $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tsub SET (binary = true);"); @@ -127,9 +172,124 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT a, b, c FROM test_arrays ORDER BY a"); is( $result, '{1,2,3}|{42,1.2,1.3}| +{1,3,2}|{42,1.3,1.2}| +{2,1,3}|{42,1.1,1.3}| {2,3,1}|{1.2,1.3,1.1}|{two,three,one} {3,1,2}|{42,1.1,1.2}|', 'check replicated data on subscriber'); +# --------------------------------------------------------------- +# Test binary replication without and with send/receive functions +# --------------------------------------------------------------- + +# Create a custom type without send/rcv functions +$ddl = qq( + CREATE TYPE myvarchar; + CREATE FUNCTION myvarcharin(cstring, oid, integer) RETURNS myvarchar + LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharin'; + CREATE FUNCTION myvarcharout(myvarchar) RETURNS cstring + LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharout'; + CREATE TYPE myvarchar ( + input = myvarcharin, + output = myvarcharout); + CREATE TABLE public.test_myvarchar ( + a myvarchar + );); + +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +# Insert some initial data +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO public.test_myvarchar (a) VALUES + ('a'); + )); + +# Check the subscriber log from now on. +my $offset = -s $node_subscriber->logfile; + +# Refresh the publication to trigger the tablesync +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tsub REFRESH PUBLICATION"); + +# It should fail +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? no binary input function available for type/, + $offset); + +# Create and set send/rcv functions for the custom type +$ddl = qq( + CREATE FUNCTION myvarcharsend(myvarchar) RETURNS bytea + LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharsend'; + CREATE FUNCTION myvarcharrecv(internal, oid, integer) RETURNS myvarchar + LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharrecv'; + ALTER TYPE myvarchar SET ( + send = myvarcharsend, + receive = myvarcharrecv + );); + +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +# Now tablesync should succeed +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub'); + +# Check the synced data on the subscriber +$result = + $node_subscriber->safe_psql('postgres', 'SELECT a FROM test_myvarchar;'); + +is($result, 'a', 'check synced data on subscriber with custom type'); + +# ----------------------------------------------------- +# Test mismatched column types with/without binary mode +# ----------------------------------------------------- + +# Test syncing tables with mismatching column types +$node_publisher->safe_psql( + 'postgres', qq( + CREATE TABLE public.test_mismatching_types ( + a bigint PRIMARY KEY + ); + INSERT INTO public.test_mismatching_types (a) + VALUES (1), (2); + )); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE TABLE public.test_mismatching_types ( + a int PRIMARY KEY + ); + ALTER SUBSCRIPTION tsub REFRESH PUBLICATION; + )); + +# Cannot sync due to type mismatch +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? incorrect binary data format/, $offset); + +# Check the publisher log from now on. +$offset = -s $node_publisher->logfile; + +# Setting binary to false should allow syncing +$node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION tsub SET (binary = false);)); + +# Ensure the COPY command is executed in text format on the publisher +$node_publisher->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT\n/, $offset); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub'); + +# Check the synced data on the subscriber +$result = $node_subscriber->safe_psql('postgres', + 'SELECT a FROM test_mismatching_types ORDER BY a;'); + +is( $result, '1 +2', 'check synced data on subscriber with binary = false'); + $node_subscriber->stop('fast'); $node_publisher->stop('fast'); |