aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c191
1 files changed, 23 insertions, 168 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5cf28d4df42..98a827e0b69 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -68,19 +68,9 @@
* memory gets actually freed.
*
* We use a max-heap with transaction size as the key to efficiently find
- * the largest transaction. While the max-heap is empty, we don't update
- * the max-heap when updating the memory counter. Therefore, we can get
- * the largest transaction in O(N) time, where N is the number of
- * transactions including top-level transactions and subtransactions.
- *
- * We build the max-heap just before selecting the largest transactions
- * if the number of transactions being decoded is higher than the threshold,
- * MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap, we also
- * update the max-heap when updating the memory counter. The intention is
- * to efficiently find the largest transaction in O(1) time instead of
- * incurring the cost of memory counter updates (O(log N)). Once the number
- * of transactions got lower than the threshold, we reset the max-heap
- * (refer to ReorderBufferMaybeResetMaxHeap() for details).
+ * the largest transaction. We update the max-heap whenever the memory
+ * counter is updated; however transactions with size 0 are not stored in
+ * the heap, because they have no changes to evict.
*
* We still rely on max_changes_in_memory when loading serialized changes
* back into memory. At that point we can't use the memory limit directly
@@ -122,23 +112,6 @@
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
-/*
- * Threshold of the total number of top-level and sub transactions that
- * controls whether we use the max-heap for tracking their sizes. Although
- * using the max-heap to select the largest transaction is effective when
- * there are many transactions being decoded, maintaining the max-heap while
- * updating the memory statistics can be costly. Therefore, we use
- * MaxConnections as the threshold so that we use the max-heap only when
- * using subtransactions.
- */
-#define MAX_HEAP_TXN_COUNT_THRESHOLD MaxConnections
-
-/*
- * A macro to check if the max-heap is ready to use and needs to be updated
- * accordingly.
- */
-#define ReorderBufferMaxHeapIsReady(rb) !binaryheap_empty((rb)->txn_heap)
-
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
@@ -290,9 +263,7 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
-static void ReorderBufferBuildMaxHeap(ReorderBuffer *rb);
-static void ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb);
-static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
+static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg);
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
@@ -390,16 +361,8 @@ ReorderBufferAllocate(void)
buffer->outbufsize = 0;
buffer->size = 0;
- /*
- * The binaryheap is indexed for faster manipulations.
- *
- * We allocate the initial heap size greater than
- * MAX_HEAP_TXN_COUNT_THRESHOLD because the txn_heap will not be used
- * until the threshold is exceeded.
- */
- buffer->txn_heap = binaryheap_allocate(MAX_HEAP_TXN_COUNT_THRESHOLD * 2,
- ReorderBufferTXNSizeCompare,
- true, NULL);
+ /* txn_heap is ordered by transaction size */
+ buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL);
buffer->spillTxns = 0;
buffer->spillCount = 0;
@@ -1637,12 +1600,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* deallocate */
ReorderBufferReturnTXN(rb, txn);
-
- /*
- * After cleaning up one transaction, the number of transactions might get
- * lower than the threshold for the max-heap.
- */
- ReorderBufferMaybeResetMaxHeap(rb);
}
/*
@@ -3265,20 +3222,18 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
if (addition)
{
+ Size oldsize = txn->size;
+
txn->size += sz;
rb->size += sz;
/* Update the total size in the top transaction. */
toptxn->total_size += sz;
- /* Update the max-heap as well if necessary */
- if (ReorderBufferMaxHeapIsReady(rb))
- {
- if ((txn->size - sz) == 0)
- binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
- else
- binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
- }
+ /* Update the max-heap */
+ if (oldsize != 0)
+ pairingheap_remove(rb->txn_heap, &txn->txn_node);
+ pairingheap_add(rb->txn_heap, &txn->txn_node);
}
else
{
@@ -3289,14 +3244,10 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
/* Update the total size in the top transaction. */
toptxn->total_size -= sz;
- /* Update the max-heap as well if necessary */
- if (ReorderBufferMaxHeapIsReady(rb))
- {
- if (txn->size == 0)
- binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
- else
- binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
- }
+ /* Update the max-heap */
+ pairingheap_remove(rb->txn_heap, &txn->txn_node);
+ if (txn->size != 0)
+ pairingheap_add(rb->txn_heap, &txn->txn_node);
}
Assert(txn->size <= rb->size);
@@ -3555,10 +3506,10 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
/* Compare two transactions by size */
static int
-ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
+ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
{
- ReorderBufferTXN *ta = (ReorderBufferTXN *) DatumGetPointer(a);
- ReorderBufferTXN *tb = (ReorderBufferTXN *) DatumGetPointer(b);
+ const ReorderBufferTXN *ta = pairingheap_const_container(ReorderBufferTXN, txn_node, a);
+ const ReorderBufferTXN *tb = pairingheap_const_container(ReorderBufferTXN, txn_node, b);
if (ta->size < tb->size)
return -1;
@@ -3568,106 +3519,16 @@ ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
}
/*
- * Build the max-heap. The heap assembly step is deferred until the end, for
- * efficiency.
- */
-static void
-ReorderBufferBuildMaxHeap(ReorderBuffer *rb)
-{
- HASH_SEQ_STATUS hash_seq;
- ReorderBufferTXNByIdEnt *ent;
-
- Assert(binaryheap_empty(rb->txn_heap));
-
- hash_seq_init(&hash_seq, rb->by_txn);
- while ((ent = hash_seq_search(&hash_seq)) != NULL)
- {
- ReorderBufferTXN *txn = ent->txn;
-
- if (txn->size == 0)
- continue;
-
- binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
- }
-
- binaryheap_build(rb->txn_heap);
-}
-
-/*
- * Reset the max-heap if the number of transactions got lower than the
- * threshold.
- */
-static void
-ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb)
-{
- /*
- * If we add and remove transactions right around the threshold, we could
- * easily end up "thrashing". To avoid it, we adapt 10% of transactions to
- * reset the max-heap.
- */
- if (ReorderBufferMaxHeapIsReady(rb) &&
- binaryheap_size(rb->txn_heap) < MAX_HEAP_TXN_COUNT_THRESHOLD * 0.9)
- binaryheap_reset(rb->txn_heap);
-}
-
-/*
- * Find the largest transaction (toplevel or subxact) to evict (spill to disk)
- * by doing a linear search or using the max-heap depending on the number of
- * transactions in ReorderBuffer. Refer to the comments atop this file for the
- * algorithm details.
+ * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
*/
static ReorderBufferTXN *
ReorderBufferLargestTXN(ReorderBuffer *rb)
{
- ReorderBufferTXN *largest = NULL;
-
- if (!ReorderBufferMaxHeapIsReady(rb))
- {
- /*
- * If the number of transactions are small, we scan all transactions
- * being decoded to get the largest transaction. This saves the cost
- * of building a max-heap with a small number of transactions.
- */
- if (hash_get_num_entries(rb->by_txn) < MAX_HEAP_TXN_COUNT_THRESHOLD)
- {
- HASH_SEQ_STATUS hash_seq;
- ReorderBufferTXNByIdEnt *ent;
-
- hash_seq_init(&hash_seq, rb->by_txn);
- while ((ent = hash_seq_search(&hash_seq)) != NULL)
- {
- ReorderBufferTXN *txn = ent->txn;
-
- /* if the current transaction is larger, remember it */
- if ((!largest) || (txn->size > largest->size))
- largest = txn;
- }
-
- Assert(largest);
- }
- else
- {
- /*
- * There are a large number of transactions in ReorderBuffer. We
- * build the max-heap for efficiently selecting the largest
- * transactions.
- */
- ReorderBufferBuildMaxHeap(rb);
-
- /*
- * The max-heap is ready now. We remain the max-heap at least
- * until we free up enough transactions to bring the total memory
- * usage below the limit. The largest transaction is selected
- * below.
- */
- Assert(ReorderBufferMaxHeapIsReady(rb));
- }
- }
+ ReorderBufferTXN *largest;
/* Get the largest transaction from the max-heap */
- if (ReorderBufferMaxHeapIsReady(rb))
- largest = (ReorderBufferTXN *)
- DatumGetPointer(binaryheap_first(rb->txn_heap));
+ largest = pairingheap_container(ReorderBufferTXN, txn_node,
+ pairingheap_first(rb->txn_heap));
Assert(largest);
Assert(largest->size > 0);
@@ -3812,12 +3673,6 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
/* We must be under the memory limit now. */
Assert(rb->size < logical_decoding_work_mem * 1024L);
- /*
- * After evicting some transactions, the number of transactions might get
- * lower than the threshold for the max-heap.
- */
- ReorderBufferMaybeResetMaxHeap(rb);
-
}
/*