import itertools
import json
import random

import numpy as np
import pandas as pd
from otree.api import *

author = 'Your name here'

doc = """
Main part of the experiment
"""


def load_math_options():
    """Load the addition task list used by the math part of the experiment.

    The file path is relative to the process working directory.

    Returns:
        The parsed JSON content. Other code in this file indexes it as
        ``task_list[round_index][0]`` (displayed labels) and
        ``task_list[round_index][1]`` (solutions).
    """
    with open(
        'AttentionAllocation/static/Addition_Increasing_Difficulty/task_list_5_rounds.json',
        'r',
        encoding='utf-8',  # JSON is UTF-8; do not depend on the locale default
    ) as read_file:
        return json.load(read_file)


def load_pwd_tasks(type='LinearDifficulty'):
    """Load the counting ("pwd") task list for one difficulty profile.

    Args:
        type: Suffix of the static folder to read from, i.e. the file
            ``AttentionAllocation/static/Code_<type>/counting_tasks.json``.
            The name shadows the builtin ``type`` but is kept because callers
            pass it by keyword.

    Returns:
        The parsed JSON content of the selected file.
    """
    path = f'AttentionAllocation/static/Code_{type}/counting_tasks.json'
    with open(path, 'r', encoding='utf-8') as read_file:
        return json.load(read_file)


def make_math_ability_field():
    """Return an optional, label-less integer field for one math answer."""
    return models.IntegerField(
        blank=True,
        label='',
    )


def make_pwd_ability_field():
    """Return an optional, label-less integer field for one pwd answer."""
    return models.IntegerField(
        blank=True,
        label='',
    )


class Constants(BaseConstants):
    """App-wide constants; the task lists are loaded from JSON at import time."""

    name_in_url = '9H32djdqaaBa'
    players_per_group = None
    num_rounds = 5
    num_comp = num_rounds // 2
    num_participants = 10
    # ordering = ['BA','AB']

    # Load tasks from JSON
    addition_task = load_math_options()
    # The linear profile is currently disabled, so `linear_task` does not exist:
    # linear_task = load_pwd_tasks(type='LinearDifficulty')
    convex_task = load_pwd_tasks(type='DecreasingDifficulty')

    # Payoff Constants
    payoff_incorrect = cu(0)
    pwd_rate = 10
    pwd_piecerate = cu(pwd_rate)
    math_rate = 10
    math_piecerate = cu(math_rate)

    task = 'Baseline'
    production_profile = ['Convex']  # ['Linear','Convex']

    # Used as the task page's timeout_seconds
    timeout = 40

    num_pwd_tasks = 10
    num_math_tasks = 10

    likert_scale_agree = [
        (1, 'I completely disagree'),
        (2, 'I somewhat disagree'),
        (3, 'Neither agree nor disagree'),
        (4, 'I somewhat agree'),
        (5, 'I completely agree'),
    ]


class Subsession(BaseSubsession):
    pass


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    """Per-round participant record.

    Most fields are submitted through the task page's form; how the browser
    fills them is not visible in this file (presumably page JavaScript --
    confirm against the templates).
    """

    # Task
    task = models.StringField()
    # Production function type
    production_profile = models.StringField()

    # Number of correct math answers (computed in set_payoff)
    math_correct = models.IntegerField(initial=0)
    # Number of correct pwd answers (computed in set_payoff)
    pwd_correct = models.IntegerField(initial=0)

    # Optimal Score
    # optimal_score = models.FloatField()
    # Optimal Switch Point
    # optimal_switch_point = models.IntegerField()

    # Most important timer
    math_time = models.FloatField()
    pwd_time = models.FloatField()

    ###### sequence task ############################
    math_1 = make_math_ability_field()
    math_2 = make_math_ability_field()
    math_3 = make_math_ability_field()
    math_4 = make_math_ability_field()
    math_5 = make_math_ability_field()
    math_6 = make_math_ability_field()
    math_7 = make_math_ability_field()
    math_8 = make_math_ability_field()
    math_9 = make_math_ability_field()
    math_10 = make_math_ability_field()

    ### Cumulated Time needed for one sequence ################
    math_1_time = models.FloatField(blank=True)
    math_2_time = models.FloatField(blank=True)
    math_3_time = models.FloatField(blank=True)
    math_4_time = models.FloatField(blank=True)
    math_5_time = models.FloatField(blank=True)
    math_6_time = models.FloatField(blank=True)
    math_7_time = models.FloatField(blank=True)
    math_8_time = models.FloatField(blank=True)
    math_9_time = models.FloatField(blank=True)
    math_10_time = models.FloatField(blank=True)

    ### Number tries needed to find the correct input
    math_1_tries = models.IntegerField(blank=True)
    math_2_tries = models.IntegerField(blank=True)
    math_3_tries = models.IntegerField(blank=True)
    math_4_tries = models.IntegerField(blank=True)
    math_5_tries = models.IntegerField(blank=True)
    math_6_tries = models.IntegerField(blank=True)
    math_7_tries = models.IntegerField(blank=True)
    math_8_tries = models.IntegerField(blank=True)
    math_9_tries = models.IntegerField(blank=True)
    math_10_tries = models.IntegerField(blank=True)

    # Timer
    timer_task1 = models.FloatField()
    new_timer_task1 = models.FloatField()
    timer_task2 = models.FloatField()
    new_timer_task2 = models.FloatField()
    total_time = models.FloatField()

    # Prompt Counter
    prompt_counter = models.IntegerField()

    # Switch Counter
    switch_counter = models.IntegerField()

    # Switch Timer
    switch_time_1 = models.FloatField(blank=True)
    switch_time_2 = models.FloatField(blank=True)
    switch_time_3 = models.FloatField(blank=True)
    switch_time_4 = models.FloatField(blank=True)
    switch_time_5 = models.FloatField(blank=True)
    switch_time_6 = models.FloatField(blank=True)
    switch_time_7 = models.FloatField(blank=True)
    switch_time_8 = models.FloatField(blank=True)
    switch_time_9 = models.FloatField(blank=True)
    switch_time_10 = models.FloatField(blank=True)

    # Performance recorded at each switch
    # (presumably the number of solved tasks so far -- TODO confirm)
    switch_1_math_perf = models.IntegerField(blank=True)
    switch_2_math_perf = models.IntegerField(blank=True)
    switch_3_math_perf = models.IntegerField(blank=True)
    switch_4_math_perf = models.IntegerField(blank=True)
    switch_5_math_perf = models.IntegerField(blank=True)
    switch_6_math_perf = models.IntegerField(blank=True)
    switch_7_math_perf = models.IntegerField(blank=True)
    switch_8_math_perf = models.IntegerField(blank=True)
    switch_9_math_perf = models.IntegerField(blank=True)
    switch_10_math_perf = models.IntegerField(blank=True)

    switch_1_pwd_perf = models.IntegerField(blank=True)
    switch_2_pwd_perf = models.IntegerField(blank=True)
    switch_3_pwd_perf = models.IntegerField(blank=True)
    switch_4_pwd_perf = models.IntegerField(blank=True)
    switch_5_pwd_perf = models.IntegerField(blank=True)
    switch_6_pwd_perf = models.IntegerField(blank=True)
    switch_7_pwd_perf = models.IntegerField(blank=True)
    switch_8_pwd_perf = models.IntegerField(blank=True)
    switch_9_pwd_perf = models.IntegerField(blank=True)
    switch_10_pwd_perf = models.IntegerField(blank=True)

    # Time Spent
    Main_Instructions_time_spent = models.FloatField(blank=True)
    Task_time_spent = models.FloatField(blank=True)
    Exit_Q1_time_spent = models.FloatField(blank=True)
    Exit_Q2_time_spent = models.FloatField(blank=True)

    # Warnings
    Main_Instructions_warnings = models.IntegerField()
    StartRound_warnings = models.IntegerField()
    Task_warnings = models.IntegerField()
    Exit_Q1_warnings = models.IntegerField()
    Exit_Q2_warnings = models.IntegerField()

    # Zero-based index into the task lists (set in creating_session)
    task_option_this_round = models.IntegerField()

    ## Control questions
    control_q1 = models.PositiveIntegerField(choices=[i * 10 for i in range(11)], initial=None)
    control_q2 = models.PositiveIntegerField(choices=[i * 10 for i in range(11)], initial=None)
    control_q3 = models.PositiveIntegerField(choices=[i * 10 for i in range(11)], initial=None)

    q_subjective_sophistication_1 = models.PositiveIntegerField(
        choices=Constants.likert_scale_agree,
        label='I chose a good allocation of my time between the two tasks.',
        widget=widgets.RadioSelectHorizontal(),
    )
    q_subjective_optimality_1 = models.IntegerField(
        choices=range(1, Constants.timeout + 1), widget=widgets.RadioSelect
    )

    # Payoffs and Piecerates
    payoff_pwd = models.CurrencyField()
    payoff_math = models.CurrencyField()

    ### Cumulated Time needed for one pwd task ################
    pwd_1_time = models.FloatField(blank=True)
    pwd_2_time = models.FloatField(blank=True)
    pwd_3_time = models.FloatField(blank=True)
    pwd_4_time = models.FloatField(blank=True)
    pwd_5_time = models.FloatField(blank=True)
    pwd_6_time = models.FloatField(blank=True)
    pwd_7_time = models.FloatField(blank=True)
    pwd_8_time = models.FloatField(blank=True)
    pwd_9_time = models.FloatField(blank=True)
    pwd_10_time = models.FloatField(blank=True)

    # Answers to the pwd tasks
    pwd_1 = make_pwd_ability_field()
    pwd_2 = make_pwd_ability_field()
    pwd_3 = make_pwd_ability_field()
    pwd_4 = make_pwd_ability_field()
    pwd_5 = make_pwd_ability_field()
    pwd_6 = make_pwd_ability_field()
    pwd_7 = make_pwd_ability_field()
    pwd_8 = make_pwd_ability_field()
    pwd_9 = make_pwd_ability_field()
    pwd_10 = make_pwd_ability_field()

    # Number of tries
    pwd_1_tries = models.IntegerField(blank=True)
    pwd_2_tries = models.IntegerField(blank=True)
    pwd_3_tries = models.IntegerField(blank=True)
    pwd_4_tries = models.IntegerField(blank=True)
    pwd_5_tries = models.IntegerField(blank=True)
    pwd_6_tries = models.IntegerField(blank=True)
    pwd_7_tries = models.IntegerField(blank=True)
    pwd_8_tries = models.IntegerField(blank=True)
    pwd_9_tries = models.IntegerField(blank=True)
    pwd_10_tries = models.IntegerField(blank=True)

    # Number of toggles
    pwd_1_toggles = models.IntegerField(blank=True)
    pwd_2_toggles = models.IntegerField(blank=True)
    pwd_3_toggles = models.IntegerField(blank=True)
    pwd_4_toggles = models.IntegerField(blank=True)
    pwd_5_toggles = models.IntegerField(blank=True)
    pwd_6_toggles = models.IntegerField(blank=True)
    pwd_7_toggles = models.IntegerField(blank=True)
    pwd_8_toggles = models.IntegerField(blank=True)
    pwd_9_toggles = models.IntegerField(blank=True)
    pwd_10_toggles = models.IntegerField(blank=True)


# FUNCTIONS
def creating_session(subsession: Subsession):
    """Assign the production profile and per-round task settings to each player.

    In round 1 every participant without a stored ``production_profile`` gets
    the next entry of a shuffled, cycled copy of
    ``Constants.production_profile``, which balances the profiles across the
    session. In every round the player's ``task``, ``task_option_this_round``
    (zero-based round index) and ``production_profile`` are written.
    """
    ##### This code is for obtaining a balanced session ############
    task_profiles = Constants.production_profile.copy()
    random.shuffle(task_profiles)
    # Cycle so that the profiles are handed out evenly, however many players join
    profiles = itertools.cycle(task_profiles)

    for p in subsession.get_players():
        participant_vars = p.participant.vars
        # Draw a profile only once per participant; keep a preset value if present
        if subsession.round_number == 1 and 'production_profile' not in participant_vars:
            participant_vars['production_profile'] = next(profiles)

        p.task = Constants.task
        p.task_option_this_round = subsession.round_number - 1
        p.production_profile = participant_vars['production_profile']


def control_q1_error_message(player: Player, value):
    """Customized error message for control question 1."""
    if value != 30:
        return 'Your input is incorrect. Please try again!'


def control_q2_error_message(player: Player, value):
    """Customized error message for control question 2."""
    if value != 60:
        return 'Your input is incorrect. Please try again!'


def control_q3_error_message(player: Player, value):
    """Customized error message for control question 3."""
    if value != 90:
        return 'Your input is incorrect. Please try again!'


def q_subjective_optimality_1_error_message(player: Player, value):
    """Customized error message for subjective optimality."""
    if value is None:
        return 'Your input is incorrect. Please try again!'
# Questions

# Player declares exactly ten answer fields per task
# (math_1..math_10 and pwd_1..pwd_10), so helpers below iterate over 1..10.
_NUM_TASK_FIELDS = 10


def _numbered(template):
    """Return ``template`` formatted with 1..10, e.g. 'math_{}' -> ['math_1', ...]."""
    return [template.format(i) for i in range(1, _NUM_TASK_FIELDS + 1)]


def _pwd_tasks_for(production_profile):
    """Return the counting-task list in Constants for ``production_profile``.

    Raises:
        ValueError: if the profile is unknown or its task list is not loaded
            in Constants (e.g. 'Linear' while ``linear_task`` is disabled).
    """
    attr_by_profile = {'Linear': 'linear_task', 'Convex': 'convex_task'}
    attr = attr_by_profile.get(production_profile)
    tasks = getattr(Constants, attr, None) if attr is not None else None
    if tasks is None:
        raise ValueError(
            f'No counting tasks are loaded for production profile {production_profile!r}.'
        )
    return tasks


def set_payoff(player: Player):
    """Score the player's answers for this round and store counts and payoffs.

    Counts the submitted pwd and math answers that equal the stored solutions,
    converts the counts to payoffs with the piece rates, sets ``player.payoff``
    to their sum and copies the per-round results into ``participant.vars``.
    Unanswered fields are read as ``None`` and therefore never count as correct.

    Raises:
        ValueError: if no counting tasks exist for the player's profile.
    """
    round_index = player.task_option_this_round

    # NOTE(review): solutions and the displayed labels for the pwd task are
    # both read from element [0] of the task entry -- confirm this is intended.
    pwd_solutions = _pwd_tasks_for(player.production_profile)[round_index][0]
    pwd_answers = [player.field_maybe_none(name) for name in _numbered('pwd_{}')]
    player.pwd_correct = sum(
        1 for answer, solution in zip(pwd_answers, pwd_solutions) if answer == solution
    )
    player.payoff_pwd = player.pwd_correct * Constants.pwd_piecerate

    math_solutions = Constants.addition_task[round_index][1]
    math_answers = [player.field_maybe_none(name) for name in _numbered('math_{}')]
    player.math_correct = sum(
        1 for answer, solution in zip(math_answers, math_solutions) if answer == solution
    )
    player.payoff_math = player.math_correct * Constants.math_piecerate

    ### Compute total payoff
    player.payoff = player.payoff_math + player.payoff_pwd

    # Store the results on the participant so they can be read outside this round
    p_vars = player.participant.vars
    round_number = player.round_number
    # Pwd
    p_vars[f'base_pwd_round_{round_number}'] = player.pwd_correct
    p_vars[f'base_pwd_payoff_round_{round_number}'] = player.payoff_pwd
    # Math
    p_vars[f'base_math_round_{round_number}'] = player.math_correct
    p_vars[f'base_math_payoff_round_{round_number}'] = player.payoff_math


def comp_opt_alloc(player: Player):
    """Compute the optimal time allocation from data stored in participant.vars.

    Reads the ``pwd_t_1..pwd_t_10`` and ``math_t_1..math_t_10`` entries of
    ``participant.vars``, builds one DataFrame per task, derives a production
    function for each and hands both to ``get_optimal_allocation``.

    NOTE(review): ``get_production_func`` and ``get_optimal_allocation`` are
    not defined in the visible part of this file, and nothing visible calls
    this function -- confirm where the helpers live before relying on it.
    """
    p_vars = player.participant.vars  # Simplifies code later
    math_cols = _numbered('math_t_{}')
    pwd_cols = _numbered('pwd_t_{}')

    # Create df of only pwd data
    df_pwd = pd.DataFrame({col: p_vars[col] for col in pwd_cols})
    # Create df of only math data
    df_math = pd.DataFrame({col: p_vars[col] for col in math_cols})

    # Create respective production function
    rslt_pwd = get_production_func(df_pwd, 'pwd')
    rslt_math = get_production_func(df_math, 'math')

    # Find optimal allocation
    df_both = pd.DataFrame({'pwd': rslt_pwd, 'math': rslt_math})
    get_optimal_allocation(df_both, p_vars)


def write_math_time(player: Player):
    """Copy the last math time and the task/round counts into participant.vars."""
    p_vars = player.participant.vars
    p_vars['last_round_math_time'] = player.field_maybe_none('math_time')
    p_vars['gen_num_pwd_tasks'] = Constants.num_pwd_tasks
    p_vars['gen_num_math_tasks'] = Constants.num_math_tasks
    p_vars['gen_num_rounds'] = Constants.num_rounds


# PAGES
class Main_Instructions(Page):
    """Instructions with three control questions; shown in round 1 only."""

    form_model = 'player'
    form_fields = [
        'Main_Instructions_time_spent',
        'Main_Instructions_warnings',
        'control_q1',
        'control_q2',
        'control_q3',
    ]

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def js_vars(player: Player):
        return dict(
            page_name='Main_Instructions',
        )

    @staticmethod
    def vars_for_template(player: Player):
        """Return the worked earnings example shown in the instructions."""
        total_pwd = Constants.pwd_rate * Constants.num_pwd_tasks
        pwd_example = 6
        pwd_example_earnings = pwd_example * Constants.pwd_piecerate
        math_example = 4
        math_example_earnings = Constants.math_piecerate * math_example
        total_example_earnings = math_example_earnings + pwd_example_earnings
        return dict(
            total_pwd=total_pwd,
            pwd_example=pwd_example,
            pwd_example_earnings=pwd_example_earnings,
            math_example=math_example,
            math_example_earnings=math_example_earnings,
            total_example_earnings=total_example_earnings,
        )

    @staticmethod
    def error_message(player: Player, values):
        """Reject the form unless all three control questions are correct.

        The error message is not actually displayed. This function simply
        validates the form for us. For the actual error message code look at
        the html code of the page.
        """
        expected = {'control_q1': 30, 'control_q2': 60, 'control_q3': 90}
        if any(values[name] != answer for name, answer in expected.items()):
            return 'Your input is incorrect! Please read the instructions again.'


class StartRound(Page):
    """Five-second countdown page shown before each round."""

    # oTree Timer
    timer_text = 'Time left until the round starts:'
    timeout_seconds = 5

    form_model = 'player'
    form_fields = ['StartRound_warnings']

    @staticmethod
    def vars_for_template(player: Player):
        return dict(round_num=player.round_number)

    @staticmethod
    def js_vars(player: Player):
        return dict(
            page_name='StartRound',
        )


class MathMemory(Page):
    """Timed task page on which the math and pwd tasks are worked on."""

    # oTree Timer
    timer_text = 'Time left in this round:'
    timeout_seconds = Constants.timeout

    form_model = 'player'
    form_fields = [
        'timer_task1',
        'new_timer_task1',
        'timer_task2',
        'new_timer_task2',
        'pwd_time',
        'total_time',
        'prompt_counter',
        'switch_counter',
        'Task_time_spent',
        'Task_warnings',
        *_numbered('math_{}'),
        'math_time',
        *_numbered('math_{}_time'),
        *_numbered('math_{}_tries'),
        *_numbered('switch_time_{}'),
        *_numbered('switch_{}_math_perf'),
        *_numbered('switch_{}_pwd_perf'),
        *_numbered('pwd_{}'),
        # 'pwd_time' is already listed above; the original repeated it here.
        *_numbered('pwd_{}_time'),
        *_numbered('pwd_{}_tries'),
        *_numbered('pwd_{}_toggles'),
    ]

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Score the round; after the last round also export the math time."""
        set_payoff(player)
        if player.round_number == Constants.num_rounds:
            write_math_time(player)

    @staticmethod
    def vars_for_template(player: Player):
        """Return the ten math labels (suffixed with ' = ') and ten pwd labels.

        Raises:
            ValueError: if no counting tasks exist for the player's profile.
        """
        round_index = player.round_number - 1
        math_labels = Constants.addition_task[round_index][0]
        pwd_labels = _pwd_tasks_for(player.production_profile)[round_index][0]
        filler = ' = '

        labels = {
            f'math_{i + 1}_label': math_labels[i] + filler for i in range(_NUM_TASK_FIELDS)
        }
        labels.update(
            {f'pwd_{i + 1}_label': pwd_labels[i] for i in range(_NUM_TASK_FIELDS)}
        )
        return labels


class A(MathMemory):
    """Task page used in page_sequence; adds the solutions for the page's JS."""

    # def is_displayed(self):
    #     cond1 = (self.player.order_tasks == 'AB') & (self.round_number <= 8)
    #     cond2 = (self.player.order_tasks == 'BA') & (self.round_number > 8)
    #     return (cond1 | cond2)

    # Declared static like every other page hook in this file.
    @staticmethod
    def js_vars(player: Player):
        """Return this round's solution arrays and the task counts.

        Raises:
            ValueError: if no counting tasks exist for the player's profile.
        """
        round_index = player.round_number - 1
        pwd_sol = _pwd_tasks_for(player.production_profile)[round_index][0]
        math_sol = Constants.addition_task[round_index][1]
        return dict(
            pwd_solutions_array=pwd_sol,
            math_solutions_array=math_sol,
            page_name='Task',
            num_pwd_tasks=Constants.num_pwd_tasks,
            num_math_tasks=Constants.num_math_tasks,
        )


class Exit_Q1(Page):
    """First exit question; shown after the last round only."""

    form_model = 'player'
    form_fields = ['Exit_Q1_warnings', 'Exit_Q1_time_spent', 'q_subjective_sophistication_1']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == Constants.num_rounds

    @staticmethod
    def js_vars(player: Player):
        return dict(
            page_name='Exit_Q1',
        )


class Exit_Q2(Page):
    """Second exit question; shown after the last round only."""

    form_model = 'player'
    form_fields = ['Exit_Q2_warnings', 'Exit_Q2_time_spent', 'q_subjective_optimality_1']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == Constants.num_rounds

    @staticmethod
    def js_vars(player: Player):
        return dict(
            page_name='Exit_Q2',
        )

    @staticmethod
    def error_message(player: Player, values):
        """Reject the form while the subjective-optimality answer is missing.

        The error message is not actually displayed. This function simply
        validates the form for us. For the actual error message code look at
        the html code of the page.
        """
        if values['q_subjective_optimality_1'] is None:
            return "Please indicate your answer by clicking on the slider."
# Order in which the pages are shown in every round
page_sequence = [
    Main_Instructions,
    StartRound,
    A,
    Exit_Q1,
    Exit_Q2,
]