import math
import random
from otree.api import *
from .understanding import UNDERSTANDING
doc = """
Projet Jemna
"""
class C(BaseConstants):
NAME_IN_URL = 'jemna'
PLAYERS_PER_GROUP_A = 2
PLAYERS_PER_GROUP_B_1 = 2
PLAYERS_PER_GROUP_B_2 = 1
PLAYERS_PER_GROUP_B = PLAYERS_PER_GROUP_B_1 + PLAYERS_PER_GROUP_B_2
PLAYERS_PER_GROUP = PLAYERS_PER_GROUP_A + PLAYERS_PER_GROUP_B
OTHER_PLAYERS = PLAYERS_PER_GROUP - 1
NUM_ROUNDS_PART_1 = 7
NUM_ROUNDS_PART_2 = 7
NUM_ROUNDS = NUM_ROUNDS_PART_1 + NUM_ROUNDS_PART_2
EXTRACTION_MAX_A = 25
EXTRACTION_FIXE_B_1 = 1
EXTRACTION_MAX_B_1 = EXTRACTION_MAX_A - EXTRACTION_FIXE_B_1
EXTRACTION_FIXE_B_2 = 1
EXTRACTION_MAX_B_2 = 0
PREDICTION_CONST = 1
PREDICTION_CONST_B_2 = 1.3
PREDICTION_TAUX = 0.013
GROUPE_A = "A"
GROUPE_B = "B"
TYPE_B_1 = "1"
TYPE_B_2 = "2"
# ax - bx2 : attention si ces valeurs sont changées il faut changer dans la fonction calculer_valeur
PARAM_A = 220
PARAM_B = 5
DECISION_TIME = 180 # en secondes
RESULT_TIME = 60 # en secondes
COMMUNICATION_TIME = 120 # en secondes
# Traitements
BASELINE = "baseline" # pas de simulateur en partie 2
SIMULATEUR = "simulateur" # simulateur en partie 2
EXPERT = "expert" # simulateur en partie 2 + conseil d'expert
COMMUNICATION = "communication" # simulateur en partie 2 + communication
COMMUNICATION_ROUNDS = [1, 3, 5]
# MODEL ================================================================================================================
class Subsession(BaseSubsession):
treatment = models.StringField()
simulateur = models.BooleanField()
class Group(BaseGroup):
cout_first_jeton = models.IntegerField()
total_extraction = models.IntegerField()
cout_last_jeton = models.IntegerField()
cout_total_jeton = models.IntegerField()
cout_moyen_jeton = models.FloatField()
# pour expert
group_expert_extraction = models.FloatField()
group_distance = models.IntegerField()
class Player(BasePlayer):
current_part = models.IntegerField()
current_round = models.IntegerField()
groupe_type = models.StringField()
player_type = models.StringField()
extraction = models.IntegerField(min=0)
benefice = models.FloatField()
extraction_proportion = models.FloatField()
extraction_autres = models.IntegerField()
extraction_autres_prediction = models.IntegerField()
difference_prediction = models.IntegerField()
payoff_prediction = models.CurrencyField()
payoff_prediction_cumul = models.CurrencyField()
payoff_f = models.FloatField(initial=0)
payoff_f_cumul = models.FloatField(initial=0)
paid_part = models.IntegerField()
simul_1_indiv = models.IntegerField(blank=True)
simul_2_indiv = models.IntegerField(blank=True)
simul_3_indiv = models.IntegerField(blank=True)
simul_4_indiv = models.IntegerField(blank=True)
simul_5_indiv = models.IntegerField(blank=True)
simul_6_indiv = models.IntegerField(blank=True)
simul_7_indiv = models.IntegerField(blank=True)
simul_8_indiv = models.IntegerField(blank=True)
simul_9_indiv = models.IntegerField(blank=True)
simul_10_indiv = models.IntegerField(blank=True)
simul_1_autres = models.IntegerField(blank=True)
simul_2_autres = models.IntegerField(blank=True)
simul_3_autres = models.IntegerField(blank=True)
simul_4_autres = models.IntegerField(blank=True)
simul_5_autres = models.IntegerField(blank=True)
simul_6_autres = models.IntegerField(blank=True)
simul_7_autres = models.IntegerField(blank=True)
simul_8_autres = models.IntegerField(blank=True)
simul_9_autres = models.IntegerField(blank=True)
simul_10_autres = models.IntegerField(blank=True)
# questionnaire comprehension
understanding_0 = models.StringField(
label=UNDERSTANDING[0]['text'], choices=UNDERSTANDING[0]['propositions'], widget=widgets.RadioSelect)
understanding_1 = models.StringField(
label=UNDERSTANDING[1]['text'], choices=UNDERSTANDING[1]['propositions'], widget=widgets.RadioSelect)
understanding_2 = models.StringField(
label=UNDERSTANDING[2]['text'], choices=UNDERSTANDING[2]['propositions'], widget=widgets.RadioSelect)
understanding_3 = models.StringField(
label=UNDERSTANDING[3]['text'], choices=UNDERSTANDING[3]['propositions'], widget=widgets.RadioSelect)
understanding_4 = models.StringField(
label=UNDERSTANDING[4]['text'], choices=UNDERSTANDING[4]['propositions'], widget=widgets.RadioSelect)
understanding_5 = models.StringField(
label=UNDERSTANDING[5]['text'], choices=UNDERSTANDING[5]['propositions'], widget=widgets.RadioSelect)
understanding_faults = models.IntegerField()
# METHODS ==============================================================================================================
# SUBSESSION
def creating_session(subsession: Subsession):
subsession.treatment = subsession.session.config.get("treatment", C.BASELINE)
# il y a le simulateur en partie 2 lorsque ce n'est pas le baseline
subsession.simulateur = current_part(subsession) == 2 and subsession.treatment != C.BASELINE
if current_part(subsession) == 1 and current_round(subsession) == 1:
subsession.group_randomly()
# dans chaque groupe il y a 2 A et 3 B
for p in subsession.get_players():
p.current_part = 1
p.current_round = p.round_number
p.groupe_type = C.GROUPE_A if p.id_in_group < 3 else C.GROUPE_B
if p.groupe_type == C.GROUPE_B:
p.player_type = C.TYPE_B_1 if p.id_in_group != 5 else C.TYPE_B_2
p.paid_part = random.randint(1, 2)
else:
subsession.group_like_round(1)
for p in subsession.get_players():
p.current_part = current_part(subsession)
p.current_round = current_round(subsession)
p.groupe_type = p.in_round(1).groupe_type
if p.groupe_type == C.GROUPE_B:
p.player_type = p.in_round(1).player_type
p.paid_part = p.in_round(1).paid_part
def current_part(subsession: Subsession):
if subsession.round_number <= C.NUM_ROUNDS_PART_1:
return 1
else:
return 2
def current_round(subsession: Subsession):
if current_part(subsession) == 1:
return subsession.round_number
else:
return subsession.round_number - C.NUM_ROUNDS_PART_1
def vars_for_admin_report(subsession: Subsession):
players_infos = list()
for p in subsession.get_players():
if "jemna" not in p.participant.vars:
partie_1, partie_2 = cu(0), cu(0)
else:
partie_1 = p.participant.vars["jemna"].get("part_1_payoff", cu(0))
partie_2 = p.participant.vars["jemna"].get("part_2_payoff", cu(0))
players_infos.append(
dict(
label=p.participant.label,
partie_1=partie_1,
partie_2=partie_2,
partie_payee=p.paid_part
)
)
return dict(players_infos=players_infos)
def calculer_valeur(D, r_number):
a = 220
b = 5
k = 0.5
c = 0.5
if r_number == 1:
return -(5/2) * (D-a+c) / (b+5*k+15), D
elif r_number == 2:
return - 5 * (D-a+c) / (2*b+10*k+25), D
elif r_number == 3:
return -(5/2) * (D-a+c) / (b+5*k+10), D
elif r_number == 4:
return -5*(D-a+c) / (2*b+10*k+15), D
elif r_number == 5:
return -(5/2) * (D-a+c) / (b+5*k+5), D
elif r_number == 6:
return -5*(D-a+c) / (2*b+10*k+5), D
elif r_number == 7:
return -(5/2) * (D-a+c) / (b+5*k), D
# GROUP ----------------------------------------------------------------------------------------------------------------
def set_cout_jetons(group: Group):
""" Appelée en début de round """
if current_round(group.subsession) == 1:
group.cout_first_jeton = 1
else:
group.cout_first_jeton = group.in_round(group.round_number - 1).cout_last_jeton + 1
def set_expert_values(group: Group):
if current_round(group.subsession) == 1:
group.group_expert_extraction, group.group_distance = calculer_valeur(0, current_round(group.subsession))
else:
distance = (group.in_round(group.round_number - 1).total_extraction +
group.in_round(group.round_number - 1).group_distance)
group.group_expert_extraction, group.group_distance = calculer_valeur(distance, current_round(group.subsession))
def compute_total_extraction(group: Group):
""" Appelée lorsque tous les joueurs du groupe ont fait leur choix """
group.total_extraction = sum([p.extraction for p in group.get_players()])
if group.round_number == 1:
group.cout_last_jeton = group.total_extraction
else:
group.cout_last_jeton = group.cout_first_jeton + group.total_extraction - 1
sum_total = (group.cout_last_jeton * (group.cout_last_jeton + 1)) / 2
sum_debut = ((group.cout_first_jeton - 1) * group.cout_first_jeton) / 2
group.cout_total_jeton = int(sum_total - sum_debut)
group.cout_moyen_jeton = group.cout_total_jeton / group.total_extraction
for p in group.get_players():
compute_payoff(p)
# PLAYER ---------------------------------------------------------------------------------------------------------------
def compute_understanding_faults(player: Player):
nb_faults = 0
for i, question in enumerate(understanding.UNDERSTANDING):
if getattr(player, f"understanding_{i}") != question["solution"]:
nb_faults += 1
player.understanding_faults = nb_faults
def compute_payoff(player: Player):
# gain période
player.benefice = C.PARAM_A * player.extraction - C.PARAM_B * math.pow(player.extraction, 2)
player.extraction_proportion = round((player.extraction / player.group.total_extraction) * 100, 2)
player.extraction_autres = player.group.total_extraction - player.extraction
player.payoff_f = round((player.benefice - player.extraction * player.group.cout_moyen_jeton), 2)
player.payoff = cu(player.payoff_f * player.session.config.get("real_world_currency_per_point"))
# gain prédiction
player.extraction_autres_prediction = getattr(player, f"simul_{current_round(player.subsession)}_autres")
player.difference_prediction = abs(player.extraction_autres_prediction - player.extraction_autres)
if player.groupe_type == C.GROUPE_B and player.player_type == C.TYPE_B_2:
player.payoff_prediction = cu(C.PREDICTION_CONST_B_2 - C.PREDICTION_TAUX * player.difference_prediction)
else:
player.payoff_prediction = cu(C.PREDICTION_CONST - C.PREDICTION_TAUX * player.difference_prediction)
if current_round(player.subsession) == 1:
player.payoff_prediction_cumul = player.payoff_prediction
else:
player.payoff_prediction_cumul = player.in_round(player.round_number - 1).payoff_prediction_cumul + \
player.payoff_prediction
# gain cumulé de la partie
if current_round(player.subsession) == 1:
player.payoff_f_cumul = player.payoff_f
else:
player.payoff_f_cumul = player.in_round(player.round_number - 1).payoff_f_cumul + player.payoff_f
# fin de la partie
if player.round_number == C.NUM_ROUNDS_PART_1:
payoff_cumul_part_1 = cu(player.payoff_f_cumul * player.session.config.get("real_world_currency_per_point"))
part_1_txt_final = f"A la partie 1 vous avez gagné {player.payoff_f_cumul:.2f} ECUs " \
f"soit {payoff_cumul_part_1} pour vos décisions d'extraction. " \
f"Vous avez également gagné {player.payoff_prediction_cumul} pour vos prédictions."
player.participant.vars["jemna"] = dict(
part_1_payoff=payoff_cumul_part_1 + player.payoff_prediction_cumul,
part_1_txt_final=part_1_txt_final
)
elif player.round_number == C.NUM_ROUNDS:
payoff_cumul_part_2 = cu(player.payoff_f_cumul * player.session.config.get("real_world_currency_per_point"))
part_2_txt_final = f"A la partie 2 vous avez gagné {player.payoff_f_cumul:.2f} ECUs " \
f"soit {payoff_cumul_part_2} pour vos décisions d'extraction. " \
f"Vous avez également gagné {player.payoff_prediction_cumul} pour vos prédictions."
part_2_dict = dict(
part_2_txt_final=part_2_txt_final,
part_2_payoff=payoff_cumul_part_2 + player.payoff_prediction_cumul
)
player.participant.vars["jemna"].update(part_2_dict)
# txt_final et payoff
if player.paid_part == 1:
player.participant.vars["jemna"]["payoff"] = player.participant.vars["jemna"]["part_1_payoff"]
else:
player.participant.vars["jemna"]["payoff"] = player.participant.vars["jemna"]["part_2_payoff"]
player.participant.payoff = player.participant.vars["jemna"]["payoff"]
player.participant.vars["jemna"]["txt_final"] = \
player.participant.vars["jemna"]["part_1_txt_final"] + "
" + \
player.participant.vars["jemna"]["part_2_txt_final"] + "
" + \
f"C'est la partie {player.paid_part} qui a été tirée au sort pour votre rémunération."
# PAGES ================================================================================================================
class MyPage(Page):
@staticmethod
def vars_for_template(player: Player):
current_num_rounds = C.NUM_ROUNDS_PART_1 if current_part(player.subsession) == 1 else C.NUM_ROUNDS_PART_2
curr_part = current_part(player.subsession)
if not player.subsession.simulateur:
if curr_part == 1:
rounds_simulateur = [player.round_number]
else:
rounds_simulateur = [player.round_number - C.NUM_ROUNDS_PART_1]
else:
if curr_part == 1:
rounds_simulateur = list(range(player.round_number, C.NUM_ROUNDS_PART_1 + 1))
else:
rounds_simulateur = list(
range(player.round_number - C.NUM_ROUNDS_PART_1, C.NUM_ROUNDS + 1 - C.NUM_ROUNDS_PART_1))
if curr_part == 1:
histo_previous = [p for p in player.in_previous_rounds()]
histo_all = [p for p in player.in_all_rounds()]
histo_simul = []
else:
histo_previous = [p for p in player.in_rounds(C.NUM_ROUNDS_PART_1 + 1, player.round_number - 1)]
histo_all = [p for p in player.in_rounds(C.NUM_ROUNDS_PART_1 + 1, player.round_number)]
curr_round = current_round(player.subsession)
tiret_or_value = lambda x: "-" if x is None else x
if player.subsession.treatment == C.BASELINE or curr_round == 1:
histo_simul = []
else:
histo_simul = []
for i in range(curr_round-1, current_num_rounds + 1):
histo_simul.append(dict(
periode=i,
simul_indiv=tiret_or_value(player.in_round(player.round_number - 1).field_maybe_none(f"simul_{i}_indiv")),
simul_autres=tiret_or_value(player.in_round(player.round_number - 1).field_maybe_none(f"simul_{i}_autres"))
))
constants = C.__dict__.copy()
constants.update(
dict(
current_part=curr_part,
current_round_number=current_round(player.subsession),
current_num_rounds=current_num_rounds,
rounds_simulateur=rounds_simulateur,
histo_previous=histo_previous,
histo_all=histo_all,
histo_simul=histo_simul,
communication_rounds_str=", ".join(map(str, C.COMMUNICATION_ROUNDS))
)
)
return constants
@staticmethod
def js_vars(player: Player):
existing = MyPage.vars_for_template(player)
# suppression car player non-sérialisable
del existing["histo_previous"]
del existing["histo_all"]
existing.update(
dict(
fill_auto=False if player.id_in_subsession == 1 else player.session.config.get("fill_auto", False),
groupe_type=player.groupe_type,
player_type=player.field_maybe_none("player_type"),
round_number=player.round_number,
treatment=player.subsession.treatment,
simulateur=player.subsession.simulateur
)
)
return existing
class Presentation(MyPage):
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
class Instructions(MyPage):
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
class InstructionsWaitMonitor(MyPage):
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1 and player.session.config.get("instructions_relues", False)
class InstructionsWaitForAll(WaitPage):
wait_for_all_groups = True
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1 and not player.session.config.get("instructions_relues", False)
class Understanding(MyPage):
form_model = "player"
form_fields = [f"understanding_{i}" for i in range(len(UNDERSTANDING))]
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
@staticmethod
def before_next_page(player: Player, timeout_happened):
compute_understanding_faults(player)
class UnderstandingResults(MyPage):
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
@staticmethod
def vars_for_template(player: Player):
existing = MyPage.vars_for_template(player)
questionnaire = understanding.UNDERSTANDING.copy()
for i, q in enumerate(questionnaire):
q["reponse"] = getattr(player, f"understanding_{i}")
existing["questionnaire"] = questionnaire
return existing
class UnderstandingWaitForAll(WaitPage):
wait_for_all_groups = True
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
class Information(MyPage):
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
class BeforeDecision(WaitPage):
wait_for_all_groups = True
@staticmethod
def after_all_players_arrive(subsession: Subsession):
for g in subsession.get_groups():
set_cout_jetons(g)
set_expert_values(g)
class Decision(MyPage):
form_model = "player"
@staticmethod
def get_form_fields(player: Player):
form_fields = ["extraction"]
rounds_simulateur = Decision.vars_for_template(player)["rounds_simulateur"]
form_fields.extend([f"simul_{i}_indiv" for i in rounds_simulateur])
form_fields.extend([f"simul_{i}_autres" for i in rounds_simulateur])
return form_fields
@staticmethod
def vars_for_template(player: Player):
existing = MyPage.vars_for_template(player)
advice = player.group.group_expert_extraction
if advice < 0:
advice = 0
existing.update(dict(group_expert_extraction=advice))
return existing
class DecisionWaitForGroup(WaitPage):
wait_for_all_groups = False
@staticmethod
def after_all_players_arrive(group: Group):
compute_total_extraction(group)
class Results(MyPage):
@staticmethod
def vars_for_template(player: Player):
existing = MyPage.vars_for_template(player)
advice = player.group.group_expert_extraction
if advice < 0:
advice = 0
existing.update(dict(group_expert_extraction=advice))
return existing
class Part2Annonce(MyPage):
@staticmethod
def is_displayed(player: Player):
return current_part(player.subsession) == 2 and current_round(player.subsession) == 1
class Communication(MyPage):
timeout_seconds = C.COMMUNICATION_TIME
timer_text = "Temps restant : "
@staticmethod
def is_displayed(player: Player):
return current_part(player.subsession) == 2 and player.subsession.treatment == C.COMMUNICATION and \
current_round(player.subsession) in C.COMMUNICATION_ROUNDS
class Final(MyPage):
@staticmethod
def is_displayed(player: Player):
return player.round_number == C.NUM_ROUNDS
@staticmethod
def js_vars(player: Player):
existing = MyPage.js_vars(player)
existing["fill_auto"] = False
return existing
# ----------------------------------------------------------------------------------------------------------------------
page_sequence = [
Instructions, InstructionsWaitMonitor, InstructionsWaitForAll,
Information, Part2Annonce,
BeforeDecision, Decision, DecisionWaitForGroup,
Results,
Final
]