diff options
Diffstat (limited to 'src/fe_utils/astreamer_tar.c')
-rw-r--r-- | src/fe_utils/astreamer_tar.c | 514 |
1 files changed, 514 insertions, 0 deletions
diff --git a/src/fe_utils/astreamer_tar.c b/src/fe_utils/astreamer_tar.c new file mode 100644 index 00000000000..f5d3562d280 --- /dev/null +++ b/src/fe_utils/astreamer_tar.c @@ -0,0 +1,514 @@ +/*------------------------------------------------------------------------- + * + * astreamer_tar.c + * + * This module implements three types of tar processing. A tar parser + * expects unlabelled chunks of data (e.g. ASTREAMER_UNKNOWN) and splits + * it into labelled chunks (any other value of astreamer_archive_context). + * A tar archiver does the reverse: it takes a bunch of labelled chunks + * and produces a tarfile, optionally replacing member headers and trailers + * so that upstream astreamer objects can perform surgery on the tarfile + * contents without knowing the details of the tar format. A tar terminator + * just adds two blocks of NUL bytes to the end of the file, since older + * server versions produce files with this terminator omitted. + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_tar.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include <time.h> + +#include "common/logging.h" +#include "fe_utils/astreamer.h" +#include "pgtar.h" + +typedef struct astreamer_tar_parser +{ + astreamer base; + astreamer_archive_context next_context; + astreamer_member member; + size_t file_bytes_sent; + size_t pad_bytes_expected; +} astreamer_tar_parser; + +typedef struct astreamer_tar_archiver +{ + astreamer base; + bool rearchive_member; +} astreamer_tar_archiver; + +static void astreamer_tar_parser_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_parser_finalize(astreamer *streamer); +static void astreamer_tar_parser_free(astreamer *streamer); +static bool astreamer_tar_header(astreamer_tar_parser *mystreamer); + +static const astreamer_ops astreamer_tar_parser_ops = { + .content = astreamer_tar_parser_content, + .finalize = astreamer_tar_parser_finalize, + .free = astreamer_tar_parser_free +}; + +static void astreamer_tar_archiver_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_archiver_finalize(astreamer *streamer); +static void astreamer_tar_archiver_free(astreamer *streamer); + +static const astreamer_ops astreamer_tar_archiver_ops = { + .content = astreamer_tar_archiver_content, + .finalize = astreamer_tar_archiver_finalize, + .free = astreamer_tar_archiver_free +}; + +static void astreamer_tar_terminator_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_terminator_finalize(astreamer *streamer); +static void astreamer_tar_terminator_free(astreamer *streamer); + +static const astreamer_ops astreamer_tar_terminator_ops = { + .content = astreamer_tar_terminator_content, + .finalize = astreamer_tar_terminator_finalize, + .free = astreamer_tar_terminator_free +}; + +/* + * Create a astreamer that can parse a stream of content as tar data. + * + * The input should be a series of ASTREAMER_UNKNOWN chunks; the astreamer + * specified by 'next' will receive a series of typed chunks, as per the + * conventions described in astreamer.h. + */ +astreamer * +astreamer_tar_parser_new(astreamer *next) +{ + astreamer_tar_parser *streamer; + + streamer = palloc0(sizeof(astreamer_tar_parser)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_tar_parser_ops; + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + streamer->next_context = ASTREAMER_MEMBER_HEADER; + + return &streamer->base; +} + +/* + * Parse unknown content as tar data. + */ +static void +astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer; + size_t nbytes; + + /* Expect unparsed input. */ + Assert(member == NULL); + Assert(context == ASTREAMER_UNKNOWN); + + while (len > 0) + { + switch (mystreamer->next_context) + { + case ASTREAMER_MEMBER_HEADER: + + /* + * If we're expecting an archive member header, accumulate a + * full block of data before doing anything further. + */ + if (!astreamer_buffer_until(streamer, &data, &len, + TAR_BLOCK_SIZE)) + return; + + /* + * Now we can process the header and get ready to process the + * file contents; however, we might find out that what we + * thought was the next file header is actually the start of + * the archive trailer. Switch modes accordingly. + */ + if (astreamer_tar_header(mystreamer)) + { + if (mystreamer->member.size == 0) + { + /* No content; trailer is zero-length. */ + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + NULL, 0, + ASTREAMER_MEMBER_TRAILER); + + /* Expect next header. */ + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; + } + else + { + /* Expect contents. */ + mystreamer->next_context = ASTREAMER_MEMBER_CONTENTS; + } + mystreamer->base.bbs_buffer.len = 0; + mystreamer->file_bytes_sent = 0; + } + else + mystreamer->next_context = ASTREAMER_ARCHIVE_TRAILER; + break; + + case ASTREAMER_MEMBER_CONTENTS: + + /* + * Send as much content as we have, but not more than the + * remaining file length. + */ + Assert(mystreamer->file_bytes_sent < mystreamer->member.size); + nbytes = mystreamer->member.size - mystreamer->file_bytes_sent; + nbytes = Min(nbytes, len); + Assert(nbytes > 0); + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + data, nbytes, + ASTREAMER_MEMBER_CONTENTS); + mystreamer->file_bytes_sent += nbytes; + data += nbytes; + len -= nbytes; + + /* + * If we've not yet sent the whole file, then there's more + * content to come; otherwise, it's time to expect the file + * trailer. + */ + Assert(mystreamer->file_bytes_sent <= mystreamer->member.size); + if (mystreamer->file_bytes_sent == mystreamer->member.size) + { + if (mystreamer->pad_bytes_expected == 0) + { + /* Trailer is zero-length. */ + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + NULL, 0, + ASTREAMER_MEMBER_TRAILER); + + /* Expect next header. */ + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; + } + else + { + /* Trailer is not zero-length. */ + mystreamer->next_context = ASTREAMER_MEMBER_TRAILER; + } + mystreamer->base.bbs_buffer.len = 0; + } + break; + + case ASTREAMER_MEMBER_TRAILER: + + /* + * If we're expecting an archive member trailer, accumulate + * the expected number of padding bytes before sending + * anything onward. + */ + if (!astreamer_buffer_until(streamer, &data, &len, + mystreamer->pad_bytes_expected)) + return; + + /* OK, now we can send it. */ + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + data, mystreamer->pad_bytes_expected, + ASTREAMER_MEMBER_TRAILER); + + /* Expect next file header. */ + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; + mystreamer->base.bbs_buffer.len = 0; + break; + + case ASTREAMER_ARCHIVE_TRAILER: + + /* + * We've seen an end-of-archive indicator, so anything more is + * buffered and sent as part of the archive trailer. But we + * don't expect more than 2 blocks. + */ + astreamer_buffer_bytes(streamer, &data, &len, len); + if (len > 2 * TAR_BLOCK_SIZE) + pg_fatal("tar file trailer exceeds 2 blocks"); + return; + + default: + /* Shouldn't happen. */ + pg_fatal("unexpected state while parsing tar archive"); + } + } +} + +/* + * Parse a file header within a tar stream. + * + * The return value is true if we found a file header and passed it on to the + * next astreamer; it is false if we have reached the archive trailer. + */ +static bool +astreamer_tar_header(astreamer_tar_parser *mystreamer) +{ + bool has_nonzero_byte = false; + int i; + astreamer_member *member = &mystreamer->member; + char *buffer = mystreamer->base.bbs_buffer.data; + + Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE); + + /* Check whether we've got a block of all zero bytes. */ + for (i = 0; i < TAR_BLOCK_SIZE; ++i) + { + if (buffer[i] != '\0') + { + has_nonzero_byte = true; + break; + } + } + + /* + * If the entire block was zeros, this is the end of the archive, not the + * start of the next file. + */ + if (!has_nonzero_byte) + return false; + + /* + * Parse key fields out of the header. + */ + strlcpy(member->pathname, &buffer[TAR_OFFSET_NAME], MAXPGPATH); + if (member->pathname[0] == '\0') + pg_fatal("tar member has empty name"); + member->size = read_tar_number(&buffer[TAR_OFFSET_SIZE], 12); + member->mode = read_tar_number(&buffer[TAR_OFFSET_MODE], 8); + member->uid = read_tar_number(&buffer[TAR_OFFSET_UID], 8); + member->gid = read_tar_number(&buffer[TAR_OFFSET_GID], 8); + member->is_directory = + (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_DIRECTORY); + member->is_link = + (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_SYMLINK); + if (member->is_link) + strlcpy(member->linktarget, &buffer[TAR_OFFSET_LINKNAME], 100); + + /* Compute number of padding bytes. */ + mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size); + + /* Forward the entire header to the next astreamer. */ + astreamer_content(mystreamer->base.bbs_next, member, + buffer, TAR_BLOCK_SIZE, + ASTREAMER_MEMBER_HEADER); + + return true; +} + +/* + * End-of-stream processing for a tar parser. + */ +static void +astreamer_tar_parser_finalize(astreamer *streamer) +{ + astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer; + + if (mystreamer->next_context != ASTREAMER_ARCHIVE_TRAILER && + (mystreamer->next_context != ASTREAMER_MEMBER_HEADER || + mystreamer->base.bbs_buffer.len > 0)) + pg_fatal("COPY stream ended before last file was finished"); + + /* Send the archive trailer, even if empty. */ + astreamer_content(streamer->bbs_next, NULL, + streamer->bbs_buffer.data, streamer->bbs_buffer.len, + ASTREAMER_ARCHIVE_TRAILER); + + /* Now finalize successor. */ + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar parser. + */ +static void +astreamer_tar_parser_free(astreamer *streamer) +{ + pfree(streamer->bbs_buffer.data); + astreamer_free(streamer->bbs_next); +} + +/* + * Create a astreamer that can generate a tar archive. + * + * This is intended to be usable either for generating a brand-new tar archive + * or for modifying one on the fly. The input should be a series of typed + * chunks (i.e. not ASTREAMER_UNKNOWN). See also the comments for + * astreamer_tar_parser_content. + */ +astreamer * +astreamer_tar_archiver_new(astreamer *next) +{ + astreamer_tar_archiver *streamer; + + streamer = palloc0(sizeof(astreamer_tar_archiver)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_tar_archiver_ops; + streamer->base.bbs_next = next; + + return &streamer->base; +} + +/* + * Fix up the stream of input chunks to create a valid tar file. + * + * If a ASTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a + * newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is + * passed through without change. Any other size is a fatal error (and + * indicates a bug). + * + * Whenever a new ASTREAMER_MEMBER_HEADER chunk is constructed, the + * corresponding ASTREAMER_MEMBER_TRAILER chunk is also constructed from + * scratch. Specifically, we construct a block of zero bytes sufficient to + * pad out to a block boundary, as required by the tar format. Other + * ASTREAMER_MEMBER_TRAILER chunks are passed through without change. + * + * Any ASTREAMER_MEMBER_CONTENTS chunks are passed through without change. + * + * The ASTREAMER_ARCHIVE_TRAILER chunk is replaced with two + * blocks of zero bytes. Not all tar programs require this, but apparently + * some do. The server does not supply this trailer. If no archive trailer is + * present, one will be added by astreamer_tar_parser_finalize. + */ +static void +astreamer_tar_archiver_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_tar_archiver *mystreamer = (astreamer_tar_archiver *) streamer; + char buffer[2 * TAR_BLOCK_SIZE]; + + Assert(context != ASTREAMER_UNKNOWN); + + if (context == ASTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE) + { + Assert(len == 0); + + /* Replace zero-length tar header with a newly constructed one. */ + tarCreateHeader(buffer, member->pathname, NULL, + member->size, member->mode, member->uid, member->gid, + time(NULL)); + data = buffer; + len = TAR_BLOCK_SIZE; + + /* Also make a note to replace padding, in case size changed. */ + mystreamer->rearchive_member = true; + } + else if (context == ASTREAMER_MEMBER_TRAILER && + mystreamer->rearchive_member) + { + int pad_bytes = tarPaddingBytesRequired(member->size); + + /* Also replace padding, if we regenerated the header. */ + memset(buffer, 0, pad_bytes); + data = buffer; + len = pad_bytes; + + /* Don't do this again unless we replace another header. */ + mystreamer->rearchive_member = false; + } + else if (context == ASTREAMER_ARCHIVE_TRAILER) + { + /* Trailer should always be two blocks of zero bytes. */ + memset(buffer, 0, 2 * TAR_BLOCK_SIZE); + data = buffer; + len = 2 * TAR_BLOCK_SIZE; + } + + astreamer_content(streamer->bbs_next, member, data, len, context); +} + +/* + * End-of-stream processing for a tar archiver. + */ +static void +astreamer_tar_archiver_finalize(astreamer *streamer) +{ + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar archiver. + */ +static void +astreamer_tar_archiver_free(astreamer *streamer) +{ + astreamer_free(streamer->bbs_next); + pfree(streamer); +} + +/* + * Create a astreamer that blindly adds two blocks of NUL bytes to the + * end of an incomplete tarfile that the server might send us. + */ +astreamer * +astreamer_tar_terminator_new(astreamer *next) +{ + astreamer *streamer; + + streamer = palloc0(sizeof(astreamer)); + *((const astreamer_ops **) &streamer->bbs_ops) = + &astreamer_tar_terminator_ops; + streamer->bbs_next = next; + + return streamer; +} + +/* + * Pass all the content through without change. + */ +static void +astreamer_tar_terminator_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + /* Expect unparsed input. */ + Assert(member == NULL); + Assert(context == ASTREAMER_UNKNOWN); + + /* Just forward it. */ + astreamer_content(streamer->bbs_next, member, data, len, context); +} + +/* + * At the end, blindly add the two blocks of NUL bytes which the server fails + * to supply. + */ +static void +astreamer_tar_terminator_finalize(astreamer *streamer) +{ + char buffer[2 * TAR_BLOCK_SIZE]; + + memset(buffer, 0, 2 * TAR_BLOCK_SIZE); + astreamer_content(streamer->bbs_next, NULL, buffer, + 2 * TAR_BLOCK_SIZE, ASTREAMER_UNKNOWN); + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar terminator. + */ +static void +astreamer_tar_terminator_free(astreamer *streamer) +{ + astreamer_free(streamer->bbs_next); + pfree(streamer); +} |