diff options
Diffstat (limited to 'src/backend/executor/functions.c')
-rw-r--r-- | src/backend/executor/functions.c | 346 |
1 files changed, 243 insertions, 103 deletions
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c index d5385438e3b..a2f4fac74cb 100644 --- a/src/backend/executor/functions.c +++ b/src/backend/executor/functions.c @@ -47,6 +47,10 @@ typedef struct * We have an execution_state record for each query in a function. Each * record contains a plantree for its query. If the query is currently in * F_EXEC_RUN state then there's a QueryDesc too. + * + * The "next" fields chain together all the execution_state records generated + * from a single original parsetree. (There will only be more than one in + * case of rule expansion of the original parsetree.) */ typedef enum { @@ -93,15 +97,20 @@ typedef struct JunkFilter *junkFilter; /* will be NULL if function returns VOID */ - /* head of linked list of execution_state records */ - execution_state *func_state; + /* + * func_state is a List of execution_state records, each of which is the + * first for its original parsetree, with any additional records chained + * to it via the "next" fields. This sublist structure is needed to keep + * track of where the original query boundaries are. + */ + List *func_state; } SQLFunctionCache; typedef SQLFunctionCache *SQLFunctionCachePtr; /* non-export function prototypes */ -static execution_state *init_execution_state(List *queryTree_list, +static List *init_execution_state(List *queryTree_list, SQLFunctionCachePtr fcache, bool lazyEvalOK); static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK); @@ -122,62 +131,78 @@ static void sqlfunction_shutdown(DestReceiver *self); static void sqlfunction_destroy(DestReceiver *self); -/* Set up the list of per-query execution_state records for a SQL function */ -static execution_state * +/* + * Set up the per-query execution_state records for a SQL function. + * + * The input is a List of Lists of parsed and rewritten, but not planned, + * querytrees. The sublist structure denotes the original query boundaries. + */ +static List * init_execution_state(List *queryTree_list, SQLFunctionCachePtr fcache, bool lazyEvalOK) { - execution_state *firstes = NULL; - execution_state *preves = NULL; + List *eslist = NIL; execution_state *lasttages = NULL; - ListCell *qtl_item; + ListCell *lc1; - foreach(qtl_item, queryTree_list) + foreach(lc1, queryTree_list) { - Query *queryTree = (Query *) lfirst(qtl_item); - Node *stmt; - execution_state *newes; + List *qtlist = (List *) lfirst(lc1); + execution_state *firstes = NULL; + execution_state *preves = NULL; + ListCell *lc2; - Assert(IsA(queryTree, Query)); + foreach(lc2, qtlist) + { + Query *queryTree = (Query *) lfirst(lc2); + Node *stmt; + execution_state *newes; - if (queryTree->commandType == CMD_UTILITY) - stmt = queryTree->utilityStmt; - else - stmt = (Node *) pg_plan_query(queryTree, 0, NULL); + Assert(IsA(queryTree, Query)); - /* Precheck all commands for validity in a function */ - if (IsA(stmt, TransactionStmt)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - /* translator: %s is a SQL statement name */ - errmsg("%s is not allowed in a SQL function", - CreateCommandTag(stmt)))); + /* Plan the query if needed */ + if (queryTree->commandType == CMD_UTILITY) + stmt = queryTree->utilityStmt; + else + stmt = (Node *) pg_plan_query(queryTree, 0, NULL); - if (fcache->readonly_func && !CommandIsReadOnly(stmt)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - /* translator: %s is a SQL statement name */ - errmsg("%s is not allowed in a non-volatile function", - CreateCommandTag(stmt)))); + /* Precheck all commands for validity in a function */ + if (IsA(stmt, TransactionStmt)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + /* translator: %s is a SQL statement name */ + errmsg("%s is not allowed in a SQL function", + CreateCommandTag(stmt)))); - newes = (execution_state *) palloc(sizeof(execution_state)); - if (preves) - preves->next = newes; - else - firstes = newes; + if (fcache->readonly_func && !CommandIsReadOnly(stmt)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + /* translator: %s is a SQL statement name */ + errmsg("%s is not allowed in a non-volatile function", + CreateCommandTag(stmt)))); + + /* OK, build the execution_state for this query */ + newes = (execution_state *) palloc(sizeof(execution_state)); + if (preves) + preves->next = newes; + else + firstes = newes; - newes->next = NULL; - newes->status = F_EXEC_START; - newes->setsResult = false; /* might change below */ - newes->lazyEval = false; /* might change below */ - newes->stmt = stmt; - newes->qd = NULL; + newes->next = NULL; + newes->status = F_EXEC_START; + newes->setsResult = false; /* might change below */ + newes->lazyEval = false; /* might change below */ + newes->stmt = stmt; + newes->qd = NULL; - if (queryTree->canSetTag) - lasttages = newes; + if (queryTree->canSetTag) + lasttages = newes; - preves = newes; + preves = newes; + } + + eslist = lappend(eslist, firstes); } /* @@ -211,7 +236,7 @@ init_execution_state(List *queryTree_list, } } - return firstes; + return eslist; } /* Initialize the SQLFunctionCache for a SQL function */ @@ -225,7 +250,10 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) SQLFunctionCachePtr fcache; Oid *argOidVect; int nargs; + List *raw_parsetree_list; List *queryTree_list; + List *flat_query_list; + ListCell *lc; Datum tmp; bool isNull; @@ -318,9 +346,32 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) fcache->src = TextDatumGetCString(tmp); /* - * Parse and rewrite the queries in the function text. + * Parse and rewrite the queries in the function text. Use sublists to + * keep track of the original query boundaries. But we also build a + * "flat" list of the rewritten queries to pass to check_sql_fn_retval. + * This is because the last canSetTag query determines the result type + * independently of query boundaries --- and it might not be in the last + * sublist, for example if the last query rewrites to DO INSTEAD NOTHING. + * (It might not be unreasonable to throw an error in such a case, but + * this is the historical behavior and it doesn't seem worth changing.) */ - queryTree_list = pg_parse_and_rewrite(fcache->src, argOidVect, nargs); + raw_parsetree_list = pg_parse_query(fcache->src); + + queryTree_list = NIL; + flat_query_list = NIL; + foreach(lc, raw_parsetree_list) + { + Node *parsetree = (Node *) lfirst(lc); + List *queryTree_sublist; + + queryTree_sublist = pg_analyze_and_rewrite(parsetree, + fcache->src, + argOidVect, + nargs); + queryTree_list = lappend(queryTree_list, queryTree_sublist); + flat_query_list = list_concat(flat_query_list, + list_copy(queryTree_sublist)); + } /* * Check that the function returns the type it claims to. Although in @@ -343,7 +394,7 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) */ fcache->returnsTuple = check_sql_fn_retval(foid, rettype, - queryTree_list, + flat_query_list, NULL, &fcache->junkFilter); @@ -375,24 +426,12 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) static void postquel_start(execution_state *es, SQLFunctionCachePtr fcache) { - Snapshot snapshot; DestReceiver *dest; Assert(es->qd == NULL); - /* - * In a read-only function, use the surrounding query's snapshot; - * otherwise take a new snapshot for each query. The snapshot should - * include a fresh command ID so that all work to date in this transaction - * is visible. - */ - if (fcache->readonly_func) - snapshot = GetActiveSnapshot(); - else - { - CommandCounterIncrement(); - snapshot = GetTransactionSnapshot(); - } + /* Caller should have ensured a suitable snapshot is active */ + Assert(ActiveSnapshotSet()); /* * If this query produces the function result, send its output to the @@ -416,18 +455,17 @@ postquel_start(execution_state *es, SQLFunctionCachePtr fcache) if (IsA(es->stmt, PlannedStmt)) es->qd = CreateQueryDesc((PlannedStmt *) es->stmt, fcache->src, - snapshot, InvalidSnapshot, + GetActiveSnapshot(), + InvalidSnapshot, dest, fcache->paramLI, 0); else es->qd = CreateUtilityQueryDesc(es->stmt, fcache->src, - snapshot, + GetActiveSnapshot(), dest, fcache->paramLI); - /* We assume we don't need to set up ActiveSnapshot for ExecutorStart */ - /* Utility commands don't need Executor. */ if (es->qd->utilitystmt == NULL) { @@ -457,9 +495,6 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache) { bool result; - /* Make our snapshot the active one for any called functions */ - PushActiveSnapshot(es->qd->snapshot); - if (es->qd->utilitystmt) { /* ProcessUtility needs the PlannedStmt for DECLARE CURSOR */ @@ -487,8 +522,6 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache) result = (count == 0L || es->qd->estate->es_processed == 0); } - PopActiveSnapshot(); - return result; } @@ -502,13 +535,8 @@ postquel_end(execution_state *es) /* Utility commands don't need Executor. */ if (es->qd->utilitystmt == NULL) { - /* Make our snapshot the active one for any called functions */ - PushActiveSnapshot(es->qd->snapshot); - ExecutorFinish(es->qd); ExecutorEnd(es->qd); - - PopActiveSnapshot(); } (*es->qd->dest->rDestroy) (es->qd->dest); @@ -619,9 +647,13 @@ fmgr_sql(PG_FUNCTION_ARGS) ErrorContextCallback sqlerrcontext; bool randomAccess; bool lazyEvalOK; + bool is_first; + bool pushed_snapshot; execution_state *es; TupleTableSlot *slot; Datum result; + List *eslist; + ListCell *eslc; /* * Switch to context in which the fcache lives. This ensures that @@ -673,13 +705,33 @@ fmgr_sql(PG_FUNCTION_ARGS) init_sql_fcache(fcinfo->flinfo, lazyEvalOK); fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra; } - es = fcache->func_state; + eslist = fcache->func_state; + + /* + * Find first unfinished query in function, and note whether it's the + * first query. + */ + es = NULL; + is_first = true; + foreach(eslc, eslist) + { + es = (execution_state *) lfirst(eslc); + + while (es && es->status == F_EXEC_DONE) + { + is_first = false; + es = es->next; + } + + if (es) + break; + } /* * Convert params to appropriate format if starting a fresh execution. (If * continuing execution, we can re-use prior params.) */ - if (es && es->status == F_EXEC_START) + if (is_first && es && es->status == F_EXEC_START) postquel_sub_params(fcache, fcinfo); /* @@ -690,21 +742,56 @@ fmgr_sql(PG_FUNCTION_ARGS) fcache->tstore = tuplestore_begin_heap(randomAccess, false, work_mem); /* - * Find first unfinished query in function. - */ - while (es && es->status == F_EXEC_DONE) - es = es->next; - - /* * Execute each command in the function one after another until we either * run out of commands or get a result row from a lazily-evaluated SELECT. + * + * Notes about snapshot management: + * + * In a read-only function, we just use the surrounding query's snapshot. + * + * In a non-read-only function, we rely on the fact that we'll never + * suspend execution between queries of the function: the only reason to + * suspend execution before completion is if we are returning a row from + * a lazily-evaluated SELECT. So, when first entering this loop, we'll + * either start a new query (and push a fresh snapshot) or re-establish + * the active snapshot from the existing query descriptor. If we need to + * start a new query in a subsequent execution of the loop, either we need + * a fresh snapshot (and pushed_snapshot is false) or the existing + * snapshot is on the active stack and we can just bump its command ID. */ + pushed_snapshot = false; while (es) { bool completed; if (es->status == F_EXEC_START) + { + /* + * If not read-only, be sure to advance the command counter for + * each command, so that all work to date in this transaction is + * visible. Take a new snapshot if we don't have one yet, + * otherwise just bump the command ID in the existing snapshot. + */ + if (!fcache->readonly_func) + { + CommandCounterIncrement(); + if (!pushed_snapshot) + { + PushActiveSnapshot(GetTransactionSnapshot()); + pushed_snapshot = true; + } + else + UpdateActiveSnapshotCommandId(); + } + postquel_start(es, fcache); + } + else if (!fcache->readonly_func && !pushed_snapshot) + { + /* Re-establish active snapshot when re-entering function */ + PushActiveSnapshot(es->qd->snapshot); + pushed_snapshot = true; + } completed = postquel_getnext(es, fcache); @@ -730,7 +817,31 @@ fmgr_sql(PG_FUNCTION_ARGS) */ if (es->status != F_EXEC_DONE) break; + + /* + * Advance to next execution_state, which might be in the next list. + */ es = es->next; + while (!es) + { + eslc = lnext(eslc); + if (!eslc) + break; /* end of function */ + + es = (execution_state *) lfirst(eslc); + + /* + * Flush the current snapshot so that we will take a new one + * for the new query list. This ensures that new snaps are + * taken at original-query boundaries, matching the behavior + * of interactive execution. + */ + if (pushed_snapshot) + { + PopActiveSnapshot(); + pushed_snapshot = false; + } + } } /* @@ -857,17 +968,24 @@ fmgr_sql(PG_FUNCTION_ARGS) tuplestore_clear(fcache->tstore); } + /* Pop snapshot if we have pushed one */ + if (pushed_snapshot) + PopActiveSnapshot(); + /* * If we've gone through every command in the function, we are done. Reset * the execution states to start over again on next call. */ if (es == NULL) { - es = fcache->func_state; - while (es) + foreach(eslc, fcache->func_state) { - es->status = F_EXEC_START; - es = es->next; + es = (execution_state *) lfirst(eslc); + while (es) + { + es->status = F_EXEC_START; + es = es->next; + } } } @@ -918,18 +1036,25 @@ sql_exec_error_callback(void *arg) { execution_state *es; int query_num; + ListCell *lc; - es = fcache->func_state; + es = NULL; query_num = 1; - while (es) + foreach(lc, fcache->func_state) { - if (es->qd) + es = (execution_state *) lfirst(lc); + while (es) { - errcontext("SQL function \"%s\" statement %d", - fcache->fname, query_num); - break; + if (es->qd) + { + errcontext("SQL function \"%s\" statement %d", + fcache->fname, query_num); + break; + } + es = es->next; } - es = es->next; + if (es) + break; query_num++; } if (es == NULL) @@ -961,16 +1086,31 @@ static void ShutdownSQLFunction(Datum arg) { SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg); - execution_state *es = fcache->func_state; + execution_state *es; + ListCell *lc; - while (es != NULL) + foreach(lc, fcache->func_state) { - /* Shut down anything still running */ - if (es->status == F_EXEC_RUN) - postquel_end(es); - /* Reset states to START in case we're called again */ - es->status = F_EXEC_START; - es = es->next; + es = (execution_state *) lfirst(lc); + while (es) + { + /* Shut down anything still running */ + if (es->status == F_EXEC_RUN) + { + /* Re-establish active snapshot for any called functions */ + if (!fcache->readonly_func) + PushActiveSnapshot(es->qd->snapshot); + + postquel_end(es); + + if (!fcache->readonly_func) + PopActiveSnapshot(); + } + + /* Reset states to START in case we're called again */ + es->status = F_EXEC_START; + es = es->next; + } } /* Release tuplestore if we have one */ |