aboutsummaryrefslogtreecommitdiff
path: root/src/test/modules/libpq_pipeline/libpq_pipeline.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/modules/libpq_pipeline/libpq_pipeline.c')
-rw-r--r--src/test/modules/libpq_pipeline/libpq_pipeline.c90
1 files changed, 90 insertions, 0 deletions
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 71eedb6dbb4..249ee22105c 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -231,6 +231,93 @@ test_multi_pipelines(PGconn *conn)
}
/*
+ * Test behavior when a pipeline dispatches a number of commands that are
+ * not flushed by a sync point.
+ */
+static void
+test_nosync(PGconn *conn)
+{
+ int numqueries = 10;
+ int results = 0;
+ int sock = PQsocket(conn);
+
+ fprintf(stderr, "nosync... ");
+
+ if (sock < 0)
+ pg_fatal("invalid socket");
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("could not enter pipeline mode");
+ for (int i = 0; i < numqueries; i++)
+ {
+ fd_set input_mask;
+ struct timeval tv;
+
+ if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("error sending select: %s", PQerrorMessage(conn));
+ PQflush(conn);
+
+ /*
+ * If the server has written anything to us, read (some of) it now.
+ */
+ FD_ZERO(&input_mask);
+ FD_SET(sock, &input_mask);
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+ {
+ fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ exit_nicely(conn);
+ }
+ if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
+ pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
+ }
+
+ /* tell server to flush its output buffer */
+ if (PQsendFlushRequest(conn) != 1)
+ pg_fatal("failed to send flush request");
+ PQflush(conn);
+
+ /* Now read all results */
+ for (;;)
+ {
+ PGresult *res;
+
+ res = PQgetResult(conn);
+
+ /* NULL results are only expected after TUPLES_OK */
+ if (res == NULL)
+ pg_fatal("got unexpected NULL result after %d results", results);
+
+ /* We expect exactly one TUPLES_OK result for each query we sent */
+ if (PQresultStatus(res) == PGRES_TUPLES_OK)
+ {
+ PGresult *res2;
+
+ /* and one NULL result should follow each */
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ PQclear(res);
+ results++;
+
+ /* if we're done, we're done */
+ if (results == numqueries)
+ break;
+
+ continue;
+ }
+
+ /* anything else is unexpected */
+ pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
+ }
+
+ fprintf(stderr, "ok\n");
+}
+
+/*
* When an operation in a pipeline fails the rest of the pipeline is flushed. We
* still have to get results for each pipeline item, but the item will just be
* a PGRES_PIPELINE_ABORTED code.
@@ -1237,6 +1324,7 @@ print_test_list(void)
{
printf("disallowed_in_pipeline\n");
printf("multi_pipelines\n");
+ printf("nosync\n");
printf("pipeline_abort\n");
printf("pipelined_insert\n");
printf("prepared\n");
@@ -1334,6 +1422,8 @@ main(int argc, char **argv)
test_disallowed_in_pipeline(conn);
else if (strcmp(testname, "multi_pipelines") == 0)
test_multi_pipelines(conn);
+ else if (strcmp(testname, "nosync") == 0)
+ test_nosync(conn);
else if (strcmp(testname, "pipeline_abort") == 0)
test_pipeline_abort(conn);
else if (strcmp(testname, "pipelined_insert") == 0)