aboutsummaryrefslogtreecommitdiff
path: root/src/interfaces/libpq/fe-exec.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/interfaces/libpq/fe-exec.c')
-rw-r--r--src/interfaces/libpq/fe-exec.c186
1 files changed, 175 insertions, 11 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index e6fb7e9ee7f..9840cc3b9c6 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -7,12 +7,13 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v 1.86 1999/11/11 00:10:14 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v 1.87 2000/01/18 06:09:24 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#include <errno.h>
#include <ctype.h>
+#include <fcntl.h>
#include "postgres.h"
#include "libpq-fe.h"
@@ -24,7 +25,6 @@
#include <unistd.h>
#endif
-
/* keep this in same order as ExecStatusType in libpq-fe.h */
const char *const pgresStatus[] = {
"PGRES_EMPTY_QUERY",
@@ -514,13 +514,53 @@ PQsendQuery(PGconn *conn, const char *query)
conn->curTuple = NULL;
/* send the query to the backend; */
- /* the frontend-backend protocol uses 'Q' to designate queries */
- if (pqPutnchar("Q", 1, conn) ||
- pqPuts(query, conn) ||
- pqFlush(conn))
+
+ /*
+ * in order to guarantee that we don't send a partial query
+ * where we would become out of sync with the backend and/or
+ * block during a non-blocking connection we must first flush
+ * the send buffer before sending more data
+ *
+ * an alternative is to implement 'queue reservations' where
+ * we are able to roll up a transaction
+ * (the 'Q' along with our query) and make sure we have
+ * enough space for it all in the send buffer.
+ */
+ if (pqIsnonblocking(conn))
{
- handleSendFailure(conn);
- return 0;
+ /*
+ * the buffer must have emptied completely before we allow
+ * a new query to be buffered
+ */
+ if (pqFlush(conn))
+ return 0;
+ /* 'Q' == queries */
+ /* XXX: if we fail here we really ought to not block */
+ if (pqPutnchar("Q", 1, conn) ||
+ pqPuts(query, conn))
+ {
+ handleSendFailure(conn);
+ return 0;
+ }
+ /*
+ * give the data a push, ignore the return value as
+ * ConsumeInput() will do any aditional flushing if needed
+ */
+ (void) pqFlush(conn);
+ }
+ else
+ {
+ /*
+ * the frontend-backend protocol uses 'Q' to
+ * designate queries
+ */
+ if (pqPutnchar("Q", 1, conn) ||
+ pqPuts(query, conn) ||
+ pqFlush(conn))
+ {
+ handleSendFailure(conn);
+ return 0;
+ }
}
/* OK, it's launched! */
@@ -574,7 +614,17 @@ PQconsumeInput(PGconn *conn)
* we will NOT block waiting for more input.
*/
if (pqReadData(conn) < 0)
+ {
+ /*
+ * for non-blocking connections
+ * try to flush the send-queue otherwise we may never get a
+ * responce for something that may not have already been sent
+ * because it's in our write buffer!
+ */
+ if (pqIsnonblocking(conn))
+ (void) pqFlush(conn);
return 0;
+ }
/* Parsing of the data waits till later. */
return 1;
}
@@ -1088,6 +1138,16 @@ PQexec(PGconn *conn, const char *query)
{
PGresult *result;
PGresult *lastResult;
+ bool savedblocking;
+
+ /*
+ * we assume anyone calling PQexec wants blocking behaviour,
+ * we force the blocking status of the connection to blocking
+ * for the duration of this function and restore it on return
+ */
+ savedblocking = pqIsnonblocking(conn);
+ if (PQsetnonblocking(conn, FALSE) == -1)
+ return NULL;
/*
* Silently discard any prior query result that application didn't
@@ -1102,14 +1162,15 @@ PQexec(PGconn *conn, const char *query)
PQclear(result);
printfPQExpBuffer(&conn->errorMessage,
"PQexec: you gotta get out of a COPY state yourself.\n");
- return NULL;
+ /* restore blocking status */
+ goto errout;
}
PQclear(result);
}
/* OK to send the message */
if (!PQsendQuery(conn, query))
- return NULL;
+ goto errout; /* restore blocking status */
/*
* For backwards compatibility, return the last result if there are
@@ -1142,7 +1203,15 @@ PQexec(PGconn *conn, const char *query)
result->resultStatus == PGRES_COPY_OUT)
break;
}
+
+ if (PQsetnonblocking(conn, savedblocking) == -1)
+ return NULL;
return lastResult;
+
+errout:
+ if (PQsetnonblocking(conn, savedblocking) == -1)
+ return NULL;
+ return NULL;
}
@@ -1432,7 +1501,16 @@ PQendcopy(PGconn *conn)
return 1;
}
- (void) pqFlush(conn); /* make sure no data is waiting to be sent */
+ /*
+ * make sure no data is waiting to be sent,
+ * abort if we are non-blocking and the flush fails
+ */
+ if (pqFlush(conn) && pqIsnonblocking(conn))
+ return (1);
+
+ /* non blocking connections may have to abort at this point. */
+ if (pqIsnonblocking(conn) && PQisBusy(conn))
+ return (1);
/* Return to active duty */
conn->asyncStatus = PGASYNC_BUSY;
@@ -2026,3 +2104,89 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
else
return 0;
}
+
+/* PQsetnonblocking:
+ sets the PGconn's database connection non-blocking if the arg is TRUE
+ or makes it non-blocking if the arg is FALSE, this will not protect
+ you from PQexec(), you'll only be safe when using the non-blocking
+ API
+ Needs to be called only on a connected database connection.
+*/
+
+int
+PQsetnonblocking(PGconn *conn, int arg)
+{
+ int fcntlarg;
+
+ arg = (arg == TRUE) ? 1 : 0;
+ /* early out if the socket is already in the state requested */
+ if (arg == conn->nonblocking)
+ return (0);
+
+ /*
+ * to guarantee constancy for flushing/query/result-polling behavior
+ * we need to flush the send queue at this point in order to guarantee
+ * proper behavior.
+ * this is ok because either they are making a transition
+ * _from_ or _to_ blocking mode, either way we can block them.
+ */
+ /* if we are going from blocking to non-blocking flush here */
+ if (!pqIsnonblocking(conn) && pqFlush(conn))
+ return (-1);
+
+
+#ifdef USE_SSL
+ if (conn->ssl)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ "PQsetnonblocking() -- not supported when using SSL\n");
+ return (-1);
+ }
+#endif /* USE_SSL */
+
+#ifndef WIN32
+ fcntlarg = fcntl(conn->sock, F_GETFL, 0);
+ if (fcntlarg == -1)
+ return (-1);
+
+ if ((arg == TRUE &&
+ fcntl(conn->sock, F_SETFL, fcntlarg | O_NONBLOCK) == -1) ||
+ (arg == FALSE &&
+ fcntl(conn->sock, F_SETFL, fcntlarg & ~O_NONBLOCK) == -1))
+#else
+ fcntlarg = arg;
+ if (ioctlsocket(conn->sock, FIONBIO, &fcntlarg) != 0)
+#endif
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ "PQsetblocking() -- unable to set nonblocking status to %s\n",
+ arg == TRUE ? "TRUE" : "FALSE");
+ return (-1);
+ }
+
+ conn->nonblocking = arg;
+
+ /* if we are going from non-blocking to blocking flush here */
+ if (pqIsnonblocking(conn) && pqFlush(conn))
+ return (-1);
+
+ return (0);
+}
+
+/* return the blocking status of the database connection, TRUE == nonblocking,
+ FALSE == blocking
+*/
+int
+PQisnonblocking(const PGconn *conn)
+{
+
+ return (pqIsnonblocking(conn));
+}
+
+/* try to force data out, really only useful for non-blocking users */
+int
+PQflush(PGconn *conn)
+{
+
+ return (pqFlush(conn));
+}