from otree.api import * import random, json, requests, itertools from datetime import datetime from common_modules import functions as fun doc = """ A network version of the Keynesian beauty contest. """ class Constants(BaseConstants): name_in_url = 'beauty_en' players_per_group = None num_rounds = 10 instructions_template = 'network_guess/instructions.html' def creating_session(subsession): # Initializing session parameters: # Initializing parameters in subsessions: if subsession.round_number == 1: # games_list session variable if subsession.session.config['dropbox_games_url'] != "": games_url = 'https://www.dropbox.com/s/bqgrnew0o1fnbg7/games_list.json?dl=1' res = requests.get(games_url)#, headers={"Authorization": "Bearer " + access_token}) subsession.session.games_list = json.loads(res.text) else: games_file = open('network_games_list/games_list_en.json') subsession.session.games_list = json.load(games_file) # Initialize round variables. This must be done in order of rounds 1,2,3,4,5...... stage_durations = eval(subsession.session.config['stage_durations']) game_sequence = eval(subsession.session.config['game_sequence']) game_labels = eval(subsession.session.config['game_labels']) deltas = eval(subsession.session.config['deltas']) initial_points = eval(subsession.session.config['initial_points']) feedback_round_selection_type = subsession.session.config['feedback_round_selection_type'] round_num = 1 #for i, stages in enumerate(stage_durations): for i, gstr in enumerate(game_sequence): stages = stage_durations[i] curr_subsession = subsession.in_round(round_num) curr_subsession.num_periods = len(stages) if feedback_round_selection_type == 'random': selected_periods = [random.randrange(1, len(stages)+1)] elif feedback_round_selection_type == 'all': selected_periods = list(range(1, len(stages)+1)) else: selected_periods = list(range(1, len(stages)+1)) curr_subsession.selected_periods = str(selected_periods) curr_subsession.is_played = True curr_subsession.delta = deltas[i] curr_subsession.initial_points = initial_points[i] curr_subsession.game_label = game_labels[i] curr_subsession.game_index = i + 1 curr_subsession.duration = sum(stages) curr_subsession.curr_game = game_sequence[i] curr_subsession.num_players = len(get_games_list(curr_subsession)[curr_subsession.curr_game]['weights']) # Now we create a Period extra models for each player, and initialize history field for player in curr_subsession.get_players(): history = [ [] for i in range(len(stages))] for j, stg_dur in enumerate(stages): player_period = PlayerPeriod.create( player = player, period = j + 1, round_number = player.round_number ) player.history = str(history) round_num += 1 # The last round in the session curr_subsession.is_last_round_in_session = True # Store number of valid rounds for ss in curr_subsession.in_rounds(1, round_num - 1): ss.num_rounds_in_session = round_num - 1 class Subsession(BaseSubsession): is_played = models.BooleanField(initial=False) delta = models.FloatField() initial_points = models.FloatField() duration = models.IntegerField() curr_game = models.StringField() game_label = models.StringField(initial="") game_index = models.IntegerField() num_players = models.IntegerField() num_periods = models.IntegerField() num_rounds_in_session = models.IntegerField() is_last_round_in_session = models.BooleanField(initial=False) selected_periods = models.StringField() def image_path(subsession): games = get_games_list(subsession) return "network_guess/" + games[subsession.curr_game]['image_path'] def num_games(subsession): return len(eval(subsession.session.config['game_sequence'])) # GROUP SHUFFLING FUNCTIONS def shuffle_by_player_type(subsession, player_types, query_type_func, random_shuffle=True): shuffled_players = [] for player_type in player_types: players = [player.id_in_subsession for player in subsession.get_players() if query_type_func(player) == player_type] if random_shuffle: random.shuffle(players) shuffled_players.extend(players) return shuffled_players def shuffle(subsession): shuffle_type = subsession.session.config['shuffle_type'] if shuffle_type == 'session': subsession.session_shuffle() elif shuffle_type == 'game': subsession.game_shuffle() else: subsession.game_shuffle() def session_shuffle(subsession): if subsession.round_number > 1: subsession.group_like_round(1) else: # We assume all games have the same size as round 1 shuffled_players = subsession.shuffle_by_player_type([True, False], player_passed_control, True) matrix = fun.chunks(shuffled_players, subsession.num_players) subsession.set_group_matrix(matrix) for player in subsession.get_players(): player.label_num = player.id_in_group player.update_label() def game_shuffle(subsession: BaseSubsession): shuffled_players = subsession.shuffle_by_player_type([True, False], player_passed_control, True) matrix = fun.chunks(shuffled_players, subsession.num_players) subsession.set_group_matrix(matrix) for player in subsession.get_players(): player.label_num = player.id_in_group player.update_label() class Group(BaseGroup): pass class Player(BasePlayer): # GAME FIELDS label_num = models.IntegerField() label = models.StringField(inital="") history = models.StringField() curr_guess = models.FloatField() curr_guess_type = models.StringField() curr_guess_time = models.FloatField() num_submissions = models.IntegerField(initial=0) blocked_guesses = models.BooleanField(initial=False) num_registrations = models.IntegerField(initial=0) points_in_game = models.FloatField(initial=0) # time_created refers to the first_time the Guess page is sent to the player, to keep track of periods locally. time_created = models.FloatField() def update_label(player): player.label = get_games_list(player)[player.subsession.curr_game]['labels'][player.label_num - 1] def update_history(player, player_guess): period = player_guess.player_period.period history = eval(player.history) item = dict( guess = player_guess.guess, guess_type = player_guess.guess_type, guess_time = player_guess.guess_time ) history[period-1].append([player_guess.guess, player_guess.guess_type, player_guess.guess_time]) player.history = str(history) class PlayerPeriod(ExtraModel): player = models.Link(Player) period = models.IntegerField() curr_guess = models.FloatField() curr_guess_type = models.StringField(initial="") curr_guess_time = models.FloatField() round_number = models.IntegerField() target = models.FloatField() distance = models.FloatField() penalty = models.FloatField() points = models.FloatField() class PlayerGuess(ExtraModel): player_period = models.Link(PlayerPeriod) guess = models.FloatField() guess_type = models.StringField(initial="") guess_time = models.FloatField() def custom_export(players): yield ['session_code', 'participant_code', 'participant_label', 'id_in_subsession', 'group_id', 'id_in_group', 'label', 'round_number', 'game', 'period', 'guess', 'guess_type', 'guess_time', 'period_guess', 'period_guess_type', 'period_guess_time', 'period_target', 'period_distance', 'period_penalty', 'period_points', 'player_guess', 'player_guess_type', 'player_guess_time', 'player_submissions', 'player_registrations', 'player_blocked_guesses', 'game_points', 'session_points'] if players[0].subsession.is_played: for p in players: participant = p.participant part_label = participant.label session = p.session session_points = sum([pl.points_in_game for pl in p.in_rounds(1,p.subsession.num_games())]) player_periods = PlayerPeriod.filter(player=p) for player_period in player_periods: guesses = PlayerGuess.filter(player_period=player_period) for guess in guesses: x = [session.code, participant.code, part_label, p.id_in_subsession, p.group.id_in_subsession, p.id_in_group, p.label, p.round_number, p.subsession.curr_game, player_period.period, guess.guess, guess.guess_type, guess.guess_time, player_period.curr_guess, player_period.curr_guess_type, player_period.curr_guess_time, player_period.target, player_period.distance, player_period.penalty, player_period.points, p.curr_guess, p.curr_guess_type, p.curr_guess_time, p.num_submissions, p.num_registrations, p.blocked_guesses, p.points_in_game, session_points] #print(x) yield x # yield ['session', 'participant_code', 'round_number', 'group_id', 'id_in_group', 'player_submissions', 'player_registrations', 'player_blocked_guesses', 'label', 'period', 'guesses', # 'curr_guess', 'curr_guess_type', 'target', 'distance', 'points'] # if players[0].subsession.is_played: # for p in players: # participant = p.participant # session = p.session # player_periods = PlayerPeriod.filter(player=p) # #print("playerperiods: ", player_periods) # for player_period in player_periods: # guesses = PlayerGuess.filter(player_period=player_period) # guesses = [{'guess': g.guess, 'guess_type': g.guess_type, 'guess_time': g.guess_time} for g in guesses] # #print("guesses:", guesses) # yield [session.code, participant.code, p.round_number, p.group.id_in_subsession, p.id_in_group, p.num_submissions, p.num_registrations, p.blocked_guesses, p.label, player_period.period, str(guesses), # player_period.curr_guess, player_period.curr_guess_type, player_period.target, player_period.distance, player_period.points] # FUNCTIONS # Players who are not dropped off def player_passed_control(player: Player): return (player.session.config['ignore_dropout_conditions'] or (not player.participant.message['demographics']['absent'] and player.participant.message['introduction']['num_correct_answers'] >= player.session.config['control_threshold'] ) ) # Here "thing" can be Player, Group or Subsession def get_games_list(thing): return thing.session.games_list; def guess_decimals(thing): if thing.session.config['only_integers'] == True: return 0 else: return thing.session.config['guess_decimals'] # This computes targets and payoffs. # It is called after all players have finished a round (a game) def compute_targets(group: Group): players = group.get_players() players_periods = [None]*len(players) for player in players: player_periods = PlayerPeriod.filter(player=player) player_periods.sort(key=lambda player_period: player_period.period) # We recreate all player_period. It is assumend player_period 1 already has something (by the previous code) prev = player_periods[0] if prev.curr_guess is None: #or player.field_maybe_none('curr_guess') is None: set_curr_guess(player, 1, random_guess(player), 'random') for player_period in player_periods[1:]: if player_period.curr_guess is None: player_period.curr_guess = prev.curr_guess player_period.curr_guess_type = prev.curr_guess_type player_period.curr_guess_time = prev.curr_guess_time prev = player_period # player_periods matrix players_periods[player.label_num - 1] = player_periods # Computing points and payoffs game = get_games_list(group)[group.subsession.curr_game] for per in range(group.subsession.num_periods): for player in players: i = player.label_num - 1 target = game['anchors'][i] for j, pp in enumerate(players_periods): target += pp[per].curr_guess*game['weights'][i][j] player_period = players_periods[i][per] player_period.target = target player_period.distance = abs(player_period.curr_guess - target) player_period.penalty = player.subsession.delta*player_period.distance player_period.points = player.subsession.initial_points - player_period.penalty if player_period.period in eval(group.subsession.selected_periods): player.points_in_game += player_period.points # Compute a random guess for player def random_guess(player: Player): lowerB = get_games_list(player)[player.subsession.curr_game]['strategy_space'][0] upperB = get_games_list(player)[player.subsession.curr_game]['strategy_space'][1] if player.session.config['only_integers'] == True: return random.randrange(int(lowerB), int(upperB)+1) else: return round(random.uniform(lowerB, upperB), guess_decimals(player)) # Register a new guess for player def set_curr_guess(player: Player, period, guess, guess_type): if guess_type == 'manual': player.num_submissions += 1 if guess_type == 'manual' and player.blocked_guesses: return None if player.num_submissions >= player.session.config['max_submissions_per_game'] and guess_type == 'manual': player.blocked_guesses = True round_guess = round(guess, guess_decimals(player)) # Discard repeated guess if guess_type == 'manual' and round_guess == player.field_maybe_none('curr_guess'): return None if guess_type == 'manual': player.num_registrations += 1 # Set player's current guess player.curr_guess = round_guess player.curr_guess_type = guess_type player.curr_guess_time = datetime.now().timestamp() # Set player_period current guess player_period = PlayerPeriod.filter(player=player,period=period)[0] player_period.curr_guess = round_guess player_period.curr_guess_type = guess_type player_period.curr_guess_time = player.curr_guess_time # Create PlayerGuess extra-model. This will be usefull for custom exports player_guess = PlayerGuess.create( player_period = player_period, guess = player_period.curr_guess, guess_type = player_period.curr_guess_type, guess_time = player_period.curr_guess_time, ) # Add to history player.update_history(player_guess) return player_period.curr_guess # PAGES # Initial waiting page for shuffling players according to previous control test class InitialWaitPage(WaitPage): wait_for_all_groups = True def after_all_players_arrive(subsession): # shuffle players in all rounds. for subs in subsession.in_rounds(1, subsession.num_rounds_in_session): subs.shuffle() @staticmethod def is_displayed(player: Player): return player.round_number == 1 class FailedTest(Page): @staticmethod def is_displayed(player: Player): return player.round_number == 1 and not player_passed_control(player) @staticmethod def get_timeout_seconds(player): group_id = player.group.id_in_subsession gap = player.session.config['gap_between_groups'] return player.session.config['welcome_time_out'] + (group_id-1)*gap class StartingPage(Page): @staticmethod def is_displayed(player: Player): return player.round_number == 1 and player_passed_control(player) @staticmethod def vars_for_template(player: Player): stage_durations = eval(player.session.config['stage_durations']) games_data = [ dict(id = i+1, num_rounds = len(sd)) for i, sd in enumerate(stage_durations)] return dict( num_games = player.subsession.num_games(), games_data = games_data ) @staticmethod def get_timeout_seconds(player): group_id = player.group.id_in_subsession gap = player.session.config['gap_between_groups'] return player.session.config['welcome_time_out'] + (group_id-1)*gap class Guess(Page): form_model = 'player' @staticmethod def get_timeout_seconds(player): return player.subsession.duration @staticmethod def vars_for_template(player: Player): game = get_games_list(player)[player.subsession.curr_game] game_colors = eval(player.session.config['game_colors']) gd = guess_decimals(player) if gd < 1: step = "1" else: step = "." + ("0"*(gd-1)) + "1" stage_durations = eval(player.session.config['stage_durations']) elapsed_time = datetime.now().timestamp() - player.time_created accum_durations = list(itertools.accumulate(stage_durations[player.subsession.game_index-1])) period = next(i for i, val in enumerate(accum_durations) if val > elapsed_time) + 1 return dict( game = game, label = game['labels'][player.label_num - 1], num_players = len(game['weights']), min_guess = game['strategy_space'][0], max_guess = game['strategy_space'][1], image_path = "network_guess/" + game['image_path'], step = step, bg_color = game_colors[(player.subsession.game_index - 1) % len(game_colors)], server_elapsed_time = elapsed_time, accum_durations = accum_durations, period = period ) @staticmethod def is_displayed(player: Player): # Don't display if not playable or not dropped out if not player.subsession.is_played or not player_passed_control(player): return False # Initilize time_created for the first time. if player.field_maybe_none('time_created') is None: player.time_created = datetime.now().timestamp() # Don't display if timeout if player.time_created + player.subsession.duration < datetime.now().timestamp(): return False return True @staticmethod def js_vars(player: Player): elapsed_time = datetime.now().timestamp() - player.time_created stage_durations = eval(player.session.config['stage_durations']) accum_durations = list(itertools.accumulate(stage_durations[player.subsession.game_index-1])) return dict( my_id=player.id_in_group, elapsed_time = elapsed_time, accum_durations = accum_durations, blocked_guesses = player.blocked_guesses ) # It is assumed that all validation has already taken place in html @staticmethod def live_method(player: Player, data): if data['type'] == 'guess': # We try to decode the message from the webpage and put the guess in the database. # Finally, we return the actual guess registered in the database period = int(data['period']) guess = float(data['guess']) if not player.blocked_guesses: set_curr_guess(player, period, guess, "manual") elif data['type'] == 'period1_ended': if player.field_maybe_none('curr_guess') is None: set_curr_guess(player, 1, random_guess(player), 'random') if player.field_maybe_none('curr_guess') is not None: curr_guess = player.curr_guess else: curr_guess = '' return {player.id_in_group: dict(type='server_response', curr_guess=curr_guess, blocked_guesses=player.blocked_guesses)} # This WaitPage is ALWAYS displayed class GameWaitPage(WaitPage): @staticmethod def is_displayed(player: Player): return player.subsession.round_number == 1 class ResultsWaitPage(WaitPage): after_all_players_arrive = compute_targets @staticmethod def is_displayed(player: Player): return player.subsession.is_played # class ResultsInGame(Page): # @staticmethod # def is_displayed(player: Player): # return player.session.config['show_feedback_after_game'] and player.subsession.is_played and player_passed_control(player) # @staticmethod # def vars_for_template(player: Player): # selected_periods = eval(player.subsession.selected_periods) # player_in_rounds = [p for p in PlayerPeriod.filter(player=player) if p.period in selected_periods] # return dict( # player_in_rounds = player_in_rounds, # total_points = sum(p.points for p in player_in_rounds) # ) class ResultsInSession(Page): @staticmethod def is_displayed(player: Player): return player.subsession.is_last_round_in_session and player.session.config['show_feedback_after_session'] and player_passed_control(player) @staticmethod def vars_for_template(player: Player): # players_data is a matrix of PlayerPeriod # with dimension num_games x gamesize x numperiods players_data = [] for player_in_game in player.in_rounds(1, player.subsession.num_games()): selected_periods = eval(player_in_game.subsession.selected_periods) group = player_in_game.group # Traspose players x periods to periods x players sorted_players = group.get_players() sorted_players.sort(key=lambda player: player.label_num) items = fun.transpose([[pp for pp in PlayerPeriod.filter(player=player_in_group) if pp.period in selected_periods] for player_in_group in sorted_players]) players_data.extend(items) total_points = sum([p.points_in_game for p in player.in_rounds(1,player.subsession.num_games())]) return dict( players_data = players_data, total_points = total_pointsx ) @staticmethod def get_timeout_seconds(player): return player.session.config['session_results_timeout'] # This WaitPage is ALWAYS displayed # class FinalWait(WaitPage): # wait_for_all_groups = True # @staticmethod # def is_displayed(player: Player): # return player.subsession.is_last_round_in_session page_sequence = [InitialWaitPage, FailedTest, StartingPage, GameWaitPage, Guess, ResultsWaitPage]#, ResultsInSession]#, , ResultsInSession, FinalWait]