from otree.api import * import json # import the function from utils.py doc = """ Your app description """ # import data (currently doubled for loop to work, a large dat set does not need this) import csv # define constants, the csv with the questions and setting for the task: rounds roles ppl per group etc class C(BaseConstants): NAME_IN_URL = 'Ind_task' ## FIRST indidvual task round # with open('_static/numQ2.csv', encoding='utf-8-sig') as f: # M1Q1 = list(csv.DictReader(f)) # with open('_static/verbQ2.csv', encoding='utf-8-sig') as f: # M2Q1 = list(csv.DictReader(f)) # with open('_static/spaIn.csv', encoding='utf-8-sig') as f: # M3Q1 = list(csv.DictReader(f)) # lenM1Q1 = len(M1Q1) # lenM2Q1 = len(M2Q1) # lenM3Q1 = len(M3Q1) # NUM_ROUNDS = lenM3Q1 + lenM2Q1 + lenM1Q1 NUM_ROUNDS = 1 PLAYERS_PER_GROUP = None TIMER_TEXT = "Time to complete questions:" STYLE = __name__ + '/ind_sty.css' TIMED_QUIZ = __name__ + '/timed_quiz.js' class Subsession(BaseSubsession): pass class Group(BaseGroup): pass # not needed class Player(BasePlayer): num_correct = models.IntegerField(initial=0) score = models.FloatField(initial=0.0) trails_mod1 = models.LongStringField() trails_mod2 = models.LongStringField() trails_mod3 = models.LongStringField() Index_mod1 = models.IntegerField(initial=0) Index_mod2 = models.IntegerField(initial=0) Index_mod3 = models.IntegerField(initial=0) q_ID = models.IntegerField(initial=0) score_mod1 = models.FloatField(initial=0.0) score_mod2 = models.FloatField(initial=0.0) score_mod3 = models.FloatField(initial=0.0) task = models.StringField() mod = models.StringField(initial='num') raw_responses = models.LongStringField() iterration_n = models.IntegerField(initial=0) iterration_a = models.IntegerField(initial=0) iterration_s = models.IntegerField(initial=0) solution = models.StringField(initial=0) nomore_num = models.BooleanField(initial=False) nomore_ana = models.BooleanField(initial=False) nomore_spa = models.BooleanField(initial=False) # input ID & CID EM = models.StringField(label="Input your Email") EM_2 = models.StringField(label="Confirm your Email") CID = models.IntegerField(label="Input your Computer ID") CID_2 = models.IntegerField(label="Confirm your Computer ID") ID1 = models.IntegerField(label="Confirm your ID label below") ran = models.IntegerField(label="Input the code given by the instructor:") FIRST = models.StringField(label="Input your first name") FIRST_2 = models.StringField(label="Confirm your first name") # variable to count if answer is correct an deadline for each round (?) is_correct = models.BooleanField() deadline = models.FloatField() # check attention question q1 = models.IntegerField(label='What will be the team score?') # subsession creation: this app as 3, this is the first, anything defined here will remian in all following sessions def creating_session(subsession: Subsession): from utils import read_csv import random test = subsession.session.config['testing'] for p in subsession.get_players(): if test: if subsession.round_number == 1: I = random.randint(0, 15) p.participant.CID = I with open('_rooms/labels.txt') as f: labels = f.read().splitlines() for p, label in zip(subsession.get_players(), labels): p.participant.label = label for p in subsession.get_players(): p.participant.pre_score1 = 0 p.participant.pre_score2 = 0 p.participant.pre_score3 = 0 p.participant.CID2 = 0 p.participant.old = 0 p.participant.Index_mod1 = 0 p.participant.Index_mod2 = 0 p.participant.Index_mod3 = 0 p.participant.correct_n = 0 p.participant.correct_a = 0 p.participant.correct_s = 0 p.participant.correct_n_all = 0 p.participant.correct_a_all = 0 p.participant.correct_s_all = 0 p.participant.timer = int(subsession.session.config['timer']) if subsession.round_number == 1: selctd_round = random.randint(1, 4) p.participant.random_round_selection = selctd_round stimuli = read_csv('_static/InQs.csv') for stim in stimuli: # print('stim is', stim) # ** is the Python operator to unpack the dict Trial.create(player=p, **stim) subsession.session.random = subsession.session.config['random_assign'] class Trial(ExtraModel): player = models.Link(Player) is_correct = models.BooleanField() question = models.StringField() graphic_name = models.CharField() solution = models.StringField() optionA = models.StringField() optionB = models.StringField() optionC = models.StringField() optionD = models.StringField() optionE = models.StringField() option1 = models.StringField() option2 = models.StringField() option3 = models.StringField() option4 = models.StringField() option5 = models.StringField() option6 = models.StringField() option7 = models.StringField() option8 = models.StringField() mod = models.StringField() key = models.StringField() q_ID = models.StringField() choice = models.StringField() score = models.FloatField(initial=0.0) class RecResponse(ExtraModel): player = models.Link(Player) response = models.StringField() solution = models.StringField() q_ID = models.StringField() mod = models.StringField() is_correct = models.BooleanField() num_correct = models.IntegerField() itteration = models.IntegerField() score = models.FloatField(initial=0.0) def to_dict(trial: Trial): return dict( question=trial.question, graphic_name=f'/static/{trial.graphic_name}', optionA=trial.optionA, optionB=trial.optionB, optionC=trial.optionC, optionD=trial.optionD, optionE=trial.optionE, option1=trial.option1, option2=trial.option2, option3=trial.option3, option4=trial.option4, option5=trial.option5, option6=trial.option6, option7=trial.option7, option8=trial.option8, solution=trial.solution, id=trial.id, key=trial.key, mod=trial.mod, q_ID=trial.q_ID, ) # timer def get_current_trial(player: Player, mod: str): task = mod if task == 'Numerical': index = player.Index_mod1 trails = json.loads(player.trails_mod1) player.q_ID = player.Index_mod1 player.iterration_n += 1 elif task == 'Analogical': index = player.Index_mod2 trails = json.loads(player.trails_mod2) player.q_ID = player.Index_mod2 player.iterration_a += 1 elif task == 'Spatial': index = player.Index_mod3 trails = json.loads(player.trails_mod3) player.q_ID = player.Index_mod3 player.iterration_s += 1 try: current = trails[index] except IndexError: if task == 'Numerical': player.nomore_num = True elif task == 'Analogical': player.nomore_ana = True elif task == 'Spatial': player.nomore_spa = True return dict(error=True, msg=f'No more questions in the {task} module') print('trails is', trails[index]) player.solution = trails[index]['solution'] return trails[index] def get_current_score(player: Player, mod: str): task = mod if task == 'Numerical': score = player.score_mod1 elif task == 'Analogical': score = player.score_mod2 elif task == 'Spatial': score = player.score_mod3 return score # timer for group task + more group task var # PAGES class ID_input(Page): form_model = 'player' form_fields = ['ID1'] @staticmethod def is_displayed(player): return player.round_number == 1 @staticmethod def error_message(player: Player, values): print('t', player.participant.label) try: label = int(player.participant.label) ID_true = values['ID1'] == label except TypeError: ID_true = True if not ID_true: return "Your input does not match the ID label, please enter again" def validate_email(email): import re email_regex = r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)" if re.match(email_regex, email): return True else: return False class EM_input(Page): form_model = 'player' form_fields = ['EM', 'EM_2'] @staticmethod def is_displayed(player): return player.round_number == 1 @staticmethod def before_next_page(player: Player, timeout_happened): player.participant.EM = player.EM @staticmethod def error_message(player: Player, values): if not validate_email(values['EM']): return "Your input is not a valid email format, please enter again" print('no', values) if values['EM'] != values['EM_2']: return "Your inputs does not match, please enter again" class firstNameInput(Page): form_model = 'player' form_fields = ['FIRST', 'FIRST_2'] @staticmethod def is_displayed(player): return player.round_number == 1 @staticmethod def before_next_page(player: Player, timeout_happened): player.participant.FIRST = player.FIRST @staticmethod def error_message(player: Player, values): print('no', values) if values['FIRST'] != values['FIRST_2']: return "Your inputs does not match, please enter again" class CID_input(Page): form_model = 'player' form_fields = ['CID', 'CID_2'] @staticmethod def is_displayed(player): return player.round_number == 1 @staticmethod def before_next_page(player: Player, timeout_happened): player.participant.CID = int(player.CID) @staticmethod def error_message(player: Player, values): print('no', values) if values['CID'] != values['CID_2']: return "Your inputs does not match, please enter again" class WaitPage_ind(WaitPage): wait_for_all_groups = True body_text = "Waiting for all players to arrive" @staticmethod def is_displayed(player): return player.round_number == 1 class Ind_Instruction(Page): @staticmethod def is_displayed(player): return player.round_number == 1 class Ind_Instruction1(Page): form_model = "player" @staticmethod def is_displayed(player): return player.round_number == 1 class Ind_b1(Page): form_model = "player" @staticmethod def is_displayed(player): return player.round_number == 1 @staticmethod def before_next_page(player: Player, timeout_happened): player.Index_mod1 = 0 stimuli = [to_dict(trial) for trial in Trial.filter(player=player)] mod_dict = {} for stim in stimuli: mod_key = stim['mod'] if mod_key not in mod_dict: mod_dict[mod_key] = [] mod_dict[mod_key].append(stim) player.trails_mod1 = json.dumps(mod_dict['num']) import time player.deadline = time.monotonic() + ((int(player.participant.timer) / 2) * 80) player.task = 'Numerical' class Ind_module1(Page): form_model = 'player' # form_fields = ['raw_responses'] @staticmethod def live_method(player: Player, data): mod = 'Numerical' my_id = player.id_in_group index1 = player.Index_mod1 msg_type = data['type'] if msg_type != 'load': if player.nomore_num: return {my_id: dict(finished=True)} if msg_type == 'response': trial = get_current_trial(player, mod) print('trial has solution', trial['solution']) print(player.solution) response = data['response'].split('option')[-1] print(response) player.is_correct = response == str(trial['solution']) print('player.is_correct', player.is_correct) if player.q_ID != player.Index_mod1: return if not player.is_correct: player.score_mod1 += -0.5 player.participant.pre_score1 += -0.5 player.participant.correct_n_all += int(player.is_correct) player.score_mod1 += int(player.is_correct) player.participant.pre_score1 += int(player.is_correct) player.Index_mod1 += 1 RecResponse.create(player=player, response=str(response), solution=player.solution, is_correct=player.is_correct, score=player.score_mod1, num_correct=player.num_correct, mod=str(mod), itteration=player.iterration_n) current = get_current_trial(player, mod) if 'error' in current: return {my_id: dict(finished=True)} else: return {my_id: dict(task=player.task, current_q=get_current_trial(player, mod), score=get_current_score(player, mod)) } @staticmethod def before_next_page(player: Player, timeout_happened): import json player.num_correct = 0 @staticmethod def get_timeout_seconds(player: Player): import time return player.deadline - time.monotonic() class Ind_Instruction2(Page): form_model = "player" class Ind_b2(Page): form_model = "player" def before_next_page(player: Player, timeout_happened): import time stimuli = [to_dict(trial) for trial in Trial.filter(player=player)] mod_dict = {} for stim in stimuli: mod_key = stim['mod'] if mod_key not in mod_dict: mod_dict[mod_key] = [] mod_dict[mod_key].append(stim) player.trails_mod2 = json.dumps(mod_dict['ana']) player.Index_mod2 = 0 player.task = 'Analogical' player.deadline = time.monotonic() + ((int(player.participant.timer) / 2) * 80) class Ind_module2(Page): form_model = 'player' # form_fields = ['raw_responses'] @staticmethod def live_method(player: Player, data): mod = 'Analogical' my_id = player.id_in_group index2 = player.Index_mod2 msg_type = data['type'] if msg_type != 'load': if player.nomore_ana: return {my_id: dict(finished=True)} if msg_type == 'response': trial = get_current_trial(player, mod) print('trial has solution', trial['solution']) print(player.solution) response = data['response'].split('option')[-1] print(response) player.is_correct = response == str(trial['solution']) print('player.is_correct', player.is_correct) if player.q_ID != player.Index_mod2: return if not player.is_correct: player.score_mod2 += -0.5 player.participant.pre_score2 += -0.5 player.participant.correct_a_all += int(player.is_correct) player.participant.pre_score2 += int(player.is_correct) player.score_mod2 += int(player.is_correct) player.Index_mod2 += 1 RecResponse.create(player=player, response=str(response), solution=player.solution, is_correct=player.is_correct, score=player.score_mod2, num_correct=player.num_correct, mod=str(mod), itteration=player.iterration_a) current = get_current_trial(player, mod) if 'error' in current: return {my_id: dict(finished=True)} else: return {my_id: dict(task=player.task, current_q=get_current_trial(player, mod), score=get_current_score(player, mod)) } @staticmethod def before_next_page(player: Player, timeout_happened): player.raw_responses = '' player.num_correct = 0 @staticmethod def get_timeout_seconds(player: Player): import time return player.deadline - time.monotonic() class Ind_Instruction3(Page): pass class Ind_b3(Page): def before_next_page(player: Player, timeout_happened): import time stimuli = [to_dict(trial) for trial in Trial.filter(player=player)] mod_dict = {} for stim in stimuli: mod_key = stim['mod'] if mod_key not in mod_dict: mod_dict[mod_key] = [] mod_dict[mod_key].append(stim) player.trails_mod3 = json.dumps(mod_dict['spa']) player.Index_mod3 = 0 player.task = 'Spatial' player.deadline = time.monotonic() + ((int(player.participant.timer) / 2) * 80) class Ind_module3(Page): form_model = 'player' # form_fields = ['raw_responses'] # timeout_seconds = C.TIMEOUT_SECONDS @staticmethod def live_method(player: Player, data): mod = 'Spatial' my_id = player.id_in_group index3 = player.Index_mod3 msg_type = data['type'] if msg_type != 'load': if player.nomore_spa: return {my_id: dict(finished=True)} if msg_type == 'response': trial = get_current_trial(player, mod) print('trial has solution', trial['solution']) print(player.solution) response = data['response'].split('option')[-1] print(response) player.is_correct = response == str(trial['solution']) print('player.is_correct', player.is_correct) if player.q_ID != player.Index_mod3: return if not player.is_correct: player.score_mod3 += -0.5 player.participant.pre_score3 += -0.5 player.participant.correct_s_all += int(player.is_correct) player.score_mod3 += int(player.is_correct) player.participant.pre_score3 += int(player.is_correct) player.Index_mod3 += 1 RecResponse.create(player=player, response=str(response), solution=player.solution, is_correct=player.is_correct, score=player.score_mod3, num_correct=player.num_correct, mod=str(mod), itteration=player.iterration_s) current = get_current_trial(player, mod) if 'error' in current: return {my_id: dict(finished=True)} else: return {my_id: dict(task=player.task, current_q=get_current_trial(player, mod), score=get_current_score(player, mod)) } @staticmethod def before_next_page(player: Player, timeout_happened): player.raw_responses = '' player.num_correct = 0 @staticmethod def get_timeout_seconds(player: Player): import time return player.deadline - time.monotonic() class Ind_Results(Page): @staticmethod def vars_for_template(player: Player): participant = player.participant participant.score = (participant.pre_score1 + participant.pre_score2 + participant.pre_score3) / 3 print("score per minute", participant.pre_score1) print("score total", participant.score) return dict(numerical=participant.pre_score1, analytical=participant.pre_score2, spatial=participant.pre_score3) @staticmethod def before_next_page(player: Player, timeout_happened): player.participant.time_out = False participant = player.participant participant.score = (participant.pre_score1 + participant.pre_score2 + participant.pre_score3) / 3 print("score mod 1", participant.pre_score1) print("score mod 2", participant.pre_score2) print("score mod 3", participant.pre_score3) print("score total b4n", participant.score) def custom_export(players): """For data export page""" yield ['participant_code', 'participant_EM', 'FirstName', 'participant_ID', 'id_in_session', 'round_number', 'score_mod1', 'score_mod2', 'score_mod3', 'score_avg', 'correctN', 'correctA', 'correctS', 'module', 'solution', 'response', 'is_correct', 'score', 'num_correct', 'itteration'] responses = RecResponse.filter() for resp in responses: player = resp.player participant = player.participant try: score = (participant.pre_score1 + participant.pre_score2 + participant.pre_score3) / 3 participant.score = score except TypeError: participant.score = None yield [participant.code, participant.EM, participant.FIRST, player.participant.label, participant.id_in_session, player.round_number, participant.pre_score1, participant.pre_score2, participant.pre_score3, participant.score, participant.correct_n_all, participant.correct_a_all, participant.correct_s_all, resp.mod, resp.solution, resp.response, resp.is_correct, resp.score, resp.num_correct, resp.itteration] page_sequence = [ID_input, EM_input, firstNameInput, CID_input, WaitPage_ind, Ind_Instruction, Ind_Instruction1, Ind_b1, Ind_module1, Ind_Instruction2, Ind_b2, Ind_module2, Ind_Instruction3, Ind_b3, Ind_module3, Ind_Results] # page_sequence = [Ind_Instruction, Ind_Instruction1, Ind_b1, Ind_module1, # Ind_Instruction2, Ind_b2, Ind_module2, # Ind_Instruction3, Ind_b3, Ind_module3, Ind_Results] # page_sequence = [Ind_b3, Ind_module3, Ind_Results]