/*-------------------------------------------------------------------------
 *
 * nodeMemoize.c
 *	  Routines to handle caching of results from parameterized nodes
 *
 * Portions Copyright (c) 2021-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeMemoize.c
 *
 * Memoize nodes are intended to sit above parameterized nodes in the plan
 * tree in order to cache results from them.  The intention here is that a
 * repeat scan with a parameter value that has already been seen by the node
 * can fetch tuples from the cache rather than having to re-scan the inner
 * node all over again.  The query planner may choose to make use of one of
 * these when it thinks rescans for previously seen values are likely enough
 * to warrant adding the additional node.
 *
 * The method of cache we use is a hash table.  When the cache fills, we never
 * spill tuples to disk, instead, we choose to evict the least recently used
 * cache entry from the cache.  We remember the least recently used entry by
 * always pushing new entries and entries we look for onto the tail of a
 * doubly linked list.  This means that older items always bubble to the top
 * of this LRU list.
 *
 * Sometimes our callers won't run their scans to completion. For example a
 * semi-join only needs to run until it finds a matching tuple, and once it
 * does, the join operator skips to the next outer tuple and does not execute
 * the inner side again on that scan.  Because of this, we must keep track of
 * when a cache entry is complete, and by default, we know it is when we run
 * out of tuples to read during the scan.  However, there are cases where we
 * can mark the cache entry as complete without exhausting the scan of all
 * tuples.  One case is unique joins, where the join operator knows that there
 * will only be at most one match for any given outer tuple.  In order to
 * support such cases we allow the "singlerow" option to be set for the cache.
 * This option marks the cache entry as complete after we read the first tuple
 * from the subnode.
 *
 * It's possible when we're filling the cache for a given set of parameters
 * that we're unable to free enough memory to store any more tuples.  If this
 * happens then we'll have already evicted all other cache entries.  When
 * caching another tuple would cause us to exceed our memory budget, we must
 * free the entry that we're currently populating and move the state machine
 * into MEMO_CACHE_BYPASS_MODE.  This means that we'll not attempt to cache
 * any further tuples for this particular scan.  We don't have the memory for
 * it.  The state machine will be reset again on the next rescan.  If the
 * memory requirements to cache the next parameter's tuples are less
 * demanding, then that may allow us to start putting useful entries back into
 * the cache again.
 *
 *
 * INTERFACE ROUTINES
 *		ExecMemoize			- lookup cache, exec subplan when not found
 *		ExecInitMemoize		- initialize node and subnodes
 *		ExecEndMemoize		- shutdown node and subnodes
 *		ExecReScanMemoize	- rescan the memoize node
 *
 *		ExecMemoizeEstimate		estimates DSM space needed for parallel plan
 *		ExecMemoizeInitializeDSM initialize DSM for parallel plan
 *		ExecMemoizeInitializeWorker attach to DSM info in parallel worker
 *		ExecMemoizeRetrieveInstrumentation get instrumentation from worker
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "common/hashfn.h"
#include "executor/executor.h"
#include "executor/nodeMemoize.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"

/* States of the ExecMemoize state machine */
#define MEMO_CACHE_LOOKUP			1	/* Attempt to perform a cache lookup */
#define MEMO_CACHE_FETCH_NEXT_TUPLE	2	/* Get another tuple from the cache */
#define MEMO_FILLING_CACHE			3	/* Read outer node to fill cache */
#define MEMO_CACHE_BYPASS_MODE		4	/* Bypass mode.  Just read from our
										 * subplan without caching anything */
#define MEMO_END_OF_SCAN			5	/* Ready for rescan */


/* Helper macros for memory accounting */
#define EMPTY_ENTRY_MEMORY_BYTES(e)		(sizeof(MemoizeEntry) + \
										 sizeof(MemoizeKey) + \
										 (e)->key->params->t_len);
#define CACHE_TUPLE_BYTES(t)			(sizeof(MemoizeTuple) + \
										 (t)->mintuple->t_len)

 /* MemoizeTuple Stores an individually cached tuple */
typedef struct MemoizeTuple
{
	MinimalTuple mintuple;		/* Cached tuple */
	struct MemoizeTuple *next;	/* The next tuple with the same parameter
								 * values or NULL if it's the last one */
} MemoizeTuple;

/*
 * MemoizeKey
 * The hash table key for cached entries plus the LRU list link
 */
typedef struct MemoizeKey
{
	MinimalTuple params;
	dlist_node	lru_node;		/* Pointer to next/prev key in LRU list */
} MemoizeKey;

/*
 * MemoizeEntry
 *		The data struct that the cache hash table stores
 */
typedef struct MemoizeEntry
{
	MemoizeKey *key;			/* Hash key for hash table lookups */
	MemoizeTuple *tuplehead;	/* Pointer to the first tuple or NULL if no
								 * tuples are cached for this entry */
	uint32		hash;			/* Hash value (cached) */
	char		status;			/* Hash status */
	bool		complete;		/* Did we read the outer plan to completion? */
} MemoizeEntry;


#define SH_PREFIX memoize
#define SH_ELEMENT_TYPE MemoizeEntry
#define SH_KEY_TYPE MemoizeKey *
#define SH_SCOPE static inline
#define SH_DECLARE
#include "lib/simplehash.h"

static uint32 MemoizeHash_hash(struct memoize_hash *tb,
							   const MemoizeKey *key);
static bool MemoizeHash_equal(struct memoize_hash *tb,
							  const MemoizeKey *key1,
							  const MemoizeKey *key2);

#define SH_PREFIX memoize
#define SH_ELEMENT_TYPE MemoizeEntry
#define SH_KEY_TYPE MemoizeKey *
#define SH_KEY key
#define SH_HASH_KEY(tb, key) MemoizeHash_hash(tb, key)
#define SH_EQUAL(tb, a, b) MemoizeHash_equal(tb, a, b)
#define SH_SCOPE static inline
#define SH_STORE_HASH
#define SH_GET_HASH(tb, a) a->hash
#define SH_DEFINE
#include "lib/simplehash.h"

/*
 * MemoizeHash_hash
 *		Hash function for simplehash hashtable.  'key' is unused here as we
 *		require that all table lookups first populate the MemoizeState's
 *		probeslot with the key values to be looked up.
 */
static uint32
MemoizeHash_hash(struct memoize_hash *tb, const MemoizeKey *key)
{
	MemoizeState *mstate = (MemoizeState *) tb->private_data;
	ExprContext *econtext = mstate->ss.ps.ps_ExprContext;
	MemoryContext oldcontext;
	TupleTableSlot *pslot = mstate->probeslot;
	uint32		hashkey = 0;
	int			numkeys = mstate->nkeys;

	oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

	if (mstate->binary_mode)
	{
		for (int i = 0; i < numkeys; i++)
		{
			/* combine successive hashkeys by rotating */
			hashkey = pg_rotate_left32(hashkey, 1);

			if (!pslot->tts_isnull[i])	/* treat nulls as having hash key 0 */
			{
				CompactAttribute *attr;
				uint32		hkey;

				attr = TupleDescCompactAttr(pslot->tts_tupleDescriptor, i);

				hkey = datum_image_hash(pslot->tts_values[i], attr->attbyval, attr->attlen);

				hashkey ^= hkey;
			}
		}
	}
	else
	{
		FmgrInfo   *hashfunctions = mstate->hashfunctions;
		Oid		   *collations = mstate->collations;

		for (int i = 0; i < numkeys; i++)
		{
			/* combine successive hashkeys by rotating */
			hashkey = pg_rotate_left32(hashkey, 1);

			if (!pslot->tts_isnull[i])	/* treat nulls as having hash key 0 */
			{
				uint32		hkey;

				hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i],
														collations[i], pslot->tts_values[i]));
				hashkey ^= hkey;
			}
		}
	}

	MemoryContextSwitchTo(oldcontext);
	return murmurhash32(hashkey);
}

/*
 * MemoizeHash_equal
 *		Equality function for confirming hash value matches during a hash
 *		table lookup.  'key2' is never used.  Instead the MemoizeState's
 *		probeslot is always populated with details of what's being looked up.
 */
static bool
MemoizeHash_equal(struct memoize_hash *tb, const MemoizeKey *key1,
				  const MemoizeKey *key2)
{
	MemoizeState *mstate = (MemoizeState *) tb->private_data;
	ExprContext *econtext = mstate->ss.ps.ps_ExprContext;
	TupleTableSlot *tslot = mstate->tableslot;
	TupleTableSlot *pslot = mstate->probeslot;

	/* probeslot should have already been prepared by prepare_probe_slot() */
	ExecStoreMinimalTuple(key1->params, tslot, false);

	if (mstate->binary_mode)
	{
		MemoryContext oldcontext;
		int			numkeys = mstate->nkeys;
		bool		match = true;

		oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

		slot_getallattrs(tslot);
		slot_getallattrs(pslot);

		for (int i = 0; i < numkeys; i++)
		{
			CompactAttribute *attr;

			if (tslot->tts_isnull[i] != pslot->tts_isnull[i])
			{
				match = false;
				break;
			}

			/* both NULL? they're equal */
			if (tslot->tts_isnull[i])
				continue;

			/* perform binary comparison on the two datums */
			attr = TupleDescCompactAttr(tslot->tts_tupleDescriptor, i);
			if (!datum_image_eq(tslot->tts_values[i], pslot->tts_values[i],
								attr->attbyval, attr->attlen))
			{
				match = false;
				break;
			}
		}

		MemoryContextSwitchTo(oldcontext);
		return match;
	}
	else
	{
		econtext->ecxt_innertuple = tslot;
		econtext->ecxt_outertuple = pslot;
		return ExecQual(mstate->cache_eq_expr, econtext);
	}
}

/*
 * Initialize the hash table to empty.  The MemoizeState's hashtable field
 * must point to NULL.
 */
static void
build_hash_table(MemoizeState *mstate, uint32 size)
{
	Assert(mstate->hashtable == NULL);

	/* Make a guess at a good size when we're not given a valid size. */
	if (size == 0)
		size = 1024;

	/* memoize_create will convert the size to a power of 2 */
	mstate->hashtable = memoize_create(mstate->tableContext, size, mstate);
}

/*
 * prepare_probe_slot
 *		Populate mstate's probeslot with the values from the tuple stored
 *		in 'key'.  If 'key' is NULL, then perform the population by evaluating
 *		mstate's param_exprs.
 */
static inline void
prepare_probe_slot(MemoizeState *mstate, MemoizeKey *key)
{
	TupleTableSlot *pslot = mstate->probeslot;
	TupleTableSlot *tslot = mstate->tableslot;
	int			numKeys = mstate->nkeys;

	ExecClearTuple(pslot);

	if (key == NULL)
	{
		ExprContext *econtext = mstate->ss.ps.ps_ExprContext;
		MemoryContext oldcontext;

		oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

		/* Set the probeslot's values based on the current parameter values */
		for (int i = 0; i < numKeys; i++)
			pslot->tts_values[i] = ExecEvalExpr(mstate->param_exprs[i],
												econtext,
												&pslot->tts_isnull[i]);

		MemoryContextSwitchTo(oldcontext);
	}
	else
	{
		/* Process the key's MinimalTuple and store the values in probeslot */
		ExecStoreMinimalTuple(key->params, tslot, false);
		slot_getallattrs(tslot);
		memcpy(pslot->tts_values, tslot->tts_values, sizeof(Datum) * numKeys);
		memcpy(pslot->tts_isnull, tslot->tts_isnull, sizeof(bool) * numKeys);
	}

	ExecStoreVirtualTuple(pslot);
}

/*
 * entry_purge_tuples
 *		Remove all tuples from the cache entry pointed to by 'entry'.  This
 *		leaves an empty cache entry.  Also, update the memory accounting to
 *		reflect the removal of the tuples.
 */
static inline void
entry_purge_tuples(MemoizeState *mstate, MemoizeEntry *entry)
{
	MemoizeTuple *tuple = entry->tuplehead;
	uint64		freed_mem = 0;

	while (tuple != NULL)
	{
		MemoizeTuple *next = tuple->next;

		freed_mem += CACHE_TUPLE_BYTES(tuple);

		/* Free memory used for this tuple */
		pfree(tuple->mintuple);
		pfree(tuple);

		tuple = next;
	}

	entry->complete = false;
	entry->tuplehead = NULL;

	/* Update the memory accounting */
	mstate->mem_used -= freed_mem;
}

/*
 * remove_cache_entry
 *		Remove 'entry' from the cache and free memory used by it.
 */
static void
remove_cache_entry(MemoizeState *mstate, MemoizeEntry *entry)
{
	MemoizeKey *key = entry->key;

	dlist_delete(&entry->key->lru_node);

	/* Remove all of the tuples from this entry */
	entry_purge_tuples(mstate, entry);

	/*
	 * Update memory accounting. entry_purge_tuples should have already
	 * subtracted the memory used for each cached tuple.  Here we just update
	 * the amount used by the entry itself.
	 */
	mstate->mem_used -= EMPTY_ENTRY_MEMORY_BYTES(entry);

	/* Remove the entry from the cache */
	memoize_delete_item(mstate->hashtable, entry);

	pfree(key->params);
	pfree(key);
}

/*
 * cache_purge_all
 *		Remove all items from the cache
 */
static void
cache_purge_all(MemoizeState *mstate)
{
	uint64		evictions = 0;

	if (mstate->hashtable != NULL)
		evictions = mstate->hashtable->members;

	/*
	 * Likely the most efficient way to remove all items is to just reset the
	 * memory context for the cache and then rebuild a fresh hash table.  This
	 * saves having to remove each item one by one and pfree each cached tuple
	 */
	MemoryContextReset(mstate->tableContext);

	/* NULLify so we recreate the table on the next call */
	mstate->hashtable = NULL;

	/* reset the LRU list */
	dlist_init(&mstate->lru_list);
	mstate->last_tuple = NULL;
	mstate->entry = NULL;

	mstate->mem_used = 0;

	/* XXX should we add something new to track these purges? */
	mstate->stats.cache_evictions += evictions; /* Update Stats */
}

/*
 * cache_reduce_memory
 *		Evict older and less recently used items from the cache in order to
 *		reduce the memory consumption back to something below the
 *		MemoizeState's mem_limit.
 *
 * 'specialkey', if not NULL, causes the function to return false if the entry
 * which the key belongs to is removed from the cache.
 */
static bool
cache_reduce_memory(MemoizeState *mstate, MemoizeKey *specialkey)
{
	bool		specialkey_intact = true;	/* for now */
	dlist_mutable_iter iter;
	uint64		evictions = 0;

	/* Update peak memory usage */
	if (mstate->mem_used > mstate->stats.mem_peak)
		mstate->stats.mem_peak = mstate->mem_used;

	/* We expect only to be called when we've gone over budget on memory */
	Assert(mstate->mem_used > mstate->mem_limit);

	/* Start the eviction process starting at the head of the LRU list. */
	dlist_foreach_modify(iter, &mstate->lru_list)
	{
		MemoizeKey *key = dlist_container(MemoizeKey, lru_node, iter.cur);
		MemoizeEntry *entry;

		/*
		 * Populate the hash probe slot in preparation for looking up this LRU
		 * entry.
		 */
		prepare_probe_slot(mstate, key);

		/*
		 * Ideally the LRU list pointers would be stored in the entry itself
		 * rather than in the key.  Unfortunately, we can't do that as the
		 * simplehash.h code may resize the table and allocate new memory for
		 * entries which would result in those pointers pointing to the old
		 * buckets.  However, it's fine to use the key to store this as that's
		 * only referenced by a pointer in the entry, which of course follows
		 * the entry whenever the hash table is resized.  Since we only have a
		 * pointer to the key here, we must perform a hash table lookup to
		 * find the entry that the key belongs to.
		 */
		entry = memoize_lookup(mstate->hashtable, NULL);

		/*
		 * Sanity check that we found the entry belonging to the LRU list
		 * item.  A misbehaving hash or equality function could cause the
		 * entry not to be found or the wrong entry to be found.
		 */
		if (unlikely(entry == NULL || entry->key != key))
			elog(ERROR, "could not find memoization table entry");

		/*
		 * If we're being called to free memory while the cache is being
		 * populated with new tuples, then we'd better take some care as we
		 * could end up freeing the entry which 'specialkey' belongs to.
		 * Generally callers will pass 'specialkey' as the key for the cache
		 * entry which is currently being populated, so we must set
		 * 'specialkey_intact' to false to inform the caller the specialkey
		 * entry has been removed.
		 */
		if (key == specialkey)
			specialkey_intact = false;

		/*
		 * Finally remove the entry.  This will remove from the LRU list too.
		 */
		remove_cache_entry(mstate, entry);

		evictions++;

		/* Exit if we've freed enough memory */
		if (mstate->mem_used <= mstate->mem_limit)
			break;
	}

	mstate->stats.cache_evictions += evictions; /* Update Stats */

	return specialkey_intact;
}

/*
 * cache_lookup
 *		Perform a lookup to see if we've already cached tuples based on the
 *		scan's current parameters.  If we find an existing entry we move it to
 *		the end of the LRU list, set *found to true then return it.  If we
 *		don't find an entry then we create a new one and add it to the end of
 *		the LRU list.  We also update cache memory accounting and remove older
 *		entries if we go over the memory budget.  If we managed to free enough
 *		memory we return the new entry, else we return NULL.
 *
 * Callers can assume we'll never return NULL when *found is true.
 */
static MemoizeEntry *
cache_lookup(MemoizeState *mstate, bool *found)
{
	MemoizeKey *key;
	MemoizeEntry *entry;
	MemoryContext oldcontext;

	/* prepare the probe slot with the current scan parameters */
	prepare_probe_slot(mstate, NULL);

	/*
	 * Add the new entry to the cache.  No need to pass a valid key since the
	 * hash function uses mstate's probeslot, which we populated above.
	 */
	entry = memoize_insert(mstate->hashtable, NULL, found);

	if (*found)
	{
		/*
		 * Move existing entry to the tail of the LRU list to mark it as the
		 * most recently used item.
		 */
		dlist_move_tail(&mstate->lru_list, &entry->key->lru_node);

		return entry;
	}

	oldcontext = MemoryContextSwitchTo(mstate->tableContext);

	/* Allocate a new key */
	entry->key = key = (MemoizeKey *) palloc(sizeof(MemoizeKey));
	key->params = ExecCopySlotMinimalTuple(mstate->probeslot);

	/* Update the total cache memory utilization */
	mstate->mem_used += EMPTY_ENTRY_MEMORY_BYTES(entry);

	/* Initialize this entry */
	entry->complete = false;
	entry->tuplehead = NULL;

	/*
	 * Since this is the most recently used entry, push this entry onto the
	 * end of the LRU list.
	 */
	dlist_push_tail(&mstate->lru_list, &entry->key->lru_node);

	mstate->last_tuple = NULL;

	MemoryContextSwitchTo(oldcontext);

	/*
	 * If we've gone over our memory budget, then we'll free up some space in
	 * the cache.
	 */
	if (mstate->mem_used > mstate->mem_limit)
	{
		/*
		 * Try to free up some memory.  It's highly unlikely that we'll fail
		 * to do so here since the entry we've just added is yet to contain
		 * any tuples and we're able to remove any other entry to reduce the
		 * memory consumption.
		 */
		if (unlikely(!cache_reduce_memory(mstate, key)))
			return NULL;

		/*
		 * The process of removing entries from the cache may have caused the
		 * code in simplehash.h to shuffle elements to earlier buckets in the
		 * hash table.  If it has, we'll need to find the entry again by
		 * performing a lookup.  Fortunately, we can detect if this has
		 * happened by seeing if the entry is still in use and that the key
		 * pointer matches our expected key.
		 */
		if (entry->status != memoize_SH_IN_USE || entry->key != key)
		{
			/*
			 * We need to repopulate the probeslot as lookups performed during
			 * the cache evictions above will have stored some other key.
			 */
			prepare_probe_slot(mstate, key);

			/* Re-find the newly added entry */
			entry = memoize_lookup(mstate->hashtable, NULL);
			Assert(entry != NULL);
		}
	}

	return entry;
}

/*
 * cache_store_tuple
 *		Add the tuple stored in 'slot' to the mstate's current cache entry.
 *		The cache entry must have already been made with cache_lookup().
 *		mstate's last_tuple field must point to the tail of mstate->entry's
 *		list of tuples.
 */
static bool
cache_store_tuple(MemoizeState *mstate, TupleTableSlot *slot)
{
	MemoizeTuple *tuple;
	MemoizeEntry *entry = mstate->entry;
	MemoryContext oldcontext;

	Assert(slot != NULL);
	Assert(entry != NULL);

	oldcontext = MemoryContextSwitchTo(mstate->tableContext);

	tuple = (MemoizeTuple *) palloc(sizeof(MemoizeTuple));
	tuple->mintuple = ExecCopySlotMinimalTuple(slot);
	tuple->next = NULL;

	/* Account for the memory we just consumed */
	mstate->mem_used += CACHE_TUPLE_BYTES(tuple);

	if (entry->tuplehead == NULL)
	{
		/*
		 * This is the first tuple for this entry, so just point the list head
		 * to it.
		 */
		entry->tuplehead = tuple;
	}
	else
	{
		/* push this tuple onto the tail of the list */
		mstate->last_tuple->next = tuple;
	}

	mstate->last_tuple = tuple;
	MemoryContextSwitchTo(oldcontext);

	/*
	 * If we've gone over our memory budget then free up some space in the
	 * cache.
	 */
	if (mstate->mem_used > mstate->mem_limit)
	{
		MemoizeKey *key = entry->key;

		if (!cache_reduce_memory(mstate, key))
			return false;

		/*
		 * The process of removing entries from the cache may have caused the
		 * code in simplehash.h to shuffle elements to earlier buckets in the
		 * hash table.  If it has, we'll need to find the entry again by
		 * performing a lookup.  Fortunately, we can detect if this has
		 * happened by seeing if the entry is still in use and that the key
		 * pointer matches our expected key.
		 */
		if (entry->status != memoize_SH_IN_USE || entry->key != key)
		{
			/*
			 * We need to repopulate the probeslot as lookups performed during
			 * the cache evictions above will have stored some other key.
			 */
			prepare_probe_slot(mstate, key);

			/* Re-find the entry */
			mstate->entry = entry = memoize_lookup(mstate->hashtable, NULL);
			Assert(entry != NULL);
		}
	}

	return true;
}

static TupleTableSlot *
ExecMemoize(PlanState *pstate)
{
	MemoizeState *node = castNode(MemoizeState, pstate);
	ExprContext *econtext = node->ss.ps.ps_ExprContext;
	PlanState  *outerNode;
	TupleTableSlot *slot;

	CHECK_FOR_INTERRUPTS();

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	ResetExprContext(econtext);

	switch (node->mstatus)
	{
		case MEMO_CACHE_LOOKUP:
			{
				MemoizeEntry *entry;
				TupleTableSlot *outerslot;
				bool		found;

				Assert(node->entry == NULL);

				/* first call? we'll need a hash table. */
				if (unlikely(node->hashtable == NULL))
					build_hash_table(node, ((Memoize *) pstate->plan)->est_entries);

				/*
				 * We're only ever in this state for the first call of the
				 * scan.  Here we have a look to see if we've already seen the
				 * current parameters before and if we have already cached a
				 * complete set of records that the outer plan will return for
				 * these parameters.
				 *
				 * When we find a valid cache entry, we'll return the first
				 * tuple from it. If not found, we'll create a cache entry and
				 * then try to fetch a tuple from the outer scan.  If we find
				 * one there, we'll try to cache it.
				 */

				/* see if we've got anything cached for the current parameters */
				entry = cache_lookup(node, &found);

				if (found && entry->complete)
				{
					node->stats.cache_hits += 1;	/* stats update */

					/*
					 * Set last_tuple and entry so that the state
					 * MEMO_CACHE_FETCH_NEXT_TUPLE can easily find the next
					 * tuple for these parameters.
					 */
					node->last_tuple = entry->tuplehead;
					node->entry = entry;

					/* Fetch the first cached tuple, if there is one */
					if (entry->tuplehead)
					{
						node->mstatus = MEMO_CACHE_FETCH_NEXT_TUPLE;

						slot = node->ss.ps.ps_ResultTupleSlot;
						ExecStoreMinimalTuple(entry->tuplehead->mintuple,
											  slot, false);

						return slot;
					}

					/* The cache entry is void of any tuples. */
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				/* Handle cache miss */
				node->stats.cache_misses += 1;	/* stats update */

				if (found)
				{
					/*
					 * A cache entry was found, but the scan for that entry
					 * did not run to completion.  We'll just remove all
					 * tuples and start again.  It might be tempting to
					 * continue where we left off, but there's no guarantee
					 * the outer node will produce the tuples in the same
					 * order as it did last time.
					 */
					entry_purge_tuples(node, entry);
				}

				/* Scan the outer node for a tuple to cache */
				outerNode = outerPlanState(node);
				outerslot = ExecProcNode(outerNode);
				if (TupIsNull(outerslot))
				{
					/*
					 * cache_lookup may have returned NULL due to failure to
					 * free enough cache space, so ensure we don't do anything
					 * here that assumes it worked. There's no need to go into
					 * bypass mode here as we're setting mstatus to end of
					 * scan.
					 */
					if (likely(entry))
						entry->complete = true;

					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				node->entry = entry;

				/*
				 * If we failed to create the entry or failed to store the
				 * tuple in the entry, then go into bypass mode.
				 */
				if (unlikely(entry == NULL ||
							 !cache_store_tuple(node, outerslot)))
				{
					node->stats.cache_overflows += 1;	/* stats update */

					node->mstatus = MEMO_CACHE_BYPASS_MODE;

					/*
					 * No need to clear out last_tuple as we'll stay in bypass
					 * mode until the end of the scan.
					 */
				}
				else
				{
					/*
					 * If we only expect a single row from this scan then we
					 * can mark that we're not expecting more.  This allows
					 * cache lookups to work even when the scan has not been
					 * executed to completion.
					 */
					entry->complete = node->singlerow;
					node->mstatus = MEMO_FILLING_CACHE;
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecCopySlot(slot, outerslot);
				return slot;
			}

		case MEMO_CACHE_FETCH_NEXT_TUPLE:
			{
				/* We shouldn't be in this state if these are not set */
				Assert(node->entry != NULL);
				Assert(node->last_tuple != NULL);

				/* Skip to the next tuple to output */
				node->last_tuple = node->last_tuple->next;

				/* No more tuples in the cache */
				if (node->last_tuple == NULL)
				{
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecStoreMinimalTuple(node->last_tuple->mintuple, slot,
									  false);

				return slot;
			}

		case MEMO_FILLING_CACHE:
			{
				TupleTableSlot *outerslot;
				MemoizeEntry *entry = node->entry;

				/* entry should already have been set by MEMO_CACHE_LOOKUP */
				Assert(entry != NULL);

				/*
				 * When in the MEMO_FILLING_CACHE state, we've just had a
				 * cache miss and are populating the cache with the current
				 * scan tuples.
				 */
				outerNode = outerPlanState(node);
				outerslot = ExecProcNode(outerNode);
				if (TupIsNull(outerslot))
				{
					/* No more tuples.  Mark it as complete */
					entry->complete = true;
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				/*
				 * Validate if the planner properly set the singlerow flag. It
				 * should only set that if each cache entry can, at most,
				 * return 1 row.
				 */
				if (unlikely(entry->complete))
					elog(ERROR, "cache entry already complete");

				/* Record the tuple in the current cache entry */
				if (unlikely(!cache_store_tuple(node, outerslot)))
				{
					/* Couldn't store it?  Handle overflow */
					node->stats.cache_overflows += 1;	/* stats update */

					node->mstatus = MEMO_CACHE_BYPASS_MODE;

					/*
					 * No need to clear out entry or last_tuple as we'll stay
					 * in bypass mode until the end of the scan.
					 */
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecCopySlot(slot, outerslot);
				return slot;
			}

		case MEMO_CACHE_BYPASS_MODE:
			{
				TupleTableSlot *outerslot;

				/*
				 * When in bypass mode we just continue to read tuples without
				 * caching.  We need to wait until the next rescan before we
				 * can come out of this mode.
				 */
				outerNode = outerPlanState(node);
				outerslot = ExecProcNode(outerNode);
				if (TupIsNull(outerslot))
				{
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecCopySlot(slot, outerslot);
				return slot;
			}

		case MEMO_END_OF_SCAN:

			/*
			 * We've already returned NULL for this scan, but just in case
			 * something calls us again by mistake.
			 */
			return NULL;

		default:
			elog(ERROR, "unrecognized memoize state: %d",
				 (int) node->mstatus);
			return NULL;
	}							/* switch */
}

MemoizeState *
ExecInitMemoize(Memoize *node, EState *estate, int eflags)
{
	MemoizeState *mstate = makeNode(MemoizeState);
	Plan	   *outerNode;
	int			i;
	int			nkeys;
	Oid		   *eqfuncoids;

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	mstate->ss.ps.plan = (Plan *) node;
	mstate->ss.ps.state = estate;
	mstate->ss.ps.ExecProcNode = ExecMemoize;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &mstate->ss.ps);

	outerNode = outerPlan(node);
	outerPlanState(mstate) = ExecInitNode(outerNode, estate, eflags);

	/*
	 * Initialize return slot and type. No need to initialize projection info
	 * because this node doesn't do projections.
	 */
	ExecInitResultTupleSlotTL(&mstate->ss.ps, &TTSOpsMinimalTuple);
	mstate->ss.ps.ps_ProjInfo = NULL;

	/*
	 * Initialize scan slot and type.
	 */
	ExecCreateScanSlotFromOuterPlan(estate, &mstate->ss, &TTSOpsMinimalTuple);

	/*
	 * Set the state machine to lookup the cache.  We won't find anything
	 * until we cache something, but this saves a special case to create the
	 * first entry.
	 */
	mstate->mstatus = MEMO_CACHE_LOOKUP;

	mstate->nkeys = nkeys = node->numKeys;
	mstate->hashkeydesc = ExecTypeFromExprList(node->param_exprs);
	mstate->tableslot = MakeSingleTupleTableSlot(mstate->hashkeydesc,
												 &TTSOpsMinimalTuple);
	mstate->probeslot = MakeSingleTupleTableSlot(mstate->hashkeydesc,
												 &TTSOpsVirtual);

	mstate->param_exprs = (ExprState **) palloc(nkeys * sizeof(ExprState *));
	mstate->collations = node->collations;	/* Just point directly to the plan
											 * data */
	mstate->hashfunctions = (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));

	eqfuncoids = palloc(nkeys * sizeof(Oid));

	for (i = 0; i < nkeys; i++)
	{
		Oid			hashop = node->hashOperators[i];
		Oid			left_hashfn;
		Oid			right_hashfn;
		Expr	   *param_expr = (Expr *) list_nth(node->param_exprs, i);

		if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
			elog(ERROR, "could not find hash function for hash operator %u",
				 hashop);

		fmgr_info(left_hashfn, &mstate->hashfunctions[i]);

		mstate->param_exprs[i] = ExecInitExpr(param_expr, (PlanState *) mstate);
		eqfuncoids[i] = get_opcode(hashop);
	}

	mstate->cache_eq_expr = ExecBuildParamSetEqual(mstate->hashkeydesc,
												   &TTSOpsMinimalTuple,
												   &TTSOpsVirtual,
												   eqfuncoids,
												   node->collations,
												   node->param_exprs,
												   (PlanState *) mstate);

	pfree(eqfuncoids);
	mstate->mem_used = 0;

	/* Limit the total memory consumed by the cache to this */
	mstate->mem_limit = get_hash_memory_limit();

	/* A memory context dedicated for the cache */
	mstate->tableContext = AllocSetContextCreate(CurrentMemoryContext,
												 "MemoizeHashTable",
												 ALLOCSET_DEFAULT_SIZES);

	dlist_init(&mstate->lru_list);
	mstate->last_tuple = NULL;
	mstate->entry = NULL;

	/*
	 * Mark if we can assume the cache entry is completed after we get the
	 * first record for it.  Some callers might not call us again after
	 * getting the first match. e.g. A join operator performing a unique join
	 * is able to skip to the next outer tuple after getting the first
	 * matching inner tuple.  In this case, the cache entry is complete after
	 * getting the first tuple.  This allows us to mark it as so.
	 */
	mstate->singlerow = node->singlerow;
	mstate->keyparamids = node->keyparamids;

	/*
	 * Record if the cache keys should be compared bit by bit, or logically
	 * using the type's hash equality operator
	 */
	mstate->binary_mode = node->binary_mode;

	/* Zero the statistics counters */
	memset(&mstate->stats, 0, sizeof(MemoizeInstrumentation));

	/*
	 * Because it may require a large allocation, we delay building of the
	 * hash table until executor run.
	 */
	mstate->hashtable = NULL;

	return mstate;
}

void
ExecEndMemoize(MemoizeState *node)
{
#ifdef USE_ASSERT_CHECKING
	/* Validate the memory accounting code is correct in assert builds. */
	if (node->hashtable != NULL)
	{
		int			count;
		uint64		mem = 0;
		memoize_iterator i;
		MemoizeEntry *entry;

		memoize_start_iterate(node->hashtable, &i);

		count = 0;
		while ((entry = memoize_iterate(node->hashtable, &i)) != NULL)
		{
			MemoizeTuple *tuple = entry->tuplehead;

			mem += EMPTY_ENTRY_MEMORY_BYTES(entry);
			while (tuple != NULL)
			{
				mem += CACHE_TUPLE_BYTES(tuple);
				tuple = tuple->next;
			}
			count++;
		}

		Assert(count == node->hashtable->members);
		Assert(mem == node->mem_used);
	}
#endif

	/*
	 * When ending a parallel worker, copy the statistics gathered by the
	 * worker back into shared memory so that it can be picked up by the main
	 * process to report in EXPLAIN ANALYZE.
	 */
	if (node->shared_info != NULL && IsParallelWorker())
	{
		MemoizeInstrumentation *si;

		/* Make mem_peak available for EXPLAIN */
		if (node->stats.mem_peak == 0)
			node->stats.mem_peak = node->mem_used;

		Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
		si = &node->shared_info->sinstrument[ParallelWorkerNumber];
		memcpy(si, &node->stats, sizeof(MemoizeInstrumentation));
	}

	/* Remove the cache context */
	MemoryContextDelete(node->tableContext);

	/*
	 * shut down the subplan
	 */
	ExecEndNode(outerPlanState(node));
}

void
ExecReScanMemoize(MemoizeState *node)
{
	PlanState  *outerPlan = outerPlanState(node);

	/* Mark that we must lookup the cache for a new set of parameters */
	node->mstatus = MEMO_CACHE_LOOKUP;

	/* nullify pointers used for the last scan */
	node->entry = NULL;
	node->last_tuple = NULL;

	/*
	 * if chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);

	/*
	 * Purge the entire cache if a parameter changed that is not part of the
	 * cache key.
	 */
	if (bms_nonempty_difference(outerPlan->chgParam, node->keyparamids))
		cache_purge_all(node);
}

/*
 * ExecEstimateCacheEntryOverheadBytes
 *		For use in the query planner to help it estimate the amount of memory
 *		required to store a single entry in the cache.
 */
double
ExecEstimateCacheEntryOverheadBytes(double ntuples)
{
	return sizeof(MemoizeEntry) + sizeof(MemoizeKey) + sizeof(MemoizeTuple) *
		ntuples;
}

/* ----------------------------------------------------------------
 *						Parallel Query Support
 * ----------------------------------------------------------------
 */

 /* ----------------------------------------------------------------
  *		ExecMemoizeEstimate
  *
  *		Estimate space required to propagate memoize statistics.
  * ----------------------------------------------------------------
  */
void
ExecMemoizeEstimate(MemoizeState *node, ParallelContext *pcxt)
{
	Size		size;

	/* don't need this if not instrumenting or no workers */
	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
		return;

	size = mul_size(pcxt->nworkers, sizeof(MemoizeInstrumentation));
	size = add_size(size, offsetof(SharedMemoizeInfo, sinstrument));
	shm_toc_estimate_chunk(&pcxt->estimator, size);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* ----------------------------------------------------------------
 *		ExecMemoizeInitializeDSM
 *
 *		Initialize DSM space for memoize statistics.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeInitializeDSM(MemoizeState *node, ParallelContext *pcxt)
{
	Size		size;

	/* don't need this if not instrumenting or no workers */
	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
		return;

	size = offsetof(SharedMemoizeInfo, sinstrument)
		+ pcxt->nworkers * sizeof(MemoizeInstrumentation);
	node->shared_info = shm_toc_allocate(pcxt->toc, size);
	/* ensure any unfilled slots will contain zeroes */
	memset(node->shared_info, 0, size);
	node->shared_info->num_workers = pcxt->nworkers;
	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
				   node->shared_info);
}

/* ----------------------------------------------------------------
 *		ExecMemoizeInitializeWorker
 *
 *		Attach worker to DSM space for memoize statistics.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeInitializeWorker(MemoizeState *node, ParallelWorkerContext *pwcxt)
{
	node->shared_info =
		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
}

/* ----------------------------------------------------------------
 *		ExecMemoizeRetrieveInstrumentation
 *
 *		Transfer memoize statistics from DSM to private memory.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeRetrieveInstrumentation(MemoizeState *node)
{
	Size		size;
	SharedMemoizeInfo *si;

	if (node->shared_info == NULL)
		return;

	size = offsetof(SharedMemoizeInfo, sinstrument)
		+ node->shared_info->num_workers * sizeof(MemoizeInstrumentation);
	si = palloc(size);
	memcpy(si, node->shared_info, size);
	node->shared_info = si;
}