import random
from otree.api import *
from . import lexicon_fr
from . import lexicon_tn
from . import understanding_fr
from . import understanding_tn
doc = """
CPR dynamique par groupe de 5.
Treatments :
- baseline
- simulateur
- communication
- expert
"""
class C(BaseConstants):
NAME_IN_URL = 'jemnabis'
REAL_WORLD_CURRENCY_PER_POINT = 0.004
PLAYERS_PER_GROUP = 5
OTHER_PLAYERS = PLAYERS_PER_GROUP - 1
NUM_ROUNDS_TEST = 5
NUM_ROUNDS_PHASE_1 = 5
NUM_ROUNDS_PHASE_2 = 5
NUM_ROUNDS = NUM_ROUNDS_TEST + NUM_ROUNDS_PHASE_1 + NUM_ROUNDS_PHASE_2
EXTRACTION_MAX = 25
BASELINE = "baseline"
SIMULATEUR = "simulateur"
COMMUNICATION = "communication"
EXPERT = "expert"
DECISION_TIME = 180 # en secondes
RESULT_TIME = 60 # en secondes
COMMUNICATION_TIME = 120 # en secondes
COMMUNICATION_ROUNDS = [1, 3, 5]
PARAM_A = 220
PARAM_B = 5
PARAM_C = 0.5
PARAM_K = 0.5
INITIAL_D = 0 # la profondeur
FORMULE_RECETTE = f"\({PARAM_A} \\times E - {PARAM_B} \\times E^2 \)"
# ======================================================================================================================
#
# -- STATIC METHODS
#
# ======================================================================================================================
def get_current_part(round_number):
if round_number <= C.NUM_ROUNDS_TEST:
return 0
elif C.NUM_ROUNDS_TEST < round_number <= (C.NUM_ROUNDS_TEST + C.NUM_ROUNDS_PHASE_1):
return 1
else:
return 2
def get_current_round(round_number):
if get_current_part(round_number) == 0:
return round_number
elif get_current_part(round_number) == 1:
return round_number - C.NUM_ROUNDS_TEST
else:
return round_number - C.NUM_ROUNDS_TEST - C.NUM_ROUNDS_PHASE_1
def get_expert_advice(round_number):
return 30
# ======================================================================================================================
#
# -- SUBSESSION
#
# ======================================================================================================================
class Subsession(BaseSubsession):
language = models.StringField()
treatment = models.StringField()
current_part = models.IntegerField()
current_round = models.IntegerField()
simulateur = models.BooleanField(initial=False)
def creating_session(subsession: Subsession):
if subsession.round_number == 1:
# traitement
subsession.treatment = subsession.session.config.get("treatment", C.BASELINE)
# Langue
subsession.language = subsession.session.config.get("language", "fr")
if subsession.language == 'tn':
subsession.session.lexicon = lexicon_tn.Lexicon
subsession.session.understanding = understanding_tn.get_understanding(C.__dict__.copy())
else:
subsession.session.lexicon = lexicon_fr.Lexicon
subsession.session.understanding = understanding_fr.get_understanding(C.__dict__.copy())
# Groupes
subsession.group_randomly()
else:
subsession.group_like_round(1)
sub_first_round = subsession.in_round(1)
subsession.language = sub_first_round.language
subsession.treatment = sub_first_round.treatment
# partie et round
subsession.current_part = get_current_part(subsession.round_number)
subsession.current_round = get_current_round(subsession.round_number)
# simulateur
if subsession.current_part == 2 and subsession.treatment != C.BASELINE:
subsession.simulateur = True
# ======================================================================================================================
#
# -- GROUP
#
# ======================================================================================================================
class Group(BaseGroup):
cout_first_jeton = models.IntegerField()
total_extraction = models.IntegerField()
cout_last_jeton = models.IntegerField()
cout_total_jeton = models.IntegerField()
cout_moyen_jeton = models.FloatField()
# pour expert
group_expert_extraction = models.FloatField()
group_distance = models.IntegerField()
def init_round(group):
if get_current_round(group.round_number) == 1:
group.cout_first_jeton = 1
else:
group.cout_first_jeton = group.in_round(group.round_number - 1).cout_last_jeton + 1
def set_group_extraction_and_payoffs(group: Group):
group.total_extraction = sum([p.extraction for p in group.get_players()])
if get_current_round(group.round_number) == 1:
group.cout_last_jeton = group.total_extraction
else:
group.cout_last_jeton = group.cout_first_jeton + group.total_extraction - 1
sum_total = (group.cout_last_jeton * (group.cout_last_jeton + 1)) / 2
sum_debut = ((group.cout_first_jeton - 1) * group.cout_first_jeton) / 2
group.cout_total_jeton = int(sum_total - sum_debut)
# group.cout_moyen_jeton = group.cout_total_jeton / group.total_extraction
if group.total_extraction != 0:
group.cout_moyen_jeton = group.cout_total_jeton / group.total_extraction
else:
group.cout_moyen_jeton = 0
for p in group.get_players():
compute_payoff(p)
# ======================================================================================================================
#
# -- PLAYER
#
# ======================================================================================================================
class Player(BasePlayer):
understanding_1 = models.StringField()
understanding_2 = models.StringField()
understanding_3 = models.StringField()
understanding_4 = models.StringField()
understanding_5 = models.StringField()
understanding_faults = models.IntegerField()
extraction = models.IntegerField(min=0, max=C.EXTRACTION_MAX)
benefice = models.FloatField()
extraction_autres_estimation = models.IntegerField(min=0, max=C.OTHER_PLAYERS * C.EXTRACTION_MAX)
extraction_autres = models.IntegerField()
payoff_ecu = models.FloatField()
payoff_ecu_cumul = models.FloatField()
payoff_cumul = models.CurrencyField()
# -- simulations
simul_2_indiv = models.IntegerField(blank=True)
simul_3_indiv = models.IntegerField(blank=True)
simul_4_indiv = models.IntegerField(blank=True)
simul_5_indiv = models.IntegerField(blank=True)
simul_2_autres = models.IntegerField(blank=True)
simul_3_autres = models.IntegerField(blank=True)
simul_4_autres = models.IntegerField(blank=True)
simul_5_autres = models.IntegerField(blank=True)
def compute_payoff(player: Player):
player.benefice = C.PARAM_A * player.extraction - C.PARAM_B * pow(player.extraction, 2)
player.extraction_autres = player.group.total_extraction - player.extraction
player.payoff_ecu = (player.benefice - player.extraction * player.group.cout_moyen_jeton)
player.payoff = cu(player.payoff_ecu * C.REAL_WORLD_CURRENCY_PER_POINT)
if get_current_round(player.round_number) == 1:
player.payoff_ecu_cumul = player.payoff_ecu
player.payoff_cumul = player.payoff
else:
prev_p = player.in_round(player.round_number - 1)
player.payoff_ecu_cumul = prev_p.payoff_ecu_cumul + player.payoff_ecu
player.payoff_cumul = prev_p.payoff_cumul + player.payoff
# -- FIN DE PARTIE -------------------------------------------------------------------------------------------------
if get_current_part(player.round_number) == 0 and get_current_round(player.round_number) == C.NUM_ROUNDS_TEST:
player.participant.vars["jemnabis_test"] = dict(
txt_final=player.session.lexicon.get_txt_final(player.payoff_ecu_cumul, player.payoff_cumul),
payoff=player.payoff_cumul
)
player.participant.payoff = 0
elif get_current_part(player.round_number) == 1 and get_current_round(player.round_number) == C.NUM_ROUNDS_PHASE_1:
player.participant.vars["jemnabis_P1"] = dict(
txt_final=player.session.lexicon.get_txt_final(player.payoff_ecu_cumul, player.payoff_cumul),
payoff=player.payoff_cumul
)
player.participant.payoff = 0
elif get_current_part(player.round_number) == 2 and get_current_round(player.round_number) == C.NUM_ROUNDS_PHASE_2:
player.participant.vars["jemnabis_P2"] = dict(
txt_final=player.session.lexicon.get_txt_final(player.payoff_ecu_cumul, player.payoff_cumul),
payoff=player.payoff_cumul
)
player.participant.payoff = 0
def get_histo_previous(player: Player):
round_start = [1, C.NUM_ROUNDS_TEST + 1, C.NUM_ROUNDS_TEST + C.NUM_ROUNDS_PHASE_1 + 1][
player.subsession.current_part]
if player.round_number > round_start:
histo_previous = [p for p in player.in_rounds(round_start, player.round_number - 1)]
else:
histo_previous = []
return histo_previous
def get_histo_all(player: Player):
round_start = [1, C.NUM_ROUNDS_TEST + 1, C.NUM_ROUNDS_TEST + C.NUM_ROUNDS_PHASE_1 + 1][
player.subsession.current_part]
histo_all = [p for p in player.in_rounds(round_start, player.round_number)]
return histo_all
def get_rounds_simulateur(player: Player):
if player.subsession.simulateur and player.subsession.current_part == 2 and player.round_number < C.NUM_ROUNDS:
rounds_simulateur = range(get_current_round(player.round_number) + 1, C.NUM_ROUNDS_PHASE_2 + 1)
else:
rounds_simulateur = []
return rounds_simulateur
# ======================================================================================================================
#
# -- PAGES
#
# ======================================================================================================================
class MyPage(Page):
@staticmethod
def vars_for_template(player: Player):
return dict(
language=player.subsession.language,
Lexicon=player.session.lexicon,
)
@staticmethod
def js_vars(player: Player):
parameters = C.__dict__.copy()
parameters.update(dict(
fill_auto=player.session.config.get("fill_auto", False),
current_part=player.subsession.current_part,
current_round=player.subsession.current_round,
language=player.subsession.language,
simulateur=player.subsession.simulateur
))
return parameters
class Instructions(MyPage):
@staticmethod
def is_displayed(player: Player):
return (player.subsession.current_part == 0 or player.subsession.current_part == 2) and \
player.subsession.current_round == 1
class InstructionsWaitMonitor(MyPage):
@staticmethod
def is_displayed(player: Player):
return Instructions.is_displayed(player)
class Understanding(MyPage):
form_model = "player"
@staticmethod
def get_form_fields(player: Player):
return [f"understanding_{i + 1}" for i in range(len(player.session.vars["understanding"]))]
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
@staticmethod
def vars_for_template(player: Player):
existing = MyPage.vars_for_template(player)
existing.update(dict(
understanding=player.session.vars["understanding"]
))
return existing
@staticmethod
def js_vars(player: Player):
existing = MyPage.js_vars(player)
existing.update(dict(
understanding=player.session.vars["understanding"]
))
return existing
@staticmethod
def before_next_page(player, timeout_happened):
understanding = player.session.vars["understanding"]
if timeout_happened:
for i, q in enumerate(understanding):
setattr(player, f"understanding_{i + 1}", random.choice(q["propositions"]))
faults = 0
for i, q in enumerate(understanding):
if getattr(player, f"understanding_{i + 1}") != q["solution"]:
faults += 1
player.understanding_faults = faults
class UnderstandingResults(MyPage):
@staticmethod
def is_displayed(player: Player):
return Understanding.is_displayed(player)
def vars_for_template(player: Player):
existing = MyPage.vars_for_template(player)
understanding = player.session.vars["understanding"].copy()
for i, q in enumerate(understanding):
q["reponse"] = getattr(player, f"understanding_{i + 1}")
existing.update(dict(understanding=understanding))
return existing
class UnderstandingsWaitForAll(WaitPage):
wait_for_all_groups = True
@staticmethod
def is_displayed(player: Player):
return Understanding.is_displayed(player)
class NewPhase(MyPage):
def is_displayed(player: Player):
return player.subsession.current_part < 2 and player.subsession.current_round == 1
def vars_for_template(player: Player):
existing = MyPage.vars_for_template(player)
num_rounds = C.NUM_ROUNDS_TEST if player.subsession.current_part == 0 else C.NUM_ROUNDS_PHASE_1
existing.update(dict(
txt=player.session.lexicon.get_newPhase_txt(player.subsession.current_part, num_rounds)
))
return existing
class InitRound(WaitPage):
wait_for_all_groups = False
def after_all_players_arrive(group):
init_round(group)
class Decision(MyPage):
form_model = "player"
@staticmethod
def get_form_fields(player: Player):
form_fields = ["extraction", "extraction_autres_estimation"]
rounds_simulateur = get_rounds_simulateur(player)
form_fields.extend([f"simul_{i}_indiv" for i in rounds_simulateur])
form_fields.extend([f"simul_{i}_autres" for i in rounds_simulateur])
return form_fields
@staticmethod
def vars_for_template(player: Player):
existing = MyPage.vars_for_template(player)
existing.update(dict(
histo_previous=get_histo_previous(player),
rounds_simulateur=get_rounds_simulateur(player)
))
return existing
@staticmethod
def before_next_page(player, timeout_happened):
if timeout_happened:
for field in Decision.get_form_fields(player):
if field == "extraction" or "indiv" in field:
setattr(player, field, random.randint(0, C.EXTRACTION_MAX))
else:
setattr(player, field, random.randint(0, C.OTHER_PLAYERS * C.EXTRACTION_MAX))
class DecisionWaitForAll(WaitPage):
wait_for_all_groups = True
@staticmethod
def after_all_players_arrive(subsession: Subsession):
for g in subsession.get_groups():
set_group_extraction_and_payoffs(g)
class Results(MyPage):
@staticmethod
def vars_for_template(player: Player):
existing = MyPage.vars_for_template(player)
existing.update(dict(
histo_all=get_histo_all(player),
txt_result=player.session.lexicon.get_txt_result(player)
))
return existing
class ResultsWaitForAll(WaitPage):
wait_for_all_groups = True
class EndPhase(MyPage):
@staticmethod
def is_displayed(player: Player):
if player.subsession.current_part == 0:
return player.subsession.current_round == C.NUM_ROUNDS_TEST
elif player.subsession.current_part == 1:
return player.subsession.current_round == C.NUM_ROUNDS_PHASE_1
elif player.subsession.current_part == 2:
return player.subsession.current_round == C.NUM_ROUNDS_PHASE_2
@staticmethod
def vars_for_template(player: Player):
existing = MyPage.vars_for_template(player)
existing.update(
dict(
histo_all=get_histo_all(player)
)
)
return existing
page_sequence = [
Instructions, InstructionsWaitMonitor,
Understanding, UnderstandingResults, UnderstandingsWaitForAll,
NewPhase,
InitRound, Decision, DecisionWaitForAll,
Results, ResultsWaitForAll,
EndPhase,
]