diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 3 | ||||
-rw-r--r-- | src/bin/pg_amcheck/pg_amcheck.c | 1 | ||||
-rw-r--r-- | src/interfaces/libpq/exports.txt | 1 | ||||
-rw-r--r-- | src/interfaces/libpq/fe-exec.c | 146 | ||||
-rw-r--r-- | src/interfaces/libpq/fe-protocol3.c | 3 | ||||
-rw-r--r-- | src/interfaces/libpq/libpq-fe.h | 4 | ||||
-rw-r--r-- | src/interfaces/libpq/libpq-int.h | 10 | ||||
-rw-r--r-- | src/test/modules/libpq_pipeline/libpq_pipeline.c | 40 | ||||
-rw-r--r-- | src/test/modules/libpq_pipeline/traces/singlerow.trace | 14 |
9 files changed, 163 insertions, 59 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 761bf0f677c..3c2b1bb4966 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -1248,8 +1248,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, switch (PQresultStatus(pgres)) { - case PGRES_SINGLE_TUPLE: case PGRES_TUPLES_OK: + case PGRES_SINGLE_TUPLE: + case PGRES_TUPLES_CHUNK: walres->status = WALRCV_OK_TUPLES; libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes); break; diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c index e5f9eedc478..7e3101704d4 100644 --- a/src/bin/pg_amcheck/pg_amcheck.c +++ b/src/bin/pg_amcheck/pg_amcheck.c @@ -991,6 +991,7 @@ should_processing_continue(PGresult *res) case PGRES_SINGLE_TUPLE: case PGRES_PIPELINE_SYNC: case PGRES_PIPELINE_ABORTED: + case PGRES_TUPLES_CHUNK: return false; } return true; diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 1e48d37677d..8ee08115100 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -203,3 +203,4 @@ PQcancelErrorMessage 200 PQcancelReset 201 PQcancelFinish 202 PQsocketPoll 203 +PQsetChunkedRowsMode 204 diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index c02a9180b24..7bdfc4c21aa 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -41,7 +41,8 @@ char *const pgresStatus[] = { "PGRES_COPY_BOTH", "PGRES_SINGLE_TUPLE", "PGRES_PIPELINE_SYNC", - "PGRES_PIPELINE_ABORTED" + "PGRES_PIPELINE_ABORTED", + "PGRES_TUPLES_CHUNK" }; /* We return this if we're unable to make a PGresult at all */ @@ -200,6 +201,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) case PGRES_COPY_IN: case PGRES_COPY_BOTH: case PGRES_SINGLE_TUPLE: + case PGRES_TUPLES_CHUNK: /* non-error cases */ break; default: @@ -771,7 +773,7 @@ PQclear(PGresult *res) /* * Handy subroutine to deallocate any partially constructed async result. * - * Any "next" result gets cleared too. + * Any "saved" result gets cleared too. */ void pqClearAsyncResult(PGconn *conn) @@ -779,8 +781,8 @@ pqClearAsyncResult(PGconn *conn) PQclear(conn->result); conn->result = NULL; conn->error_result = false; - PQclear(conn->next_result); - conn->next_result = NULL; + PQclear(conn->saved_result); + conn->saved_result = NULL; } /* @@ -911,14 +913,14 @@ pqPrepareAsyncResult(PGconn *conn) } /* - * Replace conn->result with next_result, if any. In the normal case - * there isn't a next result and we're just dropping ownership of the - * current result. In single-row mode this restores the situation to what - * it was before we created the current single-row result. + * Replace conn->result with saved_result, if any. In the normal case + * there isn't a saved result and we're just dropping ownership of the + * current result. In partial-result mode this restores the situation to + * what it was before we created the current partial result. */ - conn->result = conn->next_result; - conn->error_result = false; /* next_result is never an error */ - conn->next_result = NULL; + conn->result = conn->saved_result; + conn->error_result = false; /* saved_result is never an error */ + conn->saved_result = NULL; return res; } @@ -1199,11 +1201,6 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value) * On error, *errmsgp can be set to an error string to be returned. * (Such a string should already be translated via libpq_gettext().) * If it is left NULL, the error is presumed to be "out of memory". - * - * In single-row mode, we create a new result holding just the current row, - * stashing the previous result in conn->next_result so that it becomes - * active again after pqPrepareAsyncResult(). This allows the result metadata - * (column descriptions) to be carried forward to each result row. */ int pqRowProcessor(PGconn *conn, const char **errmsgp) @@ -1215,11 +1212,14 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) int i; /* - * In single-row mode, make a new PGresult that will hold just this one - * row; the original conn->result is left unchanged so that it can be used - * again as the template for future rows. + * In partial-result mode, if we don't already have a partial PGresult + * then make one by cloning conn->result (which should hold the correct + * result metadata by now). Then the original conn->result is moved over + * to saved_result so that we can re-use it as a reference for future + * partial results. The saved result will become active again after + * pqPrepareAsyncResult() returns the partial result to the application. */ - if (conn->singleRowMode) + if (conn->partialResMode && conn->saved_result == NULL) { /* Copy everything that should be in the result at this point */ res = PQcopyResult(res, @@ -1227,6 +1227,11 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) PG_COPYRES_NOTICEHOOKS); if (!res) return 0; + /* Change result status to appropriate special value */ + res->resultStatus = (conn->singleRowMode ? PGRES_SINGLE_TUPLE : PGRES_TUPLES_CHUNK); + /* And stash it as the active result */ + conn->saved_result = conn->result; + conn->result = res; } /* @@ -1241,7 +1246,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) tup = (PGresAttValue *) pqResultAlloc(res, nfields * sizeof(PGresAttValue), true); if (tup == NULL) - goto fail; + return 0; for (i = 0; i < nfields; i++) { @@ -1260,7 +1265,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) val = (char *) pqResultAlloc(res, clen + 1, isbinary); if (val == NULL) - goto fail; + return 0; /* copy and zero-terminate the data (even if it's binary) */ memcpy(val, columns[i].value, clen); @@ -1273,30 +1278,16 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) /* And add the tuple to the PGresult's tuple array */ if (!pqAddTuple(res, tup, errmsgp)) - goto fail; + return 0; /* - * Success. In single-row mode, make the result available to the client - * immediately. + * Success. In partial-result mode, if we have enough rows then make the + * result available to the client immediately. */ - if (conn->singleRowMode) - { - /* Change result status to special single-row value */ - res->resultStatus = PGRES_SINGLE_TUPLE; - /* Stash old result for re-use later */ - conn->next_result = conn->result; - conn->result = res; - /* And mark the result ready to return */ + if (conn->partialResMode && res->ntups >= conn->maxChunkSize) conn->asyncStatus = PGASYNC_READY_MORE; - } return 1; - -fail: - /* release locally allocated PGresult, if we made one */ - if (res != conn->result) - PQclear(res); - return 0; } @@ -1745,8 +1736,10 @@ PQsendQueryStart(PGconn *conn, bool newQuery) */ pqClearAsyncResult(conn); - /* reset single-row processing mode */ + /* reset partial-result mode */ + conn->partialResMode = false; conn->singleRowMode = false; + conn->maxChunkSize = 0; } /* ready to send command message */ @@ -1926,29 +1919,60 @@ sendFailed: } /* - * Select row-by-row processing mode + * Is it OK to change partial-result mode now? */ -int -PQsetSingleRowMode(PGconn *conn) +static bool +canChangeResultMode(PGconn *conn) { /* - * Only allow setting the flag when we have launched a query and not yet + * Only allow changing the mode when we have launched a query and not yet * received any results. */ if (!conn) - return 0; + return false; if (conn->asyncStatus != PGASYNC_BUSY) - return 0; + return false; if (!conn->cmd_queue_head || (conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE && conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED)) - return 0; + return false; if (pgHavePendingResult(conn)) + return false; + return true; +} + +/* + * Select row-by-row processing mode + */ +int +PQsetSingleRowMode(PGconn *conn) +{ + if (canChangeResultMode(conn)) + { + conn->partialResMode = true; + conn->singleRowMode = true; + conn->maxChunkSize = 1; + return 1; + } + else return 0; +} - /* OK, set flag */ - conn->singleRowMode = true; - return 1; +/* + * Select chunked results processing mode + */ +int +PQsetChunkedRowsMode(PGconn *conn, int chunkSize) +{ + if (chunkSize > 0 && canChangeResultMode(conn)) + { + conn->partialResMode = true; + conn->singleRowMode = false; + conn->maxChunkSize = chunkSize; + return 1; + } + else + return 0; } /* @@ -2117,6 +2141,20 @@ PQgetResult(PGconn *conn) case PGASYNC_READY: res = pqPrepareAsyncResult(conn); + /* + * Normally pqPrepareAsyncResult will have left conn->result + * empty. Otherwise, "res" must be a not-full PGRES_TUPLES_CHUNK + * result, which we want to return to the caller while staying in + * PGASYNC_READY state. Then the next call here will return the + * empty PGRES_TUPLES_OK result that was restored from + * saved_result, after which we can proceed. + */ + if (conn->result) + { + Assert(res->resultStatus == PGRES_TUPLES_CHUNK); + break; + } + /* Advance the queue as appropriate */ pqCommandQueueAdvance(conn, false, res->resultStatus == PGRES_PIPELINE_SYNC); @@ -3173,10 +3211,12 @@ pqPipelineProcessQueue(PGconn *conn) } /* - * Reset single-row processing mode. (Client has to set it up for each - * query, if desired.) + * Reset partial-result mode. (Client has to set it up for each query, if + * desired.) */ + conn->partialResMode = false; conn->singleRowMode = false; + conn->maxChunkSize = 0; /* * If there are no further commands to process in the queue, get us in diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 701d58e1087..3170d484f02 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -379,7 +379,8 @@ pqParseInput3(PGconn *conn) break; case PqMsg_DataRow: if (conn->result != NULL && - conn->result->resultStatus == PGRES_TUPLES_OK) + (conn->result->resultStatus == PGRES_TUPLES_OK || + conn->result->resultStatus == PGRES_TUPLES_CHUNK)) { /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, msgLength)) diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index c184e853889..c0443d68fdc 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -112,8 +112,9 @@ typedef enum PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */ PGRES_PIPELINE_SYNC, /* pipeline synchronization point */ - PGRES_PIPELINE_ABORTED /* Command didn't run because of an abort + PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort * earlier in a pipeline */ + PGRES_TUPLES_CHUNK /* chunk of tuples from larger resultset */ } ExecStatusType; typedef enum @@ -489,6 +490,7 @@ extern int PQsendQueryPrepared(PGconn *conn, const int *paramFormats, int resultFormat); extern int PQsetSingleRowMode(PGconn *conn); +extern int PQsetChunkedRowsMode(PGconn *conn, int chunkSize); extern PGresult *PQgetResult(PGconn *conn); /* Routines for managing an asynchronous query */ diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 9c05f11a6e9..113ea47c400 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -434,7 +434,10 @@ struct pg_conn bool nonblocking; /* whether this connection is using nonblock * sending semantics */ PGpipelineStatus pipelineStatus; /* status of pipeline mode */ + bool partialResMode; /* true if single-row or chunked mode */ bool singleRowMode; /* return current query result row-by-row? */ + int maxChunkSize; /* return query result in chunks not exceeding + * this number of rows */ char copy_is_binary; /* 1 = copy binary, 0 = copy text */ int copy_already_done; /* # bytes already returned in COPY OUT */ PGnotify *notifyHead; /* oldest unreported Notify msg */ @@ -535,12 +538,13 @@ struct pg_conn * and error_result is true, then we need to return a PGRES_FATAL_ERROR * result, but haven't yet constructed it; text for the error has been * appended to conn->errorMessage. (Delaying construction simplifies - * dealing with out-of-memory cases.) If next_result isn't NULL, it is a - * PGresult that will replace "result" after we return that one. + * dealing with out-of-memory cases.) If saved_result isn't NULL, it is a + * PGresult that will replace "result" after we return that one; we use + * that in partial-result mode to remember the query's tuple metadata. */ PGresult *result; /* result being constructed */ bool error_result; /* do we need to make an ERROR result? */ - PGresult *next_result; /* next result (used in single-row mode) */ + PGresult *saved_result; /* original, empty result in partialResMode */ /* Assorted state for SASL, SSL, GSS, etc */ const pg_fe_sasl_mech *sasl; diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index b7e7a0947cb..928ef6b1700 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -1719,6 +1719,46 @@ test_singlerowmode(PGconn *conn) if (PQgetResult(conn) != NULL) pg_fatal("expected NULL result"); + /* + * Try chunked mode as well; make sure that it correctly delivers a + * partial final chunk. + */ + if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)", + 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("failed to send query: %s", + PQerrorMessage(conn)); + if (PQsendFlushRequest(conn) != 1) + pg_fatal("failed to send flush request"); + if (PQsetChunkedRowsMode(conn, 3) != 1) + pg_fatal("PQsetChunkedRowsMode() failed"); + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("unexpected NULL"); + if (PQresultStatus(res) != PGRES_TUPLES_CHUNK) + pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s", + PQresStatus(PQresultStatus(res)), + PQerrorMessage(conn)); + if (PQntuples(res) != 3) + pg_fatal("Expected 3 rows, got %d", PQntuples(res)); + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("unexpected NULL"); + if (PQresultStatus(res) != PGRES_TUPLES_CHUNK) + pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s", + PQresStatus(PQresultStatus(res))); + if (PQntuples(res) != 2) + pg_fatal("Expected 2 rows, got %d", PQntuples(res)); + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("unexpected NULL"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Expected PGRES_TUPLES_OK, got %s", + PQresStatus(PQresultStatus(res))); + if (PQntuples(res) != 0) + pg_fatal("Expected 0 rows, got %d", PQntuples(res)); + if (PQgetResult(conn) != NULL) + pg_fatal("expected NULL result"); + if (PQexitPipelineMode(conn) != 1) pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn)); diff --git a/src/test/modules/libpq_pipeline/traces/singlerow.trace b/src/test/modules/libpq_pipeline/traces/singlerow.trace index 83043e1407e..029cd66581d 100644 --- a/src/test/modules/libpq_pipeline/traces/singlerow.trace +++ b/src/test/modules/libpq_pipeline/traces/singlerow.trace @@ -56,4 +56,18 @@ B 4 BindComplete B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0 B 11 DataRow 1 1 '1' B 13 CommandComplete "SELECT 1" +F 36 Parse "" "SELECT generate_series(1, 5)" 0 +F 14 Bind "" "" 0 0 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 4 Flush +B 4 ParseComplete +B 4 BindComplete +B 40 RowDescription 1 "generate_series" NNNN 0 NNNN 4 -1 0 +B 11 DataRow 1 1 '1' +B 11 DataRow 1 1 '2' +B 11 DataRow 1 1 '3' +B 11 DataRow 1 1 '4' +B 11 DataRow 1 1 '5' +B 13 CommandComplete "SELECT 5" F 4 Terminate |