aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/auto_explain/auto_explain.c2
-rw-r--r--contrib/pg_stat_statements/pg_stat_statements.c2
-rw-r--r--contrib/postgres_fdw/expected/postgres_fdw.out39
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c9
-rw-r--r--contrib/postgres_fdw/sql/postgres_fdw.sql21
-rw-r--r--doc/src/sgml/fdwhandler.sgml2
-rw-r--r--src/backend/executor/execAsync.c30
-rw-r--r--src/backend/executor/execMain.c2
-rw-r--r--src/backend/executor/execProcnode.c3
-rw-r--r--src/backend/executor/instrument.c21
-rw-r--r--src/backend/executor/nodeAppend.c12
-rw-r--r--src/backend/executor/nodeForeignscan.c7
-rw-r--r--src/include/executor/instrument.h5
-rw-r--r--src/include/nodes/execnodes.h5
14 files changed, 134 insertions, 26 deletions
diff --git a/contrib/auto_explain/auto_explain.c b/contrib/auto_explain/auto_explain.c
index 445bb371912..e9092ba359a 100644
--- a/contrib/auto_explain/auto_explain.c
+++ b/contrib/auto_explain/auto_explain.c
@@ -314,7 +314,7 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt);
- queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL);
+ queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false);
MemoryContextSwitchTo(oldcxt);
}
}
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index f42f07622e9..77ca5abcdc8 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -974,7 +974,7 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt);
- queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL);
+ queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false);
MemoryContextSwitchTo(oldcxt);
}
}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 6f533c745d6..0b0c45f0d9a 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -10051,6 +10051,21 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
Filter: (t1_3.b === 505)
(14 rows)
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+ QUERY PLAN
+-------------------------------------------------------------------------
+ Limit (actual rows=1 loops=1)
+ -> Append (actual rows=1 loops=1)
+ -> Async Foreign Scan on async_p1 t1_1 (actual rows=0 loops=1)
+ Filter: (b === 505)
+ -> Async Foreign Scan on async_p2 t1_2 (actual rows=0 loops=1)
+ Filter: (b === 505)
+ -> Seq Scan on async_p3 t1_3 (actual rows=1 loops=1)
+ Filter: (b === 505)
+ Rows Removed by Filter: 101
+(9 rows)
+
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
a | b | c
------+-----+------
@@ -10132,18 +10147,32 @@ SELECT * FROM join_tbl ORDER BY a1;
(3 rows)
DELETE FROM join_tbl;
+DROP TABLE local_tbl;
+DROP FOREIGN TABLE remote_tbl;
+DROP FOREIGN TABLE insert_tbl;
+DROP TABLE base_tbl3;
+DROP TABLE base_tbl4;
RESET enable_mergejoin;
RESET enable_hashjoin;
+-- Check EXPLAIN ANALYZE for a query that scans empty partitions asynchronously
+DELETE FROM async_p1;
+DELETE FROM async_p2;
+DELETE FROM async_p3;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM async_pt;
+ QUERY PLAN
+-------------------------------------------------------------------------
+ Append (actual rows=0 loops=1)
+ -> Async Foreign Scan on async_p1 async_pt_1 (actual rows=0 loops=1)
+ -> Async Foreign Scan on async_p2 async_pt_2 (actual rows=0 loops=1)
+ -> Seq Scan on async_p3 async_pt_3 (actual rows=0 loops=1)
+(4 rows)
+
-- Clean up
DROP TABLE async_pt;
DROP TABLE base_tbl1;
DROP TABLE base_tbl2;
DROP TABLE result_tbl;
-DROP TABLE local_tbl;
-DROP FOREIGN TABLE remote_tbl;
-DROP FOREIGN TABLE insert_tbl;
-DROP TABLE base_tbl3;
-DROP TABLE base_tbl4;
DROP TABLE join_tbl;
ALTER SERVER loopback OPTIONS (DROP async_capable);
ALTER SERVER loopback2 OPTIONS (DROP async_capable);
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 4ff58d9c275..ee93ee07cc4 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1542,7 +1542,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
&fsstate->param_values);
/* Set the async-capable flag */
- fsstate->async_capable = node->ss.ps.plan->async_capable;
+ fsstate->async_capable = node->ss.ps.async_capable;
}
/*
@@ -6867,7 +6867,7 @@ produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
}
/* Get a tuple from the ForeignScan node */
- result = ExecProcNode((PlanState *) node);
+ result = areq->requestee->ExecProcNodeReal(areq->requestee);
if (!TupIsNull(result))
{
/* Mark the request as complete */
@@ -6956,6 +6956,11 @@ process_pending_request(AsyncRequest *areq)
/* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
ExecAsyncResponse(areq);
+ /* Also, we do instrumentation ourselves, if required */
+ if (areq->requestee->instrument)
+ InstrUpdateTupleCount(areq->requestee->instrument,
+ TupIsNull(areq->result) ? 0.0 : 1.0);
+
MemoryContextSwitchTo(oldcontext);
}
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 000e2534fc8..53adfe2abc8 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3195,6 +3195,8 @@ SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
-- Check with foreign modify
@@ -3226,19 +3228,28 @@ INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND
SELECT * FROM join_tbl ORDER BY a1;
DELETE FROM join_tbl;
+DROP TABLE local_tbl;
+DROP FOREIGN TABLE remote_tbl;
+DROP FOREIGN TABLE insert_tbl;
+DROP TABLE base_tbl3;
+DROP TABLE base_tbl4;
+
RESET enable_mergejoin;
RESET enable_hashjoin;
+-- Check EXPLAIN ANALYZE for a query that scans empty partitions asynchronously
+DELETE FROM async_p1;
+DELETE FROM async_p2;
+DELETE FROM async_p3;
+
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM async_pt;
+
-- Clean up
DROP TABLE async_pt;
DROP TABLE base_tbl1;
DROP TABLE base_tbl2;
DROP TABLE result_tbl;
-DROP TABLE local_tbl;
-DROP FOREIGN TABLE remote_tbl;
-DROP FOREIGN TABLE insert_tbl;
-DROP TABLE base_tbl3;
-DROP TABLE base_tbl4;
DROP TABLE join_tbl;
ALTER SERVER loopback OPTIONS (DROP async_capable);
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index 8aa7edfe4af..d1194def820 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -1597,7 +1597,7 @@ ForeignAsyncRequest(AsyncRequest *areq);
<literal>areq-&gt;callback_pending</literal> to <literal>true</literal>
for the <structname>ForeignScan</structname> node to get a callback from
the callback functions described below. If no more tuples are available,
- set the slot to NULL, and the
+ set the slot to NULL or an empty slot, and the
<literal>areq-&gt;request_complete</literal> flag to
<literal>true</literal>. It's recommended to use
<function>ExecAsyncRequestDone</function> or
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index f1985e658c4..75108d36be2 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -15,6 +15,7 @@
#include "postgres.h"
#include "executor/execAsync.h"
+#include "executor/executor.h"
#include "executor/nodeAppend.h"
#include "executor/nodeForeignscan.h"
@@ -24,6 +25,13 @@
void
ExecAsyncRequest(AsyncRequest *areq)
{
+ if (areq->requestee->chgParam != NULL) /* something changed? */
+ ExecReScan(areq->requestee); /* let ReScan handle this */
+
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStartNode(areq->requestee->instrument);
+
switch (nodeTag(areq->requestee))
{
case T_ForeignScanState:
@@ -36,6 +44,11 @@ ExecAsyncRequest(AsyncRequest *areq)
}
ExecAsyncResponse(areq);
+
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStopNode(areq->requestee->instrument,
+ TupIsNull(areq->result) ? 0.0 : 1.0);
}
/*
@@ -48,6 +61,10 @@ ExecAsyncRequest(AsyncRequest *areq)
void
ExecAsyncConfigureWait(AsyncRequest *areq)
{
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStartNode(areq->requestee->instrument);
+
switch (nodeTag(areq->requestee))
{
case T_ForeignScanState:
@@ -58,6 +75,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq)
elog(ERROR, "unrecognized node type: %d",
(int) nodeTag(areq->requestee));
}
+
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStopNode(areq->requestee->instrument, 0.0);
}
/*
@@ -66,6 +87,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq)
void
ExecAsyncNotify(AsyncRequest *areq)
{
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStartNode(areq->requestee->instrument);
+
switch (nodeTag(areq->requestee))
{
case T_ForeignScanState:
@@ -78,6 +103,11 @@ ExecAsyncNotify(AsyncRequest *areq)
}
ExecAsyncResponse(areq);
+
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStopNode(areq->requestee->instrument,
+ TupIsNull(areq->result) ? 0.0 : 1.0);
}
/*
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index df3d7f9a8bc..58b49687350 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1214,7 +1214,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_TrigWhenExprs = (ExprState **)
palloc0(n * sizeof(ExprState *));
if (instrument_options)
- resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options);
+ resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options, false);
}
else
{
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 9f8c7582e04..753f46863b7 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -407,7 +407,8 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
/* Set up instrumentation for this node if requested */
if (estate->es_instrument)
- result->instrument = InstrAlloc(1, estate->es_instrument);
+ result->instrument = InstrAlloc(1, estate->es_instrument,
+ result->async_capable);
return result;
}
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index 237e13361b5..2b106d8473c 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -28,7 +28,7 @@ static void WalUsageAdd(WalUsage *dst, WalUsage *add);
/* Allocate new instrumentation structure(s) */
Instrumentation *
-InstrAlloc(int n, int instrument_options)
+InstrAlloc(int n, int instrument_options, bool async_mode)
{
Instrumentation *instr;
@@ -46,6 +46,7 @@ InstrAlloc(int n, int instrument_options)
instr[i].need_bufusage = need_buffers;
instr[i].need_walusage = need_wal;
instr[i].need_timer = need_timer;
+ instr[i].async_mode = async_mode;
}
}
@@ -82,6 +83,7 @@ InstrStartNode(Instrumentation *instr)
void
InstrStopNode(Instrumentation *instr, double nTuples)
{
+ double save_tuplecount = instr->tuplecount;
instr_time endtime;
/* count the returned tuples */
@@ -114,6 +116,23 @@ InstrStopNode(Instrumentation *instr, double nTuples)
instr->running = true;
instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
}
+ else
+ {
+ /*
+ * In async mode, if the plan node hadn't emitted any tuples before,
+ * this might be the first tuple
+ */
+ if (instr->async_mode && save_tuplecount < 1.0)
+ instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
+ }
+}
+
+/* Update tuple count */
+void
+InstrUpdateTupleCount(Instrumentation *instr, double nTuples)
+{
+ /* count the returned tuples */
+ instr->tuplecount += nTuples;
}
/* Finish a run cycle for a plan node */
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 3c1f12adafb..1558fafad1e 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -362,9 +362,9 @@ ExecAppend(PlanState *pstate)
}
/*
- * wait or poll async events if any. We do this before checking for
- * the end of iteration, because it might drain the remaining async
- * subplans.
+ * wait or poll for async events if any. We do this before checking
+ * for the end of iteration, because it might drain the remaining
+ * async subplans.
*/
if (node->as_nasyncremain > 0)
ExecAppendAsyncEventWait(node);
@@ -440,7 +440,7 @@ ExecReScanAppend(AppendState *node)
/*
* If chgParam of subnode is not null then plan will be re-scanned by
- * first ExecProcNode.
+ * first ExecProcNode or by first ExecAsyncRequest.
*/
if (subnode->chgParam == NULL)
ExecReScan(subnode);
@@ -911,7 +911,7 @@ ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
CHECK_FOR_INTERRUPTS();
- /* Wait or poll async events. */
+ /* Wait or poll for async events. */
ExecAppendAsyncEventWait(node);
/* Request a tuple asynchronously. */
@@ -1084,7 +1084,7 @@ ExecAsyncAppendResponse(AsyncRequest *areq)
/* Nothing to do if the request is pending. */
if (!areq->request_complete)
{
- /* The request would have been pending for a callback */
+ /* The request would have been pending for a callback. */
Assert(areq->callback_pending);
return;
}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 898890fb08f..9dc38d47ea7 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -210,6 +210,13 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
/*
+ * Determine whether to scan the foreign relation asynchronously or not;
+ * this has to be kept in sync with the code in ExecInitAppend().
+ */
+ scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable &&
+ estate->es_epq_active == NULL);
+
+ /*
* Initialize FDW-related state.
*/
scanstate->fdwroutine = fdwroutine;
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index c25aa1b04c2..fc87eed4fb2 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -55,6 +55,7 @@ typedef struct Instrumentation
bool need_timer; /* true if we need timer data */
bool need_bufusage; /* true if we need buffer usage data */
bool need_walusage; /* true if we need WAL usage data */
+ bool async_mode; /* true if node is in async mode */
/* Info about current plan cycle: */
bool running; /* true if we've completed first tuple */
instr_time starttime; /* start time of current iteration of node */
@@ -84,10 +85,12 @@ typedef struct WorkerInstrumentation
extern PGDLLIMPORT BufferUsage pgBufferUsage;
extern PGDLLIMPORT WalUsage pgWalUsage;
-extern Instrumentation *InstrAlloc(int n, int instrument_options);
+extern Instrumentation *InstrAlloc(int n, int instrument_options,
+ bool async_mode);
extern void InstrInit(Instrumentation *instr, int instrument_options);
extern void InstrStartNode(Instrumentation *instr);
extern void InstrStopNode(Instrumentation *instr, double nTuples);
+extern void InstrUpdateTupleCount(Instrumentation *instr, double nTuples);
extern void InstrEndLoop(Instrumentation *instr);
extern void InstrAggNode(Instrumentation *dst, Instrumentation *add);
extern void InstrStartParallelQuery(void);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e7ae21c023c..91a1c3a780e 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -538,7 +538,8 @@ typedef struct AsyncRequest
int request_index; /* Scratch space for requestor */
bool callback_pending; /* Callback is needed */
bool request_complete; /* Request complete, result valid */
- TupleTableSlot *result; /* Result (NULL if no more tuples) */
+ TupleTableSlot *result; /* Result (NULL or an empty slot if no more
+ * tuples) */
} AsyncRequest;
/* ----------------
@@ -1003,6 +1004,8 @@ typedef struct PlanState
ExprContext *ps_ExprContext; /* node's expression-evaluation context */
ProjectionInfo *ps_ProjInfo; /* info for doing tuple projection */
+ bool async_capable; /* true if node is async-capable */
+
/*
* Scanslot's descriptor if known. This is a bit of a hack, but otherwise
* it's hard for expression compilation to optimize based on the