diff options
Diffstat (limited to 'src/backend/commands/dbcommands.c')
-rw-r--r-- | src/backend/commands/dbcommands.c | 379 |
1 files changed, 165 insertions, 214 deletions
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index 295ae955d1f..01b53e06933 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -15,7 +15,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.168 2005/07/31 17:19:17 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.169 2005/08/02 19:02:31 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -79,14 +79,14 @@ createdb(const CreatedbStmt *stmt) TransactionId src_vacuumxid; TransactionId src_frozenxid; Oid src_deftablespace; - Oid dst_deftablespace; - Relation pg_database_rel; + volatile Oid dst_deftablespace; + volatile Relation pg_database_rel = NULL; HeapTuple tuple; TupleDesc pg_database_dsc; Datum new_record[Natts_pg_database]; char new_record_nulls[Natts_pg_database]; Oid dboid; - Oid datdba; + volatile Oid datdba; ListCell *option; DefElem *dtablespacename = NULL; DefElem *downer = NULL; @@ -96,12 +96,8 @@ createdb(const CreatedbStmt *stmt) char *dbname = stmt->dbname; char *dbowner = NULL; const char *dbtemplate = NULL; - int encoding = -1; - int dbconnlimit = -1; - -#ifndef WIN32 - char buf[2 * MAXPGPATH + 100]; -#endif + volatile int encoding = -1; + volatile int dbconnlimit = -1; /* don't call this in a transaction block */ PreventTransactionChain((void *) stmt, "CREATE DATABASE"); @@ -363,207 +359,186 @@ createdb(const CreatedbStmt *stmt) BufferSync(); /* - * Close virtual file descriptors so the kernel has more available for - * the system() calls below. + * Once we start copying subdirectories, we need to be able to clean + * 'em up if we fail. Establish a TRY block to make sure this happens. + * (This is not a 100% solution, because of the possibility of failure + * during transaction commit after we leave this routine, but it should + * handle most scenarios.) */ - closeAllVfds(); - - /* - * Iterate through all tablespaces of the template database, and copy - * each one to the new database. - */ - rel = heap_open(TableSpaceRelationId, AccessShareLock); - scan = heap_beginscan(rel, SnapshotNow, 0, NULL); - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + PG_TRY(); { - Oid srctablespace = HeapTupleGetOid(tuple); - Oid dsttablespace; - char *srcpath; - char *dstpath; - struct stat st; - - /* No need to copy global tablespace */ - if (srctablespace == GLOBALTABLESPACE_OID) - continue; - - srcpath = GetDatabasePath(src_dboid, srctablespace); - - if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) || - directory_is_empty(srcpath)) + /* + * Iterate through all tablespaces of the template database, + * and copy each one to the new database. + */ + rel = heap_open(TableSpaceRelationId, AccessShareLock); + scan = heap_beginscan(rel, SnapshotNow, 0, NULL); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { - /* Assume we can ignore it */ - pfree(srcpath); - continue; - } - - if (srctablespace == src_deftablespace) - dsttablespace = dst_deftablespace; - else - dsttablespace = srctablespace; - - dstpath = GetDatabasePath(dboid, dsttablespace); + Oid srctablespace = HeapTupleGetOid(tuple); + Oid dsttablespace; + char *srcpath; + char *dstpath; + struct stat st; - if (stat(dstpath, &st) == 0 || errno != ENOENT) - { - remove_dbtablespaces(dboid); - ereport(ERROR, - (errmsg("could not initialize database directory"), - errdetail("Directory \"%s\" already exists.", - dstpath))); + /* No need to copy global tablespace */ + if (srctablespace == GLOBALTABLESPACE_OID) + continue; + + srcpath = GetDatabasePath(src_dboid, srctablespace); + + if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) || + directory_is_empty(srcpath)) + { + /* Assume we can ignore it */ + pfree(srcpath); + continue; + } + + if (srctablespace == src_deftablespace) + dsttablespace = dst_deftablespace; + else + dsttablespace = srctablespace; + + dstpath = GetDatabasePath(dboid, dsttablespace); + + /* + * Copy this subdirectory to the new location + * + * We don't need to copy subdirectories + */ + copydir(srcpath, dstpath, false); + + /* Record the filesystem change in XLOG */ + { + xl_dbase_create_rec xlrec; + XLogRecData rdata[1]; + + xlrec.db_id = dboid; + xlrec.tablespace_id = dsttablespace; + xlrec.src_db_id = src_dboid; + xlrec.src_tablespace_id = srctablespace; + + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof(xl_dbase_create_rec); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = NULL; + + (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata); + } } - -#ifndef WIN32 + heap_endscan(scan); + heap_close(rel, AccessShareLock); /* - * Copy this subdirectory to the new location - * - * XXX use of cp really makes this code pretty grotty, particularly - * with respect to lack of ability to report errors well. Someday - * rewrite to do it for ourselves. + * Now OK to grab exclusive lock on pg_database. */ + pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock); - /* We might need to use cp -R one day for portability */ - snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", - srcpath, dstpath); - if (system(buf) != 0) - { - remove_dbtablespaces(dboid); - ereport(ERROR, - (errmsg("could not initialize database directory"), - errdetail("Failing system command was: %s", buf), - errhint("Look in the postmaster's stderr log for more information."))); - } -#else /* WIN32 */ - if (copydir(srcpath, dstpath) != 0) - { - /* copydir should already have given details of its troubles */ - remove_dbtablespaces(dboid); + /* Check to see if someone else created same DB name meanwhile. */ + if (get_db_info(dbname, NULL, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL)) ereport(ERROR, - (errmsg("could not initialize database directory"))); - } -#endif /* WIN32 */ - - /* Record the filesystem change in XLOG */ - { - xl_dbase_create_rec xlrec; - XLogRecData rdata[1]; - - xlrec.db_id = dboid; - xlrec.tablespace_id = dsttablespace; - xlrec.src_db_id = src_dboid; - xlrec.src_tablespace_id = srctablespace; + (errcode(ERRCODE_DUPLICATE_DATABASE), + errmsg("database \"%s\" already exists", dbname))); - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof(xl_dbase_create_rec); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = NULL; - - (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata); - } - } - heap_endscan(scan); - heap_close(rel, AccessShareLock); - - /* - * Now OK to grab exclusive lock on pg_database. - */ - pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock); + /* + * Insert a new tuple into pg_database + */ + pg_database_dsc = RelationGetDescr(pg_database_rel); + + /* Form tuple */ + MemSet(new_record, 0, sizeof(new_record)); + MemSet(new_record_nulls, ' ', sizeof(new_record_nulls)); + + new_record[Anum_pg_database_datname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(dbname)); + new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba); + new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding); + new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false); + new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true); + new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit); + new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid); + new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid); + new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid); + new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace); - /* Check to see if someone else created same DB name meanwhile. */ - if (get_db_info(dbname, NULL, NULL, NULL, - NULL, NULL, NULL, NULL, NULL, NULL)) - { - /* Don't hold lock while doing recursive remove */ - heap_close(pg_database_rel, ExclusiveLock); - remove_dbtablespaces(dboid); - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_DATABASE), - errmsg("database \"%s\" already exists", dbname))); - } + /* + * We deliberately set datconfig and datacl to defaults (NULL), rather + * than copying them from the template database. Copying datacl would + * be a bad idea when the owner is not the same as the template's + * owner. It's more debatable whether datconfig should be copied. + */ + new_record_nulls[Anum_pg_database_datconfig - 1] = 'n'; + new_record_nulls[Anum_pg_database_datacl - 1] = 'n'; - /* - * Insert a new tuple into pg_database - */ - pg_database_dsc = RelationGetDescr(pg_database_rel); + tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls); - /* Form tuple */ - MemSet(new_record, 0, sizeof(new_record)); - MemSet(new_record_nulls, ' ', sizeof(new_record_nulls)); + HeapTupleSetOid(tuple, dboid); /* override heap_insert's OID + * selection */ - new_record[Anum_pg_database_datname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(dbname)); - new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba); - new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding); - new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false); - new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true); - new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit); - new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid); - new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid); - new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid); - new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace); + simple_heap_insert(pg_database_rel, tuple); - /* - * We deliberately set datconfig and datacl to defaults (NULL), rather - * than copying them from the template database. Copying datacl would - * be a bad idea when the owner is not the same as the template's - * owner. It's more debatable whether datconfig should be copied. - */ - new_record_nulls[Anum_pg_database_datconfig - 1] = 'n'; - new_record_nulls[Anum_pg_database_datacl - 1] = 'n'; + /* Update indexes */ + CatalogUpdateIndexes(pg_database_rel, tuple); - tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls); + /* Register owner dependency */ + recordDependencyOnOwner(DatabaseRelationId, dboid, datdba); - HeapTupleSetOid(tuple, dboid); /* override heap_insert's OID - * selection */ + /* Create pg_shdepend entries for objects within database */ + copyTemplateDependencies(src_dboid, dboid); - simple_heap_insert(pg_database_rel, tuple); + /* + * We force a checkpoint before committing. This effectively means + * that committed XLOG_DBASE_CREATE operations will never need to be + * replayed (at least not in ordinary crash recovery; we still have + * to make the XLOG entry for the benefit of PITR operations). + * This avoids two nasty scenarios: + * + * #1: When PITR is off, we don't XLOG the contents of newly created + * indexes; therefore the drop-and-recreate-whole-directory behavior + * of DBASE_CREATE replay would lose such indexes. + * + * #2: Since we have to recopy the source database during DBASE_CREATE + * replay, we run the risk of copying changes in it that were committed + * after the original CREATE DATABASE command but before the system + * crash that led to the replay. This is at least unexpected and at + * worst could lead to inconsistencies, eg duplicate table names. + * + * (Both of these were real bugs in releases 8.0 through 8.0.3.) + * + * In PITR replay, the first of these isn't an issue, and the second + * is only a risk if the CREATE DATABASE and subsequent template + * database change both occur while a base backup is being taken. + * There doesn't seem to be much we can do about that except document + * it as a limitation. + * + * Perhaps if we ever implement CREATE DATABASE in a less cheesy + * way, we can avoid this. + */ + RequestCheckpoint(true, false); - /* Update indexes */ - CatalogUpdateIndexes(pg_database_rel, tuple); + /* + * Set flag to update flat database file at commit. + */ + database_file_update_needed(); + } + PG_CATCH(); + { + /* Don't hold pg_database lock while doing recursive remove */ + if (pg_database_rel != NULL) + heap_close(pg_database_rel, ExclusiveLock); - /* Register owner dependency */ - recordDependencyOnOwner(DatabaseRelationId, dboid, datdba); + /* Throw away any successfully copied subdirectories */ + remove_dbtablespaces(dboid); - /* Create pg_shdepend entries for objects within database */ - copyTemplateDependencies(src_dboid, dboid); + PG_RE_THROW(); + } + PG_END_TRY(); /* Close pg_database, but keep exclusive lock till commit */ + /* This has to be outside the PG_TRY */ heap_close(pg_database_rel, NoLock); - - /* - * We force a checkpoint before committing. This effectively means - * that committed XLOG_DBASE_CREATE operations will never need to be - * replayed (at least not in ordinary crash recovery; we still have - * to make the XLOG entry for the benefit of PITR operations). - * This avoids two nasty scenarios: - * - * #1: When PITR is off, we don't XLOG the contents of newly created - * indexes; therefore the drop-and-recreate-whole-directory behavior - * of DBASE_CREATE replay would lose such indexes. - * - * #2: Since we have to recopy the source database during DBASE_CREATE - * replay, we run the risk of copying changes in it that were committed - * after the original CREATE DATABASE command but before the system - * crash that led to the replay. This is at least unexpected and at - * worst could lead to inconsistencies, eg duplicate table names. - * - * (Both of these were real bugs in releases 8.0 through 8.0.3.) - * - * In PITR replay, the first of these isn't an issue, and the second - * is only a risk if the CREATE DATABASE and subsequent template - * database change both occur while a base backup is being taken. - * There doesn't seem to be much we can do about that except document - * it as a limitation. - * - * Perhaps if we ever implement CREATE DATABASE in a less cheesy - * way, we can avoid this. - */ - RequestCheckpoint(true, false); - - /* - * Set flag to update flat database file at commit. - */ - database_file_update_needed(); } @@ -1348,10 +1323,6 @@ dbase_redo(XLogRecPtr lsn, XLogRecord *record) char *dst_path; struct stat st; -#ifndef WIN32 - char buf[2 * MAXPGPATH + 100]; -#endif - src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id); dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id); @@ -1365,8 +1336,8 @@ dbase_redo(XLogRecPtr lsn, XLogRecord *record) { if (!rmtree(dst_path, true)) ereport(WARNING, - (errmsg("could not remove database directory \"%s\"", - dst_path))); + (errmsg("could not remove database directory \"%s\"", + dst_path))); } /* @@ -1376,32 +1347,12 @@ dbase_redo(XLogRecPtr lsn, XLogRecord *record) */ BufferSync(); -#ifndef WIN32 - /* * Copy this subdirectory to the new location * - * XXX use of cp really makes this code pretty grotty, particularly - * with respect to lack of ability to report errors well. Someday - * rewrite to do it for ourselves. + * We don't need to copy subdirectories */ - - /* We might need to use cp -R one day for portability */ - snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", - src_path, dst_path); - if (system(buf) != 0) - ereport(ERROR, - (errmsg("could not initialize database directory"), - errdetail("Failing system command was: %s", buf), - errhint("Look in the postmaster's stderr log for more information."))); -#else /* WIN32 */ - if (copydir(src_path, dst_path) != 0) - { - /* copydir should already have given details of its troubles */ - ereport(ERROR, - (errmsg("could not initialize database directory"))); - } -#endif /* WIN32 */ + copydir(src_path, dst_path, false); } else if (info == XLOG_DBASE_DROP) { |