diff options
Diffstat (limited to 'src/bin/pg_basebackup/pg_basebackup.c')
-rw-r--r-- | src/bin/pg_basebackup/pg_basebackup.c | 410 |
1 files changed, 376 insertions, 34 deletions
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index aa43fc09241..2a58be638a5 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -54,6 +54,16 @@ typedef struct TablespaceList TablespaceListCell *tail; } TablespaceList; +typedef struct ArchiveStreamState +{ + int tablespacenum; + bbstreamer *streamer; + bbstreamer *manifest_inject_streamer; + PQExpBuffer manifest_buffer; + char manifest_filename[MAXPGPATH]; + FILE *manifest_file; +} ArchiveStreamState; + typedef struct WriteTarState { int tablespacenum; @@ -174,6 +184,13 @@ static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation, bbstreamer **manifest_inject_streamer_p, bool is_recovery_guc_supported, bool expect_unterminated_tarfile); +static void ReceiveArchiveStreamChunk(size_t r, char *copybuf, + void *callback_data); +static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor); +static char *GetCopyDataString(size_t r, char *copybuf, size_t *cursor); +static uint64 GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor); +static void GetCopyDataEnd(size_t r, char *copybuf, size_t cursor); +static void ReportCopyDataParseError(size_t r, char *copybuf); static void ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, bool tablespacenum); static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data); @@ -1098,6 +1115,317 @@ CreateBackupStreamer(char *archive_name, char *spclocation, } /* + * Receive all of the archives the server wants to send - and the backup + * manifest if present - as a single COPY stream. + */ +static void +ReceiveArchiveStream(PGconn *conn) +{ + ArchiveStreamState state; + + /* Set up initial state. */ + memset(&state, 0, sizeof(state)); + state.tablespacenum = -1; + + /* All the real work happens in ReceiveArchiveStreamChunk. */ + ReceiveCopyData(conn, ReceiveArchiveStreamChunk, &state); + + /* If we wrote the backup manifest to a file, close the file. */ + if (state.manifest_file !=NULL) + { + fclose(state.manifest_file); + state.manifest_file = NULL; + } + + /* + * If we buffered the backup manifest in order to inject it into the + * output tarfile, do that now. + */ + if (state.manifest_inject_streamer != NULL && + state.manifest_buffer != NULL) + { + bbstreamer_inject_file(state.manifest_inject_streamer, + "backup_manifest", + state.manifest_buffer->data, + state.manifest_buffer->len); + destroyPQExpBuffer(state.manifest_buffer); + state.manifest_buffer = NULL; + } + + /* If there's still an archive in progress, end processing. */ + if (state.streamer != NULL) + { + bbstreamer_finalize(state.streamer); + bbstreamer_free(state.streamer); + state.streamer = NULL; + } +} + +/* + * Receive one chunk of data sent by the server as part of a single COPY + * stream that includes all archives and the manifest. + */ +static void +ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data) +{ + ArchiveStreamState *state = callback_data; + size_t cursor = 0; + + /* Each CopyData message begins with a type byte. */ + switch (GetCopyDataByte(r, copybuf, &cursor)) + { + case 'n': + { + /* New archive. */ + char *archive_name; + char *spclocation; + + /* + * We force a progress report at the end of each tablespace. A + * new tablespace starts when the previous one ends, except in + * the case of the very first one. + */ + if (++state->tablespacenum > 0) + progress_report(state->tablespacenum, true, false); + + /* Sanity check. */ + if (state->manifest_buffer != NULL || + state->manifest_file !=NULL) + { + pg_log_error("archives should precede manifest"); + exit(1); + } + + /* Parse the rest of the CopyData message. */ + archive_name = GetCopyDataString(r, copybuf, &cursor); + spclocation = GetCopyDataString(r, copybuf, &cursor); + GetCopyDataEnd(r, copybuf, cursor); + + /* + * Basic sanity checks on the archive name: it shouldn't be + * empty, it shouldn't start with a dot, and it shouldn't + * contain a path separator. + */ + if (archive_name[0] == '\0' || archive_name[0] == '.' || + strchr(archive_name, '/') != NULL || + strchr(archive_name, '\\') != NULL) + { + pg_log_error("invalid archive name: \"%s\"", + archive_name); + exit(1); + } + + /* + * An empty spclocation is treated as NULL. We expect this + * case to occur for the data directory itself, but not for + * any archives that correspond to tablespaces. + */ + if (spclocation[0] == '\0') + spclocation = NULL; + + /* End processing of any prior archive. */ + if (state->streamer != NULL) + { + bbstreamer_finalize(state->streamer); + bbstreamer_free(state->streamer); + state->streamer = NULL; + } + + /* + * Create an appropriate backup streamer. We know that + * recovery GUCs are supported, because this protocol can only + * be used on v15+. + */ + state->streamer = + CreateBackupStreamer(archive_name, + spclocation, + &state->manifest_inject_streamer, + true, false); + break; + } + + case 'd': + { + /* Archive or manifest data. */ + if (state->manifest_buffer != NULL) + { + /* Manifest data, buffer in memory. */ + appendPQExpBuffer(state->manifest_buffer, copybuf + 1, + r - 1); + } + else if (state->manifest_file !=NULL) + { + /* Manifest data, write to disk. */ + if (fwrite(copybuf + 1, r - 1, 1, + state->manifest_file) != 1) + { + /* + * If fwrite() didn't set errno, assume that the + * problem is that we're out of disk space. + */ + if (errno == 0) + errno = ENOSPC; + pg_log_error("could not write to file \"%s\": %m", + state->manifest_filename); + exit(1); + } + } + else if (state->streamer != NULL) + { + /* Archive data. */ + bbstreamer_content(state->streamer, NULL, copybuf + 1, + r - 1, BBSTREAMER_UNKNOWN); + } + else + { + pg_log_error("unexpected payload data"); + exit(1); + } + break; + } + + case 'p': + { + /* + * Progress report. + * + * The remainder of the message is expected to be an 8-byte + * count of bytes completed. + */ + totaldone = GetCopyDataUInt64(r, copybuf, &cursor); + GetCopyDataEnd(r, copybuf, cursor); + + /* + * The server shouldn't send progres report messages too + * often, so we force an update each time we receive one. + */ + progress_report(state->tablespacenum, true, false); + break; + } + + case 'm': + { + /* + * Manifest data will be sent next. This message is not + * expected to have any further payload data. + */ + GetCopyDataEnd(r, copybuf, cursor); + + /* + * If we're supposed inject the manifest into the archive, we + * prepare to buffer it in memory; otherwise, we prepare to + * write it to a temporary file. + */ + if (state->manifest_inject_streamer != NULL) + state->manifest_buffer = createPQExpBuffer(); + else + { + snprintf(state->manifest_filename, + sizeof(state->manifest_filename), + "%s/backup_manifest.tmp", basedir); + state->manifest_file = + fopen(state->manifest_filename, "wb"); + if (state->manifest_file == NULL) + { + pg_log_error("could not create file \"%s\": %m", + state->manifest_filename); + exit(1); + } + } + break; + } + + default: + ReportCopyDataParseError(r, copybuf); + break; + } +} + +/* + * Get a single byte from a CopyData message. + * + * Bail out if none remain. + */ +static char +GetCopyDataByte(size_t r, char *copybuf, size_t *cursor) +{ + if (*cursor >= r) + ReportCopyDataParseError(r, copybuf); + + return copybuf[(*cursor)++]; +} + +/* + * Get a NUL-terminated string from a CopyData message. + * + * Bail out if the terminating NUL cannot be found. + */ +static char * +GetCopyDataString(size_t r, char *copybuf, size_t *cursor) +{ + size_t startpos = *cursor; + size_t endpos = startpos; + + while (1) + { + if (endpos >= r) + ReportCopyDataParseError(r, copybuf); + if (copybuf[endpos] == '\0') + break; + ++endpos; + } + + *cursor = endpos + 1; + return ©buf[startpos]; +} + +/* + * Get an unsigned 64-bit integer from a CopyData message. + * + * Bail out if there are not at least 8 bytes remaining. + */ +static uint64 +GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor) +{ + uint64 result; + + if (*cursor + sizeof(uint64) > r) + ReportCopyDataParseError(r, copybuf); + memcpy(&result, ©buf[*cursor], sizeof(uint64)); + *cursor += sizeof(uint64); + return pg_ntoh64(result); +} + +/* + * Bail out if we didn't parse the whole message. + */ +static void +GetCopyDataEnd(size_t r, char *copybuf, size_t cursor) +{ + if (r != cursor) + ReportCopyDataParseError(r, copybuf); +} + +/* + * Report failure to parse a CopyData message from the server. Then exit. + * + * As a debugging aid, we try to give some hint about what kind of message + * provoked the failure. Perhaps this is not detailed enough, but it's not + * clear that it's worth expending any more code on what shoud be a + * can't-happen case. + */ +static void +ReportCopyDataParseError(size_t r, char *copybuf) +{ + if (r == 0) + pg_log_error("empty COPY message"); + else + pg_log_error("malformed COPY message of type %d, length %zu", + copybuf[0], r); + exit(1); +} + +/* * Receive raw tar data from the server, and stream it to the appropriate * location. If we're writing a single tarfile to standard output, also * receive the backup manifest and inject it into that tarfile. @@ -1376,6 +1704,10 @@ BaseBackup(void) "MANIFEST_CHECKSUMS", manifest_checksums); } + if (serverMajor >= 1500) + AppendStringCommandOption(&buf, use_new_option_syntax, + "TARGET", "client"); + if (verbose) pg_log_info("initiating base backup, waiting for checkpoint to complete"); @@ -1498,46 +1830,56 @@ BaseBackup(void) StartLogStreamer(xlogstart, starttli, sysidentifier); } - /* Receive a tar file for each tablespace in turn */ - for (i = 0; i < PQntuples(res); i++) + if (serverMajor >= 1500) { - char archive_name[MAXPGPATH]; - char *spclocation; - - /* - * If we write the data out to a tar file, it will be named base.tar - * if it's the main data directory or <tablespaceoid>.tar if it's for - * another tablespace. CreateBackupStreamer() will arrange to add .gz - * to the archive name if pg_basebackup is performing compression. - */ - if (PQgetisnull(res, i, 0)) - { - strlcpy(archive_name, "base.tar", sizeof(archive_name)); - spclocation = NULL; - } - else + /* Receive a single tar stream with everything. */ + ReceiveArchiveStream(conn); + } + else + { + /* Receive a tar file for each tablespace in turn */ + for (i = 0; i < PQntuples(res); i++) { - snprintf(archive_name, sizeof(archive_name), - "%s.tar", PQgetvalue(res, i, 0)); - spclocation = PQgetvalue(res, i, 1); + char archive_name[MAXPGPATH]; + char *spclocation; + + /* + * If we write the data out to a tar file, it will be named + * base.tar if it's the main data directory or <tablespaceoid>.tar + * if it's for another tablespace. CreateBackupStreamer() will + * arrange to add .gz to the archive name if pg_basebackup is + * performing compression. + */ + if (PQgetisnull(res, i, 0)) + { + strlcpy(archive_name, "base.tar", sizeof(archive_name)); + spclocation = NULL; + } + else + { + snprintf(archive_name, sizeof(archive_name), + "%s.tar", PQgetvalue(res, i, 0)); + spclocation = PQgetvalue(res, i, 1); + } + + ReceiveTarFile(conn, archive_name, spclocation, i); } - ReceiveTarFile(conn, archive_name, spclocation, i); + /* + * Now receive backup manifest, if appropriate. + * + * If we're writing a tarfile to stdout, ReceiveTarFile will have + * already processed the backup manifest and included it in the output + * tarfile. Such a configuration doesn't allow for writing multiple + * files. + * + * If we're talking to an older server, it won't send a backup + * manifest, so don't try to receive one. + */ + if (!writing_to_stdout && manifest) + ReceiveBackupManifest(conn); } - /* - * Now receive backup manifest, if appropriate. - * - * If we're writing a tarfile to stdout, ReceiveTarFile will have already - * processed the backup manifest and included it in the output tarfile. - * Such a configuration doesn't allow for writing multiple files. - * - * If we're talking to an older server, it won't send a backup manifest, - * so don't try to receive one. - */ - if (!writing_to_stdout && manifest) - ReceiveBackupManifest(conn); - if (showprogress) { progress_filename = NULL; |