diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/win/pipe.c | 205 |
1 files changed, 113 insertions, 92 deletions
diff --git a/src/win/pipe.c b/src/win/pipe.c index fc7fb410..8d5d0902 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -667,15 +667,10 @@ void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { } handle->pipe.conn.ipc_xfer_queue_length = 0; - if (handle->flags & UV_HANDLE_EMULATE_IOCP) { - if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { - UnregisterWait(handle->read_req.wait_handle); - handle->read_req.wait_handle = INVALID_HANDLE_VALUE; - } - if (handle->read_req.event_handle != NULL) { - CloseHandle(handle->read_req.event_handle); - handle->read_req.event_handle = NULL; - } + assert(handle->read_req.wait_handle == INVALID_HANDLE_VALUE); + if (handle->read_req.event_handle != NULL) { + CloseHandle(handle->read_req.event_handle); + handle->read_req.event_handle = NULL; } if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) @@ -1417,13 +1412,12 @@ static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { } if (handle->flags & UV_HANDLE_EMULATE_IOCP) { - if (req->wait_handle == INVALID_HANDLE_VALUE) { - if (!RegisterWaitForSingleObject(&req->wait_handle, - req->event_handle, post_completion_read_wait, (void*) req, - INFINITE, WT_EXECUTEINWAITTHREAD)) { - SET_REQ_ERROR(req, GetLastError()); - goto error; - } + assert(req->wait_handle == INVALID_HANDLE_VALUE); + if (!RegisterWaitForSingleObject(&req->wait_handle, + req->event_handle, post_completion_read_wait, (void*) req, + INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) { + SET_REQ_ERROR(req, GetLastError()); + goto error; } } } @@ -1451,16 +1445,16 @@ int uv__pipe_read_start(uv_pipe_t* handle, handle->read_cb = read_cb; handle->alloc_cb = alloc_cb; + if (handle->read_req.event_handle == NULL) { + handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL); + if (handle->read_req.event_handle == NULL) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + } + /* If reading was stopped and then started again, there could still be a read * request pending. */ if (!(handle->flags & UV_HANDLE_READ_PENDING)) { - if (handle->flags & UV_HANDLE_EMULATE_IOCP && - handle->read_req.event_handle == NULL) { - handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL); - if (handle->read_req.event_handle == NULL) { - uv_fatal_error(GetLastError(), "CreateEvent"); - } - } uv__pipe_queue_read(loop, handle); } @@ -1713,7 +1707,7 @@ static int uv__pipe_write_data(uv_loop_t* loop, if (handle->flags & UV_HANDLE_EMULATE_IOCP) { if (!RegisterWaitForSingleObject(&req->wait_handle, req->event_handle, post_completion_write_wait, (void*) req, - INFINITE, WT_EXECUTEINWAITTHREAD)) { + INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) { return GetLastError(); } } @@ -1889,7 +1883,7 @@ static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle, - int error, uv_buf_t buf) { + DWORD error, uv_buf_t buf) { if (error == ERROR_BROKEN_PIPE) { uv__pipe_read_eof(loop, handle, buf); } else { @@ -1919,17 +1913,25 @@ static void uv__pipe_queue_ipc_xfer_info( /* Read an exact number of bytes from a pipe. If an error or end-of-file is * encountered before the requested number of bytes are read, an error is * returned. */ -static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) { - DWORD bytes_read, bytes_read_now; +static DWORD uv__pipe_read_exactly(uv_pipe_t* handle, void* buffer, DWORD count) { + uv_read_t* req; + DWORD bytes_read; + DWORD bytes_read_now; bytes_read = 0; while (bytes_read < count) { - if (!ReadFile(h, + req = &handle->read_req; + memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped)); + req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1); + if (!ReadFile(handle->handle, (char*) buffer + bytes_read, count - bytes_read, &bytes_read_now, - NULL)) { - return GetLastError(); + &req->u.io.overlapped)) { + if (GetLastError() != ERROR_IO_PENDING) + return GetLastError(); + if (!GetOverlappedResult(handle->handle, &req->u.io.overlapped, &bytes_read_now, TRUE)) + return GetLastError(); } bytes_read += bytes_read_now; @@ -1940,16 +1942,17 @@ static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) { } -static DWORD uv__pipe_read_data(uv_loop_t* loop, - uv_pipe_t* handle, - DWORD suggested_bytes, - DWORD max_bytes) { - DWORD bytes_read; +static int uv__pipe_read_data(uv_loop_t* loop, + uv_pipe_t* handle, + DWORD* bytes_read, + DWORD max_bytes) { uv_buf_t buf; + uv_read_t* req; + DWORD r; /* Ask the user for a buffer to read data into. */ buf = uv_buf_init(NULL, 0); - handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf); + handle->alloc_cb((uv_handle_t*) handle, *bytes_read, &buf); if (buf.base == NULL || buf.len == 0) { handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf); return 0; /* Break out of read loop. */ @@ -1962,29 +1965,50 @@ static DWORD uv__pipe_read_data(uv_loop_t* loop, if (max_bytes > buf.len) max_bytes = buf.len; - /* Read into the user buffer. */ - if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) { - uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf); - return 0; /* Break out of read loop. */ + /* Read into the user buffer. + * Prepare an Event so that we can cancel if it doesn't complete immediately. + */ + req = &handle->read_req; + memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped)); + req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1); + if (ReadFile(handle->handle, buf.base, max_bytes, bytes_read, &req->u.io.overlapped)) { + r = ERROR_SUCCESS; + } else { + r = GetLastError(); + *bytes_read = 0; + if (r == ERROR_IO_PENDING) { + r = CancelIoEx(handle->handle, &req->u.io.overlapped); + assert(r || GetLastError() == ERROR_NOT_FOUND); + if (!GetOverlappedResult(handle->handle, &req->u.io.overlapped, bytes_read, TRUE)) { + r = GetLastError(); + *bytes_read = 0; + } + } } /* Call the read callback. */ - handle->read_cb((uv_stream_t*) handle, bytes_read, &buf); + if (r == ERROR_SUCCESS || r == ERROR_OPERATION_ABORTED) + handle->read_cb((uv_stream_t*) handle, *bytes_read, &buf); + else + uv__pipe_read_error_or_eof(loop, handle, r, buf); - return bytes_read; + return *bytes_read == max_bytes; } -static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) { - uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining; - int err; +static int uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) { + uint32_t* data_remaining; + DWORD err; + DWORD more; + DWORD bytes_read; + + data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining; if (*data_remaining > 0) { /* Read frame data payload. */ - DWORD bytes_read = - uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining); + bytes_read = *data_remaining; + more = uv__pipe_read_data(loop, handle, &bytes_read, bytes_read); *data_remaining -= bytes_read; - return bytes_read; } else { /* Start of a new IPC frame. */ @@ -1995,7 +2019,7 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) { /* Read the IPC frame header. */ err = uv__pipe_read_exactly( - handle->handle, &frame_header, sizeof frame_header); + handle, &frame_header, sizeof frame_header); if (err) goto error; @@ -2031,21 +2055,28 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) { /* If no socket xfer info follows, return here. Data will be read in a * subsequent invocation of uv__pipe_read_ipc(). */ - if (xfer_type == UV__IPC_SOCKET_XFER_NONE) - return sizeof frame_header; /* Number of bytes read. */ + if (xfer_type != UV__IPC_SOCKET_XFER_NONE) { + /* Read transferred socket information. */ + err = uv__pipe_read_exactly(handle, &xfer_info, sizeof xfer_info); + if (err) + goto error; - /* Read transferred socket information. */ - err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info); - if (err) - goto error; + /* Store the pending socket info. */ + uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info); + } - /* Store the pending socket info. */ - uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info); + more = 1; + } - /* Return number of bytes read. */ - return sizeof frame_header + sizeof xfer_info; + /* Return whether the caller should immediately try another read call to get + * more data. */ + if (more && *data_remaining == 0) { + /* TODO: use PeekNamedPipe to see if it is really worth trying to do + * another ReadFile call. */ } + return more; + invalid: /* Invalid frame. */ err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */ @@ -2059,12 +2090,20 @@ error: void uv__process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req) { + DWORD err; + DWORD more; + DWORD bytes_requested; assert(handle->type == UV_NAMED_PIPE); handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING); DECREASE_PENDING_REQ_COUNT(handle); eof_timer_stop(handle); + if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(handle->read_req.wait_handle); + handle->read_req.wait_handle = INVALID_HANDLE_VALUE; + } + /* At this point, we're done with bookkeeping. If the user has stopped * reading the pipe in the meantime, there is nothing left to do, since there * is no callback that we can call. */ @@ -2073,7 +2112,7 @@ void uv__process_pipe_read_req(uv_loop_t* loop, if (!REQ_SUCCESS(req)) { /* An error occurred doing the zero-read. */ - DWORD err = GET_REQ_ERROR(req); + err = GET_REQ_ERROR(req); /* If the read was cancelled by uv__pipe_interrupt_read(), the request may * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to @@ -2084,34 +2123,18 @@ void uv__process_pipe_read_req(uv_loop_t* loop, } else { /* The zero-read completed without error, indicating there is data * available in the kernel buffer. */ - DWORD avail; - - /* Get the number of bytes available. */ - avail = 0; - if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL)) - uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_); - - /* Read until we've either read all the bytes available, or the 'reading' - * flag is cleared. */ - while (avail > 0 && handle->flags & UV_HANDLE_READING) { + while (handle->flags & UV_HANDLE_READING) { + bytes_requested = 65536; /* Depending on the type of pipe, read either IPC frames or raw data. */ - DWORD bytes_read = - handle->ipc ? uv__pipe_read_ipc(loop, handle) - : uv__pipe_read_data(loop, handle, avail, (DWORD) -1); + if (handle->ipc) + more = uv__pipe_read_ipc(loop, handle); + else + more = uv__pipe_read_data(loop, handle, &bytes_requested, INT32_MAX); /* If no bytes were read, treat this as an indication that an error * occurred, and break out of the read loop. */ - if (bytes_read == 0) + if (more == 0) break; - - /* It is possible that more bytes were read than we thought were - * available. To prevent `avail` from underflowing, break out of the loop - * if this is the case. */ - if (bytes_read > avail) - break; - - /* Recompute the number of bytes available. */ - avail -= bytes_read; } } @@ -2134,15 +2157,13 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, UNREGISTER_HANDLE_REQ(loop, handle); - if (handle->flags & UV_HANDLE_EMULATE_IOCP) { - if (req->wait_handle != INVALID_HANDLE_VALUE) { - UnregisterWait(req->wait_handle); - req->wait_handle = INVALID_HANDLE_VALUE; - } - if (req->event_handle) { - CloseHandle(req->event_handle); - req->event_handle = NULL; - } + if (req->wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(req->wait_handle); + req->wait_handle = INVALID_HANDLE_VALUE; + } + if (req->event_handle) { + CloseHandle(req->event_handle); + req->event_handle = NULL; } err = GET_REQ_ERROR(req); |