aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_dump/pg_backup_archiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_dump/pg_backup_archiver.c')
-rw-r--r--  src/bin/pg_dump/pg_backup_archiver.c | 356
1 file changed, 268 insertions(+), 88 deletions(-)
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 36e3383b851..3f7a658bcec 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -49,6 +49,24 @@ typedef struct _outputContext
int gzOut;
} OutputContext;
+/*
+ * State for tracking TocEntrys that are ready to process during a parallel
+ * restore. (This used to be a list, and we still call it that, though now
+ * it's really an array so that we can apply qsort to it.)
+ *
+ * tes[] is sized large enough that we can't overrun it.
+ * The valid entries are indexed first_te .. last_te inclusive.
+ * We periodically sort the array to bring larger-by-dataLength entries to
+ * the front; "sorted" is true if the valid entries are known sorted.
+ */
+typedef struct _parallelReadyList
+{
+ TocEntry **tes; /* Ready-to-dump TocEntrys */
+ int first_te; /* index of first valid entry in tes[] */
+ int last_te; /* index of last valid entry in tes[] */
+ bool sorted; /* are valid entries currently sorted? */
+} ParallelReadyList;
+
/* translator: this is a module name */
static const char *modulename = gettext_noop("archiver");
@@ -95,13 +113,20 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH,
TocEntry *pending_list);
static void restore_toc_entries_postfork(ArchiveHandle *AH,
TocEntry *pending_list);
-static void par_list_header_init(TocEntry *l);
-static void par_list_append(TocEntry *l, TocEntry *te);
-static void par_list_remove(TocEntry *te);
-static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
+static void pending_list_header_init(TocEntry *l);
+static void pending_list_append(TocEntry *l, TocEntry *te);
+static void pending_list_remove(TocEntry *te);
+static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
+static void ready_list_free(ParallelReadyList *ready_list);
+static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
+static void ready_list_remove(ParallelReadyList *ready_list, int i);
+static void ready_list_sort(ParallelReadyList *ready_list);
+static int TocEntrySizeCompare(const void *p1, const void *p2);
+static void move_to_ready_list(TocEntry *pending_list,
+ ParallelReadyList *ready_list,
RestorePass pass);
-static TocEntry *get_next_work_item(ArchiveHandle *AH,
- TocEntry *ready_list,
+static TocEntry *pop_next_work_item(ArchiveHandle *AH,
+ ParallelReadyList *ready_list,
ParallelState *pstate);
static void mark_dump_job_done(ArchiveHandle *AH,
TocEntry *te,
@@ -116,7 +141,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
static void repoint_table_dependencies(ArchiveHandle *AH);
static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
- TocEntry *ready_list);
+ ParallelReadyList *ready_list);
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
@@ -639,7 +664,11 @@ RestoreArchive(Archive *AHX)
ParallelState *pstate;
TocEntry pending_list;
- par_list_header_init(&pending_list);
+ /* The archive format module may need some setup for this */
+ if (AH->PrepParallelRestorePtr)
+ AH->PrepParallelRestorePtr(AH);
+
+ pending_list_header_init(&pending_list);
/* This runs PRE_DATA items and then disconnects from the database */
restore_toc_entries_prefork(AH, &pending_list);
@@ -1039,10 +1068,14 @@ WriteData(Archive *AHX, const void *data, size_t dLen)
/*
* Create a new TOC entry. The TOC was designed as a TOC, but is now the
* repository for all metadata. But the name has stuck.
+ *
+ * The new entry is added to the Archive's TOC list. Most callers can ignore
+ * the result value because nothing else need be done, but a few want to
+ * manipulate the TOC entry further.
*/
/* Public */
-void
+TocEntry *
ArchiveEntry(Archive *AHX,
CatalogId catalogId, DumpId dumpId,
const char *tag,
@@ -1100,9 +1133,12 @@ ArchiveEntry(Archive *AHX,
newToc->hadDumper = dumpFn ? true : false;
newToc->formatData = NULL;
+ newToc->dataLength = 0;
if (AH->ArchiveEntryPtr != NULL)
AH->ArchiveEntryPtr(AH, newToc);
+
+ return newToc;
}
/* Public */
@@ -2413,32 +2449,59 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
{
TocEntry *te;
- for (te = AH->toc->next; te != AH->toc; te = te->next)
+ if (pstate && pstate->numWorkers > 1)
{
- if (!te->dataDumper)
- continue;
-
- if ((te->reqs & REQ_DATA) == 0)
- continue;
+ /*
+ * In parallel mode, this code runs in the master process. We
+ * construct an array of candidate TEs, then sort it into decreasing
+ * size order, then dispatch each TE to a data-transfer worker. By
+ * dumping larger tables first, we avoid getting into a situation
+ * where we're down to one job and it's big, losing parallelism.
+ */
+ TocEntry **tes;
+ int ntes;
- if (pstate && pstate->numWorkers > 1)
+ tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
+ ntes = 0;
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
{
- /*
- * If we are in a parallel backup, then we are always the master
- * process. Dispatch each data-transfer job to a worker.
- */
- DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP,
- mark_dump_job_done, NULL);
+ /* Consider only TEs with dataDumper functions ... */
+ if (!te->dataDumper)
+ continue;
+ /* ... and ignore ones not enabled for dump */
+ if ((te->reqs & REQ_DATA) == 0)
+ continue;
+
+ tes[ntes++] = te;
}
- else
- WriteDataChunksForTocEntry(AH, te);
- }
- /*
- * If parallel, wait for workers to finish.
- */
- if (pstate && pstate->numWorkers > 1)
+ if (ntes > 1)
+ qsort((void *) tes, ntes, sizeof(TocEntry *),
+ TocEntrySizeCompare);
+
+ for (int i = 0; i < ntes; i++)
+ DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
+ mark_dump_job_done, NULL);
+
+ pg_free(tes);
+
+ /* Now wait for workers to finish. */
WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
+ }
+ else
+ {
+ /* Non-parallel mode: just dump all candidate TEs sequentially. */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ /* Must have same filter conditions as above */
+ if (!te->dataDumper)
+ continue;
+ if ((te->reqs & REQ_DATA) == 0)
+ continue;
+
+ WriteDataChunksForTocEntry(AH, te);
+ }
+ }
}
@@ -2690,6 +2753,7 @@ ReadToc(ArchiveHandle *AH)
te->dependencies = NULL;
te->nDeps = 0;
}
+ te->dataLength = 0;
if (AH->ReadExtraTocPtr)
AH->ReadExtraTocPtr(AH, te);
@@ -3996,7 +4060,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
else
{
/* Nope, so add it to pending_list */
- par_list_append(pending_list, next_work_item);
+ pending_list_append(pending_list, next_work_item);
}
}
@@ -4035,11 +4099,14 @@ static void
restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
TocEntry *pending_list)
{
- TocEntry ready_list;
+ ParallelReadyList ready_list;
TocEntry *next_work_item;
ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
+ /* Set up ready_list with enough room for all known TocEntrys */
+ ready_list_init(&ready_list, AH->tocCount);
+
/*
* The pending_list contains all items that we need to restore. Move all
* items that are available to process immediately into the ready_list.
@@ -4048,7 +4115,6 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
* contains items that have no remaining dependencies and are OK to
* process in the current restore pass.
*/
- par_list_header_init(&ready_list);
AH->restorePass = RESTORE_PASS_MAIN;
move_to_ready_list(pending_list, &ready_list, AH->restorePass);
@@ -4064,7 +4130,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
for (;;)
{
/* Look for an item ready to be dispatched to a worker */
- next_work_item = get_next_work_item(AH, &ready_list, pstate);
+ next_work_item = pop_next_work_item(AH, &ready_list, pstate);
if (next_work_item != NULL)
{
/* If not to be restored, don't waste time launching a worker */
@@ -4073,8 +4139,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
ahlog(AH, 1, "skipping item %d %s %s\n",
next_work_item->dumpId,
next_work_item->desc, next_work_item->tag);
- /* Drop it from ready_list, and update its dependencies */
- par_list_remove(next_work_item);
+ /* Update its dependencies as though we'd completed it */
reduce_dependencies(AH, next_work_item, &ready_list);
/* Loop around to see if anything else can be dispatched */
continue;
@@ -4084,9 +4149,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
next_work_item->dumpId,
next_work_item->desc, next_work_item->tag);
- /* Remove it from ready_list, and dispatch to some worker */
- par_list_remove(next_work_item);
-
+ /* Dispatch to some worker */
DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
mark_restore_job_done, &ready_list);
}
@@ -4132,7 +4195,9 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
}
/* There should now be nothing in ready_list. */
- Assert(ready_list.par_next == &ready_list);
+ Assert(ready_list.first_te > ready_list.last_te);
+
+ ready_list_free(&ready_list);
ahlog(AH, 1, "finished main parallel loop\n");
}
@@ -4170,7 +4235,7 @@ restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
* connection. We don't sweat about RestorePass ordering; it's likely we
* already violated that.
*/
- for (te = pending_list->par_next; te != pending_list; te = te->par_next)
+ for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
{
ahlog(AH, 1, "processing missed item %d %s %s\n",
te->dumpId, te->desc, te->tag);
@@ -4201,36 +4266,130 @@ has_lock_conflicts(TocEntry *te1, TocEntry *te2)
/*
- * Initialize the header of a parallel-processing list.
+ * Initialize the header of the pending-items list.
*
- * These are circular lists with a dummy TocEntry as header, just like the
+ * This is a circular list with a dummy TocEntry as header, just like the
* main TOC list; but we use separate list links so that an entry can be in
- * the main TOC list as well as in a parallel-processing list.
+ * the main TOC list as well as in the pending list.
+ */
+static void
+pending_list_header_init(TocEntry *l)
+{
+ l->pending_prev = l->pending_next = l;
+}
+
+/* Append te to the end of the pending-list headed by l */
+static void
+pending_list_append(TocEntry *l, TocEntry *te)
+{
+ te->pending_prev = l->pending_prev;
+ l->pending_prev->pending_next = te;
+ l->pending_prev = te;
+ te->pending_next = l;
+}
+
+/* Remove te from the pending-list */
+static void
+pending_list_remove(TocEntry *te)
+{
+ te->pending_prev->pending_next = te->pending_next;
+ te->pending_next->pending_prev = te->pending_prev;
+ te->pending_prev = NULL;
+ te->pending_next = NULL;
+}
+
+
+/*
+ * Initialize the ready_list with enough room for up to tocCount entries.
*/
static void
-par_list_header_init(TocEntry *l)
+ready_list_init(ParallelReadyList *ready_list, int tocCount)
{
- l->par_prev = l->par_next = l;
+ ready_list->tes = (TocEntry **)
+ pg_malloc(tocCount * sizeof(TocEntry *));
+ ready_list->first_te = 0;
+ ready_list->last_te = -1;
+ ready_list->sorted = false;
}
-/* Append te to the end of the parallel-processing list headed by l */
+/*
+ * Free storage for a ready_list.
+ */
+static void
+ready_list_free(ParallelReadyList *ready_list)
+{
+ pg_free(ready_list->tes);
+}
+
+/* Add te to the ready_list */
static void
-par_list_append(TocEntry *l, TocEntry *te)
+ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
{
- te->par_prev = l->par_prev;
- l->par_prev->par_next = te;
- l->par_prev = te;
- te->par_next = l;
+ ready_list->tes[++ready_list->last_te] = te;
+ /* List is (probably) not sorted anymore. */
+ ready_list->sorted = false;
+}
+
+/* Remove the i'th entry in the ready_list */
+static void
+ready_list_remove(ParallelReadyList *ready_list, int i)
+{
+ int f = ready_list->first_te;
+
+ Assert(i >= f && i <= ready_list->last_te);
+
+ /*
+ * In the typical case where the item to be removed is the first ready
+ * entry, we need only increment first_te to remove it. Otherwise, move
+ * the entries before it to compact the list. (This preserves sortedness,
+ * if any.) We could alternatively move the entries after i, but there
+ * are typically many more of those.
+ */
+ if (i > f)
+ {
+ TocEntry **first_te_ptr = &ready_list->tes[f];
+
+ memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
+ }
+ ready_list->first_te++;
}
-/* Remove te from whatever parallel-processing list it's in */
+/* Sort the ready_list into the desired order */
static void
-par_list_remove(TocEntry *te)
+ready_list_sort(ParallelReadyList *ready_list)
{
- te->par_prev->par_next = te->par_next;
- te->par_next->par_prev = te->par_prev;
- te->par_prev = NULL;
- te->par_next = NULL;
+ if (!ready_list->sorted)
+ {
+ int n = ready_list->last_te - ready_list->first_te + 1;
+
+ if (n > 1)
+ qsort(ready_list->tes + ready_list->first_te, n,
+ sizeof(TocEntry *),
+ TocEntrySizeCompare);
+ ready_list->sorted = true;
+ }
+}
+
+/* qsort comparator for sorting TocEntries by dataLength */
+static int
+TocEntrySizeCompare(const void *p1, const void *p2)
+{
+ const TocEntry *te1 = *(const TocEntry *const *) p1;
+ const TocEntry *te2 = *(const TocEntry *const *) p2;
+
+ /* Sort by decreasing dataLength */
+ if (te1->dataLength > te2->dataLength)
+ return -1;
+ if (te1->dataLength < te2->dataLength)
+ return 1;
+
+ /* For equal dataLengths, sort by dumpId, just to be stable */
+ if (te1->dumpId < te2->dumpId)
+ return -1;
+ if (te1->dumpId > te2->dumpId)
+ return 1;
+
+ return 0;
}
@@ -4242,52 +4401,50 @@ par_list_remove(TocEntry *te)
* which applies the same logic one-at-a-time.)
*/
static void
-move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
+move_to_ready_list(TocEntry *pending_list,
+ ParallelReadyList *ready_list,
RestorePass pass)
{
TocEntry *te;
TocEntry *next_te;
- for (te = pending_list->par_next; te != pending_list; te = next_te)
+ for (te = pending_list->pending_next; te != pending_list; te = next_te)
{
- /* must save list link before possibly moving te to other list */
- next_te = te->par_next;
+ /* must save list link before possibly removing te from list */
+ next_te = te->pending_next;
if (te->depCount == 0 &&
_tocEntryRestorePass(te) == pass)
{
/* Remove it from pending_list ... */
- par_list_remove(te);
+ pending_list_remove(te);
/* ... and add to ready_list */
- par_list_append(ready_list, te);
+ ready_list_insert(ready_list, te);
}
}
}
/*
- * Find the next work item (if any) that is capable of being run now.
+ * Find the next work item (if any) that is capable of being run now,
+ * and remove it from the ready_list.
+ *
+ * Returns the item, or NULL if nothing is runnable.
*
* To qualify, the item must have no remaining dependencies
* and no requirements for locks that are incompatible with
* items currently running. Items in the ready_list are known to have
* no remaining dependencies, but we have to check for lock conflicts.
*
- * Note that the returned item has *not* been removed from ready_list.
- * The caller must do that after successfully dispatching the item.
- *
* pref_non_data is for an alternative selection algorithm that gives
* preference to non-data items if there is already a data load running.
* It is currently disabled.
*/
static TocEntry *
-get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
+pop_next_work_item(ArchiveHandle *AH, ParallelReadyList *ready_list,
ParallelState *pstate)
{
bool pref_non_data = false; /* or get from AH->ropt */
- TocEntry *data_te = NULL;
- TocEntry *te;
- int i,
- k;
+ int data_te_index = -1;
/*
* Bogus heuristics for pref_non_data
@@ -4296,7 +4453,7 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
{
int count = 0;
- for (k = 0; k < pstate->numWorkers; k++)
+ for (int k = 0; k < pstate->numWorkers; k++)
{
TocEntry *running_te = pstate->te[k];
@@ -4309,10 +4466,16 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
}
/*
+ * Sort the ready_list so that we'll tackle larger jobs first.
+ */
+ ready_list_sort(ready_list);
+
+ /*
* Search the ready_list until we find a suitable item.
*/
- for (te = ready_list->par_next; te != ready_list; te = te->par_next)
+ for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
{
+ TocEntry *te = ready_list->tes[i];
bool conflicts = false;
/*
@@ -4320,9 +4483,9 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
* that a currently running item also needs lock on, or vice versa. If
* so, we don't want to schedule them together.
*/
- for (i = 0; i < pstate->numWorkers; i++)
+ for (int k = 0; k < pstate->numWorkers; k++)
{
- TocEntry *running_te = pstate->te[i];
+ TocEntry *running_te = pstate->te[k];
if (running_te == NULL)
continue;
@@ -4339,17 +4502,23 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
if (pref_non_data && te->section == SECTION_DATA)
{
- if (data_te == NULL)
- data_te = te;
+ if (data_te_index < 0)
+ data_te_index = i;
continue;
}
/* passed all tests, so this item can run */
+ ready_list_remove(ready_list, i);
return te;
}
- if (data_te != NULL)
+ if (data_te_index >= 0)
+ {
+ TocEntry *data_te = ready_list->tes[data_te_index];
+
+ ready_list_remove(ready_list, data_te_index);
return data_te;
+ }
ahlog(AH, 2, "no item ready\n");
return NULL;
@@ -4393,7 +4562,7 @@ mark_restore_job_done(ArchiveHandle *AH,
int status,
void *callback_data)
{
- TocEntry *ready_list = (TocEntry *) callback_data;
+ ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
ahlog(AH, 1, "finished item %d %s %s\n",
te->dumpId, te->desc, te->tag);
@@ -4443,8 +4612,8 @@ fix_dependencies(ArchiveHandle *AH)
te->depCount = te->nDeps;
te->revDeps = NULL;
te->nRevDeps = 0;
- te->par_prev = NULL;
- te->par_next = NULL;
+ te->pending_prev = NULL;
+ te->pending_next = NULL;
}
/*
@@ -4551,6 +4720,12 @@ fix_dependencies(ArchiveHandle *AH)
/*
* Change dependencies on table items to depend on table data items instead,
* but only in POST_DATA items.
+ *
+ * Also, for any item having such dependency(s), set its dataLength to the
+ * largest dataLength of the table data items it depends on. This ensures
+ * that parallel restore will prioritize larger jobs (index builds, FK
+ * constraint checks, etc) over smaller ones, avoiding situations where we
+ * end a restore with only one active job working on a large table.
*/
static void
repoint_table_dependencies(ArchiveHandle *AH)
@@ -4569,9 +4744,13 @@ repoint_table_dependencies(ArchiveHandle *AH)
if (olddep <= AH->maxDumpId &&
AH->tableDataId[olddep] != 0)
{
- te->dependencies[i] = AH->tableDataId[olddep];
+ DumpId tabledataid = AH->tableDataId[olddep];
+ TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
+
+ te->dependencies[i] = tabledataid;
+ te->dataLength = Max(te->dataLength, tabledatate->dataLength);
ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
- te->dumpId, olddep, AH->tableDataId[olddep]);
+ te->dumpId, olddep, tabledataid);
}
}
}
@@ -4647,7 +4826,8 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
* becomes ready should be moved to the ready_list, if that's provided.
*/
static void
-reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
+reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
+ ParallelReadyList *ready_list)
{
int i;
@@ -4670,13 +4850,13 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
*/
if (otherte->depCount == 0 &&
_tocEntryRestorePass(otherte) == AH->restorePass &&
- otherte->par_prev != NULL &&
+ otherte->pending_prev != NULL &&
ready_list != NULL)
{
/* Remove it from pending list ... */
- par_list_remove(otherte);
+ pending_list_remove(otherte);
/* ... and add to ready_list */
- par_list_append(ready_list, otherte);
+ ready_list_insert(ready_list, otherte);
}
}
}