aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor')
-rw-r--r--src/backend/executor/execAsync.c30
-rw-r--r--src/backend/executor/execMain.c2
-rw-r--r--src/backend/executor/execProcnode.c3
-rw-r--r--src/backend/executor/instrument.c21
-rw-r--r--src/backend/executor/nodeAppend.c12
-rw-r--r--src/backend/executor/nodeForeignscan.c7
6 files changed, 66 insertions, 9 deletions
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index f1985e658c4..75108d36be2 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -15,6 +15,7 @@
#include "postgres.h"
#include "executor/execAsync.h"
+#include "executor/executor.h"
#include "executor/nodeAppend.h"
#include "executor/nodeForeignscan.h"
@@ -24,6 +25,13 @@
void
ExecAsyncRequest(AsyncRequest *areq)
{
+ if (areq->requestee->chgParam != NULL) /* something changed? */
+ ExecReScan(areq->requestee); /* let ReScan handle this */
+
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStartNode(areq->requestee->instrument);
+
switch (nodeTag(areq->requestee))
{
case T_ForeignScanState:
@@ -36,6 +44,11 @@ ExecAsyncRequest(AsyncRequest *areq)
}
ExecAsyncResponse(areq);
+
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStopNode(areq->requestee->instrument,
+ TupIsNull(areq->result) ? 0.0 : 1.0);
}
/*
@@ -48,6 +61,10 @@ ExecAsyncRequest(AsyncRequest *areq)
void
ExecAsyncConfigureWait(AsyncRequest *areq)
{
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStartNode(areq->requestee->instrument);
+
switch (nodeTag(areq->requestee))
{
case T_ForeignScanState:
@@ -58,6 +75,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq)
elog(ERROR, "unrecognized node type: %d",
(int) nodeTag(areq->requestee));
}
+
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStopNode(areq->requestee->instrument, 0.0);
}
/*
@@ -66,6 +87,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq)
void
ExecAsyncNotify(AsyncRequest *areq)
{
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStartNode(areq->requestee->instrument);
+
switch (nodeTag(areq->requestee))
{
case T_ForeignScanState:
@@ -78,6 +103,11 @@ ExecAsyncNotify(AsyncRequest *areq)
}
ExecAsyncResponse(areq);
+
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStopNode(areq->requestee->instrument,
+ TupIsNull(areq->result) ? 0.0 : 1.0);
}
/*
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index df3d7f9a8bc..58b49687350 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1214,7 +1214,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_TrigWhenExprs = (ExprState **)
palloc0(n * sizeof(ExprState *));
if (instrument_options)
- resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options);
+ resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options, false);
}
else
{
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 9f8c7582e04..753f46863b7 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -407,7 +407,8 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
/* Set up instrumentation for this node if requested */
if (estate->es_instrument)
- result->instrument = InstrAlloc(1, estate->es_instrument);
+ result->instrument = InstrAlloc(1, estate->es_instrument,
+ result->async_capable);
return result;
}
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index 237e13361b5..2b106d8473c 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -28,7 +28,7 @@ static void WalUsageAdd(WalUsage *dst, WalUsage *add);
/* Allocate new instrumentation structure(s) */
Instrumentation *
-InstrAlloc(int n, int instrument_options)
+InstrAlloc(int n, int instrument_options, bool async_mode)
{
Instrumentation *instr;
@@ -46,6 +46,7 @@ InstrAlloc(int n, int instrument_options)
instr[i].need_bufusage = need_buffers;
instr[i].need_walusage = need_wal;
instr[i].need_timer = need_timer;
+ instr[i].async_mode = async_mode;
}
}
@@ -82,6 +83,7 @@ InstrStartNode(Instrumentation *instr)
void
InstrStopNode(Instrumentation *instr, double nTuples)
{
+ double save_tuplecount = instr->tuplecount;
instr_time endtime;
/* count the returned tuples */
@@ -114,6 +116,23 @@ InstrStopNode(Instrumentation *instr, double nTuples)
instr->running = true;
instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
}
+ else
+ {
+ /*
+ * In async mode, if the plan node hadn't emitted any tuples before,
+ * this might be the first tuple
+ */
+ if (instr->async_mode && save_tuplecount < 1.0)
+ instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
+ }
+}
+
+/* Update tuple count */
+void
+InstrUpdateTupleCount(Instrumentation *instr, double nTuples)
+{
+ /* count the returned tuples */
+ instr->tuplecount += nTuples;
}
/* Finish a run cycle for a plan node */
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 3c1f12adafb..1558fafad1e 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -362,9 +362,9 @@ ExecAppend(PlanState *pstate)
}
/*
- * wait or poll async events if any. We do this before checking for
- * the end of iteration, because it might drain the remaining async
- * subplans.
+ * wait or poll for async events if any. We do this before checking
+ * for the end of iteration, because it might drain the remaining
+ * async subplans.
*/
if (node->as_nasyncremain > 0)
ExecAppendAsyncEventWait(node);
@@ -440,7 +440,7 @@ ExecReScanAppend(AppendState *node)
/*
* If chgParam of subnode is not null then plan will be re-scanned by
- * first ExecProcNode.
+ * first ExecProcNode or by first ExecAsyncRequest.
*/
if (subnode->chgParam == NULL)
ExecReScan(subnode);
@@ -911,7 +911,7 @@ ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
CHECK_FOR_INTERRUPTS();
- /* Wait or poll async events. */
+ /* Wait or poll for async events. */
ExecAppendAsyncEventWait(node);
/* Request a tuple asynchronously. */
@@ -1084,7 +1084,7 @@ ExecAsyncAppendResponse(AsyncRequest *areq)
/* Nothing to do if the request is pending. */
if (!areq->request_complete)
{
- /* The request would have been pending for a callback */
+ /* The request would have been pending for a callback. */
Assert(areq->callback_pending);
return;
}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 898890fb08f..9dc38d47ea7 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -210,6 +210,13 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
/*
+ * Determine whether to scan the foreign relation asynchronously or not;
+ * this has to be kept in sync with the code in ExecInitAppend().
+ */
+ scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable &&
+ estate->es_epq_active == NULL);
+
+ /*
* Initialize FDW-related state.
*/
scanstate->fdwroutine = fdwroutine;