diff options
author | Andrew Dunstan <andrew@dunslane.net> | 2013-03-24 11:27:20 -0400 |
---|---|---|
committer | Andrew Dunstan <andrew@dunslane.net> | 2013-03-24 11:27:20 -0400 |
commit | 9e257a181cc1dc5e19eb5d770ce09cc98f470f5f (patch) | |
tree | a2b5c7a40cfe004d4838cd3be32e0177096fafbf /src/bin/pg_dump/dumputils.c | |
parent | 3b91fe185a71c05ac4528f93a39ba27232acc9e0 (diff) | |
download | postgresql-9e257a181cc1dc5e19eb5d770ce09cc98f470f5f.tar.gz postgresql-9e257a181cc1dc5e19eb5d770ce09cc98f470f5f.zip |
Add parallel pg_dump option.
New infrastructure is added which creates a set number of workers
(threads on Windows, forked processes on Unix). Jobs are then
handed out to these workers by the master process as needed.
pg_restore is adjusted to use this new infrastructure in place of the
old setup which created a new worker for each step on the fly. Parallel
dumps acquire a snapshot clone in order to stay consistent, if
available.
The parallel option is selected by the -j / --jobs command line
parameter of pg_dump.
Joachim Wieland, lightly editorialized by Andrew Dunstan.
Diffstat (limited to 'src/bin/pg_dump/dumputils.c')
-rw-r--r-- | src/bin/pg_dump/dumputils.c | 86 |
1 files changed, 73 insertions, 13 deletions
diff --git a/src/bin/pg_dump/dumputils.c b/src/bin/pg_dump/dumputils.c index 0a09882f5dc..7322f1a8257 100644 --- a/src/bin/pg_dump/dumputils.c +++ b/src/bin/pg_dump/dumputils.c @@ -38,6 +38,7 @@ static struct } on_exit_nicely_list[MAX_ON_EXIT_NICELY]; static int on_exit_nicely_index; +void (*on_exit_msg_func) (const char *modulename, const char *fmt, va_list ap) = vwrite_msg; #define supports_grant_options(version) ((version) >= 70400) @@ -48,11 +49,21 @@ static bool parseAclItem(const char *item, const char *type, static char *copyAclUserName(PQExpBuffer output, char *input); static void AddAcl(PQExpBuffer aclbuf, const char *keyword, const char *subname); +static PQExpBuffer getThreadLocalPQExpBuffer(void); #ifdef WIN32 +static void shutdown_parallel_dump_utils(int code, void *unused); static bool parallel_init_done = false; static DWORD tls_index; static DWORD mainThreadId; + +static void +shutdown_parallel_dump_utils(int code, void *unused) +{ + /* Call the cleanup function only from the main thread */ + if (mainThreadId == GetCurrentThreadId()) + WSACleanup(); +} #endif void @@ -61,23 +72,29 @@ init_parallel_dump_utils(void) #ifdef WIN32 if (!parallel_init_done) { + WSADATA wsaData; + int err; + tls_index = TlsAlloc(); - parallel_init_done = true; mainThreadId = GetCurrentThreadId(); + err = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (err != 0) + { + fprintf(stderr, _("WSAStartup failed: %d\n"), err); + exit_nicely(1); + } + on_exit_nicely(shutdown_parallel_dump_utils, NULL); + parallel_init_done = true; } #endif } /* - * Quotes input string if it's not a legitimate SQL identifier as-is. - * - * Note that the returned string must be used before calling fmtId again, - * since we re-use the same return buffer each time. Non-reentrant but - * reduces memory leakage. (On Windows the memory leakage will be one buffer - * per thread, which is at least better than one per call). + * Non-reentrant but reduces memory leakage. (On Windows the memory leakage + * will be one buffer per thread, which is at least better than one per call). */ -const char * -fmtId(const char *rawid) +static PQExpBuffer +getThreadLocalPQExpBuffer(void) { /* * The Tls code goes awry if we use a static var, so we provide for both @@ -86,9 +103,6 @@ fmtId(const char *rawid) static PQExpBuffer s_id_return = NULL; PQExpBuffer id_return; - const char *cp; - bool need_quotes = false; - #ifdef WIN32 if (parallel_init_done) id_return = (PQExpBuffer) TlsGetValue(tls_index); /* 0 when not set */ @@ -118,6 +132,23 @@ fmtId(const char *rawid) } + return id_return; +} + +/* + * Quotes input string if it's not a legitimate SQL identifier as-is. + * + * Note that the returned string must be used before calling fmtId again, + * since we re-use the same return buffer each time. + */ +const char * +fmtId(const char *rawid) +{ + PQExpBuffer id_return = getThreadLocalPQExpBuffer(); + + const char *cp; + bool need_quotes = false; + /* * These checks need to match the identifier production in scan.l. Don't * use islower() etc. @@ -185,6 +216,35 @@ fmtId(const char *rawid) return id_return->data; } +/* + * fmtQualifiedId - convert a qualified name to the proper format for + * the source database. + * + * Like fmtId, use the result before calling again. + * + * Since we call fmtId and it also uses getThreadLocalPQExpBuffer() we cannot + * use it until we're finished with calling fmtId(). + */ +const char * +fmtQualifiedId(int remoteVersion, const char *schema, const char *id) +{ + PQExpBuffer id_return; + PQExpBuffer lcl_pqexp = createPQExpBuffer(); + + /* Suppress schema name if fetching from pre-7.3 DB */ + if (remoteVersion >= 70300 && schema && *schema) + { + appendPQExpBuffer(lcl_pqexp, "%s.", fmtId(schema)); + } + appendPQExpBuffer(lcl_pqexp, "%s", fmtId(id)); + + id_return = getThreadLocalPQExpBuffer(); + + appendPQExpBuffer(id_return, "%s", lcl_pqexp->data); + destroyPQExpBuffer(lcl_pqexp); + + return id_return->data; +} /* * Convert a string value to an SQL string literal and append it to @@ -1315,7 +1375,7 @@ exit_horribly(const char *modulename, const char *fmt,...) va_list ap; va_start(ap, fmt); - vwrite_msg(modulename, fmt, ap); + on_exit_msg_func(modulename, fmt, ap); va_end(ap); exit_nicely(1); |