aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam')
-rw-r--r--src/backend/access/transam/Makefile4
-rw-r--r--src/backend/access/transam/clog.c71
-rw-r--r--src/backend/access/transam/rmgr.c6
-rw-r--r--src/backend/access/transam/slru.c57
-rw-r--r--src/backend/access/transam/subtrans.c388
-rw-r--r--src/backend/access/transam/transam.c187
-rw-r--r--src/backend/access/transam/varsup.c13
-rw-r--r--src/backend/access/transam/xact.c1501
-rw-r--r--src/backend/access/transam/xlog.c7
9 files changed, 1916 insertions, 318 deletions
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 762ecf0ab7f..fe740a045f8 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -4,7 +4,7 @@
# Makefile for access/transam
#
# IDENTIFICATION
-# $PostgreSQL: pgsql/src/backend/access/transam/Makefile,v 1.18 2003/11/29 19:51:40 pgsql Exp $
+# $PostgreSQL: pgsql/src/backend/access/transam/Makefile,v 1.19 2004/07/01 00:49:42 tgl Exp $
#
#-------------------------------------------------------------------------
@@ -12,7 +12,7 @@ subdir = src/backend/access/transam
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o
+OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o
all: SUBSYS.o
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index 97f887d0a06..54514a24e71 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -13,7 +13,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/clog.c,v 1.20 2004/05/31 03:47:54 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/clog.c,v 1.21 2004/07/01 00:49:42 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -21,14 +21,13 @@
#include <fcntl.h>
#include <dirent.h>
-#include <errno.h>
#include <sys/stat.h>
#include <unistd.h>
#include "access/clog.h"
#include "access/slru.h"
-#include "storage/lwlock.h"
#include "miscadmin.h"
+#include "storage/lwlock.h"
/*
@@ -65,7 +64,7 @@
* is guaranteed flushed through the XLOG commit record before we are called
* to log a commit, so the WAL rule "write xlog before data" is satisfied
* automatically for commits, and we don't really care for aborts. Therefore,
- * we don't need to mark XLOG pages with LSN information; we have enough
+ * we don't need to mark CLOG pages with LSN information; we have enough
* synchronization already.
*----------
*/
@@ -95,20 +94,22 @@ TransactionIdSetStatus(TransactionId xid, XidStatus status)
char *byteptr;
Assert(status == TRANSACTION_STATUS_COMMITTED ||
- status == TRANSACTION_STATUS_ABORTED);
+ status == TRANSACTION_STATUS_ABORTED ||
+ status == TRANSACTION_STATUS_SUB_COMMITTED);
LWLockAcquire(ClogCtl->ControlLock, LW_EXCLUSIVE);
byteptr = SimpleLruReadPage(ClogCtl, pageno, xid, true);
byteptr += byteno;
- /* Current state should be 0 or target state */
+ /* Current state should be 0, subcommitted or target state */
Assert(((*byteptr >> bshift) & CLOG_XACT_BITMASK) == 0 ||
+ ((*byteptr >> bshift) & CLOG_XACT_BITMASK) == TRANSACTION_STATUS_SUB_COMMITTED ||
((*byteptr >> bshift) & CLOG_XACT_BITMASK) == status);
*byteptr |= (status << bshift);
- /* ...->page_status[slotno] = CLOG_PAGE_DIRTY; already done */
+ /* ...->page_status[slotno] = SLRU_PAGE_DIRTY; already done */
LWLockRelease(ClogCtl->ControlLock);
}
@@ -117,7 +118,7 @@ TransactionIdSetStatus(TransactionId xid, XidStatus status)
* Interrogate the state of a transaction in the commit log.
*
* NB: this is a low-level routine and is NOT the preferred entry point
- * for most uses; TransactionLogTest() in transam.c is the intended caller.
+ * for most uses; TransactionLogFetch() in transam.c is the intended caller.
*/
XidStatus
TransactionIdGetStatus(TransactionId xid)
@@ -176,7 +177,7 @@ BootStrapCLOG(void)
/* Make sure it's written out */
SimpleLruWritePage(ClogCtl, slotno, NULL);
- /* Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN); */
+ /* Assert(ClogCtl->page_status[slotno] == SLRU_PAGE_CLEAN); */
LWLockRelease(ClogCtl->ControlLock);
}
@@ -211,7 +212,8 @@ StartupCLOG(void)
/*
* Initialize our idea of the latest page number.
*/
- SimpleLruSetLatestPage(ClogCtl, TransactionIdToPage(ShmemVariableCache->nextXid));
+ SimpleLruSetLatestPage(ClogCtl,
+ TransactionIdToPage(ShmemVariableCache->nextXid));
}
/*
@@ -333,51 +335,20 @@ WriteZeroPageXlogRec(int pageno)
rdata.data = (char *) (&pageno);
rdata.len = sizeof(int);
rdata.next = NULL;
- (void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE | XLOG_NO_TRAN, &rdata);
-}
-
-/*
- * CLOG resource manager's routines
- */
-void
-clog_redo(XLogRecPtr lsn, XLogRecord *record)
-{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
-
- if (info == CLOG_ZEROPAGE)
- {
- int pageno;
- int slotno;
-
- memcpy(&pageno, XLogRecGetData(record), sizeof(int));
-
- LWLockAcquire(ClogCtl->ControlLock, LW_EXCLUSIVE);
-
- slotno = ZeroCLOGPage(pageno, false);
- SimpleLruWritePage(ClogCtl, slotno, NULL);
- /* Assert(ClogCtl->page_status[slotno] == SLRU_PAGE_CLEAN); */
-
- LWLockRelease(ClogCtl->ControlLock);
- }
+ (void) XLogInsert(RM_SLRU_ID, CLOG_ZEROPAGE | XLOG_NO_TRAN, &rdata);
}
+/* Redo a ZEROPAGE action during WAL replay */
void
-clog_undo(XLogRecPtr lsn, XLogRecord *record)
+clog_zeropage_redo(int pageno)
{
-}
+ int slotno;
-void
-clog_desc(char *buf, uint8 xl_info, char *rec)
-{
- uint8 info = xl_info & ~XLR_INFO_MASK;
+ LWLockAcquire(ClogCtl->ControlLock, LW_EXCLUSIVE);
- if (info == CLOG_ZEROPAGE)
- {
- int pageno;
+ slotno = ZeroCLOGPage(pageno, false);
+ SimpleLruWritePage(ClogCtl, slotno, NULL);
+ /* Assert(ClogCtl->page_status[slotno] == SLRU_PAGE_CLEAN); */
- memcpy(&pageno, rec, sizeof(int));
- sprintf(buf + strlen(buf), "zeropage: %d", pageno);
- }
- else
- strcat(buf, "UNKNOWN");
+ LWLockRelease(ClogCtl->ControlLock);
}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 112363bf291..d6c8c93ca6e 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -3,16 +3,16 @@
*
* Resource managers definition
*
- * $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.12 2003/11/29 19:51:40 pgsql Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.13 2004/07/01 00:49:42 tgl Exp $
*/
#include "postgres.h"
-#include "access/clog.h"
#include "access/gist.h"
#include "access/hash.h"
#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/rtree.h"
+#include "access/slru.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "storage/smgr.h"
@@ -23,7 +23,7 @@ RmgrData RmgrTable[RM_MAX_ID + 1] = {
{"XLOG", xlog_redo, xlog_undo, xlog_desc, NULL, NULL},
{"Transaction", xact_redo, xact_undo, xact_desc, NULL, NULL},
{"Storage", smgr_redo, smgr_undo, smgr_desc, NULL, NULL},
- {"CLOG", clog_redo, clog_undo, clog_desc, NULL, NULL},
+ {"SLRU", slru_redo, slru_undo, slru_desc, NULL, NULL},
{"Reserved 4", NULL, NULL, NULL, NULL, NULL},
{"Reserved 5", NULL, NULL, NULL, NULL, NULL},
{"Reserved 6", NULL, NULL, NULL, NULL, NULL},
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c
index 58798d0f07f..0181e2d6260 100644
--- a/src/backend/access/transam/slru.c
+++ b/src/backend/access/transam/slru.c
@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.16 2004/05/31 03:47:54 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.17 2004/07/01 00:49:42 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -16,8 +16,9 @@
#include <sys/stat.h>
#include <unistd.h>
+#include "access/clog.h"
#include "access/slru.h"
-#include "access/clog.h" /* only for NUM_CLOG_BUFFERS */
+#include "access/subtrans.h"
#include "postmaster/bgwriter.h"
#include "storage/fd.h"
#include "storage/lwlock.h"
@@ -1025,3 +1026,55 @@ SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions)
return found;
}
+
+/*
+ * SLRU resource manager's routines
+ */
+void
+slru_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ int pageno;
+
+ memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+
+ switch (info)
+ {
+ case CLOG_ZEROPAGE:
+ clog_zeropage_redo(pageno);
+ break;
+ case SUBTRANS_ZEROPAGE:
+ subtrans_zeropage_redo(pageno);
+ break;
+ default:
+ elog(PANIC, "slru_redo: unknown op code %u", info);
+ }
+}
+
+void
+slru_undo(XLogRecPtr lsn, XLogRecord *record)
+{
+}
+
+void
+slru_desc(char *buf, uint8 xl_info, char *rec)
+{
+ uint8 info = xl_info & ~XLR_INFO_MASK;
+
+ if (info == CLOG_ZEROPAGE)
+ {
+ int pageno;
+
+ memcpy(&pageno, rec, sizeof(int));
+ sprintf(buf + strlen(buf), "clog zeropage: %d", pageno);
+ }
+ else if (info == SUBTRANS_ZEROPAGE)
+ {
+ int pageno;
+
+ memcpy(&pageno, rec, sizeof(int));
+ sprintf(buf + strlen(buf), "subtrans zeropage: %d", pageno);
+ }
+ else
+ strcat(buf, "UNKNOWN");
+}
diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c
new file mode 100644
index 00000000000..1babedbe590
--- /dev/null
+++ b/src/backend/access/transam/subtrans.c
@@ -0,0 +1,388 @@
+/*-------------------------------------------------------------------------
+ *
+ * subtrans.c
+ * PostgreSQL subtrans-log manager
+ *
+ * The pg_subtrans manager is a pg_clog-like manager which stores the parent
+ * transaction Id for each transaction. It is a fundamental part of the
+ * nested transactions implementation. A main transaction has a parent
+ * of InvalidTransactionId, and each subtransaction has its immediate parent.
+ * The tree can easily be walked from child to parent, but not in the
+ * opposite direction.
+ *
+ * This code is mostly derived from clog.c.
+ *
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL: pgsql/src/backend/access/transam/subtrans.c,v 1.1 2004/07/01 00:49:42 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <dirent.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "access/slru.h"
+#include "access/subtrans.h"
+#include "miscadmin.h"
+#include "storage/lwlock.h"
+
+
+/*
+ * Defines for SubTrans page and segment sizes. A page is the same BLCKSZ
+ * as is used everywhere else in Postgres.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * SubTrans page numbering also wraps around at
+ * 0xFFFFFFFF/SUBTRANS_XACTS_PER_PAGE, and segment numbering at
+ * 0xFFFFFFFF/SUBTRANS_XACTS_PER_PAGE/SLRU_SEGMENTS_PER_PAGE. We need take no
+ * explicit notice of that fact in this module, except when comparing segment
+ * and page numbers in TruncateSubTrans (see SubTransPagePrecedes).
+ */
+
+/* We need four bytes per xact */
+#define SUBTRANS_XACTS_PER_PAGE (BLCKSZ / sizeof(TransactionId))
+
+#define TransactionIdToPage(xid) ((xid) / (TransactionId) SUBTRANS_XACTS_PER_PAGE)
+#define TransactionIdToEntry(xid) ((xid) % (TransactionId) SUBTRANS_XACTS_PER_PAGE)
+
+
+/*----------
+ * Shared-memory data structures for SUBTRANS control
+ *
+ * XLOG interactions: this module generates an XLOG record whenever a new
+ * SUBTRANS page is initialized to zeroes. Other writes of SUBTRANS come from
+ * recording of transaction commit or abort in xact.c, which generates its
+ * own XLOG records for these events and will re-perform the status update
+ * on redo; so we need make no additional XLOG entry here. Also, the XLOG
+ * is guaranteed flushed through the XLOG commit record before we are called
+ * to log a commit, so the WAL rule "write xlog before data" is satisfied
+ * automatically for commits, and we don't really care for aborts. Therefore,
+ * we don't need to mark SUBTRANS pages with LSN information; we have enough
+ * synchronization already.
+ *----------
+ */
+
+
+static SlruCtlData SubTransCtlData;
+static SlruCtl SubTransCtl = &SubTransCtlData;
+
+
+static int ZeroSUBTRANSPage(int pageno, bool writeXlog);
+static bool SubTransPagePrecedes(int page1, int page2);
+static void WriteZeroPageXlogRec(int pageno);
+
+
+/*
+ * Record the parent of a subtransaction in the subtrans log.
+ */
+void
+SubTransSetParent(TransactionId xid, TransactionId parent)
+{
+ int pageno = TransactionIdToPage(xid);
+ int entryno = TransactionIdToEntry(xid);
+ TransactionId *ptr;
+
+ LWLockAcquire(SubTransCtl->ControlLock, LW_EXCLUSIVE);
+
+ ptr = (TransactionId *) SimpleLruReadPage(SubTransCtl, pageno, xid, true);
+ ptr += entryno;
+
+ /* Current state should be 0 or target state */
+ Assert(*ptr == InvalidTransactionId || *ptr == parent);
+
+ *ptr = parent;
+
+ /* ...->page_status[slotno] = SLRU_PAGE_DIRTY; already done */
+
+ LWLockRelease(SubTransCtl->ControlLock);
+}
+
+/*
+ * Interrogate the parent of a transaction in the subtrans log.
+ */
+TransactionId
+SubTransGetParent(TransactionId xid)
+{
+ int pageno = TransactionIdToPage(xid);
+ int entryno = TransactionIdToEntry(xid);
+ TransactionId *ptr;
+ TransactionId parent;
+
+ /* Bootstrap and frozen XIDs have no parent */
+ if (!TransactionIdIsNormal(xid))
+ return InvalidTransactionId;
+
+ LWLockAcquire(SubTransCtl->ControlLock, LW_EXCLUSIVE);
+
+ ptr = (TransactionId *) SimpleLruReadPage(SubTransCtl, pageno, xid, false);
+ ptr += entryno;
+
+ parent = *ptr;
+
+ LWLockRelease(SubTransCtl->ControlLock);
+
+ return parent;
+}
+
+/*
+ * SubTransGetTopmostTransaction
+ *
+ * Returns the topmost transaction of the given transaction id.
+ */
+TransactionId
+SubTransGetTopmostTransaction(TransactionId xid)
+{
+ TransactionId parentXid = xid,
+ previousXid = xid;
+
+ while (TransactionIdIsValid(parentXid))
+ {
+ previousXid = parentXid;
+ parentXid = SubTransGetParent(parentXid);
+ }
+
+ Assert(TransactionIdIsValid(previousXid));
+
+ return previousXid;
+}
+
+/*
+ * SubTransXidsHaveCommonAncestor
+ *
+ * Returns true iff the Xids have a common ancestor
+ */
+bool
+SubTransXidsHaveCommonAncestor(TransactionId xid1, TransactionId xid2)
+{
+ if (TransactionIdEquals(xid1, xid2))
+ return true;
+
+ while (TransactionIdIsValid(xid1) && TransactionIdIsValid(xid2))
+ {
+ if (TransactionIdPrecedes(xid2, xid1))
+ xid1 = SubTransGetParent(xid1);
+ else
+ xid2 = SubTransGetParent(xid2);
+
+ if (TransactionIdEquals(xid1, xid2))
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Initialization of shared memory for Subtrans
+ */
+
+int
+SUBTRANSShmemSize(void)
+{
+ return SimpleLruShmemSize();
+}
+
+void
+SUBTRANSShmemInit(void)
+{
+ SimpleLruInit(SubTransCtl, "SUBTRANS Ctl", "pg_subtrans");
+ SubTransCtl->PagePrecedes = SubTransPagePrecedes;
+}
+
+/*
+ * This func must be called ONCE on system install. It creates
+ * the initial SubTrans segment. (The SubTrans directory is assumed to
+ * have been created by initdb, and SubTransShmemInit must have been called
+ * already.)
+ */
+void
+BootStrapSUBTRANS(void)
+{
+ int slotno;
+
+ LWLockAcquire(SubTransCtl->ControlLock, LW_EXCLUSIVE);
+
+ /* Create and zero the first page of the commit log */
+ slotno = ZeroSUBTRANSPage(0, false);
+
+ /* Make sure it's written out */
+ SimpleLruWritePage(SubTransCtl, slotno, NULL);
+ /* Assert(SubTransCtl->page_status[slotno] == SLRU_PAGE_CLEAN); */
+
+ LWLockRelease(SubTransCtl->ControlLock);
+}
+
+/*
+ * Initialize (or reinitialize) a page of SubTrans to zeroes.
+ * If writeXlog is TRUE, also emit an XLOG record saying we did this.
+ *
+ * The page is not actually written, just set up in shared memory.
+ * The slot number of the new page is returned.
+ *
+ * Control lock must be held at entry, and will be held at exit.
+ */
+static int
+ZeroSUBTRANSPage(int pageno, bool writeXlog)
+{
+ int slotno = SimpleLruZeroPage(SubTransCtl, pageno);
+
+ if (writeXlog)
+ WriteZeroPageXlogRec(pageno);
+
+ return slotno;
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup,
+ * after StartupXLOG has initialized ShmemVariableCache->nextXid.
+ */
+void
+StartupSUBTRANS(void)
+{
+ /*
+ * Initialize our idea of the latest page number.
+ */
+ SimpleLruSetLatestPage(SubTransCtl,
+ TransactionIdToPage(ShmemVariableCache->nextXid));
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend shutdown
+ */
+void
+ShutdownSUBTRANS(void)
+{
+ SimpleLruFlush(SubTransCtl, false);
+}
+
+/*
+ * Perform a checkpoint --- either during shutdown, or on-the-fly
+ */
+void
+CheckPointSUBTRANS(void)
+{
+ SimpleLruFlush(SubTransCtl, true);
+}
+
+
+/*
+ * Make sure that SubTrans has room for a newly-allocated XID.
+ *
+ * NB: this is called while holding XidGenLock. We want it to be very fast
+ * most of the time; even when it's not so fast, no actual I/O need happen
+ * unless we're forced to write out a dirty subtrans or xlog page to make room
+ * in shared memory.
+ */
+void
+ExtendSUBTRANS(TransactionId newestXact)
+{
+ int pageno;
+
+ /*
+ * No work except at first XID of a page. But beware: just after
+ * wraparound, the first XID of page zero is FirstNormalTransactionId.
+ */
+ if (TransactionIdToEntry(newestXact) != 0 &&
+ !TransactionIdEquals(newestXact, FirstNormalTransactionId))
+ return;
+
+ pageno = TransactionIdToPage(newestXact);
+
+ LWLockAcquire(SubTransCtl->ControlLock, LW_EXCLUSIVE);
+
+ /* Zero the page and make an XLOG entry about it */
+ ZeroSUBTRANSPage(pageno, true);
+
+ LWLockRelease(SubTransCtl->ControlLock);
+}
+
+
+/*
+ * Remove all SubTrans segments before the one holding the passed transaction ID
+ *
+ * When this is called, we know that the database logically contains no
+ * reference to transaction IDs older than oldestXact. However, we must
+ * not truncate the SubTrans until we have performed a checkpoint, to ensure
+ * that no such references remain on disk either; else a crash just after
+ * the truncation might leave us with a problem. Since SubTrans segments hold
+ * a large number of transactions, the opportunity to actually remove a
+ * segment is fairly rare, and so it seems best not to do the checkpoint
+ * unless we have confirmed that there is a removable segment. Therefore
+ * we issue the checkpoint command here, not in higher-level code as might
+ * seem cleaner.
+ */
+void
+TruncateSUBTRANS(TransactionId oldestXact)
+{
+ int cutoffPage;
+
+ /*
+ * The cutoff point is the start of the segment containing oldestXact.
+ * We pass the *page* containing oldestXact to SimpleLruTruncate.
+ */
+ cutoffPage = TransactionIdToPage(oldestXact);
+ SimpleLruTruncate(SubTransCtl, cutoffPage);
+}
+
+
+/*
+ * Decide which of two SubTrans page numbers is "older" for truncation purposes.
+ *
+ * We need to use comparison of TransactionIds here in order to do the right
+ * thing with wraparound XID arithmetic. However, if we are asked about
+ * page number zero, we don't want to hand InvalidTransactionId to
+ * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
+ * offset both xids by FirstNormalTransactionId to avoid that.
+ */
+static bool
+SubTransPagePrecedes(int page1, int page2)
+{
+ TransactionId xid1;
+ TransactionId xid2;
+
+ xid1 = ((TransactionId) page1) * SUBTRANS_XACTS_PER_PAGE;
+ xid1 += FirstNormalTransactionId;
+ xid2 = ((TransactionId) page2) * SUBTRANS_XACTS_PER_PAGE;
+ xid2 += FirstNormalTransactionId;
+
+ return TransactionIdPrecedes(xid1, xid2);
+}
+
+
+/*
+ * Write a ZEROPAGE xlog record
+ *
+ * Note: xlog record is marked as outside transaction control, since we
+ * want it to be redone whether the invoking transaction commits or not.
+ * (Besides which, this is normally done just before entering a transaction.)
+ */
+static void
+WriteZeroPageXlogRec(int pageno)
+{
+ XLogRecData rdata;
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *) (&pageno);
+ rdata.len = sizeof(int);
+ rdata.next = NULL;
+ (void) XLogInsert(RM_SLRU_ID, SUBTRANS_ZEROPAGE | XLOG_NO_TRAN, &rdata);
+}
+
+/* Redo a ZEROPAGE action during WAL replay */
+void
+subtrans_zeropage_redo(int pageno)
+{
+ int slotno;
+
+ LWLockAcquire(SubTransCtl->ControlLock, LW_EXCLUSIVE);
+
+ slotno = ZeroSUBTRANSPage(pageno, false);
+ SimpleLruWritePage(SubTransCtl, slotno, NULL);
+ /* Assert(SubTransCtl->page_status[slotno] == SLRU_PAGE_CLEAN); */
+
+ LWLockRelease(SubTransCtl->ControlLock);
+}
diff --git a/src/backend/access/transam/transam.c b/src/backend/access/transam/transam.c
index bbd4f08bf06..34d281de587 100644
--- a/src/backend/access/transam/transam.c
+++ b/src/backend/access/transam/transam.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/transam/transam.c,v 1.56 2003/11/29 19:51:40 pgsql Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/transam.c,v 1.57 2004/07/01 00:49:42 tgl Exp $
*
* NOTES
* This file contains the high level access-method interface to the
@@ -20,6 +20,7 @@
#include "postgres.h"
#include "access/clog.h"
+#include "access/subtrans.h"
#include "access/transam.h"
@@ -35,44 +36,40 @@
bool AMI_OVERRIDE = false;
-static bool TransactionLogTest(TransactionId transactionId, XidStatus status);
+static XidStatus TransactionLogFetch(TransactionId transactionId);
static void TransactionLogUpdate(TransactionId transactionId,
XidStatus status);
/* ----------------
- * Single-item cache for results of TransactionLogTest.
+ * Single-item cache for results of TransactionLogFetch.
* ----------------
*/
-static TransactionId cachedTestXid = InvalidTransactionId;
-static XidStatus cachedTestXidStatus;
+static TransactionId cachedFetchXid = InvalidTransactionId;
+static XidStatus cachedFetchXidStatus;
/* ----------------------------------------------------------------
* postgres log access method interface
*
- * TransactionLogTest
+ * TransactionLogFetch
* TransactionLogUpdate
* ----------------------------------------------------------------
*/
-/* --------------------------------
- * TransactionLogTest
- * --------------------------------
+/*
+ * TransactionLogFetch --- fetch commit status of specified transaction id
*/
-
-static bool /* true/false: does transaction id have
- * specified status? */
-TransactionLogTest(TransactionId transactionId, /* transaction id to test */
- XidStatus status) /* transaction status */
+static XidStatus
+TransactionLogFetch(TransactionId transactionId)
{
- XidStatus xidstatus; /* recorded status of xid */
+ XidStatus xidstatus;
/*
* Before going to the commit log manager, check our single item cache
* to see if we didn't just check the transaction status a moment ago.
*/
- if (TransactionIdEquals(transactionId, cachedTestXid))
- return (status == cachedTestXidStatus);
+ if (TransactionIdEquals(transactionId, cachedFetchXid))
+ return cachedFetchXidStatus;
/*
* Also, check to see if the transaction ID is a permanent one.
@@ -80,10 +77,10 @@ TransactionLogTest(TransactionId transactionId, /* transaction id to test */
if (!TransactionIdIsNormal(transactionId))
{
if (TransactionIdEquals(transactionId, BootstrapTransactionId))
- return (status == TRANSACTION_STATUS_COMMITTED);
+ return TRANSACTION_STATUS_COMMITTED;
if (TransactionIdEquals(transactionId, FrozenTransactionId))
- return (status == TRANSACTION_STATUS_COMMITTED);
- return (status == TRANSACTION_STATUS_ABORTED);
+ return TRANSACTION_STATUS_COMMITTED;
+ return TRANSACTION_STATUS_ABORTED;
}
/*
@@ -92,15 +89,17 @@ TransactionLogTest(TransactionId transactionId, /* transaction id to test */
xidstatus = TransactionIdGetStatus(transactionId);
/*
- * DO NOT cache status for unfinished transactions!
+ * DO NOT cache status for unfinished or sub-committed transactions!
+ * We only cache status that is guaranteed not to change.
*/
- if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS)
+ if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS &&
+ xidstatus != TRANSACTION_STATUS_SUB_COMMITTED)
{
- TransactionIdStore(transactionId, &cachedTestXid);
- cachedTestXidStatus = xidstatus;
+ TransactionIdStore(transactionId, &cachedFetchXid);
+ cachedFetchXidStatus = xidstatus;
}
- return (status == xidstatus);
+ return xidstatus;
}
/* --------------------------------
@@ -115,12 +114,23 @@ TransactionLogUpdate(TransactionId transactionId, /* trans id to update */
* update the commit log
*/
TransactionIdSetStatus(transactionId, status);
+}
- /*
- * update (invalidate) our single item TransactionLogTest cache.
- */
- TransactionIdStore(transactionId, &cachedTestXid);
- cachedTestXidStatus = status;
+/*
+ * TransactionLogMultiUpdate
+ *
+ * Update multiple transaction identifiers to a given status.
+ * Don't depend on this being atomic; it's not.
+ */
+static void
+TransactionLogMultiUpdate(int nxids, TransactionId *xids, XidStatus status)
+{
+ int i;
+
+ Assert(nxids != 0);
+
+ for (i = 0; i < nxids; i++)
+ TransactionIdSetStatus(xids[i], status);
}
/* --------------------------------
@@ -171,13 +181,38 @@ AmiTransactionOverride(bool flag)
bool /* true if given transaction committed */
TransactionIdDidCommit(TransactionId transactionId)
{
+ XidStatus xidstatus;
+
if (AMI_OVERRIDE)
{
Assert(transactionId == BootstrapTransactionId);
return true;
}
- return TransactionLogTest(transactionId, TRANSACTION_STATUS_COMMITTED);
+ xidstatus = TransactionLogFetch(transactionId);
+
+ /*
+ * If it's marked committed, it's committed.
+ */
+ if (xidstatus == TRANSACTION_STATUS_COMMITTED)
+ return true;
+
+ /*
+ * If it's marked subcommitted, we have to check the parent recursively.
+ */
+ if (xidstatus == TRANSACTION_STATUS_SUB_COMMITTED)
+ {
+ TransactionId parentXid;
+
+ parentXid = SubTransGetParent(transactionId);
+ Assert(TransactionIdIsValid(parentXid));
+ return TransactionIdDidCommit(parentXid);
+ }
+
+ /*
+ * It's not committed.
+ */
+ return false;
}
/*
@@ -190,35 +225,49 @@ TransactionIdDidCommit(TransactionId transactionId)
bool /* true if given transaction aborted */
TransactionIdDidAbort(TransactionId transactionId)
{
+ XidStatus xidstatus;
+
if (AMI_OVERRIDE)
{
Assert(transactionId == BootstrapTransactionId);
return false;
}
- return TransactionLogTest(transactionId, TRANSACTION_STATUS_ABORTED);
-}
+ xidstatus = TransactionLogFetch(transactionId);
-/*
- * Now this func in shmem.c and gives quality answer by scanning
- * PGPROC structures of all running backend. - vadim 11/26/96
- *
- * Old comments:
- * true if given transaction has neither committed nor aborted
- */
-#ifdef NOT_USED
-bool
-TransactionIdIsInProgress(TransactionId transactionId)
-{
- if (AMI_OVERRIDE)
+ /*
+ * If it's marked aborted, it's aborted.
+ */
+ if (xidstatus == TRANSACTION_STATUS_ABORTED)
+ return true;
+
+ /*
+ * If it's marked subcommitted, we have to check the parent recursively.
+ *
+ * If we detect that the parent has aborted, update pg_clog to show the
+ * subtransaction as aborted. This is only needed when the parent
+ * crashed before either committing or aborting. We want to clean up
+ * pg_clog so future visitors don't need to make this check again.
+ */
+ if (xidstatus == TRANSACTION_STATUS_SUB_COMMITTED)
{
- Assert(transactionId == BootstrapTransactionId);
- return false;
+ TransactionId parentXid;
+ bool parentAborted;
+
+ parentXid = SubTransGetParent(transactionId);
+ parentAborted = TransactionIdDidAbort(parentXid);
+
+ if (parentAborted)
+ TransactionIdAbort(transactionId);
+
+ return parentAborted;
}
- return TransactionLogTest(transactionId, TRANSACTION_STATUS_IN_PROGRESS);
+ /*
+ * It's not aborted.
+ */
+ return false;
}
-#endif /* NOT_USED */
/* --------------------------------
* TransactionId Commit
@@ -252,6 +301,46 @@ TransactionIdAbort(TransactionId transactionId)
TransactionLogUpdate(transactionId, TRANSACTION_STATUS_ABORTED);
}
+/*
+ * TransactionIdSubCommit
+ * Marks the subtransaction associated with the identifier as
+ * sub-committed.
+ */
+void
+TransactionIdSubCommit(TransactionId transactionId)
+{
+ TransactionLogUpdate(transactionId, TRANSACTION_STATUS_SUB_COMMITTED);
+}
+
+/*
+ * TransactionIdCommitTree
+ * Marks all the given transaction ids as committed.
+ *
+ * The caller has to be sure that this is used only to mark subcommitted
+ * subtransactions as committed, and only *after* marking the toplevel
+ * parent as committed. Otherwise there is a race condition against
+ * TransactionIdDidCommit.
+ */
+void
+TransactionIdCommitTree(int nxids, TransactionId *xids)
+{
+ if (nxids > 0)
+ TransactionLogMultiUpdate(nxids, xids, TRANSACTION_STATUS_COMMITTED);
+}
+
+/*
+ * TransactionIdAbortTree
+ * Marks all the given transaction ids as aborted.
+ *
+ * We don't need to worry about the non-atomic behavior, since any onlookers
+ * will consider all the xacts as not-yet-committed anyway.
+ */
+void
+TransactionIdAbortTree(int nxids, TransactionId *xids)
+{
+ if (nxids > 0)
+ TransactionLogMultiUpdate(nxids, xids, TRANSACTION_STATUS_ABORTED);
+}
/*
* TransactionIdPrecedes --- is id1 logically < id2?
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 617c7d19c43..9d3b0b323aa 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -6,7 +6,7 @@
* Copyright (c) 2000-2003, PostgreSQL Global Development Group
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/transam/varsup.c,v 1.55 2004/01/26 19:15:59 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/varsup.c,v 1.56 2004/07/01 00:49:42 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -14,6 +14,7 @@
#include "postgres.h"
#include "access/clog.h"
+#include "access/subtrans.h"
#include "access/transam.h"
#include "storage/ipc.h"
#include "storage/proc.h"
@@ -30,7 +31,7 @@ VariableCache ShmemVariableCache = NULL;
* Allocate the next XID for my new transaction.
*/
TransactionId
-GetNewTransactionId(void)
+GetNewTransactionId(bool isSubXact)
{
TransactionId xid;
@@ -52,8 +53,11 @@ GetNewTransactionId(void)
* commit a later XID before we zero the page. Fortunately, a page of
* the commit log holds 32K or more transactions, so we don't have to
* do this very often.
+ *
+ * Extend pg_subtrans too.
*/
ExtendCLOG(xid);
+ ExtendSUBTRANS(xid);
/*
* Now advance the nextXid counter. This must not happen until after
@@ -82,8 +86,11 @@ GetNewTransactionId(void)
* its own spinlock used only for fetching/storing that PGPROC's xid.
* (SInvalLock would then mean primarily that PGPROCs couldn't be added/
* removed while holding the lock.)
+ *
+ * We don't want a subtransaction to update the stored Xid; we'll check
+ * if a transaction Xid is a running subxact by checking pg_subtrans.
*/
- if (MyProc != NULL)
+ if (MyProc != NULL && !isSubXact)
MyProc->xid = xid;
LWLockRelease(XidGenLock);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 2ae0fc5b21d..fcf5b374453 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.168 2004/06/03 02:08:00 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.169 2004/07/01 00:49:42 tgl Exp $
*
* NOTES
* Transaction aborts can now occur two ways:
@@ -148,6 +148,7 @@
#include "access/hash.h"
#include "access/nbtree.h"
#include "access/rtree.h"
+#include "access/subtrans.h"
#include "access/xact.h"
#include "catalog/heap.h"
#include "catalog/index.h"
@@ -190,20 +191,53 @@ static void CommitTransaction(void);
static void RecordTransactionAbort(void);
static void StartTransaction(void);
+static void RecordSubTransactionCommit(void);
+static void StartSubTransaction(void);
+static void CommitSubTransaction(void);
+static void AbortSubTransaction(void);
+static void CleanupSubTransaction(void);
+static void PushTransaction(void);
+static void PopTransaction(void);
+
+static void AtSubAbort_Locks(void);
+static void AtSubAbort_Memory(void);
+static void AtSubCleanup_Memory(void);
+static void AtSubCommit_Memory(void);
+static void AtSubStart_Memory(void);
+
+static void ShowTransactionState(const char *str);
+static void ShowTransactionStateRec(TransactionState state);
+static const char *BlockStateAsString(TBlockState blockState);
+static const char *TransStateAsString(TransState state);
+
/*
- * global variables holding the current transaction state.
+ * CurrentTransactionState always points to the current transaction state
+ * block. It will point to TopTransactionStateData when not in a
+ * transaction at all, or when in a top-level transaction.
*/
-static TransactionStateData CurrentTransactionStateData = {
+static TransactionStateData TopTransactionStateData = {
0, /* transaction id */
FirstCommandId, /* command id */
- 0, /* scan command id */
- 0x0, /* start time */
TRANS_DEFAULT, /* transaction state */
- TBLOCK_DEFAULT /* transaction block state from the client
+ TBLOCK_DEFAULT, /* transaction block state from the client
* perspective */
+ 0, /* nesting level */
+ NULL, /* cur transaction context */
+ NIL, /* subcommitted child Xids */
+ 0, /* entry-time current userid */
+ NULL /* link to parent state block */
};
-static TransactionState CurrentTransactionState = &CurrentTransactionStateData;
+static TransactionState CurrentTransactionState = &TopTransactionStateData;
+
+/*
+ * These vars hold the value of now(), ie, the transaction start time.
+ * This does not change as we enter and exit subtransactions, so we don't
+ * keep it inside the TransactionState stack.
+ */
+static AbsoluteTime xactStartTime; /* integer part */
+static int xactStartTimeUsec; /* microsecond part */
+
/*
* User-tweakable parameters
@@ -282,7 +316,8 @@ IsAbortedTransactionBlockState(void)
{
TransactionState s = CurrentTransactionState;
- if (s->blockState == TBLOCK_ABORT)
+ if (s->blockState == TBLOCK_ABORT ||
+ s->blockState == TBLOCK_SUBABORT)
return true;
return false;
@@ -290,6 +325,19 @@ IsAbortedTransactionBlockState(void)
/*
+ * GetTopTransactionId
+ *
+ * Get the ID of the main transaction, even if we are currently inside
+ * a subtransaction.
+ */
+TransactionId
+GetTopTransactionId(void)
+{
+ return TopTransactionStateData.transactionIdData;
+}
+
+
+/*
* GetCurrentTransactionId
*/
TransactionId
@@ -319,9 +367,7 @@ GetCurrentCommandId(void)
AbsoluteTime
GetCurrentTransactionStartTime(void)
{
- TransactionState s = CurrentTransactionState;
-
- return s->startTime;
+ return xactStartTime;
}
@@ -331,11 +377,23 @@ GetCurrentTransactionStartTime(void)
AbsoluteTime
GetCurrentTransactionStartTimeUsec(int *msec)
{
- TransactionState s = CurrentTransactionState;
+ *msec = xactStartTimeUsec;
+ return xactStartTime;
+}
+
- *msec = s->startTimeUsec;
+/*
+ * GetCurrentTransactionNestLevel
+ *
+ * Note: this will return zero when not inside any transaction, one when
+ * inside a top-level transaction, etc.
+ */
+int
+GetCurrentTransactionNestLevel(void)
+{
+ TransactionState s = CurrentTransactionState;
- return s->startTime;
+ return s->nestingLevel;
}
@@ -358,19 +416,27 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
return false;
}
- return TransactionIdEquals(xid, s->transactionIdData);
-}
+ /*
+ * We will return true for the Xid of the current subtransaction,
+ * any of its subcommitted children, any of its parents, or any of
+ * their previously subcommitted children.
+ */
+ while (s != NULL)
+ {
+ ListCell *cell;
+ if (TransactionIdEquals(xid, s->transactionIdData))
+ return true;
+ foreach(cell, s->childXids)
+ {
+ if (TransactionIdEquals(xid, lfirst_int(cell)))
+ return true;
+ }
-/*
- * CommandIdIsCurrentCommandId
- */
-bool
-CommandIdIsCurrentCommandId(CommandId cid)
-{
- TransactionState s = CurrentTransactionState;
+ s = s->parent;
+ }
- return (cid == s->commandId);
+ return false;
}
@@ -437,13 +503,15 @@ AtStart_Locks(void)
static void
AtStart_Memory(void)
{
+ TransactionState s = CurrentTransactionState;
+
/*
* We shouldn't have a transaction context already.
*/
Assert(TopTransactionContext == NULL);
/*
- * Create a toplevel context for the transaction, and make it active.
+ * Create a toplevel context for the transaction.
*/
TopTransactionContext =
AllocSetContextCreate(TopMemoryContext,
@@ -452,9 +520,47 @@ AtStart_Memory(void)
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
- MemoryContextSwitchTo(TopTransactionContext);
+ /*
+ * In a top-level transaction, CurTransactionContext is the same as
+ * TopTransactionContext.
+ */
+ CurTransactionContext = TopTransactionContext;
+ s->curTransactionContext = CurTransactionContext;
+
+ /* Make the CurTransactionContext active. */
+ MemoryContextSwitchTo(CurTransactionContext);
}
+/* ----------------------------------------------------------------
+ * StartSubTransaction stuff
+ * ----------------------------------------------------------------
+ */
+
+/*
+ * AtSubStart_Memory
+ */
+static void
+AtSubStart_Memory(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ Assert(CurTransactionContext != NULL);
+
+ /*
+ * Create a CurTransactionContext, which will be used to hold data that
+ * survives subtransaction commit but disappears on subtransaction abort.
+ * We make it a child of the immediate parent's CurTransactionContext.
+ */
+ CurTransactionContext = AllocSetContextCreate(CurTransactionContext,
+ "CurTransactionContext",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ s->curTransactionContext = CurTransactionContext;
+
+ /* Make the CurTransactionContext active. */
+ MemoryContextSwitchTo(CurTransactionContext);
+}
/* ----------------------------------------------------------------
* CommitTransaction stuff
@@ -467,13 +573,25 @@ AtStart_Memory(void)
void
RecordTransactionCommit(void)
{
+ int nrels;
+ RelFileNode *rptr;
+ int nchildren;
+ TransactionId *children;
+
+ /* Get data needed for commit record */
+ nrels = smgrGetPendingDeletes(true, &rptr);
+ nchildren = xactGetCommittedChildren(&children, false);
+
/*
- * If we made neither any XLOG entries nor any temp-rel updates, we
- * can omit recording the transaction commit at all.
+ * If we made neither any XLOG entries nor any temp-rel updates,
+ * and have no files to be deleted, we can omit recording the transaction
+ * commit at all. (This test includes the effects of subtransactions,
+ * so the presence of committed subxacts need not alone force a write.)
*/
- if (MyXactMadeXLogEntry || MyXactMadeTempRelUpdate)
+ if (MyXactMadeXLogEntry || MyXactMadeTempRelUpdate || nrels > 0)
{
TransactionId xid = GetCurrentTransactionId();
+ bool madeTCentries;
XLogRecPtr recptr;
/* Tell bufmgr and smgr to prepare for commit */
@@ -482,40 +600,46 @@ RecordTransactionCommit(void)
START_CRIT_SECTION();
/*
- * We only need to log the commit in xlog if the transaction made
- * any transaction-controlled XLOG entries. (Otherwise, its XID
- * appears nowhere in permanent storage, so no one else will ever
- * care if it committed.)
+ * We only need to log the commit in XLOG if the transaction made
+ * any transaction-controlled XLOG entries or will delete files.
+ * (If it made no transaction-controlled XLOG entries, its XID
+ * appears nowhere in permanent storage, so no one else will ever care
+ * if it committed.)
*/
- if (MyLastRecPtr.xrecoff != 0)
+ madeTCentries = (MyLastRecPtr.xrecoff != 0);
+ if (madeTCentries || nrels > 0)
{
- /* Need to emit a commit record */
- XLogRecData rdata[2];
+ XLogRecData rdata[3];
+ int lastrdata = 0;
xl_xact_commit xlrec;
- int nrels;
- RelFileNode *rptr;
-
- nrels = smgrGetPendingDeletes(true, &rptr);
xlrec.xtime = time(NULL);
+ xlrec.nrels = nrels;
+ xlrec.nsubxacts = nchildren;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) (&xlrec);
rdata[0].len = MinSizeOfXactCommit;
+ /* dump rels to delete */
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) rptr;
rdata[1].len = nrels * sizeof(RelFileNode);
- rdata[1].next = NULL;
+ lastrdata = 1;
}
- else
- rdata[0].next = NULL;
+ /* dump committed child Xids */
+ if (nchildren > 0)
+ {
+ rdata[lastrdata].next = &(rdata[2]);
+ rdata[2].buffer = InvalidBuffer;
+ rdata[2].data = (char *) children;
+ rdata[2].len = nchildren * sizeof(TransactionId);
+ lastrdata = 2;
+ }
+ rdata[lastrdata].next = NULL;
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
-
- if (rptr)
- pfree(rptr);
}
else
{
@@ -529,6 +653,9 @@ RecordTransactionCommit(void)
* example, if we reported a nextval() result to the client, this
* ensures that any XLOG record generated by nextval will hit the
* disk before we report the transaction committed.
+ *
+ * Note: if we generated a commit record above, MyXactMadeXLogEntry
+ * will certainly be set now.
*/
if (MyXactMadeXLogEntry)
{
@@ -560,8 +687,12 @@ RecordTransactionCommit(void)
* is okay because no one else will ever care whether we
* committed.
*/
- if (MyLastRecPtr.xrecoff != 0 || MyXactMadeTempRelUpdate)
+ if (madeTCentries || MyXactMadeTempRelUpdate)
+ {
TransactionIdCommit(xid);
+ /* to avoid race conditions, the parent must commit first */
+ TransactionIdCommitTree(nchildren, children);
+ }
END_CRIT_SECTION();
}
@@ -573,6 +704,12 @@ RecordTransactionCommit(void)
/* Show myself as out of the transaction in PGPROC array */
MyProc->logRec.xrecoff = 0;
+
+ /* And clean up local data */
+ if (rptr)
+ pfree(rptr);
+ if (children)
+ pfree(children);
}
@@ -590,7 +727,7 @@ AtCommit_Cache(void)
/*
* Make catalog changes visible to all backends.
*/
- AtEOXactInvalidationMessages(true);
+ AtEOXact_Inval(true);
}
/*
@@ -602,7 +739,7 @@ AtCommit_LocalCache(void)
/*
* Make catalog changes visible to me for the next command.
*/
- CommandEndInvalidationMessages(true);
+ CommandEndInvalidationMessages();
}
/*
@@ -616,7 +753,7 @@ AtCommit_Locks(void)
*
* Then you're up a creek! -mer 5/24/92
*/
- ProcReleaseLocks(true);
+ ProcReleaseLocks(ReleaseAllExceptSession, 0, NULL);
}
/*
@@ -638,6 +775,88 @@ AtCommit_Memory(void)
Assert(TopTransactionContext != NULL);
MemoryContextDelete(TopTransactionContext);
TopTransactionContext = NULL;
+ CurTransactionContext = NULL;
+ CurrentTransactionState->curTransactionContext = NULL;
+}
+
+/* ----------------------------------------------------------------
+ * CommitSubTransaction stuff
+ * ----------------------------------------------------------------
+ */
+
+/*
+ * AtSubCommit_Memory
+ *
+ * We do not throw away the child's CurTransactionContext, since the data
+ * it contains will be needed at upper commit.
+ */
+static void
+AtSubCommit_Memory(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ Assert(s->parent != NULL);
+
+ /* Return to parent transaction level's memory context. */
+ CurTransactionContext = s->parent->curTransactionContext;
+ MemoryContextSwitchTo(CurTransactionContext);
+}
+
+/*
+ * AtSubCommit_childXids
+ *
+ * Pass my own XID and my child XIDs up to my parent as committed children.
+ */
+static void
+AtSubCommit_childXids(void)
+{
+ TransactionState s = CurrentTransactionState;
+ MemoryContext old_cxt;
+
+ Assert(s->parent != NULL);
+
+ old_cxt = MemoryContextSwitchTo(s->parent->curTransactionContext);
+
+ s->parent->childXids = list_concat(s->parent->childXids, s->childXids);
+ s->childXids = NIL; /* ensure list not doubly referenced */
+
+ s->parent->childXids = lappend_int(s->parent->childXids,
+ s->transactionIdData);
+
+ MemoryContextSwitchTo(old_cxt);
+}
+
+/*
+ * RecordSubTransactionCommit
+ */
+static void
+RecordSubTransactionCommit(void)
+{
+ /*
+ * We do not log the subcommit in XLOG; it doesn't matter until
+ * the top-level transaction commits.
+ *
+ * We must mark the subtransaction subcommitted in clog if its XID
+ * appears either in permanent rels or in local temporary rels. We
+ * test this by seeing if we made transaction-controlled entries
+ * *OR* local-rel tuple updates. (The test here actually covers the
+ * entire transaction tree so far, so it may mark subtransactions that
+ * don't really need it, but it's probably not worth being tenser.
+ * Note that if a prior subtransaction dirtied these variables, then
+ * RecordTransactionCommit will have to do the full pushup anyway...)
+ */
+ if (MyLastRecPtr.xrecoff != 0 || MyXactMadeTempRelUpdate)
+ {
+ TransactionId xid = GetCurrentTransactionId();
+
+ /* XXX does this really need to be a critical section? */
+ START_CRIT_SECTION();
+
+ /* Record subtransaction subcommit */
+ TransactionIdSubCommit(xid);
+
+ END_CRIT_SECTION();
+ }
}
/* ----------------------------------------------------------------
@@ -651,14 +870,24 @@ AtCommit_Memory(void)
static void
RecordTransactionAbort(void)
{
+ int nrels;
+ RelFileNode *rptr;
+ int nchildren;
+ TransactionId *children;
+
+ /* Get data needed for abort record */
+ nrels = smgrGetPendingDeletes(false, &rptr);
+ nchildren = xactGetCommittedChildren(&children, false);
+
/*
* If we made neither any transaction-controlled XLOG entries nor any
- * temp-rel updates, we can omit recording the transaction abort at
- * all. No one will ever care that it aborted.
+ * temp-rel updates, and are not going to delete any files, we can omit
+ * recording the transaction abort at all. No one will ever care that
+ * it aborted. (These tests cover our whole transaction tree.)
*/
- if (MyLastRecPtr.xrecoff != 0 || MyXactMadeTempRelUpdate)
+ if (MyLastRecPtr.xrecoff != 0 || MyXactMadeTempRelUpdate || nrels > 0)
{
- TransactionId xid = GetCurrentTransactionId();
+ TransactionId xid = GetCurrentTransactionId();
/*
* Catch the scenario where we aborted partway through
@@ -671,50 +900,64 @@ RecordTransactionAbort(void)
/*
* We only need to log the abort in XLOG if the transaction made
- * any transaction-controlled XLOG entries. (Otherwise, its XID
- * appears nowhere in permanent storage, so no one else will ever
- * care if it committed.) We do not flush XLOG to disk unless
- * deleting files, since the default assumption after a crash
- * would be that we aborted, anyway.
+ * any transaction-controlled XLOG entries or will delete files.
+ * (If it made no transaction-controlled XLOG entries, its XID
+ * appears nowhere in permanent storage, so no one else will ever care
+ * if it committed.)
+ *
+ * We do not flush XLOG to disk unless deleting files, since the
+ * default assumption after a crash would be that we aborted, anyway.
*/
- if (MyLastRecPtr.xrecoff != 0)
+ if (MyLastRecPtr.xrecoff != 0 || nrels > 0)
{
- XLogRecData rdata[2];
+ XLogRecData rdata[3];
+ int lastrdata = 0;
xl_xact_abort xlrec;
- int nrels;
- RelFileNode *rptr;
XLogRecPtr recptr;
- nrels = smgrGetPendingDeletes(false, &rptr);
-
xlrec.xtime = time(NULL);
+ xlrec.nrels = nrels;
+ xlrec.nsubxacts = nchildren;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) (&xlrec);
rdata[0].len = MinSizeOfXactAbort;
+ /* dump rels to delete */
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) rptr;
rdata[1].len = nrels * sizeof(RelFileNode);
- rdata[1].next = NULL;
+ lastrdata = 1;
}
- else
- rdata[0].next = NULL;
+ /* dump committed child Xids */
+ if (nchildren > 0)
+ {
+ rdata[lastrdata].next = &(rdata[2]);
+ rdata[2].buffer = InvalidBuffer;
+ rdata[2].data = (char *) children;
+ rdata[2].len = nchildren * sizeof(TransactionId);
+ lastrdata = 2;
+ }
+ rdata[lastrdata].next = NULL;
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, rdata);
+ /* Must flush if we are deleting files... */
if (nrels > 0)
XLogFlush(recptr);
-
- if (rptr)
- pfree(rptr);
}
/*
* Mark the transaction aborted in clog. This is not absolutely
* necessary but we may as well do it while we are here.
+ *
+ * The ordering here isn't critical but it seems best to mark the
+ * parent last. That reduces the chance that concurrent
+ * TransactionIdDidAbort calls will decide they need to do redundant
+ * work.
*/
+ TransactionIdAbortTree(nchildren, children);
TransactionIdAbort(xid);
END_CRIT_SECTION();
@@ -727,6 +970,12 @@ RecordTransactionAbort(void)
/* Show myself as out of the transaction in PGPROC array */
MyProc->logRec.xrecoff = 0;
+
+ /* And clean up local data */
+ if (rptr)
+ pfree(rptr);
+ if (children)
+ pfree(children);
}
/*
@@ -736,7 +985,7 @@ static void
AtAbort_Cache(void)
{
AtEOXact_RelationCache(false);
- AtEOXactInvalidationMessages(false);
+ AtEOXact_Inval(false);
}
/*
@@ -750,7 +999,7 @@ AtAbort_Locks(void)
*
* Then you're up a creek without a paddle! -mer
*/
- ProcReleaseLocks(false);
+ ProcReleaseLocks(ReleaseAll, 0, NULL);
}
@@ -779,6 +1028,127 @@ AtAbort_Memory(void)
MemoryContextSwitchTo(TopMemoryContext);
}
+/*
+ * AtSubAbort_Locks
+ */
+static void
+AtSubAbort_Locks(void)
+{
+ int nxids;
+ TransactionId *xids;
+
+ nxids = xactGetCommittedChildren(&xids, true);
+
+ ProcReleaseLocks(ReleaseGivenXids, nxids, xids);
+
+ pfree(xids);
+}
+
+
+/*
+ * AtSubAbort_Memory
+ */
+static void
+AtSubAbort_Memory(void)
+{
+ Assert(TopTransactionContext != NULL);
+
+ MemoryContextSwitchTo(TopTransactionContext);
+}
+
+/*
+ * RecordSubTransactionAbort
+ */
+static void
+RecordSubTransactionAbort(void)
+{
+ int nrels;
+ RelFileNode *rptr;
+ int nchildren;
+ TransactionId *children;
+
+ /* Get data needed for abort record */
+ nrels = smgrGetPendingDeletes(false, &rptr);
+ nchildren = xactGetCommittedChildren(&children, false);
+
+ /*
+ * If we made neither any transaction-controlled XLOG entries nor any
+ * temp-rel updates, and are not going to delete any files, we can omit
+ * recording the transaction abort at all. No one will ever care that
+ * it aborted. (These tests cover our whole transaction tree, and
+ * therefore may mark subxacts that don't really need it, but it's
+ * probably not worth being tenser.)
+ *
+ * In this case we needn't worry about marking subcommitted children as
+ * aborted, because they didn't mark themselves as subcommitted in the
+ * first place; see the optimization in RecordSubTransactionCommit.
+ */
+ if (MyLastRecPtr.xrecoff != 0 || MyXactMadeTempRelUpdate || nrels > 0)
+ {
+ TransactionId xid = GetCurrentTransactionId();
+
+ START_CRIT_SECTION();
+
+ /*
+ * We only need to log the abort in XLOG if the transaction made
+ * any transaction-controlled XLOG entries or will delete files.
+ */
+ if (MyLastRecPtr.xrecoff != 0 || nrels > 0)
+ {
+ XLogRecData rdata[3];
+ int lastrdata = 0;
+ xl_xact_abort xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.xtime = time(NULL);
+ xlrec.nrels = nrels;
+ xlrec.nsubxacts = nchildren;
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) (&xlrec);
+ rdata[0].len = MinSizeOfXactAbort;
+ /* dump rels to delete */
+ if (nrels > 0)
+ {
+ rdata[0].next = &(rdata[1]);
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) rptr;
+ rdata[1].len = nrels * sizeof(RelFileNode);
+ lastrdata = 1;
+ }
+ /* dump committed child Xids */
+ if (nchildren > 0)
+ {
+ rdata[lastrdata].next = &(rdata[2]);
+ rdata[2].buffer = InvalidBuffer;
+ rdata[2].data = (char *) children;
+ rdata[2].len = nchildren * sizeof(TransactionId);
+ lastrdata = 2;
+ }
+ rdata[lastrdata].next = NULL;
+
+ recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, rdata);
+
+ /* Must flush if we are deleting files... */
+ if (nrels > 0)
+ XLogFlush(recptr);
+ }
+
+ /*
+ * Mark the transaction aborted in clog. This is not absolutely
+ * necessary but we may as well do it while we are here.
+ */
+ TransactionIdAbortTree(nchildren, children);
+ TransactionIdAbort(xid);
+
+ END_CRIT_SECTION();
+ }
+
+ /* And clean up local data */
+ if (rptr)
+ pfree(rptr);
+ if (children)
+ pfree(children);
+}
/* ----------------------------------------------------------------
* CleanupTransaction stuff
@@ -798,16 +1168,47 @@ AtCleanup_Memory(void)
*/
MemoryContextSwitchTo(TopMemoryContext);
+ Assert(CurrentTransactionState->parent == NULL);
+
/*
* Release all transaction-local memory.
*/
if (TopTransactionContext != NULL)
MemoryContextDelete(TopTransactionContext);
TopTransactionContext = NULL;
+ CurTransactionContext = NULL;
+ CurrentTransactionState->curTransactionContext = NULL;
}
/* ----------------------------------------------------------------
+ * CleanupSubTransaction stuff
+ * ----------------------------------------------------------------
+ */
+
+/*
+ * AtSubCleanup_Memory
+ */
+static void
+AtSubCleanup_Memory(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ Assert(s->parent != NULL);
+
+ /* Make sure we're not in an about-to-be-deleted context */
+ MemoryContextSwitchTo(s->parent->curTransactionContext);
+ CurTransactionContext = s->parent->curTransactionContext;
+
+ /*
+ * Delete the subxact local memory contexts. Its CurTransactionContext
+ * can go too (note this also kills CurTransactionContexts from any
+ * children of the subxact).
+ */
+ MemoryContextDelete(s->curTransactionContext);
+}
+
+/* ----------------------------------------------------------------
* interface routines
* ----------------------------------------------------------------
*/
@@ -842,20 +1243,34 @@ StartTransaction(void)
/*
* generate a new transaction id
*/
- s->transactionIdData = GetNewTransactionId();
+ s->transactionIdData = GetNewTransactionId(false);
XactLockTableInsert(s->transactionIdData);
/*
+ * set now()
+ */
+ xactStartTime = GetCurrentAbsoluteTimeUsec(&(xactStartTimeUsec));
+
+ /*
* initialize current transaction state fields
*/
s->commandId = FirstCommandId;
- s->startTime = GetCurrentAbsoluteTimeUsec(&(s->startTimeUsec));
+ s->nestingLevel = 1;
+ s->childXids = NIL;
+
+ /*
+ * You might expect to see "s->currentUser = GetUserId();" here, but
+ * you won't because it doesn't work during startup; the userid isn't
+ * set yet during a backend's first transaction start. We only use
+ * the currentUser field in sub-transaction state structs.
+ */
/*
* initialize the various transaction subsystems
*/
AtStart_Memory();
+ AtStart_Inval();
AtStart_Cache();
AtStart_Locks();
@@ -870,6 +1285,7 @@ StartTransaction(void)
*/
s->state = TRANS_INPROGRESS;
+ ShowTransactionState("StartTransaction");
}
/*
@@ -880,11 +1296,14 @@ CommitTransaction(void)
{
TransactionState s = CurrentTransactionState;
+ ShowTransactionState("CommitTransaction");
+
/*
* check the current transaction state
*/
if (s->state != TRANS_INPROGRESS)
elog(WARNING, "CommitTransaction and not in in-progress state");
+ Assert(s->parent == NULL);
/*
* Tell the trigger manager that this transaction is about to be
@@ -970,19 +1389,22 @@ CommitTransaction(void)
AtCommit_Locks();
CallEOXactCallbacks(true);
- AtEOXact_GUC(true);
+ AtEOXact_GUC(true, false);
AtEOXact_SPI(true);
AtEOXact_gist();
AtEOXact_hash();
AtEOXact_nbtree();
AtEOXact_rtree();
- AtEOXact_on_commit_actions(true);
+ AtEOXact_on_commit_actions(true, s->transactionIdData);
AtEOXact_Namespace(true);
AtEOXact_CatCache(true);
AtEOXact_Files();
pgstat_count_xact_commit();
AtCommit_Memory();
+ s->nestingLevel = 0;
+ s->childXids = NIL;
+
/*
* done with commit processing, set current transaction state back to
* default
@@ -1026,6 +1448,7 @@ AbortTransaction(void)
*/
if (s->state != TRANS_INPROGRESS)
elog(WARNING, "AbortTransaction and not in in-progress state");
+ Assert(s->parent == NULL);
/*
* set the current transaction state information appropriately during
@@ -1037,7 +1460,14 @@ AbortTransaction(void)
AtAbort_Memory();
/*
- * Reset user id which might have been changed transiently
+ * Reset user id which might have been changed transiently. We cannot
+ * use s->currentUser, but must get the session userid from miscinit.c.
+ *
+ * (Note: it is not necessary to restore session authorization here
+ * because that can only be changed via GUC, and GUC will take care of
+ * rolling it back if need be. However, an error within a SECURITY
+ * DEFINER function could send control here with the wrong current
+ * userid.)
*/
SetUserId(GetSessionUserId());
@@ -1080,13 +1510,13 @@ AbortTransaction(void)
AtAbort_Locks();
CallEOXactCallbacks(false);
- AtEOXact_GUC(false);
+ AtEOXact_GUC(false, false);
AtEOXact_SPI(false);
AtEOXact_gist();
AtEOXact_hash();
AtEOXact_nbtree();
AtEOXact_rtree();
- AtEOXact_on_commit_actions(false);
+ AtEOXact_on_commit_actions(false, s->transactionIdData);
AtEOXact_Namespace(false);
AtEOXact_CatCache(false);
AtEOXact_Files();
@@ -1119,6 +1549,9 @@ CleanupTransaction(void)
AtCleanup_Portals(); /* now safe to release portal memory */
AtCleanup_Memory(); /* and transaction memory */
+ s->nestingLevel = 0;
+ s->childXids = NIL;
+
/*
* done with abort processing, set current transaction state back to
* default
@@ -1146,45 +1579,13 @@ StartTransactionCommand(void)
break;
/*
- * We should never experience this -- it means the STARTED state
- * was not changed in the previous CommitTransactionCommand.
- */
- case TBLOCK_STARTED:
- elog(WARNING, "StartTransactionCommand: unexpected TBLOCK_STARTED");
- break;
-
- /*
- * We should never experience this -- if we do it means the
- * BEGIN state was not changed in the previous
- * CommitTransactionCommand(). If we get it, we print a
- * warning and change to the in-progress state.
- */
- case TBLOCK_BEGIN:
- elog(WARNING, "StartTransactionCommand: unexpected TBLOCK_BEGIN");
- s->blockState = TBLOCK_INPROGRESS;
- break;
-
- /*
* This is the case when are somewhere in a transaction block
* and about to start a new command. For now we do nothing
* but someday we may do command-local resource
* initialization.
*/
case TBLOCK_INPROGRESS:
- break;
-
- /*
- * As with BEGIN, we should never experience this if we do it
- * means the END state was not changed in the previous
- * CommitTransactionCommand(). If we get it, we print a
- * warning, commit the transaction, start a new transaction
- * and change to the default state.
- */
- case TBLOCK_END:
- elog(WARNING, "StartTransactionCommand: unexpected TBLOCK_END");
- CommitTransaction();
- StartTransaction();
- s->blockState = TBLOCK_DEFAULT;
+ case TBLOCK_SUBINPROGRESS:
break;
/*
@@ -1194,26 +1595,30 @@ StartTransactionCommand(void)
* TRANSACTION" which will set things straight.
*/
case TBLOCK_ABORT:
+ case TBLOCK_SUBABORT:
break;
- /*
- * This means we somehow aborted and the last call to
- * CommitTransactionCommand() didn't clear the state so we
- * remain in the ENDABORT state and maybe next time we get to
- * CommitTransactionCommand() the state will get reset to
- * default.
- */
+ /* These cases are invalid. */
+ case TBLOCK_STARTED:
+ case TBLOCK_BEGIN:
+ case TBLOCK_SUBBEGIN:
+ case TBLOCK_SUBBEGINABORT:
+ case TBLOCK_END:
+ case TBLOCK_SUBEND:
+ case TBLOCK_SUBENDABORT_OK:
+ case TBLOCK_SUBENDABORT_ERROR:
case TBLOCK_ENDABORT:
- elog(WARNING, "StartTransactionCommand: unexpected TBLOCK_ENDABORT");
+ elog(FATAL, "StartTransactionCommand: unexpected state %s",
+ BlockStateAsString(s->blockState));
break;
}
/*
- * We must switch to TopTransactionContext before returning. This is
+ * We must switch to CurTransactionContext before returning. This is
* already done if we called StartTransaction, otherwise not.
*/
- Assert(TopTransactionContext != NULL);
- MemoryContextSwitchTo(TopTransactionContext);
+ Assert(CurTransactionContext != NULL);
+ MemoryContextSwitchTo(CurTransactionContext);
}
/*
@@ -1232,7 +1637,7 @@ CommitTransactionCommand(void)
* appropiately.
*/
case TBLOCK_DEFAULT:
- elog(WARNING, "CommitTransactionCommand: unexpected TBLOCK_DEFAULT");
+ elog(FATAL, "CommitTransactionCommand: unexpected TBLOCK_DEFAULT");
break;
/*
@@ -1291,6 +1696,71 @@ CommitTransactionCommand(void)
CleanupTransaction();
s->blockState = TBLOCK_DEFAULT;
break;
+
+ /*
+ * We were just issued a BEGIN inside a transaction block.
+ * Start a subtransaction.
+ */
+ case TBLOCK_SUBBEGIN:
+ StartSubTransaction();
+ s->blockState = TBLOCK_SUBINPROGRESS;
+ break;
+
+ /*
+ * We were issued a BEGIN inside an aborted transaction block.
+ * Start a subtransaction, and put it in aborted state.
+ */
+ case TBLOCK_SUBBEGINABORT:
+ StartSubTransaction();
+ AbortSubTransaction();
+ s->blockState = TBLOCK_SUBABORT;
+ break;
+
+ /*
+ * Inside a subtransaction, increment the command counter.
+ */
+ case TBLOCK_SUBINPROGRESS:
+ CommandCounterIncrement();
+ break;
+
+ /*
+ * We where issued a COMMIT command, so we end the current
+ * subtransaction and return to the parent transaction.
+ */
+ case TBLOCK_SUBEND:
+ CommitSubTransaction();
+ PopTransaction();
+ s = CurrentTransactionState; /* changed by pop */
+ break;
+
+ /*
+ * If we are in an aborted subtransaction, do nothing.
+ */
+ case TBLOCK_SUBABORT:
+ break;
+
+ /*
+ * We are ending a subtransaction that aborted nicely,
+ * so the parent can be allowed to live.
+ */
+ case TBLOCK_SUBENDABORT_OK:
+ CleanupSubTransaction();
+ PopTransaction();
+ s = CurrentTransactionState; /* changed by pop */
+ break;
+
+ /*
+ * We are ending a subtransaction that aborted in a unclean
+ * way (e.g. the user issued COMMIT in an aborted subtrasaction.)
+ * Abort the subtransaction, and abort the parent too.
+ */
+ case TBLOCK_SUBENDABORT_ERROR:
+ CleanupSubTransaction();
+ PopTransaction();
+ s = CurrentTransactionState; /* changed by pop */
+ Assert(s->blockState != TBLOCK_SUBENDABORT_ERROR);
+ AbortCurrentTransaction();
+ break;
}
}
@@ -1362,6 +1832,7 @@ AbortCurrentTransaction(void)
* state.
*/
case TBLOCK_ABORT:
+ case TBLOCK_SUBABORT:
break;
/*
@@ -1374,6 +1845,53 @@ AbortCurrentTransaction(void)
CleanupTransaction();
s->blockState = TBLOCK_DEFAULT;
break;
+
+ /*
+ * If we are just starting a subtransaction, put it
+ * in aborted state.
+ */
+ case TBLOCK_SUBBEGIN:
+ case TBLOCK_SUBBEGINABORT:
+ PushTransaction();
+ s = CurrentTransactionState; /* changed by push */
+ StartSubTransaction();
+ AbortSubTransaction();
+ s->blockState = TBLOCK_SUBABORT;
+ break;
+
+ case TBLOCK_SUBINPROGRESS:
+ AbortSubTransaction();
+ s->blockState = TBLOCK_SUBABORT;
+ break;
+
+ /*
+ * If we are aborting an ending transaction,
+ * we have to abort the parent transaction too.
+ */
+ case TBLOCK_SUBEND:
+ AbortSubTransaction();
+ CleanupSubTransaction();
+ PopTransaction();
+ s = CurrentTransactionState; /* changed by pop */
+ Assert(s->blockState != TBLOCK_SUBEND &&
+ s->blockState != TBLOCK_SUBENDABORT_OK &&
+ s->blockState != TBLOCK_SUBENDABORT_ERROR);
+ AbortCurrentTransaction();
+ break;
+
+ /*
+ * Same as above, except the Abort() was already done.
+ */
+ case TBLOCK_SUBENDABORT_OK:
+ case TBLOCK_SUBENDABORT_ERROR:
+ CleanupSubTransaction();
+ PopTransaction();
+ s = CurrentTransactionState; /* changed by pop */
+ Assert(s->blockState != TBLOCK_SUBEND &&
+ s->blockState != TBLOCK_SUBENDABORT_OK &&
+ s->blockState != TBLOCK_SUBENDABORT_ERROR);
+ AbortCurrentTransaction();
+ break;
}
}
@@ -1387,7 +1905,7 @@ AbortCurrentTransaction(void)
* If we have already started a transaction block, issue an error; also issue
* an error if we appear to be running inside a user-defined function (which
* could issue more commands and possibly cause a failure after the statement
- * completes).
+ * completes). Subtransactions are verboten too.
*
* stmtNode: pointer to parameter block for statement; this is used in
* a very klugy way to determine whether we are inside a function.
@@ -1407,6 +1925,16 @@ PreventTransactionChain(void *stmtNode, const char *stmtType)
stmtType)));
/*
+ * subtransaction?
+ */
+ if (IsSubTransaction())
+ ereport(ERROR,
+ (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+ /* translator: %s represents an SQL statement name */
+ errmsg("%s cannot run inside a subtransaction",
+ stmtType)));
+
+ /*
* Are we inside a function call? If the statement's parameter block
* was allocated in QueryContext, assume it is an interactive command.
* Otherwise assume it is coming from a function.
@@ -1416,10 +1944,11 @@ PreventTransactionChain(void *stmtNode, const char *stmtType)
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
/* translator: %s represents an SQL statement name */
errmsg("%s cannot be executed from a function", stmtType)));
+
/* If we got past IsTransactionBlock test, should be in default state */
if (CurrentTransactionState->blockState != TBLOCK_DEFAULT &&
CurrentTransactionState->blockState != TBLOCK_STARTED)
- elog(ERROR, "cannot prevent transaction chain");
+ elog(FATAL, "cannot prevent transaction chain");
/* all okay */
}
@@ -1433,8 +1962,8 @@ PreventTransactionChain(void *stmtNode, const char *stmtType)
*
* If we appear to be running inside a user-defined function, we do not
* issue an error, since the function could issue more commands that make
- * use of the current statement's results. Thus this is an inverse for
- * PreventTransactionChain.
+ * use of the current statement's results. Likewise subtransactions.
+ * Thus this is an inverse for PreventTransactionChain.
*
* stmtNode: pointer to parameter block for statement; this is used in
* a very klugy way to determine whether we are inside a function.
@@ -1450,6 +1979,12 @@ RequireTransactionChain(void *stmtNode, const char *stmtType)
return;
/*
+ * subtransaction?
+ */
+ if (IsSubTransaction())
+ return;
+
+ /*
* Are we inside a function call? If the statement's parameter block
* was allocated in QueryContext, assume it is an interactive command.
* Otherwise assume it is coming from a function.
@@ -1483,6 +2018,9 @@ IsInTransactionChain(void *stmtNode)
if (IsTransactionBlock())
return true;
+ if (IsSubTransaction())
+ return true;
+
if (!MemoryContextContains(QueryContext, stmtNode))
return true;
@@ -1571,26 +2109,40 @@ BeginTransactionBlock(void)
s->blockState = TBLOCK_BEGIN;
break;
- /* Already a transaction block in progress. */
+ /*
+ * Already a transaction block in progress.
+ * Start a subtransaction.
+ */
case TBLOCK_INPROGRESS:
- ereport(WARNING,
- (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
- errmsg("there is already a transaction in progress")));
+ case TBLOCK_SUBINPROGRESS:
+ PushTransaction();
+ s = CurrentTransactionState; /* changed by push */
+ s->blockState = TBLOCK_SUBBEGIN;
+ break;
/*
- * This shouldn't happen, because a transaction in aborted state
- * will not be allowed to call BeginTransactionBlock.
+ * An aborted transaction block should be allowed to start
+ * a subtransaction, but it must put it in aborted state.
*/
case TBLOCK_ABORT:
- elog(WARNING, "BeginTransactionBlock: unexpected TBLOCK_ABORT");
+ case TBLOCK_SUBABORT:
+ PushTransaction();
+ s = CurrentTransactionState; /* changed by push */
+ s->blockState = TBLOCK_SUBBEGINABORT;
break;
/* These cases are invalid. Reject them altogether. */
case TBLOCK_DEFAULT:
case TBLOCK_BEGIN:
+ case TBLOCK_SUBBEGIN:
+ case TBLOCK_SUBBEGINABORT:
case TBLOCK_ENDABORT:
case TBLOCK_END:
- elog(FATAL, "BeginTransactionBlock: not in a user-allowed state!");
+ case TBLOCK_SUBENDABORT_OK:
+ case TBLOCK_SUBENDABORT_ERROR:
+ case TBLOCK_SUBEND:
+ elog(FATAL, "BeginTransactionBlock: unexpected state %s",
+ BlockStateAsString(s->blockState));
break;
}
}
@@ -1615,6 +2167,15 @@ EndTransactionBlock(void)
break;
/*
+ * here we are in a subtransaction block. Signal
+ * CommitTransactionCommand() to end it and return to the
+ * parent transaction.
+ */
+ case TBLOCK_SUBINPROGRESS:
+ s->blockState = TBLOCK_SUBEND;
+ break;
+
+ /*
* here, we are in a transaction block which aborted and since the
* AbortTransaction() was already done, we do whatever is needed
* and change to the special "END ABORT" state. The upcoming
@@ -1625,12 +2186,21 @@ EndTransactionBlock(void)
s->blockState = TBLOCK_ENDABORT;
break;
+ /*
+ * here we are in an aborted subtransaction. Signal
+ * CommitTransactionCommand() to clean up and return to the
+ * parent transaction.
+ */
+ case TBLOCK_SUBABORT:
+ s->blockState = TBLOCK_SUBENDABORT_ERROR;
+ break;
+
case TBLOCK_STARTED:
/*
- * here, the user issued COMMIT when not inside a transaction. Issue a
- * WARNING and go to abort state. The upcoming call to
- * CommitTransactionCommand() will then put us back into the default
- * state.
+ * here, the user issued COMMIT when not inside a
+ * transaction. Issue a WARNING and go to abort state. The
+ * upcoming call to CommitTransactionCommand() will then put us
+ * back into the default state.
*/
ereport(WARNING,
(errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION),
@@ -1644,7 +2214,13 @@ EndTransactionBlock(void)
case TBLOCK_BEGIN:
case TBLOCK_ENDABORT:
case TBLOCK_END:
- elog(FATAL, "EndTransactionBlock and not in a user-allowed state");
+ case TBLOCK_SUBBEGIN:
+ case TBLOCK_SUBBEGINABORT:
+ case TBLOCK_SUBEND:
+ case TBLOCK_SUBENDABORT_OK:
+ case TBLOCK_SUBENDABORT_ERROR:
+ elog(FATAL, "EndTransactionBlock: unexpected state %s",
+ BlockStateAsString(s->blockState));
break;
}
}
@@ -1657,42 +2233,68 @@ UserAbortTransactionBlock(void)
{
TransactionState s = CurrentTransactionState;
- /*
- * if the transaction has already been automatically aborted with an
- * error, and the user subsequently types 'abort', allow it. (the
- * behavior is the same as if they had typed 'end'.)
- */
- if (s->blockState == TBLOCK_ABORT)
- {
- s->blockState = TBLOCK_ENDABORT;
- return;
- }
-
- if (s->blockState == TBLOCK_INPROGRESS)
- {
+ switch (s->blockState) {
/*
- * here we were inside a transaction block and we got an abort
- * command from the user, so we move to the ENDABORT state and
- * do abort processing so we will end up in the default state
- * after the upcoming CommitTransactionCommand().
+ * here we are inside a failed transaction block and we got an abort
+ * command from the user. Abort processing is already done, we just
+ * need to move to the ENDABORT state so we will end up in the default
+ * state after the upcoming CommitTransactionCommand().
*/
- s->blockState = TBLOCK_ABORT;
- AbortTransaction();
- s->blockState = TBLOCK_ENDABORT;
- return;
+ case TBLOCK_ABORT:
+ s->blockState = TBLOCK_ENDABORT;
+ break;
+
+ /* Ditto, for a subtransaction. */
+ case TBLOCK_SUBABORT:
+ s->blockState = TBLOCK_SUBENDABORT_OK;
+ break;
+
+ /*
+ * here we are inside a transaction block and we got an abort
+ * command from the user, so we move to the ENDABORT state and
+ * do abort processing so we will end up in the default state
+ * after the upcoming CommitTransactionCommand().
+ */
+ case TBLOCK_INPROGRESS:
+ AbortTransaction();
+ s->blockState = TBLOCK_ENDABORT;
+ break;
+
+ /* Ditto, for a subtransaction. */
+ case TBLOCK_SUBINPROGRESS:
+ AbortSubTransaction();
+ s->blockState = TBLOCK_SUBENDABORT_OK;
+ break;
+
+ /*
+ * here, the user issued ABORT when not inside a
+ * transaction. Issue a WARNING and go to abort state. The
+ * upcoming call to CommitTransactionCommand() will then put us
+ * back into the default state.
+ */
+ case TBLOCK_STARTED:
+ ereport(WARNING,
+ (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION),
+ errmsg("there is no transaction in progress")));
+ AbortTransaction();
+ s->blockState = TBLOCK_ENDABORT;
+ break;
+
+ /* these cases are invalid. */
+ case TBLOCK_DEFAULT:
+ case TBLOCK_BEGIN:
+ case TBLOCK_END:
+ case TBLOCK_ENDABORT:
+ case TBLOCK_SUBEND:
+ case TBLOCK_SUBENDABORT_OK:
+ case TBLOCK_SUBENDABORT_ERROR:
+ case TBLOCK_SUBBEGIN:
+ case TBLOCK_SUBBEGINABORT:
+ elog(FATAL, "UserAbortTransactionBlock: unexpected state %s",
+ BlockStateAsString(s->blockState));
+ break;
}
- /*
- * here, the user issued ABORT when not inside a transaction. Issue a
- * WARNING and go to abort state. The upcoming call to
- * CommitTransactionCommand() will then put us back into the default
- * state.
- */
- ereport(WARNING,
- (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION),
- errmsg("there is no transaction in progress")));
- AbortTransaction();
- s->blockState = TBLOCK_ENDABORT;
}
/*
@@ -1708,32 +2310,58 @@ AbortOutOfAnyTransaction(void)
TransactionState s = CurrentTransactionState;
/*
- * Get out of any transaction
+ * Get out of any transaction or nested transaction
*/
- switch (s->blockState)
- {
- case TBLOCK_DEFAULT:
- /* Not in a transaction, do nothing */
- break;
- case TBLOCK_STARTED:
- case TBLOCK_BEGIN:
- case TBLOCK_INPROGRESS:
- case TBLOCK_END:
- /* In a transaction, so clean up */
- AbortTransaction();
- CleanupTransaction();
- break;
- case TBLOCK_ABORT:
- case TBLOCK_ENDABORT:
- /* AbortTransaction already done, still need Cleanup */
- CleanupTransaction();
- break;
- }
+ do {
+ switch (s->blockState)
+ {
+ case TBLOCK_DEFAULT:
+ /* Not in a transaction, do nothing */
+ break;
+ case TBLOCK_STARTED:
+ case TBLOCK_BEGIN:
+ case TBLOCK_INPROGRESS:
+ case TBLOCK_END:
+ /* In a transaction, so clean up */
+ AbortTransaction();
+ CleanupTransaction();
+ s->blockState = TBLOCK_DEFAULT;
+ break;
+ case TBLOCK_ABORT:
+ case TBLOCK_ENDABORT:
+ /* AbortTransaction already done, still need Cleanup */
+ CleanupTransaction();
+ s->blockState = TBLOCK_DEFAULT;
+ break;
+ case TBLOCK_SUBBEGIN:
+ case TBLOCK_SUBBEGINABORT:
+ /*
+ * Just starting a new transaction -- return to parent.
+ * FIXME -- Is this correct?
+ */
+ PopTransaction();
+ s = CurrentTransactionState; /* changed by pop */
+ break;
+ case TBLOCK_SUBINPROGRESS:
+ case TBLOCK_SUBEND:
+ /* In a subtransaction, so clean it up and abort parent too */
+ AbortSubTransaction();
+ CleanupSubTransaction();
+ PopTransaction();
+ s = CurrentTransactionState; /* changed by pop */
+ break;
+ case TBLOCK_SUBABORT:
+ case TBLOCK_SUBENDABORT_OK:
+ case TBLOCK_SUBENDABORT_ERROR:
+ CleanupSubTransaction();
+ PopTransaction();
+ s = CurrentTransactionState; /* changed by pop */
+ break;
+ }
+ } while (s->blockState != TBLOCK_DEFAULT);
- /*
- * Now reset the transaction state
- */
- s->blockState = TBLOCK_DEFAULT;
+ /* Should be out of all subxacts now */
+ Assert(s->parent == NULL);
}
/*
@@ -1784,18 +2412,436 @@ TransactionBlockStatusCode(void)
case TBLOCK_BEGIN:
case TBLOCK_INPROGRESS:
case TBLOCK_END:
+ case TBLOCK_SUBINPROGRESS:
+ case TBLOCK_SUBBEGIN:
+ case TBLOCK_SUBEND:
return 'T'; /* in transaction */
case TBLOCK_ABORT:
case TBLOCK_ENDABORT:
+ case TBLOCK_SUBABORT:
+ case TBLOCK_SUBENDABORT_OK:
+ case TBLOCK_SUBENDABORT_ERROR:
+ case TBLOCK_SUBBEGINABORT:
return 'E'; /* in failed transaction */
}
/* should never get here */
- elog(ERROR, "invalid transaction block state: %d",
- (int) s->blockState);
+ elog(FATAL, "invalid transaction block state: %s",
+ BlockStateAsString(s->blockState));
return 0; /* keep compiler quiet */
}
+/*
+ * IsSubTransaction
+ */
+bool
+IsSubTransaction(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ switch (s->blockState) {
+ case TBLOCK_DEFAULT:
+ case TBLOCK_STARTED:
+ case TBLOCK_BEGIN:
+ case TBLOCK_INPROGRESS:
+ case TBLOCK_END:
+ case TBLOCK_ABORT:
+ case TBLOCK_ENDABORT:
+ return false;
+ case TBLOCK_SUBBEGIN:
+ case TBLOCK_SUBBEGINABORT:
+ case TBLOCK_SUBINPROGRESS:
+ case TBLOCK_SUBABORT:
+ case TBLOCK_SUBEND:
+ case TBLOCK_SUBENDABORT_OK:
+ case TBLOCK_SUBENDABORT_ERROR:
+ return true;
+ }
+
+ /* should never get here */
+ elog(FATAL, "invalid transaction block state: %s",
+ BlockStateAsString(s->blockState));
+ return false; /* keep compiler quiet */
+}
+
+/*
+ * StartSubTransaction
+ */
+static void
+StartSubTransaction(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ if (s->state != TRANS_DEFAULT)
+ elog(WARNING, "StartSubTransaction and not in default state");
+
+ s->state = TRANS_START;
+
+ /*
+ * Generate a new Xid and record it in pg_subtrans.
+ */
+ s->transactionIdData = GetNewTransactionId(true);
+
+ SubTransSetParent(s->transactionIdData, s->parent->transactionIdData);
+
+ /*
+ * Finish setup of other transaction state fields.
+ */
+ s->currentUser = GetUserId();
+
+ /* Initialize the various transaction subsystems */
+ AtSubStart_Memory();
+ AtSubStart_Inval();
+ AtSubStart_RelationCache();
+ AtSubStart_CatCache();
+ AtSubStart_Buffers();
+ AtSubStart_smgr();
+ AtSubStart_Notify();
+ DeferredTriggerBeginSubXact();
+
+ s->state = TRANS_INPROGRESS;
+
+ ShowTransactionState("StartSubTransaction");
+}
+
+/*
+ * CommitSubTransaction
+ */
+static void
+CommitSubTransaction(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ ShowTransactionState("CommitSubTransaction");
+
+ if (s->state != TRANS_INPROGRESS)
+ elog(WARNING, "CommitSubTransaction and not in in-progress state");
+
+ /* Pre-commit processing */
+ AtSubCommit_Portals(s->parent->transactionIdData);
+ DeferredTriggerEndSubXact(true);
+
+ /* Mark subtransaction as subcommitted */
+ CommandCounterIncrement();
+ RecordSubTransactionCommit();
+ AtSubCommit_childXids();
+
+ /* Post-commit cleanup */
+ AtSubCommit_smgr();
+
+ AtSubEOXact_Inval(true);
+ AtEOSubXact_SPI(true, s->transactionIdData);
+ AtSubCommit_Notify();
+ AtEOXact_GUC(true, true);
+ AtEOSubXact_gist(s->transactionIdData);
+ AtEOSubXact_hash(s->transactionIdData);
+ AtEOSubXact_rtree(s->transactionIdData);
+ AtEOSubXact_on_commit_actions(true, s->transactionIdData,
+ s->parent->transactionIdData);
+
+ AtEOSubXact_CatCache(true);
+ AtEOSubXact_RelationCache(true);
+ AtEOSubXact_Buffers(true);
+ AtSubCommit_Memory();
+
+ s->state = TRANS_DEFAULT;
+}
+
+/*
+ * AbortSubTransaction
+ */
+static void
+AbortSubTransaction(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ ShowTransactionState("AbortSubTransaction");
+
+ HOLD_INTERRUPTS();
+
+ s->state = TRANS_ABORT;
+
+ /*
+ * Release any LW locks we might be holding as quickly as possible.
+ * (Regular locks, however, must be held till we finish aborting.)
+ * Releasing LW locks is critical since we might try to grab them
+ * again while cleaning up!
+ *
+ * FIXME This may be incorrect --- Are there some locks we should keep?
+ * Buffer locks, for example? I don't think so but I'm not sure.
+ */
+ LWLockReleaseAll();
+
+ AbortBufferIO();
+ UnlockBuffers();
+
+ LockWaitCancel();
+
+ AtSubAbort_Memory();
+
+ /*
+ * do abort processing
+ */
+
+ RecordSubTransactionAbort();
+
+ /* Post-abort cleanup */
+ AtSubAbort_smgr();
+
+ DeferredTriggerEndSubXact(false);
+ AtSubAbort_Portals();
+ AtSubEOXact_Inval(false);
+ AtSubAbort_Locks();
+ AtEOSubXact_SPI(false, s->transactionIdData);
+ AtSubAbort_Notify();
+ AtEOXact_GUC(false, true);
+ AtEOSubXact_gist(s->transactionIdData);
+ AtEOSubXact_hash(s->transactionIdData);
+ AtEOSubXact_rtree(s->transactionIdData);
+ AtEOSubXact_on_commit_actions(false, s->transactionIdData,
+ s->parent->transactionIdData);
+ AtEOSubXact_RelationCache(false);
+ AtEOSubXact_CatCache(false);
+ AtEOSubXact_Buffers(false);
+
+ /*
+ * Reset user id which might have been changed transiently. Here we
+ * want to restore to the userid that was current at subxact entry.
+ * (As in AbortTransaction, we need not worry about the session userid.)
+ *
+ * Must do this after AtEOXact_GUC to handle the case where we entered
+ * the subxact inside a SECURITY DEFINER function (hence current and
+ * session userids were different) and then session auth was changed
+ * inside the subxact. GUC will reset both current and session userids
+ * to the entry-time session userid. This is right in every other
+ * scenario so it seems simplest to let GUC do that and fix it here.
+ */
+ SetUserId(s->currentUser);
+
+ CommandCounterIncrement();
+
+ RESUME_INTERRUPTS();
+}
+
+/*
+ * CleanupSubTransaction
+ */
+static void
+CleanupSubTransaction(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ ShowTransactionState("CleanupSubTransaction");
+
+ if (s->state != TRANS_ABORT)
+ elog(WARNING, "CleanupSubTransaction and not in aborted state");
+
+ AtSubCleanup_Portals();
+ AtSubCleanup_Memory();
+
+ s->state = TRANS_DEFAULT;
+}
+
+/*
+ * PushTransaction
+ * Set up transaction state for a subtransaction
+ */
+static void
+PushTransaction(void)
+{
+ TransactionState p = CurrentTransactionState;
+ TransactionState s;
+
+ /*
+ * We keep subtransaction state nodes in TopTransactionContext.
+ */
+ s = (TransactionState)
+ MemoryContextAllocZero(TopTransactionContext,
+ sizeof(TransactionStateData));
+ s->parent = p;
+ s->nestingLevel = p->nestingLevel + 1;
+ s->state = TRANS_DEFAULT;
+ s->blockState = TBLOCK_SUBBEGIN;
+
+ /* Command IDs count in a continuous sequence through subtransactions */
+ s->commandId = p->commandId;
+
+ /*
+ * Copy down some other data so that we will have valid state until
+ * StartSubTransaction runs.
+ */
+ s->transactionIdData = p->transactionIdData;
+ s->curTransactionContext = p->curTransactionContext;
+
+ CurrentTransactionState = s;
+}
+
+/*
+ * PopTransaction
+ * Pop back to parent transaction state
+ */
+static void
+PopTransaction(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ if (s->state != TRANS_DEFAULT)
+ elog(WARNING, "PopTransaction and not in default state");
+
+ if (s->parent == NULL)
+ elog(FATAL, "PopTransaction with no parent");
+
+ /* Command IDs count in a continuous sequence through subtransactions */
+ s->parent->commandId = s->commandId;
+
+ CurrentTransactionState = s->parent;
+
+ /* Let's just make sure CurTransactionContext is good */
+ CurTransactionContext = s->parent->curTransactionContext;
+ MemoryContextSwitchTo(CurTransactionContext);
+
+ /* Free the old child structure */
+ pfree(s);
+}
+
+/*
+ * ShowTransactionState
+ * Debug support
+ */
+static void
+ShowTransactionState(const char *str)
+{
+ /* skip work if message will definitely not be printed */
+ if (log_min_messages <= DEBUG2 || client_min_messages <= DEBUG2)
+ {
+ elog(DEBUG2, "%s", str);
+ ShowTransactionStateRec(CurrentTransactionState);
+ }
+}
+
+/*
+ * ShowTransactionStateRec
+ * Recursive subroutine for ShowTransactionState
+ */
+static void
+ShowTransactionStateRec(TransactionState s)
+{
+ if (s->parent)
+ ShowTransactionStateRec(s->parent);
+
+ /* use ereport to suppress computation if msg will not be printed */
+ ereport(DEBUG2,
+ (errmsg_internal("blockState: %13s; state: %7s, xid/cid: %u/%02u, nestlvl: %d, children: %s",
+ BlockStateAsString(s->blockState),
+ TransStateAsString(s->state),
+ (unsigned int) s->transactionIdData,
+ (unsigned int) s->commandId,
+ s->nestingLevel,
+ nodeToString(s->childXids))));
+}
+
+/*
+ * BlockStateAsString
+ * Debug support
+ */
+static const char *
+BlockStateAsString(TBlockState blockState)
+{
+ switch (blockState) {
+ case TBLOCK_DEFAULT:
+ return "DEFAULT";
+ case TBLOCK_STARTED:
+ return "STARTED";
+ case TBLOCK_BEGIN:
+ return "BEGIN";
+ case TBLOCK_INPROGRESS:
+ return "INPROGRESS";
+ case TBLOCK_END:
+ return "END";
+ case TBLOCK_ABORT:
+ return "ABORT";
+ case TBLOCK_ENDABORT:
+ return "ENDABORT";
+ case TBLOCK_SUBBEGIN:
+ return "SUB BEGIN";
+ case TBLOCK_SUBBEGINABORT:
+ return "SUB BEGIN AB";
+ case TBLOCK_SUBINPROGRESS:
+ return "SUB INPROGRS";
+ case TBLOCK_SUBEND:
+ return "SUB END";
+ case TBLOCK_SUBABORT:
+ return "SUB ABORT";
+ case TBLOCK_SUBENDABORT_OK:
+ return "SUB ENDAB OK";
+ case TBLOCK_SUBENDABORT_ERROR:
+ return "SUB ENDAB ERR";
+ }
+ return "UNRECOGNIZED";
+}
+
+/*
+ * TransStateAsString
+ * Debug support
+ */
+static const char *
+TransStateAsString(TransState state)
+{
+ switch (state) {
+ case TRANS_DEFAULT:
+ return "DEFAULT";
+ case TRANS_START:
+ return "START";
+ case TRANS_COMMIT:
+ return "COMMIT";
+ case TRANS_ABORT:
+ return "ABORT";
+ case TRANS_INPROGRESS:
+ return "INPROGR";
+ }
+ return "UNRECOGNIZED";
+}
+
+/*
+ * xactGetCommittedChildren
+ *
+ * Gets the list of committed children of the current transaction. The return
+ * value is the number of child transactions. *children is set to point to a
+ * palloc'd array of TransactionIds. If there are no subxacts, *children is
+ * set to NULL.
+ *
+ * If metoo is true, include the current TransactionId.
+ */
+int
+xactGetCommittedChildren(TransactionId **ptr, bool metoo)
+{
+ TransactionState s = CurrentTransactionState;
+ int nchildren;
+ TransactionId *children;
+ ListCell *p;
+
+ nchildren = list_length(s->childXids);
+ if (metoo)
+ nchildren++;
+ if (nchildren == 0)
+ {
+ *ptr = NULL;
+ return 0;
+ }
+
+ children = (TransactionId *) palloc(nchildren * sizeof(TransactionId));
+ *ptr = children;
+
+ foreach(p, s->childXids)
+ {
+ TransactionId child = lfirst_int(p);
+ *children++ = (TransactionId)child;
+ }
+ if (metoo)
+ *children = s->transactionIdData;
+
+ return nchildren;
+}
/*
* XLOG support routines
@@ -1809,13 +2855,14 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
- int nfiles;
int i;
TransactionIdCommit(record->xl_xid);
+ /* Mark committed subtransactions as committed */
+ TransactionIdCommitTree(xlrec->nsubxacts,
+ (TransactionId *) &(xlrec->xnodes[xlrec->nrels]));
/* Make sure files supposed to be dropped are dropped */
- nfiles = (record->xl_len - MinSizeOfXactCommit) / sizeof(RelFileNode);
- for (i = 0; i < nfiles; i++)
+ for (i = 0; i < xlrec->nrels; i++)
{
XLogCloseRelation(xlrec->xnodes[i]);
smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
@@ -1824,13 +2871,14 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
else if (info == XLOG_XACT_ABORT)
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
- int nfiles;
int i;
TransactionIdAbort(record->xl_xid);
+ /* mark subtransactions as aborted */
+ TransactionIdAbortTree(xlrec->nsubxacts,
+ (TransactionId *) &(xlrec->xnodes[xlrec->nrels]));
/* Make sure files supposed to be dropped are dropped */
- nfiles = (record->xl_len - MinSizeOfXactAbort) / sizeof(RelFileNode);
- for (i = 0; i < nfiles; i++)
+ for (i = 0; i < xlrec->nrels; i++)
{
XLogCloseRelation(xlrec->xnodes[i]);
smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
@@ -1855,6 +2903,7 @@ void
xact_desc(char *buf, uint8 xl_info, char *rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
+ int i;
if (info == XLOG_XACT_COMMIT)
{
@@ -1864,7 +2913,25 @@ xact_desc(char *buf, uint8 xl_info, char *rec)
sprintf(buf + strlen(buf), "commit: %04u-%02u-%02u %02u:%02u:%02u",
tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
tm->tm_hour, tm->tm_min, tm->tm_sec);
- /* XXX can't show RelFileNodes for lack of access to record length */
+ if (xlrec->nrels > 0)
+ {
+ sprintf(buf + strlen(buf), "; rels:");
+ for (i = 0; i < xlrec->nrels; i++)
+ {
+ RelFileNode rnode = xlrec->xnodes[i];
+ sprintf(buf + strlen(buf), " %u/%u/%u",
+ rnode.spcNode, rnode.dbNode, rnode.relNode);
+ }
+ }
+ if (xlrec->nsubxacts > 0)
+ {
+ TransactionId *xacts = (TransactionId *)
+ &xlrec->xnodes[xlrec->nrels];
+
+ sprintf(buf + strlen(buf), "; subxacts:");
+ for (i = 0; i < xlrec->nsubxacts; i++)
+ sprintf(buf + strlen(buf), " %u", xacts[i]);
+ }
}
else if (info == XLOG_XACT_ABORT)
{
@@ -1874,7 +2941,25 @@ xact_desc(char *buf, uint8 xl_info, char *rec)
sprintf(buf + strlen(buf), "abort: %04u-%02u-%02u %02u:%02u:%02u",
tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
tm->tm_hour, tm->tm_min, tm->tm_sec);
- /* XXX can't show RelFileNodes for lack of access to record length */
+ if (xlrec->nrels > 0)
+ {
+ sprintf(buf + strlen(buf), "; rels:");
+ for (i = 0; i < xlrec->nrels; i++)
+ {
+ RelFileNode rnode = xlrec->xnodes[i];
+ sprintf(buf + strlen(buf), " %u/%u/%u",
+ rnode.spcNode, rnode.dbNode, rnode.relNode);
+ }
+ }
+ if (xlrec->nsubxacts > 0)
+ {
+ TransactionId *xacts = (TransactionId *)
+ &xlrec->xnodes[xlrec->nrels];
+
+ sprintf(buf + strlen(buf), "; subxacts:");
+ for (i = 0; i < xlrec->nsubxacts; i++)
+ sprintf(buf + strlen(buf), " %u", xacts[i]);
+ }
}
else
strcat(buf, "UNKNOWN");
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f1205640615..a6f53ba79f1 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.146 2004/06/03 02:08:00 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.147 2004/07/01 00:49:50 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -22,6 +22,7 @@
#include <sys/time.h>
#include "access/clog.h"
+#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog.h"
@@ -2755,6 +2756,7 @@ BootStrapXLOG(void)
/* Bootstrap the commit log, too */
BootStrapCLOG();
+ BootStrapSUBTRANS();
}
static char *
@@ -3154,6 +3156,7 @@ StartupXLOG(void)
/* Start up the commit log, too */
StartupCLOG();
+ StartupSUBTRANS();
ereport(LOG,
(errmsg("database system is ready")));
@@ -3292,6 +3295,7 @@ ShutdownXLOG(int code, Datum arg)
CritSectionCount++;
CreateCheckPoint(true, true);
ShutdownCLOG();
+ ShutdownSUBTRANS();
CritSectionCount--;
ereport(LOG,
@@ -3467,6 +3471,7 @@ CreateCheckPoint(bool shutdown, bool force)
END_CRIT_SECTION();
CheckPointCLOG();
+ CheckPointSUBTRANS();
FlushBufferPool();
START_CRIT_SECTION();