diff options
Diffstat (limited to 'src/bin/pg_basebackup/pg_basebackup.c')
-rw-r--r-- | src/bin/pg_basebackup/pg_basebackup.c | 104 |
1 files changed, 52 insertions, 52 deletions
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 8f3dd04fd22..1966ada69c9 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -26,8 +26,8 @@ #endif #include "access/xlog_internal.h" +#include "astreamer.h" #include "backup/basebackup.h" -#include "bbstreamer.h" #include "common/compression.h" #include "common/file_perm.h" #include "common/file_utils.h" @@ -57,8 +57,8 @@ typedef struct ArchiveStreamState { int tablespacenum; pg_compress_specification *compress; - bbstreamer *streamer; - bbstreamer *manifest_inject_streamer; + astreamer *streamer; + astreamer *manifest_inject_streamer; PQExpBuffer manifest_buffer; char manifest_filename[MAXPGPATH]; FILE *manifest_file; @@ -67,7 +67,7 @@ typedef struct ArchiveStreamState typedef struct WriteTarState { int tablespacenum; - bbstreamer *streamer; + astreamer *streamer; } WriteTarState; typedef struct WriteManifestState @@ -199,11 +199,11 @@ static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *fo static void progress_update_filename(const char *filename); static void progress_report(int tablespacenum, bool force, bool finished); -static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation, - bbstreamer **manifest_inject_streamer_p, - bool is_recovery_guc_supported, - bool expect_unterminated_tarfile, - pg_compress_specification *compress); +static astreamer *CreateBackupStreamer(char *archive_name, char *spclocation, + astreamer **manifest_inject_streamer_p, + bool is_recovery_guc_supported, + bool expect_unterminated_tarfile, + pg_compress_specification *compress); static void ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data); static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor); @@ -1053,19 +1053,19 @@ ReceiveCopyData(PGconn *conn, WriteDataCallback callback, * the options selected by the user. We may just write the results directly * to a file, or we might compress first, or we might extract the tar file * and write each member separately. This function doesn't do any of that - * directly, but it works out what kind of bbstreamer we need to create so + * directly, but it works out what kind of astreamer we need to create so * that the right stuff happens when, down the road, we actually receive * the data. */ -static bbstreamer * +static astreamer * CreateBackupStreamer(char *archive_name, char *spclocation, - bbstreamer **manifest_inject_streamer_p, + astreamer **manifest_inject_streamer_p, bool is_recovery_guc_supported, bool expect_unterminated_tarfile, pg_compress_specification *compress) { - bbstreamer *streamer = NULL; - bbstreamer *manifest_inject_streamer = NULL; + astreamer *streamer = NULL; + astreamer *manifest_inject_streamer = NULL; bool inject_manifest; bool is_tar, is_tar_gz, @@ -1160,9 +1160,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation, directory = psprintf("%s/%s", basedir, spclocation); else directory = get_tablespace_mapping(spclocation); - streamer = bbstreamer_extractor_new(directory, - get_tablespace_mapping, - progress_update_filename); + streamer = astreamer_extractor_new(directory, + get_tablespace_mapping, + progress_update_filename); } else { @@ -1188,27 +1188,27 @@ CreateBackupStreamer(char *archive_name, char *spclocation, } if (compress->algorithm == PG_COMPRESSION_NONE) - streamer = bbstreamer_plain_writer_new(archive_filename, - archive_file); + streamer = astreamer_plain_writer_new(archive_filename, + archive_file); else if (compress->algorithm == PG_COMPRESSION_GZIP) { strlcat(archive_filename, ".gz", sizeof(archive_filename)); - streamer = bbstreamer_gzip_writer_new(archive_filename, - archive_file, compress); + streamer = astreamer_gzip_writer_new(archive_filename, + archive_file, compress); } else if (compress->algorithm == PG_COMPRESSION_LZ4) { strlcat(archive_filename, ".lz4", sizeof(archive_filename)); - streamer = bbstreamer_plain_writer_new(archive_filename, - archive_file); - streamer = bbstreamer_lz4_compressor_new(streamer, compress); + streamer = astreamer_plain_writer_new(archive_filename, + archive_file); + streamer = astreamer_lz4_compressor_new(streamer, compress); } else if (compress->algorithm == PG_COMPRESSION_ZSTD) { strlcat(archive_filename, ".zst", sizeof(archive_filename)); - streamer = bbstreamer_plain_writer_new(archive_filename, - archive_file); - streamer = bbstreamer_zstd_compressor_new(streamer, compress); + streamer = astreamer_plain_writer_new(archive_filename, + archive_file); + streamer = astreamer_zstd_compressor_new(streamer, compress); } else { @@ -1222,7 +1222,7 @@ CreateBackupStreamer(char *archive_name, char *spclocation, * into it. */ if (must_parse_archive) - streamer = bbstreamer_tar_archiver_new(streamer); + streamer = astreamer_tar_archiver_new(streamer); progress_update_filename(archive_filename); } @@ -1241,9 +1241,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation, if (spclocation == NULL && writerecoveryconf) { Assert(must_parse_archive); - streamer = bbstreamer_recovery_injector_new(streamer, - is_recovery_guc_supported, - recoveryconfcontents); + streamer = astreamer_recovery_injector_new(streamer, + is_recovery_guc_supported, + recoveryconfcontents); } /* @@ -1253,9 +1253,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation, * we're talking to such a server we'll need to add the terminator here. */ if (must_parse_archive) - streamer = bbstreamer_tar_parser_new(streamer); + streamer = astreamer_tar_parser_new(streamer); else if (expect_unterminated_tarfile) - streamer = bbstreamer_tar_terminator_new(streamer); + streamer = astreamer_tar_terminator_new(streamer); /* * If the user has requested a server compressed archive along with @@ -1264,11 +1264,11 @@ CreateBackupStreamer(char *archive_name, char *spclocation, if (format == 'p') { if (is_tar_gz) - streamer = bbstreamer_gzip_decompressor_new(streamer); + streamer = astreamer_gzip_decompressor_new(streamer); else if (is_tar_lz4) - streamer = bbstreamer_lz4_decompressor_new(streamer); + streamer = astreamer_lz4_decompressor_new(streamer); else if (is_tar_zstd) - streamer = bbstreamer_zstd_decompressor_new(streamer); + streamer = astreamer_zstd_decompressor_new(streamer); } /* Return the results. */ @@ -1307,10 +1307,10 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress) if (state.manifest_inject_streamer != NULL && state.manifest_buffer != NULL) { - bbstreamer_inject_file(state.manifest_inject_streamer, - "backup_manifest", - state.manifest_buffer->data, - state.manifest_buffer->len); + astreamer_inject_file(state.manifest_inject_streamer, + "backup_manifest", + state.manifest_buffer->data, + state.manifest_buffer->len); destroyPQExpBuffer(state.manifest_buffer); state.manifest_buffer = NULL; } @@ -1318,8 +1318,8 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress) /* If there's still an archive in progress, end processing. */ if (state.streamer != NULL) { - bbstreamer_finalize(state.streamer); - bbstreamer_free(state.streamer); + astreamer_finalize(state.streamer); + astreamer_free(state.streamer); state.streamer = NULL; } } @@ -1383,8 +1383,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data) /* End processing of any prior archive. */ if (state->streamer != NULL) { - bbstreamer_finalize(state->streamer); - bbstreamer_free(state->streamer); + astreamer_finalize(state->streamer); + astreamer_free(state->streamer); state->streamer = NULL; } @@ -1437,8 +1437,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data) else if (state->streamer != NULL) { /* Archive data. */ - bbstreamer_content(state->streamer, NULL, copybuf + 1, - r - 1, BBSTREAMER_UNKNOWN); + astreamer_content(state->streamer, NULL, copybuf + 1, + r - 1, ASTREAMER_UNKNOWN); } else pg_fatal("unexpected payload data"); @@ -1600,7 +1600,7 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, bool tablespacenum, pg_compress_specification *compress) { WriteTarState state; - bbstreamer *manifest_inject_streamer; + astreamer *manifest_inject_streamer; bool is_recovery_guc_supported; bool expect_unterminated_tarfile; @@ -1636,16 +1636,16 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, pg_fatal("out of memory"); /* Inject it into the output tarfile. */ - bbstreamer_inject_file(manifest_inject_streamer, "backup_manifest", - buf.data, buf.len); + astreamer_inject_file(manifest_inject_streamer, "backup_manifest", + buf.data, buf.len); /* Free memory. */ termPQExpBuffer(&buf); } /* Cleanup. */ - bbstreamer_finalize(state.streamer); - bbstreamer_free(state.streamer); + astreamer_finalize(state.streamer); + astreamer_free(state.streamer); progress_report(tablespacenum, true, false); @@ -1663,7 +1663,7 @@ ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data) { WriteTarState *state = callback_data; - bbstreamer_content(state->streamer, NULL, copybuf, r, BBSTREAMER_UNKNOWN); + astreamer_content(state->streamer, NULL, copybuf, r, ASTREAMER_UNKNOWN); totaldone += r; progress_report(state->tablespacenum, false, false); |