aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/pg_basebackup.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/pg_basebackup.c')
-rw-r--r--src/bin/pg_basebackup/pg_basebackup.c410
1 files changed, 376 insertions, 34 deletions
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index aa43fc09241..2a58be638a5 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -54,6 +54,16 @@ typedef struct TablespaceList
TablespaceListCell *tail;
} TablespaceList;
+typedef struct ArchiveStreamState
+{
+ int tablespacenum;
+ bbstreamer *streamer;
+ bbstreamer *manifest_inject_streamer;
+ PQExpBuffer manifest_buffer;
+ char manifest_filename[MAXPGPATH];
+ FILE *manifest_file;
+} ArchiveStreamState;
+
typedef struct WriteTarState
{
int tablespacenum;
@@ -174,6 +184,13 @@ static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation,
bbstreamer **manifest_inject_streamer_p,
bool is_recovery_guc_supported,
bool expect_unterminated_tarfile);
+static void ReceiveArchiveStreamChunk(size_t r, char *copybuf,
+ void *callback_data);
+static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor);
+static char *GetCopyDataString(size_t r, char *copybuf, size_t *cursor);
+static uint64 GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor);
+static void GetCopyDataEnd(size_t r, char *copybuf, size_t cursor);
+static void ReportCopyDataParseError(size_t r, char *copybuf);
static void ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
bool tablespacenum);
static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
@@ -1098,6 +1115,317 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
}
/*
+ * Receive all of the archives the server wants to send - and the backup
+ * manifest if present - as a single COPY stream.
+ */
+static void
+ReceiveArchiveStream(PGconn *conn)
+{
+ ArchiveStreamState state;
+
+ /* Set up initial state. */
+ memset(&state, 0, sizeof(state));
+ state.tablespacenum = -1;
+
+ /* All the real work happens in ReceiveArchiveStreamChunk. */
+ ReceiveCopyData(conn, ReceiveArchiveStreamChunk, &state);
+
+ /* If we wrote the backup manifest to a file, close the file. */
+ if (state.manifest_file !=NULL)
+ {
+ fclose(state.manifest_file);
+ state.manifest_file = NULL;
+ }
+
+ /*
+ * If we buffered the backup manifest in order to inject it into the
+ * output tarfile, do that now.
+ */
+ if (state.manifest_inject_streamer != NULL &&
+ state.manifest_buffer != NULL)
+ {
+ bbstreamer_inject_file(state.manifest_inject_streamer,
+ "backup_manifest",
+ state.manifest_buffer->data,
+ state.manifest_buffer->len);
+ destroyPQExpBuffer(state.manifest_buffer);
+ state.manifest_buffer = NULL;
+ }
+
+ /* If there's still an archive in progress, end processing. */
+ if (state.streamer != NULL)
+ {
+ bbstreamer_finalize(state.streamer);
+ bbstreamer_free(state.streamer);
+ state.streamer = NULL;
+ }
+}
+
+/*
+ * Receive one chunk of data sent by the server as part of a single COPY
+ * stream that includes all archives and the manifest.
+ */
+static void
+ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
+{
+ ArchiveStreamState *state = callback_data;
+ size_t cursor = 0;
+
+ /* Each CopyData message begins with a type byte. */
+ switch (GetCopyDataByte(r, copybuf, &cursor))
+ {
+ case 'n':
+ {
+ /* New archive. */
+ char *archive_name;
+ char *spclocation;
+
+ /*
+ * We force a progress report at the end of each tablespace. A
+ * new tablespace starts when the previous one ends, except in
+ * the case of the very first one.
+ */
+ if (++state->tablespacenum > 0)
+ progress_report(state->tablespacenum, true, false);
+
+ /* Sanity check. */
+ if (state->manifest_buffer != NULL ||
+ state->manifest_file !=NULL)
+ {
+ pg_log_error("archives should precede manifest");
+ exit(1);
+ }
+
+ /* Parse the rest of the CopyData message. */
+ archive_name = GetCopyDataString(r, copybuf, &cursor);
+ spclocation = GetCopyDataString(r, copybuf, &cursor);
+ GetCopyDataEnd(r, copybuf, cursor);
+
+ /*
+ * Basic sanity checks on the archive name: it shouldn't be
+ * empty, it shouldn't start with a dot, and it shouldn't
+ * contain a path separator.
+ */
+ if (archive_name[0] == '\0' || archive_name[0] == '.' ||
+ strchr(archive_name, '/') != NULL ||
+ strchr(archive_name, '\\') != NULL)
+ {
+ pg_log_error("invalid archive name: \"%s\"",
+ archive_name);
+ exit(1);
+ }
+
+ /*
+ * An empty spclocation is treated as NULL. We expect this
+ * case to occur for the data directory itself, but not for
+ * any archives that correspond to tablespaces.
+ */
+ if (spclocation[0] == '\0')
+ spclocation = NULL;
+
+ /* End processing of any prior archive. */
+ if (state->streamer != NULL)
+ {
+ bbstreamer_finalize(state->streamer);
+ bbstreamer_free(state->streamer);
+ state->streamer = NULL;
+ }
+
+ /*
+ * Create an appropriate backup streamer. We know that
+ * recovery GUCs are supported, because this protocol can only
+ * be used on v15+.
+ */
+ state->streamer =
+ CreateBackupStreamer(archive_name,
+ spclocation,
+ &state->manifest_inject_streamer,
+ true, false);
+ break;
+ }
+
+ case 'd':
+ {
+ /* Archive or manifest data. */
+ if (state->manifest_buffer != NULL)
+ {
+ /* Manifest data, buffer in memory. */
+ appendPQExpBuffer(state->manifest_buffer, copybuf + 1,
+ r - 1);
+ }
+ else if (state->manifest_file !=NULL)
+ {
+ /* Manifest data, write to disk. */
+ if (fwrite(copybuf + 1, r - 1, 1,
+ state->manifest_file) != 1)
+ {
+ /*
+ * If fwrite() didn't set errno, assume that the
+ * problem is that we're out of disk space.
+ */
+ if (errno == 0)
+ errno = ENOSPC;
+ pg_log_error("could not write to file \"%s\": %m",
+ state->manifest_filename);
+ exit(1);
+ }
+ }
+ else if (state->streamer != NULL)
+ {
+ /* Archive data. */
+ bbstreamer_content(state->streamer, NULL, copybuf + 1,
+ r - 1, BBSTREAMER_UNKNOWN);
+ }
+ else
+ {
+ pg_log_error("unexpected payload data");
+ exit(1);
+ }
+ break;
+ }
+
+ case 'p':
+ {
+ /*
+ * Progress report.
+ *
+ * The remainder of the message is expected to be an 8-byte
+ * count of bytes completed.
+ */
+ totaldone = GetCopyDataUInt64(r, copybuf, &cursor);
+ GetCopyDataEnd(r, copybuf, cursor);
+
+ /*
+ * The server shouldn't send progres report messages too
+ * often, so we force an update each time we receive one.
+ */
+ progress_report(state->tablespacenum, true, false);
+ break;
+ }
+
+ case 'm':
+ {
+ /*
+ * Manifest data will be sent next. This message is not
+ * expected to have any further payload data.
+ */
+ GetCopyDataEnd(r, copybuf, cursor);
+
+ /*
+ * If we're supposed inject the manifest into the archive, we
+ * prepare to buffer it in memory; otherwise, we prepare to
+ * write it to a temporary file.
+ */
+ if (state->manifest_inject_streamer != NULL)
+ state->manifest_buffer = createPQExpBuffer();
+ else
+ {
+ snprintf(state->manifest_filename,
+ sizeof(state->manifest_filename),
+ "%s/backup_manifest.tmp", basedir);
+ state->manifest_file =
+ fopen(state->manifest_filename, "wb");
+ if (state->manifest_file == NULL)
+ {
+ pg_log_error("could not create file \"%s\": %m",
+ state->manifest_filename);
+ exit(1);
+ }
+ }
+ break;
+ }
+
+ default:
+ ReportCopyDataParseError(r, copybuf);
+ break;
+ }
+}
+
+/*
+ * Get a single byte from a CopyData message.
+ *
+ * Bail out if none remain.
+ */
+static char
+GetCopyDataByte(size_t r, char *copybuf, size_t *cursor)
+{
+ if (*cursor >= r)
+ ReportCopyDataParseError(r, copybuf);
+
+ return copybuf[(*cursor)++];
+}
+
+/*
+ * Get a NUL-terminated string from a CopyData message.
+ *
+ * Bail out if the terminating NUL cannot be found.
+ */
+static char *
+GetCopyDataString(size_t r, char *copybuf, size_t *cursor)
+{
+ size_t startpos = *cursor;
+ size_t endpos = startpos;
+
+ while (1)
+ {
+ if (endpos >= r)
+ ReportCopyDataParseError(r, copybuf);
+ if (copybuf[endpos] == '\0')
+ break;
+ ++endpos;
+ }
+
+ *cursor = endpos + 1;
+ return &copybuf[startpos];
+}
+
+/*
+ * Get an unsigned 64-bit integer from a CopyData message.
+ *
+ * Bail out if there are not at least 8 bytes remaining.
+ */
+static uint64
+GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor)
+{
+ uint64 result;
+
+ if (*cursor + sizeof(uint64) > r)
+ ReportCopyDataParseError(r, copybuf);
+ memcpy(&result, &copybuf[*cursor], sizeof(uint64));
+ *cursor += sizeof(uint64);
+ return pg_ntoh64(result);
+}
+
+/*
+ * Bail out if we didn't parse the whole message.
+ */
+static void
+GetCopyDataEnd(size_t r, char *copybuf, size_t cursor)
+{
+ if (r != cursor)
+ ReportCopyDataParseError(r, copybuf);
+}
+
+/*
+ * Report failure to parse a CopyData message from the server. Then exit.
+ *
+ * As a debugging aid, we try to give some hint about what kind of message
+ * provoked the failure. Perhaps this is not detailed enough, but it's not
+ * clear that it's worth expending any more code on what shoud be a
+ * can't-happen case.
+ */
+static void
+ReportCopyDataParseError(size_t r, char *copybuf)
+{
+ if (r == 0)
+ pg_log_error("empty COPY message");
+ else
+ pg_log_error("malformed COPY message of type %d, length %zu",
+ copybuf[0], r);
+ exit(1);
+}
+
+/*
* Receive raw tar data from the server, and stream it to the appropriate
* location. If we're writing a single tarfile to standard output, also
* receive the backup manifest and inject it into that tarfile.
@@ -1376,6 +1704,10 @@ BaseBackup(void)
"MANIFEST_CHECKSUMS", manifest_checksums);
}
+ if (serverMajor >= 1500)
+ AppendStringCommandOption(&buf, use_new_option_syntax,
+ "TARGET", "client");
+
if (verbose)
pg_log_info("initiating base backup, waiting for checkpoint to complete");
@@ -1498,46 +1830,56 @@ BaseBackup(void)
StartLogStreamer(xlogstart, starttli, sysidentifier);
}
- /* Receive a tar file for each tablespace in turn */
- for (i = 0; i < PQntuples(res); i++)
+ if (serverMajor >= 1500)
{
- char archive_name[MAXPGPATH];
- char *spclocation;
-
- /*
- * If we write the data out to a tar file, it will be named base.tar
- * if it's the main data directory or <tablespaceoid>.tar if it's for
- * another tablespace. CreateBackupStreamer() will arrange to add .gz
- * to the archive name if pg_basebackup is performing compression.
- */
- if (PQgetisnull(res, i, 0))
- {
- strlcpy(archive_name, "base.tar", sizeof(archive_name));
- spclocation = NULL;
- }
- else
+ /* Receive a single tar stream with everything. */
+ ReceiveArchiveStream(conn);
+ }
+ else
+ {
+ /* Receive a tar file for each tablespace in turn */
+ for (i = 0; i < PQntuples(res); i++)
{
- snprintf(archive_name, sizeof(archive_name),
- "%s.tar", PQgetvalue(res, i, 0));
- spclocation = PQgetvalue(res, i, 1);
+ char archive_name[MAXPGPATH];
+ char *spclocation;
+
+ /*
+ * If we write the data out to a tar file, it will be named
+ * base.tar if it's the main data directory or <tablespaceoid>.tar
+ * if it's for another tablespace. CreateBackupStreamer() will
+ * arrange to add .gz to the archive name if pg_basebackup is
+ * performing compression.
+ */
+ if (PQgetisnull(res, i, 0))
+ {
+ strlcpy(archive_name, "base.tar", sizeof(archive_name));
+ spclocation = NULL;
+ }
+ else
+ {
+ snprintf(archive_name, sizeof(archive_name),
+ "%s.tar", PQgetvalue(res, i, 0));
+ spclocation = PQgetvalue(res, i, 1);
+ }
+
+ ReceiveTarFile(conn, archive_name, spclocation, i);
}
- ReceiveTarFile(conn, archive_name, spclocation, i);
+ /*
+ * Now receive backup manifest, if appropriate.
+ *
+ * If we're writing a tarfile to stdout, ReceiveTarFile will have
+ * already processed the backup manifest and included it in the output
+ * tarfile. Such a configuration doesn't allow for writing multiple
+ * files.
+ *
+ * If we're talking to an older server, it won't send a backup
+ * manifest, so don't try to receive one.
+ */
+ if (!writing_to_stdout && manifest)
+ ReceiveBackupManifest(conn);
}
- /*
- * Now receive backup manifest, if appropriate.
- *
- * If we're writing a tarfile to stdout, ReceiveTarFile will have already
- * processed the backup manifest and included it in the output tarfile.
- * Such a configuration doesn't allow for writing multiple files.
- *
- * If we're talking to an older server, it won't send a backup manifest,
- * so don't try to receive one.
- */
- if (!writing_to_stdout && manifest)
- ReceiveBackupManifest(conn);
-
if (showprogress)
{
progress_filename = NULL;