blob: 5635ae12c7fb39fec56a385512745d29abe8b600 [file] [log] [blame] [raw]
#include "redis_engine.h"
#include "redis_connection.h"
#include "resp.h"
#include <string.h>
/* *****************************************************************************
Data Structures / State
***************************************************************************** */
typedef struct {
pubsub_engine_s engine;
intptr_t sub;
intptr_t pub;
char *address;
char *port;
} redis_engine_s;
/* *****************************************************************************
Engine Bridge
***************************************************************************** */
static void on_pubsub(intptr_t uuid, const resp_array_s *msg, void *udata) {
if (msg->len != 3 || msg->array[0]->type != RESP_STRING ||
msg->array[1]->type != RESP_STRING ||
msg->array[2]->type != RESP_STRING) {
fprintf(
stderr,
"ERROR: WTF!? Redis engine pubsub received an unparsable message.\n");
return;
}
pubsub_engine_distribute(
.engine = udata,
.channel = {.name = (char *)resp_obj2str(msg->array[1])->string,
.len = resp_obj2str(msg->array[1])->len},
.msg = {.data = (char *)resp_obj2str(msg->array[2])->string,
.len = resp_obj2str(msg->array[2])->len});
(void)uuid;
}
/* *****************************************************************************
Connections
***************************************************************************** */
/**** Don't do this at home, kids (monkey patching in C) ****/
#include "fio_list.h"
typedef struct {
protocol_s protocol;
/* the RESP parser */
resp_parser_pt parser;
/* The callbacks list.
* We don't neet locks since we'll be using the facil.io protocol locking
* mechanism.
*/
fio_list_s callbacks;
/* The on_open callback */
void (*on_open)(intptr_t uuid);
/* The on_open callback */
void (*on_close)(void *udata);
void *on_close_udata;
/* The on_pubsub callback */
void (*on_pubsub)(intptr_t uuid, const resp_array_s *msg, void *udata);
void *on_pubsub_udata;
/* Fallback / default handler for messages. */
void (*fallback)(intptr_t uuid, resp_object_s *msg, void *udata);
void *fallback_udata;
} redis_protocol_s;
static void (*original_on_close)(protocol_s *) = NULL;
static void on_close(protocol_s *p) {
redis_protocol_s *r = (void *)p;
pubsub_engine_resubscribe(r->on_pubsub_udata);
original_on_close(p);
}
/**** Okay, you can open your eyes now ****/
static protocol_s *on_connect(intptr_t uuid, void *en_) {
protocol_s *pr = redis_create_protocol(uuid, NULL);
redis_on_pubsub2(pr, on_pubsub, en_);
pubsub_engine_resubscribe(en_);
original_on_close = pr->on_close;
pr->on_close = on_close;
return pr;
}
static void on_fail(intptr_t uuid, void *en_) {
facil_run_every(50, 1, (void (*)(void *))pubsub_engine_resubscribe,
(void *)en_, NULL);
(void)uuid;
}
#define CONNECT_SUB(e) \
e->sub = \
facil_connect(.address = e->address, .port = e->port, \
.on_connect = on_connect, .udata = e, .on_fail = on_fail)
#define CONNECT_PUB(e) \
e->pub = facil_connect(.address = e->address, .port = e->port, \
.on_connect = redis_create_protocol, .udata = e, )
/* *****************************************************************************
Callbacks
***************************************************************************** */
/** Should return 0 on success and -1 on failure. */
static int subscribe(const pubsub_engine_s *eng, const char *ch, size_t ch_len,
uint8_t use_pattern) {
redis_engine_s *e = (redis_engine_s *)eng;
if (!sock_isvalid(e->sub)) {
CONNECT_SUB(e);
if (!sock_isvalid(e->sub)) {
fprintf(stderr,
"ERROR: (RedisEngine) cannot connect to Subscription service at "
"%s:%s\n",
e->address, e->port);
return -1;
}
fprintf(stderr,
"ERROR: (RedisEngine) lost connection to Redis, reconnectiong at "
"%s:%s\n",
e->address, e->port);
}
resp_object_s *cmd = resp_arr2obj(2, NULL);
if (!cmd) {
return -1;
}
resp_obj2arr(cmd)->array[0] = use_pattern ? resp_str2obj("PSUBSCRIBE", 10)
: resp_str2obj("SUBSCRIBE", 9);
resp_obj2arr(cmd)->array[1] =
(ch ? resp_str2obj(ch, ch_len) : resp_nil2obj());
return redis_send(.cmd = cmd, .uuid = e->sub, .move = 1);
}
/** Return value is ignored. */
static void unsubscribe(const pubsub_engine_s *eng, const char *ch,
size_t ch_len, uint8_t use_pattern) {
redis_engine_s *e = (redis_engine_s *)eng;
if (!sock_isvalid(e->sub))
return;
resp_object_s *cmd = resp_arr2obj(2, NULL);
if (!cmd)
return;
resp_obj2arr(cmd)->array[0] = use_pattern ? resp_str2obj("PUNSUBSCRIBE", 12)
: resp_str2obj("UNSUBSCRIBE", 11);
resp_obj2arr(cmd)->array[1] =
(ch ? resp_str2obj(ch, ch_len) : resp_nil2obj());
redis_send(.cmd = cmd, .uuid = e->sub, .move = 1);
}
/** Should return 0 on success and -1 on failure. */
static int publish(const pubsub_engine_s *eng, const char *ch, size_t ch_len,
const char *msg, size_t msg_len, uint8_t use_pattern) {
redis_engine_s *e = (redis_engine_s *)eng;
if (!defer_fork_is_active() || !msg || use_pattern)
return -1;
if (!ch)
return -1;
// if (!sock_isvalid(e->sub)){
// CONNECT_SUB(e);
// }
if (!sock_isvalid(e->pub)) {
CONNECT_PUB(e);
if (!sock_isvalid(e->pub)) {
fprintf(stderr, "ERROR: (RedisEngine) cannot connect to Redis at %s:%s",
e->address, e->port);
return -1;
}
}
resp_object_s *cmd = resp_arr2obj(3, NULL);
if (!cmd)
return -1;
resp_obj2arr(cmd)->array[0] = resp_str2obj("PUBLISH", 7);
resp_obj2arr(cmd)->array[1] = resp_str2obj(ch, ch_len);
resp_obj2arr(cmd)->array[2] = resp_str2obj(msg, msg_len);
return redis_send(.cmd = cmd, .uuid = e->pub, .move = 1);
}
/* *****************************************************************************
Creation
***************************************************************************** */
static void initialize_engine(void *en_, void *ig) {
redis_engine_s *e = (redis_engine_s *)en_;
(void)ig;
CONNECT_SUB(e);
CONNECT_PUB(e);
}
/**
See the {pubsub.h} file for documentation about engines.
function names speak for themselves ;-)
*/
pubsub_engine_s *redis_engine_create(const char *address, const char *port) {
if (!port) {
return NULL;
}
size_t addr_len = address ? strlen(address) : 0;
size_t port_len = strlen(port);
redis_engine_s *e = malloc(sizeof(*e) + addr_len + port_len + 2);
*e = (redis_engine_s){.engine = {.subscribe = subscribe,
.unsubscribe = unsubscribe,
.publish = publish},
.address = (char *)(e + 1),
.port = ((char *)(e + 1) + addr_len + 1)};
if (address)
memcpy(e->address, address, addr_len);
else
e->address = NULL;
e->address[addr_len] = 0;
memcpy(e->port, port, port_len);
e->port[port_len] = 0;
defer(initialize_engine, e, NULL);
return (pubsub_engine_s *)e;
}
/**
See the {redis_connection.h} file for documentation about Redis connections and
the {resp.h} file to learn more about sending RESP messages.
function names speak for themselves ;-)
*/
intptr_t redis_engine_get_redis(pubsub_engine_s *en_) {
redis_engine_s *e = (redis_engine_s *)en_;
if (!defer_fork_is_active())
return -1;
if (!sock_isvalid(e->pub))
CONNECT_PUB(e);
return e->pub;
}
/**
See the {pubsub.h} file for documentation about engines.
function names speak for themselves ;-)
*/
void redis_engine_destroy(pubsub_engine_s *engine) {
redis_engine_s *e = (redis_engine_s *)engine;
sock_close(e->pub);
sock_close(e->sub);
free(e);
}