"""oTree app: timed letter-to-number encoding task.

Participants repeatedly receive a set of letters and must type the number
that the encoding table assigns to each letter.  All interaction happens on
a single live page; the server tracks the start time, the current letters
and the number of completed sets on the ``Player``.
"""
from otree.api import *
import random
import string
import json
from datetime import datetime


class C(BaseConstants):
    """App-wide constants, including the letter -> number encoding table."""

    NAME_IN_URL = 'phase_II'
    PLAYERS_PER_GROUP = None
    NUM_ROUNDS = 1
    # Task length in seconds (180 s = 3 minutes).
    # NOTE(review): the previous comment said "13 minutes" (= 780 s), which
    # does not match this value -- confirm the intended duration.
    TASK_DURATION = 180
    POINTS_PER_BLOCK = 10

    # The table is drawn when this class body is executed (i.e. at import),
    # so every player served by the same process sees the same mapping:
    # each uppercase letter gets a distinct 3-digit number.
    letters = list(string.ascii_uppercase)
    random.shuffle(letters)
    numbers = random.sample(range(100, 1000), 26)
    ENCODING_TABLE = dict(zip(letters, numbers))


class Subsession(BaseSubsession):
    pass


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    # ISO-8601 timestamp written when the task starts; null until then.
    task_start_time = models.StringField(blank=True)
    # Allowed task time in seconds.
    task_duration = models.IntegerField(initial=C.TASK_DURATION)
    # JSON-encoded list with one record per correctly completed set.
    completed_encodings = models.LongStringField(blank=True, initial="[]")
    total_sets_completed = models.IntegerField(initial=0)
    total_correct = models.IntegerField(initial=0)
    time_is_up = models.BooleanField(initial=False)
    # Letters of the set currently shown to the participant; null until start.
    current_letters = models.StringField(blank=True)
    status_assigned = models.StringField(initial='')


# ── FUNCTIONS ─────────────────────────────────────────────────────────────────

# Number of letters in one encoding set.
_LETTERS_PER_SET = 3


def calculate_points(player):
    """Compute the points earned, store them in participant.vars, return them.

    Points are ``total_sets_completed * C.POINTS_PER_BLOCK`` and are saved
    under the ``'points_p2'`` key.
    """
    total_points = player.total_sets_completed * C.POINTS_PER_BLOCK
    player.participant.vars['points_p2'] = total_points
    return total_points


def _draw_letters():
    """Return a string of distinct, randomly chosen uppercase letters."""
    return ''.join(random.sample(string.ascii_uppercase, k=_LETTERS_PER_SET))


def _elapsed_seconds(start_iso):
    """Return the seconds elapsed since the ISO timestamp ``start_iso``."""
    return (datetime.now() - datetime.fromisoformat(start_iso)).total_seconds()


def _answer_matches(answers, index, expected):
    """Return True if ``answers[index]`` parses to the integer ``expected``.

    Missing, non-numeric or wrongly shaped input counts as a wrong answer
    instead of raising out of the live handler.
    """
    try:
        return int(answers[index]) == expected
    except (ValueError, TypeError, IndexError, KeyError):
        return False


def _time_up_message(player):
    """Flag the task as finished, settle the points, build the reply."""
    player.time_is_up = True
    final_points = calculate_points(player)
    return {
        player.id_in_group: {
            'type': 'time_up',
            'total_sets': player.total_sets_completed,
            'total_correct': player.total_correct,
            'payoff_points': final_points,
        }
    }


def live_encoding(player, data):
    """Live-page handler for the encoding task.

    ``data['type']`` selects the action:

    * ``'start_task'``      -- start the clock and send the first letters.
      If the task was already started, the existing start time and letters
      are returned so the clock cannot be reset.
    * ``'check_time'``      -- reply with the remaining time or ``time_up``.
    * ``'resume_task'``     -- resend the current letters.
    * ``'submit_encoding'`` -- grade ``data['answers']``; reply with
      ``encoding_error``, the next ``new_letters`` or ``time_up``.

    Returns a dict keyed by ``player.id_in_group``, or ``None`` (no reply)
    for unknown messages and for messages that arrive before the task has
    been started.
    """
    msg_type = data.get('type') if isinstance(data, dict) else None

    # Both fields are null until the task starts; plain attribute access on
    # a null oTree field raises, so read them with field_maybe_none.
    started_at = player.field_maybe_none('task_start_time')
    shown_letters = player.field_maybe_none('current_letters')

    if msg_type == 'start_task':
        if not (started_at and shown_letters):
            started_at = datetime.now().isoformat()
            shown_letters = _draw_letters()
            player.task_start_time = started_at
            player.current_letters = shown_letters
        return {
            player.id_in_group: {
                'type': 'new_letters',
                'letters': list(shown_letters),
                'start_time': started_at,
                'task_duration': player.task_duration,
            }
        }

    if msg_type == 'check_time':
        if not started_at:
            return None
        elapsed = _elapsed_seconds(started_at)
        if elapsed >= player.task_duration:
            return _time_up_message(player)
        return {
            player.id_in_group: {
                'type': 'time_remaining',
                'remaining': player.task_duration - elapsed,
            }
        }

    if msg_type == 'resume_task':
        if not shown_letters:
            return None
        return {
            player.id_in_group: {
                'type': 'new_letters',
                'letters': list(shown_letters),
                'resume': True,
            }
        }

    if msg_type == 'submit_encoding':
        if not (started_at and shown_letters):
            # Nothing to grade before the task has started.
            return None
        if player.time_is_up:
            # The letters are not rotated once time is up, so without this
            # guard the same answers could be resubmitted for extra credit.
            return _time_up_message(player)

        submitted_answers = data.get('answers')
        current_letters = list(shown_letters)
        incorrect_indices = [
            position
            for position, letter in enumerate(current_letters)
            if not _answer_matches(
                submitted_answers, position, C.ENCODING_TABLE[letter]
            )
        ]
        if incorrect_indices:
            return {
                player.id_in_group: {
                    'type': 'encoding_error',
                    'incorrect_indices': incorrect_indices,
                }
            }

        elapsed = _elapsed_seconds(started_at)
        completed = json.loads(player.completed_encodings)
        completed.append({
            'letters': current_letters,
            'answers': submitted_answers,
            'timestamp': datetime.now().isoformat(),
            'elapsed_seconds': elapsed,
        })
        player.completed_encodings = json.dumps(completed)
        player.total_sets_completed += 1
        player.total_correct += _LETTERS_PER_SET

        # The submission that crosses the deadline is still credited (as
        # before); everything after it is rejected by the guard above.
        if elapsed >= player.task_duration:
            return _time_up_message(player)

        next_letters = _draw_letters()
        player.current_letters = next_letters
        return {
            player.id_in_group: {
                'type': 'new_letters',
                'letters': list(next_letters),
                'remaining_time': player.task_duration - elapsed,
            }
        }

    return None


# ── PAGES ─────────────────────────────────────────────────────────────────────

class EncodingTask(Page):
    """Single live page hosting the whole encoding task."""

    # No timeout_seconds here: elapsed time is tracked in live_encoding.
    live_method = live_encoding

    @staticmethod
    def vars_for_template(player):
        return dict(
            encoding_table=C.ENCODING_TABLE,
            task_duration=C.TASK_DURATION,
        )

    @staticmethod
    def js_vars(player):
        # Null-safe reads so the page also renders before the task starts.
        task_start_time = player.field_maybe_none('task_start_time')
        total_sets_completed = player.field_maybe_none('total_sets_completed')
        return dict(
            encoding_table=C.ENCODING_TABLE,
            task_duration=C.TASK_DURATION,
            task_start_time=task_start_time or '',
            total_sets_completed=total_sets_completed or 0,
        )

    @staticmethod
    def before_next_page(player, timeout_happened):
        player.participant.vars['encoding_table'] = C.ENCODING_TABLE
        # Points are already settled when the time-up reply was sent;
        # otherwise settle them now.
        if not player.time_is_up:
            calculate_points(player)


page_sequence = [EncodingTask]