Diffstat (limited to 'src/backend/storage/ipc')
-rw-r--r--  src/backend/storage/ipc/Makefile     |    4
-rw-r--r--  src/backend/storage/ipc/procarray.c  | 1127
-rw-r--r--  src/backend/storage/ipc/sinvaladt.c  |   18
-rw-r--r--  src/backend/storage/ipc/standby.c    |  717
4 files changed, 1846 insertions, 20 deletions
diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile
index 20ac1e75e45..1d897c5afba 100644
--- a/src/backend/storage/ipc/Makefile
+++ b/src/backend/storage/ipc/Makefile
@@ -1,7 +1,7 @@
#
# Makefile for storage/ipc
#
-# $PostgreSQL: pgsql/src/backend/storage/ipc/Makefile,v 1.22 2009/07/31 20:26:23 tgl Exp $
+# $PostgreSQL: pgsql/src/backend/storage/ipc/Makefile,v 1.23 2009/12/19 01:32:35 sriggs Exp $
#
subdir = src/backend/storage/ipc
@@ -16,6 +16,6 @@ endif
endif
OBJS = ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o shmqueue.o \
- sinval.o sinvaladt.o
+ sinval.o sinvaladt.o standby.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 9a3d2f62606..c4ddf8f2bd8 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -17,13 +17,27 @@
* as are the myProcLocks lists. They can be distinguished from regular
* backend PGPROCs at need by checking for pid == 0.
*
+ * During recovery, we also keep a list of XIDs representing transactions
+ * that are known to be running at the current point in WAL recovery. This
+ * list is kept in the KnownAssignedXids array, and is updated by watching
+ * the sequence of arriving xids. This is very important because if we leave
+ * those xids out of the snapshot then they will appear to be already complete.
+ * Later, when they have actually completed, this could lead to confusion as to
+ * whether those xids are visible or not, blowing a huge hole in MVCC.
+ * We need 'em.
+ *
+ * It is theoretically possible for a backend to exit with a FATAL error
+ * before writing an abort record. This could tie up KnownAssignedXids
+ * indefinitely, so we prune the array whenever a valid list of running xids
+ * arrives. These quirks, if they ever occur in reality, will not affect the
+ * correctness of snapshots.
*
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.51 2009/07/29 15:57:11 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.52 2009/12/19 01:32:35 sriggs Exp $
*
*-------------------------------------------------------------------------
*/
@@ -31,14 +45,18 @@
#include <signal.h>
+#include "access/clog.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/twophase.h"
#include "miscadmin.h"
#include "storage/procarray.h"
+#include "storage/standby.h"
+#include "utils/builtins.h"
#include "utils/snapmgr.h"
+static RunningTransactionsData CurrentRunningXactsData;
/* Our shared memory area */
typedef struct ProcArrayStruct
@@ -46,6 +64,14 @@ typedef struct ProcArrayStruct
int numProcs; /* number of valid procs entries */
int maxProcs; /* allocated size of procs array */
+ int numKnownAssignedXids; /* current number of known assigned xids */
+ int maxKnownAssignedXids; /* allocated size of known assigned xids */
+ /*
+ * Highest subxid that overflowed KnownAssignedXids array. Similar to
+ * overflowing cached subxids in PGPROC entries.
+ */
+ TransactionId lastOverflowedXid;
+
/*
* We declare procs[] as 1 entry because C wants a fixed-size array, but
* actually it is maxProcs entries long.
@@ -55,6 +81,24 @@ typedef struct ProcArrayStruct
static ProcArrayStruct *procArray;
+/*
+ * Bookkeeping for tracking emulated transactions in recovery
+ */
+static HTAB *KnownAssignedXidsHash;
+static TransactionId latestObservedXid = InvalidTransactionId;
+
+/*
+ * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
+ * the highest xid that might still be running that we don't have in
+ * KnownAssignedXids.
+ */
+static TransactionId standbySnapshotPendingXmin;
+
+/*
+ * Oldest transaction still running according to the running-xacts snapshot
+ * we initialized standby mode from.
+ */
+static TransactionId snapshotOldestActiveXid;
#ifdef XIDCACHE_DEBUG
@@ -90,6 +134,17 @@ static void DisplayXidCache(void);
#define xc_slow_answer_inc() ((void) 0)
#endif /* XIDCACHE_DEBUG */
+/* Primitives for KnownAssignedXids array handling for standby */
+static Size KnownAssignedXidsShmemSize(int size);
+static void KnownAssignedXidsInit(int size);
+static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
+static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
+ TransactionId xmax);
+static bool KnownAssignedXidsExist(TransactionId xid);
+static void KnownAssignedXidsAdd(TransactionId *xids, int nxids);
+static void KnownAssignedXidsRemove(TransactionId xid);
+static void KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts);
+static void KnownAssignedXidsDisplay(int trace_level);
/*
* Report shared-memory space needed by CreateSharedProcArray.
@@ -100,8 +155,22 @@ ProcArrayShmemSize(void)
Size size;
size = offsetof(ProcArrayStruct, procs);
- size = add_size(size, mul_size(sizeof(PGPROC *),
- add_size(MaxBackends, max_prepared_xacts)));
+
+ /* Normal processing - MyProc slots */
+#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
+ size = add_size(size, mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS));
+
+ /*
+ * During recovery processing we have a data structure called KnownAssignedXids,
+ * created in shared memory. Local data structures are also created in various
+ * backends during GetSnapshotData(), TransactionIdIsInProgress() and
+ * GetRunningTransactionData(). All of the main structures created in those
+ * functions must be identically sized, since we may at times copy the whole
+ * of the data structures around. We refer to this as TOTAL_MAX_CACHED_SUBXIDS.
+ */
+#define TOTAL_MAX_CACHED_SUBXIDS ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
+ if (XLogRequestRecoveryConnections)
+ size = add_size(size, KnownAssignedXidsShmemSize(TOTAL_MAX_CACHED_SUBXIDS));
return size;
}
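
The macro arithmetic above is easier to see with concrete numbers. The following standalone sketch (not part of the patch) uses made-up stand-ins for MaxBackends and max_prepared_xacts, and assumes PGPROC_MAX_CACHED_SUBXIDS is 64 as in this version of the code:

    #include <stdio.h>

    /* Hypothetical stand-ins for the server settings used by the macros above. */
    #define EXAMPLE_MAX_BACKENDS          100   /* MaxBackends */
    #define EXAMPLE_MAX_PREPARED_XACTS     10   /* max_prepared_xacts */
    #define EXAMPLE_CACHED_SUBXIDS         64   /* PGPROC_MAX_CACHED_SUBXIDS (assumed) */

    int
    main(void)
    {
        int procs   = EXAMPLE_MAX_BACKENDS + EXAMPLE_MAX_PREPARED_XACTS;
        int subxids = (EXAMPLE_CACHED_SUBXIDS + 1) * procs;

        /* 110 proc slots, and 65 * 110 = 7150 xid slots for KnownAssignedXids */
        printf("PROCARRAY_MAXPROCS = %d, TOTAL_MAX_CACHED_SUBXIDS = %d\n",
               procs, subxids);
        return 0;
    }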
@@ -116,15 +185,21 @@ CreateSharedProcArray(void)
/* Create or attach to the ProcArray shared structure */
procArray = (ProcArrayStruct *)
- ShmemInitStruct("Proc Array", ProcArrayShmemSize(), &found);
+ ShmemInitStruct("Proc Array",
+ mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS),
+ &found);
if (!found)
{
/*
* We're the first - initialize.
*/
+ /* Normal processing */
procArray->numProcs = 0;
- procArray->maxProcs = MaxBackends + max_prepared_xacts;
+ procArray->maxProcs = PROCARRAY_MAXPROCS;
+
+ if (XLogRequestRecoveryConnections)
+ KnownAssignedXidsInit(TOTAL_MAX_CACHED_SUBXIDS);
}
}
@@ -302,6 +377,7 @@ ProcArrayClearTransaction(PGPROC *proc)
proc->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId;
+ proc->recoveryConflictMode = 0;
/* redundant, but just in case */
proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
@@ -312,6 +388,220 @@ ProcArrayClearTransaction(PGPROC *proc)
proc->subxids.overflowed = false;
}
+void
+ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
+{
+ snapshotOldestActiveXid = oldestActiveXid;
+}
+
+/*
+ * ProcArrayApplyRecoveryInfo -- apply recovery info about xids
+ *
+ * Takes us through 3 states: Uninitialized, Pending and Ready.
+ * Normal case is to go all the way to Ready straight away, though there
+ * are atypical cases where we need to take it in steps.
+ *
+ * Use the data about running transactions on the master to create the initial
+ * state of KnownAssignedXids. We also use these records to regularly prune
+ * KnownAssignedXids, because we know it is possible that some transactions
+ * with FATAL errors fail to write abort records, which could cause eventual
+ * overflow.
+ *
+ * Only used during recovery. Notice the signature is very similar to a
+ * _redo function and it's difficult to decide exactly where this code should
+ * reside.
+ */
+void
+ProcArrayApplyRecoveryInfo(RunningTransactions running)
+{
+ int xid_index; /* main loop */
+ TransactionId *xids;
+ int nxids;
+
+ Assert(standbyState >= STANDBY_INITIALIZED);
+
+ /*
+ * Remove stale transactions, if any.
+ */
+ ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
+ StandbyReleaseOldLocks(running->oldestRunningXid);
+
+ /*
+ * If our snapshot is already valid, nothing else to do...
+ */
+ if (standbyState == STANDBY_SNAPSHOT_READY)
+ return;
+
+ /*
+ * If our initial RunningXactData had an overflowed snapshot then we
+ * knew we were missing some subxids from our snapshot. We can use
+ * this data as an initial snapshot, but we cannot yet mark it valid.
+ * We know that the missing subxids are equal to or earlier than
+ * nextXid. After we initialise we continue to apply changes during
+ * recovery, so once the oldestRunningXid is later than the nextXid
+ * from the initial snapshot we know that we no longer have missing
+ * information and can mark the snapshot as valid.
+ */
+ if (standbyState == STANDBY_SNAPSHOT_PENDING)
+ {
+ if (TransactionIdPrecedes(standbySnapshotPendingXmin,
+ running->oldestRunningXid))
+ {
+ standbyState = STANDBY_SNAPSHOT_READY;
+ elog(trace_recovery(DEBUG2),
+ "running xact data now proven complete");
+ elog(trace_recovery(DEBUG2),
+ "recovery snapshots are now enabled");
+ }
+ return;
+ }
+
+ /*
+ * OK, we need to initialise from the RunningXactData record
+ */
+ latestObservedXid = running->nextXid;
+ TransactionIdRetreat(latestObservedXid);
+
+ /*
+ * If the snapshot overflowed, then we still initialise with what we
+ * know, but the recovery snapshot isn't fully valid yet because we
+ * know there are some subxids missing (ergo we don't know which ones)
+ */
+ if (!running->subxid_overflow)
+ {
+ standbyState = STANDBY_SNAPSHOT_READY;
+ standbySnapshotPendingXmin = InvalidTransactionId;
+ }
+ else
+ {
+ standbyState = STANDBY_SNAPSHOT_PENDING;
+ standbySnapshotPendingXmin = latestObservedXid;
+ ereport(LOG,
+ (errmsg("consistent state delayed because recovery snapshot incomplete")));
+ }
+
+ nxids = running->xcnt;
+ xids = running->xids;
+
+ KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
+
+ /*
+ * Scan through the incoming array of RunningXacts and collect xids.
+ * We don't use SubtransSetParent because it doesn't matter yet. If
+ * we aren't overflowed then all xids will fit in snapshot and so we
+ * don't need subtrans. If we later overflow, an xid assignment record
+ * will add xids to subtrans. If RunningXacts is overflowed then we
+ * don't have enough information to correctly update subtrans anyway.
+ */
+
+ /*
+ * Nobody else is running yet, but take locks anyhow
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ /* Reset latestCompletedXid */
+ ShmemVariableCache->latestCompletedXid = running->nextXid;
+ TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
+
+ /*
+ * Add our new xids into the array
+ */
+ for (xid_index = 0; xid_index < running->xcnt; xid_index++)
+ {
+ TransactionId xid = running->xids[xid_index];
+
+ /*
+ * The running-xacts snapshot can contain xids that did finish between
+ * when the snapshot was taken and when it was written to WAL. Such
+ * transactions are not running anymore, so ignore them.
+ */
+ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ continue;
+
+ KnownAssignedXidsAdd(&xid, 1);
+ }
+
+ KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
+
+ /*
+ * Update lastOverflowedXid if the snapshot overflowed. We don't know
+ * the exact value for this, so conservatively assume that it's nextXid-1.
+ */
+ if (running->subxid_overflow &&
+ TransactionIdFollows(latestObservedXid, procArray->lastOverflowedXid))
+ procArray->lastOverflowedXid = latestObservedXid;
+ else if (TransactionIdFollows(running->oldestRunningXid,
+ procArray->lastOverflowedXid))
+ procArray->lastOverflowedXid = InvalidTransactionId;
+
+ LWLockRelease(ProcArrayLock);
+
+ /* nextXid must be beyond any observed xid */
+ if (TransactionIdFollows(running->nextXid, ShmemVariableCache->nextXid))
+ ShmemVariableCache->nextXid = running->nextXid;
+
+ elog(trace_recovery(DEBUG2),
+ "running transaction data initialized");
+ if (standbyState == STANDBY_SNAPSHOT_READY)
+ elog(trace_recovery(DEBUG2),
+ "recovery snapshots are now enabled");
+}
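
As a reading aid for the state handling just added, here is a self-contained toy model of the Uninitialized/Pending/Ready progression that ProcArrayApplyRecoveryInfo() drives. Names mirror the patch, but everything below is an illustrative sketch rather than code from it, and xid comparisons are plain integer comparisons that ignore wraparound:

    #include <stdbool.h>
    #include <stdio.h>

    typedef unsigned int TransactionId;
    typedef enum { STANDBY_INITIALIZED, STANDBY_SNAPSHOT_PENDING,
                   STANDBY_SNAPSHOT_READY } StandbyState;

    static StandbyState  state = STANDBY_INITIALIZED;
    static TransactionId pendingXmin = 0;

    static void
    apply_running_xacts(TransactionId oldestRunningXid, TransactionId latestXid,
                        bool subxid_overflow)
    {
        if (state == STANDBY_SNAPSHOT_READY)
            return;                         /* snapshot already valid */

        if (state == STANDBY_SNAPSHOT_PENDING)
        {
            /* the missing subxids are all <= pendingXmin; once everything that
             * old has completed, the snapshot becomes trustworthy */
            if (pendingXmin < oldestRunningXid)
                state = STANDBY_SNAPSHOT_READY;
            return;
        }

        /* STANDBY_INITIALIZED: the first usable record initialises the snapshot */
        if (subxid_overflow)
        {
            state = STANDBY_SNAPSHOT_PENDING;
            pendingXmin = latestXid;
        }
        else
            state = STANDBY_SNAPSHOT_READY;
    }

    int
    main(void)
    {
        apply_running_xacts(90, 120, true);     /* overflowed -> PENDING */
        apply_running_xacts(121, 150, true);    /* oldest running > 120 -> READY */
        printf("final state: %d\n", (int) state);
        return 0;
    }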
+
+void
+ProcArrayApplyXidAssignment(TransactionId topxid,
+ int nsubxids, TransactionId *subxids)
+{
+ TransactionId max_xid;
+ int i;
+
+ if (standbyState < STANDBY_SNAPSHOT_PENDING)
+ return;
+
+ max_xid = TransactionIdLatest(topxid, nsubxids, subxids);
+
+ /*
+ * Mark all the subtransactions as observed.
+ *
+ * NOTE: This will fail if the subxids array contains too many previously
+ * unobserved xids to fit into known-assigned-xids. That shouldn't happen
+ * as the code stands, because xid-assignment records should never contain
+ * more than PGPROC_MAX_CACHED_SUBXIDS entries.
+ */
+ RecordKnownAssignedTransactionIds(max_xid);
+
+ /*
+ * Notice that we update pg_subtrans with the top-level xid, rather
+ * than the parent xid. This is a difference between normal
+ * processing and recovery, yet is still correct in all cases. The
+ * reason is that subtransaction commit is not marked in clog until
+ * commit processing, so all aborted subtransactions have already been
+ * clearly marked in clog. As a result we are able to refer directly
+ * to the top-level transaction's state rather than skipping through
+ * all the intermediate states in the subtransaction tree. This
+ * should be the first time we have attempted to SubTransSetParent().
+ */
+ for (i = 0; i < nsubxids; i++)
+ SubTransSetParent(subxids[i], topxid, false);
+
+ /*
+ * Uses same locking as transaction commit
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ /*
+ * Remove from known-assigned-xacts.
+ */
+ for (i = 0; i < nsubxids; i++)
+ KnownAssignedXidsRemove(subxids[i]);
+
+ /*
+ * Advance lastOverflowedXid when required.
+ */
+ if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
+ procArray->lastOverflowedXid = max_xid;
+
+ LWLockRelease(ProcArrayLock);
+}
/*
* TransactionIdIsInProgress -- is given transaction running in some backend
@@ -384,8 +674,15 @@ TransactionIdIsInProgress(TransactionId xid)
*/
if (xids == NULL)
{
- xids = (TransactionId *)
- malloc(arrayP->maxProcs * sizeof(TransactionId));
+ /*
+ * In hot standby mode, reserve enough space to hold all xids in
+ * the known-assigned list. If we later finish recovery, we no longer
+ * need the bigger array, but we don't bother to shrink it.
+ */
+ int maxxids = RecoveryInProgress() ?
+ TOTAL_MAX_CACHED_SUBXIDS : arrayP->maxProcs;
+
+ xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
if (xids == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -465,11 +762,35 @@ TransactionIdIsInProgress(TransactionId xid)
xids[nxids++] = pxid;
}
+ /* In hot standby mode, check the known-assigned-xids list. */
+ if (RecoveryInProgress())
+ {
+ /* none of the PGPROC entries should have XIDs in hot standby mode */
+ Assert(nxids == 0);
+
+ if (KnownAssignedXidsExist(xid))
+ {
+ LWLockRelease(ProcArrayLock);
+ /* XXX: should we have a separate counter for this? */
+ /* xc_by_main_xid_inc(); */
+ return true;
+ }
+
+ /*
+ * If the KnownAssignedXids overflowed, we have to check
+ * pg_subtrans too. Copy all xids from KnownAssignedXids that are
+ * lower than xid, since if xid is a subtransaction its parent will
+ * always have a lower value.
+ */
+ if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
+ nxids = KnownAssignedXidsGet(xids, xid);
+ }
+
LWLockRelease(ProcArrayLock);
/*
* If none of the relevant caches overflowed, we know the Xid is not
- * running without looking at pg_subtrans.
+ * running without even looking at pg_subtrans.
*/
if (nxids == 0)
{
@@ -590,6 +911,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
TransactionId result;
int index;
+ /* Cannot look for individual databases during recovery */
+ Assert(allDbs || !RecoveryInProgress());
+
LWLockAcquire(ProcArrayLock, LW_SHARED);
/*
@@ -635,6 +959,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
LWLockRelease(ProcArrayLock);
+ /*
+ * Compute the cutoff XID, being careful not to generate a "permanent" XID
+ */
+ result -= vacuum_defer_cleanup_age;
+ if (!TransactionIdIsNormal(result))
+ result = FirstNormalTransactionId;
+
return result;
}
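
The clamp above is easiest to check with concrete values. A standalone sketch with made-up numbers; it deliberately ignores full wraparound handling and only shows why the result must be kept out of the permanent-XID range (xids 0-2):

    #include <stdio.h>

    typedef unsigned int TransactionId;

    #define FirstNormalTransactionId   ((TransactionId) 3)
    #define TransactionIdIsNormal(x)   ((x) >= FirstNormalTransactionId)

    int
    main(void)
    {
        TransactionId result = 1002;            /* made-up oldest xmin */
        int  vacuum_defer_cleanup_age = 1000;   /* made-up GUC setting */

        result -= vacuum_defer_cleanup_age;     /* 2 = FrozenTransactionId */
        if (!TransactionIdIsNormal(result))
            result = FirstNormalTransactionId;  /* never return a permanent XID */

        printf("cutoff xid = %u\n", result);    /* prints 3 */
        return 0;
    }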
@@ -656,7 +987,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
* but since PGPROC has only a limited cache area for subxact XIDs, full
* information may not be available. If we find any overflowed subxid arrays,
* we have to mark the snapshot's subxid data as overflowed, and extra work
- * will need to be done to determine what's running (see XidInMVCCSnapshot()
+ * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
* in tqual.c).
*
* We also update the following backend-global variables:
@@ -681,6 +1012,7 @@ GetSnapshotData(Snapshot snapshot)
int index;
int count = 0;
int subcount = 0;
+ bool suboverflowed = false;
Assert(snapshot != NULL);
@@ -698,7 +1030,8 @@ GetSnapshotData(Snapshot snapshot)
if (snapshot->xip == NULL)
{
/*
- * First call for this snapshot
+ * First call for this snapshot. Snapshot is same size whether
+ * or not we are in recovery, see later comments.
*/
snapshot->xip = (TransactionId *)
malloc(arrayP->maxProcs * sizeof(TransactionId));
@@ -708,13 +1041,15 @@ GetSnapshotData(Snapshot snapshot)
errmsg("out of memory")));
Assert(snapshot->subxip == NULL);
snapshot->subxip = (TransactionId *)
- malloc(arrayP->maxProcs * PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+ malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
if (snapshot->subxip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
+ snapshot->takenDuringRecovery = RecoveryInProgress();
+
/*
* It is sufficient to get shared lock on ProcArrayLock, even if we are
* going to set MyProc->xmin.
@@ -763,6 +1098,7 @@ GetSnapshotData(Snapshot snapshot)
*/
if (TransactionIdIsNormal(xid))
{
+ Assert(!snapshot->takenDuringRecovery);
if (TransactionIdFollowsOrEquals(xid, xmax))
continue;
if (proc != MyProc)
@@ -785,16 +1121,17 @@ GetSnapshotData(Snapshot snapshot)
*
* Again, our own XIDs are not included in the snapshot.
*/
- if (subcount >= 0 && proc != MyProc)
+ if (!suboverflowed && proc != MyProc)
{
if (proc->subxids.overflowed)
- subcount = -1; /* overflowed */
+ suboverflowed = true;
else
{
int nxids = proc->subxids.nxids;
if (nxids > 0)
{
+ Assert(!snapshot->takenDuringRecovery);
memcpy(snapshot->subxip + subcount,
(void *) proc->subxids.xids,
nxids * sizeof(TransactionId));
@@ -804,6 +1141,40 @@ GetSnapshotData(Snapshot snapshot)
}
}
+ /*
+ * If in recovery get any known assigned xids.
+ */
+ if (snapshot->takenDuringRecovery)
+ {
+ Assert(count == 0);
+
+ /*
+ * We store all xids directly into subxip[]. Here's why:
+ *
+ * In recovery we don't know which xids are top-level and which are
+ * subxacts, a design choice that greatly simplifies xid processing.
+ *
+ * It seems like we would want to try to put xids into xip[] only,
+ * but that is fairly small. We would either need to make that bigger
+ * or to increase the rate at which we WAL-log xid assignment;
+ * neither is an appealing choice.
+ *
+ * We could try to store xids into xip[] first and then into subxip[]
+ * if there are too many xids. That only works if the snapshot doesn't
+ * overflow because we do not search subxip[] in that case. A simpler
+ * way is to just store all xids in the subxact array because this
+ * is by far the bigger array. We just leave the xip array empty.
+ *
+ * Either way we need to change the way XidInMVCCSnapshot() works
+ * depending upon when the snapshot was taken, or change normal
+ * snapshot processing so it matches.
+ */
+ subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin, xmax);
+
+ if (TransactionIdPrecedes(xmin, procArray->lastOverflowedXid))
+ suboverflowed = true;
+ }
+
if (!TransactionIdIsValid(MyProc->xmin))
MyProc->xmin = TransactionXmin = xmin;
@@ -818,13 +1189,16 @@ GetSnapshotData(Snapshot snapshot)
globalxmin = xmin;
/* Update global variables too */
- RecentGlobalXmin = globalxmin;
+ RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
+ if (!TransactionIdIsNormal(RecentGlobalXmin))
+ RecentGlobalXmin = FirstNormalTransactionId;
RecentXmin = xmin;
snapshot->xmin = xmin;
snapshot->xmax = xmax;
snapshot->xcnt = count;
snapshot->subxcnt = subcount;
+ snapshot->suboverflowed = suboverflowed;
snapshot->curcid = GetCurrentCommandId(false);
@@ -840,6 +1214,129 @@ GetSnapshotData(Snapshot snapshot)
}
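
The recovery branch of snapshot membership testing lives in the companion tqual.c change, which is not part of this diff. Purely as a hedged sketch of what that check might look like, given that xip[] is left empty and all xids sit in subxip[]: if the snapshot overflowed, a subxid may be missing from subxip[], so it is first mapped to its top-level parent via pg_subtrans before the linear search. This assumes the usual backend headers; SubTransGetTopmostTransaction() is the existing subtrans.c entry point.

    /* Hedged sketch only; the authoritative logic is the tqual.c change
     * accompanying this patch. */
    static bool
    XidInRecoverySnapshot(TransactionId xid, Snapshot snapshot)
    {
        int     j;

        if (snapshot->suboverflowed)
        {
            /* walk up to the top-level xid; a subxid may be absent from subxip[] */
            xid = SubTransGetTopmostTransaction(xid);
            if (TransactionIdPrecedes(xid, snapshot->xmin))
                return false;       /* parent older than xmin: treated as finished */
        }

        for (j = 0; j < snapshot->subxcnt; j++)
        {
            if (TransactionIdEquals(xid, snapshot->subxip[j]))
                return true;        /* still in progress as of this snapshot */
        }

        return false;
    }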
/*
+ * GetRunningTransactionData -- returns information about running transactions.
+ *
+ * Similar to GetSnapshotData but returning more information. We include
+ * all PGPROCs with an assigned TransactionId, even VACUUM processes.
+ *
+ * This is never executed during recovery so there is no need to look at
+ * KnownAssignedXids.
+ *
+ * We don't worry about updating other counters, we want to keep this as
+ * simple as possible and leave GetSnapshotData() as the primary code for
+ * that bookkeeping.
+ */
+RunningTransactions
+GetRunningTransactionData(void)
+{
+ ProcArrayStruct *arrayP = procArray;
+ RunningTransactions CurrentRunningXacts = (RunningTransactions) &CurrentRunningXactsData;
+ TransactionId latestCompletedXid;
+ TransactionId oldestRunningXid;
+ TransactionId *xids;
+ int index;
+ int count;
+ int subcount;
+ bool suboverflowed;
+
+ Assert(!RecoveryInProgress());
+
+ /*
+ * Allocating space for maxProcs xids is usually overkill; numProcs would
+ * be sufficient. But it seems better to do the malloc while not holding
+ * the lock, so we can't look at numProcs. Likewise, we allocate much
+ * more subxip storage than is probably needed.
+ *
+ * Should only be allocated for bgwriter, since only ever executed
+ * during checkpoints.
+ */
+ if (CurrentRunningXacts->xids == NULL)
+ {
+ /*
+ * First call
+ */
+ CurrentRunningXacts->xids = (TransactionId *)
+ malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+ if (CurrentRunningXacts->xids == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ xids = CurrentRunningXacts->xids;
+
+ count = subcount = 0;
+ suboverflowed = false;
+
+ /*
+ * Ensure that no xids enter or leave the procarray while we obtain
+ * snapshot.
+ */
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+ LWLockAcquire(XidGenLock, LW_SHARED);
+
+ latestCompletedXid = ShmemVariableCache->latestCompletedXid;
+
+ oldestRunningXid = ShmemVariableCache->nextXid;
+ /*
+ * Spin over procArray collecting all xids and subxids.
+ */
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ volatile PGPROC *proc = arrayP->procs[index];
+ TransactionId xid;
+ int nxids;
+
+ /* Fetch xid just once - see GetNewTransactionId */
+ xid = proc->xid;
+
+ /*
+ * We don't need to store transactions that don't have a TransactionId
+ * yet because they will not show as running on a standby server.
+ */
+ if (!TransactionIdIsValid(xid))
+ continue;
+
+ xids[count++] = xid;
+
+ if (TransactionIdPrecedes(xid, oldestRunningXid))
+ oldestRunningXid = xid;
+
+ /*
+ * Save subtransaction XIDs. Other backends can't add or remove entries
+ * while we're holding XidGenLock.
+ */
+ nxids = proc->subxids.nxids;
+ if (nxids > 0)
+ {
+ memcpy(&xids[count], (void *) proc->subxids.xids,
+ nxids * sizeof(TransactionId));
+ count += nxids;
+ subcount += nxids;
+
+ if (proc->subxids.overflowed)
+ suboverflowed = true;
+
+ /*
+ * The top-level XID of a transaction is always smaller than any of
+ * its subxids, so we don't need to check whether any of the subxids
+ * are smaller than oldestRunningXid
+ */
+ }
+ }
+
+ CurrentRunningXacts->xcnt = count;
+ CurrentRunningXacts->subxid_overflow = suboverflowed;
+ CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
+ CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
+
+ LWLockRelease(XidGenLock);
+ LWLockRelease(ProcArrayLock);
+
+ return CurrentRunningXacts;
+}
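
For reference while reading this function: the fields it fills in correspond to the RunningTransactionsData struct declared in the patch's storage/standby.h hunk, which is not shown in this diff. The sketch below is reconstructed from the assignments above, not copied from the header:

    /* Reconstructed sketch; see storage/standby.h in the full patch for the
     * authoritative definition. */
    typedef struct RunningTransactionsData
    {
        int            xcnt;              /* # of xids in the xids[] array */
        bool           subxid_overflow;   /* true if any subxid cache overflowed */
        TransactionId  nextXid;           /* copy of ShmemVariableCache->nextXid */
        TransactionId  oldestRunningXid;  /* oldest xid still running, *not* xmin */
        TransactionId *xids;              /* top-level and sub xids, unsorted */
    } RunningTransactionsData;

    typedef RunningTransactionsData *RunningTransactions;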
+
+/*
* GetTransactionsInCommit -- Get the XIDs of transactions that are committing
*
* Constructs an array of XIDs of transactions that are currently in commit
@@ -1101,6 +1598,154 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
return vxids;
}
+/*
+ * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
+ *
+ * The array is palloc'd and is terminated with an invalid VXID.
+ *
+ * Usage is limited to conflict resolution during recovery on standby servers.
+ * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
+ * in cases where we cannot accurately determine a value for latestRemovedXid.
+ * If limitXmin is InvalidTransactionId then we know that the very
+ * latest xid that might have caused a cleanup record will be
+ * latestCompletedXid, so we set limitXmin to be latestCompletedXid instead.
+ * We then skip any backends with xmin > limitXmin. This means that
+ * cleanup records don't conflict with some recent snapshots.
+ *
+ * We replace InvalidTransactionId with latestCompletedXid here because
+ * this is the most convenient place to do that, while we hold ProcArrayLock.
+ * The originator of the cleanup record wanted to avoid checking the value of
+ * latestCompletedXid since doing so would be a performance issue during
+ * normal running, so we check it essentially for free on the standby.
+ *
+ * If dbOid is valid we skip backends attached to other databases. Some
+ * callers choose to skipExistingConflicts.
+ *
+ * Be careful to *not* pfree the result from this function. We reuse
+ * this array sufficiently often that we use malloc for the result.
+ */
+VirtualTransactionId *
+GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid,
+ bool skipExistingConflicts)
+{
+ static VirtualTransactionId *vxids;
+ ProcArrayStruct *arrayP = procArray;
+ int count = 0;
+ int index;
+
+ /*
+ * If not first time through, get workspace to remember main XIDs in. We
+ * malloc it permanently to avoid repeated palloc/pfree overhead.
+ * Allow result space, remembering room for a terminator.
+ */
+ if (vxids == NULL)
+ {
+ vxids = (VirtualTransactionId *)
+ malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
+ if (vxids == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ /*
+ * If we don't know the TransactionId that created the conflict, set
+ * it to latestCompletedXid which is the latest possible value.
+ */
+ if (!TransactionIdIsValid(limitXmin))
+ limitXmin = ShmemVariableCache->latestCompletedXid;
+
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ volatile PGPROC *proc = arrayP->procs[index];
+
+ /* Exclude prepared transactions */
+ if (proc->pid == 0)
+ continue;
+
+ if (skipExistingConflicts && proc->recoveryConflictMode > 0)
+ continue;
+
+ if (!OidIsValid(dbOid) ||
+ proc->databaseId == dbOid)
+ {
+ /* Fetch xmin just once - can't change on us, but good coding */
+ TransactionId pxmin = proc->xmin;
+
+ /*
+ * We ignore an invalid pxmin because this means that backend
+ * has no snapshot and cannot get another one while we hold exclusive lock.
+ */
+ if (TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin))
+ {
+ VirtualTransactionId vxid;
+
+ GET_VXID_FROM_PGPROC(vxid, *proc);
+ if (VirtualTransactionIdIsValid(vxid))
+ vxids[count++] = vxid;
+ }
+ }
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ /* add the terminator */
+ vxids[count].backendId = InvalidBackendId;
+ vxids[count].localTransactionId = InvalidLocalTransactionId;
+
+ return vxids;
+}
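
As a usage illustration only: a resource manager's redo routine that replays a cleanup record could resolve conflicts with roughly the call pattern below, mirroring StandbyAcquireAccessExclusiveLock() later in this patch. latestRemovedXid, dbOid and the reason string are hypothetical values assumed to come from the WAL record being replayed.

    /* Hypothetical redo-side conflict resolution, sketched from the call
     * pattern used elsewhere in this patch. */
    VirtualTransactionId *backends;

    backends = GetConflictingVirtualXIDs(latestRemovedXid,   /* xmin cutoff */
                                         dbOid,              /* this database only */
                                         false);             /* don't skip existing */
    ResolveRecoveryConflictWithVirtualXIDs(backends,
                                           "removal of old row versions",
                                           CONFLICT_MODE_ERROR);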
+
+/*
+ * CancelVirtualTransaction - used in recovery conflict processing
+ *
+ * Returns pid of the process signaled, or 0 if not found.
+ */
+pid_t
+CancelVirtualTransaction(VirtualTransactionId vxid, int cancel_mode)
+{
+ ProcArrayStruct *arrayP = procArray;
+ int index;
+ pid_t pid = 0;
+
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ VirtualTransactionId procvxid;
+ PGPROC *proc = arrayP->procs[index];
+
+ GET_VXID_FROM_PGPROC(procvxid, *proc);
+
+ if (procvxid.backendId == vxid.backendId &&
+ procvxid.localTransactionId == vxid.localTransactionId)
+ {
+ /*
+ * Issue orders for the proc to read next time it receives SIGINT
+ */
+ if (proc->recoveryConflictMode < cancel_mode)
+ proc->recoveryConflictMode = cancel_mode;
+
+ pid = proc->pid;
+ break;
+ }
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ if (pid != 0)
+ {
+ /*
+ * Kill the pid if it's still here. If not, that's what we wanted
+ * so ignore any errors.
+ */
+ kill(pid, SIGINT);
+ }
+
+ return pid;
+}
/*
* CountActiveBackends --- count backends (other than myself) that are in
@@ -1400,3 +2045,457 @@ DisplayXidCache(void)
}
#endif /* XIDCACHE_DEBUG */
+
+/* ----------------------------------------------
+ * KnownAssignedTransactions sub-module
+ * ----------------------------------------------
+ */
+
+/*
+ * In Hot Standby mode, we maintain a list of transactions that are (or were)
+ * running in the master at the current point in WAL.
+ *
+ * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
+ * type apart from XLOG_XACT_RUNNING_XACTS, since that record initialises the
+ * first snapshot so that RecordKnownAssignedTransactionIds() can then be
+ * called. Uses local variables, so should only be called by the Startup
+ * process.
+ *
+ * We record all xids that we know have been assigned. That includes
+ * all the xids on the WAL record, plus all unobserved xids that
+ * we can deduce have been assigned. We can deduce the existence of
+ * unobserved xids because we know xids are in sequence, with no gaps.
+ *
+ * During recovery we do not fret too much about the distinction between
+ * top-level xids and subtransaction xids. We hold both together in
+ * a hash table called KnownAssignedXids. In backends, this is copied into
+ * snapshots in GetSnapshotData(), taking advantage of the fact that
+ * XidInMVCCSnapshot() doesn't care about the distinction either.
+ * Subtransaction xids are effectively treated as top-level xids and in the
+ * typical case pg_subtrans is *not* maintained (and that does not affect
+ * visibility).
+ *
+ * KnownAssignedXids expands as new xids are observed or inferred, and
+ * contracts when transaction completion records arrive. We have room in a
+ * snapshot to hold maxProcs * (1 + PGPROC_MAX_CACHED_SUBXIDS) xids, so
+ * every transaction must report its subtransaction xids in a special
+ * WAL assignment record after every PGPROC_MAX_CACHED_SUBXIDS assignments.
+ * This allows us to remove the subtransaction xids and update pg_subtrans
+ * instead. Snapshots are still correct, yet we don't overflow the
+ * SnapshotData structure. When we do this we need to keep track of which
+ * xids caused the snapshot to overflow. We do that by simply tracking the
+ * lastOverflowedXid - if it is within the bounds of KnownAssignedXids then
+ * we know the snapshot overflowed. (Note that subxid overflow occurs on the
+ * primary when the 65th subxid arrives, whereas on the standby it occurs
+ * when the 64th subxid arrives - that is not an error.)
+ *
+ * Should FATAL errors result in a backend on primary disappearing before
+ * it can write an abort record then we just leave those xids in
+ * KnownAssignedXids. They actually aborted but we think they were running;
+ * the distinction is irrelevant because either way any changes done by the
+ * transaction are not visible to backends in the standby.
+ * We prune KnownAssignedXids when XLOG_XACT_RUNNING_XACTS arrives, to
+ * ensure we do not overflow.
+ *
+ * If we are in STANDBY_SNAPSHOT_PENDING state, then we may try to remove
+ * xids that are not present.
+ */
+void
+RecordKnownAssignedTransactionIds(TransactionId xid)
+{
+ /*
+ * Skip processing if the current snapshot is not initialized.
+ */
+ if (standbyState < STANDBY_SNAPSHOT_PENDING)
+ return;
+
+ /*
+ * We can see WAL records before the running-xacts snapshot that
+ * contain XIDs that are not in the running-xacts snapshot, but that we
+ * know to have finished before the running-xacts snapshot was taken.
+ * Don't waste precious shared memory by keeping them in the hash table.
+ *
+ * We can also see WAL records before the running-xacts snapshot that
+ * contain XIDs that are not in the running-xacts snapshot for a different
+ * reason: the transaction started *after* the running-xacts snapshot
+ * was taken, but before it was written to WAL. We must be careful to
+ * not ignore such XIDs. Because such a transaction started after the
+ * running-xacts snapshot was taken, it must have an XID larger than
+ * the oldest XID according to the running-xacts snapshot.
+ */
+ if (TransactionIdPrecedes(xid, snapshotOldestActiveXid))
+ return;
+
+ ereport(trace_recovery(DEBUG4),
+ (errmsg("record known xact %u latestObservedXid %u",
+ xid, latestObservedXid)));
+
+ /*
+ * When a newly observed xid arrives, it is frequently the case
+ * that it is *not* the next xid in sequence. When this occurs, we
+ * must treat the intervening xids as running also.
+ */
+ if (TransactionIdFollows(xid, latestObservedXid))
+ {
+ TransactionId next_expected_xid = latestObservedXid;
+ TransactionIdAdvance(next_expected_xid);
+
+ /*
+ * Locking requirement is currently higher than for xid assignment
+ * in normal running. However, we only get called here for new
+ * high xids - so on a multi-processor where it is common that xids
+ * arrive out of order the average number of locks per assignment
+ * will actually reduce. So not too worried about this locking.
+ *
+ * XXX It does seem possible that we could add a whole range
+ * of numbers atomically to KnownAssignedXids, if we use a sorted
+ * list for KnownAssignedXids. But that design also increases the
+ * length of time we hold lock when we process commits/aborts, so
+ * on balance don't worry about this.
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
+ {
+ if (TransactionIdPrecedes(next_expected_xid, xid))
+ ereport(trace_recovery(DEBUG4),
+ (errmsg("recording unobserved xid %u (latestObservedXid %u)",
+ next_expected_xid, latestObservedXid)));
+ KnownAssignedXidsAdd(&next_expected_xid, 1);
+
+ /*
+ * Extend clog and subtrans like we do in GetNewTransactionId()
+ * during normal operation
+ */
+ ExtendCLOG(next_expected_xid);
+ ExtendSUBTRANS(next_expected_xid);
+
+ TransactionIdAdvance(next_expected_xid);
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ latestObservedXid = xid;
+ }
+
+ /* nextXid must be beyond any observed xid */
+ if (TransactionIdFollowsOrEquals(latestObservedXid,
+ ShmemVariableCache->nextXid))
+ {
+ ShmemVariableCache->nextXid = latestObservedXid;
+ TransactionIdAdvance(ShmemVariableCache->nextXid);
+ }
+}
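
A small worked example of the gap-filling loop above, using made-up xid values; this is a standalone toy, not code from the patch:

    #include <stdio.h>

    /* If the last xid we observed was 100 and a WAL record for xid 103 arrives,
     * xids 101 and 102 must also have been assigned on the master, so they are
     * recorded as running too. */
    int
    main(void)
    {
        unsigned int latestObservedXid = 100;   /* made-up starting point */
        unsigned int xid = 103;                 /* xid carried by the new record */

        for (unsigned int next = latestObservedXid + 1; next <= xid; next++)
            printf("KnownAssignedXidsAdd(%u)\n", next);     /* 101, 102, 103 */

        latestObservedXid = xid;
        printf("latestObservedXid = %u\n", latestObservedXid);
        return 0;
    }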
+
+void
+ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
+ TransactionId *subxids)
+{
+ int i;
+ TransactionId max_xid;
+
+ if (standbyState == STANDBY_DISABLED)
+ return;
+
+ max_xid = TransactionIdLatest(xid, nsubxids, subxids);
+
+ /*
+ * Uses same locking as transaction commit
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ if (TransactionIdIsValid(xid))
+ KnownAssignedXidsRemove(xid);
+ for (i = 0; i < nsubxids; i++)
+ KnownAssignedXidsRemove(subxids[i]);
+
+ /* Like in ProcArrayRemove, advance latestCompletedXid */
+ if (TransactionIdFollowsOrEquals(max_xid,
+ ShmemVariableCache->latestCompletedXid))
+ ShmemVariableCache->latestCompletedXid = max_xid;
+
+ LWLockRelease(ProcArrayLock);
+}
+
+void
+ExpireAllKnownAssignedTransactionIds(void)
+{
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ KnownAssignedXidsRemoveMany(InvalidTransactionId, false);
+ LWLockRelease(ProcArrayLock);
+}
+
+void
+ExpireOldKnownAssignedTransactionIds(TransactionId xid)
+{
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ KnownAssignedXidsRemoveMany(xid, true);
+ LWLockRelease(ProcArrayLock);
+}
+
+/*
+ * Private module functions to manipulate KnownAssignedXids
+ *
+ * There are 3 main users of the KnownAssignedXids data structure:
+ *
+ * * backends taking snapshots
+ * * startup process adding new knownassigned xids
+ * * startup process removing xids as transactions end
+ *
+ * If we make KnownAssignedXids a simple sorted array then the first two
+ * operations are fast, but the last one is at least O(N). If we make
+ * KnownAssignedXids a hash table then the last two operations are fast,
+ * though we have to do more work at snapshot time. Doing more work at
+ * commit could slow down taking snapshots anyway because of lwlock
+ * contention. Scanning the hash table is O(N) on the max size of the array,
+ * so performs poorly in comparison when we have very low numbers of
+ * write transactions to process. But at least it is constant overhead
+ * and a sequential memory scan will utilise hardware memory readahead
+ * to give much improved performance. In any case the emphasis must be on
+ * having the standby process changes quickly so that it can provide
+ * high availability. So we choose to implement as a hash table.
+ */
+
+static Size
+KnownAssignedXidsShmemSize(int size)
+{
+ return hash_estimate_size(size, sizeof(TransactionId));
+}
+
+static void
+KnownAssignedXidsInit(int size)
+{
+ HASHCTL info;
+
+ /* assume no locking is needed yet */
+
+ info.keysize = sizeof(TransactionId);
+ info.entrysize = sizeof(TransactionId);
+ info.hash = tag_hash;
+
+ KnownAssignedXidsHash = ShmemInitHash("KnownAssignedXids Hash",
+ size, size,
+ &info,
+ HASH_ELEM | HASH_FUNCTION);
+
+ if (!KnownAssignedXidsHash)
+ elog(FATAL, "could not initialize known assigned xids hash table");
+
+ procArray->numKnownAssignedXids = 0;
+ procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
+ procArray->lastOverflowedXid = InvalidTransactionId;
+}
+
+/*
+ * Add xids into KnownAssignedXids.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode
+ */
+static void
+KnownAssignedXidsAdd(TransactionId *xids, int nxids)
+{
+ TransactionId *result;
+ bool found;
+ int i;
+
+ for (i = 0; i < nxids; i++)
+ {
+ Assert(TransactionIdIsValid(xids[i]));
+
+ elog(trace_recovery(DEBUG4), "adding KnownAssignedXid %u", xids[i]);
+
+ procArray->numKnownAssignedXids++;
+ if (procArray->numKnownAssignedXids > procArray->maxKnownAssignedXids)
+ {
+ KnownAssignedXidsDisplay(LOG);
+ LWLockRelease(ProcArrayLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("too many KnownAssignedXids")));
+ }
+
+ result = (TransactionId *) hash_search(KnownAssignedXidsHash, &xids[i], HASH_ENTER,
+ &found);
+
+ if (!result)
+ {
+ LWLockRelease(ProcArrayLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory")));
+ }
+
+ if (found)
+ {
+ KnownAssignedXidsDisplay(LOG);
+ LWLockRelease(ProcArrayLock);
+ elog(ERROR, "found duplicate KnownAssignedXid %u", xids[i]);
+ }
+ }
+}
+
+/*
+ * Is an xid present in KnownAssignedXids?
+ *
+ * Must be called while holding ProcArrayLock in shared mode
+ */
+static bool
+KnownAssignedXidsExist(TransactionId xid)
+{
+ bool found;
+ (void) hash_search(KnownAssignedXidsHash, &xid, HASH_FIND, &found);
+ return found;
+}
+
+/*
+ * Remove one xid from anywhere in KnownAssignedXids.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode
+ */
+static void
+KnownAssignedXidsRemove(TransactionId xid)
+{
+ bool found;
+
+ Assert(TransactionIdIsValid(xid));
+
+ elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
+
+ (void) hash_search(KnownAssignedXidsHash, &xid, HASH_REMOVE, &found);
+
+ if (found)
+ procArray->numKnownAssignedXids--;
+ Assert(procArray->numKnownAssignedXids >= 0);
+
+ /*
+ * We can fail to find an xid if the xid came from a subtransaction
+ * that aborts, though the xid hadn't yet been reported and no WAL records
+ * have been written using the subxid. In that case the abort record will
+ * contain that subxid and we haven't seen it before.
+ *
+ * If we fail to find it for other reasons it might be a problem, but
+ * it isn't much use to log that it happened, since we can't divine much
+ * from just an isolated xid value.
+ */
+}
+
+/*
+ * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
+ * We filter out anything higher than xmax.
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+static int
+KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
+{
+ TransactionId xtmp = InvalidTransactionId;
+
+ return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
+}
+
+/*
+ * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus we reduce *xmin
+ * to the lowest xid value seen if not already lower.
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+static int
+KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
+ TransactionId xmax)
+{
+ HASH_SEQ_STATUS status;
+ TransactionId *knownXid;
+ int count = 0;
+
+ hash_seq_init(&status, KnownAssignedXidsHash);
+ while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
+ {
+ /*
+ * Filter out anything higher than xmax
+ */
+ if (TransactionIdPrecedes(xmax, *knownXid))
+ continue;
+
+ *xarray = *knownXid;
+ xarray++;
+ count++;
+
+ /* update xmin if required */
+ if (TransactionIdPrecedes(*knownXid, *xmin))
+ *xmin = *knownXid;
+ }
+
+ return count;
+}
+
+/*
+ * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
+ * then clear the whole table.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode.
+ */
+static void
+KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts)
+{
+ TransactionId *knownXid;
+ HASH_SEQ_STATUS status;
+
+ if (TransactionIdIsValid(xid))
+ elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", xid);
+ else
+ elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
+
+ hash_seq_init(&status, KnownAssignedXidsHash);
+ while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
+ {
+ TransactionId removeXid = *knownXid;
+ bool found;
+
+ if (!TransactionIdIsValid(xid) || TransactionIdPrecedes(removeXid, xid))
+ {
+ if (keepPreparedXacts && StandbyTransactionIdIsPrepared(removeXid))
+ continue;
+ else
+ {
+ (void) hash_search(KnownAssignedXidsHash, &removeXid,
+ HASH_REMOVE, &found);
+ if (found)
+ procArray->numKnownAssignedXids--;
+ Assert(procArray->numKnownAssignedXids >= 0);
+ }
+ }
+ }
+}
+
+/*
+ * Display KnownAssignedXids to provide debug trail
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+static void
+KnownAssignedXidsDisplay(int trace_level)
+{
+ HASH_SEQ_STATUS status;
+ TransactionId *knownXid;
+ StringInfoData buf;
+ TransactionId *xids;
+ int nxids;
+ int i;
+
+ xids = palloc(sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS);
+ nxids = 0;
+
+ hash_seq_init(&status, KnownAssignedXidsHash);
+ while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
+ xids[nxids++] = *knownXid;
+
+ qsort(xids, nxids, sizeof(TransactionId), xidComparator);
+
+ initStringInfo(&buf);
+
+ for (i = 0; i < nxids; i++)
+ appendStringInfo(&buf, "%u ", xids[i]);
+
+ elog(trace_level, "%d KnownAssignedXids %s", nxids, buf.data);
+
+ pfree(buf.data);
+}
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index dfa0ad7b5eb..e33664fc488 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.79 2009/07/31 20:26:23 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.80 2009/12/19 01:32:35 sriggs Exp $
*
*-------------------------------------------------------------------------
*/
@@ -145,6 +145,13 @@ typedef struct ProcState
bool signaled; /* backend has been sent catchup signal */
/*
+ * Backend only sends invalidations, never receives them. This only makes sense
+ * for Startup process during recovery because it doesn't maintain a relcache,
+ * yet it fires inval messages to allow query backends to see schema changes.
+ */
+ bool sendOnly; /* backend only sends, never receives */
+
+ /*
* Next LocalTransactionId to use for each idle backend slot. We keep
* this here because it is indexed by BackendId and it is convenient to
* copy the value to and from local memory when MyBackendId is set. It's
@@ -249,7 +256,7 @@ CreateSharedInvalidationState(void)
* Initialize a new backend to operate on the sinval buffer
*/
void
-SharedInvalBackendInit(void)
+SharedInvalBackendInit(bool sendOnly)
{
int index;
ProcState *stateP = NULL;
@@ -308,6 +315,7 @@ SharedInvalBackendInit(void)
stateP->nextMsgNum = segP->maxMsgNum;
stateP->resetState = false;
stateP->signaled = false;
+ stateP->sendOnly = sendOnly;
LWLockRelease(SInvalWriteLock);
@@ -579,7 +587,9 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
/*
* Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
* furthest-back backend that needs signaling (if any), and reset any
- * backends that are too far back.
+ * backends that are too far back. Note that because we ignore sendOnly
+ * backends here it is possible for them to keep sending messages without
+ * a problem even when they are the only active backend.
*/
min = segP->maxMsgNum;
minsig = min - SIG_THRESHOLD;
@@ -591,7 +601,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
int n = stateP->nextMsgNum;
/* Ignore if inactive or already in reset state */
- if (stateP->procPid == 0 || stateP->resetState)
+ if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
continue;
/*
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
new file mode 100644
index 00000000000..38bc005820b
--- /dev/null
+++ b/src/backend/storage/ipc/standby.c
@@ -0,0 +1,717 @@
+/*-------------------------------------------------------------------------
+ *
+ * standby.c
+ * Misc functions used in Hot Standby mode.
+ *
+ * InitRecoveryTransactionEnvironment()
+ * ShutdownRecoveryTransactionEnvironment()
+ *
+ * ResolveRecoveryConflictWithVirtualXIDs()
+ *
+ * All functions for handling RM_STANDBY_ID, which relate to
+ * AccessExclusiveLocks and starting snapshots for Hot Standby mode.
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/standby.c,v 1.1 2009/12/19 01:32:35 sriggs Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+#include "access/transam.h"
+#include "access/twophase.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/sinvaladt.h"
+#include "storage/standby.h"
+#include "utils/ps_status.h"
+
+int vacuum_defer_cleanup_age;
+
+static List *RecoveryLockList;
+
+static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
+static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
+
+/*
+ * InitRecoveryTransactionEnvironment
+ * Initialize tracking of in-progress transactions in the master
+ *
+ * We need to issue shared invalidations and hold locks. Holding locks
+ * means others may want to wait on us, so we need to make our lock table
+ * inserts appear to come from a transaction. We could create and delete
+ * lock table entries for each transaction, but it's simpler just to create
+ * one permanent entry and leave it there all the time. Locks are then
+ * acquired and released as needed. Yes, this means you can see the
+ * Startup process in pg_locks once we have run this.
+ */
+void
+InitRecoveryTransactionEnvironment(void)
+{
+ VirtualTransactionId vxid;
+
+ /*
+ * Initialise shared invalidation management for Startup process,
+ * being careful to register ourselves as a sendOnly process so
+ * we don't need to read messages, nor will we get signalled
+ * when the queue starts filling up.
+ */
+ SharedInvalBackendInit(true);
+
+ /*
+ * Record the PID and PGPROC structure of the startup process.
+ */
+ PublishStartupProcessInformation();
+
+ /*
+ * Lock a virtual transaction id for Startup process.
+ *
+ * We need to do GetNextLocalTransactionId() because
+ * SharedInvalBackendInit() leaves localTransactionid invalid and
+ * the lock manager doesn't like that at all.
+ *
+ * Note that we don't need to run XactLockTableInsert() because nobody
+ * needs to wait on xids. That sounds a little strange, but table locks
+ * are held by vxids and row level locks are held by xids. All queries
+ * hold AccessShareLocks so never block while we write or lock new rows.
+ */
+ vxid.backendId = MyBackendId;
+ vxid.localTransactionId = GetNextLocalTransactionId();
+ VirtualXactLockTableInsert(vxid);
+
+ standbyState = STANDBY_INITIALIZED;
+}
+
+/*
+ * ShutdownRecoveryTransactionEnvironment
+ * Shut down transaction tracking
+ *
+ * Prepare to switch from hot standby mode to normal operation. Shut down
+ * recovery-time transaction tracking.
+ */
+void
+ShutdownRecoveryTransactionEnvironment(void)
+{
+ /* Mark all tracked in-progress transactions as finished. */
+ ExpireAllKnownAssignedTransactionIds();
+
+ /* Release all locks the tracked transactions were holding */
+ StandbyReleaseAllLocks();
+}
+
+
+/*
+ * -----------------------------------------------------
+ * Standby wait timers and backend cancel logic
+ * -----------------------------------------------------
+ */
+
+#define STANDBY_INITIAL_WAIT_US 1000
+static int standbyWait_us = STANDBY_INITIAL_WAIT_US;
+
+/*
+ * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
+ * We wait here for a while then return. If we decide we can't wait any
+ * more then we return true, if we can wait some more return false.
+ */
+static bool
+WaitExceedsMaxStandbyDelay(void)
+{
+ long delay_secs;
+ int delay_usecs;
+
+ /* max_standby_delay = -1 means wait forever, if necessary */
+ if (MaxStandbyDelay < 0)
+ return false;
+
+ /* Are we past max_standby_delay? */
+ TimestampDifference(GetLatestXLogTime(), GetCurrentTimestamp(),
+ &delay_secs, &delay_usecs);
+ if (delay_secs > MaxStandbyDelay)
+ return true;
+
+ /*
+ * Sleep, then do bookkeeping.
+ */
+ pg_usleep(standbyWait_us);
+
+ /*
+ * Progressively increase the sleep times.
+ */
+ standbyWait_us *= 2;
+ if (standbyWait_us > 1000000)
+ standbyWait_us = 1000000;
+ if (standbyWait_us > MaxStandbyDelay * 1000000 / 4)
+ standbyWait_us = MaxStandbyDelay * 1000000 / 4;
+
+ return false;
+}
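
The sleep sequence produced by the backoff above can be reproduced with a quick standalone calculation. This is illustrative only and assumes max_standby_delay is large enough that only the one-second cap applies:

    #include <stdio.h>

    /* Prints 1000, 2000, 4000, ... 512000, then 1000000 us thereafter. */
    int
    main(void)
    {
        int standbyWait_us = 1000;              /* STANDBY_INITIAL_WAIT_US */

        for (int i = 0; i < 12; i++)
        {
            printf("sleep %d us\n", standbyWait_us);
            standbyWait_us *= 2;
            if (standbyWait_us > 1000000)
                standbyWait_us = 1000000;
        }
        return 0;
    }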
+
+/*
+ * This is the main executioner for any query backend that conflicts with
+ * recovery processing. Judgement has already been passed on it within
+ * a specific rmgr. Here we just issue the orders to the procs. The procs
+ * then throw the required error as instructed.
+ *
+ * We may ask for a specific cancel_mode, typically ERROR or FATAL.
+ */
+void
+ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
+ char *reason, int cancel_mode)
+{
+ char waitactivitymsg[100];
+
+ Assert(cancel_mode > 0);
+
+ while (VirtualTransactionIdIsValid(*waitlist))
+ {
+ long wait_s;
+ int wait_us; /* wait in microseconds (us) */
+ TimestampTz waitStart;
+ bool logged;
+
+ waitStart = GetCurrentTimestamp();
+ standbyWait_us = STANDBY_INITIAL_WAIT_US;
+ logged = false;
+
+ /* wait until the virtual xid is gone */
+ while(!ConditionalVirtualXactLockTableWait(*waitlist))
+ {
+ /*
+ * Report if we have been waiting for a while now...
+ */
+ TimestampTz now = GetCurrentTimestamp();
+ TimestampDifference(waitStart, now, &wait_s, &wait_us);
+ if (!logged && (wait_s > 0 || wait_us > 500000))
+ {
+ const char *oldactivitymsg;
+ int len;
+
+ oldactivitymsg = get_ps_display(&len);
+ snprintf(waitactivitymsg, sizeof(waitactivitymsg),
+ "waiting for max_standby_delay (%u ms)",
+ MaxStandbyDelay);
+ set_ps_display(waitactivitymsg, false);
+ if (len > 100)
+ len = 100;
+ memcpy(waitactivitymsg, oldactivitymsg, len);
+
+ ereport(trace_recovery(DEBUG5),
+ (errmsg("virtual transaction %u/%u is blocking %s",
+ waitlist->backendId,
+ waitlist->localTransactionId,
+ reason)));
+
+ pgstat_report_waiting(true);
+
+ logged = true;
+ }
+
+ /* Is it time to kill it? */
+ if (WaitExceedsMaxStandbyDelay())
+ {
+ pid_t pid;
+
+ /*
+ * Now find out who to throw out of the balloon.
+ */
+ Assert(VirtualTransactionIdIsValid(*waitlist));
+ pid = CancelVirtualTransaction(*waitlist, cancel_mode);
+
+ if (pid != 0)
+ {
+ /*
+ * Startup process debug messages
+ */
+ switch (cancel_mode)
+ {
+ case CONFLICT_MODE_FATAL:
+ elog(trace_recovery(DEBUG1),
+ "recovery disconnects session with pid %d because of conflict with %s",
+ pid,
+ reason);
+ break;
+ case CONFLICT_MODE_ERROR:
+ elog(trace_recovery(DEBUG1),
+ "recovery cancels virtual transaction %u/%u pid %d because of conflict with %s",
+ waitlist->backendId,
+ waitlist->localTransactionId,
+ pid,
+ reason);
+ break;
+ default:
+ /* No conflict pending, so fall through */
+ break;
+ }
+
+ /*
+ * Wait awhile for it to die so that we avoid flooding an
+ * unresponsive backend when system is heavily loaded.
+ */
+ pg_usleep(5000);
+ }
+ }
+ }
+
+ /* Reset ps display */
+ if (logged)
+ {
+ set_ps_display(waitactivitymsg, false);
+ pgstat_report_waiting(false);
+ }
+
+ /* The virtual transaction is gone now, wait for the next one */
+ waitlist++;
+ }
+}
+
+/*
+ * -----------------------------------------------------
+ * Locking in Recovery Mode
+ * -----------------------------------------------------
+ *
+ * All locks are held by the Startup process using a single virtual
+ * transaction. This implementation is both simpler and, in some senses,
+ * more correct. The locks held mean "some original transaction held
+ * this lock, so query access is not allowed at this time". So the Startup
+ * process is the proxy by which the original locks are implemented.
+ *
+ * We only keep track of AccessExclusiveLocks, which are only ever held by
+ * one transaction on one relation, and don't worry about lock queuing.
+ *
+ * We keep a single dynamically expandable list of locks in local memory,
+ * RecoveryLockList, so we can keep track of the various entries made by
+ * the Startup process's virtual xid in the shared lock table.
+ *
+ * List elements use type xl_standby_lock, since the WAL record type exactly
+ * matches the information that we need to keep track of.
+ *
+ * We use session locks rather than normal locks so we don't need
+ * ResourceOwners.
+ */
+
+
+void
+StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
+{
+ xl_standby_lock *newlock;
+ LOCKTAG locktag;
+ bool report_memory_error = false;
+ int num_attempts = 0;
+
+ /* Already processed? */
+ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ return;
+
+ elog(trace_recovery(DEBUG4),
+ "adding recovery lock: db %d rel %d", dbOid, relOid);
+
+ /* dbOid is InvalidOid when we are locking a shared relation. */
+ Assert(OidIsValid(relOid));
+
+ newlock = palloc(sizeof(xl_standby_lock));
+ newlock->xid = xid;
+ newlock->dbOid = dbOid;
+ newlock->relOid = relOid;
+ RecoveryLockList = lappend(RecoveryLockList, newlock);
+
+ /*
+ * Attempt to acquire the lock as requested.
+ */
+ SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
+
+ /*
+ * Wait for lock to clear or kill anyone in our way.
+ */
+ while (LockAcquireExtended(&locktag, AccessExclusiveLock,
+ true, true, report_memory_error)
+ == LOCKACQUIRE_NOT_AVAIL)
+ {
+ VirtualTransactionId *backends;
+
+ /*
+ * If blowing away everybody with conflicting locks doesn't work after
+ * the first two attempts, then we just start blowing everybody away
+ * until it does work. We do this because it's likely that we either
+ * have too many locks and we just can't get one at all, or that there
+ * are many people crowding for the same table.
+ * Recovery must win; the end justifies the means.
+ */
+ if (++num_attempts < 3)
+ backends = GetLockConflicts(&locktag, AccessExclusiveLock);
+ else
+ {
+ backends = GetConflictingVirtualXIDs(InvalidTransactionId,
+ InvalidOid,
+ true);
+ report_memory_error = true;
+ }
+
+ ResolveRecoveryConflictWithVirtualXIDs(backends,
+ "exclusive lock",
+ CONFLICT_MODE_ERROR);
+ }
+}
+
+static void
+StandbyReleaseLocks(TransactionId xid)
+{
+ ListCell *cell,
+ *prev,
+ *next;
+
+ /*
+ * Release all matching locks and remove them from list
+ */
+ prev = NULL;
+ for (cell = list_head(RecoveryLockList); cell; cell = next)
+ {
+ xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+ next = lnext(cell);
+
+ if (!TransactionIdIsValid(xid) || lock->xid == xid)
+ {
+ LOCKTAG locktag;
+
+ elog(trace_recovery(DEBUG4),
+ "releasing recovery lock: xid %u db %d rel %d",
+ lock->xid, lock->dbOid, lock->relOid);
+ SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
+ if (!LockRelease(&locktag, AccessExclusiveLock, true))
+ elog(trace_recovery(LOG),
+ "RecoveryLockList contains entry for lock "
+ "no longer recorded by lock manager "
+ "xid %u database %d relation %d",
+ lock->xid, lock->dbOid, lock->relOid);
+
+ RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
+ pfree(lock);
+ }
+ else
+ prev = cell;
+ }
+}
+
+/*
+ * Release locks for a transaction tree, starting at xid down, from
+ * RecoveryLockList.
+ *
+ * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
+ * to remove any AccessExclusiveLocks requested by a transaction.
+ */
+void
+StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
+{
+ int i;
+
+ StandbyReleaseLocks(xid);
+
+ for (i = 0; i < nsubxids; i++)
+ StandbyReleaseLocks(subxids[i]);
+}
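+
+/*
+ * For illustration only: the COMMIT/ROLLBACK redo routines in xact.c are
+ * expected to call this when in hot standby mode, roughly as below
+ * (variable names are assumed, not taken from this patch):
+ *
+ *		StandbyReleaseLockTree(xid, nsubxacts, subxids);
+ */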
+
+/*
+ * StandbyReleaseLocksMany
+ *		Release standby locks held by XIDs < removeXid. If
+ *		keepPreparedXacts is true, keep prepared transactions.
+ */
+static void
+StandbyReleaseLocksMany(TransactionId removeXid, bool keepPreparedXacts)
+{
+ ListCell *cell,
+ *prev,
+ *next;
+ LOCKTAG locktag;
+
+ /*
+ * Release all matching locks.
+ */
+ prev = NULL;
+ for (cell = list_head(RecoveryLockList); cell; cell = next)
+ {
+ xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+ next = lnext(cell);
+
+ if (!TransactionIdIsValid(removeXid) || TransactionIdPrecedes(lock->xid, removeXid))
+ {
+ if (keepPreparedXacts && StandbyTransactionIdIsPrepared(lock->xid))
+ continue;
+ elog(trace_recovery(DEBUG4),
+ "releasing recovery lock: xid %u db %d rel %d",
+ lock->xid, lock->dbOid, lock->relOid);
+ SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
+ if (!LockRelease(&locktag, AccessExclusiveLock, true))
+ elog(trace_recovery(LOG),
+ "RecoveryLockList contains entry for lock "
+ "no longer recorded by lock manager "
+ "xid %u database %d relation %d",
+ lock->xid, lock->dbOid, lock->relOid);
+ RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
+ pfree(lock);
+ }
+ else
+ prev = cell;
+ }
+}
+
+/*
+ * Called at end of recovery and when we see a shutdown checkpoint.
+ */
+void
+StandbyReleaseAllLocks(void)
+{
+ elog(trace_recovery(DEBUG2), "release all standby locks");
+ StandbyReleaseLocksMany(InvalidTransactionId, false);
+}
+
+/*
+ * StandbyReleaseOldLocks
+ *		Release standby locks held by XIDs < removeXid, as long
+ *		as they're not prepared transactions.
+ */
+void
+StandbyReleaseOldLocks(TransactionId removeXid)
+{
+ StandbyReleaseLocksMany(removeXid, true);
+}
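+
+/*
+ * For illustration only: when a running-xacts snapshot is replayed, any
+ * lock whose xid precedes the oldest running xid can no longer belong to a
+ * live transaction, so a plausible (assumed) call site looks like:
+ *
+ *		StandbyReleaseOldLocks(running->oldestRunningXid);
+ */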
+
+/*
+ * --------------------------------------------------------------------
+ * Recovery handling for Rmgr RM_STANDBY_ID
+ *
+ * These record types will only be created if XLogStandbyInfoActive() is true.
+ * --------------------------------------------------------------------
+ */
+
+void
+standby_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
+
+ /* Do nothing if we're not in standby mode */
+ if (standbyState == STANDBY_DISABLED)
+ return;
+
+ if (info == XLOG_STANDBY_LOCK)
+ {
+ xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
+ int i;
+
+ for (i = 0; i < xlrec->nlocks; i++)
+ StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
+ xlrec->locks[i].dbOid,
+ xlrec->locks[i].relOid);
+ }
+ else if (info == XLOG_RUNNING_XACTS)
+ {
+ xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
+ RunningTransactionsData running;
+
+ running.xcnt = xlrec->xcnt;
+ running.subxid_overflow = xlrec->subxid_overflow;
+ running.nextXid = xlrec->nextXid;
+ running.oldestRunningXid = xlrec->oldestRunningXid;
+ running.xids = xlrec->xids;
+
+ ProcArrayApplyRecoveryInfo(&running);
+ }
+ else
+ elog(PANIC, "relation_redo: unknown op code %u", info);
+}
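+
+/*
+ * For illustration only: these routines are reached through the resource
+ * manager table in rmgr.c, via an entry for RM_STANDBY_ID along the lines
+ * of (exact field layout assumed):
+ *
+ *		{"Standby", standby_redo, standby_desc, NULL, NULL, NULL}
+ */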
+
+static void
+standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
+{
+ int i;
+
+ appendStringInfo(buf,
+ " nextXid %u oldestRunningXid %u",
+ xlrec->nextXid,
+ xlrec->oldestRunningXid);
+ if (xlrec->xcnt > 0)
+ {
+ appendStringInfo(buf, "; %d xacts:", xlrec->xcnt);
+ for (i = 0; i < xlrec->xcnt; i++)
+ appendStringInfo(buf, " %u", xlrec->xids[i]);
+ }
+
+ if (xlrec->subxid_overflow)
+ appendStringInfo(buf, "; subxid ovf");
+}
+
+void
+standby_desc(StringInfo buf, uint8 xl_info, char *rec)
+{
+ uint8 info = xl_info & ~XLR_INFO_MASK;
+
+ if (info == XLOG_STANDBY_LOCK)
+ {
+ xl_standby_locks *xlrec = (xl_standby_locks *) rec;
+ int i;
+
+ appendStringInfo(buf, "AccessExclusive locks:");
+
+ for (i = 0; i < xlrec->nlocks; i++)
+ appendStringInfo(buf, " xid %u db %d rel %d",
+ xlrec->locks[i].xid, xlrec->locks[i].dbOid,
+ xlrec->locks[i].relOid);
+ }
+ else if (info == XLOG_RUNNING_XACTS)
+ {
+ xl_running_xacts *xlrec = (xl_running_xacts *) rec;
+
+ appendStringInfo(buf, " running xacts:");
+ standby_desc_running_xacts(buf, xlrec);
+ }
+ else
+ appendStringInfo(buf, "UNKNOWN");
+}
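+
+/*
+ * For illustration only, with made-up values: standby_desc output for the
+ * two record types looks like
+ *
+ *		AccessExclusive locks: xid 1042 db 16384 rel 16390
+ *		 running xacts: nextXid 1050 oldestRunningXid 1042; 2 xacts: 1042 1047
+ */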
+
+/*
+ * Log details of the current snapshot to WAL. This allows the snapshot state
+ * to be reconstructed on the standby.
+ */
+void
+LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid)
+{
+ RunningTransactions running;
+ xl_standby_lock *locks;
+ int nlocks;
+
+ Assert(XLogStandbyInfoActive());
+
+ /*
+ * Get details of any AccessExclusiveLocks being held at the moment.
+ */
+ locks = GetRunningTransactionLocks(&nlocks);
+ if (nlocks > 0)
+ LogAccessExclusiveLocks(nlocks, locks);
+
+ /*
+ * Log details of all in-progress transactions. This should be the last
+ * record we write, because the standby will open up when it sees this.
+ */
+ running = GetRunningTransactionData();
+ LogCurrentRunningXacts(running);
+
+ *oldestActiveXid = running->oldestRunningXid;
+ *nextXid = running->nextXid;
+}
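+
+/*
+ * For illustration only: a checkpoint on the primary is expected to call
+ * this just before assembling its checkpoint record, so that a standby can
+ * initialize its snapshot state; the variable names below are assumed:
+ *
+ *		if (XLogStandbyInfoActive())
+ *			LogStandbySnapshot(&checkPoint.oldestActiveXid,
+ *							   &checkPoint.nextXid);
+ */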
+
+/*
+ * Record an enhanced snapshot of running transactions into WAL.
+ *
+ * The definitions of RunningTransactionsData and xl_running_xacts
+ * are similar. We keep them separate because xl_running_xacts
+ * is a contiguous chunk of memory and never exists fully until it is
+ * assembled in WAL.
+ */
+static void
+LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
+{
+ xl_running_xacts xlrec;
+ XLogRecData rdata[2];
+ int lastrdata = 0;
+ XLogRecPtr recptr;
+
+ xlrec.xcnt = CurrRunningXacts->xcnt;
+ xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
+ xlrec.nextXid = CurrRunningXacts->nextXid;
+ xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
+
+ /* Header */
+ rdata[0].data = (char *) (&xlrec);
+ rdata[0].len = MinSizeOfXactRunningXacts;
+ rdata[0].buffer = InvalidBuffer;
+
+ /* array of TransactionIds */
+ if (xlrec.xcnt > 0)
+ {
+ rdata[0].next = &(rdata[1]);
+ rdata[1].data = (char *) CurrRunningXacts->xids;
+ rdata[1].len = xlrec.xcnt * sizeof(TransactionId);
+ rdata[1].buffer = InvalidBuffer;
+ lastrdata = 1;
+ }
+
+ rdata[lastrdata].next = NULL;
+
+ recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS, rdata);
+
+ if (CurrRunningXacts->subxid_overflow)
+ ereport(trace_recovery(DEBUG2),
+ (errmsg("snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u next xid %u)",
+ CurrRunningXacts->xcnt,
+ recptr.xlogid, recptr.xrecoff,
+ CurrRunningXacts->oldestRunningXid,
+ CurrRunningXacts->nextXid)));
+ else
+ ereport(trace_recovery(DEBUG2),
+ (errmsg("snapshot of %u running transaction ids (lsn %X/%X oldest xid %u next xid %u)",
+ CurrRunningXacts->xcnt,
+ recptr.xlogid, recptr.xrecoff,
+ CurrRunningXacts->oldestRunningXid,
+ CurrRunningXacts->nextXid)));
+}
+
+/*
+ * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
+ * logged, as described in backend/storage/lmgr/README.
+ */
+static void
+LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
+{
+ XLogRecData rdata[2];
+ xl_standby_locks xlrec;
+
+ xlrec.nlocks = nlocks;
+
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = offsetof(xl_standby_locks, locks);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = &rdata[1];
+
+ rdata[1].data = (char *) locks;
+ rdata[1].len = nlocks * sizeof(xl_standby_lock);
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].next = NULL;
+
+ (void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK, rdata);
+}
+
+/*
+ * Individual logging of AccessExclusiveLocks for use during LockAcquire()
+ */
+void
+LogAccessExclusiveLock(Oid dbOid, Oid relOid)
+{
+ xl_standby_lock xlrec;
+
+ /*
+ * Ensure that a TransactionId has been assigned to this transaction.
+ * We don't actually need the xid yet but if we don't do this then
+ * RecordTransactionCommit() and RecordTransactionAbort() will optimise
+ * away the transaction completion record which recovery relies upon to
+ * release locks. It's a hack, but it covers a corner case that isn't
+ * worth adding extra code to the main commit path for.
+ */
+ xlrec.xid = GetTopTransactionId();
+
+ /*
+ * We log only dbOid and relOid, the values decoded from the original
+ * locktag, to avoid sending lots of empty bytes with every record.
+ * See lock.h for how a locktag is defined for LOCKTAG_RELATION.
+ */
+ xlrec.dbOid = dbOid;
+ xlrec.relOid = relOid;
+
+ LogAccessExclusiveLocks(1, &xlrec);
+}
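+
+/*
+ * For illustration only: the lock manager is expected to call this from
+ * LockAcquire() processing when a relation-level AccessExclusiveLock is
+ * granted outside recovery, roughly as below (the exact conditions and
+ * field names are assumed, not part of this patch):
+ *
+ *		if (lockmode >= AccessExclusiveLock &&
+ *			locktag->locktag_type == LOCKTAG_RELATION &&
+ *			!RecoveryInProgress() &&
+ *			XLogStandbyInfoActive())
+ *			LogAccessExclusiveLock(locktag->locktag_field1,
+ *								   locktag->locktag_field2);
+ */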