-rw-r--r--  src/unix/linux.c  | 410
-rw-r--r--  src/uv-common.h   |   2
2 files changed, 288 insertions(+), 124 deletions(-)
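The patch below stops issuing one epoll_ctl(2) syscall per watcher change and instead queues the changes on a second, dedicated io_uring (the "ctl" ring) as IORING_OP_EPOLL_CTL submissions, flushing a whole batch with a single io_uring_enter(2). The same batching idea can be sketched with liburing; this is an illustration of the technique, not libuv's code — epfd, fds and nfds are hypothetical inputs, and IORING_OP_EPOLL_CTL requires Linux 5.6 or later:

/* Illustrative only: batch N EPOLL_CTL_ADD operations through io_uring
 * so they cost one io_uring_enter(2) instead of N epoll_ctl(2) calls.
 * Assumes liburing and Linux 5.6+ (IORING_OP_EPOLL_CTL); epfd and fds
 * are hypothetical stand-ins for an event loop's state.
 */
#include <liburing.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>

int batch_epoll_add(int epfd, const int* fds, unsigned nfds) {
  struct epoll_event evs[64];  /* must stay alive until completion */
  struct io_uring_cqe* cqe;
  struct io_uring_sqe* sqe;
  struct io_uring ring;
  unsigned i;
  int rc;

  if (nfds > 64 || io_uring_queue_init(64, &ring, 0) < 0)
    return -1;

  for (i = 0; i < nfds; i++) {
    memset(&evs[i], 0, sizeof(evs[i]));
    evs[i].events = EPOLLIN;
    evs[i].data.fd = fds[i];

    sqe = io_uring_get_sqe(&ring);  /* cannot fail: ring holds 64 sqes */
    io_uring_prep_epoll_ctl(sqe, epfd, fds[i], EPOLL_CTL_ADD, &evs[i]);
    sqe->user_data = fds[i];  /* match completions back to their fd */
  }

  /* One syscall submits all N sqes and waits for all N cqes. */
  rc = io_uring_submit_and_wait(&ring, nfds);

  for (i = 0; rc >= 0 && i < nfds; i++) {
    if (io_uring_wait_cqe(&ring, &cqe))
      break;
    if (cqe->res < 0)  /* e.g. -EEXIST: fd was already being watched */
      fprintf(stderr, "epoll_ctl(fd=%lld): %d\n",
              (long long) cqe->user_data, cqe->res);
    io_uring_cqe_seen(&ring, cqe);
  }

  io_uring_queue_exit(&ring);
  return rc < 0 ? -1 : 0;
}

Unlike this sketch, the patch keeps one long-lived ctl ring per event loop instead of creating a ring per batch, and it retries -EEXIST failures with EPOLL_CTL_MOD rather than merely reporting them.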
diff --git a/src/unix/linux.c b/src/unix/linux.c
index 3f14e756..c5911552 100644
--- a/src/unix/linux.c
+++ b/src/unix/linux.c
@@ -149,9 +149,11 @@ enum {
   UV__IORING_OP_OPENAT = 18,
   UV__IORING_OP_CLOSE = 19,
   UV__IORING_OP_STATX = 21,
+  UV__IORING_OP_EPOLL_CTL = 29,
 };
 
 enum {
+  UV__IORING_ENTER_GETEVENTS = 1u,
   UV__IORING_ENTER_SQ_WAKEUP = 2u,
 };
 
@@ -248,6 +250,10 @@ STATIC_ASSERT(40 + 40 + 40 == sizeof(struct uv__io_uring_params));
 STATIC_ASSERT(40 == offsetof(struct uv__io_uring_params, sq_off));
 STATIC_ASSERT(80 == offsetof(struct uv__io_uring_params, cq_off));
 
+STATIC_ASSERT(EPOLL_CTL_ADD < 4);
+STATIC_ASSERT(EPOLL_CTL_DEL < 4);
+STATIC_ASSERT(EPOLL_CTL_MOD < 4);
+
 struct watcher_list {
   RB_ENTRY(watcher_list) entry;
   QUEUE watchers;
@@ -269,6 +275,17 @@ static int compare_watchers(const struct watcher_list* a,
 static void maybe_free_watcher_list(struct watcher_list* w,
                                     uv_loop_t* loop);
 
+static void uv__epoll_ctl_flush(int epollfd,
+                                struct uv__iou* ctl,
+                                struct epoll_event (*events)[256]);
+
+static void uv__epoll_ctl_prep(int epollfd,
+                               struct uv__iou* ctl,
+                               struct epoll_event (*events)[256],
+                               int op,
+                               int fd,
+                               struct epoll_event* e);
+
 RB_GENERATE_STATIC(watcher_root, watcher_list, entry, compare_watchers)
 
 
@@ -384,45 +401,40 @@ static int uv__use_io_uring(void) {
 }
 
 
-int uv__platform_loop_init(uv_loop_t* loop) {
+static void uv__iou_init(int epollfd,
+                         struct uv__iou* iou,
+                         uint32_t entries,
+                         uint32_t flags) {
   struct uv__io_uring_params params;
   struct epoll_event e;
-  struct uv__iou* iou;
   size_t cqlen;
   size_t sqlen;
   size_t maxlen;
   size_t sqelen;
+  uint32_t i;
   char* sq;
   char* sqe;
   int ringfd;
 
-  iou = &uv__get_internal_fields(loop)->iou;
-  iou->ringfd = -1;
-
-  loop->inotify_watchers = NULL;
-  loop->inotify_fd = -1;
-  loop->backend_fd = epoll_create1(O_CLOEXEC);
-
-  if (loop->backend_fd == -1)
-    return UV__ERR(errno);
-
-  if (!uv__use_io_uring())
-    return 0;
-
   sq = MAP_FAILED;
   sqe = MAP_FAILED;
 
+  if (!uv__use_io_uring())
+    return;
+
   /* SQPOLL required CAP_SYS_NICE until linux v5.12 relaxed that requirement.
    * Mostly academic because we check for a v5.13 kernel afterwards anyway.
    */
   memset(&params, 0, sizeof(params));
-  params.flags = UV__IORING_SETUP_SQPOLL;
-  params.sq_thread_idle = 10;  /* milliseconds */
+  params.flags = flags;
+
+  if (flags & UV__IORING_SETUP_SQPOLL)
+    params.sq_thread_idle = 10;  /* milliseconds */
 
   /* Kernel returns a file descriptor with O_CLOEXEC flag set. */
-  ringfd = uv__io_uring_setup(64, &params);
+  ringfd = uv__io_uring_setup(entries, &params);
   if (ringfd == -1)
-    return 0;  /* Not an error, falls back to thread pool. */
+    return;
 
   /* IORING_FEAT_RSRC_TAGS is used to detect linux v5.13 but what we're
    * actually detecting is whether IORING_OP_STATX works with SQPOLL.
@@ -461,6 +473,18 @@ int uv__platform_loop_init(uv_loop_t* loop) {
   if (sq == MAP_FAILED || sqe == MAP_FAILED)
     goto fail;
 
+  if (flags & UV__IORING_SETUP_SQPOLL) {
+    /* Only interested in completion events. To get notified when
+     * the kernel pulls items from the submission ring, add POLLOUT.
+     */
+    memset(&e, 0, sizeof(e));
+    e.events = POLLIN;
+    e.data.fd = ringfd;
+
+    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, ringfd, &e))
+      goto fail;
+  }
+
   iou->sqhead = (uint32_t*) (sq + params.sq_off.head);
   iou->sqtail = (uint32_t*) (sq + params.sq_off.tail);
   iou->sqmask = *(uint32_t*) (sq + params.sq_off.ring_mask);
@@ -479,17 +503,10 @@ int uv__platform_loop_init(uv_loop_t* loop) {
   iou->ringfd = ringfd;
   iou->in_flight = 0;
 
-  /* Only interested in completion events. To get notified when
-   * the kernel pulls items from the submission ring, add POLLOUT.
-   */
-  memset(&e, 0, sizeof(e));
-  e.events = POLLIN;
-  e.data.fd = ringfd;
-
-  if (epoll_ctl(loop->backend_fd, EPOLL_CTL_ADD, ringfd, &e))
-    goto fail;
+  for (i = 0; i <= iou->sqmask; i++)
+    iou->sqarray[i] = i;  /* Slot -> sqe identity mapping. */
 
-  return 0;
+  return;
 
 fail:
   if (sq != MAP_FAILED)
@@ -499,8 +516,37 @@ fail:
     munmap(sq, maxlen);
 
   munmap(sqe, sqelen);
   uv__close(ringfd);
+}
+
+
+static void uv__iou_delete(struct uv__iou* iou) {
+  if (iou->ringfd != -1) {
+    munmap(iou->sq, iou->maxlen);
+    munmap(iou->sqe, iou->sqelen);
+    uv__close(iou->ringfd);
+    iou->ringfd = -1;
+  }
+}
+
+
+int uv__platform_loop_init(uv_loop_t* loop) {
+  uv__loop_internal_fields_t* lfields;
+
+  lfields = uv__get_internal_fields(loop);
+  lfields->ctl.ringfd = -1;
+  lfields->iou.ringfd = -1;
 
-  return 0;  /* Not an error, falls back to thread pool. */
+  loop->inotify_watchers = NULL;
+  loop->inotify_fd = -1;
+  loop->backend_fd = epoll_create1(O_CLOEXEC);
+
+  if (loop->backend_fd == -1)
+    return UV__ERR(errno);
+
+  uv__iou_init(loop->backend_fd, &lfields->iou, 64, UV__IORING_SETUP_SQPOLL);
+  uv__iou_init(loop->backend_fd, &lfields->ctl, 256, 0);
+
+  return 0;
 }
 
 
@@ -525,54 +571,62 @@ int uv__io_fork(uv_loop_t* loop) {
 
 
 void uv__platform_loop_delete(uv_loop_t* loop) {
-  struct uv__iou* iou;
+  uv__loop_internal_fields_t* lfields;
 
-  iou = &uv__get_internal_fields(loop)->iou;
+  lfields = uv__get_internal_fields(loop);
+  uv__iou_delete(&lfields->ctl);
+  uv__iou_delete(&lfields->iou);
 
   if (loop->inotify_fd != -1) {
     uv__io_stop(loop, &loop->inotify_read_watcher, POLLIN);
     uv__close(loop->inotify_fd);
     loop->inotify_fd = -1;
   }
-
-  if (iou->ringfd != -1) {
-    munmap(iou->sq, iou->maxlen);
-    munmap(iou->sqe, iou->sqelen);
-    uv__close(iou->ringfd);
-    iou->ringfd = -1;
-  }
 }
 
 
-void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) {
+struct uv__invalidate {
+  struct epoll_event (*prep)[256];
   struct epoll_event* events;
+  int nfds;
+};
+
+
+void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) {
+  uv__loop_internal_fields_t* lfields;
+  struct uv__invalidate* inv;
   struct epoll_event dummy;
-  uintptr_t i;
-  uintptr_t nfds;
+  int i;
 
-  assert(loop->watchers != NULL);
-  assert(fd >= 0);
+  lfields = uv__get_internal_fields(loop);
+  inv = lfields->inv;
 
-  events = (struct epoll_event*) loop->watchers[loop->nwatchers];
-  nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1];
-  if (events != NULL)
-    /* Invalidate events with same file descriptor */
-    for (i = 0; i < nfds; i++)
-      if (events[i].data.fd == fd)
-        events[i].data.fd = -1;
+  /* Invalidate events with same file descriptor */
+  if (inv != NULL)
+    for (i = 0; i < inv->nfds; i++)
+      if (inv->events[i].data.fd == fd)
+        inv->events[i].data.fd = -1;
 
   /* Remove the file descriptor from the epoll.
    * This avoids a problem where the same file description remains open
    * in another process, causing repeated junk epoll events.
    *
    * We pass in a dummy epoll_event, to work around a bug in old kernels.
+   *
+   * Work around a bug in kernels 3.10 to 3.19 where passing a struct that
+   * has the EPOLLWAKEUP flag set generates spurious audit syslog warnings.
    */
-  if (loop->backend_fd >= 0) {
-    /* Work around a bug in kernels 3.10 to 3.19 where passing a struct that
-     * has the EPOLLWAKEUP flag set generates spurious audit syslog warnings.
-     */
-    memset(&dummy, 0, sizeof(dummy));
+  memset(&dummy, 0, sizeof(dummy));
+
+  if (inv == NULL) {
     epoll_ctl(loop->backend_fd, EPOLL_CTL_DEL, fd, &dummy);
+  } else {
+    uv__epoll_ctl_prep(loop->backend_fd,
+                       &lfields->ctl,
+                       inv->prep,
+                       EPOLL_CTL_DEL,
+                       fd,
+                       &dummy);
   }
 }
 
 
@@ -620,8 +674,6 @@ static struct uv__io_uring_sqe* uv__iou_get_sqe(struct uv__iou* iou,
     return NULL;  /* No room in ring buffer. TODO(bnoordhuis) maybe flush it? */
 
   slot = tail & mask;
-  iou->sqarray[slot] = slot;  /* Identity mapping of index -> sqe. */
-
   sqe = iou->sqe;
   sqe = &sqe[slot];
   memset(sqe, 0, sizeof(*sqe));
@@ -652,7 +704,7 @@ static void uv__iou_submit(struct uv__iou* iou) {
 
   if (flags & UV__IORING_SQ_NEED_WAKEUP)
     if (uv__io_uring_enter(iou->ringfd, 0, 0, UV__IORING_ENTER_SQ_WAKEUP))
-      perror("libuv: io_uring_enter");  /* Can't happen. */
+      perror("libuv: io_uring_enter(wakeup)");  /* Can't happen. */
 }
 
 
@@ -885,11 +937,135 @@ static void uv__poll_io_uring(uv_loop_t* loop, struct uv__iou* iou) {
 }
 
 
+static void uv__epoll_ctl_prep(int epollfd,
+                               struct uv__iou* ctl,
+                               struct epoll_event (*events)[256],
+                               int op,
+                               int fd,
+                               struct epoll_event* e) {
+  struct uv__io_uring_sqe* sqe;
+  struct epoll_event* pe;
+  uint32_t mask;
+  uint32_t slot;
+
+  if (ctl->ringfd == -1) {
+    if (!epoll_ctl(epollfd, op, fd, e))
+      return;
+
+    if (op == EPOLL_CTL_DEL)
+      return;  /* Ignore errors, may be racing with another thread. */
+
+    if (op != EPOLL_CTL_ADD)
+      abort();
+
+    if (errno != EEXIST)
+      abort();
+
+    /* File descriptor that's been watched before, update event mask. */
+    if (!epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, e))
+      return;
+
+    abort();
+  } else {
+    mask = ctl->sqmask;
+    slot = (*ctl->sqtail)++ & mask;
+
+    pe = &(*events)[slot];
+    *pe = *e;
+
+    sqe = ctl->sqe;
+    sqe = &sqe[slot];
+
+    memset(sqe, 0, sizeof(*sqe));
+    sqe->addr = (uintptr_t) pe;
+    sqe->fd = epollfd;
+    sqe->len = op;
+    sqe->off = fd;
+    sqe->opcode = UV__IORING_OP_EPOLL_CTL;
+    sqe->user_data = op | slot << 2 | (int64_t) fd << 32;
+
+    if ((*ctl->sqhead & mask) == (*ctl->sqtail & mask))
+      uv__epoll_ctl_flush(epollfd, ctl, events);
+  }
+}
+
+
+static void uv__epoll_ctl_flush(int epollfd,
+                                struct uv__iou* ctl,
+                                struct epoll_event (*events)[256]) {
+  struct epoll_event oldevents[256];
+  struct uv__io_uring_cqe* cqe;
+  uint32_t oldslot;
+  uint32_t slot;
+  uint32_t n;
+  int fd;
+  int op;
+  int rc;
+
+  STATIC_ASSERT(sizeof(oldevents) == sizeof(*events));
+  assert(ctl->ringfd != -1);
+  assert(*ctl->sqhead != *ctl->sqtail);
+
+  n = *ctl->sqtail - *ctl->sqhead;
+  do
+    rc = uv__io_uring_enter(ctl->ringfd, n, n, UV__IORING_ENTER_GETEVENTS);
+  while (rc == -1 && errno == EINTR);
+
+  if (rc < 0)
+    perror("libuv: io_uring_enter(getevents)");  /* Can't happen. */
+
+  if (rc != (int) n)
+    abort();
+
+  assert(*ctl->sqhead == *ctl->sqtail);
+
+  memcpy(oldevents, *events, sizeof(*events));
+
+  /* Failed submissions are either EPOLL_CTL_DEL commands for file descriptors
+   * that have been closed, or EPOLL_CTL_ADD commands for file descriptors
+   * that we are already watching. Ignore the former and retry the latter
+   * with EPOLL_CTL_MOD.
+   */
+  while (*ctl->cqhead != *ctl->cqtail) {
+    slot = (*ctl->cqhead)++ & ctl->cqmask;
+
+    cqe = ctl->cqe;
+    cqe = &cqe[slot];
+
+    if (cqe->res == 0)
+      continue;
+
+    fd = cqe->user_data >> 32;
+    op = 3 & cqe->user_data;
+    oldslot = 255 & (cqe->user_data >> 2);
+
+    if (op == EPOLL_CTL_DEL)
+      continue;
+
+    if (op != EPOLL_CTL_ADD)
+      abort();
+
+    if (cqe->res != -EEXIST)
+      abort();
+
+    uv__epoll_ctl_prep(epollfd,
+                       ctl,
+                       events,
+                       EPOLL_CTL_MOD,
+                       fd,
+                       &oldevents[oldslot]);
+  }
+}
+
+
 void uv__io_poll(uv_loop_t* loop, int timeout) {
   uv__loop_internal_fields_t* lfields;
   struct epoll_event events[1024];
+  struct epoll_event prep[256];
+  struct uv__invalidate inv;
   struct epoll_event* pe;
   struct epoll_event e;
+  struct uv__iou* ctl;
   struct uv__iou* iou;
   int real_timeout;
   QUEUE* q;
@@ -900,6 +1076,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
   int have_iou_events;
   int have_signals;
   int nevents;
+  int epollfd;
   int count;
   int nfds;
   int fd;
@@ -909,45 +1086,9 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
   int reset_timeout;
 
   lfields = uv__get_internal_fields(loop);
+  ctl = &lfields->ctl;
   iou = &lfields->iou;
 
-  memset(&e, 0, sizeof(e));
-
-  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
-    q = QUEUE_HEAD(&loop->watcher_queue);
-    QUEUE_REMOVE(q);
-    QUEUE_INIT(q);
-
-    w = QUEUE_DATA(q, uv__io_t, watcher_queue);
-    assert(w->pevents != 0);
-    assert(w->fd >= 0);
-    assert(w->fd < (int) loop->nwatchers);
-
-    e.events = w->pevents;
-    e.data.fd = w->fd;
-
-    if (w->events == 0)
-      op = EPOLL_CTL_ADD;
-    else
-      op = EPOLL_CTL_MOD;
-
-    /* XXX Future optimization: do EPOLL_CTL_MOD lazily if we stop watching
-     * events, skip the syscall and squelch the events after epoll_wait().
-     */
-    if (epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
-      if (errno != EEXIST)
-        abort();
-
-      assert(op == EPOLL_CTL_ADD);
-
-      /* We've reactivated a file descriptor that's been watched before. */
-      if (epoll_ctl(loop->backend_fd, EPOLL_CTL_MOD, w->fd, &e))
-        abort();
-    }
-
-    w->events = w->pevents;
-  }
-
   sigmask = NULL;
   if (loop->flags & UV_LOOP_BLOCK_SIGPROF) {
     sigemptyset(&sigset);
@@ -969,10 +1110,42 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
     user_timeout = 0;
   }
 
+  epollfd = loop->backend_fd;
+
+  memset(&e, 0, sizeof(e));
+
+  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
+    q = QUEUE_HEAD(&loop->watcher_queue);
+    w = QUEUE_DATA(q, uv__io_t, watcher_queue);
+    QUEUE_REMOVE(q);
+    QUEUE_INIT(q);
+
+    op = EPOLL_CTL_MOD;
+    if (w->events == 0)
+      op = EPOLL_CTL_ADD;
+
+    w->events = w->pevents;
+    e.events = w->pevents;
+    e.data.fd = w->fd;
+
+    uv__epoll_ctl_prep(epollfd, ctl, &prep, op, w->fd, &e);
+  }
+
+  inv.events = events;
+  inv.prep = &prep;
+  inv.nfds = -1;
+
   for (;;) {
     if (loop->nfds == 0)
       if (iou->in_flight == 0)
-        return;
+        break;
+
+    /* All event mask mutations should be visible to the kernel before
+     * we enter epoll_pwait().
+     */
+    if (ctl->ringfd != -1)
+      while (*ctl->sqhead != *ctl->sqtail)
+        uv__epoll_ctl_flush(epollfd, ctl, &prep);
 
     /* Only need to set the provider_entry_time if timeout != 0. The function
      * will return early if the loop isn't configured with UV_METRICS_IDLE_TIME.
@@ -986,11 +1159,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
      */
     lfields->current_timeout = timeout;
 
-    nfds = epoll_pwait(loop->backend_fd,
-                       events,
-                       ARRAY_SIZE(events),
-                       timeout,
-                       sigmask);
+    nfds = epoll_pwait(epollfd, events, ARRAY_SIZE(events), timeout, sigmask);
 
     /* Update loop->time unconditionally. It's tempting to skip the update when
     * timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the
@@ -1010,7 +1179,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
       continue;
 
     if (timeout == 0)
-      return;
+      break;
 
     /* We may have been inside the system call for longer than |timeout|
      * milliseconds so we need to update the timestamp to avoid drift.
@@ -1031,7 +1200,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
       continue;
 
     if (timeout == 0)
-      return;
+      break;
 
     /* Interrupted by a signal. Update timeout and poll again. */
     goto update_timeout;
@@ -1041,18 +1210,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
     have_signals = 0;
     nevents = 0;
 
-    {
-      /* Squelch a -Waddress-of-packed-member warning with gcc >= 9. */
-      union {
-        struct epoll_event* events;
-        uv__io_t* watchers;
-      } x;
-
-      x.events = events;
-      assert(loop->watchers != NULL);
-      loop->watchers[loop->nwatchers] = x.watchers;
-      loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
-    }
+    inv.nfds = nfds;
+    lfields->inv = &inv;
 
     for (i = 0; i < nfds; i++) {
       pe = events + i;
@@ -1079,7 +1238,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
        * Ignore all errors because we may be racing with another thread
       * when the file descriptor is closed.
        */
-      epoll_ctl(loop->backend_fd, EPOLL_CTL_DEL, fd, pe);
+      uv__epoll_ctl_prep(epollfd, ctl, &prep, EPOLL_CTL_DEL, fd, pe);
       continue;
     }
@@ -1136,14 +1295,13 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
       loop->signal_io_watcher.cb(loop, &loop->signal_io_watcher, POLLIN);
     }
 
-    loop->watchers[loop->nwatchers] = NULL;
-    loop->watchers[loop->nwatchers + 1] = NULL;
+    lfields->inv = NULL;
 
     if (have_iou_events != 0)
-      return;  /* Event loop should cycle now so don't poll again. */
+      break;  /* Event loop should cycle now so don't poll again. */
 
     if (have_signals != 0)
-      return;  /* Event loop should cycle now so don't poll again. */
+      break;  /* Event loop should cycle now so don't poll again. */
 
     if (nevents != 0) {
       if (nfds == ARRAY_SIZE(events) && --count != 0) {
@@ -1151,11 +1309,11 @@
         timeout = 0;
         continue;
       }
-      return;
+      break;
    }
 
     if (timeout == 0)
-      return;
+      break;
 
     if (timeout == -1)
       continue;
@@ -1165,10 +1323,14 @@ update_timeout:
     real_timeout -= (loop->time - base);
 
    if (real_timeout <= 0)
-      return;
+      break;
 
     timeout = real_timeout;
   }
+
+  if (ctl->ringfd != -1)
+    while (*ctl->sqhead != *ctl->sqtail)
+      uv__epoll_ctl_flush(epollfd, ctl, &prep);
 }
 
 uint64_t uv__hrtime(uv_clocktype_t type) {
diff --git a/src/uv-common.h b/src/uv-common.h
index b0d9c747..decde536 100644
--- a/src/uv-common.h
+++ b/src/uv-common.h
@@ -423,7 +423,9 @@ struct uv__loop_internal_fields_s {
   uv__loop_metrics_t loop_metrics;
   int current_timeout;
 #ifdef __linux__
+  struct uv__iou ctl;
   struct uv__iou iou;
+  void* inv;  /* used by uv__platform_invalidate_fd() */
 #endif  /* __linux__ */
 };
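A note on the user_data encoding in uv__epoll_ctl_prep() and uv__epoll_ctl_flush() above: each submission packs the epoll op into bits 0-1 (which the three new STATIC_ASSERTs that EPOLL_CTL_* < 4 guarantee is safe), the 256-entry ring slot into bits 2-9, and the file descriptor into the upper 32 bits, so every completion carries what is needed to retry it. A minimal standalone sketch of that round trip — encode_user_data and decode_user_data are illustrative names, not libuv functions:

#include <assert.h>
#include <stdint.h>

/* Pack an epoll op (2 bits), ring slot (8 bits) and fd (32 bits) into the
 * 64-bit user_data word, mirroring uv__epoll_ctl_prep(). Bits 10..31 unused.
 */
static uint64_t encode_user_data(int op, uint32_t slot, int fd) {
  assert(op < 4);      /* EPOLL_CTL_ADD/DEL/MOD all fit in two bits */
  assert(slot < 256);  /* the ctl ring has 256 entries */
  return (uint64_t) op | (uint64_t) slot << 2 | (uint64_t) fd << 32;
}

/* Unpack on the completion side, mirroring uv__epoll_ctl_flush(). */
static void decode_user_data(uint64_t ud, int* op, uint32_t* slot, int* fd) {
  *op = 3 & ud;
  *slot = (uint32_t) (255 & (ud >> 2));
  *fd = (int) (ud >> 32);
}

The saved slot is what lets uv__epoll_ctl_flush() look up the original struct epoll_event in its oldevents[] copy when an EPOLL_CTL_ADD completes with -EEXIST and has to be retried as EPOLL_CTL_MOD.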