From 946561deabadada49ab1733db3aae9a4be20c16a Mon Sep 17 00:00:00 2001 From: Jeremy Lainé Date: Fri, 5 Mar 2010 16:01:13 +0000 Subject: abort jobs cleanly and add some code documentation --- source/QXmppTransferManager.cpp | 160 ++++++++++++++++++++++++---------------- 1 file changed, 97 insertions(+), 63 deletions(-) (limited to 'source/QXmppTransferManager.cpp') diff --git a/source/QXmppTransferManager.cpp b/source/QXmppTransferManager.cpp index 829aac65..2ff188c3 100644 --- a/source/QXmppTransferManager.cpp +++ b/source/QXmppTransferManager.cpp @@ -57,20 +57,27 @@ QXmppTransferJob::QXmppTransferJob(const QString &jid, QXmppTransferJob::Directi m_state(StartState), m_fileSize(0), m_ibbSequence(0), - m_socksClient(0), m_socksSocket(0) { } +/// Call this method if you wish to abort on ongoing transfer job. +/// + void QXmppTransferJob::abort() { - // FIXME : actually abort job cleanly terminate(AbortError); } +/// Call this method if you wish to accept an incoming transfer job. +/// +/// To accept the transfer job, you must call the accept() method from +/// a slot connected to the QXmppTransferManager's fileReceived() signal. +/// + void QXmppTransferJob::accept(QIODevice *iodevice) { - if (!m_iodevice) + if (m_direction == IncomingDirection && !m_iodevice) m_iodevice = iodevice; } @@ -159,10 +166,9 @@ void QXmppTransferJob::setState(QXmppTransferJob::State state) void QXmppTransferJob::slotTerminated() { emit stateChanged(m_state); - if (m_error == NoError) - emit finished(); - else + if (m_error != NoError) emit error(m_error); + emit finished(); } void QXmppTransferJob::terminate(QXmppTransferJob::Error cause) @@ -178,11 +184,12 @@ void QXmppTransferJob::terminate(QXmppTransferJob::Error cause) if (m_iodevice) m_iodevice->close(); - // close sockets - if (m_socksClient) - m_socksClient->close(); + // close socket if (m_socksSocket) + { + m_socksSocket->flush(); m_socksSocket->close(); + } // emit signals later QTimer::singleShot(0, this, SLOT(slotTerminated())); @@ -278,14 +285,18 @@ void QXmppTransferManager::byteStreamResultReceived(const QXmppByteStreamIq &iq) m_client->getConfiguration().jid(), job->m_jid); - job->m_socksClient = new QXmppSocksClient(streamHost.host(), streamHost.port(), job); - job->m_socksClient->connectToHost(hostName, 0); - if (!job->m_socksClient->waitForConnected()) + QXmppSocksClient *socksClient = new QXmppSocksClient(streamHost.host(), streamHost.port(), job); + socksClient->connectToHost(hostName, 0); + if (!socksClient->waitForReady()) { - qWarning() << "Failed to connect to" << streamHost.host().toString() << streamHost.port() << ":" << job->m_socksClient->errorString(); + qWarning() << "Failed to connect to" << streamHost.host().toString() << streamHost.port() << ":" << socksClient->errorString(); + delete socksClient; job->terminate(QXmppTransferJob::ProtocolError); return; } + job->m_socksSocket = socksClient; + connect(job->m_socksSocket, SIGNAL(bytesWritten(qint64)), this, SLOT(socksSocketDataSent())); + connect(job->m_socksSocket, SIGNAL(disconnected()), this, SLOT(socksSocketDisconnected())); // activate stream QXmppByteStreamIq streamIq; @@ -347,13 +358,14 @@ void QXmppTransferManager::byteStreamSetReceived(const QXmppByteStreamIq &iq) m_client->getConfiguration().jid()); // try to connect to stream host - job->m_socksClient = new QXmppSocksClient(streamHost.host(), streamHost.port(), job); - job->m_socksClient->connectToHost(hostName, 0); - if (job->m_socksClient->waitForConnected()) + QXmppSocksClient *socksClient = new QXmppSocksClient(streamHost.host(), streamHost.port(), job); + socksClient->connectToHost(hostName, 0); + if (socksClient->waitForReady()) { job->setState(QXmppTransferJob::TransferState); - connect(job->m_socksClient, SIGNAL(readyRead()), this, SLOT(socksClientDataReceived())); - connect(job->m_socksClient, SIGNAL(disconnected()), this, SLOT(socksClientDisconnected())); + job->m_socksSocket = socksClient; + connect(job->m_socksSocket, SIGNAL(readyRead()), this, SLOT(socksSocketDataReceived())); + connect(job->m_socksSocket, SIGNAL(disconnected()), this, SLOT(socksSocketDisconnected())); QXmppByteStreamIq ackIq; ackIq.setId(iq.id()); @@ -364,9 +376,8 @@ void QXmppTransferManager::byteStreamSetReceived(const QXmppByteStreamIq &iq) m_client->sendPacket(ackIq); return; } else { - qWarning() << "Failed to connect to" << streamHost.host().toString() << streamHost.port() << ":" << job->m_socksClient->errorString(); - delete job->m_socksClient; - job->m_socksClient = 0; + qWarning() << "Failed to connect to" << streamHost.host().toString() << streamHost.port() << ":" << socksClient->errorString(); + delete socksClient; } } @@ -396,14 +407,6 @@ QXmppTransferJob* QXmppTransferManager::getJobBySid(const QString &jid, const QS return 0; } -QXmppTransferJob* QXmppTransferManager::getJobBySocksClient(QXmppSocksClient *socksClient) -{ - foreach (QXmppTransferJob *job, m_jobs) - if (job->m_socksClient == socksClient) - return job; - return 0; -} - QXmppTransferJob* QXmppTransferManager::getJobBySocksSocket(QTcpSocket *socksSocket) { foreach (QXmppTransferJob *job, m_jobs) @@ -448,7 +451,8 @@ void QXmppTransferManager::ibbDataIqReceived(const QXmppIbbDataIq &iq) QXmppTransferJob *job = getJobBySid(iq.from(), iq.sid()); if (!job || job->direction() != QXmppTransferJob::IncomingDirection || - job->method() != QXmppTransferJob::InBandMethod) + job->method() != QXmppTransferJob::InBandMethod || + job->state() != QXmppTransferJob::TransferState) { // the job is unknown, cancel it QXmppStanza::Error error(QXmppStanza::Error::Cancel, QXmppStanza::Error::ItemNotFound); @@ -575,16 +579,13 @@ void QXmppTransferManager::iqReceived(const QXmppIq &iq) { if (job->m_socksProxy.jid() == iq.from() && job->m_requestId == iq.id()) { - if (job->m_socksClient) + if (job->m_socksSocket) { // proxy connection activation result if (iq.type() == QXmppIq::Result) { // proxy stream activated, start sending data job->setState(QXmppTransferJob::TransferState); - job->m_socksSocket = job->m_socksClient->socket(); - connect(job->m_socksSocket, SIGNAL(bytesWritten(qint64)), this, SLOT(socksSocketDataSent())); - connect(job->m_socksSocket, SIGNAL(disconnected()), this, SLOT(socksSocketDisconnected())); socksServerSendData(job); } else if (iq.type() == QXmppIq::Error) { // proxy stream not activated, terminate @@ -614,6 +615,25 @@ void QXmppTransferManager::iqReceived(const QXmppIq &iq) } } +void QXmppTransferManager::jobError(QXmppTransferJob::Error error) +{ + QXmppTransferJob *job = qobject_cast(sender()); + if (!job || !m_jobs.contains(job)) + return; + + if (job->direction() == QXmppTransferJob::OutgoingDirection && + job->method() == QXmppTransferJob::InBandMethod && + error == QXmppTransferJob::AbortError) + { + // close the bytestream + QXmppIbbCloseIq closeIq; + closeIq.setTo(job->m_jid); + closeIq.setSid(job->m_sid); + job->m_requestId = closeIq.id(); + m_client->sendPacket(closeIq); + } +} + QXmppTransferJob *QXmppTransferManager::sendFile(const QString &jid, const QString &fileName) { QFileInfo info(fileName); @@ -710,6 +730,7 @@ QXmppTransferJob *QXmppTransferManager::sendFile(const QString &jid, const QStri // start job m_jobs.append(job); + connect(job, SIGNAL(error(QXmppTransferJob::Error)), this, SLOT(jobError(QXmppTransferJob::Error))); QXmppStreamInitiationIq request; request.setType(QXmppIq::Set); @@ -723,27 +744,6 @@ QXmppTransferJob *QXmppTransferManager::sendFile(const QString &jid, const QStri return job; } -void QXmppTransferManager::socksClientDataReceived() -{ - QXmppSocksClient *socks = qobject_cast(sender()); - QXmppTransferJob *job = getJobBySocksClient(socks); - if (!job || job->state() != QXmppTransferJob::TransferState) - return; - - job->writeData(job->m_socksClient->readAll()); -} - -void QXmppTransferManager::socksClientDisconnected() -{ - QXmppSocksClient *socks = qobject_cast(sender()); - QXmppTransferJob *job = getJobBySocksClient(socks); - if (!job || job->state() == QXmppTransferJob::FinishedState) - return; - - // check received data - job->checkData(); -} - void QXmppTransferManager::socksServerConnected(QTcpSocket *socket, const QString &hostName, quint16 port) { const QString ownJid = m_client->getConfiguration().jid(); @@ -759,6 +759,24 @@ void QXmppTransferManager::socksServerConnected(QTcpSocket *socket, const QStrin socket->close(); } +void QXmppTransferManager::socksSocketDataReceived() +{ + QTcpSocket *socksSocket = qobject_cast(sender()); + QXmppTransferJob *job = getJobBySocksSocket(socksSocket); + if (!job || job->state() != QXmppTransferJob::TransferState) + return; + + // receive data block + if (job->direction() == QXmppTransferJob::IncomingDirection) + { + job->writeData(job->m_socksSocket->readAll()); + + // if we have received all the data, stop here + if (job->fileSize() && job->m_done >= job->fileSize()) + job->checkData(); + } +} + void QXmppTransferManager::socksSocketDataSent() { QTcpSocket *socksSocket = qobject_cast(sender()); @@ -767,7 +785,8 @@ void QXmppTransferManager::socksSocketDataSent() return; // send next data block - socksServerSendData(job); + if (job->direction() == QXmppTransferJob::OutgoingDirection) + socksServerSendData(job); } void QXmppTransferManager::socksSocketDisconnected() @@ -778,7 +797,15 @@ void QXmppTransferManager::socksSocketDisconnected() return; // terminate transfer - job->terminate(QXmppTransferJob::ProtocolError); + if (job->direction() == QXmppTransferJob::IncomingDirection) + { + job->checkData(); + } else { + if (job->fileSize() && job->m_done != job->fileSize()) + job->terminate(QXmppTransferJob::ProtocolError); + else + job->terminate(QXmppTransferJob::NoError); + } } void QXmppTransferManager::socksServerSendData(QXmppTransferJob *job) @@ -790,7 +817,6 @@ void QXmppTransferManager::socksServerSendData(QXmppTransferJob *job) job->m_done += buffer.size(); job->progress(job->m_done, job->fileSize()); } else { - // FIXME : close socket job->terminate(QXmppTransferJob::NoError); } } @@ -995,7 +1021,7 @@ void QXmppTransferManager::streamInitiationSetReceived(const QXmppStreamInitiati // allow user to accept or decline the job emit fileReceived(job); - if (!job->m_iodevice) + if (!job->m_iodevice || !job->m_iodevice->isWritable()) { QXmppStanza::Error error(QXmppStanza::Error::Cancel, QXmppStanza::Error::Forbidden); error.setCode(403); @@ -1010,6 +1036,7 @@ void QXmppTransferManager::streamInitiationSetReceived(const QXmppStreamInitiati // the job was accepted m_jobs.append(job); + connect(job, SIGNAL(error(QXmppTransferJob::Error)), this, SLOT(jobError(QXmppTransferJob::Error))); QXmppElement value; value.setTagName("value"); @@ -1041,19 +1068,25 @@ void QXmppTransferManager::streamInitiationSetReceived(const QXmppStreamInitiati m_client->sendPacket(response); } -/// Return the bytestream proxy. +/// Return the JID of the bytestream proxy. +/// + QString QXmppTransferManager::proxy() const { return m_proxy; } -/// Set the bytestream proxy. -void QXmppTransferManager::setProxy(const QString &proxy) +/// Set the JID of the bytestream proxy. +/// + +void QXmppTransferManager::setProxy(const QString &proxyJid) { - m_proxy = proxy; + m_proxy = proxyJid; } /// Return the supported stream methods. +/// + int QXmppTransferManager::supportedMethods() const { return m_supportedMethods; @@ -1065,6 +1098,7 @@ int QXmppTransferManager::supportedMethods() const /// The methods argument is a combination of zero or more /// QXmppTransferJob::Method. /// + void QXmppTransferManager::setSupportedMethods(int methods) { m_supportedMethods = (methods & QXmppTransferJob::AnyMethod); -- cgit v1.2.3