aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/twophase.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/twophase.c')
-rw-r--r--src/backend/access/transam/twophase.c105
1 files changed, 44 insertions, 61 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index d23c292edcd..40de84e934e 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -889,14 +889,21 @@ typedef struct TwoPhaseRecordOnDisk
/*
* During prepare, the state file is assembled in memory before writing it
- * to WAL and the actual state file. We use a chain of XLogRecData blocks
- * so that we will be able to pass the state file contents directly to
- * XLogInsert.
+ * to WAL and the actual state file. We use a chain of StateFileChunk blocks
+ * for that.
*/
+typedef struct StateFileChunk
+{
+ char *data;
+ uint32 len;
+ struct StateFileChunk *next;
+} StateFileChunk;
+
static struct xllist
{
- XLogRecData *head; /* first data block in the chain */
- XLogRecData *tail; /* last block in chain */
+ StateFileChunk *head; /* first data block in the chain */
+ StateFileChunk *tail; /* last block in chain */
+ uint32 num_chunks;
uint32 bytes_free; /* free bytes left in tail block */
uint32 total_len; /* total data bytes in chain */
} records;
@@ -917,11 +924,11 @@ save_state_data(const void *data, uint32 len)
if (padlen > records.bytes_free)
{
- records.tail->next = palloc0(sizeof(XLogRecData));
+ records.tail->next = palloc0(sizeof(StateFileChunk));
records.tail = records.tail->next;
- records.tail->buffer = InvalidBuffer;
records.tail->len = 0;
records.tail->next = NULL;
+ records.num_chunks++;
records.bytes_free = Max(padlen, 512);
records.tail->data = palloc(records.bytes_free);
@@ -951,8 +958,7 @@ StartPrepare(GlobalTransaction gxact)
SharedInvalidationMessage *invalmsgs;
/* Initialize linked list */
- records.head = palloc0(sizeof(XLogRecData));
- records.head->buffer = InvalidBuffer;
+ records.head = palloc0(sizeof(StateFileChunk));
records.head->len = 0;
records.head->next = NULL;
@@ -960,6 +966,7 @@ StartPrepare(GlobalTransaction gxact)
records.head->data = palloc(records.bytes_free);
records.tail = records.head;
+ records.num_chunks = 1;
records.total_len = 0;
@@ -1019,7 +1026,7 @@ EndPrepare(GlobalTransaction gxact)
TransactionId xid = pgxact->xid;
TwoPhaseFileHeader *hdr;
char path[MAXPGPATH];
- XLogRecData *record;
+ StateFileChunk *record;
pg_crc32 statefile_crc;
pg_crc32 bogus_crc;
int fd;
@@ -1117,12 +1124,16 @@ EndPrepare(GlobalTransaction gxact)
* We save the PREPARE record's location in the gxact for later use by
* CheckPointTwoPhase.
*/
+ XLogEnsureRecordSpace(0, records.num_chunks);
+
START_CRIT_SECTION();
MyPgXact->delayChkpt = true;
- gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
- records.head);
+ XLogBeginInsert();
+ for (record = records.head; record != NULL; record = record->next)
+ XLogRegisterData(record->data, record->len);
+ gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
XLogFlush(gxact->prepare_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1180,6 +1191,7 @@ EndPrepare(GlobalTransaction gxact)
SyncRepWaitForLSN(gxact->prepare_lsn);
records.tail = records.head = NULL;
+ records.num_chunks = 0;
}
/*
@@ -2071,8 +2083,6 @@ RecordTransactionCommitPrepared(TransactionId xid,
SharedInvalidationMessage *invalmsgs,
bool initfileinval)
{
- XLogRecData rdata[4];
- int lastrdata = 0;
xl_xact_commit_prepared xlrec;
XLogRecPtr recptr;
@@ -2094,39 +2104,24 @@ RecordTransactionCommitPrepared(TransactionId xid,
xlrec.crec.nsubxacts = nchildren;
xlrec.crec.nmsgs = ninvalmsgs;
- rdata[0].data = (char *) (&xlrec);
- rdata[0].len = MinSizeOfXactCommitPrepared;
- rdata[0].buffer = InvalidBuffer;
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared);
+
/* dump rels to delete */
if (nrels > 0)
- {
- rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) rels;
- rdata[1].len = nrels * sizeof(RelFileNode);
- rdata[1].buffer = InvalidBuffer;
- lastrdata = 1;
- }
+ XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
+
/* dump committed child Xids */
if (nchildren > 0)
- {
- rdata[lastrdata].next = &(rdata[2]);
- rdata[2].data = (char *) children;
- rdata[2].len = nchildren * sizeof(TransactionId);
- rdata[2].buffer = InvalidBuffer;
- lastrdata = 2;
- }
+ XLogRegisterData((char *) children,
+ nchildren * sizeof(TransactionId));
+
/* dump cache invalidation messages */
if (ninvalmsgs > 0)
- {
- rdata[lastrdata].next = &(rdata[3]);
- rdata[3].data = (char *) invalmsgs;
- rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
- rdata[3].buffer = InvalidBuffer;
- lastrdata = 3;
- }
- rdata[lastrdata].next = NULL;
+ XLogRegisterData((char *) invalmsgs,
+ ninvalmsgs * sizeof(SharedInvalidationMessage));
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);
+ recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED);
/*
* We don't currently try to sleep before flush here ... nor is there any
@@ -2169,8 +2164,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nrels,
RelFileNode *rels)
{
- XLogRecData rdata[3];
- int lastrdata = 0;
xl_xact_abort_prepared xlrec;
XLogRecPtr recptr;
@@ -2189,30 +2182,20 @@ RecordTransactionAbortPrepared(TransactionId xid,
xlrec.arec.xact_time = GetCurrentTimestamp();
xlrec.arec.nrels = nrels;
xlrec.arec.nsubxacts = nchildren;
- rdata[0].data = (char *) (&xlrec);
- rdata[0].len = MinSizeOfXactAbortPrepared;
- rdata[0].buffer = InvalidBuffer;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared);
+
/* dump rels to delete */
if (nrels > 0)
- {
- rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) rels;
- rdata[1].len = nrels * sizeof(RelFileNode);
- rdata[1].buffer = InvalidBuffer;
- lastrdata = 1;
- }
+ XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
+
/* dump committed child Xids */
if (nchildren > 0)
- {
- rdata[lastrdata].next = &(rdata[2]);
- rdata[2].data = (char *) children;
- rdata[2].len = nchildren * sizeof(TransactionId);
- rdata[2].buffer = InvalidBuffer;
- lastrdata = 2;
- }
- rdata[lastrdata].next = NULL;
+ XLogRegisterData((char *) children,
+ nchildren * sizeof(TransactionId));
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata);
+ recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);