aboutsummaryrefslogtreecommitdiff
path: root/src/unix/udp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/unix/udp.c')
-rw-r--r--src/unix/udp.c134
1 files changed, 73 insertions, 61 deletions
diff --git a/src/unix/udp.c b/src/unix/udp.c
index 5dcd5a4d..ae09f3a7 100644
--- a/src/unix/udp.c
+++ b/src/unix/udp.c
@@ -275,8 +275,61 @@ static void uv__udp_recvmsg(uv_udp_t* handle) {
&& handle->recv_cb != NULL);
}
-static void uv__udp_sendmsg(uv_udp_t* handle) {
+static void uv__udp_sendmsg_one(uv_udp_t* handle, uv_udp_send_t* req) {
+ struct uv__queue* q;
+ struct msghdr h;
+ ssize_t size;
+
+ for (;;) {
+ memset(&h, 0, sizeof h);
+ if (req->addr.ss_family == AF_UNSPEC) {
+ h.msg_name = NULL;
+ h.msg_namelen = 0;
+ } else {
+ h.msg_name = &req->addr;
+ if (req->addr.ss_family == AF_INET6)
+ h.msg_namelen = sizeof(struct sockaddr_in6);
+ else if (req->addr.ss_family == AF_INET)
+ h.msg_namelen = sizeof(struct sockaddr_in);
+ else if (req->addr.ss_family == AF_UNIX)
+ h.msg_namelen = sizeof(struct sockaddr_un);
+ else {
+ assert(0 && "unsupported address family");
+ abort();
+ }
+ }
+ h.msg_iov = (struct iovec*) req->bufs;
+ h.msg_iovlen = req->nbufs;
+
+ do
+ size = sendmsg(handle->io_watcher.fd, &h, 0);
+ while (size == -1 && errno == EINTR);
+
+ if (size == -1)
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
+ return;
+
+ req->status = (size == -1 ? UV__ERR(errno) : size);
+
+ /* Sending a datagram is an atomic operation: either all data
+ * is written or nothing is (and EMSGSIZE is raised). That is
+ * why we don't handle partial writes. Just pop the request
+ * off the write queue and onto the completed queue, done.
+ */
+ uv__queue_remove(&req->queue);
+ uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
+ uv__io_feed(handle->loop, &handle->io_watcher);
+
+ if (uv__queue_empty(&handle->write_queue))
+ return;
+
+ q = uv__queue_head(&handle->write_queue);
+ req = uv__queue_data(q, uv_udp_send_t, queue);
+ }
+}
+
#if defined(__linux__) || defined(__FreeBSD__)
+static void uv__udp_sendmsg_many(uv_udp_t* handle) {
uv_udp_send_t* req;
struct mmsghdr h[20];
struct mmsghdr* p;
@@ -285,16 +338,11 @@ static void uv__udp_sendmsg(uv_udp_t* handle) {
size_t pkts;
size_t i;
- if (uv__queue_empty(&handle->write_queue))
- return;
-
write_queue_drain:
for (pkts = 0, q = uv__queue_head(&handle->write_queue);
pkts < ARRAY_SIZE(h) && q != &handle->write_queue;
++pkts, q = uv__queue_head(q)) {
- assert(q != NULL);
req = uv__queue_data(q, uv_udp_send_t, queue);
- assert(req != NULL);
p = &h[pkts];
memset(p, 0, sizeof(*p));
@@ -328,10 +376,7 @@ write_queue_drain:
for (i = 0, q = uv__queue_head(&handle->write_queue);
i < pkts && q != &handle->write_queue;
++i, q = uv__queue_head(&handle->write_queue)) {
- assert(q != NULL);
req = uv__queue_data(q, uv_udp_send_t, queue);
- assert(req != NULL);
-
req->status = UV__ERR(errno);
uv__queue_remove(&req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
@@ -346,10 +391,7 @@ write_queue_drain:
for (i = 0, q = uv__queue_head(&handle->write_queue);
i < (size_t)npkts && q != &handle->write_queue;
++i, q = uv__queue_head(&handle->write_queue)) {
- assert(q != NULL);
req = uv__queue_data(q, uv_udp_send_t, queue);
- assert(req != NULL);
-
req->status = req->bufs[0].len;
/* Sending a datagram is an atomic operation: either all data
@@ -364,61 +406,31 @@ write_queue_drain:
/* couldn't batch everything, continue sending (jump to avoid stack growth) */
if (!uv__queue_empty(&handle->write_queue))
goto write_queue_drain;
- uv__io_feed(handle->loop, &handle->io_watcher);
-#else /* __linux__ || ____FreeBSD__ */
- uv_udp_send_t* req;
- struct msghdr h;
- struct uv__queue* q;
- ssize_t size;
-
- while (!uv__queue_empty(&handle->write_queue)) {
- q = uv__queue_head(&handle->write_queue);
- assert(q != NULL);
- req = uv__queue_data(q, uv_udp_send_t, queue);
- assert(req != NULL);
+ uv__io_feed(handle->loop, &handle->io_watcher);
+}
+#endif /* __linux__ || ____FreeBSD__ */
- memset(&h, 0, sizeof h);
- if (req->addr.ss_family == AF_UNSPEC) {
- h.msg_name = NULL;
- h.msg_namelen = 0;
- } else {
- h.msg_name = &req->addr;
- if (req->addr.ss_family == AF_INET6)
- h.msg_namelen = sizeof(struct sockaddr_in6);
- else if (req->addr.ss_family == AF_INET)
- h.msg_namelen = sizeof(struct sockaddr_in);
- else if (req->addr.ss_family == AF_UNIX)
- h.msg_namelen = sizeof(struct sockaddr_un);
- else {
- assert(0 && "unsupported address family");
- abort();
- }
- }
- h.msg_iov = (struct iovec*) req->bufs;
- h.msg_iovlen = req->nbufs;
+static void uv__udp_sendmsg(uv_udp_t* handle) {
+ struct uv__queue* q;
+ uv_udp_send_t* req;
- do {
- size = sendmsg(handle->io_watcher.fd, &h, 0);
- } while (size == -1 && errno == EINTR);
+ if (uv__queue_empty(&handle->write_queue))
+ return;
- if (size == -1) {
- if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
- break;
- }
+ q = uv__queue_head(&handle->write_queue);
+ req = uv__queue_data(q, uv_udp_send_t, queue);
- req->status = (size == -1 ? UV__ERR(errno) : size);
+#if defined(__linux__) || defined(__FreeBSD__)
+ /* Use sendmmsg() if this send request contains more than one datagram OR
+ * there is more than one send request (because that automatically implies
+ * there is more than one datagram.)
+ */
+ if (req->nbufs != 1 || &handle->write_queue != uv__queue_next(&req->queue))
+ return uv__udp_sendmsg_many(handle);
+#endif
- /* Sending a datagram is an atomic operation: either all data
- * is written or nothing is (and EMSGSIZE is raised). That is
- * why we don't handle partial writes. Just pop the request
- * off the write queue and onto the completed queue, done.
- */
- uv__queue_remove(&req->queue);
- uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
- uv__io_feed(handle->loop, &handle->io_watcher);
- }
-#endif /* __linux__ || ____FreeBSD__ */
+ return uv__udp_sendmsg_one(handle, req);
}
/* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional