Tuesday, January 4, 2011

twitter streaming api with python urllib2

I previously posted code for retrieving tweets from the Twitter streaming API. It used the tweepy library and worked as a quick solution, but unfortunately occasional hiccups would cause the program to crash. Always having to check that the program was still running, restarting it when needed, and so on got to be a real hassle, so I decided to rewrite the code using plain Python and urllib2.

imports and vars

First, a few dependencies. foreignwords is a simple module containing a list of roughly 200 of the most frequent Spanish and Indonesian words; it is used to filter out non-English posts (a requirement of mine) and is available here.
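For reference, a minimal stand-in for that module might look like the sketch below. The entries are purely illustrative; the real list linked above is of course much longer.

# foreignwords.py -- illustrative stand-in for the module linked above.
# probably_english() below does a simple substring check against this list,
# so the real module's ~200 entries are frequent Spanish and Indonesian words.
words = ['que', 'pero', 'como', 'gracias',      # frequent Spanish words
         'yang', 'tidak', 'dengan', 'untuk']    # frequent Indonesian words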

import signal, socket, base64, sys, json, time, threading, urllib2, Queue
import foreignwords

SLEEP_TIME_INIT = 0.2       # initial retry delay after a failed connection (seconds)
SLEEP_TIME_THROTTLE = 4.0   # multiplier applied to the delay on each subsequent failure
TIMEOUT_DURATION = 6.0      # how long a single readline may block (seconds)
MAX_SLEEP_TIME = 240.0      # cap on the retry delay (seconds)
WAIT_TIME = 4.0             # how long the consumer sleeps between queue checks (seconds)

USR = ''
PWD = ''

support functions

Next, a few support functions. Occasionally a read will hang indefinitely, and since urllib2 abstracts away the underlying socket, I've wrapped reads in a thread so they can time out. Another possibility would be to use signals, and I've also read that you can change the default socket timeout, but this seemed like the best option. The other two functions strip control characters and attempt to verify that a tweet is English.

def timeout(func, args = [], duration = 2.0):
    """Run func(*args) in a worker thread and return its result, or None if
    it hasn't finished within duration seconds."""
    class TimeoutThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = None
        def run(self):
            try:
                self.result = func(*args)
            except:
                self.result = None
    tothread = TimeoutThread()
    tothread.daemon = True      # a hung read shouldn't keep the process alive
    tothread.start()
    tothread.join(duration)
    if tothread.isAlive():
        return None             # timed out
    else:
        return tothread.result
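For comparison, the default-socket-timeout alternative mentioned above would look roughly like this. It's only a sketch, not what the script uses: setdefaulttimeout is process-wide, and a slow read then surfaces as a socket.timeout exception instead of a None result (request here stands for the urllib2.Request built in main below).

# Alternative sketch: rely on the module-wide socket timeout instead of a watcher thread.
socket.setdefaulttimeout(TIMEOUT_DURATION)
try:
    f = urllib2.urlopen(request)
    line = f.readline()     # raises socket.timeout if it blocks too long
except socket.timeout:
    line = None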

def cleanse(text):
    return text.replace('\n', ' ').replace('\r', ' ')

def probably_english(text):
    text = text.lower()
    for c in text:
        o = ord(c)
        if o < ord(' ') or o > ord('~'):
            return False                # non-printable or non-ASCII character
    for word in foreignwords.words:
        if word in text:
            return False                # contains a frequent Spanish/Indonesian word
    return True
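As a quick sanity check, the text helpers behave like this (the foreign-word test depends on what is actually in foreignwords.words, so only the ASCII check is shown):

print cleanse(u'line one\r\nline two')      # line one  line two
print probably_english(u'Hello world')      # True, assuming no list entry happens to match
print probably_english(u'caf\xe9 au lait')  # False, contains a non-ASCII character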

producer

I've used a producer-consumer model for retrieving and handling the tweets. The producer makes an HTTP request, then reads tweets and puts them into a queue. If an error occurs while making the connection, the thread sleeps for an amount of time that increases on subsequent errors and then tries to open a new connection.

def producer(request, q, run_flag):
    time_to_sleep = SLEEP_TIME_INIT
    while len(run_flag) == 0:
        try:
            print 'PRODUCER > making request'
            f = urllib2.urlopen(request)
            print 'PRODUCER > request done, reading from socket'
            line = timeout(f.readline, duration = TIMEOUT_DURATION)
            while line:
                if len(run_flag) == 0:
                    q.put(line)
                    line = timeout(f.readline, duration = TIMEOUT_DURATION)
                else:
                    # shutdown requested
                    f.close()
                    return
            # the read timed out or the stream closed; drop this connection
            # and let the outer loop reconnect
            f.close()
            time_to_sleep = SLEEP_TIME_INIT
        except (urllib2.URLError, urllib2.HTTPError) as e:
            print >> sys.stderr, 'PRODUCER > Exception: %s, retry in %f' %(e, time_to_sleep)
            time.sleep(time_to_sleep)
            time_to_sleep *= SLEEP_TIME_THROTTLE
            if time_to_sleep > MAX_SLEEP_TIME:
                time_to_sleep = MAX_SLEEP_TIME
        except Exception as err:
            print >> sys.stderr, 'PRODUCER > Exception: %s' %(err)
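With the constants above, the retry delays on repeated connection failures work out as follows (a quick standalone illustration, not part of the script):

# Back-off schedule implied by SLEEP_TIME_INIT, SLEEP_TIME_THROTTLE and MAX_SLEEP_TIME:
# each failed connection sleeps, then multiplies the delay by 4, capped at 240 seconds.
delay, schedule = SLEEP_TIME_INIT, []
while delay < MAX_SLEEP_TIME:
    schedule.append(delay)
    delay = min(delay * SLEEP_TIME_THROTTLE, MAX_SLEEP_TIME)
schedule.append(MAX_SLEEP_TIME)
print schedule   # [0.2, 0.8, 3.2, 12.8, 51.2, 204.8, 240.0]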

consumer

The consumer reads lines from the queue and uses the json library to parse each status and pull out the actual text. Each tweet is then filtered, cleaned up, and written to a file.

def consumer(q, fout, run_flag):
    buff = ''
    print 'CONSUMER > starting'
    while len(run_flag) == 0:
        try:
            while not q.empty():
                item = q.get()
                buff += item    # accumulate until a full '\r\n'-terminated status arrives
                if item.endswith('\r\n') and item.strip():
                    try:
                        json_result = json.loads(buff)
                    except ValueError as err:
                        print >> sys.stderr, 'CONSUMER > bad JSON: %s' %(err)
                        buff = ''
                        continue
                    buff = ''
                    try:
                        lang = json_result['user']['lang']
                        text = json_result['text']
                        if lang == 'en':                    # only english (not sufficient)
                            if not text.startswith('RT'):   # no retweets
                                text = cleanse(text)        # strip '\n' & '\r'
                                if probably_english(text):  # foreign words and ascii val
                                    try:
                                        print >> fout, text
                                    except UnicodeEncodeError as err:
                                        print >> sys.stderr, 'CONSUMER > Exception: %s' %(err)
                    except KeyError as err:
                        pass                                # deletes and other messages lack these fields
        except Exception as err:
            print >> sys.stderr, 'CONSUMER > Exception: %s' %(err)
        time.sleep(WAIT_TIME)
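Each line the producer queues is one JSON-encoded status. To show what the consumer pulls out, here is a made-up, heavily trimmed example with just the fields the code touches (a real status carries many more):

sample = '{"text": "hello from the sample stream", "user": {"lang": "en"}}\r\n'
status = json.loads(sample)
print status['user']['lang']   # en
print status['text']           # hello from the sample stream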

main

Main builds the urllib2 request, including the basic-auth header, and starts the threads. It then waits for a SIGINT (Ctrl-C) and shuts the threads down.

def main():
    q = Queue.Queue()
    run_flag = []           # appending anything signals both threads to stop
    #log = open('fetch.log', 'w')
    #sys.stderr = log
    #sys.stdout = log
    fout = open('tweets.txt', 'w')
    print 'MAIN > starting'
    # basic auth header: encodestring appends a newline, which [:-1] strips
    auth = base64.encodestring('%s:%s' %(USR, PWD))[:-1]
    authheader = 'Basic %s' %(auth)
    twitter_req = urllib2.Request('http://stream.twitter.com/1/statuses/sample.json')
    twitter_req.add_header('Authorization', authheader)
    consumer_thread = threading.Thread(target = consumer, args = (q, fout, run_flag))
    try:
        consumer_thread.start()
        producer(twitter_req, q, run_flag)
        # should never get here
        run_flag.append(False)
        consumer_thread.join()
    except KeyboardInterrupt:
        run_flag.append(False)
        consumer_thread.join()
        fout.close()
        #log.close()        # uncomment along with the log lines above
        sys.exit()

if __name__ == '__main__':
    if len(sys.argv) != 3:
        print >> sys.stderr, 'usage: python %s <username> <password>' %(sys.argv[0])
        sys.exit(1)
    USR = sys.argv[1] #username
    PWD = sys.argv[2] #password
    main()

usage

Full code is available here and can be run with
$ python filename.py <username> <password>
or, to run it in the background, uncomment the redirection of stdout and stderr in the main function and start it with nohup:
$ nohup python filename.py <username> <password> &

3 comments:

  1. I've also found a lot of Dutch posts, and found a Dutch word list to filter these out as well. So much for lang=en ... I'm wondering if Portuguese is also a concern.

  2. In the tweetstream community (http://pypi.python.org/pypi/tweetstream) the buffering problem of the readline function on the urllib2 stream was solved with the statements:

    import socket
    socket._fileobject.default_bufsize = 0

    However, it is not part of the main tweetstream package, but only mentioned in one of the forks:

    https://bitbucket.org/andrewclegg/tweetstream/compare/..runeh/tweetstream

  3. I am constantly getting the 'Connection Refused' and 'Connection Timeout' messages. Before running the script I made a few requests directly through the browser. Are these errors because I have made too many attempts or am I missing something here?
