diff options
Diffstat (limited to 'src/backend/utils/cache/typcache.c')
-rw-r--r-- | src/backend/utils/cache/typcache.c | 634 |
1 files changed, 602 insertions, 32 deletions
diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c index 2e633f08c51..3be853a85af 100644 --- a/src/backend/utils/cache/typcache.c +++ b/src/backend/utils/cache/typcache.c @@ -46,6 +46,8 @@ #include "access/heapam.h" #include "access/htup_details.h" #include "access/nbtree.h" +#include "access/parallel.h" +#include "access/session.h" #include "catalog/indexing.h" #include "catalog/pg_am.h" #include "catalog/pg_constraint.h" @@ -55,7 +57,9 @@ #include "catalog/pg_type.h" #include "commands/defrem.h" #include "executor/executor.h" +#include "lib/dshash.h" #include "optimizer/planner.h" +#include "storage/lwlock.h" #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/fmgroids.h" @@ -142,6 +146,117 @@ typedef struct RecordCacheEntry TupleDesc tupdesc; } RecordCacheEntry; +/* + * To deal with non-anonymous record types that are exchanged by backends + * involved in a parallel query, we also need a shared verion of the above. + */ +struct SharedRecordTypmodRegistry +{ + /* A hash table for finding a matching TupleDesc. */ + dshash_table_handle record_table_handle; + /* A hash table for finding a TupleDesc by typmod. */ + dshash_table_handle typmod_table_handle; + /* A source of new record typmod numbers. */ + pg_atomic_uint32 next_typmod; +}; + +/* + * When using shared tuple descriptors as hash table keys we need a way to be + * able to search for an equal shared TupleDesc using a backend-local + * TupleDesc. So we use this type which can hold either, and hash and compare + * functions that know how to handle both. + */ +typedef struct SharedRecordTableKey +{ + union + { + TupleDesc local_tupdesc; + dsa_pointer shared_tupdesc; + }; + bool shared; +} SharedRecordTableKey; + +/* + * The shared version of RecordCacheEntry. This lets us look up a typmod + * using a TupleDesc which may be in local or shared memory. + */ +typedef struct SharedRecordTableEntry +{ + SharedRecordTableKey key; +} SharedRecordTableEntry; + +/* + * An entry in SharedRecordTypmodRegistry's typmod table. This lets us look + * up a TupleDesc in shared memory using a typmod. + */ +typedef struct SharedTypmodTableEntry +{ + uint32 typmod; + dsa_pointer shared_tupdesc; +} SharedTypmodTableEntry; + +/* + * A comparator function for SharedTupleDescTableKey. + */ +static int +shared_record_table_compare(const void *a, const void *b, size_t size, + void *arg) +{ + dsa_area *area = (dsa_area *) arg; + SharedRecordTableKey *k1 = (SharedRecordTableKey *) a; + SharedRecordTableKey *k2 = (SharedRecordTableKey *) b; + TupleDesc t1; + TupleDesc t2; + + if (k1->shared) + t1 = (TupleDesc) dsa_get_address(area, k1->shared_tupdesc); + else + t1 = k1->local_tupdesc; + + if (k2->shared) + t2 = (TupleDesc) dsa_get_address(area, k2->shared_tupdesc); + else + t2 = k2->local_tupdesc; + + return equalTupleDescs(t1, t2) ? 0 : 1; +} + +/* + * A hash function for SharedRecordTableKey. + */ +static uint32 +shared_record_table_hash(const void *a, size_t size, void *arg) +{ + dsa_area *area = (dsa_area *) arg; + SharedRecordTableKey *k = (SharedRecordTableKey *) a; + TupleDesc t; + + if (k->shared) + t = (TupleDesc) dsa_get_address(area, k->shared_tupdesc); + else + t = k->local_tupdesc; + + return hashTupleDesc(t); +} + +/* Parameters for SharedRecordTypmodRegistry's TupleDesc table. */ +static const dshash_parameters srtr_record_table_params = { + sizeof(SharedRecordTableKey), /* unused */ + sizeof(SharedRecordTableEntry), + shared_record_table_compare, + shared_record_table_hash, + LWTRANCHE_SESSION_RECORD_TABLE +}; + +/* Parameters for SharedRecordTypmodRegistry's typmod hash table. */ +static const dshash_parameters srtr_typmod_table_params = { + sizeof(uint32), + sizeof(SharedTypmodTableEntry), + dshash_memcmp, + dshash_memhash, + LWTRANCHE_SESSION_TYPMOD_TABLE +}; + static HTAB *RecordCacheHash = NULL; static TupleDesc *RecordCacheArray = NULL; @@ -168,6 +283,13 @@ static void TypeCacheConstrCallback(Datum arg, int cacheid, uint32 hashvalue); static void load_enum_cache_data(TypeCacheEntry *tcache); static EnumItem *find_enumitem(TypeCacheEnumData *enumdata, Oid arg); static int enum_oid_cmp(const void *left, const void *right); +static void shared_record_typmod_registry_detach(dsm_segment *segment, + Datum datum); +static void shared_record_typmod_registry_worker_detach(dsm_segment *segment, + Datum datum); +static TupleDesc find_or_make_matching_shared_tupledesc(TupleDesc tupdesc); +static dsa_pointer share_tupledesc(dsa_area *area, TupleDesc tupdesc, + uint32 typmod); /* @@ -377,8 +499,8 @@ lookup_type_cache(Oid type_id, int flags) /* * Reset info about hash functions whenever we pick up new info about - * equality operator. This is so we can ensure that the hash functions - * match the operator. + * equality operator. This is so we can ensure that the hash + * functions match the operator. */ typentry->flags &= ~(TCFLAGS_CHECKED_HASH_PROC); typentry->flags &= ~(TCFLAGS_CHECKED_HASH_EXTENDED_PROC); @@ -1243,6 +1365,33 @@ cache_record_field_properties(TypeCacheEntry *typentry) typentry->flags |= TCFLAGS_CHECKED_FIELD_PROPERTIES; } +/* + * Make sure that RecordCacheArray is large enough to store 'typmod'. + */ +static void +ensure_record_cache_typmod_slot_exists(int32 typmod) +{ + if (RecordCacheArray == NULL) + { + RecordCacheArray = (TupleDesc *) + MemoryContextAllocZero(CacheMemoryContext, 64 * sizeof(TupleDesc)); + RecordCacheArrayLen = 64; + } + + if (typmod >= RecordCacheArrayLen) + { + int32 newlen = RecordCacheArrayLen * 2; + + while (typmod >= newlen) + newlen *= 2; + + RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray, + newlen * sizeof(TupleDesc)); + memset(RecordCacheArray + RecordCacheArrayLen, 0, + (newlen - RecordCacheArrayLen) * sizeof(TupleDesc *)); + RecordCacheArrayLen = newlen; + } +} /* * lookup_rowtype_tupdesc_internal --- internal routine to lookup a rowtype @@ -1273,15 +1422,53 @@ lookup_rowtype_tupdesc_internal(Oid type_id, int32 typmod, bool noError) /* * It's a transient record type, so look in our record-type table. */ - if (typmod < 0 || typmod >= NextRecordTypmod) + if (typmod >= 0) { - if (!noError) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("record type has not been registered"))); - return NULL; + /* It is already in our local cache? */ + if (typmod < RecordCacheArrayLen && + RecordCacheArray[typmod] != NULL) + return RecordCacheArray[typmod]; + + /* Are we attached to a shared record typmod registry? */ + if (CurrentSession->shared_typmod_registry != NULL) + { + SharedTypmodTableEntry *entry; + + /* Try to find it in the shared typmod index. */ + entry = dshash_find(CurrentSession->shared_typmod_table, + &typmod, false); + if (entry != NULL) + { + TupleDesc tupdesc; + + tupdesc = (TupleDesc) + dsa_get_address(CurrentSession->area, + entry->shared_tupdesc); + Assert(typmod == tupdesc->tdtypmod); + + /* We may need to extend the local RecordCacheArray. */ + ensure_record_cache_typmod_slot_exists(typmod); + + /* + * Our local array can now point directly to the TupleDesc + * in shared memory. + */ + RecordCacheArray[typmod] = tupdesc; + Assert(tupdesc->tdrefcount == -1); + + dshash_release_lock(CurrentSession->shared_typmod_table, + entry); + + return RecordCacheArray[typmod]; + } + } } - return RecordCacheArray[typmod]; + + if (!noError) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("record type has not been registered"))); + return NULL; } } @@ -1303,7 +1490,7 @@ lookup_rowtype_tupdesc(Oid type_id, int32 typmod) TupleDesc tupDesc; tupDesc = lookup_rowtype_tupdesc_internal(type_id, typmod, false); - IncrTupleDescRefCount(tupDesc); + PinTupleDesc(tupDesc); return tupDesc; } @@ -1321,7 +1508,7 @@ lookup_rowtype_tupdesc_noerror(Oid type_id, int32 typmod, bool noError) tupDesc = lookup_rowtype_tupdesc_internal(type_id, typmod, noError); if (tupDesc != NULL) - IncrTupleDescRefCount(tupDesc); + PinTupleDesc(tupDesc); return tupDesc; } @@ -1376,7 +1563,6 @@ assign_record_type_typmod(TupleDesc tupDesc) RecordCacheEntry *recentry; TupleDesc entDesc; bool found; - int32 newtypmod; MemoryContext oldcxt; Assert(tupDesc->tdtypeid == RECORDOID); @@ -1414,34 +1600,208 @@ assign_record_type_typmod(TupleDesc tupDesc) recentry->tupdesc = NULL; oldcxt = MemoryContextSwitchTo(CacheMemoryContext); - if (RecordCacheArray == NULL) + /* Look in the SharedRecordTypmodRegistry, if attached */ + entDesc = find_or_make_matching_shared_tupledesc(tupDesc); + if (entDesc == NULL) { - RecordCacheArray = (TupleDesc *) palloc(64 * sizeof(TupleDesc)); - RecordCacheArrayLen = 64; + /* Reference-counted local cache only. */ + entDesc = CreateTupleDescCopy(tupDesc); + entDesc->tdrefcount = 1; + entDesc->tdtypmod = NextRecordTypmod++; } - else if (NextRecordTypmod >= RecordCacheArrayLen) + ensure_record_cache_typmod_slot_exists(entDesc->tdtypmod); + RecordCacheArray[entDesc->tdtypmod] = entDesc; + recentry->tupdesc = entDesc; + + /* Update the caller's tuple descriptor. */ + tupDesc->tdtypmod = entDesc->tdtypmod; + + MemoryContextSwitchTo(oldcxt); +} + +/* + * Return the amout of shmem required to hold a SharedRecordTypmodRegistry. + * This exists only to avoid exposing private innards of + * SharedRecordTypmodRegistry in a header. + */ +size_t +SharedRecordTypmodRegistryEstimate(void) +{ + return sizeof(SharedRecordTypmodRegistry); +} + +/* + * Initialize 'registry' in a pre-existing shared memory region, which must be + * maximally aligned and have space for SharedRecordTypmodRegistryEstimate() + * bytes. + * + * 'area' will be used to allocate shared memory space as required for the + * typemod registration. The current process, expected to be a leader process + * in a parallel query, will be attached automatically and its current record + * types will be loaded into *registry. While attached, all calls to + * assign_record_type_typmod will use the shared registry. Worker backends + * will need to attach explicitly. + * + * Note that this function takes 'area' and 'segment' as arguments rather than + * accessing them via CurrentSession, because they aren't installed there + * until after this function runs. + */ +void +SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *registry, + dsm_segment *segment, + dsa_area *area) +{ + MemoryContext old_context; + dshash_table *record_table; + dshash_table *typmod_table; + int32 typmod; + + Assert(!IsParallelWorker()); + + /* We can't already be attached to a shared registry. */ + Assert(CurrentSession->shared_typmod_registry == NULL); + Assert(CurrentSession->shared_record_table == NULL); + Assert(CurrentSession->shared_typmod_table == NULL); + + old_context = MemoryContextSwitchTo(TopMemoryContext); + + /* Create the hash table of tuple descriptors indexed by themselves. */ + record_table = dshash_create(area, &srtr_record_table_params, area); + + /* Create the hash table of tuple descriptors indexed by typmod. */ + typmod_table = dshash_create(area, &srtr_typmod_table_params, NULL); + + MemoryContextSwitchTo(old_context); + + /* Initialize the SharedRecordTypmodRegistry. */ + registry->record_table_handle = dshash_get_hash_table_handle(record_table); + registry->typmod_table_handle = dshash_get_hash_table_handle(typmod_table); + pg_atomic_init_u32(®istry->next_typmod, NextRecordTypmod); + + /* + * Copy all entries from this backend's private registry into the shared + * registry. + */ + for (typmod = 0; typmod < NextRecordTypmod; ++typmod) { - int32 newlen = RecordCacheArrayLen * 2; + SharedTypmodTableEntry *typmod_table_entry; + SharedRecordTableEntry *record_table_entry; + SharedRecordTableKey record_table_key; + dsa_pointer shared_dp; + TupleDesc tupdesc; + bool found; - RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray, - newlen * sizeof(TupleDesc)); - RecordCacheArrayLen = newlen; + tupdesc = RecordCacheArray[typmod]; + if (tupdesc == NULL) + continue; + + /* Copy the TupleDesc into shared memory. */ + shared_dp = share_tupledesc(area, tupdesc, typmod); + + /* Insert into the typmod table. */ + typmod_table_entry = dshash_find_or_insert(typmod_table, + &tupdesc->tdtypmod, + &found); + if (found) + elog(ERROR, "cannot create duplicate shared record typmod"); + typmod_table_entry->typmod = tupdesc->tdtypmod; + typmod_table_entry->shared_tupdesc = shared_dp; + dshash_release_lock(typmod_table, typmod_table_entry); + + /* Insert into the record table. */ + record_table_key.shared = false; + record_table_key.local_tupdesc = tupdesc; + record_table_entry = dshash_find_or_insert(record_table, + &record_table_key, + &found); + if (!found) + { + record_table_entry->key.shared = true; + record_table_entry->key.shared_tupdesc = shared_dp; + } + dshash_release_lock(record_table, record_table_entry); } - /* if fail in subrs, no damage except possibly some wasted memory... */ - entDesc = CreateTupleDescCopy(tupDesc); - recentry->tupdesc = entDesc; - /* mark it as a reference-counted tupdesc */ - entDesc->tdrefcount = 1; - /* now it's safe to advance NextRecordTypmod */ - newtypmod = NextRecordTypmod++; - entDesc->tdtypmod = newtypmod; - RecordCacheArray[newtypmod] = entDesc; + /* + * Set up the global state that will tell assign_record_type_typmod and + * lookup_rowtype_tupdesc_internal about the shared registry. + */ + CurrentSession->shared_record_table = record_table; + CurrentSession->shared_typmod_table = typmod_table; + CurrentSession->shared_typmod_registry = registry; - /* report to caller as well */ - tupDesc->tdtypmod = newtypmod; + /* + * We install a detach hook in the leader, but only to handle cleanup on + * failure during GetSessionDsmHandle(). Once GetSessionDsmHandle() pins + * the memory, the leader process will use a shared registry until it + * exits. + */ + on_dsm_detach(segment, shared_record_typmod_registry_detach, (Datum) 0); +} - MemoryContextSwitchTo(oldcxt); +/* + * Attach to 'registry', which must have been initialized already by another + * backend. Future calls to assign_record_type_typmod and + * lookup_rowtype_tupdesc_internal will use the shared registry until the + * current session is detached. + */ +void +SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *registry) +{ + MemoryContext old_context; + dshash_table *record_table; + dshash_table *typmod_table; + + Assert(IsParallelWorker()); + + /* We can't already be attached to a shared registry. */ + Assert(CurrentSession != NULL); + Assert(CurrentSession->segment != NULL); + Assert(CurrentSession->area != NULL); + Assert(CurrentSession->shared_typmod_registry == NULL); + Assert(CurrentSession->shared_record_table == NULL); + Assert(CurrentSession->shared_typmod_table == NULL); + + /* + * We can't already have typmods in our local cache, because they'd clash + * with those imported by SharedRecordTypmodRegistryInit. This should be + * a freshly started parallel worker. If we ever support worker + * recycling, a worker would need to zap its local cache in between + * servicing different queries, in order to be able to call this and + * synchronize typmods with a new leader; see + * shared_record_typmod_registry_detach(). + */ + Assert(NextRecordTypmod == 0); + + old_context = MemoryContextSwitchTo(TopMemoryContext); + + /* Attach to the two hash tables. */ + record_table = dshash_attach(CurrentSession->area, + &srtr_record_table_params, + registry->record_table_handle, + CurrentSession->area); + typmod_table = dshash_attach(CurrentSession->area, + &srtr_typmod_table_params, + registry->typmod_table_handle, + NULL); + + MemoryContextSwitchTo(old_context); + + /* + * We install a different detach callback that performs a more complete + * reset of backend local state. + */ + on_dsm_detach(CurrentSession->segment, + shared_record_typmod_registry_worker_detach, + PointerGetDatum(registry)); + + /* + * Set up the session state that will tell assign_record_type_typmod and + * lookup_rowtype_tupdesc_internal about the shared registry. + */ + CurrentSession->shared_typmod_registry = registry; + CurrentSession->shared_record_table = record_table; + CurrentSession->shared_typmod_table = typmod_table; } /* @@ -1858,3 +2218,213 @@ enum_oid_cmp(const void *left, const void *right) else return 0; } + +/* + * Copy 'tupdesc' into newly allocated shared memory in 'area', set its typmod + * to the given value and return a dsa_pointer. + */ +static dsa_pointer +share_tupledesc(dsa_area *area, TupleDesc tupdesc, uint32 typmod) +{ + dsa_pointer shared_dp; + TupleDesc shared; + + shared_dp = dsa_allocate(area, TupleDescSize(tupdesc)); + shared = (TupleDesc) dsa_get_address(area, shared_dp); + TupleDescCopy(shared, tupdesc); + shared->tdtypmod = typmod; + + return shared_dp; +} + +/* + * If we are attached to a SharedRecordTypmodRegistry, use it to find or + * create a shared TupleDesc that matches 'tupdesc'. Otherwise return NULL. + * Tuple descriptors returned by this function are not reference counted, and + * will exist at least as long as the current backend remained attached to the + * current session. + */ +static TupleDesc +find_or_make_matching_shared_tupledesc(TupleDesc tupdesc) +{ + TupleDesc result; + SharedRecordTableKey key; + SharedRecordTableEntry *record_table_entry; + SharedTypmodTableEntry *typmod_table_entry; + dsa_pointer shared_dp; + bool found; + uint32 typmod; + + /* If not even attached, nothing to do. */ + if (CurrentSession->shared_typmod_registry == NULL) + return NULL; + + /* Try to find a matching tuple descriptor in the record table. */ + key.shared = false; + key.local_tupdesc = tupdesc; + record_table_entry = (SharedRecordTableEntry *) + dshash_find(CurrentSession->shared_record_table, &key, false); + if (record_table_entry) + { + Assert(record_table_entry->key.shared); + dshash_release_lock(CurrentSession->shared_record_table, + record_table_entry); + result = (TupleDesc) + dsa_get_address(CurrentSession->area, + record_table_entry->key.shared_tupdesc); + Assert(result->tdrefcount == -1); + + return result; + } + + /* Allocate a new typmod number. This will be wasted if we error out. */ + typmod = (int) + pg_atomic_fetch_add_u32(&CurrentSession->shared_typmod_registry->next_typmod, + 1); + + /* Copy the TupleDesc into shared memory. */ + shared_dp = share_tupledesc(CurrentSession->area, tupdesc, typmod); + + /* + * Create an entry in the typmod table so that others will understand this + * typmod number. + */ + PG_TRY(); + { + typmod_table_entry = (SharedTypmodTableEntry *) + dshash_find_or_insert(CurrentSession->shared_typmod_table, + &typmod, &found); + if (found) + elog(ERROR, "cannot create duplicate shared record typmod"); + } + PG_CATCH(); + { + dsa_free(CurrentSession->area, shared_dp); + PG_RE_THROW(); + } + PG_END_TRY(); + typmod_table_entry->typmod = typmod; + typmod_table_entry->shared_tupdesc = shared_dp; + dshash_release_lock(CurrentSession->shared_typmod_table, + typmod_table_entry); + + /* + * Finally create an entry in the record table so others with matching + * tuple descriptors can reuse the typmod. + */ + record_table_entry = (SharedRecordTableEntry *) + dshash_find_or_insert(CurrentSession->shared_record_table, &key, + &found); + if (found) + { + /* + * Someone concurrently inserted a matching tuple descriptor since the + * first time we checked. Use that one instead. + */ + dshash_release_lock(CurrentSession->shared_record_table, + record_table_entry); + + /* Might as well free up the space used by the one we created. */ + found = dshash_delete_key(CurrentSession->shared_typmod_table, + &typmod); + Assert(found); + dsa_free(CurrentSession->area, shared_dp); + + /* Return the one we found. */ + Assert(record_table_entry->key.shared); + result = (TupleDesc) + dsa_get_address(CurrentSession->area, + record_table_entry->key.shared); + Assert(result->tdrefcount == -1); + + return result; + } + + /* Store it and return it. */ + record_table_entry->key.shared = true; + record_table_entry->key.shared_tupdesc = shared_dp; + dshash_release_lock(CurrentSession->shared_record_table, + record_table_entry); + result = (TupleDesc) + dsa_get_address(CurrentSession->area, shared_dp); + Assert(result->tdrefcount == -1); + + return result; +} + +/* + * Detach hook to forget about the current shared record typmod + * infrastructure. This is registered directly in leader backends, and + * reached only in case of error or shutdown. It's also reached indirectly + * via the worker detach callback below. + */ +static void +shared_record_typmod_registry_detach(dsm_segment *segment, Datum datum) +{ + /* Be cautious here: maybe we didn't finish initializing. */ + if (CurrentSession->shared_record_table != NULL) + { + dshash_detach(CurrentSession->shared_record_table); + CurrentSession->shared_record_table = NULL; + } + if (CurrentSession->shared_typmod_table != NULL) + { + dshash_detach(CurrentSession->shared_typmod_table); + CurrentSession->shared_typmod_table = NULL; + } + CurrentSession->shared_typmod_registry = NULL; +} + +/* + * Deatch hook allowing workers to disconnect from shared record typmod + * registry. The resulting state should allow a worker to attach to a + * different leader, if worker reuse pools are invented. + */ +static void +shared_record_typmod_registry_worker_detach(dsm_segment *segment, Datum datum) +{ + /* + * Forget everything we learned about record typmods as part of the + * session we are disconnecting from, and return to the initial state. + */ + if (RecordCacheArray != NULL) + { + int32 i; + + for (i = 0; i < RecordCacheArrayLen; ++i) + { + if (RecordCacheArray[i] != NULL) + { + TupleDesc tupdesc = RecordCacheArray[i]; + + /* + * Pointers to tuple descriptors in shared memory are not + * reference counted, so we are not responsible for freeing + * them. They'll survive as long as the shared session + * exists, which should be as long as the owning leader + * backend exists. In theory we do need to free local + * reference counted tuple descriptors however, and we can't + * do that with DescTupleDescRefCount() because we aren't + * using a resource owner. In practice we don't expect to + * find any non-shared TupleDesc object in a worker. + */ + if (tupdesc->tdrefcount != -1) + { + Assert(tupdesc->tdrefcount > 0); + if (--tupdesc->tdrefcount == 0) + FreeTupleDesc(tupdesc); + } + } + } + pfree(RecordCacheArray); + RecordCacheArray = NULL; + } + if (RecordCacheHash != NULL) + { + hash_destroy(RecordCacheHash); + RecordCacheHash = NULL; + } + NextRecordTypmod = 0; + /* Call the code common to leader and worker detach. */ + shared_record_typmod_registry_detach(segment, datum); +} |