author | Robert Haas <rhaas@postgresql.org> | 2016-06-06 14:52:58 -0400
---|---|---
committer | Robert Haas <rhaas@postgresql.org> | 2016-06-06 14:52:58 -0400
commit | c6dbf1fe79287291bc260cbc06b0de419d2a198c (patch) |
tree | 8edc5d8f7d13201c73d90f0c217f6e00a27e5099 /src/backend/executor |
parent | 44339b892a04e94bbb472235882dc6f7023bdc65 (diff) |
download | postgresql-c6dbf1fe79287291bc260cbc06b0de419d2a198c.tar.gz, postgresql-c6dbf1fe79287291bc260cbc06b0de419d2a198c.zip |
Stop the executor if no more tuples can be sent from worker to leader.
If a Gather node has read as many tuples as it needs (for example, due
to Limit), it may detach the queue connecting it to the worker before
reading all of the worker's tuples. Rather than let the worker
continue to generate and send all of the results, have it stop after
sending the next tuple.
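The resulting control flow is simple enough to show in a self-contained sketch. This is a toy model under invented names (ToyReceiver, toy_receive, and the limit field are all assumptions for illustration), not the PostgreSQL DestReceiver API: the receive callback returns a bool, and the producing loop ends as soon as the callback reports that the destination is gone.

```c
#include <stdbool.h>
#include <stdio.h>

/*
 * Toy receiver: the callback returns false once the destination can no
 * longer accept tuples (here, after an artificial limit, standing in
 * for the leader detaching the queue).  All names are illustrative.
 */
typedef struct ToyReceiver
{
    bool    (*receive) (int tuple, struct ToyReceiver *self);
    int     accepted;
    int     limit;
} ToyReceiver;

static bool
toy_receive(int tuple, struct ToyReceiver *self)
{
    if (self->accepted >= self->limit)
        return false;           /* destination gone: tell producer to stop */
    self->accepted++;
    printf("accepted tuple %d\n", tuple);
    return true;
}

int
main(void)
{
    ToyReceiver dest = {toy_receive, 0, 3};

    /* Producer loop in the style of ExecutePlan: break on failure. */
    for (int tuple = 0; tuple < 1000; tuple++)
    {
        if (!dest.receive(tuple, &dest))
            break;              /* no more tuples can be sent */
    }
    printf("producer stopped after %d tuples\n", dest.accepted);
    return 0;
}
```

The point of returning a bool rather than raising an error is that an early-detaching consumer is an expected, non-error condition; the producer simply stops quietly.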
More could be done here to stop the worker even quicker, but this is
about as well as we can hope to do for 9.6.
This is in response to a problem report from Andreas Seltenreich.
Commit 44339b892a04e94bbb472235882dc6f7023bdc65 should actually be
sufficient to fix that example even without this change, but it seems
better to do this, too, since we might otherwise waste quite a large
amount of effort in one or more workers.
Discussion: CAA4eK1KOKGqmz9bGu+Z42qhRwMbm4R5rfnqsLCNqFs9j14jzEA@mail.gmail.com
Amit Kapila
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/execMain.c | 10
-rw-r--r-- | src/backend/executor/execTuples.c | 2
-rw-r--r-- | src/backend/executor/functions.c | 6
-rw-r--r-- | src/backend/executor/spi.c | 4
-rw-r--r-- | src/backend/executor/tqueue.c | 14
-rw-r--r-- | src/backend/executor/tstoreReceiver.c | 12
6 files changed, 37 insertions, 11 deletions
```diff
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index ac0230411c3..b5ced388d20 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1593,7 +1593,15 @@ ExecutePlan(EState *estate,
 		 * practice, this is probably always the case at this point.)
 		 */
 		if (sendTuples)
-			(*dest->receiveSlot) (slot, dest);
+		{
+			/*
+			 * If we are not able to send the tuple, we assume the destination
+			 * has closed and no more tuples can be sent. If that's the case,
+			 * end the loop.
+			 */
+			if (!((*dest->receiveSlot) (slot, dest)))
+				break;
+		}
 
 		/*
 		 * Count tuples processed, if this is a SELECT.  (For other operation
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 2b81f60a519..533050dc859 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -1266,7 +1266,7 @@ do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
 	ExecStoreVirtualTuple(slot);
 
 	/* send the tuple to the receiver */
-	(*tstate->dest->receiveSlot) (slot, tstate->dest);
+	(void) (*tstate->dest->receiveSlot) (slot, tstate->dest);
 
 	/* clean up */
 	ExecClearTuple(slot);
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 6e14c9d2967..cd93c045dcb 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -167,7 +167,7 @@ static Datum postquel_get_single_result(TupleTableSlot *slot,
 static void sql_exec_error_callback(void *arg);
 static void ShutdownSQLFunction(Datum arg);
 static void sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self);
 static void sqlfunction_shutdown(DestReceiver *self);
 static void sqlfunction_destroy(DestReceiver *self);
 
@@ -1904,7 +1904,7 @@ sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 /*
  * sqlfunction_receive --- receive one tuple
  */
-static void
+static bool
 sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
 {
 	DR_sqlfunction *myState = (DR_sqlfunction *) self;
@@ -1914,6 +1914,8 @@ sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
 
 	/* Store the filtered tuple into the tuplestore */
 	tuplestore_puttupleslot(myState->tstore, slot);
+
+	return true;
 }
 
 /*
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 23cb6f407dd..7ccabdb44b2 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -1774,7 +1774,7 @@ spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
  * store tuple retrieved by Executor into SPITupleTable
  * of current SPI procedure
  */
-void
+bool
 spi_printtup(TupleTableSlot *slot, DestReceiver *self)
 {
 	SPITupleTable *tuptable;
@@ -1809,6 +1809,8 @@ spi_printtup(TupleTableSlot *slot, DestReceiver *self)
 	(tuptable->free)--;
 
 	MemoryContextSwitchTo(oldcxt);
+
+	return true;
 }
 
 /*
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 383b5352cba..8abb1f16e45 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -115,12 +115,13 @@ static RemapInfo *BuildRemapInfo(TupleDesc tupledesc);
  * type over a range type over a range type over an array type over a record,
  * or something like that.
  */
-static void
+static bool
 tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 {
 	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 	TupleDesc	tupledesc = slot->tts_tupleDescriptor;
 	HeapTuple	tuple;
+	shm_mq_result result;
 
 	/*
 	 * Test to see whether the tupledesc has changed; if so, set up for the
@@ -195,7 +196,16 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 	}
 
 	/* Send the tuple itself. */
-	shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+	result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+
+	if (result == SHM_MQ_DETACHED)
+		return false;
+	else if (result != SHM_MQ_SUCCESS)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to send tuples")));
+
+	return true;
 }
 
 /*
diff --git a/src/backend/executor/tstoreReceiver.c b/src/backend/executor/tstoreReceiver.c
index 516440ad32e..8f1e1b3f50c 100644
--- a/src/backend/executor/tstoreReceiver.c
+++ b/src/backend/executor/tstoreReceiver.c
@@ -37,8 +37,8 @@ typedef struct
 } TStoreState;
 
 
-static void tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
-static void tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
 
 
 /*
@@ -90,19 +90,21 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
  * Receive a tuple from the executor and store it in the tuplestore.
  * This is for the easy case where we don't have to detoast.
  */
-static void
+static bool
 tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
 {
 	TStoreState *myState = (TStoreState *) self;
 
 	tuplestore_puttupleslot(myState->tstore, slot);
+
+	return true;
 }
 
 /*
  * Receive a tuple from the executor and store it in the tuplestore.
  * This is for the case where we have to detoast any toasted values.
  */
-static void
+static bool
 tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
 {
 	TStoreState *myState = (TStoreState *) self;
@@ -152,6 +154,8 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
 
 	/* And release any temporary detoasted values */
 	for (i = 0; i < nfree; i++)
 		pfree(DatumGetPointer(myState->tofree[i]));
+
+	return true;
 }
 
 /*
```
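On the sending side, the policy tqueueReceiveSlot adopts in the hunk above can be restated as a standalone sketch. The queue type, result enum, and function names below are toy stand-ins (not the real shm_mq API): a detached queue maps to a clean false return, while any other send failure is treated as an error.

```c
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Toy stand-ins for shm_mq_result and friends; illustrative only. */
typedef enum ToyMqResult
{
    TOY_MQ_SUCCESS,
    TOY_MQ_DETACHED,            /* reader went away: expected, not an error */
    TOY_MQ_ERROR
} ToyMqResult;

/* Pretend queue that detaches after accepting a fixed number of messages. */
static ToyMqResult
toy_mq_send(int *sent, int detach_after)
{
    if (*sent >= detach_after)
        return TOY_MQ_DETACHED;
    (*sent)++;
    return TOY_MQ_SUCCESS;
}

/* Mirrors the return convention the patch gives tqueueReceiveSlot. */
static bool
toy_receive_slot(int *sent, int detach_after)
{
    ToyMqResult result = toy_mq_send(sent, detach_after);

    if (result == TOY_MQ_DETACHED)
        return false;           /* leader detached: stop producing quietly */
    if (result != TOY_MQ_SUCCESS)
    {
        /* stand-in for ereport(ERROR, ...) */
        fprintf(stderr, "unable to send tuples\n");
        exit(EXIT_FAILURE);
    }
    return true;
}

int
main(void)
{
    int sent = 0;

    while (toy_receive_slot(&sent, 5))
        ;                       /* keep producing until told to stop */
    printf("worker stopped cleanly after %d tuples\n", sent);
    return 0;
}
```

Distinguishing the detached case from other failures is what lets the worker shut down without reporting a spurious error when the leader has simply stopped reading.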