diff options
Diffstat (limited to 'src/backend/access/gin/gininsert.c')
-rw-r--r-- | src/backend/access/gin/gininsert.c | 1649 |
1 file changed, 1635 insertions, 14 deletions
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index d1b5e8f0dd1..e399d867e0f 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -15,14 +15,126 @@ #include "postgres.h" #include "access/gin_private.h" +#include "access/gin_tuple.h" +#include "access/parallel.h" +#include "access/table.h" #include "access/tableam.h" #include "access/xloginsert.h" +#include "catalog/index.h" +#include "catalog/pg_collation.h" +#include "commands/progress.h" #include "miscadmin.h" #include "nodes/execnodes.h" +#include "pgstat.h" #include "storage/bufmgr.h" #include "storage/predicate.h" +#include "tcop/tcopprot.h" /* pgrminclude ignore */ +#include "utils/datum.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/builtins.h" + + +/* Magic numbers for parallel state sharing */ +#define PARALLEL_KEY_GIN_SHARED UINT64CONST(0xB000000000000001) +#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002) +#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003) +#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004) +#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005) + +/* + * Status for index builds performed in parallel. This is allocated in a + * dynamic shared memory segment. + */ +typedef struct GinBuildShared +{ + /* + * These fields are not modified during the build. They primarily exist + * for the benefit of worker processes that need to create state + * corresponding to that used by the leader. + */ + Oid heaprelid; + Oid indexrelid; + bool isconcurrent; + int scantuplesortstates; + + /* + * workersdonecv is used to monitor the progress of workers. All parallel + * participants must indicate that they are done before leader can use + * results built by the workers (and before leader can write the data into + * the index). 
+ */ + ConditionVariable workersdonecv; + + /* + * mutex protects all following fields + * + * These fields contain status information of interest to GIN index builds + * that must work just the same when an index is built in parallel. + */ + slock_t mutex; + + /* + * Mutable state that is maintained by workers, and reported back to + * leader at end of the scans. + * + * nparticipantsdone is number of worker processes finished. + * + * reltuples is the total number of input heap tuples. + * + * indtuples is the total number of tuples that made it into the index. + */ + int nparticipantsdone; + double reltuples; + double indtuples; + + /* + * ParallelTableScanDescData data follows. Can't directly embed here, as + * implementations of the parallel table scan desc interface might need + * stronger alignment. + */ +} GinBuildShared; + +/* + * Return pointer to a GinBuildShared's parallel table scan. + * + * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just + * MAXALIGN. + */ +#define ParallelTableScanFromGinBuildShared(shared) \ + (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(GinBuildShared))) + +/* + * Status for leader in parallel index build. + */ +typedef struct GinLeader +{ + /* parallel context itself */ + ParallelContext *pcxt; + + /* + * nparticipanttuplesorts is the exact number of worker processes + * successfully launched, plus one leader process if it participates as a + * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader + * participating as a worker). + */ + int nparticipanttuplesorts; + + /* + * Leader process convenience pointers to shared state (leader avoids TOC + * lookups). + * + * GinBuildShared is the shared state for entire build. sharedsort is the + * shared, tuplesort-managed state passed to each process tuplesort. + * snapshot is the snapshot used by the scan iff an MVCC snapshot is + * required. 
+ */ + GinBuildShared *ginshared; + Sharedsort *sharedsort; + Snapshot snapshot; + WalUsage *walusage; + BufferUsage *bufferusage; +} GinLeader; typedef struct { @@ -32,9 +144,58 @@ typedef struct MemoryContext tmpCtx; MemoryContext funcCtx; BuildAccumulator accum; + ItemPointerData tid; + int work_mem; + + /* + * bs_leader is only present when a parallel index build is performed, and + * only in the leader process. + */ + GinLeader *bs_leader; + int bs_worker_id; + + /* used to pass information from workers to leader */ + double bs_numtuples; + double bs_reltuples; + + /* + * The sortstate is used by workers (including the leader). It has to be + * part of the build state, because that's the only thing passed to the + * build callback etc. + */ + Tuplesortstate *bs_sortstate; + + /* + * The sortstate used only within a single worker for the first merge pass + * happening there. In principle it doesn't need to be part of the build + * state and we could pass it around directly, but it's more convenient + * this way. And it's part of the build state, after all. 
+ */ + Tuplesortstate *bs_worker_sort; } GinBuildState; +/* parallel index builds */ +static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, + bool isconcurrent, int request); +static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state); +static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot); +static double _gin_parallel_heapscan(GinBuildState *buildstate); +static double _gin_parallel_merge(GinBuildState *buildstate); +static void _gin_leader_participate_as_worker(GinBuildState *buildstate, + Relation heap, Relation index); +static void _gin_parallel_scan_and_build(GinBuildState *buildstate, + GinBuildShared *ginshared, + Sharedsort *sharedsort, + Relation heap, Relation index, + int sortmem, bool progress); + +static Datum _gin_parse_tuple(GinTuple *a, ItemPointerData **items); +static GinTuple *_gin_build_tuple(OffsetNumber attrnum, unsigned char category, + Datum key, int16 typlen, bool typbyval, + ItemPointerData *items, uint32 nitems, + Size *len); + /* * Adds array of item pointers to tuple's posting list, or * creates posting tree and tuple pointing to tree in case @@ -313,12 +474,114 @@ ginBuildCallback(Relation index, ItemPointer tid, Datum *values, MemoryContextSwitchTo(oldCtx); } +/* + * ginFlushBuildState + * Write all data from BuildAccumulator into the tuplesort. 
+ */ +static void +ginFlushBuildState(GinBuildState *buildstate, Relation index) +{ + ItemPointerData *list; + Datum key; + GinNullCategory category; + uint32 nlist; + OffsetNumber attnum; + TupleDesc tdesc = RelationGetDescr(index); + + ginBeginBAScan(&buildstate->accum); + while ((list = ginGetBAEntry(&buildstate->accum, + &attnum, &key, &category, &nlist)) != NULL) + { + /* information about the key */ + Form_pg_attribute attr = TupleDescAttr(tdesc, (attnum - 1)); + + /* GIN tuple and tuple length */ + GinTuple *tup; + Size tuplen; + + /* there could be many entries, so be willing to abort here */ + CHECK_FOR_INTERRUPTS(); + + tup = _gin_build_tuple(attnum, category, + key, attr->attlen, attr->attbyval, + list, nlist, &tuplen); + + tuplesort_putgintuple(buildstate->bs_worker_sort, tup, tuplen); + + pfree(tup); + } + + MemoryContextReset(buildstate->tmpCtx); + ginInitBA(&buildstate->accum); +} + +/* + * ginBuildCallbackParallel + * Callback for the parallel index build. + * + * This is similar to the serial build callback ginBuildCallback, but + * instead of writing the accumulated entries into the index, each worker + * writes them into a (local) tuplesort. + * + * The worker then sorts and combines these entries, before writing them + * into a shared tuplesort for the leader (see _gin_parallel_scan_and_build + * for the whole process). + */ +static void +ginBuildCallbackParallel(Relation index, ItemPointer tid, Datum *values, + bool *isnull, bool tupleIsAlive, void *state) +{ + GinBuildState *buildstate = (GinBuildState *) state; + MemoryContext oldCtx; + int i; + + oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); + + /* + * if scan wrapped around - flush accumulated entries and start anew + * + * With parallel scans, we don't have a guarantee the scan does not start + * half-way through the relation (serial builds disable sync scans and + * always start from block 0, parallel scans require allow_sync=true). 
+ * + * Building the posting lists assumes the TIDs are monotonic and never go + * back, and the wrap around would break that. We handle that by detecting + * the wraparound, and flushing all entries. This means we'll later see + * two separate entries with non-overlapping TID lists (which can be + * combined by merge sort). + * + * To detect a wraparound, we remember the last TID seen by each worker + * (for any key). If the next TID seen by the worker is lower, the scan + * must have wrapped around. + */ + if (ItemPointerCompare(tid, &buildstate->tid) < 0) + ginFlushBuildState(buildstate, index); + + /* remember the TID we're about to process */ + buildstate->tid = *tid; + + for (i = 0; i < buildstate->ginstate.origTupdesc->natts; i++) + ginHeapTupleBulkInsert(buildstate, (OffsetNumber) (i + 1), + values[i], isnull[i], tid); + + /* + * If we've maxed out our available memory, dump everything to the + * tuplesort. We use half the per-worker fraction of maintenance_work_mem, + * the other half is used for the tuplesort. + */ + if (buildstate->accum.allocatedMemory >= buildstate->work_mem * (Size) 1024) + ginFlushBuildState(buildstate, index); + + MemoryContextSwitchTo(oldCtx); +} + IndexBuildResult * ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) { IndexBuildResult *result; double reltuples; GinBuildState buildstate; + GinBuildState *state = &buildstate; Buffer RootBuffer, MetaBuffer; ItemPointerData *list; @@ -336,6 +599,12 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) buildstate.indtuples = 0; memset(&buildstate.buildStats, 0, sizeof(GinStatsData)); + /* Initialize fields for parallel build too. 
*/ + buildstate.bs_numtuples = 0; + buildstate.bs_reltuples = 0; + buildstate.bs_leader = NULL; + memset(&buildstate.tid, 0, sizeof(ItemPointerData)); + /* initialize the meta page */ MetaBuffer = GinNewBuffer(index); @@ -375,25 +644,96 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) buildstate.accum.ginstate = &buildstate.ginstate; ginInitBA(&buildstate.accum); + /* Report table scan phase started */ + pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, + PROGRESS_GIN_PHASE_INDEXBUILD_TABLESCAN); + /* - * Do the heap scan. We disallow sync scan here because dataPlaceToPage - * prefers to receive tuples in TID order. + * Attempt to launch parallel worker scan when required + * + * XXX plan_create_index_workers makes the number of workers dependent on + * maintenance_work_mem, requiring 32MB for each worker. For GIN that's + * reasonable too, because we sort the data just like btree. It does + * ignore the memory used to accumulate data in memory (set by work_mem), + * but there is no way to communicate that to plan_create_index_workers. */ - reltuples = table_index_build_scan(heap, index, indexInfo, false, true, - ginBuildCallback, &buildstate, NULL); + if (indexInfo->ii_ParallelWorkers > 0) + _gin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent, + indexInfo->ii_ParallelWorkers); - /* dump remaining entries to the index */ - oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx); - ginBeginBAScan(&buildstate.accum); - while ((list = ginGetBAEntry(&buildstate.accum, - &attnum, &key, &category, &nlist)) != NULL) + /* + * If parallel build requested and at least one worker process was + * successfully launched, set up coordination state, wait for workers to + * complete. Then read all tuples from the shared tuplesort and insert + * them into the index. + * + * In serial mode, simply scan the table and build the index one index + * tuple at a time. 
+ */ + if (state->bs_leader) { - /* there could be many entries, so be willing to abort here */ - CHECK_FOR_INTERRUPTS(); - ginEntryInsert(&buildstate.ginstate, attnum, key, category, - list, nlist, &buildstate.buildStats); + SortCoordinate coordinate; + + coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData)); + coordinate->isWorker = false; + coordinate->nParticipants = + state->bs_leader->nparticipanttuplesorts; + coordinate->sharedsort = state->bs_leader->sharedsort; + + /* + * Begin leader tuplesort. + * + * In cases where parallelism is involved, the leader receives the + * same share of maintenance_work_mem as a serial sort (it is + * generally treated in the same way as a serial sort once we return). + * Parallel worker Tuplesortstates will have received only a fraction + * of maintenance_work_mem, though. + * + * We rely on the lifetime of the Leader Tuplesortstate almost not + * overlapping with any worker Tuplesortstate's lifetime. There may + * be some small overlap, but that's okay because we rely on leader + * Tuplesortstate only allocating a small, fixed amount of memory + * here. When its tuplesort_performsort() is called (by our caller), + * and significant amounts of memory are likely to be used, all + * workers must have already freed almost all memory held by their + * Tuplesortstates (they are about to go away completely, too). The + * overall effect is that maintenance_work_mem always represents an + * absolute high watermark on the amount of memory used by a CREATE + * INDEX operation, regardless of the use of parallelism or any other + * factor. + */ + state->bs_sortstate = + tuplesort_begin_index_gin(heap, index, + maintenance_work_mem, coordinate, + TUPLESORT_NONE); + + /* scan the relation in parallel and merge per-worker results */ + reltuples = _gin_parallel_merge(state); + + _gin_end_parallel(state->bs_leader, state); + } + else /* no parallel index build */ + { + /* + * Do the heap scan. 
We disallow sync scan here because + * dataPlaceToPage prefers to receive tuples in TID order. + */ + reltuples = table_index_build_scan(heap, index, indexInfo, false, true, + ginBuildCallback, &buildstate, NULL); + + /* dump remaining entries to the index */ + oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx); + ginBeginBAScan(&buildstate.accum); + while ((list = ginGetBAEntry(&buildstate.accum, + &attnum, &key, &category, &nlist)) != NULL) + { + /* there could be many entries, so be willing to abort here */ + CHECK_FOR_INTERRUPTS(); + ginEntryInsert(&buildstate.ginstate, attnum, key, category, + list, nlist, &buildstate.buildStats); + } + MemoryContextSwitchTo(oldCtx); } - MemoryContextSwitchTo(oldCtx); MemoryContextDelete(buildstate.funcCtx); MemoryContextDelete(buildstate.tmpCtx); @@ -533,3 +873,1284 @@ gininsert(Relation index, Datum *values, bool *isnull, return false; } + +/* + * Create parallel context, and launch workers for leader. + * + * buildstate argument should be initialized (with the exception of the + * tuplesort states, which may later be created based on shared + * state initially set up here). + * + * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY. + * + * request is the target number of parallel worker processes to launch. + * + * Sets buildstate's GinLeader, which caller must use to shut down parallel + * mode by passing it to _gin_end_parallel() at the very end of its index + * build. If not even a single worker process can be launched, this is + * never set, and caller should proceed with a serial index build. 
+ */ +static void +_gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, + bool isconcurrent, int request) +{ + ParallelContext *pcxt; + int scantuplesortstates; + Snapshot snapshot; + Size estginshared; + Size estsort; + GinBuildShared *ginshared; + Sharedsort *sharedsort; + GinLeader *ginleader = (GinLeader *) palloc0(sizeof(GinLeader)); + WalUsage *walusage; + BufferUsage *bufferusage; + bool leaderparticipates = true; + int querylen; + +#ifdef DISABLE_LEADER_PARTICIPATION + leaderparticipates = false; +#endif + + /* + * Enter parallel mode, and create context for parallel build of gin index + */ + EnterParallelMode(); + Assert(request > 0); + pcxt = CreateParallelContext("postgres", "_gin_parallel_build_main", + request); + + scantuplesortstates = leaderparticipates ? request + 1 : request; + + /* + * Prepare for scan of the base relation. In a normal index build, we use + * SnapshotAny because we must retrieve all tuples and do our own time + * qual checks (because we have to index RECENTLY_DEAD tuples). In a + * concurrent build, we take a regular MVCC snapshot and index whatever's + * live according to that. + */ + if (!isconcurrent) + snapshot = SnapshotAny; + else + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + + /* + * Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace. + */ + estginshared = _gin_parallel_estimate_shared(heap, snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, estginshared); + estsort = tuplesort_estimate_shared(scantuplesortstates); + shm_toc_estimate_chunk(&pcxt->estimator, estsort); + + shm_toc_estimate_keys(&pcxt->estimator, 2); + + /* + * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE + * and PARALLEL_KEY_BUFFER_USAGE. + * + * If there are no extensions loaded that care, we could skip this. We + * have no way of knowing whether anyone's looking at pgWalUsage or + * pgBufferUsage, so do it unconditionally. 
+ */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ + if (debug_query_string) + { + querylen = strlen(debug_query_string); + shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + else + querylen = 0; /* keep compiler quiet */ + + /* Everyone's had a chance to ask for space, so now create the DSM */ + InitializeParallelDSM(pcxt); + + /* If no DSM segment was available, back out (do serial build) */ + if (pcxt->seg == NULL) + { + if (IsMVCCSnapshot(snapshot)) + UnregisterSnapshot(snapshot); + DestroyParallelContext(pcxt); + ExitParallelMode(); + return; + } + + /* Store shared build state, for which we reserved space */ + ginshared = (GinBuildShared *) shm_toc_allocate(pcxt->toc, estginshared); + /* Initialize immutable state */ + ginshared->heaprelid = RelationGetRelid(heap); + ginshared->indexrelid = RelationGetRelid(index); + ginshared->isconcurrent = isconcurrent; + ginshared->scantuplesortstates = scantuplesortstates; + + ConditionVariableInit(&ginshared->workersdonecv); + SpinLockInit(&ginshared->mutex); + + /* Initialize mutable state */ + ginshared->nparticipantsdone = 0; + ginshared->reltuples = 0.0; + ginshared->indtuples = 0.0; + + table_parallelscan_initialize(heap, + ParallelTableScanFromGinBuildShared(ginshared), + snapshot); + + /* + * Store shared tuplesort-private state, for which we reserved space. + * Then, initialize opaque state using tuplesort routine. 
+ */ + sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort); + tuplesort_initialize_shared(sharedsort, scantuplesortstates, + pcxt->seg); + + shm_toc_insert(pcxt->toc, PARALLEL_KEY_GIN_SHARED, ginshared); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); + + /* Store query string for workers */ + if (debug_query_string) + { + char *sharedquery; + + sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); + memcpy(sharedquery, debug_query_string, querylen + 1); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); + } + + /* + * Allocate space for each worker's WalUsage and BufferUsage; no need to + * initialize. + */ + walusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); + bufferusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + + /* Launch workers, saving status for leader/caller */ + LaunchParallelWorkers(pcxt); + ginleader->pcxt = pcxt; + ginleader->nparticipanttuplesorts = pcxt->nworkers_launched; + if (leaderparticipates) + ginleader->nparticipanttuplesorts++; + ginleader->ginshared = ginshared; + ginleader->sharedsort = sharedsort; + ginleader->snapshot = snapshot; + ginleader->walusage = walusage; + ginleader->bufferusage = bufferusage; + + /* If no workers were successfully launched, back out (do serial build) */ + if (pcxt->nworkers_launched == 0) + { + _gin_end_parallel(ginleader, NULL); + return; + } + + /* Save leader state now that it's clear build will be parallel */ + buildstate->bs_leader = ginleader; + + /* Join heap scan ourselves */ + if (leaderparticipates) + _gin_leader_participate_as_worker(buildstate, heap, index); + + /* + * Caller needs to wait for all launched workers when we return. Make + * sure that the failure-to-start case will not hang forever. 
+ */ + WaitForParallelWorkersToAttach(pcxt); +} + +/* + * Shut down workers, destroy parallel context, and end parallel mode. + */ +static void +_gin_end_parallel(GinLeader *ginleader, GinBuildState *state) +{ + int i; + + /* Shutdown worker processes */ + WaitForParallelWorkersToFinish(ginleader->pcxt); + + /* + * Next, accumulate WAL usage. (This must wait for the workers to finish, + * or we might get incomplete data.) + */ + for (i = 0; i < ginleader->pcxt->nworkers_launched; i++) + InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]); + + /* Free last reference to MVCC snapshot, if one was used */ + if (IsMVCCSnapshot(ginleader->snapshot)) + UnregisterSnapshot(ginleader->snapshot); + DestroyParallelContext(ginleader->pcxt); + ExitParallelMode(); +} + +/* + * Within leader, wait for end of heap scan. + * + * When called, parallel heap scan started by _gin_begin_parallel() will + * already be underway within worker processes (when leader participates + * as a worker, we should end up here just as workers are finishing). + * + * Returns the total number of heap tuples scanned. 
+ */ +static double +_gin_parallel_heapscan(GinBuildState *state) +{ + GinBuildShared *ginshared = state->bs_leader->ginshared; + int nparticipanttuplesorts; + + nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts; + for (;;) + { + SpinLockAcquire(&ginshared->mutex); + if (ginshared->nparticipantsdone == nparticipanttuplesorts) + { + /* copy the data into leader state */ + state->bs_reltuples = ginshared->reltuples; + state->bs_numtuples = ginshared->indtuples; + + SpinLockRelease(&ginshared->mutex); + break; + } + SpinLockRelease(&ginshared->mutex); + + ConditionVariableSleep(&ginshared->workersdonecv, + WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); + } + + ConditionVariableCancelSleep(); + + return state->bs_reltuples; +} + +/* + * Buffer used to accumulate TIDs from multiple GinTuples for the same key + * (we read these from the tuplesort, sorted by the key). + * + * This is similar to BuildAccumulator in that it's used to collect TIDs + * in memory before inserting them into the index, but it's much simpler + * as it only deals with a single index key at a time. + * + * When adding TIDs to the buffer, we make sure to keep them sorted, both + * during the initial table scan (and detecting when the scan wraps around), + * and during merging (where we do mergesort). + */ +typedef struct GinBuffer +{ + OffsetNumber attnum; + GinNullCategory category; + Datum key; /* 0 if no key (and keylen == 0) */ + Size keylen; /* number of bytes (not typlen) */ + + /* type info */ + int16 typlen; + bool typbyval; + + /* array of TID values */ + int nitems; + SortSupport ssup; /* for sorting/comparing keys */ + ItemPointerData *items; +} GinBuffer; + +/* + * Check that TID array contains valid values, and that it's sorted (if we + * expect it to be). 
+ */ +static void +AssertCheckItemPointers(GinBuffer *buffer) +{ +#ifdef USE_ASSERT_CHECKING + /* we should not have a buffer with no TIDs to sort */ + Assert(buffer->items != NULL); + Assert(buffer->nitems > 0); + + for (int i = 0; i < buffer->nitems; i++) + { + Assert(ItemPointerIsValid(&buffer->items[i])); + + /* don't check ordering for the first TID item */ + if (i == 0) + continue; + + Assert(ItemPointerCompare(&buffer->items[i - 1], &buffer->items[i]) < 0); + } +#endif +} + +/* + * GinBuffer checks + * + * Make sure the nitems/items fields are consistent (either the array is empty + * or not empty, the fields need to agree). If there are items, check ordering. + */ +static void +AssertCheckGinBuffer(GinBuffer *buffer) +{ +#ifdef USE_ASSERT_CHECKING + /* if we have any items, the array must exist */ + Assert(!((buffer->nitems > 0) && (buffer->items == NULL))); + + /* + * The buffer may be empty, in which case we must not call the check of + * item pointers, because that assumes non-emptiness. + */ + if (buffer->nitems == 0) + return; + + /* Make sure the item pointers are valid and sorted. */ + AssertCheckItemPointers(buffer); +#endif +} + +/* + * GinBufferInit + * Initialize buffer to store tuples for a GIN index. + * + * Initialize the buffer used to accumulate TID for a single key at a time + * (we process the data sorted), so we know when we received all data for + * a given key. + * + * Initializes sort support procedures for all index attributes. + */ +static GinBuffer * +GinBufferInit(Relation index) +{ + GinBuffer *buffer = palloc0(sizeof(GinBuffer)); + int i, + nKeys; + TupleDesc desc = RelationGetDescr(index); + + nKeys = IndexRelationGetNumberOfKeyAttributes(index); + + buffer->ssup = palloc0(sizeof(SortSupportData) * nKeys); + + /* + * Lookup ordering operator for the index key data type, and initialize + * the sort support function. 
+ */ + for (i = 0; i < nKeys; i++) + { + Oid cmpFunc; + SortSupport sortKey = &buffer->ssup[i]; + Form_pg_attribute att = TupleDescAttr(desc, i); + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = index->rd_indcollation[i]; + + if (!OidIsValid(sortKey->ssup_collation)) + sortKey->ssup_collation = DEFAULT_COLLATION_OID; + + sortKey->ssup_nulls_first = false; + sortKey->ssup_attno = i + 1; + sortKey->abbreviate = false; + + Assert(sortKey->ssup_attno != 0); + + /* + * If the compare proc isn't specified in the opclass definition, look + * up the index key type's default btree comparator. + */ + cmpFunc = index_getprocid(index, i + 1, GIN_COMPARE_PROC); + if (cmpFunc == InvalidOid) + { + TypeCacheEntry *typentry; + + typentry = lookup_type_cache(att->atttypid, + TYPECACHE_CMP_PROC_FINFO); + if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("could not identify a comparison function for type %s", + format_type_be(att->atttypid)))); + + cmpFunc = typentry->cmp_proc_finfo.fn_oid; + } + + PrepareSortSupportComparisonShim(cmpFunc, sortKey); + } + + return buffer; +} + +/* Is the buffer empty, i.e. has no TID values in the array? */ +static bool +GinBufferIsEmpty(GinBuffer *buffer) +{ + return (buffer->nitems == 0); +} + +/* + * GinBufferKeyEquals + * Can the buffer store TIDs for the provided GIN tuple (same key)? + * + * Compare if the tuple matches the already accumulated data in the GIN + * buffer. Compare scalar fields first, before the actual key. + * + * Returns true if the key matches, and the TID belonds to the buffer, or + * false if the key does not match. 
+ */ +static bool +GinBufferKeyEquals(GinBuffer *buffer, GinTuple *tup) +{ + int r; + Datum tupkey; + + AssertCheckGinBuffer(buffer); + + if (tup->attrnum != buffer->attnum) + return false; + + /* same attribute should have the same type info */ + Assert(tup->typbyval == buffer->typbyval); + Assert(tup->typlen == buffer->typlen); + + if (tup->category != buffer->category) + return false; + + /* + * For NULL/empty keys, this means equality, for normal keys we need to + * compare the actual key value. + */ + if (buffer->category != GIN_CAT_NORM_KEY) + return true; + + /* + * For the tuple, get either the first sizeof(Datum) bytes for byval + * types, or a pointer to the beginning of the data array. + */ + tupkey = (buffer->typbyval) ? *(Datum *) tup->data : PointerGetDatum(tup->data); + + r = ApplySortComparator(buffer->key, false, + tupkey, false, + &buffer->ssup[buffer->attnum - 1]); + + return (r == 0); +} + +/* + * GinBufferStoreTuple + * Add data (especially TID list) from a GIN tuple to the buffer. + * + * The buffer is expected to be empty (in which case it's initialized), or + * having the same key. The TID values from the tuple are combined with the + * stored values using a merge sort. + * + * The tuples (for the same key) are expected to be sorted by first TID. But + * this does not guarantee the lists do not overlap, especially in the leader, + * because the workers process interleaving data. There should be no overlaps + * in a single worker - it could happen when the parallel scan wraps around, + * but we detect that and flush the data (see ginBuildCallbackParallel). + * + * By sorting the GinTuple not only by key, but also by the first TID, we make + * it more less likely the lists will overlap during merge. We merge them using + * mergesort, but it's cheaper to just append one list to the other. + * + * How often can the lists overlap? 
There should be no overlaps in workers, + * and in the leader we can see overlaps between lists built by different + * workers. But the workers merge the items as much as possible, so there + * should not be too many. + */ +static void +GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup) +{ + ItemPointerData *items; + Datum key; + + AssertCheckGinBuffer(buffer); + + key = _gin_parse_tuple(tup, &items); + + /* if the buffer is empty, set the fields (and copy the key) */ + if (GinBufferIsEmpty(buffer)) + { + buffer->category = tup->category; + buffer->keylen = tup->keylen; + buffer->attnum = tup->attrnum; + + buffer->typlen = tup->typlen; + buffer->typbyval = tup->typbyval; + + if (tup->category == GIN_CAT_NORM_KEY) + buffer->key = datumCopy(key, buffer->typbyval, buffer->typlen); + else + buffer->key = (Datum) 0; + } + + /* add the new TIDs into the buffer, combine using merge-sort */ + { + int nnew; + ItemPointer new; + + new = ginMergeItemPointers(buffer->items, buffer->nitems, + items, tup->nitems, &nnew); + + Assert(nnew == buffer->nitems + tup->nitems); + + if (buffer->items) + pfree(buffer->items); + + buffer->items = new; + buffer->nitems = nnew; + + AssertCheckItemPointers(buffer); + } +} + +/* + * GinBufferReset + * Reset the buffer into a state as if it contains no data. + */ +static void +GinBufferReset(GinBuffer *buffer) +{ + Assert(!GinBufferIsEmpty(buffer)); + + /* release byref values, do nothing for by-val ones */ + if ((buffer->category == GIN_CAT_NORM_KEY) && !buffer->typbyval) + pfree(DatumGetPointer(buffer->key)); + + /* + * Not required, but makes it more likely to trigger NULL derefefence if + * using the value incorrectly, etc. + */ + buffer->key = (Datum) 0; + + buffer->attnum = 0; + buffer->category = 0; + buffer->keylen = 0; + buffer->nitems = 0; + + buffer->typlen = 0; + buffer->typbyval = 0; +} + +/* + * GinBufferFree + * Release memory associated with the GinBuffer (including TID array). 
+ */ +static void +GinBufferFree(GinBuffer *buffer) +{ + if (buffer->items) + pfree(buffer->items); + + /* release byref values, do nothing for by-val ones */ + if (!GinBufferIsEmpty(buffer) && + (buffer->category == GIN_CAT_NORM_KEY) && !buffer->typbyval) + pfree(DatumGetPointer(buffer->key)); + + pfree(buffer); +} + +/* + * GinBufferCanAddKey + * Check if a given GIN tuple can be added to the current buffer. + * + * Returns true if the buffer is either empty or for the same index key. + */ +static bool +GinBufferCanAddKey(GinBuffer *buffer, GinTuple *tup) +{ + /* empty buffer can accept data for any key */ + if (GinBufferIsEmpty(buffer)) + return true; + + /* otherwise just data for the same key */ + return GinBufferKeyEquals(buffer, tup); +} + +/* + * Within leader, wait for end of heap scan and merge per-worker results. + * + * After waiting for all workers to finish, merge the per-worker results into + * the complete index. The results from each worker are sorted by block number + * (start of the page range). While combinig the per-worker results we merge + * summaries for the same page range, and also fill-in empty summaries for + * ranges without any tuples. + * + * Returns the total number of heap tuples scanned. + */ +static double +_gin_parallel_merge(GinBuildState *state) +{ + GinTuple *tup; + Size tuplen; + double reltuples = 0; + GinBuffer *buffer; + + /* GIN tuples from workers, merged by leader */ + double numtuples = 0; + + /* wait for workers to scan table and produce partial results */ + reltuples = _gin_parallel_heapscan(state); + + /* Execute the sort */ + pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, + PROGRESS_GIN_PHASE_PERFORMSORT_2); + + /* do the actual sort in the leader */ + tuplesort_performsort(state->bs_sortstate); + + /* initialize buffer to combine entries for the same key */ + buffer = GinBufferInit(state->ginstate.index); + + /* + * Set the progress target for the next phase. 
Reset the block number
+	 * values set by table_index_build_scan
+	 */
+	{
+		const int	progress_index[] = {
+			PROGRESS_CREATEIDX_SUBPHASE,
+			PROGRESS_CREATEIDX_TUPLES_TOTAL,
+			PROGRESS_SCAN_BLOCKS_TOTAL,
+			PROGRESS_SCAN_BLOCKS_DONE
+		};
+		const int64 progress_vals[] = {
+			PROGRESS_GIN_PHASE_MERGE_2,
+			state->bs_numtuples,
+			0, 0
+		};
+
+		pgstat_progress_update_multi_param(4, progress_index, progress_vals);
+	}
+
+	/*
+	 * Read the GIN tuples from the shared tuplesort, sorted by category and
+	 * key. That probably gives us order matching how data is organized in the
+	 * index.
+	 *
+	 * We don't insert the GIN tuples right away, but instead accumulate as
+	 * many TIDs for the same key as possible, and then insert that at once.
+	 * This way we don't need to decompress/recompress the posting lists, etc.
+	 */
+	while ((tup = tuplesort_getgintuple(state->bs_sortstate, &tuplen, true)) != NULL)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * If the buffer can accept the new GIN tuple, just store it there and
+		 * we're done. If it's a different key (or maybe too much data) flush
+		 * the current contents into the index first.
+		 */
+		if (!GinBufferCanAddKey(buffer, tup))
+		{
+			/*
+			 * Buffer is not empty and it's storing a different key - flush
+			 * the data into the insert, and start a new entry for current
+			 * GinTuple.
+			 */
+			AssertCheckItemPointers(buffer);
+
+			ginEntryInsert(&state->ginstate,
+						   buffer->attnum, buffer->key, buffer->category,
+						   buffer->items, buffer->nitems, &state->buildStats);
+
+			/* discard the existing data */
+			GinBufferReset(buffer);
+		}
+
+		/*
+		 * Remember data for the current tuple (either remember the new key,
+		 * or append it to the existing data).
+		 */
+		GinBufferStoreTuple(buffer, tup);
+
+		/* Report progress */
+		pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE,
+									 ++numtuples);
+	}
+
+	/* flush data remaining in the buffer (for the last key) */
+	if (!GinBufferIsEmpty(buffer))
+	{
+		AssertCheckItemPointers(buffer);
+
+		ginEntryInsert(&state->ginstate,
+					   buffer->attnum, buffer->key, buffer->category,
+					   buffer->items, buffer->nitems, &state->buildStats);
+
+		/* discard the existing data */
+		GinBufferReset(buffer);
+
+		/* Report progress */
+		pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE,
+									 ++numtuples);
+	}
+
+	/* release all the memory */
+	GinBufferFree(buffer);
+
+	tuplesort_end(state->bs_sortstate);
+
+	return reltuples;
+}
+
+/*
+ * Returns size of shared memory required to store state for a parallel
+ * gin index build based on the snapshot its parallel scan will use.
+ */
+static Size
+_gin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+{
+	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
+	return add_size(BUFFERALIGN(sizeof(GinBuildShared)),
+					table_parallelscan_estimate(heap, snapshot));
+}
+
+/*
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_gin_leader_participate_as_worker(GinBuildState *buildstate, Relation heap, Relation index)
+{
+	GinLeader  *ginleader = buildstate->bs_leader;
+	int			sortmem;
+
+	/*
+	 * Might as well use reliable figure when doling out maintenance_work_mem
+	 * (when requested number of workers were not launched, this will be
+	 * somewhat higher than it is for other workers).
+	 */
+	sortmem = maintenance_work_mem / ginleader->nparticipanttuplesorts;
+
+	/* Perform work common to all participants */
+	_gin_parallel_scan_and_build(buildstate, ginleader->ginshared,
+								 ginleader->sharedsort, heap, index,
+								 sortmem, true);
+}
+
+/*
+ * _gin_process_worker_data
+ *		First phase of the key merging, happening in the worker.
+ * + * Depending on the number of distinct keys, the TID lists produced by the + * callback may be very short (due to frequent evictions in the callback). + * But combining many tiny lists is expensive, so we try to do as much as + * possible in the workers and only then pass the results to the leader. + * + * We read the tuples sorted by the key, and merge them into larger lists. + * At the moment there's no memory limit, so this will just produce one + * huge (sorted) list per key in each worker. Which means the leader will + * do a very limited number of mergesorts, which is good. + */ +static void +_gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort, + bool progress) +{ + GinTuple *tup; + Size tuplen; + + GinBuffer *buffer; + + /* initialize buffer to combine entries for the same key */ + buffer = GinBufferInit(state->ginstate.index); + + /* sort the raw per-worker data */ + if (progress) + pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, + PROGRESS_GIN_PHASE_PERFORMSORT_1); + + tuplesort_performsort(state->bs_worker_sort); + + /* reset the number of GIN tuples produced by this worker */ + state->bs_numtuples = 0; + + if (progress) + pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, + PROGRESS_GIN_PHASE_MERGE_1); + + /* + * Read the GIN tuples from the shared tuplesort, sorted by the key, and + * merge them into larger chunks for the leader to combine. + */ + while ((tup = tuplesort_getgintuple(worker_sort, &tuplen, true)) != NULL) + { + + CHECK_FOR_INTERRUPTS(); + + /* + * If the buffer can accept the new GIN tuple, just store it there and + * we're done. If it's a different key (or maybe too much data) flush + * the current contents into the index first. + */ + if (!GinBufferCanAddKey(buffer, tup)) + { + GinTuple *ntup; + Size ntuplen; + + /* + * Buffer is not empty and it's storing a different key - flush + * the data into the insert, and start a new entry for current + * GinTuple. 
+			 */
+			AssertCheckItemPointers(buffer);
+
+			ntup = _gin_build_tuple(buffer->attnum, buffer->category,
+									buffer->key, buffer->typlen, buffer->typbyval,
+									buffer->items, buffer->nitems, &ntuplen);
+
+			tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
+			state->bs_numtuples++;
+
+			pfree(ntup);
+
+			/* discard the existing data */
+			GinBufferReset(buffer);
+		}
+
+		/*
+		 * Remember data for the current tuple (either remember the new key,
+		 * or append it to the existing data).
+		 */
+		GinBufferStoreTuple(buffer, tup);
+	}
+
+	/* flush data remaining in the buffer (for the last key) */
+	if (!GinBufferIsEmpty(buffer))
+	{
+		GinTuple   *ntup;
+		Size		ntuplen;
+
+		AssertCheckItemPointers(buffer);
+
+		ntup = _gin_build_tuple(buffer->attnum, buffer->category,
+								buffer->key, buffer->typlen, buffer->typbyval,
+								buffer->items, buffer->nitems, &ntuplen);
+
+		tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
+		state->bs_numtuples++;
+
+		pfree(ntup);
+
+		/* discard the existing data */
+		GinBufferReset(buffer);
+	}
+
+	/* release all the memory */
+	GinBufferFree(buffer);
+
+	tuplesort_end(worker_sort);
+}
+
+/*
+ * Perform a worker's portion of a parallel GIN index build sort.
+ *
+ * This generates a tuplesort for the worker portion of the table.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs.
+ *
+ * When this returns, workers are done, and need only release resources.
+ *
+ * Before feeding data into a shared tuplesort (for the leader process),
+ * the workers process data in two phases.
+ *
+ * 1) A worker reads a portion of rows from the table, accumulates entries
+ * in memory, and flushes them into a private tuplesort (e.g. because of
+ * using too much memory).
+ *
+ * 2) The private tuplesort gets sorted (by key and TID), the worker reads
+ * the data again, and combines the entries as much as possible. This has
+ * to happen eventually, and this way it's done in workers in parallel.
+ *
+ * Finally, the combined entries are written into the shared tuplesort, so
+ * that the leader can process them.
+ *
+ * How well this works (compared to just writing entries into the shared
+ * tuplesort) depends on the data set. For large tables with many distinct
+ * keys this helps a lot. With many distinct keys it's likely the buffer has
+ * to be flushed often, generating many entries with the same key and short
+ * TID lists. These entries need to be sorted and merged at some point,
+ * before writing them to the index. The merging is quite expensive, it can
+ * easily be ~50% of a serial build, and doing as much of it in the workers
+ * means it's parallelized. The leader still has to merge results from the
+ * workers, but it's much more efficient to merge few large entries than
+ * many tiny ones.
+ *
+ * This also reduces the amount of data the workers pass to the leader through
+ * the shared tuplesort. OTOH the workers need more space for the private sort,
+ * possibly up to 2x of the data, if no entries can be merged in a worker. But this
+ * is very unlikely, and the only consequence is inefficiency, so we ignore it.
+ */ +static void +_gin_parallel_scan_and_build(GinBuildState *state, + GinBuildShared *ginshared, Sharedsort *sharedsort, + Relation heap, Relation index, + int sortmem, bool progress) +{ + SortCoordinate coordinate; + TableScanDesc scan; + double reltuples; + IndexInfo *indexInfo; + + /* Initialize local tuplesort coordination state */ + coordinate = palloc0(sizeof(SortCoordinateData)); + coordinate->isWorker = true; + coordinate->nParticipants = -1; + coordinate->sharedsort = sharedsort; + + /* remember how much space is allowed for the accumulated entries */ + state->work_mem = (sortmem / 2); + + /* Begin "partial" tuplesort */ + state->bs_sortstate = tuplesort_begin_index_gin(heap, index, + state->work_mem, + coordinate, + TUPLESORT_NONE); + + /* Local per-worker sort of raw-data */ + state->bs_worker_sort = tuplesort_begin_index_gin(heap, index, + state->work_mem, + NULL, + TUPLESORT_NONE); + + /* Join parallel scan */ + indexInfo = BuildIndexInfo(index); + indexInfo->ii_Concurrent = ginshared->isconcurrent; + + scan = table_beginscan_parallel(heap, + ParallelTableScanFromGinBuildShared(ginshared)); + + reltuples = table_index_build_scan(heap, index, indexInfo, true, progress, + ginBuildCallbackParallel, state, scan); + + /* write remaining accumulated entries */ + ginFlushBuildState(state, index); + + /* + * Do the first phase of in-worker processing - sort the data produced by + * the callback, and combine them into much larger chunks and place that + * into the shared tuplestore for leader to process. + */ + _gin_process_worker_data(state, state->bs_worker_sort, progress); + + /* sort the GIN tuples built by this worker */ + tuplesort_performsort(state->bs_sortstate); + + state->bs_reltuples += reltuples; + + /* + * Done. Record ambuild statistics. 
+ */ + SpinLockAcquire(&ginshared->mutex); + ginshared->nparticipantsdone++; + ginshared->reltuples += state->bs_reltuples; + ginshared->indtuples += state->bs_numtuples; + SpinLockRelease(&ginshared->mutex); + + /* Notify leader */ + ConditionVariableSignal(&ginshared->workersdonecv); + + tuplesort_end(state->bs_sortstate); +} + +/* + * Perform work within a launched parallel process. + */ +void +_gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) +{ + char *sharedquery; + GinBuildShared *ginshared; + Sharedsort *sharedsort; + GinBuildState buildstate; + Relation heapRel; + Relation indexRel; + LOCKMODE heapLockmode; + LOCKMODE indexLockmode; + WalUsage *walusage; + BufferUsage *bufferusage; + int sortmem; + + /* + * The only possible status flag that can be set to the parallel worker is + * PROC_IN_SAFE_IC. + */ + Assert((MyProc->statusFlags == 0) || + (MyProc->statusFlags == PROC_IN_SAFE_IC)); + + /* Set debug_query_string for individual workers first */ + sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); + debug_query_string = sharedquery; + + /* Report the query string from leader */ + pgstat_report_activity(STATE_RUNNING, debug_query_string); + + /* Look up gin shared state */ + ginshared = shm_toc_lookup(toc, PARALLEL_KEY_GIN_SHARED, false); + + /* Open relations using lock modes known to be obtained by index.c */ + if (!ginshared->isconcurrent) + { + heapLockmode = ShareLock; + indexLockmode = AccessExclusiveLock; + } + else + { + heapLockmode = ShareUpdateExclusiveLock; + indexLockmode = RowExclusiveLock; + } + + /* Open relations within worker */ + heapRel = table_open(ginshared->heaprelid, heapLockmode); + indexRel = index_open(ginshared->indexrelid, indexLockmode); + + /* initialize the GIN build state */ + initGinState(&buildstate.ginstate, indexRel); + buildstate.indtuples = 0; + memset(&buildstate.buildStats, 0, sizeof(GinStatsData)); + memset(&buildstate.tid, 0, sizeof(ItemPointerData)); + + /* + * create a temporary memory 
context that is used to hold data not yet + * dumped out to the index + */ + buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext, + "Gin build temporary context", + ALLOCSET_DEFAULT_SIZES); + + /* + * create a temporary memory context that is used for calling + * ginExtractEntries(), and can be reset after each tuple + */ + buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext, + "Gin build temporary context for user-defined function", + ALLOCSET_DEFAULT_SIZES); + + buildstate.accum.ginstate = &buildstate.ginstate; + ginInitBA(&buildstate.accum); + + + /* Look up shared state private to tuplesort.c */ + sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false); + tuplesort_attach_shared(sharedsort, seg); + + /* Prepare to track buffer usage during parallel execution */ + InstrStartParallelQuery(); + + /* + * Might as well use reliable figure when doling out maintenance_work_mem + * (when requested number of workers were not launched, this will be + * somewhat higher than it is for other workers). + */ + sortmem = maintenance_work_mem / ginshared->scantuplesortstates; + + _gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort, + heapRel, indexRel, sortmem, false); + + /* Report WAL/buffer usage during parallel execution */ + bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); + walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); + InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + &walusage[ParallelWorkerNumber]); + + index_close(indexRel, indexLockmode); + table_close(heapRel, heapLockmode); +} + +/* + * _gin_build_tuple + * Serialize the state for an index key into a tuple for tuplesort. + * + * The tuple has a number of scalar fields (mostly matching the build state), + * and then a data array that stores the key first, and then the TID list. + * + * For by-reference data types, we store the actual data. 
For by-val types
+ * we simply copy the whole Datum, so that we don't have to care about stuff
+ * like endianness etc. We could make it a little bit smaller, but it's not
+ * worth it - it's a tiny fraction of the data, and we need to MAXALIGN the
+ * start of the TID list anyway. So we wouldn't save anything.
+ */
+static GinTuple *
+_gin_build_tuple(OffsetNumber attrnum, unsigned char category,
+				 Datum key, int16 typlen, bool typbyval,
+				 ItemPointerData *items, uint32 nitems,
+				 Size *len)
+{
+	GinTuple   *tuple;
+	char	   *ptr;
+
+	Size		tuplen;
+	int			keylen;
+
+	/*
+	 * Calculate how long the key value is. Only keys with GIN_CAT_NORM_KEY
+	 * have actual non-empty key. We include varlena headers and \0 bytes for
+	 * strings, to make it easier to access the data in-line.
+	 *
+	 * For byval types we simply copy the whole Datum. We could store just the
+	 * necessary bytes, but this is simpler to work with and not worth the
+	 * extra complexity. Moreover we still need to do the MAXALIGN to allow
+	 * direct access to items pointers.
+	 *
+	 * XXX Note that for byval types we store the whole datum, no matter what
+	 * the typlen value is.
+	 */
+	if (category != GIN_CAT_NORM_KEY)
+		keylen = 0;
+	else if (typbyval)
+		keylen = sizeof(Datum);
+	else if (typlen > 0)
+		keylen = typlen;
+	else if (typlen == -1)
+		keylen = VARSIZE_ANY(key);
+	else if (typlen == -2)
+		keylen = strlen(DatumGetPointer(key)) + 1;
+	else
+		elog(ERROR, "unexpected typlen value (%d)", typlen);
+
+	/*
+	 * Determine GIN tuple length with all the data included. Be careful about
+	 * alignment, to allow direct access to item pointers.
+	 */
+	tuplen = SHORTALIGN(offsetof(GinTuple, data) + keylen) +
+		(sizeof(ItemPointerData) * nitems);
+
+	*len = tuplen;
+
+	/*
+	 * Allocate space for the whole GIN tuple.
+	 *
+	 * The palloc0 is needed - writetup_index_gin will write the whole tuple
+	 * to disk, so we need to make sure the padding bytes are defined
+	 * (otherwise valgrind would report this).
+ */ + tuple = palloc0(tuplen); + + tuple->tuplen = tuplen; + tuple->attrnum = attrnum; + tuple->category = category; + tuple->keylen = keylen; + tuple->nitems = nitems; + + /* key type info */ + tuple->typlen = typlen; + tuple->typbyval = typbyval; + + /* + * Copy the key and items into the tuple. First the key value, which we + * can simply copy right at the beginning of the data array. + */ + if (category == GIN_CAT_NORM_KEY) + { + if (typbyval) + { + memcpy(tuple->data, &key, sizeof(Datum)); + } + else if (typlen > 0) /* byref, fixed length */ + { + memcpy(tuple->data, DatumGetPointer(key), typlen); + } + else if (typlen == -1) + { + memcpy(tuple->data, DatumGetPointer(key), keylen); + } + else if (typlen == -2) + { + memcpy(tuple->data, DatumGetPointer(key), keylen); + } + } + + /* finally, copy the TIDs into the array */ + ptr = (char *) tuple + SHORTALIGN(offsetof(GinTuple, data) + keylen); + + memcpy(ptr, items, sizeof(ItemPointerData) * nitems); + + return tuple; +} + +/* + * _gin_parse_tuple + * Deserialize the tuple from the tuplestore representation. + * + * Most of the fields are actually directly accessible, the only thing that + * needs more care is the key and the TID list. + * + * For the key, this returns a regular Datum representing it. It's either the + * actual key value, or a pointer to the beginning of the data array (which is + * where the data was copied by _gin_build_tuple). + * + * The pointer to the TID list is returned through 'items' (which is simply + * a pointer to the data array). 
+ */ +static Datum +_gin_parse_tuple(GinTuple *a, ItemPointerData **items) +{ + Datum key; + + if (items) + { + char *ptr = (char *) a + SHORTALIGN(offsetof(GinTuple, data) + a->keylen); + + *items = (ItemPointerData *) ptr; + } + + if (a->category != GIN_CAT_NORM_KEY) + return (Datum) 0; + + if (a->typbyval) + { + memcpy(&key, a->data, a->keylen); + return key; + } + + return PointerGetDatum(a->data); +} + +/* + * _gin_compare_tuples + * Compare GIN tuples, used by tuplesort during parallel index build. + * + * The scalar fields (attrnum, category) are compared first, the key value is + * compared last. The comparisons are done using type-specific sort support + * functions. + * + * If the key value matches, we compare the first TID value in the TID list, + * which means the tuples are merged in an order in which they are most + * likely to be simply concatenated. (This "first" TID will also allow us + * to determine a point up to which the list is fully determined and can be + * written into the index to enforce a memory limit etc.) + */ +int +_gin_compare_tuples(GinTuple *a, GinTuple *b, SortSupport ssup) +{ + int r; + Datum keya, + keyb; + + if (a->attrnum < b->attrnum) + return -1; + + if (a->attrnum > b->attrnum) + return 1; + + if (a->category < b->category) + return -1; + + if (a->category > b->category) + return 1; + + if (a->category == GIN_CAT_NORM_KEY) + { + keya = _gin_parse_tuple(a, NULL); + keyb = _gin_parse_tuple(b, NULL); + + r = ApplySortComparator(keya, false, + keyb, false, + &ssup[a->attrnum - 1]); + + /* if the key is the same, consider the first TID in the array */ + return (r != 0) ? r : ItemPointerCompare(GinTupleGetFirst(a), + GinTupleGetFirst(b)); + } + + return ItemPointerCompare(GinTupleGetFirst(a), + GinTupleGetFirst(b)); +} |