from otree.api import *
import json
import os
import random

doc = """
Puzzle-solving RPI experiment with 4 conditions and multiple rounds.
"""


class C(BaseConstants):
    """App-wide constants."""

    NAME_IN_URL = 'puzzle_rpi'
    PLAYERS_PER_GROUP = 4
    NUM_ROUNDS = 3
    PUZZLE_TIME = 240  # 4 minutes
    TARGET_POINTS = 1000  # Customize as needed
    CONDITIONS = ['target_level', 'target_nonlevel', 'absolute_level', 'absolute_nonlevel']
    # Puzzle file name per difficulty and round number; load_puzzles() resolves
    # the names relative to this module's directory.
    PUZZLE_SETS = {
        'easy': {
            1: 'round1_easy.json',
            2: 'round2_easy.json',
            3: 'round3_easy.json',
        },
        'difficult': {
            1: 'round1_difficult.json',
            2: 'round2_difficult.json',
            3: 'round3_difficult.json',
        },
    }


class Subsession(BaseSubsession):
    # NOTE(review): the rest of this app uses the function-based ("no-self")
    # page API, where oTree documents creating_session as a module-level
    # function -- confirm this method is actually invoked.
    def creating_session(self):
        """Reuse the round-1 group structure in every later round."""
        if self.round_number > 1:
            self.group_like_round(1)


class Group(BaseGroup):
    def calculate_ranking(self):
        """Compute round_blocks, performance_metric and round_rank for each member.

        In 'target' conditions the metric is round_points as a percentage of
        C.TARGET_POINTS; otherwise it is the raw round_points. Ranks are
        assigned in descending metric order and tied players share a rank
        (1, 2, 2, 4, ...).
        """
        # Use .get() like every page does, so a missing key degrades to the
        # absolute metric instead of raising KeyError on the feedback page.
        condition = self.session.vars.get(f'group_condition_{self.id_in_subsession}', '')
        is_target = 'target' in condition

        players = self.get_players()
        for p in players:
            block_list = json.loads(p.round_feedback_flattened or '[]')
            p.round_blocks = sum(block_list)
            if is_target:
                p.performance_metric = (p.round_points / C.TARGET_POINTS) * 100
            else:
                p.performance_metric = p.round_points

        ranked = sorted(players, key=lambda pl: pl.performance_metric, reverse=True)
        current_rank = 1
        last_score = None
        eps = 1e-9  # metrics closer than this count as a tie
        for position, pl in enumerate(ranked, start=1):
            if last_score is None or abs(pl.performance_metric - last_score) > eps:
                current_rank = position
            pl.round_rank = current_rank
            last_score = pl.performance_metric


def _likert7():
    """Return a fresh 7-point horizontal radio field with blank label and captions."""
    return models.IntegerField(
        label=" ",
        choices=[[value, " "] for value in range(1, 8)],
        widget=widgets.RadioSelectHorizontal,
    )


class Player(BasePlayer):
    # --- puzzle progress and scoring -------------------------------------
    total_points = models.IntegerField(initial=0)
    current_puzzle_index = models.IntegerField(initial=0)
    current_puzzle_number = models.IntegerField(initial=1)
    # JSON list of the cell updates received for the current puzzle
    puzzleState = models.LongStringField(initial=json.dumps([]))
    round_blocks = models.IntegerField(initial=0)
    # JSON list with the number of correct blocks of each scored puzzle
    round_feedback_flattened = models.LongStringField(initial=json.dumps([]))
    round_rank = models.IntegerField(initial=0)
    # percentage of C.TARGET_POINTS (target conditions) or raw round points
    performance_metric = models.FloatField()
    round_points = models.IntegerField(initial=0)
    fully_correct_puzzles = models.IntegerField(initial=0)

    # --- identity ----------------------------------------------------------
    first_name = models.StringField(label="First name:")
    dream_vacation = models.StringField(label="Location of your dream vacation:")
    name = models.StringField()  # concatenated display name, e.g. "Kate (London)"

    # --- manual condition / group selection ---------------------------------
    condition_choice = models.StringField(
        choices=[
            ('target_level', 'Target RPI / Level Playing Field'),
            ('target_nonlevel', 'Target RPI / Non-Level Playing Field'),
            ('absolute_level', 'Absolute RPI / Level Playing Field'),
            ('absolute_nonlevel', 'Absolute RPI / Non-Level Playing Field'),
        ],
        label="Please select your assigned condition:",
        widget=widgets.RadioSelect,
    )
    manual_group = models.StringField(
        choices=[
            ('1', 'Group 1'),
            ('2', 'Group 2'),
            ('3', 'Group 3'),
            ('4', 'Group 4'),
            ('5', 'Group 5'),
        ],
        label="Please select your group:",
        widget=widgets.RadioSelect,
    )

    # --- manipulation check --------------------------------------------------
    puzzle_similarity = models.IntegerField(
        choices=[(1, "Yes"), (2, "No")],
        label=" ",
        widget=widgets.RadioSelect,
    )
    performance_basis = models.IntegerField(
        label=" ",
        choices=[[1, "True"], [2, "False"]],
        widget=widgets.RadioSelect,
    )

    # --- post-experiment questionnaire (7-point scales) ----------------------
    own_difficulty = _likert7()
    peer_difficulty = _likert7()
    fairness = _likert7()
    effort = _likert7()
    puzzle_set = _likert7()
    think_about_ranking = _likert7()
    nervous = _likert7()
    distraction = _likert7()
    motivation = _likert7()
    familiarity = _likert7()

    # --- demographics --------------------------------------------------------
    gender = models.StringField(
        label=" ",
        choices=["Male", "Female", "Other", "Prefer not to say"],
        widget=widgets.RadioSelect,
    )
    grade_status = models.StringField(
        label=" ",
        choices=["Freshman", "Sophomore", "Junior", "Senior", "Graduate"],
        widget=widgets.RadioSelect,
    )
    gpa = models.FloatField(label=" ", min=0, max=4)


def _group_condition(player, default=''):
    """Return the condition stored in session.vars for the player's group."""
    key = f'group_condition_{player.group.id_in_subsession}'
    return player.session.vars.get(key, default)


def _is_level_condition(condition):
    """Return True for the level-playing-field conditions.

    A plain substring test ('level' in condition) is wrong here because
    'nonlevel' also contains 'level'; the suffix check tells them apart.
    """
    return condition.endswith('_level')


def _latest_submissions(submitted_state):
    """Map (row, col) to the most recently submitted value for that cell."""
    latest = {}
    for item in submitted_state:
        if not isinstance(item, dict):
            continue  # malformed client update; it cannot match any cell
        try:
            # Later updates overwrite earlier ones, so the last one wins.
            latest[(item.get('row'), item.get('col'))] = item.get('value')
        except TypeError:
            continue  # unhashable coordinates cannot match any cell
    return latest


def load_puzzles(player: Player):
    """Load the puzzle list for the player's difficulty and current round.

    Reads participant.vars['puzzle_difficulty'] and opens the JSON file named
    in C.PUZZLE_SETS, located next to this module.
    """
    difficulty = player.participant.vars['puzzle_difficulty']
    filename = C.PUZZLE_SETS[difficulty][player.round_number]
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)


def calculate_scores(player: Player):
    """Score the player's current puzzle and update their running totals.

    Each cell whose latest submitted value matches the solution (compared as
    strings) earns 5 points; a fully correct puzzle earns a 45-point bonus.
    Updates total_points, round_points, fully_correct_puzzles and appends the
    number of correct cells to round_feedback_flattened.

    Returns:
        (puzzlepoints, bonuspoints, feedback), where feedback is a grid of
        booleans with the same shape as the puzzle's solution.
    """
    puzzles = load_puzzles(player)
    solution = puzzles[player.current_puzzle_index]["solution"]
    latest = _latest_submissions(json.loads(player.puzzleState))

    feedback = [
        [str(latest.get((r, c))) == str(expected) for c, expected in enumerate(row)]
        for r, row in enumerate(solution)
    ]
    blocks_correct = sum(cell for row in feedback for cell in row)
    puzzlepoints = 5 * blocks_correct

    bonuspoints = 0
    if all(all(row) for row in feedback):
        bonuspoints = 45
        player.fully_correct_puzzles += 1

    earned = puzzlepoints + bonuspoints
    player.total_points += earned
    player.round_points += earned

    # Track block correctness per puzzle
    current_feedback = json.loads(player.round_feedback_flattened or '[]')
    current_feedback.append(blocks_correct)
    player.round_feedback_flattened = json.dumps(current_feedback)

    return puzzlepoints, bonuspoints, feedback


class ConditionSelect(Page):
    """Round-1 form where the participant enters their condition and group."""

    form_model = 'player'
    form_fields = ['condition_choice', 'manual_group']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class GroupWaitPage(WaitPage):
    """Round-1 wait page grouping players by (condition_choice, manual_group)."""

    group_by_arrival_time = True

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    # IMPORTANT: instance method, not static; use (self, waiting_players)
    def group_by_arrival_time_method(self, waiting_players):
        """Return the players to group together, or None to keep waiting."""
        me = self.player  # current player on this wait page
        combo_key = (me.condition_choice, me.manual_group)

        # only include players who selected the same condition AND the same manual group
        matching = [
            p for p in waiting_players
            if (p.condition_choice, p.manual_group) == combo_key
        ]

        required = self.session.config.get('players_per_group', C.PLAYERS_PER_GROUP)

        # Form the group as soon as the combo is full. Using >= with a slice
        # (rather than ==) avoids waiting forever if more than `required`
        # matching players are ever waiting at once.
        if len(matching) >= required:
            return matching[:required]
        return None

    @staticmethod
    def after_all_players_arrive(group: Group):
        """Record the group's condition and assign each member a puzzle difficulty.

        Level conditions: every member gets the same randomly chosen
        difficulty. Non-level conditions: difficulties are split as evenly as
        possible between 'easy' and 'difficult' and shuffled across members.
        """
        players = group.get_players()
        condition = players[0].condition_choice
        group.session.vars[f'group_condition_{group.id_in_subsession}'] = condition

        if _is_level_condition(condition):
            shared = random.choice(['easy', 'difficult'])
            difficulties = [shared] * len(players)
        else:
            # Alternate so any group size gets a difficulty for every member;
            # for 4 players this is two 'easy' and two 'difficult'.
            difficulties = [
                'easy' if i % 2 == 0 else 'difficult' for i in range(len(players))
            ]
            random.shuffle(difficulties)

        for p, diff in zip(players, difficulties):
            p.participant.vars['puzzle_difficulty'] = diff


class Instructions1(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class Instructions2(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class Instructions3(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class Instructions4(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class Instructions5(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        return dict(condition=_group_condition(player))


class Instructions6(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        return dict(condition=_group_condition(player))


class PracticeStart(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class PracticePuzzle1(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class PracticePuzzle2(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class NameEntry(Page):
    """Round-1 form collecting the pieces of the participant's display name."""

    form_model = 'player'
    form_fields = ['first_name', 'dream_vacation']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        return dict(condition=_group_condition(player))

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # Concatenate into one combined display name
        player.name = f"{player.first_name} ({player.dream_vacation})"
        player.participant.vars['name'] = player.name


class PuzzleWait(WaitPage):
    pass


class RoundStart(Page):
    pass


class Puzzle(Page):
    """Timed puzzle page driven by live messages of type reset/update/submit."""

    timeout_seconds = C.PUZZLE_TIME

    @staticmethod
    def vars_for_template(player: Player):
        puzzles = load_puzzles(player)
        puzzle = puzzles[player.current_puzzle_index]
        return {
            'puzzle_data': json.dumps(puzzle),
            'puzzle_number': player.current_puzzle_number,
        }

    @staticmethod
    def live_method(player: Player, data):
        """Handle a live message and reply to the sending player only.

        'reset' clears the stored puzzle state, 'update' appends one cell
        update to it, and 'submit' scores the current puzzle and either
        advances to the next puzzle or signals the end of the set. Any other
        message type is ignored.
        """
        msg_type = data['type']

        if msg_type == 'reset':
            player.puzzleState = json.dumps([])
            return {player.id_in_group: {'state': []}}

        if msg_type == 'submit':
            puzzlepoints, bonuspoints, feedback = calculate_scores(player)
            response = {
                'feedback': feedback,
                'points': puzzlepoints,
                'bonus_points': bonuspoints,
                'round_points': player.round_points,
            }

            puzzles = load_puzzles(player)
            if player.current_puzzle_index + 1 < len(puzzles):
                player.current_puzzle_index += 1
                player.current_puzzle_number += 1
                player.puzzleState = json.dumps([])
                response['next_puzzle'] = puzzles[player.current_puzzle_index]
                response['puzzle_number'] = player.current_puzzle_number
            else:
                response['end'] = True

            return {player.id_in_group: response}

        if msg_type == 'update':
            state = json.loads(player.puzzleState)
            state.append(data['update'])
            player.puzzleState = json.dumps(state)
            return {player.id_in_group: {'state': state}}

        return None

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # Ensure at least last puzzle scores are saved if they timed out
        # NOTE(review): after the final puzzle is submitted ('end' response)
        # the index and puzzleState are left unchanged, so a timeout arriving
        # afterwards would score that puzzle a second time -- confirm the
        # client leaves the page immediately on 'end'.
        if timeout_happened:
            calculate_scores(player)


class FeedbackWait(WaitPage):
    pass


class RPIFeedback(Page):
    """Shows the player's points, metric and rank within the group."""

    @staticmethod
    def is_displayed(player: Player):
        return True

    @staticmethod
    def vars_for_template(player: Player):
        group = player.group
        is_target = 'target' in _group_condition(player)

        # Recomputed on every page load; the result depends only on stored
        # round data, so repeated calls yield the same ranking.
        group.calculate_ranking()

        members = group.get_players()
        group_data = sorted(members, key=lambda p: p.round_rank or 99)

        return {
            'is_target': is_target,
            'own_points': player.round_points,
            'own_percent': round(player.performance_metric, 1) if is_target else None,
            'own_rank': player.round_rank,
            'group_data': group_data,
            'group_size': len(members),
        }


class ManipulationCheck(Page):
    """Round-1 comprehension check validated against the group's condition."""

    form_model = 'player'
    form_fields = ['puzzle_similarity', 'performance_basis']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        return dict(condition=_group_condition(player))

    @staticmethod
    def error_message(player: Player, values):
        """Return per-field reminders for wrong answers, or None if all are correct."""
        cond = _group_condition(player)
        errs = {}

        ans_sim = values.get('puzzle_similarity')
        if cond in ('target_level', 'absolute_level'):
            if ans_sim != 1:
                errs['puzzle_similarity'] = (
                    'Recall: All group members receive puzzles of the same difficulty.'
                )
        elif cond in ('target_nonlevel', 'absolute_nonlevel'):
            if ans_sim != 2:
                errs['puzzle_similarity'] = (
                    'Recall: Group members may receive puzzles of different difficulty.'
                )

        # NOTE(review): both branches expect answer 1 ("True"); presumably the
        # template shows a condition-specific statement -- confirm.
        ans_basis = values.get('performance_basis')
        if cond in ('target_level', 'target_nonlevel'):
            if ans_basis != 1:
                errs['performance_basis'] = (
                    'Recall: Ranking is based on the percentage of your target '
                    'achieved in a round.'
                )
        elif cond in ('absolute_level', 'absolute_nonlevel'):
            if ans_basis != 1:
                errs['performance_basis'] = (
                    'Recall: Ranking is based on the total number of points '
                    'you earned in a round.'
                )

        return errs or None


class PEQ1(Page):
    form_model = 'player'
    form_fields = ['own_difficulty', 'peer_difficulty']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == C.NUM_ROUNDS


class PEQ2(Page):
    form_model = 'player'
    form_fields = ['fairness', 'effort', 'puzzle_set']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == C.NUM_ROUNDS


class PEQ3(Page):
    form_model = 'player'
    form_fields = ['think_about_ranking', 'nervous', 'distraction', 'motivation']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == C.NUM_ROUNDS


class PEQ4(Page):
    form_model = 'player'
    form_fields = ['familiarity', 'gender', 'grade_status', 'gpa']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == C.NUM_ROUNDS


class ThankYou(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == C.NUM_ROUNDS


page_sequence = [
    ConditionSelect,
    GroupWaitPage,
    Instructions1,
    Instructions2,
    Instructions3,
    Instructions4,
    Instructions5,
    Instructions6,
    PracticeStart,
    PracticePuzzle1,
    PracticePuzzle2,
    ManipulationCheck,
    NameEntry,
    PuzzleWait,
    RoundStart,
    Puzzle,
    FeedbackWait,
    RPIFeedback,
    PEQ1,
    PEQ2,
    PEQ3,
    PEQ4,
    ThankYou,
]