aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
author: Amit Kapila <akapila@postgresql.org> 2021-07-14 07:33:50 +0530
committer: Amit Kapila <akapila@postgresql.org> 2021-07-14 07:33:50 +0530
commit: a8fd13cab0ba815e9925dc9676e6309f699b5f72 (patch)
tree: bfebac6bfc2d32a9212e33f9090bd700b0316fae /src/backend/replication/logical/tablesync.c
parent: 6c9c2831668345122fd0f92280b30f3bbe2dd4e6 (diff)
download: postgresql-a8fd13cab0ba815e9925dc9676e6309f699b5f72.tar.gz
postgresql-a8fd13cab0ba815e9925dc9676e6309f699b5f72.zip
Add support for prepared transactions to built-in logical replication.
To add support for streaming transactions at prepare time into the built-in logical replication, we need to do the following things:

* Modify the output plugin (pgoutput) to implement the new two-phase API callbacks, by leveraging the extended replication protocol.
* Modify the replication apply worker, to properly handle two-phase transactions by replaying them on prepare.
* Add a new SUBSCRIPTION option "two_phase" to allow users to enable two-phase transactions. We enable two_phase once the initial data sync is over.

We must, however, explicitly disable replication of two-phase transactions during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover, we don't have a replication connection open so we don't know where to send the data anyway.

The streaming option is not allowed with this new two_phase option. This can be done as a separate patch.

We don't allow toggling the two_phase option of a subscription because it can lead to an inconsistent replica. For the same reason, we don't allow refreshing the publication once two_phase is enabled for a subscription unless the copy_data option is false.

Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow
Tested-By: Haiying Tang
Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c197
1 files changed, 163 insertions, 34 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 682c107e747..f07983a43cb 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -96,6 +96,7 @@
#include "access/table.h"
#include "access/xact.h"
+#include "catalog/indexing.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
@@ -114,8 +115,11 @@
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
+#include "utils/syscache.h"
static bool table_states_valid = false;
+static List *table_states_not_ready = NIL;
+static bool FetchTableStates(bool *started_tx);
StringInfo copybuf = NULL;
@@ -362,7 +366,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Oid relid;
TimestampTz last_start_time;
};
- static List *table_states = NIL;
static HTAB *last_start_times = NULL;
ListCell *lc;
bool started_tx = false;
@@ -370,42 +373,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Assert(!IsTransactionState());
/* We need up-to-date sync state info for subscription tables here. */
- if (!table_states_valid)
- {
- MemoryContext oldctx;
- List *rstates;
- ListCell *lc;
- SubscriptionRelState *rstate;
-
- /* Clean the old list. */
- list_free_deep(table_states);
- table_states = NIL;
-
- StartTransactionCommand();
- started_tx = true;
-
- /* Fetch all non-ready tables. */
- rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
-
- /* Allocate the tracking info in a permanent memory context. */
- oldctx = MemoryContextSwitchTo(CacheMemoryContext);
- foreach(lc, rstates)
- {
- rstate = palloc(sizeof(SubscriptionRelState));
- memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
- table_states = lappend(table_states, rstate);
- }
- MemoryContextSwitchTo(oldctx);
-
- table_states_valid = true;
- }
+ FetchTableStates(&started_tx);
/*
* Prepare a hash table for tracking last start times of workers, to avoid
* immediate restarts. We don't need it if there are no tables that need
* syncing.
*/
- if (table_states && !last_start_times)
+ if (table_states_not_ready && !last_start_times)
{
HASHCTL ctl;
@@ -419,16 +394,38 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* Clean up the hash table when we're done with all tables (just to
* release the bit of memory).
*/
- else if (!table_states && last_start_times)
+ else if (!table_states_not_ready && last_start_times)
{
hash_destroy(last_start_times);
last_start_times = NULL;
}
/*
+ * Even when the two_phase mode is requested by the user, it remains as
+ * 'pending' until all tablesyncs have reached READY state.
+ *
+ * When this happens, we restart the apply worker and (if the conditions
+ * are still ok) then the two_phase tri-state will become 'enabled' at
+ * that time.
+ *
+ * Note: If the subscription has no tables then leave the state as
+ * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+ * work.
+ */
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
+ MySubscription->name)));
+
+ proc_exit(0);
+ }
+
+ /*
* Process all tables that are being synchronized.
*/
- foreach(lc, table_states)
+ foreach(lc, table_states_not_ready)
{
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
@@ -1071,7 +1068,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* slot leading to a dangling slot on the server.
*/
HOLD_INTERRUPTS();
- walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
+ walrcv_create_slot(LogRepWorkerWalRcvConn,
+ slotname, false /* permanent */ , false /* two_phase */ ,
CRS_USE_SNAPSHOT, origin_startpos);
RESUME_INTERRUPTS();
@@ -1158,3 +1156,134 @@ copy_table_done:
wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
return slotname;
}
+
+/*
+ * Common code to fetch the up-to-date sync state info into the static lists.
+ *
+ * The not-READY relation list (table_states_not_ready) is rebuilt only when
+ * table_states_valid is false.  Otherwise the list is left untouched and the
+ * has_subrels value remembered from the last rebuild is returned.
+ *
+ * started_tx is an output parameter: it is always reset to false on entry and
+ * is set to true only if this function had to call StartTransactionCommand()
+ * because no transaction was in progress.
+ *
+ * Returns true if subscription has 1 or more tables, else false.
+ *
+ * Note: If this function started the transaction (indicated by the parameter)
+ * then it is the caller's responsibility to commit it.
+ */
+static bool
+FetchTableStates(bool *started_tx)
+{
+ static bool has_subrels = false; /* kept across calls; reused while the cached list is valid */
+
+ *started_tx = false;
+
+ if (!table_states_valid)
+ {
+ MemoryContext oldctx;
+ List *rstates;
+ ListCell *lc;
+ SubscriptionRelState *rstate;
+
+ /* Clean the old list before rebuilding it from scratch. */
+ list_free_deep(table_states_not_ready);
+ table_states_not_ready = NIL;
+
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ *started_tx = true;
+ }
+
+ /* Fetch all non-ready tables. */
+ rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
+
+ /*
+ * Allocate the tracking info in a permanent memory context.
+ *
+ * Each fetched entry is copied into a newly palloc'd
+ * SubscriptionRelState while CacheMemoryContext is the current context;
+ * the previous context is restored afterwards.
+ */
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ foreach(lc, rstates)
+ {
+ rstate = palloc(sizeof(SubscriptionRelState));
+ memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
+ table_states_not_ready = lappend(table_states_not_ready, rstate);
+ }
+ MemoryContextSwitchTo(oldctx);
+
+ /*
+ * Does the subscription have tables?
+ *
+ * If there were not-READY relations found then we know it does. But
+ * if table_states_not_ready was empty we still need to check again to
+ * see if there are 0 tables.
+ *
+ * Because of the short-circuiting ||, HasSubscriptionRelations() is
+ * only called when the not-READY list turned out to be empty.
+ */
+ has_subrels = (list_length(table_states_not_ready) > 0) ||
+ HasSubscriptionRelations(MySubscription->oid);
+
+ table_states_valid = true;
+ }
+
+ return has_subrels;
+}
+
+/*
+ * If the subscription has no tables then return false.
+ *
+ * Otherwise, are all tablesyncs READY?
+ *
+ * The answer is derived from FetchTableStates(): its return value says
+ * whether the subscription has any tables, and an empty
+ * table_states_not_ready list afterwards means none of them is still
+ * not-READY.  If FetchTableStates() had to start a transaction, it is
+ * committed here before returning.
+ *
+ * Note: This function is not suitable to be called from outside of apply or
+ * tablesync workers because MySubscription needs to be already initialized.
+ */
+bool
+AllTablesyncsReady(void)
+{
+ bool started_tx = false;
+ bool has_subrels = false;
+
+ /* We need up-to-date sync state info for subscription tables here. */
+ has_subrels = FetchTableStates(&started_tx);
+
+ if (started_tx)
+ {
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+ }
+
+ /*
+ * Return false when there are no tables in subscription or not all tables
+ * are in ready state; true otherwise.
+ */
+ return has_subrels && list_length(table_states_not_ready) == 0;
+}
+
+/*
+ * Update the two_phase state of the specified subscription in pg_subscription.
+ *
+ * suboid identifies the subscription row to modify.  new_state is asserted
+ * to be one of LOGICALREP_TWOPHASE_STATE_DISABLED, _PENDING or _ENABLED.
+ *
+ * The catalog is opened with RowExclusiveLock, and only the subtwophasestate
+ * column is replaced; all other columns are carried over from the existing
+ * tuple.  Raises an ERROR via elog() if no tuple is found for suboid.
+ */
+void
+UpdateTwoPhaseState(Oid suboid, char new_state)
+{
+ Relation rel;
+ HeapTuple tup;
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+
+ Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
+ new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
+ new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
+
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+ tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR,
+ "cache lookup failed for subscription oid %u",
+ suboid);
+
+ /*
+ * Form a new tuple.
+ *
+ * All three arrays start out cleared, so no column is marked for
+ * replacement until explicitly flagged below.
+ */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ /*
+ * And update/set two_phase state
+ *
+ * NOTE(review): tup is reassigned by the heap_modify_tuple() call below,
+ * so the copy obtained from the syscache lookup is not the pointer later
+ * passed to heap_freetuple(); presumably it is reclaimed together with
+ * the surrounding memory context -- confirm.
+ */
+ values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
+ replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel),
+ values, nulls, replaces);
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ heap_freetuple(tup);
+ table_close(rel, RowExclusiveLock);
+}