aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_dump/pg_backup_archiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_dump/pg_backup_archiver.c')
-rw-r--r--src/bin/pg_dump/pg_backup_archiver.c139
1 files changed, 133 insertions, 6 deletions
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index d6e15e25a19..c7a6c918a65 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -502,7 +502,28 @@ RestoreArchive(Archive *AHX)
/* Otherwise, drop anything that's selected and has a dropStmt */
if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
{
+ bool not_allowed_in_txn = false;
+
pg_log_info("dropping %s %s", te->desc, te->tag);
+
+ /*
+ * In --transaction-size mode, we have to temporarily exit our
+ * transaction block to drop objects that can't be dropped
+ * within a transaction.
+ */
+ if (ropt->txn_size > 0)
+ {
+ if (strcmp(te->desc, "DATABASE") == 0 ||
+ strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+ {
+ not_allowed_in_txn = true;
+ if (AH->connection)
+ CommitTransaction(AHX);
+ else
+ ahprintf(AH, "COMMIT;\n");
+ }
+ }
+
/* Select owner and schema as necessary */
_becomeOwner(AH, te);
_selectOutputSchema(AH, te->namespace);
@@ -628,6 +649,33 @@ RestoreArchive(Archive *AHX)
}
}
}
+
+ /*
+ * In --transaction-size mode, re-establish the transaction
+ * block if needed; otherwise, commit after every N drops.
+ */
+ if (ropt->txn_size > 0)
+ {
+ if (not_allowed_in_txn)
+ {
+ if (AH->connection)
+ StartTransaction(AHX);
+ else
+ ahprintf(AH, "BEGIN;\n");
+ AH->txnCount = 0;
+ }
+ else if (++AH->txnCount >= ropt->txn_size)
+ {
+ if (AH->connection)
+ {
+ CommitTransaction(AHX);
+ StartTransaction(AHX);
+ }
+ else
+ ahprintf(AH, "COMMIT;\nBEGIN;\n");
+ AH->txnCount = 0;
+ }
+ }
}
}
@@ -724,7 +772,11 @@ RestoreArchive(Archive *AHX)
}
}
- if (ropt->single_txn)
+ /*
+ * Close out any persistent transaction we may have. While these two
+ * cases are started in different places, we can end both cases here.
+ */
+ if (ropt->single_txn || ropt->txn_size > 0)
{
if (AH->connection)
CommitTransaction(AHX);
@@ -785,6 +837,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
*/
if ((reqs & REQ_SCHEMA) != 0)
{
+ bool object_is_db = false;
+
+ /*
+ * In --transaction-size mode, must exit our transaction block to
+ * create a database or set its properties.
+ */
+ if (strcmp(te->desc, "DATABASE") == 0 ||
+ strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+ {
+ object_is_db = true;
+ if (ropt->txn_size > 0)
+ {
+ if (AH->connection)
+ CommitTransaction(&AH->public);
+ else
+ ahprintf(AH, "COMMIT;\n\n");
+ }
+ }
+
/* Show namespace in log message if available */
if (te->namespace)
pg_log_info("creating %s \"%s.%s\"",
@@ -835,10 +906,10 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
/*
* If we created a DB, connect to it. Also, if we changed DB
* properties, reconnect to ensure that relevant GUC settings are
- * applied to our session.
+ * applied to our session. (That also restarts the transaction block
+ * in --transaction-size mode.)
*/
- if (strcmp(te->desc, "DATABASE") == 0 ||
- strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+ if (object_is_db)
{
pg_log_info("connecting to new database \"%s\"", te->tag);
_reconnectToDB(AH, te->tag);
@@ -964,6 +1035,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
}
}
+ /*
+ * If we emitted anything for this TOC entry, that counts as one action
+ * against the transaction-size limit. Commit if it's time to.
+ */
+ if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0)
+ {
+ if (++AH->txnCount >= ropt->txn_size)
+ {
+ if (AH->connection)
+ {
+ CommitTransaction(&AH->public);
+ StartTransaction(&AH->public);
+ }
+ else
+ ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
+ AH->txnCount = 0;
+ }
+ }
+
if (AH->public.n_errors > 0 && status == WORKER_OK)
status = WORKER_IGNORED_ERRORS;
@@ -1310,7 +1400,12 @@ StartRestoreLOs(ArchiveHandle *AH)
{
RestoreOptions *ropt = AH->public.ropt;
- if (!ropt->single_txn)
+ /*
+ * LOs must be restored within a transaction block, since we need the LO
+ * handle to stay open while we write it. Establish a transaction unless
+ * there's one being used globally.
+ */
+ if (!(ropt->single_txn || ropt->txn_size > 0))
{
if (AH->connection)
StartTransaction(&AH->public);
@@ -1329,7 +1424,7 @@ EndRestoreLOs(ArchiveHandle *AH)
{
RestoreOptions *ropt = AH->public.ropt;
- if (!ropt->single_txn)
+ if (!(ropt->single_txn || ropt->txn_size > 0))
{
if (AH->connection)
CommitTransaction(&AH->public);
@@ -3171,6 +3266,19 @@ _doSetFixedOutputState(ArchiveHandle *AH)
else
ahprintf(AH, "SET row_security = off;\n");
+ /*
+ * In --transaction-size mode, we should always be in a transaction when
+ * we begin to restore objects.
+ */
+ if (ropt && ropt->txn_size > 0)
+ {
+ if (AH->connection)
+ StartTransaction(&AH->public);
+ else
+ ahprintf(AH, "\nBEGIN;\n");
+ AH->txnCount = 0;
+ }
+
ahprintf(AH, "\n");
}
@@ -4044,6 +4152,14 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
}
/*
+ * In --transaction-size mode, we must commit the open transaction before
+ * dropping the database connection. This also ensures that child workers
+ * can see the objects we've created so far.
+ */
+ if (AH->public.ropt->txn_size > 0)
+ CommitTransaction(&AH->public);
+
+ /*
* Now close parent connection in prep for parallel steps. We do this
* mainly to ensure that we don't exceed the specified number of parallel
* connections.
@@ -4782,6 +4898,10 @@ CloneArchive(ArchiveHandle *AH)
clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
memcpy(clone, AH, sizeof(ArchiveHandle));
+ /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
+ clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
+ memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
+
/* Handle format-independent fields */
memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
@@ -4804,6 +4924,13 @@ CloneArchive(ArchiveHandle *AH)
clone->lo_buf = NULL;
/*
+ * Clone connections disregard --transaction-size; they must commit after
+ * each command so that the results are immediately visible to other
+ * workers.
+ */
+ clone->public.ropt->txn_size = 0;
+
+ /*
* Connect our new clone object to the database, using the same connection
* parameters used for the original connection.
*/