diff options
Diffstat (limited to 'src')
22 files changed, 274 insertions, 115 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index ff03c68fcdb..e29c5ad0868 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1531,21 +1531,6 @@ heap_rescan(HeapScanDesc scan, * reinitialize scan descriptor */ initscan(scan, key, true); - - /* - * reset parallel scan, if present - */ - if (scan->rs_parallel != NULL) - { - ParallelHeapScanDesc parallel_scan; - - /* - * Caller is responsible for making sure that all workers have - * finished the scan before calling this. - */ - parallel_scan = scan->rs_parallel; - pg_atomic_write_u64(¶llel_scan->phs_nallocated, 0); - } } /* ---------------- @@ -1643,6 +1628,19 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, } /* ---------------- + * heap_parallelscan_reinitialize - reset a parallel scan + * + * Call this in the leader process. Caller is responsible for + * making sure that all workers have finished the scan beforehand. + * ---------------- + */ +void +heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan) +{ + pg_atomic_write_u64(¶llel_scan->phs_nallocated, 0); +} + +/* ---------------- * heap_beginscan_parallel - join a parallel scan * * Caller must hold a suitable lock on the correct relation. diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 01316ff5d94..c713b851399 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -119,6 +119,8 @@ static bool ExecParallelInitializeDSM(PlanState *node, ExecParallelInitializeDSMContext *d); static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize); +static bool ExecParallelReInitializeDSM(PlanState *planstate, + ParallelContext *pcxt); static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation); @@ -255,6 +257,8 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) case T_SortState: /* even when not parallel-aware */ ExecSortEstimate((SortState *) planstate, e->pcxt); + break; + default: break; } @@ -325,6 +329,8 @@ ExecParallelInitializeDSM(PlanState *planstate, case T_SortState: /* even when not parallel-aware */ ExecSortInitializeDSM((SortState *) planstate, d->pcxt); + break; + default: break; } @@ -385,18 +391,6 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize) } /* - * Re-initialize the parallel executor info such that it can be reused by - * workers. - */ -void -ExecParallelReinitialize(ParallelExecutorInfo *pei) -{ - ReinitializeParallelDSM(pei->pcxt); - pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); - pei->finished = false; -} - -/* * Sets up the required infrastructure for backend workers to perform * execution and return results to the main backend. */ @@ -599,7 +593,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, ExecParallelInitializeDSM(planstate, &d); /* - * Make sure that the world hasn't shifted under our feat. This could + * Make sure that the world hasn't shifted under our feet. This could * probably just be an Assert(), but let's be conservative for now. */ if (e.nnodes != d.nnodes) @@ -610,6 +604,82 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, } /* + * Re-initialize the parallel executor shared memory state before launching + * a fresh batch of workers. + */ +void +ExecParallelReinitialize(PlanState *planstate, + ParallelExecutorInfo *pei) +{ + /* Old workers must already be shut down */ + Assert(pei->finished); + + ReinitializeParallelDSM(pei->pcxt); + pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); + pei->finished = false; + + /* Traverse plan tree and let each child node reset associated state. */ + ExecParallelReInitializeDSM(planstate, pei->pcxt); +} + +/* + * Traverse plan tree to reinitialize per-node dynamic shared memory state + */ +static bool +ExecParallelReInitializeDSM(PlanState *planstate, + ParallelContext *pcxt) +{ + if (planstate == NULL) + return false; + + /* + * Call reinitializers for DSM-using plan nodes. + */ + switch (nodeTag(planstate)) + { + case T_SeqScanState: + if (planstate->plan->parallel_aware) + ExecSeqScanReInitializeDSM((SeqScanState *) planstate, + pcxt); + break; + case T_IndexScanState: + if (planstate->plan->parallel_aware) + ExecIndexScanReInitializeDSM((IndexScanState *) planstate, + pcxt); + break; + case T_IndexOnlyScanState: + if (planstate->plan->parallel_aware) + ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate, + pcxt); + break; + case T_ForeignScanState: + if (planstate->plan->parallel_aware) + ExecForeignScanReInitializeDSM((ForeignScanState *) planstate, + pcxt); + break; + case T_CustomScanState: + if (planstate->plan->parallel_aware) + ExecCustomScanReInitializeDSM((CustomScanState *) planstate, + pcxt); + break; + case T_BitmapHeapScanState: + if (planstate->plan->parallel_aware) + ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate, + pcxt); + break; + case T_SortState: + /* even when not parallel-aware */ + ExecSortReInitializeDSM((SortState *) planstate, pcxt); + break; + + default: + break; + } + + return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt); +} + +/* * Copy instrumentation information about this node and its descendants from * dynamic shared memory. */ @@ -845,12 +915,13 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) break; case T_BitmapHeapScanState: if (planstate->plan->parallel_aware) - ExecBitmapHeapInitializeWorker( - (BitmapHeapScanState *) planstate, toc); + ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc); break; case T_SortState: /* even when not parallel-aware */ ExecSortInitializeWorker((SortState *) planstate, toc); + break; + default: break; } diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c index 79f534e4e92..f7e55e0b45b 100644 --- a/src/backend/executor/nodeBitmapHeapscan.c +++ b/src/backend/executor/nodeBitmapHeapscan.c @@ -705,23 +705,6 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node) node->shared_tbmiterator = NULL; node->shared_prefetch_iterator = NULL; - /* Reset parallel bitmap state, if present */ - if (node->pstate) - { - dsa_area *dsa = node->ss.ps.state->es_query_dsa; - - node->pstate->state = BM_INITIAL; - - if (DsaPointerIsValid(node->pstate->tbmiterator)) - tbm_free_shared_area(dsa, node->pstate->tbmiterator); - - if (DsaPointerIsValid(node->pstate->prefetch_iterator)) - tbm_free_shared_area(dsa, node->pstate->prefetch_iterator); - - node->pstate->tbmiterator = InvalidDsaPointer; - node->pstate->prefetch_iterator = InvalidDsaPointer; - } - ExecScanReScan(&node->ss); /* @@ -1000,6 +983,31 @@ ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, } /* ---------------------------------------------------------------- + * ExecBitmapHeapReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, + ParallelContext *pcxt) +{ + ParallelBitmapHeapState *pstate = node->pstate; + dsa_area *dsa = node->ss.ps.state->es_query_dsa; + + pstate->state = BM_INITIAL; + + if (DsaPointerIsValid(pstate->tbmiterator)) + tbm_free_shared_area(dsa, pstate->tbmiterator); + + if (DsaPointerIsValid(pstate->prefetch_iterator)) + tbm_free_shared_area(dsa, pstate->prefetch_iterator); + + pstate->tbmiterator = InvalidDsaPointer; + pstate->prefetch_iterator = InvalidDsaPointer; +} + +/* ---------------------------------------------------------------- * ExecBitmapHeapInitializeWorker * * Copy relevant information from TOC into planstate. diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index fb7645b1f46..07dcabef551 100644 --- a/src/backend/executor/nodeCustom.c +++ b/src/backend/executor/nodeCustom.c @@ -195,6 +195,21 @@ ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt) } void +ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->ReInitializeDSMCustomScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false); + methods->ReInitializeDSMCustomScan(node, pcxt, coordinate); + } +} + +void ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) { const CustomExecMethods *methods = node->methods; diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 140e82ef5e4..20892d6d5fb 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -332,7 +332,28 @@ ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) } /* ---------------------------------------------------------------- - * ExecForeignScanInitializeDSM + * ExecForeignScanReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->ReInitializeDSMForeignScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false); + fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate); + } +} + +/* ---------------------------------------------------------------- + * ExecForeignScanInitializeWorker * * Initialization according to the parallel coordination information * ---------------------------------------------------------------- diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 58f88a5724d..f9cf1b2f875 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -153,12 +153,15 @@ ExecGather(PlanState *pstate) { ParallelContext *pcxt; - /* Initialize the workers required to execute Gather node. */ + /* Initialize, or re-initialize, shared state needed by workers. */ if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, gather->num_workers, node->tuples_needed); + else + ExecParallelReinitialize(node->ps.lefttree, + node->pei); /* * Register backend workers. We might not get as many as we @@ -426,7 +429,7 @@ ExecShutdownGather(GatherState *node) /* ---------------------------------------------------------------- * ExecReScanGather * - * Re-initialize the workers and rescans a relation via them. + * Prepare to re-scan the result of a Gather. * ---------------------------------------------------------------- */ void @@ -435,19 +438,12 @@ ExecReScanGather(GatherState *node) Gather *gather = (Gather *) node->ps.plan; PlanState *outerPlan = outerPlanState(node); - /* - * Re-initialize the parallel workers to perform rescan of relation. We - * want to gracefully shutdown all the workers so that they should be able - * to propagate any error or other information to master backend before - * dying. Parallel context will be reused for rescan. - */ + /* Make sure any existing workers are gracefully shut down */ ExecShutdownGatherWorkers(node); + /* Mark node so that shared state will be rebuilt at next call */ node->initialized = false; - if (node->pei) - ExecParallelReinitialize(node->pei); - /* * Set child node's chgParam to tell it that the next scan might deliver a * different set of rows within the leader process. (The overall rowset @@ -459,10 +455,15 @@ ExecReScanGather(GatherState *node) outerPlan->chgParam = bms_add_member(outerPlan->chgParam, gather->rescan_param); - /* - * if chgParam of subnode is not null then plan will be re-scanned by - * first ExecProcNode. + * If chgParam of subnode is not null then plan will be re-scanned by + * first ExecProcNode. Note: because this does nothing if we have a + * rescan_param, it's currently guaranteed that parallel-aware child nodes + * will not see a ReScan call until after they get a ReInitializeDSM call. + * That ordering might not be something to rely on, though. A good rule + * of thumb is that ReInitializeDSM should reset only shared state, ReScan + * should reset only local state, and anything that depends on both of + * those steps being finished must wait until the first ExecProcNode call. */ if (outerPlan->chgParam == NULL) ExecReScan(outerPlan); diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index f50841699c4..0bd5da38b4a 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -187,12 +187,15 @@ ExecGatherMerge(PlanState *pstate) { ParallelContext *pcxt; - /* Initialize data structures for workers. */ + /* Initialize, or re-initialize, shared state needed by workers. */ if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, gm->num_workers, node->tuples_needed); + else + ExecParallelReinitialize(node->ps.lefttree, + node->pei); /* Try to launch workers. */ pcxt = node->pei->pcxt; @@ -321,7 +324,7 @@ ExecShutdownGatherMergeWorkers(GatherMergeState *node) /* ---------------------------------------------------------------- * ExecReScanGatherMerge * - * Re-initialize the workers and rescans a relation via them. + * Prepare to re-scan the result of a GatherMerge. * ---------------------------------------------------------------- */ void @@ -330,20 +333,13 @@ ExecReScanGatherMerge(GatherMergeState *node) GatherMerge *gm = (GatherMerge *) node->ps.plan; PlanState *outerPlan = outerPlanState(node); - /* - * Re-initialize the parallel workers to perform rescan of relation. We - * want to gracefully shutdown all the workers so that they should be able - * to propagate any error or other information to master backend before - * dying. Parallel context will be reused for rescan. - */ + /* Make sure any existing workers are gracefully shut down */ ExecShutdownGatherMergeWorkers(node); + /* Mark node so that shared state will be rebuilt at next call */ node->initialized = false; node->gm_initialized = false; - if (node->pei) - ExecParallelReinitialize(node->pei); - /* * Set child node's chgParam to tell it that the next scan might deliver a * different set of rows within the leader process. (The overall rowset @@ -355,10 +351,15 @@ ExecReScanGatherMerge(GatherMergeState *node) outerPlan->chgParam = bms_add_member(outerPlan->chgParam, gm->rescan_param); - /* - * if chgParam of subnode is not null then plan will be re-scanned by - * first ExecProcNode. + * If chgParam of subnode is not null then plan will be re-scanned by + * first ExecProcNode. Note: because this does nothing if we have a + * rescan_param, it's currently guaranteed that parallel-aware child nodes + * will not see a ReScan call until after they get a ReInitializeDSM call. + * That ordering might not be something to rely on, though. A good rule + * of thumb is that ReInitializeDSM should reset only shared state, ReScan + * should reset only local state, and anything that depends on both of + * those steps being finished must wait until the first ExecProcNode call. */ if (outerPlan->chgParam == NULL) ExecReScan(outerPlan); diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c index fe7ba3f1a4f..5351cb8981e 100644 --- a/src/backend/executor/nodeIndexonlyscan.c +++ b/src/backend/executor/nodeIndexonlyscan.c @@ -25,6 +25,7 @@ * parallel index-only scan * ExecIndexOnlyScanInitializeDSM initialize DSM for parallel * index-only scan + * ExecIndexOnlyScanReInitializeDSM reinitialize DSM for fresh scan * ExecIndexOnlyScanInitializeWorker attach to DSM info in parallel worker */ #include "postgres.h" @@ -336,16 +337,6 @@ ExecIndexOnlyScan(PlanState *pstate) void ExecReScanIndexOnlyScan(IndexOnlyScanState *node) { - bool reset_parallel_scan = true; - - /* - * If we are here to just update the scan keys, then don't reset parallel - * scan. For detailed reason behind this look in the comments for - * ExecReScanIndexScan. - */ - if (node->ioss_NumRuntimeKeys != 0 && !node->ioss_RuntimeKeysReady) - reset_parallel_scan = false; - /* * If we are doing runtime key calculations (ie, any of the index key * values weren't simple Consts), compute the new key values. But first, @@ -366,15 +357,10 @@ ExecReScanIndexOnlyScan(IndexOnlyScanState *node) /* reset index scan */ if (node->ioss_ScanDesc) - { - index_rescan(node->ioss_ScanDesc, node->ioss_ScanKeys, node->ioss_NumScanKeys, node->ioss_OrderByKeys, node->ioss_NumOrderByKeys); - if (reset_parallel_scan && node->ioss_ScanDesc->parallel_scan) - index_parallelrescan(node->ioss_ScanDesc); - } ExecScanReScan(&node->ss); } @@ -672,6 +658,19 @@ ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node, } /* ---------------------------------------------------------------- + * ExecIndexOnlyScanReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, + ParallelContext *pcxt) +{ + index_parallelrescan(node->ioss_ScanDesc); +} + +/* ---------------------------------------------------------------- * ExecIndexOnlyScanInitializeWorker * * Copy relevant information from TOC into planstate. diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c index 404076d5930..638b17b07cb 100644 --- a/src/backend/executor/nodeIndexscan.c +++ b/src/backend/executor/nodeIndexscan.c @@ -24,6 +24,7 @@ * ExecIndexRestrPos restores scan position. * ExecIndexScanEstimate estimates DSM space needed for parallel index scan * ExecIndexScanInitializeDSM initialize DSM for parallel indexscan + * ExecIndexScanReInitializeDSM reinitialize DSM for fresh scan * ExecIndexScanInitializeWorker attach to DSM info in parallel worker */ #include "postgres.h" @@ -577,18 +578,6 @@ ExecIndexScan(PlanState *pstate) void ExecReScanIndexScan(IndexScanState *node) { - bool reset_parallel_scan = true; - - /* - * If we are here to just update the scan keys, then don't reset parallel - * scan. We don't want each of the participating process in the parallel - * scan to update the shared parallel scan state at the start of the scan. - * It is quite possible that one of the participants has already begun - * scanning the index when another has yet to start it. - */ - if (node->iss_NumRuntimeKeys != 0 && !node->iss_RuntimeKeysReady) - reset_parallel_scan = false; - /* * If we are doing runtime key calculations (ie, any of the index key * values weren't simple Consts), compute the new key values. But first, @@ -614,21 +603,11 @@ ExecReScanIndexScan(IndexScanState *node) reorderqueue_pop(node); } - /* - * Reset (parallel) index scan. For parallel-aware nodes, the scan - * descriptor is initialized during actual execution of node and we can - * reach here before that (ex. during execution of nest loop join). So, - * avoid updating the scan descriptor at that time. - */ + /* reset index scan */ if (node->iss_ScanDesc) - { index_rescan(node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys, node->iss_NumOrderByKeys); - - if (reset_parallel_scan && node->iss_ScanDesc->parallel_scan) - index_parallelrescan(node->iss_ScanDesc); - } node->iss_ReachedEnd = false; ExecScanReScan(&node->ss); @@ -1717,6 +1696,19 @@ ExecIndexScanInitializeDSM(IndexScanState *node, } /* ---------------------------------------------------------------- + * ExecIndexScanReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecIndexScanReInitializeDSM(IndexScanState *node, + ParallelContext *pcxt) +{ + index_parallelrescan(node->iss_ScanDesc); +} + +/* ---------------------------------------------------------------- * ExecIndexScanInitializeWorker * * Copy relevant information from TOC into planstate. diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 5c49d4ca8a9..d4ac939c9b0 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -22,6 +22,7 @@ * * ExecSeqScanEstimate estimates DSM space needed for parallel scan * ExecSeqScanInitializeDSM initialize DSM for parallel scan + * ExecSeqScanReInitializeDSM reinitialize DSM for fresh parallel scan * ExecSeqScanInitializeWorker attach to DSM info in parallel worker */ #include "postgres.h" @@ -325,6 +326,21 @@ ExecSeqScanInitializeDSM(SeqScanState *node, } /* ---------------------------------------------------------------- + * ExecSeqScanReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecSeqScanReInitializeDSM(SeqScanState *node, + ParallelContext *pcxt) +{ + HeapScanDesc scan = node->ss.ss_currentScanDesc; + + heap_parallelscan_reinitialize(scan->rs_parallel); +} + +/* ---------------------------------------------------------------- * ExecSeqScanInitializeWorker * * Copy relevant information from TOC into planstate. diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c index 66ef109c122..98bcaeb66f5 100644 --- a/src/backend/executor/nodeSort.c +++ b/src/backend/executor/nodeSort.c @@ -397,6 +397,23 @@ ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt) } /* ---------------------------------------------------------------- + * ExecSortReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt) +{ + /* If there's any instrumentation space, clear it for next time */ + if (node->shared_info != NULL) + { + memset(node->shared_info->sinstrument, 0, + node->shared_info->num_workers * sizeof(TuplesortInstrumentation)); + } +} + +/* ---------------------------------------------------------------- * ExecSortInitializeWorker * * Attach worker to DSM space for sort statistics. diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index b2132e723ed..4e41024e926 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -130,6 +130,7 @@ extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction); extern Size heap_parallelscan_estimate(Snapshot snapshot); extern void heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, Snapshot snapshot); +extern void heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan); extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc); extern bool heap_fetch(Relation relation, Snapshot snapshot, diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 79b886706f7..1cb895d8984 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -36,7 +36,8 @@ extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, int64 tuples_needed); extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei); -extern void ExecParallelReinitialize(ParallelExecutorInfo *pei); +extern void ExecParallelReinitialize(PlanState *planstate, + ParallelExecutorInfo *pei); extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); diff --git a/src/include/executor/nodeBitmapHeapscan.h b/src/include/executor/nodeBitmapHeapscan.h index c77694cf22f..10844a405a5 100644 --- a/src/include/executor/nodeBitmapHeapscan.h +++ b/src/include/executor/nodeBitmapHeapscan.h @@ -24,6 +24,8 @@ extern void ExecBitmapHeapEstimate(BitmapHeapScanState *node, ParallelContext *pcxt); extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, ParallelContext *pcxt); +extern void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, + ParallelContext *pcxt); extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc); diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h index a1cc63ae1f3..25767b6a4a5 100644 --- a/src/include/executor/nodeCustom.h +++ b/src/include/executor/nodeCustom.h @@ -34,6 +34,8 @@ extern void ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt); extern void ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt); +extern void ExecCustomScanReInitializeDSM(CustomScanState *node, + ParallelContext *pcxt); extern void ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc); extern void ExecShutdownCustomScan(CustomScanState *node); diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index 0b662597d8f..0354c2c4308 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -25,6 +25,8 @@ extern void ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt); extern void ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt); +extern void ExecForeignScanReInitializeDSM(ForeignScanState *node, + ParallelContext *pcxt); extern void ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc); extern void ExecShutdownForeignScan(ForeignScanState *node); diff --git a/src/include/executor/nodeIndexonlyscan.h b/src/include/executor/nodeIndexonlyscan.h index c8a709c26ed..690b5dbfe59 100644 --- a/src/include/executor/nodeIndexonlyscan.h +++ b/src/include/executor/nodeIndexonlyscan.h @@ -28,6 +28,8 @@ extern void ExecIndexOnlyScanEstimate(IndexOnlyScanState *node, ParallelContext *pcxt); extern void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node, ParallelContext *pcxt); +extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, + ParallelContext *pcxt); extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, shm_toc *toc); diff --git a/src/include/executor/nodeIndexscan.h b/src/include/executor/nodeIndexscan.h index 1668e347eef..0670e87e395 100644 --- a/src/include/executor/nodeIndexscan.h +++ b/src/include/executor/nodeIndexscan.h @@ -24,6 +24,7 @@ extern void ExecIndexRestrPos(IndexScanState *node); extern void ExecReScanIndexScan(IndexScanState *node); extern void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt); extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt); +extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt); extern void ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc); /* diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h index 0fba79f8de6..eb96799cade 100644 --- a/src/include/executor/nodeSeqscan.h +++ b/src/include/executor/nodeSeqscan.h @@ -24,6 +24,7 @@ extern void ExecReScanSeqScan(SeqScanState *node); /* parallel scan support */ extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt); +extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc); #endif /* NODESEQSCAN_H */ diff --git a/src/include/executor/nodeSort.h b/src/include/executor/nodeSort.h index 77ac06597ff..1ab8f767210 100644 --- a/src/include/executor/nodeSort.h +++ b/src/include/executor/nodeSort.h @@ -26,6 +26,7 @@ extern void ExecReScanSort(SortState *node); /* parallel instrumentation support */ extern void ExecSortEstimate(SortState *node, ParallelContext *pcxt); extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt); +extern void ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt); extern void ExecSortInitializeWorker(SortState *node, shm_toc *toc); extern void ExecSortRetrieveInstrumentation(SortState *node); diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index e391f20fb86..ef0fbe6f9c6 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -148,6 +148,9 @@ typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node, typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node, ParallelContext *pcxt, void *coordinate); +typedef void (*ReInitializeDSMForeignScan_function) (ForeignScanState *node, + ParallelContext *pcxt, + void *coordinate); typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node, shm_toc *toc, void *coordinate); @@ -224,6 +227,7 @@ typedef struct FdwRoutine IsForeignScanParallelSafe_function IsForeignScanParallelSafe; EstimateDSMForeignScan_function EstimateDSMForeignScan; InitializeDSMForeignScan_function InitializeDSMForeignScan; + ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan; InitializeWorkerForeignScan_function InitializeWorkerForeignScan; ShutdownForeignScan_function ShutdownForeignScan; } FdwRoutine; diff --git a/src/include/nodes/extensible.h b/src/include/nodes/extensible.h index 7325bf536af..0654e79c7ba 100644 --- a/src/include/nodes/extensible.h +++ b/src/include/nodes/extensible.h @@ -136,6 +136,9 @@ typedef struct CustomExecMethods void (*InitializeDSMCustomScan) (CustomScanState *node, ParallelContext *pcxt, void *coordinate); + void (*ReInitializeDSMCustomScan) (CustomScanState *node, + ParallelContext *pcxt, + void *coordinate); void (*InitializeWorkerCustomScan) (CustomScanState *node, shm_toc *toc, void *coordinate); |