mirror of
https://forge.fsky.io/lda/Parsee.git
synced 2026-03-13 10:45:11 +00:00
[ADD] Try to renegociate an XMPP stream on failure
This commit is contained in:
parent
b78f7b6ab3
commit
40e3242465
1 changed files with 77 additions and 47 deletions
|
|
@ -244,66 +244,96 @@ ParseeXMPPThread(void *argp)
|
|||
}
|
||||
}
|
||||
|
||||
while (true)
|
||||
while (!args->halted)
|
||||
{
|
||||
char *id;
|
||||
|
||||
stanza = XMLDecode(jabber->stream, false);
|
||||
if (!stanza)
|
||||
while (true)
|
||||
{
|
||||
/* Try to check if an error is abound */
|
||||
if (args->verbosity >= PARSEE_VERBOSE_COMICAL)
|
||||
char *id;
|
||||
|
||||
stanza = XMLDecode(jabber->stream, false);
|
||||
if (!stanza)
|
||||
{
|
||||
Log(LOG_DEBUG, "RECEIVED EOF.");
|
||||
/* Try to check if an error is abound */
|
||||
if (args->verbosity >= PARSEE_VERBOSE_COMICAL)
|
||||
{
|
||||
Log(LOG_DEBUG, "RECEIVED EOF.");
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (args->verbosity >= PARSEE_VERBOSE_STANZA)
|
||||
{
|
||||
Stream *output = StreamStderr();
|
||||
StreamPrintf(output, "-------STANZA BEGIN-------" "\n");
|
||||
XMLEncode(output, stanza);
|
||||
StreamPrintf(output, "\n--------STANZA END--------" "\n");
|
||||
StreamFlush(output);
|
||||
}
|
||||
|
||||
id = HashMapGet(stanza->attrs, "id");
|
||||
if (id)
|
||||
{
|
||||
XMPPAwait *await;
|
||||
/* Lock out the table to see if we're awaiting. */
|
||||
pthread_mutex_lock(&await_lock);
|
||||
if ((await = HashMapGet(await_table, id)))
|
||||
if (args->verbosity >= PARSEE_VERBOSE_STANZA)
|
||||
{
|
||||
pthread_mutex_lock(&await->cond_lock);
|
||||
await->stanza = stanza;
|
||||
pthread_cond_signal(&await->condition);
|
||||
pthread_mutex_unlock(&await->cond_lock);
|
||||
Stream *output = StreamStderr();
|
||||
StreamPrintf(output, "-------STANZA BEGIN-------" "\n");
|
||||
XMLEncode(output, stanza);
|
||||
StreamPrintf(output, "\n--------STANZA END--------" "\n");
|
||||
StreamFlush(output);
|
||||
}
|
||||
|
||||
HashMapDelete(await_table, id);
|
||||
id = HashMapGet(stanza->attrs, "id");
|
||||
if (id)
|
||||
{
|
||||
XMPPAwait *await;
|
||||
/* Lock out the table to see if we're awaiting. */
|
||||
pthread_mutex_lock(&await_lock);
|
||||
if ((await = HashMapGet(await_table, id)))
|
||||
{
|
||||
pthread_mutex_lock(&await->cond_lock);
|
||||
await->stanza = stanza;
|
||||
pthread_cond_signal(&await->condition);
|
||||
pthread_mutex_unlock(&await->cond_lock);
|
||||
|
||||
HashMapDelete(await_table, id);
|
||||
|
||||
pthread_mutex_unlock(&await_lock);
|
||||
continue;
|
||||
}
|
||||
pthread_mutex_unlock(&await_lock);
|
||||
continue;
|
||||
}
|
||||
pthread_mutex_unlock(&await_lock);
|
||||
}
|
||||
|
||||
/* Push it into the stanza FIFO. A dispatcher thread should then
|
||||
* be able to freely grab a value(locked by a mutex). We can't make
|
||||
* dispatchers read stanzas on their own, since that's _not_ how
|
||||
* streams work, but this should mitigate some issues, and allow a
|
||||
* few threads to be busy, while the rest of Parsee works. */
|
||||
PushStanza(&info, stanza);
|
||||
/* Push it into the stanza FIFO. A dispatcher thread should then
|
||||
* be able to freely grab a value(locked by a mutex). We can't make
|
||||
* dispatchers read stanzas on their own, since that's _not_ how
|
||||
* streams work, but this should mitigate some issues, and allow a
|
||||
* few threads to be busy, while the rest of Parsee works. */
|
||||
PushStanza(&info, stanza);
|
||||
}
|
||||
pthread_mutex_lock(&args->halt_lock);
|
||||
if (!args->halted)
|
||||
{
|
||||
Log(LOG_WARNING, "XMPP server is closing stream...");
|
||||
for (size_t i = 0; i < 50; i++)
|
||||
{
|
||||
UtilSleepMillis(100); /* Wait a bit so that temporary failures don't fuck everything up */
|
||||
|
||||
Log(LOG_WARNING, "Restarting XMPP stream.");
|
||||
/* This is the part where a new connection is being considered */
|
||||
XMPPFinishCompStream(jabber);
|
||||
XMPPEndCompStream(jabber);
|
||||
|
||||
args->jabber = XMPPInitialiseCompStream(
|
||||
args->config->component_addr,
|
||||
args->config->component_host,
|
||||
args->config->component_port
|
||||
);
|
||||
jabber = args->jabber;
|
||||
if (!jabber || !XMPPAuthenticateCompStream(jabber, args->config->shared_comp_secret))
|
||||
{
|
||||
/* Oops, there is something wrong! */
|
||||
Log(LOG_ERR, "Couldn't authenticate to XMPP server");
|
||||
XMPPEndCompStream(jabber);
|
||||
args->jabber = NULL;
|
||||
jabber = NULL;
|
||||
if (i == 4)
|
||||
{
|
||||
pthread_mutex_unlock(&args->halt_lock);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&args->halt_lock);
|
||||
}
|
||||
pthread_mutex_lock(&args->halt_lock);
|
||||
if (!args->halted)
|
||||
{
|
||||
Log(LOG_WARNING, "XMPP server is closing stream...");
|
||||
Log(LOG_WARNING, "Stopping %s...", NAME);
|
||||
error = true;
|
||||
}
|
||||
pthread_mutex_unlock(&args->halt_lock);
|
||||
|
||||
info.running = false;
|
||||
for (i = 0; i < info.available_dispatchers; i++)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue