aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/heap/heapam.c88
-rw-r--r--src/backend/access/index/indexam.c25
-rw-r--r--src/backend/access/nbtree/nbtinsert.c12
-rw-r--r--src/backend/access/nbtree/nbtpage.c7
-rw-r--r--src/backend/access/nbtree/nbtree.c2
-rw-r--r--src/backend/access/nbtree/nbtsearch.c13
-rw-r--r--src/backend/access/transam/twophase.c3
-rw-r--r--src/backend/access/transam/twophase_rmgr.c5
-rw-r--r--src/backend/access/transam/varsup.c5
-rw-r--r--src/backend/access/transam/xact.c21
-rw-r--r--src/backend/commands/variable.c38
-rw-r--r--src/backend/executor/nodeBitmapHeapscan.c3
-rw-r--r--src/backend/executor/nodeSeqscan.c5
-rw-r--r--src/backend/parser/gram.y6
-rw-r--r--src/backend/storage/ipc/ipci.c7
-rw-r--r--src/backend/storage/ipc/shmem.c2
-rw-r--r--src/backend/storage/ipc/shmqueue.c8
-rw-r--r--src/backend/storage/lmgr/Makefile2
-rw-r--r--src/backend/storage/lmgr/README4
-rw-r--r--src/backend/storage/lmgr/README-SSI537
-rw-r--r--src/backend/storage/lmgr/lwlock.c4
-rw-r--r--src/backend/storage/lmgr/predicate.c4439
-rw-r--r--src/backend/tcop/utility.c4
-rw-r--r--src/backend/utils/adt/lockfuncs.c79
-rw-r--r--src/backend/utils/misc/guc.c37
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample4
-rw-r--r--src/backend/utils/resowner/resowner.c4
-rw-r--r--src/backend/utils/time/snapmgr.c15
28 files changed, 5349 insertions, 30 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 25d9fdea3a0..7dcc6015de9 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -57,6 +57,7 @@
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
+#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "storage/standby.h"
@@ -261,20 +262,20 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
{
if (ItemIdIsNormal(lpp))
{
+ HeapTupleData loctup;
bool valid;
+ loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+ loctup.t_len = ItemIdGetLength(lpp);
+ ItemPointerSet(&(loctup.t_self), page, lineoff);
+
if (all_visible)
valid = true;
else
- {
- HeapTupleData loctup;
+ valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
- loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
- loctup.t_len = ItemIdGetLength(lpp);
- ItemPointerSet(&(loctup.t_self), page, lineoff);
+ CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, buffer);
- valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
- }
if (valid)
scan->rs_vistuples[ntup++] = lineoff;
}
@@ -468,12 +469,16 @@ heapgettup(HeapScanDesc scan,
snapshot,
scan->rs_cbuf);
+ CheckForSerializableConflictOut(valid, scan->rs_rd, tuple, scan->rs_cbuf);
+
if (valid && key != NULL)
HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
nkeys, key, valid);
if (valid)
{
+ if (!scan->rs_relpredicatelocked)
+ PredicateLockTuple(scan->rs_rd, tuple);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
return;
}
@@ -741,12 +746,16 @@ heapgettup_pagemode(HeapScanDesc scan,
nkeys, key, valid);
if (valid)
{
+ if (!scan->rs_relpredicatelocked)
+ PredicateLockTuple(scan->rs_rd, tuple);
scan->rs_cindex = lineindex;
return;
}
}
else
{
+ if (!scan->rs_relpredicatelocked)
+ PredicateLockTuple(scan->rs_rd, tuple);
scan->rs_cindex = lineindex;
return;
}
@@ -1213,6 +1222,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
scan->rs_strategy = NULL; /* set in initscan */
scan->rs_allow_strat = allow_strat;
scan->rs_allow_sync = allow_sync;
+ scan->rs_relpredicatelocked = false;
/*
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
@@ -1459,8 +1469,13 @@ heap_fetch(Relation relation,
*/
valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
+ if (valid)
+ PredicateLockTuple(relation, tuple);
+
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ CheckForSerializableConflictOut(valid, relation, tuple, buffer);
+
if (valid)
{
/*
@@ -1506,13 +1521,15 @@ heap_fetch(Relation relation,
* heap_fetch, we do not report any pgstats count; caller may do so if wanted.
*/
bool
-heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
- bool *all_dead)
+heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
+ Snapshot snapshot, bool *all_dead)
{
Page dp = (Page) BufferGetPage(buffer);
TransactionId prev_xmax = InvalidTransactionId;
OffsetNumber offnum;
bool at_chain_start;
+ bool valid;
+ bool match_found;
if (all_dead)
*all_dead = true;
@@ -1522,6 +1539,7 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
Assert(ItemPointerGetBlockNumber(tid) == BufferGetBlockNumber(buffer));
offnum = ItemPointerGetOffsetNumber(tid);
at_chain_start = true;
+ match_found = false;
/* Scan through possible multiple members of HOT-chain */
for (;;)
@@ -1552,6 +1570,8 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
heapTuple.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
heapTuple.t_len = ItemIdGetLength(lp);
+ heapTuple.t_tableOid = relation->rd_id;
+ heapTuple.t_self = *tid;
/*
* Shouldn't see a HEAP_ONLY tuple at chain start.
@@ -1569,12 +1589,18 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
break;
/* If it's visible per the snapshot, we must return it */
- if (HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer))
+ valid = HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer);
+ CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer);
+ if (valid)
{
ItemPointerSetOffsetNumber(tid, offnum);
+ PredicateLockTuple(relation, &heapTuple);
if (all_dead)
*all_dead = false;
- return true;
+ if (IsolationIsSerializable())
+ match_found = true;
+ else
+ return true;
}
/*
@@ -1603,7 +1629,7 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
break; /* end of chain */
}
- return false;
+ return match_found;
}
/*
@@ -1622,7 +1648,7 @@ heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot,
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
LockBuffer(buffer, BUFFER_LOCK_SHARE);
- result = heap_hot_search_buffer(tid, buffer, snapshot, all_dead);
+ result = heap_hot_search_buffer(tid, relation, buffer, snapshot, all_dead);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return result;
@@ -1729,6 +1755,7 @@ heap_get_latest_tid(Relation relation,
* result candidate.
*/
valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
+ CheckForSerializableConflictOut(valid, relation, &tp, buffer);
if (valid)
*tid = ctid;
@@ -1893,6 +1920,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
InvalidBuffer, options, bistate);
+ /*
+ * We're about to do the actual insert -- check for conflict at the
+ * relation or buffer level first, to avoid possibly having to roll
+ * back work we've just done.
+ */
+ CheckForSerializableConflictIn(relation, NULL, buffer);
+
/* NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION();
@@ -2193,6 +2227,12 @@ l1:
return result;
}
+ /*
+ * We're about to do the actual delete -- check for conflict first,
+ * to avoid possibly having to roll back work we've just done.
+ */
+ CheckForSerializableConflictIn(relation, &tp, buffer);
+
/* replace cid with a combo cid if necessary */
HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo);
@@ -2546,6 +2586,12 @@ l2:
return result;
}
+ /*
+ * We're about to do the actual update -- check for conflict first,
+ * to avoid possibly having to roll back work we've just done.
+ */
+ CheckForSerializableConflictIn(relation, &oldtup, buffer);
+
/* Fill in OID and transaction status data for newtup */
if (relation->rd_rel->relhasoids)
{
@@ -2691,6 +2737,16 @@ l2:
}
/*
+ * We're about to create the new tuple -- check for conflict first,
+ * to avoid possibly having to roll back work we've just done.
+ *
+ * NOTE: For a tuple insert, we only need to check for table locks, since
+ * predicate locking at the index level will cover ranges for anything
+ * except a table scan. Therefore, only provide the relation.
+ */
+ CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
+
+ /*
* At this point newbuf and buffer are both pinned and locked, and newbuf
* has enough space for the new tuple. If they are the same buffer, only
* one pin is held.
@@ -2799,6 +2855,12 @@ l2:
END_CRIT_SECTION();
+ /*
+ * Any existing SIREAD locks on the old tuple must be linked to the new
+ * tuple for conflict detection purposes.
+ */
+ PredicateLockTupleRowVersionLink(relation, &oldtup, newtup);
+
if (newbuf != buffer)
LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 32af32a2067..6e0db795176 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -64,9 +64,11 @@
#include "access/relscan.h"
#include "access/transam.h"
+#include "access/xact.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
+#include "storage/predicate.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
@@ -192,6 +194,11 @@ index_insert(Relation indexRelation,
RELATION_CHECKS;
GET_REL_PROCEDURE(aminsert);
+ if (!(indexRelation->rd_am->ampredlocks))
+ CheckForSerializableConflictIn(indexRelation,
+ (HeapTuple) NULL,
+ InvalidBuffer);
+
/*
* have the am's insert proc do all the work.
*/
@@ -266,6 +273,9 @@ index_beginscan_internal(Relation indexRelation,
RELATION_CHECKS;
GET_REL_PROCEDURE(ambeginscan);
+ if (!(indexRelation->rd_am->ampredlocks))
+ PredicateLockRelation(indexRelation);
+
/*
* We hold a reference count to the relcache entry throughout the scan.
*/
@@ -523,6 +533,7 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
{
ItemId lp;
ItemPointer ctid;
+ bool valid;
/* check for bogus TID */
if (offnum < FirstOffsetNumber ||
@@ -577,8 +588,13 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
break;
/* If it's visible per the snapshot, we must return it */
- if (HeapTupleSatisfiesVisibility(heapTuple, scan->xs_snapshot,
- scan->xs_cbuf))
+ valid = HeapTupleSatisfiesVisibility(heapTuple, scan->xs_snapshot,
+ scan->xs_cbuf);
+
+ CheckForSerializableConflictOut(valid, scan->heapRelation,
+ heapTuple, scan->xs_cbuf);
+
+ if (valid)
{
/*
* If the snapshot is MVCC, we know that it could accept at
@@ -586,7 +602,8 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
* any more members. Otherwise, check for continuation of the
* HOT-chain, and set state for next time.
*/
- if (IsMVCCSnapshot(scan->xs_snapshot))
+ if (IsMVCCSnapshot(scan->xs_snapshot)
+ && !IsolationIsSerializable())
scan->xs_next_hot = InvalidOffsetNumber;
else if (HeapTupleIsHotUpdated(heapTuple))
{
@@ -598,6 +615,8 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
else
scan->xs_next_hot = InvalidOffsetNumber;
+ PredicateLockTuple(scan->heapRelation, heapTuple);
+
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
pgstat_count_heap_fetch(scan->indexRelation);
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 91b72b8f91d..0dd745f19a4 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -21,6 +21,7 @@
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
+#include "storage/predicate.h"
#include "utils/inval.h"
#include "utils/tqual.h"
@@ -174,6 +175,14 @@ top:
if (checkUnique != UNIQUE_CHECK_EXISTING)
{
+ /*
+ * The only conflict predicate locking cares about for indexes is when
+ * an index tuple insert conflicts with an existing lock. Since the
+ * actual location of the insert is hard to predict because of the
+ * random search used to prevent O(N^2) performance when there are many
+ * duplicate entries, we can just use the "first valid" page.
+ */
+ CheckForSerializableConflictIn(rel, NULL, buf);
/* do the insertion */
_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, heapRel);
_bt_insertonpg(rel, buf, stack, itup, offset, false);
@@ -696,6 +705,9 @@ _bt_insertonpg(Relation rel,
/* split the buffer into left and right halves */
rbuf = _bt_split(rel, buf, firstright,
newitemoff, itemsz, itup, newitemonleft);
+ PredicateLockPageSplit(rel,
+ BufferGetBlockNumber(buf),
+ BufferGetBlockNumber(rbuf));
/*----------
* By here,
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index db86ec9a1ad..27964455f7c 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -29,6 +29,7 @@
#include "storage/freespace.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
+#include "storage/predicate.h"
#include "utils/inval.h"
#include "utils/snapmgr.h"
@@ -1184,6 +1185,12 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
RelationGetRelationName(rel));
/*
+ * Any insert which would have gone on the target block will now go to the
+ * right sibling block.
+ */
+ PredicateLockPageCombine(rel, target, rightsib);
+
+ /*
* Next find and write-lock the current parent of the target page. This is
* essentially the same as the corresponding step of splitting.
*/
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index d66bdd47e49..558ace1562b 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -29,6 +29,7 @@
#include "storage/indexfsm.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "storage/predicate.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
@@ -822,6 +823,7 @@ restart:
if (_bt_page_recyclable(page))
{
/* Okay to recycle this page */
+ Assert(!PageIsPredicateLocked(rel, blkno));
RecordFreeIndexPage(rel, blkno);
vstate->totFreePages++;
stats->pages_deleted++;
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 42d956c6eac..cf74f7776ce 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -21,6 +21,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
+#include "storage/predicate.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
@@ -63,7 +64,10 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
/* If index is empty and access = BT_READ, no root page is created. */
if (!BufferIsValid(*bufP))
+ {
+ PredicateLockRelation(rel); /* Nothing finer to lock exists. */
return (BTStack) NULL;
+ }
/* Loop iterates once per level descended in the tree */
for (;;)
@@ -88,7 +92,11 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
page = BufferGetPage(*bufP);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_ISLEAF(opaque))
+ {
+ if (access == BT_READ)
+ PredicateLockPage(rel, BufferGetBlockNumber(*bufP));
break;
+ }
/*
* Find the appropriate item on the internal page, and get the child
@@ -1142,6 +1150,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
+ PredicateLockPage(rel, blkno);
/* see if there are any matches on this page */
/* note that this will clear moreRight if we can stop */
if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque)))
@@ -1189,6 +1198,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
+ PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf));
/* see if there are any matches on this page */
/* note that this will clear moreLeft if we can stop */
if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
@@ -1352,6 +1362,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
if (!BufferIsValid(buf))
{
/* empty index... */
+ PredicateLockRelation(rel); /* Nothing finer to lock exists. */
return InvalidBuffer;
}
@@ -1431,10 +1442,12 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
if (!BufferIsValid(buf))
{
/* empty index... */
+ PredicateLockRelation(rel); /* Nothing finer to lock exists. */
so->currPos.buf = InvalidBuffer;
return false;
}
+ PredicateLockPage(rel, BufferGetBlockNumber(buf));
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
Assert(P_ISLEAF(opaque));
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 4fee9c3244b..287ad266980 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -57,6 +57,7 @@
#include "pgstat.h"
#include "replication/walsender.h"
#include "storage/fd.h"
+#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
@@ -1357,6 +1358,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
else
ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
+ PredicateLockTwoPhaseFinish(xid, isCommit);
+
/* Count the prepared xact as committed or aborted */
AtEOXact_PgStat(isCommit);
diff --git a/src/backend/access/transam/twophase_rmgr.c b/src/backend/access/transam/twophase_rmgr.c
index 02de1e85269..47c15af241d 100644
--- a/src/backend/access/transam/twophase_rmgr.c
+++ b/src/backend/access/transam/twophase_rmgr.c
@@ -18,12 +18,14 @@
#include "access/twophase_rmgr.h"
#include "pgstat.h"
#include "storage/lock.h"
+#include "storage/predicate.h"
const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{
NULL, /* END ID */
lock_twophase_recover, /* Lock */
+ predicatelock_twophase_recover, /* PredicateLock */
NULL, /* pgstat */
multixact_twophase_recover /* MultiXact */
};
@@ -32,6 +34,7 @@ const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{
NULL, /* END ID */
lock_twophase_postcommit, /* Lock */
+ NULL, /* PredicateLock */
pgstat_twophase_postcommit, /* pgstat */
multixact_twophase_postcommit /* MultiXact */
};
@@ -40,6 +43,7 @@ const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{
NULL, /* END ID */
lock_twophase_postabort, /* Lock */
+ NULL, /* PredicateLock */
pgstat_twophase_postabort, /* pgstat */
multixact_twophase_postabort /* MultiXact */
};
@@ -48,6 +52,7 @@ const TwoPhaseCallback twophase_standby_recover_callbacks[TWOPHASE_RM_MAX_ID + 1
{
NULL, /* END ID */
lock_twophase_standby_recover, /* Lock */
+ NULL, /* PredicateLock */
NULL, /* pgstat */
NULL /* MultiXact */
};
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index a03ec85f8fc..a828b3de48f 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -21,6 +21,7 @@
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
#include "storage/pmsignal.h"
+#include "storage/predicate.h"
#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/syscache.h"
@@ -161,6 +162,10 @@ GetNewTransactionId(bool isSubXact)
ExtendCLOG(xid);
ExtendSUBTRANS(xid);
+ /* If it's top level, the predicate locking system also needs to know. */
+ if (!isSubXact)
+ RegisterPredicateLockingXid(xid);
+
/*
* Now advance the nextXid counter. This must not happen until after we
* have successfully completed ExtendCLOG() --- if that routine fails, we
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1e31e07ec97..a0170b42e20 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -40,6 +40,7 @@
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
+#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
@@ -63,6 +64,9 @@ int XactIsoLevel;
bool DefaultXactReadOnly = false;
bool XactReadOnly;
+bool DefaultXactDeferrable = false;
+bool XactDeferrable;
+
bool XactSyncCommit = true;
int CommitDelay = 0; /* precommit delay in microseconds */
@@ -1640,6 +1644,7 @@ StartTransaction(void)
s->startedInRecovery = false;
XactReadOnly = DefaultXactReadOnly;
}
+ XactDeferrable = DefaultXactDeferrable;
XactIsoLevel = DefaultXactIsoLevel;
forceSyncCommit = false;
MyXactAccessedTempRel = false;
@@ -1787,6 +1792,13 @@ CommitTransaction(void)
AtEOXact_LargeObject(true);
/*
+ * Mark serializable transaction as complete for predicate locking
+ * purposes. This should be done as late as we can put it and still
+ * allow errors to be raised for failure patterns found at commit.
+ */
+ PreCommit_CheckForSerializationFailure();
+
+ /*
* Insert notifications sent by NOTIFY commands into the queue. This
* should be late in the pre-commit sequence to minimize time spent
* holding the notify-insertion lock.
@@ -1980,6 +1992,13 @@ PrepareTransaction(void)
/* close large objects before lower-level cleanup */
AtEOXact_LargeObject(true);
+ /*
+ * Mark serializable transaction as complete for predicate locking
+ * purposes. This should be done as late as we can put it and still
+ * allow errors to be raised for failure patterns found at commit.
+ */
+ PreCommit_CheckForSerializationFailure();
+
/* NOTIFY will be handled below */
/*
@@ -2044,6 +2063,7 @@ PrepareTransaction(void)
AtPrepare_Notify();
AtPrepare_Locks();
+ AtPrepare_PredicateLocks();
AtPrepare_PgStat();
AtPrepare_MultiXact();
AtPrepare_RelationMap();
@@ -2103,6 +2123,7 @@ PrepareTransaction(void)
PostPrepare_MultiXact(xid);
PostPrepare_Locks(xid);
+ PostPrepare_PredicateLocks(xid);
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_LOCKS,
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index 139148eec2e..2a61ea3bc7c 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -616,6 +616,15 @@ assign_XactIsoLevel(const char *value, bool doit, GucSource source)
errmsg("SET TRANSACTION ISOLATION LEVEL must not be called in a subtransaction")));
return NULL;
}
+ /* Can't go to serializable mode while recovery is still active */
+ if (RecoveryInProgress() && strcmp(value, "serializable") == 0)
+ {
+ ereport(GUC_complaint_elevel(source),
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot use serializable mode in a hot standby"),
+ errhint("You can use REPEATABLE READ instead.")));
+ return false;
+ }
}
if (strcmp(value, "serializable") == 0)
@@ -667,6 +676,35 @@ show_XactIsoLevel(void)
}
}
+/*
+ * SET TRANSACTION [NOT] DEFERRABLE
+ */
+
+bool
+assign_transaction_deferrable(bool newval, bool doit, GucSource source)
+{
+ /* source == PGC_S_OVERRIDE means do it anyway, eg at xact abort */
+ if (source == PGC_S_OVERRIDE)
+ return true;
+
+ if (IsSubTransaction())
+ {
+ ereport(GUC_complaint_elevel(source),
+ (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+ errmsg("SET TRANSACTION [NOT] DEFERRABLE cannot be called within a subtransaction")));
+ return false;
+ }
+
+ if (FirstSnapshotSet)
+ {
+ ereport(GUC_complaint_elevel(source),
+ (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+ errmsg("SET TRANSACTION [NOT] DEFERRABLE must be called before any query")));
+ return false;
+ }
+
+ return true;
+}
/*
* Random number seed
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index 7178b4c17d0..20d5eb14bf2 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -42,6 +42,7 @@
#include "executor/nodeBitmapHeapscan.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
+#include "storage/predicate.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
@@ -351,7 +352,7 @@ bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres)
ItemPointerData tid;
ItemPointerSet(&tid, page, offnum);
- if (heap_hot_search_buffer(&tid, buffer, snapshot, NULL))
+ if (heap_hot_search_buffer(&tid, scan->rs_rd, buffer, snapshot, NULL))
scan->rs_vistuples[ntup++] = ItemPointerGetOffsetNumber(&tid);
}
}
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 0f3438d0639..1e566b2d505 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -28,6 +28,7 @@
#include "access/relscan.h"
#include "executor/execdebug.h"
#include "executor/nodeSeqscan.h"
+#include "storage/predicate.h"
static void InitScanRelation(SeqScanState *node, EState *estate);
static TupleTableSlot *SeqNext(SeqScanState *node);
@@ -105,11 +106,15 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
* tuple.
* We call the ExecScan() routine and pass it the appropriate
* access method functions.
+ * For serializable transactions, we first acquire a predicate
+ * lock on the entire relation.
* ----------------------------------------------------------------
*/
TupleTableSlot *
ExecSeqScan(SeqScanState *node)
{
+ PredicateLockRelation(node->ss_currentRelation);
+ node->ss_currentScanDesc->rs_relpredicatelocked = true;
return ExecScan((ScanState *) node,
(ExecScanAccessMtd) SeqNext,
(ExecScanRecheckMtd) SeqRecheck);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 456db5c50ef..21782824ca1 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -6768,6 +6768,12 @@ transaction_mode_item:
| READ WRITE
{ $$ = makeDefElem("transaction_read_only",
makeIntConst(FALSE, @1)); }
+ | DEFERRABLE
+ { $$ = makeDefElem("transaction_deferrable",
+ makeIntConst(TRUE, @1)); }
+ | NOT DEFERRABLE
+ { $$ = makeDefElem("transaction_deferrable",
+ makeIntConst(FALSE, @1)); }
;
/* Syntax with commas is SQL-spec, without commas is Postgres historical */
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2dbac56c25d..56c0bd8d498 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -32,6 +32,7 @@
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
#include "storage/pmsignal.h"
+#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinvaladt.h"
@@ -105,6 +106,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
sizeof(ShmemIndexEnt)));
size = add_size(size, BufferShmemSize());
size = add_size(size, LockShmemSize());
+ size = add_size(size, PredicateLockShmemSize());
size = add_size(size, ProcGlobalShmemSize());
size = add_size(size, XLOGShmemSize());
size = add_size(size, CLOGShmemSize());
@@ -200,6 +202,11 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
InitLocks();
/*
+ * Set up predicate lock manager
+ */
+ InitPredicateLocks();
+
+ /*
* Set up process table
*/
if (!IsUnderPostmaster)
diff --git a/src/backend/storage/ipc/shmem.c b/src/backend/storage/ipc/shmem.c
index 62809577991..0811da7b6fe 100644
--- a/src/backend/storage/ipc/shmem.c
+++ b/src/backend/storage/ipc/shmem.c
@@ -198,7 +198,7 @@ ShmemAlloc(Size size)
* Returns TRUE if the pointer points within the shared memory segment.
*/
bool
-ShmemAddrIsValid(void *addr)
+ShmemAddrIsValid(const void *addr)
{
return (addr >= ShmemBase) && (addr < ShmemEnd);
}
diff --git a/src/backend/storage/ipc/shmqueue.c b/src/backend/storage/ipc/shmqueue.c
index 0277c3f9f40..1cf69a09c83 100644
--- a/src/backend/storage/ipc/shmqueue.c
+++ b/src/backend/storage/ipc/shmqueue.c
@@ -43,14 +43,12 @@ SHMQueueInit(SHM_QUEUE *queue)
* SHMQueueIsDetached -- TRUE if element is not currently
* in a queue.
*/
-#ifdef NOT_USED
bool
-SHMQueueIsDetached(SHM_QUEUE *queue)
+SHMQueueIsDetached(const SHM_QUEUE *queue)
{
Assert(ShmemAddrIsValid(queue));
return (queue->prev == NULL);
}
-#endif
/*
* SHMQueueElemInit -- clear an element's links
@@ -146,7 +144,7 @@ SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem)
*--------------------
*/
Pointer
-SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem, Size linkOffset)
+SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset)
{
SHM_QUEUE *elemPtr = curElem->next;
@@ -162,7 +160,7 @@ SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem, Size linkOffset)
* SHMQueueEmpty -- TRUE if queue head is only element, FALSE otherwise
*/
bool
-SHMQueueEmpty(SHM_QUEUE *queue)
+SHMQueueEmpty(const SHM_QUEUE *queue)
{
Assert(ShmemAddrIsValid(queue));
diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile
index 9aa9a5c0868..e12a8549f74 100644
--- a/src/backend/storage/lmgr/Makefile
+++ b/src/backend/storage/lmgr/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/storage/lmgr
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o spin.o s_lock.o
+OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o spin.o s_lock.o predicate.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
index 87cae18cb6f..40779d2e359 100644
--- a/src/backend/storage/lmgr/README
+++ b/src/backend/storage/lmgr/README
@@ -3,7 +3,7 @@ src/backend/storage/lmgr/README
Locking Overview
================
-Postgres uses three types of interprocess locks:
+Postgres uses four types of interprocess locks:
* Spinlocks. These are intended for *very* short-term locks. If a lock
is to be held more than a few dozen instructions, or across any sort of
@@ -34,6 +34,8 @@ supports a variety of lock modes with table-driven semantics, and it has
full deadlock detection and automatic release at transaction end.
Regular locks should be used for all user-driven lock requests.
+* SIReadLock predicate locks. See separate README-SSI file for details.
+
Acquisition of either a spinlock or a lightweight lock causes query
cancel and die() interrupts to be held off until all such locks are
released. No such restriction exists for regular locks, however. Also
diff --git a/src/backend/storage/lmgr/README-SSI b/src/backend/storage/lmgr/README-SSI
new file mode 100644
index 00000000000..a2bb63e3f8b
--- /dev/null
+++ b/src/backend/storage/lmgr/README-SSI
@@ -0,0 +1,537 @@
+src/backend/storage/lmgr/README-SSI
+
+Serializable Snapshot Isolation (SSI) and Predicate Locking
+===========================================================
+
+This is currently sitting in the lmgr directory because about 90% of
+the code is an implementation of predicate locking, which is required
+for SSI, rather than being directly related to SSI itself. When
+another use for predicate locking justifies the effort to tease these
+two things apart, this README file should probably be split.
+
+
+Credits
+-------
+
+This feature was developed by Kevin Grittner and Dan R. K. Ports,
+with review and suggestions from Joe Conway, Heikki Linnakangas, and
+Jeff Davis. It is based on work published in these papers:
+
+ Michael J. Cahill, Uwe Röhm, and Alan D. Fekete. 2008.
+ Serializable isolation for snapshot databases.
+ In SIGMOD ’08: Proceedings of the 2008 ACM SIGMOD
+ international conference on Management of data,
+ pages 729–738, New York, NY, USA. ACM.
+ http://doi.acm.org/10.1145/1376616.1376690
+
+ Michael James Cahill. 2009.
+ Serializable Isolation for Snapshot Databases.
+ Sydney Digital Theses.
+ University of Sydney, School of Information Technologies.
+ http://hdl.handle.net/2123/5353
+
+
+Overview
+--------
+
+With true serializable transactions, if you can show that your
+transaction will do the right thing if there are no concurrent
+transactions, it will do the right thing in any mix of serializable
+transactions or be rolled back with a serialization failure. This
+feature has been implemented in PostgreSQL using SSI.
+
+
+Serializable and Snapshot Transaction Isolation Levels
+------------------------------------------------------
+
+Serializable transaction isolation is attractive for shops with
+active development by many programmers against a complex schema
+because it guarantees data integrity with very little staff time --
+if a transaction can be shown to always do the right thing when it is
+run alone (before or after any other transaction), it will always do
+the right thing in any mix of concurrent serializable transactions.
+Where conflicts with other transactions would result in an
+inconsistent state within the database, or an inconsistent view of
+the data, a serializable transaction will block or roll back to
+prevent the anomaly. The SQL standard provides a specific SQLSTATE
+for errors generated when a transaction rolls back for this reason,
+so that transactions can be retried automatically.
+
+Before version 9.1 PostgreSQL did not support a full serializable
+isolation level. A request for serializable transaction isolation
+actually provided snapshot isolation. This has well known anomalies
+which can allow data corruption or inconsistent views of the data
+during concurrent transactions; although these anomalies only occur
+when certain patterns of read-write dependencies exist within a set
+of concurrent transactions. Where these patterns exist, the anomalies
+can be prevented by introducing conflicts through explicitly
+programmed locks or otherwise unnecessary writes to the database.
+Snapshot isolation is popular because performance is better than
+serializable isolation and the integrity guarantees which it does
+provide allow anomalies to be avoided or managed with reasonable
+effort in many environments.
+
+
+Serializable Isolation Implementation Strategies
+------------------------------------------------
+
+Techniques for implementing full serializable isolation have been
+published and in use in many database products for decades. The
+primary technique which has been used is Strict 2 Phase Locking
+(S2PL), which operates by blocking writes against data which has been
+read by concurrent transactions and blocking any access (read or
+write) against data which has been written by concurrent
+transactions. A cycle in a graph of blocking indicates a deadlock,
+requiring a rollback. Blocking and deadlocks under S2PL in high
+contention workloads can be debilitating, crippling throughput and
+response time.
+
+A new technique for implementing full serializable isolation in an
+MVCC database appears in the literature beginning in 2008. This
+technique, known as Serializable Snapshot Isolation (SSI) has many of
+the advantages of snapshot isolation. In particular, reads don't
+block anything and writes don't block reads. Essentially, it runs
+snapshot isolation but monitors the read-write conflicts between
+transactions to identify dangerous structures in the transaction
+graph which indicate that a set of concurrent transactions might
+produce an anomaly, and rolls back transactions to ensure that no
+anomalies occur. It will produce some false positives (where a
+transaction is rolled back even though there would not have been an
+anomaly), but will never let an anomaly occur. In the two known
+prototype implementations, performance for many workloads (even with
+the need to restart transactions which are rolled back) is very close
+to snapshot isolation and generally far better than an S2PL
+implementation.
+
+
+Apparent Serial Order of Execution
+----------------------------------
+
+One way to understand when snapshot anomalies can occur, and to
+visualize the difference between the serializable implementations
+described above, is to consider that among transactions executing at
+the serializable transaction isolation level, the results are
+required to be consistent with some serial (one-at-a-time) execution
+of the transactions[1]. How is that order determined in each?
+
+S2PL locks rows used by the transaction in a way which blocks
+conflicting access, so that at the moment of a successful commit it
+is certain that no conflicting access has occurred. Some transactions
+may have blocked, essentially being partially serialized with the
+committing transaction, to allow this. Some transactions may have
+been rolled back, due to cycles in the blocking. But with S2PL,
+transactions can always be viewed as having occurred serially, in the
+order of successful commit.
+
+With snapshot isolation, reads never block writes, nor vice versa, so
+there is much less actual serialization. The order in which
+transactions appear to have executed is determined by something more
+subtle than in S2PL: read/write dependencies. If a transaction
+attempts to read data which is not visible to it because the
+transaction which wrote it (or will later write it) is concurrent
+(one of them was running when the other acquired its snapshot), then
+the reading transaction appears to have executed first, regardless of
+the actual sequence of transaction starts or commits (since it sees a
+database state prior to that in which the other transaction leaves
+it). If one transaction has both rw-dependencies in (meaning that a
+concurrent transaction attempts to read data it writes) and out
+(meaning it attempts to read data a concurrent transaction writes),
+and a couple other conditions are met, there can appear to be a cycle
+in execution order of the transactions. This is when the anomalies
+occur.
+
+SSI works by watching for the conditions mentioned above, and rolling
+back a transaction when needed to prevent any anomaly. The apparent
+order of execution will always be consistent with any actual
+serialization (i.e., a transaction which run by itself can always be
+considered to have run after any transactions committed before it
+started and before any transacton which starts after it commits); but
+among concurrent transactions it will appear that the transaction on
+the read side of a rw-dependency executed before the transaction on
+the write side.
+
+
+PostgreSQL Implementation
+-------------------------
+
+The implementation of serializable transactions for PostgreSQL is
+accomplished through Serializable Snapshot Isolation (SSI), based on
+the work of Cahill, et al. Fundamentally, this allows snapshot
+isolation to run as it has, while monitoring for conditions which
+could create a serialization anomaly.
+
+ * Since this technique is based on Snapshot Isolation (SI), those
+areas in PostgreSQL which don't use SI can't be brought under SSI.
+This includes system tables, temporary tables, sequences, hint bit
+rewrites, etc. SSI can not eliminate existing anomalies in these
+areas.
+
+ * Any transaction which is run at a transaction isolation level
+other than SERIALIZABLE will not be affected by SSI. If you want to
+enforce business rules through SSI, all transactions should be run at
+the SERIALIZABLE transaction isolation level, and that should
+probably be set as the default.
+
+ * If all transactions are run at the SERIALIZABLE transaction
+isolation level, business rules can be enforced in triggers or
+application code without ever having a need to acquire an explicit
+lock or to use SELECT FOR SHARE or SELECT FOR UPDATE.
+
+ * Those who want to continue to use snapshot isolation without
+the additional protections of SSI (and the associated costs of
+enforcing those protections), can use the REPEATABLE READ transaction
+isolation level. This level will retain its legacy behavior, which
+is identical to the old SERIALIZABLE implementation and fully
+consistent with the standard's requirements for the REPEATABLE READ
+transaction isolation level.
+
+ * Performance under this SSI implementation will be significantly
+improved if transactions which don't modify permanent tables are
+declared to be READ ONLY before they begin reading data.
+
+ * Performance under SSI will tend to degrade more rapidly with a
+large number of active database transactions than under less strict
+isolation levels. Limiting the number of active transactions through
+use of a connection pool or similar techniques may be necessary to
+maintain good performance.
+
+ * Any transaction which must be rolled back to prevent
+serialization anomalies will fail with SQLSTATE 40001, which has a
+standard meaning of "serialization failure".
+
+ * This SSI implementation makes an effort to choose the
+transaction to be cancelled such that an immediate retry of the
+transaction will not fail due to conflicts with exactly the same
+transactions. Pursuant to this goal, no transaction is cancelled
+until one of the other transactions in the set of conflicts which
+could generate an anomaly has successfully committed. This is
+conceptually similar to how write conflicts are handled. To fully
+implement this guarantee there needs to be a way to roll back the
+active transaction for another process with a serialization failure
+SQLSTATE, even if it is "idle in transaction".
+
+
+Predicate Locking
+-----------------
+
+Both S2PL and SSI require some form of predicate locking to handle
+situations where reads conflict with later inserts or with later
+updates which move data into the selected range. PostgreSQL didn't
+already have predicate locking, so it needed to be added to support
+full serializable transactions under either strategy. Practical
+implementations of predicate locking generally involve acquiring
+locks against data as it is accessed, using multiple granularities
+(tuple, page, table, etc.) with escalation as needed to keep the lock
+count to a number which can be tracked within RAM structures, and
+this was used in PostgreSQL. Coarse granularities can cause some
+false positive indications of conflict. The number of false positives
+can be influenced by plan choice.
+
+
+Implementation overview
+-----------------------
+
+New RAM structures, inspired by those used to track traditional locks
+in PostgreSQL, but tailored to the needs of SIREAD predicate locking,
+are used. These refer to physical objects actually accessed in the
+course of executing the query, to model the predicates through
+inference. Anyone interested in this subject should review the
+Hellerstein, Stonebraker and Hamilton paper[2], along with the
+locking papers referenced from that and the Cahill papers.
+
+Because the SIREAD locks don't block, traditional locking techniques
+were be modified. Intent locking (locking higher level objects
+before locking lower level objects) doesn't work with non-blocking
+"locks" (which are, in some respects, more like flags than locks).
+
+A configurable amount of shared memory is reserved at postmaster
+start-up to track predicate locks. This size cannot be changed
+without a restart.
+
+ * To prevent resource exhaustion, multiple fine-grained locks may
+be promoted to a single coarser-grained lock as needed.
+
+ * An attempt to acquire an SIREAD lock on a tuple when the same
+transaction already holds an SIREAD lock on the page or the relation
+will be ignored. Likewise, an attempt to lock a page when the
+relation is locked will be ignored, and the acquisition of a coarser
+lock will result in the automatic release of all finer-grained locks
+it covers.
+
+
+Heap locking
+------------
+
+Predicate locks will be acquired for the heap based on the following:
+
+ * For a table scan, the entire relation will be locked.
+
+ * Each tuple read which is visible to the reading transaction
+will be locked, whether or not it meets selection criteria; except
+that there is no need to acquire an SIREAD lock on a tuple when the
+transaction already holds a write lock on any tuple representing the
+row, since a rw-dependency would also create a ww-dependency which
+has more aggressive enforcement and will thus prevent any anomaly.
+
+
+Index AM implementations
+------------------------
+
+Since predicate locks only exist to detect writes which conflict with
+earlier reads, and heap tuple locks are acquired to cover all heap
+tuples actually read, including those read through indexes, the index
+tuples which were actually scanned are not of interest in themselves;
+we only care about their "new neighbors" -- later inserts into the
+index which would have been included in the scan had they existed at
+the time. Conceptually, we want to lock the gaps between and
+surrounding index entries within the scanned range.
+
+Correctness requires that any insert into an index generates a
+rw-conflict with a concurrent serializable transaction if, after that
+insert, re-execution of any index scan of the other transaction would
+access the heap for a row not accessed during the previous execution.
+Note that a non-HOT update which expires an old index entry covered
+by the scan and adds a new entry for the modified row's new tuple
+need not generate a conflict, although an update which "moves" a row
+into the scan must generate a conflict. While correctness allows
+false positives, they should be minimized for performance reasons.
+
+Several optimizations are possible:
+
+ * An index scan which is just finding the right position for an
+index insertion or deletion need not acquire a predicate lock.
+
+ * An index scan which is comparing for equality on the entire key
+for a unique index need not acquire a predicate lock as long as a key
+is found corresponding to a visible tuple which has not been modified
+by another transaction -- there are no "between or around" gaps to
+cover.
+
+ * As long as built-in foreign key enforcement continues to use
+its current "special tricks" to deal with MVCC issues, predicate
+locks should not be needed for scans done by enforcement code.
+
+ * If a search determines that no rows can be found regardless of
+index contents because the search conditions are contradictory (e.g.,
+x = 1 AND x = 2), then no predicate lock is needed.
+
+Other index AM implementation considerations:
+
+ * If a btree search discovers that no root page has yet been
+created, a predicate lock on the index relation is required;
+otherwise btree searches must get to the leaf level to determine
+which tuples match, so predicate locks go there.
+
+ * GiST searches can determine that there are no matches at any
+level of the index, so there must be a predicate lock at each index
+level during a GiST search. An index insert at the leaf level can
+then be trusted to ripple up to all levels and locations where
+conflicting predicate locks may exist.
+
+ * The effects of page splits, overflows, consolidations, and
+removals must be carefully reviewed to ensure that predicate locks
+aren't "lost" during those operations, or kept with pages which could
+get re-used for different parts of the index.
+
+
+Innovations
+-----------
+
+The PostgreSQL implementation of Serializable Snapshot Isolation
+differs from what is described in the cited papers for several
+reasons:
+
+ 1. PostgreSQL didn't have any existing predicate locking. It had
+to be added from scratch.
+
+ 2. The existing in-memory lock structures were not suitable for
+tracking SIREAD locks.
+ * The database products used for the prototype
+implementations for the papers used update-in-place with a rollback
+log for their MVCC implementations, while PostgreSQL leaves the old
+version of a row in place and adds a new tuple to represent the row
+at a new location.
+ * In PostgreSQL, tuple level locks are not held in RAM for
+any length of time; lock information is written to the tuples
+involved in the transactions.
+ * In PostgreSQL, existing lock structures have pointers to
+memory which is related to a connection. SIREAD locks need to persist
+past the end of the originating transaction and even the connection
+which ran it.
+ * PostgreSQL needs to be able to tolerate a large number of
+transactions executing while one long-running transaction stays open
+-- the in-RAM techniques discussed in the papers wouldn't support
+that.
+
+ 3. Unlike the database products used for the prototypes described
+in the papers, PostgreSQL didn't already have a true serializable
+isolation level distinct from snapshot isolation.
+
+ 4. PostgreSQL supports subtransactions -- an issue not mentioned
+in the papers.
+
+ 5. PostgreSQL doesn't assign a transaction number to a database
+transaction until and unless necessary.
+
+ 6. PostgreSQL has pluggable data types with user-definable
+operators, as well as pluggable index types, not all of which are
+based around data types which support ordering.
+
+ 7. Some possible optimizations became apparent during development
+and testing.
+
+Differences from the implementation described in the papers are
+listed below.
+
+ * New structures needed to be created in shared memory to track
+the proper information for serializable transactions and their SIREAD
+locks.
+
+ * Because PostgreSQL does not have the same concept of an "oldest
+transaction ID" for all serializable transactions as assumed in the
+Cahill these, we track the oldest snapshot xmin among serializable
+transactions, and a count of how many active transactions use that
+xmin. When the count hits zero we find the new oldest xmin and run a
+clean-up based on that.
+
+ * Because reads in a subtransaction may cause that subtransaction
+to roll back, thereby affecting what is written by the top level
+transaction, predicate locks must survive a subtransaction rollback.
+As a consequence, all xid usage in SSI, including predicate locking,
+is based on the top level xid. When looking at an xid that comes
+from a tuple's xmin or xmax, for example, we always call
+SubTransGetTopmostTransaction() before doing much else with it.
+
+ * Predicate locking in PostgreSQL will start at the tuple level
+when possible, with automatic conversion of multiple fine-grained
+locks to coarser granularity as need to avoid resource exhaustion.
+The amount of memory used for these structures will be configurable,
+to balance RAM usage against SIREAD lock granularity.
+
+ * A process-local copy of locks held by a process and the coarser
+covering locks with counts, are kept to support granularity promotion
+decisions with low CPU and locking overhead.
+
+ * Conflicts will be identified by looking for predicate locks
+when tuples are written and looking at the MVCC information when
+tuples are read. There is no matching between two RAM-based locks.
+
+ * Because write locks are stored in the heap tuples rather than a
+RAM-based lock table, the optimization described in the Cahill thesis
+which eliminates an SIREAD lock where there is a write lock is
+implemented by the following:
+ 1. When checking a heap write for conflicts against existing
+predicate locks, a tuple lock on the tuple being written is removed.
+ 2. When acquiring a predicate lock on a heap tuple, we
+return quickly without doing anything if it is a tuple written by the
+reading transaction.
+
+ * Rather than using conflictIn and conflictOut pointers which use
+NULL to indicate no conflict and a self-reference to indicate
+multiple conflicts or conflicts with committed transactions, we use a
+list of rw-conflicts. With the more complete information, false
+positives are reduced and we have sufficient data for more aggressive
+clean-up and other optimizations.
+ o We can avoid ever rolling back a transaction until and
+unless there is a pivot where a transaction on the conflict *out*
+side of the pivot committed before either of the other transactions.
+ o We can avoid ever rolling back a transaction when the
+transaction on the conflict *in* side of the pivot is explicitly or
+implicitly READ ONLY unless the transaction on the conflict *out*
+side of the pivot committed before the READ ONLY transaction acquired
+its snapshot. (An implicit READ ONLY transaction is one which
+committed without writing, even though it was not explicitly declared
+to be READ ONLY.)
+ o We can more aggressively clean up conflicts, predicate
+locks, and SSI transaction information.
+
+ * Allow a READ ONLY transaction to "opt out" of SSI if there are
+no READ WRITE transactions which could cause the READ ONLY
+transaction to ever become part of a "dangerous structure" of
+overlapping transaction dependencies.
+
+ * Allow the user to request that a READ ONLY transaction wait
+until the conditions are right for it to start in the "opt out" state
+described above. We add a DEFERRABLE state to transactions, which is
+specified and maintained in a way similar to READ ONLY. It is
+ignored for transactions which are not SERIALIZABLE and READ ONLY.
+
+ * When a transaction must be rolled back, we pick among the
+active transactions such that an immediate retry will not fail again
+on conflicts with the same transactions.
+
+ * We use the PostgreSQL SLRU system to hold summarized
+information about older committed transactions to put an upper bound
+on RAM used. Beyond that limit, information spills to disk.
+Performance can degrade in a pessimal situation, but it should be
+tolerable, and transactions won't need to be cancelled or blocked
+from starting.
+
+
+R&D Issues
+----------
+
+This is intended to be the place to record specific issues which need
+more detailed review or analysis.
+
+ * WAL file replay. While serializable implementations using S2PL
+can guarantee that the write-ahead log contains commits in a sequence
+consistent with some serial execution of serializable transactions,
+SSI cannot make that guarantee. While the WAL replay is no less
+consistent than under snapshot isolation, it is possible that under
+PITR recovery or hot standby a database could reach a readable state
+where some transactions appear before other transactions which would
+have had to precede them to maintain serializable consistency. In
+essence, if we do nothing, WAL replay will be at snapshot isolation
+even for serializable transactions. Is this OK? If not, how do we
+address it?
+
+ * External replication. Look at how this impacts external
+replication solutions, like Postgres-R, Slony, pgpool, HS/SR, etc.
+This is related to the "WAL file replay" issue.
+
+ * Weak-memory-ordering machines. Make sure that shared memory
+access which involves visibility across multiple transactions uses
+locks as needed to avoid problems. On the other hand, ensure that we
+really need volatile where we're using it.
+http://archives.postgresql.org/pgsql-committers/2008-06/msg00228.php
+
+ * UNIQUE btree search for equality on all columns. Since a search
+of a UNIQUE index using equality tests on all columns will lock the
+heap tuple if an entry is found, it appears that there is no need to
+get a predicate lock on the index in that case. A predicate lock is
+still needed for such a search if a matching index entry which points
+to a visible tuple is not found.
+
+ * Planner index probes. To avoid problems with data skew at the
+ends of an index which have historically caused bad plans, the
+planner now probes the end of an index to see what the maximum or
+minimum value is when a query appears to be requesting a range of
+data outside what statistics shows is present. These planner checks
+don't require predicate locking, but there's currently no easy way to
+avoid it. What can we do to avoid predicate locking for such planner
+activity?
+
+ * Minimize touching of shared memory. Should lists in shared
+memory push entries which have just been returned to the front of the
+available list, so they will be popped back off soon and some memory
+might never be touched, or should we keep adding returned items to
+the end of the available list?
+
+
+Footnotes
+---------
+
+[1] http://www.contrib.andrew.cmu.edu/~shadow/sql/sql1992.txt
+Search for serial execution to find the relevant section.
+
+[2] http://db.cs.berkeley.edu/papers/fntdb07-architecture.pdf
+Joseph M. Hellerstein, Michael Stonebraker and James Hamilton. 2007.
+Architecture of a Database System. Foundations and Trends(R) in
+Databases Vol. 1, No. 2 (2007) 141–259.
+ Of particular interest:
+ * 6.1 A Note on ACID
+ * 6.2 A Brief Review of Serializability
+ * 6.3 Locking and Latching
+ * 6.3.1 Transaction Isolation Levels
+ * 6.5.3 Next-Key Locking: Physical Surrogates for Logical
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 61621a4d0ae..0fe7ce45cd6 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -28,6 +28,7 @@
#include "miscadmin.h"
#include "pg_trace.h"
#include "storage/ipc.h"
+#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/spin.h"
@@ -178,6 +179,9 @@ NumLWLocks(void)
/* async.c needs one per Async buffer */
numLocks += NUM_ASYNC_BUFFERS;
+ /* predicate.c needs one per old serializable xid buffer */
+ numLocks += NUM_OLDSERXID_BUFFERS;
+
/*
* Add any requested by loadable modules; for backwards-compatibility
* reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
new file mode 100644
index 00000000000..5e62ba9e4d9
--- /dev/null
+++ b/src/backend/storage/lmgr/predicate.c
@@ -0,0 +1,4439 @@
+/*-------------------------------------------------------------------------
+ *
+ * predicate.c
+ * POSTGRES predicate locking
+ * to support full serializable transaction isolation
+ *
+ *
+ * The approach taken is to implement Serializable Snapshot Isolation (SSI)
+ * as initially described in this paper:
+ *
+ * Michael J. Cahill, Uwe Röhm, and Alan D. Fekete. 2008.
+ * Serializable isolation for snapshot databases.
+ * In SIGMOD ’08: Proceedings of the 2008 ACM SIGMOD
+ * international conference on Management of data,
+ * pages 729–738, New York, NY, USA. ACM.
+ * http://doi.acm.org/10.1145/1376616.1376690
+ *
+ * and further elaborated in Cahill's doctoral thesis:
+ *
+ * Michael James Cahill. 2009.
+ * Serializable Isolation for Snapshot Databases.
+ * Sydney Digital Theses.
+ * University of Sydney, School of Information Technologies.
+ * http://hdl.handle.net/2123/5353
+ *
+ *
+ * Predicate locks for Serializable Snapshot Isolation (SSI) are SIREAD
+ * locks, which are so different from normal locks that a distinct set of
+ * structures is required to handle them. They are needed to detect
+ * rw-conflicts when the read happens before the write. (When the write
+ * occurs first, the reading transaction can check for a conflict by
+ * examining the MVCC data.)
+ *
+ * (1) Besides tuples actually read, they must cover ranges of tuples
+ * which would have been read based on the predicate. This will
+ * require modelling the predicates through locks against database
+ * objects such as pages, index ranges, or entire tables.
+ *
+ * (2) They must be kept in RAM for quick access. Because of this, it
+ * isn't possible to always maintain tuple-level granularity -- when
+ * the space allocated to store these approaches exhaustion, a
+ * request for a lock may need to scan for situations where a single
+ * transaction holds many fine-grained locks which can be coalesced
+ * into a single coarser-grained lock.
+ *
+ * (3) They never block anything; they are more like flags than locks
+ * in that regard; although they refer to database objects and are
+ * used to identify rw-conflicts with normal write locks.
+ *
+ * (4) While they are associated with a transaction, they must survive
+ * a successful COMMIT of that transaction, and remain until all
+ * overlapping transactions complete. This even means that they
+ * must survive termination of the transaction's process. If a
+ * top level transaction is rolled back, however, it is immediately
+ * flagged so that it can be ignored, and its SIREAD locks can be
+ * released any time after that.
+ *
+ * (5) The only transactions which create SIREAD locks or check for
+ * conflicts with them are serializable transactions.
+ *
+ * (6) When a write lock for a top level transaction is found to cover
+ * an existing SIREAD lock for the same transaction, the SIREAD lock
+ * can be deleted.
+ *
+ * (7) A write from a serializable transaction must ensure that a xact
+ * record exists for the transaction, with the same lifespan (until
+ * all concurrent transaction complete or the transaction is rolled
+ * back) so that rw-dependencies to that transaction can be
+ * detected.
+ *
+ * We use an optimization for read-only transactions. Under certain
+ * circumstances, a read-only transaction's snapshot can be shown to
+ * never have conflicts with other transactions. This is referred to
+ * as a "safe" snapshot (and one known not to be is "unsafe").
+ * However, it can't be determined whether a snapshot is safe until
+ * all concurrent read/write transactions complete.
+ *
+ * Once a read-only transaction is known to have a safe snapshot, it
+ * can release its predicate locks and exempt itself from further
+ * predicate lock tracking. READ ONLY DEFERRABLE transactions run only
+ * on safe snapshots, waiting as necessary for one to be available.
+ *
+ *
+ * Lightweight locks to manage access to the predicate locking shared
+ * memory objects must be taken in this order, and should be released in
+ * reverse order:
+ *
+ * SerializableFinishedListLock
+ * - Protects the list of transactions which have completed but which
+ * may yet matter because they overlap still-active transactions.
+ *
+ * SerializablePredicateLockListLock
+ * - Protects the linked list of locks held by a transaction. Note
+ * that the locks themselves are also covered by the partition
+ * locks of their respective lock targets; this lock only affects
+ * the linked list connecting the locks related to a transaction.
+ * - All transactions share this single lock (with no partitioning).
+ * - There is never a need for a process other than the one running
+ * an active transaction to walk the list of locks held by that
+ * transaction.
+ * - It is relatively infrequent that another process needs to
+ * modify the list for a transaction, but it does happen for such
+ * things as index page splits for pages with predicate locks and
+ * freeing of predicate locked pages by a vacuum process. When
+ * removing a lock in such cases, the lock itself contains the
+ * pointers needed to remove it from the list. When adding a
+ * lock in such cases, the lock can be added using the anchor in
+ * the transaction structure. Neither requires walking the list.
+ * - Cleaning up the list for a terminated transaction is sometimes
+ * not done on a retail basis, in which case no lock is required.
+ * - Due to the above, a process accessing its active transaction's
+ * list always uses a shared lock, regardless of whether it is
+ * walking or maintaining the list. This improves concurrency
+ * for the common access patterns.
+ * - A process which needs to alter the list of a transaction other
+ * than its own active transaction must acquire an exclusive
+ * lock.
+ *
+ * FirstPredicateLockMgrLock based partition locks
+ * - The same lock protects a target, all locks on that target, and
+ * the linked list of locks on the target..
+ * - When more than one is needed, acquire in ascending order.
+ *
+ * SerializableXactHashLock
+ * - Protects both PredXact and SerializableXidHash.
+ *
+ * PredicateLockNextRowLinkLock
+ * - Protects the priorVersionOfRow and nextVersionOfRow fields of
+ * PREDICATELOCKTARGET when linkage is being created or destroyed.
+ *
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/storage/lmgr/predicate.c
+ *
+ *-------------------------------------------------------------------------
+ */
+/*
+ * INTERFACE ROUTINES
+ *
+ * housekeeping for setting up shared memory predicate lock structures
+ * InitPredicateLocks(void)
+ * PredicateLockShmemSize(void)
+ *
+ * predicate lock reporting
+ * GetPredicateLockStatusData(void)
+ * PageIsPredicateLocked(Relation relation, BlockNumber blkno)
+ *
+ * predicate lock maintenance
+ * RegisterSerializableTransaction(Snapshot snapshot)
+ * RegisterPredicateLockingXid(void)
+ * PredicateLockRelation(Relation relation)
+ * PredicateLockPage(Relation relation, BlockNumber blkno)
+ * PredicateLockTuple(Relation relation, HeapTuple tuple)
+ * PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
+ * BlockNumber newblkno);
+ * PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
+ * BlockNumber newblkno);
+ * PredicateLockTupleRowVersionLink(const Relation relation,
+ * const HeapTuple oldTuple,
+ * const HeapTuple newTuple)
+ * ReleasePredicateLocks(bool isCommit)
+ *
+ * conflict detection (may also trigger rollback)
+ * CheckForSerializableConflictOut(bool visible, Relation relation,
+ * HeapTupleData *tup, Buffer buffer)
+ * CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup,
+ * Buffer buffer)
+ *
+ * final rollback checking
+ * PreCommit_CheckForSerializationFailure(void)
+ *
+ * two-phase commit support
+ * AtPrepare_PredicateLocks(void);
+ * PostPrepare_PredicateLocks(TransactionId xid);
+ * PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
+ * predicatelock_twophase_recover(TransactionId xid, uint16 info,
+ * void *recdata, uint32 len);
+ */
+
+#include "postgres.h"
+
+#include "access/slru.h"
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/twophase.h"
+#include "access/twophase_rmgr.h"
+#include "access/xact.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/predicate.h"
+#include "storage/predicate_internals.h"
+#include "storage/procarray.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/tqual.h"
+
+/* Uncomment the next line to test the graceful degradation code. */
+/* #define TEST_OLDSERXID */
+
+/*
+ * Test the most selective fields first, for performance.
+ *
+ * a is covered by b if all of the following hold:
+ * 1) a.database = b.database
+ * 2) a.relation = b.relation
+ * 3) b.offset is invalid (b is page-granularity or higher)
+ * 4) either of the following:
+ * 4a) a.offset is valid (a is tuple-granularity) and a.page = b.page
+ * or 4b) a.offset is invalid and b.page is invalid (a is
+ * page-granularity and b is relation-granularity
+ */
+#define TargetTagIsCoveredBy(covered_target, covering_target) \
+ ((GET_PREDICATELOCKTARGETTAG_RELATION(covered_target) == /* (2) */ \
+ GET_PREDICATELOCKTARGETTAG_RELATION(covering_target)) \
+ && (GET_PREDICATELOCKTARGETTAG_OFFSET(covering_target) == \
+ InvalidOffsetNumber) /* (3) */ \
+ && (((GET_PREDICATELOCKTARGETTAG_OFFSET(covered_target) != \
+ InvalidOffsetNumber) /* (4a) */ \
+ && (GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) == \
+ GET_PREDICATELOCKTARGETTAG_PAGE(covered_target))) \
+ || ((GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) == \
+ InvalidBlockNumber) /* (4b) */ \
+ && (GET_PREDICATELOCKTARGETTAG_PAGE(covered_target) \
+ != InvalidBlockNumber))) \
+ && (GET_PREDICATELOCKTARGETTAG_DB(covered_target) == /* (1) */ \
+ GET_PREDICATELOCKTARGETTAG_DB(covering_target)))
+
+/*
+ * The predicate locking target and lock shared hash tables are partitioned to
+ * reduce contention. To determine which partition a given target belongs to,
+ * compute the tag's hash code with PredicateLockTargetTagHashCode(), then
+ * apply one of these macros.
+ * NB: NUM_PREDICATELOCK_PARTITIONS must be a power of 2!
+ */
+#define PredicateLockHashPartition(hashcode) \
+ ((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
+#define PredicateLockHashPartitionLock(hashcode) \
+ ((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hashcode)))
+
+#define NPREDICATELOCKTARGETENTS() \
+ mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
+
+#define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink)))
+
+#define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0)
+#define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0)
+#define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0)
+#define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
+#define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0)
+#define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0)
+#define SxactHasConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_CONFLICT_OUT) != 0)
+#define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
+#define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
+#define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
+#define SxactIsMarkedForDeath(sxact) (((sxact)->flags & SXACT_FLAG_MARKED_FOR_DEATH) != 0)
+
+/*
+ * When a public interface method is called for a split on an index relation,
+ * this is the test to see if we should do a quick return.
+ */
+#define SkipSplitTracking(relation) \
+ (((relation)->rd_id < FirstBootstrapObjectId) \
+ || RelationUsesLocalBuffers(relation))
+
+/*
+ * When a public interface method is called for serializing a relation within
+ * the current transaction, this is the test to see if we should do a quick
+ * return.
+ */
+#define SkipSerialization(relation) \
+ ((!IsolationIsSerializable()) \
+ || ((MySerializableXact == InvalidSerializableXact)) \
+ || ReleasePredicateLocksIfROSafe() \
+ || SkipSplitTracking(relation))
+
+
+/*
+ * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
+ *
+ * To avoid unnecessary recomputations of the hash code, we try to do this
+ * just once per function, and then pass it around as needed. Aside from
+ * passing the hashcode to hash_search_with_hash_value(), we can extract
+ * the lock partition number from the hashcode.
+ */
+#define PredicateLockTargetTagHashCode(predicatelocktargettag) \
+ (tag_hash((predicatelocktargettag), sizeof(PREDICATELOCKTARGETTAG)))
+
+/*
+ * Given a predicate lock tag, and the hash for its target,
+ * compute the lock hash.
+ *
+ * To make the hash code also depend on the transaction, we xor the sxid
+ * struct's address into the hash code, left-shifted so that the
+ * partition-number bits don't change. Since this is only a hash, we
+ * don't care if we lose high-order bits of the address; use an
+ * intermediate variable to suppress cast-pointer-to-int warnings.
+ */
+#define PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash) \
+ ((targethash) ^ ((uint32) PointerGetDatum((predicatelocktag)->myXact)) \
+ << LOG2_NUM_PREDICATELOCK_PARTITIONS)
+
+
+/*
+ * The SLRU buffer area through which we access the old xids.
+ */
+static SlruCtlData OldSerXidSlruCtlData;
+
+#define OldSerXidSlruCtl (&OldSerXidSlruCtlData)
+
+#define OLDSERXID_PAGESIZE BLCKSZ
+#define OLDSERXID_ENTRYSIZE sizeof(SerCommitSeqNo)
+#define OLDSERXID_ENTRIESPERPAGE (OLDSERXID_PAGESIZE / OLDSERXID_ENTRYSIZE)
+#define OLDSERXID_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
+
+#define OldSerXidNextPage(page) (((page) >= OLDSERXID_MAX_PAGE) ? 0 : (page) + 1)
+
+#define OldSerXidValue(slotno, xid) (*((SerCommitSeqNo *) \
+ (OldSerXidSlruCtl->shared->page_buffer[slotno] + \
+ ((((uint32) (xid)) % OLDSERXID_ENTRIESPERPAGE) * OLDSERXID_ENTRYSIZE))))
+
+#define OldSerXidPage(xid) ((((uint32) (xid)) / OLDSERXID_ENTRIESPERPAGE) % (OLDSERXID_MAX_PAGE + 1))
+#define OldSerXidSegment(page) ((page) / SLRU_PAGES_PER_SEGMENT)
+
+typedef struct OldSerXidControlData
+{
+ int headPage;
+ int tailSegment;
+ TransactionId headXid;
+ TransactionId tailXid;
+ bool warningIssued;
+} OldSerXidControlData;
+
+typedef struct OldSerXidControlData *OldSerXidControl;
+
+static OldSerXidControl oldSerXidControl;
+
+/*
+ * When the oldest committed transaction on the "finished" list is moved to
+ * SLRU, its predicate locks will be moved to this "dummy" transaction,
+ * collapsing duplicate targets. When a duplicate is found, the later
+ * commitSeqNo is used.
+ */
+static SERIALIZABLEXACT *OldCommittedSxact;
+
+
+/* This configuration variable is used to set the predicate lock table size */
+int max_predicate_locks_per_xact; /* set by guc.c */
+
+/*
+ * This provides a list of objects in order to track transactions
+ * participating in predicate locking. Entries in the list are fixed size,
+ * and reside in shared memory. The memory address of an entry must remain
+ * fixed during its lifetime. The list will be protected from concurrent
+ * update externally; no provision is made in this code to manage that. The
+ * number of entries in the list, and the size allowed for each entry is
+ * fixed upon creation.
+ */
+static PredXactList PredXact;
+
+/*
+ * This provides a pool of RWConflict data elements to use in conflict lists
+ * between transactions.
+ */
+static RWConflictPoolHeader RWConflictPool;
+
+/*
+ * The predicate locking hash tables are in shared memory.
+ * Each backend keeps pointers to them.
+ */
+static HTAB *SerializableXidHash;
+static HTAB *PredicateLockTargetHash;
+static HTAB *PredicateLockHash;
+static SHM_QUEUE *FinishedSerializableTransactions;
+
+/*
+ * Tag for a reserved entry in PredicateLockTargetHash; used to ensure
+ * there's an element available for scratch space if we need it,
+ * e.g. in PredicateLockPageSplit. This is an otherwise-invalid tag.
+ */
+static const PREDICATELOCKTARGETTAG ReservedTargetTag = {0, 0, 0, 0, 0};
+
+/*
+ * The local hash table used to determine when to combine multiple fine-
+ * grained locks into a single courser-grained lock.
+ */
+static HTAB *LocalPredicateLockHash = NULL;
+
+/*
+ * Keep a pointer to the currently-running serializable transaction (if any)
+ * for quick reference.
+ * TODO SSI: Remove volatile qualifier and the then-unnecessary casts?
+ */
+static volatile SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
+
+/* local functions */
+
+static SERIALIZABLEXACT *CreatePredXact(void);
+static void ReleasePredXact(SERIALIZABLEXACT *sxact);
+static SERIALIZABLEXACT *FirstPredXact(void);
+static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact);
+
+static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer);
+static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
+static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT *activeXact);
+static void ReleaseRWConflict(RWConflict conflict);
+static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
+
+static bool OldSerXidPagePrecedesLogically(int p, int q);
+static void OldSerXidInit(void);
+static void OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
+static SerCommitSeqNo OldSerXidGetMinConflictCommitSeqNo(TransactionId xid);
+static void OldSerXidSetActiveSerXmin(TransactionId xid);
+
+static uint32 predicatelock_hash(const void *key, Size keysize);
+static void SummarizeOldestCommittedSxact(void);
+static Snapshot GetSafeSnapshot(Snapshot snapshot);
+static Snapshot RegisterSerializableTransactionInt(Snapshot snapshot);
+static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
+static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
+ PREDICATELOCKTARGETTAG *parent);
+static bool CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag);
+static void RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target,
+ uint32 targettaghash);
+static void DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag);
+static int PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag);
+static bool CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag);
+static void DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag);
+static void CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
+ uint32 targettaghash,
+ SERIALIZABLEXACT *sxact);
+static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash);
+static bool TransferPredicateLocksToNewTarget(const PREDICATELOCKTARGETTAG oldtargettag,
+ const PREDICATELOCKTARGETTAG newtargettag,
+ bool removeOld);
+static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag);
+static void SetNewSxactGlobalXmin(void);
+static bool ReleasePredicateLocksIfROSafe(void);
+static void ClearOldPredicateLocks(void);
+static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
+ bool summarize);
+static bool XidIsConcurrent(TransactionId xid);
+static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
+static bool CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag,
+ PREDICATELOCKTARGETTAG *nexttargettag);
+static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
+static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
+ SERIALIZABLEXACT *writer);
+
+/*------------------------------------------------------------------------*/
+
+/*
+ * These functions are a simple implementation of a list for this specific
+ * type of struct. If there is ever a generalized shared memory list, we
+ * should probably switch to that.
+ */
+static SERIALIZABLEXACT *
+CreatePredXact(void)
+{
+ PredXactListElement ptle;
+
+ ptle = (PredXactListElement)
+ SHMQueueNext(&PredXact->availableList,
+ &PredXact->availableList,
+ offsetof(PredXactListElementData, link));
+ if (!ptle)
+ return NULL;
+
+ SHMQueueDelete(&ptle->link);
+ SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
+ return &ptle->sxact;
+}
+
+static void
+ReleasePredXact(SERIALIZABLEXACT *sxact)
+{
+ PredXactListElement ptle;
+
+ Assert(ShmemAddrIsValid(sxact));
+
+ ptle = (PredXactListElement)
+ (((char *) sxact)
+ - offsetof(PredXactListElementData, sxact)
+ +offsetof(PredXactListElementData, link));
+ SHMQueueDelete(&ptle->link);
+ SHMQueueInsertBefore(&PredXact->availableList, &ptle->link);
+}
+
+static SERIALIZABLEXACT *
+FirstPredXact(void)
+{
+ PredXactListElement ptle;
+
+ ptle = (PredXactListElement)
+ SHMQueueNext(&PredXact->activeList,
+ &PredXact->activeList,
+ offsetof(PredXactListElementData, link));
+ if (!ptle)
+ return NULL;
+
+ return &ptle->sxact;
+}
+
+static SERIALIZABLEXACT *
+NextPredXact(SERIALIZABLEXACT *sxact)
+{
+ PredXactListElement ptle;
+
+ Assert(ShmemAddrIsValid(sxact));
+
+ ptle = (PredXactListElement)
+ (((char *) sxact)
+ - offsetof(PredXactListElementData, sxact)
+ +offsetof(PredXactListElementData, link));
+ ptle = (PredXactListElement)
+ SHMQueueNext(&PredXact->activeList,
+ &ptle->link,
+ offsetof(PredXactListElementData, link));
+ if (!ptle)
+ return NULL;
+
+ return &ptle->sxact;
+}
+
+/*------------------------------------------------------------------------*/
+
+/*
+ * These functions manage primitive access to the RWConflict pool and lists.
+ */
+static bool
+RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
+{
+ RWConflict conflict;
+
+ Assert(reader != writer);
+
+ /* Check the ends of the purported conflict first. */
+ if (SxactIsRolledBack(reader)
+ || SxactIsRolledBack(writer)
+ || SHMQueueEmpty(&reader->outConflicts)
+ || SHMQueueEmpty(&writer->inConflicts))
+ return false;
+
+ /* A conflict is possible; walk the list to find out. */
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->outConflicts,
+ &reader->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (conflict)
+ {
+ if (conflict->sxactIn == writer)
+ return true;
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+ }
+
+ /* No conflict found. */
+ return false;
+}
+
+static void
+SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
+{
+ RWConflict conflict;
+
+ Assert(reader != writer);
+ Assert(!RWConflictExists(reader, writer));
+
+ conflict = (RWConflict)
+ SHMQueueNext(&RWConflictPool->availableList,
+ &RWConflictPool->availableList,
+ offsetof(RWConflictData, outLink));
+ if (!conflict)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough elements in RWConflictPool to record a rw-conflict"),
+ errhint("You might need to run fewer transactions at a time or increase max_connections.")));
+
+ SHMQueueDelete(&conflict->outLink);
+
+ conflict->sxactOut = reader;
+ conflict->sxactIn = writer;
+ SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
+ SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
+}
+
+static void
+SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
+ SERIALIZABLEXACT *activeXact)
+{
+ RWConflict conflict;
+
+ Assert(roXact != activeXact);
+ Assert(SxactIsReadOnly(roXact));
+ Assert(!SxactIsReadOnly(activeXact));
+
+ conflict = (RWConflict)
+ SHMQueueNext(&RWConflictPool->availableList,
+ &RWConflictPool->availableList,
+ offsetof(RWConflictData, outLink));
+ if (!conflict)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough elements in RWConflictPool to record a potential rw-conflict"),
+ errhint("You might need to run fewer transactions at a time or increase max_connections.")));
+
+ SHMQueueDelete(&conflict->outLink);
+
+ conflict->sxactOut = activeXact;
+ conflict->sxactIn = roXact;
+ SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
+ &conflict->outLink);
+ SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
+ &conflict->inLink);
+}
+
+static void
+ReleaseRWConflict(RWConflict conflict)
+{
+ SHMQueueDelete(&conflict->inLink);
+ SHMQueueDelete(&conflict->outLink);
+ SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
+}
+
+static void
+FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
+{
+ RWConflict conflict,
+ nextConflict;
+
+ Assert(SxactIsReadOnly(sxact));
+ Assert(!SxactIsROSafe(sxact));
+
+ sxact->flags |= SXACT_FLAG_RO_UNSAFE;
+
+ /*
+ * We know this isn't a safe snapshot, so we can stop looking for other
+ * potential conflicts.
+ */
+ conflict = (RWConflict)
+ SHMQueueNext(&sxact->possibleUnsafeConflicts,
+ &sxact->possibleUnsafeConflicts,
+ offsetof(RWConflictData, inLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext(&sxact->possibleUnsafeConflicts,
+ &conflict->inLink,
+ offsetof(RWConflictData, inLink));
+
+ Assert(!SxactIsReadOnly(conflict->sxactOut));
+ Assert(sxact == conflict->sxactIn);
+
+ ReleaseRWConflict(conflict);
+
+ conflict = nextConflict;
+ }
+}
+
+/*------------------------------------------------------------------------*/
+
+/*
+ * We will work on the page range of 0..OLDSERXID_MAX_PAGE.
+ * Compares using wraparound logic, as is required by slru.c.
+ */
+static bool
+OldSerXidPagePrecedesLogically(int p, int q)
+{
+ int diff;
+
+ /*
+ * We have to compare modulo (OLDSERXID_MAX_PAGE+1)/2. Both inputs should
+ * be in the range 0..OLDSERXID_MAX_PAGE.
+ */
+ Assert(p >= 0 && p <= OLDSERXID_MAX_PAGE);
+ Assert(q >= 0 && q <= OLDSERXID_MAX_PAGE);
+
+ diff = p - q;
+ if (diff >= ((OLDSERXID_MAX_PAGE + 1) / 2))
+ diff -= OLDSERXID_MAX_PAGE + 1;
+ else if (diff < -((OLDSERXID_MAX_PAGE + 1) / 2))
+ diff += OLDSERXID_MAX_PAGE + 1;
+ return diff < 0;
+}
+
+/*
+ * Initialize for the tracking of old serializable committed xids.
+ */
+static void
+OldSerXidInit(void)
+{
+ bool found;
+
+ /*
+ * Set up SLRU management of the pg_serial data.
+ */
+ OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically;
+ SimpleLruInit(OldSerXidSlruCtl, "OldSerXid SLRU Ctl", NUM_OLDSERXID_BUFFERS, 0,
+ OldSerXidLock, "pg_serial");
+ /* Override default assumption that writes should be fsync'd */
+ OldSerXidSlruCtl->do_fsync = false;
+
+ /*
+ * Create or attach to the OldSerXidControl structure.
+ */
+ oldSerXidControl = (OldSerXidControl)
+ ShmemInitStruct("OldSerXidControlData", sizeof(OldSerXidControlData), &found);
+
+ if (!found)
+ {
+ /*
+ * Set control information to reflect empty SLRU.
+ */
+ oldSerXidControl->headPage = -1;
+ oldSerXidControl->tailSegment = -1;
+ oldSerXidControl->headXid = InvalidTransactionId;
+ oldSerXidControl->tailXid = InvalidTransactionId;
+ oldSerXidControl->warningIssued = false;
+ }
+}
+
+/*
+ * Record a committed read write serializable xid and the minimum
+ * commitSeqNo of any transactions to which this xid had a rw-conflict out.
+ * A zero seqNo means that there were no conflicts out from xid.
+ *
+ * The return value is normally false -- true means that we're about to
+ * wrap around our space for tracking these xids, so the caller might want
+ * to take action to prevent that.
+ */
+static void
+OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
+{
+ TransactionId tailXid;
+ int targetPage;
+ int slotno;
+ int page;
+ int xidSpread;
+ bool isNewPage;
+
+ Assert(TransactionIdIsValid(xid));
+
+ targetPage = OldSerXidPage(xid);
+
+ LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
+
+ /*
+ * If no serializable transactions are active, there shouldn't be anything
+ * to push out to this SLRU. Hitting this assert would mean there's
+ * something wrong with the earlier cleanup logic.
+ */
+ tailXid = oldSerXidControl->tailXid;
+ Assert(TransactionIdIsValid(tailXid));
+
+ if (oldSerXidControl->headPage < 0)
+ {
+ page = OldSerXidPage(tailXid);
+ oldSerXidControl->tailSegment = OldSerXidSegment(page);
+ page = oldSerXidControl->tailSegment * OLDSERXID_ENTRIESPERPAGE;
+ isNewPage = true;
+ }
+ else
+ {
+ page = OldSerXidNextPage(oldSerXidControl->headPage);
+ isNewPage = OldSerXidPagePrecedesLogically(oldSerXidControl->headPage, targetPage);
+ }
+
+ if (!TransactionIdIsValid(oldSerXidControl->headXid)
+ || TransactionIdFollows(xid, oldSerXidControl->headXid))
+ oldSerXidControl->headXid = xid;
+ if (oldSerXidControl->headPage < 0
+ || OldSerXidPagePrecedesLogically(oldSerXidControl->headPage, targetPage))
+ oldSerXidControl->headPage = targetPage;
+
+ xidSpread = (((uint32) xid) - ((uint32) tailXid));
+ if (oldSerXidControl->warningIssued)
+ {
+ if (xidSpread < 800000000)
+ oldSerXidControl->warningIssued = false;
+ }
+ else if (xidSpread >= 1000000000)
+ {
+ oldSerXidControl->warningIssued = true;
+ ereport(WARNING,
+ (errmsg("memory for serializable conflict tracking is nearly exhausted"),
+ errhint("There may be an idle transaction or a forgotten prepared transaction causing this.")));
+ }
+
+ if (isNewPage)
+ {
+ /* Initialize intervening pages. */
+ while (page != targetPage)
+ {
+ (void) SimpleLruZeroPage(OldSerXidSlruCtl, page);
+ page = OldSerXidNextPage(page);
+ }
+ slotno = SimpleLruZeroPage(OldSerXidSlruCtl, targetPage);
+ }
+ else
+ slotno = SimpleLruReadPage(OldSerXidSlruCtl, targetPage, true, xid);
+
+ OldSerXidValue(slotno, xid) = minConflictCommitSeqNo;
+
+ LWLockRelease(OldSerXidLock);
+}
+
+/*
+ * Get the minimum commitSeqNo for any conflict out for the given xid. For
+ * a transaction which exists but has no conflict out, InvalidSerCommitSeqNo
+ * will be returned.
+ */
+static SerCommitSeqNo
+OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
+{
+ TransactionId headXid;
+ TransactionId tailXid;
+ SerCommitSeqNo val;
+ int slotno;
+
+ Assert(TransactionIdIsValid(xid));
+
+ LWLockAcquire(OldSerXidLock, LW_SHARED);
+ headXid = oldSerXidControl->headXid;
+ tailXid = oldSerXidControl->tailXid;
+ LWLockRelease(OldSerXidLock);
+
+ if (!TransactionIdIsValid(headXid))
+ return 0;
+
+ Assert(TransactionIdIsValid(tailXid));
+
+ if (TransactionIdPrecedes(xid, tailXid)
+ || TransactionIdFollows(xid, headXid))
+ return 0;
+
+ /*
+ * The following function must be called without holding OldSerXidLock,
+ * but will return with that lock held, which must then be released.
+ */
+ slotno = SimpleLruReadPage_ReadOnly(OldSerXidSlruCtl,
+ OldSerXidPage(xid), xid);
+ val = OldSerXidValue(slotno, xid);
+ LWLockRelease(OldSerXidLock);
+ return val;
+}
+
+/*
+ * Call this whenever there is a new xmin for active serializable
+ * transactions. We don't need to keep information on transactions which
+ * preceed that. InvalidTransactionId means none active, so everything in
+ * the SLRU should be discarded.
+ */
+static void
+OldSerXidSetActiveSerXmin(TransactionId xid)
+{
+ int newTailPage;
+ int newTailSegment;
+
+ LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
+
+ /*
+ * When no sxacts are active, nothing overlaps, set the xid values to
+ * invalid to show that there are no valid entries. Don't clear the
+ * segment/page information, though. A new xmin might still land in an
+ * existing segment, and we don't want to repeatedly delete and re-create
+ * the same segment file.
+ */
+ if (!TransactionIdIsValid(xid))
+ {
+ if (TransactionIdIsValid(oldSerXidControl->tailXid))
+ {
+ oldSerXidControl->headXid = InvalidTransactionId;
+ oldSerXidControl->tailXid = InvalidTransactionId;
+ }
+ LWLockRelease(OldSerXidLock);
+ return;
+ }
+
+ /*
+ * When we're recovering prepared transactions, the global xmin might move
+ * backwards depending on the order they're recovered. Normally that's not
+ * OK, but during recovery no serializable transactions will commit, so
+ * the SLRU is empty and we can get away with it.
+ */
+ if (RecoveryInProgress())
+ {
+ Assert(oldSerXidControl->headPage < 0);
+ if (!TransactionIdIsValid(oldSerXidControl->tailXid)
+ || TransactionIdPrecedes(xid, oldSerXidControl->tailXid))
+ oldSerXidControl->tailXid = xid;
+ LWLockRelease(OldSerXidLock);
+ return;
+ }
+
+ Assert(!TransactionIdIsValid(oldSerXidControl->tailXid)
+ || TransactionIdFollows(xid, oldSerXidControl->tailXid));
+
+ oldSerXidControl->tailXid = xid;
+
+ /* Exit quickly if there are no segments active. */
+ if (oldSerXidControl->headPage < 0)
+ {
+ LWLockRelease(OldSerXidLock);
+ return;
+ }
+
+ newTailPage = OldSerXidPage(xid);
+ newTailSegment = OldSerXidSegment(newTailPage);
+
+ /* Exit quickly if we're still on the same segment. */
+ if (newTailSegment == oldSerXidControl->tailSegment)
+ {
+ LWLockRelease(OldSerXidLock);
+ return;
+ }
+
+ oldSerXidControl->tailSegment = newTailSegment;
+
+ /* See if that has cleared the last segment. */
+ if (OldSerXidPagePrecedesLogically(oldSerXidControl->headPage,
+ newTailSegment * SLRU_PAGES_PER_SEGMENT))
+ {
+ oldSerXidControl->headXid = InvalidTransactionId;
+ oldSerXidControl->headPage = -1;
+ oldSerXidControl->tailSegment = -1;
+ }
+
+ LWLockRelease(OldSerXidLock);
+
+ SimpleLruTruncate(OldSerXidSlruCtl, newTailPage);
+}
+
+/*------------------------------------------------------------------------*/
+
+/*
+ * InitPredicateLocks -- Initialize the predicate locking data structures.
+ *
+ * This is called from CreateSharedMemoryAndSemaphores(), which see for
+ * more comments. In the normal postmaster case, the shared hash tables
+ * are created here. Backends inherit the pointers
+ * to the shared tables via fork(). In the EXEC_BACKEND case, each
+ * backend re-executes this code to obtain pointers to the already existing
+ * shared hash tables.
+ */
+void
+InitPredicateLocks(void)
+{
+ HASHCTL info;
+ int hash_flags;
+ long init_table_size,
+ max_table_size;
+ Size requestSize;
+ bool found;
+
+ /*
+ * Compute init/max size to request for predicate lock target hashtable.
+ * Note these calculations must agree with PredicateLockShmemSize!
+ */
+ max_table_size = NPREDICATELOCKTARGETENTS();
+ init_table_size = max_table_size / 2;
+
+ /*
+ * Allocate hash table for PREDICATELOCKTARGET structs. This stores
+ * per-predicate-lock-target information.
+ */
+ MemSet(&info, 0, sizeof(info));
+ info.keysize = sizeof(PREDICATELOCKTARGETTAG);
+ info.entrysize = sizeof(PREDICATELOCKTARGET);
+ info.hash = tag_hash;
+ info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
+ hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+
+ PredicateLockTargetHash = ShmemInitHash("PREDICATELOCKTARGET hash",
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
+
+ /* Assume an average of 2 xacts per target */
+ max_table_size *= 2;
+ init_table_size *= 2;
+
+ /*
+ * Reserve an entry in the hash table; we use it to make sure there's
+ * always one entry available when we need to split or combine a page,
+ * because running out of space there could mean aborting a
+ * non-serializable transaction.
+ */
+ hash_search(PredicateLockTargetHash, &ReservedTargetTag,
+ HASH_ENTER, NULL);
+
+
+ /*
+ * Allocate hash table for PREDICATELOCK structs. This stores per
+ * xact-lock-of-a-target information.
+ */
+ MemSet(&info, 0, sizeof(info));
+ info.keysize = sizeof(PREDICATELOCKTAG);
+ info.entrysize = sizeof(PREDICATELOCK);
+ info.hash = predicatelock_hash;
+ info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
+ hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+
+ PredicateLockHash = ShmemInitHash("PREDICATELOCK hash",
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
+
+ /*
+ * Compute init/max size to request for serializable transaction
+ * hashtable. Note these calculations must agree with
+ * PredicateLockShmemSize!
+ */
+ max_table_size = (MaxBackends + max_prepared_xacts);
+ init_table_size = max_table_size / 2;
+
+ /*
+ * Allocate a list to hold information on transactions participating in
+ * predicate locking.
+ *
+ * Assume an average of 10 predicate locking transactions per backend.
+ * This allows aggressive cleanup while detail is present before data must
+ * be summarized for storage in SLRU and the "dummy" transaction.
+ */
+ max_table_size *= 10;
+ init_table_size *= 10;
+
+ PredXact = ShmemInitStruct("PredXactList",
+ PredXactListDataSize,
+ &found);
+ if (!found)
+ {
+ int i;
+
+ SHMQueueInit(&PredXact->availableList);
+ SHMQueueInit(&PredXact->activeList);
+ PredXact->SxactGlobalXmin = InvalidTransactionId;
+ PredXact->SxactGlobalXminCount = 0;
+ PredXact->WritableSxactCount = 0;
+ PredXact->LastSxactCommitSeqNo = FirstNormalSerCommitSeqNo - 1;
+ PredXact->CanPartialClearThrough = 0;
+ PredXact->HavePartialClearedThrough = 0;
+ PredXact->NeedTargetLinkCleanup = false;
+ requestSize = mul_size((Size) max_table_size,
+ PredXactListElementDataSize);
+ PredXact->element = ShmemAlloc(requestSize);
+ if (PredXact->element == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough shared memory for elements of data structure"
+ " \"%s\" (%lu bytes requested)",
+ "PredXactList", (unsigned long) requestSize)));
+ /* Add all elements to available list, clean. */
+ memset(PredXact->element, 0, requestSize);
+ for (i = 0; i < max_table_size; i++)
+ {
+ SHMQueueInsertBefore(&(PredXact->availableList),
+ &(PredXact->element[i].link));
+ }
+ PredXact->OldCommittedSxact = CreatePredXact();
+ SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
+ PredXact->OldCommittedSxact->commitSeqNo = 0;
+ PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
+ SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
+ SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
+ SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
+ SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
+ SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
+ PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
+ PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
+ PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
+ PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
+ PredXact->OldCommittedSxact->pid = 0;
+ }
+ /* This never changes, so let's keep a local copy. */
+ OldCommittedSxact = PredXact->OldCommittedSxact;
+
+ /*
+ * Allocate hash table for SERIALIZABLEXID structs. This stores per-xid
+ * information for serializable transactions which have accessed data.
+ */
+ MemSet(&info, 0, sizeof(info));
+ info.keysize = sizeof(SERIALIZABLEXIDTAG);
+ info.entrysize = sizeof(SERIALIZABLEXID);
+ info.hash = tag_hash;
+ hash_flags = (HASH_ELEM | HASH_FUNCTION);
+
+ SerializableXidHash = ShmemInitHash("SERIALIZABLEXID hash",
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
+
+ /*
+ * Allocate space for tracking rw-conflicts in lists attached to the
+ * transactions.
+ *
+ * Assume an average of 5 conflicts per transaction. Calculations suggest
+ * that this will prevent resource exhaustion in even the most pessimal
+ * loads up to max_connections = 200 with all 200 connections pounding the
+ * database with serializable transactions. Beyond that, there may be
+ * occassional transactions canceled when trying to flag conflicts. That's
+ * probably OK.
+ */
+ max_table_size *= 5;
+
+ RWConflictPool = ShmemInitStruct("RWConflictPool",
+ RWConflictPoolHeaderDataSize,
+ &found);
+ if (!found)
+ {
+ int i;
+
+ SHMQueueInit(&RWConflictPool->availableList);
+ requestSize = mul_size((Size) max_table_size,
+ PredXactListElementDataSize);
+ RWConflictPool->element = ShmemAlloc(requestSize);
+ if (RWConflictPool->element == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough shared memory for elements of data structure"
+ " \"%s\" (%lu bytes requested)",
+ "RWConflictPool", (unsigned long) requestSize)));
+ /* Add all elements to available list, clean. */
+ memset(RWConflictPool->element, 0, requestSize);
+ for (i = 0; i < max_table_size; i++)
+ {
+ SHMQueueInsertBefore(&(RWConflictPool->availableList),
+ &(RWConflictPool->element[i].outLink));
+ }
+ }
+
+ /*
+ * Create or attach to the header for the list of finished serializable
+ * transactions.
+ */
+ FinishedSerializableTransactions = (SHM_QUEUE *)
+ ShmemInitStruct("FinishedSerializableTransactions",
+ sizeof(SHM_QUEUE),
+ &found);
+ if (!found)
+ SHMQueueInit(FinishedSerializableTransactions);
+
+ /*
+ * Initialize the SLRU storage for old committed serializable
+ * transactions.
+ */
+ OldSerXidInit();
+}
+
+/*
+ * Estimate shared-memory space used for predicate lock table
+ */
+Size
+PredicateLockShmemSize(void)
+{
+ Size size = 0;
+ long max_table_size;
+
+ /* predicate lock target hash table */
+ max_table_size = NPREDICATELOCKTARGETENTS();
+ size = add_size(size, hash_estimate_size(max_table_size,
+ sizeof(PREDICATELOCKTARGET)));
+
+ /* predicate lock hash table */
+ max_table_size *= 2;
+ size = add_size(size, hash_estimate_size(max_table_size,
+ sizeof(PREDICATELOCK)));
+
+ /*
+ * Since NPREDICATELOCKTARGETENTS is only an estimate, add 10% safety
+ * margin.
+ */
+ size = add_size(size, size / 10);
+
+ /* transaction list */
+ max_table_size = MaxBackends + max_prepared_xacts;
+ max_table_size *= 10;
+ size = add_size(size, PredXactListDataSize);
+ size = add_size(size, mul_size((Size) max_table_size,
+ PredXactListElementDataSize));
+
+ /* transaction xid table */
+ size = add_size(size, hash_estimate_size(max_table_size,
+ sizeof(SERIALIZABLEXID)));
+
+ /* Head for list of finished serializable transactions. */
+ size = add_size(size, sizeof(SHM_QUEUE));
+
+ /* Shared memory structures for SLRU tracking of old committed xids. */
+ size = add_size(size, sizeof(OldSerXidControl));
+ size = add_size(size, SimpleLruShmemSize(NUM_OLDSERXID_BUFFERS, 0));
+
+ return size;
+}
+
+
+/*
+ * Compute the hash code associated with a PREDICATELOCKTAG.
+ *
+ * Because we want to use just one set of partition locks for both the
+ * PREDICATELOCKTARGET and PREDICATELOCK hash tables, we have to make sure
+ * that PREDICATELOCKs fall into the same partition number as their
+ * associated PREDICATELOCKTARGETs. dynahash.c expects the partition number
+ * to be the low-order bits of the hash code, and therefore a
+ * PREDICATELOCKTAG's hash code must have the same low-order bits as the
+ * associated PREDICATELOCKTARGETTAG's hash code. We achieve this with this
+ * specialized hash function.
+ */
+static uint32
+predicatelock_hash(const void *key, Size keysize)
+{
+ const PREDICATELOCKTAG *predicatelocktag = (const PREDICATELOCKTAG *) key;
+ uint32 targethash;
+
+ Assert(keysize == sizeof(PREDICATELOCKTAG));
+
+ /* Look into the associated target object, and compute its hash code */
+ targethash = PredicateLockTargetTagHashCode(&predicatelocktag->myTarget->tag);
+
+ return PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash);
+}
+
+
+/*
+ * GetPredicateLockStatusData
+ * Return a table containing the internal state of the predicate
+ * lock manager for use in pg_lock_status.
+ *
+ * Like GetLockStatusData, this function tries to hold the partition LWLocks
+ * for as short a time as possible by returning two arrays that simply
+ * contain the PREDICATELOCKTARGETTAG and SERIALIZABLEXACT for each lock
+ * table entry. Multiple copies of the same PREDICATELOCKTARGETTAG and
+ * SERIALIZABLEXACT will likely appear.
+ */
+PredicateLockData *
+GetPredicateLockStatusData(void)
+{
+ PredicateLockData *data;
+ int i;
+ int els,
+ el;
+ HASH_SEQ_STATUS seqstat;
+ PREDICATELOCK *predlock;
+
+ data = (PredicateLockData *) palloc(sizeof(PredicateLockData));
+
+ /*
+ * To ensure consistency, take simultaneous locks on all partition locks
+ * in ascending order, then SerializableXactHashLock.
+ */
+ for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
+ LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+
+ /* Get number of locks and allocate appropriately-sized arrays. */
+ els = hash_get_num_entries(PredicateLockHash);
+ data->nelements = els;
+ data->locktags = (PREDICATELOCKTARGETTAG *)
+ palloc(sizeof(PREDICATELOCKTARGETTAG) * els);
+ data->xacts = (SERIALIZABLEXACT *)
+ palloc(sizeof(SERIALIZABLEXACT) * els);
+
+
+ /* Scan through PredicateLockHash and copy contents */
+ hash_seq_init(&seqstat, PredicateLockHash);
+
+ el = 0;
+
+ while ((predlock = (PREDICATELOCK *) hash_seq_search(&seqstat)))
+ {
+ data->locktags[el] = predlock->tag.myTarget->tag;
+ data->xacts[el] = *predlock->tag.myXact;
+ el++;
+ }
+
+ Assert(el == els);
+
+ /* Release locks in reverse order */
+ LWLockRelease(SerializableXactHashLock);
+ for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
+ LWLockRelease(FirstPredicateLockMgrLock + i);
+
+ return data;
+}
+
+/*
+ * Free up shared memory structures by pushing the oldest sxact (the one at
+ * the front of the SummarizeOldestCommittedSxact queue) into summary form.
+ * Each call will free exactly one SERIALIZABLEXACT structure and may also
+ * free one or more of these structures: SERIALIZABLEXID, PREDICATELOCK,
+ * PREDICATELOCKTARGET, RWConflictData.
+ */
+static void
+SummarizeOldestCommittedSxact(void)
+{
+ SERIALIZABLEXACT *sxact;
+
+ LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
+
+#ifdef TEST_OLDSERXID
+ if (SHMQueueEmpty(FinishedSerializableTransactions))
+ {
+ LWLockRelease(SerializableFinishedListLock);
+ return;
+ }
+#else
+ Assert(!SHMQueueEmpty(FinishedSerializableTransactions));
+#endif
+
+ /*
+ * Grab the first sxact off the finished list -- this will be the earliest
+ * commit. Remove it from the list.
+ */
+ sxact = (SERIALIZABLEXACT *)
+ SHMQueueNext(FinishedSerializableTransactions,
+ FinishedSerializableTransactions,
+ offsetof(SERIALIZABLEXACT, finishedLink));
+ SHMQueueDelete(&(sxact->finishedLink));
+
+ /* Add to SLRU summary information. */
+ if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact))
+ OldSerXidAdd(sxact->topXid, SxactHasConflictOut(sxact)
+ ? sxact->SeqNo.earliestOutConflictCommit : InvalidSerCommitSeqNo);
+
+ /* Summarize and release the detail. */
+ ReleaseOneSerializableXact(sxact, false, true);
+
+ LWLockRelease(SerializableFinishedListLock);
+}
+
+/*
+ * GetSafeSnapshot
+ * Obtain and register a snapshot for a READ ONLY DEFERRABLE
+ * transaction. Ensures that the snapshot is "safe", i.e. a
+ * read-only transaction running on it can execute serializably
+ * without further checks. This requires waiting for concurrent
+ * transactions to complete, and retrying with a new snapshot if
+ * one of them could possibly create a conflict.
+ */
+static Snapshot
+GetSafeSnapshot(Snapshot origSnapshot)
+{
+ Snapshot snapshot;
+
+ Assert(XactReadOnly && XactDeferrable);
+
+ while (true)
+ {
+ /*
+ * RegisterSerializableTransactionInt is going to call
+ * GetSnapshotData, so we need to provide it the static snapshot our
+ * caller passed to us. It returns a copy of that snapshot and
+ * registers it on TopTransactionResourceOwner.
+ */
+ snapshot = RegisterSerializableTransactionInt(origSnapshot);
+
+ if (MySerializableXact == InvalidSerializableXact)
+ return snapshot; /* no concurrent r/w xacts; it's safe */
+
+ MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
+
+ /*
+ * Wait for concurrent transactions to finish. Stop early if one of
+ * them marked us as conflicted.
+ */
+ while (!(SHMQueueEmpty((SHM_QUEUE *)
+ &MySerializableXact->possibleUnsafeConflicts) ||
+ SxactIsROUnsafe(MySerializableXact)))
+ ProcWaitForSignal();
+
+ MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
+ if (!SxactIsROUnsafe(MySerializableXact))
+ break; /* success */
+
+ /* else, need to retry... */
+ ereport(DEBUG2,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("deferrable snapshot was unsafe; trying a new one")));
+ ReleasePredicateLocks(false);
+ UnregisterSnapshotFromOwner(snapshot,
+ TopTransactionResourceOwner);
+ }
+
+ /*
+ * Now we have a safe snapshot, so we don't need to do any further checks.
+ */
+ Assert(SxactIsROSafe(MySerializableXact));
+ ReleasePredicateLocks(false);
+
+ return snapshot;
+}
+
+/*
+ * Acquire and register a snapshot which can be used for this transaction..
+ * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact.
+ * It should be current for this process and be contained in PredXact.
+ */
+Snapshot
+RegisterSerializableTransaction(Snapshot snapshot)
+{
+ Assert(IsolationIsSerializable());
+
+ /*
+ * A special optimization is available for SERIALIZABLE READ ONLY
+ * DEFERRABLE transactions -- we can wait for a suitable snapshot and
+ * thereby avoid all SSI overhead once it's running..
+ */
+ if (XactReadOnly && XactDeferrable)
+ return GetSafeSnapshot(snapshot);
+
+ return RegisterSerializableTransactionInt(snapshot);
+}
+
+static Snapshot
+RegisterSerializableTransactionInt(Snapshot snapshot)
+{
+ PGPROC *proc;
+ VirtualTransactionId vxid;
+ SERIALIZABLEXACT *sxact,
+ *othersxact;
+ HASHCTL hash_ctl;
+
+ /* We only do this for serializable transactions. Once. */
+ Assert(MySerializableXact == InvalidSerializableXact);
+
+ Assert(!RecoveryInProgress());
+
+ proc = MyProc;
+ Assert(proc != NULL);
+ GET_VXID_FROM_PGPROC(vxid, *proc);
+
+ /*
+ * First we get the sxact structure, which may involve looping and access
+ * to the "finished" list to free a structure for use.
+ */
+#ifdef TEST_OLDSERXID
+ SummarizeOldestCommittedSxact();
+#endif
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ do
+ {
+ sxact = CreatePredXact();
+ /* If null, push out committed sxact to SLRU summary & retry. */
+ if (!sxact)
+ {
+ LWLockRelease(SerializableXactHashLock);
+ SummarizeOldestCommittedSxact();
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ }
+ } while (!sxact);
+
+ /* Get and register a snapshot */
+ snapshot = GetSnapshotData(snapshot);
+ snapshot = RegisterSnapshotOnOwner(snapshot, TopTransactionResourceOwner);
+
+ /*
+ * If there are no serializable transactions which are not read-only, we
+ * can "opt out" of predicate locking and conflict checking for a
+ * read-only transaction.
+ *
+ * The reason this is safe is that a read-only transaction can only become
+ * part of a dangerous structure if it overlaps a writable transaction
+ * which in turn overlaps a writable transaction which committed before
+ * the read-only transaction started. A new writable transaction can
+ * overlap this one, but it can't meet the other condition of overlapping
+ * a transaction which committed before this one started.
+ */
+ if (XactReadOnly && PredXact->WritableSxactCount == 0)
+ {
+ ReleasePredXact(sxact);
+ LWLockRelease(SerializableXactHashLock);
+ return snapshot;
+ }
+
+ /* Maintain serializable global xmin info. */
+ if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
+ {
+ Assert(PredXact->SxactGlobalXminCount == 0);
+ PredXact->SxactGlobalXmin = snapshot->xmin;
+ PredXact->SxactGlobalXminCount = 1;
+ OldSerXidSetActiveSerXmin(snapshot->xmin);
+ }
+ else if (TransactionIdEquals(snapshot->xmin, PredXact->SxactGlobalXmin))
+ {
+ Assert(PredXact->SxactGlobalXminCount > 0);
+ PredXact->SxactGlobalXminCount++;
+ }
+ else
+ {
+ Assert(TransactionIdFollows(snapshot->xmin, PredXact->SxactGlobalXmin));
+ }
+
+ /* Initialize the structure. */
+ sxact->vxid = vxid;
+ sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
+ sxact->commitSeqNo = InvalidSerCommitSeqNo;
+ SHMQueueInit(&(sxact->outConflicts));
+ SHMQueueInit(&(sxact->inConflicts));
+ SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+ sxact->topXid = GetTopTransactionIdIfAny();
+ sxact->finishedBefore = InvalidTransactionId;
+ sxact->xmin = snapshot->xmin;
+ sxact->pid = MyProcPid;
+ SHMQueueInit(&(sxact->predicateLocks));
+ SHMQueueElemInit(&(sxact->finishedLink));
+ sxact->flags = 0;
+ if (XactReadOnly)
+ {
+ sxact->flags |= SXACT_FLAG_READ_ONLY;
+
+ /*
+ * Register all concurrent r/w transactions as possible conflicts; if
+ * all of them commit without any outgoing conflicts to earlier
+ * transactions then this snapshot can be deemed safe (and we can run
+ * without tracking predicate locks).
+ */
+ for (othersxact = FirstPredXact();
+ othersxact != NULL;
+ othersxact = NextPredXact(othersxact))
+ {
+ if (!SxactIsOnFinishedList(othersxact) &&
+ !SxactIsReadOnly(othersxact))
+ {
+ SetPossibleUnsafeConflict(sxact, othersxact);
+ }
+ }
+ }
+ else
+ {
+ ++(PredXact->WritableSxactCount);
+ Assert(PredXact->WritableSxactCount <=
+ (MaxBackends + max_prepared_xacts));
+ }
+
+ MySerializableXact = sxact;
+
+ LWLockRelease(SerializableXactHashLock);
+
+ /* Initialize the backend-local hash table of parent locks */
+ Assert(LocalPredicateLockHash == NULL);
+ MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = sizeof(PREDICATELOCKTARGETTAG);
+ hash_ctl.entrysize = sizeof(LOCALPREDICATELOCK);
+ hash_ctl.hash = tag_hash;
+ LocalPredicateLockHash = hash_create("Local predicate lock",
+ max_predicate_locks_per_xact,
+ &hash_ctl,
+ HASH_ELEM | HASH_FUNCTION);
+
+ return snapshot;
+}
+
+/*
+ * Register the top level XID in SerializableXidHash.
+ * Also store it for easy reference in MySerializableXact.
+ */
+void
+RegisterPredicateLockingXid(const TransactionId xid)
+{
+ SERIALIZABLEXIDTAG sxidtag;
+ SERIALIZABLEXID *sxid;
+ bool found;
+
+ /*
+ * If we're not tracking predicate lock data for this transaction, we
+ * should ignore the request and return quickly.
+ */
+ if (MySerializableXact == InvalidSerializableXact)
+ return;
+
+ /* This should only be done once per transaction. */
+ Assert(MySerializableXact->topXid == InvalidTransactionId);
+
+ /* We should have a valid XID and be at the top level. */
+ Assert(TransactionIdIsValid(xid));
+
+ MySerializableXact->topXid = xid;
+
+ sxidtag.xid = xid;
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
+ &sxidtag,
+ HASH_ENTER, &found);
+ if (!sxid)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+
+ Assert(!found);
+
+ /* Initialize the structure. */
+ sxid->myXact = (SERIALIZABLEXACT *) MySerializableXact;
+ LWLockRelease(SerializableXactHashLock);
+}
+
+
+/*
+ * Check whether there are any predicate locks held by any transaction
+ * for the page at the given block number.
+ *
+ * Note that the transaction may be completed but not yet subject to
+ * cleanup due to overlapping serializable transactions. This must
+ * return valid information regardless of transaction isolation level.
+ *
+ * Also note that this doesn't check for a conflicting relation lock,
+ * just a lock specifically on the given page.
+ *
+ * One use is to support proper behavior during GiST index vacuum.
+ */
+bool
+PageIsPredicateLocked(const Relation relation, const BlockNumber blkno)
+{
+ PREDICATELOCKTARGETTAG targettag;
+ uint32 targettaghash;
+ LWLockId partitionLock;
+ PREDICATELOCKTARGET *target;
+
+ SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ blkno);
+
+ targettaghash = PredicateLockTargetTagHashCode(&targettag);
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+ LWLockAcquire(partitionLock, LW_SHARED);
+ target = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &targettag, targettaghash,
+ HASH_FIND, NULL);
+ LWLockRelease(partitionLock);
+
+ return (target != NULL);
+}
+
+
+/*
+ * Check whether a particular lock is held by this transaction.
+ *
+ * Important note: this function may return false even if the lock is
+ * being held, because it uses the local lock table which is not
+ * updated if another transaction modifies our lock list (e.g. to
+ * split an index page). However, it will never return true if the
+ * lock is not held. We only use this function in circumstances where
+ * such false negatives are acceptable.
+ */
+static bool
+PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag)
+{
+ LOCALPREDICATELOCK *lock;
+
+ /* check local hash table */
+ lock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
+ targettag,
+ HASH_FIND, NULL);
+
+ if (!lock)
+ return false;
+
+ /*
+ * Found entry in the table, but still need to check whether it's actually
+ * held -- it could just be a parent of some held lock.
+ */
+ return lock->held;
+}
+
+/*
+ * Return the parent lock tag in the lock hierarchy: the next coarser
+ * lock that covers the provided tag.
+ *
+ * Returns true and sets *parent to the parent tag if one exists,
+ * returns false if none exists.
+ */
+static bool
+GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
+ PREDICATELOCKTARGETTAG *parent)
+{
+ switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
+ {
+ case PREDLOCKTAG_RELATION:
+ /* relation locks have no parent lock */
+ return false;
+
+ case PREDLOCKTAG_PAGE:
+ /* parent lock is relation lock */
+ SET_PREDICATELOCKTARGETTAG_RELATION(*parent,
+ GET_PREDICATELOCKTARGETTAG_DB(*tag),
+ GET_PREDICATELOCKTARGETTAG_RELATION(*tag));
+
+ return true;
+
+ case PREDLOCKTAG_TUPLE:
+ /* parent lock is page lock */
+ SET_PREDICATELOCKTARGETTAG_PAGE(*parent,
+ GET_PREDICATELOCKTARGETTAG_DB(*tag),
+ GET_PREDICATELOCKTARGETTAG_RELATION(*tag),
+ GET_PREDICATELOCKTARGETTAG_PAGE(*tag));
+ return true;
+ }
+
+ /* not reachable */
+ Assert(false);
+ return false;
+}
+
+/*
+ * Check whether the lock we are considering is already covered by a
+ * coarser lock for our transaction.
+ */
+static bool
+CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag)
+{
+ PREDICATELOCKTARGETTAG targettag,
+ parenttag;
+
+ targettag = *newtargettag;
+
+ /* check parents iteratively until no more */
+ while (GetParentPredicateLockTag(&targettag, &parenttag))
+ {
+ targettag = parenttag;
+ if (PredicateLockExists(&targettag))
+ return true;
+ }
+
+ /* no more parents to check; lock is not covered */
+ return false;
+}
+
+/*
+ * Check whether both the list of related predicate locks and the pointer to
+ * a prior version of the row (if this is a tuple lock target) are empty for
+ * a predicate lock target, and remove the target if they are.
+ */
+static void
+RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
+{
+ PREDICATELOCKTARGET *rmtarget;
+ PREDICATELOCKTARGET *next;
+
+ Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+
+ /* Can't remove it until no locks at this target. */
+ if (!SHMQueueEmpty(&target->predicateLocks))
+ return;
+
+ /* Can't remove it if there are locks for a prior row version. */
+ LWLockAcquire(PredicateLockNextRowLinkLock, LW_EXCLUSIVE);
+ if (target->priorVersionOfRow != NULL)
+ {
+ LWLockRelease(PredicateLockNextRowLinkLock);
+ return;
+ }
+
+ /*
+ * We are going to release this target, This requires that we let the
+ * next version of the row (if any) know that it's previous version is
+ * done.
+ *
+ * It might be that the link was all that was keeping the other target
+ * from cleanup, but we can't clean that up here -- LW locking is all
+ * wrong for that. We'll pass the HTAB in the general cleanup function to
+ * get rid of such "dead" targets.
+ */
+ next = target->nextVersionOfRow;
+ if (next != NULL)
+ {
+ next->priorVersionOfRow = NULL;
+ if (SHMQueueEmpty(&next->predicateLocks))
+ PredXact->NeedTargetLinkCleanup = true;
+ }
+ LWLockRelease(PredicateLockNextRowLinkLock);
+
+ /* Actually remove the target. */
+ rmtarget = hash_search_with_hash_value(PredicateLockTargetHash,
+ &target->tag,
+ targettaghash,
+ HASH_REMOVE, NULL);
+ Assert(rmtarget == target);
+}
+
+/*
+ * Delete child target locks owned by this process.
+ * This implementation is assuming that the usage of each target tag field
+ * is uniform. No need to make this hard if we don't have to.
+ *
+ * We aren't acquiring lightweight locks for the predicate lock or lock
+ * target structures associated with this transaction unless we're going
+ * to modify them, because no other process is permitted to modify our
+ * locks.
+ */
+static void
+DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
+{
+ SERIALIZABLEXACT *sxact;
+ PREDICATELOCK *predlock;
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ sxact = (SERIALIZABLEXACT *) MySerializableXact;
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(sxact->predicateLocks),
+ offsetof(PREDICATELOCK, xactLink));
+ while (predlock)
+ {
+ SHM_QUEUE *predlocksxactlink;
+ PREDICATELOCK *nextpredlock;
+ PREDICATELOCKTAG oldlocktag;
+ PREDICATELOCKTARGET *oldtarget;
+ PREDICATELOCKTARGETTAG oldtargettag;
+
+ predlocksxactlink = &(predlock->xactLink);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ predlocksxactlink,
+ offsetof(PREDICATELOCK, xactLink));
+
+ oldlocktag = predlock->tag;
+ Assert(oldlocktag.myXact == sxact);
+ oldtarget = oldlocktag.myTarget;
+ oldtargettag = oldtarget->tag;
+
+ if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
+ {
+ uint32 oldtargettaghash;
+ LWLockId partitionLock;
+ PREDICATELOCK *rmpredlock;
+
+ oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
+ partitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
+
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+
+ SHMQueueDelete(predlocksxactlink);
+ SHMQueueDelete(&(predlock->targetLink));
+ rmpredlock = hash_search_with_hash_value
+ (PredicateLockHash,
+ &oldlocktag,
+ PredicateLockHashCodeFromTargetHashCode(&oldlocktag,
+ oldtargettaghash),
+ HASH_REMOVE, NULL);
+ Assert(rmpredlock == predlock);
+
+ RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
+
+ LWLockRelease(partitionLock);
+
+ DecrementParentLocks(&oldtargettag);
+ }
+
+ predlock = nextpredlock;
+ }
+ LWLockRelease(SerializablePredicateLockListLock);
+}
+
+/*
+ * Returns the promotion threshold for a given predicate lock
+ * target. This is the number of descendant locks required to promote
+ * to the specified tag. Note that the threshold includes non-direct
+ * descendants, e.g. both tuples and pages for a relation lock.
+ *
+ * TODO SSI: We should do something more intelligent about what the
+ * thresholds are, either making it proportional to the number of
+ * tuples in a page & pages in a relation, or at least making it a
+ * GUC. Currently the threshold is 3 for a page lock, and
+ * max_predicate_locks_per_transaction/2 for a relation lock, chosen
+ * entirely arbitrarily (and without benchmarking).
+ */
+static int
+PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag)
+{
+ switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
+ {
+ case PREDLOCKTAG_RELATION:
+ return max_predicate_locks_per_xact / 2;
+
+ case PREDLOCKTAG_PAGE:
+ return 3;
+
+ case PREDLOCKTAG_TUPLE:
+
+ /*
+ * not reachable: nothing is finer-granularity than a tuple, so we
+ * should never try to promote to it.
+ */
+ Assert(false);
+ return 0;
+ }
+
+ /* not reachable */
+ Assert(false);
+ return 0;
+}
+
+/*
+ * For all ancestors of a newly-acquired predicate lock, increment
+ * their child count in the parent hash table. If any of them have
+ * more descendants than their promotion threshold, acquire the
+ * coarsest such lock.
+ *
+ * Returns true if a parent lock was acquired and false otherwise.
+ */
+static bool
+CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag)
+{
+ PREDICATELOCKTARGETTAG targettag,
+ nexttag,
+ promotiontag;
+ LOCALPREDICATELOCK *parentlock;
+ bool found,
+ promote;
+
+ promote = false;
+
+ targettag = *reqtag;
+
+ /* check parents iteratively */
+ while (GetParentPredicateLockTag(&targettag, &nexttag))
+ {
+ targettag = nexttag;
+ parentlock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
+ &targettag,
+ HASH_ENTER,
+ &found);
+ if (!found)
+ {
+ parentlock->held = false;
+ parentlock->childLocks = 1;
+ }
+ else
+ parentlock->childLocks++;
+
+ if (parentlock->childLocks >=
+ PredicateLockPromotionThreshold(&targettag))
+ {
+ /*
+ * We should promote to this parent lock. Continue to check its
+ * ancestors, however, both to get their child counts right and to
+ * check whether we should just go ahead and promote to one of
+ * them.
+ */
+ promotiontag = targettag;
+ promote = true;
+ }
+ }
+
+ if (promote)
+ {
+ /* acquire coarsest ancestor eligible for promotion */
+ PredicateLockAcquire(&promotiontag);
+ return true;
+ }
+ else
+ return false;
+}
+
+/*
+ * When releasing a lock, decrement the child count on all ancestor
+ * locks.
+ *
+ * This is called only when releasing a lock via
+ * DeleteChildTargetLocks (i.e. when a lock becomes redundant because
+ * we've acquired its parent, possibly due to promotion) or when a new
+ * MVCC write lock makes the predicate lock unnecessary. There's no
+ * point in calling it when locks are released at transaction end, as
+ * this information is no longer needed.
+ */
+static void
+DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag)
+{
+ PREDICATELOCKTARGETTAG parenttag,
+ nexttag;
+
+ parenttag = *targettag;
+
+ while (GetParentPredicateLockTag(&parenttag, &nexttag))
+ {
+ uint32 targettaghash;
+ LOCALPREDICATELOCK *parentlock,
+ *rmlock;
+
+ parenttag = nexttag;
+ targettaghash = PredicateLockTargetTagHashCode(&parenttag);
+ parentlock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ &parenttag, targettaghash,
+ HASH_FIND, NULL);
+
+ /*
+ * There's a small chance the parent lock doesn't exist in the lock
+ * table. This can happen if we prematurely removed it because an
+ * index split caused the child refcount to be off.
+ */
+ if (parentlock == NULL)
+ continue;
+
+ parentlock->childLocks--;
+
+ /*
+ * Under similar circumstances the parent lock's refcount might be
+ * zero. This only happens if we're holding that lock (otherwise we
+ * would have removed the entry).
+ */
+ if (parentlock->childLocks < 0)
+ {
+ Assert(parentlock->held);
+ parentlock->childLocks = 0;
+ }
+
+ if ((parentlock->childLocks == 0) && (!parentlock->held))
+ {
+ rmlock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ &parenttag, targettaghash,
+ HASH_REMOVE, NULL);
+ Assert(rmlock == parentlock);
+ }
+ }
+}
+
+/*
+ * Indicate that a predicate lock on the given target is held by the
+ * specified transaction. Has no effect if the lock is already held.
+ *
+ * This updates the lock table and the sxact's lock list, and creates
+ * the lock target if necessary, but does *not* do anything related to
+ * granularity promotion or the local lock table. See
+ * PredicateLockAcquire for that.
+ */
+static void
+CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
+ uint32 targettaghash,
+ SERIALIZABLEXACT *sxact)
+{
+ PREDICATELOCKTARGET *target;
+ PREDICATELOCKTAG locktag;
+ PREDICATELOCK *lock;
+ LWLockId partitionLock;
+ bool found;
+
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+
+ /* Make sure that the target is represented. */
+ target = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ targettag, targettaghash,
+ HASH_ENTER, &found);
+ if (!target)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+ if (!found)
+ {
+ SHMQueueInit(&(target->predicateLocks));
+ target->priorVersionOfRow = NULL;
+ target->nextVersionOfRow = NULL;
+ }
+
+ /* We've got the sxact and target, make sure they're joined. */
+ locktag.myTarget = target;
+ locktag.myXact = sxact;
+ lock = (PREDICATELOCK *)
+ hash_search_with_hash_value(PredicateLockHash, &locktag,
+ PredicateLockHashCodeFromTargetHashCode(&locktag, targettaghash),
+ HASH_ENTER, &found);
+ if (!lock)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+
+ if (!found)
+ {
+ SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink));
+ SHMQueueInsertBefore(&(sxact->predicateLocks),
+ &(lock->xactLink));
+ lock->commitSeqNo = 0;
+ }
+
+ LWLockRelease(partitionLock);
+ LWLockRelease(SerializablePredicateLockListLock);
+}
+
+/*
+ * Acquire a predicate lock on the specified target for the current
+ * connection if not already held. This updates the local lock table
+ * and uses it to implement granularity promotion. It will consolidate
+ * multiple locks into a coarser lock if warranted, and will release
+ * any finer-grained locks covered by the new one.
+ */
+static void
+PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
+{
+ uint32 targettaghash;
+ bool found;
+ LOCALPREDICATELOCK *locallock;
+
+ /* Do we have the lock already, or a covering lock? */
+ if (PredicateLockExists(targettag))
+ return;
+
+ if (CoarserLockCovers(targettag))
+ return;
+
+ /* the same hash and LW lock apply to the lock target and the local lock. */
+ targettaghash = PredicateLockTargetTagHashCode(targettag);
+
+ /* Acquire lock in local table */
+ locallock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ targettag, targettaghash,
+ HASH_ENTER, &found);
+ /* We should not hold the lock (but its entry might still exist) */
+ Assert(!found || !locallock->held);
+ locallock->held = true;
+ if (!found)
+ locallock->childLocks = 0;
+
+ /* Actually create the lock */
+ CreatePredicateLock(targettag, targettaghash,
+ (SERIALIZABLEXACT *) MySerializableXact);
+
+ /*
+ * Lock has been acquired. Check whether it should be promoted to a
+ * coarser granularity, or whether there are finer-granularity locks to
+ * clean up.
+ */
+ if (CheckAndPromotePredicateLockRequest(targettag))
+ {
+ /*
+ * Lock request was promoted to a coarser-granularity lock, and that
+ * lock was acquired. It will delete this lock and any of its
+ * children, so we're done.
+ */
+ }
+ else
+ {
+ /* Clean up any finer-granularity locks */
+ if (GET_PREDICATELOCKTARGETTAG_TYPE(*targettag) != PREDLOCKTAG_TUPLE)
+ DeleteChildTargetLocks(targettag);
+ }
+}
+
+
+/*
+ * PredicateLockRelation
+ *
+ * Gets a predicate lock at the relation level.
+ * Skip if not in full serializable transaction isolation level.
+ * Skip if this is a temporary table.
+ * Clear any finer-grained predicate locks this session has on the relation.
+ */
+void
+PredicateLockRelation(const Relation relation)
+{
+ PREDICATELOCKTARGETTAG tag;
+
+ if (SkipSerialization(relation))
+ return;
+
+ SET_PREDICATELOCKTARGETTAG_RELATION(tag,
+ relation->rd_node.dbNode,
+ relation->rd_id);
+ PredicateLockAcquire(&tag);
+}
+
+/*
+ * PredicateLockPage
+ *
+ * Gets a predicate lock at the page level.
+ * Skip if not in full serializable transaction isolation level.
+ * Skip if this is a temporary table.
+ * Skip if a coarser predicate lock already covers this page.
+ * Clear any finer-grained predicate locks this session has on the relation.
+ */
+void
+PredicateLockPage(const Relation relation, const BlockNumber blkno)
+{
+ PREDICATELOCKTARGETTAG tag;
+
+ if (SkipSerialization(relation))
+ return;
+
+ SET_PREDICATELOCKTARGETTAG_PAGE(tag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ blkno);
+ PredicateLockAcquire(&tag);
+}
+
+/*
+ * PredicateLockTuple
+ *
+ * Gets a predicate lock at the tuple level.
+ * Skip if not in full serializable transaction isolation level.
+ * Skip if this is a temporary table.
+ */
+void
+PredicateLockTuple(const Relation relation, const HeapTuple tuple)
+{
+ PREDICATELOCKTARGETTAG tag;
+ ItemPointer tid;
+
+ if (SkipSerialization(relation))
+ return;
+
+ /*
+ * If it's a heap tuple, return if this xact wrote it.
+ */
+ if (relation->rd_index == NULL)
+ {
+ TransactionId myxid = GetTopTransactionIdIfAny();
+
+ if (TransactionIdIsValid(myxid))
+ {
+ TransactionId xid = HeapTupleHeaderGetXmin(tuple->t_data);
+
+ if (TransactionIdFollowsOrEquals(xid, TransactionXmin))
+ {
+ xid = SubTransGetTopmostTransaction(xid);
+ if (TransactionIdEquals(xid, myxid))
+ {
+ /* We wrote it; we already have a write lock. */
+ return;
+ }
+ }
+ }
+ }
+
+ /*
+ * Do quick-but-not-definitive test for a relation lock first. This will
+ * never cause a return when the relation is *not* locked, but will
+ * occasionally let the check continue when there really *is* a relation
+ * level lock.
+ */
+ SET_PREDICATELOCKTARGETTAG_RELATION(tag,
+ relation->rd_node.dbNode,
+ relation->rd_id);
+ if (PredicateLockExists(&tag))
+ return;
+
+ tid = &(tuple->t_self);
+ SET_PREDICATELOCKTARGETTAG_TUPLE(tag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ ItemPointerGetBlockNumber(tid),
+ ItemPointerGetOffsetNumber(tid));
+ PredicateLockAcquire(&tag);
+}
+
+/*
+ * If the old tuple has any predicate locks, create a lock target for the
+ * new tuple and point them at each other. Conflict detection needs to
+ * look for locks against prior versions of the row.
+ */
+void
+PredicateLockTupleRowVersionLink(const Relation relation,
+ const HeapTuple oldTuple,
+ const HeapTuple newTuple)
+{
+ PREDICATELOCKTARGETTAG oldtargettag;
+ PREDICATELOCKTARGETTAG newtargettag;
+ PREDICATELOCKTARGET *oldtarget;
+ PREDICATELOCKTARGET *newtarget;
+ PREDICATELOCKTARGET *next;
+ uint32 oldtargettaghash;
+ LWLockId oldpartitionLock;
+ uint32 newtargettaghash;
+ LWLockId newpartitionLock;
+ bool found;
+
+ SET_PREDICATELOCKTARGETTAG_TUPLE(oldtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ ItemPointerGetBlockNumber(&(oldTuple->t_self)),
+ ItemPointerGetOffsetNumber(&(oldTuple->t_self)));
+ oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
+ oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
+
+ SET_PREDICATELOCKTARGETTAG_TUPLE(newtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ ItemPointerGetBlockNumber(&(newTuple->t_self)),
+ ItemPointerGetOffsetNumber(&(newTuple->t_self)));
+ newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
+ newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
+
+ /* Lock lower numbered partition first. */
+ if (oldpartitionLock < newpartitionLock)
+ {
+ LWLockAcquire(oldpartitionLock, LW_SHARED);
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+ }
+ else if (newpartitionLock < oldpartitionLock)
+ {
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+ LWLockAcquire(oldpartitionLock, LW_SHARED);
+ }
+ else
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+
+ oldtarget = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &oldtargettag, oldtargettaghash,
+ HASH_FIND, NULL);
+
+ /* Only need to link if there is an old target already. */
+ if (oldtarget)
+ {
+ LWLockAcquire(PredicateLockNextRowLinkLock, LW_EXCLUSIVE);
+
+ /* Guard against stale pointers from rollback. */
+ next = oldtarget->nextVersionOfRow;
+ if (next != NULL)
+ {
+ next->priorVersionOfRow = NULL;
+ oldtarget->nextVersionOfRow = NULL;
+ }
+
+ /* Find or create the new target, and link old and new. */
+ newtarget = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &newtargettag, newtargettaghash,
+ HASH_ENTER, &found);
+ if (!newtarget)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+ if (!found)
+ {
+ SHMQueueInit(&(newtarget->predicateLocks));
+ newtarget->nextVersionOfRow = NULL;
+ }
+ else
+ Assert(newtarget->priorVersionOfRow == NULL);
+
+ newtarget->priorVersionOfRow = oldtarget;
+ oldtarget->nextVersionOfRow = newtarget;
+
+ LWLockRelease(PredicateLockNextRowLinkLock);
+ }
+
+ /* Release lower number partition last. */
+ if (oldpartitionLock < newpartitionLock)
+ {
+ LWLockRelease(newpartitionLock);
+ LWLockRelease(oldpartitionLock);
+ }
+ else if (newpartitionLock < oldpartitionLock)
+ {
+ LWLockRelease(oldpartitionLock);
+ LWLockRelease(newpartitionLock);
+ }
+ else
+ LWLockRelease(newpartitionLock);
+}
+
+
+/*
+ * DeleteLockTarget
+ *
+ * Remove a predicate lock target along with any locks held for it.
+ *
+ * Caller must hold SerializablePredicateLockListLock and the
+ * appropriate hash partition lock for the target.
+ */
+static void
+DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
+{
+ PREDICATELOCK *predlock;
+ SHM_QUEUE *predlocktargetlink;
+ PREDICATELOCK *nextpredlock;
+ bool found;
+
+ Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+ Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
+
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ &(target->predicateLocks),
+ offsetof(PREDICATELOCK, targetLink));
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ while (predlock)
+ {
+ predlocktargetlink = &(predlock->targetLink);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ predlocktargetlink,
+ offsetof(PREDICATELOCK, targetLink));
+
+ SHMQueueDelete(&(predlock->xactLink));
+ SHMQueueDelete(&(predlock->targetLink));
+
+ hash_search_with_hash_value
+ (PredicateLockHash,
+ &predlock->tag,
+ PredicateLockHashCodeFromTargetHashCode(&predlock->tag,
+ targettaghash),
+ HASH_REMOVE, &found);
+ Assert(found);
+
+ predlock = nextpredlock;
+ }
+ LWLockRelease(SerializableXactHashLock);
+
+ /* Remove the target itself, if possible. */
+ RemoveTargetIfNoLongerUsed(target, targettaghash);
+}
+
+
+/*
+ * TransferPredicateLocksToNewTarget
+ *
+ * Move or copy all the predicate locks for a lock target, for use by
+ * index page splits/combines and other things that create or replace
+ * lock targets. If 'removeOld' is true, the old locks and the target
+ * will be removed.
+ *
+ * Returns true on success, or false if we ran out of shared memory to
+ * allocate the new target or locks. Guaranteed to always succeed if
+ * removeOld is set (by using the reserved entry in
+ * PredicateLockTargetHash for scratch space).
+ *
+ * Caller must hold SerializablePredicateLockListLock.
+ */
+static bool
+TransferPredicateLocksToNewTarget(const PREDICATELOCKTARGETTAG oldtargettag,
+ const PREDICATELOCKTARGETTAG newtargettag,
+ bool removeOld)
+{
+ uint32 oldtargettaghash;
+ LWLockId oldpartitionLock;
+ PREDICATELOCKTARGET *oldtarget;
+ uint32 newtargettaghash;
+ LWLockId newpartitionLock;
+ bool found;
+ bool outOfShmem = false;
+ uint32 reservedtargettaghash;
+ LWLockId reservedpartitionLock;
+
+
+ Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+
+ oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
+ newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
+ oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
+ newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
+
+ reservedtargettaghash = 0; /* Quiet compiler warnings. */
+ reservedpartitionLock = 0; /* Quiet compiler warnings. */
+
+ if (removeOld)
+ {
+ /*
+ * Remove the reserved entry to give us scratch space, so we know
+ * we'll be able to create the new lock target.
+ */
+ reservedtargettaghash = PredicateLockTargetTagHashCode(&ReservedTargetTag);
+ reservedpartitionLock = PredicateLockHashPartitionLock(reservedtargettaghash);
+ LWLockAcquire(reservedpartitionLock, LW_EXCLUSIVE);
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &ReservedTargetTag,
+ reservedtargettaghash,
+ HASH_REMOVE, &found);
+ Assert(found);
+ LWLockRelease(reservedpartitionLock);
+ }
+
+ /*
+ * We must get the partition locks in ascending sequence to avoid
+ * deadlocks. If old and new partitions are the same, we must request the
+ * lock only once.
+ */
+ if (oldpartitionLock < newpartitionLock)
+ {
+ LWLockAcquire(oldpartitionLock,
+ (removeOld ? LW_EXCLUSIVE : LW_SHARED));
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+ }
+ else if (oldpartitionLock > newpartitionLock)
+ {
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+ LWLockAcquire(oldpartitionLock,
+ (removeOld ? LW_EXCLUSIVE : LW_SHARED));
+ }
+ else
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+
+ /*
+ * Look for the old target. If not found, that's OK; no predicate locks
+ * are affected, so we can just clean up and return. If it does exist,
+ * walk its list of predicate locks and move or copy them to the new
+ * target.
+ */
+ oldtarget = hash_search_with_hash_value(PredicateLockTargetHash,
+ &oldtargettag,
+ oldtargettaghash,
+ HASH_FIND, NULL);
+
+ if (oldtarget)
+ {
+ PREDICATELOCKTARGET *newtarget;
+ PREDICATELOCK *oldpredlock;
+ PREDICATELOCKTAG newpredlocktag;
+
+ newtarget = hash_search_with_hash_value(PredicateLockTargetHash,
+ &newtargettag,
+ newtargettaghash,
+ HASH_ENTER_NULL, &found);
+
+ if (!newtarget)
+ {
+ /* Failed to allocate due to insufficient shmem */
+ outOfShmem = true;
+ goto exit;
+ }
+
+ /* If we created a new entry, initialize it */
+ if (!found)
+ {
+ SHMQueueInit(&(newtarget->predicateLocks));
+ newpredlocktag.myTarget = newtarget;
+ }
+
+ oldpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(oldtarget->predicateLocks),
+ &(oldtarget->predicateLocks),
+ offsetof(PREDICATELOCK, targetLink));
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ while (oldpredlock)
+ {
+ SHM_QUEUE *predlocktargetlink;
+ PREDICATELOCK *nextpredlock;
+ PREDICATELOCK *newpredlock;
+
+ predlocktargetlink = &(oldpredlock->targetLink);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(oldtarget->predicateLocks),
+ predlocktargetlink,
+ offsetof(PREDICATELOCK, targetLink));
+ newpredlocktag.myXact = oldpredlock->tag.myXact;
+
+ if (removeOld)
+ {
+ SHMQueueDelete(&(oldpredlock->xactLink));
+ SHMQueueDelete(&(oldpredlock->targetLink));
+
+ hash_search_with_hash_value
+ (PredicateLockHash,
+ &oldpredlock->tag,
+ PredicateLockHashCodeFromTargetHashCode(&oldpredlock->tag,
+ oldtargettaghash),
+ HASH_REMOVE, &found);
+ Assert(found);
+ }
+
+
+ newpredlock = (PREDICATELOCK *)
+ hash_search_with_hash_value
+ (PredicateLockHash,
+ &newpredlocktag,
+ PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
+ newtargettaghash),
+ HASH_ENTER_NULL, &found);
+ if (!newpredlock)
+ {
+ /* Out of shared memory. Undo what we've done so far. */
+ LWLockRelease(SerializableXactHashLock);
+ DeleteLockTarget(newtarget, newtargettaghash);
+ outOfShmem = true;
+ goto exit;
+ }
+ SHMQueueInsertBefore(&(newtarget->predicateLocks),
+ &(newpredlock->targetLink));
+ SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
+ &(newpredlock->xactLink));
+
+ oldpredlock = nextpredlock;
+ }
+ LWLockRelease(SerializableXactHashLock);
+
+ if (removeOld)
+ {
+ Assert(SHMQueueEmpty(&oldtarget->predicateLocks));
+ RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
+ }
+ }
+
+
+exit:
+ /* Release partition locks in reverse order of acquisition. */
+ if (oldpartitionLock < newpartitionLock)
+ {
+ LWLockRelease(newpartitionLock);
+ LWLockRelease(oldpartitionLock);
+ }
+ else if (oldpartitionLock > newpartitionLock)
+ {
+ LWLockRelease(oldpartitionLock);
+ LWLockRelease(newpartitionLock);
+ }
+ else
+ LWLockRelease(newpartitionLock);
+
+ if (removeOld)
+ {
+ /* We shouldn't run out of memory if we're moving locks */
+ Assert(!outOfShmem);
+
+ /* Put the reserved entry back */
+ LWLockAcquire(reservedpartitionLock, LW_EXCLUSIVE);
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &ReservedTargetTag,
+ reservedtargettaghash,
+ HASH_ENTER, &found);
+ Assert(!found);
+ LWLockRelease(reservedpartitionLock);
+ }
+
+ return !outOfShmem;
+}
+
+
+/*
+ * PredicateLockPageSplit
+ *
+ * Copies any predicate locks for the old page to the new page.
+ * Skip if this is a temporary table or toast table.
+ *
+ * NOTE: A page split (or overflow) affects all serializable transactions,
+ * even if it occurs in the context of another transaction isolation level.
+ *
+ * NOTE: This currently leaves the local copy of the locks without
+ * information on the new lock which is in shared memory. This could cause
+ * problems if enough page splits occur on locked pages without the processes
+ * which hold the locks getting in and noticing.
+ */
+void
+PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno,
+ const BlockNumber newblkno)
+{
+ PREDICATELOCKTARGETTAG oldtargettag;
+ PREDICATELOCKTARGETTAG newtargettag;
+ bool success;
+
+ if (SkipSplitTracking(relation))
+ return;
+
+ Assert(oldblkno != newblkno);
+ Assert(BlockNumberIsValid(oldblkno));
+ Assert(BlockNumberIsValid(newblkno));
+
+ SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ oldblkno);
+ SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ newblkno);
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
+
+ /*
+ * Try copying the locks over to the new page's tag, creating it if
+ * necessary.
+ */
+ success = TransferPredicateLocksToNewTarget(oldtargettag,
+ newtargettag,
+ false);
+
+ if (!success)
+ {
+ /*
+ * No more predicate lock entries are available. Failure isn't an
+ * option here, so promote the page lock to a relation lock.
+ */
+
+ /* Get the parent relation lock's lock tag */
+ success = GetParentPredicateLockTag(&oldtargettag,
+ &newtargettag);
+ Assert(success);
+
+ /* Move the locks to the parent. This shouldn't fail. */
+ success = TransferPredicateLocksToNewTarget(oldtargettag,
+ newtargettag,
+ true);
+ Assert(success);
+ }
+
+ LWLockRelease(SerializablePredicateLockListLock);
+}
+
+/*
+ * PredicateLockPageCombine
+ *
+ * Combines predicate locks for two existing pages.
+ * Skip if this is a temporary table or toast table.
+ *
+ * NOTE: A page combine affects all serializable transactions, even if it
+ * occurs in the context of another transaction isolation level.
+ */
+void
+PredicateLockPageCombine(const Relation relation, const BlockNumber oldblkno,
+ const BlockNumber newblkno)
+{
+ PREDICATELOCKTARGETTAG oldtargettag;
+ PREDICATELOCKTARGETTAG newtargettag;
+ bool success;
+
+
+ if (SkipSplitTracking(relation))
+ return;
+
+ Assert(oldblkno != newblkno);
+ Assert(BlockNumberIsValid(oldblkno));
+ Assert(BlockNumberIsValid(newblkno));
+
+ SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ oldblkno);
+ SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ newblkno);
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
+
+ /* Move the locks. This shouldn't fail. */
+ success = TransferPredicateLocksToNewTarget(oldtargettag,
+ newtargettag,
+ true);
+ Assert(success);
+
+ LWLockRelease(SerializablePredicateLockListLock);
+}
+
+/*
+ * Walk the hash table and find the new xmin.
+ */
+static void
+SetNewSxactGlobalXmin(void)
+{
+ SERIALIZABLEXACT *sxact;
+
+ Assert(LWLockHeldByMe(SerializableXactHashLock));
+
+ PredXact->SxactGlobalXmin = InvalidTransactionId;
+ PredXact->SxactGlobalXminCount = 0;
+
+ for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
+ {
+ if (!SxactIsRolledBack(sxact)
+ && !SxactIsCommitted(sxact)
+ && sxact != OldCommittedSxact)
+ {
+ Assert(sxact->xmin != InvalidTransactionId);
+ if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
+ || TransactionIdPrecedes(sxact->xmin,
+ PredXact->SxactGlobalXmin))
+ {
+ PredXact->SxactGlobalXmin = sxact->xmin;
+ PredXact->SxactGlobalXminCount = 1;
+ }
+ else if (TransactionIdEquals(sxact->xmin,
+ PredXact->SxactGlobalXmin))
+ PredXact->SxactGlobalXminCount++;
+ }
+ }
+
+ OldSerXidSetActiveSerXmin(PredXact->SxactGlobalXmin);
+}
+
+/*
+ * ReleasePredicateLocks
+ *
+ * Releases predicate locks based on completion of the current transaction,
+ * whether committed or rolled back. It can also be called for a read only
+ * transaction when it becomes impossible for the transaction to become
+ * part of a dangerous structure.
+ *
+ * We do nothing unless this is a serializable transaction.
+ *
+ * This method must ensure that shared memory hash tables are cleaned
+ * up in some relatively timely fashion.
+ *
+ * If this transaction is committing and is holding any predicate locks,
+ * it must be added to a list of completed serializable transaction still
+ * holding locks.
+ */
+void
+ReleasePredicateLocks(const bool isCommit)
+{
+ bool needToClear;
+ RWConflict conflict,
+ nextConflict,
+ possibleUnsafeConflict;
+ SERIALIZABLEXACT *roXact;
+
+ /*
+ * We can't trust XactReadOnly here, because a transaction which started
+ * as READ WRITE can show as READ ONLY later, e.g., within
+ * substransactions. We want to flag a transaction as READ ONLY if it
+ * commits without writing so that de facto READ ONLY transactions get the
+ * benefit of some RO optimizations, so we will use this local variable to
+ * get some cleanup logic right which is based on whether the transaction
+ * was declared READ ONLY at the top level.
+ */
+ bool topLevelIsDeclaredReadOnly;
+
+ if (MySerializableXact == InvalidSerializableXact)
+ {
+ Assert(LocalPredicateLockHash == NULL);
+ return;
+ }
+
+ Assert(!isCommit || SxactIsPrepared(MySerializableXact));
+ Assert(!SxactIsRolledBack(MySerializableXact));
+ Assert(!SxactIsCommitted(MySerializableXact));
+
+ /* may not be serializable during COMMIT/ROLLBACK PREPARED */
+ if (MySerializableXact->pid != 0)
+ Assert(IsolationIsSerializable());
+
+ /* We'd better not already be on the cleanup list. */
+ Assert(!SxactIsOnFinishedList((SERIALIZABLEXACT *) MySerializableXact));
+
+ topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact);
+
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ /*
+ * We don't hold a lock here, assuming that TransactionId is atomic!
+ *
+ * If this value is changing, we don't care that much whether we get the
+ * old or new value -- it is just used to determine how far
+ * GlobalSerizableXmin must advance before this transaction can be cleaned
+ * fully cleaned up. The worst that could happen is we wait for ome more
+ * transaction to complete before freeing some RAM; correctness of visible
+ * behavior is not affected.
+ */
+ MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
+
+ /*
+ * If it's not a commit it's a rollback, and we can clear our locks
+ * immediately.
+ */
+ if (isCommit)
+ {
+ MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
+ MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
+ /* Recognize implicit read-only transaction (commit without write). */
+ if (!(MySerializableXact->flags & SXACT_FLAG_DID_WRITE))
+ MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
+ }
+ else
+ {
+ MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
+ }
+
+ if (!topLevelIsDeclaredReadOnly)
+ {
+ Assert(PredXact->WritableSxactCount > 0);
+ if (--(PredXact->WritableSxactCount) == 0)
+ {
+ /*
+ * Release predicate locks and rw-conflicts in for all committed
+ * transactions. There are no longer any transactions which might
+ * conflict with the locks and no chance for new transactions to
+ * overlap. Similarly, existing conflicts in can't cause pivots,
+ * and any conflicts in which could have completed a dangerous
+ * structure would already have caused a rollback, so any
+ * remaining ones must be benign.
+ */
+ PredXact->CanPartialClearThrough = PredXact->LastSxactCommitSeqNo;
+ }
+ }
+ else
+ {
+ /*
+ * Read-only transactions: clear the list of transactions that might
+ * make us unsafe. Note that we use 'inLink' for the iteration as
+ * opposed to 'outLink' for the r/w xacts.
+ */
+ possibleUnsafeConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ (SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ offsetof(RWConflictData, inLink));
+ while (possibleUnsafeConflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ &possibleUnsafeConflict->inLink,
+ offsetof(RWConflictData, inLink));
+
+ Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
+ Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
+
+ ReleaseRWConflict(possibleUnsafeConflict);
+
+ possibleUnsafeConflict = nextConflict;
+ }
+ }
+
+ /* Check for conflict out to old committed transactions. */
+ if (isCommit
+ && !SxactIsReadOnly(MySerializableXact)
+ && SxactHasSummaryConflictOut(MySerializableXact))
+ {
+ MySerializableXact->SeqNo.earliestOutConflictCommit =
+ FirstNormalSerCommitSeqNo;
+ MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
+ }
+
+ /*
+ * Release all outConflicts to committed transactions. If we're rolling
+ * back clear them all. Set SXACT_FLAG_CONFLICT_OUT if any point to
+ * previously committed transactions.
+ */
+ conflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->outConflicts,
+ (SHM_QUEUE *) &MySerializableXact->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+
+ if (isCommit
+ && !SxactIsReadOnly(MySerializableXact)
+ && SxactIsCommitted(conflict->sxactIn))
+ {
+ if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0
+ || conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
+ MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo;
+ MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
+ }
+
+ if (!isCommit
+ || SxactIsCommitted(conflict->sxactIn)
+ || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
+ ReleaseRWConflict(conflict);
+
+ conflict = nextConflict;
+ }
+
+ /*
+ * Release all inConflicts from committed and read-only transactions. If
+ * we're rolling back, clear them all.
+ */
+ conflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ (SHM_QUEUE *) &MySerializableXact->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ &conflict->inLink,
+ offsetof(RWConflictData, inLink));
+
+ if (!isCommit
+ || SxactIsCommitted(conflict->sxactOut)
+ || SxactIsReadOnly(conflict->sxactOut))
+ ReleaseRWConflict(conflict);
+
+ conflict = nextConflict;
+ }
+
+ if (!topLevelIsDeclaredReadOnly)
+ {
+ /*
+ * Remove ourselves from the list of possible conflicts for concurrent
+ * READ ONLY transactions, flagging them as unsafe if we have a
+ * conflict out. If any are waiting DEFERRABLE transactions, wake them
+ * up if they are known safe or known unsafe.
+ */
+ possibleUnsafeConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ (SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ offsetof(RWConflictData, outLink));
+ while (possibleUnsafeConflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ &possibleUnsafeConflict->outLink,
+ offsetof(RWConflictData, outLink));
+
+ roXact = possibleUnsafeConflict->sxactIn;
+ Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
+ Assert(SxactIsReadOnly(roXact));
+
+ /* Mark conflicted if necessary. */
+ if (isCommit
+ && (MySerializableXact->flags & SXACT_FLAG_DID_WRITE)
+ && SxactHasConflictOut(MySerializableXact)
+ && (MySerializableXact->SeqNo.earliestOutConflictCommit
+ <= roXact->SeqNo.lastCommitBeforeSnapshot))
+ {
+ /*
+ * This releases possibleUnsafeConflict (as well as all other
+ * possible conflicts for roXact)
+ */
+ FlagSxactUnsafe(roXact);
+ }
+ else
+ {
+ ReleaseRWConflict(possibleUnsafeConflict);
+
+ /*
+ * If we were the last possible conflict, flag it safe. The
+ * transaction can now safely release its predicate locks (but
+ * that transaction's backend has to do that itself).
+ */
+ if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
+ roXact->flags |= SXACT_FLAG_RO_SAFE;
+ }
+
+ /*
+ * Wake up the process for a waiting DEFERRABLE transaction if we
+ * now know it's either safe or conflicted.
+ */
+ if (SxactIsDeferrableWaiting(roXact) &&
+ (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
+ ProcSendSignal(roXact->pid);
+
+ possibleUnsafeConflict = nextConflict;
+ }
+ }
+
+ /*
+ * Check whether it's time to clean up old transactions. This can only be
+ * done when the last serializable transaction with the oldest xmin among
+ * serializable transactions completes. We then find the "new oldest"
+ * xmin and purge any transactions which finished before this transaction
+ * was launched.
+ */
+ needToClear = false;
+ if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
+ {
+ Assert(PredXact->SxactGlobalXminCount > 0);
+ if (--(PredXact->SxactGlobalXminCount) == 0)
+ {
+ SetNewSxactGlobalXmin();
+ needToClear = true;
+ }
+ }
+
+ LWLockRelease(SerializableXactHashLock);
+
+ LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
+
+ /* Add this to the list of transactions to check for later cleanup. */
+ if (isCommit)
+ SHMQueueInsertBefore(FinishedSerializableTransactions,
+ (SHM_QUEUE *) &(MySerializableXact->finishedLink));
+
+ if (!isCommit)
+ ReleaseOneSerializableXact((SERIALIZABLEXACT *) MySerializableXact,
+ false, false);
+
+ LWLockRelease(SerializableFinishedListLock);
+
+ if (needToClear)
+ ClearOldPredicateLocks();
+
+ MySerializableXact = InvalidSerializableXact;
+
+ /* Delete per-transaction lock table */
+ if (LocalPredicateLockHash != NULL)
+ {
+ hash_destroy(LocalPredicateLockHash);
+ LocalPredicateLockHash = NULL;
+ }
+}
+
+/*
+ * ReleasePredicateLocksIfROSafe
+ * Check if the current transaction is read only and operating on
+ * a safe snapshot. If so, release predicate locks and return
+ * true.
+ *
+ * A transaction is flagged as RO_SAFE if all concurrent R/W
+ * transactions commit without having conflicts out to an earlier
+ * snapshot, thus ensuring that no conflicts are possible for this
+ * transaction. Thus, we call this function as part of the
+ * SkipSerialization check on all public interface methods.
+ */
+static bool
+ReleasePredicateLocksIfROSafe(void)
+{
+ if (SxactIsROSafe(MySerializableXact))
+ {
+ ReleasePredicateLocks(false);
+ return true;
+ }
+ else
+ return false;
+}
+
+/*
+ * Clear old predicate locks.
+ */
+static void
+ClearOldPredicateLocks(void)
+{
+ SERIALIZABLEXACT *finishedSxact;
+ PREDICATELOCK *predlock;
+ int i;
+ HASH_SEQ_STATUS seqstat;
+ PREDICATELOCKTARGET *locktarget;
+
+ LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
+ finishedSxact = (SERIALIZABLEXACT *)
+ SHMQueueNext(FinishedSerializableTransactions,
+ FinishedSerializableTransactions,
+ offsetof(SERIALIZABLEXACT, finishedLink));
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ while (finishedSxact)
+ {
+ SERIALIZABLEXACT *nextSxact;
+
+ nextSxact = (SERIALIZABLEXACT *)
+ SHMQueueNext(FinishedSerializableTransactions,
+ &(finishedSxact->finishedLink),
+ offsetof(SERIALIZABLEXACT, finishedLink));
+ if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
+ || TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore,
+ PredXact->SxactGlobalXmin))
+ {
+ LWLockRelease(SerializableXactHashLock);
+ SHMQueueDelete(&(finishedSxact->finishedLink));
+ ReleaseOneSerializableXact(finishedSxact, false, false);
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+ else if (finishedSxact->commitSeqNo > PredXact->HavePartialClearedThrough
+ && finishedSxact->commitSeqNo <= PredXact->CanPartialClearThrough)
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ReleaseOneSerializableXact(finishedSxact,
+ !SxactIsReadOnly(finishedSxact),
+ false);
+ PredXact->HavePartialClearedThrough = finishedSxact->commitSeqNo;
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+ else
+ break;
+ finishedSxact = nextSxact;
+ }
+ LWLockRelease(SerializableXactHashLock);
+
+ /*
+ * Loop through predicate locks on dummy transaction for summarized data.
+ */
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&OldCommittedSxact->predicateLocks,
+ &OldCommittedSxact->predicateLocks,
+ offsetof(PREDICATELOCK, xactLink));
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ while (predlock)
+ {
+ PREDICATELOCK *nextpredlock;
+ bool canDoPartialCleanup;
+
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&OldCommittedSxact->predicateLocks,
+ &predlock->xactLink,
+ offsetof(PREDICATELOCK, xactLink));
+
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ canDoPartialCleanup = (predlock->commitSeqNo <= PredXact->CanPartialClearThrough);
+ LWLockRelease(SerializableXactHashLock);
+
+ if (canDoPartialCleanup)
+ {
+ PREDICATELOCKTAG tag;
+ SHM_QUEUE *targetLink;
+ PREDICATELOCKTARGET *target;
+ PREDICATELOCKTARGETTAG targettag;
+ uint32 targettaghash;
+ LWLockId partitionLock;
+
+ tag = predlock->tag;
+ targetLink = &(predlock->targetLink);
+ target = tag.myTarget;
+ targettag = target->tag;
+ targettaghash = PredicateLockTargetTagHashCode(&targettag);
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+
+ SHMQueueDelete(targetLink);
+ SHMQueueDelete(&(predlock->xactLink));
+
+ hash_search_with_hash_value(PredicateLockHash, &tag,
+ PredicateLockHashCodeFromTargetHashCode(&tag,
+ targettaghash),
+ HASH_REMOVE, NULL);
+ RemoveTargetIfNoLongerUsed(target, targettaghash);
+
+ LWLockRelease(partitionLock);
+ }
+
+ predlock = nextpredlock;
+ }
+
+ LWLockRelease(SerializablePredicateLockListLock);
+ LWLockRelease(SerializableFinishedListLock);
+
+ if (!PredXact->NeedTargetLinkCleanup)
+ return;
+
+ /*
+ * Clean up any targets which were disconnected from a prior version with
+ * no predicate locks attached.
+ */
+ for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
+ LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE);
+ LWLockAcquire(PredicateLockNextRowLinkLock, LW_SHARED);
+
+ hash_seq_init(&seqstat, PredicateLockTargetHash);
+ while ((locktarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
+ {
+ if (SHMQueueEmpty(&locktarget->predicateLocks)
+ && locktarget->priorVersionOfRow == NULL
+ && locktarget->nextVersionOfRow == NULL)
+ {
+ hash_search(PredicateLockTargetHash, &locktarget->tag,
+ HASH_REMOVE, NULL);
+ }
+ }
+
+ PredXact->NeedTargetLinkCleanup = false;
+
+ LWLockRelease(PredicateLockNextRowLinkLock);
+ for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
+ LWLockRelease(FirstPredicateLockMgrLock + i);
+}
+
+/*
+ * This is the normal way to delete anything from any of the predicate
+ * locking hash tables. Given a transaction which we know can be deleted:
+ * delete all predicate locks held by that transaction and any predicate
+ * lock targets which are now unreferenced by a lock; delete all conflicts
+ * for the transaction; delete all xid values for the transaction; then
+ * delete the transaction.
+ *
+ * When the partial flag is set, we can release all predicate locks and
+ * out-conflict information -- we've established that there are no longer
+ * any overlapping read write transactions for which this transaction could
+ * matter.
+ *
+ * When the summarize flag is set, we've run short of room for sxact data
+ * and must summarize to the SLRU. Predicate locks are transferred to a
+ * dummy "old" transaction, with duplicate locks on a single target
+ * collapsing to a single lock with the "latest" commitSeqNo from among
+ * the conflicting locks..
+ */
+static void
+ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
+ bool summarize)
+{
+ PREDICATELOCK *predlock;
+ SERIALIZABLEXIDTAG sxidtag;
+ RWConflict conflict,
+ nextConflict;
+
+ Assert(sxact != NULL);
+ Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
+ Assert(LWLockHeldByMe(SerializableFinishedListLock));
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(sxact->predicateLocks),
+ offsetof(PREDICATELOCK, xactLink));
+ while (predlock)
+ {
+ PREDICATELOCK *nextpredlock;
+ PREDICATELOCKTAG tag;
+ SHM_QUEUE *targetLink;
+ PREDICATELOCKTARGET *target;
+ PREDICATELOCKTARGETTAG targettag;
+ uint32 targettaghash;
+ LWLockId partitionLock;
+
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(predlock->xactLink),
+ offsetof(PREDICATELOCK, xactLink));
+
+ tag = predlock->tag;
+ targetLink = &(predlock->targetLink);
+ target = tag.myTarget;
+ targettag = target->tag;
+ targettaghash = PredicateLockTargetTagHashCode(&targettag);
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+
+ SHMQueueDelete(targetLink);
+
+ hash_search_with_hash_value(PredicateLockHash, &tag,
+ PredicateLockHashCodeFromTargetHashCode(&tag,
+ targettaghash),
+ HASH_REMOVE, NULL);
+ if (summarize)
+ {
+ bool found;
+
+ /* Fold into dummy transaction list. */
+ tag.myXact = OldCommittedSxact;
+ predlock = hash_search_with_hash_value(PredicateLockHash, &tag,
+ PredicateLockHashCodeFromTargetHashCode(&tag,
+ targettaghash),
+ HASH_ENTER, &found);
+ if (!predlock)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+ if (found)
+ {
+ if (predlock->commitSeqNo < sxact->commitSeqNo)
+ predlock->commitSeqNo = sxact->commitSeqNo;
+ }
+ else
+ {
+ SHMQueueInsertBefore(&(target->predicateLocks),
+ &(predlock->targetLink));
+ SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks),
+ &(predlock->xactLink));
+ predlock->commitSeqNo = sxact->commitSeqNo;
+ }
+ }
+ else
+ RemoveTargetIfNoLongerUsed(target, targettaghash);
+
+ LWLockRelease(partitionLock);
+
+ predlock = nextpredlock;
+ }
+
+ /*
+ * Rather than retail removal, just re-init the head after we've run
+ * through the list.
+ */
+ SHMQueueInit(&sxact->predicateLocks);
+
+ LWLockRelease(SerializablePredicateLockListLock);
+
+ sxidtag.xid = sxact->topXid;
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ if (!partial)
+ {
+ /* Release all outConflicts. */
+ conflict = (RWConflict)
+ SHMQueueNext(&sxact->outConflicts,
+ &sxact->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext(&sxact->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+ if (summarize)
+ conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
+ ReleaseRWConflict(conflict);
+ conflict = nextConflict;
+ }
+ }
+
+ /* Release all inConflicts. */
+ conflict = (RWConflict)
+ SHMQueueNext(&sxact->inConflicts,
+ &sxact->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext(&sxact->inConflicts,
+ &conflict->inLink,
+ offsetof(RWConflictData, inLink));
+ if (summarize)
+ conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
+ ReleaseRWConflict(conflict);
+ conflict = nextConflict;
+ }
+
+ if (!partial)
+ {
+ /* Get rid of the xid and the record of the transaction itself. */
+ if (sxidtag.xid != InvalidTransactionId)
+ hash_search(SerializableXidHash, &sxidtag, HASH_REMOVE, NULL);
+ ReleasePredXact(sxact);
+ }
+
+ LWLockRelease(SerializableXactHashLock);
+}
+
+/*
+ * Tests whether the given top level transaction is concurrent with
+ * (overlaps) our current transaction.
+ *
+ * We need to identify the top level transaction for SSI, anyway, so pass
+ * that to this function to save the overhead of checking the snapshot's
+ * subxip array.
+ */
+static bool
+XidIsConcurrent(TransactionId xid)
+{
+ Snapshot snap;
+ uint32 i;
+
+ Assert(TransactionIdIsValid(xid));
+ Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
+
+ snap = GetTransactionSnapshot();
+
+ if (TransactionIdPrecedes(xid, snap->xmin))
+ return false;
+
+ if (TransactionIdFollowsOrEquals(xid, snap->xmax))
+ return true;
+
+ for (i = 0; i < snap->xcnt; i++)
+ {
+ if (xid == snap->xip[i])
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * CheckForSerializableConflictOut
+ * We are reading a tuple which has been modified. If it is visible to
+ * us but has been deleted, that indicates a rw-conflict out. If it's
+ * not visible and was created by a concurrent (overlapping)
+ * serializable transaction, that is also a rw-conflict out,
+ *
+ * We will determine the top level xid of the writing transaction with which
+ * we may be in conflict, and check for overlap with our own transaction.
+ * If the transactions overlap (i.e., they cannot see each other's writes),
+ * then we have a conflict out.
+ *
+ * This function should be called just about anywhere in heapam.c that a
+ * tuple has been read. There is currently no known reason to call this
+ * function from an index AM.
+ */
+void
+CheckForSerializableConflictOut(const bool visible, const Relation relation,
+ const HeapTuple tuple, const Buffer buffer)
+{
+ TransactionId xid;
+ SERIALIZABLEXIDTAG sxidtag;
+ SERIALIZABLEXID *sxid;
+ SERIALIZABLEXACT *sxact;
+ HTSV_Result htsvResult;
+
+ if (SkipSerialization(relation))
+ return;
+
+ if (SxactIsMarkedForDeath(MySerializableXact))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on identification as a pivot, during conflict out checking."),
+ errhint("The transaction might succeed if retried.")));
+ }
+
+ /*
+ * Check to see whether the tuple has been written to by a concurrent
+ * transaction, either to create it not visible to us, or to delete it
+ * while it is visible to us. The "visible" bool indicates whether the
+ * tuple is visible to us, while HeapTupleSatisfiesVacuum checks what else
+ * is going on with it.
+ */
+ htsvResult = HeapTupleSatisfiesVacuum(tuple->t_data, TransactionXmin, buffer);
+ switch (htsvResult)
+ {
+ case HEAPTUPLE_LIVE:
+ if (visible)
+ return;
+ xid = HeapTupleHeaderGetXmin(tuple->t_data);
+ break;
+ case HEAPTUPLE_RECENTLY_DEAD:
+ if (!visible)
+ return;
+ xid = HeapTupleHeaderGetXmax(tuple->t_data);
+ break;
+ case HEAPTUPLE_DELETE_IN_PROGRESS:
+ xid = HeapTupleHeaderGetXmax(tuple->t_data);
+ break;
+ case HEAPTUPLE_INSERT_IN_PROGRESS:
+ xid = HeapTupleHeaderGetXmin(tuple->t_data);
+ break;
+ case HEAPTUPLE_DEAD:
+ return;
+ default:
+
+ /*
+ * The only way to get to this default clause is if a new value is
+ * added to the enum type without adding it to this switch
+ * statement. That's a bug, so elog.
+ */
+ elog(ERROR, "unrecognized return value from HeapTupleSatisfiesVacuum: %u", htsvResult);
+
+ /*
+ * In spite of having all enum values covered and calling elog on
+ * this default, some compilers think this is a code path which
+ * allows xid to be used below without initialization. Silence
+ * that warning.
+ */
+ xid = InvalidTransactionId;
+ }
+ Assert(TransactionIdIsValid(xid));
+ Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
+
+ /*
+ * Find top level xid. Bail out if xid is too early to be a conflict, or
+ * if it's our own xid.
+ */
+ if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
+ return;
+ xid = SubTransGetTopmostTransaction(xid);
+ if (TransactionIdPrecedes(xid, TransactionXmin))
+ return;
+ if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
+ return;
+
+ /*
+ * Find sxact or summarized info for the top level xid.
+ */
+ sxidtag.xid = xid;
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ sxid = (SERIALIZABLEXID *)
+ hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
+ if (!sxid)
+ {
+ /*
+ * Transaction not found in "normal" SSI structures. Check whether it
+ * got pushed out to SLRU storage for "old committed" transactions.
+ */
+ SerCommitSeqNo conflictCommitSeqNo;
+
+ conflictCommitSeqNo = OldSerXidGetMinConflictCommitSeqNo(xid);
+ if (conflictCommitSeqNo != 0)
+ {
+ if (conflictCommitSeqNo != InvalidSerCommitSeqNo
+ && (!SxactIsReadOnly(MySerializableXact)
+ || conflictCommitSeqNo
+ <= MySerializableXact->SeqNo.lastCommitBeforeSnapshot))
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on conflict out to old pivot %u.", xid),
+ errhint("The transaction might succeed if retried.")));
+
+ if (SxactHasSummaryConflictIn(MySerializableXact)
+ || !SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->inConflicts))
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on identification as a pivot, with conflict out to old committed transaction %u.", xid),
+ errhint("The transaction might succeed if retried.")));
+
+ MySerializableXact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
+ }
+
+ /* It's not serializable or otherwise not important. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+ sxact = sxid->myXact;
+ Assert(TransactionIdEquals(sxact->topXid, xid));
+ if (sxact == MySerializableXact
+ || SxactIsRolledBack(sxact)
+ || SxactIsMarkedForDeath(sxact))
+ {
+ /* We can't conflict with our own transaction or one rolled back. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+
+ /*
+ * We have a conflict out to a transaction which has a conflict out to a
+ * summarized transaction. That summarized transaction must have
+ * committed first, and we can't tell when it committed in relation to our
+ * snapshot acquisition, so something needs to be cancelled.
+ */
+ if (SxactHasSummaryConflictOut(sxact))
+ {
+ if (!SxactIsPrepared(sxact))
+ {
+ sxact->flags |= SXACT_FLAG_MARKED_FOR_DEATH;
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+ else
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on conflict out to old pivot."),
+ errhint("The transaction might succeed if retried.")));
+ }
+ }
+
+ /*
+ * If this is a read-only transaction and the writing transaction has
+ * committed, and it doesn't have a rw-conflict to a transaction which
+ * committed before it, no conflict.
+ */
+ if (SxactIsReadOnly(MySerializableXact)
+ && SxactIsCommitted(sxact)
+ && !SxactHasSummaryConflictOut(sxact)
+ && (!SxactHasConflictOut(sxact)
+ || MySerializableXact->SeqNo.lastCommitBeforeSnapshot < sxact->SeqNo.earliestOutConflictCommit))
+ {
+ /* Read-only transaction will appear to run first. No conflict. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+
+ if (!XidIsConcurrent(xid))
+ {
+ /* This write was already in our snapshot; no conflict. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+
+ if (RWConflictExists((SERIALIZABLEXACT *) MySerializableXact, sxact))
+ {
+ /* We don't want duplicate conflict records in the list. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+
+ /*
+ * Flag the conflict. But first, if this conflict creates a dangerous
+ * structure, ereport an error.
+ */
+ FlagRWConflict((SERIALIZABLEXACT *) MySerializableXact, sxact);
+ LWLockRelease(SerializableXactHashLock);
+}
+
+/*
+ * Check a particular target for rw-dependency conflict in. This will
+ * also check prior versions of a tuple, if any.
+ */
+static void
+CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
+{
+ PREDICATELOCKTARGETTAG nexttargettag;
+ PREDICATELOCKTARGETTAG thistargettag;
+
+ for (;;)
+ {
+ if (!CheckSingleTargetForConflictsIn(targettag, &nexttargettag))
+ break;
+ thistargettag = nexttargettag;
+ targettag = &thistargettag;
+ }
+}
+
+/*
+ * Check a particular target for rw-dependency conflict in. If the tuple
+ * has prior versions, returns true and *nexttargettag is set to the tag
+ * of the prior tuple version.
+ */
+static bool
+CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag,
+ PREDICATELOCKTARGETTAG *nexttargettag)
+{
+ uint32 targettaghash;
+ LWLockId partitionLock;
+ PREDICATELOCKTARGET *target;
+ PREDICATELOCK *predlock;
+ bool hasnexttarget = false;
+
+ Assert(MySerializableXact != InvalidSerializableXact);
+
+ /*
+ * The same hash and LW lock apply to the lock target and the lock itself.
+ */
+ targettaghash = PredicateLockTargetTagHashCode(targettag);
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+ LWLockAcquire(partitionLock, LW_SHARED);
+ LWLockAcquire(PredicateLockNextRowLinkLock, LW_SHARED);
+ target = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ targettag, targettaghash,
+ HASH_FIND, NULL);
+ if (!target)
+ {
+ /* Nothing has this target locked; we're done here. */
+ LWLockRelease(PredicateLockNextRowLinkLock);
+ LWLockRelease(partitionLock);
+ return false;
+ }
+
+ /*
+ * If the target is linked to a prior version of the row, save the tag so
+ * that it can be used for iterative calls to this function.
+ */
+ if (target->priorVersionOfRow != NULL)
+ {
+ *nexttargettag = target->priorVersionOfRow->tag;
+ hasnexttarget = true;
+ }
+ LWLockRelease(PredicateLockNextRowLinkLock);
+
+ /*
+ * Each lock for an overlapping transaction represents a conflict: a
+ * rw-dependency in to this transaction.
+ */
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ &(target->predicateLocks),
+ offsetof(PREDICATELOCK, targetLink));
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ while (predlock)
+ {
+ SHM_QUEUE *predlocktargetlink;
+ PREDICATELOCK *nextpredlock;
+ SERIALIZABLEXACT *sxact;
+
+ predlocktargetlink = &(predlock->targetLink);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ predlocktargetlink,
+ offsetof(PREDICATELOCK, targetLink));
+
+ sxact = predlock->tag.myXact;
+ if (sxact == MySerializableXact)
+ {
+ /*
+ * If we're getting a write lock on the tuple, we don't need a
+ * predicate (SIREAD) lock. At this point our transaction already
+ * has an ExclusiveRowLock on the relation, so we are OK to drop
+ * the predicate lock on the tuple, if found, without fearing that
+ * another write against the tuple will occur before the MVCC
+ * information makes it to the buffer.
+ */
+ if (GET_PREDICATELOCKTARGETTAG_OFFSET(*targettag))
+ {
+ uint32 predlockhashcode;
+ PREDICATELOCKTARGET *rmtarget = NULL;
+ PREDICATELOCK *rmpredlock;
+ LOCALPREDICATELOCK *locallock,
+ *rmlocallock;
+
+ /*
+ * This is a tuple on which we have a tuple predicate lock. We
+ * only have shared LW locks now; release those, and get
+ * exclusive locks only while we modify things.
+ */
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(partitionLock);
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ /*
+ * Remove the predicate lock from shared memory, if it wasn't
+ * removed while the locks were released. One way that could
+ * happen is from autovacuum cleaning up an index.
+ */
+ predlockhashcode = PredicateLockHashCodeFromTargetHashCode
+ (&(predlock->tag), targettaghash);
+ rmpredlock = (PREDICATELOCK *)
+ hash_search_with_hash_value(PredicateLockHash,
+ &(predlock->tag),
+ predlockhashcode,
+ HASH_FIND, NULL);
+ if (rmpredlock)
+ {
+ Assert(rmpredlock == predlock);
+
+ SHMQueueDelete(predlocktargetlink);
+ SHMQueueDelete(&(predlock->xactLink));
+
+ rmpredlock = (PREDICATELOCK *)
+ hash_search_with_hash_value(PredicateLockHash,
+ &(predlock->tag),
+ predlockhashcode,
+ HASH_REMOVE, NULL);
+ Assert(rmpredlock == predlock);
+
+ RemoveTargetIfNoLongerUsed(target, targettaghash);
+
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(partitionLock);
+ LWLockRelease(SerializablePredicateLockListLock);
+
+ locallock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ targettag, targettaghash,
+ HASH_FIND, NULL);
+ Assert(locallock != NULL);
+ Assert(locallock->held);
+ locallock->held = false;
+
+ if (locallock->childLocks == 0)
+ {
+ rmlocallock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ targettag, targettaghash,
+ HASH_REMOVE, NULL);
+ Assert(rmlocallock == locallock);
+ }
+
+ DecrementParentLocks(targettag);
+
+ /*
+ * If we've cleaned up the last of the predicate locks for
+ * the target, bail out before re-acquiring the locks.
+ */
+ if (rmtarget)
+ return hasnexttarget;
+
+ /*
+ * The list has been altered. Start over at the front.
+ */
+ LWLockAcquire(partitionLock, LW_SHARED);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ &(target->predicateLocks),
+ offsetof(PREDICATELOCK, targetLink));
+
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+ else
+ {
+ /*
+ * The predicate lock was cleared while we were attempting
+ * to upgrade our lightweight locks. Revert to the shared
+ * locks.
+ */
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(partitionLock);
+ LWLockRelease(SerializablePredicateLockListLock);
+ LWLockAcquire(partitionLock, LW_SHARED);
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+ }
+ }
+ else if (!SxactIsRolledBack(sxact)
+ && (!SxactIsCommitted(sxact)
+ || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
+ sxact->finishedBefore))
+ && !RWConflictExists(sxact, (SERIALIZABLEXACT *) MySerializableXact))
+ {
+ LWLockRelease(SerializableXactHashLock);
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ FlagRWConflict(sxact, (SERIALIZABLEXACT *) MySerializableXact);
+
+ LWLockRelease(SerializableXactHashLock);
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+
+ predlock = nextpredlock;
+ }
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(partitionLock);
+
+ return hasnexttarget;
+}
+
+/*
+ * CheckForSerializableConflictIn
+ * We are writing the given tuple. If that indicates a rw-conflict
+ * in from another serializable transaction, take appropriate action.
+ *
+ * Skip checking for any granularity for which a parameter is missing.
+ *
+ * A tuple update or delete is in conflict if we have a predicate lock
+ * against the relation or page in which the tuple exists, or against the
+ * tuple itself.
+ */
+void
+CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple,
+ const Buffer buffer)
+{
+ PREDICATELOCKTARGETTAG targettag;
+
+ if (SkipSerialization(relation))
+ return;
+
+ if (SxactIsMarkedForDeath(MySerializableXact))
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on identification as a pivot, during conflict in checking."),
+ errhint("The transaction might succeed if retried.")));
+
+ MySerializableXact->flags |= SXACT_FLAG_DID_WRITE;
+
+ /*
+ * It is important that we check for locks from the finest granularity to
+ * the coarsest granularity, so that granularity promotion doesn't cause
+ * us to miss a lock. The new (coarser) lock will be acquired before the
+ * old (finer) locks are released.
+ *
+ * It is not possible to take and hold a lock across the checks for all
+ * granularities because each target could be in a separate partition.
+ */
+ if (tuple != NULL)
+ {
+ SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ ItemPointerGetBlockNumber(&(tuple->t_data->t_ctid)),
+ ItemPointerGetOffsetNumber(&(tuple->t_data->t_ctid)));
+ CheckTargetForConflictsIn(&targettag);
+ }
+
+ if (BufferIsValid(buffer))
+ {
+ SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ BufferGetBlockNumber(buffer));
+ CheckTargetForConflictsIn(&targettag);
+ }
+
+ SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
+ relation->rd_node.dbNode,
+ relation->rd_id);
+ CheckTargetForConflictsIn(&targettag);
+}
+
+/*
+ * Flag a rw-dependency between two serializable transactions.
+ *
+ * The caller is responsible for ensuring that we have a LW lock on
+ * the transaction hash table.
+ */
+static void
+FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
+{
+ Assert(reader != writer);
+
+ /* First, see if this conflict causes failure. */
+ OnConflict_CheckForSerializationFailure(reader, writer);
+
+ /* Actually do the conflict flagging. */
+ if (reader == OldCommittedSxact)
+ writer->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
+ else if (writer == OldCommittedSxact)
+ reader->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
+ else
+ SetRWConflict(reader, writer);
+}
+
+/*
+ * Check whether we should roll back one of these transactions
+ * instead of flagging a new rw-conflict.
+ */
+static void
+OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
+ SERIALIZABLEXACT *writer)
+{
+ bool failure;
+ RWConflict conflict;
+
+ Assert(LWLockHeldByMe(SerializableXactHashLock));
+
+ failure = false;
+
+ /*
+ * Check for already-committed writer with rw-conflict out flagged. This
+ * means that the reader must immediately fail.
+ */
+ if (SxactIsCommitted(writer)
+ && (SxactHasConflictOut(writer) || SxactHasSummaryConflictOut(writer)))
+ failure = true;
+
+ /*
+ * Check whether the reader has become a pivot with a committed writer. If
+ * so, we must roll back unless every in-conflict either committed before
+ * the writer committed or is READ ONLY and overlaps the writer.
+ */
+ if (!failure && SxactIsCommitted(writer) && !SxactIsReadOnly(reader))
+ {
+ if (SxactHasSummaryConflictIn(reader))
+ {
+ failure = true;
+ conflict = NULL;
+ }
+ else
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->inConflicts,
+ &reader->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (conflict)
+ {
+ if (!SxactIsRolledBack(conflict->sxactOut)
+ && (!SxactIsCommitted(conflict->sxactOut)
+ || conflict->sxactOut->commitSeqNo >= writer->commitSeqNo)
+ && (!SxactIsReadOnly(conflict->sxactOut)
+ || conflict->sxactOut->SeqNo.lastCommitBeforeSnapshot >= writer->commitSeqNo))
+ {
+ failure = true;
+ break;
+ }
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->inConflicts,
+ &conflict->inLink,
+ offsetof(RWConflictData, inLink));
+ }
+ }
+
+ /*
+ * Check whether the writer has become a pivot with an out-conflict
+ * committed transaction, while neither reader nor writer is committed. If
+ * the reader is a READ ONLY transaction, there is only a serialization
+ * failure if an out-conflict transaction causing the pivot committed
+ * before the reader acquired its snapshot. (That is, the reader must not
+ * have been concurrent with the out-conflict transaction.)
+ */
+ if (!failure && !SxactIsCommitted(writer))
+ {
+ if (SxactHasSummaryConflictOut(reader))
+ {
+ failure = true;
+ conflict = NULL;
+ }
+ else
+ conflict = (RWConflict)
+ SHMQueueNext(&writer->outConflicts,
+ &writer->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (conflict)
+ {
+ if ((reader == conflict->sxactIn && SxactIsCommitted(reader))
+ || (SxactIsCommitted(conflict->sxactIn)
+ && !SxactIsCommitted(reader)
+ && (!SxactIsReadOnly(reader)
+ || conflict->sxactIn->commitSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot)))
+ {
+ failure = true;
+ break;
+ }
+ conflict = (RWConflict)
+ SHMQueueNext(&writer->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+ }
+ }
+
+ if (failure)
+ {
+ if (MySerializableXact == writer)
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on identification as pivot, during write."),
+ errhint("The transaction might succeed if retried.")));
+ }
+ else if (SxactIsPrepared(writer))
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on conflict out to pivot %u, during read.", writer->topXid),
+ errhint("The transaction might succeed if retried.")));
+ }
+ writer->flags |= SXACT_FLAG_MARKED_FOR_DEATH;
+ }
+}
+
+/*
+ * PreCommit_CheckForSerializableConflicts
+ * Check for dangerous structures in a serializable transaction
+ * at commit.
+ *
+ * We're checking for a dangerous structure as each conflict is recorded.
+ * The only way we could have a problem at commit is if this is the "out"
+ * side of a pivot, and neither the "in" side nor the pivot has yet
+ * committed.
+ *
+ * If a dangerous structure is found, the pivot (the near conflict) is
+ * marked for death, because rolling back another transaction might mean
+ * that we flail without ever making progress. This transaction is
+ * committing writes, so letting it commit ensures progress. If we
+ * cancelled the far conflict, it might immediately fail again on retry.
+ */
+void
+PreCommit_CheckForSerializationFailure(void)
+{
+ RWConflict nearConflict;
+
+ if (MySerializableXact == InvalidSerializableXact)
+ return;
+
+ Assert(IsolationIsSerializable());
+
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ if (SxactIsMarkedForDeath(MySerializableXact))
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on identification as a pivot, during commit attempt."),
+ errhint("The transaction might succeed if retried.")));
+ }
+
+ nearConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ (SHM_QUEUE *) &MySerializableXact->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (nearConflict)
+ {
+ if (!SxactIsCommitted(nearConflict->sxactOut)
+ && !SxactIsRolledBack(nearConflict->sxactOut)
+ && !SxactIsMarkedForDeath(nearConflict->sxactOut))
+ {
+ RWConflict farConflict;
+
+ farConflict = (RWConflict)
+ SHMQueueNext(&nearConflict->sxactOut->inConflicts,
+ &nearConflict->sxactOut->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (farConflict)
+ {
+ if (farConflict->sxactOut == MySerializableXact
+ || (!SxactIsCommitted(farConflict->sxactOut)
+ && !SxactIsReadOnly(farConflict->sxactOut)
+ && !SxactIsRolledBack(farConflict->sxactOut)
+ && !SxactIsMarkedForDeath(farConflict->sxactOut)))
+ {
+ nearConflict->sxactOut->flags |= SXACT_FLAG_MARKED_FOR_DEATH;
+ break;
+ }
+ farConflict = (RWConflict)
+ SHMQueueNext(&nearConflict->sxactOut->inConflicts,
+ &farConflict->inLink,
+ offsetof(RWConflictData, inLink));
+ }
+ }
+
+ nearConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ &nearConflict->inLink,
+ offsetof(RWConflictData, inLink));
+ }
+
+ MySerializableXact->flags |= SXACT_FLAG_PREPARED;
+
+ LWLockRelease(SerializableXactHashLock);
+}
+
+/*------------------------------------------------------------------------*/
+
+/*
+ * Two-phase commit support
+ */
+
+/*
+ * AtPrepare_Locks
+ * Do the preparatory work for a PREPARE: make 2PC state file
+ * records for all predicate locks currently held.
+ */
+void
+AtPrepare_PredicateLocks(void)
+{
+ PREDICATELOCK *predlock;
+ SERIALIZABLEXACT *sxact;
+ TwoPhasePredicateRecord record;
+ TwoPhasePredicateXactRecord *xactRecord;
+ TwoPhasePredicateLockRecord *lockRecord;
+
+ sxact = (SERIALIZABLEXACT *) MySerializableXact;
+ xactRecord = &(record.data.xactRecord);
+ lockRecord = &(record.data.lockRecord);
+
+ if (MySerializableXact == InvalidSerializableXact)
+ return;
+
+ /* Generate a xact record for our SERIALIZABLEXACT */
+ record.type = TWOPHASEPREDICATERECORD_XACT;
+ xactRecord->xmin = MySerializableXact->xmin;
+ xactRecord->flags = MySerializableXact->flags;
+
+ /*
+ * Tweak the flags. Since we're not going to output the inConflicts and
+ * outConflicts lists, if they're non-empty we'll represent that by
+ * setting the appropriate summary conflict flags.
+ */
+ if (!SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->inConflicts))
+ xactRecord->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
+ if (!SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->outConflicts))
+ xactRecord->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
+
+ RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
+ &record, sizeof(record));
+
+ /*
+ * Generate a lock record for each lock.
+ *
+ * To do this, we need to walk the predicate lock list in our sxact rather
+ * than using the local predicate lock table because the latter is not
+ * guaranteed to be accurate.
+ */
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(sxact->predicateLocks),
+ offsetof(PREDICATELOCK, xactLink));
+
+ while (predlock != NULL)
+ {
+ record.type = TWOPHASEPREDICATERECORD_LOCK;
+ lockRecord->target = predlock->tag.myTarget->tag;
+
+ RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
+ &record, sizeof(record));
+
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(predlock->xactLink),
+ offsetof(PREDICATELOCK, xactLink));
+ }
+
+ LWLockRelease(SerializablePredicateLockListLock);
+}
+
+/*
+ * PostPrepare_Locks
+ * Clean up after successful PREPARE. Unlike the non-predicate
+ * lock manager, we do not need to transfer locks to a dummy
+ * PGPROC because our SERIALIZABLEXACT will stay around
+ * anyway. We only need to clean up our local state.
+ */
+void
+PostPrepare_PredicateLocks(TransactionId xid)
+{
+ if (MySerializableXact == InvalidSerializableXact)
+ return;
+
+ Assert(SxactIsPrepared(MySerializableXact));
+
+ MySerializableXact->pid = 0;
+
+ hash_destroy(LocalPredicateLockHash);
+ LocalPredicateLockHash = NULL;
+
+ MySerializableXact = InvalidSerializableXact;
+}
+
+/*
+ * PredicateLockTwoPhaseFinish
+ * Release a prepared transaction's predicate locks once it
+ * commits or aborts.
+ */
+void
+PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
+{
+ SERIALIZABLEXID *sxid;
+ SERIALIZABLEXIDTAG sxidtag;
+
+ sxidtag.xid = xid;
+
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ sxid = (SERIALIZABLEXID *)
+ hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
+ LWLockRelease(SerializableXactHashLock);
+
+ /* xid will not be found if it wasn't a serializable transaction */
+ if (sxid == NULL)
+ return;
+
+ /* Release its locks */
+ MySerializableXact = sxid->myXact;
+ ReleasePredicateLocks(isCommit);
+}
+
+/*
+ * Re-acquire a predicate lock belonging to a transaction that was prepared.
+ */
+void
+predicatelock_twophase_recover(TransactionId xid, uint16 info,
+ void *recdata, uint32 len)
+{
+ TwoPhasePredicateRecord *record;
+
+ Assert(len == sizeof(TwoPhasePredicateRecord));
+
+ record = (TwoPhasePredicateRecord *) recdata;
+
+ Assert((record->type == TWOPHASEPREDICATERECORD_XACT) ||
+ (record->type == TWOPHASEPREDICATERECORD_LOCK));
+
+ if (record->type == TWOPHASEPREDICATERECORD_XACT)
+ {
+ /* Per-transaction record. Set up a SERIALIZABLEXACT. */
+ TwoPhasePredicateXactRecord *xactRecord;
+ SERIALIZABLEXACT *sxact;
+ SERIALIZABLEXID *sxid;
+ SERIALIZABLEXIDTAG sxidtag;
+ bool found;
+
+ xactRecord = (TwoPhasePredicateXactRecord *) &record->data.xactRecord;
+
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ sxact = CreatePredXact();
+ if (!sxact)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory")));
+
+ /* vxid for a prepared xact is InvalidBackendId/xid; no pid */
+ sxact->vxid.backendId = InvalidBackendId;
+ sxact->vxid.localTransactionId = (LocalTransactionId) xid;
+ sxact->pid = 0;
+
+ /* a prepared xact hasn't committed yet */
+ sxact->commitSeqNo = InvalidSerCommitSeqNo;
+ sxact->finishedBefore = InvalidTransactionId;
+
+ sxact->SeqNo.lastCommitBeforeSnapshot = RecoverySerCommitSeqNo;
+
+
+ /*
+ * We don't need the details of a prepared transaction's conflicts,
+ * just whether it had conflicts in or out (which we get from the
+ * flags)
+ */
+ SHMQueueInit(&(sxact->outConflicts));
+ SHMQueueInit(&(sxact->inConflicts));
+
+ /*
+ * Don't need to track this; no transactions running at the time the
+ * recovered xact started are still active, except possibly other
+ * prepared xacts and we don't care whether those are RO_SAFE or not.
+ */
+ SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+
+ SHMQueueInit(&(sxact->predicateLocks));
+ SHMQueueElemInit(&(sxact->finishedLink));
+
+ sxact->topXid = xid;
+ sxact->xmin = xactRecord->xmin;
+ sxact->flags = xactRecord->flags;
+ Assert(SxactIsPrepared(sxact));
+ if (!SxactIsReadOnly(sxact))
+ {
+ ++(PredXact->WritableSxactCount);
+ Assert(PredXact->WritableSxactCount <=
+ (MaxBackends + max_prepared_xacts));
+ }
+
+ /* Register the transaction's xid */
+ sxidtag.xid = xid;
+ sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
+ &sxidtag,
+ HASH_ENTER, &found);
+ if (!sxid)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory")));
+ Assert(!found);
+ sxid->myXact = (SERIALIZABLEXACT *) sxact;
+
+ /*
+ * Update global xmin. Note that this is a special case compared to
+ * registering a normal transaction, because the global xmin might go
+ * backwards. That's OK, because until recovery is over we're not
+ * going to complete any transactions or create any non-prepared
+ * transactions, so there's no danger of throwing away.
+ */
+ if ((!TransactionIdIsValid(PredXact->SxactGlobalXmin)) ||
+ (TransactionIdFollows(PredXact->SxactGlobalXmin, sxact->xmin)))
+ {
+ PredXact->SxactGlobalXmin = sxact->xmin;
+ PredXact->SxactGlobalXminCount = 1;
+ OldSerXidSetActiveSerXmin(sxact->xmin);
+ }
+ else if (TransactionIdEquals(sxact->xmin, PredXact->SxactGlobalXmin))
+ {
+ Assert(PredXact->SxactGlobalXminCount > 0);
+ PredXact->SxactGlobalXminCount++;
+ }
+
+ LWLockRelease(SerializableXactHashLock);
+ }
+ else if (record->type == TWOPHASEPREDICATERECORD_LOCK)
+ {
+ /* Lock record. Recreate the PREDICATELOCK */
+ TwoPhasePredicateLockRecord *lockRecord;
+ SERIALIZABLEXID *sxid;
+ SERIALIZABLEXACT *sxact;
+ SERIALIZABLEXIDTAG sxidtag;
+ uint32 targettaghash;
+
+ lockRecord = (TwoPhasePredicateLockRecord *) &record->data.lockRecord;
+ targettaghash = PredicateLockTargetTagHashCode(&lockRecord->target);
+
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ sxidtag.xid = xid;
+ sxid = (SERIALIZABLEXID *)
+ hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
+ LWLockRelease(SerializableXactHashLock);
+
+ Assert(sxid != NULL);
+ sxact = sxid->myXact;
+ Assert(sxact != InvalidSerializableXact);
+
+ CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
+ }
+}
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 95000377704..af2eba01d61 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -374,6 +374,10 @@ standard_ProcessUtility(Node *parsetree,
SetPGVariable("transaction_read_only",
list_make1(item->arg),
true);
+ else if (strcmp(item->defname, "transaction_deferrable") == 0)
+ SetPGVariable("transaction_deferrable",
+ list_make1(item->arg),
+ true);
}
}
break;
diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c
index 78cafada2c8..8e369826cec 100644
--- a/src/backend/utils/adt/lockfuncs.c
+++ b/src/backend/utils/adt/lockfuncs.c
@@ -15,6 +15,7 @@
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "miscadmin.h"
+#include "storage/predicate_internals.h"
#include "storage/proc.h"
#include "utils/builtins.h"
@@ -32,11 +33,20 @@ static const char *const LockTagTypeNames[] = {
"advisory"
};
+/* This must match enum PredicateLockTargetType (predicate_internals.h) */
+static const char *const PredicateLockTagTypeNames[] = {
+ "relation",
+ "page",
+ "tuple"
+};
+
/* Working status for pg_lock_status */
typedef struct
{
LockData *lockData; /* state data from lmgr */
int currIdx; /* current PROCLOCK index */
+ PredicateLockData *predLockData; /* state data for pred locks */
+ int predLockIdx; /* current index for pred lock */
} PG_Lock_Status;
@@ -69,6 +79,7 @@ pg_lock_status(PG_FUNCTION_ARGS)
FuncCallContext *funcctx;
PG_Lock_Status *mystatus;
LockData *lockData;
+ PredicateLockData *predLockData;
if (SRF_IS_FIRSTCALL())
{
@@ -126,6 +137,8 @@ pg_lock_status(PG_FUNCTION_ARGS)
mystatus->lockData = GetLockStatusData();
mystatus->currIdx = 0;
+ mystatus->predLockData = GetPredicateLockStatusData();
+ mystatus->predLockIdx = 0;
MemoryContextSwitchTo(oldcontext);
}
@@ -303,6 +316,72 @@ pg_lock_status(PG_FUNCTION_ARGS)
SRF_RETURN_NEXT(funcctx, result);
}
+ /*
+ * Have returned all regular locks. Now start on the SIREAD predicate
+ * locks.
+ */
+ predLockData = mystatus->predLockData;
+ if (mystatus->predLockIdx < predLockData->nelements)
+ {
+ PredicateLockTargetType lockType;
+
+ PREDICATELOCKTARGETTAG *predTag = &(predLockData->locktags[mystatus->predLockIdx]);
+ SERIALIZABLEXACT *xact = &(predLockData->xacts[mystatus->predLockIdx]);
+ Datum values[14];
+ bool nulls[14];
+ HeapTuple tuple;
+ Datum result;
+
+ mystatus->predLockIdx++;
+
+ /*
+ * Form tuple with appropriate data.
+ */
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, false, sizeof(nulls));
+
+ /* lock type */
+ lockType = GET_PREDICATELOCKTARGETTAG_TYPE(*predTag);
+
+ values[0] = CStringGetTextDatum(PredicateLockTagTypeNames[lockType]);
+
+ /* lock target */
+ values[1] = GET_PREDICATELOCKTARGETTAG_DB(*predTag);
+ values[2] = GET_PREDICATELOCKTARGETTAG_RELATION(*predTag);
+ if (lockType == PREDLOCKTAG_TUPLE)
+ values[4] = GET_PREDICATELOCKTARGETTAG_OFFSET(*predTag);
+ else
+ nulls[4] = true;
+ if ((lockType == PREDLOCKTAG_TUPLE) ||
+ (lockType == PREDLOCKTAG_PAGE))
+ values[3] = GET_PREDICATELOCKTARGETTAG_PAGE(*predTag);
+ else
+ nulls[3] = true;
+
+ /* these fields are targets for other types of locks */
+ nulls[5] = true; /* virtualxid */
+ nulls[6] = true; /* transactionid */
+ nulls[7] = true; /* classid */
+ nulls[8] = true; /* objid */
+ nulls[9] = true; /* objsubid */
+
+ /* lock holder */
+ values[10] = VXIDGetDatum(xact->vxid.backendId,
+ xact->vxid.localTransactionId);
+ nulls[11] = true; /* pid */
+
+ /*
+ * Lock mode. Currently all predicate locks are SIReadLocks, which are
+ * always held (never waiting)
+ */
+ values[12] = CStringGetTextDatum("SIReadLock");
+ values[13] = BoolGetDatum(true);
+
+ tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+ SRF_RETURN_NEXT(funcctx, result);
+ }
+
SRF_RETURN_DONE(funcctx);
}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 2c95ef80c45..216236b5294 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -59,6 +59,7 @@
#include "storage/bufmgr.h"
#include "storage/standby.h"
#include "storage/fd.h"
+#include "storage/predicate.h"
#include "tcop/tcopprot.h"
#include "tsearch/ts_cache.h"
#include "utils/builtins.h"
@@ -1097,6 +1098,23 @@ static struct config_bool ConfigureNamesBool[] =
false, assign_transaction_read_only, NULL
},
{
+ {"default_transaction_deferrable", PGC_USERSET, CLIENT_CONN_STATEMENT,
+ gettext_noop("Sets the default deferrable status of new transactions."),
+ NULL
+ },
+ &DefaultXactDeferrable,
+ false, NULL, NULL
+ },
+ {
+ {"transaction_deferrable", PGC_USERSET, CLIENT_CONN_STATEMENT,
+ gettext_noop("Whether to defer a read-only serializable transaction until it can be executed with no possible serialization failures."),
+ NULL,
+ GUC_NO_RESET_ALL | GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE
+ },
+ &XactDeferrable,
+ false, assign_transaction_deferrable, NULL
+ },
+ {
{"check_function_bodies", PGC_USERSET, CLIENT_CONN_STATEMENT,
gettext_noop("Check function bodies during CREATE FUNCTION."),
NULL
@@ -1696,6 +1714,17 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"max_predicate_locks_per_transaction", PGC_POSTMASTER, LOCK_MANAGEMENT,
+ gettext_noop("Sets the maximum number of predicate locks per transaction."),
+ gettext_noop("The shared predicate lock table is sized on the assumption that "
+ "at most max_predicate_locks_per_transaction * max_connections distinct "
+ "objects will need to be locked at any one time.")
+ },
+ &max_predicate_locks_per_xact,
+ 64, 10, INT_MAX, NULL, NULL
+ },
+
+ {
{"authentication_timeout", PGC_SIGHUP, CONN_AUTH_SECURITY,
gettext_noop("Sets the maximum allowed time to complete client authentication."),
NULL,
@@ -3460,6 +3489,8 @@ InitializeGUCOptions(void)
PGC_POSTMASTER, PGC_S_OVERRIDE);
SetConfigOption("transaction_read_only", "no",
PGC_POSTMASTER, PGC_S_OVERRIDE);
+ SetConfigOption("transaction_deferrable", "no",
+ PGC_POSTMASTER, PGC_S_OVERRIDE);
/*
* For historical reasons, some GUC parameters can receive defaults from
@@ -5699,6 +5730,9 @@ ExecSetVariableStmt(VariableSetStmt *stmt)
else if (strcmp(item->defname, "transaction_read_only") == 0)
SetPGVariable("transaction_read_only",
list_make1(item->arg), stmt->is_local);
+ else if (strcmp(item->defname, "transaction_deferrable") == 0)
+ SetPGVariable("transaction_deferrable",
+ list_make1(item->arg), stmt->is_local);
else
elog(ERROR, "unexpected SET TRANSACTION element: %s",
item->defname);
@@ -5718,6 +5752,9 @@ ExecSetVariableStmt(VariableSetStmt *stmt)
else if (strcmp(item->defname, "transaction_read_only") == 0)
SetPGVariable("default_transaction_read_only",
list_make1(item->arg), stmt->is_local);
+ else if (strcmp(item->defname, "transaction_deferrable") == 0)
+ SetPGVariable("default_transaction_deferrable",
+ list_make1(item->arg), stmt->is_local);
else
elog(ERROR, "unexpected SET SESSION element: %s",
item->defname);
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 6c6f9a9a0d8..fe80c4dc239 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -450,6 +450,7 @@
#check_function_bodies = on
#default_transaction_isolation = 'read committed'
#default_transaction_read_only = off
+#default_transaction_deferrable = off
#session_replication_role = 'origin'
#statement_timeout = 0 # in milliseconds, 0 is disabled
#vacuum_freeze_min_age = 50000000
@@ -501,7 +502,8 @@
# Note: Each lock table slot uses ~270 bytes of shared memory, and there are
# max_locks_per_transaction * (max_connections + max_prepared_transactions)
# lock table slots.
-
+#max_predicate_locks_per_transaction = 64 # min 10
+ # (change requires restart)
#------------------------------------------------------------------------------
# VERSION/PLATFORM COMPATIBILITY
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index ef1cac01477..c1ba5ad8e64 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -22,6 +22,7 @@
#include "access/hash.h"
#include "storage/bufmgr.h"
+#include "storage/predicate.h"
#include "storage/proc.h"
#include "utils/memutils.h"
#include "utils/rel.h"
@@ -261,7 +262,10 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
* the top of the recursion.
*/
if (owner == TopTransactionResourceOwner)
+ {
ProcReleaseLocks(isCommit);
+ ReleasePredicateLocks(isCommit);
+ }
}
else
{
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 45b92a0ef88..c2ff5e542b0 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -27,6 +27,7 @@
#include "access/transam.h"
#include "access/xact.h"
+#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/memutils.h"
@@ -126,9 +127,6 @@ GetTransactionSnapshot(void)
{
Assert(RegisteredSnapshots == 0);
- CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
- FirstSnapshotSet = true;
-
/*
* In transaction-snapshot mode, the first snapshot must live until
* end of xact regardless of what the caller does with it, so we must
@@ -136,11 +134,20 @@ GetTransactionSnapshot(void)
*/
if (IsolationUsesXactSnapshot())
{
- CurrentSnapshot = RegisterSnapshotOnOwner(CurrentSnapshot,
+ if (IsolationIsSerializable())
+ CurrentSnapshot = RegisterSerializableTransaction(&CurrentSnapshotData);
+ else
+ {
+ CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
+ CurrentSnapshot = RegisterSnapshotOnOwner(CurrentSnapshot,
TopTransactionResourceOwner);
+ }
registered_xact_snapshot = true;
}
+ else
+ CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
+ FirstSnapshotSet = true;
return CurrentSnapshot;
}