aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/snapbuild.c14
-rw-r--r--src/backend/storage/ipc/procarray.c11
-rw-r--r--src/backend/storage/lmgr/predicate.c26
-rw-r--r--src/backend/utils/time/snapmgr.c119
-rw-r--r--src/include/storage/predicate.h4
-rw-r--r--src/include/storage/procarray.h2
6 files changed, 112 insertions, 64 deletions
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 8848f5b4ec1..e06aa0992a6 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -262,7 +262,7 @@ static bool ExportInProgress = false;
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
/* snapshot building/manipulation/distribution functions */
-static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid);
+static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
static void SnapBuildFreeSnapshot(Snapshot snap);
@@ -463,7 +463,7 @@ SnapBuildSnapDecRefcount(Snapshot snap)
* and ->subxip/subxcnt values.
*/
static Snapshot
-SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
+SnapBuildBuildSnapshot(SnapBuild *builder)
{
Snapshot snapshot;
Size ssize;
@@ -562,7 +562,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
if (TransactionIdIsValid(MyPgXact->xmin))
elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
- snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
+ snap = SnapBuildBuildSnapshot(builder);
/*
* We know that snap->xmin is alive, enforced by the logical xmin
@@ -679,7 +679,7 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
/* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL)
{
- builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+ builder->snapshot = SnapBuildBuildSnapshot(builder);
/* increase refcount for the snapshot builder */
SnapBuildSnapIncRefcount(builder->snapshot);
}
@@ -743,7 +743,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
/* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL)
{
- builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+ builder->snapshot = SnapBuildBuildSnapshot(builder);
/* increase refcount for the snapshot builder */
SnapBuildSnapIncRefcount(builder->snapshot);
}
@@ -1061,7 +1061,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
if (builder->snapshot)
SnapBuildSnapDecRefcount(builder->snapshot);
- builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+ builder->snapshot = SnapBuildBuildSnapshot(builder);
/* we might need to execute invalidations, add snapshot */
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
@@ -1831,7 +1831,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
{
SnapBuildSnapDecRefcount(builder->snapshot);
}
- builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidTransactionId);
+ builder->snapshot = SnapBuildBuildSnapshot(builder);
SnapBuildSnapIncRefcount(builder->snapshot);
ReorderBufferSetRestartPoint(builder->reorder, lsn);
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 8a715367918..dfddfc4002c 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1793,14 +1793,15 @@ GetSnapshotData(Snapshot snapshot)
* Returns TRUE if successful, FALSE if source xact is no longer running.
*/
bool
-ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
+ProcArrayInstallImportedXmin(TransactionId xmin,
+ VirtualTransactionId *sourcevxid)
{
bool result = false;
ProcArrayStruct *arrayP = procArray;
int index;
Assert(TransactionIdIsNormal(xmin));
- if (!TransactionIdIsNormal(sourcexid))
+ if (!sourcevxid)
return false;
/* Get lock so source xact can't end while we're doing this */
@@ -1817,8 +1818,10 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
if (pgxact->vacuumFlags & PROC_IN_VACUUM)
continue;
- xid = pgxact->xid; /* fetch just once */
- if (xid != sourcexid)
+ /* We are only interested in the specific virtual transaction. */
+ if (proc->backendId != sourcevxid->backendId)
+ continue;
+ if (proc->lxid != sourcevxid->localTransactionId)
continue;
/*
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 27c4af91cb2..bce505a3fac 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -148,7 +148,7 @@
* predicate lock maintenance
* GetSerializableTransactionSnapshot(Snapshot snapshot)
* SetSerializableTransactionSnapshot(Snapshot snapshot,
- * TransactionId sourcexid)
+ * VirtualTransactionId *sourcevxid)
* RegisterPredicateLockingXid(void)
* PredicateLockRelation(Relation relation, Snapshot snapshot)
* PredicateLockPage(Relation relation, BlockNumber blkno,
@@ -434,7 +434,8 @@ static uint32 predicatelock_hash(const void *key, Size keysize);
static void SummarizeOldestCommittedSxact(void);
static Snapshot GetSafeSnapshot(Snapshot snapshot);
static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
- TransactionId sourcexid);
+ VirtualTransactionId *sourcevxid,
+ int sourcepid);
static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
PREDICATELOCKTARGETTAG *parent);
@@ -1510,7 +1511,7 @@ GetSafeSnapshot(Snapshot origSnapshot)
* one passed to it, but we avoid assuming that here.
*/
snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
- InvalidTransactionId);
+ NULL, InvalidPid);
if (MySerializableXact == InvalidSerializableXact)
return snapshot; /* no concurrent r/w xacts; it's safe */
@@ -1643,7 +1644,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
return GetSafeSnapshot(snapshot);
return GetSerializableTransactionSnapshotInt(snapshot,
- InvalidTransactionId);
+ NULL, InvalidPid);
}
/*
@@ -1658,7 +1659,8 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
*/
void
SetSerializableTransactionSnapshot(Snapshot snapshot,
- TransactionId sourcexid)
+ VirtualTransactionId *sourcevxid,
+ int sourcepid)
{
Assert(IsolationIsSerializable());
@@ -1673,7 +1675,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
- (void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid);
+ (void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid,
+ sourcepid);
}
/*
@@ -1687,7 +1690,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
*/
static Snapshot
GetSerializableTransactionSnapshotInt(Snapshot snapshot,
- TransactionId sourcexid)
+ VirtualTransactionId *sourcevxid,
+ int sourcepid)
{
PGPROC *proc;
VirtualTransactionId vxid;
@@ -1741,17 +1745,17 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
} while (!sxact);
/* Get the snapshot, or check that it's safe to use */
- if (!TransactionIdIsValid(sourcexid))
+ if (!sourcevxid)
snapshot = GetSnapshotData(snapshot);
- else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid))
+ else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcevxid))
{
ReleasePredXact(sxact);
LWLockRelease(SerializableXactHashLock);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"),
- errdetail("The source transaction %u is not running anymore.",
- sourcexid)));
+ errdetail("The source process with pid %d is not running anymore.",
+ sourcepid)));
}
/*
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index b3d4fe3ae2a..2b6fca92414 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -58,6 +58,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinval.h"
+#include "storage/sinvaladt.h"
#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
@@ -211,11 +212,15 @@ static Snapshot FirstXactSnapshot = NULL;
/* Define pathname of exported-snapshot files */
#define SNAPSHOT_EXPORT_DIR "pg_snapshots"
-#define XactExportFilePath(path, xid, num, suffix) \
- snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%d%s", \
- xid, num, suffix)
-/* Current xact's exported snapshots (a list of Snapshot structs) */
+/* Structure holding info about exported snapshot. */
+typedef struct ExportedSnapshot
+{
+ char *snapfile;
+ Snapshot snapshot;
+} ExportedSnapshot;
+
+/* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
static List *exportedSnapshots = NIL;
/* Prototypes for local functions */
@@ -558,8 +563,8 @@ SnapshotSetCommandId(CommandId curcid)
* in GetTransactionSnapshot.
*/
static void
-SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
- PGPROC *sourceproc)
+SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
+ int sourcepid, PGPROC *sourceproc)
{
/* Caller should have checked this already */
Assert(!FirstSnapshotSet);
@@ -617,12 +622,12 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
errmsg("could not import the requested snapshot"),
errdetail("The source transaction is not running anymore.")));
}
- else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
+ else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"),
- errdetail("The source transaction %u is not running anymore.",
- sourcexid)));
+ errdetail("The source process with pid %d is not running anymore.",
+ sourcepid)));
/*
* In transaction-snapshot mode, the first snapshot must live until end of
@@ -632,7 +637,8 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
if (IsolationUsesXactSnapshot())
{
if (IsolationIsSerializable())
- SetSerializableTransactionSnapshot(CurrentSnapshot, sourcexid);
+ SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
+ sourcepid);
/* Make a saved copy */
CurrentSnapshot = CopySnapshot(CurrentSnapshot);
FirstXactSnapshot = CurrentSnapshot;
@@ -1075,33 +1081,29 @@ AtEOXact_Snapshot(bool isCommit, bool resetXmin)
*/
if (exportedSnapshots != NIL)
{
- TransactionId myxid = GetTopTransactionId();
- int i;
- char buf[MAXPGPATH];
ListCell *lc;
/*
* Get rid of the files. Unlink failure is only a WARNING because (1)
* it's too late to abort the transaction, and (2) leaving a leaked
* file around has little real consequence anyway.
- */
- for (i = 1; i <= list_length(exportedSnapshots); i++)
- {
- XactExportFilePath(buf, myxid, i, "");
- if (unlink(buf))
- elog(WARNING, "could not unlink file \"%s\": %m", buf);
- }
-
- /*
- * As with the FirstXactSnapshot, we needn't spend any effort on
- * cleaning up the per-snapshot data structures, but we do need to
- * remove them from RegisteredSnapshots to prevent a warning below.
+ *
+ * We also also need to remove the snapshots from RegisteredSnapshots
+ * to prevent a warning below.
+ *
+ * As with the FirstXactSnapshot, we don't need to free resources of
+ * the snapshot iself as it will go away with the memory context.
*/
foreach(lc, exportedSnapshots)
{
- Snapshot snap = (Snapshot) lfirst(lc);
+ ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc);
- pairingheap_remove(&RegisteredSnapshots, &snap->ph_node);
+ if (unlink(esnap->snapfile))
+ elog(WARNING, "could not unlink file \"%s\": %m",
+ esnap->snapfile);
+
+ pairingheap_remove(&RegisteredSnapshots,
+ &esnap->snapshot->ph_node);
}
exportedSnapshots = NIL;
@@ -1159,6 +1161,7 @@ ExportSnapshot(Snapshot snapshot)
{
TransactionId topXid;
TransactionId *children;
+ ExportedSnapshot *esnap;
int nchildren;
int addTopXid;
StringInfoData buf;
@@ -1183,9 +1186,9 @@ ExportSnapshot(Snapshot snapshot)
*/
/*
- * This will assign a transaction ID if we do not yet have one.
+ * Get our transaction ID if there is one, to include in the snapshot.
*/
- topXid = GetTopTransactionId();
+ topXid = GetTopTransactionIdIfAny();
/*
* We cannot export a snapshot from a subtransaction because there's no
@@ -1205,6 +1208,13 @@ ExportSnapshot(Snapshot snapshot)
nchildren = xactGetCommittedChildren(&children);
/*
+ * Generate file path for the snapshot. We start numbering of snapshots
+ * inside the transaction from 1.
+ */
+ snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
+ MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1);
+
+ /*
* Copy the snapshot into TopTransactionContext, add it to the
* exportedSnapshots list, and mark it pseudo-registered. We do this to
* ensure that the snapshot's xmin is honored for the rest of the
@@ -1213,7 +1223,10 @@ ExportSnapshot(Snapshot snapshot)
snapshot = CopySnapshot(snapshot);
oldcxt = MemoryContextSwitchTo(TopTransactionContext);
- exportedSnapshots = lappend(exportedSnapshots, snapshot);
+ esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
+ esnap->snapfile = pstrdup(path);
+ esnap->snapshot = snapshot;
+ exportedSnapshots = lappend(exportedSnapshots, esnap);
MemoryContextSwitchTo(oldcxt);
snapshot->regd_count++;
@@ -1226,7 +1239,8 @@ ExportSnapshot(Snapshot snapshot)
*/
initStringInfo(&buf);
- appendStringInfo(&buf, "xid:%u\n", topXid);
+ appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
+ appendStringInfo(&buf, "pid:%d\n", MyProcPid);
appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
@@ -1245,7 +1259,8 @@ ExportSnapshot(Snapshot snapshot)
* xmax. (We need not make the same check for subxip[] members, see
* snapshot.h.)
*/
- addTopXid = TransactionIdPrecedes(topXid, snapshot->xmax) ? 1 : 0;
+ addTopXid = (TransactionIdIsValid(topXid) &&
+ TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
for (i = 0; i < snapshot->xcnt; i++)
appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
@@ -1276,7 +1291,7 @@ ExportSnapshot(Snapshot snapshot)
* ensures that no other backend can read an incomplete file
* (ImportSnapshot won't allow it because of its valid-characters check).
*/
- XactExportFilePath(pathtmp, topXid, list_length(exportedSnapshots), ".tmp");
+ snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
ereport(ERROR,
(errcode_for_file_access(),
@@ -1298,8 +1313,6 @@ ExportSnapshot(Snapshot snapshot)
* Now that we have written everything into a .tmp file, rename the file
* to remove the .tmp suffix.
*/
- XactExportFilePath(path, topXid, list_length(exportedSnapshots), "");
-
if (rename(pathtmp, path) < 0)
ereport(ERROR,
(errcode_for_file_access(),
@@ -1384,6 +1397,30 @@ parseXidFromText(const char *prefix, char **s, const char *filename)
return val;
}
+static void
+parseVxidFromText(const char *prefix, char **s, const char *filename,
+ VirtualTransactionId *vxid)
+{
+ char *ptr = *s;
+ int prefixlen = strlen(prefix);
+
+ if (strncmp(ptr, prefix, prefixlen) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ ptr += prefixlen;
+ if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ ptr = strchr(ptr, '\n');
+ if (!ptr)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ *s = ptr + 1;
+}
+
/*
* ImportSnapshot
* Import a previously exported snapshot. The argument should be a
@@ -1399,7 +1436,8 @@ ImportSnapshot(const char *idstr)
char *filebuf;
int xcnt;
int i;
- TransactionId src_xid;
+ VirtualTransactionId src_vxid;
+ int src_pid;
Oid src_dbid;
int src_isolevel;
bool src_readonly;
@@ -1463,7 +1501,8 @@ ImportSnapshot(const char *idstr)
*/
memset(&snapshot, 0, sizeof(snapshot));
- src_xid = parseXidFromText("xid:", &filebuf, path);
+ parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
+ src_pid = parseIntFromText("pid:", &filebuf, path);
/* we abuse parseXidFromText a bit here ... */
src_dbid = parseXidFromText("dbid:", &filebuf, path);
src_isolevel = parseIntFromText("iso:", &filebuf, path);
@@ -1513,7 +1552,7 @@ ImportSnapshot(const char *idstr)
* don't trouble to check the array elements, just the most critical
* fields.
*/
- if (!TransactionIdIsNormal(src_xid) ||
+ if (!VirtualTransactionIdIsValid(src_vxid) ||
!OidIsValid(src_dbid) ||
!TransactionIdIsNormal(snapshot.xmin) ||
!TransactionIdIsNormal(snapshot.xmax))
@@ -1554,7 +1593,7 @@ ImportSnapshot(const char *idstr)
errmsg("cannot import a snapshot from a different database")));
/* OK, install the snapshot */
- SetTransactionSnapshot(&snapshot, src_xid, NULL);
+ SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
}
/*
@@ -2141,5 +2180,5 @@ RestoreSnapshot(char *start_address)
void
RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
{
- SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc);
+ SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc);
}
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 8f9ea29917b..941ba7119ef 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -14,6 +14,7 @@
#ifndef PREDICATE_H
#define PREDICATE_H
+#include "storage/lock.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
@@ -46,7 +47,8 @@ extern bool PageIsPredicateLocked(Relation relation, BlockNumber blkno);
/* predicate lock maintenance */
extern Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot);
extern void SetSerializableTransactionSnapshot(Snapshot snapshot,
- TransactionId sourcexid);
+ VirtualTransactionId *sourcevxid,
+ int sourcepid);
extern void RegisterPredicateLockingXid(TransactionId xid);
extern void PredicateLockRelation(Relation relation, Snapshot snapshot);
extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot);
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 22955a79dd4..5cf8ff75384 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -82,7 +82,7 @@ extern int GetMaxSnapshotSubxidCount(void);
extern Snapshot GetSnapshotData(Snapshot snapshot);
extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
- TransactionId sourcexid);
+ VirtualTransactionId *sourcevxid);
extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
extern RunningTransactions GetRunningTransactionData(void);