aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/walmethods.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2022-09-19 12:53:46 -0400
committerRobert Haas <rhaas@postgresql.org>2022-09-19 12:53:46 -0400
commitebfb814f7ce0d5ab6f47f0b86db51a1b8f3342f4 (patch)
tree787012d7a8e3e26b511c308ebb358ed99468c340 /src/bin/pg_basebackup/walmethods.c
parentc35ba141de1fa04373671ba24c73eb0fe4862415 (diff)
downloadpostgresql-ebfb814f7ce0d5ab6f47f0b86db51a1b8f3342f4.tar.gz
postgresql-ebfb814f7ce0d5ab6f47f0b86db51a1b8f3342f4.zip
walmethods.c/h: Make WalWriteMethod more object-oriented.
Normally when we use object-oriented programming techniques, we provide a pointer to an object and then some way of looking up the associated table of callbacks, but walmethods.c/h took the alternative approach of providing only a pointer to the table of callbacks and thus imposed the artificial restriction that there could only ever be one object of each type, so that the callbacks could find it via a global variable. That doesn't seem like the right idea, so revise the approach. Each callback which does not already have an argument of type Walfile * now takes a pointer to the relevant WalWriteMethod * so that these callbacks need not rely on there being only one object of each type. Freeing a WalWriteMethod is now performed via a callback provided for that purpose rather than requiring the caller to know which WAL method they want to free. Discussion: http://postgr.es/m/CA+TgmoZS0Kw98fOoAcGz8B9iDhdqB4Be4e=vDZaJZ5A-xMYBqA@mail.gmail.com
Diffstat (limited to 'src/bin/pg_basebackup/walmethods.c')
-rw-r--r--src/bin/pg_basebackup/walmethods.c510
1 files changed, 267 insertions, 243 deletions
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index d98a2681b90..bc2e83d02be 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -2,9 +2,6 @@
*
* walmethods.c - implementations of different ways to write received wal
*
- * NOTE! The caller must ensure that only one method is instantiated in
- * any given program, and that it's only instantiated once!
- *
* Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
*
* IDENTIFICATION
@@ -43,19 +40,41 @@
*-------------------------------------------------------------------------
*/
+static Walfile *dir_open_for_write(WalWriteMethod *wwmethod,
+ const char *pathname,
+ const char *temp_suffix,
+ size_t pad_to_size);
+static int dir_close(Walfile *f, WalCloseMethod method);
+static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname);
+static ssize_t dir_get_file_size(WalWriteMethod *wwmethod,
+ const char *pathname);
+static char *dir_get_file_name(WalWriteMethod *wwmethod,
+ const char *pathname, const char *temp_suffix);
+static ssize_t dir_write(Walfile *f, const void *buf, size_t count);
+static int dir_sync(Walfile *f);
+static bool dir_finish(WalWriteMethod *wwmethod);
+static void dir_free(WalWriteMethod *wwmethod);
+
+const WalWriteMethodOps WalDirectoryMethodOps = {
+ .open_for_write = dir_open_for_write,
+ .close = dir_close,
+ .existsfile = dir_existsfile,
+ .get_file_size = dir_get_file_size,
+ .get_file_name = dir_get_file_name,
+ .write = dir_write,
+ .sync = dir_sync,
+ .finish = dir_finish,
+ .free = dir_free
+};
+
/*
* Global static data for this method
*/
typedef struct DirectoryMethodData
{
+ WalWriteMethod base;
char *basedir;
- pg_compress_algorithm compression_algorithm;
- int compression_level;
- bool sync;
- const char *lasterrstring; /* if set, takes precedence over lasterrno */
- int lasterrno;
} DirectoryMethodData;
-static DirectoryMethodData *dir_data = NULL;
/*
* Local file handle
@@ -76,36 +95,29 @@ typedef struct DirectoryMethodFile
#endif
} DirectoryMethodFile;
-#define dir_clear_error() \
- (dir_data->lasterrstring = NULL, dir_data->lasterrno = 0)
-#define dir_set_error(msg) \
- (dir_data->lasterrstring = _(msg))
-
-static const char *
-dir_getlasterror(void)
-{
- if (dir_data->lasterrstring)
- return dir_data->lasterrstring;
- return strerror(dir_data->lasterrno);
-}
+#define clear_error(wwmethod) \
+ ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0)
static char *
-dir_get_file_name(const char *pathname, const char *temp_suffix)
+dir_get_file_name(WalWriteMethod *wwmethod,
+ const char *pathname, const char *temp_suffix)
{
char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
snprintf(filename, MAXPGPATH, "%s%s%s",
pathname,
- dir_data->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
- dir_data->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
+ wwmethod->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
+ wwmethod->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
temp_suffix ? temp_suffix : "");
return filename;
}
static Walfile *
-dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
+ const char *temp_suffix, size_t pad_to_size)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
char tmppath[MAXPGPATH];
char *filename;
int fd;
@@ -119,9 +131,9 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
void *lz4buf = NULL;
#endif
- dir_clear_error();
+ clear_error(wwmethod);
- filename = dir_get_file_name(pathname, temp_suffix);
+ filename = dir_get_file_name(wwmethod, pathname, temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
@@ -135,32 +147,32 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
if (fd < 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return NULL;
}
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
gzfp = gzdopen(fd, "wb");
if (gzfp == NULL)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
close(fd);
return NULL;
}
- if (gzsetparams(gzfp, dir_data->compression_level,
+ if (gzsetparams(gzfp, wwmethod->compression_level,
Z_DEFAULT_STRATEGY) != Z_OK)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
gzclose(gzfp);
return NULL;
}
}
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
size_t ctx_out;
size_t header_size;
@@ -169,7 +181,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
if (LZ4F_isError(ctx_out))
{
- dir_data->lasterrstring = LZ4F_getErrorName(ctx_out);
+ wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out);
close(fd);
return NULL;
}
@@ -179,13 +191,13 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
/* assign the compression level, default is 0 */
memset(&prefs, 0, sizeof(prefs));
- prefs.compressionLevel = dir_data->compression_level;
+ prefs.compressionLevel = wwmethod->compression_level;
/* add the header */
header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
if (LZ4F_isError(header_size))
{
- dir_data->lasterrstring = LZ4F_getErrorName(header_size);
+ wwmethod->lasterrstring = LZ4F_getErrorName(header_size);
(void) LZ4F_freeCompressionContext(ctx);
pg_free(lz4buf);
close(fd);
@@ -196,7 +208,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (write(fd, lz4buf, header_size) != header_size)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
(void) LZ4F_freeCompressionContext(ctx);
pg_free(lz4buf);
close(fd);
@@ -206,7 +218,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
#endif
/* Do pre-padding on non-compressed files */
- if (pad_to_size && dir_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (pad_to_size && wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
PGAlignedXLogBlock zerobuf;
int bytes;
@@ -218,7 +230,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
close(fd);
return NULL;
}
@@ -226,7 +238,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (lseek(fd, 0, SEEK_SET) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
close(fd);
return NULL;
}
@@ -238,19 +250,19 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
* important when using synchronous mode, where the file is modified and
* fsynced in-place, without a directory fsync.
*/
- if (dir_data->sync)
+ if (wwmethod->sync)
{
if (fsync_fname(tmppath, false) != 0 ||
fsync_parent_path(tmppath) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
gzclose(gzfp);
else
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
(void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
(void) LZ4F_freeCompressionContext(ctx);
@@ -266,11 +278,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
f = pg_malloc0(sizeof(DirectoryMethodFile));
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
f->gzfp = gzfp;
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
f->ctx = ctx;
f->lz4buf = lz4buf;
@@ -278,6 +290,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
}
#endif
+ f->base.wwmethod = wwmethod;
f->base.currpos = 0;
f->base.pathname = pg_strdup(pathname);
f->fd = fd;
@@ -295,23 +308,23 @@ dir_write(Walfile *f, const void *buf, size_t count)
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
Assert(f != NULL);
- dir_clear_error();
+ clear_error(f->wwmethod);
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
errno = 0;
r = (ssize_t) gzwrite(df->gzfp, buf, count);
if (r != count)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
}
}
else
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
size_t chunk;
size_t remaining;
@@ -335,7 +348,7 @@ dir_write(Walfile *f, const void *buf, size_t count)
if (LZ4F_isError(compressed))
{
- dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
return -1;
}
@@ -343,7 +356,7 @@ dir_write(Walfile *f, const void *buf, size_t count)
if (write(df->fd, df->lz4buf, compressed) != compressed)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
@@ -361,7 +374,7 @@ dir_write(Walfile *f, const void *buf, size_t count)
if (r != count)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
}
}
if (r > 0)
@@ -374,14 +387,15 @@ dir_close(Walfile *f, WalCloseMethod method)
{
int r;
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) f->wwmethod;
char tmppath[MAXPGPATH];
char tmppath2[MAXPGPATH];
Assert(f != NULL);
- dir_clear_error();
+ clear_error(f->wwmethod);
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
errno = 0; /* in case gzclose() doesn't set it */
r = gzclose(df->gzfp);
@@ -389,7 +403,7 @@ dir_close(Walfile *f, WalCloseMethod method)
else
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
size_t compressed;
@@ -399,7 +413,7 @@ dir_close(Walfile *f, WalCloseMethod method)
if (LZ4F_isError(compressed))
{
- dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
return -1;
}
@@ -407,7 +421,7 @@ dir_close(Walfile *f, WalCloseMethod method)
if (write(df->fd, df->lz4buf, compressed) != compressed)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
@@ -429,17 +443,18 @@ dir_close(Walfile *f, WalCloseMethod method)
* If we have a temp prefix, normal operation is to rename the
* file.
*/
- filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
+ filename = dir_get_file_name(f->wwmethod, df->base.pathname,
+ df->temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
/* permanent name, so no need for the prefix */
- filename2 = dir_get_file_name(df->base.pathname, NULL);
+ filename2 = dir_get_file_name(f->wwmethod, df->base.pathname, NULL);
snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
dir_data->basedir, filename2);
pg_free(filename2);
- if (dir_data->sync)
+ if (f->wwmethod->sync)
r = durable_rename(tmppath, tmppath2);
else
{
@@ -456,7 +471,8 @@ dir_close(Walfile *f, WalCloseMethod method)
char *filename;
/* Unlink the file once it's closed */
- filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
+ filename = dir_get_file_name(f->wwmethod, df->base.pathname,
+ df->temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
@@ -469,7 +485,7 @@ dir_close(Walfile *f, WalCloseMethod method)
* CLOSE_NO_RENAME. In this case, fsync the file and containing
* directory if sync mode is requested.
*/
- if (dir_data->sync)
+ if (f->wwmethod->sync)
{
r = fsync_fname(df->fullpath, false);
if (r == 0)
@@ -479,7 +495,7 @@ dir_close(Walfile *f, WalCloseMethod method)
}
if (r != 0)
- dir_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
#ifdef USE_LZ4
pg_free(df->lz4buf);
@@ -501,23 +517,23 @@ dir_sync(Walfile *f)
int r;
Assert(f != NULL);
- dir_clear_error();
+ clear_error(f->wwmethod);
- if (!dir_data->sync)
+ if (!f->wwmethod->sync)
return 0;
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
{
- dir_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
}
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
size_t compressed;
@@ -526,7 +542,7 @@ dir_sync(Walfile *f)
compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
if (LZ4F_isError(compressed))
{
- dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
return -1;
}
@@ -534,7 +550,7 @@ dir_sync(Walfile *f)
if (write(df->fd, df->lz4buf, compressed) != compressed)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
}
@@ -542,13 +558,14 @@ dir_sync(Walfile *f)
r = fsync(((DirectoryMethodFile *) f)->fd);
if (r < 0)
- dir_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return r;
}
static ssize_t
-dir_get_file_size(const char *pathname)
+dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
struct stat statbuf;
char tmppath[MAXPGPATH];
@@ -557,26 +574,21 @@ dir_get_file_size(const char *pathname)
if (stat(tmppath, &statbuf) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return -1;
}
return statbuf.st_size;
}
-static pg_compress_algorithm
-dir_compression_algorithm(void)
-{
- return dir_data->compression_algorithm;
-}
-
static bool
-dir_existsfile(const char *pathname)
+dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
char tmppath[MAXPGPATH];
int fd;
- dir_clear_error();
+ clear_error(wwmethod);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, pathname);
@@ -589,60 +601,54 @@ dir_existsfile(const char *pathname)
}
static bool
-dir_finish(void)
+dir_finish(WalWriteMethod *wwmethod)
{
- dir_clear_error();
+ clear_error(wwmethod);
- if (dir_data->sync)
+ if (wwmethod->sync)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
+
/*
* Files are fsynced when they are closed, but we need to fsync the
* directory entry here as well.
*/
if (fsync_fname(dir_data->basedir, true) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
}
return true;
}
+static void
+dir_free(WalWriteMethod *wwmethod)
+{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
+
+ pg_free(dir_data->basedir);
+ pg_free(wwmethod);
+}
+
WalWriteMethod *
CreateWalDirectoryMethod(const char *basedir,
pg_compress_algorithm compression_algorithm,
int compression_level, bool sync)
{
- WalWriteMethod *method;
-
- method = pg_malloc0(sizeof(WalWriteMethod));
- method->open_for_write = dir_open_for_write;
- method->write = dir_write;
- method->get_file_size = dir_get_file_size;
- method->get_file_name = dir_get_file_name;
- method->compression_algorithm = dir_compression_algorithm;
- method->close = dir_close;
- method->sync = dir_sync;
- method->existsfile = dir_existsfile;
- method->finish = dir_finish;
- method->getlasterror = dir_getlasterror;
-
- dir_data = pg_malloc0(sizeof(DirectoryMethodData));
- dir_data->compression_algorithm = compression_algorithm;
- dir_data->compression_level = compression_level;
- dir_data->basedir = pg_strdup(basedir);
- dir_data->sync = sync;
-
- return method;
-}
-
-void
-FreeWalDirectoryMethod(void)
-{
- pg_free(dir_data->basedir);
- pg_free(dir_data);
- dir_data = NULL;
+ DirectoryMethodData *wwmethod;
+
+ wwmethod = pg_malloc0(sizeof(DirectoryMethodData));
+ *((const WalWriteMethodOps **) &wwmethod->base.ops) =
+ &WalDirectoryMethodOps;
+ wwmethod->base.compression_algorithm = compression_algorithm;
+ wwmethod->base.compression_level = compression_level;
+ wwmethod->base.sync = sync;
+ clear_error(&wwmethod->base);
+ wwmethod->basedir = pg_strdup(basedir);
+
+ return &wwmethod->base;
}
@@ -651,6 +657,33 @@ FreeWalDirectoryMethod(void)
*-------------------------------------------------------------------------
*/
+static Walfile *tar_open_for_write(WalWriteMethod *wwmethod,
+ const char *pathname,
+ const char *temp_suffix,
+ size_t pad_to_size);
+static int tar_close(Walfile *f, WalCloseMethod method);
+static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname);
+static ssize_t tar_get_file_size(WalWriteMethod *wwmethod,
+ const char *pathname);
+static char *tar_get_file_name(WalWriteMethod *wwmethod,
+ const char *pathname, const char *temp_suffix);
+static ssize_t tar_write(Walfile *f, const void *buf, size_t count);
+static int tar_sync(Walfile *f);
+static bool tar_finish(WalWriteMethod *wwmethod);
+static void tar_free(WalWriteMethod *wwmethod);
+
+const WalWriteMethodOps WalTarMethodOps = {
+ .open_for_write = tar_open_for_write,
+ .close = tar_close,
+ .existsfile = tar_existsfile,
+ .get_file_size = tar_get_file_size,
+ .get_file_name = tar_get_file_name,
+ .write = tar_write,
+ .sync = tar_sync,
+ .finish = tar_finish,
+ .free = tar_free
+};
+
typedef struct TarMethodFile
{
Walfile base;
@@ -661,37 +694,20 @@ typedef struct TarMethodFile
typedef struct TarMethodData
{
+ WalWriteMethod base;
char *tarfilename;
int fd;
- pg_compress_algorithm compression_algorithm;
- int compression_level;
- bool sync;
TarMethodFile *currentfile;
- const char *lasterrstring; /* if set, takes precedence over lasterrno */
- int lasterrno;
#ifdef HAVE_LIBZ
z_streamp zp;
void *zlibOut;
#endif
} TarMethodData;
-static TarMethodData *tar_data = NULL;
-
-#define tar_clear_error() \
- (tar_data->lasterrstring = NULL, tar_data->lasterrno = 0)
-#define tar_set_error(msg) \
- (tar_data->lasterrstring = _(msg))
-
-static const char *
-tar_getlasterror(void)
-{
- if (tar_data->lasterrstring)
- return tar_data->lasterrstring;
- return strerror(tar_data->lasterrno);
-}
#ifdef HAVE_LIBZ
static bool
-tar_write_compressed_data(void *buf, size_t count, bool flush)
+tar_write_compressed_data(TarMethodData *tar_data, void *buf, size_t count,
+ bool flush)
{
tar_data->zp->next_in = buf;
tar_data->zp->avail_in = count;
@@ -703,7 +719,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
if (r == Z_STREAM_ERROR)
{
- tar_set_error("could not compress data");
+ tar_data->base.lasterrstring = "could not compress data";
return false;
}
@@ -715,7 +731,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
if (write(tar_data->fd, tar_data->zlibOut, len) != len)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ tar_data->base.lasterrno = errno ? errno : ENOSPC;
return false;
}
@@ -732,7 +748,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
/* Reset the stream for writing */
if (deflateReset(tar_data->zp) != Z_OK)
{
- tar_set_error("could not reset compression stream");
+ tar_data->base.lasterrstring = "could not reset compression stream";
return false;
}
}
@@ -744,29 +760,31 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
static ssize_t
tar_write(Walfile *f, const void *buf, size_t count)
{
+ TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
ssize_t r;
Assert(f != NULL);
- tar_clear_error();
+ clear_error(f->wwmethod);
/* Tarfile will always be positioned at the end */
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
r = write(tar_data->fd, buf, count);
if (r != count)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
f->currpos += r;
return r;
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
- if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
+ if (!tar_write_compressed_data(tar_data, unconstify(void *, buf),
+ count, false))
return -1;
f->currpos += count;
return count;
@@ -775,7 +793,7 @@ tar_write(Walfile *f, const void *buf, size_t count)
else
{
/* Can't happen - compression enabled with no method set */
- tar_data->lasterrno = ENOSYS;
+ f->wwmethod->lasterrno = ENOSYS;
return -1;
}
}
@@ -801,7 +819,8 @@ tar_write_padding_data(TarMethodFile *f, size_t bytes)
}
static char *
-tar_get_file_name(const char *pathname, const char *temp_suffix)
+tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname,
+ const char *temp_suffix)
{
char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
@@ -812,11 +831,13 @@ tar_get_file_name(const char *pathname, const char *temp_suffix)
}
static Walfile *
-tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
+ const char *temp_suffix, size_t pad_to_size)
{
+ TarMethodData *tar_data = (TarMethodData *) wwmethod;
char *tmppath;
- tar_clear_error();
+ clear_error(wwmethod);
if (tar_data->fd < 0)
{
@@ -828,12 +849,12 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
pg_file_create_mode);
if (tar_data->fd < 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return NULL;
}
#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
tar_data->zp->zalloc = Z_NULL;
@@ -847,12 +868,13 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
* default 15 for the windowBits parameter makes the output be
* gzip instead of zlib.
*/
- if (deflateInit2(tar_data->zp, tar_data->compression_level,
+ if (deflateInit2(tar_data->zp, wwmethod->compression_level,
Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
{
pg_free(tar_data->zp);
tar_data->zp = NULL;
- tar_set_error("could not initialize compression library");
+ wwmethod->lasterrstring =
+ "could not initialize compression library";
return NULL;
}
}
@@ -863,13 +885,15 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (tar_data->currentfile != NULL)
{
- tar_set_error("implementation error: tar files can't have more than one open file");
+ wwmethod->lasterrstring =
+ "implementation error: tar files can't have more than one open file";
return NULL;
}
tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
+ tar_data->currentfile->base.wwmethod = wwmethod;
- tmppath = tar_get_file_name(pathname, temp_suffix);
+ tmppath = tar_get_file_name(wwmethod, pathname, temp_suffix);
/* Create a header with size set to 0 - we will fill out the size on close */
if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
@@ -877,23 +901,24 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
pg_free(tar_data->currentfile);
pg_free(tmppath);
tar_data->currentfile = NULL;
- tar_set_error("could not create tar header");
+ wwmethod->lasterrstring = "could not create tar header";
return NULL;
}
pg_free(tmppath);
#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Flush existing data */
- if (!tar_write_compressed_data(NULL, 0, true))
+ if (!tar_write_compressed_data(tar_data, NULL, 0, true))
return NULL;
/* Turn off compression for header */
if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ wwmethod->lasterrstring =
+ "could not change compression parameters";
return NULL;
}
}
@@ -902,39 +927,39 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
if (tar_data->currentfile->ofs_start == -1)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
pg_free(tar_data->currentfile);
tar_data->currentfile = NULL;
return NULL;
}
tar_data->currentfile->base.currpos = 0;
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
if (write(tar_data->fd, tar_data->currentfile->header,
TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
pg_free(tar_data->currentfile);
tar_data->currentfile = NULL;
return NULL;
}
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Write header through the zlib APIs but with no compression */
- if (!tar_write_compressed_data(tar_data->currentfile->header,
+ if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
TAR_BLOCK_SIZE, true))
return NULL;
/* Re-enable compression for the rest of the file */
- if (deflateParams(tar_data->zp, tar_data->compression_level,
+ if (deflateParams(tar_data->zp, wwmethod->compression_level,
Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ wwmethod->lasterrstring = "could not change compression parameters";
return NULL;
}
}
@@ -954,7 +979,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (pad_to_size)
{
tar_data->currentfile->pad_to_size = pad_to_size;
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
/* Uncompressed, so pad now */
if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
@@ -964,7 +989,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE,
SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return NULL;
}
@@ -976,42 +1001,37 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
}
static ssize_t
-tar_get_file_size(const char *pathname)
+tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
{
- tar_clear_error();
+ clear_error(wwmethod);
/* Currently not used, so not supported */
- tar_data->lasterrno = ENOSYS;
+ wwmethod->lasterrno = ENOSYS;
return -1;
}
-static pg_compress_algorithm
-tar_compression_algorithm(void)
-{
- return tar_data->compression_algorithm;
-}
-
static int
tar_sync(Walfile *f)
{
+ TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
int r;
Assert(f != NULL);
- tar_clear_error();
+ clear_error(f->wwmethod);
- if (!tar_data->sync)
+ if (!f->wwmethod->sync)
return 0;
/*
* Always sync the whole tarfile, because that's all we can do. This makes
* no sense on compressed files, so just ignore those.
*/
- if (tar_data->compression_algorithm != PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
return 0;
r = fsync(tar_data->fd);
if (r < 0)
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return r;
}
@@ -1020,16 +1040,17 @@ tar_close(Walfile *f, WalCloseMethod method)
{
ssize_t filesize;
int padding;
+ TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
TarMethodFile *tf = (TarMethodFile *) f;
Assert(f != NULL);
- tar_clear_error();
+ clear_error(f->wwmethod);
if (method == CLOSE_UNLINK)
{
- if (tar_data->compression_algorithm != PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
{
- tar_set_error("unlink not supported with compression");
+ f->wwmethod->lasterrstring = "unlink not supported with compression";
return -1;
}
@@ -1040,7 +1061,7 @@ tar_close(Walfile *f, WalCloseMethod method)
*/
if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
{
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
@@ -1058,7 +1079,7 @@ tar_close(Walfile *f, WalCloseMethod method)
*/
if (tf->pad_to_size)
{
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/*
* A compressed tarfile is padded on close since we cannot know
@@ -1098,10 +1119,10 @@ tar_close(Walfile *f, WalCloseMethod method)
#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Flush the current buffer */
- if (!tar_write_compressed_data(NULL, 0, true))
+ if (!tar_write_compressed_data(tar_data, NULL, 0, true))
return -1;
}
#endif
@@ -1124,39 +1145,39 @@ tar_close(Walfile *f, WalCloseMethod method)
print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
{
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Turn off compression */
if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ f->wwmethod->lasterrstring = "could not change compression parameters";
return -1;
}
/* Overwrite the header, assuming the size will be the same */
- if (!tar_write_compressed_data(tar_data->currentfile->header,
+ if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
TAR_BLOCK_SIZE, true))
return -1;
/* Turn compression back on */
- if (deflateParams(tar_data->zp, tar_data->compression_level,
+ if (deflateParams(tar_data->zp, f->wwmethod->compression_level,
Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ f->wwmethod->lasterrstring = "could not change compression parameters";
return -1;
}
}
@@ -1170,7 +1191,7 @@ tar_close(Walfile *f, WalCloseMethod method)
/* Move file pointer back down to end, so we can write the next file */
if (lseek(tar_data->fd, 0, SEEK_END) < 0)
{
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
@@ -1179,7 +1200,7 @@ tar_close(Walfile *f, WalCloseMethod method)
{
/* XXX this seems pretty bogus; why is only this case fatal? */
pg_fatal("could not fsync file \"%s\": %s",
- tf->base.pathname, tar_getlasterror());
+ tf->base.pathname, GetLastWalMethodError(f->wwmethod));
}
/* Clean up and done */
@@ -1191,19 +1212,20 @@ tar_close(Walfile *f, WalCloseMethod method)
}
static bool
-tar_existsfile(const char *pathname)
+tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
{
- tar_clear_error();
+ clear_error(wwmethod);
/* We only deal with new tarfiles, so nothing externally created exists */
return false;
}
static bool
-tar_finish(void)
+tar_finish(WalWriteMethod *wwmethod)
{
+ TarMethodData *tar_data = (TarMethodData *) wwmethod;
char zerobuf[1024] = {0};
- tar_clear_error();
+ clear_error(wwmethod);
if (tar_data->currentfile)
{
@@ -1212,20 +1234,21 @@ tar_finish(void)
}
/* A tarfile always ends with two empty blocks */
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
return false;
}
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
- if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
+ if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf),
+ false))
return false;
/* Also flush all data to make sure the gzip stream is finished */
@@ -1239,7 +1262,7 @@ tar_finish(void)
if (r == Z_STREAM_ERROR)
{
- tar_set_error("could not compress data");
+ wwmethod->lasterrstring = "could not compress data";
return false;
}
if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
@@ -1253,7 +1276,7 @@ tar_finish(void)
* If write didn't set errno, assume problem is no disk
* space.
*/
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
return false;
}
}
@@ -1263,7 +1286,7 @@ tar_finish(void)
if (deflateEnd(tar_data->zp) != Z_OK)
{
- tar_set_error("could not close compression stream");
+ wwmethod->lasterrstring = "could not close compression stream";
return false;
}
}
@@ -1275,29 +1298,29 @@ tar_finish(void)
}
/* sync the empty blocks as well, since they're after the last file */
- if (tar_data->sync)
+ if (wwmethod->sync)
{
if (fsync(tar_data->fd) != 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
}
if (close(tar_data->fd) != 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
tar_data->fd = -1;
- if (tar_data->sync)
+ if (wwmethod->sync)
{
if (fsync_fname(tar_data->tarfilename, false) != 0 ||
fsync_parent_path(tar_data->tarfilename) != 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
}
@@ -1305,6 +1328,19 @@ tar_finish(void)
return true;
}
+static void
+tar_free(WalWriteMethod *wwmethod)
+{
+ TarMethodData *tar_data = (TarMethodData *) wwmethod;
+
+ pg_free(tar_data->tarfilename);
+#ifdef HAVE_LIBZ
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
+ pg_free(tar_data->zlibOut);
+#endif
+ pg_free(wwmethod);
+}
+
/*
* The argument compression_algorithm is currently ignored. It is in place for
* symmetry with CreateWalDirectoryMethod which uses it for distinguishing
@@ -1316,45 +1352,33 @@ CreateWalTarMethod(const char *tarbase,
pg_compress_algorithm compression_algorithm,
int compression_level, bool sync)
{
- WalWriteMethod *method;
+ TarMethodData *wwmethod;
const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ?
".tar.gz" : ".tar";
- method = pg_malloc0(sizeof(WalWriteMethod));
- method->open_for_write = tar_open_for_write;
- method->write = tar_write;
- method->get_file_size = tar_get_file_size;
- method->get_file_name = tar_get_file_name;
- method->compression_algorithm = tar_compression_algorithm;
- method->close = tar_close;
- method->sync = tar_sync;
- method->existsfile = tar_existsfile;
- method->finish = tar_finish;
- method->getlasterror = tar_getlasterror;
-
- tar_data = pg_malloc0(sizeof(TarMethodData));
- tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
- sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
- tar_data->fd = -1;
- tar_data->compression_algorithm = compression_algorithm;
- tar_data->compression_level = compression_level;
- tar_data->sync = sync;
+ wwmethod = pg_malloc0(sizeof(TarMethodData));
+ *((const WalWriteMethodOps **) &wwmethod->base.ops) =
+ &WalTarMethodOps;
+ wwmethod->base.compression_algorithm = compression_algorithm;
+ wwmethod->base.compression_level = compression_level;
+ wwmethod->base.sync = sync;
+ clear_error(&wwmethod->base);
+
+ wwmethod->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
+ sprintf(wwmethod->tarfilename, "%s%s", tarbase, suffix);
+ wwmethod->fd = -1;
#ifdef HAVE_LIBZ
if (compression_algorithm == PG_COMPRESSION_GZIP)
- tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
+ wwmethod->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
#endif
- return method;
+ return &wwmethod->base;
}
-void
-FreeWalTarMethod(void)
+const char *
+GetLastWalMethodError(WalWriteMethod *wwmethod)
{
- pg_free(tar_data->tarfilename);
-#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
- pg_free(tar_data->zlibOut);
-#endif
- pg_free(tar_data);
- tar_data = NULL;
+ if (wwmethod->lasterrstring)
+ return wwmethod->lasterrstring;
+ return strerror(wwmethod->lasterrno);
}