diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtree.c')
-rw-r--r-- | src/backend/access/nbtree/nbtree.c | 259 |
1 files changed, 256 insertions, 3 deletions
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 945e563fcc5..cbc575d5cf2 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -23,6 +23,8 @@ #include "access/xlog.h" #include "catalog/index.h" #include "commands/vacuum.h" +#include "pgstat.h" +#include "storage/condition_variable.h" #include "storage/indexfsm.h" #include "storage/ipc.h" #include "storage/lmgr.h" @@ -63,6 +65,45 @@ typedef struct MemoryContext pagedelcontext; } BTVacState; +/* + * BTPARALLEL_NOT_INITIALIZED indicates that the scan has not started. + * + * BTPARALLEL_ADVANCING indicates that some process is advancing the scan to + * a new page; others must wait. + * + * BTPARALLEL_IDLE indicates that no backend is currently advancing the scan + * to a new page; some process can start doing that. + * + * BTPARALLEL_DONE indicates that the scan is complete (including error exit). + * We reach this state once for every distinct combination of array keys. + */ +typedef enum +{ + BTPARALLEL_NOT_INITIALIZED, + BTPARALLEL_ADVANCING, + BTPARALLEL_IDLE, + BTPARALLEL_DONE +} BTPS_State; + +/* + * BTParallelScanDescData contains btree-specific shared information required + * for parallel scan. + */ +typedef struct BTParallelScanDescData +{ + BlockNumber btps_scanPage; /* latest or next page to be scanned */ + BTPS_State btps_pageStatus;/* indicates whether next page is available + * for scan. See above for possible states of + * parallel scan. 
*/ + int btps_arrayKeyCount; /* count indicating number of array + * scan keys processed by parallel + * scan */ + slock_t btps_mutex; /* protects above variables */ + ConditionVariable btps_cv; /* used to synchronize parallel scan */ +} BTParallelScanDescData; + +typedef struct BTParallelScanDescData *BTParallelScanDesc; + static void btbuildCallback(Relation index, HeapTuple htup, @@ -118,9 +159,9 @@ bthandler(PG_FUNCTION_ARGS) amroutine->amendscan = btendscan; amroutine->ammarkpos = btmarkpos; amroutine->amrestrpos = btrestrpos; - amroutine->amestimateparallelscan = NULL; - amroutine->aminitparallelscan = NULL; - amroutine->amparallelrescan = NULL; + amroutine->amestimateparallelscan = btestimateparallelscan; + amroutine->aminitparallelscan = btinitparallelscan; + amroutine->amparallelrescan = btparallelrescan; PG_RETURN_POINTER(amroutine); } @@ -491,6 +532,7 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, } so->markItemIndex = -1; + so->arrayKeyCount = 0; BTScanPosUnpinIfPinned(so->markPos); BTScanPosInvalidate(so->markPos); @@ -653,6 +695,217 @@ btrestrpos(IndexScanDesc scan) } /* + * btestimateparallelscan -- estimate storage for BTParallelScanDescData + */ +Size +btestimateparallelscan(void) +{ + return sizeof(BTParallelScanDescData); +} + +/* + * btinitparallelscan -- initialize BTParallelScanDesc for parallel btree scan + */ +void +btinitparallelscan(void *target) +{ + BTParallelScanDesc bt_target = (BTParallelScanDesc) target; + + SpinLockInit(&bt_target->btps_mutex); + bt_target->btps_scanPage = InvalidBlockNumber; + bt_target->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + bt_target->btps_arrayKeyCount = 0; + ConditionVariableInit(&bt_target->btps_cv); +} + +/* + * btparallelrescan() -- reset parallel scan + */ +void +btparallelrescan(IndexScanDesc scan) +{ + BTParallelScanDesc btscan; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + + Assert(parallel_scan); + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) 
parallel_scan, + parallel_scan->ps_offset); + + /* + * In theory, we don't need to acquire the spinlock here, because there + * shouldn't be any other workers running at this point, but we do so for + * consistency. + */ + SpinLockAcquire(&btscan->btps_mutex); + btscan->btps_scanPage = InvalidBlockNumber; + btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + btscan->btps_arrayKeyCount = 0; + SpinLockRelease(&btscan->btps_mutex); +} + +/* + * _bt_parallel_seize() -- Begin the process of advancing the scan to a new + * page. Other scans must wait until we call _bt_parallel_release() or + * _bt_parallel_done(). + * + * The return value is true if we successfully seized the scan and false + * if we did not. The latter case occurs if no pages remain for the current + * set of scankeys. + * + * If the return value is true, *pageno returns the next or current page + * of the scan (depending on the scan direction). An invalid block number + * means the scan hasn't yet started, and P_NONE means we've reached the end. + * The first time a participating process reaches the last page, it will return + * true and set *pageno to P_NONE; after that, further attempts to seize the + * scan will return false. + * + * Callers should ignore the value of pageno if the return value is false. + */ +bool +_bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + BTPS_State pageStatus; + bool exit_loop = false; + bool status = true; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan; + + *pageno = P_NONE; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + while (1) + { + SpinLockAcquire(&btscan->btps_mutex); + pageStatus = btscan->btps_pageStatus; + + if (so->arrayKeyCount < btscan->btps_arrayKeyCount) + { + /* Parallel scan has already advanced to a new set of scankeys. 
*/ + status = false; + } + else if (pageStatus == BTPARALLEL_DONE) + { + /* + * We're done with this set of scankeys. This may be the end, or + * there could be more sets to try. + */ + status = false; + } + else if (pageStatus != BTPARALLEL_ADVANCING) + { + /* + * We have successfully seized control of the scan for the purpose + * of advancing it to a new page! + */ + btscan->btps_pageStatus = BTPARALLEL_ADVANCING; + *pageno = btscan->btps_scanPage; + exit_loop = true; + } + SpinLockRelease(&btscan->btps_mutex); + if (exit_loop || !status) + break; + ConditionVariableSleep(&btscan->btps_cv, WAIT_EVENT_BTREE_PAGE); + } + ConditionVariableCancelSleep(); + + return status; +} + +/* + * _bt_parallel_release() -- Complete the process of advancing the scan to a + * new page. We now have the new value of btps_scanPage; some other backend + * can now begin advancing the scan. + */ +void +_bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page) +{ + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + SpinLockAcquire(&btscan->btps_mutex); + btscan->btps_scanPage = scan_page; + btscan->btps_pageStatus = BTPARALLEL_IDLE; + SpinLockRelease(&btscan->btps_mutex); + ConditionVariableSignal(&btscan->btps_cv); +} + +/* + * _bt_parallel_done() -- Mark the parallel scan as complete. + * + * When there are no pages left to scan, this function should be called to + * notify other workers. Otherwise, they might wait forever for the scan to + * advance to the next page. 
+ */ +void +_bt_parallel_done(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan; + bool status_changed = false; + + /* Do nothing for non-parallel scans */ + if (parallel_scan == NULL) + return; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + /* + * Mark the parallel scan as done for this combination of scan keys, + * unless some other process already did so. See also + * _bt_advance_array_keys. + */ + SpinLockAcquire(&btscan->btps_mutex); + if (so->arrayKeyCount >= btscan->btps_arrayKeyCount && + btscan->btps_pageStatus != BTPARALLEL_DONE) + { + btscan->btps_pageStatus = BTPARALLEL_DONE; + status_changed = true; + } + SpinLockRelease(&btscan->btps_mutex); + + /* wake up all the workers associated with this parallel scan */ + if (status_changed) + ConditionVariableBroadcast(&btscan->btps_cv); +} + +/* + * _bt_parallel_advance_array_keys() -- Advances the parallel scan for array + * keys. + * + * Updates the count of array keys processed for both local and parallel + * scans. + */ +void +_bt_parallel_advance_array_keys(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + so->arrayKeyCount++; + SpinLockAcquire(&btscan->btps_mutex); + if (btscan->btps_pageStatus == BTPARALLEL_DONE) + { + btscan->btps_scanPage = InvalidBlockNumber; + btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + btscan->btps_arrayKeyCount++; + } + SpinLockRelease(&btscan->btps_mutex); +} + +/* * Bulk deletion of all index entries pointing to a set of heap tuples. * The set of target tuples is specified via a callback routine that tells * whether any given heap tuple (identified by ItemPointer) is being deleted. |