aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execParallel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r--src/backend/executor/execParallel.c82
1 files changed, 82 insertions, 0 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index c93084e4d2a..838ec842991 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -37,6 +37,7 @@
#include "executor/nodeSort.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
+#include "jit/jit.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/planmain.h"
#include "optimizer/planner.h"
@@ -62,6 +63,7 @@
#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
+#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
@@ -573,9 +575,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
char *paramlistinfo_space;
BufferUsage *bufusage_space;
SharedExecutorInstrumentation *instrumentation = NULL;
+ SharedJitInstrumentation *jit_instrumentation = NULL;
int pstmt_len;
int paramlistinfo_len;
int instrumentation_len = 0;
+ int jit_instrumentation_len = 0;
int instrument_offset = 0;
Size dsa_minsize = dsa_minimum_size();
char *query_string;
@@ -669,6 +673,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
mul_size(e.nnodes, nworkers));
shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /* Estimate space for JIT instrumentation, if required. */
+ if (estate->es_jit_flags != PGJIT_NONE)
+ {
+ jit_instrumentation_len =
+ offsetof(SharedJitInstrumentation, jit_instr) +
+ sizeof(JitInstrumentation) * nworkers;
+ shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
}
/* Estimate space for DSA area. */
@@ -742,6 +756,18 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
instrumentation);
pei->instrumentation = instrumentation;
+
+ if (estate->es_jit_flags != PGJIT_NONE)
+ {
+ jit_instrumentation = shm_toc_allocate(pcxt->toc,
+ jit_instrumentation_len);
+ jit_instrumentation->num_workers = nworkers;
+ memset(jit_instrumentation->jit_instr, 0,
+ sizeof(JitInstrumentation) * nworkers);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
+ jit_instrumentation);
+ pei->jit_instrumentation = jit_instrumentation;
+ }
}
/*
@@ -1004,6 +1030,46 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
}
/*
+ * Add up the workers' JIT instrumentation from dynamic shared memory.
+ */
+static void
+ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
+ SharedJitInstrumentation *shared_jit)
+{
+ JitInstrumentation *combined;
+ int ibytes;
+
+ int n;
+
+ /*
+ * Accumulate worker JIT instrumentation into the combined JIT
+ * instrumentation, allocating it if required. Note this is kept separate
+ * from the leader's own instrumentation.
+ */
+ if (!planstate->state->es_jit_combined_instr)
+ planstate->state->es_jit_combined_instr =
+ MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
+ combined = planstate->state->es_jit_combined_instr;
+
+ /* Accummulate all the workers' instrumentations. */
+ for (n = 0; n < shared_jit->num_workers; ++n)
+ InstrJitAgg(combined, &shared_jit->jit_instr[n]);
+
+ /*
+ * Store the per-worker detail.
+ *
+ * Similar to ExecParallelRetrieveInstrumentation(), allocate the
+ * instrumentation in per-query context.
+ */
+ ibytes = offsetof(SharedJitInstrumentation, jit_instr)
+ + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
+ planstate->worker_jit_instrument =
+ MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
+
+ memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
+}
+
+/*
* Finish parallel execution. We wait for parallel workers to finish, and
* accumulate their buffer usage.
*/
@@ -1068,6 +1134,11 @@ ExecParallelCleanup(ParallelExecutorInfo *pei)
ExecParallelRetrieveInstrumentation(pei->planstate,
pei->instrumentation);
+ /* Accumulate JIT instrumentation, if any. */
+ if (pei->jit_instrumentation)
+ ExecParallelRetrieveJitInstrumentation(pei->planstate,
+ pei->jit_instrumentation);
+
/* Free any serialized parameters. */
if (DsaPointerIsValid(pei->param_exec))
{
@@ -1274,6 +1345,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
DestReceiver *receiver;
QueryDesc *queryDesc;
SharedExecutorInstrumentation *instrumentation;
+ SharedJitInstrumentation *jit_instrumentation;
int instrument_options = 0;
void *area_space;
dsa_area *area;
@@ -1287,6 +1359,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
if (instrumentation != NULL)
instrument_options = instrumentation->instrument_options;
+ jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
+ true);
queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
/* Setting debug_query_string for individual workers */
@@ -1350,6 +1424,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
ExecParallelReportInstrumentation(queryDesc->planstate,
instrumentation);
+ /* Report JIT instrumentation data if any */
+ if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
+ {
+ Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
+ jit_instrumentation->jit_instr[ParallelWorkerNumber] =
+ queryDesc->estate->es_jit->instr;
+ }
+
/* Must do this after capturing instrumentation. */
ExecutorEnd(queryDesc);