diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bin/pgbench/pgbench.c | 28 | ||||
-rw-r--r-- | src/bin/pgbench/t/001_pgbench_with_server.pl | 36 |
2 files changed, 61 insertions, 3 deletions
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 7b53f9c24da..af1f75257ff 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -608,6 +608,7 @@ typedef struct int use_file; /* index in sql_script for this client */ int command; /* command number in script */ + int num_syncs; /* number of ongoing sync commands */ /* client variables */ Variables variables; @@ -697,6 +698,7 @@ typedef enum MetaCommand META_ELSE, /* \else */ META_ENDIF, /* \endif */ META_STARTPIPELINE, /* \startpipeline */ + META_SYNCPIPELINE, /* \syncpipeline */ META_ENDPIPELINE, /* \endpipeline */ } MetaCommand; @@ -2902,6 +2904,8 @@ getMetaCommand(const char *cmd) mc = META_ASET; else if (pg_strcasecmp(cmd, "startpipeline") == 0) mc = META_STARTPIPELINE; + else if (pg_strcasecmp(cmd, "syncpipeline") == 0) + mc = META_SYNCPIPELINE; else if (pg_strcasecmp(cmd, "endpipeline") == 0) mc = META_ENDPIPELINE; else @@ -3317,8 +3321,10 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) break; case PGRES_PIPELINE_SYNC: - pg_log_debug("client %d pipeline ending", st->id); - if (PQexitPipelineMode(st->con) != 1) + pg_log_debug("client %d pipeline ending, ongoing syncs: %d", + st->id, st->num_syncs); + st->num_syncs--; + if (st->num_syncs == 0 && PQexitPipelineMode(st->con) != 1) pg_log_error("client %d failed to exit pipeline mode: %s", st->id, PQerrorMessage(st->con)); break; @@ -4449,6 +4455,20 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) return CSTATE_ABORTED; } } + else if (command->meta == META_SYNCPIPELINE) + { + if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON) + { + commandFailed(st, "syncpipeline", "not in pipeline mode"); + return CSTATE_ABORTED; + } + if (PQsendPipelineSync(st->con) == 0) + { + commandFailed(st, "syncpipeline", "failed to send a pipeline sync"); + return CSTATE_ABORTED; + } + st->num_syncs++; + } else if (command->meta == META_ENDPIPELINE) { if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON) @@ -4461,6 +4481,7 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) commandFailed(st, "endpipeline", "failed to send a pipeline sync"); return CSTATE_ABORTED; } + st->num_syncs++; /* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */ /* collect pending results before getting out of pipeline mode */ return CSTATE_WAIT_RESULT; @@ -5794,7 +5815,8 @@ process_backslash_command(PsqlScanState sstate, const char *source) } else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF || my_command->meta == META_STARTPIPELINE || - my_command->meta == META_ENDPIPELINE) + my_command->meta == META_ENDPIPELINE || + my_command->meta == META_SYNCPIPELINE) { if (my_command->argc != 1) syntax_error(source, lineno, my_command->first_line, my_command->argv[0], diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index f60f7e89c1f..5d2341a2035 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -814,6 +814,27 @@ $node->pgbench( } }); +# Working \startpipeline with \syncpipeline +$node->pgbench( + '-t 1 -n -M extended', + 0, + [ qr{type: .*/001_pgbench_pipeline_sync}, qr{actually processed: 1/1} ], + [], + 'working \startpipeline with \syncpipeline', + { + '001_pgbench_pipeline_sync' => q{ +-- test startpipeline +\startpipeline +select 1; +\syncpipeline +\syncpipeline +select 2; +\syncpipeline +select 3; +\endpipeline +} + }); + # Working \startpipeline in prepared query mode $node->pgbench( '-t 1 -n -M prepared', @@ -904,6 +925,21 @@ $node->pgbench( } }); +# Try \startpipeline with \syncpipeline without \endpipeline +$node->pgbench( + '-t 2 -n -M extended', + 2, + [], + [qr{end of script reached with pipeline open}], + 'error: call \startpipeline and \syncpipeline without \endpipeline', + { + '001_pgbench_pipeline_7' => q{ +-- startpipeline with \syncpipeline only +\startpipeline +\syncpipeline +} + }); + # Working \startpipeline in prepared query mode with serializable $node->pgbench( '-c4 -t 10 -n -M prepared', |