diff options
Diffstat (limited to 'src/backend/replication/basebackup.c')
-rw-r--r-- | src/backend/replication/basebackup.c | 137 |
1 files changed, 135 insertions, 2 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 2bbe384e351..d68a1533602 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -25,6 +25,7 @@ #include "libpq/pqformat.h" #include "miscadmin.h" #include "nodes/pg_list.h" +#include "pgtar.h" #include "pgstat.h" #include "replication/basebackup.h" #include "replication/walsender.h" @@ -34,7 +35,8 @@ #include "utils/builtins.h" #include "utils/elog.h" #include "utils/ps_status.h" -#include "pgtar.h" +#include "utils/timestamp.h" + typedef struct { @@ -43,6 +45,7 @@ typedef struct bool fastcheckpoint; bool nowait; bool includewal; + uint32 maxrate; } basebackup_options; @@ -60,6 +63,7 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir); static void parse_basebackup_options(List *options, basebackup_options *opt); static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); static int compareWalFileNames(const void *a, const void *b); +static void throttle(size_t increment); /* Was the backup currently in-progress initiated in recovery mode? */ static bool backup_started_in_recovery = false; @@ -72,6 +76,23 @@ static char *statrelpath = NULL; */ #define TAR_SEND_SIZE 32768 +/* + * How frequently to throttle, as a fraction of the specified rate-second. + */ +#define THROTTLING_FREQUENCY 8 + +/* The actual number of bytes, transfer of which may cause sleep. */ +static uint64 throttling_sample; + +/* Amount of data already transfered but not yet throttled. */ +static int64 throttling_counter; + +/* The minimum time required to transfer throttling_sample bytes. */ +static int64 elapsed_min_unit; + +/* The last check of the transfer rate. */ +static int64 throttled_last; + typedef struct { char *oid; @@ -203,6 +224,29 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) /* Send tablespace header */ SendBackupHeader(tablespaces); + /* Setup and activate network throttling, if client requested it */ + if (opt->maxrate > 0) + { + throttling_sample = opt->maxrate * 1024 / THROTTLING_FREQUENCY; + + /* + * The minimum amount of time for throttling_sample + * bytes to be transfered. + */ + elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; + + /* Enable throttling. */ + throttling_counter = 0; + + /* The 'real data' starts now (header was ignored). */ + throttled_last = GetCurrentIntegerTimestamp(); + } + else + { + /* Disable throttling. */ + throttling_counter = -1; + } + /* Send off our tablespaces one by one */ foreach(lc, tablespaces) { @@ -430,6 +474,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) (errmsg("base backup could not send data, aborting backup"))); len += cnt; + throttle(cnt); + if (len == XLogSegSize) break; } @@ -500,6 +546,7 @@ parse_basebackup_options(List *options, basebackup_options *opt) bool o_fast = false; bool o_nowait = false; bool o_wal = false; + bool o_maxrate = false; MemSet(opt, 0, sizeof(*opt)); foreach(lopt, options) @@ -551,6 +598,25 @@ parse_basebackup_options(List *options, basebackup_options *opt) opt->includewal = true; o_wal = true; } + else if (strcmp(defel->defname, "max_rate") == 0) + { + long maxrate; + + if (o_maxrate) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("duplicate option \"%s\"", defel->defname))); + + maxrate = intVal(defel->arg); + if (maxrate < MAX_RATE_LOWER || maxrate > MAX_RATE_UPPER) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("%d is outside the valid range for parameter \"%s\" (%d .. %d)", + (int) maxrate, "MAX_RATE", MAX_RATE_LOWER, MAX_RATE_UPPER))); + + opt->maxrate = (uint32) maxrate; + o_maxrate = true; + } else elog(ERROR, "option \"%s\" not recognized", defel->defname); @@ -1112,6 +1178,7 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf, (errmsg("base backup could not send data, aborting backup"))); len += cnt; + throttle(cnt); if (len >= statbuf->st_size) { @@ -1133,10 +1200,14 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf, cnt = Min(sizeof(buf), statbuf->st_size - len); pq_putmessage('d', buf, cnt); len += cnt; + throttle(cnt); } } - /* Pad to 512 byte boundary, per tar format requirements */ + /* + * Pad to 512 byte boundary, per tar format requirements. (This small + * piece of data is probably not worth throttling.) + */ pad = ((len + 511) & ~511) - len; if (pad > 0) { @@ -1162,3 +1233,65 @@ _tarWriteHeader(const char *filename, const char *linktarget, pq_putmessage('d', h, 512); } + +/* + * Increment the network transfer counter by the given number of bytes, + * and sleep if necessary to comply with the requested network transfer + * rate. + */ +static void +throttle(size_t increment) +{ + int64 elapsed, + elapsed_min, + sleep; + int wait_result; + + if (throttling_counter < 0) + return; + + throttling_counter += increment; + if (throttling_counter < throttling_sample) + return; + + /* Time elapsed since the last measurement (and possible wake up). */ + elapsed = GetCurrentIntegerTimestamp() - throttled_last; + /* How much should have elapsed at minimum? */ + elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample); + sleep = elapsed_min - elapsed; + /* Only sleep if the transfer is faster than it should be. */ + if (sleep > 0) + { + ResetLatch(&MyWalSnd->latch); + + /* + * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be + * the maximum time to sleep. Thus the cast to long is safe. + */ + wait_result = WaitLatch(&MyWalSnd->latch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + (long) (sleep / 1000)); + } + else + { + /* + * The actual transfer rate is below the limit. A negative value would + * distort the adjustment of throttled_last. + */ + wait_result = 0; + sleep = 0; + } + + /* + * Only a whole multiple of throttling_sample was processed. The rest will + * be done during the next call of this function. + */ + throttling_counter %= throttling_sample; + + /* Once the (possible) sleep has ended, new period starts. */ + if (wait_result & WL_TIMEOUT) + throttled_last += elapsed + sleep; + else if (sleep > 0) + /* Sleep was necessary but might have been interrupted. */ + throttled_last = GetCurrentIntegerTimestamp(); +} |