diff options
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/execParallel.c | 26 | ||||
-rw-r--r-- | src/backend/executor/nodeCustom.c | 45 | ||||
-rw-r--r-- | src/backend/executor/nodeForeignscan.c | 62 |
3 files changed, 133 insertions, 0 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 29e450a571c..95e8e41d2bb 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -25,6 +25,8 @@ #include "executor/execParallel.h" #include "executor/executor.h" +#include "executor/nodeCustom.h" +#include "executor/nodeForeignscan.h" #include "executor/nodeSeqscan.h" #include "executor/tqueue.h" #include "nodes/nodeFuncs.h" @@ -176,6 +178,14 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecSeqScanEstimate((SeqScanState *) planstate, e->pcxt); break; + case T_ForeignScanState: + ExecForeignScanEstimate((ForeignScanState *) planstate, + e->pcxt); + break; + case T_CustomScanState: + ExecCustomScanEstimate((CustomScanState *) planstate, + e->pcxt); + break; default: break; } @@ -220,6 +230,14 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecSeqScanInitializeDSM((SeqScanState *) planstate, d->pcxt); break; + case T_ForeignScanState: + ExecForeignScanInitializeDSM((ForeignScanState *) planstate, + d->pcxt); + break; + case T_CustomScanState: + ExecCustomScanInitializeDSM((CustomScanState *) planstate, + d->pcxt); + break; default: break; } @@ -642,6 +660,14 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) case T_SeqScanState: ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc); break; + case T_ForeignScanState: + ExecForeignScanInitializeWorker((ForeignScanState *) planstate, + toc); + break; + case T_CustomScanState: + ExecCustomScanInitializeWorker((CustomScanState *) planstate, + toc); + break; default: break; } diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index 640289e2773..322abca282a 100644 --- a/src/backend/executor/nodeCustom.c +++ b/src/backend/executor/nodeCustom.c @@ -10,6 +10,7 @@ */ #include "postgres.h" +#include "access/parallel.h" #include "executor/executor.h" #include "executor/nodeCustom.h" #include "nodes/execnodes.h" @@ -159,3 +160,47 @@ ExecCustomRestrPos(CustomScanState *node) node->methods->CustomName))); node->methods->RestrPosCustomScan(node); } + +void +ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->EstimateDSMCustomScan) + { + node->pscan_len = methods->EstimateDSMCustomScan(node, pcxt); + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +void +ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->InitializeDSMCustomScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); + methods->InitializeDSMCustomScan(node, pcxt, coordinate); + shm_toc_insert(pcxt->toc, plan_node_id, coordinate); + } +} + +void +ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->InitializeWorkerCustomScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_lookup(toc, plan_node_id); + methods->InitializeWorkerCustomScan(node, toc, coordinate); + } +} diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 64a07bcc771..388c9227498 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -282,3 +282,65 @@ ExecReScanForeignScan(ForeignScanState *node) ExecScanReScan(&node->ss); } + +/* ---------------------------------------------------------------- + * ExecForeignScanEstimate + * + * Informs size of the parallel coordination information, if any + * ---------------------------------------------------------------- + */ +void +ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->EstimateDSMForeignScan) + { + node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt); + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* ---------------------------------------------------------------- + * ExecForeignScanInitializeDSM + * + * Initialize the parallel coordination information + * ---------------------------------------------------------------- + */ +void +ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->InitializeDSMForeignScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); + fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate); + shm_toc_insert(pcxt->toc, plan_node_id, coordinate); + } +} + +/* ---------------------------------------------------------------- + * ExecForeignScanInitializeDSM + * + * Initialization according to the parallel coordination information + * ---------------------------------------------------------------- + */ +void +ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->InitializeWorkerForeignScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_lookup(toc, plan_node_id); + fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate); + } +} |