diff options
Diffstat (limited to 'src/os/unix')
-rw-r--r-- | src/os/unix/ngx_aio.h | 1 | ||||
-rw-r--r-- | src/os/unix/ngx_aio_read.c | 111 | ||||
-rw-r--r-- | src/os/unix/ngx_aio_read_chain.c | 57 | ||||
-rw-r--r-- | src/os/unix/ngx_aio_write.c | 114 | ||||
-rw-r--r-- | src/os/unix/ngx_aio_write_chain.c | 60 | ||||
-rw-r--r-- | src/os/unix/ngx_readv_chain.c | 9 |
6 files changed, 195 insertions, 157 deletions
diff --git a/src/os/unix/ngx_aio.h b/src/os/unix/ngx_aio.h index a37dafc6a..a357ff524 100644 --- a/src/os/unix/ngx_aio.h +++ b/src/os/unix/ngx_aio.h @@ -6,6 +6,7 @@ ssize_t ngx_aio_read(ngx_connection_t *c, char *buf, size_t size); +ssize_t ngx_aio_read_chain(ngx_connection_t *c, ngx_chain_t *cl); ssize_t ngx_aio_write(ngx_connection_t *c, char *buf, size_t size); ngx_chain_t *ngx_aio_write_chain(ngx_connection_t *c, ngx_chain_t *in); diff --git a/src/os/unix/ngx_aio_read.c b/src/os/unix/ngx_aio_read.c index bc5055860..cb498eb5d 100644 --- a/src/os/unix/ngx_aio_read.c +++ b/src/os/unix/ngx_aio_read.c @@ -10,102 +10,89 @@ /* - The data is ready - 3 syscalls: - aio_read(), aio_error(), aio_return() - The data is not ready - 4 (kqueue) or 5 syscalls: - aio_read(), aio_error(), notifiction, - aio_error(), aio_return() - aio_cancel(), aio_error() -*/ - + * the ready data requires 3 syscalls: + * aio_write(), aio_error(), aio_return() + * the non-ready data requires 4 (kqueue) or 5 syscalls: + * aio_write(), aio_error(), notifiction, aio_error(), aio_return() + * timeout, aio_cancel(), aio_error() + */ ssize_t ngx_aio_read(ngx_connection_t *c, char *buf, size_t size) { - int rc, first, canceled; - ngx_event_t *ev; - - ev = c->read; - - canceled = 0; - - if (ev->timedout) { - ngx_set_socket_errno(NGX_ETIMEDOUT); - ngx_log_error(NGX_LOG_ERR, ev->log, 0, "aio_read() timed out"); - - rc = aio_cancel(c->fd, &ev->aiocb); - if (rc == -1) { - ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, - "aio_cancel() failed"); - return NGX_ERROR; - } - - ngx_log_debug(ev->log, "aio_cancel: %d" _ rc); + int n; + ngx_event_t *rev; - canceled = 1; + rev = c->read; - ev->ready = 1; + if (rev->active) { + ngx_log_error(NGX_LOG_ALERT, rev->log, 0, "SECOND AIO POST"); + return NGX_AGAIN; } - first = 0; + if (!rev->aio_complete) { + ngx_memzero(&rev->aiocb, sizeof(struct aiocb)); - if (!ev->ready) { - ngx_memzero(&ev->aiocb, sizeof(struct aiocb)); - - ev->aiocb.aio_fildes = c->fd; - ev->aiocb.aio_buf = buf; - ev->aiocb.aio_nbytes = size; + rev->aiocb.aio_fildes = c->fd; + rev->aiocb.aio_buf = buf; + rev->aiocb.aio_nbytes = size; #if (HAVE_KQUEUE) - ev->aiocb.aio_sigevent.sigev_notify_kqueue = ngx_kqueue; - ev->aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT; - ev->aiocb.aio_sigevent.sigev_value.sigval_ptr = ev; + rev->aiocb.aio_sigevent.sigev_notify_kqueue = ngx_kqueue; + rev->aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT; + rev->aiocb.aio_sigevent.sigev_value.sigval_ptr = rev; #endif - if (aio_read(&ev->aiocb) == -1) { - ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, + if (aio_read(&rev->aiocb) == -1) { + ngx_log_error(NGX_LOG_CRIT, rev->log, ngx_errno, "aio_read() failed"); + rev->error = 1; return NGX_ERROR; } - ngx_log_debug(ev->log, "aio_read: OK"); + ngx_log_debug(rev->log, "aio_read: OK"); - ev->active = 1; - first = 1; + rev->active = 1; } - ev->ready = 0; + rev->aio_complete = 0; - rc = aio_error(&ev->aiocb); - if (rc == -1) { - ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, "aio_error() failed"); + n = aio_error(&rev->aiocb); + if (n == -1) { + ngx_log_error(NGX_LOG_ALERT, rev->log, ngx_errno, "aio_error() failed"); + rev->error = 1; return NGX_ERROR; } - if (rc != 0) { - if (rc == NGX_EINPROGRESS) { - if (!first) { - ngx_log_error(NGX_LOG_CRIT, ev->log, rc, + if (n != 0) { + if (n == NGX_EINPROGRESS) { + if (!rev->active) { + ngx_log_error(NGX_LOG_ALERT, rev->log, n, "aio_read() still in progress"); } return NGX_AGAIN; } - if (rc == NGX_ECANCELED && canceled) { - return NGX_ERROR; - } - - ngx_log_error(NGX_LOG_CRIT, ev->log, rc, "aio_read() failed"); + ngx_log_error(NGX_LOG_CRIT, rev->log, n, "aio_read() failed"); + rev->error = 1; return NGX_ERROR; } - rc = aio_return(&ev->aiocb); - if (rc == -1) { - ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, "aio_return() failed"); + n = aio_return(&rev->aiocb); + if (n == -1) { + ngx_log_error(NGX_LOG_ALERT, rev->log, ngx_errno, + "aio_return() failed"); + rev->error = 1; return NGX_ERROR; } - ngx_log_debug(ev->log, "aio_read: %d" _ rc); + rev->active = 0; + + ngx_log_debug(rev->log, "aio_read: %d" _ n); + + if (n == 0) { + rev->eof = 1; + } - return rc; + return n; } diff --git a/src/os/unix/ngx_aio_read_chain.c b/src/os/unix/ngx_aio_read_chain.c new file mode 100644 index 000000000..31f7c8573 --- /dev/null +++ b/src/os/unix/ngx_aio_read_chain.c @@ -0,0 +1,57 @@ + +#include <ngx_config.h> +#include <ngx_core.h> +#include <ngx_event.h> +#include <ngx_aio.h> + + +ssize_t ngx_aio_read_chain(ngx_connection_t *c, ngx_chain_t *cl) +{ + int n; + char *buf, *prev; + size_t size, total; + ngx_err_t err; + + total = 0; + + while (cl) { + + /* we can post the single aio operation only */ + + if (c->read->active) { + return total ? total : NGX_AGAIN; + } + + buf = cl->hunk->pos; + prev = buf; + size = 0; + + /* coalesce the neighbouring hunks */ + + while (cl && prev == cl->hunk->pos) { + size += cl->hunk->last - cl->hunk->pos; + prev = cl->hunk->last; + cl = cl->next; + } + + n = ngx_aio_read(c, buf, size); + + ngx_log_debug(c->log, "aio_read: %d" _ n); + + if (n == NGX_AGAIN) { + return total ? total : NGX_AGAIN; + } + + if (n == NGX_ERROR) { + return NGX_ERROR; + } + + if (n > 0) { + total += n; + } + + ngx_log_debug(c->log, "aio_read total: %d" _ total); + } + + return total ? total : NGX_AGAIN; +} diff --git a/src/os/unix/ngx_aio_write.c b/src/os/unix/ngx_aio_write.c index 5eded37b7..dcdecc5b6 100644 --- a/src/os/unix/ngx_aio_write.c +++ b/src/os/unix/ngx_aio_write.c @@ -10,107 +10,89 @@ /* - The data is ready - 3 syscalls: - aio_write(), aio_error(), aio_return() - The data is not ready - 4 (kqueue) or 5 syscalls: - aio_write(), aio_error(), notifiction, - aio_error(), aio_return() - aio_cancel(), aio_error() -*/ + * the ready data requires 3 syscalls: + * aio_write(), aio_error(), aio_return() + * the non-ready data requires 4 (kqueue) or 5 syscalls: + * aio_write(), aio_error(), notifiction, aio_error(), aio_return() + * timeout, aio_cancel(), aio_error() + */ ssize_t ngx_aio_write(ngx_connection_t *c, char *buf, size_t size) { - int rc, first, canceled; - ngx_event_t *ev; + int n; + ngx_event_t *wev; - ev = c->write; + wev = c->write; - canceled = 0; - -ngx_log_debug(ev->log, "aio: ev->ready: %d" _ ev->ready); -ngx_log_debug(ev->log, "aio: aiocb: %08x" _ &ev->aiocb); - -#if 0 - if (ev->timedout) { - ngx_set_socket_errno(NGX_ETIMEDOUT); - ngx_log_error(NGX_LOG_ERR, ev->log, 0, "aio_write() timed out"); - - rc = aio_cancel(c->fd, &ev->aiocb); - if (rc == -1) { - ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, - "aio_cancel() failed"); - return NGX_ERROR; - } - - ngx_log_debug(ev->log, "aio_cancel: %d" _ rc); - - canceled = 1; - - ev->ready = 1; + if (wev->active) { + return NGX_AGAIN; } -#endif - first = 0; +ngx_log_debug(wev->log, "aio: wev->aio_complete: %d" _ wev->aio_complete); - if (!ev->ready) { - ngx_memzero(&ev->aiocb, sizeof(struct aiocb)); + if (!wev->aio_complete) { + ngx_memzero(&wev->aiocb, sizeof(struct aiocb)); - ev->aiocb.aio_fildes = c->fd; - ev->aiocb.aio_buf = buf; - ev->aiocb.aio_nbytes = size; + wev->aiocb.aio_fildes = c->fd; + wev->aiocb.aio_buf = buf; + wev->aiocb.aio_nbytes = size; #if (HAVE_KQUEUE) - ev->aiocb.aio_sigevent.sigev_notify_kqueue = ngx_kqueue; - ev->aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT; - ev->aiocb.aio_sigevent.sigev_value.sigval_ptr = ev; + wev->aiocb.aio_sigevent.sigev_notify_kqueue = ngx_kqueue; + wev->aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT; + wev->aiocb.aio_sigevent.sigev_value.sigval_ptr = wev; #endif - if (aio_write(&ev->aiocb) == -1) { - ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, + if (aio_write(&wev->aiocb) == -1) { + ngx_log_error(NGX_LOG_CRIT, wev->log, ngx_errno, "aio_write() failed"); return NGX_ERROR; } - ngx_log_debug(ev->log, "aio_write: OK"); + ngx_log_debug(wev->log, "aio_write: OK"); - ev->active = 1; - first = 1; + wev->active = 1; } - ev->ready = 0; + wev->aio_complete = 0; - rc = aio_error(&ev->aiocb); - if (rc == -1) { - ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, "aio_error() failed"); + n = aio_error(&wev->aiocb); + if (n == -1) { + ngx_log_error(NGX_LOG_CRIT, wev->log, ngx_errno, "aio_error() failed"); + wev->error = 1; return NGX_ERROR; } - if (rc != 0) { - if (rc == NGX_EINPROGRESS) { - if (!first) { - ngx_log_error(NGX_LOG_CRIT, ev->log, rc, + if (n != 0) { + if (n == NGX_EINPROGRESS) { + if (!wev->active) { + ngx_log_error(NGX_LOG_ALERT, wev->log, n, "aio_write() still in progress"); } return NGX_AGAIN; } - if (rc == NGX_ECANCELED && canceled) { - return NGX_ERROR; - } - - ngx_log_error(NGX_LOG_CRIT, ev->log, rc, "aio_write() failed"); + ngx_log_error(NGX_LOG_CRIT, wev->log, n, "aio_write() failed"); + wev->error = 1; return NGX_ERROR; } - rc = aio_return(&ev->aiocb); - if (rc == -1) { - ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, "aio_return() failed"); + n = aio_return(&wev->aiocb); + if (n == -1) { + ngx_log_error(NGX_LOG_ALERT, wev->log, ngx_errno, + "aio_return() failed"); + wev->error = 1; return NGX_ERROR; } - ev->active = 0; - ngx_log_debug(ev->log, "aio_write: %d" _ rc); + wev->active = 0; + + ngx_log_debug(wev->log, "aio_write: %d" _ n); + + if (n == 0) { + wev->eof = 1; + } - return rc; + return n; } diff --git a/src/os/unix/ngx_aio_write_chain.c b/src/os/unix/ngx_aio_write_chain.c index b8760b0fa..ba24d808e 100644 --- a/src/os/unix/ngx_aio_write_chain.c +++ b/src/os/unix/ngx_aio_write_chain.c @@ -7,68 +7,74 @@ ngx_chain_t *ngx_aio_write_chain(ngx_connection_t *c, ngx_chain_t *in) { - int rc; - char *buf, *prev; - off_t sent; - size_t size; - ngx_err_t err; - ngx_chain_t *ce; + int n; + char *buf, *prev; + off_t sent; + size_t size; + ngx_err_t err; + ngx_chain_t *cl; sent = 0; - ce = in; + cl = in; - while (ce) { + while (cl) { + + if (cl->hunk->last - cl->hunk->pos == 0) { + cl = cl->next; + continue; + } /* we can post the single aio operation only */ if (c->write->active) { - return ce; + return cl; } - buf = prev = ce->hunk->pos; + buf = cl->hunk->pos; + prev = buf; size = 0; - /* coalesce the neighbouring chain entries */ + /* coalesce the neighbouring hunks */ - while (ce && prev == ce->hunk->pos) { - size += ce->hunk->last - ce->hunk->pos; - prev = ce->hunk->last; - ce = ce->next; + while (cl && prev == cl->hunk->pos) { + size += cl->hunk->last - cl->hunk->pos; + prev = cl->hunk->last; + cl = cl->next; } - rc = ngx_aio_write(c, buf, size); + n = ngx_aio_write(c, buf, size); #if (NGX_DEBUG_WRITE_CHAIN) - ngx_log_debug(c->log, "aio_write rc: %d" _ rc); + ngx_log_debug(c->log, "aio_write: %d" _ n); #endif - if (rc == NGX_ERROR) { + if (n == NGX_ERROR) { return NGX_CHAIN_ERROR; } - if (rc > 0) { - sent += rc; - c->sent += rc; + if (n > 0) { + sent += n; + c->sent += n; } #if (NGX_DEBUG_WRITE_CHAIN) ngx_log_debug(c->log, "aio_write sent: " OFF_FMT _ c->sent); #endif - for (ce = in; ce; ce = ce->next) { + for (cl = in; cl; cl = cl->next) { - if (sent >= ce->hunk->last - ce->hunk->pos) { - sent -= ce->hunk->last - ce->hunk->pos; - ce->hunk->pos = ce->hunk->last; + if (sent >= cl->hunk->last - cl->hunk->pos) { + sent -= cl->hunk->last - cl->hunk->pos; + cl->hunk->pos = cl->hunk->last; continue; } - ce->hunk->pos += sent; + cl->hunk->pos += sent; break; } } - return ce; + return cl; } diff --git a/src/os/unix/ngx_readv_chain.c b/src/os/unix/ngx_readv_chain.c index f19569879..1ea683f0d 100644 --- a/src/os/unix/ngx_readv_chain.c +++ b/src/os/unix/ngx_readv_chain.c @@ -7,13 +7,14 @@ ssize_t ngx_readv_chain(ngx_connection_t *c, ngx_chain_t *chain) { char *prev; - ssize_t n; + ssize_t n, size; struct iovec *iov; ngx_err_t err; ngx_array_t io; prev = NULL; iov = NULL; + size = 0; ngx_init_array(io, c->pool, 10, sizeof(struct iovec), NGX_ERROR); @@ -29,6 +30,7 @@ ssize_t ngx_readv_chain(ngx_connection_t *c, ngx_chain_t *chain) iov->iov_len = chain->hunk->end - chain->hunk->last; } + size += chain->hunk->end - chain->hunk->last; prev = chain->hunk->end; chain = chain->next; } @@ -42,7 +44,6 @@ ngx_log_debug(c->log, "recv: %d:%d" _ io.nelts _ iov->iov_len); } else if (n == -1) { c->read->ready = 0; - c->read->error = 1; err = ngx_errno; if (err == NGX_EAGAIN) { @@ -50,8 +51,12 @@ ngx_log_debug(c->log, "recv: %d:%d" _ io.nelts _ iov->iov_len); return NGX_AGAIN; } + c->read->error = 1; ngx_log_error(NGX_LOG_ERR, c->log, err, "readv() failed"); return NGX_ERROR; + + } else if (n < size) { + c->read->ready = 0; } return n; |