diff options
Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/brin/brin.c | 74 | ||||
-rw-r--r-- | src/backend/access/brin/brin_revmap.c | 140 | ||||
-rw-r--r-- | src/backend/access/brin/brin_xlog.c | 43 | ||||
-rw-r--r-- | src/backend/access/rmgrdesc/brindesc.c | 10 |
4 files changed, 264 insertions, 3 deletions
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 86e73b62427..649f3488c20 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -909,6 +909,80 @@ brin_summarize_range(PG_FUNCTION_ARGS)
 }
 
 /*
+ * SQL-callable interface to mark a range as no longer summarized
+ */
+Datum
+brin_desummarize_range(PG_FUNCTION_ARGS)
+{
+	Oid indexoid = PG_GETARG_OID(0);	/* arg 0: OID of the index to operate on */
+	int64 heapBlk64 = PG_GETARG_INT64(1);	/* arg 1: heap block number, range-checked below */
+	BlockNumber heapBlk;
+	Oid heapoid;
+	Relation heapRel;
+	Relation indexRel;
+	bool done;			/* false while the revmap asks us to retry */
+
+	if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
+	{
+		char *blk = psprintf(INT64_FORMAT, heapBlk64);
+
+		ereport(ERROR,
+				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				 errmsg("block number out of range: %s", blk)));
+	}
+	heapBlk = (BlockNumber) heapBlk64;
+
+	/*
+	 * We must lock the table before the index to avoid deadlocks. However,
+	 * if the passed indexoid isn't an index then IndexGetRelation() will fail.
+	 * Rather than emitting a not-very-helpful error message, postpone
+	 * complaining, expecting that the is-it-an-index test below will fail.
+	 */
+	heapoid = IndexGetRelation(indexoid, true);
+	if (OidIsValid(heapoid))
+		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
+	else
+		heapRel = NULL;
+
+	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
+
+	/* Must be a BRIN index */
+	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
+		indexRel->rd_rel->relam != BRIN_AM_OID)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("\"%s\" is not a BRIN index",
+						RelationGetRelationName(indexRel))));
+
+	/* User must own the index (comparable to privileges needed for VACUUM) */
+	if (!pg_class_ownercheck(indexoid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+					   RelationGetRelationName(indexRel));
+
+	/*
+	 * Since we did the IndexGetRelation call above without any lock, it's
+	 * barely possible that a race against an index drop/recreation could have
+	 * netted us the wrong table. Recheck (this also rejects heapRel == NULL).
+	 */
+	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_TABLE),
+				 errmsg("could not open parent table of index %s",
+						RelationGetRelationName(indexRel))));
+
+	/* the revmap does the hard work; it returns false when we must retry */
+	do {
+		done = brinRevmapDesummarizeRange(indexRel, heapBlk);
+	}
+	while (!done);
+
+	relation_close(indexRel, ShareUpdateExclusiveLock);
+	relation_close(heapRel, ShareUpdateExclusiveLock);
+
+	PG_RETURN_VOID();
+}
+
+/*
  * Build a BrinDesc used to create or scan a BRIN index
  */
 BrinDesc *
diff --git a/src/backend/access/brin/brin_revmap.c b/src/backend/access/brin/brin_revmap.c
index 5d45b48fd94..35e53a2bac2 100644
--- a/src/backend/access/brin/brin_revmap.c
+++ b/src/backend/access/brin/brin_revmap.c
@@ -168,9 +168,12 @@ brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange,
 	iptr = (ItemPointerData *) contents->rm_tids;
 	iptr += HEAPBLK_TO_REVMAP_INDEX(pagesPerRange, heapBlk);
 
-	ItemPointerSet(iptr,
-				   ItemPointerGetBlockNumber(&tid),
-				   ItemPointerGetOffsetNumber(&tid));
+	if (ItemPointerIsValid(&tid))
+		ItemPointerSet(iptr,
+					   ItemPointerGetBlockNumber(&tid),
+					   ItemPointerGetOffsetNumber(&tid));
+	else
+		ItemPointerSetInvalid(iptr);
 }
 
 /*
@@ -305,6 +308,137 @@ brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
 }
 
 /*
+ * Delete an index tuple, marking a page range as unsummarized.
+ *
+ * Index must be locked in ShareUpdateExclusiveLock mode.
+ *
+ * Return FALSE if caller should retry; TRUE once the range is unsummarized.
+ */
+bool
+brinRevmapDesummarizeRange(Relation idxrel, BlockNumber heapBlk)
+{
+	BrinRevmap *revmap;
+	BlockNumber pagesPerRange;
+	RevmapContents *contents;
+	ItemPointerData *iptr;
+	ItemPointerData invalidIptr;
+	BlockNumber revmapBlk;
+	Buffer revmapBuf;	/* revmap page covering heapBlk */
+	Buffer regBuf;		/* regular page the revmap entry points at */
+	Page revmapPg;
+	Page regPg;
+	OffsetNumber revmapOffset;
+	OffsetNumber regOffset;
+	ItemId lp;
+	BrinTuple *tup;
+
+	revmap = brinRevmapInitialize(idxrel, &pagesPerRange, NULL);
+
+	revmapBlk = revmap_get_blkno(revmap, heapBlk);
+	if (!BlockNumberIsValid(revmapBlk))
+	{
+		/* revmap page doesn't exist: range not summarized, we're done */
+		brinRevmapTerminate(revmap);
+		return true;
+	}
+
+	/* Lock the revmap page, obtain the index tuple pointer from it */
+	revmapBuf = brinLockRevmapPageForUpdate(revmap, heapBlk);
+	revmapPg = BufferGetPage(revmapBuf);
+	revmapOffset = HEAPBLK_TO_REVMAP_INDEX(revmap->rm_pagesPerRange, heapBlk);
+
+	contents = (RevmapContents *) PageGetContents(revmapPg);
+	iptr = contents->rm_tids;
+	iptr += revmapOffset;
+
+	if (!ItemPointerIsValid(iptr))
+	{
+		/* no index tuple: range not summarized, we're done */
+		LockBuffer(revmapBuf, BUFFER_LOCK_UNLOCK);
+		brinRevmapTerminate(revmap);
+		return true;
+	}
+
+	regBuf = ReadBuffer(idxrel, ItemPointerGetBlockNumber(iptr));
+	LockBuffer(regBuf, BUFFER_LOCK_EXCLUSIVE);
+	regPg = BufferGetPage(regBuf);
+
+	/* no longer a regular page: drop its lock *and* pin, have caller retry */
+	if (!BRIN_IS_REGULAR_PAGE(regPg))
+	{
+		LockBuffer(revmapBuf, BUFFER_LOCK_UNLOCK);
+		UnlockReleaseBuffer(regBuf);
+		brinRevmapTerminate(revmap);
+		return false;
+	}
+
+	regOffset = ItemPointerGetOffsetNumber(iptr);
+	if (regOffset > PageGetMaxOffsetNumber(regPg))
+		ereport(ERROR,
+				(errcode(ERRCODE_INDEX_CORRUPTED),
+				 errmsg("corrupted BRIN index: inconsistent range map")));
+
+	lp = PageGetItemId(regPg, regOffset);
+	if (!ItemIdIsUsed(lp))
+		ereport(ERROR,
+				(errcode(ERRCODE_INDEX_CORRUPTED),
+				 errmsg("corrupted BRIN index: inconsistent range map")));
+	tup = (BrinTuple *) PageGetItem(regPg, lp);
+	/* XXX apply sanity checks? Might as well delete a bogus tuple ... */
+
+	/*
+	 * We're only removing data, not reading it, so there's no need to
+	 * TestForOldSnapshot here.
+	 *
+	 * Because of SUE lock, this function shouldn't run concurrently with
+	 * summarization. Placeholder tuples can only exist as leftovers from
+	 * crashed summarization, so if we detect any, we complain but proceed.
+	 */
+	if (BrinTupleIsPlaceholder(tup))
+		ereport(WARNING,
+				(errmsg("leftover placeholder tuple detected in BRIN index \"%s\", deleting",
+						RelationGetRelationName(idxrel))));
+
+	START_CRIT_SECTION();
+
+	ItemPointerSetInvalid(&invalidIptr);
+	brinSetHeapBlockItemptr(revmapBuf, revmap->rm_pagesPerRange, heapBlk,
+							invalidIptr);
+	PageIndexTupleDeleteNoCompact(regPg, regOffset);
+	/* XXX record free space in FSM? */
+
+	MarkBufferDirty(regBuf);
+	MarkBufferDirty(revmapBuf);
+
+	if (RelationNeedsWAL(idxrel))
+	{
+		xl_brin_desummarize xlrec;
+		XLogRecPtr recptr;
+
+		/* redo uses pagesPerRange to find the revmap slot; never leave it unset */
+		xlrec.pagesPerRange = revmap->rm_pagesPerRange;
+		xlrec.heapBlk = heapBlk;
+		xlrec.regOffset = regOffset;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) &xlrec, SizeOfBrinDesummarize);
+		XLogRegisterBuffer(0, revmapBuf, 0);
+		XLogRegisterBuffer(1, regBuf, REGBUF_STANDARD);
+		recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_DESUMMARIZE);
+		PageSetLSN(revmapPg, recptr);
+		PageSetLSN(regPg, recptr);
+	}
+
+	END_CRIT_SECTION();
+
+	UnlockReleaseBuffer(regBuf);
+	LockBuffer(revmapBuf, BUFFER_LOCK_UNLOCK);
+	brinRevmapTerminate(revmap);
+
+	return true;
+}
+
+/*
  * Given a heap block number, find the corresponding physical revmap block
  * number and return it. If the revmap page hasn't been allocated yet, return
  * InvalidBlockNumber.
diff --git a/src/backend/access/brin/brin_xlog.c b/src/backend/access/brin/brin_xlog.c
index f416bacc3f7..8f5b5ceb3f2 100644
--- a/src/backend/access/brin/brin_xlog.c
+++ b/src/backend/access/brin/brin_xlog.c
@@ -254,6 +254,46 @@ brin_xlog_revmap_extend(XLogReaderState *record)
 	UnlockReleaseBuffer(metabuf);
 }
 
+static void				/* redo for XLOG_BRIN_DESUMMARIZE records (see brin_redo) */
+brin_xlog_desummarize_page(XLogReaderState *record)
+{
+	XLogRecPtr lsn = record->EndRecPtr;	/* LSN stamped on each page we modify */
+	xl_brin_desummarize *xlrec;
+	Buffer buffer;
+	XLogRedoAction action;
+
+	xlrec = (xl_brin_desummarize *) XLogRecGetData(record);
+
+	/* Update the revmap (block 0 of the record): invalidate the range's entry */
+	action = XLogReadBufferForRedo(record, 0, &buffer);
+	if (action == BLK_NEEDS_REDO)
+	{
+		ItemPointerData iptr;
+
+		ItemPointerSetInvalid(&iptr);
+		brinSetHeapBlockItemptr(buffer, xlrec->pagesPerRange, xlrec->heapBlk, iptr);
+
+		PageSetLSN(BufferGetPage(buffer), lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	/* remove the leftover entry from the regular page (block 1 of the record) */
+	action = XLogReadBufferForRedo(record, 1, &buffer);
+	if (action == BLK_NEEDS_REDO)
+	{
+		Page regPg = BufferGetPage(buffer);
+
+		PageIndexTupleDeleteNoCompact(regPg, xlrec->regOffset);
+
+		PageSetLSN(regPg, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
 void
 brin_redo(XLogReaderState *record)
 {
@@ -276,6 +316,9 @@ brin_redo(XLogReaderState *record)
 		case XLOG_BRIN_REVMAP_EXTEND:
 			brin_xlog_revmap_extend(record);
 			break;
+		case XLOG_BRIN_DESUMMARIZE:
+			brin_xlog_desummarize_page(record);
+			break;
 		default:
 			elog(PANIC, "brin_redo: unknown op code %u", info);
 	}
diff --git a/src/backend/access/rmgrdesc/brindesc.c b/src/backend/access/rmgrdesc/brindesc.c
index b58cb5bde91..8eb5275a8b4 100644
--- a/src/backend/access/rmgrdesc/brindesc.c
+++ b/src/backend/access/rmgrdesc/brindesc.c
@@ -61,6 +61,13 @@ brin_desc(StringInfo buf, XLogReaderState *record)
 
 		appendStringInfo(buf, "targetBlk %u", xlrec->targetBlk);
 	}
+	else if (info == XLOG_BRIN_DESUMMARIZE)
+	{
+		xl_brin_desummarize *xlrec = (xl_brin_desummarize *) rec;
+
+		appendStringInfo(buf, "pagesPerRange %u, heapBlk %u, page offset %u",
+						 xlrec->pagesPerRange, xlrec->heapBlk, xlrec->regOffset);
+	}
 }
 
 const char *
@@ -91,6 +98,9 @@ brin_identify(uint8 info)
 		case XLOG_BRIN_REVMAP_EXTEND:
 			id = "REVMAP_EXTEND";
 			break;
+		case XLOG_BRIN_DESUMMARIZE:
+			id = "DESUMMARIZE";
+			break;
 	}
 
 	return id;
|