aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/receivelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r--src/bin/pg_basebackup/receivelog.c93
1 files changed, 47 insertions, 46 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 98e874f4ffe..7ce81125bfe 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -30,11 +30,11 @@
/* fd and filename for currently open WAL file */
static int walfile = -1;
-static char current_walfile_name[MAXPGPATH] = "";
+static char current_walfile_name[MAXPGPATH] = "";
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
- stream_stop_callback stream_stop, int standby_message_timeout,
+ stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
@@ -200,7 +200,7 @@ close_walfile(char *basedir, char *partial_suffix)
static int64
localGetCurrentTimestamp(void)
{
- int64 result;
+ int64 result;
struct timeval tp;
gettimeofday(&tp, NULL);
@@ -221,7 +221,7 @@ static void
localTimestampDifference(int64 start_time, int64 stop_time,
long *secs, int *microsecs)
{
- int64 diff = stop_time - start_time;
+ int64 diff = stop_time - start_time;
if (diff <= 0)
{
@@ -244,7 +244,7 @@ localTimestampDifferenceExceeds(int64 start_time,
int64 stop_time,
int msec)
{
- int64 diff = stop_time - start_time;
+ int64 diff = stop_time - start_time;
return (diff >= msec * INT64CONST(1000));
}
@@ -309,7 +309,7 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
/*
* Write into a temp file name.
*/
- snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
+ snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
unlink(tmppath);
@@ -414,19 +414,19 @@ static bool
sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
{
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
- int len = 0;
+ int len = 0;
replybuf[len] = 'r';
len += 1;
- sendint64(blockpos, &replybuf[len]); /* write */
+ sendint64(blockpos, &replybuf[len]); /* write */
len += 8;
- sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
+ sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
len += 8;
- sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
+ sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
len += 8;
- sendint64(now, &replybuf[len]); /* sendTime */
+ sendint64(now, &replybuf[len]); /* sendTime */
len += 8;
- replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
+ replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
len += 1;
if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
@@ -464,6 +464,7 @@ CheckServerVersionForStreaming(PGconn *conn)
if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
{
const char *serverver = PQparameterStatus(conn, "server_version");
+
fprintf(stderr, _("%s: incompatible server version %s; streaming is only supported with server version %s\n"),
progname,
serverver ? serverver : "'unknown'",
@@ -550,7 +551,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if (timeline > atoi(PQgetvalue(res, 0, 1)))
{
fprintf(stderr,
- _("%s: starting timeline %u is not present in the server\n"),
+ _("%s: starting timeline %u is not present in the server\n"),
progname, timeline);
PQclear(res);
return false;
@@ -561,8 +562,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
while (1)
{
/*
- * Fetch the timeline history file for this timeline, if we don't
- * have it already.
+ * Fetch the timeline history file for this timeline, if we don't have
+ * it already.
*/
if (!existsTimeLineHistoryFile(basedir, timeline))
{
@@ -572,7 +573,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
{
/* FIXME: we might send it ok, but get an error */
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
+ progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
PQclear(res);
return false;
}
@@ -585,7 +586,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
{
fprintf(stderr,
_("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
- progname, PQntuples(res), PQnfields(res), 1, 2);
+ progname, PQntuples(res), PQnfields(res), 1, 2);
}
/* Write the history file to disk */
@@ -597,8 +598,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
/*
- * Before we start streaming from the requested location, check
- * if the callback tells us to stop here.
+ * Before we start streaming from the requested location, check if the
+ * callback tells us to stop here.
*/
if (stream_stop(startpos, timeline, false))
return true;
@@ -627,8 +628,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Streaming finished.
*
- * There are two possible reasons for that: a controlled shutdown,
- * or we reached the end of the current timeline. In case of
+ * There are two possible reasons for that: a controlled shutdown, or
+ * we reached the end of the current timeline. In case of
* end-of-timeline, the server sends a result set after Copy has
* finished, containing information about the next timeline. Read
* that, and restart streaming from the next timeline. In case of
@@ -667,7 +668,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
progname,
timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
- newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
+ newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
goto error;
}
@@ -676,15 +677,15 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr,
- _("%s: unexpected termination of replication stream: %s"),
+ _("%s: unexpected termination of replication stream: %s"),
progname, PQresultErrorMessage(res));
goto error;
}
PQclear(res);
/*
- * Loop back to start streaming from the new timeline.
- * Always start streaming at the beginning of a segment.
+ * Loop back to start streaming from the new timeline. Always
+ * start streaming at the beginning of a segment.
*/
timeline = newtimeline;
startpos = startpos - (startpos % XLOG_SEG_SIZE);
@@ -738,9 +739,9 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
/*----------
* The result set consists of one row and two columns, e.g:
*
- * next_tli | next_tli_startpos
+ * next_tli | next_tli_startpos
* ----------+-------------------
- * 4 | 0/9949AE0
+ * 4 | 0/9949AE0
*
* next_tli is the timeline ID of the next timeline after the one that
* just finished streaming. next_tli_startpos is the XLOG position where
@@ -760,7 +761,7 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
&startpos_xrecoff) != 2)
{
fprintf(stderr,
- _("%s: could not parse next timeline's starting point \"%s\"\n"),
+ _("%s: could not parse next timeline's starting point \"%s\"\n"),
progname, PQgetvalue(res, 0, 1));
return false;
}
@@ -840,8 +841,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if (r == 0)
{
/*
- * No data available. Wait for some to appear, but not longer
- * than the specified timeout, so that we can ping the server.
+ * No data available. Wait for some to appear, but not longer than
+ * the specified timeout, so that we can ping the server.
*/
fd_set input_mask;
struct timeval timeout;
@@ -875,8 +876,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
{
/*
* Got a timeout or signal. Continue the loop and either
- * deliver a status packet to the server or just go back
- * into blocking.
+ * deliver a status packet to the server or just go back into
+ * blocking.
*/
continue;
}
@@ -940,17 +941,17 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Check the message type. */
if (copybuf[0] == 'k')
{
- int pos;
- bool replyRequested;
+ int pos;
+ bool replyRequested;
/*
* Parse the keepalive message, enclosed in the CopyData message.
* We just check if the server requested a reply, and ignore the
* rest.
*/
- pos = 1; /* skip msgtype 'k' */
- pos += 8; /* skip walEnd */
- pos += 8; /* skip sendTime */
+ pos = 1; /* skip msgtype 'k' */
+ pos += 8; /* skip walEnd */
+ pos += 8; /* skip sendTime */
if (r < pos + 1)
{
@@ -983,10 +984,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* CopyData message. We only need the WAL location field
* (dataStart), the rest of the header is ignored.
*/
- hdr_len = 1; /* msgtype 'w' */
- hdr_len += 8; /* dataStart */
- hdr_len += 8; /* walEnd */
- hdr_len += 8; /* sendTime */
+ hdr_len = 1; /* msgtype 'w' */
+ hdr_len += 8; /* dataStart */
+ hdr_len += 8; /* walEnd */
+ hdr_len += 8; /* sendTime */
if (r < hdr_len + 1)
{
fprintf(stderr, _("%s: streaming header too small: %d\n"),
@@ -999,8 +1000,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
xlogoff = blockpos % XLOG_SEG_SIZE;
/*
- * Verify that the initial location in the stream matches where
- * we think we are.
+ * Verify that the initial location in the stream matches where we
+ * think we are.
*/
if (walfile == -1)
{
@@ -1020,8 +1021,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
{
fprintf(stderr,
- _("%s: got WAL data offset %08x, expected %08x\n"),
- progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+ _("%s: got WAL data offset %08x, expected %08x\n"),
+ progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
goto error;
}
}
@@ -1087,7 +1088,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
goto error;
}
still_sending = false;
- break; /* ignore the rest of this XLogData packet */
+ break; /* ignore the rest of this XLogData packet */
}
}
}