aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_dump/dumputils.c
diff options
context:
space:
mode:
authorAndrew Dunstan <andrew@dunslane.net>2013-03-24 11:27:20 -0400
committerAndrew Dunstan <andrew@dunslane.net>2013-03-24 11:27:20 -0400
commit9e257a181cc1dc5e19eb5d770ce09cc98f470f5f (patch)
treea2b5c7a40cfe004d4838cd3be32e0177096fafbf /src/bin/pg_dump/dumputils.c
parent3b91fe185a71c05ac4528f93a39ba27232acc9e0 (diff)
downloadpostgresql-9e257a181cc1dc5e19eb5d770ce09cc98f470f5f.tar.gz
postgresql-9e257a181cc1dc5e19eb5d770ce09cc98f470f5f.zip
Add parallel pg_dump option.
New infrastructure is added which creates a set number of workers (threads on Windows, forked processes on Unix). Jobs are then handed out to these workers by the master process as needed. pg_restore is adjusted to use this new infrastructure in place of the old setup which created a new worker for each step on the fly. Parallel dumps acquire a snapshot clone in order to stay consistent, if available. The parallel option is selected by the -j / --jobs command line parameter of pg_dump. Joachim Wieland, lightly editorialized by Andrew Dunstan.
Diffstat (limited to 'src/bin/pg_dump/dumputils.c')
-rw-r--r--src/bin/pg_dump/dumputils.c86
1 files changed, 73 insertions, 13 deletions
diff --git a/src/bin/pg_dump/dumputils.c b/src/bin/pg_dump/dumputils.c
index 0a09882f5dc..7322f1a8257 100644
--- a/src/bin/pg_dump/dumputils.c
+++ b/src/bin/pg_dump/dumputils.c
@@ -38,6 +38,7 @@ static struct
} on_exit_nicely_list[MAX_ON_EXIT_NICELY];
static int on_exit_nicely_index;
+void (*on_exit_msg_func) (const char *modulename, const char *fmt, va_list ap) = vwrite_msg;
#define supports_grant_options(version) ((version) >= 70400)
@@ -48,11 +49,21 @@ static bool parseAclItem(const char *item, const char *type,
static char *copyAclUserName(PQExpBuffer output, char *input);
static void AddAcl(PQExpBuffer aclbuf, const char *keyword,
const char *subname);
+static PQExpBuffer getThreadLocalPQExpBuffer(void);
#ifdef WIN32
+static void shutdown_parallel_dump_utils(int code, void *unused);
static bool parallel_init_done = false;
static DWORD tls_index;
static DWORD mainThreadId;
+
+static void
+shutdown_parallel_dump_utils(int code, void *unused)
+{
+ /* Call the cleanup function only from the main thread */
+ if (mainThreadId == GetCurrentThreadId())
+ WSACleanup();
+}
#endif
void
@@ -61,23 +72,29 @@ init_parallel_dump_utils(void)
#ifdef WIN32
if (!parallel_init_done)
{
+ WSADATA wsaData;
+ int err;
+
tls_index = TlsAlloc();
- parallel_init_done = true;
mainThreadId = GetCurrentThreadId();
+ err = WSAStartup(MAKEWORD(2, 2), &wsaData);
+ if (err != 0)
+ {
+ fprintf(stderr, _("WSAStartup failed: %d\n"), err);
+ exit_nicely(1);
+ }
+ on_exit_nicely(shutdown_parallel_dump_utils, NULL);
+ parallel_init_done = true;
}
#endif
}
/*
- * Quotes input string if it's not a legitimate SQL identifier as-is.
- *
- * Note that the returned string must be used before calling fmtId again,
- * since we re-use the same return buffer each time. Non-reentrant but
- * reduces memory leakage. (On Windows the memory leakage will be one buffer
- * per thread, which is at least better than one per call).
+ * Non-reentrant but reduces memory leakage. (On Windows the memory leakage
+ * will be one buffer per thread, which is at least better than one per call).
*/
-const char *
-fmtId(const char *rawid)
+static PQExpBuffer
+getThreadLocalPQExpBuffer(void)
{
/*
* The Tls code goes awry if we use a static var, so we provide for both
@@ -86,9 +103,6 @@ fmtId(const char *rawid)
static PQExpBuffer s_id_return = NULL;
PQExpBuffer id_return;
- const char *cp;
- bool need_quotes = false;
-
#ifdef WIN32
if (parallel_init_done)
id_return = (PQExpBuffer) TlsGetValue(tls_index); /* 0 when not set */
@@ -118,6 +132,23 @@ fmtId(const char *rawid)
}
+ return id_return;
+}
+
+/*
+ * Quotes input string if it's not a legitimate SQL identifier as-is.
+ *
+ * Note that the returned string must be used before calling fmtId again,
+ * since we re-use the same return buffer each time.
+ */
+const char *
+fmtId(const char *rawid)
+{
+ PQExpBuffer id_return = getThreadLocalPQExpBuffer();
+
+ const char *cp;
+ bool need_quotes = false;
+
/*
* These checks need to match the identifier production in scan.l. Don't
* use islower() etc.
@@ -185,6 +216,35 @@ fmtId(const char *rawid)
return id_return->data;
}
+/*
+ * fmtQualifiedId - convert a qualified name to the proper format for
+ * the source database.
+ *
+ * Like fmtId, use the result before calling again.
+ *
+ * Since we call fmtId and it also uses getThreadLocalPQExpBuffer() we cannot
+ * use it until we're finished with calling fmtId().
+ */
+const char *
+fmtQualifiedId(int remoteVersion, const char *schema, const char *id)
+{
+ PQExpBuffer id_return;
+ PQExpBuffer lcl_pqexp = createPQExpBuffer();
+
+ /* Suppress schema name if fetching from pre-7.3 DB */
+ if (remoteVersion >= 70300 && schema && *schema)
+ {
+ appendPQExpBuffer(lcl_pqexp, "%s.", fmtId(schema));
+ }
+ appendPQExpBuffer(lcl_pqexp, "%s", fmtId(id));
+
+ id_return = getThreadLocalPQExpBuffer();
+
+ appendPQExpBuffer(id_return, "%s", lcl_pqexp->data);
+ destroyPQExpBuffer(lcl_pqexp);
+
+ return id_return->data;
+}
/*
* Convert a string value to an SQL string literal and append it to
@@ -1315,7 +1375,7 @@ exit_horribly(const char *modulename, const char *fmt,...)
va_list ap;
va_start(ap, fmt);
- vwrite_msg(modulename, fmt, ap);
+ on_exit_msg_func(modulename, fmt, ap);
va_end(ap);
exit_nicely(1);