import random import time from datetime import datetime from typing import List, Dict, Tuple from otree.api import * from intro import MeasureTime from settings import SESSION_CONFIGS from treatments import * doc = """ Players are matched in groups and report their die's value. """ class C(BaseConstants): NAME_IN_URL = "dice_roll" PLAYERS_PER_GROUP = 2 NUM_ROUNDS = 10 NUM_ROUNDS_PART1 = 1 ROUNDS_PART1 = range(1, NUM_ROUNDS_PART1 + 1) ROUNDS_PART2 = range(NUM_ROUNDS_PART1 + 1, NUM_ROUNDS + 1) # Roles PLAYER_A = "Person A" PLAYER_B = "Person B" # Treatment Names BASE = TreatmentName.BASE SIM = TreatmentName.SIM COMM = TreatmentName.COMM IND = TreatmentName.IND FOR = TreatmentName.FOR # Timers TIME_TO_CHAT = 30 MAX_TIME_ON_WAITPAGE_MIN = 10 # TODO: set back to 10 MAX_TIME_ON_WAITPAGE = MAX_TIME_ON_WAITPAGE_MIN * 60 WAIT_PAGE_TEXT = f"You are matched with another player. The study will continue automatically as soon as you are matched.
The maximum waiting time is {MAX_TIME_ON_WAITPAGE_MIN} minutes. To be considered for matching, you have to stay on this tab actively, i.e. do not switch to another tab. The tab must be green for you to be considered for matching. If the tab is yellow, you will not be considered for matching. If you have been waiting longer than the maximum waiting time, please refresh this page." TIMEOUT_SECONDS = 5 * 60 # TODO. set back to 5 * 60 # Error Messages TIMEOUT_RESPONSIBLE = "you did not respond in time" TIMEOUT_NOT_RESPONSIBLE = "a player you were matched with did not respond in time" class Subsession(BaseSubsession): pass def creating_session(subsession: Subsession): if subsession.round_number > 1: return from dice_roll.matching import get_matching # configure session's matching parameters session = subsession.session session.matching = get_matching(matching_type=get_treatment(subsession).matching_type) session.batch_nr = 1 session.nr_players_in_batch = 0 session.open_matching = False # configure players for p in subsession.get_players(): pt = p.participant # draw paid round paid_part = random.choice([C.ROUNDS_PART1, C.ROUNDS_PART2]) # draw paid part # draw paid round in paid part pt.paid_round = random.choice(paid_part) # set defaults pt.partner_dice_report = 0 pt.partner = 0 pt.dropout = False pt.is_responsible = False pt.err_msg = "" pt.time_spent_on_pages = [] pt.batch = 0 pt.potential_payoff = [] def get_dice_field(): return models.IntegerField(min=1, max=6, label="") class Group(BaseGroup): dice_value = get_dice_field() partner_entered_chat = models.BooleanField(initial=False) partner_left_chat = models.BooleanField(initial=False) chat_status = models.StringField() chat_time_left = models.IntegerField() class Player(BasePlayer): dice_report = get_dice_field() entered_chat = models.BooleanField(initial=False) chat_status = models.StringField() left_chat = models.StringField() def set_dropout(self, is_responsible: bool, err_msg: str): pt = self.participant pt.dropout = True pt.err_msg = err_msg pt.is_responsible = is_responsible if is_responsible: self.payoff = 0 pt.payoff = 0 else: # if player was not responsible for drop, pay them a random round if len(pt.potential_payoff) and pt.payoff == 0: i = random.choice(range(len(pt.potential_payoff))) pt.paid_round = i + 1 self.payoff = pt.potential_payoff[i] # PAGES def get_treatment(subsession: BaseSubsession) -> Treatment: return subsession.session.config["treatment"] def is_shown_report(player: Player): return player.participant.role == C.PLAYER_B and (is_sequential(player) or is_individual(player)) def is_reporting(player: Player): return not (player.participant.role == C.PLAYER_B and is_individual(player)) def is_submitting_multiple_reports(player: Player): return player.participant.role == C.PLAYER_B and is_sequential(player) and not is_individual(player) def has_communication(player: Player): return get_treatment(player.subsession).communication def is_sequential(player: Player): return get_treatment(player.subsession).decision_order == DecisionOrder.SEQUENTIAL def is_individual(player: Player): return get_treatment(player.subsession).decision_order == DecisionOrder.INDIVIDUAL def get_dice_path(nr_eyes: int) -> str: return f"dices/dice{nr_eyes}.svg" def get_partner_id(player_id: int) -> int: return C.PLAYERS_PER_GROUP - ((player_id + 1) % C.PLAYERS_PER_GROUP) def match_in_each_round(subsession: BaseSubsession) -> bool: return get_treatment(subsession).matching_type == MatchingType.STRANGER def is_matching_round(subsession: BaseSubsession, round_number: int): return round_number <= 1 or match_in_each_round(subsession) class DisplayPage(MeasureTime): @staticmethod def is_displayed(player: Player): return not player.participant.dropout def drop_batch(player: Player, is_player_responsible: bool = True): subsession: Subsession = player.subsession if match_in_each_round(subsession): # in Stranger Matching: drop batch batch_id = player.participant.batch players = subsession.get_players() drop_players = [p for p in players if p.participant.batch == batch_id] else: # in Partner Matching: drop group drop_players = player.group.get_players() for p in drop_players: is_responsible = p == player and is_player_responsible err_msg = C.TIMEOUT_RESPONSIBLE if is_responsible else C.TIMEOUT_NOT_RESPONSIBLE p.set_dropout(is_responsible, err_msg) class SoftTimerPage(DisplayPage): timeout_seconds = C.TIMEOUT_SECONDS class HardTimerPage(SoftTimerPage): @staticmethod def before_next_page(player: Player, timeout_happened): """ OVERRIDE before_next_page ON PAGES THAT ARE NOT GROUP-RELEVANT, OTHERWISE IF A PLAYER TIMES OUT, IT ALSO DROPS THE PARTNER WHO MIGHT ALREADY HAVE MOVED ON TO A NEXT ROUND, LEAVING THE PARTNER'S NEW PARTNER ALONE """ if timeout_happened: drop_batch(player) class DisplayWaitPage(WaitPage): @staticmethod def is_displayed(player: Player): return not player.participant.dropout class Matching(DisplayWaitPage): group_by_arrival_time = True body_text = C.WAIT_PAGE_TEXT @staticmethod def is_displayed(player: Player): return DisplayWaitPage.is_displayed(player) and is_matching_round(player.subsession, player.round_number) def group_by_arrival_time_method(subsession: BaseSubsession, waiting_players: List[Player]): return subsession.session.matching.try_match(subsession, waiting_players) class DiceRollWait(DisplayWaitPage): title_text = "Dice Roll" body_text = ( "Please wait for the other participant to arrive. In the meantime, the die is rolled for the upcoming round." ) @staticmethod def after_all_players_arrive(group: Group): group.dice_value = random.randint(1, 6) class CommunicationIntro(SoftTimerPage): @staticmethod def vars_for_template(player: Player): return dict(dice_img=get_dice_path(player.group.dice_value)) @staticmethod def is_displayed(player: Player): return DisplayPage.is_displayed(player) and has_communication(player) class CommunicationWait(DisplayWaitPage): @staticmethod def is_displayed(player: Player): return DisplayWaitPage.is_displayed(player) and has_communication(player) class Communication(DisplayPage): timeout_seconds = 30 @staticmethod def vars_for_template(player: Player): return dict(dice_img=get_dice_path(player.group.dice_value)) @staticmethod def before_next_page(player: Player, timeout_happened): player.left_chat = str(datetime.now()) @staticmethod def is_displayed(player: Player): return DisplayPage.is_displayed(player) and has_communication(player) def make_report(reporter: str, report: int) -> Dict[str, int]: return dict(reporter=reporter, report=report) def get_all_reports(player: Player) -> List[Dict[str, int]]: return [ make_report(p.participant.role if p != player else "You", p.dice_report) for p in get_players_by_role(player.group) if p.field_maybe_none("dice_report") ] class ReportDiceRoll(HardTimerPage): form_model = "player" form_fields = ["dice_report"] @staticmethod def vars_for_template(player: Player): return dict( dice_img=get_dice_path(player.group.dice_value), reports=get_all_reports(player), allow_input=is_reporting(player), submit_multiple_reports=is_submitting_multiple_reports(player), ) @staticmethod def before_next_page(player: Player, timeout_happened): if timeout_happened: # TODO: UNCOMMENT # player.dice_report = random.randint(1, 6) HardTimerPage.before_next_page(player, timeout_happened) @staticmethod def is_displayed(player: Player): return DisplayPage.is_displayed(player) and not is_shown_report(player) def get_players_by_role(group: Group) -> Tuple[Player, Player]: """Returns the players in the order of the roles (Player A, Player B)""" players = group.get_players() if len(players) == 2: player1, player2 = players[0], players[1] if player1.participant.role == C.PLAYER_A: return player1, player2 else: return player2, player1 class AwaitReport(DisplayWaitPage): title_text = "Report" body_text = "You are Person B. Please wait for Person A to transmit the report. After that, it's your turn." @staticmethod def is_displayed(player: Player): return DisplayWaitPage.is_displayed(player) and is_shown_report(player) class ReportDiceRollWithPartnerReport(ReportDiceRoll): template_name = "dice_roll/ReportDiceRoll.html" @staticmethod def get_form_fields(player): if is_reporting(player): return ["dice_report"] @staticmethod def is_displayed(player: Player): return DisplayPage.is_displayed(player) and is_shown_report(player) def get_paid_players_for_round(players: List[Player], round_number: int) -> List[Player]: paid_players = [] for p in players: if round_number == p.participant.paid_round: paid_players.append(p) return paid_players def set_payments(player_a: Player, player_b: Player, treatment: Treatment): """ Set Payoff for group in this round. `paid_players` are players that receive payment in this particular round. The paid round is determined randomly. If the report is < 5: report is paid in points. If the report is 6: no payment """ def report_to_payoff(report: int) -> int: # Report of 6 does not receive payoff return report if report < 6 else 0 players = [player_a, player_b] paid_players = get_paid_players_for_round(players, player_a.round_number) report_a_payoff = report_to_payoff(player_a.dice_report) # In individual treatments, everybody is paid Player A's report if treatment.decision_order == DecisionOrder.INDIVIDUAL: report_payoff = report_a_payoff # Otherwise, the lower report counts else: report_b_payoff = report_to_payoff(player_b.dice_report) report_payoff = min([report_a_payoff, report_b_payoff]) # Save the potential payoff for everyone for p in players: p.participant.potential_payoff.append(report_payoff) # Only set the actual payoff for those paid in this particular round for p in paid_players: p.payoff = report_payoff class PaymentWait(DisplayWaitPage): body_text = "Please wait for Person B to transmit the report." @staticmethod def after_all_players_arrive(group: Group): player_a, player_b = get_players_by_role(group) if not player_a.participant.dropout: set_payments(player_a, player_b, get_treatment(group.subsession)) class ReportInfo(SoftTimerPage): timeout_seconds = 2 * 60 @staticmethod def before_next_page(player: Player, timeout_happened): # FOR THE LAST PAGE OF THIS APP # only set matching_arrival_time if player will be matched in next round and # `InstructionsPart2` does not follow, as this will set the matching_arrival_time if not InstructionsPart2.is_displayed(player): player.participant.matching_arrival_time = time.time() @staticmethod def vars_for_template(player: Player): return dict( dice_img=get_dice_path(player.group.dice_value), allow_input=False, reports=get_all_reports(player), ) @staticmethod def is_displayed(player: Player): return SoftTimerPage.is_displayed(player) and not is_individual(player) class InstructionsPart2(SoftTimerPage): @staticmethod def before_next_page(player: Player, timeout_happened): # FOR THE LAST PAGE OF THIS APP player.participant.matching_arrival_time = time.time() @staticmethod def is_displayed(player: Player): return DisplayPage.is_displayed(player) and player.round_number == 1 page_sequence = [ Matching, DiceRollWait, CommunicationIntro, CommunicationWait, Communication, ReportDiceRoll, AwaitReport, ReportDiceRollWithPartnerReport, PaymentWait, ReportInfo, InstructionsPart2, ]