blob: 895edcc027c66e014d9780caaec72bca063c32fc [file] [log] [blame] [raw]
#ifndef WEBSOCKET_SHOOTOUT_PUBSUB_H
/**
This test suite emulates the websocket shoot testing requirements, except that
the JSON will not be parsed.
Here are a few possible test commands:
{"type":"broadcast",
websocket-bench broadcast ws://127.0.0.1:3000/ --concurrent 10 \
--sample-size 100 --server-type binary --step-size 1000 --limit-percentile 95 \
--limit-rtt 250ms --initial-clients 1000
websocket-bench broadcast ws://127.0.0.1:3000/ --concurrent 10 \
--sample-size 100 --step-size 1000 --limit-percentile 95 \
--limit-rtt 250ms --initial-clients 1000
ab -n 1000000 -c 2000 -k http://127.0.0.1:3000/
wrk -c400 -d5 -t12 http://localhost:3000/
I also run it for a while using the following Ruby script:
sleep 10 while `websocket-bench broadcast ws://127.0.0.1:3000/ --concurrent 10 \
--sample-size 100 --server-type binary --step-size 1000 --limit-percentile 95 \
--limit-rtt 250ms --initial-clients 1000`.tap {|s| puts s; puts "zzz..."}
*/
#define WEBSOCKET_SHOOTOUT_PUBSUB_H
#include "pubsub.h" // includes the "http.h" header
#include "websockets.h" // includes the "http.h" header
#include <errno.h>
#include <string.h>
static void ws_on_pubsub_message(pubsub_sub_pt pubsub, pubsub_message_s msg,
void *udata) {
protocol_s *pr = facil_protocol_try_lock((intptr_t)udata, FIO_PR_LOCK_STATE);
while (!pr) {
if (errno == EBADF) {
fwrite("X", 1, 1, stderr);
return;
}
// fwrite("-", 1, 1, stderr);
defer_thread_throttle(1);
pr = facil_protocol_try_lock((intptr_t)udata, FIO_PR_LOCK_STATE);
}
websocket_write((ws_s *)pr, msg.msg.data, msg.msg.len, msg.channel.len == 4);
facil_protocol_unlock(pr, FIO_PR_LOCK_STATE);
return;
(void)pubsub;
}
static void ws_register(ws_s *ws) {
pubsub_sub_pt *pubsubs = malloc(sizeof(*pubsubs) * 2);
websocket_udata_set(ws, pubsubs);
pubsubs[0] = pubsub_subscribe(.on_message = ws_on_pubsub_message,
.channel.name = "text",
.udata = (void *)websocket_uuid(ws));
pubsubs[1] = pubsub_subscribe(.on_message = ws_on_pubsub_message,
.channel.name = "binary",
.udata = (void *)websocket_uuid(ws));
}
static void ws_unregister(ws_s *ws) {
pubsub_sub_pt *pubsubs = websocket_udata(ws);
if (pubsubs) {
pubsub_unsubscribe(pubsubs[0]);
pubsub_unsubscribe(pubsubs[1]);
free(pubsubs);
}
}
static void ws_shootout_pubsub(ws_s *ws, char *data, size_t size,
uint8_t is_text) {
(void)(ws);
(void)(is_text);
(void)(size);
if (data[0] == 'b') {
pubsub_publish(.channel.name = "binary", .msg.data = data, .msg.len = size);
fwrite(".", 1, 1, stderr);
data[0] = 'r';
websocket_write(ws, data, size, 0);
} else if (data[9] == 'b') {
fwrite(".", 1, 1, stderr);
pubsub_publish(.channel.name = "text", .msg.data = data, .msg.len = size);
/* send result */
size = size + (25 - 19);
void *buff = malloc(size);
memcpy(buff, "{\"type\":\"broadcastResult\"", 25);
memcpy((void *)(((uintptr_t)buff) + 25), data + 19, size - 25);
websocket_write(ws, buff, size, 1);
free(buff);
} else {
/* perform echo */
websocket_write(ws, data, size, is_text);
}
}
/*
A simple Hello World HTTP response emulation. Test with:
ab -n 1000000 -c 200 -k http://127.0.0.1:3000/
*/
static void http_websocket_shotout_pubsub(http_request_s *request) {
// to log we will start a response.
http_response_s *response = http_response_create(request);
http_response_write_header(response, .name = "Server", .name_len = 6,
.value = "facil.io/0.4.0", .value_len = 14);
// http_response_log_start(&response);
// upgrade requests to broadcast will have the following properties:
if (request->upgrade) {
// Websocket upgrade will use our existing response (never leak responses).
websocket_upgrade(.request = request, .response = response,
.on_open = ws_register, .on_close = ws_unregister,
.on_message = ws_shootout_pubsub);
return;
}
http_response_write_header(response, .name = "Content-Type", .name_len = 12,
.value = "text/plain", .value_len = 10);
http_response_write_body(response,
"This is a Websocket-Shootout application!", 41);
http_response_finish(response);
}
static void listen2shootout_pubsub(const char *port, char is_logging) {
if (http_listen(port, NULL, .on_request = http_websocket_shotout_pubsub,
.log_static = is_logging))
perror("Couldn't initiate Websocket Shootout service"), exit(1);
}
#endif