import json
import os
import pickle
import tqdm
import NewsSocialSignaling
from NewsSocialSignaling import Config
import pandas as pd
import numpy as np
import implicit  # must be importable so the pickled recommender model can be loaded
import scipy.sparse
import matplotlib
import surprise
import textwrap
import urllib.request
import time
import copy
import datetime as dt

matplotlib.use('Agg')
import matplotlib.pyplot as plt
from matplotlib import colors as mcolors
from PIL import Image, ImageDraw

plt.rcdefaults()
# plt.rcParams['font.family'] = 'fantasy'


class TwitterProcessor:

    def __init__(self):
        # self.publisher_slant_500 = pd.read_csv(Config.fn_publisher_slant_500, sep=',')
        # self.publisher_slant_500.twitter_id = pd.to_numeric(self.publisher_slant_500.twitter_id)
        self.publisher_slant = pd.read_csv(Config.fn_publisher_slant, sep='\t')
        self.publisher_slant.index = self.publisher_slant.twitter_id
        self.publisher_ids = set(self.publisher_slant.twitter_id.tolist())
        self.publisher_recommendations = self.publisher_slant.loc[self.publisher_slant.recommended_publisher].copy()
        self.recommender_model = None
        self.recommender_pub2pubcat = None
        self.recommender_pubcat2pub = None
        self.recommender_explicit = None
        self.recommender_explicit_data = None
        self.load_recommender()

        # read in the random sample used to normalize slant and hard scores
        self.random_sample_df = pd.read_csv(
            os.path.join(os.path.dirname(__file__), 'random_sample_scores.txt'), sep='\t'
        )
        # assert pd.isna(self.random_sample_df.raw_slant).sum() == 0
        # assert pd.isna(self.random_sample_df.raw_hard).sum() == 0
        self.pub_data = None
        self.pub2pubid = dict(
            (row.single_twitter_handle.lower(), row.twitter_id)
            for _, row in self.publisher_slant.iterrows()
        )

        # This code is duplicated in _ingest_publisher_data because of a bug in otree
        # where the package is not loaded properly and _ingest_publisher_data is not called.
        fn = os.path.join(os.path.dirname(__file__), 'publisher_recommendation_data.p')
        if os.path.exists(fn):
            with open(fn, 'rb') as f:
                self.pub_data = pickle.load(f)
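    # The publisher table read above is assumed, judging from the columns accessed
    # throughout this class, to contain at least: twitter_id, single_twitter_handle,
    # canonical, slant, hard_score, num_followers, and recommended_publisher.
    # An illustrative row (made-up scores, not values from the real file):
    #   twitter_id=807095, single_twitter_handle='nytimes', canonical='New York Times',
    #   slant=-0.4, hard_score=0.8, num_followers=50000000, recommended_publisher=True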
    def load_recommender(self):
        mod_fn = os.path.join(os.path.dirname(__file__), 'recommender_model.p')
        print(mod_fn)
        print(os.path.exists(mod_fn))
        with open(mod_fn, 'rb') as f:
            model_dic = pickle.load(f)
        self.recommender_model = model_dic['model']
        self.recommender_pub2pubcat = model_dic['pub2pubcat']
        self.recommender_pubcat2pub = model_dic['pubcat2pub']
        # mod_fn = os.path.join(os.path.dirname(__file__), 'recommender_model_explicit.p')
        # with open(mod_fn, 'rb') as f:
        #     model_dic = pickle.load(f)
        # self.recommender_explicit = model_dic['model']
        # self.recommender_explicit_data = model_dic['training_data']

    def _ingest_publisher_data(self):
        pub_data = {}
        fn = os.path.join(os.path.dirname(__file__), 'publisher_recommendation_data.p')
        if os.path.exists(fn):
            with open(fn, 'rb') as f:
                pub_data = pickle.load(f)
        print('Ingesting publisher recommendation data.')
        time.sleep(0.1)
        if not os.path.exists('C:/users/moehring'):
            return
        for th in tqdm.tqdm(self.publisher_recommendations.single_twitter_handle.unique()):
            if th.lower() in pub_data:
                continue
            u = NewsSocialSignaling.TwitterAPI.get_user(screen_name=th, get_friends=False, resample=False)
            img_url = u['most_recent'].profile_image_url_https
            img_url = img_url.replace('_normal', '')
            try:
                f = urllib.request.urlopen(img_url)
                img = Image.open(f)
            except Exception:
                print('Error in {0}'.format(u['screen_name']))
                continue
            # crop image to a centered square
            width, height = img.size
            x = (width - height) // 2
            img_cropped = img.crop((x, 0, x + height, height))
            # create grayscale image with white circle (255) on black background (0)
            mask = Image.new('L', img_cropped.size)
            mask_draw = ImageDraw.Draw(mask)
            width, height = img_cropped.size
            mask_draw.ellipse((0, 0, width, height), fill=255)
            # mask.show()
            # add mask as alpha channel
            img_cropped.putalpha(mask)
            img_cropped = img_cropped.convert('RGB')
            # don't use os.path.join to avoid using \\ on windows
            img_fn = ('C:/users/moehring/git/NewsSocialSignaling/NewsSocialSignalingExperiment/'
                      '_static/pub_figs/' + u['screen_name'] + '.jpg')
            img_cropped.save(img_fn)
            row = self.publisher_slant.loc[self.publisher_slant.single_twitter_handle == th]
            assert len(row) == 1, row.T
            pub_data[th.lower()] = {
                'name': u['most_recent'].name,
                'screen_name': th,
                'id': u['id'],
                'description': u['most_recent'].description,
                'img_url': img_url,
                'img_path': img_fn.split('static/')[-1],
                'slant': row.slant.iloc[0],
                'hard': row.hard_score.iloc[0],
                'num_followers': row.num_followers.iloc[0]
            }
            # cache incrementally so progress survives a crash mid-loop
            with open(fn, 'wb') as f:
                pickle.dump(pub_data, f)
        assert pub_data is not None
        self.pub_data = pub_data

    def calculate_user_slant(self, user, normalize=True):
        accounts_following = self.accounts_following(user)
        raw_slant = self._calculate_user_slant_raw(accounts_following=accounts_following)
        if pd.isna(raw_slant):
            if normalize:
                return 0.5
            else:
                return 0
        if normalize:
            relative_slant = pd.Series(self.random_sample_df.raw_slant < raw_slant).sum() / (
                ~pd.isna(self.random_sample_df.raw_slant)).sum()
            assert -0.000001 <= relative_slant <= 1.0000001
            return relative_slant
        else:
            # rescale raw_slant to [-1, 1]
            raw_slant = raw_slant / 0.6
            raw_slant = max(raw_slant, -1)
            raw_slant = min(raw_slant, 1)
            return raw_slant
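    # With normalize=True, calculate_user_slant is a percentile rank against the
    # random-sample file: e.g., if raw_slant = 0.1 and 700 of the 1,000 non-missing
    # raw_slant values in random_sample_scores.txt fall below it, the normalized
    # score is 0.7 (numbers illustrative). With normalize=False, the raw mean
    # publisher slant is divided by 0.6 and clipped to [-1, 1].
    # tmp_calculate_user_slant below applies the same transformation to a
    # precomputed raw score.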
    def tmp_calculate_user_slant(self, raw_slant, normalize=True):
        if pd.isna(raw_slant):
            if normalize:
                return 0.5
            else:
                return 0
        if normalize:
            relative_slant = pd.Series(self.random_sample_df.raw_slant < raw_slant).sum() / (
                ~pd.isna(self.random_sample_df.raw_slant)).sum()
            assert -0.000001 <= relative_slant <= 1.0000001
            return relative_slant
        else:
            # rescale raw_slant to [-1, 1]
            raw_slant = raw_slant / 0.6
            raw_slant = max(raw_slant, -1)
            raw_slant = min(raw_slant, 1)
            return raw_slant

    def _calculate_user_slant_raw(self, accounts_following):
        if len(accounts_following) == 0:
            return np.nan
        slant = self.publisher_slant.loc[accounts_following].slant.mean()
        return slant

    def calculate_user_hard(self, user):
        raw_hard = self._calculate_user_hard_raw(user=user)
        relative = pd.Series(self.random_sample_df.raw_hard < raw_hard).sum() / (
            ~pd.isna(self.random_sample_df.raw_hard)).sum()
        assert -0.000001 <= relative <= 1.0000001
        return relative

    def _calculate_user_hard_raw(self, user):
        accounts_following = self.accounts_following(user)
        return len(accounts_following)
        # if len(accounts_following) == 0:
        #     return np.nan
        # score = self.publisher_slant.loc[accounts_following].hard_score.mean()
        # return score

    def accounts_following(self, user):
        friends = [int(el['id']) for el in user['most_recent_friends']['data']]
        accounts_following = list(np.unique([el for el in friends if el in self.publisher_ids]))
        return accounts_following

    def screen_names_following(self, user):
        afs = self.accounts_following(user)
        sns = []
        for el in afs:
            tmp = self.publisher_slant.loc[self.publisher_slant.twitter_id == el].single_twitter_handle.iloc[0]
            sns.append(tmp)
        return list(np.unique(sns))

    def num_accounts_following(self, user):
        return len(self.accounts_following(user))

    def num_accounts_tweeted(self, user):
        tweets = user['tweets']
        retweets = [el for el in tweets if hasattr(tweets[el], 'retweeted_status')]
        accounts_retweeted = list(np.unique([tweets[el].retweeted_status.author.id for el in retweets]))
        accounts_retweeted = [el for el in accounts_retweeted if el in self.publisher_ids]
        return len(accounts_retweeted)

    @staticmethod
    def slant_text(slant):
        # slant_cutoffs = {
        #     0: ('is heavily Democratic', 'blue'),
        #     0.25: ('leans Democratic', 'blue'),
        #     0.4: ('is neutral', 'purple'),
        #     0.6: ('leans Republican', 'red'),
        #     0.75: ('is heavily Republican', 'red'),
        #     1000: ('is heavily Republican', 'red')
        # }
        slant_cutoffs = {
            -10: ('is heavily Democratic', 'blue'),
            -0.35: ('leans Democratic', 'blue'),
            -0.15: ('is neutral', 'purple'),
            0.15: ('leans Republican', 'red'),
            0.35: ('is heavily Republican', 'red'),
            1000: ('is heavily Republican', 'red')
        }
        slant_text = ''
        last_c = 0
        for c in slant_cutoffs:
            if slant < c:
                slant_text = slant_cutoffs[last_c][0]
                break
            last_c = c
        # escape spaces so the text renders inside mathtext \bf{...}
        slant_text = slant_text.replace(' ', '\\ ')
        return slant_text

    @staticmethod
    def hard_text(hard):
        hard_cutoffs = {
            0: 'Light news diet',
            0.25: 'Light news diet',
            0.4: 'Average news diet',
            0.6: 'Heavy news diet',
            0.75: 'Heavy news diet',
            1000: 'Heavy news diet'
        }
        hard_text = ''
        last_c = 0
        for c in hard_cutoffs:
            if hard < c:
                hard_text = hard_cutoffs[last_c]
                break
            last_c = c
        return hard_text
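    # The cutoff dicts above rely on Python 3.7+ insertion-ordered dicts: the loop
    # walks the keys in ascending order and returns the label of the last bucket
    # whose lower bound the score clears. For example (traced by hand):
    #   TwitterProcessor.slant_text(-0.2)  ->  'leans\ Democratic' (spaces escaped)
    #   TwitterProcessor.slant_text(0.0)   ->  'is\ neutral'
    #   TwitterProcessor.hard_text(0.5)    ->  'Average news diet'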
    @staticmethod
    def gradientbar(ax, cmap, labels, arrow_locs, arrow_types, locs):
        ax.axis('off')
        # grad = np.atleast_2d(np.linspace(0, 1, 256)).T
        bs = ax.barh([1], [1])
        ax = bs[0].axes
        triangle_width = 0.1
        bar = None
        for bar in bs:
            bar.set_zorder(1)
            bar.set_facecolor("none")
            x, y = bar.get_xy()
            w, h = bar.get_width(), bar.get_height()
            grad = np.atleast_2d(np.linspace(0, 1 * w, 256))
            ax.imshow(grad, cmap=cmap, extent=[x, x + w, y, y + h], aspect="auto", zorder=0)
        ax.axis((-triangle_width, 1 + triangle_width, 0.4, 1.6))

        # add triangles that cap the two ends of the bar
        triangle_height = bar.get_height() / 1.7
        rhs = plt.Polygon([[1.00, 1 - triangle_height], [1.00, 1 + triangle_height + 0.01], [1.1, 1]],
                          color=cmap(0.9999), linewidth=0)
        lhs = plt.Polygon([[0, 1 - triangle_height], [0, 1 + triangle_height + 0.01], [-0.1, 1]],
                          color=cmap(0.0), linewidth=0)
        ax.add_patch(rhs)
        ax.add_patch(lhs)

        # add labels
        for ix, l in enumerate(labels):
            font_size = 12
            # find starting position
            start_x = locs[ix]
            # find starting height
            if '\n' in l:
                start_y = 1 - font_size / 100
            else:
                start_y = 1 - font_size / 100 / 2
            ax.text(start_x, start_y, l, size=font_size, color='white')

        # add arrow indicators
        aw = 0.05
        ah = 0.25
        for ix, arrow_loc in enumerate(arrow_locs):
            arrow_type = arrow_types[ix]
            if arrow_type == 'above':
                point_height = 1 + triangle_height
                points = [(arrow_loc, point_height), (arrow_loc - aw, point_height + ah),
                          (arrow_loc + aw, point_height + ah)]
            elif arrow_type == 'below':
                point_height = 1 - triangle_height
                points = [(arrow_loc, point_height), (arrow_loc - aw, point_height - ah),
                          (arrow_loc + aw, point_height - ah)]
            else:
                raise NotImplementedError()
            if len(points) > 1:
                indicator = plt.Polygon(points, color='k', linewidth=0)
                ax.add_patch(indicator)
        return ax.get_xlim()

    @staticmethod
    def adjust_slant_infographic(slant):
        # spread slant out more, clipped to [-1, 1]
        slant = min(max(slant * 2, -1), 1)
        return slant
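    # Minimal usage sketch for gradientbar, outside the production flow (the cmap
    # mirrors the one built in build_infographic below; the filename is arbitrary):
    #   fig, ax = plt.subplots()
    #   cmap = mcolors.LinearSegmentedColormap.from_list("", ['#1E397FED', '#5C385CFF', '#9C2424FF'])
    #   TwitterProcessor.gradientbar(ax=ax, cmap=cmap, labels=['Left', 'Neutral', 'Right'],
    #                                arrow_locs=[0.7], arrow_types=['below'], locs=[0.02, 0.41, 0.83])
    #   fig.savefig('gradientbar_demo.png')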
    def build_infographic(self, user, savefig=True):
        slant = self.calculate_user_slant(user=user, normalize=False)
        hard = self.calculate_user_hard(user=user)
        screen_name = user['screen_name']
        # the user object may be a dict (newer API payloads) or an object
        if isinstance(user['most_recent'], dict):
            name = user['most_recent']['data']['username']
            img_url = user['most_recent']['data']['profile_image_url']
        else:
            name = user['most_recent'].name
            img_url = user['most_recent'].profile_image_url_https
        print('\n\n\n\n\n\n\n\n' + img_url)
        fig = plt.figure()
        nrow = 4
        gs = fig.add_gridspec(nrows=nrow, ncols=3)
        callout_ax = fig.add_subplot(gs[3, 0])
        name_ax = fig.add_subplot(gs[1, 1:3])
        img_ax = fig.add_subplot(gs[0:3, 0])
        slant_ax = fig.add_subplot(gs[2, 1:3])
        # hard_ax = fig.add_subplot(gs[3, 1:3])
        # hard_example_ax = fig.add_subplot(gs[4, 1:3])
        between_ax = fig.add_subplot(gs[3, 1:3])

        # blue = '#2f2fc7'
        blue = '#1E397FED'
        # purple = '#800080FF'
        purple = '#5C385CFF'
        red = '#9C2424FF'
        # red = '#8B3535FF'
        # grey = '#6c6663'
        grey = '#a2b89a'
        green = '#244E21FF'

        # add callout text
        slant_text = self.slant_text(slant=slant)
        hard_text = self.hard_text(hard=hard)
        slant = self.adjust_slant_infographic(slant)
        print(slant)
        slant_cmap = mcolors.LinearSegmentedColormap.from_list("", [blue, purple, red])
        hard_cmap = mcolors.LinearSegmentedColormap.from_list("", [grey, green])
        slant_lim = self.gradientbar(
            cmap=plt.get_cmap(slant_cmap), ax=slant_ax, labels=['Left', 'Neutral', 'Right'],
            arrow_locs=[(slant + 1) / 2], arrow_types=['below'], locs=[0.02, 0.41, 0.83]
        )
        shrinkage_factor = 0.3

        # add text in between bars
        # '#ebe9dd'
        props = dict(boxstyle='round', facecolor='white', alpha=1)
        between_ax.set_xlim(slant_lim)
        between_ax.text(
            (1 - shrinkage_factor) * (slant + 1) / 2 + 0.5 * shrinkage_factor, 1,
            slant_text.replace('\\', '').replace('is ', '').title(),
            transform=between_ax.transAxes, fontsize=12, verticalalignment='center',
            horizontalalignment='center', bbox=props, wrap=True, color='k'
        )
        callout_ax.axis('off')
        img_ax.axis('off')
        between_ax.axis('off')
        name_ax.axis('off')
        max_width = 18
        callout_text = (
            '\n'.join(textwrap.wrap('@{0} has a news diet that '.format(screen_name), width=max_width))
            + '\n' + r'$\bf{' + slant_text + '}$' + '\n'
            + '\n'.join(textwrap.wrap(
                'and follows more news publishers than ' + r'$\bf{' + '{0:.0f}'.format(hard * 100) + '}$%'
                + ' of active twitter users', width=max_width))
        )
        # callout_text = '\n'.join(textwrap.wrap(callout_text, width=16))
        # callout_text = r"""
        # @alex_moehring has
        # a news diet that
        # $\bf{is\ heavily\ Democratic}$
        # and follows more
        # news publishers
        # than $\bf{54}$% of
        # active twitter
        # users
        # """
        callout_ax.text(
            0.5, 1, callout_text, transform=callout_ax.transAxes, fontsize=9,
            verticalalignment='center', horizontalalignment='center', bbox=props, wrap=True
        )

        # add image
        img_url = img_url.replace('_normal', '')
        f = urllib.request.urlopen(img_url)
        img = Image.open(f)
        # crop image to a centered square
        width, height = img.size
        x = (width - height) // 2
        img_cropped = img.crop((x, 0, x + height, height))
        # create grayscale image with white circle (255) on black background (0)
        mask = Image.new('L', img_cropped.size)
        mask_draw = ImageDraw.Draw(mask)
        width, height = img_cropped.size
        mask_draw.ellipse((0, 0, width, height), fill=255)
        # mask.show()
        # add mask as alpha channel
        img_cropped.putalpha(mask)
        # save as png which keeps alpha channel
        img_ax.imshow(img_cropped, extent=[0, 1, 0, 1], interpolation='none')
        fig.tight_layout()
        # between_ax.text(
        #     0, 0.5, '\n'.join(textwrap.wrap("Note: The weight of a news diet measures how many publishers a user follows, with heavier news diets following more publishers than lighter diets. Both news diet slant and weight are measured relative to a representative population of Twitter Users.", width=65)),
        #     fontsize=8, transform=hard_example_ax.transAxes, verticalalignment='center', horizontalalignment='left',
        #     wrap=True
        # )
        name_ax.text(
            0.5, 0.9, name,  # + "'s News Diet"
            fontsize=16, transform=name_ax.transAxes, verticalalignment='top',
            horizontalalignment='center', wrap=True, color='k', weight='bold'
        )
        name_ax.text(
            0.5, 0.35, '@' + screen_name, transform=name_ax.transAxes, fontsize=12,
            verticalalignment='center', horizontalalignment='center', wrap=True,
            color='gray', weight='bold'
        )
        if savefig:
            fn = os.path.join(Config.dir_twitter_data, 'twitter_summaries', 'infographics')
            if not os.path.exists(fn):
                os.makedirs(fn)
            fn = os.path.join(fn, user['screen_name'] + '.pdf')
            plt.savefig(fn)

        # strip mathtext markup for the plain-text return value
        clean_callout = copy.copy(callout_text)
        to_drop = ['\\bf', '$', '{', '}', '\\']
        clean_callout = clean_callout.replace('\n', ' ')
        for el in to_drop:
            clean_callout = clean_callout.replace(el, '')
        return {
            'fig': fig,
            'text': clean_callout
        }
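    # build_infographic returns the figure plus a plain-text version of the callout
    # with the mathtext markup stripped. A hypothetical call (`proc` is a
    # TwitterProcessor instance; the exact text depends on the user's scores):
    #   out = proc.build_infographic(user=u, savefig=False)
    #   out['text']  # e.g. '@handle has a news diet that leans Democratic and ...'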
    def build_infographic_peers(self, user, peer_data, savefig=True):
        slant = self.calculate_user_slant(user=user, normalize=False)
        hard = self.calculate_user_hard(user=user)
        if peer_data is None:
            return None
        peer_slant = peer_data['slant_score_avg']
        peer_hard = peer_data['hard_score_avg']
        slant = self.adjust_slant_infographic(slant)
        peer_slant = self.adjust_slant_infographic(peer_slant)
        screen_name = user['screen_name']
        fig = plt.figure()
        nrow = 3
        gs = fig.add_gridspec(nrows=nrow, ncols=1)
        above_ax = fig.add_subplot(gs[0, :])
        slant_ax = fig.add_subplot(gs[1, :])
        between_ax = fig.add_subplot(gs[2, :])
        # hard_ax = fig.add_subplot(gs[3, :])
        # below_ax = fig.add_subplot(gs[4, :])
        blue = '#1E397FED'
        purple = '#5C385CFF'
        red = '#9C2424FF'
        grey = '#a2b89a'
        green = '#244E21FF'
        slant_cmap = mcolors.LinearSegmentedColormap.from_list("", [blue, purple, red])
        # hard_cmap = mcolors.LinearSegmentedColormap.from_list("", [grey, green])
        slant_lim = self.gradientbar(
            cmap=plt.get_cmap(slant_cmap), ax=slant_ax, labels=['Left', 'Neutral', 'Right'],
            arrow_locs=[(1 + slant) / 2, (1 + peer_slant) / 2], arrow_types=['below', 'above'],
            locs=[0.02, 0.45, 0.88]
        )
        # hard_lim = self.gradientbar(cmap=plt.get_cmap(hard_cmap), ax=hard_ax, labels=['Light', 'Heavy'],
        #                             arrow_locs=[hard, peer_hard], arrow_types=['above', 'below'], locs=[0.02, 0.88])
        # assert slant_lim[0] == hard_lim[0] and slant_lim[1] == hard_lim[1]
        shrinkage_factor = 0.3

        # add text in between bars
        props = dict(boxstyle='round', facecolor='white', alpha=1)
        between_ax.set_xlim(slant_lim)
        between_ax.text(
            (1 - shrinkage_factor) * (1 + slant) / 2 + 0.5 * shrinkage_factor, 1, 'Your News Diet',
            transform=between_ax.transAxes, fontsize=12, verticalalignment='center',
            horizontalalignment='center', bbox=props, wrap=True, color='k'
        )
        # between_ax.text(
        #     (1 - shrinkage_factor) * hard + 0.5 * shrinkage_factor, 0, 'Your News Diet',
        #     transform=between_ax.transAxes, fontsize=12, verticalalignment='center',
        #     horizontalalignment='center', bbox=props, wrap=True, color='k'
        # )
        above_ax.text(
            (1 - shrinkage_factor) * (1 + peer_slant) / 2 + 0.5 * shrinkage_factor, 0,
            'News Diet of Followers', transform=above_ax.transAxes, fontsize=12,
            verticalalignment='center', horizontalalignment='center', bbox=props, wrap=True, color='k'
        )
        # below_ax.text(
        #     (1 - shrinkage_factor) * peer_hard + 0.5 * shrinkage_factor, 1, 'News Diet of Followers',
        #     transform=below_ax.transAxes, fontsize=12, verticalalignment='center',
        #     horizontalalignment='center', bbox=props, wrap=True, color='k'
        # )
        above_ax.axis('off')
        between_ax.axis('off')
        fig.tight_layout()
        if savefig:
            fn = os.path.join(Config.dir_twitter_data, 'twitter_summaries', 'infographics')
            if not os.path.exists(fn):
                os.makedirs(fn)
            fn = os.path.join(fn, user['screen_name'] + '_peer_.pdf')
            plt.savefig(fn)
        return {
            'fig': fig,
            # 'text': clean_callout
        }
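    # build_infographic_peers reads only 'slant_score_avg' and 'hard_score_avg' from
    # peer_data, the dict produced by follower_summary_stat below. A minimal
    # hand-built stand-in for testing (illustrative values):
    #   peer_data = {'slant_score_avg': -0.2, 'hard_score_avg': 0.6}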
    def follower_summary_stat(self, user, k=5, seed=None, max_pages_network=1, bad_users=None):
        if bad_users is None:
            bad_users = set()
        # sample followers
        rnd = np.random.RandomState()
        if seed is not None:
            rnd = np.random.RandomState(seed=seed)
        followers = [int(el['id']) for el in user['most_recent_followers']['data']]
        # first determine if eligible (must have downloaded their followers)
        if followers is None or len(followers) == 0:
            return None
        if len(followers) <= k:
            sampled_friends = followers
        else:
            sampled_friends = list(rnd.choice(followers, size=k))
        friends_data = []
        for el in sampled_friends:
            if el in bad_users:
                continue
            try:
                to_add = NewsSocialSignaling.TwitterAPI.get_user(
                    uid=el, get_friends=True, get_followers=False, resample=False,
                    max_pages_network=max_pages_network
                )
                if to_add['most_recent_friends'] is not None:
                    friends_data.append(to_add)
                    print('Good {0}'.format(el))
            except Exception as e:
                print(e)
                print('Bad {0}'.format(el))
                bad_users.add(el)
                continue
        if len(friends_data) == 0:
            return None
        friend_hard_scores = dict((el['id'], self.calculate_user_hard(user=el)) for el in friends_data)
        friend_slant_scores = dict((el['id'], self.calculate_user_slant(user=el, normalize=False))
                                   for el in friends_data)
        friend_times = dict((el['id'], np.max(list(el['user_objects'].keys()))) for el in friends_data)
        user_slant_score = self.calculate_user_slant(user=user, normalize=False)
        user_hard_score = self.calculate_user_hard(user=user)
        return {
            'time': dt.datetime.utcnow(),
            'sampled_friends': sampled_friends,
            'friend_times': friend_times,
            'hard_scores': friend_hard_scores,
            'slant_scores': friend_slant_scores,
            'hard_score_avg': np.average([friend_hard_scores[el] for el in friend_hard_scores]),
            'slant_score_avg': np.average([friend_slant_scores[el] for el in friend_slant_scores]),
            'own_slant': user_slant_score,
            'own_hard': user_hard_score
        }

    # Recommender methods
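    # get_recommendations below supports three arms: 'random', 'popular' (ranked by
    # num_followers), and 'personalized' (the implicit matrix-factorization model
    # loaded in load_recommender, falling back to 'popular' for cold-start users).
    # A hypothetical call, assuming `proc` is a TwitterProcessor and `u` a user dict
    # from TwitterAPI.get_user:
    #   out = proc.get_recommendations(user=u, recommendation_type='personalized',
    #                                  balanced=True, num_recs=10)
    #   [r['screen_name'] for r in out['recs']]   # list of suggested handles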
    def get_recommendations(self, user, recommendation_type, balanced, num_recs, verbose=False):
        # get possible publishers to be recommended
        accounts_following = self.accounts_following(user=user)
        sns_following = self.screen_names_following(user=user)
        possible_suggestions = self.publisher_recommendations.copy()
        possible_suggestions = possible_suggestions.loc[~possible_suggestions.twitter_id.isin(accounts_following)]
        pub_data = self.pub_data
        possible_suggestions = [
            copy.copy(self.pub_data[el.lower()])
            for el in possible_suggestions.single_twitter_handle if el.lower() in pub_data
        ]
        current_slant_raw = self._calculate_user_slant_raw(accounts_following=accounts_following)
        if pd.isna(current_slant_raw):
            current_slant_raw = 0.0
        if len(possible_suggestions) == 0:
            return []
        if len([el for el in sns_following if el.lower() in self.recommender_pub2pubcat]) == 0 \
                and recommendation_type == 'personalized':
            recommendation_type = 'popular'  # this is how we handle the cold-start problem

        # now sort possible suggestions by the specified algorithm.
        # these functions ignore balance and rank all possible suggestions;
        # we then select either the top k or the balanced top k later on
        if recommendation_type == 'random':
            rec_order = possible_suggestions
            # shuffle only in the random arm; the other arms must keep their ranking
            np.random.shuffle(rec_order)
        elif recommendation_type == 'personalized':
            rec_order = self.personalized_suggestions(possible_suggestions, publishers_followed=sns_following)
            possible_suggestions_dict = dict((el['screen_name'], el) for el in possible_suggestions)
            assert len(possible_suggestions_dict) == len(possible_suggestions)
            rec_order = [possible_suggestions_dict[el] for el in rec_order]
        # elif recommendation_type == 'personalized_explicit':
        #     rec_order = self.personalized_suggestions_explicit(possible_suggestions, publishers_followed=sns_following)
        #     possible_suggestions_dict = dict((el['screen_name'], el) for el in possible_suggestions)
        #     assert len(possible_suggestions_dict) == len(possible_suggestions)
        #     rec_order = [possible_suggestions_dict[el] for el in rec_order]
        elif recommendation_type == 'popular':
            rec_order = sorted(possible_suggestions, key=lambda d: d['num_followers'], reverse=True)
        else:
            raise NotImplementedError(recommendation_type)
        rec_order = [el for el in rec_order if el['id'] not in accounts_following]

        # now add what direction each suggestion would move the user's slant
        biggest_change = np.max(np.abs([el['slant'] - current_slant_raw for el in possible_suggestions]))
        for el in possible_suggestions:
            el['slant_change'] = (el['slant'] - current_slant_raw) / biggest_change

        # now balance if requested: up to num_recs // 2 from each side of the user's slant
        if balanced:
            recs = []
            for sign in [-1, 1]:
                poss_recs = [el for el in rec_order if np.sign(el['slant_change']) == sign]
                if len(poss_recs) > 0:
                    recs += poss_recs[0:min(num_recs // 2, len(poss_recs))]
        else:
            recs = rec_order[0:min(num_recs, len(rec_order))]
        assert len(recs) <= num_recs
        # shuffle the order shown to the user
        np.random.shuffle(recs)
        # precompute arrow widths for the front end
        for r in recs:
            total_width = 150
            r['arrow_total_width'] = np.abs(total_width * r['slant_change']) + 30
            r['arrow_line_width'] = np.abs(total_width * r['slant_change'])
        if verbose:
            print(['{0}: {1}'.format(el['screen_name'], el['slant_change']) for el in recs])
            print(len(recs))
        return {'possible_recs': possible_suggestions, 'recs': recs}

    # def publisher_suggestions(self, user):
    #     accounts_following = self.accounts_following(user=user)
    #     possible_suggestions = self.publisher_recommendations.copy()
    #     possible_suggestions = possible_suggestions.loc[~possible_suggestions.twitter_id.isin(accounts_following)]
    #     pub_data = self.pub_data
    #     possible_suggestions = [copy.copy(self.pub_data[el.lower()]) for el in
    #                             possible_suggestions.single_twitter_handle if el.lower() in pub_data]
    #     current_slant_raw = self._calculate_user_slant_raw(accounts_following=accounts_following)
    #     if pd.isna(current_slant_raw):
    #         current_slant_raw = 0.0
    #     if len(possible_suggestions) == 0:
    #         return []
    #     # now add what direction they would move the user's slant
    #     biggest_change = np.max(np.abs([el['slant'] - current_slant_raw for el in possible_suggestions]))
    #     for el in possible_suggestions:
    #         el['slant_change'] = (el['slant'] - current_slant_raw) / biggest_change
    #     return possible_suggestions

    def personalized_suggestions(self, possible_suggestions, publishers_followed):
        model = self.recommender_model
        publishers_followed = [el.lower() for el in publishers_followed]
        # build a binary item vector over publisher categories for this user
        tmp_uf = np.zeros(len(self.recommender_pub2pubcat))
        for pub_id in publishers_followed:
            if pub_id not in self.recommender_pub2pubcat:
                continue
            pub_cat = self.recommender_pub2pubcat[pub_id]
            tmp_uf[pub_cat] = 1
        assert np.sum(tmp_uf) > 0
        # the userid argument is a placeholder: recalculate_user=True recomputes
        # the user factors from tmp_uf
        model_recs = model.recommend(
            1e6,
            scipy.sparse.csr_matrix(tmp_uf),
            N=len(self.recommender_pub2pubcat),
            filter_already_liked_items=False,
            filter_items=[self.recommender_pub2pubcat[el] for el in publishers_followed
                          if el in self.recommender_pub2pubcat],
            recalculate_user=True,
        )
        lowerpub2pub = dict((el['screen_name'].lower(), el['screen_name']) for el in possible_suggestions)
        # recommend returns (item_ids, scores); keep the item ids
        model_recs = [self.recommender_pubcat2pub[el].lower() for el in model_recs[0]]
        possible_suggestions_screen_names = [el['screen_name'].lower() for el in possible_suggestions]
        model_recs = [el for el in model_recs if el in possible_suggestions_screen_names]
        unique_model_recs = []
        for el in model_recs:
            if el in unique_model_recs:
                continue
            unique_model_recs.append(el)
        return [lowerpub2pub[el] for el in unique_model_recs]

    def personalized_suggestions_explicit(self, possible_suggestions, publishers_followed):
        publishers_followed = [el.lower() for el in publishers_followed]
        # add this user's follows to the training data
        training_data = self.recommender_explicit_data.copy()
        uid = training_data.user.max() + 1
        to_add = pd.DataFrame([{'pubid': el, 'user': uid, 'follow': 1} for el in publishers_followed])
        training_data = pd.concat([training_data, to_add])
        # fit recommender model
        reader = surprise.Reader(rating_scale=(0, 1))
        data = surprise.Dataset.load_from_df(training_data, reader).build_full_trainset()
        algo = self.recommender_explicit
        algo.fit(data)
        recs = []
        for el in possible_suggestions:
            p = algo.predict(uid=uid, iid=el['screen_name'].lower(), clip=False, verbose=False)
            recs.append(p)
        recs = pd.DataFrame(recs).sort_values('est', ascending=False)
        lowerpub2pub = dict((el['screen_name'].lower(), el['screen_name']) for el in possible_suggestions)
        return [lowerpub2pub[el] for el in recs.iid]
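# The __main__ block below calls methods on NewsSocialSignaling.TwitterProcessor
# directly, which only works if the package exposes an already-constructed
# TwitterProcessor instance under that name (this appears to be the intent; the
# class defined above would otherwise need to be instantiated first):
#
#   import NewsSocialSignaling
#   u = NewsSocialSignaling.TwitterAPI.get_user(screen_name='nytimes', get_friends=True, resample=False)
#   NewsSocialSignaling.TwitterProcessor.calculate_user_slant(u)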
if __name__ == '__main__':
    users = ['alex_moehring', 'mike_pence', 'charrismusic', 'candresmolina', 'AOC',
             'TuckerCarlson', 'maddow', 'KamalaHarris', 'tedcruz']
    # users = ['AOC', 'TuckerCarlson', 'maddow', 'KamalaHarris', 'tedcruz']
    # users = ['erikbryn']
    # users = ['maddow']
    # print(NewsSocialSignaling.TwitterProcessor.get_recommendations(
    #     user={'most_recent_friends': [NewsSocialSignaling.TwitterProcessor.pub2pubid[el] for el in ['oann']]},
    #     recommendation_type='personalized_explicit',
    #     balanced=False,
    #     num_recs=2,
    #     verbose=True
    # ))
    carlos_data = {}
    for sn in users:
        print(sn)
        u = NewsSocialSignaling.TwitterAPI.get_user(screen_name=sn, get_friends=True, get_followers=True,
                                                    resample=False, max_pages_network=2)
        af = NewsSocialSignaling.TwitterProcessor.accounts_following(user=u)
        h = NewsSocialSignaling.TwitterProcessor.calculate_user_hard(u)
        s = NewsSocialSignaling.TwitterProcessor.calculate_user_slant(u)
        print('Number of publishers: {0}'.format(NewsSocialSignaling.TwitterProcessor.num_accounts_following(user=u)))
        if NewsSocialSignaling.TwitterProcessor.num_accounts_following(u) < 10:
            sdf = NewsSocialSignaling.TwitterProcessor.publisher_slant
            print('; '.join([el for el in sdf.loc[
                sdf.twitter_id.isin(NewsSocialSignaling.TwitterProcessor.accounts_following(u))].canonical]))
        NewsSocialSignaling.TwitterProcessor.build_infographic(user=u)
        # peer_data = NewsSocialSignaling.TwitterProcessor.follower_summary_stat(user=u, seed=35045, wait_on_rate_limit=True)
        # NewsSocialSignaling.TwitterProcessor.build_infographic_peers(user=u, peer_data=peer_data, savefig=True)
        # NewsSocialSignaling.TwitterProcessor.publisher_suggestions(user=u)

        # exercise each recommendation arm num_sims times
        num_sims = 100
        [NewsSocialSignaling.TwitterProcessor.get_recommendations(user=u, recommendation_type='random',
                                                                  num_recs=10, balanced=False)
         for _ in tqdm.tqdm(range(num_sims))]
        [NewsSocialSignaling.TwitterProcessor.get_recommendations(user=u, recommendation_type='popular',
                                                                  num_recs=10, balanced=False, verbose=False)
         for _ in tqdm.tqdm(range(num_sims))]
        [NewsSocialSignaling.TwitterProcessor.get_recommendations(user=u, recommendation_type='personalized',
                                                                  num_recs=10, balanced=False, verbose=False)
         for _ in tqdm.tqdm(range(num_sims))]
        # [NewsSocialSignaling.TwitterProcessor.get_recommendations(user=u, recommendation_type='personalized_explicit',
        #                                                           num_recs=10, balanced=False, verbose=False)
        #  for _ in tqdm.tqdm(range(num_sims))]

        # this break stops after the first user, so the carlos_data collection
        # below is currently skipped for every user
        break
        time.sleep(1)

        # give carlos the publishers followed and their slants for these samples
        ps = NewsSocialSignaling.TwitterProcessor.publisher_slant
        carlos_data[sn] = []
        for aid in af:
            row = ps.loc[ps.twitter_id == aid].iloc[0]
            carlos_data[sn].append({
                'publisher': row.single_twitter_handle,
                'slant': row.slant
            })
    input('Press any key to continue')
    # print(json.dumps(carlos_data, indent=4))
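    # carlos_data maps each screen name to the publishers they follow, e.g.
    # (illustrative values):
    #   {'alex_moehring': [{'publisher': 'nytimes', 'slant': -0.4}, ...]}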