aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/lmgr/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r--src/backend/storage/lmgr/lock.c218
1 files changed, 209 insertions, 9 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 6a292104960..03459a71ece 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.188 2009/06/11 14:49:02 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.189 2009/12/19 01:32:35 sriggs Exp $
*
* NOTES
* A lock table is a shared memory hash table. When
@@ -38,6 +38,7 @@
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
+#include "storage/standby.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
@@ -469,6 +470,25 @@ LockAcquire(const LOCKTAG *locktag,
bool sessionLock,
bool dontWait)
{
+ return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
+}
+
+/*
+ * LockAcquireExtended - allows us to specify additional options
+ *
+ * reportMemoryError specifies whether a lock request that fills the
+ * lock table should generate an ERROR or not. This allows a priority
+ * caller to note that the lock table is full and then begin taking
+ * extreme action to reduce the number of other lock holders before
+ * retrying the action.
+ */
+LockAcquireResult
+LockAcquireExtended(const LOCKTAG *locktag,
+ LOCKMODE lockmode,
+ bool sessionLock,
+ bool dontWait,
+ bool reportMemoryError)
+{
LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
LockMethod lockMethodTable;
LOCALLOCKTAG localtag;
@@ -490,6 +510,16 @@ LockAcquire(const LOCKTAG *locktag,
if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
elog(ERROR, "unrecognized lock mode: %d", lockmode);
+ if (RecoveryInProgress() && !InRecovery &&
+ (locktag->locktag_type == LOCKTAG_OBJECT ||
+ locktag->locktag_type == LOCKTAG_RELATION ) &&
+ lockmode > RowExclusiveLock)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot acquire lockmode %s on database objects while recovery is in progress",
+ lockMethodTable->lockModeNames[lockmode]),
+ errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));
+
#ifdef LOCK_DEBUG
if (LOCK_DEBUG_ENABLED(locktag))
elog(LOG, "LockAcquire: lock [%u,%u] %s",
@@ -578,10 +608,13 @@ LockAcquire(const LOCKTAG *locktag,
if (!lock)
{
LWLockRelease(partitionLock);
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of shared memory"),
- errhint("You might need to increase max_locks_per_transaction.")));
+ if (reportMemoryError)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_locks_per_transaction.")));
+ else
+ return LOCKACQUIRE_NOT_AVAIL;
}
locallock->lock = lock;
@@ -644,10 +677,13 @@ LockAcquire(const LOCKTAG *locktag,
elog(PANIC, "lock table corrupted");
}
LWLockRelease(partitionLock);
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of shared memory"),
- errhint("You might need to increase max_locks_per_transaction.")));
+ if (reportMemoryError)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_locks_per_transaction.")));
+ else
+ return LOCKACQUIRE_NOT_AVAIL;
}
locallock->proclock = proclock;
@@ -779,6 +815,25 @@ LockAcquire(const LOCKTAG *locktag,
}
/*
+ * In Hot Standby we abort the lock wait if Startup process is waiting
+ * since this would result in a deadlock. The deadlock occurs because
+ * if we are waiting it must be behind an AccessExclusiveLock, which
+ * can only clear when a transaction completion record is replayed.
+ * If Startup process is waiting we never will clear that lock, so to
+ * wait for it just causes a deadlock.
+ */
+ if (RecoveryInProgress() && !InRecovery &&
+ locktag->locktag_type == LOCKTAG_RELATION)
+ {
+ LWLockRelease(partitionLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
+ errmsg("possible deadlock detected"),
+ errdetail("process conflicts with recovery - please resubmit query later"),
+ errdetail_log("process conflicts with recovery")));
+ }
+
+ /*
* Set bitmask of locks this process already holds on this object.
*/
MyProc->heldLocks = proclock->holdMask;
@@ -827,6 +882,27 @@ LockAcquire(const LOCKTAG *locktag,
LWLockRelease(partitionLock);
+ /*
+ * Emit a WAL record if acquisition of this lock need to be replayed in
+ * a standby server. Only AccessExclusiveLocks can conflict with lock
+ * types that read-only transactions can acquire in a standby server.
+ *
+ * Make sure this definition matches the one GetRunningTransactionLocks().
+ */
+ if (lockmode >= AccessExclusiveLock &&
+ locktag->locktag_type == LOCKTAG_RELATION &&
+ !RecoveryInProgress() &&
+ XLogStandbyInfoActive())
+ {
+ /*
+ * Decode the locktag back to the original values, to avoid
+ * sending lots of empty bytes with every message. See
+ * lock.h to check how a locktag is defined for LOCKTAG_RELATION
+ */
+ LogAccessExclusiveLock(locktag->locktag_field1,
+ locktag->locktag_field2);
+ }
+
return LOCKACQUIRE_OK;
}
@@ -2193,6 +2269,79 @@ GetLockStatusData(void)
return data;
}
+/*
+ * Returns a list of currently held AccessExclusiveLocks, for use
+ * by GetRunningTransactionData().
+ */
+xl_standby_lock *
+GetRunningTransactionLocks(int *nlocks)
+{
+ PROCLOCK *proclock;
+ HASH_SEQ_STATUS seqstat;
+ int i;
+ int index;
+ int els;
+ xl_standby_lock *accessExclusiveLocks;
+
+ /*
+ * Acquire lock on the entire shared lock data structure.
+ *
+ * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
+ */
+ for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
+ LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
+
+ /* Now scan the tables to copy the data */
+ hash_seq_init(&seqstat, LockMethodProcLockHash);
+
+ /* Now we can safely count the number of proclocks */
+ els = hash_get_num_entries(LockMethodProcLockHash);
+
+ /*
+ * Allocating enough space for all locks in the lock table is overkill,
+ * but it's more convenient and faster than having to enlarge the array.
+ */
+ accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock));
+
+ /*
+ * If lock is a currently granted AccessExclusiveLock then
+ * it will have just one proclock holder, so locks are never
+ * accessed twice in this particular case. Don't copy this code
+ * for use elsewhere because in the general case this will
+ * give you duplicate locks when looking at non-exclusive lock types.
+ */
+ index = 0;
+ while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
+ {
+ /* make sure this definition matches the one used in LockAcquire */
+ if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) &&
+ proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
+ {
+ PGPROC *proc = proclock->tag.myProc;
+ LOCK *lock = proclock->tag.myLock;
+
+ accessExclusiveLocks[index].xid = proc->xid;
+ accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
+ accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;
+
+ index++;
+ }
+ }
+
+ /*
+ * And release locks. We do this in reverse order for two reasons: (1)
+ * Anyone else who needs more than one of the locks will be trying to lock
+ * them in increasing order; we don't want to release the other process
+ * until it can get all the locks it needs. (2) This avoids O(N^2)
+ * behavior inside LWLockRelease.
+ */
+ for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
+ LWLockRelease(FirstLockMgrLock + i);
+
+ *nlocks = index;
+ return accessExclusiveLocks;
+}
+
/* Provide the textual name of any lock mode */
const char *
GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
@@ -2288,6 +2437,24 @@ DumpAllLocks(void)
* Because this function is run at db startup, re-acquiring the locks should
* never conflict with running transactions because there are none. We
* assume that the lock state represented by the stored 2PC files is legal.
+ *
+ * When switching from Hot Standby mode to normal operation, the locks will
+ * be already held by the startup process. The locks are acquired for the new
+ * procs without checking for conflicts, so we don'get a conflict between the
+ * startup process and the dummy procs, even though we will momentarily have
+ * a situation where two procs are holding the same AccessExclusiveLock,
+ * which isn't normally possible because the conflict. If we're in standby
+ * mode, but a recovery snapshot hasn't been established yet, it's possible
+ * that some but not all of the locks are already held by the startup process.
+ *
+ * This approach is simple, but also a bit dangerous, because if there isn't
+ * enough shared memory to acquire the locks, an error will be thrown, which
+ * is promoted to FATAL and recovery will abort, bringing down postmaster.
+ * A safer approach would be to transfer the locks like we do in
+ * AtPrepare_Locks, but then again, in hot standby mode it's possible for
+ * read-only backends to use up all the shared lock memory anyway, so that
+ * replaying the WAL record that needs to acquire a lock will throw an error
+ * and PANIC anyway.
*/
void
lock_twophase_recover(TransactionId xid, uint16 info,
@@ -2443,6 +2610,8 @@ lock_twophase_recover(TransactionId xid, uint16 info,
/*
* We ignore any possible conflicts and just grant ourselves the lock.
+ * Not only because we don't bother, but also to avoid deadlocks when
+ * switching from standby to normal mode. See function comment.
*/
GrantLock(lock, proclock, lockmode);
@@ -2450,6 +2619,37 @@ lock_twophase_recover(TransactionId xid, uint16 info,
}
/*
+ * Re-acquire a lock belonging to a transaction that was prepared, when
+ * when starting up into hot standby mode.
+ */
+void
+lock_twophase_standby_recover(TransactionId xid, uint16 info,
+ void *recdata, uint32 len)
+{
+ TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
+ LOCKTAG *locktag;
+ LOCKMODE lockmode;
+ LOCKMETHODID lockmethodid;
+
+ Assert(len == sizeof(TwoPhaseLockRecord));
+ locktag = &rec->locktag;
+ lockmode = rec->lockmode;
+ lockmethodid = locktag->locktag_lockmethodid;
+
+ if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
+ elog(ERROR, "unrecognized lock method: %d", lockmethodid);
+
+ if (lockmode == AccessExclusiveLock &&
+ locktag->locktag_type == LOCKTAG_RELATION)
+ {
+ StandbyAcquireAccessExclusiveLock(xid,
+ locktag->locktag_field1 /* dboid */,
+ locktag->locktag_field2 /* reloid */);
+ }
+}
+
+
+/*
* 2PC processing routine for COMMIT PREPARED case.
*
* Find and release the lock indicated by the 2PC record.