| author | Robert Haas <rhaas@postgresql.org> | 2017-08-29 13:12:23 -0400 |
|---|---|---|
| committer | Robert Haas <rhaas@postgresql.org> | 2017-08-29 13:16:55 -0400 |
| commit | 3452dc5240da43e833118484e1e9b4894d04431c | |
| tree | d158d5d9630d80bc8a5ca838ba76eede60c77ba9 /src/backend/executor/execParallel.c | |
| parent | ce5dcf54b942a469194ae390730f803b3f3fb928 | |
Push tuple limits through Gather and Gather Merge.
If we only need, say, 10 tuples in total, then we certainly don't need
more than 10 tuples from any single process. Pushing down the limit
lets workers exit early when possible. For Gather Merge, there is
an additional benefit: a Sort immediately below the Gather Merge can
be done as a bounded sort if there is an applicable limit.
Robert Haas and Tom Lane
Discussion: http://postgr.es/m/CA+TgmoYa3QKKrLj5rX7UvGqhH73G1Li4B-EKxrmASaca2tFu9Q@mail.gmail.com
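To make the early-exit behavior concrete, here is a minimal, standalone C sketch (not PostgreSQL code; `run_worker` and its arguments are hypothetical stand-ins) of the convention the patch relies on: a bound of -1 means "no limit", while a non-negative bound lets a worker stop as soon as it has produced that many tuples.

```c
/*
 * Toy, standalone illustration (not PostgreSQL code) of the tuple-bound
 * convention: -1 means "no limit"; a non-negative bound lets a worker
 * exit before scanning everything it could produce.
 */
#include <stdint.h>
#include <stdio.h>

/* Produce up to 'available' tuples, but stop once 'tuples_needed' is met. */
static int64_t
run_worker(int64_t available, int64_t tuples_needed)
{
    int64_t produced = 0;

    while (produced < available)
    {
        /* A non-negative bound allows an early exit. */
        if (tuples_needed >= 0 && produced >= tuples_needed)
            break;
        produced++;             /* stand-in for emitting one tuple */
    }
    return produced;
}

int
main(void)
{
    /* With a LIMIT of 10, each worker emits at most 10 tuples ... */
    printf("bounded:   %lld\n", (long long) run_worker(1000000, 10));
    /* ... while -1 (no limit) makes it run to completion. */
    printf("unbounded: %lld\n", (long long) run_worker(1000000, -1));
    return 0;
}
```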
Diffstat (limited to 'src/backend/executor/execParallel.c')
| -rw-r--r-- | src/backend/executor/execParallel.c | 54 |
|---|---|---|

1 file changed, 44 insertions(+), 10 deletions(-)
```diff
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ce47f1d4a8b..ad9eba63dd3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -47,17 +47,26 @@
  * greater than any 32-bit integer here so that values < 2^32 can be used
  * by individual parallel nodes to store their own state.
  */
-#define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS             UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE       UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT         UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED     UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS             UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE       UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT         UINT64CONST(0xE000000000000008)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE       65536
 
 /*
+ * Fixed-size random stuff that we need to pass to parallel workers.
+ */
+typedef struct FixedParallelExecutorState
+{
+    int64       tuples_needed;  /* tuple bound, see ExecSetTupleBound */
+} FixedParallelExecutorState;
+
+/*
  * DSM structure for accumulating per-PlanState instrumentation.
  *
  * instrument_options: Same meaning here as in instrument.c.
@@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei)
  * execution and return results to the main backend.
  */
 ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+                     int64 tuples_needed)
 {
     ParallelExecutorInfo *pei;
     ParallelContext *pcxt;
     ExecParallelEstimateContext e;
     ExecParallelInitializeDSMContext d;
+    FixedParallelExecutorState *fpes;
     char       *pstmt_data;
     char       *pstmt_space;
     char       *param_space;
@@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
      * for the various things we need to store.
      */
 
+    /* Estimate space for fixed-size state. */
+    shm_toc_estimate_chunk(&pcxt->estimator,
+                           sizeof(FixedParallelExecutorState));
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+
     /* Estimate space for query text. */
     query_len = strlen(estate->es_sourceText);
     shm_toc_estimate_chunk(&pcxt->estimator, query_len);
@@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
      * asked for has been allocated or initialized yet, though, so do that.
      */
 
+    /* Store fixed-size state. */
+    fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+    fpes->tuples_needed = tuples_needed;
+    shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
     /* Store query string */
     query_string = shm_toc_allocate(pcxt->toc, query_len);
     memcpy(query_string, estate->es_sourceText, query_len);
@@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 void
 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 {
+    FixedParallelExecutorState *fpes;
     BufferUsage *buffer_usage;
     DestReceiver *receiver;
     QueryDesc  *queryDesc;
@@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
     void       *area_space;
     dsa_area   *area;
 
+    /* Get fixed-size state. */
+    fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
     /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
     receiver = ExecParallelGetReceiver(seg, toc);
     instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
@@ -868,8 +893,17 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
     queryDesc->planstate->state->es_query_dsa = area;
     ExecParallelInitializeWorker(queryDesc->planstate, toc);
 
-    /* Run the plan */
-    ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
+    /* Pass down any tuple bound */
+    ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
+    /*
+     * Run the plan.  If we specified a tuple bound, be careful not to demand
+     * more tuples than that.
+     */
+    ExecutorRun(queryDesc,
+                ForwardScanDirection,
+                fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
+                true);
 
     /* Shut down the executor */
     ExecutorFinish(queryDesc);
```
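For readers unfamiliar with the shared table-of-contents pattern the hunks above follow, here is a minimal standalone sketch of just the round trip: the leader publishes a fixed-size struct under a well-known key, the worker looks it up by the same key, and a bound of -1 is mapped to the "fetch everything" count of 0, mirroring the ExecutorRun call in the last hunk. The `toy_toc_*` helpers, `KEY_EXECUTOR_FIXED`, and `FixedState` are hypothetical stand-ins, not the real shm_toc API.

```c
/*
 * Minimal standalone sketch (not PostgreSQL code) of the leader/worker
 * handoff of fixed-size executor state through a keyed table of contents.
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define KEY_EXECUTOR_FIXED 1

typedef struct FixedState
{
    int64_t     tuples_needed;  /* -1 means no bound */
} FixedState;

/* A tiny in-process "table of contents": key -> pointer. */
static void *toy_toc[16];

static void *
toy_toc_allocate(size_t size)
{
    return calloc(1, size);     /* stand-in for allocating in shared memory */
}

static void
toy_toc_insert(int key, void *ptr)
{
    toy_toc[key] = ptr;         /* publish under a well-known key */
}

static void *
toy_toc_lookup(int key)
{
    return toy_toc[key];        /* retrieve by the same key */
}

int
main(void)
{
    /* "Leader": allocate, fill, and publish the fixed-size state. */
    FixedState *fpes = toy_toc_allocate(sizeof(FixedState));

    fpes->tuples_needed = 10;
    toy_toc_insert(KEY_EXECUTOR_FIXED, fpes);

    /* "Worker": fetch the state and derive the count to ask for. */
    FixedState *seen = toy_toc_lookup(KEY_EXECUTOR_FIXED);
    int64_t     count = seen->tuples_needed < 0 ? 0 : seen->tuples_needed;

    printf("worker asks for %lld tuples (0 would mean: all of them)\n",
           (long long) count);
    free(fpes);
    return 0;
}
```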