import math
import random

from otree.api import *

from .understanding import UNDERSTANDING

doc = """
Projet Jemna
"""


class C(BaseConstants):
    """Constants of the extraction game (group layout, rounds, payoffs, timers)."""

    NAME_IN_URL = 'jemna'

    # Group composition: type-A players, then type-B players split in two sub-types.
    PLAYERS_PER_GROUP_A = 2
    PLAYERS_PER_GROUP_B_1 = 2
    PLAYERS_PER_GROUP_B_2 = 1
    PLAYERS_PER_GROUP_B = PLAYERS_PER_GROUP_B_1 + PLAYERS_PER_GROUP_B_2
    PLAYERS_PER_GROUP = PLAYERS_PER_GROUP_A + PLAYERS_PER_GROUP_B
    OTHER_PLAYERS = PLAYERS_PER_GROUP - 1

    # The session is made of two parts played back to back.
    NUM_ROUNDS_PART_1 = 7
    NUM_ROUNDS_PART_2 = 7
    NUM_ROUNDS = NUM_ROUNDS_PART_1 + NUM_ROUNDS_PART_2

    EXTRACTION_MAX_A = 25
    EXTRACTION_FIXE_B_1 = 1
    EXTRACTION_MAX_B_1 = EXTRACTION_MAX_A - EXTRACTION_FIXE_B_1
    EXTRACTION_FIXE_B_2 = 1
    EXTRACTION_MAX_B_2 = 0

    # Prediction payoff: constant minus PREDICTION_TAUX per unit of prediction error.
    PREDICTION_CONST = 1
    PREDICTION_CONST_B_2 = 1.3
    PREDICTION_TAUX = 0.013

    GROUPE_A = "A"
    GROUPE_B = "B"
    TYPE_B_1 = "1"
    TYPE_B_2 = "2"

    # Benefit function a*x - b*x^2. calculer_valeur() reads these two values.
    # NOTE(review): the other numeric coefficients in calculer_valeur() were
    # presumably derived for these settings -- re-check them if a or b changes.
    PARAM_A = 220
    PARAM_B = 5

    DECISION_TIME = 180  # seconds
    RESULT_TIME = 60  # seconds
    COMMUNICATION_TIME = 120  # seconds

    # Treatments
    BASELINE = "baseline"  # no simulator in part 2
    SIMULATEUR = "simulateur"  # simulator in part 2
    EXPERT = "expert"  # simulator in part 2 + expert advice
    COMMUNICATION = "communication"  # simulator in part 2 + communication
    COMMUNICATION_ROUNDS = [1, 3, 5]


# MODEL ================================================================================================================
class Subsession(BaseSubsession):
    """Per-round session data: the treatment name and whether the simulator is on."""

    treatment = models.StringField()
    simulateur = models.BooleanField()


class Group(BaseGroup):
    """Per-round group data: token cost range, total extraction and expert values."""

    cout_first_jeton = models.IntegerField()
    total_extraction = models.IntegerField()
    cout_last_jeton = models.IntegerField()
    cout_total_jeton = models.IntegerField()
    cout_moyen_jeton = models.FloatField()
    # used for the expert advice
    group_expert_extraction = models.FloatField()
    group_distance = models.IntegerField()


class Player(BasePlayer):
    """Per-round player data: decision, payoffs, simulator inputs and quiz answers."""

    current_part = models.IntegerField()
    current_round = models.IntegerField()
    groupe_type = models.StringField()
    player_type = models.StringField()  # only set for type-B players
    extraction = models.IntegerField(min=0)
    benefice = models.FloatField()
    extraction_proportion = models.FloatField()
    extraction_autres = models.IntegerField()
    extraction_autres_prediction = models.IntegerField()
    difference_prediction = models.IntegerField()
    payoff_prediction = models.CurrencyField()
    payoff_prediction_cumul = models.CurrencyField()
    payoff_f = models.FloatField(initial=0)
    payoff_f_cumul = models.FloatField(initial=0)
    paid_part = models.IntegerField()

    # Simulator inputs, one pair (own extraction / others' extraction) per round index.
    simul_1_indiv = models.IntegerField(blank=True)
    simul_2_indiv = models.IntegerField(blank=True)
    simul_3_indiv = models.IntegerField(blank=True)
    simul_4_indiv = models.IntegerField(blank=True)
    simul_5_indiv = models.IntegerField(blank=True)
    simul_6_indiv = models.IntegerField(blank=True)
    simul_7_indiv = models.IntegerField(blank=True)
    simul_8_indiv = models.IntegerField(blank=True)
    simul_9_indiv = models.IntegerField(blank=True)
    simul_10_indiv = models.IntegerField(blank=True)
    simul_1_autres = models.IntegerField(blank=True)
    simul_2_autres = models.IntegerField(blank=True)
    simul_3_autres = models.IntegerField(blank=True)
    simul_4_autres = models.IntegerField(blank=True)
    simul_5_autres = models.IntegerField(blank=True)
    simul_6_autres = models.IntegerField(blank=True)
    simul_7_autres = models.IntegerField(blank=True)
    simul_8_autres = models.IntegerField(blank=True)
    simul_9_autres = models.IntegerField(blank=True)
    simul_10_autres = models.IntegerField(blank=True)

    # understanding questionnaire
    understanding_0 = models.StringField(
        label=UNDERSTANDING[0]['text'], choices=UNDERSTANDING[0]['propositions'], widget=widgets.RadioSelect)
    understanding_1 = models.StringField(
        label=UNDERSTANDING[1]['text'], choices=UNDERSTANDING[1]['propositions'], widget=widgets.RadioSelect)
    understanding_2 = models.StringField(
        label=UNDERSTANDING[2]['text'], choices=UNDERSTANDING[2]['propositions'], widget=widgets.RadioSelect)
    understanding_3 = models.StringField(
        label=UNDERSTANDING[3]['text'], choices=UNDERSTANDING[3]['propositions'], widget=widgets.RadioSelect)
    understanding_4 = models.StringField(
        label=UNDERSTANDING[4]['text'], choices=UNDERSTANDING[4]['propositions'], widget=widgets.RadioSelect)
    understanding_5 = models.StringField(
        label=UNDERSTANDING[5]['text'], choices=UNDERSTANDING[5]['propositions'], widget=widgets.RadioSelect)
    understanding_faults = models.IntegerField()


# METHODS ==============================================================================================================
# SUBSESSION
def creating_session(subsession: Subsession):
    """Initialise a round: treatment flags, grouping and per-player role fields.

    Groups are drawn randomly in the very first round and reused afterwards.
    Roles and the paid part are decided in round 1 and copied to later rounds.
    """
    part = current_part(subsession)
    round_in_part = current_round(subsession)

    subsession.treatment = subsession.session.config.get("treatment", C.BASELINE)
    # the simulator is available in part 2 for every treatment except the baseline
    subsession.simulateur = part == 2 and subsession.treatment != C.BASELINE

    if part == 1 and round_in_part == 1:
        subsession.group_randomly()
        # each group holds the type-A players first, then B1, then B2
        last_a_id = C.PLAYERS_PER_GROUP_A
        last_b_1_id = C.PLAYERS_PER_GROUP_A + C.PLAYERS_PER_GROUP_B_1
        for p in subsession.get_players():
            p.current_part = 1
            p.current_round = p.round_number
            p.groupe_type = C.GROUPE_A if p.id_in_group <= last_a_id else C.GROUPE_B
            if p.groupe_type == C.GROUPE_B:
                p.player_type = C.TYPE_B_1 if p.id_in_group <= last_b_1_id else C.TYPE_B_2
            p.paid_part = random.randint(1, 2)
    else:
        subsession.group_like_round(1)
        for p in subsession.get_players():
            first = p.in_round(1)
            p.current_part = part
            p.current_round = round_in_part
            p.groupe_type = first.groupe_type
            if p.groupe_type == C.GROUPE_B:
                p.player_type = first.player_type
            p.paid_part = first.paid_part


def current_part(subsession: Subsession) -> int:
    """Return 1 while the round belongs to part 1, 2 afterwards."""
    return 1 if subsession.round_number <= C.NUM_ROUNDS_PART_1 else 2


def current_round(subsession: Subsession) -> int:
    """Return the round number counted from the start of the current part."""
    if current_part(subsession) == 1:
        return subsession.round_number
    return subsession.round_number - C.NUM_ROUNDS_PART_1


def vars_for_admin_report(subsession: Subsession):
    """Return, for each player, label, payoff of each part and the paid part."""
    players_infos = []
    for p in subsession.get_players():
        # the "jemna" entry only exists once part 1 is finished
        summary = p.participant.vars.get("jemna", {})
        players_infos.append(
            dict(
                label=p.participant.label,
                partie_1=summary.get("part_1_payoff", cu(0)),
                partie_2=summary.get("part_2_payoff", cu(0)),
                partie_payee=p.paid_part,
            )
        )
    return dict(players_infos=players_infos)


def calculer_valeur(D, r_number):
    """Return ``(expert extraction, D)`` for round ``r_number`` (1..7) of a part.

    ``D`` is passed through unchanged as the second element.

    Raises:
        ValueError: if ``r_number`` is not one of the supported rounds.
    """
    a = C.PARAM_A
    b = C.PARAM_B
    k = 0.5
    c = 0.5
    # (multiplier, denominator) of the closed-form expression, per round
    closed_form = {
        1: (-(5 / 2), b + 5 * k + 15),
        2: (-5, 2 * b + 10 * k + 25),
        3: (-(5 / 2), b + 5 * k + 10),
        4: (-5, 2 * b + 10 * k + 15),
        5: (-(5 / 2), b + 5 * k + 5),
        6: (-5, 2 * b + 10 * k + 5),
        7: (-(5 / 2), b + 5 * k),
    }
    if r_number not in closed_form:
        raise ValueError(f"calculer_valeur: unsupported round number {r_number!r}")
    multiplier, denominator = closed_form[r_number]
    return multiplier * (D - a + c) / denominator, D


# GROUP ----------------------------------------------------------------------------------------------------------------
def set_cout_jetons(group: Group):
    """Set the cost of the first token of the round (called at round start).

    The cost restarts at 1 at the beginning of each part and otherwise
    continues right after the last token of the previous round.
    """
    if current_round(group.subsession) == 1:
        group.cout_first_jeton = 1
    else:
        group.cout_first_jeton = group.in_round(group.round_number - 1).cout_last_jeton + 1


def set_expert_values(group: Group):
    """Store the expert extraction and the distance used to compute it."""
    round_in_part = current_round(group.subsession)
    if round_in_part == 1:
        distance = 0
    else:
        previous = group.in_round(group.round_number - 1)
        distance = previous.total_extraction + previous.group_distance
    group.group_expert_extraction, group.group_distance = calculer_valeur(distance, round_in_part)


def compute_total_extraction(group: Group):
    """Aggregate the group's decisions, price the tokens and pay each player.

    Called once every player of the group has made a choice.
    """
    players = group.get_players()
    group.total_extraction = sum(p.extraction for p in players)

    first = group.cout_first_jeton
    # in the first round of a part first == 1, so this equals total_extraction
    last = first + group.total_extraction - 1
    group.cout_last_jeton = last

    # sum of the integers first..last; both products are even, so // is exact
    group.cout_total_jeton = last * (last + 1) // 2 - (first - 1) * first // 2
    # nobody extracted anything: there is no token, hence no average cost
    group.cout_moyen_jeton = (
        group.cout_total_jeton / group.total_extraction if group.total_extraction else 0.0
    )

    for p in players:
        compute_payoff(p)


# PLAYER ---------------------------------------------------------------------------------------------------------------
def compute_understanding_faults(player: Player):
    """Count the questionnaire answers that differ from the expected solution."""
    player.understanding_faults = sum(
        getattr(player, f"understanding_{i}") != question["solution"]
        for i, question in enumerate(UNDERSTANDING)
    )


def _part_summary(player: Player, part: int):
    """Return ``(total payoff, summary text)`` of the part that just ended."""
    rate = player.session.config.get("real_world_currency_per_point")
    payoff_extraction = cu(player.payoff_f_cumul * rate)
    text = f"A la partie {part} vous avez gagné {player.payoff_f_cumul:.2f} ECUs " \
           f"soit {payoff_extraction} pour vos décisions d'extraction. " \
           f"Vous avez également gagné {player.payoff_prediction_cumul} pour vos prédictions."
    return payoff_extraction + player.payoff_prediction_cumul, text


def compute_payoff(player: Player):
    """Compute the round payoffs of ``player`` and, at the end of a part, its summary.

    Sets the extraction payoff, the prediction payoff and their running totals
    within the current part. On the last round of part 1 and of part 2 the
    totals are stored in ``participant.vars["jemna"]``; on the very last round
    the paid part determines ``participant.payoff`` and the final text.
    """
    group = player.group
    round_in_part = current_round(player.subsession)
    total = group.total_extraction

    # round payoff
    player.benefice = C.PARAM_A * player.extraction - C.PARAM_B * math.pow(player.extraction, 2)
    # a total of 0 means the player extracted nothing either: share is 0
    player.extraction_proportion = round((player.extraction / total) * 100, 2) if total else 0.0
    player.extraction_autres = total - player.extraction
    player.payoff_f = round((player.benefice - player.extraction * group.cout_moyen_jeton), 2)
    player.payoff = cu(player.payoff_f * player.session.config.get("real_world_currency_per_point"))

    # prediction payoff
    player.extraction_autres_prediction = getattr(player, f"simul_{round_in_part}_autres")
    player.difference_prediction = abs(player.extraction_autres_prediction - player.extraction_autres)
    is_b_2 = player.groupe_type == C.GROUPE_B and player.player_type == C.TYPE_B_2
    prediction_const = C.PREDICTION_CONST_B_2 if is_b_2 else C.PREDICTION_CONST
    player.payoff_prediction = cu(prediction_const - C.PREDICTION_TAUX * player.difference_prediction)

    # running totals restart with each part
    if round_in_part == 1:
        player.payoff_prediction_cumul = player.payoff_prediction
        player.payoff_f_cumul = player.payoff_f
    else:
        previous = player.in_round(player.round_number - 1)
        player.payoff_prediction_cumul = previous.payoff_prediction_cumul + player.payoff_prediction
        player.payoff_f_cumul = previous.payoff_f_cumul + player.payoff_f

    # end of a part
    if player.round_number == C.NUM_ROUNDS_PART_1:
        part_payoff, part_text = _part_summary(player, 1)
        player.participant.vars["jemna"] = dict(
            part_1_payoff=part_payoff,
            part_1_txt_final=part_text
        )

    elif player.round_number == C.NUM_ROUNDS:
        part_payoff, part_text = _part_summary(player, 2)
        summary = player.participant.vars["jemna"]
        summary.update(dict(part_2_txt_final=part_text, part_2_payoff=part_payoff))

        # final text and payoff: only the randomly drawn part is paid
        summary["payoff"] = summary["part_1_payoff"] if player.paid_part == 1 else summary["part_2_payoff"]
        player.participant.payoff = summary["payoff"]
        summary["txt_final"] = \
            summary["part_1_txt_final"] + "\n" + \
            summary["part_2_txt_final"] + "\n" + \
            f"C'est la partie {player.paid_part} qui a été tirée au sort pour votre rémunération."


def _with_expert_advice(player: Player):
    """Return MyPage's template vars plus the expert advice, floored at zero."""
    template_vars = MyPage.vars_for_template(player)
    template_vars.update(dict(group_expert_extraction=max(player.group.group_expert_extraction, 0)))
    return template_vars


# PAGES ================================================================================================================
class MyPage(Page):
    """Base page exposing the constants and the round/history context."""

    @staticmethod
    def vars_for_template(player: Player):
        """Return the constants of ``C`` plus part, round and history information."""
        subsession = player.subsession
        curr_part = current_part(subsession)
        curr_round = current_round(subsession)
        current_num_rounds = C.NUM_ROUNDS_PART_1 if curr_part == 1 else C.NUM_ROUNDS_PART_2

        # with the simulator the player fills every remaining round of the part,
        # otherwise only the current one
        if subsession.simulateur:
            rounds_simulateur = list(range(curr_round, current_num_rounds + 1))
        else:
            rounds_simulateur = [curr_round]

        histo_simul = []
        if curr_part == 1:
            histo_previous = list(player.in_previous_rounds())
            histo_all = list(player.in_all_rounds())
        else:
            # the history of part 2 starts at its own first round
            first_round = C.NUM_ROUNDS_PART_1 + 1
            histo_previous = list(player.in_rounds(first_round, player.round_number - 1))
            histo_all = list(player.in_rounds(first_round, player.round_number))
            if subsession.treatment != C.BASELINE and curr_round != 1:
                # simulator values entered during the previous round; "-" when left blank
                previous = player.in_round(player.round_number - 1)
                for i in range(curr_round - 1, current_num_rounds + 1):
                    indiv = previous.field_maybe_none(f"simul_{i}_indiv")
                    autres = previous.field_maybe_none(f"simul_{i}_autres")
                    histo_simul.append(dict(
                        periode=i,
                        simul_indiv="-" if indiv is None else indiv,
                        simul_autres="-" if autres is None else autres
                    ))

        constants = C.__dict__.copy()
        constants.update(
            dict(
                current_part=curr_part,
                current_round_number=curr_round,
                current_num_rounds=current_num_rounds,
                rounds_simulateur=rounds_simulateur,
                histo_previous=histo_previous,
                histo_all=histo_all,
                histo_simul=histo_simul,
                communication_rounds_str=", ".join(map(str, C.COMMUNICATION_ROUNDS))
            )
        )
        return constants

    @staticmethod
    def js_vars(player: Player):
        """Return the template vars usable from JavaScript, plus player info."""
        existing = MyPage.vars_for_template(player)
        # removed because Player objects are not serialisable
        del existing["histo_previous"]
        del existing["histo_all"]
        existing.update(
            dict(
                fill_auto=False if player.id_in_subsession == 1 else player.session.config.get("fill_auto", False),
                groupe_type=player.groupe_type,
                player_type=player.field_maybe_none("player_type"),
                round_number=player.round_number,
                treatment=player.subsession.treatment,
                simulateur=player.subsession.simulateur
            )
        )
        return existing


class Presentation(MyPage):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class Instructions(MyPage):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class InstructionsWaitMonitor(MyPage):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1 and player.session.config.get("instructions_relues", False)


class InstructionsWaitForAll(WaitPage):
    wait_for_all_groups = True

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1 and not player.session.config.get("instructions_relues", False)


class Understanding(MyPage):
    form_model = "player"
    form_fields = [f"understanding_{i}" for i in range(len(UNDERSTANDING))]

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        compute_understanding_faults(player)


class UnderstandingResults(MyPage):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        """Add the questionnaire, each question carrying this player's answer."""
        existing = MyPage.vars_for_template(player)
        # build new dicts so the shared UNDERSTANDING data is never modified
        existing["questionnaire"] = [
            dict(question, reponse=getattr(player, f"understanding_{i}"))
            for i, question in enumerate(UNDERSTANDING)
        ]
        return existing


class UnderstandingWaitForAll(WaitPage):
    wait_for_all_groups = True

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class Information(MyPage):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class BeforeDecision(WaitPage):
    wait_for_all_groups = True

    @staticmethod
    def after_all_players_arrive(subsession: Subsession):
        for g in subsession.get_groups():
            set_cout_jetons(g)
            set_expert_values(g)


class Decision(MyPage):
    form_model = "player"

    @staticmethod
    def get_form_fields(player: Player):
        """Return the extraction field plus the simulator fields of the open rounds."""
        rounds_simulateur = Decision.vars_for_template(player)["rounds_simulateur"]
        form_fields = ["extraction"]
        form_fields.extend([f"simul_{i}_indiv" for i in rounds_simulateur])
        form_fields.extend([f"simul_{i}_autres" for i in rounds_simulateur])
        return form_fields

    @staticmethod
    def vars_for_template(player: Player):
        return _with_expert_advice(player)


class DecisionWaitForGroup(WaitPage):
    wait_for_all_groups = False

    @staticmethod
    def after_all_players_arrive(group: Group):
        compute_total_extraction(group)


class Results(MyPage):
    @staticmethod
    def vars_for_template(player: Player):
        return _with_expert_advice(player)


class Part2Annonce(MyPage):
    @staticmethod
    def is_displayed(player: Player):
        return current_part(player.subsession) == 2 and current_round(player.subsession) == 1


class Communication(MyPage):
    timeout_seconds = C.COMMUNICATION_TIME
    timer_text = "Temps restant : "

    @staticmethod
    def is_displayed(player: Player):
        return current_part(player.subsession) == 2 and player.subsession.treatment == C.COMMUNICATION and \
            current_round(player.subsession) in C.COMMUNICATION_ROUNDS


class Final(MyPage):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def js_vars(player: Player):
        existing = MyPage.js_vars(player)
        # the final page is never auto-filled
        existing["fill_auto"] = False
        return existing


# ----------------------------------------------------------------------------------------------------------------------
page_sequence = [
    Instructions, InstructionsWaitMonitor, InstructionsWaitForAll,
    Information,
    Part2Annonce,
    BeforeDecision, Decision, DecisionWaitForGroup, Results,
    Final
]