aboutsummaryrefslogtreecommitdiffstats
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/CMakeLists.txt14
-rw-r--r--server/channel.c163
-rw-r--r--server/channel.h62
-rw-r--r--server/main.c62
4 files changed, 301 insertions, 0 deletions
diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt
new file mode 100644
index 0000000..0ba4d8b
--- /dev/null
+++ b/server/CMakeLists.txt
@@ -0,0 +1,14 @@
+cmake_minimum_required(VERSION 3.20)
+
+project(kv_server
+ VERSION 0.0
+ DESCRIPTION "A server for kv project."
+ LANGUAGES C)
+
+set(SOURCE_FILES main.c channel.c)
+set(HEADER_FILES channel.h)
+
+add_executable(kv_server ${SOURCE_FILES})
+target_compile_definitions(kv_server PUBLIC DEBUG)
+
+
diff --git a/server/channel.c b/server/channel.c
new file mode 100644
index 0000000..ca16ce0
--- /dev/null
+++ b/server/channel.c
@@ -0,0 +1,163 @@
+#include "channel.h"
+
+#include <container.h>
+#include <errno.h>
+#include <stdlib.h>
+
+void thread_loop(void) {
+ struct channel_handle *channel = channel_init();
+ struct kv_packet **recvd_data = array_new(struct kv_packet*, 100);
+ size_t recvd_index = 0;
+ struct kv_packet work_buffer;
+ while (1) {
+ int recvlength = recvfrom(channel->sockfd, &work_buffer, KV_PACKET_SIZE, 0, NULL, NULL);
+ if (recvlength > 0) {
+ DEBUGF("rec_id = %i\n", work_buffer.id);
+ if (work_buffer.id <= 0) { // <= 0 is for system messages
+ switch (handle_system_packet(&work_buffer, channel)) {
+ case do_nothing: continue;
+ case shutdown_socket: {
+ clear_packet_array(recvd_data);
+ send_cancellation_messages(channel);
+ channel_uninit(channel);
+ } FALLTHROUGH;
+ default: break;
+ }
+ continue;
+ }
+ struct kv_packet *kv_copy = malloc(KV_PACKET_SIZE);
+ memcpy(kv_copy, &work_buffer, KV_PACKET_SIZE);
+ ++recvd_index;
+ if (recvd_index >= array_size(recvd_data)) {
+ array_push(recvd_data, kv_copy);
+ } else {
+ recvd_data[recvd_index] = kv_copy;
+ }
+ } else if (errno == EWOULDBLOCK) {
+ if (array_size(recvd_data) == 0) {
+ // TODO: unset O_NONBLOCK and wait for data
+ assert(0);
+ }
+ send_packets_back(recvd_data, channel);
+ clear_packet_array(recvd_data);
+ } else {
+ perror("thread_loop failed");
+ }
+ }
+ channel_uninit(channel);
+}
+enum system_operation handle_system_packet(struct kv_packet* packet, struct channel_handle* handle) {
+ struct kv_system_packet* spacket = (struct kv_system_packet*) packet;
+ if (system_packet_checksum(spacket) != spacket->checksum) return do_nothing;
+ switch (spacket->operation_id) {
+ case do_nothing: return do_nothing;
+ case join_channel: {
+ DEBUGF("someone joined the channel: id=%i\n", spacket->user_id);
+ struct user user = {
+ .ip = ntohl(spacket->return_address),
+ .port = ntohs(spacket->return_port),
+ .id = ntohl(spacket->user_id)
+ };
+ if (!array_binary_search(handle->users, &user, __user_cmp)) {
+ array_push(handle->users, user);
+ array_qsort(handle->users, __user_cmp);
+ }
+ struct sockaddr_in reply_addr = {.sin_family = AF_INET, .sin_port = spacket->return_port, .sin_addr = {spacket->return_address} };
+ struct kv_system_packet reply = {.user_id = 0, .return_address = 0, .return_port = 0, .operation_id = acknowledgement};
+ sendto(handle->sockfd, &reply, KV_PACKET_SIZE, 0, (struct sockaddr*)&reply_addr, sizeof(reply_addr));
+ } return do_nothing;
+ case shutdown_socket: return shutdown_socket;
+ default: return do_nothing;
+ }
+}
+
+void send_packets_back(struct kv_packet** packets, struct channel_handle* handle) {
+ /*DEBUGF("");*/
+ for (size_t i = 0; i < array_size(handle->users); ++i) {
+ struct user* current_user = &handle->users[i];
+ struct sockaddr_in destination = {
+ .sin_family = AF_INET,
+ .sin_port = htons(current_user->port),
+ .sin_addr = { htonl(current_user->ip) }
+ };
+ for (size_t j = 0; packets[j] != NULL && j < array_size(packets); ++j) {
+ DEBUGF("sending packet with id %i", packets[j]->id);
+ DEBUGF("to destination: %u.%u.%u.%u:%hu\n",
+ (destination.sin_addr.s_addr >> 24) & 0xFF,
+ (destination.sin_addr.s_addr >> 16) & 0xFF,
+ (destination.sin_addr.s_addr >> 8) & 0xFF,
+ destination.sin_addr.s_addr & 0xFF,
+ destination.sin_port);
+ if (packets[j]->id == current_user->id) continue;
+ int error_code = sendto(handle->sockfd, packets[j], KV_PACKET_SIZE, 0, (struct sockaddr*)&destination, sizeof(destination));
+ if (error_code) perror("could not send packets back");
+ }
+ }
+}
+
+unsigned int system_packet_checksum(struct kv_system_packet *packet) {
+ return (packet->user_id << 8) ^
+ (packet->return_port ^ 0x1608) ^
+ (packet->operation_id | (177013 << 10)) ^
+ (packet->return_address & 0xF0F0F0F0);
+}
+
+struct channel_handle *channel_init(void) {
+ struct sockaddr_in thread_local_address = {.sin_family = AF_INET, .sin_port = 6969, .sin_addr = {INADDR_ANY}};
+ int sock = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK, IPPROTO_UDP);
+ if (sock < 0) {
+ perror("channel_init: create socket");
+ return NULL;
+ }
+ if (bind(sock, (struct sockaddr*)&thread_local_address, sizeof(thread_local_address)) < 0) {
+ close(sock);
+ perror("channel_init: bind socket");
+ return NULL;
+ };
+ struct user *users = array_new(struct user, 0);
+ if (users == NULL) {
+ close(sock);
+ perror("channel_init: create users");
+ return NULL;
+ }
+ struct channel_handle *ret = (struct channel_handle*)calloc(1, sizeof(struct channel_handle));
+ ret->sockfd = sock;
+ ret->users = users;
+ DEBUGF("channel initialized\n");
+ return ret;
+}
+
+void channel_uninit(struct channel_handle *handle) {
+ array_free(handle->users);
+ if (close(handle->sockfd) == -1) perror("could not gracefully uninitialize channel");
+ free(handle);
+}
+
+void send_cancellation_messages(struct channel_handle *handle) {
+ struct kv_system_packet packet = {
+ .user_id = 0,
+ .return_address = 0,
+ .operation_id = shutdown_socket
+ };
+ packet.checksum = system_packet_checksum(&packet);
+ struct user* users = handle->users;
+ for (size_t i = 0; i < array_size(users); ++i) {
+ struct sockaddr_in dest_addr = {.sin_family = AF_INET, .sin_port = htons(users[i].port), .sin_addr = {htonl(users[i].ip)} };
+ sendto(handle->sockfd, &packet, KV_PACKET_SIZE, 0, (struct sockaddr*)&dest_addr, sizeof(dest_addr));
+ }
+}
+
+void clear_packet_array(struct kv_packet **array) {
+ for (size_t i = 0 ; i < array_size(array); ++i) {
+ if (array[i] == NULL) return;
+ free(array[i]);
+ array[i] = NULL;
+ }
+ /*memset(array, 0, array_element_size(array) * array_size(array));*/
+}
+
+int __user_cmp(const void* a, const void* b) {
+ struct user *_a = (struct user*)a,
+ *_b = (struct user*)b;
+ return _a->id - _b->id;
+}
diff --git a/server/channel.h b/server/channel.h
new file mode 100644
index 0000000..a577638
--- /dev/null
+++ b/server/channel.h
@@ -0,0 +1,62 @@
+#ifndef KV_SERVER_CHANNEL_H
+#define KV_SERVER_CHANNEL_H
+
+#include <stdbool.h>
+
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netinet/ip.h>
+
+#ifdef DEBUG
+#define DEBUGF(fmt, ...) fprintf(stderr, "DEBUG: %s:%d:%s(): " fmt, __FILE__, __LINE__, __func__, ##__VA_ARGS__)
+#else
+#define DEBUGF(fmt, ...) \
+ do { } while (0)
+#endif
+
+#define KV_PACKET_SIZE 512
+
+enum system_operation {
+ do_nothing = 0,
+ join_channel = -1,
+ shutdown_socket = -2,
+ acknowledgement = -3,
+};
+struct user {
+ unsigned int ip;
+ unsigned short port;
+ int id;
+};
+struct channel_handle {
+ int sockfd;
+ struct user* users;
+};
+struct kv_packet {
+ int id;
+ unsigned char data[KV_PACKET_SIZE - sizeof(unsigned int)];
+};
+struct kv_system_packet {
+ int operation_id;
+ int user_id;
+ int return_address;
+ unsigned short return_port;
+ unsigned int checksum;
+ unsigned char sentinel[KV_PACKET_SIZE - 4 * sizeof(int) - sizeof(short)];
+};
+
+void thread_loop(void);
+
+struct channel_handle *channel_init(void);
+void channel_uninit(struct channel_handle *handle);
+
+enum system_operation handle_system_packet(struct kv_packet* packet, struct channel_handle* handle);
+unsigned int system_packet_checksum(struct kv_system_packet *packet);
+void send_packets_back(struct kv_packet** packets, struct channel_handle *handle);
+
+void send_cancellation_messages(struct channel_handle *handle) ;
+void clear_packet_array(struct kv_packet **array);
+
+int __user_cmp(const void* a, const void* b);
+
+#endif // KV_SERVER_CHANNEL_H
diff --git a/server/main.c b/server/main.c
new file mode 100644
index 0000000..f899f7e
--- /dev/null
+++ b/server/main.c
@@ -0,0 +1,62 @@
+#include <netinet/in.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include <stdio.h>
+
+#define CONTAINER_IMPLEMENTATION
+#include <container.h>
+#undef CONTAINER_IMPLEMENTATION
+
+#include "channel.h"
+
+#define MAIN_PORT 8164
+
+enum request_type {
+ spawn_channel,
+ get_channels,
+};
+
+static int* open_sockets;
+static int request_socket;
+
+void init(void) {
+ open_sockets = array_new(int, 0);
+ request_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(MAIN_PORT), .sin_addr = {INADDR_ANY}};
+
+ for (int retries = 0; retries <= 5; ++retries) {
+ if (bind(request_socket, (struct sockaddr*)&addr, sizeof(addr)) == 0) break;
+ else {
+ perror("init (bind)");
+ sleep(1);
+ }
+ }
+}
+
+enum request_type wait_for_requests(void) {
+ return spawn_channel;
+}
+
+int spawn_channel_thread(void) {
+ return 0;
+}
+
+void event_loop(void) {
+ init();
+ while (1) {
+ enum request_type req = wait_for_requests();
+ switch (req) {
+ case spawn_channel: break;
+ case get_channels: break;
+ }
+ }
+}
+
+int main(int argc, char *argv[]) {
+ (void)argc;
+ (void)argv;
+ thread_loop();
+ return 0;
+}