aboutsummaryrefslogtreecommitdiff
path: root/src/QXmppServer.cpp
diff options
context:
space:
mode:
authorJeremy Lainé <jeremy.laine@m4x.org>2011-09-13 12:08:15 +0000
committerJeremy Lainé <jeremy.laine@m4x.org>2011-09-13 12:08:15 +0000
commit8eaf6c3def32f1aa07fe863d5eab69f44672b3a5 (patch)
treee6950f5dbceeb0e34ac98f7639a63049f8cf7c44 /src/QXmppServer.cpp
parent5d6e2cabfb730fa6b537397db342bed8eec15de1 (diff)
downloadqxmpp-8eaf6c3def32f1aa07fe863d5eab69f44672b3a5.tar.gz
* hide some QXmppServer internals
* improve QXmppServer performance
Diffstat (limited to 'src/QXmppServer.cpp')
-rw-r--r--src/QXmppServer.cpp487
1 files changed, 176 insertions, 311 deletions
diff --git a/src/QXmppServer.cpp b/src/QXmppServer.cpp
index 2be22877..0ab2e4be 100644
--- a/src/QXmppServer.cpp
+++ b/src/QXmppServer.cpp
@@ -21,12 +21,14 @@
*
*/
+#include <QCoreApplication>
#include <QDomElement>
#include <QFileInfo>
#include <QPluginLoader>
#include <QSslCertificate>
#include <QSslKey>
#include <QSslSocket>
+#include <QtConcurrentRun>
#include "QXmppConstants.h"
#include "QXmppDialback.h"
@@ -40,6 +42,8 @@
#include "QXmppServerPlugin.h"
#include "QXmppUtils.h"
+#include "server/mod_presence.h"
+
// Core plugins
Q_IMPORT_PLUGIN(mod_disco)
Q_IMPORT_PLUGIN(mod_ping)
@@ -52,12 +56,8 @@ class QXmppServerPrivate
{
public:
QXmppServerPrivate(QXmppServer *qq);
- QXmppOutgoingServer *connectToDomain(const QString &domain);
- QList<QXmppStream*> getStreams(const QString &to);
- void handleStanza(QXmppStream *stream, const QDomElement &element);
void loadExtensions(QXmppServer *server);
- QStringList presenceSubscribers(const QString &jid);
- QStringList presenceSubscriptions(const QString &jid);
+ bool routeData(const QString &to, const QByteArray &data);
void startExtensions();
void stopExtensions();
@@ -66,23 +66,19 @@ public:
QString domain;
QList<QXmppServerExtension*> extensions;
- // bare-jid -> full-jid -> presence
- QMap<QString, QMap<QString, QXmppPresence> > presences;
- QMap<QString, QSet<QString> > subscribers;
QXmppLogger *logger;
QXmppPasswordChecker *passwordChecker;
// client-to-server
QXmppSslServer *serverForClients;
- QList<QXmppIncomingClient*> incomingClients;
- QMap<QString, QXmppIncomingClient*> incomingClientsByJid;
- QMap<QString, QSet<QXmppIncomingClient*> > incomingClientsByBareJid;
+ QSet<QXmppIncomingClient*> incomingClients;
+ QHash<QString, QXmppIncomingClient*> incomingClientsByJid;
+ QHash<QString, QSet<QXmppIncomingClient*> > incomingClientsByBareJid;
// server-to-server
- QList<QXmppIncomingServer*> incomingServers;
- QList<QXmppOutgoingServer*> outgoingServers;
+ QSet<QXmppIncomingServer*> incomingServers;
+ QSet<QXmppOutgoingServer*> outgoingServers;
QXmppSslServer *serverForServers;
- QMap<QXmppStream*, QList<QByteArray> > queues;
private:
bool loaded;
@@ -99,48 +95,23 @@ QXmppServerPrivate::QXmppServerPrivate(QXmppServer *qq)
{
}
-/// Returns a new outgoing server-to-server connection to the given domain.
-///
-/// \param toDomain
-
-QXmppOutgoingServer* QXmppServerPrivate::connectToDomain(const QString &toDomain)
-{
- bool check;
-
- // initialise outgoing server-to-server
- QXmppOutgoingServer *stream = new QXmppOutgoingServer(domain, q);
- stream->setLocalStreamKey(generateStanzaHash().toAscii());
-
- check = QObject::connect(stream, SIGNAL(connected()),
- q, SLOT(_q_streamConnected()));
- Q_ASSERT(check);
-
- check = QObject::connect(stream, SIGNAL(disconnected()),
- q, SLOT(_q_streamDisconnected()));
- Q_UNUSED(check);
-
- // add stream
- outgoingServers.append(stream);
- emit q->streamAdded(stream);
-
- // connect to remote server
- stream->connectToHost(toDomain);
- return stream;
-}
-
-/// Returns the XMPP streams for the given recipient.
+/// Routes XMPP data to the given recipient.
///
/// \param to
+/// \param data
///
-QList<QXmppStream*> QXmppServerPrivate::getStreams(const QString &to)
+bool QXmppServerPrivate::routeData(const QString &to, const QByteArray &data)
{
- QList<QXmppStream*> found;
- if (to.isEmpty())
- return found;
+ // refuse to route packets to empty destination, own domain or sub-domains
const QString toDomain = jidToDomain(to);
+ if (to.isEmpty() || to == domain || toDomain.endsWith("." + domain))
+ return false;
+
if (toDomain == domain) {
+
// look for a client connection
+ QList<QXmppIncomingClient*> found;
if (jidToResource(to).isEmpty()) {
foreach (QXmppIncomingClient *conn, incomingClientsByBareJid.value(to))
found << conn;
@@ -149,88 +120,71 @@ QList<QXmppStream*> QXmppServerPrivate::getStreams(const QString &to)
if (conn)
found << conn;
}
- } else if (toDomain.endsWith("." + domain)) {
- // refuse to route packets to sub-domains
- return found;
- } else {
+
+ // send data
+ foreach (QXmppStream *conn, found)
+ QMetaObject::invokeMethod(conn, "sendData", Q_ARG(QByteArray, data));
+ return !found.isEmpty();
+
+ } else if (serverForServers->isListening()) {
+
+ bool check;
+ Q_UNUSED(check);
+
// look for an outgoing S2S connection
foreach (QXmppOutgoingServer *conn, outgoingServers) {
if (conn->remoteDomain() == toDomain) {
- found << conn;
- break;
+ // send or queue data
+ QMetaObject::invokeMethod(conn, "queueData", Q_ARG(QByteArray, data));
+ return true;
}
}
// if we did not find an outgoing server,
// we need to establish the S2S connection
- if (found.isEmpty() && serverForServers->isListening())
- found << connectToDomain(toDomain);
+ QXmppOutgoingServer *conn = new QXmppOutgoingServer(domain, 0);
+ conn->setLocalStreamKey(generateStanzaHash().toAscii());
+ conn->moveToThread(q->thread());
+ conn->setParent(q);
+
+ check = QObject::connect(conn, SIGNAL(disconnected()),
+ q, SLOT(_q_outgoingServerDisconnected()));
+ Q_UNUSED(check);
+
+ // add stream
+ outgoingServers.insert(conn);
+
+ // queue data and connect to remote server
+ QMetaObject::invokeMethod(conn, "queueData", Q_ARG(QByteArray, data));
+ QMetaObject::invokeMethod(conn, "connectToHost", Q_ARG(QString, toDomain));
+ return true;
+
+ } else {
+
+ // S2S is disabled, failed to route data
+ return false;
+
}
- return found;
}
/// Handles an incoming XML element.
///
+/// \param server
/// \param stream
/// \param element
-void QXmppServerPrivate::handleStanza(QXmppStream *stream, const QDomElement &element)
+static void handleStanza(QXmppServer *server, const QDomElement &element)
{
// try extensions
- foreach (QXmppServerExtension *extension, extensions)
- if (extension->handleStanza(stream, element))
+ foreach (QXmppServerExtension *extension, server->extensions())
+ if (extension->handleStanza(element))
return;
// default handlers
+ const QString domain = server->domain();
const QString to = element.attribute("to");
if (to == domain) {
- if (element.tagName() == "presence") {
- // presence to the local domain, broadcast it to subscribers
- if (element.attribute("type").isEmpty() || element.attribute("type") == "unavailable") {
- const QString from = element.attribute("from");
- const QString bareFrom = jidToBareJid(from);
- bool isInitial = false;
-
- // record the presence for future use
- QXmppPresence presence;
- presence.parse(element);
- if (presence.type() == QXmppPresence::Available) {
- isInitial = !presences.value(bareFrom).contains(from);
- presences[bareFrom][from] = presence;
- } else {
- presences[bareFrom].remove(from);
- }
-
- // broadcast it to subscribers
- foreach (const QString &subscriber, presenceSubscribers(from)) {
- // avoid loop
- if (subscriber == to)
- continue;
- QDomElement changed(element);
- changed.setAttribute("to", subscriber);
- handleStanza(stream, changed);
- }
-
- // get presences from subscriptions
- if (isInitial) {
- foreach (const QString &subscription, presenceSubscriptions(from)) {
- if (jidToDomain(subscription) != domain) {
- QXmppPresence probe;
- probe.setType(QXmppPresence::Probe);
- probe.setFrom(from);
- probe.setTo(subscription);
- q->sendPacket(probe);
- } else {
- QXmppPresence push;
- foreach (push, presences.value(subscription).values()) {
- push.setTo(from);
- q->sendPacket(push);
- }
- }
- }
- }
- }
- } else if (element.tagName() == "iq") {
+ if (element.tagName() == QLatin1String("iq")) {
// we do not support the given IQ
QXmppIq request;
request.parse(element);
@@ -243,26 +197,14 @@ void QXmppServerPrivate::handleStanza(QXmppStream *stream, const QDomElement &el
QXmppStanza::Error error(QXmppStanza::Error::Cancel,
QXmppStanza::Error::FeatureNotImplemented);
response.setError(error);
- stream->sendPacket(response);
+ server->sendPacket(response);
}
}
} else {
- if (element.tagName() == "presence") {
- // directed presence, update subscribers
- QXmppPresence presence;
- presence.parse(element);
-
- const QString from = presence.from();
- if (presence.type() == QXmppPresence::Available)
- subscribers[from].insert(to);
- else if (presence.type() == QXmppPresence::Unavailable)
- subscribers[from].remove(to);
- }
-
// route element or reply on behalf of missing peer
- if (!q->sendElement(element) && element.tagName() == "iq") {
+ if (!server->sendElement(element) && element.tagName() == QLatin1String("iq")) {
QXmppIq request;
request.parse(element);
@@ -273,7 +215,7 @@ void QXmppServerPrivate::handleStanza(QXmppStream *stream, const QDomElement &el
QXmppStanza::Error error(QXmppStanza::Error::Cancel,
QXmppStanza::Error::ServiceUnavailable);
response.setError(error);
- stream->sendPacket(response);
+ server->sendPacket(response);
}
}
}
@@ -308,40 +250,12 @@ void QXmppServerPrivate::loadExtensions(QXmppServer *server)
foreach (const QString &key, plugin->keys())
server->addExtension(plugin->create(key));
}
- loaded = true;
- }
-}
-
-QStringList QXmppServerPrivate::presenceSubscribers(const QString &jid)
-{
- // start with directed presences
- QSet<QString> recipients = subscribers.value(jid);
-
- // query extensions
- foreach (QXmppServerExtension *extension, extensions)
- {
- const QStringList extras = extension->presenceSubscribers(jid);
- foreach (const QString &extra, extras)
- recipients.insert(extra);
- }
-
- return recipients.toList();
-}
-QStringList QXmppServerPrivate::presenceSubscriptions(const QString &jid)
-{
- // FIXME : start with directed presences?
- QSet<QString> recipients;
-
- // query extensions
- foreach (QXmppServerExtension *extension, extensions)
- {
- const QStringList extras = extension->presenceSubscriptions(jid);
- foreach (const QString &extra, extras)
- recipients.insert(extra);
+ // FIXME: until we can handle presence errors, we need to
+ // keep this extension last.
+ server->addExtension(new QXmppServerPresence);
+ loaded = true;
}
-
- return recipients.toList();
}
/// Start the server's extensions.
@@ -380,6 +294,8 @@ QXmppServer::QXmppServer(QObject *parent)
bool check;
Q_UNUSED(check);
+ qRegisterMetaType<QDomElement>("QDomElement");
+
d = new QXmppServerPrivate(this);
d->serverForClients = new QXmppSslServer(this);
check = connect(d->serverForClients, SIGNAL(newConnection(QSslSocket*)),
@@ -424,15 +340,6 @@ QList<QXmppServerExtension*> QXmppServer::extensions()
return d->extensions;
}
-/// Returns the list of available resources for the given local JID.
-///
-/// \param bareJid
-
-QList<QXmppPresence> QXmppServer::availablePresences(const QString &bareJid)
-{
- return d->presences.value(bareJid).values();
-}
-
/// Returns the server's domain.
///
@@ -492,6 +399,18 @@ void QXmppServer::setPasswordChecker(QXmppPasswordChecker *checker)
d->passwordChecker = checker;
}
+/// Returns the statistics for the server.
+
+QVariantMap QXmppServer::statistics() const
+{
+ QVariantMap stats;
+ stats["version"] = qApp->applicationVersion();
+ stats["incoming-clients"] = d->incomingClients.size();
+ stats["incoming-servers"] = d->incomingServers.size();
+ stats["outgoing-servers"] = d->outgoingServers.size();
+ return stats;
+}
+
/// Sets the path for additional SSL CA certificates.
///
/// \param path
@@ -602,22 +521,14 @@ bool QXmppServer::listenForServers(const QHostAddress &address, quint16 port)
bool QXmppServer::sendElement(const QDomElement &element)
{
- bool sent = false;
- const QString to = element.attribute("to");
- foreach (QXmppStream *conn, d->getStreams(to)) {
- if (conn->isConnected() && conn->sendElement(element)) {
- sent = true;
- } else {
- // queue packet
- QByteArray data;
- QXmlStreamWriter xmlStream(&data);
- const QStringList omitNamespaces = QStringList() << ns_client << ns_server;
- helperToXmlAddDomElement(&xmlStream, element, omitNamespaces);
- d->queues[conn] << data;
- sent = true;
- }
- }
- return sent;
+ // serialize data
+ QByteArray data;
+ QXmlStreamWriter xmlStream(&data);
+ const QStringList omitNamespaces = QStringList() << ns_client << ns_server;
+ helperToXmlAddDomElement(&xmlStream, element, omitNamespaces);
+
+ // route data
+ return d->routeData(element.attribute("to"), data);
}
/// Route an XMPP packet.
@@ -626,20 +537,13 @@ bool QXmppServer::sendElement(const QDomElement &element)
bool QXmppServer::sendPacket(const QXmppStanza &packet)
{
- bool sent = false;
- foreach (QXmppStream *conn, d->getStreams(packet.to())) {
- if (conn->isConnected() && conn->sendPacket(packet)) {
- sent = true;
- } else {
- // queue packet
- QByteArray data;
- QXmlStreamWriter xmlStream(&data);
- packet.toXml(&xmlStream);
- d->queues[conn] << data;
- sent = true;
- }
- }
- return sent;
+ // serialize data
+ QByteArray data;
+ QXmlStreamWriter xmlStream(&data);
+ packet.toXml(&xmlStream);
+
+ // route data
+ return d->routeData(packet.to(), data);
}
/// Add a new incoming client stream.
@@ -654,20 +558,19 @@ void QXmppServer::addIncomingClient(QXmppIncomingClient *stream)
stream->setPasswordChecker(d->passwordChecker);
check = connect(stream, SIGNAL(connected()),
- this, SLOT(_q_streamConnected()));
+ this, SLOT(_q_clientConnected()));
Q_ASSERT(check);
check = connect(stream, SIGNAL(disconnected()),
- this, SLOT(_q_streamDisconnected()));
+ this, SLOT(_q_clientDisconnected()));
Q_ASSERT(check);
check = connect(stream, SIGNAL(elementReceived(QDomElement)),
- this, SLOT(_q_elementReceived(QDomElement)));
+ this, SLOT(handleElement(QDomElement)));
Q_ASSERT(check);
// add stream
- d->incomingClients.append(stream);
- emit streamAdded(stream);
+ d->incomingClients.insert(stream);
}
/// Handle a new incoming TCP connection from a client.
@@ -688,6 +591,59 @@ void QXmppServer::_q_clientConnection(QSslSocket *socket)
addIncomingClient(stream);
}
+/// Handle a successful stream connection for a client.
+///
+
+void QXmppServer::_q_clientConnected()
+{
+ QXmppIncomingClient *client = qobject_cast<QXmppIncomingClient*>(sender());
+ if (!client)
+ return;
+
+ // FIXME: at this point the JID must contain a resource, assert it?
+ const QString jid = client->jid();
+
+ // check whether the connection conflicts with another one
+ QXmppIncomingClient *old = d->incomingClientsByJid.value(jid);
+ if (old && old != client) {
+ old->sendData("<stream:error><conflict xmlns='urn:ietf:params:xml:ns:xmpp-streams'/><text xmlns='urn:ietf:params:xml:ns:xmpp-streams'>Replaced by new connection</text></stream:error>");
+ old->disconnectFromHost();
+ }
+ d->incomingClientsByJid.insert(jid, client);
+ d->incomingClientsByBareJid[jidToBareJid(jid)].insert(client);
+
+ // emit signal
+ emit clientConnected(jid);
+}
+
+/// Handle a stream disconnection for a client.
+
+void QXmppServer::_q_clientDisconnected()
+{
+ QXmppIncomingClient *client = qobject_cast<QXmppIncomingClient *>(sender());
+ if (!client)
+ return;
+
+ if (d->incomingClients.remove(client)) {
+ // remove stream from routing tables
+ const QString jid = client->jid();
+ if (!jid.isEmpty()) {
+ if (d->incomingClientsByJid.value(jid) == client)
+ d->incomingClientsByJid.remove(jid);
+ const QString bareJid = jidToBareJid(jid);
+ if (d->incomingClientsByBareJid.contains(bareJid))
+ d->incomingClientsByBareJid[bareJid].remove(client);
+ }
+
+ // destroy client
+ client->deleteLater();
+
+ // emit signal
+ if (!jid.isEmpty())
+ emit clientDisconnected(jid);
+ }
+}
+
void QXmppServer::_q_dialbackRequestReceived(const QXmppDialback &dialback)
{
QXmppIncomingServer *stream = qobject_cast<QXmppIncomingServer *>(sender());
@@ -697,8 +653,7 @@ void QXmppServer::_q_dialbackRequestReceived(const QXmppDialback &dialback)
if (dialback.command() == QXmppDialback::Verify)
{
// handle a verify request
- foreach (QXmppOutgoingServer *out, d->outgoingServers)
- {
+ foreach (QXmppOutgoingServer *out, d->outgoingServers) {
if (out->remoteDomain() != dialback.from())
continue;
@@ -717,12 +672,21 @@ void QXmppServer::_q_dialbackRequestReceived(const QXmppDialback &dialback)
/// Handle an incoming XML element.
-void QXmppServer::_q_elementReceived(const QDomElement &element)
+void QXmppServer::handleElement(const QDomElement &element)
{
- QXmppStream *incoming = qobject_cast<QXmppStream *>(sender());
- if (!incoming)
+ handleStanza(this, element);
+}
+
+/// Handle a stream disconnection for an outgoing server.
+
+void QXmppServer::_q_outgoingServerDisconnected()
+{
+ QXmppOutgoingServer *outgoing = qobject_cast<QXmppOutgoingServer *>(sender());
+ if (!outgoing)
return;
- d->handleStanza(incoming, element);
+
+ if (d->outgoingServers.remove(outgoing))
+ outgoing->deleteLater();
}
/// Handle a new incoming TCP connection from a server.
@@ -743,12 +707,8 @@ void QXmppServer::_q_serverConnection(QSslSocket *socket)
QXmppIncomingServer *stream = new QXmppIncomingServer(socket, d->domain, this);
socket->setParent(stream);
- check = connect(stream, SIGNAL(connected()),
- this, SLOT(_q_streamConnected()));
- Q_ASSERT(check);
-
check = connect(stream, SIGNAL(disconnected()),
- this, SLOT(_q_streamDisconnected()));
+ this, SLOT(_q_serverDisconnected()));
Q_ASSERT(check);
check = connect(stream, SIGNAL(dialbackRequestReceived(QXmppDialback)),
@@ -756,118 +716,23 @@ void QXmppServer::_q_serverConnection(QSslSocket *socket)
Q_ASSERT(check);
check = connect(stream, SIGNAL(elementReceived(QDomElement)),
- this, SLOT(_q_elementReceived(QDomElement)));
+ this, SLOT(handleElement(QDomElement)));
Q_ASSERT(check);
// add stream
- d->incomingServers.append(stream);
- emit streamAdded(stream);
-}
-
-/// Handle a successful stream connection.
-///
-
-void QXmppServer::_q_streamConnected()
-{
- QXmppStream *stream = qobject_cast<QXmppStream*>(sender());
- if (!stream)
- return;
-
- // handle incoming clients
- QXmppIncomingClient *client = qobject_cast<QXmppIncomingClient *>(stream);
- if (client) {
- // FIXME: at this point the JID must contain a resource, assert it?
- const QString jid = client->jid();
-
- // check whether the connection conflicts with another one
- QXmppIncomingClient *old = d->incomingClientsByJid.value(jid);
- if (old && old != client) {
- old->sendData("<stream:error><conflict xmlns='urn:ietf:params:xml:ns:xmpp-streams'/><text xmlns='urn:ietf:params:xml:ns:xmpp-streams'>Replaced by new connection</text></stream:error>");
- old->disconnectFromHost();
- }
- d->incomingClientsByJid.insert(jid, client);
- d->incomingClientsByBareJid[jidToBareJid(jid)].insert(client);
- }
-
- // flush queue
- if (d->queues.contains(stream)) {
- foreach (const QByteArray &data, d->queues[stream])
- stream->sendData(data);
- d->queues.remove(stream);
- }
-
- // emit signal
- emit streamConnected(stream);
+ d->incomingServers.insert(stream);
}
-/// Handle a stream disconnection.
-///
+/// Handle a stream disconnection for an incoming server.
-void QXmppServer::_q_streamDisconnected()
+void QXmppServer::_q_serverDisconnected()
{
- // handle clients
- QXmppIncomingClient *stream = qobject_cast<QXmppIncomingClient *>(sender());
- if (stream && d->incomingClients.contains(stream)) {
- const QString jid = stream->jid();
-
- // check the user exited cleanly
- if (!jid.isEmpty()) {
- QDomDocument doc;
- QDomElement presence = doc.createElement("presence");
- presence.setAttribute("from", jid);
- presence.setAttribute("type", "unavailable");
-
- if (d->presences.value(jidToBareJid(jid)).contains(jid)) {
- // the client had sent an initial available presence but did
- // not sent an unavailable presence, synthesize it
- presence.setAttribute("to", d->domain);
- d->handleStanza(stream, presence);
- } else {
- // synthesize unavailable presence to directed presence receivers
- const QSet<QString> recipients = d->subscribers.value(jid);
- foreach (const QString &recipient, recipients) {
- presence.setAttribute("to", recipient);
- d->handleStanza(stream, presence);
- }
- }
-
- // remove stream from routing tables
- if (d->incomingClientsByJid.value(jid) == stream)
- d->incomingClientsByJid.remove(jid);
- const QString bareJid = jidToBareJid(jid);
- if (d->incomingClientsByBareJid.contains(bareJid))
- d->incomingClientsByBareJid[bareJid].remove(stream);
- }
-
- // remove stream
- d->incomingClients.removeAll(stream);
- d->queues.remove(stream);
- emit streamRemoved(stream);
- stream->deleteLater();
- return;
- }
-
- // handle incoming streams
QXmppIncomingServer *incoming = qobject_cast<QXmppIncomingServer *>(sender());
- if (incoming && d->incomingServers.contains(incoming))
- {
- d->incomingServers.removeAll(incoming);
- d->queues.remove(incoming);
- emit streamRemoved(incoming);
- incoming->deleteLater();
+ if (!incoming)
return;
- }
- // handle outgoing streams
- QXmppOutgoingServer *outgoing = qobject_cast<QXmppOutgoingServer *>(sender());
- if (outgoing && d->outgoingServers.contains(outgoing))
- {
- d->outgoingServers.removeAll(outgoing);
- d->queues.remove(outgoing);
- emit streamRemoved(outgoing);
- outgoing->deleteLater();
- return;
- }
+ if (d->incomingServers.remove(incoming))
+ incoming->deleteLater();
}
class QXmppSslServerPrivate