diff options
Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/transam/twophase.c | 115 | ||||
-rw-r--r-- | src/backend/access/transam/varsup.c | 11 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 4 |
3 files changed, 77 insertions, 53 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 477982d5fa5..d2fecb1ecb9 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -113,7 +113,8 @@ int max_prepared_xacts = 0; typedef struct GlobalTransactionData { - PGPROC proc; /* dummy proc */ + GlobalTransaction next; + int pgprocno; /* dummy proc */ BackendId dummyBackendId; /* similar to backend id for backends */ TimestampTz prepared_at; /* time of preparation */ XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */ @@ -207,7 +208,8 @@ TwoPhaseShmemInit(void) sizeof(GlobalTransaction) * max_prepared_xacts)); for (i = 0; i < max_prepared_xacts; i++) { - gxacts[i].proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts; + gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno; + gxacts[i].next = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = &gxacts[i]; /* @@ -243,6 +245,8 @@ MarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid) { GlobalTransaction gxact; + PGPROC *proc; + PGXACT *pgxact; int i; if (strlen(gid) >= GIDSIZE) @@ -274,7 +278,7 @@ MarkAsPreparing(TransactionId xid, const char *gid, TwoPhaseState->numPrepXacts--; TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts]; /* and put it back in the freelist */ - gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts; + gxact->next = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = gxact; /* Back up index count too, so we don't miss scanning one */ i--; @@ -302,32 +306,36 @@ MarkAsPreparing(TransactionId xid, const char *gid, errhint("Increase max_prepared_transactions (currently %d).", max_prepared_xacts))); gxact = TwoPhaseState->freeGXacts; - TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->proc.links.next; + TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->next; - /* Initialize it */ - MemSet(&gxact->proc, 0, sizeof(PGPROC)); - SHMQueueElemInit(&(gxact->proc.links)); - gxact->proc.waitStatus = STATUS_OK; + proc = &ProcGlobal->allProcs[gxact->pgprocno]; + pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + + /* Initialize the PGPROC entry */ + MemSet(proc, 0, sizeof(PGPROC)); + proc->pgprocno = gxact->pgprocno; + SHMQueueElemInit(&(proc->links)); + proc->waitStatus = STATUS_OK; /* We set up the gxact's VXID as InvalidBackendId/XID */ - gxact->proc.lxid = (LocalTransactionId) xid; - gxact->proc.xid = xid; - gxact->proc.xmin = InvalidTransactionId; - gxact->proc.pid = 0; - gxact->proc.backendId = InvalidBackendId; - gxact->proc.databaseId = databaseid; - gxact->proc.roleId = owner; - gxact->proc.inCommit = false; - gxact->proc.vacuumFlags = 0; - gxact->proc.lwWaiting = false; - gxact->proc.lwExclusive = false; - gxact->proc.lwWaitLink = NULL; - gxact->proc.waitLock = NULL; - gxact->proc.waitProcLock = NULL; + proc->lxid = (LocalTransactionId) xid; + pgxact->xid = xid; + pgxact->xmin = InvalidTransactionId; + pgxact->inCommit = false; + pgxact->vacuumFlags = 0; + proc->pid = 0; + proc->backendId = InvalidBackendId; + proc->databaseId = databaseid; + proc->roleId = owner; + proc->lwWaiting = false; + proc->lwExclusive = false; + proc->lwWaitLink = NULL; + proc->waitLock = NULL; + proc->waitProcLock = NULL; for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - SHMQueueInit(&(gxact->proc.myProcLocks[i])); + SHMQueueInit(&(proc->myProcLocks[i])); /* subxid data must be filled later by GXactLoadSubxactData */ - gxact->proc.subxids.overflowed = false; - gxact->proc.subxids.nxids = 0; + pgxact->overflowed = false; + pgxact->nxids = 0; gxact->prepared_at = prepared_at; /* initialize LSN to 0 (start of WAL) */ @@ -358,17 +366,19 @@ static void GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts, TransactionId *children) { + PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno]; + PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; /* We need no extra lock since the GXACT isn't valid yet */ if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS) { - gxact->proc.subxids.overflowed = true; + pgxact->overflowed = true; nsubxacts = PGPROC_MAX_CACHED_SUBXIDS; } if (nsubxacts > 0) { - memcpy(gxact->proc.subxids.xids, children, + memcpy(proc->subxids.xids, children, nsubxacts * sizeof(TransactionId)); - gxact->proc.subxids.nxids = nsubxacts; + pgxact->nxids = nsubxacts; } } @@ -389,7 +399,7 @@ MarkAsPrepared(GlobalTransaction gxact) * Put it into the global ProcArray so TransactionIdIsInProgress considers * the XID as still running. */ - ProcArrayAdd(&gxact->proc); + ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]); } /* @@ -406,6 +416,7 @@ LockGXact(const char *gid, Oid user) for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno]; /* Ignore not-yet-valid GIDs */ if (!gxact->valid) @@ -436,7 +447,7 @@ LockGXact(const char *gid, Oid user) * there may be some other issues as well. Hence disallow until * someone gets motivated to make it work. */ - if (MyDatabaseId != gxact->proc.databaseId) + if (MyDatabaseId != proc->databaseId) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("prepared transaction belongs to another database"), @@ -483,7 +494,7 @@ RemoveGXact(GlobalTransaction gxact) TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts]; /* and put it back in the freelist */ - gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts; + gxact->next = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = gxact; LWLockRelease(TwoPhaseStateLock); @@ -518,8 +529,9 @@ TransactionIdIsPrepared(TransactionId xid) for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; - if (gxact->valid && gxact->proc.xid == xid) + if (gxact->valid && pgxact->xid == xid) { result = true; break; @@ -642,6 +654,8 @@ pg_prepared_xact(PG_FUNCTION_ARGS) while (status->array != NULL && status->currIdx < status->ngxacts) { GlobalTransaction gxact = &status->array[status->currIdx++]; + PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno]; + PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; Datum values[5]; bool nulls[5]; HeapTuple tuple; @@ -656,11 +670,11 @@ pg_prepared_xact(PG_FUNCTION_ARGS) MemSet(values, 0, sizeof(values)); MemSet(nulls, 0, sizeof(nulls)); - values[0] = TransactionIdGetDatum(gxact->proc.xid); + values[0] = TransactionIdGetDatum(pgxact->xid); values[1] = CStringGetTextDatum(gxact->gid); values[2] = TimestampTzGetDatum(gxact->prepared_at); values[3] = ObjectIdGetDatum(gxact->owner); - values[4] = ObjectIdGetDatum(gxact->proc.databaseId); + values[4] = ObjectIdGetDatum(proc->databaseId); tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); result = HeapTupleGetDatum(tuple); @@ -711,10 +725,11 @@ TwoPhaseGetDummyProc(TransactionId xid) for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; - if (gxact->proc.xid == xid) + if (pgxact->xid == xid) { - result = &gxact->proc; + result = &ProcGlobal->allProcs[gxact->pgprocno]; break; } } @@ -841,7 +856,9 @@ save_state_data(const void *data, uint32 len) void StartPrepare(GlobalTransaction gxact) { - TransactionId xid = gxact->proc.xid; + PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno]; + PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + TransactionId xid = pgxact->xid; TwoPhaseFileHeader hdr; TransactionId *children; RelFileNode *commitrels; @@ -865,7 +882,7 @@ StartPrepare(GlobalTransaction gxact) hdr.magic = TWOPHASE_MAGIC; hdr.total_len = 0; /* EndPrepare will fill this in */ hdr.xid = xid; - hdr.database = gxact->proc.databaseId; + hdr.database = proc->databaseId; hdr.prepared_at = gxact->prepared_at; hdr.owner = gxact->owner; hdr.nsubxacts = xactGetCommittedChildren(&children); @@ -913,7 +930,8 @@ StartPrepare(GlobalTransaction gxact) void EndPrepare(GlobalTransaction gxact) { - TransactionId xid = gxact->proc.xid; + PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + TransactionId xid = pgxact->xid; TwoPhaseFileHeader *hdr; char path[MAXPGPATH]; XLogRecData *record; @@ -1021,7 +1039,7 @@ EndPrepare(GlobalTransaction gxact) */ START_CRIT_SECTION(); - MyProc->inCommit = true; + MyPgXact->inCommit = true; gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, records.head); @@ -1069,7 +1087,7 @@ EndPrepare(GlobalTransaction gxact) * checkpoint starting after this will certainly see the gxact as a * candidate for fsyncing. */ - MyProc->inCommit = false; + MyPgXact->inCommit = false; END_CRIT_SECTION(); @@ -1242,6 +1260,8 @@ void FinishPreparedTransaction(const char *gid, bool isCommit) { GlobalTransaction gxact; + PGPROC *proc; + PGXACT *pgxact; TransactionId xid; char *buf; char *bufptr; @@ -1260,7 +1280,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit) * try to commit the same GID at once. */ gxact = LockGXact(gid, GetUserId()); - xid = gxact->proc.xid; + proc = &ProcGlobal->allProcs[gxact->pgprocno]; + pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + xid = pgxact->xid; /* * Read and validate the state file @@ -1309,7 +1331,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) hdr->nsubxacts, children, hdr->nabortrels, abortrels); - ProcArrayRemove(&gxact->proc, latestXid); + ProcArrayRemove(proc, latestXid); /* * In case we fail while running the callbacks, mark the gxact invalid so @@ -1540,10 +1562,11 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; if (gxact->valid && XLByteLE(gxact->prepare_lsn, redo_horizon)) - xids[nxids++] = gxact->proc.xid; + xids[nxids++] = pgxact->xid; } LWLockRelease(TwoPhaseStateLock); @@ -1972,7 +1995,7 @@ RecordTransactionCommitPrepared(TransactionId xid, START_CRIT_SECTION(); /* See notes in RecordTransactionCommit */ - MyProc->inCommit = true; + MyPgXact->inCommit = true; /* Emit the XLOG commit record */ xlrec.xid = xid; @@ -2037,7 +2060,7 @@ RecordTransactionCommitPrepared(TransactionId xid, TransactionIdCommitTree(xid, nchildren, children); /* Checkpoint can proceed now */ - MyProc->inCommit = false; + MyPgXact->inCommit = false; END_CRIT_SECTION(); diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 61dcfedad43..443e5e4ea66 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -54,7 +54,7 @@ GetNewTransactionId(bool isSubXact) if (IsBootstrapProcessingMode()) { Assert(!isSubXact); - MyProc->xid = BootstrapTransactionId; + MyPgXact->xid = BootstrapTransactionId; return BootstrapTransactionId; } @@ -208,20 +208,21 @@ GetNewTransactionId(bool isSubXact) * TransactionId and int fetch/store are atomic. */ volatile PGPROC *myproc = MyProc; + volatile PGXACT *mypgxact = MyPgXact; if (!isSubXact) - myproc->xid = xid; + mypgxact->xid = xid; else { - int nxids = myproc->subxids.nxids; + int nxids = mypgxact->nxids; if (nxids < PGPROC_MAX_CACHED_SUBXIDS) { myproc->subxids.xids[nxids] = xid; - myproc->subxids.nxids = nxids + 1; + mypgxact->nxids = nxids + 1; } else - myproc->subxids.overflowed = true; + mypgxact->overflowed = true; } } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c151d3be191..c383011b5f6 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -981,7 +981,7 @@ RecordTransactionCommit(void) * bit fuzzy, but it doesn't matter. */ START_CRIT_SECTION(); - MyProc->inCommit = true; + MyPgXact->inCommit = true; SetCurrentTransactionStopTimestamp(); @@ -1155,7 +1155,7 @@ RecordTransactionCommit(void) */ if (markXidCommitted) { - MyProc->inCommit = false; + MyPgXact->inCommit = false; END_CRIT_SECTION(); } |