diff options
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/execMain.c | 10 | ||||
-rw-r--r-- | src/backend/executor/execTuples.c | 2 | ||||
-rw-r--r-- | src/backend/executor/functions.c | 6 | ||||
-rw-r--r-- | src/backend/executor/spi.c | 4 | ||||
-rw-r--r-- | src/backend/executor/tqueue.c | 14 | ||||
-rw-r--r-- | src/backend/executor/tstoreReceiver.c | 12 |
6 files changed, 37 insertions, 11 deletions
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index ac0230411c3..b5ced388d20 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1593,7 +1593,15 @@ ExecutePlan(EState *estate, * practice, this is probably always the case at this point.) */ if (sendTuples) - (*dest->receiveSlot) (slot, dest); + { + /* + * If we are not able to send the tuple, we assume the destination + * has closed and no more tuples can be sent. If that's the case, + * end the loop. + */ + if (!((*dest->receiveSlot) (slot, dest))) + break; + } /* * Count tuples processed, if this is a SELECT. (For other operation diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index 2b81f60a519..533050dc859 100644 --- a/src/backend/executor/execTuples.c +++ b/src/backend/executor/execTuples.c @@ -1266,7 +1266,7 @@ do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull) ExecStoreVirtualTuple(slot); /* send the tuple to the receiver */ - (*tstate->dest->receiveSlot) (slot, tstate->dest); + (void) (*tstate->dest->receiveSlot) (slot, tstate->dest); /* clean up */ ExecClearTuple(slot); diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c index 6e14c9d2967..cd93c045dcb 100644 --- a/src/backend/executor/functions.c +++ b/src/backend/executor/functions.c @@ -167,7 +167,7 @@ static Datum postquel_get_single_result(TupleTableSlot *slot, static void sql_exec_error_callback(void *arg); static void ShutdownSQLFunction(Datum arg); static void sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo); -static void sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self); +static bool sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self); static void sqlfunction_shutdown(DestReceiver *self); static void sqlfunction_destroy(DestReceiver *self); @@ -1904,7 +1904,7 @@ sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo) /* * sqlfunction_receive --- receive one tuple */ -static void +static bool sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self) { DR_sqlfunction *myState = (DR_sqlfunction *) self; @@ -1914,6 +1914,8 @@ sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self) /* Store the filtered tuple into the tuplestore */ tuplestore_puttupleslot(myState->tstore, slot); + + return true; } /* diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 23cb6f407dd..7ccabdb44b2 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -1774,7 +1774,7 @@ spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo) * store tuple retrieved by Executor into SPITupleTable * of current SPI procedure */ -void +bool spi_printtup(TupleTableSlot *slot, DestReceiver *self) { SPITupleTable *tuptable; @@ -1809,6 +1809,8 @@ spi_printtup(TupleTableSlot *slot, DestReceiver *self) (tuptable->free)--; MemoryContextSwitchTo(oldcxt); + + return true; } /* diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index 383b5352cba..8abb1f16e45 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -115,12 +115,13 @@ static RemapInfo *BuildRemapInfo(TupleDesc tupledesc); * type over a range type over a range type over an array type over a record, * or something like that. */ -static void +static bool tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) { TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; TupleDesc tupledesc = slot->tts_tupleDescriptor; HeapTuple tuple; + shm_mq_result result; /* * Test to see whether the tupledesc has changed; if so, set up for the @@ -195,7 +196,16 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) } /* Send the tuple itself. */ - shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false); + result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false); + + if (result == SHM_MQ_DETACHED) + return false; + else if (result != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to send tuples"))); + + return true; } /* diff --git a/src/backend/executor/tstoreReceiver.c b/src/backend/executor/tstoreReceiver.c index 516440ad32e..8f1e1b3f50c 100644 --- a/src/backend/executor/tstoreReceiver.c +++ b/src/backend/executor/tstoreReceiver.c @@ -37,8 +37,8 @@ typedef struct } TStoreState; -static void tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self); -static void tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self); +static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self); +static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self); /* @@ -90,19 +90,21 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) * Receive a tuple from the executor and store it in the tuplestore. * This is for the easy case where we don't have to detoast. */ -static void +static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self) { TStoreState *myState = (TStoreState *) self; tuplestore_puttupleslot(myState->tstore, slot); + + return true; } /* * Receive a tuple from the executor and store it in the tuplestore. * This is for the case where we have to detoast any toasted values. */ -static void +static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self) { TStoreState *myState = (TStoreState *) self; @@ -152,6 +154,8 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self) /* And release any temporary detoasted values */ for (i = 0; i < nfree; i++) pfree(DatumGetPointer(myState->tofree[i])); + + return true; } /* |