From 40e324246510585ea6fab202fcaac8e2aa6f6650 Mon Sep 17 00:00:00 2001 From: lda Date: Wed, 19 Feb 2025 16:14:30 +0000 Subject: [PATCH] [ADD] Try to renegociate an XMPP stream on failure --- src/XMPPThread/ReListener.c | 126 ++++++++++++++++++++++-------------- 1 file changed, 78 insertions(+), 48 deletions(-) diff --git a/src/XMPPThread/ReListener.c b/src/XMPPThread/ReListener.c index 28d117b..44f22f8 100644 --- a/src/XMPPThread/ReListener.c +++ b/src/XMPPThread/ReListener.c @@ -244,66 +244,96 @@ ParseeXMPPThread(void *argp) } } - while (true) + while (!args->halted) { - char *id; - - stanza = XMLDecode(jabber->stream, false); - if (!stanza) + while (true) { - /* Try to check if an error is abound */ - if (args->verbosity >= PARSEE_VERBOSE_COMICAL) + char *id; + + stanza = XMLDecode(jabber->stream, false); + if (!stanza) { - Log(LOG_DEBUG, "RECEIVED EOF."); + /* Try to check if an error is abound */ + if (args->verbosity >= PARSEE_VERBOSE_COMICAL) + { + Log(LOG_DEBUG, "RECEIVED EOF."); + } + break; } - break; - } - if (args->verbosity >= PARSEE_VERBOSE_STANZA) - { - Stream *output = StreamStderr(); - StreamPrintf(output, "-------STANZA BEGIN-------" "\n"); - XMLEncode(output, stanza); - StreamPrintf(output, "\n--------STANZA END--------" "\n"); - StreamFlush(output); - } - - id = HashMapGet(stanza->attrs, "id"); - if (id) - { - XMPPAwait *await; - /* Lock out the table to see if we're awaiting. */ - pthread_mutex_lock(&await_lock); - if ((await = HashMapGet(await_table, id))) + if (args->verbosity >= PARSEE_VERBOSE_STANZA) { - pthread_mutex_lock(&await->cond_lock); - await->stanza = stanza; - pthread_cond_signal(&await->condition); - pthread_mutex_unlock(&await->cond_lock); + Stream *output = StreamStderr(); + StreamPrintf(output, "-------STANZA BEGIN-------" "\n"); + XMLEncode(output, stanza); + StreamPrintf(output, "\n--------STANZA END--------" "\n"); + StreamFlush(output); + } - HashMapDelete(await_table, id); + id = HashMapGet(stanza->attrs, "id"); + if (id) + { + XMPPAwait *await; + /* Lock out the table to see if we're awaiting. */ + pthread_mutex_lock(&await_lock); + if ((await = HashMapGet(await_table, id))) + { + pthread_mutex_lock(&await->cond_lock); + await->stanza = stanza; + pthread_cond_signal(&await->condition); + pthread_mutex_unlock(&await->cond_lock); + HashMapDelete(await_table, id); + + pthread_mutex_unlock(&await_lock); + continue; + } pthread_mutex_unlock(&await_lock); - continue; } - pthread_mutex_unlock(&await_lock); - } - /* Push it into the stanza FIFO. A dispatcher thread should then - * be able to freely grab a value(locked by a mutex). We can't make - * dispatchers read stanzas on their own, since that's _not_ how - * streams work, but this should mitigate some issues, and allow a - * few threads to be busy, while the rest of Parsee works. */ - PushStanza(&info, stanza); + /* Push it into the stanza FIFO. A dispatcher thread should then + * be able to freely grab a value(locked by a mutex). We can't make + * dispatchers read stanzas on their own, since that's _not_ how + * streams work, but this should mitigate some issues, and allow a + * few threads to be busy, while the rest of Parsee works. */ + PushStanza(&info, stanza); + } + pthread_mutex_lock(&args->halt_lock); + if (!args->halted) + { + Log(LOG_WARNING, "XMPP server is closing stream..."); + for (size_t i = 0; i < 50; i++) + { + UtilSleepMillis(100); /* Wait a bit so that temporary failures don't fuck everything up */ + + Log(LOG_WARNING, "Restarting XMPP stream."); + /* This is the part where a new connection is being considered */ + XMPPFinishCompStream(jabber); + XMPPEndCompStream(jabber); + + args->jabber = XMPPInitialiseCompStream( + args->config->component_addr, + args->config->component_host, + args->config->component_port + ); + jabber = args->jabber; + if (!jabber || !XMPPAuthenticateCompStream(jabber, args->config->shared_comp_secret)) + { + /* Oops, there is something wrong! */ + Log(LOG_ERR, "Couldn't authenticate to XMPP server"); + XMPPEndCompStream(jabber); + args->jabber = NULL; + jabber = NULL; + if (i == 4) + { + pthread_mutex_unlock(&args->halt_lock); + break; + } + } + } + } + pthread_mutex_unlock(&args->halt_lock); } - pthread_mutex_lock(&args->halt_lock); - if (!args->halted) - { - Log(LOG_WARNING, "XMPP server is closing stream..."); - Log(LOG_WARNING, "Stopping %s...", NAME); - error = true; - } - pthread_mutex_unlock(&args->halt_lock); info.running = false; for (i = 0; i < info.available_dispatchers; i++)