aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/dbcommands.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2008-11-07 18:25:07 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2008-11-07 18:25:07 +0000
commit6517f377d65dfb385d589edaa423aef0d1cf5a52 (patch)
tree2f0ec8d579d5e74ddf89d13ce30a18e34a3b713d /src/backend/commands/dbcommands.c
parent85e2cedf985bfecaf43a18ca17433070f439fb0e (diff)
downloadpostgresql-6517f377d65dfb385d589edaa423aef0d1cf5a52.tar.gz
postgresql-6517f377d65dfb385d589edaa423aef0d1cf5a52.zip
Implement ALTER DATABASE SET TABLESPACE to move a whole database (or at least
as much of it as lives in its default tablespace) to a new tablespace. Guillaume Lelarge, with some help from Bernd Helmle and Tom Lane
Diffstat (limited to 'src/backend/commands/dbcommands.c')
-rw-r--r--src/backend/commands/dbcommands.c352
1 files changed, 350 insertions, 2 deletions
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 7eb6e1bda00..e8b814b6e9b 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -13,7 +13,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.215 2008/11/02 01:45:27 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.216 2008/11/07 18:25:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -42,6 +42,7 @@
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "storage/bufmgr.h"
+#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/ipc.h"
#include "storage/procarray.h"
@@ -53,6 +54,7 @@
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/pg_locale.h"
+#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
@@ -63,8 +65,16 @@ typedef struct
Oid dest_dboid; /* DB we are trying to create */
} createdb_failure_params;
+typedef struct
+{
+ Oid dest_dboid; /* DB we are trying to move */
+ Oid dest_tsoid; /* tablespace we are trying to move to */
+} movedb_failure_params;
+
/* non-export function prototypes */
static void createdb_failure_callback(int code, Datum arg);
+static void movedb(const char *dbname, const char *tblspcname);
+static void movedb_failure_callback(int code, Datum arg);
static bool get_db_info(const char *name, LOCKMODE lockmode,
Oid *dbIdP, Oid *ownerIdP,
int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
@@ -935,10 +945,329 @@ RenameDatabase(const char *oldname, const char *newname)
/*
+ * ALTER DATABASE SET TABLESPACE
+ */
+static void
+movedb(const char *dbname, const char *tblspcname)
+{
+ Oid db_id;
+ Relation pgdbrel;
+ int notherbackends;
+ int npreparedxacts;
+ HeapTuple oldtuple, newtuple;
+ Oid src_tblspcoid, dst_tblspcoid;
+ Datum new_record[Natts_pg_database];
+ bool new_record_nulls[Natts_pg_database];
+ bool new_record_repl[Natts_pg_database];
+ ScanKeyData scankey;
+ SysScanDesc sysscan;
+ AclResult aclresult;
+ char *src_dbpath;
+ char *dst_dbpath;
+ DIR *dstdir;
+ struct dirent *xlde;
+ movedb_failure_params fparms;
+
+ /*
+ * Look up the target database's OID, and get exclusive lock on it. We
+ * need this to ensure that no new backend starts up in the database while
+ * we are moving it, and that no one is using it as a CREATE DATABASE
+ * template or trying to delete it.
+ */
+ pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
+
+ if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
+ NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("database \"%s\" does not exist", dbname)));
+
+ /*
+ * We actually need a session lock, so that the lock will persist across
+ * the commit/restart below. (We could almost get away with letting the
+ * lock be released at commit, except that someone could try to move
+ * relations of the DB back into the old directory while we rmtree() it.)
+ */
+ LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
+ AccessExclusiveLock);
+
+ /*
+ * Permission checks
+ */
+ if (!pg_database_ownercheck(db_id, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+ dbname);
+
+ /*
+ * Obviously can't move the tables of my own database
+ */
+ if (db_id == MyDatabaseId)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("cannot change the tablespace of the currently open database")));
+
+ /*
+ * Get tablespace's oid
+ */
+ dst_tblspcoid = get_tablespace_oid(tblspcname);
+ if (dst_tblspcoid == InvalidOid)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("tablespace \"%s\" does not exist", tblspcname)));
+
+ /*
+ * Permission checks
+ */
+ aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
+ ACL_CREATE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
+ tblspcname);
+
+ /*
+ * pg_global must never be the default tablespace
+ */
+ if (dst_tblspcoid == GLOBALTABLESPACE_OID)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("pg_global cannot be used as default tablespace")));
+
+ /*
+ * No-op if same tablespace
+ */
+ if (src_tblspcoid == dst_tblspcoid)
+ {
+ heap_close(pgdbrel, NoLock);
+ UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
+ AccessExclusiveLock);
+ return;
+ }
+
+ /*
+ * Check for other backends in the target database. (Because we hold the
+ * database lock, no new ones can start after this.)
+ *
+ * As in CREATE DATABASE, check this after other error conditions.
+ */
+ if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("database \"%s\" is being accessed by other users",
+ dbname),
+ errdetail_busy_db(notherbackends, npreparedxacts)));
+
+ /*
+ * Get old and new database paths
+ */
+ src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
+ dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
+
+ /*
+ * Force a checkpoint before proceeding. This will force dirty buffers out
+ * to disk, to ensure source database is up-to-date on disk for the
+ * copy. FlushDatabaseBuffers() would suffice for that, but we also want
+ * to process any pending unlink requests. Otherwise, the check for
+ * existing files in the target directory might fail unnecessarily, not to
+ * mention that the copy might fail due to source files getting deleted
+ * under it. On Windows, this also ensures that the bgwriter doesn't hold
+ * any open files, which would cause rmdir() to fail.
+ */
+ RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+
+ /*
+ * Check for existence of files in the target directory, i.e., objects of
+ * this database that are already in the target tablespace. We can't
+ * allow the move in such a case, because we would need to change those
+ * relations' pg_class.reltablespace entries to zero, and we don't have
+ * access to the DB's pg_class to do so.
+ */
+ dstdir = AllocateDir(dst_dbpath);
+ if (dstdir != NULL)
+ {
+ while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
+ {
+ if (strcmp(xlde->d_name, ".") == 0 ||
+ strcmp(xlde->d_name, "..") == 0)
+ continue;
+
+ ereport(ERROR,
+ (errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
+ dbname, tblspcname),
+ errhint("You must move them back to the database's default tablespace before using this command.")));
+ }
+
+ FreeDir(dstdir);
+
+ /*
+ * The directory exists but is empty.
+ * We must remove it before using the copydir function.
+ */
+ if (rmdir(dst_dbpath) != 0)
+ elog(ERROR, "could not remove directory \"%s\": %m",
+ dst_dbpath);
+ }
+
+ /*
+ * Use an ENSURE block to make sure we remove the debris if the copy fails
+ * (eg, due to out-of-disk-space). This is not a 100% solution, because
+ * of the possibility of failure during transaction commit, but it should
+ * handle most scenarios.
+ */
+ fparms.dest_dboid = db_id;
+ fparms.dest_tsoid = dst_tblspcoid;
+ PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
+ PointerGetDatum(&fparms));
+ {
+ /*
+ * Copy files from the old tablespace to the new one
+ */
+ copydir(src_dbpath, dst_dbpath, false);
+
+ /*
+ * Record the filesystem change in XLOG
+ */
+ {
+ xl_dbase_create_rec xlrec;
+ XLogRecData rdata[1];
+
+ xlrec.db_id = db_id;
+ xlrec.tablespace_id = dst_tblspcoid;
+ xlrec.src_db_id = db_id;
+ xlrec.src_tablespace_id = src_tblspcoid;
+
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof(xl_dbase_create_rec);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = NULL;
+
+ (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
+ }
+
+ /*
+ * Update the database's pg_database tuple
+ */
+ ScanKeyInit(&scankey,
+ Anum_pg_database_datname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ NameGetDatum(dbname));
+ sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
+ SnapshotNow, 1, &scankey);
+ oldtuple = systable_getnext(sysscan);
+ if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("database \"%s\" does not exist", dbname)));
+
+ MemSet(new_record, 0, sizeof(new_record));
+ MemSet(new_record_nulls, false, sizeof(new_record_nulls));
+ MemSet(new_record_repl, false, sizeof(new_record_repl));
+
+ new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
+ new_record_repl[Anum_pg_database_dattablespace - 1] = true;
+
+ newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
+ new_record,
+ new_record_nulls, new_record_repl);
+ simple_heap_update(pgdbrel, &oldtuple->t_self, newtuple);
+
+ /* Update indexes */
+ CatalogUpdateIndexes(pgdbrel, newtuple);
+
+ systable_endscan(sysscan);
+
+ /*
+ * Force another checkpoint here. As in CREATE DATABASE, this is to
+ * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
+ * operation, which would cause us to lose any unlogged operations
+ * done in the new DB tablespace before the next checkpoint.
+ */
+ RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+
+ /*
+ * Set flag to update flat database file at commit. Note: this also
+ * forces synchronous commit, which minimizes the window between
+ * copying the database files and commital of the transaction. If we
+ * crash before committing, we'll leave an orphaned set of files on
+ * disk, which is not fatal but not good either.
+ */
+ database_file_update_needed();
+
+ /*
+ * Close pg_database, but keep lock till commit (this is important to
+ * prevent any risk of deadlock failure while updating flat file)
+ */
+ heap_close(pgdbrel, NoLock);
+ }
+ PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
+ PointerGetDatum(&fparms));
+
+ /*
+ * Commit the transaction so that the pg_database update is committed.
+ * If we crash while removing files, the database won't be corrupt,
+ * we'll just leave some orphaned files in the old directory.
+ *
+ * (This is OK because we know we aren't inside a transaction block.)
+ *
+ * XXX would it be safe/better to do this inside the ensure block? Not
+ * convinced it's a good idea; consider elog just after the transaction
+ * really commits.
+ */
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+
+ /* Start new transaction for the remaining work; don't need a snapshot */
+ StartTransactionCommand();
+
+ /*
+ * Remove files from the old tablespace
+ */
+ if (!rmtree(src_dbpath, true))
+ ereport(WARNING,
+ (errmsg("some useless files may be left behind in old database directory \"%s\"",
+ src_dbpath)));
+
+ /*
+ * Record the filesystem change in XLOG
+ */
+ {
+ xl_dbase_drop_rec xlrec;
+ XLogRecData rdata[1];
+
+ xlrec.db_id = db_id;
+ xlrec.tablespace_id = src_tblspcoid;
+
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof(xl_dbase_drop_rec);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = NULL;
+
+ (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
+ }
+
+ /* Now it's safe to release the database lock */
+ UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
+ AccessExclusiveLock);
+}
+
+/* Error cleanup callback for movedb */
+static void
+movedb_failure_callback(int code, Datum arg)
+{
+ movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
+ char *dstpath;
+
+ /* Get rid of anything we managed to copy to the target directory */
+ dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
+
+ (void) rmtree(dstpath, true);
+}
+
+
+/*
* ALTER DATABASE name ...
*/
void
-AlterDatabase(AlterDatabaseStmt *stmt)
+AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
{
Relation rel;
HeapTuple tuple,
@@ -948,6 +1277,7 @@ AlterDatabase(AlterDatabaseStmt *stmt)
ListCell *option;
int connlimit = -1;
DefElem *dconnlimit = NULL;
+ DefElem *dtablespace = NULL;
Datum new_record[Natts_pg_database];
bool new_record_nulls[Natts_pg_database];
bool new_record_repl[Natts_pg_database];
@@ -965,11 +1295,29 @@ AlterDatabase(AlterDatabaseStmt *stmt)
errmsg("conflicting or redundant options")));
dconnlimit = defel;
}
+ else if (strcmp(defel->defname, "tablespace") == 0)
+ {
+ if (dtablespace)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ dtablespace = defel;
+ }
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
}
+ if (dtablespace)
+ {
+ /* currently, can't be specified along with any other options */
+ Assert(!dconnlimit);
+ /* this case isn't allowed within a transaction block */
+ PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
+ movedb(stmt->dbname, strVal(dtablespace->arg));
+ return;
+ }
+
if (dconnlimit)
connlimit = intVal(dconnlimit->arg);