blob: 32cd4ac75721e07871c1e733db7e76df3974ebad [file] [log] [blame] [raw]
/*
Copyright: Boaz segev, 2017
License: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#ifndef H_FACIL_PUBSUB_H
/**
* This pub/sub API is designed to unload pub/sub stress from external messanging
* systems onto the local process.
* For example, the NULL pub/sub engine, which is routed to the facil_cluster
* engine, will only publish a single message per process instead of a message
per
* client, allowing the cluster communication channel to be less crowded when
* possible.
*
* This should allow pub/sub engines, such as Redis, to spread their workload
* between all of an application's processes, enhancing overall performance.
*/
#define H_FACIL_PUBSUB_H
#include "facil.h"
#include "fiobj.h"
/* support C++ */
#ifdef __cplusplus
extern "C" {
#endif
#ifndef FIO_PUBBSUB_MAX_CHANNEL_LEN
#define FIO_PUBBSUB_MAX_CHANNEL_LEN 1024
#endif
/** An opaque pointer used to identify a subscription. */
typedef struct pubsub_sub_s *pubsub_sub_pt;
/** A pub/sub engine data structure. See details later on. */
typedef struct pubsub_engine_s pubsub_engine_s;
/** Publishing and on_message callback arguments. */
typedef struct pubsub_message_s {
/** The pub/sub engine that should be used to farward this message. */
pubsub_engine_s const *engine;
/** The pub/sub target channnel. */
fiobj_s *channel;
/** The pub/sub message. */
fiobj_s *message;
/** The subscription that prompted the message to be routed to the client. */
pubsub_sub_pt subscription;
/** Client opaque data pointer (from the `subscribe`) function call. */
void *udata1;
/** Client opaque data pointer (from the `subscribe`) function call. */
void *udata2;
} pubsub_message_s;
/** The arguments used for `pubsub_subscribe` or `pubsub_find_sub`. */
struct pubsub_subscribe_args {
/** The channel namr used for the subscription. */
fiobj_s *channel;
/** The on message callback. the `*msg` pointer is to a temporary object. */
void (*on_message)(pubsub_message_s *msg);
/** An optional callback for when a subscription is fully canceled. */
void (*on_unsubscribe)(void *udata1, void *udata2);
/** Opaque user data#1 */
void *udata1;
/** Opaque user data#2 .. using two allows allocation to be avoided. */
void *udata2;
/** Use pattern matching for channel subscription. */
unsigned use_pattern : 1;
};
/**
* Subscribes to a specific channel.
*
* Returns a subscription pointer or NULL (failure).
*/
pubsub_sub_pt pubsub_subscribe(struct pubsub_subscribe_args);
#define pubsub_subscribe(...) \
pubsub_subscribe((struct pubsub_subscribe_args){__VA_ARGS__})
/**
* This helper searches for an existing subscription.
*
* Use with care, NEVER call `pubsub_unsubscribe` more times than you have
* called `pubsub_subscribe`, since the subscription handle memory is realesed
* onnce the reference count reaches 0.
*
* Returns a subscription pointer or NULL (none found).
*/
pubsub_sub_pt pubsub_find_sub(struct pubsub_subscribe_args);
#define pubsub_find_sub(...) \
pubsub_find_sub((struct pubsub_subscribe_args){__VA_ARGS__})
/**
* Unsubscribes from a specific subscription.
*
* Note: This should be called exactly the same number times as
* `pubsub_subscribe`. Any less and a memory leak might occur. Any more, and the
* program might crash.
*
* Returns 0 on success and -1 on failure.
*/
int pubsub_unsubscribe(pubsub_sub_pt subscription);
/**
* Publishes a message to a channel belonging to a pub/sub service (engine).
*
* Returns 0 on success and -1 on failure (i.e., no channel, no message or no
* known subscriptions).
*/
int pubsub_publish(struct pubsub_message_s);
#define pubsub_publish(...) \
pubsub_publish((struct pubsub_message_s){__VA_ARGS__})
/**
* defers message hadling if it can't be performed (i.e., resource is busy) or
* should be fragmented (allowing large tasks to be broken down).
*
* This should only be called from within the `on_message` callback.
*
* It's recommended that the `on_message` callback return immediately following
* this function call, as code might run concurrently.
*
* Uses reference counting for zero copy.
*
* It's impossible to use a different `on_message` callbck without resorting to
* memory allocations... so when in need, manage routing withing the
* `on_message` callback.
*/
void pubsub_defer(pubsub_message_s *msg);
/**
* Pub/Sub services (engines) MUST provide the listed function pointers.
*
* Engines should also register using the `pubsub_engine_register` function.
*
* Engines should deregister, before being destroyed, by using the
* `pubsub_engine_deregister` function.
*
* When an engine received a message to publish, they should call the
* `pubsub_publish` function with the engine to which the message is forwarded.
* i.e.:
*
* pubsub_engine_distribute(
* .engine = PUBSUB_PROCESS_ENGINE,
* .channel = channel_name,
* .message = msg_body );
*
*/
struct pubsub_engine_s {
/* Must subscribe channel. Failures are ignored. */
void (*subscribe)(const pubsub_engine_s *eng, fiobj_s *channel,
uint8_t use_pattern);
/* Must unsubscribe channel. Failures are ignored. */
void (*unsubscribe)(const pubsub_engine_s *eng, fiobj_s *channel,
uint8_t use_pattern);
/** Should return 0 on success and -1 on failure. */
int (*publish)(const pubsub_engine_s *eng, fiobj_s *channel, fiobj_s *msg);
};
/** Registers an engine, so it's callback can be called. */
void pubsub_engine_register(pubsub_engine_s *engine);
/** Unregisters an engine, so it could be safely destroyed. */
void pubsub_engine_deregister(pubsub_engine_s *engine);
/** The default pub/sub engine.
* This engine performs pub/sub within a group of processes (process cluster).
*
* The process cluser is initialized by the `facil_run` command with `processes`
* set to more than 1.
*/
extern const pubsub_engine_s *PUBSUB_CLUSTER_ENGINE;
/** An engine that performs pub/sub only within a single process. */
extern const pubsub_engine_s *PUBSUB_PROCESS_ENGINE;
/** Allows process wide changes to the default Pub/Sub Engine.
* Setting a new default before calling `facil_run` will change the default for
* the whole process cluster.
*/
extern const pubsub_engine_s *PUBSUB_DEFAULT_ENGINE;
/**
* The function used by engines to distribute received messages.
* The `udata*` and `subscription` fields are ignored.
*
* The `.engine` field should be either PUBSUB_PROCESS_ENGINE or
* PUBSUB_CLUSTER_ENGINE
*/
void pubsub_engine_distribute(pubsub_message_s msg);
#define pubsub_engine_distribute(...) \
pubsub_engine_distribute((pubsub_message_s){__VA_ARGS__})
/**
* Engines can ask facil.io to resubscribe to all active channels.
*
* This allows engines that lost their connection to their Pub/Sub service to
* resubscribe all the currently active channels with the new connection.
*
* CAUTION: This is an evented task... try not to free the engine's memory while
* resubscriptions are under way...
*/
void pubsub_engine_resubscribe(pubsub_engine_s *eng);
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* H_FACIL_PUBSUB_H */