| /* |
| copyright: Boaz Segev, 2016 |
| license: MIT |
| |
| Feel free to copy, use and enjoy according to the license provided. |
| */ |
| #ifndef _GNU_SOURCE |
| #define _GNU_SOURCE |
| #endif |
| |
| #include "libsock.h" |
| |
| #include <errno.h> |
| #include <fcntl.h> |
| #include <limits.h> |
| #include <netdb.h> |
| #include <stdio.h> |
| #include <stdlib.h> /* malloc / free / exit / atexit */ |
| #include <string.h> |
| #include <sys/mman.h> |
| #include <sys/resource.h> |
| #include <sys/socket.h> |
| #include <sys/time.h> |
| #include <sys/types.h> |
| #include <time.h> |
| #include <unistd.h> /* read / write / close / pread / sysconf */ |
| |
| /* ***************************************************************************** |
| Use the spinlocks defined in "spnlock.h". |
| |
| For portability, it's possible to copy "spnlock.h" directly after this line. |
| */ |
| #include "spnlock.h" |
| |
| /* ***************************************************************************** |
| Support the `libreact` on_close callback, if it exists. |
| */ |
| |
| #pragma weak reactor_on_close |
| void reactor_on_close(intptr_t uuid) {} |
| #pragma weak reactor_remove |
| int reactor_remove(intptr_t uuid) { return -1; } |
| |
| /* ***************************************************************************** |
| Support timeout setting. |
| */ |
| #pragma weak sock_touch |
| void sock_touch(intptr_t uuid) {} |
| |
| /* ***************************************************************************** |
| Support event based `write` scheduling. |
| */ |
| #pragma weak async_run |
| int async_run(void (*task)(void *), void *arg) { return -1; } |
| |
| /* ***************************************************************************** |
| OS Sendfile settings. |
| */ |
| |
| #ifndef USE_SENDFILE |
| |
| #if defined(__linux__) /* linux sendfile works */ |
| #include <sys/sendfile.h> |
| #define USE_SENDFILE 1 |
| #elif defined(__unix__) /* BSD sendfile should work, but isn't tested */ |
| #include <sys/uio.h> |
| #define USE_SENDFILE 0 |
| #elif defined(__APPLE__) /* Is the apple sendfile still broken? */ |
| #include <sys/uio.h> |
| #define USE_SENDFILE 1 |
| #else /* sendfile might not be available - always set to 0 */ |
| #define USE_SENDFILE 0 |
| #endif |
| |
| #endif |
| |
| /* ***************************************************************************** |
| Buffer and socket map memory allocation. Defaults to mmap. |
| */ |
| #ifndef USE_MALLOC |
| #define USE_MALLOC 0 |
| #endif |
| |
| /* ***************************************************************************** |
| The system call to `write` (non-blocking) can be deferred when using `libasync`. |
| |
| However, this will not prevent `sock_write` from cycling through the sockets and |
| flushing them (emulating a blocking write) when both the system and the |
| user-level buffers are full. |
| |
| Defaults to 1 (deferred). |
| */ |
| #ifndef SOCK_DELAY_WRITE |
| #define SOCK_DELAY_WRITE 1 |
| #endif |
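| |
| /* |
| Illustrative note (an addition, not part of the original source): the |
| compile-time switches above (USE_SENDFILE, USE_MALLOC, SOCK_DELAY_WRITE) can be |
| overridden from the compiler command line instead of editing this file, e.g.: |
| |
| cc -c libsock.c -D_GNU_SOURCE -DUSE_SENDFILE=0 -DUSE_MALLOC=1 -DSOCK_DELAY_WRITE=0 |
| |
| The exact build invocation is an assumption - adapt it to the project's build |
| system. |
| */ |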
| |
| /* ***************************************************************************** |
| Library related helper functions |
| */ |
| |
| /** |
| Sets a socket to non-blocking state. |
| */ |
| inline int sock_set_non_block(int fd) // Thanks to Bjorn Reese |
| { |
| /* If they have O_NONBLOCK, use the Posix way to do it */ |
| #if defined(O_NONBLOCK) |
| /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */ |
| int flags; |
| if (-1 == (flags = fcntl(fd, F_GETFL, 0))) |
| flags = 0; |
| // printf("flags initial value was %d\n", flags); |
| return fcntl(fd, F_SETFL, flags | O_NONBLOCK); |
| #else |
| /* Otherwise, use the old way of doing it */ |
| static int flags = 1; |
| return ioctl(fd, FIONBIO, &flags); /* FIONBIO: set non-blocking I/O */ |
| #endif |
| } |
| |
| /** |
| Gets the maximum number of file descriptors this process can be allowed to |
| access (== maximum fd value + 1). |
| */ |
| ssize_t sock_max_capacity(void) { |
| // get current limits |
| static ssize_t flim = 0; |
| if (flim) |
| return flim; |
| #ifdef _SC_OPEN_MAX |
| flim = sysconf(_SC_OPEN_MAX); |
| #elif defined(OPEN_MAX) |
| flim = OPEN_MAX; |
| #endif |
| // try to maximize limits - collect max and set to max |
| struct rlimit rlim = {0}; |
| getrlimit(RLIMIT_NOFILE, &rlim); |
| // printf("Meximum open files are %llu out of %llu\n", rlim.rlim_cur, |
| // rlim.rlim_max); |
| #if defined(__APPLE__) /* Apple's getrlimit is broken. */ |
| rlim.rlim_cur = rlim.rlim_max >= OPEN_MAX ? OPEN_MAX : rlim.rlim_max; |
| #else |
| rlim.rlim_cur = rlim.rlim_max; |
| #endif |
| |
| setrlimit(RLIMIT_NOFILE, &rlim); |
| getrlimit(RLIMIT_NOFILE, &rlim); |
| // printf("Meximum open files are %llu out of %llu\n", rlim.rlim_cur, |
| // rlim.rlim_max); |
| // if the current limit is higher than it was, update |
| if (flim < rlim.rlim_cur) |
| flim = rlim.rlim_cur; |
| // return what we have |
| return flim; |
| } |
| |
| /* ***************************************************************************** |
| Library Core Data |
| */ |
| |
| typedef struct { |
| /** write buffer - a linked list */ |
| sock_packet_s *packet; |
| /** The fd UUID for the current connection */ |
| fduuid_u fduuid; |
| /** the amount of data sent from the current buffer packet */ |
| uint32_t sent; |
| /** state lock */ |
| spn_lock_i lock; |
| /* -- state flags -- */ |
| /** Connection is open */ |
| unsigned open : 1; |
| /** indicates that the connection should be closed. */ |
| unsigned close : 1; |
| /** indicates that the connection experienced an error. */ |
| unsigned err : 1; |
| /** future flags. */ |
| unsigned rsv : 5; |
| /* -- placement enforces padding to guarantee memory alignment -- */ |
| /** Read/Write hooks. */ |
| sock_rw_hook_s *rw_hooks; |
| } fd_info_s; |
| |
| #define LIB_SOCK_STATE_OPEN 1 |
| #define LIB_SOCK_STATE_CLOSED 0 |
| |
| static fd_info_s *fd_info = NULL; |
| static size_t fd_capacity = 0; |
| |
| #define uuid2info(uuid) fd_info[sock_uuid2fd(uuid)] |
| #define is_valid(uuid) \ |
| (fd_info[sock_uuid2fd(uuid)].fduuid.data.counter == \ |
| ((fduuid_u)(uuid)).data.counter && \ |
| uuid2info(uuid).open) |
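| |
| /* |
| Illustrative note (an addition, not part of the original source): each uuid |
| packs the fd together with a reuse counter, and `set_fd` below bumps that |
| counter whenever the fd is (re)opened. A stale uuid that survived a close / |
| accept cycle therefore fails `is_valid` instead of silently addressing the new |
| connection that recycled the same fd: |
| |
| intptr_t old_uuid = sock_fd2uuid(fd); // counter == n while the fd is open |
| // ... the fd is closed and later reused by a newly accepted connection ... |
| sock_isvalid(old_uuid); // 0 - the counter is now n + 1 |
| */ |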
| |
| static struct { |
| sock_packet_s *pool; |
| sock_packet_s *allocated; |
| spn_lock_i lock; |
| } buffer_pool = {.lock = SPN_LOCK_INIT}; |
| |
| #define BUFFER_PACKET_REAL_SIZE (sizeof(sock_packet_s) + BUFFER_PACKET_SIZE) |
| |
| /* reset a socket state */ |
| static inline void set_fd(int fd, unsigned int state) { |
| fd_info_s old_data; |
| // lock and update |
| spn_lock(&fd_info[fd].lock); |
| old_data = fd_info[fd]; |
| fd_info[fd] = (fd_info_s){ |
| .fduuid.data.counter = fd_info[fd].fduuid.data.counter + state, |
| .fduuid.data.fd = fd, |
| .lock = fd_info[fd].lock, |
| .open = state, |
| }; |
| // unlock |
| spn_unlock(&fd_info[fd].lock); |
| // should this be called within the lock? No - avoid calling other |
| // functions while holding a spinlock. |
| if (old_data.rw_hooks && old_data.rw_hooks->on_clear) |
| old_data.rw_hooks->on_clear(old_data.fduuid.uuid, old_data.rw_hooks); |
| // clear old data |
| if (old_data.packet) |
| sock_free_packet(old_data.packet); |
| // call callback if exists |
| if (old_data.open) { |
| // if (state == LIB_SOCK_STATE_OPEN) |
| // printf( |
| // "STRONG FD COLLISION PROTECTION: A new connection was accepted " |
| // "while the old one was marked as open.\n"); |
| reactor_remove(old_data.fduuid.uuid); |
| reactor_on_close(old_data.fduuid.uuid); |
| } |
| } |
| |
| /** |
| Destroys the library data. |
| |
| This function is registered with `atexit` during initialization and should |
| not be called directly. |
| */ |
| static void destroy_lib_data(void) { |
| if (fd_info) { |
| // close and clear all sockets, leaving fd_capacity intact for the munmap below |
| for (size_t i = 0; i < fd_capacity; i++) { |
| // if (fd_info[i].open) { |
| // fprintf(stderr, "Socket %lu is marked as open\n", i); |
| // } |
| set_fd(i, LIB_SOCK_STATE_CLOSED); |
| } |
| #if USE_MALLOC == 1 |
| free(fd_info); |
| #else |
| munmap(fd_info, (BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL) + |
| (sizeof(fd_info_s) * fd_capacity)); |
| #endif |
| } |
| fd_info = NULL; |
| buffer_pool.pool = NULL; |
| buffer_pool.allocated = NULL; |
| buffer_pool.lock = SPN_LOCK_INIT; |
| } |
| |
| /** |
| Initializes the library. |
| |
| Call this function before calling any `libsock` functions. |
| */ |
| static void sock_lib_init(void) { |
| if (fd_info) |
| return; |
| |
| fd_capacity = sock_max_capacity(); |
| size_t fd_map_mem_size = sizeof(fd_info_s) * fd_capacity; |
| size_t buffer_mem_size = BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL; |
| |
| void *buff_mem; |
| #if USE_MALLOC == 1 |
| buff_mem = malloc(fd_map_mem_size + buffer_mem_size); |
| if (buff_mem == NULL) { |
| perror("Couldn't initialize libsock - not enough memory? "); |
| exit(1); |
| } |
| #else |
| buff_mem = mmap(NULL, fd_map_mem_size + buffer_mem_size, |
| PROT_READ | PROT_WRITE | PROT_EXEC, |
| MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); |
| // MAP_SHARED | MAP_ANONYMOUS, -1, 0); |
| if (buff_mem == MAP_FAILED || buff_mem == NULL) { |
| perror("Couldn't initialize libsock - not enough memory? "); |
| exit(1); |
| } |
| #endif |
| fd_info = buff_mem; |
| for (size_t i = 0; i < fd_capacity; i++) { |
| fd_info[i] = (fd_info_s){.lock = SPN_LOCK_INIT}; |
| spn_unlock(&fd_info[i].lock); |
| } |
| /* initialize pool */ |
| buffer_pool.allocated = buff_mem + fd_map_mem_size; |
| buffer_pool.pool = buffer_pool.allocated; |
| sock_packet_s *pos = buffer_pool.pool; |
| for (size_t i = 0; i < BUFFER_PACKET_POOL - 1; i++) { |
| *pos = (sock_packet_s){ |
| .metadata.next = (void *)(((uintptr_t)pos) + BUFFER_PACKET_REAL_SIZE), |
| }; |
| pos = pos->metadata.next; |
| } |
| pos->metadata.next = 0; |
| /* deallocate and manage on exit */ |
| atexit(destroy_lib_data); |
| #ifdef DEBUG |
| fprintf(stderr, "\nInitialized libsock for %lu sockets, " |
| "each one requires %lu bytes.\n" |
| "overall ovearhead: %lu bytes.\n" |
| "Initialized packet pool for %d elements, " |
| "each one %lu bytes.\n" |
| "overall buffer ovearhead: %lu bytes.\n" |
| "=== Total: %lu bytes ===\n\n", |
| fd_capacity, sizeof(*fd_info), sizeof(*fd_info) * fd_capacity, |
| BUFFER_PACKET_POOL, BUFFER_PACKET_REAL_SIZE, |
| BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL, |
| (BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL) + |
| (sizeof(*fd_info) * fd_capacity)); |
| #endif |
| } |
| |
| #define review_lib() \ |
| if (fd_info == NULL) \ |
| sock_lib_init(); |
| |
| /* ***************************************************************************** |
| Read / Write internals |
| */ |
| |
| #define ERR_OK (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOTCONN) |
| #define ERR_TRY_AGAIN (errno == EINTR) |
| |
| static inline int sock_flush_fd_failed(int fd) { |
| sock_free_packet(fd_info[fd].packet); |
| fd_info[fd].packet = NULL; |
| fd_info[fd].close = 1; |
| fd_info[fd].err = 1; |
| return 0; |
| } |
| |
| #if USE_SENDFILE == 1 |
| |
| #if defined(__linux__) /* linux sendfile API */ |
| static inline int sock_flush_os_sendfile(int fd) { |
| ssize_t sent; /* must be signed - sendfile64 returns -1 on error */ |
| sock_packet_s *packet = fd_info[fd].packet; |
| sent = |
| sendfile64(fd, (int)((ssize_t)packet->buffer), &packet->metadata.offset, |
| packet->length - fd_info[fd].sent); |
| |
| if (sent < 0) { |
| if (ERR_OK) |
| return -1; |
| else if (ERR_TRY_AGAIN) |
| return 0; |
| else |
| return sock_flush_fd_failed(fd); |
| } |
| if (sent == 0) |
| fd_info[fd].sent = packet->length; |
| fd_info[fd].sent += sent; |
| return 0; |
| } |
| |
| #elif defined(__APPLE__) || defined(__unix__) /* BSD / Apple API */ |
| |
| static inline int sock_flush_os_sendfile(int fd) { |
| off_t act_sent; |
| sock_packet_s *packet = fd_info[fd].packet; |
| act_sent = packet->length - fd_info[fd].sent; |
| |
| #if defined(__APPLE__) |
| if (sendfile((int)((ssize_t)packet->buffer), fd, packet->metadata.offset, |
| &act_sent, NULL, 0) < 0 && |
| act_sent == 0) |
| #else |
| if (sendfile((int)((ssize_t)packet->buffer), fd, packet->metadata.offset, |
| (size_t)act_sent, NULL, &act_sent, 0) < 0 && |
| act_sent == 0) |
| #endif |
| { |
| if (ERR_OK) |
| return -1; |
| else if (ERR_TRY_AGAIN) |
| return 0; |
| else |
| return sock_flush_fd_failed(fd); |
| } |
| if (act_sent == 0) { |
| fd_info[fd].sent = packet->length; |
| return 0; |
| } |
| packet->metadata.offset += act_sent; |
| fd_info[fd].sent += act_sent; |
| return 0; |
| } |
| #endif |
| |
| #else |
| |
| static inline int sock_flush_os_sendfile(int fd) { return -1; } |
| |
| #endif |
| |
| static inline int sock_flush_fd(int fd) { |
| if (USE_SENDFILE && fd_info[fd].rw_hooks == NULL) |
| return sock_flush_os_sendfile(fd); |
| ssize_t sent; |
| sock_packet_s *packet = fd_info[fd].packet; |
| // how much data are we expecting to send...? |
| ssize_t i_exp = (BUFFER_PACKET_SIZE > packet->length) ? packet->length |
| : BUFFER_PACKET_SIZE; |
| |
| // read data into the internal buffer |
| if (packet->metadata.internal_flag == 0) { |
| ssize_t i_read; |
| i_read = pread((int)((ssize_t)packet->buffer), packet + 1, i_exp, |
| packet->metadata.offset); |
| if (i_read <= 0) { |
| fd_info[fd].sent = fd_info[fd].packet->length; |
| return 0; |
| } else { |
| packet->metadata.offset += i_read; |
| packet->metadata.internal_flag = 1; |
| } |
| } |
| // send the data |
| if (fd_info[fd].rw_hooks && fd_info[fd].rw_hooks->write) |
| sent = fd_info[fd].rw_hooks->write( |
| fd_info[fd].fduuid.uuid, (((void *)(packet + 1)) + fd_info[fd].sent), |
| i_exp - fd_info[fd].sent); |
| else |
| sent = write(fd, (((void *)(packet + 1)) + fd_info[fd].sent), |
| i_exp - fd_info[fd].sent); |
| // review result and update packet data |
| if (sent < 0) { |
| if (ERR_OK) |
| return -1; |
| else if (ERR_TRY_AGAIN) |
| return 0; |
| else |
| return sock_flush_fd_failed(fd); |
| } |
| fd_info[fd].sent += sent; |
| if (fd_info[fd].sent >= i_exp) { |
| packet->metadata.internal_flag = 0; |
| fd_info[fd].sent = 0; |
| packet->length -= i_exp; |
| } |
| return 0; |
| } |
| |
| static inline int sock_flush_data(int fd) { |
| ssize_t sent; |
| if (fd_info[fd].rw_hooks && fd_info[fd].rw_hooks->write) |
| sent = fd_info[fd].rw_hooks->write( |
| fd_info[fd].fduuid.uuid, fd_info[fd].packet->buffer + fd_info[fd].sent, |
| fd_info[fd].packet->length - fd_info[fd].sent); |
| else |
| sent = write(fd, fd_info[fd].packet->buffer + fd_info[fd].sent, |
| fd_info[fd].packet->length - fd_info[fd].sent); |
| if (sent < 0) { |
| if (ERR_OK) |
| return -1; |
| else if (ERR_TRY_AGAIN) |
| return 0; |
| else |
| return sock_flush_fd_failed(fd); |
| } |
| fd_info[fd].sent += sent; |
| return 0; |
| } |
| |
| static void sock_flush_unsafe(int fd) { |
| while (fd_info[fd].packet) { |
| if (fd_info[fd].packet->metadata.is_fd == 0) { |
| if (sock_flush_data(fd)) |
| return; |
| } else { |
| if (sock_flush_fd(fd)) |
| return; |
| } |
| if (fd_info[fd].packet && fd_info[fd].packet->length <= fd_info[fd].sent) { |
| sock_packet_s *packet = fd_info[fd].packet; |
| fd_info[fd].packet = packet->metadata.next; |
| packet->metadata.next = NULL; |
| fd_info[fd].sent = 0; |
| sock_free_packet(packet); |
| } |
| } |
| } |
| |
| #if SOCK_DELAY_WRITE == 1 |
| |
| static inline void sock_flush_schd(intptr_t uuid) { |
| if (async_run((void *)sock_flush, (void *)uuid) == -1) |
| goto fallback; |
| return; |
| fallback: |
| sock_flush_unsafe(sock_uuid2fd(uuid)); |
| } |
| |
| #define _write_to_sock() sock_flush_schd(sfd->fduuid.uuid) |
| |
| #else |
| |
| #define _write_to_sock() sock_flush_unsafe(fd) |
| |
| #endif |
| |
| static inline void sock_send_packet_unsafe(int fd, sock_packet_s *packet) { |
| fd_info_s *sfd = fd_info + fd; |
| if (sfd->packet == NULL) { |
| /* no queue, nothing to check */ |
| sfd->packet = packet; |
| _write_to_sock(); |
| return; |
| |
| } else if (packet->metadata.urgent == 0) { |
| /* not urgent, last in line */ |
| sock_packet_s *pos = sfd->packet; |
| while (pos->metadata.next) |
| pos = pos->metadata.next; |
| pos->metadata.next = packet; |
| _write_to_sock(); |
| return; |
| |
| } else { |
| /* urgent, find a spot we can interrupt */ |
| sock_packet_s **pos = &sfd->packet; |
| while (*pos && (*pos)->metadata.can_interrupt == 0) |
| pos = &(*pos)->metadata.next; |
| sock_packet_s *tail = *pos; |
| *pos = packet; |
| if (tail) { |
| pos = &packet->metadata.next; |
| while (*pos) |
| pos = &(*pos)->metadata.next; |
| *pos = tail; |
| } |
| } |
| _write_to_sock(); |
| } |
| |
| /* ***************************************************************************** |
| Listen |
| */ |
| |
| /** |
| Opens a listening non-blocking socket. Returns the socket's UUID. |
| */ |
| intptr_t sock_listen(const char *address, const char *port) { |
| review_lib(); |
| int srvfd; |
| // setup the address |
| struct addrinfo hints; |
| struct addrinfo *servinfo; // will point to the results |
| memset(&hints, 0, sizeof hints); // make sure the struct is empty |
| hints.ai_family = AF_UNSPEC; // don't care IPv4 or IPv6 |
| hints.ai_socktype = SOCK_STREAM; // TCP stream sockets |
| hints.ai_flags = AI_PASSIVE; // fill in my IP for me |
| if (getaddrinfo(address, port, &hints, &servinfo)) { |
| // perror("addr err"); |
| return -1; |
| } |
| // get the file descriptor |
| srvfd = |
| socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); |
| if (srvfd <= 0) { |
| // perror("socket err"); |
| freeaddrinfo(servinfo); |
| return -1; |
| } |
| // make sure the socket is non-blocking |
| if (sock_set_non_block(srvfd) < 0) { |
| // perror("couldn't set socket as non blocking! "); |
| freeaddrinfo(servinfo); |
| close(srvfd); |
| return -1; |
| } |
| // avoid the "address taken" |
| { |
| int optval = 1; |
| setsockopt(srvfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); |
| } |
| // bind the address to the socket |
| { |
| int bound = 0; |
| for (struct addrinfo *p = servinfo; p != NULL; p = p->ai_next) { |
| if (!bind(srvfd, p->ai_addr, p->ai_addrlen)) |
| bound = 1; |
| } |
| |
| if (!bound) { |
| // perror("bind err"); |
| freeaddrinfo(servinfo); |
| close(srvfd); |
| return -1; |
| } |
| } |
| freeaddrinfo(servinfo); |
| // listen in |
| if (listen(srvfd, SOMAXCONN) < 0) { |
| // perror("couldn't start listening"); |
| close(srvfd); |
| return -1; |
| } |
| set_fd(srvfd, LIB_SOCK_STATE_OPEN); |
| return fd_info[srvfd].fduuid.uuid; |
| } |
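| |
| /* |
| Usage sketch (an illustrative addition, not part of the original source): a |
| minimal, single-threaded accept loop built on the functions in this file. |
| `sock_write` is assumed to be the convenience wrapper declared in libsock.h; |
| error handling is reduced to a bare minimum. |
| |
| intptr_t srv = sock_listen(NULL, "3000"); |
| if (srv == -1) |
| perror("listen failed"); |
| for (;;) { |
| intptr_t client = sock_accept(srv); |
| if (client == -1) |
| continue; // nothing pending - the listening socket is non-blocking |
| char buf[1024]; |
| ssize_t len = sock_read(client, buf, sizeof(buf)); |
| if (len > 0) |
| sock_write(client, buf, len); // echo the data back |
| sock_close(client); |
| } |
| */ |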
| |
| /* ***************************************************************************** |
| Accept |
| */ |
| |
| intptr_t sock_accept(intptr_t srv_uuid) { |
| review_lib(); |
| static socklen_t cl_addrlen = 0; |
| int client; |
| #ifdef SOCK_NONBLOCK |
| client = accept4(sock_uuid2fd(srv_uuid), NULL, &cl_addrlen, SOCK_NONBLOCK); |
| if (client <= 0) |
| return -1; |
| #else |
| client = accept(sock_uuid2fd(srv_uuid), NULL, &cl_addrlen); |
| if (client <= 0) |
| return -1; |
| sock_set_non_block(client); |
| #endif |
| set_fd(client, LIB_SOCK_STATE_OPEN); |
| return fd_info[client].fduuid.uuid; |
| } |
| |
| /* ***************************************************************************** |
| Connect |
| */ |
| intptr_t sock_connect(char *address, char *port) { |
| review_lib(); |
| int fd; |
| // setup the address |
| struct addrinfo hints; |
| struct addrinfo *addrinfo; // will point to the results |
| memset(&hints, 0, sizeof hints); // make sure the struct is empty |
| hints.ai_family = AF_UNSPEC; // don't care IPv4 or IPv6 |
| hints.ai_socktype = SOCK_STREAM; // TCP stream sockets |
| hints.ai_flags = AI_PASSIVE; // fill in my IP for me |
| if (getaddrinfo(address, port, &hints, &addrinfo)) { |
| return -1; |
| } |
| // get the file descriptor |
| fd = |
| socket(addrinfo->ai_family, addrinfo->ai_socktype, addrinfo->ai_protocol); |
| if (fd <= 0) { |
| freeaddrinfo(addrinfo); |
| return -1; |
| } |
| // make sure the socket is non-blocking |
| if (sock_set_non_block(fd) < 0) { |
| freeaddrinfo(addrinfo); |
| close(fd); |
| return -1; |
| } |
| |
| if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) < 0 && |
| errno != EINPROGRESS) { |
| close(fd); |
| freeaddrinfo(addrinfo); |
| return -1; |
| } |
| freeaddrinfo(addrinfo); |
| set_fd(fd, LIB_SOCK_STATE_OPEN); |
| return fd_info[fd].fduuid.uuid; |
| } |
| |
| /* ***************************************************************************** |
| Open existing |
| */ |
| |
| intptr_t sock_open(int fd) { |
| review_lib(); |
| set_fd(fd, LIB_SOCK_STATE_OPEN); |
| return fd_info[fd].fduuid.uuid; |
| } |
| |
| /* ***************************************************************************** |
| Information about the socket |
| */ |
| |
| /** |
| Returns 1 if the uuid refers to a valid and open socket. |
| |
| Returns 0 if not. |
| */ |
| int sock_isvalid(intptr_t uuid) { return fd_info && is_valid(uuid); } |
| |
| /** |
| `sock_fd2uuid` takes an existing file descriptor `fd` and returns its active |
| `uuid`. |
| */ |
| intptr_t sock_fd2uuid(int fd) { |
| return (fd_info && fd_info[fd].open) ? fd_info[fd].fduuid.uuid : -1; |
| } |
| |
| /* ***************************************************************************** |
| Buffer API. |
| */ |
| |
| static inline sock_packet_s *sock_try_checkout_packet(void) { |
| sock_packet_s *packet; |
| spn_lock(&buffer_pool.lock); |
| packet = buffer_pool.pool; |
| if (packet) { |
| buffer_pool.pool = packet->metadata.next; |
| spn_unlock(&buffer_pool.lock); |
| *packet = (sock_packet_s){.buffer = packet + 1, .metadata.next = NULL}; |
| return packet; |
| } |
| spn_unlock(&buffer_pool.lock); |
| return packet; |
| } |
| |
| /** |
| Checks out a `sock_packet_s` from the packet pool, transferring ownership of |
| the memory to the calling function. The function will hang until a packet |
| becomes available, so never check out more than a single packet at a time. |
| */ |
| sock_packet_s *sock_checkout_packet(void) { |
| review_lib(); |
| sock_packet_s *packet = NULL; |
| for (;;) { |
| spn_lock(&buffer_pool.lock); |
| packet = buffer_pool.pool; |
| if (packet) { |
| buffer_pool.pool = packet->metadata.next; |
| spn_unlock(&buffer_pool.lock); |
| *packet = (sock_packet_s){.buffer = packet + 1, .metadata.next = NULL}; |
| return packet; |
| } |
| spn_unlock(&buffer_pool.lock); |
| reschedule_thread(); |
| sock_flush_all(); |
| } |
| } |
| /** |
| Attaches a packet to a socket's output buffer and calls `sock_flush` for the |
| socket. |
| */ |
| ssize_t sock_send_packet(intptr_t uuid, sock_packet_s *packet) { |
| if (!fd_info || !is_valid(uuid)) { |
| sock_free_packet(packet); |
| return -1; |
| } |
| spn_lock(&uuid2info(uuid).lock); |
| sock_send_packet_unsafe(sock_uuid2fd(uuid), packet); |
| spn_unlock(&uuid2info(uuid).lock); |
| return 0; |
| } |
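| |
| /* |
| Usage sketch (an illustrative addition, not part of the original source): |
| sending data through the packet API directly; `uuid` is assumed to be an open |
| connection's uuid. A checked-out packet's `buffer` already points at the |
| BUFFER_PACKET_SIZE bytes that follow the packet header, so the payload can be |
| copied straight into it. |
| |
| sock_packet_s *packet = sock_checkout_packet(); |
| const char msg[] = "hello"; |
| memcpy(packet->buffer, msg, sizeof(msg) - 1); |
| packet->length = sizeof(msg) - 1; |
| if (sock_send_packet(uuid, packet) == -1) |
| perror("send failed"); // the packet was already freed by sock_send_packet |
| */ |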
| |
| /** |
| Returns TRUE (non 0) if there is data waiting to be written to the socket in |
| the user-land buffer. |
| */ |
| _Bool sock_packets_pending(intptr_t uuid) { |
| return fd_info && uuid2info(uuid).packet != NULL; |
| } |
| |
| /** |
| Use `sock_free_packet` to free unused packets that were checked-out using |
| `sock_checkout_packet`. |
| */ |
| void sock_free_packet(sock_packet_s *packet) { |
| sock_packet_s *next = packet; |
| if (packet == NULL) |
| return; |
| for (;;) { |
| if (next->metadata.is_fd) { |
| if (next->metadata.keep_open == 0) |
| close((int)((ssize_t)next->buffer)); |
| } else if (next->metadata.external) |
| free(next->buffer); |
| if (next->metadata.next == NULL) |
| break; /* next will hold the last packet in the chain. */ |
| next = next->metadata.next; |
| } |
| spn_lock(&buffer_pool.lock); |
| next->metadata.next = buffer_pool.pool; |
| buffer_pool.pool = packet; |
| spn_unlock(&buffer_pool.lock); |
| } |
| |
| /* ***************************************************************************** |
| Reading |
| */ |
| ssize_t sock_read(intptr_t uuid, void *buf, size_t count) { |
| if (!fd_info || !is_valid(uuid)) { |
| errno = ENODEV; |
| return -1; |
| } |
| ssize_t i_read; |
| fd_info_s *sfd = fd_info + sock_uuid2fd(uuid); |
| if (sfd->rw_hooks && sfd->rw_hooks->read) |
| i_read = sfd->rw_hooks->read(uuid, buf, count); |
| else |
| i_read = read(sock_uuid2fd(uuid), buf, count); |
| |
| if (i_read > 0) { |
| sock_touch(uuid); |
| return i_read; |
| } |
| if (i_read == -1 && (ERR_OK || ERR_TRY_AGAIN)) |
| return 0; |
| // fprintf(stderr, "Read Error for %lu bytes from fd %d (closing))\n", |
| // count, |
| // sock_uuid2fd(uuid)); |
| sock_close(uuid); |
| return -1; |
| } |
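| |
| /* |
| Illustrative note (an addition, not part of the original source): `sock_read` |
| folds the would-block cases into a return value of 0, so callers distinguish |
| three outcomes rather than inspecting errno themselves: |
| |
| ssize_t r = sock_read(uuid, buf, sizeof(buf)); |
| if (r > 0) { |
| // process `r` bytes |
| } else if (r == 0) { |
| // no data available right now - try again later |
| } else { |
| // -1: the connection was closed, stop using this uuid |
| } |
| */ |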
| |
| /* ***************************************************************************** |
| Flushing |
| */ |
| |
| ssize_t sock_flush(intptr_t uuid) { |
| if (!fd_info || !is_valid(uuid)) |
| return -1; |
| if (uuid2info(uuid).packet == NULL) |
| goto no_packet; |
| spn_lock(&uuid2info(uuid).lock); |
| sock_flush_unsafe(sock_uuid2fd(uuid)); |
| spn_unlock(&uuid2info(uuid).lock); |
| no_packet: |
| if (uuid2info(uuid).close) { |
| sock_force_close(uuid); |
| return -1; |
| } |
| return 0; |
| } |
| /** |
| `sock_flush_strong` performs the same action as `sock_flush` but returns only |
| after all the data was sent. This is an "active" (busy) wait; no event |
| polling is performed. |
| */ |
| void sock_flush_strong(intptr_t uuid) { |
| if (!fd_info) |
| return; |
| while (is_valid(uuid) && uuid2info(uuid).packet) |
| sock_flush(uuid); |
| } |
| /** |
| Calls `sock_flush` for each file descriptor whose buffer isn't empty. |
| */ |
| void sock_flush_all(void) { |
| for (size_t i = 0; i < fd_capacity; i++) { |
| if (fd_info[i].packet == NULL || spn_is_locked(&fd_info[i].lock)) |
| continue; |
| sock_flush(fd_info[i].fduuid.uuid); |
| } |
| } |
| |
| /* ***************************************************************************** |
| Writing |
| */ |
| |
| ssize_t sock_write2_fn(sock_write_info_s options) { |
| if (!fd_info || !is_valid(options.fduuid)) { |
| errno = ENODEV; |
| return -1; |
| } |
| if (options.buffer == NULL) |
| return -1; |
| if (!options.length && !options.is_fd) |
| options.length = strlen(options.buffer); |
| if (options.length == 0) |
| return -1; |
| sock_packet_s *packet = sock_checkout_packet(); |
| packet->metadata.can_interrupt = 1; |
| packet->metadata.urgent = options.urgent; |
| |
| if (options.is_fd) { |
| packet->buffer = (void *)options.buffer; |
| packet->length = options.length; |
| packet->metadata.is_fd = options.is_fd; |
| packet->metadata.offset = options.offset; |
| return sock_send_packet(options.fduuid, packet); |
| } else { |
| if (options.move) { |
| packet->buffer = (void *)options.buffer; |
| packet->length = options.length; |
| packet->metadata.external = 1; |
| return sock_send_packet(options.fduuid, packet); |
| } else { |
| if (options.length <= BUFFER_PACKET_SIZE) { |
| memcpy(packet->buffer, options.buffer, options.length); |
| packet->length = options.length; |
| return sock_send_packet(options.fduuid, packet); |
| } else { |
| if (packet->metadata.urgent) { |
| fprintf(stderr, "Socket err:" |
| "Large data cannot be sent as an urgent packet.\n" |
| "Urgency silently ignored\n"); |
| packet->metadata.urgent = 0; |
| } |
| size_t to_cpy; |
| spn_lock(&uuid2info(options.fduuid).lock); |
| for (;;) { |
| to_cpy = options.length > BUFFER_PACKET_SIZE ? BUFFER_PACKET_SIZE |
| : options.length; |
| memcpy(packet->buffer, options.buffer, to_cpy); |
| packet->length = to_cpy; |
| options.length -= to_cpy; |
| options.buffer += to_cpy; |
| sock_send_packet_unsafe(sock_uuid2fd(options.fduuid), packet); |
| if (!is_valid(options.fduuid) || uuid2info(options.fduuid).err == 1 || |
| options.length == 0) |
| break; |
| packet = sock_try_checkout_packet(); |
| while (packet == NULL) { |
| sock_flush_all(); |
| sock_flush_unsafe(sock_uuid2fd(options.fduuid)); |
| packet = sock_try_checkout_packet(); |
| } |
| } |
| spn_unlock(&uuid2info(options.fduuid).lock); |
| if (uuid2info(options.fduuid).packet == NULL && |
| uuid2info(options.fduuid).close) { |
| sock_force_close(options.fduuid); |
| return -1; |
| } |
| return is_valid(options.fduuid) ? 0 : -1; |
| } |
| } |
| } |
| // how did we get here? |
| return -1; |
| } |
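| |
| /* |
| Usage sketch (an illustrative addition, not part of the original source): |
| calling the options-struct writer directly. The field names are the ones this |
| function reads; `sock_write2` in libsock.h presumably wraps the call with |
| designated initializers. |
| |
| // copy a short string into the connection's outgoing buffer |
| sock_write2_fn((sock_write_info_s){ |
| .fduuid = uuid, .buffer = "hello", .length = 5}); |
| |
| // stream a file: `buffer` carries the descriptor and `is_fd` marks the |
| // packet as a file packet (the descriptor is closed when the packet is |
| // freed, unless `keep_open` is set). |
| int file = open("index.html", O_RDONLY); // hypothetical file |
| sock_write2_fn((sock_write_info_s){ |
| .fduuid = uuid, .buffer = (void *)(intptr_t)file, |
| .length = file_size, .is_fd = 1}); // file_size: hypothetical length |
| */ |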
| |
| /* ***************************************************************************** |
| Closing. |
| */ |
| |
| void sock_close(intptr_t uuid) { |
| // fprintf(stderr, "called sock_close for %lu (%d)\n", uuid, |
| // sock_uuid2fd(uuid)); |
| if (!fd_info || !is_valid(uuid)) |
| return; |
| fd_info[sock_uuid2fd(uuid)].close = 1; |
| sock_flush(uuid); |
| } |
| |
| void sock_force_close(intptr_t uuid) { |
| // fprintf(stderr, "called sock_force_close for %lu (%d)\n", uuid, |
| // sock_uuid2fd(uuid)); |
| if (!fd_info || !is_valid(uuid)) |
| return; |
| shutdown(sock_uuid2fd(uuid), SHUT_RDWR); |
| close(sock_uuid2fd(uuid)); |
| set_fd(sock_uuid2fd(uuid), LIB_SOCK_STATE_CLOSED); |
| } |
| |
| /* ***************************************************************************** |
| RW hooks implementation |
| */ |
| |
| /** Gets a socket hook state (a pointer to the struct). */ |
| struct sock_rw_hook_s *sock_rw_hook_get(intptr_t uuid) { |
| if (!fd_info || !is_valid(uuid)) |
| return NULL; |
| return uuid2info(uuid).rw_hooks; |
| } |
| |
| /** Sets a socket hook state (a pointer to the struct). */ |
| int sock_rw_hook_set(intptr_t uuid, sock_rw_hook_s *rw_hooks) { |
| if (!fd_info || !is_valid(uuid)) |
| return -1; |
| spn_lock(&(uuid2info(uuid).lock)); |
| uuid2info(uuid).rw_hooks = rw_hooks; |
| spn_unlock(&uuid2info(uuid).lock); |
| return 0; |
| } |
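| |
| /* |
| RW hook sketch (an illustrative addition, not part of the original source). |
| The signatures are inferred from how `rw_hooks->read` and `rw_hooks->write` |
| are called in this file; the header may define additional members. A |
| pass-through hook that only counts transferred bytes could look like this: |
| |
| static size_t total_in, total_out; // hypothetical counters |
| |
| static ssize_t counting_read(intptr_t uuid, void *buf, size_t count) { |
| ssize_t r = read(sock_uuid2fd(uuid), buf, count); |
| if (r > 0) |
| total_in += r; |
| return r; |
| } |
| static ssize_t counting_write(intptr_t uuid, void *buf, size_t count) { |
| ssize_t w = write(sock_uuid2fd(uuid), buf, count); |
| if (w > 0) |
| total_out += w; |
| return w; |
| } |
| static sock_rw_hook_s counting_hooks = {.read = counting_read, |
| .write = counting_write}; |
| |
| // attach once the uuid refers to an open connection: |
| sock_rw_hook_set(uuid, &counting_hooks); |
| */ |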
| |
| /* ***************************************************************************** |
| test |
| */ |
| #ifdef DEBUG |
| void sock_libtest(void) { |
| sock_lib_init(); |
| sock_packet_s *p, *pl; |
| size_t count = 0; |
| fprintf(stderr, "Testing packet pool\n"); |
| for (size_t i = 0; i < BUFFER_PACKET_POOL * 2; i++) { |
| count = 1; |
| pl = p = sock_checkout_packet(); |
| while (buffer_pool.pool) { |
| count++; |
| pl->metadata.next = sock_checkout_packet(); |
| pl = pl->metadata.next; |
| } |
| sock_free_packet(p); |
| // fprintf(stderr, "Collected and freed %lu packets.\n", count); |
| } |
| fprintf(stderr, |
| "liniar (no-contention) packet checkout + free shows %lu packets. " |
| "test %s\n", |
| count, count == BUFFER_PACKET_POOL ? "passed." : "FAILED!"); |
| } |
| #endif |