import itertools
import math
import random
from decimal import Decimal, ROUND_HALF_UP

from otree.api import *
#from openai import OpenAI
from otree.models import session, subsession


doc = """
Your app description
"""


class C(BaseConstants):
    NAME_IN_URL = 'Uncertainty_Group_BC_LI'
    PLAYERS_PER_GROUP = 3
    NUM_ROUNDS = 11


class Subsession(BaseSubsession):
    pass


class Group(BaseGroup):
    # Treatment flags; assigned in round 1 by creating_session and copied to later rounds.
    clear = models.BooleanField()
    optimization = models.BooleanField()
    # Rounded mean of the producers' prices in this round (written by round_results).
    average_price = models.IntegerField()


class Player(BasePlayer):
    # pictures = models.BooleanField() necessary for explanation picture?
    # if comprehension test is needed, see steph

    # constants
    omega_current = models.IntegerField(default=40)
    omega_pre = models.IntegerField(default=40)
    omega_small = models.IntegerField(default=20)
    omega_large = models.IntegerField(default=60)
    omega_average = models.IntegerField(default=40)
    cost_deduction = models.IntegerField(default=10)
    price_aim_parameter_pre = models.IntegerField(default=15)
    price_aim_parameter_post = models.IntegerField(default=25)
    price_aim_parameter_current = models.IntegerField(default=15)
    price_aim_parameter_small = models.IntegerField(default=5)
    maximum_price = models.IntegerField()
    shock = models.IntegerField(default=6)
    shock_announcement = models.BooleanField()
    display_shock = models.BooleanField()
    explanation = models.LongStringField(label="Begründung:", max_length=2000)
    optimization_instruction = models.BooleanField()
    optimization = models.BooleanField()
    clear = models.BooleanField()
    clear_instruction = models.BooleanField()
    subgroup_id = models.IntegerField()
    sitzplatznummer = models.IntegerField()

    # player variables
    price = models.IntegerField(label="", min=10)
    output = models.IntegerField()
    unit_costs = models.IntegerField()
    total_costs = models.IntegerField()
    revenue = models.IntegerField()
    earnings = models.IntegerField(min=0)
    per_piece_earnings = models.IntegerField()
    price_aim = models.IntegerField()

    # observer values
    is_observer = models.BooleanField(initial=False)
    observer_choice_id = models.IntegerField(blank=True)
    observer_guess = models.IntegerField(blank=True)
    pre_omega = models.IntegerField()
    pre_average_price = models.IntegerField()
    pre_price_aim = models.IntegerField()
    pre_price_aim_parameter = models.IntegerField()

    # variables for payoffs
    payoff_round = models.IntegerField()  # if I don't round, then it needs to be a float
    payoff_acc = models.IntegerField()
    being_chosen = models.BooleanField(initial=False)
    bonus_being_chosen = models.CurrencyField(default=0.5)
    times_chosen = models.IntegerField()
    bonus_being_chosen_cum = models.CurrencyField()
    exchange_rate = models.IntegerField(default=100)
    show_up_fee = models.CurrencyField(default=5)
    payoff_mon = models.CurrencyField()
    payoff_mon_final = models.CurrencyField()
    payoff_mon_final_otree = models.CurrencyField()

    # Questionnaire
    Question_1 = models.IntegerField(
        choices=[
            [1, "Die Produktion erfolgt automatisch in Höhe der Nachfrage."],
            [2, "Die Produktion wird zufällig bestimmt."],
            [3, "Die Produktion hängt vom Durchschnittspreis ab."],
        ],
        label="Frage 1: Wie wird die Höhe Ihrer Produktion bestimmt?",
        widget=widgets.RadioSelect,
    )
    Question_2 = models.IntegerField(
        choices=[
            [1, "Je höher der Preis p, desto höher die Nachfrage y."],
            [2, "Je höher der Preis p, desto niedriger die Nachfrage y."],
            [3, "Je niedriger der Preis p, desto niedriger die Nachfrage y."],
        ],
        label="Frage 2: Welche Aussage über Ihre Nachfrage y und Ihren Preis p ist richtig?",
        widget=widgets.RadioSelect,
    )
    Question_3 = models.IntegerField(
        choices=[
            [1, "Je höher der exogene Faktor ω, desto höher die Nachfrage y."],
            [2, "Je höher der exogene Faktor ω, desto niedriger die Nachfrage y."],
            [3, "Je niedriger der exogene Faktor ω, desto höher die Nachfrage y."],
        ],
        label="Frage 3: Welche Aussage über Ihre Nachfrage und den exogenen Faktor ω ist korrekt?",
        widget=widgets.RadioSelect,
    )
    Question_4 = models.IntegerField(
        choices=[
            [1, "Der Preis des eigenen Produkts."],
            [2, "Der Durchschnittspreis aller Produzenten über alle Runden hinweg."],
            [3, "Der Durchschnittspreis aller Produzenten in der jeweiligen Runde."],
        ],
        label="Frage 4: Welche der folgenden Variablen hat einen Einfluss auf die Kosten pro Stück Ihres Produkts?",
        widget=widgets.RadioSelect,
    )
    Question_5 = models.IntegerField(
        choices=[
            [1, "... werden durch die Preissetzung aller Teams bestimmt und sind daher am Anfang jeder Runde unbekannt."],
            [2, "... sind zu Beginn jeder Runde bekannt."],
            [3, "... resultieren aus dem von meinem Team festgelegten Preis."],
        ],
        label="Frage 5: Die Produktionskosten ...",
        widget=widgets.RadioSelect,
    )
    Question_6 = models.IntegerField(
        choices=[
            [1, "An alle Teilnehmenden."],
            [2, "An mein Teammitglied."],
            [3, "An ein zufällig ausgewähltes anderes Team."],
        ],
        label="Frage 6: An wen wird Ihr vorläufiger Preisvorschlag weitergeleitet?",
        widget=widgets.RadioSelect,
    )
    Question_7 = models.IntegerField(
        choices=[
            [1, "Mein finaler Preisvorschlag ist der Preis für das Produkt."],
            [2, "Der Preis wird zufällig zwischen den vorläufigen Preisvorschlägen ausgewählt, die zwischen den Teammitgliedern ausgetauscht wurden."],
            [3, "Der Preis wird zufällig aus den finalen Preisvorschlägen ausgewählt, die beide Teammitglieder nach dem Austausch der vorläufigen Preisvorschläge abgegeben haben."],
        ],
        label="Frage 7: Wie wird der Preis für Ihr Produkt in jeder Runde festgelegt?",
        widget=widgets.RadioSelect,
    )
    attempts = models.IntegerField(blank=True)
    Q1_first_attempt = models.IntegerField(blank=True)
    Q2_first_attempt = models.IntegerField(blank=True)
    Q3_first_attempt = models.IntegerField(blank=True)
    Q4_first_attempt = models.IntegerField(blank=True)
    Q5_first_attempt = models.IntegerField(blank=True)
    Q6_first_attempt = models.IntegerField(blank=True)
    Q7_first_attempt = models.IntegerField(blank=True)

    # BC questions
    Question_1_BC = models.IntegerField(
        choices=[
            [1, "Je näher der Teampreis p am Zielpreis z liegt, desto höher der Gewinn."],
            [2, "Je näher der Teampreis p am Zielpreis z liegt, desto niedriger der Gewinn."],
            [3, "Es gibt nur einen Gewinn, wenn der Teampreis p exakt dem Zielpreis z entspricht."],
        ],
        label="Frage 1: Welche Aussage über Ihren Gewinn ist korrekt?",
        widget=widgets.RadioSelect,
    )
    Question_2_BC = models.IntegerField(
        choices=[
            [1, "Je höher der Durchschnittspreis P, desto höher der Zielpreis z."],
            [2, "Je höher der Durchschnittspreis P, desto niedriger der Zielpreis z."],
            [3, "Der Zielpreis z ist unabhängig vom Durchschnittspreis P."],
        ],
        label="Frage 2: Welche Aussage über das Verhältnis zwischen dem Zielpreis z und dem Durchschnittspreis P ist korrekt?",
        widget=widgets.RadioSelect,
    )
    Question_3_BC = models.IntegerField(
        choices=[
            [1, "Der Zielpreis z liegt 15 Taler unter der Hälfte des Durchschnittspreises P."],
            [2, "Der Zielpreis z liegt 15 Taler über der Hälfte des Durchschnittspreises P."],
            [3, "Der Zielpreis z liegt 15 Taler über dem Durchschnittspreis P."],
        ],
        label="Frage 3: Wie berechnet sich Ihr Zielpreis z?",
        widget=widgets.RadioSelect,
        allow_html=True,
    )
    Question_4_BC = models.IntegerField(
        choices=[
            [1, "… wird durch die Preissetzung aller Teams bestimmt und ist daher zu Beginn jeder Runde unbekannt."],
            [2, "… ist zu Beginn jeder Runde bekannt."],
            [3, "… entspricht immer dem Zielpreis."],
        ],
        label="Frage 4: Der Durchschnittspreis P ...",
        widget=widgets.RadioSelect,
    )
    Question_5_BC = models.IntegerField(
        choices=[
            [1, "An alle Teilnehmenden."],
            [2, "An mein Teammitglied."],
            [3, "An ein zufällig ausgewähltes anderes Team."],
        ],
        label="Frage 5: An wen wird Ihr vorläufiger Preisvorschlag weitergeleitet?",
        widget=widgets.RadioSelect,
    )
    Question_6_BC = models.IntegerField(
        choices=[
            [1, "Mein finaler Preisvorschlag ist der Preis für das Produkt."],
            [2, "Der Preis wird zufällig zwischen den vorläufigen Preisvorschlägen ausgewählt, die zwischen den Teammitgliedern ausgetauscht wurden."],
            [3, "Der Preis wird zufällig aus den finalen Preisvorschlägen ausgewählt, die beide Teammitglieder nach dem Austausch der vorläufigen Preisvorschläge abgegeben haben."],
        ],
        label="Frage 6: Wie wird der Preis für das Produkt Ihres Teams in jeder Runde festgelegt?",
        widget=widgets.RadioSelect,
    )
    Q1_BC_first_attempt = models.IntegerField(blank=True)
    Q2_BC_first_attempt = models.IntegerField(blank=True)
    Q3_BC_first_attempt = models.IntegerField(blank=True)
    Q4_BC_first_attempt = models.IntegerField(blank=True)
    Q5_BC_first_attempt = models.IntegerField(blank=True)
    Q6_BC_first_attempt = models.IntegerField(blank=True)
    bonus = models.IntegerField(blank=True)
    bonus_mon = models.CurrencyField(blank=True)
    bonus_eligible = models.BooleanField(blank=True)
    payoff_bonus = models.IntegerField(blank=True)

    # Demographics
    Gender = models.IntegerField(
        choices=[[1, "Weiblich"], [2, "Männlich"], [3, "Divers"]],
        label="Welchem Geschlecht fühlen Sie sich zugehörig?",
        widget=widgets.RadioSelect,
    )
    Age = models.IntegerField(label="Wie alt sind Sie?", min=10, max=99)
    Faculty = models.IntegerField(
        choices=[
            [1, "Sozial- und Bildungswissenschaftliche Fakultät"],
            [2, "Geistes- und Kulturwissenschaftliche Fakultät"],
            [3, "Wirtschaftswissenschaftliche Fakultät"],
            [4, "Juristische Fakultät"],
            [5, "Fakultät für Informatik und Mathematik "],
            [6, "An mehreren Fakultäten"],
            [7, "Sonstiges"],
        ],
        label="An welcher Fakultät studieren Sie?",
        widget=widgets.RadioSelect,
    )
    Makro = models.IntegerField(
        choices=[[1, "Ja"], [2, "Nein"]],
        label="Haben Sie bereits die Veranstaltung Makroökonomik besucht?",
        widget=widgets.RadioSelect,
    )
    tech_problems = models.LongStringField(
        label="Haben Sie sonstige Anmerkungen oder sind Ihnen technische Fehler ausgefallen?",
        max_length=2000,
        blank=True,
    )

    # Question on the explanation
    difficulty_explanation = models.IntegerField(
        choices=[
            [1, "Sehr einfach"],
            [2, "Eher einfach"],
            [3, "Weder einfach noch schwierig"],
            [4, "Eher schwierig"],
            [5, "Sehr schwierig"],
        ],
        label="Wie schwierig fanden Sie es, während des Experiments eine Begründung für Ihren Preisvorschlag zu formulieren?",
        widget=widgets.RadioSelectHorizontal,
    )

    # decision data
    keystroke_data = models.LongStringField(blank=True)  # Stores JSON keystroke data
    keystroke_price_data = models.LongStringField(blank=True)  # same for price


# Bounds for price inputs: every hook returns the player's current omega as the
# upper bound and the cost deduction as the lower bound.
# NOTE(review): Player declares no `final_price` or `price_proposal` field in this
# file, so the last four hooks appear to be unused - confirm before removing.
def price_max(player):
    return player.omega_current


def price_min(player):
    return player.cost_deduction


def final_price_max(player):
    return player.omega_current


def final_price_min(player):
    return player.cost_deduction


def price_proposal_max(player):
    return player.omega_current


def price_proposal_min(player):
    return player.cost_deduction


# assign treatments
def creating_session(subsession):
    """Assign roles, treatments, shock parameters and observer triples.

    Round 1 fixes the roles (the first third of each group observes, the rest
    produce) and cycles the ``clear`` / ``optimization`` treatments over the
    groups; later rounds copy those values from round 1. Every round then
    applies the shock parameters, mirrors the group treatments onto the
    players and randomly re-matches each observer with two producers.

    Raises:
        ValueError: if a group does not hold exactly two producers per observer.
    """
    groups = subsession.get_groups()

    if subsession.round_number == 1:
        # Roles are split inside each group. Slicing the subsession-wide player
        # list (as before) put all observers into the first groups, which broke
        # the "2 producers per observer" requirement as soon as a session had
        # more than one group.
        for group in groups:
            members = group.get_players()
            num_observers = len(members) // 3
            for position, player in enumerate(members):
                player.is_observer = position < num_observers

        clear_cycle = itertools.cycle([False, False, True, True])
        for group in groups:
            group.clear = next(clear_cycle)
            print(f"Group {group.id_in_subsession}: clear = {group.clear}")

        optimization_cycle = itertools.cycle([False, True, False, True, False])
        for group in groups:
            group.optimization = next(optimization_cycle)
            print(f"Group {group.id_in_subsession}: optimization = {group.optimization}")
    else:
        for group in groups:
            first_round_group = group.in_round(1)
            group.clear = first_round_group.clear
            group.optimization = first_round_group.optimization
        for player in subsession.get_players():
            player.is_observer = player.in_round(1).is_observer

    for group in groups:
        # currently this is only for a positive shock of omega
        # (TODO from the original author: move this from player to group level)
        in_shock_phase = 6 <= group.round_number <= 10
        for p in group.get_players():
            if in_shock_phase:
                p.omega_current = p.omega_large
                p.price_aim_parameter_current = p.price_aim_parameter_post
            # Compare with the player's own integer value: `Player.shock` is the
            # class-level field object, not a number. The flag is stored on the
            # player because Group declares no `display_shock` field.
            p.display_shock = group.round_number == p.shock
            p.optimization_instruction = group.optimization
            p.clear_instruction = group.clear

    # now the random rematch between rounds
    for group in groups:
        players = group.get_players()
        # split by role
        producers = [p for p in players if not p.is_observer]
        observers = [p for p in players if p.is_observer]

        # sanity check: we want exactly 2 producers per observer
        # e.g. 8 producers, 4 observers -> 4 triples
        # (explicit raise instead of assert so the check survives `python -O`)
        if len(producers) != 2 * len(observers):
            raise ValueError("Need 2 producers per observer!")

        random.shuffle(producers)
        random.shuffle(observers)

        # assign same subgroup_id to 2 producers + 1 observer
        for subgroup_id, obs in enumerate(observers):
            pair = producers[2 * subgroup_id:2 * subgroup_id + 2]
            for p in (*pair, obs):
                p.subgroup_id = subgroup_id


def explanation(player):
    return player.explanation


# Rounds in which observers enter their own guess vs. pick one producer's price.
_OBSERVER_GUESS_ROUNDS = (4, 5, 9, 10)
_OBSERVER_CHOICE_ROUNDS = (2, 3, 6, 7, 8, 11)
_OUTCOME_FIELDS = (
    'output',
    'revenue',
    'unit_costs',
    'total_costs',
    'per_piece_earnings',
    'earnings',
)


def _maybe_none(obj, field_name):
    """Read a model field that may still be unset without raising.

    Uses the model's ``field_maybe_none`` accessor when it exists (oTree
    documents that reading a null field directly is an error there) and falls
    back to plain attribute access otherwise.
    """
    reader = getattr(obj, 'field_maybe_none', None)
    if reader is not None:
        return reader(field_name)
    return getattr(obj, field_name)


def _market_outcome(omega, price, average_price, cost_deduction):
    """Return output, revenue, costs and earnings for one price decision.

    Output is ``omega - price`` and earnings are ``(price - unit_costs) *
    output``; both are floored at 0.
    """
    output = max(omega - price, 0)
    unit_costs = average_price - cost_deduction
    return dict(
        output=output,
        revenue=output * price,
        unit_costs=unit_costs,
        total_costs=unit_costs * output,
        per_piece_earnings=price - unit_costs,
        earnings=max((price - unit_costs) * output, 0),
    )


def _store_outcome(player, outcome):
    """Copy the outcome values onto the player's fields of the same name."""
    for field_name, value in outcome.items():
        setattr(player, field_name, value)


def _settle_producers(group, producers):
    """Compute the group's average price and each producer's result (rounds 1-10)."""
    for p in producers:
        price = _maybe_none(p, 'price')
        if price is not None and price < 10:
            p.price = 10

    if producers:
        avg = sum(p.price for p in producers) / len(producers)
        # Commercial rounding (x.5 rounds up) for the displayed average.
        group.average_price = int(Decimal(avg).to_integral_value(rounding=ROUND_HALF_UP))
    else:
        group.average_price = 0

    for p in producers:
        _store_outcome(
            p,
            _market_outcome(p.omega_current, p.price, group.average_price, p.cost_deduction),
        )
        # if in BC
        # NOTE(review): built-in round() rounds x.5 to the nearest even integer,
        # unlike the ROUND_HALF_UP used for average_price - confirm this is intended.
        p.price_aim = round((p.omega_current + group.average_price - p.cost_deduction) / 2)


def _settle_observer(group, observer):
    """Score one observer against the previous round's market.

    In guess rounds the observer's own ``observer_guess`` is used; in choice
    rounds the price of the producer selected via ``observer_choice_id`` is
    copied into ``observer_guess`` and that producer is marked as chosen.
    Without a usable decision (including round 1) all results are set to 0.
    """
    round_number = group.round_number
    guess = None
    chosen_producer = None

    if round_number in _OBSERVER_GUESS_ROUNDS:
        guess = _maybe_none(observer, 'observer_guess')
    elif round_number in _OBSERVER_CHOICE_ROUNDS:
        choice_id = _maybe_none(observer, 'observer_choice_id')
        if choice_id is not None:
            chosen_producer = next(
                q for q in group.in_round(round_number - 1).get_players()
                if (not q.is_observer) and (q.id_in_subsession == choice_id)
            )
            guess = chosen_producer.price
            observer.observer_guess = guess

    if guess is None:
        observer.price = 0
        _store_outcome(observer, dict.fromkeys(_OUTCOME_FIELDS, 0))
        return

    prev_group = group.in_round(round_number - 1)
    # Market parameters are read from the first producer found in the previous round.
    prev_producer = next(q for q in prev_group.get_players() if not q.is_observer)
    observer.pre_omega = prev_producer.omega_current
    observer.pre_average_price = prev_group.average_price
    observer.pre_price_aim = prev_producer.price_aim
    observer.pre_price_aim_parameter = prev_producer.price_aim_parameter_current

    _store_outcome(
        observer,
        _market_outcome(
            observer.pre_omega, guess, observer.pre_average_price, observer.cost_deduction
        ),
    )
    if chosen_producer is not None:
        chosen_producer.being_chosen = True


def _update_payoffs(group, p):
    """Accumulate earnings and derive the monetary payoff fields for one player."""
    p.payoff_round = p.earnings
    p.payoff_acc = sum(r.payoff_round for r in p.in_all_rounds())

    # The bonus is granted only for exactly one comprehension attempt. A missing
    # or zero value now counts as "not eligible"; before, bonus stayed unset in
    # that case and the currency conversion below failed.
    attempts = _maybe_none(p.in_round(1), 'attempts')
    p.bonus_eligible = attempts == 1
    p.bonus = 200 if p.bonus_eligible else 0

    p.bonus_mon = cu(p.bonus).to_real_world_currency(p.session) / p.exchange_rate
    p.payoff_mon = math.ceil(
        cu(p.payoff_acc).to_real_world_currency(p.session) / p.exchange_rate
    )
    # NOTE(review): the original author marked this floor with "#?" - a payoff
    # below 0.1 is replaced by 5; confirm this is the intended minimum.
    if p.payoff_mon < 0.1:
        p.payoff_mon = 5

    p.times_chosen = sum(1 for r in p.in_all_rounds() if r.being_chosen)
    p.bonus_being_chosen_cum = p.times_chosen * p.bonus_being_chosen
    p.payoff_mon_final = (
        p.payoff_mon + p.bonus_mon + p.show_up_fee + p.bonus_being_chosen_cum
    )
    p.payoff_mon_final_otree = p.payoff_mon + p.bonus_mon + p.bonus_being_chosen_cum
    if group.round_number == C.NUM_ROUNDS:
        p.payoff = p.payoff_mon_final_otree


# calculating the earnings of the producers
def round_results(group):
    """Compute the round's results for producers and observers, then payoffs.

    Referenced by name from ``ResultsWait.after_all_players_arrive``.
    """
    players = group.get_players()
    producers = [p for p in players if not p.is_observer]
    observers = [p for p in players if p.is_observer]

    if group.round_number <= 10:
        _settle_producers(group, producers)
    else:
        # round 11: producers don't make new decisions, earnings = 0 in this round
        group.average_price = 0
        for p in producers:
            _store_outcome(p, dict.fromkeys(_OUTCOME_FIELDS, 0))

    # earnings calculation observers
    for p in observers:
        _settle_observer(group, p)

    for p in players:
        _update_payoffs(group, p)


# PAGES
class Place(Page):
    form_model = 'player'
    form_fields = ['sitzplatznummer']

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # The seat number doubles as the participant label.
        player.participant.label = str(player.sitzplatznummer)

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class Start(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class Introduction(Page):
    #timeout_seconds = 240
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class Instructions(Page):
    #timeout_seconds = 720
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    form_model = 'player'
    form_fields = [
        'Question_1', 'Question_2', 'Question_3', 'Question_4',
        'attempts',
        'Q1_first_attempt', 'Q2_first_attempt', 'Q3_first_attempt', 'Q4_first_attempt',
    ]


class Pre_Comprehension(Page):
    #timeout_seconds = 240
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class Comprehension_OT(Page):
    #timeout_seconds = 720
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1 and player.group.optimization

    form_model = 'player'
    form_fields = [
        'Question_1', 'Question_2', 'Question_3', 'Question_4',
        'Question_5', 'Question_6', 'Question_7',
        'attempts',
        'Q1_first_attempt', 'Q2_first_attempt', 'Q3_first_attempt', 'Q4_first_attempt',
        'Q5_first_attempt', 'Q6_first_attempt', 'Q7_first_attempt',
    ]


class Comprehension_BC(Page):
    #timeout_seconds = 720
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1 and not player.group.optimization

    form_model = 'player'
    form_fields = [
        'Question_1_BC', 'Question_2_BC', 'Question_3_BC', 'Question_4_BC',
        'Question_5_BC', 'Question_6_BC',
        'attempts',
        'Q1_BC_first_attempt', 'Q2_BC_first_attempt', 'Q3_BC_first_attempt',
        'Q4_BC_first_attempt', 'Q5_BC_first_attempt', 'Q6_BC_first_attempt',
    ]


# yet no dropout marker right now. if I include it, then include it for the payoff as well
class StartWait(WaitPage):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    # after_all_players_arrive = 'assigned_roles'
    title_text = "Bitte warten Sie einen Moment."
    body_text = (
        "Bitte warten Sie, bis die anderen Spieler bereit sind, um mit dem Experiment zu beginnen. "
        "Das Experiment wird in Kürze beginnen."
    )


class Price_and_Explanation(Page):
    @staticmethod
    def app_after_this_page(player, timeout_happened):
        # Returns None on every path, so no app is ever skipped from here.
        # NOTE(review): oTree documents the second argument of this hook as
        # `upcoming_apps`, not a timeout flag - confirm whether the hook is needed.
        if timeout_happened:
            return None

    @staticmethod
    def is_displayed(player: Player):
        return (not player.is_observer) and player.round_number in [1, 2, 5, 6, 7, 10]

    form_model = 'player'
    form_fields = ['price', 'explanation', 'keystroke_data', 'keystroke_price_data']


class PriceProposal(Page):
    @staticmethod
    def is_displayed(player: Player):
        return (not player.is_observer) and player.round_number in [3, 4, 8, 9]

    form_model = 'player'
    form_fields = ['price', 'keystroke_price_data']


class ObserverWait(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.is_observer and player.round_number == 1

    title_text = "Observer wait."
def _previous_subgroup_producers(player):
    """Return the producers that shared the player's subgroup in the previous round."""
    prev_me = player.in_round(player.round_number - 1)
    return [
        q for q in prev_me.group.get_players()
        if (q.subgroup_id == prev_me.subgroup_id) and (not q.is_observer)
    ]


class ObserverWithExplanation(Page):
    """Observer picks one producer after seeing last round's prices and explanations."""

    @staticmethod
    def is_displayed(player: Player):
        return player.is_observer and player.round_number in [2, 3, 6, 7, 8, 11]

    @staticmethod
    def vars_for_template(player: Player):
        # One entry per producer of the observer's previous-round subgroup.
        options = [
            dict(
                producer_id=q.id_in_subsession,
                price=q.price,
                explanation=q.explanation,
            )
            for q in _previous_subgroup_producers(player)
        ]
        return dict(
            options=options,
            prev_round=player.round_number - 1,
        )

    form_model = 'player'
    form_fields = ['observer_choice_id']


class ObserverWithoutExplanation(Page):
    """Observer enters a guess after seeing last round's prices only."""

    @staticmethod
    def is_displayed(player: Player):
        return player.is_observer and player.round_number in [4, 5, 9, 10]

    @staticmethod
    def vars_for_template(player: Player):
        # Same options as on the explanation page, minus the explanation text.
        options = [
            dict(
                producer_id=q.id_in_subsession,
                price=q.price,
            )
            for q in _previous_subgroup_producers(player)
        ]
        return dict(
            options=options,
            prev_round=player.round_number - 1,
        )

    form_model = 'player'
    form_fields = ['observer_guess']


class ResultsWait(WaitPage):
    """Wait page shown to everyone in every round; triggers `round_results`."""

    after_all_players_arrive = 'round_results'

    @staticmethod
    def is_displayed(player: Player):
        # All players, all rounds
        return True

    title_text = "Bitte warten Sie einen Moment."
    body_text = (
        "Bitte warten Sie kurz bis alle Teilnehmenden eine Entscheidung getroffen haben. "
        "Es geht in Kürze weiter."
    )
class ResultsFeedbackProducers(Page):
    """Feedback page for producers in rounds 1 to 10."""

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number <= 10 and not player.is_observer


class ResultsFeedbackObservers(Page):
    """Feedback page for observers from round 2 onwards."""

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number >= 2 and player.is_observer

    @staticmethod
    def vars_for_template(player: Player):
        return {'prev_round': player.round_number - 1}


class ShockAnnouncement(Page):
    """Announces the shock: in the shock round for producers, one round later for observers."""

    @staticmethod
    def is_displayed(player: Player):
        # observers see it one round later
        announcement_round = player.shock + 1 if player.is_observer else player.shock
        return player.round_number == announcement_round


class ProducerWait(Page):
    """Shown to producers in round 11."""

    @staticmethod
    def is_displayed(player: Player):
        return not player.is_observer and player.round_number == 11


class FinalResults(Page):
    """Shown to every player in round 11."""

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 11


class Demographics(Page):
    """Closing questionnaire, shown in round 11."""

    form_model = 'player'
    form_fields = ['Gender', 'Age', 'Faculty', 'Makro', 'tech_problems']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 11


class FinalPage(Page):
    """Last page of the app, shown in round 11."""

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 11


page_sequence = [
    Place,
    Start,
    Pre_Comprehension,
    Comprehension_OT,
    Comprehension_BC,
    StartWait,
    ShockAnnouncement,
    Price_and_Explanation,
    PriceProposal,
    ObserverWithExplanation,
    ObserverWithoutExplanation,
    ResultsWait,
    ResultsFeedbackProducers,
    ResultsFeedbackObservers,
    FinalResults,
    Demographics,
    FinalPage,
]