diff options
author | Robert Haas <rhaas@postgresql.org> | 2021-11-05 10:08:30 -0400 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2021-11-05 10:08:30 -0400 |
commit | bef47ff85df18bf4a3a9b13bd2a54820e27f3614 (patch) | |
tree | 9b0ff2c1fa76a38a425172a66d9afb2c3550743c /src/backend/replication/basebackup_throttle.c | |
parent | bd807be6935929bdefe74d1258ca08048f0aafa3 (diff) | |
download | postgresql-bef47ff85df18bf4a3a9b13bd2a54820e27f3614.tar.gz postgresql-bef47ff85df18bf4a3a9b13bd2a54820e27f3614.zip |
Introduce 'bbsink' abstraction to modularize base backup code.
The base backup code has accumulated a healthy number of new
features over the years, but it's becoming increasingly difficult
to maintain and further enhance that code because there's no
real separation of concerns. For example, the code that
knows the details of how we send data to the client
using the libpq protocol is scattered throughout basebackup.c,
rather than being centralized in one place.
To try to improve this situation, introduce a new 'bbsink' object
which acts as a recipient for archives generated during the base
backup process and also for the backup manifest. This commit
introduces three types of bbsink: a 'copytblspc' bbsink forwards the
backup to the client using one COPY OUT operation per tablespace and
another for the manifest, a 'progress' bbsink performs command
progress reporting, and a 'throttle' bbsink performs rate-limiting.
The 'progress' and 'throttle' bbsink types also forward the data to a
successor bbsink; at present, the last bbsink in the chain will
always be of type 'copytblspc'. There are plans to add more types
of 'bbsink' in future commits.
This abstraction is a bit leaky in the case of progress reporting,
but this still seems cleaner than what we had before.
Patch by me, reviewed and tested by Andres Freund, Sumanta Mukherjee,
Dilip Kumar, Suraj Kharage, Dipesh Pandit, Tushar Ahuja, Mark Dilger,
and Jeevan Ladhe.
Discussion: https://postgr.es/m/CA+TgmoZGwR=ZVWFeecncubEyPdwghnvfkkdBe9BLccLSiqdf9Q@mail.gmail.com
Discussion: https://postgr.es/m/CA+TgmoZvqk7UuzxsX1xjJRmMGkqoUGYTZLDCH8SmU1xTPr1Xig@mail.gmail.com
Diffstat (limited to 'src/backend/replication/basebackup_throttle.c')
-rw-r--r-- | src/backend/replication/basebackup_throttle.c | 199 |
1 files changed, 199 insertions, 0 deletions
diff --git a/src/backend/replication/basebackup_throttle.c b/src/backend/replication/basebackup_throttle.c new file mode 100644 index 00000000000..f163931f8a3 --- /dev/null +++ b/src/backend/replication/basebackup_throttle.c @@ -0,0 +1,199 @@ +/*------------------------------------------------------------------------- + * + * basebackup_throttle.c + * Basebackup sink implementing throttling. Data is forwarded to the + * next base backup sink in the chain at a rate no greater than the + * configured maximum. + * + * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/basebackup_throttle.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "miscadmin.h" +#include "replication/basebackup_sink.h" +#include "pgstat.h" +#include "storage/latch.h" +#include "utils/timestamp.h" + +typedef struct bbsink_throttle +{ + /* Common information for all types of sink. */ + bbsink base; + + /* The actual number of bytes, transfer of which may cause sleep. */ + uint64 throttling_sample; + + /* Amount of data already transferred but not yet throttled. */ + int64 throttling_counter; + + /* The minimum time required to transfer throttling_sample bytes. */ + TimeOffset elapsed_min_unit; + + /* The last check of the transfer rate. 
*/ + TimestampTz throttled_last; +} bbsink_throttle; + +static void bbsink_throttle_begin_backup(bbsink *sink); +static void bbsink_throttle_archive_contents(bbsink *sink, size_t len); +static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len); +static void throttle(bbsink_throttle *sink, size_t increment); + +const bbsink_ops bbsink_throttle_ops = { + .begin_backup = bbsink_throttle_begin_backup, + .begin_archive = bbsink_forward_begin_archive, + .archive_contents = bbsink_throttle_archive_contents, + .end_archive = bbsink_forward_end_archive, + .begin_manifest = bbsink_forward_begin_manifest, + .manifest_contents = bbsink_throttle_manifest_contents, + .end_manifest = bbsink_forward_end_manifest, + .end_backup = bbsink_forward_end_backup, + .cleanup = bbsink_forward_cleanup +}; + +/* + * How frequently to throttle, as a fraction of the specified rate-second. + */ +#define THROTTLING_FREQUENCY 8 + +/* + * Create a new basebackup sink that performs throttling and forwards data + * to a successor sink. + */ +bbsink * +bbsink_throttle_new(bbsink *next, uint32 maxrate) +{ + bbsink_throttle *sink; + + Assert(next != NULL); + Assert(maxrate > 0); + + sink = palloc0(sizeof(bbsink_throttle)); + *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops; + sink->base.bbs_next = next; + + sink->throttling_sample = + (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY; + + /* + * The minimum amount of time for throttling_sample bytes to be + * transferred. + */ + sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; + + return &sink->base; +} + +/* + * There's no real work to do here, but we need to record the current time so + * that it can be used for future calculations. + */ +static void +bbsink_throttle_begin_backup(bbsink *sink) +{ + bbsink_throttle *mysink = (bbsink_throttle *) sink; + + bbsink_forward_begin_backup(sink); + + /* The 'real data' starts now (header was ignored). 
*/ + mysink->throttled_last = GetCurrentTimestamp(); +} + +/* + * First throttle, and then pass archive contents to next sink. + */ +static void +bbsink_throttle_archive_contents(bbsink *sink, size_t len) +{ + throttle((bbsink_throttle *) sink, len); + + bbsink_forward_archive_contents(sink, len); +} + +/* + * First throttle, and then pass manifest contents to next sink. + */ +static void +bbsink_throttle_manifest_contents(bbsink *sink, size_t len) +{ + throttle((bbsink_throttle *) sink, len); + + bbsink_forward_manifest_contents(sink->bbs_next, len); +} + +/* + * Increment the network transfer counter by the given number of bytes, + * and sleep if necessary to comply with the requested network transfer + * rate. + */ +static void +throttle(bbsink_throttle *sink, size_t increment) +{ + TimeOffset elapsed_min; + + Assert(sink->throttling_counter >= 0); + + sink->throttling_counter += increment; + if (sink->throttling_counter < sink->throttling_sample) + return; + + /* How much time should have elapsed at minimum? */ + elapsed_min = sink->elapsed_min_unit * + (sink->throttling_counter / sink->throttling_sample); + + /* + * Since the latch could be set repeatedly because of concurrently WAL + * activity, sleep in a loop to ensure enough time has passed. + */ + for (;;) + { + TimeOffset elapsed, + sleep; + int wait_result; + + /* Time elapsed since the last measurement (and possible wake up). */ + elapsed = GetCurrentTimestamp() - sink->throttled_last; + + /* sleep if the transfer is faster than it should be */ + sleep = elapsed_min - elapsed; + if (sleep <= 0) + break; + + ResetLatch(MyLatch); + + /* We're eating a potentially set latch, so check for interrupts */ + CHECK_FOR_INTERRUPTS(); + + /* + * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be + * the maximum time to sleep. Thus the cast to long is safe. 
+ */ + wait_result = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + (long) (sleep / 1000), + WAIT_EVENT_BASE_BACKUP_THROTTLE); + + if (wait_result & WL_LATCH_SET) + CHECK_FOR_INTERRUPTS(); + + /* Done waiting? */ + if (wait_result & WL_TIMEOUT) + break; + } + + /* + * As we work with integers, only whole multiple of throttling_sample was + * processed. The rest will be done during the next call of this function. + */ + sink->throttling_counter %= sink->throttling_sample; + + /* + * Time interval for the remaining amount and possible next increments + * starts now. + */ + sink->throttled_last = GetCurrentTimestamp(); +} |