blob: 65795eade6146b60cff6112358017d42c474d68c [file] [log] [blame] [raw]
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
static ngx_http_push_channel_t * ngx_http_push_get_channel(ngx_str_t * id, time_t timeout, ngx_log_t * log);
static ngx_http_push_channel_t * ngx_http_push_find_channel(ngx_str_t * id, time_t timeout, ngx_log_t * log);
static ngx_int_t ngx_http_push_delete_channel_locked(ngx_http_push_channel_t *trash);
static ngx_http_push_channel_t * ngx_http_push_clean_channel_locked(ngx_http_push_channel_t * channel);
static void ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare)(const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right));
static void ngx_http_push_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
static int ngx_http_push_compare_rbtree_node(const ngx_rbtree_node_t *v_left, const ngx_rbtree_node_t *v_right);
static ngx_int_t ngx_http_push_delete_node_locked(ngx_rbtree_t *tree, ngx_rbtree_node_t *trash, ngx_slab_pool_t *shpool);
static ngx_http_push_channel_t * ngx_http_push_clean_channel_locked(ngx_http_push_channel_t * channel) {
ngx_queue_t *sentinel = &channel->message_queue->queue;
time_t now = ngx_time();
ngx_http_push_msg_t *msg=NULL;
while(!ngx_queue_empty(sentinel)){
msg = ngx_queue_data(ngx_queue_head(sentinel), ngx_http_push_msg_t, queue);
if (msg!=NULL && msg->expires != 0 && now > msg->expires) {
ngx_http_push_delete_message_locked(channel, msg, ngx_http_push_shpool);
}
else { //definitely a message left to send
return NULL;
}
}
//at this point, the queue is empty
return (channel->subscribers==0 && (channel->expires <= now)) ? channel : NULL; //if no waiting requests and channel expired, return this channel to be deleted
}
static ngx_int_t ngx_http_push_delete_channel_locked(ngx_http_push_channel_t *trash) {
ngx_int_t res;
res = ngx_http_push_delete_node_locked(&((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree, (ngx_rbtree_node_t *)trash, ngx_http_push_shpool);
if(res==NGX_OK) {
((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->channels--;
return NGX_OK;
}
return res;
}
static ngx_int_t ngx_http_push_delete_node_locked(ngx_rbtree_t *tree, ngx_rbtree_node_t *trash, ngx_slab_pool_t *shpool) {
//assume the shm zone is already locked
if(trash != NULL){ //take out the trash
ngx_rbtree_delete(tree, trash);
//delete the worker-subscriber queue
ngx_queue_t *sentinel = (ngx_queue_t *)(&((ngx_http_push_channel_t *)trash)->workers_with_subscribers);
ngx_queue_t *cur = ngx_queue_head(sentinel);
ngx_queue_t *next;
while(cur!=sentinel) {
next = ngx_queue_next(cur);
ngx_slab_free_locked(shpool, cur);
cur = next;
}
ngx_slab_free_locked(shpool, trash);
return NGX_OK;
}
return NGX_DECLINED;
}
static ngx_http_push_channel_t * ngx_http_push_find_channel(ngx_str_t *id, time_t timeout, ngx_log_t *log) {
ngx_rbtree_t *tree = &((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree;
uint32_t hash;
ngx_rbtree_node_t *node, *sentinel;
ngx_int_t rc;
ngx_http_push_channel_t *up = NULL;
ngx_http_push_channel_t *trash[] = { NULL, NULL, NULL };
ngx_uint_t i, trashed=0;
if (tree==NULL) {
return NULL;
}
hash = ngx_crc32_short(id->data, id->len);
node = tree->root;
sentinel = tree->sentinel;
while (node != sentinel) {
//every search is responsible for deleting a couple of empty, if it comes across them
if (trashed < (sizeof(trash) / sizeof(*trash))) {
if((trash[trashed]=ngx_http_push_clean_channel_locked((ngx_http_push_channel_t *) node))!=NULL) {
trashed++;
}
}
if (hash < node->key) {
node = node->left;
continue;
}
if (hash > node->key) {
node = node->right;
continue;
}
/* hash == node->key */
do {
up = (ngx_http_push_channel_t *) node;
rc = ngx_memn2cmp(id->data, up->id.data, id->len, up->id.len);
if (rc == 0) {
//found
for(i=0; i<trashed; i++) {
if(trash[i] != up){ //take out the trash
ngx_http_push_delete_channel_locked(trash[i]);
}
}
up->expires = ngx_time() + timeout;
ngx_http_push_clean_channel_locked(up);
return up;
}
node = (rc < 0) ? node->left : node->right;
} while (node != sentinel && hash == node->key);
break;
}
//not found
for(i=0; i<trashed; i++) {
ngx_http_push_delete_channel_locked(trash[i]);
}
return NULL;
}
//find a channel by id. if channel not found, make one, insert it, and return that.
static ngx_http_push_channel_t *ngx_http_push_get_channel(ngx_str_t *id, time_t timeout, ngx_log_t *log) {
ngx_rbtree_t *tree;
ngx_http_push_channel_t *up=ngx_http_push_find_channel(id, timeout, log);
if(up != NULL) { //we found our channel
return up;
}
tree = &((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree;
if((up = ngx_http_push_slab_alloc_locked(sizeof(*up) + id->len + sizeof(ngx_http_push_msg_t)))==NULL) {
return NULL;
}
up->id.data = (u_char *) (up+1); //contiguous piggy
up->message_queue = (ngx_http_push_msg_t *) (up->id.data + id->len);
up->id.len = (u_char) id->len;
ngx_memcpy(up->id.data, id->data, up->id.len);
up->node.key = ngx_crc32_short(id->data, id->len);
ngx_rbtree_insert(tree, (ngx_rbtree_node_t *) up);
//initialize queues
ngx_queue_init(&up->message_queue->queue);
up->messages=0;
ngx_queue_init(&up->workers_with_subscribers.queue);
up->subscribers=0;
up->expires = ngx_time() + timeout;
((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->channels++;
return up;
}
static void ngx_rbtree_generic_insert(
ngx_rbtree_node_t *temp,
ngx_rbtree_node_t *node,
ngx_rbtree_node_t *sentinel,
int (*compare)(const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right))
{
for ( ;; ) {
if (node->key < temp->key) {
if (temp->left == sentinel) {
temp->left = node;
break;
}
temp = temp->left;
} else if (node->key > temp->key) {
if (temp->right == sentinel) {
temp->right = node;
break;
}
temp = temp->right;
} else { /* node->key == temp->key */
if (compare(node, temp) < 0) {
if (temp->left == sentinel) {
temp->left = node;
break;
}
temp = temp->left;
} else {
if (temp->right == sentinel) {
temp->right = node;
break;
}
temp = temp->right;
}
}
}
node->parent = temp;
node->left = sentinel;
node->right = sentinel;
ngx_rbt_red(node);
}
#define ngx_http_push_walk_rbtree(apply) \
ngx_http_push_rbtree_walker(&((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree, apply, ((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree.root)
static void ngx_http_push_rbtree_walker(ngx_rbtree_t *tree, ngx_int_t (*apply)(ngx_http_push_channel_t * channel), ngx_rbtree_node_t *node) {
ngx_rbtree_node_t *sentinel = tree->sentinel;
if(node!=sentinel) {
apply((ngx_http_push_channel_t *)node);
if(node->left!=NULL) {
ngx_http_push_rbtree_walker(tree, apply, node->left);
}
if(node->right!=NULL) {
ngx_http_push_rbtree_walker(tree, apply, node->right);
}
}
}
static void ngx_http_push_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
{
ngx_rbtree_generic_insert(temp, node, sentinel, ngx_http_push_compare_rbtree_node);
}
static int ngx_http_push_compare_rbtree_node(const ngx_rbtree_node_t *v_left, const ngx_rbtree_node_t *v_right)
{
ngx_http_push_channel_t *left = (ngx_http_push_channel_t *) v_left, *right = (ngx_http_push_channel_t *) v_right;
return ngx_memn2cmp(left->id.data, right->id.data, left->id.len, right->id.len);
}