aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/tablesync.c17
-rw-r--r--src/test/subscription/t/014_binary.pl180
2 files changed, 186 insertions, 11 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 07eea504ba8..fb6d5474d00 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -101,6 +101,7 @@
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "miscadmin.h"
+#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
@@ -1090,6 +1091,7 @@ copy_table(Relation rel)
CopyFromState cstate;
List *attnamelist;
ParseState *pstate;
+ List *options = NIL;
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
@@ -1168,6 +1170,19 @@ copy_table(Relation rel)
appendStringInfoString(&cmd, ") TO STDOUT");
}
+
+ /*
+ * Prior to v16, initial table synchronization will use text format even
+ * if the binary option is enabled for a subscription.
+ */
+ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
+ MySubscription->binary)
+ {
+ appendStringInfoString(&cmd, " WITH (FORMAT binary)");
+ options = list_make1(makeDefElem("format",
+ (Node *) makeString("binary"), -1));
+ }
+
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT)
@@ -1184,7 +1199,7 @@ copy_table(Relation rel)
NULL, false, false);
attnamelist = make_copy_attnamelist(relmapentry);
- cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
+ cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
/* Do the copy */
(void) CopyFrom(cstate);
diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl
index e53e23da3ee..feefbe734e4 100644
--- a/src/test/subscription/t/014_binary.pl
+++ b/src/test/subscription/t/014_binary.pl
@@ -40,35 +40,72 @@ $node_subscriber->safe_psql('postgres', $ddl);
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tpub FOR ALL TABLES");
+# ------------------------------------------------------
+# Ensure binary mode also executes COPY in binary format
+# ------------------------------------------------------
+
+# Insert some content before creating a subscription
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ INSERT INTO public.test_numerical (a, b, c, d) VALUES
+ (1, 1.2, 1.3, 10),
+ (2, 2.2, 2.3, 20);
+ INSERT INTO public.test_arrays (a, b, c) VALUES
+ ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
+ ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
+ ));
+
my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres';
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' "
. "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
+# Ensure the COPY command is executed in binary format on the publisher
+$node_publisher->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT WITH \(FORMAT binary\)/
+);
+
# Ensure nodes are in sync with each other
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
+my $sync_check = qq(
+ SELECT a, b, c, d FROM test_numerical ORDER BY a;
+ SELECT a, b, c FROM test_arrays ORDER BY a;
+);
+
+# Check the synced data on the subscriber
+my $result = $node_subscriber->safe_psql('postgres', $sync_check);
+
+is( $result, '1|1.2|1.3|10
+2|2.2|2.3|20
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}
+{3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check synced data on subscriber');
+
+# ----------------------------------
+# Ensure apply works in binary mode
+# ----------------------------------
+
# Insert some content and make sure it's replicated across
$node_publisher->safe_psql(
'postgres', qq(
INSERT INTO public.test_arrays (a, b, c) VALUES
- ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
- ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
+ ('{2,1,3}', '{1.2, 1.1, 1.3}', '{"two", "one", "three"}'),
+ ('{1,3,2}', '{1.1, 1.3, 1.2}', '{"one", "three", "two"}');
INSERT INTO public.test_numerical (a, b, c, d) VALUES
- (1, 1.2, 1.3, 10),
- (2, 2.2, 2.3, 20),
- (3, 3.2, 3.3, 30);
+ (3, 3.2, 3.3, 30),
+ (4, 4.2, 4.3, 40);
));
$node_publisher->wait_for_catchup('tsub');
-my $result = $node_subscriber->safe_psql('postgres',
+$result = $node_subscriber->safe_psql('postgres',
"SELECT a, b, c, d FROM test_numerical ORDER BY a");
is( $result, '1|1.2|1.3|10
2|2.2|2.3|20
-3|3.2|3.3|30', 'check replicated data on subscriber');
+3|3.2|3.3|30
+4|4.2|4.3|40', 'check replicated data on subscriber');
# Test updates as well
$node_publisher->safe_psql(
@@ -83,6 +120,8 @@ $result = $node_subscriber->safe_psql('postgres',
"SELECT a, b, c FROM test_arrays ORDER BY a");
is( $result, '{1,2,3}|{42,1.2,1.3}|
+{1,3,2}|{42,1.3,1.2}|
+{2,1,3}|{42,1.1,1.3}|
{3,1,2}|{42,1.1,1.2}|', 'check updated replicated data on subscriber');
$result = $node_subscriber->safe_psql('postgres',
@@ -90,7 +129,12 @@ $result = $node_subscriber->safe_psql('postgres',
is( $result, '1|42||10
2|42||20
-3|42||30', 'check updated replicated data on subscriber');
+3|42||30
+4|42||40', 'check updated replicated data on subscriber');
+
+# ------------------------------------------------------------------------------
+# Use ALTER SUBSCRIPTION to change to text format and then back to binary format
+# ------------------------------------------------------------------------------
# Test to reset back to text formatting, and then to binary again
$node_subscriber->safe_psql('postgres',
@@ -99,7 +143,7 @@ $node_subscriber->safe_psql('postgres',
$node_publisher->safe_psql(
'postgres', qq(
INSERT INTO public.test_numerical (a, b, c, d) VALUES
- (4, 4.2, 4.3, 40);
+ (5, 5.2, 5.3, 50);
));
$node_publisher->wait_for_catchup('tsub');
@@ -110,7 +154,8 @@ $result = $node_subscriber->safe_psql('postgres',
is( $result, '1|42||10
2|42||20
3|42||30
-4|4.2|4.3|40', 'check replicated data on subscriber');
+4|42||40
+5|5.2|5.3|50', 'check replicated data on subscriber');
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tsub SET (binary = true);");
@@ -127,9 +172,124 @@ $result = $node_subscriber->safe_psql('postgres',
"SELECT a, b, c FROM test_arrays ORDER BY a");
is( $result, '{1,2,3}|{42,1.2,1.3}|
+{1,3,2}|{42,1.3,1.2}|
+{2,1,3}|{42,1.1,1.3}|
{2,3,1}|{1.2,1.3,1.1}|{two,three,one}
{3,1,2}|{42,1.1,1.2}|', 'check replicated data on subscriber');
+# ---------------------------------------------------------------
+# Test binary replication without and with send/receive functions
+# ---------------------------------------------------------------
+
+# Create a custom type without send/rcv functions
+$ddl = qq(
+ CREATE TYPE myvarchar;
+ CREATE FUNCTION myvarcharin(cstring, oid, integer) RETURNS myvarchar
+ LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharin';
+ CREATE FUNCTION myvarcharout(myvarchar) RETURNS cstring
+ LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharout';
+ CREATE TYPE myvarchar (
+ input = myvarcharin,
+ output = myvarcharout);
+ CREATE TABLE public.test_myvarchar (
+ a myvarchar
+ ););
+
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Insert some initial data
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ INSERT INTO public.test_myvarchar (a) VALUES
+ ('a');
+ ));
+
+# Check the subscriber log from now on.
+my $offset = -s $node_subscriber->logfile;
+
+# Refresh the publication to trigger the tablesync
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tsub REFRESH PUBLICATION");
+
+# It should fail
+$node_subscriber->wait_for_log(
+ qr/ERROR: ( [A-Z0-9]+:)? no binary input function available for type/,
+ $offset);
+
+# Create and set send/rcv functions for the custom type
+$ddl = qq(
+ CREATE FUNCTION myvarcharsend(myvarchar) RETURNS bytea
+ LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharsend';
+ CREATE FUNCTION myvarcharrecv(internal, oid, integer) RETURNS myvarchar
+ LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharrecv';
+ ALTER TYPE myvarchar SET (
+ send = myvarcharsend,
+ receive = myvarcharrecv
+ ););
+
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Now tablesync should succeed
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
+
+# Check the synced data on the subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', 'SELECT a FROM test_myvarchar;');
+
+is($result, 'a', 'check synced data on subscriber with custom type');
+
+# -----------------------------------------------------
+# Test mismatched column types with/without binary mode
+# -----------------------------------------------------
+
+# Test syncing tables with mismatching column types
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ CREATE TABLE public.test_mismatching_types (
+ a bigint PRIMARY KEY
+ );
+ INSERT INTO public.test_mismatching_types (a)
+ VALUES (1), (2);
+ ));
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+ CREATE TABLE public.test_mismatching_types (
+ a int PRIMARY KEY
+ );
+ ALTER SUBSCRIPTION tsub REFRESH PUBLICATION;
+ ));
+
+# Cannot sync due to type mismatch
+$node_subscriber->wait_for_log(
+ qr/ERROR: ( [A-Z0-9]+:)? incorrect binary data format/, $offset);
+
+# Check the publisher log from now on.
+$offset = -s $node_publisher->logfile;
+
+# Setting binary to false should allow syncing
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+ ALTER SUBSCRIPTION tsub SET (binary = false);));
+
+# Ensure the COPY command is executed in text format on the publisher
+$node_publisher->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT\n/, $offset);
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
+
+# Check the synced data on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ 'SELECT a FROM test_mismatching_types ORDER BY a;');
+
+is( $result, '1
+2', 'check synced data on subscriber with binary = false');
+
$node_subscriber->stop('fast');
$node_publisher->stop('fast');