aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/rmgrdesc/xactdesc.c39
-rw-r--r--src/backend/access/transam/twophase.c105
-rw-r--r--src/backend/access/transam/xact.c78
-rw-r--r--src/include/access/twophase.h5
-rw-r--r--src/include/access/xact.h27
5 files changed, 230 insertions, 24 deletions
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index e5eef9ea439..b3e2fc3036c 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -102,6 +102,14 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
parsed->twophase_xid = xl_twophase->xid;
data += sizeof(xl_xact_twophase);
+
+ if (parsed->xinfo & XACT_XINFO_HAS_GID)
+ {
+ int gidlen;
+ strcpy(parsed->twophase_gid, data);
+ gidlen = strlen(parsed->twophase_gid) + 1;
+ data += MAXALIGN(gidlen);
+ }
}
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -139,6 +147,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
data += sizeof(xl_xact_xinfo);
}
+ if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+ {
+ xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+ parsed->dbId = xl_dbinfo->dbId;
+ parsed->tsId = xl_dbinfo->tsId;
+
+ data += sizeof(xl_xact_dbinfo);
+ }
+
if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
{
xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
@@ -168,6 +186,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
parsed->twophase_xid = xl_twophase->xid;
data += sizeof(xl_xact_twophase);
+
+ if (parsed->xinfo & XACT_XINFO_HAS_GID)
+ {
+ int gidlen;
+ strcpy(parsed->twophase_gid, data);
+ gidlen = strlen(parsed->twophase_gid) + 1;
+ data += MAXALIGN(gidlen);
+ }
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ xl_xact_origin xl_origin;
+
+ /* we're only guaranteed 4 byte alignment, so copy onto stack */
+ memcpy(&xl_origin, data, sizeof(xl_origin));
+
+ parsed->origin_lsn = xl_origin.origin_lsn;
+ parsed->origin_timestamp = xl_origin.origin_timestamp;
+
+ data += sizeof(xl_xact_origin);
}
}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index c479c4881b9..d6e4b7980f3 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -144,11 +144,7 @@ int max_prepared_xacts = 0;
*
* typedef struct GlobalTransactionData *GlobalTransaction appears in
* twophase.h
- *
- * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
- * specified in TwoPhaseFileHeader.
*/
-#define GIDSIZE 200
typedef struct GlobalTransactionData
{
@@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
- bool initfileinval);
+ bool initfileinval,
+ const char *gid);
static void RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels);
+ RelFileNode *rels,
+ const char *gid);
static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
@@ -898,7 +896,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
/*
* Header for a 2PC state file
*/
-#define TWOPHASE_MAGIC 0x57F94533 /* format identifier */
+#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
typedef struct TwoPhaseFileHeader
{
@@ -914,6 +912,8 @@ typedef struct TwoPhaseFileHeader
int32 ninvalmsgs; /* number of cache invalidation messages */
bool initfileinval; /* does relcache init file need invalidation? */
uint16 gidlen; /* length of the GID - GID follows the header */
+ XLogRecPtr origin_lsn; /* lsn of this record at origin node */
+ TimestampTz origin_timestamp; /* time of prepare at origin node */
} TwoPhaseFileHeader;
/*
@@ -1065,6 +1065,7 @@ EndPrepare(GlobalTransaction gxact)
{
TwoPhaseFileHeader *hdr;
StateFileChunk *record;
+ bool replorigin;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1075,6 +1076,21 @@ EndPrepare(GlobalTransaction gxact)
Assert(hdr->magic == TWOPHASE_MAGIC);
hdr->total_len = records.total_len + sizeof(pg_crc32c);
+ replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin_session_origin != DoNotReplicateId);
+
+ if (replorigin)
+ {
+ Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
+ hdr->origin_lsn = replorigin_session_origin_lsn;
+ hdr->origin_timestamp = replorigin_session_origin_timestamp;
+ }
+ else
+ {
+ hdr->origin_lsn = InvalidXLogRecPtr;
+ hdr->origin_timestamp = 0;
+ }
+
/*
* If the data size exceeds MaxAllocSize, we won't be able to read it in
* ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1107,7 +1123,16 @@ EndPrepare(GlobalTransaction gxact)
XLogBeginInsert();
for (record = records.head; record != NULL; record = record->next)
XLogRegisterData(record->data, record->len);
+
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+
+ if (replorigin)
+ /* Move LSNs forward for this replication origin */
+ replorigin_session_advance(replorigin_session_origin_lsn,
+ gxact->prepare_end_lsn);
+
XLogFlush(gxact->prepare_end_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1283,6 +1308,44 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf;
}
+/*
+ * ParsePrepareRecord
+ */
+void
+ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
+{
+ TwoPhaseFileHeader *hdr;
+ char *bufptr;
+
+ hdr = (TwoPhaseFileHeader *) xlrec;
+ bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
+
+ parsed->origin_lsn = hdr->origin_lsn;
+ parsed->origin_timestamp = hdr->origin_timestamp;
+ parsed->twophase_xid = hdr->xid;
+ parsed->dbId = hdr->database;
+ parsed->nsubxacts = hdr->nsubxacts;
+ parsed->nrels = hdr->ncommitrels;
+ parsed->nabortrels = hdr->nabortrels;
+ parsed->nmsgs = hdr->ninvalmsgs;
+
+ strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
+ bufptr += MAXALIGN(hdr->gidlen);
+
+ parsed->subxacts = (TransactionId *) bufptr;
+ bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+
+ parsed->xnodes = (RelFileNode *) bufptr;
+ bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+
+ parsed->abortnodes = (RelFileNode *) bufptr;
+ bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+ parsed->msgs = (SharedInvalidationMessage *) bufptr;
+ bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
+
/*
* Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1435,11 +1498,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
hdr->nsubxacts, children,
hdr->ncommitrels, commitrels,
hdr->ninvalmsgs, invalmsgs,
- hdr->initfileinval);
+ hdr->initfileinval, gid);
else
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
- hdr->nabortrels, abortrels);
+ hdr->nabortrels, abortrels,
+ gid);
ProcArrayRemove(proc, latestXid);
@@ -1752,7 +1816,8 @@ restoreTwoPhaseData(void)
if (buf == NULL)
continue;
- PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
+ PrepareRedoAdd(buf, InvalidXLogRecPtr,
+ InvalidXLogRecPtr, InvalidRepOriginId);
}
}
LWLockRelease(TwoPhaseStateLock);
@@ -2165,7 +2230,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
- bool initfileinval)
+ bool initfileinval,
+ const char *gid)
{
XLogRecPtr recptr;
TimestampTz committs = GetCurrentTimestamp();
@@ -2193,7 +2259,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
ninvalmsgs, invalmsgs,
initfileinval, false,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
- xid);
+ xid, gid);
if (replorigin)
@@ -2255,7 +2321,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels)
+ RelFileNode *rels,
+ const char *gid)
{
XLogRecPtr recptr;
@@ -2278,7 +2345,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
nchildren, children,
nrels, rels,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
- xid);
+ xid, gid);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
@@ -2309,7 +2376,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
* data, the entry is marked as located on disk.
*/
void
-PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
+PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
+ XLogRecPtr end_lsn, RepOriginId origin_id)
{
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
char *bufptr;
@@ -2358,6 +2426,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
+ if (origin_id != InvalidRepOriginId)
+ {
+ /* recover apply progress */
+ replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
+ false /* backward */ , false /* WAL */ );
+ }
+
elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index cfc62011b50..b88d4ccf746 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1226,7 +1226,7 @@ RecordTransactionCommit(void)
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
MyXactFlags,
- InvalidTransactionId /* plain commit */ );
+ InvalidTransactionId, NULL /* plain commit */ );
if (replorigin)
/* Move LSNs forward for this replication origin */
@@ -1578,7 +1578,8 @@ RecordTransactionAbort(bool isSubXact)
XactLogAbortRecord(xact_time,
nchildren, children,
nrels, rels,
- MyXactFlags, InvalidTransactionId);
+ MyXactFlags, InvalidTransactionId,
+ NULL);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
@@ -5234,7 +5235,8 @@ XactLogCommitRecord(TimestampTz commit_time,
int nrels, RelFileNode *rels,
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval, bool forceSync,
- int xactflags, TransactionId twophase_xid)
+ int xactflags, TransactionId twophase_xid,
+ const char *twophase_gid)
{
xl_xact_commit xlrec;
xl_xact_xinfo xl_xinfo;
@@ -5246,6 +5248,7 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xact_origin xl_origin;
uint8 info;
+ int gidlen = 0;
Assert(CritSectionCount > 0);
@@ -5308,6 +5311,13 @@ XactLogCommitRecord(TimestampTz commit_time,
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
xl_twophase.xid = twophase_xid;
+ Assert(twophase_gid != NULL);
+
+ if (XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+ gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+ }
}
/* dump transaction origin information */
@@ -5358,7 +5368,16 @@ XactLogCommitRecord(TimestampTz commit_time,
}
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ {
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+ {
+ static const char zeroes[MAXIMUM_ALIGNOF] = { 0 };
+ XLogRegisterData((char*) twophase_gid, gidlen);
+ if (MAXALIGN(gidlen) != gidlen)
+ XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen);
+ }
+ }
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
@@ -5379,15 +5398,19 @@ XLogRecPtr
XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
- int xactflags, TransactionId twophase_xid)
+ int xactflags, TransactionId twophase_xid,
+ const char *twophase_gid)
{
xl_xact_abort xlrec;
xl_xact_xinfo xl_xinfo;
xl_xact_subxacts xl_subxacts;
xl_xact_relfilenodes xl_relfilenodes;
xl_xact_twophase xl_twophase;
+ xl_xact_dbinfo xl_dbinfo;
+ xl_xact_origin xl_origin;
uint8 info;
+ int gidlen = 0;
Assert(CritSectionCount > 0);
@@ -5423,6 +5446,31 @@ XactLogAbortRecord(TimestampTz abort_time,
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
xl_twophase.xid = twophase_xid;
+ Assert(twophase_gid != NULL);
+
+ if (XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+ gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+ }
+ }
+
+ if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+ xl_dbinfo.dbId = MyDatabaseId;
+ xl_dbinfo.tsId = MyDatabaseTableSpace;
+ }
+
+ /* dump transaction origin information only for abort prepared */
+ if ( (replorigin_session_origin != InvalidRepOriginId) &&
+ TransactionIdIsValid(twophase_xid) &&
+ XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+ xl_origin.origin_lsn = replorigin_session_origin_lsn;
+ xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
}
if (xl_xinfo.xinfo != 0)
@@ -5437,6 +5485,10 @@ XactLogAbortRecord(TimestampTz abort_time,
if (xl_xinfo.xinfo != 0)
XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+ XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+
if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
{
XLogRegisterData((char *) (&xl_subxacts),
@@ -5454,7 +5506,22 @@ XactLogAbortRecord(TimestampTz abort_time,
}
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ {
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+ {
+ static const char zeroes[MAXIMUM_ALIGNOF] = { 0 };
+ XLogRegisterData((char*) twophase_gid, gidlen);
+ if (MAXALIGN(gidlen) != gidlen)
+ XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen);
+ }
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+ XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+ if (TransactionIdIsValid(twophase_xid))
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
return XLogInsert(RM_XACT_ID, info);
}
@@ -5777,7 +5844,8 @@ xact_redo(XLogReaderState *record)
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
PrepareRedoAdd(XLogRecGetData(record),
record->ReadRecPtr,
- record->EndRecPtr);
+ record->EndRecPtr,
+ XLogRecGetOrigin(record));
LWLockRelease(TwoPhaseStateLock);
}
else if (info == XLOG_XACT_ASSIGNMENT)
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 34d9470811b..f05cde202f7 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -15,6 +15,7 @@
#define TWOPHASE_H
#include "access/xlogdefs.h"
+#include "access/xact.h"
#include "datatype/timestamp.h"
#include "storage/lock.h"
@@ -46,6 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
int *nxids_p);
+extern void ParsePrepareRecord(uint8 info, char *xlrec,
+ xl_xact_parsed_prepare *parsed);
extern void StandbyRecoverPreparedTransactions(void);
extern void RecoverPreparedTransactions(void);
@@ -54,7 +57,7 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
extern void FinishPreparedTransaction(const char *gid, bool isCommit);
extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
- XLogRecPtr end_lsn);
+ XLogRecPtr end_lsn, RepOriginId origin_id);
extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
extern void restoreTwoPhaseData(void);
#endif /* TWOPHASE_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 87ae2cd4df6..a46396f2d92 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -21,6 +21,13 @@
#include "storage/sinval.h"
#include "utils/datetime.h"
+/*
+ * Maximum size of Global Transaction ID (including '\0').
+ *
+ * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
+ * specified in TwoPhaseFileHeader.
+ */
+#define GIDSIZE 200
/*
* Xact isolation levels
@@ -156,6 +163,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XACT_XINFO_HAS_TWOPHASE (1U << 4)
#define XACT_XINFO_HAS_ORIGIN (1U << 5)
#define XACT_XINFO_HAS_AE_LOCKS (1U << 6)
+#define XACT_XINFO_HAS_GID (1U << 7)
/*
* Also stored in xinfo, these indicating a variety of additional actions that
@@ -286,7 +294,6 @@ typedef struct xl_xact_abort
typedef struct xl_xact_parsed_commit
{
TimestampTz xact_time;
-
uint32 xinfo;
Oid dbId; /* MyDatabaseId */
@@ -302,16 +309,24 @@ typedef struct xl_xact_parsed_commit
SharedInvalidationMessage *msgs;
TransactionId twophase_xid; /* only for 2PC */
+ char twophase_gid[GIDSIZE]; /* only for 2PC */
+ int nabortrels; /* only for 2PC */
+ RelFileNode *abortnodes; /* only for 2PC */
XLogRecPtr origin_lsn;
TimestampTz origin_timestamp;
} xl_xact_parsed_commit;
+typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
+
typedef struct xl_xact_parsed_abort
{
TimestampTz xact_time;
uint32 xinfo;
+ Oid dbId; /* MyDatabaseId */
+ Oid tsId; /* MyDatabaseTableSpace */
+
int nsubxacts;
TransactionId *subxacts;
@@ -319,6 +334,10 @@ typedef struct xl_xact_parsed_abort
RelFileNode *xnodes;
TransactionId twophase_xid; /* only for 2PC */
+ char twophase_gid[GIDSIZE]; /* only for 2PC */
+
+ XLogRecPtr origin_lsn;
+ TimestampTz origin_timestamp;
} xl_xact_parsed_abort;
@@ -386,12 +405,14 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval, bool forceSync,
int xactflags,
- TransactionId twophase_xid);
+ TransactionId twophase_xid,
+ const char *twophase_gid);
extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
- int xactflags, TransactionId twophase_xid);
+ int xactflags, TransactionId twophase_xid,
+ const char *twophase_gid);
extern void xact_redo(XLogReaderState *record);
/* xactdesc.c */