diff options
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r-- | src/backend/executor/execParallel.c | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index c93084e4d2a..838ec842991 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -37,6 +37,7 @@ #include "executor/nodeSort.h" #include "executor/nodeSubplan.h" #include "executor/tqueue.h" +#include "jit/jit.h" #include "nodes/nodeFuncs.h" #include "optimizer/planmain.h" #include "optimizer/planner.h" @@ -62,6 +63,7 @@ #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006) #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) +#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -573,9 +575,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *paramlistinfo_space; BufferUsage *bufusage_space; SharedExecutorInstrumentation *instrumentation = NULL; + SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; int paramlistinfo_len; int instrumentation_len = 0; + int jit_instrumentation_len = 0; int instrument_offset = 0; Size dsa_minsize = dsa_minimum_size(); char *query_string; @@ -669,6 +673,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, mul_size(e.nnodes, nworkers)); shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len); shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate space for JIT instrumentation, if required. */ + if (estate->es_jit_flags != PGJIT_NONE) + { + jit_instrumentation_len = + offsetof(SharedJitInstrumentation, jit_instr) + + sizeof(JitInstrumentation) * nworkers; + shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } } /* Estimate space for DSA area. */ @@ -742,6 +756,18 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instrumentation); pei->instrumentation = instrumentation; + + if (estate->es_jit_flags != PGJIT_NONE) + { + jit_instrumentation = shm_toc_allocate(pcxt->toc, + jit_instrumentation_len); + jit_instrumentation->num_workers = nworkers; + memset(jit_instrumentation->jit_instr, 0, + sizeof(JitInstrumentation) * nworkers); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION, + jit_instrumentation); + pei->jit_instrumentation = jit_instrumentation; + } } /* @@ -1004,6 +1030,46 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, } /* + * Add up the workers' JIT instrumentation from dynamic shared memory. + */ +static void +ExecParallelRetrieveJitInstrumentation(PlanState *planstate, + SharedJitInstrumentation *shared_jit) +{ + JitInstrumentation *combined; + int ibytes; + + int n; + + /* + * Accumulate worker JIT instrumentation into the combined JIT + * instrumentation, allocating it if required. Note this is kept separate + * from the leader's own instrumentation. + */ + if (!planstate->state->es_jit_combined_instr) + planstate->state->es_jit_combined_instr = + MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation)); + combined = planstate->state->es_jit_combined_instr; + + /* Accummulate all the workers' instrumentations. */ + for (n = 0; n < shared_jit->num_workers; ++n) + InstrJitAgg(combined, &shared_jit->jit_instr[n]); + + /* + * Store the per-worker detail. + * + * Similar to ExecParallelRetrieveInstrumentation(), allocate the + * instrumentation in per-query context. + */ + ibytes = offsetof(SharedJitInstrumentation, jit_instr) + + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation)); + planstate->worker_jit_instrument = + MemoryContextAlloc(planstate->state->es_query_cxt, ibytes); + + memcpy(planstate->worker_jit_instrument, shared_jit, ibytes); +} + +/* * Finish parallel execution. We wait for parallel workers to finish, and * accumulate their buffer usage. */ @@ -1068,6 +1134,11 @@ ExecParallelCleanup(ParallelExecutorInfo *pei) ExecParallelRetrieveInstrumentation(pei->planstate, pei->instrumentation); + /* Accumulate JIT instrumentation, if any. */ + if (pei->jit_instrumentation) + ExecParallelRetrieveJitInstrumentation(pei->planstate, + pei->jit_instrumentation); + /* Free any serialized parameters. */ if (DsaPointerIsValid(pei->param_exec)) { @@ -1274,6 +1345,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) DestReceiver *receiver; QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; + SharedJitInstrumentation *jit_instrumentation; int instrument_options = 0; void *area_space; dsa_area *area; @@ -1287,6 +1359,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true); if (instrumentation != NULL) instrument_options = instrumentation->instrument_options; + jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION, + true); queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options); /* Setting debug_query_string for individual workers */ @@ -1350,6 +1424,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecParallelReportInstrumentation(queryDesc->planstate, instrumentation); + /* Report JIT instrumentation data if any */ + if (queryDesc->estate->es_jit && jit_instrumentation != NULL) + { + Assert(ParallelWorkerNumber < jit_instrumentation->num_workers); + jit_instrumentation->jit_instr[ParallelWorkerNumber] = + queryDesc->estate->es_jit->instr; + } + /* Must do this after capturing instrumentation. */ ExecutorEnd(queryDesc); |