diff options
Diffstat (limited to 'src/bin/pg_basebackup/walmethods.c')
-rw-r--r-- | src/bin/pg_basebackup/walmethods.c | 510 |
1 files changed, 267 insertions, 243 deletions
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c index d98a2681b90..bc2e83d02be 100644 --- a/src/bin/pg_basebackup/walmethods.c +++ b/src/bin/pg_basebackup/walmethods.c @@ -2,9 +2,6 @@ * * walmethods.c - implementations of different ways to write received wal * - * NOTE! The caller must ensure that only one method is instantiated in - * any given program, and that it's only instantiated once! - * * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group * * IDENTIFICATION @@ -43,19 +40,41 @@ *------------------------------------------------------------------------- */ +static Walfile *dir_open_for_write(WalWriteMethod *wwmethod, + const char *pathname, + const char *temp_suffix, + size_t pad_to_size); +static int dir_close(Walfile *f, WalCloseMethod method); +static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname); +static ssize_t dir_get_file_size(WalWriteMethod *wwmethod, + const char *pathname); +static char *dir_get_file_name(WalWriteMethod *wwmethod, + const char *pathname, const char *temp_suffix); +static ssize_t dir_write(Walfile *f, const void *buf, size_t count); +static int dir_sync(Walfile *f); +static bool dir_finish(WalWriteMethod *wwmethod); +static void dir_free(WalWriteMethod *wwmethod); + +const WalWriteMethodOps WalDirectoryMethodOps = { + .open_for_write = dir_open_for_write, + .close = dir_close, + .existsfile = dir_existsfile, + .get_file_size = dir_get_file_size, + .get_file_name = dir_get_file_name, + .write = dir_write, + .sync = dir_sync, + .finish = dir_finish, + .free = dir_free +}; + /* * Global static data for this method */ typedef struct DirectoryMethodData { + WalWriteMethod base; char *basedir; - pg_compress_algorithm compression_algorithm; - int compression_level; - bool sync; - const char *lasterrstring; /* if set, takes precedence over lasterrno */ - int lasterrno; } DirectoryMethodData; -static DirectoryMethodData *dir_data = NULL; /* * Local file handle @@ -76,36 +95,29 @@ typedef struct DirectoryMethodFile #endif } DirectoryMethodFile; -#define dir_clear_error() \ - (dir_data->lasterrstring = NULL, dir_data->lasterrno = 0) -#define dir_set_error(msg) \ - (dir_data->lasterrstring = _(msg)) - -static const char * -dir_getlasterror(void) -{ - if (dir_data->lasterrstring) - return dir_data->lasterrstring; - return strerror(dir_data->lasterrno); -} +#define clear_error(wwmethod) \ + ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0) static char * -dir_get_file_name(const char *pathname, const char *temp_suffix) +dir_get_file_name(WalWriteMethod *wwmethod, + const char *pathname, const char *temp_suffix) { char *filename = pg_malloc0(MAXPGPATH * sizeof(char)); snprintf(filename, MAXPGPATH, "%s%s%s", pathname, - dir_data->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" : - dir_data->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "", + wwmethod->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" : + wwmethod->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "", temp_suffix ? temp_suffix : ""); return filename; } static Walfile * -dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) +dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname, + const char *temp_suffix, size_t pad_to_size) { + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; char tmppath[MAXPGPATH]; char *filename; int fd; @@ -119,9 +131,9 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ void *lz4buf = NULL; #endif - dir_clear_error(); + clear_error(wwmethod); - filename = dir_get_file_name(pathname, temp_suffix); + filename = dir_get_file_name(wwmethod, pathname, temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, filename); pg_free(filename); @@ -135,32 +147,32 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode); if (fd < 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; return NULL; } #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { gzfp = gzdopen(fd, "wb"); if (gzfp == NULL) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; close(fd); return NULL; } - if (gzsetparams(gzfp, dir_data->compression_level, + if (gzsetparams(gzfp, wwmethod->compression_level, Z_DEFAULT_STRATEGY) != Z_OK) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; gzclose(gzfp); return NULL; } } #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { size_t ctx_out; size_t header_size; @@ -169,7 +181,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); if (LZ4F_isError(ctx_out)) { - dir_data->lasterrstring = LZ4F_getErrorName(ctx_out); + wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out); close(fd); return NULL; } @@ -179,13 +191,13 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ /* assign the compression level, default is 0 */ memset(&prefs, 0, sizeof(prefs)); - prefs.compressionLevel = dir_data->compression_level; + prefs.compressionLevel = wwmethod->compression_level; /* add the header */ header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs); if (LZ4F_isError(header_size)) { - dir_data->lasterrstring = LZ4F_getErrorName(header_size); + wwmethod->lasterrstring = LZ4F_getErrorName(header_size); (void) LZ4F_freeCompressionContext(ctx); pg_free(lz4buf); close(fd); @@ -196,7 +208,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (write(fd, lz4buf, header_size) != header_size) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; (void) LZ4F_freeCompressionContext(ctx); pg_free(lz4buf); close(fd); @@ -206,7 +218,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ #endif /* Do pre-padding on non-compressed files */ - if (pad_to_size && dir_data->compression_algorithm == PG_COMPRESSION_NONE) + if (pad_to_size && wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { PGAlignedXLogBlock zerobuf; int bytes; @@ -218,7 +230,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; close(fd); return NULL; } @@ -226,7 +238,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (lseek(fd, 0, SEEK_SET) != 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; close(fd); return NULL; } @@ -238,19 +250,19 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ * important when using synchronous mode, where the file is modified and * fsynced in-place, without a directory fsync. */ - if (dir_data->sync) + if (wwmethod->sync) { if (fsync_fname(tmppath, false) != 0 || fsync_parent_path(tmppath) != 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) gzclose(gzfp); else #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL); (void) LZ4F_freeCompressionContext(ctx); @@ -266,11 +278,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ f = pg_malloc0(sizeof(DirectoryMethodFile)); #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) f->gzfp = gzfp; #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { f->ctx = ctx; f->lz4buf = lz4buf; @@ -278,6 +290,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ } #endif + f->base.wwmethod = wwmethod; f->base.currpos = 0; f->base.pathname = pg_strdup(pathname); f->fd = fd; @@ -295,23 +308,23 @@ dir_write(Walfile *f, const void *buf, size_t count) DirectoryMethodFile *df = (DirectoryMethodFile *) f; Assert(f != NULL); - dir_clear_error(); + clear_error(f->wwmethod); #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { errno = 0; r = (ssize_t) gzwrite(df->gzfp, buf, count); if (r != count) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; } } else #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { size_t chunk; size_t remaining; @@ -335,7 +348,7 @@ dir_write(Walfile *f, const void *buf, size_t count) if (LZ4F_isError(compressed)) { - dir_data->lasterrstring = LZ4F_getErrorName(compressed); + f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed); return -1; } @@ -343,7 +356,7 @@ dir_write(Walfile *f, const void *buf, size_t count) if (write(df->fd, df->lz4buf, compressed) != compressed) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } @@ -361,7 +374,7 @@ dir_write(Walfile *f, const void *buf, size_t count) if (r != count) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; } } if (r > 0) @@ -374,14 +387,15 @@ dir_close(Walfile *f, WalCloseMethod method) { int r; DirectoryMethodFile *df = (DirectoryMethodFile *) f; + DirectoryMethodData *dir_data = (DirectoryMethodData *) f->wwmethod; char tmppath[MAXPGPATH]; char tmppath2[MAXPGPATH]; Assert(f != NULL); - dir_clear_error(); + clear_error(f->wwmethod); #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { errno = 0; /* in case gzclose() doesn't set it */ r = gzclose(df->gzfp); @@ -389,7 +403,7 @@ dir_close(Walfile *f, WalCloseMethod method) else #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { size_t compressed; @@ -399,7 +413,7 @@ dir_close(Walfile *f, WalCloseMethod method) if (LZ4F_isError(compressed)) { - dir_data->lasterrstring = LZ4F_getErrorName(compressed); + f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed); return -1; } @@ -407,7 +421,7 @@ dir_close(Walfile *f, WalCloseMethod method) if (write(df->fd, df->lz4buf, compressed) != compressed) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } @@ -429,17 +443,18 @@ dir_close(Walfile *f, WalCloseMethod method) * If we have a temp prefix, normal operation is to rename the * file. */ - filename = dir_get_file_name(df->base.pathname, df->temp_suffix); + filename = dir_get_file_name(f->wwmethod, df->base.pathname, + df->temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, filename); pg_free(filename); /* permanent name, so no need for the prefix */ - filename2 = dir_get_file_name(df->base.pathname, NULL); + filename2 = dir_get_file_name(f->wwmethod, df->base.pathname, NULL); snprintf(tmppath2, sizeof(tmppath2), "%s/%s", dir_data->basedir, filename2); pg_free(filename2); - if (dir_data->sync) + if (f->wwmethod->sync) r = durable_rename(tmppath, tmppath2); else { @@ -456,7 +471,8 @@ dir_close(Walfile *f, WalCloseMethod method) char *filename; /* Unlink the file once it's closed */ - filename = dir_get_file_name(df->base.pathname, df->temp_suffix); + filename = dir_get_file_name(f->wwmethod, df->base.pathname, + df->temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, filename); pg_free(filename); @@ -469,7 +485,7 @@ dir_close(Walfile *f, WalCloseMethod method) * CLOSE_NO_RENAME. In this case, fsync the file and containing * directory if sync mode is requested. */ - if (dir_data->sync) + if (f->wwmethod->sync) { r = fsync_fname(df->fullpath, false); if (r == 0) @@ -479,7 +495,7 @@ dir_close(Walfile *f, WalCloseMethod method) } if (r != 0) - dir_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; #ifdef USE_LZ4 pg_free(df->lz4buf); @@ -501,23 +517,23 @@ dir_sync(Walfile *f) int r; Assert(f != NULL); - dir_clear_error(); + clear_error(f->wwmethod); - if (!dir_data->sync) + if (!f->wwmethod->sync) return 0; #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK) { - dir_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return -1; } } #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { DirectoryMethodFile *df = (DirectoryMethodFile *) f; size_t compressed; @@ -526,7 +542,7 @@ dir_sync(Walfile *f) compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL); if (LZ4F_isError(compressed)) { - dir_data->lasterrstring = LZ4F_getErrorName(compressed); + f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed); return -1; } @@ -534,7 +550,7 @@ dir_sync(Walfile *f) if (write(df->fd, df->lz4buf, compressed) != compressed) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } } @@ -542,13 +558,14 @@ dir_sync(Walfile *f) r = fsync(((DirectoryMethodFile *) f)->fd); if (r < 0) - dir_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return r; } static ssize_t -dir_get_file_size(const char *pathname) +dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname) { + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; struct stat statbuf; char tmppath[MAXPGPATH]; @@ -557,26 +574,21 @@ dir_get_file_size(const char *pathname) if (stat(tmppath, &statbuf) != 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; return -1; } return statbuf.st_size; } -static pg_compress_algorithm -dir_compression_algorithm(void) -{ - return dir_data->compression_algorithm; -} - static bool -dir_existsfile(const char *pathname) +dir_existsfile(WalWriteMethod *wwmethod, const char *pathname) { + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; char tmppath[MAXPGPATH]; int fd; - dir_clear_error(); + clear_error(wwmethod); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, pathname); @@ -589,60 +601,54 @@ dir_existsfile(const char *pathname) } static bool -dir_finish(void) +dir_finish(WalWriteMethod *wwmethod) { - dir_clear_error(); + clear_error(wwmethod); - if (dir_data->sync) + if (wwmethod->sync) { + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; + /* * Files are fsynced when they are closed, but we need to fsync the * directory entry here as well. */ if (fsync_fname(dir_data->basedir, true) != 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; return false; } } return true; } +static void +dir_free(WalWriteMethod *wwmethod) +{ + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; + + pg_free(dir_data->basedir); + pg_free(wwmethod); +} + WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, pg_compress_algorithm compression_algorithm, int compression_level, bool sync) { - WalWriteMethod *method; - - method = pg_malloc0(sizeof(WalWriteMethod)); - method->open_for_write = dir_open_for_write; - method->write = dir_write; - method->get_file_size = dir_get_file_size; - method->get_file_name = dir_get_file_name; - method->compression_algorithm = dir_compression_algorithm; - method->close = dir_close; - method->sync = dir_sync; - method->existsfile = dir_existsfile; - method->finish = dir_finish; - method->getlasterror = dir_getlasterror; - - dir_data = pg_malloc0(sizeof(DirectoryMethodData)); - dir_data->compression_algorithm = compression_algorithm; - dir_data->compression_level = compression_level; - dir_data->basedir = pg_strdup(basedir); - dir_data->sync = sync; - - return method; -} - -void -FreeWalDirectoryMethod(void) -{ - pg_free(dir_data->basedir); - pg_free(dir_data); - dir_data = NULL; + DirectoryMethodData *wwmethod; + + wwmethod = pg_malloc0(sizeof(DirectoryMethodData)); + *((const WalWriteMethodOps **) &wwmethod->base.ops) = + &WalDirectoryMethodOps; + wwmethod->base.compression_algorithm = compression_algorithm; + wwmethod->base.compression_level = compression_level; + wwmethod->base.sync = sync; + clear_error(&wwmethod->base); + wwmethod->basedir = pg_strdup(basedir); + + return &wwmethod->base; } @@ -651,6 +657,33 @@ FreeWalDirectoryMethod(void) *------------------------------------------------------------------------- */ +static Walfile *tar_open_for_write(WalWriteMethod *wwmethod, + const char *pathname, + const char *temp_suffix, + size_t pad_to_size); +static int tar_close(Walfile *f, WalCloseMethod method); +static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname); +static ssize_t tar_get_file_size(WalWriteMethod *wwmethod, + const char *pathname); +static char *tar_get_file_name(WalWriteMethod *wwmethod, + const char *pathname, const char *temp_suffix); +static ssize_t tar_write(Walfile *f, const void *buf, size_t count); +static int tar_sync(Walfile *f); +static bool tar_finish(WalWriteMethod *wwmethod); +static void tar_free(WalWriteMethod *wwmethod); + +const WalWriteMethodOps WalTarMethodOps = { + .open_for_write = tar_open_for_write, + .close = tar_close, + .existsfile = tar_existsfile, + .get_file_size = tar_get_file_size, + .get_file_name = tar_get_file_name, + .write = tar_write, + .sync = tar_sync, + .finish = tar_finish, + .free = tar_free +}; + typedef struct TarMethodFile { Walfile base; @@ -661,37 +694,20 @@ typedef struct TarMethodFile typedef struct TarMethodData { + WalWriteMethod base; char *tarfilename; int fd; - pg_compress_algorithm compression_algorithm; - int compression_level; - bool sync; TarMethodFile *currentfile; - const char *lasterrstring; /* if set, takes precedence over lasterrno */ - int lasterrno; #ifdef HAVE_LIBZ z_streamp zp; void *zlibOut; #endif } TarMethodData; -static TarMethodData *tar_data = NULL; - -#define tar_clear_error() \ - (tar_data->lasterrstring = NULL, tar_data->lasterrno = 0) -#define tar_set_error(msg) \ - (tar_data->lasterrstring = _(msg)) - -static const char * -tar_getlasterror(void) -{ - if (tar_data->lasterrstring) - return tar_data->lasterrstring; - return strerror(tar_data->lasterrno); -} #ifdef HAVE_LIBZ static bool -tar_write_compressed_data(void *buf, size_t count, bool flush) +tar_write_compressed_data(TarMethodData *tar_data, void *buf, size_t count, + bool flush) { tar_data->zp->next_in = buf; tar_data->zp->avail_in = count; @@ -703,7 +719,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH); if (r == Z_STREAM_ERROR) { - tar_set_error("could not compress data"); + tar_data->base.lasterrstring = "could not compress data"; return false; } @@ -715,7 +731,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) if (write(tar_data->fd, tar_data->zlibOut, len) != len) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + tar_data->base.lasterrno = errno ? errno : ENOSPC; return false; } @@ -732,7 +748,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) /* Reset the stream for writing */ if (deflateReset(tar_data->zp) != Z_OK) { - tar_set_error("could not reset compression stream"); + tar_data->base.lasterrstring = "could not reset compression stream"; return false; } } @@ -744,29 +760,31 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) static ssize_t tar_write(Walfile *f, const void *buf, size_t count) { + TarMethodData *tar_data = (TarMethodData *) f->wwmethod; ssize_t r; Assert(f != NULL); - tar_clear_error(); + clear_error(f->wwmethod); /* Tarfile will always be positioned at the end */ - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { errno = 0; r = write(tar_data->fd, buf, count); if (r != count) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } f->currpos += r; return r; } #ifdef HAVE_LIBZ - else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { - if (!tar_write_compressed_data(unconstify(void *, buf), count, false)) + if (!tar_write_compressed_data(tar_data, unconstify(void *, buf), + count, false)) return -1; f->currpos += count; return count; @@ -775,7 +793,7 @@ tar_write(Walfile *f, const void *buf, size_t count) else { /* Can't happen - compression enabled with no method set */ - tar_data->lasterrno = ENOSYS; + f->wwmethod->lasterrno = ENOSYS; return -1; } } @@ -801,7 +819,8 @@ tar_write_padding_data(TarMethodFile *f, size_t bytes) } static char * -tar_get_file_name(const char *pathname, const char *temp_suffix) +tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname, + const char *temp_suffix) { char *filename = pg_malloc0(MAXPGPATH * sizeof(char)); @@ -812,11 +831,13 @@ tar_get_file_name(const char *pathname, const char *temp_suffix) } static Walfile * -tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) +tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname, + const char *temp_suffix, size_t pad_to_size) { + TarMethodData *tar_data = (TarMethodData *) wwmethod; char *tmppath; - tar_clear_error(); + clear_error(wwmethod); if (tar_data->fd < 0) { @@ -828,12 +849,12 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ pg_file_create_mode); if (tar_data->fd < 0) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return NULL; } #ifdef HAVE_LIBZ - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream)); tar_data->zp->zalloc = Z_NULL; @@ -847,12 +868,13 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ * default 15 for the windowBits parameter makes the output be * gzip instead of zlib. */ - if (deflateInit2(tar_data->zp, tar_data->compression_level, + if (deflateInit2(tar_data->zp, wwmethod->compression_level, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) { pg_free(tar_data->zp); tar_data->zp = NULL; - tar_set_error("could not initialize compression library"); + wwmethod->lasterrstring = + "could not initialize compression library"; return NULL; } } @@ -863,13 +885,15 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (tar_data->currentfile != NULL) { - tar_set_error("implementation error: tar files can't have more than one open file"); + wwmethod->lasterrstring = + "implementation error: tar files can't have more than one open file"; return NULL; } tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile)); + tar_data->currentfile->base.wwmethod = wwmethod; - tmppath = tar_get_file_name(pathname, temp_suffix); + tmppath = tar_get_file_name(wwmethod, pathname, temp_suffix); /* Create a header with size set to 0 - we will fill out the size on close */ if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK) @@ -877,23 +901,24 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ pg_free(tar_data->currentfile); pg_free(tmppath); tar_data->currentfile = NULL; - tar_set_error("could not create tar header"); + wwmethod->lasterrstring = "could not create tar header"; return NULL; } pg_free(tmppath); #ifdef HAVE_LIBZ - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* Flush existing data */ - if (!tar_write_compressed_data(NULL, 0, true)) + if (!tar_write_compressed_data(tar_data, NULL, 0, true)) return NULL; /* Turn off compression for header */ if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK) { - tar_set_error("could not change compression parameters"); + wwmethod->lasterrstring = + "could not change compression parameters"; return NULL; } } @@ -902,39 +927,39 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR); if (tar_data->currentfile->ofs_start == -1) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; pg_free(tar_data->currentfile); tar_data->currentfile = NULL; return NULL; } tar_data->currentfile->base.currpos = 0; - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { errno = 0; if (write(tar_data->fd, tar_data->currentfile->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; pg_free(tar_data->currentfile); tar_data->currentfile = NULL; return NULL; } } #ifdef HAVE_LIBZ - else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* Write header through the zlib APIs but with no compression */ - if (!tar_write_compressed_data(tar_data->currentfile->header, + if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header, TAR_BLOCK_SIZE, true)) return NULL; /* Re-enable compression for the rest of the file */ - if (deflateParams(tar_data->zp, tar_data->compression_level, + if (deflateParams(tar_data->zp, wwmethod->compression_level, Z_DEFAULT_STRATEGY) != Z_OK) { - tar_set_error("could not change compression parameters"); + wwmethod->lasterrstring = "could not change compression parameters"; return NULL; } } @@ -954,7 +979,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (pad_to_size) { tar_data->currentfile->pad_to_size = pad_to_size; - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { /* Uncompressed, so pad now */ if (!tar_write_padding_data(tar_data->currentfile, pad_to_size)) @@ -964,7 +989,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE, SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return NULL; } @@ -976,42 +1001,37 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ } static ssize_t -tar_get_file_size(const char *pathname) +tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname) { - tar_clear_error(); + clear_error(wwmethod); /* Currently not used, so not supported */ - tar_data->lasterrno = ENOSYS; + wwmethod->lasterrno = ENOSYS; return -1; } -static pg_compress_algorithm -tar_compression_algorithm(void) -{ - return tar_data->compression_algorithm; -} - static int tar_sync(Walfile *f) { + TarMethodData *tar_data = (TarMethodData *) f->wwmethod; int r; Assert(f != NULL); - tar_clear_error(); + clear_error(f->wwmethod); - if (!tar_data->sync) + if (!f->wwmethod->sync) return 0; /* * Always sync the whole tarfile, because that's all we can do. This makes * no sense on compressed files, so just ignore those. */ - if (tar_data->compression_algorithm != PG_COMPRESSION_NONE) + if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE) return 0; r = fsync(tar_data->fd); if (r < 0) - tar_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return r; } @@ -1020,16 +1040,17 @@ tar_close(Walfile *f, WalCloseMethod method) { ssize_t filesize; int padding; + TarMethodData *tar_data = (TarMethodData *) f->wwmethod; TarMethodFile *tf = (TarMethodFile *) f; Assert(f != NULL); - tar_clear_error(); + clear_error(f->wwmethod); if (method == CLOSE_UNLINK) { - if (tar_data->compression_algorithm != PG_COMPRESSION_NONE) + if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE) { - tar_set_error("unlink not supported with compression"); + f->wwmethod->lasterrstring = "unlink not supported with compression"; return -1; } @@ -1040,7 +1061,7 @@ tar_close(Walfile *f, WalCloseMethod method) */ if (ftruncate(tar_data->fd, tf->ofs_start) != 0) { - tar_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return -1; } @@ -1058,7 +1079,7 @@ tar_close(Walfile *f, WalCloseMethod method) */ if (tf->pad_to_size) { - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* * A compressed tarfile is padded on close since we cannot know @@ -1098,10 +1119,10 @@ tar_close(Walfile *f, WalCloseMethod method) #ifdef HAVE_LIBZ - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* Flush the current buffer */ - if (!tar_write_compressed_data(NULL, 0, true)) + if (!tar_write_compressed_data(tar_data, NULL, 0, true)) return -1; } #endif @@ -1124,39 +1145,39 @@ tar_close(Walfile *f, WalCloseMethod method) print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header)); if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start) { - tar_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return -1; } - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { errno = 0; if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } } #ifdef HAVE_LIBZ - else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* Turn off compression */ if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK) { - tar_set_error("could not change compression parameters"); + f->wwmethod->lasterrstring = "could not change compression parameters"; return -1; } /* Overwrite the header, assuming the size will be the same */ - if (!tar_write_compressed_data(tar_data->currentfile->header, + if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header, TAR_BLOCK_SIZE, true)) return -1; /* Turn compression back on */ - if (deflateParams(tar_data->zp, tar_data->compression_level, + if (deflateParams(tar_data->zp, f->wwmethod->compression_level, Z_DEFAULT_STRATEGY) != Z_OK) { - tar_set_error("could not change compression parameters"); + f->wwmethod->lasterrstring = "could not change compression parameters"; return -1; } } @@ -1170,7 +1191,7 @@ tar_close(Walfile *f, WalCloseMethod method) /* Move file pointer back down to end, so we can write the next file */ if (lseek(tar_data->fd, 0, SEEK_END) < 0) { - tar_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return -1; } @@ -1179,7 +1200,7 @@ tar_close(Walfile *f, WalCloseMethod method) { /* XXX this seems pretty bogus; why is only this case fatal? */ pg_fatal("could not fsync file \"%s\": %s", - tf->base.pathname, tar_getlasterror()); + tf->base.pathname, GetLastWalMethodError(f->wwmethod)); } /* Clean up and done */ @@ -1191,19 +1212,20 @@ tar_close(Walfile *f, WalCloseMethod method) } static bool -tar_existsfile(const char *pathname) +tar_existsfile(WalWriteMethod *wwmethod, const char *pathname) { - tar_clear_error(); + clear_error(wwmethod); /* We only deal with new tarfiles, so nothing externally created exists */ return false; } static bool -tar_finish(void) +tar_finish(WalWriteMethod *wwmethod) { + TarMethodData *tar_data = (TarMethodData *) wwmethod; char zerobuf[1024] = {0}; - tar_clear_error(); + clear_error(wwmethod); if (tar_data->currentfile) { @@ -1212,20 +1234,21 @@ tar_finish(void) } /* A tarfile always ends with two empty blocks */ - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { errno = 0; if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf)) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; return false; } } #ifdef HAVE_LIBZ - else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { - if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false)) + if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf), + false)) return false; /* Also flush all data to make sure the gzip stream is finished */ @@ -1239,7 +1262,7 @@ tar_finish(void) if (r == Z_STREAM_ERROR) { - tar_set_error("could not compress data"); + wwmethod->lasterrstring = "could not compress data"; return false; } if (tar_data->zp->avail_out < ZLIB_OUT_SIZE) @@ -1253,7 +1276,7 @@ tar_finish(void) * If write didn't set errno, assume problem is no disk * space. */ - tar_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; return false; } } @@ -1263,7 +1286,7 @@ tar_finish(void) if (deflateEnd(tar_data->zp) != Z_OK) { - tar_set_error("could not close compression stream"); + wwmethod->lasterrstring = "could not close compression stream"; return false; } } @@ -1275,29 +1298,29 @@ tar_finish(void) } /* sync the empty blocks as well, since they're after the last file */ - if (tar_data->sync) + if (wwmethod->sync) { if (fsync(tar_data->fd) != 0) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return false; } } if (close(tar_data->fd) != 0) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return false; } tar_data->fd = -1; - if (tar_data->sync) + if (wwmethod->sync) { if (fsync_fname(tar_data->tarfilename, false) != 0 || fsync_parent_path(tar_data->tarfilename) != 0) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return false; } } @@ -1305,6 +1328,19 @@ tar_finish(void) return true; } +static void +tar_free(WalWriteMethod *wwmethod) +{ + TarMethodData *tar_data = (TarMethodData *) wwmethod; + + pg_free(tar_data->tarfilename); +#ifdef HAVE_LIBZ + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) + pg_free(tar_data->zlibOut); +#endif + pg_free(wwmethod); +} + /* * The argument compression_algorithm is currently ignored. It is in place for * symmetry with CreateWalDirectoryMethod which uses it for distinguishing @@ -1316,45 +1352,33 @@ CreateWalTarMethod(const char *tarbase, pg_compress_algorithm compression_algorithm, int compression_level, bool sync) { - WalWriteMethod *method; + TarMethodData *wwmethod; const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ? ".tar.gz" : ".tar"; - method = pg_malloc0(sizeof(WalWriteMethod)); - method->open_for_write = tar_open_for_write; - method->write = tar_write; - method->get_file_size = tar_get_file_size; - method->get_file_name = tar_get_file_name; - method->compression_algorithm = tar_compression_algorithm; - method->close = tar_close; - method->sync = tar_sync; - method->existsfile = tar_existsfile; - method->finish = tar_finish; - method->getlasterror = tar_getlasterror; - - tar_data = pg_malloc0(sizeof(TarMethodData)); - tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1); - sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix); - tar_data->fd = -1; - tar_data->compression_algorithm = compression_algorithm; - tar_data->compression_level = compression_level; - tar_data->sync = sync; + wwmethod = pg_malloc0(sizeof(TarMethodData)); + *((const WalWriteMethodOps **) &wwmethod->base.ops) = + &WalTarMethodOps; + wwmethod->base.compression_algorithm = compression_algorithm; + wwmethod->base.compression_level = compression_level; + wwmethod->base.sync = sync; + clear_error(&wwmethod->base); + + wwmethod->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1); + sprintf(wwmethod->tarfilename, "%s%s", tarbase, suffix); + wwmethod->fd = -1; #ifdef HAVE_LIBZ if (compression_algorithm == PG_COMPRESSION_GZIP) - tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); + wwmethod->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); #endif - return method; + return &wwmethod->base; } -void -FreeWalTarMethod(void) +const char * +GetLastWalMethodError(WalWriteMethod *wwmethod) { - pg_free(tar_data->tarfilename); -#ifdef HAVE_LIBZ - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) - pg_free(tar_data->zlibOut); -#endif - pg_free(tar_data); - tar_data = NULL; + if (wwmethod->lasterrstring) + return wwmethod->lasterrstring; + return strerror(wwmethod->lasterrno); } |