Diffstat (limited to 'src/backend/access/gin/ginfast.c')
-rw-r--r--   src/backend/access/gin/ginfast.c   126
1 file changed, 69 insertions(+), 57 deletions(-)
diff --git a/src/backend/access/gin/ginfast.c b/src/backend/access/gin/ginfast.c
index 2ddf5680f6f..5cf737f6213 100644
--- a/src/backend/access/gin/ginfast.c
+++ b/src/backend/access/gin/ginfast.c
@@ -27,7 +27,9 @@
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/acl.h"
+#include "postmaster/autovacuum.h"
#include "storage/indexfsm.h"
+#include "storage/lmgr.h"
/* GUC parameter */
int gin_pending_list_limit = 0;
@@ -437,7 +439,7 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
END_CRIT_SECTION();
if (needCleanup)
- ginInsertCleanup(ginstate, true, NULL);
+ ginInsertCleanup(ginstate, false, true, NULL);
}
/*
@@ -502,11 +504,8 @@ ginHeapTupleFastCollect(GinState *ginstate,
* If newHead == InvalidBlockNumber then function drops the whole list.
*
* metapage is pinned and exclusive-locked throughout this function.
- *
- * Returns true if another cleanup process is running concurrently
- * (if so, we can just abandon our own efforts)
*/
-static bool
+static void
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
bool fill_fsm, IndexBulkDeleteResult *stats)
{
@@ -537,14 +536,7 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
data.ndeleted++;
- if (GinPageIsDeleted(page))
- {
- /* concurrent cleanup process is detected */
- for (i = 0; i < data.ndeleted; i++)
- UnlockReleaseBuffer(buffers[i]);
-
- return true;
- }
+ Assert(!GinPageIsDeleted(page));
nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
blknoToDelete = GinPageGetOpaque(page)->rightlink;
@@ -620,8 +612,6 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
RecordFreeIndexPage(index, freespace[i]);
} while (blknoToDelete != newHead);
-
- return false;
}
/* Initialize empty KeyArray */
@@ -722,18 +712,10 @@ processPendingPage(BuildAccumulator *accum, KeyArray *ka,
/*
* Move tuples from pending pages into regular GIN structure.
*
- * This can be called concurrently by multiple backends, so it must cope.
- * On first glance it looks completely not concurrent-safe and not crash-safe
- * either. The reason it's okay is that multiple insertion of the same entry
- * is detected and treated as a no-op by gininsert.c. If we crash after
- * posting entries to the main index and before removing them from the
+ * At first glance it looks completely crash-unsafe. But if we crash
+ * after posting entries to the main index and before removing them from the
* pending list, it's okay because when we redo the posting later on, nothing
- * bad will happen. Likewise, if two backends simultaneously try to post
- * a pending entry into the main index, one will succeed and one will do
- * nothing. We try to notice when someone else is a little bit ahead of
- * us in the process, but that's just to avoid wasting cycles. Only the
- * action of removing a page from the pending list really needs exclusive
- * lock.
+ * bad will happen.
*
* fill_fsm indicates that ginInsertCleanup should add deleted pages
* to FSM otherwise caller is responsible to put deleted pages into
@@ -742,7 +724,7 @@ processPendingPage(BuildAccumulator *accum, KeyArray *ka,
* If stats isn't null, we count deleted pending pages into the counts.
*/
void
-ginInsertCleanup(GinState *ginstate,
+ginInsertCleanup(GinState *ginstate, bool full_clean,
bool fill_fsm, IndexBulkDeleteResult *stats)
{
Relation index = ginstate->index;
@@ -755,8 +737,43 @@ ginInsertCleanup(GinState *ginstate,
oldCtx;
BuildAccumulator accum;
KeyArray datums;
- BlockNumber blkno;
+ BlockNumber blkno,
+ blknoFinish;
+ bool cleanupFinish = false;
bool fsm_vac = false;
+ Size workMemory;
+	bool		inVacuum = (stats != NULL);
+
+	/*
+	 * We would like to prevent concurrent cleanup processes.  For that
+	 * we take the metapage lock in exclusive mode using LockPage().
+	 * Nothing else takes a page-level lock on the metapage, so
+	 * concurrent insertion into the pending list remains possible.
+	 */
+
+ if (inVacuum)
+ {
+		/*
+		 * We are called from [auto]vacuum/analyze or
+		 * gin_clean_pending_list(), so wait for any concurrent
+		 * cleanup to finish.
+		 */
+ LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
+ workMemory =
+ (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
+ autovacuum_work_mem : maintenance_work_mem;
+ }
+ else
+ {
+		/*
+		 * We are called from a regular insert; if we see a
+		 * concurrent cleanup, just exit in the hope that the
+		 * concurrent process will clean up the pending list.
+		 */
+ if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
+ return;
+ workMemory = work_mem;
+ }
metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
LockBuffer(metabuffer, GIN_SHARE);
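
A minimal sketch of the serialization protocol above, using the real lmgr
calls but a hypothetical helper name (try_pending_list_lock) that is not
part of the patch:

#include "postgres.h"
#include "access/gin_private.h"
#include "storage/lmgr.h"

/*
 * Hypothetical helper: acquire the pending-list cleanup lock.  Maintenance
 * callers pass wait = true and block until any concurrent cleanup finishes;
 * the insert path passes wait = false and gives up immediately, trusting
 * the current lock holder to clean the list instead.
 */
static bool
try_pending_list_lock(Relation index, bool wait)
{
	if (wait)
	{
		LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		return true;
	}

	return ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
}

Since nothing else takes a page-level lock on the metapage, this serializes
cleanups only; ordinary insertions still lock the metapage buffer and
proceed concurrently.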
@@ -767,10 +784,17 @@ ginInsertCleanup(GinState *ginstate,
{
/* Nothing to do */
UnlockReleaseBuffer(metabuffer);
+ UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
return;
}
/*
+	 * Remember the current tail page, to prevent the cleanup from running
+	 * forever if other backends add new tuples faster than we can clean up.
+ */
+ blknoFinish = metadata->tail;
+
+ /*
* Read and lock head of pending list
*/
blkno = metadata->head;
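
The tail snapshot taken just above is checked against blkno inside the
processing loop (next hunk).  A standalone sketch of the bounded walk, in
plain C with invented names rather than backend code, shows why a
concurrently growing list cannot keep the cleanup running forever:

#include <stdbool.h>
#include <stdio.h>

/* Invented stand-in for a pending-list page chained by rightlink. */
typedef struct PendingPage
{
	int			blkno;
	struct PendingPage *rightlink;
} PendingPage;

/*
 * Walk from head, stopping at the tail remembered before the walk began,
 * unless a full cleanup was requested.
 */
static void
cleanup_walk(PendingPage *head, PendingPage *tail_at_start, bool full_clean)
{
	for (PendingPage *p = head; p != NULL; p = p->rightlink)
	{
		printf("clean page %d\n", p->blkno);
		if (p == tail_at_start && !full_clean)
			break;				/* pages appended later wait for next time */
	}
}

int
main(void)
{
	PendingPage c = {3, NULL};
	PendingPage b = {2, &c};
	PendingPage a = {1, &b};

	/* Page 3 was appended after cleanup began; the snapshot was &b. */
	cleanup_walk(&a, &b, false);	/* cleans 1, 2 and stops */
	cleanup_walk(&a, &b, true);		/* full_clean: cleans 1, 2, 3 */
	return 0;
}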
@@ -802,13 +826,15 @@ ginInsertCleanup(GinState *ginstate,
*/
for (;;)
{
- if (GinPageIsDeleted(page))
- {
- /* another cleanup process is running concurrently */
- UnlockReleaseBuffer(buffer);
- fsm_vac = false;
- break;
- }
+ Assert(!GinPageIsDeleted(page));
+
+		/*
+		 * Have we reached the page that was the tail when we started our
+		 * cleanup?  If so, stop after this page, unless the caller asked
+		 * for a full cleanup, in which case we ignore the old tail and
+		 * work until the list becomes empty.
+		 */
+ if (blkno == blknoFinish && full_clean == false)
+ cleanupFinish = true;
/*
* read page's datums into accum
@@ -821,13 +847,10 @@ ginInsertCleanup(GinState *ginstate,
* Is it time to flush memory to disk? Flush if we are at the end of
* the pending list, or if we have a full row and memory is getting
* full.
- *
- * XXX using up maintenance_work_mem here is probably unreasonably
- * much, since vacuum might already be using that much.
*/
if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
(GinPageHasFullRow(page) &&
- (accum.allocatedMemory >= (Size)maintenance_work_mem * 1024L)))
+ (accum.allocatedMemory >= workMemory * 1024L)))
{
ItemPointerData *list;
uint32 nlist;
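
The workMemory budget consumed by this test is chosen once at entry, in the
inVacuum branch above.  A minimal sketch of that selection, using the real
GUCs and IsAutoVacuumWorkerProcess() but a hypothetical helper name:

#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"

/*
 * Hypothetical helper: pick the cleanup memory budget in kilobytes.
 * Maintenance callers get autovacuum_work_mem (when set, in an autovacuum
 * worker) or maintenance_work_mem; the regular insert path is held to
 * work_mem so a foreground backend never balloons to maintenance size.
 */
static Size
cleanup_work_mem_kb(bool isMaintenanceCaller)
{
	if (isMaintenanceCaller)
		return (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
			autovacuum_work_mem : maintenance_work_mem;

	return work_mem;
}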
@@ -864,14 +887,7 @@ ginInsertCleanup(GinState *ginstate,
LockBuffer(metabuffer, GIN_EXCLUSIVE);
LockBuffer(buffer, GIN_SHARE);
- if (GinPageIsDeleted(page))
- {
- /* another cleanup process is running concurrently */
- UnlockReleaseBuffer(buffer);
- LockBuffer(metabuffer, GIN_UNLOCK);
- fsm_vac = false;
- break;
- }
+ Assert(!GinPageIsDeleted(page));
/*
* While we left the page unlocked, more stuff might have gotten
@@ -904,13 +920,7 @@ ginInsertCleanup(GinState *ginstate,
* remove read pages from pending list, at this point all
* content of read pages is in regular structure
*/
- if (shiftList(index, metabuffer, blkno, fill_fsm, stats))
- {
- /* another cleanup process is running concurrently */
- LockBuffer(metabuffer, GIN_UNLOCK);
- fsm_vac = false;
- break;
- }
+ shiftList(index, metabuffer, blkno, fill_fsm, stats);
/* At this point, some pending pages have been freed up */
fsm_vac = true;
@@ -919,9 +929,10 @@ ginInsertCleanup(GinState *ginstate,
LockBuffer(metabuffer, GIN_UNLOCK);
/*
- * if we removed the whole pending list just exit
+	 * If we removed the whole pending list, or have processed the tail
+	 * page remembered at the start of our cleanup, just exit.
*/
- if (blkno == InvalidBlockNumber)
+ if (blkno == InvalidBlockNumber || cleanupFinish)
break;
/*
@@ -946,6 +957,7 @@ ginInsertCleanup(GinState *ginstate,
page = BufferGetPage(buffer);
}
+ UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
ReleaseBuffer(metabuffer);
/*
@@ -1004,7 +1016,7 @@ gin_clean_pending_list(PG_FUNCTION_ARGS)
memset(&stats, 0, sizeof(stats));
initGinState(&ginstate, indexRel);
- ginInsertCleanup(&ginstate, true, &stats);
+ ginInsertCleanup(&ginstate, true, true, &stats);
index_close(indexRel, AccessShareLock);