aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/pg_recvlogical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/pg_recvlogical.c')
-rw-r--r--src/bin/pg_basebackup/pg_recvlogical.c147
1 files changed, 54 insertions, 93 deletions
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 10429a529d9..3c95f231a2a 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -25,6 +25,7 @@
#include "access/xlog_internal.h"
#include "common/file_perm.h"
#include "common/fe_memutils.h"
+#include "fe_utils/logging.h"
#include "getopt_long.h"
#include "libpq-fe.h"
#include "libpq/pqsignal.h"
@@ -131,9 +132,7 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
return true;
if (verbose)
- fprintf(stderr,
- _("%s: confirming write up to %X/%X, flush to %X/%X (slot %s)\n"),
- progname,
+ pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
(uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
(uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
replication_slot);
@@ -157,8 +156,8 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
{
- fprintf(stderr, _("%s: could not send feedback packet: %s"),
- progname, PQerrorMessage(conn));
+ pg_log_error("could not send feedback packet: %s",
+ PQerrorMessage(conn));
return false;
}
@@ -193,9 +192,7 @@ OutputFsync(TimestampTz now)
if (fsync(outfd) != 0)
{
- fprintf(stderr,
- _("%s: could not fsync file \"%s\": %s\n"),
- progname, outfile, strerror(errno));
+ pg_log_error("could not fsync file \"%s\": %m", outfile);
return false;
}
@@ -232,10 +229,9 @@ StreamLogicalLog(void)
* Start the replication
*/
if (verbose)
- fprintf(stderr,
- _("%s: starting log streaming at %X/%X (slot %s)\n"),
- progname, (uint32) (startpos >> 32), (uint32) startpos,
- replication_slot);
+ pg_log_info("starting log streaming at %X/%X (slot %s)",
+ (uint32) (startpos >> 32), (uint32) startpos,
+ replication_slot);
/* Initiate the replication stream at specified location */
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
@@ -265,8 +261,8 @@ StreamLogicalLog(void)
res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, query->data, PQresultErrorMessage(res));
+ pg_log_error("could not send replication command \"%s\": %s",
+ query->data, PQresultErrorMessage(res));
PQclear(res);
goto error;
}
@@ -274,9 +270,7 @@ StreamLogicalLog(void)
resetPQExpBuffer(query);
if (verbose)
- fprintf(stderr,
- _("%s: streaming initiated\n"),
- progname);
+ pg_log_info("streaming initiated");
while (!time_to_abort)
{
@@ -340,16 +334,12 @@ StreamLogicalLog(void)
S_IRUSR | S_IWUSR);
if (outfd == -1)
{
- fprintf(stderr,
- _("%s: could not open log file \"%s\": %s\n"),
- progname, outfile, strerror(errno));
+ pg_log_error("could not open log file \"%s\": %m", outfile);
goto error;
}
if (fstat(outfd, &statbuf) != 0)
- fprintf(stderr,
- _("%s: could not stat file \"%s\": %s\n"),
- progname, outfile, strerror(errno));
+ pg_log_error("could not stat file \"%s\": %m", outfile);
output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
}
@@ -370,9 +360,7 @@ StreamLogicalLog(void)
if (PQsocket(conn) < 0)
{
- fprintf(stderr,
- _("%s: invalid socket: %s"),
- progname, PQerrorMessage(conn));
+ pg_log_error("invalid socket: %s", PQerrorMessage(conn));
goto error;
}
@@ -425,17 +413,15 @@ StreamLogicalLog(void)
}
else if (r < 0)
{
- fprintf(stderr, _("%s: select() failed: %s\n"),
- progname, strerror(errno));
+ pg_log_error("select() failed: %m");
goto error;
}
/* Else there is actually data on the socket */
if (PQconsumeInput(conn) == 0)
{
- fprintf(stderr,
- _("%s: could not receive data from WAL stream: %s"),
- progname, PQerrorMessage(conn));
+ pg_log_error("could not receive data from WAL stream: %s",
+ PQerrorMessage(conn));
goto error;
}
continue;
@@ -448,8 +434,8 @@ StreamLogicalLog(void)
/* Failure while reading the copy stream */
if (r == -2)
{
- fprintf(stderr, _("%s: could not read COPY data: %s"),
- progname, PQerrorMessage(conn));
+ pg_log_error("could not read COPY data: %s",
+ PQerrorMessage(conn));
goto error;
}
@@ -476,8 +462,7 @@ StreamLogicalLog(void)
if (r < pos + 1)
{
- fprintf(stderr, _("%s: streaming header too small: %d\n"),
- progname, r);
+ pg_log_error("streaming header too small: %d", r);
goto error;
}
replyRequested = copybuf[pos];
@@ -512,8 +497,8 @@ StreamLogicalLog(void)
}
else if (copybuf[0] != 'w')
{
- fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
- progname, copybuf[0]);
+ pg_log_error("unrecognized streaming header: \"%c\"",
+ copybuf[0]);
goto error;
}
@@ -528,8 +513,7 @@ StreamLogicalLog(void)
hdr_len += 8; /* sendTime */
if (r < hdr_len + 1)
{
- fprintf(stderr, _("%s: streaming header too small: %d\n"),
- progname, r);
+ pg_log_error("streaming header too small: %d", r);
goto error;
}
@@ -567,10 +551,8 @@ StreamLogicalLog(void)
if (ret < 0)
{
- fprintf(stderr,
- _("%s: could not write %u bytes to log file \"%s\": %s\n"),
- progname, bytes_left, outfile,
- strerror(errno));
+ pg_log_error("could not write %u bytes to log file \"%s\": %m",
+ bytes_left, outfile);
goto error;
}
@@ -581,10 +563,8 @@ StreamLogicalLog(void)
if (write(outfd, "\n", 1) != 1)
{
- fprintf(stderr,
- _("%s: could not write %u bytes to log file \"%s\": %s\n"),
- progname, 1, outfile,
- strerror(errno));
+ pg_log_error("could not write %u bytes to log file \"%s\": %m",
+ 1, outfile);
goto error;
}
@@ -611,9 +591,8 @@ StreamLogicalLog(void)
}
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
- fprintf(stderr,
- _("%s: unexpected termination of replication stream: %s"),
- progname, PQresultErrorMessage(res));
+ pg_log_error("unexpected termination of replication stream: %s",
+ PQresultErrorMessage(res));
goto error;
}
PQclear(res);
@@ -626,8 +605,7 @@ StreamLogicalLog(void)
OutputFsync(t);
if (close(outfd) != 0)
- fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
- progname, outfile, strerror(errno));
+ pg_log_error("could not close file \"%s\": %m", outfile);
}
outfd = -1;
error:
@@ -705,6 +683,7 @@ main(int argc, char **argv)
lo;
char *db_name;
+ pg_logging_init(argv[0]);
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
@@ -736,8 +715,7 @@ main(int argc, char **argv)
fsync_interval = atoi(optarg) * 1000;
if (fsync_interval < 0)
{
- fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
- progname, optarg);
+ pg_log_error("invalid fsync interval \"%s\"", optarg);
exit(1);
}
break;
@@ -757,8 +735,7 @@ main(int argc, char **argv)
case 'p':
if (atoi(optarg) <= 0)
{
- fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
- progname, optarg);
+ pg_log_error("invalid port number \"%s\"", optarg);
exit(1);
}
dbport = pg_strdup(optarg);
@@ -776,9 +753,7 @@ main(int argc, char **argv)
case 'I':
if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
{
- fprintf(stderr,
- _("%s: could not parse start position \"%s\"\n"),
- progname, optarg);
+ pg_log_error("could not parse start position \"%s\"", optarg);
exit(1);
}
startpos = ((uint64) hi) << 32 | lo;
@@ -786,9 +761,7 @@ main(int argc, char **argv)
case 'E':
if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
{
- fprintf(stderr,
- _("%s: could not parse end position \"%s\"\n"),
- progname, optarg);
+ pg_log_error("could not parse end position \"%s\"", optarg);
exit(1);
}
endpos = ((uint64) hi) << 32 | lo;
@@ -820,8 +793,7 @@ main(int argc, char **argv)
standby_message_timeout = atoi(optarg) * 1000;
if (standby_message_timeout < 0)
{
- fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
- progname, optarg);
+ pg_log_error("invalid status interval \"%s\"", optarg);
exit(1);
}
break;
@@ -858,9 +830,8 @@ main(int argc, char **argv)
*/
if (optind < argc)
{
- fprintf(stderr,
- _("%s: too many command-line arguments (first is \"%s\")\n"),
- progname, argv[optind]);
+ pg_log_error("too many command-line arguments (first is \"%s\")",
+ argv[optind]);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
@@ -871,7 +842,7 @@ main(int argc, char **argv)
*/
if (replication_slot == NULL)
{
- fprintf(stderr, _("%s: no slot specified\n"), progname);
+ pg_log_error("no slot specified");
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
@@ -879,7 +850,7 @@ main(int argc, char **argv)
if (do_start_slot && outfile == NULL)
{
- fprintf(stderr, _("%s: no target file specified\n"), progname);
+ pg_log_error("no target file specified");
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
@@ -887,7 +858,7 @@ main(int argc, char **argv)
if (!do_drop_slot && dbname == NULL)
{
- fprintf(stderr, _("%s: no database specified\n"), progname);
+ pg_log_error("no database specified");
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
@@ -895,7 +866,7 @@ main(int argc, char **argv)
if (!do_drop_slot && !do_create_slot && !do_start_slot)
{
- fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname);
+ pg_log_error("at least one action needs to be specified");
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
@@ -903,7 +874,7 @@ main(int argc, char **argv)
if (do_drop_slot && (do_create_slot || do_start_slot))
{
- fprintf(stderr, _("%s: cannot use --create-slot or --start together with --drop-slot\n"), progname);
+ pg_log_error("cannot use --create-slot or --start together with --drop-slot");
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
@@ -911,7 +882,7 @@ main(int argc, char **argv)
if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
{
- fprintf(stderr, _("%s: cannot use --create-slot or --drop-slot together with --startpos\n"), progname);
+ pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
@@ -919,9 +890,7 @@ main(int argc, char **argv)
if (endpos != InvalidXLogRecPtr && !do_start_slot)
{
- fprintf(stderr,
- _("%s: --endpos may only be specified with --start\n"),
- progname);
+ pg_log_error("--endpos may only be specified with --start");
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
@@ -952,9 +921,7 @@ main(int argc, char **argv)
if (db_name == NULL)
{
- fprintf(stderr,
- _("%s: could not establish database-specific replication connection\n"),
- progname);
+ pg_log_error("could not establish database-specific replication connection");
exit(1);
}
@@ -972,9 +939,7 @@ main(int argc, char **argv)
if (do_drop_slot)
{
if (verbose)
- fprintf(stderr,
- _("%s: dropping replication slot \"%s\"\n"),
- progname, replication_slot);
+ pg_log_info("dropping replication slot \"%s\"", replication_slot);
if (!DropReplicationSlot(conn, replication_slot))
exit(1);
@@ -984,9 +949,7 @@ main(int argc, char **argv)
if (do_create_slot)
{
if (verbose)
- fprintf(stderr,
- _("%s: creating replication slot \"%s\"\n"),
- progname, replication_slot);
+ pg_log_info("creating replication slot \"%s\"", replication_slot);
if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
false, false, slot_exists_ok))
@@ -1011,15 +974,14 @@ main(int argc, char **argv)
}
else if (noloop)
{
- fprintf(stderr, _("%s: disconnected\n"), progname);
+ pg_log_error("disconnected");
exit(1);
}
else
{
- fprintf(stderr,
/* translator: check source for value for %d */
- _("%s: disconnected; waiting %d seconds to try again\n"),
- progname, RECONNECT_SLEEP_TIME);
+ pg_log_info("disconnected; waiting %d seconds to try again",
+ RECONNECT_SLEEP_TIME);
pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
}
}
@@ -1058,12 +1020,11 @@ prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr l
if (verbose)
{
if (keepalive)
- fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
- progname,
+ pg_log_info("endpos %X/%X reached by keepalive",
(uint32) (endpos >> 32), (uint32) endpos);
else
- fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
- progname, (uint32) (endpos >> 32), (uint32) (endpos),
+ pg_log_info("endpos %X/%X reached by record at %X/%X",
+ (uint32) (endpos >> 32), (uint32) (endpos),
(uint32) (lsn >> 32), (uint32) lsn);
}