diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 411 |
1 files changed, 411 insertions, 0 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 05e86de8ebc..582890a3025 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -670,6 +670,9 @@ static void ATDetachCheckNoForeignKeyRefs(Relation partition); static char GetAttributeCompression(Oid atttypid, const char *compression); static char GetAttributeStorage(Oid atttypid, const char *storagemode); +static void ATExecSplitPartition(List **wqueue, AlteredTableInfo *tab, + Relation rel, PartitionCmd *cmd, + AlterTableUtilityContext *context); static void ATExecMergePartitions(List **wqueue, AlteredTableInfo *tab, Relation rel, PartitionCmd *cmd, AlterTableUtilityContext *context); @@ -4740,6 +4743,10 @@ AlterTableGetLockLevel(List *cmds) cmd_lockmode = ShareUpdateExclusiveLock; break; + case AT_SplitPartition: + cmd_lockmode = AccessExclusiveLock; + break; + case AT_MergePartitions: cmd_lockmode = AccessExclusiveLock; break; @@ -5163,6 +5170,11 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, /* No command-specific prep needed */ pass = AT_PASS_MISC; break; + case AT_SplitPartition: + ATSimplePermissions(cmd->subtype, rel, ATT_TABLE); + /* No command-specific prep needed */ + pass = AT_PASS_MISC; + break; case AT_MergePartitions: ATSimplePermissions(cmd->subtype, rel, ATT_TABLE); /* No command-specific prep needed */ @@ -5565,6 +5577,14 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, case AT_DetachPartitionFinalize: address = ATExecDetachPartitionFinalize(rel, ((PartitionCmd *) cmd->def)->name); break; + case AT_SplitPartition: + cmd = ATParseTransformCmd(wqueue, tab, rel, cmd, false, lockmode, + cur_pass, context); + Assert(cmd != NULL); + Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); + ATExecSplitPartition(wqueue, tab, rel, (PartitionCmd *) cmd->def, + context); + break; case AT_MergePartitions: cmd = ATParseTransformCmd(wqueue, tab, rel, cmd, false, lockmode, cur_pass, context); @@ -6567,6 +6587,8 @@ alter_table_type_to_string(AlterTableType cmdtype) return "DETACH PARTITION"; case AT_DetachPartitionFinalize: return "DETACH PARTITION ... FINALIZE"; + case AT_SplitPartition: + return "SPLIT PARTITION"; case AT_MergePartitions: return "MERGE PARTITIONS"; case AT_AddIdentity: @@ -20853,6 +20875,260 @@ GetAttributeStorage(Oid atttypid, const char *storagemode) } /* + * Struct with context of new partition for insert rows from splited partition + */ +typedef struct SplitPartitionContext +{ + ExprState *partqualstate; /* expression for check slot for partition + * (NULL for DEFAULT partition) */ + BulkInsertState bistate; /* state of bulk inserts for partition */ + TupleTableSlot *dstslot; /* slot for insert row into partition */ + Relation partRel; /* relation for partition */ +} SplitPartitionContext; + + +/* + * createSplitPartitionContext: create context for partition and fill it + */ +static SplitPartitionContext * +createSplitPartitionContext(Relation partRel) +{ + SplitPartitionContext *pc; + + pc = (SplitPartitionContext *) palloc0(sizeof(SplitPartitionContext)); + pc->partRel = partRel; + + /* + * Prepare a BulkInsertState for table_tuple_insert. The FSM is empty, so + * don't bother using it. + */ + pc->bistate = GetBulkInsertState(); + + /* Create tuple slot for new partition. */ + pc->dstslot = MakeSingleTupleTableSlot(RelationGetDescr(pc->partRel), + table_slot_callbacks(pc->partRel)); + ExecStoreAllNullTuple(pc->dstslot); + + return pc; +} + +/* + * deleteSplitPartitionContext: delete context for partition + */ +static void +deleteSplitPartitionContext(SplitPartitionContext *pc, int ti_options) +{ + ExecDropSingleTupleTableSlot(pc->dstslot); + FreeBulkInsertState(pc->bistate); + + table_finish_bulk_insert(pc->partRel, ti_options); + + pfree(pc); +} + +/* + * moveSplitTableRows: scan split partition (splitRel) of partitioned table + * (rel) and move rows into new partitions. + * + * New partitions description: + * partlist: list of pointers to SinglePartitionSpec structures. + * newPartRels: list of Relation's. + * defaultPartOid: oid of DEFAULT partition, for table rel. + */ +static void +moveSplitTableRows(Relation rel, Relation splitRel, List *partlist, List *newPartRels, Oid defaultPartOid) +{ + /* The FSM is empty, so don't bother using it. */ + int ti_options = TABLE_INSERT_SKIP_FSM; + CommandId mycid; + EState *estate; + ListCell *listptr, + *listptr2; + TupleTableSlot *srcslot; + ExprContext *econtext; + TableScanDesc scan; + Snapshot snapshot; + MemoryContext oldCxt; + List *partContexts = NIL; + TupleConversionMap *tuple_map; + SplitPartitionContext *defaultPartCtx = NULL, + *pc; + bool isOldDefaultPart = false; + + mycid = GetCurrentCommandId(true); + + estate = CreateExecutorState(); + + forboth(listptr, partlist, listptr2, newPartRels) + { + SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr); + + pc = createSplitPartitionContext((Relation) lfirst(listptr2)); + + if (sps->bound->is_default) + { + /* We should not create constraint for detached DEFAULT partition. */ + defaultPartCtx = pc; + } + else + { + List *partConstraint; + + /* Build expression execution states for partition check quals. */ + partConstraint = get_qual_from_partbound(rel, sps->bound); + partConstraint = + (List *) eval_const_expressions(NULL, + (Node *) partConstraint); + /* Make boolean expression for ExecCheck(). */ + partConstraint = list_make1(make_ands_explicit(partConstraint)); + + /* + * Map the vars in the constraint expression from rel's attnos to + * splitRel's. + */ + partConstraint = map_partition_varattnos(partConstraint, + 1, splitRel, rel); + + pc->partqualstate = + ExecPrepareExpr((Expr *) linitial(partConstraint), estate); + Assert(pc->partqualstate != NULL); + } + + /* Store partition context into list. */ + partContexts = lappend(partContexts, pc); + } + + /* + * Create partition context for DEFAULT partition. We can insert values + * into this partition in case spaces with values between new partitions. + */ + if (!defaultPartCtx && OidIsValid(defaultPartOid)) + { + /* Indicate that we allocate context for old DEFAULT partition */ + isOldDefaultPart = true; + defaultPartCtx = createSplitPartitionContext(table_open(defaultPartOid, AccessExclusiveLock)); + } + + econtext = GetPerTupleExprContext(estate); + + /* Create necessary tuple slot. */ + srcslot = MakeSingleTupleTableSlot(RelationGetDescr(splitRel), + table_slot_callbacks(splitRel)); + + /* + * Map computing for moving attributes of split partition to new partition + * (for first new partition but other new partitions can use the same + * map). + */ + pc = (SplitPartitionContext *) lfirst(list_head(partContexts)); + tuple_map = convert_tuples_by_name(RelationGetDescr(splitRel), + RelationGetDescr(pc->partRel)); + + /* Scan through the rows. */ + snapshot = RegisterSnapshot(GetLatestSnapshot()); + scan = table_beginscan(splitRel, snapshot, 0, NULL); + + /* + * Switch to per-tuple memory context and reset it for each tuple + * produced, so we don't leak memory. + */ + oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + + while (table_scan_getnextslot(scan, ForwardScanDirection, srcslot)) + { + bool found = false; + bool insert_indexes; + TupleTableSlot *insertslot; + + /* Extract data from old tuple. */ + slot_getallattrs(srcslot); + + econtext->ecxt_scantuple = srcslot; + + /* Search partition for current slot srcslot. */ + foreach(listptr, partContexts) + { + pc = (SplitPartitionContext *) lfirst(listptr); + + if (pc->partqualstate /* skip DEFAULT partition */ && + ExecCheck(pc->partqualstate, econtext)) + { + found = true; + break; + } + ResetExprContext(econtext); + } + if (!found) + { + /* Use DEFAULT partition if it exists. */ + if (defaultPartCtx) + pc = defaultPartCtx; + else + ereport(ERROR, + (errcode(ERRCODE_CHECK_VIOLATION), + errmsg("can not find partition for split partition row"), + errtable(splitRel))); + } + + if (tuple_map) + { + /* Need to use map for copy attributes. */ + insertslot = execute_attr_map_slot(tuple_map->attrMap, srcslot, pc->dstslot); + } + else + { + /* Copy attributes directly. */ + insertslot = pc->dstslot; + + ExecClearTuple(insertslot); + + memcpy(insertslot->tts_values, srcslot->tts_values, + sizeof(Datum) * srcslot->tts_nvalid); + memcpy(insertslot->tts_isnull, srcslot->tts_isnull, + sizeof(bool) * srcslot->tts_nvalid); + + ExecStoreVirtualTuple(insertslot); + } + + /* + * Write the tuple out to the new relation. We ignore the + * 'insert_indexes' flag since newPartRel has no indexes anyway. + */ + (void) table_tuple_insert(pc->partRel, insertslot, mycid, + ti_options, pc->bistate, &insert_indexes); + + ResetExprContext(econtext); + + CHECK_FOR_INTERRUPTS(); + } + + MemoryContextSwitchTo(oldCxt); + + table_endscan(scan); + UnregisterSnapshot(snapshot); + + if (tuple_map) + free_conversion_map(tuple_map); + + ExecDropSingleTupleTableSlot(srcslot); + + FreeExecutorState(estate); + + foreach(listptr, partContexts) + deleteSplitPartitionContext((SplitPartitionContext *) lfirst(listptr), ti_options); + + /* Need to close table and free buffers for DEFAULT partition. */ + if (isOldDefaultPart) + { + Relation defaultPartRel = defaultPartCtx->partRel; + + deleteSplitPartitionContext(defaultPartCtx, ti_options); + /* Keep the lock until commit. */ + table_close(defaultPartRel, NoLock); + } +} + +/* * createPartitionTable: create table for new partition with given name * (newPartName) like table (modelRelName) * @@ -20907,6 +21183,141 @@ createPartitionTable(RangeVar *newPartName, RangeVar *modelRelName, } /* + * ALTER TABLE <name> SPLIT PARTITION <partition-name> INTO <partition-list> + */ +static void +ATExecSplitPartition(List **wqueue, AlteredTableInfo *tab, Relation rel, + PartitionCmd *cmd, AlterTableUtilityContext *context) +{ + Relation splitRel; + Oid splitRelOid; + char relname[NAMEDATALEN]; + Oid namespaceId; + ListCell *listptr, + *listptr2; + bool isSameName = false; + char tmpRelName[NAMEDATALEN]; + List *newPartRels = NIL; + ObjectAddress object; + RangeVar *parentName; + Oid defaultPartOid; + + defaultPartOid = get_default_oid_from_partdesc(RelationGetPartitionDesc(rel, true)); + + /* + * We are going to detach and remove this partition: need to use exclusive + * lock for prevent DML-queries to the partition. + */ + splitRel = table_openrv(cmd->name, AccessExclusiveLock); + + if (splitRel->rd_rel->relkind != RELKIND_RELATION) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot split non-table partition \"%s\"", + RelationGetRelationName(splitRel)))); + + splitRelOid = RelationGetRelid(splitRel); + + /* Check descriptions of new partitions. */ + foreach(listptr, cmd->partlist) + { + Oid existing_relid; + SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr); + + strlcpy(relname, sps->name->relname, NAMEDATALEN); + + /* + * Look up the namespace in which we are supposed to create the + * partition, check we have permission to create there, lock it + * against concurrent drop, and mark stmt->relation as + * RELPERSISTENCE_TEMP if a temporary namespace is selected. + */ + namespaceId = + RangeVarGetAndCheckCreationNamespace(sps->name, NoLock, NULL); + + /* + * This would fail later on anyway, if the relation already exists. + * But by catching it here we can emit a nicer error message. + */ + existing_relid = get_relname_relid(relname, namespaceId); + if (existing_relid == splitRelOid && !isSameName) + /* One new partition can have the same name as split partition. */ + isSameName = true; + else if (existing_relid != InvalidOid) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("relation \"%s\" already exists", relname))); + } + + /* Detach split partition. */ + RemoveInheritance(splitRel, rel, false); + /* Do the final part of detaching. */ + DetachPartitionFinalize(rel, splitRel, false, defaultPartOid); + + /* + * If new partition has the same name as split partition then we should + * rename split partition for reuse name. + */ + if (isSameName) + { + /* + * We must bump the command counter to make the split partition tuple + * visible for rename. + */ + CommandCounterIncrement(); + /* Rename partition. */ + sprintf(tmpRelName, "split-%u-%X-tmp", RelationGetRelid(rel), MyProcPid); + RenameRelationInternal(splitRelOid, tmpRelName, false, false); + + /* + * We must bump the command counter to make the split partition tuple + * visible after rename. + */ + CommandCounterIncrement(); + } + + /* Create new partitions (like split partition), without indexes. */ + parentName = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel), -1); + foreach(listptr, cmd->partlist) + { + SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr); + Relation newPartRel; + + createPartitionTable(sps->name, parentName, context); + + /* Open the new partition and acquire exclusive lock on it. */ + newPartRel = table_openrv(sps->name, AccessExclusiveLock); + + newPartRels = lappend(newPartRels, newPartRel); + } + + /* Copy data from split partition to new partitions. */ + moveSplitTableRows(rel, splitRel, cmd->partlist, newPartRels, defaultPartOid); + /* Keep the lock until commit. */ + table_close(splitRel, NoLock); + + /* Attach new partitions to partitioned table. */ + forboth(listptr, cmd->partlist, listptr2, newPartRels) + { + SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr); + Relation newPartRel = (Relation) lfirst(listptr2); + + /* wqueue = NULL: verification for each cloned constraint is not need. */ + attachPartitionTable(NULL, rel, newPartRel, sps->bound); + /* Keep the lock until commit. */ + table_close(newPartRel, NoLock); + } + + /* Drop split partition. */ + object.classId = RelationRelationId; + object.objectId = splitRelOid; + object.objectSubId = 0; + /* Probably DROP_CASCADE is not needed. */ + performDeletion(&object, DROP_RESTRICT, 0); +} + +/* * moveMergedTablesRows: scan partitions to be merged (mergingPartitionsList) * of the partitioned table (rel) and move rows into the new partition * (newPartRel). |