diff options
author | Robert Haas <rhaas@postgresql.org> | 2022-01-18 13:47:26 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2022-01-18 13:47:49 -0500 |
commit | cc333f32336f5146b75190f57ef587dff225f565 (patch) | |
tree | 8bcc3bf1b50cf4cbfe597a519b37e7aa8fcc5bde /src/backend/replication/basebackup_copy.c | |
parent | 3414099c338bf619c00dd82d96f29a9668a3bf07 (diff) | |
download | postgresql-cc333f32336f5146b75190f57ef587dff225f565.tar.gz postgresql-cc333f32336f5146b75190f57ef587dff225f565.zip |
Modify pg_basebackup to use a new COPY subprotocol for base backups.
In the new approach, all files across all tablespaces are sent in a
single COPY OUT operation. The CopyData messages are no longer raw
archive content; rather, each message is prefixed with a type byte
that describes its purpose, e.g. 'n' signifies the start of a new
archive and 'd' signifies archive or manifest data. This protocol
is significantly more extensible than the old approach, since we can
later create more message types, though not without concern for
backward compatibility.
The new protocol sends a few things to the client that the old one
did not. First, it sends the name of each archive explicitly, instead
of letting the client compute it. This is intended to make it easier
to write future patches that might send archives in a format other
that tar (e.g. cpio, pax, tar.gz). Second, it sends explicit progress
messages rather than allowing the client to assume that progress is
defined by the number of bytes received. This will help with future
features where the server compresses the data, or sends it someplace
directly rather than transmitting it to the client.
The old protocol is still supported for compatibility with previous
releases. The new protocol is selected by means of a new
TARGET option to the BASE_BACKUP command. Currently, the
only supported target is 'client'. Support for additional
targets will be added in a later commit.
Patch by me. The patch set of which this is a part has had review
and/or testing from Jeevan Ladhe, Tushar Ahuja, Suraj Kharage,
Dipesh Pandit, and Mark Dilger.
Discussion: http://postgr.es/m/CA+TgmoaYZbz0=Yk797aOJwkGJC-LK3iXn+wzzMx7KdwNpZhS5g@mail.gmail.com
Diffstat (limited to 'src/backend/replication/basebackup_copy.c')
-rw-r--r-- | src/backend/replication/basebackup_copy.c | 277 |
1 files changed, 275 insertions, 2 deletions
diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c index abacc35fcd2..f42b368c033 100644 --- a/src/backend/replication/basebackup_copy.c +++ b/src/backend/replication/basebackup_copy.c @@ -1,8 +1,27 @@ /*------------------------------------------------------------------------- * * basebackup_copy.c - * send basebackup archives using one COPY OUT operation per - * tablespace, and an additional COPY OUT for the backup manifest + * send basebackup archives using COPY OUT + * + * We have two different ways of doing this. + * + * 'copytblspc' is an older method still supported for compatibility + * with releases prior to v15. In this method, a separate COPY OUT + * operation is used for each tablespace. The manifest, if it is sent, + * uses an additional COPY OUT operation. + * + * 'copystream' sends a starts a single COPY OUT operation and transmits + * all the archives and the manifest if present during the course of that + * single COPY OUT. Each CopyData message begins with a type byte, + * allowing us to signal the start of a new archive, or the manifest, + * by some means other than ending the COPY stream. This also allows + * this protocol to be extended more easily, since we can include + * arbitrary information in the message stream as long as we're certain + * that the client will know what to do with it. + * + * Regardless of which method is used, we sent a result set with + * information about the tabelspaces to be included in the backup before + * starting COPY OUT. This result has the same format in every method. * * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group * @@ -18,6 +37,52 @@ #include "libpq/pqformat.h" #include "replication/basebackup.h" #include "replication/basebackup_sink.h" +#include "utils/timestamp.h" + +typedef struct bbsink_copystream +{ + /* Common information for all types of sink. */ + bbsink base; + + /* + * Protocol message buffer. We assemble CopyData protocol messages by + * setting the first character of this buffer to 'd' (archive or manifest + * data) and then making base.bbs_buffer point to the second character so + * that the rest of the data gets copied into the message just where we + * want it. + */ + char *msgbuffer; + + /* + * When did we last report progress to the client, and how much progress + * did we report? + */ + TimestampTz last_progress_report_time; + uint64 bytes_done_at_last_time_check; +} bbsink_copystream; + +/* + * We don't want to send progress messages to the client excessively + * frequently. Ideally, we'd like to send a message when the time since the + * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking + * the system time every time we send a tiny bit of data seems too expensive. + * So we only check it after the number of bytes sine the last check reaches + * PROGRESS_REPORT_BYTE_INTERVAL. + */ +#define PROGRESS_REPORT_BYTE_INTERVAL 65536 +#define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000 + +static void bbsink_copystream_begin_backup(bbsink *sink); +static void bbsink_copystream_begin_archive(bbsink *sink, + const char *archive_name); +static void bbsink_copystream_archive_contents(bbsink *sink, size_t len); +static void bbsink_copystream_end_archive(bbsink *sink); +static void bbsink_copystream_begin_manifest(bbsink *sink); +static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len); +static void bbsink_copystream_end_manifest(bbsink *sink); +static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli); +static void bbsink_copystream_cleanup(bbsink *sink); static void bbsink_copytblspc_begin_backup(bbsink *sink); static void bbsink_copytblspc_begin_archive(bbsink *sink, @@ -38,6 +103,18 @@ static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); static void SendTablespaceList(List *tablespaces); static void send_int8_string(StringInfoData *buf, int64 intval); +const bbsink_ops bbsink_copystream_ops = { + .begin_backup = bbsink_copystream_begin_backup, + .begin_archive = bbsink_copystream_begin_archive, + .archive_contents = bbsink_copystream_archive_contents, + .end_archive = bbsink_copystream_end_archive, + .begin_manifest = bbsink_copystream_begin_manifest, + .manifest_contents = bbsink_copystream_manifest_contents, + .end_manifest = bbsink_copystream_end_manifest, + .end_backup = bbsink_copystream_end_backup, + .cleanup = bbsink_copystream_cleanup +}; + const bbsink_ops bbsink_copytblspc_ops = { .begin_backup = bbsink_copytblspc_begin_backup, .begin_archive = bbsink_copytblspc_begin_archive, @@ -51,6 +128,202 @@ const bbsink_ops bbsink_copytblspc_ops = { }; /* + * Create a new 'copystream' bbsink. + */ +bbsink * +bbsink_copystream_new(void) +{ + bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream)); + + *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops; + + /* Set up for periodic progress reporting. */ + sink->last_progress_report_time = GetCurrentTimestamp(); + sink->bytes_done_at_last_time_check = UINT64CONST(0); + + return &sink->base; +} + +/* + * Send start-of-backup wire protocol messages. + */ +static void +bbsink_copystream_begin_backup(bbsink *sink) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + bbsink_state *state = sink->bbs_state; + + /* + * Initialize buffer. We ultimately want to send the archive and manifest + * data by means of CopyData messages where the payload portion of each + * message begins with a type byte, so we set up a buffer that begins with + * a the type byte we're going to need, and then arrange things so that + * the data we're given will be written just after that type byte. That + * will allow us to ship the data with a single call to pq_putmessage and + * without needing any extra copying. + */ + mysink->msgbuffer = palloc(mysink->base.bbs_buffer_length + 1); + mysink->base.bbs_buffer = mysink->msgbuffer + 1; + mysink->msgbuffer[0] = 'd'; /* archive or manifest data */ + + /* Tell client the backup start location. */ + SendXlogRecPtrResult(state->startptr, state->starttli); + + /* Send client a list of tablespaces. */ + SendTablespaceList(state->tablespaces); + + /* Send a CommandComplete message */ + pq_puttextmessage('C', "SELECT"); + + /* Begin COPY stream. This will be used for all archives + manifest. */ + SendCopyOutResponse(); +} + +/* + * Send a CopyData message announcing the beginning of a new archive. + */ +static void +bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name) +{ + bbsink_state *state = sink->bbs_state; + tablespaceinfo *ti; + StringInfoData buf; + + ti = list_nth(state->tablespaces, state->tablespace_num); + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'n'); /* New archive */ + pq_sendstring(&buf, archive_name); + pq_sendstring(&buf, ti->path == NULL ? "" : ti->path); + pq_endmessage(&buf); +} + +/* + * Send a CopyData message containing a chunk of archive content. + */ +static void +bbsink_copystream_archive_contents(bbsink *sink, size_t len) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + bbsink_state *state = mysink->base.bbs_state; + StringInfoData buf; + uint64 targetbytes; + + /* Send the archive content to the client (with leading type byte). */ + pq_putmessage('d', mysink->msgbuffer, len + 1); + + /* Consider whether to send a progress report to the client. */ + targetbytes = mysink->bytes_done_at_last_time_check + + PROGRESS_REPORT_BYTE_INTERVAL; + if (targetbytes <= state->bytes_done) + { + TimestampTz now = GetCurrentTimestamp(); + long ms; + + /* + * OK, we've sent a decent number of bytes, so check the system time + * to see whether we're due to send a progress report. + */ + mysink->bytes_done_at_last_time_check = state->bytes_done; + ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time, + now); + + /* + * Send a progress report if enough time has passed. Also send one if + * the system clock was set backward, so that such occurrences don't + * have the effect of suppressing further progress messages. + */ + if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD) + { + mysink->last_progress_report_time = now; + + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'p'); /* Progress report */ + pq_sendint64(&buf, state->bytes_done); + pq_endmessage(&buf); + pq_flush_if_writable(); + } + } +} + +/* + * We don't need to explicitly signal the end of the archive; the client + * will figure out that we've reached the end when we begin the next one, + * or begin the manifest, or end the COPY stream. However, this seems like + * a good time to force out a progress report. One reason for that is that + * if this is the last archive, and we don't force a progress report now, + * the client will never be told that we sent all the bytes. + */ +static void +bbsink_copystream_end_archive(bbsink *sink) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + bbsink_state *state = mysink->base.bbs_state; + StringInfoData buf; + + mysink->bytes_done_at_last_time_check = state->bytes_done; + mysink->last_progress_report_time = GetCurrentTimestamp(); + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'p'); /* Progress report */ + pq_sendint64(&buf, state->bytes_done); + pq_endmessage(&buf); + pq_flush_if_writable(); +} + +/* + * Send a CopyData message announcing the beginning of the backup manifest. + */ +static void +bbsink_copystream_begin_manifest(bbsink *sink) +{ + StringInfoData buf; + + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'm'); /* Manifest */ + pq_endmessage(&buf); +} + +/* + * Each chunk of manifest data is sent using a CopyData message. + */ +static void +bbsink_copystream_manifest_contents(bbsink *sink, size_t len) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + + /* Send the manifest content to the client (with leading type byte). */ + pq_putmessage('d', mysink->msgbuffer, len + 1); +} + +/* + * We don't need an explicit terminator for the backup manifest. + */ +static void +bbsink_copystream_end_manifest(bbsink *sink) +{ + /* Do nothing. */ +} + +/* + * Send end-of-backup wire protocol messages. + */ +static void +bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli) +{ + SendCopyDone(); + SendXlogRecPtrResult(endptr, endtli); +} + +/* + * Cleanup. + */ +static void +bbsink_copystream_cleanup(bbsink *sink) +{ + /* Nothing to do. */ +} + +/* * Create a new 'copytblspc' bbsink. */ bbsink * |