diff options
| author | Jeremy Lainé <jeremy.laine@m4x.org> | 2010-08-27 15:07:52 +0000 |
|---|---|---|
| committer | Jeremy Lainé <jeremy.laine@m4x.org> | 2010-08-27 15:07:52 +0000 |
| commit | 3cf9b81493c6713c5697d2bab2ea92b6cfe72be4 (patch) | |
| tree | 182819ffc5f7c4239f6b636766638fd9bfaa90dd /src/QXmppServer.cpp | |
| parent | a8203708313675c20e341681acd673ab2ac497f1 (diff) | |
| download | qxmpp-3cf9b81493c6713c5697d2bab2ea92b6cfe72be4.tar.gz | |
refactor queue management
Diffstat (limited to 'src/QXmppServer.cpp')
| -rw-r--r-- | src/QXmppServer.cpp | 162 |
1 files changed, 98 insertions, 64 deletions
diff --git a/src/QXmppServer.cpp b/src/QXmppServer.cpp index 47412610..5b6c7607 100644 --- a/src/QXmppServer.cpp +++ b/src/QXmppServer.cpp @@ -26,6 +26,7 @@ #include <QPluginLoader> #include <QSslSocket> +#include "QXmppConstants.h" #include "QXmppDialback.h" #include "QXmppIq.h" #include "QXmppIncomingClient.h" @@ -62,6 +63,7 @@ public: QList<QXmppIncomingServer*> incomingServers; QList<QXmppOutgoingServer*> outgoingServers; QXmppSslServer *serverForServers; + QMap<QXmppStream*, QList<QByteArray> > queues; private: bool loaded; @@ -348,9 +350,12 @@ QXmppOutgoingServer* QXmppServer::connectToDomain(const QString &domain) stream->configuration().setHost(domain); stream->configuration().setPort(5269); - bool check = connect(stream, SIGNAL(disconnected()), - this, SLOT(slotServerDisconnected())); + bool check = connect(stream, SIGNAL(connected()), + this, SLOT(slotStreamConnected())); Q_ASSERT(check); + + check = connect(stream, SIGNAL(disconnected()), + this, SLOT(slotStreamDisconnected())); Q_UNUSED(check); // add stream @@ -385,7 +390,7 @@ QList<QXmppStream*> QXmppServer::getStreams(const QString &to) // look for an outgoing S2S connection foreach (QXmppOutgoingServer *conn, d->outgoingServers) { - if (conn->configuration().domain() == toDomain && conn->isConnected()) + if (conn->configuration().domain() == toDomain) { found << conn; break; @@ -395,11 +400,7 @@ QList<QXmppStream*> QXmppServer::getStreams(const QString &to) // if we did not find an outgoing server, // we need to establish the S2S connection if (found.isEmpty()) - { - connectToDomain(toDomain); - - // FIXME : the current packet will not be delivered - } + found << connectToDomain(toDomain); } return found; } @@ -527,8 +528,18 @@ bool QXmppServer::sendElement(const QDomElement &element) const QString to = element.attribute("to"); foreach (QXmppStream *conn, getStreams(to)) { - if (conn->sendElement(element)) + if (conn->isConnected() && conn->sendElement(element)) + sent = true; + else + { + // queue packet + QByteArray data; + QXmlStreamWriter xmlStream(&data); + const QStringList omitNamespaces = QStringList() << ns_client << ns_server; + helperToXmlAddDomElement(&xmlStream, element, omitNamespaces); + d->queues[conn] << data; sent = true; + } } return sent; } @@ -542,8 +553,17 @@ bool QXmppServer::sendPacket(const QXmppStanza &packet) bool sent = false; foreach (QXmppStream *conn, getStreams(packet.to())) { - if (conn->sendPacket(packet)) + if (conn->isConnected() && conn->sendPacket(packet)) sent = true; + else + { + // queue packet + QByteArray data; + QXmlStreamWriter xmlStream(&data); + packet.toXml(&xmlStream); + d->queues[conn] << data; + sent = true; + } } return sent; } @@ -560,11 +580,11 @@ void QXmppServer::slotClientConnection(QSslSocket *socket) stream->setPasswordChecker(d->passwordChecker); bool check = connect(stream, SIGNAL(connected()), - this, SLOT(slotClientConnected())); + this, SLOT(slotStreamConnected())); Q_ASSERT(check); check = connect(stream, SIGNAL(disconnected()), - this, SLOT(slotClientDisconnected())); + this, SLOT(slotStreamDisconnected())); Q_ASSERT(check); check = connect(stream, SIGNAL(elementReceived(QDomElement)), @@ -576,54 +596,6 @@ void QXmppServer::slotClientConnection(QSslSocket *socket) emit streamAdded(stream); } -/// Handle a successful client authentication. -/// - -void QXmppServer::slotClientConnected() -{ - QXmppIncomingClient *stream = qobject_cast<QXmppIncomingClient *>(sender()); - if (!stream || !d->incomingClients.contains(stream)) - return; - - // check whether the connection conflicts with another one - foreach (QXmppIncomingClient *conn, d->incomingClients) - { - if (conn != stream && conn->jid() == stream->jid()) - { - conn->sendData("<stream:error><conflict xmlns='urn:ietf:params:xml:ns:xmpp-streams'/><text xmlns='urn:ietf:params:xml:ns:xmpp-streams'>Replaced by new connection</text></stream:error>"); - conn->disconnectFromHost(); - } - } -} - -/// Handle a disconnection from a client. -/// - -void QXmppServer::slotClientDisconnected() -{ - QXmppIncomingClient *stream = qobject_cast<QXmppIncomingClient *>(sender()); - if (!stream || !d->incomingClients.contains(stream)) - return; - - // notify subscribed peers of disconnection - if (!stream->jid().isEmpty()) - { - foreach (QString subscriber, d->subscribers.value(stream->jid())) - { - QXmppPresence presence; - presence.setFrom(stream->jid()); - presence.setTo(subscriber); - presence.setType(QXmppPresence::Unavailable); - sendPacket(presence); - } - } - - // remove stream - d->incomingClients.removeAll(stream); - emit streamRemoved(stream); - stream->deleteLater(); -} - void QXmppServer::slotDialbackRequestReceived(const QXmppDialback &dialback) { QXmppIncomingServer *stream = qobject_cast<QXmppIncomingServer *>(sender()); @@ -671,8 +643,12 @@ void QXmppServer::slotServerConnection(QSslSocket *socket) socket->setParent(stream); stream->setLogger(d->logger); - bool check = connect(stream, SIGNAL(disconnected()), - this, SLOT(slotServerDisconnected())); + bool check = connect(stream, SIGNAL(connected()), + this, SLOT(slotStreamConnected())); + Q_ASSERT(check); + + check = connect(stream, SIGNAL(disconnected()), + this, SLOT(slotStreamDisconnected())); Q_ASSERT(check); check = connect(stream, SIGNAL(dialbackRequestReceived(QXmppDialback)), @@ -688,16 +664,73 @@ void QXmppServer::slotServerConnection(QSslSocket *socket) emit streamAdded(stream); } -/// Handle a disconnection from a server. +/// Handle a successful stream connection. /// -void QXmppServer::slotServerDisconnected() +void QXmppServer::slotStreamConnected() { + // handle incoming clients + QXmppIncomingClient *client = qobject_cast<QXmppIncomingClient *>(sender()); + if (client || d->incomingClients.contains(client)) + { + // check whether the connection conflicts with another one + foreach (QXmppIncomingClient *conn, d->incomingClients) + { + if (conn != client && conn->jid() == client->jid()) + { + conn->sendData("<stream:error><conflict xmlns='urn:ietf:params:xml:ns:xmpp-streams'/><text xmlns='urn:ietf:params:xml:ns:xmpp-streams'>Replaced by new connection</text></stream:error>"); + conn->disconnectFromHost(); + } + } + } + + // flush queue + QXmppStream *stream = qobject_cast<QXmppStream*>(sender()); + if (stream && d->queues.contains(stream)) + { + qDebug("FLUSHING QUEUE"); + foreach (const QByteArray &data, d->queues[stream]) + stream->sendData(data); + d->queues.remove(stream); + } +} + +/// Handle a stream disconnection. +/// + +void QXmppServer::slotStreamDisconnected() +{ + // handle clients + QXmppIncomingClient *stream = qobject_cast<QXmppIncomingClient *>(sender()); + if (stream && d->incomingClients.contains(stream)) + { + // notify subscribed peers of disconnection + if (!stream->jid().isEmpty()) + { + foreach (QString subscriber, d->subscribers.value(stream->jid())) + { + QXmppPresence presence; + presence.setFrom(stream->jid()); + presence.setTo(subscriber); + presence.setType(QXmppPresence::Unavailable); + sendPacket(presence); + } + } + + // remove stream + d->incomingClients.removeAll(stream); + d->queues.remove(stream); + emit streamRemoved(stream); + stream->deleteLater(); + return; + } + // handle incoming streams QXmppIncomingServer *incoming = qobject_cast<QXmppIncomingServer *>(sender()); if (incoming && d->incomingServers.contains(incoming)) { d->incomingServers.removeAll(incoming); + d->queues.remove(incoming); emit streamRemoved(incoming); incoming->deleteLater(); return; @@ -708,6 +741,7 @@ void QXmppServer::slotServerDisconnected() if (outgoing && d->outgoingServers.contains(outgoing)) { d->outgoingServers.removeAll(outgoing); + d->queues.remove(incoming); emit streamRemoved(outgoing); outgoing->deleteLater(); return; |
