diff options
Diffstat (limited to 'src/backend/storage/ipc')
-rw-r--r-- | src/backend/storage/ipc/procarray.c | 222 | ||||
-rw-r--r-- | src/backend/storage/ipc/standby.c | 30 |
2 files changed, 231 insertions, 21 deletions
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index eac418442d3..3376a353a40 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -50,11 +50,13 @@ #include "access/transam.h" #include "access/xact.h" #include "access/twophase.h" +#include "catalog/catalog.h" #include "miscadmin.h" #include "storage/proc.h" #include "storage/procarray.h" #include "storage/spin.h" #include "utils/builtins.h" +#include "utils/rel.h" #include "utils/snapmgr.h" @@ -84,6 +86,8 @@ typedef struct ProcArrayStruct /* oldest xmin of any replication slot */ TransactionId replication_slot_xmin; + /* oldest catalog xmin of any replication slot */ + TransactionId replication_slot_catalog_xmin; /* * We declare pgprocnos[] as 1 entry because C wants a fixed-size array, @@ -1108,21 +1112,22 @@ TransactionIdIsActive(TransactionId xid) * GetOldestXmin -- returns oldest transaction that was running * when any current transaction was started. * - * If allDbs is TRUE then all backends are considered; if allDbs is FALSE - * then only backends running in my own database are considered. + * If rel is NULL or a shared relation, all backends are considered, otherwise + * only backends running in this database are considered. * * If ignoreVacuum is TRUE then backends with the PROC_IN_VACUUM flag set are * ignored. * - * This is used by VACUUM to decide which deleted tuples must be preserved - * in a table. allDbs = TRUE is needed for shared relations, but allDbs = - * FALSE is sufficient for non-shared relations, since only backends in my - * own database could ever see the tuples in them. Also, we can ignore - * concurrently running lazy VACUUMs because (a) they must be working on other - * tables, and (b) they don't need to do snapshot-based lookups. + * This is used by VACUUM to decide which deleted tuples must be preserved in + * the passed in table. 
For shared relations backends in all databases must be + * considered, but for non-shared relations that's not required, since only + * backends in my own database could ever see the tuples in them. Also, we can + * ignore concurrently running lazy VACUUMs because (a) they must be working + * on other tables, and (b) they don't need to do snapshot-based lookups. * - * This is also used to determine where to truncate pg_subtrans. allDbs - * must be TRUE for that case, and ignoreVacuum FALSE. + * This is also used to determine where to truncate pg_subtrans. For that + * backends in all databases have to be considered, so rel = NULL has to be + * passed in. * * Note: we include all currently running xids in the set of considered xids. * This ensures that if a just-started xact has not yet set its snapshot, @@ -1133,7 +1138,7 @@ TransactionIdIsActive(TransactionId xid) * backwards on repeated calls. The calculated value is conservative, so that * anything older is definitely not considered as running by anyone anymore, * but the exact value calculated depends on a number of things. For example, - * if allDbs is FALSE and there are no transactions running in the current + * if rel is a non-shared relation and there are no transactions running in the current * database, GetOldestXmin() returns latestCompletedXid. If a transaction * begins after that, its xmin will include in-progress transactions in other * databases that started earlier, so another call will return a lower value. @@ -1152,12 +1157,22 @@ TransactionIdIsActive(TransactionId xid) * GetOldestXmin() move backwards, with no consequences for data integrity. 
*/ TransactionId -GetOldestXmin(bool allDbs, bool ignoreVacuum) +GetOldestXmin(Relation rel, bool ignoreVacuum) { ProcArrayStruct *arrayP = procArray; TransactionId result; int index; + bool allDbs; + volatile TransactionId replication_slot_xmin = InvalidTransactionId; + volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId; + + /* + * If we're not computing a relation specific limit, or if a shared + * relation has been passed in, backends in all databases have to be + * considered. + */ + allDbs = rel == NULL || rel->rd_rel->relisshared; /* Cannot look for individual databases during recovery */ Assert(allDbs || !RecoveryInProgress()); @@ -1180,6 +1195,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) volatile PGPROC *proc = &allProcs[pgprocno]; volatile PGXACT *pgxact = &allPgXact[pgprocno]; + /* + * Backend is doing logical decoding which manages xmin separately, + * check below. + */ + if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING) + continue; + if (ignoreVacuum && (pgxact->vacuumFlags & PROC_IN_VACUUM)) continue; @@ -1211,6 +1233,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) /* fetch into volatile var while ProcArrayLock is held */ replication_slot_xmin = procArray->replication_slot_xmin; + replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; if (RecoveryInProgress()) { @@ -1259,6 +1282,18 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) NormalTransactionIdPrecedes(replication_slot_xmin, result)) result = replication_slot_xmin; + /* + * After locks have been released and defer_cleanup_age has been applied, + * check whether we need to back up further to make logical decoding + * possible. We need to do so if we're computing the global limit (rel = + * NULL) or if the passed relation is a catalog relation of some kind. 
+ */ + if ((rel == NULL || + RelationIsAccessibleInLogicalDecoding(rel)) && + TransactionIdIsValid(replication_slot_catalog_xmin) && + NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result)) + result = replication_slot_catalog_xmin; + return result; } @@ -1313,6 +1348,8 @@ GetMaxSnapshotSubxidCount(void) * RecentGlobalXmin: the global xmin (oldest TransactionXmin across all * running transactions, except those running LAZY VACUUM). This is * the same computation done by GetOldestXmin(true, true). + * RecentGlobalDataXmin: the global xmin for non-catalog tables + * >= RecentGlobalXmin * * Note: this function should probably not be called with an argument that's * not statically allocated (see xip allocation below). @@ -1329,6 +1366,7 @@ GetSnapshotData(Snapshot snapshot) int subcount = 0; bool suboverflowed = false; volatile TransactionId replication_slot_xmin = InvalidTransactionId; + volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId; Assert(snapshot != NULL); @@ -1397,6 +1435,13 @@ GetSnapshotData(Snapshot snapshot) volatile PGXACT *pgxact = &allPgXact[pgprocno]; TransactionId xid; + /* + * Backend is doing logical decoding which manages xmin + * separately, check below. 
+ */ + if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING) + continue; + /* Ignore procs running LAZY VACUUM */ if (pgxact->vacuumFlags & PROC_IN_VACUUM) continue; @@ -1509,6 +1554,7 @@ GetSnapshotData(Snapshot snapshot) /* fetch into volatile var while ProcArrayLock is held */ replication_slot_xmin = procArray->replication_slot_xmin; + replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; if (!TransactionIdIsValid(MyPgXact->xmin)) MyPgXact->xmin = TransactionXmin = xmin; @@ -1533,6 +1579,17 @@ GetSnapshotData(Snapshot snapshot) NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin)) RecentGlobalXmin = replication_slot_xmin; + /* Non-catalog tables can be vacuumed if older than this xid */ + RecentGlobalDataXmin = RecentGlobalXmin; + + /* + * Check whether there's a replication slot requiring an older catalog + * xmin. + */ + if (TransactionIdIsNormal(replication_slot_catalog_xmin) && + NormalTransactionIdPrecedes(replication_slot_catalog_xmin, RecentGlobalXmin)) + RecentGlobalXmin = replication_slot_catalog_xmin; + RecentXmin = xmin; snapshot->xmin = xmin; @@ -1633,9 +1690,11 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid) * Similar to GetSnapshotData but returns more information. We include * all PGXACTs with an assigned TransactionId, even VACUUM processes. * - * We acquire XidGenLock, but the caller is responsible for releasing it. - * This ensures that no new XIDs enter the proc array until the caller has - * WAL-logged this snapshot, and releases the lock. + * We acquire XidGenLock and ProcArrayLock, but the caller is responsible for + * releasing them. Acquiring XidGenLock ensures that no new XIDs enter the proc + * array until the caller has WAL-logged this snapshot, and releases the + * lock. Acquiring ProcArrayLock ensures that no transactions commit until the + * lock is released. 
* * The returned data structure is statically allocated; caller should not * modify it, and must not assume it is valid past the next call. @@ -1770,6 +1829,15 @@ GetRunningTransactionData(void) } } + /* + * It's important *not* to include the limits set by slots here because + * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those + * were to be included here the initial value could never increase because + * of a circular dependency where slots only increase their limits when + * running xacts increases oldestRunningXid and running xacts only + * increases if slots do. + */ + CurrentRunningXacts->xcnt = count - subcount; CurrentRunningXacts->subxcnt = subcount; CurrentRunningXacts->subxid_overflow = suboverflowed; @@ -1777,13 +1845,12 @@ GetRunningTransactionData(void) CurrentRunningXacts->oldestRunningXid = oldestRunningXid; CurrentRunningXacts->latestCompletedXid = latestCompletedXid; - /* We don't release XidGenLock here, the caller is responsible for that */ - LWLockRelease(ProcArrayLock); - Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid)); Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid)); Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid)); + /* We don't release the locks here, the caller is responsible for that */ + return CurrentRunningXacts; } @@ -1853,6 +1920,92 @@ GetOldestActiveTransactionId(void) } /* + * GetOldestSafeDecodingTransactionId -- lowest xid not affected by vacuum + * + * Returns the oldest xid that we can guarantee not to have been affected by + * vacuum, i.e. no rows >= that xid have been vacuumed away unless the + * transaction aborted. Note that the value can (and most of the time will) be + * much more conservative than what really has been affected by vacuum, but we + * currently don't have better data available. + * + * This is useful to initialize the cutoff xid after which a new changeset + * extraction replication slot can start decoding changes. 
+ * + * Must be called with ProcArrayLock held either shared or exclusively, + * although most callers will want to use exclusive mode since it is expected + * that the caller will immediately use the xid to peg the xmin horizon. + */ +TransactionId +GetOldestSafeDecodingTransactionId(void) +{ + ProcArrayStruct *arrayP = procArray; + TransactionId oldestSafeXid; + int index; + bool recovery_in_progress = RecoveryInProgress(); + + Assert(LWLockHeldByMe(ProcArrayLock)); + + /* + * Acquire XidGenLock, so no transactions can acquire an xid while we're + * running. If no transaction with xid were running concurrently a new xid + * could influence the RecentXmin et al. + * + * We initialize the computation to nextXid since that's guaranteed to be + * a safe, albeit pessimal, value. + */ + LWLockAcquire(XidGenLock, LW_SHARED); + oldestSafeXid = ShmemVariableCache->nextXid; + + /* + * If there's already a slot pegging the xmin horizon, we can start with + * that value, it's guaranteed to be safe since it's computed by this + * routine initially and has been enforced since. + */ + if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) && + TransactionIdPrecedes(procArray->replication_slot_catalog_xmin, + oldestSafeXid)) + oldestSafeXid = procArray->replication_slot_catalog_xmin; + + /* + * If we're not in recovery, we walk over the procarray and collect the + * lowest xid. Since we're called with ProcArrayLock held and have + * acquired XidGenLock, no entries can vanish concurrently, since + * PGXACT->xid is only set with XidGenLock held and only cleared with + * ProcArrayLock held. + * + * In recovery we can't lower the safe value besides what we've computed + * above, so we'll have to wait a bit longer there. We unfortunately can + * *not* use KnownAssignedXidsGetOldestXmin() since the KnownAssignedXids + * machinery can miss values and return an older value than is safe. 
+ */ + if (!recovery_in_progress) + { + /* + * Spin over procArray collecting all min(PGXACT->xid) + */ + for (index = 0; index < arrayP->numProcs; index++) + { + int pgprocno = arrayP->pgprocnos[index]; + volatile PGXACT *pgxact = &allPgXact[pgprocno]; + TransactionId xid; + + /* Fetch xid just once - see GetNewTransactionId */ + xid = pgxact->xid; + + if (!TransactionIdIsNormal(xid)) + continue; + + if (TransactionIdPrecedes(xid, oldestSafeXid)) + oldestSafeXid = xid; + } + } + + LWLockRelease(XidGenLock); + + return oldestSafeXid; +} + +/* * GetVirtualXIDsDelayingChkpt -- Get the VXIDs of transactions that are * delaying checkpoint because they have critical actions in progress. * @@ -2523,10 +2676,39 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared) * replicaton slots. */ void -ProcArraySetReplicationSlotXmin(TransactionId xmin) +ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, + bool already_locked) { - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + Assert(!already_locked || LWLockHeldByMe(ProcArrayLock)); + + if (!already_locked) + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + procArray->replication_slot_xmin = xmin; + procArray->replication_slot_catalog_xmin = catalog_xmin; + + if (!already_locked) + LWLockRelease(ProcArrayLock); +} + +/* + * ProcArrayGetReplicationSlotXmin + * + * Return the current slot xmin limits. That's useful to be able to remove + * data that's older than those limits. 
+ */ +void +ProcArrayGetReplicationSlotXmin(TransactionId *xmin, + TransactionId *catalog_xmin) +{ + LWLockAcquire(ProcArrayLock, LW_SHARED); + + if (xmin != NULL) + *xmin = procArray->replication_slot_xmin; + + if (catalog_xmin != NULL) + *catalog_xmin = procArray->replication_slot_catalog_xmin; + LWLockRelease(ProcArrayLock); } diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index fb5f18edfc7..aa8bea5538b 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -800,7 +800,9 @@ standby_redo(XLogRecPtr lsn, XLogRecord *record) /* * Log details of the current snapshot to WAL. This allows the snapshot state - * to be reconstructed on the standby. + * to be reconstructed on the standby and for logical decoding. + * + * This is used for Hot Standby as follows: * * We can move directly to STANDBY_SNAPSHOT_READY at startup if we * start from a shutdown checkpoint because we know nothing was running @@ -854,6 +856,12 @@ standby_redo(XLogRecPtr lsn, XLogRecord *record) * Zero xids should no longer be possible, but we may be replaying WAL * from a time when they were possible. * + * For logical decoding only the running xacts information is needed; + * there's no need to look at the locking information, but it's logged anyway, + * as there's no independent knob to just enable logical decoding. For + * details of how this is used, check snapbuild.c's introductory comment. + * + * * Returns the RecPtr of the last inserted record. */ XLogRecPtr @@ -879,8 +887,28 @@ LogStandbySnapshot(void) * record we write, because standby will open up when it sees this. */ running = GetRunningTransactionData(); + + /* + * GetRunningTransactionData() acquired ProcArrayLock, we must release + * it. For Hot Standby this can be done before inserting the WAL record + * because ProcArrayApplyRecoveryInfo() rechecks the commit status using + * the clog. 
For logical decoding, though, the lock can't be released + * early because the clog might be "in the future" from the POV of the + * historic snapshot. This would allow for situations where we're waiting + * for the end of a transaction listed in the xl_running_xacts record + * which, according to the WAL, has committed before the xl_running_xacts + * record. Fortunately this routine isn't executed frequently, and it's + * only a shared lock. + */ + if (wal_level < WAL_LEVEL_LOGICAL) + LWLockRelease(ProcArrayLock); + recptr = LogCurrentRunningXacts(running); + /* Release lock if we kept it longer ... */ + if (wal_level >= WAL_LEVEL_LOGICAL) + LWLockRelease(ProcArrayLock); + /* GetRunningTransactionData() acquired XidGenLock, we must release it */ LWLockRelease(XidGenLock); |