Diffstat (limited to 'src/backend/storage')
-rw-r--r-- | src/backend/storage/buffer/bufmgr.c    |   8
-rw-r--r-- | src/backend/storage/ipc/ipci.c         |   3
-rw-r--r-- | src/backend/storage/lmgr/lock.c        |  34
-rw-r--r-- | src/backend/storage/lmgr/lwlock.c      | 401
-rw-r--r-- | src/backend/storage/lmgr/predicate.c   |  35
-rw-r--r-- | src/backend/storage/lmgr/proc.c        |  11
6 files changed, 304 insertions, 188 deletions
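
The common thread in the hunks below is that LWLocks are now identified by LWLock pointers rather than LWLockId array indexes, so call sites that used to do arithmetic on FirstLockMgrLock now go through a partition-lock macro instead. A minimal before/after sketch of that idiom, as it appears in GetLockStatusData() in lock.c (LockHashPartitionLockByIndex itself is presumably defined in lock.h, outside this diffstat-limited view):

	/* old: LWLockId is an int index into one global LWLock array */
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
		LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);

	/* new: callers hold LWLock *; the macro hides the main-array layout */
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
		LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
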
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 91f0c7eb36e..19eecab4c28 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -146,7 +146,7 @@ PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum) { BufferTag newTag; /* identity of requested block */ uint32 newHash; /* hash value for newTag */ - LWLockId newPartitionLock; /* buffer partition lock for it */ + LWLock *newPartitionLock; /* buffer partition lock for it */ int buf_id; /* create a tag so we can lookup the buffer */ @@ -539,10 +539,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { BufferTag newTag; /* identity of requested block */ uint32 newHash; /* hash value for newTag */ - LWLockId newPartitionLock; /* buffer partition lock for it */ + LWLock *newPartitionLock; /* buffer partition lock for it */ BufferTag oldTag; /* previous identity of selected buffer */ uint32 oldHash; /* hash value for oldTag */ - LWLockId oldPartitionLock; /* buffer partition lock for it */ + LWLock *oldPartitionLock; /* buffer partition lock for it */ BufFlags oldFlags; int buf_id; volatile BufferDesc *buf; @@ -891,7 +891,7 @@ InvalidateBuffer(volatile BufferDesc *buf) { BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ - LWLockId oldPartitionLock; /* buffer partition lock for it */ + LWLock *oldPartitionLock; /* buffer partition lock for it */ BufFlags oldFlags; /* Save the original buffer tag before dropping the spinlock */ diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index cc219237097..2e717457b12 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -182,8 +182,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) * Now initialize LWLocks, which do shared memory allocation and are * needed for InitShmemIndex. */ - if (!IsUnderPostmaster) - CreateLWLocks(); + CreateLWLocks(); /* * Set up shmem.c index hashtable diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 5c8b4b0656c..6335129ac25 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -565,7 +565,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) LOCALLOCK *locallock; LOCK *lock; PROCLOCK *proclock; - LWLockId partitionLock; + LWLock *partitionLock; bool hasWaiters = false; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) @@ -702,7 +702,7 @@ LockAcquireExtended(const LOCKTAG *locktag, bool found; ResourceOwner owner; uint32 hashcode; - LWLockId partitionLock; + LWLock *partitionLock; int status; bool log_lock = false; @@ -1744,7 +1744,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) LOCALLOCK *locallock; LOCK *lock; PROCLOCK *proclock; - LWLockId partitionLock; + LWLock *partitionLock; bool wakeupNeeded; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) @@ -2096,10 +2096,12 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) */ for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) { - LWLockId partitionLock = FirstLockMgrLock + partition; + LWLock *partitionLock; SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); PROCLOCK *nextplock; + partitionLock = LockHashPartitionLockByIndex(partition); + /* * If the proclock list for this partition is empty, we can skip * acquiring the partition lock. 
This optimization is trickier than @@ -2475,7 +2477,7 @@ static bool FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag, uint32 hashcode) { - LWLockId partitionLock = LockHashPartitionLock(hashcode); + LWLock *partitionLock = LockHashPartitionLock(hashcode); Oid relid = locktag->locktag_field2; uint32 i; @@ -2565,7 +2567,7 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock) LockMethod lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD]; LOCKTAG *locktag = &locallock->tag.lock; PROCLOCK *proclock = NULL; - LWLockId partitionLock = LockHashPartitionLock(locallock->hashcode); + LWLock *partitionLock = LockHashPartitionLock(locallock->hashcode); Oid relid = locktag->locktag_field2; uint32 f; @@ -2671,7 +2673,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode) SHM_QUEUE *procLocks; PROCLOCK *proclock; uint32 hashcode; - LWLockId partitionLock; + LWLock *partitionLock; int count = 0; int fast_count = 0; @@ -2883,7 +2885,7 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc, PROCLOCKTAG proclocktag; uint32 hashcode; uint32 proclock_hashcode; - LWLockId partitionLock; + LWLock *partitionLock; bool wakeupNeeded; hashcode = LockTagHashCode(locktag); @@ -3159,10 +3161,12 @@ PostPrepare_Locks(TransactionId xid) */ for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) { - LWLockId partitionLock = FirstLockMgrLock + partition; + LWLock *partitionLock; SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); PROCLOCK *nextplock; + partitionLock = LockHashPartitionLockByIndex(partition); + /* * If the proclock list for this partition is empty, we can skip * acquiring the partition lock. This optimization is safer than the @@ -3400,7 +3404,7 @@ GetLockStatusData(void) * Must grab LWLocks in partition-number order to avoid LWLock deadlock. */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - LWLockAcquire(FirstLockMgrLock + i, LW_SHARED); + LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED); /* Now we can safely count the number of proclocks */ data->nelements = el + hash_get_num_entries(LockMethodProcLockHash); @@ -3442,7 +3446,7 @@ GetLockStatusData(void) * behavior inside LWLockRelease. */ for (i = NUM_LOCK_PARTITIONS; --i >= 0;) - LWLockRelease(FirstLockMgrLock + i); + LWLockRelease(LockHashPartitionLockByIndex(i)); Assert(el == data->nelements); @@ -3477,7 +3481,7 @@ GetRunningTransactionLocks(int *nlocks) * Must grab LWLocks in partition-number order to avoid LWLock deadlock. */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - LWLockAcquire(FirstLockMgrLock + i, LW_SHARED); + LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED); /* Now we can safely count the number of proclocks */ els = hash_get_num_entries(LockMethodProcLockHash); @@ -3537,7 +3541,7 @@ GetRunningTransactionLocks(int *nlocks) * behavior inside LWLockRelease. 
*/ for (i = NUM_LOCK_PARTITIONS; --i >= 0;) - LWLockRelease(FirstLockMgrLock + i); + LWLockRelease(LockHashPartitionLockByIndex(i)); *nlocks = index; return accessExclusiveLocks; @@ -3673,7 +3677,7 @@ lock_twophase_recover(TransactionId xid, uint16 info, uint32 hashcode; uint32 proclock_hashcode; int partition; - LWLockId partitionLock; + LWLock *partitionLock; LockMethod lockMethodTable; Assert(len == sizeof(TwoPhaseLockRecord)); @@ -4044,7 +4048,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) { PROCLOCK *proclock; uint32 hashcode; - LWLockId partitionLock; + LWLock *partitionLock; hashcode = LockTagHashCode(&tag); diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 0e319a7e6ac..55d9d7837ca 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -31,50 +31,37 @@ #include "storage/predicate.h" #include "storage/proc.h" #include "storage/spin.h" +#include "utils/memutils.h" + +#ifdef LWLOCK_STATS +#include "utils/hsearch.h" +#endif /* We use the ShmemLock spinlock to protect LWLockAssign */ extern slock_t *ShmemLock; - -typedef struct LWLock -{ - slock_t mutex; /* Protects LWLock and queue of PGPROCs */ - bool releaseOK; /* T if ok to release waiters */ - char exclusive; /* # of exclusive holders (0 or 1) */ - int shared; /* # of shared holders (0..MaxBackends) */ - PGPROC *head; /* head of list of waiting PGPROCs */ - PGPROC *tail; /* tail of list of waiting PGPROCs */ - /* tail is undefined when head is NULL */ -} LWLock; - /* - * All the LWLock structs are allocated as an array in shared memory. - * (LWLockIds are indexes into the array.) We force the array stride to - * be a power of 2, which saves a few cycles in indexing, but more - * importantly also ensures that individual LWLocks don't cross cache line - * boundaries. This reduces cache contention problems, especially on AMD - * Opterons. (Of course, we have to also ensure that the array start - * address is suitably aligned.) - * - * LWLock is between 16 and 32 bytes on all known platforms, so these two - * cases are sufficient. + * This is indexed by tranche ID and stores metadata for all tranches known + * to the current backend. */ -#define LWLOCK_PADDED_SIZE (sizeof(LWLock) <= 16 ? 16 : 32) +static LWLockTranche **LWLockTrancheArray = NULL; +static int LWLockTranchesAllocated = 0; -typedef union LWLockPadded -{ - LWLock lock; - char pad[LWLOCK_PADDED_SIZE]; -} LWLockPadded; +#define T_NAME(lock) \ + (LWLockTrancheArray[(lock)->tranche]->name) +#define T_ID(lock) \ + ((int) ((((char *) lock) - \ + ((char *) LWLockTrancheArray[(lock)->tranche]->array_base)) / \ + LWLockTrancheArray[(lock)->tranche]->array_stride)) /* - * This points to the array of LWLocks in shared memory. Backends inherit + * This points to the main array of LWLocks in shared memory. Backends inherit * the pointer by fork from the postmaster (except in the EXEC_BACKEND case, * where we have special measures to pass it down). 
*/ -NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL; - +LWLockPadded *MainLWLockArray = NULL; +static LWLockTranche MainLWLockTranche; /* * We use this structure to keep track of locked LWLocks for release @@ -85,58 +72,78 @@ NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL; #define MAX_SIMUL_LWLOCKS 100 static int num_held_lwlocks = 0; -static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS]; +static LWLock *held_lwlocks[MAX_SIMUL_LWLOCKS]; static int lock_addin_request = 0; static bool lock_addin_request_allowed = true; #ifdef LWLOCK_STATS +typedef struct lwlock_stats_key +{ + int tranche; + int instance; +} lwlock_stats_key; + +typedef struct lwlock_stats +{ + lwlock_stats_key key; + int sh_acquire_count; + int ex_acquire_count; + int block_count; + int spin_delay_count; +} lwlock_stats; + static int counts_for_pid = 0; -static int *sh_acquire_counts; -static int *ex_acquire_counts; -static int *block_counts; -static int *spin_delay_counts; +static HTAB *lwlock_stats_htab; #endif #ifdef LOCK_DEBUG bool Trace_lwlocks = false; inline static void -PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock) +PRINT_LWDEBUG(const char *where, const volatile LWLock *lock) { if (Trace_lwlocks) - elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d", - where, (int) lockid, + elog(LOG, "%s(%s %d): excl %d shared %d head %p rOK %d", + where, T_NAME(lock), T_ID(lock), (int) lock->exclusive, lock->shared, lock->head, (int) lock->releaseOK); } inline static void -LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg) +LOG_LWDEBUG(const char *where, const char *name, int index, const char *msg) { if (Trace_lwlocks) - elog(LOG, "%s(%d): %s", where, (int) lockid, msg); + elog(LOG, "%s(%s %d): %s", where, name, index, msg); } #else /* not LOCK_DEBUG */ -#define PRINT_LWDEBUG(a,b,c) -#define LOG_LWDEBUG(a,b,c) +#define PRINT_LWDEBUG(a,b) +#define LOG_LWDEBUG(a,b,c,d) #endif /* LOCK_DEBUG */ #ifdef LWLOCK_STATS static void init_lwlock_stats(void); static void print_lwlock_stats(int code, Datum arg); +static lwlock_stats *get_lwlock_stats_entry(LWLock *lockid); static void init_lwlock_stats(void) { - int *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int)); - int numLocks = LWLockCounter[1]; + HASHCTL ctl; + + if (lwlock_stats_htab != NULL) + { + hash_destroy(lwlock_stats_htab); + lwlock_stats_htab = NULL; + } - sh_acquire_counts = calloc(numLocks, sizeof(int)); - ex_acquire_counts = calloc(numLocks, sizeof(int)); - spin_delay_counts = calloc(numLocks, sizeof(int)); - block_counts = calloc(numLocks, sizeof(int)); + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(lwlock_stats_key); + ctl.entrysize = sizeof(lwlock_stats); + ctl.hash = tag_hash; + lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl, + HASH_ELEM | HASH_FUNCTION); counts_for_pid = MyProcPid; on_shmem_exit(print_lwlock_stats, 0); } @@ -144,30 +151,58 @@ init_lwlock_stats(void) static void print_lwlock_stats(int code, Datum arg) { - int i; - int *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int)); - int numLocks = LWLockCounter[1]; + HASH_SEQ_STATUS scan; + lwlock_stats *lwstats; + + hash_seq_init(&scan, lwlock_stats_htab); /* Grab an LWLock to keep different backends from mixing reports */ - LWLockAcquire(0, LW_EXCLUSIVE); + LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE); - for (i = 0; i < numLocks; i++) + while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL) { - if (sh_acquire_counts[i] || ex_acquire_counts[i] || block_counts[i] || spin_delay_counts[i]) - 
fprintf(stderr, "PID %d lwlock %d: shacq %u exacq %u blk %u spindelay %u\n", - MyProcPid, i, sh_acquire_counts[i], ex_acquire_counts[i], - block_counts[i], spin_delay_counts[i]); + fprintf(stderr, + "PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u\n", + MyProcPid, LWLockTrancheArray[lwstats->key.tranche]->name, + lwstats->key.instance, lwstats->sh_acquire_count, + lwstats->ex_acquire_count, lwstats->block_count, + lwstats->spin_delay_count); } - LWLockRelease(0); + LWLockRelease(&MainLWLockArray[0].lock); +} + +static lwlock_stats * +get_lwlock_stats_entry(LWLock *lock) +{ + lwlock_stats_key key; + lwlock_stats *lwstats; + bool found; + + /* Set up local count state first time through in a given process */ + if (counts_for_pid != MyProcPid) + init_lwlock_stats(); + + /* Fetch or create the entry. */ + key.tranche = lock->tranche; + key.instance = T_ID(lock); + lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found); + if (!found) + { + lwstats->sh_acquire_count = 0; + lwstats->ex_acquire_count = 0; + lwstats->block_count = 0; + lwstats->spin_delay_count = 0; + } + return lwstats; } #endif /* LWLOCK_STATS */ /* - * Compute number of LWLocks to allocate. + * Compute number of LWLocks to allocate in the main array. */ -int +static int NumLWLocks(void) { int numLocks; @@ -180,7 +215,7 @@ NumLWLocks(void) */ /* Predefined LWLocks */ - numLocks = (int) NumFixedLWLocks; + numLocks = NUM_FIXED_LWLOCKS; /* bufmgr.c needs two for each shared buffer */ numLocks += 2 * NBuffers; @@ -248,56 +283,67 @@ LWLockShmemSize(void) size = mul_size(numLocks, sizeof(LWLockPadded)); /* Space for dynamic allocation counter, plus room for alignment. */ - size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE); + size = add_size(size, 3 * sizeof(int) + LWLOCK_PADDED_SIZE); return size; } /* - * Allocate shmem space for LWLocks and initialize the locks. + * Allocate shmem space for the main LWLock array and initialize it. We also + * register the main tranch here. */ void CreateLWLocks(void) { - int numLocks = NumLWLocks(); - Size spaceLocks = LWLockShmemSize(); - LWLockPadded *lock; - int *LWLockCounter; - char *ptr; - int id; + if (!IsUnderPostmaster) + { + int numLocks = NumLWLocks(); + Size spaceLocks = LWLockShmemSize(); + LWLockPadded *lock; + int *LWLockCounter; + char *ptr; + int id; - /* Allocate space */ - ptr = (char *) ShmemAlloc(spaceLocks); + /* Allocate space */ + ptr = (char *) ShmemAlloc(spaceLocks); - /* Leave room for dynamic allocation counter */ - ptr += 2 * sizeof(int); + /* Leave room for dynamic allocation of locks and tranches */ + ptr += 3 * sizeof(int); - /* Ensure desired alignment of LWLock array */ - ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE; + /* Ensure desired alignment of LWLock array */ + ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE; - LWLockArray = (LWLockPadded *) ptr; + MainLWLockArray = (LWLockPadded *) ptr; - /* - * Initialize all LWLocks to "unlocked" state - */ - for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++) + /* Initialize all LWLocks in main array */ + for (id = 0, lock = MainLWLockArray; id < numLocks; id++, lock++) + LWLockInitialize(&lock->lock, 0); + + /* + * Initialize the dynamic-allocation counters, which are stored just + * before the first LWLock. LWLockCounter[0] is the allocation + * counter for lwlocks, LWLockCounter[1] is the maximum number that + * can be allocated from the main array, and LWLockCounter[2] is the + * allocation counter for tranches. 
+ */ + LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int)); + LWLockCounter[0] = NUM_FIXED_LWLOCKS; + LWLockCounter[1] = numLocks; + LWLockCounter[2] = 1; /* 0 is the main array */ + } + + if (LWLockTrancheArray == NULL) { - SpinLockInit(&lock->lock.mutex); - lock->lock.releaseOK = true; - lock->lock.exclusive = 0; - lock->lock.shared = 0; - lock->lock.head = NULL; - lock->lock.tail = NULL; + LWLockTranchesAllocated = 16; + LWLockTrancheArray = MemoryContextAlloc(TopMemoryContext, + LWLockTranchesAllocated * sizeof(LWLockTranche *)); } - /* - * Initialize the dynamic-allocation counter, which is stored just before - * the first LWLock. - */ - LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int)); - LWLockCounter[0] = (int) NumFixedLWLocks; - LWLockCounter[1] = numLocks; + MainLWLockTranche.name = "main"; + MainLWLockTranche.array_base = MainLWLockArray; + MainLWLockTranche.array_stride = sizeof(LWLockPadded); + LWLockRegisterTranche(0, &MainLWLockTranche); } @@ -309,26 +355,86 @@ CreateLWLocks(void) * startup, but it is needed if any user-defined code tries to allocate * LWLocks after startup. */ -LWLockId +LWLock * LWLockAssign(void) { - LWLockId result; + LWLock *result; /* use volatile pointer to prevent code rearrangement */ volatile int *LWLockCounter; - LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int)); + LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int)); SpinLockAcquire(ShmemLock); if (LWLockCounter[0] >= LWLockCounter[1]) { SpinLockRelease(ShmemLock); - elog(ERROR, "no more LWLockIds available"); + elog(ERROR, "no more LWLocks available"); } - result = (LWLockId) (LWLockCounter[0]++); + result = &MainLWLockArray[LWLockCounter[0]++].lock; SpinLockRelease(ShmemLock); return result; } +/* + * Allocate a new tranche ID. + */ +int +LWLockNewTrancheId(void) +{ + int result; + + /* use volatile pointer to prevent code rearrangement */ + volatile int *LWLockCounter; + + LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int)); + SpinLockAcquire(ShmemLock); + result = LWLockCounter[2]++; + SpinLockRelease(ShmemLock); + + return result; +} + +/* + * Register a tranche ID in the lookup table for the current process. This + * routine will save a pointer to the tranche object passed as an argument, + * so that object should be allocated in a backend-lifetime context + * (TopMemoryContext, static variable, or similar). + */ +void +LWLockRegisterTranche(int tranche_id, LWLockTranche *tranche) +{ + Assert(LWLockTrancheArray != NULL); + + if (tranche_id >= LWLockTranchesAllocated) + { + int i = LWLockTranchesAllocated; + + while (i < tranche_id) + i *= 2; + + LWLockTrancheArray = repalloc(LWLockTrancheArray, + i * sizeof(LWLockTranche *)); + LWLockTranchesAllocated = i; + } + + LWLockTrancheArray[tranche_id] = tranche; +} + +/* + * LWLockInitialize - initialize a new lwlock; it's initially unlocked + */ +void +LWLockInitialize(LWLock *lock, int tranche_id) +{ + SpinLockInit(&lock->mutex); + lock->releaseOK = true; + lock->exclusive = 0; + lock->shared = 0; + lock->tranche = tranche_id; + lock->head = NULL; + lock->tail = NULL; +} + /* * LWLockAcquire - acquire a lightweight lock in the specified mode @@ -338,24 +444,26 @@ LWLockAssign(void) * Side effect: cancel/die interrupts are held off until lock release. 
*/ void -LWLockAcquire(LWLockId lockid, LWLockMode mode) +LWLockAcquire(LWLock *l, LWLockMode mode) { - volatile LWLock *lock = &(LWLockArray[lockid].lock); + volatile LWLock *lock = l; PGPROC *proc = MyProc; bool retry = false; int extraWaits = 0; +#ifdef LWLOCK_STATS + lwlock_stats *lwstats; +#endif - PRINT_LWDEBUG("LWLockAcquire", lockid, lock); + PRINT_LWDEBUG("LWLockAcquire", lock); #ifdef LWLOCK_STATS - /* Set up local count state first time through in a given process */ - if (counts_for_pid != MyProcPid) - init_lwlock_stats(); + lwstats = get_lwlock_stats_entry(l); + /* Count lock acquisition attempts */ if (mode == LW_EXCLUSIVE) - ex_acquire_counts[lockid]++; + lwstats->ex_acquire_count++; else - sh_acquire_counts[lockid]++; + lwstats->sh_acquire_count++; #endif /* LWLOCK_STATS */ /* @@ -398,7 +506,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) /* Acquire mutex. Time spent holding mutex should be short! */ #ifdef LWLOCK_STATS - spin_delay_counts[lockid] += SpinLockAcquire(&lock->mutex); + lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); #else SpinLockAcquire(&lock->mutex); #endif @@ -466,13 +574,13 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) * so that the lock manager or signal manager will see the received * signal when it next waits. */ - LOG_LWDEBUG("LWLockAcquire", lockid, "waiting"); + LOG_LWDEBUG("LWLockAcquire", T_NAME(l), T_ID(l), "waiting"); #ifdef LWLOCK_STATS - block_counts[lockid]++; + lwstats->block_count++; #endif - TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode); + TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode); for (;;) { @@ -483,9 +591,9 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) extraWaits++; } - TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode); + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), mode); - LOG_LWDEBUG("LWLockAcquire", lockid, "awakened"); + LOG_LWDEBUG("LWLockAcquire", T_NAME(l), T_ID(l), "awakened"); /* Now loop back and try to acquire lock again. */ retry = true; @@ -494,10 +602,10 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) /* We are done updating shared state of the lock itself. */ SpinLockRelease(&lock->mutex); - TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode); + TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(l), T_ID(l), mode); /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lockid; + held_lwlocks[num_held_lwlocks++] = l; /* * Fix the process wait semaphore's count for any absorbed wakeups. @@ -514,12 +622,12 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) * If successful, cancel/die interrupts are held off until lock release. 
*/ bool -LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode) +LWLockConditionalAcquire(LWLock *l, LWLockMode mode) { - volatile LWLock *lock = &(LWLockArray[lockid].lock); + volatile LWLock *lock = l; bool mustwait; - PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock); + PRINT_LWDEBUG("LWLockConditionalAcquire", lock); /* Ensure we will have room to remember the lock */ if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS) @@ -564,14 +672,14 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode) { /* Failed to get lock, so release interrupt holdoff */ RESUME_INTERRUPTS(); - LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed"); - TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode); + LOG_LWDEBUG("LWLockConditionalAcquire", T_NAME(l), T_ID(l), "failed"); + TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(l), T_ID(l), mode); } else { /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lockid; - TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode); + held_lwlocks[num_held_lwlocks++] = l; + TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(l), T_ID(l), mode); } return !mustwait; @@ -592,19 +700,20 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode) * wake up, observe that their records have already been flushed, and return. */ bool -LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) +LWLockAcquireOrWait(LWLock *l, LWLockMode mode) { - volatile LWLock *lock = &(LWLockArray[lockid].lock); + volatile LWLock *lock = l; PGPROC *proc = MyProc; bool mustwait; int extraWaits = 0; +#ifdef LWLOCK_STATS + lwlock_stats *lwstats; +#endif - PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock); + PRINT_LWDEBUG("LWLockAcquireOrWait", lock); #ifdef LWLOCK_STATS - /* Set up local count state first time through in a given process */ - if (counts_for_pid != MyProcPid) - init_lwlock_stats(); + lwstats = get_lwlock_stats_entry(l); #endif /* Ensure we will have room to remember the lock */ @@ -671,13 +780,13 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) * Wait until awakened. Like in LWLockAcquire, be prepared for bogus * wakups, because we share the semaphore with ProcWaitForSignal. 
*/ - LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "waiting"); + LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "waiting"); #ifdef LWLOCK_STATS - block_counts[lockid]++; + lwstats->block_count++; #endif - TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode); + TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode); for (;;) { @@ -688,9 +797,9 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) extraWaits++; } - TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode); + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), mode); - LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "awakened"); + LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "awakened"); } else { @@ -708,14 +817,14 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) { /* Failed to get lock, so release interrupt holdoff */ RESUME_INTERRUPTS(); - LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "failed"); - TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode); + LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "failed"); + TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(T_NAME(l), T_ID(l), mode); } else { /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lockid; - TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode); + held_lwlocks[num_held_lwlocks++] = l; + TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(T_NAME(l), T_ID(l), mode); } return !mustwait; @@ -725,14 +834,14 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) * LWLockRelease - release a previously acquired lock */ void -LWLockRelease(LWLockId lockid) +LWLockRelease(LWLock *l) { - volatile LWLock *lock = &(LWLockArray[lockid].lock); + volatile LWLock *lock = l; PGPROC *head; PGPROC *proc; int i; - PRINT_LWDEBUG("LWLockRelease", lockid, lock); + PRINT_LWDEBUG("LWLockRelease", lock); /* * Remove lock from list of locks held. Usually, but not always, it will @@ -740,11 +849,11 @@ LWLockRelease(LWLockId lockid) */ for (i = num_held_lwlocks; --i >= 0;) { - if (lockid == held_lwlocks[i]) + if (l == held_lwlocks[i]) break; } if (i < 0) - elog(ERROR, "lock %d is not held", (int) lockid); + elog(ERROR, "lock %s %d is not held", T_NAME(l), T_ID(l)); num_held_lwlocks--; for (; i < num_held_lwlocks; i++) held_lwlocks[i] = held_lwlocks[i + 1]; @@ -824,14 +933,14 @@ LWLockRelease(LWLockId lockid) /* We are done updating shared state of the lock itself. */ SpinLockRelease(&lock->mutex); - TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid); + TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(l), T_ID(l)); /* * Awaken any waiters I removed from the queue. */ while (head != NULL) { - LOG_LWDEBUG("LWLockRelease", lockid, "release waiter"); + LOG_LWDEBUG("LWLockRelease", T_NAME(l), T_ID(l), "release waiter"); proc = head; head = proc->lwWaitLink; proc->lwWaitLink = NULL; @@ -874,13 +983,13 @@ LWLockReleaseAll(void) * lock is held shared or exclusive. 
*/ bool -LWLockHeldByMe(LWLockId lockid) +LWLockHeldByMe(LWLock *l) { int i; for (i = 0; i < num_held_lwlocks; i++) { - if (held_lwlocks[i] == lockid) + if (held_lwlocks[i] == l) return true; } return false; diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index e7f44cce841..67000720275 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -241,7 +241,10 @@ #define PredicateLockHashPartition(hashcode) \ ((hashcode) % NUM_PREDICATELOCK_PARTITIONS) #define PredicateLockHashPartitionLock(hashcode) \ - ((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hashcode))) + (&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + \ + PredicateLockHashPartition(hashcode)].lock) +#define PredicateLockHashPartitionLockByIndex(i) \ + (&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + (i)].lock) #define NPREDICATELOCKTARGETENTS() \ mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts)) @@ -383,7 +386,7 @@ static SHM_QUEUE *FinishedSerializableTransactions; */ static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0}; static uint32 ScratchTargetTagHash; -static int ScratchPartitionLock; +static LWLock *ScratchPartitionLock; /* * The local hash table used to determine when to combine multiple fine- @@ -1398,7 +1401,7 @@ GetPredicateLockStatusData(void) * in ascending order, then SerializableXactHashLock. */ for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++) - LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED); + LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED); LWLockAcquire(SerializableXactHashLock, LW_SHARED); /* Get number of locks and allocate appropriately-sized arrays. */ @@ -1427,7 +1430,7 @@ GetPredicateLockStatusData(void) /* Release locks in reverse order */ LWLockRelease(SerializableXactHashLock); for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--) - LWLockRelease(FirstPredicateLockMgrLock + i); + LWLockRelease(PredicateLockHashPartitionLockByIndex(i)); return data; } @@ -1856,7 +1859,7 @@ PageIsPredicateLocked(Relation relation, BlockNumber blkno) { PREDICATELOCKTARGETTAG targettag; uint32 targettaghash; - LWLockId partitionLock; + LWLock *partitionLock; PREDICATELOCKTARGET *target; SET_PREDICATELOCKTARGETTAG_PAGE(targettag, @@ -2089,7 +2092,7 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) if (TargetTagIsCoveredBy(oldtargettag, *newtargettag)) { uint32 oldtargettaghash; - LWLockId partitionLock; + LWLock *partitionLock; PREDICATELOCK *rmpredlock PG_USED_FOR_ASSERTS_ONLY; oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag); @@ -2301,7 +2304,7 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, PREDICATELOCKTARGET *target; PREDICATELOCKTAG locktag; PREDICATELOCK *lock; - LWLockId partitionLock; + LWLock *partitionLock; bool found; partitionLock = PredicateLockHashPartitionLock(targettaghash); @@ -2599,10 +2602,10 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, bool removeOld) { uint32 oldtargettaghash; - LWLockId oldpartitionLock; + LWLock *oldpartitionLock; PREDICATELOCKTARGET *oldtarget; uint32 newtargettaghash; - LWLockId newpartitionLock; + LWLock *newpartitionLock; bool found; bool outOfShmem = false; @@ -2858,7 +2861,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) /* Acquire locks on all lock partitions */ LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE); for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++) - 
LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE); + LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_EXCLUSIVE); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); /* @@ -2996,7 +2999,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) /* Release locks in reverse order */ LWLockRelease(SerializableXactHashLock); for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--) - LWLockRelease(FirstPredicateLockMgrLock + i); + LWLockRelease(PredicateLockHashPartitionLockByIndex(i)); LWLockRelease(SerializablePredicateLockListLock); } @@ -3611,7 +3614,7 @@ ClearOldPredicateLocks(void) PREDICATELOCKTARGET *target; PREDICATELOCKTARGETTAG targettag; uint32 targettaghash; - LWLockId partitionLock; + LWLock *partitionLock; tag = predlock->tag; target = tag.myTarget; @@ -3690,7 +3693,7 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, PREDICATELOCKTARGET *target; PREDICATELOCKTARGETTAG targettag; uint32 targettaghash; - LWLockId partitionLock; + LWLock *partitionLock; nextpredlock = (PREDICATELOCK *) SHMQueueNext(&(sxact->predicateLocks), @@ -4068,7 +4071,7 @@ static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) { uint32 targettaghash; - LWLockId partitionLock; + LWLock *partitionLock; PREDICATELOCKTARGET *target; PREDICATELOCK *predlock; PREDICATELOCK *mypredlock = NULL; @@ -4360,7 +4363,7 @@ CheckTableForSerializableConflictIn(Relation relation) LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE); for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++) - LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED); + LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED); LWLockAcquire(SerializableXactHashLock, LW_SHARED); /* Scan through target list */ @@ -4407,7 +4410,7 @@ CheckTableForSerializableConflictIn(Relation relation) /* Release locks in reverse order */ LWLockRelease(SerializableXactHashLock); for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--) - LWLockRelease(FirstPredicateLockMgrLock + i); + LWLockRelease(PredicateLockHashPartitionLockByIndex(i)); LWLockRelease(SerializablePredicateLockListLock); } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index ee6c24cea7d..1a683b83361 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -189,7 +189,8 @@ InitProcGlobal(void) */ procs = (PGPROC *) ShmemAlloc(TotalProcs * sizeof(PGPROC)); ProcGlobal->allProcs = procs; - ProcGlobal->allProcCount = TotalProcs; + /* XXX allProcCount isn't really all of them; it excludes prepared xacts */ + ProcGlobal->allProcCount = MaxBackends + NUM_AUXILIARY_PROCS; if (!procs) ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -663,7 +664,7 @@ IsWaitingForLock(void) void LockErrorCleanup(void) { - LWLockId partitionLock; + LWLock *partitionLock; DisableTimeoutParams timeouts[2]; AbortStrongLockAcquire(); @@ -942,7 +943,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) LOCK *lock = locallock->lock; PROCLOCK *proclock = locallock->proclock; uint32 hashcode = locallock->hashcode; - LWLockId partitionLock = LockHashPartitionLock(hashcode); + LWLock *partitionLock = LockHashPartitionLock(hashcode); PROC_QUEUE *waitQueue = &(lock->waitProcs); LOCKMASK myHeldLocks = MyProc->heldLocks; bool early_deadlock = false; @@ -1440,7 +1441,7 @@ CheckDeadLock(void) * interrupts. 
*/ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - LWLockAcquire(FirstLockMgrLock + i, LW_EXCLUSIVE); + LWLockAcquire(LockHashPartitionLockByIndex(i), LW_EXCLUSIVE); /* * Check to see if we've been awoken by anyone in the interim. @@ -1522,7 +1523,7 @@ CheckDeadLock(void) */ check_done: for (i = NUM_LOCK_PARTITIONS; --i >= 0;) - LWLockRelease(FirstLockMgrLock + i); + LWLockRelease(LockHashPartitionLockByIndex(i)); } |
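
For extension authors, the interesting part of this patch is the new tranche API in lwlock.c: LWLockNewTrancheId() hands out a tranche id from the shared counter, LWLockRegisterTranche() records the tranche metadata (name, array_base, array_stride) in a per-process lookup table, and LWLockInitialize() sets up an individual lock, all of which becomes usable outside the main array now that callers pass LWLock pointers. The sketch below shows one way an extension might wire this up; it is illustrative only: my_ext, MyExtSharedState, and my_ext_shmem_init are invented names; it assumes LWLockPadded and LWLockTranche are exposed in storage/lwlock.h (as the new predicate.c macros suggest); and the usual RequestAddinShmemSpace()/shmem_startup_hook bookkeeping is omitted.

#include "postgres.h"

#include "storage/lwlock.h"
#include "storage/shmem.h"

#define MY_EXT_NLOCKS 4

/* Shared state: the tranche id plus the locks themselves. */
typedef struct MyExtSharedState
{
	int			tranche_id;
	LWLockPadded locks[MY_EXT_NLOCKS];
} MyExtSharedState;

static MyExtSharedState *my_state;
static LWLockTranche my_tranche;	/* must live for the backend's lifetime */

static void
my_ext_shmem_init(void)
{
	bool		found;

	my_state = (MyExtSharedState *)
		ShmemInitStruct("my_ext state", sizeof(MyExtSharedState), &found);

	if (!found)
	{
		int			i;

		/* First backend through: grab a tranche id and init the locks. */
		my_state->tranche_id = LWLockNewTrancheId();
		for (i = 0; i < MY_EXT_NLOCKS; i++)
			LWLockInitialize(&my_state->locks[i].lock, my_state->tranche_id);
	}

	/* Every backend registers the tranche in its process-local table. */
	my_tranche.name = "my_ext";
	my_tranche.array_base = my_state->locks;
	my_tranche.array_stride = sizeof(LWLockPadded);
	LWLockRegisterTranche(my_state->tranche_id, &my_tranche);
}

After that, the locks behave like any other LWLock:

	LWLockAcquire(&my_state->locks[0].lock, LW_EXCLUSIVE);
	/* ... touch shared state ... */
	LWLockRelease(&my_state->locks[0].lock);

Keeping the tranche object in a static variable matches the comment added to LWLockRegisterTranche(), which asks for a backend-lifetime allocation (TopMemoryContext, static variable, or similar).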