diff options
author | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2021-03-15 18:13:42 -0300 |
---|---|---|
committer | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2021-03-15 18:13:42 -0300 |
commit | acb7e4eb6b1c614c68a62fb3a6a5bba1af0a2659 (patch) | |
tree | ff5dccb6a8372d0373a442841d8df4333a234eaa /src/interfaces/libpq/fe-protocol3.c | |
parent | 146cb3889c3ccb3fce198fe7464a1296a9e107c3 (diff) | |
download | postgresql-acb7e4eb6b1c614c68a62fb3a6a5bba1af0a2659.tar.gz postgresql-acb7e4eb6b1c614c68a62fb3a6a5bba1af0a2659.zip |
Implement pipeline mode in libpq
Pipeline mode in libpq lets an application avoid the Sync messages in
the FE/BE protocol that are implicit in the old libpq API after each
query. The application can then insert Sync at its leisure with a new
libpq function PQpipelineSync. This can lead to substantial reductions
in query latency.
Co-authored-by: Craig Ringer <craig.ringer@enterprisedb.com>
Co-authored-by: Matthieu Garrigues <matthieu.garrigues@gmail.com>
Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Aya Iwata <iwata.aya@jp.fujitsu.com>
Reviewed-by: Daniel Vérité <daniel@manitou-mail.org>
Reviewed-by: David G. Johnston <david.g.johnston@gmail.com>
Reviewed-by: Justin Pryzby <pryzby@telsasoft.com>
Reviewed-by: Kirk Jamison <k.jamison@fujitsu.com>
Reviewed-by: Michael Paquier <michael.paquier@gmail.com>
Reviewed-by: Nikhil Sontakke <nikhils@2ndquadrant.com>
Reviewed-by: Vaishnavi Prabakaran <VaishnaviP@fast.au.fujitsu.com>
Reviewed-by: Zhihong Yu <zyu@yugabyte.com>
Discussion: https://postgr.es/m/CAMsr+YFUjJytRyV4J-16bEoiZyH=4nj+sQ7JP9ajwz=B4dMMZw@mail.gmail.com
Discussion: https://postgr.es/m/CAJkzx4T5E-2cQe3dtv2R78dYFvz+in8PY7A8MArvLhs_pg75gg@mail.gmail.com
Diffstat (limited to 'src/interfaces/libpq/fe-protocol3.c')
-rw-r--r-- | src/interfaces/libpq/fe-protocol3.c | 77 |
1 files changed, 66 insertions, 11 deletions
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index eb55d528fb1..306e89acfd2 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -159,6 +159,18 @@ pqParseInput3(PGconn *conn) return; /* + * We're also notionally not-IDLE when in pipeline mode the state + * says "idle" (so we have completed receiving the results of one + * query from the server and dispatched them to the application) + * but another query is queued; yield back control to caller so + * that they can initiate processing of the next query in the + * queue. + */ + if (conn->pipelineStatus != PQ_PIPELINE_OFF && + conn->cmd_queue_head != NULL) + return; + + /* * Unexpected message in IDLE state; need to recover somehow. * ERROR messages are handled using the notice processor; * ParameterStatus is handled normally; anything else is just @@ -179,6 +191,7 @@ pqParseInput3(PGconn *conn) } else { + /* Any other case is unexpected and we summarily skip it */ pqInternalNotice(&conn->noticeHooks, "message type 0x%02x arrived from server while idle", id); @@ -217,10 +230,37 @@ pqParseInput3(PGconn *conn) return; conn->asyncStatus = PGASYNC_READY; break; - case 'Z': /* backend is ready for new query */ + case 'Z': /* sync response, backend is ready for new + * query */ if (getReadyForQuery(conn)) return; - conn->asyncStatus = PGASYNC_IDLE; + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + conn->result = PQmakeEmptyPGresult(conn, + PGRES_PIPELINE_SYNC); + if (!conn->result) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("out of memory")); + pqSaveErrorResult(conn); + } + else + { + conn->pipelineStatus = PQ_PIPELINE_ON; + conn->asyncStatus = PGASYNC_READY; + } + } + else + { + /* + * In simple query protocol, advance the command queue + * (see PQgetResult). + */ + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE) + pqCommandQueueAdvance(conn); + conn->asyncStatus = PGASYNC_IDLE; + } break; case 'I': /* empty query */ if (conn->result == NULL) @@ -238,7 +278,8 @@ pqParseInput3(PGconn *conn) break; case '1': /* Parse Complete */ /* If we're doing PQprepare, we're done; else ignore */ - if (conn->queryclass == PGQUERY_PREPARE) + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_PREPARE) { if (conn->result == NULL) { @@ -285,7 +326,8 @@ pqParseInput3(PGconn *conn) conn->inCursor += msgLength; } else if (conn->result == NULL || - conn->queryclass == PGQUERY_DESCRIBE) + (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)) { /* First 'T' in a query sequence */ if (getRowDescriptions(conn, msgLength)) @@ -316,7 +358,8 @@ pqParseInput3(PGconn *conn) * instead of PGRES_TUPLES_OK. Otherwise we can just * ignore this message. */ - if (conn->queryclass == PGQUERY_DESCRIBE) + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE) { if (conn->result == NULL) { @@ -445,7 +488,7 @@ handleSyncLoss(PGconn *conn, char id, int msgLength) id, msgLength); /* build an error result holding the error message */ pqSaveErrorResult(conn); - conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */ + conn->asyncStatus = PGASYNC_READY; /* drop out of PQgetResult wait loop */ /* flush input data since we're giving up on processing it */ pqDropConnection(conn, true); conn->status = CONNECTION_BAD; /* No more connection to backend */ @@ -471,7 +514,9 @@ getRowDescriptions(PGconn *conn, int msgLength) * PGresult created by getParamDescriptions, and we should fill data into * that. Otherwise, create a new, empty PGresult. */ - if (conn->queryclass == PGQUERY_DESCRIBE) + if (!conn->cmd_queue_head || + (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)) { if (conn->result) result = conn->result; @@ -568,7 +613,9 @@ getRowDescriptions(PGconn *conn, int msgLength) * If we're doing a Describe, we're done, and ready to pass the result * back to the client. */ - if (conn->queryclass == PGQUERY_DESCRIBE) + if ((!conn->cmd_queue_head) || + (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)) { conn->asyncStatus = PGASYNC_READY; return 0; @@ -841,6 +888,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError) PQExpBufferData workBuf; char id; + /* If in pipeline mode, set error indicator for it */ + if (isError && conn->pipelineStatus != PQ_PIPELINE_OFF) + conn->pipelineStatus = PQ_PIPELINE_ABORTED; + /* * If this is an error message, pre-emptively clear any incomplete query * result we may have. We'd just throw it away below anyway, and @@ -897,8 +948,8 @@ pqGetErrorNotice3(PGconn *conn, bool isError) * might need it for an error cursor display, which is only true if there * is a PG_DIAG_STATEMENT_POSITION field. */ - if (have_position && conn->last_query && res) - res->errQuery = pqResultStrdup(res, conn->last_query); + if (have_position && res && conn->cmd_queue_head && conn->cmd_queue_head->query) + res->errQuery = pqResultStrdup(res, conn->cmd_queue_head->query); /* * Now build the "overall" error message for PQresultErrorMessage. @@ -1817,7 +1868,8 @@ pqEndcopy3(PGconn *conn) * If we sent the COPY command in extended-query mode, we must issue a * Sync as well. */ - if (conn->queryclass != PGQUERY_SIMPLE) + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE) { if (pqPutMsgStart('S', conn) < 0 || pqPutMsgEnd(conn) < 0) @@ -1897,6 +1949,9 @@ pqFunctionCall3(PGconn *conn, Oid fnid, int avail; int i; + /* already validated by PQfn */ + Assert(conn->pipelineStatus == PQ_PIPELINE_OFF); + /* PQfn already validated connection state */ if (pqPutMsgStart('F', conn) < 0 || /* function call msg */ |