aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/common/printtup.c22
-rw-r--r--src/backend/commands/copy.c4
-rw-r--r--src/backend/commands/createas.c6
-rw-r--r--src/backend/commands/matview.c6
-rw-r--r--src/backend/executor/execMain.c10
-rw-r--r--src/backend/executor/execTuples.c2
-rw-r--r--src/backend/executor/functions.c6
-rw-r--r--src/backend/executor/spi.c4
-rw-r--r--src/backend/executor/tqueue.c14
-rw-r--r--src/backend/executor/tstoreReceiver.c12
-rw-r--r--src/backend/tcop/dest.c3
-rw-r--r--src/backend/tcop/pquery.c8
-rw-r--r--src/include/access/printtup.h4
-rw-r--r--src/include/tcop/dest.h6
14 files changed, 78 insertions, 29 deletions
diff --git a/src/backend/access/common/printtup.c b/src/backend/access/common/printtup.c
index 1939ff5155b..d9664aa6c6b 100644
--- a/src/backend/access/common/printtup.c
+++ b/src/backend/access/common/printtup.c
@@ -26,9 +26,9 @@
static void printtup_startup(DestReceiver *self, int operation,
TupleDesc typeinfo);
-static void printtup(TupleTableSlot *slot, DestReceiver *self);
-static void printtup_20(TupleTableSlot *slot, DestReceiver *self);
-static void printtup_internal_20(TupleTableSlot *slot, DestReceiver *self);
+static bool printtup(TupleTableSlot *slot, DestReceiver *self);
+static bool printtup_20(TupleTableSlot *slot, DestReceiver *self);
+static bool printtup_internal_20(TupleTableSlot *slot, DestReceiver *self);
static void printtup_shutdown(DestReceiver *self);
static void printtup_destroy(DestReceiver *self);
@@ -299,7 +299,7 @@ printtup_prepare_info(DR_printtup *myState, TupleDesc typeinfo, int numAttrs)
* printtup --- print a tuple in protocol 3.0
* ----------------
*/
-static void
+static bool
printtup(TupleTableSlot *slot, DestReceiver *self)
{
TupleDesc typeinfo = slot->tts_tupleDescriptor;
@@ -376,13 +376,15 @@ printtup(TupleTableSlot *slot, DestReceiver *self)
/* Return to caller's context, and flush row's temporary memory */
MemoryContextSwitchTo(oldcontext);
MemoryContextReset(myState->tmpcontext);
+
+ return true;
}
/* ----------------
* printtup_20 --- print a tuple in protocol 2.0
* ----------------
*/
-static void
+static bool
printtup_20(TupleTableSlot *slot, DestReceiver *self)
{
TupleDesc typeinfo = slot->tts_tupleDescriptor;
@@ -452,6 +454,8 @@ printtup_20(TupleTableSlot *slot, DestReceiver *self)
/* Return to caller's context, and flush row's temporary memory */
MemoryContextSwitchTo(oldcontext);
MemoryContextReset(myState->tmpcontext);
+
+ return true;
}
/* ----------------
@@ -528,7 +532,7 @@ debugStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
* debugtup - print one tuple for an interactive backend
* ----------------
*/
-void
+bool
debugtup(TupleTableSlot *slot, DestReceiver *self)
{
TupleDesc typeinfo = slot->tts_tupleDescriptor;
@@ -553,6 +557,8 @@ debugtup(TupleTableSlot *slot, DestReceiver *self)
printatt((unsigned) i + 1, typeinfo->attrs[i], value);
}
printf("\t----\n");
+
+ return true;
}
/* ----------------
@@ -564,7 +570,7 @@ debugtup(TupleTableSlot *slot, DestReceiver *self)
* This is largely same as printtup_20, except we use binary formatting.
* ----------------
*/
-static void
+static bool
printtup_internal_20(TupleTableSlot *slot, DestReceiver *self)
{
TupleDesc typeinfo = slot->tts_tupleDescriptor;
@@ -636,4 +642,6 @@ printtup_internal_20(TupleTableSlot *slot, DestReceiver *self)
/* Return to caller's context, and flush row's temporary memory */
MemoryContextSwitchTo(oldcontext);
MemoryContextReset(myState->tmpcontext);
+
+ return true;
}
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3201476c9e8..28dcd340017 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -4454,7 +4454,7 @@ copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
/*
* copy_dest_receive --- receive one tuple
*/
-static void
+static bool
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_copy *myState = (DR_copy *) self;
@@ -4466,6 +4466,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
/* And send the data */
CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
myState->processed++;
+
+ return true;
}
/*
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index cb7a145ee5d..5a853c48a8b 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -62,7 +62,7 @@ typedef struct
static ObjectAddress CreateAsReladdr = {InvalidOid, InvalidOid, 0};
static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void intorel_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self);
static void intorel_shutdown(DestReceiver *self);
static void intorel_destroy(DestReceiver *self);
@@ -482,7 +482,7 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
/*
* intorel_receive --- receive one tuple
*/
-static void
+static bool
intorel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_intorel *myState = (DR_intorel *) self;
@@ -507,6 +507,8 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
myState->bistate);
/* We know this is a newly created relation, so there are no indexes */
+
+ return true;
}
/*
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index f00aab39e7b..62e61a26749 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -56,7 +56,7 @@ typedef struct
static int matview_maintenance_depth = 0;
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self);
static void refresh_matview_datafill(DestReceiver *dest, Query *query,
@@ -467,7 +467,7 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
/*
* transientrel_receive --- receive one tuple
*/
-static void
+static bool
transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
@@ -486,6 +486,8 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
myState->bistate);
/* We know this is a newly created relation, so there are no indexes */
+
+ return true;
}
/*
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index ac0230411c3..b5ced388d20 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1593,7 +1593,15 @@ ExecutePlan(EState *estate,
* practice, this is probably always the case at this point.)
*/
if (sendTuples)
- (*dest->receiveSlot) (slot, dest);
+ {
+ /*
+ * If we are not able to send the tuple, we assume the destination
+ * has closed and no more tuples can be sent. If that's the case,
+ * end the loop.
+ */
+ if (!((*dest->receiveSlot) (slot, dest)))
+ break;
+ }
/*
* Count tuples processed, if this is a SELECT. (For other operation
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 2b81f60a519..533050dc859 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -1266,7 +1266,7 @@ do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
ExecStoreVirtualTuple(slot);
/* send the tuple to the receiver */
- (*tstate->dest->receiveSlot) (slot, tstate->dest);
+ (void) (*tstate->dest->receiveSlot) (slot, tstate->dest);
/* clean up */
ExecClearTuple(slot);
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 6e14c9d2967..cd93c045dcb 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -167,7 +167,7 @@ static Datum postquel_get_single_result(TupleTableSlot *slot,
static void sql_exec_error_callback(void *arg);
static void ShutdownSQLFunction(Datum arg);
static void sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self);
static void sqlfunction_shutdown(DestReceiver *self);
static void sqlfunction_destroy(DestReceiver *self);
@@ -1904,7 +1904,7 @@ sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
/*
* sqlfunction_receive --- receive one tuple
*/
-static void
+static bool
sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_sqlfunction *myState = (DR_sqlfunction *) self;
@@ -1914,6 +1914,8 @@ sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
/* Store the filtered tuple into the tuplestore */
tuplestore_puttupleslot(myState->tstore, slot);
+
+ return true;
}
/*
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 23cb6f407dd..7ccabdb44b2 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -1774,7 +1774,7 @@ spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
* store tuple retrieved by Executor into SPITupleTable
* of current SPI procedure
*/
-void
+bool
spi_printtup(TupleTableSlot *slot, DestReceiver *self)
{
SPITupleTable *tuptable;
@@ -1809,6 +1809,8 @@ spi_printtup(TupleTableSlot *slot, DestReceiver *self)
(tuptable->free)--;
MemoryContextSwitchTo(oldcxt);
+
+ return true;
}
/*
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 383b5352cba..8abb1f16e45 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -115,12 +115,13 @@ static RemapInfo *BuildRemapInfo(TupleDesc tupledesc);
* type over a range type over a range type over an array type over a record,
* or something like that.
*/
-static void
+static bool
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
TupleDesc tupledesc = slot->tts_tupleDescriptor;
HeapTuple tuple;
+ shm_mq_result result;
/*
* Test to see whether the tupledesc has changed; if so, set up for the
@@ -195,7 +196,16 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
}
/* Send the tuple itself. */
- shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+ result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+
+ if (result == SHM_MQ_DETACHED)
+ return false;
+ else if (result != SHM_MQ_SUCCESS)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("unable to send tuples")));
+
+ return true;
}
/*
diff --git a/src/backend/executor/tstoreReceiver.c b/src/backend/executor/tstoreReceiver.c
index 516440ad32e..8f1e1b3f50c 100644
--- a/src/backend/executor/tstoreReceiver.c
+++ b/src/backend/executor/tstoreReceiver.c
@@ -37,8 +37,8 @@ typedef struct
} TStoreState;
-static void tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
-static void tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
/*
@@ -90,19 +90,21 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
* Receive a tuple from the executor and store it in the tuplestore.
* This is for the easy case where we don't have to detoast.
*/
-static void
+static bool
tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
{
TStoreState *myState = (TStoreState *) self;
tuplestore_puttupleslot(myState->tstore, slot);
+
+ return true;
}
/*
* Receive a tuple from the executor and store it in the tuplestore.
* This is for the case where we have to detoast any toasted values.
*/
-static void
+static bool
tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
{
TStoreState *myState = (TStoreState *) self;
@@ -152,6 +154,8 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
/* And release any temporary detoasted values */
for (i = 0; i < nfree; i++)
pfree(DatumGetPointer(myState->tofree[i]));
+
+ return true;
}
/*
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 2c7dc6e5267..de45cbc4fb8 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -45,9 +45,10 @@
* dummy DestReceiver functions
* ----------------
*/
-static void
+static bool
donothingReceive(TupleTableSlot *slot, DestReceiver *self)
{
+ return true;
}
static void
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index fcdc4c347c7..3f6cb12b4e5 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -1109,7 +1109,13 @@ RunFromStore(Portal portal, ScanDirection direction, uint64 count,
if (!ok)
break;
- (*dest->receiveSlot) (slot, dest);
+ /*
+ * If we are not able to send the tuple, we assume the destination
+ * has closed and no more tuples can be sent. If that's the case,
+ * end the loop.
+ */
+ if (!((*dest->receiveSlot) (slot, dest)))
+ break;
ExecClearTuple(slot);
diff --git a/src/include/access/printtup.h b/src/include/access/printtup.h
index 64dde01cd14..608c5642872 100644
--- a/src/include/access/printtup.h
+++ b/src/include/access/printtup.h
@@ -25,11 +25,11 @@ extern void SendRowDescriptionMessage(TupleDesc typeinfo, List *targetlist,
extern void debugStartup(DestReceiver *self, int operation,
TupleDesc typeinfo);
-extern void debugtup(TupleTableSlot *slot, DestReceiver *self);
+extern bool debugtup(TupleTableSlot *slot, DestReceiver *self);
/* XXX these are really in executor/spi.c */
extern void spi_dest_startup(DestReceiver *self, int operation,
TupleDesc typeinfo);
-extern void spi_printtup(TupleTableSlot *slot, DestReceiver *self);
+extern bool spi_printtup(TupleTableSlot *slot, DestReceiver *self);
#endif /* PRINTTUP_H */
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index 4e42d61c37a..dd80433f74f 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -104,7 +104,9 @@ typedef enum
* pointers that the executor must call.
*
* Note: the receiveSlot routine must be passed a slot containing a TupleDesc
- * identical to the one given to the rStartup routine.
+ * identical to the one given to the rStartup routine. It returns bool where
+ * a "true" value means "continue processing" and a "false" value means
+ * "stop early, just as if we'd reached the end of the scan".
* ----------------
*/
typedef struct _DestReceiver DestReceiver;
@@ -112,7 +114,7 @@ typedef struct _DestReceiver DestReceiver;
struct _DestReceiver
{
/* Called for each tuple to be output: */
- void (*receiveSlot) (TupleTableSlot *slot,
+ bool (*receiveSlot) (TupleTableSlot *slot,
DestReceiver *self);
/* Per-executor-run initialization and shutdown: */
void (*rStartup) (DestReceiver *self,