diff options
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r-- | src/bin/pg_basebackup/receivelog.c | 204 |
1 files changed, 86 insertions, 118 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 692d13716e4..0ff9aa19a9e 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -27,6 +27,7 @@ #include "libpq-fe.h" #include "access/xlog_internal.h" #include "common/file_utils.h" +#include "fe_utils/logging.h" /* fd and filename for currently open WAL file */ @@ -68,8 +69,8 @@ mark_file_as_archived(StreamCtl *stream, const char *fname) f = stream->walmethod->open_for_write(tmppath, NULL, 0); if (f == NULL) { - fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"), - progname, tmppath, stream->walmethod->getlasterror()); + pg_log_error("could not create archive status file \"%s\": %s", + tmppath, stream->walmethod->getlasterror()); return false; } @@ -115,9 +116,8 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) size = stream->walmethod->get_file_size(fn); if (size < 0) { - fprintf(stderr, - _("%s: could not get size of write-ahead log file \"%s\": %s\n"), - progname, fn, stream->walmethod->getlasterror()); + pg_log_error("could not get size of write-ahead log file \"%s\": %s", + fn, stream->walmethod->getlasterror()); return false; } if (size == WalSegSz) @@ -126,18 +126,16 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0); if (f == NULL) { - fprintf(stderr, - _("%s: could not open existing write-ahead log file \"%s\": %s\n"), - progname, fn, stream->walmethod->getlasterror()); + pg_log_error("could not open existing write-ahead log file \"%s\": %s", + fn, stream->walmethod->getlasterror()); return false; } /* fsync file in case of a previous crash */ if (stream->walmethod->sync(f) != 0) { - fprintf(stderr, - _("%s: could not fsync existing write-ahead log file \"%s\": %s\n"), - progname, fn, stream->walmethod->getlasterror()); + pg_log_error("could not fsync existing write-ahead log file \"%s\": %s", + fn, stream->walmethod->getlasterror()); stream->walmethod->close(f, CLOSE_UNLINK); return false; } @@ -150,11 +148,10 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; - fprintf(stderr, - ngettext("%s: write-ahead log file \"%s\" has %d byte, should be 0 or %d\n", - "%s: write-ahead log file \"%s\" has %d bytes, should be 0 or %d\n", - size), - progname, fn, (int) size, WalSegSz); + pg_log_error(ngettext("write-ahead log file \"%s\" has %d byte, should be 0 or %d", + "write-ahead log file \"%s\" has %d bytes, should be 0 or %d", + size), + fn, (int) size, WalSegSz); return false; } /* File existed and was empty, so fall through and open */ @@ -166,9 +163,8 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) stream->partial_suffix, WalSegSz); if (f == NULL) { - fprintf(stderr, - _("%s: could not open write-ahead log file \"%s\": %s\n"), - progname, fn, stream->walmethod->getlasterror()); + pg_log_error("could not open write-ahead log file \"%s\": %s", + fn, stream->walmethod->getlasterror()); return false; } @@ -193,9 +189,8 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) currpos = stream->walmethod->get_current_pos(walfile); if (currpos == -1) { - fprintf(stderr, - _("%s: could not determine seek position in file \"%s\": %s\n"), - progname, current_walfile_name, stream->walmethod->getlasterror()); + pg_log_error("could not determine seek position in file \"%s\": %s", + current_walfile_name, stream->walmethod->getlasterror()); stream->walmethod->close(walfile, CLOSE_UNLINK); walfile = NULL; @@ -208,9 +203,8 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) r = stream->walmethod->close(walfile, CLOSE_NORMAL); else { - fprintf(stderr, - _("%s: not renaming \"%s%s\", segment is not complete\n"), - progname, current_walfile_name, stream->partial_suffix); + pg_log_info("not renaming \"%s%s\", segment is not complete", + current_walfile_name, stream->partial_suffix); r = stream->walmethod->close(walfile, CLOSE_NO_RENAME); } } @@ -221,8 +215,8 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) if (r != 0) { - fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, current_walfile_name, stream->walmethod->getlasterror()); + pg_log_error("could not close file \"%s\": %s", + current_walfile_name, stream->walmethod->getlasterror()); return false; } @@ -278,23 +272,23 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) TLHistoryFileName(histfname, stream->timeline); if (strcmp(histfname, filename) != 0) { - fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"), - progname, stream->timeline, filename); + pg_log_error("server reported unexpected history file name for timeline %u: %s", + stream->timeline, filename); return false; } f = stream->walmethod->open_for_write(histfname, ".tmp", 0); if (f == NULL) { - fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"), - progname, histfname, stream->walmethod->getlasterror()); + pg_log_error("could not create timeline history file \"%s\": %s", + histfname, stream->walmethod->getlasterror()); return false; } if ((int) stream->walmethod->write(f, content, size) != size) { - fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"), - progname, histfname, stream->walmethod->getlasterror()); + pg_log_error("could not write timeline history file \"%s\": %s", + histfname, stream->walmethod->getlasterror()); /* * If we fail to make the file, delete it to release disk space @@ -306,8 +300,8 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) if (stream->walmethod->close(f, CLOSE_NORMAL) != 0) { - fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, histfname, stream->walmethod->getlasterror()); + pg_log_error("could not close file \"%s\": %s", + histfname, stream->walmethod->getlasterror()); return false; } @@ -349,8 +343,8 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyReque if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) { - fprintf(stderr, _("%s: could not send feedback packet: %s"), - progname, PQerrorMessage(conn)); + pg_log_error("could not send feedback packet: %s", + PQerrorMessage(conn)); return false; } @@ -383,8 +377,7 @@ CheckServerVersionForStreaming(PGconn *conn) { const char *serverver = PQparameterStatus(conn, "server_version"); - fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"), - progname, + pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s", serverver ? serverver : "'unknown'", "9.3"); return false; @@ -393,8 +386,7 @@ CheckServerVersionForStreaming(PGconn *conn) { const char *serverver = PQparameterStatus(conn, "server_version"); - fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"), - progname, + pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s", serverver ? serverver : "'unknown'", PG_VERSION); return false; @@ -489,33 +481,28 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) res = PQexec(conn, "IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { - fprintf(stderr, - _("%s: could not send replication command \"%s\": %s"), - progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); + pg_log_error("could not send replication command \"%s\": %s", + "IDENTIFY_SYSTEM", PQerrorMessage(conn)); PQclear(res); return false; } if (PQntuples(res) != 1 || PQnfields(res) < 3) { - fprintf(stderr, - _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 3); + pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields", + PQntuples(res), PQnfields(res), 1, 3); PQclear(res); return false; } if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0) { - fprintf(stderr, - _("%s: system identifier does not match between base backup and streaming connection\n"), - progname); + pg_log_error("system identifier does not match between base backup and streaming connection"); PQclear(res); return false; } if (stream->timeline > atoi(PQgetvalue(res, 0, 1))) { - fprintf(stderr, - _("%s: starting timeline %u is not present in the server\n"), - progname, stream->timeline); + pg_log_error("starting timeline %u is not present in the server", + stream->timeline); PQclear(res); return false; } @@ -543,8 +530,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) if (PQresultStatus(res) != PGRES_TUPLES_OK) { /* FIXME: we might send it ok, but get an error */ - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, "TIMELINE_HISTORY", PQresultErrorMessage(res)); + pg_log_error("could not send replication command \"%s\": %s", + "TIMELINE_HISTORY", PQresultErrorMessage(res)); PQclear(res); return false; } @@ -555,9 +542,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) */ if (PQnfields(res) != 2 || PQntuples(res) != 1) { - fprintf(stderr, - _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 2); + pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields", + PQntuples(res), PQnfields(res), 1, 2); } /* Write the history file to disk */ @@ -583,8 +569,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_COPY_BOTH) { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, "START_REPLICATION", PQresultErrorMessage(res)); + pg_log_error("could not send replication command \"%s\": %s", + "START_REPLICATION", PQresultErrorMessage(res)); PQclear(res); return false; } @@ -627,16 +613,13 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) /* Sanity check the values the server gave us */ if (newtimeline <= stream->timeline) { - fprintf(stderr, - _("%s: server reported unexpected next timeline %u, following timeline %u\n"), - progname, newtimeline, stream->timeline); + pg_log_error("server reported unexpected next timeline %u, following timeline %u", + newtimeline, stream->timeline); goto error; } if (stream->startpos > stoppos) { - fprintf(stderr, - _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"), - progname, + pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X", stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos, newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos); goto error; @@ -646,9 +629,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) res = PQgetResult(conn); if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, - _("%s: unexpected termination of replication stream: %s"), - progname, PQresultErrorMessage(res)); + pg_log_error("unexpected termination of replication stream: %s", + PQresultErrorMessage(res)); PQclear(res); goto error; } @@ -677,17 +659,15 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) return true; else { - fprintf(stderr, _("%s: replication stream was terminated before stop point\n"), - progname); + pg_log_error("replication stream was terminated before stop point"); goto error; } } else { /* Server returned an error. */ - fprintf(stderr, - _("%s: unexpected termination of replication stream: %s"), - progname, PQresultErrorMessage(res)); + pg_log_error("unexpected termination of replication stream: %s", + PQresultErrorMessage(res)); PQclear(res); goto error; } @@ -695,8 +675,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) error: if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0) - fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, current_walfile_name, stream->walmethod->getlasterror()); + pg_log_error("could not close file \"%s\": %s", + current_walfile_name, stream->walmethod->getlasterror()); walfile = NULL; return false; } @@ -725,9 +705,8 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) */ if (PQnfields(res) < 2 || PQntuples(res) != 1) { - fprintf(stderr, - _("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 2); + pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields", + PQntuples(res), PQnfields(res), 1, 2); return false; } @@ -735,9 +714,8 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid, &startpos_xrecoff) != 2) { - fprintf(stderr, - _("%s: could not parse next timeline's starting point \"%s\"\n"), - progname, PQgetvalue(res, 0, 1)); + pg_log_error("could not parse next timeline's starting point \"%s\"", + PQgetvalue(res, 0, 1)); return false; } *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff; @@ -785,8 +763,8 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, { if (stream->walmethod->sync(walfile) != 0) { - fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), - progname, current_walfile_name, stream->walmethod->getlasterror()); + pg_log_error("could not fsync file \"%s\": %s", + current_walfile_name, stream->walmethod->getlasterror()); goto error; } lastFlushPosition = blockpos; @@ -855,8 +833,8 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, } else { - fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), - progname, copybuf[0]); + pg_log_error("unrecognized streaming header: \"%c\"", + copybuf[0]); goto error; } @@ -895,8 +873,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) connsocket = PQsocket(conn); if (connsocket < 0) { - fprintf(stderr, _("%s: invalid socket: %s"), progname, - PQerrorMessage(conn)); + pg_log_error("invalid socket: %s", PQerrorMessage(conn)); return -1; } @@ -924,8 +901,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) { if (errno == EINTR) return 0; /* Got a signal, so not an error */ - fprintf(stderr, _("%s: select() failed: %s\n"), - progname, strerror(errno)); + pg_log_error("select() failed: %m"); return -1; } if (ret > 0 && FD_ISSET(connsocket, &input_mask)) @@ -975,9 +951,8 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, /* Now there is actually data on the socket */ if (PQconsumeInput(conn) == 0) { - fprintf(stderr, - _("%s: could not receive data from WAL stream: %s"), - progname, PQerrorMessage(conn)); + pg_log_error("could not receive data from WAL stream: %s", + PQerrorMessage(conn)); return -1; } @@ -990,8 +965,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, return -2; if (rawlen == -2) { - fprintf(stderr, _("%s: could not read COPY data: %s"), - progname, PQerrorMessage(conn)); + pg_log_error("could not read COPY data: %s", PQerrorMessage(conn)); return -1; } @@ -1021,8 +995,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, if (len < pos + 1) { - fprintf(stderr, _("%s: streaming header too small: %d\n"), - progname, len); + pg_log_error("streaming header too small: %d", len); return false; } replyRequested = copybuf[pos]; @@ -1042,8 +1015,8 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, */ if (stream->walmethod->sync(walfile) != 0) { - fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), - progname, current_walfile_name, stream->walmethod->getlasterror()); + pg_log_error("could not fsync file \"%s\": %s", + current_walfile_name, stream->walmethod->getlasterror()); return false; } lastFlushPosition = blockpos; @@ -1088,8 +1061,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, hdr_len += 8; /* sendTime */ if (len < hdr_len) { - fprintf(stderr, _("%s: streaming header too small: %d\n"), - progname, len); + pg_log_error("streaming header too small: %d", len); return false; } *blockpos = fe_recvint64(©buf[1]); @@ -1106,9 +1078,8 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, /* No file open yet */ if (xlogoff != 0) { - fprintf(stderr, - _("%s: received write-ahead log record for offset %u with no file open\n"), - progname, xlogoff); + pg_log_error("received write-ahead log record for offset %u with no file open", + xlogoff); return false; } } @@ -1117,9 +1088,8 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, /* More data in existing segment */ if (stream->walmethod->get_current_pos(walfile) != xlogoff) { - fprintf(stderr, - _("%s: got WAL data offset %08x, expected %08x\n"), - progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile)); + pg_log_error("got WAL data offset %08x, expected %08x", + xlogoff, (int) stream->walmethod->get_current_pos(walfile)); return false; } } @@ -1152,10 +1122,9 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written, bytes_to_write) != bytes_to_write) { - fprintf(stderr, - _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), - progname, bytes_to_write, current_walfile_name, - stream->walmethod->getlasterror()); + pg_log_error("could not write %u bytes to WAL file \"%s\": %s", + bytes_to_write, current_walfile_name, + stream->walmethod->getlasterror()); return false; } @@ -1178,8 +1147,8 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, { if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) { - fprintf(stderr, _("%s: could not send copy-end packet: %s"), - progname, PQerrorMessage(conn)); + pg_log_error("could not send copy-end packet: %s", + PQerrorMessage(conn)); return false; } still_sending = false; @@ -1218,9 +1187,8 @@ HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, { if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) { - fprintf(stderr, - _("%s: could not send copy-end packet: %s"), - progname, PQerrorMessage(conn)); + pg_log_error("could not send copy-end packet: %s", + PQerrorMessage(conn)); PQclear(res); return NULL; } @@ -1250,8 +1218,8 @@ CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos, } if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) { - fprintf(stderr, _("%s: could not send copy-end packet: %s"), - progname, PQerrorMessage(conn)); + pg_log_error("could not send copy-end packet: %s", + PQerrorMessage(conn)); return false; } still_sending = false; |