blob: f813f0c920cf23634878ae78905d8b44e8455e8d [file] [log] [blame] [raw]
/*
* Copyright 2009 Leo Ponomarev.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <nginx.h>
#include <ngx_http_push_module.h>
#include <store/ngx_http_push_store_local.h>
#include <ngx_http_push_rbtree_util.c>
#include <ngx_http_push_module_ipc.c>
#include <store/ngx_http_push_store_local.c>
#include <ngx_http_push_module_setup.c>
static void ngx_http_push_clean_timeouted_subscriber(ngx_event_t *ev)
{
ngx_http_push_subscriber_t *subscriber = NULL;
ngx_http_request_t *r = NULL;
subscriber = ev->data;
r = subscriber->request;
r->discard_body=0; //hacky hacky!
if (r->connection->destroyed) {
return;
}
ngx_int_t rc = ngx_http_push_respond_status_only(r, NGX_HTTP_NOT_MODIFIED, NULL);
ngx_http_finalize_request(r, rc);
//the subscriber and channel counter will be freed by the pool cleanup callback
}
static void ngx_http_push_subscriber_del_timer(ngx_http_push_subscriber_t *sb) {
if (sb->event.timer_set) {
ngx_del_timer(&sb->event);
}
}
static void ngx_http_push_subscriber_clear_ctx(ngx_http_push_subscriber_t *sb) {
ngx_http_push_subscriber_del_timer(sb);
sb->clndata->subscriber = NULL;
sb->clndata->channel = NULL;
}
#define NGX_HTTP_PUSH_NO_CHANNEL_ID_MESSAGE "No channel id provided."
static ngx_str_t * ngx_http_push_get_channel_id(ngx_http_request_t *r, ngx_http_push_loc_conf_t *cf) {
ngx_http_variable_value_t *vv = ngx_http_get_indexed_variable(r, cf->index);
ngx_str_t *group = &cf->channel_group;
size_t group_len = group->len;
size_t var_len;
size_t len;
ngx_str_t *id;
if (vv == NULL || vv->not_found || vv->len == 0) {
ngx_buf_t *buf = ngx_create_temp_buf(r->pool, sizeof(NGX_HTTP_PUSH_NO_CHANNEL_ID_MESSAGE));
ngx_chain_t *chain;
if(buf==NULL) {
return NULL;
}
buf->pos=(u_char *)NGX_HTTP_PUSH_NO_CHANNEL_ID_MESSAGE;
buf->last=buf->pos + sizeof(NGX_HTTP_PUSH_NO_CHANNEL_ID_MESSAGE)-1;
chain = ngx_http_push_create_output_chain(buf, r->pool, r->connection->log);
buf->last_buf=1;
r->headers_out.content_length_n=ngx_buf_size(buf);
r->headers_out.status=NGX_HTTP_NOT_FOUND;
r->headers_out.content_type.len = sizeof("text/plain") - 1;
r->headers_out.content_type.data = (u_char *) "text/plain";
r->headers_out.content_type_len = r->headers_out.content_type.len;
ngx_http_send_header(r);
ngx_http_output_filter(r, chain);
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
"push module: the $push_channel_id variable is required but is not set");
return NULL;
}
//maximum length limiter for channel id
var_len = vv->len <= cf->max_channel_id_length ? vv->len : cf->max_channel_id_length;
len = group_len + 1 + var_len;
if((id = ngx_palloc(r->pool, sizeof(*id) + len))==NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"push module: unable to allocate memory for $push_channel_id string");
return NULL;
}
id->len=len;
id->data=(u_char *)(id+1);
ngx_memcpy(id->data, group->data, group_len);
id->data[group_len]='/';
ngx_memcpy(id->data + group_len + 1, vv->data, var_len);
return id;
}
#define NGX_HTTP_PUSH_MAKE_CONTENT_TYPE(content_type, content_type_len, msg, pool) \
if(((content_type) = ngx_palloc(pool, sizeof(*content_type)+content_type_len))!=NULL) { \
(content_type)->len=content_type_len; \
(content_type)->data=(u_char *)((content_type)+1); \
ngx_memcpy(content_type->data, (msg)->content_type.data, content_type_len); \
}
#define NGX_HTTP_PUSH_OPTIONS_OK_MESSAGE "Go ahead"
static ngx_int_t ngx_http_push_subscriber_handler(ngx_http_request_t *r) {
ngx_http_push_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_module);
ngx_str_t *id;
ngx_http_push_channel_t *channel;
ngx_http_push_msg_t *msg;
ngx_int_t msg_search_outcome;
ngx_str_t *content_type=NULL;
ngx_str_t *etag;
if (r->method == NGX_HTTP_OPTIONS) {
ngx_buf_t *buf = ngx_create_temp_buf(r->pool, sizeof(NGX_HTTP_PUSH_OPTIONS_OK_MESSAGE));
ngx_chain_t *chain;
buf->pos=(u_char *)NGX_HTTP_PUSH_OPTIONS_OK_MESSAGE;
buf->last=buf->pos + sizeof(NGX_HTTP_PUSH_OPTIONS_OK_MESSAGE)-1;
chain = ngx_http_push_create_output_chain(buf, r->pool, r->connection->log);
buf->last_buf=1;
r->headers_out.content_length_n=ngx_buf_size(buf);
r->headers_out.status=NGX_HTTP_OK;
ngx_http_send_header(r);
ngx_http_output_filter(r, chain);
return NGX_OK;
}
if (r->method != NGX_HTTP_GET) {
ngx_http_push_add_response_header(r, &NGX_HTTP_PUSH_HEADER_ALLOW, &NGX_HTTP_PUSH_ALLOW_GET); //valid HTTP for the win
return NGX_HTTP_NOT_ALLOWED;
}
if((id=ngx_http_push_get_channel_id(r, cf)) == NULL) {
return r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if (cf->authorize_channel==1) {
channel = ngx_http_push_store_local.find_channel(id, cf->channel_timeout, r->connection->log);
}else{
channel = ngx_http_push_store_local.get_channel(id, cf->channel_timeout, r->connection->log);
}
if (channel==NULL) {
//unable to allocate channel OR channel not found
if(cf->authorize_channel) {
return NGX_HTTP_FORBIDDEN;
}
else {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push module: unable to allocate shared memory for channel");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
msg = ngx_http_push_store_local.get_message(channel, r, &msg_search_outcome, cf, r->connection->log);
if (cf->ignore_queue_on_no_cache && !ngx_http_push_allow_caching(r)) {
msg_search_outcome = NGX_HTTP_PUSH_MESSAGE_EXPECTED;
msg = NULL;
}
switch(ngx_http_push_handle_subscriber_concurrency(r, channel, cf)) {
case NGX_DECLINED: //this request was declined for some reason.
//status codes and whatnot should have already been written. just get out of here quickly.
return NGX_OK;
case NGX_ERROR:
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push module: error handling subscriber concurrency setting");
return NGX_ERROR;
}
switch(msg_search_outcome) {
//for message-found:
ngx_chain_t *chain;
time_t last_modified;
size_t content_type_len;
ngx_http_push_subscriber_cleanup_t *clndata;
ngx_http_push_subscriber_t *subscriber;
ngx_http_cleanup_t *cln;
case NGX_HTTP_PUSH_MESSAGE_EXPECTED:
// ♫ It's gonna be the future soon ♫
switch(cf->subscriber_poll_mechanism) {
//for NGX_HTTP_PUSH_MECHANISM_LONGPOLL
case NGX_HTTP_PUSH_MECHANISM_LONGPOLL:
subscriber = ngx_http_push_store_local.subscribe(channel, r);
if (subscriber == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
//attach a cleaner to remove the request from the channel and handle shared buffer deallocation.
if ((cln=ngx_http_cleanup_add(r, sizeof(*clndata))) == NULL) { //make sure we can.
return NGX_ERROR;
}
cln->handler = (ngx_http_cleanup_pt) ngx_http_push_subscriber_cleanup;
clndata = (ngx_http_push_subscriber_cleanup_t *) cln->data;
clndata->channel=channel;
clndata->subscriber=subscriber;
clndata->buf_use_count=0;
clndata->buf=NULL;
clndata->rchain=NULL;
clndata->rpool=NULL;
subscriber->clndata=clndata;
//set up subscriber timeout event
ngx_memzero(&subscriber->event, sizeof(subscriber->event));
if (cf->subscriber_timeout > 0) {
subscriber->event.handler = ngx_http_push_clean_timeouted_subscriber;
subscriber->event.data = subscriber;
subscriber->event.log = r->connection->log;
ngx_add_timer(&subscriber->event, cf->subscriber_timeout * 1000);
}
//r->read_event_handler = ngx_http_test_reading;
//r->write_event_handler = ngx_http_request_empty_handler;
r->discard_body = 1;
//r->keepalive = 1; //stayin' alive!!
return NGX_DONE;
case NGX_HTTP_PUSH_MECHANISM_INTERVALPOLL:
//interval-polling subscriber requests get a 304 with their entity tags preserved.
if (r->headers_in.if_modified_since != NULL) {
r->headers_out.last_modified_time=ngx_http_parse_time(r->headers_in.if_modified_since->value.data, r->headers_in.if_modified_since->value.len);
}
if ((etag=ngx_http_push_subscriber_get_etag(r)) != NULL) {
r->headers_out.etag=ngx_http_push_add_response_header(r, &NGX_HTTP_PUSH_HEADER_ETAG, etag);
}
return NGX_HTTP_NOT_MODIFIED;
default:
//if this ever happens, there's a bug somewhere else. probably config stuff.
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
case NGX_HTTP_PUSH_MESSAGE_EXPIRED:
//subscriber wants an expired message
//TODO: maybe respond with entity-identifiers for oldest available message?
return NGX_HTTP_NO_CONTENT;
case NGX_HTTP_PUSH_MESSAGE_FOUND:
//found the message
ngx_http_push_store_local.reserve_message(channel, msg);
if((etag = ngx_http_push_store_local.message_etag(msg))==NULL) {
//oh, nevermind...
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push module: unable to allocate memory for Etag header");
return NGX_ERROR;
}
ngx_http_push_store_local.lock();
content_type_len = msg->content_type.len;
if(content_type_len>0) {
NGX_HTTP_PUSH_MAKE_CONTENT_TYPE(content_type, content_type_len, msg, r->pool);
if(content_type==NULL) {
ngx_http_push_store_local.unlock();
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push module: unable to allocate memory for content-type header while responding to subscriber request");
return NGX_ERROR;
}
}
//preallocate output chain. yes, same one for every waiting subscriber
if((chain = ngx_http_push_create_output_chain(msg->buf, r->pool, r->connection->log))==NULL) {
ngx_http_push_store_local.unlock();
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push module: unable to allocate buffer chain while responding to subscriber request");
return NGX_ERROR;
}
last_modified = msg->message_time;
ngx_http_push_store_local.unlock();
//is the message still needed?
ngx_http_push_store_local.release_message(channel, msg);
if(chain->buf->file!=NULL) {
//close file when we're done with it
ngx_pool_cleanup_t *cln;
ngx_pool_cleanup_file_t *clnf;
if((cln = ngx_pool_cleanup_add(r->pool, sizeof(ngx_pool_cleanup_file_t)))==NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
cln->handler = ngx_pool_cleanup_file;
clnf = cln->data;
clnf->fd = chain->buf->file->fd;
clnf->name = chain->buf->file->name.data;
clnf->log = r->pool->log;
}
return ngx_http_push_prepare_response_to_subscriber_request(r, chain, content_type, etag, last_modified);
default: //we shouldn't be here.
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
static ngx_int_t ngx_http_push_handle_subscriber_concurrency(ngx_http_request_t *r, ngx_http_push_channel_t *channel, ngx_http_push_loc_conf_t *loc_conf) {
ngx_int_t max_subscribers = loc_conf->max_channel_subscribers;
ngx_int_t current_subscribers = ngx_http_push_store_local.channel_subscribers(channel) ;
if(current_subscribers==0) {
//empty channels are always okay.
return NGX_OK;
}
if(max_subscribers!=0 && current_subscribers >= max_subscribers) {
//max_channel_subscribers setting
ngx_http_push_respond_status_only(r, NGX_HTTP_FORBIDDEN, NULL);
return NGX_DECLINED;
}
//nonzero number of subscribers present
switch(loc_conf->subscriber_concurrency) {
case NGX_HTTP_PUSH_SUBSCRIBER_CONCURRENCY_BROADCAST:
return NGX_OK;
case NGX_HTTP_PUSH_SUBSCRIBER_CONCURRENCY_LASTIN:
//send "everyone" a 409 Conflict response.
//in most reasonable cases, there'll be at most one subscriber on the
//channel. However, since settings are bound to locations and not
//specific channels, this assumption need not hold. Hence this broadcast.
ngx_http_push_broadcast_status(channel, NGX_HTTP_NOT_FOUND, &NGX_HTTP_PUSH_HTTP_STATUS_409, r->connection->log);
return NGX_OK;
case NGX_HTTP_PUSH_SUBSCRIBER_CONCURRENCY_FIRSTIN:
ngx_http_push_respond_status_only(r, NGX_HTTP_NOT_FOUND, &NGX_HTTP_PUSH_HTTP_STATUS_409);
return NGX_DECLINED;
default:
return NGX_ERROR;
}
}
#define NGX_HTTP_BUF_ALLOC_SIZE(buf) \
(sizeof(*buf) + \
(((buf)->temporary || (buf)->memory) ? ngx_buf_size(buf) : 0) + \
(((buf)->file!=NULL) ? (sizeof(*(buf)->file) + (buf)->file->name.len + 1) : 0))
static void ngx_http_push_publisher_body_handler(ngx_http_request_t * r) {
ngx_str_t *id;
ngx_http_push_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_module);
ngx_http_push_channel_t *channel;
ngx_uint_t method = r->method;
time_t last_seen = 0;
ngx_uint_t subscribers = 0;
ngx_uint_t messages = 0;
if((id = ngx_http_push_get_channel_id(r, cf))==NULL) {
ngx_http_finalize_request(r, r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
//POST requests will need a channel created if it doesn't yet exist.
if(method==NGX_HTTP_POST || method==NGX_HTTP_PUT) {
if(method==NGX_HTTP_POST && (r->headers_in.content_length_n == -1 || r->headers_in.content_length_n == 0)) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push module: trying to push an empty message");
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
channel = ngx_http_push_store_local.get_channel(id, cf->channel_timeout, r->connection->log);
if(channel==NULL) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push module: unable to allocate memory for new channel");
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
//no other request method needs that.
else {
//just find the channel. if it's not there, NULL.
channel = ngx_http_push_store_local.find_channel(id, cf->channel_timeout, r->connection->log);
}
if(channel!=NULL) {
ngx_http_push_store_local.lock();
subscribers = channel->subscribers;
last_seen = channel->last_seen;
messages = channel->messages;
ngx_http_push_store_local.unlock();
}
else {
//404!
r->headers_out.status=NGX_HTTP_NOT_FOUND;
//just the headers, please. we don't care to describe the situation or
//respond with an html page
r->headers_out.content_length_n=0;
r->header_only = 1;
ngx_http_finalize_request(r, ngx_http_send_header(r));
return;
}
switch(method) {
ngx_http_push_msg_t *msg;
case NGX_HTTP_POST:
if((msg = ngx_http_push_store_local.create_message(channel, r))==NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
switch(ngx_http_push_broadcast_message(channel, msg, r->connection->log)) {
case NGX_HTTP_PUSH_MESSAGE_QUEUED:
//message was queued successfully, but there were no
//subscribers to receive it.
r->headers_out.status = NGX_HTTP_ACCEPTED;
r->headers_out.status_line.len =sizeof("202 Accepted")- 1;
r->headers_out.status_line.data=(u_char *) "202 Accepted";
break;
case NGX_HTTP_PUSH_MESSAGE_RECEIVED:
//message was queued successfully, and it was already sent
//to at least one subscriber
r->headers_out.status = NGX_HTTP_CREATED;
r->headers_out.status_line.len =sizeof("201 Created")- 1;
r->headers_out.status_line.data=(u_char *) "201 Created";
//update the number of times the message was received.
//in the interest of premature optimization, I assume all
//current subscribers have received the message successfully.
break;
case NGX_ERROR:
//WTF?
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push module: error broadcasting message to workers");
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
default:
//for debugging, mostly. I don't expect this branch to be
//hit during regular operation
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push module: TOTALLY UNEXPECTED error broadcasting message to workers");
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_http_push_store_local.lock();
messages = channel->messages;
ngx_http_push_store_local.unlock();
ngx_http_finalize_request(r, ngx_http_push_channel_info(r, messages, subscribers, last_seen));
return;
case NGX_HTTP_PUT:
case NGX_HTTP_GET:
r->headers_out.status = NGX_HTTP_OK;
ngx_http_finalize_request(r, ngx_http_push_channel_info(r, messages, subscribers, last_seen));
return;
case NGX_HTTP_DELETE:
if(ngx_http_push_store_local.delete_channel(channel, r) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
}
else {
r->headers_out.status=NGX_HTTP_OK;
ngx_http_finalize_request(r, ngx_http_push_channel_info(r, messages, subscribers, last_seen));
}
return;
default:
//some other weird request method
ngx_http_push_add_response_header(r, &NGX_HTTP_PUSH_HEADER_ALLOW, &NGX_HTTP_PUSH_ALLOW_GET_POST_PUT_DELETE);
ngx_http_finalize_request(r, NGX_HTTP_NOT_ALLOWED);
return;
}
}
static ngx_int_t ngx_http_push_respond_to_subscribers(ngx_http_push_channel_t *channel, ngx_http_push_subscriber_t *sentinel, ngx_http_push_msg_t *msg, ngx_int_t status_code, const ngx_str_t *status_line) {
ngx_http_push_subscriber_t *cur, *next;
ngx_int_t responded_subscribers=0;
if(sentinel==NULL) {
return NGX_OK;
}
if(msg!=NULL) {
//copy everything we need first
ngx_str_t *content_type=NULL;
ngx_str_t *etag=NULL;
time_t last_modified_time;
ngx_chain_t *chain;
ngx_http_request_t *r;
ngx_buf_t *buffer;
ngx_chain_t *rchain;
ngx_buf_t *rbuffer;
ngx_int_t *buf_use_count;
ngx_http_push_subscriber_cleanup_t *clndata;
if((etag=ngx_http_push_store_local.message_etag(msg))==NULL) {
return NGX_ERROR;
}
if((content_type=ngx_http_push_store_local.message_content_type(msg))==NULL) {
return NGX_ERROR;
}
ngx_http_push_store_local.lock();
//preallocate output chain. this won't actually be used in the request (except the buffer data)
chain = ngx_http_push_create_output_chain(msg->buf, ngx_http_push_pool, ngx_cycle->log);
ngx_http_push_store_local.unlock();
if(chain==NULL) {
ngx_pfree(ngx_http_push_pool, etag);
ngx_pfree(ngx_http_push_pool, content_type);
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push module: unable to create output chain while responding to several subscriber request");
return NGX_ERROR;
}
buffer = chain->buf;
buffer->recycled = 1;
ngx_http_push_store_local.lock();
last_modified_time = msg->message_time;
ngx_http_push_store_local.unlock();
buf_use_count = ngx_pcalloc(ngx_http_push_pool, sizeof(*buf_use_count));
*buf_use_count = ngx_http_push_store_local.channel_worker_subscribers(sentinel);
cur=(ngx_http_push_subscriber_t *)ngx_queue_head(&sentinel->queue);
//now let's respond to some requests!
while(cur!=sentinel) {
next=(ngx_http_push_subscriber_t *)ngx_queue_next(&cur->queue);
//in this block, nothing in shared memory should be dereferenced.
r=cur->request;
//chain and buffer for this request
rchain = ngx_pcalloc(r->pool, sizeof(*rchain));
rchain->next = NULL;
rbuffer = ngx_pcalloc(r->pool, sizeof(*rbuffer));
rchain->buf = rbuffer;
ngx_memcpy(rbuffer, buffer, sizeof(*buffer));
//request buffer cleanup
clndata = cur->clndata;
clndata->buf = buffer;
clndata->buf_use_count = buf_use_count;
clndata->rchain = rchain;
clndata->rpool = r->pool;
//cleanup oughtn't dequeue anything. or decrement the subscriber count, for that matter
ngx_http_push_subscriber_clear_ctx(cur);
r->discard_body=0; //hacky hacky!
if (rbuffer->in_file && (fcntl(rbuffer->file->fd, F_GETFD) == -1)) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push module: buffer in invalid file descriptor");
}
ngx_http_finalize_request(r, ngx_http_push_prepare_response_to_subscriber_request(r, rchain, content_type, etag, last_modified_time)); //BAM!
responded_subscribers++;
//done with this subscriber. free the sucker.
ngx_pfree(ngx_http_push_pool, cur);
cur=next;
}
ngx_pfree(ngx_http_push_pool, etag);
ngx_pfree(ngx_http_push_pool, content_type);
ngx_pfree(ngx_http_push_pool, chain);
//the rest will be deallocated on request pool cleanup
if(responded_subscribers) {
ngx_http_push_store_local.release_message(channel, msg);
}
}
else {
//headers only probably
ngx_http_request_t *r;
cur=(ngx_http_push_subscriber_t *)ngx_queue_head(&sentinel->queue);
while(cur!=sentinel) {
next=(ngx_http_push_subscriber_t *)ngx_queue_next(&cur->queue);
r=cur->request;
//cleanup oughtn't dequeue anything. or decrement the subscriber count, for that matter
ngx_http_push_subscriber_clear_ctx(cur);
ngx_http_finalize_request(r, ngx_http_push_respond_status_only(r, status_code, status_line));
responded_subscribers++;
ngx_pfree(ngx_http_push_pool, cur);
cur=next;
}
}
ngx_http_push_store_local.lock();
channel->subscribers-=responded_subscribers;
//is the message still needed?
//ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "deleting subscriber sentinel at %p.", sentinel);
ngx_pfree(ngx_http_push_pool, sentinel);
ngx_http_push_store_local.unlock();
return NGX_OK;
}
static ngx_int_t ngx_http_push_publisher_handler(ngx_http_request_t * r) {
ngx_int_t rc;
/* Instruct ngx_http_read_subscriber_request_body to store the request
body entirely in a memory buffer or in a file */
r->request_body_in_single_buf = 1;
r->request_body_in_persistent_file = 1;
r->request_body_in_clean_file = 0;
r->request_body_file_log_level = 0;
rc = ngx_http_read_client_request_body(r, ngx_http_push_publisher_body_handler);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
return NGX_DONE;
}
static void ngx_http_push_match_channel_info_subtype(size_t off, u_char *cur, size_t rem, u_char **priority, const ngx_str_t **format, ngx_str_t *content_type) {
static ngx_http_push_content_subtype_t subtypes[] = {
{ "json" , 4, &NGX_HTTP_PUSH_CHANNEL_INFO_JSON },
{ "yaml" , 4, &NGX_HTTP_PUSH_CHANNEL_INFO_YAML },
{ "xml" , 3, &NGX_HTTP_PUSH_CHANNEL_INFO_XML },
{ "x-json", 6, &NGX_HTTP_PUSH_CHANNEL_INFO_JSON },
{ "x-yaml", 6, &NGX_HTTP_PUSH_CHANNEL_INFO_YAML }
};
u_char *start = cur + off;
ngx_uint_t i;
for(i=0; i<(sizeof(subtypes)/sizeof(ngx_http_push_content_subtype_t)); i++) {
if(ngx_strncmp(start, subtypes[i].subtype, rem<subtypes[i].len ? rem : subtypes[i].len)==0) {
if(*priority>start) {
*format = subtypes[i].format;
*priority = start;
content_type->data=cur;
content_type->len= off + 1 + subtypes[i].len;
}
}
}
}
//print information about a channel
static ngx_int_t ngx_http_push_channel_info(ngx_http_request_t *r, ngx_uint_t messages, ngx_uint_t subscribers, time_t last_seen) {
ngx_buf_t *b;
ngx_uint_t len;
ngx_str_t content_type = ngx_string("text/plain");
const ngx_str_t *format = &NGX_HTTP_PUSH_CHANNEL_INFO_PLAIN;
time_t time_elapsed = ngx_time() - last_seen;
if(r->headers_in.accept) {
//lame content-negotiation (without regard for qvalues)
u_char *accept = r->headers_in.accept->value.data;
size_t len = r->headers_in.accept->value.len;
size_t rem;
u_char *cur = accept;
u_char *priority=&accept[len-1];
for(rem=len; (cur = ngx_strnstr(cur, "text/", rem))!=NULL; cur += sizeof("text/")-1) {
rem=len - ((size_t)(cur-accept)+sizeof("text/")-1);
if(ngx_strncmp(cur+sizeof("text/")-1, "plain", rem<5 ? rem : 5)==0) {
if(priority) {
format = &NGX_HTTP_PUSH_CHANNEL_INFO_PLAIN;
priority = cur+sizeof("text/")-1;
//content-type is already set by default
}
}
ngx_http_push_match_channel_info_subtype(sizeof("text/")-1, cur, rem, &priority, &format, &content_type);
}
cur = accept;
for(rem=len; (cur = ngx_strnstr(cur, "application/", rem))!=NULL; cur += sizeof("application/")-1) {
rem=len - ((size_t)(cur-accept)+sizeof("application/")-1);
ngx_http_push_match_channel_info_subtype(sizeof("application/")-1, cur, rem, &priority, &format, &content_type);
}
}
r->headers_out.content_type.len = content_type.len;
r->headers_out.content_type.data = content_type.data;
r->headers_out.content_type_len = r->headers_out.content_type.len;
len = format->len - 8 - 1 + 3*NGX_INT_T_LEN; //minus 8 sprintf
if ((b = ngx_create_temp_buf(r->pool, len)) == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
b->last = ngx_sprintf(b->last, (char *)format->data, messages, last_seen==0 ? -1 : (ngx_int_t) time_elapsed ,subscribers);
//lastly, set the content-length, because if the status code isn't 200, nginx may not do so automatically
r->headers_out.content_length_n = ngx_buf_size(b);
if (ngx_http_send_header(r) > NGX_HTTP_SPECIAL_RESPONSE) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
return ngx_http_output_filter(r, ngx_http_push_create_output_chain(b, r->pool, r->connection->log));
}
static ngx_table_elt_t * ngx_http_push_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value) {
ngx_table_elt_t *h = ngx_list_push(&r->headers_out.headers);
if (h == NULL) {
return NULL;
}
h->hash = 1;
h->key.len = header_name->len;
h->key.data = header_name->data;
h->value.len = header_value->len;
h->value.data = header_value->data;
return h;
}
static ngx_int_t ngx_http_push_subscriber_get_etag_int(ngx_http_request_t * r) {
ngx_str_t *if_none_match = ngx_http_push_subscriber_get_etag(r);
ngx_int_t tag;
if(if_none_match==NULL || (if_none_match!=NULL && (tag = ngx_atoi(if_none_match->data, if_none_match->len))==NGX_ERROR)) {
tag=0;
}
return ngx_abs(tag);
}
static ngx_str_t * ngx_http_push_find_in_header_value(ngx_http_request_t * r, ngx_str_t header_name) {
ngx_uint_t i;
ngx_list_part_t *part = &r->headers_in.headers.part;
ngx_table_elt_t *header= part->elts;
for (i = 0; /* void */ ; i++) {
if (i >= part->nelts) {
if (part->next == NULL) {
break;
}
part = part->next;
header = part->elts;
i = 0;
}
if (header[i].key.len == header_name.len
&& ngx_strncasecmp(header[i].key.data, header_name.data, header[i].key.len) == 0) {
return &header[i].value;
}
}
return NULL;
}
static ngx_int_t ngx_http_push_allow_caching(ngx_http_request_t * r) {
ngx_str_t *tmp_header;
ngx_str_t header_checks[2] = { NGX_HTTP_PUSH_HEADER_CACHE_CONTROL, NGX_HTTP_PUSH_HEADER_PRAGMA };
ngx_int_t i = 0;
for(; i < 2; i++) {
tmp_header = ngx_http_push_find_in_header_value(r, header_checks[i]);
if (tmp_header != NULL) {
return !!ngx_strncasecmp(tmp_header->data, NGX_HTTP_PUSH_CACHE_CONTROL_VALUE.data, tmp_header->len);
}
}
return 1;
}
static ngx_str_t * ngx_http_push_subscriber_get_etag(ngx_http_request_t * r) {
ngx_uint_t i;
ngx_list_part_t *part = &r->headers_in.headers.part;
ngx_table_elt_t *header= part->elts;
for (i = 0; /* void */ ; i++) {
if (i >= part->nelts) {
if (part->next == NULL) {
break;
}
part = part->next;
header = part->elts;
i = 0;
}
if (header[i].key.len == NGX_HTTP_PUSH_HEADER_IF_NONE_MATCH.len
&& ngx_strncasecmp(header[i].key.data, NGX_HTTP_PUSH_HEADER_IF_NONE_MATCH.data, header[i].key.len) == 0) {
return &header[i].value;
}
}
return NULL;
}
//buffer is _copied_
static ngx_chain_t * ngx_http_push_create_output_chain(ngx_buf_t *buf, ngx_pool_t *pool, ngx_log_t *log) {
ngx_chain_t *out;
ngx_file_t *file;
if((out = ngx_pcalloc(pool, sizeof(*out)))==NULL) {
return NULL;
}
ngx_buf_t *buf_copy;
if((buf_copy = ngx_pcalloc(pool, NGX_HTTP_BUF_ALLOC_SIZE(buf)))==NULL) {
return NULL;
}
ngx_http_push_copy_preallocated_buffer(buf, buf_copy);
if (buf->file!=NULL) {
file = buf_copy->file;
file->log=log;
if(file->fd==NGX_INVALID_FILE) {
file->fd=ngx_open_file(file->name.data, NGX_FILE_RDONLY, NGX_FILE_OPEN, NGX_FILE_OWNER_ACCESS);
}
if(file->fd==NGX_INVALID_FILE) {
return NULL;
}
}
buf_copy->last_buf = 1;
out->buf = buf_copy;
out->next = NULL;
return out;
}
static void ngx_http_push_subscriber_cleanup(ngx_http_push_subscriber_cleanup_t *data) {
if(data->subscriber!=NULL) { //still queued up
ngx_http_push_subscriber_t* sb = data->subscriber;
ngx_http_push_subscriber_del_timer(sb);
ngx_queue_remove(&data->subscriber->queue);
ngx_pfree(ngx_http_push_pool, data->subscriber); //was there an error? oh whatever.
}
if (data->rchain != NULL) {
ngx_pfree(data->rpool, data->rchain->buf);
ngx_pfree(data->rpool, data->rchain);
data->rchain=NULL;
}
if(data->buf_use_count != NULL && --(*data->buf_use_count) <= 0) {
ngx_buf_t *buf;
ngx_pfree(ngx_http_push_pool, data->buf_use_count);
buf=data->buf;
if(buf->file) {
ngx_close_file(buf->file->fd);
}
ngx_pfree(ngx_http_push_pool, buf);
}
if(data->channel!=NULL) { //we're expected to decrement the subscriber count
ngx_http_push_store_local.lock();
data->channel->subscribers--;
ngx_http_push_store_local.unlock();
}
}
static ngx_int_t ngx_http_push_respond_status_only(ngx_http_request_t *r, ngx_int_t status_code, const ngx_str_t *statusline) {
r->headers_out.status=status_code;
if(statusline!=NULL) {
r->headers_out.status_line.len =statusline->len;
r->headers_out.status_line.data=statusline->data;
}
r->headers_out.content_length_n = 0;
r->header_only = 1;
return ngx_http_send_header(r);
}
//allocates nothing
static ngx_int_t ngx_http_push_prepare_response_to_subscriber_request(ngx_http_request_t *r, ngx_chain_t *chain, ngx_str_t *content_type, ngx_str_t *etag, time_t last_modified) {
ngx_int_t res;
if (content_type!=NULL) {
r->headers_out.content_type.len=content_type->len;
r->headers_out.content_type.data = content_type->data;
r->headers_out.content_type_len = r->headers_out.content_type.len;
}
if(last_modified) {
//if-modified-since header
r->headers_out.last_modified_time=last_modified;
}
if(etag!=NULL) {
//etag, if we need one
if ((r->headers_out.etag=ngx_http_push_add_response_header(r, &NGX_HTTP_PUSH_HEADER_ETAG, etag))==NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
//Vary header needed for proper HTTP caching.
ngx_http_push_add_response_header(r, &NGX_HTTP_PUSH_HEADER_VARY, &NGX_HTTP_PUSH_VARY_HEADER_VALUE);
r->headers_out.status=NGX_HTTP_OK;
//we know the entity length, and we're using just one buffer. so no chunking please.
r->headers_out.content_length_n=ngx_buf_size(chain->buf);
if((res = ngx_http_send_header(r)) >= NGX_HTTP_SPECIAL_RESPONSE) {
return res;
}
return ngx_http_output_filter(r, chain);
}
static void ngx_http_push_copy_preallocated_buffer(ngx_buf_t *buf, ngx_buf_t *cbuf) {
if (cbuf!=NULL) {
ngx_memcpy(cbuf, buf, sizeof(*buf)); //overkill?
if(buf->temporary || buf->memory) { //we don't want to copy mmpapped memory, so no ngx_buf_in_momory(buf)
cbuf->pos = (u_char *) (cbuf+1);
cbuf->last = cbuf->pos + ngx_buf_size(buf);
cbuf->start=cbuf->pos;
cbuf->end = cbuf->start + ngx_buf_size(buf);
ngx_memcpy(cbuf->pos, buf->pos, ngx_buf_size(buf));
cbuf->memory=ngx_buf_in_memory_only(buf) ? 1 : 0;
}
if (buf->file!=NULL) {
cbuf->file = (ngx_file_t *) (cbuf+1) + ((buf->temporary || buf->memory) ? ngx_buf_size(buf) : 0);
cbuf->file->fd=NGX_INVALID_FILE;
cbuf->file->log=NULL;
cbuf->file->offset=buf->file->offset;
cbuf->file->sys_offset=buf->file->sys_offset;
cbuf->file->name.len=buf->file->name.len;
cbuf->file->name.data=(u_char *) (cbuf->file+1);
ngx_memcpy(cbuf->file->name.data, buf->file->name.data, buf->file->name.len);
}
}
}