Diffstat (limited to 'src/backend/executor/nodeGatherMerge.c')
-rw-r--r-- | src/backend/executor/nodeGatherMerge.c | 687
1 file changed, 687 insertions, 0 deletions
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
new file mode 100644
index 00000000000..62a6b1866dc
--- /dev/null
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -0,0 +1,687 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeGatherMerge.c
+ *        Scan a plan in multiple workers, and do an order-preserving merge.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *        src/backend/executor/nodeGatherMerge.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relscan.h"
+#include "access/xact.h"
+#include "executor/execdebug.h"
+#include "executor/execParallel.h"
+#include "executor/nodeGatherMerge.h"
+#include "executor/nodeSubplan.h"
+#include "executor/tqueue.h"
+#include "lib/binaryheap.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+
+/*
+ * Tuple array for each worker
+ */
+typedef struct GMReaderTupleBuffer
+{
+    HeapTuple  *tuple;
+    int         readCounter;
+    int         nTuples;
+    bool        done;
+} GMReaderTupleBuffer;
+
+/*
+ * When we read tuples from workers, it's a good idea to read several at once
+ * for efficiency when possible: this minimizes context-switching overhead.
+ * But reading too many at a time wastes memory without improving performance.
+ */
+#define MAX_TUPLE_STORE 10
+
+static int32 heap_compare_slots(Datum a, Datum b, void *arg);
+static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
+static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
+                                   bool nowait, bool *done);
+static void gather_merge_init(GatherMergeState *gm_state);
+static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
+static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
+                                  bool nowait);
+static void form_tuple_array(GatherMergeState *gm_state, int reader);
+
+/* ----------------------------------------------------------------
+ *        ExecInitGatherMerge
+ * ----------------------------------------------------------------
+ */
+GatherMergeState *
+ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
+{
+    GatherMergeState *gm_state;
+    Plan       *outerNode;
+    bool        hasoid;
+    TupleDesc   tupDesc;
+
+    /* A Gather Merge node doesn't have an inner plan. */
+    Assert(innerPlan(node) == NULL);
+
+    /*
+     * create state structure
+     */
+    gm_state = makeNode(GatherMergeState);
+    gm_state->ps.plan = (Plan *) node;
+    gm_state->ps.state = estate;
+
+    /*
+     * Miscellaneous initialization
+     *
+     * create expression context for node
+     */
+    ExecAssignExprContext(estate, &gm_state->ps);
+
+    /*
+     * initialize child expressions
+     */
+    gm_state->ps.targetlist = (List *)
+        ExecInitExpr((Expr *) node->plan.targetlist,
+                     (PlanState *) gm_state);
+    gm_state->ps.qual = (List *)
+        ExecInitExpr((Expr *) node->plan.qual,
+                     (PlanState *) gm_state);
+
+    /*
+     * tuple table initialization
+     */
+    ExecInitResultTupleSlot(estate, &gm_state->ps);
+
+    /*
+     * now initialize outer plan
+     */
+    outerNode = outerPlan(node);
+    outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags);
+
+    /*
+     * Initialize result tuple type and projection info.
+     */
+    ExecAssignResultTypeFromTL(&gm_state->ps);
+    ExecAssignProjectionInfo(&gm_state->ps, NULL);
+
+    gm_state->gm_initialized = false;
+
+    /*
+     * initialize sort-key information
+     */
+    if (node->numCols)
+    {
+        int         i;
+
+        gm_state->gm_nkeys = node->numCols;
+        gm_state->gm_sortkeys =
+            palloc0(sizeof(SortSupportData) * node->numCols);
+
+        for (i = 0; i < node->numCols; i++)
+        {
+            SortSupport sortKey = gm_state->gm_sortkeys + i;
+
+            sortKey->ssup_cxt = CurrentMemoryContext;
+            sortKey->ssup_collation = node->collations[i];
+            sortKey->ssup_nulls_first = node->nullsFirst[i];
+            sortKey->ssup_attno = node->sortColIdx[i];
+
+            /*
+             * We don't perform abbreviated key conversion here, for the same
+             * reasons that it isn't used in MergeAppend.
+             */
+            sortKey->abbreviate = false;
+
+            PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
+        }
+    }
+
+    /*
+     * Store the tuple descriptor into the gather merge state, so we can use
+     * it later while initializing the gather merge slots.
+     */
+    if (!ExecContextForcesOids(&gm_state->ps, &hasoid))
+        hasoid = false;
+    tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
+    gm_state->tupDesc = tupDesc;
+
+    return gm_state;
+}
+
+/* ----------------------------------------------------------------
+ *        ExecGatherMerge(node)
+ *
+ *        Scans the relation via multiple workers and returns
+ *        the next qualifying tuple.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecGatherMerge(GatherMergeState *node)
+{
+    TupleTableSlot *slot;
+    ExprContext *econtext;
+    int         i;
+
+    /*
+     * As with Gather, we don't launch workers until this node is actually
+     * executed.
+     */
+    if (!node->initialized)
+    {
+        EState     *estate = node->ps.state;
+        GatherMerge *gm = (GatherMerge *) node->ps.plan;
+
+        /*
+         * Sometimes we might have to run without parallelism; but if
+         * parallel mode is active then we can try to fire up some workers.
+         */
+        if (gm->num_workers > 0 && IsInParallelMode())
+        {
+            ParallelContext *pcxt;
+
+            /* Initialize data structures for workers. */
+            if (!node->pei)
+                node->pei = ExecInitParallelPlan(node->ps.lefttree,
+                                                 estate,
+                                                 gm->num_workers);
+
+            /* Try to launch workers. */
+            pcxt = node->pei->pcxt;
+            LaunchParallelWorkers(pcxt);
+            node->nworkers_launched = pcxt->nworkers_launched;
+
+            /* Set up tuple queue readers to read the results. */
+            if (pcxt->nworkers_launched > 0)
+            {
+                node->nreaders = 0;
+                node->reader = palloc(pcxt->nworkers_launched *
+                                      sizeof(TupleQueueReader *));
+
+                Assert(gm->numCols);
+
+                for (i = 0; i < pcxt->nworkers_launched; ++i)
+                {
+                    shm_mq_set_handle(node->pei->tqueue[i],
+                                      pcxt->worker[i].bgwhandle);
+                    node->reader[node->nreaders++] =
+                        CreateTupleQueueReader(node->pei->tqueue[i],
+                                               node->tupDesc);
+                }
+            }
+            else
+            {
+                /* No workers?  Then never mind. */
+                ExecShutdownGatherMergeWorkers(node);
+            }
+        }
+
+        /* always allow leader to participate */
+        node->need_to_scan_locally = true;
+        node->initialized = true;
+    }
+
+    /*
+     * Reset per-tuple memory context to free any expression evaluation
+     * storage allocated in the previous tuple cycle.
+     */
+    econtext = node->ps.ps_ExprContext;
+    ResetExprContext(econtext);
+
+    /*
+     * Get next tuple, either from one of our workers, or by running the
+     * plan ourselves.
+     */
+    slot = gather_merge_getnext(node);
+    if (TupIsNull(slot))
+        return NULL;
+
+    /*
+     * form the result tuple using ExecProject(), and return it --- unless
+     * the projection produces an empty set, in which case we must loop back
+     * around for another tuple
+     */
+    econtext->ecxt_outertuple = slot;
+    return ExecProject(node->ps.ps_ProjInfo);
+}
+
+/* ----------------------------------------------------------------
+ *        ExecEndGatherMerge
+ *
+ *        frees any storage allocated through C routines.
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndGatherMerge(GatherMergeState *node)
+{
+    ExecEndNode(outerPlanState(node));      /* let children clean up first */
+    ExecShutdownGatherMerge(node);
+    ExecFreeExprContext(&node->ps);
+    ExecClearTuple(node->ps.ps_ResultTupleSlot);
+}
+
+/* ----------------------------------------------------------------
+ *        ExecShutdownGatherMerge
+ *
+ *        Destroy the setup for parallel workers including the parallel
+ *        context.  Collect all the stats after workers are stopped, else
+ *        some work done by workers won't be accounted.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGatherMerge(GatherMergeState *node)
+{
+    ExecShutdownGatherMergeWorkers(node);
+
+    /* Now destroy the parallel context. */
+    if (node->pei != NULL)
+    {
+        ExecParallelCleanup(node->pei);
+        node->pei = NULL;
+    }
+}
+
+/* ----------------------------------------------------------------
+ *        ExecShutdownGatherMergeWorkers
+ *
+ *        Destroy the parallel workers.  Collect all the stats after
+ *        workers are stopped, else some work done by workers won't be
+ *        accounted.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecShutdownGatherMergeWorkers(GatherMergeState *node)
+{
+    /* Shut down tuple queue readers before shutting down workers. */
+    if (node->reader != NULL)
+    {
+        int         i;
+
+        for (i = 0; i < node->nreaders; ++i)
+            if (node->reader[i])
+                DestroyTupleQueueReader(node->reader[i]);
+
+        pfree(node->reader);
+        node->reader = NULL;
+    }
+
+    /* Now shut down the workers. */
+    if (node->pei != NULL)
+        ExecParallelFinish(node->pei);
+}
+
+/* ----------------------------------------------------------------
+ *        ExecReScanGatherMerge
+ *
+ *        Re-initialize the workers and rescan the relation via them.
+ * ----------------------------------------------------------------
+ */
+void
+ExecReScanGatherMerge(GatherMergeState *node)
+{
+    /*
+     * Re-initialize the parallel workers to perform a rescan of the
+     * relation.  We want to gracefully shut down all the workers so that
+     * they can propagate any error or other information to the master
+     * backend before dying.  The parallel context will be reused for the
+     * rescan.
+     */
+    ExecShutdownGatherMergeWorkers(node);
+
+    node->initialized = false;
+
+    if (node->pei)
+        ExecParallelReinitialize(node->pei);
+
+    ExecReScan(node->ps.lefttree);
+}
+
+/*
+ * Initialize the Gather Merge tuple read.
+ *
+ * Pull at least a single tuple from each worker + leader and set up the heap.
+ */
+static void
+gather_merge_init(GatherMergeState *gm_state)
+{
+    int         nreaders = gm_state->nreaders;
+    bool        initialize = true;
+    int         i;
+
+    /*
+     * Allocate gm_slots: one slot per worker, plus one more for the leader.
+     * The last slot is always the leader's.  The leader always calls
+     * ExecProcNode() to read its tuples, and the TupleTableSlot it returns
+     * is assigned to gm_slots directly, so just initialize the leader's
+     * slot with NULL.  For the other slots, the code below calls
+     * ExecInitExtraTupleSlot() to initialize the worker slots.
+     */
+    gm_state->gm_slots =
+        palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
+    gm_state->gm_slots[gm_state->nreaders] = NULL;
+
+    /* Initialize the tuple slot and tuple array for each worker */
+    gm_state->gm_tuple_buffers =
+        (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
+                                        (gm_state->nreaders + 1));
+    for (i = 0; i < gm_state->nreaders; i++)
+    {
+        /* Allocate the tuple array with MAX_TUPLE_STORE size */
+        gm_state->gm_tuple_buffers[i].tuple =
+            (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
+
+        /* Initialize slot for worker */
+        gm_state->gm_slots[i] = ExecInitExtraTupleSlot(gm_state->ps.state);
+        ExecSetSlotDescriptor(gm_state->gm_slots[i],
+                              gm_state->tupDesc);
+    }
+
+    /* Allocate the resources for the merge */
+    gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1,
+                                            heap_compare_slots,
+                                            gm_state);
+
+    /*
+     * First, try to read a tuple from each worker (including the leader) in
+     * nowait mode, so that we start reading from each worker as well as the
+     * leader.  After this, if all active workers are unable to produce a
+     * tuple, then re-read, and this time use wait mode.  For workers that
+     * were able to produce a tuple in the earlier loop and are still active,
+     * just try to fill the tuple array if more tuples are available.
+     */
+reread:
+    for (i = 0; i < nreaders + 1; i++)
+    {
+        if (!gm_state->gm_tuple_buffers[i].done &&
+            (TupIsNull(gm_state->gm_slots[i]) ||
+             gm_state->gm_slots[i]->tts_isempty))
+        {
+            if (gather_merge_readnext(gm_state, i, initialize))
+            {
+                binaryheap_add_unordered(gm_state->gm_heap,
+                                         Int32GetDatum(i));
+            }
+        }
+        else
+            form_tuple_array(gm_state, i);
+    }
+    initialize = false;
+
+    for (i = 0; i < nreaders; i++)
+        if (!gm_state->gm_tuple_buffers[i].done &&
+            (TupIsNull(gm_state->gm_slots[i]) ||
+             gm_state->gm_slots[i]->tts_isempty))
+            goto reread;
+
+    binaryheap_build(gm_state->gm_heap);
+    gm_state->gm_initialized = true;
+}
+
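gather_merge_init() above primes the merge in two phases: a first pass over every participant in nowait mode, then repeated passes in wait mode for any participant that still has not produced a tuple. The standalone sketch below illustrates only that polling pattern on plain integer sources; read_nowait() and read_wait() are invented stand-ins for reading a tuple queue in nowait and wait mode, not PostgreSQL executor code.

/* Illustrative sketch: prime every source, nonblocking first, then blocking. */
#include <stdbool.h>
#include <stdio.h>

#define NSOURCES 3

static const int data[NSOURCES][4] = {{1, 4, 7, 10}, {2, 5, 8, 11}, {3, 6, 9, 12}};
static int pos[NSOURCES];
static int polls[NSOURCES];

/* Nonblocking read: pretend the queue is still empty on the first poll. */
static bool
read_nowait(int src, int *out)
{
    if (polls[src]++ == 0)
        return false;
    *out = data[src][pos[src]++];
    return true;
}

/* Blocking read: in this toy model it always succeeds immediately. */
static bool
read_wait(int src, int *out)
{
    *out = data[src][pos[src]++];
    return true;
}

int
main(void)
{
    int     head[NSOURCES];
    bool    have[NSOURCES] = {false, false, false};
    bool    nowait = true;      /* first pass is nonblocking */
    bool    missing;

    do
    {
        missing = false;
        for (int i = 0; i < NSOURCES; i++)
        {
            if (have[i])
                continue;       /* already primed; the real code tops up its tuple array instead */
            if (nowait ? read_nowait(i, &head[i]) : read_wait(i, &head[i]))
                have[i] = true;
            else
                missing = true;
        }
        nowait = false;         /* later passes wait for the stragglers */
    } while (missing);

    for (int i = 0; i < NSOURCES; i++)
        printf("source %d primed with %d\n", i, head[i]);
    return 0;
}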
+/*
+ * Clear out the tuple table slot for each gather merge input,
+ * and return one of the cleared slots.
+ */
+static TupleTableSlot *
+gather_merge_clear_slots(GatherMergeState *gm_state)
+{
+    int         i;
+
+    for (i = 0; i < gm_state->nreaders; i++)
+    {
+        pfree(gm_state->gm_tuple_buffers[i].tuple);
+        gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]);
+    }
+
+    /* Free tuple array as we don't need it any more */
+    pfree(gm_state->gm_tuple_buffers);
+    /* Free the binaryheap, which was created for the sort */
+    binaryheap_free(gm_state->gm_heap);
+
+    /* return any cleared slot */
+    return gm_state->gm_slots[0];
+}
+
+/*
+ * Read the next tuple for gather merge.
+ *
+ * Fetch the sorted tuple out of the heap.
+ */
+static TupleTableSlot *
+gather_merge_getnext(GatherMergeState *gm_state)
+{
+    int         i;
+
+    /*
+     * First time through: pull the first tuple from each participant, and
+     * set up the heap.
+     */
+    if (gm_state->gm_initialized == false)
+        gather_merge_init(gm_state);
+    else
+    {
+        /*
+         * Otherwise, pull the next tuple from whichever participant we
+         * returned from last time, and reinsert that participant's index
+         * into the heap, because it might now compare differently against
+         * the existing elements of the heap.
+         */
+        i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
+
+        if (gather_merge_readnext(gm_state, i, false))
+            binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
+        else
+            (void) binaryheap_remove_first(gm_state->gm_heap);
+    }
+
+    if (binaryheap_empty(gm_state->gm_heap))
+    {
+        /* All the queues are exhausted, and so is the heap */
+        return gather_merge_clear_slots(gm_state);
+    }
+    else
+    {
+        i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
+        return gm_state->gm_slots[i];
+    }
+
+    return gather_merge_clear_slots(gm_state);
+}
+
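gather_merge_getnext() above is the driver of a k-way merge: every participant exposes its current tuple, the one that sorts first is handed back, and only that participant is advanced before the next call. The standalone sketch below shows the same idea for sorted integer arrays, with a linear scan standing in for the binary heap; Source and pick_min() are invented names used only for illustration.

/* Illustrative sketch: order-preserving merge of several sorted streams. */
#include <stdio.h>

#define NSOURCES 3

typedef struct
{
    const int  *vals;           /* sorted input */
    int         len;
    int         pos;            /* next element to hand out */
} Source;

/* Return the index of the source whose current element is smallest, or -1. */
static int
pick_min(Source *src, int n)
{
    int         best = -1;

    for (int i = 0; i < n; i++)
    {
        if (src[i].pos >= src[i].len)
            continue;           /* this source is exhausted */
        if (best < 0 ||
            src[i].vals[src[i].pos] < src[best].vals[src[best].pos])
            best = i;
    }
    return best;
}

int
main(void)
{
    static const int a[] = {1, 4, 9};
    static const int b[] = {2, 3, 10};
    static const int c[] = {5, 6, 7};
    Source      src[NSOURCES] = {{a, 3, 0}, {b, 3, 0}, {c, 3, 0}};
    int         i;

    /* Repeatedly emit the smallest current element and advance its source. */
    while ((i = pick_min(src, NSOURCES)) >= 0)
        printf("%d\n", src[i].vals[src[i].pos++]);  /* 1 2 3 4 5 6 7 9 10 */

    return 0;
}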
+/*
+ * Read tuples for the given reader in nowait mode, and store them in the
+ * tuple array.
+ */
+static void
+form_tuple_array(GatherMergeState *gm_state, int reader)
+{
+    GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+    int         i;
+
+    /* The last slot is for the leader, and we don't build a tuple array for it */
+    if (reader == gm_state->nreaders)
+        return;
+
+    /*
+     * If we've already consumed everything in the tuple array, reset the
+     * counters so we can refill it from the start.
+     */
+    if (tuple_buffer->nTuples == tuple_buffer->readCounter)
+        tuple_buffer->nTuples = tuple_buffer->readCounter = 0;
+
+    /* Tuple array is already full? */
+    if (tuple_buffer->nTuples == MAX_TUPLE_STORE)
+        return;
+
+    for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
+    {
+        tuple_buffer->tuple[i] = heap_copytuple(gm_readnext_tuple(gm_state,
+                                                                  reader,
+                                                                  false,
+                                                                  &tuple_buffer->done));
+        if (!HeapTupleIsValid(tuple_buffer->tuple[i]))
+            break;
+        tuple_buffer->nTuples++;
+    }
+}
+
+/*
+ * Store the next tuple for a given reader into the appropriate slot.
+ *
+ * Returns false if the reader is exhausted, and true otherwise.
+ */
+static bool
+gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
+{
+    GMReaderTupleBuffer *tuple_buffer;
+    HeapTuple   tup = NULL;
+
+    /*
+     * If we're being asked to generate a tuple from the leader, then we just
+     * call ExecProcNode as normal to produce one.
+     */
+    if (gm_state->nreaders == reader)
+    {
+        if (gm_state->need_to_scan_locally)
+        {
+            PlanState  *outerPlan = outerPlanState(gm_state);
+            TupleTableSlot *outerTupleSlot;
+
+            outerTupleSlot = ExecProcNode(outerPlan);
+
+            if (!TupIsNull(outerTupleSlot))
+            {
+                gm_state->gm_slots[reader] = outerTupleSlot;
+                return true;
+            }
+            gm_state->gm_tuple_buffers[reader].done = true;
+            gm_state->need_to_scan_locally = false;
+        }
+        return false;
+    }
+
+    /* Otherwise, check the state of the relevant tuple buffer. */
+    tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+
+    if (tuple_buffer->nTuples > tuple_buffer->readCounter)
+    {
+        /* Return any tuple previously read that is still buffered. */
+        tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+        tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
+    }
+    else if (tuple_buffer->done)
+    {
+        /* Reader is known to be exhausted. */
+        DestroyTupleQueueReader(gm_state->reader[reader]);
+        gm_state->reader[reader] = NULL;
+        return false;
+    }
+    else
+    {
+        /* Read and buffer next tuple. */
+        tup = heap_copytuple(gm_readnext_tuple(gm_state,
+                                               reader,
+                                               nowait,
+                                               &tuple_buffer->done));
+
+        /*
+         * Attempt to read more tuples in nowait mode and store them in the
+         * tuple array.
+         */
+        if (HeapTupleIsValid(tup))
+            form_tuple_array(gm_state, reader);
+        else
+            return false;
+    }
+
+    Assert(HeapTupleIsValid(tup));
+
+    /* Build the TupleTableSlot for the given tuple */
+    ExecStoreTuple(tup,         /* tuple to store */
+                   gm_state->gm_slots[reader],  /* slot in which to store the
+                                                 * tuple */
+                   InvalidBuffer,   /* buffer associated with this tuple */
+                   true);       /* pfree this pointer if not from heap */
+
+    return true;
+}
+
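gather_merge_readnext() above prefers tuples already sitting in the per-reader tuple array and only goes back to the queue when that array is empty, while form_tuple_array() opportunistically tops the array up with nowait reads, bounded by MAX_TUPLE_STORE. The standalone sketch below shows that consume-then-refill pattern on plain integers; Batch, fetch_nowait() and fill_batch() are invented names, not PostgreSQL APIs.

/* Illustrative sketch: batch up nonblocking reads, then consume from the batch. */
#include <stdbool.h>
#include <stdio.h>

#define BATCH_SIZE 10           /* plays the role of MAX_TUPLE_STORE */

typedef struct
{
    int         items[BATCH_SIZE];
    int         nItems;         /* number of buffered items */
    int         readCounter;    /* next buffered item to consume */
} Batch;

/*
 * Invented nonblocking source: produces 0..24, but reports "nothing
 * available" on every third call to mimic a momentarily empty queue.
 */
static bool
fetch_nowait(int *out)
{
    static int  next = 0;
    static int  calls = 0;

    if (next >= 25 || ++calls % 3 == 0)
        return false;
    *out = next++;
    return true;
}

/* Refill the batch with as many items as are available right now. */
static void
fill_batch(Batch *b)
{
    if (b->nItems == b->readCounter)    /* everything consumed: reset */
        b->nItems = b->readCounter = 0;

    while (b->nItems < BATCH_SIZE && fetch_nowait(&b->items[b->nItems]))
        b->nItems++;
}

int
main(void)
{
    Batch       batch = {{0}, 0, 0};
    int         consumed = 0;

    while (consumed < 25)
    {
        if (batch.readCounter < batch.nItems)
        {
            printf("%d\n", batch.items[batch.readCounter++]);
            consumed++;
        }
        else
            fill_batch(&batch); /* buffer empty: try to top it up */
    }
    return 0;
}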
+/*
+ * Attempt to read a tuple from the given reader.
+ */
+static HeapTuple
+gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
+                  bool *done)
+{
+    TupleQueueReader *reader;
+    HeapTuple   tup = NULL;
+    MemoryContext oldContext;
+    MemoryContext tupleContext;
+
+    tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
+
+    if (done != NULL)
+        *done = false;
+
+    /* Check for async events, particularly messages from workers. */
+    CHECK_FOR_INTERRUPTS();
+
+    /* Attempt to read a tuple. */
+    reader = gm_state->reader[nreader];
+
+    /* Run TupleQueueReaders in per-tuple context */
+    oldContext = MemoryContextSwitchTo(tupleContext);
+    tup = TupleQueueReaderNext(reader, nowait, done);
+    MemoryContextSwitchTo(oldContext);
+
+    return tup;
+}
+
+/*
+ * We have one slot for each item in the heap array.  We use SlotNumber
+ * to store slot indexes.  This doesn't actually provide any formal
+ * type-safety, but it makes the code more self-documenting.
+ */
+typedef int32 SlotNumber;
+
+/*
+ * Compare the tuples in the two given slots.
+ */
+static int32
+heap_compare_slots(Datum a, Datum b, void *arg)
+{
+    GatherMergeState *node = (GatherMergeState *) arg;
+    SlotNumber  slot1 = DatumGetInt32(a);
+    SlotNumber  slot2 = DatumGetInt32(b);
+
+    TupleTableSlot *s1 = node->gm_slots[slot1];
+    TupleTableSlot *s2 = node->gm_slots[slot2];
+    int         nkey;
+
+    Assert(!TupIsNull(s1));
+    Assert(!TupIsNull(s2));
+
+    for (nkey = 0; nkey < node->gm_nkeys; nkey++)
+    {
+        SortSupport sortKey = node->gm_sortkeys + nkey;
+        AttrNumber  attno = sortKey->ssup_attno;
+        Datum       datum1,
+                    datum2;
+        bool        isNull1,
+                    isNull2;
+        int         compare;
+
+        datum1 = slot_getattr(s1, attno, &isNull1);
+        datum2 = slot_getattr(s2, attno, &isNull2);
+
+        compare = ApplySortComparator(datum1, isNull1,
+                                      datum2, isNull2,
+                                      sortKey);
+        if (compare != 0)
+            return -compare;
+    }
+    return 0;
+}
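heap_compare_slots() above returns the negated comparator result because lib/binaryheap keeps the element that compares largest at the top, while the merge must surface the tuple that sorts first under the configured sort keys; flipping the sign effectively turns the max-heap into a min-heap. The standalone sketch below demonstrates the same sign trick with qsort() on a two-key record, as an analogy for the loop over gm_sortkeys; Row, row_cmp() and row_cmp_inverted() are invented for illustration and are not part of the patch.

/* Illustrative sketch: negating a multi-key comparator reverses the order seen by the consumer. */
#include <stdio.h>
#include <stdlib.h>

typedef struct
{
    int         key1;
    int         key2;
} Row;

/* Ascending comparison on key1, then key2, like looping over the sort keys. */
static int
row_cmp(const void *a, const void *b)
{
    const Row  *r1 = (const Row *) a;
    const Row  *r2 = (const Row *) b;

    if (r1->key1 != r2->key1)
        return (r1->key1 > r2->key1) - (r1->key1 < r2->key1);
    return (r1->key2 > r2->key2) - (r1->key2 < r2->key2);
}

/* Negating the result flips the order, the same idea as returning -compare. */
static int
row_cmp_inverted(const void *a, const void *b)
{
    return -row_cmp(a, b);
}

int
main(void)
{
    Row         rows[] = {{2, 1}, {1, 9}, {2, 0}, {1, 3}};
    int         n = (int) (sizeof(rows) / sizeof(rows[0]));

    qsort(rows, n, sizeof(Row), row_cmp_inverted);

    /* Prints the rows in descending (key1, key2) order: the sign flip at work. */
    for (int i = 0; i < n; i++)
        printf("(%d, %d)\n", rows[i].key1, rows[i].key2);
    return 0;
}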