blob: 3a04922485714f884d3708750cb29c5b5a89663a [file] [log] [blame] [raw]
/*
Copyright: Boaz segev, 2017
License: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#include "spnlock.inc"
#include "facil.h"
#include "fio_dict.h"
#include "fio_hash_table.h"
#include "pubsub.h"
#include <errno.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* *****************************************************************************
Data Structures / State
***************************************************************************** */
typedef struct {
volatile uint64_t ref;
struct pubsub_publish_args pub;
} msg_s;
typedef struct {
fio_ht_s clients;
const pubsub_engine_s *engine;
struct {
char *name;
uint32_t len;
};
union {
fio_dict_s dict;
fio_list_s list;
} channels;
unsigned use_pattern : 1;
} channel_s;
typedef struct pubsub_sub_s {
fio_ht_node_s clients;
void (*on_message)(pubsub_message_s *msg);
void (*on_unsubscribe)(void *udata1, void *udata2);
void *udata1;
void *udata2;
channel_s *parent;
volatile uint64_t active;
uint32_t ref;
} client_s;
typedef struct {
msg_s *origin;
pubsub_message_s msg;
} msg_container_s;
static fio_dict_s pubsub_channels = FIO_DICT_INIT_STATIC;
static fio_list_s pubsub_patterns = FIO_LIST_INIT_STATIC(pubsub_patterns);
spn_lock_i pubsub_GIL = SPN_LOCK_INIT;
/* *****************************************************************************
Helpers
***************************************************************************** */
static void pubsub_free_client(void *client_, void *ignr) {
client_s *client = client_;
if (spn_sub(&client->active, 1))
return;
if (client->on_unsubscribe)
client->on_unsubscribe(client->udata1, client->udata2);
free(client);
(void)ignr;
}
static void pubsub_free_msg(void *msg_, void *ignr) {
msg_s *msg = msg_;
if (!spn_sub(&msg->ref, 1)) {
free(msg);
}
(void)ignr;
}
static void pubsub_deliver_msg(void *client_, void *msg_) {
client_s *cl = client_;
msg_s *msg = msg_;
msg_container_s clmsg = {
.origin = msg,
.msg =
{
.engine = msg->pub.engine,
.channel.name = msg->pub.channel.name,
.channel.len = msg->pub.channel.len,
.msg.data = msg->pub.msg.data,
.msg.len = msg->pub.msg.len,
.use_pattern = msg->pub.use_pattern,
.udata1 = cl->udata1,
.udata2 = cl->udata2,
.subscription = cl,
},
};
cl->on_message(&clmsg.msg);
pubsub_free_msg(msg, NULL);
pubsub_free_client(cl, NULL);
}
/* *****************************************************************************
Default Engine / Cluster Support
***************************************************************************** */
/** Used internally to identify oub/sub cluster messages. */
#define FACIL_PUBSUB_CLUSTER_MESSAGE_ID ((int32_t)-1)
/*
a `ppublish` / `publish` found this channel as a match...
Pubblish to clients and test against subscribed patterns.
*/
static void pubsub_publish_matched_channel(fio_dict_s *ch_, void *msg_) {
msg_s *msg = msg_;
client_s *cl;
if (ch_) {
channel_s *channel = fio_node2obj(channel_s, channels, ch_);
fio_ht_for_each(client_s, clients, cl, channel->clients) {
spn_add(&msg->ref, 1);
spn_add(&cl->active, 1);
defer(pubsub_deliver_msg, cl, msg);
}
}
channel_s *pattern;
fio_list_for_each(channel_s, channels.list, pattern, pubsub_patterns) {
if (msg->pub.engine == pattern->engine &&
fio_glob_match((uint8_t *)msg->pub.channel.name, msg->pub.channel.len,
(uint8_t *)pattern->name, pattern->len)) {
fio_ht_for_each(client_s, clients, cl, pattern->clients) {
spn_add(&msg->ref, 1);
spn_add(&cl->active, 1);
defer(pubsub_deliver_msg, cl, msg);
}
}
}
}
static void pubsub_perform_publish(void *msg_, void *ignr) {
msg_s *msg = msg_;
fio_dict_s *channel;
spn_lock(&pubsub_GIL);
if (msg->pub.use_pattern) {
fio_dict_each_match_glob(fio_dict_prefix(&pubsub_channels,
(void *)&msg->pub.engine,
sizeof(void *)),
msg->pub.channel.name, msg->pub.channel.len,
pubsub_publish_matched_channel, msg);
} else {
channel = (void *)fio_dict_get(fio_dict_prefix(&pubsub_channels,
(void *)&msg->pub.engine,
sizeof(void *)),
msg->pub.channel.name, msg->pub.channel.len);
pubsub_publish_matched_channel(channel, msg);
}
spn_unlock(&pubsub_GIL);
pubsub_free_msg(msg, NULL);
(void)ignr;
}
static int pubsub_cluster_publish(struct pubsub_publish_args args) {
msg_s *msg = malloc(sizeof(*msg) + args.channel.len + args.msg.len + 2);
if (!msg)
return -1;
*msg = (msg_s){.ref = 1, .pub = args};
struct pubsub_publish_args *pub = &msg->pub;
pub->msg.data = (char *)(pub + 1);
memcpy(pub->msg.data, args.msg.data, args.msg.len);
pub->msg.data[args.msg.len] = 0;
if (pub->channel.name) {
pub->channel.name = pub->msg.data + args.msg.len + 1;
memcpy(pub->channel.name, args.channel.name, args.channel.len);
pub->channel.name[args.channel.len] = 0;
}
pub->use_pattern = args.use_pattern;
if (args.push2cluster)
facil_cluster_send(FACIL_PUBSUB_CLUSTER_MESSAGE_ID, msg,
sizeof(*msg) + args.channel.len + args.msg.len + 2);
defer(pubsub_perform_publish, msg, NULL);
return 0;
}
static void pubsub_cluster_handle_publishing(void *data, uint32_t len) {
msg_s *msg = data;
if (len != sizeof(*msg) + msg->pub.channel.len + msg->pub.msg.len + 2) {
fprintf(stderr,
"ERROR: (pub/sub) cluster message size error. Message ignored.\n");
return;
}
msg = malloc(len);
if (!msg) {
fprintf(
stderr,
"ERROR: (pub/sub) cluster message allocation error.Message ignored.\n");
return;
}
memcpy(msg, data, len);
msg->ref = 1;
msg->pub.msg.data = (char *)(msg + 1);
if (msg->pub.channel.name)
msg->pub.channel.name = msg->pub.msg.data + msg->pub.msg.len + 1;
defer(pubsub_perform_publish, msg, NULL);
}
void pubsub_cluster_init(void) {
/* facil functions are thread safe, so we can call this without a guard. */
facil_cluster_set_handler(FACIL_PUBSUB_CLUSTER_MESSAGE_ID,
pubsub_cluster_handle_publishing);
}
static int pubsub_cluster_subscribe(const pubsub_engine_s *eng, const char *ch,
size_t ch_len, uint8_t use_pattern) {
return 0;
(void)ch;
(void)ch_len;
(void)use_pattern;
(void)eng;
}
static void pubsub_cluster_unsubscribe(const pubsub_engine_s *eng,
const char *ch, size_t ch_len,
uint8_t use_pattern) {
(void)ch;
(void)ch_len;
(void)use_pattern;
(void)eng;
}
static int pubsub_cluster_eng_publish(const pubsub_engine_s *eng,
const char *ch, size_t ch_len,
const char *msg, size_t msg_len,
uint8_t use_pattern) {
pubsub_cluster_publish((struct pubsub_publish_args){
.engine = eng,
.channel.name = (char *)ch,
.channel.len = ch_len,
.msg.data = (char *)msg,
.msg.len = msg_len,
.use_pattern = use_pattern,
.push2cluster = eng->push2cluster,
});
return 0;
}
const pubsub_engine_s PUBSUB_CLUSTER_ENGINE_S = {
.publish = pubsub_cluster_eng_publish,
.subscribe = pubsub_cluster_subscribe,
.unsubscribe = pubsub_cluster_unsubscribe,
.push2cluster = 1,
};
const pubsub_engine_s *PUBSUB_CLUSTER_ENGINE = &PUBSUB_CLUSTER_ENGINE_S;
const pubsub_engine_s PUBSUB_PROCESS_ENGINE_S = {
.publish = pubsub_cluster_eng_publish,
.subscribe = pubsub_cluster_subscribe,
.unsubscribe = pubsub_cluster_unsubscribe,
};
const pubsub_engine_s *PUBSUB_PROCESS_ENGINE = &PUBSUB_PROCESS_ENGINE_S;
const pubsub_engine_s *PUBSUB_DEFAULT_ENGINE = &PUBSUB_CLUSTER_ENGINE_S;
/* *****************************************************************************
External Engine Bridge
***************************************************************************** */
#undef pubsub_engine_distribute
void pubsub_engine_distribute(pubsub_message_s msg) {
pubsub_cluster_publish((struct pubsub_publish_args){
.engine = msg.engine,
.channel.name = msg.channel.name,
.channel.len = msg.channel.len,
.msg.data = msg.msg.data,
.msg.len = msg.msg.len,
.push2cluster = msg.engine->push2cluster,
.use_pattern = msg.use_pattern,
});
}
static void resubscribe_action(fio_dict_s *ch_, void *eng_) {
if (!eng_)
return;
pubsub_engine_s *eng = eng_;
channel_s *ch = fio_node2obj(channel_s, channels.dict, ch_);
eng->subscribe(eng, ch->name, ch->len, ch->use_pattern);
}
static void pubsub_engine_resubscribe_task(void *eng_, void *ignored) {
if (!eng_)
return;
pubsub_engine_s *eng = eng_;
channel_s *ch;
spn_lock(&pubsub_GIL);
fio_list_for_each(channel_s, channels.list, ch, pubsub_patterns) {
eng->subscribe(eng, ch->name, ch->len, ch->use_pattern);
}
fio_dict_each(fio_dict_prefix(&pubsub_channels, (void *)&eng, sizeof(void *)),
resubscribe_action, eng);
spn_unlock(&pubsub_GIL);
(void)ignored;
}
/**
* Engines can ask facil.io to perform an action on all their active channels.
*/
void pubsub_engine_resubscribe(pubsub_engine_s *eng) {
if (!eng)
return;
defer(pubsub_engine_resubscribe_task, eng, NULL);
}
/* *****************************************************************************
Hashing
***************************************************************************** */
#define pubsub_client_hash(p1, p2, p3) \
(((((uint64_t)(p1) * ((uint64_t)p2 ^ 0x736f6d6570736575ULL)) >> 5) | \
(((uint64_t)(p1) * ((uint64_t)p2 ^ 0x736f6d6570736575ULL)) << 59)) ^ \
((uint64_t)p3 ^ 0x646f72616e646f6dULL))
/* *****************************************************************************
API
***************************************************************************** */
#undef pubsub_subscribe
pubsub_sub_pt pubsub_subscribe(struct pubsub_subscribe_args args) {
if (!args.on_message)
goto err_unlocked;
if (args.channel.name && !args.channel.len)
args.channel.len = strlen(args.channel.name);
if (args.channel.len > FIO_PUBBSUB_MAX_CHANNEL_LEN) {
goto err_unlocked;
}
if (!args.engine)
args.engine = PUBSUB_DEFAULT_ENGINE;
channel_s *ch;
client_s *client;
uint64_t cl_hash =
pubsub_client_hash(args.on_message, args.udata1, args.udata2);
spn_lock(&pubsub_GIL);
if (args.use_pattern) {
fio_list_for_each(channel_s, channels.list, ch, pubsub_patterns) {
if (ch->engine == args.engine && ch->len == args.channel.len &&
!memcmp(ch->name, args.channel.name, args.channel.len))
goto found_channel;
}
} else {
ch = (void *)fio_dict_get(
fio_dict_prefix(&pubsub_channels, (void *)&args.engine, sizeof(void *)),
args.channel.name, args.channel.len);
if (ch) {
ch = fio_node2obj(channel_s, channels, ch);
goto found_channel;
}
}
if (args.engine->subscribe(args.engine, args.channel.name, args.channel.len,
args.use_pattern))
goto error;
ch = malloc(sizeof(*ch) + args.channel.len + 1 + sizeof(void *));
*ch = (channel_s){
.clients = FIO_HASH_TABLE_INIT(ch->clients),
.name = (char *)(ch + 1),
.len = args.channel.len,
.use_pattern = args.use_pattern,
.engine = args.engine,
};
memcpy(ch->name, args.channel.name, args.channel.len);
ch->name[args.channel.len + sizeof(void *)] = 0;
if (args.use_pattern) {
fio_list_add(&pubsub_patterns, &ch->channels.list);
} else {
fio_dict_set(fio_dict_ensure_prefix(&pubsub_channels, (void *)&args.engine,
sizeof(void *)),
args.channel.name, args.channel.len, &ch->channels.dict);
}
// fprintf(stderr, "Created a new channel for %s\n", args.channel.name);
found_channel:
client = (void *)fio_ht_find(&ch->clients, cl_hash);
if (client) {
client = fio_ht_object(client_s, clients, client);
goto found_client;
}
client = malloc(sizeof(*client));
if (!client)
goto error;
*client = (client_s){
.on_message = args.on_message,
.on_unsubscribe = args.on_unsubscribe,
.udata1 = args.udata1,
.udata2 = args.udata2,
.parent = ch,
.active = 0,
.ref = 0,
};
fio_ht_add(&ch->clients, &client->clients, cl_hash);
found_client:
client->ref++;
spn_add(&client->active, 1);
spn_unlock(&pubsub_GIL);
return client;
error:
spn_unlock(&pubsub_GIL);
err_unlocked:
if (args.on_unsubscribe)
args.on_unsubscribe(args.udata1, args.udata2);
return NULL;
}
/**
* This helper searches for an existing subscription.
* Use with care, NEVER call `pubsub_unsubscribe` more times than you have
* called `pubsub_subscribe`, since the subscription handle memory is realesed
* onnce the reference count reaches 0.
*
* Returns a subscription pointer or NULL (none found).
*/
#undef pubsub_find_sub
pubsub_sub_pt pubsub_find_sub(struct pubsub_subscribe_args args) {
if (!args.on_message)
return NULL;
if (args.channel.name && !args.channel.len)
args.channel.len = strlen(args.channel.name);
if (args.channel.len > FIO_PUBBSUB_MAX_CHANNEL_LEN) {
return NULL;
}
if (!args.engine)
args.engine = PUBSUB_DEFAULT_ENGINE;
channel_s *ch;
client_s *client;
uint64_t cl_hash =
pubsub_client_hash(args.on_message, args.udata1, args.udata2);
spn_lock(&pubsub_GIL);
if (args.use_pattern) {
fio_list_for_each(channel_s, channels.list, ch, pubsub_patterns) {
if (ch->engine == args.engine && ch->len == args.channel.len &&
!memcmp(ch->name, args.channel.name, args.channel.len))
goto found_channel;
}
} else {
ch = (void *)fio_dict_get(
fio_dict_prefix(&pubsub_channels, (void *)&args.engine, sizeof(void *)),
args.channel.name, args.channel.len);
if (ch) {
ch = fio_node2obj(channel_s, channels, ch);
goto found_channel;
}
}
goto not_found;
found_channel:
client = (void *)fio_ht_find(&ch->clients, cl_hash);
if (client) {
client = fio_ht_object(client_s, clients, client);
spn_unlock(&pubsub_GIL);
return client;
}
not_found:
spn_unlock(&pubsub_GIL);
return NULL;
}
void pubsub_unsubscribe(pubsub_sub_pt client) {
if (!client)
return;
spn_lock(&pubsub_GIL);
client->ref--;
if (client->ref) {
spn_unlock(&pubsub_GIL);
spn_sub(&client->active, 1);
return;
}
fio_ht_remove(&client->clients);
if (client->parent->clients.count) {
spn_unlock(&pubsub_GIL);
defer(pubsub_free_client, client, NULL);
return;
}
channel_s *ch = client->parent;
if (ch->use_pattern) {
fio_list_remove(&ch->channels.list);
} else {
fio_dict_remove(&ch->channels.dict);
}
spn_unlock(&pubsub_GIL);
defer(pubsub_free_client, client, NULL);
ch->engine->unsubscribe(ch->engine, ch->name, ch->len, ch->use_pattern);
fio_ht_rehash(&ch->clients, 0);
free(ch);
}
#undef pubsub_publish
int pubsub_publish(struct pubsub_publish_args args) {
if (!args.msg.data)
return -1;
if (!args.msg.len)
args.msg.len = strlen(args.msg.data);
if (args.channel.name && !args.channel.len)
args.channel.len = strlen(args.channel.name);
if (!args.engine) {
args.engine = PUBSUB_DEFAULT_ENGINE;
args.push2cluster = 1;
} else if (args.push2cluster)
PUBSUB_CLUSTER_ENGINE_S.publish(args.engine, args.channel.name,
args.channel.len, args.msg.data,
args.msg.len, args.use_pattern);
return args.engine->publish(args.engine, args.channel.name, args.channel.len,
args.msg.data, args.msg.len, args.use_pattern);
}
/**
* defers message hadling if it can't be performed (i.e., resource is busy) or
* should be fragmented (allowing large tasks to be broken down).
*/
void pubsub_defer(pubsub_message_s *msg_) {
if (!msg_)
return;
msg_container_s *msg = fio_node2obj(msg_container_s, msg, msg_);
spn_add(&msg->origin->ref, 1);
spn_add(&msg->msg.subscription->active, 1);
defer(pubsub_deliver_msg, msg->msg.subscription, msg->origin);
}