import random
import time
from datetime import datetime
from typing import List, Dict, Tuple
from otree.api import *
from intro import MeasureTime
from settings import SESSION_CONFIGS
from treatments import *
# Short description of this app, kept in the module-level name `doc`.
doc = """
Players are matched in groups and report their die's value.
"""
class C(BaseConstants):
    """Constants of the dice-roll app: rounds, roles, treatment names, timers and messages."""

    NAME_IN_URL = "dice_roll"
    PLAYERS_PER_GROUP = 2
    NUM_ROUNDS = 10

    # The rounds are split into two parts: part 1 is the first
    # NUM_ROUNDS_PART1 round(s), part 2 is everything after that.
    NUM_ROUNDS_PART1 = 1
    ROUNDS_PART1 = range(1, NUM_ROUNDS_PART1 + 1)
    ROUNDS_PART2 = range(NUM_ROUNDS_PART1 + 1, NUM_ROUNDS + 1)

    # Roles
    PLAYER_A = "Person A"
    PLAYER_B = "Person B"

    # Treatment Names
    BASE = TreatmentName.BASE
    SIM = TreatmentName.SIM
    COMM = TreatmentName.COMM
    IND = TreatmentName.IND
    FOR = TreatmentName.FOR

    # Timers
    TIME_TO_CHAT = 30  # presumably seconds, like the other page timeouts -- TODO confirm
    MAX_TIME_ON_WAITPAGE_MIN = 10
    MAX_TIME_ON_WAITPAGE = MAX_TIME_ON_WAITPAGE_MIN * 60  # minutes -> seconds
    # Built from adjacent literals: a single-quoted (f-)string cannot span
    # several source lines, so the line break is spelled out as "\n".
    WAIT_PAGE_TEXT = (
        "You are matched with another player. "
        "The study will continue automatically as soon as you are matched.\n"
        f"The maximum waiting time is {MAX_TIME_ON_WAITPAGE_MIN} minutes. "
        "To be considered for matching, you have to stay on this tab actively, "
        "i.e. do not switch to another tab. "
        "The tab must be green for you to be considered for matching. "
        "If the tab is yellow, you will not be considered for matching. "
        "If you have been waiting longer than the maximum waiting time, "
        "please refresh this page."
    )
    TIMEOUT_SECONDS = 5 * 60

    # Error Messages
    TIMEOUT_RESPONSIBLE = "you did not respond in time"
    TIMEOUT_NOT_RESPONSIBLE = "a player you were matched with did not respond in time"
class Subsession(BaseSubsession):
    """Subsession of the dice-roll app; it declares no fields of its own."""

    pass
def creating_session(subsession: Subsession):
    """
    Initialise session-wide matching state and per-participant defaults.

    Does nothing for any round after the first, so the state below is set up
    exactly once per session.
    """
    if subsession.round_number > 1:
        return

    # Imported inside the function; presumably to avoid a circular import -- TODO confirm.
    from dice_roll.matching import get_matching

    # Matching parameters of the session
    session = subsession.session
    treatment = get_treatment(subsession)
    session.matching = get_matching(matching_type=treatment.matching_type)
    session.batch_nr = 1
    session.nr_players_in_batch = 0
    session.open_matching = False

    # Per-participant setup
    parts = [C.ROUNDS_PART1, C.ROUNDS_PART2]
    for player in subsession.get_players():
        participant = player.participant

        # Two-stage draw: first the paid part, then a round within that part.
        participant.paid_round = random.choice(random.choice(parts))

        # Defaults; the lists are created per participant so they are not shared.
        participant.partner_dice_report = 0
        participant.partner = 0
        participant.dropout = False
        participant.is_responsible = False
        participant.err_msg = ""
        participant.time_spent_on_pages = []
        participant.batch = 0
        participant.potential_payoff = []
def get_dice_field():
    """Create a new integer field for a die value (1 to 6) with an empty label."""
    field_options = dict(min=1, max=6, label="")
    return models.IntegerField(**field_options)
class Group(BaseGroup):
    """Group-level data: the rolled die plus chat-related fields."""

    # Value of the die for this round; assigned a random number from 1 to 6
    # in `DiceRollWait.after_all_players_arrive`.
    dice_value = get_dice_field()
    # Chat-related fields. Nothing in this module writes them; presumably they
    # are maintained by the chat page/template code -- TODO confirm.
    partner_entered_chat = models.BooleanField(initial=False)
    partner_left_chat = models.BooleanField(initial=False)
    chat_status = models.StringField()
    chat_time_left = models.IntegerField()
class Player(BasePlayer):
    """Player-level data: the submitted dice report and chat bookkeeping."""

    dice_report = get_dice_field()
    entered_chat = models.BooleanField(initial=False)
    chat_status = models.StringField()
    # Stringified timestamp, written when the player leaves the `Communication` page.
    left_chat = models.StringField()

    def set_dropout(self, is_responsible: bool, err_msg: str):
        """
        Flag this player's participant as dropped out and settle the payoff.

        :param is_responsible: whether this player caused the dropout
        :param err_msg: message stored on the participant as `err_msg`

        A responsible player ends up with a payoff of 0. A non-responsible player
        who has recorded potential payoffs and a participant payoff of 0 is paid
        one of those potential payoffs, drawn at random.
        """
        participant = self.participant
        participant.dropout = True
        participant.err_msg = err_msg
        participant.is_responsible = is_responsible

        if is_responsible:
            self.payoff = 0
            participant.payoff = 0
            return

        # Not responsible for the drop: pay them a random round, if possible.
        # NOTE(review): a payoff of 0 can also result from an already played paid
        # round that paid nothing -- confirm that a redraw is intended then.
        earned = participant.potential_payoff
        if not len(earned) or participant.payoff != 0:
            return
        drawn_index = random.randrange(len(earned))
        # Presumably entry i of `potential_payoff` belongs to round i + 1 -- TODO confirm.
        participant.paid_round = drawn_index + 1
        self.payoff = earned[drawn_index]
# PAGES
def get_treatment(subsession: BaseSubsession) -> Treatment:
    """Return the object stored under the "treatment" key of the session config."""
    session_config = subsession.session.config
    return session_config["treatment"]
def is_shown_report(player: Player):
    """Tell whether the player is Person B in a sequential or individual treatment."""
    if not player.participant.role == C.PLAYER_B:
        return False
    return is_sequential(player) or is_individual(player)
def is_reporting(player: Player):
    """Tell whether the player submits a report: everyone except Person B in individual treatments."""
    if player.participant.role == C.PLAYER_B and is_individual(player):
        return False
    return True
def is_submitting_multiple_reports(player: Player):
    """Tell whether the player is Person B in a sequential, non-individual treatment."""
    if not player.participant.role == C.PLAYER_B:
        return False
    if not is_sequential(player):
        return False
    return not is_individual(player)
def has_communication(player: Player):
    """Return the `communication` setting of the player's treatment."""
    treatment = get_treatment(player.subsession)
    return treatment.communication
def is_sequential(player: Player):
    """Tell whether the player's treatment uses the sequential decision order."""
    decision_order = get_treatment(player.subsession).decision_order
    return decision_order == DecisionOrder.SEQUENTIAL
def is_individual(player: Player):
    """Tell whether the player's treatment uses the individual decision order."""
    decision_order = get_treatment(player.subsession).decision_order
    return decision_order == DecisionOrder.INDIVIDUAL
def get_dice_path(nr_eyes: int) -> str:
    """Return the relative path of the SVG image showing a die with `nr_eyes` eyes."""
    return "dices/dice{}.svg".format(nr_eyes)
def get_partner_id(player_id: int) -> int:
    """
    Map a player id to the partner's id.

    With two players per group this maps 1 -> 2 and 2 -> 1.
    """
    group_size = C.PLAYERS_PER_GROUP
    offset = (player_id + 1) % group_size
    return group_size - offset
def match_in_each_round(subsession: BaseSubsession) -> bool:
    """Tell whether the treatment uses stranger matching."""
    matching_type = get_treatment(subsession).matching_type
    return matching_type == MatchingType.STRANGER
def is_matching_round(subsession: BaseSubsession, round_number: int):
    """Tell whether players are matched in this round: always in round 1, later only with stranger matching."""
    if round_number <= 1:
        return True
    return match_in_each_round(subsession)
class DisplayPage(MeasureTime):
    """Base page that is displayed only to participants who have not dropped out."""

    @staticmethod
    def is_displayed(player: Player):
        dropped_out = player.participant.dropout
        return not dropped_out
def drop_batch(player: Player, is_player_responsible: bool = True):
    """
    Mark `player` and everyone affected by their dropout as dropped out.

    With stranger matching, all players in the same batch as `player` are
    dropped; otherwise only the members of `player`'s group. Only `player`
    can be flagged as responsible, and only if `is_player_responsible` is set.
    """
    subsession: Subsession = player.subsession

    if match_in_each_round(subsession):
        # Stranger matching: drop the whole batch
        batch = player.participant.batch
        affected = [
            member
            for member in subsession.get_players()
            if member.participant.batch == batch
        ]
    else:
        # Partner matching: drop the group
        affected = player.group.get_players()

    for member in affected:
        responsible = member == player and is_player_responsible
        if responsible:
            message = C.TIMEOUT_RESPONSIBLE
        else:
            message = C.TIMEOUT_NOT_RESPONSIBLE
        member.set_dropout(responsible, message)
class SoftTimerPage(DisplayPage):
    """Page with a timeout of `C.TIMEOUT_SECONDS`; this class defines no handling of the timeout itself."""

    timeout_seconds = C.TIMEOUT_SECONDS
class HardTimerPage(SoftTimerPage):
    """Timed page on which running out of time drops the player's batch or group."""

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """
        Drop the player's batch/group if the page timed out.

        Override before_next_page on pages that are not group-relevant.
        Otherwise, if a player times out, it also drops the partner, who might
        already have moved on to a next round, leaving the partner's new
        partner alone.
        """
        if not timeout_happened:
            return
        drop_batch(player)
class DisplayWaitPage(WaitPage):
    """Base wait page that is displayed only to participants who have not dropped out."""

    @staticmethod
    def is_displayed(player: Player):
        dropped_out = player.participant.dropout
        return not dropped_out
class Matching(DisplayWaitPage):
    """Wait page on which players are grouped by arrival time; shown only in matching rounds."""

    group_by_arrival_time = True
    body_text = C.WAIT_PAGE_TEXT

    @staticmethod
    def is_displayed(player: Player):
        if not DisplayWaitPage.is_displayed(player):
            return False
        return is_matching_round(player.subsession, player.round_number)
def group_by_arrival_time_method(subsession: BaseSubsession, waiting_players: List[Player]):
    """Delegate grouping of the waiting players to the session's matching object and return its result."""
    matching = subsession.session.matching
    return matching.try_match(subsession, waiting_players)
class DiceRollWait(DisplayWaitPage):
    """Wait page on which the group's die is rolled once all players have arrived."""

    title_text = "Dice Roll"
    body_text = (
        "Please wait for the other participant to arrive. In the meantime, the die is rolled for the upcoming round."
    )

    @staticmethod
    def after_all_players_arrive(group: Group):
        # One roll per group; both players see the same value.
        rolled = random.randint(1, 6)
        group.dice_value = rolled
class CommunicationIntro(SoftTimerPage):
    """Page shown before the chat; only displayed in treatments with communication."""

    @staticmethod
    def vars_for_template(player: Player):
        return {"dice_img": get_dice_path(player.group.dice_value)}

    @staticmethod
    def is_displayed(player: Player):
        if not DisplayPage.is_displayed(player):
            return False
        return has_communication(player)
class CommunicationWait(DisplayWaitPage):
    """Wait page before the chat; only displayed in treatments with communication."""

    @staticmethod
    def is_displayed(player: Player):
        if not DisplayWaitPage.is_displayed(player):
            return False
        return has_communication(player)
class Communication(DisplayPage):
    """Chat page; only displayed in treatments with communication."""

    # Use the shared constant instead of repeating the literal 30.
    timeout_seconds = C.TIME_TO_CHAT

    @staticmethod
    def vars_for_template(player: Player):
        """Expose the image path of the group's rolled die to the template."""
        return dict(dice_img=get_dice_path(player.group.dice_value))

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record when the player left the chat, whether or not the page timed out."""
        # NOTE(review): datetime.now() is naive local time -- confirm that is what the analysis expects.
        player.left_chat = str(datetime.now())

    @staticmethod
    def is_displayed(player: Player):
        """Show the page to players who have not dropped out and whose treatment has communication."""
        return DisplayPage.is_displayed(player) and has_communication(player)
def make_report(reporter: str, report: int) -> Dict[str, object]:
    """
    Bundle a reporter label and the reported die value into one dict.

    :param reporter: label of whoever made the report
    :param report: the reported die value
    :return: dict with the keys "reporter" (str) and "report" (int)
    """
    # The values are of mixed type (str and int), hence `object` in the annotation.
    return {"reporter": reporter, "report": report}
def get_all_reports(player: Player) -> List[Dict[str, int]]:
    """
    Collect the reports already made in the player's group, in role order.

    The player's own report is labelled "You"; any other report is labelled
    with the reporter's role. Players without a report are left out.
    """
    reports = []
    for member in get_players_by_role(player.group):
        if not member.field_maybe_none("dice_report"):
            continue
        if member != player:
            reporter = member.participant.role
        else:
            reporter = "You"
        reports.append(make_report(reporter, member.dice_report))
    return reports
class ReportDiceRoll(HardTimerPage):
    """Page for submitting the dice report; not shown to players who are shown the partner's report first."""

    form_model = "player"
    form_fields = ["dice_report"]

    @staticmethod
    def vars_for_template(player: Player):
        return {
            "dice_img": get_dice_path(player.group.dice_value),
            "reports": get_all_reports(player),
            "allow_input": is_reporting(player),
            "submit_multiple_reports": is_submitting_multiple_reports(player),
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        if not timeout_happened:
            return
        # TODO: UNCOMMENT
        # player.dice_report = random.randint(1, 6)
        HardTimerPage.before_next_page(player, timeout_happened)

    @staticmethod
    def is_displayed(player: Player):
        if not DisplayPage.is_displayed(player):
            return False
        return not is_shown_report(player)
def get_players_by_role(group: Group) -> Tuple[Player, Player]:
    """
    Return the group's two players ordered by role: (Person A, Person B).

    :param group: the group whose players are ordered
    :return: tuple with the player whose participant role is `C.PLAYER_A` first
    :raises ValueError: if the group does not consist of exactly two players
    """
    players = group.get_players()
    # Fail loudly here instead of returning None, which callers would only
    # notice later as an opaque unpacking or iteration TypeError.
    if len(players) != 2:
        raise ValueError(f"expected a group of exactly 2 players, got {len(players)}")

    first, second = players
    if first.participant.role == C.PLAYER_A:
        return first, second
    return second, first
class AwaitReport(DisplayWaitPage):
    """Wait page for players who are shown the partner's report before their own turn."""

    title_text = "Report"
    body_text = "You are Person B. Please wait for Person A to transmit the report. After that, it's your turn."

    @staticmethod
    def is_displayed(player: Player):
        if not DisplayWaitPage.is_displayed(player):
            return False
        return is_shown_report(player)
class ReportDiceRollWithPartnerReport(ReportDiceRoll):
    """
    Report page for players who are shown the partner's report.

    Reuses the template of `ReportDiceRoll`. Players who do not report
    themselves get the page without any form field.
    """

    template_name = "dice_roll/ReportDiceRoll.html"

    @staticmethod
    def get_form_fields(player):
        """Return the form fields for this player: the dice report, or none at all."""
        if is_reporting(player):
            return ["dice_report"]
        # Explicit empty list instead of an implicit None for "no form".
        return []

    @staticmethod
    def is_displayed(player: Player):
        """Show the page to players who have not dropped out and are shown the partner's report."""
        return DisplayPage.is_displayed(player) and is_shown_report(player)
def get_paid_players_for_round(players: List[Player], round_number: int) -> List[Player]:
    """Return, in their original order, the players whose participant's paid round is `round_number`."""
    return [
        candidate
        for candidate in players
        if round_number == candidate.participant.paid_round
    ]
def set_payments(player_a: Player, player_b: Player, treatment: Treatment):
    """
    Set the payoff of a group for the current round.

    A report of 1 to 5 is worth the reported number in points; a report of 6
    is worth nothing. In individual treatments the round payoff is the worth of
    Person A's report; otherwise it is the lower worth of the two reports.

    The round payoff is appended to `participant.potential_payoff` of both
    players. It becomes the actual `payoff` only for players whose
    `participant.paid_round` is the current round.

    :param player_a: the player with role Person A
    :param player_b: the player with role Person B
    :param treatment: the treatment; only its `decision_order` is read
    """

    def report_to_payoff(report: int) -> int:
        # Report of 6 does not receive payoff
        return report if report < 6 else 0

    players = [player_a, player_b]
    paid_players = get_paid_players_for_round(players, player_a.round_number)

    payoff_a = report_to_payoff(player_a.dice_report)
    if treatment.decision_order == DecisionOrder.INDIVIDUAL:
        # Person B's report is not read at all in individual treatments.
        round_payoff = payoff_a
    else:
        # The comparison is on payoffs, not raw reports: a 6 counts as 0.
        round_payoff = min(payoff_a, report_to_payoff(player_b.dice_report))

    # Save the potential payoff for everyone
    for p in players:
        p.participant.potential_payoff.append(round_payoff)

    # Only set the actual payoff for those paid in this particular round
    for p in paid_players:
        p.payoff = round_payoff
class PaymentWait(DisplayWaitPage):
    """Wait page after the reports; sets the round's payments once all players have arrived."""

    body_text = "Please wait for Person B to transmit the report."

    @staticmethod
    def after_all_players_arrive(group: Group):
        person_a, person_b = get_players_by_role(group)
        # Payments are skipped when Person A has dropped out.
        if person_a.participant.dropout:
            return
        set_payments(person_a, person_b, get_treatment(group.subsession))
class ReportInfo(SoftTimerPage):
    """Read-only overview of the group's reports; not shown in individual treatments."""

    timeout_seconds = 2 * 60

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # FOR THE LAST PAGE OF THIS APP
        # When `InstructionsPart2` follows, that page sets the
        # matching_arrival_time itself, so nothing is stored here.
        if InstructionsPart2.is_displayed(player):
            return
        player.participant.matching_arrival_time = time.time()

    @staticmethod
    def vars_for_template(player: Player):
        return {
            "dice_img": get_dice_path(player.group.dice_value),
            "allow_input": False,
            "reports": get_all_reports(player),
        }

    @staticmethod
    def is_displayed(player: Player):
        if not SoftTimerPage.is_displayed(player):
            return False
        return not is_individual(player)
class InstructionsPart2(SoftTimerPage):
    """Instructions page displayed at the end of round 1 to players who have not dropped out."""

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # FOR THE LAST PAGE OF THIS APP
        arrival_time = time.time()
        player.participant.matching_arrival_time = arrival_time

    @staticmethod
    def is_displayed(player: Player):
        # NOTE(review): the literal 1 presumably stands for the last round of
        # part 1 (C.NUM_ROUNDS_PART1) -- confirm before changing that constant.
        if not DisplayPage.is_displayed(player):
            return False
        return player.round_number == 1
# Pages of one round, in order. Each page's own `is_displayed` decides
# whether a given player actually sees it.
page_sequence = [
    Matching,
    DiceRollWait,
    CommunicationIntro,
    CommunicationWait,
    Communication,
    ReportDiceRoll,
    AwaitReport,
    ReportDiceRollWithPartnerReport,
    PaymentWait,
    ReportInfo,
    InstructionsPart2,
]