aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access')
-rw-r--r--src/backend/access/transam/twophase.c115
-rw-r--r--src/backend/access/transam/varsup.c11
-rw-r--r--src/backend/access/transam/xact.c4
3 files changed, 77 insertions, 53 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 477982d5fa5..d2fecb1ecb9 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -113,7 +113,8 @@ int max_prepared_xacts = 0;
typedef struct GlobalTransactionData
{
- PGPROC proc; /* dummy proc */
+ GlobalTransaction next;
+ int pgprocno; /* dummy proc */
BackendId dummyBackendId; /* similar to backend id for backends */
TimestampTz prepared_at; /* time of preparation */
XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
@@ -207,7 +208,8 @@ TwoPhaseShmemInit(void)
sizeof(GlobalTransaction) * max_prepared_xacts));
for (i = 0; i < max_prepared_xacts; i++)
{
- gxacts[i].proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
+ gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
+ gxacts[i].next = TwoPhaseState->freeGXacts;
TwoPhaseState->freeGXacts = &gxacts[i];
/*
@@ -243,6 +245,8 @@ MarkAsPreparing(TransactionId xid, const char *gid,
TimestampTz prepared_at, Oid owner, Oid databaseid)
{
GlobalTransaction gxact;
+ PGPROC *proc;
+ PGXACT *pgxact;
int i;
if (strlen(gid) >= GIDSIZE)
@@ -274,7 +278,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
TwoPhaseState->numPrepXacts--;
TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
/* and put it back in the freelist */
- gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
+ gxact->next = TwoPhaseState->freeGXacts;
TwoPhaseState->freeGXacts = gxact;
/* Back up index count too, so we don't miss scanning one */
i--;
@@ -302,32 +306,36 @@ MarkAsPreparing(TransactionId xid, const char *gid,
errhint("Increase max_prepared_transactions (currently %d).",
max_prepared_xacts)));
gxact = TwoPhaseState->freeGXacts;
- TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->proc.links.next;
+ TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->next;
- /* Initialize it */
- MemSet(&gxact->proc, 0, sizeof(PGPROC));
- SHMQueueElemInit(&(gxact->proc.links));
- gxact->proc.waitStatus = STATUS_OK;
+ proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
+ /* Initialize the PGPROC entry */
+ MemSet(proc, 0, sizeof(PGPROC));
+ proc->pgprocno = gxact->pgprocno;
+ SHMQueueElemInit(&(proc->links));
+ proc->waitStatus = STATUS_OK;
/* We set up the gxact's VXID as InvalidBackendId/XID */
- gxact->proc.lxid = (LocalTransactionId) xid;
- gxact->proc.xid = xid;
- gxact->proc.xmin = InvalidTransactionId;
- gxact->proc.pid = 0;
- gxact->proc.backendId = InvalidBackendId;
- gxact->proc.databaseId = databaseid;
- gxact->proc.roleId = owner;
- gxact->proc.inCommit = false;
- gxact->proc.vacuumFlags = 0;
- gxact->proc.lwWaiting = false;
- gxact->proc.lwExclusive = false;
- gxact->proc.lwWaitLink = NULL;
- gxact->proc.waitLock = NULL;
- gxact->proc.waitProcLock = NULL;
+ proc->lxid = (LocalTransactionId) xid;
+ pgxact->xid = xid;
+ pgxact->xmin = InvalidTransactionId;
+ pgxact->inCommit = false;
+ pgxact->vacuumFlags = 0;
+ proc->pid = 0;
+ proc->backendId = InvalidBackendId;
+ proc->databaseId = databaseid;
+ proc->roleId = owner;
+ proc->lwWaiting = false;
+ proc->lwExclusive = false;
+ proc->lwWaitLink = NULL;
+ proc->waitLock = NULL;
+ proc->waitProcLock = NULL;
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
- SHMQueueInit(&(gxact->proc.myProcLocks[i]));
+ SHMQueueInit(&(proc->myProcLocks[i]));
/* subxid data must be filled later by GXactLoadSubxactData */
- gxact->proc.subxids.overflowed = false;
- gxact->proc.subxids.nxids = 0;
+ pgxact->overflowed = false;
+ pgxact->nxids = 0;
gxact->prepared_at = prepared_at;
/* initialize LSN to 0 (start of WAL) */
@@ -358,17 +366,19 @@ static void
GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
TransactionId *children)
{
+ PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
/* We need no extra lock since the GXACT isn't valid yet */
if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
{
- gxact->proc.subxids.overflowed = true;
+ pgxact->overflowed = true;
nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
}
if (nsubxacts > 0)
{
- memcpy(gxact->proc.subxids.xids, children,
+ memcpy(proc->subxids.xids, children,
nsubxacts * sizeof(TransactionId));
- gxact->proc.subxids.nxids = nsubxacts;
+ pgxact->nxids = nsubxacts;
}
}
@@ -389,7 +399,7 @@ MarkAsPrepared(GlobalTransaction gxact)
* Put it into the global ProcArray so TransactionIdIsInProgress considers
* the XID as still running.
*/
- ProcArrayAdd(&gxact->proc);
+ ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
}
/*
@@ -406,6 +416,7 @@ LockGXact(const char *gid, Oid user)
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
/* Ignore not-yet-valid GIDs */
if (!gxact->valid)
@@ -436,7 +447,7 @@ LockGXact(const char *gid, Oid user)
* there may be some other issues as well. Hence disallow until
* someone gets motivated to make it work.
*/
- if (MyDatabaseId != gxact->proc.databaseId)
+ if (MyDatabaseId != proc->databaseId)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("prepared transaction belongs to another database"),
@@ -483,7 +494,7 @@ RemoveGXact(GlobalTransaction gxact)
TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
/* and put it back in the freelist */
- gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
+ gxact->next = TwoPhaseState->freeGXacts;
TwoPhaseState->freeGXacts = gxact;
LWLockRelease(TwoPhaseStateLock);
@@ -518,8 +529,9 @@ TransactionIdIsPrepared(TransactionId xid)
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- if (gxact->valid && gxact->proc.xid == xid)
+ if (gxact->valid && pgxact->xid == xid)
{
result = true;
break;
@@ -642,6 +654,8 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
while (status->array != NULL && status->currIdx < status->ngxacts)
{
GlobalTransaction gxact = &status->array[status->currIdx++];
+ PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
Datum values[5];
bool nulls[5];
HeapTuple tuple;
@@ -656,11 +670,11 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls));
- values[0] = TransactionIdGetDatum(gxact->proc.xid);
+ values[0] = TransactionIdGetDatum(pgxact->xid);
values[1] = CStringGetTextDatum(gxact->gid);
values[2] = TimestampTzGetDatum(gxact->prepared_at);
values[3] = ObjectIdGetDatum(gxact->owner);
- values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
+ values[4] = ObjectIdGetDatum(proc->databaseId);
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
result = HeapTupleGetDatum(tuple);
@@ -711,10 +725,11 @@ TwoPhaseGetDummyProc(TransactionId xid)
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- if (gxact->proc.xid == xid)
+ if (pgxact->xid == xid)
{
- result = &gxact->proc;
+ result = &ProcGlobal->allProcs[gxact->pgprocno];
break;
}
}
@@ -841,7 +856,9 @@ save_state_data(const void *data, uint32 len)
void
StartPrepare(GlobalTransaction gxact)
{
- TransactionId xid = gxact->proc.xid;
+ PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+ TransactionId xid = pgxact->xid;
TwoPhaseFileHeader hdr;
TransactionId *children;
RelFileNode *commitrels;
@@ -865,7 +882,7 @@ StartPrepare(GlobalTransaction gxact)
hdr.magic = TWOPHASE_MAGIC;
hdr.total_len = 0; /* EndPrepare will fill this in */
hdr.xid = xid;
- hdr.database = gxact->proc.databaseId;
+ hdr.database = proc->databaseId;
hdr.prepared_at = gxact->prepared_at;
hdr.owner = gxact->owner;
hdr.nsubxacts = xactGetCommittedChildren(&children);
@@ -913,7 +930,8 @@ StartPrepare(GlobalTransaction gxact)
void
EndPrepare(GlobalTransaction gxact)
{
- TransactionId xid = gxact->proc.xid;
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+ TransactionId xid = pgxact->xid;
TwoPhaseFileHeader *hdr;
char path[MAXPGPATH];
XLogRecData *record;
@@ -1021,7 +1039,7 @@ EndPrepare(GlobalTransaction gxact)
*/
START_CRIT_SECTION();
- MyProc->inCommit = true;
+ MyPgXact->inCommit = true;
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
records.head);
@@ -1069,7 +1087,7 @@ EndPrepare(GlobalTransaction gxact)
* checkpoint starting after this will certainly see the gxact as a
* candidate for fsyncing.
*/
- MyProc->inCommit = false;
+ MyPgXact->inCommit = false;
END_CRIT_SECTION();
@@ -1242,6 +1260,8 @@ void
FinishPreparedTransaction(const char *gid, bool isCommit)
{
GlobalTransaction gxact;
+ PGPROC *proc;
+ PGXACT *pgxact;
TransactionId xid;
char *buf;
char *bufptr;
@@ -1260,7 +1280,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
* try to commit the same GID at once.
*/
gxact = LockGXact(gid, GetUserId());
- xid = gxact->proc.xid;
+ proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+ xid = pgxact->xid;
/*
* Read and validate the state file
@@ -1309,7 +1331,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
hdr->nsubxacts, children,
hdr->nabortrels, abortrels);
- ProcArrayRemove(&gxact->proc, latestXid);
+ ProcArrayRemove(proc, latestXid);
/*
* In case we fail while running the callbacks, mark the gxact invalid so
@@ -1540,10 +1562,11 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
if (gxact->valid &&
XLByteLE(gxact->prepare_lsn, redo_horizon))
- xids[nxids++] = gxact->proc.xid;
+ xids[nxids++] = pgxact->xid;
}
LWLockRelease(TwoPhaseStateLock);
@@ -1972,7 +1995,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
START_CRIT_SECTION();
/* See notes in RecordTransactionCommit */
- MyProc->inCommit = true;
+ MyPgXact->inCommit = true;
/* Emit the XLOG commit record */
xlrec.xid = xid;
@@ -2037,7 +2060,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
TransactionIdCommitTree(xid, nchildren, children);
/* Checkpoint can proceed now */
- MyProc->inCommit = false;
+ MyPgXact->inCommit = false;
END_CRIT_SECTION();
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 61dcfedad43..443e5e4ea66 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -54,7 +54,7 @@ GetNewTransactionId(bool isSubXact)
if (IsBootstrapProcessingMode())
{
Assert(!isSubXact);
- MyProc->xid = BootstrapTransactionId;
+ MyPgXact->xid = BootstrapTransactionId;
return BootstrapTransactionId;
}
@@ -208,20 +208,21 @@ GetNewTransactionId(bool isSubXact)
* TransactionId and int fetch/store are atomic.
*/
volatile PGPROC *myproc = MyProc;
+ volatile PGXACT *mypgxact = MyPgXact;
if (!isSubXact)
- myproc->xid = xid;
+ mypgxact->xid = xid;
else
{
- int nxids = myproc->subxids.nxids;
+ int nxids = mypgxact->nxids;
if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
{
myproc->subxids.xids[nxids] = xid;
- myproc->subxids.nxids = nxids + 1;
+ mypgxact->nxids = nxids + 1;
}
else
- myproc->subxids.overflowed = true;
+ mypgxact->overflowed = true;
}
}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c151d3be191..c383011b5f6 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -981,7 +981,7 @@ RecordTransactionCommit(void)
* bit fuzzy, but it doesn't matter.
*/
START_CRIT_SECTION();
- MyProc->inCommit = true;
+ MyPgXact->inCommit = true;
SetCurrentTransactionStopTimestamp();
@@ -1155,7 +1155,7 @@ RecordTransactionCommit(void)
*/
if (markXidCommitted)
{
- MyProc->inCommit = false;
+ MyPgXact->inCommit = false;
END_CRIT_SECTION();
}