| /* |
| copyright: Boaz segev, 2016 |
| license: MIT |
| |
| Feel free to copy, use and enjoy according to the license provided. |
| */ |
| #include "libbuffer.h" |
| #include <string.h> |
| #include <pthread.h> |
| #include <unistd.h> |
| #include <errno.h> |
| |
| /****************************************************************************** |
| The pre-allocated memory per packet |
| */ |
| |
// Packet pool tunables. Each one is only defined here when it was not already
// defined before this point.
#if !defined(BUFFER_PACKET_SIZE)
// number of bytes of memory embedded in every packet (64 * 1024)
#define BUFFER_PACKET_SIZE (1024 * 64)
#endif
#if !defined(BUFFER_MAX_PACKET_POOL)
// threshold the pool compares against when deciding to keep a released packet
#define BUFFER_MAX_PACKET_POOL 127
#endif
| |
| // Buffer packets |
| struct Packet { |
| ssize_t length; |
| struct Packet* next; |
| void* data; |
| char mem[BUFFER_PACKET_SIZE]; |
| struct { |
| unsigned can_interrupt : 1; |
| unsigned close_after : 1; |
| unsigned rsrv : 6; |
| } metadata; |
| }; |
| |
// The global packet pool: a linked list of released packets kept for reuse,
// together with the number of buffers that currently share the pool.
static struct {
  // number of registered (live) buffers
  int ref_count;
  // number of packets currently stored in `pool`
  int pool_count;
  // head of the linked list of reusable packets
  struct Packet* pool;
} ContainerPool = {.ref_count = 0, .pool_count = 0, .pool = NULL};

// guards every access to `ContainerPool`
static pthread_mutex_t container_pool_locker = PTHREAD_MUTEX_INITIALIZER;
| |
| // register a buffer in the pool - the pool will self-distruct when the last |
| // buffer unregisters. |
| static void register_buffer(void) { |
| pthread_mutex_lock(&container_pool_locker); |
| ContainerPool.ref_count++; |
| pthread_mutex_unlock(&container_pool_locker); |
| } |
| // unregister a buffer in the pool |
| static void unregister_buffer(void) { |
| pthread_mutex_lock(&container_pool_locker); |
| ContainerPool.ref_count--; |
| if (ContainerPool.ref_count <= 0) { |
| ContainerPool.ref_count = 0; // never fall from 0 |
| struct Packet* to_free; |
| while ((to_free = ContainerPool.pool)) { |
| ContainerPool.pool = to_free->next; |
| free(to_free); |
| } |
| } |
| pthread_mutex_unlock(&container_pool_locker); |
| } |
| // grab a packet from the pool |
| static struct Packet* get_packet(void) { |
| struct Packet* packet; |
| pthread_mutex_lock(&container_pool_locker); |
| packet = ContainerPool.pool; |
| if (packet) { |
| ContainerPool.pool = packet->next; |
| ContainerPool.pool_count--; |
| if (ContainerPool.pool_count < 0) // just in case...? |
| ContainerPool.pool_count = 0; |
| } else { |
| packet = malloc(sizeof(struct Packet)); |
| } |
| pthread_mutex_unlock(&container_pool_locker); |
| if (!packet) |
| return 0; |
| packet->data = packet->mem; |
| packet->next = NULL; |
| packet->length = 0; |
| *((char*)&packet->metadata) = 0; |
| return packet; |
| } |
| // return a packet to the pool, or free it (when the pool is full). |
| static void free_packet(struct Packet* packet) { |
| if (packet->data != packet->mem && packet->data) { |
| if (packet->length) |
| free(packet->data); |
| else |
| fclose(packet->data); |
| } |
| pthread_mutex_lock(&container_pool_locker); |
| if (ContainerPool.pool_count <= BUFFER_MAX_PACKET_POOL) { |
| packet->next = ContainerPool.pool; |
| ContainerPool.pool = packet; |
| ContainerPool.pool_count++; |
| } else |
| free(packet); |
| pthread_mutex_unlock(&container_pool_locker); |
| } |
| |
| /****************************************************************************** |
| The Buffer data and helper methods |
| */ |
| |
| // The buffer structure |
| struct Buffer { |
| void* id; |
| // pointer to the actual data. |
| struct Packet* packet; |
| // the amount of data sent from the first packet. |
| size_t sent; |
| // a mutex preventing buffer corruption. |
| pthread_mutex_t lock; |
| // a writing hook, allowing for SSL sockets or other extensions. |
| ssize_t (*writing_hook)(server_pt srv, int fd, void* data, size_t len); |
| // the buffer's owner |
| server_pt owner; |
| }; |
| |
| // validates that this is an actual buffer object. |
| static inline int is_buffer(struct Buffer* object) { |
| return object->id == is_buffer; |
| } |
| |
| /****************************************************************************** |
| The Buffer API |
| */ |
| |
| // create a new buffer object |
| static inline void* new_buffer(server_pt owner) { |
| struct Buffer* buffer = malloc(sizeof(struct Buffer)); |
| if (!buffer) |
| return 0; |
| *buffer = (struct Buffer){ |
| //.lock = PTHREAD_MUTEX_INITIALIZER, |
| .id = is_buffer, |
| .sent = 0, |
| .packet = NULL, |
| .owner = owner, |
| }; |
| |
| if (pthread_mutex_init(&buffer->lock, NULL)) { |
| free(buffer); |
| return 0; |
| } |
| register_buffer(); |
| return buffer; |
| } |
| |
| // clears all the buffer data |
| static inline void clear_buffer(struct Buffer* buffer) { |
| if (is_buffer(buffer)) { |
| pthread_mutex_lock(&buffer->lock); |
| struct Packet* to_free = NULL; |
| while ((to_free = buffer->packet)) { |
| buffer->packet = buffer->packet->next; |
| free_packet(to_free); |
| } |
| buffer->writing_hook = NULL; |
| pthread_mutex_unlock(&buffer->lock); |
| } |
| } |
| |
| // sets the buffer's writing hook |
| void set_whook( |
| struct Buffer* buffer, |
| ssize_t (*writing_hook)(server_pt srv, int fd, void* data, size_t len)) { |
| if (is_buffer(buffer)) |
| buffer->writing_hook = writing_hook; |
| } |
| |
| // destroys the buffer |
| static inline void destroy_buffer(struct Buffer* buffer) { |
| if (is_buffer(buffer)) { |
| clear_buffer(buffer); |
| pthread_mutex_destroy(&buffer->lock); |
| free(buffer); |
| unregister_buffer(); |
| } |
| } |
| |
| // applys the move logic for either urgent or non urgent packets |
| static void insert_packets_to_buffer(struct Buffer* buffer, |
| struct Packet* packet, |
| char urgent) { |
| pthread_mutex_lock(&buffer->lock); |
| struct Packet *tail, **pos = &(buffer->packet); |
| if (urgent) { |
| while (*pos && (!(*pos)->next || !(*pos)->next->metadata.can_interrupt)) { |
| pos = &((*pos)->next); |
| } |
| } else { |
| while (*pos) { |
| pos = &((*pos)->next); |
| } |
| } |
| tail = (*pos); |
| *pos = packet; |
| if (tail) { |
| pos = &(packet->next); |
| while (*pos) |
| pos = &((*pos)->next); |
| *pos = tail; |
| } |
| pthread_mutex_unlock(&buffer->lock); |
| } |
| |
| // takes data and places it into the end of the buffer |
| static inline size_t buffer_move_logic(struct Buffer* buffer, |
| void* data, |
| size_t length, |
| char urgent) { |
| if (!is_buffer(buffer)) |
| return 0; |
| if (!length || !data) { |
| fprintf( |
| stderr, |
| "Buffer: Canot move data because either length (%lu) or data (%p) are " |
| "invalid\n", |
| length, data); |
| return 0; |
| } |
| struct Packet* np = get_packet(); |
| if (!np) |
| return 0; |
| np->data = data; |
| np->length = length; |
| // np->next = NULL; // performed by `get_packet` |
| // *((char*)&np->metadata) = 0; // performed by `get_packet` |
| np->metadata.can_interrupt = 1; |
| insert_packets_to_buffer(buffer, np, urgent); |
| return length; |
| } |
// Moves `data` into the buffer (no copy), queueing it after all pending data.
static size_t buffer_move(struct Buffer* buffer, void* data, size_t length) {
  const char not_urgent = 0;
  return buffer_move_logic(buffer, data, length, not_urgent);
}
// Moves `data` into the buffer (no copy), queueing it as urgent data.
static size_t buffer_move_next(struct Buffer* buffer,
                               void* data,
                               size_t length) {
  const char is_urgent = 1;
  return buffer_move_logic(buffer, data, length, is_urgent);
}
| |
| // takes data, copies it and pushes it into the buffer |
| static size_t buffer_copy_logic(struct Buffer* buffer, |
| void* data, |
| size_t length, |
| char urgent) { |
| if (!length || !data) { |
| fprintf( |
| stderr, |
| "Buffer: Canot copy data because either length (%lu) or data (%p) are " |
| "invalid\n", |
| length, data); |
| return 0; |
| } |
| size_t to_copy = length; |
| struct Packet* np = get_packet(); |
| if (!np) { |
| fprintf(stderr, "Couldn't allocate memory for the buffer (on copy)\n"); |
| return 0; |
| } |
| // set marker for packet interrupt |
| np->metadata.can_interrupt = 1; |
| struct Packet* tmp = np; |
| while (to_copy) { |
| if (to_copy > BUFFER_PACKET_SIZE) { |
| memcpy(tmp->mem, data, BUFFER_PACKET_SIZE); |
| tmp->data = tmp->mem; |
| data += BUFFER_PACKET_SIZE; |
| to_copy -= BUFFER_PACKET_SIZE; |
| tmp->length = BUFFER_PACKET_SIZE; |
| tmp->next = get_packet(); |
| if (!(tmp->next)) { |
| fprintf(stderr, "Couldn't allocate memory for the buffer (on copy)\n"); |
| // free them all and return 0; |
| tmp = np; |
| while (tmp) { |
| np = tmp; |
| tmp = np->next; |
| free_packet(np); |
| } |
| return 0; |
| } |
| tmp = tmp->next; |
| } else { |
| memcpy(tmp->mem, data, to_copy); |
| tmp->data = tmp->mem; |
| tmp->length = to_copy; |
| to_copy = 0; |
| } |
| } |
| insert_packets_to_buffer(buffer, np, urgent); |
| return length; |
| } |
| |
// Copies `data` and queues the copy after all the data already pending.
static size_t buffer_copy(struct Buffer* buffer, void* data, size_t length) {
  const char not_urgent = 0;
  return buffer_copy_logic(buffer, data, length, not_urgent);
}
| |
// Copies `data` and queues the copy as urgent data.
static size_t buffer_copy_next(struct Buffer* buffer,
                               void* data,
                               size_t length) {
  const char is_urgent = 1;
  return buffer_copy_logic(buffer, data, length, is_urgent);
}
| |
| // Flushes the buffer (writes as much as it can)... |
| // This is where a lot of the action takes place :-) |
| static ssize_t buffer_flush(struct Buffer* buffer, uint64_t conn) { |
| int fd = server_uuid_to_fd(conn); |
| if (!is_buffer(buffer)) |
| return -1; |
| ssize_t sent = 0; |
| struct Packet* packet; |
| pthread_mutex_lock(&buffer->lock); |
| start_flush: |
| // no packets to send |
| if (!buffer->packet) { |
| pthread_mutex_unlock(&buffer->lock); |
| return 0; |
| } |
| // packet is a file |
| if (!buffer->packet->length) { |
| // make sure file sending isn't interrupted. |
| buffer->packet->metadata.can_interrupt = 0; |
| // grab a packet from the pool |
| packet = get_packet(); |
| // read the data |
| packet->length = |
| fread(packet->data, 1, BUFFER_PACKET_SIZE, buffer->packet->data); |
| // read less? done sending file |
| if (packet->length < BUFFER_PACKET_SIZE) { |
| if (packet->length <= 0) { // no more data... |
| // return the packet we got from the pool. |
| free_packet(packet); |
| // move the buffer one step forward. |
| packet = buffer->packet; |
| buffer->packet = buffer->packet->next; |
| free_packet(packet); |
| packet = NULL; |
| } else { // this will be the last the file will offer. |
| // set the next packet. |
| packet->next = buffer->packet->next; |
| // free the file packet. |
| free_packet(buffer->packet); |
| // set the data packet as the buffer's packet. |
| buffer->packet = packet; |
| } |
| } else { |
| // set the next packet. |
| packet->next = buffer->packet; |
| // set the data packet as the buffer's packet, the file packet is next. |
| buffer->packet = packet; |
| } |
| // make sure the sent property is reset. |
| buffer->sent = 0; |
| // restart the flush |
| goto start_flush; |
| } |
| // the packet, at this point, is always a data packet. send the data. |
| |
| // write using the writing hook if available. |
| if (buffer->writing_hook) { |
| sent = buffer->writing_hook(buffer->owner, fd, |
| buffer->packet->data + buffer->sent, |
| buffer->packet->length - buffer->sent); |
| } else { |
| sent = write(fd, buffer->packet->data + buffer->sent, |
| buffer->packet->length - buffer->sent); |
| if (sent < 0 && (errno & (EWOULDBLOCK | EAGAIN | EINTR))) |
| sent = 0; |
| } |
| if (sent < 0) { |
| pthread_mutex_unlock(&buffer->lock); |
| return -1; |
| } else if (sent > 0) { |
| buffer->sent += sent; |
| Server.touch(buffer->owner, conn); // only `on_ready` resets idle time. |
| } |
| if (buffer->sent >= buffer->packet->length) { |
| // review the close connection flag means: "Close the connection" |
| if (buffer->packet->metadata.close_after) { |
| pthread_mutex_unlock(&(buffer->lock)); |
| Server.close(buffer->owner, conn); |
| return sent; |
| // buffer clearing should be performed by the Buffer's owner. |
| } |
| packet = buffer->packet; |
| buffer->sent = 0; |
| buffer->packet = buffer->packet->next; |
| free_packet(packet); |
| } |
| pthread_mutex_unlock(&(buffer->lock)); |
| return sent; |
| } |
| |
| static int buffer_sendfile(struct Buffer* buffer, FILE* file) { |
| if (!is_buffer(buffer)) |
| return -1; |
| struct Packet* np = get_packet(); |
| if (!np) |
| return -1; |
| np->data = file; |
| np->metadata.can_interrupt = 1; |
| insert_packets_to_buffer(buffer, np, 0); |
| return 0; |
| } |
| |
| static void buffer_close_w_d(struct Buffer* buffer, int fd) { |
| if (!is_buffer(buffer)) |
| return; |
| if (!buffer->packet) { |
| reactor_close((struct Reactor*)buffer->owner, fd); |
| return; |
| } |
| pthread_mutex_lock(&buffer->lock); |
| struct Packet* packet = buffer->packet; |
| if (!packet) |
| goto finish; |
| while (packet->next) |
| packet = packet->next; |
| packet->metadata.close_after = 1; |
| finish: |
| pthread_mutex_unlock(&buffer->lock); |
| } |
| /** returns the sizes of all the pending data packets, excluding files (yet to |
| * be implemented). */ |
| size_t buffer_pending(struct Buffer* buffer) { |
| if (!is_buffer(buffer)) |
| return 0; |
| size_t len = 0; |
| struct Packet* p; |
| pthread_mutex_lock(&buffer->lock); |
| p = buffer->packet; |
| while (p) { |
| if (p->data && p->length) |
| len += p->length; |
| else if (p->data) |
| len += 1; // if it's a file - can we check it's size? expensive? |
| else |
| break; // no need to move beyond a close connection packet. |
| p = p->next; |
| } |
| len -= buffer->sent; |
| pthread_mutex_unlock(&buffer->lock); |
| return len; |
| } |
| |
| /** returns true (1) if the buffer is empty, otherwise returns false (0). */ |
| char buffer_is_empty(struct Buffer* buffer) { |
| if (!is_buffer(buffer)) |
| return 1; |
| return buffer->packet == NULL; |
| } |
| |
| /****************************************************************************** |
| The API Interface |
| */ |
| |
| const struct BufferClass Buffer = { |
| .create = new_buffer, |
| .destroy = (void (*)(void*))destroy_buffer, |
| .clear = (void (*)(void*))clear_buffer, |
| .set_whook = (void (*)(void*, ssize_t (*)()))set_whook, |
| .sendfile = (int (*)(void*, FILE*))buffer_sendfile, |
| .write = (size_t (*)(void*, void*, size_t))buffer_copy, |
| .write_move = (size_t (*)(void*, void*, size_t))buffer_move, |
| .write_next = (size_t (*)(void*, void*, size_t))buffer_copy_next, |
| .write_move_next = (size_t (*)(void*, void*, size_t))buffer_move_next, |
| .flush = (ssize_t (*)(void*, int))buffer_flush, |
| .close_when_done = (void (*)(void*, int))buffer_close_w_d, |
| .is_empty = (char (*)(void*))buffer_is_empty, |
| }; |