From 3caffcebf16680576d8dd785437eed16a6c5f36d Mon Sep 17 00:00:00 2001 From: Linus Jahn Date: Wed, 9 Sep 2020 15:05:23 +0200 Subject: Add reporting of IQ responses with QFutures --- src/base/QXmppStream.cpp | 133 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 132 insertions(+), 1 deletion(-) (limited to 'src/base/QXmppStream.cpp') diff --git a/src/base/QXmppStream.cpp b/src/base/QXmppStream.cpp index 7426f8b0..56f3afda 100644 --- a/src/base/QXmppStream.cpp +++ b/src/base/QXmppStream.cpp @@ -25,6 +25,7 @@ #include "QXmppStream.h" #include "QXmppConstants_p.h" +#include "QXmppIq.h" #include "QXmppLogger.h" #include "QXmppPacket_p.h" #include "QXmppStanza.h" @@ -35,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -47,6 +49,45 @@ static bool randomSeeded = false; #endif +class IqState : public QFutureInterface +{ + Q_DISABLE_COPY(IqState) + +public: + IqState(QFuture sendFuture, bool streamManagementUsed) + : QFutureInterface(QFutureInterfaceBase::Started), + m_sendFuture(std::move(sendFuture)), + m_streamManagementUsed(streamManagementUsed) + { + auto *watcher = new QFutureWatcher(); + QObject::connect(watcher, &QFutureWatcher::finished, [=]() { + const auto result = watcher->future().results().last(); + + switch (result) { + case QXmpp::Acknowledged: + case QXmpp::Sent: + break; + case QXmpp::NotSent: + reportResult(result); + reportFinished(); + break; + } + + watcher->deleteLater(); + }); + watcher->setFuture(m_sendFuture); + } + + bool isStreamManagementUsed() const + { + return m_streamManagementUsed; + } + +private: + QFuture m_sendFuture; + bool m_streamManagementUsed; +}; + class QXmppStreamPrivate { public: @@ -60,6 +101,9 @@ public: // stream management QXmppStreamManager streamManager; + + // iq response handling + QMap runningIqs; }; QXmppStreamPrivate::QXmppStreamPrivate(QXmppStream *stream) @@ -68,6 +112,17 @@ QXmppStreamPrivate::QXmppStreamPrivate(QXmppStream *stream) { } +/// +/// \typedef QXmppStream::IqResult +/// +/// Contains a QDomElement containing the IQ response or if the request couldn't +/// be sent a QXmpp::PacketState. +/// +/// \warning THIS API IS NOT FINALIZED YET! +/// +/// \since QXmpp 1.5 +/// + /// /// Constructs a base XMPP stream. /// @@ -91,6 +146,7 @@ QXmppStream::QXmppStream(QObject *parent) /// QXmppStream::~QXmppStream() { + cancelOngoingIqs(); delete d; } @@ -174,6 +230,54 @@ QFuture QXmppStream::send(const QXmppStanza &stanza) return packet.future(); } +/// +/// Sends an IQ packet and returns the response asynchronously. +/// +/// \warning THIS API IS NOT FINALIZED YET! +/// +/// \since QXmpp 1.5 +/// +QFuture QXmppStream::sendIq(const QXmppIq &iq) +{ + if (iq.id().isEmpty()) { + warning(QStringLiteral("QXmppStream::sendIq() error: ID is empty. Using random ID.")); + auto newIq = iq; + newIq.setId(QXmppUtils::generateStanzaUuid()); + return sendIq(newIq); + } + if (d->runningIqs.contains(iq.id())) { + warning(QStringLiteral("QXmppStream::sendIq() error:" + "The IQ's ID (\"%1\") is already in use. Using random ID.") + .arg(iq.id())); + auto newIq = iq; + newIq.setId(QXmppUtils::generateStanzaUuid()); + return sendIq(newIq); + } + + auto *interface = new IqState(send(iq), d->streamManager.enabled()); + + if (!interface->isFinished()) { + d->runningIqs.insert(iq.id(), interface); + } + + return interface->future(); +} + +/// +/// Cancels all ongoing IQ requests and reports QXmpp::NotSent. +/// +/// \since QXmpp 1.5 +/// +void QXmppStream::cancelOngoingIqs() +{ + for (auto *state : std::as_const(d->runningIqs)) { + state->reportResult(QXmpp::NotSent); + state->reportFinished(); + delete state; + } + d->runningIqs.clear(); +} + /// /// Resets the stream management packages cache. /// @@ -330,7 +434,7 @@ void QXmppStream::processData(const QString &data) auto stanza = doc.documentElement().firstChildElement(); for (; !stanza.isNull(); stanza = stanza.nextSiblingElement()) { // handle possible stream management packets first - if (d->streamManager.handleStanza(stanza)) + if (d->streamManager.handleStanza(stanza) || handleIqResponse(stanza)) continue; // process all other kinds of packets @@ -352,6 +456,33 @@ void QXmppStream::sendPacket(QXmppPacket &packet) } } +bool QXmppStream::handleIqResponse(const QDomElement &stanza) +{ + if (stanza.tagName() != QStringLiteral("iq")) { + return false; + } + + // only accept "result" and "error" types + const auto iqType = stanza.attribute(QStringLiteral("type")); + if (iqType != QStringLiteral("result") && iqType != QStringLiteral("error")) { + return false; + } + + if (auto itr = d->runningIqs.find(stanza.attribute(QStringLiteral("id"))); + itr != d->runningIqs.end()) { + + auto *state = itr.value(); + state->reportResult(stanza); + state->reportFinished(); + delete state; + + d->runningIqs.erase(itr); + return true; + } + + return false; +} + /// /// Enables Stream Management acks / reqs (\xep{0198}). /// -- cgit v1.2.3