diff options
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r-- | src/bin/pg_basebackup/receivelog.c | 123 |
1 files changed, 114 insertions, 9 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 0ca30c425f3..dea944beb94 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -27,6 +27,7 @@ #include "receivelog.h" #include "streamutil.h" +#include <sys/stat.h> #include <sys/time.h> #include <sys/types.h> #include <unistd.h> @@ -41,24 +42,128 @@ const XLogRecPtr InvalidXLogRecPtr = {0, 0}; * Open a new WAL file in the specified directory. Store the name * (not including the full directory) in namebuf. Assumes there is * enough room in this buffer... + * + * The file will be padded to 16Mb with zeroes. */ static int open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf) { int f; char fn[MAXPGPATH]; + struct stat statbuf; + char *zerobuf; + int bytes; XLogFileName(namebuf, timeline, startpoint.xlogid, startpoint.xrecoff / XLOG_SEG_SIZE); - snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf); - f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666); + snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf); + f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); if (f == -1) + { fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"), - progname, namebuf, strerror(errno)); + progname, fn, strerror(errno)); + return -1; + } + + /* + * Verify that the file is either empty (just created), or a complete + * XLogSegSize segment. Anything in between indicates a corrupt file. + */ + if (fstat(f, &statbuf) != 0) + { + fprintf(stderr, _("%s: could not stat WAL segment %s: %s\n"), + progname, fn, strerror(errno)); + close(f); + return -1; + } + if (statbuf.st_size == XLogSegSize) + return f; /* File is open and ready to use */ + if (statbuf.st_size != 0) + { + fprintf(stderr, _("%s: WAL segment %s is %d bytes, should be 0 or %d\n"), + progname, fn, (int) statbuf.st_size, XLogSegSize); + close(f); + return -1; + } + + /* New, empty, file. So pad it to 16Mb with zeroes */ + zerobuf = xmalloc0(XLOG_BLCKSZ); + for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ) + { + if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + { + fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"), + progname, fn, strerror(errno)); + close(f); + unlink(fn); + return -1; + } + } + free(zerobuf); + + if (lseek(f, SEEK_SET, 0) != 0) + { + fprintf(stderr, _("%s: could not seek back to beginning of WAL segment %s: %s\n"), + progname, fn, strerror(errno)); + close(f); + return -1; + } return f; } +static bool +close_walfile(int walfile, char *basedir, char *walname) +{ + off_t currpos = lseek(walfile, 0, SEEK_CUR); + + if (currpos == -1) + { + fprintf(stderr, _("%s: could not get current position in file %s: %s\n"), + progname, walname, strerror(errno)); + return false; + } + + if (fsync(walfile) != 0) + { + fprintf(stderr, _("%s: could not fsync file %s: %s\n"), + progname, walname, strerror(errno)); + return false; + } + + if (close(walfile) != 0) + { + fprintf(stderr, _("%s: could not close file %s: %s\n"), + progname, walname, strerror(errno)); + return false; + } + + /* + * Rename the .partial file only if we've completed writing the + * whole segment. + */ + if (currpos == XLOG_SEG_SIZE) + { + char oldfn[MAXPGPATH]; + char newfn[MAXPGPATH]; + + snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname); + snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname); + if (rename(oldfn, newfn) != 0) + { + fprintf(stderr, _("%s: could not rename file %s: %s\n"), + progname, walname, strerror(errno)); + return false; + } + } + else + fprintf(stderr, _("%s: not renaming %s, segment is not complete.\n"), + progname, walname); + + return true; +} + + /* * Local version of GetCurrentTimestamp(), since we are not linked with * backend code. @@ -178,10 +283,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi if (stream_continue && stream_continue()) { if (walfile != -1) - { - fsync(walfile); - close(walfile); - } + /* Potential error message is written by close_walfile */ + return close_walfile(walfile, basedir, current_walfile_name); return true; } @@ -360,8 +463,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi /* Did we reach the end of a WAL segment? */ if (blockpos.xrecoff % XLOG_SEG_SIZE == 0) { - fsync(walfile); - close(walfile); + if (!close_walfile(walfile, basedir, current_walfile_name)) + /* Error message written in close_walfile() */ + return false; + walfile = -1; xlogoff = 0; |