[MOD/WIP] Threading, and bot "API".

Sloppy code!
This commit is contained in:
LDA 2024-07-03 22:28:31 +02:00
commit b81e267141
8 changed files with 259 additions and 124 deletions

View file

@ -5,6 +5,7 @@
#include <Cytoplasm/Memory.h>
#include <Cytoplasm/Base64.h>
#include <Cytoplasm/Util.h>
#include <Cytoplasm/Log.h>
#include <Cytoplasm/Str.h>
#include <Cytoplasm/Sha.h>
@ -269,11 +270,30 @@ ParseeVerifyAllStanza(ParseeData *args, XMLElement *stanza)
return ret;
}
struct XMPPThread;
typedef struct XMPPThreadInfo {
/* A FIFO of stanzas */
Array *stanzas;
pthread_mutex_t lock;
ParseeData *args;
XMPPComponent *jabber;
struct XMPPThread *dispatchers;
size_t available_dispatchers;
bool running;
pthread_mutex_t chk_lock;
} XMPPThreadInfo;
typedef struct XMPPThread {
pthread_t thr;
XMPPThreadInfo *info;
} XMPPThread;
/* TODO: Clean up all of this. We are currently separating DMs from MUCs,
* where we could unify all our code, and generalise everything. */
static bool
MessageStanza(ParseeData *args, XMLElement *stanza)
MessageStanza(ParseeData *args, XMLElement *stanza, XMPPThread *thr)
{
XMPPComponent *jabber = args->jabber;
@ -282,9 +302,14 @@ MessageStanza(ParseeData *args, XMLElement *stanza)
XMLElement *data = NULL;
char *to, *room, *from, *from_matrix, *decode_from;
char *chat_id, *mroom_id;
char *chat_id = NULL, *mroom_id = NULL;
size_t i;
to = NULL;
from = NULL;
decode_from = NULL;
from_matrix = NULL;
from = HashMapGet(stanza->attrs, "from");
#define CHAT_STATES "http://jabber.org/protocol/chatstates"
@ -292,29 +317,31 @@ MessageStanza(ParseeData *args, XMLElement *stanza)
{
decode_from = ParseeLookupJID(from);
from_matrix = ParseeEncodeJID(args->config, decode_from, true);
chat_id = ParseeGetFromMUCID(args, from);
mroom_id = ParseeGetRoomID(args, chat_id);
mroom_id = ParseeGetBridgedRoom(args, stanza);
ASType(args->config, from_matrix, mroom_id, true);
Free(decode_from);
Free(from_matrix);
Free(mroom_id);
Free(chat_id);
mroom_id = NULL;
decode_from = NULL;
from_matrix = NULL;
}
else if (XMLookForTKV(stanza, "active", "xmlns", CHAT_STATES))
{
decode_from = ParseeLookupJID(from);
from_matrix = ParseeEncodeJID(args->config, decode_from, true);
chat_id = ParseeGetFromMUCID(args, from);
mroom_id = ParseeGetRoomID(args, chat_id);
mroom_id = ParseeGetBridgedRoom(args, stanza);
ASType(args->config, from_matrix, mroom_id, false);
Free(decode_from);
Free(from_matrix);
Free(mroom_id);
Free(chat_id);
mroom_id = NULL;
decode_from = NULL;
from_matrix = NULL;
}
#undef CHAT_STATES
body = XMLookForUnique(stanza, "body");
@ -335,6 +362,19 @@ MessageStanza(ParseeData *args, XMLElement *stanza)
room = ParseeFindDMRoom(args, to, from);
data = ArrayGet(body->children, 0);
/* TODO: THIS IS A HACK. THIS CODE IGNORES ALL MUC MESSAGES EVER
* SENT TO NON-PARSEE PUPPETS, AS A "FIX" TO THE MULTITHREADED
* ISSUE.
*
* I HATE THIS. I NEED TO FIND A BETTER WAY. */
if (!room)
{
if (strncmp(HashMapGet(stanza->attrs, "to"), "parsee@", 7))
{
goto end;
}
}
mroom_id = ParseeGetBridgedRoom(args, stanza);
if (mroom_id && !XMPPIsParseeStanza(stanza))
{
@ -355,10 +395,12 @@ MessageStanza(ParseeData *args, XMLElement *stanza)
chat = true;
}
pthread_mutex_lock(&thr->info->chk_lock);
if (ParseeVerifyAllStanza(args, stanza) && !replaced)
{
XMLElement *oob, *oob_data;
pthread_mutex_unlock(&thr->info->chk_lock);
ASRegisterUser(args->config, encoded);
if (!chat)
{
@ -430,8 +472,10 @@ MessageStanza(ParseeData *args, XMLElement *stanza)
"m.room.message", ev
);
}
pthread_mutex_lock(&thr->info->chk_lock);
ParseePushAllStanza(args, stanza, event_id);
Free(event_id);
pthread_mutex_unlock(&thr->info->chk_lock);
}
else if (replaced)
{
@ -441,10 +485,14 @@ MessageStanza(ParseeData *args, XMLElement *stanza)
"m.room.message", MatrixCreateReplace(event_id, data->data)
));
ParseePushAllStanza(args, stanza, event_id);
pthread_mutex_unlock(&thr->info->chk_lock);
Free(event_id);
}
else
{
pthread_mutex_unlock(&thr->info->chk_lock);
}
Free(res);
Free(encoded);
}
@ -458,7 +506,9 @@ MessageStanza(ParseeData *args, XMLElement *stanza)
ParseePushAllStanza(args, stanza, e_d->data);
}
end:
Free(mroom_id);
mroom_id = NULL;
Free(from_matrix);
Free(decode_from);
Free(room);
@ -647,20 +697,38 @@ PresenceStanza(ParseeData *args, XMLElement *stanza)
#undef MUC_USER_NS
}
void *
ParseeXMPPThread(void *argp)
static XMLElement *
RetrieveStanza(XMPPThread *thread)
{
ParseeData *args = argp;
XMPPComponent *jabber = args->jabber;
XMLElement *stanza = NULL;
while (true)
{
char *to, *room, *from, *from_matrix;
char *chat_id, *mroom_id;
XMLElement *ret = NULL;
stanza = XMLDecode(jabber->stream, false);
pthread_mutex_lock(&thread->info->lock);
ret = ArrayDelete(thread->info->stanzas, 0);
pthread_mutex_unlock(&thread->info->lock);
return ret;
}
static void
PushStanza(XMPPThreadInfo *info, XMLElement *stanza)
{
pthread_mutex_lock(&info->lock);
ArrayAdd(info->stanzas, stanza);
pthread_mutex_unlock(&info->lock);
}
static void *
XMPPDispatcher(void *argp)
{
XMPPThread *thread = argp;
ParseeData *args = thread->info->args;
XMPPComponent *jabber = thread->info->jabber;
while (thread->info->running)
{
XMLElement *stanza = RetrieveStanza(thread);
if (!stanza)
{
UtilSleepMillis(10);
continue;
}
@ -674,28 +742,7 @@ ParseeXMPPThread(void *argp)
else if (StrEquals(stanza->name, "message"))
{
size_t i;
if (XMPPIsKiller(stanza))
{
const char *killer = "killer";
const char *suspend="suspend";
from = HashMapGet(stanza->attrs, "from");
Log(LOG_INFO, "Killer.");
if (!strncmp(from, killer, strlen(killer)))
{
XMLFreeElement(stanza);
break;
}
else if (!strncmp(from, suspend, strlen(suspend)))
{
XMLFreeElement(stanza);
pthread_mutex_lock(&cond_var_lock);
pthread_cond_wait(&cond_var, &cond_var_lock);
pthread_mutex_unlock(&cond_var_lock);
continue;
}
}
if (!MessageStanza(args, stanza))
if (!MessageStanza(args, stanza, thread))
{
continue;
}
@ -713,6 +760,92 @@ ParseeXMPPThread(void *argp)
}
XMLFreeElement(stanza);
}
}
void *
ParseeXMPPThread(void *argp)
{
ParseeData *args = argp;
XMPPComponent *jabber = args->jabber;
XMLElement *stanza = NULL;
XMPPThreadInfo info;
pthread_mutex_t stanzas_lock = PTHREAD_MUTEX_INITIALIZER;
size_t i;
/* Initialise the FIFO */
info.stanzas = ArrayCreate();
pthread_mutex_init(&info.lock, NULL);
/* TODO: Make that configurable. */
info.available_dispatchers = 8;
info.dispatchers = Malloc(
sizeof(*info.dispatchers) * info.available_dispatchers
);
info.args = args;
info.jabber = jabber;
info.running = true;
pthread_mutex_init(&info.chk_lock, NULL);
for (i = 0; i < info.available_dispatchers; i++)
{
pthread_t *thr = &info.dispatchers[i].thr;
info.dispatchers[i].info = &info;
pthread_create(thr, NULL, XMPPDispatcher, &info.dispatchers[i]);
}
while (true)
{
char *to, *room, *from, *from_matrix;
char *chat_id, *mroom_id;
ssize_t cntr;
stanza = XMLDecode(jabber->stream, false);
if (!stanza)
{
continue;
}
if (StrEquals(stanza->name, "message") && XMPPIsKiller(stanza))
{
const char *killer = "killer";
const char *suspend="suspend";
from = HashMapGet(stanza->attrs, "from");
if (!strncmp(from, killer, strlen(killer)))
{
XMLFreeElement(stanza);
break;
}
else if (!strncmp(from, suspend, strlen(suspend)))
{
/* TODO */
XMLFreeElement(stanza);
pthread_mutex_lock(&cond_var_lock);
pthread_cond_wait(&cond_var, &cond_var_lock);
pthread_mutex_unlock(&cond_var_lock);
continue;
}
}
PushStanza(&info, stanza);
//XMLFreeElement(stanza);
}
info.running = false;
for (i = 0; i < info.available_dispatchers; i++)
{
pthread_t thr = info.dispatchers[i].thr;
pthread_join(thr, NULL);
}
Free(info.dispatchers);
for (i = 0; i < ArraySize(info.stanzas); i++)
{
XMLFreeElement(ArrayGet(info.stanzas, i));
}
ArrayFree(info.stanzas);
pthread_mutex_destroy(&info.lock);
return NULL;
}