diff options
Diffstat (limited to 'src/backend/commands/sequence.c')
-rw-r--r-- | src/backend/commands/sequence.c | 180 |
1 files changed, 155 insertions, 25 deletions
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index d6a6b1b4300..210657cdf06 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -22,19 +22,12 @@ #define SEQ_MAXVALUE ((int4)0x7FFFFFFF) #define SEQ_MINVALUE -(SEQ_MAXVALUE) -typedef struct FormData_pg_sequence -{ - NameData sequence_name; - int4 last_value; - int4 increment_by; - int4 max_value; - int4 min_value; - int4 cache_value; - char is_cycled; - char is_called; -} FormData_pg_sequence; - -typedef FormData_pg_sequence *Form_pg_sequence; +/* + * We don't want to log each fetching values from sequences, + * so we pre-log a few fetches in advance. In the event of + * crash we can lose as much as we pre-logged. + */ +#define SEQ_LOG_VALS 32 typedef struct sequence_magic { @@ -138,6 +131,11 @@ DefineSequence(CreateSeqStmt *seq) coldef->colname = "cache_value"; value[i - 1] = Int32GetDatum(new.cache_value); break; + case SEQ_COL_LOG: + typnam->name = "int4"; + coldef->colname = "log_cnt"; + value[i - 1] = Int32GetDatum((int32)1); + break; case SEQ_COL_CYCLE: typnam->name = "char"; coldef->colname = "is_cycled"; @@ -196,10 +194,14 @@ nextval(PG_FUNCTION_ARGS) int32 incby, maxv, minv, - cache; + cache, + log, + fetch, + last; int32 result, next, rescnt = 0; + bool logit = false; if (pg_aclcheck(seqname, GetUserId(), ACL_WR) != ACLCHECK_OK) elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s", @@ -219,16 +221,27 @@ nextval(PG_FUNCTION_ARGS) seq = read_info("nextval", elm, &buf); /* lock page' buffer and * read tuple */ - next = result = seq->last_value; + last = next = result = seq->last_value; incby = seq->increment_by; maxv = seq->max_value; minv = seq->min_value; - cache = seq->cache_value; + fetch = cache = seq->cache_value; + log = seq->log_cnt; if (seq->is_called != 't') + { rescnt++; /* last_value if not called */ + fetch--; + log--; + } - while (rescnt < cache) /* try to fetch cache numbers */ + if (log < fetch) + { + fetch = log = fetch - log + SEQ_LOG_VALS; + logit = true; + } + + while (fetch) /* try to fetch cache [+ log ] numbers */ { /* @@ -242,7 +255,7 @@ nextval(PG_FUNCTION_ARGS) (maxv < 0 && next + incby > maxv)) { if (rescnt > 0) - break; /* stop caching */ + break; /* stop fetching */ if (seq->is_cycled != 't') elog(ERROR, "%s.nextval: got MAXVALUE (%d)", elm->name, maxv); @@ -258,7 +271,7 @@ nextval(PG_FUNCTION_ARGS) (minv >= 0 && next + incby < minv)) { if (rescnt > 0) - break; /* stop caching */ + break; /* stop fetching */ if (seq->is_cycled != 't') elog(ERROR, "%s.nextval: got MINVALUE (%d)", elm->name, minv); @@ -267,17 +280,43 @@ nextval(PG_FUNCTION_ARGS) else next += incby; } - rescnt++; /* got result */ - if (rescnt == 1) /* if it's first one - */ - result = next; /* it's what to return */ + fetch--; + if (rescnt < cache) + { + log--; + rescnt++; + last = next; + if (rescnt == 1) /* if it's first result - */ + result = next; /* it's what to return */ + } } /* save info in local cache */ elm->last = result; /* last returned number */ - elm->cached = next; /* last cached number */ + elm->cached = last; /* last fetched number */ + + if (logit) + { + xl_seq_rec xlrec; + XLogRecPtr recptr; + + if (fetch) /* not all numbers were fetched */ + log -= fetch; + + xlrec.node = elm->rel->rd_node; + xlrec.value = next; + + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG|XLOG_NO_TRAN, + (char*) &xlrec, sizeof(xlrec), NULL, 0); + + PageSetLSN(BufferGetPage(buf), recptr); + PageSetSUI(BufferGetPage(buf), ThisStartUpID); + } /* save info in sequence relation */ - seq->last_value = next; /* last fetched number */ + seq->last_value = last; /* last fetched number */ + Assert(log >= 0); + seq->log_cnt = log; /* how much is logged */ seq->is_called = 't'; LockBuffer(buf, BUFFER_LOCK_UNLOCK); @@ -349,6 +388,21 @@ do_setval(char *seqname, int32 next, bool iscalled) /* save info in sequence relation */ seq->last_value = next; /* last fetched number */ seq->is_called = iscalled ? 't' : 'f'; + seq->log_cnt = (iscalled) ? 0 : 1; + + { + xl_seq_rec xlrec; + XLogRecPtr recptr; + + xlrec.node = elm->rel->rd_node; + xlrec.value = next; + + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_SET|XLOG_NO_TRAN, + (char*) &xlrec, sizeof(xlrec), NULL, 0); + + PageSetLSN(BufferGetPage(buf), recptr); + PageSetSUI(BufferGetPage(buf), ThisStartUpID); + } LockBuffer(buf, BUFFER_LOCK_UNLOCK); @@ -638,7 +692,6 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new) } - static int get_param(DefElem *def) { @@ -651,3 +704,80 @@ get_param(DefElem *def) elog(ERROR, "DefineSequence: \"%s\" is to be integer", def->defname); return -1; } + +void seq_redo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + Relation reln; + Buffer buffer; + Page page; + ItemId lp; + HeapTupleData tuple; + Form_pg_sequence seq; + xl_seq_rec *xlrec; + + if (info != XLOG_SEQ_LOG && info != XLOG_SEQ_SET) + elog(STOP, "seq_redo: unknown op code %u", info); + + xlrec = (xl_seq_rec*) XLogRecGetData(record); + + reln = XLogOpenRelation(true, RM_SEQ_ID, xlrec->node); + if (!RelationIsValid(reln)) + return; + + buffer = XLogReadBuffer(false, reln, 0); + if (!BufferIsValid(buffer)) + elog(STOP, "seq_redo: can't read block of %u/%u", + xlrec->node.tblNode, xlrec->node.relNode); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page) || + ((sequence_magic *) PageGetSpecialPointer(page))->magic != SEQ_MAGIC) + elog(STOP, "seq_redo: uninitialized page of %u/%u", + xlrec->node.tblNode, xlrec->node.relNode); + + if (XLByteLE(lsn, PageGetLSN(page))) + { + UnlockAndReleaseBuffer(buffer); + return; + } + + lp = PageGetItemId(page, FirstOffsetNumber); + Assert(ItemIdIsUsed(lp)); + tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp); + + seq = (Form_pg_sequence) GETSTRUCT(&tuple); + + seq->last_value = xlrec->value; /* last logged value */ + seq->is_called = 't'; + seq->log_cnt = 0; + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + + return; +} + +void seq_undo(XLogRecPtr lsn, XLogRecord *record) +{ +} + +void seq_desc(char *buf, uint8 xl_info, char* rec) +{ + uint8 info = xl_info & ~XLR_INFO_MASK; + xl_seq_rec *xlrec = (xl_seq_rec*) rec; + + if (info == XLOG_SEQ_LOG) + strcat(buf, "log: "); + else if (info == XLOG_SEQ_SET) + strcat(buf, "set: "); + else + { + strcat(buf, "UNKNOWN"); + return; + } + + sprintf(buf + strlen(buf), "node %u/%u; value %d", + xlrec->node.tblNode, xlrec->node.relNode, xlrec->value); +} |