blob: c2518bb54e53be164e0e839221983b3517552982 [file] [log] [blame] [raw]
//worker processes of the world, unite.
static void ngx_http_push_channel_handler(ngx_event_t *ev);
static ngx_inline void ngx_http_push_process_worker_message(void);
static void ngx_http_push_ipc_exit_worker(ngx_cycle_t *cycle);
#define NGX_CMD_HTTP_PUSH_CHECK_MESSAGES 49
ngx_socket_t ngx_http_push_socketpairs[NGX_MAX_PROCESSES][2];
static ngx_int_t ngx_http_push_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers) {
int i, s = 0, on = 1;
ngx_int_t last_expected_process = ngx_last_process;
/* here's the deal: we have no control over fork()ing, nginx's internal
socketpairs are unusable for our purposes (as of nginx 0.8 -- check the
code to see why), and the module initialization callbacks occur before
any workers are spawned. Rather than futzing around with existing
socketpairs, we populate our own socketpairs array.
Trouble is, ngx_spawn_process() creates them one-by-one, and we need to
do it all at once. So we must guess all the workers' ngx_process_slots in
advance. Meaning the spawning logic must be copied to the T.
*/
for(i=0; i < workers; i++) {
while (s < last_expected_process && ngx_processes[s].pid != -1) {
//find empty existing slot
s++;
}
//copypasta from os/unix/ngx_process.c (ngx_spawn_process)
ngx_socket_t *socks = ngx_http_push_socketpairs[s];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "socketpair() failed on socketpair while initializing push module");
return NGX_ERROR;
}
if (ngx_nonblocking(socks[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed on socketpair while initializing push module");
ngx_close_channel(socks, cycle->log);
return NGX_ERROR;
}
if (ngx_nonblocking(socks[1]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed on socketpair while initializing push module");
ngx_close_channel(socks, cycle->log);
return NGX_ERROR;
}
if (ioctl(socks[0], FIOASYNC, &on) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "ioctl(FIOASYNC) failed on socketpair while initializing push module");
ngx_close_channel(socks, cycle->log);
return NGX_ERROR;
}
if (fcntl(socks[0], F_SETOWN, ngx_pid) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(F_SETOWN) failed on socketpair while initializing push module");
ngx_close_channel(socks, cycle->log);
return NGX_ERROR;
}
if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed on socketpair while initializing push module");
ngx_close_channel(socks, cycle->log);
return NGX_ERROR;
}
if (fcntl(socks[1], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while initializing push module");
ngx_close_channel(socks, cycle->log);
return NGX_ERROR;
}
s++; //NEXT!!
}
return NGX_OK;
}
static void ngx_http_push_ipc_exit_worker(ngx_cycle_t *cycle) {
ngx_close_channel((ngx_socket_t *) ngx_http_push_socketpairs[ngx_process_slot], cycle->log);
}
//will be called many times
static ngx_int_t ngx_http_push_init_ipc_shm(ngx_int_t workers) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_shm_zone->shm.addr;
ngx_http_push_shm_data_t *d = (ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data;
ngx_http_push_worker_msg_t *worker_messages;
int i;
ngx_shmtx_lock(&shpool->mutex);
if(d->ipc!=NULL) {
//already initialized...
ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK;
}
//initialize worker message queues
if((worker_messages = ngx_slab_alloc_locked(shpool, sizeof(*worker_messages)*workers))==NULL) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR;
}
for(i=0; i<workers; i++) {
ngx_queue_init(&worker_messages[i].queue);
}
d->ipc=worker_messages;
ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK;
}
static ngx_int_t ngx_http_push_register_worker_message_handler(ngx_cycle_t *cycle) {
if (ngx_add_channel_event(cycle, ngx_http_push_socketpairs[ngx_process_slot][1], NGX_READ_EVENT, ngx_http_push_channel_handler) == NGX_ERROR) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "failed to register channel handler while initializing push module worker");
return NGX_ERROR;
}
return NGX_OK;
}
static void ngx_http_push_channel_handler(ngx_event_t *ev) {
//copypasta from os/unix/ngx_process_cycle.c (ngx_channel_handler)
ngx_int_t n;
ngx_channel_t ch;
ngx_connection_t *c;
if (ev->timedout) {
ev->timedout = 0;
return;
}
c = ev->data;
while(1) {
n = ngx_read_channel(c->fd, &ch, sizeof(ch), ev->log);
if (n == NGX_ERROR) {
if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
ngx_del_conn(c, 0);
}
ngx_close_connection(c);
return;
}
if ((ngx_event_flags & NGX_USE_EVENTPORT_EVENT) && (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR)) {
return;
}
if (n == NGX_AGAIN) {
return;
}
//ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "push module: channel command: %d", ch.command);
if (ch.command==NGX_CMD_HTTP_PUSH_CHECK_MESSAGES) {
ngx_http_push_process_worker_message();
}
}
}
static ngx_int_t ngx_http_push_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log) {
//seems ch doesn't need to have fd set. odd, but roll with it. pid and process slot also unnecessary.
static ngx_channel_t ch = {NGX_CMD_HTTP_PUSH_CHECK_MESSAGES, 0, 0, -1};
return ngx_write_channel(ngx_http_push_socketpairs[slot][0], &ch, sizeof(ngx_channel_t), log);
}
static ngx_inline void ngx_http_push_process_worker_message(void) {
ngx_http_push_worker_msg_t *prev_worker_msg, *worker_msg, *sentinel;
const ngx_str_t *status_line = NULL;
ngx_http_push_channel_t *channel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_shm_zone->shm.addr;
ngx_http_push_subscriber_t *subscriber_sentinel;
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_worker_msg_t *worker_messages = ((ngx_http_push_shm_data_t *)ngx_http_push_shm_zone->data)->ipc;
ngx_int_t status_code;
ngx_http_push_msg_t *msg;
sentinel = &worker_messages[ngx_process_slot];
worker_msg = (ngx_http_push_worker_msg_t *)ngx_queue_next(&sentinel->queue);
while(worker_msg != sentinel) {
if(worker_msg->pid==ngx_pid) {
//everything is okay.
status_code = worker_msg->status_code;
msg = worker_msg->msg;
channel = worker_msg->channel;
subscriber_sentinel = worker_msg->subscriber_sentinel;
if(msg==NULL) {
//just a status line, is all
//status code only.
switch(status_code) {
case NGX_HTTP_CONFLICT:
status_line=&NGX_HTTP_PUSH_HTTP_STATUS_409;
break;
case NGX_HTTP_GONE:
status_line=&NGX_HTTP_PUSH_HTTP_STATUS_410;
break;
case 0:
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push module: worker message contains neither a channel message nor a status code");
//let's let the subscribers know that something went wrong and they might've missed a message
status_code = NGX_HTTP_INTERNAL_SERVER_ERROR;
//intentional fall-through
default:
status_line=NULL;
}
}
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_push_respond_to_subscribers(channel, subscriber_sentinel, msg, status_code, status_line);
ngx_shmtx_lock(&shpool->mutex);
}
else {
//that's quite bad you see. a previous worker died with an undelivered message.
//but all its subscribers' connections presumably got canned, too. so it's not so bad after all.
ngx_http_push_pid_queue_t *channel_worker_sentinel = &worker_msg->channel->workers_with_subscribers;
ngx_http_push_pid_queue_t *channel_worker_cur = channel_worker_sentinel;
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push module: worker %i intercepted a message intended for another worker process (%i) that probably died", ngx_pid, worker_msg->pid);
//delete that invalid sucker.
while((channel_worker_cur=(ngx_http_push_pid_queue_t *)ngx_queue_next(&channel_worker_cur->queue))!=channel_worker_sentinel) {
if(channel_worker_cur->pid == worker_msg->pid) {
ngx_queue_remove(&channel_worker_cur->queue);
ngx_slab_free_locked(shpool, channel_worker_cur);
break;
}
}
}
//It may be worth it to memzero worker_msg for debugging purposes.
prev_worker_msg = worker_msg;
worker_msg = (ngx_http_push_worker_msg_t *)ngx_queue_next(&worker_msg->queue);
ngx_slab_free_locked(shpool, prev_worker_msg);
}
ngx_queue_init(&sentinel->queue); //reset the worker message sentinel
ngx_shmtx_unlock(&shpool->mutex);
return;
}
static ngx_int_t ngx_http_push_send_worker_message(ngx_http_push_channel_t *channel, ngx_http_push_subscriber_t *subscriber_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_msg_t *msg, ngx_int_t status_code, ngx_log_t *log) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_shm_zone->shm.addr;
ngx_http_push_worker_msg_t *worker_messages = ((ngx_http_push_shm_data_t *)ngx_http_push_shm_zone->data)->ipc;
ngx_http_push_worker_msg_t *thisworker_messages = worker_messages + worker_slot;
ngx_http_push_worker_msg_t *newmessage;
ngx_shmtx_lock(&shpool->mutex);
if((newmessage=ngx_slab_alloc_locked(shpool, sizeof(*newmessage)))==NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, log, 0, "push module: unable to allocate worker message");
return NGX_ERROR;
}
ngx_queue_insert_tail(&thisworker_messages->queue, &newmessage->queue);
newmessage->msg = msg;
newmessage->status_code = status_code;
newmessage->pid = pid;
newmessage->subscriber_sentinel = subscriber_sentinel;
newmessage->channel = channel;
ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK;
}