blob: 9fe56e39f3ae924426c62273a8f0a14e3de4431a [file] [log] [blame] [raw]
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <ngx_channel.h>
/* ---- defaults for configurable settings ---- */
#define NGX_HTTP_PUSH_DEFAULT_SHM_SIZE                  33554432  /* 32 megs */
#define NGX_HTTP_PUSH_DEFAULT_BUFFER_TIMEOUT            3600
#define NGX_HTTP_PUSH_DEFAULT_SUBSCRIBER_TIMEOUT        0         /* default: never timeout */
/* (liucougar: this is a bit confusing, but it is what the default behavior
 * was before this option was introduced) */
#define NGX_HTTP_PUSH_DEFAULT_CHANNEL_TIMEOUT           0         /* default: timeout immediately */
#define NGX_HTTP_PUSH_DEFAULT_MIN_MESSAGES              1
#define NGX_HTTP_PUSH_DEFAULT_MAX_MESSAGES              10

/* ---- subscriber concurrency modes ---- */
#define NGX_HTTP_PUSH_SUBSCRIBER_CONCURRENCY_LASTIN     0
#define NGX_HTTP_PUSH_SUBSCRIBER_CONCURRENCY_FIRSTIN    1
#define NGX_HTTP_PUSH_SUBSCRIBER_CONCURRENCY_BROADCAST  2

/* ---- subscriber polling mechanisms ---- */
#define NGX_HTTP_PUSH_MECHANISM_LONGPOLL                0
#define NGX_HTTP_PUSH_MECHANISM_INTERVALPOLL            1

/* ---- limits ---- */
#define NGX_HTTP_PUSH_MIN_MESSAGE_RECIPIENTS            0
#define NGX_HTTP_PUSH_MAX_CHANNEL_ID_LENGTH             1024      /* bytes */

/* ---- HTTP status codes; each is defined here only if not already defined ---- */
#ifndef NGX_HTTP_CONFLICT
#define NGX_HTTP_CONFLICT                               409
#endif

#ifndef NGX_HTTP_GONE
#define NGX_HTTP_GONE                                   410
#endif

#ifndef NGX_HTTP_CREATED
#define NGX_HTTP_CREATED                                201
#endif

#ifndef NGX_HTTP_ACCEPTED
#define NGX_HTTP_ACCEPTED                               202
#endif

/* ---- module-internal message status codes ---- */
#define NGX_HTTP_PUSH_MESSAGE_RECEIVED                  9000
#define NGX_HTTP_PUSH_MESSAGE_QUEUED                    9001

#define NGX_HTTP_PUSH_MESSAGE_FOUND                     1000
#define NGX_HTTP_PUSH_MESSAGE_EXPECTED                  1001
#define NGX_HTTP_PUSH_MESSAGE_EXPIRED                   1002
//on with the declarations
/*
 * Main-level configuration of the push module.
 * NOTE(review): shm_size is presumably the size of the module's shared
 * memory zone (cf. NGX_HTTP_PUSH_DEFAULT_SHM_SIZE) -- confirm against the
 * code that consumes it.
 */
typedef struct {
    size_t  shm_size;
} ngx_http_push_main_conf_t;
/*
 * Message queue entry.
 * The embedded ngx_queue_t is the first member so that a pointer to the
 * entry and a pointer to its queue link are interchangeable.
 */
typedef struct {
    ngx_queue_t   queue;          /* this MUST be first. */
    ngx_str_t     content_type;
    // ngx_str_t  charset;
    ngx_buf_t    *buf;
    time_t        expires;
    ngx_uint_t    delete_oldest_received_min_messages;  /* NGX_MAX_UINT32_VALUE for 'never' */
    time_t        message_time;   /* tag message by time */
    ngx_int_t     message_tag;    /* used in conjunction with message_time if
                                     more than one message has the same time */
    ngx_int_t     refcount;
} ngx_http_push_msg_t;
/* Forward declaration: the cleanup record and the subscriber point at each other. */
typedef struct ngx_http_push_subscriber_cleanup_s  ngx_http_push_subscriber_cleanup_t;

/*
 * Subscriber request queue entry.
 * Ties a pending HTTP request to its cleanup record and an nginx event.
 */
typedef struct {
    ngx_queue_t                          queue;    /* this MUST be first. */
    ngx_http_request_t                  *request;
    ngx_http_push_subscriber_cleanup_t  *clndata;
    ngx_event_t                          event;
} ngx_http_push_subscriber_t;
/*
 * Queue entry associating a process (pid and slot) with a subscriber
 * sentinel. Used by ngx_http_push_channel_t as its workers_with_subscribers
 * member.
 */
typedef struct {
    ngx_queue_t                  queue;
    pid_t                        pid;
    ngx_int_t                    slot;
    ngx_http_push_subscriber_t  *subscriber_sentinel;
} ngx_http_push_pid_queue_t;
/*
 * A channel, stored as a typecast-friendly rbtree node: the
 * ngx_rbtree_node_t is the first member so a node pointer can be cast to a
 * channel pointer and back.
 */
typedef struct {
    ngx_rbtree_node_t           node;    /* this MUST be first. */
    ngx_str_t                   id;
    ngx_http_push_msg_t        *message_queue;
    ngx_uint_t                  messages;
    ngx_http_push_pid_queue_t   workers_with_subscribers;
    ngx_uint_t                  subscribers;
    time_t                      last_seen;
    time_t                      expires;
} ngx_http_push_channel_t;
/*
 * Cleaning supplies: the record handed to ngx_http_push_subscriber_cleanup().
 * Holds the subscriber and channel involved plus the buffer, chain and pool
 * pointers listed below.
 */
struct ngx_http_push_subscriber_cleanup_s {
    ngx_http_push_subscriber_t  *subscriber;
    ngx_http_push_channel_t     *channel;
    ngx_int_t                   *buf_use_count;
    ngx_buf_t                   *buf;
    ngx_chain_t                 *rchain;
    ngx_pool_t                  *rpool;
};
/*
 * Garbage collection: a queue entry that refers to one channel.
 */
typedef struct {
    ngx_queue_t               queue;
    ngx_http_push_channel_t  *channel;
} ngx_http_push_channel_queue_t;
/*
 * Message addressed to a worker process.
 * msg and channel point into shared memory; subscriber_sentinel points into
 * a worker's local pool.
 */
typedef struct {
    ngx_queue_t                  queue;
    ngx_http_push_msg_t         *msg;                  /* -> shared memory */
    ngx_int_t                    status_code;
    ngx_pid_t                    pid;
    ngx_http_push_channel_t     *channel;              /* -> shared memory */
    ngx_http_push_subscriber_t  *subscriber_sentinel;  /* -> a worker's local pool */
} ngx_http_push_worker_msg_t;
/*
 * Shared memory data: the channel tree, a count of channels in use, and the
 * interprocess message area.
 */
typedef struct {
    ngx_rbtree_t                 tree;
    ngx_uint_t                   channels;  /* # of channels being used */
    ngx_http_push_worker_msg_t  *ipc;       /* interprocess stuff */
} ngx_http_push_shm_data_t;
/*
 * Location-level configuration of the push module.
 * NOTE(review): subscriber_concurrency and subscriber_poll_mechanism
 * presumably take the NGX_HTTP_PUSH_SUBSCRIBER_CONCURRENCY_* and
 * NGX_HTTP_PUSH_MECHANISM_* values respectively -- confirm against the
 * directive handlers.
 */
typedef struct {
    ngx_int_t   index;
    time_t      buffer_timeout;
    ngx_int_t   min_messages;
    ngx_int_t   max_messages;
    ngx_int_t   subscriber_concurrency;
    ngx_int_t   subscriber_poll_mechanism;
    time_t      subscriber_timeout;
    ngx_int_t   authorize_channel;
    ngx_int_t   store_messages;
    ngx_int_t   delete_oldest_received_message;
    ngx_str_t   channel_group;
    ngx_int_t   max_channel_id_length;
    ngx_int_t   max_channel_subscribers;
    ngx_int_t   ignore_queue_on_no_cache;
    time_t      channel_timeout;
} ngx_http_push_loc_conf_t;
/*
 * Table of function pointers making up a message store. The broadcast
 * macros in this file call the publish member of an instance named
 * ngx_http_push_store_local.
 */
typedef struct {
    /* init */
    ngx_int_t  (*init_module)(ngx_cycle_t *cycle);
    ngx_int_t  (*init_worker)(ngx_cycle_t *cycle);
    ngx_int_t  (*init_postconfig)(ngx_conf_t *cf);
    void       (*create_main_conf)(ngx_conf_t *cf,
                                   ngx_http_push_main_conf_t *mcf);

    /* quit */
    void       (*exit_worker)(ngx_cycle_t *cycle);
    void       (*exit_master)(ngx_cycle_t *cycle);

    /* channel actions */
    ngx_http_push_channel_t *(*get_channel)(ngx_str_t *id,
                                            time_t channel_timeout,
                                            ngx_log_t *log);
    ngx_http_push_channel_t *(*find_channel)(ngx_str_t *id,
                                             time_t channel_timeout,
                                             ngx_log_t *log);
    ngx_int_t  (*delete_channel)(ngx_http_push_channel_t *channel,
                                 ngx_http_request_t *r);
    ngx_http_push_msg_t *(*get_message)(ngx_http_push_channel_t *channel,
                                        ngx_http_request_t *r,
                                        ngx_int_t *msg_search_outcome,
                                        ngx_http_push_loc_conf_t *cf,
                                        ngx_log_t *log);
    void       (*reserve_message)(ngx_http_push_channel_t *channel,
                                  ngx_http_push_msg_t *msg);
    void       (*release_message)(ngx_http_push_channel_t *channel,
                                  ngx_http_push_msg_t *msg);

    /* pub/sub */
    // ngx_int_t (*publish)(ngx_http_push_channel_t *channel, ngx_http_push_msg_t *msg); //would like it to be this
    /* instead it's this: */
    ngx_int_t  (*publish)(ngx_http_push_channel_t *channel,
                          ngx_http_push_msg_t *msg,
                          ngx_int_t status_code,
                          const ngx_str_t *status_line,
                          ngx_log_t *log);
    ngx_http_push_subscriber_t *(*subscribe)(ngx_http_push_channel_t *channel,
                                             ngx_http_request_t *r);

    /* channel properties */
    ngx_int_t  (*channel_subscribers)(ngx_http_push_channel_t *channel);
    ngx_int_t  (*channel_worker_subscribers)(ngx_http_push_subscriber_t *worker_sentinel);

    /* legacy shared-memory store helpers */
    void       (*lock)(void);
    void       (*unlock)(void);

    /* message actions and properties */
    ngx_http_push_msg_t *(*create_message)(ngx_http_push_channel_t *channel,
                                           ngx_http_request_t *r);
    ngx_str_t  *(*message_etag)(ngx_http_push_msg_t *msg);
    ngx_str_t  *(*message_content_type)(ngx_http_push_msg_t *msg);
} ngx_http_push_store_t;
/*
 * Module-wide globals.
 * NOTE(review): these are file-scope declarations without `extern`. If this
 * file is included from more than one translation unit, each one gets its
 * own tentative definition -- confirm it is only included once before
 * changing linkage.
 */
ngx_int_t      ngx_http_push_worker_processes;
ngx_pool_t    *ngx_http_push_pool;
ngx_module_t   ngx_http_push_module;
/*
 * Garbage-collecting shared memory slab allocation.
 * NOTE(review): the _locked variant presumably expects the caller to hold
 * the shared memory lock already -- confirm in the definition.
 */
void *ngx_http_push_slab_alloc(size_t size);
void *ngx_http_push_slab_alloc_locked(size_t size);
/*
 * Channel messages.
 * NOTE(review): the _locked suffix presumably means the caller must already
 * hold the shared memory lock -- confirm in the definitions.
 */
static ngx_http_push_msg_t *ngx_http_push_get_latest_message_locked(
    ngx_http_push_channel_t *channel);
static ngx_http_push_msg_t *ngx_http_push_get_oldest_message_locked(
    ngx_http_push_channel_t *channel);

static ngx_inline void ngx_http_push_general_delete_message_locked(
    ngx_http_push_channel_t *channel, ngx_http_push_msg_t *msg,
    ngx_int_t force, ngx_slab_pool_t *shpool);

/* Convenience wrappers around the general delete: force = 0 and force = 1. */
#define ngx_http_push_delete_message_locked(channel, msg, shpool) \
    ngx_http_push_general_delete_message_locked(channel, msg, 0, shpool)
#define ngx_http_push_force_delete_message_locked(channel, msg, shpool) \
    ngx_http_push_general_delete_message_locked(channel, msg, 1, shpool)

static ngx_inline void ngx_http_push_free_message_locked(
    ngx_http_push_msg_t *msg, ngx_slab_pool_t *shpool);
static ngx_http_push_msg_t *ngx_http_push_find_message_locked(
    ngx_http_push_channel_t *channel, ngx_http_request_t *r,
    ngx_int_t *status);
/* Channel: id lookup for a request, and the channel information response. */
static ngx_str_t *ngx_http_push_get_channel_id(
    ngx_http_request_t *r, ngx_http_push_loc_conf_t *cf);
static ngx_int_t ngx_http_push_channel_info(
    ngx_http_request_t *r, ngx_uint_t message_queue_size,
    ngx_uint_t subscriber_queue_size, time_t last_seen);
/* Subscriber */
static ngx_int_t ngx_http_push_subscriber_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_handle_subscriber_concurrency(
    ngx_http_request_t *r, ngx_http_push_channel_t *channel,
    ngx_http_push_loc_conf_t *loc_conf);

/*
 * Broadcast helpers. Both go through ngx_http_push_store_local.publish:
 * a status broadcast passes a NULL message, a message broadcast passes
 * status code 0 and a NULL status line.
 */
#define ngx_http_push_broadcast_status(channel, status_code, status_line, log) \
    ngx_http_push_store_local.publish(channel, NULL, status_code, status_line, log)
#define ngx_http_push_broadcast_message(channel, msg, log) \
    ngx_http_push_store_local.publish(channel, msg, 0, NULL, log)

static ngx_int_t ngx_http_push_respond_to_subscribers(
    ngx_http_push_channel_t *channel, ngx_http_push_subscriber_t *sentinel,
    ngx_http_push_msg_t *msg, ngx_int_t status_code,
    const ngx_str_t *status_line);
static ngx_int_t ngx_http_push_allow_caching(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_subscriber_get_etag_int(ngx_http_request_t *r);
static ngx_str_t *ngx_http_push_subscriber_get_etag(ngx_http_request_t *r);
static void ngx_http_push_subscriber_cleanup(
    ngx_http_push_subscriber_cleanup_t *data);
static ngx_int_t ngx_http_push_prepare_response_to_subscriber_request(
    ngx_http_request_t *r, ngx_chain_t *chain, ngx_str_t *content_type,
    ngx_str_t *etag, time_t last_modified);
/* Publisher: request handler and request-body handler. */
static ngx_int_t ngx_http_push_publisher_handler(ngx_http_request_t *r);
static void ngx_http_push_publisher_body_handler(ngx_http_request_t *r);
/* Utilities: general request handling */
static void ngx_http_push_copy_preallocated_buffer(
    ngx_buf_t *buf, ngx_buf_t *cbuf);
static ngx_table_elt_t *ngx_http_push_add_response_header(
    ngx_http_request_t *r, const ngx_str_t *header_name,
    const ngx_str_t *header_value);
static ngx_int_t ngx_http_push_respond_status_only(
    ngx_http_request_t *r, ngx_int_t status_code,
    const ngx_str_t *statusline);
static ngx_chain_t *ngx_http_push_create_output_chain(
    ngx_buf_t *buf, ngx_pool_t *pool, ngx_log_t *log);
/* ---- string constants ---- */

/* header names */
const ngx_str_t  NGX_HTTP_PUSH_HEADER_ETAG           = ngx_string("Etag");
const ngx_str_t  NGX_HTTP_PUSH_HEADER_IF_NONE_MATCH  = ngx_string("If-None-Match");
const ngx_str_t  NGX_HTTP_PUSH_HEADER_VARY           = ngx_string("Vary");
const ngx_str_t  NGX_HTTP_PUSH_HEADER_ALLOW          = ngx_string("Allow");
const ngx_str_t  NGX_HTTP_PUSH_HEADER_CACHE_CONTROL  = ngx_string("Cache-Control");
const ngx_str_t  NGX_HTTP_PUSH_HEADER_PRAGMA         = ngx_string("Pragma");

/* header values */
const ngx_str_t  NGX_HTTP_PUSH_CACHE_CONTROL_VALUE   = ngx_string("no-cache");

/* status lines */
const ngx_str_t  NGX_HTTP_PUSH_HTTP_STATUS_409       = ngx_string("409 Conflict");
const ngx_str_t  NGX_HTTP_PUSH_HTTP_STATUS_410       = ngx_string("410 Gone");

/* other stuff */
const ngx_str_t  NGX_HTTP_PUSH_ALLOW_GET_POST_PUT_DELETE = ngx_string("GET, POST, PUT, DELETE");
const ngx_str_t  NGX_HTTP_PUSH_ALLOW_GET                 = ngx_string("GET");
const ngx_str_t  NGX_HTTP_PUSH_VARY_HEADER_VALUE         = ngx_string("If-None-Match, If-Modified-Since");
/*
 * Channel info templates (plain text, JSON, XML).
 * Each template has three placeholders, in this order: %ui (messages),
 * %d (requested / last requested) and %ui (subscribers), and ends with an
 * explicit "\0" that is counted in the ngx_str_t length.
 * NOTE(review): presumably consumed by ngx_http_push_channel_info(); check
 * that the argument passed for %d matches the width the formatter expects.
 */
const ngx_str_t  NGX_HTTP_PUSH_CHANNEL_INFO_PLAIN = ngx_string(
    "queued messages: %ui" CRLF
    "last requested: %d sec. ago (-1=never)" CRLF
    "active subscribers: %ui"
    "\0");

const ngx_str_t  NGX_HTTP_PUSH_CHANNEL_INFO_JSON = ngx_string(
    "{\"messages\": %ui, "
    "\"requested\": %d, "
    "\"subscribers\": %ui }"
    "\0");

const ngx_str_t  NGX_HTTP_PUSH_CHANNEL_INFO_XML = ngx_string(
    "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" CRLF
    "<channel>" CRLF
    " <messages>%ui</messages>" CRLF
    " <requested>%d</requested>" CRLF
    " <subscribers>%ui</subscribers>" CRLF
    "</channel>"
    "\0");
/*
 * Channel info template, YAML flavour.
 * Three placeholders, in this order: %ui (messages), %d (requested) and
 * %ui (subscribers). The template ends with an explicit "\0" that is
 * counted in the ngx_str_t length.
 *
 * Fix: the last entry previously read "subscribers %ui" with no colon, so
 * the document was not a valid YAML mapping. It now reads
 * "subscribers: %ui", matching the two entries above it.
 */
const ngx_str_t NGX_HTTP_PUSH_CHANNEL_INFO_YAML = ngx_string(
    "---" CRLF
    "messages: %ui" CRLF
    "requested: %d" CRLF
    "subscribers: %ui" CRLF
    "\0");
/*
 * Pairs a content subtype name (with its length) with a format template.
 * NOTE(review): presumably used to pick one of the
 * NGX_HTTP_PUSH_CHANNEL_INFO_* templates by subtype -- confirm at the
 * point of use.
 */
typedef struct {
    char             *subtype;
    size_t            len;
    const ngx_str_t  *format;
} ngx_http_push_content_subtype_t;