aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bin/pgbench/pgbench.c158
-rw-r--r--src/bin/pgbench/t/001_pgbench_with_server.pl20
2 files changed, 127 insertions, 51 deletions
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 508ed218e83..47b2c87f7f2 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -628,7 +628,8 @@ typedef struct
pg_time_usec_t txn_begin; /* used for measuring schedule lag times */
pg_time_usec_t stmt_begin; /* used for measuring statement latencies */
- bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
+ /* whether client prepared each command of each script */
+ bool **prepared;
/*
* For processing failures and repeating transactions with serialization
@@ -733,7 +734,8 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
* argv Command arguments, the first of which is the command or SQL
* string itself. For SQL commands, after post-processing
* argv[0] is the same as 'lines' with variables substituted.
- * varprefix SQL commands terminated with \gset or \aset have this set
+ * prepname The name that this command is prepared under, in prepare mode
+ * varprefix SQL commands terminated with \gset or \aset have this set
* to a non NULL value. If nonempty, it's used to prefix the
* variable name that receives the value.
* aset do gset on all possible queries of a combined query (\;).
@@ -751,6 +753,7 @@ typedef struct Command
MetaCommand meta;
int argc;
char *argv[MAX_ARGS];
+ char *prepname;
char *varprefix;
PgBenchExpr *expr;
SimpleStats stats;
@@ -3006,13 +3009,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc)
return true;
}
-#define MAX_PREPARE_NAME 32
-static void
-preparedStatementName(char *buffer, int file, int state)
-{
- sprintf(buffer, "P%d_%d", file, state);
-}
-
/*
* Report the abortion of the client when processing SQL commands.
*/
@@ -3053,6 +3049,87 @@ chooseScript(TState *thread)
return i - 1;
}
+/*
+ * Prepare the SQL command from st->use_file at command_num.
+ */
+static void
+prepareCommand(CState *st, int command_num)
+{
+ Command *command = sql_script[st->use_file].commands[command_num];
+
+ /* No prepare for non-SQL commands */
+ if (command->type != SQL_COMMAND)
+ return;
+
+ /*
+ * If not already done, allocate space for 'prepared' flags: one boolean
+ * for each command of each script.
+ */
+ if (!st->prepared)
+ {
+ st->prepared = pg_malloc(sizeof(bool *) * num_scripts);
+ for (int i = 0; i < num_scripts; i++)
+ {
+ ParsedScript *script = &sql_script[i];
+ int numcmds;
+
+ for (numcmds = 0; script->commands[numcmds] != NULL; numcmds++)
+ ;
+ st->prepared[i] = pg_malloc0(sizeof(bool) * numcmds);
+ }
+ }
+
+ if (!st->prepared[st->use_file][command_num])
+ {
+ PGresult *res;
+
+ pg_log_debug("client %d preparing %s", st->id, command->prepname);
+ res = PQprepare(st->con, command->prepname,
+ command->argv[0], command->argc - 1, NULL);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_log_error("%s", PQerrorMessage(st->con));
+ PQclear(res);
+ st->prepared[st->use_file][command_num] = true;
+ }
+}
+
+/*
+ * Prepare all the commands in the script that come after the \startpipeline
+ * that's at position st->command, and the first \endpipeline we find.
+ *
+ * This sets the ->prepared flag for each relevant command as well as the
+ * \startpipeline itself, but doesn't move the st->command counter.
+ */
+static void
+prepareCommandsInPipeline(CState *st)
+{
+ int j;
+ Command **commands = sql_script[st->use_file].commands;
+
+ Assert(commands[st->command]->type == META_COMMAND &&
+ commands[st->command]->meta == META_STARTPIPELINE);
+
+ /*
+ * We set the 'prepared' flag on the \startpipeline itself to flag that we
+ * don't need to do this next time without calling prepareCommand(), even
+ * though we don't actually prepare this command.
+ */
+ if (st->prepared &&
+ st->prepared[st->use_file][st->command])
+ return;
+
+ for (j = st->command + 1; commands[j] != NULL; j++)
+ {
+ if (commands[j]->type == META_COMMAND &&
+ commands[j]->meta == META_ENDPIPELINE)
+ break;
+
+ prepareCommand(st, j);
+ }
+
+ st->prepared[st->use_file][st->command] = true;
+}
+
/* Send a SQL command, using the chosen querymode */
static bool
sendCommand(CState *st, Command *command)
@@ -3083,49 +3160,13 @@ sendCommand(CState *st, Command *command)
}
else if (querymode == QUERY_PREPARED)
{
- char name[MAX_PREPARE_NAME];
const char *params[MAX_ARGS];
- if (!st->prepared[st->use_file])
- {
- int j;
- Command **commands = sql_script[st->use_file].commands;
-
- for (j = 0; commands[j] != NULL; j++)
- {
- PGresult *res;
-
- if (commands[j]->type != SQL_COMMAND)
- continue;
- preparedStatementName(name, st->use_file, j);
- if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
- {
- res = PQprepare(st->con, name,
- commands[j]->argv[0], commands[j]->argc - 1, NULL);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pg_log_error("%s", PQerrorMessage(st->con));
- PQclear(res);
- }
- else
- {
- /*
- * In pipeline mode, we use asynchronous functions. If a
- * server-side error occurs, it will be processed later
- * among the other results.
- */
- if (!PQsendPrepare(st->con, name,
- commands[j]->argv[0], commands[j]->argc - 1, NULL))
- pg_log_error("%s", PQerrorMessage(st->con));
- }
- }
- st->prepared[st->use_file] = true;
- }
-
+ prepareCommand(st, st->command);
getQueryParams(&st->variables, command, params);
- preparedStatementName(name, st->use_file, st->command);
- pg_log_debug("client %d sending %s", st->id, name);
- r = PQsendQueryPrepared(st->con, name, command->argc - 1,
+ pg_log_debug("client %d sending %s", st->id, command->prepname);
+ r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1,
params, NULL, NULL, 0);
}
else /* unknown sql mode */
@@ -3597,7 +3638,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
thread->conn_duration += now - start;
/* Reset session-local state */
- memset(st->prepared, 0, sizeof(st->prepared));
+ pg_free(st->prepared);
+ st->prepared = NULL;
}
/*
@@ -4360,6 +4402,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
return CSTATE_ABORTED;
}
+ /*
+ * If we're in prepared-query mode, we need to prepare all the
+ * commands that are inside the pipeline before we actually start the
+ * pipeline itself. This solves the problem that running BEGIN
+ * ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
+ * snapshot having been acquired by the prepare within the pipeline.
+ */
+ if (querymode == QUERY_PREPARED)
+ prepareCommandsInPipeline(st);
+
if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
{
commandFailed(st, "startpipeline", "already in pipeline mode");
@@ -5439,6 +5491,7 @@ create_sql_command(PQExpBuffer buf, const char *source)
my_command->varprefix = NULL; /* allocated later, if needed */
my_command->expr = NULL;
initSimpleStats(&my_command->stats);
+ my_command->prepname = NULL; /* set later, if needed */
return my_command;
}
@@ -5468,6 +5521,7 @@ static void
postprocess_sql_command(Command *my_command)
{
char buffer[128];
+ static int prepnum = 0;
Assert(my_command->type == SQL_COMMAND);
@@ -5476,15 +5530,17 @@ postprocess_sql_command(Command *my_command)
buffer[strcspn(buffer, "\n\r")] = '\0';
my_command->first_line = pg_strdup(buffer);
- /* parse query if necessary */
+ /* Parse query and generate prepared statement name, if necessary */
switch (querymode)
{
case QUERY_SIMPLE:
my_command->argv[0] = my_command->lines.data;
my_command->argc++;
break;
- case QUERY_EXTENDED:
case QUERY_PREPARED:
+ my_command->prepname = psprintf("P_%d", prepnum++);
+ /* fall through */
+ case QUERY_EXTENDED:
if (!parseQuery(my_command))
exit(1);
break;
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index 4bf508ea966..99273203f03 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -839,6 +839,26 @@ select 1 \gset f
}
});
+# Working \startpipeline in prepared query mode with serializable
+$node->pgbench(
+ '-c4 -j2 -t 10 -n -M prepared',
+ 0,
+ [
+ qr{type: .*/001_pgbench_pipeline_serializable},
+ qr{actually processed: (\d+)/\1}
+ ],
+ [],
+ 'working \startpipeline with serializable',
+ {
+ '001_pgbench_pipeline_serializable' => q{
+-- test startpipeline with serializable
+\startpipeline
+BEGIN ISOLATION LEVEL SERIALIZABLE;
+} . "select 1;\n" x 10 . q{
+END;
+\endpipeline
+}
+ });
# trigger many expression errors
my @errors = (