aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/ipc
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/ipc')
-rw-r--r--src/backend/storage/ipc/procarray.c222
-rw-r--r--src/backend/storage/ipc/standby.c30
2 files changed, 231 insertions, 21 deletions
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index eac418442d3..3376a353a40 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -50,11 +50,13 @@
#include "access/transam.h"
#include "access/xact.h"
#include "access/twophase.h"
+#include "catalog/catalog.h"
#include "miscadmin.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/spin.h"
#include "utils/builtins.h"
+#include "utils/rel.h"
#include "utils/snapmgr.h"
@@ -84,6 +86,8 @@ typedef struct ProcArrayStruct
/* oldest xmin of any replication slot */
TransactionId replication_slot_xmin;
+ /* oldest catalog xmin of any replication slot */
+ TransactionId replication_slot_catalog_xmin;
/*
* We declare pgprocnos[] as 1 entry because C wants a fixed-size array,
@@ -1108,21 +1112,22 @@ TransactionIdIsActive(TransactionId xid)
* GetOldestXmin -- returns oldest transaction that was running
* when any current transaction was started.
*
- * If allDbs is TRUE then all backends are considered; if allDbs is FALSE
- * then only backends running in my own database are considered.
+ * If rel is NULL or a shared relation, all backends are considered, otherwise
+ * only backends running in this database are considered.
*
* If ignoreVacuum is TRUE then backends with the PROC_IN_VACUUM flag set are
* ignored.
*
- * This is used by VACUUM to decide which deleted tuples must be preserved
- * in a table. allDbs = TRUE is needed for shared relations, but allDbs =
- * FALSE is sufficient for non-shared relations, since only backends in my
- * own database could ever see the tuples in them. Also, we can ignore
- * concurrently running lazy VACUUMs because (a) they must be working on other
- * tables, and (b) they don't need to do snapshot-based lookups.
+ * This is used by VACUUM to decide which deleted tuples must be preserved in
+ * the passed in table. For shared relations backends in all databases must be
+ * considered, but for non-shared relations that's not required, since only
+ * backends in my own database could ever see the tuples in them. Also, we can
+ * ignore concurrently running lazy VACUUMs because (a) they must be working
+ * on other tables, and (b) they don't need to do snapshot-based lookups.
*
- * This is also used to determine where to truncate pg_subtrans. allDbs
- * must be TRUE for that case, and ignoreVacuum FALSE.
+ * This is also used to determine where to truncate pg_subtrans. For that
+ * backends in all databases have to be considered, so rel = NULL has to be
+ * passed in.
*
* Note: we include all currently running xids in the set of considered xids.
* This ensures that if a just-started xact has not yet set its snapshot,
@@ -1133,7 +1138,7 @@ TransactionIdIsActive(TransactionId xid)
* backwards on repeated calls. The calculated value is conservative, so that
* anything older is definitely not considered as running by anyone anymore,
* but the exact value calculated depends on a number of things. For example,
- * if allDbs is FALSE and there are no transactions running in the current
+ * if rel is non-shared and there are no transactions running in the current
* database, GetOldestXmin() returns latestCompletedXid. If a transaction
* begins after that, its xmin will include in-progress transactions in other
* databases that started earlier, so another call will return a lower value.
@@ -1152,12 +1157,22 @@ TransactionIdIsActive(TransactionId xid)
* GetOldestXmin() move backwards, with no consequences for data integrity.
*/
TransactionId
-GetOldestXmin(bool allDbs, bool ignoreVacuum)
+GetOldestXmin(Relation rel, bool ignoreVacuum)
{
ProcArrayStruct *arrayP = procArray;
TransactionId result;
int index;
+ bool allDbs;
+
volatile TransactionId replication_slot_xmin = InvalidTransactionId;
+ volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+
+ /*
+ * If we're not computing a relation specific limit, or if a shared
+ * relation has been passed in, backends in all databases have to be
+ * considered.
+ */
+ allDbs = rel == NULL || rel->rd_rel->relisshared;
/* Cannot look for individual databases during recovery */
Assert(allDbs || !RecoveryInProgress());
@@ -1180,6 +1195,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
volatile PGPROC *proc = &allProcs[pgprocno];
volatile PGXACT *pgxact = &allPgXact[pgprocno];
+ /*
+ * Backend is doing logical decoding which manages xmin separately,
+ * check below.
+ */
+ if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
+ continue;
+
if (ignoreVacuum && (pgxact->vacuumFlags & PROC_IN_VACUUM))
continue;
@@ -1211,6 +1233,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
/* fetch into volatile var while ProcArrayLock is held */
replication_slot_xmin = procArray->replication_slot_xmin;
+ replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
if (RecoveryInProgress())
{
@@ -1259,6 +1282,18 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
NormalTransactionIdPrecedes(replication_slot_xmin, result))
result = replication_slot_xmin;
+ /*
+ * After locks have been released and defer_cleanup_age has been applied,
+ * check whether we need to back up further to make logical decoding
+ * possible. We need to do so if we're computing the global limit (rel =
+ * NULL) or if the passed relation is a catalog relation of some kind.
+ */
+ if ((rel == NULL ||
+ RelationIsAccessibleInLogicalDecoding(rel)) &&
+ TransactionIdIsValid(replication_slot_catalog_xmin) &&
+ NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
+ result = replication_slot_catalog_xmin;
+
return result;
}
@@ -1313,6 +1348,8 @@ GetMaxSnapshotSubxidCount(void)
* RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
* running transactions, except those running LAZY VACUUM). This is
* the same computation done by GetOldestXmin(true, true).
+ * RecentGlobalDataXmin: the global xmin for non-catalog tables
+ * >= RecentGlobalXmin
*
* Note: this function should probably not be called with an argument that's
* not statically allocated (see xip allocation below).
@@ -1329,6 +1366,7 @@ GetSnapshotData(Snapshot snapshot)
int subcount = 0;
bool suboverflowed = false;
volatile TransactionId replication_slot_xmin = InvalidTransactionId;
+ volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
Assert(snapshot != NULL);
@@ -1397,6 +1435,13 @@ GetSnapshotData(Snapshot snapshot)
volatile PGXACT *pgxact = &allPgXact[pgprocno];
TransactionId xid;
+ /*
+ * Backend is doing logical decoding which manages xmin
+ * separately, check below.
+ */
+ if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
+ continue;
+
/* Ignore procs running LAZY VACUUM */
if (pgxact->vacuumFlags & PROC_IN_VACUUM)
continue;
@@ -1509,6 +1554,7 @@ GetSnapshotData(Snapshot snapshot)
/* fetch into volatile var while ProcArrayLock is held */
replication_slot_xmin = procArray->replication_slot_xmin;
+ replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
if (!TransactionIdIsValid(MyPgXact->xmin))
MyPgXact->xmin = TransactionXmin = xmin;
@@ -1533,6 +1579,17 @@ GetSnapshotData(Snapshot snapshot)
NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
RecentGlobalXmin = replication_slot_xmin;
+ /* Non-catalog tables can be vacuumed if older than this xid */
+ RecentGlobalDataXmin = RecentGlobalXmin;
+
+ /*
+ * Check whether there's a replication slot requiring an older catalog
+ * xmin.
+ */
+ if (TransactionIdIsNormal(replication_slot_catalog_xmin) &&
+ NormalTransactionIdPrecedes(replication_slot_catalog_xmin, RecentGlobalXmin))
+ RecentGlobalXmin = replication_slot_catalog_xmin;
+
RecentXmin = xmin;
snapshot->xmin = xmin;
@@ -1633,9 +1690,11 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
* Similar to GetSnapshotData but returns more information. We include
* all PGXACTs with an assigned TransactionId, even VACUUM processes.
*
- * We acquire XidGenLock, but the caller is responsible for releasing it.
- * This ensures that no new XIDs enter the proc array until the caller has
- * WAL-logged this snapshot, and releases the lock.
+ * We acquire XidGenLock and ProcArrayLock, but the caller is responsible for
+ * releasing them. Acquiring XidGenLock ensures that no new XIDs enter the proc
+ * array until the caller has WAL-logged this snapshot, and releases the
+ * lock. Acquiring ProcArrayLock ensures that no transactions commit until the
+ * lock is released.
*
* The returned data structure is statically allocated; caller should not
* modify it, and must not assume it is valid past the next call.
@@ -1770,6 +1829,15 @@ GetRunningTransactionData(void)
}
}
+ /*
+ * It's important *not* to include the limits set by slots here because
+ * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
+ * were to be included here the initial value could never increase because
+ * of a circular dependency where slots only increase their limits when
+ * running xacts increases oldestRunningXid and running xacts only
+ * increases if slots do.
+ */
+
CurrentRunningXacts->xcnt = count - subcount;
CurrentRunningXacts->subxcnt = subcount;
CurrentRunningXacts->subxid_overflow = suboverflowed;
@@ -1777,13 +1845,12 @@ GetRunningTransactionData(void)
CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
- /* We don't release XidGenLock here, the caller is responsible for that */
- LWLockRelease(ProcArrayLock);
-
Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));
+ /* We don't release the locks here, the caller is responsible for that */
+
return CurrentRunningXacts;
}
@@ -1853,6 +1920,92 @@ GetOldestActiveTransactionId(void)
}
/*
+ * GetOldestSafeDecodingTransactionId -- lowest xid not affected by vacuum
+ *
+ * Returns the oldest xid that we can guarantee not to have been affected by
+ * vacuum, i.e. no rows >= that xid have been vacuumed away unless the
+ * transaction aborted. Note that the value can (and most of the time will) be
+ * much more conservative than what really has been affected by vacuum, but we
+ * currently don't have better data available.
+ *
+ * This is useful to initialize the cutoff xid after which a new changeset
+ * extraction replication slot can start decoding changes.
+ *
+ * Must be called with ProcArrayLock held either shared or exclusively,
+ * although most callers will want to use exclusive mode since it is expected
+ * that the caller will immediately use the xid to peg the xmin horizon.
+ */
+TransactionId
+GetOldestSafeDecodingTransactionId(void)
+{
+ ProcArrayStruct *arrayP = procArray;
+ TransactionId oldestSafeXid;
+ int index;
+ bool recovery_in_progress = RecoveryInProgress();
+
+ Assert(LWLockHeldByMe(ProcArrayLock));
+
+ /*
+ * Acquire XidGenLock, so no transactions can acquire an xid while we're
+ * running. If no transaction with xid were running concurrently a new xid
+ * could influence the RecentXmin et al.
+ *
+ * We initialize the computation to nextXid since that's guaranteed to be
+ * a safe, albeit pessimal, value.
+ */
+ LWLockAcquire(XidGenLock, LW_SHARED);
+ oldestSafeXid = ShmemVariableCache->nextXid;
+
+ /*
+ * If there's already a slot pegging the xmin horizon, we can start with
+ * that value, it's guaranteed to be safe since it's computed by this
+ * routine initially and has been enforced since.
+ */
+ if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
+ TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
+ oldestSafeXid))
+ oldestSafeXid = procArray->replication_slot_catalog_xmin;
+
+ /*
+ * If we're not in recovery, we walk over the procarray and collect the
+ * lowest xid. Since we're called with ProcArrayLock held and have
+ * acquired XidGenLock, no entries can vanish concurrently, since
+ * PGXACT->xid is only set with XidGenLock held and only cleared with
+ * ProcArrayLock held.
+ *
+ * In recovery we can't lower the safe value besides what we've computed
+ * above, so we'll have to wait a bit longer there. We unfortunately can
+ * *not* use KnownAssignedXidsGetOldestXmin() since the KnownAssignedXids
+ * machinery can miss values and return an older value than is safe.
+ */
+ if (!recovery_in_progress)
+ {
+ /*
+ * Spin over procArray collecting all min(PGXACT->xid)
+ */
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
+ TransactionId xid;
+
+ /* Fetch xid just once - see GetNewTransactionId */
+ xid = pgxact->xid;
+
+ if (!TransactionIdIsNormal(xid))
+ continue;
+
+ if (TransactionIdPrecedes(xid, oldestSafeXid))
+ oldestSafeXid = xid;
+ }
+ }
+
+ LWLockRelease(XidGenLock);
+
+ return oldestSafeXid;
+}
+
+/*
* GetVirtualXIDsDelayingChkpt -- Get the VXIDs of transactions that are
* delaying checkpoint because they have critical actions in progress.
*
@@ -2523,10 +2676,39 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
* replicaton slots.
*/
void
-ProcArraySetReplicationSlotXmin(TransactionId xmin)
+ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
+ bool already_locked)
{
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ Assert(!already_locked || LWLockHeldByMe(ProcArrayLock));
+
+ if (!already_locked)
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
procArray->replication_slot_xmin = xmin;
+ procArray->replication_slot_catalog_xmin = catalog_xmin;
+
+ if (!already_locked)
+ LWLockRelease(ProcArrayLock);
+}
+
+/*
+ * ProcArrayGetReplicationSlotXmin
+ *
+ * Return the current slot xmin limits. That's useful to be able to remove
+ * data that's older than those limits.
+ */
+void
+ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
+ TransactionId *catalog_xmin)
+{
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+ if (xmin != NULL)
+ *xmin = procArray->replication_slot_xmin;
+
+ if (catalog_xmin != NULL)
+ *catalog_xmin = procArray->replication_slot_catalog_xmin;
+
LWLockRelease(ProcArrayLock);
}
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index fb5f18edfc7..aa8bea5538b 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -800,7 +800,9 @@ standby_redo(XLogRecPtr lsn, XLogRecord *record)
/*
* Log details of the current snapshot to WAL. This allows the snapshot state
- * to be reconstructed on the standby.
+ * to be reconstructed on the standby and for logical decoding.
+ *
+ * This is used for Hot Standby as follows:
*
* We can move directly to STANDBY_SNAPSHOT_READY at startup if we
* start from a shutdown checkpoint because we know nothing was running
@@ -854,6 +856,12 @@ standby_redo(XLogRecPtr lsn, XLogRecord *record)
* Zero xids should no longer be possible, but we may be replaying WAL
* from a time when they were possible.
*
+ * For logical decoding only the running xacts information is needed;
+ * there's no need to look at the locking information, but it's logged anyway,
+ * as there's no independent knob to just enable logical decoding. For
+ * details of how this is used, check snapbuild.c's introductory comment.
+ *
+ *
* Returns the RecPtr of the last inserted record.
*/
XLogRecPtr
@@ -879,8 +887,28 @@ LogStandbySnapshot(void)
* record we write, because standby will open up when it sees this.
*/
running = GetRunningTransactionData();
+
+ /*
+ * GetRunningTransactionData() acquired ProcArrayLock, we must release
+ * it. For Hot Standby this can be done before inserting the WAL record
+ * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
+ * the clog. For logical decoding, though, the lock can't be released
+ * early because the clog might be "in the future" from the POV of the
+ * historic snapshot. This would allow for situations where we're waiting
+ * for the end of a transaction listed in the xl_running_xacts record
+ * which, according to the WAL, has committed before the xl_running_xacts
+ * record. Fortunately this routine isn't executed frequently, and it's
+ * only a shared lock.
+ */
+ if (wal_level < WAL_LEVEL_LOGICAL)
+ LWLockRelease(ProcArrayLock);
+
recptr = LogCurrentRunningXacts(running);
+ /* Release lock if we kept it longer ... */
+ if (wal_level >= WAL_LEVEL_LOGICAL)
+ LWLockRelease(ProcArrayLock);
+
/* GetRunningTransactionData() acquired XidGenLock, we must release it */
LWLockRelease(XidGenLock);