diff options
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 218 |
1 files changed, 209 insertions, 9 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 6a292104960..03459a71ece 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.188 2009/06/11 14:49:02 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.189 2009/12/19 01:32:35 sriggs Exp $ * * NOTES * A lock table is a shared memory hash table. When @@ -38,6 +38,7 @@ #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" +#include "storage/standby.h" #include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/resowner.h" @@ -469,6 +470,25 @@ LockAcquire(const LOCKTAG *locktag, bool sessionLock, bool dontWait) { + return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true); +} + +/* + * LockAcquireExtended - allows us to specify additional options + * + * reportMemoryError specifies whether a lock request that fills the + * lock table should generate an ERROR or not. This allows a priority + * caller to note that the lock table is full and then begin taking + * extreme action to reduce the number of other lock holders before + * retrying the action. + */ +LockAcquireResult +LockAcquireExtended(const LOCKTAG *locktag, + LOCKMODE lockmode, + bool sessionLock, + bool dontWait, + bool reportMemoryError) +{ LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; LockMethod lockMethodTable; LOCALLOCKTAG localtag; @@ -490,6 +510,16 @@ LockAcquire(const LOCKTAG *locktag, if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes) elog(ERROR, "unrecognized lock mode: %d", lockmode); + if (RecoveryInProgress() && !InRecovery && + (locktag->locktag_type == LOCKTAG_OBJECT || + locktag->locktag_type == LOCKTAG_RELATION ) && + lockmode > RowExclusiveLock) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot acquire lockmode %s on database objects while recovery is in progress", + lockMethodTable->lockModeNames[lockmode]), + errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery."))); + #ifdef LOCK_DEBUG if (LOCK_DEBUG_ENABLED(locktag)) elog(LOG, "LockAcquire: lock [%u,%u] %s", @@ -578,10 +608,13 @@ LockAcquire(const LOCKTAG *locktag, if (!lock) { LWLockRelease(partitionLock); - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of shared memory"), - errhint("You might need to increase max_locks_per_transaction."))); + if (reportMemoryError) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errhint("You might need to increase max_locks_per_transaction."))); + else + return LOCKACQUIRE_NOT_AVAIL; } locallock->lock = lock; @@ -644,10 +677,13 @@ LockAcquire(const LOCKTAG *locktag, elog(PANIC, "lock table corrupted"); } LWLockRelease(partitionLock); - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of shared memory"), - errhint("You might need to increase max_locks_per_transaction."))); + if (reportMemoryError) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errhint("You might need to increase max_locks_per_transaction."))); + else + return LOCKACQUIRE_NOT_AVAIL; } locallock->proclock = proclock; @@ -779,6 +815,25 @@ LockAcquire(const LOCKTAG *locktag, } /* + * In Hot Standby we abort the lock wait if Startup process is waiting + * since this would result in a deadlock. The deadlock occurs because + * if we are waiting it must be behind an AccessExclusiveLock, which + * can only clear when a transaction completion record is replayed. + * If Startup process is waiting we never will clear that lock, so to + * wait for it just causes a deadlock. + */ + if (RecoveryInProgress() && !InRecovery && + locktag->locktag_type == LOCKTAG_RELATION) + { + LWLockRelease(partitionLock); + ereport(ERROR, + (errcode(ERRCODE_T_R_DEADLOCK_DETECTED), + errmsg("possible deadlock detected"), + errdetail("process conflicts with recovery - please resubmit query later"), + errdetail_log("process conflicts with recovery"))); + } + + /* * Set bitmask of locks this process already holds on this object. */ MyProc->heldLocks = proclock->holdMask; @@ -827,6 +882,27 @@ LockAcquire(const LOCKTAG *locktag, LWLockRelease(partitionLock); + /* + * Emit a WAL record if acquisition of this lock need to be replayed in + * a standby server. Only AccessExclusiveLocks can conflict with lock + * types that read-only transactions can acquire in a standby server. + * + * Make sure this definition matches the one GetRunningTransactionLocks(). + */ + if (lockmode >= AccessExclusiveLock && + locktag->locktag_type == LOCKTAG_RELATION && + !RecoveryInProgress() && + XLogStandbyInfoActive()) + { + /* + * Decode the locktag back to the original values, to avoid + * sending lots of empty bytes with every message. See + * lock.h to check how a locktag is defined for LOCKTAG_RELATION + */ + LogAccessExclusiveLock(locktag->locktag_field1, + locktag->locktag_field2); + } + return LOCKACQUIRE_OK; } @@ -2193,6 +2269,79 @@ GetLockStatusData(void) return data; } +/* + * Returns a list of currently held AccessExclusiveLocks, for use + * by GetRunningTransactionData(). + */ +xl_standby_lock * +GetRunningTransactionLocks(int *nlocks) +{ + PROCLOCK *proclock; + HASH_SEQ_STATUS seqstat; + int i; + int index; + int els; + xl_standby_lock *accessExclusiveLocks; + + /* + * Acquire lock on the entire shared lock data structure. + * + * Must grab LWLocks in partition-number order to avoid LWLock deadlock. + */ + for (i = 0; i < NUM_LOCK_PARTITIONS; i++) + LWLockAcquire(FirstLockMgrLock + i, LW_SHARED); + + /* Now scan the tables to copy the data */ + hash_seq_init(&seqstat, LockMethodProcLockHash); + + /* Now we can safely count the number of proclocks */ + els = hash_get_num_entries(LockMethodProcLockHash); + + /* + * Allocating enough space for all locks in the lock table is overkill, + * but it's more convenient and faster than having to enlarge the array. + */ + accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock)); + + /* + * If lock is a currently granted AccessExclusiveLock then + * it will have just one proclock holder, so locks are never + * accessed twice in this particular case. Don't copy this code + * for use elsewhere because in the general case this will + * give you duplicate locks when looking at non-exclusive lock types. + */ + index = 0; + while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat))) + { + /* make sure this definition matches the one used in LockAcquire */ + if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) && + proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION) + { + PGPROC *proc = proclock->tag.myProc; + LOCK *lock = proclock->tag.myLock; + + accessExclusiveLocks[index].xid = proc->xid; + accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1; + accessExclusiveLocks[index].relOid = lock->tag.locktag_field2; + + index++; + } + } + + /* + * And release locks. We do this in reverse order for two reasons: (1) + * Anyone else who needs more than one of the locks will be trying to lock + * them in increasing order; we don't want to release the other process + * until it can get all the locks it needs. (2) This avoids O(N^2) + * behavior inside LWLockRelease. + */ + for (i = NUM_LOCK_PARTITIONS; --i >= 0;) + LWLockRelease(FirstLockMgrLock + i); + + *nlocks = index; + return accessExclusiveLocks; +} + /* Provide the textual name of any lock mode */ const char * GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode) @@ -2288,6 +2437,24 @@ DumpAllLocks(void) * Because this function is run at db startup, re-acquiring the locks should * never conflict with running transactions because there are none. We * assume that the lock state represented by the stored 2PC files is legal. + * + * When switching from Hot Standby mode to normal operation, the locks will + * be already held by the startup process. The locks are acquired for the new + * procs without checking for conflicts, so we don'get a conflict between the + * startup process and the dummy procs, even though we will momentarily have + * a situation where two procs are holding the same AccessExclusiveLock, + * which isn't normally possible because the conflict. If we're in standby + * mode, but a recovery snapshot hasn't been established yet, it's possible + * that some but not all of the locks are already held by the startup process. + * + * This approach is simple, but also a bit dangerous, because if there isn't + * enough shared memory to acquire the locks, an error will be thrown, which + * is promoted to FATAL and recovery will abort, bringing down postmaster. + * A safer approach would be to transfer the locks like we do in + * AtPrepare_Locks, but then again, in hot standby mode it's possible for + * read-only backends to use up all the shared lock memory anyway, so that + * replaying the WAL record that needs to acquire a lock will throw an error + * and PANIC anyway. */ void lock_twophase_recover(TransactionId xid, uint16 info, @@ -2443,6 +2610,8 @@ lock_twophase_recover(TransactionId xid, uint16 info, /* * We ignore any possible conflicts and just grant ourselves the lock. + * Not only because we don't bother, but also to avoid deadlocks when + * switching from standby to normal mode. See function comment. */ GrantLock(lock, proclock, lockmode); @@ -2450,6 +2619,37 @@ lock_twophase_recover(TransactionId xid, uint16 info, } /* + * Re-acquire a lock belonging to a transaction that was prepared, when + * when starting up into hot standby mode. + */ +void +lock_twophase_standby_recover(TransactionId xid, uint16 info, + void *recdata, uint32 len) +{ + TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; + LOCKTAG *locktag; + LOCKMODE lockmode; + LOCKMETHODID lockmethodid; + + Assert(len == sizeof(TwoPhaseLockRecord)); + locktag = &rec->locktag; + lockmode = rec->lockmode; + lockmethodid = locktag->locktag_lockmethodid; + + if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + + if (lockmode == AccessExclusiveLock && + locktag->locktag_type == LOCKTAG_RELATION) + { + StandbyAcquireAccessExclusiveLock(xid, + locktag->locktag_field1 /* dboid */, + locktag->locktag_field2 /* reloid */); + } +} + + +/* * 2PC processing routine for COMMIT PREPARED case. * * Find and release the lock indicated by the 2PC record. |