diff options
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r-- | src/bin/pg_basebackup/receivelog.c | 316 |
1 files changed, 108 insertions, 208 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index b0fa916b44b..fcd02694733 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -30,7 +30,7 @@ /* fd and filename for currently open WAL file */ -static int walfile = -1; +static Walfile *walfile = NULL; static char current_walfile_name[MAXPGPATH] = ""; static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; @@ -56,29 +56,23 @@ static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); static bool -mark_file_as_archived(const char *basedir, const char *fname, bool do_sync) +mark_file_as_archived(StreamCtl *stream, const char *fname) { - int fd; + Walfile *f; static char tmppath[MAXPGPATH]; - snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done", - basedir, fname); + snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done", + fname); - fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); - if (fd < 0) + f = stream->walmethod->open_for_write(tmppath, NULL, 0); + if (f == NULL) { fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"), - progname, tmppath, strerror(errno)); + progname, tmppath, stream->walmethod->getlasterror()); return false; } - close(fd); - - if (do_sync && fsync_fname(tmppath, false, progname) != 0) - return false; - - if (do_sync && fsync_parent_path(tmppath, progname) != 0) - return false; + stream->walmethod->close(f, CLOSE_NORMAL); return true; } @@ -95,121 +89,82 @@ mark_file_as_archived(const char *basedir, const char *fname, bool do_sync) static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint) { - int f; + Walfile *f; char fn[MAXPGPATH]; - struct stat statbuf; - char *zerobuf; - int bytes; + ssize_t size; XLogSegNo segno; XLByteToSeg(startpoint, segno); XLogFileName(current_walfile_name, stream->timeline, segno); - snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name, + snprintf(fn, sizeof(fn), "%s%s", current_walfile_name, stream->partial_suffix ? stream->partial_suffix : ""); - f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); - if (f == -1) - { - fprintf(stderr, - _("%s: could not open transaction log file \"%s\": %s\n"), - progname, fn, strerror(errno)); - return false; - } /* - * Verify that the file is either empty (just created), or a complete - * XLogSegSize segment. Anything in between indicates a corrupt file. + * When streaming to files, if an existing file exists we verify that it's + * either empty (just created), or a complete XLogSegSize segment (in + * which case it has been created and padded). Anything else indicates a + * corrupt file. + * + * When streaming to tar, no file with this name will exist before, so we + * never have to verify a size. */ - if (fstat(f, &statbuf) != 0) + if (stream->walmethod->existsfile(fn)) { - fprintf(stderr, - _("%s: could not stat transaction log file \"%s\": %s\n"), - progname, fn, strerror(errno)); - close(f); - return false; - } - if (statbuf.st_size == XLogSegSize) - { - /* - * fsync, in case of a previous crash between padding and fsyncing the - * file. - */ - if (stream->do_sync) + size = stream->walmethod->get_file_size(fn); + if (size < 0) { - if (fsync_fname(fn, false, progname) != 0 || - fsync_parent_path(fn, progname) != 0) + fprintf(stderr, + _("%s: could not get size of transaction log file \"%s\": %s\n"), + progname, fn, stream->walmethod->getlasterror()); + return false; + } + if (size == XLogSegSize) + { + /* Already padded file. Open it for use */ + f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0); + if (f == NULL) { - /* error already printed */ - close(f); + fprintf(stderr, + _("%s: could not open existing transaction log file \"%s\": %s\n"), + progname, fn, stream->walmethod->getlasterror()); return false; } - } - /* File is open and ready to use */ - walfile = f; - return true; - } - if (statbuf.st_size != 0) - { - fprintf(stderr, - _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"), - progname, fn, (int) statbuf.st_size, XLogSegSize); - close(f); - return false; - } + /* fsync file in case of a previous crash */ + if (!stream->walmethod->fsync(f)) + { + stream->walmethod->close(f, CLOSE_UNLINK); + return false; + } - /* - * New, empty, file. So pad it to 16Mb with zeroes. If we fail partway - * through padding, we should attempt to unlink the file on failure, so as - * not to leave behind a partially-filled file. - */ - zerobuf = pg_malloc0(XLOG_BLCKSZ); - for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ) - { - errno = 0; - if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + walfile = f; + return true; + } + if (size != 0) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; fprintf(stderr, - _("%s: could not pad transaction log file \"%s\": %s\n"), - progname, fn, strerror(errno)); - free(zerobuf); - close(f); - unlink(fn); + _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"), + progname, fn, (int) size, XLogSegSize); return false; } + /* File existed and was empty, so fall through and open */ } - free(zerobuf); - /* - * fsync WAL file and containing directory, to ensure the file is - * persistently created and zeroed. That's particularly important when - * using synchronous mode, where the file is modified and fsynced - * in-place, without a directory fsync. - */ - if (stream->do_sync) - { - if (fsync_fname(fn, false, progname) != 0 || - fsync_parent_path(fn, progname) != 0) - { - /* error already printed */ - close(f); - return false; - } - } + /* No file existed, so create one */ - if (lseek(f, SEEK_SET, 0) != 0) + f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, XLogSegSize); + if (f == NULL) { fprintf(stderr, - _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"), - progname, fn, strerror(errno)); - close(f); + _("%s: could not open transaction log file \"%s\": %s\n"), + progname, fn, stream->walmethod->getlasterror()); return false; } - /* File is open and ready to use */ walfile = f; return true; } @@ -223,59 +178,46 @@ static bool close_walfile(StreamCtl *stream, XLogRecPtr pos) { off_t currpos; + int r; - if (walfile == -1) + if (walfile == NULL) return true; - currpos = lseek(walfile, 0, SEEK_CUR); + currpos = stream->walmethod->get_current_pos(walfile); if (currpos == -1) { fprintf(stderr, _("%s: could not determine seek position in file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); - close(walfile); - walfile = -1; + progname, current_walfile_name, stream->walmethod->getlasterror()); + stream->walmethod->close(walfile, CLOSE_UNLINK); + walfile = NULL; + return false; } - if (stream->do_sync && fsync(walfile) != 0) + if (stream->partial_suffix) { - fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); - close(walfile); - walfile = -1; - return false; + if (currpos == XLOG_SEG_SIZE) + r = stream->walmethod->close(walfile, CLOSE_NORMAL); + else + { + fprintf(stderr, + _("%s: not renaming \"%s%s\", segment is not complete\n"), + progname, current_walfile_name, stream->partial_suffix); + r = stream->walmethod->close(walfile, CLOSE_NO_RENAME); + } } + else + r = stream->walmethod->close(walfile, CLOSE_NORMAL); - if (close(walfile) != 0) + walfile = NULL; + + if (r != 0) { fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); - walfile = -1; + progname, current_walfile_name, stream->walmethod->getlasterror()); return false; } - walfile = -1; - - /* - * If we finished writing a .partial file, rename it into place. - */ - if (currpos == XLOG_SEG_SIZE && stream->partial_suffix) - { - char oldfn[MAXPGPATH]; - char newfn[MAXPGPATH]; - - snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix); - snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name); - if (durable_rename(oldfn, newfn, progname) != 0) - { - /* durable_rename produced a log entry */ - return false; - } - } - else if (stream->partial_suffix) - fprintf(stderr, - _("%s: not renaming \"%s%s\", segment is not complete\n"), - progname, current_walfile_name, stream->partial_suffix); /* * Mark file as archived if requested by the caller - pg_basebackup needs @@ -286,8 +228,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) if (currpos == XLOG_SEG_SIZE && stream->mark_done) { /* writes error message if failed */ - if (!mark_file_as_archived(stream->basedir, current_walfile_name, - stream->do_sync)) + if (!mark_file_as_archived(stream, current_walfile_name)) return false; } @@ -302,9 +243,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) static bool existsTimeLineHistoryFile(StreamCtl *stream) { - char path[MAXPGPATH]; char histfname[MAXFNAMELEN]; - int fd; /* * Timeline 1 never has a history file. We treat that as if it existed, @@ -315,31 +254,15 @@ existsTimeLineHistoryFile(StreamCtl *stream) TLHistoryFileName(histfname, stream->timeline); - snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname); - - fd = open(path, O_RDONLY | PG_BINARY, 0); - if (fd < 0) - { - if (errno != ENOENT) - fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"), - progname, path, strerror(errno)); - return false; - } - else - { - close(fd); - return true; - } + return stream->walmethod->existsfile(histfname); } static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) { int size = strlen(content); - char path[MAXPGPATH]; - char tmppath[MAXPGPATH]; char histfname[MAXFNAMELEN]; - int fd; + Walfile *f; /* * Check that the server's idea of how timeline history files should be @@ -353,53 +276,31 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) return false; } - snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname); - - /* - * Write into a temp file name. - */ - snprintf(tmppath, MAXPGPATH, "%s.tmp", path); - - unlink(tmppath); - - fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); - if (fd < 0) + f = stream->walmethod->open_for_write(histfname, ".tmp", 0); + if (f == NULL) { fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"), - progname, tmppath, strerror(errno)); + progname, histfname, stream->walmethod->getlasterror()); return false; } - errno = 0; - if ((int) write(fd, content, size) != size) + if ((int) stream->walmethod->write(f, content, size) != size) { - int save_errno = errno; + fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"), + progname, histfname, stream->walmethod->getlasterror()); /* * If we fail to make the file, delete it to release disk space */ - close(fd); - unlink(tmppath); - errno = save_errno; + stream->walmethod->close(f, CLOSE_UNLINK); - fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"), - progname, tmppath, strerror(errno)); return false; } - if (close(fd) != 0) + if (stream->walmethod->close(f, CLOSE_NORMAL) != 0) { fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, tmppath, strerror(errno)); - return false; - } - - /* - * Now move the completed history file into place with its final name. - */ - if (durable_rename(tmppath, path, progname) < 0) - { - /* durable_rename produced a log entry */ + progname, histfname, stream->walmethod->getlasterror()); return false; } @@ -407,8 +308,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) if (stream->mark_done) { /* writes error message if failed */ - if (!mark_file_as_archived(stream->basedir, histfname, - stream->do_sync)) + if (!mark_file_as_archived(stream, histfname)) return false; } @@ -618,7 +518,9 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) { /* * Fetch the timeline history file for this timeline, if we don't have - * it already. + * it already. When streaming log to tar, this will always return + * false, as we are never streaming into an existing file and + * therefore there can be no pre-existing timeline history file. */ if (!existsTimeLineHistoryFile(stream)) { @@ -777,10 +679,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) } error: - if (walfile != -1 && close(walfile) != 0) + if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NORMAL) != 0) fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); - walfile = -1; + progname, current_walfile_name, stream->walmethod->getlasterror()); + walfile = NULL; return false; } @@ -864,12 +766,12 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, * If synchronous option is true, issue sync command as soon as there * are WAL data which has not been flushed yet. */ - if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1) + if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL) { - if (stream->do_sync && fsync(walfile) != 0) + if (stream->walmethod->fsync(walfile) != 0) { fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); + progname, current_walfile_name, stream->walmethod->getlasterror()); goto error; } lastFlushPosition = blockpos; @@ -1100,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, if (replyRequested && still_sending) { if (reportFlushPosition && lastFlushPosition < blockpos && - walfile != -1) + walfile != NULL) { /* * If a valid flush location needs to be reported, flush the @@ -1109,10 +1011,10 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, * data has been successfully replicated or not, at the normal * shutdown of the server. */ - if (stream->do_sync && fsync(walfile) != 0) + if (stream->walmethod->fsync(walfile) != 0) { fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); + progname, current_walfile_name, stream->walmethod->getlasterror()); return false; } lastFlushPosition = blockpos; @@ -1170,7 +1072,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, * Verify that the initial location in the stream matches where we think * we are. */ - if (walfile == -1) + if (walfile == NULL) { /* No file open yet */ if (xlogoff != 0) @@ -1184,12 +1086,11 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, else { /* More data in existing segment */ - /* XXX: store seek value don't reseek all the time */ - if (lseek(walfile, 0, SEEK_CUR) != xlogoff) + if (stream->walmethod->get_current_pos(walfile) != xlogoff) { fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"), - progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); + progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile)); return false; } } @@ -1210,7 +1111,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, else bytes_to_write = bytes_left; - if (walfile == -1) + if (walfile == NULL) { if (!open_walfile(stream, *blockpos)) { @@ -1219,14 +1120,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, } } - if (write(walfile, - copybuf + hdr_len + bytes_written, - bytes_to_write) != bytes_to_write) + if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written, + bytes_to_write) != bytes_to_write) { fprintf(stderr, _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), progname, bytes_to_write, current_walfile_name, - strerror(errno)); + stream->walmethod->getlasterror()); return false; } |