diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/applyparallelworker.c | 16 | ||||
-rw-r--r-- | src/backend/utils/misc/guc_tables.c | 6 | ||||
-rw-r--r-- | src/test/subscription/t/015_stream.pl | 28 | ||||
-rw-r--r-- | src/test/subscription/t/018_stream_subxact_abort.pl | 61 | ||||
-rw-r--r-- | src/test/subscription/t/023_twophase_stream.pl | 46 |
5 files changed, 148 insertions, 9 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 3579e704fe5..e670ec617a4 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -1149,6 +1149,13 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) Assert(!IsTransactionState()); Assert(!winfo->serialize_changes); + /* + * We don't try to send data to parallel worker for 'immediate' mode. This + * is primarily used for testing purposes. + */ + if (unlikely(logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE)) + return false; + /* * This timeout is a bit arbitrary but testing revealed that it is sufficient * to send the message unless the parallel apply worker is waiting on some @@ -1187,12 +1194,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) startTime = GetCurrentTimestamp(); else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(), SHM_SEND_TIMEOUT_MS)) - { - ereport(LOG, - (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file", - winfo->shared->xid))); return false; - } } } @@ -1206,6 +1208,10 @@ void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked) { + ereport(LOG, + (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file", + winfo->shared->xid))); + /* * The parallel apply worker could be stuck for some reason (say waiting * on some lock by other backend), so stop trying to send data directly to diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index c5a95f5dcca..b46e3b8c558 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4920,8 +4920,10 @@ struct config_enum ConfigureNamesEnum[] = { {"logical_replication_mode", PGC_USERSET, DEVELOPER_OPTIONS, - gettext_noop("Controls when to replicate each change."), - gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding."), + gettext_noop("Controls when to replicate or apply each change."), + gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding. " + "On the subscriber, it allows serialization of all changes to files and notifies the " + "parallel apply workers to read and apply them at the end of the transaction."), GUC_NOT_IN_SAMPLE }, &logical_replication_mode, diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index 91e8aa8c0a5..0e0f27f14df 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -312,6 +312,34 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); is($result, qq(10000), 'data replicated to subscriber after dropping index'); +# Test serializing changes to files and notify the parallel apply worker to +# apply them at the end of the transaction. +$node_subscriber->append_conf('postgresql.conf', + 'logical_replication_mode = immediate'); +# Reset the log_min_messages to default. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)"); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(15000), 'parallel apply worker replayed all changes from file'); + $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl index 814daf4d2f9..2b67ae1e0ac 100644 --- a/src/test/subscription/t/018_stream_subxact_abort.pl +++ b/src/test/subscription/t/018_stream_subxact_abort.pl @@ -143,15 +143,17 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b varchar)"); $node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup structure on subscriber $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup logical replication my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2"); my $appname = 'tap_sub'; @@ -198,6 +200,63 @@ $node_subscriber->safe_psql('postgres', q{SELECT 1}); test_streaming($node_publisher, $node_subscriber, $appname, 1); +# Test serializing changes to files and notify the parallel apply worker to +# apply them at the end of the transaction. +$node_subscriber->append_conf('postgresql.conf', + 'logical_replication_mode = immediate'); +# Reset the log_min_messages to default. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +my $offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(1); + ROLLBACK; + }); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is aborted on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(0), 'check rollback was reflected on subscriber'); + +# Serialize the ABORT sub-transaction. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(1); + SAVEPOINT sp; + INSERT INTO test_tab_2 values(1); + ROLLBACK TO sp; + COMMIT; + }); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that only sub-transaction is aborted on subscriber. +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(1), 'check rollback to savepoint was reflected on subscriber'); + $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl index 497245a209c..1cc871fddbd 100644 --- a/src/test/subscription/t/023_twophase_stream.pl +++ b/src/test/subscription/t/023_twophase_stream.pl @@ -319,16 +319,18 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b varchar)"); $node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup structure on subscriber (columns a and b are compatible with same table name on publisher) $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" ); +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup logical replication (streaming = on) my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2"); my $appname = 'tap_sub'; @@ -384,6 +386,48 @@ $node_subscriber->safe_psql('postgres', q{SELECT 1}); test_streaming($node_publisher, $node_subscriber, $appname, 1); +# Test serializing changes to files and notify the parallel apply worker to +# apply them at the end of the transaction. +$node_subscriber->append_conf('postgresql.conf', + 'logical_replication_mode = immediate'); +# Reset the log_min_messages to default. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +my $offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(1); + PREPARE TRANSACTION 'xact'; + }); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# Check that 2PC gets committed on subscriber +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'xact';"); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(1), 'transaction is committed on subscriber'); + ############################### # check all the cleanup ############################### |