diff options
Diffstat (limited to 'src/bin/pg_dump/pg_backup_archiver.c')
-rw-r--r-- | src/bin/pg_dump/pg_backup_archiver.c | 356 |
1 files changed, 268 insertions, 88 deletions
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 36e3383b851..3f7a658bcec 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -49,6 +49,24 @@ typedef struct _outputContext int gzOut; } OutputContext; +/* + * State for tracking TocEntrys that are ready to process during a parallel + * restore. (This used to be a list, and we still call it that, though now + * it's really an array so that we can apply qsort to it.) + * + * tes[] is sized large enough that we can't overrun it. + * The valid entries are indexed first_te .. last_te inclusive. + * We periodically sort the array to bring larger-by-dataLength entries to + * the front; "sorted" is true if the valid entries are known sorted. + */ +typedef struct _parallelReadyList +{ + TocEntry **tes; /* Ready-to-dump TocEntrys */ + int first_te; /* index of first valid entry in tes[] */ + int last_te; /* index of last valid entry in tes[] */ + bool sorted; /* are valid entries currently sorted? */ +} ParallelReadyList; + /* translator: this is a module name */ static const char *modulename = gettext_noop("archiver"); @@ -95,13 +113,20 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH, TocEntry *pending_list); static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list); -static void par_list_header_init(TocEntry *l); -static void par_list_append(TocEntry *l, TocEntry *te); -static void par_list_remove(TocEntry *te); -static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list, +static void pending_list_header_init(TocEntry *l); +static void pending_list_append(TocEntry *l, TocEntry *te); +static void pending_list_remove(TocEntry *te); +static void ready_list_init(ParallelReadyList *ready_list, int tocCount); +static void ready_list_free(ParallelReadyList *ready_list); +static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te); +static void ready_list_remove(ParallelReadyList *ready_list, int i); +static void ready_list_sort(ParallelReadyList *ready_list); +static int TocEntrySizeCompare(const void *p1, const void *p2); +static void move_to_ready_list(TocEntry *pending_list, + ParallelReadyList *ready_list, RestorePass pass); -static TocEntry *get_next_work_item(ArchiveHandle *AH, - TocEntry *ready_list, +static TocEntry *pop_next_work_item(ArchiveHandle *AH, + ParallelReadyList *ready_list, ParallelState *pstate); static void mark_dump_job_done(ArchiveHandle *AH, TocEntry *te, @@ -116,7 +141,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); static void repoint_table_dependencies(ArchiveHandle *AH); static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te); static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, - TocEntry *ready_list); + ParallelReadyList *ready_list); static void mark_create_done(ArchiveHandle *AH, TocEntry *te); static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); @@ -639,7 +664,11 @@ RestoreArchive(Archive *AHX) ParallelState *pstate; TocEntry pending_list; - par_list_header_init(&pending_list); + /* The archive format module may need some setup for this */ + if (AH->PrepParallelRestorePtr) + AH->PrepParallelRestorePtr(AH); + + pending_list_header_init(&pending_list); /* This runs PRE_DATA items and then disconnects from the database */ restore_toc_entries_prefork(AH, &pending_list); @@ -1039,10 +1068,14 @@ WriteData(Archive *AHX, const void *data, size_t dLen) /* * Create a new TOC entry. The TOC was designed as a TOC, but is now the * repository for all metadata. But the name has stuck. + * + * The new entry is added to the Archive's TOC list. Most callers can ignore + * the result value because nothing else need be done, but a few want to + * manipulate the TOC entry further. */ /* Public */ -void +TocEntry * ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId, const char *tag, @@ -1100,9 +1133,12 @@ ArchiveEntry(Archive *AHX, newToc->hadDumper = dumpFn ? true : false; newToc->formatData = NULL; + newToc->dataLength = 0; if (AH->ArchiveEntryPtr != NULL) AH->ArchiveEntryPtr(AH, newToc); + + return newToc; } /* Public */ @@ -2413,32 +2449,59 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) { TocEntry *te; - for (te = AH->toc->next; te != AH->toc; te = te->next) + if (pstate && pstate->numWorkers > 1) { - if (!te->dataDumper) - continue; - - if ((te->reqs & REQ_DATA) == 0) - continue; + /* + * In parallel mode, this code runs in the master process. We + * construct an array of candidate TEs, then sort it into decreasing + * size order, then dispatch each TE to a data-transfer worker. By + * dumping larger tables first, we avoid getting into a situation + * where we're down to one job and it's big, losing parallelism. + */ + TocEntry **tes; + int ntes; - if (pstate && pstate->numWorkers > 1) + tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *)); + ntes = 0; + for (te = AH->toc->next; te != AH->toc; te = te->next) { - /* - * If we are in a parallel backup, then we are always the master - * process. Dispatch each data-transfer job to a worker. - */ - DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP, - mark_dump_job_done, NULL); + /* Consider only TEs with dataDumper functions ... */ + if (!te->dataDumper) + continue; + /* ... and ignore ones not enabled for dump */ + if ((te->reqs & REQ_DATA) == 0) + continue; + + tes[ntes++] = te; } - else - WriteDataChunksForTocEntry(AH, te); - } - /* - * If parallel, wait for workers to finish. - */ - if (pstate && pstate->numWorkers > 1) + if (ntes > 1) + qsort((void *) tes, ntes, sizeof(TocEntry *), + TocEntrySizeCompare); + + for (int i = 0; i < ntes; i++) + DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP, + mark_dump_job_done, NULL); + + pg_free(tes); + + /* Now wait for workers to finish. */ WaitForWorkers(AH, pstate, WFW_ALL_IDLE); + } + else + { + /* Non-parallel mode: just dump all candidate TEs sequentially. */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + /* Must have same filter conditions as above */ + if (!te->dataDumper) + continue; + if ((te->reqs & REQ_DATA) == 0) + continue; + + WriteDataChunksForTocEntry(AH, te); + } + } } @@ -2690,6 +2753,7 @@ ReadToc(ArchiveHandle *AH) te->dependencies = NULL; te->nDeps = 0; } + te->dataLength = 0; if (AH->ReadExtraTocPtr) AH->ReadExtraTocPtr(AH, te); @@ -3996,7 +4060,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) else { /* Nope, so add it to pending_list */ - par_list_append(pending_list, next_work_item); + pending_list_append(pending_list, next_work_item); } } @@ -4035,11 +4099,14 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list) { - TocEntry ready_list; + ParallelReadyList ready_list; TocEntry *next_work_item; ahlog(AH, 2, "entering restore_toc_entries_parallel\n"); + /* Set up ready_list with enough room for all known TocEntrys */ + ready_list_init(&ready_list, AH->tocCount); + /* * The pending_list contains all items that we need to restore. Move all * items that are available to process immediately into the ready_list. @@ -4048,7 +4115,6 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, * contains items that have no remaining dependencies and are OK to * process in the current restore pass. */ - par_list_header_init(&ready_list); AH->restorePass = RESTORE_PASS_MAIN; move_to_ready_list(pending_list, &ready_list, AH->restorePass); @@ -4064,7 +4130,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, for (;;) { /* Look for an item ready to be dispatched to a worker */ - next_work_item = get_next_work_item(AH, &ready_list, pstate); + next_work_item = pop_next_work_item(AH, &ready_list, pstate); if (next_work_item != NULL) { /* If not to be restored, don't waste time launching a worker */ @@ -4073,8 +4139,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, ahlog(AH, 1, "skipping item %d %s %s\n", next_work_item->dumpId, next_work_item->desc, next_work_item->tag); - /* Drop it from ready_list, and update its dependencies */ - par_list_remove(next_work_item); + /* Update its dependencies as though we'd completed it */ reduce_dependencies(AH, next_work_item, &ready_list); /* Loop around to see if anything else can be dispatched */ continue; @@ -4084,9 +4149,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, next_work_item->dumpId, next_work_item->desc, next_work_item->tag); - /* Remove it from ready_list, and dispatch to some worker */ - par_list_remove(next_work_item); - + /* Dispatch to some worker */ DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE, mark_restore_job_done, &ready_list); } @@ -4132,7 +4195,9 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, } /* There should now be nothing in ready_list. */ - Assert(ready_list.par_next == &ready_list); + Assert(ready_list.first_te > ready_list.last_te); + + ready_list_free(&ready_list); ahlog(AH, 1, "finished main parallel loop\n"); } @@ -4170,7 +4235,7 @@ restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list) * connection. We don't sweat about RestorePass ordering; it's likely we * already violated that. */ - for (te = pending_list->par_next; te != pending_list; te = te->par_next) + for (te = pending_list->pending_next; te != pending_list; te = te->pending_next) { ahlog(AH, 1, "processing missed item %d %s %s\n", te->dumpId, te->desc, te->tag); @@ -4201,36 +4266,130 @@ has_lock_conflicts(TocEntry *te1, TocEntry *te2) /* - * Initialize the header of a parallel-processing list. + * Initialize the header of the pending-items list. * - * These are circular lists with a dummy TocEntry as header, just like the + * This is a circular list with a dummy TocEntry as header, just like the * main TOC list; but we use separate list links so that an entry can be in - * the main TOC list as well as in a parallel-processing list. + * the main TOC list as well as in the pending list. + */ +static void +pending_list_header_init(TocEntry *l) +{ + l->pending_prev = l->pending_next = l; +} + +/* Append te to the end of the pending-list headed by l */ +static void +pending_list_append(TocEntry *l, TocEntry *te) +{ + te->pending_prev = l->pending_prev; + l->pending_prev->pending_next = te; + l->pending_prev = te; + te->pending_next = l; +} + +/* Remove te from the pending-list */ +static void +pending_list_remove(TocEntry *te) +{ + te->pending_prev->pending_next = te->pending_next; + te->pending_next->pending_prev = te->pending_prev; + te->pending_prev = NULL; + te->pending_next = NULL; +} + + +/* + * Initialize the ready_list with enough room for up to tocCount entries. */ static void -par_list_header_init(TocEntry *l) +ready_list_init(ParallelReadyList *ready_list, int tocCount) { - l->par_prev = l->par_next = l; + ready_list->tes = (TocEntry **) + pg_malloc(tocCount * sizeof(TocEntry *)); + ready_list->first_te = 0; + ready_list->last_te = -1; + ready_list->sorted = false; } -/* Append te to the end of the parallel-processing list headed by l */ +/* + * Free storage for a ready_list. + */ +static void +ready_list_free(ParallelReadyList *ready_list) +{ + pg_free(ready_list->tes); +} + +/* Add te to the ready_list */ static void -par_list_append(TocEntry *l, TocEntry *te) +ready_list_insert(ParallelReadyList *ready_list, TocEntry *te) { - te->par_prev = l->par_prev; - l->par_prev->par_next = te; - l->par_prev = te; - te->par_next = l; + ready_list->tes[++ready_list->last_te] = te; + /* List is (probably) not sorted anymore. */ + ready_list->sorted = false; +} + +/* Remove the i'th entry in the ready_list */ +static void +ready_list_remove(ParallelReadyList *ready_list, int i) +{ + int f = ready_list->first_te; + + Assert(i >= f && i <= ready_list->last_te); + + /* + * In the typical case where the item to be removed is the first ready + * entry, we need only increment first_te to remove it. Otherwise, move + * the entries before it to compact the list. (This preserves sortedness, + * if any.) We could alternatively move the entries after i, but there + * are typically many more of those. + */ + if (i > f) + { + TocEntry **first_te_ptr = &ready_list->tes[f]; + + memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *)); + } + ready_list->first_te++; } -/* Remove te from whatever parallel-processing list it's in */ +/* Sort the ready_list into the desired order */ static void -par_list_remove(TocEntry *te) +ready_list_sort(ParallelReadyList *ready_list) { - te->par_prev->par_next = te->par_next; - te->par_next->par_prev = te->par_prev; - te->par_prev = NULL; - te->par_next = NULL; + if (!ready_list->sorted) + { + int n = ready_list->last_te - ready_list->first_te + 1; + + if (n > 1) + qsort(ready_list->tes + ready_list->first_te, n, + sizeof(TocEntry *), + TocEntrySizeCompare); + ready_list->sorted = true; + } +} + +/* qsort comparator for sorting TocEntries by dataLength */ +static int +TocEntrySizeCompare(const void *p1, const void *p2) +{ + const TocEntry *te1 = *(const TocEntry *const *) p1; + const TocEntry *te2 = *(const TocEntry *const *) p2; + + /* Sort by decreasing dataLength */ + if (te1->dataLength > te2->dataLength) + return -1; + if (te1->dataLength < te2->dataLength) + return 1; + + /* For equal dataLengths, sort by dumpId, just to be stable */ + if (te1->dumpId < te2->dumpId) + return -1; + if (te1->dumpId > te2->dumpId) + return 1; + + return 0; } @@ -4242,52 +4401,50 @@ par_list_remove(TocEntry *te) * which applies the same logic one-at-a-time.) */ static void -move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list, +move_to_ready_list(TocEntry *pending_list, + ParallelReadyList *ready_list, RestorePass pass) { TocEntry *te; TocEntry *next_te; - for (te = pending_list->par_next; te != pending_list; te = next_te) + for (te = pending_list->pending_next; te != pending_list; te = next_te) { - /* must save list link before possibly moving te to other list */ - next_te = te->par_next; + /* must save list link before possibly removing te from list */ + next_te = te->pending_next; if (te->depCount == 0 && _tocEntryRestorePass(te) == pass) { /* Remove it from pending_list ... */ - par_list_remove(te); + pending_list_remove(te); /* ... and add to ready_list */ - par_list_append(ready_list, te); + ready_list_insert(ready_list, te); } } } /* - * Find the next work item (if any) that is capable of being run now. + * Find the next work item (if any) that is capable of being run now, + * and remove it from the ready_list. + * + * Returns the item, or NULL if nothing is runnable. * * To qualify, the item must have no remaining dependencies * and no requirements for locks that are incompatible with * items currently running. Items in the ready_list are known to have * no remaining dependencies, but we have to check for lock conflicts. * - * Note that the returned item has *not* been removed from ready_list. - * The caller must do that after successfully dispatching the item. - * * pref_non_data is for an alternative selection algorithm that gives * preference to non-data items if there is already a data load running. * It is currently disabled. */ static TocEntry * -get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, +pop_next_work_item(ArchiveHandle *AH, ParallelReadyList *ready_list, ParallelState *pstate) { bool pref_non_data = false; /* or get from AH->ropt */ - TocEntry *data_te = NULL; - TocEntry *te; - int i, - k; + int data_te_index = -1; /* * Bogus heuristics for pref_non_data @@ -4296,7 +4453,7 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, { int count = 0; - for (k = 0; k < pstate->numWorkers; k++) + for (int k = 0; k < pstate->numWorkers; k++) { TocEntry *running_te = pstate->te[k]; @@ -4309,10 +4466,16 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, } /* + * Sort the ready_list so that we'll tackle larger jobs first. + */ + ready_list_sort(ready_list); + + /* * Search the ready_list until we find a suitable item. */ - for (te = ready_list->par_next; te != ready_list; te = te->par_next) + for (int i = ready_list->first_te; i <= ready_list->last_te; i++) { + TocEntry *te = ready_list->tes[i]; bool conflicts = false; /* @@ -4320,9 +4483,9 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, * that a currently running item also needs lock on, or vice versa. If * so, we don't want to schedule them together. */ - for (i = 0; i < pstate->numWorkers; i++) + for (int k = 0; k < pstate->numWorkers; k++) { - TocEntry *running_te = pstate->te[i]; + TocEntry *running_te = pstate->te[k]; if (running_te == NULL) continue; @@ -4339,17 +4502,23 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, if (pref_non_data && te->section == SECTION_DATA) { - if (data_te == NULL) - data_te = te; + if (data_te_index < 0) + data_te_index = i; continue; } /* passed all tests, so this item can run */ + ready_list_remove(ready_list, i); return te; } - if (data_te != NULL) + if (data_te_index >= 0) + { + TocEntry *data_te = ready_list->tes[data_te_index]; + + ready_list_remove(ready_list, data_te_index); return data_te; + } ahlog(AH, 2, "no item ready\n"); return NULL; @@ -4393,7 +4562,7 @@ mark_restore_job_done(ArchiveHandle *AH, int status, void *callback_data) { - TocEntry *ready_list = (TocEntry *) callback_data; + ParallelReadyList *ready_list = (ParallelReadyList *) callback_data; ahlog(AH, 1, "finished item %d %s %s\n", te->dumpId, te->desc, te->tag); @@ -4443,8 +4612,8 @@ fix_dependencies(ArchiveHandle *AH) te->depCount = te->nDeps; te->revDeps = NULL; te->nRevDeps = 0; - te->par_prev = NULL; - te->par_next = NULL; + te->pending_prev = NULL; + te->pending_next = NULL; } /* @@ -4551,6 +4720,12 @@ fix_dependencies(ArchiveHandle *AH) /* * Change dependencies on table items to depend on table data items instead, * but only in POST_DATA items. + * + * Also, for any item having such dependency(s), set its dataLength to the + * largest dataLength of the table data items it depends on. This ensures + * that parallel restore will prioritize larger jobs (index builds, FK + * constraint checks, etc) over smaller ones, avoiding situations where we + * end a restore with only one active job working on a large table. */ static void repoint_table_dependencies(ArchiveHandle *AH) @@ -4569,9 +4744,13 @@ repoint_table_dependencies(ArchiveHandle *AH) if (olddep <= AH->maxDumpId && AH->tableDataId[olddep] != 0) { - te->dependencies[i] = AH->tableDataId[olddep]; + DumpId tabledataid = AH->tableDataId[olddep]; + TocEntry *tabledatate = AH->tocsByDumpId[tabledataid]; + + te->dependencies[i] = tabledataid; + te->dataLength = Max(te->dataLength, tabledatate->dataLength); ahlog(AH, 2, "transferring dependency %d -> %d to %d\n", - te->dumpId, olddep, AH->tableDataId[olddep]); + te->dumpId, olddep, tabledataid); } } } @@ -4647,7 +4826,8 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te) * becomes ready should be moved to the ready_list, if that's provided. */ static void -reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list) +reduce_dependencies(ArchiveHandle *AH, TocEntry *te, + ParallelReadyList *ready_list) { int i; @@ -4670,13 +4850,13 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list) */ if (otherte->depCount == 0 && _tocEntryRestorePass(otherte) == AH->restorePass && - otherte->par_prev != NULL && + otherte->pending_prev != NULL && ready_list != NULL) { /* Remove it from pending list ... */ - par_list_remove(otherte); + pending_list_remove(otherte); /* ... and add to ready_list */ - par_list_append(ready_list, otherte); + ready_list_insert(ready_list, otherte); } } } |