aboutsummaryrefslogtreecommitdiff
path: root/src/QXmppServer.cpp
diff options
context:
space:
mode:
authorJeremy Lainé <jeremy.laine@m4x.org>2010-08-27 15:07:52 +0000
committerJeremy Lainé <jeremy.laine@m4x.org>2010-08-27 15:07:52 +0000
commit3cf9b81493c6713c5697d2bab2ea92b6cfe72be4 (patch)
tree182819ffc5f7c4239f6b636766638fd9bfaa90dd /src/QXmppServer.cpp
parenta8203708313675c20e341681acd673ab2ac497f1 (diff)
downloadqxmpp-3cf9b81493c6713c5697d2bab2ea92b6cfe72be4.tar.gz
refactor queue management
Diffstat (limited to 'src/QXmppServer.cpp')
-rw-r--r--src/QXmppServer.cpp162
1 files changed, 98 insertions, 64 deletions
diff --git a/src/QXmppServer.cpp b/src/QXmppServer.cpp
index 47412610..5b6c7607 100644
--- a/src/QXmppServer.cpp
+++ b/src/QXmppServer.cpp
@@ -26,6 +26,7 @@
#include <QPluginLoader>
#include <QSslSocket>
+#include "QXmppConstants.h"
#include "QXmppDialback.h"
#include "QXmppIq.h"
#include "QXmppIncomingClient.h"
@@ -62,6 +63,7 @@ public:
QList<QXmppIncomingServer*> incomingServers;
QList<QXmppOutgoingServer*> outgoingServers;
QXmppSslServer *serverForServers;
+ QMap<QXmppStream*, QList<QByteArray> > queues;
private:
bool loaded;
@@ -348,9 +350,12 @@ QXmppOutgoingServer* QXmppServer::connectToDomain(const QString &domain)
stream->configuration().setHost(domain);
stream->configuration().setPort(5269);
- bool check = connect(stream, SIGNAL(disconnected()),
- this, SLOT(slotServerDisconnected()));
+ bool check = connect(stream, SIGNAL(connected()),
+ this, SLOT(slotStreamConnected()));
Q_ASSERT(check);
+
+ check = connect(stream, SIGNAL(disconnected()),
+ this, SLOT(slotStreamDisconnected()));
Q_UNUSED(check);
// add stream
@@ -385,7 +390,7 @@ QList<QXmppStream*> QXmppServer::getStreams(const QString &to)
// look for an outgoing S2S connection
foreach (QXmppOutgoingServer *conn, d->outgoingServers)
{
- if (conn->configuration().domain() == toDomain && conn->isConnected())
+ if (conn->configuration().domain() == toDomain)
{
found << conn;
break;
@@ -395,11 +400,7 @@ QList<QXmppStream*> QXmppServer::getStreams(const QString &to)
// if we did not find an outgoing server,
// we need to establish the S2S connection
if (found.isEmpty())
- {
- connectToDomain(toDomain);
-
- // FIXME : the current packet will not be delivered
- }
+ found << connectToDomain(toDomain);
}
return found;
}
@@ -527,8 +528,18 @@ bool QXmppServer::sendElement(const QDomElement &element)
const QString to = element.attribute("to");
foreach (QXmppStream *conn, getStreams(to))
{
- if (conn->sendElement(element))
+ if (conn->isConnected() && conn->sendElement(element))
+ sent = true;
+ else
+ {
+ // queue packet
+ QByteArray data;
+ QXmlStreamWriter xmlStream(&data);
+ const QStringList omitNamespaces = QStringList() << ns_client << ns_server;
+ helperToXmlAddDomElement(&xmlStream, element, omitNamespaces);
+ d->queues[conn] << data;
sent = true;
+ }
}
return sent;
}
@@ -542,8 +553,17 @@ bool QXmppServer::sendPacket(const QXmppStanza &packet)
bool sent = false;
foreach (QXmppStream *conn, getStreams(packet.to()))
{
- if (conn->sendPacket(packet))
+ if (conn->isConnected() && conn->sendPacket(packet))
sent = true;
+ else
+ {
+ // queue packet
+ QByteArray data;
+ QXmlStreamWriter xmlStream(&data);
+ packet.toXml(&xmlStream);
+ d->queues[conn] << data;
+ sent = true;
+ }
}
return sent;
}
@@ -560,11 +580,11 @@ void QXmppServer::slotClientConnection(QSslSocket *socket)
stream->setPasswordChecker(d->passwordChecker);
bool check = connect(stream, SIGNAL(connected()),
- this, SLOT(slotClientConnected()));
+ this, SLOT(slotStreamConnected()));
Q_ASSERT(check);
check = connect(stream, SIGNAL(disconnected()),
- this, SLOT(slotClientDisconnected()));
+ this, SLOT(slotStreamDisconnected()));
Q_ASSERT(check);
check = connect(stream, SIGNAL(elementReceived(QDomElement)),
@@ -576,54 +596,6 @@ void QXmppServer::slotClientConnection(QSslSocket *socket)
emit streamAdded(stream);
}
-/// Handle a successful client authentication.
-///
-
-void QXmppServer::slotClientConnected()
-{
- QXmppIncomingClient *stream = qobject_cast<QXmppIncomingClient *>(sender());
- if (!stream || !d->incomingClients.contains(stream))
- return;
-
- // check whether the connection conflicts with another one
- foreach (QXmppIncomingClient *conn, d->incomingClients)
- {
- if (conn != stream && conn->jid() == stream->jid())
- {
- conn->sendData("<stream:error><conflict xmlns='urn:ietf:params:xml:ns:xmpp-streams'/><text xmlns='urn:ietf:params:xml:ns:xmpp-streams'>Replaced by new connection</text></stream:error>");
- conn->disconnectFromHost();
- }
- }
-}
-
-/// Handle a disconnection from a client.
-///
-
-void QXmppServer::slotClientDisconnected()
-{
- QXmppIncomingClient *stream = qobject_cast<QXmppIncomingClient *>(sender());
- if (!stream || !d->incomingClients.contains(stream))
- return;
-
- // notify subscribed peers of disconnection
- if (!stream->jid().isEmpty())
- {
- foreach (QString subscriber, d->subscribers.value(stream->jid()))
- {
- QXmppPresence presence;
- presence.setFrom(stream->jid());
- presence.setTo(subscriber);
- presence.setType(QXmppPresence::Unavailable);
- sendPacket(presence);
- }
- }
-
- // remove stream
- d->incomingClients.removeAll(stream);
- emit streamRemoved(stream);
- stream->deleteLater();
-}
-
void QXmppServer::slotDialbackRequestReceived(const QXmppDialback &dialback)
{
QXmppIncomingServer *stream = qobject_cast<QXmppIncomingServer *>(sender());
@@ -671,8 +643,12 @@ void QXmppServer::slotServerConnection(QSslSocket *socket)
socket->setParent(stream);
stream->setLogger(d->logger);
- bool check = connect(stream, SIGNAL(disconnected()),
- this, SLOT(slotServerDisconnected()));
+ bool check = connect(stream, SIGNAL(connected()),
+ this, SLOT(slotStreamConnected()));
+ Q_ASSERT(check);
+
+ check = connect(stream, SIGNAL(disconnected()),
+ this, SLOT(slotStreamDisconnected()));
Q_ASSERT(check);
check = connect(stream, SIGNAL(dialbackRequestReceived(QXmppDialback)),
@@ -688,16 +664,73 @@ void QXmppServer::slotServerConnection(QSslSocket *socket)
emit streamAdded(stream);
}
-/// Handle a disconnection from a server.
+/// Handle a successful stream connection.
///
-void QXmppServer::slotServerDisconnected()
+void QXmppServer::slotStreamConnected()
{
+ // handle incoming clients
+ QXmppIncomingClient *client = qobject_cast<QXmppIncomingClient *>(sender());
+ if (client || d->incomingClients.contains(client))
+ {
+ // check whether the connection conflicts with another one
+ foreach (QXmppIncomingClient *conn, d->incomingClients)
+ {
+ if (conn != client && conn->jid() == client->jid())
+ {
+ conn->sendData("<stream:error><conflict xmlns='urn:ietf:params:xml:ns:xmpp-streams'/><text xmlns='urn:ietf:params:xml:ns:xmpp-streams'>Replaced by new connection</text></stream:error>");
+ conn->disconnectFromHost();
+ }
+ }
+ }
+
+ // flush queue
+ QXmppStream *stream = qobject_cast<QXmppStream*>(sender());
+ if (stream && d->queues.contains(stream))
+ {
+ qDebug("FLUSHING QUEUE");
+ foreach (const QByteArray &data, d->queues[stream])
+ stream->sendData(data);
+ d->queues.remove(stream);
+ }
+}
+
+/// Handle a stream disconnection.
+///
+
+void QXmppServer::slotStreamDisconnected()
+{
+ // handle clients
+ QXmppIncomingClient *stream = qobject_cast<QXmppIncomingClient *>(sender());
+ if (stream && d->incomingClients.contains(stream))
+ {
+ // notify subscribed peers of disconnection
+ if (!stream->jid().isEmpty())
+ {
+ foreach (QString subscriber, d->subscribers.value(stream->jid()))
+ {
+ QXmppPresence presence;
+ presence.setFrom(stream->jid());
+ presence.setTo(subscriber);
+ presence.setType(QXmppPresence::Unavailable);
+ sendPacket(presence);
+ }
+ }
+
+ // remove stream
+ d->incomingClients.removeAll(stream);
+ d->queues.remove(stream);
+ emit streamRemoved(stream);
+ stream->deleteLater();
+ return;
+ }
+
// handle incoming streams
QXmppIncomingServer *incoming = qobject_cast<QXmppIncomingServer *>(sender());
if (incoming && d->incomingServers.contains(incoming))
{
d->incomingServers.removeAll(incoming);
+ d->queues.remove(incoming);
emit streamRemoved(incoming);
incoming->deleteLater();
return;
@@ -708,6 +741,7 @@ void QXmppServer::slotServerDisconnected()
if (outgoing && d->outgoingServers.contains(outgoing))
{
d->outgoingServers.removeAll(outgoing);
+ d->queues.remove(incoming);
emit streamRemoved(outgoing);
outgoing->deleteLater();
return;