Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


multithreading - Learning python and threading. I think my code runs infinitely. Help me find bugs?

So I've started learning Python now, and I absolutely am in love with it.

I'm building a small-scale Facebook data scraper. Basically, it will use the Graph API and scrape the first names of the specified number of users. It works fine in a single thread (or no thread, I guess).

I used online tutorials to come up with the following multithreaded version (updated code):

import requests
import json
import time
import threading
import Queue

GraphURL = 'http://graph.facebook.com/'
first_names = {} # will store first names and their counts
queue = Queue.Queue()

def getOneUser(url):
    http_response = requests.get(url) # open the request URL
    if http_response.status_code == 200:
        data = http_response.text.encode('utf-8', 'ignore') # Get the text of response, and encode it
        json_obj = json.loads(data) # load it as a json object
        # name = json_obj['name']
        return json_obj['first_name']
        # last = json_obj['last_name']
    return None

class ThreadGet(threading.Thread):
    """ Threaded name scraper """
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            #print 'thread started\n'
            url = GraphURL + str(self.queue.get())
            first = getOneUser(url) # get one user's first name
            if first is not None:
                if first_names.has_key(first): # if name has been encountered before
                    first_names[first] = first_names[first] + 1 # increment the count
                else:
                    first_names[first] = 1 # add the new name
            self.queue.task_done()
            #print 'thread ended\n'

def main():
    start = time.time()
    for i in range(6):
        t = ThreadGet(queue)
        t.setDaemon(True)
        t.start()

    for i in range(100):
        queue.put(i)

    queue.join()

    for name in first_names.keys():
        print name + ': ' + str(first_names[name])

    print '----------------------------------------------------------------'
    print '================================================================'
    # Print top first names
    for key in first_names.keys():
        if first_names[key] > 2:
            print key + ': ' + str(first_names[key])

    print 'It took ' + str(time.time()-start) + 's'

main()

To be honest, I don't understand some parts of the code, but I get the main idea. There is no output at all: the shell stays empty, so I believe the program keeps on running.

So what I am doing is filling the queue with integers that are the user IDs on Facebook. Each ID is then used to build the API call URL. getOneUser returns the first name of one user at a time. That task (ID) is marked as 'done' and the thread moves on to the next one.

What is wrong with the code above?


1 Reply


Your usage of first_names is not thread-safe: the check-then-increment in run() is not atomic, so two threads can interleave and lose an update. You could add a lock to protect the increment. Otherwise the code should work. You might be hitting a Facebook API limit, in which case you should limit your request rate.
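
For illustration, here is a minimal sketch of the lock approach. It is not the asker's scraper: the function name count_names_threaded and the sample names are made up for the example, no network requests are made, and it is written for Python 3. Each thread holds the lock only while it updates the shared counter.

```python
import threading
from collections import Counter


def count_names_threaded(names, n_threads=6):
    """Count names from several threads, guarding the shared Counter with a lock.

    Hypothetical helper for illustration: every thread walks the whole
    `names` list, so each name is counted n_threads times.
    """
    counts = Counter()
    lock = threading.Lock()

    def worker():
        for name in names:
            if name is None:   # a failed lookup is skipped
                continue
            with lock:         # only one thread updates counts at a time
                counts[name] += 1

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counts


if __name__ == "__main__":
    sample = ["Ann", "Bob", "Ann", None]
    print(count_names_threaded(sample).most_common())
```

In the original ThreadGet.run(), the same idea would mean wrapping the has_key check and the increment in a single `with lock:` block shared by all threads.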

You could simplify your code by using a thread pool and counting the names in the main thread:

#!/usr/bin/env python
import json
import urllib2
from collections import Counter
from multiprocessing.dummy import Pool # use threads

def get_name(url):
    try:
        return json.load(urllib2.urlopen(url))['first_name']
    except Exception:
        return None # error

urls = ('http://graph.facebook.com/%d' % i for i in xrange(100))
p = Pool(5) # 5 concurrent connections
first_names = Counter(p.imap_unordered(get_name, urls))
print first_names.most_common()

To see what errors you get, you could add logging:

#!/usr/bin/env python
import json
import logging
import urllib2
from collections import Counter
from multiprocessing.dummy import Pool # use threads

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s %(threadName)s %(message)s")

def get_name(url):
    try:
        name = json.load(urllib2.urlopen(url))['first_name']
    except Exception as e:
        logging.debug('error: %s url: %s', e, url)
        return None # error
    else:
        logging.debug('done url: %s', url)
        return name

urls = ('http://graph.facebook.com/%d' % i for i in xrange(100))
p = Pool(5) # 5 concurrent connections
first_names = Counter(p.imap_unordered(get_name, urls))
print first_names.most_common()

A simple way to limit the number of requests per given time period is to use a semaphore:

#!/usr/bin/env python
import json
import logging
import time
import urllib2
from collections import Counter
from multiprocessing.dummy import Pool # use threads
from threading import _BoundedSemaphore as BoundedSemaphore, Timer

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s %(threadName)s %(message)s")

class RatedSemaphore(BoundedSemaphore):
    """Limit to 1 request per `period / value` seconds (over long run)."""
    def __init__(self, value=1, period=1):
        BoundedSemaphore.__init__(self, value)
        t = Timer(period, self._add_token_loop,
                  kwargs=dict(time_delta=float(period) / value))
        t.daemon = True
        t.start()

    def _add_token_loop(self, time_delta):
        """Add token every time_delta seconds."""
        while True:
            try:
                BoundedSemaphore.release(self)
            except ValueError: # ignore if already max possible value
                pass
            time.sleep(time_delta) # ignore EINTR

    def release(self):
        pass # do nothing (only time-based release() is allowed)

def get_name(gid, rate_limit=RatedSemaphore(value=100, period=600)):
    url = 'http://graph.facebook.com/%d' % gid
    try:
        with rate_limit:
            name = json.load(urllib2.urlopen(url))['first_name']
    except Exception as e:
        logging.debug('error: %s url: %s', e, url)
        return None # error
    else:
        logging.debug('done url: %s', url)
        return name

p = Pool(5) # 5 concurrent connections
first_names = Counter(p.imap_unordered(get_name, xrange(200)))
print first_names.most_common()

After the initial burst, it should make a single request every 6 seconds.
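
To make the timing concrete, here is a small sketch that computes when each request could start under RatedSemaphore(value=100, period=600), without sleeping or touching the network. The function earliest_start_times is a hypothetical helper, and it rests on simplifying assumptions: requests are always waiting, each one takes no time, and the pool size does not slow down the burst. It is written for Python 3.

```python
def earliest_start_times(n_requests, value=100, period=600):
    """Return the earliest time in seconds at which each request gets a token.

    Assumptions (not measured): requests are always queued, take no time,
    and the thread pool does not limit the initial burst.
    """
    time_delta = float(period) / value  # seconds between refill tokens
    times = []
    for i in range(n_requests):
        if i < value:
            # the semaphore starts with `value` tokens: the initial burst
            times.append(0.0)
        else:
            # the Timer fires after `period` seconds, then the loop adds
            # one token every time_delta seconds
            times.append(period + (i - value) * time_delta)
    return times


if __name__ == "__main__":
    times = earliest_start_times(200)
    print(times[99], times[100], times[101], times[199])
```

Under these assumptions the first 100 requests go out at once, nothing more is allowed until the timer fires at 600 seconds, and the remaining 100 follow 6 seconds apart.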

Consider using batch requests.
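
As a rough sketch of what batching could look like, the helper below only builds the payloads and sends nothing. It assumes the Graph API batch format as I understand it: a `batch` form field holding a JSON array of objects with `method` and `relative_url`, with up to 50 operations per call. Check the current Facebook documentation before relying on it. The name build_batch_payloads is made up for this example, which is written for Python 3.

```python
import json


def build_batch_payloads(user_ids, batch_size=50):
    """Split user_ids into chunks and return one JSON `batch` string per chunk.

    Assumption: each operation is {"method": "GET", "relative_url": "<id>"}
    and a single batch call accepts at most `batch_size` operations.
    """
    user_ids = list(user_ids)
    payloads = []
    for start in range(0, len(user_ids), batch_size):
        chunk = user_ids[start:start + batch_size]
        operations = [{"method": "GET", "relative_url": str(uid)}
                      for uid in chunk]
        payloads.append(json.dumps(operations))
    return payloads


if __name__ == "__main__":
    payloads = build_batch_payloads(range(100))
    print(len(payloads))
```

Each returned string would be posted as the `batch` field of one request, so 100 IDs would need two HTTP calls instead of 100, which also makes it easier to stay under a rate limit.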

