curl-and-python

Performance less than ideal, suggestions?

From: my name <gm41lu53r_at_gmail.com>
Date: Thu, 4 Feb 2010 12:46:19 -0500

I've modified retriever-multi.py to continuously fetch URLs from a database
and do some work on each one. I'm only seeing roughly 5 MB/s even though I
have over 80 Mbps at my disposal. Is there any way to get better performance
out of this? I'm thinking I should use cStringIO rather than writing each
response to a file and reading it back in.
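
For reference, here's a rough sketch of what I mean by the cStringIO idea. It's a
single-handle illustration only (fetch_to_buffer is just a made-up name); with the
multi interface I'd attach a StringIO buffer to each curl object in place of c.fp
and pass its write method as the WRITEFUNCTION callback:

import cStringIO
import pycurl

def fetch_to_buffer(url, http_header):
    # Collect the response body in memory instead of a temp file on disk.
    buf = cStringIO.StringIO()
    c = pycurl.Curl()
    c.setopt(pycurl.URL, url)
    c.setopt(pycurl.FOLLOWLOCATION, 1)
    c.setopt(pycurl.HTTPHEADER, http_header)
    c.setopt(pycurl.WRITEFUNCTION, buf.write)  # libcurl feeds chunks straight into the buffer
    c.perform()
    c.close()
    return buf.getvalue()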

Thanks.

Code below.

#! /usr/bin/env python

import sys
import os
import re
import uuid

import pycurl

from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import create_engine

#random notes:
#http://curl.haxx.se/mail/curlpython-2005-03/0014.html

#options
num_conn = 250 #number of concurrent connections for HTTP requests
data_dir = '/home/crawler/data/url_process_stack/' #directory to write url jobs to
http_header = ['User-Agent: Mozilla/5.0 (rv:1.8.1.12) Gecko/20080129 Firefox/2.0.0.14 Windows; U; Windows NT 6.0; en-US; rv:1.8.1.14']

#db options
db_host = '127.0.0.1' #'192.168.70.1'
db_user = 'crawler'
db_pass = 'crawl'
db_main = 'CrawlerDB'
db_limit = 2000 #number of items to process from the database at once

#establish database connection and setup sessions
db = create_engine("mysql://%s:%s@%s/%s" % (db_user, db_pass, db_host, db_main))

Session = scoped_session(sessionmaker(bind=db, autocommit=True, autoflush=True))

def HandleUrl(urlfile):
   #read in url data
   fh = open(urlfile,'r')
   data = fh.read(1048576)
   data = unicode(data, errors="ignore")
   data = data.lower()
   fh.close()

   #get url info
   urlbasefile = os.path.basename(urlfile)
   domainid, urlid = urlbasefile[:-4].split('-')
   print "HandleUrl got %s - %s" % (domainid, urlid)

   #ensure there's url data. there won't be any if there was an error.
   if not data:
      print "HandleUrl no data for file"
      sess = Session()
      sess.execute('UPDATE urls SET state = 2 WHERE urlid = :uid', {'uid': urlid})
      sess.close()
      os.unlink(urlfile)
      return

   #strip html tags
   data = re.sub(r"</?[^\W].{0,10}?>", "", data)

   #check for keywords
   if 'product' in data:
      print 'HandleUrl found kw'
   else:
      print 'HandleUrl found nothing'

   #remove url data
   os.unlink(urlfile)

def FetchUrls(num_conn,data_dir,http_header):
    #set up our job/client id via uuid4()
    client_id = str(uuid.uuid4())

    #get our urls from database
    queue = []

    sess = Session()
    sess.execute("update urls set state = 1, datechecked = curdate(), "
                 "client_id = '" + client_id + "' where state = 0 limit " + str(db_limit))
    resultset = sess.execute("SELECT d.domainid, d.domain, urlid, https, datechecked, url "
                             "FROM urls u inner join domains d on u.domain = d.domainid "
                             "where u.client_id = '" + client_id + "'")

    for job in resultset:
        full_url = 'http://www.' + job['domain'] + '/' + job['url']
        full_url = full_url.replace('www.www.', 'www.')
        queue.append((full_url, job['urlid'],job['domainid']))

    sess.close()

    # Check args
    num_urls = len(queue)
    num_conn = min(num_conn, num_urls)

    print "PycURL %s (compiled against 0x%x)" % (pycurl.version,
pycurl.COMPILE_LIBCURL_VERSION_NUM)
    print "----- Getting", num_urls, "URLs using", num_conn, "connections
-----"

    # Pre-allocate a list of curl objects
    m = pycurl.CurlMulti()
    m.handles = []

    for i in range(num_conn):
        c = pycurl.Curl()
        c.fp = None
        c.setopt(pycurl.FOLLOWLOCATION, 1)
        c.setopt(pycurl.MAXREDIRS, 5)
        c.setopt(pycurl.CONNECTTIMEOUT, 50)
        c.setopt(pycurl.TIMEOUT, 250)
        c.setopt(pycurl.HTTPHEADER, http_header)
        m.handles.append(c)

    # Main loop

    freelist = m.handles[:]
    num_processed = 0

    while num_processed < num_urls:
        # If there is a URL to process and a free curl object, add it to the multi stack

        while queue and freelist:
            url, urlid,domainid = queue.pop(0)
            filename = "%s%d-%d.dat" % (data_dir, domainid,urlid)
            c = freelist.pop()
            c.fp = open(filename, "wb")
            c.setopt(pycurl.URL, url)
            #CURLOPT_RANGE?
            c.setopt(pycurl.WRITEDATA, c.fp)
            m.add_handle(c)
            # store some info
            c.url = url
            c.urlid = urlid #add our urlid
            c.domainid = domainid #add our urlid
            c.fname = filename
        # Run the internal curl state machine for the multi stack
        while 1:
            ret, num_handles = m.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break

        # Check for curl objects which have terminated, and add them to the freelist
        while 1:
            num_q, ok_list, err_list = m.info_read()
            for c in ok_list:
                c.fp.close()
                c.fp = None
                m.remove_handle(c)
                print c.urlid, c.url, c.getinfo(pycurl.EFFECTIVE_URL)
                HandleUrl(c.fname)
                freelist.append(c)
            for c, errno, errmsg in err_list:
                c.fp.close()
                c.fp = None
                m.remove_handle(c)
                print "err", c.urlid, c.url, errno, errmsg
                HandleUrl(c.fname)
                freelist.append(c)

            num_processed = num_processed + len(ok_list) + len(err_list)
            if num_q == 0:
                break

        # Currently no more I/O is pending, could do something in the meantime
        # (display a progress bar, etc.).
        # We just call select() to sleep until some more data is available.
        m.select(1.0)
        #print "progress: %s out of %s.." % (num_processed, num_urls)

        if (num_processed >= num_urls // 2):
            print "progress: %s out of %s.." % (num_processed, num_urls)

            #get more items
            client_id = str(uuid.uuid4())
            old_num_urls = num_urls
            old_num_processed = num_processed
            sess = Session()
            sess.execute("update urls set state = 1, datechecked = curdate(), "
                         "client_id = '" + client_id + "' where state = 0 limit " + str(db_limit))
            resultset = sess.execute("SELECT d.domainid, d.domain, urlid, https, datechecked, url "
                                     "FROM urls u inner join domains d on u.domain = d.domainid "
                                     "where u.client_id = '" + client_id + "'")
            for job in resultset:
                full_url = 'http://www.' + job['domain'] + '/' + job['url']
                full_url = full_url.replace('www.www.', 'www.')
                queue.append((full_url, job['urlid'],job['domainid']))

            sess.close()
            num_processed = 0
            num_urls = len(queue)
            print "progress: old num_processed: %s, new num_processed: %s,
old num_urls %s, new num_urls: %s" % (old_num_processed, num_processed,
old_num_urls, num_urls)

    # Cleanup
    for c in m.handles:
        if c.fp is not None:
            c.fp.close()
            c.fp = None
        c.close()
    m.close()

for i in range(1):
    FetchUrls(num_conn,data_dir,http_header)
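
Side note on the #CURLOPT_RANGE? comment above: since HandleUrl never reads past the
first 1 MiB of a file anyway, requesting only that range might cut the transfer size,
assuming the servers honour Range headers (many ignore them and send the full body).
Untested, but it would be one extra setopt next to the others in the pre-allocation loop:

        c.setopt(pycurl.RANGE, "0-1048575")  # best-effort: only fetch the first 1 MiB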
