diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/storage/ipc/procarray.c | 8 | ||||
-rw-r--r-- | src/backend/storage/ipc/standby.c | 110 | ||||
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 12 | ||||
-rw-r--r-- | src/include/storage/standby.h | 2 |
4 files changed, 88 insertions, 44 deletions
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 6ea0a28fb72..dc2768b3364 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -499,7 +499,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) * Remove stale transactions, if any. */ ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid); - StandbyReleaseOldLocks(running->oldestRunningXid); + StandbyReleaseOldLocks(running->xcnt, running->xids); /* * If our snapshot is already valid, nothing else to do... @@ -554,12 +554,6 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) */ /* - * Release any locks belonging to old transactions that are not running - * according to the running-xacts record. - */ - StandbyReleaseOldLocks(running->nextXid); - - /* * Nobody else is running yet, but take locks anyhow */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index c88557c7699..dc6833b3b14 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -525,7 +525,9 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid) LOCKTAG locktag; /* Already processed? */ - if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) + if (!TransactionIdIsValid(xid) || + TransactionIdDidCommit(xid) || + TransactionIdDidAbort(xid)) return; elog(trace_recovery(DEBUG4), @@ -607,34 +609,86 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids) } /* - * StandbyReleaseLocksMany - * Release standby locks held by XIDs < removeXid - * - * If keepPreparedXacts is true, keep prepared transactions even if - * they're older than removeXid + * Called at end of recovery and when we see a shutdown checkpoint. */ -static void -StandbyReleaseLocksMany(TransactionId removeXid, bool keepPreparedXacts) +void +StandbyReleaseAllLocks(void) +{ + ListCell *cell, + *prev, + *next; + LOCKTAG locktag; + + elog(trace_recovery(DEBUG2), "release all standby locks"); + + prev = NULL; + for (cell = list_head(RecoveryLockList); cell; cell = next) + { + xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell); + + next = lnext(cell); + + elog(trace_recovery(DEBUG4), + "releasing recovery lock: xid %u db %u rel %u", + lock->xid, lock->dbOid, lock->relOid); + SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid); + if (!LockRelease(&locktag, AccessExclusiveLock, true)) + elog(LOG, + "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u", + lock->xid, lock->dbOid, lock->relOid); + RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev); + pfree(lock); + } +} + +/* + * StandbyReleaseOldLocks + * Release standby locks held by XIDs that aren't running, as long + * as they're not prepared transactions. + */ +void +StandbyReleaseOldLocks(int nxids, TransactionId *xids) { ListCell *cell, *prev, *next; LOCKTAG locktag; - /* - * Release all matching locks. - */ prev = NULL; for (cell = list_head(RecoveryLockList); cell; cell = next) { xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell); + bool remove = false; next = lnext(cell); - if (!TransactionIdIsValid(removeXid) || TransactionIdPrecedes(lock->xid, removeXid)) + Assert(TransactionIdIsValid(lock->xid)); + + if (StandbyTransactionIdIsPrepared(lock->xid)) + remove = false; + else + { + int i; + bool found = false; + + for (i = 0; i < nxids; i++) + { + if (lock->xid == xids[i]) + { + found = true; + break; + } + } + + /* + * If its not a running transaction, remove it. + */ + if (!found) + remove = true; + } + + if (remove) { - if (keepPreparedXacts && StandbyTransactionIdIsPrepared(lock->xid)) - continue; elog(trace_recovery(DEBUG4), "releasing recovery lock: xid %u db %u rel %u", lock->xid, lock->dbOid, lock->relOid); @@ -652,27 +706,6 @@ StandbyReleaseLocksMany(TransactionId removeXid, bool keepPreparedXacts) } /* - * Called at end of recovery and when we see a shutdown checkpoint. - */ -void -StandbyReleaseAllLocks(void) -{ - elog(trace_recovery(DEBUG2), "release all standby locks"); - StandbyReleaseLocksMany(InvalidTransactionId, false); -} - -/* - * StandbyReleaseOldLocks - * Release standby locks held by XIDs < removeXid, as long - * as they're not prepared transactions. - */ -void -StandbyReleaseOldLocks(TransactionId removeXid) -{ - StandbyReleaseLocksMany(removeXid, true); -} - -/* * -------------------------------------------------------------------- * Recovery handling for Rmgr RM_STANDBY_ID * @@ -813,6 +846,13 @@ standby_desc(StringInfo buf, uint8 xl_info, char *rec) * Later, when we apply the running xact data we must be careful to ignore * transactions already committed, since those commits raced ahead when * making WAL entries. + * + * The loose timing also means that locks may be recorded that have a + * zero xid, since xids are removed from procs before locks are removed. + * So we must prune the lock list down to ensure we hold locks only for + * currently running xids, performed by StandbyReleaseOldLocks(). + * Zero xids should no longer be possible, but we may be replaying WAL + * from a time when they were possible. */ void LogStandbySnapshot(TransactionId *nextXid) diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 762294affb0..a98dfcab7f0 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -3190,8 +3190,18 @@ GetRunningTransactionLocks(int *nlocks) PGPROC *proc = proclock->tag.myProc; PGXACT *pgxact = &ProcGlobal->allPgXact[proc->pgprocno]; LOCK *lock = proclock->tag.myLock; + TransactionId xid = pgxact->xid; - accessExclusiveLocks[index].xid = pgxact->xid; + /* + * Don't record locks for transactions if we know they have already + * issued their WAL record for commit but not yet released lock. + * It is still possible that we see locks held by already complete + * transactions, if they haven't yet zeroed their xids. + */ + if (!TransactionIdIsValid(xid)) + continue; + + accessExclusiveLocks[index].xid = xid; accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1; accessExclusiveLocks[index].relOid = lock->tag.locktag_field2; diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index a539ec2a9c9..1027bbcf9e9 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -48,7 +48,7 @@ extern void StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid extern void StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids); extern void StandbyReleaseAllLocks(void); -extern void StandbyReleaseOldLocks(TransactionId removeXid); +extern void StandbyReleaseOldLocks(int nxids, TransactionId *xids); /* * XLOG message types |