diff options
Diffstat (limited to 'server/channel.c')
-rw-r--r-- | server/channel.c | 253 |
1 files changed, 150 insertions, 103 deletions
diff --git a/server/channel.c b/server/channel.c index 2d81abb..a0e2779 100644 --- a/server/channel.c +++ b/server/channel.c @@ -1,126 +1,173 @@ #include "channel.h" +#include <packet.h> #include <container.h> -#include <stdlib.h> #include <errno.h> +#include <stdlib.h> -void thread_loop(void) { - struct channel_handle *channel = channel_init(); - struct kv_packet **recvd_data = array_new(struct kv_packet*, 100); - struct kv_packet work_buffer; - size_t recvd_index = 0; - int recv_flag = MSG_DONTWAIT; - while (true) { - struct sockaddr_in client_addr; - socklen_t client_addr_len; // unused - int recvlength = recvfrom(channel->sockfd, &work_buffer, - KV_PACKET_SIZE, recv_flag, - (struct sockaddr*)&client_addr, &client_addr_len); - if (recvlength > 0) { - DEBUGF("rec_id = %i\n", work_buffer.id); - if (work_buffer.id <= 0) { - handle_system_packet(&work_buffer, &client_addr, channel); - continue; - } else { - recv_flag |= MSG_DONTWAIT; - } - struct kv_packet *kv_copy = malloc(KV_PACKET_SIZE); - memcpy(kv_copy, &work_buffer, KV_PACKET_SIZE); - ++recvd_index; - if (recvd_index >= array_size(recvd_data)) { - array_push(recvd_data, kv_copy); - } else { - recvd_data[recvd_index] = kv_copy; - } - } else if (errno == EWOULDBLOCK) { - if (array_size(recvd_data) == 0) recv_flag &= ~MSG_DONTWAIT; - send_packets_back(recvd_data, channel); - clear_packet_array(recvd_data); - } else { - perror("thread_loop failed"); - } +#define decisive_push(array, index, elem) \ + do { \ + if (index >= array_size(array)) { \ + array_push(array, elem); \ + } else { \ + array[index] = elem; \ + } \ + } while (0) + +#if defined(__unix__) && defined(__GNUC__) + #define __THREAD __thread +#elif __STDC_VERSION__ >= 201112L && __STDC_VERSION__ < 202302L + #define __THREAD _Thread_local +#elif __STDC_VERSION__ >= 202302L + #define __THREAD thread_local +#else + #pragma GCC error "Use unix with GCC, or C11 or later standards" + #define __THREAD "don't compile" +#endif + +struct ch_user { + u64 id; + u32 ip; + u16 port; + u64 last_keepalive; +}; + +static __THREAD struct hash_set users; +static __THREAD int sockfd; + +static int __user_hset_cmp(const void *a, const void *b) { + struct ch_user *_a = (struct ch_user *)a, *_b = (struct ch_user *)b; + return _a->id - _b->id; +} +static size_t __user_hset_hsh(const void *a) { return ((struct ch_user *)a)->id; } +static void clear_packet_array(struct kv_packet **array) { + for (size_t i = 0; i < array_size(array); ++i) { + if (array[i] == NULL) return; + free(array[i]); + array[i] = NULL; } - array_free(recvd_data); - channel_uninit(channel); } - -struct channel_handle *channel_init(void) { - struct sockaddr_in thread_local_address = { - .sin_family = AF_INET, - .sin_port = 0, - .sin_addr = {INADDR_ANY} - }; - struct channel_handle *handle = NULL; - char chain_result = /* This is evil, but */ - (handle = (struct channel_handle*)calloc(1, sizeof(struct channel_handle))) != NULL - && (handle->users = array_new(struct user, 0)) != NULL - && (handle->sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) >= 0 - && bind(handle->sockfd, (struct sockaddr*)&thread_local_address, sizeof(thread_local_address)) == 0; +bool channel_init(void) { + struct sockaddr_in thread_local_address = {.sin_family = AF_INET, .sin_port = 0, .sin_addr = {INADDR_ANY}}; + char chain_result = /* This is evil, but who cares */ + hset_ok(users = hset_new(sizeof(struct ch_user), __user_hset_cmp, __user_hset_hsh)) && + (sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) >= 0 && + bind(sockfd, (struct sockaddr *)&thread_local_address, sizeof(thread_local_address)) == 0; if (!chain_result) { perror("channel init failed"); - if (handle) { - if (handle->users) array_free(handle->users); - if (handle->sockfd >= 0) close(handle->sockfd); - free(handle); - } - return NULL; + if (hset_ok(users)) hset_free(&users); + if (sockfd >= 0) close(sockfd); + return false; } - DEBUGF("Channel [%i] created\n", handle->sockfd); - return handle; + DEBUGF("Channel [%i] created\n", sockfd); + return true; } - -void channel_uninit(struct channel_handle *handle) { - array_free(handle->users); - if (close(handle->sockfd) == -1) - perror("could not gracefully uninitialize channel"); - free(handle); +static void channel_uninit(void) { + hset_free(&users); + if (close(sockfd) == -1) perror("could not gracefully uninitialize channel"); +} +_Noreturn static inline void forced_cleanup(struct kv_packet **packets) { + clear_packet_array(packets); + array_free(packets); + channel_uninit(); + pthread_exit(NULL); +} +static inline void report_socket(const struct thread_loop_arg *arg, int fd) { + pthread_mutex_lock(arg->sock_mx); + *(arg->sock_dest) = fd; + pthread_cond_signal(arg->sock_ready_cond); + pthread_mutex_unlock(arg->sock_mx); } -void handle_system_packet(struct kv_packet* packet, struct sockaddr_in *source, struct channel_handle* handle) { - struct kv_system_packet* spacket = (struct kv_system_packet*) packet; - if (system_packet_checksum(spacket) != spacket->checksum) return; - switch (spacket->operation_id) { - case keepalive: TODO; - case join_channel: TODO; - case leave_channel: TODO; - case acknowledgement: TODO; - default: TODO; +// Returns whether thread should kill itself +static bool handle_system_packet(struct kv_packet *packet, struct sockaddr_in *src) { + TODO; // account for `ackid` + struct kv_system_packet *spacket = (struct kv_system_packet *)packet; + if (system_packet_checksum(spacket) != spacket->checksum) return false; + switch (ntohl(spacket->operation_id)) { + case SYS_KEEPALIVE: { + struct ch_user u = {.id = ntoh64(spacket->user_id), 0}; + struct ch_user *data = hset_at(&users, &u); + data->last_keepalive = (int)time(NULL); + } break; + case SYS_JOIN: { + struct ch_user u = { + .id = /*ntohzu */ spacket->user_id, + .ip = /*ntohl*/ src->sin_addr.s_addr, + .port = /*htons*/ src->sin_port, + .last_keepalive = (int)time(NULL)}; + hset_insert_copy(&users, &u); + } break; + case SYS_LEAVE: { + struct ch_user u = {.id = ntoh64(spacket->user_id), 0}; + hset_remove(&users, &u); + } break; + case SYS_ACK: return false; + case SYS_KYS: + // TODO: verify that request was sent by main thread + return true; } + return false; } - -void send_packets_back(struct kv_packet** packets, struct channel_handle* handle) { - for (size_t i = 0; i < array_size(handle->users); ++i) { - struct user* current_user = &handle->users[i]; +static void send_packets_back(struct kv_packet **packets) { + hset_iter iter; + for (hseti_begin(&users, &iter); !hseti_end(&iter); hseti_next(&iter)) { + struct ch_user *current_user = hseti_get(&iter); struct sockaddr_in destination = { - .sin_family = AF_INET, - .sin_port = htons(current_user->port), - .sin_addr = { htonl(current_user->ip) } - }; + .sin_family = AF_INET, + .sin_port = htons(current_user->port), + .sin_addr = {htonl(current_user->ip)}}; for (size_t j = 0; packets[j] != NULL && j < array_size(packets); ++j) { - DEBUGF("sending packet with id %i", packets[j]->id); - DEBUGF("to destination: %u.%u.%u.%u:%hu\n", - (destination.sin_addr.s_addr >> 24) & 0xFF, - (destination.sin_addr.s_addr >> 16) & 0xFF, - (destination.sin_addr.s_addr >> 8) & 0xFF, - destination.sin_addr.s_addr & 0xFF, - destination.sin_port); - if (packets[j]->id == current_user->id) continue; - int error_code = sendto(handle->sockfd, packets[j], KV_PACKET_SIZE, 0, (struct sockaddr*)&destination, sizeof(destination)); - if (error_code) perror("could not send packets back"); + DEBUGF("sending packet with id %zu to destination: %u.%u.%u.%u:%hu\n", packets[j]->uid, + (destination.sin_addr.s_addr >> 24) & 0xFF, (destination.sin_addr.s_addr >> 16) & 0xFF, + (destination.sin_addr.s_addr >> 8) & 0xFF, destination.sin_addr.s_addr & 0xFF, + destination.sin_port); + if (packets[j]->uid == current_user->id) continue; + int error_code = sendto( + sockfd, packets[j], KV_PACKET_SIZE, 0, (struct sockaddr *)&destination, + sizeof(destination)); + if (error_code) perror("could not send packets back"); } } } -void clear_packet_array(struct kv_packet **array) { - for (size_t i = 0 ; i < array_size(array); ++i) { - if (array[i] == NULL) return; - free(array[i]); - array[i] = NULL; +/* + * An example of how you should NOT write code + * Todo: please rewrite this shit + */ +void *thread_loop(void *arg) { + if (!channel_init()) pthread_exit(NULL); + report_socket((struct thread_loop_arg *)arg, sockfd); + struct kv_packet **recvd_data = array_new(struct kv_packet *, 100); + struct kv_packet work_buffer; + size_t recvd_index = 0; + int recv_flag = MSG_DONTWAIT; + while (true) { + struct sockaddr_in client_addr; + socklen_t client_addr_len = sizeof(client_addr); + ssize_t recvlength = recvfrom( + sockfd, &work_buffer, KV_PACKET_SIZE, recv_flag, (struct sockaddr *)&client_addr, + &client_addr_len); + if (recvlength > 0) { + if (is_system_packet(&work_buffer)) { + bool kys = handle_system_packet(&work_buffer, &client_addr); + if (kys) forced_cleanup(recvd_data); + continue; + } + recv_flag |= MSG_DONTWAIT; + struct kv_packet *kv_copy = malloc(KV_PACKET_SIZE); + memcpy(kv_copy, &work_buffer, KV_PACKET_SIZE); + ++recvd_index; + decisive_push(recvd_data, recvd_index, kv_copy); + } else if (errno == EWOULDBLOCK) { + if (array_size(recvd_data) == 0) { + recv_flag &= ~MSG_DONTWAIT; + continue; + } + send_packets_back(recvd_data); + clear_packet_array(recvd_data); + } else { + perror("error in thread_loop"); + } } } - -int __user_cmp(const void* a, const void* b) { - struct user *_a = (struct user*)a, - *_b = (struct user*)b; - return _a->id - _b->id; -} |