aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/commit_ts.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/commit_ts.c')
-rw-r--r--src/backend/access/transam/commit_ts.c149
1 files changed, 79 insertions, 70 deletions
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
index 24b82910835..b21a31345f7 100644
--- a/src/backend/access/transam/commit_ts.c
+++ b/src/backend/access/transam/commit_ts.c
@@ -78,13 +78,21 @@ static SlruCtlData CommitTsCtlData;
#define CommitTsCtl (&CommitTsCtlData)
/*
- * We keep a cache of the last value set in shared memory. This is protected
- * by CommitTsLock.
+ * We keep a cache of the last value set in shared memory.
+ *
+ * This is also good place to keep the activation status. We keep this
+ * separate from the GUC so that the standby can activate the module if the
+ * primary has it active independently of the value of the GUC.
+ *
+ * This is protected by CommitTsLock. In some places, we use commitTsActive
+ * without acquiring the lock; where this happens, a comment explains the
+ * rationale for it.
*/
typedef struct CommitTimestampShared
{
TransactionId xidLastCommit;
CommitTimestampEntry dataLastCommit;
+ bool commitTsActive;
} CommitTimestampShared;
CommitTimestampShared *commitTsShared;
@@ -93,14 +101,6 @@ CommitTimestampShared *commitTsShared;
/* GUC variable */
bool track_commit_timestamp;
-/*
- * When this is set, commit_ts is force-enabled during recovery. This is so
- * that a standby can replay WAL records coming from a master with the setting
- * enabled. (Note that this doesn't enable SQL access to the data; it's
- * effectively write-only until the GUC itself is enabled.)
- */
-static bool enable_during_recovery;
-
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts,
RepOriginId nodeid, int pageno);
@@ -109,7 +109,7 @@ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
static int ZeroCommitTsPage(int pageno, bool writeXlog);
static bool CommitTsPagePrecedes(int page1, int page2);
static void ActivateCommitTs(void);
-static void DeactivateCommitTs(bool do_wal);
+static void DeactivateCommitTs(void);
static void WriteZeroPageXlogRec(int pageno);
static void WriteTruncateXlogRec(int pageno);
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
@@ -149,10 +149,14 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
TransactionId newestXact;
/*
- * No-op if the module is not enabled, but allow writes in a standby
- * during recovery.
+ * No-op if the module is not active.
+ *
+ * An unlocked read here is fine, because in a standby (the only place
+ * where the flag can change in flight) this routine is only called by
+ * the recovery process, which is also the only process which can change
+ * the flag.
*/
- if (!track_commit_timestamp && !enable_during_recovery)
+ if (!commitTsShared->commitTsActive)
return;
/*
@@ -283,30 +287,45 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
TransactionId oldestCommitTs;
TransactionId newestCommitTs;
+ /* error if the given Xid doesn't normally commit */
+ if (!TransactionIdIsNormal(xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
+
+ LWLockAcquire(CommitTsLock, LW_SHARED);
+
/* Error if module not enabled */
- if (!track_commit_timestamp)
+ if (!commitTsShared->commitTsActive)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not get commit timestamp data"),
errhint("Make sure the configuration parameter \"%s\" is set.",
"track_commit_timestamp")));
- /* error if the given Xid doesn't normally commit */
- if (!TransactionIdIsNormal(xid))
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
-
/*
- * Return empty if the requested value is outside our valid range.
+ * If we're asked for the cached value, return that. Otherwise, fall
+ * through to read from SLRU.
*/
- LWLockAcquire(CommitTsLock, LW_SHARED);
+ if (commitTsShared->xidLastCommit == xid)
+ {
+ *ts = commitTsShared->dataLastCommit.time;
+ if (nodeid)
+ *nodeid = commitTsShared->dataLastCommit.nodeid;
+
+ LWLockRelease(CommitTsLock);
+ return *ts != 0;
+ }
+
oldestCommitTs = ShmemVariableCache->oldestCommitTs;
newestCommitTs = ShmemVariableCache->newestCommitTs;
/* neither is invalid, or both are */
Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
LWLockRelease(CommitTsLock);
+ /*
+ * Return empty if the requested value is outside our valid range.
+ */
if (!TransactionIdIsValid(oldestCommitTs) ||
TransactionIdPrecedes(xid, oldestCommitTs) ||
TransactionIdPrecedes(newestCommitTs, xid))
@@ -317,27 +336,6 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
return false;
}
- /*
- * Use an unlocked atomic read on our cached value in shared memory; if
- * it's a hit, acquire a lock and read the data, after verifying that it's
- * still what we initially read. Otherwise, fall through to read from
- * SLRU.
- */
- if (commitTsShared->xidLastCommit == xid)
- {
- LWLockAcquire(CommitTsLock, LW_SHARED);
- if (commitTsShared->xidLastCommit == xid)
- {
- *ts = commitTsShared->dataLastCommit.time;
- if (nodeid)
- *nodeid = commitTsShared->dataLastCommit.nodeid;
-
- LWLockRelease(CommitTsLock);
- return *ts != 0;
- }
- LWLockRelease(CommitTsLock);
- }
-
/* lock is acquired by SimpleLruReadPage_ReadOnly */
slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
memcpy(&entry,
@@ -366,15 +364,16 @@ GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
{
TransactionId xid;
+ LWLockAcquire(CommitTsLock, LW_SHARED);
+
/* Error if module not enabled */
- if (!track_commit_timestamp)
+ if (!commitTsShared->commitTsActive)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not get commit timestamp data"),
errhint("Make sure the configuration parameter \"%s\" is set.",
"track_commit_timestamp")));
- LWLockAcquire(CommitTsLock, LW_SHARED);
xid = commitTsShared->xidLastCommit;
if (ts)
*ts = commitTsShared->dataLastCommit.time;
@@ -493,6 +492,7 @@ CommitTsShmemInit(void)
commitTsShared->xidLastCommit = InvalidTransactionId;
TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+ commitTsShared->commitTsActive = false;
}
else
Assert(found);
@@ -566,7 +566,7 @@ CompleteCommitTsInitialization(void)
* any leftover data.
*/
if (!track_commit_timestamp)
- DeactivateCommitTs(true);
+ DeactivateCommitTs();
}
/*
@@ -588,11 +588,11 @@ CommitTsParameterChange(bool newvalue, bool oldvalue)
*/
if (newvalue)
{
- if (!track_commit_timestamp && !oldvalue)
+ if (!commitTsShared->commitTsActive)
ActivateCommitTs();
}
- else if (!track_commit_timestamp && oldvalue)
- DeactivateCommitTs(false);
+ else if (commitTsShared->commitTsActive)
+ DeactivateCommitTs();
}
/*
@@ -645,7 +645,7 @@ ActivateCommitTs(void)
}
LWLockRelease(CommitTsLock);
- /* Finally, create the current segment file, if necessary */
+ /* Create the current segment file, if necessary */
if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
{
int slotno;
@@ -657,8 +657,10 @@ ActivateCommitTs(void)
LWLockRelease(CommitTsControlLock);
}
- /* We can now replay xlog records from this module */
- enable_during_recovery = true;
+ /* Change the activation status in shared memory. */
+ LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+ commitTsShared->commitTsActive = true;
+ LWLockRelease(CommitTsLock);
}
/*
@@ -672,21 +674,25 @@ ActivateCommitTs(void)
* possibly-invalid data; also removes segments of old data.
*/
static void
-DeactivateCommitTs(bool do_wal)
+DeactivateCommitTs(void)
{
- TransactionId xid = ShmemVariableCache->nextXid;
- int pageno = TransactionIdToCTsPage(xid);
-
/*
- * Re-Initialize our idea of the latest page number.
+ * Cleanup the status in the shared memory.
+ *
+ * We reset everything in the commitTsShared record to prevent user from
+ * getting confusing data about last committed transaction on the standby
+ * when the module was activated repeatedly on the primary.
*/
- LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
- CommitTsCtl->shared->latest_page_number = pageno;
- LWLockRelease(CommitTsControlLock);
-
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+
+ commitTsShared->commitTsActive = false;
+ commitTsShared->xidLastCommit = InvalidTransactionId;
+ TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
+ commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+
ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
ShmemVariableCache->newestCommitTs = InvalidTransactionId;
+
LWLockRelease(CommitTsLock);
/*
@@ -697,10 +703,9 @@ DeactivateCommitTs(bool do_wal)
* be overwritten anyway when we wrap around, but it seems better to be
* tidy.)
*/
+ LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
(void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
-
- /* No longer enabled on recovery */
- enable_during_recovery = false;
+ LWLockRelease(CommitTsControlLock);
}
/*
@@ -739,8 +744,13 @@ ExtendCommitTs(TransactionId newestXact)
{
int pageno;
- /* nothing to do if module not enabled */
- if (!track_commit_timestamp && !enable_during_recovery)
+ /*
+ * Nothing to do if module not enabled. Note we do an unlocked read of the
+ * flag here, which is okay because this routine is only called from
+ * GetNewTransactionId, which is never called in a standby.
+ */
+ Assert(!InRecovery);
+ if (!commitTsShared->commitTsActive)
return;
/*
@@ -768,7 +778,7 @@ ExtendCommitTs(TransactionId newestXact)
* Note that we don't need to flush XLOG here.
*/
void
-TruncateCommitTs(TransactionId oldestXact, bool do_wal)
+TruncateCommitTs(TransactionId oldestXact)
{
int cutoffPage;
@@ -784,8 +794,7 @@ TruncateCommitTs(TransactionId oldestXact, bool do_wal)
return; /* nothing to remove */
/* Write XLOG record */
- if (do_wal)
- WriteTruncateXlogRec(cutoffPage);
+ WriteTruncateXlogRec(cutoffPage);
/* Now we can remove the old CommitTs segment(s) */
SimpleLruTruncate(CommitTsCtl, cutoffPage);