aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/twophase.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2005-06-17 22:32:51 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2005-06-17 22:32:51 +0000
commitd0a89683a3a4dd8e76ef0a99101355999e519df5 (patch)
treeb19aaf7f03cbcc851b00ca6e472cc7d3e5a20ca1 /src/backend/access/transam/twophase.c
parent5495575903e35ceb40d32055ab55e9377460208f (diff)
downloadpostgresql-d0a89683a3a4dd8e76ef0a99101355999e519df5.tar.gz
postgresql-d0a89683a3a4dd8e76ef0a99101355999e519df5.zip
Two-phase commit. Original patch by Heikki Linnakangas, with additional
hacking by Alvaro Herrera and Tom Lane.
Diffstat (limited to 'src/backend/access/transam/twophase.c')
-rw-r--r--src/backend/access/transam/twophase.c1659
1 files changed, 1659 insertions, 0 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
new file mode 100644
index 00000000000..01cc50a6a46
--- /dev/null
+++ b/src/backend/access/transam/twophase.c
@@ -0,0 +1,1659 @@
+/*-------------------------------------------------------------------------
+ *
+ * twophase.c
+ * Two-phase commit support functions.
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.1 2005/06/17 22:32:42 tgl Exp $
+ *
+ * NOTES
+ * Each global transaction is associated with a global transaction
+ * identifier (GID). The client assigns a GID to a postgres
+ * transaction with the PREPARE TRANSACTION command.
+ *
+ * We keep all active global transactions in a shared memory array.
+ * When the PREPARE TRANSACTION command is issued, the GID is
+ * reserved for the transaction in the array. This is done before
+ * a WAL entry is made, because the reservation checks for duplicate
+ * GIDs and aborts the transaction if there already is a global
+ * transaction in prepared state with the same GID.
+ *
+ * A global transaction (gxact) also has a dummy PGPROC that is entered
+ * into the ProcArray array; this is what keeps the XID considered
+ * running by TransactionIdIsInProgress. It is also convenient as a
+ * PGPROC to hook the gxact's locks to.
+ *
+ * In order to survive crashes and shutdowns, all prepared
+ * transactions must be stored in permanent storage. This includes
+ * locking information, pending notifications etc. All that state
+ * information is written to the per-transaction state file in
+ * the pg_twophase directory.
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "access/heapam.h"
+#include "access/subtrans.h"
+#include "access/twophase.h"
+#include "access/twophase_rmgr.h"
+#include "access/xact.h"
+#include "catalog/pg_type.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/fd.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/smgr.h"
+#include "utils/builtins.h"
+#include "pgstat.h"
+
+
+/*
+ * Directory where Two-phase commit files reside within PGDATA
+ */
+#define TWOPHASE_DIR "pg_twophase"
+
+/* GUC variable, can't be changed after startup */
+int max_prepared_xacts = 50;
+
+/*
+ * This struct describes one global transaction that is in prepared state
+ * or attempting to become prepared.
+ *
+ * The first component of the struct is a dummy PGPROC that is inserted
+ * into the global ProcArray so that the transaction appears to still be
+ * running and holding locks. It must be first because we cast pointers
+ * to PGPROC and pointers to GlobalTransactionData back and forth.
+ *
+ * The lifecycle of a global transaction is:
+ *
+ * 1. After checking that the requested GID is not in use, set up an
+ * entry in the TwoPhaseState->prepXacts array with the correct XID and GID,
+ * with locking_xid = my own XID and valid = false.
+ *
+ * 2. After successfully completing prepare, set valid = true and enter the
+ * contained PGPROC into the global ProcArray.
+ *
+ * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
+ * is valid and its locking_xid is no longer active, then store my current
+ * XID into locking_xid. This prevents concurrent attempts to commit or
+ * rollback the same prepared xact.
+ *
+ * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
+ * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
+ * the freelist.
+ *
+ * Note that if the preparing transaction fails between steps 1 and 2, the
+ * entry will remain in prepXacts until recycled. We can detect recyclable
+ * entries by checking for valid = false and locking_xid no longer active.
+ *
+ * typedef struct GlobalTransactionData *GlobalTransaction appears in
+ * twophase.h
+ */
+#define GIDSIZE 200 /* size of gid buffer; longest GID is GIDSIZE-1 chars */
+
+typedef struct GlobalTransactionData
+{
+ PGPROC proc; /* dummy proc; must be first, see above */
+ AclId owner; /* ID of user that executed the xact */
+ TransactionId locking_xid; /* top-level XID of backend working on xact */
+ bool valid; /* TRUE if fully prepared (set by MarkAsPrepared) */
+ char gid[GIDSIZE]; /* NUL-terminated GID assigned to the prepared xact */
+} GlobalTransactionData;
+
+/*
+ * Two Phase Commit shared state. Access to this struct is protected
+ * by TwoPhaseStateLock.
+ *
+ * The struct is followed in shared memory by the GlobalTransactionData
+ * structs themselves; see TwoPhaseShmemSize and TwoPhaseShmemInit.
+ */
+typedef struct TwoPhaseStateData
+{
+ /*
+ * Head of linked list of free GlobalTransactionData structs, chained
+ * through proc.links.next.  INVALID_OFFSET when the list is empty.
+ */
+ SHMEM_OFFSET freeGXacts;
+
+ /*
+ * Number of prepXacts entries currently in use.  This counts entries
+ * that are still being prepared (valid = false) as well as fully
+ * prepared ones.
+ */
+ int numPrepXacts;
+
+ /*
+ * There are max_prepared_xacts items in this array, but C wants a
+ * fixed-size array.
+ */
+ GlobalTransaction prepXacts[1]; /* VARIABLE LENGTH ARRAY */
+} TwoPhaseStateData; /* VARIABLE LENGTH STRUCT */
+
+static TwoPhaseStateData *TwoPhaseState; /* set by TwoPhaseShmemInit */
+
+
+static void RecordTransactionCommitPrepared(TransactionId xid,
+ int nchildren,
+ TransactionId *children,
+ int nrels,
+ RelFileNode *rels);
+static void RecordTransactionAbortPrepared(TransactionId xid,
+ int nchildren,
+ TransactionId *children,
+ int nrels,
+ RelFileNode *rels);
+static void ProcessRecords(char *bufptr, TransactionId xid,
+ const TwoPhaseCallback callbacks[]);
+
+
+/*
+ * TwoPhaseShmemSize
+ *      Compute the amount of shared memory needed for two-phase state.
+ *
+ * The area consists of the fixed part of TwoPhaseStateData plus the
+ * prepXacts array of max_prepared_xacts pointers (MAXALIGN'd together),
+ * followed by max_prepared_xacts GlobalTransactionData structs.
+ */
+int
+TwoPhaseShmemSize(void)
+{
+    Size        table_size;
+    Size        gxacts_size;
+
+    /* fixed struct plus the array of pointers */
+    table_size = MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
+                          sizeof(GlobalTransaction) * max_prepared_xacts);
+
+    /* the GlobalTransactionData structs themselves */
+    gxacts_size = sizeof(GlobalTransactionData) * max_prepared_xacts;
+
+    return table_size + gxacts_size;
+}
+
+/*
+ * TwoPhaseShmemInit
+ *      Create or attach to the shared two-phase state.
+ *
+ * When not running under the postmaster, the struct must not exist yet:
+ * we mark the active array empty and thread every GlobalTransactionData
+ * slot onto the freelist.  Under the postmaster it must already exist and
+ * we only record its address.
+ */
+void
+TwoPhaseShmemInit(void)
+{
+    bool        found;
+    char       *slot_base;
+    int         slot;
+
+    TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
+                                    TwoPhaseShmemSize(),
+                                    &found);
+
+    if (IsUnderPostmaster)
+    {
+        Assert(found);
+        return;
+    }
+
+    Assert(!found);
+    TwoPhaseState->freeGXacts = INVALID_OFFSET;
+    TwoPhaseState->numPrepXacts = 0;
+
+    /*
+     * The GlobalTransactionData structs start right after the MAXALIGN'd
+     * header-plus-pointer-array.  Push each of them onto the freelist.
+     */
+    slot_base = (char *) TwoPhaseState +
+        MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
+                 sizeof(GlobalTransaction) * max_prepared_xacts);
+
+    for (slot = 0; slot < max_prepared_xacts; slot++)
+    {
+        GlobalTransaction gxact = ((GlobalTransaction) slot_base) + slot;
+
+        gxact->proc.links.next = TwoPhaseState->freeGXacts;
+        TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
+    }
+}
+
+
+/*
+ * MarkAsPreparing
+ *      Reserve the GID for the given transaction.
+ *
+ * Takes a gxact struct off the freelist, fills it in with valid = false
+ * and locking_xid = xid, and appends it to the active array.  Reports an
+ * error if the GID is too long, is already present in the active array,
+ * or if no free gxact is left.
+ *
+ * NOTE: this is also used when reloading a gxact after a crash; so avoid
+ * assuming that we can use very much backend context.
+ */
+GlobalTransaction
+MarkAsPreparing(TransactionId xid, Oid databaseid, char *gid, AclId owner)
+{
+    GlobalTransaction gxact;
+    PGPROC     *proc;
+    int         idx;
+
+    if (strlen(gid) >= GIDSIZE)
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("global transaction identifier \"%s\" is too long",
+                        gid)));
+
+    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+
+    /*
+     * Sweep out gxacts whose prepare attempt died: not valid, and the XID
+     * that was working on them is no longer active.  This keeps their GIDs
+     * from looking reserved and frees up slots.
+     */
+    idx = 0;
+    while (idx < TwoPhaseState->numPrepXacts)
+    {
+        gxact = TwoPhaseState->prepXacts[idx];
+
+        if (gxact->valid || TransactionIdIsActive(gxact->locking_xid))
+        {
+            idx++;
+            continue;
+        }
+
+        /* Dead entry: move the last active entry into its place ... */
+        TwoPhaseState->numPrepXacts--;
+        TwoPhaseState->prepXacts[idx] =
+            TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
+        /* ... and return the struct to the freelist */
+        gxact->proc.links.next = TwoPhaseState->freeGXacts;
+        TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
+        /* idx is not advanced, so the moved entry gets examined too */
+    }
+
+    /* The GID must not match any remaining entry */
+    for (idx = 0; idx < TwoPhaseState->numPrepXacts; idx++)
+    {
+        gxact = TwoPhaseState->prepXacts[idx];
+        if (strcmp(gxact->gid, gid) != 0)
+            continue;
+
+        ereport(ERROR,
+                (errcode(ERRCODE_DUPLICATE_OBJECT),
+                 errmsg("global transaction identifier \"%s\" is already in use",
+                        gid)));
+    }
+
+    /* Pop a gxact off the freelist, if there is one */
+    if (TwoPhaseState->freeGXacts == INVALID_OFFSET)
+        ereport(ERROR,
+                (errcode(ERRCODE_OUT_OF_MEMORY),
+                 errmsg("maximum number of prepared transactions reached"),
+                 errhint("Increase max_prepared_transactions (currently %d).",
+                         max_prepared_xacts)));
+    gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts);
+    TwoPhaseState->freeGXacts = gxact->proc.links.next;
+
+    /* Set up the dummy PGPROC */
+    proc = &gxact->proc;
+    MemSet(proc, 0, sizeof(PGPROC));
+    SHMQueueElemInit(&(proc->links));
+    proc->waitStatus = STATUS_OK;
+    proc->xid = xid;
+    proc->xmin = InvalidTransactionId;
+    proc->pid = 0;
+    proc->databaseId = databaseid;
+    proc->lwWaiting = false;
+    proc->lwExclusive = false;
+    proc->lwWaitLink = NULL;
+    proc->waitLock = NULL;
+    proc->waitProcLock = NULL;
+    SHMQueueInit(&(proc->procLocks));
+    /* subxid data must be filled later by GXactLoadSubxactData */
+    proc->subxids.overflowed = false;
+    proc->subxids.nxids = 0;
+
+    /* Set up the gxact-specific fields; length was checked above */
+    gxact->owner = owner;
+    gxact->locking_xid = xid;
+    gxact->valid = false;
+    strcpy(gxact->gid, gid);
+
+    /* Append to the active array */
+    Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
+    TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
+
+    LWLockRelease(TwoPhaseStateLock);
+
+    return gxact;
+}
+
+/*
+ * GXactLoadSubxactData
+ *
+ * Copy the subtransaction XIDs of the transaction being prepared into the
+ * dummy PGPROC's subxids cache.  At most PGPROC_MAX_CACHED_SUBXIDS are
+ * stored; if there are more, the cache is flagged as overflowed.  With no
+ * subtransactions the PGPROC is left untouched.
+ *
+ * This must be called before MarkAsPrepared() if the transaction had any
+ * subtransactions.
+ */
+static void
+GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
+                     TransactionId *children)
+{
+    int         ncached = nsubxacts;
+
+    /* No extra locking is needed: the GXACT isn't valid yet */
+    if (ncached > PGPROC_MAX_CACHED_SUBXIDS)
+    {
+        gxact->proc.subxids.overflowed = true;
+        ncached = PGPROC_MAX_CACHED_SUBXIDS;
+    }
+
+    if (ncached <= 0)
+        return;
+
+    memcpy(gxact->proc.subxids.xids, children,
+           ncached * sizeof(TransactionId));
+    gxact->proc.subxids.nxids = ncached;
+}
+
+/*
+ * MarkAsPrepared
+ *      Flag the GXACT as fully prepared, then add its dummy PGPROC to the
+ *      global ProcArray.
+ */
+void
+MarkAsPrepared(GlobalTransaction gxact)
+{
+    PGPROC     *dummy = &gxact->proc;
+
+    /* Taking the lock may be overkill, but I'm not convinced of that ... */
+    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+    Assert(!gxact->valid);
+    gxact->valid = true;
+    LWLockRelease(TwoPhaseStateLock);
+
+    /*
+     * Once the dummy proc is in the ProcArray, TransactionIdIsInProgress
+     * keeps considering the XID as still running.
+     */
+    ProcArrayAdd(dummy);
+}
+
+/*
+ * LockGXact
+ *      Locate the prepared transaction with the given GID and mark it busy
+ *      for COMMIT PREPARED or ROLLBACK PREPARED.
+ *
+ * Only fully prepared (valid) entries are considered.  An error is raised
+ * if no such entry exists, if another still-active transaction has it
+ * locked, or if "user" is neither the owner nor a superuser.  On success
+ * the entry's locking_xid is set to our top-level XID.
+ */
+static GlobalTransaction
+LockGXact(char *gid, AclId user)
+{
+    GlobalTransaction match = NULL;
+    int         idx;
+
+    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+
+    /* Look for the first valid entry carrying this GID */
+    for (idx = 0; idx < TwoPhaseState->numPrepXacts; idx++)
+    {
+        GlobalTransaction candidate = TwoPhaseState->prepXacts[idx];
+
+        if (candidate->valid && strcmp(candidate->gid, gid) == 0)
+        {
+            match = candidate;
+            break;
+        }
+    }
+
+    if (match == NULL)
+    {
+        LWLockRelease(TwoPhaseStateLock);
+
+        ereport(ERROR,
+                (errcode(ERRCODE_UNDEFINED_OBJECT),
+                 errmsg("prepared transaction with gid \"%s\" does not exist",
+                        gid)));
+
+        /* NOTREACHED */
+        return NULL;
+    }
+
+    /* Found it, but has someone else got it locked? */
+    if (TransactionIdIsValid(match->locking_xid))
+    {
+        if (TransactionIdIsActive(match->locking_xid))
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                     errmsg("prepared transaction with gid \"%s\" is busy",
+                            gid)));
+        /* the previous locker is gone, so clear its stale claim */
+        match->locking_xid = InvalidTransactionId;
+    }
+
+    if (user != match->owner && !superuser_arg(user))
+        ereport(ERROR,
+                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                 errmsg("permission denied to finish prepared transaction"),
+                 errhint("Must be superuser or the user that prepared the transaction.")));
+
+    /* OK for me to lock it */
+    match->locking_xid = GetTopTransactionId();
+
+    LWLockRelease(TwoPhaseStateLock);
+
+    return match;
+}
+
+/*
+ * RemoveGXact
+ *      Take the prepared transaction out of the shared active array and
+ *      return its struct to the freelist.
+ *
+ * Raises an error if the gxact is not in the active array.
+ *
+ * NB: caller should have already removed it from ProcArray
+ */
+static void
+RemoveGXact(GlobalTransaction gxact)
+{
+    int         pos;
+    int         last;
+
+    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+
+    /* Locate the entry */
+    for (pos = 0; pos < TwoPhaseState->numPrepXacts; pos++)
+    {
+        if (TwoPhaseState->prepXacts[pos] == gxact)
+            break;
+    }
+
+    if (pos >= TwoPhaseState->numPrepXacts)
+    {
+        LWLockRelease(TwoPhaseStateLock);
+
+        elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
+        return;
+    }
+
+    /* Fill the hole with the last active entry */
+    last = --TwoPhaseState->numPrepXacts;
+    TwoPhaseState->prepXacts[pos] = TwoPhaseState->prepXacts[last];
+
+    /* Push the struct back onto the freelist */
+    gxact->proc.links.next = TwoPhaseState->freeGXacts;
+    TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
+
+    LWLockRelease(TwoPhaseStateLock);
+}
+
+/*
+ * GetPreparedTransactionList
+ *      Snapshot the active gxact array for pg_prepared_xact.
+ *
+ * *gxacts is set to a palloc'd array holding copies of the
+ * GlobalTransactionData structs (or to NULL if there are none), and the
+ * number of entries is returned.  Copying lets us hold TwoPhaseStateLock
+ * only briefly.
+ *
+ * WARNING -- entries that are not fully prepared yet are included too.
+ * The caller should filter them out if he doesn't want them.
+ */
+static int
+GetPreparedTransactionList(GlobalTransaction *gxacts)
+{
+    GlobalTransaction copies = NULL;
+    int         count;
+    int         idx;
+
+    LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+    count = TwoPhaseState->numPrepXacts;
+    if (count != 0)
+    {
+        copies = (GlobalTransaction)
+            palloc(sizeof(GlobalTransactionData) * count);
+        for (idx = 0; idx < count; idx++)
+            memcpy(&copies[idx], TwoPhaseState->prepXacts[idx],
+                   sizeof(GlobalTransactionData));
+    }
+
+    LWLockRelease(TwoPhaseStateLock);
+
+    *gxacts = copies;
+    return count;
+}
+
+
+/* Working status for pg_prepared_xact; lives across calls in user_fctx */
+typedef struct
+{
+ GlobalTransaction array; /* copied gxacts, or NULL if there were none */
+ int ngxacts; /* number of entries in array */
+ int currIdx; /* index of next entry to examine */
+} Working_State;
+
+/*
+ * pg_prepared_xact
+ *      Produce a view with one row per prepared transaction.
+ *
+ * Set-returning function: the first call snapshots the gxact array, and
+ * each call then returns one row (transaction, gid, ownerid, dbid) for the
+ * next fully prepared entry.  Entries not yet valid are skipped.
+ *
+ * This function is here so we don't have to export the
+ * GlobalTransactionData struct definition.
+ */
+Datum
+pg_prepared_xact(PG_FUNCTION_ARGS)
+{
+    FuncCallContext *funcctx;
+    Working_State *status;
+
+    if (SRF_IS_FIRSTCALL())
+    {
+        MemoryContext oldcontext;
+        TupleDesc   tupdesc;
+
+        /* create a function context for cross-call persistence */
+        funcctx = SRF_FIRSTCALL_INIT();
+
+        /* everything built here must survive across calls */
+        oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+        /*
+         * Build the result tuple descriptor.  This had better match the
+         * pg_prepared_xacts view in system_views.sql.
+         */
+        tupdesc = CreateTemplateTupleDesc(4, false);
+        TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
+                           XIDOID, -1, 0);
+        TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
+                           TEXTOID, -1, 0);
+        TupleDescInitEntry(tupdesc, (AttrNumber) 3, "ownerid",
+                           INT4OID, -1, 0);
+        TupleDescInitEntry(tupdesc, (AttrNumber) 4, "dbid",
+                           OIDOID, -1, 0);
+
+        funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+        /* Grab a copy of the 2PC state that we will hand out row by row */
+        status = (Working_State *) palloc(sizeof(Working_State));
+        funcctx->user_fctx = (void *) status;
+
+        status->ngxacts = GetPreparedTransactionList(&status->array);
+        status->currIdx = 0;
+
+        MemoryContextSwitchTo(oldcontext);
+    }
+
+    funcctx = SRF_PERCALL_SETUP();
+    status = (Working_State *) funcctx->user_fctx;
+
+    for (;;)
+    {
+        GlobalTransaction gxact;
+        Datum       values[4];
+        bool        nulls[4];
+        HeapTuple   tuple;
+        Datum       result;
+
+        /* out of entries? */
+        if (status->array == NULL || status->currIdx >= status->ngxacts)
+            break;
+
+        gxact = &status->array[status->currIdx++];
+
+        /* skip entries that are not fully prepared */
+        if (!gxact->valid)
+            continue;
+
+        /* Form the row; no column is ever null */
+        MemSet(values, 0, sizeof(values));
+        MemSet(nulls, 0, sizeof(nulls));
+
+        values[0] = TransactionIdGetDatum(gxact->proc.xid);
+        values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid));
+        values[2] = Int32GetDatum(gxact->owner);
+        values[3] = ObjectIdGetDatum(gxact->proc.databaseId);
+
+        tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+        result = HeapTupleGetDatum(tuple);
+        SRF_RETURN_NEXT(funcctx, result);
+    }
+
+    SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * TwoPhaseGetDummyProc
+ *      Return the dummy PGPROC of the gxact whose XID is "xid".
+ *
+ * Raises an error if no entry in the active array has that XID.
+ */
+PGPROC *
+TwoPhaseGetDummyProc(TransactionId xid)
+{
+    /* one-entry cache of the most recent successful lookup */
+    static TransactionId last_xid = InvalidTransactionId;
+    static PGPROC *last_proc = NULL;
+
+    PGPROC     *found = NULL;
+    int         idx;
+
+    /*
+     * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
+     * repeatedly for the same XID, so the cache saves scanning the array.
+     */
+    if (xid == last_xid)
+        return last_proc;
+
+    LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+    for (idx = 0; idx < TwoPhaseState->numPrepXacts; idx++)
+    {
+        GlobalTransaction candidate = TwoPhaseState->prepXacts[idx];
+
+        if (candidate->proc.xid != xid)
+            continue;
+
+        found = &candidate->proc;
+        break;
+    }
+
+    LWLockRelease(TwoPhaseStateLock);
+
+    if (found == NULL)          /* should not happen */
+        elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);
+
+    last_xid = xid;
+    last_proc = found;
+
+    return found;
+}
+
+/************************************************************************/
+/* State file support */
+/************************************************************************/
+
+/*
+ * Format the state file path for "xid" into "path", a buffer of MAXPGPATH
+ * bytes.  The file is named by the XID as eight upper-case hex digits and
+ * lives in TWOPHASE_DIR under DataDir.
+ */
+#define TwoPhaseFilePath(path, xid) \
+    snprintf((path), MAXPGPATH, "%s/%s/%08X", DataDir, TWOPHASE_DIR, (xid))
+
+/*
+ * 2PC state file format:
+ *
+ * 1. TwoPhaseFileHeader
+ * 2. TransactionId[] (subtransactions)
+ * 3. RelFileNode[] (files to be deleted at commit)
+ * 4. RelFileNode[] (files to be deleted at abort)
+ * 5. TwoPhaseRecordOnDisk
+ * 6. ...
+ * 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
+ * 8. CRC32
+ *
+ * Each segment except the final CRC32 is MAXALIGN'd.
+ */
+
+/*
+ * Header for a 2PC state file
+ *
+ * This is the first segment of the file.  StartPrepare builds it with
+ * total_len = 0, EndPrepare fills in the real total_len, and
+ * ReadTwoPhaseFile rejects files whose magic or total_len do not match.
+ */
+#define TWOPHASE_MAGIC 0x57F94530 /* format identifier */
+
+typedef struct TwoPhaseFileHeader
+{
+ uint32 magic; /* format identifier (TWOPHASE_MAGIC) */
+ uint32 total_len; /* actual file length, including the CRC */
+ TransactionId xid; /* original transaction XID */
+ Oid database; /* OID of database it was in */
+ AclId owner; /* user running the transaction */
+ int32 nsubxacts; /* number of following subxact XIDs */
+ int32 ncommitrels; /* number of delete-on-commit rels */
+ int32 nabortrels; /* number of delete-on-abort rels */
+ char gid[GIDSIZE]; /* GID for transaction */
+} TwoPhaseFileHeader;
+
+/*
+ * Header for each record in a state file
+ *
+ * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
+ * The rmgr data will be stored starting on a MAXALIGN boundary.
+ *
+ * A record whose rmid is TWOPHASE_RM_END_ID marks the end of the list.
+ */
+typedef struct TwoPhaseRecordOnDisk
+{
+ uint32 len; /* length of rmgr data, not counting padding */
+ TwoPhaseRmgrId rmid; /* resource manager for this record */
+ uint16 info; /* flag bits for use by rmgr */
+} TwoPhaseRecordOnDisk;
+
+/*
+ * During prepare, the state file is assembled in memory before writing it
+ * to WAL and the actual state file. We use a chain of XLogRecData blocks
+ * so that we will be able to pass the state file contents directly to
+ * XLogInsert.
+ *
+ * StartPrepare initializes the chain, save_state_data appends to it, and
+ * EndPrepare writes it out and resets head/tail to NULL.
+ */
+static struct xllist
+{
+ XLogRecData *head; /* first data block in the chain */
+ XLogRecData *tail; /* last block in chain */
+ uint32 bytes_free; /* free bytes left in tail block */
+ uint32 total_len; /* total data bytes in chain, padding included */
+} records;
+
+
+/*
+ * Append a block of data to records data structure.
+ *
+ * data/len: the bytes to append.  They are copied, so the caller is free
+ * to modify the source afterwards.
+ *
+ * NB: each block is padded to a MAXALIGN multiple. This must be
+ * accounted for when the file is later read!  The padding bytes are
+ * zeroed, so that no uninitialized memory ends up in WAL or in the state
+ * file.
+ *
+ * If the tail block lacks room for the padded data, a new block of at
+ * least 512 bytes is added to the chain.
+ */
+static void
+save_state_data(const void *data, uint32 len)
+{
+    uint32      padlen = MAXALIGN(len);
+    char       *dest;
+
+    if (padlen > records.bytes_free)
+    {
+        records.tail->next = palloc0(sizeof(XLogRecData));
+        records.tail = records.tail->next;
+        records.tail->buffer = InvalidBuffer;
+        records.tail->len = 0;
+        records.tail->next = NULL;
+
+        records.bytes_free = Max(padlen, 512);
+        records.tail->data = palloc(records.bytes_free);
+    }
+
+    dest = ((char *) records.tail->data) + records.tail->len;
+    memcpy(dest, data, len);
+
+    /* the block buffer comes from palloc, so clear the alignment padding */
+    if (padlen > len)
+        memset(dest + len, 0, padlen - len);
+
+    records.tail->len += padlen;
+    records.bytes_free -= padlen;
+    records.total_len += padlen;
+}
+
+/*
+ * Start preparing a state file.
+ *
+ * Resets the in-memory record chain and stores the first segments of the
+ * state file in it: the file header, then the subtransaction XIDs, the
+ * delete-on-commit rels and the delete-on-abort rels (each only if there
+ * are any).  The subtransaction XIDs are also loaded into the gxact's
+ * dummy PGPROC.
+ */
+void
+StartPrepare(GlobalTransaction gxact)
+{
+    TwoPhaseFileHeader hdr;
+    TransactionId *subxids;
+    RelFileNode *commit_rels;
+    RelFileNode *abort_rels;
+
+    /* Start a fresh chain consisting of one empty block */
+    records.head = palloc0(sizeof(XLogRecData));
+    records.head->buffer = InvalidBuffer;
+    records.head->len = 0;
+    records.head->next = NULL;
+
+    records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
+    records.head->data = palloc(records.bytes_free);
+
+    records.tail = records.head;
+
+    records.total_len = 0;
+
+    /* Fill in the file header */
+    hdr.magic = TWOPHASE_MAGIC;
+    hdr.total_len = 0;          /* EndPrepare will fill this in */
+    hdr.xid = gxact->proc.xid;
+    hdr.database = MyDatabaseId;
+    hdr.owner = GetUserId();
+    hdr.nsubxacts = xactGetCommittedChildren(&subxids);
+    hdr.ncommitrels = smgrGetPendingDeletes(true, &commit_rels);
+    hdr.nabortrels = smgrGetPendingDeletes(false, &abort_rels);
+    StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
+
+    save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
+
+    /* Subtransaction XIDs: into the file, and into the gxact as well */
+    if (hdr.nsubxacts > 0)
+    {
+        save_state_data(subxids, hdr.nsubxacts * sizeof(TransactionId));
+        GXactLoadSubxactData(gxact, hdr.nsubxacts, subxids);
+        pfree(subxids);
+    }
+
+    /* Files to delete at commit */
+    if (hdr.ncommitrels > 0)
+    {
+        save_state_data(commit_rels, hdr.ncommitrels * sizeof(RelFileNode));
+        pfree(commit_rels);
+    }
+
+    /* Files to delete at abort */
+    if (hdr.nabortrels > 0)
+    {
+        save_state_data(abort_rels, hdr.nabortrels * sizeof(RelFileNode));
+        pfree(abort_rels);
+    }
+}
+
+/*
+ * Finish preparing state file.
+ *
+ * Appends the end sentinel record, fills in the header's total_len,
+ * calculates the CRC, and writes the state file to WAL and into the
+ * pg_twophase directory.  On return the in-memory record chain pointers
+ * are reset.
+ *
+ * Errors before the critical section are reported as ERROR; any failure
+ * inside it is promoted to PANIC.
+ *
+ * In every failure path errno is saved before close(fd) and restored
+ * before ereport, so that %m and errcode_for_file_access() describe the
+ * call that failed rather than whatever close() left behind.  A short
+ * write that did not set errno is reported as out-of-disk-space.
+ */
+void
+EndPrepare(GlobalTransaction gxact)
+{
+    TransactionId xid = gxact->proc.xid;
+    TwoPhaseFileHeader *hdr;
+    char        path[MAXPGPATH];
+    XLogRecData *record;
+    XLogRecPtr  recptr;
+    pg_crc32    statefile_crc;
+    pg_crc32    bogus_crc;
+    int         fd;
+    int         save_errno;
+
+    /* Add the end sentinel to the list of 2PC records */
+    RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
+                           NULL, 0);
+
+    /* Go back and fill in total_len in the file header record */
+    hdr = (TwoPhaseFileHeader *) records.head->data;
+    Assert(hdr->magic == TWOPHASE_MAGIC);
+    hdr->total_len = records.total_len + sizeof(pg_crc32);
+
+    /*
+     * Create the 2PC state file.
+     *
+     * Note: because we use BasicOpenFile(), we are responsible for ensuring
+     * the FD gets closed in any error exit path.  Once we get into the
+     * critical section, though, it doesn't matter since any failure causes
+     * PANIC anyway.
+     */
+    TwoPhaseFilePath(path, xid);
+
+    fd = BasicOpenFile(path,
+                       O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+                       S_IRUSR | S_IWUSR);
+    if (fd < 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not create twophase state file \"%s\": %m",
+                        path)));
+
+    /* Write data to file, and calculate CRC as we pass over it */
+    INIT_CRC32(statefile_crc);
+
+    for (record = records.head; record != NULL; record = record->next)
+    {
+        COMP_CRC32(statefile_crc, record->data, record->len);
+        errno = 0;
+        if ((write(fd, record->data, record->len)) != record->len)
+        {
+            save_errno = errno;
+            close(fd);
+            /* if write didn't set errno, assume problem is no disk space */
+            errno = save_errno ? save_errno : ENOSPC;
+            ereport(ERROR,
+                    (errcode_for_file_access(),
+                     errmsg("could not write twophase state file: %m")));
+        }
+    }
+
+    FIN_CRC32(statefile_crc);
+
+    /*
+     * Write a deliberately bogus CRC to the state file, and flush it to disk.
+     * This is to minimize the odds of failure within the critical section
+     * below --- in particular, running out of disk space.
+     *
+     * On most filesystems, write() rather than fsync() detects out-of-space,
+     * so the fsync might be considered optional.  Using it means there
+     * are three fsyncs not two associated with preparing a transaction; is
+     * the risk of an error from fsync high enough to justify that?
+     */
+    bogus_crc = ~ statefile_crc;
+
+    errno = 0;
+    if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
+    {
+        save_errno = errno;
+        close(fd);
+        /* if write didn't set errno, assume problem is no disk space */
+        errno = save_errno ? save_errno : ENOSPC;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not write twophase state file: %m")));
+    }
+
+    if (pg_fsync(fd) != 0)
+    {
+        save_errno = errno;
+        close(fd);
+        errno = save_errno;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not fsync twophase state file: %m")));
+    }
+
+    /* Back up to prepare for rewriting the CRC */
+    if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
+    {
+        save_errno = errno;
+        close(fd);
+        errno = save_errno;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not seek twophase state file: %m")));
+    }
+
+    /*
+     * The state file isn't valid yet, because we haven't written the correct
+     * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
+     *
+     * Between the time we have written the WAL entry and the time we
+     * flush the correct state file CRC to disk, we have an inconsistency:
+     * the xact is prepared according to WAL but not according to our on-disk
+     * state.  We use a critical section to force a PANIC if we are unable to
+     * complete the flush --- then, WAL replay should repair the
+     * inconsistency.
+     *
+     * We have to lock out checkpoint start here, too; otherwise a checkpoint
+     * starting immediately after the WAL record is inserted could complete
+     * before we've finished flushing, meaning that the WAL record would not
+     * get replayed if a crash follows.
+     */
+    START_CRIT_SECTION();
+
+    LWLockAcquire(CheckpointStartLock, LW_SHARED);
+
+    recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, records.head);
+    XLogFlush(recptr);
+
+    /* If we crash now, we have prepared: WAL replay will fix things */
+
+    /* write correct CRC, flush, and close file */
+    errno = 0;
+    if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
+    {
+        save_errno = errno;
+        close(fd);
+        /* if write didn't set errno, assume problem is no disk space */
+        errno = save_errno ? save_errno : ENOSPC;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not write twophase state file: %m")));
+    }
+
+    if (pg_fsync(fd) != 0)
+    {
+        save_errno = errno;
+        close(fd);
+        errno = save_errno;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not fsync twophase state file: %m")));
+    }
+
+    if (close(fd) != 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not close twophase state file: %m")));
+
+    LWLockRelease(CheckpointStartLock);
+
+    END_CRIT_SECTION();
+
+    records.tail = records.head = NULL;
+}
+
+/*
+ * Register a 2PC record to be written to state file.
+ *
+ * rmid: resource manager the record belongs to
+ * info: flag bits for use by that resource manager
+ * data/len: the record payload; nothing but the header is stored when
+ * len is 0
+ *
+ * The header and the payload are appended to the in-memory chain as two
+ * separately MAXALIGN-padded blocks.
+ */
+void
+RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
+                       const void *data, uint32 len)
+{
+    TwoPhaseRecordOnDisk record;
+
+    /*
+     * The whole struct gets copied out, so clear it first: any padding
+     * the compiler may have put between the fields would otherwise carry
+     * stack garbage into WAL and the state file.
+     */
+    MemSet(&record, 0, sizeof(TwoPhaseRecordOnDisk));
+    record.rmid = rmid;
+    record.info = info;
+    record.len = len;
+    save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
+    if (len > 0)
+        save_state_data(data, len);
+}
+
+
+/*
+ * Read and validate the state file for xid.
+ *
+ * If it looks OK (has a valid magic number and CRC), return the palloc'd
+ * contents of the file.  Otherwise return NULL.
+ *
+ * Failures of open, fstat or read are additionally reported with a
+ * WARNING; a file that is merely malformed (bad length, magic, total_len
+ * or CRC) yields NULL silently.
+ *
+ * errno is saved across close() in the failure paths so that %m reports
+ * the error of the call that actually failed.
+ */
+static char *
+ReadTwoPhaseFile(TransactionId xid)
+{
+    char        path[MAXPGPATH];
+    char       *buf;
+    TwoPhaseFileHeader *hdr;
+    int         fd;
+    int         save_errno;
+    struct stat statbuf;
+    uint32      crc_offset;
+    pg_crc32    calc_crc, file_crc;
+
+    TwoPhaseFilePath(path, xid);
+
+    fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+    if (fd < 0)
+    {
+        ereport(WARNING,
+                (errcode_for_file_access(),
+                 errmsg("could not open twophase state file \"%s\": %m",
+                        path)));
+        return NULL;
+    }
+
+    /*
+     * Check file length.  We can determine a lower bound pretty easily.
+     * We set an upper bound mainly to avoid palloc() failure on a corrupt
+     * file.
+     */
+    if (fstat(fd, &statbuf))
+    {
+        save_errno = errno;
+        close(fd);
+        errno = save_errno;
+        ereport(WARNING,
+                (errcode_for_file_access(),
+                 errmsg("could not stat twophase state file \"%s\": %m",
+                        path)));
+        return NULL;
+    }
+
+    if (statbuf.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
+                           MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
+                           sizeof(pg_crc32)) ||
+        statbuf.st_size > 10000000)
+    {
+        close(fd);
+        return NULL;
+    }
+
+    /* everything before the CRC is MAXALIGN'd, so the CRC offset must be */
+    crc_offset = statbuf.st_size - sizeof(pg_crc32);
+    if (crc_offset != MAXALIGN(crc_offset))
+    {
+        close(fd);
+        return NULL;
+    }
+
+    /*
+     * OK, slurp in the file.
+     */
+    buf = (char *) palloc(statbuf.st_size);
+
+    if (read(fd, buf, statbuf.st_size) != statbuf.st_size)
+    {
+        save_errno = errno;
+        close(fd);
+        errno = save_errno;
+        ereport(WARNING,
+                (errcode_for_file_access(),
+                 errmsg("could not read twophase state file \"%s\": %m",
+                        path)));
+        pfree(buf);
+        return NULL;
+    }
+
+    close(fd);
+
+    /* The header must carry our magic number and the actual file length */
+    hdr = (TwoPhaseFileHeader *) buf;
+    if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != statbuf.st_size)
+    {
+        pfree(buf);
+        return NULL;
+    }
+
+    /* The CRC covers everything up to, but not including, the CRC itself */
+    INIT_CRC32(calc_crc);
+    COMP_CRC32(calc_crc, buf, crc_offset);
+    FIN_CRC32(calc_crc);
+
+    file_crc = *((pg_crc32 *) (buf + crc_offset));
+
+    if (!EQ_CRC32(calc_crc, file_crc))
+    {
+        pfree(buf);
+        return NULL;
+    }
+
+    return buf;
+}
+
+
+/*
+ * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
+ *
+ * gid: identifier of the prepared transaction to finish
+ * isCommit: true for COMMIT PREPARED, false for ROLLBACK PREPARED
+ *
+ * Raises an error if the GID cannot be locked (see LockGXact) or if the
+ * state file cannot be read or fails validation.
+ */
+void
+FinishPreparedTransaction(char *gid, bool isCommit)
+{
+    GlobalTransaction gxact;
+    TransactionId xid;
+    char       *buf;
+    char       *bufptr;
+    TwoPhaseFileHeader *hdr;
+    TransactionId *children;
+    RelFileNode *commitrels;
+    RelFileNode *abortrels;
+    int         i;
+
+    /*
+     * Validate the GID, and lock the GXACT to ensure that two backends
+     * do not try to commit the same GID at once.
+     */
+    gxact = LockGXact(gid, GetUserId());
+    xid = gxact->proc.xid;
+
+    /*
+     * Read and validate the state file
+     */
+    buf = ReadTwoPhaseFile(xid);
+    if (buf == NULL)
+        ereport(ERROR,
+                (errcode(ERRCODE_DATA_CORRUPTED),
+                 errmsg("twophase state file for transaction %u is corrupt",
+                        xid)));
+
+    /*
+     * Disassemble the header area.  Each segment is MAXALIGN'd, so step
+     * over them using padded lengths; bufptr ends up at the first
+     * TwoPhaseRecordOnDisk.
+     */
+    hdr = (TwoPhaseFileHeader *) buf;
+    Assert(TransactionIdEquals(hdr->xid, xid));
+    bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
+    children = (TransactionId *) bufptr;
+    bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+    commitrels = (RelFileNode *) bufptr;
+    bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+    abortrels = (RelFileNode *) bufptr;
+    bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+    /*
+     * The order of operations here is critical: make the XLOG entry for
+     * commit or abort, then mark the transaction committed or aborted in
+     * pg_clog, then remove its PGPROC from the global ProcArray (which
+     * means TransactionIdIsInProgress will stop saying the prepared xact
+     * is in progress), then run the post-commit or post-abort callbacks.
+     * The callbacks will release the locks the transaction held.
+     */
+    if (isCommit)
+        RecordTransactionCommitPrepared(xid,
+                                        hdr->nsubxacts, children,
+                                        hdr->ncommitrels, commitrels);
+    else
+        RecordTransactionAbortPrepared(xid,
+                                       hdr->nsubxacts, children,
+                                       hdr->nabortrels, abortrels);
+
+    ProcArrayRemove(&gxact->proc);
+
+    /*
+     * In case we fail while running the callbacks, mark the gxact invalid
+     * so no one else will try to commit/rollback, and so it can be recycled
+     * properly later.  It is still locked by our XID so it won't go away yet.
+     */
+    gxact->valid = false;
+
+    if (isCommit)
+        ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
+    else
+        ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
+
+    /*
+     * We also have to remove any files that were supposed to be dropped.
+     * NB: this code knows that we couldn't be dropping any temp rels ...
+     */
+    if (isCommit)
+    {
+        for (i = 0; i < hdr->ncommitrels; i++)
+            smgrdounlink(smgropen(commitrels[i]), false, false);
+    }
+    else
+    {
+        for (i = 0; i < hdr->nabortrels; i++)
+            smgrdounlink(smgropen(abortrels[i]), false, false);
+    }
+
+    /*
+     * Count the prepared xact in the statistics according to its actual
+     * outcome: a ROLLBACK PREPARED must not be counted as a commit.
+     */
+    if (isCommit)
+        pgstat_count_xact_commit();
+    else
+        pgstat_count_xact_rollback();
+
+    /*
+     * And now we can clean up our mess.
+     */
+    RemoveTwoPhaseFile(xid, true);
+
+    RemoveGXact(gxact);
+
+    pfree(buf);
+}
+
+/*
+ * ProcessRecords
+ *
+ * Walk the resource-manager records of a 2PC state file image (already
+ * read into memory by ReadTwoPhaseFile), starting at bufptr, and invoke
+ * the callback registered in callbacks[] for each record's rmid.  Records
+ * whose callback slot is NULL are stepped over without any action.  The
+ * walk ends at the first record whose rmid is TWOPHASE_RM_END_ID.
+ */
+static void
+ProcessRecords(char *bufptr, TransactionId xid,
+               const TwoPhaseCallback callbacks[])
+{
+    for (;;)
+    {
+        TwoPhaseRecordOnDisk *rec = (TwoPhaseRecordOnDisk *) bufptr;
+        char       *payload;
+        TwoPhaseCallback handler;
+
+        Assert(rec->rmid <= TWOPHASE_RM_MAX_ID);
+
+        /* End-of-records marker: nothing further to process */
+        if (rec->rmid == TWOPHASE_RM_END_ID)
+            return;
+
+        /* The record's data follows its max-aligned header */
+        payload = bufptr + MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
+
+        handler = callbacks[rec->rmid];
+        if (handler != NULL)
+            handler(xid, rec->info, (void *) payload, rec->len);
+
+        /* Advance past the data, rounded up to a MAXALIGN multiple */
+        bufptr = payload + MAXALIGN(rec->len);
+    }
+}
+
+/*
+ * RemoveTwoPhaseFile
+ *
+ * Unlink the 2PC state file belonging to the specified XID.
+ *
+ * A failed unlink is only ever reported as a WARNING.  When the failure
+ * is "file not present" (ENOENT) the warning is emitted only if
+ * giveWarning is true; a missing file is an expected case during WAL
+ * replay, so callers there pass false.
+ */
+void
+RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
+{
+    char        path[MAXPGPATH];
+
+    TwoPhaseFilePath(path, xid);
+
+    /* Successful removal: nothing to report */
+    if (unlink(path) == 0)
+        return;
+
+    /* Caller asked us to stay quiet about an already-missing file */
+    if (errno == ENOENT && !giveWarning)
+        return;
+
+    ereport(WARNING,
+            (errcode_for_file_access(),
+             errmsg("could not remove two-phase state file \"%s\": %m",
+                    path)));
+}
+
+/*
+ * RecreateTwoPhaseFile
+ *
+ * Recreates a state file.  This is used in WAL replay.
+ *
+ * xid      - transaction whose state file is to be (re)written; it
+ *            determines the file's path via TwoPhaseFilePath
+ * content  - file contents, not including the trailing CRC
+ * len      - number of bytes in content, not including the CRC
+ *
+ * The file is created (or truncated if it already exists), the content
+ * is written followed by a freshly computed CRC of the content, and the
+ * file is fsync'd and closed.  Any failure is reported with
+ * ereport(ERROR).
+ *
+ * On the failure paths we must close the descriptor before reporting,
+ * but close() is allowed to change errno, which would make %m describe
+ * the wrong failure.  Hence errno is saved before close() and restored
+ * afterwards.  Also, a short write need not set errno at all; in that
+ * case we assume the problem is lack of disk space.
+ */
+void
+RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
+{
+    char        path[MAXPGPATH];
+    pg_crc32    statefile_crc;
+    int         fd;
+    int         save_errno;
+
+    /* Recompute CRC */
+    INIT_CRC32(statefile_crc);
+    COMP_CRC32(statefile_crc, content, len);
+    FIN_CRC32(statefile_crc);
+
+    TwoPhaseFilePath(path, xid);
+
+    fd = BasicOpenFile(path,
+                       O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
+                       S_IRUSR | S_IWUSR);
+    if (fd < 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not recreate twophase state file \"%s\": %m",
+                        path)));
+
+    /* Write content and CRC */
+    errno = 0;
+    if (write(fd, content, len) != len)
+    {
+        /* if write didn't set errno, assume problem is no disk space */
+        save_errno = errno ? errno : ENOSPC;
+        close(fd);
+        errno = save_errno;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not write twophase state file: %m")));
+    }
+    errno = 0;
+    if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
+    {
+        /* if write didn't set errno, assume problem is no disk space */
+        save_errno = errno ? errno : ENOSPC;
+        close(fd);
+        errno = save_errno;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not write twophase state file: %m")));
+    }
+
+    /* Sync and close the file */
+    if (pg_fsync(fd) != 0)
+    {
+        /* keep fsync's errno for the message; close() may clobber it */
+        save_errno = errno;
+        close(fd);
+        errno = save_errno;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not fsync twophase state file: %m")));
+    }
+
+    if (close(fd) != 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not close twophase state file: %m")));
+}
+
+/*
+ * PrescanPreparedTransactions
+ *
+ * Scan the pg_twophase directory and determine the range of valid XIDs
+ * present.  This is run during database startup, after we have completed
+ * reading WAL.  ShmemVariableCache->nextXid has been set to one more than
+ * the highest XID for which evidence exists in WAL.
+ *
+ * State files whose main XID is at or beyond nextXid are removed: their
+ * presence suggests that the DBA has done a PITR recovery to an earlier
+ * point in time without cleaning out pg_twophase, and such prepared xacts
+ * likely depend on database state that doesn't exist now.  Files that
+ * cannot be read, or whose header XID disagrees with the file name, are
+ * removed as corrupt.
+ *
+ * For the files that survive, nextXid is advanced beyond any of their
+ * subxact XIDs.  This is needed since subxact commit doesn't write a WAL
+ * entry, so there might be no evidence in WAL of those subxact XIDs.
+ *
+ * Returns the oldest valid XID among the prepared xacts, or the original
+ * ShmemVariableCache->nextXid if there are none.  This is needed to
+ * synchronize pg_subtrans startup properly.
+ */
+TransactionId
+PrescanPreparedTransactions(void)
+{
+    TransactionId origNextXid = ShmemVariableCache->nextXid;
+    TransactionId oldestXid = origNextXid;
+    char        dir[MAXPGPATH];
+    DIR        *cldir;
+    struct dirent *clde;
+
+    snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR);
+
+    cldir = AllocateDir(dir);
+    if (cldir == NULL)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not open directory \"%s\": %m", dir)));
+
+    /*
+     * errno is zeroed immediately before every readdir() call, so that
+     * after the loop a nonzero errno can only have come from readdir().
+     */
+    for (errno = 0; (clde = readdir(cldir)) != NULL; errno = 0)
+    {
+        const char *fname = clde->d_name;
+        TransactionId xid;
+        char       *buf;
+        TwoPhaseFileHeader *hdr;
+        TransactionId *subxids;
+        int         i;
+
+        /* Only names made of exactly 8 upper-case hex digits qualify */
+        if (strlen(fname) != 8 ||
+            strspn(fname, "0123456789ABCDEF") != 8)
+            continue;
+
+        xid = (TransactionId) strtoul(fname, NULL, 16);
+
+        /* Reject XID if too new */
+        if (TransactionIdFollowsOrEquals(xid, origNextXid))
+        {
+            ereport(WARNING,
+                    (errmsg("removing future twophase state file \"%s\"",
+                            fname)));
+            RemoveTwoPhaseFile(xid, true);
+            continue;
+        }
+
+        /*
+         * Note: we can't check if already processed because clog
+         * subsystem isn't up yet.
+         */
+
+        /*
+         * Read and validate the file.  It is treated as corrupt either
+         * if it could not be read at all, or if the XID recorded in its
+         * header does not match the one encoded in its name.
+         */
+        buf = ReadTwoPhaseFile(xid);
+        hdr = (TwoPhaseFileHeader *) buf;
+        if (buf == NULL || !TransactionIdEquals(hdr->xid, xid))
+        {
+            ereport(WARNING,
+                    (errmsg("removing corrupt twophase state file \"%s\"",
+                            fname)));
+            RemoveTwoPhaseFile(xid, true);
+            if (buf != NULL)
+                pfree(buf);
+            continue;
+        }
+
+        /* File looks valid: fold its XID into the running minimum */
+        if (TransactionIdPrecedes(xid, oldestXid))
+            oldestXid = xid;
+
+        /*
+         * The subtransaction XID array sits right after the header.  All
+         * entries should follow the main XID; any that reach nextXid push
+         * nextXid to one past themselves.
+         */
+        subxids = (TransactionId *)
+            (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
+        for (i = 0; i < hdr->nsubxacts; i++)
+        {
+            TransactionId subxid = subxids[i];
+
+            Assert(TransactionIdFollows(subxid, xid));
+            if (!TransactionIdFollowsOrEquals(subxid,
+                                              ShmemVariableCache->nextXid))
+                continue;
+
+            ShmemVariableCache->nextXid = subxid;
+            TransactionIdAdvance(ShmemVariableCache->nextXid);
+        }
+
+        pfree(buf);
+    }
+#ifdef WIN32
+
+    /*
+     * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but
+     * not in released version
+     */
+    if (GetLastError() == ERROR_NO_MORE_FILES)
+        errno = 0;
+#endif
+    if (errno)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not read directory \"%s\": %m", dir)));
+
+    FreeDir(cldir);
+
+    return oldestXid;
+}
+
+/*
+ * RecoverPreparedTransactions
+ *
+ * Scan the pg_twophase directory and reload shared-memory state for each
+ * prepared transaction (reacquire locks, etc).  This is run during
+ * database startup.
+ *
+ * State files whose XID is already marked committed or aborted are
+ * removed as stale, and files that fail to read are removed as corrupt;
+ * both cases produce a WARNING.  Every remaining file gets its pg_subtrans
+ * links, GXACT entry and resource-manager state rebuilt.
+ */
+void
+RecoverPreparedTransactions(void)
+{
+    char        dir[MAXPGPATH];
+    DIR        *cldir;
+    struct dirent *clde;
+
+    snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR);
+
+    cldir = AllocateDir(dir);
+    if (cldir == NULL)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not open directory \"%s\": %m", dir)));
+
+    /*
+     * errno is zeroed immediately before every readdir() call, so that
+     * after the loop a nonzero errno can only have come from readdir().
+     */
+    for (errno = 0; (clde = readdir(cldir)) != NULL; errno = 0)
+    {
+        const char *fname = clde->d_name;
+        TransactionId xid;
+        char       *buf;
+        char       *rmdata;
+        TwoPhaseFileHeader *hdr;
+        TransactionId *subxids;
+        GlobalTransaction gxact;
+        int         i;
+
+        /* Only names made of exactly 8 upper-case hex digits qualify */
+        if (strlen(fname) != 8 ||
+            strspn(fname, "0123456789ABCDEF") != 8)
+            continue;
+
+        xid = (TransactionId) strtoul(fname, NULL, 16);
+
+        /* Already processed? */
+        if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+        {
+            ereport(WARNING,
+                    (errmsg("removing stale twophase state file \"%s\"",
+                            fname)));
+            RemoveTwoPhaseFile(xid, true);
+            continue;
+        }
+
+        /* Read and validate file */
+        buf = ReadTwoPhaseFile(xid);
+        if (buf == NULL)
+        {
+            ereport(WARNING,
+                    (errmsg("removing corrupt twophase state file \"%s\"",
+                            fname)));
+            RemoveTwoPhaseFile(xid, true);
+            continue;
+        }
+
+        ereport(LOG,
+                (errmsg("recovering prepared transaction %u", xid)));
+
+        /*
+         * Deconstruct header.  After it come, each max-aligned, the
+         * subxact XID array, the commit-rels array and the abort-rels
+         * array; the resource-manager records start after those.
+         */
+        hdr = (TwoPhaseFileHeader *) buf;
+        Assert(TransactionIdEquals(hdr->xid, xid));
+        rmdata = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
+        subxids = (TransactionId *) rmdata;
+        rmdata += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+        rmdata += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+        rmdata += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+        /*
+         * Reconstruct subtrans state for the transaction --- needed
+         * because pg_subtrans is not preserved over a restart
+         */
+        for (i = 0; i < hdr->nsubxacts; i++)
+            SubTransSetParent(subxids[i], xid);
+
+        /* Recreate its GXACT and dummy PGPROC */
+        gxact = MarkAsPreparing(xid, hdr->database, hdr->gid, hdr->owner);
+        GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
+        MarkAsPrepared(gxact);
+
+        /* Recover other state (notably locks) using resource managers */
+        ProcessRecords(rmdata, xid, twophase_recover_callbacks);
+
+        pfree(buf);
+    }
+#ifdef WIN32
+
+    /*
+     * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but
+     * not in released version
+     */
+    if (GetLastError() == ERROR_NO_MORE_FILES)
+        errno = 0;
+#endif
+    if (errno)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not read directory \"%s\": %m", dir)));
+
+    FreeDir(cldir);
+}
+
+/*
+ * RecordTransactionCommitPrepared
+ *
+ * Write and flush the XLOG commit record for a prepared transaction, then
+ * mark the transaction and its subtransactions committed in pg_clog.
+ *
+ * xid        - main XID of the prepared transaction
+ * nchildren  - number of entries in children[]
+ * children   - subtransaction XIDs
+ * nrels      - number of entries in rels[]
+ * rels       - relations recorded in the commit record as "to delete"
+ *
+ * This is basically the same as RecordTransactionCommit: in particular,
+ * we must take the CheckpointStartLock to avoid a race condition.
+ *
+ * We know the transaction made at least one XLOG entry (its PREPARE),
+ * so it is never possible to optimize out the commit record.
+ */
+static void
+RecordTransactionCommitPrepared(TransactionId xid,
+                                int nchildren,
+                                TransactionId *children,
+                                int nrels,
+                                RelFileNode *rels)
+{
+    XLogRecData rdata[3];
+    XLogRecData *tail = &rdata[0];  /* last element linked into the chain */
+    xl_xact_commit_prepared xlrec;
+    XLogRecPtr  recptr;
+
+    START_CRIT_SECTION();
+
+    /* See notes in RecordTransactionCommit */
+    LWLockAcquire(CheckpointStartLock, LW_SHARED);
+
+    /* Fill in the fixed-size part of the XLOG commit record */
+    xlrec.xid = xid;
+    xlrec.crec.xtime = time(NULL);
+    xlrec.crec.nrels = nrels;
+    xlrec.crec.nsubxacts = nchildren;
+    rdata[0].data = (char *) (&xlrec);
+    rdata[0].len = MinSizeOfXactCommitPrepared;
+    rdata[0].buffer = InvalidBuffer;
+
+    /* Chain on the rels to delete, if any */
+    if (nrels > 0)
+    {
+        rdata[1].data = (char *) rels;
+        rdata[1].len = nrels * sizeof(RelFileNode);
+        rdata[1].buffer = InvalidBuffer;
+        tail->next = &rdata[1];
+        tail = &rdata[1];
+    }
+
+    /* Chain on the committed child XIDs, if any */
+    if (nchildren > 0)
+    {
+        rdata[2].data = (char *) children;
+        rdata[2].len = nchildren * sizeof(TransactionId);
+        rdata[2].buffer = InvalidBuffer;
+        tail->next = &rdata[2];
+        tail = &rdata[2];
+    }
+    tail->next = NULL;
+
+    recptr = XLogInsert(RM_XACT_ID,
+                        XLOG_XACT_COMMIT_PREPARED | XLOG_NO_TRAN,
+                        rdata);
+
+    /* we don't currently try to sleep before flush here ... */
+
+    /* Flush XLOG to disk */
+    XLogFlush(recptr);
+
+    /* Mark the transaction committed in pg_clog */
+    TransactionIdCommit(xid);
+    /* to avoid race conditions, the parent must commit first */
+    TransactionIdCommitTree(nchildren, children);
+
+    /* Checkpoint is allowed again */
+    LWLockRelease(CheckpointStartLock);
+
+    END_CRIT_SECTION();
+}
+
+/*
+ * RecordTransactionAbortPrepared
+ *
+ * Write and flush the XLOG abort record for a prepared transaction, then
+ * mark the transaction and its subtransactions aborted in pg_clog.
+ *
+ * xid        - main XID of the prepared transaction
+ * nchildren  - number of entries in children[]
+ * children   - subtransaction XIDs
+ * nrels      - number of entries in rels[]
+ * rels       - relations recorded in the abort record as "to delete"
+ *
+ * This is basically the same as RecordTransactionAbort.
+ *
+ * We know the transaction made at least one XLOG entry (its PREPARE),
+ * so it is never possible to optimize out the abort record.
+ */
+static void
+RecordTransactionAbortPrepared(TransactionId xid,
+                               int nchildren,
+                               TransactionId *children,
+                               int nrels,
+                               RelFileNode *rels)
+{
+    XLogRecData rdata[3];
+    XLogRecData *tail = &rdata[0];  /* last element linked into the chain */
+    xl_xact_abort_prepared xlrec;
+    XLogRecPtr  recptr;
+
+    /*
+     * Catch the scenario where we aborted partway through
+     * RecordTransactionCommitPrepared ...
+     */
+    if (TransactionIdDidCommit(xid))
+        elog(PANIC, "cannot abort transaction %u, it was already committed",
+             xid);
+
+    START_CRIT_SECTION();
+
+    /* Fill in the fixed-size part of the XLOG abort record */
+    xlrec.xid = xid;
+    xlrec.arec.xtime = time(NULL);
+    xlrec.arec.nrels = nrels;
+    xlrec.arec.nsubxacts = nchildren;
+    rdata[0].data = (char *) (&xlrec);
+    rdata[0].len = MinSizeOfXactAbortPrepared;
+    rdata[0].buffer = InvalidBuffer;
+
+    /* Chain on the rels to delete, if any */
+    if (nrels > 0)
+    {
+        rdata[1].data = (char *) rels;
+        rdata[1].len = nrels * sizeof(RelFileNode);
+        rdata[1].buffer = InvalidBuffer;
+        tail->next = &rdata[1];
+        tail = &rdata[1];
+    }
+
+    /* Chain on the child XIDs, if any */
+    if (nchildren > 0)
+    {
+        rdata[2].data = (char *) children;
+        rdata[2].len = nchildren * sizeof(TransactionId);
+        rdata[2].buffer = InvalidBuffer;
+        tail->next = &rdata[2];
+        tail = &rdata[2];
+    }
+    tail->next = NULL;
+
+    recptr = XLogInsert(RM_XACT_ID,
+                        XLOG_XACT_ABORT_PREPARED | XLOG_NO_TRAN,
+                        rdata);
+
+    /* Always flush, since we're about to remove the 2PC state file */
+    XLogFlush(recptr);
+
+    /*
+     * Mark the transaction aborted in clog.  This is not absolutely
+     * necessary but we may as well do it while we are here.
+     */
+    TransactionIdAbort(xid);
+    TransactionIdAbortTree(nchildren, children);
+
+    END_CRIT_SECTION();
+}
+