# Import Statements from otree.api import * import random from utils.live_utils import live_page, live_method from time import time # Game Description doc = """ Bandit Game. Two players. Player A, who makes the initial decision, and Player B, who responds. """ # Utility Functions def track(page_name): def _track(cls): orig_get = cls.get orig_post = cls.post def _tracking_get(page): now = int(time()) page.participant.vars[f'_tracking_get_timestamp_{page_name}'] = now if f'_tracking_first_timestamp_{page_name}' not in page.participant.vars: page.participant.vars[f'_tracking_first_timestamp_{page_name}'] = now if f'_tracking_post_timestamp_{page_name}' in page.participant.vars: del page.participant.vars[f'_tracking_post_timestamp_{page_name}'] return orig_get(page) def _tracking_post(page): page.participant.vars[f'_tracking_post_timestamp_{page_name}'] = int(time()) return orig_post(page) cls.get = _tracking_get cls.post = _tracking_post return cls return _track def last(participant, page_name): return participant.vars.get(f'_tracking_post_timestamp_{page_name}', 0) - participant.vars.get(f'_tracking_get_timestamp_{page_name}', 0) def total(participant, page_name): return participant.vars[f'_tracking_post_timestamp_{page_name}'] - participant.vars[f'_tracking_first_timestamp_{page_name}'] # Constants Class class C(BaseConstants): NAME_IN_URL = 'bandit_game_3' PLAYERS_PER_GROUP = 2 NUM_ROUNDS = 10 TAX_MIN = cu(0) TAX_MAX = cu(100) NUM_SLIDERS = 10 SLIDER_RANGE = 10 MAX_OFFSET = 50 TASK_TIMEOUT = 100 #100 TIMEOUT_PLAYERPAGES = 90 TIMEOUT_RESULTS = 90 TIMEOUT_SURVEY = 120 CHOICES = [(i, str(i)) for i in range(9)] # Subsession Class class Subsession(BaseSubsession): def start_url(self): return '/?participantId={}&assignmentId={}&projectId={}'.format( self.session.config.get('participantId', 'defaultParticipantId'), self.session.config.get('assignmentId', 'defaultAssignmentId'), self.session.config.get('projectId', 'defaultProjectId') ) # Group Class class Group(BaseGroup): tax_amount = models.IntegerField(min=C.TAX_MIN, max=C.TAX_MAX, label="") # Player Class class Player(BasePlayer): participant_label = models.StringField() dynamic_label = models.StringField() sliders_solved = models.IntegerField(initial=0) total_score = models.IntegerField(initial=0) terminated = models.BooleanField(initial=False) isBot = models.BooleanField(initial=False) # Survey fields power = models.IntegerField( choices=C.CHOICES, label="POWER (social power, authority, wealth)", widget=widgets.RadioSelectHorizontal() ) achievement = models.IntegerField( choices=C.CHOICES, label="ACHIEVEMENT (success, capability, ambition, influence on people and events)", widget=widgets.RadioSelectHorizontal() ) hedonism = models.IntegerField( choices=C.CHOICES, label="HEDONISM (gratification of desires, enjoyment in life, self-indulgence)", widget=widgets.RadioSelectHorizontal() ) stimulation = models.IntegerField( choices=C.CHOICES, label="STIMULATION (daring, a varied and challenging life, an exciting life)", widget=widgets.RadioSelectHorizontal() ) self_direction = models.IntegerField( choices=C.CHOICES, label="SELF-DIRECTION (creativity, freedom, curiosity, independence, choosing one's own goals)", widget=widgets.RadioSelectHorizontal() ) universalism = models.IntegerField( choices=C.CHOICES, label="UNIVERSALISM (broad-mindedness, beauty of nature and arts, social justice, a world at peace, equality, wisdom, unity with nature, environmental protection)", widget=widgets.RadioSelectHorizontal() ) benevolence = models.IntegerField( choices=C.CHOICES, label="BENEVOLENCE (helpfulness, honesty, forgiveness, loyalty, responsibility)", widget=widgets.RadioSelectHorizontal() ) tradition = models.IntegerField( choices=C.CHOICES, label="TRADITION (respect for tradition, humbleness, accepting one's portion in life, devotion, modesty)", widget=widgets.RadioSelectHorizontal() ) conformity = models.IntegerField( choices=C.CHOICES, label="CONFORMITY (obedience, honoring parents and elders, self-discipline, politeness)", widget=widgets.RadioSelectHorizontal() ) security = models.IntegerField( choices=C.CHOICES, label="SECURITY (national security, family security, social order, cleanliness, reciprocation of favors)", widget=widgets.RadioSelectHorizontal() ) if_you_assigned = models.IntegerField( min=0, max=100, label="If you were assigned the role mentioned above, what would be your decision?", blank=True ) time_player_a = models.FloatField() time_player_b = models.FloatField() time_sliders = models.FloatField() time_results = models.FloatField() time_survey = models.FloatField() time_total = models.FloatField() role_name = models.StringField() def role(self): if self.id_in_group % 2 == 1: self.role_name = 'Bandit' # PlayerA self.dynamic_label = 'Bandit' else: self.role_name = 'Producer' # PlayerB self.dynamic_label = 'Producer' # ExtraModel for Sliders class Slider(ExtraModel): player = models.Link(Player) name = models.StringField() offset = models.IntegerField() initial = models.IntegerField() value = models.IntegerField() solved = models.BooleanField(initial=False) # Slider Generation Functions def generate_slider(player: Player, idx: int): offset = random.randint(0, C.MAX_OFFSET) value = random.randint(1, C.SLIDER_RANGE) * random.choice([-1, +1]) return Slider.create(player=player, name=f"s{idx}", offset=offset, initial=value, value=value) def generate_sliders(player): existing_sliders = Slider.filter(player=player) for slider in existing_sliders: slider.delete() return [generate_slider(player, i + 1) for i in range(C.NUM_SLIDERS)] # Session Creation Function def creating_session(subsession: Subsession): for player in subsession.get_players(): player.participant.vars['participantId'] = subsession.session.config.get('participantId') player.participant.vars['assignmentId'] = subsession.session.config.get('assignmentId') player.participant.vars['projectId'] = subsession.session.config.get('projectId') generate_sliders(player) player.role() player.time_total = 0 # Formatting Functions for Live Pages def format_progress(player: Player): return {"total": C.NUM_SLIDERS, "solved": player.sliders_solved, "score": player.total_score, "terminated": player.terminated} def format_slider(slider: Slider): return {"name": slider.name, "value": slider.value, "margins": {'l': slider.offset, 'r': C.MAX_OFFSET - slider.offset}} def format_feedback(slider: Slider): return {"name": slider.name, "solved": slider.solved} # Game Logic Functions def set_payoffs(group: Group): p1 = group.get_player_by_id(1) p2 = group.get_player_by_id(2) total_produced_percent = p2.sliders_solved / C.NUM_SLIDERS * 10 p1.payoff = round(group.tax_amount * total_produced_percent / 100) p2.payoff = total_produced_percent - p1.payoff def evaluate_move(player: Player, slider: Slider, move: dict): was_solved = slider.solved slider.value = move["value"] slider.solved = slider.value == 0 if was_solved != slider.solved: if slider.solved: player.sliders_solved += 1 else: player.sliders_solved -= 1 if player.sliders_solved == C.NUM_SLIDERS: player.terminated = True # Function PlayerA and PlayerB pages def get_data_for_player_pages(player): round_results = [] total_payoff = 0 sum_p2_current_round = 0 for round in player.in_all_rounds(): group = round.group tax_amount = group.field_maybe_none('tax_amount') tax_amount_text = f"{tax_amount}%" if tax_amount is not None else "this round" p2_player = group.get_player_by_id(2) # p2_current_round = p2_player.sliders_solved if not p2_player.isBot else C.SLIDER_SOLVED_BOT p2_current_round = p2_player.sliders_solved p2_current_round_text = str(p2_current_round) if player.round_number != round.round_number else "pending, this round" payoff_text = str(round.payoff) if player.round_number != round.round_number else "pending, this round" round_results.append({ 'round_number': round.round_number, 'tax_amount': tax_amount_text, 'p2_current_round': p2_current_round_text, 'payoff': payoff_text }) total_payoff += round.payoff if round.payoff is not None else 0 sum_p2_current_round += p2_current_round if p2_current_round is not None else 0 task_timeout_minutes_str = "{:.1f}".format(C.TASK_TIMEOUT / 60) return { 'round_number': player.round_number, 'round_results': round_results, 'num_rounds': C.NUM_ROUNDS, 'task_timeout': C.TASK_TIMEOUT, 'task_timeout_minutes': task_timeout_minutes_str, 'total_payoff': total_payoff, 'sum_p2_current_round': sum_p2_current_round, 'p2_current_round': p2_current_round } def get_common_data(player): round_results = [] total_payoff = 0 sum_p2_current_round = 0 p2_current_round = None for round in player.in_all_rounds(): group = round.group tax_amount = group.field_maybe_none('tax_amount') tax_amount_text = f"{tax_amount}%" if tax_amount is not None else "this round" p2_player = group.get_player_by_id(2) p1_player = group.get_player_by_id(1) if p2_player.isBot: p2_current_round = p2_player.sliders_solved else: p2_current_round = p2_player.sliders_solved if player.round_number == round.round_number: p2_current_round_text = p2_current_round if p2_current_round is not None else "this round" payoff_text = round.payoff if round.payoff is not None else "this round" else: p2_current_round_text = p2_current_round if p2_current_round is not None else "not yet produced" payoff_text = round.payoff if round.payoff is not None else "not yet received" round_results.append({ 'round_number': round.round_number, 'tax_amount': tax_amount_text, 'p2_current_round': p2_current_round_text, 'payoff': payoff_text }) total_payoff += round.payoff if round.payoff is not None else 0 sum_p2_current_round += p2_current_round if p2_current_round is not None else 0 task_timeout_minutes_str = "{:.1f}".format(C.TASK_TIMEOUT / 60) return { 'round_number': player.round_number, 'round_results': round_results, 'num_rounds': C.NUM_ROUNDS, 'task_timeout': C.TASK_TIMEOUT, 'task_timeout_minutes': task_timeout_minutes_str, 'total_payoff': total_payoff, 'sum_p2_current_round': sum_p2_current_round, 'p2_current_round': p2_current_round } # Page Classes class Lobby(WaitPage): title_text = "Please wait for the other participant to join." body_text = "Thank you for participating! Please wait here for the other participant to join. This waiting period allows to pair you with another participant for the upcoming activity. We typically expect this to take a few minutes. We appreciate your patience. During the activity, you will be asked to make decisions in a series of tasks. Please read all instructions carefully." group_by_arrival_time = True def is_displayed(self): return self.round_number < 2 @track("PlayerA") class PlayerA(Page): timeout_seconds = C.TIMEOUT_PLAYERPAGES form_model = 'group' form_fields = ['tax_amount'] def is_displayed(self): return self.id_in_group == 1 def before_next_page(self, timeout_happened): self.time_player_a = last(self.participant, "PlayerA") self.time_total += last(self.participant, "PlayerA") if timeout_happened: self.isBot = True self.group.tax_amount = random.randint(C.TAX_MIN, C.TAX_MAX) def vars_for_template(self): return get_data_for_player_pages(self) class WaitForP1(WaitPage): title_text = "Please wait for the other participant decision." body_text = "Please wait here for the other participant decision. We typically expect this to take a few minutes. We appreciate your patience. Please read all instructions carefully." @track("PlayerB") class PlayerB(Page): timeout_seconds = C.TIMEOUT_PLAYERPAGES def is_displayed(self): return self.id_in_group == 2 def before_next_page(self, timeout_happened): self.time_player_b = last(self.participant, "PlayerB") self.time_total += last(self.participant, "PlayerB") if timeout_happened: self.isBot = True def vars_for_template(self): return get_data_for_player_pages(self) class Results(Page): timeout_seconds = C.TIMEOUT_RESULTS def vars_for_template(self): return get_common_data(self) def before_next_page(self, timeout_happened): self.time_results = last(self.participant, "Results") self.time_total += last(self.participant, "Results") if timeout_happened: self.isBot = True class ResultsWaitPage(WaitPage): title_text = "Please wait" body_text = "Thank you for your patience. You are currently waiting for Player B to complete slider task. This process takes few minutes. Once Player B finishes, we will proceed to the results. We appreciate your patience during this time." after_all_players_arrive = set_payoffs @track("Survey") class Survey(Page): form_model = 'player' form_fields = [ 'power', 'achievement', 'hedonism', 'stimulation', 'self_direction', 'universalism', 'benevolence', 'tradition', 'conformity', 'security', 'if_you_assigned', ] # timeout_seconds = C.TIMEOUT_SURVEY # Comment out or remove this line def vars_for_template(player: Player): if player.id_in_group == 1: # Player A player.dynamic_label = f'If you have been assigned the opposite – namely the role of Player B, the producer, and Player A would have established a tax level of {player.group.tax_amount}%, meaning that Player A would receive {player.group.tax_amount}% of your production. Please indicate how much in a range of 0 to 100 would you produce in that case?' elif player.id_in_group == 2: # Player B player.dynamic_label = 'If you have been assigned the opposite – namely the role of Player A, the percentage that you set will determine the portion of Player B`s production that will be transferred to you. Please enter a percentage in a range of 0 to 100 of Player B`s production that will be transferred to you in this case?' return {} def before_next_page(self, timeout_happened): self.time_survey = last(self.participant, "Survey") self.time_total += last(self.participant, "Survey") if timeout_happened: self.isBot = True def is_displayed(self): return self.round_number == C.NUM_ROUNDS @track("Sliders") @live_page class Sliders(Page): timeout_seconds = C.TASK_TIMEOUT def vars_for_template(self): sliders = Slider.filter(player=self) formatted_sliders = [format_slider(slider) for slider in sliders] return { 'sliders': formatted_sliders, 'progress': format_progress(self), 'round_number': self.round_number, 'num_rounds': C.NUM_ROUNDS } def js_vars(self): return {"progress": format_progress(self)} @live_method("move") def handle_move(self, data: dict): [slider] = Slider.filter(player=self, name=data["name"]) evaluate_move(self, slider, data) yield "feedback", format_feedback(slider) yield "progress", format_progress(self) def before_next_page(self, timeout_happened): set_payoffs(self.group) self.time_sliders = last(self.participant, "Sliders") self.time_total += last(self.participant, "Sliders") if timeout_happened and all(not slider.solved for slider in Slider.filter(player=self)): self.participant.vars['isBot'] = True self.isBot = True self.sliders_solved = random.randint(0, C.NUM_SLIDERS) def is_displayed(self): return self.id_in_group == 2 class ThankYou(Page): def before_next_page(self, timeout_happened): self.time_total += last(self.participant, "ThankYou") def is_displayed(self): return self.round_number == C.NUM_ROUNDS # Page Sequence page_sequence = [ Lobby, PlayerA, WaitForP1, PlayerB, Sliders, ResultsWaitPage, Results, Survey, ThankYou ]