aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execMain.c
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2014-10-07 17:23:34 -0300
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2014-10-07 17:23:34 -0300
commitdf630b0dd5ea2de52972d456f5978a012436115e (patch)
treebe0007351a856caa48230155f5c5bcfe5a31f86d /src/backend/executor/execMain.c
parentc421efd21330f2e5bed253b4a53d7ea5e084edf6 (diff)
downloadpostgresql-df630b0dd5ea2de52972d456f5978a012436115e.tar.gz
postgresql-df630b0dd5ea2de52972d456f5978a012436115e.zip
Implement SKIP LOCKED for row-level locks
This clause changes the behavior of SELECT locking clauses in the presence of locked rows: instead of causing a process to block waiting for the locks held by other processes (or raise an error, with NOWAIT), SKIP LOCKED makes the new reader skip over such rows. While this is not appropriate behavior for general purposes, there are some cases in which it is useful, such as queue-like tables. Catalog version bumped because this patch changes the representation of stored rules. Reviewed by Craig Ringer (based on a previous attempt at an implementation by Simon Riggs, who also provided input on the syntax used in the current patch), David Rowley, and Álvaro Herrera. Author: Thomas Munro
Diffstat (limited to 'src/backend/executor/execMain.c')
-rw-r--r--src/backend/executor/execMain.c45
1 files changed, 30 insertions, 15 deletions
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index a546292da6e..a753b207008 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -836,7 +836,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
erm->prti = rc->prti;
erm->rowmarkId = rc->rowmarkId;
erm->markType = rc->markType;
- erm->noWait = rc->noWait;
+ erm->waitPolicy = rc->waitPolicy;
ItemPointerSetInvalid(&(erm->curCtid));
estate->es_rowMarks = lappend(estate->es_rowMarks, erm);
}
@@ -1871,7 +1871,7 @@ EvalPlanQual(EState *estate, EPQState *epqstate,
/*
* Get and lock the updated version of the row; if fail, return NULL.
*/
- copyTuple = EvalPlanQualFetch(estate, relation, lockmode, false /* wait */,
+ copyTuple = EvalPlanQualFetch(estate, relation, lockmode, LockWaitBlock,
tid, priorXmax);
if (copyTuple == NULL)
@@ -1930,12 +1930,15 @@ EvalPlanQual(EState *estate, EPQState *epqstate,
* estate - executor state data
* relation - table containing tuple
* lockmode - requested tuple lock mode
- * noWait - wait mode to pass to heap_lock_tuple
+ * wait_policy - requested lock wait policy
* *tid - t_ctid from the outdated tuple (ie, next updated version)
* priorXmax - t_xmax from the outdated tuple
*
* Returns a palloc'd copy of the newest tuple version, or NULL if we find
* that there is no newest version (ie, the row was deleted not updated).
+ * We also return NULL if the tuple is locked and the wait policy is to skip
+ * such tuples.
+ *
* If successful, we have locked the newest tuple version, so caller does not
* need to worry about it changing anymore.
*
@@ -1943,7 +1946,8 @@ EvalPlanQual(EState *estate, EPQState *epqstate,
* but we use "int" to avoid having to include heapam.h in executor.h.
*/
HeapTuple
-EvalPlanQualFetch(EState *estate, Relation relation, int lockmode, bool noWait,
+EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
+ LockWaitPolicy wait_policy,
ItemPointer tid, TransactionId priorXmax)
{
HeapTuple copyTuple = NULL;
@@ -1992,18 +1996,25 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode, bool noWait,
if (TransactionIdIsValid(SnapshotDirty.xmax))
{
ReleaseBuffer(buffer);
- if (noWait)
+ switch (wait_policy)
{
- if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
- ereport(ERROR,
- (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
- errmsg("could not obtain lock on row in relation \"%s\"",
- RelationGetRelationName(relation))));
+ case LockWaitBlock:
+ XactLockTableWait(SnapshotDirty.xmax,
+ relation, &tuple.t_data->t_ctid,
+ XLTW_FetchUpdated);
+ break;
+ case LockWaitSkip:
+ if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
+ return NULL; /* skip instead of waiting */
+ break;
+ case LockWaitError:
+ if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
+ ereport(ERROR,
+ (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ errmsg("could not obtain lock on row in relation \"%s\"",
+ RelationGetRelationName(relation))));
+ break;
}
- else
- XactLockTableWait(SnapshotDirty.xmax,
- relation, &tuple.t_data->t_ctid,
- XLTW_FetchUpdated);
continue; /* loop back to repeat heap_fetch */
}
@@ -2030,7 +2041,7 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode, bool noWait,
*/
test = heap_lock_tuple(relation, &tuple,
estate->es_output_cid,
- lockmode, noWait,
+ lockmode, wait_policy,
false, &buffer, &hufd);
/* We now have two pins on the buffer, get rid of one */
ReleaseBuffer(buffer);
@@ -2076,6 +2087,10 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode, bool noWait,
/* tuple was deleted, so give up */
return NULL;
+ case HeapTupleWouldBlock:
+ ReleaseBuffer(buffer);
+ return NULL;
+
default:
ReleaseBuffer(buffer);
elog(ERROR, "unrecognized heap_lock_tuple status: %u",