diff options
| author | Jeremy Lainé <jeremy.laine@m4x.org> | 2011-09-06 15:50:52 +0000 |
|---|---|---|
| committer | Jeremy Lainé <jeremy.laine@m4x.org> | 2011-09-06 15:50:52 +0000 |
| commit | 2ba5ac7e09c1c9cf9d59d9bc1ed7937e1559458b (patch) | |
| tree | 0c150bf945f36a10ecedfdac0b0d7521431f6d3d /src | |
| parent | 51caca53408f2626414b593adb825449712e0bc1 (diff) | |
| download | qxmpp-2ba5ac7e09c1c9cf9d59d9bc1ed7937e1559458b.tar.gz | |
Improve QXmppServer packet routing performance.
Diffstat (limited to 'src')
| -rw-r--r-- | src/QXmppIncomingClient.cpp | 49 | ||||
| -rw-r--r-- | src/QXmppOutgoingClient.cpp | 3 | ||||
| -rw-r--r-- | src/QXmppOutgoingServer.cpp | 3 | ||||
| -rw-r--r-- | src/QXmppServer.cpp | 100 | ||||
| -rw-r--r-- | src/QXmppServer.h | 12 | ||||
| -rw-r--r-- | src/QXmppStream.cpp | 56 | ||||
| -rw-r--r-- | src/QXmppStream.h | 10 |
7 files changed, 137 insertions, 96 deletions
diff --git a/src/QXmppIncomingClient.cpp b/src/QXmppIncomingClient.cpp index 7b8ce038..da8067a8 100644 --- a/src/QXmppIncomingClient.cpp +++ b/src/QXmppIncomingClient.cpp @@ -45,9 +45,11 @@ public: QString domain; QString username; QString resource; + QString jid; QXmppPasswordChecker *passwordChecker; QXmppSaslDigestMd5 saslDigest; - int saslStep; + int saslDigestStep; + QString saslDigestUsername; }; /// Constructs a new incoming client stream. @@ -63,7 +65,7 @@ QXmppIncomingClient::QXmppIncomingClient(QSslSocket *socket, const QString &doma { d->passwordChecker = 0; d->domain = domain; - d->saslStep = 0; + d->saslDigestStep = 0; if (socket) { info(QString("Incoming client connection from %1 %2").arg( @@ -105,12 +107,7 @@ bool QXmppIncomingClient::isConnected() const QString QXmppIncomingClient::jid() const { - if (d->username.isEmpty()) - return QString(); - QString jid = d->username + "@" + d->domain; - if (!d->resource.isEmpty()) - jid += "/" + d->resource; - return jid; + return d->jid; } /// Sets the number of seconds after which a client will be disconnected @@ -138,7 +135,8 @@ void QXmppIncomingClient::handleStream(const QDomElement &streamElement) { if (d->idleTimer->interval()) d->idleTimer->start(); - d->saslStep = 0; + d->saslDigestStep = 0; + d->saslDigestUsername.clear(); // start stream const QByteArray sessionId = generateStanzaHash().toAscii(); @@ -236,7 +234,7 @@ void QXmppIncomingClient::handleStanza(const QDomElement &nodeRecv) // generate nonce d->saslDigest.setNonce(QXmppSaslDigestMd5::generateNonce()); d->saslDigest.setQop("auth"); - d->saslStep = 1; + d->saslDigestStep = 1; QMap<QByteArray, QByteArray> challenge; challenge["nonce"] = d->saslDigest.nonce(); @@ -258,7 +256,7 @@ void QXmppIncomingClient::handleStanza(const QDomElement &nodeRecv) } else if (nodeRecv.tagName() == "response") { - if (d->saslStep == 1) + if (d->saslDigestStep == 1) { const QByteArray raw = QByteArray::fromBase64(nodeRecv.text().toAscii()); QMap<QByteArray, QByteArray> saslResponse = QXmppSaslDigestMd5::parseMessage(raw); @@ -282,12 +280,15 @@ void QXmppIncomingClient::handleStanza(const QDomElement &nodeRecv) reply->setProperty("__sasl_raw", raw); connect(reply, SIGNAL(finished()), this, SLOT(onDigestReply())); } - else if (d->saslStep == 2) + else if (d->saslDigestStep == 2) { // authentication succeeded - d->saslStep = 3; + d->saslDigestStep = 3; + d->username = d->saslDigestUsername; + d->jid = QString("%1@%2").arg(d->username, d->domain); info(QString("Authentication succeeded for '%1'").arg(d->username)); sendData("<success xmlns='urn:ietf:params:xml:ns:xmpp-sasl'/>"); + handleStart(); } } } @@ -303,11 +304,12 @@ void QXmppIncomingClient::handleStanza(const QDomElement &nodeRecv) d->resource = bindSet.resource().trimmed(); if (d->resource.isEmpty()) d->resource = generateStanzaHash(); + d->jid = QString("%1@%2/%3").arg(d->username, d->domain, d->resource); QXmppBindIq bindResult; bindResult.setType(QXmppIq::Result); bindResult.setId(bindSet.id()); - bindResult.setJid(jid()); + bindResult.setJid(d->jid); sendPacket(bindResult); // bound @@ -322,7 +324,7 @@ void QXmppIncomingClient::handleStanza(const QDomElement &nodeRecv) QXmppIq sessionResult; sessionResult.setType(QXmppIq::Result); sessionResult.setId(sessionSet.id()); - sessionResult.setTo(jid()); + sessionResult.setTo(d->jid); sendPacket(sessionResult); return; } @@ -330,7 +332,7 @@ void QXmppIncomingClient::handleStanza(const QDomElement &nodeRecv) // check the sender is legitimate const QString from = nodeRecv.attribute("from"); - if (!from.isEmpty() && from != jid() && from != jidToBareJid(jid())) + if (!from.isEmpty() && from != d->jid && from != jidToBareJid(d->jid)) { warning(QString("Received a stanza from unexpected JID %1").arg(from)); return; @@ -349,9 +351,9 @@ void QXmppIncomingClient::handleStanza(const QDomElement &nodeRecv) if (nodeFull.tagName() == "presence" && (nodeFull.attribute("type") == "subscribe" || nodeFull.attribute("type") == "subscribed")) - nodeFull.setAttribute("from", jidToBareJid(jid())); + nodeFull.setAttribute("from", jidToBareJid(d->jid)); else - nodeFull.setAttribute("from", jid()); + nodeFull.setAttribute("from", d->jid); } // if the recipient is empty, set it to the local domain @@ -393,8 +395,8 @@ void QXmppIncomingClient::onDigestReply() } // send new challenge - d->username = username; - d->saslStep = 2; + d->saslDigestUsername = username; + d->saslDigestStep = 2; QMap<QByteArray, QByteArray> challenge; challenge["rspauth"] = d->saslDigest.calculateDigest( QByteArray(":") + d->saslDigest.digestUri()); @@ -413,8 +415,10 @@ void QXmppIncomingClient::onPasswordReply() switch (reply->error()) { case QXmppPasswordReply::NoError: d->username = username; + d->jid = QString("%1@%2").arg(d->username, d->domain); info(QString("Authentication succeeded for '%1'").arg(username)); sendData("<success xmlns='urn:ietf:params:xml:ns:xmpp-sasl'/>"); + handleStart(); break; case QXmppPasswordReply::AuthorizationError: warning(QString("Authentication failed for '%1'").arg(username)); @@ -431,8 +435,11 @@ void QXmppIncomingClient::onPasswordReply() void QXmppIncomingClient::onTimeout() { - warning(QString("Idle timeout for %1").arg(jid())); + warning(QString("Idle timeout for '%1'").arg(d->jid)); disconnectFromHost(); + + // make sure disconnected() gets emitted no matter what + QTimer::singleShot(30, this, SIGNAL(disconnected())); } diff --git a/src/QXmppOutgoingClient.cpp b/src/QXmppOutgoingClient.cpp index ee4483b1..5079d4bf 100644 --- a/src/QXmppOutgoingClient.cpp +++ b/src/QXmppOutgoingClient.cpp @@ -217,11 +217,12 @@ void QXmppOutgoingClient::socketError(QAbstractSocket::SocketError socketError) { Q_UNUSED(socketError); emit error(QXmppClient::SocketError); - warning(QString("Socket error: " + socket()->errorString())); } void QXmppOutgoingClient::handleStart() { + QXmppStream::handleStart(); + // reset authentication step d->saslStep = 0; d->sessionStarted = false; diff --git a/src/QXmppOutgoingServer.cpp b/src/QXmppOutgoingServer.cpp index 7a258ee5..3dc0ce74 100644 --- a/src/QXmppOutgoingServer.cpp +++ b/src/QXmppOutgoingServer.cpp @@ -125,6 +125,8 @@ void QXmppOutgoingServer::connectToHost(const QXmppSrvInfo &serviceInfo) void QXmppOutgoingServer::handleStart() { + QXmppStream::handleStart(); + QString data = QString("<?xml version='1.0'?><stream:stream" " xmlns='%1' xmlns:db='%2' xmlns:stream='%3' version='1.0'>").arg( ns_server, @@ -294,7 +296,6 @@ void QXmppOutgoingServer::slotSslErrors(const QList<QSslError> &errors) void QXmppOutgoingServer::socketError(QAbstractSocket::SocketError error) { Q_UNUSED(error); - warning(QString("Socket error: " + socket()->errorString())); emit disconnected(); } diff --git a/src/QXmppServer.cpp b/src/QXmppServer.cpp index 5fab41ea..507486a3 100644 --- a/src/QXmppServer.cpp +++ b/src/QXmppServer.cpp @@ -66,6 +66,7 @@ public: QString domain; QList<QXmppServerExtension*> extensions; + // bare-jid -> full-jid -> presence QMap<QString, QMap<QString, QXmppPresence> > presences; QMap<QString, QSet<QString> > subscribers; QXmppLogger *logger; @@ -74,6 +75,8 @@ public: // client-to-server QXmppSslServer *serverForClients; QList<QXmppIncomingClient*> incomingClients; + QMap<QString, QXmppIncomingClient*> incomingClientsByJid; + QMap<QString, QSet<QXmppIncomingClient*> > incomingClientsByBareJid; // server-to-server QList<QXmppIncomingServer*> incomingServers; @@ -109,11 +112,11 @@ QXmppOutgoingServer* QXmppServerPrivate::connectToDomain(const QString &toDomain stream->setLocalStreamKey(generateStanzaHash().toAscii()); check = QObject::connect(stream, SIGNAL(connected()), - q, SLOT(slotStreamConnected())); + q, SLOT(_q_streamConnected())); Q_ASSERT(check); check = QObject::connect(stream, SIGNAL(disconnected()), - q, SLOT(slotStreamDisconnected())); + q, SLOT(_q_streamDisconnected())); Q_UNUSED(check); // add stream @@ -138,8 +141,12 @@ QList<QXmppStream*> QXmppServerPrivate::getStreams(const QString &to) const QString toDomain = jidToDomain(to); if (toDomain == domain) { // look for a client connection - foreach (QXmppIncomingClient *conn, incomingClients) { - if (conn->jid() == to || jidToBareJid(conn->jid()) == to) + if (jidToResource(to).isEmpty()) { + foreach (QXmppIncomingClient *conn, incomingClientsByBareJid.value(to)) + found << conn; + } else { + QXmppIncomingClient *conn = incomingClientsByJid.value(to); + if (conn) found << conn; } } else if (toDomain.endsWith("." + domain)) { @@ -373,12 +380,12 @@ QXmppServer::QXmppServer(QObject *parent) d = new QXmppServerPrivate(this); d->serverForClients = new QXmppSslServer(this); bool check = connect(d->serverForClients, SIGNAL(newConnection(QSslSocket*)), - this, SLOT(slotClientConnection(QSslSocket*))); + this, SLOT(_q_clientConnection(QSslSocket*))); Q_ASSERT(check); d->serverForServers = new QXmppSslServer(this); check = connect(d->serverForServers, SIGNAL(newConnection(QSslSocket*)), - this, SLOT(slotServerConnection(QSslSocket*))); + this, SLOT(_q_serverConnection(QSslSocket*))); Q_ASSERT(check); } @@ -641,15 +648,15 @@ void QXmppServer::addIncomingClient(QXmppIncomingClient *stream) stream->setPasswordChecker(d->passwordChecker); bool check = connect(stream, SIGNAL(connected()), - this, SLOT(slotStreamConnected())); + this, SLOT(_q_streamConnected())); Q_ASSERT(check); check = connect(stream, SIGNAL(disconnected()), - this, SLOT(slotStreamDisconnected())); + this, SLOT(_q_streamDisconnected())); Q_ASSERT(check); check = connect(stream, SIGNAL(elementReceived(QDomElement)), - this, SLOT(slotElementReceived(QDomElement))); + this, SLOT(_q_elementReceived(QDomElement))); Q_ASSERT(check); // add stream @@ -661,15 +668,21 @@ void QXmppServer::addIncomingClient(QXmppIncomingClient *stream) /// /// \param socket -void QXmppServer::slotClientConnection(QSslSocket *socket) +void QXmppServer::_q_clientConnection(QSslSocket *socket) { + // check the socket didn't die since the signal was emitted + if (socket->state() != QAbstractSocket::ConnectedState) { + delete socket; + return; + } + QXmppIncomingClient *stream = new QXmppIncomingClient(socket, d->domain, this); stream->setInactivityTimeout(120); socket->setParent(stream); addIncomingClient(stream); } -void QXmppServer::slotDialbackRequestReceived(const QXmppDialback &dialback) +void QXmppServer::_q_dialbackRequestReceived(const QXmppDialback &dialback) { QXmppIncomingServer *stream = qobject_cast<QXmppIncomingServer *>(sender()); if (!stream) @@ -698,7 +711,7 @@ void QXmppServer::slotDialbackRequestReceived(const QXmppDialback &dialback) /// Handle an incoming XML element. -void QXmppServer::slotElementReceived(const QDomElement &element) +void QXmppServer::_q_elementReceived(const QDomElement &element) { QXmppStream *incoming = qobject_cast<QXmppStream *>(sender()); if (!incoming) @@ -710,25 +723,31 @@ void QXmppServer::slotElementReceived(const QDomElement &element) /// /// \param socket -void QXmppServer::slotServerConnection(QSslSocket *socket) +void QXmppServer::_q_serverConnection(QSslSocket *socket) { + // check the socket didn't die since the signal was emitted + if (socket->state() != QAbstractSocket::ConnectedState) { + delete socket; + return; + } + QXmppIncomingServer *stream = new QXmppIncomingServer(socket, d->domain, this); socket->setParent(stream); bool check = connect(stream, SIGNAL(connected()), - this, SLOT(slotStreamConnected())); + this, SLOT(_q_streamConnected())); Q_ASSERT(check); check = connect(stream, SIGNAL(disconnected()), - this, SLOT(slotStreamDisconnected())); + this, SLOT(_q_streamDisconnected())); Q_ASSERT(check); check = connect(stream, SIGNAL(dialbackRequestReceived(QXmppDialback)), - this, SLOT(slotDialbackRequestReceived(QXmppDialback))); + this, SLOT(_q_dialbackRequestReceived(QXmppDialback))); Q_ASSERT(check); check = connect(stream, SIGNAL(elementReceived(QDomElement)), - this, SLOT(slotElementReceived(QDomElement))); + this, SLOT(_q_elementReceived(QDomElement))); Q_ASSERT(check); // add stream @@ -739,7 +758,7 @@ void QXmppServer::slotServerConnection(QSslSocket *socket) /// Handle a successful stream connection. /// -void QXmppServer::slotStreamConnected() +void QXmppServer::_q_streamConnected() { QXmppStream *stream = qobject_cast<QXmppStream*>(sender()); if (!stream) @@ -747,22 +766,22 @@ void QXmppServer::slotStreamConnected() // handle incoming clients QXmppIncomingClient *client = qobject_cast<QXmppIncomingClient *>(stream); - if (client) - { + if (client) { + // FIXME: at this point the JID must contain a resource, assert it? + const QString jid = client->jid(); + // check whether the connection conflicts with another one - foreach (QXmppIncomingClient *conn, d->incomingClients) - { - if (conn != client && conn->jid() == client->jid()) - { - conn->sendData("<stream:error><conflict xmlns='urn:ietf:params:xml:ns:xmpp-streams'/><text xmlns='urn:ietf:params:xml:ns:xmpp-streams'>Replaced by new connection</text></stream:error>"); - conn->disconnectFromHost(); - } + QXmppIncomingClient *old = d->incomingClientsByJid.value(jid); + if (old && old != client) { + old->sendData("<stream:error><conflict xmlns='urn:ietf:params:xml:ns:xmpp-streams'/><text xmlns='urn:ietf:params:xml:ns:xmpp-streams'>Replaced by new connection</text></stream:error>"); + old->disconnectFromHost(); } + d->incomingClientsByJid.insert(jid, client); + d->incomingClientsByBareJid[jidToBareJid(jid)].insert(client); } // flush queue - if (d->queues.contains(stream)) - { + if (d->queues.contains(stream)) { foreach (const QByteArray &data, d->queues[stream]) stream->sendData(data); d->queues.remove(stream); @@ -775,19 +794,18 @@ void QXmppServer::slotStreamConnected() /// Handle a stream disconnection. /// -void QXmppServer::slotStreamDisconnected() +void QXmppServer::_q_streamDisconnected() { // handle clients QXmppIncomingClient *stream = qobject_cast<QXmppIncomingClient *>(sender()); - if (stream && d->incomingClients.contains(stream)) - { + if (stream && d->incomingClients.contains(stream)) { const QString jid = stream->jid(); // check the user exited cleanly if (!jid.isEmpty()) { QDomDocument doc; QDomElement presence = doc.createElement("presence"); - presence.setAttribute("from", stream->jid()); + presence.setAttribute("from", jid); presence.setAttribute("type", "unavailable"); if (d->presences.value(jidToBareJid(jid)).contains(jid)) { @@ -803,6 +821,13 @@ void QXmppServer::slotStreamDisconnected() d->handleStanza(stream, presence); } } + + // remove stream from routing tables + if (d->incomingClientsByJid.value(jid) == stream) + d->incomingClientsByJid.remove(jid); + const QString bareJid = jidToBareJid(jid); + if (d->incomingClientsByBareJid.contains(bareJid)) + d->incomingClientsByBareJid[bareJid].remove(stream); } // remove stream @@ -865,9 +890,12 @@ QXmppSslServer::~QXmppSslServer() void QXmppSslServer::incomingConnection(int socketDescriptor) { QSslSocket *socket = new QSslSocket; - socket->setSocketDescriptor(socketDescriptor); - if (!d->localCertificate.isNull() && !d->privateKey.isNull()) - { + if (!socket->setSocketDescriptor(socketDescriptor)) { + delete socket; + return; + } + + if (!d->localCertificate.isNull() && !d->privateKey.isNull()) { socket->setProtocol(QSsl::AnyProtocol); socket->addCaCertificates(d->caCertificates); socket->setLocalCertificate(d->localCertificate); diff --git a/src/QXmppServer.h b/src/QXmppServer.h index 1bef6495..7a5bc6da 100644 --- a/src/QXmppServer.h +++ b/src/QXmppServer.h @@ -100,12 +100,12 @@ signals: void streamRemoved(QXmppStream *stream); private slots: - void slotClientConnection(QSslSocket *socket); - void slotDialbackRequestReceived(const QXmppDialback &dialback); - void slotElementReceived(const QDomElement &element); - void slotServerConnection(QSslSocket *socket); - void slotStreamConnected(); - void slotStreamDisconnected(); + void _q_clientConnection(QSslSocket *socket); + void _q_dialbackRequestReceived(const QXmppDialback &dialback); + void _q_elementReceived(const QDomElement &element); + void _q_serverConnection(QSslSocket *socket); + void _q_streamConnected(); + void _q_streamDisconnected(); private: friend class QXmppServerPrivate; diff --git a/src/QXmppStream.cpp b/src/QXmppStream.cpp index 2221411a..f87efcd0 100644 --- a/src/QXmppStream.cpp +++ b/src/QXmppStream.cpp @@ -96,9 +96,13 @@ void QXmppStream::disconnectFromHost() /// Handles a stream start event, which occurs when the underlying transport /// becomes ready (socket connected, encryption started). +/// +/// If you redefine handleStart(), make sure to call the base class's method. void QXmppStream::handleStart() { + d->dataBuffer.clear(); + d->streamStart.clear(); } /// Returns true if the stream is connected. @@ -172,19 +176,22 @@ void QXmppStream::setSocket(QSslSocket *socket) // socket events bool check = connect(socket, SIGNAL(connected()), - this, SLOT(socketConnected())); + this, SLOT(_q_socketConnected())); Q_ASSERT(check); check = connect(socket, SIGNAL(disconnected()), - this, SLOT(socketDisconnected())); + this, SLOT(_q_socketDisconnected())); Q_ASSERT(check); check = connect(socket, SIGNAL(encrypted()), - this, SLOT(socketEncrypted())); + this, SLOT(_q_socketEncrypted())); Q_ASSERT(check); + check = connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), + this, SLOT(_q_socketError(QAbstractSocket::SocketError))); + check = connect(socket, SIGNAL(readyRead()), - this, SLOT(socketReadyRead())); + this, SLOT(_q_socketReadyRead())); Q_ASSERT(check); // relay signals @@ -193,34 +200,34 @@ void QXmppStream::setSocket(QSslSocket *socket) Q_ASSERT(check); } -void QXmppStream::socketConnected() +void QXmppStream::_q_socketConnected() { info(QString("Socket connected to %1 %2").arg( d->socket->peerAddress().toString(), QString::number(d->socket->peerPort()))); - d->dataBuffer.clear(); handleStart(); } -void QXmppStream::socketDisconnected() +void QXmppStream::_q_socketDisconnected() { info("Socket disconnected"); - d->dataBuffer.clear(); } -void QXmppStream::socketEncrypted() +void QXmppStream::_q_socketEncrypted() { debug("Socket encrypted"); - d->dataBuffer.clear(); handleStart(); } -void QXmppStream::socketReadyRead() +void QXmppStream::_q_socketError(QAbstractSocket::SocketError socketError) { - const QByteArray data = d->socket->readAll(); - //debug("SERVER [COULD BE PARTIAL DATA]:" + data.left(20)); + Q_UNUSED(socketError); + warning(QString("Socket error: " + socket()->errorString())); +} - d->dataBuffer.append(data); +void QXmppStream::_q_socketReadyRead() +{ + d->dataBuffer.append(d->socket->readAll()); // FIXME : maybe these QRegExps could be static? QRegExp startStreamRegex("^(<\\?xml.*\\?>)?\\s*<stream:stream.*>"); @@ -232,36 +239,31 @@ void QXmppStream::socketReadyRead() QByteArray completeXml = d->dataBuffer; const QString strData = QString::fromUtf8(d->dataBuffer); bool streamStart = false; - if(strData.contains(startStreamRegex)) - { + if (d->streamStart.isEmpty() && strData.contains(startStreamRegex)) streamStart = true; - d->streamStart = startStreamRegex.cap(0).toUtf8(); - } else completeXml.prepend(d->streamStart); - if(!strData.contains(endStreamRegex)) + if (!strData.contains(endStreamRegex)) completeXml.append(streamRootElementEnd); // check whether we have a valid XML document QDomDocument doc; - if(!doc.setContent(completeXml, true)) + if (!doc.setContent(completeXml, true)) return; // remove data from buffer logReceived(strData); d->dataBuffer.clear(); + if (streamStart) + d->streamStart = startStreamRegex.cap(0).toUtf8(); // process stream start - QDomElement nodeRecv = doc.documentElement().firstChildElement(); if (streamStart) - { - QDomElement streamElement = doc.documentElement(); - handleStream(streamElement); - } + handleStream(doc.documentElement()); // process stanzas - while(!nodeRecv.isNull()) - { + QDomElement nodeRecv = doc.documentElement().firstChildElement(); + while (!nodeRecv.isNull()) { handleStanza(nodeRecv); nodeRecv = nodeRecv.nextSiblingElement(); } diff --git a/src/QXmppStream.h b/src/QXmppStream.h index f90fd101..2f31835a 100644 --- a/src/QXmppStream.h +++ b/src/QXmppStream.h @@ -26,6 +26,7 @@ #ifndef QXMPPSTREAM_H #define QXMPPSTREAM_H +#include <QAbstractSocket> #include <QObject> #include "QXmppLogger.h" @@ -78,10 +79,11 @@ protected: virtual void handleStream(const QDomElement &element) = 0; private slots: - void socketConnected(); - void socketDisconnected(); - void socketEncrypted(); - void socketReadyRead(); + void _q_socketConnected(); + void _q_socketDisconnected(); + void _q_socketEncrypted(); + void _q_socketError(QAbstractSocket::SocketError error); + void _q_socketReadyRead(); private: QXmppStreamPrivate * const d; |
