diff options
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r-- | src/backend/executor/execParallel.c | 54 |
1 files changed, 44 insertions, 10 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index ce47f1d4a8b..ad9eba63dd3 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -47,17 +47,26 @@ * greater than any 32-bit integer here so that values < 2^32 can be used * by individual parallel nodes to store their own state. */ -#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000001) -#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000002) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000003) -#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004) -#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005) -#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006) -#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000007) +#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001) +#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002) +#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000003) +#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004) +#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006) +#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007) +#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 /* + * Fixed-size random stuff that we need to pass to parallel workers. + */ +typedef struct FixedParallelExecutorState +{ + int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */ +} FixedParallelExecutorState; + +/* * DSM structure for accumulating per-PlanState instrumentation. * * instrument_options: Same meaning here as in instrument.c. @@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei) * execution and return results to the main backend. */ ParallelExecutorInfo * -ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) +ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, + int64 tuples_needed) { ParallelExecutorInfo *pei; ParallelContext *pcxt; ExecParallelEstimateContext e; ExecParallelInitializeDSMContext d; + FixedParallelExecutorState *fpes; char *pstmt_data; char *pstmt_space; char *param_space; @@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) * for the various things we need to store. */ + /* Estimate space for fixed-size state. */ + shm_toc_estimate_chunk(&pcxt->estimator, + sizeof(FixedParallelExecutorState)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for query text. */ query_len = strlen(estate->es_sourceText); shm_toc_estimate_chunk(&pcxt->estimator, query_len); @@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) * asked for has been allocated or initialized yet, though, so do that. */ + /* Store fixed-size state. */ + fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState)); + fpes->tuples_needed = tuples_needed; + shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes); + /* Store query string */ query_string = shm_toc_allocate(pcxt->toc, query_len); memcpy(query_string, estate->es_sourceText, query_len); @@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) void ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { + FixedParallelExecutorState *fpes; BufferUsage *buffer_usage; DestReceiver *receiver; QueryDesc *queryDesc; @@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) void *area_space; dsa_area *area; + /* Get fixed-size state. */ + fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false); + /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */ receiver = ExecParallelGetReceiver(seg, toc); instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true); @@ -868,8 +893,17 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) queryDesc->planstate->state->es_query_dsa = area; ExecParallelInitializeWorker(queryDesc->planstate, toc); - /* Run the plan */ - ExecutorRun(queryDesc, ForwardScanDirection, 0L, true); + /* Pass down any tuple bound */ + ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate); + + /* + * Run the plan. If we specified a tuple bound, be careful not to demand + * more tuples than that. + */ + ExecutorRun(queryDesc, + ForwardScanDirection, + fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed, + true); /* Shut down the executor */ ExecutorFinish(queryDesc); |