diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.global.in | 1 | ||||
-rw-r--r-- | src/bin/pg_basebackup/Makefile | 1 | ||||
-rw-r--r-- | src/bin/pg_basebackup/pg_receivewal.c | 158 | ||||
-rw-r--r-- | src/bin/pg_basebackup/t/020_pg_receivewal.pl | 72 | ||||
-rw-r--r-- | src/bin/pg_basebackup/walmethods.c | 160 | ||||
-rw-r--r-- | src/bin/pg_basebackup/walmethods.h | 1 |
6 files changed, 383 insertions, 10 deletions
diff --git a/src/Makefile.global.in b/src/Makefile.global.in index 533c12fef95..05c54b27def 100644 --- a/src/Makefile.global.in +++ b/src/Makefile.global.in @@ -350,6 +350,7 @@ XGETTEXT = @XGETTEXT@ GZIP = gzip BZIP2 = bzip2 +LZ4 = lz4 DOWNLOAD = wget -O $@ --no-use-server-timestamps #DOWNLOAD = curl -o $@ diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index 459d514183d..fd920fc197e 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -19,6 +19,7 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global # make these available to TAP test scripts +export LZ4 export TAR # Note that GZIP cannot be used directly as this environment variable is # used by the command "gzip" to pass down options, so stick with a different diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 8acc0fc009b..89810244ceb 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -32,6 +32,10 @@ #include "receivelog.h" #include "streamutil.h" +#ifdef HAVE_LIBLZ4 +#include "lz4frame.h" +#endif + /* Time to sleep between reconnection attempts */ #define RECONNECT_SLEEP_TIME 5 @@ -136,6 +140,15 @@ is_xlogfilename(const char *filename, bool *ispartial, return true; } + /* File looks like a completed LZ4-compressed WAL file */ + if (fname_len == XLOG_FNAME_LEN + strlen(".lz4") && + strcmp(filename + XLOG_FNAME_LEN, ".lz4") == 0) + { + *ispartial = false; + *wal_compression_method = COMPRESSION_LZ4; + return true; + } + /* File looks like a partial uncompressed WAL file */ if (fname_len == XLOG_FNAME_LEN + strlen(".partial") && strcmp(filename + XLOG_FNAME_LEN, ".partial") == 0) @@ -154,6 +167,15 @@ is_xlogfilename(const char *filename, bool *ispartial, return true; } + /* File looks like a partial LZ4-compressed WAL file */ + if (fname_len == XLOG_FNAME_LEN + strlen(".lz4.partial") && + strcmp(filename + XLOG_FNAME_LEN, ".lz4.partial") == 0) + { + *ispartial = true; + *wal_compression_method = COMPRESSION_LZ4; + return true; + } + /* File does not look like something we know */ return false; } @@ -278,12 +300,20 @@ FindStreamingStart(uint32 *tli) /* * Check that the segment has the right size, if it's supposed to be * completed. For non-compressed segments just check the on-disk size - * and see if it matches a completed segment. For gzip-compressed + * and see if it matches a completed segment. For gzip-compressed * segments, look at the last 4 bytes of the compressed file, which is * where the uncompressed size is located for files with a size lower * than 4GB, and then compare it to the size of a completed segment. * The 4 last bytes correspond to the ISIZE member according to * http://www.zlib.org/rfc-gzip.html. + * + * For LZ4-compressed segments, uncompress the file in a throw-away + * buffer keeping track of the uncompressed size, then compare it to + * the size of a completed segment. Per its protocol, LZ4 does not + * store the uncompressed size of an object by default. contentSize + * is one possible way to do that, but we need to rely on a method + * where WAL segments could have been compressed by a different source + * than pg_receivewal, like an archive_command with lz4. */ if (!ispartial && wal_compression_method == COMPRESSION_NONE) { @@ -350,6 +380,114 @@ FindStreamingStart(uint32 *tli) continue; } } + else if (!ispartial && wal_compression_method == COMPRESSION_LZ4) + { +#ifdef HAVE_LIBLZ4 +#define LZ4_CHUNK_SZ 64 * 1024 /* 64kB as maximum chunk size read */ + int fd; + ssize_t r; + size_t uncompressed_size = 0; + char fullpath[MAXPGPATH * 2]; + char *outbuf; + char *readbuf; + LZ4F_decompressionContext_t ctx = NULL; + LZ4F_decompressOptions_t dec_opt; + LZ4F_errorCode_t status; + + memset(&dec_opt, 0, sizeof(dec_opt)); + snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name); + + fd = open(fullpath, O_RDONLY | PG_BINARY, 0); + if (fd < 0) + { + pg_log_error("could not open file \"%s\": %m", fullpath); + exit(1); + } + + status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION); + if (LZ4F_isError(status)) + { + pg_log_error("could not create LZ4 decompression context: %s", + LZ4F_getErrorName(status)); + exit(1); + } + + outbuf = pg_malloc0(LZ4_CHUNK_SZ); + readbuf = pg_malloc0(LZ4_CHUNK_SZ); + do + { + char *readp; + char *readend; + + r = read(fd, readbuf, LZ4_CHUNK_SZ); + if (r < 0) + { + pg_log_error("could not read file \"%s\": %m", fullpath); + exit(1); + } + + /* Done reading the file */ + if (r == 0) + break; + + /* Process one chunk */ + readp = readbuf; + readend = readbuf + r; + while (readp < readend) + { + size_t out_size = LZ4_CHUNK_SZ; + size_t read_size = readend - readp; + + memset(outbuf, 0, LZ4_CHUNK_SZ); + status = LZ4F_decompress(ctx, outbuf, &out_size, + readp, &read_size, &dec_opt); + if (LZ4F_isError(status)) + { + pg_log_error("could not decompress file \"%s\": %s", + fullpath, + LZ4F_getErrorName(status)); + exit(1); + } + + readp += read_size; + uncompressed_size += out_size; + } + + /* + * No need to continue reading the file when the + * uncompressed_size exceeds WalSegSz, even if there are still + * data left to read. However, if uncompressed_size is equal + * to WalSegSz, it should verify that there is no more data to + * read. + */ + } while (uncompressed_size <= WalSegSz && r > 0); + + close(fd); + pg_free(outbuf); + pg_free(readbuf); + + status = LZ4F_freeDecompressionContext(ctx); + if (LZ4F_isError(status)) + { + pg_log_error("could not free LZ4 decompression context: %s", + LZ4F_getErrorName(status)); + exit(1); + } + + if (uncompressed_size != WalSegSz) + { + pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %ld, skipping", + dirent->d_name, uncompressed_size); + continue; + } +#else + pg_log_error("could not check file \"%s\"", + dirent->d_name); + pg_log_error("this build does not support compression with %s", + "LZ4"); + exit(1); +#endif + } /* Looks like a valid segment. Remember that we saw it. */ if ((segno > high_segno) || @@ -650,6 +788,8 @@ main(int argc, char **argv) case 6: if (pg_strcasecmp(optarg, "gzip") == 0) compression_method = COMPRESSION_GZIP; + else if (pg_strcasecmp(optarg, "lz4") == 0) + compression_method = COMPRESSION_LZ4; else if (pg_strcasecmp(optarg, "none") == 0) compression_method = COMPRESSION_NONE; else @@ -748,6 +888,22 @@ main(int argc, char **argv) exit(1); #endif break; + case COMPRESSION_LZ4: +#ifdef HAVE_LIBLZ4 + if (compresslevel != 0) + { + pg_log_error("cannot use --compress with --compression-method=%s", + "lz4"); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } +#else + pg_log_error("this build does not support compression with %s", + "LZ4"); + exit(1); +#endif + break; } diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl index 94786f0815e..43599d832b4 100644 --- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl +++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl @@ -5,7 +5,7 @@ use strict; use warnings; use PostgreSQL::Test::Utils; use PostgreSQL::Test::Cluster; -use Test::More tests => 37; +use Test::More tests => 42; program_help_ok('pg_receivewal'); program_version_ok('pg_receivewal'); @@ -138,13 +138,69 @@ SKIP: "gzip verified the integrity of compressed WAL segments"); } +# Check LZ4 compression if available +SKIP: +{ + skip "postgres was not built with LZ4 support", 5 + if (!check_pg_config("#define HAVE_LIBLZ4 1")); + + # Generate more WAL including one completed, compressed segment. + $primary->psql('postgres', 'SELECT pg_switch_wal();'); + $nextlsn = + $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();'); + chomp($nextlsn); + $primary->psql('postgres', 'INSERT INTO test_table VALUES (3);'); + + # Stream up to the given position. + $primary->command_ok( + [ + 'pg_receivewal', '-D', + $stream_dir, '--verbose', + '--endpos', $nextlsn, + '--no-loop', '--compression-method', + 'lz4' + ], + 'streaming some WAL using --compression-method=lz4'); + + # Verify that the stored files are generated with their expected + # names. + my @lz4_wals = glob "$stream_dir/*.lz4"; + is(scalar(@lz4_wals), 1, + "one WAL segment compressed with LZ4 was created"); + my @lz4_partial_wals = glob "$stream_dir/*.lz4.partial"; + is(scalar(@lz4_partial_wals), + 1, "one partial WAL segment compressed with LZ4 was created"); + + # Verify that the start streaming position is computed correctly by + # comparing it with the partial file generated previously. The name + # of the previous partial, now-completed WAL segment is updated, keeping + # its base number. + $partial_wals[0] =~ s/(\.gz)?\.partial$/.lz4/; + is($lz4_wals[0] eq $partial_wals[0], + 1, "one partial WAL segment is now completed"); + # Update the list of partial wals with the current one. + @partial_wals = @lz4_partial_wals; + + # Check the integrity of the completed segment, if LZ4 is an available + # command. + my $lz4 = $ENV{LZ4}; + skip "program lz4 is not found in your system", 1 + if ( !defined $lz4 + || $lz4 eq '' + || system_log($lz4, '--version') != 0); + + my $lz4_is_valid = system_log($lz4, '-t', @lz4_wals); + is($lz4_is_valid, 0, + "lz4 verified the integrity of compressed WAL segments"); +} + # Verify that the start streaming position is computed and that the value is -# correct regardless of whether ZLIB is available. +# correct regardless of whether any compression is available. $primary->psql('postgres', 'SELECT pg_switch_wal();'); $nextlsn = $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();'); chomp($nextlsn); -$primary->psql('postgres', 'INSERT INTO test_table VALUES (3);'); +$primary->psql('postgres', 'INSERT INTO test_table VALUES (4);'); $primary->command_ok( [ 'pg_receivewal', '-D', $stream_dir, '--verbose', @@ -152,7 +208,7 @@ $primary->command_ok( ], "streaming some WAL"); -$partial_wals[0] =~ s/(\.gz)?.partial//; +$partial_wals[0] =~ s/(\.gz|\.lz4)?.partial//; ok(-e $partial_wals[0], "check that previously partial WAL is now complete"); # Permissions on WAL files should be default @@ -190,7 +246,7 @@ my $walfile_streamed = $primary->safe_psql( # Switch to a new segment, to make sure that the segment retained by the # slot is still streamed. This may not be necessary, but play it safe. -$primary->psql('postgres', 'INSERT INTO test_table VALUES (4);'); +$primary->psql('postgres', 'INSERT INTO test_table VALUES (5);'); $primary->psql('postgres', 'SELECT pg_switch_wal();'); $nextlsn = $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();'); @@ -198,7 +254,7 @@ chomp($nextlsn); # Add a bit more data to accelerate the end of the next pg_receivewal # commands. -$primary->psql('postgres', 'INSERT INTO test_table VALUES (5);'); +$primary->psql('postgres', 'INSERT INTO test_table VALUES (6);'); # Check case where the slot does not exist. $primary->command_fails_like( @@ -253,13 +309,13 @@ $standby->promote; # on the new timeline. my $walfile_after_promotion = $standby->safe_psql('postgres', "SELECT pg_walfile_name(pg_current_wal_insert_lsn());"); -$standby->psql('postgres', 'INSERT INTO test_table VALUES (6);'); +$standby->psql('postgres', 'INSERT INTO test_table VALUES (7);'); $standby->psql('postgres', 'SELECT pg_switch_wal();'); $nextlsn = $standby->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();'); chomp($nextlsn); # This speeds up the operation. -$standby->psql('postgres', 'INSERT INTO test_table VALUES (7);'); +$standby->psql('postgres', 'INSERT INTO test_table VALUES (8);'); # Now try to resume from the slot after the promotion. my $timeline_dir = $primary->basedir . '/timeline_wal'; diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c index 52f314af3bb..f1ba2a828a0 100644 --- a/src/bin/pg_basebackup/walmethods.c +++ b/src/bin/pg_basebackup/walmethods.c @@ -17,6 +17,10 @@ #include <sys/stat.h> #include <time.h> #include <unistd.h> + +#ifdef HAVE_LIBLZ4 +#include <lz4frame.h> +#endif #ifdef HAVE_LIBZ #include <zlib.h> #endif @@ -30,6 +34,9 @@ /* Size of zlib buffer for .tar.gz */ #define ZLIB_OUT_SIZE 4096 +/* Size of LZ4 input chunk for .lz4 */ +#define LZ4_IN_SIZE 4096 + /*------------------------------------------------------------------------- * WalDirectoryMethod - write wal to a directory looking like pg_wal *------------------------------------------------------------------------- @@ -60,6 +67,11 @@ typedef struct DirectoryMethodFile #ifdef HAVE_LIBZ gzFile gzfp; #endif +#ifdef HAVE_LIBLZ4 + LZ4F_compressionContext_t ctx; + size_t lz4bufsize; + void *lz4buf; +#endif } DirectoryMethodFile; static const char * @@ -76,7 +88,8 @@ dir_get_file_name(const char *pathname, const char *temp_suffix) snprintf(filename, MAXPGPATH, "%s%s%s", pathname, - dir_data->compression_method == COMPRESSION_GZIP ? ".gz" : "", + dir_data->compression_method == COMPRESSION_GZIP ? ".gz" : + dir_data->compression_method == COMPRESSION_LZ4 ? ".lz4" : "", temp_suffix ? temp_suffix : ""); return filename; @@ -92,6 +105,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ #ifdef HAVE_LIBZ gzFile gzfp = NULL; #endif +#ifdef HAVE_LIBLZ4 + LZ4F_compressionContext_t ctx = NULL; + size_t lz4bufsize = 0; + void *lz4buf = NULL; +#endif filename = dir_get_file_name(pathname, temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", @@ -126,6 +144,50 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ } } #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_method == COMPRESSION_LZ4) + { + size_t ctx_out; + size_t header_size; + + ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); + if (LZ4F_isError(ctx_out)) + { + close(fd); + return NULL; + } + + lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL); + lz4buf = pg_malloc0(lz4bufsize); + + /* add the header */ + header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, NULL); + if (LZ4F_isError(header_size)) + { + (void) LZ4F_freeCompressionContext(ctx); + pg_free(lz4buf); + close(fd); + return NULL; + } + + errno = 0; + if (write(fd, lz4buf, header_size) != header_size) + { + int save_errno = errno; + + (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL); + (void) LZ4F_freeCompressionContext(ctx); + pg_free(lz4buf); + close(fd); + + /* + * If write didn't set errno, assume problem is no disk space. + */ + errno = save_errno ? save_errno : ENOSPC; + return NULL; + } + } +#endif /* Do pre-padding on non-compressed files */ if (pad_to_size && dir_data->compression_method == COMPRESSION_NONE) @@ -177,6 +239,16 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ gzclose(gzfp); else #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_method == COMPRESSION_LZ4) + { + (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL); + (void) LZ4F_freeCompressionContext(ctx); + pg_free(lz4buf); + close(fd); + } + else +#endif close(fd); return NULL; } @@ -187,6 +259,15 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (dir_data->compression_method == COMPRESSION_GZIP) f->gzfp = gzfp; #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_method == COMPRESSION_LZ4) + { + f->ctx = ctx; + f->lz4buf = lz4buf; + f->lz4bufsize = lz4bufsize; + } +#endif + f->fd = fd; f->currpos = 0; f->pathname = pg_strdup(pathname); @@ -210,6 +291,43 @@ dir_write(Walfile f, const void *buf, size_t count) r = (ssize_t) gzwrite(df->gzfp, buf, count); else #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_method == COMPRESSION_LZ4) + { + size_t chunk; + size_t remaining; + const void *inbuf = buf; + + remaining = count; + while (remaining > 0) + { + size_t compressed; + + if (remaining > LZ4_IN_SIZE) + chunk = LZ4_IN_SIZE; + else + chunk = remaining; + + remaining -= chunk; + compressed = LZ4F_compressUpdate(df->ctx, + df->lz4buf, df->lz4bufsize, + inbuf, chunk, + NULL); + + if (LZ4F_isError(compressed)) + return -1; + + if (write(df->fd, df->lz4buf, compressed) != compressed) + return -1; + + inbuf = ((char *) inbuf) + chunk; + } + + /* Our caller keeps track of the uncompressed size. */ + r = (ssize_t) count; + } + else +#endif r = write(df->fd, buf, count); if (r > 0) df->currpos += r; @@ -240,6 +358,25 @@ dir_close(Walfile f, WalCloseMethod method) r = gzclose(df->gzfp); else #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_method == COMPRESSION_LZ4) + { + size_t compressed; + + compressed = LZ4F_compressEnd(df->ctx, + df->lz4buf, df->lz4bufsize, + NULL); + + if (LZ4F_isError(compressed)) + return -1; + + if (write(df->fd, df->lz4buf, compressed) != compressed) + return -1; + + r = close(df->fd); + } + else +#endif r = close(df->fd); if (r == 0) @@ -293,6 +430,12 @@ dir_close(Walfile f, WalCloseMethod method) } } +#ifdef HAVE_LIBLZ4 + pg_free(df->lz4buf); + /* supports free on NULL */ + LZ4F_freeCompressionContext(df->ctx); +#endif + pg_free(df->pathname); pg_free(df->fullpath); if (df->temp_suffix) @@ -317,6 +460,21 @@ dir_sync(Walfile f) return -1; } #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_method == COMPRESSION_LZ4) + { + DirectoryMethodFile *df = (DirectoryMethodFile *) f; + size_t compressed; + + /* Flush any internal buffers */ + compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL); + if (LZ4F_isError(compressed)) + return -1; + + if (write(df->fd, df->lz4buf, compressed) != compressed) + return -1; + } +#endif return fsync(((DirectoryMethodFile *) f)->fd); } diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h index 5dfe330ea5d..83144ae7eb1 100644 --- a/src/bin/pg_basebackup/walmethods.h +++ b/src/bin/pg_basebackup/walmethods.h @@ -23,6 +23,7 @@ typedef enum typedef enum { COMPRESSION_GZIP, + COMPRESSION_LZ4, COMPRESSION_NONE } WalCompressionMethod; |