blob: 514aff233148d1f933dad4633b4589d9b59c8463 [file] [log] [blame] [raw]
/*
Copyright: Boaz segev, 2016-2019
License: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#define FIO_INCLUDE_LINKED_LIST
#define FIO_INCLUDE_STR
// #define DEBUG 1
#include <fio.h>
#include <fiobj.h>
#include <redis_engine.h>
#include <resp_parser.h>
#define REDIS_READ_BUFFER 8192
/* *****************************************************************************
The Redis Engine and Callbacks Object
***************************************************************************** */
typedef struct {
fio_pubsub_engine_s en;
struct redis_engine_internal_s {
fio_protocol_s protocol;
intptr_t uuid;
resp_parser_s parser;
void (*on_message)(struct redis_engine_internal_s *parser, FIOBJ msg);
FIOBJ str;
FIOBJ ary;
uint32_t ary_count;
uint16_t buf_pos;
uint16_t nesting;
} pub_data, sub_data;
subscription_s *publication_forwarder;
subscription_s *cmd_forwarder;
subscription_s *cmd_reply;
char *address;
char *port;
char *auth;
FIOBJ last_ch;
size_t auth_len;
size_t ref;
fio_ls_embd_s queue;
fio_lock_i lock;
fio_lock_i lock_connection;
uint8_t ping_int;
volatile uint8_t pub_sent;
volatile uint8_t flag;
uint8_t buf[];
} redis_engine_s;
typedef struct {
fio_ls_embd_s node;
void (*callback)(fio_pubsub_engine_s *e, FIOBJ reply, void *udata);
void *udata;
size_t cmd_len;
uint8_t cmd[];
} redis_commands_s;
/** converts from a publishing protocol to an `redis_engine_s`. */
#define pub2redis(pr) FIO_LS_EMBD_OBJ(redis_engine_s, pub_data, (pr))
/** converts from a subscribing protocol to an `redis_engine_s`. */
#define sub2redis(pr) FIO_LS_EMBD_OBJ(redis_engine_s, sub_data, (pr))
/** converts from a `resp_parser_s` to the internal data structure. */
#define parser2data(prsr) \
FIO_LS_EMBD_OBJ(struct redis_engine_internal_s, parser, (prsr))
/* releases any resources used by an internal engine*/
static inline void redis_internal_reset(struct redis_engine_internal_s *i) {
i->buf_pos = 0;
i->parser = (resp_parser_s){.obj_countdown = 0, .expecting = 0};
fiobj_free((FIOBJ)fio_ct_if(i->ary == FIOBJ_INVALID, (uintptr_t)i->str,
(uintptr_t)i->ary));
i->str = FIOBJ_INVALID;
i->ary = FIOBJ_INVALID;
i->ary_count = 0;
i->nesting = 0;
i->uuid = -1;
}
/** cleans up and frees the engine data. */
static inline void redis_free(redis_engine_s *r) {
if (fio_atomic_sub(&r->ref, 1))
return;
FIO_LOG_DEBUG("freeing redis engine for %s:%s", r->address, r->port);
redis_internal_reset(&r->pub_data);
redis_internal_reset(&r->sub_data);
fiobj_free(r->last_ch);
while (fio_ls_embd_any(&r->queue)) {
fio_free(
FIO_LS_EMBD_OBJ(redis_commands_s, node, fio_ls_embd_pop(&r->queue)));
}
fio_unsubscribe(r->publication_forwarder);
r->publication_forwarder = NULL;
fio_unsubscribe(r->cmd_forwarder);
r->cmd_forwarder = NULL;
fio_unsubscribe(r->cmd_reply);
r->cmd_reply = NULL;
fio_free(r);
}
/* *****************************************************************************
Simple RESP formatting
***************************************************************************** */
inline static void fiobj2resp___internal(FIOBJ dest, FIOBJ obj) {
fio_str_info_s s;
switch (FIOBJ_TYPE(obj)) {
case FIOBJ_T_NULL:
fiobj_str_write(dest, "$-1\r\n", 5);
break;
case FIOBJ_T_ARRAY:
fiobj_str_write(dest, "*", 1);
fiobj_str_write_i(dest, fiobj_ary_count(obj));
fiobj_str_write(dest, "\r\n", 2);
break;
case FIOBJ_T_HASH:
fiobj_str_write(dest, "*", 1);
fiobj_str_write_i(dest, fiobj_hash_count(obj) * 2);
fiobj_str_write(dest, "\r\n", 2);
break;
case FIOBJ_T_TRUE:
fiobj_str_write(dest, "$4\r\ntrue\r\n", 10);
break;
case FIOBJ_T_FALSE:
fiobj_str_write(dest, "$4\r\nfalse\r\n", 11);
break;
#if 0
/* Numbers aren't as good for commands as one might think... */
case FIOBJ_T_NUMBER:
fiobj_str_write(dest, ":", 1);
fiobj_str_write_i(dest, fiobj_obj2num(obj));
fiobj_str_write(dest, "\r\n", 2);
break;
#else
case FIOBJ_T_NUMBER: /* overflow */
#endif
case FIOBJ_T_FLOAT: /* overflow */
case FIOBJ_T_UNKNOWN: /* overflow */
case FIOBJ_T_STRING: /* overflow */
case FIOBJ_T_DATA:
s = fiobj_obj2cstr(obj);
fiobj_str_write(dest, "$", 1);
fiobj_str_write_i(dest, s.len);
fiobj_str_write(dest, "\r\n", 2);
fiobj_str_write(dest, s.data, s.len);
fiobj_str_write(dest, "\r\n", 2);
break;
}
}
static int fiobj2resp_task(FIOBJ o, void *dest_) {
if (fiobj_hash_key_in_loop())
fiobj2resp___internal((FIOBJ)dest_, fiobj_hash_key_in_loop());
fiobj2resp___internal((FIOBJ)dest_, o);
return 0;
}
/**
* Converts FIOBJ objects into a RESP string (client mode).
*/
static FIOBJ fiobj2resp(FIOBJ dest, FIOBJ obj) {
fiobj_each2(obj, fiobj2resp_task, (void *)dest);
return dest;
}
/**
* Converts FIOBJ objects into a RESP string (client mode).
*
* Don't call `fiobj_free`, object will self-destruct.
*/
static inline FIOBJ fiobj2resp_tmp(FIOBJ obj) {
return fiobj2resp(fiobj_str_tmp(), obj);
}
/* *****************************************************************************
RESP parser callbacks
***************************************************************************** */
/** a local static callback, called when a parser / protocol error occurs. */
static int resp_on_parser_error(resp_parser_s *parser) {
struct redis_engine_internal_s *i = parser2data(parser);
FIO_LOG_ERROR("(redis) parser error - attempting to restart connection.\n");
fio_close(i->uuid);
return -1;
}
/** a local static callback, called when the RESP message is complete. */
static int resp_on_message(resp_parser_s *parser) {
struct redis_engine_internal_s *i = parser2data(parser);
FIOBJ msg = i->ary ? i->ary : i->str;
i->on_message(i, msg);
/* cleanup */
fiobj_free(msg);
i->ary = FIOBJ_INVALID;
i->str = FIOBJ_INVALID;
return 0;
}
/** a local helper to add parsed objects to the data store. */
static inline void resp_add_obj(struct redis_engine_internal_s *dest, FIOBJ o) {
if (dest->ary) {
fiobj_ary_push(dest->ary, o);
--dest->ary_count;
if (!dest->ary_count && dest->nesting) {
FIOBJ tmp = fiobj_ary_shift(dest->ary);
dest->ary_count = fiobj_obj2num(tmp);
fiobj_free(tmp);
dest->ary = fiobj_ary_shift(dest->ary);
--dest->nesting;
}
}
dest->str = o;
}
/** a local static callback, called when a Number object is parsed. */
static int resp_on_number(resp_parser_s *parser, int64_t num) {
struct redis_engine_internal_s *data = parser2data(parser);
resp_add_obj(data, fiobj_num_new(num));
return 0;
}
/** a local static callback, called when a OK message is received. */
static int resp_on_okay(resp_parser_s *parser) {
struct redis_engine_internal_s *data = parser2data(parser);
resp_add_obj(data, fiobj_true());
return 0;
}
/** a local static callback, called when NULL is received. */
static int resp_on_null(resp_parser_s *parser) {
struct redis_engine_internal_s *data = parser2data(parser);
resp_add_obj(data, fiobj_null());
return 0;
}
/**
* a local static callback, called when a String should be allocated.
*
* `str_len` is the expected number of bytes that will fill the final string
* object, without any NUL byte marker (the string might be binary).
*
* If this function returns any value besides 0, parsing is stopped.
*/
static int resp_on_start_string(resp_parser_s *parser, size_t str_len) {
struct redis_engine_internal_s *data = parser2data(parser);
resp_add_obj(data, fiobj_str_buf(str_len));
return 0;
}
/** a local static callback, called as String objects are streamed. */
static int resp_on_string_chunk(resp_parser_s *parser, void *data, size_t len) {
struct redis_engine_internal_s *i = parser2data(parser);
fiobj_str_write(i->str, data, len);
return 0;
}
/** a local static callback, called when a String object had finished
* streaming.
*/
static int resp_on_end_string(resp_parser_s *parser) {
return 0;
(void)parser;
}
/** a local static callback, called an error message is received. */
static int resp_on_err_msg(resp_parser_s *parser, void *data, size_t len) {
struct redis_engine_internal_s *i = parser2data(parser);
resp_add_obj(i, fiobj_str_new(data, len));
return 0;
}
/**
* a local static callback, called when an Array should be allocated.
*
* `array_len` is the expected number of objects that will fill the Array
* object.
*
* There's no `resp_on_end_array` callback since the RESP protocol assumes the
* message is finished along with the Array (`resp_on_message` is called).
* However, just in case a non-conforming client/server sends nested Arrays,
* the callback should test against possible overflow or nested Array endings.
*
* If this function returns any value besides 0, parsing is stopped.
*/
static int resp_on_start_array(resp_parser_s *parser, size_t array_len) {
struct redis_engine_internal_s *i = parser2data(parser);
if (i->ary) {
++i->nesting;
FIOBJ tmp = fiobj_ary_new2(array_len + 2);
fiobj_ary_push(tmp, fiobj_num_new(i->ary_count));
fiobj_ary_push(tmp, fiobj_num_new(i->ary));
i->ary = tmp;
} else {
i->ary = fiobj_ary_new2(array_len + 2);
}
i->ary_count = array_len;
return 0;
}
/* *****************************************************************************
Publication and Command Handling
***************************************************************************** */
/* the deferred callback handler */
static void redis_perform_callback(void *e, void *cmd_) {
redis_commands_s *cmd = cmd_;
FIOBJ reply = (FIOBJ)cmd->node.next;
if (cmd->callback)
cmd->callback(e, reply, cmd->udata);
fiobj_free(reply);
FIO_LOG_DEBUG("Handled: %s\n", cmd->cmd);
fio_free(cmd);
}
/* send command within lock, to ensure flag integrity */
static void redis_send_next_command_unsafe(redis_engine_s *r) {
if (!r->pub_sent && fio_ls_embd_any(&r->queue)) {
r->pub_sent = 1;
redis_commands_s *cmd =
FIO_LS_EMBD_OBJ(redis_commands_s, node, r->queue.next);
fio_write2(r->pub_data.uuid, .data.buffer = cmd->cmd,
.length = cmd->cmd_len, .after.dealloc = FIO_DEALLOC_NOOP);
FIO_LOG_DEBUG("(redis %d) Sending (%zu bytes):\n%s\n", (int)getpid(),
cmd->cmd_len, cmd->cmd);
}
}
/* attach a command to the queue */
static void redis_attach_cmd(redis_engine_s *r, redis_commands_s *cmd) {
fio_lock(&r->lock);
fio_ls_embd_push(&r->queue, &cmd->node);
redis_send_next_command_unsafe(r);
fio_unlock(&r->lock);
}
/** a local static callback, called when the RESP message is complete. */
static void resp_on_pub_message(struct redis_engine_internal_s *i, FIOBJ msg) {
redis_engine_s *r = pub2redis(i);
// #if DEBUG
if (FIO_LOG_LEVEL >= FIO_LOG_LEVEL_DEBUG) {
FIOBJ json = fiobj_obj2json(msg, 1);
FIO_LOG_DEBUG("Redis reply:\n%s\n", fiobj_obj2cstr(json).data);
fiobj_free(json);
}
// #endif
/* publishing / command parser */
fio_lock(&r->lock);
fio_ls_embd_s *node = fio_ls_embd_shift(&r->queue);
r->pub_sent = 0;
redis_send_next_command_unsafe(r);
fio_unlock(&r->lock);
if (!node) {
/* TODO: possible ping? from server?! not likely... */
FIO_LOG_WARNING("(redis %d) received a reply when no command was sent.",
(int)getpid());
return;
}
node->next = (void *)fiobj_dup(msg);
fio_defer(redis_perform_callback, &r->en,
FIO_LS_EMBD_OBJ(redis_commands_s, node, node));
}
/* *****************************************************************************
Subscription Message Handling
***************************************************************************** */
/** a local static callback, called when the RESP message is complete. */
static void resp_on_sub_message(struct redis_engine_internal_s *i, FIOBJ msg) {
redis_engine_s *r = sub2redis(i);
/* subscriotion parser */
if (FIOBJ_TYPE(msg) != FIOBJ_T_ARRAY) {
if (FIOBJ_TYPE(msg) != FIOBJ_T_STRING || fiobj_obj2cstr(msg).len != 4 ||
fiobj_obj2cstr(msg).data[0] != 'P') {
FIO_LOG_WARNING("(redis) unexpected data format in "
"subscription stream (%zu bytes):\n %s\n",
fiobj_obj2cstr(msg).len, fiobj_obj2cstr(msg).data);
}
} else {
// FIOBJ *ary = fiobj_ary2ptr(msg);
// for (size_t i = 0; i < fiobj_ary_count(msg); ++i) {
// fio_str_info_s tmp = fiobj_obj2cstr(ary[i]);
// fprintf(stderr, "(%lu) %s\n", (unsigned long)i, tmp.data);
// }
fio_str_info_s tmp = fiobj_obj2cstr(fiobj_ary_index(msg, 0));
if (tmp.len == 7) { /* "message" */
fiobj_free(r->last_ch);
r->last_ch = fiobj_dup(fiobj_ary_index(msg, 1));
fio_publish(.channel = fiobj_obj2cstr(r->last_ch),
.message = fiobj_obj2cstr(fiobj_ary_index(msg, 2)),
.engine = FIO_PUBSUB_CLUSTER);
} else if (tmp.len == 8) { /* "pmessage" */
if (!fiobj_iseq(r->last_ch, fiobj_ary_index(msg, 2)))
fio_publish(.channel = fiobj_obj2cstr(fiobj_ary_index(msg, 2)),
.message = fiobj_obj2cstr(fiobj_ary_index(msg, 3)),
.engine = FIO_PUBSUB_CLUSTER);
}
}
}
/* *****************************************************************************
Connection Callbacks (fio_protocol_s) and Engine
***************************************************************************** */
/** defined later - connects to Redis */
static void redis_connect(void *r, void *i);
#define defer_redis_connect(r, i) \
do { \
fio_atomic_add(&(r)->ref, 1); \
fio_defer(redis_connect, (r), (i)); \
} while (0);
/** Called when a data is available, but will not run concurrently */
static void redis_on_data(intptr_t uuid, fio_protocol_s *pr) {
struct redis_engine_internal_s *internal =
(struct redis_engine_internal_s *)pr;
uint8_t *buf;
if (internal->on_message == resp_on_sub_message) {
buf = sub2redis(pr)->buf + REDIS_READ_BUFFER;
} else {
buf = pub2redis(pr)->buf;
}
ssize_t i = fio_read(uuid, buf + internal->buf_pos,
REDIS_READ_BUFFER - internal->buf_pos);
if (i <= 0)
return;
internal->buf_pos += i;
i = resp_parse(&internal->parser, buf, internal->buf_pos);
if (i) {
memmove(buf, buf + internal->buf_pos - i, i);
}
internal->buf_pos = i;
}
/** Called when the connection was closed, but will not run concurrently */
static void redis_on_close(intptr_t uuid, fio_protocol_s *pr) {
struct redis_engine_internal_s *internal =
(struct redis_engine_internal_s *)pr;
redis_internal_reset(internal);
redis_engine_s *r;
if (internal->on_message == resp_on_sub_message) {
r = sub2redis(pr);
fiobj_free(r->last_ch);
r->last_ch = FIOBJ_INVALID;
if (r->flag) {
/* reconnection for subscription connection. */
if (uuid != -1) {
FIO_LOG_WARNING("(redis %d) subscription connection lost. "
"Reconnecting...",
(int)getpid());
}
fio_atomic_sub(&r->ref, 1);
defer_redis_connect(r, internal);
} else {
redis_free(r);
}
} else {
r = pub2redis(pr);
if (r->flag && uuid != -1) {
FIO_LOG_WARNING("(redis %d) publication connection lost. "
"Reconnecting...",
(int)getpid());
}
r->pub_sent = 0;
fio_close(r->sub_data.uuid);
redis_free(r);
}
(void)uuid;
}
/** Called before the facil.io reactor is shut down. */
static uint8_t redis_on_shutdown(intptr_t uuid, fio_protocol_s *pr) {
fio_write2(uuid, .data.buffer = "*1\r\n$4\r\nQUIT\r\n", .length = 14,
.after.dealloc = FIO_DEALLOC_NOOP);
return 0;
(void)pr;
}
/** Called on connection timeout. */
static void redis_sub_ping(intptr_t uuid, fio_protocol_s *pr) {
fio_write2(uuid, .data.buffer = "*1\r\n$4\r\nPING\r\n", .length = 14,
.after.dealloc = FIO_DEALLOC_NOOP);
(void)pr;
}
/** Called on connection timeout. */
static void redis_pub_ping(intptr_t uuid, fio_protocol_s *pr) {
redis_engine_s *r = pub2redis(pr);
if (fio_ls_embd_any(&r->queue)) {
FIO_LOG_WARNING("(redis) Redis server unresponsive, disconnecting.");
fio_close(uuid);
return;
}
redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + 15);
*cmd = (redis_commands_s){.cmd_len = 14};
memcpy(cmd->cmd, "*1\r\n$4\r\nPING\r\n\0", 15);
redis_attach_cmd(r, cmd);
}
/* *****************************************************************************
Connecting to Redis
***************************************************************************** */
static void redis_on_auth(fio_pubsub_engine_s *e, FIOBJ reply, void *udata) {
if (FIOBJ_TYPE_IS(reply, FIOBJ_T_TRUE)) {
fio_str_info_s s = fiobj_obj2cstr(reply);
FIO_LOG_WARNING("(redis) Authentication FAILED."
" %.*s",
(int)s.len, s.data);
}
(void)e;
(void)udata;
}
static void redis_on_connect(intptr_t uuid, void *i_) {
struct redis_engine_internal_s *i = i_;
redis_engine_s *r;
i->uuid = uuid;
if (i->on_message == resp_on_sub_message) {
r = sub2redis(i);
if (r->auth_len) {
fio_write2(uuid, .data.buffer = r->auth, .length = r->auth_len,
.after.dealloc = FIO_DEALLOC_NOOP);
}
fio_pubsub_reattach(&r->en);
if (r->pub_data.uuid == -1) {
defer_redis_connect(r, &r->pub_data);
}
FIO_LOG_INFO("(redis %d) subscription connection established.",
(int)getpid());
} else {
r = pub2redis(i);
if (r->auth_len) {
redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + r->auth_len);
*cmd =
(redis_commands_s){.cmd_len = r->auth_len, .callback = redis_on_auth};
memcpy(cmd->cmd, r->auth, r->auth_len);
fio_lock(&r->lock);
r->pub_sent = 0;
fio_ls_embd_unshift(&r->queue, &cmd->node);
redis_send_next_command_unsafe(r);
fio_unlock(&r->lock);
} else {
fio_lock(&r->lock);
r->pub_sent = 0;
redis_send_next_command_unsafe(r);
fio_unlock(&r->lock);
}
FIO_LOG_INFO("(redis %d) publication connection established.",
(int)getpid());
}
i->protocol.rsv = 0;
fio_attach(uuid, &i->protocol);
fio_timeout_set(uuid, r->ping_int);
return;
}
static void redis_on_connect_failed(intptr_t uuid, void *i_) {
struct redis_engine_internal_s *i = i_;
i->uuid = -1;
i->protocol.on_close(-1, &i->protocol);
(void)uuid;
}
static void redis_connect(void *r_, void *i_) {
redis_engine_s *r = r_;
struct redis_engine_internal_s *i = i_;
fio_lock(&r->lock_connection);
if (r->flag == 0 || i->uuid != -1 || !fio_is_running()) {
fio_unlock(&r->lock_connection);
redis_free(r);
return;
}
// fio_atomic_add(&r->ref, 1);
i->uuid = fio_connect(.address = r->address, .port = r->port,
.on_connect = redis_on_connect, .udata = i,
.on_fail = redis_on_connect_failed);
fio_unlock(&r->lock_connection);
}
/* *****************************************************************************
Engine / Bridge Callbacks (Root Process)
***************************************************************************** */
static void redis_on_subscribe_root(const fio_pubsub_engine_s *eng,
fio_str_info_s channel,
fio_match_fn match) {
redis_engine_s *r = (redis_engine_s *)eng;
if (r->sub_data.uuid != -1) {
FIOBJ cmd = fiobj_str_buf(96 + channel.len);
if (match == FIO_MATCH_GLOB)
fiobj_str_write(cmd, "*2\r\n$10\r\nPSUBSCRIBE\r\n$", 22);
else
fiobj_str_write(cmd, "*2\r\n$9\r\nSUBSCRIBE\r\n$", 20);
fiobj_str_write_i(cmd, channel.len);
fiobj_str_write(cmd, "\r\n", 2);
fiobj_str_write(cmd, channel.data, channel.len);
fiobj_str_write(cmd, "\r\n", 2);
// {
// fio_str_info_s s = fiobj_obj2cstr(cmd);
// fprintf(stderr, "(%d) Sending Subscription (%p):\n%s\n", getpid(),
// (void *)r->sub_data.uuid, s.data);
// }
fiobj_send_free(r->sub_data.uuid, cmd);
}
}
static void redis_on_unsubscribe_root(const fio_pubsub_engine_s *eng,
fio_str_info_s channel,
fio_match_fn match) {
redis_engine_s *r = (redis_engine_s *)eng;
if (r->sub_data.uuid != -1) {
fio_str_s *cmd = fio_str_new2();
fio_str_capa_assert(cmd, 96 + channel.len);
if (match == FIO_MATCH_GLOB)
fio_str_write(cmd, "*2\r\n$12\r\nPUNSUBSCRIBE\r\n$", 24);
else
fio_str_write(cmd, "*2\r\n$11\r\nUNSUBSCRIBE\r\n$", 23);
fio_str_write_i(cmd, channel.len);
fio_str_write(cmd, "\r\n", 2);
fio_str_write(cmd, channel.data, channel.len);
fio_str_write(cmd, "\r\n", 2);
// {
// fio_str_info_s s = fio_str_info(cmd);
// fprintf(stderr, "(%d) Cancel Subscription (%p):\n%s\n", getpid(),
// (void *)r->sub_data.uuid, s.data);
// }
fio_str_send_free2(r->sub_data.uuid, cmd);
}
}
static void redis_on_publish_root(const fio_pubsub_engine_s *eng,
fio_str_info_s channel, fio_str_info_s msg,
uint8_t is_json) {
redis_engine_s *r = (redis_engine_s *)eng;
redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + channel.len + msg.len + 96);
*cmd = (redis_commands_s){.cmd_len = 0};
memcpy(cmd->cmd, "*3\r\n$7\r\nPUBLISH\r\n$", 18);
char *buf = (char *)cmd->cmd + 18;
buf += fio_ltoa((void *)buf, channel.len, 10);
*buf++ = '\r';
*buf++ = '\n';
memcpy(buf, channel.data, channel.len);
buf += channel.len;
*buf++ = '\r';
*buf++ = '\n';
*buf++ = '$';
buf += fio_ltoa(buf, msg.len, 10);
*buf++ = '\r';
*buf++ = '\n';
memcpy(buf, msg.data, msg.len);
buf += msg.len;
*buf++ = '\r';
*buf++ = '\n';
*buf = 0;
FIO_LOG_DEBUG("(%d) Publishing:\n%s", (int)getpid(), cmd->cmd);
cmd->cmd_len = (uintptr_t)buf - (uintptr_t)(cmd + 1);
redis_attach_cmd(r, cmd);
return;
(void)is_json;
}
/* *****************************************************************************
Engine / Bridge Stub Callbacks (Child Process)
***************************************************************************** */
static void redis_on_mock_subscribe_child(const fio_pubsub_engine_s *eng,
fio_str_info_s channel,
fio_match_fn match) {
/* do nothing, root process is notified about (un)subscriptions by facil.io */
(void)eng;
(void)channel;
(void)match;
}
static void redis_on_publish_child(const fio_pubsub_engine_s *eng,
fio_str_info_s channel, fio_str_info_s msg,
uint8_t is_json) {
/* attach engine data to channel (prepend) */
fio_str_s tmp = FIO_STR_INIT;
/* by using fio_str_s, short names are allocated on the stack */
fio_str_info_s tmp_info = fio_str_resize(&tmp, channel.len + 8);
fio_u2str64(tmp_info.data, (uint64_t)eng);
memcpy(tmp_info.data + 8, channel.data, channel.len);
/* forward publication request to Root */
fio_publish(.filter = -1, .channel = tmp_info, .message = msg,
.engine = FIO_PUBSUB_ROOT, .is_json = is_json);
fio_str_free(&tmp);
(void)eng;
}
/* *****************************************************************************
Root Publication Handler
***************************************************************************** */
/* listens to filter -1 and publishes and messages */
static void redis_on_internal_publish(fio_msg_s *msg) {
if (msg->channel.len < 8)
return; /* internal error, unexpected data */
void *en = (void *)fio_str2u64(msg->channel.data);
if (en != msg->udata1)
return; /* should be delivered by a different engine */
/* step after the engine data */
msg->channel.len -= 8;
msg->channel.data += 8;
/* forward to publishing */
FIO_LOG_DEBUG("Forwarding to engine %p, on channel %s", msg->udata1,
msg->channel.data);
redis_on_publish_root(msg->udata1, msg->channel, msg->msg, msg->is_json);
}
/* *****************************************************************************
Sending commands using the Root connection
***************************************************************************** */
/* callback from the Redis reply */
static void redis_forward_reply(fio_pubsub_engine_s *e, FIOBJ reply,
void *udata) {
uint8_t *data = udata;
fio_pubsub_engine_s *engine = (fio_pubsub_engine_s *)fio_str2u64(data + 0);
void *callback = (void *)fio_str2u64(data + 8);
if (engine != e || !callback) {
FIO_LOG_DEBUG("Redis reply not forwarded (callback: %p)", callback);
return;
}
int32_t pid = (int32_t)fio_str2u32(data + 24);
FIOBJ rp = fiobj_obj2json(reply, 0);
fio_publish(.filter = (-10 - (int32_t)pid), .channel.data = (char *)data,
.channel.len = 28, .message = fiobj_obj2cstr(rp), .is_json = 1);
fiobj_free(rp);
}
/* listens to channel -2 for commands that need to be sent (only ROOT) */
static void redis_on_internal_cmd(fio_msg_s *msg) {
// void*(void *)fio_str2u64(msg->msg.data);
fio_pubsub_engine_s *engine =
(fio_pubsub_engine_s *)fio_str2u64(msg->channel.data + 0);
if (engine != msg->udata1) {
return;
}
redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + msg->msg.len + 1 + 28);
FIO_ASSERT_ALLOC(cmd);
*cmd = (redis_commands_s){.callback = redis_forward_reply,
.udata = (cmd->cmd + msg->msg.len + 1),
.cmd_len = msg->msg.len};
memcpy(cmd->cmd, msg->msg.data, msg->msg.len);
memcpy(cmd->cmd + msg->msg.len + 1, msg->channel.data, 28);
redis_attach_cmd((redis_engine_s *)engine, cmd);
// fprintf(stderr, " *** Attached CMD (%d) ***\n%s\n", getpid(), cmd->cmd);
}
/* Listens on filter `-10 -getpid()` for incoming reply data */
static void redis_on_internal_reply(fio_msg_s *msg) {
fio_pubsub_engine_s *engine =
(fio_pubsub_engine_s *)fio_str2u64(msg->channel.data + 0);
if (engine != msg->udata1) {
FIO_LOG_DEBUG("Redis reply not forwarded (engine mismatch: %p != %p)",
(void *)engine, msg->udata1);
return;
}
FIOBJ reply;
fiobj_json2obj(&reply, msg->msg.data, msg->msg.len);
void (*callback)(fio_pubsub_engine_s *, FIOBJ, void *) = (void (*)(
fio_pubsub_engine_s *, FIOBJ, void *))fio_str2u64(msg->channel.data + 8);
void *udata = (void *)fio_str2u64(msg->channel.data + 16);
callback(engine, reply, udata);
fiobj_free(reply);
}
/* publishes a Redis command to Root's filter -2 */
intptr_t redis_engine_send(fio_pubsub_engine_s *engine, FIOBJ command,
void (*callback)(fio_pubsub_engine_s *e, FIOBJ reply,
void *udata),
void *udata) {
if ((uintptr_t)engine < 4) {
FIO_LOG_WARNING("(redis send) trying to use one of the core engines");
return -1;
}
// if(fio_is_master()) {
// FIOBJ resp = fiobj2resp_tmp(fio_str_info_s obj1, FIOBJ obj2);
// TODO...
// } else {
/* forward publication request to Root */
fio_str_s tmp = FIO_STR_INIT;
fio_str_info_s ti = fio_str_resize(&tmp, 28);
/* combine metadata */
fio_u2str64(ti.data + 0, (uint64_t)engine);
fio_u2str64(ti.data + 8, (uint64_t)callback);
fio_u2str64(ti.data + 16, (uint64_t)udata);
fio_u2str32(ti.data + 24, (uint32_t)getpid());
FIOBJ cmd = fiobj2resp_tmp(command);
fio_publish(.filter = -2, .channel = ti, .message = fiobj_obj2cstr(cmd),
.engine = FIO_PUBSUB_ROOT, .is_json = 0);
fio_str_free(&tmp);
// }
return 0;
}
/* *****************************************************************************
Redis Engine Creation
***************************************************************************** */
static void redis_on_facil_start(void *r_) {
redis_engine_s *r = r_;
r->flag = 1;
if (!fio_is_valid(r->sub_data.uuid)) {
defer_redis_connect(r, &r->sub_data);
}
}
static void redis_on_facil_shutdown(void *r_) {
redis_engine_s *r = r_;
r->flag = 0;
}
static void redis_on_engine_fork(void *r_) {
redis_engine_s *r = r_;
r->flag = 0;
r->lock = FIO_LOCK_INIT;
fio_force_close(r->sub_data.uuid);
r->sub_data.uuid = -1;
fio_force_close(r->pub_data.uuid);
r->pub_data.uuid = -1;
while (fio_ls_embd_any(&r->queue)) {
redis_commands_s *cmd =
FIO_LS_EMBD_OBJ(redis_commands_s, node, fio_ls_embd_pop(&r->queue));
fio_free(cmd);
}
r->en = (fio_pubsub_engine_s){
.subscribe = redis_on_mock_subscribe_child,
.unsubscribe = redis_on_mock_subscribe_child,
.publish = redis_on_publish_child,
};
fio_unsubscribe(r->publication_forwarder);
r->publication_forwarder = NULL;
fio_unsubscribe(r->cmd_forwarder);
r->cmd_forwarder = NULL;
fio_unsubscribe(r->cmd_reply);
r->cmd_reply =
fio_subscribe(.filter = -10 - (int32_t)getpid(),
.on_message = redis_on_internal_reply, .udata1 = r);
}
fio_pubsub_engine_s *redis_engine_create
FIO_IGNORE_MACRO(struct redis_engine_create_args args) {
if (getpid() != fio_parent_pid()) {
FIO_LOG_FATAL("(redis) Redis engine initialization can only "
"be performed in the Root process.");
kill(0, SIGINT);
fio_stop();
return NULL;
}
if (!args.address.len && args.address.data)
args.address.len = strlen(args.address.data);
if (!args.port.len && args.port.data)
args.port.len = strlen(args.port.data);
if (!args.auth.len && args.auth.data) {
args.auth.len = strlen(args.auth.data);
}
if (!args.address.data || !args.address.len) {
args.address = (fio_str_info_s){.len = 9, .data = (char *)"localhost"};
}
if (!args.port.data || !args.port.len) {
args.port = (fio_str_info_s){.len = 4, .data = (char *)"6379"};
}
redis_engine_s *r =
fio_malloc(sizeof(*r) + args.port.len + 1 + args.address.len + 1 +
args.auth.len + 1 + (REDIS_READ_BUFFER * 2));
FIO_ASSERT_ALLOC(r);
*r = (redis_engine_s){
.en =
{
.subscribe = redis_on_subscribe_root,
.unsubscribe = redis_on_unsubscribe_root,
.publish = redis_on_publish_root,
},
.pub_data =
{
.protocol =
{
.on_data = redis_on_data,
.on_close = redis_on_close,
.on_shutdown = redis_on_shutdown,
.ping = redis_pub_ping,
},
.uuid = -1,
.on_message = resp_on_pub_message,
},
.sub_data =
{
.protocol =
{
.on_data = redis_on_data,
.on_close = redis_on_close,
.on_shutdown = redis_on_shutdown,
.ping = redis_sub_ping,
},
.on_message = resp_on_sub_message,
.uuid = -1,
},
.publication_forwarder =
fio_subscribe(.filter = -1, .udata1 = r,
.on_message = redis_on_internal_publish),
.cmd_forwarder = fio_subscribe(.filter = -2, .udata1 = r,
.on_message = redis_on_internal_cmd),
.cmd_reply =
fio_subscribe(.filter = -10 - (uint32_t)getpid(), .udata1 = r,
.on_message = redis_on_internal_reply),
.address = ((char *)(r + 1) + (REDIS_READ_BUFFER * 2)),
.port =
((char *)(r + 1) + (REDIS_READ_BUFFER * 2) + args.address.len + 1),
.auth = ((char *)(r + 1) + (REDIS_READ_BUFFER * 2) + args.address.len +
args.port.len + 2),
.auth_len = args.auth.len,
.ref = 1,
.queue = FIO_LS_INIT(r->queue),
.lock = FIO_LOCK_INIT,
.lock_connection = FIO_LOCK_INIT,
.ping_int = args.ping_interval,
.flag = 1,
};
memcpy(r->address, args.address.data, args.address.len);
memcpy(r->port, args.port.data, args.port.len);
if (args.auth.len)
memcpy(r->auth, args.auth.data, args.auth.len);
fio_pubsub_attach(&r->en);
redis_on_facil_start(r);
fio_state_callback_add(FIO_CALL_IN_CHILD, redis_on_engine_fork, r);
fio_state_callback_add(FIO_CALL_ON_SHUTDOWN, redis_on_facil_shutdown, r);
/* if restarting */
fio_state_callback_add(FIO_CALL_PRE_START, redis_on_facil_start, r);
FIO_LOG_DEBUG("Redis engine initialized %p", (void *)r);
return &r->en;
}
/* *****************************************************************************
Redis Engine Destruction
***************************************************************************** */
void redis_engine_destroy(fio_pubsub_engine_s *engine) {
redis_engine_s *r = (redis_engine_s *)engine;
r->flag = 0;
fio_pubsub_detach(&r->en);
fio_state_callback_remove(FIO_CALL_IN_CHILD, redis_on_engine_fork, r);
fio_state_callback_remove(FIO_CALL_ON_SHUTDOWN, redis_on_facil_shutdown, r);
fio_state_callback_remove(FIO_CALL_PRE_START, redis_on_facil_start, r);
FIO_LOG_DEBUG("Redis engine destroyed %p", (void *)r);
redis_free(r);
}