aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bin/pg_basebackup/pg_basebackup.c5
-rw-r--r--src/bin/pg_basebackup/pg_receivewal.c1
-rw-r--r--src/bin/pg_basebackup/receivelog.c78
-rw-r--r--src/bin/pg_basebackup/receivelog.h3
4 files changed, 59 insertions, 28 deletions
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 40ec0e17dc5..e2a2ebb30f9 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -480,6 +480,11 @@ LogStreamerMain(logstreamer_param *param)
stream.timeline = param->timeline;
stream.sysidentifier = param->sysidentifier;
stream.stream_stop = reached_end_position;
+#ifndef WIN32
+ stream.stop_socket = bgpipe[0];
+#else
+ stream.stop_socket = PGINVALID_SOCKET;
+#endif
stream.standby_message_timeout = standby_message_timeout;
stream.synchronous = false;
stream.do_sync = do_sync;
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 1a9fe81be14..09385c5cbfc 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -409,6 +409,7 @@ StreamLog(void)
stream.timeline);
stream.stream_stop = stop_streaming;
+ stream.stop_socket = PGINVALID_SOCKET;
stream.standby_message_timeout = standby_message_timeout;
stream.synchronous = synchronous;
stream.do_sync = true;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 8511e57cf7d..c41bba28cdf 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -39,8 +39,9 @@ static bool still_sending = true; /* feedback still needs to be sent? */
static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
XLogRecPtr *stoppos);
-static int CopyStreamPoll(PGconn *conn, long timeout_ms);
-static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
+static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
+static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+ char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
int len, XLogRecPtr blockpos, TimestampTz *last_status);
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
@@ -417,8 +418,15 @@ CheckServerVersionForStreaming(PGconn *conn)
* return. As long as it returns false, streaming will continue
* indefinitely.
*
+ * If stream_stop() checks for external input, stop_socket should be set to
+ * the FD it checks. This will allow such input to be detected promptly
+ * rather than after standby_message_timeout (which might be indefinite).
+ * Note that signals will interrupt waits for input as well, but that is
+ * race-y since a signal received while busy won't interrupt the wait.
+ *
* standby_message_timeout controls how often we send a message
* back to the master letting it know our progress, in milliseconds.
+ * Zero means no messages are sent.
* This message will only contain the write location, and never
* flush or replay.
*
@@ -825,7 +833,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
last_status);
- r = CopyStreamReceive(conn, sleeptime, &copybuf);
+ r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
while (r != 0)
{
if (r == -1)
@@ -870,7 +878,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
* Process the received data, and any subsequent data we can read
* without blocking.
*/
- r = CopyStreamReceive(conn, 0, &copybuf);
+ r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
}
}
@@ -881,20 +889,25 @@ error:
}
/*
- * Wait until we can read CopyData message, or timeout.
+ * Wait until we can read a CopyData message,
+ * or timeout, or occurrence of a signal or input on the stop_socket.
+ * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
*
* Returns 1 if data has become available for reading, 0 if timed out
- * or interrupted by signal, and -1 on an error.
+ * or interrupted by signal or stop_socket input, and -1 on an error.
*/
static int
-CopyStreamPoll(PGconn *conn, long timeout_ms)
+CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
{
int ret;
fd_set input_mask;
+ int connsocket;
+ int maxfd;
struct timeval timeout;
struct timeval *timeoutptr;
- if (PQsocket(conn) < 0)
+ connsocket = PQsocket(conn);
+ if (connsocket < 0)
{
fprintf(stderr, _("%s: invalid socket: %s"), progname,
PQerrorMessage(conn));
@@ -902,7 +915,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
}
FD_ZERO(&input_mask);
- FD_SET(PQsocket(conn), &input_mask);
+ FD_SET(connsocket, &input_mask);
+ maxfd = connsocket;
+ if (stop_socket != PGINVALID_SOCKET)
+ {
+ FD_SET(stop_socket, &input_mask);
+ maxfd = Max(maxfd, stop_socket);
+ }
if (timeout_ms < 0)
timeoutptr = NULL;
@@ -913,17 +932,20 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
timeoutptr = &timeout;
}
- ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
- if (ret == 0 || (ret < 0 && errno == EINTR))
- return 0; /* Got a timeout or signal */
- else if (ret < 0)
+ ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
+
+ if (ret < 0)
{
+ if (errno == EINTR)
+ return 0; /* Got a signal, so not an error */
fprintf(stderr, _("%s: select() failed: %s\n"),
progname, strerror(errno));
return -1;
}
+ if (ret > 0 && FD_ISSET(connsocket, &input_mask))
+ return 1; /* Got input on connection socket */
- return 1;
+ return 0; /* Got timeout or input on stop_socket */
}
/*
@@ -934,11 +956,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
* point to a buffer holding the received message. The buffer is only valid
* until the next CopyStreamReceive call.
*
- * 0 if no data was available within timeout, or wait was interrupted
- * by signal. -1 on error. -2 if the server ended the COPY.
+ * Returns 0 if no data was available within timeout, or if wait was
+ * interrupted by signal or stop_socket input.
+ * -1 on error. -2 if the server ended the COPY.
*/
static int
-CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
+CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+ char **buffer)
{
char *copybuf = NULL;
int rawlen;
@@ -951,20 +975,18 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
rawlen = PQgetCopyData(conn, &copybuf, 1);
if (rawlen == 0)
{
+ int ret;
+
/*
- * No data available. Wait for some to appear, but not longer than the
- * specified timeout, so that we can ping the server.
+ * No data available. Wait for some to appear, but not longer than
+ * the specified timeout, so that we can ping the server. Also stop
+ * waiting if input appears on stop_socket.
*/
- if (timeout != 0)
- {
- int ret;
-
- ret = CopyStreamPoll(conn, timeout);
- if (ret <= 0)
- return ret;
- }
+ ret = CopyStreamPoll(conn, timeout, stop_socket);
+ if (ret <= 0)
+ return ret;
- /* Else there is actually data on the socket */
+ /* Now there is actually data on the socket */
if (PQconsumeInput(conn) == 0)
{
fprintf(stderr,
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 42e93ac7454..9a51d9a9c49 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -42,6 +42,9 @@ typedef struct StreamCtl
stream_stop_callback stream_stop; /* Stop streaming when returns true */
+ pgsocket stop_socket; /* if valid, watch for input on this socket
+ * and check stream_stop() when there is any */
+
WalWriteMethod *walmethod; /* How to write the WAL */
char *partial_suffix; /* Suffix appended to partially received files */
char *replication_slot; /* Replication slot to use, or NULL */