from otree.api import * import json doc = """ Your app description """ class C(BaseConstants): NAME_IN_URL = 'group' NUM_ROUNDS = 4 PLAYERS_PER_GROUP = 3 LEADER_ROLE = 'You' PL1_ROLE = 'Worker to your left' PL2_ROLE = 'Worker to your right' L_TABLE_TEMPLATE = __name__ + '/L_assign.html' # P_TABLE_TEMPLATE = __name__ + '/IntermediateResults.html' TIMER_TEXT = "Time to complete questions:" PAUSE_TEXT = "Discuss with your group for 15 seconds" STYLE = __name__ + '/g_sty.css' TIMED_GR = __name__ + '/timed_gr.js' PAUSEP = __name__ + '/timerpage.js' MOD = ['Numerical', 'Analogical', 'Spatial'] class Subsession(BaseSubsession): pass def read_csv(path): global rows import csv f = open(path, encoding='utf-8-sig') rows = list(csv.DictReader(f)) return rows def creating_session(subsession: Subsession): import random if subsession.round_number == 1: subsession.group_randomly(fixed_id_in_group=True) selctd_round = random.randint(1, 4) subsession.session.random = subsession.session.config['random_assign'] test = subsession.session.config['testing'] for p in subsession.get_players(): if test: I = random.randint(0, 15) if subsession.round_number == 1: selctd_round = random.randint(1, 4) p.participant.random_round_selection = selctd_round with open('_rooms/labels.txt') as f: labels = f.read().splitlines() for p, label in zip(subsession.get_players(), labels): p.participant.label = label p.participant.ID_label = label p.participant.CID = label p.participant.pre_score1 = random.randint(0, 50) p.participant.pre_score2 = random.randint(0, 50) p.participant.pre_score3 = random.randint(0, 30) p.participant.m1_current_q_ID = 0 p.participant.m2_current_q_ID = 0 p.participant.m3_current_q_ID = 0 p.participant.like_leader = label p.participant.score = ( p.participant.pre_score1 + p.participant.pre_score2 + p.participant.pre_score3) / 3 else: p.participant.CID2 = 0 p.participant.ran_code = 0 p.participant.like_leader = 0 p.participant.ID_label = 0 p.participant.old = 0 p.participant.group_m1_current_q_ID = 0 p.participant.group_m2_current_q_ID = 0 p.participant.group_m3_current_q_ID = 0 # group specific p.participant.g1_correct_n = 0 p.participant.sorted_group_ranks_R1 = [] p.participant.g1_correct_a = 0 p.participant.g1_correct_s = 0 p.participant.g2_correct_n = 0 p.participant.g2_correct_a = 0 p.participant.g2_correct_s = 0 p.participant.own_correct_s = 0 p.participant.own_correct_a = 0 p.participant.own_correct_n = 0 p.participant.is_lead = 0 p.participant.is_W1 = 0 p.participant.is_W2 = 0 p.participant.role_id = 0 p.participant.otree_group_id = 0 p.participant.group_id = 0 p.participant.L_assign = False p.participant.g1_last_click = 0 p.participant.g2_last_click = 0 p.participant.time_out = False p.participant.expiry = 0 p.participant.timer = int(subsession.session.config['timer']) p.participant.task = '' p.participant.M3done = False p.participant.M2done = False p.participant.M1done = False p.participant.worker_wait_module = False p.participant.current_round = 0 p.participant.round_diff = 0 p.random_leader = subsession.session.config['random_assign'] if subsession.round_number == 1: p.participant.random_round_selection = selctd_round # for p in subsession.get_players(): # stimuli = read_csv() # p.num_trials = len(stimuli) # for stim in stimuli: # # print('stim is', stim) # # ** is the Python operator to unpack the dict # Grial.create(player=p, **stim) subsession.session.random = subsession.session.config['random_assign'] def make_q(label): return models.StringField(label=label, choices=['Numerical', 'Analogical', 'Spatial'], widget=widgets.RadioSelect) class Group(BaseGroup): num_correct = models.IntegerField(initial=0) score = models.FloatField(initial=0.0) raw_responses = models.LongStringField() deadline = models.FloatField() trails_mod1 = models.LongStringField() trails_mod2 = models.LongStringField() trails_mod3 = models.LongStringField() itt_num = models.IntegerField(initial=0) Index_mod1 = models.IntegerField(initial=0) Index_mod2 = models.IntegerField(initial=0) Index_mod3 = models.IntegerField(initial=0) score_mod1 = models.FloatField(initial=0) score_mod2 = models.FloatField(initial=0) score_mod3 = models.FloatField(initial=0) min_score = models.FloatField(initial=0) graphic_names = models.LongStringField() num_trials = models.IntegerField(initial=0) ana_trials = models.IntegerField(initial=0) spa_trials = models.IntegerField(initial=0) nomore_num = models.BooleanField(initial=False) nomore_ana = models.BooleanField(initial=False) nomore_spa = models.BooleanField(initial=False) reassign = models.IntegerField(initial=0) rank = models.IntegerField(initial=0) high = models.FloatField(initial=0) low = models.FloatField(initial=0) p_group_id = models.IntegerField(initial=0) class Player(BasePlayer): task = models.StringField(initial='Numerical') g1_c_p1 = make_q('Assign module for Worker to your left:') g1_c_p2 = make_q('Assign module for Worker to your right:') g1_c_l = make_q('Assign module for yourself:') c_p1_practice = make_q('Assign module for Team member to your left:') c_p2_practice = make_q('Assign module for Team member to your right:') c_l_practice = make_q(label='Assign module for yourself:') c_l_practice2 = make_q(label='Assign module for yourself:') c_p1_practice2 = make_q('Assign module for Team member to your left:') c_p2_practice2 = make_q('Assign module for Team member to your right:') current_q = models.LongStringField() question = models.StringField() own_total_num_answer_mod1 = models.IntegerField(initial=0) own_total_num_answer_mod2 = models.IntegerField(initial=0) own_total_num_answer_mod3 = models.IntegerField(initial=0) own_total_score_mod1 = models.FloatField(initial=0) own_total_score_mod2 = models.FloatField(initial=0) own_total_score_mod3 = models.FloatField(initial=0) q_ID = models.IntegerField() is_correct = models.BooleanField() high = models.FloatField() low = models.FloatField() solution = models.StringField() email = models.StringField() q1 = models.IntegerField(label='What will be the overall team score?') like = models.IntegerField( widget=widgets.RadioSelect, choices=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10] ) Index_mod1 = models.IntegerField(initial=0) Index_mod2 = models.IntegerField(initial=0) Index_mod3 = models.IntegerField(initial=0) random_leader = models.BooleanField() class Grial(ExtraModel): player = models.Link(Player) is_correct = models.BooleanField() question = models.StringField() graphic_name = models.CharField() solution = models.StringField() optionA = models.StringField() optionB = models.StringField() optionC = models.StringField() optionD = models.StringField() optionE = models.StringField() option1 = models.StringField() option2 = models.StringField() option3 = models.StringField() option4 = models.StringField() option5 = models.StringField() option6 = models.StringField() option7 = models.StringField() option8 = models.StringField() mod = models.StringField() key = models.StringField() q_ID = models.StringField() choice = models.StringField() class count_score(ExtraModel): group = models.Link(Group) player = models.Link(Player) pcode = models.StringField() email = models.StringField() par_id = models.StringField() random_leader = models.BooleanField() par_id_in_session = models.IntegerField() par_id_in_group = models.IntegerField() like_to_be_leader = models.IntegerField() groupID = models.IntegerField() otree_groupID = models.IntegerField() score_mod1 = models.IntegerField() score_mod2 = models.IntegerField() score_mod3 = models.IntegerField() min_score = models.IntegerField() itt_num = models.IntegerField() itt_num_mod1 = models.IntegerField() itt_num_mod2 = models.IntegerField() itt_num_mod3 = models.IntegerField() groupRound = models.StringField() rank = models.LongStringField() high = models.FloatField() low = models.FloatField() role = models.StringField() module = models.StringField() time = models.StringField() part_id1 = models.StringField() part_id2 = models.StringField() part_id3 = models.StringField() mod_id1 = models.StringField() mod_id2 = models.StringField() mod_id3 = models.StringField() assignment_number = models.IntegerField() Own_sco_prev_mod1 = models.IntegerField() Own_sco_prev_mod2 = models.IntegerField() Own_sco_prev_mod3 = models.IntegerField() own_total_num_answer_mod1 = models.IntegerField() own_total_num_answer_mod2 = models.IntegerField() own_total_num_answer_mod3 = models.IntegerField() own_total_score_mod1 = models.FloatField() own_total_score_mod2 = models.FloatField() own_total_score_mod3 = models.FloatField() group_total_score_mod1 = models.FloatField() group_total_score_mod2 = models.FloatField() group_total_score_mod3 = models.FloatField() ID_reassign = models.IntegerField() def to_dict(trial: Grial): return dict( question=trial.question, graphic_name=f'/static/{trial.graphic_name}', optionA=trial.optionA, optionB=trial.optionB, optionC=trial.optionC, optionD=trial.optionD, optionE=trial.optionE, option1=trial.option1, option2=trial.option2, option3=trial.option3, option4=trial.option4, option5=trial.option5, option6=trial.option6, option7=trial.option7, option8=trial.option8, solution=trial.solution, id=trial.id, key=trial.key, mod=trial.mod, q_ID=trial.q_ID, ) # def get_groups(players): # """gets all groups that these players belong to, without duplicates""" # already_added = set() # groups = [] # for p in players: # group = p.group # if group.id not in already_added: # leader = group.get_player_by_role(C.LEADER_ROLE) # T1 = group.get_player_by_role(C.PL1_ROLE) # T2 = group.get_player_by_role(C.PL2_ROLE) # labels = {'leader': leader.participant.label, 'teammeber 1': T1.participant.label, # 'teammeber 2': T2.participant.label} # already_added.add(group.id) # groups.append({'otree_group': group, 'role_id': labels}) # return groups def score_get_groups(players): """gets all groups that these players belong to, without duplicates""" already_added = set() groups = [] for p in players: group = p.group if group.id not in already_added: already_added.add(group.id) groups.append(group) return groups def vars_for_admin_report(subsession): p = subsession.get_players() groups = [g for g in get_groups(p)] data = [] for player in subsession.get_players(): data.append({ 'round_number': player.round_number, 'participant_id': player.participant.label, 'computer_id': player.participant.CID2, # Assuming CID is stored in participant.vars 'group_id': player.group.id, 'role': player.role, }) sorted_data = sorted(data, key=lambda x: x['group_id']) return dict(player_data=sorted_data, groups=groups) def get_groups(players): """gets all groups that these players belong to, without duplicates""" already_added = set() groups = [] for p in players: group = p.group if group.id not in already_added: leader = group.get_player_by_role(C.LEADER_ROLE) T1 = group.get_player_by_role(C.PL1_ROLE) T2 = group.get_player_by_role(C.PL2_ROLE) members_role = {'manager': {'label': leader.participant.label, 'CID': leader.participant.CID2}, 'worker1': {'label': T1.participant.label, 'CID': T1.participant.CID2}, 'worker2': {'label': T2.participant.label, 'CID': T2.participant.CID2}} already_added.add(group.id) groups.append({'otree_group': group, 'members': members_role}) return groups import numpy as np def assign_roleid_groups(players): """label id and roles of all groups that these players belong to""" already_added = [] for p in players: group = p.group if group.id not in already_added: already_added.append(group.id) leader = group.get_player_by_role(C.LEADER_ROLE) T1 = group.get_player_by_role(C.PL1_ROLE) T2 = group.get_player_by_role(C.PL2_ROLE) leader.participant.role_id = 'Manager' T1.participant.role_id = 'worker 1' T2.participant.role_id = 'worker 2' leader.participant.otree_group_id = group.id T1.participant.otree_group_id = group.id T2.participant.otree_group_id = group.id for p in players: for i, gr in enumerate(already_added): if p.participant.otree_group_id == gr: p.participant.group_id = i + 1 break def get_all_matrix(group_matrix, num_participants): ref = [[1, 6, 15], [2, 3, 14], [7, 5, 8], [10, 9, 12], [13, 4, 11]] gr_n = len(group_matrix) if num_participants == 15: keys = np.reshape(ref, (num_participants)).tolist() nested_matrix = [[[1, 3, 5], [2, 8, 12], [7, 9, 11], [10, 4, 15], [13, 6, 14]], [[1, 8, 11], [2, 9, 15], [7, 4, 14], [10, 6, 5], [13, 3, 12]], [[1, 9, 14], [2, 4, 5], [7, 6, 12], [10, 3, 11], [13, 8, 15]], [[1, 4, 12], [2, 6, 11], [7, 3, 15], [10, 8, 14], [13, 9, 5]]] elif num_participants == 12: ref = [[1, 5, 12], [2, 6, 11], [3, 7, 10], [4, 8, 9]] keys = np.reshape(ref, (num_participants)).tolist() nested_matrix = [[[1, 8, 11], [2, 7, 12], [3, 6, 9], [4, 5, 10]], [[1, 7, 10], [2, 8, 9], [3, 5, 11], [4, 6, 12]], [[1, 6, 9], [2, 5, 10], [3, 8, 12], [4, 7, 11]]] elif num_participants == 18: ref = [[1, 3, 13], [2, 10, 14], [7, 9, 18], [4, 6, 16], [5, 8, 15], [11, 12, 17]] keys = np.reshape(ref, (num_participants)).tolist() nested_matrix = [[[1, 10, 17], [2, 6, 15], [7, 8, 14], [4, 12, 18], [5, 3, 16], [11, 9, 13]], [[1, 9, 18], [2, 12, 16], [7, 3, 17], [4, 8, 10], [5, 14, 13], [11, 6, 15]], [[1, 12, 15], [2, 9, 18], [7, 6, 16], [4, 3, 13], [5, 10, 17], [11, 8, 14]]] else: I = [[2, 1, 3]] keys = np.reshape(I, (num_participants)).tolist() nested_matrix = [[[2, 3, 1]], [[2, 1, 3]]] shuffled_nested_matrix = [] shuffled_nested_matrix.append(group_matrix) for shuffled_matrix in nested_matrix: shuffled_matrix = np.reshape(shuffled_matrix, (num_participants)).tolist() group_matrix = np.reshape(group_matrix, (num_participants)).tolist() merged = dict(zip(shuffled_matrix, group_matrix)) result = [merged[key] for key in keys] shuffled_nested_matrix.append(np.reshape(result, (gr_n, 3)).tolist()) return shuffled_nested_matrix class WaitPage_Gr(WaitPage): wait_for_all_groups = True body_text = "Waiting for all players to arrive before starting Group Task" @staticmethod def is_displayed(player): return player.round_number < 6 @staticmethod def after_all_players_arrive(subsession: Subsession): global matrix import random session = subsession.session # get list of computers. comp = [] for c in subsession.get_players(): comp.append(c.participant.CID) ran = session.random print(ran) comp = sorted(comp) p = subsession.get_players() print('CID', comp) # get corresponding players. CID_players = sorted(p, key=lambda player: player.participant.CID, reverse=True) if subsession.round_number == 1: # Sort players by score if ran: sorted_players = sorted(p, key=lambda player: player.participant.score, reverse=True) else: sorted_players = sorted(p, key=lambda player: player.participant.like_leader, reverse=True) # Divide players into three groups (high, mid, low) high_score = sorted_players[:len(sorted_players) // 3] mid_score = sorted_players[len(sorted_players) // 3:2 * len(sorted_players) // 3] low_score = sorted_players[2 * len(sorted_players) // 3:] # Group players # in group shuffle (random leader) group_matrix = [] for i in range(len(high_score)): g = [high_score[i], mid_score[i], low_score[i]] if ran: random.shuffle(g) group_matrix.append(g) num_participants = session.num_participants for s in subsession.in_rounds(1, C.NUM_ROUNDS): s.set_group_matrix(group_matrix) matrix = subsession.get_group_matrix() session.matrices = get_all_matrix(matrix, num_participants) allmatrix = session.matrices else: round = subsession.round_number - 1 allmatrix = session.matrices group_matrix = allmatrix[round] subsession.set_group_matrix(group_matrix) for i, group in enumerate(subsession.get_groups()): group_comp = comp[i * 3:(i + 1) * 3] group_old = CID_players[i * 3:(i + 1) * 3] for j, label in enumerate(group.get_players()): if label.role == C.LEADER_ROLE: label.participant.CID2 = group_comp[1] label.participant.old = group_old[1].participant.ID_label elif label.role == C.PL1_ROLE: label.participant.CID2 = group_comp[0] label.participant.old = group_old[0].participant.ID_label else: label.participant.CID2 = group_comp[2] label.participant.old = group_old[2].participant.ID_label players = subsession.get_players() assign_roleid_groups(players) print(f'R{subsession.round_number} to R{subsession.round_number + 1} all matrix', allmatrix[:subsession.round_number + 1]) print('R1 to R3 all matrix', allmatrix[2]) print("== R1Round ", subsession.round_number, " == ") print(" R1Matching: ", subsession.get_group_matrix()) print(" R1Matching: all ", allmatrix[0]) # PAGES class R1_intro(Page): template_name = 'group/templates/R1_intro.html' form_model = 'player' form_fields = ['like'] def before_next_page(player: Player, timeout_happened): player.participant.like_leader = player.like @staticmethod def is_displayed(player): return player.round_number == 1 class Gr_switch_groups(Page): template_name = 'group/templates/Gr_switch_groups.html' class Gr_meet_groups(Page): template_name = 'group/templates/Gr_meet_groups.html' @staticmethod def vars_for_template(player: Player): return dict(players2=[p for p in player.get_others_in_group()], ) @staticmethod def is_displayed(player): return player.round_number == 1 and player.role != C.LEADER_ROLE class Gr_Instruction_all_1(Page): template_name = 'group/templates/Gr_Instruction_all_1.html' @staticmethod def is_displayed(player): return player.round_number == 1 class Gr_Instruction_all_2(Page): template_name = 'group/templates/Gr_Instruction_all_2.html' @staticmethod def is_displayed(player): return player.round_number == 1 class Gr_Instruction_all_3(Page): template_name = 'group/templates/Gr_Instruction_all_3.html' form_model = 'player' form_fields = ['q1'] @staticmethod def error_message(player: Player, values): solutions = dict(q1=2) if values != solutions: return "One or more answers were incorrect." @staticmethod def is_displayed(player): return player.round_number == 1 class Gr_Instruction_all_4(Page): template_name = 'group/templates/Gr_Instruction_all_4.html' @staticmethod def is_displayed(player): return player.round_number == 1 class Gr_Instruction_P1(Page): template_name = 'group/templates/Gr_Instruction_P1.html' @staticmethod def is_displayed(player: Player): return player.role != C.LEADER_ROLE class Gr_Instruction_P2(Page): template_name = 'group/templates/Gr_Instruction_P2.html' @staticmethod def is_displayed(player: Player): return player.role != C.LEADER_ROLE and player.round_number == 1 class Gr_Instruction_P3(Page): template_name = 'group/templates/Gr_Instruction_P3.html' @staticmethod def is_displayed(player: Player): return player.role != C.LEADER_ROLE and player.round_number == 1 class Gr_Instruction_P4(Page): template_name = 'group/templates/Gr_Instruction_P4.html' @staticmethod def is_displayed(player: Player): return player.role != C.LEADER_ROLE and player.round_number == 1 class Gr_Instruction_P5(Page): template_name = 'group/templates/Gr_Instruction_P5.html' @staticmethod def is_displayed(player: Player): return player.role != C.LEADER_ROLE and player.round_number == 1 class Gr_Instruction_L1(Page): template_name = 'group/templates/Gr_Instruction_L1.html' @staticmethod def is_displayed(player: Player): return player.role == C.LEADER_ROLE class Gr_Instruction_L2(Page): template_name = 'group/templates/Gr_Instruction_L2.html' @staticmethod def is_displayed(player: Player): return player.role == C.LEADER_ROLE and player.round_number == 1 class Gr_Instruction_L3(Page): template_name = 'group/templates/Gr_Instruction_L3.html' @staticmethod def is_displayed(player: Player): return player.role == C.LEADER_ROLE and player.round_number == 1 class Gr_Instruction_L4(Page): template_name = 'group/templates/Gr_Instruction_L4.html' @staticmethod def is_displayed(player: Player): return player.role == C.LEADER_ROLE and player.round_number == 1 class Gr_Instruction_L5(Page): template_name = 'group/templates/Gr_Instruction_L5.html' @staticmethod def is_displayed(player: Player): return player.role == C.LEADER_ROLE and player.round_number == 1 class Gr_Instruction_L6(Page): template_name = 'group/templates/Gr_Instruction_L6.html' @staticmethod def is_displayed(player: Player): return player.role == C.LEADER_ROLE and player.round_number == 1 @staticmethod def vars_for_template(player: Player): return dict(players=shift(player.group.get_players())) class manageL(Page): template_name = 'group/templates/manageL.html' @staticmethod def is_displayed(player): return player.role == C.LEADER_ROLE class manageT(Page): template_name = 'group/templates/manageT.html' @staticmethod def is_displayed(player): return player.role != C.LEADER_ROLE class Welcome_team(Page): template_name = 'group/templates/Welcome_team.html' timeout_seconds = 60 @staticmethod def is_displayed(player): return player.role != C.LEADER_ROLE class Welcome_teamL(Page): template_name = 'group/templates/Welcome_teamL.html' timeout_seconds = 60 @staticmethod def is_displayed(player): return player.role == C.LEADER_ROLE class Gr_AssignPage_test(Page): template_name = 'group/templates/Gr_AssignPage_test.html' form_model = 'player' form_fields = ['c_p2_practice', 'c_l_practice', 'c_p1_practice'] @staticmethod def is_displayed(player: Player): return (player.round_number == 1) and player.role == C.LEADER_ROLE @staticmethod def vars_for_template(player: Player): group = player.group lead = group.get_player_by_role(C.LEADER_ROLE) return dict(players=shift(player.group.get_players()), min_score=min([lead.participant.g1_correct_n, lead.participant.g1_correct_a, lead.participant.g1_correct_s])) @staticmethod def error_message(player: Player, values): solutions = dict(c_l_practice='Spatial', c_p1_practice='Numerical', c_p2_practice='Analogical') if values != solutions: return "One or more selections were incorrect." class Gr_AssignPage_test2(Page): template_name = 'group/templates/Gr_AssignPage_test2.html' form_model = 'player' form_fields = ['c_p2_practice2', 'c_l_practice2', 'c_p1_practice2'] @staticmethod def is_displayed(player: Player): return (player.round_number == 1) and player.role == C.LEADER_ROLE @staticmethod def vars_for_template(player: Player): group = player.group lead = group.get_player_by_role(C.LEADER_ROLE) return dict(players=shift(player.group.get_players()), min_score=min([lead.participant.g1_correct_n, lead.participant.g1_correct_a, lead.participant.g1_correct_s])) @staticmethod def error_message(player: Player, values): solutions = dict(c_l_practice2='Analogical', c_p1_practice2='Analogical', c_p2_practice2='Analogical') if values != solutions: return "One or more selections were incorrect." class MyPage(Page): @staticmethod def is_displayed(player: Player): return player.round_number < 6 @staticmethod def vars_for_template(player: Player): p = player.subsession.get_players() groups = [g for g in get_groups(p)] allgroups = player.session.matrices return dict(groups=groups, allgroups=allgroups, players2=[p for p in player.get_others_in_group()], ) class tasks_test(Page): template_name = 'group/templates/tasks_test.html' @staticmethod def is_displayed(player: Player): return (player.round_number == 1) and player.role == C.LEADER_ROLE def shift(seq): return [seq[-1]] + seq[:-1] class Gr_AssignPage(Page): template_name = 'group/templates/Gr_AssignPage.html' form_model = 'player' form_fields = ['g1_c_p2', 'g1_c_l', 'g1_c_p1'] @staticmethod def is_displayed(player: Player): participant = player.participant return player.role == C.LEADER_ROLE @staticmethod def vars_for_template(player: Player): # print('ping', player.group.get_players()) return dict(players=shift(player.group.get_players())) @staticmethod def before_next_page(player: Player, timeout_happened): group = player.group try: player.email = player.participant.EM except: player.participant.EM = 'NA' player.email = player.participant.EM lead = group.get_player_by_id(1) p1 = group.get_player_by_id(2) p2 = group.get_player_by_id(3) p1.task = lead.g1_c_p1 p2.task = lead.g1_c_p2 lead.task = lead.g1_c_l for p in group.get_players(): p.Index_mod1 = 0 p.Index_mod2 = 0 p.Index_mod3 = 0 # Check If more than one player has the same task, if so: let one player start at index 0 and the other at index 1 and so on tasks = ['Numerical', 'Analogical', 'Spatial'] # if more than one player has the same task in 'Numerical' module add to Index_mod1, if more than one player has the same task in 'Analogical' module add to Index_mod2, if more than one player has the same task in 'Spatial' module add to Index_mod3 players_num = [p for p in group.get_players() if p.task == 'Numerical'] players_ana = [p for p in group.get_players() if p.task == 'Analogical'] players_spa = [p for p in group.get_players() if p.task == 'Spatial'] if len(players_num) > 1: for i, p in enumerate(players_num): p.Index_mod1 = i if len(players_ana) > 1: for i, p in enumerate(players_ana): p.Index_mod2 = i if len(players_spa) > 1: for i, p in enumerate(players_spa): p.Index_mod3 = i group.Index_mod1 = 0 group.Index_mod2 = 0 group.Index_mod3 = 0 group.reassign = 0 import time t = time.localtime() current_time = time.strftime("%H:%M:%S", t) reassign = group.reassign count_score.create(pcode=player.participant.code, email=player.email, par_id=player.participant.label, par_id_in_session=player.participant.id_in_session, role=player.role, groupID=player.participant.group_id, otree_groupID=player.group.id, groupRound=player.round_number, assignment_number=reassign, time=current_time, part_id1=player.participant.label, part_id2=p1.participant.label, part_id3=p2.participant.label, mod_id1=player.task, mod_id2=p1.task, mod_id3=p2.task) class WaitForShuffle(WaitPage): body_text = "Waiting for other players" @staticmethod def after_all_players_arrive(group: Group): p = group.get_players() for pp in p: N = pp.round_number path = '_static/R{}Qs.csv'.format(N) sec = pp.participant.timer stimuli = read_csv(path) for stim in stimuli: # print('stim is', stim) # ** is the Python operator to unpack the dict Grial.create(player=pp, **stim) stimuli = [to_dict(trial) for trial in Grial.filter(player=pp)] mod_dict = {} for stim in stimuli: mod_key = stim['mod'] if mod_key not in mod_dict: mod_dict[mod_key] = [] mod_dict[mod_key].append(stim) if pp.task == 'Numerical': pp.current_q = json.dumps(mod_dict['num'][0]) elif pp.task == 'ana': pp.current_q = json.dumps(mod_dict['ana'][0]) elif pp.task == 'spa': pp.current_q = json.dumps(mod_dict['spa'][0]) groupRound = pp.round_number group.trails_mod1 = json.dumps(mod_dict['num']) group.trails_mod2 = json.dumps(mod_dict['ana']) group.trails_mod3 = json.dumps(mod_dict['spa']) group.num_trials = len(mod_dict['num']) group.ana_trials = len(mod_dict['ana']) group.spa_trials = len(mod_dict['spa']) graphic_names = [item['graphic_name'] for item in mod_dict['spa']] group.graphic_names = json.dumps(graphic_names) import time # print('mod_dict is', mod_dict['num']) # group.session.expiry = time.monotonic() + 6 * 60 group.deadline = time.monotonic() + int(sec) * 60 def get_current_trial(player: Player): task = player.task if task == 'Numerical': index = player.Index_mod1 trails = json.loads(player.group.trails_mod1) player.q_ID = player.Index_mod1 elif task == 'Analogical': index = player.Index_mod2 trails = json.loads(player.group.trails_mod2) player.q_ID = player.Index_mod2 elif task == 'Spatial': index = player.Index_mod3 trails = json.loads(player.group.trails_mod3) player.q_ID = player.Index_mod3 print('index is', index) try: current = trails[index] except IndexError: if task == 'Numerical': player.group.nomore_num = True elif task == 'Analogical': player.group.nomore_ana = True elif task == 'Spatial': player.group.nomore_spa = True return dict(error=True, msg=f'No more questions in the {task} module') print('trails is', trails[index]) player.solution = trails[index]['solution'] return trails[index] def get_current_score(player: Player): task = player.task if task == 'Numerical': score = player.group.score_mod1 elif task == 'Analogical': score = player.group.score_mod2 elif task == 'Spatial': score = player.group.score_mod3 return score def add_to_index(player: Player, task): if task == 'Numerical': player.Index_mod1 = player.group.Index_mod1 + 1 elif task == 'Analogical': player.Index_mod2 = player.group.Index_mod2 + 1 elif task == 'Spatial': player.Index_mod3 = player.group.Index_mod3 + 1 def get_current_score_by_mod(player: Player, task): if task == 'Numerical': score = player.group.score_mod1 elif task == 'Analogical': score = player.group.score_mod2 elif task == 'Spatial': score = player.group.score_mod3 return score def count_assign(player: Player, id, task): pl = player.group.get_player_by_id(id) add_to_index(pl, task) import time group = player.group t = time.localtime() current_time = time.strftime("%H:%M:%S", t) reassign = group.reassign lead = group.get_player_by_id(1) p1 = group.get_player_by_id(2) p2 = group.get_player_by_id(3) p1_score_num = prev_task_score(lead, 'Numerical') p1_score_ana = prev_task_score(lead, 'Analogical') p1_score_spa = prev_task_score(lead, 'Spatial') p2_score_num = prev_task_score(p1, 'Numerical') p2_score_ana = prev_task_score(p1, 'Analogical') p2_score_spa = prev_task_score(p1, 'Spatial') p3_score_num = prev_task_score(p2, 'Numerical') p3_score_ana = prev_task_score(p2, 'Analogical') p3_score_spa = prev_task_score(p2, 'Spatial') group_total_score_mod1 = get_current_score_by_mod(player, 'Numerical') group_total_score_mod2 = get_current_score_by_mod(player, 'Analogical') group_total_score_mod3 = get_current_score_by_mod(player, 'Spatial') pl.task = task print('player task', pl.task) if id == 1: count_score.create(pcode=player.participant.code, email=player.email, par_id=player.participant.label, par_id_in_session=player.participant.id_in_session, role=player.role, groupID=player.participant.group_id, otree_groupID=player.group.id, groupRound=player.round_number, assignment_number=reassign, time=current_time, part_id1=player.participant.label, part_id2=p1.participant.label, part_id3=p2.participant.label, mod_id1=player.task, mod_id2=p1.task, mod_id3=p2.task, Own_sco_prev_mod1=p1_score_num, Own_sco_prev_mod2=p1_score_ana, Own_sco_prev_mod3=p1_score_spa, ID_reassign=id, group_total_score_mod1=group_total_score_mod1, group_total_score_mod2=group_total_score_mod2, group_total_score_mod3=group_total_score_mod3) elif id == 2: count_score.create(pcode=player.participant.code, email=player.email, par_id=player.participant.label, par_id_in_session=player.participant.id_in_session, role=player.role, groupID=player.participant.group_id, otree_groupID=player.group.id, groupRound=player.round_number, assignment_number=reassign, time=current_time, part_id1=lead.participant.label, part_id2=p1.participant.label, part_id3=p2.participant.label, mod_id1=lead.task, mod_id2=p1.task, mod_id3=p2.task, Own_sco_prev_mod1=p2_score_num, Own_sco_prev_mod2=p2_score_ana, Own_sco_prev_mod3=p2_score_spa, ID_reassign=id, group_total_score_mod1=group_total_score_mod1, group_total_score_mod2=group_total_score_mod2, group_total_score_mod3=group_total_score_mod3) elif id == 3: count_score.create(pcode=player.participant.code, email=player.email, par_id=player.participant.label, par_id_in_session=player.participant.id_in_session, role=player.role, groupID=player.participant.group_id, otree_groupID=player.group.id, groupRound=player.round_number, assignment_number=reassign, time=current_time, part_id1=lead.participant.label, part_id2=p1.participant.label, part_id3=p2.participant.label, mod_id1=lead.task, mod_id2=p1.task, mod_id3=p2.task, Own_sco_prev_mod1=p3_score_num, Own_sco_prev_mod2=p3_score_ana, Own_sco_prev_mod3=p3_score_spa, ID_reassign=id, group_total_score_mod1=group_total_score_mod1, group_total_score_mod2=group_total_score_mod2, group_total_score_mod3=group_total_score_mod3) def prev_task_score(player: Player, task): """ if player.participant.task == "Numerical" --> return player.participant.own_correct_n if player.participant.task == "Analogical" --> return player.participant.own_correct_a if player.participant.task == "Spatial" --> return player.participant.own_correct_s :param player: :return: players score in previous task """ if task == "Numerical": return player.own_total_score_mod1 elif task == "Analogical": return player.own_total_score_mod2 elif task == "Spatial": return player.own_total_score_mod3 def make_score_task(player: Player, task, response): group = player.group if task == "num": index = player.Index_mod1 trails = json.loads(player.group.trails_mod1) trail = trails[index] is_correct = response == str(trail['solution']) if not is_correct: group.score_mod1 += -0.5 player.own_total_score_mod1 += -0.5 elif is_correct: group.score_mod1 += 1 player.own_total_score_mod1 += 1 player.Index_mod1 = max([p.Index_mod1 for p in group.get_players()]) + 1 group.Index_mod1 = player.Index_mod1 group.itt_num += 1 player.own_total_num_answer_mod1 += 1 elif task == "ana": index = player.Index_mod2 trails = json.loads(player.group.trails_mod2) trail = trails[index] is_correct = response == str(trail['solution']) if not is_correct: group.score_mod2 += -0.5 player.own_total_score_mod2 += -0.5 elif is_correct: group.score_mod2 += 1 player.own_total_score_mod2 += 1 player.Index_mod2 = max([p.Index_mod2 for p in group.get_players()]) + 1 group.Index_mod2 = player.Index_mod2 group.itt_num += 1 player.own_total_num_answer_mod2 += 1 elif task == "spa": index = player.Index_mod3 trails = json.loads(player.group.trails_mod3) trail = trails[index] is_correct = response == str(trail['solution']) if not is_correct: group.score_mod3 += -0.5 player.own_total_score_mod3 += -0.5 elif is_correct: group.score_mod3 += 1 player.own_total_score_mod3 += 1 player.Index_mod3 = max([p.Index_mod3 for p in group.get_players()]) + 1 group.Index_mod3 = player.Index_mod3 group.itt_num += 1 player.own_total_num_answer_mod3 += 1 def live_method(player: Player, data): group = player.group my_id = player.id_in_group index1 = group.Index_mod1 index2 = group.Index_mod2 index3 = group.Index_mod3 msg_type = data['type'] if msg_type != 'load': if group.nomore_num and group.nomore_ana and group.nomore_spa: return {0: dict(finished=True)} if msg_type == 'assign': id = data['id'] task = str(data['task']) print('assign', f'id{id} to task {task}') pl = group.get_player_by_id(int(id)) group.reassign += 1 count_assign(player, int(id), task) # check that len index is not greater than index if task == 'Numerical': if index1 > group.num_trials - 1: group.nomore_num = True return {my_id: dict(error=True, msg='No more questions in the Nummerical module')} elif task == 'Analogical': if index2 > group.ana_trials - 1: group.nomore_ana = True return {my_id: dict(error=True, msg='No more questions in the Analogical module')} elif task == 'Spatial': if index3 > group.spa_trials - 1: group.nomore_spa = True return {my_id: dict(error=True, msg='No more questions in the Spatial module')} if msg_type == 'response': if player.task == 'Numerical' and group.nomore_num: return {my_id: dict(error=True, msg='No more questions in the Numerical module: ask to be reassigned')} if player.task == 'Analogical' and group.nomore_ana: return {my_id: dict(error=True, msg='No more questions in the Analogical module: ask to be reassigned')} if player.task == 'Spatial' and group.nomore_spa: return {my_id: dict(error=True, msg='No more questions in the Spatial module: ask to be reassigned')} trial = get_current_trial(player) print('trial has solution', trial['solution']) mod = trial['mod'] print('mod is', mod) response = data['response'].split('option')[-1] print(response) make_score_task(player, mod, response) group.min_score = min([group.score_mod1, group.score_mod2, group.score_mod3]) current = get_current_trial(player) if 'error' in current: return {my_id: current} return { p.id_in_group: dict(task=p.task, current_q=get_current_trial(p), score=get_current_score(p), min_score=p.group.min_score, is_leader=p.role == C.LEADER_ROLE) for p in group.get_players() } class WaitForGroups(WaitPage): wait_for_all_groups = True body_text = "Waiting for all groups to start playing" def get_groups_high_low_score(players): """gets all groups that these players belong to, without duplicates""" already_added = set() group_scores = [] for p in players: group = p.group if p.participant.group_id not in already_added: min_score = min([group.score_mod1, group.score_mod2, group.score_mod3]) already_added.add(p.participant.group_id) group_scores.append(min_score) low = min(group_scores) high = max(group_scores) return high, low class GamePage(Page): live_method = live_method timeout_seconds = 60 * 8 @staticmethod def vars_for_template(player: Player): group = player.group leader = group.get_player_by_role(C.LEADER_ROLE) T1 = group.get_player_by_role(C.PL1_ROLE) T2 = group.get_player_by_role(C.PL2_ROLE) MOD = [dict(role='Worker to your right', id=T2.id_in_group, labels=C.MOD, task=T2.task), dict(role='Manager (You)', id=leader.id_in_group, labels=C.MOD, task=leader.task), dict(role='Worker to your left', id=T1.id_in_group, labels=C.MOD, task=T1.task)] return dict(MOD=MOD, players=shift(player.group.get_players()), ) @staticmethod def js_vars(player: Player): return dict(my_id=player.id_in_group, my_role=player.role, ) # @staticmethod # def get_timeout_seconds(player: Player): # import time # group = player.group # return group.deadline - time.monotonic() @staticmethod def before_next_page(player: Player, timeout_happened): group = player.group try: player.email = player.participant.EM except: player.participant.EM = 'NA' player.email = player.participant.EM try: player.random_leader = player.session.config['random_assign'] random_assign = player.session.config['random_assign'] except: random_assign = player.session.config['random_assign'] count_score.create(random_leader=random_assign, pcode=player.participant.code, email=player.email, par_id=player.participant.label, par_id_in_session=player.participant.id_in_session, like_to_be_leader=player.participant.like_leader, role=player.role, groupID=player.participant.group_id, otree_groupID=player.group.id, groupRound=player.round_number, assignment_number=group.reassign, own_total_num_answer_mod1=player.own_total_num_answer_mod1, own_total_num_answer_mod2=player.own_total_num_answer_mod2, own_total_num_answer_mod3=player.own_total_num_answer_mod3, own_total_score_mod1=player.own_total_score_mod1, own_total_score_mod2=player.own_total_score_mod2, own_total_score_mod3=player.own_total_score_mod3) group.p_group_id = player.participant.group_id # don't need it anymore def sort_ranking(players, round): """Sorts the ranking of players in a round with any number of groups""" group_rankings = {} already_added = set() group_scores = [] if round == 1: for p in players: p.participant.player_rank_group = {} for p in players: group = p.group group_id = p.participant.group_id score = min([group.score_mod1, group.score_mod2, group.score_mod3]) print('group score same', score == group.min_score) p.participant.player_rank_group[round] = {'group_id': group_id, 'group_score': score} if group_id not in already_added: already_added.add(p.participant.group_id) group_scores.append(score) group_rankings[group_id] = score # Sort the dictionary items by score in descending order sorted_groups = sorted(group_rankings.items(), key=lambda x: x[1], reverse=True) # Check if the list has at least one element if sorted_groups: # The highest-scoring group will be at index 0 highest_scoring_group = sorted_groups[0] # The list you want with the highest-scoring group at index 1 sorted_group_list = [(group_id, score) for group_id, score in sorted_groups] print("Highest Scoring Group:", highest_scoring_group) print("Sorted Group List:", sorted_group_list) else: print("The dictionary is empty.") return sorted_group_list class score_wait(WaitPage): wait_for_all_groups = True body_text = "Waiting for all players to finish playing this round of Group Tasks." @staticmethod def after_all_players_arrive(subsession: Subsession): p = subsession.get_players() high, low = get_groups_high_low_score(p) round_n = p[0].round_number print(round_n) sorted_group_ranks = sort_ranking(p, p[0].round_number) for player in p: player.high, player.low = high, low if player.round_number == 1: player.participant.rank_R1 = sorted_group_ranks elif player.round_number == 2: player.participant.rank_R2 = sorted_group_ranks elif player.round_number == 3: player.participant.rank_R3 = sorted_group_ranks elif player.round_number == 4: player.participant.rank_R4 = sorted_group_ranks for g in score_get_groups(p): g.high, g.low = high, low count_score.create(groupID=g.p_group_id, otree_groupID=g.id, groupRound=round_n, rank=json.dumps(sorted_group_ranks), high=g.high, low=g.low, score_mod1=g.score_mod1, score_mod2=g.score_mod2, score_mod3=g.score_mod3, min_score=g.min_score, itt_num=g.itt_num, itt_num_mod1=g.Index_mod1, itt_num_mod2=g.Index_mod2, itt_num_mod3=g.Index_mod3) class Gr_final(Page): template_name = 'group/templates/Gr_final.html' @staticmethod def vars_for_template(player: Player): sorted_group_ranks = player.participant.rank_R1 high = player.high low = player.low return dict(high=high, low=low, sorted_group_ranks_R1=sorted_group_ranks) class LeaderM(Page): template_name = 'group/templates/LeaderM.html' @staticmethod def is_displayed(player: Player): return (player.round_number == C.NUM_ROUNDS) and player.role == C.LEADER_ROLE @staticmethod def vars_for_template(player: Player): selcted_round = player.participant.random_round_selection players_group = player.participant.player_rank_group[selcted_round]['group_id'] players_score = player.participant.player_rank_group[selcted_round]['group_score'] if selcted_round == 1: sorted_groups = player.participant.rank_R1 elif selcted_round == 2: sorted_groups = player.participant.rank_R2 elif selcted_round == 3: sorted_groups = player.participant.rank_R3 elif selcted_round == 4: sorted_groups = player.participant.rank_R4 # Calculate the thresholds total_players = len(sorted_groups) top_40_percent_threshold = int(total_players * 0.4) bottom_40_percent_threshold = int(total_players * 0.6) players_rank = None players_price = None for i, (group_id, score) in enumerate(sorted_groups, 1): if group_id == players_group: players_rank = i if i <= top_40_percent_threshold: players_price = 37 elif i > bottom_40_percent_threshold: players_price = 29 else: players_price = 33 return dict(sorted_groups=sorted_groups, players_group=players_group, players_score=players_score, selcted_round=selcted_round, players_rank=players_rank, players_price=players_price) class teamM(Page): template_name = 'group/templates/teamM.html' @staticmethod def is_displayed(player: Player): return (player.round_number == C.NUM_ROUNDS) and player.role != C.LEADER_ROLE page_sequence = [R1_intro, WaitPage_Gr, Gr_switch_groups, manageL, manageT, Gr_meet_groups, WaitForGroups, Welcome_team, Welcome_teamL, Gr_Instruction_all_1, manageL, manageT, Gr_Instruction_L1, Gr_Instruction_P1, Gr_Instruction_P2, Gr_Instruction_P3, Gr_Instruction_all_2, Gr_Instruction_all_3, Gr_Instruction_L2, Gr_Instruction_L3, Gr_Instruction_L4, Gr_Instruction_L5, Gr_Instruction_L6, Gr_Instruction_P4, Gr_Instruction_P5, Gr_AssignPage_test, Gr_AssignPage_test2, tasks_test, Gr_AssignPage, WaitForShuffle, WaitForGroups, GamePage, score_wait, Gr_final, LeaderM, teamM] # page_sequence = [R1_intro, WaitPage_Gr, Gr_AssignPage, WaitForShuffle, WaitForGroups, # GamePage, score_wait, Gr_final, LeaderM, teamM] def custom_export(players): yield ['random_leader', 'participant_code', 'participant_EM', 'participant_label', 'id_in_session', 'like_to_be_leader', 'role', 'GroupRound', 'groupID', 'Otree_groupID', 'score_mod1', 'score_mod2', 'score_mod3', 'min_score', 'total_number_answered', 'number_answered_mod1', 'number_answered_mod2', 'number_answered_mod3', 'rank_groups', 'high', 'low', 'time', 'assignment_number', 'part_id1', 'part_id2', 'part_id3', 'mod_id1', 'mod_id2', 'mod_id3', 'ID_reassign', 'Own_sco_mod1', 'Own_sco_mod2', 'Own_sco_mod3', 'group_score_mod1', 'group_score_mod2', 'group_score_mod3', 'own_total_score_mod1', 'own_total_score_mod2', 'own_total_score_mod3', 'own_total_num_answer_mod1', 'own_total_num_answer_mod2', 'own_total_num_answer_mod3'] responses = count_score.filter() for resp in responses: yield [resp.random_leader, resp.pcode, resp.email, resp.par_id, resp.par_id_in_session, resp.like_to_be_leader, resp.role, resp.groupRound, resp.groupID, resp.otree_groupID, resp.score_mod1, resp.score_mod2, resp.score_mod3, resp.min_score, resp.itt_num, resp.itt_num_mod1, resp.itt_num_mod2, resp.itt_num_mod3, resp.rank, resp.high, resp.low, resp.time, resp.assignment_number, resp.part_id1, resp.part_id2, resp.part_id3, resp.mod_id1, resp.mod_id2, resp.mod_id3, resp.ID_reassign, resp.Own_sco_prev_mod1, resp.Own_sco_prev_mod2, resp.Own_sco_prev_mod3, resp.group_total_score_mod1, resp.group_total_score_mod2, resp.group_total_score_mod3, resp.own_total_score_mod1, resp.own_total_score_mod2, resp.own_total_score_mod3, resp.own_total_num_answer_mod1, resp.own_total_num_answer_mod2, resp.own_total_num_answer_mod3]