aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c468
1 files changed, 397 insertions, 71 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 0198e6d75ba..0784ca79515 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -20,27 +20,36 @@
#include "access/htup_details.h"
#include "access/xact.h"
+#include "catalog/dependency.h"
#include "catalog/indexing.h"
+#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_type.h"
#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_rel.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"
+#include "nodes/makefuncs.h"
+
#include "replication/logicallauncher.h"
#include "replication/origin.h"
#include "replication/walreceiver.h"
+#include "replication/walsender.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
+#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
+static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+
/*
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
*
@@ -49,17 +58,17 @@
* accomodate that.
*/
static void
-parse_subscription_options(List *options, char **conninfo,
- List **publications, bool *enabled_given,
- bool *enabled, bool *create_slot, char **slot_name)
+parse_subscription_options(List *options, bool *connect, bool *enabled_given,
+ bool *enabled, bool *create_slot, char **slot_name,
+ bool *copy_data)
{
ListCell *lc;
+ bool connect_given = false;
bool create_slot_given = false;
+ bool copy_data_given = false;
- if (conninfo)
- *conninfo = NULL;
- if (publications)
- *publications = NIL;
+ if (connect)
+ *connect = true;
if (enabled)
{
*enabled_given = false;
@@ -69,29 +78,23 @@ parse_subscription_options(List *options, char **conninfo,
*create_slot = true;
if (slot_name)
*slot_name = NULL;
+ if (copy_data)
+ *copy_data = true;
/* Parse options */
foreach (lc, options)
{
DefElem *defel = (DefElem *) lfirst(lc);
- if (strcmp(defel->defname, "conninfo") == 0 && conninfo)
- {
- if (*conninfo)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
-
- *conninfo = defGetString(defel);
- }
- else if (strcmp(defel->defname, "publication") == 0 && publications)
+ if (strcmp(defel->defname, "noconnect") == 0 && connect)
{
- if (*publications)
+ if (connect_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
- *publications = defGetStringList(defel);
+ connect_given = true;
+ *connect = !defGetBoolean(defel);
}
else if (strcmp(defel->defname, "enabled") == 0 && enabled)
{
@@ -142,9 +145,57 @@ parse_subscription_options(List *options, char **conninfo,
*slot_name = defGetString(defel);
}
+ else if (strcmp(defel->defname, "copy data") == 0 && copy_data)
+ {
+ if (copy_data_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ copy_data_given = true;
+ *copy_data = defGetBoolean(defel);
+ }
+ else if (strcmp(defel->defname, "nocopy data") == 0 && copy_data)
+ {
+ if (copy_data_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ copy_data_given = true;
+ *copy_data = !defGetBoolean(defel);
+ }
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
+
+ /*
+ * We've been explicitly asked to not connect, that requires some
+ * additional processing.
+ */
+ if (connect && !*connect)
+ {
+ /* Check for incompatible options from the user. */
+ if (*enabled_given && *enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("noconnect and enabled are mutually exclusive options")));
+
+ if (create_slot_given && *create_slot)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("noconnect and create slot are mutually exclusive options")));
+
+ if (copy_data_given && *copy_data)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("noconnect and copy data are mutually exclusive options")));
+
+ /* Change the defaults of other options. */
+ *enabled = false;
+ *create_slot = false;
+ *copy_data = false;
+ }
}
/*
@@ -214,8 +265,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
Datum values[Natts_pg_subscription];
Oid owner = GetUserId();
HeapTuple tup;
+ bool connect;
bool enabled_given;
bool enabled;
+ bool copy_data;
char *conninfo;
char *slotname;
char originname[NAMEDATALEN];
@@ -226,9 +279,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* Parse and check options.
* Connection and publication should not be specified here.
*/
- parse_subscription_options(stmt->options, NULL, NULL,
- &enabled_given, &enabled,
- &create_slot, &slotname);
+ parse_subscription_options(stmt->options, &connect, &enabled_given,
+ &enabled, &create_slot, &slotname, &copy_data);
/*
* Since creating a replication slot is not transactional, rolling back
@@ -297,14 +349,17 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
replorigin_create(originname);
/*
- * If requested, create the replication slot on remote side for our
- * newly created subscription.
+ * Connect to remote side to execute requested commands and fetch table
+ * info.
*/
- if (create_slot)
+ if (connect)
{
XLogRecPtr lsn;
char *err;
WalReceiverConn *wrconn;
+ List *tables;
+ ListCell *lc;
+ char table_state;
/* Try to connect to the publisher. */
wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
@@ -315,13 +370,43 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
PG_TRY();
{
/*
- * Create permanent slot for the subscription. We won't use the
- * initial snapshot for anything, so no need to export it.
+ * If requested, create permanent slot for the subscription.
+ * We won't use the initial snapshot for anything, so no need
+ * to export it.
+ */
+ if (create_slot)
+ {
+ walrcv_create_slot(wrconn, slotname, false,
+ CRS_NOEXPORT_SNAPSHOT, &lsn);
+ ereport(NOTICE,
+ (errmsg("created replication slot \"%s\" on publisher",
+ slotname)));
+ }
+
+ /*
+ * Set sync state based on if we were asked to do data copy or
+ * not.
*/
- walrcv_create_slot(wrconn, slotname, false, false, &lsn);
+ table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+
+ /*
+ * Get the table list from publisher and build local table status
+ * info.
+ */
+ tables = fetch_table_list(wrconn, publications);
+ foreach (lc, tables)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, true);
+
+ SetSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
+ }
+
ereport(NOTICE,
- (errmsg("created replication slot \"%s\" on publisher",
- slotname)));
+ (errmsg("synchronized table states")));
}
PG_CATCH();
{
@@ -334,6 +419,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
/* And we are done with the remote side. */
walrcv_disconnect(wrconn);
}
+ else
+ ereport(WARNING,
+ (errmsg("tables were not subscribed, you will have to run "
+ "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
+ "subscribe the tables")));
heap_close(rel, RowExclusiveLock);
@@ -346,6 +436,108 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
return myself;
}
+static void
+AlterSubscription_refresh(Subscription *sub, bool copy_data)
+{
+ char *err;
+ List *pubrel_names;
+ List *subrel_states;
+ Oid *subrel_local_oids;
+ Oid *pubrel_local_oids;
+ ListCell *lc;
+ int off;
+
+ /* Load the library providing us libpq calls. */
+ load_file("libpqwalreceiver", false);
+
+ /* Try to connect to the publisher. */
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
+
+ /* Get the table list from publisher. */
+ pubrel_names = fetch_table_list(wrconn, sub->publications);
+
+ /* We are done with the remote side, close connection. */
+ walrcv_disconnect(wrconn);
+
+ /* Get local table list. */
+ subrel_states = GetSubscriptionRelations(sub->oid);
+
+ /*
+ * Build qsorted array of local table oids for faster lookup.
+ * This can potentially contain all tables in the database so
+ * speed of lookup is important.
+ */
+ subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+ off = 0;
+ foreach(lc, subrel_states)
+ {
+ SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+ subrel_local_oids[off++] = relstate->relid;
+ }
+ qsort(subrel_local_oids, list_length(subrel_states),
+ sizeof(Oid), oid_cmp);
+
+ /*
+ * Walk over the remote tables and try to match them to locally
+ * known tables. If the table is not known locally create a new state
+ * for it.
+ *
+ * Also builds array of local oids of remote tables for the next step.
+ */
+ off = 0;
+ pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+ foreach (lc, pubrel_names)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+ pubrel_local_oids[off++] = relid;
+
+ if (!bsearch(&relid, subrel_local_oids,
+ list_length(subrel_states), sizeof(Oid), oid_cmp))
+ {
+ SetSubscriptionRelState(sub->oid, relid,
+ copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+ InvalidXLogRecPtr);
+ ereport(NOTICE,
+ (errmsg("added subscription for table %s.%s",
+ quote_identifier(rv->schemaname),
+ quote_identifier(rv->relname))));
+ }
+ }
+
+ /*
+ * Next remove state for tables we should not care about anymore using
+ * the data we collected above
+ */
+ qsort(pubrel_local_oids, list_length(pubrel_names),
+ sizeof(Oid), oid_cmp);
+
+ for (off = 0; off < list_length(subrel_states); off++)
+ {
+ Oid relid = subrel_local_oids[off];
+
+ if (!bsearch(&relid, pubrel_local_oids,
+ list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ {
+ char *namespace;
+
+ RemoveSubscriptionRel(sub->oid, relid);
+
+ namespace = get_namespace_name(get_rel_namespace(relid));
+ ereport(NOTICE,
+ (errmsg("removed subscription for table %s.%s",
+ quote_identifier(namespace),
+ quote_identifier(get_rel_name(relid)))));
+ }
+ }
+}
+
/*
* Alter the existing subscription.
*/
@@ -359,11 +551,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
Datum values[Natts_pg_subscription];
HeapTuple tup;
Oid subid;
- bool enabled_given;
- bool enabled;
- char *conninfo;
- char *slot_name;
- List *publications;
+ bool update_tuple = false;
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
@@ -384,52 +572,113 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
subid = HeapTupleGetOid(tup);
- /* Parse options. */
- parse_subscription_options(stmt->options, &conninfo, &publications,
- &enabled_given, &enabled,
- NULL, &slot_name);
-
/* Form a new tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
memset(replaces, false, sizeof(replaces));
- if (enabled_given)
- {
- values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
- replaces[Anum_pg_subscription_subenabled - 1] = true;
- }
- if (conninfo)
- {
- values[Anum_pg_subscription_subconninfo - 1] =
- CStringGetTextDatum(conninfo);
- replaces[Anum_pg_subscription_subconninfo - 1] = true;
- }
- if (slot_name)
- {
- values[Anum_pg_subscription_subslotname - 1] =
- DirectFunctionCall1(namein, CStringGetDatum(slot_name));
- replaces[Anum_pg_subscription_subslotname - 1] = true;
- }
- if (publications != NIL)
+ switch (stmt->kind)
{
- values[Anum_pg_subscription_subpublications - 1] =
- publicationListToArray(publications);
- replaces[Anum_pg_subscription_subpublications - 1] = true;
+ case ALTER_SUBSCRIPTION_OPTIONS:
+ {
+ char *slot_name;
+
+ parse_subscription_options(stmt->options, NULL, NULL, NULL,
+ NULL, &slot_name, NULL);
+
+ values[Anum_pg_subscription_subslotname - 1] =
+ DirectFunctionCall1(namein, CStringGetDatum(slot_name));
+ replaces[Anum_pg_subscription_subslotname - 1] = true;
+
+ update_tuple = true;
+ break;
+ }
+
+ case ALTER_SUBSCRIPTION_ENABLED:
+ {
+ bool enabled,
+ enabled_given;
+
+ parse_subscription_options(stmt->options, NULL,
+ &enabled_given, &enabled, NULL,
+ NULL, NULL);
+ Assert(enabled_given);
+
+ values[Anum_pg_subscription_subenabled - 1] =
+ BoolGetDatum(enabled);
+ replaces[Anum_pg_subscription_subenabled - 1] = true;
+
+ update_tuple = true;
+ break;
+ }
+
+ case ALTER_SUBSCRIPTION_CONNECTION:
+ values[Anum_pg_subscription_subconninfo - 1] =
+ CStringGetTextDatum(stmt->conninfo);
+ replaces[Anum_pg_subscription_subconninfo - 1] = true;
+ update_tuple = true;
+ break;
+
+ case ALTER_SUBSCRIPTION_PUBLICATION:
+ case ALTER_SUBSCRIPTION_PUBLICATION_REFRESH:
+ {
+ bool copy_data;
+ Subscription *sub = GetSubscription(subid, false);
+
+ parse_subscription_options(stmt->options, NULL, NULL, NULL,
+ NULL, NULL, &copy_data);
+
+ values[Anum_pg_subscription_subpublications - 1] =
+ publicationListToArray(stmt->publication);
+ replaces[Anum_pg_subscription_subpublications - 1] = true;
+
+ update_tuple = true;
+
+ /* Refresh if user asked us to. */
+ if (stmt->kind == ALTER_SUBSCRIPTION_PUBLICATION_REFRESH)
+ {
+ /* Make sure refresh sees the new list of publications. */
+ sub->publications = stmt->publication;
+
+ AlterSubscription_refresh(sub, copy_data);
+ }
+
+ break;
+ }
+
+ case ALTER_SUBSCRIPTION_REFRESH:
+ {
+ bool copy_data;
+ Subscription *sub = GetSubscription(subid, false);
+
+ parse_subscription_options(stmt->options, NULL, NULL, NULL,
+ NULL, NULL, &copy_data);
+
+ AlterSubscription_refresh(sub, copy_data);
+
+ break;
+ }
+
+ default:
+ elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
+ stmt->kind);
}
- tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
- replaces);
+ /* Update the catalog if needed. */
+ if (update_tuple)
+ {
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
- /* Update the catalog. */
- CatalogTupleUpdate(rel, &tup->t_self, tup);
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
- ObjectAddressSet(myself, SubscriptionRelationId, subid);
+ heap_freetuple(tup);
+ }
- /* Cleanup. */
- heap_freetuple(tup);
heap_close(rel, RowExclusiveLock);
+ ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
return myself;
@@ -537,8 +786,11 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
+ /* Remove any associated relation synchronization states. */
+ RemoveSubscriptionRel(subid, InvalidOid);
+
/* Kill the apply worker so that the slot becomes accessible. */
- logicalrep_worker_stop(subid);
+ logicalrep_worker_stop(subid, InvalidOid);
/* Remove the origin tracking if exists. */
snprintf(originname, sizeof(originname), "pg_%u", subid);
@@ -571,15 +823,20 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
PG_TRY();
{
- if (!walrcv_command(wrconn, cmd.data, &err))
+ WalRcvExecResult *res;
+ res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+
+ if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("could not drop the replication slot \"%s\" on publisher",
slotname),
- errdetail("The error was: %s", err)));
+ errdetail("The error was: %s", res->err)));
else
ereport(NOTICE,
(errmsg("dropped replication slot \"%s\" on publisher",
slotname)));
+
+ walrcv_clear_result(res);
}
PG_CATCH();
{
@@ -691,3 +948,72 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
heap_close(rel, RowExclusiveLock);
}
+
+/*
+ * Get the list of tables which belong to specified publications on the
+ * publisher connection.
+ */
+static List *
+fetch_table_list(WalReceiverConn *wrconn, List *publications)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[2] = {TEXTOID, TEXTOID};
+ ListCell *lc;
+ bool first;
+ List *tablelist = NIL;
+
+ Assert(list_length(publications) > 0);
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
+ " FROM pg_catalog.pg_publication_tables t\n"
+ " WHERE t.pubname IN (");
+ first = true;
+ foreach (lc, publications)
+ {
+ char *pubname = strVal(lfirst(lc));
+
+ if (first)
+ first = false;
+ else
+ appendStringInfoString(&cmd, ", ");
+
+ appendStringInfo(&cmd, "%s", quote_literal_cstr(pubname));
+ }
+ appendStringInfoString(&cmd, ")");
+
+ res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not receive list of replicated tables from the publisher: %s",
+ res->err)));
+
+ /* Process tables. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *nspname;
+ char *relname;
+ bool isnull;
+ RangeVar *rv;
+
+ nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+ relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
+ tablelist = lappend(tablelist, rv);
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+
+ return tablelist;
+}