blob: 459a401d23129c98f1ce7cc49bb098b0dad19e8d [file] [log] [blame] [raw]
#include "facil.h"
#include "pubsub.h"
#include "redis_engine.h"
static void reporter_subscribe(const pubsub_engine_s *eng, FIOBJ channel,
uint8_t use_pattern) {
fprintf(stderr, "(%u) + subscribing to %s\n", getpid(),
fiobj_obj2cstr(channel).data);
(void)eng;
(void)use_pattern;
}
static void reporter_unsubscribe(const pubsub_engine_s *eng, FIOBJ channel,
uint8_t use_pattern) {
fprintf(stderr, "(%u) - unsubscribing to %s\n", getpid(),
fiobj_obj2cstr(channel).data);
(void)eng;
(void)use_pattern;
}
static int reporter_publish(const pubsub_engine_s *eng, FIOBJ channel,
FIOBJ msg) {
fprintf(stderr, "(%u) publishing to %s\n", getpid(),
fiobj_obj2cstr(channel).data);
(void)eng;
(void)msg;
return 0;
}
pubsub_engine_s REPORTER = {
.subscribe = reporter_subscribe,
.unsubscribe = reporter_unsubscribe,
.publish = reporter_publish,
};
void my_on_message(pubsub_message_s *msg) {
fio_cstr_s s = fiobj_obj2cstr(msg->channel);
if (FIOBJ_TYPE(msg->message) == FIOBJ_T_STRING) {
fprintf(stderr, "Got message from %s: %s\n", s.data,
fiobj_obj2cstr(msg->message).data);
} else {
fprintf(stderr, "Got message from %s, with subscription %p\n", s.data,
(void *)fiobj_obj2num(msg->message));
pubsub_sub_pt sub =
pubsub_find_sub(.channel = msg->channel, .on_message = my_on_message,
.udata1 = msg->udata1, .udata2 = msg->udata2);
pubsub_unsubscribe(sub);
}
}
void perfrom_sub(void *a) {
if (facil_parent_pid() != getpid()) {
(void)a;
} else {
FIOBJ ch = fiobj_str_new("my channel", 10);
FIOBJ msg = fiobj_num_new(
(intptr_t)pubsub_subscribe(.channel = ch, .on_message = my_on_message,
.udata1 = a, .udata2 = NULL));
pubsub_publish(.channel = ch, .message = ch);
pubsub_publish(.channel = ch, .message = msg);
fiobj_free(msg);
fiobj_free(ch);
}
}
int main(void) {
// PUBSUB_DEFAULT_ENGINE = redis_engine_create(.address = "localhost");
pubsub_engine_register(&REPORTER);
facil_run_every(100, 1, perfrom_sub, NULL, NULL);
facil_run(.threads = 4, .processes = 4);
pubsub_engine_deregister(&REPORTER);
return 0;
}