diff options
Diffstat (limited to 'src/bin/pg_dump/pg_backup_archiver.c')
-rw-r--r-- | src/bin/pg_dump/pg_backup_archiver.c | 139 |
1 files changed, 133 insertions, 6 deletions
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index d6e15e25a19..c7a6c918a65 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -502,7 +502,28 @@ RestoreArchive(Archive *AHX) /* Otherwise, drop anything that's selected and has a dropStmt */ if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt) { + bool not_allowed_in_txn = false; + pg_log_info("dropping %s %s", te->desc, te->tag); + + /* + * In --transaction-size mode, we have to temporarily exit our + * transaction block to drop objects that can't be dropped + * within a transaction. + */ + if (ropt->txn_size > 0) + { + if (strcmp(te->desc, "DATABASE") == 0 || + strcmp(te->desc, "DATABASE PROPERTIES") == 0) + { + not_allowed_in_txn = true; + if (AH->connection) + CommitTransaction(AHX); + else + ahprintf(AH, "COMMIT;\n"); + } + } + /* Select owner and schema as necessary */ _becomeOwner(AH, te); _selectOutputSchema(AH, te->namespace); @@ -628,6 +649,33 @@ RestoreArchive(Archive *AHX) } } } + + /* + * In --transaction-size mode, re-establish the transaction + * block if needed; otherwise, commit after every N drops. + */ + if (ropt->txn_size > 0) + { + if (not_allowed_in_txn) + { + if (AH->connection) + StartTransaction(AHX); + else + ahprintf(AH, "BEGIN;\n"); + AH->txnCount = 0; + } + else if (++AH->txnCount >= ropt->txn_size) + { + if (AH->connection) + { + CommitTransaction(AHX); + StartTransaction(AHX); + } + else + ahprintf(AH, "COMMIT;\nBEGIN;\n"); + AH->txnCount = 0; + } + } } } @@ -724,7 +772,11 @@ RestoreArchive(Archive *AHX) } } - if (ropt->single_txn) + /* + * Close out any persistent transaction we may have. While these two + * cases are started in different places, we can end both cases here. + */ + if (ropt->single_txn || ropt->txn_size > 0) { if (AH->connection) CommitTransaction(AHX); @@ -785,6 +837,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) */ if ((reqs & REQ_SCHEMA) != 0) { + bool object_is_db = false; + + /* + * In --transaction-size mode, must exit our transaction block to + * create a database or set its properties. + */ + if (strcmp(te->desc, "DATABASE") == 0 || + strcmp(te->desc, "DATABASE PROPERTIES") == 0) + { + object_is_db = true; + if (ropt->txn_size > 0) + { + if (AH->connection) + CommitTransaction(&AH->public); + else + ahprintf(AH, "COMMIT;\n\n"); + } + } + /* Show namespace in log message if available */ if (te->namespace) pg_log_info("creating %s \"%s.%s\"", @@ -835,10 +906,10 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) /* * If we created a DB, connect to it. Also, if we changed DB * properties, reconnect to ensure that relevant GUC settings are - * applied to our session. + * applied to our session. (That also restarts the transaction block + * in --transaction-size mode.) */ - if (strcmp(te->desc, "DATABASE") == 0 || - strcmp(te->desc, "DATABASE PROPERTIES") == 0) + if (object_is_db) { pg_log_info("connecting to new database \"%s\"", te->tag); _reconnectToDB(AH, te->tag); @@ -964,6 +1035,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) } } + /* + * If we emitted anything for this TOC entry, that counts as one action + * against the transaction-size limit. Commit if it's time to. + */ + if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0) + { + if (++AH->txnCount >= ropt->txn_size) + { + if (AH->connection) + { + CommitTransaction(&AH->public); + StartTransaction(&AH->public); + } + else + ahprintf(AH, "COMMIT;\nBEGIN;\n\n"); + AH->txnCount = 0; + } + } + if (AH->public.n_errors > 0 && status == WORKER_OK) status = WORKER_IGNORED_ERRORS; @@ -1310,7 +1400,12 @@ StartRestoreLOs(ArchiveHandle *AH) { RestoreOptions *ropt = AH->public.ropt; - if (!ropt->single_txn) + /* + * LOs must be restored within a transaction block, since we need the LO + * handle to stay open while we write it. Establish a transaction unless + * there's one being used globally. + */ + if (!(ropt->single_txn || ropt->txn_size > 0)) { if (AH->connection) StartTransaction(&AH->public); @@ -1329,7 +1424,7 @@ EndRestoreLOs(ArchiveHandle *AH) { RestoreOptions *ropt = AH->public.ropt; - if (!ropt->single_txn) + if (!(ropt->single_txn || ropt->txn_size > 0)) { if (AH->connection) CommitTransaction(&AH->public); @@ -3171,6 +3266,19 @@ _doSetFixedOutputState(ArchiveHandle *AH) else ahprintf(AH, "SET row_security = off;\n"); + /* + * In --transaction-size mode, we should always be in a transaction when + * we begin to restore objects. + */ + if (ropt && ropt->txn_size > 0) + { + if (AH->connection) + StartTransaction(&AH->public); + else + ahprintf(AH, "\nBEGIN;\n"); + AH->txnCount = 0; + } + ahprintf(AH, "\n"); } @@ -4044,6 +4152,14 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) } /* + * In --transaction-size mode, we must commit the open transaction before + * dropping the database connection. This also ensures that child workers + * can see the objects we've created so far. + */ + if (AH->public.ropt->txn_size > 0) + CommitTransaction(&AH->public); + + /* * Now close parent connection in prep for parallel steps. We do this * mainly to ensure that we don't exceed the specified number of parallel * connections. @@ -4782,6 +4898,10 @@ CloneArchive(ArchiveHandle *AH) clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle)); memcpy(clone, AH, sizeof(ArchiveHandle)); + /* Likewise flat-copy the RestoreOptions, so we can alter them locally */ + clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions)); + memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions)); + /* Handle format-independent fields */ memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse)); @@ -4804,6 +4924,13 @@ CloneArchive(ArchiveHandle *AH) clone->lo_buf = NULL; /* + * Clone connections disregard --transaction-size; they must commit after + * each command so that the results are immediately visible to other + * workers. + */ + clone->public.ropt->txn_size = 0; + + /* * Connect our new clone object to the database, using the same connection * parameters used for the original connection. */ |