aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjustanothercatgirl <sotov@twistea.su>2025-02-08 22:13:52 +0300
committerjustanothercatgirl <sotov@twistea.su>2025-02-08 22:13:52 +0300
commitcab382db088d9f240253466a1c5a26c62f3967c8 (patch)
treec5502f1b49211dccd3e29e163e708708495a6407
parent3eeee14d5d5c93ae3d156aabae5a96d1c09f185a (diff)
Implemented multiplexing in main thread (TCP loop). Removed CMake files.HEADmaster
TODO: Implement multiplexing in worker threads (UDP loops), implement channel_pool interface.
-rw-r--r--.gitignore2
-rw-r--r--CMakeLists.txt16
-rw-r--r--Makefile18
-rw-r--r--README.md4
-rw-r--r--client/CMakeLists.txt15
-rw-r--r--compile_flags.txt1
-rw-r--r--doc/PROTOCOLS10
-rw-r--r--doc/channel_multiplexing84
m---------include/c_headers0
-rw-r--r--include/kv.h1
-rw-r--r--include/packet.c24
-rw-r--r--include/packet.h25
-rw-r--r--server/CMakeLists.txt21
-rw-r--r--server/channel.c41
-rw-r--r--server/channel.h4
-rw-r--r--server/main.c17
-rw-r--r--server/tcp.c335
-rw-r--r--server/tcp.h41
-rw-r--r--test/general.c26
19 files changed, 513 insertions, 172 deletions
diff --git a/.gitignore b/.gitignore
index 52aa982..50aa283 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,5 @@ build/
.cache/
*.o
+core*
+.cache/*
diff --git a/CMakeLists.txt b/CMakeLists.txt
deleted file mode 100644
index a7d9c5a..0000000
--- a/CMakeLists.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-# CMake files will be removed in the next several commits
-
-cmake_minimum_required(VERSION 3.20)
-
-project(kv
- VERSION 0.0
- DESCRIPTION "Just see the readme file in this repo"
- LANGUAGES C)
-
-set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
-include_directories(include include/c_headers/include)
-
-add_compile_options(-Wall -Wextra -Wpedantic -Werror -g)
-
-add_subdirectory(server)
-add_subdirectory(client)
diff --git a/Makefile b/Makefile
index 346f1f4..6db2190 100644
--- a/Makefile
+++ b/Makefile
@@ -8,18 +8,20 @@ CLINKS += -lc -lpthread
# Where to build stuff
BLDDIR ?= build
TSTDIR ?= $(BLDDIR)/tests
+# Library files
+LIBFS = include/c_headers/include/container.h include/c_headers/include/rstypes.h
-# This variable's value is NOT checked, only it's existence is checked.
-# If you define it as `DEBUG = Off`, it will still build in debug mode
-DEBUG = On
+# TODO: remove in production build
+DEBUG ?= 1
-ifdef $(DEBUG)
- CFLAGS += -DDBG -ggdb -g
+ifeq ($(DEBUG), 1)
+ CFLAGS += -DDBG -DLOG_DEBUG -ggdb -g
endif
.PHONY: clean all tests run_tests
all: $(BLDDIR) $(BLDDIR)/server $(BLDDIR)/client tests
+ @echo DEBUG=$(DEBUG), CFLAGS=$(CFLAGS)
$(BLDDIR):
mkdir -p $(BLDDIR)
@@ -32,15 +34,15 @@ $(BLDDIR)/packet.o: include/packet.c
$(CC) -c $< $(CFLAGS) $(CLIBS) -o $@
# Server object files
-$(BLDDIR)/s_%.o: server/%.c
+$(BLDDIR)/s_%.o: server/%.c $(LIBFS)
$(CC) -c $< $(CFLAGS) $(CLIBS) -o $@
# client object files
-$(BLDDIR)/c_%.o: client/%.c
+$(BLDDIR)/c_%.o: client/%.c $(LIBFS)
$(CC) -c $< $(CFLAGS) $(CLIBS) -o $@
# test object files
-$(TSTDIR)/%.o: test/%.c
+$(TSTDIR)/%.o: test/%.c $(LIBFS)
$(CC) -c $< $(CFLAGS) $(CLIBS) -o $@
# server executable
diff --git a/README.md b/README.md
index 3239c69..8cb3989 100644
--- a/README.md
+++ b/README.md
@@ -13,9 +13,9 @@ Can be found in `doc/`. For now, only protocols are somewhat documented
along with all object files. If you want to customize your build destination, run
`make BLDDIR=mybuild` or customize the variable inside the makefile directly
-To build in debug mode, `make DEBUG=On`. Note that while in development,
+To build in debug mode, `make DEBUG=1`. Note that while in development,
`DEBUG` is already defined in makefile, so for now if you want a build without
-debug information, please remove `DEBUG = On` line from the Makefile
+debug information, please remove `DEBUG = 1` from makefile or run `make DEBUG=0`
Note: I have CMakeLists.txt for now, but they are not maintained and I will
remove them in next few commits
diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt
deleted file mode 100644
index 266ddf1..0000000
--- a/client/CMakeLists.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-# CMake files will be removed in the next several commits
-
-cmake_minimum_required(VERSION 3.20)
-
-project(kv_client
- VERSION 0.0
- DESCRIPTION "I will work on this one later"
- LANGUAGES C)
-
-set(SOURCE_FILES main.c ../include/packet.c)
-set(HEADER_FILES ../include/packet.h)
-
-add_executable(kv_client ${SOURCE_FILES})
-target_compile_definitions(kv_client PUBLIC DEBUG)
-target_link_libraries(kv_client PRIVATE opus c)
diff --git a/compile_flags.txt b/compile_flags.txt
index 5d90624..e6e75eb 100644
--- a/compile_flags.txt
+++ b/compile_flags.txt
@@ -1,7 +1,6 @@
-Wall
-Wextra
-Wpedantic
--Werror
-ggdb
-Wno-language-extension-token
-Wno-empty-body
diff --git a/doc/PROTOCOLS b/doc/PROTOCOLS
index c15a83d..996bf0c 100644
--- a/doc/PROTOCOLS
+++ b/doc/PROTOCOLS
@@ -72,11 +72,11 @@ permission required.
arguments. Returns an array as follows: first 8 bytes are uint64 array_size,
next 8*array_size bytes are channels. Last 8 bytes are zeroes (they are after
the last channel). Possible memory layout (divided in 8 byte [64 bit] blocks):
-| 5 | 69 | 70 | 71 | 72 | 42 | 0 |
- ^ ^ ^ ^
- | |-start of data | |-leading zero
- |-size of array |-end of data
-1.3. Functions.
+| 5 | 69 | 70 | 71 | 727 | 420 | 0 |
+ ^ ^ ^ ^
+ | |-start of data | |-leading zero
+ |-size of array |-end of data
+1.3. Functions.
As the interface mainly uses `uint64` type for communications, there are 2
functions that are for some reason not present in linux api:
`uint64 hton64(uint64)`: Host TO Network: convert uint64 to
diff --git a/doc/channel_multiplexing b/doc/channel_multiplexing
new file mode 100644
index 0000000..a1ba681
--- /dev/null
+++ b/doc/channel_multiplexing
@@ -0,0 +1,84 @@
+This document is mostly for the developer to settle down ideas that are sitting
+in his mind.
+
+Server structure:
+Main thread: accepting TCP socket and reading-writing TCP sockets created by
+an accepting one. TODO: rewrite so that the entire system is using poll.
+
+Other `get_nprocs() - 1` threads (if `get_nprocs() == 1`, then only 1
+additional thread) are channel pools.
+
+Additionally, main thread maintains a list of all channels and threads
+associated with them. When new channel is requested, is is created on a thread
+that is maintaining the least amount of channels at the time.
+
+Each channel pool consists of an anonymous UNIX domain socket (which is used
+for recieving system messages from main thread) and some amount of INET domain
+sockets. All of them are switched on using `poll` system call.
+
+Each channel in the channel pool has an associated list of peers with it. When
+data is recieved on a channel, it is forwarded to every peer (except the
+sender).
+
+The basic structure of the loop should look something like this:
+
+```
+fds = {timerfd, master socket, udp1, udp2, ...};
+while (poll(fds, nfds, -1) > 0) {
+ if (socket is master and has data) process_system_commands();
+ if (timerfd is ready) check_keepalives(all sockets);
+ for (every socket that has data)
+ handle_peers(socket);
+}
+
+void process_system_commands() {
+ command = read(master);
+ switch (parse_command()) {
+ case add_user: addto(channel, user); break;
+ case add_channel: array_append(channels, new_channel()); break;
+ case remove_user: rmfrom(channel, user); break;
+ case remove_channel: array_pop(channel); break;
+ }
+}
+
+void handle_peers(int socket) {
+ data = read(socket);
+ for (every peer of socket)
+ sendto(socket, data, peer, O_NONBLOCK); // MSG_DONTWAIT, maybe?
+}
+
+void check_keepalives(int socket) {
+ for (every peer of socket) {
+ if (keepalive of peer too old) rmfrom(channel, user);
+ }
+}
+```
+
+Data structures:
+inside main thread:
+ struct channel{u64 id, int port};
+ struct channel_pool {
+ u16 num,
+ int master_pipe,
+ u64 thread_id,
+ array<struct channel> chs
+ };
+ // modern linux kernel has limit on maximim amount of cores:
+ // 8192. this would break 65535 port limit very fast anyways.
+ array<struct channel_pool> threads;
+inside pool:
+ struct ch_user { u64 id, u32 ip, u16 port, u64 last_keepalive };
+ struct channel {
+ u64 id,
+ u64 owner,
+ int udo,
+ hash_set<ch_user>* users
+ };
+ array<struct channel> channels;
+ int master = pipe
+
+Communication between threads will be done through pipes. Yes, this will be
+inefficient in a sense that there will be 2 file descriptors opened for each
+thread, but what are other options? Yes, none. If there are, mail this to me
+please
+
diff --git a/include/c_headers b/include/c_headers
-Subproject 8989b7dc005af7960aa66fd99b197a3b0b7a4fb
+Subproject ff8ab43201e5c862003d9353016409c13691562
diff --git a/include/kv.h b/include/kv.h
index ff28ce3..52f27b5 100644
--- a/include/kv.h
+++ b/include/kv.h
@@ -5,3 +5,4 @@
#define KV_SERVER_ADDRESS_INT ((95 << 24) | (164 << 16) | (2 << 8) | (199 << 0))
#endif // JUSTANOTHERCATGIRL_KV_H
+/* vim: set ts=8 noet: */
diff --git a/include/packet.c b/include/packet.c
index a523285..45ebcd1 100644
--- a/include/packet.c
+++ b/include/packet.c
@@ -1,14 +1,17 @@
#include "packet.h"
-i64 const commd_size_lookup[64] = {[CMD_CREATE] = sizeof(struct commd_create),
- [CMD_DELETE] = sizeof(struct commd_delete),
- [CMD_JOIN] = sizeof(struct commd_join),
- [CMD_LEAVE] = sizeof(struct commd_leave),
- [CMD_REGISTER] = sizeof(struct commd_register),
- [CMD_UNREGISTER] = sizeof(struct commd_unregister),
- [CMD_GET_PORT] = sizeof(struct commd_get_port),
- [CMD_GET_CHANNELS] = 0, // You can not get a sizeof(void)
- 0};
+i64 const commd_size_lookup[64] = {
+ [CMD_CREATE] = sizeof(struct commd_create),
+ [CMD_DELETE] = sizeof(struct commd_delete),
+ [CMD_JOIN] = sizeof(struct commd_join),
+ [CMD_LEAVE] = sizeof(struct commd_leave),
+ [CMD_REGISTER] = sizeof(struct commd_register),
+ [CMD_UNREGISTER] = sizeof(struct commd_unregister),
+ [CMD_GET_PORT] = sizeof(struct commd_get_port),
+ [CMD_GET_CHANNELS] = 0, // You can not get a sizeof(void)
+ [CMD_LAST] = -1,
+ 0
+};
u32 system_packet_checksum(struct kv_system_packet *packet)
{
@@ -61,6 +64,9 @@ const char *kv_strerror(enum commd_error e)
return "Not implemented";
case ERR_DO_IT_YOURSELF:
return "You should not have recieved this error. This is either server or network fault";
+ case ERR_TYPE:
+ return "No such command";
}
return "Unknown error";
}
+/* vim: set ts=8 noet: */
diff --git a/include/packet.h b/include/packet.h
index f1b1e60..01c006e 100644
--- a/include/packet.h
+++ b/include/packet.h
@@ -15,6 +15,9 @@
#define KV_PACKET_SIZE 512
+// These macros became obsolete due to log.h header in c_headers
+// TODO: remove
+/*
#ifdef DBG
# define DEBUGF(fmt, ...) fprintf(stderr, "DEBUG: %s:%d:%s(): " fmt, __FILE__, __LINE__, __func__, ##__VA_ARGS__)
# define DEBUG(msg) fprintf(stderr, "DEBUG: %s:%d:%s(): " msg, __FILE__, __LINE__, __func__)
@@ -24,6 +27,7 @@
# define DEBUG(fmt)
# define WHERE
#endif
+*/
#ifdef __clang__
# pragma GCC diagnostic ignored "-Wnullability-extension" // SHUT UP I'VE ALREADY PUT IT BEHIND A MACRO
# define NONNULL _Nonnull
@@ -33,6 +37,7 @@
# define NULLABLE
#endif
+
#include <arpa/inet.h>
#include <stddef.h>
#include <rstypes.h>
@@ -76,14 +81,14 @@ u32 system_packet_checksum(struct kv_system_packet *packet);
u8 is_system_packet(struct kv_packet *p);
enum permissions {
- perm_none = 0,
- perm_register_user = 1 << 1,
- perm_unregister_user = 1 << 2,
- perm_add_channel = 1 << 3,
- perm_unadd_channel = 1 << 4,
- perm_join_user = 1 << 5,
- perm_kick_user = 1 << 6,
- perm_admin = 0x7FFFFFFF,
+ PERM_NONE = 0,
+ PERM_REGISTER_USER = 1 << 1,
+ PERM_UNREGISTER_USER = 1 << 2,
+ PERM_ADD_CHANNEL = 1 << 3,
+ PERM_UNADD_CHANNEL = 1 << 4,
+ PERM_JOIN_USER = 1 << 5,
+ PERM_KICK_USER = 1 << 6,
+ PERM_ADMIN = 0x7FFFFFFF,
};
enum commd_error {
@@ -93,6 +98,8 @@ enum commd_error {
ERR_SERV,
// Error in parameters (e.g. nonexistant UID)
ERR_PARAM,
+ // The command does not exist
+ ERR_TYPE,
// Invalid parameters (e.g. not enough params)
ERR_INVAL,
// Access violation
@@ -125,6 +132,7 @@ enum commd_type {
CMD_UNREGISTER,
CMD_GET_PORT,
CMD_GET_CHANNELS,
+ CMD_LAST,
};
// Somehow, this is pretty similar to linux sockaddr.
struct commd {
@@ -192,3 +200,4 @@ u64 ntoh64(u64 a);
const char* kv_strerror(enum commd_error e);
#endif // KV_PACKET_H
+/* vim: set ts=8 noet: */
diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt
deleted file mode 100644
index f63f897..0000000
--- a/server/CMakeLists.txt
+++ /dev/null
@@ -1,21 +0,0 @@
-# CMake files will be removed in the next several commits
-
-cmake_minimum_required(VERSION 3.20)
-
-project(kv_server
- VERSION 0.0
- DESCRIPTION "A server for kv project."
- LANGUAGES C)
-
-set(SOURCE_FILES main.c channel.c tcp.c ../include/packet.c)
-set(HEADER_FILES channel.h tcp.h)
-
-add_executable(kv_server ${SOURCE_FILES})
-target_compile_definitions(kv_server PUBLIC DEBUG)
-target_link_libraries(kv_server
- PUBLIC crypto
- PUBLIC pthread
-)
-
-
-
diff --git a/server/channel.c b/server/channel.c
index a0e2779..63ef11a 100644
--- a/server/channel.c
+++ b/server/channel.c
@@ -2,10 +2,11 @@
#include <packet.h>
#include <container.h>
+#include <log.h>
#include <errno.h>
#include <stdlib.h>
-#define decisive_push(array, index, elem) \
+#define maybe_push(array, index, elem) \
do { \
if (index >= array_size(array)) { \
array_push(array, elem); \
@@ -35,6 +36,7 @@ struct ch_user {
static __THREAD struct hash_set users;
static __THREAD int sockfd;
+
static int __user_hset_cmp(const void *a, const void *b) {
struct ch_user *_a = (struct ch_user *)a, *_b = (struct ch_user *)b;
return _a->id - _b->id;
@@ -54,17 +56,17 @@ bool channel_init(void) {
(sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) >= 0 &&
bind(sockfd, (struct sockaddr *)&thread_local_address, sizeof(thread_local_address)) == 0;
if (!chain_result) {
- perror("channel init failed");
+ LERRP("channel init failed");
if (hset_ok(users)) hset_free(&users);
if (sockfd >= 0) close(sockfd);
return false;
}
- DEBUGF("Channel [%i] created\n", sockfd);
+ LDEBUGF("Channel [%i] created", sockfd);
return true;
}
static void channel_uninit(void) {
hset_free(&users);
- if (close(sockfd) == -1) perror("could not gracefully uninitialize channel");
+ if (close(sockfd) == -1) LERRP("couldPnot gracefully uninitialize channel");
}
_Noreturn static inline void forced_cleanup(struct kv_packet **packets) {
clear_packet_array(packets);
@@ -118,7 +120,7 @@ static void send_packets_back(struct kv_packet **packets) {
.sin_port = htons(current_user->port),
.sin_addr = {htonl(current_user->ip)}};
for (size_t j = 0; packets[j] != NULL && j < array_size(packets); ++j) {
- DEBUGF("sending packet with id %zu to destination: %u.%u.%u.%u:%hu\n", packets[j]->uid,
+ LDEBUGF("sending packet with id %zu to destination: %u.%u.%u.%u:%hu", packets[j]->uid,
(destination.sin_addr.s_addr >> 24) & 0xFF, (destination.sin_addr.s_addr >> 16) & 0xFF,
(destination.sin_addr.s_addr >> 8) & 0xFF, destination.sin_addr.s_addr & 0xFF,
destination.sin_port);
@@ -126,14 +128,21 @@ static void send_packets_back(struct kv_packet **packets) {
int error_code = sendto(
sockfd, packets[j], KV_PACKET_SIZE, 0, (struct sockaddr *)&destination,
sizeof(destination));
- if (error_code) perror("could not send packets back");
+ if (error_code) LWARNP("could not send packets back");
}
}
}
/*
* An example of how you should NOT write code
- * Todo: please rewrite this shit
+ * Todo: please rewrite this shi
+ * Also, this started to busy-loop...
+ * As i've determined, this requires a complete re-write, so this is what i'm gonna be doing now
+ *
+ * Actually, am I reading this correct? Did I write this at the time when I thought
+ * UDP could magically broadcast my packets? Why it is this complex dude...
+ *
+ * I am abandoning this place in code and am implementing channel_pool
*/
void *thread_loop(void *arg) {
if (!channel_init()) pthread_exit(NULL);
@@ -143,6 +152,7 @@ void *thread_loop(void *arg) {
size_t recvd_index = 0;
int recv_flag = MSG_DONTWAIT;
while (true) {
+ LDEBUGV("I am looping");
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
ssize_t recvlength = recvfrom(
@@ -158,16 +168,21 @@ void *thread_loop(void *arg) {
struct kv_packet *kv_copy = malloc(KV_PACKET_SIZE);
memcpy(kv_copy, &work_buffer, KV_PACKET_SIZE);
++recvd_index;
- decisive_push(recvd_data, recvd_index, kv_copy);
+ maybe_push(recvd_data, recvd_index, kv_copy);
} else if (errno == EWOULDBLOCK) {
- if (array_size(recvd_data) == 0) {
- recv_flag &= ~MSG_DONTWAIT;
- continue;
- }
+ recv_flag &= ~MSG_DONTWAIT;
+ if (recvd_index == 0) continue;
send_packets_back(recvd_data);
clear_packet_array(recvd_data);
+ recvd_index = 0;
} else {
- perror("error in thread_loop");
+ LWARNP("error in thread_loop");
}
}
}
+
+void* new_thread_loop(void* arg) {
+ return arg;
+}
+
+/* vim: set ts=8 noet: */
diff --git a/server/channel.h b/server/channel.h
index 8068a4c..549ff21 100644
--- a/server/channel.h
+++ b/server/channel.h
@@ -18,10 +18,12 @@ struct thread_loop_arg {
pthread_mutex_t *sock_mx;
pthread_cond_t *sock_ready_cond;
u64 owner;
- const unsigned char *pubkey;
+ int pipefd;
};
// main function that manages every channel
void *thread_loop(void *);
+void *new_thread_loop(void *);
#endif // KV_SERVER_CHANNEL_H
+/* vim: set ts=8 noet: */
diff --git a/server/main.c b/server/main.c
index b6745e0..0d8d9f3 100644
--- a/server/main.c
+++ b/server/main.c
@@ -3,6 +3,7 @@
#define HSET_MAX_BUCKET_SIZE 4
#include <container.h>
#undef CONTAINER_IMPLEMENTATION
+#include <log.h>
#include "tcp.h"
@@ -11,22 +12,18 @@
#define MAIN_PORT 8164
void setup_signal(void) {
- struct sigaction signal = {.sa_handler = print_state, .sa_mask = {{0}}, .sa_flags = 0};
- if (sigaction(SIGUSR1, &signal, NULL) != 0) {
- WHERE;
- exit(EXIT_FAILURE);
- }
+ struct sigaction signal = {.sa_handler = print_state, .sa_mask = {{0}}, .sa_flags = SA_RESTART};
+ if (sigaction(SIGUSR1, &signal, NULL) != 0) LFAILV("sigaction");
signal.sa_handler = exit_tcp;
- if (sigaction(SIGTERM, &signal, NULL) != 0) {
- WHERE;
- exit(EXIT_FAILURE);
- }
+ if (sigaction(SIGTERM, &signal, NULL) != 0) LFAILV("sigaction");
}
int main(int argc, char *argv[]) {
(void)argc;
(void)argv;
setup_signal();
- tcp_loop();
+ new_tcp_loop();
return 0;
}
+
+/* vim: set ts=8 noet: */
diff --git a/server/tcp.c b/server/tcp.c
index 0d6e118..09cda70 100644
--- a/server/tcp.c
+++ b/server/tcp.c
@@ -1,11 +1,15 @@
#include "tcp.h"
+#include <assert.h>
+#include <log.h>
#include <errno.h>
#include <netinet/in.h>
#include <stddef.h>
+#include <sys/poll.h>
+#include <sys/fcntl.h>
#define report_error(socket, error) \
do { \
- DEBUGF("error on socket %i: %i\n", socket, error); \
+ LDEBUGF("error on socket %i: %i", socket, error); \
write(socket, &zerozu, sizeof(zerozu)); \
enum commd_error __ecpy = htonl(error); \
write(socket, &__ecpy, sizeof(__ecpy)); \
@@ -18,26 +22,36 @@
return 0; \
} while (0)
+#define process_error(pfd_p, state_p, error) \
+ do { \
+ pfd_p->events &= ~POLLIN; \
+ pfd_p->events |= POLLOUT; \
+ state_p->err = htonl(error); \
+ state_p->progress = PROG_ERRR; \
+ return; \
+ } while (0)
+
+struct hash_set channel_pools;
struct hash_set channels;
struct hash_set users;
static struct tcp_user varusr = {0};
static struct tcp_channel varchnl = {0};
static u64 lalloc_usr = 0;
static int udpctlfd = 0;
-static const size_t zerozu = 0;
+static const u64 zerozu = 0;
static bool should_exit = false;
void print_state(int _) {
(void)_;
-#ifdef DEBUG
- fputs("printing server state.\n hash_map<struct tcp_user> users {\n", stderr);
+#ifdef DBG
+ fputs("printing server state.\n hash_set<struct tcp_user> users {\n", stderr);
struct hash_set_iter iter;
for (hseti_begin(&users, &iter); !hseti_end(&iter); hseti_next(&iter)) {
struct tcp_user *curr = hseti_get(&iter);
fprintf(stderr, "\ttcp_user {.id=%zu, .permissions=%u, .channel=%zu}\n", curr->id, curr->permissions,
curr->joined_channel);
}
- fputs("}\nhash_map<struct tcp_channel> channels {\n", stderr);
+ fputs("}\nhash_set<struct tcp_channel> channels {\n", stderr);
for (hseti_begin(&channels, &iter); !hseti_end(&iter); hseti_next(&iter)) {
struct tcp_channel *curr = hseti_get(&iter);
struct sockaddr_in addr;
@@ -52,7 +66,7 @@ void print_state(int _) {
void exit_tcp(int _) {
(void)_;
should_exit = true;
- DEBUGF("EXITING SERVER, setting `should_exit (%p)` to %i\n", (void*)&should_exit, (int)should_exit);
+ LINFOF("EXITING SERVER, setting `should_exit (%p)` to %i", (void*)&should_exit, (int)should_exit);
}
static int tcp_user_cmp(const struct tcp_user *a, const struct tcp_user *b) { return (a->id - b->id) ? 1 : 0; }
@@ -75,26 +89,26 @@ static int setup_socket(unsigned short port) {
int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
int flag = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof flag)) goto error;
+ if (fcntl(sock, F_SETFL, O_NONBLOCK)) goto error;
struct sockaddr_in localaddr = {
.sin_family = AF_INET, .sin_port = htons(port), .sin_addr = {htonl(INADDR_ANY)}};
if (bind(sock, (struct sockaddr *)&localaddr, sizeof(localaddr))) goto error;
+ if (listen(sock, LISTEN_AMOUNT)) goto error;
return sock;
error:
- perror("TCP thread failed to initialize");
- exit(EXIT_FAILURE);
-
+ LFAIL("TCP thread failed to initialize");
}
static void init_admin(u64 aid) {
- struct tcp_user u = {.id = aid, .pubkey = NULL, .permissions = perm_admin, .joined_channel = 0};
+ struct tcp_user u = {.id = aid, .pubkey = NULL, .permissions = PERM_ADMIN, .joined_channel = 0};
hset_insert_copy(&users, &u);
}
static bool has_4bytes_0xff(u64 id) {
return (unsigned int)(id >> 32) == 0xFFFFFFFF || (unsigned int)(id & 0xFFFFFFFF) == 0xFFFFFFFF;
}
static u64 get_uniq_id(struct hash_set *set) {
- // while (map has lalloc_usr inex || lalloc_usr has 4 bytes of ones) next lalloc_usr;
+ /* while (map has lalloc_usr inex || lalloc_usr has 4 bytes of ones) next lalloc_usr; */
varusr.id = lalloc_usr;
- while (hset_at(set, &varusr) != NULL || has_4bytes_0xff(lalloc_usr)) ++varusr.id;
+ while (hset_at(set, &varusr) != NULL || has_4bytes_0xff(varusr.id)) ++varusr.id;
lalloc_usr = varusr.id;
return lalloc_usr;
}
@@ -112,10 +126,11 @@ static bool user_has_permission(u64 uid, unsigned int perm) {
struct tcp_user *u = hset_at(&users, &varusr);
if (u == NULL) return false;
unsigned int uperm = u->permissions;
- // bitwise implication must yield all ones (0xFFFFFFFF).
- // Inverse it for easier check
+ /* bitwise implication must yield all ones (0xFFFFFFFF). */
+ /* Invert it for easier check */
return (perm & ~uperm) == 0;
}
+/* TODO: remove */
static u64 send_channels(int sockfd, enum commd_error *e) {
struct hash_set_iter iter;
u64 array_length = hton64(hset_count(&channels));
@@ -125,83 +140,85 @@ static u64 send_channels(int sockfd, enum commd_error *e) {
u64 chid = hton64(c->id);
if (write(sockfd, &chid, sizeof(chid)) != sizeof(chid)) return_error(ERR_SERV);
}
- // the leading zero is written by the caller
- return_error(ERR_SUCCESS); // actually returns success but...
+ /* the leading zero is written by the caller */
+ /* or is it? */
+ return_error(ERR_SUCCESS); /* actually returns success but... */
+}
+static u64* get_channel_array(void) {
+ /* consider later: store allocated buffer with all channels somewhere
+ * but I would need to avoid race conditions (channel added while I was writing) */
+ hset_iter iter;
+ size_t i = 0;
+ u64 len = hset_count(&channels);
+ u64* ret = array_new(u64, len + 2);
+ ret[i++] = hton64(len); /* ret[0]; i=1 */
+ for (hseti_begin(&channels, &iter); !hseti_end(&iter); hseti_next(&iter)) {
+ struct tcp_channel *c = hseti_get(&iter);
+ ret[i++] = c->id;
+ }
+ ret[i] = 0LU;
+ return ret;
}
static inline u64 commd_register_process(struct commd_register *cmd, enum commd_error *e) {
/* fprintf(stderr, "%s: auid=%zu; perm=%zu\n", "commd_register_process", cmd->auid, cmd->perm); */
- if (!user_has_permission(cmd->auid, perm_join_user | cmd->perm)) return_error(ERR_ACCESS);
+ if (!user_has_permission(cmd->auid, PERM_JOIN_USER | cmd->perm)) return_error(ERR_ACCESS);
struct tcp_user new_user = {
.id = get_uniq_id(&users), .joined_channel = 0, .permissions = (unsigned int)cmd->perm};
hset_insert_copy(&users, &new_user);
return new_user.id;
}
static inline u64 commd_unregister_process(struct commd_unregister *cmd, enum commd_error *e) {
- /* DEBUGF("delete user %zu (admin %zu)\n", cmd->uid, cmd->auid); */
- if (cmd->auid != cmd->uid && !user_has_permission(cmd->auid, perm_unregister_user)) return_error(ERR_ACCESS);
+ LDEBUGF("delete user %zu (admin %zu)", cmd->uid, cmd->auid);
+ if (cmd->auid != cmd->uid && !user_has_permission(cmd->auid, PERM_UNREGISTER_USER)) return_error(ERR_ACCESS);
varusr.id = cmd->uid;
hset_remove(&users, &varusr);
return cmd->uid;
}
static inline u64 commd_create_process(struct commd_create *cmd, enum commd_error *e) {
- if (!user_has_permission(cmd->uid, perm_add_channel)) return_error(ERR_ACCESS);
+ if (!user_has_permission(cmd->uid, PERM_ADD_CHANNEL)) return_error(ERR_ACCESS);
u64 chid;
int sock = -1;
- unsigned short port = 0;
{
pthread_mutex_t sock_mx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t sock_cond = PTHREAD_COND_INITIALIZER;
+
struct thread_loop_arg arg = {
.owner = cmd->uid, .sock_dest = &sock, .sock_mx = &sock_mx, .sock_ready_cond = &sock_cond};
pthread_mutex_lock(&sock_mx);
chid = spawn_channel(&arg);
- DEBUG("\n");
- while (sock == -1 || port == 0) pthread_cond_wait(&sock_cond, &sock_mx);
- DEBUG("\n");
+ while (sock == -1) pthread_cond_wait(&sock_cond, &sock_mx);
pthread_mutex_unlock(&sock_mx);
}
- struct tcp_channel new_channel = {.owner = cmd->uid, .name = NULL, .fd = sock};
+ struct tcp_channel new_channel = {.id = chid, .owner = cmd->uid, .name = NULL, .fd = sock};
hset_insert_copy(&channels, &new_channel);
return chid;
}
static inline u64 commd_delete_process(struct commd_delete *cmd, enum commd_error *e) {
- DEBUGF("received command%p\n", (void *)cmd);
+ LDEBUGF("received command%p", (void *)cmd);
varchnl.id = cmd->chid;
struct tcp_channel *c = hset_at(&channels, &varchnl);
if (c == NULL) return_error(ERR_PARAM);
- if (cmd->uid != c->owner && !user_has_permission(cmd->uid, perm_unadd_channel)) return_error(ERR_ACCESS);
+ if (cmd->uid != c->owner && !user_has_permission(cmd->uid, PERM_UNADD_CHANNEL)) return_error(ERR_ACCESS);
hset_remove(&channels, &varchnl);
return varchnl.id;
}
static inline u64 commd_join_process(struct commd_join *cmd, enum commd_error *e) {
- if (cmd->uid != cmd->juid && !user_has_permission(cmd->uid, perm_join_user)) return_error(ERR_ACCESS);
+ if (cmd->uid != cmd->juid && !user_has_permission(cmd->uid, PERM_JOIN_USER)) return_error(ERR_ACCESS);
struct kv_system_packet packet = {
.magic_bytes = SYS_PACKET_MAGIC_BYTES, .operation_id = htonl(SYS_JOIN), .user_id = cmd->juid};
if (!sendto_channel(cmd->chid, &packet, TCP_MAX_WAIT_MS, TCP_MAX_RETRIES)) return_error(ERR_SERV);
return (u64)get_channel_port(cmd->chid);
}
static inline u64 commd_leave_process(struct commd_leave *cmd, enum commd_error *e) {
- if (cmd->uid != cmd->luid && !user_has_permission(cmd->uid, perm_kick_user)) return_error(ERR_ACCESS);
+ if (cmd->uid != cmd->luid && !user_has_permission(cmd->uid, PERM_KICK_USER)) return_error(ERR_ACCESS);
struct kv_system_packet packet = {
.magic_bytes = SYS_PACKET_MAGIC_BYTES, .operation_id = htonl(SYS_LEAVE), .user_id = cmd->luid};
if (!sendto_channel(cmd->chid, &packet, TCP_MAX_WAIT_MS, TCP_MAX_RETRIES)) return_error(ERR_SERV);
return 1;
}
-/// switches on command type and operates accordingly
+/* switches on command type and operates accordingly */
static u64 process_cmd(enum commd_type type, struct commd *cmd, enum commd_error *NONNULL e) {
- // network byte order conversion
- switch (type) {
- case CMD_LEAVE:
- case CMD_JOIN: ((struct commd_conv *)cmd)->_3 = ntoh64(((struct commd_conv *)cmd)->_3); FALLTHROUGH;
- case CMD_UNREGISTER:
- case CMD_REGISTER:
- case CMD_DELETE: ((struct commd_conv *)cmd)->_2 = ntoh64(((struct commd_conv *)cmd)->_2); FALLTHROUGH;
- case CMD_CREATE:
- case CMD_GET_PORT: ((struct commd_conv *)cmd)->_1 = ntoh64(((struct commd_conv *)cmd)->_1); FALLTHROUGH;
- case CMD_GET_CHANNELS:;
- }
- // processing
switch (type) {
case CMD_REGISTER: return commd_register_process((struct commd_register *)cmd, e);
case CMD_UNREGISTER: return commd_unregister_process((struct commd_unregister *)cmd, e);
@@ -210,18 +227,22 @@ static u64 process_cmd(enum commd_type type, struct commd *cmd, enum commd_error
case CMD_JOIN: return commd_join_process((struct commd_join *)cmd, e);
case CMD_LEAVE: return commd_leave_process((struct commd_leave *)cmd, e);
case CMD_GET_PORT: return (u64)get_channel_port(((struct commd_get_port *)cmd)->cihd);
- case CMD_GET_CHANNELS: return_error(ERR_DO_IT_YOURSELF);
+ case CMD_GET_CHANNELS: UNREACHABLE; /* this should be unreachable */
+ case CMD_LAST: UNREACHABLE; /* this as well */
+ default: return_error(ERR_TYPE);
}
- return_error(ERR_PARAM);
}
+
+/* коммент */
static void process_connection(int sockfd) {
- DEBUG("PROCESSING CONNECTION\n");
- // TODO: protection against blocking reads
+ LDEBUG("Processing connection");
+ /* TODO: protection against blocking reads */
+ /* this will become irrelevant after implementing poll */
enum commd_type type;
if (read(sockfd, &type, sizeof(type)) != sizeof(type)) report_error(sockfd, ERR_INVAL);
type = ntohl(type);
struct commd cmd;
- memset(&cmd, 0, sizeof(cmd)); // TODO: consider to remove
+ memset(&cmd, 0, sizeof(cmd)); /* TODO: consider to remove */
i64 commd_size = commd_size_lookup[type];
if (read(sockfd, &cmd, commd_size) != commd_size) report_error(sockfd, ERR_INVAL);
enum commd_error e = ERR_SUCCESS;
@@ -262,19 +283,16 @@ void tcp_loop(void) {
init_statics();
init_admin(ADMIN_UID);
int sock = setup_socket(TCP_PORT);
- if (listen(sock, LISTEN_AMOUNT) != 0) {
- perror("listen on TCP socket failed");
- exit(EXIT_FAILURE);
- }
- DEBUGF("listening on port %hu\n", TCP_PORT);
+ if (listen(sock, LISTEN_AMOUNT) != 0) LFAIL("listen on TCP socket failed");
+ LDEBUGF("listening on port %hu", TCP_PORT);
struct sockaddr_in accept_addr;
socklen_t addrlen = sizeof(accept_addr);
int currfd;
while (!should_exit) {
currfd = accept(sock, (struct sockaddr *)&accept_addr, &addrlen);
if (currfd < 0) continue;
- DEBUGF("accepted connection on port %hu\n", accept_addr.sin_port);
- process_connection(currfd);
+ LDEBUGF("accepted connection on port %hu", accept_addr.sin_port);
+ process_connection(currfd); /* this is synchronous and UNACCEPTABLE! */
shutdown(currfd, SHUT_RDWR);
close(currfd);
}
@@ -283,3 +301,212 @@ void tcp_loop(void) {
hset_free(&users);
hset_free(&channels);
}
+
+/* Accepts address of pointer to array cause pointers might have to be modified */
+static void accept_new(struct pollfd **_sockets, struct connection_state **_states) {
+ struct pollfd * sockets = *_sockets; /* This is a hack */
+ struct connection_state * states = *_states; /* And a dirty one */
+
+ LDEBUG("Accepting new connection");
+ struct pollfd newsock = {accept(sockets[0].fd, NULL, NULL), POLLIN, 0};
+ if (newsock.fd < 0) {
+ LWARNP("Accept");
+ return;
+ }
+ if(fcntl(newsock.fd, F_SETFL, O_NONBLOCK)) {
+ LWARNF("Fcntl on fd %i failed: %s (errno %i)", newsock.fd, strerror(errno), errno);
+ return;
+ }
+ struct connection_state newstate;
+ newstate.progress = PROG_NONE,
+ newstate.bytes_ctr = 0,
+ newstate.response = 0,
+ newstate.cmd_size = 0,
+ newstate.err = ERR_SUCCESS,
+ newstate.type = -1/*,
+ newstate.response_array = NULL*/;
+ array_push(sockets, newsock);
+ array_push(states, newstate);
+ *_sockets = sockets; /* Yea, this is */
+ *_states = states; /* DiSgUsTiNg */
+ LDEBUGF("Accepted new connection: fd %i. Array size: %zu", newsock.fd, array_size(sockets));
+}
+static void reopen_master(int *masterfd) {
+ LWARN("Reopening master socket");
+ shutdown(*masterfd, SHUT_RDWR);
+ close(*masterfd);
+ *masterfd = setup_socket(TCP_PORT);
+}
+static bool try_read(int *fdp, u8* restrict where, u64* bytes_read, u64 how_much) {
+ int fd = *fdp;
+ i64 bytes = read(fd, where + *bytes_read, how_much - *bytes_read);
+ if (bytes <= 0) {
+ if (bytes < 0) LWARNP("Read on client socket");
+ *fdp = ~*fdp; /* Loop closes client socket */
+ return false;
+ }
+ *bytes_read += bytes;
+ if (*bytes_read == how_much) {
+ *bytes_read = 0;
+ return true;
+ }
+ return false; /* false? lick my ballse */
+}
+static bool try_write(int *fdp, const u8* from, u64* bytes_written, u64 how_much) {
+ int fd = *fdp;
+ i64 bytes = write(fd, from + *bytes_written, how_much - *bytes_written);
+ if (bytes <= 0) {
+ if (bytes < 0) {
+ LWARNP("Write on client socket");
+ *fdp = ~*fdp; /* Loop closes client socket */
+ }
+ return false;
+ }
+ *bytes_written += bytes;
+ if (*bytes_written == how_much) {
+ *bytes_written = 0;
+ return true;
+ }
+ return false; /* false? lick my ballse */
+}
+/* Accepts NOT arrays, but individual element pointers
+ * Note: how tf did this functinon turn into an asynchronous state machine...
+ * The calling function MUST free the array if,
+ * upon exit, fd->fd < 0 && state->progress == PROG_ARRY */
+static void process_socket(struct pollfd* fd, struct connection_state* state) {
+ LDEBUGF("FD %i: progress %i, type %i", fd->fd, state->progress, state->type);
+ if (fd->revents & (POLLNVAL | POLLERR | POLLHUP)) goto pollerr;
+
+ switch (state->progress) {
+ case PROG_NONE:
+ if (!try_read(&fd->fd, (u8*)&state->type, &state->bytes_ctr, sizeof (enum commd_type)))
+ return; /* continue reading on next iteration */
+ state->type = ntohl(state->type);
+ if (state->type < 0 || state->type >= CMD_LAST)
+ process_error(fd, state, ERR_TYPE);
+ state->cmd_size = commd_size_lookup[state->type];
+ ++state->progress;
+ return;
+ case PROG_TYPE:
+ if (!try_read(&fd->fd, (u8*)&state->cmd, &state->bytes_ctr, state->cmd_size))
+ return; /* continue reading on next iteration */
+ struct commd_conv *cmd = (struct commd_conv *)&state->cmd;
+ /* Network byte order conversion */
+ switch (state->type) {
+ case CMD_LEAVE:
+ case CMD_JOIN: cmd->_3 = ntoh64(cmd->_3); FALLTHROUGH;
+ case CMD_UNREGISTER:
+ case CMD_REGISTER:
+ case CMD_DELETE: cmd->_2 = ntoh64(cmd->_2); FALLTHROUGH;
+ case CMD_CREATE:
+ case CMD_GET_PORT: cmd->_1 = ntoh64(cmd->_1); FALLTHROUGH;
+ case CMD_GET_CHANNELS: break;
+ case CMD_LAST: UNREACHABLE; /* because it would have terminated on case PROG_NONE */
+ }
+ fd->events &= ~POLLIN; /* remove */
+ fd->events |= POLLOUT; /* add */
+ state->progress += 1 + (state->type == CMD_GET_CHANNELS);
+ /* same as: state->progress = state->type == CMD_GET_CHANNELS ? PROG_ARRAY : PROG_COMD; */
+
+ if (state->progress == PROG_COMD)
+ /* The actual command processing is done in this call */
+ state->response = hton64(process_cmd(state->type, &state->cmd, &state->err));
+ else if (state->progress == PROG_ARRY)
+ state->response_array = get_channel_array();
+
+ if (state->err != ERR_SUCCESS) {
+ state->err = htonl(state->err);
+ state->progress = PROG_ERRR;
+ return;
+ }
+
+ return;
+ case PROG_COMD:
+ if (!try_write(&fd->fd, (u8*)&state->response, &state->bytes_ctr, sizeof (u64)))
+ return; /* continue writing on next iteration */
+ goto socket_done;
+ case PROG_ARRY:
+ if (!try_write(
+ &fd->fd, (u8*)state->response_array, &state->bytes_ctr,
+ array_size(state->response_array) * sizeof (u64)
+ )) return; /* continue writing on next iteration */
+ /* array is freed in calling function */
+ goto socket_done;
+ case PROG_ERRR:
+ if (!try_write(&fd->fd, (u8*)&state->err, &state->bytes_ctr, sizeof (enum commd_error)))
+ return; /* continue writing on next iteration */
+ goto socket_done;
+ }
+
+pollerr:
+ LWARNF("Poll error on socket %i; closing connection (revents = %i)", fd->fd, fd->revents);
+socket_done:
+ shutdown(fd->fd, SHUT_RDWR);
+ close(fd->fd);
+ fd->fd = ~fd->fd;
+ return;
+
+}
+
+void new_tcp_loop(void) {
+ LINFO("Starting TCP loop");
+
+ init_statics();
+ init_admin(ADMIN_UID);
+ int master = setup_socket(TCP_PORT);
+ struct pollfd *sockets = array_new(struct pollfd, 1);
+ struct connection_state *states = array_new(struct connection_state, 1);
+ sockets[0].fd = master;
+ sockets[0].events = POLLIN;
+
+ int numready;
+ for(;;) {
+ numready = poll(sockets, array_size(sockets), POLL_RESTART);
+ if (should_exit == true) break;
+ if (numready < 0) switch (errno) {
+ case 0: break;
+ case EINTR: LINFO("Poll call was interrupted"); continue;
+ case ENOMEM: sleep(1); continue;
+ case EFAULT:
+ case EINVAL: LFAILV("poll");
+ }
+#if POLL_RESTART > 0
+ if (numready == 0) continue;
+#endif
+ /* assert(array_size(sockets) == array_size(states)); */
+ LDEBUGF("poll: %i sockets ready", numready);
+ /* master socket */
+ if (sockets[0].revents != 0) {
+ if (sockets[0].revents & POLLIN)
+ accept_new(&sockets, &states);
+ if (sockets[0].revents & (POLLNVAL | POLLHUP | POLLERR))
+ reopen_master(&sockets[0].fd);
+ }
+ LDEBUG("Array:");
+ for (size_t i = 0; i < array_size(sockets); ++i) {
+ LDEBUGF("%i", sockets[i].fd);
+ }
+ /* other sockets */
+ /* Please tell me I am not the only person who fucking hates reverse loops with
+ * array deletinos... */
+ for (size_t i = array_size(sockets) - 1; i > 0; --i) {
+ if (sockets[i].revents != 0) {
+ process_socket(sockets + i, states + i);
+ if (sockets[i].fd < 0) {
+ array_pop_at(sockets, i);
+ array_pop_at(states, i);
+ }
+ }
+ }
+ }
+ for (size_t i = 0; i < array_size(sockets); ++i) {
+ shutdown(sockets[i].fd, SHUT_RDWR);
+ close(sockets[i].fd);
+
+ }
+ array_free(sockets);
+ array_free(states);
+ hset_free(&users);
+ hset_free(&channels);
+}
+/* vim: set ts=8 noet: */
diff --git a/server/tcp.h b/server/tcp.h
index e239e19..a393922 100644
--- a/server/tcp.h
+++ b/server/tcp.h
@@ -17,19 +17,60 @@
#define TCP_MAX_WAIT_MS 10
#define TCP_MAX_RETRIES 0
#define ADMIN_UID 0
+#define POLL_RESTART 5000
+struct channel_pool {
+ u64 id;
+ u64 owner;
+ int pipefd;
+ char *name;
+ u16 port;
+};
+
+/* Size: 4 + 4 + 8 + 8 + 32 + 4 + 8 = 68 bytes */
+/* I can't fucking count it's 64 bytes */
+struct connection_state {
+ // TODO: consider using clang extension to pack this enum into u8
+ enum {
+ PROG_NONE = 0, // the socket has just been opened
+ // in loop: read type and continue
+ PROG_TYPE, // command type has been read
+ // in loop: start/continue reading command data
+ // when finished reading, set (& ~POLLIN) | POLLOUT
+ PROG_COMD, // command data has been read
+ // in loop: when revents & POLLOUT, write response
+ // or error, shutdown socket
+ PROG_ARRY, // extra data needs to be written
+ // (like in get_channels command)
+ PROG_ERRR, // an error happened and it should be reported,
+ // after that socket should be shut down and closed
+ } progress;
+ enum commd_type type;
+ u64 bytes_ctr;
+ u8 cmd_size;
+ struct commd cmd;
+ enum commd_error err;
+ union {
+ u64* response_array;
+ u64 response;
+ };
+};
/* val: struct tcp_channel */
extern struct hash_set channels;
/* val: struct tcp_user */
extern struct hash_set users;
+/* val: struct channel_pool*/
+extern struct hash_set pools;
void print_state(int);
void exit_tcp(int);
void tcp_loop(void);
+void new_tcp_loop(void);
u64 spawn_channel(struct thread_loop_arg *arg);
u64 spawn_channel_pool(void* arg); // TODO
bool sendto_channel(size_t chid, struct kv_system_packet* packet, int wait_ack_ms, int repeat);
#endif // KV_SERVER_TCP
+/* vim: set ts=8 noet: */
diff --git a/test/general.c b/test/general.c
index d7673a3..95109a3 100644
--- a/test/general.c
+++ b/test/general.c
@@ -6,6 +6,7 @@
#include <errno.h>
#include <packet.h>
+#include <log.h>
#define W printf("LINE %i\n", __LINE__)
@@ -14,7 +15,7 @@ enum commd_error cerr;
void setup_globals(void) { commsock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); }
-u64 sendtoserv(int sock, enum commd_type t, struct commd *cmd, enum commd_error * NULLABLE e) {
+u64 sendtoserv(int sock, enum commd_type t, struct commd *cmd, enum commd_error * NULLABLE e) {
sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
struct sockaddr_in serv = {
.sin_family = AF_INET, .sin_port = htons(TCP_PORT), .sin_addr = {htonl(INADDR_LOOPBACK)}};
@@ -37,43 +38,50 @@ u64 sendtoserv(int sock, enum commd_type t, struct commd *cmd, enum commd_error
u64 register_user(i32 perm) {
struct commd_register r = {.auid = hton64(0), .perm = hton64(perm)};
- if (cerr != ERR_SUCCESS) DEBUGF("Server returned error %i: %s. errno: %s\n", cerr, kv_strerror(cerr), strerror(errno));
return sendtoserv(commsock, CMD_REGISTER, (struct commd *)&r, &cerr);
+ if (cerr != ERR_SUCCESS) LDEBUGF("Server returned error %i: %s. errno: %s\n", cerr, kv_strerror(cerr), strerror(errno));
}
void unreg_user(u64 u) {
struct commd_unregister unreg = {.auid = hton64(u), .uid = hton64(u)};
u64 resp = sendtoserv(commsock, CMD_UNREGISTER, (struct commd *)&unreg, &cerr);
- if (cerr != ERR_SUCCESS) DEBUGF("Server returned error %i: %s. errno: %s\n", cerr, kv_strerror(cerr), strerror(errno));
+ if (cerr != ERR_SUCCESS) LDEBUGF("Server returned error %i: %s. errno: %s\n", cerr, kv_strerror(cerr), strerror(errno));
printf("sendtoserv returned %zu\n", resp);
}
u64 create_channel(u64 u) {
struct commd_create c = {.uid = hton64(u)};
u64 resp = sendtoserv(commsock, CMD_CREATE, (struct commd *)&c, &cerr);
- if (cerr != ERR_SUCCESS) DEBUGF("Server returned error %i: %s\n", cerr, kv_strerror(cerr));
+ if (cerr != ERR_SUCCESS) LDEBUGF("Server returned error %i: %s\n", cerr, kv_strerror(cerr));
return resp;
}
void* thr(void* a) {
(void)a;
- u64 uid = register_user(perm_none);
+ u64 uid = register_user(PERM_NONE);
printf("uid: %zu\n", uid);
unreg_user(uid);
- uid = register_user(perm_admin);
+ uid = register_user(PERM_ADMIN);
printf("uid: %zu\n", uid);
u64 chid = create_channel(uid);
printf("chid: %zu\n", chid);
return NULL;
}
+#ifndef n
+# define n 3
+#endif
+
int main(int argc, char *argv[]) {
(void)argc;
(void)argv;
- for (int i = 0; i < 1; ++i) {
- pthread_t t;
- pthread_create(&t, NULL, thr, NULL);
+ pthread_t ts[n];
+ for (int i = 0; i < n; ++i) {
+ pthread_create(ts + i, NULL, thr, NULL);
}
+ for (int i = 0; i < n; ++i)
+ pthread_join(ts[i], NULL);
sleep(1);
return 0;
}
+/* vim: set ts=8 noet: */