aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/ipc/procarray.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/ipc/procarray.c')
-rw-r--r--src/backend/storage/ipc/procarray.c1127
1 files changed, 1113 insertions, 14 deletions
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 9a3d2f62606..c4ddf8f2bd8 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -17,13 +17,27 @@
* as are the myProcLocks lists. They can be distinguished from regular
* backend PGPROCs at need by checking for pid == 0.
*
+ * During recovery, we also keep a list of XIDs representing transactions
+ * that are known to be running at the current point in WAL recovery. This
+ * list is kept in the KnownAssignedXids array, and updated by watching
+ * the sequence of arriving xids. This is very important because if we leave
+ * those xids out of the snapshot then they will appear to be already complete.
+ * Later, when they have actually completed, this could lead to confusion as to
+ * whether those xids are visible or not, blowing a huge hole in MVCC.
+ * We need 'em.
+ *
+ * It is theoretically possible for a FATAL error to explode before writing
+ * an abort record. This could tie up KnownAssignedXids indefinitely, so
+ * we prune the array when a valid list of running xids arrives. These quirks,
+ * if they do ever exist in reality, will not affect the correctness of
+ * snapshots.
*
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.51 2009/07/29 15:57:11 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.52 2009/12/19 01:32:35 sriggs Exp $
*
*-------------------------------------------------------------------------
*/
@@ -31,14 +45,18 @@
#include <signal.h>
+#include "access/clog.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/twophase.h"
#include "miscadmin.h"
#include "storage/procarray.h"
+#include "storage/standby.h"
+#include "utils/builtins.h"
#include "utils/snapmgr.h"
+static RunningTransactionsData CurrentRunningXactsData;
/* Our shared memory area */
typedef struct ProcArrayStruct
@@ -46,6 +64,14 @@ typedef struct ProcArrayStruct
int numProcs; /* number of valid procs entries */
int maxProcs; /* allocated size of procs array */
+ int numKnownAssignedXids; /* current number of known assigned xids */
+ int maxKnownAssignedXids; /* allocated size of known assigned xids */
+ /*
+ * Highest subxid that overflowed KnownAssignedXids array. Similar to
+ * overflowing cached subxids in PGPROC entries.
+ */
+ TransactionId lastOverflowedXid;
+
/*
* We declare procs[] as 1 entry because C wants a fixed-size array, but
* actually it is maxProcs entries long.
@@ -55,6 +81,24 @@ typedef struct ProcArrayStruct
static ProcArrayStruct *procArray;
+/*
+ * Bookkeeping for tracking emulated transactions in recovery
+ */
+static HTAB *KnownAssignedXidsHash;
+static TransactionId latestObservedXid = InvalidTransactionId;
+
+/*
+ * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
+ * the highest xid that might still be running that we don't have in
+ * KnownAssignedXids.
+ */
+static TransactionId standbySnapshotPendingXmin;
+
+/*
+ * Oldest transaction still running according to the running-xacts snapshot
+ * we initialized standby mode from.
+ */
+static TransactionId snapshotOldestActiveXid;
#ifdef XIDCACHE_DEBUG
@@ -90,6 +134,17 @@ static void DisplayXidCache(void);
#define xc_slow_answer_inc() ((void) 0)
#endif /* XIDCACHE_DEBUG */
+/* Primitives for KnownAssignedXids array handling for standby */
+static Size KnownAssignedXidsShmemSize(int size);
+static void KnownAssignedXidsInit(int size);
+static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
+static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
+ TransactionId xmax);
+static bool KnownAssignedXidsExist(TransactionId xid);
+static void KnownAssignedXidsAdd(TransactionId *xids, int nxids);
+static void KnownAssignedXidsRemove(TransactionId xid);
+static void KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts);
+static void KnownAssignedXidsDisplay(int trace_level);
/*
* Report shared-memory space needed by CreateSharedProcArray.
@@ -100,8 +155,22 @@ ProcArrayShmemSize(void)
Size size;
size = offsetof(ProcArrayStruct, procs);
- size = add_size(size, mul_size(sizeof(PGPROC *),
- add_size(MaxBackends, max_prepared_xacts)));
+
+ /* Normal processing - MyProc slots */
+#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
+ size = add_size(size, mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS));
+
+ /*
+ * During recovery processing we have a data structure called KnownAssignedXids,
+ * created in shared memory. Local data structures are also created in various
+ * backends during GetSnapshotData(), TransactionIdIsInProgress() and
+ * GetRunningTransactionData(). All of the main structures created in those
+ * functions must be identically sized, since we may at times copy the whole
+ * of the data structures around. We refer to this as TOTAL_MAX_CACHED_SUBXIDS.
+ */
+#define TOTAL_MAX_CACHED_SUBXIDS ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
+ if (XLogRequestRecoveryConnections)
+ size = add_size(size, KnownAssignedXidsShmemSize(TOTAL_MAX_CACHED_SUBXIDS));
return size;
}
@@ -116,15 +185,21 @@ CreateSharedProcArray(void)
/* Create or attach to the ProcArray shared structure */
procArray = (ProcArrayStruct *)
- ShmemInitStruct("Proc Array", ProcArrayShmemSize(), &found);
+ ShmemInitStruct("Proc Array",
+ add_size(offsetof(ProcArrayStruct, procs), mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS)), /* fixed header fields + procs[] entries; KnownAssignedXids is set up separately below */
+ &found);
if (!found)
{
/*
* We're the first - initialize.
*/
+ /* Normal processing */
procArray->numProcs = 0;
- procArray->maxProcs = MaxBackends + max_prepared_xacts;
+ procArray->maxProcs = PROCARRAY_MAXPROCS;
+
+ if (XLogRequestRecoveryConnections)
+ KnownAssignedXidsInit(TOTAL_MAX_CACHED_SUBXIDS);
}
}
@@ -302,6 +377,7 @@ ProcArrayClearTransaction(PGPROC *proc)
proc->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId;
+ proc->recoveryConflictMode = 0;
/* redundant, but just in case */
proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
@@ -312,6 +388,220 @@ ProcArrayClearTransaction(PGPROC *proc)
proc->subxids.overflowed = false;
}
+void
+ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
+{
+ snapshotOldestActiveXid = oldestActiveXid; /* remember the oldest xid still running in the running-xacts snapshot that standby mode is initialized from */
+}
+
+/*
+ * ProcArrayApplyRecoveryInfo -- apply recovery info about xids
+ *
+ * Takes us through 3 states: Uninitialized, Pending and Ready.
+ * Normal case is to go all the way to Ready straight away, though there
+ * are atypical cases where we need to take it in steps.
+ *
+ * Use the data about running transactions on master to create the initial
+ * state of KnownAssignedXids. We also use these records to regularly prune
+ * KnownAssignedXids because we know it is possible that some transactions
+ * with FATAL errors do not write abort records, which could cause eventual
+ * overflow.
+ *
+ * Only used during recovery. Notice the signature is very similar to a
+ * _redo function and it's difficult to decide exactly where this code should
+ * reside.
+ */
+void
+ProcArrayApplyRecoveryInfo(RunningTransactions running)
+{
+ int xid_index; /* main loop */
+ TransactionId *xids;
+ int nxids;
+
+ Assert(standbyState >= STANDBY_INITIALIZED);
+
+ /*
+ * Remove stale transactions, if any.
+ */
+ ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
+ StandbyReleaseOldLocks(running->oldestRunningXid);
+
+ /*
+ * If our snapshot is already valid, nothing else to do...
+ */
+ if (standbyState == STANDBY_SNAPSHOT_READY)
+ return;
+
+ /*
+ * If our initial RunningXactData had an overflowed snapshot then we
+ * knew we were missing some subxids from our snapshot. We can use
+ * this data as an initial snapshot, but we cannot yet mark it valid.
+ * We know that the missing subxids are equal to or earlier than
+ * nextXid. After we initialise we continue to apply changes during
+ * recovery, so once the oldestRunningXid is later than the nextXid
+ * from the initial snapshot we know that we no longer have missing
+ * information and can mark the snapshot as valid.
+ */
+ if (standbyState == STANDBY_SNAPSHOT_PENDING)
+ {
+ if (TransactionIdPrecedes(standbySnapshotPendingXmin,
+ running->oldestRunningXid))
+ {
+ standbyState = STANDBY_SNAPSHOT_READY;
+ elog(trace_recovery(DEBUG2),
+ "running xact data now proven complete");
+ elog(trace_recovery(DEBUG2),
+ "recovery snapshots are now enabled");
+ }
+ return;
+ }
+
+ /*
+ * OK, we need to initialise from the RunningXactData record
+ */
+ latestObservedXid = running->nextXid;
+ TransactionIdRetreat(latestObservedXid);
+
+ /*
+ * If the snapshot overflowed, then we still initialise with what we
+ * know, but the recovery snapshot isn't fully valid yet because we
+ * know there are some subxids missing (ergo we don't know which ones)
+ */
+ if (!running->subxid_overflow)
+ {
+ standbyState = STANDBY_SNAPSHOT_READY;
+ standbySnapshotPendingXmin = InvalidTransactionId;
+ }
+ else
+ {
+ standbyState = STANDBY_SNAPSHOT_PENDING;
+ standbySnapshotPendingXmin = latestObservedXid;
+ ereport(LOG,
+ (errmsg("consistent state delayed because recovery snapshot incomplete")));
+ }
+
+ nxids = running->xcnt; /* NOTE(review): nxids and xids are assigned here but never read below; the loop uses running-> directly */
+ xids = running->xids;
+
+ KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
+
+ /*
+ * Scan through the incoming array of RunningXacts and collect xids.
+ * We don't use SubtransSetParent because it doesn't matter yet. If
+ * we aren't overflowed then all xids will fit in snapshot and so we
+ * don't need subtrans. If we later overflow, an xid assignment record
+ * will add xids to subtrans. If RunningXacts is overflowed then we
+ * don't have enough information to correctly update subtrans anyway.
+ */
+
+ /*
+ * Nobody else is running yet, but take locks anyhow
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ /* Reset latestCompletedXid to nextXid - 1 */
+ ShmemVariableCache->latestCompletedXid = running->nextXid;
+ TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
+
+ /*
+ * Add our new xids into the array
+ */
+ for (xid_index = 0; xid_index < running->xcnt; xid_index++)
+ {
+ TransactionId xid = running->xids[xid_index];
+
+ /*
+ * The running-xacts snapshot can contain xids that did finish between
+ * when the snapshot was taken and when it was written to WAL. Such
+ * transactions are not running anymore, so ignore them.
+ */
+ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ continue;
+
+ KnownAssignedXidsAdd(&xid, 1);
+ }
+
+ KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
+
+ /*
+ * Update lastOverflowedXid if the snapshot had overflowed. We don't know
+ * the exact value for this, so conservatively assume that it's nextXid-1
+ */
+ if (running->subxid_overflow &&
+ TransactionIdFollows(latestObservedXid, procArray->lastOverflowedXid))
+ procArray->lastOverflowedXid = latestObservedXid;
+ else if (TransactionIdFollows(running->oldestRunningXid,
+ procArray->lastOverflowedXid))
+ procArray->lastOverflowedXid = InvalidTransactionId;
+
+ LWLockRelease(ProcArrayLock);
+
+ /* nextXid must be beyond any observed xid */
+ if (TransactionIdFollows(running->nextXid, ShmemVariableCache->nextXid))
+ ShmemVariableCache->nextXid = running->nextXid;
+
+ elog(trace_recovery(DEBUG2),
+ "running transaction data initialized");
+ if (standbyState == STANDBY_SNAPSHOT_READY)
+ elog(trace_recovery(DEBUG2),
+ "recovery snapshots are now enabled");
+}
+
+void
+ProcArrayApplyXidAssignment(TransactionId topxid,
+ int nsubxids, TransactionId *subxids)
+{
+ TransactionId max_xid;
+ int i;
+
+ if (standbyState < STANDBY_SNAPSHOT_PENDING)
+ return; /* nothing to track until standby state has been initialised from a running-xacts record */
+
+ max_xid = TransactionIdLatest(topxid, nsubxids, subxids);
+
+ /*
+ * Mark all the subtransactions as observed.
+ *
+ * NOTE: This will fail if the subxids array contains too many previously
+ * unobserved xids to fit into known-assigned-xids. That shouldn't happen
+ * as the code stands, because xid-assignment records should never contain
+ * more than PGPROC_MAX_CACHED_SUBXIDS entries.
+ */
+ RecordKnownAssignedTransactionIds(max_xid);
+
+ /*
+ * Notice that we update pg_subtrans with the top-level xid, rather
+ * than the parent xid. This is a difference between normal
+ * processing and recovery, yet is still correct in all cases. The
+ * reason is that subtransaction commit is not marked in clog until
+ * commit processing, so all aborted subtransactions have already been
+ * clearly marked in clog. As a result we are able to refer directly
+ * to the top-level transaction's state rather than skipping through
+ * all the intermediate states in the subtransaction tree. This
+ * should be the first time we have attempted to SubTransSetParent().
+ */
+ for (i = 0; i < nsubxids; i++)
+ SubTransSetParent(subxids[i], topxid, false);
+
+ /*
+ * Uses same locking as transaction commit
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ /*
+ * Remove the subxids from known-assigned-xids.
+ */
+ for (i = 0; i < nsubxids; i++)
+ KnownAssignedXidsRemove(subxids[i]);
+
+ /*
+ * Advance lastOverflowedXid to max_xid when it is currently older.
+ */
+ if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
+ procArray->lastOverflowedXid = max_xid;
+
+ LWLockRelease(ProcArrayLock);
+}
/*
* TransactionIdIsInProgress -- is given transaction running in some backend
@@ -384,8 +674,15 @@ TransactionIdIsInProgress(TransactionId xid)
*/
if (xids == NULL)
{
- xids = (TransactionId *)
- malloc(arrayP->maxProcs * sizeof(TransactionId));
+ /*
+ * In hot standby mode, reserve enough space to hold all xids in
+ * the known-assigned list. If we later finish recovery, we no longer
+ * need the bigger array, but we don't bother to shrink it.
+ */
+ int maxxids = RecoveryInProgress() ?
+ TOTAL_MAX_CACHED_SUBXIDS : arrayP->maxProcs;
+
+ xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
if (xids == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -465,11 +762,35 @@ TransactionIdIsInProgress(TransactionId xid)
xids[nxids++] = pxid;
}
+ /* In hot standby mode, check the known-assigned-xids list. */
+ if (RecoveryInProgress())
+ {
+ /* none of the PGPROC entries should have XIDs in hot standby mode */
+ Assert(nxids == 0);
+
+ if (KnownAssignedXidsExist(xid))
+ {
+ LWLockRelease(ProcArrayLock);
+ /* XXX: should we have a separate counter for this? */
+ /* xc_by_main_xid_inc(); */
+ return true;
+ }
+
+ /*
+ * If the KnownAssignedXids overflowed, we have to check
+ * pg_subtrans too. Copy all xids from KnownAssignedXids that are
+ * lower than xid, since if xid is a subtransaction its parent will
+ * always have a lower value.
+ */
+ if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
+ nxids = KnownAssignedXidsGet(xids, xid);
+ }
+
LWLockRelease(ProcArrayLock);
/*
* If none of the relevant caches overflowed, we know the Xid is not
- * running without looking at pg_subtrans.
+ * running without even looking at pg_subtrans.
*/
if (nxids == 0)
{
@@ -590,6 +911,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
TransactionId result;
int index;
+ /* Cannot look for individual databases during recovery */
+ Assert(allDbs || !RecoveryInProgress());
+
LWLockAcquire(ProcArrayLock, LW_SHARED);
/*
@@ -635,6 +959,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
LWLockRelease(ProcArrayLock);
+ /*
+ * Compute the cutoff XID, being careful not to generate a "permanent" XID
+ */
+ result -= vacuum_defer_cleanup_age;
+ if (!TransactionIdIsNormal(result))
+ result = FirstNormalTransactionId;
+
return result;
}
@@ -656,7 +987,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
* but since PGPROC has only a limited cache area for subxact XIDs, full
* information may not be available. If we find any overflowed subxid arrays,
* we have to mark the snapshot's subxid data as overflowed, and extra work
- * will need to be done to determine what's running (see XidInMVCCSnapshot()
+ * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
* in tqual.c).
*
* We also update the following backend-global variables:
@@ -681,6 +1012,7 @@ GetSnapshotData(Snapshot snapshot)
int index;
int count = 0;
int subcount = 0;
+ bool suboverflowed = false;
Assert(snapshot != NULL);
@@ -698,7 +1030,8 @@ GetSnapshotData(Snapshot snapshot)
if (snapshot->xip == NULL)
{
/*
- * First call for this snapshot
+ * First call for this snapshot. Snapshot is same size whether
+ * or not we are in recovery, see later comments.
*/
snapshot->xip = (TransactionId *)
malloc(arrayP->maxProcs * sizeof(TransactionId));
@@ -708,13 +1041,15 @@ GetSnapshotData(Snapshot snapshot)
errmsg("out of memory")));
Assert(snapshot->subxip == NULL);
snapshot->subxip = (TransactionId *)
- malloc(arrayP->maxProcs * PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+ malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
if (snapshot->subxip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
+ snapshot->takenDuringRecovery = RecoveryInProgress();
+
/*
* It is sufficient to get shared lock on ProcArrayLock, even if we are
* going to set MyProc->xmin.
@@ -763,6 +1098,7 @@ GetSnapshotData(Snapshot snapshot)
*/
if (TransactionIdIsNormal(xid))
{
+ Assert(!snapshot->takenDuringRecovery);
if (TransactionIdFollowsOrEquals(xid, xmax))
continue;
if (proc != MyProc)
@@ -785,16 +1121,17 @@ GetSnapshotData(Snapshot snapshot)
*
* Again, our own XIDs are not included in the snapshot.
*/
- if (subcount >= 0 && proc != MyProc)
+ if (!suboverflowed && proc != MyProc)
{
if (proc->subxids.overflowed)
- subcount = -1; /* overflowed */
+ suboverflowed = true;
else
{
int nxids = proc->subxids.nxids;
if (nxids > 0)
{
+ Assert(!snapshot->takenDuringRecovery);
memcpy(snapshot->subxip + subcount,
(void *) proc->subxids.xids,
nxids * sizeof(TransactionId));
@@ -804,6 +1141,40 @@ GetSnapshotData(Snapshot snapshot)
}
}
+ /*
+ * If in recovery get any known assigned xids.
+ */
+ if (snapshot->takenDuringRecovery)
+ {
+ Assert(count == 0);
+
+ /*
+ * We store all xids directly into subxip[]. Here's why:
+ *
+ * In recovery we don't know which xids are top-level and which are
+ * subxacts, a design choice that greatly simplifies xid processing.
+ *
+ * It seems like we would want to try to put xids into xip[] only,
+ * but that is fairly small. We would either need to make that bigger
+ * or to increase the rate at which we WAL-log xid assignment;
+ * neither is an appealing choice.
+ *
+ * We could try to store xids into xip[] first and then into subxip[]
+ * if there are too many xids. That only works if the snapshot doesn't
+ * overflow because we do not search subxip[] in that case. A simpler
+ * way is to just store all xids in the subxact array because this
+ * is by far the bigger array. We just leave the xip array empty.
+ *
+ * Either way we need to change the way XidInMVCCSnapshot() works
+ * depending upon when the snapshot was taken, or change normal
+ * snapshot processing so it matches.
+ */
+ subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin, xmax);
+
+ if (TransactionIdPrecedes(xmin, procArray->lastOverflowedXid))
+ suboverflowed = true;
+ }
+
if (!TransactionIdIsValid(MyProc->xmin))
MyProc->xmin = TransactionXmin = xmin;
@@ -818,13 +1189,16 @@ GetSnapshotData(Snapshot snapshot)
globalxmin = xmin;
/* Update global variables too */
- RecentGlobalXmin = globalxmin;
+ RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
+ if (!TransactionIdIsNormal(RecentGlobalXmin))
+ RecentGlobalXmin = FirstNormalTransactionId;
RecentXmin = xmin;
snapshot->xmin = xmin;
snapshot->xmax = xmax;
snapshot->xcnt = count;
snapshot->subxcnt = subcount;
+ snapshot->suboverflowed = suboverflowed;
snapshot->curcid = GetCurrentCommandId(false);
@@ -840,6 +1214,129 @@ GetSnapshotData(Snapshot snapshot)
}
/*
+ * GetRunningTransactionData -- returns information about running transactions.
+ *
+ * Similar to GetSnapshotData but returning more information. We include
+ * all PGPROCs with an assigned TransactionId, even VACUUM processes.
+ *
+ * This is never executed during recovery so there is no need to look at
+ * KnownAssignedXids.
+ *
+ * We don't worry about updating other counters, we want to keep this as
+ * simple as possible and leave GetSnapshotData() as the primary code for
+ * that bookkeeping.
+ */
+RunningTransactions
+GetRunningTransactionData(void)
+{
+ ProcArrayStruct *arrayP = procArray;
+ RunningTransactions CurrentRunningXacts = (RunningTransactions) &CurrentRunningXactsData;
+ TransactionId latestCompletedXid;
+ TransactionId oldestRunningXid;
+ TransactionId *xids;
+ int index;
+ int count;
+ int subcount;
+ bool suboverflowed;
+
+ Assert(!RecoveryInProgress());
+
+ /*
+ * Allocating space for TOTAL_MAX_CACHED_SUBXIDS xids is usually overkill;
+ * numProcs entries plus their cached subxids would be sufficient. But it
+ * seems better to do the malloc while not holding the lock, so we can't
+ * look at numProcs. Top-level xids and subxids share this one array.
+ *
+ * Should only be allocated for bgwriter, since only ever executed
+ * during checkpoints.
+ */
+ if (CurrentRunningXacts->xids == NULL)
+ {
+ /*
+ * First call
+ */
+ CurrentRunningXacts->xids = (TransactionId *)
+ malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+ if (CurrentRunningXacts->xids == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ xids = CurrentRunningXacts->xids;
+
+ count = subcount = 0;
+ suboverflowed = false;
+
+ /*
+ * Ensure that no xids enter or leave the procarray while we obtain
+ * snapshot.
+ */
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+ LWLockAcquire(XidGenLock, LW_SHARED);
+
+ latestCompletedXid = ShmemVariableCache->latestCompletedXid; /* NOTE(review): not read again in this function */
+
+ oldestRunningXid = ShmemVariableCache->nextXid;
+ /*
+ * Spin over procArray collecting all xids and subxids.
+ */
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ volatile PGPROC *proc = arrayP->procs[index];
+ TransactionId xid;
+ int nxids;
+
+ /* Fetch xid just once - see GetNewTransactionId */
+ xid = proc->xid;
+
+ /*
+ * We don't need to store transactions that don't have a TransactionId
+ * yet because they will not show as running on a standby server.
+ */
+ if (!TransactionIdIsValid(xid))
+ continue;
+
+ xids[count++] = xid;
+
+ if (TransactionIdPrecedes(xid, oldestRunningXid))
+ oldestRunningXid = xid;
+
+ /*
+ * Save subtransaction XIDs. Other backends can't add or remove entries
+ * while we're holding XidGenLock.
+ */
+ nxids = proc->subxids.nxids;
+ if (nxids > 0)
+ {
+ memcpy(&xids[count], (void *) proc->subxids.xids,
+ nxids * sizeof(TransactionId));
+ count += nxids;
+ subcount += nxids; /* NOTE(review): subcount is accumulated but never read in this function */
+
+ if (proc->subxids.overflowed)
+ suboverflowed = true;
+
+ /*
+ * Top-level XID of a transaction is always less than any of
+ * its subxids, so we don't need to check if any of the subxids
+ * are smaller than oldestRunningXid
+ */
+ }
+ }
+
+ CurrentRunningXacts->xcnt = count;
+ CurrentRunningXacts->subxid_overflow = suboverflowed;
+ CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
+ CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
+
+ LWLockRelease(XidGenLock);
+ LWLockRelease(ProcArrayLock);
+
+ return CurrentRunningXacts;
+}
+
+/*
* GetTransactionsInCommit -- Get the XIDs of transactions that are committing
*
* Constructs an array of XIDs of transactions that are currently in commit
@@ -1101,6 +1598,154 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
return vxids;
}
+/*
+ * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
+ *
+ * The array is malloc'd once and reused; it is terminated with an invalid VXID.
+ *
+ * Usage is limited to conflict resolution during recovery on standby servers.
+ * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
+ * in cases where we cannot accurately determine a value for latestRemovedXid.
+ * If limitXmin is InvalidTransactionId then we know that the very
+ * latest xid that might have caused a cleanup record will be
+ * latestCompletedXid, so we set limitXmin to be latestCompletedXid instead.
+ * We then skip any backends with xmin > limitXmin. This means that
+ * cleanup records don't conflict with some recent snapshots.
+ *
+ * We replace InvalidTransactionId with latestCompletedXid here because
+ * this is the most convenient place to do that, while we hold ProcArrayLock.
+ * The originator of the cleanup record wanted to avoid checking the value of
+ * latestCompletedXid since doing so would be a performance issue during
+ * normal running, so we check it essentially for free on the standby.
+ *
+ * If dbOid is valid we skip backends attached to other databases. With
+ * skipExistingConflicts, backends with recoveryConflictMode > 0 are skipped.
+ *
+ * Be careful to *not* pfree the result from this function. We reuse
+ * this array sufficiently often that we use malloc for the result.
+ */
+VirtualTransactionId *
+GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid,
+ bool skipExistingConflicts)
+{
+ static VirtualTransactionId *vxids;
+ ProcArrayStruct *arrayP = procArray;
+ int count = 0;
+ int index;
+
+ /*
+ * If first time through, get workspace to hold the result VXIDs in. We
+ * malloc it permanently to avoid repeated palloc/pfree overhead.
+ * Allow result space, remembering room for a terminator.
+ */
+ if (vxids == NULL)
+ {
+ vxids = (VirtualTransactionId *)
+ malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
+ if (vxids == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ /*
+ * If we don't know the TransactionId that created the conflict, set
+ * it to latestCompletedXid which is the latest possible value.
+ */
+ if (!TransactionIdIsValid(limitXmin))
+ limitXmin = ShmemVariableCache->latestCompletedXid;
+
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ volatile PGPROC *proc = arrayP->procs[index];
+
+ /* Exclude prepared transactions */
+ if (proc->pid == 0)
+ continue;
+
+ if (skipExistingConflicts && proc->recoveryConflictMode > 0)
+ continue;
+
+ if (!OidIsValid(dbOid) ||
+ proc->databaseId == dbOid)
+ {
+ /* Fetch xmin just once - can't change on us, but good coding */
+ TransactionId pxmin = proc->xmin;
+
+ /*
+ * We ignore an invalid pxmin because this means that backend
+ * has no snapshot and cannot get another one while we hold exclusive lock.
+ */
+ if (TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin))
+ {
+ VirtualTransactionId vxid;
+
+ GET_VXID_FROM_PGPROC(vxid, *proc);
+ if (VirtualTransactionIdIsValid(vxid))
+ vxids[count++] = vxid;
+ }
+ }
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ /* add the terminator */
+ vxids[count].backendId = InvalidBackendId;
+ vxids[count].localTransactionId = InvalidLocalTransactionId;
+
+ return vxids;
+}
+
+/*
+ * CancelVirtualTransaction - used in recovery conflict processing
+ *
+ * Returns pid of the process signaled, or 0 if no PGPROC matches vxid.
+ */
+pid_t
+CancelVirtualTransaction(VirtualTransactionId vxid, int cancel_mode)
+{
+ ProcArrayStruct *arrayP = procArray;
+ int index;
+ pid_t pid = 0;
+
+ LWLockAcquire(ProcArrayLock, LW_SHARED); /* NOTE(review): proc->recoveryConflictMode is written below while holding only a shared lock - confirm this is intended */
+
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ VirtualTransactionId procvxid;
+ PGPROC *proc = arrayP->procs[index];
+
+ GET_VXID_FROM_PGPROC(procvxid, *proc);
+
+ if (procvxid.backendId == vxid.backendId &&
+ procvxid.localTransactionId == vxid.localTransactionId)
+ {
+ /*
+ * Issue orders for the proc to read next time it receives SIGINT; the mode is only ever raised, never lowered
+ */
+ if (proc->recoveryConflictMode < cancel_mode)
+ proc->recoveryConflictMode = cancel_mode;
+
+ pid = proc->pid;
+ break;
+ }
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ if (pid != 0)
+ {
+ /*
+ * Send SIGINT to the pid if it's still here. If not, that's what we
+ * wanted so ignore any errors.
+ */
+ kill(pid, SIGINT);
+ }
+
+ return pid;
+}
/*
* CountActiveBackends --- count backends (other than myself) that are in
@@ -1400,3 +2045,457 @@ DisplayXidCache(void)
}
#endif /* XIDCACHE_DEBUG */
+
+/* ----------------------------------------------
+ * KnownAssignedTransactions sub-module
+ * ----------------------------------------------
+ */
+
+/*
+ * In Hot Standby mode, we maintain a list of transactions that are (or were)
+ * running in the master at the current point in WAL.
+ *
+ * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
+ * type apart from XLOG_XACT_RUNNING_XACTS, since that initialises the first
+ * snapshot so that RecordKnownAssignedTransactionIds() can be called. Uses
+ * local variables, so should only be called by Startup process.
+ *
+ * We record all xids that we know have been assigned. That includes
+ * all the xids on the WAL record, plus all unobserved xids that
+ * we can deduce have been assigned. We can deduce the existence of
+ * unobserved xids because we know xids are in sequence, with no gaps.
+ *
+ * During recovery we do not fret too much about the distinction between
+ * top-level xids and subtransaction xids. We hold both together in
+ * a hash table called KnownAssignedXids. In backends, this is copied into
+ * snapshots in GetSnapshotData(), taking advantage
+ * of the fact that XidInMVCCSnapshot() doesn't care about the distinction
+ * either. Subtransaction xids are effectively treated as top-level xids
+ * and in the typical case pg_subtrans is *not* maintained (and that
+ * does not affect visibility).
+ *
+ * KnownAssignedXids expands as new xids are observed or inferred, and
+ * contracts when transaction completion records arrive. We have room in a
+ * snapshot to hold maxProcs * (1 + PGPROC_MAX_CACHED_SUBXIDS) xids, so
+ * every transaction must report their subtransaction xids in a special
+ * WAL assignment record every PGPROC_MAX_CACHED_SUBXIDS. This allows us
+ * to remove the subtransaction xids and update pg_subtrans instead. Snapshots
+ * are still correct yet we don't overflow SnapshotData structure. When we do
+ * this we need
+ * to keep track of which xids caused the snapshot to overflow. We do that
+ * by simply tracking the lastOverflowedXid - if it is within the bounds of
+ * the KnownAssignedXids then we know the snapshot overflowed. (Note that
+ * subxid overflow occurs on primary when 65th subxid arrives, whereas on
+ * standby it occurs when 64th subxid arrives - that is not an error).
+ *
+ * Should FATAL errors result in a backend on primary disappearing before
+ * it can write an abort record then we just leave those xids in
+ * KnownAssignedXids. They actually aborted but we think they were running;
+ * the distinction is irrelevant because either way any changes done by the
+ * transaction are not visible to backends in the standby.
+ * We prune KnownAssignedXids when XLOG_XACT_RUNNING_XACTS arrives, to
+ * ensure we do not overflow.
+ *
+ * If we are in STANDBY_SNAPSHOT_PENDING state, then we may try to remove
+ * xids that are not present.
+ */
+void
+RecordKnownAssignedTransactionIds(TransactionId xid)
+{
+ /*
+ * Skip processing if the current snapshot is not initialized.
+ */
+ if (standbyState < STANDBY_SNAPSHOT_PENDING)
+ return;
+
+ /*
+ * We can see WAL records before the running-xacts snapshot that
+ * contain XIDs that are not in the running-xacts snapshot, but that we
+ * know to have finished before the running-xacts snapshot was taken.
+ * Don't waste precious shared memory by keeping them in the hash table.
+ *
+ * We can also see WAL records before the running-xacts snapshot that
+ * contain XIDs that are not in the running-xacts snapshot for a different
+ * reason: the transaction started *after* the running-xacts snapshot
+ * was taken, but before it was written to WAL. We must be careful to
+ * not ignore such XIDs. Because such a transaction started after the
+ * running-xacts snapshot was taken, it must have an XID larger than
+ * the oldest XID according to the running-xacts snapshot.
+ */
+ if (TransactionIdPrecedes(xid, snapshotOldestActiveXid))
+ return;
+
+ ereport(trace_recovery(DEBUG4),
+ (errmsg("record known xact %u latestObservedXid %u",
+ xid, latestObservedXid)));
+
+ /*
+ * When a newly observed xid arrives, it is frequently the case
+ * that it is *not* the next xid in sequence. When this occurs, we
+ * must treat the intervening xids as running also.
+ */
+ if (TransactionIdFollows(xid, latestObservedXid))
+ {
+ /* Start from the xid immediately after the last one we recorded */
+ TransactionId next_expected_xid = latestObservedXid;
+ TransactionIdAdvance(next_expected_xid);
+
+ /*
+ * Locking requirement is currently higher than for xid assignment
+ * in normal running. However, we only get called here for new
+ * high xids - so on a multi-processor where it is common that xids
+ * arrive out of order the average number of locks per assignment
+ * will actually reduce. So not too worried about this locking.
+ *
+ * XXX It does seem possible that we could add a whole range
+ * of numbers atomically to KnownAssignedXids, if we use a sorted
+ * list for KnownAssignedXids. But that design also increases the
+ * length of time we hold lock when we process commits/aborts, so
+ * on balance don't worry about this.
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ /*
+ * Add every xid from latestObservedXid + 1 up to and including xid,
+ * one at a time. The whole range is added under a single hold of
+ * ProcArrayLock.
+ */
+ while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
+ {
+ /* Only the inferred xids get this message; xid itself was logged above */
+ if (TransactionIdPrecedes(next_expected_xid, xid))
+ ereport(trace_recovery(DEBUG4),
+ (errmsg("recording unobserved xid %u (latestObservedXid %u)",
+ next_expected_xid, latestObservedXid)));
+ KnownAssignedXidsAdd(&next_expected_xid, 1);
+
+ /*
+ * Extend clog and subtrans like we do in GetNewTransactionId()
+ * during normal operation
+ */
+ ExtendCLOG(next_expected_xid);
+ ExtendSUBTRANS(next_expected_xid);
+
+ TransactionIdAdvance(next_expected_xid);
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ /*
+ * Remember the new high-water mark. This is updated only after the
+ * lock has been released, so the debug messages in the loop above
+ * still report the previous value.
+ */
+ latestObservedXid = xid;
+ }
+
+ /*
+ * nextXid must be beyond any observed xid: if latestObservedXid has
+ * reached or passed it, set nextXid to latestObservedXid + 1.
+ */
+ if (TransactionIdFollowsOrEquals(latestObservedXid,
+ ShmemVariableCache->nextXid))
+ {
+ ShmemVariableCache->nextXid = latestObservedXid;
+ TransactionIdAdvance(ShmemVariableCache->nextXid);
+ }
+}
+
+/*
+ * ExpireTreeKnownAssignedTransactionIds
+ *      Remove xid (when valid) and each of the nsubxids entries of subxids
+ *      from KnownAssignedXids, then advance latestCompletedXid if the
+ *      latest xid of that set is at or beyond it.
+ *
+ * Does nothing when standbyState is STANDBY_DISABLED.
+ */
+void
+ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
+                                      TransactionId *subxids)
+{
+    TransactionId newestXid;
+    int         n;
+
+    if (standbyState == STANDBY_DISABLED)
+        return;
+
+    /* Work out the latest xid of the set before taking the lock */
+    newestXid = TransactionIdLatest(xid, nsubxids, subxids);
+
+    /*
+     * Uses same locking as transaction commit
+     */
+    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+    if (TransactionIdIsValid(xid))
+        KnownAssignedXidsRemove(xid);
+
+    n = 0;
+    while (n < nsubxids)
+    {
+        KnownAssignedXidsRemove(subxids[n]);
+        n++;
+    }
+
+    /* Like in ProcArrayRemove, advance latestCompletedXid */
+    if (TransactionIdFollowsOrEquals(newestXid,
+                                     ShmemVariableCache->latestCompletedXid))
+        ShmemVariableCache->latestCompletedXid = newestXid;
+
+    LWLockRelease(ProcArrayLock);
+}
+
+/*
+ * ExpireAllKnownAssignedTransactionIds
+ *      Empty KnownAssignedXids completely, taking ProcArrayLock exclusively
+ *      for the duration.
+ */
+void
+ExpireAllKnownAssignedTransactionIds(void)
+{
+    /* An invalid limit tells the pruning routine to clear the whole table */
+    TransactionId noLimit = InvalidTransactionId;
+    bool        keepPrepared = false;
+
+    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+    KnownAssignedXidsRemoveMany(noLimit, keepPrepared);
+    LWLockRelease(ProcArrayLock);
+}
+
+/*
+ * ExpireOldKnownAssignedTransactionIds
+ *      Prune KnownAssignedXids up to, but not including, xid, asking the
+ *      pruning routine to keep prepared transactions.  ProcArrayLock is
+ *      held exclusively for the duration.
+ */
+void
+ExpireOldKnownAssignedTransactionIds(TransactionId xid)
+{
+    bool        keepPrepared = true;
+
+    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+    KnownAssignedXidsRemoveMany(xid, keepPrepared);
+    LWLockRelease(ProcArrayLock);
+}
+
+/*
+ * Private module functions to manipulate KnownAssignedXids
+ *
+ * There are 3 main users of the KnownAssignedXids data structure:
+ *
+ * * backends taking snapshots
+ * * startup process adding new knownassigned xids
+ * * startup process removing xids as transactions end
+ *
+ * If we make KnownAssignedXids a simple sorted array then the first two
+ * operations are fast, but the last one is at least O(N). If we make
+ * KnownAssignedXids a hash table then the last two operations are fast,
+ * though we have to do more work at snapshot time. Doing more work at
+ * commit could slow down taking snapshots anyway because of lwlock
+ * contention. Scanning the hash table is O(N) on the max size of the array,
+ * so performs poorly in comparison when we have very low numbers of
+ * write transactions to process. But at least it is constant overhead
+ * and a sequential memory scan will utilise hardware memory readahead
+ * to give much improved performance. In any case the emphasis must be on
+ * having the standby process changes quickly so that it can provide
+ * high availability. So we choose to implement as a hash table.
+ */
+
+/*
+ * Estimate the shared memory needed for a KnownAssignedXids hash table
+ * holding "size" entries, each entry being a bare TransactionId.
+ */
+static Size
+KnownAssignedXidsShmemSize(int size)
+{
+    Size        entrysize = sizeof(TransactionId);
+
+    return hash_estimate_size(size, entrysize);
+}
+
+/*
+ * Create the shared KnownAssignedXids hash table, sized for "size" entries
+ * (used as both initial and maximum size), and reset the bookkeeping fields
+ * kept in procArray.  Raises FATAL if the table cannot be set up.
+ *
+ * NOTE(review): maxKnownAssignedXids is set from TOTAL_MAX_CACHED_SUBXIDS
+ * rather than from the size argument - presumably callers pass that same
+ * value; confirm against the caller.
+ */
+static void
+KnownAssignedXidsInit(int size)
+{
+    HASHCTL     hashctl;
+
+    /* assume no locking is needed yet */
+
+    /* Key and entry are the same thing: the xid itself */
+    hashctl.hash = tag_hash;
+    hashctl.keysize = sizeof(TransactionId);
+    hashctl.entrysize = sizeof(TransactionId);
+
+    KnownAssignedXidsHash = ShmemInitHash("KnownAssignedXids Hash",
+                                          size, size,
+                                          &hashctl,
+                                          HASH_ELEM | HASH_FUNCTION);
+
+    if (KnownAssignedXidsHash == NULL)
+        elog(FATAL, "could not initialize known assigned xids hash table");
+
+    procArray->lastOverflowedXid = InvalidTransactionId;
+    procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
+    procArray->numKnownAssignedXids = 0;
+}
+
+/*
+ * Add xids into KnownAssignedXids.
+ *
+ * xids points to nxids transaction ids, each of which must be valid and
+ * not already present in the table.
+ *
+ * procArray->numKnownAssignedXids is advanced only after an xid has really
+ * been inserted, so the counter always matches the table contents, even on
+ * the error paths below.
+ *
+ * Raises ERROR (after releasing ProcArrayLock) if the table is already at
+ * maxKnownAssignedXids, if the hash table cannot take another entry, or if
+ * an xid is already present.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode
+ */
+static void
+KnownAssignedXidsAdd(TransactionId *xids, int nxids)
+{
+    int         i;
+
+    for (i = 0; i < nxids; i++)
+    {
+        TransactionId *result;
+        bool        found;
+
+        Assert(TransactionIdIsValid(xids[i]));
+
+        elog(trace_recovery(DEBUG4), "adding KnownAssignedXid %u", xids[i]);
+
+        /* Refuse before touching the counter if there is no room left */
+        if (procArray->numKnownAssignedXids >= procArray->maxKnownAssignedXids)
+        {
+            KnownAssignedXidsDisplay(LOG);
+            LWLockRelease(ProcArrayLock);
+            ereport(ERROR,
+                    (errcode(ERRCODE_OUT_OF_MEMORY),
+                     errmsg("too many KnownAssignedXids")));
+        }
+
+        result = (TransactionId *) hash_search(KnownAssignedXidsHash,
+                                               &xids[i], HASH_ENTER,
+                                               &found);
+
+        if (!result)
+        {
+            LWLockRelease(ProcArrayLock);
+            ereport(ERROR,
+                    (errcode(ERRCODE_OUT_OF_MEMORY),
+                     errmsg("out of shared memory")));
+        }
+
+        if (found)
+        {
+            KnownAssignedXidsDisplay(LOG);
+            LWLockRelease(ProcArrayLock);
+            elog(ERROR, "found duplicate KnownAssignedXid %u", xids[i]);
+        }
+
+        /* A genuinely new entry went in: now it is safe to count it */
+        procArray->numKnownAssignedXids++;
+    }
+}
+
+/*
+ * Is an xid present in KnownAssignedXids?
+ *
+ * Returns true when a lookup of xid in the hash table finds an entry.
+ *
+ * Must be called while holding ProcArrayLock in shared mode
+ */
+static bool
+KnownAssignedXidsExist(TransactionId xid)
+{
+    /* HASH_FIND hands back the entry, or NULL when there is no match */
+    return hash_search(KnownAssignedXidsHash, &xid, HASH_FIND, NULL) != NULL;
+}
+
+/*
+ * Remove one xid from anywhere in KnownAssignedXids.
+ *
+ * A missing xid is tolerated silently.  We can fail to find an xid if it
+ * came from a subtransaction that aborts, though the xid hadn't yet been
+ * reported and no WAL records have been written using the subxid.  In that
+ * case the abort record will contain that subxid and we haven't seen it
+ * before.
+ *
+ * If we fail to find it for other reasons it might be a problem, but it
+ * isn't much use to log that it happened, since we can't divine much from
+ * just an isolated xid value.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode
+ */
+static void
+KnownAssignedXidsRemove(TransactionId xid)
+{
+    bool        wasPresent;
+
+    Assert(TransactionIdIsValid(xid));
+
+    elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
+
+    (void) hash_search(KnownAssignedXidsHash, &xid, HASH_REMOVE, &wasPresent);
+
+    /* Only an entry that was really there reduces the count */
+    if (wasPresent)
+        procArray->numKnownAssignedXids -= 1;
+
+    Assert(procArray->numKnownAssignedXids >= 0);
+}
+
+/*
+ * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
+ * We filter out anything higher than xmax.
+ *
+ * The xids are written to xarray and their number is returned.  This is
+ * KnownAssignedXidsGetAndSetXmin with the xmin output thrown away.
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+static int
+KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
+{
+    /* Scratch xmin, starts out invalid; the caller has no use for it */
+    TransactionId discardedXmin = InvalidTransactionId;
+    int         nxids;
+
+    nxids = KnownAssignedXidsGetAndSetXmin(xarray, &discardedXmin, xmax);
+
+    return nxids;
+}
+
+/*
+ * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus we reduce *xmin
+ * to the lowest xid value seen if not already lower.
+ *
+ * Every xid in KnownAssignedXids that xmax does not precede is copied into
+ * xarray; the number copied is the return value.  *xmin is replaced by a
+ * copied xid whenever that xid precedes the current *xmin.
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+static int
+KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
+                               TransactionId xmax)
+{
+    HASH_SEQ_STATUS scan;
+    TransactionId *entry;
+    int         nfound = 0;
+
+    hash_seq_init(&scan, KnownAssignedXidsHash);
+    while ((entry = (TransactionId *) hash_seq_search(&scan)) != NULL)
+    {
+        TransactionId cur = *entry;
+
+        /* Keep only xids that are not higher than xmax */
+        if (!TransactionIdPrecedes(xmax, cur))
+        {
+            xarray[nfound] = cur;
+            nfound++;
+
+            /* update xmin if required */
+            if (TransactionIdPrecedes(cur, *xmin))
+                *xmin = cur;
+        }
+    }
+
+    return nfound;
+}
+
+/*
+ * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
+ * then clear the whole table.
+ *
+ * When keepPreparedXacts is true, an entry that would otherwise be pruned
+ * is kept if StandbyTransactionIdIsPrepared() reports it as prepared.
+ * That test is applied to the entry under examination, not to the prune
+ * limit xid.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode.
+ */
+static void
+KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts)
+{
+    TransactionId *knownXid;
+    HASH_SEQ_STATUS status;
+
+    if (TransactionIdIsValid(xid))
+        elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", xid);
+    else
+        elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
+
+    hash_seq_init(&status, KnownAssignedXidsHash);
+    while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
+    {
+        /* Copy the key out: the entry itself is about to be removed */
+        TransactionId removeXid = *knownXid;
+        bool        found;
+
+        /* Entries at or beyond a valid limit are left alone */
+        if (TransactionIdIsValid(xid) &&
+            !TransactionIdPrecedes(removeXid, xid))
+            continue;
+
+        /* Prepared transactions survive pruning when asked for */
+        if (keepPreparedXacts && StandbyTransactionIdIsPrepared(removeXid))
+            continue;
+
+        (void) hash_search(KnownAssignedXidsHash, &removeXid,
+                           HASH_REMOVE, &found);
+        if (found)
+            procArray->numKnownAssignedXids--;
+        Assert(procArray->numKnownAssignedXids >= 0);
+    }
+}
+
+/*
+ * Display KnownAssignedXids to provide debug trail
+ *
+ * Collects every xid in the hash table, sorts them with xidComparator and
+ * emits them at trace_level as a single message, preceded by their count.
+ * Both the temporary xid array and the message buffer are freed before
+ * returning.
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+void
+KnownAssignedXidsDisplay(int trace_level)
+{
+    HASH_SEQ_STATUS status;
+    TransactionId *knownXid;
+    StringInfoData buf;
+    TransactionId *xids;
+    int         nxids;
+    int         i;
+
+    xids = palloc(sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS);
+    nxids = 0;
+
+    /* A hash scan returns entries in no useful order, so gather then sort */
+    hash_seq_init(&status, KnownAssignedXidsHash);
+    while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
+        xids[nxids++] = *knownXid;
+
+    qsort(xids, nxids, sizeof(TransactionId), xidComparator);
+
+    initStringInfo(&buf);
+
+    for (i = 0; i < nxids; i++)
+        appendStringInfo(&buf, "%u ", xids[i]);
+
+    elog(trace_level, "%d KnownAssignedXids %s", nxids, buf.data);
+
+    pfree(buf.data);
+    pfree(xids);
+}