diff options
author | Peter Eisentraut <peter@eisentraut.org> | 2022-02-14 08:09:04 +0100 |
---|---|---|
committer | Peter Eisentraut <peter@eisentraut.org> | 2022-02-14 08:27:26 +0100 |
commit | 37851a8b83d3d57ca48736093b10aa5f3bc0c177 (patch) | |
tree | 5da6f597063d9579937e80aaf66d92c50ef73029 /src/backend/commands/dbcommands.c | |
parent | 9898c5e03c40c133a9a01d8b2b36cb7c990b30d5 (diff) | |
download | postgresql-37851a8b83d3d57ca48736093b10aa5f3bc0c177.tar.gz postgresql-37851a8b83d3d57ca48736093b10aa5f3bc0c177.zip |
Database-level collation version tracking
This adds to database objects the same version tracking that collation
objects have. There is a new pg_database column datcollversion that
stores the version, a new function
pg_database_collation_actual_version() to get the version from the
operating system, and a new subcommand ALTER DATABASE ... REFRESH
COLLATION VERSION.
This was not originally added together with pg_collation.collversion,
since originally version tracking was only supported for ICU, and ICU
on a database-level is not currently supported. But we now have
version tracking for glibc (since PG13), FreeBSD (since PG14), and
Windows (since PG13), so this is useful to have now.
Reviewed-by: Julien Rouhaud <rjuju123@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/f0ff3190-29a3-5b39-a179-fa32eee57db6%40enterprisedb.com
Diffstat (limited to 'src/backend/commands/dbcommands.c')
-rw-r--r-- | src/backend/commands/dbcommands.c | 194 |
1 files changed, 188 insertions, 6 deletions
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index 700b1209652..c37e3c9a9a4 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -36,6 +36,7 @@ #include "catalog/indexing.h" #include "catalog/objectaccess.h" #include "catalog/pg_authid.h" +#include "catalog/pg_collation.h" #include "catalog/pg_database.h" #include "catalog/pg_db_role_setting.h" #include "catalog/pg_subscription.h" @@ -85,7 +86,8 @@ static bool get_db_info(const char *name, LOCKMODE lockmode, Oid *dbIdP, Oid *ownerIdP, int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP, - Oid *dbTablespace, char **dbCollate, char **dbCtype); + Oid *dbTablespace, char **dbCollate, char **dbCtype, + char **dbCollversion); static bool have_createdb_privilege(void); static void remove_dbtablespaces(Oid db_id); static bool check_db_file_conflict(Oid db_id); @@ -105,6 +107,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) int src_encoding = -1; char *src_collate = NULL; char *src_ctype = NULL; + char *src_collversion = NULL; bool src_istemplate; bool src_allowconn; TransactionId src_frozenxid = InvalidTransactionId; @@ -128,6 +131,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) DefElem *distemplate = NULL; DefElem *dallowconnections = NULL; DefElem *dconnlimit = NULL; + DefElem *dcollversion = NULL; char *dbname = stmt->dbname; char *dbowner = NULL; const char *dbtemplate = NULL; @@ -138,6 +142,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) bool dbistemplate = false; bool dballowconnections = true; int dbconnlimit = -1; + char *dbcollversion = NULL; int notherbackends; int npreparedxacts; createdb_failure_params fparms; @@ -207,6 +212,12 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) errorConflictingDefElem(defel, pstate); dconnlimit = defel; } + else if (strcmp(defel->defname, "collation_version") == 0) + { + if (dcollversion) + errorConflictingDefElem(defel, pstate); + dcollversion = defel; + } else if (strcmp(defel->defname, "location") == 0) { ereport(WARNING, @@ -305,6 +316,8 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid connection limit: %d", dbconnlimit))); } + if (dcollversion) + dbcollversion = defGetString(dcollversion); /* obtain OID of proposed owner */ if (dbowner) @@ -342,7 +355,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) &src_dboid, &src_owner, &src_encoding, &src_istemplate, &src_allowconn, &src_frozenxid, &src_minmxid, &src_deftablespace, - &src_collate, &src_ctype)) + &src_collate, &src_ctype, &src_collversion)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE), errmsg("template database \"%s\" does not exist", @@ -424,6 +437,52 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) errhint("Use the same LC_CTYPE as in the template database, or use template0 as template."))); } + /* + * If we got a collation version for the template database, check that it + * matches the actual OS collation version. Otherwise error; the user + * needs to fix the template database first. Don't complain if a + * collation version was specified explicitly as a statement option; that + * is used by pg_upgrade to reproduce the old state exactly. + * + * (If the template database has no collation version, then either the + * platform/provider does not support collation versioning, or it's + * template0, for which we stipulate that it does not contain + * collation-using objects.) + */ + if (src_collversion && !dcollversion) + { + char *actual_versionstr; + + actual_versionstr = get_collation_actual_version(COLLPROVIDER_LIBC, dbcollate); + if (!actual_versionstr) + ereport(ERROR, + (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined", + dbtemplate))); + + if (strcmp(actual_versionstr, src_collversion) != 0) + ereport(ERROR, + (errmsg("template database \"%s\" has a collation version mismatch", + dbtemplate), + errdetail("The template database was created using collation version %s, " + "but the operating system provides version %s.", + src_collversion, actual_versionstr), + errhint("Rebuild all objects in the template database that use the default collation and run " + "ALTER DATABASE %s REFRESH COLLATION VERSION, " + "or build PostgreSQL with the right library version.", + quote_identifier(dbtemplate)))); + } + + if (dbcollversion == NULL) + dbcollversion = src_collversion; + + /* + * Normally, we copy the collation version from the template database. + * This last resort only applies if the template database does not have a + * collation version, which is normally only the case for template0. + */ + if (dbcollversion == NULL) + dbcollversion = get_collation_actual_version(COLLPROVIDER_LIBC, dbcollate); + /* Resolve default tablespace for new database */ if (dtablespacename && dtablespacename->arg) { @@ -578,6 +637,10 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace); new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate); new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype); + if (dbcollversion) + new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion); + else + new_record_nulls[Anum_pg_database_datcollversion - 1] = true; /* * We deliberately set datacl to default (NULL), rather than copying it @@ -844,7 +907,7 @@ dropdb(const char *dbname, bool missing_ok, bool force) pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock); if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, - &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL)) + &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) { if (!missing_ok) { @@ -1046,7 +1109,7 @@ RenameDatabase(const char *oldname, const char *newname) rel = table_open(DatabaseRelationId, RowExclusiveLock); if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, - NULL, NULL, NULL, NULL, NULL, NULL, NULL)) + NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE), errmsg("database \"%s\" does not exist", oldname))); @@ -1159,7 +1222,7 @@ movedb(const char *dbname, const char *tblspcname) pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock); if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, - NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL)) + NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE), errmsg("database \"%s\" does not exist", dbname))); @@ -1652,6 +1715,88 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel) /* + * ALTER DATABASE name REFRESH COLLATION VERSION + */ +ObjectAddress +AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt) +{ + Relation rel; + ScanKeyData scankey; + SysScanDesc scan; + Oid db_id; + HeapTuple tuple; + Form_pg_database datForm; + ObjectAddress address; + Datum datum; + bool isnull; + char *oldversion; + char *newversion; + + rel = table_open(DatabaseRelationId, RowExclusiveLock); + ScanKeyInit(&scankey, + Anum_pg_database_datname, + BTEqualStrategyNumber, F_NAMEEQ, + CStringGetDatum(stmt->dbname)); + scan = systable_beginscan(rel, DatabaseNameIndexId, true, + NULL, 1, &scankey); + tuple = systable_getnext(scan); + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_DATABASE), + errmsg("database \"%s\" does not exist", stmt->dbname))); + + datForm = (Form_pg_database) GETSTRUCT(tuple); + db_id = datForm->oid; + + if (!pg_database_ownercheck(db_id, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE, + stmt->dbname); + + datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull); + oldversion = isnull ? NULL : TextDatumGetCString(datum); + + datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull); + Assert(!isnull); + newversion = get_collation_actual_version(COLLPROVIDER_LIBC, TextDatumGetCString(datum)); + + /* cannot change from NULL to non-NULL or vice versa */ + if ((!oldversion && newversion) || (oldversion && !newversion)) + elog(ERROR, "invalid collation version change"); + else if (oldversion && newversion && strcmp(newversion, oldversion) != 0) + { + bool nulls[Natts_pg_database] = {0}; + bool replaces[Natts_pg_database] = {0}; + Datum values[Natts_pg_database] = {0}; + + ereport(NOTICE, + (errmsg("changing version from %s to %s", + oldversion, newversion))); + + values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion); + replaces[Anum_pg_database_datcollversion - 1] = true; + + tuple = heap_modify_tuple(tuple, RelationGetDescr(rel), + values, nulls, replaces); + CatalogTupleUpdate(rel, &tuple->t_self, tuple); + heap_freetuple(tuple); + } + else + ereport(NOTICE, + (errmsg("version has not changed"))); + + InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0); + + ObjectAddressSet(address, DatabaseRelationId, db_id); + + systable_endscan(scan); + + table_close(rel, NoLock); + + return address; +} + + +/* * ALTER DATABASE name SET ... */ Oid @@ -1793,6 +1938,34 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId) } +Datum +pg_database_collation_actual_version(PG_FUNCTION_ARGS) +{ + Oid dbid = PG_GETARG_OID(0); + HeapTuple tp; + Datum datum; + bool isnull; + char *version; + + tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid)); + if (!HeapTupleIsValid(tp)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("database with OID %u does not exist", dbid))); + + datum = SysCacheGetAttr(DATABASEOID, tp, Anum_pg_database_datcollate, &isnull); + Assert(!isnull); + version = get_collation_actual_version(COLLPROVIDER_LIBC, TextDatumGetCString(datum)); + + ReleaseSysCache(tp); + + if (version) + PG_RETURN_TEXT_P(cstring_to_text(version)); + else + PG_RETURN_NULL(); +} + + /* * Helper functions */ @@ -1808,7 +1981,8 @@ get_db_info(const char *name, LOCKMODE lockmode, Oid *dbIdP, Oid *ownerIdP, int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP, - Oid *dbTablespace, char **dbCollate, char **dbCtype) + Oid *dbTablespace, char **dbCollate, char **dbCtype, + char **dbCollversion) { bool result = false; Relation relation; @@ -1913,6 +2087,14 @@ get_db_info(const char *name, LOCKMODE lockmode, Assert(!isnull); *dbCtype = TextDatumGetCString(datum); } + if (dbCollversion) + { + datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull); + if (isnull) + *dbCollversion = NULL; + else + *dbCollversion = TextDatumGetCString(datum); + } ReleaseSysCache(tuple); result = true; break; |