diff options
Diffstat (limited to 'src/http/modules/ngx_http_memcached_module.c')
-rw-r--r-- | src/http/modules/ngx_http_memcached_module.c | 616 |
1 files changed, 616 insertions, 0 deletions
diff --git a/src/http/modules/ngx_http_memcached_module.c b/src/http/modules/ngx_http_memcached_module.c new file mode 100644 index 000000000..6217dd109 --- /dev/null +++ b/src/http/modules/ngx_http_memcached_module.c @@ -0,0 +1,616 @@ + +/* + * Copyright (C) Igor Sysoev + */ + + +#include <ngx_config.h> +#include <ngx_core.h> +#include <ngx_event.h> +#include <ngx_http.h> + + +typedef struct { + ngx_http_upstream_conf_t upstream; + ngx_peers_t *peers; +} ngx_http_memcached_loc_conf_t; + + +typedef struct { + size_t rest; + ngx_http_request_t *request; +} ngx_http_memcached_ctx_t; + + +static ngx_int_t ngx_http_memcached_create_request(ngx_http_request_t *r); +static ngx_int_t ngx_http_memcached_reinit_request(ngx_http_request_t *r); +static ngx_int_t ngx_http_memcached_process_header(ngx_http_request_t *r); +static ngx_int_t ngx_http_memcached_filter_init(void *data); +static ngx_int_t ngx_http_memcached_filter(void *data, ssize_t bytes); +static void ngx_http_memcached_abort_request(ngx_http_request_t *r); +static void ngx_http_memcached_finalize_request(ngx_http_request_t *r, + ngx_int_t rc); + +static void *ngx_http_memcached_create_loc_conf(ngx_conf_t *cf); +static char *ngx_http_memcached_merge_loc_conf(ngx_conf_t *cf, + void *parent, void *child); + +static char *ngx_http_memcached_pass(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); + + +static ngx_conf_bitmask_t ngx_http_memcached_next_upstream_masks[] = { + { ngx_string("error"), NGX_HTTP_UPSTREAM_FT_ERROR }, + { ngx_string("timeout"), NGX_HTTP_UPSTREAM_FT_TIMEOUT }, + { ngx_string("invalid_response"), NGX_HTTP_UPSTREAM_FT_INVALID_HEADER }, + { ngx_string("not_found"), NGX_HTTP_UPSTREAM_FT_HTTP_404 }, + { ngx_null_string, 0 } +}; + + +static ngx_command_t ngx_http_memcached_commands[] = { + + { ngx_string("memcached_pass"), + NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_http_memcached_pass, + NGX_HTTP_LOC_CONF_OFFSET, + 0, + NULL }, + + { ngx_string("memcached_connect_timeout"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_memcached_loc_conf_t, upstream.connect_timeout), + NULL }, + + { ngx_string("memcached_send_timeout"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_memcached_loc_conf_t, upstream.send_timeout), + NULL }, + + { ngx_string("memcached_buffer_size"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_memcached_loc_conf_t, upstream.buffer_size), + NULL }, + + { ngx_string("memcached_read_timeout"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_memcached_loc_conf_t, upstream.read_timeout), + NULL }, + + { ngx_string("memcached_next_upstream"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_1MORE, + ngx_conf_set_bitmask_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_memcached_loc_conf_t, upstream.next_upstream), + &ngx_http_memcached_next_upstream_masks }, + + { ngx_string("memcached_upstream_max_fails"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_memcached_loc_conf_t, upstream.max_fails), + NULL }, + + { ngx_string("memcached_upstream_fail_timeout"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_sec_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_memcached_loc_conf_t, upstream.fail_timeout), + NULL }, + + ngx_null_command +}; + + +ngx_http_module_t ngx_http_memcached_module_ctx = { + NULL, /* preconfiguration */ + NULL, /* postconfiguration */ + + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + NULL, /* create server configuration */ + NULL, /* merge server configuration */ + + ngx_http_memcached_create_loc_conf, /* create location configration */ + ngx_http_memcached_merge_loc_conf /* merge location configration */ +}; + + +ngx_module_t ngx_http_memcached_module = { + NGX_MODULE_V1, + &ngx_http_memcached_module_ctx, /* module context */ + ngx_http_memcached_commands, /* module directives */ + NGX_HTTP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +#define NGX_HTTP_MEMCACHED_END (sizeof(ngx_http_memcached_end) - 1) +static u_char ngx_http_memcached_end[] = CRLF "END" CRLF; + + +static ngx_int_t +ngx_http_memcached_handler(ngx_http_request_t *r) +{ + ngx_int_t rc; + ngx_http_upstream_t *u; + ngx_http_memcached_ctx_t *ctx; + ngx_http_memcached_loc_conf_t *mlcf; + + if (r->method != NGX_HTTP_GET && r->method != NGX_HTTP_HEAD) { + return NGX_HTTP_NOT_ALLOWED; + } + + rc = ngx_http_discard_body(r); + + if (rc != NGX_OK && rc != NGX_AGAIN) { + return rc; + } + + if (ngx_http_set_content_type(r) != NGX_OK) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + mlcf = ngx_http_get_module_loc_conf(r, ngx_http_memcached_module); + + u = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t)); + if (u == NULL) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + u->peer.log = r->connection->log; + u->peer.log_error = NGX_ERROR_ERR; + u->peer.peers = mlcf->peers; + u->peer.tries = mlcf->peers->number; +#if (NGX_THREADS) + u->peer.lock = &r->connection->lock; +#endif + + u->output.tag = (ngx_buf_tag_t) &ngx_http_memcached_module; + + u->conf = &mlcf->upstream; + + u->create_request = ngx_http_memcached_create_request; + u->reinit_request = ngx_http_memcached_reinit_request; + u->process_header = ngx_http_memcached_process_header; + u->abort_request = ngx_http_memcached_abort_request; + u->finalize_request = ngx_http_memcached_finalize_request; + + r->upstream = u; + + ctx = ngx_palloc(r->pool, sizeof(ngx_http_memcached_ctx_t)); + if (ctx == NULL) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + ctx->rest = NGX_HTTP_MEMCACHED_END; + ctx->request = r; + + u->input_filter_init = ngx_http_memcached_filter_init; + u->input_filter = ngx_http_memcached_filter; + u->input_filter_ctx = ctx; + + ngx_http_upstream_init(r); + + return NGX_DONE; +} + + +static ngx_int_t +ngx_http_memcached_create_request(ngx_http_request_t *r) +{ + size_t len; + ngx_buf_t *b; + ngx_chain_t *cl; + + len = sizeof("get ") - 1 + r->uri.len + sizeof(" " CRLF) - 1; + if (r->args.len) { + len += 1+ r->args.len; + } + + b = ngx_create_temp_buf(r->pool, len); + if (b == NULL) { + return NGX_ERROR; + } + + cl = ngx_alloc_chain_link(r->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = b; + cl->next = NULL; + + r->upstream->request_bufs = cl; + + *b->last++ = 'g'; *b->last++ = 'e'; *b->last++ = 't'; *b->last++ = ' '; + + b->last = ngx_copy(b->last, r->uri.data, r->uri.len); + + if (r->args.len) { + *b->last++ = '?'; + b->last = ngx_copy(b->last, r->args.data, r->args.len); + } + +#if (NGX_DEBUG) + { + ngx_str_t s; + + s.len = b->last - b->pos; + s.data = b->pos; + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http memcached request: \"%V\"", &s); + } +#endif + + *b->last++ = ' '; *b->last++ = CR; *b->last++ = LF; + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_memcached_reinit_request(ngx_http_request_t *r) +{ + return NGX_OK; +} + + +static ngx_int_t +ngx_http_memcached_process_header(ngx_http_request_t *r) +{ + u_char *p, *len; + ngx_str_t line; + ngx_http_upstream_t *u; + + u = r->upstream; + + for (p = u->buffer.pos; p < u->buffer.last; p++) { + if (*p == LF) { + goto found; + } + } + + return NGX_AGAIN; + +found: + + *p = '\0'; + + line.len = p - u->buffer.pos - 1; + line.data = u->buffer.pos; + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "memcached: \"%V\"", &line); + + p = u->buffer.pos; + + if (ngx_strncmp(p, "VALUE ", sizeof("VALUE ") - 1) == 0) { + + p += sizeof("VALUE ") - 1; + + if (ngx_strncmp(p, r->uri.data, r->uri.len) != 0) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "memcached sent invalid key in response \"%V\" " + "for key \"%V\"", + &line, &r->uri); + + return NGX_HTTP_UPSTREAM_INVALID_HEADER; + } + + p += r->uri.len; + + if (*p++ != ' ') { + goto no_valid; + } + + /* skip flags */ + + while (*p) { + if (*p++ == ' ') { + goto length; + } + } + + goto no_valid; + + length: + + len = p; + + while (*p && *p++ != CR) { /* void */ } + + r->headers_out.content_length_n = ngx_atoof(len, p - len - 1); + if (r->headers_out.content_length_n == -1) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "memcached sent invalid length in response \"%V\" " + "for key \"%V\"", + &line, &r->uri); + return NGX_HTTP_UPSTREAM_INVALID_HEADER; + } + + u->headers_in.status_n = 200; + u->buffer.pos = p + 1; + + return NGX_OK; + } + + if (ngx_strcmp(p, "END\x0d") == 0) { + ngx_log_error(NGX_LOG_INFO, r->connection->log, 0, + "key: \"%V\" was not found by memcached", &r->uri); + + u->headers_in.status_n = 404; + + return NGX_OK; + } + +no_valid: + + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "memcached sent invalid response: \"%V\"", &line); + + return NGX_HTTP_UPSTREAM_INVALID_HEADER; +} + + +static ngx_int_t +ngx_http_memcached_filter_init(void *data) +{ + ngx_http_memcached_ctx_t *ctx = data; + + ngx_http_upstream_t *u; + + u = ctx->request->upstream; + + u->length += NGX_HTTP_MEMCACHED_END; + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_memcached_filter(void *data, ssize_t bytes) +{ + ngx_http_memcached_ctx_t *ctx = data; + + u_char *last; + ngx_buf_t *b; + ngx_chain_t *cl, **ll; + ngx_http_upstream_t *u; + + u = ctx->request->upstream; + b = &u->buffer; + + if (u->length == ctx->rest) { + + if (ngx_strncmp(b->last, + ngx_http_memcached_end + NGX_HTTP_MEMCACHED_END + - ctx->rest, + bytes) != 0) + { + ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0, + "memcached sent invalid trailer"); + } + + u->length -= bytes; + ctx->rest -= bytes; + + return NGX_OK; + } + + for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) { + ll = &cl->next; + } + + cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf->flush = 1; + cl->buf->memory = 1; + + *ll = cl; + + cl->buf->pos = b->last; + b->last += bytes; + cl->buf->last = b->last; + + ngx_log_debug4(NGX_LOG_DEBUG_HTTP, ctx->request->connection->log, 0, + "memcached filter bytes:%z size:%z length:%z rest:%z", + bytes, b->last - b->pos, u->length, ctx->rest); + + if (b->last - b->pos <= (ssize_t) (u->length - NGX_HTTP_MEMCACHED_END)) { + u->length -= bytes; + return NGX_OK; + } + + + last = b->pos + u->length - NGX_HTTP_MEMCACHED_END; + + if (ngx_strncmp(last, ngx_http_memcached_end, b->last - last) != 0) { + ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0, + "memcached sent invalid trailer"); + } + + ctx->rest = u->length - (b->last - b->pos); + b->last = last; + cl->buf->last = last; + u->length = ctx->rest; + + return NGX_OK; +} + + +static void +ngx_http_memcached_abort_request(ngx_http_request_t *r) +{ + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "abort http memcached request"); + return; +} + + +static void +ngx_http_memcached_finalize_request(ngx_http_request_t *r, ngx_int_t rc) +{ + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "finalize http memcached request"); + return; +} + + +static void * +ngx_http_memcached_create_loc_conf(ngx_conf_t *cf) +{ + ngx_http_memcached_loc_conf_t *conf; + + conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_memcached_loc_conf_t)); + if (conf == NULL) { + return NGX_CONF_ERROR; + } + + /* + * set by ngx_pcalloc(): + * + * conf->upstream.bufs.num = 0; + * conf->upstream.next_upstream = 0; + * conf->upstream.temp_path = NULL; + * conf->upstream.schema = { 0, NULL }; + * conf->upstream.uri = { 0, NULL }; + * conf->upstream.location = NULL; + * + * conf->peers = NULL; + */ + + conf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC; + conf->upstream.send_timeout = NGX_CONF_UNSET_MSEC; + conf->upstream.read_timeout = NGX_CONF_UNSET_MSEC; + + conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE; + + conf->upstream.max_fails = NGX_CONF_UNSET_UINT; + conf->upstream.fail_timeout = NGX_CONF_UNSET; + + /* "fastcgi_cyclic_temp_file" is disabled */ + conf->upstream.cyclic_temp_file = 0; + + /* the hardcoded values */ + conf->upstream.send_lowat = 0; + conf->upstream.bufs.num = 0; + conf->upstream.busy_buffers_size = 0; + conf->upstream.max_temp_file_size = 0; + conf->upstream.temp_file_write_size = 0; + conf->upstream.pass_x_powered_by = 0; + conf->upstream.redirect_errors = 1; + conf->upstream.redirect_404 = 1; + conf->upstream.pass_server = 1; + conf->upstream.pass_date = 1; + conf->upstream.pass_request_headers = 0; + conf->upstream.pass_request_body = 0; + + return conf; +} + + +static char * +ngx_http_memcached_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_http_memcached_loc_conf_t *prev = parent; + ngx_http_memcached_loc_conf_t *conf = child; + + ngx_uint_t i; + + ngx_conf_merge_msec_value(conf->upstream.connect_timeout, + prev->upstream.connect_timeout, 60000); + + ngx_conf_merge_msec_value(conf->upstream.send_timeout, + prev->upstream.send_timeout, 60000); + + ngx_conf_merge_msec_value(conf->upstream.read_timeout, + prev->upstream.read_timeout, 60000); + + ngx_conf_merge_size_value(conf->upstream.buffer_size, + prev->upstream.buffer_size, + (size_t) ngx_pagesize); + + ngx_conf_merge_bitmask_value(conf->upstream.next_upstream, + prev->upstream.next_upstream, + (NGX_CONF_BITMASK_SET + |NGX_HTTP_UPSTREAM_FT_ERROR + |NGX_HTTP_UPSTREAM_FT_TIMEOUT)); + + ngx_conf_merge_unsigned_value(conf->upstream.max_fails, + prev->upstream.max_fails, 1); + + ngx_conf_merge_sec_value(conf->upstream.fail_timeout, + prev->upstream.fail_timeout, 10); + + if (conf->peers && conf->peers->number > 1) { + for (i = 0; i < conf->peers->number; i++) { + conf->peers->peer[i].weight = 1; + conf->peers->peer[i].max_fails = conf->upstream.max_fails; + conf->peers->peer[i].fail_timeout = conf->upstream.fail_timeout; + } + } + + return NGX_CONF_OK; +} + + +static char * +ngx_http_memcached_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_http_memcached_loc_conf_t *lcf = conf; + + ngx_str_t *value; + ngx_inet_upstream_t inet_upstream; + ngx_http_core_loc_conf_t *clcf; + + if (lcf->upstream.schema.len) { + return "is duplicate"; + } + + value = cf->args->elts; + + ngx_memzero(&inet_upstream, sizeof(ngx_inet_upstream_t)); + + inet_upstream.name = value[1]; + inet_upstream.url = value[1]; + + lcf->peers = ngx_inet_upstream_parse(cf, &inet_upstream); + if (lcf->peers == NULL) { + return NGX_CONF_ERROR; + } + + lcf->upstream.schema.len = sizeof("memcached://") - 1; + lcf->upstream.schema.data = (u_char *) "memcached://"; + + clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module); + + clcf->handler = ngx_http_memcached_handler; + + lcf->upstream.location = clcf->name; + + if (clcf->name.data[clcf->name.len - 1] == '/') { + clcf->auto_redirect = 1; + } + + return NGX_CONF_OK; +} |