diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2008-11-07 18:25:07 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2008-11-07 18:25:07 +0000 |
commit | 6517f377d65dfb385d589edaa423aef0d1cf5a52 (patch) | |
tree | 2f0ec8d579d5e74ddf89d13ce30a18e34a3b713d /src/backend/commands/dbcommands.c | |
parent | 85e2cedf985bfecaf43a18ca17433070f439fb0e (diff) | |
download | postgresql-6517f377d65dfb385d589edaa423aef0d1cf5a52.tar.gz postgresql-6517f377d65dfb385d589edaa423aef0d1cf5a52.zip |
Implement ALTER DATABASE SET TABLESPACE to move a whole database (or at least
as much of it as lives in its default tablespace) to a new tablespace.
Guillaume Lelarge, with some help from Bernd Helmle and Tom Lane
Diffstat (limited to 'src/backend/commands/dbcommands.c')
-rw-r--r-- | src/backend/commands/dbcommands.c | 352 |
1 files changed, 350 insertions, 2 deletions
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index 7eb6e1bda00..e8b814b6e9b 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -13,7 +13,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.215 2008/11/02 01:45:27 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.216 2008/11/07 18:25:06 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -42,6 +42,7 @@ #include "pgstat.h" #include "postmaster/bgwriter.h" #include "storage/bufmgr.h" +#include "storage/fd.h" #include "storage/lmgr.h" #include "storage/ipc.h" #include "storage/procarray.h" @@ -53,6 +54,7 @@ #include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/pg_locale.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/tqual.h" @@ -63,8 +65,16 @@ typedef struct Oid dest_dboid; /* DB we are trying to create */ } createdb_failure_params; +typedef struct +{ + Oid dest_dboid; /* DB we are trying to move */ + Oid dest_tsoid; /* tablespace we are trying to move to */ +} movedb_failure_params; + /* non-export function prototypes */ static void createdb_failure_callback(int code, Datum arg); +static void movedb(const char *dbname, const char *tblspcname); +static void movedb_failure_callback(int code, Datum arg); static bool get_db_info(const char *name, LOCKMODE lockmode, Oid *dbIdP, Oid *ownerIdP, int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, @@ -935,10 +945,329 @@ RenameDatabase(const char *oldname, const char *newname) /* + * ALTER DATABASE SET TABLESPACE + */ +static void +movedb(const char *dbname, const char *tblspcname) +{ + Oid db_id; + Relation pgdbrel; + int notherbackends; + int npreparedxacts; + HeapTuple oldtuple, newtuple; + Oid src_tblspcoid, dst_tblspcoid; + Datum new_record[Natts_pg_database]; + bool new_record_nulls[Natts_pg_database]; + bool new_record_repl[Natts_pg_database]; + ScanKeyData scankey; + SysScanDesc sysscan; + AclResult aclresult; + char *src_dbpath; + char *dst_dbpath; + DIR *dstdir; + struct dirent *xlde; + movedb_failure_params fparms; + + /* + * Look up the target database's OID, and get exclusive lock on it. We + * need this to ensure that no new backend starts up in the database while + * we are moving it, and that no one is using it as a CREATE DATABASE + * template or trying to delete it. + */ + pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock); + + if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, + NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_DATABASE), + errmsg("database \"%s\" does not exist", dbname))); + + /* + * We actually need a session lock, so that the lock will persist across + * the commit/restart below. (We could almost get away with letting the + * lock be released at commit, except that someone could try to move + * relations of the DB back into the old directory while we rmtree() it.) + */ + LockSharedObjectForSession(DatabaseRelationId, db_id, 0, + AccessExclusiveLock); + + /* + * Permission checks + */ + if (!pg_database_ownercheck(db_id, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, + dbname); + + /* + * Obviously can't move the tables of my own database + */ + if (db_id == MyDatabaseId) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("cannot change the tablespace of the currently open database"))); + + /* + * Get tablespace's oid + */ + dst_tblspcoid = get_tablespace_oid(tblspcname); + if (dst_tblspcoid == InvalidOid) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_DATABASE), + errmsg("tablespace \"%s\" does not exist", tblspcname))); + + /* + * Permission checks + */ + aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(), + ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_TABLESPACE, + tblspcname); + + /* + * pg_global must never be the default tablespace + */ + if (dst_tblspcoid == GLOBALTABLESPACE_OID) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_global cannot be used as default tablespace"))); + + /* + * No-op if same tablespace + */ + if (src_tblspcoid == dst_tblspcoid) + { + heap_close(pgdbrel, NoLock); + UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0, + AccessExclusiveLock); + return; + } + + /* + * Check for other backends in the target database. (Because we hold the + * database lock, no new ones can start after this.) + * + * As in CREATE DATABASE, check this after other error conditions. + */ + if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("database \"%s\" is being accessed by other users", + dbname), + errdetail_busy_db(notherbackends, npreparedxacts))); + + /* + * Get old and new database paths + */ + src_dbpath = GetDatabasePath(db_id, src_tblspcoid); + dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid); + + /* + * Force a checkpoint before proceeding. This will force dirty buffers out + * to disk, to ensure source database is up-to-date on disk for the + * copy. FlushDatabaseBuffers() would suffice for that, but we also want + * to process any pending unlink requests. Otherwise, the check for + * existing files in the target directory might fail unnecessarily, not to + * mention that the copy might fail due to source files getting deleted + * under it. On Windows, this also ensures that the bgwriter doesn't hold + * any open files, which would cause rmdir() to fail. + */ + RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT); + + /* + * Check for existence of files in the target directory, i.e., objects of + * this database that are already in the target tablespace. We can't + * allow the move in such a case, because we would need to change those + * relations' pg_class.reltablespace entries to zero, and we don't have + * access to the DB's pg_class to do so. + */ + dstdir = AllocateDir(dst_dbpath); + if (dstdir != NULL) + { + while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL) + { + if (strcmp(xlde->d_name, ".") == 0 || + strcmp(xlde->d_name, "..") == 0) + continue; + + ereport(ERROR, + (errmsg("some relations of database \"%s\" are already in tablespace \"%s\"", + dbname, tblspcname), + errhint("You must move them back to the database's default tablespace before using this command."))); + } + + FreeDir(dstdir); + + /* + * The directory exists but is empty. + * We must remove it before using the copydir function. + */ + if (rmdir(dst_dbpath) != 0) + elog(ERROR, "could not remove directory \"%s\": %m", + dst_dbpath); + } + + /* + * Use an ENSURE block to make sure we remove the debris if the copy fails + * (eg, due to out-of-disk-space). This is not a 100% solution, because + * of the possibility of failure during transaction commit, but it should + * handle most scenarios. + */ + fparms.dest_dboid = db_id; + fparms.dest_tsoid = dst_tblspcoid; + PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback, + PointerGetDatum(&fparms)); + { + /* + * Copy files from the old tablespace to the new one + */ + copydir(src_dbpath, dst_dbpath, false); + + /* + * Record the filesystem change in XLOG + */ + { + xl_dbase_create_rec xlrec; + XLogRecData rdata[1]; + + xlrec.db_id = db_id; + xlrec.tablespace_id = dst_tblspcoid; + xlrec.src_db_id = db_id; + xlrec.src_tablespace_id = src_tblspcoid; + + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof(xl_dbase_create_rec); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = NULL; + + (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata); + } + + /* + * Update the database's pg_database tuple + */ + ScanKeyInit(&scankey, + Anum_pg_database_datname, + BTEqualStrategyNumber, F_NAMEEQ, + NameGetDatum(dbname)); + sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true, + SnapshotNow, 1, &scankey); + oldtuple = systable_getnext(sysscan); + if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */ + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_DATABASE), + errmsg("database \"%s\" does not exist", dbname))); + + MemSet(new_record, 0, sizeof(new_record)); + MemSet(new_record_nulls, false, sizeof(new_record_nulls)); + MemSet(new_record_repl, false, sizeof(new_record_repl)); + + new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid); + new_record_repl[Anum_pg_database_dattablespace - 1] = true; + + newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel), + new_record, + new_record_nulls, new_record_repl); + simple_heap_update(pgdbrel, &oldtuple->t_self, newtuple); + + /* Update indexes */ + CatalogUpdateIndexes(pgdbrel, newtuple); + + systable_endscan(sysscan); + + /* + * Force another checkpoint here. As in CREATE DATABASE, this is to + * ensure that we don't have to replay a committed XLOG_DBASE_CREATE + * operation, which would cause us to lose any unlogged operations + * done in the new DB tablespace before the next checkpoint. + */ + RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT); + + /* + * Set flag to update flat database file at commit. Note: this also + * forces synchronous commit, which minimizes the window between + * copying the database files and commital of the transaction. If we + * crash before committing, we'll leave an orphaned set of files on + * disk, which is not fatal but not good either. + */ + database_file_update_needed(); + + /* + * Close pg_database, but keep lock till commit (this is important to + * prevent any risk of deadlock failure while updating flat file) + */ + heap_close(pgdbrel, NoLock); + } + PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback, + PointerGetDatum(&fparms)); + + /* + * Commit the transaction so that the pg_database update is committed. + * If we crash while removing files, the database won't be corrupt, + * we'll just leave some orphaned files in the old directory. + * + * (This is OK because we know we aren't inside a transaction block.) + * + * XXX would it be safe/better to do this inside the ensure block? Not + * convinced it's a good idea; consider elog just after the transaction + * really commits. + */ + PopActiveSnapshot(); + CommitTransactionCommand(); + + /* Start new transaction for the remaining work; don't need a snapshot */ + StartTransactionCommand(); + + /* + * Remove files from the old tablespace + */ + if (!rmtree(src_dbpath, true)) + ereport(WARNING, + (errmsg("some useless files may be left behind in old database directory \"%s\"", + src_dbpath))); + + /* + * Record the filesystem change in XLOG + */ + { + xl_dbase_drop_rec xlrec; + XLogRecData rdata[1]; + + xlrec.db_id = db_id; + xlrec.tablespace_id = src_tblspcoid; + + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof(xl_dbase_drop_rec); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = NULL; + + (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata); + } + + /* Now it's safe to release the database lock */ + UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0, + AccessExclusiveLock); +} + +/* Error cleanup callback for movedb */ +static void +movedb_failure_callback(int code, Datum arg) +{ + movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg); + char *dstpath; + + /* Get rid of anything we managed to copy to the target directory */ + dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid); + + (void) rmtree(dstpath, true); +} + + +/* * ALTER DATABASE name ... */ void -AlterDatabase(AlterDatabaseStmt *stmt) +AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel) { Relation rel; HeapTuple tuple, @@ -948,6 +1277,7 @@ AlterDatabase(AlterDatabaseStmt *stmt) ListCell *option; int connlimit = -1; DefElem *dconnlimit = NULL; + DefElem *dtablespace = NULL; Datum new_record[Natts_pg_database]; bool new_record_nulls[Natts_pg_database]; bool new_record_repl[Natts_pg_database]; @@ -965,11 +1295,29 @@ AlterDatabase(AlterDatabaseStmt *stmt) errmsg("conflicting or redundant options"))); dconnlimit = defel; } + else if (strcmp(defel->defname, "tablespace") == 0) + { + if (dtablespace) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + dtablespace = defel; + } else elog(ERROR, "option \"%s\" not recognized", defel->defname); } + if (dtablespace) + { + /* currently, can't be specified along with any other options */ + Assert(!dconnlimit); + /* this case isn't allowed within a transaction block */ + PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE"); + movedb(stmt->dbname, strVal(dtablespace->arg)); + return; + } + if (dconnlimit) connlimit = intVal(dconnlimit->arg); |