blob: ca60e0a01a50b29813e64b34738f3705d63f9904 [file] [log] [blame] [raw]
/*
Copyright: Boaz segev, 2017
License: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#ifndef H_FACIL_PUBSUB_H
/**
This pub/sub API is designed to unload pub/sub stress from external messanging
systems onto the local process.
For example, the NULL pub/sub engine, which is routed to the facil_cluster
engine, will only publish a single message per process instead of a message per
client, allowing the cluster communication channel to be less crowded when
possible.
This should allow pub/sub engines, such as Redis, to spread their workload
between all of an application's processes, enhancing overall performance.
*/
#define H_FACIL_PUBSUB_H
#include "facil.h"
/* support C++ */
#ifdef __cplusplus
extern "C" {
#endif
#ifndef FIO_PUBBSUB_MAX_CHANNEL_LEN
#define FIO_PUBBSUB_MAX_CHANNEL_LEN 1024
#endif
/** An opaque pointer used to identify a subscription. */
typedef struct pubsub_sub_s *pubsub_sub_pt;
/** A pub/sub engine data structure. See details later on. */
typedef struct pubsub_engine_s pubsub_engine_s;
/** The information a "client" (callback) receives. */
typedef struct pubsub_message_s {
/** The pub/sub engine farwarding this message. */
pubsub_engine_s const *engine;
/** The pub/sub target channnel. */
struct {
char *name;
uint32_t len;
} channel;
/** The pub/sub message. */
struct {
char *data;
uint32_t len;
} msg;
/** indicates that pattern matching was used. */
unsigned use_pattern : 1;
/** The subscription that prompted the message to be routed to the client. */
pubsub_sub_pt subscription;
/** Client opaque data pointer (from the `subscribe`) function call. */
void *udata1;
/** Client opaque data pointer (from the `subscribe`) function call. */
void *udata2;
} pubsub_message_s;
/** The arguments used for `pubsub_subscribe` or `pubsub_find_sub`. */
struct pubsub_subscribe_args {
/** The pub/sub engine to use. NULL defaults to the local cluster engine. */
pubsub_engine_s const *engine;
/** The channel to subscribe to. */
struct {
char *name;
uint32_t len;
} channel;
/** The on message callback. the `*msg` pointer is to a temporary object. */
void (*on_message)(pubsub_message_s *msg);
/** An optional callback for when a subscription is fully canceled. */
void (*on_unsubscribe)(void *udata1, void *udata2);
/** Opaque user data#1 */
void *udata1;
/** Opaque user data#2 .. using two allows allocation to be avoided. */
void *udata2;
/** Use pattern matching for channel subscription. */
unsigned use_pattern : 1;
};
/** The arguments used for `pubsub_publish`. */
struct pubsub_publish_args {
/** The pub/sub engine to use. NULL defaults to the local cluster engine. */
pubsub_engine_s const *engine;
/** The channel to publish to. */
struct {
char *name;
uint32_t len;
} channel;
/** The data being pushed. */
struct {
char *data;
uint32_t len;
} msg;
/** Use pattern matching for channel publication. */
unsigned use_pattern : 1;
/**
* Push the message to the whole cluster, using the cluster engine.
* Always TRUE unless an engine was specified.
*/
unsigned push2cluster : 1;
};
/**
* Subscribes to a specific channel.
*
* Returns a subscription pointer or NULL (failure).
*/ pubsub_sub_pt pubsub_subscribe(struct pubsub_subscribe_args);
#define pubsub_subscribe(...) \
pubsub_subscribe((struct pubsub_subscribe_args){__VA_ARGS__})
/**
* This helper searches for an existing subscription.
*
* Use with care, NEVER call `pubsub_unsubscribe` more times than you have
* called `pubsub_subscribe`, since the subscription handle memory is realesed
* onnce the reference count reaches 0.
*
* Returns a subscription pointer or NULL (none found).
*/
pubsub_sub_pt pubsub_find_sub(struct pubsub_subscribe_args);
#define pubsub_find_sub(...) \
pubsub_find_sub((struct pubsub_subscribe_args){__VA_ARGS__})
/**
* Unsubscribes from a specific channel.
*
* Returns 0 on success and -1 on failure.
*/
void pubsub_unsubscribe(pubsub_sub_pt subscription);
/**
* Publishes a message to a channel belonging to a pub/sub service (engine).
*
* Returns 0 on success and -1 on failure.
*/
int pubsub_publish(struct pubsub_publish_args);
#define pubsub_publish(...) \
pubsub_publish((struct pubsub_publish_args){__VA_ARGS__})
/**
* defers message hadling if it can't be performed (i.e., resource is busy) or
* should be fragmented (allowing large tasks to be broken down).
*
* This should only be called from within the `on_message` callback.
*
* It's recommended that the `on_message` callback return immediately following
* this function call, as code might run concurrently.
*
* Uses reference counting for zero copy.
*
* It's impossible to use a different `on_message` callbck without resorting to
* memory allocations... so when in need, manage routing withing the
* `on_message` callback.
*/
void pubsub_defer(pubsub_message_s *msg);
/**
* Pub/Sub services (engines) MUST provide the listed function pointers.
*
* When an engine received a message to publish, they should call the
* `pubsub_eng_distribute` function. i.e.:
*
* pubsub_engine_distribute(
* .engine = self,
* .channel.name = "channel 1",
* .channel.len = 9,
* .msg.data = "hello",
* .msg.len = 5,
* .push2cluster = self->push2cluster,
* .use_pattern = 0 );
*
* Engines MUST survive until the pub/sub service is finished using them and
* there are no more subscriptions.
*/
struct pubsub_engine_s {
/** Should return 0 on success and -1 on failure. */
int (*subscribe)(const pubsub_engine_s *eng, const char *ch, size_t ch_len,
uint8_t use_pattern);
/** Return value is ignored. */
void (*unsubscribe)(const pubsub_engine_s *eng, const char *ch, size_t ch_len,
uint8_t use_pattern);
/** Should return 0 on success and -1 on failure. */
int (*publish)(const pubsub_engine_s *eng, const char *ch, size_t ch_len,
const char *msg, size_t msg_len, uint8_t use_pattern);
/** Set to TRUE (1) if published messages should propegate to the cluster. */
unsigned push2cluster : 1;
};
/** The default pub/sub engine.
* This engine performs pub/sub within a group of processes (process cluster).
*
* The process cluser is initialized by the `facil_run` command with `processes`
* set to more than 1.
*/
extern const pubsub_engine_s *PUBSUB_CLUSTER_ENGINE;
/** An engine that performs pub/sub only within a single process. */
extern const pubsub_engine_s *PUBSUB_PROCESS_ENGINE;
/** Allows process wide changes to the default Pub/Sub Engine.
* Setting a new default before calling `facil_run` will change the default for
* the whole process cluster.
*/
extern const pubsub_engine_s *PUBSUB_DEFAULT_ENGINE;
/**
* The function used by engines to distribute received messages.
* The `udata*` and `subscription` fields are ignored.
*/
void pubsub_engine_distribute(pubsub_message_s msg);
#define pubsub_engine_distribute(...) \
pubsub_engine_distribute((pubsub_message_s){__VA_ARGS__})
/**
* Engines can ask facil.io to resubscribe to all active channels.
*
* This allows engines that lost their connection to their Pub/Sub service to
* resubscribe all the currently active channels with the new connection.
*
* CAUTION: This is an evented task... try not to free the engine's memory while
* resubscriptions are under way...
*/
void pubsub_engine_resubscribe(pubsub_engine_s *eng);
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* H_FACIL_PUBSUB_H */