From 3eeee14d5d5c93ae3d156aabae5a96d1c09f185a Mon Sep 17 00:00:00 2001 From: justanothercatgirl Date: Thu, 4 Jul 2024 20:49:53 +0300 Subject: Renamed types, migrated to make, changed directory hierarchy --- server/CMakeLists.txt | 11 +- server/channel.c | 253 ++++++++++++++++++++++++++------------------ server/channel.h | 37 +++---- server/main.c | 62 +++-------- server/tcp.c | 285 ++++++++++++++++++++++++++++++++++++++++++++++++++ server/tcp.h | 35 +++++++ 6 files changed, 508 insertions(+), 175 deletions(-) create mode 100644 server/tcp.c create mode 100644 server/tcp.h (limited to 'server') diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 0ba4d8b..f63f897 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -1,3 +1,5 @@ +# CMake files will be removed in the next several commits + cmake_minimum_required(VERSION 3.20) project(kv_server @@ -5,10 +7,15 @@ project(kv_server DESCRIPTION "A server for kv project." LANGUAGES C) -set(SOURCE_FILES main.c channel.c) -set(HEADER_FILES channel.h) +set(SOURCE_FILES main.c channel.c tcp.c ../include/packet.c) +set(HEADER_FILES channel.h tcp.h) add_executable(kv_server ${SOURCE_FILES}) target_compile_definitions(kv_server PUBLIC DEBUG) +target_link_libraries(kv_server + PUBLIC crypto + PUBLIC pthread +) + diff --git a/server/channel.c b/server/channel.c index 2d81abb..a0e2779 100644 --- a/server/channel.c +++ b/server/channel.c @@ -1,126 +1,173 @@ #include "channel.h" +#include #include -#include #include +#include -void thread_loop(void) { - struct channel_handle *channel = channel_init(); - struct kv_packet **recvd_data = array_new(struct kv_packet*, 100); - struct kv_packet work_buffer; - size_t recvd_index = 0; - int recv_flag = MSG_DONTWAIT; - while (true) { - struct sockaddr_in client_addr; - socklen_t client_addr_len; // unused - int recvlength = recvfrom(channel->sockfd, &work_buffer, - KV_PACKET_SIZE, recv_flag, - (struct sockaddr*)&client_addr, &client_addr_len); - if (recvlength > 0) { - DEBUGF("rec_id = %i\n", work_buffer.id); - if (work_buffer.id <= 0) { - handle_system_packet(&work_buffer, &client_addr, channel); - continue; - } else { - recv_flag |= MSG_DONTWAIT; - } - struct kv_packet *kv_copy = malloc(KV_PACKET_SIZE); - memcpy(kv_copy, &work_buffer, KV_PACKET_SIZE); - ++recvd_index; - if (recvd_index >= array_size(recvd_data)) { - array_push(recvd_data, kv_copy); - } else { - recvd_data[recvd_index] = kv_copy; - } - } else if (errno == EWOULDBLOCK) { - if (array_size(recvd_data) == 0) recv_flag &= ~MSG_DONTWAIT; - send_packets_back(recvd_data, channel); - clear_packet_array(recvd_data); - } else { - perror("thread_loop failed"); - } +#define decisive_push(array, index, elem) \ + do { \ + if (index >= array_size(array)) { \ + array_push(array, elem); \ + } else { \ + array[index] = elem; \ + } \ + } while (0) + +#if defined(__unix__) && defined(__GNUC__) + #define __THREAD __thread +#elif __STDC_VERSION__ >= 201112L && __STDC_VERSION__ < 202302L + #define __THREAD _Thread_local +#elif __STDC_VERSION__ >= 202302L + #define __THREAD thread_local +#else + #pragma GCC error "Use unix with GCC, or C11 or later standards" + #define __THREAD "don't compile" +#endif + +struct ch_user { + u64 id; + u32 ip; + u16 port; + u64 last_keepalive; +}; + +static __THREAD struct hash_set users; +static __THREAD int sockfd; + +static int __user_hset_cmp(const void *a, const void *b) { + struct ch_user *_a = (struct ch_user *)a, *_b = (struct ch_user *)b; + return _a->id - _b->id; +} +static size_t __user_hset_hsh(const void *a) { return ((struct ch_user *)a)->id; } +static void clear_packet_array(struct kv_packet **array) { + for (size_t i = 0; i < array_size(array); ++i) { + if (array[i] == NULL) return; + free(array[i]); + array[i] = NULL; } - array_free(recvd_data); - channel_uninit(channel); } - -struct channel_handle *channel_init(void) { - struct sockaddr_in thread_local_address = { - .sin_family = AF_INET, - .sin_port = 0, - .sin_addr = {INADDR_ANY} - }; - struct channel_handle *handle = NULL; - char chain_result = /* This is evil, but */ - (handle = (struct channel_handle*)calloc(1, sizeof(struct channel_handle))) != NULL - && (handle->users = array_new(struct user, 0)) != NULL - && (handle->sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) >= 0 - && bind(handle->sockfd, (struct sockaddr*)&thread_local_address, sizeof(thread_local_address)) == 0; +bool channel_init(void) { + struct sockaddr_in thread_local_address = {.sin_family = AF_INET, .sin_port = 0, .sin_addr = {INADDR_ANY}}; + char chain_result = /* This is evil, but who cares */ + hset_ok(users = hset_new(sizeof(struct ch_user), __user_hset_cmp, __user_hset_hsh)) && + (sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) >= 0 && + bind(sockfd, (struct sockaddr *)&thread_local_address, sizeof(thread_local_address)) == 0; if (!chain_result) { perror("channel init failed"); - if (handle) { - if (handle->users) array_free(handle->users); - if (handle->sockfd >= 0) close(handle->sockfd); - free(handle); - } - return NULL; + if (hset_ok(users)) hset_free(&users); + if (sockfd >= 0) close(sockfd); + return false; } - DEBUGF("Channel [%i] created\n", handle->sockfd); - return handle; + DEBUGF("Channel [%i] created\n", sockfd); + return true; } - -void channel_uninit(struct channel_handle *handle) { - array_free(handle->users); - if (close(handle->sockfd) == -1) - perror("could not gracefully uninitialize channel"); - free(handle); +static void channel_uninit(void) { + hset_free(&users); + if (close(sockfd) == -1) perror("could not gracefully uninitialize channel"); +} +_Noreturn static inline void forced_cleanup(struct kv_packet **packets) { + clear_packet_array(packets); + array_free(packets); + channel_uninit(); + pthread_exit(NULL); +} +static inline void report_socket(const struct thread_loop_arg *arg, int fd) { + pthread_mutex_lock(arg->sock_mx); + *(arg->sock_dest) = fd; + pthread_cond_signal(arg->sock_ready_cond); + pthread_mutex_unlock(arg->sock_mx); } -void handle_system_packet(struct kv_packet* packet, struct sockaddr_in *source, struct channel_handle* handle) { - struct kv_system_packet* spacket = (struct kv_system_packet*) packet; - if (system_packet_checksum(spacket) != spacket->checksum) return; - switch (spacket->operation_id) { - case keepalive: TODO; - case join_channel: TODO; - case leave_channel: TODO; - case acknowledgement: TODO; - default: TODO; +// Returns whether thread should kill itself +static bool handle_system_packet(struct kv_packet *packet, struct sockaddr_in *src) { + TODO; // account for `ackid` + struct kv_system_packet *spacket = (struct kv_system_packet *)packet; + if (system_packet_checksum(spacket) != spacket->checksum) return false; + switch (ntohl(spacket->operation_id)) { + case SYS_KEEPALIVE: { + struct ch_user u = {.id = ntoh64(spacket->user_id), 0}; + struct ch_user *data = hset_at(&users, &u); + data->last_keepalive = (int)time(NULL); + } break; + case SYS_JOIN: { + struct ch_user u = { + .id = /*ntohzu */ spacket->user_id, + .ip = /*ntohl*/ src->sin_addr.s_addr, + .port = /*htons*/ src->sin_port, + .last_keepalive = (int)time(NULL)}; + hset_insert_copy(&users, &u); + } break; + case SYS_LEAVE: { + struct ch_user u = {.id = ntoh64(spacket->user_id), 0}; + hset_remove(&users, &u); + } break; + case SYS_ACK: return false; + case SYS_KYS: + // TODO: verify that request was sent by main thread + return true; } + return false; } - -void send_packets_back(struct kv_packet** packets, struct channel_handle* handle) { - for (size_t i = 0; i < array_size(handle->users); ++i) { - struct user* current_user = &handle->users[i]; +static void send_packets_back(struct kv_packet **packets) { + hset_iter iter; + for (hseti_begin(&users, &iter); !hseti_end(&iter); hseti_next(&iter)) { + struct ch_user *current_user = hseti_get(&iter); struct sockaddr_in destination = { - .sin_family = AF_INET, - .sin_port = htons(current_user->port), - .sin_addr = { htonl(current_user->ip) } - }; + .sin_family = AF_INET, + .sin_port = htons(current_user->port), + .sin_addr = {htonl(current_user->ip)}}; for (size_t j = 0; packets[j] != NULL && j < array_size(packets); ++j) { - DEBUGF("sending packet with id %i", packets[j]->id); - DEBUGF("to destination: %u.%u.%u.%u:%hu\n", - (destination.sin_addr.s_addr >> 24) & 0xFF, - (destination.sin_addr.s_addr >> 16) & 0xFF, - (destination.sin_addr.s_addr >> 8) & 0xFF, - destination.sin_addr.s_addr & 0xFF, - destination.sin_port); - if (packets[j]->id == current_user->id) continue; - int error_code = sendto(handle->sockfd, packets[j], KV_PACKET_SIZE, 0, (struct sockaddr*)&destination, sizeof(destination)); - if (error_code) perror("could not send packets back"); + DEBUGF("sending packet with id %zu to destination: %u.%u.%u.%u:%hu\n", packets[j]->uid, + (destination.sin_addr.s_addr >> 24) & 0xFF, (destination.sin_addr.s_addr >> 16) & 0xFF, + (destination.sin_addr.s_addr >> 8) & 0xFF, destination.sin_addr.s_addr & 0xFF, + destination.sin_port); + if (packets[j]->uid == current_user->id) continue; + int error_code = sendto( + sockfd, packets[j], KV_PACKET_SIZE, 0, (struct sockaddr *)&destination, + sizeof(destination)); + if (error_code) perror("could not send packets back"); } } } -void clear_packet_array(struct kv_packet **array) { - for (size_t i = 0 ; i < array_size(array); ++i) { - if (array[i] == NULL) return; - free(array[i]); - array[i] = NULL; +/* + * An example of how you should NOT write code + * Todo: please rewrite this shit + */ +void *thread_loop(void *arg) { + if (!channel_init()) pthread_exit(NULL); + report_socket((struct thread_loop_arg *)arg, sockfd); + struct kv_packet **recvd_data = array_new(struct kv_packet *, 100); + struct kv_packet work_buffer; + size_t recvd_index = 0; + int recv_flag = MSG_DONTWAIT; + while (true) { + struct sockaddr_in client_addr; + socklen_t client_addr_len = sizeof(client_addr); + ssize_t recvlength = recvfrom( + sockfd, &work_buffer, KV_PACKET_SIZE, recv_flag, (struct sockaddr *)&client_addr, + &client_addr_len); + if (recvlength > 0) { + if (is_system_packet(&work_buffer)) { + bool kys = handle_system_packet(&work_buffer, &client_addr); + if (kys) forced_cleanup(recvd_data); + continue; + } + recv_flag |= MSG_DONTWAIT; + struct kv_packet *kv_copy = malloc(KV_PACKET_SIZE); + memcpy(kv_copy, &work_buffer, KV_PACKET_SIZE); + ++recvd_index; + decisive_push(recvd_data, recvd_index, kv_copy); + } else if (errno == EWOULDBLOCK) { + if (array_size(recvd_data) == 0) { + recv_flag &= ~MSG_DONTWAIT; + continue; + } + send_packets_back(recvd_data); + clear_packet_array(recvd_data); + } else { + perror("error in thread_loop"); + } } } - -int __user_cmp(const void* a, const void* b) { - struct user *_a = (struct user*)a, - *_b = (struct user*)b; - return _a->id - _b->id; -} diff --git a/server/channel.h b/server/channel.h index 3916212..8068a4c 100644 --- a/server/channel.h +++ b/server/channel.h @@ -3,36 +3,25 @@ #include -#include -#include -#include #include - #include +#include +#include +#include +#include -struct user { - long id; - unsigned int ip; - unsigned short port; - unsigned long last_keepalive; -}; -int __user_cmp(const void* a, const void* b); +#include -struct channel_handle { - int sockfd; - struct user* users; +/// Required for the calling thread to set socket file descriptor +struct thread_loop_arg { + int *sock_dest; + pthread_mutex_t *sock_mx; + pthread_cond_t *sock_ready_cond; + u64 owner; + const unsigned char *pubkey; }; // main function that manages every channel -void thread_loop(void); - -struct channel_handle *channel_init(void); -void channel_uninit(struct channel_handle *handle); - -void send_packets_back(struct kv_packet** packets, struct channel_handle *handle); -void handle_system_packet(struct kv_packet* packet, struct sockaddr_in *source, struct channel_handle* handle); - -void clear_packet_array(struct kv_packet **array); - +void *thread_loop(void *); #endif // KV_SERVER_CHANNEL_H diff --git a/server/main.c b/server/main.c index f899f7e..b6745e0 100644 --- a/server/main.c +++ b/server/main.c @@ -1,62 +1,32 @@ -#include -#include -#include -#include - -#include - +#include #define CONTAINER_IMPLEMENTATION +#define HSET_MAX_BUCKET_SIZE 4 #include #undef CONTAINER_IMPLEMENTATION -#include "channel.h" - -#define MAIN_PORT 8164 +#include "tcp.h" -enum request_type { - spawn_channel, - get_channels, -}; +#include -static int* open_sockets; -static int request_socket; +#define MAIN_PORT 8164 -void init(void) { - open_sockets = array_new(int, 0); - request_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(MAIN_PORT), .sin_addr = {INADDR_ANY}}; - - for (int retries = 0; retries <= 5; ++retries) { - if (bind(request_socket, (struct sockaddr*)&addr, sizeof(addr)) == 0) break; - else { - perror("init (bind)"); - sleep(1); - } +void setup_signal(void) { + struct sigaction signal = {.sa_handler = print_state, .sa_mask = {{0}}, .sa_flags = 0}; + if (sigaction(SIGUSR1, &signal, NULL) != 0) { + WHERE; + exit(EXIT_FAILURE); } -} - -enum request_type wait_for_requests(void) { - return spawn_channel; -} - -int spawn_channel_thread(void) { - return 0; -} - -void event_loop(void) { - init(); - while (1) { - enum request_type req = wait_for_requests(); - switch (req) { - case spawn_channel: break; - case get_channels: break; - } + signal.sa_handler = exit_tcp; + if (sigaction(SIGTERM, &signal, NULL) != 0) { + WHERE; + exit(EXIT_FAILURE); } } int main(int argc, char *argv[]) { (void)argc; (void)argv; - thread_loop(); + setup_signal(); + tcp_loop(); return 0; } diff --git a/server/tcp.c b/server/tcp.c new file mode 100644 index 0000000..0d6e118 --- /dev/null +++ b/server/tcp.c @@ -0,0 +1,285 @@ +#include "tcp.h" +#include +#include +#include + +#define report_error(socket, error) \ + do { \ + DEBUGF("error on socket %i: %i\n", socket, error); \ + write(socket, &zerozu, sizeof(zerozu)); \ + enum commd_error __ecpy = htonl(error); \ + write(socket, &__ecpy, sizeof(__ecpy)); \ + return; \ + } while (0) + +#define return_error(err) \ + do { \ + *e = err; \ + return 0; \ + } while (0) + +struct hash_set channels; +struct hash_set users; +static struct tcp_user varusr = {0}; +static struct tcp_channel varchnl = {0}; +static u64 lalloc_usr = 0; +static int udpctlfd = 0; +static const size_t zerozu = 0; +static bool should_exit = false; + +void print_state(int _) { + (void)_; +#ifdef DEBUG + fputs("printing server state.\n hash_map users {\n", stderr); + struct hash_set_iter iter; + for (hseti_begin(&users, &iter); !hseti_end(&iter); hseti_next(&iter)) { + struct tcp_user *curr = hseti_get(&iter); + fprintf(stderr, "\ttcp_user {.id=%zu, .permissions=%u, .channel=%zu}\n", curr->id, curr->permissions, + curr->joined_channel); + } + fputs("}\nhash_map channels {\n", stderr); + for (hseti_begin(&channels, &iter); !hseti_end(&iter); hseti_next(&iter)) { + struct tcp_channel *curr = hseti_get(&iter); + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + getsockname(curr->fd, (struct sockaddr *)&addr, &addrlen); + fprintf(stderr, "\ttcp_channel {.id=%zu, .fd=%u, .channel=%zu} [port=%hu]\n", curr->id, curr->fd, + curr->owner, addr.sin_port); + } + fputs("}\n", stderr); +#endif +} +void exit_tcp(int _) { + (void)_; + should_exit = true; + DEBUGF("EXITING SERVER, setting `should_exit (%p)` to %i\n", (void*)&should_exit, (int)should_exit); +} + +static int tcp_user_cmp(const struct tcp_user *a, const struct tcp_user *b) { return (a->id - b->id) ? 1 : 0; } +static int tcp_channel_cmp(const struct tcp_channel *a, const struct tcp_channel *b) { return (a->id - b->id) ? 1 : 0; } +static size_t tcp_user_hash(const struct tcp_user *a) { return a->id; } +static size_t tcp_channel_hash(const struct tcp_channel *a) { return a->id; } +static int set_sock_timeout(int fd, int ms) { + struct timeval timeout; + timeout.tv_sec = ms / 1000; + timeout.tv_usec = (ms % 1000) * 1000; + return setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); +} +static void init_statics(void) { + channels = + hset_new(sizeof(struct tcp_channel), (hset_equal_fn)&tcp_channel_cmp, (hset_hash_fn)&tcp_channel_hash); + users = hset_new(sizeof(struct tcp_user), (hset_equal_fn)&tcp_user_cmp, (hset_hash_fn)&tcp_user_hash); + udpctlfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); +} +static int setup_socket(unsigned short port) { + int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + int flag = 1; + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof flag)) goto error; + struct sockaddr_in localaddr = { + .sin_family = AF_INET, .sin_port = htons(port), .sin_addr = {htonl(INADDR_ANY)}}; + if (bind(sock, (struct sockaddr *)&localaddr, sizeof(localaddr))) goto error; + return sock; +error: + perror("TCP thread failed to initialize"); + exit(EXIT_FAILURE); + +} +static void init_admin(u64 aid) { + struct tcp_user u = {.id = aid, .pubkey = NULL, .permissions = perm_admin, .joined_channel = 0}; + hset_insert_copy(&users, &u); +} +static bool has_4bytes_0xff(u64 id) { + return (unsigned int)(id >> 32) == 0xFFFFFFFF || (unsigned int)(id & 0xFFFFFFFF) == 0xFFFFFFFF; +} +static u64 get_uniq_id(struct hash_set *set) { + // while (map has lalloc_usr inex || lalloc_usr has 4 bytes of ones) next lalloc_usr; + varusr.id = lalloc_usr; + while (hset_at(set, &varusr) != NULL || has_4bytes_0xff(lalloc_usr)) ++varusr.id; + lalloc_usr = varusr.id; + return lalloc_usr; +} +static unsigned short get_channel_port(u64 id) { + varchnl.id = id; + struct tcp_channel *ch = hset_at(&channels, &varchnl); + if (ch == NULL) return 0; + struct sockaddr_in a; + socklen_t len = sizeof a; + getsockname(ch->fd, (struct sockaddr *)&a, &len); + return a.sin_port; +} +static bool user_has_permission(u64 uid, unsigned int perm) { + varusr.id = uid; + struct tcp_user *u = hset_at(&users, &varusr); + if (u == NULL) return false; + unsigned int uperm = u->permissions; + // bitwise implication must yield all ones (0xFFFFFFFF). + // Inverse it for easier check + return (perm & ~uperm) == 0; +} +static u64 send_channels(int sockfd, enum commd_error *e) { + struct hash_set_iter iter; + u64 array_length = hton64(hset_count(&channels)); + if (write(sockfd, &array_length, sizeof(array_length)) != sizeof(array_length)) return_error(ERR_SERV); + for (hseti_begin(&channels, &iter); !hseti_end(&iter); hseti_next(&iter)) { + struct tcp_channel *c = hseti_get(&iter); + u64 chid = hton64(c->id); + if (write(sockfd, &chid, sizeof(chid)) != sizeof(chid)) return_error(ERR_SERV); + } + // the leading zero is written by the caller + return_error(ERR_SUCCESS); // actually returns success but... +} +static inline u64 commd_register_process(struct commd_register *cmd, enum commd_error *e) { + /* fprintf(stderr, "%s: auid=%zu; perm=%zu\n", "commd_register_process", cmd->auid, cmd->perm); */ + if (!user_has_permission(cmd->auid, perm_join_user | cmd->perm)) return_error(ERR_ACCESS); + struct tcp_user new_user = { + .id = get_uniq_id(&users), .joined_channel = 0, .permissions = (unsigned int)cmd->perm}; + hset_insert_copy(&users, &new_user); + return new_user.id; +} +static inline u64 commd_unregister_process(struct commd_unregister *cmd, enum commd_error *e) { + /* DEBUGF("delete user %zu (admin %zu)\n", cmd->uid, cmd->auid); */ + if (cmd->auid != cmd->uid && !user_has_permission(cmd->auid, perm_unregister_user)) return_error(ERR_ACCESS); + varusr.id = cmd->uid; + hset_remove(&users, &varusr); + return cmd->uid; +} +static inline u64 commd_create_process(struct commd_create *cmd, enum commd_error *e) { + if (!user_has_permission(cmd->uid, perm_add_channel)) return_error(ERR_ACCESS); + + u64 chid; + int sock = -1; + unsigned short port = 0; + { + pthread_mutex_t sock_mx = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t sock_cond = PTHREAD_COND_INITIALIZER; + struct thread_loop_arg arg = { + .owner = cmd->uid, .sock_dest = &sock, .sock_mx = &sock_mx, .sock_ready_cond = &sock_cond}; + pthread_mutex_lock(&sock_mx); + chid = spawn_channel(&arg); + DEBUG("\n"); + while (sock == -1 || port == 0) pthread_cond_wait(&sock_cond, &sock_mx); + DEBUG("\n"); + pthread_mutex_unlock(&sock_mx); + } + struct tcp_channel new_channel = {.owner = cmd->uid, .name = NULL, .fd = sock}; + hset_insert_copy(&channels, &new_channel); + return chid; +} +static inline u64 commd_delete_process(struct commd_delete *cmd, enum commd_error *e) { + DEBUGF("received command%p\n", (void *)cmd); + varchnl.id = cmd->chid; + struct tcp_channel *c = hset_at(&channels, &varchnl); + if (c == NULL) return_error(ERR_PARAM); + if (cmd->uid != c->owner && !user_has_permission(cmd->uid, perm_unadd_channel)) return_error(ERR_ACCESS); + hset_remove(&channels, &varchnl); + return varchnl.id; +} +static inline u64 commd_join_process(struct commd_join *cmd, enum commd_error *e) { + if (cmd->uid != cmd->juid && !user_has_permission(cmd->uid, perm_join_user)) return_error(ERR_ACCESS); + struct kv_system_packet packet = { + .magic_bytes = SYS_PACKET_MAGIC_BYTES, .operation_id = htonl(SYS_JOIN), .user_id = cmd->juid}; + if (!sendto_channel(cmd->chid, &packet, TCP_MAX_WAIT_MS, TCP_MAX_RETRIES)) return_error(ERR_SERV); + return (u64)get_channel_port(cmd->chid); +} +static inline u64 commd_leave_process(struct commd_leave *cmd, enum commd_error *e) { + if (cmd->uid != cmd->luid && !user_has_permission(cmd->uid, perm_kick_user)) return_error(ERR_ACCESS); + struct kv_system_packet packet = { + .magic_bytes = SYS_PACKET_MAGIC_BYTES, .operation_id = htonl(SYS_LEAVE), .user_id = cmd->luid}; + if (!sendto_channel(cmd->chid, &packet, TCP_MAX_WAIT_MS, TCP_MAX_RETRIES)) return_error(ERR_SERV); + return 1; +} +/// switches on command type and operates accordingly +static u64 process_cmd(enum commd_type type, struct commd *cmd, enum commd_error *NONNULL e) { + // network byte order conversion + switch (type) { + case CMD_LEAVE: + case CMD_JOIN: ((struct commd_conv *)cmd)->_3 = ntoh64(((struct commd_conv *)cmd)->_3); FALLTHROUGH; + case CMD_UNREGISTER: + case CMD_REGISTER: + case CMD_DELETE: ((struct commd_conv *)cmd)->_2 = ntoh64(((struct commd_conv *)cmd)->_2); FALLTHROUGH; + case CMD_CREATE: + case CMD_GET_PORT: ((struct commd_conv *)cmd)->_1 = ntoh64(((struct commd_conv *)cmd)->_1); FALLTHROUGH; + case CMD_GET_CHANNELS:; + } + // processing + switch (type) { + case CMD_REGISTER: return commd_register_process((struct commd_register *)cmd, e); + case CMD_UNREGISTER: return commd_unregister_process((struct commd_unregister *)cmd, e); + case CMD_CREATE: return commd_create_process((struct commd_create *)cmd, e); + case CMD_DELETE: return commd_delete_process((struct commd_delete *)cmd, e); + case CMD_JOIN: return commd_join_process((struct commd_join *)cmd, e); + case CMD_LEAVE: return commd_leave_process((struct commd_leave *)cmd, e); + case CMD_GET_PORT: return (u64)get_channel_port(((struct commd_get_port *)cmd)->cihd); + case CMD_GET_CHANNELS: return_error(ERR_DO_IT_YOURSELF); + } + return_error(ERR_PARAM); +} +static void process_connection(int sockfd) { + DEBUG("PROCESSING CONNECTION\n"); + // TODO: protection against blocking reads + enum commd_type type; + if (read(sockfd, &type, sizeof(type)) != sizeof(type)) report_error(sockfd, ERR_INVAL); + type = ntohl(type); + struct commd cmd; + memset(&cmd, 0, sizeof(cmd)); // TODO: consider to remove + i64 commd_size = commd_size_lookup[type]; + if (read(sockfd, &cmd, commd_size) != commd_size) report_error(sockfd, ERR_INVAL); + enum commd_error e = ERR_SUCCESS; + u64 cmd_status = process_cmd(type, &cmd, &e); + if (e == ERR_DO_IT_YOURSELF) cmd_status = send_channels(sockfd, &e); + cmd_status = hton64(cmd_status); + if (e != ERR_SUCCESS) report_error(sockfd, e); + write(sockfd, &cmd_status, sizeof(cmd_status)); +} + +u64 spawn_channel(struct thread_loop_arg *arg) { + pthread_t thread; + pthread_create(&thread, NULL, thread_loop, arg); + return thread; +} +bool sendto_channel(u64 chid, struct kv_system_packet *packet, int wait_ack_ms, int repeat) { + bool success = wait_ack_ms == 0; + varchnl.id = chid; + struct tcp_channel *ch = hset_at(&channels, &varchnl); + if (ch == NULL) return false; + set_sock_timeout(udpctlfd, wait_ack_ms); + + struct sockaddr_in chaddr = {0}; + socklen_t len = sizeof(chaddr); + getsockname(ch->fd, (struct sockaddr *)&chaddr, &len); + do { + sendto(udpctlfd, packet, KV_PACKET_SIZE, 0, (struct sockaddr *)&chaddr, len); + if (wait_ack_ms == 0) continue; + struct kv_system_packet resp; + recvfrom(udpctlfd, &resp, KV_PACKET_SIZE, 0, (struct sockaddr *)&chaddr, &len); + if (errno == EWOULDBLOCK || errno == EAGAIN) continue; + if (resp.operation_id == SYS_ACK) success = true; + } while (--repeat >= 0); + + return success; +} +void tcp_loop(void) { + init_statics(); + init_admin(ADMIN_UID); + int sock = setup_socket(TCP_PORT); + if (listen(sock, LISTEN_AMOUNT) != 0) { + perror("listen on TCP socket failed"); + exit(EXIT_FAILURE); + } + DEBUGF("listening on port %hu\n", TCP_PORT); + struct sockaddr_in accept_addr; + socklen_t addrlen = sizeof(accept_addr); + int currfd; + while (!should_exit) { + currfd = accept(sock, (struct sockaddr *)&accept_addr, &addrlen); + if (currfd < 0) continue; + DEBUGF("accepted connection on port %hu\n", accept_addr.sin_port); + process_connection(currfd); + shutdown(currfd, SHUT_RDWR); + close(currfd); + } + close(sock); + close(udpctlfd); + hset_free(&users); + hset_free(&channels); +} diff --git a/server/tcp.h b/server/tcp.h new file mode 100644 index 0000000..e239e19 --- /dev/null +++ b/server/tcp.h @@ -0,0 +1,35 @@ +#ifndef KV_SERVER_TCP +#define KV_SERVER_TCP + +#include +#include +#include +#include +#include +#include +#include "channel.h" + +#include +#include + +#define TCP_PORT 8085 +#define LISTEN_AMOUNT 128 +#define TCP_MAX_WAIT_MS 10 +#define TCP_MAX_RETRIES 0 +#define ADMIN_UID 0 + + +/* val: struct tcp_channel */ +extern struct hash_set channels; +/* val: struct tcp_user */ +extern struct hash_set users; + +void print_state(int); +void exit_tcp(int); + +void tcp_loop(void); +u64 spawn_channel(struct thread_loop_arg *arg); +u64 spawn_channel_pool(void* arg); // TODO +bool sendto_channel(size_t chid, struct kv_system_packet* packet, int wait_ack_ms, int repeat); + +#endif // KV_SERVER_TCP -- cgit v1.2.3-70-g09d2