import itertools
import json
import math
import random

import numpy as np
import pandas as pd

from otree.api import (
    models,
    widgets,
    BaseConstants,
    BaseSubsession,
    BaseGroup,
    BasePlayer,
    Currency as c,
    currency_range,
)

author = 'Your name here'

doc = """
Main part of the experiment
"""


def load_math_options():
    """Load the addition task list from its JSON file.

    The path is relative to the current working directory.

    Returns
    ------------
    task_list
        The parsed JSON content of ``task_list_16_rounds.json``.
    """
    path = 'Treatment/static/Addition_Increasing_Difficulty/task_list_16_rounds.json'
    with open(path, 'r', encoding='utf-8') as read_file:
        task_list = json.load(read_file)
    return task_list


def load_pwd_tasks(type='LinearDifficulty'):
    """Load the counting ("pwd") task list for one difficulty profile.

    Parameters
    ------------
    type : str
        Suffix of the static folder ``Code_<type>`` to read from. The name
        shadows the builtin but is kept because callers pass it by keyword.

    Returns
    ------------
    task_list
        The parsed JSON content of ``counting_tasks.json``.
    """
    path = f'Treatment/static/Code_{type}/counting_tasks.json'
    with open(path, 'r', encoding='utf-8') as read_file:
        task_list = json.load(read_file)
    return task_list


def make_math_ability_field():
    """Return an optional, unlabelled integer field for one math answer."""
    return models.IntegerField(
        blank=True,
        label='',
    )


def make_pwd_ability_field():
    """Return an optional, unlabelled integer field for one pwd answer."""
    return models.IntegerField(
        blank=True,
        label='',
    )


def make_wtp_field():
    """Return a plain boolean field."""
    return models.BooleanField()


## Production Function
def get_production_func(df, task, timeout=40):
    """Obtain production function on second level

    Computes, for every second from 0 to ``timeout`` inclusive, the expected
    number of solved tasks across the rows of ``df``.

    Parameters
    ------------
    df : pd.DataFrame
        One row per observation with columns ``<task>_t_1`` ...
        ``<task>_t_<n>``. Each value is compared against the second counter,
        so it is presumably the cumulative time at which that task was
        solved -- TODO confirm against the code that fills these columns.
    task : str
        `math` : math task.
        `pwd` : pwd (counting) task.
    timeout : int
        Last second (inclusive) for which the expectation is computed.

    Returns
    ------------
    df_res : pd.Series
        Expected value with seconds as index.

    Raises
    ------------
    ValueError
        If ``task`` is neither `math` nor `pwd`.
    """
    task_counts = {
        'math': Constants.num_math_tasks,
        'pwd': Constants.num_pwd_tasks,
    }
    if task not in task_counts:
        raise ValueError(
            f"Unknown task {task!r}; expected one of {sorted(task_counts)}"
        )
    max_num = task_counts[task]

    # Explicit dtype: an empty Series without one is deprecated/ambiguous.
    df_res = pd.Series(dtype=float)
    for sec in range(timeout + 1):
        # Share of rows that reached each task by second `sec`.
        tab = (df <= sec).mean()
        # The last task has no successor, so its full share counts max_num.
        exp_val = max_num * float(tab[f'{task}_t_{max_num}'])
        for num in reversed(range(1, max_num)):
            # Share of rows that solved exactly `num` tasks by `sec`.
            weight = float(tab[f'{task}_t_{num}'] - tab[f'{task}_t_{num + 1}'])
            exp_val += num * weight
        df_res.loc[sec] = exp_val
    return df_res


def get_optimal_allocation(df, p_vars):
    """Find the optimal split of the time budget between 'math' and 'pwd'.

    For every second ``sec`` in ``0..Constants.timeout`` the math score at
    ``sec`` is added to the pwd score at ``Constants.timeout - sec``; the
    ``sec`` with the highest sum is the optimal switch point.

    Parameters
    ------------
    df : pd.DataFrame
        Indexed by second (0..Constants.timeout) with columns 'pwd' and
        'math' holding the expected score for that many seconds.
    p_vars : dict
        Mutated in place. Receives the keys 'switch_point' (int),
        'opt_math_score', 'opt_pwd_score' and 'opt_total_score' (floats
        rounded to 3 decimals).

    Returns
    ------------
    None
    """
    allotted_time = Constants.timeout
    # Rows: 0 = math score, 1 = pwd score, 2 = total; one column per second.
    xs = np.full((3, allotted_time + 1), np.nan)
    for sec in range(allotted_time + 1):
        pwd_score = df.loc[allotted_time - sec]['pwd']
        math_score = df.loc[sec]['math']
        xs[0, sec] = math_score
        xs[1, sec] = pwd_score
        xs[2, sec] = math_score + pwd_score

    # Optimal allocation
    alloc = np.argmax(xs[2, :])
    p_vars['switch_point'] = int(alloc)
    p_vars['opt_math_score'] = round(float(xs[0, alloc]), 3)
    p_vars['opt_pwd_score'] = round(float(xs[1, alloc]), 3)
    p_vars['opt_total_score'] = round(float(xs[2, alloc]), 3)


class Constants(BaseConstants):
    """Session-wide constants for the main part of the experiment."""

    name_in_url = '9H32Ddw4gdqBa'
    players_per_group = None
    num_rounds = 5
    num_comp = int(num_rounds / 2)
    num_participants = 10
    #ordering = ['BA','AB']

    likert_scale_agree = [
        (1, 'I completely disagree'),
        (2, 'I somewhat disagree'),
        (3, 'Neither agree nor disagree'),
        (4, 'I somewhat agree'),
        (5, 'I completely agree'),
    ]

    # Load tasks from JSON
    addition_task = load_math_options()
    # The linear task set is currently disabled; only the convex one is loaded.
    #linear_task = load_pwd_tasks(type='LinearDifficulty')
    convex_task = load_pwd_tasks(type='DecreasingDifficulty')

    # Payoff Constants
    payoff_incorrect = c(0)
    pwd_rate_high = 12
    pwd_piecerate_high = c(pwd_rate_high)
    math_rate_high = 12
    math_piecerate_high = c(math_rate_high)
    pwd_rate_low = 10
    pwd_piecerate_low = c(pwd_rate_low)
    math_rate_low = 10
    math_piecerate_low = c(math_rate_low)

    task = 'MainPart'
    production_profile = ['Convex']  # ['Linear','Convex']
    treatments = ['Control', 'Reminder', 'WTP']
    incentives = ['High', 'Low']
    timeout = 40
    num_pwd_tasks = 10
    num_math_tasks = 10
    randomization_weights = [0, 0, 1]  # [45/100,45/100,10/100]


class Subsession(BaseSubsession):
    def creating_session(self):
        """Assign a production profile per participant and set round fields.

        In round 1 every participant without a stored 'production_profile'
        gets the next entry from a shuffled cycle of
        ``Constants.production_profile``. In every round the player's task,
        task index and production profile are written.
        """
        ##### This code is for obtaining a balanced session ############
        task_profiles = Constants.production_profile.copy()
        # shuffle the list
        random.shuffle(task_profiles)
        # make a cycle out of the shuffled list
        profiles = itertools.cycle(task_profiles)

        for p in self.get_players():
            participant_vars = p.participant.vars
            # Establishes task ordering & Incentives for Math Task
            if self.round_number == 1:
                # Only draw from the cycle when nothing is stored yet, so a
                # profile set elsewhere does not consume a slot.
                if 'production_profile' not in participant_vars:
                    participant_vars['production_profile'] = next(profiles)
            # NOTE(review): the source whitespace was collapsed; these three
            # assignments are assumed to run in every round, not only round 1.
            p.task = Constants.task
            p.task_option_this_round = (self.round_number - 1)
            p.production_profile = participant_vars['production_profile']


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    # Task
    task = models.StringField()
    # Production function type
    production_profile = models.StringField()
    # Treatment
    treatment = models.StringField()

    ### WTP implementation
    random_num = models.IntegerField(blank=True)
    rel_decision = models.IntegerField(blank=True)
    WTP_cost = models.FloatField(blank=True)
    WTP_pos = models.BooleanField(blank=True)

    incentive_level = models.StringField()

    # Check correct answers
    math_correct = models.IntegerField(initial=0)
    # Check correct answers
    pwd_correct = models.IntegerField(initial=0)

    # Optimal Score
    optimal_score = models.FloatField()
    # Optimal Switch Point
    optimal_switch_point = models.IntegerField()

    # Most important timer
    math_time = models.FloatField()
    pwd_time = models.FloatField()

    ###### sequence task ############################
    math_1 = make_math_ability_field()
    math_2 = make_math_ability_field()
    math_3 = make_math_ability_field()
    math_4 = make_math_ability_field()
    math_5 = make_math_ability_field()
    math_6 = make_math_ability_field()
    math_7 = make_math_ability_field()
    math_8 = make_math_ability_field()
    math_9 = make_math_ability_field()
    math_10 = make_math_ability_field()

    ### Cumulated Time needed for one sequence ################
    math_1_time = models.FloatField(blank=True)
    math_2_time = models.FloatField(blank=True)
    math_3_time = models.FloatField(blank=True)
    math_4_time = models.FloatField(blank=True)
    math_5_time = models.FloatField(blank=True)
    math_6_time = models.FloatField(blank=True)
    math_7_time = models.FloatField(blank=True)
    math_8_time = models.FloatField(blank=True)
    math_9_time = models.FloatField(blank=True)
    math_10_time = models.FloatField(blank=True)

    ### Number tries needed to find the correct input
    math_1_tries = models.IntegerField(blank=True)
    math_2_tries = models.IntegerField(blank=True)
    math_3_tries = models.IntegerField(blank=True)
    math_4_tries = models.IntegerField(blank=True)
    math_5_tries = models.IntegerField(blank=True)
    math_6_tries = models.IntegerField(blank=True)
    math_7_tries = models.IntegerField(blank=True)
    math_8_tries = models.IntegerField(blank=True)
    math_9_tries = models.IntegerField(blank=True)
    math_10_tries = models.IntegerField(blank=True)

    # Timer
    timer_task1 = models.FloatField()
    new_timer_task1 = models.FloatField()
    timer_task2 = models.FloatField()
    new_timer_task2 = models.FloatField()
    total_time = models.FloatField()

    # Prompt Counter
    prompt_counter = models.IntegerField()
    # Switch Counter
    switch_counter = models.IntegerField()

    # Switch Timer
    switch_time_1 = models.FloatField(blank=True)
    switch_time_2 = models.FloatField(blank=True)
    switch_time_3 = models.FloatField(blank=True)
    switch_time_4 = models.FloatField(blank=True)
    switch_time_5 = models.FloatField(blank=True)
    switch_time_6 = models.FloatField(blank=True)
    switch_time_7 = models.FloatField(blank=True)
    switch_time_8 = models.FloatField(blank=True)
    switch_time_9 = models.FloatField(blank=True)
    switch_time_10 = models.FloatField(blank=True)

    # Performance recorded at each switch
    switch_1_math_perf = models.IntegerField(blank=True)
    switch_2_math_perf = models.IntegerField(blank=True)
    switch_3_math_perf = models.IntegerField(blank=True)
    switch_4_math_perf = models.IntegerField(blank=True)
    switch_5_math_perf = models.IntegerField(blank=True)
    switch_6_math_perf = models.IntegerField(blank=True)
    switch_7_math_perf = models.IntegerField(blank=True)
    switch_8_math_perf = models.IntegerField(blank=True)
    switch_9_math_perf = models.IntegerField(blank=True)
    switch_10_math_perf = models.IntegerField(blank=True)

    switch_1_pwd_perf = models.IntegerField(blank=True)
    switch_2_pwd_perf = models.IntegerField(blank=True)
    switch_3_pwd_perf = models.IntegerField(blank=True)
    switch_4_pwd_perf = models.IntegerField(blank=True)
    switch_5_pwd_perf = models.IntegerField(blank=True)
    switch_6_pwd_perf = models.IntegerField(blank=True)
    switch_7_pwd_perf = models.IntegerField(blank=True)
    switch_8_pwd_perf = models.IntegerField(blank=True)
    switch_9_pwd_perf = models.IntegerField(blank=True)
    switch_10_pwd_perf = models.IntegerField(blank=True)

    # Time Spent
    RandomizationPage_time_spent = models.FloatField(blank=True)
    InfoPage_time_spent = models.FloatField(blank=True)
    Instructions_Reminder_time_spent = models.FloatField(blank=True)
    Allocation_time_spent = models.FloatField(blank=True)
    Task_time_spent = models.FloatField(blank=True)
    Exit_Q1_time_spent = models.FloatField(blank=True)
    Exit_Q2_time_spent = models.FloatField(blank=True)

    # Warnings
    RandomizationPage_warnings = models.IntegerField()
    InfoPage_warnings = models.IntegerField()
    Instructions_Reminder_warnings = models.IntegerField()
    Allocation_warnings = models.IntegerField()
    StartRound_warnings = models.IntegerField()
    Task_warnings = models.IntegerField()
    Exit_Q1_warnings = models.IntegerField()
    Exit_Q2_warnings = models.IntegerField()

    task_option_this_round = models.IntegerField()

    # Actual Allocation Dummy for Testing
    actual_allocation = models.BooleanField()

    ## Control questions
    control_q1 = models.PositiveIntegerField(choices=[i * 10 for i in range(11)], initial=None)
    control_q2 = models.PositiveIntegerField(choices=[i * 10 for i in range(11)], initial=None)
    control_q3 = models.PositiveIntegerField(choices=[i * 10 for i in range(11)], initial=None)

    def control_q1_error_message(self, value):
        """Customized error message for control question 1 (expects 30)."""
        if value != 30:
            return 'Your input is incorrect. Please try again!'

    def control_q2_error_message(self, value):
        """Customized error message for control question 2 (expects 60)."""
        if value != 60:
            return 'Your input is incorrect. Please try again!'

    def control_q3_error_message(self, value):
        """Customized error message for control question 3 (expects 90)."""
        if value != 90:
            return 'Your input is incorrect. Please try again!'

    def q_subjective_optimality_2_error_message(self, value):
        """Customized error message for subjective optimality (rejects None)."""
        if value is None:
            return 'Your input is incorrect. Please try again!'

    # Questions
    q_subjective_sophistication_2 = models.PositiveIntegerField(
        choices=Constants.likert_scale_agree,
        label='I chose a good allocation of my time between the two tasks.',
        widget=widgets.RadioSelectHorizontal(),
    )
    q_subjective_optimality_2 = models.IntegerField(
        choices=range(1, Constants.timeout + 1),
        widget=widgets.RadioSelect,
    )

    # Payoffs and Piecerates
    # Piece Rate
    pwd_piecerate = models.CurrencyField()
    math_piecerate = models.CurrencyField()
    payoff_pwd = models.CurrencyField()
    payoff_math = models.CurrencyField()
    payoff_gross = models.CurrencyField()

    ### Cumulated Time needed for one pwd task ################
    pwd_1_time = models.FloatField(blank=True)
    pwd_2_time = models.FloatField(blank=True)
    pwd_3_time = models.FloatField(blank=True)
    pwd_4_time = models.FloatField(blank=True)
    pwd_5_time = models.FloatField(blank=True)
    pwd_6_time = models.FloatField(blank=True)
    pwd_7_time = models.FloatField(blank=True)
    pwd_8_time = models.FloatField(blank=True)
    pwd_9_time = models.FloatField(blank=True)
    pwd_10_time = models.FloatField(blank=True)

    pwd_1 = make_pwd_ability_field()
    pwd_2 = make_pwd_ability_field()
    pwd_3 = make_pwd_ability_field()
    pwd_4 = make_pwd_ability_field()
    pwd_5 = make_pwd_ability_field()
    pwd_6 = make_pwd_ability_field()
    pwd_7 = make_pwd_ability_field()
    pwd_8 = make_pwd_ability_field()
    pwd_9 = make_pwd_ability_field()
    pwd_10 = make_pwd_ability_field()

    # Number of tries
    pwd_1_tries = models.IntegerField(blank=True)
    pwd_2_tries = models.IntegerField(blank=True)
    pwd_3_tries = models.IntegerField(blank=True)
    pwd_4_tries = models.IntegerField(blank=True)
    pwd_5_tries = models.IntegerField(blank=True)
    pwd_6_tries = models.IntegerField(blank=True)
    pwd_7_tries = models.IntegerField(blank=True)
    pwd_8_tries = models.IntegerField(blank=True)
    pwd_9_tries = models.IntegerField(blank=True)
    pwd_10_tries = models.IntegerField(blank=True)

    # Number of toggles
    pwd_1_toggles = models.IntegerField(blank=True)
    pwd_2_toggles = models.IntegerField(blank=True)
    pwd_3_toggles = models.IntegerField(blank=True)
    pwd_4_toggles = models.IntegerField(blank=True)
    pwd_5_toggles = models.IntegerField(blank=True)
    pwd_6_toggles = models.IntegerField(blank=True)
    pwd_7_toggles = models.IntegerField(blank=True)
    pwd_8_toggles = models.IntegerField(blank=True)
    pwd_9_toggles = models.IntegerField(blank=True)
    pwd_10_toggles = models.IntegerField(blank=True)

    def set_payoff(self):
        """Score both tasks for this round and store the resulting payoffs.

        Counts answers equal to the stored solutions, multiplies each count
        by the player's piece rate, subtracts the WTP cost when the treatment
        is 'WTP', and copies the per-round results into ``participant.vars``.

        Raises
        ------------
        ValueError
            If ``production_profile`` has no task set loaded on ``Constants``
            (e.g. 'Linear' while ``linear_task`` is disabled).
        """
        # Map each profile to the Constants attribute holding its task set.
        task_attr_by_profile = {
            'Linear': 'linear_task',
            'Convex': 'convex_task',
        }
        task_attr = task_attr_by_profile.get(self.production_profile)
        pwd_tasks = getattr(Constants, task_attr, None) if task_attr else None
        if pwd_tasks is None:
            raise ValueError(
                f'No pwd task set is loaded for production profile '
                f'{self.production_profile!r}'
            )
        pwd_solutions = pwd_tasks[self.task_option_this_round][0]

        pwd_answers = [
            getattr(self, f'pwd_{i}')
            for i in range(1, Constants.num_pwd_tasks + 1)
        ]
        self.pwd_correct = sum(
            1 for x, y in zip(pwd_answers, pwd_solutions) if x == y
        )
        self.payoff_pwd = self.pwd_correct * self.pwd_piecerate

        math_solutions = Constants.addition_task[self.task_option_this_round][1]
        math_answers = [
            getattr(self, f'math_{i}')
            for i in range(1, Constants.num_math_tasks + 1)
        ]
        self.math_correct = sum(
            1 for x, y in zip(math_answers, math_solutions) if x == y
        )
        self.payoff_math = self.math_correct * self.math_piecerate

        self.payoff_gross = self.payoff_math + self.payoff_pwd

        p_vars = self.participant.vars
        round_number = self.round_number
        if self.treatment == 'WTP':
            wtp_cost = c(self.WTP_cost)
            # Subtract cost
            self.payoff = self.payoff_gross - wtp_cost
            # Cost components
            p_vars[f'treat_wtp_cost_round_{round_number}'] = wtp_cost
        else:
            ### Compute total payoff
            self.payoff = self.payoff_gross

        # Expose per-round results to later apps via participant.vars
        # Pwd
        p_vars[f'treat_pwd_round_{round_number}'] = self.pwd_correct
        p_vars[f'treat_pwd_payoff_round_{round_number}'] = self.payoff_pwd
        # Math
        p_vars[f'treat_math_round_{round_number}'] = self.math_correct
        p_vars[f'treat_math_payoff_round_{round_number}'] = self.payoff_math

    def comp_opt_alloc(self):
        """Compute the optimal time allocation and store it in participant.vars.

        Reads the 'math_t_1'..'math_t_10' and 'pwd_t_1'..'pwd_t_10' entries
        from ``participant.vars``, drops the rows labelled 0 and 1 (practice
        rounds), builds both production functions and lets
        ``get_optimal_allocation`` write the optimum back.
        """
        p_vars = self.participant.vars  # Simplifies code later
        math_cols = [f'math_t_{i}' for i in range(1, 11)]
        pwd_cols = [f'pwd_t_{i}' for i in range(1, 11)]

        # Create df of only pwd data
        df_pwd = pd.DataFrame({col: p_vars[col] for col in pwd_cols})
        # Create df of only math data
        df_math = pd.DataFrame({col: p_vars[col] for col in math_cols})

        # Remove the first two rounds (practice)
        df_pwd = df_pwd.loc[2:, :]
        df_math = df_math.loc[2:, :]

        # Create respective production function
        rslt_pwd = get_production_func(df_pwd, 'pwd')
        rslt_math = get_production_func(df_math, 'math')

        # Find optimal allocation
        df_both = pd.DataFrame({'pwd': rslt_pwd, 'math': rslt_math})
        get_optimal_allocation(df_both, p_vars)

    def write_math_time(self):
        """Store the task and round counts of this app in participant.vars.

        Despite its name, this does not currently write any math time (that
        line is commented out).
        """
        p_vars = self.participant.vars
        #p_vars['last_round_math_time'] = self.math_time
        p_vars['treat_num_pwd_tasks'] = Constants.num_pwd_tasks
        p_vars['treat_num_math_tasks'] = Constants.num_math_tasks
        p_vars['treat_num_rounds'] = Constants.num_rounds