[WIP/FIX] Lock write operations to XMPP

I *think* that fixes stream errors with two threads writing at once.
Even if it doesn't, I think it's always a nice-to-have.
This commit is contained in:
LDA 2024-06-23 17:54:58 +02:00
commit 37155316b2
4 changed files with 32 additions and 0 deletions

View file

@ -76,6 +76,7 @@ XMPPInitialiseCompStream(char *host, int port)
comp = Malloc(sizeof(*comp)); comp = Malloc(sizeof(*comp));
comp->host = StrDuplicate(host); comp->host = StrDuplicate(host);
comp->stream = stream; comp->stream = stream;
pthread_mutex_init(&comp->write_lock, NULL);
return comp; return comp;
} }
@ -112,6 +113,8 @@ XMPPAuthenticateCompStream(XMPPComponent *comp, char *shared)
return false; return false;
} }
pthread_mutex_lock(&comp->write_lock);
stream = comp->stream; stream = comp->stream;
as = comp->host; as = comp->host;
StreamPrintf(stream, StreamPrintf(stream,
@ -178,6 +181,7 @@ XMPPAuthenticateCompStream(XMPPComponent *comp, char *shared)
Free(handshake); Free(handshake);
end: end:
XMLFreeLexer(sax); XMLFreeLexer(sax);
pthread_mutex_unlock(&comp->write_lock);
return ret; return ret;
} }
@ -188,6 +192,7 @@ XMPPEndCompStream(XMPPComponent *comp)
{ {
return; return;
} }
pthread_mutex_destroy(&comp->write_lock);
StreamClose(comp->stream); StreamClose(comp->stream);
Free(comp->host); Free(comp->host);
Free(comp); Free(comp);

View file

@ -17,6 +17,9 @@ XMPPQueryMUC(XMPPComponent *jabber, char *muc, MUCInfo *out)
{ {
return false; return false;
} }
pthread_mutex_lock(&jabber->write_lock);
iq_query = XMLCreateTag("iq"); iq_query = XMLCreateTag("iq");
query = XMLCreateTag("query"); query = XMLCreateTag("query");
@ -47,6 +50,7 @@ XMPPQueryMUC(XMPPComponent *jabber, char *muc, MUCInfo *out)
{ {
XMLFreeElement(iq_query); XMLFreeElement(iq_query);
ParseeWakeupThread(); ParseeWakeupThread();
pthread_mutex_unlock(&jabber->write_lock);
return false; return false;
} }
query = XMLookForUnique(iq_query, "query"); query = XMLookForUnique(iq_query, "query");
@ -58,6 +62,7 @@ XMPPQueryMUC(XMPPComponent *jabber, char *muc, MUCInfo *out)
{ {
XMLFreeElement(iq_query); XMLFreeElement(iq_query);
ParseeWakeupThread(); ParseeWakeupThread();
pthread_mutex_unlock(&jabber->write_lock);
return false; return false;
} }
@ -74,6 +79,7 @@ XMPPQueryMUC(XMPPComponent *jabber, char *muc, MUCInfo *out)
} }
} }
/* Wake it up once we're done */ /* Wake it up once we're done */
pthread_mutex_unlock(&jabber->write_lock);
ParseeWakeupThread(); ParseeWakeupThread();
return true; return true;
} }

View file

@ -15,6 +15,8 @@ XMPPSendPlain(XMPPComponent *comp, char *fr, char *to, char *msg, char *type)
{ {
return; return;
} }
pthread_mutex_lock(&comp->write_lock);
message = XMLCreateTag("message"); message = XMLCreateTag("message");
XMLAddAttr(message, "from", (from = StrConcat(3, fr, "@", comp->host))); XMLAddAttr(message, "from", (from = StrConcat(3, fr, "@", comp->host)));
@ -56,6 +58,8 @@ XMPPSendPlain(XMPPComponent *comp, char *fr, char *to, char *msg, char *type)
StreamFlush(comp->stream); StreamFlush(comp->stream);
XMLFreeElement(message); XMLFreeElement(message);
Free(from); Free(from);
pthread_mutex_unlock(&comp->write_lock);
} }
void void
XMPPSendMUC(XMPPComponent *comp, char *fr, char *as, char *to, char *msg, char *type) XMPPSendMUC(XMPPComponent *comp, char *fr, char *as, char *to, char *msg, char *type)
@ -67,6 +71,8 @@ XMPPSendMUC(XMPPComponent *comp, char *fr, char *as, char *to, char *msg, char *
return; return;
} }
pthread_mutex_lock(&comp->write_lock);
message = XMLCreateTag("message"); message = XMLCreateTag("message");
XMLAddAttr(message, "from", XMLAddAttr(message, "from",
(from = StrConcat(5, fr, "@", comp->host, "/", as))); (from = StrConcat(5, fr, "@", comp->host, "/", as)));
@ -86,6 +92,8 @@ XMPPSendMUC(XMPPComponent *comp, char *fr, char *as, char *to, char *msg, char *
XMLFreeElement(message); XMLFreeElement(message);
Free(from); Free(from);
Free(id); Free(id);
pthread_mutex_unlock(&comp->write_lock);
} }
void void
XMPPJoinMUC(XMPPComponent *comp, char *fr, char *muc) XMPPJoinMUC(XMPPComponent *comp, char *fr, char *muc)
@ -97,6 +105,8 @@ XMPPJoinMUC(XMPPComponent *comp, char *fr, char *muc)
return; return;
} }
pthread_mutex_lock(&comp->write_lock);
presence = XMLCreateTag("presence"); presence = XMLCreateTag("presence");
x = XMLCreateTag("x"); x = XMLCreateTag("x");
XMLAddAttr(presence, "from", (from = StrConcat(3, fr, "@", comp->host))); XMLAddAttr(presence, "from", (from = StrConcat(3, fr, "@", comp->host)));
@ -112,6 +122,8 @@ XMPPJoinMUC(XMPPComponent *comp, char *fr, char *muc)
XMLFreeElement(presence); XMLFreeElement(presence);
Free(from); Free(from);
Free(id); Free(id);
pthread_mutex_unlock(&comp->write_lock);
} }
void void
XMPPKillThread(XMPPComponent *jabber, char *killer) XMPPKillThread(XMPPComponent *jabber, char *killer)
@ -124,6 +136,8 @@ XMPPKillThread(XMPPComponent *jabber, char *killer)
return; return;
} }
pthread_mutex_lock(&jabber->write_lock);
from = StrConcat(3, killer, "@", jabber->host); from = StrConcat(3, killer, "@", jabber->host);
message = XMLCreateTag("message"); message = XMLCreateTag("message");
XMLAddAttr(message, "from", from); XMLAddAttr(message, "from", from);
@ -136,6 +150,8 @@ XMPPKillThread(XMPPComponent *jabber, char *killer)
StreamFlush(jabber->stream); StreamFlush(jabber->stream);
XMLFreeElement(message); XMLFreeElement(message);
Free(from); Free(from);
pthread_mutex_unlock(&jabber->write_lock);
} }
bool bool
XMPPIsKiller(XMLElement *stanza) XMPPIsKiller(XMLElement *stanza)

View file

@ -3,9 +3,14 @@
#include <Cytoplasm/Stream.h> #include <Cytoplasm/Stream.h>
#include <pthread.h>
#include <XML.h> #include <XML.h>
typedef struct XMPPComponent { typedef struct XMPPComponent {
/* A lock for all write operations */
pthread_mutex_t write_lock;
char *host; char *host;
Stream *stream; Stream *stream;
} XMPPComponent; } XMPPComponent;