aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/functions.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/functions.c')
-rw-r--r--src/backend/executor/functions.c346
1 files changed, 243 insertions, 103 deletions
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index d5385438e3b..a2f4fac74cb 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -47,6 +47,10 @@ typedef struct
* We have an execution_state record for each query in a function. Each
* record contains a plantree for its query. If the query is currently in
* F_EXEC_RUN state then there's a QueryDesc too.
+ *
+ * The "next" fields chain together all the execution_state records generated
+ * from a single original parsetree. (There will only be more than one in
+ * case of rule expansion of the original parsetree.)
*/
typedef enum
{
@@ -93,15 +97,20 @@ typedef struct
JunkFilter *junkFilter; /* will be NULL if function returns VOID */
- /* head of linked list of execution_state records */
- execution_state *func_state;
+ /*
+ * func_state is a List of execution_state records, each of which is the
+ * first for its original parsetree, with any additional records chained
+ * to it via the "next" fields. This sublist structure is needed to keep
+ * track of where the original query boundaries are.
+ */
+ List *func_state;
} SQLFunctionCache;
typedef SQLFunctionCache *SQLFunctionCachePtr;
/* non-export function prototypes */
-static execution_state *init_execution_state(List *queryTree_list,
+static List *init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK);
static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK);
@@ -122,62 +131,78 @@ static void sqlfunction_shutdown(DestReceiver *self);
static void sqlfunction_destroy(DestReceiver *self);
-/* Set up the list of per-query execution_state records for a SQL function */
-static execution_state *
+/*
+ * Set up the per-query execution_state records for a SQL function.
+ *
+ * The input is a List of Lists of parsed and rewritten, but not planned,
+ * querytrees. The sublist structure denotes the original query boundaries.
+ */
+static List *
init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK)
{
- execution_state *firstes = NULL;
- execution_state *preves = NULL;
+ List *eslist = NIL;
execution_state *lasttages = NULL;
- ListCell *qtl_item;
+ ListCell *lc1;
- foreach(qtl_item, queryTree_list)
+ foreach(lc1, queryTree_list)
{
- Query *queryTree = (Query *) lfirst(qtl_item);
- Node *stmt;
- execution_state *newes;
+ List *qtlist = (List *) lfirst(lc1);
+ execution_state *firstes = NULL;
+ execution_state *preves = NULL;
+ ListCell *lc2;
- Assert(IsA(queryTree, Query));
+ foreach(lc2, qtlist)
+ {
+ Query *queryTree = (Query *) lfirst(lc2);
+ Node *stmt;
+ execution_state *newes;
- if (queryTree->commandType == CMD_UTILITY)
- stmt = queryTree->utilityStmt;
- else
- stmt = (Node *) pg_plan_query(queryTree, 0, NULL);
+ Assert(IsA(queryTree, Query));
- /* Precheck all commands for validity in a function */
- if (IsA(stmt, TransactionStmt))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- /* translator: %s is a SQL statement name */
- errmsg("%s is not allowed in a SQL function",
- CreateCommandTag(stmt))));
+ /* Plan the query if needed */
+ if (queryTree->commandType == CMD_UTILITY)
+ stmt = queryTree->utilityStmt;
+ else
+ stmt = (Node *) pg_plan_query(queryTree, 0, NULL);
- if (fcache->readonly_func && !CommandIsReadOnly(stmt))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- /* translator: %s is a SQL statement name */
- errmsg("%s is not allowed in a non-volatile function",
- CreateCommandTag(stmt))));
+ /* Precheck all commands for validity in a function */
+ if (IsA(stmt, TransactionStmt))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ /* translator: %s is a SQL statement name */
+ errmsg("%s is not allowed in a SQL function",
+ CreateCommandTag(stmt))));
- newes = (execution_state *) palloc(sizeof(execution_state));
- if (preves)
- preves->next = newes;
- else
- firstes = newes;
+ if (fcache->readonly_func && !CommandIsReadOnly(stmt))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ /* translator: %s is a SQL statement name */
+ errmsg("%s is not allowed in a non-volatile function",
+ CreateCommandTag(stmt))));
+
+ /* OK, build the execution_state for this query */
+ newes = (execution_state *) palloc(sizeof(execution_state));
+ if (preves)
+ preves->next = newes;
+ else
+ firstes = newes;
- newes->next = NULL;
- newes->status = F_EXEC_START;
- newes->setsResult = false; /* might change below */
- newes->lazyEval = false; /* might change below */
- newes->stmt = stmt;
- newes->qd = NULL;
+ newes->next = NULL;
+ newes->status = F_EXEC_START;
+ newes->setsResult = false; /* might change below */
+ newes->lazyEval = false; /* might change below */
+ newes->stmt = stmt;
+ newes->qd = NULL;
- if (queryTree->canSetTag)
- lasttages = newes;
+ if (queryTree->canSetTag)
+ lasttages = newes;
- preves = newes;
+ preves = newes;
+ }
+
+ eslist = lappend(eslist, firstes);
}
/*
@@ -211,7 +236,7 @@ init_execution_state(List *queryTree_list,
}
}
- return firstes;
+ return eslist;
}
/* Initialize the SQLFunctionCache for a SQL function */
@@ -225,7 +250,10 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
SQLFunctionCachePtr fcache;
Oid *argOidVect;
int nargs;
+ List *raw_parsetree_list;
List *queryTree_list;
+ List *flat_query_list;
+ ListCell *lc;
Datum tmp;
bool isNull;
@@ -318,9 +346,32 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
fcache->src = TextDatumGetCString(tmp);
/*
- * Parse and rewrite the queries in the function text.
+ * Parse and rewrite the queries in the function text. Use sublists to
+ * keep track of the original query boundaries. But we also build a
+ * "flat" list of the rewritten queries to pass to check_sql_fn_retval.
+ * This is because the last canSetTag query determines the result type
+ * independently of query boundaries --- and it might not be in the last
+ * sublist, for example if the last query rewrites to DO INSTEAD NOTHING.
+ * (It might not be unreasonable to throw an error in such a case, but
+ * this is the historical behavior and it doesn't seem worth changing.)
*/
- queryTree_list = pg_parse_and_rewrite(fcache->src, argOidVect, nargs);
+ raw_parsetree_list = pg_parse_query(fcache->src);
+
+ queryTree_list = NIL;
+ flat_query_list = NIL;
+ foreach(lc, raw_parsetree_list)
+ {
+ Node *parsetree = (Node *) lfirst(lc);
+ List *queryTree_sublist;
+
+ queryTree_sublist = pg_analyze_and_rewrite(parsetree,
+ fcache->src,
+ argOidVect,
+ nargs);
+ queryTree_list = lappend(queryTree_list, queryTree_sublist);
+ flat_query_list = list_concat(flat_query_list,
+ list_copy(queryTree_sublist));
+ }
/*
* Check that the function returns the type it claims to. Although in
@@ -343,7 +394,7 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
*/
fcache->returnsTuple = check_sql_fn_retval(foid,
rettype,
- queryTree_list,
+ flat_query_list,
NULL,
&fcache->junkFilter);
@@ -375,24 +426,12 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
static void
postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
{
- Snapshot snapshot;
DestReceiver *dest;
Assert(es->qd == NULL);
- /*
- * In a read-only function, use the surrounding query's snapshot;
- * otherwise take a new snapshot for each query. The snapshot should
- * include a fresh command ID so that all work to date in this transaction
- * is visible.
- */
- if (fcache->readonly_func)
- snapshot = GetActiveSnapshot();
- else
- {
- CommandCounterIncrement();
- snapshot = GetTransactionSnapshot();
- }
+ /* Caller should have ensured a suitable snapshot is active */
+ Assert(ActiveSnapshotSet());
/*
* If this query produces the function result, send its output to the
@@ -416,18 +455,17 @@ postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
if (IsA(es->stmt, PlannedStmt))
es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
fcache->src,
- snapshot, InvalidSnapshot,
+ GetActiveSnapshot(),
+ InvalidSnapshot,
dest,
fcache->paramLI, 0);
else
es->qd = CreateUtilityQueryDesc(es->stmt,
fcache->src,
- snapshot,
+ GetActiveSnapshot(),
dest,
fcache->paramLI);
- /* We assume we don't need to set up ActiveSnapshot for ExecutorStart */
-
/* Utility commands don't need Executor. */
if (es->qd->utilitystmt == NULL)
{
@@ -457,9 +495,6 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
{
bool result;
- /* Make our snapshot the active one for any called functions */
- PushActiveSnapshot(es->qd->snapshot);
-
if (es->qd->utilitystmt)
{
/* ProcessUtility needs the PlannedStmt for DECLARE CURSOR */
@@ -487,8 +522,6 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
result = (count == 0L || es->qd->estate->es_processed == 0);
}
- PopActiveSnapshot();
-
return result;
}
@@ -502,13 +535,8 @@ postquel_end(execution_state *es)
/* Utility commands don't need Executor. */
if (es->qd->utilitystmt == NULL)
{
- /* Make our snapshot the active one for any called functions */
- PushActiveSnapshot(es->qd->snapshot);
-
ExecutorFinish(es->qd);
ExecutorEnd(es->qd);
-
- PopActiveSnapshot();
}
(*es->qd->dest->rDestroy) (es->qd->dest);
@@ -619,9 +647,13 @@ fmgr_sql(PG_FUNCTION_ARGS)
ErrorContextCallback sqlerrcontext;
bool randomAccess;
bool lazyEvalOK;
+ bool is_first;
+ bool pushed_snapshot;
execution_state *es;
TupleTableSlot *slot;
Datum result;
+ List *eslist;
+ ListCell *eslc;
/*
* Switch to context in which the fcache lives. This ensures that
@@ -673,13 +705,33 @@ fmgr_sql(PG_FUNCTION_ARGS)
init_sql_fcache(fcinfo->flinfo, lazyEvalOK);
fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
}
- es = fcache->func_state;
+ eslist = fcache->func_state;
+
+ /*
+ * Find first unfinished query in function, and note whether it's the
+ * first query.
+ */
+ es = NULL;
+ is_first = true;
+ foreach(eslc, eslist)
+ {
+ es = (execution_state *) lfirst(eslc);
+
+ while (es && es->status == F_EXEC_DONE)
+ {
+ is_first = false;
+ es = es->next;
+ }
+
+ if (es)
+ break;
+ }
/*
* Convert params to appropriate format if starting a fresh execution. (If
* continuing execution, we can re-use prior params.)
*/
- if (es && es->status == F_EXEC_START)
+ if (is_first && es && es->status == F_EXEC_START)
postquel_sub_params(fcache, fcinfo);
/*
@@ -690,21 +742,56 @@ fmgr_sql(PG_FUNCTION_ARGS)
fcache->tstore = tuplestore_begin_heap(randomAccess, false, work_mem);
/*
- * Find first unfinished query in function.
- */
- while (es && es->status == F_EXEC_DONE)
- es = es->next;
-
- /*
* Execute each command in the function one after another until we either
* run out of commands or get a result row from a lazily-evaluated SELECT.
+ *
+ * Notes about snapshot management:
+ *
+ * In a read-only function, we just use the surrounding query's snapshot.
+ *
+ * In a non-read-only function, we rely on the fact that we'll never
+ * suspend execution between queries of the function: the only reason to
+ * suspend execution before completion is if we are returning a row from
+ * a lazily-evaluated SELECT. So, when first entering this loop, we'll
+ * either start a new query (and push a fresh snapshot) or re-establish
+ * the active snapshot from the existing query descriptor. If we need to
+ * start a new query in a subsequent execution of the loop, either we need
+ * a fresh snapshot (and pushed_snapshot is false) or the existing
+ * snapshot is on the active stack and we can just bump its command ID.
*/
+ pushed_snapshot = false;
while (es)
{
bool completed;
if (es->status == F_EXEC_START)
+ {
+ /*
+ * If not read-only, be sure to advance the command counter for
+ * each command, so that all work to date in this transaction is
+ * visible. Take a new snapshot if we don't have one yet,
+ * otherwise just bump the command ID in the existing snapshot.
+ */
+ if (!fcache->readonly_func)
+ {
+ CommandCounterIncrement();
+ if (!pushed_snapshot)
+ {
+ PushActiveSnapshot(GetTransactionSnapshot());
+ pushed_snapshot = true;
+ }
+ else
+ UpdateActiveSnapshotCommandId();
+ }
+
postquel_start(es, fcache);
+ }
+ else if (!fcache->readonly_func && !pushed_snapshot)
+ {
+ /* Re-establish active snapshot when re-entering function */
+ PushActiveSnapshot(es->qd->snapshot);
+ pushed_snapshot = true;
+ }
completed = postquel_getnext(es, fcache);
@@ -730,7 +817,31 @@ fmgr_sql(PG_FUNCTION_ARGS)
*/
if (es->status != F_EXEC_DONE)
break;
+
+ /*
+ * Advance to next execution_state, which might be in the next list.
+ */
es = es->next;
+ while (!es)
+ {
+ eslc = lnext(eslc);
+ if (!eslc)
+ break; /* end of function */
+
+ es = (execution_state *) lfirst(eslc);
+
+ /*
+ * Flush the current snapshot so that we will take a new one
+ * for the new query list. This ensures that new snaps are
+ * taken at original-query boundaries, matching the behavior
+ * of interactive execution.
+ */
+ if (pushed_snapshot)
+ {
+ PopActiveSnapshot();
+ pushed_snapshot = false;
+ }
+ }
}
/*
@@ -857,17 +968,24 @@ fmgr_sql(PG_FUNCTION_ARGS)
tuplestore_clear(fcache->tstore);
}
+ /* Pop snapshot if we have pushed one */
+ if (pushed_snapshot)
+ PopActiveSnapshot();
+
/*
* If we've gone through every command in the function, we are done. Reset
* the execution states to start over again on next call.
*/
if (es == NULL)
{
- es = fcache->func_state;
- while (es)
+ foreach(eslc, fcache->func_state)
{
- es->status = F_EXEC_START;
- es = es->next;
+ es = (execution_state *) lfirst(eslc);
+ while (es)
+ {
+ es->status = F_EXEC_START;
+ es = es->next;
+ }
}
}
@@ -918,18 +1036,25 @@ sql_exec_error_callback(void *arg)
{
execution_state *es;
int query_num;
+ ListCell *lc;
- es = fcache->func_state;
+ es = NULL;
query_num = 1;
- while (es)
+ foreach(lc, fcache->func_state)
{
- if (es->qd)
+ es = (execution_state *) lfirst(lc);
+ while (es)
{
- errcontext("SQL function \"%s\" statement %d",
- fcache->fname, query_num);
- break;
+ if (es->qd)
+ {
+ errcontext("SQL function \"%s\" statement %d",
+ fcache->fname, query_num);
+ break;
+ }
+ es = es->next;
}
- es = es->next;
+ if (es)
+ break;
query_num++;
}
if (es == NULL)
@@ -961,16 +1086,31 @@ static void
ShutdownSQLFunction(Datum arg)
{
SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg);
- execution_state *es = fcache->func_state;
+ execution_state *es;
+ ListCell *lc;
- while (es != NULL)
+ foreach(lc, fcache->func_state)
{
- /* Shut down anything still running */
- if (es->status == F_EXEC_RUN)
- postquel_end(es);
- /* Reset states to START in case we're called again */
- es->status = F_EXEC_START;
- es = es->next;
+ es = (execution_state *) lfirst(lc);
+ while (es)
+ {
+ /* Shut down anything still running */
+ if (es->status == F_EXEC_RUN)
+ {
+ /* Re-establish active snapshot for any called functions */
+ if (!fcache->readonly_func)
+ PushActiveSnapshot(es->qd->snapshot);
+
+ postquel_end(es);
+
+ if (!fcache->readonly_func)
+ PopActiveSnapshot();
+ }
+
+ /* Reset states to START in case we're called again */
+ es->status = F_EXEC_START;
+ es = es->next;
+ }
}
/* Release tuplestore if we have one */