from otree.api import * import random import string import json from datetime import datetime class C(BaseConstants): NAME_IN_URL = 'phase_IV_A' PLAYERS_PER_GROUP = None NUM_ROUNDS = 1 TASK_DURATION = 480 # 13 (780) minutes POINTS_PER_BLOCK = 10 BLOCK_LENGTH_NORMAL = 6 BLOCK_LENGTH_REDUCED = 5 # during help period class Subsession(BaseSubsession): pass class Group(BaseGroup): pass class Player(BasePlayer): task_start_time = models.StringField(blank=True) task_duration = models.IntegerField(initial=C.TASK_DURATION) completed_encodings = models.LongStringField(blank=True, initial='[]') total_sets_completed = models.IntegerField(initial=0) total_correct = models.IntegerField(initial=0) time_is_up = models.BooleanField(initial=False) current_letters = models.StringField(blank=True) status_assigned = models.StringField(initial='') help_received = models.IntegerField(initial=0) help_reduction_secs = models.IntegerField(initial=0) # ── FUNCTIONS ───────────────────────────────────────────────────────────────── def get_encoding_table(player): return player.participant.vars.get('encoding_table', {}) def get_block_length(player): """Returns 5 during help period, 6 otherwise.""" if not player.task_start_time or player.help_reduction_secs == 0: return C.BLOCK_LENGTH_NORMAL start = datetime.fromisoformat(player.task_start_time) elapsed = (datetime.now() - start).total_seconds() return C.BLOCK_LENGTH_REDUCED if elapsed < player.help_reduction_secs else C.BLOCK_LENGTH_NORMAL def calculate_points(player): total_points = player.total_sets_completed * C.POINTS_PER_BLOCK player.participant.vars['points_p4_A'] = total_points return total_points def live_encoding(player, data): encoding_table = get_encoding_table(player) if data['type'] == 'start_task': player.task_start_time = datetime.now().isoformat() player.help_received = player.participant.vars.get('help_received_p3', 0) player.help_reduction_secs = (player.help_received // 15) * 30 block_length = get_block_length(player) letters = ''.join(random.sample(list(encoding_table.keys()), k=block_length)) player.current_letters = letters return { player.id_in_group: { 'type': 'new_letters', 'letters': list(letters), 'start_time': player.task_start_time, 'task_duration': player.task_duration, 'block_length': block_length, 'help_reduction_secs': player.help_reduction_secs, } } elif data['type'] == 'check_time': if player.task_start_time: start = datetime.fromisoformat(player.task_start_time) elapsed = (datetime.now() - start).total_seconds() if elapsed >= player.task_duration: player.time_is_up = True final_points = calculate_points(player) return { player.id_in_group: { 'type': 'time_up', 'total_sets': player.total_sets_completed, 'total_correct': player.total_correct, 'payoff_points': final_points, } } return { player.id_in_group: { 'type': 'time_remaining', 'remaining': player.task_duration - elapsed, 'elapsed': elapsed, 'block_length': get_block_length(player), 'help_reduction_secs': player.help_reduction_secs, } } elif data['type'] == 'resume_task': if player.current_letters: block_length = get_block_length(player) return { player.id_in_group: { 'type': 'new_letters', 'letters': list(player.current_letters), 'resume': True, 'block_length': block_length, 'help_reduction_secs': player.help_reduction_secs, } } elif data['type'] == 'submit_encoding': submitted_answers = data['answers'] current_letters = list(player.current_letters) incorrect_indices = [] all_correct = True for i, letter in enumerate(current_letters): correct_answer = encoding_table[letter] try: is_correct = int(submitted_answers[i]) == correct_answer except (ValueError, TypeError): is_correct = False if not is_correct: all_correct = False incorrect_indices.append(i) if not all_correct: return { player.id_in_group: { 'type': 'encoding_error', 'incorrect_indices': incorrect_indices, } } start = datetime.fromisoformat(player.task_start_time) elapsed = (datetime.now() - start).total_seconds() completed = json.loads(player.completed_encodings) completed.append({ 'letters': current_letters, 'answers': submitted_answers, 'timestamp': datetime.now().isoformat(), 'elapsed_seconds': elapsed, 'block_length': len(current_letters), }) player.completed_encodings = json.dumps(completed) player.total_sets_completed += 1 player.total_correct += len(current_letters) if elapsed >= player.task_duration: player.time_is_up = True final_points = calculate_points(player) return { player.id_in_group: { 'type': 'time_up', 'total_sets': player.total_sets_completed, 'total_correct': player.total_correct, 'payoff_points': final_points, } } block_length = get_block_length(player) new_letters = ''.join(random.sample(list(encoding_table.keys()), k=block_length)) player.current_letters = new_letters return { player.id_in_group: { 'type': 'new_letters', 'letters': list(new_letters), 'remaining_time': player.task_duration - elapsed, 'block_length': block_length, } } # ── PAGES ───────────────────────────────────────────────────────────────────── class EncodingTask(Page): live_method = live_encoding @staticmethod def vars_for_template(player): encoding_table = get_encoding_table(player) help_received = player.participant.vars.get('help_received_p3', 0) help_reduction_secs = (help_received // 15) * 30 return dict( encoding_table=encoding_table, task_duration=C.TASK_DURATION, help_received=help_received, help_reduction_secs=help_reduction_secs, ) @staticmethod def js_vars(player): task_start_time = player.field_maybe_none('task_start_time') total_sets_completed = player.field_maybe_none('total_sets_completed') time_is_up = player.field_maybe_none('time_is_up') help_received = player.participant.vars.get('help_received_p3', 0) help_reduction_secs = (help_received // 15) * 30 points = player.participant.vars.get('points_p4_A', 0) return dict( encoding_table=get_encoding_table(player), task_duration=C.TASK_DURATION, task_start_time=task_start_time if task_start_time else '', total_sets_completed=total_sets_completed or 0, time_is_up=time_is_up or False, help_reduction_secs=help_reduction_secs, points_p4_A=points, ) @staticmethod def before_next_page(player, timeout_happened): if not player.time_is_up: calculate_points(player) @staticmethod def app_after_this_page(player, upcoming_apps): return 'P5_End' page_sequence = [EncodingTask]