aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/receivelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r--src/bin/pg_basebackup/receivelog.c316
1 files changed, 108 insertions, 208 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index b0fa916b44b..fcd02694733 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -30,7 +30,7 @@
/* fd and filename for currently open WAL file */
-static int walfile = -1;
+static Walfile *walfile = NULL;
static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
@@ -56,29 +56,23 @@ static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
static bool
-mark_file_as_archived(const char *basedir, const char *fname, bool do_sync)
+mark_file_as_archived(StreamCtl *stream, const char *fname)
{
- int fd;
+ Walfile *f;
static char tmppath[MAXPGPATH];
- snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
- basedir, fname);
+ snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
+ fname);
- fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
- if (fd < 0)
+ f = stream->walmethod->open_for_write(tmppath, NULL, 0);
+ if (f == NULL)
{
fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
- progname, tmppath, strerror(errno));
+ progname, tmppath, stream->walmethod->getlasterror());
return false;
}
- close(fd);
-
- if (do_sync && fsync_fname(tmppath, false, progname) != 0)
- return false;
-
- if (do_sync && fsync_parent_path(tmppath, progname) != 0)
- return false;
+ stream->walmethod->close(f, CLOSE_NORMAL);
return true;
}
@@ -95,121 +89,82 @@ mark_file_as_archived(const char *basedir, const char *fname, bool do_sync)
static bool
open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
{
- int f;
+ Walfile *f;
char fn[MAXPGPATH];
- struct stat statbuf;
- char *zerobuf;
- int bytes;
+ ssize_t size;
XLogSegNo segno;
XLByteToSeg(startpoint, segno);
XLogFileName(current_walfile_name, stream->timeline, segno);
- snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
+ snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
stream->partial_suffix ? stream->partial_suffix : "");
- f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
- if (f == -1)
- {
- fprintf(stderr,
- _("%s: could not open transaction log file \"%s\": %s\n"),
- progname, fn, strerror(errno));
- return false;
- }
/*
- * Verify that the file is either empty (just created), or a complete
- * XLogSegSize segment. Anything in between indicates a corrupt file.
+ * When streaming to files, if an existing file exists we verify that it's
+ * either empty (just created), or a complete XLogSegSize segment (in
+ * which case it has been created and padded). Anything else indicates a
+ * corrupt file.
+ *
+ * When streaming to tar, no file with this name will exist before, so we
+ * never have to verify a size.
*/
- if (fstat(f, &statbuf) != 0)
+ if (stream->walmethod->existsfile(fn))
{
- fprintf(stderr,
- _("%s: could not stat transaction log file \"%s\": %s\n"),
- progname, fn, strerror(errno));
- close(f);
- return false;
- }
- if (statbuf.st_size == XLogSegSize)
- {
- /*
- * fsync, in case of a previous crash between padding and fsyncing the
- * file.
- */
- if (stream->do_sync)
+ size = stream->walmethod->get_file_size(fn);
+ if (size < 0)
{
- if (fsync_fname(fn, false, progname) != 0 ||
- fsync_parent_path(fn, progname) != 0)
+ fprintf(stderr,
+ _("%s: could not get size of transaction log file \"%s\": %s\n"),
+ progname, fn, stream->walmethod->getlasterror());
+ return false;
+ }
+ if (size == XLogSegSize)
+ {
+ /* Already padded file. Open it for use */
+ f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
+ if (f == NULL)
{
- /* error already printed */
- close(f);
+ fprintf(stderr,
+ _("%s: could not open existing transaction log file \"%s\": %s\n"),
+ progname, fn, stream->walmethod->getlasterror());
return false;
}
- }
- /* File is open and ready to use */
- walfile = f;
- return true;
- }
- if (statbuf.st_size != 0)
- {
- fprintf(stderr,
- _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
- progname, fn, (int) statbuf.st_size, XLogSegSize);
- close(f);
- return false;
- }
+ /* fsync file in case of a previous crash */
+ if (!stream->walmethod->fsync(f))
+ {
+ stream->walmethod->close(f, CLOSE_UNLINK);
+ return false;
+ }
- /*
- * New, empty, file. So pad it to 16Mb with zeroes. If we fail partway
- * through padding, we should attempt to unlink the file on failure, so as
- * not to leave behind a partially-filled file.
- */
- zerobuf = pg_malloc0(XLOG_BLCKSZ);
- for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
- {
- errno = 0;
- if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+ walfile = f;
+ return true;
+ }
+ if (size != 0)
{
/* if write didn't set errno, assume problem is no disk space */
if (errno == 0)
errno = ENOSPC;
fprintf(stderr,
- _("%s: could not pad transaction log file \"%s\": %s\n"),
- progname, fn, strerror(errno));
- free(zerobuf);
- close(f);
- unlink(fn);
+ _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
+ progname, fn, (int) size, XLogSegSize);
return false;
}
+ /* File existed and was empty, so fall through and open */
}
- free(zerobuf);
- /*
- * fsync WAL file and containing directory, to ensure the file is
- * persistently created and zeroed. That's particularly important when
- * using synchronous mode, where the file is modified and fsynced
- * in-place, without a directory fsync.
- */
- if (stream->do_sync)
- {
- if (fsync_fname(fn, false, progname) != 0 ||
- fsync_parent_path(fn, progname) != 0)
- {
- /* error already printed */
- close(f);
- return false;
- }
- }
+ /* No file existed, so create one */
- if (lseek(f, SEEK_SET, 0) != 0)
+ f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, XLogSegSize);
+ if (f == NULL)
{
fprintf(stderr,
- _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
- progname, fn, strerror(errno));
- close(f);
+ _("%s: could not open transaction log file \"%s\": %s\n"),
+ progname, fn, stream->walmethod->getlasterror());
return false;
}
- /* File is open and ready to use */
walfile = f;
return true;
}
@@ -223,59 +178,46 @@ static bool
close_walfile(StreamCtl *stream, XLogRecPtr pos)
{
off_t currpos;
+ int r;
- if (walfile == -1)
+ if (walfile == NULL)
return true;
- currpos = lseek(walfile, 0, SEEK_CUR);
+ currpos = stream->walmethod->get_current_pos(walfile);
if (currpos == -1)
{
fprintf(stderr,
_("%s: could not determine seek position in file \"%s\": %s\n"),
- progname, current_walfile_name, strerror(errno));
- close(walfile);
- walfile = -1;
+ progname, current_walfile_name, stream->walmethod->getlasterror());
+ stream->walmethod->close(walfile, CLOSE_UNLINK);
+ walfile = NULL;
+
return false;
}
- if (stream->do_sync && fsync(walfile) != 0)
+ if (stream->partial_suffix)
{
- fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
- progname, current_walfile_name, strerror(errno));
- close(walfile);
- walfile = -1;
- return false;
+ if (currpos == XLOG_SEG_SIZE)
+ r = stream->walmethod->close(walfile, CLOSE_NORMAL);
+ else
+ {
+ fprintf(stderr,
+ _("%s: not renaming \"%s%s\", segment is not complete\n"),
+ progname, current_walfile_name, stream->partial_suffix);
+ r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
+ }
}
+ else
+ r = stream->walmethod->close(walfile, CLOSE_NORMAL);
- if (close(walfile) != 0)
+ walfile = NULL;
+
+ if (r != 0)
{
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
- progname, current_walfile_name, strerror(errno));
- walfile = -1;
+ progname, current_walfile_name, stream->walmethod->getlasterror());
return false;
}
- walfile = -1;
-
- /*
- * If we finished writing a .partial file, rename it into place.
- */
- if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
- {
- char oldfn[MAXPGPATH];
- char newfn[MAXPGPATH];
-
- snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
- snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
- if (durable_rename(oldfn, newfn, progname) != 0)
- {
- /* durable_rename produced a log entry */
- return false;
- }
- }
- else if (stream->partial_suffix)
- fprintf(stderr,
- _("%s: not renaming \"%s%s\", segment is not complete\n"),
- progname, current_walfile_name, stream->partial_suffix);
/*
* Mark file as archived if requested by the caller - pg_basebackup needs
@@ -286,8 +228,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
if (currpos == XLOG_SEG_SIZE && stream->mark_done)
{
/* writes error message if failed */
- if (!mark_file_as_archived(stream->basedir, current_walfile_name,
- stream->do_sync))
+ if (!mark_file_as_archived(stream, current_walfile_name))
return false;
}
@@ -302,9 +243,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
static bool
existsTimeLineHistoryFile(StreamCtl *stream)
{
- char path[MAXPGPATH];
char histfname[MAXFNAMELEN];
- int fd;
/*
* Timeline 1 never has a history file. We treat that as if it existed,
@@ -315,31 +254,15 @@ existsTimeLineHistoryFile(StreamCtl *stream)
TLHistoryFileName(histfname, stream->timeline);
- snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
-
- fd = open(path, O_RDONLY | PG_BINARY, 0);
- if (fd < 0)
- {
- if (errno != ENOENT)
- fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
- progname, path, strerror(errno));
- return false;
- }
- else
- {
- close(fd);
- return true;
- }
+ return stream->walmethod->existsfile(histfname);
}
static bool
writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
{
int size = strlen(content);
- char path[MAXPGPATH];
- char tmppath[MAXPGPATH];
char histfname[MAXFNAMELEN];
- int fd;
+ Walfile *f;
/*
* Check that the server's idea of how timeline history files should be
@@ -353,53 +276,31 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
return false;
}
- snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
-
- /*
- * Write into a temp file name.
- */
- snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
-
- unlink(tmppath);
-
- fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
- if (fd < 0)
+ f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
+ if (f == NULL)
{
fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
- progname, tmppath, strerror(errno));
+ progname, histfname, stream->walmethod->getlasterror());
return false;
}
- errno = 0;
- if ((int) write(fd, content, size) != size)
+ if ((int) stream->walmethod->write(f, content, size) != size)
{
- int save_errno = errno;
+ fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
+ progname, histfname, stream->walmethod->getlasterror());
/*
* If we fail to make the file, delete it to release disk space
*/
- close(fd);
- unlink(tmppath);
- errno = save_errno;
+ stream->walmethod->close(f, CLOSE_UNLINK);
- fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
- progname, tmppath, strerror(errno));
return false;
}
- if (close(fd) != 0)
+ if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
{
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
- progname, tmppath, strerror(errno));
- return false;
- }
-
- /*
- * Now move the completed history file into place with its final name.
- */
- if (durable_rename(tmppath, path, progname) < 0)
- {
- /* durable_rename produced a log entry */
+ progname, histfname, stream->walmethod->getlasterror());
return false;
}
@@ -407,8 +308,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
if (stream->mark_done)
{
/* writes error message if failed */
- if (!mark_file_as_archived(stream->basedir, histfname,
- stream->do_sync))
+ if (!mark_file_as_archived(stream, histfname))
return false;
}
@@ -618,7 +518,9 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
{
/*
* Fetch the timeline history file for this timeline, if we don't have
- * it already.
+ * it already. When streaming log to tar, this will always return
+ * false, as we are never streaming into an existing file and
+ * therefore there can be no pre-existing timeline history file.
*/
if (!existsTimeLineHistoryFile(stream))
{
@@ -777,10 +679,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
}
error:
- if (walfile != -1 && close(walfile) != 0)
+ if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NORMAL) != 0)
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
- progname, current_walfile_name, strerror(errno));
- walfile = -1;
+ progname, current_walfile_name, stream->walmethod->getlasterror());
+ walfile = NULL;
return false;
}
@@ -864,12 +766,12 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
* If synchronous option is true, issue sync command as soon as there
* are WAL data which has not been flushed yet.
*/
- if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
+ if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
{
- if (stream->do_sync && fsync(walfile) != 0)
+ if (stream->walmethod->fsync(walfile) != 0)
{
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
- progname, current_walfile_name, strerror(errno));
+ progname, current_walfile_name, stream->walmethod->getlasterror());
goto error;
}
lastFlushPosition = blockpos;
@@ -1100,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
if (replyRequested && still_sending)
{
if (reportFlushPosition && lastFlushPosition < blockpos &&
- walfile != -1)
+ walfile != NULL)
{
/*
* If a valid flush location needs to be reported, flush the
@@ -1109,10 +1011,10 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
* data has been successfully replicated or not, at the normal
* shutdown of the server.
*/
- if (stream->do_sync && fsync(walfile) != 0)
+ if (stream->walmethod->fsync(walfile) != 0)
{
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
- progname, current_walfile_name, strerror(errno));
+ progname, current_walfile_name, stream->walmethod->getlasterror());
return false;
}
lastFlushPosition = blockpos;
@@ -1170,7 +1072,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
* Verify that the initial location in the stream matches where we think
* we are.
*/
- if (walfile == -1)
+ if (walfile == NULL)
{
/* No file open yet */
if (xlogoff != 0)
@@ -1184,12 +1086,11 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
else
{
/* More data in existing segment */
- /* XXX: store seek value don't reseek all the time */
- if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+ if (stream->walmethod->get_current_pos(walfile) != xlogoff)
{
fprintf(stderr,
_("%s: got WAL data offset %08x, expected %08x\n"),
- progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+ progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
return false;
}
}
@@ -1210,7 +1111,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
else
bytes_to_write = bytes_left;
- if (walfile == -1)
+ if (walfile == NULL)
{
if (!open_walfile(stream, *blockpos))
{
@@ -1219,14 +1120,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
}
}
- if (write(walfile,
- copybuf + hdr_len + bytes_written,
- bytes_to_write) != bytes_to_write)
+ if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
+ bytes_to_write) != bytes_to_write)
{
fprintf(stderr,
_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
progname, bytes_to_write, current_walfile_name,
- strerror(errno));
+ stream->walmethod->getlasterror());
return false;
}