aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/receivelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r--src/bin/pg_basebackup/receivelog.c123
1 files changed, 114 insertions, 9 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 0ca30c425f3..dea944beb94 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -27,6 +27,7 @@
#include "receivelog.h"
#include "streamutil.h"
+#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
@@ -41,24 +42,128 @@ const XLogRecPtr InvalidXLogRecPtr = {0, 0};
* Open a new WAL file in the specified directory. Store the name
* (not including the full directory) in namebuf. Assumes there is
* enough room in this buffer...
+ *
+ * The file will be padded to 16Mb with zeroes.
*/
static int
open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
{
int f;
char fn[MAXPGPATH];
+ struct stat statbuf;
+ char *zerobuf;
+ int bytes;
XLogFileName(namebuf, timeline, startpoint.xlogid,
startpoint.xrecoff / XLOG_SEG_SIZE);
- snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
- f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
+ snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf);
+ f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (f == -1)
+ {
fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
- progname, namebuf, strerror(errno));
+ progname, fn, strerror(errno));
+ return -1;
+ }
+
+ /*
+ * Verify that the file is either empty (just created), or a complete
+ * XLogSegSize segment. Anything in between indicates a corrupt file.
+ */
+ if (fstat(f, &statbuf) != 0)
+ {
+ fprintf(stderr, _("%s: could not stat WAL segment %s: %s\n"),
+ progname, fn, strerror(errno));
+ close(f);
+ return -1;
+ }
+ if (statbuf.st_size == XLogSegSize)
+ return f; /* File is open and ready to use */
+ if (statbuf.st_size != 0)
+ {
+ fprintf(stderr, _("%s: WAL segment %s is %d bytes, should be 0 or %d\n"),
+ progname, fn, (int) statbuf.st_size, XLogSegSize);
+ close(f);
+ return -1;
+ }
+
+ /* New, empty, file. So pad it to 16Mb with zeroes */
+ zerobuf = xmalloc0(XLOG_BLCKSZ);
+ for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
+ {
+ if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+ {
+ fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"),
+ progname, fn, strerror(errno));
+ close(f);
+ unlink(fn);
+ return -1;
+ }
+ }
+ free(zerobuf);
+
+ if (lseek(f, SEEK_SET, 0) != 0)
+ {
+ fprintf(stderr, _("%s: could not seek back to beginning of WAL segment %s: %s\n"),
+ progname, fn, strerror(errno));
+ close(f);
+ return -1;
+ }
return f;
}
+static bool
+close_walfile(int walfile, char *basedir, char *walname)
+{
+ off_t currpos = lseek(walfile, 0, SEEK_CUR);
+
+ if (currpos == -1)
+ {
+ fprintf(stderr, _("%s: could not get current position in file %s: %s\n"),
+ progname, walname, strerror(errno));
+ return false;
+ }
+
+ if (fsync(walfile) != 0)
+ {
+ fprintf(stderr, _("%s: could not fsync file %s: %s\n"),
+ progname, walname, strerror(errno));
+ return false;
+ }
+
+ if (close(walfile) != 0)
+ {
+ fprintf(stderr, _("%s: could not close file %s: %s\n"),
+ progname, walname, strerror(errno));
+ return false;
+ }
+
+ /*
+ * Rename the .partial file only if we've completed writing the
+ * whole segment.
+ */
+ if (currpos == XLOG_SEG_SIZE)
+ {
+ char oldfn[MAXPGPATH];
+ char newfn[MAXPGPATH];
+
+ snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname);
+ snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname);
+ if (rename(oldfn, newfn) != 0)
+ {
+ fprintf(stderr, _("%s: could not rename file %s: %s\n"),
+ progname, walname, strerror(errno));
+ return false;
+ }
+ }
+ else
+ fprintf(stderr, _("%s: not renaming %s, segment is not complete.\n"),
+ progname, walname);
+
+ return true;
+}
+
+
/*
* Local version of GetCurrentTimestamp(), since we are not linked with
* backend code.
@@ -178,10 +283,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
if (stream_continue && stream_continue())
{
if (walfile != -1)
- {
- fsync(walfile);
- close(walfile);
- }
+ /* Potential error message is written by close_walfile */
+ return close_walfile(walfile, basedir, current_walfile_name);
return true;
}
@@ -360,8 +463,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
/* Did we reach the end of a WAL segment? */
if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
{
- fsync(walfile);
- close(walfile);
+ if (!close_walfile(walfile, basedir, current_walfile_name))
+ /* Error message written in close_walfile() */
+ return false;
+
walfile = -1;
xlogoff = 0;