aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/receivelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r--src/bin/pg_basebackup/receivelog.c83
1 files changed, 74 insertions, 9 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index f297003c62f..98e874f4ffe 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -37,6 +37,9 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos);
+static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
+ uint32 *timeline);
+
/*
* Open a new WAL file in the specified directory.
*
@@ -627,26 +630,44 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* There are two possible reasons for that: a controlled shutdown,
* or we reached the end of the current timeline. In case of
* end-of-timeline, the server sends a result set after Copy has
- * finished, containing the next timeline's ID. Read that, and
- * restart streaming from the next timeline.
+ * finished, containing information about the next timeline. Read
+ * that, and restart streaming from the next timeline. In case of
+ * controlled shutdown, stop here.
*/
-
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
/*
- * End-of-timeline. Read the next timeline's ID.
+ * End-of-timeline. Read the next timeline's ID and starting
+ * position. Usually, the starting position will match the end of
+ * the previous timeline, but there are corner cases like if the
+ * server had sent us half of a WAL record, when it was promoted.
+ * The new timeline will begin at the end of the last complete
+ * record in that case, overlapping the partial WAL record on the
+ * the old timeline.
*/
uint32 newtimeline;
+ bool parsed;
- newtimeline = atoi(PQgetvalue(res, 0, 0));
+ parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
PQclear(res);
+ if (!parsed)
+ goto error;
+ /* Sanity check the values the server gave us */
if (newtimeline <= timeline)
{
- /* shouldn't happen */
fprintf(stderr,
- "server reported unexpected next timeline %u, following timeline %u\n",
- newtimeline, timeline);
+ _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
+ progname, newtimeline, timeline);
+ goto error;
+ }
+ if (startpos > stoppos)
+ {
+ fprintf(stderr,
+ _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
+ progname,
+ timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
+ newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
goto error;
}
@@ -666,7 +687,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Always start streaming at the beginning of a segment.
*/
timeline = newtimeline;
- startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
+ startpos = startpos - (startpos % XLOG_SEG_SIZE);
continue;
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -705,6 +726,50 @@ error:
}
/*
+ * Helper function to parse the result set returned by server after streaming
+ * has finished. On failure, prints an error to stderr and returns false.
+ */
+static bool
+ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
+{
+ uint32 startpos_xlogid,
+ startpos_xrecoff;
+
+ /*----------
+ * The result set consists of one row and two columns, e.g:
+ *
+ * next_tli | next_tli_startpos
+ * ----------+-------------------
+ * 4 | 0/9949AE0
+ *
+ * next_tli is the timeline ID of the next timeline after the one that
+ * just finished streaming. next_tli_startpos is the XLOG position where
+ * the server switched to it.
+ *----------
+ */
+ if (PQnfields(res) < 2 || PQntuples(res) != 1)
+ {
+ fprintf(stderr,
+ _("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
+ progname, PQntuples(res), PQnfields(res), 1, 2);
+ return false;
+ }
+
+ *timeline = atoi(PQgetvalue(res, 0, 0));
+ if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
+ &startpos_xrecoff) != 2)
+ {
+ fprintf(stderr,
+ _("%s: could not parse next timeline's starting point \"%s\"\n"),
+ progname, PQgetvalue(res, 0, 1));
+ return false;
+ }
+ *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
+
+ return true;
+}
+
+/*
* The main loop of ReceiveXLogStream. Handles the COPY stream after
* initiating streaming with the START_STREAMING command.
*