From 3c905698114d6c4de4dc607c110c27e0723ae70c Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Mon, 5 Aug 2024 09:35:42 -0400 Subject: Rename bbstreamer to astreamer. I (rhaas) intended "bbstreamer" to stand for "base backup streamer," but that implies that this infrastructure can only ever be used by pg_basebackup. In fact, it is a generally useful way of streaming data from a tar or compressed tar file, and it could be extended to work with other archive formats as well if we ever wanted to do that. Hence, rename it to "astreamer" (archive streamer) in preparation for reusing the infrastructure from pg_verifybackup (and perhaps eventually also other utilities, such as pg_combinebackup or pg_waldump). This is purely a renaming commit. Comment adjustments and relocation of the actual code to someplace from which it can be reused are left to future commits. Amul Sul, reviewed by Sravan Kumar and by me. Discussion: http://postgr.es/m/CAAJ_b94StvLWrc_p4q-f7n3OPfr6GhL8_XuAg2aAaYZp1tF-nw@mail.gmail.com --- src/bin/pg_basebackup/Makefile | 12 +- src/bin/pg_basebackup/astreamer.h | 226 +++++++++++++ src/bin/pg_basebackup/astreamer_file.c | 396 +++++++++++++++++++++++ src/bin/pg_basebackup/astreamer_gzip.c | 364 +++++++++++++++++++++ src/bin/pg_basebackup/astreamer_inject.c | 249 +++++++++++++++ src/bin/pg_basebackup/astreamer_lz4.c | 422 ++++++++++++++++++++++++ src/bin/pg_basebackup/astreamer_tar.c | 514 ++++++++++++++++++++++++++++++ src/bin/pg_basebackup/astreamer_zstd.c | 368 +++++++++++++++++++++ src/bin/pg_basebackup/bbstreamer.h | 226 ------------- src/bin/pg_basebackup/bbstreamer_file.c | 396 ----------------------- src/bin/pg_basebackup/bbstreamer_gzip.c | 364 --------------------- src/bin/pg_basebackup/bbstreamer_inject.c | 249 --------------- src/bin/pg_basebackup/bbstreamer_lz4.c | 422 ------------------------ src/bin/pg_basebackup/bbstreamer_tar.c | 514 ------------------------------ src/bin/pg_basebackup/bbstreamer_zstd.c | 368 --------------------- src/bin/pg_basebackup/meson.build | 12 +- src/bin/pg_basebackup/nls.mk | 12 +- src/bin/pg_basebackup/pg_basebackup.c | 104 +++--- src/tools/pgindent/typedefs.list | 26 +- 19 files changed, 2622 insertions(+), 2622 deletions(-) create mode 100644 src/bin/pg_basebackup/astreamer.h create mode 100644 src/bin/pg_basebackup/astreamer_file.c create mode 100644 src/bin/pg_basebackup/astreamer_gzip.c create mode 100644 src/bin/pg_basebackup/astreamer_inject.c create mode 100644 src/bin/pg_basebackup/astreamer_lz4.c create mode 100644 src/bin/pg_basebackup/astreamer_tar.c create mode 100644 src/bin/pg_basebackup/astreamer_zstd.c delete mode 100644 src/bin/pg_basebackup/bbstreamer.h delete mode 100644 src/bin/pg_basebackup/bbstreamer_file.c delete mode 100644 src/bin/pg_basebackup/bbstreamer_gzip.c delete mode 100644 src/bin/pg_basebackup/bbstreamer_inject.c delete mode 100644 src/bin/pg_basebackup/bbstreamer_lz4.c delete mode 100644 src/bin/pg_basebackup/bbstreamer_tar.c delete mode 100644 src/bin/pg_basebackup/bbstreamer_zstd.c (limited to 'src') diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index 26c53e473f5..a71af2d48a7 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -37,12 +37,12 @@ OBJS = \ BBOBJS = \ pg_basebackup.o \ - bbstreamer_file.o \ - bbstreamer_gzip.o \ - bbstreamer_inject.o \ - bbstreamer_lz4.o \ - bbstreamer_tar.o \ - bbstreamer_zstd.o + astreamer_file.o \ + astreamer_gzip.o \ + astreamer_inject.o \ + astreamer_lz4.o \ + astreamer_tar.o \ + astreamer_zstd.o all: pg_basebackup pg_createsubscriber pg_receivewal pg_recvlogical diff --git a/src/bin/pg_basebackup/astreamer.h b/src/bin/pg_basebackup/astreamer.h new file mode 100644 index 00000000000..b5ed138f54e --- /dev/null +++ b/src/bin/pg_basebackup/astreamer.h @@ -0,0 +1,226 @@ +/*------------------------------------------------------------------------- + * + * astreamer.h + * + * Each tar archive returned by the server is passed to one or more + * astreamer objects for further processing. The astreamer may do + * something simple, like write the archive to a file, perhaps after + * compressing it, but it can also do more complicated things, like + * annotating the byte stream to indicate which parts of the data + * correspond to tar headers or trailing padding, vs. which parts are + * payload data. A subsequent astreamer may use this information to + * make further decisions about how to process the data; for example, + * it might choose to modify the archive contents. + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer.h + *------------------------------------------------------------------------- + */ + +#ifndef ASTREAMER_H +#define ASTREAMER_H + +#include "common/compression.h" +#include "lib/stringinfo.h" +#include "pqexpbuffer.h" + +struct astreamer; +struct astreamer_ops; +typedef struct astreamer astreamer; +typedef struct astreamer_ops astreamer_ops; + +/* + * Each chunk of archive data passed to a astreamer is classified into one + * of these categories. When data is first received from the remote server, + * each chunk will be categorized as ASTREAMER_UNKNOWN, and the chunks will + * be of whatever size the remote server chose to send. + * + * If the archive is parsed (e.g. see astreamer_tar_parser_new()), then all + * chunks should be labelled as one of the other types listed here. In + * addition, there should be exactly one ASTREAMER_MEMBER_HEADER chunk and + * exactly one ASTREAMER_MEMBER_TRAILER chunk per archive member, even if + * that means a zero-length call. There can be any number of + * ASTREAMER_MEMBER_CONTENTS chunks in between those calls. There + * should exactly ASTREAMER_ARCHIVE_TRAILER chunk, and it should follow the + * last ASTREAMER_MEMBER_TRAILER chunk. + * + * In theory, we could need other classifications here, such as a way of + * indicating an archive header, but the "tar" format doesn't need anything + * else, so for the time being there's no point. + */ +typedef enum +{ + ASTREAMER_UNKNOWN, + ASTREAMER_MEMBER_HEADER, + ASTREAMER_MEMBER_CONTENTS, + ASTREAMER_MEMBER_TRAILER, + ASTREAMER_ARCHIVE_TRAILER, +} astreamer_archive_context; + +/* + * Each chunk of data that is classified as ASTREAMER_MEMBER_HEADER, + * ASTREAMER_MEMBER_CONTENTS, or ASTREAMER_MEMBER_TRAILER should also + * pass a pointer to an instance of this struct. The details are expected + * to be present in the archive header and used to fill the struct, after + * which all subsequent calls for the same archive member are expected to + * pass the same details. + */ +typedef struct +{ + char pathname[MAXPGPATH]; + pgoff_t size; + mode_t mode; + uid_t uid; + gid_t gid; + bool is_directory; + bool is_link; + char linktarget[MAXPGPATH]; +} astreamer_member; + +/* + * Generally, each type of astreamer will define its own struct, but the + * first element should be 'astreamer base'. A astreamer that does not + * require any additional private data could use this structure directly. + * + * bbs_ops is a pointer to the astreamer_ops object which contains the + * function pointers appropriate to this type of astreamer. + * + * bbs_next is a pointer to the successor astreamer, for those types of + * astreamer which forward data to a successor. It need not be used and + * should be set to NULL when not relevant. + * + * bbs_buffer is a buffer for accumulating data for temporary storage. Each + * type of astreamer makes its own decisions about whether and how to use + * this buffer. + */ +struct astreamer +{ + const astreamer_ops *bbs_ops; + astreamer *bbs_next; + StringInfoData bbs_buffer; +}; + +/* + * There are three callbacks for a astreamer. The 'content' callback is + * called repeatedly, as described in the astreamer_archive_context comments. + * Then, the 'finalize' callback is called once at the end, to give the + * astreamer a chance to perform cleanup such as closing files. Finally, + * because this code is running in a frontend environment where, as of this + * writing, there are no memory contexts, the 'free' callback is called to + * release memory. These callbacks should always be invoked using the static + * inline functions defined below. + */ +struct astreamer_ops +{ + void (*content) (astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); + void (*finalize) (astreamer *streamer); + void (*free) (astreamer *streamer); +}; + +/* Send some content to a astreamer. */ +static inline void +astreamer_content(astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + Assert(streamer != NULL); + streamer->bbs_ops->content(streamer, member, data, len, context); +} + +/* Finalize a astreamer. */ +static inline void +astreamer_finalize(astreamer *streamer) +{ + Assert(streamer != NULL); + streamer->bbs_ops->finalize(streamer); +} + +/* Free a astreamer. */ +static inline void +astreamer_free(astreamer *streamer) +{ + Assert(streamer != NULL); + streamer->bbs_ops->free(streamer); +} + +/* + * This is a convenience method for use when implementing a astreamer; it is + * not for use by outside callers. It adds the amount of data specified by + * 'nbytes' to the astreamer's buffer and adjusts '*len' and '*data' + * accordingly. + */ +static inline void +astreamer_buffer_bytes(astreamer *streamer, const char **data, int *len, + int nbytes) +{ + Assert(nbytes <= *len); + + appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes); + *len -= nbytes; + *data += nbytes; +} + +/* + * This is a convenience method for use when implementing a astreamer; it is + * not for use by outsider callers. It attempts to add enough data to the + * astreamer's buffer to reach a length of target_bytes and adjusts '*len' + * and '*data' accordingly. It returns true if the target length has been + * reached and false otherwise. + */ +static inline bool +astreamer_buffer_until(astreamer *streamer, const char **data, int *len, + int target_bytes) +{ + int buflen = streamer->bbs_buffer.len; + + if (buflen >= target_bytes) + { + /* Target length already reached; nothing to do. */ + return true; + } + + if (buflen + *len < target_bytes) + { + /* Not enough data to reach target length; buffer all of it. */ + astreamer_buffer_bytes(streamer, data, len, *len); + return false; + } + + /* Buffer just enough to reach the target length. */ + astreamer_buffer_bytes(streamer, data, len, target_bytes - buflen); + return true; +} + +/* + * Functions for creating astreamer objects of various types. See the header + * comments for each of these functions for details. + */ +extern astreamer *astreamer_plain_writer_new(char *pathname, FILE *file); +extern astreamer *astreamer_gzip_writer_new(char *pathname, FILE *file, + pg_compress_specification *compress); +extern astreamer *astreamer_extractor_new(const char *basepath, + const char *(*link_map) (const char *), + void (*report_output_file) (const char *)); + +extern astreamer *astreamer_gzip_decompressor_new(astreamer *next); +extern astreamer *astreamer_lz4_compressor_new(astreamer *next, + pg_compress_specification *compress); +extern astreamer *astreamer_lz4_decompressor_new(astreamer *next); +extern astreamer *astreamer_zstd_compressor_new(astreamer *next, + pg_compress_specification *compress); +extern astreamer *astreamer_zstd_decompressor_new(astreamer *next); +extern astreamer *astreamer_tar_parser_new(astreamer *next); +extern astreamer *astreamer_tar_terminator_new(astreamer *next); +extern astreamer *astreamer_tar_archiver_new(astreamer *next); + +extern astreamer *astreamer_recovery_injector_new(astreamer *next, + bool is_recovery_guc_supported, + PQExpBuffer recoveryconfcontents); +extern void astreamer_inject_file(astreamer *streamer, char *pathname, + char *data, int len); + +#endif diff --git a/src/bin/pg_basebackup/astreamer_file.c b/src/bin/pg_basebackup/astreamer_file.c new file mode 100644 index 00000000000..2742385e103 --- /dev/null +++ b/src/bin/pg_basebackup/astreamer_file.c @@ -0,0 +1,396 @@ +/*------------------------------------------------------------------------- + * + * astreamer_file.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_file.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include + +#include "astreamer.h" +#include "common/file_perm.h" +#include "common/logging.h" +#include "common/string.h" + +typedef struct astreamer_plain_writer +{ + astreamer base; + char *pathname; + FILE *file; + bool should_close_file; +} astreamer_plain_writer; + +typedef struct astreamer_extractor +{ + astreamer base; + char *basepath; + const char *(*link_map) (const char *); + void (*report_output_file) (const char *); + char filename[MAXPGPATH]; + FILE *file; +} astreamer_extractor; + +static void astreamer_plain_writer_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_plain_writer_finalize(astreamer *streamer); +static void astreamer_plain_writer_free(astreamer *streamer); + +static const astreamer_ops astreamer_plain_writer_ops = { + .content = astreamer_plain_writer_content, + .finalize = astreamer_plain_writer_finalize, + .free = astreamer_plain_writer_free +}; + +static void astreamer_extractor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_extractor_finalize(astreamer *streamer); +static void astreamer_extractor_free(astreamer *streamer); +static void extract_directory(const char *filename, mode_t mode); +static void extract_link(const char *filename, const char *linktarget); +static FILE *create_file_for_extract(const char *filename, mode_t mode); + +static const astreamer_ops astreamer_extractor_ops = { + .content = astreamer_extractor_content, + .finalize = astreamer_extractor_finalize, + .free = astreamer_extractor_free +}; + +/* + * Create a astreamer that just writes data to a file. + * + * The caller must specify a pathname and may specify a file. The pathname is + * used for error-reporting purposes either way. If file is NULL, the pathname + * also identifies the file to which the data should be written: it is opened + * for writing and closed when done. If file is not NULL, the data is written + * there. + */ +astreamer * +astreamer_plain_writer_new(char *pathname, FILE *file) +{ + astreamer_plain_writer *streamer; + + streamer = palloc0(sizeof(astreamer_plain_writer)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_plain_writer_ops; + + streamer->pathname = pstrdup(pathname); + streamer->file = file; + + if (file == NULL) + { + streamer->file = fopen(pathname, "wb"); + if (streamer->file == NULL) + pg_fatal("could not create file \"%s\": %m", pathname); + streamer->should_close_file = true; + } + + return &streamer->base; +} + +/* + * Write archive content to file. + */ +static void +astreamer_plain_writer_content(astreamer *streamer, + astreamer_member *member, const char *data, + int len, astreamer_archive_context context) +{ + astreamer_plain_writer *mystreamer; + + mystreamer = (astreamer_plain_writer *) streamer; + + if (len == 0) + return; + + errno = 0; + if (fwrite(data, len, 1, mystreamer->file) != 1) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + pg_fatal("could not write to file \"%s\": %m", + mystreamer->pathname); + } +} + +/* + * End-of-archive processing when writing to a plain file consists of closing + * the file if we opened it, but not if the caller provided it. + */ +static void +astreamer_plain_writer_finalize(astreamer *streamer) +{ + astreamer_plain_writer *mystreamer; + + mystreamer = (astreamer_plain_writer *) streamer; + + if (mystreamer->should_close_file && fclose(mystreamer->file) != 0) + pg_fatal("could not close file \"%s\": %m", + mystreamer->pathname); + + mystreamer->file = NULL; + mystreamer->should_close_file = false; +} + +/* + * Free memory associated with this astreamer. + */ +static void +astreamer_plain_writer_free(astreamer *streamer) +{ + astreamer_plain_writer *mystreamer; + + mystreamer = (astreamer_plain_writer *) streamer; + + Assert(!mystreamer->should_close_file); + Assert(mystreamer->base.bbs_next == NULL); + + pfree(mystreamer->pathname); + pfree(mystreamer); +} + +/* + * Create a astreamer that extracts an archive. + * + * All pathnames in the archive are interpreted relative to basepath. + * + * Unlike e.g. astreamer_plain_writer_new() we can't do anything useful here + * with untyped chunks; we need typed chunks which follow the rules described + * in astreamer.h. Assuming we have that, we don't need to worry about the + * original archive format; it's enough to just look at the member information + * provided and write to the corresponding file. + * + * 'link_map' is a function that will be applied to the target of any + * symbolic link, and which should return a replacement pathname to be used + * in its place. If NULL, the symbolic link target is used without + * modification. + * + * 'report_output_file' is a function that will be called each time we open a + * new output file. The pathname to that file is passed as an argument. If + * NULL, the call is skipped. + */ +astreamer * +astreamer_extractor_new(const char *basepath, + const char *(*link_map) (const char *), + void (*report_output_file) (const char *)) +{ + astreamer_extractor *streamer; + + streamer = palloc0(sizeof(astreamer_extractor)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_extractor_ops; + streamer->basepath = pstrdup(basepath); + streamer->link_map = link_map; + streamer->report_output_file = report_output_file; + + return &streamer->base; +} + +/* + * Extract archive contents to the filesystem. + */ +static void +astreamer_extractor_content(astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_extractor *mystreamer = (astreamer_extractor *) streamer; + int fnamelen; + + Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER); + Assert(context != ASTREAMER_UNKNOWN); + + switch (context) + { + case ASTREAMER_MEMBER_HEADER: + Assert(mystreamer->file == NULL); + + /* Prepend basepath. */ + snprintf(mystreamer->filename, sizeof(mystreamer->filename), + "%s/%s", mystreamer->basepath, member->pathname); + + /* Remove any trailing slash. */ + fnamelen = strlen(mystreamer->filename); + if (mystreamer->filename[fnamelen - 1] == '/') + mystreamer->filename[fnamelen - 1] = '\0'; + + /* Dispatch based on file type. */ + if (member->is_directory) + extract_directory(mystreamer->filename, member->mode); + else if (member->is_link) + { + const char *linktarget = member->linktarget; + + if (mystreamer->link_map) + linktarget = mystreamer->link_map(linktarget); + extract_link(mystreamer->filename, linktarget); + } + else + mystreamer->file = + create_file_for_extract(mystreamer->filename, + member->mode); + + /* Report output file change. */ + if (mystreamer->report_output_file) + mystreamer->report_output_file(mystreamer->filename); + break; + + case ASTREAMER_MEMBER_CONTENTS: + if (mystreamer->file == NULL) + break; + + errno = 0; + if (len > 0 && fwrite(data, len, 1, mystreamer->file) != 1) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + pg_fatal("could not write to file \"%s\": %m", + mystreamer->filename); + } + break; + + case ASTREAMER_MEMBER_TRAILER: + if (mystreamer->file == NULL) + break; + fclose(mystreamer->file); + mystreamer->file = NULL; + break; + + case ASTREAMER_ARCHIVE_TRAILER: + break; + + default: + /* Shouldn't happen. */ + pg_fatal("unexpected state while extracting archive"); + } +} + +/* + * Should we tolerate an already-existing directory? + * + * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 clusters) will have been + * created by the wal receiver process. Also, when the WAL directory location + * was specified, pg_wal (or pg_xlog) has already been created as a symbolic + * link before starting the actual backup. So just ignore creation failures + * on related directories. + * + * If in-place tablespaces are used, pg_tblspc and subdirectories may already + * exist when we get here. So tolerate that case, too. + */ +static bool +should_allow_existing_directory(const char *pathname) +{ + const char *filename = last_dir_separator(pathname) + 1; + + if (strcmp(filename, "pg_wal") == 0 || + strcmp(filename, "pg_xlog") == 0 || + strcmp(filename, "archive_status") == 0 || + strcmp(filename, "summaries") == 0 || + strcmp(filename, "pg_tblspc") == 0) + return true; + + if (strspn(filename, "0123456789") == strlen(filename)) + { + const char *pg_tblspc = strstr(pathname, "/pg_tblspc/"); + + return pg_tblspc != NULL && pg_tblspc + 11 == filename; + } + + return false; +} + +/* + * Create a directory. + */ +static void +extract_directory(const char *filename, mode_t mode) +{ + if (mkdir(filename, pg_dir_create_mode) != 0 && + (errno != EEXIST || !should_allow_existing_directory(filename))) + pg_fatal("could not create directory \"%s\": %m", + filename); + +#ifndef WIN32 + if (chmod(filename, mode)) + pg_fatal("could not set permissions on directory \"%s\": %m", + filename); +#endif +} + +/* + * Create a symbolic link. + * + * It's most likely a link in pg_tblspc directory, to the location of a + * tablespace. Apply any tablespace mapping given on the command line + * (--tablespace-mapping). (We blindly apply the mapping without checking that + * the link really is inside pg_tblspc. We don't expect there to be other + * symlinks in a data directory, but if there are, you can call it an + * undocumented feature that you can map them too.) + */ +static void +extract_link(const char *filename, const char *linktarget) +{ + if (symlink(linktarget, filename) != 0) + pg_fatal("could not create symbolic link from \"%s\" to \"%s\": %m", + filename, linktarget); +} + +/* + * Create a regular file. + * + * Return the resulting handle so we can write the content to the file. + */ +static FILE * +create_file_for_extract(const char *filename, mode_t mode) +{ + FILE *file; + + file = fopen(filename, "wb"); + if (file == NULL) + pg_fatal("could not create file \"%s\": %m", filename); + +#ifndef WIN32 + if (chmod(filename, mode)) + pg_fatal("could not set permissions on file \"%s\": %m", + filename); +#endif + + return file; +} + +/* + * End-of-stream processing for extracting an archive. + * + * There's nothing to do here but sanity checking. + */ +static void +astreamer_extractor_finalize(astreamer *streamer) +{ + astreamer_extractor *mystreamer PG_USED_FOR_ASSERTS_ONLY + = (astreamer_extractor *) streamer; + + Assert(mystreamer->file == NULL); +} + +/* + * Free memory. + */ +static void +astreamer_extractor_free(astreamer *streamer) +{ + astreamer_extractor *mystreamer = (astreamer_extractor *) streamer; + + pfree(mystreamer->basepath); + pfree(mystreamer); +} diff --git a/src/bin/pg_basebackup/astreamer_gzip.c b/src/bin/pg_basebackup/astreamer_gzip.c new file mode 100644 index 00000000000..6f7c27afbbc --- /dev/null +++ b/src/bin/pg_basebackup/astreamer_gzip.c @@ -0,0 +1,364 @@ +/*------------------------------------------------------------------------- + * + * astreamer_gzip.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_gzip.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include + +#ifdef HAVE_LIBZ +#include +#endif + +#include "astreamer.h" +#include "common/file_perm.h" +#include "common/logging.h" +#include "common/string.h" + +#ifdef HAVE_LIBZ +typedef struct astreamer_gzip_writer +{ + astreamer base; + char *pathname; + gzFile gzfile; +} astreamer_gzip_writer; + +typedef struct astreamer_gzip_decompressor +{ + astreamer base; + z_stream zstream; + size_t bytes_written; +} astreamer_gzip_decompressor; + +static void astreamer_gzip_writer_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_gzip_writer_finalize(astreamer *streamer); +static void astreamer_gzip_writer_free(astreamer *streamer); +static const char *get_gz_error(gzFile gzf); + +static const astreamer_ops astreamer_gzip_writer_ops = { + .content = astreamer_gzip_writer_content, + .finalize = astreamer_gzip_writer_finalize, + .free = astreamer_gzip_writer_free +}; + +static void astreamer_gzip_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_gzip_decompressor_finalize(astreamer *streamer); +static void astreamer_gzip_decompressor_free(astreamer *streamer); +static void *gzip_palloc(void *opaque, unsigned items, unsigned size); +static void gzip_pfree(void *opaque, void *address); + +static const astreamer_ops astreamer_gzip_decompressor_ops = { + .content = astreamer_gzip_decompressor_content, + .finalize = astreamer_gzip_decompressor_finalize, + .free = astreamer_gzip_decompressor_free +}; +#endif + +/* + * Create a astreamer that just compresses data using gzip, and then writes + * it to a file. + * + * As in the case of astreamer_plain_writer_new, pathname is always used + * for error reporting purposes; if file is NULL, it is also the opened and + * closed so that the data may be written there. + */ +astreamer * +astreamer_gzip_writer_new(char *pathname, FILE *file, + pg_compress_specification *compress) +{ +#ifdef HAVE_LIBZ + astreamer_gzip_writer *streamer; + + streamer = palloc0(sizeof(astreamer_gzip_writer)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_gzip_writer_ops; + + streamer->pathname = pstrdup(pathname); + + if (file == NULL) + { + streamer->gzfile = gzopen(pathname, "wb"); + if (streamer->gzfile == NULL) + pg_fatal("could not create compressed file \"%s\": %m", + pathname); + } + else + { + int fd = dup(fileno(file)); + + if (fd < 0) + pg_fatal("could not duplicate stdout: %m"); + + streamer->gzfile = gzdopen(fd, "wb"); + if (streamer->gzfile == NULL) + pg_fatal("could not open output file: %m"); + } + + if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK) + pg_fatal("could not set compression level %d: %s", + compress->level, get_gz_error(streamer->gzfile)); + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "gzip"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef HAVE_LIBZ +/* + * Write archive content to gzip file. + */ +static void +astreamer_gzip_writer_content(astreamer *streamer, + astreamer_member *member, const char *data, + int len, astreamer_archive_context context) +{ + astreamer_gzip_writer *mystreamer; + + mystreamer = (astreamer_gzip_writer *) streamer; + + if (len == 0) + return; + + errno = 0; + if (gzwrite(mystreamer->gzfile, data, len) != len) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + pg_fatal("could not write to compressed file \"%s\": %s", + mystreamer->pathname, get_gz_error(mystreamer->gzfile)); + } +} + +/* + * End-of-archive processing when writing to a gzip file consists of just + * calling gzclose. + * + * It makes no difference whether we opened the file or the caller did it, + * because libz provides no way of avoiding a close on the underlying file + * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to + * work around this issue, so that the behavior from the caller's viewpoint + * is the same as for astreamer_plain_writer. + */ +static void +astreamer_gzip_writer_finalize(astreamer *streamer) +{ + astreamer_gzip_writer *mystreamer; + + mystreamer = (astreamer_gzip_writer *) streamer; + + errno = 0; /* in case gzclose() doesn't set it */ + if (gzclose(mystreamer->gzfile) != 0) + pg_fatal("could not close compressed file \"%s\": %m", + mystreamer->pathname); + + mystreamer->gzfile = NULL; +} + +/* + * Free memory associated with this astreamer. + */ +static void +astreamer_gzip_writer_free(astreamer *streamer) +{ + astreamer_gzip_writer *mystreamer; + + mystreamer = (astreamer_gzip_writer *) streamer; + + Assert(mystreamer->base.bbs_next == NULL); + Assert(mystreamer->gzfile == NULL); + + pfree(mystreamer->pathname); + pfree(mystreamer); +} + +/* + * Helper function for libz error reporting. + */ +static const char * +get_gz_error(gzFile gzf) +{ + int errnum; + const char *errmsg; + + errmsg = gzerror(gzf, &errnum); + if (errnum == Z_ERRNO) + return strerror(errno); + else + return errmsg; +} +#endif + +/* + * Create a new base backup streamer that performs decompression of gzip + * compressed blocks. + */ +astreamer * +astreamer_gzip_decompressor_new(astreamer *next) +{ +#ifdef HAVE_LIBZ + astreamer_gzip_decompressor *streamer; + z_stream *zs; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_gzip_decompressor)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_gzip_decompressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + + /* Initialize internal stream state for decompression */ + zs = &streamer->zstream; + zs->zalloc = gzip_palloc; + zs->zfree = gzip_pfree; + zs->next_out = (uint8 *) streamer->base.bbs_buffer.data; + zs->avail_out = streamer->base.bbs_buffer.maxlen; + + /* + * Data compression was initialized using deflateInit2 to request a gzip + * header. Similarly, we are using inflateInit2 to initialize data + * decompression. + * + * Per the documentation for inflateInit2, the second argument is + * "windowBits" and its value must be greater than or equal to the value + * provided while compressing the data, so we are using the maximum + * possible value for safety. + */ + if (inflateInit2(zs, 15 + 16) != Z_OK) + pg_fatal("could not initialize compression library"); + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "gzip"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef HAVE_LIBZ +/* + * Decompress the input data to output buffer until we run out of input + * data. Each time the output buffer is full, pass on the decompressed data + * to the next streamer. + */ +static void +astreamer_gzip_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_gzip_decompressor *mystreamer; + z_stream *zs; + + mystreamer = (astreamer_gzip_decompressor *) streamer; + + zs = &mystreamer->zstream; + zs->next_in = (const uint8 *) data; + zs->avail_in = len; + + /* Process the current chunk */ + while (zs->avail_in > 0) + { + int res; + + Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen); + + zs->next_out = (uint8 *) + mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; + zs->avail_out = + mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + + /* + * This call decompresses data starting at zs->next_in and updates + * zs->next_in * and zs->avail_in. It generates output data starting + * at zs->next_out and updates zs->next_out and zs->avail_out + * accordingly. + */ + res = inflate(zs, Z_NO_FLUSH); + + if (res == Z_STREAM_ERROR) + pg_log_error("could not decompress data: %s", zs->msg); + + mystreamer->bytes_written = + mystreamer->base.bbs_buffer.maxlen - zs->avail_out; + + /* If output buffer is full then pass data to next streamer */ + if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, context); + mystreamer->bytes_written = 0; + } + } +} + +/* + * End-of-stream processing. + */ +static void +astreamer_gzip_decompressor_finalize(astreamer *streamer) +{ + astreamer_gzip_decompressor *mystreamer; + + mystreamer = (astreamer_gzip_decompressor *) streamer; + + /* + * End of the stream, if there is some pending data in output buffers then + * we must forward it to next streamer. + */ + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +astreamer_gzip_decompressor_free(astreamer *streamer) +{ + astreamer_free(streamer->bbs_next); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} + +/* + * Wrapper function to adjust the signature of palloc to match what libz + * expects. + */ +static void * +gzip_palloc(void *opaque, unsigned items, unsigned size) +{ + return palloc(items * size); +} + +/* + * Wrapper function to adjust the signature of pfree to match what libz + * expects. + */ +static void +gzip_pfree(void *opaque, void *address) +{ + pfree(address); +} +#endif diff --git a/src/bin/pg_basebackup/astreamer_inject.c b/src/bin/pg_basebackup/astreamer_inject.c new file mode 100644 index 00000000000..7f1decded8d --- /dev/null +++ b/src/bin/pg_basebackup/astreamer_inject.c @@ -0,0 +1,249 @@ +/*------------------------------------------------------------------------- + * + * astreamer_inject.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_inject.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include "astreamer.h" +#include "common/file_perm.h" +#include "common/logging.h" + +typedef struct astreamer_recovery_injector +{ + astreamer base; + bool skip_file; + bool is_recovery_guc_supported; + bool is_postgresql_auto_conf; + bool found_postgresql_auto_conf; + PQExpBuffer recoveryconfcontents; + astreamer_member member; +} astreamer_recovery_injector; + +static void astreamer_recovery_injector_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_recovery_injector_finalize(astreamer *streamer); +static void astreamer_recovery_injector_free(astreamer *streamer); + +static const astreamer_ops astreamer_recovery_injector_ops = { + .content = astreamer_recovery_injector_content, + .finalize = astreamer_recovery_injector_finalize, + .free = astreamer_recovery_injector_free +}; + +/* + * Create a astreamer that can edit recoverydata into an archive stream. + * + * The input should be a series of typed chunks (not ASTREAMER_UNKNOWN) as + * per the conventions described in astreamer.h; the chunks forwarded to + * the next astreamer will be similarly typed, but the + * ASTREAMER_MEMBER_HEADER chunks may be zero-length in cases where we've + * edited the archive stream. + * + * Our goal is to do one of the following three things with the content passed + * via recoveryconfcontents: (1) if is_recovery_guc_supported is false, then + * put the content into recovery.conf, replacing any existing archive member + * by that name; (2) if is_recovery_guc_supported is true and + * postgresql.auto.conf exists in the archive, then append the content + * provided to the existing file; and (3) if is_recovery_guc_supported is + * true but postgresql.auto.conf does not exist in the archive, then create + * it with the specified content. + * + * In addition, if is_recovery_guc_supported is true, then we create a + * zero-length standby.signal file, dropping any file with that name from + * the archive. + */ +astreamer * +astreamer_recovery_injector_new(astreamer *next, + bool is_recovery_guc_supported, + PQExpBuffer recoveryconfcontents) +{ + astreamer_recovery_injector *streamer; + + streamer = palloc0(sizeof(astreamer_recovery_injector)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_recovery_injector_ops; + streamer->base.bbs_next = next; + streamer->is_recovery_guc_supported = is_recovery_guc_supported; + streamer->recoveryconfcontents = recoveryconfcontents; + + return &streamer->base; +} + +/* + * Handle each chunk of tar content while injecting recovery configuration. + */ +static void +astreamer_recovery_injector_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_recovery_injector *mystreamer; + + mystreamer = (astreamer_recovery_injector *) streamer; + Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER); + + switch (context) + { + case ASTREAMER_MEMBER_HEADER: + /* Must copy provided data so we have the option to modify it. */ + memcpy(&mystreamer->member, member, sizeof(astreamer_member)); + + /* + * On v12+, skip standby.signal and edit postgresql.auto.conf; on + * older versions, skip recovery.conf. + */ + if (mystreamer->is_recovery_guc_supported) + { + mystreamer->skip_file = + (strcmp(member->pathname, "standby.signal") == 0); + mystreamer->is_postgresql_auto_conf = + (strcmp(member->pathname, "postgresql.auto.conf") == 0); + if (mystreamer->is_postgresql_auto_conf) + { + /* Remember we saw it so we don't add it again. */ + mystreamer->found_postgresql_auto_conf = true; + + /* Increment length by data to be injected. */ + mystreamer->member.size += + mystreamer->recoveryconfcontents->len; + + /* + * Zap data and len because the archive header is no + * longer valid; some subsequent astreamer must regenerate + * it if it's necessary. + */ + data = NULL; + len = 0; + } + } + else + mystreamer->skip_file = + (strcmp(member->pathname, "recovery.conf") == 0); + + /* Do not forward if the file is to be skipped. */ + if (mystreamer->skip_file) + return; + break; + + case ASTREAMER_MEMBER_CONTENTS: + /* Do not forward if the file is to be skipped. */ + if (mystreamer->skip_file) + return; + break; + + case ASTREAMER_MEMBER_TRAILER: + /* Do not forward it the file is to be skipped. */ + if (mystreamer->skip_file) + return; + + /* Append provided content to whatever we already sent. */ + if (mystreamer->is_postgresql_auto_conf) + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->recoveryconfcontents->data, + mystreamer->recoveryconfcontents->len, + ASTREAMER_MEMBER_CONTENTS); + break; + + case ASTREAMER_ARCHIVE_TRAILER: + if (mystreamer->is_recovery_guc_supported) + { + /* + * If we didn't already find (and thus modify) + * postgresql.auto.conf, inject it as an additional archive + * member now. + */ + if (!mystreamer->found_postgresql_auto_conf) + astreamer_inject_file(mystreamer->base.bbs_next, + "postgresql.auto.conf", + mystreamer->recoveryconfcontents->data, + mystreamer->recoveryconfcontents->len); + + /* Inject empty standby.signal file. */ + astreamer_inject_file(mystreamer->base.bbs_next, + "standby.signal", "", 0); + } + else + { + /* Inject recovery.conf file with specified contents. */ + astreamer_inject_file(mystreamer->base.bbs_next, + "recovery.conf", + mystreamer->recoveryconfcontents->data, + mystreamer->recoveryconfcontents->len); + } + + /* Nothing to do here. */ + break; + + default: + /* Shouldn't happen. */ + pg_fatal("unexpected state while injecting recovery settings"); + } + + astreamer_content(mystreamer->base.bbs_next, &mystreamer->member, + data, len, context); +} + +/* + * End-of-stream processing for this astreamer. + */ +static void +astreamer_recovery_injector_finalize(astreamer *streamer) +{ + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with this astreamer. + */ +static void +astreamer_recovery_injector_free(astreamer *streamer) +{ + astreamer_free(streamer->bbs_next); + pfree(streamer); +} + +/* + * Inject a member into the archive with specified contents. + */ +void +astreamer_inject_file(astreamer *streamer, char *pathname, char *data, + int len) +{ + astreamer_member member; + + strlcpy(member.pathname, pathname, MAXPGPATH); + member.size = len; + member.mode = pg_file_create_mode; + member.is_directory = false; + member.is_link = false; + member.linktarget[0] = '\0'; + + /* + * There seems to be no principled argument for these values, but they are + * what PostgreSQL has historically used. + */ + member.uid = 04000; + member.gid = 02000; + + /* + * We don't know here how to generate valid member headers and trailers + * for the archiving format in use, so if those are needed, some successor + * astreamer will have to generate them using the data from 'member'. + */ + astreamer_content(streamer, &member, NULL, 0, + ASTREAMER_MEMBER_HEADER); + astreamer_content(streamer, &member, data, len, + ASTREAMER_MEMBER_CONTENTS); + astreamer_content(streamer, &member, NULL, 0, + ASTREAMER_MEMBER_TRAILER); +} diff --git a/src/bin/pg_basebackup/astreamer_lz4.c b/src/bin/pg_basebackup/astreamer_lz4.c new file mode 100644 index 00000000000..1c40d7d8ad5 --- /dev/null +++ b/src/bin/pg_basebackup/astreamer_lz4.c @@ -0,0 +1,422 @@ +/*------------------------------------------------------------------------- + * + * astreamer_lz4.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_lz4.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include + +#ifdef USE_LZ4 +#include +#endif + +#include "astreamer.h" +#include "common/file_perm.h" +#include "common/logging.h" +#include "common/string.h" + +#ifdef USE_LZ4 +typedef struct astreamer_lz4_frame +{ + astreamer base; + + LZ4F_compressionContext_t cctx; + LZ4F_decompressionContext_t dctx; + LZ4F_preferences_t prefs; + + size_t bytes_written; + bool header_written; +} astreamer_lz4_frame; + +static void astreamer_lz4_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_lz4_compressor_finalize(astreamer *streamer); +static void astreamer_lz4_compressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_lz4_compressor_ops = { + .content = astreamer_lz4_compressor_content, + .finalize = astreamer_lz4_compressor_finalize, + .free = astreamer_lz4_compressor_free +}; + +static void astreamer_lz4_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_lz4_decompressor_finalize(astreamer *streamer); +static void astreamer_lz4_decompressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_lz4_decompressor_ops = { + .content = astreamer_lz4_decompressor_content, + .finalize = astreamer_lz4_decompressor_finalize, + .free = astreamer_lz4_decompressor_free +}; +#endif + +/* + * Create a new base backup streamer that performs lz4 compression of tar + * blocks. + */ +astreamer * +astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress) +{ +#ifdef USE_LZ4 + astreamer_lz4_frame *streamer; + LZ4F_errorCode_t ctxError; + LZ4F_preferences_t *prefs; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_lz4_frame)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_lz4_compressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + streamer->header_written = false; + + /* Initialize stream compression preferences */ + prefs = &streamer->prefs; + memset(prefs, 0, sizeof(LZ4F_preferences_t)); + prefs->frameInfo.blockSizeID = LZ4F_max256KB; + prefs->compressionLevel = compress->level; + + ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION); + if (LZ4F_isError(ctxError)) + pg_log_error("could not create lz4 compression context: %s", + LZ4F_getErrorName(ctxError)); + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "LZ4"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef USE_LZ4 +/* + * Compress the input data to output buffer. + * + * Find out the compression bound based on input data length for each + * invocation to make sure that output buffer has enough capacity to + * accommodate the compressed data. In case if the output buffer + * capacity falls short of compression bound then forward the content + * of output buffer to next streamer and empty the buffer. + */ +static void +astreamer_lz4_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_lz4_frame *mystreamer; + uint8 *next_in, + *next_out; + size_t out_bound, + compressed_size, + avail_out; + + mystreamer = (astreamer_lz4_frame *) streamer; + next_in = (uint8 *) data; + + /* Write header before processing the first input chunk. */ + if (!mystreamer->header_written) + { + compressed_size = LZ4F_compressBegin(mystreamer->cctx, + (uint8 *) mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + &mystreamer->prefs); + + if (LZ4F_isError(compressed_size)) + pg_log_error("could not write lz4 header: %s", + LZ4F_getErrorName(compressed_size)); + + mystreamer->bytes_written += compressed_size; + mystreamer->header_written = true; + } + + /* + * Update the offset and capacity of output buffer based on number of + * bytes written to output buffer. + */ + next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; + avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + + /* + * Find out the compression bound and make sure that output buffer has the + * required capacity for the success of LZ4F_compressUpdate. If needed + * forward the content to next streamer and empty the buffer. + */ + out_bound = LZ4F_compressBound(len, &mystreamer->prefs); + if (avail_out < out_bound) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + context); + + /* Enlarge buffer if it falls short of out bound. */ + if (mystreamer->base.bbs_buffer.maxlen < out_bound) + enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound); + + avail_out = mystreamer->base.bbs_buffer.maxlen; + mystreamer->bytes_written = 0; + next_out = (uint8 *) mystreamer->base.bbs_buffer.data; + } + + /* + * This call compresses the data starting at next_in and generates the + * output starting at next_out. It expects the caller to provide the size + * of input buffer and capacity of output buffer by providing parameters + * len and avail_out. + * + * It returns the number of bytes compressed to output buffer. + */ + compressed_size = LZ4F_compressUpdate(mystreamer->cctx, + next_out, avail_out, + next_in, len, NULL); + + if (LZ4F_isError(compressed_size)) + pg_log_error("could not compress data: %s", + LZ4F_getErrorName(compressed_size)); + + mystreamer->bytes_written += compressed_size; +} + +/* + * End-of-stream processing. + */ +static void +astreamer_lz4_compressor_finalize(astreamer *streamer) +{ + astreamer_lz4_frame *mystreamer; + uint8 *next_out; + size_t footer_bound, + compressed_size, + avail_out; + + mystreamer = (astreamer_lz4_frame *) streamer; + + /* Find out the footer bound and update the output buffer. */ + footer_bound = LZ4F_compressBound(0, &mystreamer->prefs); + if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) < + footer_bound) + { + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + ASTREAMER_UNKNOWN); + + /* Enlarge buffer if it falls short of footer bound. */ + if (mystreamer->base.bbs_buffer.maxlen < footer_bound) + enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound); + + avail_out = mystreamer->base.bbs_buffer.maxlen; + mystreamer->bytes_written = 0; + next_out = (uint8 *) mystreamer->base.bbs_buffer.data; + } + else + { + next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; + avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + } + + /* + * Finalize the frame and flush whatever data remaining in compression + * context. + */ + compressed_size = LZ4F_compressEnd(mystreamer->cctx, + next_out, avail_out, NULL); + + if (LZ4F_isError(compressed_size)) + pg_log_error("could not end lz4 compression: %s", + LZ4F_getErrorName(compressed_size)); + + mystreamer->bytes_written += compressed_size; + + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +astreamer_lz4_compressor_free(astreamer *streamer) +{ + astreamer_lz4_frame *mystreamer; + + mystreamer = (astreamer_lz4_frame *) streamer; + astreamer_free(streamer->bbs_next); + LZ4F_freeCompressionContext(mystreamer->cctx); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} +#endif + +/* + * Create a new base backup streamer that performs decompression of lz4 + * compressed blocks. + */ +astreamer * +astreamer_lz4_decompressor_new(astreamer *next) +{ +#ifdef USE_LZ4 + astreamer_lz4_frame *streamer; + LZ4F_errorCode_t ctxError; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_lz4_frame)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_lz4_decompressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + + /* Initialize internal stream state for decompression */ + ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION); + if (LZ4F_isError(ctxError)) + pg_fatal("could not initialize compression library: %s", + LZ4F_getErrorName(ctxError)); + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "LZ4"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef USE_LZ4 +/* + * Decompress the input data to output buffer until we run out of input + * data. Each time the output buffer is full, pass on the decompressed data + * to the next streamer. + */ +static void +astreamer_lz4_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_lz4_frame *mystreamer; + uint8 *next_in, + *next_out; + size_t avail_in, + avail_out; + + mystreamer = (astreamer_lz4_frame *) streamer; + next_in = (uint8 *) data; + next_out = (uint8 *) mystreamer->base.bbs_buffer.data; + avail_in = len; + avail_out = mystreamer->base.bbs_buffer.maxlen; + + while (avail_in > 0) + { + size_t ret, + read_size, + out_size; + + read_size = avail_in; + out_size = avail_out; + + /* + * This call decompresses the data starting at next_in and generates + * the output data starting at next_out. It expects the caller to + * provide size of the input buffer and total capacity of the output + * buffer by providing the read_size and out_size parameters + * respectively. + * + * Per the documentation of LZ4, parameters read_size and out_size + * behaves as dual parameters. On return, the number of bytes consumed + * from the input buffer will be written back to read_size and the + * number of bytes decompressed to output buffer will be written back + * to out_size respectively. + */ + ret = LZ4F_decompress(mystreamer->dctx, + next_out, &out_size, + next_in, &read_size, NULL); + + if (LZ4F_isError(ret)) + pg_log_error("could not decompress data: %s", + LZ4F_getErrorName(ret)); + + /* Update input buffer based on number of bytes consumed */ + avail_in -= read_size; + next_in += read_size; + + mystreamer->bytes_written += out_size; + + /* + * If output buffer is full then forward the content to next streamer + * and update the output buffer. + */ + if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + context); + + avail_out = mystreamer->base.bbs_buffer.maxlen; + mystreamer->bytes_written = 0; + next_out = (uint8 *) mystreamer->base.bbs_buffer.data; + } + else + { + avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + next_out += mystreamer->bytes_written; + } + } +} + +/* + * End-of-stream processing. + */ +static void +astreamer_lz4_decompressor_finalize(astreamer *streamer) +{ + astreamer_lz4_frame *mystreamer; + + mystreamer = (astreamer_lz4_frame *) streamer; + + /* + * End of the stream, if there is some pending data in output buffers then + * we must forward it to next streamer. + */ + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +astreamer_lz4_decompressor_free(astreamer *streamer) +{ + astreamer_lz4_frame *mystreamer; + + mystreamer = (astreamer_lz4_frame *) streamer; + astreamer_free(streamer->bbs_next); + LZ4F_freeDecompressionContext(mystreamer->dctx); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} +#endif diff --git a/src/bin/pg_basebackup/astreamer_tar.c b/src/bin/pg_basebackup/astreamer_tar.c new file mode 100644 index 00000000000..673690cd18f --- /dev/null +++ b/src/bin/pg_basebackup/astreamer_tar.c @@ -0,0 +1,514 @@ +/*------------------------------------------------------------------------- + * + * astreamer_tar.c + * + * This module implements three types of tar processing. A tar parser + * expects unlabelled chunks of data (e.g. ASTREAMER_UNKNOWN) and splits + * it into labelled chunks (any other value of astreamer_archive_context). + * A tar archiver does the reverse: it takes a bunch of labelled chunks + * and produces a tarfile, optionally replacing member headers and trailers + * so that upstream astreamer objects can perform surgery on the tarfile + * contents without knowing the details of the tar format. A tar terminator + * just adds two blocks of NUL bytes to the end of the file, since older + * server versions produce files with this terminator omitted. + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_tar.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include + +#include "astreamer.h" +#include "common/logging.h" +#include "pgtar.h" + +typedef struct astreamer_tar_parser +{ + astreamer base; + astreamer_archive_context next_context; + astreamer_member member; + size_t file_bytes_sent; + size_t pad_bytes_expected; +} astreamer_tar_parser; + +typedef struct astreamer_tar_archiver +{ + astreamer base; + bool rearchive_member; +} astreamer_tar_archiver; + +static void astreamer_tar_parser_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_parser_finalize(astreamer *streamer); +static void astreamer_tar_parser_free(astreamer *streamer); +static bool astreamer_tar_header(astreamer_tar_parser *mystreamer); + +static const astreamer_ops astreamer_tar_parser_ops = { + .content = astreamer_tar_parser_content, + .finalize = astreamer_tar_parser_finalize, + .free = astreamer_tar_parser_free +}; + +static void astreamer_tar_archiver_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_archiver_finalize(astreamer *streamer); +static void astreamer_tar_archiver_free(astreamer *streamer); + +static const astreamer_ops astreamer_tar_archiver_ops = { + .content = astreamer_tar_archiver_content, + .finalize = astreamer_tar_archiver_finalize, + .free = astreamer_tar_archiver_free +}; + +static void astreamer_tar_terminator_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_terminator_finalize(astreamer *streamer); +static void astreamer_tar_terminator_free(astreamer *streamer); + +static const astreamer_ops astreamer_tar_terminator_ops = { + .content = astreamer_tar_terminator_content, + .finalize = astreamer_tar_terminator_finalize, + .free = astreamer_tar_terminator_free +}; + +/* + * Create a astreamer that can parse a stream of content as tar data. + * + * The input should be a series of ASTREAMER_UNKNOWN chunks; the astreamer + * specified by 'next' will receive a series of typed chunks, as per the + * conventions described in astreamer.h. + */ +astreamer * +astreamer_tar_parser_new(astreamer *next) +{ + astreamer_tar_parser *streamer; + + streamer = palloc0(sizeof(astreamer_tar_parser)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_tar_parser_ops; + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + streamer->next_context = ASTREAMER_MEMBER_HEADER; + + return &streamer->base; +} + +/* + * Parse unknown content as tar data. + */ +static void +astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer; + size_t nbytes; + + /* Expect unparsed input. */ + Assert(member == NULL); + Assert(context == ASTREAMER_UNKNOWN); + + while (len > 0) + { + switch (mystreamer->next_context) + { + case ASTREAMER_MEMBER_HEADER: + + /* + * If we're expecting an archive member header, accumulate a + * full block of data before doing anything further. + */ + if (!astreamer_buffer_until(streamer, &data, &len, + TAR_BLOCK_SIZE)) + return; + + /* + * Now we can process the header and get ready to process the + * file contents; however, we might find out that what we + * thought was the next file header is actually the start of + * the archive trailer. Switch modes accordingly. + */ + if (astreamer_tar_header(mystreamer)) + { + if (mystreamer->member.size == 0) + { + /* No content; trailer is zero-length. */ + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + NULL, 0, + ASTREAMER_MEMBER_TRAILER); + + /* Expect next header. */ + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; + } + else + { + /* Expect contents. */ + mystreamer->next_context = ASTREAMER_MEMBER_CONTENTS; + } + mystreamer->base.bbs_buffer.len = 0; + mystreamer->file_bytes_sent = 0; + } + else + mystreamer->next_context = ASTREAMER_ARCHIVE_TRAILER; + break; + + case ASTREAMER_MEMBER_CONTENTS: + + /* + * Send as much content as we have, but not more than the + * remaining file length. + */ + Assert(mystreamer->file_bytes_sent < mystreamer->member.size); + nbytes = mystreamer->member.size - mystreamer->file_bytes_sent; + nbytes = Min(nbytes, len); + Assert(nbytes > 0); + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + data, nbytes, + ASTREAMER_MEMBER_CONTENTS); + mystreamer->file_bytes_sent += nbytes; + data += nbytes; + len -= nbytes; + + /* + * If we've not yet sent the whole file, then there's more + * content to come; otherwise, it's time to expect the file + * trailer. + */ + Assert(mystreamer->file_bytes_sent <= mystreamer->member.size); + if (mystreamer->file_bytes_sent == mystreamer->member.size) + { + if (mystreamer->pad_bytes_expected == 0) + { + /* Trailer is zero-length. */ + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + NULL, 0, + ASTREAMER_MEMBER_TRAILER); + + /* Expect next header. */ + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; + } + else + { + /* Trailer is not zero-length. */ + mystreamer->next_context = ASTREAMER_MEMBER_TRAILER; + } + mystreamer->base.bbs_buffer.len = 0; + } + break; + + case ASTREAMER_MEMBER_TRAILER: + + /* + * If we're expecting an archive member trailer, accumulate + * the expected number of padding bytes before sending + * anything onward. + */ + if (!astreamer_buffer_until(streamer, &data, &len, + mystreamer->pad_bytes_expected)) + return; + + /* OK, now we can send it. */ + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + data, mystreamer->pad_bytes_expected, + ASTREAMER_MEMBER_TRAILER); + + /* Expect next file header. */ + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; + mystreamer->base.bbs_buffer.len = 0; + break; + + case ASTREAMER_ARCHIVE_TRAILER: + + /* + * We've seen an end-of-archive indicator, so anything more is + * buffered and sent as part of the archive trailer. But we + * don't expect more than 2 blocks. + */ + astreamer_buffer_bytes(streamer, &data, &len, len); + if (len > 2 * TAR_BLOCK_SIZE) + pg_fatal("tar file trailer exceeds 2 blocks"); + return; + + default: + /* Shouldn't happen. */ + pg_fatal("unexpected state while parsing tar archive"); + } + } +} + +/* + * Parse a file header within a tar stream. + * + * The return value is true if we found a file header and passed it on to the + * next astreamer; it is false if we have reached the archive trailer. + */ +static bool +astreamer_tar_header(astreamer_tar_parser *mystreamer) +{ + bool has_nonzero_byte = false; + int i; + astreamer_member *member = &mystreamer->member; + char *buffer = mystreamer->base.bbs_buffer.data; + + Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE); + + /* Check whether we've got a block of all zero bytes. */ + for (i = 0; i < TAR_BLOCK_SIZE; ++i) + { + if (buffer[i] != '\0') + { + has_nonzero_byte = true; + break; + } + } + + /* + * If the entire block was zeros, this is the end of the archive, not the + * start of the next file. + */ + if (!has_nonzero_byte) + return false; + + /* + * Parse key fields out of the header. + */ + strlcpy(member->pathname, &buffer[TAR_OFFSET_NAME], MAXPGPATH); + if (member->pathname[0] == '\0') + pg_fatal("tar member has empty name"); + member->size = read_tar_number(&buffer[TAR_OFFSET_SIZE], 12); + member->mode = read_tar_number(&buffer[TAR_OFFSET_MODE], 8); + member->uid = read_tar_number(&buffer[TAR_OFFSET_UID], 8); + member->gid = read_tar_number(&buffer[TAR_OFFSET_GID], 8); + member->is_directory = + (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_DIRECTORY); + member->is_link = + (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_SYMLINK); + if (member->is_link) + strlcpy(member->linktarget, &buffer[TAR_OFFSET_LINKNAME], 100); + + /* Compute number of padding bytes. */ + mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size); + + /* Forward the entire header to the next astreamer. */ + astreamer_content(mystreamer->base.bbs_next, member, + buffer, TAR_BLOCK_SIZE, + ASTREAMER_MEMBER_HEADER); + + return true; +} + +/* + * End-of-stream processing for a tar parser. + */ +static void +astreamer_tar_parser_finalize(astreamer *streamer) +{ + astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer; + + if (mystreamer->next_context != ASTREAMER_ARCHIVE_TRAILER && + (mystreamer->next_context != ASTREAMER_MEMBER_HEADER || + mystreamer->base.bbs_buffer.len > 0)) + pg_fatal("COPY stream ended before last file was finished"); + + /* Send the archive trailer, even if empty. */ + astreamer_content(streamer->bbs_next, NULL, + streamer->bbs_buffer.data, streamer->bbs_buffer.len, + ASTREAMER_ARCHIVE_TRAILER); + + /* Now finalize successor. */ + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar parser. + */ +static void +astreamer_tar_parser_free(astreamer *streamer) +{ + pfree(streamer->bbs_buffer.data); + astreamer_free(streamer->bbs_next); +} + +/* + * Create a astreamer that can generate a tar archive. + * + * This is intended to be usable either for generating a brand-new tar archive + * or for modifying one on the fly. The input should be a series of typed + * chunks (i.e. not ASTREAMER_UNKNOWN). See also the comments for + * astreamer_tar_parser_content. + */ +astreamer * +astreamer_tar_archiver_new(astreamer *next) +{ + astreamer_tar_archiver *streamer; + + streamer = palloc0(sizeof(astreamer_tar_archiver)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_tar_archiver_ops; + streamer->base.bbs_next = next; + + return &streamer->base; +} + +/* + * Fix up the stream of input chunks to create a valid tar file. + * + * If a ASTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a + * newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is + * passed through without change. Any other size is a fatal error (and + * indicates a bug). + * + * Whenever a new ASTREAMER_MEMBER_HEADER chunk is constructed, the + * corresponding ASTREAMER_MEMBER_TRAILER chunk is also constructed from + * scratch. Specifically, we construct a block of zero bytes sufficient to + * pad out to a block boundary, as required by the tar format. Other + * ASTREAMER_MEMBER_TRAILER chunks are passed through without change. + * + * Any ASTREAMER_MEMBER_CONTENTS chunks are passed through without change. + * + * The ASTREAMER_ARCHIVE_TRAILER chunk is replaced with two + * blocks of zero bytes. Not all tar programs require this, but apparently + * some do. The server does not supply this trailer. If no archive trailer is + * present, one will be added by astreamer_tar_parser_finalize. + */ +static void +astreamer_tar_archiver_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_tar_archiver *mystreamer = (astreamer_tar_archiver *) streamer; + char buffer[2 * TAR_BLOCK_SIZE]; + + Assert(context != ASTREAMER_UNKNOWN); + + if (context == ASTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE) + { + Assert(len == 0); + + /* Replace zero-length tar header with a newly constructed one. */ + tarCreateHeader(buffer, member->pathname, NULL, + member->size, member->mode, member->uid, member->gid, + time(NULL)); + data = buffer; + len = TAR_BLOCK_SIZE; + + /* Also make a note to replace padding, in case size changed. */ + mystreamer->rearchive_member = true; + } + else if (context == ASTREAMER_MEMBER_TRAILER && + mystreamer->rearchive_member) + { + int pad_bytes = tarPaddingBytesRequired(member->size); + + /* Also replace padding, if we regenerated the header. */ + memset(buffer, 0, pad_bytes); + data = buffer; + len = pad_bytes; + + /* Don't do this again unless we replace another header. */ + mystreamer->rearchive_member = false; + } + else if (context == ASTREAMER_ARCHIVE_TRAILER) + { + /* Trailer should always be two blocks of zero bytes. */ + memset(buffer, 0, 2 * TAR_BLOCK_SIZE); + data = buffer; + len = 2 * TAR_BLOCK_SIZE; + } + + astreamer_content(streamer->bbs_next, member, data, len, context); +} + +/* + * End-of-stream processing for a tar archiver. + */ +static void +astreamer_tar_archiver_finalize(astreamer *streamer) +{ + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar archiver. + */ +static void +astreamer_tar_archiver_free(astreamer *streamer) +{ + astreamer_free(streamer->bbs_next); + pfree(streamer); +} + +/* + * Create a astreamer that blindly adds two blocks of NUL bytes to the + * end of an incomplete tarfile that the server might send us. + */ +astreamer * +astreamer_tar_terminator_new(astreamer *next) +{ + astreamer *streamer; + + streamer = palloc0(sizeof(astreamer)); + *((const astreamer_ops **) &streamer->bbs_ops) = + &astreamer_tar_terminator_ops; + streamer->bbs_next = next; + + return streamer; +} + +/* + * Pass all the content through without change. + */ +static void +astreamer_tar_terminator_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + /* Expect unparsed input. */ + Assert(member == NULL); + Assert(context == ASTREAMER_UNKNOWN); + + /* Just forward it. */ + astreamer_content(streamer->bbs_next, member, data, len, context); +} + +/* + * At the end, blindly add the two blocks of NUL bytes which the server fails + * to supply. + */ +static void +astreamer_tar_terminator_finalize(astreamer *streamer) +{ + char buffer[2 * TAR_BLOCK_SIZE]; + + memset(buffer, 0, 2 * TAR_BLOCK_SIZE); + astreamer_content(streamer->bbs_next, NULL, buffer, + 2 * TAR_BLOCK_SIZE, ASTREAMER_UNKNOWN); + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar terminator. + */ +static void +astreamer_tar_terminator_free(astreamer *streamer) +{ + astreamer_free(streamer->bbs_next); + pfree(streamer); +} diff --git a/src/bin/pg_basebackup/astreamer_zstd.c b/src/bin/pg_basebackup/astreamer_zstd.c new file mode 100644 index 00000000000..58dc679ef99 --- /dev/null +++ b/src/bin/pg_basebackup/astreamer_zstd.c @@ -0,0 +1,368 @@ +/*------------------------------------------------------------------------- + * + * astreamer_zstd.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_zstd.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include + +#ifdef USE_ZSTD +#include +#endif + +#include "astreamer.h" +#include "common/logging.h" + +#ifdef USE_ZSTD + +typedef struct astreamer_zstd_frame +{ + astreamer base; + + ZSTD_CCtx *cctx; + ZSTD_DCtx *dctx; + ZSTD_outBuffer zstd_outBuf; +} astreamer_zstd_frame; + +static void astreamer_zstd_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_zstd_compressor_finalize(astreamer *streamer); +static void astreamer_zstd_compressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_zstd_compressor_ops = { + .content = astreamer_zstd_compressor_content, + .finalize = astreamer_zstd_compressor_finalize, + .free = astreamer_zstd_compressor_free +}; + +static void astreamer_zstd_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_zstd_decompressor_finalize(astreamer *streamer); +static void astreamer_zstd_decompressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_zstd_decompressor_ops = { + .content = astreamer_zstd_decompressor_content, + .finalize = astreamer_zstd_decompressor_finalize, + .free = astreamer_zstd_decompressor_free +}; +#endif + +/* + * Create a new base backup streamer that performs zstd compression of tar + * blocks. + */ +astreamer * +astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress) +{ +#ifdef USE_ZSTD + astreamer_zstd_frame *streamer; + size_t ret; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_zstd_frame)); + + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_zstd_compressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize()); + + streamer->cctx = ZSTD_createCCtx(); + if (!streamer->cctx) + pg_fatal("could not create zstd compression context"); + + /* Set compression level */ + ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel, + compress->level); + if (ZSTD_isError(ret)) + pg_fatal("could not set zstd compression level to %d: %s", + compress->level, ZSTD_getErrorName(ret)); + + /* Set # of workers, if specified */ + if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0) + { + /* + * On older versions of libzstd, this option does not exist, and + * trying to set it will fail. Similarly for newer versions if they + * are compiled without threading support. + */ + ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers, + compress->workers); + if (ZSTD_isError(ret)) + pg_fatal("could not set compression worker count to %d: %s", + compress->workers, ZSTD_getErrorName(ret)); + } + + if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0) + { + ret = ZSTD_CCtx_setParameter(streamer->cctx, + ZSTD_c_enableLongDistanceMatching, + compress->long_distance); + if (ZSTD_isError(ret)) + { + pg_log_error("could not enable long-distance mode: %s", + ZSTD_getErrorName(ret)); + exit(1); + } + } + + /* Initialize the ZSTD output buffer. */ + streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; + streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; + streamer->zstd_outBuf.pos = 0; + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "ZSTD"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef USE_ZSTD +/* + * Compress the input data to output buffer. + * + * Find out the compression bound based on input data length for each + * invocation to make sure that output buffer has enough capacity to + * accommodate the compressed data. In case if the output buffer + * capacity falls short of compression bound then forward the content + * of output buffer to next streamer and empty the buffer. + */ +static void +astreamer_zstd_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + ZSTD_inBuffer inBuf = {data, len, 0}; + + while (inBuf.pos < inBuf.size) + { + size_t yet_to_flush; + size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos); + + /* + * If the output buffer is not left with enough space, send the + * compressed bytes to the next streamer, and empty the buffer. + */ + if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < + max_needed) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + context); + + /* Reset the ZSTD output buffer. */ + mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; + mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; + mystreamer->zstd_outBuf.pos = 0; + } + + yet_to_flush = + ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf, + &inBuf, ZSTD_e_continue); + + if (ZSTD_isError(yet_to_flush)) + pg_log_error("could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); + } +} + +/* + * End-of-stream processing. + */ +static void +astreamer_zstd_compressor_finalize(astreamer *streamer) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + size_t yet_to_flush; + + do + { + ZSTD_inBuffer in = {NULL, 0, 0}; + size_t max_needed = ZSTD_compressBound(0); + + /* + * If the output buffer is not left with enough space, send the + * compressed bytes to the next streamer, and empty the buffer. + */ + if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < + max_needed) + { + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + ASTREAMER_UNKNOWN); + + /* Reset the ZSTD output buffer. */ + mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; + mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; + mystreamer->zstd_outBuf.pos = 0; + } + + yet_to_flush = ZSTD_compressStream2(mystreamer->cctx, + &mystreamer->zstd_outBuf, + &in, ZSTD_e_end); + + if (ZSTD_isError(yet_to_flush)) + pg_log_error("could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); + + } while (yet_to_flush > 0); + + /* Make sure to pass any remaining bytes to the next streamer. */ + if (mystreamer->zstd_outBuf.pos > 0) + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +astreamer_zstd_compressor_free(astreamer *streamer) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + + astreamer_free(streamer->bbs_next); + ZSTD_freeCCtx(mystreamer->cctx); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} +#endif + +/* + * Create a new base backup streamer that performs decompression of zstd + * compressed blocks. + */ +astreamer * +astreamer_zstd_decompressor_new(astreamer *next) +{ +#ifdef USE_ZSTD + astreamer_zstd_frame *streamer; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_zstd_frame)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_zstd_decompressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize()); + + streamer->dctx = ZSTD_createDCtx(); + if (!streamer->dctx) + pg_fatal("could not create zstd decompression context"); + + /* Initialize the ZSTD output buffer. */ + streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; + streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; + streamer->zstd_outBuf.pos = 0; + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "ZSTD"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef USE_ZSTD +/* + * Decompress the input data to output buffer until we run out of input + * data. Each time the output buffer is full, pass on the decompressed data + * to the next streamer. + */ +static void +astreamer_zstd_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + ZSTD_inBuffer inBuf = {data, len, 0}; + + while (inBuf.pos < inBuf.size) + { + size_t ret; + + /* + * If output buffer is full then forward the content to next streamer + * and update the output buffer. + */ + if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + context); + + /* Reset the ZSTD output buffer. */ + mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; + mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; + mystreamer->zstd_outBuf.pos = 0; + } + + ret = ZSTD_decompressStream(mystreamer->dctx, + &mystreamer->zstd_outBuf, &inBuf); + + if (ZSTD_isError(ret)) + pg_log_error("could not decompress data: %s", + ZSTD_getErrorName(ret)); + } +} + +/* + * End-of-stream processing. + */ +static void +astreamer_zstd_decompressor_finalize(astreamer *streamer) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + + /* + * End of the stream, if there is some pending data in output buffers then + * we must forward it to next streamer. + */ + if (mystreamer->zstd_outBuf.pos > 0) + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +astreamer_zstd_decompressor_free(astreamer *streamer) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + + astreamer_free(streamer->bbs_next); + ZSTD_freeDCtx(mystreamer->dctx); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} +#endif diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h deleted file mode 100644 index 3b820f13b51..00000000000 --- a/src/bin/pg_basebackup/bbstreamer.h +++ /dev/null @@ -1,226 +0,0 @@ -/*------------------------------------------------------------------------- - * - * bbstreamer.h - * - * Each tar archive returned by the server is passed to one or more - * bbstreamer objects for further processing. The bbstreamer may do - * something simple, like write the archive to a file, perhaps after - * compressing it, but it can also do more complicated things, like - * annotating the byte stream to indicate which parts of the data - * correspond to tar headers or trailing padding, vs. which parts are - * payload data. A subsequent bbstreamer may use this information to - * make further decisions about how to process the data; for example, - * it might choose to modify the archive contents. - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer.h - *------------------------------------------------------------------------- - */ - -#ifndef BBSTREAMER_H -#define BBSTREAMER_H - -#include "common/compression.h" -#include "lib/stringinfo.h" -#include "pqexpbuffer.h" - -struct bbstreamer; -struct bbstreamer_ops; -typedef struct bbstreamer bbstreamer; -typedef struct bbstreamer_ops bbstreamer_ops; - -/* - * Each chunk of archive data passed to a bbstreamer is classified into one - * of these categories. When data is first received from the remote server, - * each chunk will be categorized as BBSTREAMER_UNKNOWN, and the chunks will - * be of whatever size the remote server chose to send. - * - * If the archive is parsed (e.g. see bbstreamer_tar_parser_new()), then all - * chunks should be labelled as one of the other types listed here. In - * addition, there should be exactly one BBSTREAMER_MEMBER_HEADER chunk and - * exactly one BBSTREAMER_MEMBER_TRAILER chunk per archive member, even if - * that means a zero-length call. There can be any number of - * BBSTREAMER_MEMBER_CONTENTS chunks in between those calls. There - * should exactly BBSTREAMER_ARCHIVE_TRAILER chunk, and it should follow the - * last BBSTREAMER_MEMBER_TRAILER chunk. - * - * In theory, we could need other classifications here, such as a way of - * indicating an archive header, but the "tar" format doesn't need anything - * else, so for the time being there's no point. - */ -typedef enum -{ - BBSTREAMER_UNKNOWN, - BBSTREAMER_MEMBER_HEADER, - BBSTREAMER_MEMBER_CONTENTS, - BBSTREAMER_MEMBER_TRAILER, - BBSTREAMER_ARCHIVE_TRAILER, -} bbstreamer_archive_context; - -/* - * Each chunk of data that is classified as BBSTREAMER_MEMBER_HEADER, - * BBSTREAMER_MEMBER_CONTENTS, or BBSTREAMER_MEMBER_TRAILER should also - * pass a pointer to an instance of this struct. The details are expected - * to be present in the archive header and used to fill the struct, after - * which all subsequent calls for the same archive member are expected to - * pass the same details. - */ -typedef struct -{ - char pathname[MAXPGPATH]; - pgoff_t size; - mode_t mode; - uid_t uid; - gid_t gid; - bool is_directory; - bool is_link; - char linktarget[MAXPGPATH]; -} bbstreamer_member; - -/* - * Generally, each type of bbstreamer will define its own struct, but the - * first element should be 'bbstreamer base'. A bbstreamer that does not - * require any additional private data could use this structure directly. - * - * bbs_ops is a pointer to the bbstreamer_ops object which contains the - * function pointers appropriate to this type of bbstreamer. - * - * bbs_next is a pointer to the successor bbstreamer, for those types of - * bbstreamer which forward data to a successor. It need not be used and - * should be set to NULL when not relevant. - * - * bbs_buffer is a buffer for accumulating data for temporary storage. Each - * type of bbstreamer makes its own decisions about whether and how to use - * this buffer. - */ -struct bbstreamer -{ - const bbstreamer_ops *bbs_ops; - bbstreamer *bbs_next; - StringInfoData bbs_buffer; -}; - -/* - * There are three callbacks for a bbstreamer. The 'content' callback is - * called repeatedly, as described in the bbstreamer_archive_context comments. - * Then, the 'finalize' callback is called once at the end, to give the - * bbstreamer a chance to perform cleanup such as closing files. Finally, - * because this code is running in a frontend environment where, as of this - * writing, there are no memory contexts, the 'free' callback is called to - * release memory. These callbacks should always be invoked using the static - * inline functions defined below. - */ -struct bbstreamer_ops -{ - void (*content) (bbstreamer *streamer, bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); - void (*finalize) (bbstreamer *streamer); - void (*free) (bbstreamer *streamer); -}; - -/* Send some content to a bbstreamer. */ -static inline void -bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) -{ - Assert(streamer != NULL); - streamer->bbs_ops->content(streamer, member, data, len, context); -} - -/* Finalize a bbstreamer. */ -static inline void -bbstreamer_finalize(bbstreamer *streamer) -{ - Assert(streamer != NULL); - streamer->bbs_ops->finalize(streamer); -} - -/* Free a bbstreamer. */ -static inline void -bbstreamer_free(bbstreamer *streamer) -{ - Assert(streamer != NULL); - streamer->bbs_ops->free(streamer); -} - -/* - * This is a convenience method for use when implementing a bbstreamer; it is - * not for use by outside callers. It adds the amount of data specified by - * 'nbytes' to the bbstreamer's buffer and adjusts '*len' and '*data' - * accordingly. - */ -static inline void -bbstreamer_buffer_bytes(bbstreamer *streamer, const char **data, int *len, - int nbytes) -{ - Assert(nbytes <= *len); - - appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes); - *len -= nbytes; - *data += nbytes; -} - -/* - * This is a convenience method for use when implementing a bbstreamer; it is - * not for use by outsider callers. It attempts to add enough data to the - * bbstreamer's buffer to reach a length of target_bytes and adjusts '*len' - * and '*data' accordingly. It returns true if the target length has been - * reached and false otherwise. - */ -static inline bool -bbstreamer_buffer_until(bbstreamer *streamer, const char **data, int *len, - int target_bytes) -{ - int buflen = streamer->bbs_buffer.len; - - if (buflen >= target_bytes) - { - /* Target length already reached; nothing to do. */ - return true; - } - - if (buflen + *len < target_bytes) - { - /* Not enough data to reach target length; buffer all of it. */ - bbstreamer_buffer_bytes(streamer, data, len, *len); - return false; - } - - /* Buffer just enough to reach the target length. */ - bbstreamer_buffer_bytes(streamer, data, len, target_bytes - buflen); - return true; -} - -/* - * Functions for creating bbstreamer objects of various types. See the header - * comments for each of these functions for details. - */ -extern bbstreamer *bbstreamer_plain_writer_new(char *pathname, FILE *file); -extern bbstreamer *bbstreamer_gzip_writer_new(char *pathname, FILE *file, - pg_compress_specification *compress); -extern bbstreamer *bbstreamer_extractor_new(const char *basepath, - const char *(*link_map) (const char *), - void (*report_output_file) (const char *)); - -extern bbstreamer *bbstreamer_gzip_decompressor_new(bbstreamer *next); -extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next, - pg_compress_specification *compress); -extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next); -extern bbstreamer *bbstreamer_zstd_compressor_new(bbstreamer *next, - pg_compress_specification *compress); -extern bbstreamer *bbstreamer_zstd_decompressor_new(bbstreamer *next); -extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next); -extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next); -extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next); - -extern bbstreamer *bbstreamer_recovery_injector_new(bbstreamer *next, - bool is_recovery_guc_supported, - PQExpBuffer recoveryconfcontents); -extern void bbstreamer_inject_file(bbstreamer *streamer, char *pathname, - char *data, int len); - -#endif diff --git a/src/bin/pg_basebackup/bbstreamer_file.c b/src/bin/pg_basebackup/bbstreamer_file.c deleted file mode 100644 index bab6cd4a6b1..00000000000 --- a/src/bin/pg_basebackup/bbstreamer_file.c +++ /dev/null @@ -1,396 +0,0 @@ -/*------------------------------------------------------------------------- - * - * bbstreamer_file.c - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer_file.c - *------------------------------------------------------------------------- - */ - -#include "postgres_fe.h" - -#include - -#include "bbstreamer.h" -#include "common/file_perm.h" -#include "common/logging.h" -#include "common/string.h" - -typedef struct bbstreamer_plain_writer -{ - bbstreamer base; - char *pathname; - FILE *file; - bool should_close_file; -} bbstreamer_plain_writer; - -typedef struct bbstreamer_extractor -{ - bbstreamer base; - char *basepath; - const char *(*link_map) (const char *); - void (*report_output_file) (const char *); - char filename[MAXPGPATH]; - FILE *file; -} bbstreamer_extractor; - -static void bbstreamer_plain_writer_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_plain_writer_finalize(bbstreamer *streamer); -static void bbstreamer_plain_writer_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_plain_writer_ops = { - .content = bbstreamer_plain_writer_content, - .finalize = bbstreamer_plain_writer_finalize, - .free = bbstreamer_plain_writer_free -}; - -static void bbstreamer_extractor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_extractor_finalize(bbstreamer *streamer); -static void bbstreamer_extractor_free(bbstreamer *streamer); -static void extract_directory(const char *filename, mode_t mode); -static void extract_link(const char *filename, const char *linktarget); -static FILE *create_file_for_extract(const char *filename, mode_t mode); - -static const bbstreamer_ops bbstreamer_extractor_ops = { - .content = bbstreamer_extractor_content, - .finalize = bbstreamer_extractor_finalize, - .free = bbstreamer_extractor_free -}; - -/* - * Create a bbstreamer that just writes data to a file. - * - * The caller must specify a pathname and may specify a file. The pathname is - * used for error-reporting purposes either way. If file is NULL, the pathname - * also identifies the file to which the data should be written: it is opened - * for writing and closed when done. If file is not NULL, the data is written - * there. - */ -bbstreamer * -bbstreamer_plain_writer_new(char *pathname, FILE *file) -{ - bbstreamer_plain_writer *streamer; - - streamer = palloc0(sizeof(bbstreamer_plain_writer)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_plain_writer_ops; - - streamer->pathname = pstrdup(pathname); - streamer->file = file; - - if (file == NULL) - { - streamer->file = fopen(pathname, "wb"); - if (streamer->file == NULL) - pg_fatal("could not create file \"%s\": %m", pathname); - streamer->should_close_file = true; - } - - return &streamer->base; -} - -/* - * Write archive content to file. - */ -static void -bbstreamer_plain_writer_content(bbstreamer *streamer, - bbstreamer_member *member, const char *data, - int len, bbstreamer_archive_context context) -{ - bbstreamer_plain_writer *mystreamer; - - mystreamer = (bbstreamer_plain_writer *) streamer; - - if (len == 0) - return; - - errno = 0; - if (fwrite(data, len, 1, mystreamer->file) != 1) - { - /* if write didn't set errno, assume problem is no disk space */ - if (errno == 0) - errno = ENOSPC; - pg_fatal("could not write to file \"%s\": %m", - mystreamer->pathname); - } -} - -/* - * End-of-archive processing when writing to a plain file consists of closing - * the file if we opened it, but not if the caller provided it. - */ -static void -bbstreamer_plain_writer_finalize(bbstreamer *streamer) -{ - bbstreamer_plain_writer *mystreamer; - - mystreamer = (bbstreamer_plain_writer *) streamer; - - if (mystreamer->should_close_file && fclose(mystreamer->file) != 0) - pg_fatal("could not close file \"%s\": %m", - mystreamer->pathname); - - mystreamer->file = NULL; - mystreamer->should_close_file = false; -} - -/* - * Free memory associated with this bbstreamer. - */ -static void -bbstreamer_plain_writer_free(bbstreamer *streamer) -{ - bbstreamer_plain_writer *mystreamer; - - mystreamer = (bbstreamer_plain_writer *) streamer; - - Assert(!mystreamer->should_close_file); - Assert(mystreamer->base.bbs_next == NULL); - - pfree(mystreamer->pathname); - pfree(mystreamer); -} - -/* - * Create a bbstreamer that extracts an archive. - * - * All pathnames in the archive are interpreted relative to basepath. - * - * Unlike e.g. bbstreamer_plain_writer_new() we can't do anything useful here - * with untyped chunks; we need typed chunks which follow the rules described - * in bbstreamer.h. Assuming we have that, we don't need to worry about the - * original archive format; it's enough to just look at the member information - * provided and write to the corresponding file. - * - * 'link_map' is a function that will be applied to the target of any - * symbolic link, and which should return a replacement pathname to be used - * in its place. If NULL, the symbolic link target is used without - * modification. - * - * 'report_output_file' is a function that will be called each time we open a - * new output file. The pathname to that file is passed as an argument. If - * NULL, the call is skipped. - */ -bbstreamer * -bbstreamer_extractor_new(const char *basepath, - const char *(*link_map) (const char *), - void (*report_output_file) (const char *)) -{ - bbstreamer_extractor *streamer; - - streamer = palloc0(sizeof(bbstreamer_extractor)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_extractor_ops; - streamer->basepath = pstrdup(basepath); - streamer->link_map = link_map; - streamer->report_output_file = report_output_file; - - return &streamer->base; -} - -/* - * Extract archive contents to the filesystem. - */ -static void -bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) -{ - bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer; - int fnamelen; - - Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER); - Assert(context != BBSTREAMER_UNKNOWN); - - switch (context) - { - case BBSTREAMER_MEMBER_HEADER: - Assert(mystreamer->file == NULL); - - /* Prepend basepath. */ - snprintf(mystreamer->filename, sizeof(mystreamer->filename), - "%s/%s", mystreamer->basepath, member->pathname); - - /* Remove any trailing slash. */ - fnamelen = strlen(mystreamer->filename); - if (mystreamer->filename[fnamelen - 1] == '/') - mystreamer->filename[fnamelen - 1] = '\0'; - - /* Dispatch based on file type. */ - if (member->is_directory) - extract_directory(mystreamer->filename, member->mode); - else if (member->is_link) - { - const char *linktarget = member->linktarget; - - if (mystreamer->link_map) - linktarget = mystreamer->link_map(linktarget); - extract_link(mystreamer->filename, linktarget); - } - else - mystreamer->file = - create_file_for_extract(mystreamer->filename, - member->mode); - - /* Report output file change. */ - if (mystreamer->report_output_file) - mystreamer->report_output_file(mystreamer->filename); - break; - - case BBSTREAMER_MEMBER_CONTENTS: - if (mystreamer->file == NULL) - break; - - errno = 0; - if (len > 0 && fwrite(data, len, 1, mystreamer->file) != 1) - { - /* if write didn't set errno, assume problem is no disk space */ - if (errno == 0) - errno = ENOSPC; - pg_fatal("could not write to file \"%s\": %m", - mystreamer->filename); - } - break; - - case BBSTREAMER_MEMBER_TRAILER: - if (mystreamer->file == NULL) - break; - fclose(mystreamer->file); - mystreamer->file = NULL; - break; - - case BBSTREAMER_ARCHIVE_TRAILER: - break; - - default: - /* Shouldn't happen. */ - pg_fatal("unexpected state while extracting archive"); - } -} - -/* - * Should we tolerate an already-existing directory? - * - * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 clusters) will have been - * created by the wal receiver process. Also, when the WAL directory location - * was specified, pg_wal (or pg_xlog) has already been created as a symbolic - * link before starting the actual backup. So just ignore creation failures - * on related directories. - * - * If in-place tablespaces are used, pg_tblspc and subdirectories may already - * exist when we get here. So tolerate that case, too. - */ -static bool -should_allow_existing_directory(const char *pathname) -{ - const char *filename = last_dir_separator(pathname) + 1; - - if (strcmp(filename, "pg_wal") == 0 || - strcmp(filename, "pg_xlog") == 0 || - strcmp(filename, "archive_status") == 0 || - strcmp(filename, "summaries") == 0 || - strcmp(filename, "pg_tblspc") == 0) - return true; - - if (strspn(filename, "0123456789") == strlen(filename)) - { - const char *pg_tblspc = strstr(pathname, "/pg_tblspc/"); - - return pg_tblspc != NULL && pg_tblspc + 11 == filename; - } - - return false; -} - -/* - * Create a directory. - */ -static void -extract_directory(const char *filename, mode_t mode) -{ - if (mkdir(filename, pg_dir_create_mode) != 0 && - (errno != EEXIST || !should_allow_existing_directory(filename))) - pg_fatal("could not create directory \"%s\": %m", - filename); - -#ifndef WIN32 - if (chmod(filename, mode)) - pg_fatal("could not set permissions on directory \"%s\": %m", - filename); -#endif -} - -/* - * Create a symbolic link. - * - * It's most likely a link in pg_tblspc directory, to the location of a - * tablespace. Apply any tablespace mapping given on the command line - * (--tablespace-mapping). (We blindly apply the mapping without checking that - * the link really is inside pg_tblspc. We don't expect there to be other - * symlinks in a data directory, but if there are, you can call it an - * undocumented feature that you can map them too.) - */ -static void -extract_link(const char *filename, const char *linktarget) -{ - if (symlink(linktarget, filename) != 0) - pg_fatal("could not create symbolic link from \"%s\" to \"%s\": %m", - filename, linktarget); -} - -/* - * Create a regular file. - * - * Return the resulting handle so we can write the content to the file. - */ -static FILE * -create_file_for_extract(const char *filename, mode_t mode) -{ - FILE *file; - - file = fopen(filename, "wb"); - if (file == NULL) - pg_fatal("could not create file \"%s\": %m", filename); - -#ifndef WIN32 - if (chmod(filename, mode)) - pg_fatal("could not set permissions on file \"%s\": %m", - filename); -#endif - - return file; -} - -/* - * End-of-stream processing for extracting an archive. - * - * There's nothing to do here but sanity checking. - */ -static void -bbstreamer_extractor_finalize(bbstreamer *streamer) -{ - bbstreamer_extractor *mystreamer PG_USED_FOR_ASSERTS_ONLY - = (bbstreamer_extractor *) streamer; - - Assert(mystreamer->file == NULL); -} - -/* - * Free memory. - */ -static void -bbstreamer_extractor_free(bbstreamer *streamer) -{ - bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer; - - pfree(mystreamer->basepath); - pfree(mystreamer); -} diff --git a/src/bin/pg_basebackup/bbstreamer_gzip.c b/src/bin/pg_basebackup/bbstreamer_gzip.c deleted file mode 100644 index 0417fd9bc2c..00000000000 --- a/src/bin/pg_basebackup/bbstreamer_gzip.c +++ /dev/null @@ -1,364 +0,0 @@ -/*------------------------------------------------------------------------- - * - * bbstreamer_gzip.c - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer_gzip.c - *------------------------------------------------------------------------- - */ - -#include "postgres_fe.h" - -#include - -#ifdef HAVE_LIBZ -#include -#endif - -#include "bbstreamer.h" -#include "common/file_perm.h" -#include "common/logging.h" -#include "common/string.h" - -#ifdef HAVE_LIBZ -typedef struct bbstreamer_gzip_writer -{ - bbstreamer base; - char *pathname; - gzFile gzfile; -} bbstreamer_gzip_writer; - -typedef struct bbstreamer_gzip_decompressor -{ - bbstreamer base; - z_stream zstream; - size_t bytes_written; -} bbstreamer_gzip_decompressor; - -static void bbstreamer_gzip_writer_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_gzip_writer_finalize(bbstreamer *streamer); -static void bbstreamer_gzip_writer_free(bbstreamer *streamer); -static const char *get_gz_error(gzFile gzf); - -static const bbstreamer_ops bbstreamer_gzip_writer_ops = { - .content = bbstreamer_gzip_writer_content, - .finalize = bbstreamer_gzip_writer_finalize, - .free = bbstreamer_gzip_writer_free -}; - -static void bbstreamer_gzip_decompressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer); -static void bbstreamer_gzip_decompressor_free(bbstreamer *streamer); -static void *gzip_palloc(void *opaque, unsigned items, unsigned size); -static void gzip_pfree(void *opaque, void *address); - -static const bbstreamer_ops bbstreamer_gzip_decompressor_ops = { - .content = bbstreamer_gzip_decompressor_content, - .finalize = bbstreamer_gzip_decompressor_finalize, - .free = bbstreamer_gzip_decompressor_free -}; -#endif - -/* - * Create a bbstreamer that just compresses data using gzip, and then writes - * it to a file. - * - * As in the case of bbstreamer_plain_writer_new, pathname is always used - * for error reporting purposes; if file is NULL, it is also the opened and - * closed so that the data may be written there. - */ -bbstreamer * -bbstreamer_gzip_writer_new(char *pathname, FILE *file, - pg_compress_specification *compress) -{ -#ifdef HAVE_LIBZ - bbstreamer_gzip_writer *streamer; - - streamer = palloc0(sizeof(bbstreamer_gzip_writer)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_gzip_writer_ops; - - streamer->pathname = pstrdup(pathname); - - if (file == NULL) - { - streamer->gzfile = gzopen(pathname, "wb"); - if (streamer->gzfile == NULL) - pg_fatal("could not create compressed file \"%s\": %m", - pathname); - } - else - { - int fd = dup(fileno(file)); - - if (fd < 0) - pg_fatal("could not duplicate stdout: %m"); - - streamer->gzfile = gzdopen(fd, "wb"); - if (streamer->gzfile == NULL) - pg_fatal("could not open output file: %m"); - } - - if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK) - pg_fatal("could not set compression level %d: %s", - compress->level, get_gz_error(streamer->gzfile)); - - return &streamer->base; -#else - pg_fatal("this build does not support compression with %s", "gzip"); - return NULL; /* keep compiler quiet */ -#endif -} - -#ifdef HAVE_LIBZ -/* - * Write archive content to gzip file. - */ -static void -bbstreamer_gzip_writer_content(bbstreamer *streamer, - bbstreamer_member *member, const char *data, - int len, bbstreamer_archive_context context) -{ - bbstreamer_gzip_writer *mystreamer; - - mystreamer = (bbstreamer_gzip_writer *) streamer; - - if (len == 0) - return; - - errno = 0; - if (gzwrite(mystreamer->gzfile, data, len) != len) - { - /* if write didn't set errno, assume problem is no disk space */ - if (errno == 0) - errno = ENOSPC; - pg_fatal("could not write to compressed file \"%s\": %s", - mystreamer->pathname, get_gz_error(mystreamer->gzfile)); - } -} - -/* - * End-of-archive processing when writing to a gzip file consists of just - * calling gzclose. - * - * It makes no difference whether we opened the file or the caller did it, - * because libz provides no way of avoiding a close on the underlying file - * handle. Notice, however, that bbstreamer_gzip_writer_new() uses dup() to - * work around this issue, so that the behavior from the caller's viewpoint - * is the same as for bbstreamer_plain_writer. - */ -static void -bbstreamer_gzip_writer_finalize(bbstreamer *streamer) -{ - bbstreamer_gzip_writer *mystreamer; - - mystreamer = (bbstreamer_gzip_writer *) streamer; - - errno = 0; /* in case gzclose() doesn't set it */ - if (gzclose(mystreamer->gzfile) != 0) - pg_fatal("could not close compressed file \"%s\": %m", - mystreamer->pathname); - - mystreamer->gzfile = NULL; -} - -/* - * Free memory associated with this bbstreamer. - */ -static void -bbstreamer_gzip_writer_free(bbstreamer *streamer) -{ - bbstreamer_gzip_writer *mystreamer; - - mystreamer = (bbstreamer_gzip_writer *) streamer; - - Assert(mystreamer->base.bbs_next == NULL); - Assert(mystreamer->gzfile == NULL); - - pfree(mystreamer->pathname); - pfree(mystreamer); -} - -/* - * Helper function for libz error reporting. - */ -static const char * -get_gz_error(gzFile gzf) -{ - int errnum; - const char *errmsg; - - errmsg = gzerror(gzf, &errnum); - if (errnum == Z_ERRNO) - return strerror(errno); - else - return errmsg; -} -#endif - -/* - * Create a new base backup streamer that performs decompression of gzip - * compressed blocks. - */ -bbstreamer * -bbstreamer_gzip_decompressor_new(bbstreamer *next) -{ -#ifdef HAVE_LIBZ - bbstreamer_gzip_decompressor *streamer; - z_stream *zs; - - Assert(next != NULL); - - streamer = palloc0(sizeof(bbstreamer_gzip_decompressor)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_gzip_decompressor_ops; - - streamer->base.bbs_next = next; - initStringInfo(&streamer->base.bbs_buffer); - - /* Initialize internal stream state for decompression */ - zs = &streamer->zstream; - zs->zalloc = gzip_palloc; - zs->zfree = gzip_pfree; - zs->next_out = (uint8 *) streamer->base.bbs_buffer.data; - zs->avail_out = streamer->base.bbs_buffer.maxlen; - - /* - * Data compression was initialized using deflateInit2 to request a gzip - * header. Similarly, we are using inflateInit2 to initialize data - * decompression. - * - * Per the documentation for inflateInit2, the second argument is - * "windowBits" and its value must be greater than or equal to the value - * provided while compressing the data, so we are using the maximum - * possible value for safety. - */ - if (inflateInit2(zs, 15 + 16) != Z_OK) - pg_fatal("could not initialize compression library"); - - return &streamer->base; -#else - pg_fatal("this build does not support compression with %s", "gzip"); - return NULL; /* keep compiler quiet */ -#endif -} - -#ifdef HAVE_LIBZ -/* - * Decompress the input data to output buffer until we run out of input - * data. Each time the output buffer is full, pass on the decompressed data - * to the next streamer. - */ -static void -bbstreamer_gzip_decompressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) -{ - bbstreamer_gzip_decompressor *mystreamer; - z_stream *zs; - - mystreamer = (bbstreamer_gzip_decompressor *) streamer; - - zs = &mystreamer->zstream; - zs->next_in = (const uint8 *) data; - zs->avail_in = len; - - /* Process the current chunk */ - while (zs->avail_in > 0) - { - int res; - - Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen); - - zs->next_out = (uint8 *) - mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; - zs->avail_out = - mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; - - /* - * This call decompresses data starting at zs->next_in and updates - * zs->next_in * and zs->avail_in. It generates output data starting - * at zs->next_out and updates zs->next_out and zs->avail_out - * accordingly. - */ - res = inflate(zs, Z_NO_FLUSH); - - if (res == Z_STREAM_ERROR) - pg_log_error("could not decompress data: %s", zs->msg); - - mystreamer->bytes_written = - mystreamer->base.bbs_buffer.maxlen - zs->avail_out; - - /* If output buffer is full then pass data to next streamer */ - if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) - { - bbstreamer_content(mystreamer->base.bbs_next, member, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, context); - mystreamer->bytes_written = 0; - } - } -} - -/* - * End-of-stream processing. - */ -static void -bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer) -{ - bbstreamer_gzip_decompressor *mystreamer; - - mystreamer = (bbstreamer_gzip_decompressor *) streamer; - - /* - * End of the stream, if there is some pending data in output buffers then - * we must forward it to next streamer. - */ - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - BBSTREAMER_UNKNOWN); - - bbstreamer_finalize(mystreamer->base.bbs_next); -} - -/* - * Free memory. - */ -static void -bbstreamer_gzip_decompressor_free(bbstreamer *streamer) -{ - bbstreamer_free(streamer->bbs_next); - pfree(streamer->bbs_buffer.data); - pfree(streamer); -} - -/* - * Wrapper function to adjust the signature of palloc to match what libz - * expects. - */ -static void * -gzip_palloc(void *opaque, unsigned items, unsigned size) -{ - return palloc(items * size); -} - -/* - * Wrapper function to adjust the signature of pfree to match what libz - * expects. - */ -static void -gzip_pfree(void *opaque, void *address) -{ - pfree(address); -} -#endif diff --git a/src/bin/pg_basebackup/bbstreamer_inject.c b/src/bin/pg_basebackup/bbstreamer_inject.c deleted file mode 100644 index 194026b56e9..00000000000 --- a/src/bin/pg_basebackup/bbstreamer_inject.c +++ /dev/null @@ -1,249 +0,0 @@ -/*------------------------------------------------------------------------- - * - * bbstreamer_inject.c - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer_inject.c - *------------------------------------------------------------------------- - */ - -#include "postgres_fe.h" - -#include "bbstreamer.h" -#include "common/file_perm.h" -#include "common/logging.h" - -typedef struct bbstreamer_recovery_injector -{ - bbstreamer base; - bool skip_file; - bool is_recovery_guc_supported; - bool is_postgresql_auto_conf; - bool found_postgresql_auto_conf; - PQExpBuffer recoveryconfcontents; - bbstreamer_member member; -} bbstreamer_recovery_injector; - -static void bbstreamer_recovery_injector_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_recovery_injector_finalize(bbstreamer *streamer); -static void bbstreamer_recovery_injector_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_recovery_injector_ops = { - .content = bbstreamer_recovery_injector_content, - .finalize = bbstreamer_recovery_injector_finalize, - .free = bbstreamer_recovery_injector_free -}; - -/* - * Create a bbstreamer that can edit recoverydata into an archive stream. - * - * The input should be a series of typed chunks (not BBSTREAMER_UNKNOWN) as - * per the conventions described in bbstreamer.h; the chunks forwarded to - * the next bbstreamer will be similarly typed, but the - * BBSTREAMER_MEMBER_HEADER chunks may be zero-length in cases where we've - * edited the archive stream. - * - * Our goal is to do one of the following three things with the content passed - * via recoveryconfcontents: (1) if is_recovery_guc_supported is false, then - * put the content into recovery.conf, replacing any existing archive member - * by that name; (2) if is_recovery_guc_supported is true and - * postgresql.auto.conf exists in the archive, then append the content - * provided to the existing file; and (3) if is_recovery_guc_supported is - * true but postgresql.auto.conf does not exist in the archive, then create - * it with the specified content. - * - * In addition, if is_recovery_guc_supported is true, then we create a - * zero-length standby.signal file, dropping any file with that name from - * the archive. - */ -bbstreamer * -bbstreamer_recovery_injector_new(bbstreamer *next, - bool is_recovery_guc_supported, - PQExpBuffer recoveryconfcontents) -{ - bbstreamer_recovery_injector *streamer; - - streamer = palloc0(sizeof(bbstreamer_recovery_injector)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_recovery_injector_ops; - streamer->base.bbs_next = next; - streamer->is_recovery_guc_supported = is_recovery_guc_supported; - streamer->recoveryconfcontents = recoveryconfcontents; - - return &streamer->base; -} - -/* - * Handle each chunk of tar content while injecting recovery configuration. - */ -static void -bbstreamer_recovery_injector_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) -{ - bbstreamer_recovery_injector *mystreamer; - - mystreamer = (bbstreamer_recovery_injector *) streamer; - Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER); - - switch (context) - { - case BBSTREAMER_MEMBER_HEADER: - /* Must copy provided data so we have the option to modify it. */ - memcpy(&mystreamer->member, member, sizeof(bbstreamer_member)); - - /* - * On v12+, skip standby.signal and edit postgresql.auto.conf; on - * older versions, skip recovery.conf. - */ - if (mystreamer->is_recovery_guc_supported) - { - mystreamer->skip_file = - (strcmp(member->pathname, "standby.signal") == 0); - mystreamer->is_postgresql_auto_conf = - (strcmp(member->pathname, "postgresql.auto.conf") == 0); - if (mystreamer->is_postgresql_auto_conf) - { - /* Remember we saw it so we don't add it again. */ - mystreamer->found_postgresql_auto_conf = true; - - /* Increment length by data to be injected. */ - mystreamer->member.size += - mystreamer->recoveryconfcontents->len; - - /* - * Zap data and len because the archive header is no - * longer valid; some subsequent bbstreamer must - * regenerate it if it's necessary. - */ - data = NULL; - len = 0; - } - } - else - mystreamer->skip_file = - (strcmp(member->pathname, "recovery.conf") == 0); - - /* Do not forward if the file is to be skipped. */ - if (mystreamer->skip_file) - return; - break; - - case BBSTREAMER_MEMBER_CONTENTS: - /* Do not forward if the file is to be skipped. */ - if (mystreamer->skip_file) - return; - break; - - case BBSTREAMER_MEMBER_TRAILER: - /* Do not forward it the file is to be skipped. */ - if (mystreamer->skip_file) - return; - - /* Append provided content to whatever we already sent. */ - if (mystreamer->is_postgresql_auto_conf) - bbstreamer_content(mystreamer->base.bbs_next, member, - mystreamer->recoveryconfcontents->data, - mystreamer->recoveryconfcontents->len, - BBSTREAMER_MEMBER_CONTENTS); - break; - - case BBSTREAMER_ARCHIVE_TRAILER: - if (mystreamer->is_recovery_guc_supported) - { - /* - * If we didn't already find (and thus modify) - * postgresql.auto.conf, inject it as an additional archive - * member now. - */ - if (!mystreamer->found_postgresql_auto_conf) - bbstreamer_inject_file(mystreamer->base.bbs_next, - "postgresql.auto.conf", - mystreamer->recoveryconfcontents->data, - mystreamer->recoveryconfcontents->len); - - /* Inject empty standby.signal file. */ - bbstreamer_inject_file(mystreamer->base.bbs_next, - "standby.signal", "", 0); - } - else - { - /* Inject recovery.conf file with specified contents. */ - bbstreamer_inject_file(mystreamer->base.bbs_next, - "recovery.conf", - mystreamer->recoveryconfcontents->data, - mystreamer->recoveryconfcontents->len); - } - - /* Nothing to do here. */ - break; - - default: - /* Shouldn't happen. */ - pg_fatal("unexpected state while injecting recovery settings"); - } - - bbstreamer_content(mystreamer->base.bbs_next, &mystreamer->member, - data, len, context); -} - -/* - * End-of-stream processing for this bbstreamer. - */ -static void -bbstreamer_recovery_injector_finalize(bbstreamer *streamer) -{ - bbstreamer_finalize(streamer->bbs_next); -} - -/* - * Free memory associated with this bbstreamer. - */ -static void -bbstreamer_recovery_injector_free(bbstreamer *streamer) -{ - bbstreamer_free(streamer->bbs_next); - pfree(streamer); -} - -/* - * Inject a member into the archive with specified contents. - */ -void -bbstreamer_inject_file(bbstreamer *streamer, char *pathname, char *data, - int len) -{ - bbstreamer_member member; - - strlcpy(member.pathname, pathname, MAXPGPATH); - member.size = len; - member.mode = pg_file_create_mode; - member.is_directory = false; - member.is_link = false; - member.linktarget[0] = '\0'; - - /* - * There seems to be no principled argument for these values, but they are - * what PostgreSQL has historically used. - */ - member.uid = 04000; - member.gid = 02000; - - /* - * We don't know here how to generate valid member headers and trailers - * for the archiving format in use, so if those are needed, some successor - * bbstreamer will have to generate them using the data from 'member'. - */ - bbstreamer_content(streamer, &member, NULL, 0, - BBSTREAMER_MEMBER_HEADER); - bbstreamer_content(streamer, &member, data, len, - BBSTREAMER_MEMBER_CONTENTS); - bbstreamer_content(streamer, &member, NULL, 0, - BBSTREAMER_MEMBER_TRAILER); -} diff --git a/src/bin/pg_basebackup/bbstreamer_lz4.c b/src/bin/pg_basebackup/bbstreamer_lz4.c deleted file mode 100644 index f5c9e68150c..00000000000 --- a/src/bin/pg_basebackup/bbstreamer_lz4.c +++ /dev/null @@ -1,422 +0,0 @@ -/*------------------------------------------------------------------------- - * - * bbstreamer_lz4.c - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer_lz4.c - *------------------------------------------------------------------------- - */ - -#include "postgres_fe.h" - -#include - -#ifdef USE_LZ4 -#include -#endif - -#include "bbstreamer.h" -#include "common/file_perm.h" -#include "common/logging.h" -#include "common/string.h" - -#ifdef USE_LZ4 -typedef struct bbstreamer_lz4_frame -{ - bbstreamer base; - - LZ4F_compressionContext_t cctx; - LZ4F_decompressionContext_t dctx; - LZ4F_preferences_t prefs; - - size_t bytes_written; - bool header_written; -} bbstreamer_lz4_frame; - -static void bbstreamer_lz4_compressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer); -static void bbstreamer_lz4_compressor_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_lz4_compressor_ops = { - .content = bbstreamer_lz4_compressor_content, - .finalize = bbstreamer_lz4_compressor_finalize, - .free = bbstreamer_lz4_compressor_free -}; - -static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer); -static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_lz4_decompressor_ops = { - .content = bbstreamer_lz4_decompressor_content, - .finalize = bbstreamer_lz4_decompressor_finalize, - .free = bbstreamer_lz4_decompressor_free -}; -#endif - -/* - * Create a new base backup streamer that performs lz4 compression of tar - * blocks. - */ -bbstreamer * -bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress) -{ -#ifdef USE_LZ4 - bbstreamer_lz4_frame *streamer; - LZ4F_errorCode_t ctxError; - LZ4F_preferences_t *prefs; - - Assert(next != NULL); - - streamer = palloc0(sizeof(bbstreamer_lz4_frame)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_lz4_compressor_ops; - - streamer->base.bbs_next = next; - initStringInfo(&streamer->base.bbs_buffer); - streamer->header_written = false; - - /* Initialize stream compression preferences */ - prefs = &streamer->prefs; - memset(prefs, 0, sizeof(LZ4F_preferences_t)); - prefs->frameInfo.blockSizeID = LZ4F_max256KB; - prefs->compressionLevel = compress->level; - - ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION); - if (LZ4F_isError(ctxError)) - pg_log_error("could not create lz4 compression context: %s", - LZ4F_getErrorName(ctxError)); - - return &streamer->base; -#else - pg_fatal("this build does not support compression with %s", "LZ4"); - return NULL; /* keep compiler quiet */ -#endif -} - -#ifdef USE_LZ4 -/* - * Compress the input data to output buffer. - * - * Find out the compression bound based on input data length for each - * invocation to make sure that output buffer has enough capacity to - * accommodate the compressed data. In case if the output buffer - * capacity falls short of compression bound then forward the content - * of output buffer to next streamer and empty the buffer. - */ -static void -bbstreamer_lz4_compressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) -{ - bbstreamer_lz4_frame *mystreamer; - uint8 *next_in, - *next_out; - size_t out_bound, - compressed_size, - avail_out; - - mystreamer = (bbstreamer_lz4_frame *) streamer; - next_in = (uint8 *) data; - - /* Write header before processing the first input chunk. */ - if (!mystreamer->header_written) - { - compressed_size = LZ4F_compressBegin(mystreamer->cctx, - (uint8 *) mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - &mystreamer->prefs); - - if (LZ4F_isError(compressed_size)) - pg_log_error("could not write lz4 header: %s", - LZ4F_getErrorName(compressed_size)); - - mystreamer->bytes_written += compressed_size; - mystreamer->header_written = true; - } - - /* - * Update the offset and capacity of output buffer based on number of - * bytes written to output buffer. - */ - next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; - avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; - - /* - * Find out the compression bound and make sure that output buffer has the - * required capacity for the success of LZ4F_compressUpdate. If needed - * forward the content to next streamer and empty the buffer. - */ - out_bound = LZ4F_compressBound(len, &mystreamer->prefs); - if (avail_out < out_bound) - { - bbstreamer_content(mystreamer->base.bbs_next, member, - mystreamer->base.bbs_buffer.data, - mystreamer->bytes_written, - context); - - /* Enlarge buffer if it falls short of out bound. */ - if (mystreamer->base.bbs_buffer.maxlen < out_bound) - enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound); - - avail_out = mystreamer->base.bbs_buffer.maxlen; - mystreamer->bytes_written = 0; - next_out = (uint8 *) mystreamer->base.bbs_buffer.data; - } - - /* - * This call compresses the data starting at next_in and generates the - * output starting at next_out. It expects the caller to provide the size - * of input buffer and capacity of output buffer by providing parameters - * len and avail_out. - * - * It returns the number of bytes compressed to output buffer. - */ - compressed_size = LZ4F_compressUpdate(mystreamer->cctx, - next_out, avail_out, - next_in, len, NULL); - - if (LZ4F_isError(compressed_size)) - pg_log_error("could not compress data: %s", - LZ4F_getErrorName(compressed_size)); - - mystreamer->bytes_written += compressed_size; -} - -/* - * End-of-stream processing. - */ -static void -bbstreamer_lz4_compressor_finalize(bbstreamer *streamer) -{ - bbstreamer_lz4_frame *mystreamer; - uint8 *next_out; - size_t footer_bound, - compressed_size, - avail_out; - - mystreamer = (bbstreamer_lz4_frame *) streamer; - - /* Find out the footer bound and update the output buffer. */ - footer_bound = LZ4F_compressBound(0, &mystreamer->prefs); - if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) < - footer_bound) - { - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->bytes_written, - BBSTREAMER_UNKNOWN); - - /* Enlarge buffer if it falls short of footer bound. */ - if (mystreamer->base.bbs_buffer.maxlen < footer_bound) - enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound); - - avail_out = mystreamer->base.bbs_buffer.maxlen; - mystreamer->bytes_written = 0; - next_out = (uint8 *) mystreamer->base.bbs_buffer.data; - } - else - { - next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; - avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; - } - - /* - * Finalize the frame and flush whatever data remaining in compression - * context. - */ - compressed_size = LZ4F_compressEnd(mystreamer->cctx, - next_out, avail_out, NULL); - - if (LZ4F_isError(compressed_size)) - pg_log_error("could not end lz4 compression: %s", - LZ4F_getErrorName(compressed_size)); - - mystreamer->bytes_written += compressed_size; - - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->bytes_written, - BBSTREAMER_UNKNOWN); - - bbstreamer_finalize(mystreamer->base.bbs_next); -} - -/* - * Free memory. - */ -static void -bbstreamer_lz4_compressor_free(bbstreamer *streamer) -{ - bbstreamer_lz4_frame *mystreamer; - - mystreamer = (bbstreamer_lz4_frame *) streamer; - bbstreamer_free(streamer->bbs_next); - LZ4F_freeCompressionContext(mystreamer->cctx); - pfree(streamer->bbs_buffer.data); - pfree(streamer); -} -#endif - -/* - * Create a new base backup streamer that performs decompression of lz4 - * compressed blocks. - */ -bbstreamer * -bbstreamer_lz4_decompressor_new(bbstreamer *next) -{ -#ifdef USE_LZ4 - bbstreamer_lz4_frame *streamer; - LZ4F_errorCode_t ctxError; - - Assert(next != NULL); - - streamer = palloc0(sizeof(bbstreamer_lz4_frame)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_lz4_decompressor_ops; - - streamer->base.bbs_next = next; - initStringInfo(&streamer->base.bbs_buffer); - - /* Initialize internal stream state for decompression */ - ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION); - if (LZ4F_isError(ctxError)) - pg_fatal("could not initialize compression library: %s", - LZ4F_getErrorName(ctxError)); - - return &streamer->base; -#else - pg_fatal("this build does not support compression with %s", "LZ4"); - return NULL; /* keep compiler quiet */ -#endif -} - -#ifdef USE_LZ4 -/* - * Decompress the input data to output buffer until we run out of input - * data. Each time the output buffer is full, pass on the decompressed data - * to the next streamer. - */ -static void -bbstreamer_lz4_decompressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) -{ - bbstreamer_lz4_frame *mystreamer; - uint8 *next_in, - *next_out; - size_t avail_in, - avail_out; - - mystreamer = (bbstreamer_lz4_frame *) streamer; - next_in = (uint8 *) data; - next_out = (uint8 *) mystreamer->base.bbs_buffer.data; - avail_in = len; - avail_out = mystreamer->base.bbs_buffer.maxlen; - - while (avail_in > 0) - { - size_t ret, - read_size, - out_size; - - read_size = avail_in; - out_size = avail_out; - - /* - * This call decompresses the data starting at next_in and generates - * the output data starting at next_out. It expects the caller to - * provide size of the input buffer and total capacity of the output - * buffer by providing the read_size and out_size parameters - * respectively. - * - * Per the documentation of LZ4, parameters read_size and out_size - * behaves as dual parameters. On return, the number of bytes consumed - * from the input buffer will be written back to read_size and the - * number of bytes decompressed to output buffer will be written back - * to out_size respectively. - */ - ret = LZ4F_decompress(mystreamer->dctx, - next_out, &out_size, - next_in, &read_size, NULL); - - if (LZ4F_isError(ret)) - pg_log_error("could not decompress data: %s", - LZ4F_getErrorName(ret)); - - /* Update input buffer based on number of bytes consumed */ - avail_in -= read_size; - next_in += read_size; - - mystreamer->bytes_written += out_size; - - /* - * If output buffer is full then forward the content to next streamer - * and update the output buffer. - */ - if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) - { - bbstreamer_content(mystreamer->base.bbs_next, member, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - context); - - avail_out = mystreamer->base.bbs_buffer.maxlen; - mystreamer->bytes_written = 0; - next_out = (uint8 *) mystreamer->base.bbs_buffer.data; - } - else - { - avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; - next_out += mystreamer->bytes_written; - } - } -} - -/* - * End-of-stream processing. - */ -static void -bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer) -{ - bbstreamer_lz4_frame *mystreamer; - - mystreamer = (bbstreamer_lz4_frame *) streamer; - - /* - * End of the stream, if there is some pending data in output buffers then - * we must forward it to next streamer. - */ - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - BBSTREAMER_UNKNOWN); - - bbstreamer_finalize(mystreamer->base.bbs_next); -} - -/* - * Free memory. - */ -static void -bbstreamer_lz4_decompressor_free(bbstreamer *streamer) -{ - bbstreamer_lz4_frame *mystreamer; - - mystreamer = (bbstreamer_lz4_frame *) streamer; - bbstreamer_free(streamer->bbs_next); - LZ4F_freeDecompressionContext(mystreamer->dctx); - pfree(streamer->bbs_buffer.data); - pfree(streamer); -} -#endif diff --git a/src/bin/pg_basebackup/bbstreamer_tar.c b/src/bin/pg_basebackup/bbstreamer_tar.c deleted file mode 100644 index 9137d17ddc1..00000000000 --- a/src/bin/pg_basebackup/bbstreamer_tar.c +++ /dev/null @@ -1,514 +0,0 @@ -/*------------------------------------------------------------------------- - * - * bbstreamer_tar.c - * - * This module implements three types of tar processing. A tar parser - * expects unlabelled chunks of data (e.g. BBSTREAMER_UNKNOWN) and splits - * it into labelled chunks (any other value of bbstreamer_archive_context). - * A tar archiver does the reverse: it takes a bunch of labelled chunks - * and produces a tarfile, optionally replacing member headers and trailers - * so that upstream bbstreamer objects can perform surgery on the tarfile - * contents without knowing the details of the tar format. A tar terminator - * just adds two blocks of NUL bytes to the end of the file, since older - * server versions produce files with this terminator omitted. - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer_tar.c - *------------------------------------------------------------------------- - */ - -#include "postgres_fe.h" - -#include - -#include "bbstreamer.h" -#include "common/logging.h" -#include "pgtar.h" - -typedef struct bbstreamer_tar_parser -{ - bbstreamer base; - bbstreamer_archive_context next_context; - bbstreamer_member member; - size_t file_bytes_sent; - size_t pad_bytes_expected; -} bbstreamer_tar_parser; - -typedef struct bbstreamer_tar_archiver -{ - bbstreamer base; - bool rearchive_member; -} bbstreamer_tar_archiver; - -static void bbstreamer_tar_parser_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_tar_parser_finalize(bbstreamer *streamer); -static void bbstreamer_tar_parser_free(bbstreamer *streamer); -static bool bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer); - -static const bbstreamer_ops bbstreamer_tar_parser_ops = { - .content = bbstreamer_tar_parser_content, - .finalize = bbstreamer_tar_parser_finalize, - .free = bbstreamer_tar_parser_free -}; - -static void bbstreamer_tar_archiver_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_tar_archiver_finalize(bbstreamer *streamer); -static void bbstreamer_tar_archiver_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_tar_archiver_ops = { - .content = bbstreamer_tar_archiver_content, - .finalize = bbstreamer_tar_archiver_finalize, - .free = bbstreamer_tar_archiver_free -}; - -static void bbstreamer_tar_terminator_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_tar_terminator_finalize(bbstreamer *streamer); -static void bbstreamer_tar_terminator_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_tar_terminator_ops = { - .content = bbstreamer_tar_terminator_content, - .finalize = bbstreamer_tar_terminator_finalize, - .free = bbstreamer_tar_terminator_free -}; - -/* - * Create a bbstreamer that can parse a stream of content as tar data. - * - * The input should be a series of BBSTREAMER_UNKNOWN chunks; the bbstreamer - * specified by 'next' will receive a series of typed chunks, as per the - * conventions described in bbstreamer.h. - */ -bbstreamer * -bbstreamer_tar_parser_new(bbstreamer *next) -{ - bbstreamer_tar_parser *streamer; - - streamer = palloc0(sizeof(bbstreamer_tar_parser)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_tar_parser_ops; - streamer->base.bbs_next = next; - initStringInfo(&streamer->base.bbs_buffer); - streamer->next_context = BBSTREAMER_MEMBER_HEADER; - - return &streamer->base; -} - -/* - * Parse unknown content as tar data. - */ -static void -bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) -{ - bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer; - size_t nbytes; - - /* Expect unparsed input. */ - Assert(member == NULL); - Assert(context == BBSTREAMER_UNKNOWN); - - while (len > 0) - { - switch (mystreamer->next_context) - { - case BBSTREAMER_MEMBER_HEADER: - - /* - * If we're expecting an archive member header, accumulate a - * full block of data before doing anything further. - */ - if (!bbstreamer_buffer_until(streamer, &data, &len, - TAR_BLOCK_SIZE)) - return; - - /* - * Now we can process the header and get ready to process the - * file contents; however, we might find out that what we - * thought was the next file header is actually the start of - * the archive trailer. Switch modes accordingly. - */ - if (bbstreamer_tar_header(mystreamer)) - { - if (mystreamer->member.size == 0) - { - /* No content; trailer is zero-length. */ - bbstreamer_content(mystreamer->base.bbs_next, - &mystreamer->member, - NULL, 0, - BBSTREAMER_MEMBER_TRAILER); - - /* Expect next header. */ - mystreamer->next_context = BBSTREAMER_MEMBER_HEADER; - } - else - { - /* Expect contents. */ - mystreamer->next_context = BBSTREAMER_MEMBER_CONTENTS; - } - mystreamer->base.bbs_buffer.len = 0; - mystreamer->file_bytes_sent = 0; - } - else - mystreamer->next_context = BBSTREAMER_ARCHIVE_TRAILER; - break; - - case BBSTREAMER_MEMBER_CONTENTS: - - /* - * Send as much content as we have, but not more than the - * remaining file length. - */ - Assert(mystreamer->file_bytes_sent < mystreamer->member.size); - nbytes = mystreamer->member.size - mystreamer->file_bytes_sent; - nbytes = Min(nbytes, len); - Assert(nbytes > 0); - bbstreamer_content(mystreamer->base.bbs_next, - &mystreamer->member, - data, nbytes, - BBSTREAMER_MEMBER_CONTENTS); - mystreamer->file_bytes_sent += nbytes; - data += nbytes; - len -= nbytes; - - /* - * If we've not yet sent the whole file, then there's more - * content to come; otherwise, it's time to expect the file - * trailer. - */ - Assert(mystreamer->file_bytes_sent <= mystreamer->member.size); - if (mystreamer->file_bytes_sent == mystreamer->member.size) - { - if (mystreamer->pad_bytes_expected == 0) - { - /* Trailer is zero-length. */ - bbstreamer_content(mystreamer->base.bbs_next, - &mystreamer->member, - NULL, 0, - BBSTREAMER_MEMBER_TRAILER); - - /* Expect next header. */ - mystreamer->next_context = BBSTREAMER_MEMBER_HEADER; - } - else - { - /* Trailer is not zero-length. */ - mystreamer->next_context = BBSTREAMER_MEMBER_TRAILER; - } - mystreamer->base.bbs_buffer.len = 0; - } - break; - - case BBSTREAMER_MEMBER_TRAILER: - - /* - * If we're expecting an archive member trailer, accumulate - * the expected number of padding bytes before sending - * anything onward. - */ - if (!bbstreamer_buffer_until(streamer, &data, &len, - mystreamer->pad_bytes_expected)) - return; - - /* OK, now we can send it. */ - bbstreamer_content(mystreamer->base.bbs_next, - &mystreamer->member, - data, mystreamer->pad_bytes_expected, - BBSTREAMER_MEMBER_TRAILER); - - /* Expect next file header. */ - mystreamer->next_context = BBSTREAMER_MEMBER_HEADER; - mystreamer->base.bbs_buffer.len = 0; - break; - - case BBSTREAMER_ARCHIVE_TRAILER: - - /* - * We've seen an end-of-archive indicator, so anything more is - * buffered and sent as part of the archive trailer. But we - * don't expect more than 2 blocks. - */ - bbstreamer_buffer_bytes(streamer, &data, &len, len); - if (len > 2 * TAR_BLOCK_SIZE) - pg_fatal("tar file trailer exceeds 2 blocks"); - return; - - default: - /* Shouldn't happen. */ - pg_fatal("unexpected state while parsing tar archive"); - } - } -} - -/* - * Parse a file header within a tar stream. - * - * The return value is true if we found a file header and passed it on to the - * next bbstreamer; it is false if we have reached the archive trailer. - */ -static bool -bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer) -{ - bool has_nonzero_byte = false; - int i; - bbstreamer_member *member = &mystreamer->member; - char *buffer = mystreamer->base.bbs_buffer.data; - - Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE); - - /* Check whether we've got a block of all zero bytes. */ - for (i = 0; i < TAR_BLOCK_SIZE; ++i) - { - if (buffer[i] != '\0') - { - has_nonzero_byte = true; - break; - } - } - - /* - * If the entire block was zeros, this is the end of the archive, not the - * start of the next file. - */ - if (!has_nonzero_byte) - return false; - - /* - * Parse key fields out of the header. - */ - strlcpy(member->pathname, &buffer[TAR_OFFSET_NAME], MAXPGPATH); - if (member->pathname[0] == '\0') - pg_fatal("tar member has empty name"); - member->size = read_tar_number(&buffer[TAR_OFFSET_SIZE], 12); - member->mode = read_tar_number(&buffer[TAR_OFFSET_MODE], 8); - member->uid = read_tar_number(&buffer[TAR_OFFSET_UID], 8); - member->gid = read_tar_number(&buffer[TAR_OFFSET_GID], 8); - member->is_directory = - (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_DIRECTORY); - member->is_link = - (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_SYMLINK); - if (member->is_link) - strlcpy(member->linktarget, &buffer[TAR_OFFSET_LINKNAME], 100); - - /* Compute number of padding bytes. */ - mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size); - - /* Forward the entire header to the next bbstreamer. */ - bbstreamer_content(mystreamer->base.bbs_next, member, - buffer, TAR_BLOCK_SIZE, - BBSTREAMER_MEMBER_HEADER); - - return true; -} - -/* - * End-of-stream processing for a tar parser. - */ -static void -bbstreamer_tar_parser_finalize(bbstreamer *streamer) -{ - bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer; - - if (mystreamer->next_context != BBSTREAMER_ARCHIVE_TRAILER && - (mystreamer->next_context != BBSTREAMER_MEMBER_HEADER || - mystreamer->base.bbs_buffer.len > 0)) - pg_fatal("COPY stream ended before last file was finished"); - - /* Send the archive trailer, even if empty. */ - bbstreamer_content(streamer->bbs_next, NULL, - streamer->bbs_buffer.data, streamer->bbs_buffer.len, - BBSTREAMER_ARCHIVE_TRAILER); - - /* Now finalize successor. */ - bbstreamer_finalize(streamer->bbs_next); -} - -/* - * Free memory associated with a tar parser. - */ -static void -bbstreamer_tar_parser_free(bbstreamer *streamer) -{ - pfree(streamer->bbs_buffer.data); - bbstreamer_free(streamer->bbs_next); -} - -/* - * Create a bbstreamer that can generate a tar archive. - * - * This is intended to be usable either for generating a brand-new tar archive - * or for modifying one on the fly. The input should be a series of typed - * chunks (i.e. not BBSTREAMER_UNKNOWN). See also the comments for - * bbstreamer_tar_parser_content. - */ -bbstreamer * -bbstreamer_tar_archiver_new(bbstreamer *next) -{ - bbstreamer_tar_archiver *streamer; - - streamer = palloc0(sizeof(bbstreamer_tar_archiver)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_tar_archiver_ops; - streamer->base.bbs_next = next; - - return &streamer->base; -} - -/* - * Fix up the stream of input chunks to create a valid tar file. - * - * If a BBSTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a - * newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is - * passed through without change. Any other size is a fatal error (and - * indicates a bug). - * - * Whenever a new BBSTREAMER_MEMBER_HEADER chunk is constructed, the - * corresponding BBSTREAMER_MEMBER_TRAILER chunk is also constructed from - * scratch. Specifically, we construct a block of zero bytes sufficient to - * pad out to a block boundary, as required by the tar format. Other - * BBSTREAMER_MEMBER_TRAILER chunks are passed through without change. - * - * Any BBSTREAMER_MEMBER_CONTENTS chunks are passed through without change. - * - * The BBSTREAMER_ARCHIVE_TRAILER chunk is replaced with two - * blocks of zero bytes. Not all tar programs require this, but apparently - * some do. The server does not supply this trailer. If no archive trailer is - * present, one will be added by bbstreamer_tar_parser_finalize. - */ -static void -bbstreamer_tar_archiver_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) -{ - bbstreamer_tar_archiver *mystreamer = (bbstreamer_tar_archiver *) streamer; - char buffer[2 * TAR_BLOCK_SIZE]; - - Assert(context != BBSTREAMER_UNKNOWN); - - if (context == BBSTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE) - { - Assert(len == 0); - - /* Replace zero-length tar header with a newly constructed one. */ - tarCreateHeader(buffer, member->pathname, NULL, - member->size, member->mode, member->uid, member->gid, - time(NULL)); - data = buffer; - len = TAR_BLOCK_SIZE; - - /* Also make a note to replace padding, in case size changed. */ - mystreamer->rearchive_member = true; - } - else if (context == BBSTREAMER_MEMBER_TRAILER && - mystreamer->rearchive_member) - { - int pad_bytes = tarPaddingBytesRequired(member->size); - - /* Also replace padding, if we regenerated the header. */ - memset(buffer, 0, pad_bytes); - data = buffer; - len = pad_bytes; - - /* Don't do this again unless we replace another header. */ - mystreamer->rearchive_member = false; - } - else if (context == BBSTREAMER_ARCHIVE_TRAILER) - { - /* Trailer should always be two blocks of zero bytes. */ - memset(buffer, 0, 2 * TAR_BLOCK_SIZE); - data = buffer; - len = 2 * TAR_BLOCK_SIZE; - } - - bbstreamer_content(streamer->bbs_next, member, data, len, context); -} - -/* - * End-of-stream processing for a tar archiver. - */ -static void -bbstreamer_tar_archiver_finalize(bbstreamer *streamer) -{ - bbstreamer_finalize(streamer->bbs_next); -} - -/* - * Free memory associated with a tar archiver. - */ -static void -bbstreamer_tar_archiver_free(bbstreamer *streamer) -{ - bbstreamer_free(streamer->bbs_next); - pfree(streamer); -} - -/* - * Create a bbstreamer that blindly adds two blocks of NUL bytes to the - * end of an incomplete tarfile that the server might send us. - */ -bbstreamer * -bbstreamer_tar_terminator_new(bbstreamer *next) -{ - bbstreamer *streamer; - - streamer = palloc0(sizeof(bbstreamer)); - *((const bbstreamer_ops **) &streamer->bbs_ops) = - &bbstreamer_tar_terminator_ops; - streamer->bbs_next = next; - - return streamer; -} - -/* - * Pass all the content through without change. - */ -static void -bbstreamer_tar_terminator_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) -{ - /* Expect unparsed input. */ - Assert(member == NULL); - Assert(context == BBSTREAMER_UNKNOWN); - - /* Just forward it. */ - bbstreamer_content(streamer->bbs_next, member, data, len, context); -} - -/* - * At the end, blindly add the two blocks of NUL bytes which the server fails - * to supply. - */ -static void -bbstreamer_tar_terminator_finalize(bbstreamer *streamer) -{ - char buffer[2 * TAR_BLOCK_SIZE]; - - memset(buffer, 0, 2 * TAR_BLOCK_SIZE); - bbstreamer_content(streamer->bbs_next, NULL, buffer, - 2 * TAR_BLOCK_SIZE, BBSTREAMER_UNKNOWN); - bbstreamer_finalize(streamer->bbs_next); -} - -/* - * Free memory associated with a tar terminator. - */ -static void -bbstreamer_tar_terminator_free(bbstreamer *streamer) -{ - bbstreamer_free(streamer->bbs_next); - pfree(streamer); -} diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c deleted file mode 100644 index 20f11d4450e..00000000000 --- a/src/bin/pg_basebackup/bbstreamer_zstd.c +++ /dev/null @@ -1,368 +0,0 @@ -/*------------------------------------------------------------------------- - * - * bbstreamer_zstd.c - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer_zstd.c - *------------------------------------------------------------------------- - */ - -#include "postgres_fe.h" - -#include - -#ifdef USE_ZSTD -#include -#endif - -#include "bbstreamer.h" -#include "common/logging.h" - -#ifdef USE_ZSTD - -typedef struct bbstreamer_zstd_frame -{ - bbstreamer base; - - ZSTD_CCtx *cctx; - ZSTD_DCtx *dctx; - ZSTD_outBuffer zstd_outBuf; -} bbstreamer_zstd_frame; - -static void bbstreamer_zstd_compressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer); -static void bbstreamer_zstd_compressor_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_zstd_compressor_ops = { - .content = bbstreamer_zstd_compressor_content, - .finalize = bbstreamer_zstd_compressor_finalize, - .free = bbstreamer_zstd_compressor_free -}; - -static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer); -static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_zstd_decompressor_ops = { - .content = bbstreamer_zstd_decompressor_content, - .finalize = bbstreamer_zstd_decompressor_finalize, - .free = bbstreamer_zstd_decompressor_free -}; -#endif - -/* - * Create a new base backup streamer that performs zstd compression of tar - * blocks. - */ -bbstreamer * -bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress) -{ -#ifdef USE_ZSTD - bbstreamer_zstd_frame *streamer; - size_t ret; - - Assert(next != NULL); - - streamer = palloc0(sizeof(bbstreamer_zstd_frame)); - - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_zstd_compressor_ops; - - streamer->base.bbs_next = next; - initStringInfo(&streamer->base.bbs_buffer); - enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize()); - - streamer->cctx = ZSTD_createCCtx(); - if (!streamer->cctx) - pg_fatal("could not create zstd compression context"); - - /* Set compression level */ - ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel, - compress->level); - if (ZSTD_isError(ret)) - pg_fatal("could not set zstd compression level to %d: %s", - compress->level, ZSTD_getErrorName(ret)); - - /* Set # of workers, if specified */ - if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0) - { - /* - * On older versions of libzstd, this option does not exist, and - * trying to set it will fail. Similarly for newer versions if they - * are compiled without threading support. - */ - ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers, - compress->workers); - if (ZSTD_isError(ret)) - pg_fatal("could not set compression worker count to %d: %s", - compress->workers, ZSTD_getErrorName(ret)); - } - - if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0) - { - ret = ZSTD_CCtx_setParameter(streamer->cctx, - ZSTD_c_enableLongDistanceMatching, - compress->long_distance); - if (ZSTD_isError(ret)) - { - pg_log_error("could not enable long-distance mode: %s", - ZSTD_getErrorName(ret)); - exit(1); - } - } - - /* Initialize the ZSTD output buffer. */ - streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; - streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; - streamer->zstd_outBuf.pos = 0; - - return &streamer->base; -#else - pg_fatal("this build does not support compression with %s", "ZSTD"); - return NULL; /* keep compiler quiet */ -#endif -} - -#ifdef USE_ZSTD -/* - * Compress the input data to output buffer. - * - * Find out the compression bound based on input data length for each - * invocation to make sure that output buffer has enough capacity to - * accommodate the compressed data. In case if the output buffer - * capacity falls short of compression bound then forward the content - * of output buffer to next streamer and empty the buffer. - */ -static void -bbstreamer_zstd_compressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) -{ - bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; - ZSTD_inBuffer inBuf = {data, len, 0}; - - while (inBuf.pos < inBuf.size) - { - size_t yet_to_flush; - size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos); - - /* - * If the output buffer is not left with enough space, send the - * compressed bytes to the next streamer, and empty the buffer. - */ - if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < - max_needed) - { - bbstreamer_content(mystreamer->base.bbs_next, member, - mystreamer->zstd_outBuf.dst, - mystreamer->zstd_outBuf.pos, - context); - - /* Reset the ZSTD output buffer. */ - mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; - mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; - mystreamer->zstd_outBuf.pos = 0; - } - - yet_to_flush = - ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf, - &inBuf, ZSTD_e_continue); - - if (ZSTD_isError(yet_to_flush)) - pg_log_error("could not compress data: %s", - ZSTD_getErrorName(yet_to_flush)); - } -} - -/* - * End-of-stream processing. - */ -static void -bbstreamer_zstd_compressor_finalize(bbstreamer *streamer) -{ - bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; - size_t yet_to_flush; - - do - { - ZSTD_inBuffer in = {NULL, 0, 0}; - size_t max_needed = ZSTD_compressBound(0); - - /* - * If the output buffer is not left with enough space, send the - * compressed bytes to the next streamer, and empty the buffer. - */ - if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < - max_needed) - { - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->zstd_outBuf.dst, - mystreamer->zstd_outBuf.pos, - BBSTREAMER_UNKNOWN); - - /* Reset the ZSTD output buffer. */ - mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; - mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; - mystreamer->zstd_outBuf.pos = 0; - } - - yet_to_flush = ZSTD_compressStream2(mystreamer->cctx, - &mystreamer->zstd_outBuf, - &in, ZSTD_e_end); - - if (ZSTD_isError(yet_to_flush)) - pg_log_error("could not compress data: %s", - ZSTD_getErrorName(yet_to_flush)); - - } while (yet_to_flush > 0); - - /* Make sure to pass any remaining bytes to the next streamer. */ - if (mystreamer->zstd_outBuf.pos > 0) - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->zstd_outBuf.dst, - mystreamer->zstd_outBuf.pos, - BBSTREAMER_UNKNOWN); - - bbstreamer_finalize(mystreamer->base.bbs_next); -} - -/* - * Free memory. - */ -static void -bbstreamer_zstd_compressor_free(bbstreamer *streamer) -{ - bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; - - bbstreamer_free(streamer->bbs_next); - ZSTD_freeCCtx(mystreamer->cctx); - pfree(streamer->bbs_buffer.data); - pfree(streamer); -} -#endif - -/* - * Create a new base backup streamer that performs decompression of zstd - * compressed blocks. - */ -bbstreamer * -bbstreamer_zstd_decompressor_new(bbstreamer *next) -{ -#ifdef USE_ZSTD - bbstreamer_zstd_frame *streamer; - - Assert(next != NULL); - - streamer = palloc0(sizeof(bbstreamer_zstd_frame)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_zstd_decompressor_ops; - - streamer->base.bbs_next = next; - initStringInfo(&streamer->base.bbs_buffer); - enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize()); - - streamer->dctx = ZSTD_createDCtx(); - if (!streamer->dctx) - pg_fatal("could not create zstd decompression context"); - - /* Initialize the ZSTD output buffer. */ - streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; - streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; - streamer->zstd_outBuf.pos = 0; - - return &streamer->base; -#else - pg_fatal("this build does not support compression with %s", "ZSTD"); - return NULL; /* keep compiler quiet */ -#endif -} - -#ifdef USE_ZSTD -/* - * Decompress the input data to output buffer until we run out of input - * data. Each time the output buffer is full, pass on the decompressed data - * to the next streamer. - */ -static void -bbstreamer_zstd_decompressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) -{ - bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; - ZSTD_inBuffer inBuf = {data, len, 0}; - - while (inBuf.pos < inBuf.size) - { - size_t ret; - - /* - * If output buffer is full then forward the content to next streamer - * and update the output buffer. - */ - if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size) - { - bbstreamer_content(mystreamer->base.bbs_next, member, - mystreamer->zstd_outBuf.dst, - mystreamer->zstd_outBuf.pos, - context); - - /* Reset the ZSTD output buffer. */ - mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; - mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; - mystreamer->zstd_outBuf.pos = 0; - } - - ret = ZSTD_decompressStream(mystreamer->dctx, - &mystreamer->zstd_outBuf, &inBuf); - - if (ZSTD_isError(ret)) - pg_log_error("could not decompress data: %s", - ZSTD_getErrorName(ret)); - } -} - -/* - * End-of-stream processing. - */ -static void -bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer) -{ - bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; - - /* - * End of the stream, if there is some pending data in output buffers then - * we must forward it to next streamer. - */ - if (mystreamer->zstd_outBuf.pos > 0) - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - BBSTREAMER_UNKNOWN); - - bbstreamer_finalize(mystreamer->base.bbs_next); -} - -/* - * Free memory. - */ -static void -bbstreamer_zstd_decompressor_free(bbstreamer *streamer) -{ - bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; - - bbstreamer_free(streamer->bbs_next); - ZSTD_freeDCtx(mystreamer->dctx); - pfree(streamer->bbs_buffer.data); - pfree(streamer); -} -#endif diff --git a/src/bin/pg_basebackup/meson.build b/src/bin/pg_basebackup/meson.build index c00acd5e118..a68dbd7837d 100644 --- a/src/bin/pg_basebackup/meson.build +++ b/src/bin/pg_basebackup/meson.build @@ -1,12 +1,12 @@ # Copyright (c) 2022-2024, PostgreSQL Global Development Group common_sources = files( - 'bbstreamer_file.c', - 'bbstreamer_gzip.c', - 'bbstreamer_inject.c', - 'bbstreamer_lz4.c', - 'bbstreamer_tar.c', - 'bbstreamer_zstd.c', + 'astreamer_file.c', + 'astreamer_gzip.c', + 'astreamer_inject.c', + 'astreamer_lz4.c', + 'astreamer_tar.c', + 'astreamer_zstd.c', 'receivelog.c', 'streamutil.c', 'walmethods.c', diff --git a/src/bin/pg_basebackup/nls.mk b/src/bin/pg_basebackup/nls.mk index 384dbb021e9..950b9797b1e 100644 --- a/src/bin/pg_basebackup/nls.mk +++ b/src/bin/pg_basebackup/nls.mk @@ -1,12 +1,12 @@ # src/bin/pg_basebackup/nls.mk CATALOG_NAME = pg_basebackup GETTEXT_FILES = $(FRONTEND_COMMON_GETTEXT_FILES) \ - bbstreamer_file.c \ - bbstreamer_gzip.c \ - bbstreamer_inject.c \ - bbstreamer_lz4.c \ - bbstreamer_tar.c \ - bbstreamer_zstd.c \ + astreamer_file.c \ + astreamer_gzip.c \ + astreamer_inject.c \ + astreamer_lz4.c \ + astreamer_tar.c \ + astreamer_zstd.c \ pg_basebackup.c \ pg_createsubscriber.c \ pg_receivewal.c \ diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 8f3dd04fd22..1966ada69c9 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -26,8 +26,8 @@ #endif #include "access/xlog_internal.h" +#include "astreamer.h" #include "backup/basebackup.h" -#include "bbstreamer.h" #include "common/compression.h" #include "common/file_perm.h" #include "common/file_utils.h" @@ -57,8 +57,8 @@ typedef struct ArchiveStreamState { int tablespacenum; pg_compress_specification *compress; - bbstreamer *streamer; - bbstreamer *manifest_inject_streamer; + astreamer *streamer; + astreamer *manifest_inject_streamer; PQExpBuffer manifest_buffer; char manifest_filename[MAXPGPATH]; FILE *manifest_file; @@ -67,7 +67,7 @@ typedef struct ArchiveStreamState typedef struct WriteTarState { int tablespacenum; - bbstreamer *streamer; + astreamer *streamer; } WriteTarState; typedef struct WriteManifestState @@ -199,11 +199,11 @@ static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *fo static void progress_update_filename(const char *filename); static void progress_report(int tablespacenum, bool force, bool finished); -static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation, - bbstreamer **manifest_inject_streamer_p, - bool is_recovery_guc_supported, - bool expect_unterminated_tarfile, - pg_compress_specification *compress); +static astreamer *CreateBackupStreamer(char *archive_name, char *spclocation, + astreamer **manifest_inject_streamer_p, + bool is_recovery_guc_supported, + bool expect_unterminated_tarfile, + pg_compress_specification *compress); static void ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data); static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor); @@ -1053,19 +1053,19 @@ ReceiveCopyData(PGconn *conn, WriteDataCallback callback, * the options selected by the user. We may just write the results directly * to a file, or we might compress first, or we might extract the tar file * and write each member separately. This function doesn't do any of that - * directly, but it works out what kind of bbstreamer we need to create so + * directly, but it works out what kind of astreamer we need to create so * that the right stuff happens when, down the road, we actually receive * the data. */ -static bbstreamer * +static astreamer * CreateBackupStreamer(char *archive_name, char *spclocation, - bbstreamer **manifest_inject_streamer_p, + astreamer **manifest_inject_streamer_p, bool is_recovery_guc_supported, bool expect_unterminated_tarfile, pg_compress_specification *compress) { - bbstreamer *streamer = NULL; - bbstreamer *manifest_inject_streamer = NULL; + astreamer *streamer = NULL; + astreamer *manifest_inject_streamer = NULL; bool inject_manifest; bool is_tar, is_tar_gz, @@ -1160,9 +1160,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation, directory = psprintf("%s/%s", basedir, spclocation); else directory = get_tablespace_mapping(spclocation); - streamer = bbstreamer_extractor_new(directory, - get_tablespace_mapping, - progress_update_filename); + streamer = astreamer_extractor_new(directory, + get_tablespace_mapping, + progress_update_filename); } else { @@ -1188,27 +1188,27 @@ CreateBackupStreamer(char *archive_name, char *spclocation, } if (compress->algorithm == PG_COMPRESSION_NONE) - streamer = bbstreamer_plain_writer_new(archive_filename, - archive_file); + streamer = astreamer_plain_writer_new(archive_filename, + archive_file); else if (compress->algorithm == PG_COMPRESSION_GZIP) { strlcat(archive_filename, ".gz", sizeof(archive_filename)); - streamer = bbstreamer_gzip_writer_new(archive_filename, - archive_file, compress); + streamer = astreamer_gzip_writer_new(archive_filename, + archive_file, compress); } else if (compress->algorithm == PG_COMPRESSION_LZ4) { strlcat(archive_filename, ".lz4", sizeof(archive_filename)); - streamer = bbstreamer_plain_writer_new(archive_filename, - archive_file); - streamer = bbstreamer_lz4_compressor_new(streamer, compress); + streamer = astreamer_plain_writer_new(archive_filename, + archive_file); + streamer = astreamer_lz4_compressor_new(streamer, compress); } else if (compress->algorithm == PG_COMPRESSION_ZSTD) { strlcat(archive_filename, ".zst", sizeof(archive_filename)); - streamer = bbstreamer_plain_writer_new(archive_filename, - archive_file); - streamer = bbstreamer_zstd_compressor_new(streamer, compress); + streamer = astreamer_plain_writer_new(archive_filename, + archive_file); + streamer = astreamer_zstd_compressor_new(streamer, compress); } else { @@ -1222,7 +1222,7 @@ CreateBackupStreamer(char *archive_name, char *spclocation, * into it. */ if (must_parse_archive) - streamer = bbstreamer_tar_archiver_new(streamer); + streamer = astreamer_tar_archiver_new(streamer); progress_update_filename(archive_filename); } @@ -1241,9 +1241,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation, if (spclocation == NULL && writerecoveryconf) { Assert(must_parse_archive); - streamer = bbstreamer_recovery_injector_new(streamer, - is_recovery_guc_supported, - recoveryconfcontents); + streamer = astreamer_recovery_injector_new(streamer, + is_recovery_guc_supported, + recoveryconfcontents); } /* @@ -1253,9 +1253,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation, * we're talking to such a server we'll need to add the terminator here. */ if (must_parse_archive) - streamer = bbstreamer_tar_parser_new(streamer); + streamer = astreamer_tar_parser_new(streamer); else if (expect_unterminated_tarfile) - streamer = bbstreamer_tar_terminator_new(streamer); + streamer = astreamer_tar_terminator_new(streamer); /* * If the user has requested a server compressed archive along with @@ -1264,11 +1264,11 @@ CreateBackupStreamer(char *archive_name, char *spclocation, if (format == 'p') { if (is_tar_gz) - streamer = bbstreamer_gzip_decompressor_new(streamer); + streamer = astreamer_gzip_decompressor_new(streamer); else if (is_tar_lz4) - streamer = bbstreamer_lz4_decompressor_new(streamer); + streamer = astreamer_lz4_decompressor_new(streamer); else if (is_tar_zstd) - streamer = bbstreamer_zstd_decompressor_new(streamer); + streamer = astreamer_zstd_decompressor_new(streamer); } /* Return the results. */ @@ -1307,10 +1307,10 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress) if (state.manifest_inject_streamer != NULL && state.manifest_buffer != NULL) { - bbstreamer_inject_file(state.manifest_inject_streamer, - "backup_manifest", - state.manifest_buffer->data, - state.manifest_buffer->len); + astreamer_inject_file(state.manifest_inject_streamer, + "backup_manifest", + state.manifest_buffer->data, + state.manifest_buffer->len); destroyPQExpBuffer(state.manifest_buffer); state.manifest_buffer = NULL; } @@ -1318,8 +1318,8 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress) /* If there's still an archive in progress, end processing. */ if (state.streamer != NULL) { - bbstreamer_finalize(state.streamer); - bbstreamer_free(state.streamer); + astreamer_finalize(state.streamer); + astreamer_free(state.streamer); state.streamer = NULL; } } @@ -1383,8 +1383,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data) /* End processing of any prior archive. */ if (state->streamer != NULL) { - bbstreamer_finalize(state->streamer); - bbstreamer_free(state->streamer); + astreamer_finalize(state->streamer); + astreamer_free(state->streamer); state->streamer = NULL; } @@ -1437,8 +1437,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data) else if (state->streamer != NULL) { /* Archive data. */ - bbstreamer_content(state->streamer, NULL, copybuf + 1, - r - 1, BBSTREAMER_UNKNOWN); + astreamer_content(state->streamer, NULL, copybuf + 1, + r - 1, ASTREAMER_UNKNOWN); } else pg_fatal("unexpected payload data"); @@ -1600,7 +1600,7 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, bool tablespacenum, pg_compress_specification *compress) { WriteTarState state; - bbstreamer *manifest_inject_streamer; + astreamer *manifest_inject_streamer; bool is_recovery_guc_supported; bool expect_unterminated_tarfile; @@ -1636,16 +1636,16 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, pg_fatal("out of memory"); /* Inject it into the output tarfile. */ - bbstreamer_inject_file(manifest_inject_streamer, "backup_manifest", - buf.data, buf.len); + astreamer_inject_file(manifest_inject_streamer, "backup_manifest", + buf.data, buf.len); /* Free memory. */ termPQExpBuffer(&buf); } /* Cleanup. */ - bbstreamer_finalize(state.streamer); - bbstreamer_free(state.streamer); + astreamer_finalize(state.streamer); + astreamer_free(state.streamer); progress_report(tablespacenum, true, false); @@ -1663,7 +1663,7 @@ ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data) { WriteTarState *state = callback_data; - bbstreamer_content(state->streamer, NULL, copybuf, r, BBSTREAMER_UNKNOWN); + astreamer_content(state->streamer, NULL, copybuf, r, ASTREAMER_UNKNOWN); totaldone += r; progress_report(state->tablespacenum, false, false); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 6e6b7c27118..547d14b3e7c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3317,19 +3317,19 @@ bbsink_shell bbsink_state bbsink_throttle bbsink_zstd -bbstreamer -bbstreamer_archive_context -bbstreamer_extractor -bbstreamer_gzip_decompressor -bbstreamer_gzip_writer -bbstreamer_lz4_frame -bbstreamer_member -bbstreamer_ops -bbstreamer_plain_writer -bbstreamer_recovery_injector -bbstreamer_tar_archiver -bbstreamer_tar_parser -bbstreamer_zstd_frame +astreamer +astreamer_archive_context +astreamer_extractor +astreamer_gzip_decompressor +astreamer_gzip_writer +astreamer_lz4_frame +astreamer_member +astreamer_ops +astreamer_plain_writer +astreamer_recovery_injector +astreamer_tar_archiver +astreamer_tar_parser +astreamer_zstd_frame bgworker_main_type bh_node_type binaryheap -- cgit v1.2.3