aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2002-05-22 21:40:55 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2002-05-22 21:40:55 +0000
commita2597ef17958e75e7ba26507dc407249cc9e7134 (patch)
treeb183395473fde3961ed029a4c82f384e2a392d4d
parentb8ffc9960235afec9f0a81fc15665514241c2a75 (diff)
downloadpostgresql-a2597ef17958e75e7ba26507dc407249cc9e7134.tar.gz
postgresql-a2597ef17958e75e7ba26507dc407249cc9e7134.zip
Modify sequence state storage to eliminate dangling-pointer problem
exemplified by bug #671. Moving the storage to relcache turned out to be a bad idea because relcache might decide to discard the info. Instead, open and close the relcache entry on each sequence operation, and use a record of the current XID to discover whether we already hold AccessShareLock on the sequence.
-rw-r--r--src/backend/access/transam/xact.c5
-rw-r--r--src/backend/commands/define.c31
-rw-r--r--src/backend/commands/sequence.c248
-rw-r--r--src/include/commands/defrem.h5
-rw-r--r--src/include/commands/sequence.h3
5 files changed, 153 insertions, 139 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 10de2f8a6d5..b874b4790ad 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.123 2002/05/21 22:05:53 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.124 2002/05/22 21:40:55 tgl Exp $
*
* NOTES
* Transaction aborts can now occur two ways:
@@ -166,7 +166,6 @@
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "commands/async.h"
-#include "commands/sequence.h"
#include "commands/trigger.h"
#include "executor/spi.h"
#include "libpq/be-fsstubs.h"
@@ -947,7 +946,6 @@ CommitTransaction(void)
/* NOTIFY commit must also come before lower-level cleanup */
AtCommit_Notify();
- CloseSequences();
AtEOXact_portals();
/* Here is where we really truly commit. */
@@ -1063,7 +1061,6 @@ AbortTransaction(void)
DeferredTriggerAbortXact();
lo_commit(false); /* 'false' means it's abort */
AtAbort_Notify();
- CloseSequences();
AtEOXact_portals();
/* Advertise the fact that we aborted in pg_clog. */
diff --git a/src/backend/commands/define.c b/src/backend/commands/define.c
index 889ddd0f440..20d2a61de47 100644
--- a/src/backend/commands/define.c
+++ b/src/backend/commands/define.c
@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/define.c,v 1.76 2002/04/15 05:22:03 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/define.c,v 1.77 2002/05/22 21:40:55 tgl Exp $
*
* DESCRIPTION
* The "DefineFoo" routines take the parse tree and pick out the
@@ -37,6 +37,7 @@
#include "commands/defrem.h"
#include "parser/parse_type.h"
+#include "utils/int8.h"
/*
@@ -115,6 +116,34 @@ defGetNumeric(DefElem *def)
}
/*
+ * Extract an int64 value from a DefElem.
+ */
+int64
+defGetInt64(DefElem *def)
+{
+ if (def->arg == NULL)
+ elog(ERROR, "Define: \"%s\" requires a numeric value",
+ def->defname);
+ switch (nodeTag(def->arg))
+ {
+ case T_Integer:
+ return (int64) intVal(def->arg);
+ case T_Float:
+ /*
+ * Values too large for int4 will be represented as Float
+ * constants by the lexer. Accept these if they are valid int8
+ * strings.
+ */
+ return DatumGetInt64(DirectFunctionCall1(int8in,
+ CStringGetDatum(strVal(def->arg))));
+ default:
+ elog(ERROR, "Define: \"%s\" requires a numeric value",
+ def->defname);
+ }
+ return 0; /* keep compiler quiet */
+}
+
+/*
* Extract a possibly-qualified name (as a List of Strings) from a DefElem.
*/
List *
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index c79e6f97e1b..8d90c81d3cb 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.78 2002/05/21 22:05:54 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.79 2002/05/22 21:40:55 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -17,16 +17,14 @@
#include "access/heapam.h"
#include "catalog/namespace.h"
#include "catalog/pg_type.h"
+#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "commands/sequence.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/builtins.h"
-#include "utils/int8.h"
-#define SEQ_MAGIC 0x1717
-
#ifndef INT64_IS_BUSTED
#ifdef HAVE_LL_CONSTANTS
#define SEQ_MAXVALUE ((int64) 0x7FFFFFFFFFFFFFFFLL)
@@ -46,29 +44,47 @@
*/
#define SEQ_LOG_VALS 32
+/*
+ * The "special area" of a sequence's buffer page looks like this.
+ */
+#define SEQ_MAGIC 0x1717
+
typedef struct sequence_magic
{
uint32 magic;
} sequence_magic;
+/*
+ * We store a SeqTable item for every sequence we have touched in the current
+ * session. This is needed to hold onto nextval/currval state. (We can't
+ * rely on the relcache, since it's only, well, a cache, and may decide to
+ * discard entries.)
+ *
+ * XXX We use linear search to find pre-existing SeqTable entries. This is
+ * good when only a small number of sequences are touched in a session, but
+ * would suck with many different sequences. Perhaps use a hashtable someday.
+ */
typedef struct SeqTableData
{
- Oid relid;
- Relation rel; /* NULL if rel is not open in cur xact */
- int64 cached;
- int64 last;
- int64 increment;
- struct SeqTableData *next;
+ struct SeqTableData *next; /* link to next SeqTable object */
+ Oid relid; /* pg_class OID of this sequence */
+ TransactionId xid; /* xact in which we last did a seq op */
+ int64 last; /* value last returned by nextval */
+ int64 cached; /* last value already cached for nextval */
+ /* if last != cached, we have not used up all the cached values */
+ int64 increment; /* copy of sequence's increment field */
} SeqTableData;
typedef SeqTableData *SeqTable;
-static SeqTable seqtab = NULL;
+static SeqTable seqtab = NULL; /* Head of list of SeqTable items */
-static SeqTable init_sequence(char *caller, RangeVar *relation);
-static Form_pg_sequence read_info(char *caller, SeqTable elm, Buffer *buf);
+
+static void init_sequence(const char *caller, RangeVar *relation,
+ SeqTable *p_elm, Relation *p_rel);
+static Form_pg_sequence read_info(const char *caller, SeqTable elm,
+ Relation rel, Buffer *buf);
static void init_params(CreateSeqStmt *seq, Form_pg_sequence new);
-static int64 get_param(DefElem *def);
static void do_setval(RangeVar *sequence, int64 next, bool iscalled);
/*
@@ -281,6 +297,7 @@ nextval(PG_FUNCTION_ARGS)
text *seqin = PG_GETARG_TEXT_P(0);
RangeVar *sequence;
SeqTable elm;
+ Relation seqrel;
Buffer buf;
Page page;
Form_pg_sequence seq;
@@ -300,7 +317,7 @@ nextval(PG_FUNCTION_ARGS)
"nextval"));
/* open and AccessShareLock sequence */
- elm = init_sequence("nextval", sequence);
+ init_sequence("nextval", sequence, &elm, &seqrel);
if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s",
@@ -309,11 +326,12 @@ nextval(PG_FUNCTION_ARGS)
if (elm->last != elm->cached) /* some numbers were cached */
{
elm->last += elm->increment;
+ relation_close(seqrel, NoLock);
PG_RETURN_INT64(elm->last);
}
- seq = read_info("nextval", elm, &buf); /* lock page' buffer and
- * read tuple */
+ /* lock page' buffer and read tuple */
+ seq = read_info("nextval", elm, seqrel, &buf);
page = BufferGetPage(buf);
last = next = result = seq->last_value;
@@ -421,7 +439,7 @@ nextval(PG_FUNCTION_ARGS)
XLogRecPtr recptr;
XLogRecData rdata[2];
- xlrec.node = elm->rel->rd_node;
+ xlrec.node = seqrel->rd_node;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_seq_rec);
@@ -453,6 +471,8 @@ nextval(PG_FUNCTION_ARGS)
if (WriteBuffer(buf) == STATUS_ERROR)
elog(ERROR, "%s.nextval: WriteBuffer failed", sequence->relname);
+ relation_close(seqrel, NoLock);
+
PG_RETURN_INT64(result);
}
@@ -462,13 +482,14 @@ currval(PG_FUNCTION_ARGS)
text *seqin = PG_GETARG_TEXT_P(0);
RangeVar *sequence;
SeqTable elm;
+ Relation seqrel;
int64 result;
sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin,
"currval"));
/* open and AccessShareLock sequence */
- elm = init_sequence("currval", sequence);
+ init_sequence("currval", sequence, &elm, &seqrel);
if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK)
elog(ERROR, "%s.currval: you don't have permissions to read sequence %s",
@@ -480,6 +501,8 @@ currval(PG_FUNCTION_ARGS)
result = elm->last;
+ relation_close(seqrel, NoLock);
+
PG_RETURN_INT64(result);
}
@@ -500,18 +523,19 @@ static void
do_setval(RangeVar *sequence, int64 next, bool iscalled)
{
SeqTable elm;
+ Relation seqrel;
Buffer buf;
Form_pg_sequence seq;
/* open and AccessShareLock sequence */
- elm = init_sequence("setval", sequence);
+ init_sequence("setval", sequence, &elm, &seqrel);
if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
elog(ERROR, "%s.setval: you don't have permissions to set sequence %s",
sequence->relname, sequence->relname);
/* lock page' buffer and read tuple */
- seq = read_info("setval", elm, &buf);
+ seq = read_info("setval", elm, seqrel, &buf);
if ((next < seq->min_value) || (next > seq->max_value))
elog(ERROR, "%s.setval: value " INT64_FORMAT " is out of bounds (" INT64_FORMAT "," INT64_FORMAT ")",
@@ -529,7 +553,7 @@ do_setval(RangeVar *sequence, int64 next, bool iscalled)
XLogRecData rdata[2];
Page page = BufferGetPage(buf);
- xlrec.node = elm->rel->rd_node;
+ xlrec.node = seqrel->rd_node;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_seq_rec);
@@ -559,6 +583,8 @@ do_setval(RangeVar *sequence, int64 next, bool iscalled)
if (WriteBuffer(buf) == STATUS_ERROR)
elog(ERROR, "%s.setval: WriteBuffer failed", sequence->relname);
+
+ relation_close(seqrel, NoLock);
}
/*
@@ -600,84 +626,48 @@ setval_and_iscalled(PG_FUNCTION_ARGS)
PG_RETURN_INT64(next);
}
-static Form_pg_sequence
-read_info(char *caller, SeqTable elm, Buffer *buf)
-{
- PageHeader page;
- ItemId lp;
- HeapTupleData tuple;
- sequence_magic *sm;
- Form_pg_sequence seq;
- if (elm->rel->rd_nblocks > 1)
- elog(ERROR, "%s.%s: invalid number of blocks in sequence",
- RelationGetRelationName(elm->rel), caller);
-
- *buf = ReadBuffer(elm->rel, 0);
- if (!BufferIsValid(*buf))
- elog(ERROR, "%s.%s: ReadBuffer failed",
- RelationGetRelationName(elm->rel), caller);
-
- LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
-
- page = (PageHeader) BufferGetPage(*buf);
- sm = (sequence_magic *) PageGetSpecialPointer(page);
-
- if (sm->magic != SEQ_MAGIC)
- elog(ERROR, "%s.%s: bad magic (%08X)",
- RelationGetRelationName(elm->rel), caller, sm->magic);
-
- lp = PageGetItemId(page, FirstOffsetNumber);
- Assert(ItemIdIsUsed(lp));
- tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
-
- seq = (Form_pg_sequence) GETSTRUCT(&tuple);
-
- elm->increment = seq->increment_by;
-
- return seq;
-}
-
-
-static SeqTable
-init_sequence(char *caller, RangeVar *relation)
+/*
+ * Given a relation name, open and lock the sequence. p_elm and p_rel are
+ * output parameters.
+ */
+static void
+init_sequence(const char *caller, RangeVar *relation,
+ SeqTable *p_elm, Relation *p_rel)
{
Oid relid = RangeVarGetRelid(relation, false);
- SeqTable elm,
- prev = (SeqTable) NULL;
+ TransactionId thisxid = GetCurrentTransactionId();
+ SeqTable elm;
Relation seqrel;
/* Look to see if we already have a seqtable entry for relation */
- for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
+ for (elm = seqtab; elm != NULL; elm = elm->next)
{
if (elm->relid == relid)
break;
- prev = elm;
}
- /* If so, and if it's already been opened in this xact, just return it */
- if (elm != (SeqTable) NULL && elm->rel != (Relation) NULL)
- return elm;
+ /*
+ * Open the sequence relation, acquiring AccessShareLock if we don't
+ * already have a lock in the current xact.
+ */
+ if (elm == NULL || elm->xid != thisxid)
+ seqrel = relation_open(relid, AccessShareLock);
+ else
+ seqrel = relation_open(relid, NoLock);
- /* Else open and check it */
- seqrel = heap_open(relid, AccessShareLock);
if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
elog(ERROR, "%s.%s: %s is not a sequence",
relation->relname, caller, relation->relname);
/*
- * If elm exists but elm->rel is NULL, the seqtable entry is left over
- * from a previous xact -- update the entry and reuse it.
+ * Allocate new seqtable entry if we didn't find one.
*
* NOTE: seqtable entries remain in the list for the life of a backend.
* If the sequence itself is deleted then the entry becomes wasted memory,
* but it's small enough that this should not matter.
*/
- if (elm != (SeqTable) NULL)
- {
- elm->rel = seqrel;
- }
- else
+ if (elm == NULL)
{
/*
* Time to make a new seqtable entry. These entries live as long
@@ -686,40 +676,59 @@ init_sequence(char *caller, RangeVar *relation)
elm = (SeqTable) malloc(sizeof(SeqTableData));
if (elm == NULL)
elog(ERROR, "Memory exhausted in init_sequence");
- elm->rel = seqrel;
elm->relid = relid;
- elm->cached = elm->last = elm->increment = 0;
- elm->next = (SeqTable) NULL;
-
- if (seqtab == (SeqTable) NULL)
- seqtab = elm;
- else
- prev->next = elm;
+ /* increment is set to 0 until we do read_info (see currval) */
+ elm->last = elm->cached = elm->increment = 0;
+ elm->next = seqtab;
+ seqtab = elm;
}
- return elm;
+ /* Flag that we have a lock in the current xact. */
+ elm->xid = thisxid;
+
+ *p_elm = elm;
+ *p_rel = seqrel;
}
-/*
- * CloseSequences
- * is called by xact mgr at commit/abort.
- */
-void
-CloseSequences(void)
+/* Given an opened relation, lock the page buffer and find the tuple */
+static Form_pg_sequence
+read_info(const char *caller, SeqTable elm,
+ Relation rel, Buffer *buf)
{
- SeqTable elm;
- Relation rel;
+ PageHeader page;
+ ItemId lp;
+ HeapTupleData tuple;
+ sequence_magic *sm;
+ Form_pg_sequence seq;
- for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
- {
- rel = elm->rel;
- if (rel != (Relation) NULL) /* opened in current xact */
- {
- elm->rel = (Relation) NULL;
- heap_close(rel, AccessShareLock);
- }
- }
+ if (rel->rd_nblocks > 1)
+ elog(ERROR, "%s.%s: invalid number of blocks in sequence",
+ RelationGetRelationName(rel), caller);
+
+ *buf = ReadBuffer(rel, 0);
+ if (!BufferIsValid(*buf))
+ elog(ERROR, "%s.%s: ReadBuffer failed",
+ RelationGetRelationName(rel), caller);
+
+ LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
+
+ page = (PageHeader) BufferGetPage(*buf);
+ sm = (sequence_magic *) PageGetSpecialPointer(page);
+
+ if (sm->magic != SEQ_MAGIC)
+ elog(ERROR, "%s.%s: bad magic (%08X)",
+ RelationGetRelationName(rel), caller, sm->magic);
+
+ lp = PageGetItemId(page, FirstOffsetNumber);
+ Assert(ItemIdIsUsed(lp));
+ tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
+
+ seq = (Form_pg_sequence) GETSTRUCT(&tuple);
+
+ elm->increment = seq->increment_by;
+
+ return seq;
}
@@ -761,7 +770,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
if (increment_by == (DefElem *) NULL) /* INCREMENT BY */
new->increment_by = 1;
- else if ((new->increment_by = get_param(increment_by)) == 0)
+ else if ((new->increment_by = defGetInt64(increment_by)) == 0)
elog(ERROR, "DefineSequence: can't INCREMENT by 0");
if (max_value == (DefElem *) NULL) /* MAXVALUE */
@@ -772,7 +781,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
new->max_value = -1; /* descending seq */
}
else
- new->max_value = get_param(max_value);
+ new->max_value = defGetInt64(max_value);
if (min_value == (DefElem *) NULL) /* MINVALUE */
{
@@ -782,7 +791,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
new->min_value = SEQ_MINVALUE; /* descending seq */
}
else
- new->min_value = get_param(min_value);
+ new->min_value = defGetInt64(min_value);
if (new->min_value >= new->max_value)
elog(ERROR, "DefineSequence: MINVALUE (" INT64_FORMAT ") can't be >= MAXVALUE (" INT64_FORMAT ")",
@@ -796,7 +805,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
new->last_value = new->max_value; /* descending seq */
}
else
- new->last_value = get_param(last_value);
+ new->last_value = defGetInt64(last_value);
if (new->last_value < new->min_value)
elog(ERROR, "DefineSequence: START value (" INT64_FORMAT ") can't be < MINVALUE (" INT64_FORMAT ")",
@@ -807,33 +816,12 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
if (cache_value == (DefElem *) NULL) /* CACHE */
new->cache_value = 1;
- else if ((new->cache_value = get_param(cache_value)) <= 0)
+ else if ((new->cache_value = defGetInt64(cache_value)) <= 0)
elog(ERROR, "DefineSequence: CACHE (" INT64_FORMAT ") can't be <= 0",
new->cache_value);
}
-static int64
-get_param(DefElem *def)
-{
- if (def->arg == (Node *) NULL)
- elog(ERROR, "DefineSequence: \"%s\" value unspecified", def->defname);
-
- if (IsA(def->arg, Integer))
- return (int64) intVal(def->arg);
-
- /*
- * Values too large for int4 will be represented as Float constants by
- * the lexer. Accept these if they are valid int8 strings.
- */
- if (IsA(def->arg, Float))
- return DatumGetInt64(DirectFunctionCall1(int8in,
- CStringGetDatum(strVal(def->arg))));
-
- /* Shouldn't get here unless parser messed up */
- elog(ERROR, "DefineSequence: \"%s\" value must be integer", def->defname);
- return 0; /* not reached; keep compiler quiet */
-}
void
seq_redo(XLogRecPtr lsn, XLogRecord *record)
diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h
index cf9048c714b..5ebead414d1 100644
--- a/src/include/commands/defrem.h
+++ b/src/include/commands/defrem.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: defrem.h,v 1.37 2002/05/17 18:32:52 petere Exp $
+ * $Id: defrem.h,v 1.38 2002/05/22 21:40:55 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -54,12 +54,13 @@ extern void DefineDomain(CreateDomainStmt *stmt);
extern void RemoveDomain(List *names, int behavior);
-/* support routines in define.c */
+/* support routines in commands/define.c */
extern void case_translate_language_name(const char *input, char *output);
extern char *defGetString(DefElem *def);
extern double defGetNumeric(DefElem *def);
+extern int64 defGetInt64(DefElem *def);
extern List *defGetQualifiedName(DefElem *def);
extern TypeName *defGetTypeName(DefElem *def);
extern int defGetTypeLength(DefElem *def);
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 48c0673cdf9..e91523ac47d 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: sequence.h,v 1.19 2001/11/05 17:46:33 momjian Exp $
+ * $Id: sequence.h,v 1.20 2002/05/22 21:40:55 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -84,7 +84,6 @@ extern Datum setval(PG_FUNCTION_ARGS);
extern Datum setval_and_iscalled(PG_FUNCTION_ARGS);
extern void DefineSequence(CreateSeqStmt *stmt);
-extern void CloseSequences(void);
extern void seq_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void seq_undo(XLogRecPtr lsn, XLogRecord *rptr);