diff options
Diffstat (limited to 'src/bin/pg_basebackup/pg_recvlogical.c')
-rw-r--r-- | src/bin/pg_basebackup/pg_recvlogical.c | 147 |
1 files changed, 54 insertions, 93 deletions
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 10429a529d9..3c95f231a2a 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -25,6 +25,7 @@ #include "access/xlog_internal.h" #include "common/file_perm.h" #include "common/fe_memutils.h" +#include "fe_utils/logging.h" #include "getopt_long.h" #include "libpq-fe.h" #include "libpq/pqsignal.h" @@ -131,9 +132,7 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested) return true; if (verbose) - fprintf(stderr, - _("%s: confirming write up to %X/%X, flush to %X/%X (slot %s)\n"), - progname, + pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)", (uint32) (output_written_lsn >> 32), (uint32) output_written_lsn, (uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn, replication_slot); @@ -157,8 +156,8 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested) if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) { - fprintf(stderr, _("%s: could not send feedback packet: %s"), - progname, PQerrorMessage(conn)); + pg_log_error("could not send feedback packet: %s", + PQerrorMessage(conn)); return false; } @@ -193,9 +192,7 @@ OutputFsync(TimestampTz now) if (fsync(outfd) != 0) { - fprintf(stderr, - _("%s: could not fsync file \"%s\": %s\n"), - progname, outfile, strerror(errno)); + pg_log_error("could not fsync file \"%s\": %m", outfile); return false; } @@ -232,10 +229,9 @@ StreamLogicalLog(void) * Start the replication */ if (verbose) - fprintf(stderr, - _("%s: starting log streaming at %X/%X (slot %s)\n"), - progname, (uint32) (startpos >> 32), (uint32) startpos, - replication_slot); + pg_log_info("starting log streaming at %X/%X (slot %s)", + (uint32) (startpos >> 32), (uint32) startpos, + replication_slot); /* Initiate the replication stream at specified location */ appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X", @@ -265,8 +261,8 @@ StreamLogicalLog(void) res = PQexec(conn, query->data); if (PQresultStatus(res) != PGRES_COPY_BOTH) { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, query->data, PQresultErrorMessage(res)); + pg_log_error("could not send replication command \"%s\": %s", + query->data, PQresultErrorMessage(res)); PQclear(res); goto error; } @@ -274,9 +270,7 @@ StreamLogicalLog(void) resetPQExpBuffer(query); if (verbose) - fprintf(stderr, - _("%s: streaming initiated\n"), - progname); + pg_log_info("streaming initiated"); while (!time_to_abort) { @@ -340,16 +334,12 @@ StreamLogicalLog(void) S_IRUSR | S_IWUSR); if (outfd == -1) { - fprintf(stderr, - _("%s: could not open log file \"%s\": %s\n"), - progname, outfile, strerror(errno)); + pg_log_error("could not open log file \"%s\": %m", outfile); goto error; } if (fstat(outfd, &statbuf) != 0) - fprintf(stderr, - _("%s: could not stat file \"%s\": %s\n"), - progname, outfile, strerror(errno)); + pg_log_error("could not stat file \"%s\": %m", outfile); output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd); } @@ -370,9 +360,7 @@ StreamLogicalLog(void) if (PQsocket(conn) < 0) { - fprintf(stderr, - _("%s: invalid socket: %s"), - progname, PQerrorMessage(conn)); + pg_log_error("invalid socket: %s", PQerrorMessage(conn)); goto error; } @@ -425,17 +413,15 @@ StreamLogicalLog(void) } else if (r < 0) { - fprintf(stderr, _("%s: select() failed: %s\n"), - progname, strerror(errno)); + pg_log_error("select() failed: %m"); goto error; } /* Else there is actually data on the socket */ if (PQconsumeInput(conn) == 0) { - fprintf(stderr, - _("%s: could not receive data from WAL stream: %s"), - progname, PQerrorMessage(conn)); + pg_log_error("could not receive data from WAL stream: %s", + PQerrorMessage(conn)); goto error; } continue; @@ -448,8 +434,8 @@ StreamLogicalLog(void) /* Failure while reading the copy stream */ if (r == -2) { - fprintf(stderr, _("%s: could not read COPY data: %s"), - progname, PQerrorMessage(conn)); + pg_log_error("could not read COPY data: %s", + PQerrorMessage(conn)); goto error; } @@ -476,8 +462,7 @@ StreamLogicalLog(void) if (r < pos + 1) { - fprintf(stderr, _("%s: streaming header too small: %d\n"), - progname, r); + pg_log_error("streaming header too small: %d", r); goto error; } replyRequested = copybuf[pos]; @@ -512,8 +497,8 @@ StreamLogicalLog(void) } else if (copybuf[0] != 'w') { - fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), - progname, copybuf[0]); + pg_log_error("unrecognized streaming header: \"%c\"", + copybuf[0]); goto error; } @@ -528,8 +513,7 @@ StreamLogicalLog(void) hdr_len += 8; /* sendTime */ if (r < hdr_len + 1) { - fprintf(stderr, _("%s: streaming header too small: %d\n"), - progname, r); + pg_log_error("streaming header too small: %d", r); goto error; } @@ -567,10 +551,8 @@ StreamLogicalLog(void) if (ret < 0) { - fprintf(stderr, - _("%s: could not write %u bytes to log file \"%s\": %s\n"), - progname, bytes_left, outfile, - strerror(errno)); + pg_log_error("could not write %u bytes to log file \"%s\": %m", + bytes_left, outfile); goto error; } @@ -581,10 +563,8 @@ StreamLogicalLog(void) if (write(outfd, "\n", 1) != 1) { - fprintf(stderr, - _("%s: could not write %u bytes to log file \"%s\": %s\n"), - progname, 1, outfile, - strerror(errno)); + pg_log_error("could not write %u bytes to log file \"%s\": %m", + 1, outfile); goto error; } @@ -611,9 +591,8 @@ StreamLogicalLog(void) } else if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, - _("%s: unexpected termination of replication stream: %s"), - progname, PQresultErrorMessage(res)); + pg_log_error("unexpected termination of replication stream: %s", + PQresultErrorMessage(res)); goto error; } PQclear(res); @@ -626,8 +605,7 @@ StreamLogicalLog(void) OutputFsync(t); if (close(outfd) != 0) - fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, outfile, strerror(errno)); + pg_log_error("could not close file \"%s\": %m", outfile); } outfd = -1; error: @@ -705,6 +683,7 @@ main(int argc, char **argv) lo; char *db_name; + pg_logging_init(argv[0]); progname = get_progname(argv[0]); set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup")); @@ -736,8 +715,7 @@ main(int argc, char **argv) fsync_interval = atoi(optarg) * 1000; if (fsync_interval < 0) { - fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"), - progname, optarg); + pg_log_error("invalid fsync interval \"%s\"", optarg); exit(1); } break; @@ -757,8 +735,7 @@ main(int argc, char **argv) case 'p': if (atoi(optarg) <= 0) { - fprintf(stderr, _("%s: invalid port number \"%s\"\n"), - progname, optarg); + pg_log_error("invalid port number \"%s\"", optarg); exit(1); } dbport = pg_strdup(optarg); @@ -776,9 +753,7 @@ main(int argc, char **argv) case 'I': if (sscanf(optarg, "%X/%X", &hi, &lo) != 2) { - fprintf(stderr, - _("%s: could not parse start position \"%s\"\n"), - progname, optarg); + pg_log_error("could not parse start position \"%s\"", optarg); exit(1); } startpos = ((uint64) hi) << 32 | lo; @@ -786,9 +761,7 @@ main(int argc, char **argv) case 'E': if (sscanf(optarg, "%X/%X", &hi, &lo) != 2) { - fprintf(stderr, - _("%s: could not parse end position \"%s\"\n"), - progname, optarg); + pg_log_error("could not parse end position \"%s\"", optarg); exit(1); } endpos = ((uint64) hi) << 32 | lo; @@ -820,8 +793,7 @@ main(int argc, char **argv) standby_message_timeout = atoi(optarg) * 1000; if (standby_message_timeout < 0) { - fprintf(stderr, _("%s: invalid status interval \"%s\"\n"), - progname, optarg); + pg_log_error("invalid status interval \"%s\"", optarg); exit(1); } break; @@ -858,9 +830,8 @@ main(int argc, char **argv) */ if (optind < argc) { - fprintf(stderr, - _("%s: too many command-line arguments (first is \"%s\")\n"), - progname, argv[optind]); + pg_log_error("too many command-line arguments (first is \"%s\")", + argv[optind]); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit(1); @@ -871,7 +842,7 @@ main(int argc, char **argv) */ if (replication_slot == NULL) { - fprintf(stderr, _("%s: no slot specified\n"), progname); + pg_log_error("no slot specified"); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit(1); @@ -879,7 +850,7 @@ main(int argc, char **argv) if (do_start_slot && outfile == NULL) { - fprintf(stderr, _("%s: no target file specified\n"), progname); + pg_log_error("no target file specified"); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit(1); @@ -887,7 +858,7 @@ main(int argc, char **argv) if (!do_drop_slot && dbname == NULL) { - fprintf(stderr, _("%s: no database specified\n"), progname); + pg_log_error("no database specified"); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit(1); @@ -895,7 +866,7 @@ main(int argc, char **argv) if (!do_drop_slot && !do_create_slot && !do_start_slot) { - fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname); + pg_log_error("at least one action needs to be specified"); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit(1); @@ -903,7 +874,7 @@ main(int argc, char **argv) if (do_drop_slot && (do_create_slot || do_start_slot)) { - fprintf(stderr, _("%s: cannot use --create-slot or --start together with --drop-slot\n"), progname); + pg_log_error("cannot use --create-slot or --start together with --drop-slot"); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit(1); @@ -911,7 +882,7 @@ main(int argc, char **argv) if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot)) { - fprintf(stderr, _("%s: cannot use --create-slot or --drop-slot together with --startpos\n"), progname); + pg_log_error("cannot use --create-slot or --drop-slot together with --startpos"); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit(1); @@ -919,9 +890,7 @@ main(int argc, char **argv) if (endpos != InvalidXLogRecPtr && !do_start_slot) { - fprintf(stderr, - _("%s: --endpos may only be specified with --start\n"), - progname); + pg_log_error("--endpos may only be specified with --start"); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit(1); @@ -952,9 +921,7 @@ main(int argc, char **argv) if (db_name == NULL) { - fprintf(stderr, - _("%s: could not establish database-specific replication connection\n"), - progname); + pg_log_error("could not establish database-specific replication connection"); exit(1); } @@ -972,9 +939,7 @@ main(int argc, char **argv) if (do_drop_slot) { if (verbose) - fprintf(stderr, - _("%s: dropping replication slot \"%s\"\n"), - progname, replication_slot); + pg_log_info("dropping replication slot \"%s\"", replication_slot); if (!DropReplicationSlot(conn, replication_slot)) exit(1); @@ -984,9 +949,7 @@ main(int argc, char **argv) if (do_create_slot) { if (verbose) - fprintf(stderr, - _("%s: creating replication slot \"%s\"\n"), - progname, replication_slot); + pg_log_info("creating replication slot \"%s\"", replication_slot); if (!CreateReplicationSlot(conn, replication_slot, plugin, false, false, false, slot_exists_ok)) @@ -1011,15 +974,14 @@ main(int argc, char **argv) } else if (noloop) { - fprintf(stderr, _("%s: disconnected\n"), progname); + pg_log_error("disconnected"); exit(1); } else { - fprintf(stderr, /* translator: check source for value for %d */ - _("%s: disconnected; waiting %d seconds to try again\n"), - progname, RECONNECT_SLEEP_TIME); + pg_log_info("disconnected; waiting %d seconds to try again", + RECONNECT_SLEEP_TIME); pg_usleep(RECONNECT_SLEEP_TIME * 1000000); } } @@ -1058,12 +1020,11 @@ prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr l if (verbose) { if (keepalive) - fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n", - progname, + pg_log_info("endpos %X/%X reached by keepalive", (uint32) (endpos >> 32), (uint32) endpos); else - fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n", - progname, (uint32) (endpos >> 32), (uint32) (endpos), + pg_log_info("endpos %X/%X reached by record at %X/%X", + (uint32) (endpos >> 32), (uint32) (endpos), (uint32) (lsn >> 32), (uint32) lsn); } |