"""Persistent (SQLite) and in-memory stores for user rows.

Both stores keep, per user, a history of user objects, friend lists and
follower lists keyed by the timestamp at which they were recorded.
"""
import codecs
import copy
import datetime as dt
import json
import pickle
import sys
import time

import dataset
import numpy as np  # not used by the code below; kept for any external users
from dataset.types import Types

from NewsSocialSignaling import Config


class Database:
    """User store backed by the SQLite file named in ``Config.fn_twitter_db``."""

    # Connection is opened once, when this module is imported.
    db = dataset.connect('sqlite:///{0}'.format(Config.fn_twitter_db))

    @staticmethod
    def table(tn):
        """Return the table named *tn* with an integer ``id`` primary key."""
        return Database.db.get_table(tn, primary_id='id', primary_type=Types.integer)

    @staticmethod
    def _process_single_user_load(tmp):
        """Decode a raw stored row in place and return it.

        Deserializes ``user_objects`` (pickle), ``friends``, ``followers`` and
        ``extra_data`` (JSON) and, when present, ``tweets`` (base64 text of a
        pickle). Adds ``most_recent``, ``most_recent_friends`` and
        ``most_recent_followers`` holding the entry with the largest
        timestamp key (``None`` when there are no friend/follower entries).

        The row is MUTATED: callers that keep the raw row must pass a copy.
        Returns ``None`` when *tmp* is ``None``.
        """
        if tmp is None:
            return None

        # NOTE(security): pickle.loads can execute arbitrary code; this is
        # only safe while the database contents are trusted.
        tmp['user_objects'] = pickle.loads(tmp['user_objects'])
        latest = max(tmp['user_objects'])
        tmp['most_recent'] = tmp['user_objects'][latest]

        tmp['friends'] = json.loads(tmp['friends'])
        tmp['followers'] = json.loads(tmp['followers'])
        tmp['extra_data'] = json.loads(tmp['extra_data'])

        raw_tweets = tmp.get('tweets')
        if raw_tweets is not None:
            tmp['tweets'] = pickle.loads(codecs.decode(raw_tweets.encode(), 'base64'))

        for el in ('friends', 'followers'):
            col = 'most_recent_{0}'.format(el)
            snapshots = tmp[el]
            if not snapshots:
                tmp[col] = None
            else:
                # JSON object keys are strings: compare them numerically but
                # index with the original key, so the lookup never depends on
                # how a float happens to be formatted back into text.
                tmp[col] = snapshots[max(snapshots, key=float)]
        return tmp

    @staticmethod
    def _build_user_row(existing_user, user, friends, followers, extra_data):
        """Build the serialized row for *user*, extending any stored history.

        *existing_user* is a decoded row (as returned by ``get_user``) or
        ``None``. *friends* / *followers* are recorded under the current
        timestamp only when they are not ``None``; *user* is always recorded.
        """
        user_objects = {}
        friend_dict = {}
        follower_dict = {}
        if existing_user is not None:
            user_objects = existing_user['user_objects']
            friend_dict = existing_user['friends']
            follower_dict = existing_user['followers']

        # NOTE(review): utcnow() is naive, and .timestamp() interprets naive
        # datetimes as local time, so on a non-UTC host this value is offset
        # from the true epoch time. Left unchanged on purpose: switching
        # clocks would change how new snapshots order against stored ones.
        ts = float(dt.datetime.utcnow().timestamp())
        if friends is not None:
            friend_dict[ts] = friends
        if followers is not None:
            follower_dict[ts] = followers
        user_objects[ts] = user

        data = user.get('data')
        return {
            'id': data.get('id'),
            'user_objects': pickle.dumps(user_objects),
            'screen_name': data.get('username').lower(),
            'friends': json.dumps(friend_dict),
            'followers': json.dumps(follower_dict),
            'extra_data': json.dumps(extra_data),
            'api_new_version': True,
        }

    @staticmethod
    def _get_user_row(uid=None, sn=None):
        """Return the raw ``users`` row matching *uid*, else *sn*, or ``None``.

        *uid* takes precedence when it is a string; *sn* is lower-cased
        before the lookup.

        Raises:
            NotImplementedError: if neither *uid* nor *sn* is a string.
        """
        table = Database.table(tn='users')
        if isinstance(uid, str):
            return table.find_one(id=uid)
        if isinstance(sn, str):
            return table.find_one(screen_name=sn.lower())
        raise NotImplementedError('UID or SN must be a string')

    @staticmethod
    def get_user(uid=None, sn=None):
        """Return the decoded row for a user, or ``None`` if not found."""
        row = Database._get_user_row(uid=uid, sn=sn)
        return Database._process_single_user_load(row)

    @staticmethod
    def get_all_users():
        """Yield every row of the ``users`` table, decoded."""
        for row in Database.table(tn='users').all():
            yield Database._process_single_user_load(row)

    @staticmethod
    def add_user(user, friends, followers, extra_data=None):
        """Insert or update *user*, appending to its stored history.

        *user* must expose ``user.get('data')`` with ``id`` and ``username``
        entries. Pass ``None`` for *friends* / *followers* to leave that
        history untouched.
        """
        existing_user = Database.get_user(uid=str(user.get('data').get('id')))
        to_add = Database._build_user_row(existing_user, user, friends, followers, extra_data)
        Database.table(tn='users').upsert(to_add, ['id'])

    @staticmethod
    def add_user_tweets(tweets, uid=None, sn=None):
        """Merge *tweets* into the stored tweets of an existing user.

        *tweets* is indexed by key (``tweets[t]``); keys already stored keep
        their stored value, new keys are added. The result is written back as
        base64 text of a pickle.

        Raises:
            TypeError: if no row matches *uid* / *sn*.
            NotImplementedError: if neither *uid* nor *sn* is a string.
        """
        row = Database._get_user_row(uid=uid, sn=sn)
        if row is None:
            # A missing row used to fail on the membership test below with an
            # opaque "'NoneType' is not iterable" TypeError; the exception
            # type is kept so existing handlers behave the same.
            raise TypeError('No user row found for uid={0!r}, sn={1!r}'.format(uid, sn))

        stored = row.get('tweets')
        if stored is not None:
            merged = pickle.loads(codecs.decode(stored.encode(), 'base64'))
            for key in tweets:
                merged.setdefault(key, tweets[key])
            tweets = merged

        to_add = {
            'id': row['id'],
            'tweets': codecs.encode(pickle.dumps(tweets), 'base64').decode(),
        }
        Database.table(tn='users').upsert(to_add, keys=['id'])


class MemoryDatabase:
    """Process-local, dict-backed counterpart of :class:`Database`.

    Rows are stored in the same serialized form as in :class:`Database` and
    are dropped once they are more than one hour old (checked on each
    ``add_user`` call).
    """

    # Table name -> {key -> row}.
    db = {}

    # Rows older than this are purged by add_user.
    _MAX_ROW_AGE = dt.timedelta(hours=1)

    @staticmethod
    def table(tn):
        """Return the dict for table *tn*, creating it if needed."""
        return MemoryDatabase.db.setdefault(tn, {})

    @staticmethod
    def _get_user_row(uid=None, sn=None):
        """Return the raw stored row matching *uid*, else *sn*, or ``None``.

        The returned dict is the stored object itself, not a copy. Screen
        names are compared case-insensitively.

        Raises:
            NotImplementedError: if neither *uid* nor *sn* is a string.
        """
        table = MemoryDatabase.table(tn='users')
        if isinstance(uid, str):
            return table.get(uid)
        if isinstance(sn, str):
            wanted = sn.lower()
            for row in table.values():
                if row['screen_name'].lower() == wanted:
                    return row
            return None
        raise NotImplementedError('UID or SN must be a string')

    @staticmethod
    def get_user(uid=None, sn=None):
        """Return a decoded copy of a user's row, or ``None`` if not found."""
        # Shallow copy: decoding replaces top-level values, and the stored
        # row must stay serialized for later loads.
        row = copy.copy(MemoryDatabase._get_user_row(uid=uid, sn=sn))
        return Database._process_single_user_load(row)

    @staticmethod
    def get_all_users():
        """Yield a decoded copy of every stored user row."""
        # Snapshot the rows so add_user calls made while the generator is
        # being consumed cannot change the dict under iteration, and decode
        # copies so the stored rows stay serialized (decoding in place made
        # every later load of the same row fail).
        for row in list(MemoryDatabase.table(tn='users').values()):
            yield Database._process_single_user_load(copy.copy(row))

    @staticmethod
    def add_user(user, friends, followers, extra_data=None):
        """Insert or update *user*, then purge rows older than one hour.

        Same contract as ``Database.add_user``; the row additionally carries
        a ``time_added`` datetime used for the purge.
        """
        # Rows are looked up by string id, so normalise the key the same way
        # Database.add_user does.
        uid = str(user.get('data').get('id'))
        existing_user = MemoryDatabase.get_user(uid=uid)

        to_add = Database._build_user_row(existing_user, user, friends, followers, extra_data)
        now = dt.datetime.utcnow()
        to_add['time_added'] = now

        table = MemoryDatabase.table(tn='users')
        table[uid] = to_add

        # Purge rows older than one hour. This runs on every platform; no OS
        # check is made. The table is rebound to a fresh dict, not edited.
        MemoryDatabase.db['users'] = {
            key: row
            for key, row in table.items()
            if now - row['time_added'] <= MemoryDatabase._MAX_ROW_AGE
        }