diff options
Diffstat (limited to 'src/backend/executor/nodeMergeAppend.c')
-rw-r--r-- | src/backend/executor/nodeMergeAppend.c | 392 |
1 files changed, 392 insertions, 0 deletions
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c new file mode 100644 index 00000000000..9ded25d8fcf --- /dev/null +++ b/src/backend/executor/nodeMergeAppend.c @@ -0,0 +1,392 @@ +/*------------------------------------------------------------------------- + * + * nodeMergeAppend.c + * routines to handle MergeAppend nodes. + * + * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/nodeMergeAppend.c + * + *------------------------------------------------------------------------- + */ +/* INTERFACE ROUTINES + * ExecInitMergeAppend - initialize the MergeAppend node + * ExecMergeAppend - retrieve the next tuple from the node + * ExecEndMergeAppend - shut down the MergeAppend node + * ExecReScanMergeAppend - rescan the MergeAppend node + * + * NOTES + * A MergeAppend node contains a list of one or more subplans. + * These are each expected to deliver tuples that are sorted according + * to a common sort key. The MergeAppend node merges these streams + * to produce output sorted the same way. + * + * MergeAppend nodes don't make use of their left and right + * subtrees, rather they maintain a list of subplans so + * a typical MergeAppend node looks like this in the plan tree: + * + * ... + * / + * MergeAppend---+------+------+--- nil + * / \ | | | + * nil nil ... ... ... + * subplans + */ + +#include "postgres.h" + +#include "access/nbtree.h" +#include "executor/execdebug.h" +#include "executor/nodeMergeAppend.h" +#include "utils/lsyscache.h" + +/* + * It gets quite confusing having a heap array (indexed by integers) which + * contains integers which index into the slots array. These typedefs try to + * clear it up, but they're only documentation. + */ +typedef int SlotNumber; +typedef int HeapPosition; + +static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot); +static void heap_siftup_slot(MergeAppendState *node); +static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2); + + +/* ---------------------------------------------------------------- + * ExecInitMergeAppend + * + * Begin all of the subscans of the MergeAppend node. + * ---------------------------------------------------------------- + */ +MergeAppendState * +ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) +{ + MergeAppendState *mergestate = makeNode(MergeAppendState); + PlanState **mergeplanstates; + int nplans; + int i; + ListCell *lc; + + /* check for unsupported flags */ + Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); + + /* + * Set up empty vector of subplan states + */ + nplans = list_length(node->mergeplans); + + mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *)); + + /* + * create new MergeAppendState for our node + */ + mergestate->ps.plan = (Plan *) node; + mergestate->ps.state = estate; + mergestate->mergeplans = mergeplanstates; + mergestate->ms_nplans = nplans; + + mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans); + mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans); + + /* + * Miscellaneous initialization + * + * MergeAppend plans don't have expression contexts because they never + * call ExecQual or ExecProject. + */ + + /* + * MergeAppend nodes do have Result slots, which hold pointers to tuples, + * so we have to initialize them. + */ + ExecInitResultTupleSlot(estate, &mergestate->ps); + + /* + * call ExecInitNode on each of the plans to be executed and save the + * results into the array "mergeplans". + */ + i = 0; + foreach(lc, node->mergeplans) + { + Plan *initNode = (Plan *) lfirst(lc); + + mergeplanstates[i] = ExecInitNode(initNode, estate, eflags); + i++; + } + + /* + * initialize output tuple type + */ + ExecAssignResultTypeFromTL(&mergestate->ps); + mergestate->ps.ps_ProjInfo = NULL; + + /* + * initialize sort-key information + */ + mergestate->ms_nkeys = node->numCols; + mergestate->ms_scankeys = palloc0(sizeof(ScanKeyData) * node->numCols); + + for (i = 0; i < node->numCols; i++) + { + Oid sortFunction; + bool reverse; + + if (!get_compare_function_for_ordering_op(node->sortOperators[i], + &sortFunction, &reverse)) + elog(ERROR, "operator %u is not a valid ordering operator", + node->sortOperators[i]); + + /* + * We needn't fill in sk_strategy or sk_subtype since these scankeys + * will never be passed to an index. + */ + ScanKeyInit(&mergestate->ms_scankeys[i], + node->sortColIdx[i], + InvalidStrategy, + sortFunction, + (Datum) 0); + + /* However, we use btree's conventions for encoding directionality */ + if (reverse) + mergestate->ms_scankeys[i].sk_flags |= SK_BT_DESC; + if (node->nullsFirst[i]) + mergestate->ms_scankeys[i].sk_flags |= SK_BT_NULLS_FIRST; + } + + /* + * initialize to show we have not run the subplans yet + */ + mergestate->ms_heap_size = 0; + mergestate->ms_initialized = false; + mergestate->ms_last_slot = -1; + + return mergestate; +} + +/* ---------------------------------------------------------------- + * ExecMergeAppend + * + * Handles iteration over multiple subplans. + * ---------------------------------------------------------------- + */ +TupleTableSlot * +ExecMergeAppend(MergeAppendState *node) +{ + TupleTableSlot *result; + SlotNumber i; + + if (!node->ms_initialized) + { + /* + * First time through: pull the first tuple from each subplan, + * and set up the heap. + */ + for (i = 0; i < node->ms_nplans; i++) + { + node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); + if (!TupIsNull(node->ms_slots[i])) + heap_insert_slot(node, i); + } + node->ms_initialized = true; + } + else + { + /* + * Otherwise, pull the next tuple from whichever subplan we returned + * from last time, and insert it into the heap. (We could simplify + * the logic a bit by doing this before returning from the prior call, + * but it's better to not pull tuples until necessary.) + */ + i = node->ms_last_slot; + node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); + if (!TupIsNull(node->ms_slots[i])) + heap_insert_slot(node, i); + } + + if (node->ms_heap_size > 0) + { + /* Return the topmost heap node, and sift up the remaining nodes */ + i = node->ms_heap[0]; + result = node->ms_slots[i]; + node->ms_last_slot = i; + heap_siftup_slot(node); + } + else + { + /* All the subplans are exhausted, and so is the heap */ + result = ExecClearTuple(node->ps.ps_ResultTupleSlot); + } + + return result; +} + +/* + * Insert a new slot into the heap. The slot must contain a valid tuple. + */ +static void +heap_insert_slot(MergeAppendState *node, SlotNumber new_slot) +{ + SlotNumber *heap = node->ms_heap; + HeapPosition j; + + Assert(!TupIsNull(node->ms_slots[new_slot])); + + j = node->ms_heap_size++; /* j is where the "hole" is */ + while (j > 0) + { + int i = (j-1)/2; + + if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0) + break; + heap[j] = heap[i]; + j = i; + } + heap[j] = new_slot; +} + +/* + * Delete the heap top (the slot in heap[0]), and sift up. + */ +static void +heap_siftup_slot(MergeAppendState *node) +{ + SlotNumber *heap = node->ms_heap; + HeapPosition i, + n; + + if (--node->ms_heap_size <= 0) + return; + n = node->ms_heap_size; /* heap[n] needs to be reinserted */ + i = 0; /* i is where the "hole" is */ + for (;;) + { + int j = 2 * i + 1; + + if (j >= n) + break; + if (j+1 < n && heap_compare_slots(node, heap[j], heap[j+1]) > 0) + j++; + if (heap_compare_slots(node, heap[n], heap[j]) <= 0) + break; + heap[i] = heap[j]; + i = j; + } + heap[i] = heap[n]; +} + +/* + * Compare the tuples in the two given slots. + */ +static int32 +heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2) +{ + TupleTableSlot *s1 = node->ms_slots[slot1]; + TupleTableSlot *s2 = node->ms_slots[slot2]; + int nkey; + + Assert(!TupIsNull(s1)); + Assert(!TupIsNull(s2)); + + for (nkey = 0; nkey < node->ms_nkeys; nkey++) + { + ScanKey scankey = node->ms_scankeys + nkey; + AttrNumber attno = scankey->sk_attno; + Datum datum1, + datum2; + bool isNull1, + isNull2; + int32 compare; + + datum1 = slot_getattr(s1, attno, &isNull1); + datum2 = slot_getattr(s2, attno, &isNull2); + + if (isNull1) + { + if (isNull2) + continue; /* NULL "=" NULL */ + else if (scankey->sk_flags & SK_BT_NULLS_FIRST) + return -1; /* NULL "<" NOT_NULL */ + else + return 1; /* NULL ">" NOT_NULL */ + } + else if (isNull2) + { + if (scankey->sk_flags & SK_BT_NULLS_FIRST) + return 1; /* NOT_NULL ">" NULL */ + else + return -1; /* NOT_NULL "<" NULL */ + } + else + { + compare = DatumGetInt32(FunctionCall2(&scankey->sk_func, + datum1, datum2)); + if (compare != 0) + { + if (scankey->sk_flags & SK_BT_DESC) + compare = -compare; + return compare; + } + } + } + return 0; +} + +/* ---------------------------------------------------------------- + * ExecEndMergeAppend + * + * Shuts down the subscans of the MergeAppend node. + * + * Returns nothing of interest. + * ---------------------------------------------------------------- + */ +void +ExecEndMergeAppend(MergeAppendState *node) +{ + PlanState **mergeplans; + int nplans; + int i; + + /* + * get information from the node + */ + mergeplans = node->mergeplans; + nplans = node->ms_nplans; + + /* + * shut down each of the subscans + */ + for (i = 0; i < nplans; i++) + ExecEndNode(mergeplans[i]); +} + +void +ExecReScanMergeAppend(MergeAppendState *node) +{ + int i; + + for (i = 0; i < node->ms_nplans; i++) + { + PlanState *subnode = node->mergeplans[i]; + + /* + * ExecReScan doesn't know about my subplans, so I have to do + * changed-parameter signaling myself. + */ + if (node->ps.chgParam != NULL) + UpdateChangedParamSet(subnode, node->ps.chgParam); + + /* + * If chgParam of subnode is not null then plan will be re-scanned by + * first ExecProcNode. + */ + if (subnode->chgParam == NULL) + ExecReScan(subnode); + } + node->ms_heap_size = 0; + node->ms_initialized = false; + node->ms_last_slot = -1; +} |