diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2006-05-04 16:07:29 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2006-05-04 16:07:29 +0000 |
commit | 52667d56a3b489e5645f069522631824b7ffc520 (patch) | |
tree | fc53caf8cff281d944b0bcb1dd2b10f9e9e5b0c4 /src/backend/commands/dbcommands.c | |
parent | cb98e6fb8fd4f1ca955a85d5c0088e42e77a04f0 (diff) | |
download | postgresql-52667d56a3b489e5645f069522631824b7ffc520.tar.gz postgresql-52667d56a3b489e5645f069522631824b7ffc520.zip |
Rethink the locking mechanisms used for CREATE/DROP/RENAME DATABASE.
The former approach used ExclusiveLock on pg_database, which being a
cluster-wide lock meant only one of these operations could proceed at
a time; worse, it also blocked all incoming connections in ReverifyMyDatabase.
Now that we have LockSharedObject(), we can use locks of different types
applied to databases considered as objects. This allows much more
flexible management of the interlocking: two CREATE DATABASEs need not
block each other, and need not block connections except to the template
database being used. Similarly DROP DATABASE doesn't block unrelated
operations. The locking used in flatfiles.c is also much narrower in
scope than before. Per recent proposal.
Diffstat (limited to 'src/backend/commands/dbcommands.c')
-rw-r--r-- | src/backend/commands/dbcommands.c | 451 |
1 files changed, 240 insertions, 211 deletions
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index b21d750394d..518fed81968 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -3,19 +3,17 @@ * dbcommands.c * Database management commands (create/drop database). * - * Note: database creation/destruction commands take ExclusiveLock on - * pg_database to ensure that no two proceed in parallel. We must use - * at least this level of locking to ensure that no two backends try to - * write the flat-file copy of pg_database at once. We avoid using - * AccessExclusiveLock since there's no need to lock out ordinary readers - * of pg_database. + * Note: database creation/destruction commands use exclusive locks on + * the database objects (as expressed by LockSharedObject()) to avoid + * stepping on each others' toes. Formerly we used table-level locks + * on pg_database, but that's too coarse-grained. * * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.180 2006/05/03 22:45:26 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.181 2006/05/04 16:07:29 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -53,7 +51,8 @@ /* non-export function prototypes */ -static bool get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP, +static bool get_db_info(const char *name, LOCKMODE lockmode, + Oid *dbIdP, Oid *ownerIdP, int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, Oid *dbLastSysOidP, TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP, @@ -80,13 +79,12 @@ createdb(const CreatedbStmt *stmt) TransactionId src_frozenxid; Oid src_deftablespace; volatile Oid dst_deftablespace; - volatile Relation pg_database_rel; + Relation pg_database_rel; HeapTuple tuple; - TupleDesc pg_database_dsc; Datum new_record[Natts_pg_database]; char new_record_nulls[Natts_pg_database]; Oid dboid; - volatile Oid datdba; + Oid datdba; ListCell *option; DefElem *dtablespacename = NULL; DefElem *downer = NULL; @@ -96,8 +94,8 @@ createdb(const CreatedbStmt *stmt) char *dbname = stmt->dbname; char *dbowner = NULL; const char *dbtemplate = NULL; - volatile int encoding = -1; - volatile int dbconnlimit = -1; + int encoding = -1; + int dbconnlimit = -1; /* don't call this in a transaction block */ PreventTransactionChain((void *) stmt, "CREATE DATABASE"); @@ -216,31 +214,25 @@ createdb(const CreatedbStmt *stmt) check_is_member_of_role(GetUserId(), datdba); /* - * Check for db name conflict. There is a race condition here, since - * another backend could create the same DB name before we commit. - * However, holding an exclusive lock on pg_database for the whole time we - * are copying the source database doesn't seem like a good idea, so - * accept possibility of race to create. We will check again after we - * grab the exclusive lock. - */ - if (get_db_info(dbname, NULL, NULL, NULL, - NULL, NULL, NULL, NULL, NULL, NULL)) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_DATABASE), - errmsg("database \"%s\" already exists", dbname))); - - /* - * Lookup database (template) to be cloned. + * Lookup database (template) to be cloned, and obtain share lock on it. + * ShareLock allows two CREATE DATABASEs to work from the same template + * concurrently, while ensuring no one is busy dropping it in parallel + * (which would be Very Bad since we'd likely get an incomplete copy + * without knowing it). This also prevents any new connections from being + * made to the source until we finish copying it, so we can be sure it + * won't change underneath us. */ if (!dbtemplate) dbtemplate = "template1"; /* Default template database name */ - if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding, + if (!get_db_info(dbtemplate, ShareLock, + &src_dboid, &src_owner, &src_encoding, &src_istemplate, &src_allowconn, &src_lastsysoid, &src_vacuumxid, &src_frozenxid, &src_deftablespace)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE), - errmsg("template database \"%s\" does not exist", dbtemplate))); + errmsg("template database \"%s\" does not exist", + dbtemplate))); /* * Permission check: to copy a DB that's not marked datistemplate, you @@ -258,8 +250,7 @@ createdb(const CreatedbStmt *stmt) /* * The source DB can't have any active backends, except this one * (exception is to allow CREATE DB while connected to template1). - * Otherwise we might copy inconsistent data. This check is not - * bulletproof, since someone might connect while we are copying... + * Otherwise we might copy inconsistent data. */ if (DatabaseHasActiveBackends(src_dboid, true)) ereport(ERROR, @@ -346,14 +337,65 @@ createdb(const CreatedbStmt *stmt) src_vacuumxid = src_frozenxid = GetCurrentTransactionId(); /* - * Preassign OID for pg_database tuple, so that we can compute db path. We - * have to open pg_database to do this, but we don't want to take - * ExclusiveLock yet, so just do it and close again. + * Check for db name conflict. This is just to give a more friendly + * error message than "unique index violation". There's a race condition + * but we're willing to accept the less friendly message in that case. */ - pg_database_rel = heap_open(DatabaseRelationId, AccessShareLock); - dboid = GetNewOid(pg_database_rel); - heap_close(pg_database_rel, AccessShareLock); - pg_database_rel = NULL; + if (OidIsValid(get_database_oid(dbname))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_DATABASE), + errmsg("database \"%s\" already exists", dbname))); + + /* + * Insert a new tuple into pg_database. This establishes our ownership + * of the new database name (anyone else trying to insert the same name + * will block on the unique index, and fail after we commit). It also + * assigns the OID that the new database will have. + */ + pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock); + + /* Form tuple */ + MemSet(new_record, 0, sizeof(new_record)); + MemSet(new_record_nulls, ' ', sizeof(new_record_nulls)); + + new_record[Anum_pg_database_datname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(dbname)); + new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba); + new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding); + new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false); + new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true); + new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit); + new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid); + new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid); + new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid); + new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace); + + /* + * We deliberately set datconfig and datacl to defaults (NULL), rather + * than copying them from the template database. Copying datacl would + * be a bad idea when the owner is not the same as the template's + * owner. It's more debatable whether datconfig should be copied. + */ + new_record_nulls[Anum_pg_database_datconfig - 1] = 'n'; + new_record_nulls[Anum_pg_database_datacl - 1] = 'n'; + + tuple = heap_formtuple(RelationGetDescr(pg_database_rel), + new_record, new_record_nulls); + + dboid = simple_heap_insert(pg_database_rel, tuple); + + /* Update indexes */ + CatalogUpdateIndexes(pg_database_rel, tuple); + + /* + * Now generate additional catalog entries associated with the new DB + */ + + /* Register owner dependency */ + recordDependencyOnOwner(DatabaseRelationId, dboid, datdba); + + /* Create pg_shdepend entries for objects within database */ + copyTemplateDependencies(src_dboid, dboid); /* * Force dirty buffers out to disk, to ensure source database is @@ -435,64 +477,6 @@ createdb(const CreatedbStmt *stmt) heap_close(rel, AccessShareLock); /* - * Now OK to grab exclusive lock on pg_database. - */ - pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock); - - /* Check to see if someone else created same DB name meanwhile. */ - if (get_db_info(dbname, NULL, NULL, NULL, - NULL, NULL, NULL, NULL, NULL, NULL)) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_DATABASE), - errmsg("database \"%s\" already exists", dbname))); - - /* - * Insert a new tuple into pg_database - */ - pg_database_dsc = RelationGetDescr(pg_database_rel); - - /* Form tuple */ - MemSet(new_record, 0, sizeof(new_record)); - MemSet(new_record_nulls, ' ', sizeof(new_record_nulls)); - - new_record[Anum_pg_database_datname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(dbname)); - new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba); - new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding); - new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false); - new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true); - new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit); - new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid); - new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid); - new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid); - new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace); - - /* - * We deliberately set datconfig and datacl to defaults (NULL), rather - * than copying them from the template database. Copying datacl would - * be a bad idea when the owner is not the same as the template's - * owner. It's more debatable whether datconfig should be copied. - */ - new_record_nulls[Anum_pg_database_datconfig - 1] = 'n'; - new_record_nulls[Anum_pg_database_datacl - 1] = 'n'; - - tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls); - - HeapTupleSetOid(tuple, dboid); /* override heap_insert's OID - * selection */ - - simple_heap_insert(pg_database_rel, tuple); - - /* Update indexes */ - CatalogUpdateIndexes(pg_database_rel, tuple); - - /* Register owner dependency */ - recordDependencyOnOwner(DatabaseRelationId, dboid, datdba); - - /* Create pg_shdepend entries for objects within database */ - copyTemplateDependencies(src_dboid, dboid); - - /* * We force a checkpoint before committing. This effectively means * that committed XLOG_DBASE_CREATE operations will never need to be * replayed (at least not in ordinary crash recovery; we still have to @@ -524,15 +508,21 @@ createdb(const CreatedbStmt *stmt) RequestCheckpoint(true, false); /* + * Close pg_database, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) + */ + heap_close(pg_database_rel, NoLock); + + /* * Set flag to update flat database file at commit. */ database_file_update_needed(); } PG_CATCH(); { - /* Don't hold pg_database lock while doing recursive remove */ - if (pg_database_rel != NULL) - heap_close(pg_database_rel, ExclusiveLock); + /* Release lock on source database before doing recursive remove */ + UnlockSharedObject(DatabaseRelationId, src_dboid, 0, + ShareLock); /* Throw away any successfully copied subdirectories */ remove_dbtablespaces(dboid); @@ -540,10 +530,6 @@ createdb(const CreatedbStmt *stmt) PG_RE_THROW(); } PG_END_TRY(); - - /* Close pg_database, but keep exclusive lock till commit */ - /* This has to be outside the PG_TRY */ - heap_close(pg_database_rel, NoLock); } @@ -568,20 +554,15 @@ dropdb(const char *dbname, bool missing_ok) errmsg("cannot drop the currently open database"))); /* - * Obtain exclusive lock on pg_database. We need this to ensure that no - * new backend starts up in the target database while we are deleting it. - * (Actually, a new backend might still manage to start up, because it - * isn't able to lock pg_database while starting. But it will detect its - * error in ReverifyMyDatabase and shut down before any serious damage is - * done. See postinit.c.) - * - * An ExclusiveLock, rather than AccessExclusiveLock, is sufficient since - * ReverifyMyDatabase takes RowShareLock. This allows ordinary readers of - * pg_database to proceed in parallel. + * Look up the target database's OID, and get exclusive lock on it. + * We need this to ensure that no new backend starts up in the target + * database while we are deleting it (see postinit.c), and that no one is + * using it as a CREATE DATABASE template or trying to delete it for + * themselves. */ - pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock); + pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock); - if (!get_db_info(dbname, &db_id, NULL, NULL, + if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, &db_istemplate, NULL, NULL, NULL, NULL, NULL)) { if (!missing_ok) @@ -592,17 +573,18 @@ dropdb(const char *dbname, bool missing_ok) } else { - /* Close pg_database, release the lock, since we changed nothing */ - heap_close(pgdbrel, ExclusiveLock); + heap_close(pgdbrel, RowExclusiveLock); ereport(NOTICE, (errmsg("database \"%s\" does not exist, skipping", dbname))); - return; } } + /* + * Permission checks + */ if (!pg_database_ownercheck(db_id, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, dbname); @@ -618,7 +600,8 @@ dropdb(const char *dbname, bool missing_ok) errmsg("cannot drop a template database"))); /* - * Check for active backends in the target database. + * Check for active backends in the target database. (Because we hold + * the database lock, no new ones can start after this.) */ if (DatabaseHasActiveBackends(db_id, false)) ereport(ERROR, @@ -640,8 +623,7 @@ dropdb(const char *dbname, bool missing_ok) ReleaseSysCache(tup); /* - * Delete any comments associated with the database - * + * Delete any comments associated with the database. */ DeleteSharedComments(db_id, DatabaseRelationId); @@ -675,7 +657,10 @@ dropdb(const char *dbname, bool missing_ok) */ remove_dbtablespaces(db_id); - /* Close pg_database, but keep exclusive lock till commit */ + /* + * Close pg_database, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) + */ heap_close(pgdbrel, NoLock); /* @@ -691,29 +676,18 @@ dropdb(const char *dbname, bool missing_ok) void RenameDatabase(const char *oldname, const char *newname) { - HeapTuple tup, - newtup; + Oid db_id; + HeapTuple newtup; Relation rel; - SysScanDesc scan, - scan2; - ScanKeyData key, - key2; /* - * Obtain ExclusiveLock so that no new session gets started while the - * rename is in progress. + * Look up the target database's OID, and get exclusive lock on it. + * We need this for the same reasons as DROP DATABASE. */ - rel = heap_open(DatabaseRelationId, ExclusiveLock); - - ScanKeyInit(&key, - Anum_pg_database_datname, - BTEqualStrategyNumber, F_NAMEEQ, - NameGetDatum(oldname)); - scan = systable_beginscan(rel, DatabaseNameIndexId, true, - SnapshotNow, 1, &key); + rel = heap_open(DatabaseRelationId, RowExclusiveLock); - tup = systable_getnext(scan); - if (!HeapTupleIsValid(tup)) + if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE), errmsg("database \"%s\" does not exist", oldname))); @@ -724,36 +698,29 @@ RenameDatabase(const char *oldname, const char *newname) * be an actual problem besides a little confusion, so think about this * and decide. */ - if (HeapTupleGetOid(tup) == MyDatabaseId) + if (db_id == MyDatabaseId) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("current database may not be renamed"))); /* - * Make sure the database does not have active sessions. Might not be - * necessary, but it's consistent with other database operations. + * Make sure the database does not have active sessions. This is the + * same concern as above, but applied to other sessions. */ - if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false)) + if (DatabaseHasActiveBackends(db_id, false)) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("database \"%s\" is being accessed by other users", oldname))); /* make sure the new name doesn't exist */ - ScanKeyInit(&key2, - Anum_pg_database_datname, - BTEqualStrategyNumber, F_NAMEEQ, - NameGetDatum(newname)); - scan2 = systable_beginscan(rel, DatabaseNameIndexId, true, - SnapshotNow, 1, &key2); - if (HeapTupleIsValid(systable_getnext(scan2))) + if (OidIsValid(get_database_oid(newname))) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_DATABASE), errmsg("database \"%s\" already exists", newname))); - systable_endscan(scan2); /* must be owner */ - if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId())) + if (!pg_database_ownercheck(db_id, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, oldname); @@ -764,14 +731,19 @@ RenameDatabase(const char *oldname, const char *newname) errmsg("permission denied to rename database"))); /* rename */ - newtup = heap_copytuple(tup); + newtup = SearchSysCacheCopy(DATABASEOID, + ObjectIdGetDatum(db_id), + 0, 0, 0); + if (!HeapTupleIsValid(newtup)) + elog(ERROR, "cache lookup failed for database %u", db_id); namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname); simple_heap_update(rel, &newtup->t_self, newtup); CatalogUpdateIndexes(rel, newtup); - systable_endscan(scan); - - /* Close pg_database, but keep exclusive lock till commit */ + /* + * Close pg_database, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) + */ heap_close(rel, NoLock); /* @@ -821,7 +793,9 @@ AlterDatabase(AlterDatabaseStmt *stmt) connlimit = intVal(dconnlimit->arg); /* - * We don't need ExclusiveLock since we aren't updating the flat file. + * Get the old tuple. We don't need a lock on the database per se, + * because we're not going to do anything that would mess up incoming + * connections. */ rel = heap_open(DatabaseRelationId, RowExclusiveLock); ScanKeyInit(&scankey, @@ -891,7 +865,9 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt) valuestr = flatten_set_variable_args(stmt->variable, stmt->value); /* - * We don't need ExclusiveLock since we aren't updating the flat file. + * Get the old tuple. We don't need a lock on the database per se, + * because we're not going to do anything that would mess up incoming + * connections. */ rel = heap_open(DatabaseRelationId, RowExclusiveLock); ScanKeyInit(&scankey, @@ -974,7 +950,9 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId) Form_pg_database datForm; /* - * We don't need ExclusiveLock since we aren't updating the flat file. + * Get the old tuple. We don't need a lock on the database per se, + * because we're not going to do anything that would mess up incoming + * connections. */ rel = heap_open(DatabaseRelationId, RowExclusiveLock); ScanKeyInit(&scankey, @@ -1077,72 +1055,127 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId) * Helper functions */ +/* + * Look up info about the database named "name". If the database exists, + * obtain the specified lock type on it, fill in any of the remaining + * parameters that aren't NULL, and return TRUE. If no such database, + * return FALSE. + */ static bool -get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP, +get_db_info(const char *name, LOCKMODE lockmode, + Oid *dbIdP, Oid *ownerIdP, int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, Oid *dbLastSysOidP, TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP, Oid *dbTablespace) { + bool result = false; Relation relation; - ScanKeyData scanKey; - SysScanDesc scan; - HeapTuple tuple; - bool gottuple; AssertArg(name); /* Caller may wish to grab a better lock on pg_database beforehand... */ relation = heap_open(DatabaseRelationId, AccessShareLock); - ScanKeyInit(&scanKey, - Anum_pg_database_datname, - BTEqualStrategyNumber, F_NAMEEQ, - NameGetDatum(name)); + /* + * Loop covers the rare case where the database is renamed before we + * can lock it. We try again just in case we can find a new one of + * the same name. + */ + for (;;) + { + ScanKeyData scanKey; + SysScanDesc scan; + HeapTuple tuple; + Oid dbOid; - scan = systable_beginscan(relation, DatabaseNameIndexId, true, - SnapshotNow, 1, &scanKey); + /* + * there's no syscache for database-indexed-by-name, + * so must do it the hard way + */ + ScanKeyInit(&scanKey, + Anum_pg_database_datname, + BTEqualStrategyNumber, F_NAMEEQ, + NameGetDatum(name)); - tuple = systable_getnext(scan); + scan = systable_beginscan(relation, DatabaseNameIndexId, true, + SnapshotNow, 1, &scanKey); - gottuple = HeapTupleIsValid(tuple); - if (gottuple) - { - Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple); - - /* oid of the database */ - if (dbIdP) - *dbIdP = HeapTupleGetOid(tuple); - /* oid of the owner */ - if (ownerIdP) - *ownerIdP = dbform->datdba; - /* character encoding */ - if (encodingP) - *encodingP = dbform->encoding; - /* allowed as template? */ - if (dbIsTemplateP) - *dbIsTemplateP = dbform->datistemplate; - /* allowing connections? */ - if (dbAllowConnP) - *dbAllowConnP = dbform->datallowconn; - /* last system OID used in database */ - if (dbLastSysOidP) - *dbLastSysOidP = dbform->datlastsysoid; - /* limit of vacuumed XIDs */ - if (dbVacuumXidP) - *dbVacuumXidP = dbform->datvacuumxid; - /* limit of frozen XIDs */ - if (dbFrozenXidP) - *dbFrozenXidP = dbform->datfrozenxid; - /* default tablespace for this database */ - if (dbTablespace) - *dbTablespace = dbform->dattablespace; + tuple = systable_getnext(scan); + + if (!HeapTupleIsValid(tuple)) + { + /* definitely no database of that name */ + systable_endscan(scan); + break; + } + + dbOid = HeapTupleGetOid(tuple); + + systable_endscan(scan); + + /* + * Now that we have a database OID, we can try to lock the DB. + */ + if (lockmode != NoLock) + LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode); + + /* + * And now, re-fetch the tuple by OID. If it's still there and + * still the same name, we win; else, drop the lock and loop + * back to try again. + */ + tuple = SearchSysCache(DATABASEOID, + ObjectIdGetDatum(dbOid), + 0, 0, 0); + if (HeapTupleIsValid(tuple)) + { + Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple); + + if (strcmp(name, NameStr(dbform->datname)) == 0) + { + /* oid of the database */ + if (dbIdP) + *dbIdP = dbOid; + /* oid of the owner */ + if (ownerIdP) + *ownerIdP = dbform->datdba; + /* character encoding */ + if (encodingP) + *encodingP = dbform->encoding; + /* allowed as template? */ + if (dbIsTemplateP) + *dbIsTemplateP = dbform->datistemplate; + /* allowing connections? */ + if (dbAllowConnP) + *dbAllowConnP = dbform->datallowconn; + /* last system OID used in database */ + if (dbLastSysOidP) + *dbLastSysOidP = dbform->datlastsysoid; + /* limit of vacuumed XIDs */ + if (dbVacuumXidP) + *dbVacuumXidP = dbform->datvacuumxid; + /* limit of frozen XIDs */ + if (dbFrozenXidP) + *dbFrozenXidP = dbform->datfrozenxid; + /* default tablespace for this database */ + if (dbTablespace) + *dbTablespace = dbform->dattablespace; + ReleaseSysCache(tuple); + result = true; + break; + } + /* can only get here if it was just renamed */ + ReleaseSysCache(tuple); + } + + if (lockmode != NoLock) + UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode); } - systable_endscan(scan); heap_close(relation, AccessShareLock); - return gottuple; + return result; } /* Check if current user has createdb privileges */ @@ -1234,8 +1267,6 @@ remove_dbtablespaces(Oid db_id) * get_database_oid - given a database name, look up the OID * * Returns InvalidOid if database name not found. - * - * This is not actually used in this file, but is exported for use elsewhere. */ Oid get_database_oid(const char *dbname) @@ -1277,8 +1308,6 @@ get_database_oid(const char *dbname) * get_database_name - given a database OID, look up the name * * Returns a palloc'd string, or NULL if no such database. - * - * This is not actually used in this file, but is exported for use elsewhere. */ char * get_database_name(Oid dbid) |