blob: e6dba6507b8c0fc1c93fc0666f6b8896aa5fabdc [file] [log] [blame] [raw]
/**
This example creates a simple chat application that uses Redis to sync pub/sub
across machines.
To test this application, you will need a Redis server (defaults to address
"localhost" and port "6379").
To run the test, run the application twice, on two different ports. Clients on
each port will share their pub/sub events with clients from the other port - fun
times :-)
i.e.:
Use a JavaScript console to connect to the websockets, e.g., using the
following JavaScript code:
// run 1st client app on port 3000.
ws = new WebSocket("ws://localhost:3000/Mitchel");
ws.onmessage = function(e) { console.log(e.data); };
ws.onclose = function(e) { console.log("closed"); };
ws.onopen = function(e) { e.target.send("Yo!"); };
// run 2nd client app on port 3030.
ws = new WebSocket("ws://localhost:3030/Johana");
ws.onmessage = function(e) { console.log(e.data); };
ws.onclose = function(e) { console.log("closed"); };
ws.onopen = function(e) { e.target.send("Brut."); };
It's possible to use SSE (Server-Sent-Events / EventSource) for listening in on
the chat:
var source = new EventSource("/Watcher");
source.addEventListener('message', (e) => { console.log(e.data); });
source.addEventListener('open', (e) => { console.log("SSE Connection open."); });
source.addEventListener('close', (e) => { console.log("SSE Connection lost."); });
Remember that published messages will now be printed to the console both by
Mitchel and Johana, which means messages will be delivered twice unless using
two different browser windows.
*/
#include "fio_cli.h"
#include "redis_engine.h"
#include "websockets.h"
#include <string.h>
/* *****************************************************************************
Websocket Pub/Sub
***************************************************************************** */
/* The chat's Pub/Sub channel name. It is created at the start of `main` and
 * freed at its end; the connection callbacks below pass it to the
 * subscribe / publish calls. */
static FIOBJ CHAT_CHANNEL;
/**
 * Websocket `on_open` callback: logs the new connection and subscribes it to
 * the chat channel.
 */
static void on_open_websocket(ws_s *ws) {
  /* the connection's udata holds the client's nickname (a FIOBJ String) */
  FIOBJ nickname = (FIOBJ)websocket_udata(ws);
  fio_cstr_s name = fiobj_obj2cstr(nickname);
  fprintf(stderr, "(%d) %s connected to the chat service.\n", getpid(),
          name.data);
  websocket_subscribe(ws, .channel = CHAT_CHANNEL, .force_text = 1);
}
/* Free the nickname, if any. */
static void on_close_websocket(intptr_t uuid, void *udata) {
fiobj_free((FIOBJ)udata);
(void)uuid;
}
/**
 * Websocket `on_message` callback: builds a "<nickname>: <data>" message and
 * publishes it to the chat channel.
 */
static void handle_websocket_messages(ws_s *ws, char *data, size_t size,
                                      uint8_t is_text) {
  (void)is_text; /* this callback doesn't look at the text/binary flag */
  /* the connection's udata holds the client's nickname (a FIOBJ String) */
  FIOBJ nickname = (FIOBJ)websocket_udata(ws);
  fio_cstr_s name = fiobj_obj2cstr(nickname);
  /* the requested size covers the nickname, the 2 byte separator and the
   * incoming payload */
  FIOBJ message = fiobj_str_buf(name.len + 2 + size);
  fiobj_str_join(message, nickname);
  fiobj_str_write(message, ": ", 2);
  fiobj_str_write(message, data, size);
  /* publish to the chat channel, then drop our temporary message object */
  facil_publish(.channel = CHAT_CHANNEL, .message = message);
  fiobj_free(message);
}
/* *****************************************************************************
SSE Pub/Sub
***************************************************************************** */
/**
 * The (optional) SSE `on_open` callback, called once the EventSource
 * connection is established.
 *
 * Logs the new listener, subscribes it to the chat channel and publishes a
 * message announcing that the listener joined.
 */
static void sse_on_open(http_sse_s *sse) {
  /* the connection's udata holds the listener's nickname (a FIOBJ String) */
  FIOBJ nickname = (FIOBJ)sse->udata;
  fprintf(stderr, "(%d) %s connected to the chat service using SSE.\n",
          getpid(), fiobj_obj2cstr(nickname).data);
  /* a ping will be sent every 10 seconds of inactivity */
  http_sse_set_timout(sse, 10);
  http_sse_subscribe(sse, .channel = CHAT_CHANNEL);
  /* let everyone know the listener is here */
  FIOBJ announcement = fiobj_str_buf(64);
  fiobj_str_join(announcement, nickname);
  fiobj_str_write(announcement,
                  " joind the chat, but they're just listening...", 46);
  facil_publish(.channel = CHAT_CHANNEL, .message = announcement);
  fiobj_free(announcement);
}
/**
 * The (optional) SSE `on_shutdown` callback, called if a connection is still
 * open while the server is shutting down (called before `on_close`).
 *
 * Writes a final "Shutdown" event carrying the data "Goodbye".
 */
static void sse_on_shutdown(http_sse_s *sse) {
  http_sse_write(sse,
                 .event = {.data = "Shutdown", .len = 8},
                 .data = {.data = "Goodbye", .len = 7});
}
/**
 * The (optional) SSE `on_close` callback, called once a connection is
 * terminated or failed to be established.
 *
 * Frees the nickname (a FIOBJ) stored in the connection's `udata` field.
 */
static void sse_on_close(http_sse_s *sse) {
  FIOBJ nickname = (FIOBJ)sse->udata;
  fiobj_free(nickname);
}
/* *****************************************************************************
HTTP Handling (Upgrading to Websocket)
***************************************************************************** */
/**
 * Handles normal HTTP requests (registered as `on_request`): sets a "Server"
 * header and a content type, then answers with a short notice.
 */
static void answer_http_request(http_s *h) {
  fio_cstr_s server_name = {.name = "Server", .len = 6};
  fio_cstr_s server_value = {.value = "facil.example", .len = 13};
  http_set_header2(h, server_name, server_value);
  /* the content type is whatever MIME type is registered for "txt" */
  http_set_header(h, HTTP_HEADER_CONTENT_TYPE, http_mimetype_find("txt", 3));
  /* this both sends the response and frees the http handler. */
  http_send_body(h, "This is a Websocket chatroom example using Redis.", 49);
}
/**
 * Handles HTTP upgrade requests (registered as `on_upgrade`).
 *
 * `pr` / `len` hold the name of the requested protocol. A 9 byte name whose
 * second byte is 'e' is treated as a Websocket request and a 3 byte name
 * starting with 's' is treated as an SSE request. Any other request is
 * answered with a 400 error.
 *
 * The client's nickname is taken from the request path (without its first
 * byte), falling back to "guest" when the path is a single byte or empty. It
 * is only allocated after the protocol was recognized, so the error path has
 * nothing to free (allocating it up front leaked it on that path). The
 * nickname is then handed to the connection as `udata`; the `on_close`
 * callbacks passed below are the ones that free it.
 */
static void answer_http_upgrade(http_s *h, char *pr, size_t len) {
  /* the two tests can't both match, since they require different lengths */
  const int is_websocket = (len == 9 && pr[1] == 'e');
  const int is_sse = (len == 3 && pr[0] == 's');
  if (!is_websocket && !is_sse) {
    /* fallback on error: reject before allocating anything */
    http_send_error(h, 400);
    return;
  }
  /* Assign a nickname */
  fio_cstr_s path = fiobj_obj2cstr(h->path);
  FIOBJ nickname = (path.len > 1)
                       ? fiobj_str_new(path.data + 1, path.len - 1)
                       : fiobj_str_new("guest", 5);
  if (is_websocket) {
    /* attempt Websocket upgrade */
    http_upgrade2ws(
        h, .on_open = on_open_websocket, .on_close = on_close_websocket,
        .on_message = handle_websocket_messages, .udata = (void *)nickname);
    return;
  }
  /* attempt SSE (EventSource) upgrade */
  http_upgrade2sse(h, .on_open = sse_on_open, .on_shutdown = sse_on_shutdown,
                   .on_close = sse_on_close, .udata = (void *)nickname);
}
/*
Read available command line details using "-?".
-p <port> : defaults port 3000.
-t <threads> : defaults to 1 (use 0 for automatic CPU core test/set).
-w <processes> : defaults to 1 (use 0 for automatic CPU core test/set).
-v : sets verbosity (HTTP logging) on.
-r <address> <port> : a spece delimited couplet for the Redis address and port.
*/
int main(int argc, char const *argv[]) {
const char *port = "3000";
const char *public_folder = NULL;
const char *redis_url = NULL;
uint32_t threads = 0;
uint32_t workers = 0;
uint8_t print_log = 0;
CHAT_CHANNEL = fiobj_str_new("chat", 4);
/* **** Command line arguments **** */
fio_cli_start(
argc, argv, 0,
"This is a facil.io example application.\n"
"\nThis example demonstrates Pub/Sub using a Chat application.\n"
"Optional Redis support is also demonstrated.\n"
"\nThe following arguments are supported:",
"-port -p port number to listen to. defaults port 3000", FIO_CLI_TYPE_INT,
"-workers -w number of processes to use.", FIO_CLI_TYPE_INT,
"-threads -t number of threads per process.", FIO_CLI_TYPE_INT,
"-log -v request verbosity (logging).", FIO_CLI_TYPE_BOOL,
"-public -www public folder, for static file service.",
"-redis -r an optional Redis URL server address. i.e.: "
"redis://user:password@localhost:6379/");
fio_cli_set_default("-p", "3000");
fio_cli_set_default("-port", "3000");
port = fio_cli_get("-p");
if (fio_cli_get("-www")) {
public_folder = fio_cli_get("-www");
fprintf(stderr, "* serving static files from:%s\n", public_folder);
}
threads = fio_cli_get_i("-t");
workers = fio_cli_get_i("-w");
print_log = fio_cli_get_bool("-v");
redis_url = fio_cli_get("-redis");
fio_cli_end();
if (!threads && !workers)
threads = workers = 1;
/* **** actual code **** */
if (redis_url) {
fprintf(stderr, "* Connecting to Redis for Pub/Sub.\n");
http_url_s url = http_url_parse(redis_url, strlen(redis_url));
if (!url.port.data) {
url.port.data = "6379";
url.port.len = 4;
}
fprintf(stderr, "Redis address: %.*s : %.*s %s%.*s\n", (int)url.host.len,
url.host.data, (int)url.port.len, url.port.data,
(url.password.data ? "password " : ""), (int)url.password.len,
url.password.data ? url.password.data : "");
FACIL_PUBSUB_DEFAULT =
redis_engine_create(.address = url.host, .port = url.port,
.auth = url.password, .ping_interval = 40);
if (!FACIL_PUBSUB_DEFAULT) {
perror("\nERROR: couldn't initialize Redis engine.\n");
exit(-2);
}
printf("* Redis engine initialized.\n");
} else {
printf("* Redis engine details missing, "
"using native-local pub/sub engine.\n");
}
websocket_optimize4broadcasts(WEBSOCKET_OPTIMIZE_PUBSUB_TEXT, 1);
if (http_listen(port, NULL, .on_request = answer_http_request,
.on_upgrade = answer_http_upgrade, .log = print_log,
.public_folder = public_folder) == -1)
perror("Couldn't initiate Websocket service"), exit(1);
facil_run(.threads = threads, .processes = workers);
if (FACIL_PUBSUB_DEFAULT != FACIL_PUBSUB_CLUSTER) {
redis_engine_destroy(FACIL_PUBSUB_DEFAULT);
FACIL_PUBSUB_DEFAULT = FACIL_PUBSUB_CLUSTER;
}
fiobj_free(CHAT_CHANNEL);
}
/* *****************************************************************************
While Redis is broken
***************************************************************************** */
/* `NIL` expands to nothing. It is placed between the function's name and its
 * parameter list, so the name isn't directly followed by `(`.
 * NOTE(review): presumably `redis_engine_create` is also a function-like macro
 * in "redis_engine.h" (`main` calls it with named arguments) and this keeps
 * that macro from expanding in the definition - confirm against the header. */
#define NIL
/* Weak fallback definition: ignores its arguments, creates no engine and
 * always returns NULL.
 * NOTE(review): presumably a non-weak definition from the library takes
 * precedence at link time when available - confirm against the build. */
#pragma weak redis_engine_create
pubsub_engine_s *redis_engine_create NIL(struct redis_engine_create_args a) {
(void)a;
return NULL;
}
/* Weak fallback definition: ignores the engine it receives and does nothing. */
#pragma weak redis_engine_destroy
void redis_engine_destroy(pubsub_engine_s *a) {
  /* intentionally empty */
  (void)a;
}