diff options
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/publicationcmds.c | 435 | ||||
-rw-r--r-- | src/backend/commands/sequence.c | 186 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 101 | ||||
-rw-r--r-- | src/backend/commands/tablecmds.c | 27 |
4 files changed, 88 insertions, 661 deletions
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 84e37df783b..4fd1e6e7abb 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -16,7 +16,6 @@ #include "access/genam.h" #include "access/htup_details.h" -#include "access/relation.h" #include "access/table.h" #include "access/xact.h" #include "catalog/catalog.h" @@ -68,17 +67,15 @@ typedef struct rf_context } rf_context; static List *OpenRelIdList(List *relids); -static List *OpenRelationList(List *rels, char objectType); -static void CloseRelationList(List *rels); +static List *OpenTableList(List *tables); +static void CloseTableList(List *rels); static void LockSchemaList(List *schemalist); -static void PublicationAddRelations(Oid pubid, List *rels, bool if_not_exists, +static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); -static void PublicationDropRelations(Oid pubid, List *rels, bool missing_ok); -static void PublicationAddSchemas(Oid pubid, List *schemas, char objectType, - bool if_not_exists, AlterPublicationStmt *stmt); -static void PublicationDropSchemas(Oid pubid, List *schemas, char objectType, - bool missing_ok); - +static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); +static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt); +static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); static void parse_publication_options(ParseState *pstate, @@ -98,7 +95,6 @@ parse_publication_options(ParseState *pstate, pubactions->pubupdate = true; pubactions->pubdelete = true; pubactions->pubtruncate = true; - pubactions->pubsequence = true; *publish_via_partition_root = false; /* Parse options */ @@ -123,7 +119,6 @@ parse_publication_options(ParseState *pstate, pubactions->pubupdate = false; pubactions->pubdelete = false; pubactions->pubtruncate = false; - pubactions->pubsequence = false; *publish_given = true; publish = defGetString(defel); @@ -146,8 +141,6 @@ parse_publication_options(ParseState *pstate, pubactions->pubdelete = true; else if (strcmp(publish_opt, "truncate") == 0) pubactions->pubtruncate = true; - else if (strcmp(publish_opt, "sequence") == 0) - pubactions->pubsequence = true; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -174,8 +167,7 @@ parse_publication_options(ParseState *pstate, */ static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, - List **tables, List **sequences, - List **tables_schemas, List **sequences_schemas) + List **rels, List **schemas) { ListCell *cell; PublicationObjSpec *pubobj; @@ -193,22 +185,13 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, switch (pubobj->pubobjtype) { case PUBLICATIONOBJ_TABLE: - *tables = lappend(*tables, pubobj->pubtable); - break; - case PUBLICATIONOBJ_SEQUENCE: - *sequences = lappend(*sequences, pubobj->pubtable); + *rels = lappend(*rels, pubobj->pubtable); break; case PUBLICATIONOBJ_TABLES_IN_SCHEMA: schemaid = get_namespace_oid(pubobj->name, false); /* Filter out duplicates if user specifies "sch1, sch1" */ - *tables_schemas = list_append_unique_oid(*tables_schemas, schemaid); - break; - case PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA: - schemaid = get_namespace_oid(pubobj->name, false); - - /* Filter out duplicates if user specifies "sch1, sch1" */ - *sequences_schemas = list_append_unique_oid(*sequences_schemas, schemaid); + *schemas = list_append_unique_oid(*schemas, schemaid); break; case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA: search_path = fetch_search_path(false); @@ -221,20 +204,7 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, list_free(search_path); /* Filter out duplicates if user specifies "sch1, sch1" */ - *tables_schemas = list_append_unique_oid(*tables_schemas, schemaid); - break; - case PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA: - search_path = fetch_search_path(false); - if (search_path == NIL) /* nothing valid in search_path? */ - ereport(ERROR, - errcode(ERRCODE_UNDEFINED_SCHEMA), - errmsg("no schema has been selected for CURRENT_SCHEMA")); - - schemaid = linitial_oid(search_path); - list_free(search_path); - - /* Filter out duplicates if user specifies "sch1, sch1" */ - *sequences_schemas = list_append_unique_oid(*sequences_schemas, schemaid); + *schemas = list_append_unique_oid(*schemas, schemaid); break; default: /* shouldn't happen */ @@ -270,14 +240,6 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", RelationGetRelationName(rel), get_namespace_name(relSchemaId))); - else if (checkobjtype == PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA) - ereport(ERROR, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot add schema \"%s\" to publication", - get_namespace_name(relSchemaId)), - errdetail("Sequence \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", - RelationGetRelationName(rel), - get_namespace_name(relSchemaId))); else if (checkobjtype == PUBLICATIONOBJ_TABLE) ereport(ERROR, errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -286,14 +248,6 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, RelationGetRelationName(rel)), errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.", get_namespace_name(relSchemaId))); - else if (checkobjtype == PUBLICATIONOBJ_SEQUENCE) - ereport(ERROR, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot add relation \"%s.%s\" to publication", - get_namespace_name(relSchemaId), - RelationGetRelationName(rel)), - errdetail("Sequence's schema \"%s\" is already part of the publication or part of the specified schema list.", - get_namespace_name(relSchemaId))); } } } @@ -799,7 +753,6 @@ CheckPubRelationColumnList(List *tables, const char *queryString, ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) { - ListCell *lc; Relation rel; ObjectAddress myself; Oid puboid; @@ -811,23 +764,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) bool publish_via_partition_root_given; bool publish_via_partition_root; AclResult aclresult; - List *tables = NIL; - List *sequences = NIL; - List *tables_schemaidlist = NIL; - List *sequences_schemaidlist = NIL; - - bool for_all_tables = false; - bool for_all_sequences = false; - - /* Translate the list of object types (represented by strings) to bool flags. */ - foreach (lc, stmt->for_all_objects) - { - char *val = strVal(lfirst(lc)); - if (strcmp(val, "tables") == 0) - for_all_tables = true; - else if (strcmp(val, "sequences") == 0) - for_all_sequences = true; - } + List *relations = NIL; + List *schemaidlist = NIL; /* must have CREATE privilege on database */ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); @@ -836,17 +774,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) get_database_name(MyDatabaseId)); /* FOR ALL TABLES requires superuser */ - if (for_all_tables && !superuser()) + if (stmt->for_all_tables && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to create FOR ALL TABLES publication"))); - /* FOR ALL SEQUENCES requires superuser */ - if (for_all_sequences && !superuser()) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("must be superuser to create FOR ALL SEQUENCES publication"))); - rel = table_open(PublicationRelationId, RowExclusiveLock); /* Check if name is used */ @@ -878,9 +810,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) Anum_pg_publication_oid); values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid); values[Anum_pg_publication_puballtables - 1] = - BoolGetDatum(for_all_tables); - values[Anum_pg_publication_puballsequences - 1] = - BoolGetDatum(for_all_sequences); + BoolGetDatum(stmt->for_all_tables); values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert); values[Anum_pg_publication_pubupdate - 1] = @@ -889,8 +819,6 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) BoolGetDatum(pubactions.pubdelete); values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); - values[Anum_pg_publication_pubsequence - 1] = - BoolGetDatum(pubactions.pubsequence); values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); @@ -908,42 +836,28 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) CommandCounterIncrement(); /* Associate objects with the publication. */ - if (for_all_tables || for_all_sequences) + if (stmt->for_all_tables) { /* Invalidate relcache so that publication info is rebuilt. */ CacheInvalidateRelcacheAll(); } - - /* - * If the publication might have either tables or sequences (directly or - * through a schema), process that. - */ - if (!for_all_tables || !for_all_sequences) + else { - ObjectsInPublicationToOids(stmt->pubobjects, pstate, - &tables, &sequences, - &tables_schemaidlist, - &sequences_schemaidlist); + ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + &schemaidlist); /* FOR ALL TABLES IN SCHEMA requires superuser */ - if (list_length(tables_schemaidlist) > 0 && !superuser()) + if (list_length(schemaidlist) > 0 && !superuser()) ereport(ERROR, errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication")); - /* FOR ALL SEQUENCES IN SCHEMA requires superuser */ - if (list_length(sequences_schemaidlist) > 0 && !superuser()) - ereport(ERROR, - errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("must be superuser to create FOR ALL SEQUENCES IN SCHEMA publication")); - - /* tables added directly */ - if (list_length(tables) > 0) + if (list_length(relations) > 0) { List *rels; - rels = OpenRelationList(tables, PUB_OBJTYPE_TABLE); - CheckObjSchemaNotAlreadyInPublication(rels, tables_schemaidlist, + rels = OpenTableList(relations); + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, PUBLICATIONOBJ_TABLE); TransformPubWhereClauses(rels, pstate->p_sourcetext, @@ -952,46 +866,18 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) CheckPubRelationColumnList(rels, pstate->p_sourcetext, publish_via_partition_root); - PublicationAddRelations(puboid, rels, true, NULL); - CloseRelationList(rels); + PublicationAddTables(puboid, rels, true, NULL); + CloseTableList(rels); } - /* sequences added directly */ - if (list_length(sequences) > 0) - { - List *rels; - - rels = OpenRelationList(sequences, PUB_OBJTYPE_SEQUENCE); - CheckObjSchemaNotAlreadyInPublication(rels, sequences_schemaidlist, - PUBLICATIONOBJ_SEQUENCE); - PublicationAddRelations(puboid, rels, true, NULL); - CloseRelationList(rels); - } - - /* tables added through a schema */ - if (list_length(tables_schemaidlist) > 0) + if (list_length(schemaidlist) > 0) { /* * Schema lock is held until the publication is created to prevent * concurrent schema deletion. */ - LockSchemaList(tables_schemaidlist); - PublicationAddSchemas(puboid, - tables_schemaidlist, PUB_OBJTYPE_TABLE, - true, NULL); - } - - /* sequences added through a schema */ - if (list_length(sequences_schemaidlist) > 0) - { - /* - * Schema lock is held until the publication is created to prevent - * concurrent schema deletion. - */ - LockSchemaList(sequences_schemaidlist); - PublicationAddSchemas(puboid, - sequences_schemaidlist, PUB_OBJTYPE_SEQUENCE, - true, NULL); + LockSchemaList(schemaidlist); + PublicationAddSchemas(puboid, schemaidlist, true, NULL); } } @@ -1055,7 +941,6 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, AccessShareLock); root_relids = GetPublicationRelations(pubform->oid, - PUB_OBJTYPE_TABLE, PUBLICATION_PART_ROOT); foreach(lc, root_relids) @@ -1135,9 +1020,6 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); replaces[Anum_pg_publication_pubtruncate - 1] = true; - - values[Anum_pg_publication_pubsequence - 1] = BoolGetDatum(pubactions.pubsequence); - replaces[Anum_pg_publication_pubsequence - 1] = true; } if (publish_via_partition_root_given) @@ -1157,7 +1039,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, pubform = (Form_pg_publication) GETSTRUCT(tup); /* Invalidate the relcache. */ - if (pubform->puballtables || pubform->puballsequences) + if (pubform->puballtables) { CacheInvalidateRelcacheAll(); } @@ -1173,7 +1055,6 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, */ if (root_relids == NIL) relids = GetPublicationRelations(pubform->oid, - PUB_OBJTYPE_TABLE, PUBLICATION_PART_ALL); else { @@ -1187,20 +1068,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, lfirst_oid(lc)); } - /* tables */ - schemarelids = GetAllSchemaPublicationRelations(pubform->oid, - PUB_OBJTYPE_TABLE, - PUBLICATION_PART_ALL); - relids = list_concat_unique_oid(relids, schemarelids); - - /* sequences */ - relids = list_concat_unique_oid(relids, - GetPublicationRelations(pubform->oid, - PUB_OBJTYPE_SEQUENCE, - PUBLICATION_PART_ALL)); - schemarelids = GetAllSchemaPublicationRelations(pubform->oid, - PUB_OBJTYPE_SEQUENCE, PUBLICATION_PART_ALL); relids = list_concat_unique_oid(relids, schemarelids); @@ -1255,7 +1123,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, if (!tables && stmt->action != AP_SetObjects) return; - rels = OpenRelationList(tables, PUB_OBJTYPE_TABLE); + rels = OpenTableList(tables); if (stmt->action == AP_AddObjects) { @@ -1265,9 +1133,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, * Check if the relation is member of the existing schema in the * publication or member of the schema list specified. */ - schemas = list_concat_copy(schemaidlist, - GetPublicationSchemas(pubid, - PUB_OBJTYPE_TABLE)); + schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid)); CheckObjSchemaNotAlreadyInPublication(rels, schemas, PUBLICATIONOBJ_TABLE); @@ -1275,14 +1141,13 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot); - PublicationAddRelations(pubid, rels, false, stmt); + PublicationAddTables(pubid, rels, false, stmt); } else if (stmt->action == AP_DropObjects) - PublicationDropRelations(pubid, rels, false); + PublicationDropTables(pubid, rels, false); else /* AP_SetObjects */ { List *oldrelids = GetPublicationRelations(pubid, - PUB_OBJTYPE_TABLE, PUBLICATION_PART_ROOT); List *delrels = NIL; ListCell *oldlc; @@ -1401,18 +1266,18 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, } /* And drop them. */ - PublicationDropRelations(pubid, delrels, true); + PublicationDropTables(pubid, delrels, true); /* * Don't bother calculating the difference for adding, we'll catch and * skip existing ones when doing catalog update. */ - PublicationAddRelations(pubid, rels, true, stmt); + PublicationAddTables(pubid, rels, true, stmt); - CloseRelationList(delrels); + CloseTableList(delrels); } - CloseRelationList(rels); + CloseTableList(rels); } /* @@ -1422,8 +1287,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, */ static void AlterPublicationSchemas(AlterPublicationStmt *stmt, - HeapTuple tup, List *schemaidlist, - char objectType) + HeapTuple tup, List *schemaidlist) { Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); @@ -1445,20 +1309,20 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, List *rels; List *reloids; - reloids = GetPublicationRelations(pubform->oid, objectType, PUBLICATION_PART_ROOT); + reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); rels = OpenRelIdList(reloids); CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, PUBLICATIONOBJ_TABLES_IN_SCHEMA); - CloseRelationList(rels); - PublicationAddSchemas(pubform->oid, schemaidlist, objectType, false, stmt); + CloseTableList(rels); + PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt); } else if (stmt->action == AP_DropObjects) - PublicationDropSchemas(pubform->oid, schemaidlist, objectType, false); + PublicationDropSchemas(pubform->oid, schemaidlist, false); else /* AP_SetObjects */ { - List *oldschemaids = GetPublicationSchemas(pubform->oid, objectType); + List *oldschemaids = GetPublicationSchemas(pubform->oid); List *delschemas = NIL; /* Identify which schemas should be dropped */ @@ -1471,13 +1335,13 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, LockSchemaList(delschemas); /* And drop them */ - PublicationDropSchemas(pubform->oid, delschemas, objectType, true); + PublicationDropSchemas(pubform->oid, delschemas, true); /* * Don't bother calculating the difference for adding, we'll catch and * skip existing ones when doing catalog update. */ - PublicationAddSchemas(pubform->oid, schemaidlist, objectType, true, stmt); + PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt); } } @@ -1487,13 +1351,12 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, */ static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, - List *tables, List *tables_schemaidlist, - List *sequences, List *sequences_schemaidlist) + List *tables, List *schemaidlist) { Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) && - (tables_schemaidlist || sequences_schemaidlist) && !superuser()) + schemaidlist && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to add or set schemas"))); @@ -1502,24 +1365,13 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, * Check that user is allowed to manipulate the publication tables in * schema */ - if (tables_schemaidlist && pubform->puballtables) + if (schemaidlist && pubform->puballtables) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("publication \"%s\" is defined as FOR ALL TABLES", NameStr(pubform->pubname)), errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications."))); - /* - * Check that user is allowed to manipulate the publication sequences in - * schema - */ - if (sequences_schemaidlist && pubform->puballsequences) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES", - NameStr(pubform->pubname)), - errdetail("Sequences from schema cannot be added to, dropped from, or set on FOR ALL SEQUENCES publications."))); - /* Check that user is allowed to manipulate the publication tables. */ if (tables && pubform->puballtables) ereport(ERROR, @@ -1527,108 +1379,6 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, errmsg("publication \"%s\" is defined as FOR ALL TABLES", NameStr(pubform->pubname)), errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); - - /* Check that user is allowed to manipulate the publication sequences. */ - if (sequences && pubform->puballsequences) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES", - NameStr(pubform->pubname)), - errdetail("Sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."))); -} - -/* - * Add or remove sequence to/from publication. - */ -static void -AlterPublicationSequences(AlterPublicationStmt *stmt, HeapTuple tup, - List *sequences, List *schemaidlist) -{ - List *rels = NIL; - Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); - Oid pubid = pubform->oid; - - /* - * It is quite possible that for the SET case user has not specified any - * tables in which case we need to remove all the existing tables. - */ - if (!sequences && stmt->action != AP_SetObjects) - return; - - rels = OpenRelationList(sequences, PUB_OBJTYPE_SEQUENCE); - - if (stmt->action == AP_AddObjects) - { - List *schemas = NIL; - - /* - * Check if the relation is member of the existing schema in the - * publication or member of the schema list specified. - */ - schemas = list_concat_copy(schemaidlist, - GetPublicationSchemas(pubid, - PUB_OBJTYPE_SEQUENCE)); - CheckObjSchemaNotAlreadyInPublication(rels, schemas, - PUBLICATIONOBJ_SEQUENCE); - PublicationAddRelations(pubid, rels, false, stmt); - } - else if (stmt->action == AP_DropObjects) - PublicationDropRelations(pubid, rels, false); - else /* DEFELEM_SET */ - { - List *oldrelids = GetPublicationRelations(pubid, - PUB_OBJTYPE_SEQUENCE, - PUBLICATION_PART_ROOT); - List *delrels = NIL; - ListCell *oldlc; - - CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, - PUBLICATIONOBJ_SEQUENCE); - - /* Calculate which relations to drop. */ - foreach(oldlc, oldrelids) - { - Oid oldrelid = lfirst_oid(oldlc); - ListCell *newlc; - PublicationRelInfo *oldrel; - bool found = false; - - foreach(newlc, rels) - { - PublicationRelInfo *newpubrel; - - newpubrel = (PublicationRelInfo *) lfirst(newlc); - if (RelationGetRelid(newpubrel->relation) == oldrelid) - { - found = true; - break; - } - } - /* Not yet in the list, open it and add to the list */ - if (!found) - { - oldrel = palloc(sizeof(PublicationRelInfo)); - oldrel->whereClause = NULL; - oldrel->columns = NULL; - oldrel->relation = table_open(oldrelid, - ShareUpdateExclusiveLock); - delrels = lappend(delrels, oldrel); - } - } - - /* And drop them. */ - PublicationDropRelations(pubid, delrels, true); - - /* - * Don't bother calculating the difference for adding, we'll catch and - * skip existing ones when doing catalog update. - */ - PublicationAddRelations(pubid, rels, true, stmt); - - CloseRelationList(delrels); - } - - CloseRelationList(rels); } /* @@ -1666,20 +1416,14 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) AlterPublicationOptions(pstate, stmt, rel, tup); else { - List *tables = NIL; - List *sequences = NIL; - List *tables_schemaidlist = NIL; - List *sequences_schemaidlist = NIL; + List *relations = NIL; + List *schemaidlist = NIL; Oid pubid = pubform->oid; - ObjectsInPublicationToOids(stmt->pubobjects, pstate, - &tables, &sequences, - &tables_schemaidlist, - &sequences_schemaidlist); + ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + &schemaidlist); - CheckAlterPublication(stmt, tup, - tables, tables_schemaidlist, - sequences, sequences_schemaidlist); + CheckAlterPublication(stmt, tup, relations, schemaidlist); heap_freetuple(tup); @@ -1707,16 +1451,9 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) errmsg("publication \"%s\" does not exist", stmt->pubname)); - AlterPublicationTables(stmt, tup, tables, tables_schemaidlist, + AlterPublicationTables(stmt, tup, relations, schemaidlist, pstate->p_sourcetext); - - AlterPublicationSequences(stmt, tup, sequences, sequences_schemaidlist); - - AlterPublicationSchemas(stmt, tup, tables_schemaidlist, - PUB_OBJTYPE_TABLE); - - AlterPublicationSchemas(stmt, tup, sequences_schemaidlist, - PUB_OBJTYPE_SEQUENCE); + AlterPublicationSchemas(stmt, tup, schemaidlist); } /* Cleanup. */ @@ -1784,7 +1521,7 @@ RemovePublicationById(Oid pubid) pubform = (Form_pg_publication) GETSTRUCT(tup); /* Invalidate relcache so that publication info is rebuilt. */ - if (pubform->puballtables || pubform->puballsequences) + if (pubform->puballtables) CacheInvalidateRelcacheAll(); CatalogTupleDelete(rel, &tup->t_self); @@ -1820,7 +1557,6 @@ RemovePublicationSchemaById(Oid psoid) * partitions. */ schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid, - pubsch->pntype, PUBLICATION_PART_ALL); InvalidatePublicationRels(schemaRels); @@ -1863,10 +1599,10 @@ OpenRelIdList(List *relids) * add them to a publication. */ static List * -OpenRelationList(List *rels, char objectType) +OpenTableList(List *tables) { List *relids = NIL; - List *result = NIL; + List *rels = NIL; ListCell *lc; List *relids_with_rf = NIL; List *relids_with_collist = NIL; @@ -1874,35 +1610,19 @@ OpenRelationList(List *rels, char objectType) /* * Open, share-lock, and check all the explicitly-specified relations */ - foreach(lc, rels) + foreach(lc, tables) { PublicationTable *t = lfirst_node(PublicationTable, lc); bool recurse = t->relation->inh; Relation rel; Oid myrelid; PublicationRelInfo *pub_rel; - char myrelkind; /* Allow query cancel in case this takes a long time */ CHECK_FOR_INTERRUPTS(); rel = table_openrv(t->relation, ShareUpdateExclusiveLock); myrelid = RelationGetRelid(rel); - myrelkind = get_rel_relkind(myrelid); - - /* - * Make sure the relkind matches the expected object type. This may - * happen e.g. when adding a sequence using ADD TABLE or a table - * using ADD SEQUENCE). - * - * XXX We let through unsupported object types (views etc.). Those - * will be caught later in check_publication_add_relation. - */ - if (pub_get_object_type_for_relkind(myrelkind) != PUB_OBJTYPE_UNSUPPORTED && - pub_get_object_type_for_relkind(myrelkind) != objectType) - ereport(ERROR, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("object type does not match type expected by command")); /* * Filter out duplicates if user specifies "foo, foo". @@ -1935,7 +1655,7 @@ OpenRelationList(List *rels, char objectType) pub_rel->relation = rel; pub_rel->whereClause = t->whereClause; pub_rel->columns = t->columns; - result = lappend(result, pub_rel); + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); if (t->whereClause) @@ -2004,9 +1724,10 @@ OpenRelationList(List *rels, char objectType) pub_rel->relation = rel; /* child inherits WHERE clause from parent */ pub_rel->whereClause = t->whereClause; + /* child inherits column list from parent */ pub_rel->columns = t->columns; - result = lappend(result, pub_rel); + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); if (t->whereClause) @@ -2021,14 +1742,14 @@ OpenRelationList(List *rels, char objectType) list_free(relids); list_free(relids_with_rf); - return result; + return rels; } /* * Close all relations in the list. */ static void -CloseRelationList(List *rels) +CloseTableList(List *rels) { ListCell *lc; @@ -2076,12 +1797,12 @@ LockSchemaList(List *schemalist) * Add listed tables to the publication. */ static void -PublicationAddRelations(Oid pubid, List *rels, bool if_not_exists, +PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt) { ListCell *lc; - Assert(!stmt || !stmt->for_all_objects); + Assert(!stmt || !stmt->for_all_tables); foreach(lc, rels) { @@ -2110,7 +1831,7 @@ PublicationAddRelations(Oid pubid, List *rels, bool if_not_exists, * Remove listed tables from the publication. */ static void -PublicationDropRelations(Oid pubid, List *rels, bool missing_ok) +PublicationDropTables(Oid pubid, List *rels, bool missing_ok) { ObjectAddress obj; ListCell *lc; @@ -2155,19 +1876,19 @@ PublicationDropRelations(Oid pubid, List *rels, bool missing_ok) * Add listed schemas to the publication. */ static void -PublicationAddSchemas(Oid pubid, List *schemas, char objectType, - bool if_not_exists, AlterPublicationStmt *stmt) +PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt) { ListCell *lc; - Assert(!stmt || !stmt->for_all_objects); + Assert(!stmt || !stmt->for_all_tables); foreach(lc, schemas) { Oid schemaid = lfirst_oid(lc); ObjectAddress obj; - obj = publication_add_schema(pubid, schemaid, objectType, if_not_exists); + obj = publication_add_schema(pubid, schemaid, if_not_exists); if (stmt) { EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, @@ -2183,7 +1904,7 @@ PublicationAddSchemas(Oid pubid, List *schemas, char objectType, * Remove listed schemas from the publication. */ static void -PublicationDropSchemas(Oid pubid, List *schemas, char objectType, bool missing_ok) +PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) { ObjectAddress obj; ListCell *lc; @@ -2193,11 +1914,10 @@ PublicationDropSchemas(Oid pubid, List *schemas, char objectType, bool missing_o { Oid schemaid = lfirst_oid(lc); - psid = GetSysCacheOid3(PUBLICATIONNAMESPACEMAP, + psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, Anum_pg_publication_namespace_oid, ObjectIdGetDatum(schemaid), - ObjectIdGetDatum(pubid), - CharGetDatum(objectType)); + ObjectIdGetDatum(pubid)); if (!OidIsValid(psid)) { if (missing_ok) @@ -2252,13 +1972,6 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) NameStr(form->pubname)), errhint("The owner of a FOR ALL TABLES publication must be a superuser."))); - if (form->puballsequences && !superuser_arg(newOwnerId)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied to change owner of publication \"%s\"", - NameStr(form->pubname)), - errhint("The owner of a FOR ALL SEQUENCES publication must be a superuser."))); - if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 47f62c28d42..ddf219b21f5 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -333,160 +333,6 @@ ResetSequence(Oid seq_relid) } /* - * Update the sequence state by modifying the existing sequence data row. - * - * This keeps the same relfilenode, so the behavior is non-transactional. - */ -static void -SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called) -{ - SeqTable elm; - Relation seqrel; - Buffer buf; - HeapTupleData seqdatatuple; - Form_pg_sequence_data seq; - - /* open and lock sequence */ - init_sequence(seqrelid, &elm, &seqrel); - - /* lock page' buffer and read tuple */ - seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); - - /* check the comment above nextval_internal()'s equivalent call. */ - if (RelationNeedsWAL(seqrel)) - { - GetTopTransactionId(); - - if (XLogLogicalInfoActive()) - GetCurrentTransactionId(); - } - - /* ready to change the on-disk (or really, in-buffer) tuple */ - START_CRIT_SECTION(); - - seq->last_value = last_value; - seq->is_called = is_called; - seq->log_cnt = log_cnt; - - MarkBufferDirty(buf); - - /* XLOG stuff */ - if (RelationNeedsWAL(seqrel)) - { - xl_seq_rec xlrec; - XLogRecPtr recptr; - Page page = BufferGetPage(buf); - - XLogBeginInsert(); - XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); - - xlrec.node = seqrel->rd_node; - xlrec.created = false; - - XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); - XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); - - recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG); - - PageSetLSN(page, recptr); - } - - END_CRIT_SECTION(); - - UnlockReleaseBuffer(buf); - - /* Clear local cache so that we don't think we have cached numbers */ - /* Note that we do not change the currval() state */ - elm->cached = elm->last; - - relation_close(seqrel, NoLock); -} - -/* - * Update the sequence state by creating a new relfilenode. - * - * This creates a new relfilenode, to allow transactional behavior. - */ -static void -SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called) -{ - SeqTable elm; - Relation seqrel; - Buffer buf; - HeapTupleData seqdatatuple; - Form_pg_sequence_data seq; - HeapTuple tuple; - - /* open and lock sequence */ - init_sequence(seq_relid, &elm, &seqrel); - - /* lock page' buffer and read tuple */ - seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); - - /* Copy the existing sequence tuple. */ - tuple = heap_copytuple(&seqdatatuple); - - /* Now we're done with the old page */ - UnlockReleaseBuffer(buf); - - /* - * Modify the copied tuple to update the sequence state (similar to what - * ResetSequence does). - */ - seq = (Form_pg_sequence_data) GETSTRUCT(tuple); - seq->last_value = last_value; - seq->is_called = is_called; - seq->log_cnt = log_cnt; - - /* - * Create a new storage file for the sequence - this is needed for the - * transactional behavior. - */ - RelationSetNewRelfilenode(seqrel, seqrel->rd_rel->relpersistence); - - /* - * Ensure sequence's relfrozenxid is at 0, since it won't contain any - * unfrozen XIDs. Same with relminmxid, since a sequence will never - * contain multixacts. - */ - Assert(seqrel->rd_rel->relfrozenxid == InvalidTransactionId); - Assert(seqrel->rd_rel->relminmxid == InvalidMultiXactId); - - /* - * Insert the modified tuple into the new storage file. This does all the - * necessary WAL-logging etc. - */ - fill_seq_with_data(seqrel, tuple); - - /* Clear local cache so that we don't think we have cached numbers */ - /* Note that we do not change the currval() state */ - elm->cached = elm->last; - - relation_close(seqrel, NoLock); -} - -/* - * Set a sequence to a specified internal state. - * - * The change is made transactionally, so that on failure of the current - * transaction, the sequence will be restored to its previous state. - * We do that by creating a whole new relfilenode for the sequence; so this - * works much like the rewriting forms of ALTER TABLE. - * - * Caller is assumed to have acquired AccessExclusiveLock on the sequence, - * which must not be released until end of transaction. Caller is also - * responsible for permissions checking. - */ -void -SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called) -{ - if (transactional) - SetSequence_transactional(seq_relid, last_value, log_cnt, is_called); - else - SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called); -} - -/* * Initialize a sequence's relation with the specified tuple as content * * This handles unlogged sequences by writing to both the main and the init @@ -552,13 +398,8 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum) /* check the comment above nextval_internal()'s equivalent call. */ if (RelationNeedsWAL(rel)) - { GetTopTransactionId(); - if (XLogLogicalInfoActive()) - GetCurrentTransactionId(); - } - START_CRIT_SECTION(); MarkBufferDirty(buf); @@ -578,7 +419,6 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum) XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); xlrec.node = rel->rd_node; - xlrec.created = true; XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) tuple->t_data, tuple->t_len); @@ -958,28 +798,10 @@ nextval_internal(Oid relid, bool check_permissions) * It's sufficient to ensure the toplevel transaction has an xid, no need * to assign xids subxacts, that'll already trigger an appropriate wait. * (Have to do that here, so we're outside the critical section) - * - * We have to ensure we have a proper XID, which will be included in - * the XLOG record by XLogRecordAssemble. Otherwise the first nextval() - * in a subxact (without any preceding changes) would get XID 0, and it - * would then be impossible to decide which top xact it belongs to. - * It'd also trigger assert in DecodeSequence. We only do that with - * wal_level=logical, though. - * - * XXX This might seem unnecessary, because if there's no XID the xact - * couldn't have done anything important yet, e.g. it could not have - * created a sequence. But that's incorrect, because of subxacts. The - * current subtransaction might not have done anything yet (thus no XID), - * but an earlier one might have created the sequence. */ if (logit && RelationNeedsWAL(seqrel)) - { GetTopTransactionId(); - if (XLogLogicalInfoActive()) - GetCurrentTransactionId(); - } - /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); @@ -1015,7 +837,6 @@ nextval_internal(Oid relid, bool check_permissions) seq->log_cnt = 0; xlrec.node = seqrel->rd_node; - xlrec.created = false; XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); @@ -1181,13 +1002,8 @@ do_setval(Oid relid, int64 next, bool iscalled) /* check the comment above nextval_internal()'s equivalent call. */ if (RelationNeedsWAL(seqrel)) - { GetTopTransactionId(); - if (XLogLogicalInfoActive()) - GetCurrentTransactionId(); - } - /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); @@ -1208,8 +1024,6 @@ do_setval(Oid relid, int64 next, bool iscalled) XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); xlrec.node = seqrel->rd_node; - xlrec.created = false; - XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 057ab4b6a3f..2e8d8afead8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -90,7 +90,6 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); -static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -639,9 +638,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, { char *err; WalReceiverConn *wrconn; - List *relations; + List *tables; ListCell *lc; - char sync_state; + char table_state; /* Try to connect to the publisher. */ wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); @@ -658,17 +657,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Set sync state based on if we were asked to do data copy or * not. */ - sync_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* - * Get the table and sequence list from publisher and build - * local relation sync status info. + * Get the table list from publisher and build local table status + * info. */ - relations = fetch_table_list(wrconn, publications); - relations = list_concat(relations, - fetch_sequence_list(wrconn, publications)); - - foreach(lc, relations) + tables = fetch_table_list(wrconn, publications); + foreach(lc, tables) { RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; @@ -679,7 +675,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CheckSubscriptionRelkind(get_rel_relkind(relid), rv->schemaname, rv->relname); - AddSubscriptionRelState(subid, relid, sync_state, + AddSubscriptionRelState(subid, relid, table_state, InvalidXLogRecPtr); } @@ -705,12 +701,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * * Note that if tables were specified but copy_data is false * then it is safe to enable two_phase up-front because those - * relations are already initially in READY state. When the - * subscription has no relations, we leave the twophase state - * as PENDING, to allow ALTER SUBSCRIPTION ... REFRESH + * tables are already initially in READY state. When the + * subscription has no tables, we leave the twophase state as + * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH * PUBLICATION to work. */ - if (opts.twophase && !opts.copy_data && relations != NIL) + if (opts.twophase && !opts.copy_data && tables != NIL) twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -786,10 +782,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, if (validate_publications) check_publications(wrconn, validate_publications); - /* Get the list of relations from publisher. */ + /* Get the table list from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications); - pubrel_names = list_concat(pubrel_names, - fetch_sequence_list(wrconn, sub->publications)); /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid); @@ -1814,75 +1808,6 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) } /* - * Get the list of sequences which belong to specified publications on the - * publisher connection. - */ -static List * -fetch_sequence_list(WalReceiverConn *wrconn, List *publications) -{ - WalRcvExecResult *res; - StringInfoData cmd; - TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; - ListCell *lc; - bool first; - List *tablelist = NIL; - - Assert(list_length(publications) > 0); - - initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n" - " FROM pg_catalog.pg_publication_sequences s\n" - " WHERE s.pubname IN ("); - first = true; - foreach(lc, publications) - { - char *pubname = strVal(lfirst(lc)); - - if (first) - first = false; - else - appendStringInfoString(&cmd, ", "); - - appendStringInfoString(&cmd, quote_literal_cstr(pubname)); - } - appendStringInfoChar(&cmd, ')'); - - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); - pfree(cmd.data); - - if (res->status != WALRCV_OK_TUPLES) - ereport(ERROR, - (errmsg("could not receive list of replicated sequences from the publisher: %s", - res->err))); - - /* Process sequences. */ - slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); - while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) - { - char *nspname; - char *relname; - bool isnull; - RangeVar *rv; - - nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); - Assert(!isnull); - relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); - Assert(!isnull); - - rv = makeRangeVar(nspname, relname, -1); - tablelist = lappend(tablelist, rv); - - ExecClearTuple(slot); - } - ExecDropSingleTupleTableSlot(slot); - - walrcv_clear_result(res); - - return tablelist; -} - -/* * This is to report the connection failure while dropping replication slots. * Here, we report the WARNING for all tablesync slots so that user can drop * them manually, if required. diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 4dd545cdd20..90edd0bb97d 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -42,7 +42,6 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" -#include "catalog/pg_publication_namespace.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_trigger.h" @@ -16409,14 +16408,11 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema) * Check that setting the relation to a different schema won't result in a * publication having both a schema and the same schema's table, as this * is not supported. - * - * XXX We do this for tables and sequences, but it's better to keep the two - * blocks separate, to make the strings easier to translate. */ if (stmt->objectType == OBJECT_TABLE) { ListCell *lc; - List *schemaPubids = GetSchemaPublications(nspOid, PUB_OBJTYPE_TABLE); + List *schemaPubids = GetSchemaPublications(nspOid); List *relPubids = GetRelationPublications(RelationGetRelid(rel)); foreach(lc, relPubids) @@ -16434,27 +16430,6 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema) get_publication_name(pubid, false))); } } - else if (stmt->objectType == OBJECT_SEQUENCE) - { - ListCell *lc; - List *schemaPubids = GetSchemaPublications(nspOid, PUB_OBJTYPE_SEQUENCE); - List *relPubids = GetRelationPublications(RelationGetRelid(rel)); - - foreach(lc, relPubids) - { - Oid pubid = lfirst_oid(lc); - - if (list_member_oid(schemaPubids, pubid)) - ereport(ERROR, - errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot move sequence \"%s\" to schema \"%s\"", - RelationGetRelationName(rel), stmt->newschema), - errdetail("The schema \"%s\" and same schema's sequence \"%s\" cannot be part of the same publication \"%s\".", - stmt->newschema, - RelationGetRelationName(rel), - get_publication_name(pubid, false))); - } - } /* common checks on switching namespaces */ CheckSetNamespace(oldNspOid, nspOid); |