diff options
Diffstat (limited to 'src/backend/storage/ipc/procarray.c')
-rw-r--r-- | src/backend/storage/ipc/procarray.c | 1127 |
1 files changed, 1113 insertions, 14 deletions
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 9a3d2f62606..c4ddf8f2bd8 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -17,13 +17,27 @@ * as are the myProcLocks lists. They can be distinguished from regular * backend PGPROCs at need by checking for pid == 0. * + * During recovery, we also keep a list of XIDs representing transactions + * that are known to be running at current point in WAL recovery. This + * list is kept in the KnownAssignedXids array, and updated by watching + * the sequence of arriving xids. This is very important because if we leave + * those xids out of the snapshot then they will appear to be already complete. + * Later, when they have actually completed this could lead to confusion as to + * whether those xids are visible or not, blowing a huge hole in MVCC. + * We need 'em. + * + * It is theoretically possible for a FATAL error to explode before writing + * an abort record. This could tie up KnownAssignedXids indefinitely, so + * we prune the array when a valid list of running xids arrives. These quirks, + * if they do ever exist in reality will not effect the correctness of + * snapshots. 
* * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.51 2009/07/29 15:57:11 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.52 2009/12/19 01:32:35 sriggs Exp $ * *------------------------------------------------------------------------- */ @@ -31,14 +45,18 @@ #include <signal.h> +#include "access/clog.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/xact.h" #include "access/twophase.h" #include "miscadmin.h" #include "storage/procarray.h" +#include "storage/standby.h" +#include "utils/builtins.h" #include "utils/snapmgr.h" +static RunningTransactionsData CurrentRunningXactsData; /* Our shared memory area */ typedef struct ProcArrayStruct @@ -46,6 +64,14 @@ typedef struct ProcArrayStruct int numProcs; /* number of valid procs entries */ int maxProcs; /* allocated size of procs array */ + int numKnownAssignedXids; /* current number of known assigned xids */ + int maxKnownAssignedXids; /* allocated size of known assigned xids */ + /* + * Highest subxid that overflowed KnownAssignedXids array. Similar to + * overflowing cached subxids in PGPROC entries. + */ + TransactionId lastOverflowedXid; + /* * We declare procs[] as 1 entry because C wants a fixed-size array, but * actually it is maxProcs entries long. @@ -55,6 +81,24 @@ typedef struct ProcArrayStruct static ProcArrayStruct *procArray; +/* + * Bookkeeping for tracking emulated transactions in recovery + */ +static HTAB *KnownAssignedXidsHash; +static TransactionId latestObservedXid = InvalidTransactionId; + +/* + * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is + * the highest xid that might still be running that we don't have in + * KnownAssignedXids. 
+ */ +static TransactionId standbySnapshotPendingXmin; + +/* + * Oldest transaction still running according to the running-xacts snapshot + * we initialized standby mode from. + */ +static TransactionId snapshotOldestActiveXid; #ifdef XIDCACHE_DEBUG @@ -90,6 +134,17 @@ static void DisplayXidCache(void); #define xc_slow_answer_inc() ((void) 0) #endif /* XIDCACHE_DEBUG */ +/* Primitives for KnownAssignedXids array handling for standby */ +static Size KnownAssignedXidsShmemSize(int size); +static void KnownAssignedXidsInit(int size); +static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax); +static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, + TransactionId xmax); +static bool KnownAssignedXidsExist(TransactionId xid); +static void KnownAssignedXidsAdd(TransactionId *xids, int nxids); +static void KnownAssignedXidsRemove(TransactionId xid); +static void KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts); +static void KnownAssignedXidsDisplay(int trace_level); /* * Report shared-memory space needed by CreateSharedProcArray. @@ -100,8 +155,22 @@ ProcArrayShmemSize(void) Size size; size = offsetof(ProcArrayStruct, procs); - size = add_size(size, mul_size(sizeof(PGPROC *), - add_size(MaxBackends, max_prepared_xacts))); + + /* Normal processing - MyProc slots */ +#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts) + size = add_size(size, mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS)); + + /* + * During recovery processing we have a data structure called KnownAssignedXids, + * created in shared memory. Local data structures are also created in various + * backends during GetSnapshotData(), TransactionIdIsInProgress() and + * GetRunningTransactionData(). All of the main structures created in those + * functions must be identically sized, since we may at times copy the whole + * of the data structures around. We refer to this as TOTAL_MAX_CACHED_SUBXIDS. 
+ */ +#define TOTAL_MAX_CACHED_SUBXIDS ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS) + if (XLogRequestRecoveryConnections) + size = add_size(size, KnownAssignedXidsShmemSize(TOTAL_MAX_CACHED_SUBXIDS)); return size; } @@ -116,15 +185,21 @@ CreateSharedProcArray(void) /* Create or attach to the ProcArray shared structure */ procArray = (ProcArrayStruct *) - ShmemInitStruct("Proc Array", ProcArrayShmemSize(), &found); + ShmemInitStruct("Proc Array", + mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS), + &found); if (!found) { /* * We're the first - initialize. */ + /* Normal processing */ procArray->numProcs = 0; - procArray->maxProcs = MaxBackends + max_prepared_xacts; + procArray->maxProcs = PROCARRAY_MAXPROCS; + + if (XLogRequestRecoveryConnections) + KnownAssignedXidsInit(TOTAL_MAX_CACHED_SUBXIDS); } } @@ -302,6 +377,7 @@ ProcArrayClearTransaction(PGPROC *proc) proc->xid = InvalidTransactionId; proc->lxid = InvalidLocalTransactionId; proc->xmin = InvalidTransactionId; + proc->recoveryConflictMode = 0; /* redundant, but just in case */ proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; @@ -312,6 +388,220 @@ ProcArrayClearTransaction(PGPROC *proc) proc->subxids.overflowed = false; } +void +ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid) +{ + snapshotOldestActiveXid = oldestActiveXid; +} + +/* + * ProcArrayApplyRecoveryInfo -- apply recovery info about xids + * + * Takes us through 3 states: Uninitialized, Pending and Ready. + * Normal case is to go all the way to Ready straight away, though there + * are atypical cases where we need to take it in steps. + * + * Use the data about running transactions on master to create the initial + * state of KnownAssignedXids. We also these records to regularly prune + * KnownAssignedXids because we know it is possible that some transactions + * with FATAL errors do not write abort records, which could cause eventual + * overflow. + * + * Only used during recovery. 
Notice the signature is very similar to a + * _redo function and it's difficult to decide exactly where this code should + * reside. + */ +void +ProcArrayApplyRecoveryInfo(RunningTransactions running) +{ + int xid_index; /* main loop */ + TransactionId *xids; + int nxids; + + Assert(standbyState >= STANDBY_INITIALIZED); + + /* + * Remove stale transactions, if any. + */ + ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid); + StandbyReleaseOldLocks(running->oldestRunningXid); + + /* + * If our snapshot is already valid, nothing else to do... + */ + if (standbyState == STANDBY_SNAPSHOT_READY) + return; + + /* + * If our initial RunningXactData had an overflowed snapshot then we + * knew we were missing some subxids from our snapshot. We can use + * this data as an initial snapshot, but we cannot yet mark it valid. + * We know that the missing subxids are equal to or earlier than + * nextXid. After we initialise we continue to apply changes during + * recovery, so once the oldestRunningXid is later than the nextXid + * from the initial snapshot we know that we no longer have missing + * information and can mark the snapshot as valid. 
+ */ + if (standbyState == STANDBY_SNAPSHOT_PENDING) + { + if (TransactionIdPrecedes(standbySnapshotPendingXmin, + running->oldestRunningXid)) + { + standbyState = STANDBY_SNAPSHOT_READY; + elog(trace_recovery(DEBUG2), + "running xact data now proven complete"); + elog(trace_recovery(DEBUG2), + "recovery snapshots are now enabled"); + } + return; + } + + /* + * OK, we need to initialise from the RunningXactData record + */ + latestObservedXid = running->nextXid; + TransactionIdRetreat(latestObservedXid); + + /* + * If the snapshot overflowed, then we still initialise with what we + * know, but the recovery snapshot isn't fully valid yet because we + * know there are some subxids missing (ergo we don't know which ones) + */ + if (!running->subxid_overflow) + { + standbyState = STANDBY_SNAPSHOT_READY; + standbySnapshotPendingXmin = InvalidTransactionId; + } + else + { + standbyState = STANDBY_SNAPSHOT_PENDING; + standbySnapshotPendingXmin = latestObservedXid; + ereport(LOG, + (errmsg("consistent state delayed because recovery snapshot incomplete"))); + } + + nxids = running->xcnt; + xids = running->xids; + + KnownAssignedXidsDisplay(trace_recovery(DEBUG3)); + + /* + * Scan through the incoming array of RunningXacts and collect xids. + * We don't use SubtransSetParent because it doesn't matter yet. If + * we aren't overflowed then all xids will fit in snapshot and so we + * don't need subtrans. If we later overflow, an xid assignment record + * will add xids to subtrans. If RunningXacts is overflowed then we + * don't have enough information to correctly update subtrans anyway. 
+ */ + + /* + * Nobody else is running yet, but take locks anyhow + */ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + /* Reset latestCompletedXid */ + ShmemVariableCache->latestCompletedXid = running->nextXid; + TransactionIdRetreat(ShmemVariableCache->latestCompletedXid); + + /* + * Add our new xids into the array + */ + for (xid_index = 0; xid_index < running->xcnt; xid_index++) + { + TransactionId xid = running->xids[xid_index]; + + /* + * The running-xacts snapshot can contain xids that did finish between + * when the snapshot was taken and when it was written to WAL. Such + * transactions are not running anymore, so ignore them. + */ + if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) + continue; + + KnownAssignedXidsAdd(&xid, 1); + } + + KnownAssignedXidsDisplay(trace_recovery(DEBUG3)); + + /* + * Update lastOverflowedXid if the snapshot had overflown. We don't know + * the exact value for this, so conservatively assume that it's nextXid-1 + */ + if (running->subxid_overflow && + TransactionIdFollows(latestObservedXid, procArray->lastOverflowedXid)) + procArray->lastOverflowedXid = latestObservedXid; + else if (TransactionIdFollows(running->oldestRunningXid, + procArray->lastOverflowedXid)) + procArray->lastOverflowedXid = InvalidTransactionId; + + LWLockRelease(ProcArrayLock); + + /* nextXid must be beyond any observed xid */ + if (TransactionIdFollows(running->nextXid, ShmemVariableCache->nextXid)) + ShmemVariableCache->nextXid = running->nextXid; + + elog(trace_recovery(DEBUG2), + "running transaction data initialized"); + if (standbyState == STANDBY_SNAPSHOT_READY) + elog(trace_recovery(DEBUG2), + "recovery snapshots are now enabled"); +} + +void +ProcArrayApplyXidAssignment(TransactionId topxid, + int nsubxids, TransactionId *subxids) +{ + TransactionId max_xid; + int i; + + if (standbyState < STANDBY_SNAPSHOT_PENDING) + return; + + max_xid = TransactionIdLatest(topxid, nsubxids, subxids); + + /* + * Mark all the subtransactions as 
observed. + * + * NOTE: This will fail if the subxid contains too many previously + * unobserved xids to fit into known-assigned-xids. That shouldn't happen + * as the code stands, because xid-assignment records should never contain + * more than PGPROC_MAX_CACHED_SUBXIDS entries. + */ + RecordKnownAssignedTransactionIds(max_xid); + + /* + * Notice that we update pg_subtrans with the top-level xid, rather + * than the parent xid. This is a difference between normal + * processing and recovery, yet is still correct in all cases. The + * reason is that subtransaction commit is not marked in clog until + * commit processing, so all aborted subtransactions have already been + * clearly marked in clog. As a result we are able to refer directly + * to the top-level transaction's state rather than skipping through + * all the intermediate states in the subtransaction tree. This + * should be the first time we have attempted to SubTransSetParent(). + */ + for (i = 0; i < nsubxids; i++) + SubTransSetParent(subxids[i], topxid, false); + + /* + * Uses same locking as transaction commit + */ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + /* + * Remove from known-assigned-xacts. + */ + for (i = 0; i < nsubxids; i++) + KnownAssignedXidsRemove(subxids[i]); + + /* + * Advance lastOverflowedXid when required. + */ + if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid)) + procArray->lastOverflowedXid = max_xid; + + LWLockRelease(ProcArrayLock); +} /* * TransactionIdIsInProgress -- is given transaction running in some backend @@ -384,8 +674,15 @@ TransactionIdIsInProgress(TransactionId xid) */ if (xids == NULL) { - xids = (TransactionId *) - malloc(arrayP->maxProcs * sizeof(TransactionId)); + /* + * In hot standby mode, reserve enough space to hold all xids in + * the known-assigned list. If we later finish recovery, we no longer + * need the bigger array, but we don't bother to shrink it. + */ + int maxxids = RecoveryInProgress() ? 
+ arrayP->maxProcs : TOTAL_MAX_CACHED_SUBXIDS; + + xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId)); if (xids == NULL) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -465,11 +762,35 @@ TransactionIdIsInProgress(TransactionId xid) xids[nxids++] = pxid; } + /* In hot standby mode, check the known-assigned-xids list. */ + if (RecoveryInProgress()) + { + /* none of the PGPROC entries should have XIDs in hot standby mode */ + Assert(nxids == 0); + + if (KnownAssignedXidsExist(xid)) + { + LWLockRelease(ProcArrayLock); + /* XXX: should we have a separate counter for this? */ + /* xc_by_main_xid_inc(); */ + return true; + } + + /* + * If the KnownAssignedXids overflowed, we have to check + * pg_subtrans too. Copy all xids from KnownAssignedXids that are + * lower than xid, since if xid is a subtransaction its parent will + * always have a lower value. + */ + if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid)) + nxids = KnownAssignedXidsGet(xids, xid); + } + LWLockRelease(ProcArrayLock); /* * If none of the relevant caches overflowed, we know the Xid is not - * running without looking at pg_subtrans. + * running without even looking at pg_subtrans. */ if (nxids == 0) { @@ -590,6 +911,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) TransactionId result; int index; + /* Cannot look for individual databases during recovery */ + Assert(allDbs || !RecoveryInProgress()); + LWLockAcquire(ProcArrayLock, LW_SHARED); /* @@ -635,6 +959,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) LWLockRelease(ProcArrayLock); + /* + * Compute the cutoff XID, being careful not to generate a "permanent" XID + */ + result -= vacuum_defer_cleanup_age; + if (!TransactionIdIsNormal(result)) + result = FirstNormalTransactionId; + return result; } @@ -656,7 +987,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) * but since PGPROC has only a limited cache area for subxact XIDs, full * information may not be available. 
If we find any overflowed subxid arrays, * we have to mark the snapshot's subxid data as overflowed, and extra work - * will need to be done to determine what's running (see XidInMVCCSnapshot() + * *may* need to be done to determine what's running (see XidInMVCCSnapshot() * in tqual.c). * * We also update the following backend-global variables: @@ -681,6 +1012,7 @@ GetSnapshotData(Snapshot snapshot) int index; int count = 0; int subcount = 0; + bool suboverflowed = false; Assert(snapshot != NULL); @@ -698,7 +1030,8 @@ GetSnapshotData(Snapshot snapshot) if (snapshot->xip == NULL) { /* - * First call for this snapshot + * First call for this snapshot. Snapshot is same size whether + * or not we are in recovery, see later comments. */ snapshot->xip = (TransactionId *) malloc(arrayP->maxProcs * sizeof(TransactionId)); @@ -708,13 +1041,15 @@ GetSnapshotData(Snapshot snapshot) errmsg("out of memory"))); Assert(snapshot->subxip == NULL); snapshot->subxip = (TransactionId *) - malloc(arrayP->maxProcs * PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId)); + malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId)); if (snapshot->subxip == NULL) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); } + snapshot->takenDuringRecovery = RecoveryInProgress(); + /* * It is sufficient to get shared lock on ProcArrayLock, even if we are * going to set MyProc->xmin. @@ -763,6 +1098,7 @@ GetSnapshotData(Snapshot snapshot) */ if (TransactionIdIsNormal(xid)) { + Assert(!snapshot->takenDuringRecovery); if (TransactionIdFollowsOrEquals(xid, xmax)) continue; if (proc != MyProc) @@ -785,16 +1121,17 @@ GetSnapshotData(Snapshot snapshot) * * Again, our own XIDs are not included in the snapshot. 
*/ - if (subcount >= 0 && proc != MyProc) + if (!suboverflowed && proc != MyProc) { if (proc->subxids.overflowed) - subcount = -1; /* overflowed */ + suboverflowed = true; else { int nxids = proc->subxids.nxids; if (nxids > 0) { + Assert(!snapshot->takenDuringRecovery); memcpy(snapshot->subxip + subcount, (void *) proc->subxids.xids, nxids * sizeof(TransactionId)); @@ -804,6 +1141,40 @@ GetSnapshotData(Snapshot snapshot) } } + /* + * If in recovery get any known assigned xids. + */ + if (snapshot->takenDuringRecovery) + { + Assert(count == 0); + + /* + * We store all xids directly into subxip[]. Here's why: + * + * In recovery we don't know which xids are top-level and which are + * subxacts, a design choice that greatly simplifies xid processing. + * + * It seems like we would want to try to put xids into xip[] only, + * but that is fairly small. We would either need to make that bigger + * or to increase the rate at which we WAL-log xid assignment; + * neither is an appealing choice. + * + * We could try to store xids into xip[] first and then into subxip[] + * if there are too many xids. That only works if the snapshot doesn't + * overflow because we do not search subxip[] in that case. A simpler + * way is to just store all xids in the subxact array because this + * is by far the bigger array. We just leave the xip array empty. + * + * Either way we need to change the way XidInMVCCSnapshot() works + * depending upon when the snapshot was taken, or change normal + * snapshot processing so it matches. 
+ */ + subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin, xmax); + + if (TransactionIdPrecedes(xmin, procArray->lastOverflowedXid)) + suboverflowed = true; + } + if (!TransactionIdIsValid(MyProc->xmin)) MyProc->xmin = TransactionXmin = xmin; @@ -818,13 +1189,16 @@ GetSnapshotData(Snapshot snapshot) globalxmin = xmin; /* Update global variables too */ - RecentGlobalXmin = globalxmin; + RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age; + if (!TransactionIdIsNormal(RecentGlobalXmin)) + RecentGlobalXmin = FirstNormalTransactionId; RecentXmin = xmin; snapshot->xmin = xmin; snapshot->xmax = xmax; snapshot->xcnt = count; snapshot->subxcnt = subcount; + snapshot->suboverflowed = suboverflowed; snapshot->curcid = GetCurrentCommandId(false); @@ -840,6 +1214,129 @@ GetSnapshotData(Snapshot snapshot) } /* + * GetRunningTransactionData -- returns information about running transactions. + * + * Similar to GetSnapshotData but returning more information. We include + * all PGPROCs with an assigned TransactionId, even VACUUM processes. + * + * This is never executed during recovery so there is no need to look at + * KnownAssignedXids. + * + * We don't worry about updating other counters, we want to keep this as + * simple as possible and leave GetSnapshotData() as the primary code for + * that bookkeeping. + */ +RunningTransactions +GetRunningTransactionData(void) +{ + ProcArrayStruct *arrayP = procArray; + RunningTransactions CurrentRunningXacts = (RunningTransactions) &CurrentRunningXactsData; + TransactionId latestCompletedXid; + TransactionId oldestRunningXid; + TransactionId *xids; + int index; + int count; + int subcount; + bool suboverflowed; + + Assert(!RecoveryInProgress()); + + /* + * Allocating space for maxProcs xids is usually overkill; numProcs would + * be sufficient. But it seems better to do the malloc while not holding + * the lock, so we can't look at numProcs. Likewise, we allocate much + * more subxip storage than is probably needed. 
+ * + * Should only be allocated for bgwriter, since only ever executed + * during checkpoints. + */ + if (CurrentRunningXacts->xids == NULL) + { + /* + * First call + */ + CurrentRunningXacts->xids = (TransactionId *) + malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId)); + if (CurrentRunningXacts->xids == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + xids = CurrentRunningXacts->xids; + + count = subcount = 0; + suboverflowed = false; + + /* + * Ensure that no xids enter or leave the procarray while we obtain + * snapshot. + */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + LWLockAcquire(XidGenLock, LW_SHARED); + + latestCompletedXid = ShmemVariableCache->latestCompletedXid; + + oldestRunningXid = ShmemVariableCache->nextXid; + /* + * Spin over procArray collecting all xids and subxids. + */ + for (index = 0; index < arrayP->numProcs; index++) + { + volatile PGPROC *proc = arrayP->procs[index]; + TransactionId xid; + int nxids; + + /* Fetch xid just once - see GetNewTransactionId */ + xid = proc->xid; + + /* + * We don't need to store transactions that don't have a TransactionId + * yet because they will not show as running on a standby server. + */ + if (!TransactionIdIsValid(xid)) + continue; + + xids[count++] = xid; + + if (TransactionIdPrecedes(xid, oldestRunningXid)) + oldestRunningXid = xid; + + /* + * Save subtransaction XIDs. Other backends can't add or remove entries + * while we're holding XidGenLock. 
+ */ + nxids = proc->subxids.nxids; + if (nxids > 0) + { + memcpy(&xids[count], (void *) proc->subxids.xids, + nxids * sizeof(TransactionId)); + count += nxids; + subcount += nxids; + + if (proc->subxids.overflowed) + suboverflowed = true; + + /* + * Top-level XID of a transaction is always less than any of + * its subxids, so we don't need to check if any of the subxids + * are smaller than oldestRunningXid + */ + } + } + + CurrentRunningXacts->xcnt = count; + CurrentRunningXacts->subxid_overflow = suboverflowed; + CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid; + CurrentRunningXacts->oldestRunningXid = oldestRunningXid; + + LWLockRelease(XidGenLock); + LWLockRelease(ProcArrayLock); + + return CurrentRunningXacts; +} + +/* * GetTransactionsInCommit -- Get the XIDs of transactions that are committing * * Constructs an array of XIDs of transactions that are currently in commit @@ -1101,6 +1598,154 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0, return vxids; } +/* + * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs. + * + * The array is malloc'd and is terminated with an invalid VXID. + * + * Usage is limited to conflict resolution during recovery on standby servers. + * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId + * in cases where we cannot accurately determine a value for latestRemovedXid. + * If limitXmin is InvalidTransactionId then we know that the very + * latest xid that might have caused a cleanup record will be + * latestCompletedXid, so we set limitXmin to be latestCompletedXid instead. + * We then skip any backends with xmin > limitXmin. This means that + * cleanup records don't conflict with some recent snapshots. + * + * We replace InvalidTransactionId with latestCompletedXid here because + * this is the most convenient place to do that, while we hold ProcArrayLock. 
+ * The originator of the cleanup record wanted to avoid checking the value of + * latestCompletedXid since doing so would be a performance issue during + * normal running, so we check it essentially for free on the standby. + * + * If dbOid is valid we skip backends attached to other databases. Some + * callers choose to skipExistingConflicts. + * + * Be careful to *not* pfree the result from this function. We reuse + * this array sufficiently often that we use malloc for the result. + */ +VirtualTransactionId * +GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid, + bool skipExistingConflicts) +{ + static VirtualTransactionId *vxids; + ProcArrayStruct *arrayP = procArray; + int count = 0; + int index; + + /* + * If first time through, get workspace to remember the VXIDs in. We + * malloc it permanently to avoid repeated palloc/pfree overhead. + * Allow result space, remembering room for a terminator. + */ + if (vxids == NULL) + { + vxids = (VirtualTransactionId *) + malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1)); + if (vxids == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + /* + * If we don't know the TransactionId that created the conflict, set + * it to latestCompletedXid which is the latest possible value. 
+ */ + if (!TransactionIdIsValid(limitXmin)) + limitXmin = ShmemVariableCache->latestCompletedXid; + + for (index = 0; index < arrayP->numProcs; index++) + { + volatile PGPROC *proc = arrayP->procs[index]; + + /* Exclude prepared transactions */ + if (proc->pid == 0) + continue; + + if (skipExistingConflicts && proc->recoveryConflictMode > 0) + continue; + + if (!OidIsValid(dbOid) || + proc->databaseId == dbOid) + { + /* Fetch xmin just once - can't change on us, but good coding */ + TransactionId pxmin = proc->xmin; + + /* + * We ignore an invalid pxmin because this means that backend + * has no snapshot and cannot get another one while we hold exclusive lock. + */ + if (TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)) + { + VirtualTransactionId vxid; + + GET_VXID_FROM_PGPROC(vxid, *proc); + if (VirtualTransactionIdIsValid(vxid)) + vxids[count++] = vxid; + } + } + } + + LWLockRelease(ProcArrayLock); + + /* add the terminator */ + vxids[count].backendId = InvalidBackendId; + vxids[count].localTransactionId = InvalidLocalTransactionId; + + return vxids; +} + +/* + * CancelVirtualTransaction - used in recovery conflict processing + * + * Returns pid of the process signaled, or 0 if not found. 
+ */ +pid_t +CancelVirtualTransaction(VirtualTransactionId vxid, int cancel_mode) +{ + ProcArrayStruct *arrayP = procArray; + int index; + pid_t pid = 0; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (index = 0; index < arrayP->numProcs; index++) + { + VirtualTransactionId procvxid; + PGPROC *proc = arrayP->procs[index]; + + GET_VXID_FROM_PGPROC(procvxid, *proc); + + if (procvxid.backendId == vxid.backendId && + procvxid.localTransactionId == vxid.localTransactionId) + { + /* + * Issue orders for the proc to read next time it receives SIGINT + */ + if (proc->recoveryConflictMode < cancel_mode) + proc->recoveryConflictMode = cancel_mode; + + pid = proc->pid; + break; + } + } + + LWLockRelease(ProcArrayLock); + + if (pid != 0) + { + /* + * Kill the pid if it's still here. If not, that's what we wanted + * so ignore any errors. + */ + kill(pid, SIGINT); + } + + return pid; +} /* * CountActiveBackends --- count backends (other than myself) that are in @@ -1400,3 +2045,457 @@ DisplayXidCache(void) } #endif /* XIDCACHE_DEBUG */ + +/* ---------------------------------------------- + * KnownAssignedTransactions sub-module + * ---------------------------------------------- + */ + +/* + * In Hot Standby mode, we maintain a list of transactions that are (or were) + * running in the master at the current point in WAL. + * + * RecordKnownAssignedTransactionIds() should be run for *every* WAL record + * type apart from XLOG_XACT_RUNNING_XACTS, since that initialises the first + * snapshot so that RecordKnownAssignedTransactionIds() can be called. Uses + * local variables, so should only be called by Startup process. + * + * We record all xids that we know have been assigned. That includes + * all the xids on the WAL record, plus all unobserved xids that + * we can deduce have been assigned. We can deduce the existence of + * unobserved xids because we know xids are in sequence, with no gaps. 
+ * + * During recovery we do not fret too much about the distinction between + * top-level xids and subtransaction xids. We hold both together in + * a hash table called KnownAssignedXids. In backends, this is copied into + * snapshots in GetSnapshotData(), taking advantage + * of the fact that XidInMVCCSnapshot() doesn't care about the distinction + * either. Subtransaction xids are effectively treated as top-level xids + * and in the typical case pg_subtrans is *not* maintained (and that + * does not affect visibility). + * + * KnownAssignedXids expands as new xids are observed or inferred, and + * contracts when transaction completion records arrive. We have room in a + * snapshot to hold maxProcs * (1 + PGPROC_MAX_CACHED_SUBXIDS) xids, so + * every transaction must report its subtransaction xids in a special + * WAL assignment record every PGPROC_MAX_CACHED_SUBXIDS. This allows us + * to remove the subtransaction xids and update pg_subtrans instead. Snapshots + * are still correct yet we don't overflow SnapshotData structure. When we do + * this we need + * to keep track of which xids caused the snapshot to overflow. We do that + * by simply tracking the lastOverflowedXid - if it is within the bounds of + * the KnownAssignedXids then we know the snapshot overflowed. (Note that + * subxid overflow occurs on primary when 65th subxid arrives, whereas on + * standby it occurs when 64th subxid arrives - that is not an error). + * + * Should FATAL errors result in a backend on primary disappearing before + * it can write an abort record then we just leave those xids in + * KnownAssignedXids. They actually aborted but we think they were running; + * the distinction is irrelevant because either way any changes done by the + * transaction are not visible to backends in the standby. + * We prune KnownAssignedXids when XLOG_XACT_RUNNING_XACTS arrives, to + * ensure we do not overflow. 
+ * + * If we are in STANDBY_SNAPSHOT_PENDING state, then we may try to remove + * xids that are not present. + */ +void +RecordKnownAssignedTransactionIds(TransactionId xid) +{ + /* + * Skip processing if the current snapshot is not initialized. + */ + if (standbyState < STANDBY_SNAPSHOT_PENDING) + return; + + /* + * We can see WAL records before the running-xacts snapshot that + * contain XIDs that are not in the running-xacts snapshot, but that we + * know to have finished before the running-xacts snapshot was taken. + * Don't waste precious shared memory by keeping them in the hash table. + * + * We can also see WAL records before the running-xacts snapshot that + * contain XIDs that are not in the running-xacts snapshot for a different + * reason: the transaction started *after* the running-xacts snapshot + * was taken, but before it was written to WAL. We must be careful to + * not ignore such XIDs. Because such a transaction started after the + * running-xacts snapshot was taken, it must have an XID larger than + * the oldest XID according to the running-xacts snapshot. + */ + if (TransactionIdPrecedes(xid, snapshotOldestActiveXid)) + return; + + ereport(trace_recovery(DEBUG4), + (errmsg("record known xact %u latestObservedXid %u", + xid, latestObservedXid))); + + /* + * When a newly observed xid arrives, it is frequently the case + * that it is *not* the next xid in sequence. When this occurs, we + * must treat the intervening xids as running also. + */ + if (TransactionIdFollows(xid, latestObservedXid)) + { + TransactionId next_expected_xid = latestObservedXid; + TransactionIdAdvance(next_expected_xid); + + /* + * Locking requirement is currently higher than for xid assignment + * in normal running. However, we only get called here for new + * high xids - so on a multi-processor where it is common that xids + * arrive out of order the average number of locks per assignment + * will actually reduce. So not too worried about this locking. 
+ * + * XXX It does seem possible that we could add a whole range + * of numbers atomically to KnownAssignedXids, if we use a sorted + * list for KnownAssignedXids. But that design also increases the + * length of time we hold lock when we process commits/aborts, so + * on balance don't worry about this. + */ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + while (TransactionIdPrecedesOrEquals(next_expected_xid, xid)) + { + if (TransactionIdPrecedes(next_expected_xid, xid)) + ereport(trace_recovery(DEBUG4), + (errmsg("recording unobserved xid %u (latestObservedXid %u)", + next_expected_xid, latestObservedXid))); + KnownAssignedXidsAdd(&next_expected_xid, 1); + + /* + * Extend clog and subtrans like we do in GetNewTransactionId() + * during normal operation + */ + ExtendCLOG(next_expected_xid); + ExtendSUBTRANS(next_expected_xid); + + TransactionIdAdvance(next_expected_xid); + } + + LWLockRelease(ProcArrayLock); + + latestObservedXid = xid; + } + + /* nextXid must be beyond any observed xid */ + if (TransactionIdFollowsOrEquals(latestObservedXid, + ShmemVariableCache->nextXid)) + { + ShmemVariableCache->nextXid = latestObservedXid; + TransactionIdAdvance(ShmemVariableCache->nextXid); + } +} + +void +ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids, + TransactionId *subxids) +{ + int i; + TransactionId max_xid; + + if (standbyState == STANDBY_DISABLED) + return; + + max_xid = TransactionIdLatest(xid, nsubxids, subxids); + + /* + * Uses same locking as transaction commit + */ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + if (TransactionIdIsValid(xid)) + KnownAssignedXidsRemove(xid); + for (i = 0; i < nsubxids; i++) + KnownAssignedXidsRemove(subxids[i]); + + /* Like in ProcArrayRemove, advance latestCompletedXid */ + if (TransactionIdFollowsOrEquals(max_xid, + ShmemVariableCache->latestCompletedXid)) + ShmemVariableCache->latestCompletedXid = max_xid; + + LWLockRelease(ProcArrayLock); +} + +void +ExpireAllKnownAssignedTransactionIds(void) +{ + 
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + KnownAssignedXidsRemoveMany(InvalidTransactionId, false); + LWLockRelease(ProcArrayLock); +} + +void +ExpireOldKnownAssignedTransactionIds(TransactionId xid) +{ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + KnownAssignedXidsRemoveMany(xid, true); + LWLockRelease(ProcArrayLock); +} + +/* + * Private module functions to manipulate KnownAssignedXids + * + * There are 3 main users of the KnownAssignedXids data structure: + * + * * backends taking snapshots + * * startup process adding new knownassigned xids + * * startup process removing xids as transactions end + * + * If we make KnownAssignedXids a simple sorted array then the first two + * operations are fast, but the last one is at least O(N). If we make + * KnownAssignedXids a hash table then the last two operations are fast, + * though we have to do more work at snapshot time. Doing more work at + * commit could slow down taking snapshots anyway because of lwlock + * contention. Scanning the hash table is O(N) on the max size of the array, + * so performs poorly in comparison when we have very low numbers of + * write transactions to process. But at least it is constant overhead + * and a sequential memory scan will utilise hardware memory readahead + * to give much improved performance. In any case the emphasis must be on + * having the standby process changes quickly so that it can provide + * high availability. So we choose to implement as a hash table. 
+ */ + +static Size +KnownAssignedXidsShmemSize(int size) +{ + return hash_estimate_size(size, sizeof(TransactionId)); +} + +static void +KnownAssignedXidsInit(int size) +{ + HASHCTL info; + + /* assume no locking is needed yet */ + + info.keysize = sizeof(TransactionId); + info.entrysize = sizeof(TransactionId); + info.hash = tag_hash; + + KnownAssignedXidsHash = ShmemInitHash("KnownAssignedXids Hash", + size, size, + &info, + HASH_ELEM | HASH_FUNCTION); + + if (!KnownAssignedXidsHash) + elog(FATAL, "could not initialize known assigned xids hash table"); + + procArray->numKnownAssignedXids = 0; + procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS; + procArray->lastOverflowedXid = InvalidTransactionId; +} + +/* + * Add xids into KnownAssignedXids. + * + * Must be called while holding ProcArrayLock in Exclusive mode + */ +static void +KnownAssignedXidsAdd(TransactionId *xids, int nxids) +{ + TransactionId *result; + bool found; + int i; + + for (i = 0; i < nxids; i++) + { + Assert(TransactionIdIsValid(xids[i])); + + elog(trace_recovery(DEBUG4), "adding KnownAssignedXid %u", xids[i]); + + procArray->numKnownAssignedXids++; + if (procArray->numKnownAssignedXids > procArray->maxKnownAssignedXids) + { + KnownAssignedXidsDisplay(LOG); + LWLockRelease(ProcArrayLock); + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("too many KnownAssignedXids"))); + } + + result = (TransactionId *) hash_search(KnownAssignedXidsHash, &xids[i], HASH_ENTER, + &found); + + if (!result) + { + LWLockRelease(ProcArrayLock); + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"))); + } + + if (found) + { + KnownAssignedXidsDisplay(LOG); + LWLockRelease(ProcArrayLock); + elog(ERROR, "found duplicate KnownAssignedXid %u", xids[i]); + } + } +} + +/* + * Is an xid present in KnownAssignedXids? 
+ * + * Must be called while holding ProcArrayLock in shared mode + */ +static bool +KnownAssignedXidsExist(TransactionId xid) +{ + bool found; + (void) hash_search(KnownAssignedXidsHash, &xid, HASH_FIND, &found); + return found; +} + +/* + * Remove one xid from anywhere in KnownAssignedXids. + * + * Must be called while holding ProcArrayLock in Exclusive mode + */ +static void +KnownAssignedXidsRemove(TransactionId xid) +{ + bool found; + + Assert(TransactionIdIsValid(xid)); + + elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid); + + (void) hash_search(KnownAssignedXidsHash, &xid, HASH_REMOVE, &found); + + if (found) + procArray->numKnownAssignedXids--; + Assert(procArray->numKnownAssignedXids >= 0); + + /* + * We can fail to find an xid if the xid came from a subtransaction + * that aborts, though the xid hadn't yet been reported and no WAL records + * have been written using the subxid. In that case the abort record will + * contain that subxid and we haven't seen it before. + * + * If we fail to find it for other reasons it might be a problem, but + * it isn't much use to log that it happened, since we can't divine much + * from just an isolated xid value. + */ +} + +/* + * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids. + * We filter out anything higher than xmax. + * + * Must be called while holding ProcArrayLock (in shared mode) + */ +static int +KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax) +{ + TransactionId xtmp = InvalidTransactionId; + + return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax); +} + +/* + * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus we reduce *xmin + * to the lowest xid value seen if not already lower. 
+ * + * Must be called while holding ProcArrayLock (in shared mode) + */ +static int +KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, + TransactionId xmax) +{ + HASH_SEQ_STATUS status; + TransactionId *knownXid; + int count = 0; + + hash_seq_init(&status, KnownAssignedXidsHash); + while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL) + { + /* + * Filter out anything higher than xmax + */ + if (TransactionIdPrecedes(xmax, *knownXid)) + continue; + + *xarray = *knownXid; + xarray++; + count++; + + /* update xmin if required */ + if (TransactionIdPrecedes(*knownXid, *xmin)) + *xmin = *knownXid; + } + + return count; +} + +/* + * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid + * then clear the whole table. + * + * Must be called while holding ProcArrayLock in Exclusive mode. + */ +static void +KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts) +{ + TransactionId *knownXid; + HASH_SEQ_STATUS status; + + if (TransactionIdIsValid(xid)) + elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", xid); + else + elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids"); + + hash_seq_init(&status, KnownAssignedXidsHash); + while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL) + { + TransactionId removeXid = *knownXid; + bool found; + + if (!TransactionIdIsValid(xid) || TransactionIdPrecedes(removeXid, xid)) + { + if (keepPreparedXacts && StandbyTransactionIdIsPrepared(xid)) + continue; + else + { + (void) hash_search(KnownAssignedXidsHash, &removeXid, + HASH_REMOVE, &found); + if (found) + procArray->numKnownAssignedXids--; + Assert(procArray->numKnownAssignedXids >= 0); + } + } + } +} + +/* + * Display KnownAssignedXids to provide debug trail + * + * Must be called while holding ProcArrayLock (in shared mode) + */ +void +KnownAssignedXidsDisplay(int trace_level) +{ + HASH_SEQ_STATUS status; + TransactionId *knownXid; + StringInfoData buf; + TransactionId 
*xids; + int nxids; + int i; + + xids = palloc(sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS); + nxids = 0; + + hash_seq_init(&status, KnownAssignedXidsHash); + while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL) + xids[nxids++] = *knownXid; + + qsort(xids, nxids, sizeof(TransactionId), xidComparator); + + initStringInfo(&buf); + + for (i = 0; i < nxids; i++) + appendStringInfo(&buf, "%u ", xids[i]); + + elog(trace_level, "%d KnownAssignedXids %s", nxids, buf.data); + + pfree(buf.data); +} |