diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b567b8b59e2..92204bd9cdf 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -209,6 +209,9 @@ typedef struct ReorderBufferDiskChange int logical_decoding_work_mem; static const Size max_changes_in_memory = 4096; /* XXX for restore only */ +/* GUC variable */ +int logical_decoding_mode = LOGICAL_DECODING_MODE_BUFFERED; + /* --------------------------------------- * primary reorderbuffer support routines * --------------------------------------- @@ -3540,7 +3543,10 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb) /* * Check whether the logical_decoding_work_mem limit was reached, and if yes * pick the largest (sub)transaction at-a-time to evict and spill its changes to - * disk until we reach under the memory limit. + * disk or send to the output plugin until we reach under the memory limit. + * + * If logical_decoding_mode is set to "immediate", stream or serialize the changes + * immediately. * * XXX At this point we select the transactions until we reach under the memory * limit, but we might also adapt a more elaborate eviction strategy - for example @@ -3552,20 +3558,27 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { ReorderBufferTXN *txn; - /* bail out if we haven't exceeded the memory limit */ - if (rb->size < logical_decoding_work_mem * 1024L) + /* + * Bail out if logical_decoding_mode is buffered and we haven't exceeded + * the memory limit. + */ + if (logical_decoding_mode == LOGICAL_DECODING_MODE_BUFFERED && + rb->size < logical_decoding_work_mem * 1024L) return; /* - * Loop until we reach under the memory limit. One might think that just - * by evicting the largest (sub)transaction we will come under the memory - * limit based on assumption that the selected transaction is at least as - * large as the most recent change (which caused us to go over the memory - * limit). However, that is not true because a user can reduce the - * logical_decoding_work_mem to a smaller value before the most recent + * If logical_decoding_mode is immediate, loop until there's no change. + * Otherwise, loop until we reach under the memory limit. One might think + * that just by evicting the largest (sub)transaction we will come under + * the memory limit based on assumption that the selected transaction is + * at least as large as the most recent change (which caused us to go over + * the memory limit). However, that is not true because a user can reduce + * the logical_decoding_work_mem to a smaller value before the most recent * change. */ - while (rb->size >= logical_decoding_work_mem * 1024L) + while (rb->size >= logical_decoding_work_mem * 1024L || + (logical_decoding_mode == LOGICAL_DECODING_MODE_IMMEDIATE && + rb->size > 0)) { /* * Pick the largest transaction (or subtransaction) and evict it from |