aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/pg_basebackup.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/pg_basebackup.c')
-rw-r--r--src/bin/pg_basebackup/pg_basebackup.c104
1 files changed, 52 insertions, 52 deletions
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 8f3dd04fd22..1966ada69c9 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -26,8 +26,8 @@
#endif
#include "access/xlog_internal.h"
+#include "astreamer.h"
#include "backup/basebackup.h"
-#include "bbstreamer.h"
#include "common/compression.h"
#include "common/file_perm.h"
#include "common/file_utils.h"
@@ -57,8 +57,8 @@ typedef struct ArchiveStreamState
{
int tablespacenum;
pg_compress_specification *compress;
- bbstreamer *streamer;
- bbstreamer *manifest_inject_streamer;
+ astreamer *streamer;
+ astreamer *manifest_inject_streamer;
PQExpBuffer manifest_buffer;
char manifest_filename[MAXPGPATH];
FILE *manifest_file;
@@ -67,7 +67,7 @@ typedef struct ArchiveStreamState
typedef struct WriteTarState
{
int tablespacenum;
- bbstreamer *streamer;
+ astreamer *streamer;
} WriteTarState;
typedef struct WriteManifestState
@@ -199,11 +199,11 @@ static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *fo
static void progress_update_filename(const char *filename);
static void progress_report(int tablespacenum, bool force, bool finished);
-static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation,
- bbstreamer **manifest_inject_streamer_p,
- bool is_recovery_guc_supported,
- bool expect_unterminated_tarfile,
- pg_compress_specification *compress);
+static astreamer *CreateBackupStreamer(char *archive_name, char *spclocation,
+ astreamer **manifest_inject_streamer_p,
+ bool is_recovery_guc_supported,
+ bool expect_unterminated_tarfile,
+ pg_compress_specification *compress);
static void ReceiveArchiveStreamChunk(size_t r, char *copybuf,
void *callback_data);
static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor);
@@ -1053,19 +1053,19 @@ ReceiveCopyData(PGconn *conn, WriteDataCallback callback,
* the options selected by the user. We may just write the results directly
* to a file, or we might compress first, or we might extract the tar file
* and write each member separately. This function doesn't do any of that
- * directly, but it works out what kind of bbstreamer we need to create so
+ * directly, but it works out what kind of astreamer we need to create so
* that the right stuff happens when, down the road, we actually receive
* the data.
*/
-static bbstreamer *
+static astreamer *
CreateBackupStreamer(char *archive_name, char *spclocation,
- bbstreamer **manifest_inject_streamer_p,
+ astreamer **manifest_inject_streamer_p,
bool is_recovery_guc_supported,
bool expect_unterminated_tarfile,
pg_compress_specification *compress)
{
- bbstreamer *streamer = NULL;
- bbstreamer *manifest_inject_streamer = NULL;
+ astreamer *streamer = NULL;
+ astreamer *manifest_inject_streamer = NULL;
bool inject_manifest;
bool is_tar,
is_tar_gz,
@@ -1160,9 +1160,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
directory = psprintf("%s/%s", basedir, spclocation);
else
directory = get_tablespace_mapping(spclocation);
- streamer = bbstreamer_extractor_new(directory,
- get_tablespace_mapping,
- progress_update_filename);
+ streamer = astreamer_extractor_new(directory,
+ get_tablespace_mapping,
+ progress_update_filename);
}
else
{
@@ -1188,27 +1188,27 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
}
if (compress->algorithm == PG_COMPRESSION_NONE)
- streamer = bbstreamer_plain_writer_new(archive_filename,
- archive_file);
+ streamer = astreamer_plain_writer_new(archive_filename,
+ archive_file);
else if (compress->algorithm == PG_COMPRESSION_GZIP)
{
strlcat(archive_filename, ".gz", sizeof(archive_filename));
- streamer = bbstreamer_gzip_writer_new(archive_filename,
- archive_file, compress);
+ streamer = astreamer_gzip_writer_new(archive_filename,
+ archive_file, compress);
}
else if (compress->algorithm == PG_COMPRESSION_LZ4)
{
strlcat(archive_filename, ".lz4", sizeof(archive_filename));
- streamer = bbstreamer_plain_writer_new(archive_filename,
- archive_file);
- streamer = bbstreamer_lz4_compressor_new(streamer, compress);
+ streamer = astreamer_plain_writer_new(archive_filename,
+ archive_file);
+ streamer = astreamer_lz4_compressor_new(streamer, compress);
}
else if (compress->algorithm == PG_COMPRESSION_ZSTD)
{
strlcat(archive_filename, ".zst", sizeof(archive_filename));
- streamer = bbstreamer_plain_writer_new(archive_filename,
- archive_file);
- streamer = bbstreamer_zstd_compressor_new(streamer, compress);
+ streamer = astreamer_plain_writer_new(archive_filename,
+ archive_file);
+ streamer = astreamer_zstd_compressor_new(streamer, compress);
}
else
{
@@ -1222,7 +1222,7 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
* into it.
*/
if (must_parse_archive)
- streamer = bbstreamer_tar_archiver_new(streamer);
+ streamer = astreamer_tar_archiver_new(streamer);
progress_update_filename(archive_filename);
}
@@ -1241,9 +1241,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
if (spclocation == NULL && writerecoveryconf)
{
Assert(must_parse_archive);
- streamer = bbstreamer_recovery_injector_new(streamer,
- is_recovery_guc_supported,
- recoveryconfcontents);
+ streamer = astreamer_recovery_injector_new(streamer,
+ is_recovery_guc_supported,
+ recoveryconfcontents);
}
/*
@@ -1253,9 +1253,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
* we're talking to such a server we'll need to add the terminator here.
*/
if (must_parse_archive)
- streamer = bbstreamer_tar_parser_new(streamer);
+ streamer = astreamer_tar_parser_new(streamer);
else if (expect_unterminated_tarfile)
- streamer = bbstreamer_tar_terminator_new(streamer);
+ streamer = astreamer_tar_terminator_new(streamer);
/*
* If the user has requested a server compressed archive along with
@@ -1264,11 +1264,11 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
if (format == 'p')
{
if (is_tar_gz)
- streamer = bbstreamer_gzip_decompressor_new(streamer);
+ streamer = astreamer_gzip_decompressor_new(streamer);
else if (is_tar_lz4)
- streamer = bbstreamer_lz4_decompressor_new(streamer);
+ streamer = astreamer_lz4_decompressor_new(streamer);
else if (is_tar_zstd)
- streamer = bbstreamer_zstd_decompressor_new(streamer);
+ streamer = astreamer_zstd_decompressor_new(streamer);
}
/* Return the results. */
@@ -1307,10 +1307,10 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress)
if (state.manifest_inject_streamer != NULL &&
state.manifest_buffer != NULL)
{
- bbstreamer_inject_file(state.manifest_inject_streamer,
- "backup_manifest",
- state.manifest_buffer->data,
- state.manifest_buffer->len);
+ astreamer_inject_file(state.manifest_inject_streamer,
+ "backup_manifest",
+ state.manifest_buffer->data,
+ state.manifest_buffer->len);
destroyPQExpBuffer(state.manifest_buffer);
state.manifest_buffer = NULL;
}
@@ -1318,8 +1318,8 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress)
/* If there's still an archive in progress, end processing. */
if (state.streamer != NULL)
{
- bbstreamer_finalize(state.streamer);
- bbstreamer_free(state.streamer);
+ astreamer_finalize(state.streamer);
+ astreamer_free(state.streamer);
state.streamer = NULL;
}
}
@@ -1383,8 +1383,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
/* End processing of any prior archive. */
if (state->streamer != NULL)
{
- bbstreamer_finalize(state->streamer);
- bbstreamer_free(state->streamer);
+ astreamer_finalize(state->streamer);
+ astreamer_free(state->streamer);
state->streamer = NULL;
}
@@ -1437,8 +1437,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
else if (state->streamer != NULL)
{
/* Archive data. */
- bbstreamer_content(state->streamer, NULL, copybuf + 1,
- r - 1, BBSTREAMER_UNKNOWN);
+ astreamer_content(state->streamer, NULL, copybuf + 1,
+ r - 1, ASTREAMER_UNKNOWN);
}
else
pg_fatal("unexpected payload data");
@@ -1600,7 +1600,7 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
bool tablespacenum, pg_compress_specification *compress)
{
WriteTarState state;
- bbstreamer *manifest_inject_streamer;
+ astreamer *manifest_inject_streamer;
bool is_recovery_guc_supported;
bool expect_unterminated_tarfile;
@@ -1636,16 +1636,16 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
pg_fatal("out of memory");
/* Inject it into the output tarfile. */
- bbstreamer_inject_file(manifest_inject_streamer, "backup_manifest",
- buf.data, buf.len);
+ astreamer_inject_file(manifest_inject_streamer, "backup_manifest",
+ buf.data, buf.len);
/* Free memory. */
termPQExpBuffer(&buf);
}
/* Cleanup. */
- bbstreamer_finalize(state.streamer);
- bbstreamer_free(state.streamer);
+ astreamer_finalize(state.streamer);
+ astreamer_free(state.streamer);
progress_report(tablespacenum, true, false);
@@ -1663,7 +1663,7 @@ ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data)
{
WriteTarState *state = callback_data;
- bbstreamer_content(state->streamer, NULL, copybuf, r, BBSTREAMER_UNKNOWN);
+ astreamer_content(state->streamer, NULL, copybuf, r, ASTREAMER_UNKNOWN);
totaldone += r;
progress_report(state->tablespacenum, false, false);