I previously posted this code for retrieving tweets from the Twitter streaming API. It used the tweepy library and worked as a quick solution, but occasional hiccups would cause the program to crash. Having to constantly tend to the program to make sure it was still running, restarting it when needed, and so on got to be a real hassle, so I decided to rewrite the code using Python and urllib2.
imports and vars
First, a few dependencies. foreignwords is a simple module containing a list of roughly 200 of the most frequent Spanish and Indonesian words, used to filter out non-English posts (this is a requirement of mine), and is available here.

import signal, socket, base64, sys, json, time, threading, urllib2, Queue
import foreignwords

SLEEP_TIME_INIT = 0.2       # initial retry delay (seconds)
SLEEP_TIME_THROTTLE = 4.0   # backoff multiplier on repeated failures
TIMEOUT_DURATION = 6.0      # max seconds to block on a single readline
MAX_SLEEP_TIME = 240.0      # cap on the retry delay
WAIT_TIME = 4.0             # consumer polling interval

USR = ''
PWD = ''
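If you're following along without the module, its whole interface is a single list named words; a stand-in sketch with a few sample entries (this file is hypothetical, the real list has ~200 entries):

# foreignwords.py -- hypothetical stand-in for the real module
# a few high-frequency Spanish and Indonesian words
words = ['que', 'los', 'pero', 'yang', 'dan', 'tidak']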
support functions
Next, a few support functions. Occasionally the read will hang indefinitely. Since urllib2 abstracts away the actual socket, I've used threads to create reads that time out. Another possibility here would be to use signals. I also read that you can modify the default socket timeout length (socket.setdefaulttimeout), but this seemed like the best option. The other two functions are for stripping control characters and attempting to verify that a tweet is English.
def timeout(func, args = [], duration = 2.0):
    class TimeoutThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = None
        def run(self):
            try:
                self.result = func(*args)
            except:
                self.result = None
    tothread = TimeoutThread()
    tothread.start()
    tothread.join(duration)
    if tothread.isAlive():
        return None
    else:
        return tothread.result
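A quick usage sketch of the helper; slow_add here is a made-up stand-in for a blocking read:

def slow_add(a, b):
    time.sleep(3.0)
    return a + b

print timeout(slow_add, args = [1, 2], duration = 5.0) # 3
print timeout(slow_add, args = [1, 2], duration = 1.0) # None (timed out)

Note that the worker thread isn't killed when the duration expires, it's just abandoned; the caller simply stops waiting for its result.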
def cleanse(text):
    return text.replace('\n', ' ').replace('\r', ' ')
def probably_english(text):
    text = text.lower()
    for c in text:
        o = ord(c)
        if o < ord(' ') or o > ord('~'):
            return False
    for word in foreignwords.words:
        if word in text:
            return False
    return True
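For example, with the stand-in word list sketched earlier (note the foreign-word test is a substring match, so very short words can over-filter):

print probably_english('just another tweet') # True
print probably_english('pero que dia')       # False (foreign words)
print probably_english(u'caf\xe9 time')      # False (non-ASCII character)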
producer
I've used a producer-consumer model for retrieving and handling the tweets. The producer makes an HTTP request and then reads tweets and puts them into a queue. If an error occurs while making the connection, the thread sleeps for an amount of time that grows on each subsequent error, then tries to open a new connection (the resulting retry schedule is sketched after the function).
def producer(request, q, run_flag):
    time_to_sleep = SLEEP_TIME_INIT
    while len(run_flag) == 0:
        try:
            print 'PRODUCER > making request'
            f = urllib2.urlopen(request)
            print 'PRODUCER > request done, reading from socket'
            line = timeout(f.readline, duration = TIMEOUT_DURATION)
            while line:
                if len(run_flag) == 0:
                    q.put(line)
                    line = timeout(f.readline, duration = TIMEOUT_DURATION)
                else:
                    line = None
                    f.close()
                    return
            # connection dropped or read timed out; reset the backoff
            time_to_sleep = SLEEP_TIME_INIT
        except (urllib2.URLError, urllib2.HTTPError) as e:
            print >> sys.stderr, 'PRODUCER > Exception: %s, retry in %f' % (e, time_to_sleep)
            time.sleep(time_to_sleep)
            time_to_sleep *= SLEEP_TIME_THROTTLE
            if time_to_sleep > MAX_SLEEP_TIME:
                time_to_sleep = MAX_SLEEP_TIME
        except Exception as err:
            print >> sys.stderr, 'PRODUCER > Exception: %s' % (err)
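With the constants above, the retry delays grow geometrically; a quick standalone check of the schedule (a hypothetical loop, not part of the fetcher):

t = SLEEP_TIME_INIT
for attempt in range(1, 8):
    print 'attempt %d: sleep %.1f s' % (attempt, min(t, MAX_SLEEP_TIME))
    t *= SLEEP_TIME_THROTTLE
# 0.2 s, then 0.8, 3.2, 12.8, 51.2, 204.8, and 240.0 (capped) thereafter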
consumer
The consumer reads tweets from the queue and uses the json library to get the actual text. The tweet is then processed and written to a file.
def consumer(q, fout, run_flag):
    buff = ''
    print 'CONSUMER > starting'
    while len(run_flag) == 0:
        try:
            if not q.empty():
                while not q.empty():
                    item = q.get()
                    buff += item
                    # a complete status is a non-empty line ending in '\r\n'
                    if item.endswith('\r\n') and item.strip():
                        json_result = json.loads(buff)
                        buff = ''
                        try:
                            lang = json_result['user']['lang']
                            text = json_result['text']
                            if lang == 'en': # only english (not sufficient)
                                if not text.startswith('RT'): # no retweets
                                    text = cleanse(text) # strip '\n' & '\r'
                                    if probably_english(text): # foreign words and ascii vals
                                        try:
                                            print >> fout, text
                                        except UnicodeEncodeError as err:
                                            print >> sys.stderr, 'CONSUMER > Exception: %s' % (err)
                        except KeyError:
                            pass
        except Exception as err:
            print >> sys.stderr, 'CONSUMER > Exception: %s' % (err)
        time.sleep(WAIT_TIME)
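To make the parsing step concrete: the stream delivers one JSON object per '\r\n'-terminated line. A minimal illustration with a hand-made status (the field values are hypothetical, and real status objects carry many more fields):

line = '{"text": "just another tweet", "user": {"lang": "en"}}\r\n'
status = json.loads(line)
print status['user']['lang'], '|', status['text'] # en | just another tweet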
main
Main does all the initialization of the urllib2 request and starts the threads. It then waits for a SIGINT (Ctrl-C) and shuts down the threads.
def main():
    q = Queue.Queue()
    run_flag = []
    #log = open('fetch.log', 'w')
    #sys.stderr = log
    #sys.stdout = log
    fout = open('tweets.txt', 'w')
    print 'MAIN > starting'
    # HTTP Basic auth header; encodestring appends '\n', hence the [:-1]
    auth = base64.encodestring('%s:%s' % (USR, PWD))[:-1]
    authheader = 'Basic %s' % (auth)
    twitter_req = urllib2.Request('http://stream.twitter.com/1/statuses/sample.json')
    twitter_req.add_header('Authorization', authheader)
    consumer_thread = threading.Thread(target = consumer, args = (q, fout, run_flag))
    try:
        consumer_thread.start()
        producer(twitter_req, q, run_flag)
        # should never get here
        run_flag.append(False)
        consumer_thread.join()
    except KeyboardInterrupt:
        run_flag.append(False)
        consumer_thread.join()
        fout.close()
        #log.close() # uncomment along with the log lines above
        sys.exit()

if __name__ == '__main__':
    USR = sys.argv[1] # username
    PWD = sys.argv[2] # password
    main()
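For reference, the Authorization header built in main is standard HTTP Basic auth: the base64 encoding of 'username:password'. With made-up credentials:

print base64.encodestring('%s:%s' % ('user', 'secret'))[:-1]
# dXNlcjpzZWNyZXQ=
# header sent: Authorization: Basic dXNlcjpzZWNyZXQ=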
usage
Full code is available here and can be run using

$ python filename.py <username> <password>

or, if running in the background, uncomment the redirection of stdout and stderr in the main function and start with nohup:

$ nohup python filename.py <username> <password> &
comments
I've also found a lot of Dutch posts, and found a Dutch word list to filter these out as well. So much for lang=en ... I'm wondering if Portuguese is also a concern.
In the tweetstream community (http://pypi.python.org/pypi/tweetstream) the buffering problem of the readline function on the urllib2 stream was solved with the statements:

import socket
socket._fileobject.default_bufsize = 0

However, this is not part of the main tweetstream package; it is only mentioned in one of the forks:
https://bitbucket.org/andrewclegg/tweetstream/compare/..runeh/tweetstream
I am constantly getting 'Connection Refused' and 'Connection Timeout' messages. Before running the script I made a few requests directly through the browser. Are these errors because I have made too many attempts, or am I missing something here?