author     Tom Lane <tgl@sss.pgh.pa.us>  2017-08-30 13:18:16 -0400
committer  Tom Lane <tgl@sss.pgh.pa.us>  2017-08-30 13:18:16 -0400
commit     41b0dd987d44089dc48e9c70024277e253b396b7 (patch)
tree       c42eeeb2f175764a9b1ad9c095f8a46057078eb3 /src
parent     6c2c5bea3cec4c874d1ee225bb6e222055c03d75 (diff)
Separate reinitialization of shared parallel-scan state from ExecReScan.
Previously, the parallel executor logic did reinitialization of shared
state within the ExecReScan code for parallel-aware scan nodes.  This is
problematic, because it means that the ExecReScan call has to occur
synchronously (ie, during the parent Gather node's ReScan call).  That is
swimming very much against the tide so far as the ExecReScan machinery is
concerned; the fact that it works at all today depends on a lot of fragile
assumptions, such as that no plan node between Gather and a parallel-aware
scan node is parameterized.  Another objection is that because ExecReScan
might be called in workers as well as the leader, hacky extra tests are
needed in some places to prevent unwanted shared-state resets.

Hence, let's separate this code into two functions, a ReInitializeDSM
call and the ReScan call proper.  ReInitializeDSM is called only in the
leader and is guaranteed to run before we start new workers.  ReScan is
returned to its traditional function of resetting only local state,
which means that ExecReScan's usual habits of delaying or eliminating
child rescan calls are safe again.

As with the preceding commit 7df2c1f8d, it doesn't seem to be necessary
to make these changes in 9.6, which is a good thing because the FDW and
CustomScan APIs are impacted.

Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
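To make the new control flow concrete, here is a minimal sketch (distilled
from the diff below, not code lifted from the tree) of the ordering this
commit establishes in a Gather-style leader; ExecShutdownGatherWorkers is
the existing static helper in nodeGather.c:

    /* Rescan resets only local state and shuts down the old workers... */
    static void
    gather_rescan_sketch(GatherState *node)
    {
        ExecShutdownGatherWorkers(node);
        node->initialized = false;      /* rebuild shared state at next call */
    }

    /* ...while shared DSM state is rebuilt at the next execution, in the
     * leader, strictly before any new worker is launched. */
    static void
    gather_execute_sketch(GatherState *node)
    {
        if (!node->pei)
            node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                             node->ps.state,
                                             ((Gather *) node->ps.plan)->num_workers,
                                             node->tuples_needed);
        else
            ExecParallelReinitialize(node->ps.lefttree, node->pei);

        LaunchParallelWorkers(node->pei->pcxt);
    }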
Diffstat (limited to 'src')
-rw-r--r--  src/backend/access/heap/heapam.c           |  28
-rw-r--r--  src/backend/executor/execParallel.c        | 101
-rw-r--r--  src/backend/executor/nodeBitmapHeapscan.c  |  42
-rw-r--r--  src/backend/executor/nodeCustom.c          |  15
-rw-r--r--  src/backend/executor/nodeForeignscan.c     |  23
-rw-r--r--  src/backend/executor/nodeGather.c          |  29
-rw-r--r--  src/backend/executor/nodeGatherMerge.c     |  29
-rw-r--r--  src/backend/executor/nodeIndexonlyscan.c   |  29
-rw-r--r--  src/backend/executor/nodeIndexscan.c       |  38
-rw-r--r--  src/backend/executor/nodeSeqscan.c         |  16
-rw-r--r--  src/backend/executor/nodeSort.c            |  17
-rw-r--r--  src/include/access/heapam.h                |   1
-rw-r--r--  src/include/executor/execParallel.h        |   3
-rw-r--r--  src/include/executor/nodeBitmapHeapscan.h  |   2
-rw-r--r--  src/include/executor/nodeCustom.h          |   2
-rw-r--r--  src/include/executor/nodeForeignscan.h     |   2
-rw-r--r--  src/include/executor/nodeIndexonlyscan.h   |   2
-rw-r--r--  src/include/executor/nodeIndexscan.h       |   1
-rw-r--r--  src/include/executor/nodeSeqscan.h         |   1
-rw-r--r--  src/include/executor/nodeSort.h            |   1
-rw-r--r--  src/include/foreign/fdwapi.h               |   4
-rw-r--r--  src/include/nodes/extensible.h             |   3
22 files changed, 274 insertions, 115 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index ff03c68fcdb..e29c5ad0868 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1531,21 +1531,6 @@ heap_rescan(HeapScanDesc scan,
* reinitialize scan descriptor
*/
initscan(scan, key, true);
-
- /*
- * reset parallel scan, if present
- */
- if (scan->rs_parallel != NULL)
- {
- ParallelHeapScanDesc parallel_scan;
-
- /*
- * Caller is responsible for making sure that all workers have
- * finished the scan before calling this.
- */
- parallel_scan = scan->rs_parallel;
- pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
- }
}
/* ----------------
@@ -1643,6 +1628,19 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
}
/* ----------------
+ * heap_parallelscan_reinitialize - reset a parallel scan
+ *
+ * Call this in the leader process. Caller is responsible for
+ * making sure that all workers have finished the scan beforehand.
+ * ----------------
+ */
+void
+heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan)
+{
+ pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
+}
+
+/* ----------------
* heap_beginscan_parallel - join a parallel scan
*
* Caller must hold a suitable lock on the correct relation.
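For context on why a single atomic write suffices here: in this release the
parallel heap scan hands out blocks through an atomic fetch-add on
phs_nallocated, so zeroing the counter restarts allocation from the top.
A simplified sketch (the real logic in heap_parallelscan_nextpage also
offsets by phs_startblock to support synchronized scans):

    static BlockNumber
    parallel_nextpage_sketch(ParallelHeapScanDesc pscan, BlockNumber nblocks)
    {
        uint64      nallocated;

        nallocated = pg_atomic_fetch_add_u64(&pscan->phs_nallocated, 1);
        if (nallocated >= nblocks)
            return InvalidBlockNumber;  /* every block already handed out */
        return (BlockNumber) nallocated;
    }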
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 01316ff5d94..c713b851399 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -119,6 +119,8 @@ static bool ExecParallelInitializeDSM(PlanState *node,
ExecParallelInitializeDSMContext *d);
static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
bool reinitialize);
+static bool ExecParallelReInitializeDSM(PlanState *planstate,
+ ParallelContext *pcxt);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);
@@ -255,6 +257,8 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
case T_SortState:
/* even when not parallel-aware */
ExecSortEstimate((SortState *) planstate, e->pcxt);
+ break;
+
default:
break;
}
@@ -325,6 +329,8 @@ ExecParallelInitializeDSM(PlanState *planstate,
case T_SortState:
/* even when not parallel-aware */
ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
+ break;
+
default:
break;
}
@@ -385,18 +391,6 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
}
/*
- * Re-initialize the parallel executor info such that it can be reused by
- * workers.
- */
-void
-ExecParallelReinitialize(ParallelExecutorInfo *pei)
-{
- ReinitializeParallelDSM(pei->pcxt);
- pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
- pei->finished = false;
-}
-
-/*
* Sets up the required infrastructure for backend workers to perform
* execution and return results to the main backend.
*/
@@ -599,7 +593,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
ExecParallelInitializeDSM(planstate, &d);
/*
- * Make sure that the world hasn't shifted under our feat. This could
+ * Make sure that the world hasn't shifted under our feet. This could
* probably just be an Assert(), but let's be conservative for now.
*/
if (e.nnodes != d.nnodes)
@@ -610,6 +604,82 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
}
/*
+ * Re-initialize the parallel executor shared memory state before launching
+ * a fresh batch of workers.
+ */
+void
+ExecParallelReinitialize(PlanState *planstate,
+ ParallelExecutorInfo *pei)
+{
+ /* Old workers must already be shut down */
+ Assert(pei->finished);
+
+ ReinitializeParallelDSM(pei->pcxt);
+ pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+ pei->finished = false;
+
+ /* Traverse plan tree and let each child node reset associated state. */
+ ExecParallelReInitializeDSM(planstate, pei->pcxt);
+}
+
+/*
+ * Traverse plan tree to reinitialize per-node dynamic shared memory state
+ */
+static bool
+ExecParallelReInitializeDSM(PlanState *planstate,
+ ParallelContext *pcxt)
+{
+ if (planstate == NULL)
+ return false;
+
+ /*
+ * Call reinitializers for DSM-using plan nodes.
+ */
+ switch (nodeTag(planstate))
+ {
+ case T_SeqScanState:
+ if (planstate->plan->parallel_aware)
+ ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
+ pcxt);
+ break;
+ case T_IndexScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
+ pcxt);
+ break;
+ case T_IndexOnlyScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
+ pcxt);
+ break;
+ case T_ForeignScanState:
+ if (planstate->plan->parallel_aware)
+ ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
+ pcxt);
+ break;
+ case T_CustomScanState:
+ if (planstate->plan->parallel_aware)
+ ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
+ pcxt);
+ break;
+ case T_BitmapHeapScanState:
+ if (planstate->plan->parallel_aware)
+ ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
+ pcxt);
+ break;
+ case T_SortState:
+ /* even when not parallel-aware */
+ ExecSortReInitializeDSM((SortState *) planstate, pcxt);
+ break;
+
+ default:
+ break;
+ }
+
+ return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
+}
+
+/*
* Copy instrumentation information about this node and its descendants from
* dynamic shared memory.
*/
@@ -845,12 +915,13 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
break;
case T_BitmapHeapScanState:
if (planstate->plan->parallel_aware)
- ExecBitmapHeapInitializeWorker(
- (BitmapHeapScanState *) planstate, toc);
+ ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc);
break;
case T_SortState:
/* even when not parallel-aware */
ExecSortInitializeWorker((SortState *) planstate, toc);
+ break;
+
default:
break;
}
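ExecParallelReInitializeDSM above follows the usual planstate_tree_walker
idiom; its skeleton looks like this (a generic sketch, with "my_walker" as
a placeholder name):

    static bool
    my_walker(PlanState *planstate, void *context)
    {
        if (planstate == NULL)
            return false;

        /* per-node work goes here; returning true aborts the walk early */

        /* recurse into all children with the same function and context */
        return planstate_tree_walker(planstate, my_walker, context);
    }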
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index 79f534e4e92..f7e55e0b45b 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -705,23 +705,6 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
node->shared_tbmiterator = NULL;
node->shared_prefetch_iterator = NULL;
- /* Reset parallel bitmap state, if present */
- if (node->pstate)
- {
- dsa_area *dsa = node->ss.ps.state->es_query_dsa;
-
- node->pstate->state = BM_INITIAL;
-
- if (DsaPointerIsValid(node->pstate->tbmiterator))
- tbm_free_shared_area(dsa, node->pstate->tbmiterator);
-
- if (DsaPointerIsValid(node->pstate->prefetch_iterator))
- tbm_free_shared_area(dsa, node->pstate->prefetch_iterator);
-
- node->pstate->tbmiterator = InvalidDsaPointer;
- node->pstate->prefetch_iterator = InvalidDsaPointer;
- }
-
ExecScanReScan(&node->ss);
/*
@@ -1000,6 +983,31 @@ ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
}
/* ----------------------------------------------------------------
+ * ExecBitmapHeapReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
+ ParallelContext *pcxt)
+{
+ ParallelBitmapHeapState *pstate = node->pstate;
+ dsa_area *dsa = node->ss.ps.state->es_query_dsa;
+
+ pstate->state = BM_INITIAL;
+
+ if (DsaPointerIsValid(pstate->tbmiterator))
+ tbm_free_shared_area(dsa, pstate->tbmiterator);
+
+ if (DsaPointerIsValid(pstate->prefetch_iterator))
+ tbm_free_shared_area(dsa, pstate->prefetch_iterator);
+
+ pstate->tbmiterator = InvalidDsaPointer;
+ pstate->prefetch_iterator = InvalidDsaPointer;
+}
+
+/* ----------------------------------------------------------------
* ExecBitmapHeapInitializeWorker
*
* Copy relevant information from TOC into planstate.
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index fb7645b1f46..07dcabef551 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -195,6 +195,21 @@ ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
}
void
+ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->ReInitializeDSMCustomScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
+ methods->ReInitializeDSMCustomScan(node, pcxt, coordinate);
+ }
+}
+
+void
ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
{
const CustomExecMethods *methods = node->methods;
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 140e82ef5e4..20892d6d5fb 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -332,7 +332,28 @@ ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
}
/* ----------------------------------------------------------------
- * ExecForeignScanInitializeDSM
+ * ExecForeignScanReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->ReInitializeDSMForeignScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
+ fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanInitializeWorker
*
* Initialization according to the parallel coordination information
* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 58f88a5724d..f9cf1b2f875 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -153,12 +153,15 @@ ExecGather(PlanState *pstate)
{
ParallelContext *pcxt;
- /* Initialize the workers required to execute Gather node. */
+ /* Initialize, or re-initialize, shared state needed by workers. */
if (!node->pei)
node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate,
gather->num_workers,
node->tuples_needed);
+ else
+ ExecParallelReinitialize(node->ps.lefttree,
+ node->pei);
/*
* Register backend workers. We might not get as many as we
@@ -426,7 +429,7 @@ ExecShutdownGather(GatherState *node)
/* ----------------------------------------------------------------
* ExecReScanGather
*
- * Re-initialize the workers and rescans a relation via them.
+ * Prepare to re-scan the result of a Gather.
* ----------------------------------------------------------------
*/
void
@@ -435,19 +438,12 @@ ExecReScanGather(GatherState *node)
Gather *gather = (Gather *) node->ps.plan;
PlanState *outerPlan = outerPlanState(node);
- /*
- * Re-initialize the parallel workers to perform rescan of relation. We
- * want to gracefully shutdown all the workers so that they should be able
- * to propagate any error or other information to master backend before
- * dying. Parallel context will be reused for rescan.
- */
+ /* Make sure any existing workers are gracefully shut down */
ExecShutdownGatherWorkers(node);
+ /* Mark node so that shared state will be rebuilt at next call */
node->initialized = false;
- if (node->pei)
- ExecParallelReinitialize(node->pei);
-
/*
* Set child node's chgParam to tell it that the next scan might deliver a
* different set of rows within the leader process. (The overall rowset
@@ -459,10 +455,15 @@ ExecReScanGather(GatherState *node)
outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
gather->rescan_param);
-
/*
- * if chgParam of subnode is not null then plan will be re-scanned by
- * first ExecProcNode.
+ * If chgParam of subnode is not null then plan will be re-scanned by
+ * first ExecProcNode. Note: because this does nothing if we have a
+ * rescan_param, it's currently guaranteed that parallel-aware child nodes
+ * will not see a ReScan call until after they get a ReInitializeDSM call.
+ * That ordering might not be something to rely on, though. A good rule
+ * of thumb is that ReInitializeDSM should reset only shared state, ReScan
+ * should reset only local state, and anything that depends on both of
+ * those steps being finished must wait until the first ExecProcNode call.
*/
if (outerPlan->chgParam == NULL)
ExecReScan(outerPlan);
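The rule of thumb in the comment above can be sketched as a hypothetical
parallel-aware node (all "MyScan" names are invented for illustration):

    /* Leader only, after old workers exit and before new ones launch:
     * reset *shared* state. */
    void
    ExecMyScanReInitializeDSM(MyScanState *node, ParallelContext *pcxt)
    {
        pg_atomic_write_u64(&node->pstate->next_item, 0);
    }

    /* May run in leader or workers: reset *local* state only. */
    void
    ExecReScanMyScan(MyScanState *node)
    {
        node->cur_item = 0;
        ExecScanReScan(&node->ss);
    }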
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index f50841699c4..0bd5da38b4a 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -187,12 +187,15 @@ ExecGatherMerge(PlanState *pstate)
{
ParallelContext *pcxt;
- /* Initialize data structures for workers. */
+ /* Initialize, or re-initialize, shared state needed by workers. */
if (!node->pei)
node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate,
gm->num_workers,
node->tuples_needed);
+ else
+ ExecParallelReinitialize(node->ps.lefttree,
+ node->pei);
/* Try to launch workers. */
pcxt = node->pei->pcxt;
@@ -321,7 +324,7 @@ ExecShutdownGatherMergeWorkers(GatherMergeState *node)
/* ----------------------------------------------------------------
* ExecReScanGatherMerge
*
- * Re-initialize the workers and rescans a relation via them.
+ * Prepare to re-scan the result of a GatherMerge.
* ----------------------------------------------------------------
*/
void
@@ -330,20 +333,13 @@ ExecReScanGatherMerge(GatherMergeState *node)
GatherMerge *gm = (GatherMerge *) node->ps.plan;
PlanState *outerPlan = outerPlanState(node);
- /*
- * Re-initialize the parallel workers to perform rescan of relation. We
- * want to gracefully shutdown all the workers so that they should be able
- * to propagate any error or other information to master backend before
- * dying. Parallel context will be reused for rescan.
- */
+ /* Make sure any existing workers are gracefully shut down */
ExecShutdownGatherMergeWorkers(node);
+ /* Mark node so that shared state will be rebuilt at next call */
node->initialized = false;
node->gm_initialized = false;
- if (node->pei)
- ExecParallelReinitialize(node->pei);
-
/*
* Set child node's chgParam to tell it that the next scan might deliver a
* different set of rows within the leader process. (The overall rowset
@@ -355,10 +351,15 @@ ExecReScanGatherMerge(GatherMergeState *node)
outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
gm->rescan_param);
-
/*
- * if chgParam of subnode is not null then plan will be re-scanned by
- * first ExecProcNode.
+ * If chgParam of subnode is not null then plan will be re-scanned by
+ * first ExecProcNode. Note: because this does nothing if we have a
+ * rescan_param, it's currently guaranteed that parallel-aware child nodes
+ * will not see a ReScan call until after they get a ReInitializeDSM call.
+ * That ordering might not be something to rely on, though. A good rule
+ * of thumb is that ReInitializeDSM should reset only shared state, ReScan
+ * should reset only local state, and anything that depends on both of
+ * those steps being finished must wait until the first ExecProcNode call.
*/
if (outerPlan->chgParam == NULL)
ExecReScan(outerPlan);
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index fe7ba3f1a4f..5351cb8981e 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -25,6 +25,7 @@
* parallel index-only scan
* ExecIndexOnlyScanInitializeDSM initialize DSM for parallel
* index-only scan
+ * ExecIndexOnlyScanReInitializeDSM reinitialize DSM for fresh scan
* ExecIndexOnlyScanInitializeWorker attach to DSM info in parallel worker
*/
#include "postgres.h"
@@ -336,16 +337,6 @@ ExecIndexOnlyScan(PlanState *pstate)
void
ExecReScanIndexOnlyScan(IndexOnlyScanState *node)
{
- bool reset_parallel_scan = true;
-
- /*
- * If we are here to just update the scan keys, then don't reset parallel
- * scan. For detailed reason behind this look in the comments for
- * ExecReScanIndexScan.
- */
- if (node->ioss_NumRuntimeKeys != 0 && !node->ioss_RuntimeKeysReady)
- reset_parallel_scan = false;
-
/*
* If we are doing runtime key calculations (ie, any of the index key
* values weren't simple Consts), compute the new key values. But first,
@@ -366,15 +357,10 @@ ExecReScanIndexOnlyScan(IndexOnlyScanState *node)
/* reset index scan */
if (node->ioss_ScanDesc)
- {
-
index_rescan(node->ioss_ScanDesc,
node->ioss_ScanKeys, node->ioss_NumScanKeys,
node->ioss_OrderByKeys, node->ioss_NumOrderByKeys);
- if (reset_parallel_scan && node->ioss_ScanDesc->parallel_scan)
- index_parallelrescan(node->ioss_ScanDesc);
- }
ExecScanReScan(&node->ss);
}
@@ -672,6 +658,19 @@ ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
}
/* ----------------------------------------------------------------
+ * ExecIndexOnlyScanReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
+ ParallelContext *pcxt)
+{
+ index_parallelrescan(node->ioss_ScanDesc);
+}
+
+/* ----------------------------------------------------------------
* ExecIndexOnlyScanInitializeWorker
*
* Copy relevant information from TOC into planstate.
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 404076d5930..638b17b07cb 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -24,6 +24,7 @@
* ExecIndexRestrPos restores scan position.
* ExecIndexScanEstimate estimates DSM space needed for parallel index scan
* ExecIndexScanInitializeDSM initialize DSM for parallel indexscan
+ * ExecIndexScanReInitializeDSM reinitialize DSM for fresh scan
* ExecIndexScanInitializeWorker attach to DSM info in parallel worker
*/
#include "postgres.h"
@@ -577,18 +578,6 @@ ExecIndexScan(PlanState *pstate)
void
ExecReScanIndexScan(IndexScanState *node)
{
- bool reset_parallel_scan = true;
-
- /*
- * If we are here to just update the scan keys, then don't reset parallel
- * scan. We don't want each of the participating process in the parallel
- * scan to update the shared parallel scan state at the start of the scan.
- * It is quite possible that one of the participants has already begun
- * scanning the index when another has yet to start it.
- */
- if (node->iss_NumRuntimeKeys != 0 && !node->iss_RuntimeKeysReady)
- reset_parallel_scan = false;
-
/*
* If we are doing runtime key calculations (ie, any of the index key
* values weren't simple Consts), compute the new key values. But first,
@@ -614,21 +603,11 @@ ExecReScanIndexScan(IndexScanState *node)
reorderqueue_pop(node);
}
- /*
- * Reset (parallel) index scan. For parallel-aware nodes, the scan
- * descriptor is initialized during actual execution of node and we can
- * reach here before that (ex. during execution of nest loop join). So,
- * avoid updating the scan descriptor at that time.
- */
+ /* reset index scan */
if (node->iss_ScanDesc)
- {
index_rescan(node->iss_ScanDesc,
node->iss_ScanKeys, node->iss_NumScanKeys,
node->iss_OrderByKeys, node->iss_NumOrderByKeys);
-
- if (reset_parallel_scan && node->iss_ScanDesc->parallel_scan)
- index_parallelrescan(node->iss_ScanDesc);
- }
node->iss_ReachedEnd = false;
ExecScanReScan(&node->ss);
@@ -1717,6 +1696,19 @@ ExecIndexScanInitializeDSM(IndexScanState *node,
}
/* ----------------------------------------------------------------
+ * ExecIndexScanReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIndexScanReInitializeDSM(IndexScanState *node,
+ ParallelContext *pcxt)
+{
+ index_parallelrescan(node->iss_ScanDesc);
+}
+
+/* ----------------------------------------------------------------
* ExecIndexScanInitializeWorker
*
* Copy relevant information from TOC into planstate.
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 5c49d4ca8a9..d4ac939c9b0 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -22,6 +22,7 @@
*
* ExecSeqScanEstimate estimates DSM space needed for parallel scan
* ExecSeqScanInitializeDSM initialize DSM for parallel scan
+ * ExecSeqScanReInitializeDSM reinitialize DSM for fresh parallel scan
* ExecSeqScanInitializeWorker attach to DSM info in parallel worker
*/
#include "postgres.h"
@@ -325,6 +326,21 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
}
/* ----------------------------------------------------------------
+ * ExecSeqScanReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSeqScanReInitializeDSM(SeqScanState *node,
+ ParallelContext *pcxt)
+{
+ HeapScanDesc scan = node->ss.ss_currentScanDesc;
+
+ heap_parallelscan_reinitialize(scan->rs_parallel);
+}
+
+/* ----------------------------------------------------------------
* ExecSeqScanInitializeWorker
*
* Copy relevant information from TOC into planstate.
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index 66ef109c122..98bcaeb66f5 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -397,6 +397,23 @@ ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt)
}
/* ----------------------------------------------------------------
+ * ExecSortReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt)
+{
+ /* If there's any instrumentation space, clear it for next time */
+ if (node->shared_info != NULL)
+ {
+ memset(node->shared_info->sinstrument, 0,
+ node->shared_info->num_workers * sizeof(TuplesortInstrumentation));
+ }
+}
+
+/* ----------------------------------------------------------------
* ExecSortInitializeWorker
*
* Attach worker to DSM space for sort statistics.
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index b2132e723ed..4e41024e926 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -130,6 +130,7 @@ extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction);
extern Size heap_parallelscan_estimate(Snapshot snapshot);
extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
Relation relation, Snapshot snapshot);
+extern void heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan);
extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
extern bool heap_fetch(Relation relation, Snapshot snapshot,
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 79b886706f7..1cb895d8984 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -36,7 +36,8 @@ extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
EState *estate, int nworkers, int64 tuples_needed);
extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
-extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
+extern void ExecParallelReinitialize(PlanState *planstate,
+ ParallelExecutorInfo *pei);
extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
diff --git a/src/include/executor/nodeBitmapHeapscan.h b/src/include/executor/nodeBitmapHeapscan.h
index c77694cf22f..10844a405a5 100644
--- a/src/include/executor/nodeBitmapHeapscan.h
+++ b/src/include/executor/nodeBitmapHeapscan.h
@@ -24,6 +24,8 @@ extern void ExecBitmapHeapEstimate(BitmapHeapScanState *node,
ParallelContext *pcxt);
extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
ParallelContext *pcxt);
+extern void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
+ ParallelContext *pcxt);
extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
shm_toc *toc);
diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h
index a1cc63ae1f3..25767b6a4a5 100644
--- a/src/include/executor/nodeCustom.h
+++ b/src/include/executor/nodeCustom.h
@@ -34,6 +34,8 @@ extern void ExecCustomScanEstimate(CustomScanState *node,
ParallelContext *pcxt);
extern void ExecCustomScanInitializeDSM(CustomScanState *node,
ParallelContext *pcxt);
+extern void ExecCustomScanReInitializeDSM(CustomScanState *node,
+ ParallelContext *pcxt);
extern void ExecCustomScanInitializeWorker(CustomScanState *node,
shm_toc *toc);
extern void ExecShutdownCustomScan(CustomScanState *node);
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 0b662597d8f..0354c2c4308 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -25,6 +25,8 @@ extern void ExecForeignScanEstimate(ForeignScanState *node,
ParallelContext *pcxt);
extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
ParallelContext *pcxt);
+extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
+ ParallelContext *pcxt);
extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
shm_toc *toc);
extern void ExecShutdownForeignScan(ForeignScanState *node);
diff --git a/src/include/executor/nodeIndexonlyscan.h b/src/include/executor/nodeIndexonlyscan.h
index c8a709c26ed..690b5dbfe59 100644
--- a/src/include/executor/nodeIndexonlyscan.h
+++ b/src/include/executor/nodeIndexonlyscan.h
@@ -28,6 +28,8 @@ extern void ExecIndexOnlyScanEstimate(IndexOnlyScanState *node,
ParallelContext *pcxt);
extern void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
ParallelContext *pcxt);
+extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
+ ParallelContext *pcxt);
extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
shm_toc *toc);
diff --git a/src/include/executor/nodeIndexscan.h b/src/include/executor/nodeIndexscan.h
index 1668e347eef..0670e87e395 100644
--- a/src/include/executor/nodeIndexscan.h
+++ b/src/include/executor/nodeIndexscan.h
@@ -24,6 +24,7 @@ extern void ExecIndexRestrPos(IndexScanState *node);
extern void ExecReScanIndexScan(IndexScanState *node);
extern void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt);
extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
+extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
extern void ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc);
/*
diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h
index 0fba79f8de6..eb96799cade 100644
--- a/src/include/executor/nodeSeqscan.h
+++ b/src/include/executor/nodeSeqscan.h
@@ -24,6 +24,7 @@ extern void ExecReScanSeqScan(SeqScanState *node);
/* parallel scan support */
extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt);
extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
+extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc);
#endif /* NODESEQSCAN_H */
diff --git a/src/include/executor/nodeSort.h b/src/include/executor/nodeSort.h
index 77ac06597ff..1ab8f767210 100644
--- a/src/include/executor/nodeSort.h
+++ b/src/include/executor/nodeSort.h
@@ -26,6 +26,7 @@ extern void ExecReScanSort(SortState *node);
/* parallel instrumentation support */
extern void ExecSortEstimate(SortState *node, ParallelContext *pcxt);
extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt);
+extern void ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt);
extern void ExecSortInitializeWorker(SortState *node, shm_toc *toc);
extern void ExecSortRetrieveInstrumentation(SortState *node);
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index e391f20fb86..ef0fbe6f9c6 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -148,6 +148,9 @@ typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node,
typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
ParallelContext *pcxt,
void *coordinate);
+typedef void (*ReInitializeDSMForeignScan_function) (ForeignScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
shm_toc *toc,
void *coordinate);
@@ -224,6 +227,7 @@ typedef struct FdwRoutine
IsForeignScanParallelSafe_function IsForeignScanParallelSafe;
EstimateDSMForeignScan_function EstimateDSMForeignScan;
InitializeDSMForeignScan_function InitializeDSMForeignScan;
+ ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
ShutdownForeignScan_function ShutdownForeignScan;
} FdwRoutine;
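For FDW authors, a hedged sketch of filling in the new optional callback
(the shared-state struct and all "my"-prefixed names are invented):

    typedef struct MyFdwSharedState
    {
        pg_atomic_uint64 next_row;      /* next row to hand out */
    } MyFdwSharedState;

    static void
    myReInitializeDSMForeignScan(ForeignScanState *node,
                                 ParallelContext *pcxt,
                                 void *coordinate)
    {
        MyFdwSharedState *sstate = (MyFdwSharedState *) coordinate;

        /* Leader only; workers are guaranteed to have shut down. */
        pg_atomic_write_u64(&sstate->next_row, 0);
    }

The callback is registered from the FDW's handler alongside the existing
parallel callbacks (fdwroutine->ReInitializeDSMForeignScan = ...); leaving
it NULL means core simply skips the call, as ExecForeignScanReInitializeDSM
above shows.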
diff --git a/src/include/nodes/extensible.h b/src/include/nodes/extensible.h
index 7325bf536af..0654e79c7ba 100644
--- a/src/include/nodes/extensible.h
+++ b/src/include/nodes/extensible.h
@@ -136,6 +136,9 @@ typedef struct CustomExecMethods
void (*InitializeDSMCustomScan) (CustomScanState *node,
ParallelContext *pcxt,
void *coordinate);
+ void (*ReInitializeDSMCustomScan) (CustomScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
void (*InitializeWorkerCustomScan) (CustomScanState *node,
shm_toc *toc,
void *coordinate);
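Symmetrically, a custom scan provider opts in through CustomExecMethods;
a sketch with invented names (leaving the pointer NULL is fine, since
ExecCustomScanReInitializeDSM above calls it only when set):

    static void
    my_reinitialize_dsm_custom_scan(CustomScanState *node,
                                    ParallelContext *pcxt,
                                    void *coordinate)
    {
        MyCustomSharedState *shared = (MyCustomSharedState *) coordinate;

        /* Undo whatever InitializeDSMCustomScan published in DSM. */
        pg_atomic_write_u64(&shared->next_chunk, 0);
    }

    /* In the provider's setup code: */
    my_exec_methods.ReInitializeDSMCustomScan = my_reinitialize_dsm_custom_scan;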