aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/win/pipe.c205
1 files changed, 113 insertions, 92 deletions
diff --git a/src/win/pipe.c b/src/win/pipe.c
index fc7fb410..8d5d0902 100644
--- a/src/win/pipe.c
+++ b/src/win/pipe.c
@@ -667,15 +667,10 @@ void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
}
handle->pipe.conn.ipc_xfer_queue_length = 0;
- if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
- if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
- UnregisterWait(handle->read_req.wait_handle);
- handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
- }
- if (handle->read_req.event_handle != NULL) {
- CloseHandle(handle->read_req.event_handle);
- handle->read_req.event_handle = NULL;
- }
+ assert(handle->read_req.wait_handle == INVALID_HANDLE_VALUE);
+ if (handle->read_req.event_handle != NULL) {
+ CloseHandle(handle->read_req.event_handle);
+ handle->read_req.event_handle = NULL;
}
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
@@ -1417,13 +1412,12 @@ static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
- if (req->wait_handle == INVALID_HANDLE_VALUE) {
- if (!RegisterWaitForSingleObject(&req->wait_handle,
- req->event_handle, post_completion_read_wait, (void*) req,
- INFINITE, WT_EXECUTEINWAITTHREAD)) {
- SET_REQ_ERROR(req, GetLastError());
- goto error;
- }
+ assert(req->wait_handle == INVALID_HANDLE_VALUE);
+ if (!RegisterWaitForSingleObject(&req->wait_handle,
+ req->event_handle, post_completion_read_wait, (void*) req,
+ INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
+ SET_REQ_ERROR(req, GetLastError());
+ goto error;
}
}
}
@@ -1451,16 +1445,16 @@ int uv__pipe_read_start(uv_pipe_t* handle,
handle->read_cb = read_cb;
handle->alloc_cb = alloc_cb;
+ if (handle->read_req.event_handle == NULL) {
+ handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
+ if (handle->read_req.event_handle == NULL) {
+ uv_fatal_error(GetLastError(), "CreateEvent");
+ }
+ }
+
/* If reading was stopped and then started again, there could still be a read
* request pending. */
if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
- if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
- handle->read_req.event_handle == NULL) {
- handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
- if (handle->read_req.event_handle == NULL) {
- uv_fatal_error(GetLastError(), "CreateEvent");
- }
- }
uv__pipe_queue_read(loop, handle);
}
@@ -1713,7 +1707,7 @@ static int uv__pipe_write_data(uv_loop_t* loop,
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_completion_write_wait, (void*) req,
- INFINITE, WT_EXECUTEINWAITTHREAD)) {
+ INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
return GetLastError();
}
}
@@ -1889,7 +1883,7 @@ static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
- int error, uv_buf_t buf) {
+ DWORD error, uv_buf_t buf) {
if (error == ERROR_BROKEN_PIPE) {
uv__pipe_read_eof(loop, handle, buf);
} else {
@@ -1919,17 +1913,25 @@ static void uv__pipe_queue_ipc_xfer_info(
/* Read an exact number of bytes from a pipe. If an error or end-of-file is
* encountered before the requested number of bytes are read, an error is
* returned. */
-static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
- DWORD bytes_read, bytes_read_now;
+static DWORD uv__pipe_read_exactly(uv_pipe_t* handle, void* buffer, DWORD count) {
+ uv_read_t* req;
+ DWORD bytes_read;
+ DWORD bytes_read_now;
bytes_read = 0;
while (bytes_read < count) {
- if (!ReadFile(h,
+ req = &handle->read_req;
+ memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
+ req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
+ if (!ReadFile(handle->handle,
(char*) buffer + bytes_read,
count - bytes_read,
&bytes_read_now,
- NULL)) {
- return GetLastError();
+ &req->u.io.overlapped)) {
+ if (GetLastError() != ERROR_IO_PENDING)
+ return GetLastError();
+ if (!GetOverlappedResult(handle->handle, &req->u.io.overlapped, &bytes_read_now, TRUE))
+ return GetLastError();
}
bytes_read += bytes_read_now;
@@ -1940,16 +1942,17 @@ static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
}
-static DWORD uv__pipe_read_data(uv_loop_t* loop,
- uv_pipe_t* handle,
- DWORD suggested_bytes,
- DWORD max_bytes) {
- DWORD bytes_read;
+static int uv__pipe_read_data(uv_loop_t* loop,
+ uv_pipe_t* handle,
+ DWORD* bytes_read,
+ DWORD max_bytes) {
uv_buf_t buf;
+ uv_read_t* req;
+ DWORD r;
/* Ask the user for a buffer to read data into. */
buf = uv_buf_init(NULL, 0);
- handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
+ handle->alloc_cb((uv_handle_t*) handle, *bytes_read, &buf);
if (buf.base == NULL || buf.len == 0) {
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
return 0; /* Break out of read loop. */
@@ -1962,29 +1965,50 @@ static DWORD uv__pipe_read_data(uv_loop_t* loop,
if (max_bytes > buf.len)
max_bytes = buf.len;
- /* Read into the user buffer. */
- if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
- uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
- return 0; /* Break out of read loop. */
+ /* Read into the user buffer.
+ * Prepare an Event so that we can cancel if it doesn't complete immediately.
+ */
+ req = &handle->read_req;
+ memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
+ req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
+ if (ReadFile(handle->handle, buf.base, max_bytes, bytes_read, &req->u.io.overlapped)) {
+ r = ERROR_SUCCESS;
+ } else {
+ r = GetLastError();
+ *bytes_read = 0;
+ if (r == ERROR_IO_PENDING) {
+ r = CancelIoEx(handle->handle, &req->u.io.overlapped);
+ assert(r || GetLastError() == ERROR_NOT_FOUND);
+ if (!GetOverlappedResult(handle->handle, &req->u.io.overlapped, bytes_read, TRUE)) {
+ r = GetLastError();
+ *bytes_read = 0;
+ }
+ }
}
/* Call the read callback. */
- handle->read_cb((uv_stream_t*) handle, bytes_read, &buf);
+ if (r == ERROR_SUCCESS || r == ERROR_OPERATION_ABORTED)
+ handle->read_cb((uv_stream_t*) handle, *bytes_read, &buf);
+ else
+ uv__pipe_read_error_or_eof(loop, handle, r, buf);
- return bytes_read;
+ return *bytes_read == max_bytes;
}
-static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
- uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
- int err;
+static int uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
+ uint32_t* data_remaining;
+ DWORD err;
+ DWORD more;
+ DWORD bytes_read;
+
+ data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
if (*data_remaining > 0) {
/* Read frame data payload. */
- DWORD bytes_read =
- uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
+ bytes_read = *data_remaining;
+ more = uv__pipe_read_data(loop, handle, &bytes_read, bytes_read);
*data_remaining -= bytes_read;
- return bytes_read;
} else {
/* Start of a new IPC frame. */
@@ -1995,7 +2019,7 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
/* Read the IPC frame header. */
err = uv__pipe_read_exactly(
- handle->handle, &frame_header, sizeof frame_header);
+ handle, &frame_header, sizeof frame_header);
if (err)
goto error;
@@ -2031,21 +2055,28 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
/* If no socket xfer info follows, return here. Data will be read in a
* subsequent invocation of uv__pipe_read_ipc(). */
- if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
- return sizeof frame_header; /* Number of bytes read. */
+ if (xfer_type != UV__IPC_SOCKET_XFER_NONE) {
+ /* Read transferred socket information. */
+ err = uv__pipe_read_exactly(handle, &xfer_info, sizeof xfer_info);
+ if (err)
+ goto error;
- /* Read transferred socket information. */
- err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
- if (err)
- goto error;
+ /* Store the pending socket info. */
+ uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
+ }
- /* Store the pending socket info. */
- uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
+ more = 1;
+ }
- /* Return number of bytes read. */
- return sizeof frame_header + sizeof xfer_info;
+ /* Return whether the caller should immediately try another read call to get
+ * more data. */
+ if (more && *data_remaining == 0) {
+ /* TODO: use PeekNamedPipe to see if it is really worth trying to do
+ * another ReadFile call. */
}
+ return more;
+
invalid:
/* Invalid frame. */
err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
@@ -2059,12 +2090,20 @@ error:
void uv__process_pipe_read_req(uv_loop_t* loop,
uv_pipe_t* handle,
uv_req_t* req) {
+ DWORD err;
+ DWORD more;
+ DWORD bytes_requested;
assert(handle->type == UV_NAMED_PIPE);
handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
DECREASE_PENDING_REQ_COUNT(handle);
eof_timer_stop(handle);
+ if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
+ UnregisterWait(handle->read_req.wait_handle);
+ handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
+ }
+
/* At this point, we're done with bookkeeping. If the user has stopped
* reading the pipe in the meantime, there is nothing left to do, since there
* is no callback that we can call. */
@@ -2073,7 +2112,7 @@ void uv__process_pipe_read_req(uv_loop_t* loop,
if (!REQ_SUCCESS(req)) {
/* An error occurred doing the zero-read. */
- DWORD err = GET_REQ_ERROR(req);
+ err = GET_REQ_ERROR(req);
/* If the read was cancelled by uv__pipe_interrupt_read(), the request may
* indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
@@ -2084,34 +2123,18 @@ void uv__process_pipe_read_req(uv_loop_t* loop,
} else {
/* The zero-read completed without error, indicating there is data
* available in the kernel buffer. */
- DWORD avail;
-
- /* Get the number of bytes available. */
- avail = 0;
- if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
- uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
-
- /* Read until we've either read all the bytes available, or the 'reading'
- * flag is cleared. */
- while (avail > 0 && handle->flags & UV_HANDLE_READING) {
+ while (handle->flags & UV_HANDLE_READING) {
+ bytes_requested = 65536;
/* Depending on the type of pipe, read either IPC frames or raw data. */
- DWORD bytes_read =
- handle->ipc ? uv__pipe_read_ipc(loop, handle)
- : uv__pipe_read_data(loop, handle, avail, (DWORD) -1);
+ if (handle->ipc)
+ more = uv__pipe_read_ipc(loop, handle);
+ else
+ more = uv__pipe_read_data(loop, handle, &bytes_requested, INT32_MAX);
/* If no bytes were read, treat this as an indication that an error
* occurred, and break out of the read loop. */
- if (bytes_read == 0)
+ if (more == 0)
break;
-
- /* It is possible that more bytes were read than we thought were
- * available. To prevent `avail` from underflowing, break out of the loop
- * if this is the case. */
- if (bytes_read > avail)
- break;
-
- /* Recompute the number of bytes available. */
- avail -= bytes_read;
}
}
@@ -2134,15 +2157,13 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
UNREGISTER_HANDLE_REQ(loop, handle);
- if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
- if (req->wait_handle != INVALID_HANDLE_VALUE) {
- UnregisterWait(req->wait_handle);
- req->wait_handle = INVALID_HANDLE_VALUE;
- }
- if (req->event_handle) {
- CloseHandle(req->event_handle);
- req->event_handle = NULL;
- }
+ if (req->wait_handle != INVALID_HANDLE_VALUE) {
+ UnregisterWait(req->wait_handle);
+ req->wait_handle = INVALID_HANDLE_VALUE;
+ }
+ if (req->event_handle) {
+ CloseHandle(req->event_handle);
+ req->event_handle = NULL;
}
err = GET_REQ_ERROR(req);