aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils/misc/injection_point.c
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2024-07-15 10:21:16 +0300
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2024-07-15 10:22:11 +0300
commit86db52a5062a77c6ee533586be2cef8672a20c7d (patch)
treecf438b1434cebba79bfc83094b65be6942b2c0f3 /src/backend/utils/misc/injection_point.c
parent4e5d6c40914e2b890c4780d33de8ad07b1d79689 (diff)
downloadpostgresql-86db52a5062a77c6ee533586be2cef8672a20c7d.tar.gz
postgresql-86db52a5062a77c6ee533586be2cef8672a20c7d.zip
Use atomics to avoid locking in InjectionPointRun()
This allows using injection points without having a PGPROC, like early at backend startup, or in the postmaster. The injection points facility is new in v17, so backpatch there. Reviewed-by: Michael Paquier <michael@paquier.xyz> Disussion: https://www.postgresql.org/message-id/4317a7f7-8d24-435e-9e49-29b72a3dc418@iki.fi
Diffstat (limited to 'src/backend/utils/misc/injection_point.c')
-rw-r--r--src/backend/utils/misc/injection_point.c404
1 files changed, 280 insertions, 124 deletions
diff --git a/src/backend/utils/misc/injection_point.c b/src/backend/utils/misc/injection_point.c
index 48f29e9b60a..84ad5e470d7 100644
--- a/src/backend/utils/misc/injection_point.c
+++ b/src/backend/utils/misc/injection_point.c
@@ -21,7 +21,6 @@
#include "fmgr.h"
#include "miscadmin.h"
-#include "port/pg_bitutils.h"
#include "storage/fd.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
@@ -31,22 +30,35 @@
#ifdef USE_INJECTION_POINTS
-/*
- * Hash table for storing injection points.
- *
- * InjectionPointHash is used to find an injection point by name.
- */
-static HTAB *InjectionPointHash; /* find points from names */
-
/* Field sizes */
#define INJ_NAME_MAXLEN 64
#define INJ_LIB_MAXLEN 128
#define INJ_FUNC_MAXLEN 128
#define INJ_PRIVATE_MAXLEN 1024
-/* Single injection point stored in InjectionPointHash */
+/* Single injection point stored in shared memory */
typedef struct InjectionPointEntry
{
+ /*
+ * Because injection points need to be usable without LWLocks, we use a
+ * generation counter on each entry to allow safe, lock-free reading.
+ *
+ * To read an entry, first read the current 'generation' value. If it's
+ * even, then the slot is currently unused, and odd means it's in use.
+ * When reading the other fields, beware that they may change while
+ * reading them, if the entry is released and reused! After reading the
+ * other fields, read 'generation' again: if its value hasn't changed, you
+ * can be certain that the other fields you read are valid. Otherwise,
+ * the slot was concurrently recycled, and you should ignore it.
+ *
+ * When adding an entry, you must store all the other fields first, and
+ * then update the generation number, with an appropriate memory barrier
+ * in between. In addition to that protocol, you must also hold
+ * InjectionPointLock, to prevent two backends from modifying the array at
+ * the same time.
+ */
+ pg_atomic_uint64 generation;
+
char name[INJ_NAME_MAXLEN]; /* hash key */
char library[INJ_LIB_MAXLEN]; /* library */
char function[INJ_FUNC_MAXLEN]; /* function */
@@ -58,8 +70,22 @@ typedef struct InjectionPointEntry
char private_data[INJ_PRIVATE_MAXLEN];
} InjectionPointEntry;
-#define INJECTION_POINT_HASH_INIT_SIZE 16
-#define INJECTION_POINT_HASH_MAX_SIZE 128
+#define MAX_INJECTION_POINTS 128
+
+/*
+ * Shared memory array of active injection points.
+ *
+ * 'max_inuse' is the highest index currently in use, plus one. It's just an
+ * optimization to avoid scanning through the whole entry, in the common case
+ * that there are no injection points, or only a few.
+ */
+typedef struct InjectionPointsCtl
+{
+ pg_atomic_uint32 max_inuse;
+ InjectionPointEntry entries[MAX_INJECTION_POINTS];
+} InjectionPointsCtl;
+
+static InjectionPointsCtl *ActiveInjectionPoints;
/*
* Backend local cache of injection callbacks already loaded, stored in
@@ -70,6 +96,14 @@ typedef struct InjectionPointCacheEntry
char name[INJ_NAME_MAXLEN];
char private_data[INJ_PRIVATE_MAXLEN];
InjectionPointCallback callback;
+
+ /*
+ * Shmem slot and copy of its generation number when this cache entry was
+ * created. They can be used to validate if the cached entry is still
+ * valid.
+ */
+ int slot_idx;
+ uint64 generation;
} InjectionPointCacheEntry;
static HTAB *InjectionPointCache = NULL;
@@ -79,8 +113,10 @@ static HTAB *InjectionPointCache = NULL;
*
* Add an injection point to the local cache.
*/
-static void
+static InjectionPointCacheEntry *
injection_point_cache_add(const char *name,
+ int slot_idx,
+ uint64 generation,
InjectionPointCallback callback,
const void *private_data)
{
@@ -97,7 +133,7 @@ injection_point_cache_add(const char *name,
hash_ctl.hcxt = TopMemoryContext;
InjectionPointCache = hash_create("InjectionPoint cache hash",
- INJECTION_POINT_HASH_MAX_SIZE,
+ MAX_INJECTION_POINTS,
&hash_ctl,
HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
}
@@ -107,9 +143,12 @@ injection_point_cache_add(const char *name,
Assert(!found);
strlcpy(entry->name, name, sizeof(entry->name));
+ entry->slot_idx = slot_idx;
+ entry->generation = generation;
entry->callback = callback;
- if (private_data != NULL)
- memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN);
+ memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN);
+
+ return entry;
}
/*
@@ -122,11 +161,10 @@ injection_point_cache_add(const char *name,
static void
injection_point_cache_remove(const char *name)
{
- /* leave if no cache */
- if (InjectionPointCache == NULL)
- return;
+ bool found PG_USED_FOR_ASSERTS_ONLY;
- (void) hash_search(InjectionPointCache, name, HASH_REMOVE, NULL);
+ (void) hash_search(InjectionPointCache, name, HASH_REMOVE, &found);
+ Assert(found);
}
/*
@@ -134,29 +172,32 @@ injection_point_cache_remove(const char *name)
*
* Load an injection point into the local cache.
*/
-static void
-injection_point_cache_load(InjectionPointEntry *entry_by_name)
+static InjectionPointCacheEntry *
+injection_point_cache_load(InjectionPointEntry *entry, int slot_idx, uint64 generation)
{
char path[MAXPGPATH];
void *injection_callback_local;
snprintf(path, MAXPGPATH, "%s/%s%s", pkglib_path,
- entry_by_name->library, DLSUFFIX);
+ entry->library, DLSUFFIX);
if (!pg_file_exists(path))
elog(ERROR, "could not find library \"%s\" for injection point \"%s\"",
- path, entry_by_name->name);
+ path, entry->name);
injection_callback_local = (void *)
- load_external_function(path, entry_by_name->function, false, NULL);
+ load_external_function(path, entry->function, false, NULL);
if (injection_callback_local == NULL)
elog(ERROR, "could not find function \"%s\" in library \"%s\" for injection point \"%s\"",
- entry_by_name->function, path, entry_by_name->name);
-
- /* add it to the local cache when found */
- injection_point_cache_add(entry_by_name->name, injection_callback_local,
- entry_by_name->private_data);
+ entry->function, path, entry->name);
+
+ /* add it to the local cache */
+ return injection_point_cache_add(entry->name,
+ slot_idx,
+ generation,
+ injection_callback_local,
+ entry->private_data);
}
/*
@@ -193,8 +234,7 @@ InjectionPointShmemSize(void)
#ifdef USE_INJECTION_POINTS
Size sz = 0;
- sz = add_size(sz, hash_estimate_size(INJECTION_POINT_HASH_MAX_SIZE,
- sizeof(InjectionPointEntry)));
+ sz = add_size(sz, sizeof(InjectionPointsCtl));
return sz;
#else
return 0;
@@ -208,16 +248,20 @@ void
InjectionPointShmemInit(void)
{
#ifdef USE_INJECTION_POINTS
- HASHCTL info;
-
- /* key is a NULL-terminated string */
- info.keysize = sizeof(char[INJ_NAME_MAXLEN]);
- info.entrysize = sizeof(InjectionPointEntry);
- InjectionPointHash = ShmemInitHash("InjectionPoint hash",
- INJECTION_POINT_HASH_INIT_SIZE,
- INJECTION_POINT_HASH_MAX_SIZE,
- &info,
- HASH_ELEM | HASH_FIXED_SIZE | HASH_STRINGS);
+ bool found;
+
+ ActiveInjectionPoints = ShmemInitStruct("InjectionPoint hash",
+ sizeof(InjectionPointsCtl),
+ &found);
+ if (!IsUnderPostmaster)
+ {
+ Assert(!found);
+ pg_atomic_init_u32(&ActiveInjectionPoints->max_inuse, 0);
+ for (int i = 0; i < MAX_INJECTION_POINTS; i++)
+ pg_atomic_init_u64(&ActiveInjectionPoints->entries[i].generation, 0);
+ }
+ else
+ Assert(found);
#endif
}
@@ -232,8 +276,10 @@ InjectionPointAttach(const char *name,
int private_data_size)
{
#ifdef USE_INJECTION_POINTS
- InjectionPointEntry *entry_by_name;
- bool found;
+ InjectionPointEntry *entry;
+ uint64 generation;
+ uint32 max_inuse;
+ int free_idx;
if (strlen(name) >= INJ_NAME_MAXLEN)
elog(ERROR, "injection point name %s too long (maximum of %u)",
@@ -253,21 +299,51 @@ InjectionPointAttach(const char *name,
* exist. For testing purposes this should be fine.
*/
LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE);
- entry_by_name = (InjectionPointEntry *)
- hash_search(InjectionPointHash, name,
- HASH_ENTER, &found);
- if (found)
- elog(ERROR, "injection point \"%s\" already defined", name);
+ max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+ free_idx = -1;
+
+ for (int idx = 0; idx < max_inuse; idx++)
+ {
+ entry = &ActiveInjectionPoints->entries[idx];
+ generation = pg_atomic_read_u64(&entry->generation);
+ if (generation % 2 == 0)
+ {
+ /*
+ * Found a free slot where we can add the new entry, but keep
+ * going so that we will find out if the entry already exists.
+ */
+ if (free_idx == -1)
+ free_idx = idx;
+ }
+
+ if (strcmp(entry->name, name) == 0)
+ elog(ERROR, "injection point \"%s\" already defined", name);
+ }
+ if (free_idx == -1)
+ {
+ if (max_inuse == MAX_INJECTION_POINTS)
+ elog(ERROR, "too many injection points");
+ free_idx = max_inuse;
+ }
+ entry = &ActiveInjectionPoints->entries[free_idx];
+ generation = pg_atomic_read_u64(&entry->generation);
+ Assert(generation % 2 == 0);
/* Save the entry */
- strlcpy(entry_by_name->name, name, sizeof(entry_by_name->name));
- entry_by_name->name[INJ_NAME_MAXLEN - 1] = '\0';
- strlcpy(entry_by_name->library, library, sizeof(entry_by_name->library));
- entry_by_name->library[INJ_LIB_MAXLEN - 1] = '\0';
- strlcpy(entry_by_name->function, function, sizeof(entry_by_name->function));
- entry_by_name->function[INJ_FUNC_MAXLEN - 1] = '\0';
+ strlcpy(entry->name, name, sizeof(entry->name));
+ entry->name[INJ_NAME_MAXLEN - 1] = '\0';
+ strlcpy(entry->library, library, sizeof(entry->library));
+ entry->library[INJ_LIB_MAXLEN - 1] = '\0';
+ strlcpy(entry->function, function, sizeof(entry->function));
+ entry->function[INJ_FUNC_MAXLEN - 1] = '\0';
if (private_data != NULL)
- memcpy(entry_by_name->private_data, private_data, private_data_size);
+ memcpy(entry->private_data, private_data, private_data_size);
+
+ pg_write_barrier();
+ pg_atomic_write_u64(&entry->generation, generation + 1);
+
+ if (free_idx + 1 > max_inuse)
+ pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, free_idx + 1);
LWLockRelease(InjectionPointLock);
@@ -285,63 +361,177 @@ bool
InjectionPointDetach(const char *name)
{
#ifdef USE_INJECTION_POINTS
- bool found;
+ bool found = false;
+ int idx;
+ int max_inuse;
LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE);
- hash_search(InjectionPointHash, name, HASH_REMOVE, &found);
- LWLockRelease(InjectionPointLock);
- if (!found)
- return false;
+ /* Find it in the shmem array, and mark the slot as unused */
+ max_inuse = (int) pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+ for (idx = max_inuse - 1; idx >= 0; --idx)
+ {
+ InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+ uint64 generation;
+
+ generation = pg_atomic_read_u64(&entry->generation);
+ if (generation % 2 == 0)
+ continue; /* empty slot */
+
+ if (strcmp(entry->name, name) == 0)
+ {
+ Assert(!found);
+ found = true;
+ pg_atomic_write_u64(&entry->generation, generation + 1);
+ break;
+ }
+ }
+
+ /* If we just removed the highest-numbered entry, update 'max_inuse' */
+ if (found && idx == max_inuse - 1)
+ {
+ for (; idx >= 0; --idx)
+ {
+ InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+ uint64 generation;
+
+ generation = pg_atomic_read_u64(&entry->generation);
+ if (generation % 2 != 0)
+ break;
+ }
+ pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, idx + 1);
+ }
+ LWLockRelease(InjectionPointLock);
- return true;
+ return found;
#else
elog(ERROR, "Injection points are not supported by this build");
return true; /* silence compiler */
#endif
}
+#ifdef USE_INJECTION_POINTS
/*
- * Load an injection point into the local cache.
+ * Common workhorse of InjectionPointRun() and InjectionPointLoad()
*
- * This is useful to be able to load an injection point before running it,
- * especially if the injection point is called in a code path where memory
- * allocations cannot happen, like critical sections.
+ * Checks if an injection point exists in shared memory, and update
+ * the local cache entry accordingly.
*/
-void
-InjectionPointLoad(const char *name)
+static InjectionPointCacheEntry *
+InjectionPointCacheRefresh(const char *name)
{
-#ifdef USE_INJECTION_POINTS
- InjectionPointEntry *entry_by_name;
- bool found;
+ uint32 max_inuse;
+ int namelen;
+ InjectionPointEntry local_copy;
+ InjectionPointCacheEntry *cached;
- LWLockAcquire(InjectionPointLock, LW_SHARED);
- entry_by_name = (InjectionPointEntry *)
- hash_search(InjectionPointHash, name,
- HASH_FIND, &found);
+ /*
+ * First read the number of in-use slots. More entries can be added or
+ * existing ones can be removed while we're reading them. If the entry
+ * we're looking for is concurrently added or removed, we might or might
+ * not see it. That's OK.
+ */
+ max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+ if (max_inuse == 0)
+ {
+ if (InjectionPointCache)
+ {
+ hash_destroy(InjectionPointCache);
+ InjectionPointCache = NULL;
+ }
+ return NULL;
+ }
/*
- * If not found, do nothing and remove it from the local cache if it
- * existed there.
+ * If we have this entry in the local cache already, check if the cached
+ * entry is still valid.
*/
- if (!found)
+ cached = injection_point_cache_get(name);
+ if (cached)
{
+ int idx = cached->slot_idx;
+ InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+
+ if (pg_atomic_read_u64(&entry->generation) == cached->generation)
+ {
+ /* still good */
+ return cached;
+ }
injection_point_cache_remove(name);
- LWLockRelease(InjectionPointLock);
- return;
+ cached = NULL;
}
- /* Check first the local cache, and leave if this entry exists. */
- if (injection_point_cache_get(name) != NULL)
+ /*
+ * Search the shared memory array.
+ *
+ * It's possible that the entry we're looking for is concurrently detached
+ * or attached. Or detached *and* re-attached, to the same slot or a
+ * different slot. Detach and re-attach is not an atomic operation, so
+ * it's OK for us to return the old value, NULL, or the new value in such
+ * cases.
+ */
+ namelen = strlen(name);
+ for (int idx = 0; idx < max_inuse; idx++)
{
- LWLockRelease(InjectionPointLock);
- return;
+ InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+ uint64 generation;
+
+ /*
+ * Read the generation number so that we can detect concurrent
+ * modifications. The read barrier ensures that the generation number
+ * is loaded before any of the other fields.
+ */
+ generation = pg_atomic_read_u64(&entry->generation);
+ if (generation % 2 == 0)
+ continue; /* empty slot */
+ pg_read_barrier();
+
+ /* Is this the injection point we're looking for? */
+ if (memcmp(entry->name, name, namelen + 1) != 0)
+ continue;
+
+ /*
+ * The entry can change at any time, if the injection point is
+ * concurrently detached. Copy it to local memory, and re-check the
+ * generation. If the generation hasn't changed, we know our local
+ * copy is coherent.
+ */
+ memcpy(&local_copy, entry, sizeof(InjectionPointEntry));
+
+ pg_read_barrier();
+ if (pg_atomic_read_u64(&entry->generation) != generation)
+ {
+ /*
+ * The entry was concurrently detached.
+ *
+ * Continue the search, because if the generation number changed,
+ * we cannot trust the result of the name comparison we did above.
+ * It's theoretically possible that it falsely matched a mixed-up
+ * state of the old and new name, if the slot was recycled with a
+ * different name.
+ */
+ continue;
+ }
+
+ /* Success! Load it into the cache and return it */
+ return injection_point_cache_load(&local_copy, idx, generation);
}
+ return NULL;
+}
+#endif
- /* Nothing? Then load it and leave */
- injection_point_cache_load(entry_by_name);
-
- LWLockRelease(InjectionPointLock);
+/*
+ * Load an injection point into the local cache.
+ *
+ * This is useful to be able to load an injection point before running it,
+ * especially if the injection point is called in a code path where memory
+ * allocations cannot happen, like critical sections.
+ */
+void
+InjectionPointLoad(const char *name)
+{
+#ifdef USE_INJECTION_POINTS
+ InjectionPointCacheRefresh(name);
#else
elog(ERROR, "Injection points are not supported by this build");
#endif
@@ -349,50 +539,16 @@ InjectionPointLoad(const char *name)
/*
* Execute an injection point, if defined.
- *
- * Check first the shared hash table, and adapt the local cache depending
- * on that as it could be possible that an entry to run has been removed.
*/
void
InjectionPointRun(const char *name)
{
#ifdef USE_INJECTION_POINTS
- InjectionPointEntry *entry_by_name;
- bool found;
InjectionPointCacheEntry *cache_entry;
- LWLockAcquire(InjectionPointLock, LW_SHARED);
- entry_by_name = (InjectionPointEntry *)
- hash_search(InjectionPointHash, name,
- HASH_FIND, &found);
-
- /*
- * If not found, do nothing and remove it from the local cache if it
- * existed there.
- */
- if (!found)
- {
- injection_point_cache_remove(name);
- LWLockRelease(InjectionPointLock);
- return;
- }
-
- /*
- * Check if the callback exists in the local cache, to avoid unnecessary
- * external loads.
- */
- if (injection_point_cache_get(name) == NULL)
- {
- /* not found in local cache, so load and register it */
- injection_point_cache_load(entry_by_name);
- }
-
- /* Now loaded, so get it. */
- cache_entry = injection_point_cache_get(name);
-
- LWLockRelease(InjectionPointLock);
-
- cache_entry->callback(name, cache_entry->private_data);
+ cache_entry = InjectionPointCacheRefresh(name);
+ if (cache_entry)
+ cache_entry->callback(name, cache_entry->private_data);
#else
elog(ERROR, "Injection points are not supported by this build");
#endif