aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/heap/heapam.c28
-rw-r--r--src/backend/executor/execParallel.c101
-rw-r--r--src/backend/executor/nodeBitmapHeapscan.c42
-rw-r--r--src/backend/executor/nodeCustom.c15
-rw-r--r--src/backend/executor/nodeForeignscan.c23
-rw-r--r--src/backend/executor/nodeGather.c29
-rw-r--r--src/backend/executor/nodeGatherMerge.c29
-rw-r--r--src/backend/executor/nodeIndexonlyscan.c29
-rw-r--r--src/backend/executor/nodeIndexscan.c38
-rw-r--r--src/backend/executor/nodeSeqscan.c16
-rw-r--r--src/backend/executor/nodeSort.c17
-rw-r--r--src/include/access/heapam.h1
-rw-r--r--src/include/executor/execParallel.h3
-rw-r--r--src/include/executor/nodeBitmapHeapscan.h2
-rw-r--r--src/include/executor/nodeCustom.h2
-rw-r--r--src/include/executor/nodeForeignscan.h2
-rw-r--r--src/include/executor/nodeIndexonlyscan.h2
-rw-r--r--src/include/executor/nodeIndexscan.h1
-rw-r--r--src/include/executor/nodeSeqscan.h1
-rw-r--r--src/include/executor/nodeSort.h1
-rw-r--r--src/include/foreign/fdwapi.h4
-rw-r--r--src/include/nodes/extensible.h3
22 files changed, 274 insertions, 115 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index ff03c68fcdb..e29c5ad0868 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1531,21 +1531,6 @@ heap_rescan(HeapScanDesc scan,
* reinitialize scan descriptor
*/
initscan(scan, key, true);
-
- /*
- * reset parallel scan, if present
- */
- if (scan->rs_parallel != NULL)
- {
- ParallelHeapScanDesc parallel_scan;
-
- /*
- * Caller is responsible for making sure that all workers have
- * finished the scan before calling this.
- */
- parallel_scan = scan->rs_parallel;
- pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
- }
}
/* ----------------
@@ -1643,6 +1628,19 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
}
/* ----------------
+ * heap_parallelscan_reinitialize - reset a parallel scan
+ *
+ * Call this in the leader process. Caller is responsible for
+ * making sure that all workers have finished the scan beforehand.
+ * ----------------
+ */
+void
+heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan)
+{
+ pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
+}
+
+/* ----------------
* heap_beginscan_parallel - join a parallel scan
*
* Caller must hold a suitable lock on the correct relation.
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 01316ff5d94..c713b851399 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -119,6 +119,8 @@ static bool ExecParallelInitializeDSM(PlanState *node,
ExecParallelInitializeDSMContext *d);
static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
bool reinitialize);
+static bool ExecParallelReInitializeDSM(PlanState *planstate,
+ ParallelContext *pcxt);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);
@@ -255,6 +257,8 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
case T_SortState:
/* even when not parallel-aware */
ExecSortEstimate((SortState *) planstate, e->pcxt);
+ break;
+
default:
break;
}
@@ -325,6 +329,8 @@ ExecParallelInitializeDSM(PlanState *planstate,
case T_SortState:
/* even when not parallel-aware */
ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
+ break;
+
default:
break;
}
@@ -385,18 +391,6 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
}
/*
- * Re-initialize the parallel executor info such that it can be reused by
- * workers.
- */
-void
-ExecParallelReinitialize(ParallelExecutorInfo *pei)
-{
- ReinitializeParallelDSM(pei->pcxt);
- pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
- pei->finished = false;
-}
-
-/*
* Sets up the required infrastructure for backend workers to perform
* execution and return results to the main backend.
*/
@@ -599,7 +593,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
ExecParallelInitializeDSM(planstate, &d);
/*
- * Make sure that the world hasn't shifted under our feat. This could
+ * Make sure that the world hasn't shifted under our feet. This could
* probably just be an Assert(), but let's be conservative for now.
*/
if (e.nnodes != d.nnodes)
@@ -610,6 +604,82 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
}
/*
+ * Re-initialize the parallel executor shared memory state before launching
+ * a fresh batch of workers.
+ */
+void
+ExecParallelReinitialize(PlanState *planstate,
+ ParallelExecutorInfo *pei)
+{
+ /* Old workers must already be shut down */
+ Assert(pei->finished);
+
+ ReinitializeParallelDSM(pei->pcxt);
+ pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+ pei->finished = false;
+
+ /* Traverse plan tree and let each child node reset associated state. */
+ ExecParallelReInitializeDSM(planstate, pei->pcxt);
+}
+
+/*
+ * Traverse plan tree to reinitialize per-node dynamic shared memory state
+ */
+static bool
+ExecParallelReInitializeDSM(PlanState *planstate,
+ ParallelContext *pcxt)
+{
+ if (planstate == NULL)
+ return false;
+
+ /*
+ * Call reinitializers for DSM-using plan nodes.
+ */
+ switch (nodeTag(planstate))
+ {
+ case T_SeqScanState:
+ if (planstate->plan->parallel_aware)
+ ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
+ pcxt);
+ break;
+ case T_IndexScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
+ pcxt);
+ break;
+ case T_IndexOnlyScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
+ pcxt);
+ break;
+ case T_ForeignScanState:
+ if (planstate->plan->parallel_aware)
+ ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
+ pcxt);
+ break;
+ case T_CustomScanState:
+ if (planstate->plan->parallel_aware)
+ ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
+ pcxt);
+ break;
+ case T_BitmapHeapScanState:
+ if (planstate->plan->parallel_aware)
+ ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
+ pcxt);
+ break;
+ case T_SortState:
+ /* even when not parallel-aware */
+ ExecSortReInitializeDSM((SortState *) planstate, pcxt);
+ break;
+
+ default:
+ break;
+ }
+
+ return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
+}
+
+/*
* Copy instrumentation information about this node and its descendants from
* dynamic shared memory.
*/
@@ -845,12 +915,13 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
break;
case T_BitmapHeapScanState:
if (planstate->plan->parallel_aware)
- ExecBitmapHeapInitializeWorker(
- (BitmapHeapScanState *) planstate, toc);
+ ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc);
break;
case T_SortState:
/* even when not parallel-aware */
ExecSortInitializeWorker((SortState *) planstate, toc);
+ break;
+
default:
break;
}
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index 79f534e4e92..f7e55e0b45b 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -705,23 +705,6 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
node->shared_tbmiterator = NULL;
node->shared_prefetch_iterator = NULL;
- /* Reset parallel bitmap state, if present */
- if (node->pstate)
- {
- dsa_area *dsa = node->ss.ps.state->es_query_dsa;
-
- node->pstate->state = BM_INITIAL;
-
- if (DsaPointerIsValid(node->pstate->tbmiterator))
- tbm_free_shared_area(dsa, node->pstate->tbmiterator);
-
- if (DsaPointerIsValid(node->pstate->prefetch_iterator))
- tbm_free_shared_area(dsa, node->pstate->prefetch_iterator);
-
- node->pstate->tbmiterator = InvalidDsaPointer;
- node->pstate->prefetch_iterator = InvalidDsaPointer;
- }
-
ExecScanReScan(&node->ss);
/*
@@ -1000,6 +983,31 @@ ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
}
/* ----------------------------------------------------------------
+ * ExecBitmapHeapReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
+ ParallelContext *pcxt)
+{
+ ParallelBitmapHeapState *pstate = node->pstate;
+ dsa_area *dsa = node->ss.ps.state->es_query_dsa;
+
+ pstate->state = BM_INITIAL;
+
+ if (DsaPointerIsValid(pstate->tbmiterator))
+ tbm_free_shared_area(dsa, pstate->tbmiterator);
+
+ if (DsaPointerIsValid(pstate->prefetch_iterator))
+ tbm_free_shared_area(dsa, pstate->prefetch_iterator);
+
+ pstate->tbmiterator = InvalidDsaPointer;
+ pstate->prefetch_iterator = InvalidDsaPointer;
+}
+
+/* ----------------------------------------------------------------
* ExecBitmapHeapInitializeWorker
*
* Copy relevant information from TOC into planstate.
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index fb7645b1f46..07dcabef551 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -195,6 +195,21 @@ ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
}
void
+ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->ReInitializeDSMCustomScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
+ methods->ReInitializeDSMCustomScan(node, pcxt, coordinate);
+ }
+}
+
+void
ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
{
const CustomExecMethods *methods = node->methods;
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 140e82ef5e4..20892d6d5fb 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -332,7 +332,28 @@ ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
}
/* ----------------------------------------------------------------
- * ExecForeignScanInitializeDSM
+ * ExecForeignScanReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->ReInitializeDSMForeignScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
+ fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanInitializeWorker
*
* Initialization according to the parallel coordination information
* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 58f88a5724d..f9cf1b2f875 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -153,12 +153,15 @@ ExecGather(PlanState *pstate)
{
ParallelContext *pcxt;
- /* Initialize the workers required to execute Gather node. */
+ /* Initialize, or re-initialize, shared state needed by workers. */
if (!node->pei)
node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate,
gather->num_workers,
node->tuples_needed);
+ else
+ ExecParallelReinitialize(node->ps.lefttree,
+ node->pei);
/*
* Register backend workers. We might not get as many as we
@@ -426,7 +429,7 @@ ExecShutdownGather(GatherState *node)
/* ----------------------------------------------------------------
* ExecReScanGather
*
- * Re-initialize the workers and rescans a relation via them.
+ * Prepare to re-scan the result of a Gather.
* ----------------------------------------------------------------
*/
void
@@ -435,19 +438,12 @@ ExecReScanGather(GatherState *node)
Gather *gather = (Gather *) node->ps.plan;
PlanState *outerPlan = outerPlanState(node);
- /*
- * Re-initialize the parallel workers to perform rescan of relation. We
- * want to gracefully shutdown all the workers so that they should be able
- * to propagate any error or other information to master backend before
- * dying. Parallel context will be reused for rescan.
- */
+ /* Make sure any existing workers are gracefully shut down */
ExecShutdownGatherWorkers(node);
+ /* Mark node so that shared state will be rebuilt at next call */
node->initialized = false;
- if (node->pei)
- ExecParallelReinitialize(node->pei);
-
/*
* Set child node's chgParam to tell it that the next scan might deliver a
* different set of rows within the leader process. (The overall rowset
@@ -459,10 +455,15 @@ ExecReScanGather(GatherState *node)
outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
gather->rescan_param);
-
/*
- * if chgParam of subnode is not null then plan will be re-scanned by
- * first ExecProcNode.
+ * If chgParam of subnode is not null then plan will be re-scanned by
+ * first ExecProcNode. Note: because this does nothing if we have a
+ * rescan_param, it's currently guaranteed that parallel-aware child nodes
+ * will not see a ReScan call until after they get a ReInitializeDSM call.
+ * That ordering might not be something to rely on, though. A good rule
+ * of thumb is that ReInitializeDSM should reset only shared state, ReScan
+ * should reset only local state, and anything that depends on both of
+ * those steps being finished must wait until the first ExecProcNode call.
*/
if (outerPlan->chgParam == NULL)
ExecReScan(outerPlan);
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index f50841699c4..0bd5da38b4a 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -187,12 +187,15 @@ ExecGatherMerge(PlanState *pstate)
{
ParallelContext *pcxt;
- /* Initialize data structures for workers. */
+ /* Initialize, or re-initialize, shared state needed by workers. */
if (!node->pei)
node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate,
gm->num_workers,
node->tuples_needed);
+ else
+ ExecParallelReinitialize(node->ps.lefttree,
+ node->pei);
/* Try to launch workers. */
pcxt = node->pei->pcxt;
@@ -321,7 +324,7 @@ ExecShutdownGatherMergeWorkers(GatherMergeState *node)
/* ----------------------------------------------------------------
* ExecReScanGatherMerge
*
- * Re-initialize the workers and rescans a relation via them.
+ * Prepare to re-scan the result of a GatherMerge.
* ----------------------------------------------------------------
*/
void
@@ -330,20 +333,13 @@ ExecReScanGatherMerge(GatherMergeState *node)
GatherMerge *gm = (GatherMerge *) node->ps.plan;
PlanState *outerPlan = outerPlanState(node);
- /*
- * Re-initialize the parallel workers to perform rescan of relation. We
- * want to gracefully shutdown all the workers so that they should be able
- * to propagate any error or other information to master backend before
- * dying. Parallel context will be reused for rescan.
- */
+ /* Make sure any existing workers are gracefully shut down */
ExecShutdownGatherMergeWorkers(node);
+ /* Mark node so that shared state will be rebuilt at next call */
node->initialized = false;
node->gm_initialized = false;
- if (node->pei)
- ExecParallelReinitialize(node->pei);
-
/*
* Set child node's chgParam to tell it that the next scan might deliver a
* different set of rows within the leader process. (The overall rowset
@@ -355,10 +351,15 @@ ExecReScanGatherMerge(GatherMergeState *node)
outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
gm->rescan_param);
-
/*
- * if chgParam of subnode is not null then plan will be re-scanned by
- * first ExecProcNode.
+ * If chgParam of subnode is not null then plan will be re-scanned by
+ * first ExecProcNode. Note: because this does nothing if we have a
+ * rescan_param, it's currently guaranteed that parallel-aware child nodes
+ * will not see a ReScan call until after they get a ReInitializeDSM call.
+ * That ordering might not be something to rely on, though. A good rule
+ * of thumb is that ReInitializeDSM should reset only shared state, ReScan
+ * should reset only local state, and anything that depends on both of
+ * those steps being finished must wait until the first ExecProcNode call.
*/
if (outerPlan->chgParam == NULL)
ExecReScan(outerPlan);
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index fe7ba3f1a4f..5351cb8981e 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -25,6 +25,7 @@
* parallel index-only scan
* ExecIndexOnlyScanInitializeDSM initialize DSM for parallel
* index-only scan
+ * ExecIndexOnlyScanReInitializeDSM reinitialize DSM for fresh scan
* ExecIndexOnlyScanInitializeWorker attach to DSM info in parallel worker
*/
#include "postgres.h"
@@ -336,16 +337,6 @@ ExecIndexOnlyScan(PlanState *pstate)
void
ExecReScanIndexOnlyScan(IndexOnlyScanState *node)
{
- bool reset_parallel_scan = true;
-
- /*
- * If we are here to just update the scan keys, then don't reset parallel
- * scan. For detailed reason behind this look in the comments for
- * ExecReScanIndexScan.
- */
- if (node->ioss_NumRuntimeKeys != 0 && !node->ioss_RuntimeKeysReady)
- reset_parallel_scan = false;
-
/*
* If we are doing runtime key calculations (ie, any of the index key
* values weren't simple Consts), compute the new key values. But first,
@@ -366,15 +357,10 @@ ExecReScanIndexOnlyScan(IndexOnlyScanState *node)
/* reset index scan */
if (node->ioss_ScanDesc)
- {
-
index_rescan(node->ioss_ScanDesc,
node->ioss_ScanKeys, node->ioss_NumScanKeys,
node->ioss_OrderByKeys, node->ioss_NumOrderByKeys);
- if (reset_parallel_scan && node->ioss_ScanDesc->parallel_scan)
- index_parallelrescan(node->ioss_ScanDesc);
- }
ExecScanReScan(&node->ss);
}
@@ -672,6 +658,19 @@ ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
}
/* ----------------------------------------------------------------
+ * ExecIndexOnlyScanReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
+ ParallelContext *pcxt)
+{
+ index_parallelrescan(node->ioss_ScanDesc);
+}
+
+/* ----------------------------------------------------------------
* ExecIndexOnlyScanInitializeWorker
*
* Copy relevant information from TOC into planstate.
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 404076d5930..638b17b07cb 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -24,6 +24,7 @@
* ExecIndexRestrPos restores scan position.
* ExecIndexScanEstimate estimates DSM space needed for parallel index scan
* ExecIndexScanInitializeDSM initialize DSM for parallel indexscan
+ * ExecIndexScanReInitializeDSM reinitialize DSM for fresh scan
* ExecIndexScanInitializeWorker attach to DSM info in parallel worker
*/
#include "postgres.h"
@@ -577,18 +578,6 @@ ExecIndexScan(PlanState *pstate)
void
ExecReScanIndexScan(IndexScanState *node)
{
- bool reset_parallel_scan = true;
-
- /*
- * If we are here to just update the scan keys, then don't reset parallel
- * scan. We don't want each of the participating process in the parallel
- * scan to update the shared parallel scan state at the start of the scan.
- * It is quite possible that one of the participants has already begun
- * scanning the index when another has yet to start it.
- */
- if (node->iss_NumRuntimeKeys != 0 && !node->iss_RuntimeKeysReady)
- reset_parallel_scan = false;
-
/*
* If we are doing runtime key calculations (ie, any of the index key
* values weren't simple Consts), compute the new key values. But first,
@@ -614,21 +603,11 @@ ExecReScanIndexScan(IndexScanState *node)
reorderqueue_pop(node);
}
- /*
- * Reset (parallel) index scan. For parallel-aware nodes, the scan
- * descriptor is initialized during actual execution of node and we can
- * reach here before that (ex. during execution of nest loop join). So,
- * avoid updating the scan descriptor at that time.
- */
+ /* reset index scan */
if (node->iss_ScanDesc)
- {
index_rescan(node->iss_ScanDesc,
node->iss_ScanKeys, node->iss_NumScanKeys,
node->iss_OrderByKeys, node->iss_NumOrderByKeys);
-
- if (reset_parallel_scan && node->iss_ScanDesc->parallel_scan)
- index_parallelrescan(node->iss_ScanDesc);
- }
node->iss_ReachedEnd = false;
ExecScanReScan(&node->ss);
@@ -1717,6 +1696,19 @@ ExecIndexScanInitializeDSM(IndexScanState *node,
}
/* ----------------------------------------------------------------
+ * ExecIndexScanReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIndexScanReInitializeDSM(IndexScanState *node,
+ ParallelContext *pcxt)
+{
+ index_parallelrescan(node->iss_ScanDesc);
+}
+
+/* ----------------------------------------------------------------
* ExecIndexScanInitializeWorker
*
* Copy relevant information from TOC into planstate.
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 5c49d4ca8a9..d4ac939c9b0 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -22,6 +22,7 @@
*
* ExecSeqScanEstimate estimates DSM space needed for parallel scan
* ExecSeqScanInitializeDSM initialize DSM for parallel scan
+ * ExecSeqScanReInitializeDSM reinitialize DSM for fresh parallel scan
* ExecSeqScanInitializeWorker attach to DSM info in parallel worker
*/
#include "postgres.h"
@@ -325,6 +326,21 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
}
/* ----------------------------------------------------------------
+ * ExecSeqScanReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSeqScanReInitializeDSM(SeqScanState *node,
+ ParallelContext *pcxt)
+{
+ HeapScanDesc scan = node->ss.ss_currentScanDesc;
+
+ heap_parallelscan_reinitialize(scan->rs_parallel);
+}
+
+/* ----------------------------------------------------------------
* ExecSeqScanInitializeWorker
*
* Copy relevant information from TOC into planstate.
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index 66ef109c122..98bcaeb66f5 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -397,6 +397,23 @@ ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt)
}
/* ----------------------------------------------------------------
+ * ExecSortReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt)
+{
+ /* If there's any instrumentation space, clear it for next time */
+ if (node->shared_info != NULL)
+ {
+ memset(node->shared_info->sinstrument, 0,
+ node->shared_info->num_workers * sizeof(TuplesortInstrumentation));
+ }
+}
+
+/* ----------------------------------------------------------------
* ExecSortInitializeWorker
*
* Attach worker to DSM space for sort statistics.
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index b2132e723ed..4e41024e926 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -130,6 +130,7 @@ extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction);
extern Size heap_parallelscan_estimate(Snapshot snapshot);
extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
Relation relation, Snapshot snapshot);
+extern void heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan);
extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
extern bool heap_fetch(Relation relation, Snapshot snapshot,
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 79b886706f7..1cb895d8984 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -36,7 +36,8 @@ extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
EState *estate, int nworkers, int64 tuples_needed);
extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
-extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
+extern void ExecParallelReinitialize(PlanState *planstate,
+ ParallelExecutorInfo *pei);
extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
diff --git a/src/include/executor/nodeBitmapHeapscan.h b/src/include/executor/nodeBitmapHeapscan.h
index c77694cf22f..10844a405a5 100644
--- a/src/include/executor/nodeBitmapHeapscan.h
+++ b/src/include/executor/nodeBitmapHeapscan.h
@@ -24,6 +24,8 @@ extern void ExecBitmapHeapEstimate(BitmapHeapScanState *node,
ParallelContext *pcxt);
extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
ParallelContext *pcxt);
+extern void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
+ ParallelContext *pcxt);
extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
shm_toc *toc);
diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h
index a1cc63ae1f3..25767b6a4a5 100644
--- a/src/include/executor/nodeCustom.h
+++ b/src/include/executor/nodeCustom.h
@@ -34,6 +34,8 @@ extern void ExecCustomScanEstimate(CustomScanState *node,
ParallelContext *pcxt);
extern void ExecCustomScanInitializeDSM(CustomScanState *node,
ParallelContext *pcxt);
+extern void ExecCustomScanReInitializeDSM(CustomScanState *node,
+ ParallelContext *pcxt);
extern void ExecCustomScanInitializeWorker(CustomScanState *node,
shm_toc *toc);
extern void ExecShutdownCustomScan(CustomScanState *node);
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 0b662597d8f..0354c2c4308 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -25,6 +25,8 @@ extern void ExecForeignScanEstimate(ForeignScanState *node,
ParallelContext *pcxt);
extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
ParallelContext *pcxt);
+extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
+ ParallelContext *pcxt);
extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
shm_toc *toc);
extern void ExecShutdownForeignScan(ForeignScanState *node);
diff --git a/src/include/executor/nodeIndexonlyscan.h b/src/include/executor/nodeIndexonlyscan.h
index c8a709c26ed..690b5dbfe59 100644
--- a/src/include/executor/nodeIndexonlyscan.h
+++ b/src/include/executor/nodeIndexonlyscan.h
@@ -28,6 +28,8 @@ extern void ExecIndexOnlyScanEstimate(IndexOnlyScanState *node,
ParallelContext *pcxt);
extern void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
ParallelContext *pcxt);
+extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
+ ParallelContext *pcxt);
extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
shm_toc *toc);
diff --git a/src/include/executor/nodeIndexscan.h b/src/include/executor/nodeIndexscan.h
index 1668e347eef..0670e87e395 100644
--- a/src/include/executor/nodeIndexscan.h
+++ b/src/include/executor/nodeIndexscan.h
@@ -24,6 +24,7 @@ extern void ExecIndexRestrPos(IndexScanState *node);
extern void ExecReScanIndexScan(IndexScanState *node);
extern void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt);
extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
+extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
extern void ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc);
/*
diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h
index 0fba79f8de6..eb96799cade 100644
--- a/src/include/executor/nodeSeqscan.h
+++ b/src/include/executor/nodeSeqscan.h
@@ -24,6 +24,7 @@ extern void ExecReScanSeqScan(SeqScanState *node);
/* parallel scan support */
extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt);
extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
+extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc);
#endif /* NODESEQSCAN_H */
diff --git a/src/include/executor/nodeSort.h b/src/include/executor/nodeSort.h
index 77ac06597ff..1ab8f767210 100644
--- a/src/include/executor/nodeSort.h
+++ b/src/include/executor/nodeSort.h
@@ -26,6 +26,7 @@ extern void ExecReScanSort(SortState *node);
/* parallel instrumentation support */
extern void ExecSortEstimate(SortState *node, ParallelContext *pcxt);
extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt);
+extern void ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt);
extern void ExecSortInitializeWorker(SortState *node, shm_toc *toc);
extern void ExecSortRetrieveInstrumentation(SortState *node);
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index e391f20fb86..ef0fbe6f9c6 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -148,6 +148,9 @@ typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node,
typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
ParallelContext *pcxt,
void *coordinate);
+typedef void (*ReInitializeDSMForeignScan_function) (ForeignScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
shm_toc *toc,
void *coordinate);
@@ -224,6 +227,7 @@ typedef struct FdwRoutine
IsForeignScanParallelSafe_function IsForeignScanParallelSafe;
EstimateDSMForeignScan_function EstimateDSMForeignScan;
InitializeDSMForeignScan_function InitializeDSMForeignScan;
+ ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
ShutdownForeignScan_function ShutdownForeignScan;
} FdwRoutine;
diff --git a/src/include/nodes/extensible.h b/src/include/nodes/extensible.h
index 7325bf536af..0654e79c7ba 100644
--- a/src/include/nodes/extensible.h
+++ b/src/include/nodes/extensible.h
@@ -136,6 +136,9 @@ typedef struct CustomExecMethods
void (*InitializeDSMCustomScan) (CustomScanState *node,
ParallelContext *pcxt,
void *coordinate);
+ void (*ReInitializeDSMCustomScan) (CustomScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
void (*InitializeWorkerCustomScan) (CustomScanState *node,
shm_toc *toc,
void *coordinate);