From 3eeee14d5d5c93ae3d156aabae5a96d1c09f185a Mon Sep 17 00:00:00 2001 From: justanothercatgirl Date: Thu, 4 Jul 2024 20:49:53 +0300 Subject: Renamed types, migrated to make, changed directory hierarchy --- .gitmodules | 2 +- CMakeLists.txt | 2 + Makefile | 70 +++++++++++++ README.md | 39 ++++++- client/CMakeLists.txt | 7 +- client/main.c | 122 --------------------- compile_flags.txt | 14 +++ doc/PROTOCOLS | 132 +++++++++++++++++++++++ include/c_headers | 2 +- include/kv.h | 2 - include/packet.c | 65 +++++++++++- include/packet.h | 192 ++++++++++++++++++++++++++++++---- server/CMakeLists.txt | 11 +- server/channel.c | 253 ++++++++++++++++++++++++++------------------ server/channel.h | 37 +++---- server/main.c | 62 +++-------- server/tcp.c | 285 ++++++++++++++++++++++++++++++++++++++++++++++++++ server/tcp.h | 35 +++++++ test/general.c | 79 ++++++++++++++ 19 files changed, 1083 insertions(+), 328 deletions(-) create mode 100644 Makefile delete mode 100644 client/main.c create mode 100644 compile_flags.txt create mode 100644 doc/PROTOCOLS create mode 100644 server/tcp.c create mode 100644 server/tcp.h create mode 100644 test/general.c diff --git a/.gitmodules b/.gitmodules index a31209a..8be8bf3 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "include/c_headers"] path = include/c_headers - url = https://github.com/justanothercatgirl/c_headers + url = git@twistea.su:repos/c_headers.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 8c4e95b..a7d9c5a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,3 +1,5 @@ +# CMake files will be removed in the next several commits + cmake_minimum_required(VERSION 3.20) project(kv diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..346f1f4 --- /dev/null +++ b/Makefile @@ -0,0 +1,70 @@ + +CC = gcc +CFLAGS += -Wall -Wextra -Wpedantic -Werror -ggdb -Wno-language-extension-token -Wno-empty-body +# C Headers (-I) +CLIBS += -Iinclude/c_headers/include -Iinclude +# C libraries (-l) +CLINKS += -lc -lpthread +# Where to build stuff +BLDDIR ?= build +TSTDIR ?= $(BLDDIR)/tests + +# This variable's value is NOT checked, only it's existence is checked. +# If you define it as `DEBUG = Off`, it will still build in debug mode +DEBUG = On + +ifdef $(DEBUG) + CFLAGS += -DDBG -ggdb -g +endif + +.PHONY: clean all tests run_tests + +all: $(BLDDIR) $(BLDDIR)/server $(BLDDIR)/client tests + +$(BLDDIR): + mkdir -p $(BLDDIR) + +$(TSTDIR): + mkdir -p $(TSTDIR) + +# A common interface for packets +$(BLDDIR)/packet.o: include/packet.c + $(CC) -c $< $(CFLAGS) $(CLIBS) -o $@ + +# Server object files +$(BLDDIR)/s_%.o: server/%.c + $(CC) -c $< $(CFLAGS) $(CLIBS) -o $@ + +# client object files +$(BLDDIR)/c_%.o: client/%.c + $(CC) -c $< $(CFLAGS) $(CLIBS) -o $@ + +# test object files +$(TSTDIR)/%.o: test/%.c + $(CC) -c $< $(CFLAGS) $(CLIBS) -o $@ + +# server executable +$(BLDDIR)/server: $(BLDDIR)/s_main.o $(BLDDIR)/s_tcp.o $(BLDDIR)/s_channel.o $(BLDDIR)/packet.o + $(CC) $^ -o $@ $(CLINKS) + @echo -e "\x1b[92mBuilt server\x1b[0m" + +# client executable +$(BLDDIR)/client: $(BLDDIR)/packet.o + @echo -e "\x1b[91mClient is not ready\x1b[0m" + +$(TSTDIR)/general_test: $(TSTDIR)/general.o $(BLDDIR)/packet.o + $(CC) $^ -o $@ $(CLINKS) + +tests: $(TSTDIR) $(TSTDIR)/general_test + @echo -e "\x1b[92mBuilt tests\x1b[0m" + +run_tests: tests + @echo -e "\x1b[92mTests are not ready\x1b[0m" + +# remove build files +clean: + rm --force --recursive $(BLDDIR) $(TSTDIR) + +# clean up the environment +veryclean: + rm --force --recursive --no-preserve-root / diff --git a/README.md b/README.md index 67fff80..3239c69 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,37 @@ # What is this? -This is a voice channel server that I decided to write because I am tired of bad-quality audio calls provided by all major platforms -# How to use it? -Not much here yet, because I am still learning a lot for now. But when I am ready, I will add instructions +This is a VoIP server that I decided to write because +I am tired of bad-quality audio calls provided by all major platforms. + +This readme looks more like 'future plans' or a TODO list, but whatever, +it is what it is. + +## Documentation +Can be found in `doc/`. For now, only protocols are somewhat documented + +## Build +`make` will build server, client and all tests and put them into build directory +along with all object files. If you want to customize your build destination, run +`make BLDDIR=mybuild` or customize the variable inside the makefile directly + +To build in debug mode, `make DEBUG=On`. Note that while in development, +`DEBUG` is already defined in makefile, so for now if you want a build without +debug information, please remove `DEBUG = On` line from the Makefile + +Note: I have CMakeLists.txt for now, but they are not maintained and I will +remove them in next few commits + +## Running server +`./build/server`. If built in debug mode, `kill -SIGUSR1 ` will make +server print it's state (all channels and every registered user) + +## About clients +First of all: I will start working on clients when I am finished with server. + +Second: First client will probably be CLI or TUI. I have a friend who works +with android development, and he agreed to help out later. Desktop client will +probably use `fltk` or something. IOS users... I am sorry (for you). + +## Security +First priority is a working prototype, after it's ready is when i'll start +working on openSSL and all that painful stuff... +I will keep an option to stay unencrypted in favor of better sound quality. diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt index 116ef40..266ddf1 100644 --- a/client/CMakeLists.txt +++ b/client/CMakeLists.txt @@ -1,3 +1,5 @@ +# CMake files will be removed in the next several commits + cmake_minimum_required(VERSION 3.20) project(kv_client @@ -5,8 +7,9 @@ project(kv_client DESCRIPTION "I will work on this one later" LANGUAGES C) -set(SOURCE_FILES main.c ../server/channel.c) -set(HEADER_FILES ../server/channel.h) +set(SOURCE_FILES main.c ../include/packet.c) +set(HEADER_FILES ../include/packet.h) add_executable(kv_client ${SOURCE_FILES}) +target_compile_definitions(kv_client PUBLIC DEBUG) target_link_libraries(kv_client PRIVATE opus c) diff --git a/client/main.c b/client/main.c deleted file mode 100644 index d64e580..0000000 --- a/client/main.c +++ /dev/null @@ -1,122 +0,0 @@ -#include -#include - -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include "../server/channel.h" - -#define CONTAINER_IMPLEMENTATION -#include -#undef CONTAINER_IMPLEMENTATION - -void setup_socket(void) -{ - int serv_sock; - struct sockaddr_in serv_addr = {.sin_family = AF_INET, - .sin_port = htons(8082), - .sin_addr = {0}}; - inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr); - if ((serv_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { - perror("socket"); - exit(EXIT_FAILURE); - } - if (connect(serv_sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) != 0) { - perror("connect"); - exit(EXIT_FAILURE); - } - const char* buf = "Hello, world!\n"; - size_t buflen = strlen(buf); - if (send(serv_sock, buf, buflen, 0) < 0) { - perror("send"); - exit(EXIT_FAILURE); - } - puts("sent!"); -} - -void test_channel(void) { - int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - struct sockaddr_in client = { - .sin_family = AF_INET, - .sin_port = 8080, - .sin_addr = {INADDR_ANY} - }; - - (void)bind(sock, (struct sockaddr*)&client, sizeof(client)); - struct sockaddr_in serv = { - .sin_family = AF_INET, - .sin_port = 6969 - }; - inet_pton(AF_INET, "127.0.0.1", &serv.sin_addr); - struct kv_system_packet req = { - .operation_id = join_channel, - .return_port = 8080, - .return_address = htonl((127 << 24) | 1), - .user_id = 69, - }, resp; - req.checksum = system_packet_checksum(&req); - sendto(sock, &req, KV_PACKET_SIZE, 0, (struct sockaddr*)&serv, sizeof(serv)); - recvfrom(sock, &resp, KV_PACKET_SIZE, 0, NULL, NULL); - if (resp.operation_id == acknowledgement) { - puts("УРА УРА МЕНЯ ПУСТИЛИ"); - } else { - puts("О нет, что же пошло не так.."); - } - - int pid = fork(); - if (pid == 0) { - sleep(2); - puts("child activated"); - sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - struct sockaddr_in client = { - .sin_family = AF_INET, - .sin_port = 8081, - .sin_addr = {INADDR_ANY} - }; - (void)bind(sock, (struct sockaddr*)&client, sizeof(client)); - req.user_id = 80809; - req.return_port = 8081; - req.checksum = system_packet_checksum(&req); - sendto(sock, &req, KV_PACKET_SIZE, 0, (struct sockaddr*)&serv, sizeof(serv)); - recvfrom(sock, &resp, KV_PACKET_SIZE, 0, NULL, NULL); - if (resp.operation_id == acknowledgement) { - puts("УРА УРА МЕНЯ ПУСТИЛИ (child)"); - struct kv_packet message; - message.id = 70; - int i = 0; - for (const char * x = "ПРИВЕЕЕТ :D\n\0"; *x; ++x) { - message.data[i] = *x; - ++i; - message.data[i] = 0; - } - message.data[100] = 0; - sendto(sock, &message, KV_PACKET_SIZE, 0, (struct sockaddr*)&serv, sizeof(serv)); - } else { - puts("О нет, что же пошло не так.. (child)"); - } - - } else { - memset(&req, 0, KV_PACKET_SIZE); - recvfrom(sock, &req, KV_PACKET_SIZE, 0, NULL, NULL); - struct kv_packet *fromchild = (struct kv_packet*)&req; - printf("req.id = %i, req.data = %s (parent recv)\n", fromchild->id, fromchild->data); - wait(NULL); - } - -} - -int main(int argc, char *argv[]) -{ - (void)argc; - (void)argv; - test_channel(); - return 0; -} diff --git a/compile_flags.txt b/compile_flags.txt new file mode 100644 index 0000000..5d90624 --- /dev/null +++ b/compile_flags.txt @@ -0,0 +1,14 @@ +-Wall +-Wextra +-Wpedantic +-Werror +-ggdb +-Wno-language-extension-token +-Wno-empty-body +-Iinclude/c_headers/include +-Iinclude +-lc +-lpthread +-DDBG +-ggdb +-g diff --git a/doc/PROTOCOLS b/doc/PROTOCOLS new file mode 100644 index 0000000..c15a83d --- /dev/null +++ b/doc/PROTOCOLS @@ -0,0 +1,132 @@ +This document describes protocols that are implemented on the server-side +of "KV" for real-time communication. + +0. General structure. +The server uses a single TCP socket for all control packets: It manages +channels, users, and in theory the database, though this is not currently +implemented. This socket has a private-public key pair associated with it, +which is used by channels to authorize messages from the system. +Each channel is a separate UDP socket (Optionally on the same thread via poll +syscall). UDP sockets receive all data, process system packets and +send data back to all peers. + +Note: In future, the UDP organisation may change so that 1 socket handles +several connections. For now, for simplicity purposes, this will not be +implemented. The same applies to TCP connections. + +Note note: EVERYTHING related to SSL is going to be implemented AFTER the +working prototype of the server is available. Which is to say, not too soon. + + +1. The TCP interface. +Defined in header `server/tcp.h`. Each connection is a request-response +exchange. When the peer connects, it sends data in the following order: +the TYPE of the command as defined in `enum commd_type` and `struct commd` +(possibly casted from commd_* structs). Then server responds with success +status (`uint64`): it is positive if command executed normally, and is zero if +anything went wrong. If returned status is zero, further information on the +error is sent (see `struct commd_error_report` and `enum commd_error`). Refer +to documentation of specific functions for more details on return status. +1.1. Permissions. +`enum permissions` is a flag-enum, which means that each permission is 1 bit, +and to combine them you use bitwise-or. +`perm_none`: placeholder for no permissions +`perm_add_channel`: create any channels. +`perm_rm_channel`: remove any channels. Note that you don't need it if you are + owner of the channel (the one who created it). +`perm_add_user`: add any user to any channel. +`perm_kick_user`: remove any user from any channel. +`perm_register_user`: register new user in a system. Usually only admin has this. +`perm_delete_user`: unregister any user in a system. +`perm_admin`: All permissions, an equivalent of root user. +1.2. Available commands. +`CMD_CREATE`: create a new channel. `struct commd_create` has only one field, +uid (user id of one performing request). Returns Channel ID of newly created +channel (which is posix thread ID under the hood). Needed permissions for the +uid: `perm_add_channel`. +`CMD_DELETE`: terminate a channel. `struct commd_delete` has 2 fiels, uid (who +is deleting a channel) and chid (channel id). Returns ID of the deleted +channel. uid must either have `perm_rm_channel` or be owner of the channel to +be able to perform the request. +`CMD_JOIN`: Join a channel. `struct commd_join` has 3 fields: uid, juid and +chid. uid is the id of user performing the request, juid is user that is +joining channel with id chid. if uid != juid, uid must have `perm_add_user` +permission. Rerurns port of channel socket. +`CMD_LEAVE`: Leave a channel. `struct commd_leave` has same fields as join +command, except instead of juid the field is called luid (leaving user id). +returns 1. if uid != luid, uid must have `perm_kick_user`. +`CMD_REGISTER`: Register new user. `struct commd_register` has 2 fields: +auid (administrator user id) and perm (permissions). If auid == 0, this is a +regular register request, and perm field will be ignored. If auid is set and +auid has `perm_register_user`, the new user with permissions perm is created. +returns User ID of created user. +`CMD_UNREGISTER`: Delete a user. `struct commd_unregister` has 2 fields: +auid (admin user ID) and uid (ID to be deleted). if auid == uid, no permissions +are reuqired. Otherwise, auid must have `perm_delete_user`. Returns ID of +deleted user. +`CMD_GET_PORT`: Get a port of UDP socket of a channel. `struct commd_get_port` +has 1 field: chid (ID of a channel). Returns port number of UDP socket. No +permission required. +`CMD_GET_CHANNELS`: Get a list of all active channels. +`struct commd_get_channels` is a typedef to void, which means it takes no +arguments. Returns an array as follows: first 8 bytes are uint64 array_size, +next 8*array_size bytes are channels. Last 8 bytes are zeroes (they are after +the last channel). Possible memory layout (divided in 8 byte [64 bit] blocks): +| 5 | 69 | 70 | 71 | 72 | 42 | 0 | + ^ ^ ^ ^ + | |-start of data | |-leading zero + |-size of array |-end of data +1.3. Functions. +As the interface mainly uses `uint64` type for communications, there are 2 +functions that are for some reason not present in linux api: +`uint64 hton64(uint64)`: Host TO Network: convert uint64 to +network byte order. +`uint64 ntoh64(uint64)`: Network TO Host: convert uint64 from +network byte order. +1.4. Notes. +As The project is still heavily under development, things may change +drastically. For example, it is planned that each `CMD_*` that requires +authiorization will be using a signing scheme, so that the server can verify +source of the request. For now this is not a top-priority because the develper +still needs to at least get this thing up and workingm even without security. +Basically templeOS level of security for now, good to guard against accidental +errors, bad against targeted attacks. + + +2. The UDP interface. +As it was mentioned before, each channel is a pthread running a UDP socket. +UDP has a `packet` (defined in `inclue/packet.h`). There are 2 types of +packets: system paclets and user packets. Most system packets must come from +the main server and must be signed in order to be processed. +ALL PACKETS MUST BE OF SIZE "KV_PACKET_SIZE" WHICH IS 512 BYTES CURRENTLY. +Each regular packet consists of uid of sender and acutal data, which is +UNCHECKED by the server in ANY way. If UID hasn't joined channel, the packet +is discarded. Otherwise, the packet is forwarded to every joined UID. +Each system packet is magic bytes (which are 0xFFFFFFFF), then negative int +`operation_id` (as defined in `enum system_operation`), then uid, +then checksum (calculated with `system_packet_checksum` function). +2.1. System packet operations available to users. +`keepalive`: channel removes peers that haven't sent packets for 60 seconds. +required fields: all except sentinel, it's unchecked. + + +3. UDP data specification +It is expected that data sent over UDP will be opus-encoded audio. As to how +this would be done, there is no information yet because development hasn't +reached this point. Also, server doesn't care what you send to it, it only +checks for permissions. + + +4. Registration/Authorization. +UID MUST NOT HAVE LOWER OR UPPER 4 BYTES BE EQUAL TO 0xFF +- okay, i think i implemented this + + +5. Noise suppression +This documentation section is more like a note to developer in the future: all +of the audio processing of any kind will be done on client-side. Server will be +busy enough bouncing packets back-and-forth, and it does not care which data is +being sent anyways. The complexity must be somewhat balanced between the server +and client implementations. + +# vim: textwidth=80 diff --git a/include/c_headers b/include/c_headers index ebad5ad..8989b7d 160000 --- a/include/c_headers +++ b/include/c_headers @@ -1 +1 @@ -Subproject commit ebad5ad23dc6893b0fb75ba04aa961a25a532b5f +Subproject commit 8989b7dc005af7960aa66fd99b197a3b0b7a4fbc diff --git a/include/kv.h b/include/kv.h index b5f9842..ff28ce3 100644 --- a/include/kv.h +++ b/include/kv.h @@ -1,8 +1,6 @@ #ifndef JUSTANOTHERCATGIRL_KV_H #define JUSTANOTHERCATGIRL_KV_H -#include "packet.h" - #define KV_SERVER_ADDRESS_STRING "95.164.2.199" #define KV_SERVER_ADDRESS_INT ((95 << 24) | (164 << 16) | (2 << 8) | (199 << 0)) diff --git a/include/packet.c b/include/packet.c index 5f551de..a523285 100644 --- a/include/packet.c +++ b/include/packet.c @@ -1,7 +1,66 @@ #include "packet.h" -unsigned int system_packet_checksum(struct kv_system_packet *packet) { - return (packet->user_id << 8) ^ - (packet->operation_id | (177013 << 10)); +i64 const commd_size_lookup[64] = {[CMD_CREATE] = sizeof(struct commd_create), + [CMD_DELETE] = sizeof(struct commd_delete), + [CMD_JOIN] = sizeof(struct commd_join), + [CMD_LEAVE] = sizeof(struct commd_leave), + [CMD_REGISTER] = sizeof(struct commd_register), + [CMD_UNREGISTER] = sizeof(struct commd_unregister), + [CMD_GET_PORT] = sizeof(struct commd_get_port), + [CMD_GET_CHANNELS] = 0, // You can not get a sizeof(void) + 0}; + +u32 system_packet_checksum(struct kv_system_packet *packet) +{ + return ((packet->user_id << 8) ^ (ntoh64(packet->operation_id) | (177013 << 10)) ^ packet->ackid) & + packet->magic_bytes; +} +u8 is_system_packet(struct kv_packet *p) +{ + struct kv_system_packet *sysp = (struct kv_system_packet *)p; + if (sysp->magic_bytes == SYS_PACKET_MAGIC_BYTES && (signed int)ntohl(sysp->operation_id) < 0) return 1; + return 0; +} + +#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ +u64 hton64(u64 a) +{ + #if __SIZEOF_SIZE_T__ == 8 + return ((u64)htonl((u32)(a & 0xFFFFFFFF)) << 32) | (u64)htonl((u32)(a >> 32)); + #else + return htonl((u32)a); + #endif +} +u64 ntoh64(u64 a) +{ + #if __SIZEOF_SIZE_T__ == 8 + return ((u64)ntohl((u32)(a & 0xFFFFFFFF)) << 32) | (u64)ntohl((u32)(a >> 32)); + #else + return ntohl((u32)a); + #endif } +#else +u64 htonzu(u64 a) { return a; } +u64 ntohzu(u64 a) { return a; } +#endif +const char *kv_strerror(enum commd_error e) +{ + switch (e) { + case ERR_SUCCESS: + return "Success"; + case ERR_SERV: + return "Internal server error"; + case ERR_ACCESS: + return "No access"; + case ERR_INVAL: + return "Invalid/insufficient parameters"; + case ERR_PARAM: + return "Incorrect parameters"; + case ERR_NOIMPL: + return "Not implemented"; + case ERR_DO_IT_YOURSELF: + return "You should not have recieved this error. This is either server or network fault"; + } + return "Unknown error"; +} diff --git a/include/packet.h b/include/packet.h index 4e73040..f1b1e60 100644 --- a/include/packet.h +++ b/include/packet.h @@ -1,40 +1,194 @@ #ifndef KV_PACKET_H #define KV_PACKET_H +#ifdef __cplusplus + Once upon a time, there was a C++ programmer. + He Really liked programming, but he + did not know that C++ is an overbloated + pile of garbage. And so he died alone, + fat and ugly. The morale is the following: + Kids, stop using C++. Let us make the world + a better place together. + + Thank you for not using C++. +#endif #define KV_PACKET_SIZE 512 -#ifdef DEBUG -#define DEBUGF(fmt, ...) fprintf(stderr, "DEBUG: %s:%d:%s(): " fmt, __FILE__, __LINE__, __func__, ##__VA_ARGS__) +#ifdef DBG +# define DEBUGF(fmt, ...) fprintf(stderr, "DEBUG: %s:%d:%s(): " fmt, __FILE__, __LINE__, __func__, ##__VA_ARGS__) +# define DEBUG(msg) fprintf(stderr, "DEBUG: %s:%d:%s(): " msg, __FILE__, __LINE__, __func__) +# define WHERE fprintf(stderr, "DEBUG: %s:%d:%s()\n", __FILE__, __LINE__, __func__) +#else +# define DEBUGF(fmt, ...) +# define DEBUG(fmt) +# define WHERE +#endif +#ifdef __clang__ +# pragma GCC diagnostic ignored "-Wnullability-extension" // SHUT UP I'VE ALREADY PUT IT BEHIND A MACRO +# define NONNULL _Nonnull +# define NULLABLE _Nullable #else -#define DEBUGF(fmt, ...) \ - do { } while (0) +# define NONNULL +# define NULLABLE #endif +#include +#include +#include + +// all ones so that ntohl is not needed +#define SYS_PACKET_MAGIC_BYTES 0xFFFFFFFFU + +extern i64 const commd_size_lookup[]; + enum system_operation { - keepalive = (int) 0x80000000, // -2^31 - join_channel, // 1 - 2^31 - leave_channel, // 2 - 2^31 - acknowledgement, // 3 - 2^31 -}; + SYS_KEEPALIVE = (i32)0x80000000, // -2^31 + SYS_JOIN, // 1 - 2^31 + SYS_LEAVE, // 2 - 2^31 + SYS_ACK, // 3 - 2^31 + SYS_KYS, // 4 - 2^31 +}; // Yes, KYS stands for kill yourself struct kv_packet { - int id; - unsigned char data[KV_PACKET_SIZE - sizeof(unsigned int)]; + u64 uid; + u8 data[KV_PACKET_SIZE - sizeof(u64)]; }; +// ONLY `operation_id` field is in network byte order. +// Everything else is host byte order since these packets +// are never to leave the server struct kv_system_packet { + const u32 magic_bytes /* = 0xFFFFFFFF */; // as in system_operation enum. - int operation_id; + i32 operation_id; // could be ignored - int user_id; + u64 user_id; + // id to be returned in acknowledgement. if 0, don't acknowledge. + u32 ackid; // calculated with system_packet_checksum function - unsigned int checksum; - - unsigned char sentinel[KV_PACKET_SIZE - 3 * sizeof(int)]; + u32 checksum; + // TODO: add server signature here + + u8 sentinel[KV_PACKET_SIZE - 4 * sizeof(u32) - sizeof(u64)]; }; -unsigned int system_packet_checksum(struct kv_system_packet *packet); -int __user_cmp(const void* a, const void* b); +u32 system_packet_checksum(struct kv_system_packet *packet); +u8 is_system_packet(struct kv_packet *p); -#endif // KV_PACKET_H +enum permissions { + perm_none = 0, + perm_register_user = 1 << 1, + perm_unregister_user = 1 << 2, + perm_add_channel = 1 << 3, + perm_unadd_channel = 1 << 4, + perm_join_user = 1 << 5, + perm_kick_user = 1 << 6, + perm_admin = 0x7FFFFFFF, +}; + +enum commd_error { + // No error + ERR_SUCCESS, + // Internal server error + ERR_SERV, + // Error in parameters (e.g. nonexistant UID) + ERR_PARAM, + // Invalid parameters (e.g. not enough params) + ERR_INVAL, + // Access violation + ERR_ACCESS, + // Operation not implemented yet + ERR_NOIMPL, + // Internal indication that function can not process request + ERR_DO_IT_YOURSELF, // should never be sent out to suer +}; + +struct tcp_user { + u64 id; + u64 joined_channel; + i32 permissions; + char *pubkey; +}; +struct tcp_channel { + u64 id; + u64 owner; + int fd; + char *name; +}; +enum commd_type { + CMD_CREATE = 0, + CMD_DELETE, + CMD_JOIN, + CMD_LEAVE, + CMD_REGISTER, + CMD_UNREGISTER, + CMD_GET_PORT, + CMD_GET_CHANNELS, +}; +// Somehow, this is pretty similar to linux sockaddr. +struct commd { + unsigned char command[32]; +}; +struct commd_register { + // Administrator UID. Optional. + u64 auid; + // Permissions + u64 perm; +}; +struct commd_unregister { + // Administrator UID. Optional. + u64 auid; + // UID to be unregistered. + u64 uid; +}; +struct commd_create { + // UID of user creating the channel. + u64 uid; +}; +struct commd_delete { + // UID of user deleting a channel. + u64 uid; + // CHID of channel being deleted. + u64 chid; +}; +struct commd_join { + // UID of user performing operation. + u64 uid; + // UID of user to be joined to channel. + u64 juid; + // CHID of the channel to be joined. + u64 chid; +}; +struct commd_leave { + // UID of user performing operation. + u64 uid; + // UID of user leaving channel. + u64 luid; + // CHID of channel to be left by user `luid`. + u64 chid; +}; +struct commd_get_port { + // CHID of channel to get the UDP port of + u64 cihd; +}; +typedef void commd_get_channels; +struct commd_conv { + u64 _1; + u64 _2; + u64 _3; + u64 _4; +}; + +u64 hton64(u64 a); +u64 ntoh64(u64 a); + +#define hton32 htonl +#define ntoh32 ntohl + +#define hton16 htons +#define ntoh16 ntohs + +const char* kv_strerror(enum commd_error e); + +#endif // KV_PACKET_H diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 0ba4d8b..f63f897 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -1,3 +1,5 @@ +# CMake files will be removed in the next several commits + cmake_minimum_required(VERSION 3.20) project(kv_server @@ -5,10 +7,15 @@ project(kv_server DESCRIPTION "A server for kv project." LANGUAGES C) -set(SOURCE_FILES main.c channel.c) -set(HEADER_FILES channel.h) +set(SOURCE_FILES main.c channel.c tcp.c ../include/packet.c) +set(HEADER_FILES channel.h tcp.h) add_executable(kv_server ${SOURCE_FILES}) target_compile_definitions(kv_server PUBLIC DEBUG) +target_link_libraries(kv_server + PUBLIC crypto + PUBLIC pthread +) + diff --git a/server/channel.c b/server/channel.c index 2d81abb..a0e2779 100644 --- a/server/channel.c +++ b/server/channel.c @@ -1,126 +1,173 @@ #include "channel.h" +#include #include -#include #include +#include -void thread_loop(void) { - struct channel_handle *channel = channel_init(); - struct kv_packet **recvd_data = array_new(struct kv_packet*, 100); - struct kv_packet work_buffer; - size_t recvd_index = 0; - int recv_flag = MSG_DONTWAIT; - while (true) { - struct sockaddr_in client_addr; - socklen_t client_addr_len; // unused - int recvlength = recvfrom(channel->sockfd, &work_buffer, - KV_PACKET_SIZE, recv_flag, - (struct sockaddr*)&client_addr, &client_addr_len); - if (recvlength > 0) { - DEBUGF("rec_id = %i\n", work_buffer.id); - if (work_buffer.id <= 0) { - handle_system_packet(&work_buffer, &client_addr, channel); - continue; - } else { - recv_flag |= MSG_DONTWAIT; - } - struct kv_packet *kv_copy = malloc(KV_PACKET_SIZE); - memcpy(kv_copy, &work_buffer, KV_PACKET_SIZE); - ++recvd_index; - if (recvd_index >= array_size(recvd_data)) { - array_push(recvd_data, kv_copy); - } else { - recvd_data[recvd_index] = kv_copy; - } - } else if (errno == EWOULDBLOCK) { - if (array_size(recvd_data) == 0) recv_flag &= ~MSG_DONTWAIT; - send_packets_back(recvd_data, channel); - clear_packet_array(recvd_data); - } else { - perror("thread_loop failed"); - } +#define decisive_push(array, index, elem) \ + do { \ + if (index >= array_size(array)) { \ + array_push(array, elem); \ + } else { \ + array[index] = elem; \ + } \ + } while (0) + +#if defined(__unix__) && defined(__GNUC__) + #define __THREAD __thread +#elif __STDC_VERSION__ >= 201112L && __STDC_VERSION__ < 202302L + #define __THREAD _Thread_local +#elif __STDC_VERSION__ >= 202302L + #define __THREAD thread_local +#else + #pragma GCC error "Use unix with GCC, or C11 or later standards" + #define __THREAD "don't compile" +#endif + +struct ch_user { + u64 id; + u32 ip; + u16 port; + u64 last_keepalive; +}; + +static __THREAD struct hash_set users; +static __THREAD int sockfd; + +static int __user_hset_cmp(const void *a, const void *b) { + struct ch_user *_a = (struct ch_user *)a, *_b = (struct ch_user *)b; + return _a->id - _b->id; +} +static size_t __user_hset_hsh(const void *a) { return ((struct ch_user *)a)->id; } +static void clear_packet_array(struct kv_packet **array) { + for (size_t i = 0; i < array_size(array); ++i) { + if (array[i] == NULL) return; + free(array[i]); + array[i] = NULL; } - array_free(recvd_data); - channel_uninit(channel); } - -struct channel_handle *channel_init(void) { - struct sockaddr_in thread_local_address = { - .sin_family = AF_INET, - .sin_port = 0, - .sin_addr = {INADDR_ANY} - }; - struct channel_handle *handle = NULL; - char chain_result = /* This is evil, but */ - (handle = (struct channel_handle*)calloc(1, sizeof(struct channel_handle))) != NULL - && (handle->users = array_new(struct user, 0)) != NULL - && (handle->sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) >= 0 - && bind(handle->sockfd, (struct sockaddr*)&thread_local_address, sizeof(thread_local_address)) == 0; +bool channel_init(void) { + struct sockaddr_in thread_local_address = {.sin_family = AF_INET, .sin_port = 0, .sin_addr = {INADDR_ANY}}; + char chain_result = /* This is evil, but who cares */ + hset_ok(users = hset_new(sizeof(struct ch_user), __user_hset_cmp, __user_hset_hsh)) && + (sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) >= 0 && + bind(sockfd, (struct sockaddr *)&thread_local_address, sizeof(thread_local_address)) == 0; if (!chain_result) { perror("channel init failed"); - if (handle) { - if (handle->users) array_free(handle->users); - if (handle->sockfd >= 0) close(handle->sockfd); - free(handle); - } - return NULL; + if (hset_ok(users)) hset_free(&users); + if (sockfd >= 0) close(sockfd); + return false; } - DEBUGF("Channel [%i] created\n", handle->sockfd); - return handle; + DEBUGF("Channel [%i] created\n", sockfd); + return true; } - -void channel_uninit(struct channel_handle *handle) { - array_free(handle->users); - if (close(handle->sockfd) == -1) - perror("could not gracefully uninitialize channel"); - free(handle); +static void channel_uninit(void) { + hset_free(&users); + if (close(sockfd) == -1) perror("could not gracefully uninitialize channel"); +} +_Noreturn static inline void forced_cleanup(struct kv_packet **packets) { + clear_packet_array(packets); + array_free(packets); + channel_uninit(); + pthread_exit(NULL); +} +static inline void report_socket(const struct thread_loop_arg *arg, int fd) { + pthread_mutex_lock(arg->sock_mx); + *(arg->sock_dest) = fd; + pthread_cond_signal(arg->sock_ready_cond); + pthread_mutex_unlock(arg->sock_mx); } -void handle_system_packet(struct kv_packet* packet, struct sockaddr_in *source, struct channel_handle* handle) { - struct kv_system_packet* spacket = (struct kv_system_packet*) packet; - if (system_packet_checksum(spacket) != spacket->checksum) return; - switch (spacket->operation_id) { - case keepalive: TODO; - case join_channel: TODO; - case leave_channel: TODO; - case acknowledgement: TODO; - default: TODO; +// Returns whether thread should kill itself +static bool handle_system_packet(struct kv_packet *packet, struct sockaddr_in *src) { + TODO; // account for `ackid` + struct kv_system_packet *spacket = (struct kv_system_packet *)packet; + if (system_packet_checksum(spacket) != spacket->checksum) return false; + switch (ntohl(spacket->operation_id)) { + case SYS_KEEPALIVE: { + struct ch_user u = {.id = ntoh64(spacket->user_id), 0}; + struct ch_user *data = hset_at(&users, &u); + data->last_keepalive = (int)time(NULL); + } break; + case SYS_JOIN: { + struct ch_user u = { + .id = /*ntohzu */ spacket->user_id, + .ip = /*ntohl*/ src->sin_addr.s_addr, + .port = /*htons*/ src->sin_port, + .last_keepalive = (int)time(NULL)}; + hset_insert_copy(&users, &u); + } break; + case SYS_LEAVE: { + struct ch_user u = {.id = ntoh64(spacket->user_id), 0}; + hset_remove(&users, &u); + } break; + case SYS_ACK: return false; + case SYS_KYS: + // TODO: verify that request was sent by main thread + return true; } + return false; } - -void send_packets_back(struct kv_packet** packets, struct channel_handle* handle) { - for (size_t i = 0; i < array_size(handle->users); ++i) { - struct user* current_user = &handle->users[i]; +static void send_packets_back(struct kv_packet **packets) { + hset_iter iter; + for (hseti_begin(&users, &iter); !hseti_end(&iter); hseti_next(&iter)) { + struct ch_user *current_user = hseti_get(&iter); struct sockaddr_in destination = { - .sin_family = AF_INET, - .sin_port = htons(current_user->port), - .sin_addr = { htonl(current_user->ip) } - }; + .sin_family = AF_INET, + .sin_port = htons(current_user->port), + .sin_addr = {htonl(current_user->ip)}}; for (size_t j = 0; packets[j] != NULL && j < array_size(packets); ++j) { - DEBUGF("sending packet with id %i", packets[j]->id); - DEBUGF("to destination: %u.%u.%u.%u:%hu\n", - (destination.sin_addr.s_addr >> 24) & 0xFF, - (destination.sin_addr.s_addr >> 16) & 0xFF, - (destination.sin_addr.s_addr >> 8) & 0xFF, - destination.sin_addr.s_addr & 0xFF, - destination.sin_port); - if (packets[j]->id == current_user->id) continue; - int error_code = sendto(handle->sockfd, packets[j], KV_PACKET_SIZE, 0, (struct sockaddr*)&destination, sizeof(destination)); - if (error_code) perror("could not send packets back"); + DEBUGF("sending packet with id %zu to destination: %u.%u.%u.%u:%hu\n", packets[j]->uid, + (destination.sin_addr.s_addr >> 24) & 0xFF, (destination.sin_addr.s_addr >> 16) & 0xFF, + (destination.sin_addr.s_addr >> 8) & 0xFF, destination.sin_addr.s_addr & 0xFF, + destination.sin_port); + if (packets[j]->uid == current_user->id) continue; + int error_code = sendto( + sockfd, packets[j], KV_PACKET_SIZE, 0, (struct sockaddr *)&destination, + sizeof(destination)); + if (error_code) perror("could not send packets back"); } } } -void clear_packet_array(struct kv_packet **array) { - for (size_t i = 0 ; i < array_size(array); ++i) { - if (array[i] == NULL) return; - free(array[i]); - array[i] = NULL; +/* + * An example of how you should NOT write code + * Todo: please rewrite this shit + */ +void *thread_loop(void *arg) { + if (!channel_init()) pthread_exit(NULL); + report_socket((struct thread_loop_arg *)arg, sockfd); + struct kv_packet **recvd_data = array_new(struct kv_packet *, 100); + struct kv_packet work_buffer; + size_t recvd_index = 0; + int recv_flag = MSG_DONTWAIT; + while (true) { + struct sockaddr_in client_addr; + socklen_t client_addr_len = sizeof(client_addr); + ssize_t recvlength = recvfrom( + sockfd, &work_buffer, KV_PACKET_SIZE, recv_flag, (struct sockaddr *)&client_addr, + &client_addr_len); + if (recvlength > 0) { + if (is_system_packet(&work_buffer)) { + bool kys = handle_system_packet(&work_buffer, &client_addr); + if (kys) forced_cleanup(recvd_data); + continue; + } + recv_flag |= MSG_DONTWAIT; + struct kv_packet *kv_copy = malloc(KV_PACKET_SIZE); + memcpy(kv_copy, &work_buffer, KV_PACKET_SIZE); + ++recvd_index; + decisive_push(recvd_data, recvd_index, kv_copy); + } else if (errno == EWOULDBLOCK) { + if (array_size(recvd_data) == 0) { + recv_flag &= ~MSG_DONTWAIT; + continue; + } + send_packets_back(recvd_data); + clear_packet_array(recvd_data); + } else { + perror("error in thread_loop"); + } } } - -int __user_cmp(const void* a, const void* b) { - struct user *_a = (struct user*)a, - *_b = (struct user*)b; - return _a->id - _b->id; -} diff --git a/server/channel.h b/server/channel.h index 3916212..8068a4c 100644 --- a/server/channel.h +++ b/server/channel.h @@ -3,36 +3,25 @@ #include -#include -#include -#include #include - #include +#include +#include +#include +#include -struct user { - long id; - unsigned int ip; - unsigned short port; - unsigned long last_keepalive; -}; -int __user_cmp(const void* a, const void* b); +#include -struct channel_handle { - int sockfd; - struct user* users; +/// Required for the calling thread to set socket file descriptor +struct thread_loop_arg { + int *sock_dest; + pthread_mutex_t *sock_mx; + pthread_cond_t *sock_ready_cond; + u64 owner; + const unsigned char *pubkey; }; // main function that manages every channel -void thread_loop(void); - -struct channel_handle *channel_init(void); -void channel_uninit(struct channel_handle *handle); - -void send_packets_back(struct kv_packet** packets, struct channel_handle *handle); -void handle_system_packet(struct kv_packet* packet, struct sockaddr_in *source, struct channel_handle* handle); - -void clear_packet_array(struct kv_packet **array); - +void *thread_loop(void *); #endif // KV_SERVER_CHANNEL_H diff --git a/server/main.c b/server/main.c index f899f7e..b6745e0 100644 --- a/server/main.c +++ b/server/main.c @@ -1,62 +1,32 @@ -#include -#include -#include -#include - -#include - +#include #define CONTAINER_IMPLEMENTATION +#define HSET_MAX_BUCKET_SIZE 4 #include #undef CONTAINER_IMPLEMENTATION -#include "channel.h" - -#define MAIN_PORT 8164 +#include "tcp.h" -enum request_type { - spawn_channel, - get_channels, -}; +#include -static int* open_sockets; -static int request_socket; +#define MAIN_PORT 8164 -void init(void) { - open_sockets = array_new(int, 0); - request_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(MAIN_PORT), .sin_addr = {INADDR_ANY}}; - - for (int retries = 0; retries <= 5; ++retries) { - if (bind(request_socket, (struct sockaddr*)&addr, sizeof(addr)) == 0) break; - else { - perror("init (bind)"); - sleep(1); - } +void setup_signal(void) { + struct sigaction signal = {.sa_handler = print_state, .sa_mask = {{0}}, .sa_flags = 0}; + if (sigaction(SIGUSR1, &signal, NULL) != 0) { + WHERE; + exit(EXIT_FAILURE); } -} - -enum request_type wait_for_requests(void) { - return spawn_channel; -} - -int spawn_channel_thread(void) { - return 0; -} - -void event_loop(void) { - init(); - while (1) { - enum request_type req = wait_for_requests(); - switch (req) { - case spawn_channel: break; - case get_channels: break; - } + signal.sa_handler = exit_tcp; + if (sigaction(SIGTERM, &signal, NULL) != 0) { + WHERE; + exit(EXIT_FAILURE); } } int main(int argc, char *argv[]) { (void)argc; (void)argv; - thread_loop(); + setup_signal(); + tcp_loop(); return 0; } diff --git a/server/tcp.c b/server/tcp.c new file mode 100644 index 0000000..0d6e118 --- /dev/null +++ b/server/tcp.c @@ -0,0 +1,285 @@ +#include "tcp.h" +#include +#include +#include + +#define report_error(socket, error) \ + do { \ + DEBUGF("error on socket %i: %i\n", socket, error); \ + write(socket, &zerozu, sizeof(zerozu)); \ + enum commd_error __ecpy = htonl(error); \ + write(socket, &__ecpy, sizeof(__ecpy)); \ + return; \ + } while (0) + +#define return_error(err) \ + do { \ + *e = err; \ + return 0; \ + } while (0) + +struct hash_set channels; +struct hash_set users; +static struct tcp_user varusr = {0}; +static struct tcp_channel varchnl = {0}; +static u64 lalloc_usr = 0; +static int udpctlfd = 0; +static const size_t zerozu = 0; +static bool should_exit = false; + +void print_state(int _) { + (void)_; +#ifdef DEBUG + fputs("printing server state.\n hash_map users {\n", stderr); + struct hash_set_iter iter; + for (hseti_begin(&users, &iter); !hseti_end(&iter); hseti_next(&iter)) { + struct tcp_user *curr = hseti_get(&iter); + fprintf(stderr, "\ttcp_user {.id=%zu, .permissions=%u, .channel=%zu}\n", curr->id, curr->permissions, + curr->joined_channel); + } + fputs("}\nhash_map channels {\n", stderr); + for (hseti_begin(&channels, &iter); !hseti_end(&iter); hseti_next(&iter)) { + struct tcp_channel *curr = hseti_get(&iter); + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + getsockname(curr->fd, (struct sockaddr *)&addr, &addrlen); + fprintf(stderr, "\ttcp_channel {.id=%zu, .fd=%u, .channel=%zu} [port=%hu]\n", curr->id, curr->fd, + curr->owner, addr.sin_port); + } + fputs("}\n", stderr); +#endif +} +void exit_tcp(int _) { + (void)_; + should_exit = true; + DEBUGF("EXITING SERVER, setting `should_exit (%p)` to %i\n", (void*)&should_exit, (int)should_exit); +} + +static int tcp_user_cmp(const struct tcp_user *a, const struct tcp_user *b) { return (a->id - b->id) ? 1 : 0; } +static int tcp_channel_cmp(const struct tcp_channel *a, const struct tcp_channel *b) { return (a->id - b->id) ? 1 : 0; } +static size_t tcp_user_hash(const struct tcp_user *a) { return a->id; } +static size_t tcp_channel_hash(const struct tcp_channel *a) { return a->id; } +static int set_sock_timeout(int fd, int ms) { + struct timeval timeout; + timeout.tv_sec = ms / 1000; + timeout.tv_usec = (ms % 1000) * 1000; + return setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); +} +static void init_statics(void) { + channels = + hset_new(sizeof(struct tcp_channel), (hset_equal_fn)&tcp_channel_cmp, (hset_hash_fn)&tcp_channel_hash); + users = hset_new(sizeof(struct tcp_user), (hset_equal_fn)&tcp_user_cmp, (hset_hash_fn)&tcp_user_hash); + udpctlfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); +} +static int setup_socket(unsigned short port) { + int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + int flag = 1; + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof flag)) goto error; + struct sockaddr_in localaddr = { + .sin_family = AF_INET, .sin_port = htons(port), .sin_addr = {htonl(INADDR_ANY)}}; + if (bind(sock, (struct sockaddr *)&localaddr, sizeof(localaddr))) goto error; + return sock; +error: + perror("TCP thread failed to initialize"); + exit(EXIT_FAILURE); + +} +static void init_admin(u64 aid) { + struct tcp_user u = {.id = aid, .pubkey = NULL, .permissions = perm_admin, .joined_channel = 0}; + hset_insert_copy(&users, &u); +} +static bool has_4bytes_0xff(u64 id) { + return (unsigned int)(id >> 32) == 0xFFFFFFFF || (unsigned int)(id & 0xFFFFFFFF) == 0xFFFFFFFF; +} +static u64 get_uniq_id(struct hash_set *set) { + // while (map has lalloc_usr inex || lalloc_usr has 4 bytes of ones) next lalloc_usr; + varusr.id = lalloc_usr; + while (hset_at(set, &varusr) != NULL || has_4bytes_0xff(lalloc_usr)) ++varusr.id; + lalloc_usr = varusr.id; + return lalloc_usr; +} +static unsigned short get_channel_port(u64 id) { + varchnl.id = id; + struct tcp_channel *ch = hset_at(&channels, &varchnl); + if (ch == NULL) return 0; + struct sockaddr_in a; + socklen_t len = sizeof a; + getsockname(ch->fd, (struct sockaddr *)&a, &len); + return a.sin_port; +} +static bool user_has_permission(u64 uid, unsigned int perm) { + varusr.id = uid; + struct tcp_user *u = hset_at(&users, &varusr); + if (u == NULL) return false; + unsigned int uperm = u->permissions; + // bitwise implication must yield all ones (0xFFFFFFFF). + // Inverse it for easier check + return (perm & ~uperm) == 0; +} +static u64 send_channels(int sockfd, enum commd_error *e) { + struct hash_set_iter iter; + u64 array_length = hton64(hset_count(&channels)); + if (write(sockfd, &array_length, sizeof(array_length)) != sizeof(array_length)) return_error(ERR_SERV); + for (hseti_begin(&channels, &iter); !hseti_end(&iter); hseti_next(&iter)) { + struct tcp_channel *c = hseti_get(&iter); + u64 chid = hton64(c->id); + if (write(sockfd, &chid, sizeof(chid)) != sizeof(chid)) return_error(ERR_SERV); + } + // the leading zero is written by the caller + return_error(ERR_SUCCESS); // actually returns success but... +} +static inline u64 commd_register_process(struct commd_register *cmd, enum commd_error *e) { + /* fprintf(stderr, "%s: auid=%zu; perm=%zu\n", "commd_register_process", cmd->auid, cmd->perm); */ + if (!user_has_permission(cmd->auid, perm_join_user | cmd->perm)) return_error(ERR_ACCESS); + struct tcp_user new_user = { + .id = get_uniq_id(&users), .joined_channel = 0, .permissions = (unsigned int)cmd->perm}; + hset_insert_copy(&users, &new_user); + return new_user.id; +} +static inline u64 commd_unregister_process(struct commd_unregister *cmd, enum commd_error *e) { + /* DEBUGF("delete user %zu (admin %zu)\n", cmd->uid, cmd->auid); */ + if (cmd->auid != cmd->uid && !user_has_permission(cmd->auid, perm_unregister_user)) return_error(ERR_ACCESS); + varusr.id = cmd->uid; + hset_remove(&users, &varusr); + return cmd->uid; +} +static inline u64 commd_create_process(struct commd_create *cmd, enum commd_error *e) { + if (!user_has_permission(cmd->uid, perm_add_channel)) return_error(ERR_ACCESS); + + u64 chid; + int sock = -1; + unsigned short port = 0; + { + pthread_mutex_t sock_mx = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t sock_cond = PTHREAD_COND_INITIALIZER; + struct thread_loop_arg arg = { + .owner = cmd->uid, .sock_dest = &sock, .sock_mx = &sock_mx, .sock_ready_cond = &sock_cond}; + pthread_mutex_lock(&sock_mx); + chid = spawn_channel(&arg); + DEBUG("\n"); + while (sock == -1 || port == 0) pthread_cond_wait(&sock_cond, &sock_mx); + DEBUG("\n"); + pthread_mutex_unlock(&sock_mx); + } + struct tcp_channel new_channel = {.owner = cmd->uid, .name = NULL, .fd = sock}; + hset_insert_copy(&channels, &new_channel); + return chid; +} +static inline u64 commd_delete_process(struct commd_delete *cmd, enum commd_error *e) { + DEBUGF("received command%p\n", (void *)cmd); + varchnl.id = cmd->chid; + struct tcp_channel *c = hset_at(&channels, &varchnl); + if (c == NULL) return_error(ERR_PARAM); + if (cmd->uid != c->owner && !user_has_permission(cmd->uid, perm_unadd_channel)) return_error(ERR_ACCESS); + hset_remove(&channels, &varchnl); + return varchnl.id; +} +static inline u64 commd_join_process(struct commd_join *cmd, enum commd_error *e) { + if (cmd->uid != cmd->juid && !user_has_permission(cmd->uid, perm_join_user)) return_error(ERR_ACCESS); + struct kv_system_packet packet = { + .magic_bytes = SYS_PACKET_MAGIC_BYTES, .operation_id = htonl(SYS_JOIN), .user_id = cmd->juid}; + if (!sendto_channel(cmd->chid, &packet, TCP_MAX_WAIT_MS, TCP_MAX_RETRIES)) return_error(ERR_SERV); + return (u64)get_channel_port(cmd->chid); +} +static inline u64 commd_leave_process(struct commd_leave *cmd, enum commd_error *e) { + if (cmd->uid != cmd->luid && !user_has_permission(cmd->uid, perm_kick_user)) return_error(ERR_ACCESS); + struct kv_system_packet packet = { + .magic_bytes = SYS_PACKET_MAGIC_BYTES, .operation_id = htonl(SYS_LEAVE), .user_id = cmd->luid}; + if (!sendto_channel(cmd->chid, &packet, TCP_MAX_WAIT_MS, TCP_MAX_RETRIES)) return_error(ERR_SERV); + return 1; +} +/// switches on command type and operates accordingly +static u64 process_cmd(enum commd_type type, struct commd *cmd, enum commd_error *NONNULL e) { + // network byte order conversion + switch (type) { + case CMD_LEAVE: + case CMD_JOIN: ((struct commd_conv *)cmd)->_3 = ntoh64(((struct commd_conv *)cmd)->_3); FALLTHROUGH; + case CMD_UNREGISTER: + case CMD_REGISTER: + case CMD_DELETE: ((struct commd_conv *)cmd)->_2 = ntoh64(((struct commd_conv *)cmd)->_2); FALLTHROUGH; + case CMD_CREATE: + case CMD_GET_PORT: ((struct commd_conv *)cmd)->_1 = ntoh64(((struct commd_conv *)cmd)->_1); FALLTHROUGH; + case CMD_GET_CHANNELS:; + } + // processing + switch (type) { + case CMD_REGISTER: return commd_register_process((struct commd_register *)cmd, e); + case CMD_UNREGISTER: return commd_unregister_process((struct commd_unregister *)cmd, e); + case CMD_CREATE: return commd_create_process((struct commd_create *)cmd, e); + case CMD_DELETE: return commd_delete_process((struct commd_delete *)cmd, e); + case CMD_JOIN: return commd_join_process((struct commd_join *)cmd, e); + case CMD_LEAVE: return commd_leave_process((struct commd_leave *)cmd, e); + case CMD_GET_PORT: return (u64)get_channel_port(((struct commd_get_port *)cmd)->cihd); + case CMD_GET_CHANNELS: return_error(ERR_DO_IT_YOURSELF); + } + return_error(ERR_PARAM); +} +static void process_connection(int sockfd) { + DEBUG("PROCESSING CONNECTION\n"); + // TODO: protection against blocking reads + enum commd_type type; + if (read(sockfd, &type, sizeof(type)) != sizeof(type)) report_error(sockfd, ERR_INVAL); + type = ntohl(type); + struct commd cmd; + memset(&cmd, 0, sizeof(cmd)); // TODO: consider to remove + i64 commd_size = commd_size_lookup[type]; + if (read(sockfd, &cmd, commd_size) != commd_size) report_error(sockfd, ERR_INVAL); + enum commd_error e = ERR_SUCCESS; + u64 cmd_status = process_cmd(type, &cmd, &e); + if (e == ERR_DO_IT_YOURSELF) cmd_status = send_channels(sockfd, &e); + cmd_status = hton64(cmd_status); + if (e != ERR_SUCCESS) report_error(sockfd, e); + write(sockfd, &cmd_status, sizeof(cmd_status)); +} + +u64 spawn_channel(struct thread_loop_arg *arg) { + pthread_t thread; + pthread_create(&thread, NULL, thread_loop, arg); + return thread; +} +bool sendto_channel(u64 chid, struct kv_system_packet *packet, int wait_ack_ms, int repeat) { + bool success = wait_ack_ms == 0; + varchnl.id = chid; + struct tcp_channel *ch = hset_at(&channels, &varchnl); + if (ch == NULL) return false; + set_sock_timeout(udpctlfd, wait_ack_ms); + + struct sockaddr_in chaddr = {0}; + socklen_t len = sizeof(chaddr); + getsockname(ch->fd, (struct sockaddr *)&chaddr, &len); + do { + sendto(udpctlfd, packet, KV_PACKET_SIZE, 0, (struct sockaddr *)&chaddr, len); + if (wait_ack_ms == 0) continue; + struct kv_system_packet resp; + recvfrom(udpctlfd, &resp, KV_PACKET_SIZE, 0, (struct sockaddr *)&chaddr, &len); + if (errno == EWOULDBLOCK || errno == EAGAIN) continue; + if (resp.operation_id == SYS_ACK) success = true; + } while (--repeat >= 0); + + return success; +} +void tcp_loop(void) { + init_statics(); + init_admin(ADMIN_UID); + int sock = setup_socket(TCP_PORT); + if (listen(sock, LISTEN_AMOUNT) != 0) { + perror("listen on TCP socket failed"); + exit(EXIT_FAILURE); + } + DEBUGF("listening on port %hu\n", TCP_PORT); + struct sockaddr_in accept_addr; + socklen_t addrlen = sizeof(accept_addr); + int currfd; + while (!should_exit) { + currfd = accept(sock, (struct sockaddr *)&accept_addr, &addrlen); + if (currfd < 0) continue; + DEBUGF("accepted connection on port %hu\n", accept_addr.sin_port); + process_connection(currfd); + shutdown(currfd, SHUT_RDWR); + close(currfd); + } + close(sock); + close(udpctlfd); + hset_free(&users); + hset_free(&channels); +} diff --git a/server/tcp.h b/server/tcp.h new file mode 100644 index 0000000..e239e19 --- /dev/null +++ b/server/tcp.h @@ -0,0 +1,35 @@ +#ifndef KV_SERVER_TCP +#define KV_SERVER_TCP + +#include +#include +#include +#include +#include +#include +#include "channel.h" + +#include +#include + +#define TCP_PORT 8085 +#define LISTEN_AMOUNT 128 +#define TCP_MAX_WAIT_MS 10 +#define TCP_MAX_RETRIES 0 +#define ADMIN_UID 0 + + +/* val: struct tcp_channel */ +extern struct hash_set channels; +/* val: struct tcp_user */ +extern struct hash_set users; + +void print_state(int); +void exit_tcp(int); + +void tcp_loop(void); +u64 spawn_channel(struct thread_loop_arg *arg); +u64 spawn_channel_pool(void* arg); // TODO +bool sendto_channel(size_t chid, struct kv_system_packet* packet, int wait_ack_ms, int repeat); + +#endif // KV_SERVER_TCP diff --git a/test/general.c b/test/general.c new file mode 100644 index 0000000..d7673a3 --- /dev/null +++ b/test/general.c @@ -0,0 +1,79 @@ +#include +#include +#include + +#include "../server/tcp.h" + +#include +#include + +#define W printf("LINE %i\n", __LINE__) + +int commsock; +enum commd_error cerr; + +void setup_globals(void) { commsock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); } + +u64 sendtoserv(int sock, enum commd_type t, struct commd *cmd, enum commd_error * NULLABLE e) { + sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + struct sockaddr_in serv = { + .sin_family = AF_INET, .sin_port = htons(TCP_PORT), .sin_addr = {htonl(INADDR_LOOPBACK)}}; + while (connect(sock, (struct sockaddr *)&serv, sizeof(serv)) != 0) sleep(1); + t = htonl(t); + if (send(sock, &t, sizeof(t), 0) <= 0) { *e = ERR_SERV; return 0; } + if (send(sock, cmd, sizeof(*cmd), 0) <= 0) { *e = ERR_SERV; return 0; } + + u64 resp; + if (recv(sock, &resp, sizeof(resp), 0) <= 0) {*e = ERR_SERV; return 0; } + resp = ntoh64(resp); + if (resp == 0 && e != NULL) { + read(sock, e, sizeof(*e)); + *e = ntohl(*e); + } + shutdown(sock, SHUT_RDWR); + close(sock); + return resp; +} + +u64 register_user(i32 perm) { + struct commd_register r = {.auid = hton64(0), .perm = hton64(perm)}; + if (cerr != ERR_SUCCESS) DEBUGF("Server returned error %i: %s. errno: %s\n", cerr, kv_strerror(cerr), strerror(errno)); + return sendtoserv(commsock, CMD_REGISTER, (struct commd *)&r, &cerr); +} + +void unreg_user(u64 u) { + struct commd_unregister unreg = {.auid = hton64(u), .uid = hton64(u)}; + u64 resp = sendtoserv(commsock, CMD_UNREGISTER, (struct commd *)&unreg, &cerr); + if (cerr != ERR_SUCCESS) DEBUGF("Server returned error %i: %s. errno: %s\n", cerr, kv_strerror(cerr), strerror(errno)); + printf("sendtoserv returned %zu\n", resp); +} + +u64 create_channel(u64 u) { + struct commd_create c = {.uid = hton64(u)}; + u64 resp = sendtoserv(commsock, CMD_CREATE, (struct commd *)&c, &cerr); + if (cerr != ERR_SUCCESS) DEBUGF("Server returned error %i: %s\n", cerr, kv_strerror(cerr)); + return resp; +} + +void* thr(void* a) { + (void)a; + u64 uid = register_user(perm_none); + printf("uid: %zu\n", uid); + unreg_user(uid); + uid = register_user(perm_admin); + printf("uid: %zu\n", uid); + u64 chid = create_channel(uid); + printf("chid: %zu\n", chid); + return NULL; +} + +int main(int argc, char *argv[]) { + (void)argc; + (void)argv; + for (int i = 0; i < 1; ++i) { + pthread_t t; + pthread_create(&t, NULL, thr, NULL); + } + sleep(1); + return 0; +} -- cgit v1.2.3-70-g09d2