aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor')
-rw-r--r--src/backend/executor/execParallel.c26
-rw-r--r--src/backend/executor/nodeCustom.c45
-rw-r--r--src/backend/executor/nodeForeignscan.c62
3 files changed, 133 insertions, 0 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 29e450a571c..95e8e41d2bb 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,8 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeCustom.h"
+#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
@@ -176,6 +178,14 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecSeqScanEstimate((SeqScanState *) planstate,
e->pcxt);
break;
+ case T_ForeignScanState:
+ ExecForeignScanEstimate((ForeignScanState *) planstate,
+ e->pcxt);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanEstimate((CustomScanState *) planstate,
+ e->pcxt);
+ break;
default:
break;
}
@@ -220,6 +230,14 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecSeqScanInitializeDSM((SeqScanState *) planstate,
d->pcxt);
break;
+ case T_ForeignScanState:
+ ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
+ d->pcxt);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanInitializeDSM((CustomScanState *) planstate,
+ d->pcxt);
+ break;
default:
break;
}
@@ -642,6 +660,14 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
case T_SeqScanState:
ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
break;
+ case T_ForeignScanState:
+ ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
+ toc);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanInitializeWorker((CustomScanState *) planstate,
+ toc);
+ break;
default:
break;
}
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index 640289e2773..322abca282a 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -10,6 +10,7 @@
*/
#include "postgres.h"
+#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/nodeCustom.h"
#include "nodes/execnodes.h"
@@ -159,3 +160,47 @@ ExecCustomRestrPos(CustomScanState *node)
node->methods->CustomName)));
node->methods->RestrPosCustomScan(node);
}
+
+void
+ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->EstimateDSMCustomScan)
+ {
+ node->pscan_len = methods->EstimateDSMCustomScan(node, pcxt);
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+}
+
+void
+ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->InitializeDSMCustomScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+ methods->InitializeDSMCustomScan(node, pcxt, coordinate);
+ shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+ }
+}
+
+void
+ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->InitializeWorkerCustomScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(toc, plan_node_id);
+ methods->InitializeWorkerCustomScan(node, toc, coordinate);
+ }
+}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 64a07bcc771..388c9227498 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -282,3 +282,65 @@ ExecReScanForeignScan(ForeignScanState *node)
ExecScanReScan(&node->ss);
}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanEstimate
+ *
+ * Informs size of the parallel coordination information, if any
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->EstimateDSMForeignScan)
+ {
+ node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanInitializeDSM
+ *
+ * Initialize the parallel coordination information
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->InitializeDSMForeignScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+ fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
+ shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanInitializeDSM
+ *
+ * Initialization according to the parallel coordination information
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->InitializeWorkerForeignScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(toc, plan_node_id);
+ fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
+ }
+}