blob: 683f3397276170388edaa74b86742460a8675a31 [file] [log] [blame] [raw]
/* *****************************************************************************
* Cluster Messages API
*
* Facil supports a message oriented API for use for Inter Process Communication
* (IPC), publish/subscribe patterns, horizontal scaling and similar use-cases.
**************************************************************************** */
// #include "facil_cluster.h"
#include "spnlock.h"
#include "facil.h"
#include "fio_mem.h"
#include "fio_llist.h"
#include "fio_tmpfile.h"
#include "fiobj4sock.h"
#include <sys/types.h>
#include <signal.h>
#undef facil_subscribe
#undef facil_subscribe_pubsub
#undef facil_publish
/* *****************************************************************************
* Data Structures - Clients / Subscriptions data
**************************************************************************** */
/**
* Message types carried in the cluster frame header.
*
* The value is written to the wire as a 32 bit integer (see
* cluster_wrap_message), so the implicit numbering of the members is part of
* the protocol between the root process and its workers.
*/
typedef enum cluster_message_type_e {
/* channel and message are Strings (or missing) and are sent unchanged. */
CLUSTER_MESSAGE_FORWARD,
/* channel and/or message were serialized to JSON before sending. */
CLUSTER_MESSAGE_JSON,
/* like FORWARD, but published only in the root process (not re-broadcast). */
CLUSTER_MESSAGE_ROOT,
/* like JSON, but published only in the root process (not re-broadcast). */
CLUSTER_MESSAGE_ROOT_JSON,
/* a worker informs the root that a channel gained its first subscriber. */
CLUSTER_MESSAGE_PUBSUB_SUB,
/* a worker informs the root that a channel was destroyed. */
CLUSTER_MESSAGE_PUBSUB_UNSUB,
/* same as PUBSUB_SUB, for pattern channels (the payload carries the match
* function pointer). */
CLUSTER_MESSAGE_PATTERN_SUB,
/* same as PUBSUB_UNSUB, for pattern channels. */
CLUSTER_MESSAGE_PATTERN_UNSUB,
/* sent when a connection is shutting down; workers raise SIGINT on it. */
CLUSTER_MESSAGE_SHUTDOWN,
/* not produced by the code visible in this file; ignored by both handlers. */
CLUSTER_MESSAGE_ERROR,
/* keep-alive frame sent by cluster_ping; ignored by both handlers. */
CLUSTER_MESSAGE_PING,
} cluster_message_type_e;
/*
* Hash map configuration: keys are FIOBJ objects.
*
* NOTE(review): these macros are presumably read by fio_hashmap.h when it is
* included below, so they must stay ahead of the #include lines - confirm
* against the header.
*/
/* the key type stored by the map */
#define FIO_HASH_KEY_TYPE FIOBJ
/* the value that marks an empty key */
#define FIO_HASH_KEY_INVALID FIOBJ_INVALID
/* the numeric hash of a key */
#define FIO_HASH_KEY2UINT(key) (fiobj_obj2hash((key)))
/* key equality is FIOBJ equality */
#define FIO_HASH_COMPARE_KEYS(k1, k2) (fiobj_iseq((k1), (k2)))
#define FIO_HASH_KEY_ISINVALID(key) ((key) == FIOBJ_INVALID)
/* the map takes its own reference to every key ... */
#define FIO_HASH_KEY_COPY(key) (fiobj_dup((key)))
/* ... and releases that reference when the key is dropped */
#define FIO_HASH_KEY_DESTROY(key) fiobj_free((key))
#include "fio_ary.h"
#include "fio_hashmap.h"
/** A lock protected hash map (used for channels, engines and per-connection
* subscription records). */
typedef struct {
/* the map itself, keyed by a FIOBJ */
fio_hash_s channels;
/* guards `channels` */
spn_lock_i lock;
} collection_s;
/** A lock protected dynamic array (used for the metadata callbacks list). */
typedef struct {
/* the array itself */
fio_ary_s ary;
/* guards `ary` */
spn_lock_i lock;
} collection_ary_s;
/** A channel (or filter) and the list of its subscriptions. */
typedef struct {
/* the channel name; for filters this is a Number object holding the filter */
FIOBJ id;
/* head of the embedded list of subscription_s nodes */
fio_ls_embd_s subscriptions;
/* the collection whose hash map holds this channel */
collection_s *parent;
/* guards the `subscriptions` list */
spn_lock_i lock;
} channel_s;
/** A pattern channel: a channel plus the function used to match names. */
typedef struct {
/* must remain the first member: pattern_s pointers are cast to and from
* channel_s pointers elsewhere in this file */
channel_s ch;
/* called as match(pattern_id, published_channel); non-zero means a match */
facil_match_fn match;
} pattern_s;
/** A single subscription to a channel, pattern or filter. */
struct subscription_s {
/** links the subscription into its channel's `subscriptions` list. */
fio_ls_embd_s node;
/** the channel this subscription belongs to. */
channel_s *parent;
/** called for every message published to the channel. */
void (*on_message)(facil_msg_s *msg);
/** called once, when the last reference to the subscription is released. */
void (*on_unsubscribe)(void *udata1, void *udata2);
/** opaque user data, handed to both callbacks. */
void *udata1;
void *udata2;
/** reference counter. */
uintptr_t ref;
/** prevents the callback from running concurrently for multiple messages. */
spn_lock_i lock;
};
/** The internal (reference counted) form of a published message. */
typedef struct {
/* the public message; must remain the first member, since facil_msg_s
* pointers are cast back to this type */
facil_msg_s msg;
/* singly linked list of metadata attached by the metadata callbacks */
facil_msg_metadata_s *meta;
/* reference counter; also used as the "deferred" flag on the per-callback
* stack copy (see defer_subscription_callback) */
uintptr_t ref;
} facil_msg_internal_s;
/** A message prepared for publishing / transport (see prepare_message). */
typedef struct {
/* how the channel / msg fields are encoded (forwarded as-is or JSON) */
cluster_message_type_e type;
/** A unique message type. Negative values are reserved, 0 == pub/sub. */
int32_t filter;
/** A channel name, allowing for pub/sub patterns. */
FIOBJ channel;
/** The actual message. */
FIOBJ msg;
} facil_msg_str_s;
/* static initializer for a collection_s */
#define COLLECTION_INIT \
{ .channels = FIO_HASH_INIT, .lock = SPN_LOCK_INIT }
/**
* The process-wide pub/sub registry.
*
* NOTE(review): `engines` and `meta` have no explicit initializer and are
* therefore zero initialized - this presumably matches FIO_HASH_INIT /
* FIO_ARY_INIT / SPN_LOCK_INIT; confirm against their definitions.
*/
struct {
/* channels keyed by a filter number */
collection_s filters;
/* channels keyed by an exact channel name */
collection_s pubsub;
/* pattern channels (stored as pattern_s) */
collection_s patterns;
/* the registered pub/sub engines */
collection_s engines;
/* the registered metadata callbacks */
collection_ary_s meta;
} postoffice = {
.filters = COLLECTION_INIT,
.pubsub = COLLECTION_INIT,
.patterns = COLLECTION_INIT,
};
/** The default engine (settable). Starts out as FACIL_PUBSUB_CLUSTER. */
pubsub_engine_s *FACIL_PUBSUB_DEFAULT = FACIL_PUBSUB_CLUSTER;
/* *****************************************************************************
Engine handling and Management
***************************************************************************** */
/* implemented later, informs root process about subscriptions */
static inline void inform_root_about_channel(FIOBJ ch, facil_match_fn match,
int add);
/*
 * Announces a newly created channel: every registered engine gets a
 * `subscribe` call and the root process is informed afterwards.
 *
 * The visible caller (subscription_create) invokes this while still holding
 * the owning collection's lock.
 */
static void pubsub_on_channel_create(channel_s *ch, facil_match_fn match) {
  spn_lock(&postoffice.engines.lock);
  FIO_HASH_FOR_LOOP(&postoffice.engines.channels, slot) {
    if (slot && slot->obj) {
      pubsub_engine_s *engine = slot->obj;
      engine->subscribe(engine, ch->id, match);
    }
  }
  spn_unlock(&postoffice.engines.lock);
  inform_root_about_channel(ch->id, match, 1);
}
/*
 * Announces that a channel is gone: every registered engine gets an
 * `unsubscribe` call and the root process is informed afterwards.
 *
 * The channel's `id` must still be valid while this runs.
 */
static void pubsub_on_channel_destroy(channel_s *ch, facil_match_fn match) {
  spn_lock(&postoffice.engines.lock);
  FIO_HASH_FOR_LOOP(&postoffice.engines.channels, slot) {
    if (slot && slot->obj) {
      pubsub_engine_s *engine = slot->obj;
      engine->unsubscribe(engine, ch->id, match);
    }
  }
  spn_unlock(&postoffice.engines.lock);
  inform_root_about_channel(ch->id, match, 0);
}
/* *****************************************************************************
* Freeing subscriptions / channels
**************************************************************************** */
/*
 * Releases one reference to the subscription. When the counter drops to zero
 * the `on_unsubscribe` callback (if any) runs and the memory is freed.
 */
static inline void subscription_free(subscription_s *s) {
  if (!spn_sub(&s->ref, 1)) {
    /* that was the last reference */
    if (s->on_unsubscribe) {
      s->on_unsubscribe(s->udata1, s->udata2);
    }
    fio_free(s);
  }
}
/*
 * Takes an additional reference to the subscription and returns the same
 * pointer, so the call can be used inline.
 */
static inline subscription_s *subscription_dup(subscription_s *s) {
  (void)spn_add(&s->ref, 1);
  return s;
}
// static void subscription_free_later(void *s, void *ignr) {
// subscription_free(s);
// (void)ignr;
// }
/*
 * Frees a channel, unless it has subscriptions.
 *
 * The channel is removed from its collection's hash map under the
 * collection's lock (the map is compacted when it is large and at most half
 * full). The engines and the root process are then notified, after the lock
 * was released, and finally the channel's memory is returned.
 *
 * Filter channels are never announced when they are created, so they are not
 * announced here either. Announcing them would send the root process an
 * "unsubscribe" for the textual form of the filter number, which could match
 * an unrelated, real, channel name.
 */
static inline void channel_destroy(channel_s *c) {
  collection_s *owner = c->parent;
  spn_lock(&owner->lock);
  if (fio_ls_embd_any(&c->subscriptions)) {
    /* the channel is (again) in use - keep it. */
    spn_unlock(&owner->lock);
    return;
  }
  fio_hash_insert(&owner->channels, c->id, NULL);
  if ((fio_hash_count(&owner->channels) << 1) <=
          fio_hash_capa(&owner->channels) &&
      fio_hash_capa(&owner->channels) > 512) {
    fio_hash_compact(&owner->channels);
  }
  spn_unlock(&owner->lock);
  if (owner != &postoffice.filters) {
    pubsub_on_channel_destroy(
        c, (owner == &postoffice.patterns ? ((pattern_s *)c)->match : NULL));
  }
  /* release the reference taken (fiobj_dup) when the channel was created. */
  fiobj_free(c->id);
  fio_free(c);
}
/* cancel a subscription */
static void subscription_destroy(void *s_, void *ignore) {
if (!s_) { /* ignore NULL subscriptions. */
return;
}
subscription_s *s = s_;
if (spn_trylock(&s->parent->lock)) {
defer(subscription_destroy, s_, ignore);
return;
}
fio_ls_embd_remove(&s->node);
if (fio_ls_embd_is_empty(&s->parent->subscriptions)) {
spn_unlock(&s->parent->lock);
channel_destroy(s->parent);
} else {
spn_unlock(&s->parent->lock);
}
subscription_free(s);
(void)ignore;
}
/* *****************************************************************************
* Creating subscriptions
**************************************************************************** */
/**
* Creates a new subscription object, returning NULL on error.
*
* A subscription requires an `on_message` callback and either a `filter` or a
* `channel`. When the arguments are invalid, `on_unsubscribe` (if set) is
* called immediately and NULL is returned.
*
* A non-zero `filter` takes precedence over `channel`; otherwise `match`
* selects between a pattern subscription and an exact-match subscription.
*
* The channel object is looked up (or created) while the collection's lock is
* held. Engines and the root process are informed about newly created
* channels, except for filter channels. Allocation failures terminate the
* process.
*/
static inline subscription_s *subscription_create(subscribe_args_s args) {
if (!args.on_message || (!args.channel && !args.filter)) {
if (args.on_unsubscribe) {
args.on_unsubscribe(args.udata1, args.udata2);
}
return NULL;
}
collection_s *collection;
if (args.filter) {
/* either a filter OR a channel can be subscribed to. */
/* the temporary Number key is released at the end of this function. */
args.channel = fiobj_num_new((uintptr_t)args.filter);
collection = &postoffice.filters;
} else {
if (args.match) {
collection = &postoffice.patterns;
} else {
collection = &postoffice.pubsub;
}
if (FIOBJ_TYPE_IS(args.channel, FIOBJ_T_STRING)) {
/* Hash values are cached, so it can be computed outside the lock */
fiobj_str_freeze(args.channel);
fiobj_obj2hash(args.channel);
}
}
/* allocate and initialize subscription object */
subscription_s *s = fio_malloc(sizeof(*s));
if (!s) {
perror("FATAL ERROR: (pubsub) can't allocate memory for subscription");
exit(errno);
}
/* the initial reference (ref == 1) belongs to the caller / channel list. */
*s = (subscription_s){
.node = (fio_ls_embd_s)FIO_LS_INIT(s->node),
.parent = NULL,
.on_message = args.on_message,
.on_unsubscribe = args.on_unsubscribe,
.udata1 = args.udata1,
.udata2 = args.udata2,
.ref = 1,
.lock = SPN_LOCK_INIT,
};
/* seek existing channel or create one (and prevent memory bloat) */
spn_lock(&collection->lock);
if (fio_hash_is_fragmented(&collection->channels)) {
fio_hash_compact(&collection->channels);
}
channel_s *ch = fio_hash_find(&collection->channels, args.channel);
if (!ch) {
if (args.match) {
/* pattern subscriptions */
/* pattern_s starts with a channel_s, so it can be used through `ch`. */
ch = fio_malloc(sizeof(pattern_s));
if (!ch) {
perror("FATAL ERROR: (pubsub) can't allocate memory for pattern");
exit(errno);
}
((pattern_s *)ch)->match = args.match;
} else {
/* channel subscriptions */
ch = fio_malloc(sizeof(*ch));
if (!ch) {
perror("FATAL ERROR: (pubsub) can't allocate memory for channel");
exit(errno);
}
}
/* the channel holds its own reference to the name / filter object. */
*ch = (channel_s){
.id = fiobj_dup(args.channel),
.subscriptions = (fio_ls_embd_s)FIO_LS_INIT(ch->subscriptions),
.parent = collection,
.lock = SPN_LOCK_INIT,
};
fio_hash_insert(&collection->channels, args.channel, ch);
/* filters are process local - engines and the root are not informed. */
if (!args.filter) {
pubsub_on_channel_create(ch, args.match);
}
}
/* add subscription to filter / channel / pattern */
s->parent = ch;
spn_lock(&ch->lock);
fio_ls_embd_push(&ch->subscriptions, &s->node);
spn_unlock(&ch->lock);
spn_unlock(&collection->lock);
if (args.filter) {
/* release the temporary Number key created above. */
fiobj_free(args.channel);
}
return s;
}
/* *****************************************************************************
* Publishing to the subscriptions
**************************************************************************** */
/**
 * Releases one reference to an internal message.
 *
 * When the last reference is released, every metadata node is finalized (its
 * `on_finish` callback, if any, is called) and freed, and then the channel,
 * the message body and the message container itself are released.
 */
static inline void internal_message_free(facil_msg_internal_s *msg) {
  if (spn_sub(&msg->ref, 1)) {
    return; /* still referenced */
  }
  facil_msg_metadata_s *next = NULL;
  for (facil_msg_metadata_s *node = msg->meta; node; node = next) {
    /* read the link first: the node is freed at the end of the iteration. */
    next = node->next;
    if (node->on_finish) {
      node->on_finish(&msg->msg, node->metadata);
    }
    fio_free(node);
  }
  fiobj_free(msg->msg.channel);
  fiobj_free(msg->msg.msg);
  fio_free(msg);
}
/*
 * Marks a message as "deferred" by raising the `ref` flag of the internal
 * message that wraps `msg_`. Nothing is scheduled here (mark only).
 */
static inline void defer_subscription_callback(facil_msg_s *msg_) {
  ((facil_msg_internal_s *)msg_)->ref = 1;
}
/**
 * Finds the message's metadata by its type ID.
 *
 * Returns the `metadata` pointer of the first node whose `type_id` matches,
 * or NULL when no such node is attached to the message.
 */
void *facil_message_metadata(facil_msg_s *msg, intptr_t type_id) {
  for (facil_msg_metadata_s *node = ((facil_msg_internal_s *)msg)->meta; node;
       node = node->next) {
    if (node->type_id == type_id) {
      return node->metadata;
    }
  }
  return NULL;
}
/*
* performs the actual callback (a `defer` compatible task).
*
* `s_` is the subscription and `msg_` is the internal message; the scheduler
* (publish2channel) took one reference to each and both are released once
* the callback completed.
*
* The task re-schedules itself when the subscription is busy running another
* callback, or when the callback flagged the message as deferred.
*/
static void perform_subscription_callback(void *s_, void *msg_) {
subscription_s *s = s_;
if (spn_trylock(&s->lock)) {
/* another message is being handled by this subscription - retry later. */
defer(perform_subscription_callback, s_, msg_);
return;
}
facil_msg_internal_s *msg = (facil_msg_internal_s *)msg_;
/* a per-callback copy: carries this subscription's udata and the "deferred"
* flag (`ref`), while sharing the channel / msg / meta objects. */
facil_msg_internal_s m = {
.msg =
{
.channel = msg->msg.channel,
.msg = msg->msg.msg,
.filter = msg->msg.filter,
.udata1 = s->udata1,
.udata2 = s->udata2,
},
.meta = msg->meta,
.ref = 0,
};
s->on_message((facil_msg_s *)&m);
spn_unlock(&s->lock);
if (m.ref) {
/* the flag was raised (see defer_subscription_callback) - run again. */
defer(perform_subscription_callback, s_, msg_);
return;
}
internal_message_free(msg);
subscription_free(s);
}
/*
 * Schedules the message for every subscription of the channel.
 *
 * Each scheduled task owns one reference to the subscription and one
 * reference to the message; perform_subscription_callback releases both.
 * A NULL channel or message is ignored.
 */
static void publish2channel(channel_s *ch, facil_msg_internal_s *msg) {
  if (ch == NULL || msg == NULL) {
    return;
  }
  spn_lock(&ch->lock);
  FIO_LS_EMBD_FOR(&ch->subscriptions, node) {
    subscription_s *sub = FIO_LS_EMBD_OBJ(subscription_s, node, node);
    if (sub) {
      subscription_dup(sub);
      spn_add(&msg->ref, 1);
      defer(perform_subscription_callback, sub, msg);
    }
  }
  spn_unlock(&ch->lock);
}
/*
 * Runs every registered metadata callback for the message and pushes each
 * result to the front of the message's metadata list.
 *
 * The callbacks array is copied under the lock and the callbacks run on the
 * copy, so user code never runs while the lock is held. Allocation failures
 * terminate the process.
 */
static inline void call_meta_callbacks(facil_msg_internal_s *m, FIOBJ ch_raw,
                                       FIOBJ msg_raw) {
  /* the signature of the callbacks stored (as pointers) in the array. */
  typedef facil_msg_metadata_s (*meta_callback_fn)(facil_msg_s * msg,
                                                   FIOBJ raw_ch, FIOBJ raw_msg);
  if (fio_ary_count(&postoffice.meta.ary) == 0) {
    return;
  }
  fio_ary_s snapshot = FIO_ARY_INIT;
  spn_lock(&postoffice.meta.lock);
  fio_ary_concat(&snapshot, &postoffice.meta.ary);
  spn_unlock(&postoffice.meta.lock);
  FIO_ARY_FOR(&snapshot, it) {
    if (it.obj == NULL)
      continue;
    facil_msg_metadata_s *node = fio_malloc(sizeof(*node));
    if (!node) {
      perror("FATAL ERROR: (pubsub) couldn't allocate memory for metadata");
      exit(errno);
    }
    meta_callback_fn callback = (meta_callback_fn)(uintptr_t)it.obj;
    *node = callback(&m->msg, ch_raw, msg_raw);
    node->next = m->meta;
    m->meta = node;
  }
  fio_ary_free(&snapshot);
}
/*
 * Delivers a message to the matching subscriptions of the current process.
 *
 * - `filter`:  when non-zero, only the matching filter channel is served.
 * - `channel`: the channel name as it was transported (also used for lookup).
 * - `msg`:     the message body as it was transported.
 * - `type`:    CLUSTER_MESSAGE_JSON asks for `channel` / `msg` to be parsed
 *              as JSON; the unparsed object is kept when parsing produced
 *              nothing.
 *
 * Metadata callbacks only run for pub/sub messages (filter == 0) and receive
 * the raw (unparsed) channel and message.
 *
 * The function keeps its own reference to the internal message while the
 * subscriptions are scheduled and releases it before returning.
 */
static void publish2process(int32_t filter, FIOBJ channel, FIOBJ msg,
                            cluster_message_type_e type) {
  facil_msg_internal_s *m = fio_malloc(sizeof(*m));
  if (!m) {
    perror("FATAL ERROR: (pubsub) can't allocate memory for message data");
    exit(errno);
  }
  *m = (facil_msg_internal_s){
      .msg =
          {
              .filter = filter,
              .channel = fiobj_dup(channel),
              .msg = fiobj_dup(msg),
          },
      .ref = 1,
  };
  if (type == CLUSTER_MESSAGE_JSON) {
    FIOBJ org_ch = m->msg.channel;
    FIOBJ org_msg = m->msg.msg;
    if (org_ch) {
      fio_cstr_s str = fiobj_obj2cstr(org_ch);
      fiobj_json2obj(&m->msg.channel, str.data, str.length);
    }
    if (org_msg) {
      fio_cstr_s str = fiobj_obj2cstr(org_msg);
      fiobj_json2obj(&m->msg.msg, str.data, str.length);
    }
    /* fall back to the raw object when parsing produced nothing. */
    if (!m->msg.channel) {
      m->msg.channel = fiobj_dup(org_ch);
    }
    if (!m->msg.msg) {
      m->msg.msg = fiobj_dup(org_msg);
    }
    if (filter == 0) {
      call_meta_callbacks(m, org_ch, org_msg);
    }
    fiobj_free(org_ch);
    fiobj_free(org_msg);
  } else {
    if (filter == 0) {
      call_meta_callbacks(m, m->msg.channel, m->msg.msg);
    }
  }
  if (filter) {
    /* filters are stored using a Number object as the key. */
    FIOBJ key = fiobj_num_new((uintptr_t)filter);
    spn_lock(&postoffice.filters.lock);
    channel_s *ch = fio_hash_find(&postoffice.filters.channels, key);
    publish2channel(ch, m);
    spn_unlock(&postoffice.filters.lock);
    /* the key was only needed for the lookup - release it. */
    fiobj_free(key);
    internal_message_free(m);
    return;
  }
  /* exact match */
  spn_lock(&postoffice.pubsub.lock);
  channel_s *ch = fio_hash_find(&postoffice.pubsub.channels, channel);
  publish2channel(ch, m);
  spn_unlock(&postoffice.pubsub.lock);
  /* test patterns */
  spn_lock(&postoffice.patterns.lock);
  FIO_HASH_FOR_LOOP(&postoffice.patterns.channels, p) {
    if (!p->obj) {
      continue;
    }
    pattern_s *pattern = p->obj;
    if (pattern->match(pattern->ch.id, channel)) {
      publish2channel(&pattern->ch, m);
    }
  }
  spn_unlock(&postoffice.patterns.lock);
  internal_message_free(m);
}
/**
 * Prepares the message to be published.
 *
 * When both `ch` and `msg` are Strings (or missing) they are used as they
 * are and the type is CLUSTER_MESSAGE_FORWARD. Otherwise every existing
 * object is serialized to a JSON String and the type is
 * CLUSTER_MESSAGE_JSON.
 *
 * In both cases the caller owns exactly one reference to the returned
 * `channel` and `msg` and must release each of them with `fiobj_free`.
 */
static inline facil_msg_str_s prepare_message(int32_t filter, FIOBJ ch,
                                              FIOBJ msg) {
  facil_msg_str_s m = {
      .channel = ch,
      .msg = msg,
      .type = CLUSTER_MESSAGE_FORWARD,
      .filter = filter,
  };
  if ((!ch || FIOBJ_TYPE_IS(ch, FIOBJ_T_STRING)) &&
      (!msg || FIOBJ_TYPE_IS(msg, FIOBJ_T_STRING))) {
    /* the original objects are shared - take a reference for the caller. */
    fiobj_dup(m.channel);
    fiobj_dup(m.msg);
  } else {
    /* the JSON Strings are new objects, already owned by the caller; taking
     * an extra reference here would leak them. */
    m.type = CLUSTER_MESSAGE_JSON;
    if (ch) {
      m.channel = fiobj_obj2json(ch, 0);
    }
    if (msg) {
      m.msg = fiobj_obj2json(msg, 0);
    }
  }
  return m;
}
static void facil_send2cluster(int32_t filter, FIOBJ ch, FIOBJ msg,
cluster_message_type_e type);
/**
 * Publishes a message to all processes (including this one).
 *
 * The prepared message is sent over the cluster connection(s) first and is
 * then published locally; the prepared references are released afterwards.
 */
static inline void publish_msg2all(int32_t filter, FIOBJ ch, FIOBJ msg) {
  const facil_msg_str_s prepared = prepare_message(filter, ch, msg);
  facil_send2cluster(prepared.filter, prepared.channel, prepared.msg,
                     prepared.type);
  publish2process(prepared.filter, prepared.channel, prepared.msg,
                  prepared.type);
  fiobj_free(prepared.channel);
  fiobj_free(prepared.msg);
}
/**
 * Publishes a message within the current process (only this one).
 *
 * The message goes through the same preparation step as cluster messages;
 * the prepared references are released afterwards.
 */
static inline void publish_msg2local(int32_t filter, FIOBJ ch, FIOBJ msg) {
  const facil_msg_str_s prepared = prepare_message(filter, ch, msg);
  publish2process(prepared.filter, prepared.channel, prepared.msg,
                  prepared.type);
  fiobj_free(prepared.channel);
  fiobj_free(prepared.msg);
}
/**
 * Publishes a message to other processes (excluding this one).
 *
 * The prepared message is only sent over the cluster connection(s); the
 * prepared references are released afterwards.
 */
static inline void publish_msg2cluster(int32_t filter, FIOBJ ch, FIOBJ msg) {
  const facil_msg_str_s prepared = prepare_message(filter, ch, msg);
  facil_send2cluster(prepared.filter, prepared.channel, prepared.msg,
                     prepared.type);
  fiobj_free(prepared.channel);
  fiobj_free(prepared.msg);
}
/**
 * Publishes a message exclusively to the root process.
 *
 * When called in the root process this is a local publish. In a worker, the
 * message is sent to the root using the ROOT / ROOT_JSON message types, which
 * the root publishes locally without forwarding to the other workers.
 */
static inline void publish_msg2root(int32_t filter, FIOBJ ch, FIOBJ msg) {
  if (facil_parent_pid() == getpid()) {
    publish_msg2local(filter, ch, msg);
    return;
  }
  facil_msg_str_s m = prepare_message(filter, ch, msg);
  m.type = (m.type == CLUSTER_MESSAGE_JSON ? CLUSTER_MESSAGE_ROOT_JSON
                                           : CLUSTER_MESSAGE_ROOT);
  facil_send2cluster(m.filter, m.channel, m.msg, m.type);
  /* release the references returned by prepare_message (the data was copied
   * into the outgoing frame), as the other publish_msg2* functions do. */
  fiobj_free(m.channel);
  fiobj_free(m.msg);
}
/* *****************************************************************************
* Data Structures - Core Structures
**************************************************************************** */
/* the size, in bytes, of the per-connection read buffer */
#define CLUSTER_READ_BUFFER 16384
/** The protocol object attached to every cluster (IPC) connection. */
typedef struct cluster_pr_s {
/* the facil.io protocol callbacks; first member, so protocol_s pointers can
* be cast to cluster_pr_s pointers */
protocol_s protocol;
/* the channel name of the frame being received (if any) */
FIOBJ channel;
/* the message body of the frame being received (if any) */
FIOBJ msg;
/* called for every complete frame */
void (*handler)(struct cluster_pr_s *pr);
/* sends a wrapped frame to the peer(s), taking ownership of `data` */
void (*sender)(FIOBJ data);
/* subscriptions created on behalf of the peer, keyed by channel name */
collection_s pubsub;
/* pattern subscriptions created on behalf of the peer */
collection_s patterns;
/* the connection's uuid */
intptr_t uuid;
/* channel bytes still missing for the current frame */
uint32_t exp_channel;
/* message bytes still missing for the current frame */
uint32_t exp_msg;
/* the type of the current (or last) frame, see cluster_message_type_e */
uint32_t type;
/* the filter of the current (or last) frame */
int32_t filter;
/* the number of unprocessed bytes in `buffer` */
uint32_t length;
/* raw incoming data */
uint8_t buffer[CLUSTER_READ_BUFFER];
} cluster_pr_s;
/** The process-wide cluster connection state. */
struct cluster_data_s {
/* the uuid of the listening socket (root process) */
intptr_t listener;
/* the uuid of the connection to the root process (workers) */
intptr_t client;
/* the uuids of the connected workers (root process) */
fio_ls_s clients;
/* not used by the code visible in this part of the file */
fio_hash_s subscribers;
/* guards `clients` */
spn_lock_i lock;
/* the file system path of the cluster's Unix socket */
char name[128];
} cluster_data = {.clients = FIO_LS_INIT(cluster_data.clients),
.subscribers = FIO_HASH_INIT,
.lock = SPN_LOCK_INIT};
/*
 * Resets the cluster state.
 *
 * When `delete_file` is non-zero (and a socket name was set) the socket file
 * is unlinked. Every connection in the clients list is closed and, finally,
 * `cluster_data` is reset to its initial (empty) state.
 */
static void cluster_data_cleanup(int delete_file) {
  if (delete_file && cluster_data.name[0] != 0) {
#if DEBUG
    fprintf(stderr, "* INFO: (%d) CLUSTER UNLINKING\n", getpid());
#endif
    unlink(cluster_data.name);
  }
  while (fio_ls_any(&cluster_data.clients)) {
    const intptr_t client = (intptr_t)fio_ls_pop(&cluster_data.clients);
    if (client > 0) {
      sock_close(client);
    }
  }
  cluster_data = (struct cluster_data_s){
      .lock = SPN_LOCK_INIT,
      .clients = (fio_ls_s)FIO_LS_INIT(cluster_data.clients),
  };
}
/*
 * Resets the cluster state and computes a unique socket name:
 *
 *     <temporary folder>/facil-io-sock-<process id, written using fio_ltoa>
 *
 * The folder is taken from the TMPDIR environment variable, falling back to
 * `P_tmpdir` (or "/tmp/") when TMPDIR is missing or longer than 100 bytes.
 * A folder name of 100 bytes or more is dropped altogether, which keeps the
 * result within `cluster_data.name`.
 *
 * Any existing file with that name is removed. Always returns 0.
 */
static int cluster_init(void) {
  static const char socket_prefix[] = "facil-io-sock-";
  cluster_data_cleanup(0);
  /* create a unique socket name */
  char *tmp_folder = getenv("TMPDIR");
  uint32_t tmp_folder_len = 0;
  if (tmp_folder) {
    tmp_folder_len = (uint32_t)strlen(tmp_folder);
  }
  if (!tmp_folder || tmp_folder_len > 100) {
#ifdef P_tmpdir
    tmp_folder = P_tmpdir;
    if (tmp_folder)
      tmp_folder_len = (uint32_t)strlen(tmp_folder);
#else
    tmp_folder = "/tmp/";
    tmp_folder_len = 5;
#endif
  }
  if (tmp_folder_len >= 100) {
    tmp_folder_len = 0;
  }
  char *const name = cluster_data.name;
  if (tmp_folder_len) {
    memcpy(name, tmp_folder, tmp_folder_len);
    if (name[tmp_folder_len - 1] != '/')
      name[tmp_folder_len++] = '/';
  }
  memcpy(name + tmp_folder_len, socket_prefix, sizeof(socket_prefix) - 1);
  tmp_folder_len += sizeof(socket_prefix) - 1;
  tmp_folder_len += fio_ltoa(name + tmp_folder_len, getpid(), 8);
  name[tmp_folder_len] = 0;
  /* remove if existing */
  unlink(name);
  return 0;
}
/* *****************************************************************************
* Cluster Protocol callbacks
**************************************************************************** */
/*
 * Frame header integers: 32 bit values are stored as 4 bytes.
 *
 * When __BIG_ENDIAN__ is defined the least significant byte is stored first;
 * otherwise the most significant byte is stored first. The reader and the
 * writer of a build always agree with each other.
 */
#ifdef __BIG_ENDIAN__
/* reads 4 bytes, the least significant byte first. */
inline static uint32_t cluster_str2uint32(uint8_t *str) {
  uint32_t value = 0;
  for (int pos = 3; pos >= 0; --pos) {
    value = (value << 8) | (uint32_t)str[pos];
  }
  return value;
}
/* writes 4 bytes, the least significant byte first. */
inline static void cluster_uint2str(uint8_t *dest, uint32_t i) {
  for (int pos = 0; pos < 4; ++pos) {
    dest[pos] = (uint8_t)(i & 0xFF);
    i >>= 8;
  }
}
#else
/* reads 4 bytes, the most significant byte first. */
inline static uint32_t cluster_str2uint32(uint8_t *str) {
  uint32_t value = 0;
  for (int pos = 0; pos < 4; ++pos) {
    value = (value << 8) | (uint32_t)str[pos];
  }
  return value;
}
/* writes 4 bytes, the most significant byte first. */
inline static void cluster_uint2str(uint8_t *dest, uint32_t i) {
  for (int pos = 3; pos >= 0; --pos) {
    dest[pos] = (uint8_t)(i & 0xFF);
    i >>= 8;
  }
}
#endif
/* A reference counted message container.
* NOTE(review): not referenced by the code visible in this part of the file -
* confirm whether it is still in use. */
typedef struct cluster_msg_s {
/* the public message data */
facil_msg_s message;
/* reference counter */
size_t ref;
} cluster_msg_s;
/*
 * Builds a cluster frame as a new String object (owned by the caller):
 *
 *     bytes  0..3   channel length
 *     bytes  4..7   message length
 *     bytes  8..11  message type
 *     bytes 12..15  filter
 *     followed by the channel bytes and then by the message bytes.
 *
 * A NULL `ch_data` / `msg_data` skips the copy of that part; the frame's
 * length still accounts for the declared `ch_len` / `msg_len`.
 */
static inline FIOBJ cluster_wrap_message(uint32_t ch_len, uint32_t msg_len,
                                         uint32_t type, int32_t filter,
                                         uint8_t *ch_data, uint8_t *msg_data) {
  FIOBJ frame = fiobj_str_buf(ch_len + msg_len + 16);
  fio_cstr_s raw = fiobj_obj2cstr(frame);
  uint8_t *header = raw.bytes;
  uint8_t *payload = raw.bytes + 16;
  cluster_uint2str(header, ch_len);
  cluster_uint2str(header + 4, msg_len);
  cluster_uint2str(header + 8, type);
  cluster_uint2str(header + 12, (uint32_t)filter);
  if (ch_len && ch_data) {
    memcpy(payload, ch_data, ch_len);
  }
  if (msg_len && msg_data) {
    memcpy(payload + ch_len, msg_data, msg_len);
  }
  fiobj_str_resize(frame, ch_len + msg_len + 16);
  return frame;
}
/*
 * The `on_shutdown` callback for cluster connections: sends a SHUTDOWN frame
 * using the connection's sender and returns 255.
 */
static uint8_t cluster_on_shutdown(intptr_t uuid, protocol_s *pr_) {
  (void)uuid;
  cluster_pr_s *connection = (cluster_pr_s *)pr_;
  FIOBJ goodbye =
      cluster_wrap_message(0, 0, CLUSTER_MESSAGE_SHUTDOWN, 0, NULL, NULL);
  connection->sender(goodbye);
  return 255;
}
/*
* The `on_data` callback for cluster connections: a streaming frame parser.
*
* Incoming bytes are appended to the connection's buffer. Every frame starts
* with a 16 byte header (channel length, message length, type, filter - 4
* bytes each), followed by the channel bytes and the message bytes.
*
* `exp_channel` / `exp_msg` hold the number of bytes still missing, so a
* frame may span any number of reads. The handler is called once per complete
* frame and the frame's objects are released right after.
*
* Declared lengths of 16Mb (channel) / 64Mb (message) or more terminate the
* process.
*/
static void cluster_on_data(intptr_t uuid, protocol_s *pr_) {
cluster_pr_s *c = (cluster_pr_s *)pr_;
ssize_t i =
sock_read(uuid, c->buffer + c->length, CLUSTER_READ_BUFFER - c->length);
if (i <= 0)
return;
c->length += i;
/* from here on, `i` is the number of buffered bytes already consumed. */
i = 0;
do {
if (!c->exp_channel && !c->exp_msg) {
/* no frame in progress - a header is expected. */
if (c->length - i < 16)
break;
c->exp_channel = cluster_str2uint32(c->buffer + i);
c->exp_msg = cluster_str2uint32(c->buffer + i + 4);
c->type = cluster_str2uint32(c->buffer + i + 8);
c->filter = (int32_t)cluster_str2uint32(c->buffer + i + 12);
if (c->exp_channel) {
if (c->exp_channel >= (1024 * 1024 * 16)) {
fprintf(stderr,
"FATAL ERROR: (%d) cluster message name too long (16Mb "
"limit): %u\n",
getpid(), (unsigned int)c->exp_channel);
exit(1);
return;
}
c->channel = fiobj_str_buf(c->exp_channel);
}
if (c->exp_msg) {
if (c->exp_msg >= (1024 * 1024 * 64)) {
fprintf(stderr,
"FATAL ERROR: (%d) cluster message data too long (64Mb "
"limit): %u\n",
getpid(), (unsigned int)c->exp_msg);
exit(1);
return;
}
c->msg = fiobj_str_buf(c->exp_msg);
}
i += 16;
}
if (c->exp_channel) {
if (c->exp_channel + i > c->length) {
/* partial channel data - consume what's available and wait for more. */
fiobj_str_write(c->channel, (char *)c->buffer + i,
(size_t)(c->length - i));
c->exp_channel -= (c->length - i);
i = c->length;
break;
} else {
fiobj_str_write(c->channel, (char *)c->buffer + i, c->exp_channel);
i += c->exp_channel;
c->exp_channel = 0;
}
}
if (c->exp_msg) {
if (c->exp_msg + i > c->length) {
/* partial message data - consume what's available and wait for more. */
fiobj_str_write(c->msg, (char *)c->buffer + i, (size_t)(c->length - i));
c->exp_msg -= (c->length - i);
i = c->length;
break;
} else {
fiobj_str_write(c->msg, (char *)c->buffer + i, c->exp_msg);
i += c->exp_msg;
c->exp_msg = 0;
}
}
/* the frame is complete - handle it and release its data. */
c->handler(c);
fiobj_free(c->msg);
fiobj_free(c->channel);
c->msg = FIOBJ_INVALID;
c->channel = FIOBJ_INVALID;
} while (c->length > i);
/* keep the unconsumed bytes (an incomplete header) for the next read. */
c->length -= i;
if (c->length) {
memmove(c->buffer, c->buffer + i, c->length);
}
(void)pr_;
}
/* The `ping` callback for cluster connections: sends an empty PING frame. */
static void cluster_ping(intptr_t uuid, protocol_s *pr_) {
  (void)pr_;
  fiobj_send_free(
      uuid, cluster_wrap_message(0, 0, CLUSTER_MESSAGE_PING, 0, NULL, NULL));
}
static void cluster_data_cleanup(int delete_file);
/*
* The `on_close` callback for cluster connections.
*
* In the root process the connection is removed from the clients list. In a
* worker, losing the connection to the root without having received a
* SHUTDOWN frame (while facil.io is still running) is treated as a crash of
* the parent process: the matching core callbacks are forced, the cluster
* data is cleaned up and the worker sends itself a SIGINT.
*
* In both cases any partially received frame and all the subscriptions that
* were created on behalf of the peer are released and the protocol object is
* freed.
*/
static void cluster_on_close(intptr_t uuid, protocol_s *pr_) {
cluster_pr_s *c = (cluster_pr_s *)pr_;
if (facil_parent_pid() == getpid()) {
/* a child was lost, respawning is handled elsewhere. */
spn_lock(&cluster_data.lock);
FIO_LS_FOR(&cluster_data.clients, pos) {
if (pos->obj == (void *)uuid) {
fio_ls_remove(pos);
break;
}
}
spn_unlock(&cluster_data.lock);
} else if (cluster_data.client == uuid) {
/* no shutdown message received - parent crashed. */
if (c->type != CLUSTER_MESSAGE_SHUTDOWN && facil_is_running()) {
if (FACIL_PRINT_STATE) {
fprintf(stderr, "* FATAL ERROR: (%d) Parent Process crash detected!\n",
getpid());
}
facil_core_callback_force(FIO_CALL_ON_PARENT_CRUSH);
cluster_data_cleanup(1);
kill(getpid(), SIGINT);
}
}
fiobj_free(c->msg);
fiobj_free(c->channel);
/* NOTE(review): FIO_HASH_FOR_FREE presumably iterates the map and then frees
* the map's own memory - confirm against fio_hashmap.h. */
FIO_HASH_FOR_FREE(&c->pubsub.channels, pos) {
if (pos->obj) {
subscription_destroy(pos->obj, NULL);
}
}
FIO_HASH_FOR_FREE(&c->patterns.channels, pos) {
if (pos->obj) {
subscription_destroy(pos->obj, NULL);
}
}
fio_free(c);
(void)uuid;
}
/*
 * Allocates and initializes the protocol object for a cluster connection.
 *
 * `handler` is called for every complete incoming frame and `sender` is used
 * for frames the protocol itself emits. Fields that aren't set here keep the
 * value they had when the memory was returned by `fio_mmap`
 * (NOTE(review): presumably zero - confirm against fio_mem.h).
 *
 * Allocation failures terminate the process.
 */
static inline protocol_s *
cluster_alloc(intptr_t uuid, void (*handler)(struct cluster_pr_s *pr),
              void (*sender)(FIOBJ data)) {
  const collection_s empty_collection = {
      .channels = FIO_HASH_INIT,
      .lock = SPN_LOCK_INIT,
  };
  cluster_pr_s *connection = fio_mmap(sizeof(*connection));
  if (!connection) {
    perror("FATAL ERROR: Cluster protocol allocation failed");
    exit(errno);
  }
  connection->protocol = (protocol_s){
      .service = "_facil.io_cluster_",
      .ping = cluster_ping,
      .on_close = cluster_on_close,
      .on_shutdown = cluster_on_shutdown,
      .on_data = cluster_on_data,
  };
  connection->uuid = uuid;
  connection->handler = handler;
  connection->sender = sender;
  connection->pubsub = empty_collection;
  connection->patterns = empty_collection;
  return &connection->protocol;
}
/* *****************************************************************************
* Master (server) IPC Connections
**************************************************************************** */
/**
 * A mock pub/sub callback for external subscriptions.
 *
 * Subscriptions created on behalf of a worker only exist to keep the channel
 * alive in the root process, so the message itself is ignored.
 */
static void mock_on_message(facil_msg_s *msg) {
  (void)msg; /* intentionally empty */
}
/*
 * Sends a frame to every connected worker (root process).
 *
 * Takes ownership of `data`: every client gets its own reference and the
 * caller's reference is released before returning.
 */
static void cluster_server_sender(FIOBJ data) {
  spn_lock(&cluster_data.lock);
  FIO_LS_FOR(&cluster_data.clients, node) {
    const intptr_t client = (intptr_t)node->obj;
    if (client > 0) {
      fiobj_send_free(client, fiobj_dup(data));
    }
  }
  spn_unlock(&cluster_data.lock);
  fiobj_free(data);
}
static void cluster_server_handler(struct cluster_pr_s *pr) {
/* what to do? */
switch ((cluster_message_type_e)pr->type) {
case CLUSTER_MESSAGE_FORWARD: /* fallthrough */
case CLUSTER_MESSAGE_JSON: {
fio_cstr_s cs = fiobj_obj2cstr(pr->channel);
fio_cstr_s ms = fiobj_obj2cstr(pr->msg);
cluster_server_sender(cluster_wrap_message(cs.len, ms.len, pr->type,
pr->filter, cs.bytes, ms.bytes));
publish2process(pr->filter, pr->channel, pr->msg,
(cluster_message_type_e)pr->type);
break;
}
case CLUSTER_MESSAGE_PUBSUB_SUB: {
subscription_s *s = subscription_create((subscribe_args_s){
.on_message = mock_on_message, .match = NULL, .channel = pr->channel});
spn_lock(&pr->pubsub.lock);
s = fio_hash_insert(&pr->pubsub.channels, pr->channel, s);
spn_unlock(&pr->pubsub.lock);
if (s) {
subscription_destroy(s, NULL);
}
break;
}
case CLUSTER_MESSAGE_PUBSUB_UNSUB: {
spn_lock(&pr->pubsub.lock);
subscription_s *s =
fio_hash_insert(&pr->pubsub.channels, pr->channel, NULL);
spn_unlock(&pr->pubsub.lock);
if (s) {
subscription_destroy(s, NULL);
}
break;
}
case CLUSTER_MESSAGE_PATTERN_SUB: {
void *match;
fio_cstr_s m = fiobj_obj2cstr(pr->msg);
for (size_t i = 0; i < sizeof(void *); ++i) {
((uint8_t *)(&match))[i] = m.bytes[i];
}
subscription_s *s = subscription_create((subscribe_args_s){
.on_message = mock_on_message, .match = NULL, .channel = pr->channel});
spn_lock(&pr->patterns.lock);
s = fio_hash_insert(&pr->patterns.channels, pr->channel, s);
spn_unlock(&pr->patterns.lock);
if (s) {
subscription_destroy(s, NULL);
}
break;
}
case CLUSTER_MESSAGE_PATTERN_UNSUB: {
spn_lock(&pr->pubsub.lock);
subscription_s *s =
fio_hash_insert(&pr->patterns.channels, pr->channel, NULL);
spn_unlock(&pr->pubsub.lock);
if (s) {
subscription_destroy(s, NULL);
}
break;
}
case CLUSTER_MESSAGE_ROOT_JSON:
pr->type = CLUSTER_MESSAGE_JSON; /* fallthrough */
case CLUSTER_MESSAGE_ROOT:
publish2process(pr->filter, pr->channel, pr->msg,
(cluster_message_type_e)pr->type);
break;
case CLUSTER_MESSAGE_SHUTDOWN: /* fallthrough */
case CLUSTER_MESSAGE_ERROR: /* fallthrough */
case CLUSTER_MESSAGE_PING: /* fallthrough */
default:
break;
}
}
/**
 * Called when a new client is available.
 *
 * Accepts every pending connection (until `sock_accept` returns -1),
 * attaches a cluster protocol object to each and records its uuid in the
 * clients list. A failure to attach a connection terminates the process.
 */
static void cluster_listen_accept(intptr_t uuid, protocol_s *protocol) {
  (void)protocol;
  /* prevent `accept` backlog in parent */
  for (intptr_t client = sock_accept(uuid); client != -1;
       client = sock_accept(uuid)) {
    protocol_s *connection =
        cluster_alloc(client, cluster_server_handler, cluster_server_sender);
    if (facil_attach(client, connection)) {
      perror("FATAL ERROR: (facil.io) failed to attach cluster client");
      exit(errno);
    }
    spn_lock(&cluster_data.lock);
    fio_ls_push(&cluster_data.clients, (void *)client);
    spn_unlock(&cluster_data.lock);
  }
}
/**
 * Called when the listening socket was closed (will not run concurrently).
 *
 * Frees the listener's protocol object and marks the listener as closed. In
 * the root process a SIGINT is then sent to the whole process group.
 */
static void cluster_listen_on_close(intptr_t uuid, protocol_s *protocol) {
  (void)uuid;
  free(protocol);
  cluster_data.listener = -1;
  if (facil_parent_pid() != getpid()) {
    return;
  }
#if DEBUG
  fprintf(stderr, "* INFO: (%d) stopped listening for cluster connections\n",
          getpid());
#endif
  kill(0, SIGINT);
}
/**
 * Called when the listening socket's timeout was reached: the socket is
 * only "touched", so it isn't considered to have timed out.
 */
static void cluster_listen_ping(intptr_t uuid, protocol_s *protocol) {
  (void)protocol;
  sock_touch(uuid);
}
/* The `on_shutdown` callback for the listening socket: always returns 255. */
static uint8_t cluster_listen_on_shutdown(intptr_t uuid, protocol_s *pr_) {
  (void)uuid;
  (void)pr_;
  return 255;
}
/*
 * Opens the cluster's listening socket and attaches it to facil.io.
 *
 * The socket name is (re)computed by cluster_init while the cluster lock is
 * held. Any failure (opening the socket, allocating or attaching the
 * protocol object) terminates the process.
 */
static void facil_listen2cluster(void *ignore) {
  (void)ignore;
  /* this is called for each `fork`, but we only need this to run once. */
  spn_lock(&cluster_data.lock);
  cluster_init();
  cluster_data.listener = sock_listen(cluster_data.name, NULL);
  spn_unlock(&cluster_data.lock);
  if (cluster_data.listener < 0) {
    perror("FATAL ERROR: (facil.io cluster) failed to open cluster socket.\n"
           " check file permissions");
    exit(errno);
  }
  protocol_s *listener_protocol = malloc(sizeof(*listener_protocol));
  if (listener_protocol == NULL) {
    perror("FATAL ERROR: (facil.io) couldn't allocate cluster server");
    exit(errno);
  }
  *listener_protocol = (protocol_s){
      .service = "_facil.io_listen4cluster_",
      .on_data = cluster_listen_accept,
      .on_shutdown = cluster_listen_on_shutdown,
      .ping = cluster_listen_ping,
      .on_close = cluster_listen_on_close,
  };
  if (facil_attach(cluster_data.listener, listener_protocol)) {
    perror(
        "FATAL ERROR: (facil.io) couldn't attach cluster server to facil.io");
    exit(errno);
  }
#if DEBUG
  fprintf(stderr, "* INFO: (%d) Listening to cluster: %s\n", getpid(),
          cluster_data.name);
#endif
}
/*
 * Cleans up the cluster data; only the root process removes the socket file.
 */
static void facil_cluster_cleanup(void *ignore) {
  (void)ignore;
  const int is_root = (facil_parent_pid() == getpid());
  cluster_data_cleanup(is_root);
}
/* *****************************************************************************
* Worker (client) IPC connections
****************************************************************************
*/
static void cluster_client_handler(struct cluster_pr_s *pr) {
/* what to do? */
switch ((cluster_message_type_e)pr->type) {
case CLUSTER_MESSAGE_FORWARD: /* fallthrough */
case CLUSTER_MESSAGE_JSON:
publish2process(pr->filter, pr->channel, pr->msg,
(cluster_message_type_e)pr->type);
break;
case CLUSTER_MESSAGE_SHUTDOWN:
kill(getpid(), SIGINT);
case CLUSTER_MESSAGE_ERROR: /* fallthrough */
case CLUSTER_MESSAGE_PING: /* fallthrough */
case CLUSTER_MESSAGE_ROOT: /* fallthrough */
case CLUSTER_MESSAGE_ROOT_JSON: /* fallthrough */
case CLUSTER_MESSAGE_PUBSUB_SUB: /* fallthrough */
case CLUSTER_MESSAGE_PUBSUB_UNSUB: /* fallthrough */
case CLUSTER_MESSAGE_PATTERN_SUB: /* fallthrough */
case CLUSTER_MESSAGE_PATTERN_UNSUB: /* fallthrough */
default:
break;
}
}
/* Sends a frame to the root process (worker side), taking ownership of
 * `data`. */
static void cluster_client_sender(FIOBJ data) {
  const intptr_t root_connection = cluster_data.client;
  fiobj_send_free(root_connection, data);
}
/** The address of the server we are connecting to. */
// char *address;
/** The port on the server we are connecting to. */
// char *port;
/**
* The `on_connect` callback should return a pointer to a protocol object
* that will handle any connection related events.
*
* Should either call `facil_attach` or close the connection.
*/
void facil_cluster_on_connect(intptr_t uuid, void *udata) {
cluster_data.client = uuid;
if (facil_attach(uuid, cluster_alloc(uuid, cluster_client_handler,
cluster_client_sender))) {
perror("FATAL ERROR: (facil.io) failed to attach cluster connection");
kill(facil_parent_pid(), SIGINT);
exit(errno);
}
/* inform root about all existing channels */
spn_lock(&postoffice.pubsub.lock);
FIO_HASH_FOR_LOOP(&postoffice.pubsub.channels, pos) {
if (!pos->obj) {
continue;
}
inform_root_about_channel(((channel_s *)pos->obj)->id, NULL, 1);
}
spn_unlock(&postoffice.pubsub.lock);
spn_lock(&postoffice.patterns.lock);
FIO_HASH_FOR_LOOP(&postoffice.patterns.channels, pos) {
if (!pos->obj) {
continue;
}
inform_root_about_channel(((pattern_s *)pos->obj)->ch.id,
((pattern_s *)pos->obj)->match, 1);
}
spn_unlock(&postoffice.patterns.lock);
(void)udata;
}
/**
 * Called when a worker fails to connect to the root process (the old socket
 * UUID is passed along).
 *
 * This is fatal: the error is reported, the root process is sent a SIGINT
 * and the worker exits with a non-zero status.
 */
void facil_cluster_on_fail(intptr_t uuid, void *udata) {
  (void)uuid;
  (void)udata;
  perror("FATAL ERROR: (facil.io) unknown cluster connection error");
  kill(facil_parent_pid(), SIGINT);
  if (errno) {
    exit(errno);
  }
  exit(1);
}
/** Opaque user data. */
// void *udata;
/** A non-system timeout after which connection is assumed to have failed. */
// uint8_t timeout;
/*
 * Connects a worker to the root process' cluster socket (the root process
 * itself doesn't connect) and then calls the `on_startup` callback of every
 * registered engine that has one.
 */
static void facil_connect2cluster(void *ignore) {
  (void)ignore;
  const int is_worker = (facil_parent_pid() != getpid());
  if (is_worker) {
    /* this is called for each child. */
    cluster_data.client =
        facil_connect(.address = cluster_data.name, .port = NULL,
                      .on_connect = facil_cluster_on_connect,
                      .on_fail = facil_cluster_on_fail);
  }
  spn_lock(&postoffice.engines.lock);
  FIO_HASH_FOR_LOOP(&postoffice.engines.channels, pos) {
    if (pos->obj) {
      pubsub_engine_s *engine = pos->obj;
      if (engine->on_startup)
        engine->on_startup(pos->obj);
    }
  }
  spn_unlock(&postoffice.engines.lock);
}
/*
 * Wraps a message in a cluster frame and sends it to the other processes.
 *
 * A process that has a connection to the root (a worker) sends the frame to
 * the root; otherwise the frame is sent to every connected worker. Nothing
 * is sent (and an error is printed) when facil.io isn't running.
 */
static void facil_send2cluster(int32_t filter, FIOBJ ch, FIOBJ msg,
                               cluster_message_type_e type) {
  if (!facil_is_running()) {
    fprintf(stderr, "ERROR: cluster inactive, can't send message.\n");
    return;
  }
  fio_cstr_s cs = fiobj_obj2cstr(ch);
  fio_cstr_s ms = fiobj_obj2cstr(msg);
  void (*send)(FIOBJ data) =
      (cluster_data.client > 0) ? cluster_client_sender : cluster_server_sender;
  send(cluster_wrap_message(cs.len, ms.len, type, filter, cs.bytes, ms.bytes));
}
/* *****************************************************************************
* Propagation
****************************************************************************
*/
/*
 * Informs the root process that a channel was created (`add` != 0) or
 * destroyed (`add` == 0) in this process.
 *
 * For pattern channels (`match` != NULL) the frame's payload carries the raw
 * bytes of the match function pointer. Nothing is sent when there's no
 * client connection (`cluster_data.client` is 0) or no channel.
 */
static inline void inform_root_about_channel(FIOBJ ch, facil_match_fn match,
                                             int add) {
  if (!cluster_data.client || !ch)
    return;
  fio_cstr_s ch_str = fiobj_obj2cstr(ch);
  uint32_t type;
  uint32_t payload_len;
  uint8_t *payload;
  if (match) {
    type = (add ? CLUSTER_MESSAGE_PATTERN_SUB : CLUSTER_MESSAGE_PATTERN_UNSUB);
    payload_len = sizeof(facil_match_fn);
    payload = (uint8_t *)&match;
  } else {
    type = (add ? CLUSTER_MESSAGE_PUBSUB_SUB : CLUSTER_MESSAGE_PUBSUB_UNSUB);
    payload_len = 0;
    payload = NULL;
  }
  cluster_client_sender(cluster_wrap_message(ch_str.length, payload_len, type,
                                             0, ch_str.bytes, payload));
}
/* *****************************************************************************
* Initialization
****************************************************************************
*/
/**
 * Post-fork hook. Only the parent process acts here: it calls
 * `cluster_listen_accept` on the cluster listener. Child processes return
 * without doing anything.
 */
static void facil_connect_after_fork(void *ignore) {
  (void)ignore;
  if (facil_parent_pid() != getpid()) {
    /* this is called for each child - nothing to do. */
    return;
  }
  /* prevent `accept` backlog in parent */
  cluster_listen_accept(cluster_data.listener, NULL);
}
static void facil_cluster_in_child(void *ignore) {
postoffice.patterns.lock = SPN_LOCK_INIT;
postoffice.pubsub.lock = SPN_LOCK_INIT;
postoffice.filters.lock = SPN_LOCK_INIT;
postoffice.engines.lock = SPN_LOCK_INIT;
postoffice.meta.lock = SPN_LOCK_INIT;
fio_hash_compact(&postoffice.patterns.channels);
fio_hash_compact(&postoffice.pubsub.channels);
fio_hash_compact(&postoffice.filters.channels);
FIO_HASH_FOR_LOOP(&postoffice.patterns.channels, pos) {
if (!pos->obj) {
continue;
}
((channel_s *)pos->obj)->lock = SPN_LOCK_INIT;
}
FIO_HASH_FOR_LOOP(&postoffice.pubsub.channels, pos) {
if (!pos->obj) {
continue;
}
((channel_s *)pos->obj)->lock = SPN_LOCK_INIT;
}
FIO_HASH_FOR_LOOP(&postoffice.filters.channels, pos) {
if (!pos->obj) {
continue;
}
((channel_s *)pos->obj)->lock = SPN_LOCK_INIT;
}
(void)ignore;
}
static void facil_cluster_at_exit(void *ignore) {
/* unlock all */
facil_cluster_in_child(NULL);
/* clear subscriptions of all types */
while (fio_hash_count(&postoffice.patterns.channels)) {
channel_s *ch = fio_hash_last(&postoffice.patterns.channels, NULL);
while (fio_ls_embd_any(&ch->subscriptions)) {
subscription_s *sub =
FIO_LS_EMBD_OBJ(subscription_s, node, ch->subscriptions.next);
facil_unsubscribe(sub);
}
}
while (fio_hash_count(&postoffice.pubsub.channels)) {
channel_s *ch = fio_hash_last(&postoffice.pubsub.channels, NULL);
while (fio_ls_embd_any(&ch->subscriptions)) {
subscription_s *sub =
FIO_LS_EMBD_OBJ(subscription_s, node, ch->subscriptions.next);
facil_unsubscribe(sub);
}
}
while (fio_hash_count(&postoffice.filters.channels)) {
channel_s *ch = fio_hash_last(&postoffice.filters.channels, NULL);
while (fio_ls_embd_any(&ch->subscriptions)) {
subscription_s *sub =
FIO_LS_EMBD_OBJ(subscription_s, node, ch->subscriptions.next);
facil_unsubscribe(sub);
}
}
FIO_HASH_FOR_FREE(&postoffice.patterns.channels, pos) { (void)pos; }
FIO_HASH_FOR_FREE(&postoffice.pubsub.channels, pos) { (void)pos; }
FIO_HASH_FOR_FREE(&postoffice.filters.channels, pos) { (void)pos; }
/* clear engines */
FACIL_PUBSUB_DEFAULT = FACIL_PUBSUB_CLUSTER;
while (fio_hash_count(&postoffice.engines.channels)) {
facil_pubsub_detach(fio_hash_last(&postoffice.engines.channels, NULL));
}
FIO_HASH_FOR_FREE(&postoffice.engines.channels, pos) { (void)pos; }
/* clear meta hooks */
fio_ary_free(&postoffice.meta.ary);
/* perform newly created tasks */
defer_perform();
(void)ignore;
}
/**
 * Registers the cluster layer's lifecycle callbacks with the facil.io core.
 *
 * Marked with the `constructor` attribute, so the registration happens
 * automatically when the program (or library) is loaded, without an explicit
 * call.
 *
 * NOTE(review): the exact moment each `FIO_CALL_*` event fires is defined by
 * the facil.io core, outside this file; the event names suggest the order
 * below follows the process lifecycle.
 */
void __attribute__((constructor)) facil_cluster_initialize(void) {
/* open the cluster listening socket */
facil_core_callback_add(FIO_CALL_PRE_START, facil_listen2cluster, NULL);
/* in the parent: accept pending cluster connections */
facil_core_callback_add(FIO_CALL_AFTER_FORK, facil_connect_after_fork, NULL);
/* reset all postoffice locks */
facil_core_callback_add(FIO_CALL_IN_CHILD, facil_cluster_in_child, NULL);
/* in children: connect to the cluster; everywhere: start the engines */
facil_core_callback_add(FIO_CALL_ON_START, facil_connect2cluster, NULL);
facil_core_callback_add(FIO_CALL_ON_FINISH, facil_cluster_cleanup, NULL);
/* release subscriptions, engines and metadata hooks */
facil_core_callback_add(FIO_CALL_AT_EXIT, facil_cluster_at_exit, NULL);
}
/* *****************************************************************************
* External API
****************************************************************************
*/
/** Signals children (or self) to shutdown) - NOT signal safe. */
void facil_cluster_signal_children(void) {
if (facil_parent_pid() != getpid()) {
kill(getpid(), SIGINT);
return;
}
cluster_server_sender(
cluster_wrap_message(0, 0, CLUSTER_MESSAGE_SHUTDOWN, 0, NULL, NULL));
}
/**
 * Subscribes to either a filter OR a channel (never both).
 *
 * Returns a subscription pointer on success or NULL on failure.
 *
 * The arguments are passed, unchanged, to `subscription_create`. See
 * `subscribe_args_s` for details.
 */
subscription_s *facil_subscribe(subscribe_args_s args) {
  subscription_s *sub = subscription_create(args);
  return sub;
}
/**
 * Subscribes to a channel (enforces filter == 0).
 *
 * Returns a subscription pointer on success or NULL on failure.
 *
 * Any `filter` value supplied by the caller is overwritten with 0 before the
 * subscription is created. See `subscribe_args_s` for details.
 */
subscription_s *facil_subscribe_pubsub(subscribe_args_s args) {
  subscribe_args_s pubsub_args = args;
  pubsub_args.filter = 0;
  return subscription_create(pubsub_args);
}
/**
 * This helper returns a temporary handle to an existing subscription's
 * channel or filter (the `id` of the subscription's parent).
 *
 * No reference is added for the caller. To keep the handle beyond the
 * lifetime of the subscription, use `fiobj_dup`.
 *
 * The `subscription` argument must not be NULL.
 */
FIOBJ facil_subscription_channel(subscription_s *subscription) {
  FIOBJ id = subscription->parent->id;
  return id;
}
/**
 * Cancels an existing subscription (actual effects might be delayed).
 *
 * Forwards the subscription to `subscription_destroy`, passing NULL as its
 * second argument.
 */
void facil_unsubscribe(subscription_s *subscription) {
subscription_destroy(subscription, NULL);
}
/**
 * Publishes a message to the relevant subscribers (if any).
 *
 * See `facil_publish_args_s` for details.
 *
 * When no engine is specified, FACIL_PUBSUB_DEFAULT is used. Unless it was
 * changed, the message is sent using the FACIL_PUBSUB_CLUSTER engine (all
 * processes, including the calling process).
 *
 * To limit the message only to other processes (exclude the calling process),
 * use the FACIL_PUBSUB_SIBLINGS engine.
 *
 * To limit the message only to the calling process, use the
 * FACIL_PUBSUB_PROCESS engine.
 *
 * To publish messages to the pub/sub layer, the `.filter` argument MUST be
 * equal to 0 or missing. External engines refuse filtered messages: an error
 * is printed and the message is dropped.
 */
void facil_publish(facil_publish_args_s args) {
  if (!args.engine) {
    args.engine = FACIL_PUBSUB_DEFAULT;
  }
  /* the built-in engines are identified by small integer pointer values */
  const uintptr_t engine_id = (uintptr_t)args.engine;
  if (engine_id <= 1UL) {
    /* 0: missing default, 1: FACIL_PUBSUB_CLUSTER */
    publish_msg2all(args.filter, args.channel, args.message);
  } else if (engine_id == 2UL) {
    /* FACIL_PUBSUB_PROCESS */
    publish_msg2local(args.filter, args.channel, args.message);
  } else if (engine_id == 3UL) {
    /* FACIL_PUBSUB_SIBLINGS */
    publish_msg2cluster(args.filter, args.channel, args.message);
  } else if (engine_id == 4UL) {
    /* FACIL_PUBSUB_ROOT */
    publish_msg2root(args.filter, args.channel, args.message);
  } else if (args.filter != 0) {
    /* an external engine was requested for a filtered message */
    fprintf(stderr, "ERROR: (pub/sub) pub/sub engines can only be used for "
                    "pub/sub messages (no filter).\n");
  } else {
    /* an external engine */
    args.engine->publish(args.engine, args.channel, args.message);
  }
}
/**
 * Defers the current callback, so it will be called again for the message.
 *
 * Forwards the message to `defer_subscription_callback`.
 */
void facil_message_defer(facil_msg_s *msg) { defer_subscription_callback(msg); }
/* *****************************************************************************
* MetaData (extension) API
****************************************************************************
*/
/**
 * It's possible to attach metadata to facil.io pub/sub messages (filter == 0)
 * before they are published.
 *
 * This allows, for example, messages to be encoded as network packets for
 * outgoing protocols (i.e., encoding for WebSocket transmissions), improving
 * performance in large network based broadcasting.
 *
 * The callback returns a `facil_msg_metadata_s` object.
 *
 * Since the cluster messaging system serializes objects to JSON (unless both
 * the channel and the data are String objects), the pre-serialized data is
 * available to the callback as the `raw_ch` and `raw_msg` arguments.
 *
 * To register a callback, pass a non-zero `enable`. To remove a callback, pass
 * `enable` as 0.
 *
 * The callback is always removed from the list (using `fio_ary_remove2`)
 * before being appended again, so enabling it a second time moves it to the
 * end of the list instead of adding a duplicate entry.
 */
void facil_message_metadata_set(
    facil_msg_metadata_s (*callback)(facil_msg_s *msg, FIOBJ raw_ch,
                                     FIOBJ raw_msg),
    int enable) {
  /* the array stores plain pointers, so the function pointer is converted */
  void *entry = (void *)(uintptr_t)callback;
  spn_lock(&postoffice.meta.lock);
  fio_ary_remove2(&postoffice.meta.ary, entry);
  if (enable) {
    fio_ary_push(&postoffice.meta.ary, entry);
  }
  spn_unlock(&postoffice.meta.lock);
}
/* *****************************************************************************
* Pub/Sub Engine (extension) API
****************************************************************************
*/
/**
 * Attaches an engine, so its callbacks can be called by facil.io.
 *
 * The engine is stored in the engine collection, keyed by its address. If the
 * engine has a `subscribe` callback, it is then called once for every existing
 * pub/sub channel (with a NULL match function) and once for every existing
 * pattern (with the pattern's match function). The callbacks are invoked while
 * the matching collection lock is held.
 *
 * A NULL engine is ignored.
 */
void facil_pubsub_attach(pubsub_engine_s *engine) {
  if (!engine) {
    return;
  }
  /* register the engine, keyed by its address */
  FIOBJ key = fiobj_num_new((intptr_t)engine);
  spn_lock(&postoffice.engines.lock);
  fio_hash_insert(&postoffice.engines.channels, key, engine);
  spn_unlock(&postoffice.engines.lock);
  fiobj_free(key);
  if (!engine->subscribe) {
    return;
  }
  /* inform the engine about existing channels */
  spn_lock(&postoffice.pubsub.lock);
  FIO_HASH_FOR_LOOP(&postoffice.pubsub.channels, slot) {
    channel_s *ch = slot->obj;
    if (ch) {
      engine->subscribe(engine, ch->id, NULL);
    }
  }
  spn_unlock(&postoffice.pubsub.lock);
  /* inform the engine about existing patterns */
  spn_lock(&postoffice.patterns.lock);
  FIO_HASH_FOR_LOOP(&postoffice.patterns.channels, slot) {
    pattern_s *pat = slot->obj;
    if (pat) {
      engine->subscribe(engine, pat->ch.id, pat->match);
    }
  }
  spn_unlock(&postoffice.patterns.lock);
}
/**
 * Detaches an engine, so it could be safely destroyed.
 *
 * If the engine is the current default engine, the default is reset to
 * FACIL_PUBSUB_CLUSTER. The engine's entry in the engine collection is then
 * cleared and the collection is compacted.
 *
 * In DEBUG builds a warning is printed when the engine wasn't registered.
 */
void facil_pubsub_detach(pubsub_engine_s *engine) {
  if (engine == FACIL_PUBSUB_DEFAULT) {
    FACIL_PUBSUB_DEFAULT = FACIL_PUBSUB_CLUSTER;
  }
  if (!postoffice.engines.channels.count) {
    /* no engines are registered, nothing to remove */
    return;
  }
  FIOBJ key = fiobj_num_new((intptr_t)engine);
  spn_lock(&postoffice.engines.lock);
  /* storing NULL clears the entry; the previous value is returned */
  void *previous = fio_hash_insert(&postoffice.engines.channels, key, NULL);
  fio_hash_compact(&postoffice.engines.channels);
  spn_unlock(&postoffice.engines.lock);
  fiobj_free(key);
#if DEBUG
  if (!previous) {
    fprintf(stderr, "WARNING: (pubsub) detachment error, not registered?\n");
  }
#else
  (void)previous;
#endif
}
/**
 * Engines can ask facil.io to call the `subscribe` callback for all active
 * channels.
 *
 * This allows engines that lost their connection to their Pub/Sub service to
 * resubscribe all the currently active channels with the new connection.
 *
 * The `subscribe` callback is called once for every pub/sub channel (with a
 * NULL match function) and once for every pattern (with the pattern's match
 * function). The callbacks are invoked before this function returns, while
 * the matching collection lock is held.
 *
 * A NULL engine, or an engine without a `subscribe` callback, is ignored
 * (matching the NULL handling of `facil_pubsub_attach` and
 * `facil_pubsub_is_attached`).
 *
 * CAUTION: the engine's memory must remain valid while resubscriptions are
 * under way.
 */
void facil_pubsub_reattach(pubsub_engine_s *engine) {
  if (!engine || !engine->subscribe) {
    return;
  }
  /* plain channels */
  spn_lock(&postoffice.pubsub.lock);
  FIO_HASH_FOR_LOOP(&postoffice.pubsub.channels, i) {
    if (!i->obj) {
      continue;
    }
    channel_s *ch = i->obj;
    engine->subscribe(engine, ch->id, NULL);
  }
  spn_unlock(&postoffice.pubsub.lock);
  /* patterns - a pattern_s starts with its channel_s data */
  spn_lock(&postoffice.patterns.lock);
  FIO_HASH_FOR_LOOP(&postoffice.patterns.channels, i) {
    if (!i->obj) {
      continue;
    }
    channel_s *ch = i->obj;
    engine->subscribe(engine, ch->id, ((pattern_s *)ch)->match);
  }
  spn_unlock(&postoffice.patterns.lock);
}
/**
 * Returns true (1) if the engine is attached to the system, otherwise 0.
 *
 * The engine is looked up in the engine collection by its address. A NULL
 * engine is never considered attached.
 */
int facil_pubsub_is_attached(pubsub_engine_s *engine) {
  int attached = 0;
  if (engine) {
    FIOBJ key = fiobj_num_new((intptr_t)engine);
    spn_lock(&postoffice.engines.lock);
    if (fio_hash_find(&postoffice.engines.channels, key) != NULL) {
      attached = 1;
    }
    spn_unlock(&postoffice.engines.lock);
    fiobj_free(key);
  }
  return attached;
}
/* *****************************************************************************
* Glob Matching
****************************************************************************
*/
/**
 * A binary glob matching helper. Returns 1 on match, otherwise returns 0.
 *
 * Supported pattern tokens:
 *
 * - `?` matches any single byte.
 * - `*` matches any run of bytes, including an empty one.
 * - `[...]` matches a single byte against a class made of single bytes and
 *   `a-b` ranges. A leading `^` inverts the class and the first member of the
 *   class may be `]`. A class with no closing `]` is treated as a literal `[`.
 * - `\` makes the following pattern byte a literal. A trailing `\` (with
 *   nothing left to escape) never matches.
 *
 * Both the pattern and the channel are handled as length-delimited binary
 * data: no byte beyond the length reported for either of them is read, and
 * NUL bytes have no special meaning.
 */
static int facil_glob_match(FIOBJ pattern, FIOBJ channel) {
  /* adapted and rewritten, with thankfulness, from the code at:
   * https://github.com/opnfv/kvmfornfv/blob/master/kernel/lib/glob.c
   *
   * Original version's copyright:
   * Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
   * Under the MIT license.
   */
  fio_cstr_s ch = fiobj_obj2cstr(channel);
  fio_cstr_s pat = fiobj_obj2cstr(pattern);
  /*
   * Backtrack to previous * on mismatch and retry starting one
   * character later in the string. Because * matches all characters,
   * there's never a need to backtrack multiple levels.
   */
  uint8_t *back_pat = NULL, *back_str = ch.bytes;
  size_t back_pat_len = 0, back_str_len = ch.len;
  /*
   * Loop over each token (character or class) in pat, matching
   * it against the remaining unmatched tail of the channel name.
   */
  while (ch.len) {
    const uint8_t c = *ch.bytes;
    int matched = 0;
    /* an exhausted pattern can't match the remaining data (matched == 0) */
    if (pat.len) {
      uint8_t d = *pat.bytes++;
      --pat.len;
      switch (d) {
      case '?': /* Wildcard: anything goes */
        matched = 1;
        break;
      case '*':       /* Any-length wildcard */
        if (!pat.len) /* Optimize trailing * case */
          return 1;
        back_pat = pat.bytes;
        back_pat_len = pat.len;
        back_str = ch.bytes; /* Allow zero-length match */
        back_str_len = ch.len;
        continue; /* the current byte wasn't consumed */
      case '[': { /* Character class */
        const uint8_t *end = pat.bytes + pat.len;
        uint8_t *cls = pat.bytes;
        const uint8_t inverted = (cls < end && *cls == '^');
        uint8_t match = 0;
        int closed = 0;
        cls += inverted;
        if (cls < end) {
          /*
           * Iterate over each span in the character class.
           * A span is either a single character a, or a
           * range a-b. The first span may begin with ']'.
           */
          uint8_t a = *cls++;
          for (;;) {
            uint8_t lo = a, hi = a;
            if (end - cls >= 2 && cls[0] == '-' && cls[1] != ']') {
              hi = cls[1];
              cls += 2;
              if (lo > hi) { /* reversed range, i.e. [z-a] */
                const uint8_t tmp = lo;
                lo = hi;
                hi = tmp;
              }
            }
            match |= (lo <= c && c <= hi);
            if (cls >= end)
              break; /* the pattern ended before the class was closed */
            a = *cls++;
            if (a == ']') {
              closed = 1;
              break;
            }
          }
        }
        if (!closed) {
          /* Malformed class: the '[' is a literal character */
          matched = (c == '[');
          break;
        }
        if (match == inverted)
          break; /* mismatch */
        pat.len -= (size_t)(cls - pat.bytes);
        pat.bytes = cls;
        matched = 1;
      } break;
      case '\\':
        if (!pat.len)
          break; /* Dangling escape: nothing to match against */
        d = *pat.bytes++;
        --pat.len;
        /* FALLTHROUGH */
      default: /* Literal character */
        matched = (c == d);
        break;
      }
    }
    if (matched) {
      ++ch.bytes;
      --ch.len;
      continue;
    }
    if (!back_pat)
      return 0; /* No point continuing */
    /* Try again from last *, one character later in the channel name. */
    pat.bytes = back_pat;
    pat.len = back_pat_len;
    ch.bytes = ++back_str;
    ch.len = --back_str_len;
  }
  /* Any `*` left in the pattern matches the (empty) remainder. */
  while (pat.len && *pat.bytes == '*') {
    ++pat.bytes;
    --pat.len;
  }
  return !pat.len;
}
/**
 * A `facil_match_fn` handle to the glob matcher implemented by
 * `facil_glob_match`.
 */
facil_match_fn FACIL_MATCH_GLOB = facil_glob_match;