aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/nodeWindowAgg.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2008-12-28 18:54:01 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2008-12-28 18:54:01 +0000
commit95b07bc7f5010233f52f9d11da74e2e5b653b0a7 (patch)
tree48f5858bf4eca1bfb316ef02bb959ca85f568e0a /src/backend/executor/nodeWindowAgg.c
parent38e9348282e9d078487147ba8a85aebec54e3a08 (diff)
downloadpostgresql-95b07bc7f5010233f52f9d11da74e2e5b653b0a7.tar.gz
postgresql-95b07bc7f5010233f52f9d11da74e2e5b653b0a7.zip
Support window functions a la SQL:2008.
Hitoshi Harada, with some kibitzing from Heikki and Tom.
Diffstat (limited to 'src/backend/executor/nodeWindowAgg.c')
-rw-r--r--src/backend/executor/nodeWindowAgg.c1854
1 files changed, 1854 insertions, 0 deletions
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
new file mode 100644
index 00000000000..37ef9a5e830
--- /dev/null
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -0,0 +1,1854 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeWindowAgg.c
+ * routines to handle WindowAgg nodes.
+ *
+ * A WindowAgg node evaluates "window functions" across suitable partitions
+ * of the input tuple set. Any one WindowAgg works for just a single window
+ * specification, though it can evaluate multiple window functions sharing
+ * identical window specifications. The input tuples are required to be
+ * delivered in sorted order, with the PARTITION BY columns (if any) as
+ * major sort keys and the ORDER BY columns (if any) as minor sort keys.
+ * (The planner generates a stack of WindowAggs with intervening Sort nodes
+ * as needed, if a query involves more than one window specification.)
+ *
+ * Since window functions can require access to any or all of the rows in
+ * the current partition, we accumulate rows of the partition into a
+ * tuplestore. The window functions are called using the WindowObject API
+ * so that they can access those rows as needed.
+ *
+ * We also support using plain aggregate functions as window functions.
+ * For these, the regular Agg-node environment is emulated for each partition.
+ * As required by the SQL spec, the output represents the value of the
+ * aggregate function over all rows in the current row's window frame.
+ *
+ *
+ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/executor/nodeWindowAgg.c,v 1.1 2008/12/28 18:53:55 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_aggregate.h"
+#include "catalog/pg_proc.h"
+#include "catalog/pg_type.h"
+#include "executor/executor.h"
+#include "executor/nodeWindowAgg.h"
+#include "miscadmin.h"
+#include "nodes/nodeFuncs.h"
+#include "optimizer/clauses.h"
+#include "parser/parse_agg.h"
+#include "parser/parse_coerce.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/datum.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+#include "windowapi.h"
+
+/*
+ * All the window function APIs are called with this object, which is passed
+ * to window functions as fcinfo->context.
+ */
+typedef struct WindowObjectData
+{
+ NodeTag type;
+ WindowAggState *winstate; /* parent WindowAggState */
+ List *argstates; /* ExprState trees for fn's arguments */
+ void *localmem; /* WinGetPartitionLocalMemory's chunk */
+ int markptr; /* tuplestore mark pointer for this fn */
+ int readptr; /* tuplestore read pointer for this fn */
+ int64 markpos; /* row that markptr is positioned on */
+ int64 seekpos; /* row that readptr is positioned on */
+} WindowObjectData;
+
+/*
+ * We have one WindowStatePerFunc struct for each window function and
+ * window aggregate handled by this node.
+ */
+typedef struct WindowStatePerFuncData
+{
+ /* Links to WindowFunc expr and state nodes this working state is for */
+ WindowFuncExprState *wfuncstate;
+ WindowFunc *wfunc;
+
+ int numArguments; /* number of arguments */
+
+ FmgrInfo flinfo; /* fmgr lookup data for window function */
+
+ /*
+ * We need the len and byval info for the result of each function
+ * in order to know how to copy/delete values.
+ */
+ int16 resulttypeLen;
+ bool resulttypeByVal;
+
+ bool plain_agg; /* is it just a plain aggregate function? */
+ int aggno; /* if so, index of its PerAggData */
+
+ WindowObject winobj; /* object used in window function API */
+} WindowStatePerFuncData;
+
+/*
+ * For plain aggregate window functions, we also have one of these.
+ */
+typedef struct WindowStatePerAggData
+{
+ /* Oids of transfer functions */
+ Oid transfn_oid;
+ Oid finalfn_oid; /* may be InvalidOid */
+
+ /*
+ * fmgr lookup data for transfer functions --- only valid when
+ * corresponding oid is not InvalidOid. Note in particular that fn_strict
+ * flags are kept here.
+ */
+ FmgrInfo transfn;
+ FmgrInfo finalfn;
+
+ /*
+ * initial value from pg_aggregate entry
+ */
+ Datum initValue;
+ bool initValueIsNull;
+
+ /*
+ * cached value for non-moving frame
+ */
+ Datum resultValue;
+ bool resultValueIsNull;
+ bool hasResult;
+
+ /*
+ * We need the len and byval info for the agg's input, result, and
+ * transition data types in order to know how to copy/delete values.
+ */
+ int16 inputtypeLen,
+ resulttypeLen,
+ transtypeLen;
+ bool inputtypeByVal,
+ resulttypeByVal,
+ transtypeByVal;
+
+ int wfuncno; /* index of associated PerFuncData */
+
+ /* Current transition value */
+ Datum transValue; /* current transition value */
+ bool transValueIsNull;
+
+ bool noTransValue; /* true if transValue not set yet */
+} WindowStatePerAggData;
+
+static void initialize_windowaggregate(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate);
+static void advance_windowaggregate(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate);
+static void finalize_windowaggregate(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate,
+ Datum *result, bool *isnull);
+
+static void eval_windowaggregates(WindowAggState *winstate);
+static void eval_windowfunction(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ Datum *result, bool *isnull);
+
+static void begin_partition(WindowAggState *winstate);
+static void spool_tuples(WindowAggState *winstate, int64 pos);
+static void release_partition(WindowAggState *winstate);
+
+static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
+ WindowFunc *wfunc,
+ WindowStatePerAgg peraggstate);
+static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
+
+static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
+ TupleTableSlot *slot2);
+static bool window_gettupleslot(WindowObject winobj, int64 pos,
+ TupleTableSlot *slot);
+
+
+/*
+ * initialize_windowaggregate
+ * parallel to initialize_aggregate in nodeAgg.c
+ */
+static void
+initialize_windowaggregate(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate)
+{
+ MemoryContext oldContext;
+
+ if (peraggstate->initValueIsNull)
+ peraggstate->transValue = peraggstate->initValue;
+ else
+ {
+ oldContext = MemoryContextSwitchTo(winstate->wincontext);
+ peraggstate->transValue = datumCopy(peraggstate->initValue,
+ peraggstate->transtypeByVal,
+ peraggstate->transtypeLen);
+ MemoryContextSwitchTo(oldContext);
+ }
+ peraggstate->transValueIsNull = peraggstate->initValueIsNull;
+ peraggstate->noTransValue = peraggstate->initValueIsNull;
+}
+
+/*
+ * advance_windowaggregate
+ * parallel to advance_aggregate in nodeAgg.c
+ */
+static void
+advance_windowaggregate(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate)
+{
+ WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
+ int numArguments = perfuncstate->numArguments;
+ FunctionCallInfoData fcinfodata;
+ FunctionCallInfo fcinfo = &fcinfodata;
+ Datum newVal;
+ ListCell *arg;
+ int i;
+ MemoryContext oldContext;
+ ExprContext *econtext = winstate->tmpcontext;
+
+ oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+
+ /* We start from 1, since the 0th arg will be the transition value */
+ i = 1;
+ foreach(arg, wfuncstate->args)
+ {
+ ExprState *argstate = (ExprState *) lfirst(arg);
+
+ fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
+ &fcinfo->argnull[i], NULL);
+ i++;
+ }
+
+ if (peraggstate->transfn.fn_strict)
+ {
+ /*
+ * For a strict transfn, nothing happens when there's a NULL input; we
+ * just keep the prior transValue.
+ */
+ for (i = 1; i <= numArguments; i++)
+ {
+ if (fcinfo->argnull[i])
+ {
+ MemoryContextSwitchTo(oldContext);
+ return;
+ }
+ }
+ if (peraggstate->noTransValue)
+ {
+ /*
+ * transValue has not been initialized. This is the first non-NULL
+ * input value. We use it as the initial value for transValue. (We
+ * already checked that the agg's input type is binary-compatible
+ * with its transtype, so straight copy here is OK.)
+ *
+ * We must copy the datum into wincontext if it is pass-by-ref. We
+ * do not need to pfree the old transValue, since it's NULL.
+ */
+ MemoryContextSwitchTo(winstate->wincontext);
+ peraggstate->transValue = datumCopy(fcinfo->arg[1],
+ peraggstate->transtypeByVal,
+ peraggstate->transtypeLen);
+ peraggstate->transValueIsNull = false;
+ peraggstate->noTransValue = false;
+ MemoryContextSwitchTo(oldContext);
+ return;
+ }
+ if (peraggstate->transValueIsNull)
+ {
+ /*
+ * Don't call a strict function with NULL inputs. Note it is
+ * possible to get here despite the above tests, if the transfn is
+ * strict *and* returned a NULL on a prior cycle. If that happens
+ * we will propagate the NULL all the way to the end.
+ */
+ MemoryContextSwitchTo(oldContext);
+ return;
+ }
+ }
+
+ /*
+ * OK to call the transition function
+ */
+ InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
+ numArguments + 1,
+ (void *) winstate, NULL);
+ fcinfo->arg[0] = peraggstate->transValue;
+ fcinfo->argnull[0] = peraggstate->transValueIsNull;
+ newVal = FunctionCallInvoke(fcinfo);
+
+ /*
+ * If pass-by-ref datatype, must copy the new value into wincontext and
+ * pfree the prior transValue. But if transfn returned a pointer to its
+ * first input, we don't need to do anything.
+ */
+ if (!peraggstate->transtypeByVal &&
+ DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
+ {
+ if (!fcinfo->isnull)
+ {
+ MemoryContextSwitchTo(winstate->wincontext);
+ newVal = datumCopy(newVal,
+ peraggstate->transtypeByVal,
+ peraggstate->transtypeLen);
+ }
+ if (!peraggstate->transValueIsNull)
+ pfree(DatumGetPointer(peraggstate->transValue));
+ }
+
+ MemoryContextSwitchTo(oldContext);
+ peraggstate->transValue = newVal;
+ peraggstate->transValueIsNull = fcinfo->isnull;
+}
+
+/*
+ * finalize_windowaggregate
+ * parallel to finalize_aggregate in nodeAgg.c
+ */
+static void
+finalize_windowaggregate(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate,
+ Datum *result, bool *isnull)
+{
+ MemoryContext oldContext;
+
+ oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
+
+ /*
+ * Apply the agg's finalfn if one is provided, else return transValue.
+ */
+ if (OidIsValid(peraggstate->finalfn_oid))
+ {
+ FunctionCallInfoData fcinfo;
+
+ InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 1,
+ (void *) winstate, NULL);
+ fcinfo.arg[0] = peraggstate->transValue;
+ fcinfo.argnull[0] = peraggstate->transValueIsNull;
+ if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull)
+ {
+ /* don't call a strict function with NULL inputs */
+ *result = (Datum) 0;
+ *isnull = true;
+ }
+ else
+ {
+ *result = FunctionCallInvoke(&fcinfo);
+ *isnull = fcinfo.isnull;
+ }
+ }
+ else
+ {
+ *result = peraggstate->transValue;
+ *isnull = peraggstate->transValueIsNull;
+ }
+
+ /*
+ * If result is pass-by-ref, make sure it is in the right context.
+ */
+ if (!peraggstate->resulttypeByVal && !*isnull &&
+ !MemoryContextContains(CurrentMemoryContext,
+ DatumGetPointer(*result)))
+ *result = datumCopy(*result,
+ peraggstate->resulttypeByVal,
+ peraggstate->resulttypeLen);
+ MemoryContextSwitchTo(oldContext);
+}
+
+/*
+ * eval_windowaggregates
+ * evaluate plain aggregates being used as window functions
+ *
+ * Much of this is duplicated from nodeAgg.c. But NOTE that we expect to be
+ * able to call aggregate final functions repeatedly after aggregating more
+ * data onto the same transition value. This is not a behavior required by
+ * nodeAgg.c.
+ */
+static void
+eval_windowaggregates(WindowAggState *winstate)
+{
+ WindowStatePerAgg peraggstate;
+ int wfuncno, numaggs;
+ int i;
+ MemoryContext oldContext;
+ ExprContext *econtext;
+ TupleTableSlot *first_peer_slot = winstate->first_peer_slot;
+ TupleTableSlot *slot;
+ bool first;
+
+ numaggs = winstate->numaggs;
+ if (numaggs == 0)
+ return; /* nothing to do */
+
+ /* final output execution is in ps_ExprContext */
+ econtext = winstate->ss.ps.ps_ExprContext;
+
+ /*
+ * We don't currently support explicitly-specified window frames. That
+ * means that the window frame always includes all the rows in the
+ * partition preceding and including the current row, and all its
+ * peers. As a special case, if there's no ORDER BY, all rows are peers,
+ * so the window frame includes all rows in the partition.
+ *
+ * When there's peer rows, all rows in a peer group will have the same
+ * aggregate values. The values will be calculated when current position
+ * reaches the first peer row, and on all the following peer rows we will
+ * just return the saved results.
+ *
+ * 'aggregatedupto' keeps track of the last row that has already been
+ * accumulated for the aggregates. When the current row has no peers,
+ * aggregatedupto will be the same as the current row after this
+ * function. If there are peer rows, all peers will be accumulated in one
+ * call of this function, and aggregatedupto will be ahead of the current
+ * position. If there's no ORDER BY, and thus all rows are peers, the
+ * first call will aggregate all rows in the partition.
+ *
+ * TODO: In the future, we could implement sliding frames by recalculating
+ * the aggregate whenever a row exits the frame. That would be pretty
+ * slow, though. For aggregates like SUM and COUNT we could implement a
+ * "negative transition function" that would be called for all the rows
+ * that exit the frame.
+ */
+
+ /*
+ * If we've already aggregated up through current row, reuse the
+ * saved result values
+ */
+ if (winstate->aggregatedupto > winstate->currentpos)
+ {
+ for (i = 0; i < numaggs; i++)
+ {
+ peraggstate = &winstate->peragg[i];
+ wfuncno = peraggstate->wfuncno;
+ econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
+ econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
+ }
+ return;
+ }
+
+ /* Initialize aggregates on first call for partition */
+ for (i = 0; i < numaggs; i++)
+ {
+ peraggstate = &winstate->peragg[i];
+ wfuncno = peraggstate->wfuncno;
+ if (!peraggstate->hasResult)
+ initialize_windowaggregate(winstate,
+ &winstate->perfunc[wfuncno],
+ &winstate->peragg[i]);
+ }
+
+ /*
+ * If this is the first call for this partition, fetch the first row
+ * for comparing peer rows. On subsequent calls, we'll always read
+ * ahead until we reach the first non-peer row, and store that row in
+ * first_peer_slot, for use in the next call.
+ */
+ if (TupIsNull(first_peer_slot))
+ {
+ spool_tuples(winstate, winstate->aggregatedupto);
+ tuplestore_select_read_pointer(winstate->buffer, winstate->agg_ptr);
+ if (!tuplestore_gettupleslot(winstate->buffer, true, first_peer_slot))
+ elog(ERROR, "unexpected end of tuplestore");
+ }
+
+ /*
+ * Advance until we reach the next non-peer row
+ */
+ first = true;
+ for (;;)
+ {
+ if (!first)
+ {
+ /* Fetch the next row, and see if it's a peer */
+ spool_tuples(winstate, winstate->aggregatedupto);
+ tuplestore_select_read_pointer(winstate->buffer,
+ winstate->agg_ptr);
+ slot = winstate->temp_slot_1;
+ if (!tuplestore_gettupleslot(winstate->buffer, true, slot))
+ break;
+ if (!are_peers(winstate, first_peer_slot, slot))
+ {
+ ExecCopySlot(first_peer_slot, slot);
+ break;
+ }
+ }
+ else
+ {
+ /*
+ * On first iteration, just accumulate the tuple saved from
+ * last call
+ */
+ slot = first_peer_slot;
+ first = false;
+ }
+
+ /* set tuple context for evaluation of aggregate arguments */
+ winstate->tmpcontext->ecxt_outertuple = slot;
+
+ for (i = 0; i < numaggs; i++)
+ {
+ wfuncno = winstate->peragg[i].wfuncno;
+
+ advance_windowaggregate(winstate,
+ &winstate->perfunc[wfuncno],
+ &winstate->peragg[i]);
+
+ }
+ /* Reset per-input-tuple context after each tuple */
+ ResetExprContext(winstate->tmpcontext);
+ winstate->aggregatedupto++;
+ }
+
+ /*
+ * finalize aggregates and fill result/isnull fields.
+ */
+ for (i = 0; i < numaggs; i++)
+ {
+ Datum *result;
+ bool *isnull;
+
+ peraggstate = &winstate->peragg[i];
+ wfuncno = peraggstate->wfuncno;
+ result = &econtext->ecxt_aggvalues[wfuncno];
+ isnull = &econtext->ecxt_aggnulls[wfuncno];
+ finalize_windowaggregate(winstate,
+ &winstate->perfunc[wfuncno],
+ peraggstate, result, isnull);
+
+ /*
+ * save the result for the next (non-shrinking frame) call.
+ */
+ if (!peraggstate->resulttypeByVal && !*isnull)
+ {
+ /*
+ * clear old resultValue in order not to leak memory.
+ */
+ if (peraggstate->hasResult &&
+ (DatumGetPointer(peraggstate->resultValue) !=
+ DatumGetPointer(*result)) &&
+ !peraggstate->resultValueIsNull)
+ pfree(DatumGetPointer(peraggstate->resultValue));
+
+ /*
+ * If pass-by-ref, copy it into our global context.
+ */
+ oldContext = MemoryContextSwitchTo(winstate->wincontext);
+ peraggstate->resultValue = datumCopy(*result,
+ peraggstate->resulttypeByVal,
+ peraggstate->resulttypeLen);
+ MemoryContextSwitchTo(oldContext);
+ }
+ else
+ {
+ peraggstate->resultValue = *result;
+ }
+ peraggstate->resultValueIsNull = *isnull;
+ peraggstate->hasResult = true;
+ }
+}
+
+/*
+ * eval_windowfunction
+ *
+ * Arguments of window functions are not evaluated here, because a window
+ * function can need random access to arbitrary rows in the partition.
+ * The window function uses the special WinGetFuncArgInPartition and
+ * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
+ * it wants.
+ */
+static void
+eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
+ Datum *result, bool *isnull)
+{
+ FunctionCallInfoData fcinfo;
+ MemoryContext oldContext;
+
+ oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
+
+ /*
+ * We don't pass any normal arguments to a window function, but we do
+ * pass it the number of arguments, in order to permit window function
+ * implementations to support varying numbers of arguments. The real
+ * info goes through the WindowObject, which is passed via fcinfo->context.
+ */
+ InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
+ perfuncstate->numArguments,
+ (void *) perfuncstate->winobj, NULL);
+ /* Just in case, make all the regular argument slots be null */
+ memset(fcinfo.argnull, true, perfuncstate->numArguments);
+
+ *result = FunctionCallInvoke(&fcinfo);
+ *isnull = fcinfo.isnull;
+
+ /*
+ * Make sure pass-by-ref data is allocated in the appropriate context.
+ * (We need this in case the function returns a pointer into some
+ * short-lived tuple, as is entirely possible.)
+ */
+ if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
+ !MemoryContextContains(CurrentMemoryContext,
+ DatumGetPointer(*result)))
+ *result = datumCopy(*result,
+ perfuncstate->resulttypeByVal,
+ perfuncstate->resulttypeLen);
+
+ MemoryContextSwitchTo(oldContext);
+}
+
+/*
+ * begin_partition
+ * Start buffering rows of the next partition.
+ */
+static void
+begin_partition(WindowAggState *winstate)
+{
+ PlanState *outerPlan = outerPlanState(winstate);
+ int numfuncs = winstate->numfuncs;
+ int i;
+
+ winstate->partition_spooled = false;
+ winstate->spooled_rows = 0;
+ winstate->currentpos = 0;
+ winstate->frametailpos = -1;
+ winstate->aggregatedupto = 0;
+
+ /*
+ * If this is the very first partition, we need to fetch the first
+ * input row to store in it.
+ */
+ if (TupIsNull(winstate->first_part_slot))
+ {
+ TupleTableSlot *outerslot = ExecProcNode(outerPlan);
+
+ if (!TupIsNull(outerslot))
+ ExecCopySlot(winstate->first_part_slot, outerslot);
+ else
+ {
+ /* outer plan is empty, so we have nothing to do */
+ winstate->partition_spooled = true;
+ winstate->more_partitions = false;
+ return;
+ }
+ }
+
+ /* Create new tuplestore for this partition */
+ winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
+
+ /*
+ * Set up read pointers for the tuplestore. The current and agg pointers
+ * don't need BACKWARD capability, but the per-window-function read
+ * pointers do.
+ */
+ winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
+
+ /* reset default REWIND capability bit for current ptr */
+ tuplestore_set_eflags(winstate->buffer, 0);
+
+ /* create a read pointer for aggregates, if needed */
+ if (winstate->numaggs > 0)
+ winstate->agg_ptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
+
+ /* create mark and read pointers for each real window function */
+ for (i = 0; i < numfuncs; i++)
+ {
+ WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
+
+ if (!perfuncstate->plain_agg)
+ {
+ WindowObject winobj = perfuncstate->winobj;
+
+ winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
+ 0);
+ winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
+ EXEC_FLAG_BACKWARD);
+ winobj->markpos = -1;
+ winobj->seekpos = -1;
+ }
+ }
+
+ /*
+ * Store the first tuple into the tuplestore (it's always available now;
+ * we either read it above, or saved it at the end of previous partition)
+ */
+ tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
+ winstate->spooled_rows++;
+}
+
+/*
+ * Read tuples from the outer node, up to position 'pos', and store them
+ * into the tuplestore. If pos is -1, reads the whole partition.
+ */
+static void
+spool_tuples(WindowAggState *winstate, int64 pos)
+{
+ WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
+ PlanState *outerPlan;
+ TupleTableSlot *outerslot;
+ MemoryContext oldcontext;
+
+ if (!winstate->buffer)
+ return; /* just a safety check */
+ if (winstate->partition_spooled)
+ return; /* whole partition done already */
+
+ /*
+ * If the tuplestore has spilled to disk, alternate reading and writing
+ * becomes quite expensive due to frequent buffer flushes. It's cheaper
+ * to force the entire partition to get spooled in one go.
+ *
+ * XXX this is a horrid kluge --- it'd be better to fix the performance
+ * problem inside tuplestore. FIXME
+ */
+ if (!tuplestore_in_memory(winstate->buffer))
+ pos = -1;
+
+ outerPlan = outerPlanState(winstate);
+
+ /* Must be in query context to call outerplan or touch tuplestore */
+ oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
+
+ while (winstate->spooled_rows <= pos || pos == -1)
+ {
+ outerslot = ExecProcNode(outerPlan);
+ if (TupIsNull(outerslot))
+ {
+ /* reached the end of the last partition */
+ winstate->partition_spooled = true;
+ winstate->more_partitions = false;
+ break;
+ }
+
+ if (node->partNumCols > 0)
+ {
+ /* Check if this tuple still belongs to the current partition */
+ if (!execTuplesMatch(winstate->first_part_slot,
+ outerslot,
+ node->partNumCols, node->partColIdx,
+ winstate->partEqfunctions,
+ winstate->tmpcontext->ecxt_per_tuple_memory))
+ {
+ /*
+ * end of partition; copy the tuple for the next cycle.
+ */
+ ExecCopySlot(winstate->first_part_slot, outerslot);
+ winstate->partition_spooled = true;
+ winstate->more_partitions = true;
+ break;
+ }
+ }
+
+ /* Still in partition, so save it into the tuplestore */
+ tuplestore_puttupleslot(winstate->buffer, outerslot);
+ winstate->spooled_rows++;
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * release_partition
+ * clear information kept within a partition, including
+ * tuplestore and aggregate results.
+ */
+static void
+release_partition(WindowAggState *winstate)
+{
+ int i;
+
+ for (i = 0; i < winstate->numfuncs; i++)
+ {
+ WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
+
+ /* Release any partition-local state of this window function */
+ if (perfuncstate->winobj)
+ perfuncstate->winobj->localmem = NULL;
+
+ /* Reset agg result cache */
+ if (perfuncstate->plain_agg)
+ {
+ int aggno = perfuncstate->aggno;
+ WindowStatePerAggData *peraggstate = &winstate->peragg[aggno];
+
+ peraggstate->resultValueIsNull = true;
+ peraggstate->hasResult = false;
+ }
+ }
+
+ /*
+ * Release all partition-local memory (in particular, any partition-local
+ * state or aggregate temp data that we might have trashed our pointers
+ * to in the above loop). We don't rely on retail pfree because some
+ * aggregates might have allocated data we don't have direct pointers to.
+ */
+ MemoryContextResetAndDeleteChildren(winstate->wincontext);
+
+ /* Ensure eval_windowaggregates will see next call as partition start */
+ ExecClearTuple(winstate->first_peer_slot);
+
+ if (winstate->buffer)
+ tuplestore_end(winstate->buffer);
+ winstate->buffer = NULL;
+ winstate->partition_spooled = false;
+}
+
+
+/* -----------------
+ * ExecWindowAgg
+ *
+ * ExecWindowAgg receives tuples from its outer subplan and
+ * stores them into a tuplestore, then processes window functions.
+ * This node doesn't reduce nor qualify any row so the number of
+ * returned rows is exactly the same as its outer subplan's result
+ * (ignoring the case of SRFs in the targetlist, that is).
+ * -----------------
+ */
+TupleTableSlot *
+ExecWindowAgg(WindowAggState *winstate)
+{
+ TupleTableSlot *result;
+ ExprDoneCond isDone;
+ ExprContext *econtext;
+ int i;
+ int numfuncs;
+
+ if (winstate->all_done)
+ return NULL;
+
+ /*
+ * Check to see if we're still projecting out tuples from a previous output
+ * tuple (because there is a function-returning-set in the projection
+ * expressions). If so, try to project another one.
+ */
+ if (winstate->ss.ps.ps_TupFromTlist)
+ {
+ TupleTableSlot *result;
+ ExprDoneCond isDone;
+
+ result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
+ if (isDone == ExprMultipleResult)
+ return result;
+ /* Done with that source tuple... */
+ winstate->ss.ps.ps_TupFromTlist = false;
+ }
+
+restart:
+ if (winstate->buffer == NULL)
+ {
+ /* Initialize for first partition and set current row = 0 */
+ begin_partition(winstate);
+ }
+ else
+ {
+ /* Advance current row within partition */
+ winstate->currentpos++;
+ }
+
+ /*
+ * Spool all tuples up to and including the current row, if we haven't
+ * already
+ */
+ spool_tuples(winstate, winstate->currentpos);
+
+ /* Move to the next partition if we reached the end of this partition */
+ if (winstate->partition_spooled &&
+ winstate->currentpos >= winstate->spooled_rows)
+ {
+ release_partition(winstate);
+
+ if (winstate->more_partitions)
+ {
+ begin_partition(winstate);
+ Assert(winstate->spooled_rows > 0);
+ }
+ else
+ {
+ winstate->all_done = true;
+ return NULL;
+ }
+ }
+
+ /* final output execution is in ps_ExprContext */
+ econtext = winstate->ss.ps.ps_ExprContext;
+
+ /* Clear the per-output-tuple context for current row */
+ ResetExprContext(econtext);
+
+ /*
+ * Read the current row from the tuplestore, and save in ScanTupleSlot
+ * for possible use by WinGetFuncArgCurrent or the final projection step.
+ * (We can't rely on the outerplan's output slot because we may have to
+ * read beyond the current row.)
+ *
+ * Current row must be in the tuplestore, since we spooled it above.
+ */
+ tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
+ if (!tuplestore_gettupleslot(winstate->buffer, true,
+ winstate->ss.ss_ScanTupleSlot))
+ elog(ERROR, "unexpected end of tuplestore");
+
+ /*
+ * Evaluate true window functions
+ */
+ numfuncs = winstate->numfuncs;
+ for (i = 0; i < numfuncs; i++)
+ {
+ WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
+
+ if (perfuncstate->plain_agg)
+ continue;
+ eval_windowfunction(winstate, perfuncstate,
+ &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
+ &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
+ }
+
+ /*
+ * Evaluate aggregates
+ */
+ if (winstate->numaggs > 0)
+ eval_windowaggregates(winstate);
+
+ /*
+ * Truncate any no-longer-needed rows from the tuplestore.
+ */
+ tuplestore_trim(winstate->buffer);
+
+ /*
+ * Form and return a projection tuple using the windowfunc results
+ * and the current row. Setting ecxt_outertuple arranges that any
+ * Vars will be evaluated with respect to that row.
+ */
+ econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
+ result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
+
+ if (isDone == ExprEndResult)
+ {
+ /* SRF in tlist returned no rows, so advance to next input tuple */
+ goto restart;
+ }
+
+ winstate->ss.ps.ps_TupFromTlist =
+ (isDone == ExprMultipleResult);
+ return result;
+}
+
+/* -----------------
+ * ExecInitWindowAgg
+ *
+ * Creates the run-time information for the WindowAgg node produced by the
+ * planner and initializes its outer subtree
+ * -----------------
+ */
+WindowAggState *
+ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
+{
+ WindowAggState *winstate;
+ Plan *outerPlan;
+ ExprContext *econtext;
+ ExprContext *tmpcontext;
+ WindowStatePerFunc perfunc;
+ WindowStatePerAgg peragg;
+ int numfuncs,
+ wfuncno,
+ numaggs,
+ aggno;
+ ListCell *l;
+
+ /* check for unsupported flags */
+ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
+
+ /*
+ * create state structure
+ */
+ winstate = makeNode(WindowAggState);
+ winstate->ss.ps.plan = (Plan *) node;
+ winstate->ss.ps.state = estate;
+
+ /*
+ * Create expression contexts. We need two, one for per-input-tuple
+ * processing and one for per-output-tuple processing. We cheat a little
+ * by using ExecAssignExprContext() to build both.
+ */
+ ExecAssignExprContext(estate, &winstate->ss.ps);
+ tmpcontext = winstate->ss.ps.ps_ExprContext;
+ winstate->tmpcontext = tmpcontext;
+ ExecAssignExprContext(estate, &winstate->ss.ps);
+
+ /* Create long-lived context for storage of aggregate transvalues etc */
+ winstate->wincontext =
+ AllocSetContextCreate(CurrentMemoryContext,
+ "WindowAggContext",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+#define WINDOWAGG_NSLOTS 6
+
+ /*
+ * tuple table initialization
+ */
+ ExecInitScanTupleSlot(estate, &winstate->ss);
+ ExecInitResultTupleSlot(estate, &winstate->ss.ps);
+ winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
+ winstate->first_peer_slot = ExecInitExtraTupleSlot(estate);
+ winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
+ winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
+
+ winstate->ss.ps.targetlist = (List *)
+ ExecInitExpr((Expr *) node->plan.targetlist,
+ (PlanState *) winstate);
+
+ /*
+ * WindowAgg nodes never have quals, since they can only occur at the
+ * logical top level of a query (ie, after any WHERE or HAVING filters)
+ */
+ Assert(node->plan.qual == NIL);
+ winstate->ss.ps.qual = NIL;
+
+ /*
+ * initialize child nodes
+ */
+ outerPlan = outerPlan(node);
+ outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
+
+ /*
+ * initialize source tuple type (which is also the tuple type that we'll
+ * store in the tuplestore and use in all our working slots).
+ */
+ ExecAssignScanTypeFromOuterPlan(&winstate->ss);
+
+ ExecSetSlotDescriptor(winstate->first_part_slot,
+ winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
+ ExecSetSlotDescriptor(winstate->first_peer_slot,
+ winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
+ ExecSetSlotDescriptor(winstate->temp_slot_1,
+ winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
+ ExecSetSlotDescriptor(winstate->temp_slot_2,
+ winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
+
+ /*
+ * Initialize result tuple type and projection info.
+ */
+ ExecAssignResultTypeFromTL(&winstate->ss.ps);
+ ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
+
+ winstate->ss.ps.ps_TupFromTlist = false;
+
+ /* Set up data for comparing tuples */
+ if (node->partNumCols > 0)
+ winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
+ node->partOperators);
+ if (node->ordNumCols > 0)
+ winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
+ node->ordOperators);
+
+ /*
+ * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
+ */
+ numfuncs = winstate->numfuncs;
+ numaggs = winstate->numaggs;
+ econtext = winstate->ss.ps.ps_ExprContext;
+ econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
+ econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
+
+ /*
+ * allocate per-wfunc/per-agg state information.
+ */
+ perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
+ peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
+ winstate->perfunc = perfunc;
+ winstate->peragg = peragg;
+
+ wfuncno = -1;
+ aggno = -1;
+ foreach(l, winstate->funcs)
+ {
+ WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
+ WindowFunc *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
+ WindowStatePerFunc perfuncstate;
+ AclResult aclresult;
+ int i;
+
+ /* Look for a previous duplicate window function */
+ for (i = 0; i <= wfuncno; i++)
+ {
+ if (equal(wfunc, perfunc[i].wfunc) &&
+ !contain_volatile_functions((Node *) wfunc))
+ break;
+ }
+ if (i <= wfuncno)
+ {
+ /* Found a match to an existing entry, so just mark it */
+ wfuncstate->wfuncno = i;
+ continue;
+ }
+
+ /* Nope, so assign a new PerAgg record */
+ perfuncstate = &perfunc[++wfuncno];
+
+ /* Mark WindowFunc state node with assigned index in the result array */
+ wfuncstate->wfuncno = wfuncno;
+
+ /* Check permission to call window function */
+ aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
+ ACL_EXECUTE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_PROC,
+ get_func_name(wfunc->winfnoid));
+
+ /* Fill in the perfuncstate data */
+ perfuncstate->wfuncstate = wfuncstate;
+ perfuncstate->wfunc = wfunc;
+ perfuncstate->numArguments = list_length(wfuncstate->args);
+
+ fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
+ tmpcontext->ecxt_per_query_memory);
+ perfuncstate->flinfo.fn_expr = (Node *) wfunc;
+ get_typlenbyval(wfunc->wintype,
+ &perfuncstate->resulttypeLen,
+ &perfuncstate->resulttypeByVal);
+
+ /*
+ * If it's really just a plain aggregate function,
+ * we'll emulate the Agg environment for it.
+ */
+ perfuncstate->plain_agg = wfunc->winagg;
+ if (wfunc->winagg)
+ {
+ WindowStatePerAgg peraggstate;
+
+ perfuncstate->aggno = ++aggno;
+ peraggstate = &winstate->peragg[aggno];
+ initialize_peragg(winstate, wfunc, peraggstate);
+ peraggstate->wfuncno = wfuncno;
+ }
+ else
+ {
+ WindowObject winobj = makeNode(WindowObjectData);
+
+ winobj->winstate = winstate;
+ winobj->argstates = wfuncstate->args;
+ winobj->localmem = NULL;
+ perfuncstate->winobj = winobj;
+ }
+ }
+
+ /* Update numfuncs, numaggs to match number of unique functions found */
+ winstate->numfuncs = wfuncno + 1;
+ winstate->numaggs = aggno + 1;
+
+ winstate->partition_spooled = false;
+ winstate->more_partitions = false;
+
+ return winstate;
+}
+
+/* -----------------
+ * ExecCountSlotsWindowAgg
+ * -----------------
+ */
+int
+ExecCountSlotsWindowAgg(WindowAgg *node)
+{
+ return ExecCountSlotsNode(outerPlan(node)) +
+ ExecCountSlotsNode(innerPlan(node)) +
+ WINDOWAGG_NSLOTS;
+}
+
+/* -----------------
+ * ExecEndWindowAgg
+ * -----------------
+ */
+void
+ExecEndWindowAgg(WindowAggState *node)
+{
+ PlanState *outerPlan;
+
+ release_partition(node);
+
+ pfree(node->perfunc);
+ pfree(node->peragg);
+
+ ExecClearTuple(node->ss.ss_ScanTupleSlot);
+ ExecClearTuple(node->first_part_slot);
+ ExecClearTuple(node->first_peer_slot);
+ ExecClearTuple(node->temp_slot_1);
+ ExecClearTuple(node->temp_slot_2);
+
+ /*
+ * Free both the expr contexts.
+ */
+ ExecFreeExprContext(&node->ss.ps);
+ node->ss.ps.ps_ExprContext = node->tmpcontext;
+ ExecFreeExprContext(&node->ss.ps);
+
+ MemoryContextDelete(node->wincontext);
+
+ outerPlan = outerPlanState(node);
+ ExecEndNode(outerPlan);
+}
+
+/* -----------------
+ * ExecRescanWindowAgg
+ * -----------------
+ */
+void
+ExecReScanWindowAgg(WindowAggState *node, ExprContext *exprCtxt)
+{
+ ExprContext *econtext = node->ss.ps.ps_ExprContext;
+
+ node->all_done = false;
+
+ node->ss.ps.ps_TupFromTlist = false;
+
+ /* release tuplestore et al */
+ release_partition(node);
+
+ /* release all temp tuples, but especially first_part_slot */
+ ExecClearTuple(node->ss.ss_ScanTupleSlot);
+ ExecClearTuple(node->first_part_slot);
+ ExecClearTuple(node->first_peer_slot);
+ ExecClearTuple(node->temp_slot_1);
+ ExecClearTuple(node->temp_slot_2);
+
+ /* Forget current wfunc values */
+ MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
+ MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
+
+ /*
+ * if chgParam of subnode is not null then plan will be re-scanned by
+ * first ExecProcNode.
+ */
+ if (((PlanState *) node)->lefttree->chgParam == NULL)
+ ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
+}
+
+/*
+ * initialize_peragg
+ *
+ * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
+ */
+static WindowStatePerAggData *
+initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
+ WindowStatePerAgg peraggstate)
+{
+ Oid inputTypes[FUNC_MAX_ARGS];
+ int numArguments;
+ HeapTuple aggTuple;
+ Form_pg_aggregate aggform;
+ Oid aggtranstype;
+ AclResult aclresult;
+ Oid transfn_oid,
+ finalfn_oid;
+ Expr *transfnexpr,
+ *finalfnexpr;
+ Datum textInitVal;
+ int i;
+ ListCell *lc;
+
+ numArguments = list_length(wfunc->args);
+
+ i = 0;
+ foreach(lc, wfunc->args)
+ {
+ inputTypes[i++] = exprType((Node *) lfirst(lc));
+ }
+
+ aggTuple = SearchSysCache(AGGFNOID,
+ ObjectIdGetDatum(wfunc->winfnoid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(aggTuple))
+ elog(ERROR, "cache lookup failed for aggregate %u",
+ wfunc->winfnoid);
+ aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
+
+ /*
+ * ExecInitWindowAgg already checked permission to call aggregate function
+ * ... but we still need to check the component functions
+ */
+
+ peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
+ peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
+
+ /* Check that aggregate owner has permission to call component fns */
+ {
+ HeapTuple procTuple;
+ Oid aggOwner;
+
+ procTuple = SearchSysCache(PROCOID,
+ ObjectIdGetDatum(wfunc->winfnoid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(procTuple))
+ elog(ERROR, "cache lookup failed for function %u",
+ wfunc->winfnoid);
+ aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
+ ReleaseSysCache(procTuple);
+
+ aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
+ ACL_EXECUTE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_PROC,
+ get_func_name(transfn_oid));
+ if (OidIsValid(finalfn_oid))
+ {
+ aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
+ ACL_EXECUTE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_PROC,
+ get_func_name(finalfn_oid));
+ }
+ }
+
+ /* resolve actual type of transition state, if polymorphic */
+ aggtranstype = aggform->aggtranstype;
+ if (IsPolymorphicType(aggtranstype))
+ {
+ /* have to fetch the agg's declared input types... */
+ Oid *declaredArgTypes;
+ int agg_nargs;
+
+ get_func_signature(wfunc->winfnoid,
+ &declaredArgTypes, &agg_nargs);
+ Assert(agg_nargs == numArguments);
+ aggtranstype = enforce_generic_type_consistency(inputTypes,
+ declaredArgTypes,
+ agg_nargs,
+ aggtranstype,
+ false);
+ pfree(declaredArgTypes);
+ }
+
+ /* build expression trees using actual argument & result types */
+ build_aggregate_fnexprs(inputTypes,
+ numArguments,
+ aggtranstype,
+ wfunc->wintype,
+ transfn_oid,
+ finalfn_oid,
+ &transfnexpr,
+ &finalfnexpr);
+
+ fmgr_info(transfn_oid, &peraggstate->transfn);
+ peraggstate->transfn.fn_expr = (Node *) transfnexpr;
+
+ if (OidIsValid(finalfn_oid))
+ {
+ fmgr_info(finalfn_oid, &peraggstate->finalfn);
+ peraggstate->finalfn.fn_expr = (Node *) finalfnexpr;
+ }
+
+ get_typlenbyval(wfunc->wintype,
+ &peraggstate->resulttypeLen,
+ &peraggstate->resulttypeByVal);
+ get_typlenbyval(aggtranstype,
+ &peraggstate->transtypeLen,
+ &peraggstate->transtypeByVal);
+
+ /*
+ * initval is potentially null, so don't try to access it as a struct
+ * field. Must do it the hard way with SysCacheGetAttr.
+ */
+ textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
+ Anum_pg_aggregate_agginitval,
+ &peraggstate->initValueIsNull);
+
+ if (peraggstate->initValueIsNull)
+ peraggstate->initValue = (Datum) 0;
+ else
+ peraggstate->initValue = GetAggInitVal(textInitVal,
+ aggtranstype);
+
+ /*
+ * If the transfn is strict and the initval is NULL, make sure input
+ * type and transtype are the same (or at least binary-compatible), so
+ * that it's OK to use the first input value as the initial
+ * transValue. This should have been checked at agg definition time,
+ * but just in case...
+ */
+ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
+ {
+ if (numArguments < 1 ||
+ !IsBinaryCoercible(inputTypes[0], aggtranstype))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate %u needs to have compatible input type and transition type",
+ wfunc->winfnoid)));
+ }
+
+ ReleaseSysCache(aggTuple);
+
+ return peraggstate;
+}
+
+static Datum
+GetAggInitVal(Datum textInitVal, Oid transtype)
+{
+ Oid typinput,
+ typioparam;
+ char *strInitVal;
+ Datum initVal;
+
+ getTypeInputInfo(transtype, &typinput, &typioparam);
+ strInitVal = TextDatumGetCString(textInitVal);
+ initVal = OidInputFunctionCall(typinput, strInitVal,
+ typioparam, -1);
+ pfree(strInitVal);
+ return initVal;
+}
+
+/*
+ * are_peers
+ * compare two rows to see if they are equal according to the ORDER BY clause
+ */
+static bool
+are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
+ TupleTableSlot *slot2)
+{
+ WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
+
+ /* If no ORDER BY, all rows are peers with each other */
+ if (node->ordNumCols == 0)
+ return true;
+
+ return execTuplesMatch(slot1, slot2,
+ node->ordNumCols, node->ordColIdx,
+ winstate->ordEqfunctions,
+ winstate->tmpcontext->ecxt_per_tuple_memory);
+}
+
+/*
+ * window_gettupleslot
+ * Fetch the pos'th tuple of the current partition into the slot
+ *
+ * Returns true if successful, false if no such row
+ */
+static bool
+window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
+{
+ WindowAggState *winstate = winobj->winstate;
+ MemoryContext oldcontext;
+
+ /* Don't allow passing -1 to spool_tuples here */
+ if (pos < 0)
+ return false;
+
+ /* If necessary, fetch the tuple into the spool */
+ spool_tuples(winstate, pos);
+
+ if (pos >= winstate->spooled_rows)
+ return false;
+
+ if (pos < winobj->markpos)
+ elog(ERROR, "cannot fetch row before WindowObject's mark position");
+
+ oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
+
+ tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
+
+ /*
+ * There's no API to refetch the tuple at the current position. We
+ * have to move one tuple forward, and then one backward. (We don't
+ * do it the other way because we might try to fetch the row before
+ * our mark, which isn't allowed.)
+ */
+ if (winobj->seekpos == pos)
+ {
+ tuplestore_advance(winstate->buffer, true);
+ winobj->seekpos++;
+ }
+
+ while (winobj->seekpos > pos)
+ {
+ if (!tuplestore_gettupleslot(winstate->buffer, false, slot))
+ elog(ERROR, "unexpected end of tuplestore");
+ winobj->seekpos--;
+ }
+
+ while (winobj->seekpos < pos)
+ {
+ if (!tuplestore_gettupleslot(winstate->buffer, true, slot))
+ elog(ERROR, "unexpected end of tuplestore");
+ winobj->seekpos++;
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return true;
+}
+
+
+/***********************************************************************
+ * API exposed to window functions
+ ***********************************************************************/
+
+
+/*
+ * WinGetPartitionLocalMemory
+ * Get working memory that lives till end of partition processing
+ *
+ * On first call within a given partition, this allocates and zeroes the
+ * requested amount of space. Subsequent calls just return the same chunk.
+ *
+ * Memory obtained this way is normally used to hold state that should be
+ * automatically reset for each new partition. If a window function wants
+ * to hold state across the whole query, fcinfo->fn_extra can be used in the
+ * usual way for that.
+ */
+void *
+WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
+{
+ Assert(WindowObjectIsValid(winobj));
+ if (winobj->localmem == NULL)
+ winobj->localmem = MemoryContextAllocZero(winobj->winstate->wincontext,
+ sz);
+ return winobj->localmem;
+}
+
+/*
+ * WinGetCurrentPosition
+ * Return the current row's position (counting from 0) within the current
+ * partition.
+ */
+int64
+WinGetCurrentPosition(WindowObject winobj)
+{
+ Assert(WindowObjectIsValid(winobj));
+ return winobj->winstate->currentpos;
+}
+
+/*
+ * WinGetPartitionRowCount
+ * Return total number of rows contained in the current partition.
+ *
+ * Note: this is a relatively expensive operation because it forces the
+ * whole partition to be "spooled" into the tuplestore at once. Once
+ * executed, however, additional calls within the same partition are cheap.
+ */
+int64
+WinGetPartitionRowCount(WindowObject winobj)
+{
+ Assert(WindowObjectIsValid(winobj));
+ spool_tuples(winobj->winstate, -1);
+ return winobj->winstate->spooled_rows;
+}
+
+/*
+ * WinSetMarkPosition
+ * Set the "mark" position for the window object, which is the oldest row
+ * number (counting from 0) it is allowed to fetch during all subsequent
+ * operations within the current partition.
+ *
+ * Window functions do not have to call this, but are encouraged to move the
+ * mark forward when possible to keep the tuplestore size down and prevent
+ * having to spill rows to disk.
+ */
+void
+WinSetMarkPosition(WindowObject winobj, int64 markpos)
+{
+ WindowAggState *winstate;
+
+ Assert(WindowObjectIsValid(winobj));
+ winstate = winobj->winstate;
+
+ if (markpos < winobj->markpos)
+ elog(ERROR, "cannot move WindowObject's mark position backward");
+ tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
+ while (markpos > winobj->markpos)
+ {
+ tuplestore_advance(winstate->buffer, true);
+ winobj->markpos++;
+ }
+ tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
+ while (markpos > winobj->seekpos)
+ {
+ tuplestore_advance(winstate->buffer, true);
+ winobj->seekpos++;
+ }
+}
+
+/*
+ * WinRowsArePeers
+ * Compare two rows (specified by absolute position in window) to see
+ * if they are equal according to the ORDER BY clause.
+ */
+bool
+WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
+{
+ WindowAggState *winstate;
+ WindowAgg *node;
+ TupleTableSlot *slot1;
+ TupleTableSlot *slot2;
+ bool res;
+
+ Assert(WindowObjectIsValid(winobj));
+
+ winstate = winobj->winstate;
+ node = (WindowAgg *) winstate->ss.ps.plan;
+
+ /* If no ORDER BY, all rows are peers; don't bother to fetch them */
+ if (node->ordNumCols == 0)
+ return true;
+
+ slot1 = winstate->temp_slot_1;
+ slot2 = winstate->temp_slot_2;
+
+ if (!window_gettupleslot(winobj, pos1, slot1))
+ elog(ERROR, "specified position is out of window: " INT64_FORMAT,
+ pos1);
+ if (!window_gettupleslot(winobj, pos2, slot2))
+ elog(ERROR, "specified position is out of window: " INT64_FORMAT,
+ pos2);
+
+ res = are_peers(winstate, slot1, slot2);
+
+ ExecClearTuple(slot1);
+ ExecClearTuple(slot2);
+
+ return res;
+}
+
+/*
+ * WinGetFuncArgInPartition
+ * Evaluate a window function's argument expression on a specified
+ * row of the partition. The row is identified in lseek(2) style,
+ * i.e. relative to the current, first, or last row.
+ *
+ * argno: argument number to evaluate (counted from 0)
+ * relpos: signed rowcount offset from the seek position
+ * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
+ * set_mark: If the row is found and set_mark is true, the mark is moved to
+ * the row as a side-effect.
+ * isnull: output argument, receives isnull status of result
+ * isout: output argument, set to indicate whether target row position
+ * is out of partition (can pass NULL if caller doesn't care about this)
+ *
+ * Specifying a nonexistent row is not an error, it just causes a null result
+ * (plus setting *isout true, if isout isn't NULL).
+ */
+Datum
+WinGetFuncArgInPartition(WindowObject winobj, int argno,
+ int relpos, int seektype, bool set_mark,
+ bool *isnull, bool *isout)
+{
+ ExprContext *econtext;
+ TupleTableSlot *slot;
+ bool gottuple;
+ int64 abs_pos;
+
+ Assert(WindowObjectIsValid(winobj));
+
+ econtext = winobj->winstate->ss.ps.ps_ExprContext;
+ slot = winobj->winstate->temp_slot_1;
+
+ switch (seektype)
+ {
+ case WINDOW_SEEK_CURRENT:
+ abs_pos = winobj->winstate->currentpos + relpos;
+ break;
+ case WINDOW_SEEK_HEAD:
+ abs_pos = relpos;
+ break;
+ case WINDOW_SEEK_TAIL:
+ spool_tuples(winobj->winstate, -1);
+ abs_pos = winobj->winstate->spooled_rows - 1 + relpos;
+ break;
+ default:
+ elog(ERROR, "unrecognized window seek type: %d", seektype);
+ abs_pos = 0; /* keep compiler quiet */
+ break;
+ }
+
+ if (abs_pos >= 0)
+ gottuple = window_gettupleslot(winobj, abs_pos, slot);
+ else
+ gottuple = false;
+
+ if (!gottuple)
+ {
+ if (isout)
+ *isout = true;
+ *isnull = true;
+ return (Datum) 0;
+ }
+ else
+ {
+ if (isout)
+ *isout = false;
+ if (set_mark)
+ WinSetMarkPosition(winobj, abs_pos);
+ econtext->ecxt_outertuple = slot;
+ return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
+ econtext, isnull, NULL);
+ }
+}
+
+/*
+ * WinGetFuncArgInFrame
+ * Evaluate a window function's argument expression on a specified
+ * row of the window frame. The row is identified in lseek(2) style,
+ * i.e. relative to the current, first, or last row.
+ *
+ * argno: argument number to evaluate (counted from 0)
+ * relpos: signed rowcount offset from the seek position
+ * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
+ * set_mark: If the row is found and set_mark is true, the mark is moved to
+ * the row as a side-effect.
+ * isnull: output argument, receives isnull status of result
+ * isout: output argument, set to indicate whether target row position
+ * is out of frame (can pass NULL if caller doesn't care about this)
+ *
+ * Specifying a nonexistent row is not an error, it just causes a null result
+ * (plus setting *isout true, if isout isn't NULL).
+ */
+Datum
+WinGetFuncArgInFrame(WindowObject winobj, int argno,
+ int relpos, int seektype, bool set_mark,
+ bool *isnull, bool *isout)
+{
+ ExprContext *econtext;
+ TupleTableSlot *slot;
+ bool gottuple;
+ int64 abs_pos;
+ int64 frametailpos;
+
+ Assert(WindowObjectIsValid(winobj));
+
+ /* if no ordering columns, partition and frame are the same thing */
+ if (((WindowAgg *) winobj->winstate->ss.ps.plan)->ordNumCols == 0)
+ return WinGetFuncArgInPartition(winobj, argno, relpos, seektype,
+ set_mark, isnull, isout);
+
+ econtext = winobj->winstate->ss.ps.ps_ExprContext;
+ slot = winobj->winstate->temp_slot_1;
+ frametailpos = winobj->winstate->frametailpos;
+
+ switch (seektype)
+ {
+ case WINDOW_SEEK_CURRENT:
+ abs_pos = winobj->winstate->currentpos + relpos;
+ break;
+ case WINDOW_SEEK_HEAD:
+ abs_pos = relpos;
+ break;
+ case WINDOW_SEEK_TAIL:
+ /* abs_pos is calculated later */
+ abs_pos = 0; /* keep compiler quiet */
+ break;
+ default:
+ elog(ERROR, "unrecognized window seek type: %d", seektype);
+ abs_pos = 0; /* keep compiler quiet */
+ break;
+ }
+
+ /*
+ * Seek for frame tail. If the tail position is before current,
+ * always check if the tail is after the current or not.
+ */
+ if (frametailpos <= winobj->winstate->currentpos)
+ {
+ int64 add = 1;
+
+ for (;;)
+ {
+ spool_tuples(winobj->winstate, winobj->winstate->currentpos + add);
+ if (winobj->winstate->spooled_rows > winobj->winstate->currentpos + add)
+ {
+ /*
+ * When seektype is not TAIL, we may optimize not to
+ * spool unnecessary tuples. In TAIL mode, we need to search
+ * until we find a row that's definitely not a peer.
+ */
+ if (!WinRowsArePeers(winobj, winobj->winstate->currentpos,
+ winobj->winstate->currentpos + add) ||
+ (seektype != WINDOW_SEEK_TAIL &&
+ winobj->winstate->currentpos + add < abs_pos))
+ break;
+ add++;
+ }
+ else
+ {
+ /*
+ * If hit the partition end, the last row is the frame tail.
+ */
+ break;
+ }
+ }
+ frametailpos = winobj->winstate->currentpos + add - 1;
+ winobj->winstate->frametailpos = frametailpos;
+ }
+
+ if (seektype == WINDOW_SEEK_TAIL)
+ {
+ abs_pos = frametailpos + relpos;
+ }
+
+ /*
+ * If there is an ORDER BY (we don't support other window frame
+ * specifications yet), the frame runs from first row of the partition
+ * to the last peer of the current row. Otherwise the frame is the
+ * whole partition.
+ */
+ if (abs_pos < 0 || abs_pos > frametailpos)
+ gottuple = false;
+ else
+ gottuple = window_gettupleslot(winobj, abs_pos, slot);
+
+ if (!gottuple)
+ {
+ if (isout)
+ *isout = true;
+ *isnull = true;
+ return (Datum) 0;
+ }
+ else
+ {
+ if (isout)
+ *isout = false;
+ if (set_mark)
+ WinSetMarkPosition(winobj, abs_pos);
+ econtext->ecxt_outertuple = slot;
+ return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
+ econtext, isnull, NULL);
+ }
+}
+
+/*
+ * WinGetFuncArgCurrent
+ * Evaluate a window function's argument expression on the current row.
+ *
+ * argno: argument number to evaluate (counted from 0)
+ * isnull: output argument, receives isnull status of result
+ *
+ * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
+ * WinGetFuncArgInFrame targeting the current row, because it will succeed
+ * even if the WindowObject's mark has been set beyond the current row.
+ * This should generally be used for "ordinary" arguments of a window
+ * function, such as the offset argument of lead() or lag().
+ */
+Datum
+WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
+{
+ WindowAggState *winstate;
+ ExprContext *econtext;
+
+ Assert(WindowObjectIsValid(winobj));
+ winstate = winobj->winstate;
+
+ econtext = winstate->ss.ps.ps_ExprContext;
+
+ econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
+ return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
+ econtext, isnull, NULL);
+}