diff options
author | Amit Kapila <akapila@postgresql.org> | 2021-07-14 07:33:50 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2021-07-14 07:33:50 +0530 |
commit | a8fd13cab0ba815e9925dc9676e6309f699b5f72 (patch) | |
tree | bfebac6bfc2d32a9212e33f9090bd700b0316fae /src/backend/replication/logical/tablesync.c | |
parent | 6c9c2831668345122fd0f92280b30f3bbe2dd4e6 (diff) | |
download | postgresql-a8fd13cab0ba815e9925dc9676e6309f699b5f72.tar.gz postgresql-a8fd13cab0ba815e9925dc9676e6309f699b5f72.zip |
Add support for prepared transactions to built-in logical replication.
To add support for streaming transactions at prepare time into the
built-in logical replication, we need to do the following things:
* Modify the output plugin (pgoutput) to implement the new two-phase API
callbacks, by leveraging the extended replication protocol.
* Modify the replication apply worker, to properly handle two-phase
transactions by replaying them on prepare.
* Add a new SUBSCRIPTION option "two_phase" to allow users to enable
two-phase transactions. We enable the two_phase once the initial data sync
is over.
However, we must explicitly disable replication of two-phase transactions
during replication slot creation, even if the plugin supports them. We
don't need to replicate the changes accumulated during this phase and,
moreover, we don't have a replication connection open, so we wouldn't know
where to send the data anyway.
The streaming option is not allowed together with this new two_phase option.
Support for combining the two can be added in a separate patch.
We don't allow toggling the two_phase option of a subscription because it can
lead to an inconsistent replica. For the same reason, we don't allow
refreshing the publication once two_phase is enabled for a subscription
unless the copy_data option is false.
Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow
Tested-By: Haiying Tang
Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 197 |
1 files changed, 163 insertions, 34 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 682c107e747..f07983a43cb 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -96,6 +96,7 @@ #include "access/table.h" #include "access/xact.h" +#include "catalog/indexing.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" @@ -114,8 +115,11 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" static bool table_states_valid = false; +static List *table_states_not_ready = NIL; +static bool FetchTableStates(bool *started_tx); StringInfo copybuf = NULL; @@ -362,7 +366,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Oid relid; TimestampTz last_start_time; }; - static List *table_states = NIL; static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; @@ -370,42 +373,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - if (!table_states_valid) - { - MemoryContext oldctx; - List *rstates; - ListCell *lc; - SubscriptionRelState *rstate; - - /* Clean the old list. */ - list_free_deep(table_states); - table_states = NIL; - - StartTransactionCommand(); - started_tx = true; - - /* Fetch all non-ready tables. */ - rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); - - /* Allocate the tracking info in a permanent memory context. 
*/ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) - { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states = lappend(table_states, rstate); - } - MemoryContextSwitchTo(oldctx); - - table_states_valid = true; - } + FetchTableStates(&started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid * immediate restarts. We don't need it if there are no tables that need * syncing. */ - if (table_states && !last_start_times) + if (table_states_not_ready && !last_start_times) { HASHCTL ctl; @@ -419,16 +394,38 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * Clean up the hash table when we're done with all tables (just to * release the bit of memory). */ - else if (!table_states && last_start_times) + else if (!table_states_not_ready && last_start_times) { hash_destroy(last_start_times); last_start_times = NULL; } /* + * Even when the two_phase mode is requested by the user, it remains as + * 'pending' until all tablesyncs have reached READY state. + * + * When this happens, we restart the apply worker and (if the conditions + * are still ok) then the two_phase tri-state will become 'enabled' at + * that time. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", + MySubscription->name))); + + proc_exit(0); + } + + /* * Process all tables that are being synchronized. 
*/ - foreach(lc, table_states) + foreach(lc, table_states_not_ready) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -1071,7 +1068,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * slot leading to a dangling slot on the server. */ HOLD_INTERRUPTS(); - walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , + walrcv_create_slot(LogRepWorkerWalRcvConn, + slotname, false /* permanent */ , false /* two_phase */ , CRS_USE_SNAPSHOT, origin_startpos); RESUME_INTERRUPTS(); @@ -1158,3 +1156,134 @@ copy_table_done: wait_for_worker_state_change(SUBREL_STATE_CATCHUP); return slotname; } + +/* + * Common code to fetch the up-to-date sync state info into the static lists. + * + * Returns true if subscription has 1 or more tables, else false. + * + * Note: If this function started the transaction (indicated by the parameter) + * then it is the caller's responsibility to commit it. + */ +static bool +FetchTableStates(bool *started_tx) +{ + static bool has_subrels = false; + + *started_tx = false; + + if (!table_states_valid) + { + MemoryContext oldctx; + List *rstates; + ListCell *lc; + SubscriptionRelState *rstate; + + /* Clean the old lists. */ + list_free_deep(table_states_not_ready); + table_states_not_ready = NIL; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + *started_tx = true; + } + + /* Fetch all non-ready tables. */ + rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + foreach(lc, rstates) + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, rstate); + } + MemoryContextSwitchTo(oldctx); + + /* + * Does the subscription have tables? + * + * If there were not-READY relations found then we know it does. 
But + * if table_state_not_ready was empty we still need to check again to + * see if there are 0 tables. + */ + has_subrels = (list_length(table_states_not_ready) > 0) || + HasSubscriptionRelations(MySubscription->oid); + + table_states_valid = true; + } + + return has_subrels; +} + +/* + * If the subscription has no tables then return false. + * + * Otherwise, are all tablesyncs READY? + * + * Note: This function is not suitable to be called from outside of apply or + * tablesync workers because MySubscription needs to be already initialized. + */ +bool +AllTablesyncsReady(void) +{ + bool started_tx = false; + bool has_subrels = false; + + /* We need up-to-date sync state info for subscription tables here. */ + has_subrels = FetchTableStates(&started_tx); + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(false); + } + + /* + * Return false when there are no tables in subscription or not all tables + * are in ready state; true otherwise. + */ + return has_subrels && list_length(table_states_not_ready) == 0; +} + +/* + * Update the two_phase state of the specified subscription in pg_subscription. + */ +void +UpdateTwoPhaseState(Oid suboid, char new_state) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED || + new_state == LOGICALREP_TWOPHASE_STATE_PENDING || + new_state == LOGICALREP_TWOPHASE_STATE_ENABLED); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, + "cache lookup failed for subscription oid %u", + suboid); + + /* Form a new tuple. 
*/ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* And update/set two_phase state */ + values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), + values, nulls, replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + heap_freetuple(tup); + table_close(rel, RowExclusiveLock); +} |