aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/applyparallelworker.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2023-02-02 08:15:18 +0530
committerAmit Kapila <akapila@postgresql.org>2023-02-02 08:15:18 +0530
commit9f2213a7c575bae43a2d41abc8b60770066ca81c (patch)
tree5456fdc30b648c117a14f863d2c2857307485cd2 /src/backend/replication/logical/applyparallelworker.c
parentfb1a59de0c52609653166aafc6ce8679a9cfe54b (diff)
downloadpostgresql-9f2213a7c575bae43a2d41abc8b60770066ca81c.tar.gz
postgresql-9f2213a7c575bae43a2d41abc8b60770066ca81c.zip
Allow the logical_replication_mode to be used on the subscriber.
Extend the existing developer option 'logical_replication_mode' to help test the parallel apply of large transactions on the subscriber. When set to 'buffered', the leader sends changes to parallel apply workers via a shared memory queue. When set to 'immediate', the leader serializes all changes to files and notifies the parallel apply workers to read and apply them at the end of the transaction. This helps in adding tests to cover the serialization code path in parallel streaming mode. Author: Hou Zhijie Reviewed-by: Peter Smith, Kuroda Hayato, Sawada Masahiko, Amit Kapila Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/applyparallelworker.c')
-rw-r--r--src/backend/replication/logical/applyparallelworker.c16
1 files changed, 11 insertions, 5 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 3579e704fe5..e670ec617a4 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -1149,6 +1149,13 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
Assert(!IsTransactionState());
Assert(!winfo->serialize_changes);
+ /*
+ * We don't try to send data to parallel worker for 'immediate' mode. This
+ * is primarily used for testing purposes.
+ */
+ if (unlikely(logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE))
+ return false;
+
/*
* This timeout is a bit arbitrary but testing revealed that it is sufficient
* to send the message unless the parallel apply worker is waiting on some
@@ -1187,12 +1194,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
startTime = GetCurrentTimestamp();
else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
SHM_SEND_TIMEOUT_MS))
- {
- ereport(LOG,
- (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
- winfo->shared->xid)));
return false;
- }
}
}
@@ -1206,6 +1208,10 @@ void
pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
bool stream_locked)
{
+ ereport(LOG,
+ (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
+ winfo->shared->xid)));
+
/*
* The parallel apply worker could be stuck for some reason (say waiting
* on some lock by other backend), so stop trying to send data directly to