aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c411
1 files changed, 411 insertions, 0 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 05e86de8ebc..582890a3025 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -670,6 +670,9 @@ static void ATDetachCheckNoForeignKeyRefs(Relation partition);
static char GetAttributeCompression(Oid atttypid, const char *compression);
static char GetAttributeStorage(Oid atttypid, const char *storagemode);
+static void ATExecSplitPartition(List **wqueue, AlteredTableInfo *tab,
+ Relation rel, PartitionCmd *cmd,
+ AlterTableUtilityContext *context);
static void ATExecMergePartitions(List **wqueue, AlteredTableInfo *tab, Relation rel,
PartitionCmd *cmd, AlterTableUtilityContext *context);
@@ -4740,6 +4743,10 @@ AlterTableGetLockLevel(List *cmds)
cmd_lockmode = ShareUpdateExclusiveLock;
break;
+ case AT_SplitPartition:
+ cmd_lockmode = AccessExclusiveLock;
+ break;
+
case AT_MergePartitions:
cmd_lockmode = AccessExclusiveLock;
break;
@@ -5163,6 +5170,11 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
+ case AT_SplitPartition:
+ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE);
+ /* No command-specific prep needed */
+ pass = AT_PASS_MISC;
+ break;
case AT_MergePartitions:
ATSimplePermissions(cmd->subtype, rel, ATT_TABLE);
/* No command-specific prep needed */
@@ -5565,6 +5577,14 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab,
case AT_DetachPartitionFinalize:
address = ATExecDetachPartitionFinalize(rel, ((PartitionCmd *) cmd->def)->name);
break;
+ case AT_SplitPartition:
+ cmd = ATParseTransformCmd(wqueue, tab, rel, cmd, false, lockmode,
+ cur_pass, context);
+ Assert(cmd != NULL);
+ Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+ ATExecSplitPartition(wqueue, tab, rel, (PartitionCmd *) cmd->def,
+ context);
+ break;
case AT_MergePartitions:
cmd = ATParseTransformCmd(wqueue, tab, rel, cmd, false, lockmode,
cur_pass, context);
@@ -6567,6 +6587,8 @@ alter_table_type_to_string(AlterTableType cmdtype)
return "DETACH PARTITION";
case AT_DetachPartitionFinalize:
return "DETACH PARTITION ... FINALIZE";
+ case AT_SplitPartition:
+ return "SPLIT PARTITION";
case AT_MergePartitions:
return "MERGE PARTITIONS";
case AT_AddIdentity:
@@ -20853,6 +20875,260 @@ GetAttributeStorage(Oid atttypid, const char *storagemode)
}
/*
+ * Struct with context of new partition for insert rows from splited partition
+ */
+typedef struct SplitPartitionContext
+{
+ ExprState *partqualstate; /* expression for check slot for partition
+ * (NULL for DEFAULT partition) */
+ BulkInsertState bistate; /* state of bulk inserts for partition */
+ TupleTableSlot *dstslot; /* slot for insert row into partition */
+ Relation partRel; /* relation for partition */
+} SplitPartitionContext;
+
+
+/*
+ * createSplitPartitionContext: create context for partition and fill it
+ */
+static SplitPartitionContext *
+createSplitPartitionContext(Relation partRel)
+{
+ SplitPartitionContext *pc;
+
+ pc = (SplitPartitionContext *) palloc0(sizeof(SplitPartitionContext));
+ pc->partRel = partRel;
+
+ /*
+ * Prepare a BulkInsertState for table_tuple_insert. The FSM is empty, so
+ * don't bother using it.
+ */
+ pc->bistate = GetBulkInsertState();
+
+ /* Create tuple slot for new partition. */
+ pc->dstslot = MakeSingleTupleTableSlot(RelationGetDescr(pc->partRel),
+ table_slot_callbacks(pc->partRel));
+ ExecStoreAllNullTuple(pc->dstslot);
+
+ return pc;
+}
+
+/*
+ * deleteSplitPartitionContext: delete context for partition
+ */
+static void
+deleteSplitPartitionContext(SplitPartitionContext *pc, int ti_options)
+{
+ ExecDropSingleTupleTableSlot(pc->dstslot);
+ FreeBulkInsertState(pc->bistate);
+
+ table_finish_bulk_insert(pc->partRel, ti_options);
+
+ pfree(pc);
+}
+
+/*
+ * moveSplitTableRows: scan split partition (splitRel) of partitioned table
+ * (rel) and move rows into new partitions.
+ *
+ * New partitions description:
+ * partlist: list of pointers to SinglePartitionSpec structures.
+ * newPartRels: list of Relation's.
+ * defaultPartOid: oid of DEFAULT partition, for table rel.
+ */
+static void
+moveSplitTableRows(Relation rel, Relation splitRel, List *partlist, List *newPartRels, Oid defaultPartOid)
+{
+ /* The FSM is empty, so don't bother using it. */
+ int ti_options = TABLE_INSERT_SKIP_FSM;
+ CommandId mycid;
+ EState *estate;
+ ListCell *listptr,
+ *listptr2;
+ TupleTableSlot *srcslot;
+ ExprContext *econtext;
+ TableScanDesc scan;
+ Snapshot snapshot;
+ MemoryContext oldCxt;
+ List *partContexts = NIL;
+ TupleConversionMap *tuple_map;
+ SplitPartitionContext *defaultPartCtx = NULL,
+ *pc;
+ bool isOldDefaultPart = false;
+
+ mycid = GetCurrentCommandId(true);
+
+ estate = CreateExecutorState();
+
+ forboth(listptr, partlist, listptr2, newPartRels)
+ {
+ SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr);
+
+ pc = createSplitPartitionContext((Relation) lfirst(listptr2));
+
+ if (sps->bound->is_default)
+ {
+ /* We should not create constraint for detached DEFAULT partition. */
+ defaultPartCtx = pc;
+ }
+ else
+ {
+ List *partConstraint;
+
+ /* Build expression execution states for partition check quals. */
+ partConstraint = get_qual_from_partbound(rel, sps->bound);
+ partConstraint =
+ (List *) eval_const_expressions(NULL,
+ (Node *) partConstraint);
+ /* Make boolean expression for ExecCheck(). */
+ partConstraint = list_make1(make_ands_explicit(partConstraint));
+
+ /*
+ * Map the vars in the constraint expression from rel's attnos to
+ * splitRel's.
+ */
+ partConstraint = map_partition_varattnos(partConstraint,
+ 1, splitRel, rel);
+
+ pc->partqualstate =
+ ExecPrepareExpr((Expr *) linitial(partConstraint), estate);
+ Assert(pc->partqualstate != NULL);
+ }
+
+ /* Store partition context into list. */
+ partContexts = lappend(partContexts, pc);
+ }
+
+ /*
+ * Create partition context for DEFAULT partition. We can insert values
+ * into this partition in case spaces with values between new partitions.
+ */
+ if (!defaultPartCtx && OidIsValid(defaultPartOid))
+ {
+ /* Indicate that we allocate context for old DEFAULT partition */
+ isOldDefaultPart = true;
+ defaultPartCtx = createSplitPartitionContext(table_open(defaultPartOid, AccessExclusiveLock));
+ }
+
+ econtext = GetPerTupleExprContext(estate);
+
+ /* Create necessary tuple slot. */
+ srcslot = MakeSingleTupleTableSlot(RelationGetDescr(splitRel),
+ table_slot_callbacks(splitRel));
+
+ /*
+ * Map computing for moving attributes of split partition to new partition
+ * (for first new partition but other new partitions can use the same
+ * map).
+ */
+ pc = (SplitPartitionContext *) lfirst(list_head(partContexts));
+ tuple_map = convert_tuples_by_name(RelationGetDescr(splitRel),
+ RelationGetDescr(pc->partRel));
+
+ /* Scan through the rows. */
+ snapshot = RegisterSnapshot(GetLatestSnapshot());
+ scan = table_beginscan(splitRel, snapshot, 0, NULL);
+
+ /*
+ * Switch to per-tuple memory context and reset it for each tuple
+ * produced, so we don't leak memory.
+ */
+ oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+ while (table_scan_getnextslot(scan, ForwardScanDirection, srcslot))
+ {
+ bool found = false;
+ bool insert_indexes;
+ TupleTableSlot *insertslot;
+
+ /* Extract data from old tuple. */
+ slot_getallattrs(srcslot);
+
+ econtext->ecxt_scantuple = srcslot;
+
+ /* Search partition for current slot srcslot. */
+ foreach(listptr, partContexts)
+ {
+ pc = (SplitPartitionContext *) lfirst(listptr);
+
+ if (pc->partqualstate /* skip DEFAULT partition */ &&
+ ExecCheck(pc->partqualstate, econtext))
+ {
+ found = true;
+ break;
+ }
+ ResetExprContext(econtext);
+ }
+ if (!found)
+ {
+ /* Use DEFAULT partition if it exists. */
+ if (defaultPartCtx)
+ pc = defaultPartCtx;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_CHECK_VIOLATION),
+ errmsg("can not find partition for split partition row"),
+ errtable(splitRel)));
+ }
+
+ if (tuple_map)
+ {
+ /* Need to use map for copy attributes. */
+ insertslot = execute_attr_map_slot(tuple_map->attrMap, srcslot, pc->dstslot);
+ }
+ else
+ {
+ /* Copy attributes directly. */
+ insertslot = pc->dstslot;
+
+ ExecClearTuple(insertslot);
+
+ memcpy(insertslot->tts_values, srcslot->tts_values,
+ sizeof(Datum) * srcslot->tts_nvalid);
+ memcpy(insertslot->tts_isnull, srcslot->tts_isnull,
+ sizeof(bool) * srcslot->tts_nvalid);
+
+ ExecStoreVirtualTuple(insertslot);
+ }
+
+ /*
+ * Write the tuple out to the new relation. We ignore the
+ * 'insert_indexes' flag since newPartRel has no indexes anyway.
+ */
+ (void) table_tuple_insert(pc->partRel, insertslot, mycid,
+ ti_options, pc->bistate, &insert_indexes);
+
+ ResetExprContext(econtext);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ MemoryContextSwitchTo(oldCxt);
+
+ table_endscan(scan);
+ UnregisterSnapshot(snapshot);
+
+ if (tuple_map)
+ free_conversion_map(tuple_map);
+
+ ExecDropSingleTupleTableSlot(srcslot);
+
+ FreeExecutorState(estate);
+
+ foreach(listptr, partContexts)
+ deleteSplitPartitionContext((SplitPartitionContext *) lfirst(listptr), ti_options);
+
+ /* Need to close table and free buffers for DEFAULT partition. */
+ if (isOldDefaultPart)
+ {
+ Relation defaultPartRel = defaultPartCtx->partRel;
+
+ deleteSplitPartitionContext(defaultPartCtx, ti_options);
+ /* Keep the lock until commit. */
+ table_close(defaultPartRel, NoLock);
+ }
+}
+
+/*
* createPartitionTable: create table for new partition with given name
* (newPartName) like table (modelRelName)
*
@@ -20907,6 +21183,141 @@ createPartitionTable(RangeVar *newPartName, RangeVar *modelRelName,
}
/*
+ * ALTER TABLE <name> SPLIT PARTITION <partition-name> INTO <partition-list>
+ */
+static void
+ATExecSplitPartition(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ PartitionCmd *cmd, AlterTableUtilityContext *context)
+{
+ Relation splitRel;
+ Oid splitRelOid;
+ char relname[NAMEDATALEN];
+ Oid namespaceId;
+ ListCell *listptr,
+ *listptr2;
+ bool isSameName = false;
+ char tmpRelName[NAMEDATALEN];
+ List *newPartRels = NIL;
+ ObjectAddress object;
+ RangeVar *parentName;
+ Oid defaultPartOid;
+
+ defaultPartOid = get_default_oid_from_partdesc(RelationGetPartitionDesc(rel, true));
+
+ /*
+ * We are going to detach and remove this partition: need to use exclusive
+ * lock for prevent DML-queries to the partition.
+ */
+ splitRel = table_openrv(cmd->name, AccessExclusiveLock);
+
+ if (splitRel->rd_rel->relkind != RELKIND_RELATION)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot split non-table partition \"%s\"",
+ RelationGetRelationName(splitRel))));
+
+ splitRelOid = RelationGetRelid(splitRel);
+
+ /* Check descriptions of new partitions. */
+ foreach(listptr, cmd->partlist)
+ {
+ Oid existing_relid;
+ SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr);
+
+ strlcpy(relname, sps->name->relname, NAMEDATALEN);
+
+ /*
+ * Look up the namespace in which we are supposed to create the
+ * partition, check we have permission to create there, lock it
+ * against concurrent drop, and mark stmt->relation as
+ * RELPERSISTENCE_TEMP if a temporary namespace is selected.
+ */
+ namespaceId =
+ RangeVarGetAndCheckCreationNamespace(sps->name, NoLock, NULL);
+
+ /*
+ * This would fail later on anyway, if the relation already exists.
+ * But by catching it here we can emit a nicer error message.
+ */
+ existing_relid = get_relname_relid(relname, namespaceId);
+ if (existing_relid == splitRelOid && !isSameName)
+ /* One new partition can have the same name as split partition. */
+ isSameName = true;
+ else if (existing_relid != InvalidOid)
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_TABLE),
+ errmsg("relation \"%s\" already exists", relname)));
+ }
+
+ /* Detach split partition. */
+ RemoveInheritance(splitRel, rel, false);
+ /* Do the final part of detaching. */
+ DetachPartitionFinalize(rel, splitRel, false, defaultPartOid);
+
+ /*
+ * If new partition has the same name as split partition then we should
+ * rename split partition for reuse name.
+ */
+ if (isSameName)
+ {
+ /*
+ * We must bump the command counter to make the split partition tuple
+ * visible for rename.
+ */
+ CommandCounterIncrement();
+ /* Rename partition. */
+ sprintf(tmpRelName, "split-%u-%X-tmp", RelationGetRelid(rel), MyProcPid);
+ RenameRelationInternal(splitRelOid, tmpRelName, false, false);
+
+ /*
+ * We must bump the command counter to make the split partition tuple
+ * visible after rename.
+ */
+ CommandCounterIncrement();
+ }
+
+ /* Create new partitions (like split partition), without indexes. */
+ parentName = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
+ RelationGetRelationName(rel), -1);
+ foreach(listptr, cmd->partlist)
+ {
+ SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr);
+ Relation newPartRel;
+
+ createPartitionTable(sps->name, parentName, context);
+
+ /* Open the new partition and acquire exclusive lock on it. */
+ newPartRel = table_openrv(sps->name, AccessExclusiveLock);
+
+ newPartRels = lappend(newPartRels, newPartRel);
+ }
+
+ /* Copy data from split partition to new partitions. */
+ moveSplitTableRows(rel, splitRel, cmd->partlist, newPartRels, defaultPartOid);
+ /* Keep the lock until commit. */
+ table_close(splitRel, NoLock);
+
+ /* Attach new partitions to partitioned table. */
+ forboth(listptr, cmd->partlist, listptr2, newPartRels)
+ {
+ SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr);
+ Relation newPartRel = (Relation) lfirst(listptr2);
+
+ /* wqueue = NULL: verification for each cloned constraint is not need. */
+ attachPartitionTable(NULL, rel, newPartRel, sps->bound);
+ /* Keep the lock until commit. */
+ table_close(newPartRel, NoLock);
+ }
+
+ /* Drop split partition. */
+ object.classId = RelationRelationId;
+ object.objectId = splitRelOid;
+ object.objectSubId = 0;
+ /* Probably DROP_CASCADE is not needed. */
+ performDeletion(&object, DROP_RESTRICT, 0);
+}
+
+/*
* moveMergedTablesRows: scan partitions to be merged (mergingPartitionsList)
* of the partitioned table (rel) and move rows into the new partition
* (newPartRel).