diff options
Diffstat (limited to 'src/interfaces/libpq/fe-exec.c')
-rw-r--r-- | src/interfaces/libpq/fe-exec.c | 186 |
1 files changed, 175 insertions, 11 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index e6fb7e9ee7f..9840cc3b9c6 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -7,12 +7,13 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v 1.86 1999/11/11 00:10:14 momjian Exp $ + * $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v 1.87 2000/01/18 06:09:24 momjian Exp $ * *------------------------------------------------------------------------- */ #include <errno.h> #include <ctype.h> +#include <fcntl.h> #include "postgres.h" #include "libpq-fe.h" @@ -24,7 +25,6 @@ #include <unistd.h> #endif - /* keep this in same order as ExecStatusType in libpq-fe.h */ const char *const pgresStatus[] = { "PGRES_EMPTY_QUERY", @@ -514,13 +514,53 @@ PQsendQuery(PGconn *conn, const char *query) conn->curTuple = NULL; /* send the query to the backend; */ - /* the frontend-backend protocol uses 'Q' to designate queries */ - if (pqPutnchar("Q", 1, conn) || - pqPuts(query, conn) || - pqFlush(conn)) + + /* + * in order to guarantee that we don't send a partial query + * where we would become out of sync with the backend and/or + * block during a non-blocking connection we must first flush + * the send buffer before sending more data + * + * an alternative is to implement 'queue reservations' where + * we are able to roll up a transaction + * (the 'Q' along with our query) and make sure we have + * enough space for it all in the send buffer. + */ + if (pqIsnonblocking(conn)) { - handleSendFailure(conn); - return 0; + /* + * the buffer must have emptied completely before we allow + * a new query to be buffered + */ + if (pqFlush(conn)) + return 0; + /* 'Q' == queries */ + /* XXX: if we fail here we really ought to not block */ + if (pqPutnchar("Q", 1, conn) || + pqPuts(query, conn)) + { + handleSendFailure(conn); + return 0; + } + /* + * give the data a push, ignore the return value as + * ConsumeInput() will do any aditional flushing if needed + */ + (void) pqFlush(conn); + } + else + { + /* + * the frontend-backend protocol uses 'Q' to + * designate queries + */ + if (pqPutnchar("Q", 1, conn) || + pqPuts(query, conn) || + pqFlush(conn)) + { + handleSendFailure(conn); + return 0; + } } /* OK, it's launched! */ @@ -574,7 +614,17 @@ PQconsumeInput(PGconn *conn) * we will NOT block waiting for more input. */ if (pqReadData(conn) < 0) + { + /* + * for non-blocking connections + * try to flush the send-queue otherwise we may never get a + * responce for something that may not have already been sent + * because it's in our write buffer! + */ + if (pqIsnonblocking(conn)) + (void) pqFlush(conn); return 0; + } /* Parsing of the data waits till later. */ return 1; } @@ -1088,6 +1138,16 @@ PQexec(PGconn *conn, const char *query) { PGresult *result; PGresult *lastResult; + bool savedblocking; + + /* + * we assume anyone calling PQexec wants blocking behaviour, + * we force the blocking status of the connection to blocking + * for the duration of this function and restore it on return + */ + savedblocking = pqIsnonblocking(conn); + if (PQsetnonblocking(conn, FALSE) == -1) + return NULL; /* * Silently discard any prior query result that application didn't @@ -1102,14 +1162,15 @@ PQexec(PGconn *conn, const char *query) PQclear(result); printfPQExpBuffer(&conn->errorMessage, "PQexec: you gotta get out of a COPY state yourself.\n"); - return NULL; + /* restore blocking status */ + goto errout; } PQclear(result); } /* OK to send the message */ if (!PQsendQuery(conn, query)) - return NULL; + goto errout; /* restore blocking status */ /* * For backwards compatibility, return the last result if there are @@ -1142,7 +1203,15 @@ PQexec(PGconn *conn, const char *query) result->resultStatus == PGRES_COPY_OUT) break; } + + if (PQsetnonblocking(conn, savedblocking) == -1) + return NULL; return lastResult; + +errout: + if (PQsetnonblocking(conn, savedblocking) == -1) + return NULL; + return NULL; } @@ -1432,7 +1501,16 @@ PQendcopy(PGconn *conn) return 1; } - (void) pqFlush(conn); /* make sure no data is waiting to be sent */ + /* + * make sure no data is waiting to be sent, + * abort if we are non-blocking and the flush fails + */ + if (pqFlush(conn) && pqIsnonblocking(conn)) + return (1); + + /* non blocking connections may have to abort at this point. */ + if (pqIsnonblocking(conn) && PQisBusy(conn)) + return (1); /* Return to active duty */ conn->asyncStatus = PGASYNC_BUSY; @@ -2026,3 +2104,89 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num) else return 0; } + +/* PQsetnonblocking: + sets the PGconn's database connection non-blocking if the arg is TRUE + or makes it non-blocking if the arg is FALSE, this will not protect + you from PQexec(), you'll only be safe when using the non-blocking + API + Needs to be called only on a connected database connection. +*/ + +int +PQsetnonblocking(PGconn *conn, int arg) +{ + int fcntlarg; + + arg = (arg == TRUE) ? 1 : 0; + /* early out if the socket is already in the state requested */ + if (arg == conn->nonblocking) + return (0); + + /* + * to guarantee constancy for flushing/query/result-polling behavior + * we need to flush the send queue at this point in order to guarantee + * proper behavior. + * this is ok because either they are making a transition + * _from_ or _to_ blocking mode, either way we can block them. + */ + /* if we are going from blocking to non-blocking flush here */ + if (!pqIsnonblocking(conn) && pqFlush(conn)) + return (-1); + + +#ifdef USE_SSL + if (conn->ssl) + { + printfPQExpBuffer(&conn->errorMessage, + "PQsetnonblocking() -- not supported when using SSL\n"); + return (-1); + } +#endif /* USE_SSL */ + +#ifndef WIN32 + fcntlarg = fcntl(conn->sock, F_GETFL, 0); + if (fcntlarg == -1) + return (-1); + + if ((arg == TRUE && + fcntl(conn->sock, F_SETFL, fcntlarg | O_NONBLOCK) == -1) || + (arg == FALSE && + fcntl(conn->sock, F_SETFL, fcntlarg & ~O_NONBLOCK) == -1)) +#else + fcntlarg = arg; + if (ioctlsocket(conn->sock, FIONBIO, &fcntlarg) != 0) +#endif + { + printfPQExpBuffer(&conn->errorMessage, + "PQsetblocking() -- unable to set nonblocking status to %s\n", + arg == TRUE ? "TRUE" : "FALSE"); + return (-1); + } + + conn->nonblocking = arg; + + /* if we are going from non-blocking to blocking flush here */ + if (pqIsnonblocking(conn) && pqFlush(conn)) + return (-1); + + return (0); +} + +/* return the blocking status of the database connection, TRUE == nonblocking, + FALSE == blocking +*/ +int +PQisnonblocking(const PGconn *conn) +{ + + return (pqIsnonblocking(conn)); +} + +/* try to force data out, really only useful for non-blocking users */ +int +PQflush(PGconn *conn) +{ + + return (pqFlush(conn)); +} |