cURL / Mailing Lists / curl-library / Single Mail

curl-library

Re: Active FTP uploads

From: Patrick Heeb <speedpat_at_gmx.net>
Date: Wed, 07 Jun 2006 12:02:37 +0200

Daniel Stenberg wrote:
> On Wed, 7 Jun 2006, Patrick Heeb wrote:
>
>> I am trying to do parallel active mode FTP uploads with the multi
>> interface and detected a strange behaviour. The first upload of each
>> connection timeout's waiting for the server to connect. Retrying with
>> the same easy handle then starts the upload immediately.
>
> Using what libcurl version on what operating system and would it be
> possible to show us a complete source code for a program that shows
> this problem?
>
Im using libcurl 7.13.2 on Debian Sarge (installed from the package
libcurl3 7.13.2-2)

I wrote this small class to handle the uploads using a max of parallel
connections.
connect() sets up an array of easy_handles and a multi_handle
upload() does the actual upload.
setupItemHandles() is called by upload, and completes the easy_handles
with the needed data (source file, destination path).

#include <sstream>
#include <log4cpp/Category.hh>
#include "FTPTransferConnection.h"

/**
 * Create a FTPTransferConnection Instance
 */
FTPTransferConnection::FTPTransferConnection(int parallelConnections,
string hostname, string user, string password, string rootPath)
{
        this->loginInfo = user + ':' + password;
        this->hostname = hostname;
        this->rootPath = rootPath;
        this->item_handle_count = 0;
        this->item_handles = new CURL*[parallelConnections];
        this->multi_handle = NULL;
        this->commands = NULL;
        this->numConnections = parallelConnections;
}

/**
 * FTPTransferConnection Destructor
 */
FTPTransferConnection::~FTPTransferConnection()
{
        delete[] item_handles;
}

/**
 * Upload a list of TransferItems, return 0 on error, 1 on success
 */
int FTPTransferConnection::upload(TransferItem** items, int offset, int
itemCount)
{
        if (!connected())
        {
                return 0;
        }

        int itemPos = offset;
        int retry = 0;

        while (itemPos < (offset+itemCount) && retry < 3)
        {
                int loopItems = min(numConnections, itemCount + offset -
itemPos);
                int numItems = setupItemHandles(items, loopItems, itemPos);
                if (numItems <= 0)
                {
                        return 0;
                }

                int still_running = 1;
                int msgs_left = 0;
                CURLMsg* msg = NULL;
                bool failed = false;

                while (still_running)
                {
                        while (CURLM_CALL_MULTI_PERFORM ==
curl_multi_perform(multi_handle, &still_running))
                        {
                                // do loop
                        }

                        while ((msg = curl_multi_info_read(multi_handle,
&msgs_left)))
                        {
                            if (msg->msg == CURLMSG_DONE)
                            {
                                        int idx = 0;
                                        int found = 0;

                                        // Find out which handle this
message is about
                                        for (; (!found && (idx <
loopItems)); idx++)
                                        {
                                                found =
(msg->easy_handle == item_handles[idx]);
                                        }

                                        if (found)
                                        {
                                                idx--;
                                                TransferItem *item =
items[itemPos + idx];
                                                item->closeFile();
                                         
                                        if (msg->data.result != 0)
                                        {
                                                stringstream log;
                                                log.str("upload failed: ");
                                                log <<
item->getSourcePath() << " " << curl_easy_strerror(msg->data.result);
                                                logger.error(log.str());
                                                failed = true;
                                        }
                                        else
                                        {
                                                item->setTransferred();
                                               
curl_multi_remove_handle(multi_handle, item_handles[idx]);
                                        }
                                         }
                                }
                        }
                }

                if (!failed)
                {
                        itemPos += loopItems;
                        retry = 0;
                }
                else
                {
                        retry++;
                }

                for(int i = 0; i < loopItems; i++)
                {
                        curl_multi_remove_handle(multi_handle,
item_handles[i]);
                }
        }

        if (retry > 0)
        {
                return 0;
        }

        return 1;
}

int FTPTransferConnection::connect()
{
        if (multi_handle != NULL)
        {
                return 1;
        }

        if (CURLE_OK != curl_global_init(CURL_GLOBAL_ALL))
        {
                logger.error("curl_global_init failed");
                return 0;
        }

        multi_handle = curl_multi_init();
        if (multi_handle == NULL)
        {
                logger.error("curl_multi_init failed");
                return 0;
        }

        for(int i = 0; i < numConnections; i++)
        {
                item_handles[i] = curl_easy_init();
                if (item_handles[i] == NULL)
                {
                        logger.error("curl_easy_init failed");
                        return 0;
                }
                curl_easy_setopt(item_handles[i], CURLOPT_UPLOAD, 1);
                curl_easy_setopt(item_handles[i], CURLOPT_USERPWD,
loginInfo.c_str());
                curl_easy_setopt(item_handles[i],
CURLOPT_FTP_CREATE_MISSING_DIRS, 1);
                curl_easy_setopt(item_handles[i], CURLOPT_FTPPORT,
"192.168.2.224");
                curl_easy_setopt(item_handles[i], CURLOPT_FTP_USE_EPRT, 0);
                curl_easy_setopt(item_handles[i], CURLOPT_TIMEOUT, 30);
                curl_easy_setopt(item_handles[i],
CURLOPT_FTP_RESPONSE_TIMEOUT, 30);
                curl_easy_setopt(item_handles[i], CURLOPT_VERBOSE, 1);
        }

        int still_running = 0;

        return 1;
}

int FTPTransferConnection::disconnect()
{
        if (multi_handle == NULL)
        {
                return 1;
        }

        for(int i = 0; i < numConnections; i++)
        {
                curl_easy_cleanup(item_handles[i]);
        }

        curl_multi_cleanup(multi_handle);

        curl_global_cleanup();

        return 1;
}

int FTPTransferConnection::connected()
{
        return multi_handle != NULL;
}

string FTPTransferConnection::buildDestinationPath(string
itemDestinationPath)
{
        stringstream sDstPath;
        sDstPath.str("");
        sDstPath << "ftp://" << hostname << "/" << rootPath;
        char rootPathEnd = rootPath[rootPath.length()-1];
        char dstPathBegin = itemDestinationPath[0];
        if (rootPathEnd != '/' && dstPathBegin != '/')
        {
          sDstPath << "/";
          sDstPath << itemDestinationPath;
        }
        else if (rootPathEnd == '/' && dstPathBegin == '/')
        {
          string tmp = itemDestinationPath.substr(1,
itemDestinationPath.length()-1);
          sDstPath << tmp;
        }
        else
        {
          sDstPath << itemDestinationPath;
        }

        return sDstPath.str();
}

int FTPTransferConnection::setupItemHandles(TransferItem** items, int
loopItems, int itemPos)
{
                item_handle_count = 0;
                for(int i = 0; i < loopItems; i++)
                {
                        TransferItem* item = items[itemPos+i];
                        if (item->isTransferred())
                        {
                                continue;
                        }

                        FILE* file = item->getSourceFile();
                        if (file == NULL)
                        {
                                logger.error(string("source file not
found: ") + item->getSourcePath());
                                return (itemPos - 1)*-1;
                        }

                        item_handle_count++;

                        string itemDestinationPath =
buildDestinationPath(item->getDestinationPath());

                       
item->setProtocolDestinationPath(itemDestinationPath);

                        curl_easy_setopt(item_handles[i], CURLOPT_URL,
itemDestinationPath.c_str());
                        curl_easy_setopt(item_handles[i],
CURLOPT_READDATA, file);
                        curl_easy_setopt(item_handles[i],
CURLOPT_VERBOSE, 1);

                        curl_multi_add_handle(multi_handle,
item_handles[i]);
                }

                return item_handle_count;
}
Received on 2006-06-07