Diffstat (limited to 'src/backend/storage')
-rw-r--r--  src/backend/storage/buffer/bufmgr.c      8
-rw-r--r--  src/backend/storage/ipc/ipci.c           3
-rw-r--r--  src/backend/storage/lmgr/lock.c         34
-rw-r--r--  src/backend/storage/lmgr/lwlock.c      401
-rw-r--r--  src/backend/storage/lmgr/predicate.c    35
-rw-r--r--  src/backend/storage/lmgr/proc.c         11
6 files changed, 304 insertions(+), 188 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 91f0c7eb36e..19eecab4c28 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -146,7 +146,7 @@ PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
{
BufferTag newTag; /* identity of requested block */
uint32 newHash; /* hash value for newTag */
- LWLockId newPartitionLock; /* buffer partition lock for it */
+ LWLock *newPartitionLock; /* buffer partition lock for it */
int buf_id;
/* create a tag so we can lookup the buffer */
@@ -539,10 +539,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
{
BufferTag newTag; /* identity of requested block */
uint32 newHash; /* hash value for newTag */
- LWLockId newPartitionLock; /* buffer partition lock for it */
+ LWLock *newPartitionLock; /* buffer partition lock for it */
BufferTag oldTag; /* previous identity of selected buffer */
uint32 oldHash; /* hash value for oldTag */
- LWLockId oldPartitionLock; /* buffer partition lock for it */
+ LWLock *oldPartitionLock; /* buffer partition lock for it */
BufFlags oldFlags;
int buf_id;
volatile BufferDesc *buf;
@@ -891,7 +891,7 @@ InvalidateBuffer(volatile BufferDesc *buf)
{
BufferTag oldTag;
uint32 oldHash; /* hash value for oldTag */
- LWLockId oldPartitionLock; /* buffer partition lock for it */
+ LWLock *oldPartitionLock; /* buffer partition lock for it */
BufFlags oldFlags;
/* Save the original buffer tag before dropping the spinlock */
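
The bufmgr.c hunks above only change the declared type of the partition-lock variables; the value still comes from the BufMappingPartitionLock() macro in buf_internals.h, which this diff does not show. Presumably that macro now yields a pointer into MainLWLockArray rather than an LWLockId, along these lines (BUFFER_MAPPING_LWLOCK_OFFSET, the index of the first buffer-mapping partition lock in the main array, is an assumption here):

    #define BufTableHashPartition(hashcode) \
        ((hashcode) % NUM_BUFFER_PARTITIONS)
    #define BufMappingPartitionLock(hashcode) \
        (&MainLWLockArray[BUFFER_MAPPING_LWLOCK_OFFSET + \
            BufTableHashPartition(hashcode)].lock)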
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index cc219237097..2e717457b12 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -182,8 +182,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
* Now initialize LWLocks, which do shared memory allocation and are
* needed for InitShmemIndex.
*/
- if (!IsUnderPostmaster)
- CreateLWLocks();
+ CreateLWLocks();
/*
* Set up shmem.c index hashtable
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 5c8b4b0656c..6335129ac25 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -565,7 +565,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
LOCALLOCK *locallock;
LOCK *lock;
PROCLOCK *proclock;
- LWLockId partitionLock;
+ LWLock *partitionLock;
bool hasWaiters = false;
if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
@@ -702,7 +702,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
bool found;
ResourceOwner owner;
uint32 hashcode;
- LWLockId partitionLock;
+ LWLock *partitionLock;
int status;
bool log_lock = false;
@@ -1744,7 +1744,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
LOCALLOCK *locallock;
LOCK *lock;
PROCLOCK *proclock;
- LWLockId partitionLock;
+ LWLock *partitionLock;
bool wakeupNeeded;
if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
@@ -2096,10 +2096,12 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
*/
for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
{
- LWLockId partitionLock = FirstLockMgrLock + partition;
+ LWLock *partitionLock;
SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]);
PROCLOCK *nextplock;
+ partitionLock = LockHashPartitionLockByIndex(partition);
+
/*
* If the proclock list for this partition is empty, we can skip
* acquiring the partition lock. This optimization is trickier than
@@ -2475,7 +2477,7 @@ static bool
FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
uint32 hashcode)
{
- LWLockId partitionLock = LockHashPartitionLock(hashcode);
+ LWLock *partitionLock = LockHashPartitionLock(hashcode);
Oid relid = locktag->locktag_field2;
uint32 i;
@@ -2565,7 +2567,7 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock)
LockMethod lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
LOCKTAG *locktag = &locallock->tag.lock;
PROCLOCK *proclock = NULL;
- LWLockId partitionLock = LockHashPartitionLock(locallock->hashcode);
+ LWLock *partitionLock = LockHashPartitionLock(locallock->hashcode);
Oid relid = locktag->locktag_field2;
uint32 f;
@@ -2671,7 +2673,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
SHM_QUEUE *procLocks;
PROCLOCK *proclock;
uint32 hashcode;
- LWLockId partitionLock;
+ LWLock *partitionLock;
int count = 0;
int fast_count = 0;
@@ -2883,7 +2885,7 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
PROCLOCKTAG proclocktag;
uint32 hashcode;
uint32 proclock_hashcode;
- LWLockId partitionLock;
+ LWLock *partitionLock;
bool wakeupNeeded;
hashcode = LockTagHashCode(locktag);
@@ -3159,10 +3161,12 @@ PostPrepare_Locks(TransactionId xid)
*/
for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
{
- LWLockId partitionLock = FirstLockMgrLock + partition;
+ LWLock *partitionLock;
SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]);
PROCLOCK *nextplock;
+ partitionLock = LockHashPartitionLockByIndex(partition);
+
/*
* If the proclock list for this partition is empty, we can skip
* acquiring the partition lock. This optimization is safer than the
@@ -3400,7 +3404,7 @@ GetLockStatusData(void)
* Must grab LWLocks in partition-number order to avoid LWLock deadlock.
*/
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
- LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
+ LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
/* Now we can safely count the number of proclocks */
data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
@@ -3442,7 +3446,7 @@ GetLockStatusData(void)
* behavior inside LWLockRelease.
*/
for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
- LWLockRelease(FirstLockMgrLock + i);
+ LWLockRelease(LockHashPartitionLockByIndex(i));
Assert(el == data->nelements);
@@ -3477,7 +3481,7 @@ GetRunningTransactionLocks(int *nlocks)
* Must grab LWLocks in partition-number order to avoid LWLock deadlock.
*/
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
- LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
+ LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
/* Now we can safely count the number of proclocks */
els = hash_get_num_entries(LockMethodProcLockHash);
@@ -3537,7 +3541,7 @@ GetRunningTransactionLocks(int *nlocks)
* behavior inside LWLockRelease.
*/
for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
- LWLockRelease(FirstLockMgrLock + i);
+ LWLockRelease(LockHashPartitionLockByIndex(i));
*nlocks = index;
return accessExclusiveLocks;
@@ -3673,7 +3677,7 @@ lock_twophase_recover(TransactionId xid, uint16 info,
uint32 hashcode;
uint32 proclock_hashcode;
int partition;
- LWLockId partitionLock;
+ LWLock *partitionLock;
LockMethod lockMethodTable;
Assert(len == sizeof(TwoPhaseLockRecord));
@@ -4044,7 +4048,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
{
PROCLOCK *proclock;
uint32 hashcode;
- LWLockId partitionLock;
+ LWLock *partitionLock;
hashcode = LockTagHashCode(&tag);
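
Throughout lock.c the old FirstLockMgrLock + partition arithmetic is replaced with LockHashPartitionLock()/LockHashPartitionLockByIndex(). Those macros live in lock.h and are not part of this diff; a paraphrased sketch of what they presumably expand to after the patch, mirroring the predicate-lock macros added in predicate.c below (LOCK_MANAGER_LWLOCK_OFFSET, the first lock-manager partition slot in MainLWLockArray, is assumed):

    #define LockHashPartition(hashcode) \
        ((hashcode) % NUM_LOCK_PARTITIONS)
    #define LockHashPartitionLock(hashcode) \
        (&MainLWLockArray[LOCK_MANAGER_LWLOCK_OFFSET + \
            LockHashPartition(hashcode)].lock)
    #define LockHashPartitionLockByIndex(i) \
        (&MainLWLockArray[LOCK_MANAGER_LWLOCK_OFFSET + (i)].lock)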
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 0e319a7e6ac..55d9d7837ca 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -31,50 +31,37 @@
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/spin.h"
+#include "utils/memutils.h"
+
+#ifdef LWLOCK_STATS
+#include "utils/hsearch.h"
+#endif
/* We use the ShmemLock spinlock to protect LWLockAssign */
extern slock_t *ShmemLock;
-
-typedef struct LWLock
-{
- slock_t mutex; /* Protects LWLock and queue of PGPROCs */
- bool releaseOK; /* T if ok to release waiters */
- char exclusive; /* # of exclusive holders (0 or 1) */
- int shared; /* # of shared holders (0..MaxBackends) */
- PGPROC *head; /* head of list of waiting PGPROCs */
- PGPROC *tail; /* tail of list of waiting PGPROCs */
- /* tail is undefined when head is NULL */
-} LWLock;
-
/*
- * All the LWLock structs are allocated as an array in shared memory.
- * (LWLockIds are indexes into the array.) We force the array stride to
- * be a power of 2, which saves a few cycles in indexing, but more
- * importantly also ensures that individual LWLocks don't cross cache line
- * boundaries. This reduces cache contention problems, especially on AMD
- * Opterons. (Of course, we have to also ensure that the array start
- * address is suitably aligned.)
- *
- * LWLock is between 16 and 32 bytes on all known platforms, so these two
- * cases are sufficient.
+ * This is indexed by tranche ID and stores metadata for all tranches known
+ * to the current backend.
*/
-#define LWLOCK_PADDED_SIZE (sizeof(LWLock) <= 16 ? 16 : 32)
+static LWLockTranche **LWLockTrancheArray = NULL;
+static int LWLockTranchesAllocated = 0;
-typedef union LWLockPadded
-{
- LWLock lock;
- char pad[LWLOCK_PADDED_SIZE];
-} LWLockPadded;
+#define T_NAME(lock) \
+ (LWLockTrancheArray[(lock)->tranche]->name)
+#define T_ID(lock) \
+ ((int) ((((char *) lock) - \
+ ((char *) LWLockTrancheArray[(lock)->tranche]->array_base)) / \
+ LWLockTrancheArray[(lock)->tranche]->array_stride))
/*
- * This points to the array of LWLocks in shared memory. Backends inherit
+ * This points to the main array of LWLocks in shared memory. Backends inherit
* the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
* where we have special measures to pass it down).
*/
-NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
-
+LWLockPadded *MainLWLockArray = NULL;
+static LWLockTranche MainLWLockTranche;
/*
* We use this structure to keep track of locked LWLocks for release
@@ -85,58 +72,78 @@ NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
#define MAX_SIMUL_LWLOCKS 100
static int num_held_lwlocks = 0;
-static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
+static LWLock *held_lwlocks[MAX_SIMUL_LWLOCKS];
static int lock_addin_request = 0;
static bool lock_addin_request_allowed = true;
#ifdef LWLOCK_STATS
+typedef struct lwlock_stats_key
+{
+ int tranche;
+ int instance;
+} lwlock_stats_key;
+
+typedef struct lwlock_stats
+{
+ lwlock_stats_key key;
+ int sh_acquire_count;
+ int ex_acquire_count;
+ int block_count;
+ int spin_delay_count;
+} lwlock_stats;
+
static int counts_for_pid = 0;
-static int *sh_acquire_counts;
-static int *ex_acquire_counts;
-static int *block_counts;
-static int *spin_delay_counts;
+static HTAB *lwlock_stats_htab;
#endif
#ifdef LOCK_DEBUG
bool Trace_lwlocks = false;
inline static void
-PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
+PRINT_LWDEBUG(const char *where, const volatile LWLock *lock)
{
if (Trace_lwlocks)
- elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
- where, (int) lockid,
+ elog(LOG, "%s(%s %d): excl %d shared %d head %p rOK %d",
+ where, T_NAME(lock), T_ID(lock),
(int) lock->exclusive, lock->shared, lock->head,
(int) lock->releaseOK);
}
inline static void
-LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
+LOG_LWDEBUG(const char *where, const char *name, int index, const char *msg)
{
if (Trace_lwlocks)
- elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
+ elog(LOG, "%s(%s %d): %s", where, name, index, msg);
}
#else /* not LOCK_DEBUG */
-#define PRINT_LWDEBUG(a,b,c)
-#define LOG_LWDEBUG(a,b,c)
+#define PRINT_LWDEBUG(a,b)
+#define LOG_LWDEBUG(a,b,c,d)
#endif /* LOCK_DEBUG */
#ifdef LWLOCK_STATS
static void init_lwlock_stats(void);
static void print_lwlock_stats(int code, Datum arg);
+static lwlock_stats *get_lwlock_stats_entry(LWLock *lockid);
static void
init_lwlock_stats(void)
{
- int *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
- int numLocks = LWLockCounter[1];
+ HASHCTL ctl;
+
+ if (lwlock_stats_htab != NULL)
+ {
+ hash_destroy(lwlock_stats_htab);
+ lwlock_stats_htab = NULL;
+ }
- sh_acquire_counts = calloc(numLocks, sizeof(int));
- ex_acquire_counts = calloc(numLocks, sizeof(int));
- spin_delay_counts = calloc(numLocks, sizeof(int));
- block_counts = calloc(numLocks, sizeof(int));
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(lwlock_stats_key);
+ ctl.entrysize = sizeof(lwlock_stats);
+ ctl.hash = tag_hash;
+ lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
+ HASH_ELEM | HASH_FUNCTION);
counts_for_pid = MyProcPid;
on_shmem_exit(print_lwlock_stats, 0);
}
@@ -144,30 +151,58 @@ init_lwlock_stats(void)
static void
print_lwlock_stats(int code, Datum arg)
{
- int i;
- int *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
- int numLocks = LWLockCounter[1];
+ HASH_SEQ_STATUS scan;
+ lwlock_stats *lwstats;
+
+ hash_seq_init(&scan, lwlock_stats_htab);
/* Grab an LWLock to keep different backends from mixing reports */
- LWLockAcquire(0, LW_EXCLUSIVE);
+ LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
- for (i = 0; i < numLocks; i++)
+ while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
{
- if (sh_acquire_counts[i] || ex_acquire_counts[i] || block_counts[i] || spin_delay_counts[i])
- fprintf(stderr, "PID %d lwlock %d: shacq %u exacq %u blk %u spindelay %u\n",
- MyProcPid, i, sh_acquire_counts[i], ex_acquire_counts[i],
- block_counts[i], spin_delay_counts[i]);
+ fprintf(stderr,
+ "PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u\n",
+ MyProcPid, LWLockTrancheArray[lwstats->key.tranche]->name,
+ lwstats->key.instance, lwstats->sh_acquire_count,
+ lwstats->ex_acquire_count, lwstats->block_count,
+ lwstats->spin_delay_count);
}
- LWLockRelease(0);
+ LWLockRelease(&MainLWLockArray[0].lock);
+}
+
+static lwlock_stats *
+get_lwlock_stats_entry(LWLock *lock)
+{
+ lwlock_stats_key key;
+ lwlock_stats *lwstats;
+ bool found;
+
+ /* Set up local count state first time through in a given process */
+ if (counts_for_pid != MyProcPid)
+ init_lwlock_stats();
+
+ /* Fetch or create the entry. */
+ key.tranche = lock->tranche;
+ key.instance = T_ID(lock);
+ lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
+ if (!found)
+ {
+ lwstats->sh_acquire_count = 0;
+ lwstats->ex_acquire_count = 0;
+ lwstats->block_count = 0;
+ lwstats->spin_delay_count = 0;
+ }
+ return lwstats;
}
#endif /* LWLOCK_STATS */
/*
- * Compute number of LWLocks to allocate.
+ * Compute number of LWLocks to allocate in the main array.
*/
-int
+static int
NumLWLocks(void)
{
int numLocks;
@@ -180,7 +215,7 @@ NumLWLocks(void)
*/
/* Predefined LWLocks */
- numLocks = (int) NumFixedLWLocks;
+ numLocks = NUM_FIXED_LWLOCKS;
/* bufmgr.c needs two for each shared buffer */
numLocks += 2 * NBuffers;
@@ -248,56 +283,67 @@ LWLockShmemSize(void)
size = mul_size(numLocks, sizeof(LWLockPadded));
/* Space for dynamic allocation counter, plus room for alignment. */
- size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);
+ size = add_size(size, 3 * sizeof(int) + LWLOCK_PADDED_SIZE);
return size;
}
/*
- * Allocate shmem space for LWLocks and initialize the locks.
+ * Allocate shmem space for the main LWLock array and initialize it. We also
+ * register the main tranche here.
*/
void
CreateLWLocks(void)
{
- int numLocks = NumLWLocks();
- Size spaceLocks = LWLockShmemSize();
- LWLockPadded *lock;
- int *LWLockCounter;
- char *ptr;
- int id;
+ if (!IsUnderPostmaster)
+ {
+ int numLocks = NumLWLocks();
+ Size spaceLocks = LWLockShmemSize();
+ LWLockPadded *lock;
+ int *LWLockCounter;
+ char *ptr;
+ int id;
- /* Allocate space */
- ptr = (char *) ShmemAlloc(spaceLocks);
+ /* Allocate space */
+ ptr = (char *) ShmemAlloc(spaceLocks);
- /* Leave room for dynamic allocation counter */
- ptr += 2 * sizeof(int);
+ /* Leave room for dynamic allocation of locks and tranches */
+ ptr += 3 * sizeof(int);
- /* Ensure desired alignment of LWLock array */
- ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
+ /* Ensure desired alignment of LWLock array */
+ ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
- LWLockArray = (LWLockPadded *) ptr;
+ MainLWLockArray = (LWLockPadded *) ptr;
- /*
- * Initialize all LWLocks to "unlocked" state
- */
- for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
+ /* Initialize all LWLocks in main array */
+ for (id = 0, lock = MainLWLockArray; id < numLocks; id++, lock++)
+ LWLockInitialize(&lock->lock, 0);
+
+ /*
+ * Initialize the dynamic-allocation counters, which are stored just
+ * before the first LWLock. LWLockCounter[0] is the allocation
+ * counter for lwlocks, LWLockCounter[1] is the maximum number that
+ * can be allocated from the main array, and LWLockCounter[2] is the
+ * allocation counter for tranches.
+ */
+ LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
+ LWLockCounter[0] = NUM_FIXED_LWLOCKS;
+ LWLockCounter[1] = numLocks;
+ LWLockCounter[2] = 1; /* 0 is the main array */
+ }
+
+ if (LWLockTrancheArray == NULL)
{
- SpinLockInit(&lock->lock.mutex);
- lock->lock.releaseOK = true;
- lock->lock.exclusive = 0;
- lock->lock.shared = 0;
- lock->lock.head = NULL;
- lock->lock.tail = NULL;
+ LWLockTranchesAllocated = 16;
+ LWLockTrancheArray = MemoryContextAlloc(TopMemoryContext,
+ LWLockTranchesAllocated * sizeof(LWLockTranche *));
}
- /*
- * Initialize the dynamic-allocation counter, which is stored just before
- * the first LWLock.
- */
- LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
- LWLockCounter[0] = (int) NumFixedLWLocks;
- LWLockCounter[1] = numLocks;
+ MainLWLockTranche.name = "main";
+ MainLWLockTranche.array_base = MainLWLockArray;
+ MainLWLockTranche.array_stride = sizeof(LWLockPadded);
+ LWLockRegisterTranche(0, &MainLWLockTranche);
}
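
As a reading aid (not part of the patch), a minimal helper showing where the three counters described in the comment above live, assuming the layout CreateLWLocks() establishes, i.e. three ints stored immediately before the aligned main array:

    /* Illustrative only; mirrors how LWLockAssign() and
     * LWLockNewTrancheId() below locate the counters. */
    static inline int *
    MainLWLockCounters(void)
    {
        /* [0] next assignable slot, [1] total slots, [2] next tranche ID */
        return (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
    }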
@@ -309,26 +355,86 @@ CreateLWLocks(void)
* startup, but it is needed if any user-defined code tries to allocate
* LWLocks after startup.
*/
-LWLockId
+LWLock *
LWLockAssign(void)
{
- LWLockId result;
+ LWLock *result;
/* use volatile pointer to prevent code rearrangement */
volatile int *LWLockCounter;
- LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
+ LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
SpinLockAcquire(ShmemLock);
if (LWLockCounter[0] >= LWLockCounter[1])
{
SpinLockRelease(ShmemLock);
- elog(ERROR, "no more LWLockIds available");
+ elog(ERROR, "no more LWLocks available");
}
- result = (LWLockId) (LWLockCounter[0]++);
+ result = &MainLWLockArray[LWLockCounter[0]++].lock;
SpinLockRelease(ShmemLock);
return result;
}
+/*
+ * Allocate a new tranche ID.
+ */
+int
+LWLockNewTrancheId(void)
+{
+ int result;
+
+ /* use volatile pointer to prevent code rearrangement */
+ volatile int *LWLockCounter;
+
+ LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
+ SpinLockAcquire(ShmemLock);
+ result = LWLockCounter[2]++;
+ SpinLockRelease(ShmemLock);
+
+ return result;
+}
+
+/*
+ * Register a tranche ID in the lookup table for the current process. This
+ * routine will save a pointer to the tranche object passed as an argument,
+ * so that object should be allocated in a backend-lifetime context
+ * (TopMemoryContext, static variable, or similar).
+ */
+void
+LWLockRegisterTranche(int tranche_id, LWLockTranche *tranche)
+{
+ Assert(LWLockTrancheArray != NULL);
+
+ if (tranche_id >= LWLockTranchesAllocated)
+ {
+ int i = LWLockTranchesAllocated;
+
+ while (i < tranche_id)
+ i *= 2;
+
+ LWLockTrancheArray = repalloc(LWLockTrancheArray,
+ i * sizeof(LWLockTranche *));
+ LWLockTranchesAllocated = i;
+ }
+
+ LWLockTrancheArray[tranche_id] = tranche;
+}
+
+/*
+ * LWLockInitialize - initialize a new lwlock; it's initially unlocked
+ */
+void
+LWLockInitialize(LWLock *lock, int tranche_id)
+{
+ SpinLockInit(&lock->mutex);
+ lock->releaseOK = true;
+ lock->exclusive = 0;
+ lock->shared = 0;
+ lock->tranche = tranche_id;
+ lock->head = NULL;
+ lock->tail = NULL;
+}
+
/*
* LWLockAcquire - acquire a lightweight lock in the specified mode
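
Together, LWLockNewTrancheId(), LWLockRegisterTranche() and LWLockInitialize() let LWLocks live outside the main array. A hedged sketch of how a module might use the new API (my_tranche, my_locks and my_shmem_startup are illustrative names, not from this patch); T_NAME()/T_ID() then recover the tranche name and instance index via array_base and array_stride, and every backend that touches the locks needs to register the tranche locally:

    static LWLockTranche my_tranche;
    static LWLock *my_locks;        /* allocated in shared memory */

    static void
    my_shmem_startup(int nlocks)
    {
        int     tranche_id = LWLockNewTrancheId();
        int     i;

        my_locks = (LWLock *) ShmemAlloc(nlocks * sizeof(LWLock));

        my_tranche.name = "my_module";
        my_tranche.array_base = my_locks;
        my_tranche.array_stride = sizeof(LWLock);
        LWLockRegisterTranche(tranche_id, &my_tranche);

        for (i = 0; i < nlocks; i++)
            LWLockInitialize(&my_locks[i], tranche_id);
    }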
@@ -338,24 +444,26 @@ LWLockAssign(void)
* Side effect: cancel/die interrupts are held off until lock release.
*/
void
-LWLockAcquire(LWLockId lockid, LWLockMode mode)
+LWLockAcquire(LWLock *l, LWLockMode mode)
{
- volatile LWLock *lock = &(LWLockArray[lockid].lock);
+ volatile LWLock *lock = l;
PGPROC *proc = MyProc;
bool retry = false;
int extraWaits = 0;
+#ifdef LWLOCK_STATS
+ lwlock_stats *lwstats;
+#endif
- PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
+ PRINT_LWDEBUG("LWLockAcquire", lock);
#ifdef LWLOCK_STATS
- /* Set up local count state first time through in a given process */
- if (counts_for_pid != MyProcPid)
- init_lwlock_stats();
+ lwstats = get_lwlock_stats_entry(l);
+
/* Count lock acquisition attempts */
if (mode == LW_EXCLUSIVE)
- ex_acquire_counts[lockid]++;
+ lwstats->ex_acquire_count++;
else
- sh_acquire_counts[lockid]++;
+ lwstats->sh_acquire_count++;
#endif /* LWLOCK_STATS */
/*
@@ -398,7 +506,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
/* Acquire mutex. Time spent holding mutex should be short! */
#ifdef LWLOCK_STATS
- spin_delay_counts[lockid] += SpinLockAcquire(&lock->mutex);
+ lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
#else
SpinLockAcquire(&lock->mutex);
#endif
@@ -466,13 +574,13 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
* so that the lock manager or signal manager will see the received
* signal when it next waits.
*/
- LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");
+ LOG_LWDEBUG("LWLockAcquire", T_NAME(l), T_ID(l), "waiting");
#ifdef LWLOCK_STATS
- block_counts[lockid]++;
+ lwstats->block_count++;
#endif
- TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+ TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode);
for (;;)
{
@@ -483,9 +591,9 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
extraWaits++;
}
- TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+ TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), mode);
- LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");
+ LOG_LWDEBUG("LWLockAcquire", T_NAME(l), T_ID(l), "awakened");
/* Now loop back and try to acquire lock again. */
retry = true;
@@ -494,10 +602,10 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
/* We are done updating shared state of the lock itself. */
SpinLockRelease(&lock->mutex);
- TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
+ TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(l), T_ID(l), mode);
/* Add lock to list of locks held by this backend */
- held_lwlocks[num_held_lwlocks++] = lockid;
+ held_lwlocks[num_held_lwlocks++] = l;
/*
* Fix the process wait semaphore's count for any absorbed wakeups.
@@ -514,12 +622,12 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
* If successful, cancel/die interrupts are held off until lock release.
*/
bool
-LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
+LWLockConditionalAcquire(LWLock *l, LWLockMode mode)
{
- volatile LWLock *lock = &(LWLockArray[lockid].lock);
+ volatile LWLock *lock = l;
bool mustwait;
- PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
+ PRINT_LWDEBUG("LWLockConditionalAcquire", lock);
/* Ensure we will have room to remember the lock */
if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
@@ -564,14 +672,14 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
{
/* Failed to get lock, so release interrupt holdoff */
RESUME_INTERRUPTS();
- LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
- TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode);
+ LOG_LWDEBUG("LWLockConditionalAcquire", T_NAME(l), T_ID(l), "failed");
+ TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(l), T_ID(l), mode);
}
else
{
/* Add lock to list of locks held by this backend */
- held_lwlocks[num_held_lwlocks++] = lockid;
- TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
+ held_lwlocks[num_held_lwlocks++] = l;
+ TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(l), T_ID(l), mode);
}
return !mustwait;
@@ -592,19 +700,20 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
* wake up, observe that their records have already been flushed, and return.
*/
bool
-LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
+LWLockAcquireOrWait(LWLock *l, LWLockMode mode)
{
- volatile LWLock *lock = &(LWLockArray[lockid].lock);
+ volatile LWLock *lock = l;
PGPROC *proc = MyProc;
bool mustwait;
int extraWaits = 0;
+#ifdef LWLOCK_STATS
+ lwlock_stats *lwstats;
+#endif
- PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock);
+ PRINT_LWDEBUG("LWLockAcquireOrWait", lock);
#ifdef LWLOCK_STATS
- /* Set up local count state first time through in a given process */
- if (counts_for_pid != MyProcPid)
- init_lwlock_stats();
+ lwstats = get_lwlock_stats_entry(l);
#endif
/* Ensure we will have room to remember the lock */
@@ -671,13 +780,13 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
* Wait until awakened. Like in LWLockAcquire, be prepared for bogus
* wakups, because we share the semaphore with ProcWaitForSignal.
*/
- LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "waiting");
+ LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "waiting");
#ifdef LWLOCK_STATS
- block_counts[lockid]++;
+ lwstats->block_count++;
#endif
- TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+ TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode);
for (;;)
{
@@ -688,9 +797,9 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
extraWaits++;
}
- TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+ TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), mode);
- LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "awakened");
+ LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "awakened");
}
else
{
@@ -708,14 +817,14 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
{
/* Failed to get lock, so release interrupt holdoff */
RESUME_INTERRUPTS();
- LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "failed");
- TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode);
+ LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "failed");
+ TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(T_NAME(l), T_ID(l), mode);
}
else
{
/* Add lock to list of locks held by this backend */
- held_lwlocks[num_held_lwlocks++] = lockid;
- TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode);
+ held_lwlocks[num_held_lwlocks++] = l;
+ TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(T_NAME(l), T_ID(l), mode);
}
return !mustwait;
@@ -725,14 +834,14 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
* LWLockRelease - release a previously acquired lock
*/
void
-LWLockRelease(LWLockId lockid)
+LWLockRelease(LWLock *l)
{
- volatile LWLock *lock = &(LWLockArray[lockid].lock);
+ volatile LWLock *lock = l;
PGPROC *head;
PGPROC *proc;
int i;
- PRINT_LWDEBUG("LWLockRelease", lockid, lock);
+ PRINT_LWDEBUG("LWLockRelease", lock);
/*
* Remove lock from list of locks held. Usually, but not always, it will
@@ -740,11 +849,11 @@ LWLockRelease(LWLockId lockid)
*/
for (i = num_held_lwlocks; --i >= 0;)
{
- if (lockid == held_lwlocks[i])
+ if (l == held_lwlocks[i])
break;
}
if (i < 0)
- elog(ERROR, "lock %d is not held", (int) lockid);
+ elog(ERROR, "lock %s %d is not held", T_NAME(l), T_ID(l));
num_held_lwlocks--;
for (; i < num_held_lwlocks; i++)
held_lwlocks[i] = held_lwlocks[i + 1];
@@ -824,14 +933,14 @@ LWLockRelease(LWLockId lockid)
/* We are done updating shared state of the lock itself. */
SpinLockRelease(&lock->mutex);
- TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid);
+ TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(l), T_ID(l));
/*
* Awaken any waiters I removed from the queue.
*/
while (head != NULL)
{
- LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
+ LOG_LWDEBUG("LWLockRelease", T_NAME(l), T_ID(l), "release waiter");
proc = head;
head = proc->lwWaitLink;
proc->lwWaitLink = NULL;
@@ -874,13 +983,13 @@ LWLockReleaseAll(void)
* lock is held shared or exclusive.
*/
bool
-LWLockHeldByMe(LWLockId lockid)
+LWLockHeldByMe(LWLock *l)
{
int i;
for (i = 0; i < num_held_lwlocks; i++)
{
- if (held_lwlocks[i] == lockid)
+ if (held_lwlocks[i] == l)
return true;
}
return false;
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index e7f44cce841..67000720275 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -241,7 +241,10 @@
#define PredicateLockHashPartition(hashcode) \
((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
#define PredicateLockHashPartitionLock(hashcode) \
- ((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hashcode)))
+ (&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + \
+ PredicateLockHashPartition(hashcode)].lock)
+#define PredicateLockHashPartitionLockByIndex(i) \
+ (&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + (i)].lock)
#define NPREDICATELOCKTARGETENTS() \
mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
@@ -383,7 +386,7 @@ static SHM_QUEUE *FinishedSerializableTransactions;
*/
static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0};
static uint32 ScratchTargetTagHash;
-static int ScratchPartitionLock;
+static LWLock *ScratchPartitionLock;
/*
* The local hash table used to determine when to combine multiple fine-
@@ -1398,7 +1401,7 @@ GetPredicateLockStatusData(void)
* in ascending order, then SerializableXactHashLock.
*/
for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
- LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
+ LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED);
LWLockAcquire(SerializableXactHashLock, LW_SHARED);
/* Get number of locks and allocate appropriately-sized arrays. */
@@ -1427,7 +1430,7 @@ GetPredicateLockStatusData(void)
/* Release locks in reverse order */
LWLockRelease(SerializableXactHashLock);
for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
- LWLockRelease(FirstPredicateLockMgrLock + i);
+ LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
return data;
}
@@ -1856,7 +1859,7 @@ PageIsPredicateLocked(Relation relation, BlockNumber blkno)
{
PREDICATELOCKTARGETTAG targettag;
uint32 targettaghash;
- LWLockId partitionLock;
+ LWLock *partitionLock;
PREDICATELOCKTARGET *target;
SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
@@ -2089,7 +2092,7 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
{
uint32 oldtargettaghash;
- LWLockId partitionLock;
+ LWLock *partitionLock;
PREDICATELOCK *rmpredlock PG_USED_FOR_ASSERTS_ONLY;
oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
@@ -2301,7 +2304,7 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
PREDICATELOCKTARGET *target;
PREDICATELOCKTAG locktag;
PREDICATELOCK *lock;
- LWLockId partitionLock;
+ LWLock *partitionLock;
bool found;
partitionLock = PredicateLockHashPartitionLock(targettaghash);
@@ -2599,10 +2602,10 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
bool removeOld)
{
uint32 oldtargettaghash;
- LWLockId oldpartitionLock;
+ LWLock *oldpartitionLock;
PREDICATELOCKTARGET *oldtarget;
uint32 newtargettaghash;
- LWLockId newpartitionLock;
+ LWLock *newpartitionLock;
bool found;
bool outOfShmem = false;
@@ -2858,7 +2861,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer)
/* Acquire locks on all lock partitions */
LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
- LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE);
+ LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_EXCLUSIVE);
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
/*
@@ -2996,7 +2999,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer)
/* Release locks in reverse order */
LWLockRelease(SerializableXactHashLock);
for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
- LWLockRelease(FirstPredicateLockMgrLock + i);
+ LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
LWLockRelease(SerializablePredicateLockListLock);
}
@@ -3611,7 +3614,7 @@ ClearOldPredicateLocks(void)
PREDICATELOCKTARGET *target;
PREDICATELOCKTARGETTAG targettag;
uint32 targettaghash;
- LWLockId partitionLock;
+ LWLock *partitionLock;
tag = predlock->tag;
target = tag.myTarget;
@@ -3690,7 +3693,7 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
PREDICATELOCKTARGET *target;
PREDICATELOCKTARGETTAG targettag;
uint32 targettaghash;
- LWLockId partitionLock;
+ LWLock *partitionLock;
nextpredlock = (PREDICATELOCK *)
SHMQueueNext(&(sxact->predicateLocks),
@@ -4068,7 +4071,7 @@ static void
CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
{
uint32 targettaghash;
- LWLockId partitionLock;
+ LWLock *partitionLock;
PREDICATELOCKTARGET *target;
PREDICATELOCK *predlock;
PREDICATELOCK *mypredlock = NULL;
@@ -4360,7 +4363,7 @@ CheckTableForSerializableConflictIn(Relation relation)
LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
- LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
+ LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED);
LWLockAcquire(SerializableXactHashLock, LW_SHARED);
/* Scan through target list */
@@ -4407,7 +4410,7 @@ CheckTableForSerializableConflictIn(Relation relation)
/* Release locks in reverse order */
LWLockRelease(SerializableXactHashLock);
for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
- LWLockRelease(FirstPredicateLockMgrLock + i);
+ LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
LWLockRelease(SerializablePredicateLockListLock);
}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index ee6c24cea7d..1a683b83361 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -189,7 +189,8 @@ InitProcGlobal(void)
*/
procs = (PGPROC *) ShmemAlloc(TotalProcs * sizeof(PGPROC));
ProcGlobal->allProcs = procs;
- ProcGlobal->allProcCount = TotalProcs;
+ /* XXX allProcCount isn't really all of them; it excludes prepared xacts */
+ ProcGlobal->allProcCount = MaxBackends + NUM_AUXILIARY_PROCS;
if (!procs)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -663,7 +664,7 @@ IsWaitingForLock(void)
void
LockErrorCleanup(void)
{
- LWLockId partitionLock;
+ LWLock *partitionLock;
DisableTimeoutParams timeouts[2];
AbortStrongLockAcquire();
@@ -942,7 +943,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
LOCK *lock = locallock->lock;
PROCLOCK *proclock = locallock->proclock;
uint32 hashcode = locallock->hashcode;
- LWLockId partitionLock = LockHashPartitionLock(hashcode);
+ LWLock *partitionLock = LockHashPartitionLock(hashcode);
PROC_QUEUE *waitQueue = &(lock->waitProcs);
LOCKMASK myHeldLocks = MyProc->heldLocks;
bool early_deadlock = false;
@@ -1440,7 +1441,7 @@ CheckDeadLock(void)
* interrupts.
*/
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
- LWLockAcquire(FirstLockMgrLock + i, LW_EXCLUSIVE);
+ LWLockAcquire(LockHashPartitionLockByIndex(i), LW_EXCLUSIVE);
/*
* Check to see if we've been awoken by anyone in the interim.
@@ -1522,7 +1523,7 @@ CheckDeadLock(void)
*/
check_done:
for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
- LWLockRelease(FirstLockMgrLock + i);
+ LWLockRelease(LockHashPartitionLockByIndex(i));
}