diff options
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r-- | src/bin/pg_basebackup/receivelog.c | 687 |
1 files changed, 487 insertions, 200 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 88d0c136b07..03e275cb5b6 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -28,19 +28,24 @@ #include "streamutil.h" -/* fd for currently open WAL file */ +/* fd and filename for currently open WAL file */ static int walfile = -1; +static char current_walfile_name[MAXPGPATH] = ""; + +static bool HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, + char *basedir, stream_stop_callback stream_stop, + int standby_message_timeout, char *partial_suffix, + XLogRecPtr *stoppos); /* - * Open a new WAL file in the specified directory. Store the name - * (not including the full directory) in namebuf. Assumes there is - * enough room in this buffer... + * Open a new WAL file in the specified directory. * - * The file will be padded to 16Mb with zeroes. + * The file will be padded to 16Mb with zeroes. The base filename (without + * partial_suffix) is stored in current_walfile_name. */ -static int +static bool open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, - char *namebuf) + char *partial_suffix) { int f; char fn[MAXPGPATH]; @@ -50,16 +55,17 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, XLogSegNo segno; XLByteToSeg(startpoint, segno); - XLogFileName(namebuf, timeline, segno); + XLogFileName(current_walfile_name, timeline, segno); - snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf); + snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name, + partial_suffix ? partial_suffix : ""); f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); if (f == -1) { fprintf(stderr, _("%s: could not open transaction log file \"%s\": %s\n"), progname, fn, strerror(errno)); - return -1; + return false; } /* @@ -72,17 +78,21 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, _("%s: could not stat transaction log file \"%s\": %s\n"), progname, fn, strerror(errno)); close(f); - return -1; + return false; } if (statbuf.st_size == XLogSegSize) - return f; /* File is open and ready to use */ + { + /* File is open and ready to use */ + walfile = f; + return true; + } if (statbuf.st_size != 0) { fprintf(stderr, _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"), progname, fn, (int) statbuf.st_size, XLogSegSize); close(f); - return -1; + return false; } /* New, empty, file. So pad it to 16Mb with zeroes */ @@ -97,7 +107,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, free(zerobuf); close(f); unlink(fn); - return -1; + return false; } } free(zerobuf); @@ -108,42 +118,45 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"), progname, fn, strerror(errno)); close(f); - return -1; + return false; } - return f; + walfile = f; + return true; } /* - * Close the current WAL file, and rename it to the correct filename if it's - * complete. - * - * If segment_complete is true, rename the current WAL file even if we've not - * completed writing the whole segment. + * Close the current WAL file (if open), and rename it to the correct + * filename if it's complete. On failure, prints an error message to stderr + * and returns false, otherwise returns true. */ static bool -close_walfile(char *basedir, char *walname, bool segment_complete) +close_walfile(char *basedir, char *partial_suffix) { - off_t currpos = lseek(walfile, 0, SEEK_CUR); + off_t currpos; + + if (walfile == -1) + return true; + currpos = lseek(walfile, 0, SEEK_CUR); if (currpos == -1) { fprintf(stderr, _("%s: could not determine seek position in file \"%s\": %s\n"), - progname, walname, strerror(errno)); + progname, current_walfile_name, strerror(errno)); return false; } if (fsync(walfile) != 0) { fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), - progname, walname, strerror(errno)); + progname, current_walfile_name, strerror(errno)); return false; } if (close(walfile) != 0) { fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, walname, strerror(errno)); + progname, current_walfile_name, strerror(errno)); walfile = -1; return false; } @@ -153,24 +166,24 @@ close_walfile(char *basedir, char *walname, bool segment_complete) * Rename the .partial file only if we've completed writing the whole * segment or segment_complete is true. */ - if (currpos == XLOG_SEG_SIZE || segment_complete) + if (currpos == XLOG_SEG_SIZE && partial_suffix) { char oldfn[MAXPGPATH]; char newfn[MAXPGPATH]; - snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname); - snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname); + snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix); + snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name); if (rename(oldfn, newfn) != 0) { fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"), - progname, walname, strerror(errno)); + progname, current_walfile_name, strerror(errno)); return false; } } - else + else if (partial_suffix) fprintf(stderr, - _("%s: not renaming \"%s\", segment is not complete\n"), - progname, walname); + _("%s: not renaming \"%s%s\", segment is not complete\n"), + progname, current_walfile_name, partial_suffix); return true; } @@ -234,6 +247,123 @@ localTimestampDifferenceExceeds(int64 start_time, } /* + * Check if a timeline history file exists. + */ +static bool +existsTimeLineHistoryFile(char *basedir, TimeLineID tli) +{ + char path[MAXPGPATH]; + char histfname[MAXFNAMELEN]; + int fd; + + /* + * Timeline 1 never has a history file. We treat that as if it existed, + * since we never need to stream it. + */ + if (tli == 1) + return true; + + TLHistoryFileName(histfname, tli); + + snprintf(path, sizeof(path), "%s/%s", basedir, histfname); + + fd = open(path, O_RDONLY | PG_BINARY, 0); + if (fd < 0) + { + if (errno != ENOENT) + fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s"), + progname, path, strerror(errno)); + return false; + } + else + { + close(fd); + return true; + } +} + +static bool +writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content) +{ + int size = strlen(content); + char path[MAXPGPATH]; + char tmppath[MAXPGPATH]; + char histfname[MAXFNAMELEN]; + int fd; + + /* + * Check that the server's idea of how timeline history files should be + * named matches ours. + */ + TLHistoryFileName(histfname, tli); + if (strcmp(histfname, filename) != 0) + { + fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s"), + progname, tli, filename); + return false; + } + + /* + * Write into a temp file name. + */ + snprintf(tmppath, MAXPGPATH, "%s.tmp", path); + + unlink(tmppath); + + fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); + if (fd < 0) + { + fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s"), + progname, tmppath, strerror(errno)); + return false; + } + + errno = 0; + if ((int) write(fd, content, size) != size) + { + int save_errno = errno; + + /* + * If we fail to make the file, delete it to release disk space + */ + unlink(tmppath); + errno = save_errno; + + fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s"), + progname, tmppath, strerror(errno)); + return false; + } + + if (fsync(fd) != 0) + { + fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), + progname, tmppath, strerror(errno)); + return false; + } + + if (close(fd) != 0) + { + fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), + progname, tmppath, strerror(errno)); + return false; + } + + /* + * Now move the completed history file into place with its final name. + */ + + snprintf(path, sizeof(path), "%s/%s", basedir, histfname); + if (rename(tmppath, path) < 0) + { + fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"), + progname, tmppath, path, strerror(errno)); + return false; + } + + return true; +} + +/* * Converts an int64 to network byte order. */ static void @@ -314,7 +444,8 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) * (by sending an extra IDENTIFY_SYSTEM command) * * All received segments will be written to the directory - * specified by basedir. + * specified by basedir. This will also fetch any missing timeline history + * files. * * The stream_stop callback will be called every time data * is received, and whenever a segment is completed. If it returns @@ -327,20 +458,22 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) * This message will only contain the write location, and never * flush or replay. * + * If 'partial_suffix' is not NULL, files are initially created with the + * given suffix, and the suffix is removed once the file is finished. That + * allows you to tell the difference between partial and completed files, + * so that you can continue later where you left. + * * Note: The log position *must* be at a log segment start! */ bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, - int standby_message_timeout, bool rename_partial) + int standby_message_timeout, char *partial_suffix) { char query[128]; - char current_walfile_name[MAXPGPATH]; PGresult *res; - char *copybuf = NULL; - int64 last_status = -1; - XLogRecPtr blockpos = InvalidXLogRecPtr; + XLogRecPtr stoppos; /* * The message format used in streaming replication changed in 9.3, so we @@ -359,7 +492,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (sysidentifier != NULL) { - /* Validate system identifier and timeline hasn't changed */ + /* Validate system identifier hasn't changed */ res = PQexec(conn, "IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -385,33 +518,184 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); return false; } - if (timeline != atoi(PQgetvalue(res, 0, 1))) + if (timeline > atoi(PQgetvalue(res, 0, 1))) { fprintf(stderr, - _("%s: timeline does not match between base backup and streaming connection\n"), - progname); + _("%s: starting timeline %u is not present in the server\n"), + progname, timeline); PQclear(res); return false; } PQclear(res); } - /* Initiate the replication stream at specified location */ - snprintf(query, sizeof(query), "START_REPLICATION %X/%X", - (uint32) (startpos >> 32), (uint32) startpos); - res = PQexec(conn, query); - if (PQresultStatus(res) != PGRES_COPY_BOTH) + while (1) { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, "START_REPLICATION", PQresultErrorMessage(res)); + /* + * Fetch the timeline history file for this timeline, if we don't + * have it already. + */ + if (!existsTimeLineHistoryFile(basedir, timeline)) + { + snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline); + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + /* FIXME: we might send it ok, but get an error */ + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, "TIMELINE_HISTORY", PQresultErrorMessage(res)); + PQclear(res); + return false; + } + + /* + * The response to TIMELINE_HISTORY is a single row result set + * with two fields: filename and content + */ + if (PQnfields(res) != 2 || PQntuples(res) != 1) + { + fprintf(stderr, + _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, PQntuples(res), PQnfields(res), 1, 2); + } + + /* Write the history file to disk */ + writeTimeLineHistoryFile(basedir, timeline, + PQgetvalue(res, 0, 0), + PQgetvalue(res, 0, 1)); + + PQclear(res); + } + + /* + * Before we start streaming from the requested location, check + * if the callback tells us to stop here. + */ + if (stream_stop(startpos, timeline, false)) + return true; + + /* Initiate the replication stream at specified location */ + snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u", + (uint32) (startpos >> 32), (uint32) startpos, + timeline); + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_COPY_BOTH) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, "START_REPLICATION", PQresultErrorMessage(res)); + PQclear(res); + return false; + } PQclear(res); - return false; + + /* Stream the WAL */ + if (!HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, + standby_message_timeout, partial_suffix, + &stoppos)) + goto error; + + /* + * Streaming finished. + * + * There are two possible reasons for that: a controlled shutdown, + * or we reached the end of the current timeline. In case of + * end-of-timeline, the server sends a result set after Copy has + * finished, containing the next timeline's ID. Read that, and + * restart streaming from the next timeline. + */ + + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + /* + * End-of-timeline. Read the next timeline's ID. + */ + uint32 newtimeline; + + newtimeline = atoi(PQgetvalue(res, 0, 0)); + PQclear(res); + + if (newtimeline <= timeline) + { + /* shouldn't happen */ + fprintf(stderr, + "server reported unexpected next timeline %u, following timeline %u\n", + newtimeline, timeline); + goto error; + } + + /* Read the final result, which should be CommandComplete. */ + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, + _("%s: unexpected termination of replication stream: %s"), + progname, PQresultErrorMessage(res)); + goto error; + } + PQclear(res); + + /* + * Loop back to start streaming from the new timeline. + * Always start streaming at the beginning of a segment. + */ + timeline = newtimeline; + startpos = stoppos - (stoppos % XLOG_SEG_SIZE); + continue; + } + else if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + /* + * End of replication (ie. controlled shut down of the server). + * + * Check if the callback thinks it's OK to stop here. If not, + * complain. + */ + if (stream_stop(stoppos, timeline, false)) + return true; + else + { + fprintf(stderr, _("%s: replication stream was terminated before stop point\n"), + progname); + goto error; + } + } + else + { + /* Server returned an error. */ + fprintf(stderr, + _("%s: unexpected termination of replication stream: %s"), + progname, PQresultErrorMessage(res)); + goto error; + } } - PQclear(res); - /* - * Receive the actual xlog data - */ +error: + if (walfile != -1 && close(walfile) != 0) + fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), + progname, current_walfile_name, strerror(errno)); + walfile = -1; + return false; +} + +/* + * The main loop of ReceiveXLogStream. Handles the COPY stream after + * initiating streaming with the START_STREAMING command. + * + * If the COPY ends normally, returns true and sets *stoppos to the last + * byte written. On error, returns false. + */ +static bool +HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, + char *basedir, stream_stop_callback stream_stop, + int standby_message_timeout, char *partial_suffix, + XLogRecPtr *stoppos) +{ + char *copybuf = NULL; + int64 last_status = -1; + XLogRecPtr blockpos = startpos; + bool still_sending = true; + while (1) { int r; @@ -430,20 +714,27 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Check if we should continue streaming, or abort at this point. */ - if (stream_stop && stream_stop(blockpos, timeline, false)) + if (still_sending && stream_stop(blockpos, timeline, false)) { - if (walfile != -1 && !close_walfile(basedir, current_walfile_name, - rename_partial)) + if (!close_walfile(basedir, partial_suffix)) + { /* Potential error message is written by close_walfile */ goto error; - return true; + } + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + goto error; + } + still_sending = false; } /* * Potentially send a status message to the master */ now = localGetCurrentTimestamp(); - if (standby_message_timeout > 0 && + if (still_sending && standby_message_timeout > 0 && localTimestampDifferenceExceeds(last_status, now, standby_message_timeout)) { @@ -457,9 +748,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (r == 0) { /* - * In async mode, and no data available. We block on reading but - * not more than the specified timeout, so that we can send a - * response back to the client. + * No data available. Wait for some to appear, but not longer + * than the specified timeout, so that we can ping the server. */ fd_set input_mask; struct timeval timeout; @@ -467,7 +757,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, FD_ZERO(&input_mask); FD_SET(PQsocket(conn), &input_mask); - if (standby_message_timeout) + if (standby_message_timeout && still_sending) { int64 targettime; long secs; @@ -493,8 +783,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, { /* * Got a timeout or signal. Continue the loop and either - * deliver a status packet to the server or just go back into - * blocking. + * deliver a status packet to the server or just go back + * into blocking. */ continue; } @@ -515,8 +805,31 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, continue; } if (r == -1) - /* End of copy stream */ - break; + { + /* + * The server closed its end of the copy stream. Close ours + * if we haven't done so already, and exit. + */ + if (still_sending) + { + if (!close_walfile(basedir, partial_suffix)) + { + /* Error message written in close_walfile() */ + goto error; + } + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + goto error; + } + still_sending = false; + } + if (copybuf != NULL) + PQfreemem(copybuf); + *stoppos = blockpos; + return true; + } if (r == -2) { fprintf(stderr, _("%s: could not read COPY data: %s"), @@ -548,174 +861,148 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, replyRequested = copybuf[pos]; /* If the server requested an immediate reply, send one. */ - if (replyRequested) + if (replyRequested && still_sending) { now = localGetCurrentTimestamp(); if (!sendFeedback(conn, blockpos, now, false)) goto error; last_status = now; } - continue; } - else if (copybuf[0] != 'w') - { - fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), - progname, copybuf[0]); - goto error; - } - - /* - * Read the header of the XLogData message, enclosed in the CopyData - * message. We only need the WAL location field (dataStart), the rest - * of the header is ignored. - */ - hdr_len = 1; /* msgtype 'w' */ - hdr_len += 8; /* dataStart */ - hdr_len += 8; /* walEnd */ - hdr_len += 8; /* sendTime */ - if (r < hdr_len + 1) + else if (copybuf[0] == 'w') { - fprintf(stderr, _("%s: streaming header too small: %d\n"), - progname, r); - goto error; - } - blockpos = recvint64(©buf[1]); - - /* Extract WAL location for this block */ - xlogoff = blockpos % XLOG_SEG_SIZE; + /* + * Once we've decided we don't want to receive any more, just + * ignore any subsequent XLogData messages. + */ + if (!still_sending) + continue; - /* - * Verify that the initial location in the stream matches where we - * think we are. - */ - if (walfile == -1) - { - /* No file open yet */ - if (xlogoff != 0) - { - fprintf(stderr, - _("%s: received transaction log record for offset %u with no file open\n"), - progname, xlogoff); - goto error; - } - } - else - { - /* More data in existing segment */ - /* XXX: store seek value don't reseek all the time */ - if (lseek(walfile, 0, SEEK_CUR) != xlogoff) + /* + * Read the header of the XLogData message, enclosed in the + * CopyData message. We only need the WAL location field + * (dataStart), the rest of the header is ignored. + */ + hdr_len = 1; /* msgtype 'w' */ + hdr_len += 8; /* dataStart */ + hdr_len += 8; /* walEnd */ + hdr_len += 8; /* sendTime */ + if (r < hdr_len + 1) { - fprintf(stderr, - _("%s: got WAL data offset %08x, expected %08x\n"), - progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, r); goto error; } - } - - bytes_left = r - hdr_len; - bytes_written = 0; + blockpos = recvint64(©buf[1]); - while (bytes_left) - { - int bytes_to_write; + /* Extract WAL location for this block */ + xlogoff = blockpos % XLOG_SEG_SIZE; /* - * If crossing a WAL boundary, only write up until we reach - * XLOG_SEG_SIZE. + * Verify that the initial location in the stream matches where + * we think we are. */ - if (xlogoff + bytes_left > XLOG_SEG_SIZE) - bytes_to_write = XLOG_SEG_SIZE - xlogoff; - else - bytes_to_write = bytes_left; - if (walfile == -1) { - walfile = open_walfile(blockpos, timeline, - basedir, current_walfile_name); - if (walfile == -1) - /* Error logged by open_walfile */ + /* No file open yet */ + if (xlogoff != 0) + { + fprintf(stderr, + _("%s: received transaction log record for offset %u with no file open\n"), + progname, xlogoff); goto error; + } } - - if (write(walfile, - copybuf + hdr_len + bytes_written, - bytes_to_write) != bytes_to_write) + else { - fprintf(stderr, - _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), - progname, bytes_to_write, current_walfile_name, - strerror(errno)); - goto error; + /* More data in existing segment */ + /* XXX: store seek value don't reseek all the time */ + if (lseek(walfile, 0, SEEK_CUR) != xlogoff) + { + fprintf(stderr, + _("%s: got WAL data offset %08x, expected %08x\n"), + progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); + goto error; + } } - /* Write was successful, advance our position */ - bytes_written += bytes_to_write; - bytes_left -= bytes_to_write; - blockpos += bytes_to_write; - xlogoff += bytes_to_write; + bytes_left = r - hdr_len; + bytes_written = 0; - /* Did we reach the end of a WAL segment? */ - if (blockpos % XLOG_SEG_SIZE == 0) + while (bytes_left) { - if (!close_walfile(basedir, current_walfile_name, false)) - /* Error message written in close_walfile() */ - goto error; + int bytes_to_write; - xlogoff = 0; + /* + * If crossing a WAL boundary, only write up until we reach + * XLOG_SEG_SIZE. + */ + if (xlogoff + bytes_left > XLOG_SEG_SIZE) + bytes_to_write = XLOG_SEG_SIZE - xlogoff; + else + bytes_to_write = bytes_left; - if (stream_stop != NULL) + if (walfile == -1) { - /* - * Callback when the segment finished, and return if it - * told us to. - */ - if (stream_stop(blockpos, timeline, true)) - return true; + if (!open_walfile(blockpos, timeline, + basedir, partial_suffix)) + { + /* Error logged by open_walfile */ + goto error; + } } - } - } - /* No more data left to write, start receiving next copy packet */ - } - /* - * The only way to get out of the loop is if the server shut down the - * replication stream. If it's a controlled shutdown, the server will send - * a shutdown message, and we'll return the latest xlog location that has - * been streamed. - */ + if (write(walfile, + copybuf + hdr_len + bytes_written, + bytes_to_write) != bytes_to_write) + { + fprintf(stderr, + _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), + progname, bytes_to_write, current_walfile_name, + strerror(errno)); + goto error; + } - res = PQgetResult(conn); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - fprintf(stderr, - _("%s: unexpected termination of replication stream: %s"), - progname, PQresultErrorMessage(res)); - goto error; - } - PQclear(res); + /* Write was successful, advance our position */ + bytes_written += bytes_to_write; + bytes_left -= bytes_to_write; + blockpos += bytes_to_write; + xlogoff += bytes_to_write; - /* Complain if we've not reached stop point yet */ - if (stream_stop != NULL && !stream_stop(blockpos, timeline, false)) - { - fprintf(stderr, _("%s: replication stream was terminated before stop point\n"), - progname); - goto error; + /* Did we reach the end of a WAL segment? */ + if (blockpos % XLOG_SEG_SIZE == 0) + { + if (!close_walfile(basedir, partial_suffix)) + /* Error message written in close_walfile() */ + goto error; + + xlogoff = 0; + + if (still_sending && stream_stop(blockpos, timeline, false)) + { + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + goto error; + } + still_sending = false; + break; /* ignore the rest of this XLogData packet */ + } + } + } + /* No more data left to write, receive next copy packet */ + } + else + { + fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), + progname, copybuf[0]); + goto error; + } } - if (copybuf != NULL) - PQfreemem(copybuf); - if (walfile != -1 && close(walfile) != 0) - fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); - walfile = -1; - return true; - error: if (copybuf != NULL) PQfreemem(copybuf); - if (walfile != -1 && close(walfile) != 0) - fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); - walfile = -1; return false; } |