 src/unix/linux.c | 410
 src/uv-common.h  |   2
 2 files changed, 288 insertions(+), 124 deletions(-)
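This change adds a second io_uring instance (lfields->ctl, 256 entries, no SQPOLL) and routes epoll_ctl() operations through UV__IORING_OP_EPOLL_CTL submission queue entries: uv__epoll_ctl_prep() queues them, and uv__epoll_ctl_flush() submits and reaps the whole batch with a single io_uring_enter(GETEVENTS) before the loop blocks in epoll_pwait(). For orientation only, below is a minimal sketch of the same batching idea written against liburing rather than libuv's raw syscall wrappers; it assumes liburing is installed and a kernel new enough to support IORING_OP_EPOLL_CTL (roughly v5.6+), and the descriptors it watches are made up for the example.

/* Illustration only -- not libuv code.  Batch two epoll_ctl() operations
 * through io_uring, the way the patched uv__epoll_ctl_prep()/_flush() pair
 * does, but using liburing helpers.  When the ring is unavailable, callers
 * would fall back to plain epoll_ctl(), as the patch does for ringfd == -1.
 */
#include <liburing.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

int main(void) {
  struct epoll_event evs[2];  /* Must outlive the SQEs, like the patch's
                                 per-slot events[256] array. */
  struct io_uring_cqe* cqe;
  struct io_uring_sqe* sqe;
  struct io_uring ring;
  unsigned head;
  unsigned seen;
  int fds[2];
  int epollfd;
  int i;

  epollfd = epoll_create1(EPOLL_CLOEXEC);
  if (epollfd == -1 || pipe(fds) || io_uring_queue_init(8, &ring, 0) < 0)
    return 1;

  /* Queue two EPOLL_CTL_ADD operations without issuing any epoll_ctl()
   * syscalls yet.  Each SQE points at its own epoll_event slot.
   */
  for (i = 0; i < 2; i++) {
    memset(&evs[i], 0, sizeof(evs[i]));
    evs[i].events = EPOLLIN;
    evs[i].data.fd = fds[i];

    sqe = io_uring_get_sqe(&ring);
    io_uring_prep_epoll_ctl(sqe, epollfd, fds[i], EPOLL_CTL_ADD, &evs[i]);
    sqe->user_data = fds[i];  /* Enough to identify the op on completion. */
  }

  /* One syscall submits the batch and waits for both completions. */
  if (io_uring_submit_and_wait(&ring, 2) < 0)
    return 1;

  seen = 0;
  io_uring_for_each_cqe(&ring, head, cqe) {
    if (cqe->res < 0)  /* e.g. -EEXIST: already watched, retry with MOD. */
      fprintf(stderr, "epoll_ctl on fd %d failed: %d\n",
              (int) cqe->user_data, cqe->res);
    seen++;
  }
  io_uring_cq_advance(&ring, seen);

  io_uring_queue_exit(&ring);
  close(fds[0]);
  close(fds[1]);
  close(epollfd);
  return 0;
}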
diff --git a/src/unix/linux.c b/src/unix/linux.c
index 3f14e756..c5911552 100644
--- a/src/unix/linux.c
+++ b/src/unix/linux.c
@@ -149,9 +149,11 @@ enum {
UV__IORING_OP_OPENAT = 18,
UV__IORING_OP_CLOSE = 19,
UV__IORING_OP_STATX = 21,
+ UV__IORING_OP_EPOLL_CTL = 29,
};
enum {
+ UV__IORING_ENTER_GETEVENTS = 1u,
UV__IORING_ENTER_SQ_WAKEUP = 2u,
};
@@ -248,6 +250,10 @@ STATIC_ASSERT(40 + 40 + 40 == sizeof(struct uv__io_uring_params));
STATIC_ASSERT(40 == offsetof(struct uv__io_uring_params, sq_off));
STATIC_ASSERT(80 == offsetof(struct uv__io_uring_params, cq_off));
+STATIC_ASSERT(EPOLL_CTL_ADD < 4);
+STATIC_ASSERT(EPOLL_CTL_DEL < 4);
+STATIC_ASSERT(EPOLL_CTL_MOD < 4);
+
struct watcher_list {
RB_ENTRY(watcher_list) entry;
QUEUE watchers;
@@ -269,6 +275,17 @@ static int compare_watchers(const struct watcher_list* a,
static void maybe_free_watcher_list(struct watcher_list* w,
uv_loop_t* loop);
+static void uv__epoll_ctl_flush(int epollfd,
+ struct uv__iou* ctl,
+ struct epoll_event (*events)[256]);
+
+static void uv__epoll_ctl_prep(int epollfd,
+ struct uv__iou* ctl,
+ struct epoll_event (*events)[256],
+ int op,
+ int fd,
+ struct epoll_event* e);
+
RB_GENERATE_STATIC(watcher_root, watcher_list, entry, compare_watchers)
@@ -384,45 +401,40 @@ static int uv__use_io_uring(void) {
}
-int uv__platform_loop_init(uv_loop_t* loop) {
+static void uv__iou_init(int epollfd,
+ struct uv__iou* iou,
+ uint32_t entries,
+ uint32_t flags) {
struct uv__io_uring_params params;
struct epoll_event e;
- struct uv__iou* iou;
size_t cqlen;
size_t sqlen;
size_t maxlen;
size_t sqelen;
+ uint32_t i;
char* sq;
char* sqe;
int ringfd;
- iou = &uv__get_internal_fields(loop)->iou;
- iou->ringfd = -1;
-
- loop->inotify_watchers = NULL;
- loop->inotify_fd = -1;
- loop->backend_fd = epoll_create1(O_CLOEXEC);
-
- if (loop->backend_fd == -1)
- return UV__ERR(errno);
-
- if (!uv__use_io_uring())
- return 0;
-
sq = MAP_FAILED;
sqe = MAP_FAILED;
+ if (!uv__use_io_uring())
+ return;
+
/* SQPOLL required CAP_SYS_NICE until linux v5.12 relaxed that requirement.
* Mostly academic because we check for a v5.13 kernel afterwards anyway.
*/
memset(&params, 0, sizeof(params));
- params.flags = UV__IORING_SETUP_SQPOLL;
- params.sq_thread_idle = 10; /* milliseconds */
+ params.flags = flags;
+
+ if (flags & UV__IORING_SETUP_SQPOLL)
+ params.sq_thread_idle = 10; /* milliseconds */
/* Kernel returns a file descriptor with O_CLOEXEC flag set. */
- ringfd = uv__io_uring_setup(64, &params);
+ ringfd = uv__io_uring_setup(entries, &params);
if (ringfd == -1)
- return 0; /* Not an error, falls back to thread pool. */
+ return;
/* IORING_FEAT_RSRC_TAGS is used to detect linux v5.13 but what we're
* actually detecting is whether IORING_OP_STATX works with SQPOLL.
@@ -461,6 +473,18 @@ int uv__platform_loop_init(uv_loop_t* loop) {
if (sq == MAP_FAILED || sqe == MAP_FAILED)
goto fail;
+ if (flags & UV__IORING_SETUP_SQPOLL) {
+ /* Only interested in completion events. To get notified when
+ * the kernel pulls items from the submission ring, add POLLOUT.
+ */
+ memset(&e, 0, sizeof(e));
+ e.events = POLLIN;
+ e.data.fd = ringfd;
+
+ if (epoll_ctl(epollfd, EPOLL_CTL_ADD, ringfd, &e))
+ goto fail;
+ }
+
iou->sqhead = (uint32_t*) (sq + params.sq_off.head);
iou->sqtail = (uint32_t*) (sq + params.sq_off.tail);
iou->sqmask = *(uint32_t*) (sq + params.sq_off.ring_mask);
@@ -479,17 +503,10 @@ int uv__platform_loop_init(uv_loop_t* loop) {
iou->ringfd = ringfd;
iou->in_flight = 0;
- /* Only interested in completion events. To get notified when
- * the kernel pulls items from the submission ring, add POLLOUT.
- */
- memset(&e, 0, sizeof(e));
- e.events = POLLIN;
- e.data.fd = ringfd;
-
- if (epoll_ctl(loop->backend_fd, EPOLL_CTL_ADD, ringfd, &e))
- goto fail;
+ for (i = 0; i <= iou->sqmask; i++)
+ iou->sqarray[i] = i; /* Slot -> sqe identity mapping. */
- return 0;
+ return;
fail:
if (sq != MAP_FAILED)
@@ -499,8 +516,37 @@ fail:
munmap(sqe, sqelen);
uv__close(ringfd);
+}
+
+
+static void uv__iou_delete(struct uv__iou* iou) {
+ if (iou->ringfd != -1) {
+ munmap(iou->sq, iou->maxlen);
+ munmap(iou->sqe, iou->sqelen);
+ uv__close(iou->ringfd);
+ iou->ringfd = -1;
+ }
+}
+
+
+int uv__platform_loop_init(uv_loop_t* loop) {
+ uv__loop_internal_fields_t* lfields;
+
+ lfields = uv__get_internal_fields(loop);
+ lfields->ctl.ringfd = -1;
+ lfields->iou.ringfd = -1;
- return 0; /* Not an error, falls back to thread pool. */
+ loop->inotify_watchers = NULL;
+ loop->inotify_fd = -1;
+ loop->backend_fd = epoll_create1(O_CLOEXEC);
+
+ if (loop->backend_fd == -1)
+ return UV__ERR(errno);
+
+ uv__iou_init(loop->backend_fd, &lfields->iou, 64, UV__IORING_SETUP_SQPOLL);
+ uv__iou_init(loop->backend_fd, &lfields->ctl, 256, 0);
+
+ return 0;
}
@@ -525,54 +571,62 @@ int uv__io_fork(uv_loop_t* loop) {
void uv__platform_loop_delete(uv_loop_t* loop) {
- struct uv__iou* iou;
+ uv__loop_internal_fields_t* lfields;
- iou = &uv__get_internal_fields(loop)->iou;
+ lfields = uv__get_internal_fields(loop);
+ uv__iou_delete(&lfields->ctl);
+ uv__iou_delete(&lfields->iou);
if (loop->inotify_fd != -1) {
uv__io_stop(loop, &loop->inotify_read_watcher, POLLIN);
uv__close(loop->inotify_fd);
loop->inotify_fd = -1;
}
-
- if (iou->ringfd != -1) {
- munmap(iou->sq, iou->maxlen);
- munmap(iou->sqe, iou->sqelen);
- uv__close(iou->ringfd);
- iou->ringfd = -1;
- }
}
-void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) {
+struct uv__invalidate {
+ struct epoll_event (*prep)[256];
struct epoll_event* events;
+ int nfds;
+};
+
+
+void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) {
+ uv__loop_internal_fields_t* lfields;
+ struct uv__invalidate* inv;
struct epoll_event dummy;
- uintptr_t i;
- uintptr_t nfds;
+ int i;
- assert(loop->watchers != NULL);
- assert(fd >= 0);
+ lfields = uv__get_internal_fields(loop);
+ inv = lfields->inv;
- events = (struct epoll_event*) loop->watchers[loop->nwatchers];
- nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1];
- if (events != NULL)
- /* Invalidate events with same file descriptor */
- for (i = 0; i < nfds; i++)
- if (events[i].data.fd == fd)
- events[i].data.fd = -1;
+ /* Invalidate events with same file descriptor */
+ if (inv != NULL)
+ for (i = 0; i < inv->nfds; i++)
+ if (inv->events[i].data.fd == fd)
+ inv->events[i].data.fd = -1;
/* Remove the file descriptor from the epoll.
* This avoids a problem where the same file description remains open
* in another process, causing repeated junk epoll events.
*
* We pass in a dummy epoll_event, to work around a bug in old kernels.
+ *
+ * Work around a bug in kernels 3.10 to 3.19 where passing a struct that
+ * has the EPOLLWAKEUP flag set generates spurious audit syslog warnings.
*/
- if (loop->backend_fd >= 0) {
- /* Work around a bug in kernels 3.10 to 3.19 where passing a struct that
- * has the EPOLLWAKEUP flag set generates spurious audit syslog warnings.
- */
- memset(&dummy, 0, sizeof(dummy));
+ memset(&dummy, 0, sizeof(dummy));
+
+ if (inv == NULL) {
epoll_ctl(loop->backend_fd, EPOLL_CTL_DEL, fd, &dummy);
+ } else {
+ uv__epoll_ctl_prep(loop->backend_fd,
+ &lfields->ctl,
+ inv->prep,
+ EPOLL_CTL_DEL,
+ fd,
+ &dummy);
}
}
@@ -620,8 +674,6 @@ static struct uv__io_uring_sqe* uv__iou_get_sqe(struct uv__iou* iou,
return NULL; /* No room in ring buffer. TODO(bnoordhuis) maybe flush it? */
slot = tail & mask;
- iou->sqarray[slot] = slot; /* Identity mapping of index -> sqe. */
-
sqe = iou->sqe;
sqe = &sqe[slot];
memset(sqe, 0, sizeof(*sqe));
@@ -652,7 +704,7 @@ static void uv__iou_submit(struct uv__iou* iou) {
if (flags & UV__IORING_SQ_NEED_WAKEUP)
if (uv__io_uring_enter(iou->ringfd, 0, 0, UV__IORING_ENTER_SQ_WAKEUP))
- perror("libuv: io_uring_enter"); /* Can't happen. */
+ perror("libuv: io_uring_enter(wakeup)"); /* Can't happen. */
}
@@ -885,11 +937,135 @@ static void uv__poll_io_uring(uv_loop_t* loop, struct uv__iou* iou) {
}
+static void uv__epoll_ctl_prep(int epollfd,
+ struct uv__iou* ctl,
+ struct epoll_event (*events)[256],
+ int op,
+ int fd,
+ struct epoll_event* e) {
+ struct uv__io_uring_sqe* sqe;
+ struct epoll_event* pe;
+ uint32_t mask;
+ uint32_t slot;
+
+ if (ctl->ringfd == -1) {
+ if (!epoll_ctl(epollfd, op, fd, e))
+ return;
+
+ if (op == EPOLL_CTL_DEL)
+ return; /* Ignore errors, may be racing with another thread. */
+
+ if (op != EPOLL_CTL_ADD)
+ abort();
+
+ if (errno != EEXIST)
+ abort();
+
+ /* File descriptor that's been watched before, update event mask. */
+ if (!epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, e))
+ return;
+
+ abort();
+ } else {
+ mask = ctl->sqmask;
+ slot = (*ctl->sqtail)++ & mask;
+
+ pe = &(*events)[slot];
+ *pe = *e;
+
+ sqe = ctl->sqe;
+ sqe = &sqe[slot];
+
+ memset(sqe, 0, sizeof(*sqe));
+ sqe->addr = (uintptr_t) pe;
+ sqe->fd = epollfd;
+ sqe->len = op;
+ sqe->off = fd;
+ sqe->opcode = UV__IORING_OP_EPOLL_CTL;
+ sqe->user_data = op | slot << 2 | (int64_t) fd << 32;
+
+ if ((*ctl->sqhead & mask) == (*ctl->sqtail & mask))
+ uv__epoll_ctl_flush(epollfd, ctl, events);
+ }
+}
+
+
+static void uv__epoll_ctl_flush(int epollfd,
+ struct uv__iou* ctl,
+ struct epoll_event (*events)[256]) {
+ struct epoll_event oldevents[256];
+ struct uv__io_uring_cqe* cqe;
+ uint32_t oldslot;
+ uint32_t slot;
+ uint32_t n;
+ int fd;
+ int op;
+ int rc;
+
+ STATIC_ASSERT(sizeof(oldevents) == sizeof(*events));
+ assert(ctl->ringfd != -1);
+ assert(*ctl->sqhead != *ctl->sqtail);
+
+ n = *ctl->sqtail - *ctl->sqhead;
+ do
+ rc = uv__io_uring_enter(ctl->ringfd, n, n, UV__IORING_ENTER_GETEVENTS);
+ while (rc == -1 && errno == EINTR);
+
+ if (rc < 0)
+ perror("libuv: io_uring_enter(getevents)"); /* Can't happen. */
+
+ if (rc != (int) n)
+ abort();
+
+ assert(*ctl->sqhead == *ctl->sqtail);
+
+ memcpy(oldevents, *events, sizeof(*events));
+
+ /* Failed submissions are either EPOLL_CTL_DEL commands for file descriptors
+ * that have been closed, or EPOLL_CTL_ADD commands for file descriptors
+ * that we are already watching. Ignore the former and retry the latter
+ * with EPOLL_CTL_MOD.
+ */
+ while (*ctl->cqhead != *ctl->cqtail) {
+ slot = (*ctl->cqhead)++ & ctl->cqmask;
+
+ cqe = ctl->cqe;
+ cqe = &cqe[slot];
+
+ if (cqe->res == 0)
+ continue;
+
+ fd = cqe->user_data >> 32;
+ op = 3 & cqe->user_data;
+ oldslot = 255 & (cqe->user_data >> 2);
+
+ if (op == EPOLL_CTL_DEL)
+ continue;
+
+ if (op != EPOLL_CTL_ADD)
+ abort();
+
+ if (cqe->res != -EEXIST)
+ abort();
+
+ uv__epoll_ctl_prep(epollfd,
+ ctl,
+ events,
+ EPOLL_CTL_MOD,
+ fd,
+ &oldevents[oldslot]);
+ }
+}
+
+
void uv__io_poll(uv_loop_t* loop, int timeout) {
uv__loop_internal_fields_t* lfields;
struct epoll_event events[1024];
+ struct epoll_event prep[256];
+ struct uv__invalidate inv;
struct epoll_event* pe;
struct epoll_event e;
+ struct uv__iou* ctl;
struct uv__iou* iou;
int real_timeout;
QUEUE* q;
@@ -900,6 +1076,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
int have_iou_events;
int have_signals;
int nevents;
+ int epollfd;
int count;
int nfds;
int fd;
@@ -909,45 +1086,9 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
int reset_timeout;
lfields = uv__get_internal_fields(loop);
+ ctl = &lfields->ctl;
iou = &lfields->iou;
- memset(&e, 0, sizeof(e));
-
- while (!QUEUE_EMPTY(&loop->watcher_queue)) {
- q = QUEUE_HEAD(&loop->watcher_queue);
- QUEUE_REMOVE(q);
- QUEUE_INIT(q);
-
- w = QUEUE_DATA(q, uv__io_t, watcher_queue);
- assert(w->pevents != 0);
- assert(w->fd >= 0);
- assert(w->fd < (int) loop->nwatchers);
-
- e.events = w->pevents;
- e.data.fd = w->fd;
-
- if (w->events == 0)
- op = EPOLL_CTL_ADD;
- else
- op = EPOLL_CTL_MOD;
-
- /* XXX Future optimization: do EPOLL_CTL_MOD lazily if we stop watching
- * events, skip the syscall and squelch the events after epoll_wait().
- */
- if (epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
- if (errno != EEXIST)
- abort();
-
- assert(op == EPOLL_CTL_ADD);
-
- /* We've reactivated a file descriptor that's been watched before. */
- if (epoll_ctl(loop->backend_fd, EPOLL_CTL_MOD, w->fd, &e))
- abort();
- }
-
- w->events = w->pevents;
- }
-
sigmask = NULL;
if (loop->flags & UV_LOOP_BLOCK_SIGPROF) {
sigemptyset(&sigset);
@@ -969,10 +1110,42 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
user_timeout = 0;
}
+ epollfd = loop->backend_fd;
+
+ memset(&e, 0, sizeof(e));
+
+ while (!QUEUE_EMPTY(&loop->watcher_queue)) {
+ q = QUEUE_HEAD(&loop->watcher_queue);
+ w = QUEUE_DATA(q, uv__io_t, watcher_queue);
+ QUEUE_REMOVE(q);
+ QUEUE_INIT(q);
+
+ op = EPOLL_CTL_MOD;
+ if (w->events == 0)
+ op = EPOLL_CTL_ADD;
+
+ w->events = w->pevents;
+ e.events = w->pevents;
+ e.data.fd = w->fd;
+
+ uv__epoll_ctl_prep(epollfd, ctl, &prep, op, w->fd, &e);
+ }
+
+ inv.events = events;
+ inv.prep = &prep;
+ inv.nfds = -1;
+
for (;;) {
if (loop->nfds == 0)
if (iou->in_flight == 0)
- return;
+ break;
+
+ /* All event mask mutations should be visible to the kernel before
+ * we enter epoll_pwait().
+ */
+ if (ctl->ringfd != -1)
+ while (*ctl->sqhead != *ctl->sqtail)
+ uv__epoll_ctl_flush(epollfd, ctl, &prep);
/* Only need to set the provider_entry_time if timeout != 0. The function
* will return early if the loop isn't configured with UV_METRICS_IDLE_TIME.
@@ -986,11 +1159,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
*/
lfields->current_timeout = timeout;
- nfds = epoll_pwait(loop->backend_fd,
- events,
- ARRAY_SIZE(events),
- timeout,
- sigmask);
+ nfds = epoll_pwait(epollfd, events, ARRAY_SIZE(events), timeout, sigmask);
/* Update loop->time unconditionally. It's tempting to skip the update when
* timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the
@@ -1010,7 +1179,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
continue;
if (timeout == 0)
- return;
+ break;
/* We may have been inside the system call for longer than |timeout|
* milliseconds so we need to update the timestamp to avoid drift.
@@ -1031,7 +1200,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
continue;
if (timeout == 0)
- return;
+ break;
/* Interrupted by a signal. Update timeout and poll again. */
goto update_timeout;
@@ -1041,18 +1210,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
have_signals = 0;
nevents = 0;
- {
- /* Squelch a -Waddress-of-packed-member warning with gcc >= 9. */
- union {
- struct epoll_event* events;
- uv__io_t* watchers;
- } x;
-
- x.events = events;
- assert(loop->watchers != NULL);
- loop->watchers[loop->nwatchers] = x.watchers;
- loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
- }
+ inv.nfds = nfds;
+ lfields->inv = &inv;
for (i = 0; i < nfds; i++) {
pe = events + i;
@@ -1079,7 +1238,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
* Ignore all errors because we may be racing with another thread
* when the file descriptor is closed.
*/
- epoll_ctl(loop->backend_fd, EPOLL_CTL_DEL, fd, pe);
+ uv__epoll_ctl_prep(epollfd, ctl, &prep, EPOLL_CTL_DEL, fd, pe);
continue;
}
@@ -1136,14 +1295,13 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
loop->signal_io_watcher.cb(loop, &loop->signal_io_watcher, POLLIN);
}
- loop->watchers[loop->nwatchers] = NULL;
- loop->watchers[loop->nwatchers + 1] = NULL;
+ lfields->inv = NULL;
if (have_iou_events != 0)
- return; /* Event loop should cycle now so don't poll again. */
+ break; /* Event loop should cycle now so don't poll again. */
if (have_signals != 0)
- return; /* Event loop should cycle now so don't poll again. */
+ break; /* Event loop should cycle now so don't poll again. */
if (nevents != 0) {
if (nfds == ARRAY_SIZE(events) && --count != 0) {
@@ -1151,11 +1309,11 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
timeout = 0;
continue;
}
- return;
+ break;
}
if (timeout == 0)
- return;
+ break;
if (timeout == -1)
continue;
@@ -1165,10 +1323,14 @@ update_timeout:
real_timeout -= (loop->time - base);
if (real_timeout <= 0)
- return;
+ break;
timeout = real_timeout;
}
+
+ if (ctl->ringfd != -1)
+ while (*ctl->sqhead != *ctl->sqtail)
+ uv__epoll_ctl_flush(epollfd, ctl, &prep);
}
uint64_t uv__hrtime(uv_clocktype_t type) {
diff --git a/src/uv-common.h b/src/uv-common.h
index b0d9c747..decde536 100644
--- a/src/uv-common.h
+++ b/src/uv-common.h
@@ -423,7 +423,9 @@ struct uv__loop_internal_fields_s {
uv__loop_metrics_t loop_metrics;
int current_timeout;
#ifdef __linux__
+ struct uv__iou ctl;
struct uv__iou iou;
+ void* inv; /* used by uv__platform_invalidate_fd() */
#endif /* __linux__ */
};
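The STATIC_ASSERTs added near the top of linux.c (EPOLL_CTL_ADD/DEL/MOD all < 4) exist because uv__epoll_ctl_prep() packs the epoll op, the ring slot, and the file descriptor into the 64-bit user_data (op | slot << 2 | (int64_t) fd << 32), and uv__epoll_ctl_flush() unpacks it again so that failed ADDs can be retried as MODs against the saved epoll_event. The 256-entry ctl ring keeps the slot within 8 bits, hence the 255 mask on decode. A minimal sketch of that round trip follows; pack() and unpack() are hypothetical helpers written for illustration, not libuv functions.

/* Sketch of the user_data layout used by the new epoll_ctl batching code:
 *
 *   bits  0-1  : epoll op   (EPOLL_CTL_ADD/DEL/MOD, all < 4)
 *   bits  2-9  : ring slot  (0-255; the ctl ring has 256 entries)
 *   bits 32-63 : file descriptor
 */
#include <assert.h>
#include <stdint.h>
#include <sys/epoll.h>

static uint64_t pack(int op, uint32_t slot, int fd) {
  return (uint64_t) op | (uint64_t) slot << 2 | (uint64_t) fd << 32;
}

static void unpack(uint64_t user_data, int* op, uint32_t* slot, int* fd) {
  *op = 3 & user_data;            /* Same masks as uv__epoll_ctl_flush(). */
  *slot = 255 & (user_data >> 2);
  *fd = user_data >> 32;
}

int main(void) {
  uint32_t slot;
  int op;
  int fd;

  unpack(pack(EPOLL_CTL_MOD, 42, 1000), &op, &slot, &fd);
  assert(op == EPOLL_CTL_MOD && slot == 42 && fd == 1000);
  return 0;
}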