aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/sequence.c
diff options
context:
space:
mode:
authorVadim B. Mikheev <vadim4o@yahoo.com>2000-11-30 01:47:33 +0000
committerVadim B. Mikheev <vadim4o@yahoo.com>2000-11-30 01:47:33 +0000
commit741510521caea7e1ca12b4db0701bbc2db346a5f (patch)
treed26b28fc9215dd82b038f9c3d51925a6e7e65e1f /src/backend/commands/sequence.c
parent680b7357ce850c28d06997be793aee18f72434ba (diff)
downloadpostgresql-741510521caea7e1ca12b4db0701bbc2db346a5f.tar.gz
postgresql-741510521caea7e1ca12b4db0701bbc2db346a5f.zip
XLOG stuff for sequences.
CommitDelay in guc.c
Diffstat (limited to 'src/backend/commands/sequence.c')
-rw-r--r--src/backend/commands/sequence.c180
1 files changed, 155 insertions, 25 deletions
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index d6a6b1b4300..210657cdf06 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -22,19 +22,12 @@
#define SEQ_MAXVALUE ((int4)0x7FFFFFFF)
#define SEQ_MINVALUE -(SEQ_MAXVALUE)
-typedef struct FormData_pg_sequence
-{
- NameData sequence_name;
- int4 last_value;
- int4 increment_by;
- int4 max_value;
- int4 min_value;
- int4 cache_value;
- char is_cycled;
- char is_called;
-} FormData_pg_sequence;
-
-typedef FormData_pg_sequence *Form_pg_sequence;
+/*
+ * We don't want to log each fetching values from sequences,
+ * so we pre-log a few fetches in advance. In the event of
+ * crash we can lose as much as we pre-logged.
+ */
+#define SEQ_LOG_VALS 32
typedef struct sequence_magic
{
@@ -138,6 +131,11 @@ DefineSequence(CreateSeqStmt *seq)
coldef->colname = "cache_value";
value[i - 1] = Int32GetDatum(new.cache_value);
break;
+ case SEQ_COL_LOG:
+ typnam->name = "int4";
+ coldef->colname = "log_cnt";
+ value[i - 1] = Int32GetDatum((int32)1);
+ break;
case SEQ_COL_CYCLE:
typnam->name = "char";
coldef->colname = "is_cycled";
@@ -196,10 +194,14 @@ nextval(PG_FUNCTION_ARGS)
int32 incby,
maxv,
minv,
- cache;
+ cache,
+ log,
+ fetch,
+ last;
int32 result,
next,
rescnt = 0;
+ bool logit = false;
if (pg_aclcheck(seqname, GetUserId(), ACL_WR) != ACLCHECK_OK)
elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s",
@@ -219,16 +221,27 @@ nextval(PG_FUNCTION_ARGS)
seq = read_info("nextval", elm, &buf); /* lock page' buffer and
* read tuple */
- next = result = seq->last_value;
+ last = next = result = seq->last_value;
incby = seq->increment_by;
maxv = seq->max_value;
minv = seq->min_value;
- cache = seq->cache_value;
+ fetch = cache = seq->cache_value;
+ log = seq->log_cnt;
if (seq->is_called != 't')
+ {
rescnt++; /* last_value if not called */
+ fetch--;
+ log--;
+ }
- while (rescnt < cache) /* try to fetch cache numbers */
+ if (log < fetch)
+ {
+ fetch = log = fetch - log + SEQ_LOG_VALS;
+ logit = true;
+ }
+
+ while (fetch) /* try to fetch cache [+ log ] numbers */
{
/*
@@ -242,7 +255,7 @@ nextval(PG_FUNCTION_ARGS)
(maxv < 0 && next + incby > maxv))
{
if (rescnt > 0)
- break; /* stop caching */
+ break; /* stop fetching */
if (seq->is_cycled != 't')
elog(ERROR, "%s.nextval: got MAXVALUE (%d)",
elm->name, maxv);
@@ -258,7 +271,7 @@ nextval(PG_FUNCTION_ARGS)
(minv >= 0 && next + incby < minv))
{
if (rescnt > 0)
- break; /* stop caching */
+ break; /* stop fetching */
if (seq->is_cycled != 't')
elog(ERROR, "%s.nextval: got MINVALUE (%d)",
elm->name, minv);
@@ -267,17 +280,43 @@ nextval(PG_FUNCTION_ARGS)
else
next += incby;
}
- rescnt++; /* got result */
- if (rescnt == 1) /* if it's first one - */
- result = next; /* it's what to return */
+ fetch--;
+ if (rescnt < cache)
+ {
+ log--;
+ rescnt++;
+ last = next;
+ if (rescnt == 1) /* if it's first result - */
+ result = next; /* it's what to return */
+ }
}
/* save info in local cache */
elm->last = result; /* last returned number */
- elm->cached = next; /* last cached number */
+ elm->cached = last; /* last fetched number */
+
+ if (logit)
+ {
+ xl_seq_rec xlrec;
+ XLogRecPtr recptr;
+
+ if (fetch) /* not all numbers were fetched */
+ log -= fetch;
+
+ xlrec.node = elm->rel->rd_node;
+ xlrec.value = next;
+
+ recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG|XLOG_NO_TRAN,
+ (char*) &xlrec, sizeof(xlrec), NULL, 0);
+
+ PageSetLSN(BufferGetPage(buf), recptr);
+ PageSetSUI(BufferGetPage(buf), ThisStartUpID);
+ }
/* save info in sequence relation */
- seq->last_value = next; /* last fetched number */
+ seq->last_value = last; /* last fetched number */
+ Assert(log >= 0);
+ seq->log_cnt = log; /* how much is logged */
seq->is_called = 't';
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
@@ -349,6 +388,21 @@ do_setval(char *seqname, int32 next, bool iscalled)
/* save info in sequence relation */
seq->last_value = next; /* last fetched number */
seq->is_called = iscalled ? 't' : 'f';
+ seq->log_cnt = (iscalled) ? 0 : 1;
+
+ {
+ xl_seq_rec xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.node = elm->rel->rd_node;
+ xlrec.value = next;
+
+ recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_SET|XLOG_NO_TRAN,
+ (char*) &xlrec, sizeof(xlrec), NULL, 0);
+
+ PageSetLSN(BufferGetPage(buf), recptr);
+ PageSetSUI(BufferGetPage(buf), ThisStartUpID);
+ }
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
@@ -638,7 +692,6 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
}
-
static int
get_param(DefElem *def)
{
@@ -651,3 +704,80 @@ get_param(DefElem *def)
elog(ERROR, "DefineSequence: \"%s\" is to be integer", def->defname);
return -1;
}
+
+void seq_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ Relation reln;
+ Buffer buffer;
+ Page page;
+ ItemId lp;
+ HeapTupleData tuple;
+ Form_pg_sequence seq;
+ xl_seq_rec *xlrec;
+
+ if (info != XLOG_SEQ_LOG && info != XLOG_SEQ_SET)
+ elog(STOP, "seq_redo: unknown op code %u", info);
+
+ xlrec = (xl_seq_rec*) XLogRecGetData(record);
+
+ reln = XLogOpenRelation(true, RM_SEQ_ID, xlrec->node);
+ if (!RelationIsValid(reln))
+ return;
+
+ buffer = XLogReadBuffer(false, reln, 0);
+ if (!BufferIsValid(buffer))
+ elog(STOP, "seq_redo: can't read block of %u/%u",
+ xlrec->node.tblNode, xlrec->node.relNode);
+
+ page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page) ||
+ ((sequence_magic *) PageGetSpecialPointer(page))->magic != SEQ_MAGIC)
+ elog(STOP, "seq_redo: uninitialized page of %u/%u",
+ xlrec->node.tblNode, xlrec->node.relNode);
+
+ if (XLByteLE(lsn, PageGetLSN(page)))
+ {
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+
+ lp = PageGetItemId(page, FirstOffsetNumber);
+ Assert(ItemIdIsUsed(lp));
+ tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
+
+ seq = (Form_pg_sequence) GETSTRUCT(&tuple);
+
+ seq->last_value = xlrec->value; /* last logged value */
+ seq->is_called = 't';
+ seq->log_cnt = 0;
+
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID);
+ UnlockAndWriteBuffer(buffer);
+
+ return;
+}
+
+void seq_undo(XLogRecPtr lsn, XLogRecord *record)
+{
+}
+
+void seq_desc(char *buf, uint8 xl_info, char* rec)
+{
+ uint8 info = xl_info & ~XLR_INFO_MASK;
+ xl_seq_rec *xlrec = (xl_seq_rec*) rec;
+
+ if (info == XLOG_SEQ_LOG)
+ strcat(buf, "log: ");
+ else if (info == XLOG_SEQ_SET)
+ strcat(buf, "set: ");
+ else
+ {
+ strcat(buf, "UNKNOWN");
+ return;
+ }
+
+ sprintf(buf + strlen(buf), "node %u/%u; value %d",
+ xlrec->node.tblNode, xlrec->node.relNode, xlrec->value);
+}