"""Thin wrapper around several tweepy v2 clients used to fetch Twitter users.

The module rotates between every configured bearer token, fetches a user
together with (optionally) the accounts they follow and their followers, and
stores the result through ``NewsSocialSignaling.Database``.
"""
import datetime as dt
import json
import os
import pickle
import sys
import time

import numpy as np
import tweepy

import NewsSocialSignaling
from NewsSocialSignaling import Config, Helper, Database

print(tweepy.__version__)

# The original code branched on ``sys.platform == 'linux'`` but assigned
# ``False`` in both branches, so the flag is simply ``False`` everywhere.
wait_on_rate_limit = False

# Every tweepy error class this module treats as an "API error".
_TWEEPY_ERRORS = (
    tweepy.TweepyException,
    tweepy.TwitterServerError,
    tweepy.BadRequest,
    tweepy.NotFound,
    tweepy.Forbidden,
    tweepy.HTTPException,
    tweepy.TooManyRequests,
    tweepy.Unauthorized,
)


class TwitterApi:
    """Pool of ``tweepy.Client`` objects, one per configured bearer token."""

    def __init__(self):
        """Load credentials and build one client per credential entry."""
        # ``Helper.load_keys()['twitter_credentials']`` is a two-level mapping;
        # flatten it so each client is addressed by the concatenated key names.
        credentials = Helper.load_keys()['twitter_credentials']
        self.keys = {
            outer + inner: credentials[outer][inner]
            for outer in credentials
            for inner in credentials[outer]
        }
        self.apis = {
            name: tweepy.Client(
                bearer_token=self.keys[name]['BEARER_TOKEN'],
                return_type=dict,
                wait_on_rate_limit=wait_on_rate_limit,
            )
            for name in self.keys
        }
        # Start every client far in the past so all are equally eligible.
        self.last_pings = {name: dt.datetime(2000, 1, 1) for name in self.apis}
        self.user_fields = [
            'created_at', 'description', 'entities', 'id', 'location', 'name',
            'pinned_tweet_id', 'profile_image_url', 'protected',
            'public_metrics', 'url', 'username', 'verified', 'verified_type',
            'withheld',
        ]

    def next_api(self):
        """Return the least recently used client and mark it as used now.

        Ties are broken randomly by shuffling the client names first.

        Raises:
            KeyError: if no client is configured.
        """
        api_names = list(self.apis)
        if not api_names:
            # Fail before touching ``last_pings`` instead of inserting a
            # ``None`` entry and failing on the final lookup.
            raise KeyError('No Twitter API clients are configured')
        np.random.shuffle(api_names)
        # ``min`` returns the first minimal element, i.e. the first one in
        # shuffled order among the clients with the oldest ping.
        chosen = min(api_names, key=self.last_pings.__getitem__)
        self.last_pings[chosen] = dt.datetime.now()
        print('Pinging api {0}'.format(chosen))
        return self.apis[chosen]

    def get_user(self, get_friends, resample, get_followers=None,
                 extra_data=None, screen_name=None, uid=None,
                 print_error=True, return_errors=False,
                 max_pages_network=None):
        """Return the stored record of a user, fetching it first if needed.

        The user is looked up by ``screen_name`` when given, otherwise by
        ``uid``. The API is queried when the user is not stored yet, when
        ``resample`` is true, or when the stored record has no friends.

        Args:
            get_friends: fetch the accounts the user follows (skipped for
                protected accounts).
            resample: force a fresh API lookup even if the user is stored.
            get_followers: fetch the user's followers; ``None`` means "same
                as ``get_friends``".
            extra_data: passed through to ``Database.add_user``.
            screen_name: user handle; lower-cased before use.
            uid: user id, used only when ``screen_name`` is ``None``.
            print_error: print API errors instead of staying silent.
            return_errors: on an API error return the exception object
                instead of ``None``.
            max_pages_network: page limit forwarded to the peer fetchers.

        Returns:
            The result of ``Database.get_user`` for the user, or ``None`` /
            the exception object when the API lookup failed.
        """
        by_name = screen_name is not None
        if by_name:
            screen_name = screen_name.lower()
            existing_user = Database.get_user(sn=screen_name)
        else:
            assert uid is not None, 'Either screen_name or uid is required'
            existing_user = Database.get_user(uid=str(uid))

        needs_fetch = (
            existing_user is None
            or resample
            or existing_user['friends'] is None
            or len(existing_user['friends']) == 0
        )
        if needs_fetch:
            try:
                if by_name:
                    print('Fetching user {0}'.format(screen_name))
                    api_results = self.next_api().get_user(
                        username=screen_name,
                        user_fields=self.user_fields,
                        expansions=['pinned_tweet_id'],
                    )
                    # note: get_user raises an error when using "id" but not
                    # when using "username", so raise one ourselves.
                    errors = api_results.get('errors')
                    if errors is not None and errors[0]['title'] == 'Not Found Error':
                        raise ValueError(errors[0]['detail'])
                    # NOTE(review): other error titles are not handled here;
                    # if such a response carries no 'data' key the lookups
                    # below raise KeyError - confirm the desired behavior.
                else:
                    print('Fetching user {0}'.format(uid))
                    api_results = self.next_api().get_user(
                        id=str(uid),
                        user_fields=self.user_fields,
                        expansions=['pinned_tweet_id'],
                    )
            except (ValueError,) + _TWEEPY_ERRORS as e:
                user_not_found = (
                    isinstance(e, ValueError)
                    and 'could not find user with username' in str(e).lower()
                )
                if not (isinstance(e, _TWEEPY_ERRORS) or user_not_found):
                    # Unrelated ValueError: not an API error, let it surface.
                    raise
                if print_error:
                    print('API Error: {0}'.format(e))
                return e if return_errors else None

            friends = None
            followers = None
            uid = api_results['data']['id']
            if get_friends and not api_results['data']['protected']:
                friends = self.get_friends(
                    uid=str(uid),
                    max_pages_network=max_pages_network,
                    peers_count=api_results['data']['public_metrics']['following_count'],
                )
            # Followers default to following the ``get_friends`` switch when
            # ``get_followers`` is left unset.
            want_followers = (get_friends and get_followers is None) or get_followers
            if want_followers and not api_results['data']['protected']:
                followers = self.get_followers(
                    uid=str(uid),
                    max_pages_network=max_pages_network,
                    peers_count=api_results['data']['public_metrics']['followers_count'],
                )
            Database.add_user(user=api_results, friends=friends,
                              followers=followers, extra_data=extra_data)

        if by_name:
            return Database.get_user(sn=screen_name)
        return Database.get_user(uid=str(uid))

    def get_peers(self, uid, peer_type, max_pages_network=None, peers_count=None):
        """Fetch the accounts a user follows or is followed by, page by page.

        Args:
            uid: id of the user whose peers are fetched.
            peer_type: ``'following'`` or ``'followers'``.
            max_pages_network: maximum number of pages (1000 users each);
                ``None`` means up to 1000 pages.
            peers_count: expected number of peers; only used to print
                progress for large fetches.

        Returns:
            The first page, merged with every later page through
            ``Helper.merge_pagination``; an empty dict if no page arrived.

        Raises:
            ValueError: on an invalid ``peer_type`` or ``max_pages_network``.
        """
        if peer_type not in ('following', 'followers'):
            raise ValueError("Invalid peer_type input. Expected 'following' or 'followers'.")
        if max_pages_network is None:
            max_pages_network = 1000
            print('max_pages_network not specified, retrieved up to 1000 pages')
        elif isinstance(max_pages_network, bool) or not isinstance(max_pages_network, (int, float)):
            raise ValueError("Invalid max_pages_network input. Expected 'None' or an integer.")

        output = dict()
        # The message used to say "friends" even when fetching followers.
        peer_label = 'friends' if peer_type == 'following' else 'followers'
        print('Fetching {0} of {1}'.format(peer_label, uid))
        started = dt.datetime.now()

        api = self.next_api()
        # ``peer_type`` was validated above, so a plain attribute lookup
        # replaces the former ``eval`` on a built-up string.
        endpoint = getattr(api, 'get_users_' + peer_type)
        pages = tweepy.Paginator(
            endpoint,
            id=str(uid),
            max_results=1000,
            limit=max_pages_network,
            user_fields=self.user_fields,
            expansions=['pinned_tweet_id'],
        )
        has_count = isinstance(peers_count, (int, float)) and not isinstance(peers_count, bool)
        # Errors raised by the paginator itself (the actual API requests)
        # propagate to the caller; only the per-page processing is guarded.
        for page_ix, page in enumerate(pages):
            print('Page {0}'.format(page_ix))
            try:
                if page_ix == 0:
                    output = page
                else:
                    output = Helper.merge_pagination(master=output, using=page, page_ix=page_ix)
                if has_count:
                    expected = min(1000 * max_pages_network, peers_count)
                    # Progress is only reported for large networks.
                    if expected > 4000:
                        print('Scraping {0} for {1}: {2}/{3} {4:.3f}% ({5:.2f} minutes)'.format(
                            peer_type, uid, len(output['data']), expected,
                            100 * len(output['data']) / expected,
                            (dt.datetime.now() - started).total_seconds() / 60))
            except _TWEEPY_ERRORS as e:
                print("Going to sleep:", e)
                time.sleep(10)
        return output

    def get_friends(self, uid, max_pages_network=None, peers_count=None):
        """Fetch the accounts ``uid`` follows; see ``get_peers``."""
        return self.get_peers(uid=uid, peer_type='following',
                              max_pages_network=max_pages_network,
                              peers_count=peers_count)

    def get_followers(self, uid, max_pages_network=None, peers_count=None):
        """Fetch the accounts following ``uid``; see ``get_peers``."""
        return self.get_peers(uid=uid, peer_type='followers',
                              max_pages_network=max_pages_network,
                              peers_count=peers_count)

    def search_user(self, q):
        """Search users matching ``q`` and return the raw API result."""
        # NOTE(review): ``search_users`` is a ``tweepy.API`` (v1.1) method
        # name, while ``self.apis`` holds ``tweepy.Client`` objects - confirm
        # this call still works.
        return self.next_api().search_users(q)

    def get_user_tweets(self, uid):
        """Return the timeline of ``uid`` as a dict keyed by tweet id."""
        # NOTE(review): ``user_timeline`` is a ``tweepy.API`` (v1.1) method
        # name, while ``self.apis`` holds ``tweepy.Client`` objects - confirm.
        tweets = self.next_api().user_timeline(
            user_id=uid,
            count=3200,
            trim_user=True,
            exclude_replies=False,
            include_rts=True,
        )
        return {tweet.id: tweet for tweet in tweets}

    def get_liked_tweets(self, uid):
        """Return the tweets liked by ``uid`` as a dict keyed by tweet id."""
        # NOTE(review): ``favorites`` is a ``tweepy.API`` (v1.1) method name,
        # while ``self.apis`` holds ``tweepy.Client`` objects - confirm.
        liked_tweets = self.next_api().favorites(user_id=uid, count=200,
                                                 include_entities=True)
        return {tweet.id: tweet for tweet in liked_tweets}


if __name__ == '__main__':
    # uns = ['voxdotcom', 'alex_moehring', 'candresmolina', 'karlrove']
    uns = ['candresmolina']
    ta = TwitterApi()
    for un in uns:
        print(un)
        user = ta.get_user(screen_name=un, get_friends=True, resample=True,
                           max_pages_network=1)
        NewsSocialSignaling.TwitterProcessor.calculate_user_slant(user)
    x = 1