aboutsummaryrefslogtreecommitdiffstats
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/CMakeLists.txt21
-rw-r--r--server/channel.c41
-rw-r--r--server/channel.h4
-rw-r--r--server/main.c17
-rw-r--r--server/tcp.c335
-rw-r--r--server/tcp.h41
6 files changed, 360 insertions, 99 deletions
diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt
deleted file mode 100644
index f63f897..0000000
--- a/server/CMakeLists.txt
+++ /dev/null
@@ -1,21 +0,0 @@
-# CMake files will be removed in the next several commits
-
-cmake_minimum_required(VERSION 3.20)
-
-project(kv_server
- VERSION 0.0
- DESCRIPTION "A server for kv project."
- LANGUAGES C)
-
-set(SOURCE_FILES main.c channel.c tcp.c ../include/packet.c)
-set(HEADER_FILES channel.h tcp.h)
-
-add_executable(kv_server ${SOURCE_FILES})
-target_compile_definitions(kv_server PUBLIC DEBUG)
-target_link_libraries(kv_server
- PUBLIC crypto
- PUBLIC pthread
-)
-
-
-
diff --git a/server/channel.c b/server/channel.c
index a0e2779..63ef11a 100644
--- a/server/channel.c
+++ b/server/channel.c
@@ -2,10 +2,11 @@
#include <packet.h>
#include <container.h>
+#include <log.h>
#include <errno.h>
#include <stdlib.h>
-#define decisive_push(array, index, elem) \
+#define maybe_push(array, index, elem) \
do { \
if (index >= array_size(array)) { \
array_push(array, elem); \
@@ -35,6 +36,7 @@ struct ch_user {
static __THREAD struct hash_set users;
static __THREAD int sockfd;
+
static int __user_hset_cmp(const void *a, const void *b) {
struct ch_user *_a = (struct ch_user *)a, *_b = (struct ch_user *)b;
return _a->id - _b->id;
@@ -54,17 +56,17 @@ bool channel_init(void) {
(sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) >= 0 &&
bind(sockfd, (struct sockaddr *)&thread_local_address, sizeof(thread_local_address)) == 0;
if (!chain_result) {
- perror("channel init failed");
+ LERRP("channel init failed");
if (hset_ok(users)) hset_free(&users);
if (sockfd >= 0) close(sockfd);
return false;
}
- DEBUGF("Channel [%i] created\n", sockfd);
+ LDEBUGF("Channel [%i] created", sockfd);
return true;
}
static void channel_uninit(void) {
hset_free(&users);
- if (close(sockfd) == -1) perror("could not gracefully uninitialize channel");
+ if (close(sockfd) == -1) LERRP("couldPnot gracefully uninitialize channel");
}
_Noreturn static inline void forced_cleanup(struct kv_packet **packets) {
clear_packet_array(packets);
@@ -118,7 +120,7 @@ static void send_packets_back(struct kv_packet **packets) {
.sin_port = htons(current_user->port),
.sin_addr = {htonl(current_user->ip)}};
for (size_t j = 0; packets[j] != NULL && j < array_size(packets); ++j) {
- DEBUGF("sending packet with id %zu to destination: %u.%u.%u.%u:%hu\n", packets[j]->uid,
+ LDEBUGF("sending packet with id %zu to destination: %u.%u.%u.%u:%hu", packets[j]->uid,
(destination.sin_addr.s_addr >> 24) & 0xFF, (destination.sin_addr.s_addr >> 16) & 0xFF,
(destination.sin_addr.s_addr >> 8) & 0xFF, destination.sin_addr.s_addr & 0xFF,
destination.sin_port);
@@ -126,14 +128,21 @@ static void send_packets_back(struct kv_packet **packets) {
int error_code = sendto(
sockfd, packets[j], KV_PACKET_SIZE, 0, (struct sockaddr *)&destination,
sizeof(destination));
- if (error_code) perror("could not send packets back");
+ if (error_code) LWARNP("could not send packets back");
}
}
}
/*
* An example of how you should NOT write code
- * Todo: please rewrite this shit
+ * Todo: please rewrite this shi
+ * Also, this started to busy-loop...
+ * As i've determined, this requires a complete re-write, so this is what i'm gonna be doing now
+ *
+ * Actually, am I reading this correct? Did I write this at the time when I thought
+ * UDP could magically broadcast my packets? Why it is this complex dude...
+ *
+ * I am abandoning this place in code and am implementing channel_pool
*/
void *thread_loop(void *arg) {
if (!channel_init()) pthread_exit(NULL);
@@ -143,6 +152,7 @@ void *thread_loop(void *arg) {
size_t recvd_index = 0;
int recv_flag = MSG_DONTWAIT;
while (true) {
+ LDEBUGV("I am looping");
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
ssize_t recvlength = recvfrom(
@@ -158,16 +168,21 @@ void *thread_loop(void *arg) {
struct kv_packet *kv_copy = malloc(KV_PACKET_SIZE);
memcpy(kv_copy, &work_buffer, KV_PACKET_SIZE);
++recvd_index;
- decisive_push(recvd_data, recvd_index, kv_copy);
+ maybe_push(recvd_data, recvd_index, kv_copy);
} else if (errno == EWOULDBLOCK) {
- if (array_size(recvd_data) == 0) {
- recv_flag &= ~MSG_DONTWAIT;
- continue;
- }
+ recv_flag &= ~MSG_DONTWAIT;
+ if (recvd_index == 0) continue;
send_packets_back(recvd_data);
clear_packet_array(recvd_data);
+ recvd_index = 0;
} else {
- perror("error in thread_loop");
+ LWARNP("error in thread_loop");
}
}
}
+
+void* new_thread_loop(void* arg) {
+ return arg;
+}
+
+/* vim: set ts=8 noet: */
diff --git a/server/channel.h b/server/channel.h
index 8068a4c..549ff21 100644
--- a/server/channel.h
+++ b/server/channel.h
@@ -18,10 +18,12 @@ struct thread_loop_arg {
pthread_mutex_t *sock_mx;
pthread_cond_t *sock_ready_cond;
u64 owner;
- const unsigned char *pubkey;
+ int pipefd;
};
// main function that manages every channel
void *thread_loop(void *);
+void *new_thread_loop(void *);
#endif // KV_SERVER_CHANNEL_H
+/* vim: set ts=8 noet: */
diff --git a/server/main.c b/server/main.c
index b6745e0..0d8d9f3 100644
--- a/server/main.c
+++ b/server/main.c
@@ -3,6 +3,7 @@
#define HSET_MAX_BUCKET_SIZE 4
#include <container.h>
#undef CONTAINER_IMPLEMENTATION
+#include <log.h>
#include "tcp.h"
@@ -11,22 +12,18 @@
#define MAIN_PORT 8164
void setup_signal(void) {
- struct sigaction signal = {.sa_handler = print_state, .sa_mask = {{0}}, .sa_flags = 0};
- if (sigaction(SIGUSR1, &signal, NULL) != 0) {
- WHERE;
- exit(EXIT_FAILURE);
- }
+ struct sigaction signal = {.sa_handler = print_state, .sa_mask = {{0}}, .sa_flags = SA_RESTART};
+ if (sigaction(SIGUSR1, &signal, NULL) != 0) LFAILV("sigaction");
signal.sa_handler = exit_tcp;
- if (sigaction(SIGTERM, &signal, NULL) != 0) {
- WHERE;
- exit(EXIT_FAILURE);
- }
+ if (sigaction(SIGTERM, &signal, NULL) != 0) LFAILV("sigaction");
}
int main(int argc, char *argv[]) {
(void)argc;
(void)argv;
setup_signal();
- tcp_loop();
+ new_tcp_loop();
return 0;
}
+
+/* vim: set ts=8 noet: */
diff --git a/server/tcp.c b/server/tcp.c
index 0d6e118..09cda70 100644
--- a/server/tcp.c
+++ b/server/tcp.c
@@ -1,11 +1,15 @@
#include "tcp.h"
+#include <assert.h>
+#include <log.h>
#include <errno.h>
#include <netinet/in.h>
#include <stddef.h>
+#include <sys/poll.h>
+#include <sys/fcntl.h>
#define report_error(socket, error) \
do { \
- DEBUGF("error on socket %i: %i\n", socket, error); \
+ LDEBUGF("error on socket %i: %i", socket, error); \
write(socket, &zerozu, sizeof(zerozu)); \
enum commd_error __ecpy = htonl(error); \
write(socket, &__ecpy, sizeof(__ecpy)); \
@@ -18,26 +22,36 @@
return 0; \
} while (0)
+#define process_error(pfd_p, state_p, error) \
+ do { \
+ pfd_p->events &= ~POLLIN; \
+ pfd_p->events |= POLLOUT; \
+ state_p->err = htonl(error); \
+ state_p->progress = PROG_ERRR; \
+ return; \
+ } while (0)
+
+struct hash_set channel_pools;
struct hash_set channels;
struct hash_set users;
static struct tcp_user varusr = {0};
static struct tcp_channel varchnl = {0};
static u64 lalloc_usr = 0;
static int udpctlfd = 0;
-static const size_t zerozu = 0;
+static const u64 zerozu = 0;
static bool should_exit = false;
void print_state(int _) {
(void)_;
-#ifdef DEBUG
- fputs("printing server state.\n hash_map<struct tcp_user> users {\n", stderr);
+#ifdef DBG
+ fputs("printing server state.\n hash_set<struct tcp_user> users {\n", stderr);
struct hash_set_iter iter;
for (hseti_begin(&users, &iter); !hseti_end(&iter); hseti_next(&iter)) {
struct tcp_user *curr = hseti_get(&iter);
fprintf(stderr, "\ttcp_user {.id=%zu, .permissions=%u, .channel=%zu}\n", curr->id, curr->permissions,
curr->joined_channel);
}
- fputs("}\nhash_map<struct tcp_channel> channels {\n", stderr);
+ fputs("}\nhash_set<struct tcp_channel> channels {\n", stderr);
for (hseti_begin(&channels, &iter); !hseti_end(&iter); hseti_next(&iter)) {
struct tcp_channel *curr = hseti_get(&iter);
struct sockaddr_in addr;
@@ -52,7 +66,7 @@ void print_state(int _) {
void exit_tcp(int _) {
(void)_;
should_exit = true;
- DEBUGF("EXITING SERVER, setting `should_exit (%p)` to %i\n", (void*)&should_exit, (int)should_exit);
+ LINFOF("EXITING SERVER, setting `should_exit (%p)` to %i", (void*)&should_exit, (int)should_exit);
}
static int tcp_user_cmp(const struct tcp_user *a, const struct tcp_user *b) { return (a->id - b->id) ? 1 : 0; }
@@ -75,26 +89,26 @@ static int setup_socket(unsigned short port) {
int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
int flag = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof flag)) goto error;
+ if (fcntl(sock, F_SETFL, O_NONBLOCK)) goto error;
struct sockaddr_in localaddr = {
.sin_family = AF_INET, .sin_port = htons(port), .sin_addr = {htonl(INADDR_ANY)}};
if (bind(sock, (struct sockaddr *)&localaddr, sizeof(localaddr))) goto error;
+ if (listen(sock, LISTEN_AMOUNT)) goto error;
return sock;
error:
- perror("TCP thread failed to initialize");
- exit(EXIT_FAILURE);
-
+ LFAIL("TCP thread failed to initialize");
}
static void init_admin(u64 aid) {
- struct tcp_user u = {.id = aid, .pubkey = NULL, .permissions = perm_admin, .joined_channel = 0};
+ struct tcp_user u = {.id = aid, .pubkey = NULL, .permissions = PERM_ADMIN, .joined_channel = 0};
hset_insert_copy(&users, &u);
}
static bool has_4bytes_0xff(u64 id) {
return (unsigned int)(id >> 32) == 0xFFFFFFFF || (unsigned int)(id & 0xFFFFFFFF) == 0xFFFFFFFF;
}
static u64 get_uniq_id(struct hash_set *set) {
- // while (map has lalloc_usr inex || lalloc_usr has 4 bytes of ones) next lalloc_usr;
+ /* while (map has lalloc_usr inex || lalloc_usr has 4 bytes of ones) next lalloc_usr; */
varusr.id = lalloc_usr;
- while (hset_at(set, &varusr) != NULL || has_4bytes_0xff(lalloc_usr)) ++varusr.id;
+ while (hset_at(set, &varusr) != NULL || has_4bytes_0xff(varusr.id)) ++varusr.id;
lalloc_usr = varusr.id;
return lalloc_usr;
}
@@ -112,10 +126,11 @@ static bool user_has_permission(u64 uid, unsigned int perm) {
struct tcp_user *u = hset_at(&users, &varusr);
if (u == NULL) return false;
unsigned int uperm = u->permissions;
- // bitwise implication must yield all ones (0xFFFFFFFF).
- // Inverse it for easier check
+ /* bitwise implication must yield all ones (0xFFFFFFFF). */
+ /* Invert it for easier check */
return (perm & ~uperm) == 0;
}
+/* TODO: remove */
static u64 send_channels(int sockfd, enum commd_error *e) {
struct hash_set_iter iter;
u64 array_length = hton64(hset_count(&channels));
@@ -125,83 +140,85 @@ static u64 send_channels(int sockfd, enum commd_error *e) {
u64 chid = hton64(c->id);
if (write(sockfd, &chid, sizeof(chid)) != sizeof(chid)) return_error(ERR_SERV);
}
- // the leading zero is written by the caller
- return_error(ERR_SUCCESS); // actually returns success but...
+ /* the leading zero is written by the caller */
+ /* or is it? */
+ return_error(ERR_SUCCESS); /* actually returns success but... */
+}
+static u64* get_channel_array(void) {
+ /* consider later: store allocated buffer with all channels somewhere
+ * but I would need to avoid race conditions (channel added while I was writing) */
+ hset_iter iter;
+ size_t i = 0;
+ u64 len = hset_count(&channels);
+ u64* ret = array_new(u64, len + 2);
+ ret[i++] = hton64(len); /* ret[0]; i=1 */
+ for (hseti_begin(&channels, &iter); !hseti_end(&iter); hseti_next(&iter)) {
+ struct tcp_channel *c = hseti_get(&iter);
+ ret[i++] = c->id;
+ }
+ ret[i] = 0LU;
+ return ret;
}
static inline u64 commd_register_process(struct commd_register *cmd, enum commd_error *e) {
/* fprintf(stderr, "%s: auid=%zu; perm=%zu\n", "commd_register_process", cmd->auid, cmd->perm); */
- if (!user_has_permission(cmd->auid, perm_join_user | cmd->perm)) return_error(ERR_ACCESS);
+ if (!user_has_permission(cmd->auid, PERM_JOIN_USER | cmd->perm)) return_error(ERR_ACCESS);
struct tcp_user new_user = {
.id = get_uniq_id(&users), .joined_channel = 0, .permissions = (unsigned int)cmd->perm};
hset_insert_copy(&users, &new_user);
return new_user.id;
}
static inline u64 commd_unregister_process(struct commd_unregister *cmd, enum commd_error *e) {
- /* DEBUGF("delete user %zu (admin %zu)\n", cmd->uid, cmd->auid); */
- if (cmd->auid != cmd->uid && !user_has_permission(cmd->auid, perm_unregister_user)) return_error(ERR_ACCESS);
+ LDEBUGF("delete user %zu (admin %zu)", cmd->uid, cmd->auid);
+ if (cmd->auid != cmd->uid && !user_has_permission(cmd->auid, PERM_UNREGISTER_USER)) return_error(ERR_ACCESS);
varusr.id = cmd->uid;
hset_remove(&users, &varusr);
return cmd->uid;
}
static inline u64 commd_create_process(struct commd_create *cmd, enum commd_error *e) {
- if (!user_has_permission(cmd->uid, perm_add_channel)) return_error(ERR_ACCESS);
+ if (!user_has_permission(cmd->uid, PERM_ADD_CHANNEL)) return_error(ERR_ACCESS);
u64 chid;
int sock = -1;
- unsigned short port = 0;
{
pthread_mutex_t sock_mx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t sock_cond = PTHREAD_COND_INITIALIZER;
+
struct thread_loop_arg arg = {
.owner = cmd->uid, .sock_dest = &sock, .sock_mx = &sock_mx, .sock_ready_cond = &sock_cond};
pthread_mutex_lock(&sock_mx);
chid = spawn_channel(&arg);
- DEBUG("\n");
- while (sock == -1 || port == 0) pthread_cond_wait(&sock_cond, &sock_mx);
- DEBUG("\n");
+ while (sock == -1) pthread_cond_wait(&sock_cond, &sock_mx);
pthread_mutex_unlock(&sock_mx);
}
- struct tcp_channel new_channel = {.owner = cmd->uid, .name = NULL, .fd = sock};
+ struct tcp_channel new_channel = {.id = chid, .owner = cmd->uid, .name = NULL, .fd = sock};
hset_insert_copy(&channels, &new_channel);
return chid;
}
static inline u64 commd_delete_process(struct commd_delete *cmd, enum commd_error *e) {
- DEBUGF("received command%p\n", (void *)cmd);
+ LDEBUGF("received command%p", (void *)cmd);
varchnl.id = cmd->chid;
struct tcp_channel *c = hset_at(&channels, &varchnl);
if (c == NULL) return_error(ERR_PARAM);
- if (cmd->uid != c->owner && !user_has_permission(cmd->uid, perm_unadd_channel)) return_error(ERR_ACCESS);
+ if (cmd->uid != c->owner && !user_has_permission(cmd->uid, PERM_UNADD_CHANNEL)) return_error(ERR_ACCESS);
hset_remove(&channels, &varchnl);
return varchnl.id;
}
static inline u64 commd_join_process(struct commd_join *cmd, enum commd_error *e) {
- if (cmd->uid != cmd->juid && !user_has_permission(cmd->uid, perm_join_user)) return_error(ERR_ACCESS);
+ if (cmd->uid != cmd->juid && !user_has_permission(cmd->uid, PERM_JOIN_USER)) return_error(ERR_ACCESS);
struct kv_system_packet packet = {
.magic_bytes = SYS_PACKET_MAGIC_BYTES, .operation_id = htonl(SYS_JOIN), .user_id = cmd->juid};
if (!sendto_channel(cmd->chid, &packet, TCP_MAX_WAIT_MS, TCP_MAX_RETRIES)) return_error(ERR_SERV);
return (u64)get_channel_port(cmd->chid);
}
static inline u64 commd_leave_process(struct commd_leave *cmd, enum commd_error *e) {
- if (cmd->uid != cmd->luid && !user_has_permission(cmd->uid, perm_kick_user)) return_error(ERR_ACCESS);
+ if (cmd->uid != cmd->luid && !user_has_permission(cmd->uid, PERM_KICK_USER)) return_error(ERR_ACCESS);
struct kv_system_packet packet = {
.magic_bytes = SYS_PACKET_MAGIC_BYTES, .operation_id = htonl(SYS_LEAVE), .user_id = cmd->luid};
if (!sendto_channel(cmd->chid, &packet, TCP_MAX_WAIT_MS, TCP_MAX_RETRIES)) return_error(ERR_SERV);
return 1;
}
-/// switches on command type and operates accordingly
+/* switches on command type and operates accordingly */
static u64 process_cmd(enum commd_type type, struct commd *cmd, enum commd_error *NONNULL e) {
- // network byte order conversion
- switch (type) {
- case CMD_LEAVE:
- case CMD_JOIN: ((struct commd_conv *)cmd)->_3 = ntoh64(((struct commd_conv *)cmd)->_3); FALLTHROUGH;
- case CMD_UNREGISTER:
- case CMD_REGISTER:
- case CMD_DELETE: ((struct commd_conv *)cmd)->_2 = ntoh64(((struct commd_conv *)cmd)->_2); FALLTHROUGH;
- case CMD_CREATE:
- case CMD_GET_PORT: ((struct commd_conv *)cmd)->_1 = ntoh64(((struct commd_conv *)cmd)->_1); FALLTHROUGH;
- case CMD_GET_CHANNELS:;
- }
- // processing
switch (type) {
case CMD_REGISTER: return commd_register_process((struct commd_register *)cmd, e);
case CMD_UNREGISTER: return commd_unregister_process((struct commd_unregister *)cmd, e);
@@ -210,18 +227,22 @@ static u64 process_cmd(enum commd_type type, struct commd *cmd, enum commd_error
case CMD_JOIN: return commd_join_process((struct commd_join *)cmd, e);
case CMD_LEAVE: return commd_leave_process((struct commd_leave *)cmd, e);
case CMD_GET_PORT: return (u64)get_channel_port(((struct commd_get_port *)cmd)->cihd);
- case CMD_GET_CHANNELS: return_error(ERR_DO_IT_YOURSELF);
+ case CMD_GET_CHANNELS: UNREACHABLE; /* this should be unreachable */
+ case CMD_LAST: UNREACHABLE; /* this as well */
+ default: return_error(ERR_TYPE);
}
- return_error(ERR_PARAM);
}
+
+/* коммент */
static void process_connection(int sockfd) {
- DEBUG("PROCESSING CONNECTION\n");
- // TODO: protection against blocking reads
+ LDEBUG("Processing connection");
+ /* TODO: protection against blocking reads */
+ /* this will become irrelevant after implementing poll */
enum commd_type type;
if (read(sockfd, &type, sizeof(type)) != sizeof(type)) report_error(sockfd, ERR_INVAL);
type = ntohl(type);
struct commd cmd;
- memset(&cmd, 0, sizeof(cmd)); // TODO: consider to remove
+ memset(&cmd, 0, sizeof(cmd)); /* TODO: consider to remove */
i64 commd_size = commd_size_lookup[type];
if (read(sockfd, &cmd, commd_size) != commd_size) report_error(sockfd, ERR_INVAL);
enum commd_error e = ERR_SUCCESS;
@@ -262,19 +283,16 @@ void tcp_loop(void) {
init_statics();
init_admin(ADMIN_UID);
int sock = setup_socket(TCP_PORT);
- if (listen(sock, LISTEN_AMOUNT) != 0) {
- perror("listen on TCP socket failed");
- exit(EXIT_FAILURE);
- }
- DEBUGF("listening on port %hu\n", TCP_PORT);
+ if (listen(sock, LISTEN_AMOUNT) != 0) LFAIL("listen on TCP socket failed");
+ LDEBUGF("listening on port %hu", TCP_PORT);
struct sockaddr_in accept_addr;
socklen_t addrlen = sizeof(accept_addr);
int currfd;
while (!should_exit) {
currfd = accept(sock, (struct sockaddr *)&accept_addr, &addrlen);
if (currfd < 0) continue;
- DEBUGF("accepted connection on port %hu\n", accept_addr.sin_port);
- process_connection(currfd);
+ LDEBUGF("accepted connection on port %hu", accept_addr.sin_port);
+ process_connection(currfd); /* this is synchronous and UNACCEPTABLE! */
shutdown(currfd, SHUT_RDWR);
close(currfd);
}
@@ -283,3 +301,212 @@ void tcp_loop(void) {
hset_free(&users);
hset_free(&channels);
}
+
+/* Accepts address of pointer to array cause pointers might have to be modified */
+static void accept_new(struct pollfd **_sockets, struct connection_state **_states) {
+ struct pollfd * sockets = *_sockets; /* This is a hack */
+ struct connection_state * states = *_states; /* And a dirty one */
+
+ LDEBUG("Accepting new connection");
+ struct pollfd newsock = {accept(sockets[0].fd, NULL, NULL), POLLIN, 0};
+ if (newsock.fd < 0) {
+ LWARNP("Accept");
+ return;
+ }
+ if(fcntl(newsock.fd, F_SETFL, O_NONBLOCK)) {
+ LWARNF("Fcntl on fd %i failed: %s (errno %i)", newsock.fd, strerror(errno), errno);
+ return;
+ }
+ struct connection_state newstate;
+ newstate.progress = PROG_NONE,
+ newstate.bytes_ctr = 0,
+ newstate.response = 0,
+ newstate.cmd_size = 0,
+ newstate.err = ERR_SUCCESS,
+ newstate.type = -1/*,
+ newstate.response_array = NULL*/;
+ array_push(sockets, newsock);
+ array_push(states, newstate);
+ *_sockets = sockets; /* Yea, this is */
+ *_states = states; /* DiSgUsTiNg */
+ LDEBUGF("Accepted new connection: fd %i. Array size: %zu", newsock.fd, array_size(sockets));
+}
+static void reopen_master(int *masterfd) {
+ LWARN("Reopening master socket");
+ shutdown(*masterfd, SHUT_RDWR);
+ close(*masterfd);
+ *masterfd = setup_socket(TCP_PORT);
+}
+static bool try_read(int *fdp, u8* restrict where, u64* bytes_read, u64 how_much) {
+ int fd = *fdp;
+ i64 bytes = read(fd, where + *bytes_read, how_much - *bytes_read);
+ if (bytes <= 0) {
+ if (bytes < 0) LWARNP("Read on client socket");
+ *fdp = ~*fdp; /* Loop closes client socket */
+ return false;
+ }
+ *bytes_read += bytes;
+ if (*bytes_read == how_much) {
+ *bytes_read = 0;
+ return true;
+ }
+ return false; /* false? lick my ballse */
+}
+static bool try_write(int *fdp, const u8* from, u64* bytes_written, u64 how_much) {
+ int fd = *fdp;
+ i64 bytes = write(fd, from + *bytes_written, how_much - *bytes_written);
+ if (bytes <= 0) {
+ if (bytes < 0) {
+ LWARNP("Write on client socket");
+ *fdp = ~*fdp; /* Loop closes client socket */
+ }
+ return false;
+ }
+ *bytes_written += bytes;
+ if (*bytes_written == how_much) {
+ *bytes_written = 0;
+ return true;
+ }
+ return false; /* false? lick my ballse */
+}
+/* Accepts NOT arrays, but individual element pointers
+ * Note: how tf did this functinon turn into an asynchronous state machine...
+ * The calling function MUST free the array if,
+ * upon exit, fd->fd < 0 && state->progress == PROG_ARRY */
+static void process_socket(struct pollfd* fd, struct connection_state* state) {
+ LDEBUGF("FD %i: progress %i, type %i", fd->fd, state->progress, state->type);
+ if (fd->revents & (POLLNVAL | POLLERR | POLLHUP)) goto pollerr;
+
+ switch (state->progress) {
+ case PROG_NONE:
+ if (!try_read(&fd->fd, (u8*)&state->type, &state->bytes_ctr, sizeof (enum commd_type)))
+ return; /* continue reading on next iteration */
+ state->type = ntohl(state->type);
+ if (state->type < 0 || state->type >= CMD_LAST)
+ process_error(fd, state, ERR_TYPE);
+ state->cmd_size = commd_size_lookup[state->type];
+ ++state->progress;
+ return;
+ case PROG_TYPE:
+ if (!try_read(&fd->fd, (u8*)&state->cmd, &state->bytes_ctr, state->cmd_size))
+ return; /* continue reading on next iteration */
+ struct commd_conv *cmd = (struct commd_conv *)&state->cmd;
+ /* Network byte order conversion */
+ switch (state->type) {
+ case CMD_LEAVE:
+ case CMD_JOIN: cmd->_3 = ntoh64(cmd->_3); FALLTHROUGH;
+ case CMD_UNREGISTER:
+ case CMD_REGISTER:
+ case CMD_DELETE: cmd->_2 = ntoh64(cmd->_2); FALLTHROUGH;
+ case CMD_CREATE:
+ case CMD_GET_PORT: cmd->_1 = ntoh64(cmd->_1); FALLTHROUGH;
+ case CMD_GET_CHANNELS: break;
+ case CMD_LAST: UNREACHABLE; /* because it would have terminated on case PROG_NONE */
+ }
+ fd->events &= ~POLLIN; /* remove */
+ fd->events |= POLLOUT; /* add */
+ state->progress += 1 + (state->type == CMD_GET_CHANNELS);
+ /* same as: state->progress = state->type == CMD_GET_CHANNELS ? PROG_ARRAY : PROG_COMD; */
+
+ if (state->progress == PROG_COMD)
+ /* The actual command processing is done in this call */
+ state->response = hton64(process_cmd(state->type, &state->cmd, &state->err));
+ else if (state->progress == PROG_ARRY)
+ state->response_array = get_channel_array();
+
+ if (state->err != ERR_SUCCESS) {
+ state->err = htonl(state->err);
+ state->progress = PROG_ERRR;
+ return;
+ }
+
+ return;
+ case PROG_COMD:
+ if (!try_write(&fd->fd, (u8*)&state->response, &state->bytes_ctr, sizeof (u64)))
+ return; /* continue writing on next iteration */
+ goto socket_done;
+ case PROG_ARRY:
+ if (!try_write(
+ &fd->fd, (u8*)state->response_array, &state->bytes_ctr,
+ array_size(state->response_array) * sizeof (u64)
+ )) return; /* continue writing on next iteration */
+ /* array is freed in calling function */
+ goto socket_done;
+ case PROG_ERRR:
+ if (!try_write(&fd->fd, (u8*)&state->err, &state->bytes_ctr, sizeof (enum commd_error)))
+ return; /* continue writing on next iteration */
+ goto socket_done;
+ }
+
+pollerr:
+ LWARNF("Poll error on socket %i; closing connection (revents = %i)", fd->fd, fd->revents);
+socket_done:
+ shutdown(fd->fd, SHUT_RDWR);
+ close(fd->fd);
+ fd->fd = ~fd->fd;
+ return;
+
+}
+
+void new_tcp_loop(void) {
+ LINFO("Starting TCP loop");
+
+ init_statics();
+ init_admin(ADMIN_UID);
+ int master = setup_socket(TCP_PORT);
+ struct pollfd *sockets = array_new(struct pollfd, 1);
+ struct connection_state *states = array_new(struct connection_state, 1);
+ sockets[0].fd = master;
+ sockets[0].events = POLLIN;
+
+ int numready;
+ for(;;) {
+ numready = poll(sockets, array_size(sockets), POLL_RESTART);
+ if (should_exit == true) break;
+ if (numready < 0) switch (errno) {
+ case 0: break;
+ case EINTR: LINFO("Poll call was interrupted"); continue;
+ case ENOMEM: sleep(1); continue;
+ case EFAULT:
+ case EINVAL: LFAILV("poll");
+ }
+#if POLL_RESTART > 0
+ if (numready == 0) continue;
+#endif
+ /* assert(array_size(sockets) == array_size(states)); */
+ LDEBUGF("poll: %i sockets ready", numready);
+ /* master socket */
+ if (sockets[0].revents != 0) {
+ if (sockets[0].revents & POLLIN)
+ accept_new(&sockets, &states);
+ if (sockets[0].revents & (POLLNVAL | POLLHUP | POLLERR))
+ reopen_master(&sockets[0].fd);
+ }
+ LDEBUG("Array:");
+ for (size_t i = 0; i < array_size(sockets); ++i) {
+ LDEBUGF("%i", sockets[i].fd);
+ }
+ /* other sockets */
+ /* Please tell me I am not the only person who fucking hates reverse loops with
+ * array deletinos... */
+ for (size_t i = array_size(sockets) - 1; i > 0; --i) {
+ if (sockets[i].revents != 0) {
+ process_socket(sockets + i, states + i);
+ if (sockets[i].fd < 0) {
+ array_pop_at(sockets, i);
+ array_pop_at(states, i);
+ }
+ }
+ }
+ }
+ for (size_t i = 0; i < array_size(sockets); ++i) {
+ shutdown(sockets[i].fd, SHUT_RDWR);
+ close(sockets[i].fd);
+
+ }
+ array_free(sockets);
+ array_free(states);
+ hset_free(&users);
+ hset_free(&channels);
+}
+/* vim: set ts=8 noet: */
diff --git a/server/tcp.h b/server/tcp.h
index e239e19..a393922 100644
--- a/server/tcp.h
+++ b/server/tcp.h
@@ -17,19 +17,60 @@
#define TCP_MAX_WAIT_MS 10
#define TCP_MAX_RETRIES 0
#define ADMIN_UID 0
+#define POLL_RESTART 5000
+struct channel_pool {
+ u64 id;
+ u64 owner;
+ int pipefd;
+ char *name;
+ u16 port;
+};
+
+/* Size: 4 + 4 + 8 + 8 + 32 + 4 + 8 = 68 bytes */
+/* I can't fucking count it's 64 bytes */
+struct connection_state {
+ // TODO: consider using clang extension to pack this enum into u8
+ enum {
+ PROG_NONE = 0, // the socket has just been opened
+ // in loop: read type and continue
+ PROG_TYPE, // command type has been read
+ // in loop: start/continue reading command data
+ // when finished reading, set (& ~POLLIN) | POLLOUT
+ PROG_COMD, // command data has been read
+ // in loop: when revents & POLLOUT, write response
+ // or error, shutdown socket
+ PROG_ARRY, // extra data needs to be written
+ // (like in get_channels command)
+ PROG_ERRR, // an error happened and it should be reported,
+ // after that socket should be shut down and closed
+ } progress;
+ enum commd_type type;
+ u64 bytes_ctr;
+ u8 cmd_size;
+ struct commd cmd;
+ enum commd_error err;
+ union {
+ u64* response_array;
+ u64 response;
+ };
+};
/* val: struct tcp_channel */
extern struct hash_set channels;
/* val: struct tcp_user */
extern struct hash_set users;
+/* val: struct channel_pool*/
+extern struct hash_set pools;
void print_state(int);
void exit_tcp(int);
void tcp_loop(void);
+void new_tcp_loop(void);
u64 spawn_channel(struct thread_loop_arg *arg);
u64 spawn_channel_pool(void* arg); // TODO
bool sendto_channel(size_t chid, struct kv_system_packet* packet, int wait_ack_ms, int repeat);
#endif // KV_SERVER_TCP
+/* vim: set ts=8 noet: */