diff options
Diffstat (limited to 'src')
35 files changed, 2977 insertions, 442 deletions
diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index d7ce063997a..3fd17ea64f0 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -1930,12 +1930,14 @@ get_object_address_publication_schema(List *object, bool missing_ok) char *pubname; char *schemaname; Oid schemaid; + char *objtype; ObjectAddressSet(address, PublicationNamespaceRelationId, InvalidOid); /* Fetch schema name and publication name from input list */ schemaname = strVal(linitial(object)); pubname = strVal(lsecond(object)); + objtype = strVal(lthird(object)); schemaid = get_namespace_oid(schemaname, missing_ok); if (!OidIsValid(schemaid)) @@ -1948,10 +1950,12 @@ get_object_address_publication_schema(List *object, bool missing_ok) /* Find the publication schema mapping in syscache */ address.objectId = - GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + GetSysCacheOid3(PUBLICATIONNAMESPACEMAP, Anum_pg_publication_namespace_oid, ObjectIdGetDatum(schemaid), - ObjectIdGetDatum(pub->oid)); + ObjectIdGetDatum(pub->oid), + CharGetDatum(objtype[0])); + if (!OidIsValid(address.objectId) && !missing_ok) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), @@ -2232,7 +2236,6 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_DOMCONSTRAINT: case OBJECT_CAST: case OBJECT_USER_MAPPING: - case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_DEFACL: case OBJECT_TRANSFORM: @@ -2257,6 +2260,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) /* fall through to check args length */ /* FALLTHROUGH */ case OBJECT_OPERATOR: + case OBJECT_PUBLICATION_NAMESPACE: if (list_length(args) != 2) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -2327,6 +2331,8 @@ pg_get_object_address(PG_FUNCTION_ARGS) objnode = (Node *) list_make2(name, linitial(args)); break; case OBJECT_PUBLICATION_NAMESPACE: + objnode = (Node *) list_make3(linitial(name), linitial(args), lsecond(args)); + break; case OBJECT_USER_MAPPING: objnode = (Node *) list_make2(linitial(name), linitial(args)); break; @@ -2881,11 +2887,12 @@ get_catalog_object_by_oid(Relation catalog, AttrNumber oidcol, Oid objectId) * * Get publication name and schema name from the object address into pubname and * nspname. Both pubname and nspname are palloc'd strings which will be freed by - * the caller. + * the caller. The last parameter specifies which object type is included from + * the schema. */ static bool getPublicationSchemaInfo(const ObjectAddress *object, bool missing_ok, - char **pubname, char **nspname) + char **pubname, char **nspname, char **objtype) { HeapTuple tup; Form_pg_publication_namespace pnform; @@ -2921,6 +2928,13 @@ getPublicationSchemaInfo(const ObjectAddress *object, bool missing_ok, return false; } + /* + * The type is always a single character, but we need to pass it as a string, + * so allocate two charaters and set the first one. The second one is \0. + */ + *objtype = palloc0(2); + *objtype[0] = pnform->pntype; + ReleaseSysCache(tup); return true; } @@ -3926,15 +3940,17 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok) { char *pubname; char *nspname; + char *objtype; if (!getPublicationSchemaInfo(object, missing_ok, - &pubname, &nspname)) + &pubname, &nspname, &objtype)) break; - appendStringInfo(&buffer, _("publication of schema %s in publication %s"), - nspname, pubname); + appendStringInfo(&buffer, _("publication of schema %s in publication %s type %s"), + nspname, pubname, objtype); pfree(pubname); pfree(nspname); + pfree(objtype); break; } @@ -5729,18 +5745,24 @@ getObjectIdentityParts(const ObjectAddress *object, { char *pubname; char *nspname; + char *objtype; if (!getPublicationSchemaInfo(object, missing_ok, &pubname, - &nspname)) + &nspname, &objtype)) break; - appendStringInfo(&buffer, "%s in publication %s", - nspname, pubname); + appendStringInfo(&buffer, "%s in publication %s type %s", + nspname, pubname, objtype); if (objargs) *objargs = list_make1(pubname); else pfree(pubname); + if (objargs) + *objargs = lappend(*objargs, objtype); + else + pfree(objtype); + if (objname) *objname = list_make1(nspname); else diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 789b895db89..5bcfc94e2ba 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -52,9 +52,10 @@ static void check_publication_add_relation(Relation targetrel) { - /* Must be a regular or partitioned table */ + /* Must be a regular or partitioned table, or a sequence */ if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && - RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) + RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE && + RelationGetForm(targetrel)->relkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot add relation \"%s\" to publication", @@ -131,7 +132,8 @@ static bool is_publishable_class(Oid relid, Form_pg_class reltuple) { return (reltuple->relkind == RELKIND_RELATION || - reltuple->relkind == RELKIND_PARTITIONED_TABLE) && + reltuple->relkind == RELKIND_PARTITIONED_TABLE || + reltuple->relkind == RELKIND_SEQUENCE) && !IsCatalogRelationOid(relid) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; @@ -177,6 +179,52 @@ filter_partitions(List *relids) } /* + * Check the character is a valid object type for schema publication. + * + * This recognizes either 't' for tables or 's' for sequences. Places that + * need to handle 'u' for unsupported relkinds need to do that explicitlyl + */ +static void +AssertObjectTypeValid(char objectType) +{ +#ifdef USE_ASSERT_CHECKING + Assert(objectType == PUB_OBJTYPE_SEQUENCE || objectType == PUB_OBJTYPE_TABLE); +#endif +} + +/* + * Determine object type given the object type set for a schema. + */ +char +pub_get_object_type_for_relkind(char relkind) +{ + /* sequence maps directly to sequence relkind */ + if (relkind == RELKIND_SEQUENCE) + return PUB_OBJTYPE_SEQUENCE; + + /* for table, we match either regular or partitioned table */ + if (relkind == RELKIND_RELATION || + relkind == RELKIND_PARTITIONED_TABLE) + return PUB_OBJTYPE_TABLE; + + return PUB_OBJTYPE_UNSUPPORTED; +} + +/* + * Determine if publication object type matches the relkind. + * + * Returns true if the relation matches object type replicated by this schema, + * false otherwise. + */ +static bool +pub_object_type_matches_relkind(char objectType, char relkind) +{ + AssertObjectTypeValid(objectType); + + return (pub_get_object_type_for_relkind(relkind) == objectType); +} + +/* * Another variant of this, taking a Relation. */ bool @@ -205,7 +253,7 @@ is_schema_publication(Oid pubid) ObjectIdGetDatum(pubid)); scan = systable_beginscan(pubschsrel, - PublicationNamespacePnnspidPnpubidIndexId, + PublicationNamespacePnnspidPnpubidPntypeIndexId, true, NULL, 1, &scankey); tup = systable_getnext(scan); result = HeapTupleIsValid(tup); @@ -313,7 +361,9 @@ GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level } else { - aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor)); + /* we only search for ancestors of tables, so PUB_OBJTYPE_TABLE */ + aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor), + PUB_OBJTYPE_TABLE); if (list_member_oid(aschemaPubids, puboid)) { topmost_relid = ancestor; @@ -436,7 +486,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, * Insert new publication / schema mapping. */ ObjectAddress -publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) +publication_add_schema(Oid pubid, Oid schemaid, char objectType, bool if_not_exists) { Relation rel; HeapTuple tup; @@ -448,6 +498,8 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) ObjectAddress myself, referenced; + AssertObjectTypeValid(objectType); + rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock); /* @@ -455,9 +507,10 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) * duplicates, it's here just to provide nicer error message in common * case. The real protection is the unique key on the catalog. */ - if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP, + if (SearchSysCacheExists3(PUBLICATIONNAMESPACEMAP, ObjectIdGetDatum(schemaid), - ObjectIdGetDatum(pubid))) + ObjectIdGetDatum(pubid), + CharGetDatum(objectType))) { table_close(rel, RowExclusiveLock); @@ -483,6 +536,8 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) ObjectIdGetDatum(pubid); values[Anum_pg_publication_namespace_pnnspid - 1] = ObjectIdGetDatum(schemaid); + values[Anum_pg_publication_namespace_pntype - 1] = + CharGetDatum(objectType); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -508,7 +563,7 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) * publication_add_relation for why we need to consider all the * partitions. */ - schemaRels = GetSchemaPublicationRelations(schemaid, + schemaRels = GetSchemaPublicationRelations(schemaid, objectType, PUBLICATION_PART_ALL); InvalidatePublicationRels(schemaRels); @@ -542,11 +597,14 @@ GetRelationPublications(Oid relid) /* * Gets list of relation oids for a publication. * - * This should only be used FOR TABLE publications, the FOR ALL TABLES - * should use GetAllTablesPublicationRelations(). + * This should only be used FOR TABLE / FOR SEQUENCE publications, the FOR + * ALL TABLES / SEQUENCES should use GetAllTablesPublicationRelations() + * and GetAllSequencesPublicationRelations(). + * + * XXX pub_partopt only matters for tables, not sequences. */ List * -GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) +GetPublicationRelations(Oid pubid, char objectType, PublicationPartOpt pub_partopt) { List *result; Relation pubrelsrel; @@ -554,6 +612,8 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) SysScanDesc scan; HeapTuple tup; + AssertObjectTypeValid(objectType); + /* Find all publications associated with the relation. */ pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock); @@ -568,11 +628,29 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) result = NIL; while (HeapTupleIsValid(tup = systable_getnext(scan))) { + char relkind; Form_pg_publication_rel pubrel; pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); - result = GetPubPartitionOptionRelations(result, pub_partopt, - pubrel->prrelid); + relkind = get_rel_relkind(pubrel->prrelid); + + /* + * If the relkind does not match the requested object type, ignore the + * relation. For example we might be interested only in sequences, so + * we ignore tables. + */ + if (!pub_object_type_matches_relkind(objectType, relkind)) + continue; + + /* + * We don't have partitioned sequences, so just add them to the list. + * Otherwise consider adding all child relations, if requested. + */ + if (relkind == RELKIND_SEQUENCE) + result = lappend_oid(result, pubrel->prrelid); + else + result = GetPubPartitionOptionRelations(result, pub_partopt, + pubrel->prrelid); } systable_endscan(scan); @@ -623,6 +701,43 @@ GetAllTablesPublications(void) } /* + * Gets list of publication oids for publications marked as FOR ALL SEQUENCES. + */ +List * +GetAllSequencesPublications(void) +{ + List *result; + Relation rel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all publications that are marked as for all sequences. */ + rel = table_open(PublicationRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_puballsequences, + BTEqualStrategyNumber, F_BOOLEQ, + BoolGetDatum(true)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, 1, &scankey); + + result = NIL; + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid; + + result = lappend_oid(result, oid); + } + + systable_endscan(scan); + table_close(rel, AccessShareLock); + + return result; +} + +/* * Gets list of all relation published by FOR ALL TABLES publication(s). * * If the publication publishes partition changes via their respective root @@ -688,28 +803,38 @@ GetAllTablesPublicationRelations(bool pubviaroot) /* * Gets the list of schema oids for a publication. * - * This should only be used FOR ALL TABLES IN SCHEMA publications. + * This should only be used FOR ALL TABLES IN SCHEMA and FOR ALL SEQUENCES + * publications. + * + * 'objectType' determines whether to get FOR TABLE or FOR SEQUENCES schemas */ List * -GetPublicationSchemas(Oid pubid) +GetPublicationSchemas(Oid pubid, char objectType) { List *result = NIL; Relation pubschsrel; - ScanKeyData scankey; + ScanKeyData scankey[2]; SysScanDesc scan; HeapTuple tup; + AssertObjectTypeValid(objectType); + /* Find all schemas associated with the publication */ pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock); - ScanKeyInit(&scankey, + ScanKeyInit(&scankey[0], Anum_pg_publication_namespace_pnpubid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(pubid)); + ScanKeyInit(&scankey[1], + Anum_pg_publication_namespace_pntype, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(objectType)); + scan = systable_beginscan(pubschsrel, - PublicationNamespacePnnspidPnpubidIndexId, - true, NULL, 1, &scankey); + PublicationNamespacePnnspidPnpubidPntypeIndexId, + true, NULL, 2, scankey); while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_publication_namespace pubsch; @@ -727,14 +852,26 @@ GetPublicationSchemas(Oid pubid) /* * Gets the list of publication oids associated with a specified schema. + * + * objectType specifies whether we're looking for schemas including tables or + * sequences. + * + * Note: relcache calls this for all object types, not just tables and sequences. + * Which is why we handle the PUB_OBJTYPE_UNSUPPORTED object type too. */ List * -GetSchemaPublications(Oid schemaid) +GetSchemaPublications(Oid schemaid, char objectType) { List *result = NIL; CatCList *pubschlist; int i; + /* unsupported object type */ + if (objectType == PUB_OBJTYPE_UNSUPPORTED) + return result; + + AssertObjectTypeValid(objectType); + /* Find all publications associated with the schema */ pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP, ObjectIdGetDatum(schemaid)); @@ -742,6 +879,11 @@ GetSchemaPublications(Oid schemaid) { HeapTuple tup = &pubschlist->members[i]->tuple; Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid; + char pntype = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pntype; + + /* Skip schemas publishing a different object type. */ + if (pntype != objectType) + continue; result = lappend_oid(result, pubid); } @@ -753,9 +895,13 @@ GetSchemaPublications(Oid schemaid) /* * Get the list of publishable relation oids for a specified schema. + * + * objectType specifies whether this is FOR ALL TABLES IN SCHEMA or FOR ALL + * SEQUENCES IN SCHEMA */ List * -GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) +GetSchemaPublicationRelations(Oid schemaid, char objectType, + PublicationPartOpt pub_partopt) { Relation classRel; ScanKeyData key[1]; @@ -764,6 +910,7 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) List *result = NIL; Assert(OidIsValid(schemaid)); + AssertObjectTypeValid(objectType); classRel = table_open(RelationRelationId, AccessShareLock); @@ -784,9 +931,16 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) continue; relkind = get_rel_relkind(relid); - if (relkind == RELKIND_RELATION) - result = lappend_oid(result, relid); - else if (relkind == RELKIND_PARTITIONED_TABLE) + + /* Skip if the relkind does not match FOR ALL TABLES / SEQUENCES. */ + if (!pub_object_type_matches_relkind(objectType, relkind)) + continue; + + /* + * If the object is a partitioned table, lookup all the child relations + * (if requested). Otherwise just add the object to the list. + */ + if (relkind == RELKIND_PARTITIONED_TABLE) { List *partitionrels = NIL; @@ -799,7 +953,11 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) pub_partopt, relForm->oid); result = list_concat_unique_oid(result, partitionrels); + continue; } + + /* non-partitioned tables and sequences */ + result = lappend_oid(result, relid); } table_endscan(scan); @@ -809,21 +967,25 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) /* * Gets the list of all relations published by FOR ALL TABLES IN SCHEMA - * publication. + * or FOR ALL SEQUENCES IN SCHEMA publication. */ List * -GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) +GetAllSchemaPublicationRelations(Oid pubid, char objectType, + PublicationPartOpt pub_partopt) { List *result = NIL; - List *pubschemalist = GetPublicationSchemas(pubid); + List *pubschemalist = GetPublicationSchemas(pubid, objectType); ListCell *cell; + AssertObjectTypeValid(objectType); + foreach(cell, pubschemalist) { Oid schemaid = lfirst_oid(cell); List *schemaRels = NIL; - schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt); + schemaRels = GetSchemaPublicationRelations(schemaid, objectType, + pub_partopt); result = list_concat(result, schemaRels); } @@ -831,6 +993,42 @@ GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) } /* + * Gets list of all relation published by FOR ALL SEQUENCES publication(s). + */ +List * +GetAllSequencesPublicationRelations(void) +{ + Relation classRel; + ScanKeyData key[1]; + TableScanDesc scan; + HeapTuple tuple; + List *result = NIL; + + classRel = table_open(RelationRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_class_relkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(RELKIND_SEQUENCE)); + + scan = table_beginscan_catalog(classRel, 1, key); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + Oid relid = relForm->oid; + + if (is_publishable_class(relid, relForm)) + result = lappend_oid(result, relid); + } + + table_endscan(scan); + + table_close(classRel, AccessShareLock); + return result; +} + +/* * Get publication using oid * * The Publication struct and its data are palloc'ed here. @@ -852,10 +1050,12 @@ GetPublication(Oid pubid) pub->oid = pubid; pub->name = pstrdup(NameStr(pubform->pubname)); pub->alltables = pubform->puballtables; + pub->allsequences = pubform->puballsequences; pub->pubactions.pubinsert = pubform->pubinsert; pub->pubactions.pubupdate = pubform->pubupdate; pub->pubactions.pubdelete = pubform->pubdelete; pub->pubactions.pubtruncate = pubform->pubtruncate; + pub->pubactions.pubsequence = pubform->pubsequence; pub->pubviaroot = pubform->pubviaroot; ReleaseSysCache(tup); @@ -966,10 +1166,12 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) *schemarelids; relids = GetPublicationRelations(publication->oid, + PUB_OBJTYPE_TABLE, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); schemarelids = GetAllSchemaPublicationRelations(publication->oid, + PUB_OBJTYPE_TABLE, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); @@ -1005,3 +1207,71 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } + +/* + * Returns Oids of sequences in a publication. + */ +Datum +pg_get_publication_sequences(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + Publication *publication; + List *sequences; + + /* stuff done only on the first call of the function */ + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + publication = GetPublicationByName(pubname, false); + + /* + * Publications support partitioned tables, although all changes are + * replicated using leaf partition identity and schema, so we only + * need those. + */ + if (publication->allsequences) + sequences = GetAllSequencesPublicationRelations(); + else + { + List *relids, + *schemarelids; + + relids = GetPublicationRelations(publication->oid, + PUB_OBJTYPE_SEQUENCE, + publication->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + schemarelids = GetAllSchemaPublicationRelations(publication->oid, + PUB_OBJTYPE_SEQUENCE, + publication->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + sequences = list_concat_unique_oid(relids, schemarelids); + } + + funcctx->user_fctx = (void *) sequences; + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + sequences = (List *) funcctx->user_fctx; + + if (funcctx->call_cntr < list_length(sequences)) + { + Oid relid = list_nth_oid(sequences, funcctx->call_cntr); + + SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid)); + } + + SRF_RETURN_DONE(funcctx); +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index bd48ee7bd25..9ac8e9a2998 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -374,6 +374,16 @@ CREATE VIEW pg_publication_tables AS pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) WHERE C.oid = GPT.relid; +CREATE VIEW pg_publication_sequences AS + SELECT + P.pubname AS pubname, + N.nspname AS schemaname, + C.relname AS sequencename + FROM pg_publication P, + LATERAL pg_get_publication_sequences(P.pubname) GPS, + pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) + WHERE C.oid = GPS.relid; + CREATE VIEW pg_locks AS SELECT * FROM pg_lock_status() AS L; diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 1aad2e769cb..f890d3f0baa 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -16,6 +16,7 @@ #include "access/genam.h" #include "access/htup_details.h" +#include "access/relation.h" #include "access/table.h" #include "access/xact.h" #include "catalog/catalog.h" @@ -67,15 +68,17 @@ typedef struct rf_context } rf_context; static List *OpenRelIdList(List *relids); -static List *OpenTableList(List *tables); -static void CloseTableList(List *rels); +static List *OpenRelationList(List *rels, char objectType); +static void CloseRelationList(List *rels); static void LockSchemaList(List *schemalist); -static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, +static void PublicationAddRelations(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); -static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); -static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, - AlterPublicationStmt *stmt); -static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); +static void PublicationDropRelations(Oid pubid, List *rels, bool missing_ok); +static void PublicationAddSchemas(Oid pubid, List *schemas, char objectType, + bool if_not_exists, AlterPublicationStmt *stmt); +static void PublicationDropSchemas(Oid pubid, List *schemas, char objectType, + bool missing_ok); + static void parse_publication_options(ParseState *pstate, @@ -95,6 +98,7 @@ parse_publication_options(ParseState *pstate, pubactions->pubupdate = true; pubactions->pubdelete = true; pubactions->pubtruncate = true; + pubactions->pubsequence = true; *publish_via_partition_root = false; /* Parse options */ @@ -119,6 +123,7 @@ parse_publication_options(ParseState *pstate, pubactions->pubupdate = false; pubactions->pubdelete = false; pubactions->pubtruncate = false; + pubactions->pubsequence = false; *publish_given = true; publish = defGetString(defel); @@ -141,6 +146,8 @@ parse_publication_options(ParseState *pstate, pubactions->pubdelete = true; else if (strcmp(publish_opt, "truncate") == 0) pubactions->pubtruncate = true; + else if (strcmp(publish_opt, "sequence") == 0) + pubactions->pubsequence = true; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -167,7 +174,9 @@ parse_publication_options(ParseState *pstate, */ static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, - List **rels, List **schemas) + List **tables, List **sequences, + List **tables_schemas, List **sequences_schemas, + List **schemas) { ListCell *cell; PublicationObjSpec *pubobj; @@ -185,12 +194,23 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, switch (pubobj->pubobjtype) { case PUBLICATIONOBJ_TABLE: - *rels = lappend(*rels, pubobj->pubtable); + *tables = lappend(*tables, pubobj->pubtable); + break; + case PUBLICATIONOBJ_SEQUENCE: + *sequences = lappend(*sequences, pubobj->pubtable); break; case PUBLICATIONOBJ_TABLES_IN_SCHEMA: schemaid = get_namespace_oid(pubobj->name, false); /* Filter out duplicates if user specifies "sch1, sch1" */ + *tables_schemas = list_append_unique_oid(*tables_schemas, schemaid); + *schemas = list_append_unique_oid(*schemas, schemaid); + break; + case PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA: + schemaid = get_namespace_oid(pubobj->name, false); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *sequences_schemas = list_append_unique_oid(*sequences_schemas, schemaid); *schemas = list_append_unique_oid(*schemas, schemaid); break; case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA: @@ -204,6 +224,21 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, list_free(search_path); /* Filter out duplicates if user specifies "sch1, sch1" */ + *tables_schemas = list_append_unique_oid(*tables_schemas, schemaid); + *schemas = list_append_unique_oid(*schemas, schemaid); + break; + case PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA: + search_path = fetch_search_path(false); + if (search_path == NIL) /* nothing valid in search_path? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected for CURRENT_SCHEMA")); + + schemaid = linitial_oid(search_path); + list_free(search_path); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *sequences_schemas = list_append_unique_oid(*sequences_schemas, schemaid); *schemas = list_append_unique_oid(*schemas, schemaid); break; default: @@ -240,6 +275,14 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", RelationGetRelationName(rel), get_namespace_name(relSchemaId))); + else if (checkobjtype == PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(relSchemaId)), + errdetail("Sequence \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", + RelationGetRelationName(rel), + get_namespace_name(relSchemaId))); else if (checkobjtype == PUBLICATIONOBJ_TABLE) ereport(ERROR, errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -248,6 +291,14 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, RelationGetRelationName(rel)), errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.", get_namespace_name(relSchemaId))); + else if (checkobjtype == PUBLICATIONOBJ_SEQUENCE) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s.%s\" to publication", + get_namespace_name(relSchemaId), + RelationGetRelationName(rel)), + errdetail("Sequence's schema \"%s\" is already part of the publication or part of the specified schema list.", + get_namespace_name(relSchemaId))); } } } @@ -615,6 +666,7 @@ TransformPubWhereClauses(List *tables, const char *queryString, ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) { + ListCell *lc; Relation rel; ObjectAddress myself; Oid puboid; @@ -626,9 +678,25 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) bool publish_via_partition_root_given; bool publish_via_partition_root; AclResult aclresult; - List *relations = NIL; + List *tables = NIL; + List *sequences = NIL; + List *tables_schemaidlist = NIL; + List *sequences_schemaidlist = NIL; List *schemaidlist = NIL; + bool for_all_tables = false; + bool for_all_sequences = false; + + /* Translate the list of object types (represented by strings) to bool flags. */ + foreach (lc, stmt->for_all_objects) + { + char *val = strVal(lfirst(lc)); + if (strcmp(val, "tables") == 0) + for_all_tables = true; + else if (strcmp(val, "sequences") == 0) + for_all_sequences = true; + } + /* must have CREATE privilege on database */ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); if (aclresult != ACLCHECK_OK) @@ -636,7 +704,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) get_database_name(MyDatabaseId)); /* FOR ALL TABLES requires superuser */ - if (stmt->for_all_tables && !superuser()) + if (for_all_tables && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to create FOR ALL TABLES publication"))); @@ -672,7 +740,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) Anum_pg_publication_oid); values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid); values[Anum_pg_publication_puballtables - 1] = - BoolGetDatum(stmt->for_all_tables); + BoolGetDatum(for_all_tables); + values[Anum_pg_publication_puballsequences - 1] = + BoolGetDatum(for_all_sequences); values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert); values[Anum_pg_publication_pubupdate - 1] = @@ -681,6 +751,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) BoolGetDatum(pubactions.pubdelete); values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); + values[Anum_pg_publication_pubsequence - 1] = + BoolGetDatum(pubactions.pubsequence); values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); @@ -698,45 +770,88 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) CommandCounterIncrement(); /* Associate objects with the publication. */ - if (stmt->for_all_tables) + if (for_all_tables || for_all_sequences) { /* Invalidate relcache so that publication info is rebuilt. */ CacheInvalidateRelcacheAll(); } - else + + /* + * If the publication might have either tables or sequences (directly or + * through a schema), process that. + */ + if (!for_all_tables || !for_all_sequences) { - ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + ObjectsInPublicationToOids(stmt->pubobjects, pstate, + &tables, &sequences, + &tables_schemaidlist, + &sequences_schemaidlist, &schemaidlist); /* FOR ALL TABLES IN SCHEMA requires superuser */ - if (list_length(schemaidlist) > 0 && !superuser()) + if (list_length(tables_schemaidlist) > 0 && !superuser()) ereport(ERROR, errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication")); - if (list_length(relations) > 0) + /* FOR ALL SEQUENCES IN SCHEMA requires superuser */ + if (list_length(sequences_schemaidlist) > 0 && !superuser()) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to create FOR ALL SEQUENCES IN SCHEMA publication")); + + /* tables added directly */ + if (list_length(tables) > 0) { List *rels; - rels = OpenTableList(relations); - CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + rels = OpenRelationList(tables, PUB_OBJTYPE_TABLE); + CheckObjSchemaNotAlreadyInPublication(rels, tables_schemaidlist, PUBLICATIONOBJ_TABLE); TransformPubWhereClauses(rels, pstate->p_sourcetext, publish_via_partition_root); - PublicationAddTables(puboid, rels, true, NULL); - CloseTableList(rels); + PublicationAddRelations(puboid, rels, true, NULL); + CloseRelationList(rels); + } + + /* sequences added directly */ + if (list_length(sequences) > 0) + { + List *rels; + + rels = OpenRelationList(sequences, PUB_OBJTYPE_SEQUENCE); + CheckObjSchemaNotAlreadyInPublication(rels, sequences_schemaidlist, + PUBLICATIONOBJ_SEQUENCE); + PublicationAddRelations(puboid, rels, true, NULL); + CloseRelationList(rels); + } + + /* tables added through a schema */ + if (list_length(tables_schemaidlist) > 0) + { + /* + * Schema lock is held until the publication is created to prevent + * concurrent schema deletion. + */ + LockSchemaList(tables_schemaidlist); + PublicationAddSchemas(puboid, + tables_schemaidlist, PUB_OBJTYPE_TABLE, + true, NULL); } - if (list_length(schemaidlist) > 0) + /* sequences added through a schema */ + if (list_length(sequences_schemaidlist) > 0) { /* * Schema lock is held until the publication is created to prevent * concurrent schema deletion. */ - LockSchemaList(schemaidlist); - PublicationAddSchemas(puboid, schemaidlist, true, NULL); + LockSchemaList(sequences_schemaidlist); + PublicationAddSchemas(puboid, + sequences_schemaidlist, PUB_OBJTYPE_SEQUENCE, + true, NULL); } } @@ -799,6 +914,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, AccessShareLock); root_relids = GetPublicationRelations(pubform->oid, + PUB_OBJTYPE_TABLE, PUBLICATION_PART_ROOT); foreach(lc, root_relids) @@ -857,6 +973,9 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); replaces[Anum_pg_publication_pubtruncate - 1] = true; + + values[Anum_pg_publication_pubsequence - 1] = BoolGetDatum(pubactions.pubsequence); + replaces[Anum_pg_publication_pubsequence - 1] = true; } if (publish_via_partition_root_given) @@ -876,7 +995,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, pubform = (Form_pg_publication) GETSTRUCT(tup); /* Invalidate the relcache. */ - if (pubform->puballtables) + if (pubform->puballtables || pubform->puballsequences) { CacheInvalidateRelcacheAll(); } @@ -892,6 +1011,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, */ if (root_relids == NIL) relids = GetPublicationRelations(pubform->oid, + PUB_OBJTYPE_TABLE, PUBLICATION_PART_ALL); else { @@ -905,7 +1025,20 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, lfirst_oid(lc)); } + /* tables */ + schemarelids = GetAllSchemaPublicationRelations(pubform->oid, + PUB_OBJTYPE_TABLE, + PUBLICATION_PART_ALL); + relids = list_concat_unique_oid(relids, schemarelids); + + /* sequences */ + relids = list_concat_unique_oid(relids, + GetPublicationRelations(pubform->oid, + PUB_OBJTYPE_SEQUENCE, + PUBLICATION_PART_ALL)); + schemarelids = GetAllSchemaPublicationRelations(pubform->oid, + PUB_OBJTYPE_SEQUENCE, PUBLICATION_PART_ALL); relids = list_concat_unique_oid(relids, schemarelids); @@ -960,7 +1093,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, if (!tables && stmt->action != AP_SetObjects) return; - rels = OpenTableList(tables); + rels = OpenRelationList(tables, PUB_OBJTYPE_TABLE); if (stmt->action == AP_AddObjects) { @@ -970,19 +1103,22 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, * Check if the relation is member of the existing schema in the * publication or member of the schema list specified. */ - schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid)); + schemas = list_concat_copy(schemaidlist, + GetPublicationSchemas(pubid, + PUB_OBJTYPE_TABLE)); CheckObjSchemaNotAlreadyInPublication(rels, schemas, PUBLICATIONOBJ_TABLE); TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); - PublicationAddTables(pubid, rels, false, stmt); + PublicationAddRelations(pubid, rels, false, stmt); } else if (stmt->action == AP_DropObjects) - PublicationDropTables(pubid, rels, false); + PublicationDropRelations(pubid, rels, false); else /* AP_SetObjects */ { List *oldrelids = GetPublicationRelations(pubid, + PUB_OBJTYPE_TABLE, PUBLICATION_PART_ROOT); List *delrels = NIL; ListCell *oldlc; @@ -1064,18 +1200,18 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, } /* And drop them. */ - PublicationDropTables(pubid, delrels, true); + PublicationDropRelations(pubid, delrels, true); /* * Don't bother calculating the difference for adding, we'll catch and * skip existing ones when doing catalog update. */ - PublicationAddTables(pubid, rels, true, stmt); + PublicationAddRelations(pubid, rels, true, stmt); - CloseTableList(delrels); + CloseRelationList(delrels); } - CloseTableList(rels); + CloseRelationList(rels); } /* @@ -1085,7 +1221,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, */ static void AlterPublicationSchemas(AlterPublicationStmt *stmt, - HeapTuple tup, List *schemaidlist) + HeapTuple tup, List *schemaidlist, + char objectType) { Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); @@ -1107,20 +1244,20 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, List *rels; List *reloids; - reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); + reloids = GetPublicationRelations(pubform->oid, objectType, PUBLICATION_PART_ROOT); rels = OpenRelIdList(reloids); CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, PUBLICATIONOBJ_TABLES_IN_SCHEMA); - CloseTableList(rels); - PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt); + CloseRelationList(rels); + PublicationAddSchemas(pubform->oid, schemaidlist, objectType, false, stmt); } else if (stmt->action == AP_DropObjects) - PublicationDropSchemas(pubform->oid, schemaidlist, false); + PublicationDropSchemas(pubform->oid, schemaidlist, objectType, false); else /* AP_SetObjects */ { - List *oldschemaids = GetPublicationSchemas(pubform->oid); + List *oldschemaids = GetPublicationSchemas(pubform->oid, objectType); List *delschemas = NIL; /* Identify which schemas should be dropped */ @@ -1133,13 +1270,13 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, LockSchemaList(delschemas); /* And drop them */ - PublicationDropSchemas(pubform->oid, delschemas, true); + PublicationDropSchemas(pubform->oid, delschemas, objectType, true); /* * Don't bother calculating the difference for adding, we'll catch and * skip existing ones when doing catalog update. */ - PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt); + PublicationAddSchemas(pubform->oid, schemaidlist, objectType, true, stmt); } } @@ -1149,12 +1286,13 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, */ static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, - List *tables, List *schemaidlist) + List *tables, List *tables_schemaidlist, + List *sequences, List *sequences_schemaidlist) { Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) && - schemaidlist && !superuser()) + (tables_schemaidlist || sequences_schemaidlist) && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to add or set schemas"))); @@ -1163,13 +1301,24 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, * Check that user is allowed to manipulate the publication tables in * schema */ - if (schemaidlist && pubform->puballtables) + if (tables_schemaidlist && pubform->puballtables) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("publication \"%s\" is defined as FOR ALL TABLES", NameStr(pubform->pubname)), errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications."))); + /* + * Check that user is allowed to manipulate the publication sequences in + * schema + */ + if (sequences_schemaidlist && pubform->puballsequences) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES", + NameStr(pubform->pubname)), + errdetail("Sequences from schema cannot be added to, dropped from, or set on FOR ALL SEQUENCES publications."))); + /* Check that user is allowed to manipulate the publication tables. */ if (tables && pubform->puballtables) ereport(ERROR, @@ -1177,6 +1326,107 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, errmsg("publication \"%s\" is defined as FOR ALL TABLES", NameStr(pubform->pubname)), errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + + /* Check that user is allowed to manipulate the publication tables. */ + if (sequences && pubform->puballsequences) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES", + NameStr(pubform->pubname)), + errdetail("Sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."))); +} + +/* + * Add or remove sequence to/from publication. + */ +static void +AlterPublicationSequences(AlterPublicationStmt *stmt, HeapTuple tup, + List *sequences, List *schemaidlist) +{ + List *rels = NIL; + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + Oid pubid = pubform->oid; + + /* + * It is quite possible that for the SET case user has not specified any + * tables in which case we need to remove all the existing tables. + */ + if (!sequences && stmt->action != AP_SetObjects) + return; + + rels = OpenRelationList(sequences, PUB_OBJTYPE_SEQUENCE); + + if (stmt->action == AP_AddObjects) + { + List *schemas = NIL; + + /* + * Check if the relation is member of the existing schema in the + * publication or member of the schema list specified. + */ + schemas = list_concat_copy(schemaidlist, + GetPublicationSchemas(pubid, + PUB_OBJTYPE_SEQUENCE)); + CheckObjSchemaNotAlreadyInPublication(rels, schemas, + PUBLICATIONOBJ_SEQUENCE); + PublicationAddRelations(pubid, rels, false, stmt); + } + else if (stmt->action == AP_DropObjects) + PublicationDropRelations(pubid, rels, false); + else /* DEFELEM_SET */ + { + List *oldrelids = GetPublicationRelations(pubid, + PUB_OBJTYPE_SEQUENCE, + PUBLICATION_PART_ROOT); + List *delrels = NIL; + ListCell *oldlc; + + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_SEQUENCE); + + /* Calculate which relations to drop. */ + foreach(oldlc, oldrelids) + { + Oid oldrelid = lfirst_oid(oldlc); + ListCell *newlc; + PublicationRelInfo *oldrel; + bool found = false; + + foreach(newlc, rels) + { + PublicationRelInfo *newpubrel; + + newpubrel = (PublicationRelInfo *) lfirst(newlc); + if (RelationGetRelid(newpubrel->relation) == oldrelid) + { + found = true; + break; + } + } + /* Not yet in the list, open it and add to the list */ + if (!found) + { + oldrel = palloc(sizeof(PublicationRelInfo)); + oldrel->whereClause = NULL; + oldrel->relation = table_open(oldrelid, + ShareUpdateExclusiveLock); + delrels = lappend(delrels, oldrel); + } + } + + /* And drop them. */ + PublicationDropRelations(pubid, delrels, true); + + /* + * Don't bother calculating the difference for adding, we'll catch and + * skip existing ones when doing catalog update. + */ + PublicationAddRelations(pubid, rels, true, stmt); + + CloseRelationList(delrels); + } + + CloseRelationList(rels); } /* @@ -1214,14 +1464,22 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) AlterPublicationOptions(pstate, stmt, rel, tup); else { - List *relations = NIL; + List *tables = NIL; + List *sequences = NIL; + List *tables_schemaidlist = NIL; + List *sequences_schemaidlist = NIL; List *schemaidlist = NIL; Oid pubid = pubform->oid; - ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + ObjectsInPublicationToOids(stmt->pubobjects, pstate, + &tables, &sequences, + &tables_schemaidlist, + &sequences_schemaidlist, &schemaidlist); - CheckAlterPublication(stmt, tup, relations, schemaidlist); + CheckAlterPublication(stmt, tup, + tables, tables_schemaidlist, + sequences, sequences_schemaidlist); heap_freetuple(tup); @@ -1249,9 +1507,16 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) errmsg("publication \"%s\" does not exist", stmt->pubname)); - AlterPublicationTables(stmt, tup, relations, schemaidlist, + AlterPublicationTables(stmt, tup, tables, tables_schemaidlist, pstate->p_sourcetext); - AlterPublicationSchemas(stmt, tup, schemaidlist); + + AlterPublicationSequences(stmt, tup, sequences, sequences_schemaidlist); + + AlterPublicationSchemas(stmt, tup, tables_schemaidlist, + PUB_OBJTYPE_TABLE); + + AlterPublicationSchemas(stmt, tup, sequences_schemaidlist, + PUB_OBJTYPE_SEQUENCE); } /* Cleanup. */ @@ -1319,7 +1584,7 @@ RemovePublicationById(Oid pubid) pubform = (Form_pg_publication) GETSTRUCT(tup); /* Invalidate relcache so that publication info is rebuilt. */ - if (pubform->puballtables) + if (pubform->puballtables || pubform->puballsequences) CacheInvalidateRelcacheAll(); CatalogTupleDelete(rel, &tup->t_self); @@ -1355,6 +1620,7 @@ RemovePublicationSchemaById(Oid psoid) * partitions. */ schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid, + pubsch->pntype, PUBLICATION_PART_ALL); InvalidatePublicationRels(schemaRels); @@ -1397,29 +1663,45 @@ OpenRelIdList(List *relids) * add them to a publication. */ static List * -OpenTableList(List *tables) +OpenRelationList(List *rels, char objectType) { List *relids = NIL; - List *rels = NIL; + List *result = NIL; ListCell *lc; List *relids_with_rf = NIL; /* * Open, share-lock, and check all the explicitly-specified relations */ - foreach(lc, tables) + foreach(lc, rels) { PublicationTable *t = lfirst_node(PublicationTable, lc); bool recurse = t->relation->inh; Relation rel; Oid myrelid; PublicationRelInfo *pub_rel; + char myrelkind; /* Allow query cancel in case this takes a long time */ CHECK_FOR_INTERRUPTS(); rel = table_openrv(t->relation, ShareUpdateExclusiveLock); myrelid = RelationGetRelid(rel); + myrelkind = get_rel_relkind(myrelid); + + /* + * Make sure the relkind matches the expected object type. This may + * happen e.g. when adding a sequence using ADD TABLE or a table + * using ADD SEQUENCE). + * + * XXX We let through unsupported object types (views etc.). Those + * will be caught later in check_publication_add_relation. + */ + if (pub_get_object_type_for_relkind(myrelkind) != PUB_OBJTYPE_UNSUPPORTED && + pub_get_object_type_for_relkind(myrelkind) != objectType) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("object type does not match type expected by command")); /* * Filter out duplicates if user specifies "foo, foo". @@ -1444,7 +1726,7 @@ OpenTableList(List *tables) pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; pub_rel->whereClause = t->whereClause; - rels = lappend(rels, pub_rel); + result = lappend(result, pub_rel); relids = lappend_oid(relids, myrelid); if (t->whereClause) @@ -1498,7 +1780,7 @@ OpenTableList(List *tables) pub_rel->relation = rel; /* child inherits WHERE clause from parent */ pub_rel->whereClause = t->whereClause; - rels = lappend(rels, pub_rel); + result = lappend(result, pub_rel); relids = lappend_oid(relids, childrelid); if (t->whereClause) @@ -1510,14 +1792,14 @@ OpenTableList(List *tables) list_free(relids); list_free(relids_with_rf); - return rels; + return result; } /* * Close all relations in the list. */ static void -CloseTableList(List *rels) +CloseRelationList(List *rels) { ListCell *lc; @@ -1565,12 +1847,12 @@ LockSchemaList(List *schemalist) * Add listed tables to the publication. */ static void -PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, +PublicationAddRelations(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt) { ListCell *lc; - Assert(!stmt || !stmt->for_all_tables); + Assert(!stmt || !stmt->for_all_objects); foreach(lc, rels) { @@ -1599,7 +1881,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, * Remove listed tables from the publication. */ static void -PublicationDropTables(Oid pubid, List *rels, bool missing_ok) +PublicationDropRelations(Oid pubid, List *rels, bool missing_ok) { ObjectAddress obj; ListCell *lc; @@ -1639,19 +1921,19 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) * Add listed schemas to the publication. */ static void -PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, - AlterPublicationStmt *stmt) +PublicationAddSchemas(Oid pubid, List *schemas, char objectType, + bool if_not_exists, AlterPublicationStmt *stmt) { ListCell *lc; - Assert(!stmt || !stmt->for_all_tables); + Assert(!stmt || !stmt->for_all_objects); foreach(lc, schemas) { Oid schemaid = lfirst_oid(lc); ObjectAddress obj; - obj = publication_add_schema(pubid, schemaid, if_not_exists); + obj = publication_add_schema(pubid, schemaid, objectType, if_not_exists); if (stmt) { EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, @@ -1667,7 +1949,7 @@ PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, * Remove listed schemas from the publication. */ static void -PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) +PublicationDropSchemas(Oid pubid, List *schemas, char objectType, bool missing_ok) { ObjectAddress obj; ListCell *lc; @@ -1677,10 +1959,11 @@ PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) { Oid schemaid = lfirst_oid(lc); - psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + psid = GetSysCacheOid3(PUBLICATIONNAMESPACEMAP, Anum_pg_publication_namespace_oid, ObjectIdGetDatum(schemaid), - ObjectIdGetDatum(pubid)); + ObjectIdGetDatum(pubid), + CharGetDatum(objectType)); if (!OidIsValid(psid)) { if (missing_ok) @@ -1735,6 +2018,13 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) NameStr(form->pubname)), errhint("The owner of a FOR ALL TABLES publication must be a superuser."))); + if (form->puballsequences && !superuser_arg(newOwnerId)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to change owner of publication \"%s\"", + NameStr(form->pubname)), + errhint("The owner of a FOR ALL SEQUENCES publication must be a superuser."))); + if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index c13cada3bf1..717bb0b2aa9 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -337,6 +337,160 @@ ResetSequence(Oid seq_relid) } /* + * Update the sequence state by modifying the existing sequence data row. + * + * This keeps the same relfilenode, so the behavior is non-transactional. + */ +static void +SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called) +{ + SeqTable elm; + Relation seqrel; + Buffer buf; + HeapTupleData seqdatatuple; + Form_pg_sequence_data seq; + + /* open and lock sequence */ + init_sequence(seqrelid, &elm, &seqrel); + + /* lock page' buffer and read tuple */ + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + + /* check the comment above nextval_internal()'s equivalent call. */ + if (RelationNeedsWAL(seqrel)) + { + GetTopTransactionId(); + + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + } + + /* ready to change the on-disk (or really, in-buffer) tuple */ + START_CRIT_SECTION(); + + seq->last_value = last_value; + seq->is_called = is_called; + seq->log_cnt = log_cnt; + + MarkBufferDirty(buf); + + /* XLOG stuff */ + if (RelationNeedsWAL(seqrel)) + { + xl_seq_rec xlrec; + XLogRecPtr recptr; + Page page = BufferGetPage(buf); + + XLogBeginInsert(); + XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); + + xlrec.node = seqrel->rd_node; + xlrec.created = false; + + XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); + XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); + + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG); + + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(buf); + + /* Clear local cache so that we don't think we have cached numbers */ + /* Note that we do not change the currval() state */ + elm->cached = elm->last; + + relation_close(seqrel, NoLock); +} + +/* + * Update the sequence state by creating a new relfilenode. + * + * This creates a new relfilenode, to allow transactional behavior. + */ +static void +SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called) +{ + SeqTable elm; + Relation seqrel; + Buffer buf; + HeapTupleData seqdatatuple; + Form_pg_sequence_data seq; + HeapTuple tuple; + + /* open and lock sequence */ + init_sequence(seq_relid, &elm, &seqrel); + + /* lock page' buffer and read tuple */ + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + + /* Copy the existing sequence tuple. */ + tuple = heap_copytuple(&seqdatatuple); + + /* Now we're done with the old page */ + UnlockReleaseBuffer(buf); + + /* + * Modify the copied tuple to update the sequence state (similar to what + * ResetSequence does). + */ + seq = (Form_pg_sequence_data) GETSTRUCT(tuple); + seq->last_value = last_value; + seq->is_called = is_called; + seq->log_cnt = log_cnt; + + /* + * Create a new storage file for the sequence - this is needed for the + * transactional behavior. + */ + RelationSetNewRelfilenode(seqrel, seqrel->rd_rel->relpersistence); + + /* + * Ensure sequence's relfrozenxid is at 0, since it won't contain any + * unfrozen XIDs. Same with relminmxid, since a sequence will never + * contain multixacts. + */ + Assert(seqrel->rd_rel->relfrozenxid == InvalidTransactionId); + Assert(seqrel->rd_rel->relminmxid == InvalidMultiXactId); + + /* + * Insert the modified tuple into the new storage file. This does all the + * necessary WAL-logging etc. + */ + fill_seq_with_data(seqrel, tuple); + + /* Clear local cache so that we don't think we have cached numbers */ + /* Note that we do not change the currval() state */ + elm->cached = elm->last; + + relation_close(seqrel, NoLock); +} + +/* + * Set a sequence to a specified internal state. + * + * The change is made transactionally, so that on failure of the current + * transaction, the sequence will be restored to its previous state. + * We do that by creating a whole new relfilenode for the sequence; so this + * works much like the rewriting forms of ALTER TABLE. + * + * Caller is assumed to have acquired AccessExclusiveLock on the sequence, + * which must not be released until end of transaction. Caller is also + * responsible for permissions checking. + */ +void +SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called) +{ + if (transactional) + SetSequence_transactional(seq_relid, last_value, log_cnt, is_called); + else + SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called); +} + +/* * Initialize a sequence's relation with the specified tuple as content */ static void diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e16f04626de..abebffdf3bb 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -90,6 +90,7 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -541,9 +542,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, { char *err; WalReceiverConn *wrconn; - List *tables; + List *relations; ListCell *lc; - char table_state; + char sync_state; /* Try to connect to the publisher. */ wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); @@ -558,14 +559,17 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Set sync state based on if we were asked to do data copy or * not. */ - table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + sync_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* - * Get the table list from publisher and build local table status - * info. + * Get the table and sequence list from publisher and build + * local relation sync status info. */ - tables = fetch_table_list(wrconn, publications); - foreach(lc, tables) + relations = fetch_table_list(wrconn, publications); + relations = list_concat(relations, + fetch_sequence_list(wrconn, publications)); + + foreach(lc, relations) { RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; @@ -576,7 +580,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CheckSubscriptionRelkind(get_rel_relkind(relid), rv->schemaname, rv->relname); - AddSubscriptionRelState(subid, relid, table_state, + AddSubscriptionRelState(subid, relid, sync_state, InvalidXLogRecPtr); } @@ -602,12 +606,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * * Note that if tables were specified but copy_data is false * then it is safe to enable two_phase up-front because those - * tables are already initially in READY state. When the - * subscription has no tables, we leave the twophase state as - * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH + * relations are already initially in READY state. When the + * subscription has no relations, we leave the twophase state + * as PENDING, to allow ALTER SUBSCRIPTION ... REFRESH * PUBLICATION to work. */ - if (opts.twophase && !opts.copy_data && tables != NIL) + if (opts.twophase && !opts.copy_data && relations != NIL) twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -677,8 +681,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) PG_TRY(); { - /* Get the table list from publisher. */ + /* Get the list of relations from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications); + pubrel_names = list_concat(pubrel_names, + fetch_sequence_list(wrconn, sub->publications)); /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid); @@ -1713,6 +1719,75 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) } /* + * Get the list of sequences which belong to specified publications on the + * publisher connection. + */ +static List * +fetch_sequence_list(WalReceiverConn *wrconn, List *publications) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + ListCell *lc; + bool first; + List *tablelist = NIL; + + Assert(list_length(publications) > 0); + + initStringInfo(&cmd); + appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n" + " FROM pg_catalog.pg_publication_sequences s\n" + " WHERE s.pubname IN ("); + first = true; + foreach(lc, publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_literal_cstr(pubname)); + } + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive list of replicated sequences from the publisher: %s", + res->err))); + + /* Process sequences. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *nspname; + char *relname; + bool isnull; + RangeVar *rv; + + nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + rv = makeRangeVar(nspname, relname, -1); + tablelist = lappend(tablelist, rv); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + return tablelist; +} + +/* * This is to report the connection failure while dropping replication slots. * Here, we report the WARNING for all tablesync slots so that user can drop * them manually, if required. diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 80faae985e9..124b9961dc9 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -42,6 +42,7 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_trigger.h" @@ -16381,11 +16382,14 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema) * Check that setting the relation to a different schema won't result in a * publication having both a schema and the same schema's table, as this * is not supported. + * + * XXX We do this for tables and sequences, but it's better to keep the two + * blocks separate, to make the strings easier to translate. */ if (stmt->objectType == OBJECT_TABLE) { ListCell *lc; - List *schemaPubids = GetSchemaPublications(nspOid); + List *schemaPubids = GetSchemaPublications(nspOid, PUB_OBJTYPE_TABLE); List *relPubids = GetRelationPublications(RelationGetRelid(rel)); foreach(lc, relPubids) @@ -16403,6 +16407,27 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema) get_publication_name(pubid, false))); } } + else if (stmt->objectType == OBJECT_SEQUENCE) + { + ListCell *lc; + List *schemaPubids = GetSchemaPublications(nspOid, PUB_OBJTYPE_SEQUENCE); + List *relPubids = GetRelationPublications(RelationGetRelid(rel)); + + foreach(lc, relPubids) + { + Oid pubid = lfirst_oid(lc); + + if (list_member_oid(schemaPubids, pubid)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot move sequence \"%s\" to schema \"%s\"", + RelationGetRelationName(rel), stmt->newschema), + errdetail("The schema \"%s\" and same schema's sequence \"%s\" cannot be part of the same publication \"%s\".", + stmt->newschema, + RelationGetRelationName(rel), + get_publication_name(pubid, false))); + } + } /* common checks on switching namespaces */ CheckSetNamespace(oldNspOid, nspOid); diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 13328141e23..0df7cf58747 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -636,7 +636,9 @@ void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { - if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) + if (relkind != RELKIND_RELATION && + relkind != RELKIND_PARTITIONED_TABLE && + relkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index d4f8455a2bd..55f720a88f4 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4862,7 +4862,7 @@ _copyCreatePublicationStmt(const CreatePublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); COPY_NODE_FIELD(pubobjects); - COPY_SCALAR_FIELD(for_all_tables); + COPY_NODE_FIELD(for_all_objects); return newnode; } @@ -4875,7 +4875,7 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); COPY_NODE_FIELD(pubobjects); - COPY_SCALAR_FIELD(for_all_tables); + COPY_NODE_FIELD(for_all_objects); COPY_SCALAR_FIELD(action); return newnode; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index f1002afe7a0..82562eb9b87 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2333,7 +2333,7 @@ _equalCreatePublicationStmt(const CreatePublicationStmt *a, COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); COMPARE_NODE_FIELD(pubobjects); - COMPARE_SCALAR_FIELD(for_all_tables); + COMPARE_NODE_FIELD(for_all_objects); return true; } @@ -2345,7 +2345,7 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a, COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); COMPARE_NODE_FIELD(pubobjects); - COMPARE_SCALAR_FIELD(for_all_tables); + COMPARE_NODE_FIELD(for_all_objects); COMPARE_SCALAR_FIELD(action); return true; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 0036c2f9e2d..e327bc735fb 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -446,7 +446,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list pub_obj_list + drop_option_list pub_obj_list pub_obj_type_list %type <node> opt_routine_body %type <groupclause> group_clause @@ -575,6 +575,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> var_value zone_value %type <rolespec> auth_ident RoleSpec opt_granted_by %type <publicationobjectspec> PublicationObjSpec +%type <node> pub_obj_type %type <keyword> unreserved_keyword type_func_name_keyword %type <keyword> col_name_keyword reserved_keyword @@ -9701,12 +9702,9 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec * * CREATE PUBLICATION FOR ALL TABLES [WITH options] * - * CREATE PUBLICATION FOR pub_obj [, ...] [WITH options] - * - * pub_obj is one of: + * CREATE PUBLICATION FOR ALL SEQUENCES [WITH options] * - * TABLE table [, ...] - * ALL TABLES IN SCHEMA schema [, ...] + * CREATE PUBLICATION FOR pub_obj [, ...] [WITH options] * *****************************************************************************/ @@ -9718,12 +9716,12 @@ CreatePublicationStmt: n->options = $4; $$ = (Node *)n; } - | CREATE PUBLICATION name FOR ALL TABLES opt_definition + | CREATE PUBLICATION name FOR ALL pub_obj_type_list opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; n->options = $7; - n->for_all_tables = true; + n->for_all_objects = $6; $$ = (Node *)n; } | CREATE PUBLICATION name FOR pub_obj_list opt_definition @@ -9772,6 +9770,26 @@ PublicationObjSpec: $$->pubobjtype = PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA; $$->location = @5; } + | SEQUENCE relation_expr + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_SEQUENCE; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = $2; + } + | ALL SEQUENCES IN_P SCHEMA ColId + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA; + $$->name = $5; + $$->location = @5; + } + | ALL SEQUENCES IN_P SCHEMA CURRENT_SCHEMA + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA; + $$->location = @5; + } | ColId OptWhereClause { $$ = makeNode(PublicationObjSpec); @@ -9826,6 +9844,19 @@ pub_obj_list: PublicationObjSpec { $$ = lappend($1, $3); } ; +pub_obj_type: TABLES + { $$ = (Node *) makeString("tables"); } + | SEQUENCES + { $$ = (Node *) makeString("sequences"); } + ; + +pub_obj_type_list: pub_obj_type + { $$ = list_make1($1); } + | pub_obj_type_list ',' pub_obj_type + { $$ = lappend($1, $3); } + ; + + /***************************************************************************** * * ALTER PUBLICATION name SET ( options ) @@ -9836,11 +9867,6 @@ pub_obj_list: PublicationObjSpec * * ALTER PUBLICATION name SET pub_obj [, ...] * - * pub_obj is one of: - * - * TABLE table_name [, ...] - * ALL TABLES IN SCHEMA schema_name [, ...] - * *****************************************************************************/ AlterPublicationStmt: diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index c9b0eeefd7e..3dbe85d61a2 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -649,6 +649,56 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, } /* + * Write SEQUENCE to stream + */ +void +logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid, + XLogRecPtr lsn, bool transactional, + int64 last_value, int64 log_cnt, bool is_called) +{ + uint8 flags = 0; + char *relname; + + pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE); + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + pq_sendint8(out, flags); + pq_sendint64(out, lsn); + + logicalrep_write_namespace(out, RelationGetNamespace(rel)); + relname = RelationGetRelationName(rel); + pq_sendstring(out, relname); + + pq_sendint8(out, transactional); + pq_sendint64(out, last_value); + pq_sendint64(out, log_cnt); + pq_sendint8(out, is_called); +} + +/* + * Read SEQUENCE from the stream. + */ +void +logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata) +{ + /* XXX skipping flags and lsn */ + pq_getmsgint(in, 1); + pq_getmsgint64(in); + + /* Read relation name from stream */ + seqdata->nspname = pstrdup(logicalrep_read_namespace(in)); + seqdata->seqname = pstrdup(pq_getmsgstring(in)); + + seqdata->transactional = pq_getmsgint(in, 1); + seqdata->last_value = pq_getmsgint64(in); + seqdata->log_cnt = pq_getmsgint64(in); + seqdata->is_called = pq_getmsgint(in, 1); +} + +/* * Write relation description to the output stream. */ void @@ -1203,6 +1253,8 @@ logicalrep_message_type(LogicalRepMsgType action) return "STREAM ABORT"; case LOGICAL_REP_MSG_STREAM_PREPARE: return "STREAM PREPARE"; + case LOGICAL_REP_MSG_SEQUENCE: + return "SEQUENCE"; } elog(ERROR, "invalid logical replication message type \"%c\"", action); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 1659964571c..d8b12d94bc3 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/sequence.h" #include "miscadmin.h" #include "parser/parse_relation.h" #include "pgstat.h" @@ -1000,6 +1001,95 @@ copy_table(Relation rel) } /* + * Fetch sequence data (current state) from the remote node. + */ +static void +fetch_sequence_data(char *nspname, char *relname, + int64 *last_value, int64 *log_cnt, bool *is_called) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID}; + + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n" + " FROM %s", quote_qualified_identifier(nspname, relname)); + + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive list of replicated tables from the publisher: %s", + res->err))); + + /* Process the sequence. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + + *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + +/* + * Copy existing data of a sequence from publisher. + * + * Caller is responsible for locking the local relation. + */ +static void +copy_sequence(Relation rel) +{ + LogicalRepRelMapEntry *relmapentry; + LogicalRepRelation lrel; + List *qual = NIL; + StringInfoData cmd; + int64 last_value = 0, + log_cnt = 0; + bool is_called = 0; + + /* Get the publisher relation info. */ + fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel), &lrel, &qual); + + /* sequences don't have row filters */ + Assert(!qual); + + /* Put the relation into relmap. */ + logicalrep_relmap_update(&lrel); + + /* Map the publisher relation to local one. */ + relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); + Assert(rel == relmapentry->localrel); + + /* Start copy on the publisher. */ + initStringInfo(&cmd); + + Assert(lrel.relkind == RELKIND_SEQUENCE); + + fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called); + + /* tablesync sets the sequences in non-transactional way */ + SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called); + + logicalrep_rel_close(relmapentry, NoLock); +} + +/* * Determine the tablesync slot name. * * The name must not exceed NAMEDATALEN - 1 because of remote node constraints @@ -1260,10 +1350,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) originname))); } - /* Now do the initial data copy */ - PushActiveSnapshot(GetTransactionSnapshot()); - copy_table(rel); - PopActiveSnapshot(); + /* Do the right action depending on the relation kind. */ + if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE) + { + /* Now do the initial sequence copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_sequence(rel); + PopActiveSnapshot(); + } + else + { + /* Now do the initial data copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_table(rel); + PopActiveSnapshot(); + } res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 82dcffc2db8..f3868b3e1f8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -143,6 +143,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_tablespace.h" +#include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" @@ -1144,6 +1145,57 @@ apply_handle_origin(StringInfo s) } /* + * Handle SEQUENCE message. + */ +static void +apply_handle_sequence(StringInfo s) +{ + LogicalRepSequence seq; + Oid relid; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s)) + return; + + logicalrep_read_sequence(s, &seq); + + /* + * Non-transactional sequence updates should not be part of a remote + * transaction. There should not be any running transaction. + */ + Assert((!seq.transactional) || in_remote_transaction); + Assert(!(!seq.transactional && in_remote_transaction)); + Assert(!(!seq.transactional && IsTransactionState())); + + /* + * Make sure we're in a transaction (needed by SetSequence). For + * non-transactional updates we're guaranteed to start a new one, + * and we'll commit it at the end. + */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + maybe_reread_subscription(); + } + + relid = RangeVarGetRelid(makeRangeVar(seq.nspname, + seq.seqname, -1), + RowExclusiveLock, false); + + /* lock the sequence in AccessExclusiveLock, as expected by SetSequence */ + LockRelationOid(relid, AccessExclusiveLock); + + /* apply the sequence change */ + SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called); + + /* + * Commit the per-stream transaction (we only do this when not in + * remote transaction, i.e. for non-transactional sequence updates. + */ + if (!in_remote_transaction) + CommitTransactionCommand(); +} + +/* * Handle STREAM START message. */ static void @@ -2511,6 +2563,10 @@ apply_dispatch(StringInfo s) */ break; + case LOGICAL_REP_MSG_SEQUENCE: + apply_handle_sequence(s); + return; + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); break; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 5fddab3a3d4..4cdc698cbb3 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,6 +15,7 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "commands/defrem.h" #include "executor/executor.h" @@ -53,6 +54,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pgoutput_sequence(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, + Relation relation, bool transactional, + int64 last_value, int64 log_cnt, bool is_called); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, @@ -208,6 +213,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; cb->message_cb = pgoutput_message; + cb->sequence_cb = pgoutput_sequence; cb->commit_cb = pgoutput_commit_txn; cb->begin_prepare_cb = pgoutput_begin_prepare_txn; @@ -224,6 +230,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; cb->stream_message_cb = pgoutput_message; + cb->stream_sequence_cb = pgoutput_sequence; cb->stream_truncate_cb = pgoutput_truncate; /* transaction streaming - two-phase commit */ cb->stream_prepare_cb = pgoutput_stream_prepare_txn; @@ -237,6 +244,7 @@ parse_output_parameters(List *options, PGOutputData *data) bool publication_names_given = false; bool binary_option_given = false; bool messages_option_given = false; + bool sequences_option_given = false; bool streaming_given = false; bool two_phase_option_given = false; @@ -244,6 +252,7 @@ parse_output_parameters(List *options, PGOutputData *data) data->streaming = false; data->messages = false; data->two_phase = false; + data->sequences = true; foreach(lc, options) { @@ -312,6 +321,16 @@ parse_output_parameters(List *options, PGOutputData *data) data->messages = defGetBoolean(defel); } + else if (strcmp(defel->defname, "sequences") == 0) + { + if (sequences_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + sequences_option_given = true; + + data->sequences = defGetBoolean(defel); + } else if (strcmp(defel->defname, "streaming") == 0) { if (streaming_given) @@ -1440,6 +1459,51 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +static void +pgoutput_sequence(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, + Relation relation, bool transactional, + int64 last_value, int64 log_cnt, bool is_called) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + TransactionId xid = InvalidTransactionId; + RelationSyncEntry *relentry; + + if (!data->sequences) + return; + + if (!is_publishable_relation(relation)) + return; + + /* + * Remember the xid for the message in streaming mode. See + * pgoutput_change. + */ + if (in_streaming) + xid = txn->xid; + + relentry = get_rel_sync_entry(data, relation); + + /* + * First check the sequence filter. + * + * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here. + */ + if (!relentry->pubactions.pubsequence) + return; + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_sequence(ctx->out, + relation, + xid, + sequence_lsn, + transactional, + last_value, + log_cnt, + is_called); + OutputPluginWrite(ctx, true); +} + /* * Currently we always forward. */ @@ -1725,7 +1789,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->schema_sent = false; entry->streamed_txns = NIL; entry->pubactions.pubinsert = entry->pubactions.pubupdate = - entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + entry->pubactions.pubdelete = entry->pubactions.pubtruncate = + entry->pubactions.pubsequence = false; entry->new_slot = NULL; entry->old_slot = NULL; memset(entry->exprstate, 0, sizeof(entry->exprstate)); @@ -1739,13 +1804,13 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) { Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); - + char objectType = pub_get_object_type_for_relkind(get_rel_relkind(relid)); /* * We don't acquire a lock on the namespace system table as we build * the cache entry using a historic snapshot and all the later changes * are absorbed while decoding WAL. */ - List *schemaPubids = GetSchemaPublications(schemaId); + List *schemaPubids = GetSchemaPublications(schemaId, objectType); ListCell *lc; Oid publish_as_relid = relid; int publish_ancestor_level = 0; @@ -1780,6 +1845,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; + entry->pubactions.pubsequence = false; /* * Tuple slots cleanups. (Will be rebuilt later if needed). @@ -1826,9 +1892,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) /* * If this is a FOR ALL TABLES publication, pick the partition root - * and set the ancestor level accordingly. + * and set the ancestor level accordingly. If this is a FOR ALL + * SEQUENCES publication, we publish it too but we don't need to + * pick the partition root etc. */ - if (pub->alltables) + if (pub->alltables || pub->allsequences) { publish = true; if (pub->pubviaroot && am_partition) @@ -1889,6 +1957,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + entry->pubactions.pubsequence |= pub->pubactions.pubsequence; /* * We want to publish the changes as the top-most ancestor diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 3d05297b0d9..4f3fe1118a2 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -56,6 +56,7 @@ #include "catalog/pg_opclass.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_shseclabel.h" #include "catalog/pg_statistic_ext.h" @@ -5562,6 +5563,8 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) Oid schemaid; List *ancestors = NIL; Oid relid = RelationGetRelid(relation); + char relkind = relation->rd_rel->relkind; + char objType; /* * If not publishable, it publishes no actions. (pgoutput_change() will @@ -5588,8 +5591,15 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) /* Fetch the publication membership info. */ puboids = GetRelationPublications(relid); schemaid = RelationGetNamespace(relation); - puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + objType = pub_get_object_type_for_relkind(relkind); + puboids = list_concat_unique_oid(puboids, + GetSchemaPublications(schemaid, objType)); + + /* + * If this is a partion (and thus a table), lookup all ancestors and track + * all publications them too. + */ if (relation->rd_rel->relispartition) { /* Add publications that the ancestors are in too. */ @@ -5601,12 +5611,23 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) puboids = list_concat_unique_oid(puboids, GetRelationPublications(ancestor)); + + /* include all publications publishing schema of all ancestors */ schemaid = get_rel_namespace(ancestor); puboids = list_concat_unique_oid(puboids, - GetSchemaPublications(schemaid)); + GetSchemaPublications(schemaid, + PUB_OBJTYPE_TABLE)); } } - puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); + + /* + * Consider also FOR ALL TABLES and FOR ALL SEQUENCES publications, + * depending on the relkind of the relation. + */ + if (relation->rd_rel->relkind == RELKIND_SEQUENCE) + puboids = list_concat_unique_oid(puboids, GetAllSequencesPublications()); + else + puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); foreach(lc, puboids) { @@ -5625,6 +5646,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) pubdesc->pubactions.pubupdate |= pubform->pubupdate; pubdesc->pubactions.pubdelete |= pubform->pubdelete; pubdesc->pubactions.pubtruncate |= pubform->pubtruncate; + pubdesc->pubactions.pubsequence |= pubform->pubsequence; /* * Check if all columns referenced in the filter expression are part of diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index f4e7819f1e2..a675877d195 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -630,12 +630,12 @@ static const struct cachedesc cacheinfo[] = { 64 }, {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPACEMAP */ - PublicationNamespacePnnspidPnpubidIndexId, - 2, + PublicationNamespacePnnspidPnpubidPntypeIndexId, + 3, { Anum_pg_publication_namespace_pnnspid, Anum_pg_publication_namespace_pnpubid, - 0, + Anum_pg_publication_namespace_pntype, 0 }, 64 diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index e5816c4ccea..00629f08362 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3814,10 +3814,12 @@ getPublications(Archive *fout, int *numPublications) int i_pubname; int i_pubowner; int i_puballtables; + int i_puballsequences; int i_pubinsert; int i_pubupdate; int i_pubdelete; int i_pubtruncate; + int i_pubsequence; int i_pubviaroot; int i, ntups; @@ -3833,23 +3835,29 @@ getPublications(Archive *fout, int *numPublications) resetPQExpBuffer(query); /* Get the publications. */ - if (fout->remoteVersion >= 130000) + if (fout->remoteVersion >= 150000) + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "p.pubowner, " + "p.puballtables, p.puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubsequence, p.pubviaroot " + "FROM pg_publication p"); + else if (fout->remoteVersion >= 130000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "p.pubowner, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot " + "p.puballtables, false AS puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubsequence, p.pubviaroot " "FROM pg_publication p"); else if (fout->remoteVersion >= 110000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "p.pubowner, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot " + "p.puballtables, false AS puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubsequence, false AS pubviaroot " "FROM pg_publication p"); else appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "p.pubowner, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot " + "p.puballtables, false AS puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubsequence, false AS pubviaroot " "FROM pg_publication p"); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); @@ -3861,10 +3869,12 @@ getPublications(Archive *fout, int *numPublications) i_pubname = PQfnumber(res, "pubname"); i_pubowner = PQfnumber(res, "pubowner"); i_puballtables = PQfnumber(res, "puballtables"); + i_puballsequences = PQfnumber(res, "puballsequences"); i_pubinsert = PQfnumber(res, "pubinsert"); i_pubupdate = PQfnumber(res, "pubupdate"); i_pubdelete = PQfnumber(res, "pubdelete"); i_pubtruncate = PQfnumber(res, "pubtruncate"); + i_pubsequence = PQfnumber(res, "pubsequence"); i_pubviaroot = PQfnumber(res, "pubviaroot"); pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); @@ -3880,6 +3890,8 @@ getPublications(Archive *fout, int *numPublications) pubinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_pubowner)); pubinfo[i].puballtables = (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0); + pubinfo[i].puballsequences = + (strcmp(PQgetvalue(res, i, i_puballsequences), "t") == 0); pubinfo[i].pubinsert = (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0); pubinfo[i].pubupdate = @@ -3888,6 +3900,8 @@ getPublications(Archive *fout, int *numPublications) (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0); pubinfo[i].pubtruncate = (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0); + pubinfo[i].pubsequence = + (strcmp(PQgetvalue(res, i, i_pubsequence), "t") == 0); pubinfo[i].pubviaroot = (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0); @@ -3933,6 +3947,9 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo) if (pubinfo->puballtables) appendPQExpBufferStr(query, " FOR ALL TABLES"); + if (pubinfo->puballsequences) + appendPQExpBufferStr(query, " FOR ALL SEQUENCES"); + appendPQExpBufferStr(query, " WITH (publish = '"); if (pubinfo->pubinsert) { @@ -3967,6 +3984,15 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo) first = false; } + if (pubinfo->pubsequence) + { + if (!first) + appendPQExpBufferStr(query, ", "); + + appendPQExpBufferStr(query, "sequence"); + first = false; + } + appendPQExpBufferStr(query, "'"); if (pubinfo->pubviaroot) @@ -4013,6 +4039,7 @@ getPublicationNamespaces(Archive *fout) int i_oid; int i_pnpubid; int i_pnnspid; + int i_pntype; int i, j, ntups; @@ -4024,7 +4051,7 @@ getPublicationNamespaces(Archive *fout) /* Collect all publication membership info. */ appendPQExpBufferStr(query, - "SELECT tableoid, oid, pnpubid, pnnspid " + "SELECT tableoid, oid, pnpubid, pnnspid, pntype " "FROM pg_catalog.pg_publication_namespace"); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); @@ -4034,6 +4061,7 @@ getPublicationNamespaces(Archive *fout) i_oid = PQfnumber(res, "oid"); i_pnpubid = PQfnumber(res, "pnpubid"); i_pnnspid = PQfnumber(res, "pnnspid"); + i_pntype = PQfnumber(res, "pntype"); /* this allocation may be more than we need */ pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo)); @@ -4043,6 +4071,7 @@ getPublicationNamespaces(Archive *fout) { Oid pnpubid = atooid(PQgetvalue(res, i, i_pnpubid)); Oid pnnspid = atooid(PQgetvalue(res, i, i_pnnspid)); + char pntype = PQgetvalue(res, i, i_pntype)[0]; PublicationInfo *pubinfo; NamespaceInfo *nspinfo; @@ -4074,6 +4103,7 @@ getPublicationNamespaces(Archive *fout) pubsinfo[j].dobj.name = nspinfo->dobj.name; pubsinfo[j].publication = pubinfo; pubsinfo[j].pubschema = nspinfo; + pubsinfo[j].pubtype = pntype; /* Decide whether we want to dump it */ selectDumpablePublicationObject(&(pubsinfo[j].dobj), fout); @@ -4207,7 +4237,11 @@ dumpPublicationNamespace(Archive *fout, const PublicationSchemaInfo *pubsinfo) query = createPQExpBuffer(); appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubinfo->dobj.name)); - appendPQExpBuffer(query, "ADD ALL TABLES IN SCHEMA %s;\n", fmtId(schemainfo->dobj.name)); + + if (pubsinfo->pubtype == 't') + appendPQExpBuffer(query, "ADD ALL TABLES IN SCHEMA %s;\n", fmtId(schemainfo->dobj.name)); + else + appendPQExpBuffer(query, "ADD ALL SEQUENCES IN SCHEMA %s;\n", fmtId(schemainfo->dobj.name)); /* * There is no point in creating drop query as the drop is done by schema @@ -4240,6 +4274,7 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) TableInfo *tbinfo = pubrinfo->pubtable; PQExpBuffer query; char *tag; + char *description; /* Do nothing in data-only dump */ if (dopt->dataOnly) @@ -4249,10 +4284,22 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) query = createPQExpBuffer(); - appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY", - fmtId(pubinfo->dobj.name)); + if (tbinfo->relkind == RELKIND_SEQUENCE) + { + appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD SEQUENCE", + fmtId(pubinfo->dobj.name)); + description = "PUBLICATION SEQUENCE"; + } + else + { + appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY", + fmtId(pubinfo->dobj.name)); + description = "PUBLICATION TABLE"; + } + appendPQExpBuffer(query, " %s", fmtQualifiedDumpable(tbinfo)); + if (pubrinfo->pubrelqual) { /* @@ -4275,7 +4322,7 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) ARCHIVE_OPTS(.tag = tag, .namespace = tbinfo->dobj.namespace->dobj.name, .owner = pubinfo->rolname, - .description = "PUBLICATION TABLE", + .description = description, .section = SECTION_POST_DATA, .createStmt = query->data)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 772dc0cf7a2..893725d121f 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -615,10 +615,12 @@ typedef struct _PublicationInfo DumpableObject dobj; const char *rolname; bool puballtables; + bool puballsequences; bool pubinsert; bool pubupdate; bool pubdelete; bool pubtruncate; + bool pubsequence; bool pubviaroot; } PublicationInfo; @@ -643,6 +645,7 @@ typedef struct _PublicationSchemaInfo DumpableObject dobj; PublicationInfo *publication; NamespaceInfo *pubschema; + char pubtype; } PublicationSchemaInfo; /* diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index fd1052e5db8..19f908f6006 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -2358,7 +2358,7 @@ my %tests = ( create_order => 50, create_sql => 'CREATE PUBLICATION pub1;', regexp => qr/^ - \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete, truncate');\E + \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete, truncate, sequence');\E /xm, like => { %full_runs, section_post_data => 1, }, }, @@ -2378,16 +2378,27 @@ my %tests = ( create_order => 50, create_sql => 'CREATE PUBLICATION pub3;', regexp => qr/^ - \QCREATE PUBLICATION pub3 WITH (publish = 'insert, update, delete, truncate');\E + \QCREATE PUBLICATION pub3 WITH (publish = 'insert, update, delete, truncate, sequence');\E /xm, like => { %full_runs, section_post_data => 1, }, }, 'CREATE PUBLICATION pub4' => { create_order => 50, - create_sql => 'CREATE PUBLICATION pub4;', + create_sql => 'CREATE PUBLICATION pub4 + FOR ALL SEQUENCES + WITH (publish = \'\');', regexp => qr/^ - \QCREATE PUBLICATION pub4 WITH (publish = 'insert, update, delete, truncate');\E + \QCREATE PUBLICATION pub4 FOR ALL SEQUENCES WITH (publish = '');\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + + 'CREATE PUBLICATION pub5' => { + create_order => 50, + create_sql => 'CREATE PUBLICATION pub5;', + regexp => qr/^ + \QCREATE PUBLICATION pub5 WITH (publish = 'insert, update, delete, truncate, sequence');\E /xm, like => { %full_runs, section_post_data => 1, }, }, @@ -2474,6 +2485,27 @@ my %tests = ( unlike => { exclude_dump_test_schema => 1, }, }, + 'ALTER PUBLICATION pub3 ADD ALL SEQUENCES IN SCHEMA dump_test' => { + create_order => 51, + create_sql => + 'ALTER PUBLICATION pub3 ADD ALL SEQUENCES IN SCHEMA dump_test;', + regexp => qr/^ + \QALTER PUBLICATION pub3 ADD ALL SEQUENCES IN SCHEMA dump_test;\E + /xm, + like => { %full_runs, section_post_data => 1, }, + unlike => { exclude_dump_test_schema => 1, }, + }, + + 'ALTER PUBLICATION pub3 ADD ALL SEQUENCES IN SCHEMA public' => { + create_order => 52, + create_sql => + 'ALTER PUBLICATION pub3 ADD ALL SEQUENCES IN SCHEMA public;', + regexp => qr/^ + \QALTER PUBLICATION pub3 ADD ALL SEQUENCES IN SCHEMA public;\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + 'CREATE SCHEMA public' => { regexp => qr/^CREATE SCHEMA public;/m, diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 714097cad1b..b8a532a45f7 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -1633,28 +1633,19 @@ describeOneTableDetails(const char *schemaname, if (tableinfo.relkind == RELKIND_SEQUENCE) { PGresult *result = NULL; - printQueryOpt myopt = pset.popt; - char *footers[2] = {NULL, NULL}; if (pset.sversion >= 100000) { printfPQExpBuffer(&buf, - "SELECT pg_catalog.format_type(seqtypid, NULL) AS \"%s\",\n" - " seqstart AS \"%s\",\n" - " seqmin AS \"%s\",\n" - " seqmax AS \"%s\",\n" - " seqincrement AS \"%s\",\n" - " CASE WHEN seqcycle THEN '%s' ELSE '%s' END AS \"%s\",\n" - " seqcache AS \"%s\"\n", - gettext_noop("Type"), - gettext_noop("Start"), - gettext_noop("Minimum"), - gettext_noop("Maximum"), - gettext_noop("Increment"), + "SELECT pg_catalog.format_type(seqtypid, NULL),\n" + " seqstart,\n" + " seqmin,\n" + " seqmax,\n" + " seqincrement,\n" + " CASE WHEN seqcycle THEN '%s' ELSE '%s' END,\n" + " seqcache\n", gettext_noop("yes"), - gettext_noop("no"), - gettext_noop("Cycles?"), - gettext_noop("Cache")); + gettext_noop("no")); appendPQExpBuffer(&buf, "FROM pg_catalog.pg_sequence\n" "WHERE seqrelid = '%s';", @@ -1663,22 +1654,15 @@ describeOneTableDetails(const char *schemaname, else { printfPQExpBuffer(&buf, - "SELECT 'bigint' AS \"%s\",\n" - " start_value AS \"%s\",\n" - " min_value AS \"%s\",\n" - " max_value AS \"%s\",\n" - " increment_by AS \"%s\",\n" - " CASE WHEN is_cycled THEN '%s' ELSE '%s' END AS \"%s\",\n" - " cache_value AS \"%s\"\n", - gettext_noop("Type"), - gettext_noop("Start"), - gettext_noop("Minimum"), - gettext_noop("Maximum"), - gettext_noop("Increment"), + "SELECT 'bigint',\n" + " start_value,\n" + " min_value,\n" + " max_value,\n" + " increment_by,\n" + " CASE WHEN is_cycled THEN '%s' ELSE '%s' END,\n" + " cache_value\n", gettext_noop("yes"), - gettext_noop("no"), - gettext_noop("Cycles?"), - gettext_noop("Cache")); + gettext_noop("no")); appendPQExpBuffer(&buf, "FROM %s", fmtId(schemaname)); /* must be separate because fmtId isn't reentrant */ appendPQExpBuffer(&buf, ".%s;", fmtId(relationname)); @@ -1688,6 +1672,51 @@ describeOneTableDetails(const char *schemaname, if (!res) goto error_return; + numrows = PQntuples(res); + + /* XXX reset to use expanded output for sequences (maybe we should + * keep this disabled, just like for tables?) */ + myopt.expanded = pset.popt.topt.expanded; + + printTableInit(&cont, &myopt, title.data, 7, numrows); + printTableInitialized = true; + + printfPQExpBuffer(&title, _("Sequence \"%s.%s\""), + schemaname, relationname); + + printTableAddHeader(&cont, gettext_noop("Type"), true, 'l'); + printTableAddHeader(&cont, gettext_noop("Start"), true, 'r'); + printTableAddHeader(&cont, gettext_noop("Minimum"), true, 'r'); + printTableAddHeader(&cont, gettext_noop("Maximum"), true, 'r'); + printTableAddHeader(&cont, gettext_noop("Increment"), true, 'r'); + printTableAddHeader(&cont, gettext_noop("Cycles?"), true, 'l'); + printTableAddHeader(&cont, gettext_noop("Cache"), true, 'r'); + + /* Generate table cells to be printed */ + for (i = 0; i < numrows; i++) + { + /* Type */ + printTableAddCell(&cont, PQgetvalue(res, i, 0), false, false); + + /* Start */ + printTableAddCell(&cont, PQgetvalue(res, i, 1), false, false); + + /* Minimum */ + printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false); + + /* Maximum */ + printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); + + /* Increment */ + printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false); + + /* Cycles? */ + printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false); + + /* Cache */ + printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false); + } + /* Footer information about a sequence */ /* Get the column that owns this sequence */ @@ -1721,29 +1750,63 @@ describeOneTableDetails(const char *schemaname, switch (PQgetvalue(result, 0, 1)[0]) { case 'a': - footers[0] = psprintf(_("Owned by: %s"), - PQgetvalue(result, 0, 0)); + printTableAddFooter(&cont, + psprintf(_("Owned by: %s"), + PQgetvalue(result, 0, 0))); break; case 'i': - footers[0] = psprintf(_("Sequence for identity column: %s"), - PQgetvalue(result, 0, 0)); + printTableAddFooter(&cont, + psprintf(_("Sequence for identity column: %s"), + PQgetvalue(result, 0, 0))); break; } } PQclear(result); - printfPQExpBuffer(&title, _("Sequence \"%s.%s\""), - schemaname, relationname); + /* print any publications */ + if (pset.sversion >= 150000) + { + int tuples = 0; - myopt.footers = footers; - myopt.topt.default_footer = false; - myopt.title = title.data; - myopt.translate_header = true; + printfPQExpBuffer(&buf, + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n" + " JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n" + "WHERE pc.oid ='%s' and pn.pntype = 's' and pg_catalog.pg_relation_is_publishable('%s')\n" + "UNION\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + "WHERE pr.prrelid = '%s'\n" + "UNION\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "WHERE p.puballsequences AND pg_catalog.pg_relation_is_publishable('%s')\n" + "ORDER BY 1;", + oid, oid, oid, oid); - printQuery(res, &myopt, pset.queryFout, false, pset.logfile); + result = PSQLexec(buf.data); + if (!result) + goto error_return; + else + tuples = PQntuples(result); + + if (tuples > 0) + printTableAddFooter(&cont, _("Publications:")); + + /* Might be an empty set - that's ok */ + for (i = 0; i < tuples; i++) + { + printfPQExpBuffer(&buf, " \"%s\"", + PQgetvalue(result, i, 0)); + + printTableAddFooter(&cont, buf.data); + } + PQclear(result); + } - if (footers[0]) - free(footers[0]); + printTable(&cont, pset.queryFout, false, pset.logfile); retval = true; goto error_return; /* not an error, just return early */ @@ -1970,6 +2033,11 @@ describeOneTableDetails(const char *schemaname, for (i = 0; i < cols; i++) printTableAddHeader(&cont, headers[i], true, 'l'); + res = PSQLexec(buf.data); + if (!res) + goto error_return; + numrows = PQntuples(res); + /* Generate table cells to be printed */ for (i = 0; i < numrows; i++) { @@ -2895,7 +2963,7 @@ describeOneTableDetails(const char *schemaname, "FROM pg_catalog.pg_publication p\n" " JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n" " JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n" - "WHERE pc.oid ='%s' and pg_catalog.pg_relation_is_publishable('%s')\n" + "WHERE pc.oid ='%s' and pn.pntype = 't' and pg_catalog.pg_relation_is_publishable('%s')\n" "UNION\n" "SELECT pubname\n" " , pg_get_expr(pr.prqual, c.oid)\n" @@ -4785,7 +4853,7 @@ listSchemas(const char *pattern, bool verbose, bool showSystem) int i; printfPQExpBuffer(&buf, - "SELECT pubname \n" + "SELECT pubname, (CASE WHEN pntype = 't' THEN 'tables' ELSE 'sequences' END) AS pubtype\n" "FROM pg_catalog.pg_publication p\n" " JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n" " JOIN pg_catalog.pg_namespace n ON n.oid = pn.pnnspid \n" @@ -4814,8 +4882,9 @@ listSchemas(const char *pattern, bool verbose, bool showSystem) /* Might be an empty set - that's ok */ for (i = 0; i < pub_schema_tuples; i++) { - printfPQExpBuffer(&buf, " \"%s\"", - PQgetvalue(result, i, 0)); + printfPQExpBuffer(&buf, " \"%s\" (%s)", + PQgetvalue(result, i, 0), + PQgetvalue(result, i, 1)); footers[i + 1] = pg_strdup(buf.data); } @@ -5820,7 +5889,7 @@ listPublications(const char *pattern) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -5834,23 +5903,45 @@ listPublications(const char *pattern) initPQExpBuffer(&buf); - printfPQExpBuffer(&buf, - "SELECT pubname AS \"%s\",\n" - " pg_catalog.pg_get_userbyid(pubowner) AS \"%s\",\n" - " puballtables AS \"%s\",\n" - " pubinsert AS \"%s\",\n" - " pubupdate AS \"%s\",\n" - " pubdelete AS \"%s\"", - gettext_noop("Name"), - gettext_noop("Owner"), - gettext_noop("All tables"), - gettext_noop("Inserts"), - gettext_noop("Updates"), - gettext_noop("Deletes")); + if (pset.sversion >= 150000) + printfPQExpBuffer(&buf, + "SELECT pubname AS \"%s\",\n" + " pg_catalog.pg_get_userbyid(pubowner) AS \"%s\",\n" + " puballtables AS \"%s\",\n" + " puballsequences AS \"%s\",\n" + " pubinsert AS \"%s\",\n" + " pubupdate AS \"%s\",\n" + " pubdelete AS \"%s\"", + gettext_noop("Name"), + gettext_noop("Owner"), + gettext_noop("All tables"), + gettext_noop("All sequences"), + gettext_noop("Inserts"), + gettext_noop("Updates"), + gettext_noop("Deletes")); + else + printfPQExpBuffer(&buf, + "SELECT pubname AS \"%s\",\n" + " pg_catalog.pg_get_userbyid(pubowner) AS \"%s\",\n" + " puballtables AS \"%s\",\n" + " pubinsert AS \"%s\",\n" + " pubupdate AS \"%s\",\n" + " pubdelete AS \"%s\"", + gettext_noop("Name"), + gettext_noop("Owner"), + gettext_noop("All tables"), + gettext_noop("Inserts"), + gettext_noop("Updates"), + gettext_noop("Deletes")); + if (pset.sversion >= 110000) appendPQExpBuffer(&buf, ",\n pubtruncate AS \"%s\"", gettext_noop("Truncates")); + if (pset.sversion >= 150000) + appendPQExpBuffer(&buf, + ",\n pubsequence AS \"%s\"", + gettext_noop("Sequences")); if (pset.sversion >= 130000) appendPQExpBuffer(&buf, ",\n pubviaroot AS \"%s\"", @@ -5936,6 +6027,7 @@ describePublications(const char *pattern) PGresult *res; bool has_pubtruncate; bool has_pubviaroot; + bool has_pubsequence; PQExpBufferData title; printTableContent cont; @@ -5952,6 +6044,7 @@ describePublications(const char *pattern) has_pubtruncate = (pset.sversion >= 110000); has_pubviaroot = (pset.sversion >= 130000); + has_pubsequence = (pset.sversion >= 150000); initPQExpBuffer(&buf); @@ -5959,12 +6052,17 @@ describePublications(const char *pattern) "SELECT oid, pubname,\n" " pg_catalog.pg_get_userbyid(pubowner) AS owner,\n" " puballtables, pubinsert, pubupdate, pubdelete"); + if (has_pubtruncate) appendPQExpBufferStr(&buf, ", pubtruncate"); if (has_pubviaroot) appendPQExpBufferStr(&buf, ", pubviaroot"); + if (has_pubsequence) + appendPQExpBufferStr(&buf, + ", puballsequences, pubsequence"); + appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -6005,6 +6103,7 @@ describePublications(const char *pattern) char *pubid = PQgetvalue(res, i, 0); char *pubname = PQgetvalue(res, i, 1); bool puballtables = strcmp(PQgetvalue(res, i, 3), "t") == 0; + bool puballsequences = strcmp(PQgetvalue(res, i, 9), "t") == 0; printTableOpt myopt = pset.popt.topt; if (has_pubtruncate) @@ -6012,29 +6111,43 @@ describePublications(const char *pattern) if (has_pubviaroot) ncols++; + /* sequences have two extra columns (puballsequences, pubsequences) */ + if (has_pubsequence) + ncols += 2; + initPQExpBuffer(&title); printfPQExpBuffer(&title, _("Publication %s"), pubname); printTableInit(&cont, &myopt, title.data, ncols, nrows); printTableAddHeader(&cont, gettext_noop("Owner"), true, align); printTableAddHeader(&cont, gettext_noop("All tables"), true, align); + if (has_pubsequence) + printTableAddHeader(&cont, gettext_noop("All sequences"), true, align); printTableAddHeader(&cont, gettext_noop("Inserts"), true, align); printTableAddHeader(&cont, gettext_noop("Updates"), true, align); printTableAddHeader(&cont, gettext_noop("Deletes"), true, align); if (has_pubtruncate) printTableAddHeader(&cont, gettext_noop("Truncates"), true, align); + if (has_pubsequence) + printTableAddHeader(&cont, gettext_noop("Sequences"), true, align); if (has_pubviaroot) printTableAddHeader(&cont, gettext_noop("Via root"), true, align); - printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false); - printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); - printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false); - printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false); - printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false); + printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false); /* owner */ + printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); /* all tables */ + + if (has_pubsequence) + printTableAddCell(&cont, PQgetvalue(res, i, 9), false, false); /* all sequences */ + + printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false); /* insert */ + printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false); /* update */ + printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false); /* delete */ if (has_pubtruncate) - printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false); + printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false); /* truncate */ + if (has_pubsequence) + printTableAddCell(&cont, PQgetvalue(res, i, 10), false, false); /* sequence */ if (has_pubviaroot) - printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false); + printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false); /* via root */ if (!puballtables) { @@ -6054,6 +6167,7 @@ describePublications(const char *pattern) "WHERE c.relnamespace = n.oid\n" " AND c.oid = pr.prrelid\n" " AND pr.prpubid = '%s'\n" + " AND c.relkind != 'S'\n" /* exclude sequences */ "ORDER BY 1,2", pubid); if (!addFooterToPublicationDesc(&buf, "Tables:", false, &cont)) goto error_return; @@ -6065,7 +6179,7 @@ describePublications(const char *pattern) "SELECT n.nspname\n" "FROM pg_catalog.pg_namespace n\n" " JOIN pg_catalog.pg_publication_namespace pn ON n.oid = pn.pnnspid\n" - "WHERE pn.pnpubid = '%s'\n" + "WHERE pn.pnpubid = '%s' AND pn.pntype = 't'\n" "ORDER BY 1", pubid); if (!addFooterToPublicationDesc(&buf, "Tables from schemas:", true, &cont)) @@ -6073,6 +6187,37 @@ describePublications(const char *pattern) } } + if (!puballsequences) + { + /* Get the tables for the specified publication */ + printfPQExpBuffer(&buf, + "SELECT n.nspname, c.relname, NULL\n" + "FROM pg_catalog.pg_class c,\n" + " pg_catalog.pg_namespace n,\n" + " pg_catalog.pg_publication_rel pr\n" + "WHERE c.relnamespace = n.oid\n" + " AND c.oid = pr.prrelid\n" + " AND pr.prpubid = '%s'\n" + " AND c.relkind = 'S'\n" /* only sequences */ + "ORDER BY 1,2", pubid); + if (!addFooterToPublicationDesc(&buf, "Sequences:", false, &cont)) + goto error_return; + + if (pset.sversion >= 150000) + { + /* Get the schemas for the specified publication */ + printfPQExpBuffer(&buf, + "SELECT n.nspname\n" + "FROM pg_catalog.pg_namespace n\n" + " JOIN pg_catalog.pg_publication_namespace pn ON n.oid = pn.pnnspid\n" + "WHERE pn.pnpubid = '%s' AND pn.pntype = 's'\n" + "ORDER BY 1", pubid); + if (!addFooterToPublicationDesc(&buf, "Sequences from schemas:", + true, &cont)) + goto error_return; + } + } + printTable(&cont, pset.queryFout, false, pset.logfile); printTableCleanup(&cont); diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 5c064595a97..e59bd8302d6 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1815,11 +1815,15 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("ADD", "DROP", "OWNER TO", "RENAME TO", "SET"); /* ALTER PUBLICATION <name> ADD */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD")) - COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE"); + COMPLETE_WITH("ALL TABLES IN SCHEMA", "ALL SEQUENCES IN SCHEMA", "TABLE", "SEQUENCE"); else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE") || (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE") && ends_with(prev_wd, ','))) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables); + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "SEQUENCE") || + (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "SEQUENCE") && + ends_with(prev_wd, ','))) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_sequences); /* * "ALTER PUBLICATION <name> SET TABLE <name> WHERE (" - complete with * table attributes @@ -1838,11 +1842,11 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH(","); /* ALTER PUBLICATION <name> DROP */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP")) - COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE"); + COMPLETE_WITH("ALL TABLES IN SCHEMA", "ALL SEQUENCES IN SCHEMA", "TABLE", "SEQUENCE"); /* ALTER PUBLICATION <name> SET */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET")) - COMPLETE_WITH("(", "ALL TABLES IN SCHEMA", "TABLE"); - else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "ALL", "TABLES", "IN", "SCHEMA")) + COMPLETE_WITH("(", "ALL TABLES IN SCHEMA", "ALL SEQUENCES IN SCHEMA", "TABLE", "SEQUENCE"); + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "ALL", "TABLES|SEQUENCES", "IN", "SCHEMA")) COMPLETE_WITH_QUERY_PLUS(Query_for_list_of_schemas " AND nspname NOT LIKE E'pg\\\\_%%'", "CURRENT_SCHEMA"); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d8e8715ed1c..699bd0aa3e3 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11540,6 +11540,11 @@ provolatile => 's', prorettype => 'oid', proargtypes => 'text', proallargtypes => '{text,oid}', proargmodes => '{i,o}', proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' }, +{ oid => '8000', descr => 'get OIDs of sequences in a publication', + proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't', + provolatile => 's', prorettype => 'oid', proargtypes => 'text', + proallargtypes => '{text,oid}', proargmodes => '{i,o}', + proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' }, { oid => '6121', descr => 'returns whether a relation can be part of a publication', proname => 'pg_relation_is_publishable', provolatile => 's', diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index fe773cf9b7d..97f26208e1d 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -40,6 +40,12 @@ CATALOG(pg_publication,6104,PublicationRelationId) */ bool puballtables; + /* + * indicates that this is special publication which should encompass all + * sequences in the database (except for the unlogged and temp ones) + */ + bool puballsequences; + /* true if inserts are published */ bool pubinsert; @@ -52,6 +58,9 @@ CATALOG(pg_publication,6104,PublicationRelationId) /* true if truncates are published */ bool pubtruncate; + /* true if sequences are published */ + bool pubsequence; + /* true if partition changes are published using root schema */ bool pubviaroot; } FormData_pg_publication; @@ -72,6 +81,7 @@ typedef struct PublicationActions bool pubupdate; bool pubdelete; bool pubtruncate; + bool pubsequence; } PublicationActions; typedef struct PublicationDesc @@ -92,6 +102,7 @@ typedef struct Publication Oid oid; char *name; bool alltables; + bool allsequences; bool pubviaroot; PublicationActions pubactions; } Publication; @@ -122,14 +133,15 @@ typedef enum PublicationPartOpt PUBLICATION_PART_ALL, } PublicationPartOpt; -extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); +extern List *GetPublicationRelations(Oid pubid, char objectType, + PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); -extern List *GetPublicationSchemas(Oid pubid); -extern List *GetSchemaPublications(Oid schemaid); -extern List *GetSchemaPublicationRelations(Oid schemaid, +extern List *GetPublicationSchemas(Oid pubid, char objectType); +extern List *GetSchemaPublications(Oid schemaid, char objectType); +extern List *GetSchemaPublicationRelations(Oid schemaid, char objectType, PublicationPartOpt pub_partopt); -extern List *GetAllSchemaPublicationRelations(Oid puboid, +extern List *GetAllSchemaPublicationRelations(Oid puboid, char objectType, PublicationPartOpt pub_partopt); extern List *GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, @@ -137,11 +149,15 @@ extern List *GetPubPartitionOptionRelations(List *result, extern Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level); +extern List *GetAllSequencesPublications(void); +extern List *GetAllSequencesPublicationRelations(void); + extern bool is_publishable_relation(Relation rel); extern bool is_schema_publication(Oid pubid); extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists); extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, + char objectType, bool if_not_exists); extern Oid get_publication_oid(const char *pubname, bool missing_ok); diff --git a/src/include/catalog/pg_publication_namespace.h b/src/include/catalog/pg_publication_namespace.h index e4306da02e7..7340a1ec646 100644 --- a/src/include/catalog/pg_publication_namespace.h +++ b/src/include/catalog/pg_publication_namespace.h @@ -32,6 +32,7 @@ CATALOG(pg_publication_namespace,8901,PublicationNamespaceRelationId) Oid oid; /* oid */ Oid pnpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid pnnspid BKI_LOOKUP(pg_namespace); /* Oid of the schema */ + char pntype; /* object type to include */ } FormData_pg_publication_namespace; /* ---------------- @@ -42,6 +43,13 @@ CATALOG(pg_publication_namespace,8901,PublicationNamespaceRelationId) typedef FormData_pg_publication_namespace *Form_pg_publication_namespace; DECLARE_UNIQUE_INDEX_PKEY(pg_publication_namespace_oid_index, 8902, PublicationNamespaceObjectIndexId, on pg_publication_namespace using btree(oid oid_ops)); -DECLARE_UNIQUE_INDEX(pg_publication_namespace_pnnspid_pnpubid_index, 8903, PublicationNamespacePnnspidPnpubidIndexId, on pg_publication_namespace using btree(pnnspid oid_ops, pnpubid oid_ops)); +DECLARE_UNIQUE_INDEX(pg_publication_namespace_pnnspid_pnpubid_pntype_index, 8903, PublicationNamespacePnnspidPnpubidPntypeIndexId, on pg_publication_namespace using btree(pnnspid oid_ops, pnpubid oid_ops, pntype char_ops)); + +/* object type to include from a schema, maps to relkind */ +#define PUB_OBJTYPE_TABLE 't' /* table (regular or partitioned) */ +#define PUB_OBJTYPE_SEQUENCE 's' /* sequence object */ +#define PUB_OBJTYPE_UNSUPPORTED 'u' /* used for non-replicated types */ + +extern char pub_get_object_type_for_relkind(char relkind); #endif /* PG_PUBLICATION_NAMESPACE_H */ diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 9fecc41954e..5bab90db8e0 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -60,6 +60,7 @@ extern ObjectAddress DefineSequence(ParseState *pstate, CreateSeqStmt *stmt); extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt); extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); +extern void SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called); extern void ResetSequenceCaches(void); extern void seq_redo(XLogReaderState *rptr); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 2f618cb2292..cb1fcc0ee31 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3665,6 +3665,10 @@ typedef enum PublicationObjSpecType PUBLICATIONOBJ_TABLES_IN_SCHEMA, /* All tables in schema */ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA, /* All tables in first element of * search_path */ + PUBLICATIONOBJ_SEQUENCE, /* Sequence type */ + PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA, /* Sequences in schema type */ + PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA, /* Get the first element of + * search_path */ PUBLICATIONOBJ_CONTINUATION /* Continuation of previous type */ } PublicationObjSpecType; @@ -3683,7 +3687,7 @@ typedef struct CreatePublicationStmt char *pubname; /* Name of the publication */ List *options; /* List of DefElem nodes */ List *pubobjects; /* Optional list of publication objects */ - bool for_all_tables; /* Special publication for all tables in db */ + List *for_all_objects; /* Special publication for all objects in db */ } CreatePublicationStmt; typedef enum AlterPublicationAction @@ -3706,7 +3710,7 @@ typedef struct AlterPublicationStmt * objects. */ List *pubobjects; /* Optional list of publication objects */ - bool for_all_tables; /* Special publication for all tables in db */ + List *for_all_objects; /* Special publication for all objects in db */ AlterPublicationAction action; /* What action to perform with the given * objects */ } AlterPublicationStmt; diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 4d2c881644a..fb86ca022d2 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -61,6 +61,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', + LOGICAL_REP_MSG_SEQUENCE = 'Q', LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', LOGICAL_REP_MSG_PREPARE = 'P', LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', @@ -118,6 +119,18 @@ typedef struct LogicalRepTyp char *typname; /* name of the remote type */ } LogicalRepTyp; +/* Sequence info */ +typedef struct LogicalRepSequence +{ + Oid remoteid; /* unique id of the remote sequence */ + char *nspname; /* schema name of remote sequence */ + char *seqname; /* name of the remote sequence */ + bool transactional; + int64 last_value; + int64 log_cnt; + bool is_called; +} LogicalRepSequence; + /* Transaction info */ typedef struct LogicalRepBeginData { @@ -230,6 +243,12 @@ extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); +extern void logicalrep_write_sequence(StringInfo out, Relation rel, + TransactionId xid, XLogRecPtr lsn, + bool transactional, + int64 last_value, int64 log_cnt, + bool is_called); +extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index eafedd610a5..f4e9f35d09d 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -29,6 +29,7 @@ typedef struct PGOutputData bool streaming; bool messages; bool two_phase; + bool sequences; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/test/regress/expected/object_address.out b/src/test/regress/expected/object_address.out index 4117fc27c9a..c95d44b3db9 100644 --- a/src/test/regress/expected/object_address.out +++ b/src/test/regress/expected/object_address.out @@ -46,6 +46,7 @@ CREATE TRANSFORM FOR int LANGUAGE SQL ( SET client_min_messages = 'ERROR'; CREATE PUBLICATION addr_pub FOR TABLE addr_nsp.gentable; CREATE PUBLICATION addr_pub_schema FOR ALL TABLES IN SCHEMA addr_nsp; +CREATE PUBLICATION addr_pub_schema2 FOR ALL SEQUENCES IN SCHEMA addr_nsp; RESET client_min_messages; CREATE SUBSCRIPTION regress_addr_sub CONNECTION '' PUBLICATION bar WITH (connect = false, slot_name = NONE); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables @@ -428,7 +429,8 @@ WITH objects (type, name, args) AS (VALUES ('transform', '{int}', '{sql}'), ('access method', '{btree}', '{}'), ('publication', '{addr_pub}', '{}'), - ('publication namespace', '{addr_nsp}', '{addr_pub_schema}'), + ('publication namespace', '{addr_nsp}', '{addr_pub_schema,t}'), + ('publication namespace', '{addr_nsp}', '{addr_pub_schema2,s}'), ('publication relation', '{addr_nsp, gentable}', '{addr_pub}'), ('subscription', '{regress_addr_sub}', '{}'), ('statistics object', '{addr_nsp, gentable_stat}', '{}') @@ -492,8 +494,9 @@ SELECT (pg_identify_object(addr1.classid, addr1.objid, addr1.objsubid)).*, subscription | | regress_addr_sub | regress_addr_sub | t publication | | addr_pub | addr_pub | t publication relation | | | addr_nsp.gentable in publication addr_pub | t - publication namespace | | | addr_nsp in publication addr_pub_schema | t -(50 rows) + publication namespace | | | addr_nsp in publication addr_pub_schema type t | t + publication namespace | | | addr_nsp in publication addr_pub_schema2 type s | t +(51 rows) --- --- Cleanup resources @@ -506,6 +509,7 @@ drop cascades to server integer drop cascades to user mapping for regress_addr_user on server integer DROP PUBLICATION addr_pub; DROP PUBLICATION addr_pub_schema; +DROP PUBLICATION addr_pub_schema2; DROP SUBSCRIPTION regress_addr_sub; DROP SCHEMA addr_nsp CASCADE; NOTICE: drop cascades to 14 other objects diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 4e191c120ac..92f6122d409 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -30,20 +30,20 @@ ERROR: conflicting or redundant options LINE 1: ...ub_xxx WITH (publish_via_partition_root = 'true', publish_vi... ^ \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f - testpub_default | regress_publication_user | f | f | t | f | f | f + List of publications + Name | Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------+--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + testpib_ins_trunct | regress_publication_user | f | f | t | f | f | f | f | f + testpub_default | regress_publication_user | f | f | f | t | f | f | f | f (2 rows) -ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete'); +ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete, sequence'); \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f - testpub_default | regress_publication_user | f | t | t | t | f | f + List of publications + Name | Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------+--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + testpib_ins_trunct | regress_publication_user | f | f | t | f | f | f | f | f + testpub_default | regress_publication_user | f | f | t | t | t | f | t | f (2 rows) --- adding tables @@ -61,6 +61,9 @@ CREATE TABLE testpub_tbl2 (id serial primary key, data text); ALTER PUBLICATION testpub_foralltables ADD TABLE testpub_tbl2; ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications. +-- fail - can't add a table using ADD SEQUENCE command +ALTER PUBLICATION testpub_foralltables ADD SEQUENCE testpub_tbl2; +ERROR: object type does not match type expected by command -- fail - can't drop from all tables publication ALTER PUBLICATION testpub_foralltables DROP TABLE testpub_tbl2; ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES @@ -87,10 +90,10 @@ RESET client_min_messages; -- should be able to add schema to 'FOR TABLE' publication ALTER PUBLICATION testpub_fortable ADD ALL TABLES IN SCHEMA pub_test; \dRp+ testpub_fortable - Publication testpub_fortable - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortable + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables: "public.testpub_tbl1" Tables from schemas: @@ -99,20 +102,20 @@ Tables from schemas: -- should be able to drop schema from 'FOR TABLE' publication ALTER PUBLICATION testpub_fortable DROP ALL TABLES IN SCHEMA pub_test; \dRp+ testpub_fortable - Publication testpub_fortable - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortable + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables: "public.testpub_tbl1" -- should be able to set schema to 'FOR TABLE' publication ALTER PUBLICATION testpub_fortable SET ALL TABLES IN SCHEMA pub_test; \dRp+ testpub_fortable - Publication testpub_fortable - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortable + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test" @@ -134,10 +137,10 @@ ERROR: relation "testpub_nopk" is not part of the publication -- should be able to set table to schema publication ALTER PUBLICATION testpub_forschema SET TABLE pub_test.testpub_nopk; \dRp+ testpub_forschema - Publication testpub_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables: "pub_test.testpub_nopk" @@ -159,10 +162,10 @@ Publications: "testpub_foralltables" \dRp+ testpub_foralltables - Publication testpub_foralltables - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | t | t | t | f | f | f + Publication testpub_foralltables + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | t | f | t | t | f | f | f | f (1 row) DROP TABLE testpub_tbl2; @@ -174,24 +177,527 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; RESET client_min_messages; \dRp+ testpub3 - Publication testpub3 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub3 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables: "public.testpub_tbl3" "public.testpub_tbl3a" \dRp+ testpub4 - Publication testpub4 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub4 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables: "public.testpub_tbl3" DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +--- adding sequences +CREATE SEQUENCE testpub_seq0; +CREATE SEQUENCE pub_test.testpub_seq1; +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forallsequences FOR ALL SEQUENCES WITH (publish = 'sequence'); +RESET client_min_messages; +ALTER PUBLICATION testpub_forallsequences SET (publish = 'insert, sequence'); +CREATE SEQUENCE testpub_seq2; +-- fail - can't add to for all sequences publication +ALTER PUBLICATION testpub_forallsequences ADD SEQUENCE testpub_seq2; +ERROR: publication "testpub_forallsequences" is defined as FOR ALL SEQUENCES +DETAIL: Sequences cannot be added to or dropped from FOR ALL SEQUENCES publications. +-- fail - can't drop from all sequences publication +ALTER PUBLICATION testpub_forallsequences DROP SEQUENCE testpub_seq2; +ERROR: publication "testpub_forallsequences" is defined as FOR ALL SEQUENCES +DETAIL: Sequences cannot be added to or dropped from FOR ALL SEQUENCES publications. +-- fail - can't add to for all sequences publication +ALTER PUBLICATION testpub_forallsequences SET SEQUENCE pub_test.testpub_seq1; +ERROR: publication "testpub_forallsequences" is defined as FOR ALL SEQUENCES +DETAIL: Sequences cannot be added to or dropped from FOR ALL SEQUENCES publications. +-- fail - can't add schema to 'FOR ALL SEQUENCES' publication +ALTER PUBLICATION testpub_forallsequences ADD ALL SEQUENCES IN SCHEMA pub_test; +ERROR: publication "testpub_forallsequences" is defined as FOR ALL SEQUENCES +DETAIL: Sequences from schema cannot be added to, dropped from, or set on FOR ALL SEQUENCES publications. +-- fail - can't drop schema from 'FOR ALL SEQUENCES' publication +ALTER PUBLICATION testpub_forallsequences DROP ALL SEQUENCES IN SCHEMA pub_test; +ERROR: publication "testpub_forallsequences" is defined as FOR ALL SEQUENCES +DETAIL: Sequences from schema cannot be added to, dropped from, or set on FOR ALL SEQUENCES publications. +-- fail - can't set schema to 'FOR ALL SEQUENCES' publication +ALTER PUBLICATION testpub_forallsequences SET ALL SEQUENCES IN SCHEMA pub_test; +ERROR: publication "testpub_forallsequences" is defined as FOR ALL SEQUENCES +DETAIL: Sequences from schema cannot be added to, dropped from, or set on FOR ALL SEQUENCES publications. +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forsequence FOR SEQUENCE testpub_seq0; +RESET client_min_messages; +-- should be able to add schema to 'FOR SEQUENCE' publication +ALTER PUBLICATION testpub_forsequence ADD ALL SEQUENCES IN SCHEMA pub_test; +\dRp+ testpub_forsequence + Publication testpub_forsequence + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Sequences: + "public.testpub_seq0" +Sequences from schemas: + "pub_test" + +-- fail - can't add sequence from the schema we already added +ALTER PUBLICATION testpub_forsequence ADD SEQUENCE pub_test.testpub_seq1; +ERROR: cannot add relation "pub_test.testpub_seq1" to publication +DETAIL: Sequence's schema "pub_test" is already part of the publication or part of the specified schema list. +-- fail - can't add sequence using ADD TABLE command +ALTER PUBLICATION testpub_forsequence ADD TABLE pub_test.testpub_seq1; +ERROR: object type does not match type expected by command +-- should be able to drop schema from 'FOR SEQUENCE' publication +ALTER PUBLICATION testpub_forsequence DROP ALL SEQUENCES IN SCHEMA pub_test; +\dRp+ testpub_forsequence + Publication testpub_forsequence + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Sequences: + "public.testpub_seq0" + +-- should be able to set schema to 'FOR SEQUENCE' publication +ALTER PUBLICATION testpub_forsequence SET ALL SEQUENCES IN SCHEMA pub_test; +\dRp+ testpub_forsequence + Publication testpub_forsequence + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Sequences from schemas: + "pub_test" + +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forschema FOR ALL SEQUENCES IN SCHEMA pub_test; +RESET client_min_messages; +-- fail - can't create publication with schema and sequence of the same schema +CREATE PUBLICATION testpub_for_seq_schema FOR ALL SEQUENCES IN SCHEMA pub_test, SEQUENCE pub_test.testpub_seq1; +ERROR: cannot add relation "pub_test.testpub_seq1" to publication +DETAIL: Sequence's schema "pub_test" is already part of the publication or part of the specified schema list. +-- fail - can't add a sequence of the same schema to the schema publication +ALTER PUBLICATION testpub_forschema ADD SEQUENCE pub_test.testpub_seq1; +ERROR: cannot add relation "pub_test.testpub_seq1" to publication +DETAIL: Sequence's schema "pub_test" is already part of the publication or part of the specified schema list. +-- fail - can't drop a sequence from the schema publication which isn't in the +-- publication +ALTER PUBLICATION testpub_forschema DROP SEQUENCE pub_test.testpub_seq1; +ERROR: relation "testpub_seq1" is not part of the publication +-- should be able to set sequence to schema publication +ALTER PUBLICATION testpub_forschema SET SEQUENCE pub_test.testpub_seq1; +\dRp+ testpub_forschema + Publication testpub_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Sequences: + "pub_test.testpub_seq1" + +SELECT pubname, puballtables, puballsequences FROM pg_publication WHERE pubname = 'testpub_forallsequences'; + pubname | puballtables | puballsequences +-------------------------+--------------+----------------- + testpub_forallsequences | f | t +(1 row) + +\d+ pub_test.testpub_seq1 + Sequence "pub_test.testpub_seq1" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------+-------+---------+---------------------+-----------+---------+------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 +Publications: + "testpub_forallsequences" + "testpub_forschema" + "testpub_forsequence" + +\dRp+ testpub_forallsequences + Publication testpub_forallsequences + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | t | t | f | f | f | t | f +(1 row) + +DROP SEQUENCE testpub_seq0, pub_test.testpub_seq1, testpub_seq2; +DROP PUBLICATION testpub_forallsequences, testpub_forsequence, testpub_forschema; +-- Publication mixing tables and sequences +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_mix; +RESET client_min_messages; +CREATE SEQUENCE testpub_seq1; +CREATE SEQUENCE pub_test.testpub_seq2; +ALTER PUBLICATION testpub_mix ADD SEQUENCE testpub_seq1, TABLE testpub_tbl1; +\dRp+ testpub_mix + Publication testpub_mix + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Tables: + "public.testpub_tbl1" +Sequences: + "public.testpub_seq1" + +ALTER PUBLICATION testpub_mix ADD ALL SEQUENCES IN SCHEMA pub_test, ALL TABLES IN SCHEMA pub_test; +\dRp+ testpub_mix + Publication testpub_mix + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Tables: + "public.testpub_tbl1" +Tables from schemas: + "pub_test" +Sequences: + "public.testpub_seq1" +Sequences from schemas: + "pub_test" + +ALTER PUBLICATION testpub_mix DROP ALL SEQUENCES IN SCHEMA pub_test; +\dRp+ testpub_mix + Publication testpub_mix + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Tables: + "public.testpub_tbl1" +Tables from schemas: + "pub_test" +Sequences: + "public.testpub_seq1" + +ALTER PUBLICATION testpub_mix DROP ALL TABLES IN SCHEMA pub_test; +\dRp+ testpub_mix + Publication testpub_mix + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Tables: + "public.testpub_tbl1" +Sequences: + "public.testpub_seq1" + +DROP PUBLICATION testpub_mix; +DROP SEQUENCE testpub_seq1; +DROP SEQUENCE pub_test.testpub_seq2; +-- make sure we replicate only the correct relation type +CREATE SCHEMA pub_test1; +CREATE SEQUENCE pub_test1.test_seq1; +CREATE TABLE pub_test1.test_tbl1 (a int primary key, b int); +CREATE SCHEMA pub_test2; +CREATE SEQUENCE pub_test2.test_seq2; +CREATE TABLE pub_test2.test_tbl2 (a int primary key, b int); +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_schemas; +RESET client_min_messages; +-- add tables from one schema, sequences from the other +ALTER PUBLICATION testpub_schemas ADD ALL TABLES IN SCHEMA pub_test2; +ALTER PUBLICATION testpub_schemas ADD ALL SEQUENCES IN SCHEMA pub_test1; +\dRp+ testpub_schemas + Publication testpub_schemas + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Tables from schemas: + "pub_test2" +Sequences from schemas: + "pub_test1" + +\dn+ pub_test1 + List of schemas + Name | Owner | Access privileges | Description +-----------+--------------------------+-------------------+------------- + pub_test1 | regress_publication_user | | +Publications: + "testpub_schemas" (sequences) + +\dn+ pub_test2 + List of schemas + Name | Owner | Access privileges | Description +-----------+--------------------------+-------------------+------------- + pub_test2 | regress_publication_user | | +Publications: + "testpub_schemas" (tables) + +\d+ pub_test1.test_seq1; + Sequence "pub_test1.test_seq1" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------+-------+---------+---------------------+-----------+---------+------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 +Publications: + "testpub_schemas" + +\d+ pub_test1.test_tbl1; + Table "pub_test1.test_tbl1" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + a | integer | | not null | | plain | | + b | integer | | | | plain | | +Indexes: + "test_tbl1_pkey" PRIMARY KEY, btree (a) + +\d+ pub_test2.test_seq2; + Sequence "pub_test2.test_seq2" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------+-------+---------+---------------------+-----------+---------+------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 + +\d+ pub_test2.test_tbl2; + Table "pub_test2.test_tbl2" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + a | integer | | not null | | plain | | + b | integer | | | | plain | | +Indexes: + "test_tbl2_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_schemas" + +-- add the other object type from each schema +ALTER PUBLICATION testpub_schemas ADD ALL TABLES IN SCHEMA pub_test1; +ALTER PUBLICATION testpub_schemas ADD ALL SEQUENCES IN SCHEMA pub_test2; +\dRp+ testpub_schemas + Publication testpub_schemas + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Tables from schemas: + "pub_test1" + "pub_test2" +Sequences from schemas: + "pub_test1" + "pub_test2" + +\dn+ pub_test1 + List of schemas + Name | Owner | Access privileges | Description +-----------+--------------------------+-------------------+------------- + pub_test1 | regress_publication_user | | +Publications: + "testpub_schemas" (sequences) + "testpub_schemas" (tables) + +\dn+ pub_test2 + List of schemas + Name | Owner | Access privileges | Description +-----------+--------------------------+-------------------+------------- + pub_test2 | regress_publication_user | | +Publications: + "testpub_schemas" (tables) + "testpub_schemas" (sequences) + +\d+ pub_test1.test_seq1; + Sequence "pub_test1.test_seq1" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------+-------+---------+---------------------+-----------+---------+------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 +Publications: + "testpub_schemas" + +\d+ pub_test1.test_tbl1; + Table "pub_test1.test_tbl1" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + a | integer | | not null | | plain | | + b | integer | | | | plain | | +Indexes: + "test_tbl1_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_schemas" + +\d+ pub_test2.test_seq2; + Sequence "pub_test2.test_seq2" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------+-------+---------+---------------------+-----------+---------+------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 +Publications: + "testpub_schemas" + +\d+ pub_test2.test_tbl2; + Table "pub_test2.test_tbl2" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + a | integer | | not null | | plain | | + b | integer | | | | plain | | +Indexes: + "test_tbl2_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_schemas" + +-- now drop the object type added first +ALTER PUBLICATION testpub_schemas DROP ALL TABLES IN SCHEMA pub_test2; +ALTER PUBLICATION testpub_schemas DROP ALL SEQUENCES IN SCHEMA pub_test1; +\dRp+ testpub_schemas + Publication testpub_schemas + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Tables from schemas: + "pub_test1" +Sequences from schemas: + "pub_test2" + +\dn+ pub_test1 + List of schemas + Name | Owner | Access privileges | Description +-----------+--------------------------+-------------------+------------- + pub_test1 | regress_publication_user | | +Publications: + "testpub_schemas" (tables) + +\dn+ pub_test2 + List of schemas + Name | Owner | Access privileges | Description +-----------+--------------------------+-------------------+------------- + pub_test2 | regress_publication_user | | +Publications: + "testpub_schemas" (sequences) + +\d+ pub_test1.test_seq1; + Sequence "pub_test1.test_seq1" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------+-------+---------+---------------------+-----------+---------+------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 + +\d+ pub_test1.test_tbl1; + Table "pub_test1.test_tbl1" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + a | integer | | not null | | plain | | + b | integer | | | | plain | | +Indexes: + "test_tbl1_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_schemas" + +\d+ pub_test2.test_seq2; + Sequence "pub_test2.test_seq2" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------+-------+---------+---------------------+-----------+---------+------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 +Publications: + "testpub_schemas" + +\d+ pub_test2.test_tbl2; + Table "pub_test2.test_tbl2" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + a | integer | | not null | | plain | | + b | integer | | | | plain | | +Indexes: + "test_tbl2_pkey" PRIMARY KEY, btree (a) + +-- should fail (publication contains the whole schema) +ALTER PUBLICATION testpub_schemas ADD TABLE pub_test1.test_tbl1; +ERROR: cannot add relation "pub_test1.test_tbl1" to publication +DETAIL: Table's schema "pub_test1" is already part of the publication or part of the specified schema list. +ALTER PUBLICATION testpub_schemas ADD SEQUENCE pub_test2.test_seq2; +ERROR: cannot add relation "pub_test2.test_seq2" to publication +DETAIL: Sequence's schema "pub_test2" is already part of the publication or part of the specified schema list. +-- should work (different schema) +ALTER PUBLICATION testpub_schemas ADD TABLE pub_test2.test_tbl2; +ALTER PUBLICATION testpub_schemas ADD SEQUENCE pub_test1.test_seq1; +\dRp+ testpub_schemas + Publication testpub_schemas + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Tables: + "pub_test2.test_tbl2" +Tables from schemas: + "pub_test1" +Sequences: + "pub_test1.test_seq1" +Sequences from schemas: + "pub_test2" + +\d+ pub_test1.test_seq1; + Sequence "pub_test1.test_seq1" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------+-------+---------+---------------------+-----------+---------+------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 +Publications: + "testpub_schemas" + +\d+ pub_test1.test_tbl1; + Table "pub_test1.test_tbl1" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + a | integer | | not null | | plain | | + b | integer | | | | plain | | +Indexes: + "test_tbl1_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_schemas" + +\d+ pub_test2.test_seq2; + Sequence "pub_test2.test_seq2" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------+-------+---------+---------------------+-----------+---------+------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 +Publications: + "testpub_schemas" + +\d+ pub_test2.test_tbl2; + Table "pub_test2.test_tbl2" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + a | integer | | not null | | plain | | + b | integer | | | | plain | | +Indexes: + "test_tbl2_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_schemas" + +-- now drop the explicitly added objects again +ALTER PUBLICATION testpub_schemas DROP TABLE pub_test2.test_tbl2; +ALTER PUBLICATION testpub_schemas DROP SEQUENCE pub_test1.test_seq1; +\dRp+ testpub_schemas + Publication testpub_schemas + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Tables from schemas: + "pub_test1" +Sequences from schemas: + "pub_test2" + +\d+ pub_test1.test_seq1; + Sequence "pub_test1.test_seq1" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------+-------+---------+---------------------+-----------+---------+------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 + +\d+ pub_test1.test_tbl1; + Table "pub_test1.test_tbl1" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + a | integer | | not null | | plain | | + b | integer | | | | plain | | +Indexes: + "test_tbl1_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_schemas" + +\d+ pub_test2.test_seq2; + Sequence "pub_test2.test_seq2" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------+-------+---------+---------------------+-----------+---------+------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 +Publications: + "testpub_schemas" + +\d+ pub_test2.test_tbl2; + Table "pub_test2.test_tbl2" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + a | integer | | not null | | plain | | + b | integer | | | | plain | | +Indexes: + "test_tbl2_pkey" PRIMARY KEY, btree (a) + +DROP PUBLICATION testpub_schemas; +DROP TABLE pub_test1.test_tbl1, pub_test2.test_tbl2; +DROP SEQUENCE pub_test1.test_seq1, pub_test2.test_seq2; +DROP SCHEMA pub_test1, pub_test2; -- Tests for partitioned tables SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub_forparted; @@ -207,10 +713,10 @@ UPDATE testpub_parted1 SET a = 1; -- only parent is listed as being in publication, not the partition ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; \dRp+ testpub_forparted - Publication testpub_forparted - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_forparted + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables: "public.testpub_parted" @@ -223,10 +729,10 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1; UPDATE testpub_parted1 SET a = 1; ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true); \dRp+ testpub_forparted - Publication testpub_forparted - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | t + Publication testpub_forparted + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | t Tables: "public.testpub_parted" @@ -255,10 +761,10 @@ SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5) WITH (publish = 'insert'); RESET client_min_messages; \dRp+ testpub5 - Publication testpub5 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | f | f + Publication testpub5 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | f | f | f | f | f Tables: "public.testpub_rf_tbl1" "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5)) @@ -271,10 +777,10 @@ Tables: ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000); \dRp+ testpub5 - Publication testpub5 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | f | f + Publication testpub5 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | f | f | f | f | f Tables: "public.testpub_rf_tbl1" "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5)) @@ -290,10 +796,10 @@ Publications: ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2; \dRp+ testpub5 - Publication testpub5 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | f | f + Publication testpub5 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | f | f | f | f | f Tables: "public.testpub_rf_tbl1" "public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000)) @@ -301,10 +807,10 @@ Tables: -- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression) ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500); \dRp+ testpub5 - Publication testpub5 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | f | f + Publication testpub5 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | f | f | f | f | f Tables: "public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500)) @@ -337,10 +843,10 @@ SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999) WITH (publish = 'insert'); RESET client_min_messages; \dRp+ testpub_syntax1 - Publication testpub_syntax1 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | f | f + Publication testpub_syntax1 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | f | f | f | f | f Tables: "public.testpub_rf_tbl1" "public.testpub_rf_tbl3" WHERE (e < 999) @@ -350,10 +856,10 @@ SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_schema1.testpub_rf_tbl5 WHERE (h < 999) WITH (publish = 'insert'); RESET client_min_messages; \dRp+ testpub_syntax2 - Publication testpub_syntax2 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | f | f + Publication testpub_syntax2 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | f | f | f | f | f Tables: "public.testpub_rf_tbl1" "testpub_rf_schema1.testpub_rf_tbl5" WHERE (h < 999) @@ -658,10 +1164,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; ERROR: publication "testpub_fortbl" already exists \dRp+ testpub_fortbl - Publication testpub_fortbl - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortbl + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -699,10 +1205,10 @@ Publications: "testpub_fortbl" \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | f | f + Publication testpub_default + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | f | t | f Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -780,10 +1286,10 @@ REVOKE CREATE ON DATABASE regression FROM regress_publication_user2; DROP TABLE testpub_parted; DROP TABLE testpub_tbl1; \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | f | f + Publication testpub_default + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | f | t | f (1 row) -- fail - must be owner of publication @@ -793,20 +1299,20 @@ ERROR: must be owner of publication testpub_default RESET ROLE; ALTER PUBLICATION testpub_default RENAME TO testpub_foo; \dRp testpub_foo - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root --------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpub_foo | regress_publication_user | f | t | t | t | f | f + List of publications + Name | Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +-------------+--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + testpub_foo | regress_publication_user | f | f | t | t | t | f | t | f (1 row) -- rename back to keep the rest simple ALTER PUBLICATION testpub_foo RENAME TO testpub_default; ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ------------------+---------------------------+------------+---------+---------+---------+-----------+---------- - testpub_default | regress_publication_user2 | f | t | t | t | f | f + List of publications + Name | Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +-----------------+---------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + testpub_default | regress_publication_user2 | f | f | t | t | t | f | t | f (1 row) -- adding schemas and tables @@ -822,19 +1328,19 @@ CREATE TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"(id int); SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub1_forschema FOR ALL TABLES IN SCHEMA pub_test1; \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" CREATE PUBLICATION testpub2_forschema FOR ALL TABLES IN SCHEMA pub_test1, pub_test2, pub_test3; \dRp+ testpub2_forschema - Publication testpub2_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub2_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" "pub_test2" @@ -848,44 +1354,44 @@ CREATE PUBLICATION testpub6_forschema FOR ALL TABLES IN SCHEMA "CURRENT_SCHEMA", CREATE PUBLICATION testpub_fortable FOR TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"; RESET client_min_messages; \dRp+ testpub3_forschema - Publication testpub3_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub3_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "public" \dRp+ testpub4_forschema - Publication testpub4_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub4_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "CURRENT_SCHEMA" \dRp+ testpub5_forschema - Publication testpub5_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub5_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "CURRENT_SCHEMA" "public" \dRp+ testpub6_forschema - Publication testpub6_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub6_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "CURRENT_SCHEMA" "public" \dRp+ testpub_fortable - Publication testpub_fortable - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortable + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables: "CURRENT_SCHEMA.CURRENT_SCHEMA" @@ -919,10 +1425,10 @@ ERROR: schema "testpub_view" does not exist -- dropping the schema should reflect the change in publication DROP SCHEMA pub_test3; \dRp+ testpub2_forschema - Publication testpub2_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub2_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" "pub_test2" @@ -930,20 +1436,20 @@ Tables from schemas: -- renaming the schema should reflect the change in publication ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed; \dRp+ testpub2_forschema - Publication testpub2_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub2_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1_renamed" "pub_test2" ALTER SCHEMA pub_test1_renamed RENAME to pub_test1; \dRp+ testpub2_forschema - Publication testpub2_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub2_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" "pub_test2" @@ -951,10 +1457,10 @@ Tables from schemas: -- alter publication add schema ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test2; \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" "pub_test2" @@ -963,10 +1469,10 @@ Tables from schemas: ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA non_existent_schema; ERROR: schema "non_existent_schema" does not exist \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" "pub_test2" @@ -975,10 +1481,10 @@ Tables from schemas: ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test1; ERROR: schema "pub_test1" is already member of publication "testpub1_forschema" \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" "pub_test2" @@ -986,10 +1492,10 @@ Tables from schemas: -- alter publication drop schema ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2; \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" @@ -997,10 +1503,10 @@ Tables from schemas: ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2; ERROR: tables from schema "pub_test2" are not part of the publication \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" @@ -1008,29 +1514,29 @@ Tables from schemas: ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA non_existent_schema; ERROR: schema "non_existent_schema" does not exist \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" -- drop all schemas ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test1; \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f (1 row) -- alter publication set multiple schema ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test2; \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" "pub_test2" @@ -1039,10 +1545,10 @@ Tables from schemas: ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schema; ERROR: schema "non_existent_schema" does not exist \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" "pub_test2" @@ -1051,10 +1557,10 @@ Tables from schemas: -- removing the duplicate schemas ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1; \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" @@ -1124,18 +1630,18 @@ SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub3_forschema; RESET client_min_messages; \dRp+ testpub3_forschema - Publication testpub3_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub3_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f (1 row) ALTER PUBLICATION testpub3_forschema SET ALL TABLES IN SCHEMA pub_test1; \dRp+ testpub3_forschema - Publication testpub3_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub3_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables from schemas: "pub_test1" @@ -1145,20 +1651,20 @@ CREATE PUBLICATION testpub_forschema_fortable FOR ALL TABLES IN SCHEMA pub_test1 CREATE PUBLICATION testpub_fortable_forschema FOR TABLE pub_test2.tbl1, ALL TABLES IN SCHEMA pub_test1; RESET client_min_messages; \dRp+ testpub_forschema_fortable - Publication testpub_forschema_fortable - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_forschema_fortable + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables: "pub_test2.tbl1" Tables from schemas: "pub_test1" \dRp+ testpub_fortable_forschema - Publication testpub_fortable_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortable_forschema + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f Tables: "pub_test2.tbl1" Tables from schemas: @@ -1202,40 +1708,85 @@ CREATE SCHEMA sch1; CREATE SCHEMA sch2; CREATE TABLE sch1.tbl1 (a int) PARTITION BY RANGE(a); CREATE TABLE sch2.tbl1_part1 PARTITION OF sch1.tbl1 FOR VALUES FROM (1) to (10); +CREATE SEQUENCE sch1.seq1; +CREATE SEQUENCE sch2.seq2; -- Schema publication that does not include the schema that has the parent table CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=1); +ALTER PUBLICATION pub ADD ALL SEQUENCES IN SCHEMA sch2; SELECT * FROM pg_publication_tables; pubname | schemaname | tablename ---------+------------+------------ pub | sch2 | tbl1_part1 (1 row) +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- + pub | sch2 | seq2 +(1 row) + DROP PUBLICATION pub; -- Table publication that does not include the parent table CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=1); +ALTER PUBLICATION pub ADD SEQUENCE sch2.seq2; SELECT * FROM pg_publication_tables; pubname | schemaname | tablename ---------+------------+------------ pub | sch2 | tbl1_part1 (1 row) +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- + pub | sch2 | seq2 +(1 row) + -- Table publication that includes both the parent table and the child table ALTER PUBLICATION pub ADD TABLE sch1.tbl1; +ALTER PUBLICATION pub ADD SEQUENCE sch1.seq1; SELECT * FROM pg_publication_tables; pubname | schemaname | tablename ---------+------------+----------- pub | sch1 | tbl1 (1 row) +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- + pub | sch1 | seq1 + pub | sch2 | seq2 +(2 rows) + DROP PUBLICATION pub; -- Schema publication that does not include the schema that has the parent table CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=0); +ALTER PUBLICATION pub ADD SEQUENCE sch1.seq1; SELECT * FROM pg_publication_tables; pubname | schemaname | tablename ---------+------------+------------ pub | sch2 | tbl1_part1 (1 row) +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- + pub | sch1 | seq1 +(1 row) + +DROP PUBLICATION pub; +-- Sequence publication +CREATE PUBLICATION pub FOR SEQUENCE sch2.seq2; +SELECT * FROM pg_publication_tables; + pubname | schemaname | tablename +---------+------------+----------- +(0 rows) + +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- + pub | sch2 | seq2 +(1 row) + DROP PUBLICATION pub; -- Table publication that does not include the parent table CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=0); @@ -1245,14 +1796,26 @@ SELECT * FROM pg_publication_tables; pub | sch2 | tbl1_part1 (1 row) +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- +(0 rows) + -- Table publication that includes both the parent table and the child table ALTER PUBLICATION pub ADD TABLE sch1.tbl1; +ALTER PUBLICATION pub ADD ALL SEQUENCES IN SCHEMA sch2; SELECT * FROM pg_publication_tables; pubname | schemaname | tablename ---------+------------+------------ pub | sch2 | tbl1_part1 (1 row) +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- + pub | sch2 | seq2 +(1 row) + DROP PUBLICATION pub; DROP TABLE sch2.tbl1_part1; DROP TABLE sch1.tbl1; @@ -1268,9 +1831,81 @@ SELECT * FROM pg_publication_tables; pub | sch1 | tbl1 (1 row) +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- +(0 rows) + +DROP PUBLICATION pub; +-- Schema publication +CREATE PUBLICATION pub FOR SEQUENCE sch2.seq2; +SELECT * FROM pg_publication_tables; + pubname | schemaname | tablename +---------+------------+----------- +(0 rows) + +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- + pub | sch2 | seq2 +(1 row) + +DROP PUBLICATION pub; +-- Sequence publication +CREATE PUBLICATION pub FOR ALL SEQUENCES IN SCHEMA sch2; +SELECT * FROM pg_publication_tables; + pubname | schemaname | tablename +---------+------------+----------- +(0 rows) + +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- + pub | sch2 | seq2 +(1 row) + +ALTER PUBLICATION pub ADD SEQUENCE sch1.seq1; +SELECT * FROM pg_publication_tables; + pubname | schemaname | tablename +---------+------------+----------- +(0 rows) + +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- + pub | sch1 | seq1 + pub | sch2 | seq2 +(2 rows) + +ALTER PUBLICATION pub DROP SEQUENCE sch1.seq1; +SELECT * FROM pg_publication_tables; + pubname | schemaname | tablename +---------+------------+----------- +(0 rows) + +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- + pub | sch2 | seq2 +(1 row) + +ALTER PUBLICATION pub ADD ALL SEQUENCES IN SCHEMA sch1; +SELECT * FROM pg_publication_tables; + pubname | schemaname | tablename +---------+------------+----------- +(0 rows) + +SELECT * FROM pg_publication_sequences; + pubname | schemaname | sequencename +---------+------------+-------------- + pub | sch1 | seq1 + pub | sch2 | seq2 +(2 rows) + RESET client_min_messages; DROP PUBLICATION pub; DROP TABLE sch1.tbl1; +DROP SEQUENCE sch1.seq1, sch2.seq2; DROP SCHEMA sch1 cascade; DROP SCHEMA sch2 cascade; RESET SESSION AUTHORIZATION; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6cb6388880d..8a5b20e62cb 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1429,6 +1429,14 @@ pg_prepared_xacts| SELECT p.transaction, FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid) LEFT JOIN pg_authid u ON ((p.ownerid = u.oid))) LEFT JOIN pg_database d ON ((p.dbid = d.oid))); +pg_publication_sequences| SELECT p.pubname, + n.nspname AS schemaname, + c.relname AS sequencename + FROM pg_publication p, + LATERAL pg_get_publication_sequences((p.pubname)::text) gps(relid), + (pg_class c + JOIN pg_namespace n ON ((n.oid = c.relnamespace))) + WHERE (c.oid = gps.relid); pg_publication_tables| SELECT p.pubname, n.nspname AS schemaname, c.relname AS tablename diff --git a/src/test/regress/sql/object_address.sql b/src/test/regress/sql/object_address.sql index acd0468a9d9..9d8323468d6 100644 --- a/src/test/regress/sql/object_address.sql +++ b/src/test/regress/sql/object_address.sql @@ -49,6 +49,7 @@ CREATE TRANSFORM FOR int LANGUAGE SQL ( SET client_min_messages = 'ERROR'; CREATE PUBLICATION addr_pub FOR TABLE addr_nsp.gentable; CREATE PUBLICATION addr_pub_schema FOR ALL TABLES IN SCHEMA addr_nsp; +CREATE PUBLICATION addr_pub_schema2 FOR ALL SEQUENCES IN SCHEMA addr_nsp; RESET client_min_messages; CREATE SUBSCRIPTION regress_addr_sub CONNECTION '' PUBLICATION bar WITH (connect = false, slot_name = NONE); CREATE STATISTICS addr_nsp.gentable_stat ON a, b FROM addr_nsp.gentable; @@ -198,7 +199,8 @@ WITH objects (type, name, args) AS (VALUES ('transform', '{int}', '{sql}'), ('access method', '{btree}', '{}'), ('publication', '{addr_pub}', '{}'), - ('publication namespace', '{addr_nsp}', '{addr_pub_schema}'), + ('publication namespace', '{addr_nsp}', '{addr_pub_schema,t}'), + ('publication namespace', '{addr_nsp}', '{addr_pub_schema2,s}'), ('publication relation', '{addr_nsp, gentable}', '{addr_pub}'), ('subscription', '{regress_addr_sub}', '{}'), ('statistics object', '{addr_nsp, gentable_stat}', '{}') @@ -218,6 +220,7 @@ SELECT (pg_identify_object(addr1.classid, addr1.objid, addr1.objsubid)).*, DROP FOREIGN DATA WRAPPER addr_fdw CASCADE; DROP PUBLICATION addr_pub; DROP PUBLICATION addr_pub_schema; +DROP PUBLICATION addr_pub_schema2; DROP SUBSCRIPTION regress_addr_sub; DROP SCHEMA addr_nsp CASCADE; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 5457c56b33f..c195e75c6f0 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -27,7 +27,7 @@ CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publis \dRp -ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete'); +ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete, sequence'); \dRp @@ -46,6 +46,8 @@ ALTER PUBLICATION testpub_foralltables SET (publish = 'insert, update'); CREATE TABLE testpub_tbl2 (id serial primary key, data text); -- fail - can't add to for all tables publication ALTER PUBLICATION testpub_foralltables ADD TABLE testpub_tbl2; +-- fail - can't add a table using ADD SEQUENCE command +ALTER PUBLICATION testpub_foralltables ADD SEQUENCE testpub_tbl2; -- fail - can't drop from all tables publication ALTER PUBLICATION testpub_foralltables DROP TABLE testpub_tbl2; -- fail - can't add to for all tables publication @@ -104,6 +106,183 @@ RESET client_min_messages; DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +--- adding sequences +CREATE SEQUENCE testpub_seq0; +CREATE SEQUENCE pub_test.testpub_seq1; + +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forallsequences FOR ALL SEQUENCES WITH (publish = 'sequence'); +RESET client_min_messages; +ALTER PUBLICATION testpub_forallsequences SET (publish = 'insert, sequence'); + +CREATE SEQUENCE testpub_seq2; +-- fail - can't add to for all sequences publication +ALTER PUBLICATION testpub_forallsequences ADD SEQUENCE testpub_seq2; +-- fail - can't drop from all sequences publication +ALTER PUBLICATION testpub_forallsequences DROP SEQUENCE testpub_seq2; +-- fail - can't add to for all sequences publication +ALTER PUBLICATION testpub_forallsequences SET SEQUENCE pub_test.testpub_seq1; + +-- fail - can't add schema to 'FOR ALL SEQUENCES' publication +ALTER PUBLICATION testpub_forallsequences ADD ALL SEQUENCES IN SCHEMA pub_test; +-- fail - can't drop schema from 'FOR ALL SEQUENCES' publication +ALTER PUBLICATION testpub_forallsequences DROP ALL SEQUENCES IN SCHEMA pub_test; +-- fail - can't set schema to 'FOR ALL SEQUENCES' publication +ALTER PUBLICATION testpub_forallsequences SET ALL SEQUENCES IN SCHEMA pub_test; + +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forsequence FOR SEQUENCE testpub_seq0; +RESET client_min_messages; +-- should be able to add schema to 'FOR SEQUENCE' publication +ALTER PUBLICATION testpub_forsequence ADD ALL SEQUENCES IN SCHEMA pub_test; +\dRp+ testpub_forsequence +-- fail - can't add sequence from the schema we already added +ALTER PUBLICATION testpub_forsequence ADD SEQUENCE pub_test.testpub_seq1; +-- fail - can't add sequence using ADD TABLE command +ALTER PUBLICATION testpub_forsequence ADD TABLE pub_test.testpub_seq1; +-- should be able to drop schema from 'FOR SEQUENCE' publication +ALTER PUBLICATION testpub_forsequence DROP ALL SEQUENCES IN SCHEMA pub_test; +\dRp+ testpub_forsequence +-- should be able to set schema to 'FOR SEQUENCE' publication +ALTER PUBLICATION testpub_forsequence SET ALL SEQUENCES IN SCHEMA pub_test; +\dRp+ testpub_forsequence + +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forschema FOR ALL SEQUENCES IN SCHEMA pub_test; +RESET client_min_messages; +-- fail - can't create publication with schema and sequence of the same schema +CREATE PUBLICATION testpub_for_seq_schema FOR ALL SEQUENCES IN SCHEMA pub_test, SEQUENCE pub_test.testpub_seq1; +-- fail - can't add a sequence of the same schema to the schema publication +ALTER PUBLICATION testpub_forschema ADD SEQUENCE pub_test.testpub_seq1; +-- fail - can't drop a sequence from the schema publication which isn't in the +-- publication +ALTER PUBLICATION testpub_forschema DROP SEQUENCE pub_test.testpub_seq1; +-- should be able to set sequence to schema publication +ALTER PUBLICATION testpub_forschema SET SEQUENCE pub_test.testpub_seq1; +\dRp+ testpub_forschema + +SELECT pubname, puballtables, puballsequences FROM pg_publication WHERE pubname = 'testpub_forallsequences'; +\d+ pub_test.testpub_seq1 +\dRp+ testpub_forallsequences +DROP SEQUENCE testpub_seq0, pub_test.testpub_seq1, testpub_seq2; +DROP PUBLICATION testpub_forallsequences, testpub_forsequence, testpub_forschema; + +-- Publication mixing tables and sequences +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_mix; +RESET client_min_messages; + +CREATE SEQUENCE testpub_seq1; +CREATE SEQUENCE pub_test.testpub_seq2; + +ALTER PUBLICATION testpub_mix ADD SEQUENCE testpub_seq1, TABLE testpub_tbl1; +\dRp+ testpub_mix + +ALTER PUBLICATION testpub_mix ADD ALL SEQUENCES IN SCHEMA pub_test, ALL TABLES IN SCHEMA pub_test; +\dRp+ testpub_mix + +ALTER PUBLICATION testpub_mix DROP ALL SEQUENCES IN SCHEMA pub_test; +\dRp+ testpub_mix + +ALTER PUBLICATION testpub_mix DROP ALL TABLES IN SCHEMA pub_test; +\dRp+ testpub_mix + +DROP PUBLICATION testpub_mix; +DROP SEQUENCE testpub_seq1; +DROP SEQUENCE pub_test.testpub_seq2; + + +-- make sure we replicate only the correct relation type +CREATE SCHEMA pub_test1; +CREATE SEQUENCE pub_test1.test_seq1; +CREATE TABLE pub_test1.test_tbl1 (a int primary key, b int); + +CREATE SCHEMA pub_test2; +CREATE SEQUENCE pub_test2.test_seq2; +CREATE TABLE pub_test2.test_tbl2 (a int primary key, b int); + +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_schemas; +RESET client_min_messages; + +-- add tables from one schema, sequences from the other +ALTER PUBLICATION testpub_schemas ADD ALL TABLES IN SCHEMA pub_test2; +ALTER PUBLICATION testpub_schemas ADD ALL SEQUENCES IN SCHEMA pub_test1; + +\dRp+ testpub_schemas + +\dn+ pub_test1 +\dn+ pub_test2 + +\d+ pub_test1.test_seq1; +\d+ pub_test1.test_tbl1; + +\d+ pub_test2.test_seq2; +\d+ pub_test2.test_tbl2; + +-- add the other object type from each schema +ALTER PUBLICATION testpub_schemas ADD ALL TABLES IN SCHEMA pub_test1; +ALTER PUBLICATION testpub_schemas ADD ALL SEQUENCES IN SCHEMA pub_test2; + +\dRp+ testpub_schemas + +\dn+ pub_test1 +\dn+ pub_test2 + +\d+ pub_test1.test_seq1; +\d+ pub_test1.test_tbl1; + +\d+ pub_test2.test_seq2; +\d+ pub_test2.test_tbl2; + +-- now drop the object type added first +ALTER PUBLICATION testpub_schemas DROP ALL TABLES IN SCHEMA pub_test2; +ALTER PUBLICATION testpub_schemas DROP ALL SEQUENCES IN SCHEMA pub_test1; + +\dRp+ testpub_schemas + +\dn+ pub_test1 +\dn+ pub_test2 + +\d+ pub_test1.test_seq1; +\d+ pub_test1.test_tbl1; + +\d+ pub_test2.test_seq2; +\d+ pub_test2.test_tbl2; + +-- should fail (publication contains the whole schema) +ALTER PUBLICATION testpub_schemas ADD TABLE pub_test1.test_tbl1; +ALTER PUBLICATION testpub_schemas ADD SEQUENCE pub_test2.test_seq2; + +-- should work (different schema) +ALTER PUBLICATION testpub_schemas ADD TABLE pub_test2.test_tbl2; +ALTER PUBLICATION testpub_schemas ADD SEQUENCE pub_test1.test_seq1; + +\dRp+ testpub_schemas + +\d+ pub_test1.test_seq1; +\d+ pub_test1.test_tbl1; + +\d+ pub_test2.test_seq2; +\d+ pub_test2.test_tbl2; + +-- now drop the explicitly added objects again +ALTER PUBLICATION testpub_schemas DROP TABLE pub_test2.test_tbl2; +ALTER PUBLICATION testpub_schemas DROP SEQUENCE pub_test1.test_seq1; + +\dRp+ testpub_schemas + +\d+ pub_test1.test_seq1; +\d+ pub_test1.test_tbl1; + +\d+ pub_test2.test_seq2; +\d+ pub_test2.test_tbl2; + +DROP PUBLICATION testpub_schemas; +DROP TABLE pub_test1.test_tbl1, pub_test2.test_tbl2; +DROP SEQUENCE pub_test1.test_seq1, pub_test2.test_seq2; +DROP SCHEMA pub_test1, pub_test2; + -- Tests for partitioned tables SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub_forparted; @@ -717,32 +896,51 @@ CREATE SCHEMA sch1; CREATE SCHEMA sch2; CREATE TABLE sch1.tbl1 (a int) PARTITION BY RANGE(a); CREATE TABLE sch2.tbl1_part1 PARTITION OF sch1.tbl1 FOR VALUES FROM (1) to (10); +CREATE SEQUENCE sch1.seq1; +CREATE SEQUENCE sch2.seq2; -- Schema publication that does not include the schema that has the parent table CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=1); +ALTER PUBLICATION pub ADD ALL SEQUENCES IN SCHEMA sch2; SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; DROP PUBLICATION pub; -- Table publication that does not include the parent table CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=1); +ALTER PUBLICATION pub ADD SEQUENCE sch2.seq2; SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; -- Table publication that includes both the parent table and the child table ALTER PUBLICATION pub ADD TABLE sch1.tbl1; +ALTER PUBLICATION pub ADD SEQUENCE sch1.seq1; SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; DROP PUBLICATION pub; -- Schema publication that does not include the schema that has the parent table CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=0); +ALTER PUBLICATION pub ADD SEQUENCE sch1.seq1; +SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; + +DROP PUBLICATION pub; +-- Sequence publication +CREATE PUBLICATION pub FOR SEQUENCE sch2.seq2; SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; DROP PUBLICATION pub; -- Table publication that does not include the parent table CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=0); SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; -- Table publication that includes both the parent table and the child table ALTER PUBLICATION pub ADD TABLE sch1.tbl1; +ALTER PUBLICATION pub ADD ALL SEQUENCES IN SCHEMA sch2; SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; DROP PUBLICATION pub; DROP TABLE sch2.tbl1_part1; @@ -755,10 +953,36 @@ CREATE TABLE sch1.tbl1_part3 (a int) PARTITION BY RANGE(a); ALTER TABLE sch1.tbl1 ATTACH PARTITION sch1.tbl1_part3 FOR VALUES FROM (20) to (30); CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch1 WITH (PUBLISH_VIA_PARTITION_ROOT=1); SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; + +DROP PUBLICATION pub; +-- Schema publication +CREATE PUBLICATION pub FOR SEQUENCE sch2.seq2; +SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; + +DROP PUBLICATION pub; +-- Sequence publication +CREATE PUBLICATION pub FOR ALL SEQUENCES IN SCHEMA sch2; +SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; + +ALTER PUBLICATION pub ADD SEQUENCE sch1.seq1; +SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; + +ALTER PUBLICATION pub DROP SEQUENCE sch1.seq1; +SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; + +ALTER PUBLICATION pub ADD ALL SEQUENCES IN SCHEMA sch1; +SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication_sequences; RESET client_min_messages; DROP PUBLICATION pub; DROP TABLE sch1.tbl1; +DROP SEQUENCE sch1.seq1, sch2.seq2; DROP SCHEMA sch1 cascade; DROP SCHEMA sch2 cascade; diff --git a/src/test/subscription/t/030_sequences.pl b/src/test/subscription/t/030_sequences.pl new file mode 100644 index 00000000000..9ae3c03d7d1 --- /dev/null +++ b/src/test/subscription/t/030_sequences.pl @@ -0,0 +1,202 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# This tests that sequences are replicated correctly by logical replication +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +my $ddl = qq( + CREATE TABLE seq_test (v BIGINT); + CREATE SEQUENCE s; +); + +# Setup structure on the publisher +$node_publisher->safe_psql('postgres', $ddl); + +# Create some the same structure on subscriber, and an extra sequence that +# we'll create on the publisher later +$ddl = qq( + CREATE TABLE seq_test (v BIGINT); + CREATE SEQUENCE s; + CREATE SEQUENCE s2; +); + +$node_subscriber->safe_psql('postgres', $ddl); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION seq_pub"); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION seq_pub ADD SEQUENCE s"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub" +); + +$node_publisher->wait_for_catchup('seq_sub'); + +# Wait for initial sync to finish as well +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Insert initial test data +$node_publisher->safe_psql( + 'postgres', qq( + -- generate a number of values using the sequence + INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100); +)); + +$node_publisher->wait_for_catchup('seq_sub'); + +# Check the data on subscriber +my $result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT * FROM s; +)); + +is( $result, '132|0|t', + 'initial test data replicated'); + + +# advance the sequence in a rolled-back transaction - the rollback +# does not wait for the replication, so we could see any intermediate state +# so do something else after the test, to ensure we wait for everything +$node_publisher->safe_psql( + 'postgres', qq( + BEGIN; + INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100); + ROLLBACK; + INSERT INTO seq_test VALUES (-1); +)); + +$node_publisher->wait_for_catchup('seq_sub'); + +# Check the data on subscriber +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT * FROM s; +)); + +is( $result, '231|0|t', + 'advance sequence in rolled-back transaction'); + + +# create a new sequence and roll it back - should not be replicated, due to +# the transactional behavior +$node_publisher->safe_psql( + 'postgres', qq( + BEGIN; + CREATE SEQUENCE s2; + ALTER PUBLICATION seq_pub ADD SEQUENCE s2; + INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100); + ROLLBACK; +)); + +$node_publisher->wait_for_catchup('seq_sub'); + +# Check the data on subscriber +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT * FROM s2; +)); + +is( $result, '1|0|f', + 'create new sequence and roll it back'); + + +# create a new sequence, advance it in a rolled-back transaction, but commit +# the create - the advance should be replicated nevertheless +$node_publisher->safe_psql( + 'postgres', qq( + BEGIN; + CREATE SEQUENCE s2; + ALTER PUBLICATION seq_pub ADD SEQUENCE s2; + SAVEPOINT sp1; + INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100); + ROLLBACK TO sp1; + COMMIT; +)); + +$node_publisher->wait_for_catchup('seq_sub'); + +# Wait for sync of the second sequence we just added to finish +$synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the data on subscriber +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT * FROM s2; +)); + +is( $result, '132|0|t', + 'create sequence, advance it in rolled-back transaction, but commit the create'); + + +# advance the new sequence in a transaction, and roll it back - the rollback +# does not wait for the replication, so we could see any intermediate state +# so do something else after the test, to ensure we wait for everything +$node_publisher->safe_psql( + 'postgres', qq( + BEGIN; + INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100); + ROLLBACK; + INSERT INTO seq_test VALUES (-1); +)); + +$node_publisher->wait_for_catchup('seq_sub'); + +# Check the data on subscriber +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT * FROM s2; +)); + +is( $result, '231|0|t', + 'advance the new sequence in a transaction and roll it back'); + + +# advance the sequence in a subtransaction - the subtransaction gets rolled +# back, but commit the main one - the changes should still be replicated +$node_publisher->safe_psql( + 'postgres', qq( + BEGIN; + SAVEPOINT s1; + INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100); + ROLLBACK TO s1; + COMMIT; +)); + +$node_publisher->wait_for_catchup('seq_sub'); + +# Check the data on subscriber +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT * FROM s2; +)); + +is( $result, '330|0|t', + 'advance sequence in a subtransaction'); + + +done_testing(); |