| /* |
| copyright: Boaz segev, 2015 |
| license: MIT |
| |
| Feel free to copy, use and enjoy according to the license provided. |
| */ |
| #include "libbuffer.h" |
| #include <string.h> |
| #include <pthread.h> |
| #include <unistd.h> |
| #include <errno.h> |
| /////////////////// |
// The buffer class ID.
// Only the ADDRESS of this object matters: `new_buffer` stores
// `&BufferClassID` in every buffer's `id` field and `is_buffer` compares
// against that address. The stored value itself is never read.
static const void* BufferClassID = NULL;
| |
| /////////////////// |
// The packets: one node of a buffer's singly linked queue of pending output.
struct Packet {
  // Payload size in bytes. A packet that has data but a zero length is
  // treated by the flush / free code as a FILE* rather than a memory block.
  size_t length;
  // The next packet in the queue (or in the recycling pool), NULL at the end.
  struct Packet* next;
  // The payload: heap memory, a FILE*, or NULL. A NULL payload is the
  // "close the connection" marker.
  void* data;
};
| |
| /////////////////// |
// The global packet container pool: released Packet containers are kept on a
// free list and handed out again instead of being freed immediately.
static struct {
  // The number of currently registered buffers.
  int count;
  // The head of the free list of recycled packet containers.
  struct Packet* pool;
} ContainerPool = {.count = 0, .pool = NULL};

// Guards every access to ContainerPool.
static pthread_mutex_t container_pool_locker = PTHREAD_MUTEX_INITIALIZER;
| |
| static void register_buffer(void) { |
| pthread_mutex_lock(&container_pool_locker); |
| ContainerPool.count++; |
| pthread_mutex_unlock(&container_pool_locker); |
| } |
| |
| static void unregister_buffer(void) { |
| pthread_mutex_lock(&container_pool_locker); |
| ContainerPool.count--; |
| if (!ContainerPool.count) { |
| struct Packet* to_free; |
| while ((to_free = ContainerPool.pool)) { |
| ContainerPool.pool = to_free->next; |
| free(to_free); |
| } |
| } |
| pthread_mutex_unlock(&container_pool_locker); |
| } |
| |
| static struct Packet* get_packet(void) { |
| struct Packet* packet; |
| pthread_mutex_lock(&container_pool_locker); |
| packet = ContainerPool.pool; |
| if (packet) |
| ContainerPool.pool = packet->next; |
| else |
| packet = malloc(sizeof(struct Packet)); |
| pthread_mutex_unlock(&container_pool_locker); |
| if (!packet) |
| return 0; |
| packet->data = NULL; |
| packet->next = 0; |
| packet->length = 0; |
| return packet; |
| } |
| |
| static void free_packet(struct Packet* packet) { |
| if (packet->data) { |
| if (packet->length) |
| free(packet->data); |
| else |
| fclose(packet->data); |
| } |
| pthread_mutex_lock(&container_pool_locker); |
| packet->next = ContainerPool.pool; |
| ContainerPool.pool = packet; |
| pthread_mutex_unlock(&container_pool_locker); |
| } |
| /////////////////// |
// The buffer structure
// (the original note says 88 bytes per buffer; the real size depends on the
// platform's pthread_mutex_t).
struct Buffer {
  // the type tag: holds the address of BufferClassID for a valid buffer.
  void* id;
  // a data locker, protecting `packet` and `sent`.
  pthread_mutex_t lock;
  // the head of the queue of packets waiting to be sent.
  struct Packet* packet;
  // the amount of data already sent from the first packet.
  size_t sent;
};
| |
| /////////////////// |
| // helpers |
| |
| static inline int is_buffer(struct Buffer* object) { |
| // if (object->id != &BufferClassID) |
| // printf("ERROR, Buffer received a non buffer object\n"); |
| return object->id == &BufferClassID; |
| } |
| |
| /////////////////// |
| // The functions |
| |
| static inline void* new_buffer(size_t offset) { |
| struct Buffer* buffer = malloc(sizeof(struct Buffer)); |
| if (!buffer) |
| return 0; |
| *buffer = (struct Buffer){//.lock = PTHREAD_MUTEX_INITIALIZER, |
| .id = &BufferClassID, |
| .sent = offset, |
| .packet = NULL}; |
| |
| if (pthread_mutex_init(&buffer->lock, NULL)) { |
| free(buffer); |
| return 0; |
| } |
| register_buffer(); |
| return buffer; |
| } |
| |
| // clears all the buffer data |
| static inline void clear_buffer(struct Buffer* buffer) { |
| if (is_buffer(buffer)) { |
| pthread_mutex_lock(&buffer->lock); |
| struct Packet* to_free = NULL; |
| while ((to_free = buffer->packet)) { |
| buffer->packet = buffer->packet->next; |
| free_packet(to_free); |
| } |
| pthread_mutex_unlock(&buffer->lock); |
| } |
| } |
| |
| // destroys the buffer |
| static inline void destroy_buffer(struct Buffer* buffer) { |
| if (is_buffer(buffer)) { |
| clear_buffer(buffer); |
| pthread_mutex_destroy(&buffer->lock); |
| free(buffer); |
| unregister_buffer(); |
| } |
| } |
| |
| // takes data and places it into the end of the buffer |
| static size_t buffer_move(struct Buffer* buffer, void* data, size_t length) { |
| if (!is_buffer(buffer)) |
| return 0; |
| struct Packet* np = get_packet(); |
| if (!np) |
| return 0; |
| np->data = data; |
| np->length = length; |
| np->next = NULL; |
| pthread_mutex_lock(&buffer->lock); |
| struct Packet** pos = &buffer->packet; |
| while (*pos) { |
| pos = &(*pos)->next; |
| } |
| *pos = np; |
| pthread_mutex_unlock(&buffer->lock); |
| return length; |
| } |
| |
// Takes data, copies it and pushes the copy to the end of the buffer.
// Returns `length` on success and 0 when nothing was queued: `data` is NULL,
// `length` is 0, the copy could not be allocated, or `buffer_move` refused
// the packet.
//
// An empty write is a no-op. It must not reach `buffer_move` with a NULL
// payload, because a NULL-data packet is the "close the connection" marker
// (use the close-when-done call to request that explicitly).
static size_t buffer_copy(struct Buffer* buffer, void* data, size_t length) {
  if (data == NULL || length == 0)
    return 0;
  void* cpy = malloc(length);
  if (!cpy)
    return 0;
  memcpy(cpy, data, length);
  if (!buffer_move(buffer, cpy, length)) {
    // the buffer did not take ownership of the copy
    free(cpy);
    return 0;
  }
  return length;
}
| |
| // urgent buffer logic |
| static size_t buffer_next_logic(struct Buffer* buffer, |
| void* data, |
| size_t length, |
| char copy) { |
| if (!is_buffer(buffer)) |
| return 0; |
| struct Packet* np = get_packet(); |
| if (!np) |
| return 0; |
| |
| if (copy) { |
| np->data = malloc(length); |
| if (!np->data) { |
| free(np); |
| return 0; |
| } |
| memcpy(np->data, data, length); |
| } else { |
| np->data = data; |
| } |
| np->length = length; |
| |
| pthread_mutex_lock(&buffer->lock); |
| struct Packet** pos = &buffer->packet; |
| // if the next packet's length is 0, it is a file packet. |
| // file packets insert packets before themselves... so we must wait. |
| if (buffer->packet && buffer->packet->next && !buffer->packet->next->length) |
| pos = &buffer->packet->next->next; |
| // never interrupt a packet in the middle. |
| else if (buffer->sent && buffer->packet) |
| pos = &buffer->packet->next; |
| if (*pos) |
| np->next = (*pos)->next; |
| else |
| np->next = 0; |
| (*pos) = np; |
| pthread_mutex_unlock(&buffer->lock); |
| return length; |
| } |
| |
// Takes data, copies it, and places the copy at the front of the buffer
// (subject to the placement rules of buffer_next_logic).
// Returns whatever buffer_next_logic returns.
static size_t buffer_next(struct Buffer* buffer, void* data, size_t length) {
  const char copy_payload = 1;
  return buffer_next_logic(buffer, data, length, copy_payload);
}
| |
// Takes data and places it (without copying) at the front of the buffer
// (subject to the placement rules of buffer_next_logic).
// Returns whatever buffer_next_logic returns.
static size_t buffer_move_next(struct Buffer* buffer,
                               void* data,
                               size_t length) {
  const char copy_payload = 0;
  return buffer_next_logic(buffer, data, length, copy_payload);
}
| |
| static ssize_t buffer_flush(struct Buffer* buffer, int fd) { |
| if (!is_buffer(buffer)) |
| return -1; |
| struct Packet* to_free; |
| ssize_t sent = 0; |
| pthread_mutex_lock(&buffer->lock); |
| start_flush: |
| // no packets to send |
| if (!buffer->packet) { |
| pthread_mutex_unlock(&buffer->lock); |
| return 0; |
| } |
| // a NULL packet (data is NULL) means: "Close the connection" |
| if (!buffer->packet->data) { |
| close(fd); |
| goto clear_buffer; |
| } |
| // a Packet with data but no length is a FILE * to be sent |
| if (!buffer->packet->length) { |
| // allocate buffer of 65,536 Bytes |
| struct Packet* np = get_packet(); |
| if (!np) |
| goto skip_file; |
| np->data = malloc(65536); |
| if (!np->data) { |
| free_packet(np); |
| goto skip_file; |
| } |
| size_t read_len = fread(np->data, 1, 65536, (FILE*)buffer->packet->data); |
| if (read_len <= 0) { |
| free(np->data); |
| free_packet(np); |
| goto skip_file; |
| } |
| np->length = read_len; |
| // insert the new packet (np) before the file packet and switch references |
| np->next = buffer->packet; |
| buffer->packet = np; |
| if (feof((FILE*)buffer->packet->next->data)) { |
| // we have reached the end of the file... |
| // np will now hold the file packet |
| np = buffer->packet->next; |
| // we remove the file packet from the chain |
| buffer->packet->next = np->next; |
| // we free the file packet |
| free_packet(np); |
| } |
| goto start_flush; |
| skip_file: |
| np = buffer->packet; |
| buffer->packet = buffer->packet->next; |
| free_packet(np); |
| goto start_flush; |
| } |
| sent = write(fd, buffer->packet->data + buffer->sent, |
| buffer->packet->length - buffer->sent); |
| if (sent < 0 && !(errno & (EWOULDBLOCK | EAGAIN))) { |
| return -1; |
| } else if (sent > 0) { |
| buffer->sent += sent; |
| } |
| if (buffer->sent >= buffer->packet->length) { |
| to_free = buffer->packet; |
| buffer->sent = 0; |
| buffer->packet = buffer->packet->next; |
| free_packet(to_free); |
| // a NULL packet (data is NULL) means: "Close the connection" |
| if (buffer->packet && !buffer->packet->data) { |
| close(fd); |
| goto clear_buffer; |
| } |
| } |
| pthread_mutex_unlock(&buffer->lock); |
| return sent; |
| clear_buffer: |
| while (buffer->packet) { |
| to_free = buffer->packet; |
| buffer->packet = buffer->packet->next; |
| free_packet(to_free); |
| } |
| pthread_mutex_unlock(&buffer->lock); |
| return sent; |
| } |
| |
| static int buffer_sendfile(struct Buffer* buffer, FILE* file) { |
| if (!is_buffer(buffer)) |
| return -1; |
| return buffer_move(buffer, file, 0); |
| } |
| |
| static void buffer_close_w_d(struct Buffer* buffer, int fd) { |
| if (!is_buffer(buffer)) |
| return; |
| if (!buffer->packet) |
| close(fd); |
| else |
| buffer_move(buffer, NULL, fd); |
| } |
| /** returns the sizes of all the pending data packets, excluding files (yet to |
| * be implemented). */ |
| size_t buffer_pending(struct Buffer* buffer) { |
| if (!is_buffer(buffer)) |
| return 0; |
| size_t len = 0; |
| struct Packet* p; |
| pthread_mutex_lock(&buffer->lock); |
| p = buffer->packet; |
| while (p) { |
| if (p->data && p->length) |
| len += p->length; |
| else if (p->data) |
| len += 1; // if it's a file - can we check it's size? expensive? |
| else |
| break; // no need to move beyond a close connection packet. |
| p = p->next; |
| } |
| len -= buffer->sent; |
| pthread_mutex_unlock(&buffer->lock); |
| return len; |
| } |
| |
| /** returns true (1) if the buffer is empty, otherwise returns false (0). */ |
| char buffer_empty(struct Buffer* buffer) { |
| if (!is_buffer(buffer)) |
| return 1; |
| return buffer->packet == NULL; |
| } |
| |
| /////////////////// |
| // The interface |
| |
| const struct BufferClass Buffer = { |
| .new = new_buffer, |
| .destroy = (void (*)(void*))destroy_buffer, |
| .clear = (void (*)(void*))clear_buffer, |
| .sendfile = (int (*)(void*, FILE*))buffer_sendfile, |
| .write = (size_t (*)(void*, void*, size_t))buffer_copy, |
| .write_move = (size_t (*)(void*, void*, size_t))buffer_move, |
| .write_next = (size_t (*)(void*, void*, size_t))buffer_next, |
| .write_move_next = (size_t (*)(void*, void*, size_t))buffer_move_next, |
| .flush = (ssize_t (*)(void*, int))buffer_flush, |
| .close_when_done = (void (*)(void*, int))buffer_close_w_d, |
| .pending = (size_t (*)(void*))buffer_pending, |
| .empty = (char (*)(void*))buffer_empty, |
| }; |