aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c33
1 files changed, 23 insertions, 10 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b567b8b59e2..92204bd9cdf 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -209,6 +209,9 @@ typedef struct ReorderBufferDiskChange
int logical_decoding_work_mem;
static const Size max_changes_in_memory = 4096; /* XXX for restore only */
+/* GUC variable */
+int logical_decoding_mode = LOGICAL_DECODING_MODE_BUFFERED;
+
/* ---------------------------------------
* primary reorderbuffer support routines
* ---------------------------------------
@@ -3540,7 +3543,10 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
/*
* Check whether the logical_decoding_work_mem limit was reached, and if yes
* pick the largest (sub)transaction at-a-time to evict and spill its changes to
- * disk until we reach under the memory limit.
+ * disk or send to the output plugin until we reach under the memory limit.
+ *
+ * If logical_decoding_mode is set to "immediate", stream or serialize the changes
+ * immediately.
*
* XXX At this point we select the transactions until we reach under the memory
* limit, but we might also adapt a more elaborate eviction strategy - for example
@@ -3552,20 +3558,27 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
ReorderBufferTXN *txn;
- /* bail out if we haven't exceeded the memory limit */
- if (rb->size < logical_decoding_work_mem * 1024L)
+ /*
+ * Bail out if logical_decoding_mode is buffered and we haven't exceeded
+ * the memory limit.
+ */
+ if (logical_decoding_mode == LOGICAL_DECODING_MODE_BUFFERED &&
+ rb->size < logical_decoding_work_mem * 1024L)
return;
/*
- * Loop until we reach under the memory limit. One might think that just
- * by evicting the largest (sub)transaction we will come under the memory
- * limit based on assumption that the selected transaction is at least as
- * large as the most recent change (which caused us to go over the memory
- * limit). However, that is not true because a user can reduce the
- * logical_decoding_work_mem to a smaller value before the most recent
+ * If logical_decoding_mode is immediate, loop until there's no change.
+ * Otherwise, loop until we reach under the memory limit. One might think
+ * that just by evicting the largest (sub)transaction we will come under
+ * the memory limit based on assumption that the selected transaction is
+ * at least as large as the most recent change (which caused us to go over
+ * the memory limit). However, that is not true because a user can reduce
+ * the logical_decoding_work_mem to a smaller value before the most recent
* change.
*/
- while (rb->size >= logical_decoding_work_mem * 1024L)
+ while (rb->size >= logical_decoding_work_mem * 1024L ||
+ (logical_decoding_mode == LOGICAL_DECODING_MODE_IMMEDIATE &&
+ rb->size > 0))
{
/*
* Pick the largest transaction (or subtransaction) and evict it from