author      Robert Haas <rhaas@postgresql.org>   2014-01-27 11:07:44 -0500
committer   Robert Haas <rhaas@postgresql.org>   2014-01-27 11:07:44 -0500
commit      ea9df812d8502fff74e7bc37d61bdc7d66d77a7f (patch)
tree        7e138cbe713ccbf24c3be5603bcc84cae1f3079e /src/backend/storage
parent      f62eba204f367acbfea7e63991524bf981b307f8 (diff)
download    postgresql-ea9df812d8502fff74e7bc37d61bdc7d66d77a7f.tar.gz
            postgresql-ea9df812d8502fff74e7bc37d61bdc7d66d77a7f.zip
Relax the requirement that all lwlocks be stored in a single array.
This makes it possible to store lwlocks as part of some other data
structure in the main shared memory segment, or in a dynamic shared
memory segment. There is still a main LWLock array and this patch does
not move anything out of it, but it provides necessary infrastructure
for doing that in the future.
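[Editor's note: a rough illustration of the new infrastructure, not part of this patch. A structure placed in a dynamic shared memory segment could embed its own LWLock and describe it to each backend with the tranche calls added below (LWLockNewTrancheId, LWLockInitialize, LWLockRegisterTranche). The struct, tranche name, and helper functions here are hypothetical.]

    #include "storage/lwlock.h"

    /* Hypothetical structure living in a dynamic shared memory segment. */
    typedef struct MyShared
    {
        int         tranche_id;     /* so attaching backends can find it */
        LWLock      lock;           /* protects counter */
        int64       counter;
    } MyShared;

    /* Tranche object must outlive the backend: static or TopMemoryContext. */
    static LWLockTranche MySharedTranche;

    /* Creator: set up the lock and record its tranche ID in the segment. */
    static void
    my_shared_create(MyShared *shared)
    {
        shared->tranche_id = LWLockNewTrancheId();
        LWLockInitialize(&shared->lock, shared->tranche_id);  /* starts unlocked */
        shared->counter = 0;
    }

    /* Every backend that maps the segment: teach it where the lock lives. */
    static void
    my_shared_attach(MyShared *shared)
    {
        MySharedTranche.name = "my_shared";
        MySharedTranche.array_base = &shared->lock;
        MySharedTranche.array_stride = sizeof(MyShared);
        LWLockRegisterTranche(shared->tranche_id, &MySharedTranche);
    }

After attaching, such a lock is used exactly like one from the main array, e.g. LWLockAcquire(&shared->lock, LW_EXCLUSIVE) followed by LWLockRelease(&shared->lock), since this patch makes those functions take an LWLock pointer rather than an array index.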
This change is likely to increase the size of LWLockPadded on some
platforms, especially 32-bit platforms where it was previously only
16 bytes.
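[Editor's note: the reason is the padding scheme visible in the lwlock.c hunk below: each array slot is rounded up to a power of two so a lock never straddles a cache line. With the new tranche field, sizeof(LWLock) can exceed 16 bytes on 32-bit builds, so the slot rounds up to 32. A sketch of that arithmetic, following the pre-patch lwlock.c definitions (the macro itself presumably now lives in lwlock.h, outside this diffstat); the byte counts in the comment are illustrative only.]

    /* Slot size rounded to a power of two, as in the pre-patch lwlock.c. */
    #define LWLOCK_PADDED_SIZE  (sizeof(LWLock) <= 16 ? 16 : 32)

    typedef union LWLockPadded
    {
        LWLock      lock;
        char        pad[LWLOCK_PADDED_SIZE];   /* power-of-2 array stride */
    } LWLockPadded;

    /*
     * On a 32-bit build the old LWLock (spinlock, flags, hold counts, two
     * list pointers) could fit in 16 bytes; adding the int tranche field
     * can push sizeof(LWLock) past 16, so LWLOCK_PADDED_SIZE becomes 32.
     */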
Patch by me. Review by Andres Freund and KaiGai Kohei.
Diffstat (limited to 'src/backend/storage')
-rw-r--r--   src/backend/storage/buffer/bufmgr.c    |   8
-rw-r--r--   src/backend/storage/ipc/ipci.c         |   3
-rw-r--r--   src/backend/storage/lmgr/lock.c        |  34
-rw-r--r--   src/backend/storage/lmgr/lwlock.c      | 401
-rw-r--r--   src/backend/storage/lmgr/predicate.c   |  35
-rw-r--r--   src/backend/storage/lmgr/proc.c        |  11
6 files changed, 304 insertions, 188 deletions
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 91f0c7eb36e..19eecab4c28 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -146,7 +146,7 @@ PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum) { BufferTag newTag; /* identity of requested block */ uint32 newHash; /* hash value for newTag */ - LWLockId newPartitionLock; /* buffer partition lock for it */ + LWLock *newPartitionLock; /* buffer partition lock for it */ int buf_id; /* create a tag so we can lookup the buffer */ @@ -539,10 +539,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { BufferTag newTag; /* identity of requested block */ uint32 newHash; /* hash value for newTag */ - LWLockId newPartitionLock; /* buffer partition lock for it */ + LWLock *newPartitionLock; /* buffer partition lock for it */ BufferTag oldTag; /* previous identity of selected buffer */ uint32 oldHash; /* hash value for oldTag */ - LWLockId oldPartitionLock; /* buffer partition lock for it */ + LWLock *oldPartitionLock; /* buffer partition lock for it */ BufFlags oldFlags; int buf_id; volatile BufferDesc *buf; @@ -891,7 +891,7 @@ InvalidateBuffer(volatile BufferDesc *buf) { BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ - LWLockId oldPartitionLock; /* buffer partition lock for it */ + LWLock *oldPartitionLock; /* buffer partition lock for it */ BufFlags oldFlags; /* Save the original buffer tag before dropping the spinlock */ diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index cc219237097..2e717457b12 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -182,8 +182,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) * Now initialize LWLocks, which do shared memory allocation and are * needed for InitShmemIndex. */ - if (!IsUnderPostmaster) - CreateLWLocks(); + CreateLWLocks(); /* * Set up shmem.c index hashtable diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 5c8b4b0656c..6335129ac25 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -565,7 +565,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) LOCALLOCK *locallock; LOCK *lock; PROCLOCK *proclock; - LWLockId partitionLock; + LWLock *partitionLock; bool hasWaiters = false; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) @@ -702,7 +702,7 @@ LockAcquireExtended(const LOCKTAG *locktag, bool found; ResourceOwner owner; uint32 hashcode; - LWLockId partitionLock; + LWLock *partitionLock; int status; bool log_lock = false; @@ -1744,7 +1744,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) LOCALLOCK *locallock; LOCK *lock; PROCLOCK *proclock; - LWLockId partitionLock; + LWLock *partitionLock; bool wakeupNeeded; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) @@ -2096,10 +2096,12 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) */ for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) { - LWLockId partitionLock = FirstLockMgrLock + partition; + LWLock *partitionLock; SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); PROCLOCK *nextplock; + partitionLock = LockHashPartitionLockByIndex(partition); + /* * If the proclock list for this partition is empty, we can skip * acquiring the partition lock. 
This optimization is trickier than @@ -2475,7 +2477,7 @@ static bool FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag, uint32 hashcode) { - LWLockId partitionLock = LockHashPartitionLock(hashcode); + LWLock *partitionLock = LockHashPartitionLock(hashcode); Oid relid = locktag->locktag_field2; uint32 i; @@ -2565,7 +2567,7 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock) LockMethod lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD]; LOCKTAG *locktag = &locallock->tag.lock; PROCLOCK *proclock = NULL; - LWLockId partitionLock = LockHashPartitionLock(locallock->hashcode); + LWLock *partitionLock = LockHashPartitionLock(locallock->hashcode); Oid relid = locktag->locktag_field2; uint32 f; @@ -2671,7 +2673,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode) SHM_QUEUE *procLocks; PROCLOCK *proclock; uint32 hashcode; - LWLockId partitionLock; + LWLock *partitionLock; int count = 0; int fast_count = 0; @@ -2883,7 +2885,7 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc, PROCLOCKTAG proclocktag; uint32 hashcode; uint32 proclock_hashcode; - LWLockId partitionLock; + LWLock *partitionLock; bool wakeupNeeded; hashcode = LockTagHashCode(locktag); @@ -3159,10 +3161,12 @@ PostPrepare_Locks(TransactionId xid) */ for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) { - LWLockId partitionLock = FirstLockMgrLock + partition; + LWLock *partitionLock; SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); PROCLOCK *nextplock; + partitionLock = LockHashPartitionLockByIndex(partition); + /* * If the proclock list for this partition is empty, we can skip * acquiring the partition lock. This optimization is safer than the @@ -3400,7 +3404,7 @@ GetLockStatusData(void) * Must grab LWLocks in partition-number order to avoid LWLock deadlock. */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - LWLockAcquire(FirstLockMgrLock + i, LW_SHARED); + LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED); /* Now we can safely count the number of proclocks */ data->nelements = el + hash_get_num_entries(LockMethodProcLockHash); @@ -3442,7 +3446,7 @@ GetLockStatusData(void) * behavior inside LWLockRelease. */ for (i = NUM_LOCK_PARTITIONS; --i >= 0;) - LWLockRelease(FirstLockMgrLock + i); + LWLockRelease(LockHashPartitionLockByIndex(i)); Assert(el == data->nelements); @@ -3477,7 +3481,7 @@ GetRunningTransactionLocks(int *nlocks) * Must grab LWLocks in partition-number order to avoid LWLock deadlock. */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - LWLockAcquire(FirstLockMgrLock + i, LW_SHARED); + LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED); /* Now we can safely count the number of proclocks */ els = hash_get_num_entries(LockMethodProcLockHash); @@ -3537,7 +3541,7 @@ GetRunningTransactionLocks(int *nlocks) * behavior inside LWLockRelease. 
*/ for (i = NUM_LOCK_PARTITIONS; --i >= 0;) - LWLockRelease(FirstLockMgrLock + i); + LWLockRelease(LockHashPartitionLockByIndex(i)); *nlocks = index; return accessExclusiveLocks; @@ -3673,7 +3677,7 @@ lock_twophase_recover(TransactionId xid, uint16 info, uint32 hashcode; uint32 proclock_hashcode; int partition; - LWLockId partitionLock; + LWLock *partitionLock; LockMethod lockMethodTable; Assert(len == sizeof(TwoPhaseLockRecord)); @@ -4044,7 +4048,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) { PROCLOCK *proclock; uint32 hashcode; - LWLockId partitionLock; + LWLock *partitionLock; hashcode = LockTagHashCode(&tag); diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 0e319a7e6ac..55d9d7837ca 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -31,50 +31,37 @@ #include "storage/predicate.h" #include "storage/proc.h" #include "storage/spin.h" +#include "utils/memutils.h" + +#ifdef LWLOCK_STATS +#include "utils/hsearch.h" +#endif /* We use the ShmemLock spinlock to protect LWLockAssign */ extern slock_t *ShmemLock; - -typedef struct LWLock -{ - slock_t mutex; /* Protects LWLock and queue of PGPROCs */ - bool releaseOK; /* T if ok to release waiters */ - char exclusive; /* # of exclusive holders (0 or 1) */ - int shared; /* # of shared holders (0..MaxBackends) */ - PGPROC *head; /* head of list of waiting PGPROCs */ - PGPROC *tail; /* tail of list of waiting PGPROCs */ - /* tail is undefined when head is NULL */ -} LWLock; - /* - * All the LWLock structs are allocated as an array in shared memory. - * (LWLockIds are indexes into the array.) We force the array stride to - * be a power of 2, which saves a few cycles in indexing, but more - * importantly also ensures that individual LWLocks don't cross cache line - * boundaries. This reduces cache contention problems, especially on AMD - * Opterons. (Of course, we have to also ensure that the array start - * address is suitably aligned.) - * - * LWLock is between 16 and 32 bytes on all known platforms, so these two - * cases are sufficient. + * This is indexed by tranche ID and stores metadata for all tranches known + * to the current backend. */ -#define LWLOCK_PADDED_SIZE (sizeof(LWLock) <= 16 ? 16 : 32) +static LWLockTranche **LWLockTrancheArray = NULL; +static int LWLockTranchesAllocated = 0; -typedef union LWLockPadded -{ - LWLock lock; - char pad[LWLOCK_PADDED_SIZE]; -} LWLockPadded; +#define T_NAME(lock) \ + (LWLockTrancheArray[(lock)->tranche]->name) +#define T_ID(lock) \ + ((int) ((((char *) lock) - \ + ((char *) LWLockTrancheArray[(lock)->tranche]->array_base)) / \ + LWLockTrancheArray[(lock)->tranche]->array_stride)) /* - * This points to the array of LWLocks in shared memory. Backends inherit + * This points to the main array of LWLocks in shared memory. Backends inherit * the pointer by fork from the postmaster (except in the EXEC_BACKEND case, * where we have special measures to pass it down). 
*/ -NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL; - +LWLockPadded *MainLWLockArray = NULL; +static LWLockTranche MainLWLockTranche; /* * We use this structure to keep track of locked LWLocks for release @@ -85,58 +72,78 @@ NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL; #define MAX_SIMUL_LWLOCKS 100 static int num_held_lwlocks = 0; -static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS]; +static LWLock *held_lwlocks[MAX_SIMUL_LWLOCKS]; static int lock_addin_request = 0; static bool lock_addin_request_allowed = true; #ifdef LWLOCK_STATS +typedef struct lwlock_stats_key +{ + int tranche; + int instance; +} lwlock_stats_key; + +typedef struct lwlock_stats +{ + lwlock_stats_key key; + int sh_acquire_count; + int ex_acquire_count; + int block_count; + int spin_delay_count; +} lwlock_stats; + static int counts_for_pid = 0; -static int *sh_acquire_counts; -static int *ex_acquire_counts; -static int *block_counts; -static int *spin_delay_counts; +static HTAB *lwlock_stats_htab; #endif #ifdef LOCK_DEBUG bool Trace_lwlocks = false; inline static void -PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock) +PRINT_LWDEBUG(const char *where, const volatile LWLock *lock) { if (Trace_lwlocks) - elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d", - where, (int) lockid, + elog(LOG, "%s(%s %d): excl %d shared %d head %p rOK %d", + where, T_NAME(lock), T_ID(lock), (int) lock->exclusive, lock->shared, lock->head, (int) lock->releaseOK); } inline static void -LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg) +LOG_LWDEBUG(const char *where, const char *name, int index, const char *msg) { if (Trace_lwlocks) - elog(LOG, "%s(%d): %s", where, (int) lockid, msg); + elog(LOG, "%s(%s %d): %s", where, name, index, msg); } #else /* not LOCK_DEBUG */ -#define PRINT_LWDEBUG(a,b,c) -#define LOG_LWDEBUG(a,b,c) +#define PRINT_LWDEBUG(a,b) +#define LOG_LWDEBUG(a,b,c,d) #endif /* LOCK_DEBUG */ #ifdef LWLOCK_STATS static void init_lwlock_stats(void); static void print_lwlock_stats(int code, Datum arg); +static lwlock_stats *get_lwlock_stats_entry(LWLock *lockid); static void init_lwlock_stats(void) { - int *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int)); - int numLocks = LWLockCounter[1]; + HASHCTL ctl; + + if (lwlock_stats_htab != NULL) + { + hash_destroy(lwlock_stats_htab); + lwlock_stats_htab = NULL; + } - sh_acquire_counts = calloc(numLocks, sizeof(int)); - ex_acquire_counts = calloc(numLocks, sizeof(int)); - spin_delay_counts = calloc(numLocks, sizeof(int)); - block_counts = calloc(numLocks, sizeof(int)); + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(lwlock_stats_key); + ctl.entrysize = sizeof(lwlock_stats); + ctl.hash = tag_hash; + lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl, + HASH_ELEM | HASH_FUNCTION); counts_for_pid = MyProcPid; on_shmem_exit(print_lwlock_stats, 0); } @@ -144,30 +151,58 @@ init_lwlock_stats(void) static void print_lwlock_stats(int code, Datum arg) { - int i; - int *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int)); - int numLocks = LWLockCounter[1]; + HASH_SEQ_STATUS scan; + lwlock_stats *lwstats; + + hash_seq_init(&scan, lwlock_stats_htab); /* Grab an LWLock to keep different backends from mixing reports */ - LWLockAcquire(0, LW_EXCLUSIVE); + LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE); - for (i = 0; i < numLocks; i++) + while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL) { - if (sh_acquire_counts[i] || ex_acquire_counts[i] || block_counts[i] || spin_delay_counts[i]) - 
fprintf(stderr, "PID %d lwlock %d: shacq %u exacq %u blk %u spindelay %u\n", - MyProcPid, i, sh_acquire_counts[i], ex_acquire_counts[i], - block_counts[i], spin_delay_counts[i]); + fprintf(stderr, + "PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u\n", + MyProcPid, LWLockTrancheArray[lwstats->key.tranche]->name, + lwstats->key.instance, lwstats->sh_acquire_count, + lwstats->ex_acquire_count, lwstats->block_count, + lwstats->spin_delay_count); } - LWLockRelease(0); + LWLockRelease(&MainLWLockArray[0].lock); +} + +static lwlock_stats * +get_lwlock_stats_entry(LWLock *lock) +{ + lwlock_stats_key key; + lwlock_stats *lwstats; + bool found; + + /* Set up local count state first time through in a given process */ + if (counts_for_pid != MyProcPid) + init_lwlock_stats(); + + /* Fetch or create the entry. */ + key.tranche = lock->tranche; + key.instance = T_ID(lock); + lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found); + if (!found) + { + lwstats->sh_acquire_count = 0; + lwstats->ex_acquire_count = 0; + lwstats->block_count = 0; + lwstats->spin_delay_count = 0; + } + return lwstats; } #endif /* LWLOCK_STATS */ /* - * Compute number of LWLocks to allocate. + * Compute number of LWLocks to allocate in the main array. */ -int +static int NumLWLocks(void) { int numLocks; @@ -180,7 +215,7 @@ NumLWLocks(void) */ /* Predefined LWLocks */ - numLocks = (int) NumFixedLWLocks; + numLocks = NUM_FIXED_LWLOCKS; /* bufmgr.c needs two for each shared buffer */ numLocks += 2 * NBuffers; @@ -248,56 +283,67 @@ LWLockShmemSize(void) size = mul_size(numLocks, sizeof(LWLockPadded)); /* Space for dynamic allocation counter, plus room for alignment. */ - size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE); + size = add_size(size, 3 * sizeof(int) + LWLOCK_PADDED_SIZE); return size; } /* - * Allocate shmem space for LWLocks and initialize the locks. + * Allocate shmem space for the main LWLock array and initialize it. We also + * register the main tranch here. */ void CreateLWLocks(void) { - int numLocks = NumLWLocks(); - Size spaceLocks = LWLockShmemSize(); - LWLockPadded *lock; - int *LWLockCounter; - char *ptr; - int id; + if (!IsUnderPostmaster) + { + int numLocks = NumLWLocks(); + Size spaceLocks = LWLockShmemSize(); + LWLockPadded *lock; + int *LWLockCounter; + char *ptr; + int id; - /* Allocate space */ - ptr = (char *) ShmemAlloc(spaceLocks); + /* Allocate space */ + ptr = (char *) ShmemAlloc(spaceLocks); - /* Leave room for dynamic allocation counter */ - ptr += 2 * sizeof(int); + /* Leave room for dynamic allocation of locks and tranches */ + ptr += 3 * sizeof(int); - /* Ensure desired alignment of LWLock array */ - ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE; + /* Ensure desired alignment of LWLock array */ + ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE; - LWLockArray = (LWLockPadded *) ptr; + MainLWLockArray = (LWLockPadded *) ptr; - /* - * Initialize all LWLocks to "unlocked" state - */ - for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++) + /* Initialize all LWLocks in main array */ + for (id = 0, lock = MainLWLockArray; id < numLocks; id++, lock++) + LWLockInitialize(&lock->lock, 0); + + /* + * Initialize the dynamic-allocation counters, which are stored just + * before the first LWLock. LWLockCounter[0] is the allocation + * counter for lwlocks, LWLockCounter[1] is the maximum number that + * can be allocated from the main array, and LWLockCounter[2] is the + * allocation counter for tranches. 
+ */ + LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int)); + LWLockCounter[0] = NUM_FIXED_LWLOCKS; + LWLockCounter[1] = numLocks; + LWLockCounter[2] = 1; /* 0 is the main array */ + } + + if (LWLockTrancheArray == NULL) { - SpinLockInit(&lock->lock.mutex); - lock->lock.releaseOK = true; - lock->lock.exclusive = 0; - lock->lock.shared = 0; - lock->lock.head = NULL; - lock->lock.tail = NULL; + LWLockTranchesAllocated = 16; + LWLockTrancheArray = MemoryContextAlloc(TopMemoryContext, + LWLockTranchesAllocated * sizeof(LWLockTranche *)); } - /* - * Initialize the dynamic-allocation counter, which is stored just before - * the first LWLock. - */ - LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int)); - LWLockCounter[0] = (int) NumFixedLWLocks; - LWLockCounter[1] = numLocks; + MainLWLockTranche.name = "main"; + MainLWLockTranche.array_base = MainLWLockArray; + MainLWLockTranche.array_stride = sizeof(LWLockPadded); + LWLockRegisterTranche(0, &MainLWLockTranche); } @@ -309,26 +355,86 @@ CreateLWLocks(void) * startup, but it is needed if any user-defined code tries to allocate * LWLocks after startup. */ -LWLockId +LWLock * LWLockAssign(void) { - LWLockId result; + LWLock *result; /* use volatile pointer to prevent code rearrangement */ volatile int *LWLockCounter; - LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int)); + LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int)); SpinLockAcquire(ShmemLock); if (LWLockCounter[0] >= LWLockCounter[1]) { SpinLockRelease(ShmemLock); - elog(ERROR, "no more LWLockIds available"); + elog(ERROR, "no more LWLocks available"); } - result = (LWLockId) (LWLockCounter[0]++); + result = &MainLWLockArray[LWLockCounter[0]++].lock; SpinLockRelease(ShmemLock); return result; } +/* + * Allocate a new tranche ID. + */ +int +LWLockNewTrancheId(void) +{ + int result; + + /* use volatile pointer to prevent code rearrangement */ + volatile int *LWLockCounter; + + LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int)); + SpinLockAcquire(ShmemLock); + result = LWLockCounter[2]++; + SpinLockRelease(ShmemLock); + + return result; +} + +/* + * Register a tranche ID in the lookup table for the current process. This + * routine will save a pointer to the tranche object passed as an argument, + * so that object should be allocated in a backend-lifetime context + * (TopMemoryContext, static variable, or similar). + */ +void +LWLockRegisterTranche(int tranche_id, LWLockTranche *tranche) +{ + Assert(LWLockTrancheArray != NULL); + + if (tranche_id >= LWLockTranchesAllocated) + { + int i = LWLockTranchesAllocated; + + while (i < tranche_id) + i *= 2; + + LWLockTrancheArray = repalloc(LWLockTrancheArray, + i * sizeof(LWLockTranche *)); + LWLockTranchesAllocated = i; + } + + LWLockTrancheArray[tranche_id] = tranche; +} + +/* + * LWLockInitialize - initialize a new lwlock; it's initially unlocked + */ +void +LWLockInitialize(LWLock *lock, int tranche_id) +{ + SpinLockInit(&lock->mutex); + lock->releaseOK = true; + lock->exclusive = 0; + lock->shared = 0; + lock->tranche = tranche_id; + lock->head = NULL; + lock->tail = NULL; +} + /* * LWLockAcquire - acquire a lightweight lock in the specified mode @@ -338,24 +444,26 @@ LWLockAssign(void) * Side effect: cancel/die interrupts are held off until lock release. 
*/ void -LWLockAcquire(LWLockId lockid, LWLockMode mode) +LWLockAcquire(LWLock *l, LWLockMode mode) { - volatile LWLock *lock = &(LWLockArray[lockid].lock); + volatile LWLock *lock = l; PGPROC *proc = MyProc; bool retry = false; int extraWaits = 0; +#ifdef LWLOCK_STATS + lwlock_stats *lwstats; +#endif - PRINT_LWDEBUG("LWLockAcquire", lockid, lock); + PRINT_LWDEBUG("LWLockAcquire", lock); #ifdef LWLOCK_STATS - /* Set up local count state first time through in a given process */ - if (counts_for_pid != MyProcPid) - init_lwlock_stats(); + lwstats = get_lwlock_stats_entry(l); + /* Count lock acquisition attempts */ if (mode == LW_EXCLUSIVE) - ex_acquire_counts[lockid]++; + lwstats->ex_acquire_count++; else - sh_acquire_counts[lockid]++; + lwstats->sh_acquire_count++; #endif /* LWLOCK_STATS */ /* @@ -398,7 +506,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) /* Acquire mutex. Time spent holding mutex should be short! */ #ifdef LWLOCK_STATS - spin_delay_counts[lockid] += SpinLockAcquire(&lock->mutex); + lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); #else SpinLockAcquire(&lock->mutex); #endif @@ -466,13 +574,13 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) * so that the lock manager or signal manager will see the received * signal when it next waits. */ - LOG_LWDEBUG("LWLockAcquire", lockid, "waiting"); + LOG_LWDEBUG("LWLockAcquire", T_NAME(l), T_ID(l), "waiting"); #ifdef LWLOCK_STATS - block_counts[lockid]++; + lwstats->block_count++; #endif - TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode); + TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode); for (;;) { @@ -483,9 +591,9 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) extraWaits++; } - TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode); + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), mode); - LOG_LWDEBUG("LWLockAcquire", lockid, "awakened"); + LOG_LWDEBUG("LWLockAcquire", T_NAME(l), T_ID(l), "awakened"); /* Now loop back and try to acquire lock again. */ retry = true; @@ -494,10 +602,10 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) /* We are done updating shared state of the lock itself. */ SpinLockRelease(&lock->mutex); - TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode); + TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(l), T_ID(l), mode); /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lockid; + held_lwlocks[num_held_lwlocks++] = l; /* * Fix the process wait semaphore's count for any absorbed wakeups. @@ -514,12 +622,12 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) * If successful, cancel/die interrupts are held off until lock release. 
*/ bool -LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode) +LWLockConditionalAcquire(LWLock *l, LWLockMode mode) { - volatile LWLock *lock = &(LWLockArray[lockid].lock); + volatile LWLock *lock = l; bool mustwait; - PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock); + PRINT_LWDEBUG("LWLockConditionalAcquire", lock); /* Ensure we will have room to remember the lock */ if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS) @@ -564,14 +672,14 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode) { /* Failed to get lock, so release interrupt holdoff */ RESUME_INTERRUPTS(); - LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed"); - TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode); + LOG_LWDEBUG("LWLockConditionalAcquire", T_NAME(l), T_ID(l), "failed"); + TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(l), T_ID(l), mode); } else { /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lockid; - TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode); + held_lwlocks[num_held_lwlocks++] = l; + TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(l), T_ID(l), mode); } return !mustwait; @@ -592,19 +700,20 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode) * wake up, observe that their records have already been flushed, and return. */ bool -LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) +LWLockAcquireOrWait(LWLock *l, LWLockMode mode) { - volatile LWLock *lock = &(LWLockArray[lockid].lock); + volatile LWLock *lock = l; PGPROC *proc = MyProc; bool mustwait; int extraWaits = 0; +#ifdef LWLOCK_STATS + lwlock_stats *lwstats; +#endif - PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock); + PRINT_LWDEBUG("LWLockAcquireOrWait", lock); #ifdef LWLOCK_STATS - /* Set up local count state first time through in a given process */ - if (counts_for_pid != MyProcPid) - init_lwlock_stats(); + lwstats = get_lwlock_stats_entry(l); #endif /* Ensure we will have room to remember the lock */ @@ -671,13 +780,13 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) * Wait until awakened. Like in LWLockAcquire, be prepared for bogus * wakups, because we share the semaphore with ProcWaitForSignal. 
*/ - LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "waiting"); + LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "waiting"); #ifdef LWLOCK_STATS - block_counts[lockid]++; + lwstats->block_count++; #endif - TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode); + TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode); for (;;) { @@ -688,9 +797,9 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) extraWaits++; } - TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode); + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), mode); - LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "awakened"); + LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "awakened"); } else { @@ -708,14 +817,14 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) { /* Failed to get lock, so release interrupt holdoff */ RESUME_INTERRUPTS(); - LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "failed"); - TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode); + LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "failed"); + TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(T_NAME(l), T_ID(l), mode); } else { /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lockid; - TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode); + held_lwlocks[num_held_lwlocks++] = l; + TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(T_NAME(l), T_ID(l), mode); } return !mustwait; @@ -725,14 +834,14 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) * LWLockRelease - release a previously acquired lock */ void -LWLockRelease(LWLockId lockid) +LWLockRelease(LWLock *l) { - volatile LWLock *lock = &(LWLockArray[lockid].lock); + volatile LWLock *lock = l; PGPROC *head; PGPROC *proc; int i; - PRINT_LWDEBUG("LWLockRelease", lockid, lock); + PRINT_LWDEBUG("LWLockRelease", lock); /* * Remove lock from list of locks held. Usually, but not always, it will @@ -740,11 +849,11 @@ LWLockRelease(LWLockId lockid) */ for (i = num_held_lwlocks; --i >= 0;) { - if (lockid == held_lwlocks[i]) + if (l == held_lwlocks[i]) break; } if (i < 0) - elog(ERROR, "lock %d is not held", (int) lockid); + elog(ERROR, "lock %s %d is not held", T_NAME(l), T_ID(l)); num_held_lwlocks--; for (; i < num_held_lwlocks; i++) held_lwlocks[i] = held_lwlocks[i + 1]; @@ -824,14 +933,14 @@ LWLockRelease(LWLockId lockid) /* We are done updating shared state of the lock itself. */ SpinLockRelease(&lock->mutex); - TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid); + TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(l), T_ID(l)); /* * Awaken any waiters I removed from the queue. */ while (head != NULL) { - LOG_LWDEBUG("LWLockRelease", lockid, "release waiter"); + LOG_LWDEBUG("LWLockRelease", T_NAME(l), T_ID(l), "release waiter"); proc = head; head = proc->lwWaitLink; proc->lwWaitLink = NULL; @@ -874,13 +983,13 @@ LWLockReleaseAll(void) * lock is held shared or exclusive. 
*/ bool -LWLockHeldByMe(LWLockId lockid) +LWLockHeldByMe(LWLock *l) { int i; for (i = 0; i < num_held_lwlocks; i++) { - if (held_lwlocks[i] == lockid) + if (held_lwlocks[i] == l) return true; } return false; diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index e7f44cce841..67000720275 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -241,7 +241,10 @@ #define PredicateLockHashPartition(hashcode) \ ((hashcode) % NUM_PREDICATELOCK_PARTITIONS) #define PredicateLockHashPartitionLock(hashcode) \ - ((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hashcode))) + (&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + \ + PredicateLockHashPartition(hashcode)].lock) +#define PredicateLockHashPartitionLockByIndex(i) \ + (&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + (i)].lock) #define NPREDICATELOCKTARGETENTS() \ mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts)) @@ -383,7 +386,7 @@ static SHM_QUEUE *FinishedSerializableTransactions; */ static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0}; static uint32 ScratchTargetTagHash; -static int ScratchPartitionLock; +static LWLock *ScratchPartitionLock; /* * The local hash table used to determine when to combine multiple fine- @@ -1398,7 +1401,7 @@ GetPredicateLockStatusData(void) * in ascending order, then SerializableXactHashLock. */ for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++) - LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED); + LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED); LWLockAcquire(SerializableXactHashLock, LW_SHARED); /* Get number of locks and allocate appropriately-sized arrays. */ @@ -1427,7 +1430,7 @@ GetPredicateLockStatusData(void) /* Release locks in reverse order */ LWLockRelease(SerializableXactHashLock); for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--) - LWLockRelease(FirstPredicateLockMgrLock + i); + LWLockRelease(PredicateLockHashPartitionLockByIndex(i)); return data; } @@ -1856,7 +1859,7 @@ PageIsPredicateLocked(Relation relation, BlockNumber blkno) { PREDICATELOCKTARGETTAG targettag; uint32 targettaghash; - LWLockId partitionLock; + LWLock *partitionLock; PREDICATELOCKTARGET *target; SET_PREDICATELOCKTARGETTAG_PAGE(targettag, @@ -2089,7 +2092,7 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) if (TargetTagIsCoveredBy(oldtargettag, *newtargettag)) { uint32 oldtargettaghash; - LWLockId partitionLock; + LWLock *partitionLock; PREDICATELOCK *rmpredlock PG_USED_FOR_ASSERTS_ONLY; oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag); @@ -2301,7 +2304,7 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, PREDICATELOCKTARGET *target; PREDICATELOCKTAG locktag; PREDICATELOCK *lock; - LWLockId partitionLock; + LWLock *partitionLock; bool found; partitionLock = PredicateLockHashPartitionLock(targettaghash); @@ -2599,10 +2602,10 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, bool removeOld) { uint32 oldtargettaghash; - LWLockId oldpartitionLock; + LWLock *oldpartitionLock; PREDICATELOCKTARGET *oldtarget; uint32 newtargettaghash; - LWLockId newpartitionLock; + LWLock *newpartitionLock; bool found; bool outOfShmem = false; @@ -2858,7 +2861,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) /* Acquire locks on all lock partitions */ LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE); for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++) - 
LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE); + LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_EXCLUSIVE); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); /* @@ -2996,7 +2999,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) /* Release locks in reverse order */ LWLockRelease(SerializableXactHashLock); for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--) - LWLockRelease(FirstPredicateLockMgrLock + i); + LWLockRelease(PredicateLockHashPartitionLockByIndex(i)); LWLockRelease(SerializablePredicateLockListLock); } @@ -3611,7 +3614,7 @@ ClearOldPredicateLocks(void) PREDICATELOCKTARGET *target; PREDICATELOCKTARGETTAG targettag; uint32 targettaghash; - LWLockId partitionLock; + LWLock *partitionLock; tag = predlock->tag; target = tag.myTarget; @@ -3690,7 +3693,7 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, PREDICATELOCKTARGET *target; PREDICATELOCKTARGETTAG targettag; uint32 targettaghash; - LWLockId partitionLock; + LWLock *partitionLock; nextpredlock = (PREDICATELOCK *) SHMQueueNext(&(sxact->predicateLocks), @@ -4068,7 +4071,7 @@ static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) { uint32 targettaghash; - LWLockId partitionLock; + LWLock *partitionLock; PREDICATELOCKTARGET *target; PREDICATELOCK *predlock; PREDICATELOCK *mypredlock = NULL; @@ -4360,7 +4363,7 @@ CheckTableForSerializableConflictIn(Relation relation) LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE); for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++) - LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED); + LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED); LWLockAcquire(SerializableXactHashLock, LW_SHARED); /* Scan through target list */ @@ -4407,7 +4410,7 @@ CheckTableForSerializableConflictIn(Relation relation) /* Release locks in reverse order */ LWLockRelease(SerializableXactHashLock); for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--) - LWLockRelease(FirstPredicateLockMgrLock + i); + LWLockRelease(PredicateLockHashPartitionLockByIndex(i)); LWLockRelease(SerializablePredicateLockListLock); } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index ee6c24cea7d..1a683b83361 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -189,7 +189,8 @@ InitProcGlobal(void) */ procs = (PGPROC *) ShmemAlloc(TotalProcs * sizeof(PGPROC)); ProcGlobal->allProcs = procs; - ProcGlobal->allProcCount = TotalProcs; + /* XXX allProcCount isn't really all of them; it excludes prepared xacts */ + ProcGlobal->allProcCount = MaxBackends + NUM_AUXILIARY_PROCS; if (!procs) ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -663,7 +664,7 @@ IsWaitingForLock(void) void LockErrorCleanup(void) { - LWLockId partitionLock; + LWLock *partitionLock; DisableTimeoutParams timeouts[2]; AbortStrongLockAcquire(); @@ -942,7 +943,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) LOCK *lock = locallock->lock; PROCLOCK *proclock = locallock->proclock; uint32 hashcode = locallock->hashcode; - LWLockId partitionLock = LockHashPartitionLock(hashcode); + LWLock *partitionLock = LockHashPartitionLock(hashcode); PROC_QUEUE *waitQueue = &(lock->waitProcs); LOCKMASK myHeldLocks = MyProc->heldLocks; bool early_deadlock = false; @@ -1440,7 +1441,7 @@ CheckDeadLock(void) * interrupts. 
*/ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - LWLockAcquire(FirstLockMgrLock + i, LW_EXCLUSIVE); + LWLockAcquire(LockHashPartitionLockByIndex(i), LW_EXCLUSIVE); /* * Check to see if we've been awoken by anyone in the interim. @@ -1522,7 +1523,7 @@ CheckDeadLock(void) */ check_done: for (i = NUM_LOCK_PARTITIONS; --i >= 0;) - LWLockRelease(FirstLockMgrLock + i); + LWLockRelease(LockHashPartitionLockByIndex(i)); } |