diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2006-08-12 02:52:06 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2006-08-12 02:52:06 +0000 |
commit | 7a3e30e608a25800a1f7fdfaaca4da3f0ac0fb07 (patch) | |
tree | 215adabe95d76123f6120fc22e4b51b5a1baf4cd /src/backend/executor/execMain.c | |
parent | 5c9e9c0c42904648af5a03fe90db8050e31d603f (diff) | |
download | postgresql-7a3e30e608a25800a1f7fdfaaca4da3f0ac0fb07.tar.gz postgresql-7a3e30e608a25800a1f7fdfaaca4da3f0ac0fb07.zip |
Add INSERT/UPDATE/DELETE RETURNING, with basic docs and regression tests.
plpgsql support to come later. Along the way, convert execMain's
SELECT INTO support into a DestReceiver, in order to eliminate some ugly
special cases.
Jonah Harris and Tom Lane
Diffstat (limited to 'src/backend/executor/execMain.c')
-rw-r--r-- | src/backend/executor/execMain.c | 655 |
1 files changed, 455 insertions, 200 deletions
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index efc5d54fe9e..05c4a542b84 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -26,7 +26,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.277 2006/07/31 01:16:37 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.278 2006/08/12 02:52:04 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -43,6 +43,7 @@ #include "commands/trigger.h" #include "executor/execdebug.h" #include "executor/instrument.h" +#include "executor/nodeSubplan.h" #include "miscadmin.h" #include "optimizer/clauses.h" #include "parser/parse_clause.h" @@ -75,14 +76,20 @@ static TupleTableSlot *ExecutePlan(EState *estate, PlanState *planstate, ScanDirection direction, DestReceiver *dest); static void ExecSelect(TupleTableSlot *slot, - DestReceiver *dest, - EState *estate); + DestReceiver *dest, EState *estate); static void ExecInsert(TupleTableSlot *slot, ItemPointer tupleid, - EState *estate); -static void ExecDelete(TupleTableSlot *slot, ItemPointer tupleid, - EState *estate); + TupleTableSlot *planSlot, + DestReceiver *dest, EState *estate); +static void ExecDelete(ItemPointer tupleid, + TupleTableSlot *planSlot, + DestReceiver *dest, EState *estate); static void ExecUpdate(TupleTableSlot *slot, ItemPointer tupleid, - EState *estate); + TupleTableSlot *planSlot, + DestReceiver *dest, EState *estate); +static void ExecProcessReturning(ProjectionInfo *projectReturning, + TupleTableSlot *tupleSlot, + TupleTableSlot *planSlot, + DestReceiver *dest); static TupleTableSlot *EvalPlanQualNext(EState *estate); static void EndEvalPlanQual(EState *estate); static void ExecCheckRTEPerms(RangeTblEntry *rte); @@ -90,6 +97,12 @@ static void ExecCheckXactReadOnly(Query *parsetree); static void EvalPlanQualStart(evalPlanQual *epq, EState *estate, evalPlanQual *priorepq); static void EvalPlanQualStop(evalPlanQual *epq); +static void OpenIntoRel(QueryDesc *queryDesc); +static void CloseIntoRel(QueryDesc *queryDesc); +static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); +static void intorel_receive(TupleTableSlot *slot, DestReceiver *self); +static void intorel_shutdown(DestReceiver *self); +static void intorel_destroy(DestReceiver *self); /* end of local decls */ @@ -185,6 +198,7 @@ ExecutorRun(QueryDesc *queryDesc, EState *estate; CmdType operation; DestReceiver *dest; + bool sendTuples; TupleTableSlot *result; MemoryContext oldcontext; @@ -207,12 +221,16 @@ ExecutorRun(QueryDesc *queryDesc, dest = queryDesc->dest; /* - * startup tuple receiver + * startup tuple receiver, if we will be emitting tuples */ estate->es_processed = 0; estate->es_lastoid = InvalidOid; - (*dest->rStartup) (dest, operation, queryDesc->tupDesc); + sendTuples = (operation == CMD_SELECT || + queryDesc->parsetree->returningList); + + if (sendTuples) + (*dest->rStartup) (dest, operation, queryDesc->tupDesc); /* * run plan @@ -228,9 +246,10 @@ ExecutorRun(QueryDesc *queryDesc, dest); /* - * shutdown receiver + * shutdown tuple receiver, if we started it */ - (*dest->rShutdown) (dest); + if (sendTuples) + (*dest->rShutdown) (dest); MemoryContextSwitchTo(oldcontext); @@ -265,6 +284,12 @@ ExecutorEnd(QueryDesc *queryDesc) ExecEndPlan(queryDesc->planstate, estate); /* + * Close the SELECT INTO relation if any + */ + if (estate->es_select_into) + CloseIntoRel(queryDesc); + + /* * Must switch out of context before destroying it */ MemoryContextSwitchTo(oldcontext); @@ -449,8 +474,6 @@ InitPlan(QueryDesc *queryDesc, int eflags) EState *estate = queryDesc->estate; PlanState *planstate; List *rangeTable; - Relation intoRelationDesc; - bool do_select_into; TupleDesc tupType; ListCell *l; @@ -534,13 +557,11 @@ InitPlan(QueryDesc *queryDesc, int eflags) /* * Detect whether we're doing SELECT INTO. If so, set the es_into_oids * flag appropriately so that the plan tree will be initialized with the - * correct tuple descriptors. + * correct tuple descriptors. (Other SELECT INTO stuff comes later.) */ - do_select_into = false; - + estate->es_select_into = false; if (operation == CMD_SELECT && parseTree->into != NULL) { - do_select_into = true; estate->es_select_into = true; estate->es_into_oids = interpretOidsOption(parseTree->intoOptions); } @@ -581,7 +602,9 @@ InitPlan(QueryDesc *queryDesc, int eflags) else nSlots += 1; if (operation != CMD_SELECT) - nSlots++; + nSlots++; /* for es_trig_tuple_slot */ + if (parseTree->returningLists) + nSlots++; /* for RETURNING projection */ estate->es_tupleTable = ExecCreateTupleTable(nSlots); @@ -638,7 +661,7 @@ InitPlan(QueryDesc *queryDesc, int eflags) } } if (!junk_filter_needed && - (operation == CMD_INSERT || do_select_into) && + (operation == CMD_INSERT || estate->es_select_into) && ExecMayReturnRawTuples(planstate)) junk_filter_needed = true; break; @@ -713,140 +736,70 @@ InitPlan(QueryDesc *queryDesc, int eflags) } /* - * If doing SELECT INTO, initialize the "into" relation. We must wait - * till now so we have the "clean" result tuple type to create the new - * table from. - * - * If EXPLAIN, skip creating the "into" relation. + * Initialize RETURNING projections if needed. */ - intoRelationDesc = NULL; - - if (do_select_into && !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) + if (parseTree->returningLists) { - char *intoName; - Oid namespaceId; - Oid tablespaceId; - Datum reloptions; - AclResult aclresult; - Oid intoRelationId; - TupleDesc tupdesc; - - /* - * Check consistency of arguments - */ - if (parseTree->intoOnCommit != ONCOMMIT_NOOP && !parseTree->into->istemp) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("ON COMMIT can only be used on temporary tables"))); + TupleTableSlot *slot; + ExprContext *econtext; + ResultRelInfo *resultRelInfo; /* - * find namespace to create in, check permissions + * We set QueryDesc.tupDesc to be the RETURNING rowtype in this case. + * We assume all the sublists will generate the same output tupdesc. */ - intoName = parseTree->into->relname; - namespaceId = RangeVarGetCreationNamespace(parseTree->into); + tupType = ExecTypeFromTL((List *) linitial(parseTree->returningLists), + false); - aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), - ACL_CREATE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_NAMESPACE, - get_namespace_name(namespaceId)); + /* Set up a slot for the output of the RETURNING projection(s) */ + slot = ExecAllocTableSlot(estate->es_tupleTable); + ExecSetSlotDescriptor(slot, tupType); + /* Need an econtext too */ + econtext = CreateExprContext(estate); /* - * Select tablespace to use. If not specified, use default_tablespace - * (which may in turn default to database's default). + * Build a projection for each result rel. Note that any SubPlans + * in the RETURNING lists get attached to the topmost plan node. */ - if (parseTree->intoTableSpaceName) + Assert(list_length(parseTree->returningLists) == estate->es_num_result_relations); + resultRelInfo = estate->es_result_relations; + foreach(l, parseTree->returningLists) { - tablespaceId = get_tablespace_oid(parseTree->intoTableSpaceName); - if (!OidIsValid(tablespaceId)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("tablespace \"%s\" does not exist", - parseTree->intoTableSpaceName))); - } else - { - tablespaceId = GetDefaultTablespace(); - /* note InvalidOid is OK in this case */ - } - - /* Parse and validate any reloptions */ - reloptions = transformRelOptions((Datum) 0, - parseTree->intoOptions, - true, - false); - (void) heap_reloptions(RELKIND_RELATION, reloptions, true); + List *rlist = (List *) lfirst(l); + List *rliststate; - /* Check permissions except when using the database's default */ - if (OidIsValid(tablespaceId)) - { - AclResult aclresult; - - aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), - ACL_CREATE); - - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_TABLESPACE, - get_tablespace_name(tablespaceId)); + rliststate = (List *) ExecInitExpr((Expr *) rlist, planstate); + resultRelInfo->ri_projectReturning = + ExecBuildProjectionInfo(rliststate, econtext, slot); + resultRelInfo++; } - - /* - * have to copy tupType to get rid of constraints - */ - tupdesc = CreateTupleDescCopy(tupType); - - intoRelationId = heap_create_with_catalog(intoName, - namespaceId, - tablespaceId, - InvalidOid, - GetUserId(), - tupdesc, - RELKIND_RELATION, - false, - true, - 0, - parseTree->intoOnCommit, - reloptions, - allowSystemTableMods); - - FreeTupleDesc(tupdesc); - /* - * Advance command counter so that the newly-created relation's - * catalog tuples will be visible to heap_open. + * Because we already ran ExecInitNode() for the top plan node, + * any subplans we just attached to it won't have been initialized; + * so we have to do it here. (Ugly, but the alternatives seem worse.) */ - CommandCounterIncrement(); - - /* - * If necessary, create a TOAST table for the into relation. Note that - * AlterTableCreateToastTable ends with CommandCounterIncrement(), so - * that the TOAST table will be visible for insertion. - */ - AlterTableCreateToastTable(intoRelationId); - - /* - * And open the constructed table for writing. - */ - intoRelationDesc = heap_open(intoRelationId, AccessExclusiveLock); - - /* use_wal off requires rd_targblock be initially invalid */ - Assert(intoRelationDesc->rd_targblock == InvalidBlockNumber); + foreach(l, planstate->subPlan) + { + SubPlanState *sstate = (SubPlanState *) lfirst(l); - /* - * We can skip WAL-logging the insertions, unless PITR is in use. - * - * Note that for a non-temp INTO table, this is safe only because we - * know that the catalog changes above will have been WAL-logged, and - * so RecordTransactionCommit will think it needs to WAL-log the - * eventual transaction commit. Else the commit might be lost, even - * though all the data is safely fsync'd ... - */ - estate->es_into_relation_use_wal = XLogArchivingActive(); + Assert(IsA(sstate, SubPlanState)); + if (sstate->planstate == NULL) /* already inited? */ + ExecInitSubPlan(sstate, estate, eflags); + } } - estate->es_into_relation_descriptor = intoRelationDesc; - queryDesc->tupDesc = tupType; queryDesc->planstate = planstate; + + /* + * If doing SELECT INTO, initialize the "into" relation. We must wait + * till now so we have the "clean" result tuple type to create the new + * table from. + * + * If EXPLAIN, skip creating the "into" relation. + */ + if (estate->es_select_into && !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) + OpenIntoRel(queryDesc); } /* @@ -914,6 +867,7 @@ initResultRelInfo(ResultRelInfo *resultRelInfo, } resultRelInfo->ri_ConstraintExprs = NULL; resultRelInfo->ri_junkFilter = NULL; + resultRelInfo->ri_projectReturning = NULL; /* * If there are indices on the result relation, open them and save @@ -1032,28 +986,6 @@ ExecEndPlan(PlanState *planstate, EState *estate) } /* - * close the "into" relation if necessary, again keeping lock - */ - if (estate->es_into_relation_descriptor != NULL) - { - /* - * If we skipped using WAL, and it's not a temp relation, we must - * force the relation down to disk before it's safe to commit the - * transaction. This requires forcing out any dirty buffers and then - * doing a forced fsync. - */ - if (!estate->es_into_relation_use_wal && - !estate->es_into_relation_descriptor->rd_istemp) - { - FlushRelationBuffers(estate->es_into_relation_descriptor); - /* FlushRelationBuffers will have opened rd_smgr */ - smgrimmedsync(estate->es_into_relation_descriptor->rd_smgr); - } - - heap_close(estate->es_into_relation_descriptor, NoLock); - } - - /* * close any relations selected FOR UPDATE/FOR SHARE, again keeping locks */ foreach(l, estate->es_rowMarks) @@ -1088,6 +1020,7 @@ ExecutePlan(EState *estate, DestReceiver *dest) { JunkFilter *junkfilter; + TupleTableSlot *planSlot; TupleTableSlot *slot; ItemPointer tupleid = NULL; ItemPointerData tuple_ctid; @@ -1097,7 +1030,6 @@ ExecutePlan(EState *estate, /* * initialize local variables */ - slot = NULL; current_tuple_count = 0; result = NULL; @@ -1140,22 +1072,23 @@ ExecutePlan(EState *estate, lnext: ; if (estate->es_useEvalPlan) { - slot = EvalPlanQualNext(estate); - if (TupIsNull(slot)) - slot = ExecProcNode(planstate); + planSlot = EvalPlanQualNext(estate); + if (TupIsNull(planSlot)) + planSlot = ExecProcNode(planstate); } else - slot = ExecProcNode(planstate); + planSlot = ExecProcNode(planstate); /* * if the tuple is null, then we assume there is nothing more to * process so we just return null... */ - if (TupIsNull(slot)) + if (TupIsNull(planSlot)) { result = NULL; break; } + slot = planSlot; /* * if we have a junk filter, then project a new tuple with the junk @@ -1261,7 +1194,7 @@ lnext: ; estate->es_snapshot->curcid); if (!TupIsNull(newSlot)) { - slot = newSlot; + slot = planSlot = newSlot; estate->es_useEvalPlan = true; goto lmark; } @@ -1282,10 +1215,12 @@ lnext: ; } /* - * Finally create a new "clean" tuple with all junk attributes - * removed + * Create a new "clean" tuple with all junk attributes removed. + * We don't need to do this for DELETE, however (there will + * in fact be no non-junk attributes in a DELETE!) */ - slot = ExecFilterJunk(junkfilter, slot); + if (operation != CMD_DELETE) + slot = ExecFilterJunk(junkfilter, slot); } /* @@ -1296,24 +1231,22 @@ lnext: ; switch (operation) { case CMD_SELECT: - ExecSelect(slot, /* slot containing tuple */ - dest, /* destination's tuple-receiver obj */ - estate); + ExecSelect(slot, dest, estate); result = slot; break; case CMD_INSERT: - ExecInsert(slot, tupleid, estate); + ExecInsert(slot, tupleid, planSlot, dest, estate); result = NULL; break; case CMD_DELETE: - ExecDelete(slot, tupleid, estate); + ExecDelete(tupleid, planSlot, dest, estate); result = NULL; break; case CMD_UPDATE: - ExecUpdate(slot, tupleid, estate); + ExecUpdate(slot, tupleid, planSlot, dest, estate); result = NULL; break; @@ -1364,10 +1297,7 @@ lnext: ; * ExecSelect * * SELECTs are easy.. we just pass the tuple to the appropriate - * print function. The only complexity is when we do a - * "SELECT INTO", in which case we insert the tuple into - * the appropriate relation (note: this is a newly created relation - * so we don't need to worry about indices or locks.) + * output function. * ---------------------------------------------------------------- */ static void @@ -1375,28 +1305,6 @@ ExecSelect(TupleTableSlot *slot, DestReceiver *dest, EState *estate) { - /* - * insert the tuple into the "into relation" - * - * XXX this probably ought to be replaced by a separate destination - */ - if (estate->es_into_relation_descriptor != NULL) - { - HeapTuple tuple; - - tuple = ExecCopySlotTuple(slot); - heap_insert(estate->es_into_relation_descriptor, tuple, - estate->es_snapshot->curcid, - estate->es_into_relation_use_wal, - false); /* never any point in using FSM */ - /* we know there are no indexes to update */ - heap_freetuple(tuple); - IncrAppended(); - } - - /* - * send the tuple to the destination - */ (*dest->receiveSlot) (slot, dest); IncrRetrieved(); (estate->es_processed)++; @@ -1413,6 +1321,8 @@ ExecSelect(TupleTableSlot *slot, static void ExecInsert(TupleTableSlot *slot, ItemPointer tupleid, + TupleTableSlot *planSlot, + DestReceiver *dest, EState *estate) { HeapTuple tuple; @@ -1490,6 +1400,11 @@ ExecInsert(TupleTableSlot *slot, /* AFTER ROW INSERT Triggers */ ExecARInsertTriggers(estate, resultRelInfo, tuple); + + /* Process RETURNING if present */ + if (resultRelInfo->ri_projectReturning) + ExecProcessReturning(resultRelInfo->ri_projectReturning, + slot, planSlot, dest); } /* ---------------------------------------------------------------- @@ -1500,8 +1415,9 @@ ExecInsert(TupleTableSlot *slot, * ---------------------------------------------------------------- */ static void -ExecDelete(TupleTableSlot *slot, - ItemPointer tupleid, +ExecDelete(ItemPointer tupleid, + TupleTableSlot *planSlot, + DestReceiver *dest, EState *estate) { ResultRelInfo *resultRelInfo; @@ -1594,6 +1510,33 @@ ldelete:; /* AFTER ROW DELETE Triggers */ ExecARDeleteTriggers(estate, resultRelInfo, tupleid); + + /* Process RETURNING if present */ + if (resultRelInfo->ri_projectReturning) + { + /* + * We have to put the target tuple into a slot, which means + * first we gotta fetch it. We can use the trigger tuple slot. + */ + TupleTableSlot *slot = estate->es_trig_tuple_slot; + HeapTupleData deltuple; + Buffer delbuffer; + + deltuple.t_self = *tupleid; + if (!heap_fetch(resultRelationDesc, SnapshotAny, + &deltuple, &delbuffer, false, NULL)) + elog(ERROR, "failed to fetch deleted tuple for DELETE RETURNING"); + + if (slot->tts_tupleDescriptor != RelationGetDescr(resultRelationDesc)) + ExecSetSlotDescriptor(slot, RelationGetDescr(resultRelationDesc)); + ExecStoreTuple(&deltuple, slot, InvalidBuffer, false); + + ExecProcessReturning(resultRelInfo->ri_projectReturning, + slot, planSlot, dest); + + ExecClearTuple(slot); + ReleaseBuffer(delbuffer); + } } /* ---------------------------------------------------------------- @@ -1610,6 +1553,8 @@ ldelete:; static void ExecUpdate(TupleTableSlot *slot, ItemPointer tupleid, + TupleTableSlot *planSlot, + DestReceiver *dest, EState *estate) { HeapTuple tuple; @@ -1755,8 +1700,16 @@ lreplace:; /* AFTER ROW UPDATE Triggers */ ExecARUpdateTriggers(estate, resultRelInfo, tupleid, tuple); + + /* Process RETURNING if present */ + if (resultRelInfo->ri_projectReturning) + ExecProcessReturning(resultRelInfo->ri_projectReturning, + slot, planSlot, dest); } +/* + * ExecRelCheck --- check that tuple meets constraints for result relation + */ static const char * ExecRelCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate) @@ -1854,6 +1807,42 @@ ExecConstraints(ResultRelInfo *resultRelInfo, } /* + * ExecProcessReturning --- evaluate a RETURNING list and send to dest + * + * projectReturning: RETURNING projection info for current result rel + * tupleSlot: slot holding tuple actually inserted/updated/deleted + * planSlot: slot holding tuple returned by top plan node + * dest: where to send the output + */ +static void +ExecProcessReturning(ProjectionInfo *projectReturning, + TupleTableSlot *tupleSlot, + TupleTableSlot *planSlot, + DestReceiver *dest) +{ + ExprContext *econtext = projectReturning->pi_exprContext; + TupleTableSlot *retSlot; + + /* + * Reset per-tuple memory context to free any expression evaluation + * storage allocated in the previous cycle. + */ + ResetExprContext(econtext); + + /* Make tuple and any needed join variables available to ExecProject */ + econtext->ecxt_scantuple = tupleSlot; + econtext->ecxt_outertuple = planSlot; + + /* Compute the RETURNING expressions */ + retSlot = ExecProject(projectReturning, NULL); + + /* Send to dest */ + (*dest->receiveSlot) (retSlot, dest); + + ExecClearTuple(retSlot); +} + +/* * Check a modified tuple to see if we want to process its updated version * under READ COMMITTED rules. * @@ -2318,3 +2307,269 @@ EvalPlanQualStop(evalPlanQual *epq) epq->estate = NULL; epq->planstate = NULL; } + + +/* + * Support for SELECT INTO (a/k/a CREATE TABLE AS) + * + * We implement SELECT INTO by diverting SELECT's normal output with + * a specialized DestReceiver type. + * + * TODO: remove some of the INTO-specific cruft from EState, and keep + * it in the DestReceiver instead. + */ + +typedef struct +{ + DestReceiver pub; /* publicly-known function pointers */ + EState *estate; /* EState we are working with */ +} DR_intorel; + +/* + * OpenIntoRel --- actually create the SELECT INTO target relation + * + * This also replaces QueryDesc->dest with the special DestReceiver for + * SELECT INTO. We assume that the correct result tuple type has already + * been placed in queryDesc->tupDesc. + */ +static void +OpenIntoRel(QueryDesc *queryDesc) +{ + Query *parseTree = queryDesc->parsetree; + EState *estate = queryDesc->estate; + Relation intoRelationDesc; + char *intoName; + Oid namespaceId; + Oid tablespaceId; + Datum reloptions; + AclResult aclresult; + Oid intoRelationId; + TupleDesc tupdesc; + DR_intorel *myState; + + /* + * Check consistency of arguments + */ + if (parseTree->intoOnCommit != ONCOMMIT_NOOP && !parseTree->into->istemp) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("ON COMMIT can only be used on temporary tables"))); + + /* + * Find namespace to create in, check its permissions + */ + intoName = parseTree->into->relname; + namespaceId = RangeVarGetCreationNamespace(parseTree->into); + + aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), + ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_NAMESPACE, + get_namespace_name(namespaceId)); + + /* + * Select tablespace to use. If not specified, use default_tablespace + * (which may in turn default to database's default). + */ + if (parseTree->intoTableSpaceName) + { + tablespaceId = get_tablespace_oid(parseTree->intoTableSpaceName); + if (!OidIsValid(tablespaceId)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tablespace \"%s\" does not exist", + parseTree->intoTableSpaceName))); + } else + { + tablespaceId = GetDefaultTablespace(); + /* note InvalidOid is OK in this case */ + } + + /* Check permissions except when using the database's default space */ + if (OidIsValid(tablespaceId)) + { + AclResult aclresult; + + aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), + ACL_CREATE); + + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_TABLESPACE, + get_tablespace_name(tablespaceId)); + } + + /* Parse and validate any reloptions */ + reloptions = transformRelOptions((Datum) 0, + parseTree->intoOptions, + true, + false); + (void) heap_reloptions(RELKIND_RELATION, reloptions, true); + + /* have to copy the actual tupdesc to get rid of any constraints */ + tupdesc = CreateTupleDescCopy(queryDesc->tupDesc); + + /* Now we can actually create the new relation */ + intoRelationId = heap_create_with_catalog(intoName, + namespaceId, + tablespaceId, + InvalidOid, + GetUserId(), + tupdesc, + RELKIND_RELATION, + false, + true, + 0, + parseTree->intoOnCommit, + reloptions, + allowSystemTableMods); + + FreeTupleDesc(tupdesc); + + /* + * Advance command counter so that the newly-created relation's + * catalog tuples will be visible to heap_open. + */ + CommandCounterIncrement(); + + /* + * If necessary, create a TOAST table for the INTO relation. Note that + * AlterTableCreateToastTable ends with CommandCounterIncrement(), so + * that the TOAST table will be visible for insertion. + */ + AlterTableCreateToastTable(intoRelationId); + + /* + * And open the constructed table for writing. + */ + intoRelationDesc = heap_open(intoRelationId, AccessExclusiveLock); + + /* use_wal off requires rd_targblock be initially invalid */ + Assert(intoRelationDesc->rd_targblock == InvalidBlockNumber); + + /* + * We can skip WAL-logging the insertions, unless PITR is in use. + * + * Note that for a non-temp INTO table, this is safe only because we + * know that the catalog changes above will have been WAL-logged, and + * so RecordTransactionCommit will think it needs to WAL-log the + * eventual transaction commit. Else the commit might be lost, even + * though all the data is safely fsync'd ... + */ + estate->es_into_relation_use_wal = XLogArchivingActive(); + estate->es_into_relation_descriptor = intoRelationDesc; + + /* + * Now replace the query's DestReceiver with one for SELECT INTO + */ + queryDesc->dest = CreateDestReceiver(DestIntoRel, NULL); + myState = (DR_intorel *) queryDesc->dest; + Assert(myState->pub.mydest == DestIntoRel); + myState->estate = estate; +} + +/* + * CloseIntoRel --- clean up SELECT INTO at ExecutorEnd time + */ +static void +CloseIntoRel(QueryDesc *queryDesc) +{ + EState *estate = queryDesc->estate; + + /* OpenIntoRel might never have gotten called */ + if (estate->es_into_relation_descriptor) + { + /* + * If we skipped using WAL, and it's not a temp relation, we must + * force the relation down to disk before it's safe to commit the + * transaction. This requires forcing out any dirty buffers and then + * doing a forced fsync. + */ + if (!estate->es_into_relation_use_wal && + !estate->es_into_relation_descriptor->rd_istemp) + { + FlushRelationBuffers(estate->es_into_relation_descriptor); + /* FlushRelationBuffers will have opened rd_smgr */ + smgrimmedsync(estate->es_into_relation_descriptor->rd_smgr); + } + + /* close rel, but keep lock until commit */ + heap_close(estate->es_into_relation_descriptor, NoLock); + + estate->es_into_relation_descriptor = NULL; + } +} + +/* + * CreateIntoRelDestReceiver -- create a suitable DestReceiver object + * + * Since CreateDestReceiver doesn't accept the parameters we'd need, + * we just leave the private fields empty here. OpenIntoRel will + * fill them in. + */ +DestReceiver * +CreateIntoRelDestReceiver(void) +{ + DR_intorel *self = (DR_intorel *) palloc(sizeof(DR_intorel)); + + self->pub.receiveSlot = intorel_receive; + self->pub.rStartup = intorel_startup; + self->pub.rShutdown = intorel_shutdown; + self->pub.rDestroy = intorel_destroy; + self->pub.mydest = DestIntoRel; + + self->estate = NULL; + + return (DestReceiver *) self; +} + +/* + * intorel_startup --- executor startup + */ +static void +intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) +{ + /* no-op */ +} + +/* + * intorel_receive --- receive one tuple + */ +static void +intorel_receive(TupleTableSlot *slot, DestReceiver *self) +{ + DR_intorel *myState = (DR_intorel *) self; + EState *estate = myState->estate; + HeapTuple tuple; + + tuple = ExecCopySlotTuple(slot); + + heap_insert(estate->es_into_relation_descriptor, + tuple, + estate->es_snapshot->curcid, + estate->es_into_relation_use_wal, + false); /* never any point in using FSM */ + + /* We know this is a newly created relation, so there are no indexes */ + + heap_freetuple(tuple); + + IncrAppended(); +} + +/* + * intorel_shutdown --- executor end + */ +static void +intorel_shutdown(DestReceiver *self) +{ + /* no-op */ +} + +/* + * intorel_destroy --- release DestReceiver object + */ +static void +intorel_destroy(DestReceiver *self) +{ + pfree(self); +} |