from otree.api import * doc = """ Your app description """ def analyticalq(): import csv f = open('_static/R1.csv', encoding='utf-8-sig') rows = list(csv.DictReader(f)) return rows def numericalq(): import csv f = open('_static/R1N.csv', encoding='utf-8-sig') rows = list(csv.DictReader(f)) return rows def spatialq(): import csv f = open('_static/spaIn.csv', encoding='utf-8-sig') rows = list(csv.DictReader(f)) return rows class C(BaseConstants): NAME_IN_URL = 'testing' M1Q1 = numericalq() M2Q1 = analyticalq() M3Q1 = spatialq() lenM1 = len(M1Q1) lenM2 = len(M2Q1) lenM3 = len(M3Q1) NUM_ROUNDS = 10 PLAYERS_PER_GROUP = 3 LEADER_ROLE = 'You' PL1_ROLE = 'Worker to your left' PL2_ROLE = 'Worker to your right' L_TABLE_TEMPLATE = __name__ + '/L_IntermediateResults.html' P_TABLE_TEMPLATE = __name__ + '/IntermediateResults.html' TIMER_TEXT = "Time to complete questions:" PAUSE_TEXT = "Discuss with your group for 15 seconds" class Subsession(BaseSubsession): pass class Group(BaseGroup): pass class Player(BasePlayer): # input ID & CID EM = models.StringField(label="Input your Email") EM_2 = models.StringField(label="Confirm your Email") CID = models.IntegerField(label="Input your Computer ID") CID_2 = models.IntegerField(label="Confirm your Computer ID") ID1 = models.IntegerField(label="Confirm your ID label below") ran = models.IntegerField(label="Input the code given by the instructor:") # individual and group task var. _a = analytical, _s = spatial and _n =numerical question_a = models.StringField() solution_a = models.StringField() question_n = models.StringField() solution_n = models.StringField() graphics_name = models.CharField() solution_s = models.StringField() key_s = models.StringField() # analytical and numerical chocies field , empty, defined futher down choicea1_1 = models.StringField(widget=widgets.RadioSelect, label='', blank=True) choicen1 = models.StringField(widget=widgets.RadioSelect, label='', blank=True) # variable to count if answer is correct is_correct = models.BooleanField() # spatial choice field # spatial choice field spa_1 = models.StringField( widget=widgets.RadioSelectHorizontal, choices=['A', 'B', 'C', 'D', 'E'], label='', blank=True) spa_2 = models.StringField( widget=widgets.RadioSelectHorizontal, choices=['A', 'B', 'C', 'D'], label='', blank=True) spa_8 = models.IntegerField(widget=widgets.RadioSelectHorizontal, choices=[1, 2, 3, 4, 5, 6, 7, 8], label='', blank=True) spa_5 = models.IntegerField(widget=widgets.RadioSelectHorizontal, choices=[1, 2, 3, 4, 5], label='', blank=True) spa_6 = models.IntegerField(widget=widgets.RadioSelectHorizontal, choices=[1, 2, 3, 4, 5, 6], label='', blank=True) spa_4 = models.IntegerField(widget=widgets.RadioSelectHorizontal, choices=[1, 2, 3, 4], label='', blank=True) # count q_ID in mods last_click = models.IntegerField() # check attention question q1 = models.IntegerField(label='What will be the team score?') ## chocie field def choicen1_choices(player: Player): current_question1 = current_m1(player) return [ ['A', current_question1['optionA']], ['B', current_question1['optionB']], ['C', current_question1['optionC']], ['D', current_question1['optionD']], ['E', current_question1['optionE']], ] def choicea1_1_choices(player: Player): current_question2 = current_m2(player) return [ ['A', current_question2['optionA']], ['B', current_question2['optionB']], ['C', current_question2['optionC']], ['D', current_question2['optionD']], ['E', current_question2['optionE']], ] #subsession creation: this app as 3, this is the first, anything defined here will remian in all following sessions def creating_session(subsession: Subsession): import random if subsession.round_number == 1: subsession.group_randomly(fixed_id_in_group=True) subsession.session.random = subsession.session.config['random_assign'] test = subsession.session.config['testing'] if test: I = random.randint(0, 15) if subsession.round_number == 1: with open('_rooms/labels.txt') as f: labels = f.read().splitlines() for p, label in zip(subsession.get_players(), labels): p.participant.label = label p.participant.ID_label = label p.participant.CID = label p.participant.pre_score1 = random.randint(0, 50) p.participant.pre_score2 = random.randint(0, 50) p.participant.pre_score3 = random.randint(0, 30) p.participant.m1_current_q_ID = 0 p.participant.m2_current_q_ID = 0 p.participant.m3_current_q_ID = 0 p.participant.like_leader = label p.participant.score = (p.participant.pre_score1 + p.participant.pre_score2 + p.participant.pre_score3) / 3 for p in subsession.get_players(): p.participant.pre_score1 = 0 p.participant.pre_score2 = 0 p.participant.pre_score3 = 0 p.participant.g1_correct_a=0 p.participant.g1_correct_n=0 p.participant.g1_correct_s=0 p.participant.g2_correct_a=0 p.participant.g2_correct_n=0 p.participant.g2_correct_s=0 p.participant.like_leader = 0 p.participant.L_assign = False p.participant.score = 0 p.participant.CID2 = 0 p.participant.old = 0 p.participant.questions_mod1 = True p.participant.questions_mod2 = False p.participant.questions_mod3 = False p.participant.time_out = False p.participant.intro2 = False p.participant.intro3 = False p.participant.p_m1_page2 = 0 p.participant.p_m2_page2 = 0 p.participant.p_m3_page2 = 0 p.participant.timer = int(subsession.session.config['timer']) subsession.session.random = subsession.session.config['random_assign'] # selext a random round to be the round with the price if subsession.round_number == 1: selctd_round = random.randint(1, 5) p.participant.random_round_selection = selctd_round def current_m1(player: Player): ID1 = player.participant.m1_current_q_ID if ID1 >= int(int(C.lenM1)): ID1 = 0 player.participant.m1_current_q_ID = 0 return C.M1Q1[ID1] def current_m2(player: Player): ID2 = player.participant.m2_current_q_ID if ID2 >= int(C.lenM2): ID2 = 0 player.participant.m2_current_q_ID = 0 return C.M2Q1[ID2] def current_m3(player: Player): ID3 = player.participant.m3_current_q_ID if ID3 >= int(int(C.lenM3)): ID3 = 0 player.participant.m3_current_q_ID = 0 return C.M3Q1[ID3] class count_answers(ExtraModel): player = models.Link(Player) module = models.StringField() number_answered = models.IntegerField() groupid = models.StringField() def vars_for_template1(player: Player): qd1 = current_m1(player) player.question_n = qd1['question'] player.solution_n = qd1['solution'] qd2 = current_m2(player) player.question_a = qd2['question'] player.solution_a = qd2['solution'] qd3 = current_m3(player) player.graphics_name = qd3['graphic_name'] graphics_name = player.graphics_name print("Graphics Name", graphics_name) player.solution_s = qd3['solution'] print("Graph sol", player.solution_s) return dict( image_path='{}.jpg'.format(graphics_name), question_a=player.question_a, question_n=player.question_n ) # timer def get_timeout_seconds1(player: Player): participant = player.participant import time return participant.expiry - time.time() def get_timeout_seconds12(player: Player): participant = player.participant import time return participant.expiry2 - time.time() def get_timeout_seconds13(player: Player): participant = player.participant import time return participant.expiry3 - time.time() import numpy as np def get_all_matrix(group_matrix, num_participants): ref = [[1, 6, 15], [2, 3, 14], [7, 5, 8], [10, 9, 12], [13, 4, 11]] gr_n = len(group_matrix) if num_participants == 15: keys = np.reshape(ref, (num_participants)).tolist() nested_matrix = [[[1, 3, 5], [2, 8, 12], [7, 9, 11], [10, 4, 15], [13, 6, 14]], [[1, 8, 11], [2, 9, 15], [7, 4, 14], [10, 6, 5], [13, 3, 12]], [[1, 9, 14], [2, 4, 5], [7, 6, 12], [10, 3, 11], [13, 8, 15]], [[1, 4, 12], [2, 6, 11], [7, 3, 15], [10, 8, 14], [13, 9, 5]]] elif num_participants == 12: ref = [[1, 6, 5], [2, 3, 8], [7, 9, 4], [10, 11, 12]] keys = np.reshape(ref, (num_participants)).tolist() nested_matrix = [[[1, 3, 11], [2, 5, 9], [7, 6, 12], [10, 8, 4]], [[1, 12, 9], [2, 6, 4], [7, 3, 11], [10, 8, 5]], [[1, 6, 8], [2, 9, 11], [7, 4, 5], [10, 3, 12]], [[1, 3, 4], [2, 6, 12], [7, 11, 8], [10, 5, 9]]] else: I = [[1, 2, 3]] keys = np.reshape(I, (num_participants)).tolist() nested_matrix = [[[2, 3, 1]], [[2, 1, 3]]] shuffled_nested_matrix = [] shuffled_nested_matrix.append(group_matrix) for shuffled_matrix in nested_matrix: shuffled_matrix = np.reshape(shuffled_matrix, (num_participants)).tolist() group_matrix = np.reshape(group_matrix, (num_participants)).tolist() merged = dict(zip(shuffled_matrix, group_matrix)) result = [merged[key] for key in keys] shuffled_nested_matrix.append(np.reshape(result, (gr_n, 3)).tolist()) return shuffled_nested_matrix class WaitPage_Gr(WaitPage): wait_for_all_groups = True body_text = "Waiting for all players to arrive before starting Group Task" @staticmethod def is_displayed(player): return player.round_number < 6 @staticmethod def after_all_players_arrive(subsession: Subsession): global matrix import random session = subsession.session # get list of computers. comp = [] for c in subsession.get_players(): comp.append(c.participant.CID) ran = session.random print(ran) comp = sorted(comp) p = subsession.get_players() print('CID', comp) # get corresponding players. CID_players = sorted(p, key=lambda player: player.participant.CID, reverse=True) if subsession.round_number == 1: # Sort players by score if ran: sorted_players = sorted(p, key=lambda player: player.participant.score, reverse=True) else: sorted_players = sorted(p, key=lambda player: player.participant.like_leader, reverse=True) # Divide players into three groups (high, mid, low) high_score = sorted_players[:len(sorted_players) // 3] mid_score = sorted_players[len(sorted_players) // 3:2 * len(sorted_players) // 3] low_score = sorted_players[2 * len(sorted_players) // 3:] # Group players # in group shuffle (random leader) group_matrix = [] for i in range(len(high_score)): g = [high_score[i], mid_score[i], low_score[i]] if ran: random.shuffle(g) group_matrix.append(g) num_participants = session.num_participants for s in subsession.in_rounds(1, C.NUM_ROUNDS): s.set_group_matrix(group_matrix) matrix = subsession.get_group_matrix() session.matrices = get_all_matrix(matrix, num_participants) allmatrix = session.matrices else: round = subsession.round_number - 1 allmatrix = session.matrices group_matrix = allmatrix[round] subsession.set_group_matrix(group_matrix) for i, group in enumerate(subsession.get_groups()): group_comp = comp[i * 3:(i + 1) * 3] group_old = CID_players[i * 3:(i + 1) * 3] for j, label in enumerate(group.get_players()): if label.role == C.LEADER_ROLE: label.participant.CID2 = group_comp[1] label.participant.old = group_old[1].participant.ID_label elif label.role == C.PL2_ROLE: label.participant.CID2 = group_comp[0] label.participant.old = group_old[0].participant.ID_label else: label.participant.CID2 = group_comp[2] label.participant.old = group_old[2].participant.ID_label players = subsession.get_players() assign_roleid_groups(players) print('R1 to R2 all matrix', allmatrix[1]) print('R1 to R3 all matrix', allmatrix[2]) print("== R1Round ", subsession.round_number, " == ") print(" R1Matching: ", subsession.get_group_matrix()) print(" R1Matching: all ", allmatrix[0]) def assign_roleid_groups(players): """label id and roles of all groups that these players belong to""" already_added = [] for p in players: group = p.group if group.id not in already_added: already_added.append(group.id) leader = group.get_player_by_role(C.LEADER_ROLE) T1 = group.get_player_by_role(C.PL1_ROLE) T2 = group.get_player_by_role(C.PL2_ROLE) leader.participant.role_id = 'Manager' T1.participant.role_id = 'team member1' T2.participant.role_id = 'team member2' leader.participant.otree_group_id = group.id T1.participant.otree_group_id = group.id T2.participant.otree_group_id = group.id for p in players: for i, gr in enumerate(already_added): if p.participant.otree_group_id == gr: p.participant.group_id = i + 1 break def get_groups2(players): """gets all groups that these players belong to, without duplicates""" already_added = set() groups = [] for p in players: group = p.group if group.id not in already_added: leader = group.get_player_by_role(C.LEADER_ROLE) T1 = group.get_player_by_role(C.PL1_ROLE) T2 = group.get_player_by_role(C.PL2_ROLE) labels = {'Manager': leader.participant.label, 'Manager CID': leader.participant.CID2, 'worker 1': T1.participant.label, 'worker 1 CID': T1.participant.CID2, 'worker 2': T2.participant.label, 'worker 2 CID': T2.participant.CID2} already_added.add(group.id) groups.append({'otree_group': group, 'role_id': labels}) return groups # PAGES class MyPage(Page): @staticmethod def is_displayed(player: Player): return player.round_number < 6 @staticmethod def vars_for_template(player: Player): p=player.subsession.get_players() groups = [g for g in get_groups2(p)] allgroups = player.session.matrices return dict(groups=groups, allgroups=allgroups, players2=[p for p in player.get_others_in_group()],) class ResultsWaitPage(WaitPage): pass class Results(Page): pass class Ind_module1(Page): form_model = 'player' form_fields = ['choicen1'] vars_for_template = vars_for_template1 @staticmethod def before_next_page(player: Player, timeout_happened): participant = player.participant mod = 'mod1' num = player.participant.m1_current_q_ID group = player.group groupid = group.id_in_subsession count_answers.create(player=player, module=str(mod), number_answered=int(num), groupid=str(groupid)) try: is_correct = player.choicen1 == player.solution_n except TypeError: is_correct = None if is_correct is not None: player.is_correct = player.choicen1 == player.solution_n participant.g1_correct_n += int(player.is_correct) print('is correct', player.is_correct) print('answer', player.solution_n) print('choice', player.choicen1) if not player.is_correct: participant.g1_correct_n += -0.5 import time player.participant.m1_current_q_ID += 1 @staticmethod def error_message(player: Player, values): if values['choicen1'] is None: return "Select an answer" class Ind_Instruction2(Page): form_model = "player" @staticmethod def is_displayed(player): return player.participant.questions_mod2 and player.participant.intro2 class Ind_module2(Page): form_model = 'player' form_fields = ['choicea1_1'] vars_for_template = vars_for_template1 @staticmethod def before_next_page(player: Player, timeout_happened): ## try except is used to avoid type error when field is none participant = player.participant mod = 'mod2' num = player.participant.m2_current_q_ID group = player.group groupid = group.id_in_subsession count_answers.create(player=player, module=str(mod), number_answered=int(num), groupid=str(groupid)) try: is_correct = player.choicea1_1 == player.solution_a except TypeError: is_correct = None if is_correct is not None: player.is_correct = player.choicea1_1 == player.solution_a participant.g1_correct_a += int(player.is_correct) print('is correct', player.is_correct) print('answer', player.solution_a) print('choice', player.choicea1_1) if not player.is_correct: participant.g1_correct_a += -0.5 import time player.participant.m2_current_q_ID += 1 @staticmethod def error_message(player: Player, values): print('no', values) if values['choicea1_1'] is None: return "Select an answer" class Ind_module3(Page): form_model = 'player' timer_text = C.TIMER_TEXT vars_for_template = vars_for_template1 @staticmethod def get_form_fields(player): qd3 = current_m3(player) player.key_s = qd3['key'] print('key', player.key_s) if player.key_s == ('5' or '5.0'): return ['spa_1'] elif player.key_s == ('4' or '4.0'): return ['spa_2'] elif player.key_s == 'int4': return ['spa_4'] elif player.key_s == 'int5': return ['spa_5'] elif player.key_s == 'int6': return ['spa_6'] elif player.key_s == 'int8': return ['spa_8'] @staticmethod def before_next_page(player: Player, timeout_happened): participant = player.participant mod = 'mod3' num = player.participant.m3_current_q_ID group = player.group groupid = group.id_in_subsession count_answers.create(player=player, module=str(mod), number_answered=int(num), groupid=str(groupid)) if player.key_s == ('5' or '5.0'): try: is_correct = player.spa_1 == player.solution_s except TypeError: is_correct = None elif player.key_s == ('4' or '4.0'): try: is_correct = player.spa_2 == player.solution_s except TypeError: is_correct = None elif player.key_s == 'int4': try: is_correct = player.spa_4 == player.solution_s except TypeError: is_correct = None elif player.key_s == 'int5': try: is_correct = player.spa_5 == player.solution_s except TypeError: is_correct = None elif player.key_s == 'int6': try: is_correct = player.spa_6 == player.solution_s except TypeError: is_correct = None else: try: is_correct = player.spa_8 == player.solution_s except TypeError: is_correct = None if is_correct is not None: player.is_correct = is_correct import time participant.g1_correct_s += int(player.is_correct) if not player.is_correct: participant.g1_correct_s += -0.5 import time player.participant.m3_current_q_ID += 1 @staticmethod def error_message(player: Player, values): print('no', values) if (player.key_s == ('5' or '5.0')) and values['spa_1'] is None: return "Select an answer" elif (player.key_s == ('4' or '4.0')) and values['spa_2'] is None: return "select an answer" elif player.key_s == 'int4' and values['spa_4'] is None: return "select an answer" elif player.key_s == 'int5' and values['spa_5'] is None: return "select an answer" elif player.key_s == 'int6' and values['spa_6'] is None: return "select an answer" elif player.key_s == 'int8' and values['spa_8'] is None: return "select an answer" def get_groups_high_low_score(players): """gets all groups that these players belong to, without duplicates""" already_added = set() group_scores = [] for p in players: group = p.group if p.participant.group_id not in already_added: min_score = min([p.participant.g1_correct_n, p.participant.g1_correct_a, p.participant.g1_correct_s]) already_added.add(p.participant.group_id) group_scores.append(min_score) low = min(group_scores) high = max(group_scores) return high, low def sort_ranking(players, round): """Sorts the ranking of players in a round with any number of groups""" group_rankings = {} already_added = set() group_scores = [] if round == 1: for p in players: p.participant.player_rank_group = {} for p in players: group_id = p.participant.group_id score = min(p.participant.g1_correct_n, p.participant.g1_correct_a, p.participant.g1_correct_s) p.participant.player_rank_group[round] = {'group_id': group_id, 'group_score': score} if group_id not in already_added: already_added.add(p.participant.group_id) group_scores.append(score) group_rankings[group_id] = score # Sort the dictionary items by score in descending order sorted_groups = sorted(group_rankings.items(), key=lambda x: x[1], reverse=True) # Check if the list has at least one element if sorted_groups: # The highest-scoring group will be at index 0 highest_scoring_group = sorted_groups[0] # The list you want with the highest-scoring group at index 1 sorted_group_list = [(group_id, score) for group_id, score in sorted_groups] print("Highest Scoring Group:", highest_scoring_group) print("Sorted Group List:", sorted_group_list) else: print("The dictionary is empty.") return sorted_group_list class Gr_final(Page): @staticmethod def vars_for_template(player: Player): p = player.subsession.get_players() high, low = get_groups_high_low_score(p) sorted_group_ranks = sort_ranking(p, player.round_number) if player.round_number == 1: player.participant.rank_R1 = sorted_group_ranks elif player.round_number == 2: player.participant.rank_R2 = sorted_group_ranks elif player.round_number == 3: player.participant.rank_R3 = sorted_group_ranks elif player.round_number == 4: player.participant.rank_R4 = sorted_group_ranks elif player.round_number == 5: player.participant.rank_R5 = sorted_group_ranks return dict(high=high, low=low, sorted_group_ranks_R1 = sorted_group_ranks) @staticmethod def before_next_page(player: Player, timeout_happened): player.participant.time_out = False class LeaderM(Page): @staticmethod def is_displayed(player: Player): return player.round_number == C.NUM_ROUNDS @staticmethod def vars_for_template(player: Player): selcted_round = player.participant.random_round_selection players_group = player.participant.player_rank_group[selcted_round]['group_id'] players_score = player.participant.player_rank_group[selcted_round]['group_score'] if selcted_round == 1: sorted_groups = player.participant.rank_R1 elif selcted_round == 2: sorted_groups = player.participant.rank_R2 elif selcted_round == 3: sorted_groups = player.participant.rank_R3 elif selcted_round == 4: sorted_groups = player.participant.rank_R4 elif selcted_round == 5: sorted_groups = player.participant.rank_R5 for i, (group_id, score) in enumerate(sorted_groups, 1): if group_id == players_group: players_rank = i return dict(sorted_groups=sorted_groups, players_group=players_group, players_score=players_score, selcted_round=selcted_round, players_rank=players_rank) # class Ind_Results(Page): # @staticmethod # def is_displayed(player: Player): # return player.round_number == C.NUM_ROUNDS # # @staticmethod # def vars_for_template(player: Player): # participant = player.participant # # participant.pre_score1 = round(participant.correct_n, 2) # participant.pre_score2 = round(participant.correct_a, 2) # participant.pre_score3 = round(participant.correct_s, 2) # participant.score = (participant.pre_score1 + participant.pre_score2 + participant.pre_score3)/3 # print("score per minute", participant.pre_score1) # # print("score total", participant.score) # @staticmethod page_sequence = [WaitPage_Gr, MyPage, Ind_module1, Ind_module2, Ind_module3, Gr_final, LeaderM] # page_sequence = [WaitPage_Gr, MyPage] def custom_export(players): """For data export page""" yield ['participant_label', 'player_role', 'group_id', 'id_in_session', 'round_number', 'score_mod1', 'score_mod2', 'score_mod3', 'score_avg', 'number_answered', 'module'] responses = count_answers.filter() for resp in responses: player = resp.player participant = player.participant yield [player.participant.label, player.role, resp.groupid, participant.id_in_session, player.round_number, participant.pre_score1, participant.pre_score2, participant.pre_score3, participant.score, resp.number_answered, resp.module]