diff options
-rw-r--r-- | doc/src/sgml/config.sgml | 35 | ||||
-rw-r--r-- | src/backend/replication/logical/applyparallelworker.c | 16 | ||||
-rw-r--r-- | src/backend/utils/misc/guc_tables.c | 6 | ||||
-rw-r--r-- | src/test/subscription/t/015_stream.pl | 28 | ||||
-rw-r--r-- | src/test/subscription/t/018_stream_subxact_abort.pl | 61 | ||||
-rw-r--r-- | src/test/subscription/t/023_twophase_stream.pl | 46 |
6 files changed, 172 insertions, 20 deletions
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 1cf53c74ea6..186edaffd5a 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -11701,22 +11701,35 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1) </term> <listitem> <para> - Allows streaming or serializing changes immediately in logical decoding. - The allowed values of <varname>logical_replication_mode</varname> are - <literal>buffered</literal> and <literal>immediate</literal>. When set - to <literal>immediate</literal>, stream each change if + The allowed values are <literal>buffered</literal> and + <literal>immediate</literal>. The default is <literal>buffered</literal>. + This parameter is intended to be used to test logical decoding and + replication of large transactions. The effect of + <varname>logical_replication_mode</varname> is different for the + publisher and subscriber: + </para> + + <para> + On the publisher side, <varname>logical_replication_mode</varname> + allows streaming or serializing changes immediately in logical decoding. + When set to <literal>immediate</literal>, stream each change if the <literal>streaming</literal> option (see optional parameters set by <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>) is enabled, otherwise, serialize each change. When set to - <literal>buffered</literal>, which is the default, decoding will stream - or serialize changes when <varname>logical_decoding_work_mem</varname> - is reached. + <literal>buffered</literal>, the decoding will stream or serialize + changes when <varname>logical_decoding_work_mem</varname> is reached. </para> + <para> - This parameter is intended to be used to test logical decoding and - replication of large transactions for which otherwise we need to - generate the changes till <varname>logical_decoding_work_mem</varname> - is reached. + On the subscriber side, if the <literal>streaming</literal> option is set to + <literal>parallel</literal>, <varname>logical_replication_mode</varname> + can be used to direct the leader apply worker to send changes to the + shared memory queue or to serialize all changes to the file. When set to + <literal>buffered</literal>, the leader sends changes to parallel apply + workers via a shared memory queue. When set to + <literal>immediate</literal>, the leader serializes all changes to files + and notifies the parallel apply workers to read and apply them at the + end of the transaction. </para> </listitem> </varlistentry> diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 3579e704fe5..e670ec617a4 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -1149,6 +1149,13 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) Assert(!IsTransactionState()); Assert(!winfo->serialize_changes); + /* + * We don't try to send data to parallel worker for 'immediate' mode. This + * is primarily used for testing purposes. + */ + if (unlikely(logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE)) + return false; + /* * This timeout is a bit arbitrary but testing revealed that it is sufficient * to send the message unless the parallel apply worker is waiting on some @@ -1187,12 +1194,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) startTime = GetCurrentTimestamp(); else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(), SHM_SEND_TIMEOUT_MS)) - { - ereport(LOG, - (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file", - winfo->shared->xid))); return false; - } } } @@ -1206,6 +1208,10 @@ void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked) { + ereport(LOG, + (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file", + winfo->shared->xid))); + /* * The parallel apply worker could be stuck for some reason (say waiting * on some lock by other backend), so stop trying to send data directly to diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index c5a95f5dcca..b46e3b8c558 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4920,8 +4920,10 @@ struct config_enum ConfigureNamesEnum[] = { {"logical_replication_mode", PGC_USERSET, DEVELOPER_OPTIONS, - gettext_noop("Controls when to replicate each change."), - gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding."), + gettext_noop("Controls when to replicate or apply each change."), + gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding. " + "On the subscriber, it allows serialization of all changes to files and notifies the " + "parallel apply workers to read and apply them at the end of the transaction."), GUC_NOT_IN_SAMPLE }, &logical_replication_mode, diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index 91e8aa8c0a5..0e0f27f14df 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -312,6 +312,34 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); is($result, qq(10000), 'data replicated to subscriber after dropping index'); +# Test serializing changes to files and notify the parallel apply worker to +# apply them at the end of the transaction. +$node_subscriber->append_conf('postgresql.conf', + 'logical_replication_mode = immediate'); +# Reset the log_min_messages to default. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)"); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(15000), 'parallel apply worker replayed all changes from file'); + $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl index 814daf4d2f9..2b67ae1e0ac 100644 --- a/src/test/subscription/t/018_stream_subxact_abort.pl +++ b/src/test/subscription/t/018_stream_subxact_abort.pl @@ -143,15 +143,17 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b varchar)"); $node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup structure on subscriber $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup logical replication my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2"); my $appname = 'tap_sub'; @@ -198,6 +200,63 @@ $node_subscriber->safe_psql('postgres', q{SELECT 1}); test_streaming($node_publisher, $node_subscriber, $appname, 1); +# Test serializing changes to files and notify the parallel apply worker to +# apply them at the end of the transaction. +$node_subscriber->append_conf('postgresql.conf', + 'logical_replication_mode = immediate'); +# Reset the log_min_messages to default. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +my $offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(1); + ROLLBACK; + }); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is aborted on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(0), 'check rollback was reflected on subscriber'); + +# Serialize the ABORT sub-transaction. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(1); + SAVEPOINT sp; + INSERT INTO test_tab_2 values(1); + ROLLBACK TO sp; + COMMIT; + }); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that only sub-transaction is aborted on subscriber. +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(1), 'check rollback to savepoint was reflected on subscriber'); + $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl index 497245a209c..1cc871fddbd 100644 --- a/src/test/subscription/t/023_twophase_stream.pl +++ b/src/test/subscription/t/023_twophase_stream.pl @@ -319,16 +319,18 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b varchar)"); $node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup structure on subscriber (columns a and b are compatible with same table name on publisher) $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" ); +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup logical replication (streaming = on) my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2"); my $appname = 'tap_sub'; @@ -384,6 +386,48 @@ $node_subscriber->safe_psql('postgres', q{SELECT 1}); test_streaming($node_publisher, $node_subscriber, $appname, 1); +# Test serializing changes to files and notify the parallel apply worker to +# apply them at the end of the transaction. +$node_subscriber->append_conf('postgresql.conf', + 'logical_replication_mode = immediate'); +# Reset the log_min_messages to default. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +my $offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(1); + PREPARE TRANSACTION 'xact'; + }); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# Check that 2PC gets committed on subscriber +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'xact';"); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(1), 'transaction is committed on subscriber'); + ############################### # check all the cleanup ############################### |