diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 468 |
1 files changed, 397 insertions, 71 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 0198e6d75ba..0784ca79515 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -20,27 +20,36 @@ #include "access/htup_details.h" #include "access/xact.h" +#include "catalog/dependency.h" #include "catalog/indexing.h" +#include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" #include "catalog/pg_type.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_rel.h" #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" +#include "nodes/makefuncs.h" + #include "replication/logicallauncher.h" #include "replication/origin.h" #include "replication/walreceiver.h" +#include "replication/walsender.h" #include "replication/worker_internal.h" #include "storage/lmgr.h" #include "utils/builtins.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" +static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); + /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. * @@ -49,17 +58,17 @@ * accomodate that. */ static void -parse_subscription_options(List *options, char **conninfo, - List **publications, bool *enabled_given, - bool *enabled, bool *create_slot, char **slot_name) +parse_subscription_options(List *options, bool *connect, bool *enabled_given, + bool *enabled, bool *create_slot, char **slot_name, + bool *copy_data) { ListCell *lc; + bool connect_given = false; bool create_slot_given = false; + bool copy_data_given = false; - if (conninfo) - *conninfo = NULL; - if (publications) - *publications = NIL; + if (connect) + *connect = true; if (enabled) { *enabled_given = false; @@ -69,29 +78,23 @@ parse_subscription_options(List *options, char **conninfo, *create_slot = true; if (slot_name) *slot_name = NULL; + if (copy_data) + *copy_data = true; /* Parse options */ foreach (lc, options) { DefElem *defel = (DefElem *) lfirst(lc); - if (strcmp(defel->defname, "conninfo") == 0 && conninfo) - { - if (*conninfo) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); - - *conninfo = defGetString(defel); - } - else if (strcmp(defel->defname, "publication") == 0 && publications) + if (strcmp(defel->defname, "noconnect") == 0 && connect) { - if (*publications) + if (connect_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *publications = defGetStringList(defel); + connect_given = true; + *connect = !defGetBoolean(defel); } else if (strcmp(defel->defname, "enabled") == 0 && enabled) { @@ -142,9 +145,57 @@ parse_subscription_options(List *options, char **conninfo, *slot_name = defGetString(defel); } + else if (strcmp(defel->defname, "copy data") == 0 && copy_data) + { + if (copy_data_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + copy_data_given = true; + *copy_data = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "nocopy data") == 0 && copy_data) + { + if (copy_data_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + copy_data_given = true; + *copy_data = !defGetBoolean(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } + + /* + * We've been explicitly asked to not connect, that requires some + * additional processing. + */ + if (connect && !*connect) + { + /* Check for incompatible options from the user. */ + if (*enabled_given && *enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("noconnect and enabled are mutually exclusive options"))); + + if (create_slot_given && *create_slot) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("noconnect and create slot are mutually exclusive options"))); + + if (copy_data_given && *copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("noconnect and copy data are mutually exclusive options"))); + + /* Change the defaults of other options. */ + *enabled = false; + *create_slot = false; + *copy_data = false; + } } /* @@ -214,8 +265,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) Datum values[Natts_pg_subscription]; Oid owner = GetUserId(); HeapTuple tup; + bool connect; bool enabled_given; bool enabled; + bool copy_data; char *conninfo; char *slotname; char originname[NAMEDATALEN]; @@ -226,9 +279,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Parse and check options. * Connection and publication should not be specified here. */ - parse_subscription_options(stmt->options, NULL, NULL, - &enabled_given, &enabled, - &create_slot, &slotname); + parse_subscription_options(stmt->options, &connect, &enabled_given, + &enabled, &create_slot, &slotname, ©_data); /* * Since creating a replication slot is not transactional, rolling back @@ -297,14 +349,17 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) replorigin_create(originname); /* - * If requested, create the replication slot on remote side for our - * newly created subscription. + * Connect to remote side to execute requested commands and fetch table + * info. */ - if (create_slot) + if (connect) { XLogRecPtr lsn; char *err; WalReceiverConn *wrconn; + List *tables; + ListCell *lc; + char table_state; /* Try to connect to the publisher. */ wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); @@ -315,13 +370,43 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) PG_TRY(); { /* - * Create permanent slot for the subscription. We won't use the - * initial snapshot for anything, so no need to export it. + * If requested, create permanent slot for the subscription. + * We won't use the initial snapshot for anything, so no need + * to export it. + */ + if (create_slot) + { + walrcv_create_slot(wrconn, slotname, false, + CRS_NOEXPORT_SNAPSHOT, &lsn); + ereport(NOTICE, + (errmsg("created replication slot \"%s\" on publisher", + slotname))); + } + + /* + * Set sync state based on if we were asked to do data copy or + * not. */ - walrcv_create_slot(wrconn, slotname, false, false, &lsn); + table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + + /* + * Get the table list from publisher and build local table status + * info. + */ + tables = fetch_table_list(wrconn, publications); + foreach (lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, true); + + SetSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr); + } + ereport(NOTICE, - (errmsg("created replication slot \"%s\" on publisher", - slotname))); + (errmsg("synchronized table states"))); } PG_CATCH(); { @@ -334,6 +419,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) /* And we are done with the remote side. */ walrcv_disconnect(wrconn); } + else + ereport(WARNING, + (errmsg("tables were not subscribed, you will have to run " + "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to " + "subscribe the tables"))); heap_close(rel, RowExclusiveLock); @@ -346,6 +436,108 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) return myself; } +static void +AlterSubscription_refresh(Subscription *sub, bool copy_data) +{ + char *err; + List *pubrel_names; + List *subrel_states; + Oid *subrel_local_oids; + Oid *pubrel_local_oids; + ListCell *lc; + int off; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + /* Get the table list from publisher. */ + pubrel_names = fetch_table_list(wrconn, sub->publications); + + /* We are done with the remote side, close connection. */ + walrcv_disconnect(wrconn); + + /* Get local table list. */ + subrel_states = GetSubscriptionRelations(sub->oid); + + /* + * Build qsorted array of local table oids for faster lookup. + * This can potentially contain all tables in the database so + * speed of lookup is important. + */ + subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + off = 0; + foreach(lc, subrel_states) + { + SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); + subrel_local_oids[off++] = relstate->relid; + } + qsort(subrel_local_oids, list_length(subrel_states), + sizeof(Oid), oid_cmp); + + /* + * Walk over the remote tables and try to match them to locally + * known tables. If the table is not known locally create a new state + * for it. + * + * Also builds array of local oids of remote tables for the next step. + */ + off = 0; + pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); + + foreach (lc, pubrel_names) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + pubrel_local_oids[off++] = relid; + + if (!bsearch(&relid, subrel_local_oids, + list_length(subrel_states), sizeof(Oid), oid_cmp)) + { + SetSubscriptionRelState(sub->oid, relid, + copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, + InvalidXLogRecPtr); + ereport(NOTICE, + (errmsg("added subscription for table %s.%s", + quote_identifier(rv->schemaname), + quote_identifier(rv->relname)))); + } + } + + /* + * Next remove state for tables we should not care about anymore using + * the data we collected above + */ + qsort(pubrel_local_oids, list_length(pubrel_names), + sizeof(Oid), oid_cmp); + + for (off = 0; off < list_length(subrel_states); off++) + { + Oid relid = subrel_local_oids[off]; + + if (!bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), sizeof(Oid), oid_cmp)) + { + char *namespace; + + RemoveSubscriptionRel(sub->oid, relid); + + namespace = get_namespace_name(get_rel_namespace(relid)); + ereport(NOTICE, + (errmsg("removed subscription for table %s.%s", + quote_identifier(namespace), + quote_identifier(get_rel_name(relid))))); + } + } +} + /* * Alter the existing subscription. */ @@ -359,11 +551,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Datum values[Natts_pg_subscription]; HeapTuple tup; Oid subid; - bool enabled_given; - bool enabled; - char *conninfo; - char *slot_name; - List *publications; + bool update_tuple = false; rel = heap_open(SubscriptionRelationId, RowExclusiveLock); @@ -384,52 +572,113 @@ AlterSubscription(AlterSubscriptionStmt *stmt) subid = HeapTupleGetOid(tup); - /* Parse options. */ - parse_subscription_options(stmt->options, &conninfo, &publications, - &enabled_given, &enabled, - NULL, &slot_name); - /* Form a new tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); - if (enabled_given) - { - values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); - replaces[Anum_pg_subscription_subenabled - 1] = true; - } - if (conninfo) - { - values[Anum_pg_subscription_subconninfo - 1] = - CStringGetTextDatum(conninfo); - replaces[Anum_pg_subscription_subconninfo - 1] = true; - } - if (slot_name) - { - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slot_name)); - replaces[Anum_pg_subscription_subslotname - 1] = true; - } - if (publications != NIL) + switch (stmt->kind) { - values[Anum_pg_subscription_subpublications - 1] = - publicationListToArray(publications); - replaces[Anum_pg_subscription_subpublications - 1] = true; + case ALTER_SUBSCRIPTION_OPTIONS: + { + char *slot_name; + + parse_subscription_options(stmt->options, NULL, NULL, NULL, + NULL, &slot_name, NULL); + + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + replaces[Anum_pg_subscription_subslotname - 1] = true; + + update_tuple = true; + break; + } + + case ALTER_SUBSCRIPTION_ENABLED: + { + bool enabled, + enabled_given; + + parse_subscription_options(stmt->options, NULL, + &enabled_given, &enabled, NULL, + NULL, NULL); + Assert(enabled_given); + + values[Anum_pg_subscription_subenabled - 1] = + BoolGetDatum(enabled); + replaces[Anum_pg_subscription_subenabled - 1] = true; + + update_tuple = true; + break; + } + + case ALTER_SUBSCRIPTION_CONNECTION: + values[Anum_pg_subscription_subconninfo - 1] = + CStringGetTextDatum(stmt->conninfo); + replaces[Anum_pg_subscription_subconninfo - 1] = true; + update_tuple = true; + break; + + case ALTER_SUBSCRIPTION_PUBLICATION: + case ALTER_SUBSCRIPTION_PUBLICATION_REFRESH: + { + bool copy_data; + Subscription *sub = GetSubscription(subid, false); + + parse_subscription_options(stmt->options, NULL, NULL, NULL, + NULL, NULL, ©_data); + + values[Anum_pg_subscription_subpublications - 1] = + publicationListToArray(stmt->publication); + replaces[Anum_pg_subscription_subpublications - 1] = true; + + update_tuple = true; + + /* Refresh if user asked us to. */ + if (stmt->kind == ALTER_SUBSCRIPTION_PUBLICATION_REFRESH) + { + /* Make sure refresh sees the new list of publications. */ + sub->publications = stmt->publication; + + AlterSubscription_refresh(sub, copy_data); + } + + break; + } + + case ALTER_SUBSCRIPTION_REFRESH: + { + bool copy_data; + Subscription *sub = GetSubscription(subid, false); + + parse_subscription_options(stmt->options, NULL, NULL, NULL, + NULL, NULL, ©_data); + + AlterSubscription_refresh(sub, copy_data); + + break; + } + + default: + elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", + stmt->kind); } - tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, - replaces); + /* Update the catalog if needed. */ + if (update_tuple) + { + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); - /* Update the catalog. */ - CatalogTupleUpdate(rel, &tup->t_self, tup); + CatalogTupleUpdate(rel, &tup->t_self, tup); - ObjectAddressSet(myself, SubscriptionRelationId, subid); + heap_freetuple(tup); + } - /* Cleanup. */ - heap_freetuple(tup); heap_close(rel, RowExclusiveLock); + ObjectAddressSet(myself, SubscriptionRelationId, subid); + InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); return myself; @@ -537,8 +786,11 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); + /* Remove any associated relation synchronization states. */ + RemoveSubscriptionRel(subid, InvalidOid); + /* Kill the apply worker so that the slot becomes accessible. */ - logicalrep_worker_stop(subid); + logicalrep_worker_stop(subid, InvalidOid); /* Remove the origin tracking if exists. */ snprintf(originname, sizeof(originname), "pg_%u", subid); @@ -571,15 +823,20 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) PG_TRY(); { - if (!walrcv_command(wrconn, cmd.data, &err)) + WalRcvExecResult *res; + res = walrcv_exec(wrconn, cmd.data, 0, NULL); + + if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, (errmsg("could not drop the replication slot \"%s\" on publisher", slotname), - errdetail("The error was: %s", err))); + errdetail("The error was: %s", res->err))); else ereport(NOTICE, (errmsg("dropped replication slot \"%s\" on publisher", slotname))); + + walrcv_clear_result(res); } PG_CATCH(); { @@ -691,3 +948,72 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) heap_close(rel, RowExclusiveLock); } + +/* + * Get the list of tables which belong to specified publications on the + * publisher connection. + */ +static List * +fetch_table_list(WalReceiverConn *wrconn, List *publications) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + ListCell *lc; + bool first; + List *tablelist = NIL; + + Assert(list_length(publications) > 0); + + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" + " FROM pg_catalog.pg_publication_tables t\n" + " WHERE t.pubname IN ("); + first = true; + foreach (lc, publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&cmd, ", "); + + appendStringInfo(&cmd, "%s", quote_literal_cstr(pubname)); + } + appendStringInfoString(&cmd, ")"); + + res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive list of replicated tables from the publisher: %s", + res->err))); + + /* Process tables. */ + slot = MakeSingleTupleTableSlot(res->tupledesc); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *nspname; + char *relname; + bool isnull; + RangeVar *rv; + + nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1); + tablelist = lappend(tablelist, rv); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + return tablelist; +} |