forked from karpathy/arxiv-sanity-preserver
-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathtwitter_daemon.py
270 lines (214 loc) · 9.02 KB
/
twitter_daemon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
"""
Periodically checks Twitter for tweets about arxiv papers we recognize
and logs the tweets into mongodb database "arxiv", under "tweets" collection.
"""
import json
import logging
import os
import re
from collections import defaultdict
from time import sleep
import pytz
import math
import datetime
import tweepy
import pymongo
from logger import logger_config
from utils import Config, catch_exceptions
# settings
# -----------------------------------------------------------------------------
sleep_time = 60*15 # in seconds, between twitter API calls. Default rate limit is 180 per 15 minutes
# NOTE(review): max_tweet_records is not referenced in the visible part of this file - confirm it is still needed
max_tweet_records = 15
# presumably configures log handlers that write to 'twitter_daemon.log' - see logger.logger_config
logger_config(info_filename='twitter_daemon.log')
logger = logging.getLogger(__name__)
# JSON file listing the accounts whose timelines are fetched; every entry is read via its 'screen_name' key
USERS_FILENAME = 'twitter_users.json'
# convenience functions
# -----------------------------------------------------------------------------
def get_api_connector(consumer_key, consumer_secret):
    """
    Build a tweepy API client authenticated with application credentials.
    :param consumer_key: consumer key passed to tweepy.AppAuthHandler
    :param consumer_secret: consumer secret passed to tweepy.AppAuthHandler
    :return: tweepy.API instance created with wait_on_rate_limit and
             wait_on_rate_limit_notify both enabled
    """
    app_auth = tweepy.AppAuthHandler(consumer_key, consumer_secret)
    client_options = {'wait_on_rate_limit': True, 'wait_on_rate_limit_notify': True}
    return tweepy.API(app_auth, **client_options)
def get_keys():
    """
    Read the Twitter API credentials from 'twitter.txt' in the working directory.
    :return: list with the lines of the file (line terminators removed); the
             module-level setup uses the first two entries as consumer key and
             consumer secret
    """
    # The context manager closes the handle deterministically; a bare open() leaked it
    with open('twitter.txt', 'r') as f:
        return f.read().splitlines()
def extract_arxiv_pids(r):
    """
    Collect the arxiv paper ids linked from a tweet.
    :param r: tweet object exposing an ``entities`` dict with a 'urls' list
    :return: list of raw paper ids (e.g. '1706.03762', without any version suffix)
             in the order the urls appear; duplicates are kept
    """
    # Raw string: avoids the invalid '\.' escape warning of a plain literal. The dot of
    # the host name is escaped as well, so it only matches a literal '.'.
    pattern = r'arxiv\.org/abs/([0-9]+\.[0-9]+)'
    pids = []
    for u in r.entities.get('urls', []):
        # tolerate url entities without an expanded form instead of raising
        m = re.search(pattern, u.get('expanded_url') or '')
        if m:
            pids.append(m.group(1))
    return pids
def get_latest_or_loop(q):
    """
    Search Twitter for the query, retrying until a search call succeeds.
    :param q: search query string
    :return: the results returned by api.search (requested with count=100,
             result_type "mixed" and extended tweet mode)
    """
    results = None
    while results is None:
        try:
            results = api.search(q=q, count=100, result_type="mixed", tweet_mode='extended')
        except Exception as e:
            logger.info('there was some problem (waiting some time and trying again):')
            logger.error(e)
            # Actually wait before retrying, as the message above states; without a pause
            # a persistent failure turns this loop into a tight spin against the API.
            sleep(sleep_time)
    logger.info('Fetched results')
    return results
# Timezone-aware (UTC) reference point; tweet_to_dict subtracts it from a tweet's
# creation time to obtain seconds since the epoch.
epochd = datetime.datetime(1970,1,1,tzinfo=pytz.utc) # time of epoch
def get_age_decay(age):
    """
    Compute the multiplier used to decay a paper's twitter score with age.
    Ages up to the offset are not decayed at all; older ages get
    exp(ln(decay) / scale * age ** time_factor).
    :param age: age of the paper (the caller in this file passes the age in days)
    :return: 1 when age is within the offset, otherwise the decay factor as a float
    """
    scale = 7  # divisor of ln(decay) in the exponent
    decay = 0.5  # base decay value whose logarithm drives the exponent
    offset = 2  # ages up to this value are left undecayed
    time_factor = 0.75  # power applied to the age to slow the decay down over time
    if age <= offset:
        return 1
    rate = math.log(decay) / scale
    softened_age = age ** time_factor
    return math.exp(rate * softened_age)
def calc_papers_twitter_score(papers_to_update):
    """
    Aggregate the stored tweets that mention the given papers into per-paper scores.
    A tweet's score is (likes + 2 * retweets) * (4 * replies + 0.5) divided by
    max(log10(followers + 1), 1); it is added to every paper id listed on the tweet.
    :param papers_to_update: iterable of paper ids (duplicates are ignored)
    :return: tuple (score_per_paper, links_per_paper) - a defaultdict(int) of summed
             scores and a defaultdict(list) of per-tweet link summaries, keyed by paper id
    """
    wanted_pids = list(set(papers_to_update))
    score_per_paper = defaultdict(int)
    links_per_paper = defaultdict(list)
    for tweet in db_tweets.find({'pids': {'$in': wanted_pids}}):
        # accounts with many followers are damped; the divisor never drops below 1
        followers_score = max(math.log10(tweet['user_followers_count'] + 1), 1)
        engagement = tweet['likes'] + 2 * tweet['retweets']
        reply_boost = tweet.get('replies', 0) * 4 + 0.5
        tot_score = engagement * reply_boost / followers_score
        for pid in tweet['pids']:
            score_per_paper[pid] += tot_score
            # a fresh dict per paper, so the entries are never shared between papers
            link = {
                'tname': tweet['user_screen_name'],
                'tid': tweet['_id'],
                'rt': tweet['retweets'],
                'name': tweet.get('user_name', tweet['user_screen_name']),
                'likes': tweet['likes'],
                'replies': tweet.get('replies', 0),
            }
            links_per_paper[pid].append(link)
    return score_per_paper, links_per_paper
def summarize_tweets(papers_to_update):
    """
    Recompute the twitter score fields of papers and store them on the paper documents.
    Considers every paper published within the last 30 days plus the papers listed in
    papers_to_update. For each paper with a positive score, writes 'twtr_score',
    'twtr_score_dec' (the score multiplied by its age decay) and, when tweets were
    found for it, 'twtr_links'.
    :param papers_to_update: list of paper ids that were mentioned in processed tweets
    :return: None
    """
    score_per_paper, links_per_paper = calc_papers_twitter_score(papers_to_update)
    # Use UTC "now" as a naive datetime: the subtraction below requires a naive value, and
    # a naive local time would skew both the 30-day cutoff and the age by the UTC offset.
    # NOTE(review): assumes 'time_published' comes back as naive UTC (PyMongo's default for a
    # client created without tz_aware) - confirm.
    dnow_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    dminus = dnow_utc - datetime.timedelta(days=30)
    papers_query = {'$or': [{'time_published': {'$gt': dminus}}, {'_id': {'$in': papers_to_update}}]}
    # materialize first, since the same collection is updated inside the loop
    all_papers = list(db_papers.find(papers_query))
    for cur_p in all_papers:
        pid = cur_p['_id']
        logger.info(f'Updating paper {pid}')
        new_p_score = score_per_paper.get(pid, 0)
        # The score is stored under 'twtr_score' (see below); reading only 'twitter_score'
        # meant the previous score was never found, so recent papers without fresh tweets
        # never had their decayed score refreshed. The old key is kept as a fallback.
        old_p_score = cur_p.get('twtr_score', cur_p.get('twitter_score', 0))
        twitter_score = max(new_p_score, old_p_score)
        if twitter_score <= 0:
            continue
        age_days = (dnow_utc - cur_p['time_published']).total_seconds() / 86400.0
        twitter_score_decayed = twitter_score * get_age_decay(age_days)
        data = {'twtr_score': twitter_score, 'twtr_score_dec': twitter_score_decayed}
        if pid in links_per_paper:
            data['twtr_links'] = links_per_paper[pid]
        # update_one replaces the deprecated Collection.update (same single-document upsert)
        db_papers.update_one({'_id': pid}, {'$set': data}, upsert=True)
def get_banned():
    """
    Load the banned users from the file at Config.banned_path.
    The file is read as one entry per line; empty lines are skipped.
    :return: dict mapping each banned entry to 1 (empty when the file does not exist)
    """
    if not os.path.isfile(Config.banned_path):
        return {}
    with open(Config.banned_path, 'r') as f:
        entries = f.read().split('\n')
    banned = {entry: 1 for entry in entries if entry}  # value 1 marks the entry as banned
    print('banning users:', list(banned.keys()))
    return banned
def fetch_twitter_users(usernames):
    """
    Fetch the recent timeline tweets of every listed user.
    A failure for one user is logged and does not stop the remaining users.
    :param usernames: iterable of dicts, each with a 'screen_name' key
    :return: list with the tweets of all users that could be fetched (up to 100 requested per user)
    """
    logger.info('Fetching tweets from users list')
    tweets = []
    for u in usernames:
        try:
            tweets += api.user_timeline(screen_name=u['screen_name'], count=100, tweet_mode='extended')
        except Exception as e:
            # include the error itself; logging only the screen name hid the failure reason
            logger.error(f'Failed to fetch tweets from {u["screen_name"]} - {e}')
        # short pause between consecutive users
        sleep(1)
    logger.info('Finished fetching tweets from users list')
    return tweets
def fetch_tweets():
    """
    Fetch the raw tweets to process: the latest search results mentioning 'arxiv.org'
    plus, when the users file exists, the timelines of the users listed in it.
    :return: the search results, extended with the users' tweets when available
    """
    logger.info('Fetching tweets')
    # fetch the latest mentioning arxiv.org
    results = get_latest_or_loop('arxiv.org')
    if os.path.isfile(USERS_FILENAME):
        # context manager so the users file handle is closed after parsing
        with open(USERS_FILENAME, 'r') as f:
            usernames = json.load(f)
        results += fetch_twitter_users(usernames)
    else:
        logger.warning('Users file is missing')
    return results
def tweet_to_dict(r, arxiv_pids, dnow_utc, num_replies):
    """
    Convert a tweet object into the document stored in the tweets collection.
    :param r: tweet object (reads id_str, created_at, lang, full_text, retweet_count,
              favorite_count and the author's screen_name, name, followers_count, friends_count)
    :param arxiv_pids: arxiv paper ids mentioned in this tweet
    :param dnow_utc: timestamp stored as the insertion date
    :param num_replies: number of replies counted for the tweet
    :return: dict keyed by the tweet's id string ('_id')
    """
    # created_at is tagged as UTC so that it can be subtracted from the tz-aware epoch
    created = r.created_at.replace(tzinfo=pytz.UTC)
    return {
        '_id': r.id_str,
        'pids': arxiv_pids,
        'inserted_at_date': dnow_utc,
        'created_at_date': created,
        'created_at_time': (created - epochd).total_seconds(),  # seconds since epoch
        'lang': r.lang,
        'text': r.full_text,
        'retweets': r.retweet_count,
        'likes': r.favorite_count,
        'replies': num_replies,
        'user_screen_name': r.author.screen_name,
        'user_name': r.author.name,
        'user_followers_count': r.author.followers_count,
        'user_following_count': r.author.friends_count,
    }
def is_tweet_new(tweet_id_q):
    """
    Tell whether no stored tweet matches the given query.
    :param tweet_id_q: query dict passed to db_tweets.find_one
    :return: True when find_one yields nothing, False otherwise
    """
    return not db_tweets.find_one(tweet_id_q)
def find_num_replies(t):
    """
    Count the replies to a tweet that were written by other users.
    Searches for tweets addressed to the author since the tweet's id and keeps those
    that reply to this exact tweet and are not by the author.
    :param t: tweet object (reads id_str and author.screen_name)
    :return: number of matching replies among the search results; 0 when anything fails
    """
    try:
        author_name = t.author.screen_name
        tweet_id = t.id_str
        candidates = api.search(q=f'to:{author_name}', since_id=tweet_id, count=100)
        return sum(
            1 for reply in candidates
            if reply.in_reply_to_status_id_str == tweet_id and reply.author.screen_name != author_name
        )
    except Exception as e:
        logger.error(f'Failed to fetch replies for tweet - {t.id_str} - {e}')
        return 0
def get_pids_in_db(arxiv_pids):
    """
    Keep only the paper ids that exist in the papers collection.
    :param arxiv_pids: list of candidate paper ids
    :return: list of the ids found, in the order the database returns them
    """
    # project only '_id' - nothing else of the paper documents is needed here
    found = db_papers.find({'_id': {'$in': arxiv_pids}}, {'_id': 1})
    return [doc['_id'] for doc in found]
def process_tweets(tweets_raw_data):
    """
    Store the tweets that link to known arxiv papers and report the affected papers.
    Retweets are replaced by the tweet they retweet, every tweet id is handled once,
    and tweets are skipped when they link no arxiv paper, when none of the linked
    papers is in the papers collection, or when the author is banned. New tweets are
    inserted in one batch at the end; tweets already stored are updated in place.
    :param tweets_raw_data: sequence of tweet objects as returned by the Twitter API
    :return: list of paper ids referenced by the stored tweets (may contain duplicates)
    """
    logger.info('Process tweets')
    dnow_utc = datetime.datetime.now(datetime.timezone.utc)
    banned = get_banned()
    to_insert = []
    papers_to_update = []
    unique_tweet_ids = set()
    for r in tweets_raw_data:
        if hasattr(r, 'retweeted_status'):
            # work on the original tweet rather than on the retweet wrapper
            r = r.retweeted_status
        if r.id_str in unique_tweet_ids:
            continue
        arxiv_pids = extract_arxiv_pids(r)
        if not arxiv_pids:
            continue
        arxiv_pids = get_pids_in_db(arxiv_pids)
        if not arxiv_pids:
            logger.info(f'Arxiv pids are not in DB - tweet {r.id_str}')
            continue
        if r.author.screen_name in banned:
            continue
        papers_to_update += arxiv_pids
        num_replies = find_num_replies(r)
        tweet = tweet_to_dict(r, arxiv_pids, dnow_utc, num_replies)
        tweet_id_q = {'_id': r.id_str}
        if is_tweet_new(tweet_id_q):
            to_insert.append(tweet)
        else:
            # update_one replaces the deprecated Collection.update (same single-document upsert)
            db_tweets.update_one(tweet_id_q, {'$set': tweet}, upsert=True)
        # remember the id only once the tweet was handled, so later copies are skipped
        unique_tweet_ids.add(r.id_str)
        logger.info(f'Found tweet for {arxiv_pids} with {tweet["likes"]} likes')
    if to_insert:
        db_tweets.insert_many(to_insert)
    # NOTE(review): Collection.count() is deprecated as well; count_documents({}) is the
    # replacement on PyMongo >= 3.7 - switch once the minimum driver version is confirmed.
    logger.info('processed %d/%d new tweets. Currently maintaining total %d' % (len(to_insert), len(tweets_raw_data), db_tweets.count()))
    return papers_to_update
@catch_exceptions(logger=logger)
def main_twitter_fetcher():
    """
    Run one full pass: fetch the tweets, store the relevant ones and refresh the
    twitter scores of the papers they mention.
    """
    papers_to_update = process_tweets(fetch_tweets())
    summarize_tweets(papers_to_update)
# -----------------------------------------------------------------------------
# Module-level setup: these statements sit outside the __main__ guard, so they run
# whenever the module is loaded. The functions above use api, db_tweets and db_papers
# as globals.
# authenticate to twitter API
# the first line of the keys file is used as consumer key, the second as consumer secret
keys = get_keys()
api = get_api_connector(keys[0], keys[1])
# connect to mongodb instance
# MongoClient() is called without arguments, i.e. with the driver's default connection settings
client = pymongo.MongoClient()
mdb = client.arxiv
db_tweets = mdb.tweets # the "tweets" collection in "arxiv" database
db_papers = mdb.papers # the "papers" collection in the same database
# main loop
# a single fetch/process/summarize pass is made per script run
if __name__ == '__main__':
    main_twitter_fetcher()