/* blob: 9bfe0f70027b51b124d4ce892bd2b293cbcbcde2 [file] [log] [blame] [raw] */
#include <sys/types.h>
#include <sys/mman.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <ctype.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <assert.h>
#include <fcntl.h>
/*
 * Create an anonymous, private mapping of `length` bytes that is mapped
 * with PROT_WRITE only.
 *
 * The optional flags (MAP_NOSYNC, MAP_NOCORE, MAP_NORESERVE,
 * MAP_UNINITIALIZED) are OR-ed in only when the platform headers define
 * them, so the same source builds on systems that lack some of them.
 *
 * Returns the address of the mapping, or MAP_FAILED when mmap() fails.
 */
static void *map_write_only(size_t length) {
    int map_flags = MAP_PRIVATE | MAP_ANON;

#ifdef MAP_NOSYNC
    map_flags |= MAP_NOSYNC;
#endif
#ifdef MAP_NOCORE
    map_flags |= MAP_NOCORE;
#endif
#ifdef MAP_NORESERVE
    map_flags |= MAP_NORESERVE;
#endif
#ifdef MAP_UNINITIALIZED
    map_flags |= MAP_UNINITIALIZED;
#endif

    /* Anonymous mapping: no backing descriptor, so fd is -1 and offset 0. */
    return mmap(NULL, length, PROT_WRITE, map_flags, -1, 0);
}
/*
 * Create a non-blocking TCP/IPv4 socket and start connecting it to `addr`
 * without waiting for the connection to complete.
 *
 * The socket is made non-blocking either atomically via SOCK_NONBLOCK or,
 * where that flag is unavailable, with fcntl(O_NONBLOCK). SO_REUSEADDR,
 * SO_REUSEPORT (when the platform defines it) and SO_KEEPALIVE are
 * requested on a best-effort basis; failures to set them are ignored.
 *
 * Returns the descriptor (always below FD_SETSIZE, so it can be used with
 * select()) when connect() succeeds immediately or reports
 * EINPROGRESS/EINTR. Returns -1 after printing a diagnostic to stderr on
 * any other failure; the socket is closed in that case.
 */
static int open_connection_background(const struct sockaddr_in *addr) {
#ifdef SOCK_NONBLOCK
    int fd = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
#else
    int fd = socket(PF_INET, SOCK_STREAM, 0);
#endif
    if (fd == -1) {
        perror("socket");
        return -1;
    }
    /* Descriptors at or above FD_SETSIZE cannot be stored in an fd_set. */
    if (fd >= FD_SETSIZE) {
        fprintf(stderr, "Warning: fd %d too large, ignoring\n", fd);
        close(fd);
        return -1;
    }
#ifndef SOCK_NONBLOCK
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl get flag error");
        close(fd);
        return -1;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl F_SETFL");
        close(fd);
        return -1;
    }
#endif

    /* Socket options are best-effort: a failure here is not fatal. */
    int value = 1;
    (void)setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof value);
#ifdef SO_REUSEPORT
    (void)setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &value, sizeof value);
#endif
    (void)setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &value, sizeof value);

    /*
     * EINPROGRESS and EINTR both mean the connection attempt continues
     * asynchronously; the caller finds out the result once the socket
     * becomes writable.
     */
    if (connect(fd, (const struct sockaddr *)addr, sizeof *addr) == -1 &&
        errno != EINTR && errno != EINPROGRESS) {
        perror("connect");
        close(fd);
        return -1;
    }
    return fd;
}
/*
 * Parse `text` as a base-10 integer with no trailing characters.
 * Stores the value in *out and returns 0 on success; returns -1 when the
 * string is empty, has trailing junk, or is out of range for long.
 */
static int parse_decimal_arg(const char *text, long *out) {
    char *end;

    errno = 0;
    long value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE)
        return -1;
    *out = value;
    return 0;
}

/*
 * Open a fresh background connection for slot `index`.
 *
 * The caller must already have closed the slot's previous descriptor and
 * cleared it from the fd sets. On success the new descriptor is stored in
 * sockets[index], added to `write_set`, and *max_fd is raised if needed.
 * On failure sockets[index] becomes -1, a diagnostic is printed, and
 * `write_set` is left untouched (FD_SET with a negative descriptor is
 * undefined behaviour). The slot's written-byte counter is reset either way.
 *
 * Returns the new descriptor, or -1 on failure.
 */
static int reopen_connection_slot(const struct sockaddr_in *addr, int index,
                                  int *sockets, int *nbytes_written,
                                  fd_set *write_set, int *max_fd) {
    int fd = open_connection_background(addr);

    sockets[index] = fd;
    nbytes_written[index] = 0;
    if (fd == -1) {
        fprintf(stderr, "Failed to reopen socket for index %d\n", index);
        return -1;
    }
    FD_SET(fd, write_set);
    if (fd > *max_fd)
        *max_fd = fd;
    return fd;
}

/*
 * Usage: <prog> <server> <port> <count>
 *
 * Opens up to <count> non-blocking connections to <server>:<port>, writes
 * a fixed payload on each one, then reads and discards whatever arrives
 * until the peer closes the connection. A connection that hits a read or
 * write error is reopened; if select() reports no activity for 600
 * seconds, every live connection is reopened. The loop ends when no
 * connections remain.
 *
 * Returns 0 when all connections have finished, 1 on usage or setup errors.
 */
int main(int argc, char **argv) {
    if (argc != 4) {
        printf("Usage: %s <server> <port> 1-%d\n Open ports to receive data\n", argv[0], FD_SETSIZE);
        exit(1);
    }

    long port;
    if (parse_decimal_arg(argv[2], &port) != 0 || port < 1 || port > 65535) {
        fprintf(stderr, "%s: Invalid port %s\n", argv[0], argv[2]);
        return 1;
    }
    struct sockaddr_in addr = {
        .sin_family = AF_INET, .sin_port = htons((in_port_t)port)
    };
    /* Only numeric IPv4 addresses are accepted; no name lookup is done. */
    if (inet_pton(AF_INET, argv[1], &addr.sin_addr) < 1) {
        fprintf(stderr, "%s: Failed to resolve server address %s\n", argv[0], argv[1]);
        return 1;
    }

    static const char payload[] = "binary 32Ki\nyes\n";
    const size_t payload_len = sizeof payload - 1; /* terminating NUL is not sent */

    long requested;
    if (parse_decimal_arg(argv[3], &requested) != 0) {
        fprintf(stderr, "%s: Invalid number of ports %s\n", argv[0], argv[3]);
        return 1;
    }
    if (requested < 1) {
        fprintf(stderr, "Number of ports should be bigger than 0\n");
        exit(1);
    }
    if (requested > FD_SETSIZE) {
        fprintf(stderr, "Number of ports can't be larger than %d\n", FD_SETSIZE);
        return 1;
    }
    int nports = (int)requested;

    /* Receive buffer: at least 8192 bytes, or one page if pages are larger. */
    long page_size = sysconf(_SC_PAGESIZE);
    size_t buffer_size = page_size > 8192 ? (size_t)page_size : 8192;
    void *buffer = map_write_only(buffer_size);
    if (buffer == MAP_FAILED) {
        /* Fall back to the heap when the mapping cannot be created. */
        buffer = malloc(buffer_size);
        if (!buffer) {
            perror("malloc");
            return 1;
        }
    }

    int max_fd = -1;
    int *sockets = malloc((size_t)nports * sizeof *sockets);
    if (!sockets) {
        perror("malloc");
        return 1;
    }

    /* Start as many connections as possible; stop at the first failure. */
    int i = 0;
    while (i < nports) {
        int fd = open_connection_background(&addr);
        if (fd == -1)
            break;
        if (fd > max_fd)
            max_fd = fd;
        sockets[i++] = fd;
    }
    if (i < 1) {
        fprintf(stderr, "%s: Failed to create any connection\n", argv[0]);
        exit(1);
    }
    if (i < nports) {
        nports = i;
        /* A failed shrink leaves the original array valid, so keep using it. */
        int *shrunk = realloc(sockets, (size_t)nports * sizeof *sockets);
        if (shrunk)
            sockets = shrunk;
    }

    /* Per-slot count of payload bytes already written. */
    int *nbytes_written = calloc((size_t)nports, sizeof *nbytes_written);
    if (!nbytes_written) {
        perror("calloc");
        return 1;
    }

    /*
     * Master sets copied before every select() call. A descriptor sits in
     * the write set until its payload is fully sent, then moves to the
     * read set.
     */
    fd_set orig_rfdset, orig_wfdset;
    FD_ZERO(&orig_rfdset);
    FD_ZERO(&orig_wfdset);
    for (i = 0; i < nports; i++) {
        if (sockets[i] != -1)
            FD_SET(sockets[i], &orig_wfdset);
    }

    /* nports counts live connections from here on; slots keep their index. */
    int orig_nports = nports;
    while (nports > 0) {
        fd_set rfdset = orig_rfdset;
        fd_set wfdset = orig_wfdset;
        struct timeval timeout = { .tv_sec = 600 };
        int select_return = select(max_fd + 1, &rfdset, &wfdset, NULL, &timeout);
        if (select_return == -1) {
            if (errno == EINTR)
                continue;
            perror("select error");
            exit(1);
        }

        if (!select_return) {
            /* Nothing happened within the timeout: restart every live slot. */
            fprintf(stderr, "%s: Timed out waiting for %d remaining connection(s), reconnecting\n",
                    argv[0], nports);
            for (i = 0; i < orig_nports; i++) {
                int fd = sockets[i];
                if (fd == -1)
                    continue;
                close(fd);
                FD_CLR(fd, &orig_rfdset);
                FD_CLR(fd, &orig_wfdset);
                if (reopen_connection_slot(&addr, i, sockets, nbytes_written,
                                           &orig_wfdset, &max_fd) == -1)
                    nports--;
            }
            continue;
        }

        /* select_return counts ready descriptors; stop once all are handled. */
        for (i = 0; i < orig_nports && select_return > 0; i++) {
            int fd = sockets[i];
            if (fd == -1)
                continue;

            if (FD_ISSET(fd, &wfdset)) {
                int ss = nbytes_written[i];
                assert(ss >= 0 && (size_t)ss < sizeof payload);
                ssize_t s;
                do {
                    s = write(fd, payload + ss, payload_len - (size_t)ss);
                } while (s < 0 && errno == EINTR);
                if (s < 0) {
                    fprintf(stderr, "index %d fd %d write error: %s\n", i, fd, strerror(errno));
                    close(fd);
                    FD_CLR(fd, &orig_wfdset);
                    if (reopen_connection_slot(&addr, i, sockets, nbytes_written,
                                               &orig_wfdset, &max_fd) == -1) {
                        nports--;
                        fprintf(stderr, "%d connection(s) left\n", nports);
                    }
                } else {
                    nbytes_written[i] += (int)s;
                    assert((size_t)nbytes_written[i] <= payload_len);
                    if ((size_t)nbytes_written[i] == payload_len) {
                        /* Payload fully sent: switch this slot to reading. */
                        FD_CLR(fd, &orig_wfdset);
                        FD_SET(fd, &orig_rfdset);
                    }
                }
                select_return--;
            } else if (FD_ISSET(fd, &rfdset)) {
                ssize_t s;
                do {
                    s = read(fd, buffer, buffer_size);
                } while (s < 0 && errno == EINTR);
                if (s < 0) {
                    fprintf(stderr, "index %d fd %d read error: %s\n", i, fd, strerror(errno));
                    close(fd);
                    FD_CLR(fd, &orig_rfdset);
                    if (reopen_connection_slot(&addr, i, sockets, nbytes_written,
                                               &orig_wfdset, &max_fd) == -1) {
                        nports--;
                        fprintf(stderr, "%d connection(s) left\n", nports);
                    }
                } else if (!s) {
                    /* Peer closed the connection: retire the slot for good. */
                    fprintf(stderr, "index %d fd %d EOF\n", i, fd);
                    close(fd);
                    sockets[i] = -1;
                    nports--;
                    FD_CLR(fd, &orig_rfdset);
                    fprintf(stderr, "%d connection(s) left\n", nports);
                } else {
                    /* Received data is discarded; only its size is reported. */
                    fprintf(stderr, "index %d fd %d read %d byte\n",
                            i, fd, (int)s);
                }
                select_return--;
            }
        }
    }
    return 0;
}