diff options
-rw-r--r-- | doc/src/sgml/ref/pg_restore.sgml | 24 | ||||
-rw-r--r-- | src/bin/pg_dump/pg_backup.h | 4 | ||||
-rw-r--r-- | src/bin/pg_dump/pg_backup_archiver.c | 139 | ||||
-rw-r--r-- | src/bin/pg_dump/pg_backup_archiver.h | 3 | ||||
-rw-r--r-- | src/bin/pg_dump/pg_backup_db.c | 18 | ||||
-rw-r--r-- | src/bin/pg_dump/pg_restore.c | 15 | ||||
-rw-r--r-- | src/bin/pg_upgrade/pg_upgrade.c | 25 |
7 files changed, 220 insertions, 8 deletions
diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml index 1a23874da68..2e3ba802581 100644 --- a/doc/src/sgml/ref/pg_restore.sgml +++ b/doc/src/sgml/ref/pg_restore.sgml @@ -787,6 +787,30 @@ PostgreSQL documentation </varlistentry> <varlistentry> + <term><option>--transaction-size=<replaceable class="parameter">N</replaceable></option></term> + <listitem> + <para> + Execute the restore as a series of transactions, each processing + up to <replaceable class="parameter">N</replaceable> database + objects. This option implies <option>--exit-on-error</option>. + </para> + <para> + <option>--transaction-size</option> offers an intermediate choice + between the default behavior (one transaction per SQL command) + and <option>-1</option>/<option>--single-transaction</option> + (one transaction for all restored objects). + While <option>--single-transaction</option> has the least + overhead, it may be impractical for large databases because the + transaction will take a lock on each restored object, possibly + exhausting the server's lock table space. + Using <option>--transaction-size</option> with a size of a few + thousand objects offers nearly the same performance benefits while + capping the amount of lock table space needed. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><option>--use-set-session-authorization</option></term> <listitem> <para> diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index 9ef2f2017ef..fbf5f1c515e 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -149,7 +149,9 @@ typedef struct _restoreOptions * compression */ int suppressDumpWarnings; /* Suppress output of WARNING entries * to stderr */ - bool single_txn; + + bool single_txn; /* restore all TOCs in one transaction */ + int txn_size; /* restore this many TOCs per txn, if > 0 */ bool *idWanted; /* array showing which dump IDs to emit */ int enable_row_security; diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index d6e15e25a19..c7a6c918a65 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -502,7 +502,28 @@ RestoreArchive(Archive *AHX) /* Otherwise, drop anything that's selected and has a dropStmt */ if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt) { + bool not_allowed_in_txn = false; + pg_log_info("dropping %s %s", te->desc, te->tag); + + /* + * In --transaction-size mode, we have to temporarily exit our + * transaction block to drop objects that can't be dropped + * within a transaction. + */ + if (ropt->txn_size > 0) + { + if (strcmp(te->desc, "DATABASE") == 0 || + strcmp(te->desc, "DATABASE PROPERTIES") == 0) + { + not_allowed_in_txn = true; + if (AH->connection) + CommitTransaction(AHX); + else + ahprintf(AH, "COMMIT;\n"); + } + } + /* Select owner and schema as necessary */ _becomeOwner(AH, te); _selectOutputSchema(AH, te->namespace); @@ -628,6 +649,33 @@ RestoreArchive(Archive *AHX) } } } + + /* + * In --transaction-size mode, re-establish the transaction + * block if needed; otherwise, commit after every N drops. + */ + if (ropt->txn_size > 0) + { + if (not_allowed_in_txn) + { + if (AH->connection) + StartTransaction(AHX); + else + ahprintf(AH, "BEGIN;\n"); + AH->txnCount = 0; + } + else if (++AH->txnCount >= ropt->txn_size) + { + if (AH->connection) + { + CommitTransaction(AHX); + StartTransaction(AHX); + } + else + ahprintf(AH, "COMMIT;\nBEGIN;\n"); + AH->txnCount = 0; + } + } } } @@ -724,7 +772,11 @@ RestoreArchive(Archive *AHX) } } - if (ropt->single_txn) + /* + * Close out any persistent transaction we may have. While these two + * cases are started in different places, we can end both cases here. + */ + if (ropt->single_txn || ropt->txn_size > 0) { if (AH->connection) CommitTransaction(AHX); @@ -785,6 +837,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) */ if ((reqs & REQ_SCHEMA) != 0) { + bool object_is_db = false; + + /* + * In --transaction-size mode, must exit our transaction block to + * create a database or set its properties. + */ + if (strcmp(te->desc, "DATABASE") == 0 || + strcmp(te->desc, "DATABASE PROPERTIES") == 0) + { + object_is_db = true; + if (ropt->txn_size > 0) + { + if (AH->connection) + CommitTransaction(&AH->public); + else + ahprintf(AH, "COMMIT;\n\n"); + } + } + /* Show namespace in log message if available */ if (te->namespace) pg_log_info("creating %s \"%s.%s\"", @@ -835,10 +906,10 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) /* * If we created a DB, connect to it. Also, if we changed DB * properties, reconnect to ensure that relevant GUC settings are - * applied to our session. + * applied to our session. (That also restarts the transaction block + * in --transaction-size mode.) */ - if (strcmp(te->desc, "DATABASE") == 0 || - strcmp(te->desc, "DATABASE PROPERTIES") == 0) + if (object_is_db) { pg_log_info("connecting to new database \"%s\"", te->tag); _reconnectToDB(AH, te->tag); @@ -964,6 +1035,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) } } + /* + * If we emitted anything for this TOC entry, that counts as one action + * against the transaction-size limit. Commit if it's time to. + */ + if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0) + { + if (++AH->txnCount >= ropt->txn_size) + { + if (AH->connection) + { + CommitTransaction(&AH->public); + StartTransaction(&AH->public); + } + else + ahprintf(AH, "COMMIT;\nBEGIN;\n\n"); + AH->txnCount = 0; + } + } + if (AH->public.n_errors > 0 && status == WORKER_OK) status = WORKER_IGNORED_ERRORS; @@ -1310,7 +1400,12 @@ StartRestoreLOs(ArchiveHandle *AH) { RestoreOptions *ropt = AH->public.ropt; - if (!ropt->single_txn) + /* + * LOs must be restored within a transaction block, since we need the LO + * handle to stay open while we write it. Establish a transaction unless + * there's one being used globally. + */ + if (!(ropt->single_txn || ropt->txn_size > 0)) { if (AH->connection) StartTransaction(&AH->public); @@ -1329,7 +1424,7 @@ EndRestoreLOs(ArchiveHandle *AH) { RestoreOptions *ropt = AH->public.ropt; - if (!ropt->single_txn) + if (!(ropt->single_txn || ropt->txn_size > 0)) { if (AH->connection) CommitTransaction(&AH->public); @@ -3171,6 +3266,19 @@ _doSetFixedOutputState(ArchiveHandle *AH) else ahprintf(AH, "SET row_security = off;\n"); + /* + * In --transaction-size mode, we should always be in a transaction when + * we begin to restore objects. + */ + if (ropt && ropt->txn_size > 0) + { + if (AH->connection) + StartTransaction(&AH->public); + else + ahprintf(AH, "\nBEGIN;\n"); + AH->txnCount = 0; + } + ahprintf(AH, "\n"); } @@ -4044,6 +4152,14 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) } /* + * In --transaction-size mode, we must commit the open transaction before + * dropping the database connection. This also ensures that child workers + * can see the objects we've created so far. + */ + if (AH->public.ropt->txn_size > 0) + CommitTransaction(&AH->public); + + /* * Now close parent connection in prep for parallel steps. We do this * mainly to ensure that we don't exceed the specified number of parallel * connections. @@ -4782,6 +4898,10 @@ CloneArchive(ArchiveHandle *AH) clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle)); memcpy(clone, AH, sizeof(ArchiveHandle)); + /* Likewise flat-copy the RestoreOptions, so we can alter them locally */ + clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions)); + memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions)); + /* Handle format-independent fields */ memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse)); @@ -4804,6 +4924,13 @@ CloneArchive(ArchiveHandle *AH) clone->lo_buf = NULL; /* + * Clone connections disregard --transaction-size; they must commit after + * each command so that the results are immediately visible to other + * workers. + */ + clone->public.ropt->txn_size = 0; + + /* * Connect our new clone object to the database, using the same connection * parameters used for the original connection. */ diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 14aeb29dca5..d6104a71961 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -324,6 +324,9 @@ struct _archiveHandle char *currTablespace; /* current tablespace, or NULL */ char *currTableAm; /* current table access method, or NULL */ + /* in --transaction-size mode, this counts objects emitted in cur xact */ + int txnCount; + void *lo_buf; size_t lo_buf_used; size_t lo_buf_size; diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c index f9683fb0c53..a02841c4050 100644 --- a/src/bin/pg_dump/pg_backup_db.c +++ b/src/bin/pg_dump/pg_backup_db.c @@ -554,6 +554,7 @@ IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te, { /* Make a writable copy of the command string */ char *buf = pg_strdup(te->defn); + RestoreOptions *ropt = AH->public.ropt; char *st; char *en; @@ -562,6 +563,23 @@ IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te, { *en++ = '\0'; ahprintf(AH, "%s%s%s;\n", cmdBegin, st, cmdEnd); + + /* In --transaction-size mode, count each command as an action */ + if (ropt && ropt->txn_size > 0) + { + if (++AH->txnCount >= ropt->txn_size) + { + if (AH->connection) + { + CommitTransaction(&AH->public); + StartTransaction(&AH->public); + } + else + ahprintf(AH, "COMMIT;\nBEGIN;\n\n"); + AH->txnCount = 0; + } + } + st = en; } ahprintf(AH, "\n"); diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c index c3beacdec1d..5ea78cf7cc0 100644 --- a/src/bin/pg_dump/pg_restore.c +++ b/src/bin/pg_dump/pg_restore.c @@ -120,6 +120,7 @@ main(int argc, char **argv) {"role", required_argument, NULL, 2}, {"section", required_argument, NULL, 3}, {"strict-names", no_argument, &strict_names, 1}, + {"transaction-size", required_argument, NULL, 5}, {"use-set-session-authorization", no_argument, &use_setsessauth, 1}, {"no-comments", no_argument, &no_comments, 1}, {"no-publications", no_argument, &no_publications, 1}, @@ -289,10 +290,18 @@ main(int argc, char **argv) set_dump_section(optarg, &(opts->dumpSections)); break; - case 4: + case 4: /* filter */ read_restore_filters(optarg, opts); break; + case 5: /* transaction-size */ + if (!option_parse_int(optarg, "--transaction-size", + 1, INT_MAX, + &opts->txn_size)) + exit(1); + opts->exit_on_error = true; + break; + default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -337,6 +346,9 @@ main(int argc, char **argv) if (opts->dataOnly && opts->dropSchema) pg_fatal("options -c/--clean and -a/--data-only cannot be used together"); + if (opts->single_txn && opts->txn_size > 0) + pg_fatal("options -1/--single-transaction and --transaction-size cannot be used together"); + /* * -C is not compatible with -1, because we can't create a database inside * a transaction block. @@ -484,6 +496,7 @@ usage(const char *progname) printf(_(" --section=SECTION restore named section (pre-data, data, or post-data)\n")); printf(_(" --strict-names require table and/or schema include patterns to\n" " match at least one entity each\n")); + printf(_(" --transaction-size=N commit after every N objects\n")); printf(_(" --use-set-session-authorization\n" " use SET SESSION AUTHORIZATION commands instead of\n" " ALTER OWNER commands to set ownership\n")); diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index f6143b6bc47..af370768b60 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -51,6 +51,13 @@ #include "fe_utils/string_utils.h" #include "pg_upgrade.h" +/* + * Maximum number of pg_restore actions (TOC entries) to process within one + * transaction. At some point we might want to make this user-controllable, + * but for now a hard-wired setting will suffice. + */ +#define RESTORE_TRANSACTION_SIZE 1000 + static void set_locale_and_encoding(void); static void prepare_new_cluster(void); static void prepare_new_globals(void); @@ -562,10 +569,12 @@ create_new_objects(void) true, true, "\"%s/pg_restore\" %s %s --exit-on-error --verbose " + "--transaction-size=%d " "--dbname postgres \"%s/%s\"", new_cluster.bindir, cluster_conn_opts(&new_cluster), create_opts, + RESTORE_TRANSACTION_SIZE, log_opts.dumpdir, sql_file_name); @@ -578,6 +587,7 @@ create_new_objects(void) log_file_name[MAXPGPATH]; DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; const char *create_opts; + int txn_size; /* Skip template1 in this pass */ if (strcmp(old_db->db_name, "template1") == 0) @@ -597,13 +607,28 @@ create_new_objects(void) else create_opts = "--create"; + /* + * In parallel mode, reduce the --transaction-size of each restore job + * so that the total number of locks that could be held across all the + * jobs stays in bounds. + */ + txn_size = RESTORE_TRANSACTION_SIZE; + if (user_opts.jobs > 1) + { + txn_size /= user_opts.jobs; + /* Keep some sanity if -j is huge */ + txn_size = Max(txn_size, 10); + } + parallel_exec_prog(log_file_name, NULL, "\"%s/pg_restore\" %s %s --exit-on-error --verbose " + "--transaction-size=%d " "--dbname template1 \"%s/%s\"", new_cluster.bindir, cluster_conn_opts(&new_cluster), create_opts, + txn_size, log_opts.dumpdir, sql_file_name); } |