diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/CMakeLists.txt | 14 | ||||
-rw-r--r-- | server/channel.c | 163 | ||||
-rw-r--r-- | server/channel.h | 62 | ||||
-rw-r--r-- | server/main.c | 62 |
4 files changed, 301 insertions, 0 deletions
diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt new file mode 100644 index 0000000..0ba4d8b --- /dev/null +++ b/server/CMakeLists.txt @@ -0,0 +1,14 @@ +cmake_minimum_required(VERSION 3.20) + +project(kv_server + VERSION 0.0 + DESCRIPTION "A server for kv project." + LANGUAGES C) + +set(SOURCE_FILES main.c channel.c) +set(HEADER_FILES channel.h) + +add_executable(kv_server ${SOURCE_FILES}) +target_compile_definitions(kv_server PUBLIC DEBUG) + + diff --git a/server/channel.c b/server/channel.c new file mode 100644 index 0000000..ca16ce0 --- /dev/null +++ b/server/channel.c @@ -0,0 +1,163 @@ +#include "channel.h" + +#include <container.h> +#include <errno.h> +#include <stdlib.h> + +void thread_loop(void) { + struct channel_handle *channel = channel_init(); + struct kv_packet **recvd_data = array_new(struct kv_packet*, 100); + size_t recvd_index = 0; + struct kv_packet work_buffer; + while (1) { + int recvlength = recvfrom(channel->sockfd, &work_buffer, KV_PACKET_SIZE, 0, NULL, NULL); + if (recvlength > 0) { + DEBUGF("rec_id = %i\n", work_buffer.id); + if (work_buffer.id <= 0) { // <= 0 is for system messages + switch (handle_system_packet(&work_buffer, channel)) { + case do_nothing: continue; + case shutdown_socket: { + clear_packet_array(recvd_data); + send_cancellation_messages(channel); + channel_uninit(channel); + } FALLTHROUGH; + default: break; + } + continue; + } + struct kv_packet *kv_copy = malloc(KV_PACKET_SIZE); + memcpy(kv_copy, &work_buffer, KV_PACKET_SIZE); + ++recvd_index; + if (recvd_index >= array_size(recvd_data)) { + array_push(recvd_data, kv_copy); + } else { + recvd_data[recvd_index] = kv_copy; + } + } else if (errno == EWOULDBLOCK) { + if (array_size(recvd_data) == 0) { + // TODO: unset O_NONBLOCK and wait for data + assert(0); + } + send_packets_back(recvd_data, channel); + clear_packet_array(recvd_data); + } else { + perror("thread_loop failed"); + } + } + channel_uninit(channel); +} +enum system_operation handle_system_packet(struct kv_packet* packet, struct channel_handle* handle) { + struct kv_system_packet* spacket = (struct kv_system_packet*) packet; + if (system_packet_checksum(spacket) != spacket->checksum) return do_nothing; + switch (spacket->operation_id) { + case do_nothing: return do_nothing; + case join_channel: { + DEBUGF("someone joined the channel: id=%i\n", spacket->user_id); + struct user user = { + .ip = ntohl(spacket->return_address), + .port = ntohs(spacket->return_port), + .id = ntohl(spacket->user_id) + }; + if (!array_binary_search(handle->users, &user, __user_cmp)) { + array_push(handle->users, user); + array_qsort(handle->users, __user_cmp); + } + struct sockaddr_in reply_addr = {.sin_family = AF_INET, .sin_port = spacket->return_port, .sin_addr = {spacket->return_address} }; + struct kv_system_packet reply = {.user_id = 0, .return_address = 0, .return_port = 0, .operation_id = acknowledgement}; + sendto(handle->sockfd, &reply, KV_PACKET_SIZE, 0, (struct sockaddr*)&reply_addr, sizeof(reply_addr)); + } return do_nothing; + case shutdown_socket: return shutdown_socket; + default: return do_nothing; + } +} + +void send_packets_back(struct kv_packet** packets, struct channel_handle* handle) { + /*DEBUGF("");*/ + for (size_t i = 0; i < array_size(handle->users); ++i) { + struct user* current_user = &handle->users[i]; + struct sockaddr_in destination = { + .sin_family = AF_INET, + .sin_port = htons(current_user->port), + .sin_addr = { htonl(current_user->ip) } + }; + for (size_t j = 0; packets[j] != NULL && j < array_size(packets); ++j) { + DEBUGF("sending packet with id %i", packets[j]->id); + DEBUGF("to destination: %u.%u.%u.%u:%hu\n", + (destination.sin_addr.s_addr >> 24) & 0xFF, + (destination.sin_addr.s_addr >> 16) & 0xFF, + (destination.sin_addr.s_addr >> 8) & 0xFF, + destination.sin_addr.s_addr & 0xFF, + destination.sin_port); + if (packets[j]->id == current_user->id) continue; + int error_code = sendto(handle->sockfd, packets[j], KV_PACKET_SIZE, 0, (struct sockaddr*)&destination, sizeof(destination)); + if (error_code) perror("could not send packets back"); + } + } +} + +unsigned int system_packet_checksum(struct kv_system_packet *packet) { + return (packet->user_id << 8) ^ + (packet->return_port ^ 0x1608) ^ + (packet->operation_id | (177013 << 10)) ^ + (packet->return_address & 0xF0F0F0F0); +} + +struct channel_handle *channel_init(void) { + struct sockaddr_in thread_local_address = {.sin_family = AF_INET, .sin_port = 6969, .sin_addr = {INADDR_ANY}}; + int sock = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK, IPPROTO_UDP); + if (sock < 0) { + perror("channel_init: create socket"); + return NULL; + } + if (bind(sock, (struct sockaddr*)&thread_local_address, sizeof(thread_local_address)) < 0) { + close(sock); + perror("channel_init: bind socket"); + return NULL; + }; + struct user *users = array_new(struct user, 0); + if (users == NULL) { + close(sock); + perror("channel_init: create users"); + return NULL; + } + struct channel_handle *ret = (struct channel_handle*)calloc(1, sizeof(struct channel_handle)); + ret->sockfd = sock; + ret->users = users; + DEBUGF("channel initialized\n"); + return ret; +} + +void channel_uninit(struct channel_handle *handle) { + array_free(handle->users); + if (close(handle->sockfd) == -1) perror("could not gracefully uninitialize channel"); + free(handle); +} + +void send_cancellation_messages(struct channel_handle *handle) { + struct kv_system_packet packet = { + .user_id = 0, + .return_address = 0, + .operation_id = shutdown_socket + }; + packet.checksum = system_packet_checksum(&packet); + struct user* users = handle->users; + for (size_t i = 0; i < array_size(users); ++i) { + struct sockaddr_in dest_addr = {.sin_family = AF_INET, .sin_port = htons(users[i].port), .sin_addr = {htonl(users[i].ip)} }; + sendto(handle->sockfd, &packet, KV_PACKET_SIZE, 0, (struct sockaddr*)&dest_addr, sizeof(dest_addr)); + } +} + +void clear_packet_array(struct kv_packet **array) { + for (size_t i = 0 ; i < array_size(array); ++i) { + if (array[i] == NULL) return; + free(array[i]); + array[i] = NULL; + } + /*memset(array, 0, array_element_size(array) * array_size(array));*/ +} + +int __user_cmp(const void* a, const void* b) { + struct user *_a = (struct user*)a, + *_b = (struct user*)b; + return _a->id - _b->id; +} diff --git a/server/channel.h b/server/channel.h new file mode 100644 index 0000000..a577638 --- /dev/null +++ b/server/channel.h @@ -0,0 +1,62 @@ +#ifndef KV_SERVER_CHANNEL_H +#define KV_SERVER_CHANNEL_H + +#include <stdbool.h> + +#include <unistd.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/ip.h> + +#ifdef DEBUG +#define DEBUGF(fmt, ...) fprintf(stderr, "DEBUG: %s:%d:%s(): " fmt, __FILE__, __LINE__, __func__, ##__VA_ARGS__) +#else +#define DEBUGF(fmt, ...) \ + do { } while (0) +#endif + +#define KV_PACKET_SIZE 512 + +enum system_operation { + do_nothing = 0, + join_channel = -1, + shutdown_socket = -2, + acknowledgement = -3, +}; +struct user { + unsigned int ip; + unsigned short port; + int id; +}; +struct channel_handle { + int sockfd; + struct user* users; +}; +struct kv_packet { + int id; + unsigned char data[KV_PACKET_SIZE - sizeof(unsigned int)]; +}; +struct kv_system_packet { + int operation_id; + int user_id; + int return_address; + unsigned short return_port; + unsigned int checksum; + unsigned char sentinel[KV_PACKET_SIZE - 4 * sizeof(int) - sizeof(short)]; +}; + +void thread_loop(void); + +struct channel_handle *channel_init(void); +void channel_uninit(struct channel_handle *handle); + +enum system_operation handle_system_packet(struct kv_packet* packet, struct channel_handle* handle); +unsigned int system_packet_checksum(struct kv_system_packet *packet); +void send_packets_back(struct kv_packet** packets, struct channel_handle *handle); + +void send_cancellation_messages(struct channel_handle *handle) ; +void clear_packet_array(struct kv_packet **array); + +int __user_cmp(const void* a, const void* b); + +#endif // KV_SERVER_CHANNEL_H diff --git a/server/main.c b/server/main.c new file mode 100644 index 0000000..f899f7e --- /dev/null +++ b/server/main.c @@ -0,0 +1,62 @@ +#include <netinet/in.h> +#include <unistd.h> +#include <sys/socket.h> +#include <sys/types.h> + +#include <stdio.h> + +#define CONTAINER_IMPLEMENTATION +#include <container.h> +#undef CONTAINER_IMPLEMENTATION + +#include "channel.h" + +#define MAIN_PORT 8164 + +enum request_type { + spawn_channel, + get_channels, +}; + +static int* open_sockets; +static int request_socket; + +void init(void) { + open_sockets = array_new(int, 0); + request_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(MAIN_PORT), .sin_addr = {INADDR_ANY}}; + + for (int retries = 0; retries <= 5; ++retries) { + if (bind(request_socket, (struct sockaddr*)&addr, sizeof(addr)) == 0) break; + else { + perror("init (bind)"); + sleep(1); + } + } +} + +enum request_type wait_for_requests(void) { + return spawn_channel; +} + +int spawn_channel_thread(void) { + return 0; +} + +void event_loop(void) { + init(); + while (1) { + enum request_type req = wait_for_requests(); + switch (req) { + case spawn_channel: break; + case get_channels: break; + } + } +} + +int main(int argc, char *argv[]) { + (void)argc; + (void)argv; + thread_loop(); + return 0; +} |