From eb82be34f09a83fb385f0bfef54eddcd254838aa Mon Sep 17 00:00:00 2001 From: Jeremy Lainé Date: Wed, 3 Mar 2010 15:24:13 +0000 Subject: add support for SOCKS5 proxies --- source/QXmppTransferManager.cpp | 263 ++++++++++++++++++++++++++++++++-------- 1 file changed, 213 insertions(+), 50 deletions(-) (limited to 'source/QXmppTransferManager.cpp') diff --git a/source/QXmppTransferManager.cpp b/source/QXmppTransferManager.cpp index fabbc2ee..fe4d41ed 100644 --- a/source/QXmppTransferManager.cpp +++ b/source/QXmppTransferManager.cpp @@ -62,6 +62,12 @@ QXmppTransferJob::QXmppTransferJob(const QString &jid, QXmppTransferJob::Directi { } +void QXmppTransferJob::abort() +{ + // FIXME : actually abort job cleanly + terminate(AbortError); +} + void QXmppTransferJob::accept(QIODevice *iodevice) { if (!m_iodevice) @@ -194,16 +200,33 @@ QXmppTransferManager::QXmppTransferManager(QXmppClient *client) void QXmppTransferManager::byteStreamIqReceived(const QXmppByteStreamIq &iq) { + // handle IQ from proxy + foreach (QXmppTransferJob *job, m_jobs) + { + if (job->m_socksProxy.jid() == iq.getFrom() && job->m_requestId == iq.getId()) + { + if (iq.getType() == QXmppIq::Result && iq.streamHosts().size() > 0) + { + job->m_socksProxy = iq.streamHosts().first(); + socksServerSendOffer(job); + return; + } + } + } + if (iq.getType() == QXmppIq::Result) byteStreamResultReceived(iq); else if (iq.getType() == QXmppIq::Set) byteStreamSetReceived(iq); } +/// Handle a response to a bystream set, i.e. after we informed the remote party +/// that we connected to a stream host. void QXmppTransferManager::byteStreamResponseReceived(const QXmppIq &iq) { QXmppTransferJob *job = getJobByRequestId(iq.getFrom(), iq.getId()); if (!job || + job->direction() != QXmppTransferJob::IncomingDirection || job->method() != QXmppTransferJob::SocksMethod || job->state() != QXmppTransferJob::StartState) return; @@ -212,23 +235,60 @@ void QXmppTransferManager::byteStreamResponseReceived(const QXmppIq &iq) job->terminate(QXmppTransferJob::ProtocolError); } -/// Handle a bytestream result, i.e. after the remote party has connected to our stream host. +/// Handle a bytestream result, i.e. after the remote party has connected to +/// a stream host. void QXmppTransferManager::byteStreamResultReceived(const QXmppByteStreamIq &iq) { QXmppTransferJob *job = getJobByRequestId(iq.getFrom(), iq.getId()); if (!job || + job->direction() != QXmppTransferJob::OutgoingDirection || job->method() != QXmppTransferJob::SocksMethod || job->state() != QXmppTransferJob::StartState) return; - // start sending data + // check the stream host + if (iq.streamHostUsed() == job->m_socksProxy.jid()) + { + const QXmppByteStreamIq::StreamHost streamHost = job->m_socksProxy; + qDebug() << "Connecting to proxy" << streamHost.jid(); + qDebug() << " host:" << streamHost.host().toString(); + qDebug() << " port:" << streamHost.port(); + + // connect to proxy + const QString hostName = streamHash(job->m_sid, + m_client->getConfiguration().getJid(), + job->m_jid); + + job->m_socksClient = new QXmppSocksClient(streamHost.host(), streamHost.port(), job); + job->m_socksClient->connectToHost(hostName, 0); + if (!job->m_socksClient->waitForConnected()) + { + qWarning() << "Failed to connect to" << streamHost.host().toString() << streamHost.port() << ":" << job->m_socksClient->errorString(); + job->terminate(QXmppTransferJob::ProtocolError); + return; + } + + // activate stream + QXmppByteStreamIq streamIq; + streamIq.setType(QXmppIq::Set); + streamIq.setFrom(m_client->getConfiguration().getJid()); + streamIq.setTo(streamHost.jid()); + streamIq.setSid(job->m_sid); + streamIq.setActivate(job->m_jid); + job->m_requestId = streamIq.getId(); + m_client->sendPacket(streamIq); + return; + } + + // direction connection, start sending data job->setState(QXmppTransferJob::TransferState); connect(job->m_socksServer, SIGNAL(bytesWritten(qint64)), this, SLOT(socksServerDataSent())); connect(job->m_socksServer, SIGNAL(disconnected()), this, SLOT(socksServerDisconnected())); socksServerSendData(job); } -/// Handle a bytestream set, i.e. an invitation from the remote party to connect to their stream host. +/// Handle a bytestream set, i.e. an invitation from the remote party to connect +/// to a stream host. void QXmppTransferManager::byteStreamSetReceived(const QXmppByteStreamIq &iq) { QXmppIq response; @@ -237,6 +297,7 @@ void QXmppTransferManager::byteStreamSetReceived(const QXmppByteStreamIq &iq) QXmppTransferJob *job = getJobBySid(iq.getFrom(), iq.sid()); if (!job || + job->direction() != QXmppTransferJob::IncomingDirection || job->method() != QXmppTransferJob::SocksMethod || job->state() != QXmppTransferJob::StartState) { @@ -256,9 +317,9 @@ void QXmppTransferManager::byteStreamSetReceived(const QXmppByteStreamIq &iq) qDebug() << " host:" << streamHost.host().toString(); qDebug() << " port:" << streamHost.port(); - QString hostName = streamHash(job->m_sid, - streamHost.jid(), - m_client->getConfiguration().getJid()); + const QString hostName = streamHash(job->m_sid, + job->m_jid, + m_client->getConfiguration().getJid()); // try to connect to stream host job->m_socksClient = new QXmppSocksClient(streamHost.host(), streamHost.port(), job); @@ -333,7 +394,9 @@ void QXmppTransferManager::ibbCloseIqReceived(const QXmppIbbCloseIq &iq) response.setId(iq.getId()); QXmppTransferJob *job = getJobBySid(iq.getFrom(), iq.getSid()); - if (!job || job->method() != QXmppTransferJob::InBandMethod) + if (!job || + job->direction() != QXmppTransferJob::IncomingDirection || + job->method() != QXmppTransferJob::InBandMethod) { // the job is unknown, cancel it QXmppStanza::Error error(QXmppStanza::Error::Cancel, QXmppStanza::Error::ItemNotFound); @@ -358,7 +421,9 @@ void QXmppTransferManager::ibbDataIqReceived(const QXmppIbbDataIq &iq) response.setId(iq.getId()); QXmppTransferJob *job = getJobBySid(iq.getFrom(), iq.getSid()); - if (!job || job->method() != QXmppTransferJob::InBandMethod) + if (!job || + job->direction() != QXmppTransferJob::IncomingDirection || + job->method() != QXmppTransferJob::InBandMethod) { // the job is unknown, cancel it QXmppStanza::Error error(QXmppStanza::Error::Cancel, QXmppStanza::Error::ItemNotFound); @@ -394,7 +459,9 @@ void QXmppTransferManager::ibbOpenIqReceived(const QXmppIbbOpenIq &iq) response.setId(iq.getId()); QXmppTransferJob *job = getJobBySid(iq.getFrom(), iq.getSid()); - if (!job || job->method() != QXmppTransferJob::InBandMethod) + if (!job || + job->direction() != QXmppTransferJob::IncomingDirection || + job->method() != QXmppTransferJob::InBandMethod) { // the job is unknown, cancel it QXmppStanza::Error error(QXmppStanza::Error::Cancel, QXmppStanza::Error::ItemNotFound); @@ -426,6 +493,7 @@ void QXmppTransferManager::ibbResponseReceived(const QXmppIq &iq) { QXmppTransferJob *job = getJobByRequestId(iq.getFrom(), iq.getId()); if (!job || + job->direction() != QXmppTransferJob::OutgoingDirection || job->method() != QXmppTransferJob::InBandMethod || job->state() == QXmppTransferJob::FinishedState) return; @@ -477,6 +545,35 @@ void QXmppTransferManager::ibbResponseReceived(const QXmppIq &iq) void QXmppTransferManager::iqReceived(const QXmppIq &iq) { + // handle IQ from proxy + foreach (QXmppTransferJob *job, m_jobs) + { + if (job->m_socksProxy.jid() == iq.getFrom() && job->m_requestId == iq.getId()) + { + if (job->m_socksClient) + { + // proxy connection activation result + if (iq.getType() == QXmppIq::Result) + { + // proxy stream activated, start sending data + job->setState(QXmppTransferJob::TransferState); + connect(job->m_socksClient, SIGNAL(bytesWritten(qint64)), this, SLOT(socksProxyDataSent())); + connect(job->m_socksClient, SIGNAL(disconnected()), this, SLOT(socksProxyDisconnected())); + socksServerSendData(job); + } else if (iq.getType() == QXmppIq::Error) { + // proxy stream not activated, terminate + qWarning("Could not activate SOCKS5 proxy bytestream"); + job->terminate(QXmppTransferJob::ProtocolError); + } + } else { + // we could not get host/port from proxy, procede without a proxy + if (iq.getType() == QXmppIq::Error) + socksServerSendOffer(job); + } + return; + } + } + QXmppTransferJob *job = getJobByRequestId(iq.getFrom(), iq.getId()); if (!job) return; @@ -621,6 +718,28 @@ void QXmppTransferManager::socksClientDisconnected() job->checkData(); } +void QXmppTransferManager::socksProxyDataSent() +{ + QXmppSocksClient *socksClient = qobject_cast(sender()); + QXmppTransferJob *job = getJobBySocksClient(socksClient); + if (!job || job->state() != QXmppTransferJob::TransferState) + return; + + // send next data block + socksServerSendData(job); +} + +void QXmppTransferManager::socksProxyDisconnected() +{ + QXmppSocksClient *socksClient = qobject_cast(sender()); + QXmppTransferJob *job = getJobBySocksClient(socksClient); + if (!job || job->state() == QXmppTransferJob::FinishedState) + return; + + // terminate transfer + job->terminate(QXmppTransferJob::ProtocolError); +} + void QXmppTransferManager::socksServerDataSent() { QXmppSocksServer *socksServer = qobject_cast(sender()); @@ -648,7 +767,10 @@ void QXmppTransferManager::socksServerSendData(QXmppTransferJob *job) const QByteArray buffer = job->m_iodevice->read(job->m_blockSize); if (buffer.size()) { - job->m_socksServer->write(buffer); + if (job->m_socksClient) + job->m_socksClient->write(buffer); + else + job->m_socksServer->write(buffer); job->m_done += buffer.size(); job->progress(job->m_done, job->fileSize()); @@ -658,6 +780,53 @@ void QXmppTransferManager::socksServerSendData(QXmppTransferJob *job) } } +void QXmppTransferManager::socksServerSendOffer(QXmppTransferJob *job) +{ + const QString ownJid = m_client->getConfiguration().getJid(); + + // discover local IPs + QList streamHosts; + foreach (const QNetworkInterface &interface, QNetworkInterface::allInterfaces()) + { + if (!(interface.flags() & QNetworkInterface::IsRunning) || + interface.flags() & QNetworkInterface::IsLoopBack) + continue; + + foreach (const QNetworkAddressEntry &entry, interface.addressEntries()) + { + if (entry.ip().protocol() != QAbstractSocket::IPv4Protocol || + entry.netmask().isNull() || + entry.netmask() == QHostAddress::Broadcast) + continue; + + QXmppByteStreamIq::StreamHost streamHost; + streamHost.setHost(entry.ip()); + streamHost.setPort(job->m_socksServer->serverPort()); + streamHost.setJid(ownJid); + streamHosts.append(streamHost); + } + } + if (!job->m_socksProxy.jid().isEmpty()) + streamHosts.append(job->m_socksProxy); + + // check we have some stream hosts + if (!streamHosts.size()) + { + qWarning("Could not determine local stream hosts"); + job->terminate(QXmppTransferJob::ProtocolError); + return; + } + + // send offer + QXmppByteStreamIq streamIq; + streamIq.setType(QXmppIq::Set); + streamIq.setTo(job->m_jid); + streamIq.setSid(job->m_sid); + streamIq.setStreamHosts(streamHosts); + job->m_requestId = streamIq.getId(); + m_client->sendPacket(streamIq); +} + void QXmppTransferManager::streamInitiationIqReceived(const QXmppStreamInitiationIq &iq) { if (iq.getType() == QXmppIq::Result) @@ -669,7 +838,8 @@ void QXmppTransferManager::streamInitiationIqReceived(const QXmppStreamInitiatio void QXmppTransferManager::streamInitiationResultReceived(const QXmppStreamInitiationIq &iq) { QXmppTransferJob *job = getJobByRequestId(iq.getFrom(), iq.getId()); - if (!job) + if (!job || + job->direction() != QXmppTransferJob::OutgoingDirection) return; foreach (const QXmppElement &item, iq.getSiItems()) @@ -705,53 +875,34 @@ void QXmppTransferManager::streamInitiationResultReceived(const QXmppStreamIniti job->m_requestId = openIq.getId(); m_client->sendPacket(openIq); } else if (job->method() == QXmppTransferJob::SocksMethod) { - QXmppByteStreamIq streamIq; - streamIq.setType(QXmppIq::Set); - streamIq.setTo(job->m_jid); - streamIq.setSid(job->m_sid); - const QString ownJid = m_client->getConfiguration().getJid(); - QList streamHosts; - // find interface to bind to + // start listening job->m_socksServer = new QXmppSocksServer(this); job->m_socksServer->setHostName(streamHash(job->m_sid, ownJid, job->jid())); job->m_socksServer->setHostPort(0); - foreach (const QNetworkInterface &interface, QNetworkInterface::allInterfaces()) + if (!job->m_socksServer->listen()) { - if (!(interface.flags() & QNetworkInterface::IsRunning) || - interface.flags() & QNetworkInterface::IsLoopBack) - continue; - - foreach (const QNetworkAddressEntry &entry, interface.addressEntries()) - { - if (entry.ip().protocol() != QAbstractSocket::IPv4Protocol || - entry.netmask().isNull() || - entry.netmask() == QHostAddress::Broadcast) - continue; - - // we let the server pick a port - if (!job->m_socksServer->listen(entry.ip())) - { - qWarning() << "QXmppSocksServer could not listen on address" << entry.ip(); - continue; - } - - qDebug() << "QXmppSocksServer listening on" << job->m_socksServer->serverAddress() << job->m_socksServer->serverPort(); + qWarning() << "QXmppSocksServer could not start listening"; + job->terminate(QXmppTransferJob::ProtocolError); + return; + } + qDebug() << "QXmppSocksServer listening on port" << job->m_socksServer->serverPort(); - QXmppByteStreamIq::StreamHost streamHost; - streamHost.setHost(job->m_socksServer->serverAddress()); - streamHost.setPort(job->m_socksServer->serverPort()); - streamHost.setJid(ownJid); - streamHosts.append(streamHost); - streamIq.setStreamHosts(streamHosts); - job->m_requestId = streamIq.getId(); - m_client->sendPacket(streamIq); - return; - } + if (!m_proxy.isEmpty()) + { + job->m_socksProxy.setJid(m_proxy); + + // query proxy + QXmppByteStreamIq streamIq; + streamIq.setType(QXmppIq::Get); + streamIq.setTo(job->m_socksProxy.jid()); + streamIq.setSid(job->m_sid); + job->m_requestId = streamIq.getId(); + m_client->sendPacket(streamIq); + } else { + socksServerSendOffer(job); } - qWarning("Could not determine public IP"); - job->terminate(QXmppTransferJob::ProtocolError); } else { qWarning("We received an unsupported method"); job->terminate(QXmppTransferJob::ProtocolError); @@ -883,6 +1034,18 @@ void QXmppTransferManager::streamInitiationSetReceived(const QXmppStreamInitiati m_client->sendPacket(response); } +/// Return the bytestream proxy. +QString QXmppTransferManager::proxy() const +{ + return m_proxy; +} + +/// Set the bytestream proxy. +void QXmppTransferManager::setProxy(const QString &proxy) +{ + m_proxy = proxy; +} + /// Return the supported stream methods. int QXmppTransferManager::supportedMethods() const { -- cgit v1.2.3