aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/applyparallelworker.c16
-rw-r--r--src/backend/utils/misc/guc_tables.c6
-rw-r--r--src/test/subscription/t/015_stream.pl28
-rw-r--r--src/test/subscription/t/018_stream_subxact_abort.pl61
-rw-r--r--src/test/subscription/t/023_twophase_stream.pl46
5 files changed, 148 insertions, 9 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 3579e704fe5..e670ec617a4 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -1149,6 +1149,13 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
Assert(!IsTransactionState());
Assert(!winfo->serialize_changes);
+ /*
+ * We don't try to send data to parallel worker for 'immediate' mode. This
+ * is primarily used for testing purposes.
+ */
+ if (unlikely(logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE))
+ return false;
+
/*
* This timeout is a bit arbitrary but testing revealed that it is sufficient
* to send the message unless the parallel apply worker is waiting on some
@@ -1187,12 +1194,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
startTime = GetCurrentTimestamp();
else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
SHM_SEND_TIMEOUT_MS))
- {
- ereport(LOG,
- (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
- winfo->shared->xid)));
return false;
- }
}
}
@@ -1206,6 +1208,10 @@ void
pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
bool stream_locked)
{
+ ereport(LOG,
+ (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
+ winfo->shared->xid)));
+
/*
* The parallel apply worker could be stuck for some reason (say waiting
* on some lock by other backend), so stop trying to send data directly to
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index c5a95f5dcca..b46e3b8c558 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -4920,8 +4920,10 @@ struct config_enum ConfigureNamesEnum[] =
{
{"logical_replication_mode", PGC_USERSET, DEVELOPER_OPTIONS,
- gettext_noop("Controls when to replicate each change."),
- gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding."),
+ gettext_noop("Controls when to replicate or apply each change."),
+ gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding. "
+ "On the subscriber, it allows serialization of all changes to files and notifies the "
+ "parallel apply workers to read and apply them at the end of the transaction."),
GUC_NOT_IN_SAMPLE
},
&logical_replication_mode,
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
index 91e8aa8c0a5..0e0f27f14df 100644
--- a/src/test/subscription/t/015_stream.pl
+++ b/src/test/subscription/t/015_stream.pl
@@ -312,6 +312,34 @@ $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
is($result, qq(10000), 'data replicated to subscriber after dropping index');
+# Test serializing changes to files and notify the parallel apply worker to
+# apply them at the end of the transaction.
+$node_subscriber->append_conf('postgresql.conf',
+ 'logical_replication_mode = immediate');
+# Reset the log_min_messages to default.
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)");
+
+# Ensure that the changes are serialized.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+ $offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(15000), 'parallel apply worker replayed all changes from file');
+
$node_subscriber->stop;
$node_publisher->stop;
diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl
index 814daf4d2f9..2b67ae1e0ac 100644
--- a/src/test/subscription/t/018_stream_subxact_abort.pl
+++ b/src/test/subscription/t/018_stream_subxact_abort.pl
@@ -143,15 +143,17 @@ $node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2");
my $appname = 'tap_sub';
@@ -198,6 +200,63 @@ $node_subscriber->safe_psql('postgres', q{SELECT 1});
test_streaming($node_publisher, $node_subscriber, $appname, 1);
+# Test serializing changes to files and notify the parallel apply worker to
+# apply them at the end of the transaction.
+$node_subscriber->append_conf('postgresql.conf',
+ 'logical_replication_mode = immediate');
+# Reset the log_min_messages to default.
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+my $offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab_2 values(1);
+ ROLLBACK;
+ });
+
+# Ensure that the changes are serialized.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+ $offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is aborted on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(0), 'check rollback was reflected on subscriber');
+
+# Serialize the ABORT sub-transaction.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab_2 values(1);
+ SAVEPOINT sp;
+ INSERT INTO test_tab_2 values(1);
+ ROLLBACK TO sp;
+ COMMIT;
+ });
+
+# Ensure that the changes are serialized.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+ $offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that only sub-transaction is aborted on subscriber.
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(1), 'check rollback to savepoint was reflected on subscriber');
+
$node_subscriber->stop;
$node_publisher->stop;
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
index 497245a209c..1cc871fddbd 100644
--- a/src/test/subscription/t/023_twophase_stream.pl
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -319,16 +319,18 @@ $node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
);
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup logical replication (streaming = on)
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2");
my $appname = 'tap_sub';
@@ -384,6 +386,48 @@ $node_subscriber->safe_psql('postgres', q{SELECT 1});
test_streaming($node_publisher, $node_subscriber, $appname, 1);
+# Test serializing changes to files and notify the parallel apply worker to
+# apply them at the end of the transaction.
+$node_subscriber->append_conf('postgresql.conf',
+ 'logical_replication_mode = immediate');
+# Reset the log_min_messages to default.
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+my $offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab_2 values(1);
+ PREPARE TRANSACTION 'xact';
+ });
+
+# Ensure that the changes are serialized.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+ $offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Check that 2PC gets committed on subscriber
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'xact';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(1), 'transaction is committed on subscriber');
+
###############################
# check all the cleanup
###############################