aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/basebackup_throttle.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2021-11-05 10:08:30 -0400
committerRobert Haas <rhaas@postgresql.org>2021-11-05 10:08:30 -0400
commitbef47ff85df18bf4a3a9b13bd2a54820e27f3614 (patch)
tree9b0ff2c1fa76a38a425172a66d9afb2c3550743c /src/backend/replication/basebackup_throttle.c
parentbd807be6935929bdefe74d1258ca08048f0aafa3 (diff)
downloadpostgresql-bef47ff85df18bf4a3a9b13bd2a54820e27f3614.tar.gz
postgresql-bef47ff85df18bf4a3a9b13bd2a54820e27f3614.zip
Introduce 'bbsink' abstraction to modularize base backup code.
The base backup code has accumulated a healthy number of new features over the years, but it's becoming increasingly difficult to maintain and further enhance that code because there's no real separation of concerns. For example, the code that understands knows the details of how we send data to the client using the libpq protocol is scattered throughout basebackup.c, rather than being centralized in one place. To try to improve this situation, introduce a new 'bbsink' object which acts as a recipient for archives generated during the base backup progress and also for the backup manifest. This commit introduces three types of bbsink: a 'copytblspc' bbsink forwards the backup to the client using one COPY OUT operation per tablespace and another for the manifest, a 'progress' bbsink performs command progress reporting, and a 'throttle' bbsink performs rate-limiting. The 'progress' and 'throttle' bbsink types also forward the data to a successor bbsink; at present, the last bbsink in the chain will always be of type 'copytblspc'. There are plans to add more types of 'bbsink' in future commits. This abstraction is a bit leaky in the case of progress reporting, but this still seems cleaner than what we had before. Patch by me, reviewed and tested by Andres Freund, Sumanta Mukherjee, Dilip Kumar, Suraj Kharage, Dipesh Pandit, Tushar Ahuja, Mark Dilger, and Jeevan Ladhe. Discussion: https://postgr.es/m/CA+TgmoZGwR=ZVWFeecncubEyPdwghnvfkkdBe9BLccLSiqdf9Q@mail.gmail.com Discussion: https://postgr.es/m/CA+TgmoZvqk7UuzxsX1xjJRmMGkqoUGYTZLDCH8SmU1xTPr1Xig@mail.gmail.com
Diffstat (limited to 'src/backend/replication/basebackup_throttle.c')
-rw-r--r--src/backend/replication/basebackup_throttle.c199
1 files changed, 199 insertions, 0 deletions
diff --git a/src/backend/replication/basebackup_throttle.c b/src/backend/replication/basebackup_throttle.c
new file mode 100644
index 00000000000..f163931f8a3
--- /dev/null
+++ b/src/backend/replication/basebackup_throttle.c
@@ -0,0 +1,199 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_throttle.c
+ * Basebackup sink implementing throttling. Data is forwarded to the
+ * next base backup sink in the chain at a rate no greater than the
+ * configured maximum.
+ *
+ * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/basebackup_throttle.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "replication/basebackup_sink.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "utils/timestamp.h"
+
+typedef struct bbsink_throttle
+{
+ /* Common information for all types of sink. */
+ bbsink base;
+
+ /* The actual number of bytes, transfer of which may cause sleep. */
+ uint64 throttling_sample;
+
+ /* Amount of data already transferred but not yet throttled. */
+ int64 throttling_counter;
+
+ /* The minimum time required to transfer throttling_sample bytes. */
+ TimeOffset elapsed_min_unit;
+
+ /* The last check of the transfer rate. */
+ TimestampTz throttled_last;
+} bbsink_throttle;
+
+static void bbsink_throttle_begin_backup(bbsink *sink);
+static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
+static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
+static void throttle(bbsink_throttle *sink, size_t increment);
+
+const bbsink_ops bbsink_throttle_ops = {
+ .begin_backup = bbsink_throttle_begin_backup,
+ .begin_archive = bbsink_forward_begin_archive,
+ .archive_contents = bbsink_throttle_archive_contents,
+ .end_archive = bbsink_forward_end_archive,
+ .begin_manifest = bbsink_forward_begin_manifest,
+ .manifest_contents = bbsink_throttle_manifest_contents,
+ .end_manifest = bbsink_forward_end_manifest,
+ .end_backup = bbsink_forward_end_backup,
+ .cleanup = bbsink_forward_cleanup
+};
+
+/*
+ * How frequently to throttle, as a fraction of the specified rate-second.
+ */
+#define THROTTLING_FREQUENCY 8
+
+/*
+ * Create a new basebackup sink that performs throttling and forwards data
+ * to a successor sink.
+ */
+bbsink *
+bbsink_throttle_new(bbsink *next, uint32 maxrate)
+{
+ bbsink_throttle *sink;
+
+ Assert(next != NULL);
+ Assert(maxrate > 0);
+
+ sink = palloc0(sizeof(bbsink_throttle));
+ *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
+ sink->base.bbs_next = next;
+
+ sink->throttling_sample =
+ (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
+
+ /*
+ * The minimum amount of time for throttling_sample bytes to be
+ * transferred.
+ */
+ sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
+
+ return &sink->base;
+}
+
+/*
+ * There's no real work to do here, but we need to record the current time so
+ * that it can be used for future calculations.
+ */
+static void
+bbsink_throttle_begin_backup(bbsink *sink)
+{
+ bbsink_throttle *mysink = (bbsink_throttle *) sink;
+
+ bbsink_forward_begin_backup(sink);
+
+ /* The 'real data' starts now (header was ignored). */
+ mysink->throttled_last = GetCurrentTimestamp();
+}
+
+/*
+ * First throttle, and then pass archive contents to next sink.
+ */
+static void
+bbsink_throttle_archive_contents(bbsink *sink, size_t len)
+{
+ throttle((bbsink_throttle *) sink, len);
+
+ bbsink_forward_archive_contents(sink, len);
+}
+
+/*
+ * First throttle, and then pass manifest contents to next sink.
+ */
+static void
+bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
+{
+ throttle((bbsink_throttle *) sink, len);
+
+ bbsink_forward_manifest_contents(sink->bbs_next, len);
+}
+
+/*
+ * Increment the network transfer counter by the given number of bytes,
+ * and sleep if necessary to comply with the requested network transfer
+ * rate.
+ */
+static void
+throttle(bbsink_throttle *sink, size_t increment)
+{
+ TimeOffset elapsed_min;
+
+ Assert(sink->throttling_counter >= 0);
+
+ sink->throttling_counter += increment;
+ if (sink->throttling_counter < sink->throttling_sample)
+ return;
+
+ /* How much time should have elapsed at minimum? */
+ elapsed_min = sink->elapsed_min_unit *
+ (sink->throttling_counter / sink->throttling_sample);
+
+ /*
+ * Since the latch could be set repeatedly because of concurrently WAL
+ * activity, sleep in a loop to ensure enough time has passed.
+ */
+ for (;;)
+ {
+ TimeOffset elapsed,
+ sleep;
+ int wait_result;
+
+ /* Time elapsed since the last measurement (and possible wake up). */
+ elapsed = GetCurrentTimestamp() - sink->throttled_last;
+
+ /* sleep if the transfer is faster than it should be */
+ sleep = elapsed_min - elapsed;
+ if (sleep <= 0)
+ break;
+
+ ResetLatch(MyLatch);
+
+ /* We're eating a potentially set latch, so check for interrupts */
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
+ * the maximum time to sleep. Thus the cast to long is safe.
+ */
+ wait_result = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ (long) (sleep / 1000),
+ WAIT_EVENT_BASE_BACKUP_THROTTLE);
+
+ if (wait_result & WL_LATCH_SET)
+ CHECK_FOR_INTERRUPTS();
+
+ /* Done waiting? */
+ if (wait_result & WL_TIMEOUT)
+ break;
+ }
+
+ /*
+ * As we work with integers, only whole multiple of throttling_sample was
+ * processed. The rest will be done during the next call of this function.
+ */
+ sink->throttling_counter %= sink->throttling_sample;
+
+ /*
+ * Time interval for the remaining amount and possible next increments
+ * starts now.
+ */
+ sink->throttled_last = GetCurrentTimestamp();
+}