[ADD/WIP] Congestion, basic ad-hoc commands

Woah, took me a while, eh? Next up, getting forms!
This commit is contained in:
LDA 2024-07-13 16:26:33 +02:00
commit 408888ef67
9 changed files with 911 additions and 14 deletions

View file

@ -12,6 +12,7 @@
#include <Cytoplasm/Sha.h>
#include <StringStream.h>
#include <XMPPCommand.h>
#include <Matrix.h>
#include <XMPP.h>
#include <XML.h>
@ -19,6 +20,7 @@
#define IQ_ADVERT \
AdvertiseSimple("http://jabber.org/protocol/chatstates") \
AdvertiseSimple("http://jabber.org/protocol/commands") \
AdvertiseSimple("http://jabber.org/protocol/caps") \
AdvertiseSimple("urn:xmpp:avatar:metadata+notify") \
AdvertiseSimple("urn:xmpp:avatar:data+notify") \
@ -293,6 +295,7 @@ typedef struct XMPPThreadInfo {
ParseeData *args;
XMPPComponent *jabber;
XMPPCommandManager *m;
struct XMPPThread *dispatchers;
size_t available_dispatchers;
@ -864,13 +867,16 @@ TrimBase64(char *b64)
return ret;
}
static void
IQResult(ParseeData *args, XMLElement *stanza)
IQResult(ParseeData *args, XMLElement *stanza, XMPPThread *thr)
{
XMLElement *vcard = XMLookForTKV(stanza, "vCard", "xmlns", "vcard-temp");
XMLElement *event = XMLookForTKV(stanza, "pubsub",
"xmlns", "http://jabber.org/protocol/pubsub"
);
(void) thr;
if (event)
{
size_t i;
@ -1033,15 +1039,60 @@ CreateTagWithText(const char *tn, char *text)
return tag;
}
static bool
IQIsCommandList(ParseeData *args, XMLElement *stanza)
{
char *parsee = NULL;
XMLElement *query = XMLookForTKV(
stanza, "query", "xmlns",
"http://jabber.org/protocol/disco#items"
);
bool ret = false;
if (!query ||
!StrEquals(HashMapGet(query->attrs, "node"),
"http://jabber.org/protocol/commands"))
{
return false;
}
parsee = ParseeJID(args);
ret = StrEquals(HashMapGet(stanza->attrs, "to"), parsee);
Free(parsee);
return ret;
}
static void
IQGet(ParseeData *args, XMLElement *stanza)
IQGet(ParseeData *args, XMLElement *stanza, XMPPThread *thr)
{
XMPPComponent *jabber = args->jabber;
char *from = HashMapGet(stanza->attrs, "from");
char *to = HashMapGet(stanza->attrs, "to");
char *id = HashMapGet(stanza->attrs, "id");
if (XMLookForTKV(stanza, "vCard", "xmlns", "vcard-temp"))
(void) thr;
if (IQIsCommandList(args, stanza))
{
XMLElement *iq_reply = XMLCreateTag("iq");
XMLAddAttr(iq_reply, "type", "result");
XMLAddAttr(iq_reply, "from", to);
XMLAddAttr(iq_reply, "to", from);
XMLAddAttr(iq_reply, "id", id);
{
XMLElement *q = XMLCreateTag("query");
XMLAddAttr(q, "xmlns", "http://jabber.org/protocol/disco#items");
XMLAddAttr(q, "node", "http://jabber.org/protocol/commands");
XMPPShoveCommandList(thr->info->m, to, q);
XMLAddChild(iq_reply, q);
}
pthread_mutex_lock(&jabber->write_lock);
XMLEncode(jabber->stream, iq_reply);
StreamFlush(jabber->stream);
pthread_mutex_unlock(&jabber->write_lock);
XMLFreeElement(iq_reply);
}
else if (XMLookForTKV(stanza, "vCard", "xmlns", "vcard-temp"))
{
Log(LOG_INFO, "vCard information GET for %s", to);
@ -1127,13 +1178,22 @@ IQGet(ParseeData *args, XMLElement *stanza)
}
static void
IQError(ParseeData *args, XMLElement *stanza)
IQError(ParseeData *args, XMLElement *stanza, XMPPThread *thr)
{
/* TODO */
}
static void
IQSet(ParseeData *args, XMLElement *stanza, XMPPThread *thr)
{
XMPPCommandManager *manager = thr->info->m;
if (!XMPPManageCommand(manager, stanza, args))
{
Log(LOG_WARNING, "NOT A COMMAND");
}
}
#undef DISCO
static void
IQStanza(ParseeData *args, XMLElement *stanza)
IQStanza(ParseeData *args, XMLElement *stanza, XMPPThread *thr)
{
char *type;
type = HashMapGet(stanza->attrs, "type");
@ -1141,13 +1201,14 @@ IQStanza(ParseeData *args, XMLElement *stanza)
{ \
if (StrEquals(type, #ctyp)) \
{ \
callback(args, stanza); \
callback(args, stanza, thr); \
return; \
} \
} \
while (0)
OnType(get, IQGet);
OnType(set, IQSet);
OnType(error, IQError);
OnType(result, IQResult);
#undef OnType
@ -1467,7 +1528,7 @@ XMPPDispatcher(void *argp)
}
else if (StrEquals(stanza->name, "iq"))
{
IQStanza(args, stanza);
IQStanza(args, stanza, thread);
}
else
{
@ -1488,21 +1549,117 @@ typedef struct XMPPAwait {
XMLElement *stanza;
} XMPPAwait;
static pthread_mutex_t await_lock = PTHREAD_MUTEX_INITIALIZER;
static HashMap *await_table = NULL;
static XMPPThreadInfo info;
size_t
ParseeCongestion(void)
{
size_t congestion;
pthread_mutex_lock(&info.lock);
congestion = ArraySize(info.stanzas);
pthread_mutex_unlock(&info.lock);
return congestion;
}
static void
StatusCallback(XMPPCommandManager *m, HashMap *data, XMLElement *out)
{
size_t alloc = MemoryAllocated();
size_t kb = alloc >> 10;
size_t mb = kb >> 10;
size_t gb = mb >> 10;
size_t min = gb ? gb : (mb ? mb : (kb ? kb : alloc));
char *unit = gb ? "GB" : (mb ? "MB" : (kb ? "KB" : "B"));
XMLElement *x = XMLCreateTag("x");
XMLElement *title = XMLCreateTag("title");
{
XMLElement *title_text = XMLCreateText("Parsee statistics");
XMLAddChild(title, title_text);
}
XMLAddChild(x, title);
XMLAddAttr(x, "xmlns", "jabber:x:data");
XMLAddAttr(x, "type", "result");
{
XMLElement *reported, *item, *field, *value, *txt;
reported = XMLCreateTag("reported");
XMLAddChild(x, reported);
#define Report(id, label) do \
{ \
field = XMLCreateTag("field"); \
XMLAddAttr(field, "var", id); \
XMLAddAttr(field, "label", label); \
XMLAddChild(reported, field); \
} \
while(0)
#define BeginItem() item = XMLCreateTag("item")
#define EndItem() XMLAddChild(x, item)
#define SetField(id, val) do \
{ \
field = XMLCreateTag("field"); \
value = XMLCreateTag("value"); \
txt = XMLCreateText(val); \
XMLAddAttr(field, "var", id); \
XMLAddChild(value, txt); \
XMLAddChild(field, value); \
XMLAddChild(item, field); \
} \
while(0)
/* Report */
Report("mem-alloc", "Heap allocated with Cytoplasm");
Report("xml-congest", "Unprocessed stanzas(congestion)");
/* Set */
BeginItem();
{
char *min_str = StrInt(min);
char *congest = StrInt(ParseeCongestion());
char *alloc = StrConcat(3, min_str, " ", unit);
SetField("mem-alloc", alloc);
SetField("xml-congest", congest);
Free(congest);
Free(min_str);
Free(alloc);
}
EndItem();
#undef SetField
#undef EndItem
#undef BeginItem
#undef Report
}
XMLAddChild(out, x);
}
void *
ParseeXMPPThread(void *argp)
{
ParseeData *args = argp;
XMPPComponent *jabber = args->jabber;
XMLElement *stanza = NULL;
XMPPThreadInfo info;
size_t i;
/* Initialise the await table */
await_table = HashMapCreate();
/* Initialise the command manager, and add all ad-hoc commands */
info.m = XMPPCreateManager();
{
XMPPCommand *cmd;
cmd = XMPPBasicCmd(
"status", "Get status about Parsee",
StatusCallback
);
XMPPRegisterCommand(info.m, cmd);
}
/* Initialise the FIFO */
info.stanzas = ArrayCreate();
pthread_mutex_init(&info.lock, NULL);
@ -1578,6 +1735,8 @@ ParseeXMPPThread(void *argp)
HashMapFree(await_table);
pthread_mutex_destroy(&info.lock);
XMPPFreeManager(info.m);
return NULL;
}