diff options
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/execAsync.c | 30 | ||||
-rw-r--r-- | src/backend/executor/execMain.c | 2 | ||||
-rw-r--r-- | src/backend/executor/execProcnode.c | 3 | ||||
-rw-r--r-- | src/backend/executor/instrument.c | 21 | ||||
-rw-r--r-- | src/backend/executor/nodeAppend.c | 12 | ||||
-rw-r--r-- | src/backend/executor/nodeForeignscan.c | 7 |
6 files changed, 66 insertions, 9 deletions
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c index f1985e658c4..75108d36be2 100644 --- a/src/backend/executor/execAsync.c +++ b/src/backend/executor/execAsync.c @@ -15,6 +15,7 @@ #include "postgres.h" #include "executor/execAsync.h" +#include "executor/executor.h" #include "executor/nodeAppend.h" #include "executor/nodeForeignscan.h" @@ -24,6 +25,13 @@ void ExecAsyncRequest(AsyncRequest *areq) { + if (areq->requestee->chgParam != NULL) /* something changed? */ + ExecReScan(areq->requestee); /* let ReScan handle this */ + + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStartNode(areq->requestee->instrument); + switch (nodeTag(areq->requestee)) { case T_ForeignScanState: @@ -36,6 +44,11 @@ ExecAsyncRequest(AsyncRequest *areq) } ExecAsyncResponse(areq); + + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStopNode(areq->requestee->instrument, + TupIsNull(areq->result) ? 0.0 : 1.0); } /* @@ -48,6 +61,10 @@ ExecAsyncRequest(AsyncRequest *areq) void ExecAsyncConfigureWait(AsyncRequest *areq) { + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStartNode(areq->requestee->instrument); + switch (nodeTag(areq->requestee)) { case T_ForeignScanState: @@ -58,6 +75,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq) elog(ERROR, "unrecognized node type: %d", (int) nodeTag(areq->requestee)); } + + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStopNode(areq->requestee->instrument, 0.0); } /* @@ -66,6 +87,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq) void ExecAsyncNotify(AsyncRequest *areq) { + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStartNode(areq->requestee->instrument); + switch (nodeTag(areq->requestee)) { case T_ForeignScanState: @@ -78,6 +103,11 @@ ExecAsyncNotify(AsyncRequest *areq) } ExecAsyncResponse(areq); + + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStopNode(areq->requestee->instrument, + TupIsNull(areq->result) ? 0.0 : 1.0); } /* diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index df3d7f9a8bc..58b49687350 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1214,7 +1214,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo, resultRelInfo->ri_TrigWhenExprs = (ExprState **) palloc0(n * sizeof(ExprState *)); if (instrument_options) - resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options); + resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options, false); } else { diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 9f8c7582e04..753f46863b7 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -407,7 +407,8 @@ ExecInitNode(Plan *node, EState *estate, int eflags) /* Set up instrumentation for this node if requested */ if (estate->es_instrument) - result->instrument = InstrAlloc(1, estate->es_instrument); + result->instrument = InstrAlloc(1, estate->es_instrument, + result->async_capable); return result; } diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index 237e13361b5..2b106d8473c 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -28,7 +28,7 @@ static void WalUsageAdd(WalUsage *dst, WalUsage *add); /* Allocate new instrumentation structure(s) */ Instrumentation * -InstrAlloc(int n, int instrument_options) +InstrAlloc(int n, int instrument_options, bool async_mode) { Instrumentation *instr; @@ -46,6 +46,7 @@ InstrAlloc(int n, int instrument_options) instr[i].need_bufusage = need_buffers; instr[i].need_walusage = need_wal; instr[i].need_timer = need_timer; + instr[i].async_mode = async_mode; } } @@ -82,6 +83,7 @@ InstrStartNode(Instrumentation *instr) void InstrStopNode(Instrumentation *instr, double nTuples) { + double save_tuplecount = instr->tuplecount; instr_time endtime; /* count the returned tuples */ @@ -114,6 +116,23 @@ InstrStopNode(Instrumentation *instr, double nTuples) instr->running = true; instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter); } + else + { + /* + * In async mode, if the plan node hadn't emitted any tuples before, + * this might be the first tuple + */ + if (instr->async_mode && save_tuplecount < 1.0) + instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter); + } +} + +/* Update tuple count */ +void +InstrUpdateTupleCount(Instrumentation *instr, double nTuples) +{ + /* count the returned tuples */ + instr->tuplecount += nTuples; } /* Finish a run cycle for a plan node */ diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 3c1f12adafb..1558fafad1e 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -362,9 +362,9 @@ ExecAppend(PlanState *pstate) } /* - * wait or poll async events if any. We do this before checking for - * the end of iteration, because it might drain the remaining async - * subplans. + * wait or poll for async events if any. We do this before checking + * for the end of iteration, because it might drain the remaining + * async subplans. */ if (node->as_nasyncremain > 0) ExecAppendAsyncEventWait(node); @@ -440,7 +440,7 @@ ExecReScanAppend(AppendState *node) /* * If chgParam of subnode is not null then plan will be re-scanned by - * first ExecProcNode. + * first ExecProcNode or by first ExecAsyncRequest. */ if (subnode->chgParam == NULL) ExecReScan(subnode); @@ -911,7 +911,7 @@ ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result) { CHECK_FOR_INTERRUPTS(); - /* Wait or poll async events. */ + /* Wait or poll for async events. */ ExecAppendAsyncEventWait(node); /* Request a tuple asynchronously. */ @@ -1084,7 +1084,7 @@ ExecAsyncAppendResponse(AsyncRequest *areq) /* Nothing to do if the request is pending. */ if (!areq->request_complete) { - /* The request would have been pending for a callback */ + /* The request would have been pending for a callback. */ Assert(areq->callback_pending); return; } diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 898890fb08f..9dc38d47ea7 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -210,6 +210,13 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate); /* + * Determine whether to scan the foreign relation asynchronously or not; + * this has to be kept in sync with the code in ExecInitAppend(). + */ + scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable && + estate->es_epq_active == NULL); + + /* * Initialize FDW-related state. */ scanstate->fdwroutine = fdwroutine; |