aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/basebackup.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/basebackup.c')
-rw-r--r--src/backend/replication/basebackup.c137
1 files changed, 135 insertions, 2 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 2bbe384e351..d68a1533602 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -25,6 +25,7 @@
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
+#include "pgtar.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/walsender.h"
@@ -34,7 +35,8 @@
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/ps_status.h"
-#include "pgtar.h"
+#include "utils/timestamp.h"
+
typedef struct
{
@@ -43,6 +45,7 @@ typedef struct
bool fastcheckpoint;
bool nowait;
bool includewal;
+ uint32 maxrate;
} basebackup_options;
@@ -60,6 +63,7 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
static void parse_basebackup_options(List *options, basebackup_options *opt);
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static int compareWalFileNames(const void *a, const void *b);
+static void throttle(size_t increment);
/* Was the backup currently in-progress initiated in recovery mode? */
static bool backup_started_in_recovery = false;
@@ -72,6 +76,23 @@ static char *statrelpath = NULL;
*/
#define TAR_SEND_SIZE 32768
+/*
+ * How frequently to throttle, as a fraction of the specified rate-second.
+ */
+#define THROTTLING_FREQUENCY 8
+
+/* The actual number of bytes, transfer of which may cause sleep. */
+static uint64 throttling_sample;
+
+/* Amount of data already transfered but not yet throttled. */
+static int64 throttling_counter;
+
+/* The minimum time required to transfer throttling_sample bytes. */
+static int64 elapsed_min_unit;
+
+/* The last check of the transfer rate. */
+static int64 throttled_last;
+
typedef struct
{
char *oid;
@@ -203,6 +224,29 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
/* Send tablespace header */
SendBackupHeader(tablespaces);
+ /* Setup and activate network throttling, if client requested it */
+ if (opt->maxrate > 0)
+ {
+ throttling_sample = opt->maxrate * 1024 / THROTTLING_FREQUENCY;
+
+ /*
+ * The minimum amount of time for throttling_sample
+ * bytes to be transfered.
+ */
+ elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
+
+ /* Enable throttling. */
+ throttling_counter = 0;
+
+ /* The 'real data' starts now (header was ignored). */
+ throttled_last = GetCurrentIntegerTimestamp();
+ }
+ else
+ {
+ /* Disable throttling. */
+ throttling_counter = -1;
+ }
+
/* Send off our tablespaces one by one */
foreach(lc, tablespaces)
{
@@ -430,6 +474,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
(errmsg("base backup could not send data, aborting backup")));
len += cnt;
+ throttle(cnt);
+
if (len == XLogSegSize)
break;
}
@@ -500,6 +546,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
bool o_fast = false;
bool o_nowait = false;
bool o_wal = false;
+ bool o_maxrate = false;
MemSet(opt, 0, sizeof(*opt));
foreach(lopt, options)
@@ -551,6 +598,25 @@ parse_basebackup_options(List *options, basebackup_options *opt)
opt->includewal = true;
o_wal = true;
}
+ else if (strcmp(defel->defname, "max_rate") == 0)
+ {
+ long maxrate;
+
+ if (o_maxrate)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+
+ maxrate = intVal(defel->arg);
+ if (maxrate < MAX_RATE_LOWER || maxrate > MAX_RATE_UPPER)
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("%d is outside the valid range for parameter \"%s\" (%d .. %d)",
+ (int) maxrate, "MAX_RATE", MAX_RATE_LOWER, MAX_RATE_UPPER)));
+
+ opt->maxrate = (uint32) maxrate;
+ o_maxrate = true;
+ }
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
@@ -1112,6 +1178,7 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
(errmsg("base backup could not send data, aborting backup")));
len += cnt;
+ throttle(cnt);
if (len >= statbuf->st_size)
{
@@ -1133,10 +1200,14 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
cnt = Min(sizeof(buf), statbuf->st_size - len);
pq_putmessage('d', buf, cnt);
len += cnt;
+ throttle(cnt);
}
}
- /* Pad to 512 byte boundary, per tar format requirements */
+ /*
+ * Pad to 512 byte boundary, per tar format requirements. (This small
+ * piece of data is probably not worth throttling.)
+ */
pad = ((len + 511) & ~511) - len;
if (pad > 0)
{
@@ -1162,3 +1233,65 @@ _tarWriteHeader(const char *filename, const char *linktarget,
pq_putmessage('d', h, 512);
}
+
+/*
+ * Increment the network transfer counter by the given number of bytes,
+ * and sleep if necessary to comply with the requested network transfer
+ * rate.
+ */
+static void
+throttle(size_t increment)
+{
+ int64 elapsed,
+ elapsed_min,
+ sleep;
+ int wait_result;
+
+ if (throttling_counter < 0)
+ return;
+
+ throttling_counter += increment;
+ if (throttling_counter < throttling_sample)
+ return;
+
+ /* Time elapsed since the last measurement (and possible wake up). */
+ elapsed = GetCurrentIntegerTimestamp() - throttled_last;
+ /* How much should have elapsed at minimum? */
+ elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
+ sleep = elapsed_min - elapsed;
+ /* Only sleep if the transfer is faster than it should be. */
+ if (sleep > 0)
+ {
+ ResetLatch(&MyWalSnd->latch);
+
+ /*
+ * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
+ * the maximum time to sleep. Thus the cast to long is safe.
+ */
+ wait_result = WaitLatch(&MyWalSnd->latch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ (long) (sleep / 1000));
+ }
+ else
+ {
+ /*
+ * The actual transfer rate is below the limit. A negative value would
+ * distort the adjustment of throttled_last.
+ */
+ wait_result = 0;
+ sleep = 0;
+ }
+
+ /*
+ * Only a whole multiple of throttling_sample was processed. The rest will
+ * be done during the next call of this function.
+ */
+ throttling_counter %= throttling_sample;
+
+ /* Once the (possible) sleep has ended, new period starts. */
+ if (wait_result & WL_TIMEOUT)
+ throttled_last += elapsed + sleep;
+ else if (sleep > 0)
+ /* Sleep was necessary but might have been interrupted. */
+ throttled_last = GetCurrentIntegerTimestamp();
+}