diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/transam/twophase.c | 821 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 18 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 10 | ||||
-rw-r--r-- | src/include/access/twophase.h | 7 |
4 files changed, 550 insertions, 306 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 83169cccc30..d0e2bbf2916 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -45,8 +45,27 @@ * fsynced * * If COMMIT happens after checkpoint then backend reads state data from * files - * * In case of crash replay will move data from xlog to files, if that - * hasn't happened before. XXX TODO - move to shmem in replay also + * + * During replay and replication, TwoPhaseState also holds information + * about active prepared transactions that haven't been moved to disk yet. + * + * Replay of twophase records happens by the following rules: + * + * * At the beginning of recovery, pg_twophase is scanned once, filling + * TwoPhaseState with entries marked with gxact->inredo and + * gxact->ondisk. Two-phase file data older than the XID horizon of + * the redo position are discarded. + * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts. + * gxact->inredo is set to true for such entries. + * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries + * that have gxact->inredo set and are behind the redo_horizon. We + * save them to disk and then switch gxact->ondisk to true. + * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts. + * If gxact->ondisk is true, the corresponding entry from the disk + * is additionally deleted. + * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions() + * and PrescanPreparedTransactions() have been modified to go through + * gxact->inredo entries that have not made it to disk. * *------------------------------------------------------------------------- */ @@ -147,11 +166,13 @@ typedef struct GlobalTransactionData */ XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */ XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */ + TransactionId xid; /* The GXACT id */ Oid owner; /* ID of user that executed the xact */ BackendId locking_backend; /* backend currently working on the xact */ bool valid; /* TRUE if PGPROC entry is in proc array */ bool ondisk; /* TRUE if prepare state file is on disk */ + bool inredo; /* TRUE if entry was added via xlog_redo */ char gid[GIDSIZE]; /* The GID assigned to the prepared xact */ } GlobalTransactionData; @@ -198,6 +219,15 @@ static void ProcessRecords(char *bufptr, TransactionId xid, static void RemoveGXact(GlobalTransaction gxact); static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len); +static char *ProcessTwoPhaseBuffer(TransactionId xid, + XLogRecPtr prepare_start_lsn, + bool fromdisk, bool overwriteOK, bool setParent, + TransactionId *result, TransactionId *maxsubxid); +static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, + const char *gid, TimestampTz prepared_at, Oid owner, + Oid databaseid); +static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning); +static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len); /* * Initialization of shared memory @@ -342,18 +372,12 @@ PostPrepare_Twophase(void) /* * MarkAsPreparing * Reserve the GID for the given transaction. - * - * Internally, this creates a gxact struct and puts it into the active array. - * NOTE: this is also used when reloading a gxact after a crash; so avoid - * assuming that we can use very much backend context. */ GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid) { GlobalTransaction gxact; - PGPROC *proc; - PGXACT *pgxact; int i; if (strlen(gid) >= GIDSIZE) @@ -401,6 +425,37 @@ MarkAsPreparing(TransactionId xid, const char *gid, gxact = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = gxact->next; + MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid); + + gxact->ondisk = false; + + /* And insert it into the active array */ + Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); + TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + + LWLockRelease(TwoPhaseStateLock); + + return gxact; +} + +/* + * MarkAsPreparingGuts + * + * This uses a gxact struct and puts it into the active array. + * NOTE: this is also used when reloading a gxact after a crash; so avoid + * assuming that we can use very much backend context. + * + * Note: This function should be called with appropriate locks held. + */ +static void +MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, + TimestampTz prepared_at, Oid owner, Oid databaseid) +{ + PGPROC *proc; + PGXACT *pgxact; + int i; + + Assert(gxact != NULL); proc = &ProcGlobal->allProcs[gxact->pgprocno]; pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; @@ -431,28 +486,18 @@ MarkAsPreparing(TransactionId xid, const char *gid, pgxact->nxids = 0; gxact->prepared_at = prepared_at; - /* initialize LSN to InvalidXLogRecPtr */ - gxact->prepare_start_lsn = InvalidXLogRecPtr; - gxact->prepare_end_lsn = InvalidXLogRecPtr; + gxact->xid = xid; gxact->owner = owner; gxact->locking_backend = MyBackendId; gxact->valid = false; - gxact->ondisk = false; + gxact->inredo = false; strcpy(gxact->gid, gid); - /* And insert it into the active array */ - Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); - TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; - /* * Remember that we have this GlobalTransaction entry locked for us. If we * abort after this, we must release it. */ MyLockedGxact = gxact; - - LWLockRelease(TwoPhaseStateLock); - - return gxact; } /* @@ -1244,9 +1289,9 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) * Reads 2PC data from xlog. During checkpoint this data will be moved to * twophase files and ReadTwoPhaseFile should be used instead. * - * Note clearly that this function accesses WAL during normal operation, similarly - * to the way WALSender or Logical Decoding would do. It does not run during - * crash recovery or standby processing. + * Note clearly that this function can access WAL during normal operation, + * similarly to the way WALSender or Logical Decoding would do. + * */ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) @@ -1255,8 +1300,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogReaderState *xlogreader; char *errormsg; - Assert(!RecoveryInProgress()); - xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL); if (!xlogreader) ereport(ERROR, @@ -1501,7 +1544,7 @@ ProcessRecords(char *bufptr, TransactionId xid, * If giveWarning is false, do not complain about file-not-present; * this is an expected case during WAL replay. */ -void +static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning) { char path[MAXPGPATH]; @@ -1521,7 +1564,7 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning) * * Note: content and len don't include CRC. */ -void +static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len) { char path[MAXPGPATH]; @@ -1587,9 +1630,11 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len) /* * CheckPointTwoPhase -- handle 2PC component of checkpointing. * - * We must fsync the state file of any GXACT that is valid and has a PREPARE - * LSN <= the checkpoint's redo horizon. (If the gxact isn't valid yet or - * has a later LSN, this checkpoint is not responsible for fsyncing it.) + * We must fsync the state file of any GXACT that is valid or has been + * generated during redo and has a PREPARE LSN <= the checkpoint's redo + * horizon. (If the gxact isn't valid yet, has not been generated in + * redo, or has a later LSN, this checkpoint is not responsible for + * fsyncing it.) * * This is deliberately run as late as possible in the checkpoint sequence, * because GXACTs ordinarily have short lifespans, and so it is quite @@ -1631,10 +1676,10 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) LWLockAcquire(TwoPhaseStateLock, LW_SHARED); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { + /* Note that we are using gxact not pgxact so this works in recovery also */ GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; - PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; - if (gxact->valid && + if ((gxact->valid || gxact->inredo) && !gxact->ondisk && gxact->prepare_end_lsn <= redo_horizon) { @@ -1642,8 +1687,10 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) int len; XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len); - RecreateTwoPhaseFile(pgxact->xid, buf, len); + RecreateTwoPhaseFile(gxact->xid, buf, len); gxact->ondisk = true; + gxact->prepare_start_lsn = InvalidXLogRecPtr; + gxact->prepare_end_lsn = InvalidXLogRecPtr; pfree(buf); serialized_xacts++; } @@ -1671,12 +1718,49 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) } /* + * restoreTwoPhaseData + * + * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data. + * This is called once at the beginning of recovery, saving any extra + * lookups in the future. Two-phase files that are newer than the + * minimum XID horizon are discarded on the way. + */ +void +restoreTwoPhaseData(void) +{ + DIR *cldir; + struct dirent *clde; + + cldir = AllocateDir(TWOPHASE_DIR); + while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) + { + if (strlen(clde->d_name) == 8 && + strspn(clde->d_name, "0123456789ABCDEF") == 8) + { + TransactionId xid; + char *buf; + + xid = (TransactionId) strtoul(clde->d_name, NULL, 16); + + buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr, + true, false, false, + NULL, NULL); + if (buf == NULL) + continue; + + PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr); + } + } + FreeDir(cldir); +} + +/* * PrescanPreparedTransactions * - * Scan the pg_twophase directory and determine the range of valid XIDs - * present. This is run during database startup, after we have completed - * reading WAL. ShmemVariableCache->nextXid has been set to one more than - * the highest XID for which evidence exists in WAL. + * Scan the shared memory entries of TwoPhaseState and determine the range + * of valid XIDs present. This is run during database startup, after we + * have completed reading WAL. ShmemVariableCache->nextXid has been set to + * one more than the highest XID for which evidence exists in WAL. * * We throw away any prepared xacts with main XID beyond nextXid --- if any * are present, it suggests that the DBA has done a PITR recovery to an @@ -1702,120 +1786,52 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) { TransactionId origNextXid = ShmemVariableCache->nextXid; TransactionId result = origNextXid; - DIR *cldir; - struct dirent *clde; + TransactionId maxsubxid = origNextXid; TransactionId *xids = NULL; int nxids = 0; int allocsize = 0; + int i; - cldir = AllocateDir(TWOPHASE_DIR); - while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { - if (strlen(clde->d_name) == 8 && - strspn(clde->d_name, "0123456789ABCDEF") == 8) - { - TransactionId xid; - char *buf; - TwoPhaseFileHeader *hdr; - TransactionId *subxids; - int i; - - xid = (TransactionId) strtoul(clde->d_name, NULL, 16); - - /* Reject XID if too new */ - if (TransactionIdFollowsOrEquals(xid, origNextXid)) - { - ereport(WARNING, - (errmsg("removing future two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } + TransactionId xid; + char *buf; + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; - /* - * Note: we can't check if already processed because clog - * subsystem isn't up yet. - */ + Assert(gxact->inredo); - /* Read and validate file */ - buf = ReadTwoPhaseFile(xid, true); - if (buf == NULL) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } + xid = gxact->xid; - /* Deconstruct header */ - hdr = (TwoPhaseFileHeader *) buf; - if (!TransactionIdEquals(hdr->xid, xid)) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - pfree(buf); - continue; - } + buf = ProcessTwoPhaseBuffer(xid, + gxact->prepare_start_lsn, + gxact->ondisk, false, false, + &result, &maxsubxid); - /* - * OK, we think this file is valid. Incorporate xid into the - * running-minimum result. - */ - if (TransactionIdPrecedes(xid, result)) - result = xid; + if (buf == NULL) + continue; - /* - * Examine subtransaction XIDs ... they should all follow main - * XID, and they may force us to advance nextXid. - * - * We don't expect anyone else to modify nextXid, hence we don't - * need to hold a lock while examining it. We still acquire the - * lock to modify it, though. - */ - subxids = (TransactionId *) (buf + - MAXALIGN(sizeof(TwoPhaseFileHeader)) + - MAXALIGN(hdr->gidlen)); - for (i = 0; i < hdr->nsubxacts; i++) + if (xids_p) + { + if (nxids == allocsize) { - TransactionId subxid = subxids[i]; - - Assert(TransactionIdFollows(subxid, xid)); - if (TransactionIdFollowsOrEquals(subxid, - ShmemVariableCache->nextXid)) + if (nxids == 0) { - LWLockAcquire(XidGenLock, LW_EXCLUSIVE); - ShmemVariableCache->nextXid = subxid; - TransactionIdAdvance(ShmemVariableCache->nextXid); - LWLockRelease(XidGenLock); + allocsize = 10; + xids = palloc(allocsize * sizeof(TransactionId)); } - } - - - if (xids_p) - { - if (nxids == allocsize) + else { - if (nxids == 0) - { - allocsize = 10; - xids = palloc(allocsize * sizeof(TransactionId)); - } - else - { - allocsize = allocsize * 2; - xids = repalloc(xids, allocsize * sizeof(TransactionId)); - } + allocsize = allocsize * 2; + xids = repalloc(xids, allocsize * sizeof(TransactionId)); } - xids[nxids++] = xid; } - - pfree(buf); + xids[nxids++] = xid; } + + pfree(buf); } - FreeDir(cldir); + LWLockRelease(TwoPhaseStateLock); if (xids_p) { @@ -1823,14 +1839,25 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) *nxids_p = nxids; } + /* update nextXid if needed */ + if (TransactionIdFollowsOrEquals(maxsubxid, ShmemVariableCache->nextXid)) + { + LWLockAcquire(XidGenLock, LW_EXCLUSIVE); + ShmemVariableCache->nextXid = maxsubxid; + TransactionIdAdvance(ShmemVariableCache->nextXid); + LWLockRelease(XidGenLock); + } + return result; } /* * StandbyRecoverPreparedTransactions * - * Scan the pg_twophase directory and setup all the required information to - * allow standby queries to treat prepared transactions as still active. + * Scan the shared memory entries of TwoPhaseState and setup all the required + * information to allow standby queries to treat prepared transactions as still + * active. + * * This is never called at the end of recovery - we use * RecoverPreparedTransactions() at that point. * @@ -1841,202 +1868,292 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) void StandbyRecoverPreparedTransactions(bool overwriteOK) { - DIR *cldir; - struct dirent *clde; + int i; - cldir = AllocateDir(TWOPHASE_DIR); - while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { - if (strlen(clde->d_name) == 8 && - strspn(clde->d_name, "0123456789ABCDEF") == 8) - { - TransactionId xid; - char *buf; - TwoPhaseFileHeader *hdr; - TransactionId *subxids; - int i; - - xid = (TransactionId) strtoul(clde->d_name, NULL, 16); - - /* Already processed? */ - if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) - { - ereport(WARNING, - (errmsg("removing stale two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } - - /* Read and validate file */ - buf = ReadTwoPhaseFile(xid, true); - if (buf == NULL) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } + TransactionId xid; + char *buf; + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; - /* Deconstruct header */ - hdr = (TwoPhaseFileHeader *) buf; - if (!TransactionIdEquals(hdr->xid, xid)) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - pfree(buf); - continue; - } + Assert(gxact->inredo); - /* - * Examine subtransaction XIDs ... they should all follow main - * XID. - */ - subxids = (TransactionId *) (buf + - MAXALIGN(sizeof(TwoPhaseFileHeader)) + - MAXALIGN(hdr->gidlen)); - for (i = 0; i < hdr->nsubxacts; i++) - { - TransactionId subxid = subxids[i]; - - Assert(TransactionIdFollows(subxid, xid)); - SubTransSetParent(xid, subxid, overwriteOK); - } + xid = gxact->xid; + buf = ProcessTwoPhaseBuffer(xid, + gxact->prepare_start_lsn, + gxact->ondisk, overwriteOK, true, + NULL, NULL); + if (buf != NULL) pfree(buf); - } } - FreeDir(cldir); + LWLockRelease(TwoPhaseStateLock); } /* * RecoverPreparedTransactions * - * Scan the pg_twophase directory and reload shared-memory state for each - * prepared transaction (reacquire locks, etc). This is run during database - * startup. + * Scan the shared memory entries of TwoPhaseState and reload the state for + * each prepared transaction (reacquire locks, etc). + * + * This is run during database startup. */ void RecoverPreparedTransactions(void) { - char dir[MAXPGPATH]; - DIR *cldir; - struct dirent *clde; - bool overwriteOK = false; - - snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR); + int i; - cldir = AllocateDir(dir); - while ((clde = ReadDir(cldir, dir)) != NULL) + /* + * Don't need a lock in the recovery phase. + */ + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { - if (strlen(clde->d_name) == 8 && - strspn(clde->d_name, "0123456789ABCDEF") == 8) - { - TransactionId xid; - char *buf; - char *bufptr; - TwoPhaseFileHeader *hdr; - TransactionId *subxids; - GlobalTransaction gxact; - const char *gid; - int i; + TransactionId xid; + char *buf; + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + char *bufptr; + TwoPhaseFileHeader *hdr; + TransactionId *subxids; + const char *gid; + bool overwriteOK = false; + int i; - xid = (TransactionId) strtoul(clde->d_name, NULL, 16); + xid = gxact->xid; - /* Already processed? */ - if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) - { - ereport(WARNING, - (errmsg("removing stale two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } + buf = ProcessTwoPhaseBuffer(xid, + gxact->prepare_start_lsn, + gxact->ondisk, false, false, + NULL, NULL); + if (buf == NULL) + continue; - /* Read and validate file */ - buf = ReadTwoPhaseFile(xid, true); - if (buf == NULL) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } + ereport(LOG, + (errmsg("recovering prepared transaction %u from shared memory", xid))); + + hdr = (TwoPhaseFileHeader *) buf; + Assert(TransactionIdEquals(hdr->xid, xid)); + bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); + gid = (const char *) bufptr; + bufptr += MAXALIGN(hdr->gidlen); + subxids = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); - ereport(LOG, - (errmsg("recovering prepared transaction %u", xid))); - - /* Deconstruct header */ - hdr = (TwoPhaseFileHeader *) buf; - Assert(TransactionIdEquals(hdr->xid, xid)); - bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); - gid = (const char *) bufptr; - bufptr += MAXALIGN(hdr->gidlen); - subxids = (TransactionId *) bufptr; - bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); - bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); - bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); - bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); + /* + * It's possible that SubTransSetParent has been set before, if + * the prepared transaction generated xid assignment records. Test + * here must match one used in AssignTransactionId(). + */ + if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS || + XLogLogicalInfoActive())) + overwriteOK = true; - /* - * It's possible that SubTransSetParent has been set before, if - * the prepared transaction generated xid assignment records. Test - * here must match one used in AssignTransactionId(). - */ - if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS || - XLogLogicalInfoActive())) - overwriteOK = true; + /* + * Reconstruct subtrans state for the transaction --- needed + * because pg_subtrans is not preserved over a restart. Note that + * we are linking all the subtransactions directly to the + * top-level XID; there may originally have been a more complex + * hierarchy, but there's no need to restore that exactly. + */ + for (i = 0; i < hdr->nsubxacts; i++) + SubTransSetParent(subxids[i], xid, overwriteOK); - /* - * Reconstruct subtrans state for the transaction --- needed - * because pg_subtrans is not preserved over a restart. Note that - * we are linking all the subtransactions directly to the - * top-level XID; there may originally have been a more complex - * hierarchy, but there's no need to restore that exactly. - */ - for (i = 0; i < hdr->nsubxacts; i++) - SubTransSetParent(subxids[i], xid, overwriteOK); + /* + * Recreate its GXACT and dummy PGPROC. But, check whether + * it was added in redo and already has a shmem entry for + * it. + */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + MarkAsPreparingGuts(gxact, xid, gid, + hdr->prepared_at, + hdr->owner, hdr->database); - /* - * Recreate its GXACT and dummy PGPROC - */ - gxact = MarkAsPreparing(xid, gid, - hdr->prepared_at, - hdr->owner, hdr->database); - gxact->ondisk = true; - GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); - MarkAsPrepared(gxact); + /* recovered, so reset the flag for entries generated by redo */ + gxact->inredo = false; - /* - * Recover other state (notably locks) using resource managers - */ - ProcessRecords(bufptr, xid, twophase_recover_callbacks); + LWLockRelease(TwoPhaseStateLock); - /* - * Release locks held by the standby process after we process each - * prepared transaction. As a result, we don't need too many - * additional locks at any one time. - */ - if (InHotStandby) - StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids); + GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); + MarkAsPrepared(gxact); - /* - * We're done with recovering this transaction. Clear - * MyLockedGxact, like we do in PrepareTransaction() during normal - * operation. - */ - PostPrepare_Twophase(); + /* + * Recover other state (notably locks) using resource managers + */ + ProcessRecords(bufptr, xid, twophase_recover_callbacks); - pfree(buf); + /* + * Release locks held by the standby process after we process each + * prepared transaction. As a result, we don't need too many + * additional locks at any one time. + */ + if (InHotStandby) + StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids); + + /* + * We're done with recovering this transaction. Clear + * MyLockedGxact, like we do in PrepareTransaction() during normal + * operation. + */ + PostPrepare_Twophase(); + + pfree(buf); + } +} + +/* + * ProcessTwoPhaseBuffer + * + * Given a transaction id, read it either from disk or read it directly + * via shmem xlog record pointer using the provided "prepare_start_lsn". + * + * If setParent is true, then use the overwriteOK parameter to set up + * subtransaction parent linkages. + * + * If result and maxsubxid are not NULL, fill them up with smallest + * running transaction id (lesser than ShmemVariableCache->nextXid) + * and largest subtransaction id for this transaction respectively. + */ +static char * +ProcessTwoPhaseBuffer(TransactionId xid, + XLogRecPtr prepare_start_lsn, + bool fromdisk, bool overwriteOK, + bool setParent, TransactionId *result, + TransactionId *maxsubxid) +{ + TransactionId origNextXid = ShmemVariableCache->nextXid; + TransactionId res; + TransactionId maxsub; + TransactionId *subxids; + char *buf; + TwoPhaseFileHeader *hdr; + int i; + + if (!fromdisk) + Assert(prepare_start_lsn != InvalidXLogRecPtr); + + if (result) + res = *result; + if (maxsubxid) + maxsub = *maxsubxid; + + /* Already processed? */ + if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) + { + if (fromdisk) + { + ereport(WARNING, + (errmsg("removing stale two-phase state file for \"%u\"", + xid))); + RemoveTwoPhaseFile(xid, true); } + else + { + ereport(WARNING, + (errmsg("removing stale two-phase state from" + " shared memory for \"%u\"", xid))); + PrepareRedoRemove(xid, true); + } + return NULL; } - FreeDir(cldir); + + /* Reject XID if too new */ + if (TransactionIdFollowsOrEquals(xid, origNextXid)) + { + if (fromdisk) + { + ereport(WARNING, + (errmsg("removing future two-phase state file for \"%u\"", + xid))); + RemoveTwoPhaseFile(xid, true); + } + else + { + ereport(WARNING, + (errmsg("removing future two-phase state from memory for \"%u\"", + xid))); + PrepareRedoRemove(xid, true); + } + return NULL; + } + + if (fromdisk) + { + /* Read and validate file */ + buf = ReadTwoPhaseFile(xid, true); + if (buf == NULL) + { + ereport(WARNING, + (errmsg("removing corrupt two-phase state file for \"%u\"", + xid))); + RemoveTwoPhaseFile(xid, true); + return NULL; + } + } + else + { + /* Read xlog data */ + XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL); + } + + /* Deconstruct header */ + hdr = (TwoPhaseFileHeader *) buf; + if (!TransactionIdEquals(hdr->xid, xid)) + { + if (fromdisk) + { + ereport(WARNING, + (errmsg("removing corrupt two-phase state file for \"%u\"", + xid))); + RemoveTwoPhaseFile(xid, true); + } + else + { + ereport(WARNING, + (errmsg("removing corrupt two-phase state from memory for \"%u\"", + xid))); + PrepareRedoRemove(xid, true); + } + pfree(buf); + return NULL; + } + + /* + * OK, we think this buffer is valid. Incorporate xid into the + * running-minimum result. + */ + if (TransactionIdPrecedes(xid, res)) + res = xid; + + /* + * Examine subtransaction XIDs ... they should all follow main + * XID, and they may force us to advance nextXid. + */ + subxids = (TransactionId *) (buf + + MAXALIGN(sizeof(TwoPhaseFileHeader)) + + MAXALIGN(hdr->gidlen)); + for (i = 0; i < hdr->nsubxacts; i++) + { + TransactionId subxid = subxids[i]; + + Assert(TransactionIdFollows(subxid, xid)); + if (TransactionIdFollowsOrEquals(subxid, maxsub)) + maxsub = subxid; + if (setParent) + SubTransSetParent(xid, subxid, overwriteOK); + } + + if (result) + *result = res; + if (maxsubxid) + *maxsubxid = maxsub; + + return buf; } + /* * RecordTransactionCommitPrepared * @@ -2187,3 +2304,111 @@ RecordTransactionAbortPrepared(TransactionId xid, */ SyncRepWaitForLSN(recptr, false); } + +/* + * PrepareRedoAdd + * + * Store pointers to the start/end of the WAL record along with the xid in + * a gxact entry in shared memory TwoPhaseState structure. If caller + * specifies InvalidXLogRecPtr as WAL position to fetch the two-phase + * data, the entry is marked as located on disk. + */ +void +PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) +{ + TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf; + char *bufptr; + const char *gid; + GlobalTransaction gxact; + + Assert(RecoveryInProgress()); + + bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); + gid = (const char *) bufptr; + + /* + * Reserve the GID for the given transaction in the redo code path. + * + * This creates a gxact struct and puts it into the active array. + * + * In redo, this struct is mainly used to track PREPARE/COMMIT entries + * in shared memory. Hence, we only fill up the bare minimum contents here. + * The gxact also gets marked with gxact->inredo set to true to indicate + * that it got added in the redo phase + */ + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + /* Get a free gxact from the freelist */ + if (TwoPhaseState->freeGXacts == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("maximum number of prepared transactions reached"), + errhint("Increase max_prepared_transactions (currently %d).", + max_prepared_xacts))); + gxact = TwoPhaseState->freeGXacts; + TwoPhaseState->freeGXacts = gxact->next; + + gxact->prepared_at = hdr->prepared_at; + gxact->prepare_start_lsn = start_lsn; + gxact->prepare_end_lsn = end_lsn; + gxact->xid = hdr->xid; + gxact->owner = hdr->owner; + gxact->locking_backend = InvalidBackendId; + gxact->valid = false; + gxact->ondisk = XLogRecPtrIsInvalid(start_lsn); + gxact->inredo = true; /* yes, added in redo */ + strcpy(gxact->gid, gid); + + /* And insert it into the active array */ + Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); + TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + + LWLockRelease(TwoPhaseStateLock); + + elog(DEBUG2, "Adding 2PC data to shared memory %u", gxact->xid); +} + +/* + * PrepareRedoRemove + * + * Remove the corresponding gxact entry from TwoPhaseState. Also + * remove the 2PC file if a prepared transaction was saved via + * an earlier checkpoint. + */ +void +PrepareRedoRemove(TransactionId xid, bool giveWarning) +{ + GlobalTransaction gxact = NULL; + int i; + + Assert(RecoveryInProgress()); + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + gxact = TwoPhaseState->prepXacts[i]; + + if (gxact->xid == xid) + { + Assert(gxact->inredo); + break; + } + } + LWLockRelease(TwoPhaseStateLock); + + /* + * Just leave if there is nothing, this is expected during WAL replay. + */ + if (gxact == NULL) + return; + + /* + * And now we can clean up any files we may have left. + */ + elog(DEBUG2, "Removing 2PC data from shared memory %u", xid); + if (gxact->ondisk) + RemoveTwoPhaseFile(xid, giveWarning); + RemoveGXact(gxact); + + return; +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c8751c697d4..6f614e4fad0 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5615,7 +5615,9 @@ xact_redo(XLogReaderState *record) Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_commit(&parsed, parsed.twophase_xid, record->EndRecPtr, XLogRecGetOrigin(record)); - RemoveTwoPhaseFile(parsed.twophase_xid, false); + + /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + PrepareRedoRemove(parsed.twophase_xid, false); } } else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) @@ -5635,14 +5637,20 @@ xact_redo(XLogReaderState *record) { Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_abort(&parsed, parsed.twophase_xid); - RemoveTwoPhaseFile(parsed.twophase_xid, false); + + /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + PrepareRedoRemove(parsed.twophase_xid, false); } } else if (info == XLOG_XACT_PREPARE) { - /* the record contents are exactly the 2PC file */ - RecreateTwoPhaseFile(XLogRecGetXid(record), - XLogRecGetData(record), XLogRecGetDataLen(record)); + /* + * Store xid and start/end pointers of the WAL record in + * TwoPhaseState gxact entry. + */ + PrepareRedoAdd(XLogRecGetData(record), + record->ReadRecPtr, + record->EndRecPtr); } else if (info == XLOG_XACT_ASSIGNMENT) { diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 5d58f0983cf..287b3b13799 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6696,6 +6696,16 @@ StartupXLOG(void) */ restoreTimeLineHistoryFiles(ThisTimeLineID, recoveryTargetTLI); + /* + * Before running in recovery, scan pg_twophase and fill in its status + * to be able to work on entries generated by redo. Doing a scan before + * taking any recovery action has the merit to discard any 2PC files that + * are newer than the first record to replay, saving from any conflicts at + * replay. This avoids as well any subsequent scans when doing recovery + * of the on-disk two-phase data. + */ + restoreTwoPhaseData(); + lastFullPageWrites = checkPoint.fullPageWrites; RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo; diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index b2b7848fad2..4d547c55539 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -49,11 +49,12 @@ extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p, extern void StandbyRecoverPreparedTransactions(bool overwriteOK); extern void RecoverPreparedTransactions(void); -extern void RecreateTwoPhaseFile(TransactionId xid, void *content, int len); -extern void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning); - extern void CheckPointTwoPhase(XLogRecPtr redo_horizon); extern void FinishPreparedTransaction(const char *gid, bool isCommit); +extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, + XLogRecPtr end_lsn); +extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); +extern void restoreTwoPhaseData(void); #endif /* TWOPHASE_H */ |