aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execParallel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r--src/backend/executor/execParallel.c54
1 files changed, 44 insertions, 10 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ce47f1d4a8b..ad9eba63dd3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -47,17 +47,26 @@
* greater than any 32-bit integer here so that values < 2^32 can be used
* by individual parallel nodes to store their own state.
*/
-#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
/*
+ * Fixed-size random stuff that we need to pass to parallel workers.
+ */
+typedef struct FixedParallelExecutorState
+{
+ int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
+} FixedParallelExecutorState;
+
+/*
* DSM structure for accumulating per-PlanState instrumentation.
*
* instrument_options: Same meaning here as in instrument.c.
@@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei)
* execution and return results to the main backend.
*/
ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+ int64 tuples_needed)
{
ParallelExecutorInfo *pei;
ParallelContext *pcxt;
ExecParallelEstimateContext e;
ExecParallelInitializeDSMContext d;
+ FixedParallelExecutorState *fpes;
char *pstmt_data;
char *pstmt_space;
char *param_space;
@@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
* for the various things we need to store.
*/
+ /* Estimate space for fixed-size state. */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ sizeof(FixedParallelExecutorState));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/* Estimate space for query text. */
query_len = strlen(estate->es_sourceText);
shm_toc_estimate_chunk(&pcxt->estimator, query_len);
@@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
* asked for has been allocated or initialized yet, though, so do that.
*/
+ /* Store fixed-size state. */
+ fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+ fpes->tuples_needed = tuples_needed;
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
/* Store query string */
query_string = shm_toc_allocate(pcxt->toc, query_len);
memcpy(query_string, estate->es_sourceText, query_len);
@@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
void
ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
+ FixedParallelExecutorState *fpes;
BufferUsage *buffer_usage;
DestReceiver *receiver;
QueryDesc *queryDesc;
@@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
void *area_space;
dsa_area *area;
+ /* Get fixed-size state. */
+ fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
receiver = ExecParallelGetReceiver(seg, toc);
instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
@@ -868,8 +893,17 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
queryDesc->planstate->state->es_query_dsa = area;
ExecParallelInitializeWorker(queryDesc->planstate, toc);
- /* Run the plan */
- ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
+ /* Pass down any tuple bound */
+ ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
+ /*
+ * Run the plan. If we specified a tuple bound, be careful not to demand
+ * more tuples than that.
+ */
+ ExecutorRun(queryDesc,
+ ForwardScanDirection,
+ fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
+ true);
/* Shut down the executor */
ExecutorFinish(queryDesc);