aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-02-12 07:41:51 +0530
committerAmit Kapila <akapila@postgresql.org>2021-02-12 07:41:51 +0530
commitce0fdbfe9722867b7fad4d3ede9b6a6bfc51fb4e (patch)
treebe540b24d4cc30cbbd52e92ac164239b6773a699 /src/backend/commands/subscriptioncmds.c
parent3063eb17593c3ad498ce4e89db3358862ea2dbb6 (diff)
downloadpostgresql-ce0fdbfe9722867b7fad4d3ede9b6a6bfc51fb4e.tar.gz
postgresql-ce0fdbfe9722867b7fad4d3ede9b6a6bfc51fb4e.zip
Allow multiple xacts during table sync in logical replication.
For the initial table data synchronization in logical replication, we use a single transaction to copy the entire table and then synchronize the position in the stream with the main apply worker. There are multiple downsides of this approach: (a) We have to perform the entire copy operation again if there is any error (network breakdown, error in the database operation, etc.) while we synchronize the WAL position between tablesync worker and apply worker; this will be onerous especially for large copies, (b) Using a single transaction in the synchronization-phase (where we can receive WAL from multiple transactions) will have the risk of exceeding the CID limit, (c) The slot will hold the WAL till the entire sync is complete because we never commit till the end. This patch solves all the above downsides by allowing multiple transactions during the tablesync phase. The initial copy is done in a single transaction and after that, we commit each transaction as we receive it. To allow recovery after any error or crash, we use a permanent slot and origin to track the progress. The slot and origin will be removed once we finish the synchronization of the table. We also remove the slot and origin of tablesync workers if the user performs DROP SUBSCRIPTION ... or ALTER SUBSCRIPTION ... REFRESH and some of the table syncs are still not finished. The commands ALTER SUBSCRIPTION ... REFRESH PUBLICATION and ALTER SUBSCRIPTION ... SET PUBLICATION ... with refresh option as true cannot be executed inside a transaction block because they can now drop the slots for which we have no provision to roll back. This will also open up the path for logical replication of 2PC transactions on the subscriber side. Previously, we couldn't do that because of the requirement of maintaining a single transaction in tablesync workers. Bump catalog version due to change of state in the catalog (pg_subscription_rel).
Author: Peter Smith, Amit Kapila, and Takamichi Osumi Reviewed-by: Ajin Cherian, Petr Jelinek, Hou Zhijie and Amit Kapila Discussion: https://postgr.es/m/CAA4eK1KHJxaZS-fod-0fey=0tq3=Gkn4ho=8N4-5HWiCfu0H1A@mail.gmail.com
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c467
1 files changed, 369 insertions, 98 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5ccbc9dd50f..5cf874e0b46 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
#include "nodes/makefuncs.h"
#include "replication/logicallauncher.h"
#include "replication/origin.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/worker_internal.h"
@@ -46,6 +47,8 @@
#include "utils/syscache.h"
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
+
/*
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -566,107 +569,207 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
Oid *pubrel_local_oids;
ListCell *lc;
int off;
+ int remove_rel_len;
+ Relation rel = NULL;
+ typedef struct SubRemoveRels
+ {
+ Oid relid;
+ char state;
+ } SubRemoveRels;
+ SubRemoveRels *sub_remove_rels;
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
- /* Try to connect to the publisher. */
- wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
- if (!wrconn)
- ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
-
- /* Get the table list from publisher. */
- pubrel_names = fetch_table_list(wrconn, sub->publications);
+ PG_TRY();
+ {
+ /* Try to connect to the publisher. */
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
- /* We are done with the remote side, close connection. */
- walrcv_disconnect(wrconn);
+ /* Get the table list from publisher. */
+ pubrel_names = fetch_table_list(wrconn, sub->publications);
- /* Get local table list. */
- subrel_states = GetSubscriptionRelations(sub->oid);
+ /* Get local table list. */
+ subrel_states = GetSubscriptionRelations(sub->oid);
- /*
- * Build qsorted array of local table oids for faster lookup. This can
- * potentially contain all tables in the database so speed of lookup is
- * important.
- */
- subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
- off = 0;
- foreach(lc, subrel_states)
- {
- SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+ /*
+ * Build qsorted array of local table oids for faster lookup. This can
+ * potentially contain all tables in the database so speed of lookup
+ * is important.
+ */
+ subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+ off = 0;
+ foreach(lc, subrel_states)
+ {
+ SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
- subrel_local_oids[off++] = relstate->relid;
- }
- qsort(subrel_local_oids, list_length(subrel_states),
- sizeof(Oid), oid_cmp);
+ subrel_local_oids[off++] = relstate->relid;
+ }
+ qsort(subrel_local_oids, list_length(subrel_states),
+ sizeof(Oid), oid_cmp);
+
+ /*
+ * Rels that we want to remove from subscription and drop any slots
+ * and origins corresponding to them.
+ */
+ sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+
+ /*
+ * Walk over the remote tables and try to match them to locally known
+ * tables. If the table is not known locally create a new state for
+ * it.
+ *
+ * Also builds array of local oids of remote tables for the next step.
+ */
+ off = 0;
+ pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+ foreach(lc, pubrel_names)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
- /*
- * Walk over the remote tables and try to match them to locally known
- * tables. If the table is not known locally create a new state for it.
- *
- * Also builds array of local oids of remote tables for the next step.
- */
- off = 0;
- pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
- foreach(lc, pubrel_names)
- {
- RangeVar *rv = (RangeVar *) lfirst(lc);
- Oid relid;
+ /* Check for supported relkind. */
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
- relid = RangeVarGetRelid(rv, AccessShareLock, false);
+ pubrel_local_oids[off++] = relid;
- /* Check for supported relkind. */
- CheckSubscriptionRelkind(get_rel_relkind(relid),
- rv->schemaname, rv->relname);
+ if (!bsearch(&relid, subrel_local_oids,
+ list_length(subrel_states), sizeof(Oid), oid_cmp))
+ {
+ AddSubscriptionRelState(sub->oid, relid,
+ copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+ InvalidXLogRecPtr);
+ ereport(DEBUG1,
+ (errmsg("table \"%s.%s\" added to subscription \"%s\"",
+ rv->schemaname, rv->relname, sub->name)));
+ }
+ }
- pubrel_local_oids[off++] = relid;
+ /*
+ * Next remove state for tables we should not care about anymore using
+ * the data we collected above
+ */
+ qsort(pubrel_local_oids, list_length(pubrel_names),
+ sizeof(Oid), oid_cmp);
- if (!bsearch(&relid, subrel_local_oids,
- list_length(subrel_states), sizeof(Oid), oid_cmp))
+ remove_rel_len = 0;
+ for (off = 0; off < list_length(subrel_states); off++)
{
- AddSubscriptionRelState(sub->oid, relid,
- copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
- InvalidXLogRecPtr);
- ereport(DEBUG1,
- (errmsg("table \"%s.%s\" added to subscription \"%s\"",
- rv->schemaname, rv->relname, sub->name)));
- }
- }
+ Oid relid = subrel_local_oids[off];
- /*
- * Next remove state for tables we should not care about anymore using the
- * data we collected above
- */
- qsort(pubrel_local_oids, list_length(pubrel_names),
- sizeof(Oid), oid_cmp);
+ if (!bsearch(&relid, pubrel_local_oids,
+ list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ {
+ char state;
+ XLogRecPtr statelsn;
+
+ /*
+ * Lock pg_subscription_rel with AccessExclusiveLock to
+ * prevent any race conditions with the apply worker
+ * re-launching workers at the same time this code is trying
+ * to remove those tables.
+ *
+ * Even if new worker for this particular rel is restarted it
+ * won't be able to make any progress as we hold exclusive
+ * lock on subscription_rel till the transaction end. It will
+ * simply exit as there is no corresponding rel entry.
+ *
+ * This locking also ensures that the state of rels won't
+ * change till we are done with this refresh operation.
+ */
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+ /* Last known rel state. */
+ state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+
+ sub_remove_rels[remove_rel_len].relid = relid;
+ sub_remove_rels[remove_rel_len++].state = state;
+
+ RemoveSubscriptionRel(sub->oid, relid);
+
+ logicalrep_worker_stop(sub->oid, relid);
+
+ /*
+ * For READY state, we would have already dropped the
+ * tablesync origin.
+ */
+ if (state != SUBREL_STATE_READY)
+ {
+ char originname[NAMEDATALEN];
+
+ /*
+ * Drop the tablesync's origin tracking if exists.
+ *
+ * It is possible that the origin is not yet created for
+ * tablesync worker, this can happen for the states before
+ * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
+ * concurrently try to drop the origin and by this time
+ * the origin might be already removed. For these reasons,
+ * passing missing_ok = true.
+ */
+ ReplicationOriginNameForTablesync(sub->oid, relid, originname);
+ replorigin_drop_by_name(originname, true, false);
+ }
- for (off = 0; off < list_length(subrel_states); off++)
- {
- Oid relid = subrel_local_oids[off];
+ ereport(DEBUG1,
+ (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name)));
+ }
+ }
- if (!bsearch(&relid, pubrel_local_oids,
- list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ /*
+ * Drop the tablesync slots associated with removed tables. This has
+ * to be at the end because otherwise if there is an error while doing
+ * the database operations we won't be able to rollback dropped slots.
+ */
+ for (off = 0; off < remove_rel_len; off++)
{
- RemoveSubscriptionRel(sub->oid, relid);
-
- logicalrep_worker_stop_at_commit(sub->oid, relid);
-
- ereport(DEBUG1,
- (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
- get_namespace_name(get_rel_namespace(relid)),
- get_rel_name(relid),
- sub->name)));
+ if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
+ sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ /*
+ * For READY/SYNCDONE states we know the tablesync slot has
+ * already been dropped by the tablesync worker.
+ *
+ * For other states, there is no certainty, maybe the slot
+ * does not exist yet. Also, if we fail after removing some of
+ * the slots, next time, it will again try to drop already
+ * dropped slots and fail. For these reasons, we allow
+ * missing_ok = true for the drop.
+ */
+ ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, syncslotname);
+ ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+ }
}
}
+ PG_FINALLY();
+ {
+ if (wrconn)
+ walrcv_disconnect(wrconn);
+ }
+ PG_END_TRY();
+
+ if (rel)
+ table_close(rel, NoLock);
}
/*
* Alter the existing subscription.
*/
ObjectAddress
-AlterSubscription(AlterSubscriptionStmt *stmt)
+AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{
Relation rel;
ObjectAddress myself;
@@ -848,6 +951,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
+ PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
+
/* Make sure refresh sees the new list of publications. */
sub->publications = stmt->publication;
@@ -877,6 +982,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
NULL, NULL, /* no "binary" */
NULL, NULL); /* no "streaming" */
+ PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
+
AlterSubscription_refresh(sub, copy_data);
break;
@@ -927,8 +1034,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char originname[NAMEDATALEN];
char *err = NULL;
WalReceiverConn *wrconn = NULL;
- StringInfoData cmd;
Form_pg_subscription form;
+ List *rstates;
/*
* Lock pg_subscription with AccessExclusiveLock to ensure that the
@@ -1041,6 +1148,36 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
}
list_free(subworkers);
+ /*
+ * Cleanup of tablesync replication origins.
+ *
+ * Any READY-state relations would already have dealt with clean-ups.
+ *
+ * Note that the state can't change because we have already stopped both
+ * the apply and tablesync workers and they can't restart because of
+ * exclusive lock on the subscription.
+ */
+ rstates = GetSubscriptionNotReadyRelations(subid);
+ foreach(lc, rstates)
+ {
+ SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+ Oid relid = rstate->relid;
+
+ /* Only cleanup resources of tablesync workers */
+ if (!OidIsValid(relid))
+ continue;
+
+ /*
+ * Drop the tablesync's origin tracking if exists.
+ *
+ * It is possible that the origin is not yet created for tablesync
+ * worker so passing missing_ok = true. This can happen for the states
+ * before SUBREL_STATE_FINISHEDCOPY.
+ */
+ ReplicationOriginNameForTablesync(subid, relid, originname);
+ replorigin_drop_by_name(originname, true, false);
+ }
+
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
@@ -1055,30 +1192,110 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* If there is no slot associated with the subscription, we can finish
* here.
*/
- if (!slotname)
+ if (!slotname && rstates == NIL)
{
table_close(rel, NoLock);
return;
}
/*
- * Otherwise drop the replication slot at the publisher node using the
- * replication connection.
+ * Try to acquire the connection necessary for dropping slots.
+ *
+ * Note: If the slotname is NONE/NULL then we allow the command to finish
+ * and users need to manually cleanup the apply and tablesync worker slots
+ * later.
+ *
+ * This has to be at the end because otherwise if there is an error while
+ * doing the database operations we won't be able to rollback dropped
+ * slot.
*/
load_file("libpqwalreceiver", false);
- initStringInfo(&cmd);
- appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
-
wrconn = walrcv_connect(conninfo, true, subname, &err);
if (wrconn == NULL)
- ereport(ERROR,
- (errmsg("could not connect to publisher when attempting to "
- "drop the replication slot \"%s\"", slotname),
- errdetail("The error was: %s", err),
- /* translator: %s is an SQL ALTER command */
- errhint("Use %s to disassociate the subscription from the slot.",
- "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+ {
+ if (!slotname)
+ {
+ /* be tidy */
+ list_free(rstates);
+ table_close(rel, NoLock);
+ return;
+ }
+ else
+ {
+ ReportSlotConnectionError(rstates, subid, slotname, err);
+ }
+ }
+
+ PG_TRY();
+ {
+ foreach(lc, rstates)
+ {
+ SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+ Oid relid = rstate->relid;
+
+ /* Only cleanup resources of tablesync workers */
+ if (!OidIsValid(relid))
+ continue;
+
+ /*
+ * Drop the tablesync slots associated with removed tables.
+ *
+ * For SYNCDONE/READY states, the tablesync slot is known to have
+ * already been dropped by the tablesync worker.
+ *
+ * For other states, there is no certainty, maybe the slot does
+ * not exist yet. Also, if we fail after removing some of the
+ * slots, next time, it will again try to drop already dropped
+ * slots and fail. For these reasons, we allow missing_ok = true
+ * for the drop.
+ */
+ if (rstate->state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ ReplicationSlotNameForTablesync(subid, relid, syncslotname);
+ ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+ }
+ }
+
+ list_free(rstates);
+
+ /*
+ * If there is a slot associated with the subscription, then drop the
+ * replication slot at the publisher.
+ */
+ if (slotname)
+ ReplicationSlotDropAtPubNode(wrconn, slotname, false);
+
+ }
+ PG_FINALLY();
+ {
+ walrcv_disconnect(wrconn);
+ }
+ PG_END_TRY();
+
+ table_close(rel, NoLock);
+}
+
+/*
+ * Drop the replication slot at the publisher node using the replication
+ * connection.
+ *
+ * missing_ok - if true then only issue a LOG message if the slot doesn't
+ * exist.
+ */
+void
+ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
+{
+ StringInfoData cmd;
+
+ Assert(wrconn);
+
+ load_file("libpqwalreceiver", false);
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
PG_TRY();
{
@@ -1086,27 +1303,39 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
res = walrcv_exec(wrconn, cmd.data, 0, NULL);
- if (res->status != WALRCV_OK_COMMAND)
- ereport(ERROR,
+ if (res->status == WALRCV_OK_COMMAND)
+ {
+ /* NOTICE. Success. */
+ ereport(NOTICE,
+ (errmsg("dropped replication slot \"%s\" on publisher",
+ slotname)));
+ }
+ else if (res->status == WALRCV_ERROR &&
+ missing_ok &&
+ res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
+ {
+ /* LOG. Error, but missing_ok = true. */
+ ereport(LOG,
(errmsg("could not drop the replication slot \"%s\" on publisher",
slotname),
errdetail("The error was: %s", res->err)));
+ }
else
- ereport(NOTICE,
- (errmsg("dropped replication slot \"%s\" on publisher",
- slotname)));
+ {
+ /* ERROR. */
+ ereport(ERROR,
+ (errmsg("could not drop the replication slot \"%s\" on publisher",
+ slotname),
+ errdetail("The error was: %s", res->err)));
+ }
walrcv_clear_result(res);
}
PG_FINALLY();
{
- walrcv_disconnect(wrconn);
+ pfree(cmd.data);
}
PG_END_TRY();
-
- pfree(cmd.data);
-
- table_close(rel, NoLock);
}
/*
@@ -1275,3 +1504,45 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
return tablelist;
}
+
+/*
+ * This is to report the connection failure while dropping replication slots.
+ * Here, we report the WARNING for all tablesync slots so that user can drop
+ * them manually, if required.
+ */
+static void
+ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
+{
+ ListCell *lc;
+
+ foreach(lc, rstates)
+ {
+ SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+ Oid relid = rstate->relid;
+
+ /* Only cleanup resources of tablesync workers */
+ if (!OidIsValid(relid))
+ continue;
+
+ /*
+ * Caller needs to ensure that relstate doesn't change underneath us.
+ * See DropSubscription where we get the relstates.
+ */
+ if (rstate->state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ ReplicationSlotNameForTablesync(subid, relid, syncslotname);
+ elog(WARNING, "could not drop tablesync replication slot \"%s\"",
+ syncslotname);
+ }
+ }
+
+ ereport(ERROR,
+ (errmsg("could not connect to publisher when attempting to "
+ "drop the replication slot \"%s\"", slotname),
+ errdetail("The error was: %s", err),
+ /* translator: %s is an SQL ALTER command */
+ errhint("Use %s to disassociate the subscription from the slot.",
+ "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+}