diff options
Diffstat (limited to 'src/backend/access/transam/twophase.c')
-rw-r--r-- | src/backend/access/transam/twophase.c | 105 |
1 files changed, 44 insertions, 61 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index d23c292edcd..40de84e934e 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -889,14 +889,21 @@ typedef struct TwoPhaseRecordOnDisk /* * During prepare, the state file is assembled in memory before writing it - * to WAL and the actual state file. We use a chain of XLogRecData blocks - * so that we will be able to pass the state file contents directly to - * XLogInsert. + * to WAL and the actual state file. We use a chain of StateFileChunk blocks + * for that. */ +typedef struct StateFileChunk +{ + char *data; + uint32 len; + struct StateFileChunk *next; +} StateFileChunk; + static struct xllist { - XLogRecData *head; /* first data block in the chain */ - XLogRecData *tail; /* last block in chain */ + StateFileChunk *head; /* first data block in the chain */ + StateFileChunk *tail; /* last block in chain */ + uint32 num_chunks; uint32 bytes_free; /* free bytes left in tail block */ uint32 total_len; /* total data bytes in chain */ } records; @@ -917,11 +924,11 @@ save_state_data(const void *data, uint32 len) if (padlen > records.bytes_free) { - records.tail->next = palloc0(sizeof(XLogRecData)); + records.tail->next = palloc0(sizeof(StateFileChunk)); records.tail = records.tail->next; - records.tail->buffer = InvalidBuffer; records.tail->len = 0; records.tail->next = NULL; + records.num_chunks++; records.bytes_free = Max(padlen, 512); records.tail->data = palloc(records.bytes_free); @@ -951,8 +958,7 @@ StartPrepare(GlobalTransaction gxact) SharedInvalidationMessage *invalmsgs; /* Initialize linked list */ - records.head = palloc0(sizeof(XLogRecData)); - records.head->buffer = InvalidBuffer; + records.head = palloc0(sizeof(StateFileChunk)); records.head->len = 0; records.head->next = NULL; @@ -960,6 +966,7 @@ StartPrepare(GlobalTransaction gxact) records.head->data = palloc(records.bytes_free); records.tail = records.head; + records.num_chunks = 1; records.total_len = 0; @@ -1019,7 +1026,7 @@ EndPrepare(GlobalTransaction gxact) TransactionId xid = pgxact->xid; TwoPhaseFileHeader *hdr; char path[MAXPGPATH]; - XLogRecData *record; + StateFileChunk *record; pg_crc32 statefile_crc; pg_crc32 bogus_crc; int fd; @@ -1117,12 +1124,16 @@ EndPrepare(GlobalTransaction gxact) * We save the PREPARE record's location in the gxact for later use by * CheckPointTwoPhase. */ + XLogEnsureRecordSpace(0, records.num_chunks); + START_CRIT_SECTION(); MyPgXact->delayChkpt = true; - gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, - records.head); + XLogBeginInsert(); + for (record = records.head; record != NULL; record = record->next) + XLogRegisterData(record->data, record->len); + gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); XLogFlush(gxact->prepare_lsn); /* If we crash now, we have prepared: WAL replay will fix things */ @@ -1180,6 +1191,7 @@ EndPrepare(GlobalTransaction gxact) SyncRepWaitForLSN(gxact->prepare_lsn); records.tail = records.head = NULL; + records.num_chunks = 0; } /* @@ -2071,8 +2083,6 @@ RecordTransactionCommitPrepared(TransactionId xid, SharedInvalidationMessage *invalmsgs, bool initfileinval) { - XLogRecData rdata[4]; - int lastrdata = 0; xl_xact_commit_prepared xlrec; XLogRecPtr recptr; @@ -2094,39 +2104,24 @@ RecordTransactionCommitPrepared(TransactionId xid, xlrec.crec.nsubxacts = nchildren; xlrec.crec.nmsgs = ninvalmsgs; - rdata[0].data = (char *) (&xlrec); - rdata[0].len = MinSizeOfXactCommitPrepared; - rdata[0].buffer = InvalidBuffer; + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared); + /* dump rels to delete */ if (nrels > 0) - { - rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) rels; - rdata[1].len = nrels * sizeof(RelFileNode); - rdata[1].buffer = InvalidBuffer; - lastrdata = 1; - } + XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode)); + /* dump committed child Xids */ if (nchildren > 0) - { - rdata[lastrdata].next = &(rdata[2]); - rdata[2].data = (char *) children; - rdata[2].len = nchildren * sizeof(TransactionId); - rdata[2].buffer = InvalidBuffer; - lastrdata = 2; - } + XLogRegisterData((char *) children, + nchildren * sizeof(TransactionId)); + /* dump cache invalidation messages */ if (ninvalmsgs > 0) - { - rdata[lastrdata].next = &(rdata[3]); - rdata[3].data = (char *) invalmsgs; - rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage); - rdata[3].buffer = InvalidBuffer; - lastrdata = 3; - } - rdata[lastrdata].next = NULL; + XLogRegisterData((char *) invalmsgs, + ninvalmsgs * sizeof(SharedInvalidationMessage)); - recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata); + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED); /* * We don't currently try to sleep before flush here ... nor is there any @@ -2169,8 +2164,6 @@ RecordTransactionAbortPrepared(TransactionId xid, int nrels, RelFileNode *rels) { - XLogRecData rdata[3]; - int lastrdata = 0; xl_xact_abort_prepared xlrec; XLogRecPtr recptr; @@ -2189,30 +2182,20 @@ RecordTransactionAbortPrepared(TransactionId xid, xlrec.arec.xact_time = GetCurrentTimestamp(); xlrec.arec.nrels = nrels; xlrec.arec.nsubxacts = nchildren; - rdata[0].data = (char *) (&xlrec); - rdata[0].len = MinSizeOfXactAbortPrepared; - rdata[0].buffer = InvalidBuffer; + + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared); + /* dump rels to delete */ if (nrels > 0) - { - rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) rels; - rdata[1].len = nrels * sizeof(RelFileNode); - rdata[1].buffer = InvalidBuffer; - lastrdata = 1; - } + XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode)); + /* dump committed child Xids */ if (nchildren > 0) - { - rdata[lastrdata].next = &(rdata[2]); - rdata[2].data = (char *) children; - rdata[2].len = nchildren * sizeof(TransactionId); - rdata[2].buffer = InvalidBuffer; - lastrdata = 2; - } - rdata[lastrdata].next = NULL; + XLogRegisterData((char *) children, + nchildren * sizeof(TransactionId)); - recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata); + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED); /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); |