aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/receivelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r--src/bin/pg_basebackup/receivelog.c204
1 files changed, 86 insertions, 118 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 692d13716e4..0ff9aa19a9e 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -27,6 +27,7 @@
#include "libpq-fe.h"
#include "access/xlog_internal.h"
#include "common/file_utils.h"
+#include "fe_utils/logging.h"
/* fd and filename for currently open WAL file */
@@ -68,8 +69,8 @@ mark_file_as_archived(StreamCtl *stream, const char *fname)
f = stream->walmethod->open_for_write(tmppath, NULL, 0);
if (f == NULL)
{
- fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
- progname, tmppath, stream->walmethod->getlasterror());
+ pg_log_error("could not create archive status file \"%s\": %s",
+ tmppath, stream->walmethod->getlasterror());
return false;
}
@@ -115,9 +116,8 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
size = stream->walmethod->get_file_size(fn);
if (size < 0)
{
- fprintf(stderr,
- _("%s: could not get size of write-ahead log file \"%s\": %s\n"),
- progname, fn, stream->walmethod->getlasterror());
+ pg_log_error("could not get size of write-ahead log file \"%s\": %s",
+ fn, stream->walmethod->getlasterror());
return false;
}
if (size == WalSegSz)
@@ -126,18 +126,16 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
if (f == NULL)
{
- fprintf(stderr,
- _("%s: could not open existing write-ahead log file \"%s\": %s\n"),
- progname, fn, stream->walmethod->getlasterror());
+ pg_log_error("could not open existing write-ahead log file \"%s\": %s",
+ fn, stream->walmethod->getlasterror());
return false;
}
/* fsync file in case of a previous crash */
if (stream->walmethod->sync(f) != 0)
{
- fprintf(stderr,
- _("%s: could not fsync existing write-ahead log file \"%s\": %s\n"),
- progname, fn, stream->walmethod->getlasterror());
+ pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
+ fn, stream->walmethod->getlasterror());
stream->walmethod->close(f, CLOSE_UNLINK);
return false;
}
@@ -150,11 +148,10 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
/* if write didn't set errno, assume problem is no disk space */
if (errno == 0)
errno = ENOSPC;
- fprintf(stderr,
- ngettext("%s: write-ahead log file \"%s\" has %d byte, should be 0 or %d\n",
- "%s: write-ahead log file \"%s\" has %d bytes, should be 0 or %d\n",
- size),
- progname, fn, (int) size, WalSegSz);
+ pg_log_error(ngettext("write-ahead log file \"%s\" has %d byte, should be 0 or %d",
+ "write-ahead log file \"%s\" has %d bytes, should be 0 or %d",
+ size),
+ fn, (int) size, WalSegSz);
return false;
}
/* File existed and was empty, so fall through and open */
@@ -166,9 +163,8 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
stream->partial_suffix, WalSegSz);
if (f == NULL)
{
- fprintf(stderr,
- _("%s: could not open write-ahead log file \"%s\": %s\n"),
- progname, fn, stream->walmethod->getlasterror());
+ pg_log_error("could not open write-ahead log file \"%s\": %s",
+ fn, stream->walmethod->getlasterror());
return false;
}
@@ -193,9 +189,8 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
currpos = stream->walmethod->get_current_pos(walfile);
if (currpos == -1)
{
- fprintf(stderr,
- _("%s: could not determine seek position in file \"%s\": %s\n"),
- progname, current_walfile_name, stream->walmethod->getlasterror());
+ pg_log_error("could not determine seek position in file \"%s\": %s",
+ current_walfile_name, stream->walmethod->getlasterror());
stream->walmethod->close(walfile, CLOSE_UNLINK);
walfile = NULL;
@@ -208,9 +203,8 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
r = stream->walmethod->close(walfile, CLOSE_NORMAL);
else
{
- fprintf(stderr,
- _("%s: not renaming \"%s%s\", segment is not complete\n"),
- progname, current_walfile_name, stream->partial_suffix);
+ pg_log_info("not renaming \"%s%s\", segment is not complete",
+ current_walfile_name, stream->partial_suffix);
r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
}
}
@@ -221,8 +215,8 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
if (r != 0)
{
- fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
- progname, current_walfile_name, stream->walmethod->getlasterror());
+ pg_log_error("could not close file \"%s\": %s",
+ current_walfile_name, stream->walmethod->getlasterror());
return false;
}
@@ -278,23 +272,23 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
TLHistoryFileName(histfname, stream->timeline);
if (strcmp(histfname, filename) != 0)
{
- fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
- progname, stream->timeline, filename);
+ pg_log_error("server reported unexpected history file name for timeline %u: %s",
+ stream->timeline, filename);
return false;
}
f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
if (f == NULL)
{
- fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
- progname, histfname, stream->walmethod->getlasterror());
+ pg_log_error("could not create timeline history file \"%s\": %s",
+ histfname, stream->walmethod->getlasterror());
return false;
}
if ((int) stream->walmethod->write(f, content, size) != size)
{
- fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
- progname, histfname, stream->walmethod->getlasterror());
+ pg_log_error("could not write timeline history file \"%s\": %s",
+ histfname, stream->walmethod->getlasterror());
/*
* If we fail to make the file, delete it to release disk space
@@ -306,8 +300,8 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
{
- fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
- progname, histfname, stream->walmethod->getlasterror());
+ pg_log_error("could not close file \"%s\": %s",
+ histfname, stream->walmethod->getlasterror());
return false;
}
@@ -349,8 +343,8 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyReque
if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
{
- fprintf(stderr, _("%s: could not send feedback packet: %s"),
- progname, PQerrorMessage(conn));
+ pg_log_error("could not send feedback packet: %s",
+ PQerrorMessage(conn));
return false;
}
@@ -383,8 +377,7 @@ CheckServerVersionForStreaming(PGconn *conn)
{
const char *serverver = PQparameterStatus(conn, "server_version");
- fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"),
- progname,
+ pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
serverver ? serverver : "'unknown'",
"9.3");
return false;
@@ -393,8 +386,7 @@ CheckServerVersionForStreaming(PGconn *conn)
{
const char *serverver = PQparameterStatus(conn, "server_version");
- fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"),
- progname,
+ pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
serverver ? serverver : "'unknown'",
PG_VERSION);
return false;
@@ -489,33 +481,28 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
res = PQexec(conn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- fprintf(stderr,
- _("%s: could not send replication command \"%s\": %s"),
- progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+ pg_log_error("could not send replication command \"%s\": %s",
+ "IDENTIFY_SYSTEM", PQerrorMessage(conn));
PQclear(res);
return false;
}
if (PQntuples(res) != 1 || PQnfields(res) < 3)
{
- fprintf(stderr,
- _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
- progname, PQntuples(res), PQnfields(res), 1, 3);
+ pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
+ PQntuples(res), PQnfields(res), 1, 3);
PQclear(res);
return false;
}
if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
{
- fprintf(stderr,
- _("%s: system identifier does not match between base backup and streaming connection\n"),
- progname);
+ pg_log_error("system identifier does not match between base backup and streaming connection");
PQclear(res);
return false;
}
if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
{
- fprintf(stderr,
- _("%s: starting timeline %u is not present in the server\n"),
- progname, stream->timeline);
+ pg_log_error("starting timeline %u is not present in the server",
+ stream->timeline);
PQclear(res);
return false;
}
@@ -543,8 +530,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
/* FIXME: we might send it ok, but get an error */
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
+ pg_log_error("could not send replication command \"%s\": %s",
+ "TIMELINE_HISTORY", PQresultErrorMessage(res));
PQclear(res);
return false;
}
@@ -555,9 +542,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
*/
if (PQnfields(res) != 2 || PQntuples(res) != 1)
{
- fprintf(stderr,
- _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
- progname, PQntuples(res), PQnfields(res), 1, 2);
+ pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
+ PQntuples(res), PQnfields(res), 1, 2);
}
/* Write the history file to disk */
@@ -583,8 +569,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, "START_REPLICATION", PQresultErrorMessage(res));
+ pg_log_error("could not send replication command \"%s\": %s",
+ "START_REPLICATION", PQresultErrorMessage(res));
PQclear(res);
return false;
}
@@ -627,16 +613,13 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
/* Sanity check the values the server gave us */
if (newtimeline <= stream->timeline)
{
- fprintf(stderr,
- _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
- progname, newtimeline, stream->timeline);
+ pg_log_error("server reported unexpected next timeline %u, following timeline %u",
+ newtimeline, stream->timeline);
goto error;
}
if (stream->startpos > stoppos)
{
- fprintf(stderr,
- _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
- progname,
+ pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
goto error;
@@ -646,9 +629,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
- fprintf(stderr,
- _("%s: unexpected termination of replication stream: %s"),
- progname, PQresultErrorMessage(res));
+ pg_log_error("unexpected termination of replication stream: %s",
+ PQresultErrorMessage(res));
PQclear(res);
goto error;
}
@@ -677,17 +659,15 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
return true;
else
{
- fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
- progname);
+ pg_log_error("replication stream was terminated before stop point");
goto error;
}
}
else
{
/* Server returned an error. */
- fprintf(stderr,
- _("%s: unexpected termination of replication stream: %s"),
- progname, PQresultErrorMessage(res));
+ pg_log_error("unexpected termination of replication stream: %s",
+ PQresultErrorMessage(res));
PQclear(res);
goto error;
}
@@ -695,8 +675,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
error:
if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
- fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
- progname, current_walfile_name, stream->walmethod->getlasterror());
+ pg_log_error("could not close file \"%s\": %s",
+ current_walfile_name, stream->walmethod->getlasterror());
walfile = NULL;
return false;
}
@@ -725,9 +705,8 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
*/
if (PQnfields(res) < 2 || PQntuples(res) != 1)
{
- fprintf(stderr,
- _("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
- progname, PQntuples(res), PQnfields(res), 1, 2);
+ pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
+ PQntuples(res), PQnfields(res), 1, 2);
return false;
}
@@ -735,9 +714,8 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
&startpos_xrecoff) != 2)
{
- fprintf(stderr,
- _("%s: could not parse next timeline's starting point \"%s\"\n"),
- progname, PQgetvalue(res, 0, 1));
+ pg_log_error("could not parse next timeline's starting point \"%s\"",
+ PQgetvalue(res, 0, 1));
return false;
}
*startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
@@ -785,8 +763,8 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
{
if (stream->walmethod->sync(walfile) != 0)
{
- fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
- progname, current_walfile_name, stream->walmethod->getlasterror());
+ pg_log_error("could not fsync file \"%s\": %s",
+ current_walfile_name, stream->walmethod->getlasterror());
goto error;
}
lastFlushPosition = blockpos;
@@ -855,8 +833,8 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
}
else
{
- fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
- progname, copybuf[0]);
+ pg_log_error("unrecognized streaming header: \"%c\"",
+ copybuf[0]);
goto error;
}
@@ -895,8 +873,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
connsocket = PQsocket(conn);
if (connsocket < 0)
{
- fprintf(stderr, _("%s: invalid socket: %s"), progname,
- PQerrorMessage(conn));
+ pg_log_error("invalid socket: %s", PQerrorMessage(conn));
return -1;
}
@@ -924,8 +901,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
{
if (errno == EINTR)
return 0; /* Got a signal, so not an error */
- fprintf(stderr, _("%s: select() failed: %s\n"),
- progname, strerror(errno));
+ pg_log_error("select() failed: %m");
return -1;
}
if (ret > 0 && FD_ISSET(connsocket, &input_mask))
@@ -975,9 +951,8 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
/* Now there is actually data on the socket */
if (PQconsumeInput(conn) == 0)
{
- fprintf(stderr,
- _("%s: could not receive data from WAL stream: %s"),
- progname, PQerrorMessage(conn));
+ pg_log_error("could not receive data from WAL stream: %s",
+ PQerrorMessage(conn));
return -1;
}
@@ -990,8 +965,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
return -2;
if (rawlen == -2)
{
- fprintf(stderr, _("%s: could not read COPY data: %s"),
- progname, PQerrorMessage(conn));
+ pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
return -1;
}
@@ -1021,8 +995,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
if (len < pos + 1)
{
- fprintf(stderr, _("%s: streaming header too small: %d\n"),
- progname, len);
+ pg_log_error("streaming header too small: %d", len);
return false;
}
replyRequested = copybuf[pos];
@@ -1042,8 +1015,8 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
*/
if (stream->walmethod->sync(walfile) != 0)
{
- fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
- progname, current_walfile_name, stream->walmethod->getlasterror());
+ pg_log_error("could not fsync file \"%s\": %s",
+ current_walfile_name, stream->walmethod->getlasterror());
return false;
}
lastFlushPosition = blockpos;
@@ -1088,8 +1061,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
hdr_len += 8; /* sendTime */
if (len < hdr_len)
{
- fprintf(stderr, _("%s: streaming header too small: %d\n"),
- progname, len);
+ pg_log_error("streaming header too small: %d", len);
return false;
}
*blockpos = fe_recvint64(&copybuf[1]);
@@ -1106,9 +1078,8 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
/* No file open yet */
if (xlogoff != 0)
{
- fprintf(stderr,
- _("%s: received write-ahead log record for offset %u with no file open\n"),
- progname, xlogoff);
+ pg_log_error("received write-ahead log record for offset %u with no file open",
+ xlogoff);
return false;
}
}
@@ -1117,9 +1088,8 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
/* More data in existing segment */
if (stream->walmethod->get_current_pos(walfile) != xlogoff)
{
- fprintf(stderr,
- _("%s: got WAL data offset %08x, expected %08x\n"),
- progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
+ pg_log_error("got WAL data offset %08x, expected %08x",
+ xlogoff, (int) stream->walmethod->get_current_pos(walfile));
return false;
}
}
@@ -1152,10 +1122,9 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
bytes_to_write) != bytes_to_write)
{
- fprintf(stderr,
- _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
- progname, bytes_to_write, current_walfile_name,
- stream->walmethod->getlasterror());
+ pg_log_error("could not write %u bytes to WAL file \"%s\": %s",
+ bytes_to_write, current_walfile_name,
+ stream->walmethod->getlasterror());
return false;
}
@@ -1178,8 +1147,8 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
{
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
- fprintf(stderr, _("%s: could not send copy-end packet: %s"),
- progname, PQerrorMessage(conn));
+ pg_log_error("could not send copy-end packet: %s",
+ PQerrorMessage(conn));
return false;
}
still_sending = false;
@@ -1218,9 +1187,8 @@ HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
{
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
- fprintf(stderr,
- _("%s: could not send copy-end packet: %s"),
- progname, PQerrorMessage(conn));
+ pg_log_error("could not send copy-end packet: %s",
+ PQerrorMessage(conn));
PQclear(res);
return NULL;
}
@@ -1250,8 +1218,8 @@ CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
}
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
- fprintf(stderr, _("%s: could not send copy-end packet: %s"),
- progname, PQerrorMessage(conn));
+ pg_log_error("could not send copy-end packet: %s",
+ PQerrorMessage(conn));
return false;
}
still_sending = false;