aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execMain.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2006-08-12 02:52:06 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2006-08-12 02:52:06 +0000
commit7a3e30e608a25800a1f7fdfaaca4da3f0ac0fb07 (patch)
tree215adabe95d76123f6120fc22e4b51b5a1baf4cd /src/backend/executor/execMain.c
parent5c9e9c0c42904648af5a03fe90db8050e31d603f (diff)
downloadpostgresql-7a3e30e608a25800a1f7fdfaaca4da3f0ac0fb07.tar.gz
postgresql-7a3e30e608a25800a1f7fdfaaca4da3f0ac0fb07.zip
Add INSERT/UPDATE/DELETE RETURNING, with basic docs and regression tests.
plpgsql support to come later. Along the way, convert execMain's SELECT INTO support into a DestReceiver, in order to eliminate some ugly special cases. Jonah Harris and Tom Lane
Diffstat (limited to 'src/backend/executor/execMain.c')
-rw-r--r--src/backend/executor/execMain.c655
1 files changed, 455 insertions, 200 deletions
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index efc5d54fe9e..05c4a542b84 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -26,7 +26,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.277 2006/07/31 01:16:37 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.278 2006/08/12 02:52:04 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -43,6 +43,7 @@
#include "commands/trigger.h"
#include "executor/execdebug.h"
#include "executor/instrument.h"
+#include "executor/nodeSubplan.h"
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "parser/parse_clause.h"
@@ -75,14 +76,20 @@ static TupleTableSlot *ExecutePlan(EState *estate, PlanState *planstate,
ScanDirection direction,
DestReceiver *dest);
static void ExecSelect(TupleTableSlot *slot,
- DestReceiver *dest,
- EState *estate);
+ DestReceiver *dest, EState *estate);
static void ExecInsert(TupleTableSlot *slot, ItemPointer tupleid,
- EState *estate);
-static void ExecDelete(TupleTableSlot *slot, ItemPointer tupleid,
- EState *estate);
+ TupleTableSlot *planSlot,
+ DestReceiver *dest, EState *estate);
+static void ExecDelete(ItemPointer tupleid,
+ TupleTableSlot *planSlot,
+ DestReceiver *dest, EState *estate);
static void ExecUpdate(TupleTableSlot *slot, ItemPointer tupleid,
- EState *estate);
+ TupleTableSlot *planSlot,
+ DestReceiver *dest, EState *estate);
+static void ExecProcessReturning(ProjectionInfo *projectReturning,
+ TupleTableSlot *tupleSlot,
+ TupleTableSlot *planSlot,
+ DestReceiver *dest);
static TupleTableSlot *EvalPlanQualNext(EState *estate);
static void EndEvalPlanQual(EState *estate);
static void ExecCheckRTEPerms(RangeTblEntry *rte);
@@ -90,6 +97,12 @@ static void ExecCheckXactReadOnly(Query *parsetree);
static void EvalPlanQualStart(evalPlanQual *epq, EState *estate,
evalPlanQual *priorepq);
static void EvalPlanQualStop(evalPlanQual *epq);
+static void OpenIntoRel(QueryDesc *queryDesc);
+static void CloseIntoRel(QueryDesc *queryDesc);
+static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
+static void intorel_receive(TupleTableSlot *slot, DestReceiver *self);
+static void intorel_shutdown(DestReceiver *self);
+static void intorel_destroy(DestReceiver *self);
/* end of local decls */
@@ -185,6 +198,7 @@ ExecutorRun(QueryDesc *queryDesc,
EState *estate;
CmdType operation;
DestReceiver *dest;
+ bool sendTuples;
TupleTableSlot *result;
MemoryContext oldcontext;
@@ -207,12 +221,16 @@ ExecutorRun(QueryDesc *queryDesc,
dest = queryDesc->dest;
/*
- * startup tuple receiver
+ * startup tuple receiver, if we will be emitting tuples
*/
estate->es_processed = 0;
estate->es_lastoid = InvalidOid;
- (*dest->rStartup) (dest, operation, queryDesc->tupDesc);
+ sendTuples = (operation == CMD_SELECT ||
+ queryDesc->parsetree->returningList);
+
+ if (sendTuples)
+ (*dest->rStartup) (dest, operation, queryDesc->tupDesc);
/*
* run plan
@@ -228,9 +246,10 @@ ExecutorRun(QueryDesc *queryDesc,
dest);
/*
- * shutdown receiver
+ * shutdown tuple receiver, if we started it
*/
- (*dest->rShutdown) (dest);
+ if (sendTuples)
+ (*dest->rShutdown) (dest);
MemoryContextSwitchTo(oldcontext);
@@ -265,6 +284,12 @@ ExecutorEnd(QueryDesc *queryDesc)
ExecEndPlan(queryDesc->planstate, estate);
/*
+ * Close the SELECT INTO relation if any
+ */
+ if (estate->es_select_into)
+ CloseIntoRel(queryDesc);
+
+ /*
* Must switch out of context before destroying it
*/
MemoryContextSwitchTo(oldcontext);
@@ -449,8 +474,6 @@ InitPlan(QueryDesc *queryDesc, int eflags)
EState *estate = queryDesc->estate;
PlanState *planstate;
List *rangeTable;
- Relation intoRelationDesc;
- bool do_select_into;
TupleDesc tupType;
ListCell *l;
@@ -534,13 +557,11 @@ InitPlan(QueryDesc *queryDesc, int eflags)
/*
* Detect whether we're doing SELECT INTO. If so, set the es_into_oids
* flag appropriately so that the plan tree will be initialized with the
- * correct tuple descriptors.
+ * correct tuple descriptors. (Other SELECT INTO stuff comes later.)
*/
- do_select_into = false;
-
+ estate->es_select_into = false;
if (operation == CMD_SELECT && parseTree->into != NULL)
{
- do_select_into = true;
estate->es_select_into = true;
estate->es_into_oids = interpretOidsOption(parseTree->intoOptions);
}
@@ -581,7 +602,9 @@ InitPlan(QueryDesc *queryDesc, int eflags)
else
nSlots += 1;
if (operation != CMD_SELECT)
- nSlots++;
+ nSlots++; /* for es_trig_tuple_slot */
+ if (parseTree->returningLists)
+ nSlots++; /* for RETURNING projection */
estate->es_tupleTable = ExecCreateTupleTable(nSlots);
@@ -638,7 +661,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
}
}
if (!junk_filter_needed &&
- (operation == CMD_INSERT || do_select_into) &&
+ (operation == CMD_INSERT || estate->es_select_into) &&
ExecMayReturnRawTuples(planstate))
junk_filter_needed = true;
break;
@@ -713,140 +736,70 @@ InitPlan(QueryDesc *queryDesc, int eflags)
}
/*
- * If doing SELECT INTO, initialize the "into" relation. We must wait
- * till now so we have the "clean" result tuple type to create the new
- * table from.
- *
- * If EXPLAIN, skip creating the "into" relation.
+ * Initialize RETURNING projections if needed.
*/
- intoRelationDesc = NULL;
-
- if (do_select_into && !(eflags & EXEC_FLAG_EXPLAIN_ONLY))
+ if (parseTree->returningLists)
{
- char *intoName;
- Oid namespaceId;
- Oid tablespaceId;
- Datum reloptions;
- AclResult aclresult;
- Oid intoRelationId;
- TupleDesc tupdesc;
-
- /*
- * Check consistency of arguments
- */
- if (parseTree->intoOnCommit != ONCOMMIT_NOOP && !parseTree->into->istemp)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("ON COMMIT can only be used on temporary tables")));
+ TupleTableSlot *slot;
+ ExprContext *econtext;
+ ResultRelInfo *resultRelInfo;
/*
- * find namespace to create in, check permissions
+ * We set QueryDesc.tupDesc to be the RETURNING rowtype in this case.
+ * We assume all the sublists will generate the same output tupdesc.
*/
- intoName = parseTree->into->relname;
- namespaceId = RangeVarGetCreationNamespace(parseTree->into);
+ tupType = ExecTypeFromTL((List *) linitial(parseTree->returningLists),
+ false);
- aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
- ACL_CREATE);
- if (aclresult != ACLCHECK_OK)
- aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
- get_namespace_name(namespaceId));
+ /* Set up a slot for the output of the RETURNING projection(s) */
+ slot = ExecAllocTableSlot(estate->es_tupleTable);
+ ExecSetSlotDescriptor(slot, tupType);
+ /* Need an econtext too */
+ econtext = CreateExprContext(estate);
/*
- * Select tablespace to use. If not specified, use default_tablespace
- * (which may in turn default to database's default).
+ * Build a projection for each result rel. Note that any SubPlans
+ * in the RETURNING lists get attached to the topmost plan node.
*/
- if (parseTree->intoTableSpaceName)
+ Assert(list_length(parseTree->returningLists) == estate->es_num_result_relations);
+ resultRelInfo = estate->es_result_relations;
+ foreach(l, parseTree->returningLists)
{
- tablespaceId = get_tablespace_oid(parseTree->intoTableSpaceName);
- if (!OidIsValid(tablespaceId))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("tablespace \"%s\" does not exist",
- parseTree->intoTableSpaceName)));
- } else
- {
- tablespaceId = GetDefaultTablespace();
- /* note InvalidOid is OK in this case */
- }
-
- /* Parse and validate any reloptions */
- reloptions = transformRelOptions((Datum) 0,
- parseTree->intoOptions,
- true,
- false);
- (void) heap_reloptions(RELKIND_RELATION, reloptions, true);
+ List *rlist = (List *) lfirst(l);
+ List *rliststate;
- /* Check permissions except when using the database's default */
- if (OidIsValid(tablespaceId))
- {
- AclResult aclresult;
-
- aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
- ACL_CREATE);
-
- if (aclresult != ACLCHECK_OK)
- aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
- get_tablespace_name(tablespaceId));
+ rliststate = (List *) ExecInitExpr((Expr *) rlist, planstate);
+ resultRelInfo->ri_projectReturning =
+ ExecBuildProjectionInfo(rliststate, econtext, slot);
+ resultRelInfo++;
}
-
- /*
- * have to copy tupType to get rid of constraints
- */
- tupdesc = CreateTupleDescCopy(tupType);
-
- intoRelationId = heap_create_with_catalog(intoName,
- namespaceId,
- tablespaceId,
- InvalidOid,
- GetUserId(),
- tupdesc,
- RELKIND_RELATION,
- false,
- true,
- 0,
- parseTree->intoOnCommit,
- reloptions,
- allowSystemTableMods);
-
- FreeTupleDesc(tupdesc);
-
/*
- * Advance command counter so that the newly-created relation's
- * catalog tuples will be visible to heap_open.
+ * Because we already ran ExecInitNode() for the top plan node,
+ * any subplans we just attached to it won't have been initialized;
+ * so we have to do it here. (Ugly, but the alternatives seem worse.)
*/
- CommandCounterIncrement();
-
- /*
- * If necessary, create a TOAST table for the into relation. Note that
- * AlterTableCreateToastTable ends with CommandCounterIncrement(), so
- * that the TOAST table will be visible for insertion.
- */
- AlterTableCreateToastTable(intoRelationId);
-
- /*
- * And open the constructed table for writing.
- */
- intoRelationDesc = heap_open(intoRelationId, AccessExclusiveLock);
-
- /* use_wal off requires rd_targblock be initially invalid */
- Assert(intoRelationDesc->rd_targblock == InvalidBlockNumber);
+ foreach(l, planstate->subPlan)
+ {
+ SubPlanState *sstate = (SubPlanState *) lfirst(l);
- /*
- * We can skip WAL-logging the insertions, unless PITR is in use.
- *
- * Note that for a non-temp INTO table, this is safe only because we
- * know that the catalog changes above will have been WAL-logged, and
- * so RecordTransactionCommit will think it needs to WAL-log the
- * eventual transaction commit. Else the commit might be lost, even
- * though all the data is safely fsync'd ...
- */
- estate->es_into_relation_use_wal = XLogArchivingActive();
+ Assert(IsA(sstate, SubPlanState));
+ if (sstate->planstate == NULL) /* already inited? */
+ ExecInitSubPlan(sstate, estate, eflags);
+ }
}
- estate->es_into_relation_descriptor = intoRelationDesc;
-
queryDesc->tupDesc = tupType;
queryDesc->planstate = planstate;
+
+ /*
+ * If doing SELECT INTO, initialize the "into" relation. We must wait
+ * till now so we have the "clean" result tuple type to create the new
+ * table from.
+ *
+ * If EXPLAIN, skip creating the "into" relation.
+ */
+ if (estate->es_select_into && !(eflags & EXEC_FLAG_EXPLAIN_ONLY))
+ OpenIntoRel(queryDesc);
}
/*
@@ -914,6 +867,7 @@ initResultRelInfo(ResultRelInfo *resultRelInfo,
}
resultRelInfo->ri_ConstraintExprs = NULL;
resultRelInfo->ri_junkFilter = NULL;
+ resultRelInfo->ri_projectReturning = NULL;
/*
* If there are indices on the result relation, open them and save
@@ -1032,28 +986,6 @@ ExecEndPlan(PlanState *planstate, EState *estate)
}
/*
- * close the "into" relation if necessary, again keeping lock
- */
- if (estate->es_into_relation_descriptor != NULL)
- {
- /*
- * If we skipped using WAL, and it's not a temp relation, we must
- * force the relation down to disk before it's safe to commit the
- * transaction. This requires forcing out any dirty buffers and then
- * doing a forced fsync.
- */
- if (!estate->es_into_relation_use_wal &&
- !estate->es_into_relation_descriptor->rd_istemp)
- {
- FlushRelationBuffers(estate->es_into_relation_descriptor);
- /* FlushRelationBuffers will have opened rd_smgr */
- smgrimmedsync(estate->es_into_relation_descriptor->rd_smgr);
- }
-
- heap_close(estate->es_into_relation_descriptor, NoLock);
- }
-
- /*
* close any relations selected FOR UPDATE/FOR SHARE, again keeping locks
*/
foreach(l, estate->es_rowMarks)
@@ -1088,6 +1020,7 @@ ExecutePlan(EState *estate,
DestReceiver *dest)
{
JunkFilter *junkfilter;
+ TupleTableSlot *planSlot;
TupleTableSlot *slot;
ItemPointer tupleid = NULL;
ItemPointerData tuple_ctid;
@@ -1097,7 +1030,6 @@ ExecutePlan(EState *estate,
/*
* initialize local variables
*/
- slot = NULL;
current_tuple_count = 0;
result = NULL;
@@ -1140,22 +1072,23 @@ ExecutePlan(EState *estate,
lnext: ;
if (estate->es_useEvalPlan)
{
- slot = EvalPlanQualNext(estate);
- if (TupIsNull(slot))
- slot = ExecProcNode(planstate);
+ planSlot = EvalPlanQualNext(estate);
+ if (TupIsNull(planSlot))
+ planSlot = ExecProcNode(planstate);
}
else
- slot = ExecProcNode(planstate);
+ planSlot = ExecProcNode(planstate);
/*
* if the tuple is null, then we assume there is nothing more to
* process so we just return null...
*/
- if (TupIsNull(slot))
+ if (TupIsNull(planSlot))
{
result = NULL;
break;
}
+ slot = planSlot;
/*
* if we have a junk filter, then project a new tuple with the junk
@@ -1261,7 +1194,7 @@ lnext: ;
estate->es_snapshot->curcid);
if (!TupIsNull(newSlot))
{
- slot = newSlot;
+ slot = planSlot = newSlot;
estate->es_useEvalPlan = true;
goto lmark;
}
@@ -1282,10 +1215,12 @@ lnext: ;
}
/*
- * Finally create a new "clean" tuple with all junk attributes
- * removed
+ * Create a new "clean" tuple with all junk attributes removed.
+ * We don't need to do this for DELETE, however (there will
+ * in fact be no non-junk attributes in a DELETE!)
*/
- slot = ExecFilterJunk(junkfilter, slot);
+ if (operation != CMD_DELETE)
+ slot = ExecFilterJunk(junkfilter, slot);
}
/*
@@ -1296,24 +1231,22 @@ lnext: ;
switch (operation)
{
case CMD_SELECT:
- ExecSelect(slot, /* slot containing tuple */
- dest, /* destination's tuple-receiver obj */
- estate);
+ ExecSelect(slot, dest, estate);
result = slot;
break;
case CMD_INSERT:
- ExecInsert(slot, tupleid, estate);
+ ExecInsert(slot, tupleid, planSlot, dest, estate);
result = NULL;
break;
case CMD_DELETE:
- ExecDelete(slot, tupleid, estate);
+ ExecDelete(tupleid, planSlot, dest, estate);
result = NULL;
break;
case CMD_UPDATE:
- ExecUpdate(slot, tupleid, estate);
+ ExecUpdate(slot, tupleid, planSlot, dest, estate);
result = NULL;
break;
@@ -1364,10 +1297,7 @@ lnext: ;
* ExecSelect
*
* SELECTs are easy.. we just pass the tuple to the appropriate
- * print function. The only complexity is when we do a
- * "SELECT INTO", in which case we insert the tuple into
- * the appropriate relation (note: this is a newly created relation
- * so we don't need to worry about indices or locks.)
+ * output function.
* ----------------------------------------------------------------
*/
static void
@@ -1375,28 +1305,6 @@ ExecSelect(TupleTableSlot *slot,
DestReceiver *dest,
EState *estate)
{
- /*
- * insert the tuple into the "into relation"
- *
- * XXX this probably ought to be replaced by a separate destination
- */
- if (estate->es_into_relation_descriptor != NULL)
- {
- HeapTuple tuple;
-
- tuple = ExecCopySlotTuple(slot);
- heap_insert(estate->es_into_relation_descriptor, tuple,
- estate->es_snapshot->curcid,
- estate->es_into_relation_use_wal,
- false); /* never any point in using FSM */
- /* we know there are no indexes to update */
- heap_freetuple(tuple);
- IncrAppended();
- }
-
- /*
- * send the tuple to the destination
- */
(*dest->receiveSlot) (slot, dest);
IncrRetrieved();
(estate->es_processed)++;
@@ -1413,6 +1321,8 @@ ExecSelect(TupleTableSlot *slot,
static void
ExecInsert(TupleTableSlot *slot,
ItemPointer tupleid,
+ TupleTableSlot *planSlot,
+ DestReceiver *dest,
EState *estate)
{
HeapTuple tuple;
@@ -1490,6 +1400,11 @@ ExecInsert(TupleTableSlot *slot,
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, tuple);
+
+ /* Process RETURNING if present */
+ if (resultRelInfo->ri_projectReturning)
+ ExecProcessReturning(resultRelInfo->ri_projectReturning,
+ slot, planSlot, dest);
}
/* ----------------------------------------------------------------
@@ -1500,8 +1415,9 @@ ExecInsert(TupleTableSlot *slot,
* ----------------------------------------------------------------
*/
static void
-ExecDelete(TupleTableSlot *slot,
- ItemPointer tupleid,
+ExecDelete(ItemPointer tupleid,
+ TupleTableSlot *planSlot,
+ DestReceiver *dest,
EState *estate)
{
ResultRelInfo *resultRelInfo;
@@ -1594,6 +1510,33 @@ ldelete:;
/* AFTER ROW DELETE Triggers */
ExecARDeleteTriggers(estate, resultRelInfo, tupleid);
+
+ /* Process RETURNING if present */
+ if (resultRelInfo->ri_projectReturning)
+ {
+ /*
+ * We have to put the target tuple into a slot, which means
+ * first we gotta fetch it. We can use the trigger tuple slot.
+ */
+ TupleTableSlot *slot = estate->es_trig_tuple_slot;
+ HeapTupleData deltuple;
+ Buffer delbuffer;
+
+ deltuple.t_self = *tupleid;
+ if (!heap_fetch(resultRelationDesc, SnapshotAny,
+ &deltuple, &delbuffer, false, NULL))
+ elog(ERROR, "failed to fetch deleted tuple for DELETE RETURNING");
+
+ if (slot->tts_tupleDescriptor != RelationGetDescr(resultRelationDesc))
+ ExecSetSlotDescriptor(slot, RelationGetDescr(resultRelationDesc));
+ ExecStoreTuple(&deltuple, slot, InvalidBuffer, false);
+
+ ExecProcessReturning(resultRelInfo->ri_projectReturning,
+ slot, planSlot, dest);
+
+ ExecClearTuple(slot);
+ ReleaseBuffer(delbuffer);
+ }
}
/* ----------------------------------------------------------------
@@ -1610,6 +1553,8 @@ ldelete:;
static void
ExecUpdate(TupleTableSlot *slot,
ItemPointer tupleid,
+ TupleTableSlot *planSlot,
+ DestReceiver *dest,
EState *estate)
{
HeapTuple tuple;
@@ -1755,8 +1700,16 @@ lreplace:;
/* AFTER ROW UPDATE Triggers */
ExecARUpdateTriggers(estate, resultRelInfo, tupleid, tuple);
+
+ /* Process RETURNING if present */
+ if (resultRelInfo->ri_projectReturning)
+ ExecProcessReturning(resultRelInfo->ri_projectReturning,
+ slot, planSlot, dest);
}
+/*
+ * ExecRelCheck --- check that tuple meets constraints for result relation
+ */
static const char *
ExecRelCheck(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate)
@@ -1854,6 +1807,42 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
}
/*
+ * ExecProcessReturning --- evaluate a RETURNING list and send to dest
+ *
+ * projectReturning: RETURNING projection info for current result rel
+ * tupleSlot: slot holding tuple actually inserted/updated/deleted
+ * planSlot: slot holding tuple returned by top plan node
+ * dest: where to send the output
+ */
+static void
+ExecProcessReturning(ProjectionInfo *projectReturning,
+ TupleTableSlot *tupleSlot,
+ TupleTableSlot *planSlot,
+ DestReceiver *dest)
+{
+ ExprContext *econtext = projectReturning->pi_exprContext;
+ TupleTableSlot *retSlot;
+
+ /*
+ * Reset per-tuple memory context to free any expression evaluation
+ * storage allocated in the previous cycle.
+ */
+ ResetExprContext(econtext);
+
+ /* Make tuple and any needed join variables available to ExecProject */
+ econtext->ecxt_scantuple = tupleSlot;
+ econtext->ecxt_outertuple = planSlot;
+
+ /* Compute the RETURNING expressions */
+ retSlot = ExecProject(projectReturning, NULL);
+
+ /* Send to dest */
+ (*dest->receiveSlot) (retSlot, dest);
+
+ ExecClearTuple(retSlot);
+}
+
+/*
* Check a modified tuple to see if we want to process its updated version
* under READ COMMITTED rules.
*
@@ -2318,3 +2307,269 @@ EvalPlanQualStop(evalPlanQual *epq)
epq->estate = NULL;
epq->planstate = NULL;
}
+
+
+/*
+ * Support for SELECT INTO (a/k/a CREATE TABLE AS)
+ *
+ * We implement SELECT INTO by diverting SELECT's normal output with
+ * a specialized DestReceiver type.
+ *
+ * TODO: remove some of the INTO-specific cruft from EState, and keep
+ * it in the DestReceiver instead.
+ */
+
+typedef struct
+{
+ DestReceiver pub; /* publicly-known function pointers */
+ EState *estate; /* EState we are working with */
+} DR_intorel;
+
+/*
+ * OpenIntoRel --- actually create the SELECT INTO target relation
+ *
+ * This also replaces QueryDesc->dest with the special DestReceiver for
+ * SELECT INTO. We assume that the correct result tuple type has already
+ * been placed in queryDesc->tupDesc.
+ */
+static void
+OpenIntoRel(QueryDesc *queryDesc)
+{
+ Query *parseTree = queryDesc->parsetree;
+ EState *estate = queryDesc->estate;
+ Relation intoRelationDesc;
+ char *intoName;
+ Oid namespaceId;
+ Oid tablespaceId;
+ Datum reloptions;
+ AclResult aclresult;
+ Oid intoRelationId;
+ TupleDesc tupdesc;
+ DR_intorel *myState;
+
+ /*
+ * Check consistency of arguments
+ */
+ if (parseTree->intoOnCommit != ONCOMMIT_NOOP && !parseTree->into->istemp)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("ON COMMIT can only be used on temporary tables")));
+
+ /*
+ * Find namespace to create in, check its permissions
+ */
+ intoName = parseTree->into->relname;
+ namespaceId = RangeVarGetCreationNamespace(parseTree->into);
+
+ aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
+ ACL_CREATE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+ get_namespace_name(namespaceId));
+
+ /*
+ * Select tablespace to use. If not specified, use default_tablespace
+ * (which may in turn default to database's default).
+ */
+ if (parseTree->intoTableSpaceName)
+ {
+ tablespaceId = get_tablespace_oid(parseTree->intoTableSpaceName);
+ if (!OidIsValid(tablespaceId))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("tablespace \"%s\" does not exist",
+ parseTree->intoTableSpaceName)));
+ } else
+ {
+ tablespaceId = GetDefaultTablespace();
+ /* note InvalidOid is OK in this case */
+ }
+
+ /* Check permissions except when using the database's default space */
+ if (OidIsValid(tablespaceId))
+ {
+ AclResult aclresult;
+
+ aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
+ ACL_CREATE);
+
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
+ get_tablespace_name(tablespaceId));
+ }
+
+ /* Parse and validate any reloptions */
+ reloptions = transformRelOptions((Datum) 0,
+ parseTree->intoOptions,
+ true,
+ false);
+ (void) heap_reloptions(RELKIND_RELATION, reloptions, true);
+
+ /* have to copy the actual tupdesc to get rid of any constraints */
+ tupdesc = CreateTupleDescCopy(queryDesc->tupDesc);
+
+ /* Now we can actually create the new relation */
+ intoRelationId = heap_create_with_catalog(intoName,
+ namespaceId,
+ tablespaceId,
+ InvalidOid,
+ GetUserId(),
+ tupdesc,
+ RELKIND_RELATION,
+ false,
+ true,
+ 0,
+ parseTree->intoOnCommit,
+ reloptions,
+ allowSystemTableMods);
+
+ FreeTupleDesc(tupdesc);
+
+ /*
+ * Advance command counter so that the newly-created relation's
+ * catalog tuples will be visible to heap_open.
+ */
+ CommandCounterIncrement();
+
+ /*
+ * If necessary, create a TOAST table for the INTO relation. Note that
+ * AlterTableCreateToastTable ends with CommandCounterIncrement(), so
+ * that the TOAST table will be visible for insertion.
+ */
+ AlterTableCreateToastTable(intoRelationId);
+
+ /*
+ * And open the constructed table for writing.
+ */
+ intoRelationDesc = heap_open(intoRelationId, AccessExclusiveLock);
+
+ /* use_wal off requires rd_targblock be initially invalid */
+ Assert(intoRelationDesc->rd_targblock == InvalidBlockNumber);
+
+ /*
+ * We can skip WAL-logging the insertions, unless PITR is in use.
+ *
+ * Note that for a non-temp INTO table, this is safe only because we
+ * know that the catalog changes above will have been WAL-logged, and
+ * so RecordTransactionCommit will think it needs to WAL-log the
+ * eventual transaction commit. Else the commit might be lost, even
+ * though all the data is safely fsync'd ...
+ */
+ estate->es_into_relation_use_wal = XLogArchivingActive();
+ estate->es_into_relation_descriptor = intoRelationDesc;
+
+ /*
+ * Now replace the query's DestReceiver with one for SELECT INTO
+ */
+ queryDesc->dest = CreateDestReceiver(DestIntoRel, NULL);
+ myState = (DR_intorel *) queryDesc->dest;
+ Assert(myState->pub.mydest == DestIntoRel);
+ myState->estate = estate;
+}
+
+/*
+ * CloseIntoRel --- clean up SELECT INTO at ExecutorEnd time
+ */
+static void
+CloseIntoRel(QueryDesc *queryDesc)
+{
+ EState *estate = queryDesc->estate;
+
+ /* OpenIntoRel might never have gotten called */
+ if (estate->es_into_relation_descriptor)
+ {
+ /*
+ * If we skipped using WAL, and it's not a temp relation, we must
+ * force the relation down to disk before it's safe to commit the
+ * transaction. This requires forcing out any dirty buffers and then
+ * doing a forced fsync.
+ */
+ if (!estate->es_into_relation_use_wal &&
+ !estate->es_into_relation_descriptor->rd_istemp)
+ {
+ FlushRelationBuffers(estate->es_into_relation_descriptor);
+ /* FlushRelationBuffers will have opened rd_smgr */
+ smgrimmedsync(estate->es_into_relation_descriptor->rd_smgr);
+ }
+
+ /* close rel, but keep lock until commit */
+ heap_close(estate->es_into_relation_descriptor, NoLock);
+
+ estate->es_into_relation_descriptor = NULL;
+ }
+}
+
+/*
+ * CreateIntoRelDestReceiver -- create a suitable DestReceiver object
+ *
+ * Since CreateDestReceiver doesn't accept the parameters we'd need,
+ * we just leave the private fields empty here. OpenIntoRel will
+ * fill them in.
+ */
+DestReceiver *
+CreateIntoRelDestReceiver(void)
+{
+ DR_intorel *self = (DR_intorel *) palloc(sizeof(DR_intorel));
+
+ self->pub.receiveSlot = intorel_receive;
+ self->pub.rStartup = intorel_startup;
+ self->pub.rShutdown = intorel_shutdown;
+ self->pub.rDestroy = intorel_destroy;
+ self->pub.mydest = DestIntoRel;
+
+ self->estate = NULL;
+
+ return (DestReceiver *) self;
+}
+
+/*
+ * intorel_startup --- executor startup
+ */
+static void
+intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ /* no-op */
+}
+
+/*
+ * intorel_receive --- receive one tuple
+ */
+static void
+intorel_receive(TupleTableSlot *slot, DestReceiver *self)
+{
+ DR_intorel *myState = (DR_intorel *) self;
+ EState *estate = myState->estate;
+ HeapTuple tuple;
+
+ tuple = ExecCopySlotTuple(slot);
+
+ heap_insert(estate->es_into_relation_descriptor,
+ tuple,
+ estate->es_snapshot->curcid,
+ estate->es_into_relation_use_wal,
+ false); /* never any point in using FSM */
+
+ /* We know this is a newly created relation, so there are no indexes */
+
+ heap_freetuple(tuple);
+
+ IncrAppended();
+}
+
+/*
+ * intorel_shutdown --- executor end
+ */
+static void
+intorel_shutdown(DestReceiver *self)
+{
+ /* no-op */
+}
+
+/*
+ * intorel_destroy --- release DestReceiver object
+ */
+static void
+intorel_destroy(DestReceiver *self)
+{
+ pfree(self);
+}