diff options
author | Amit Kapila <akapila@postgresql.org> | 2020-08-08 07:34:39 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2020-08-08 07:47:06 +0530 |
commit | 7259736a6e5b7c7588fff9578370736a6648acbb (patch) | |
tree | a2261d4ed09124a00d9ed8c0082f22256364aa77 /doc/src | |
parent | 0a7d771f0f63eb120e7f0a60aecd543ab25ba197 (diff) | |
download | postgresql-7259736a6e5b7c7588fff9578370736a6648acbb.tar.gz postgresql-7259736a6e5b7c7588fff9578370736a6648acbb.zip |
Implement streaming mode in ReorderBuffer.
Instead of serializing the transaction to disk after reaching the
logical_decoding_work_mem limit in memory, we consume the changes we have
in memory and invoke stream API methods added by commit 45fdc9738b.
However, sometimes if we have incomplete toast or speculative insert we
spill to the disk because we can't generate the complete tuple and stream.
And, as soon as we get the complete tuple we stream the transaction
including the serialized changes.
We can do this incremental processing thanks to having assignments
(associating subxact with toplevel xacts) in WAL right away, and
thanks to logging the invalidation messages at each command end. These
features are added by commits 0bead9af48 and c55040ccd0 respectively.
Now that we can stream in-progress transactions, the concurrent aborts
may cause failures when the output plugin consults catalogs (both system
and user-defined).
We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK
sqlerrcode from system table scan APIs to the backend or WALSender
decoding a specific uncommitted transaction. The decoding logic on the
receipt of such a sqlerrcode aborts the decoding of the current
transaction and continue with the decoding of other transactions.
We have ReorderBufferTXN pointer in each ReorderBufferChange by which we
know which xact it belongs to. The output plugin can use this to decide
which changes to discard in case of stream_abort_cb (e.g. when a subxact
gets discarded).
We also provide a new option via SQL APIs to fetch the changes being
streamed.
Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke
Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
Diffstat (limited to 'doc/src')
-rw-r--r-- | doc/src/sgml/logicaldecoding.sgml | 9 | ||||
-rw-r--r-- | doc/src/sgml/test-decoding.sgml | 22 |
2 files changed, 28 insertions, 3 deletions
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 791a62b57c9..1571d71a5b6 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -433,9 +433,12 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); </programlisting> - Any actions leading to transaction ID assignment are prohibited. That, among others, - includes writing to tables, performing DDL changes, and - calling <literal>pg_current_xact_id()</literal>. + Note that access to user catalog tables or regular system catalog tables + in the output plugins has to be done via the <literal>systable_*</literal> + scan APIs only. Access via the <literal>heap_*</literal> scan APIs will + error out. Additionally, any actions leading to transaction ID assignment + are prohibited. That, among others, includes writing to tables, performing + DDL changes, and calling <literal>pg_current_xact_id()</literal>. </para> </sect2> diff --git a/doc/src/sgml/test-decoding.sgml b/doc/src/sgml/test-decoding.sgml index 8356a3d67b3..fe7c9783fac 100644 --- a/doc/src/sgml/test-decoding.sgml +++ b/doc/src/sgml/test-decoding.sgml @@ -39,4 +39,26 @@ postgres=# SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'i </programlisting> </para> +<para> + We can also get the changes of the in-progress transaction and the typical + output, might be: + +<programlisting> +postgres[33712]=#* SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'stream-changes', '1'); + lsn | xid | data +-----------+-----+-------------------------------------------------- + 0/16B21F8 | 503 | opening a streamed block for transaction TXN 503 + 0/16B21F8 | 503 | streaming change for TXN 503 + 0/16B2300 | 503 | streaming change for TXN 503 + 0/16B2408 | 503 | streaming change for TXN 503 + 0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503 + 0/16B21F8 | 503 | opening a streamed block for transaction TXN 503 + 0/16BECA8 | 503 | streaming change for TXN 503 + 0/16BEDB0 | 503 | streaming change for TXN 503 + 0/16BEEB8 | 503 | streaming change for TXN 503 + 0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503 +(10 rows) +</programlisting> + </para> + </sect1> |