aboutsummaryrefslogtreecommitdiffstats
path: root/server/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'server/channel.c')
-rw-r--r--server/channel.c253
1 files changed, 150 insertions, 103 deletions
diff --git a/server/channel.c b/server/channel.c
index 2d81abb..a0e2779 100644
--- a/server/channel.c
+++ b/server/channel.c
@@ -1,126 +1,173 @@
#include "channel.h"
+#include <packet.h>
#include <container.h>
-#include <stdlib.h>
#include <errno.h>
+#include <stdlib.h>
-void thread_loop(void) {
- struct channel_handle *channel = channel_init();
- struct kv_packet **recvd_data = array_new(struct kv_packet*, 100);
- struct kv_packet work_buffer;
- size_t recvd_index = 0;
- int recv_flag = MSG_DONTWAIT;
- while (true) {
- struct sockaddr_in client_addr;
- socklen_t client_addr_len; // unused
- int recvlength = recvfrom(channel->sockfd, &work_buffer,
- KV_PACKET_SIZE, recv_flag,
- (struct sockaddr*)&client_addr, &client_addr_len);
- if (recvlength > 0) {
- DEBUGF("rec_id = %i\n", work_buffer.id);
- if (work_buffer.id <= 0) {
- handle_system_packet(&work_buffer, &client_addr, channel);
- continue;
- } else {
- recv_flag |= MSG_DONTWAIT;
- }
- struct kv_packet *kv_copy = malloc(KV_PACKET_SIZE);
- memcpy(kv_copy, &work_buffer, KV_PACKET_SIZE);
- ++recvd_index;
- if (recvd_index >= array_size(recvd_data)) {
- array_push(recvd_data, kv_copy);
- } else {
- recvd_data[recvd_index] = kv_copy;
- }
- } else if (errno == EWOULDBLOCK) {
- if (array_size(recvd_data) == 0) recv_flag &= ~MSG_DONTWAIT;
- send_packets_back(recvd_data, channel);
- clear_packet_array(recvd_data);
- } else {
- perror("thread_loop failed");
- }
+#define decisive_push(array, index, elem) \
+ do { \
+ if (index >= array_size(array)) { \
+ array_push(array, elem); \
+ } else { \
+ array[index] = elem; \
+ } \
+ } while (0)
+
+#if defined(__unix__) && defined(__GNUC__)
+ #define __THREAD __thread
+#elif __STDC_VERSION__ >= 201112L && __STDC_VERSION__ < 202302L
+ #define __THREAD _Thread_local
+#elif __STDC_VERSION__ >= 202302L
+ #define __THREAD thread_local
+#else
+ #pragma GCC error "Use unix with GCC, or C11 or later standards"
+ #define __THREAD "don't compile"
+#endif
+
+struct ch_user {
+ u64 id;
+ u32 ip;
+ u16 port;
+ u64 last_keepalive;
+};
+
+static __THREAD struct hash_set users;
+static __THREAD int sockfd;
+
+static int __user_hset_cmp(const void *a, const void *b) {
+ struct ch_user *_a = (struct ch_user *)a, *_b = (struct ch_user *)b;
+ return _a->id - _b->id;
+}
+static size_t __user_hset_hsh(const void *a) { return ((struct ch_user *)a)->id; }
+static void clear_packet_array(struct kv_packet **array) {
+ for (size_t i = 0; i < array_size(array); ++i) {
+ if (array[i] == NULL) return;
+ free(array[i]);
+ array[i] = NULL;
}
- array_free(recvd_data);
- channel_uninit(channel);
}
-
-struct channel_handle *channel_init(void) {
- struct sockaddr_in thread_local_address = {
- .sin_family = AF_INET,
- .sin_port = 0,
- .sin_addr = {INADDR_ANY}
- };
- struct channel_handle *handle = NULL;
- char chain_result = /* This is evil, but */
- (handle = (struct channel_handle*)calloc(1, sizeof(struct channel_handle))) != NULL
- && (handle->users = array_new(struct user, 0)) != NULL
- && (handle->sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) >= 0
- && bind(handle->sockfd, (struct sockaddr*)&thread_local_address, sizeof(thread_local_address)) == 0;
+bool channel_init(void) {
+ struct sockaddr_in thread_local_address = {.sin_family = AF_INET, .sin_port = 0, .sin_addr = {INADDR_ANY}};
+ char chain_result = /* This is evil, but who cares */
+ hset_ok(users = hset_new(sizeof(struct ch_user), __user_hset_cmp, __user_hset_hsh)) &&
+ (sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) >= 0 &&
+ bind(sockfd, (struct sockaddr *)&thread_local_address, sizeof(thread_local_address)) == 0;
if (!chain_result) {
perror("channel init failed");
- if (handle) {
- if (handle->users) array_free(handle->users);
- if (handle->sockfd >= 0) close(handle->sockfd);
- free(handle);
- }
- return NULL;
+ if (hset_ok(users)) hset_free(&users);
+ if (sockfd >= 0) close(sockfd);
+ return false;
}
- DEBUGF("Channel [%i] created\n", handle->sockfd);
- return handle;
+ DEBUGF("Channel [%i] created\n", sockfd);
+ return true;
}
-
-void channel_uninit(struct channel_handle *handle) {
- array_free(handle->users);
- if (close(handle->sockfd) == -1)
- perror("could not gracefully uninitialize channel");
- free(handle);
+static void channel_uninit(void) {
+ hset_free(&users);
+ if (close(sockfd) == -1) perror("could not gracefully uninitialize channel");
+}
+_Noreturn static inline void forced_cleanup(struct kv_packet **packets) {
+ clear_packet_array(packets);
+ array_free(packets);
+ channel_uninit();
+ pthread_exit(NULL);
+}
+static inline void report_socket(const struct thread_loop_arg *arg, int fd) {
+ pthread_mutex_lock(arg->sock_mx);
+ *(arg->sock_dest) = fd;
+ pthread_cond_signal(arg->sock_ready_cond);
+ pthread_mutex_unlock(arg->sock_mx);
}
-void handle_system_packet(struct kv_packet* packet, struct sockaddr_in *source, struct channel_handle* handle) {
- struct kv_system_packet* spacket = (struct kv_system_packet*) packet;
- if (system_packet_checksum(spacket) != spacket->checksum) return;
- switch (spacket->operation_id) {
- case keepalive: TODO;
- case join_channel: TODO;
- case leave_channel: TODO;
- case acknowledgement: TODO;
- default: TODO;
+// Returns whether thread should kill itself
+static bool handle_system_packet(struct kv_packet *packet, struct sockaddr_in *src) {
+ TODO; // account for `ackid`
+ struct kv_system_packet *spacket = (struct kv_system_packet *)packet;
+ if (system_packet_checksum(spacket) != spacket->checksum) return false;
+ switch (ntohl(spacket->operation_id)) {
+ case SYS_KEEPALIVE: {
+ struct ch_user u = {.id = ntoh64(spacket->user_id), 0};
+ struct ch_user *data = hset_at(&users, &u);
+ data->last_keepalive = (int)time(NULL);
+ } break;
+ case SYS_JOIN: {
+ struct ch_user u = {
+ .id = /*ntohzu */ spacket->user_id,
+ .ip = /*ntohl*/ src->sin_addr.s_addr,
+ .port = /*htons*/ src->sin_port,
+ .last_keepalive = (int)time(NULL)};
+ hset_insert_copy(&users, &u);
+ } break;
+ case SYS_LEAVE: {
+ struct ch_user u = {.id = ntoh64(spacket->user_id), 0};
+ hset_remove(&users, &u);
+ } break;
+ case SYS_ACK: return false;
+ case SYS_KYS:
+ // TODO: verify that request was sent by main thread
+ return true;
}
+ return false;
}
-
-void send_packets_back(struct kv_packet** packets, struct channel_handle* handle) {
- for (size_t i = 0; i < array_size(handle->users); ++i) {
- struct user* current_user = &handle->users[i];
+static void send_packets_back(struct kv_packet **packets) {
+ hset_iter iter;
+ for (hseti_begin(&users, &iter); !hseti_end(&iter); hseti_next(&iter)) {
+ struct ch_user *current_user = hseti_get(&iter);
struct sockaddr_in destination = {
- .sin_family = AF_INET,
- .sin_port = htons(current_user->port),
- .sin_addr = { htonl(current_user->ip) }
- };
+ .sin_family = AF_INET,
+ .sin_port = htons(current_user->port),
+ .sin_addr = {htonl(current_user->ip)}};
for (size_t j = 0; packets[j] != NULL && j < array_size(packets); ++j) {
- DEBUGF("sending packet with id %i", packets[j]->id);
- DEBUGF("to destination: %u.%u.%u.%u:%hu\n",
- (destination.sin_addr.s_addr >> 24) & 0xFF,
- (destination.sin_addr.s_addr >> 16) & 0xFF,
- (destination.sin_addr.s_addr >> 8) & 0xFF,
- destination.sin_addr.s_addr & 0xFF,
- destination.sin_port);
- if (packets[j]->id == current_user->id) continue;
- int error_code = sendto(handle->sockfd, packets[j], KV_PACKET_SIZE, 0, (struct sockaddr*)&destination, sizeof(destination));
- if (error_code) perror("could not send packets back");
+ DEBUGF("sending packet with id %zu to destination: %u.%u.%u.%u:%hu\n", packets[j]->uid,
+ (destination.sin_addr.s_addr >> 24) & 0xFF, (destination.sin_addr.s_addr >> 16) & 0xFF,
+ (destination.sin_addr.s_addr >> 8) & 0xFF, destination.sin_addr.s_addr & 0xFF,
+ destination.sin_port);
+ if (packets[j]->uid == current_user->id) continue;
+ int error_code = sendto(
+ sockfd, packets[j], KV_PACKET_SIZE, 0, (struct sockaddr *)&destination,
+ sizeof(destination));
+ if (error_code) perror("could not send packets back");
}
}
}
-void clear_packet_array(struct kv_packet **array) {
- for (size_t i = 0 ; i < array_size(array); ++i) {
- if (array[i] == NULL) return;
- free(array[i]);
- array[i] = NULL;
+/*
+ * An example of how you should NOT write code
+ * Todo: please rewrite this shit
+ */
+void *thread_loop(void *arg) {
+ if (!channel_init()) pthread_exit(NULL);
+ report_socket((struct thread_loop_arg *)arg, sockfd);
+ struct kv_packet **recvd_data = array_new(struct kv_packet *, 100);
+ struct kv_packet work_buffer;
+ size_t recvd_index = 0;
+ int recv_flag = MSG_DONTWAIT;
+ while (true) {
+ struct sockaddr_in client_addr;
+ socklen_t client_addr_len = sizeof(client_addr);
+ ssize_t recvlength = recvfrom(
+ sockfd, &work_buffer, KV_PACKET_SIZE, recv_flag, (struct sockaddr *)&client_addr,
+ &client_addr_len);
+ if (recvlength > 0) {
+ if (is_system_packet(&work_buffer)) {
+ bool kys = handle_system_packet(&work_buffer, &client_addr);
+ if (kys) forced_cleanup(recvd_data);
+ continue;
+ }
+ recv_flag |= MSG_DONTWAIT;
+ struct kv_packet *kv_copy = malloc(KV_PACKET_SIZE);
+ memcpy(kv_copy, &work_buffer, KV_PACKET_SIZE);
+ ++recvd_index;
+ decisive_push(recvd_data, recvd_index, kv_copy);
+ } else if (errno == EWOULDBLOCK) {
+ if (array_size(recvd_data) == 0) {
+ recv_flag &= ~MSG_DONTWAIT;
+ continue;
+ }
+ send_packets_back(recvd_data);
+ clear_packet_array(recvd_data);
+ } else {
+ perror("error in thread_loop");
+ }
}
}
-
-int __user_cmp(const void* a, const void* b) {
- struct user *_a = (struct user*)a,
- *_b = (struct user*)b;
- return _a->id - _b->id;
-}