import math
import random
import time

from otree.api import *

doc = """
App 2: Dynamic matching, ranking, belief elicitation, questionnaire, and final results.
"""


class C(BaseConstants):
    """App-wide constants."""

    NAME_IN_URL = 'iq_rank_belief'
    # Groups are formed dynamically on the wait page, so no fixed size here.
    PLAYERS_PER_GROUP = None
    NUM_ROUNDS = 1
    # Target size of a matched group.
    MATCH_GROUP_SIZE = 4
    # After this many seconds of waiting, a smaller group is released.
    WAIT_TIMEOUT_SECONDS = 180
    QUIZ_NUM_ROUNDS = 10
    # (difficulty_treatment, reveal_treatment) pairs; players are only
    # matched with others from the same pair.
    TREATMENT_CELLS = [
        ('Easy', 'Reveal'),
        ('Easy', 'NoReveal'),
        ('Hard', 'Reveal'),
        ('Hard', 'NoReveal'),
    ]


class Subsession(BaseSubsession):
    def group_by_arrival_time_method(self, waiting_players):
        """Delegate to the module-level ``group_by_arrival_time_method``.

        Kept so existing callers of the method form keep working; the
        matching logic itself lives in the module-level function.
        """
        return group_by_arrival_time_method(self, waiting_players)


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    # Copies of the participant-level treatment assignment (see creating_session).
    difficulty_treatment = models.StringField()
    reveal_treatment = models.StringField()

    # Reported belief in percent (0-100), used with rank_type == 'top'
    # as the event in the belief payment.
    initial_top_belief = models.IntegerField(min=0, max=100, blank=True)

    # Questionnaire items. They are declared blank=True; the pages'
    # error_message hooks reject missing answers with custom messages.
    risk_preference = models.StringField(
        choices=[
            ['A', 'Option A'],
            ['B', 'Option B'],
            ['C', 'Indifferent between the two options'],
        ],
        widget=widgets.RadioSelect,
        blank=True,
    )
    risk_CE = models.IntegerField(min=0, max=10, blank=True)
    ambiguity_preference = models.StringField(
        choices=[
            ['A', 'Option A'],
            ['B', 'Option B'],
            ['C', 'Indifferent between the two options'],
        ],
        widget=widgets.RadioSelect,
        blank=True,
    )
    ambiguity_prob = models.IntegerField(min=0, max=100, blank=True)


def group_by_arrival_time_method(subsession, waiting_players):
    """Form a group from the players currently on the wait page.

    Players are only matched within the same treatment cell. Cells are
    scanned in the order of ``C.TREATMENT_CELLS``; the first cell that can
    form a group wins:

    * at least ``C.MATCH_GROUP_SIZE`` waiting players: the first
      ``C.MATCH_GROUP_SIZE`` of them are grouped;
    * otherwise, if the longest-waiting player of the cell has waited at
      least ``C.WAIT_TIMEOUT_SECONDS``, everyone waiting in that cell
      (possibly a single player) is grouped.

    Defined at module level because that is where the no-self oTree app
    format (used by this file, cf. ``creating_session``) documents this hook.

    Returns:
        A list of players to group, or ``None`` to keep everyone waiting.
    """
    now = time.time()
    for difficulty, reveal in C.TREATMENT_CELLS:
        same_cell = [
            p
            for p in waiting_players
            if p.difficulty_treatment == difficulty
            and p.reveal_treatment == reveal
        ]
        if len(same_cell) >= C.MATCH_GROUP_SIZE:
            return same_cell[:C.MATCH_GROUP_SIZE]
        if same_cell:
            # A player without a recorded arrival time counts as having
            # arrived just now, so they never trigger the timeout.
            oldest_arrival = min(
                p.participant.vars.get('rank_wait_arrival_time', now)
                for p in same_cell
            )
            if now - oldest_arrival >= C.WAIT_TIMEOUT_SECONDS:
                return same_cell
    return None


def bsr_payment(prob_percent, event_happened):
    """Draw the belief payment from a quadratic win probability.

    Args:
        prob_percent: Reported probability of the event, in percent (0-100).
        event_happened: Whether the event actually occurred.

    Returns:
        1000 with probability ``1 - (1 - q) ** 2`` if the event happened,
        or ``1 - q ** 2`` if it did not (``q = prob_percent / 100``);
        otherwise 0.
    """
    q = prob_percent / 100
    if event_happened:
        win_prob = 1 - (1 - q) ** 2
    else:
        win_prob = 1 - q ** 2
    return 1000 if random.random() < win_prob else 0


def creating_session(subsession: Subsession):
    """Copy each participant's treatment assignment onto the player.

    NOTE(review): assumes ``participant.difficulty_treatment`` and
    ``participant.reveal_treatment`` are already set when this runs
    (presumably by an earlier app) -- confirm.
    """
    for player in subsession.get_players():
        player.difficulty_treatment = player.participant.difficulty_treatment
        player.reveal_treatment = player.participant.reveal_treatment


class RankAssignmentWaitPage(WaitPage):
    """Wait page that forms groups on arrival and assigns top/bottom ranks."""

    group_by_arrival_time = True

    @staticmethod
    def is_displayed(player: Player):
        # Record the first time the participant reaches this page; the
        # matching function uses it to detect players who waited too long.
        if 'rank_wait_arrival_time' not in player.participant.vars:
            player.participant.vars['rank_wait_arrival_time'] = time.time()
        return player.round_number == 1

    @staticmethod
    def after_all_players_arrive(group: Group):
        """Rank group members by quiz score and label them top or bottom.

        Ties are broken at random. The upper half of the ordering (rounded
        up, so a lone player is 'top') gets ``rank_type == 'top'``.
        """
        players = group.get_players()

        score_groups = {}
        for p in players:
            score_groups.setdefault(p.participant.quiz_score, []).append(p)

        ordered_players = []
        for score in sorted(score_groups, reverse=True):
            tied_players = score_groups[score]
            random.shuffle(tied_players)
            ordered_players.extend(tied_players)

        n_top = math.ceil(len(ordered_players) / 2)
        for idx, p in enumerate(ordered_players):
            p.participant.rank_type = 'top' if idx < n_top else 'bottom'
            # The arrival timestamp is only needed while waiting.
            p.participant.vars.pop('rank_wait_arrival_time', None)


class BeliefElicitation(Page):
    """Collects the participant's belief (in percent) in ``initial_top_belief``."""

    form_model = 'player'
    form_fields = ['initial_top_belief']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        return dict(
            reveal_treatment=player.reveal_treatment,
            total_correct=player.participant.quiz_score,
            total_questions=C.QUIZ_NUM_ROUNDS,
        )

    @staticmethod
    def error_message(player: Player, values):
        # The field is blank=True, so emptiness is rejected here instead.
        if values.get('initial_top_belief') is None:
            return 'Please report your belief before continuing.'

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # Store on the participant so later pages can read it.
        player.participant.initial_top_belief = player.initial_top_belief


class Questionnaire(Page):
    """Risk/ambiguity questionnaire; also settles the Part 1 payment."""

    form_model = 'player'
    form_fields = ['risk_preference', 'ambiguity_preference', 'risk_CE', 'ambiguity_prob']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def error_message(player: Player, values):
        # Checked in question order so the first missing answer is reported.
        if values.get('risk_preference') is None:
            return 'Please answer question 1.'
        if values.get('risk_CE') is None:
            return 'Please answer question 2.'
        if values.get('ambiguity_preference') is None:
            return 'Please answer question 3.'
        if values.get('ambiguity_prob') is None:
            return 'Please answer question 4.'

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Resolve the belief payment and pick the Part 1 payment source."""
        participant = player.participant

        event_happened = participant.rank_type == 'top'
        belief_points = bsr_payment(participant.initial_top_belief, event_happened)
        participant.initial_belief_event = event_happened
        participant.initial_belief_payment_points = belief_points

        # One of the two tasks is drawn with equal probability for payment.
        final_source = random.choice(['quiz', 'initial_belief'])
        if final_source == 'quiz':
            final_points = participant.quiz_payment_points
        else:
            final_points = participant.initial_belief_payment_points

        participant.part1_payment_source = final_source
        participant.part1_payment_points = final_points


class Results(Page):
    """Final page showing which task was paid and the points earned."""

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        # Mark the participant as finished once the results are rendered.
        # This assignment used to sit after the return statements, where it
        # was unreachable and therefore never executed.
        player.participant.finished = True

        payment_source = player.participant.part1_payment_source
        payment_points = player.participant.part1_payment_points

        if payment_source == 'quiz':
            return dict(
                payment_source=payment_source,
                payment_points=payment_points,
                question_number=player.participant.quiz_payment_question,
                was_correct=player.participant.quiz_payment_correct,
            )

        return dict(
            payment_source=payment_source,
            payment_points=payment_points,
            rank_type=player.participant.rank_type,
            rank_type_display='Top' if player.participant.rank_type == 'top' else 'Bottom',
            reported_belief=player.participant.initial_top_belief,
        )


page_sequence = [
    RankAssignmentWaitPage,
    BeliefElicitation,
    Questionnaire,
    Results,
]