aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/worker.c2
-rw-r--r--src/test/perl/PostgresNode.pm87
-rw-r--r--src/test/subscription/t/015_stream.pl26
3 files changed, 112 insertions, 3 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d29c0c5a559..38749393800 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -776,7 +776,7 @@ apply_handle_stream_start(StringInfo s)
hash_ctl.entrysize = sizeof(StreamXidHash);
hash_ctl.hcxt = ApplyContext;
xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
- HASH_ELEM | HASH_CONTEXT);
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
}
/* open the spool file for this transaction */
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 014f0fcda6c..9667f7667ec 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1565,6 +1565,93 @@ sub psql
=pod
+=item $node->background_psql($dbname, \$stdin, \$stdout, $timer, %params) => harness
+
+Invoke B<psql> on B<$dbname> and return an IPC::Run harness object, which the
+caller may use to send input to B<psql>. The process's stdin is sourced from
+the $stdin scalar reference, and its stdout and stderr go to the $stdout
+scalar reference. This allows the caller to act on other parts of the system
+while idling this backend.
+
+The specified timer object is attached to the harness, as well. It's caller's
+responsibility to select the timeout length, and to restart the timer after
+each command if the timeout is per-command.
+
+psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc>
+disabled. That may be overridden by passing extra psql parameters.
+
+Dies on failure to invoke psql, or if psql fails to connect. Errors occurring
+later are the caller's problem. psql runs with on_error_stop by default so
+that it will stop running sql and return 3 if passed SQL results in an error.
+
+Be sure to "finish" the harness when done with it.
+
+=over
+
+=item on_error_stop => 1
+
+By default, the B<psql> method invokes the B<psql> program with ON_ERROR_STOP=1
+set, so SQL execution is stopped at the first error and exit code 3 is
+returned. Set B<on_error_stop> to 0 to ignore errors instead.
+
+=item replication => B<value>
+
+If set, add B<replication=value> to the conninfo string.
+Passing the literal value C<database> results in a logical replication
+connection.
+
+=item extra_params => ['--single-transaction']
+
+If given, it must be an array reference containing additional parameters to B<psql>.
+
+=back
+
+=cut
+
+sub background_psql
+{
+ my ($self, $dbname, $stdin, $stdout, $timer, %params) = @_;
+
+ my $replication = $params{replication};
+
+ my @psql_params = (
+ 'psql',
+ '-XAtq',
+ '-d',
+ $self->connstr($dbname)
+ . (defined $replication ? " replication=$replication" : ""),
+ '-f',
+ '-');
+
+ $params{on_error_stop} = 1 unless defined $params{on_error_stop};
+
+ push @psql_params, '-v', 'ON_ERROR_STOP=1' if $params{on_error_stop};
+ push @psql_params, @{ $params{extra_params} }
+ if defined $params{extra_params};
+
+ # Ensure there is no data waiting to be sent:
+ $$stdin = "" if ref($stdin);
+ # IPC::Run would otherwise append to existing contents:
+ $$stdout = "" if ref($stdout);
+
+ my $harness = IPC::Run::start \@psql_params,
+ '<', $stdin, '>', $stdout, $timer;
+
+ # Request some output, and pump until we see it. This means that psql
+ # connection failures are caught here, relieving callers of the need to
+ # handle those. (Right now, we have no particularly good handling for
+ # errors anyway, but that might be added later.)
+ my $banner = "background_psql: ready";
+ $$stdin = "\\echo $banner\n";
+ pump $harness until $$stdout =~ /$banner/ || $timer->is_expired;
+
+ die "psql startup timed out" if $timer->is_expired;
+
+ return $harness;
+}
+
+=pod
+
=item $node->interactive_psql($dbname, \$stdin, \$stdout, $timer, %params) => harness
Invoke B<psql> on B<$dbname> and return an IPC::Run harness object,
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
index fffe0019650..1b0e6fb9fb6 100644
--- a/src/test/subscription/t/015_stream.pl
+++ b/src/test/subscription/t/015_stream.pl
@@ -46,15 +46,37 @@ my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
-# Insert, update and delete enough rows to exceed the 64kB limit.
-$node_publisher->safe_psql('postgres', q{
+# Interleave a pair of transactions, each exceeding the 64kB limit.
+my $in = '';
+my $out = '';
+
+my $timer = IPC::Run::timeout(180);
+
+my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer,
+ on_error_stop => 0);
+
+$in .= q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
+};
+$h->pump_nb;
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i);
+DELETE FROM test_tab WHERE a > 5000;
COMMIT;
});
+$in .= q{
+COMMIT;
+\q
+};
+$h->finish; # errors make the next test fail, so ignore them here
+
$node_publisher->wait_for_catchup($appname);
$result =