diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2018-09-14 17:31:51 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2018-09-14 17:31:51 -0400 |
commit | 548e50976ce721b5e927d42a105c2f05b51b52a6 (patch) | |
tree | 9d492f63d5f8715d72c903448467623fb1dd737a /src/bin/pg_dump/pg_backup_archiver.c | |
parent | 20bef2c3110af295501919bac463b87ac58876de (diff) | |
download | postgresql-548e50976ce721b5e927d42a105c2f05b51b52a6.tar.gz postgresql-548e50976ce721b5e927d42a105c2f05b51b52a6.zip |
Improve parallel scheduling logic in pg_dump/pg_restore.
Previously, the way this worked was that a parallel pg_dump would
re-order the TABLE_DATA items in the dump's TOC into decreasing size
order, and separately re-order (some of) the INDEX items into decreasing
size order. Then pg_dump would dump the items in that order. Later,
parallel pg_restore just followed the TOC order. This method had lots
of deficiencies:
* TOC ordering randomly differed between parallel and non-parallel
dumps, and was hard to predict in the former case, causing problems
for building stable pg_dump test cases.
* Parallel restore only followed a well-chosen order if the dump had
been done in parallel; in particular, this never happened for restore
from custom-format dumps.
* The best order for restore isn't necessarily the same as for dump,
and it's not really static either because of locking considerations.
* TABLE_DATA and INDEX items aren't the only things that might take a lot
of work during restore. Scheduling was particularly stupid for the BLOBS
item, which might require lots of work during dump as well as restore,
but was left to the end in either case.
This patch removes the logic that changed the TOC order, fixing the
test instability problem. Instead, we sort the parallelizable items
just before processing them during a parallel dump. Independently
of that, parallel restore prioritizes the ready-to-execute tasks
based on the size of the underlying table. In the case of dependent
tasks such as index, constraint, or foreign key creation, the largest
relevant table is used as the metric for estimating the task length.
(This is pretty crude, but it should be enough to avoid the case we
want to avoid, which is ending the run with just a few large tasks
such that we can't make use of all N workers.)
Patch by me, responding to a complaint from Peter Eisentraut,
who also reviewed the patch.
Discussion: https://postgr.es/m/5137fe12-d0a2-4971-61b6-eb4e7e8875f8@2ndquadrant.com
Diffstat (limited to 'src/bin/pg_dump/pg_backup_archiver.c')
-rw-r--r-- | src/bin/pg_dump/pg_backup_archiver.c | 356 |
1 files changed, 268 insertions, 88 deletions
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 36e3383b851..3f7a658bcec 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -49,6 +49,24 @@ typedef struct _outputContext int gzOut; } OutputContext; +/* + * State for tracking TocEntrys that are ready to process during a parallel + * restore. (This used to be a list, and we still call it that, though now + * it's really an array so that we can apply qsort to it.) + * + * tes[] is sized large enough that we can't overrun it. + * The valid entries are indexed first_te .. last_te inclusive. + * We periodically sort the array to bring larger-by-dataLength entries to + * the front; "sorted" is true if the valid entries are known sorted. + */ +typedef struct _parallelReadyList +{ + TocEntry **tes; /* Ready-to-dump TocEntrys */ + int first_te; /* index of first valid entry in tes[] */ + int last_te; /* index of last valid entry in tes[] */ + bool sorted; /* are valid entries currently sorted? */ +} ParallelReadyList; + /* translator: this is a module name */ static const char *modulename = gettext_noop("archiver"); @@ -95,13 +113,20 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH, TocEntry *pending_list); static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list); -static void par_list_header_init(TocEntry *l); -static void par_list_append(TocEntry *l, TocEntry *te); -static void par_list_remove(TocEntry *te); -static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list, +static void pending_list_header_init(TocEntry *l); +static void pending_list_append(TocEntry *l, TocEntry *te); +static void pending_list_remove(TocEntry *te); +static void ready_list_init(ParallelReadyList *ready_list, int tocCount); +static void ready_list_free(ParallelReadyList *ready_list); +static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te); +static void ready_list_remove(ParallelReadyList *ready_list, int i); +static void ready_list_sort(ParallelReadyList *ready_list); +static int TocEntrySizeCompare(const void *p1, const void *p2); +static void move_to_ready_list(TocEntry *pending_list, + ParallelReadyList *ready_list, RestorePass pass); -static TocEntry *get_next_work_item(ArchiveHandle *AH, - TocEntry *ready_list, +static TocEntry *pop_next_work_item(ArchiveHandle *AH, + ParallelReadyList *ready_list, ParallelState *pstate); static void mark_dump_job_done(ArchiveHandle *AH, TocEntry *te, @@ -116,7 +141,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); static void repoint_table_dependencies(ArchiveHandle *AH); static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te); static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, - TocEntry *ready_list); + ParallelReadyList *ready_list); static void mark_create_done(ArchiveHandle *AH, TocEntry *te); static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); @@ -639,7 +664,11 @@ RestoreArchive(Archive *AHX) ParallelState *pstate; TocEntry pending_list; - par_list_header_init(&pending_list); + /* The archive format module may need some setup for this */ + if (AH->PrepParallelRestorePtr) + AH->PrepParallelRestorePtr(AH); + + pending_list_header_init(&pending_list); /* This runs PRE_DATA items and then disconnects from the database */ restore_toc_entries_prefork(AH, &pending_list); @@ -1039,10 +1068,14 @@ WriteData(Archive *AHX, const void *data, size_t dLen) /* * Create a new TOC entry. The TOC was designed as a TOC, but is now the * repository for all metadata. But the name has stuck. + * + * The new entry is added to the Archive's TOC list. Most callers can ignore + * the result value because nothing else need be done, but a few want to + * manipulate the TOC entry further. */ /* Public */ -void +TocEntry * ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId, const char *tag, @@ -1100,9 +1133,12 @@ ArchiveEntry(Archive *AHX, newToc->hadDumper = dumpFn ? true : false; newToc->formatData = NULL; + newToc->dataLength = 0; if (AH->ArchiveEntryPtr != NULL) AH->ArchiveEntryPtr(AH, newToc); + + return newToc; } /* Public */ @@ -2413,32 +2449,59 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) { TocEntry *te; - for (te = AH->toc->next; te != AH->toc; te = te->next) + if (pstate && pstate->numWorkers > 1) { - if (!te->dataDumper) - continue; - - if ((te->reqs & REQ_DATA) == 0) - continue; + /* + * In parallel mode, this code runs in the master process. We + * construct an array of candidate TEs, then sort it into decreasing + * size order, then dispatch each TE to a data-transfer worker. By + * dumping larger tables first, we avoid getting into a situation + * where we're down to one job and it's big, losing parallelism. + */ + TocEntry **tes; + int ntes; - if (pstate && pstate->numWorkers > 1) + tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *)); + ntes = 0; + for (te = AH->toc->next; te != AH->toc; te = te->next) { - /* - * If we are in a parallel backup, then we are always the master - * process. Dispatch each data-transfer job to a worker. - */ - DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP, - mark_dump_job_done, NULL); + /* Consider only TEs with dataDumper functions ... */ + if (!te->dataDumper) + continue; + /* ... and ignore ones not enabled for dump */ + if ((te->reqs & REQ_DATA) == 0) + continue; + + tes[ntes++] = te; } - else - WriteDataChunksForTocEntry(AH, te); - } - /* - * If parallel, wait for workers to finish. - */ - if (pstate && pstate->numWorkers > 1) + if (ntes > 1) + qsort((void *) tes, ntes, sizeof(TocEntry *), + TocEntrySizeCompare); + + for (int i = 0; i < ntes; i++) + DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP, + mark_dump_job_done, NULL); + + pg_free(tes); + + /* Now wait for workers to finish. */ WaitForWorkers(AH, pstate, WFW_ALL_IDLE); + } + else + { + /* Non-parallel mode: just dump all candidate TEs sequentially. */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + /* Must have same filter conditions as above */ + if (!te->dataDumper) + continue; + if ((te->reqs & REQ_DATA) == 0) + continue; + + WriteDataChunksForTocEntry(AH, te); + } + } } @@ -2690,6 +2753,7 @@ ReadToc(ArchiveHandle *AH) te->dependencies = NULL; te->nDeps = 0; } + te->dataLength = 0; if (AH->ReadExtraTocPtr) AH->ReadExtraTocPtr(AH, te); @@ -3996,7 +4060,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) else { /* Nope, so add it to pending_list */ - par_list_append(pending_list, next_work_item); + pending_list_append(pending_list, next_work_item); } } @@ -4035,11 +4099,14 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list) { - TocEntry ready_list; + ParallelReadyList ready_list; TocEntry *next_work_item; ahlog(AH, 2, "entering restore_toc_entries_parallel\n"); + /* Set up ready_list with enough room for all known TocEntrys */ + ready_list_init(&ready_list, AH->tocCount); + /* * The pending_list contains all items that we need to restore. Move all * items that are available to process immediately into the ready_list. @@ -4048,7 +4115,6 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, * contains items that have no remaining dependencies and are OK to * process in the current restore pass. */ - par_list_header_init(&ready_list); AH->restorePass = RESTORE_PASS_MAIN; move_to_ready_list(pending_list, &ready_list, AH->restorePass); @@ -4064,7 +4130,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, for (;;) { /* Look for an item ready to be dispatched to a worker */ - next_work_item = get_next_work_item(AH, &ready_list, pstate); + next_work_item = pop_next_work_item(AH, &ready_list, pstate); if (next_work_item != NULL) { /* If not to be restored, don't waste time launching a worker */ @@ -4073,8 +4139,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, ahlog(AH, 1, "skipping item %d %s %s\n", next_work_item->dumpId, next_work_item->desc, next_work_item->tag); - /* Drop it from ready_list, and update its dependencies */ - par_list_remove(next_work_item); + /* Update its dependencies as though we'd completed it */ reduce_dependencies(AH, next_work_item, &ready_list); /* Loop around to see if anything else can be dispatched */ continue; @@ -4084,9 +4149,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, next_work_item->dumpId, next_work_item->desc, next_work_item->tag); - /* Remove it from ready_list, and dispatch to some worker */ - par_list_remove(next_work_item); - + /* Dispatch to some worker */ DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE, mark_restore_job_done, &ready_list); } @@ -4132,7 +4195,9 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, } /* There should now be nothing in ready_list. */ - Assert(ready_list.par_next == &ready_list); + Assert(ready_list.first_te > ready_list.last_te); + + ready_list_free(&ready_list); ahlog(AH, 1, "finished main parallel loop\n"); } @@ -4170,7 +4235,7 @@ restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list) * connection. We don't sweat about RestorePass ordering; it's likely we * already violated that. */ - for (te = pending_list->par_next; te != pending_list; te = te->par_next) + for (te = pending_list->pending_next; te != pending_list; te = te->pending_next) { ahlog(AH, 1, "processing missed item %d %s %s\n", te->dumpId, te->desc, te->tag); @@ -4201,36 +4266,130 @@ has_lock_conflicts(TocEntry *te1, TocEntry *te2) /* - * Initialize the header of a parallel-processing list. + * Initialize the header of the pending-items list. * - * These are circular lists with a dummy TocEntry as header, just like the + * This is a circular list with a dummy TocEntry as header, just like the * main TOC list; but we use separate list links so that an entry can be in - * the main TOC list as well as in a parallel-processing list. + * the main TOC list as well as in the pending list. + */ +static void +pending_list_header_init(TocEntry *l) +{ + l->pending_prev = l->pending_next = l; +} + +/* Append te to the end of the pending-list headed by l */ +static void +pending_list_append(TocEntry *l, TocEntry *te) +{ + te->pending_prev = l->pending_prev; + l->pending_prev->pending_next = te; + l->pending_prev = te; + te->pending_next = l; +} + +/* Remove te from the pending-list */ +static void +pending_list_remove(TocEntry *te) +{ + te->pending_prev->pending_next = te->pending_next; + te->pending_next->pending_prev = te->pending_prev; + te->pending_prev = NULL; + te->pending_next = NULL; +} + + +/* + * Initialize the ready_list with enough room for up to tocCount entries. */ static void -par_list_header_init(TocEntry *l) +ready_list_init(ParallelReadyList *ready_list, int tocCount) { - l->par_prev = l->par_next = l; + ready_list->tes = (TocEntry **) + pg_malloc(tocCount * sizeof(TocEntry *)); + ready_list->first_te = 0; + ready_list->last_te = -1; + ready_list->sorted = false; } -/* Append te to the end of the parallel-processing list headed by l */ +/* + * Free storage for a ready_list. + */ +static void +ready_list_free(ParallelReadyList *ready_list) +{ + pg_free(ready_list->tes); +} + +/* Add te to the ready_list */ static void -par_list_append(TocEntry *l, TocEntry *te) +ready_list_insert(ParallelReadyList *ready_list, TocEntry *te) { - te->par_prev = l->par_prev; - l->par_prev->par_next = te; - l->par_prev = te; - te->par_next = l; + ready_list->tes[++ready_list->last_te] = te; + /* List is (probably) not sorted anymore. */ + ready_list->sorted = false; +} + +/* Remove the i'th entry in the ready_list */ +static void +ready_list_remove(ParallelReadyList *ready_list, int i) +{ + int f = ready_list->first_te; + + Assert(i >= f && i <= ready_list->last_te); + + /* + * In the typical case where the item to be removed is the first ready + * entry, we need only increment first_te to remove it. Otherwise, move + * the entries before it to compact the list. (This preserves sortedness, + * if any.) We could alternatively move the entries after i, but there + * are typically many more of those. + */ + if (i > f) + { + TocEntry **first_te_ptr = &ready_list->tes[f]; + + memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *)); + } + ready_list->first_te++; } -/* Remove te from whatever parallel-processing list it's in */ +/* Sort the ready_list into the desired order */ static void -par_list_remove(TocEntry *te) +ready_list_sort(ParallelReadyList *ready_list) { - te->par_prev->par_next = te->par_next; - te->par_next->par_prev = te->par_prev; - te->par_prev = NULL; - te->par_next = NULL; + if (!ready_list->sorted) + { + int n = ready_list->last_te - ready_list->first_te + 1; + + if (n > 1) + qsort(ready_list->tes + ready_list->first_te, n, + sizeof(TocEntry *), + TocEntrySizeCompare); + ready_list->sorted = true; + } +} + +/* qsort comparator for sorting TocEntries by dataLength */ +static int +TocEntrySizeCompare(const void *p1, const void *p2) +{ + const TocEntry *te1 = *(const TocEntry *const *) p1; + const TocEntry *te2 = *(const TocEntry *const *) p2; + + /* Sort by decreasing dataLength */ + if (te1->dataLength > te2->dataLength) + return -1; + if (te1->dataLength < te2->dataLength) + return 1; + + /* For equal dataLengths, sort by dumpId, just to be stable */ + if (te1->dumpId < te2->dumpId) + return -1; + if (te1->dumpId > te2->dumpId) + return 1; + + return 0; } @@ -4242,52 +4401,50 @@ par_list_remove(TocEntry *te) * which applies the same logic one-at-a-time.) */ static void -move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list, +move_to_ready_list(TocEntry *pending_list, + ParallelReadyList *ready_list, RestorePass pass) { TocEntry *te; TocEntry *next_te; - for (te = pending_list->par_next; te != pending_list; te = next_te) + for (te = pending_list->pending_next; te != pending_list; te = next_te) { - /* must save list link before possibly moving te to other list */ - next_te = te->par_next; + /* must save list link before possibly removing te from list */ + next_te = te->pending_next; if (te->depCount == 0 && _tocEntryRestorePass(te) == pass) { /* Remove it from pending_list ... */ - par_list_remove(te); + pending_list_remove(te); /* ... and add to ready_list */ - par_list_append(ready_list, te); + ready_list_insert(ready_list, te); } } } /* - * Find the next work item (if any) that is capable of being run now. + * Find the next work item (if any) that is capable of being run now, + * and remove it from the ready_list. + * + * Returns the item, or NULL if nothing is runnable. * * To qualify, the item must have no remaining dependencies * and no requirements for locks that are incompatible with * items currently running. Items in the ready_list are known to have * no remaining dependencies, but we have to check for lock conflicts. * - * Note that the returned item has *not* been removed from ready_list. - * The caller must do that after successfully dispatching the item. - * * pref_non_data is for an alternative selection algorithm that gives * preference to non-data items if there is already a data load running. * It is currently disabled. */ static TocEntry * -get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, +pop_next_work_item(ArchiveHandle *AH, ParallelReadyList *ready_list, ParallelState *pstate) { bool pref_non_data = false; /* or get from AH->ropt */ - TocEntry *data_te = NULL; - TocEntry *te; - int i, - k; + int data_te_index = -1; /* * Bogus heuristics for pref_non_data @@ -4296,7 +4453,7 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, { int count = 0; - for (k = 0; k < pstate->numWorkers; k++) + for (int k = 0; k < pstate->numWorkers; k++) { TocEntry *running_te = pstate->te[k]; @@ -4309,10 +4466,16 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, } /* + * Sort the ready_list so that we'll tackle larger jobs first. + */ + ready_list_sort(ready_list); + + /* * Search the ready_list until we find a suitable item. */ - for (te = ready_list->par_next; te != ready_list; te = te->par_next) + for (int i = ready_list->first_te; i <= ready_list->last_te; i++) { + TocEntry *te = ready_list->tes[i]; bool conflicts = false; /* @@ -4320,9 +4483,9 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, * that a currently running item also needs lock on, or vice versa. If * so, we don't want to schedule them together. */ - for (i = 0; i < pstate->numWorkers; i++) + for (int k = 0; k < pstate->numWorkers; k++) { - TocEntry *running_te = pstate->te[i]; + TocEntry *running_te = pstate->te[k]; if (running_te == NULL) continue; @@ -4339,17 +4502,23 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, if (pref_non_data && te->section == SECTION_DATA) { - if (data_te == NULL) - data_te = te; + if (data_te_index < 0) + data_te_index = i; continue; } /* passed all tests, so this item can run */ + ready_list_remove(ready_list, i); return te; } - if (data_te != NULL) + if (data_te_index >= 0) + { + TocEntry *data_te = ready_list->tes[data_te_index]; + + ready_list_remove(ready_list, data_te_index); return data_te; + } ahlog(AH, 2, "no item ready\n"); return NULL; @@ -4393,7 +4562,7 @@ mark_restore_job_done(ArchiveHandle *AH, int status, void *callback_data) { - TocEntry *ready_list = (TocEntry *) callback_data; + ParallelReadyList *ready_list = (ParallelReadyList *) callback_data; ahlog(AH, 1, "finished item %d %s %s\n", te->dumpId, te->desc, te->tag); @@ -4443,8 +4612,8 @@ fix_dependencies(ArchiveHandle *AH) te->depCount = te->nDeps; te->revDeps = NULL; te->nRevDeps = 0; - te->par_prev = NULL; - te->par_next = NULL; + te->pending_prev = NULL; + te->pending_next = NULL; } /* @@ -4551,6 +4720,12 @@ fix_dependencies(ArchiveHandle *AH) /* * Change dependencies on table items to depend on table data items instead, * but only in POST_DATA items. + * + * Also, for any item having such dependency(s), set its dataLength to the + * largest dataLength of the table data items it depends on. This ensures + * that parallel restore will prioritize larger jobs (index builds, FK + * constraint checks, etc) over smaller ones, avoiding situations where we + * end a restore with only one active job working on a large table. */ static void repoint_table_dependencies(ArchiveHandle *AH) @@ -4569,9 +4744,13 @@ repoint_table_dependencies(ArchiveHandle *AH) if (olddep <= AH->maxDumpId && AH->tableDataId[olddep] != 0) { - te->dependencies[i] = AH->tableDataId[olddep]; + DumpId tabledataid = AH->tableDataId[olddep]; + TocEntry *tabledatate = AH->tocsByDumpId[tabledataid]; + + te->dependencies[i] = tabledataid; + te->dataLength = Max(te->dataLength, tabledatate->dataLength); ahlog(AH, 2, "transferring dependency %d -> %d to %d\n", - te->dumpId, olddep, AH->tableDataId[olddep]); + te->dumpId, olddep, tabledataid); } } } @@ -4647,7 +4826,8 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te) * becomes ready should be moved to the ready_list, if that's provided. */ static void -reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list) +reduce_dependencies(ArchiveHandle *AH, TocEntry *te, + ParallelReadyList *ready_list) { int i; @@ -4670,13 +4850,13 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list) */ if (otherte->depCount == 0 && _tocEntryRestorePass(otherte) == AH->restorePass && - otherte->par_prev != NULL && + otherte->pending_prev != NULL && ready_list != NULL) { /* Remove it from pending list ... */ - par_list_remove(otherte); + pending_list_remove(otherte); /* ... and add to ready_list */ - par_list_append(ready_list, otherte); + ready_list_insert(ready_list, otherte); } } } |