from otree.api import * import random import string import json from datetime import datetime class C(BaseConstants): NAME_IN_URL = 'P4_Option_B' PLAYERS_PER_GROUP = None NUM_ROUNDS = 1 TOTAL_DURATION = 480 # 13 (780)minutes total TRAINING_MIN_TIME = 240 # 8 minutes minimum training TRAINING_MIN_BLOCKS = 10 # minimum blocks to complete training / usually 20 POINTS_PER_BLOCK = 10 # New coding table for Option B letters = list(string.ascii_uppercase) random.shuffle(letters) numbers = random.sample(range(100, 1000), 26) ENCODING_TABLE_B = {letter: number for letter, number in zip(letters, numbers)} class Subsession(BaseSubsession): pass class Group(BaseGroup): pass class Player(BasePlayer): task_start_time = models.StringField(blank=True) training_end_time = models.StringField(blank=True) in_paid_phase = models.BooleanField(initial=False) time_is_up = models.BooleanField(initial=False) current_letters = models.StringField(blank=True) completed_encodings = models.LongStringField(blank=True, initial='[]') total_sets_completed = models.IntegerField(initial=0) # all blocks (training + paid) training_sets = models.IntegerField(initial=0) # blocks during training paid_sets = models.IntegerField(initial=0) # blocks during paid phase total_correct = models.IntegerField(initial=0) help_received = models.IntegerField(initial=0) training_reduction = models.IntegerField(initial=0) # seconds reduced by help # ── FUNCTIONS ───────────────────────────────────────────────────────────────── def get_effective_training_duration(player): """Training minimum time reduced by help received.""" reduction = (player.help_received // 15) * 30 player.training_reduction = reduction return max(C.TRAINING_MIN_TIME - reduction, 0) def calculate_points(player): total_points = player.paid_sets * C.POINTS_PER_BLOCK player.participant.vars['points_p4_B'] = total_points return total_points def check_training_complete(player): """Returns True if both training conditions are met.""" if not player.task_start_time: return False start = datetime.fromisoformat(player.task_start_time) elapsed = (datetime.now() - start).total_seconds() effective_training = get_effective_training_duration(player) time_ok = elapsed >= effective_training blocks_ok = player.training_sets >= C.TRAINING_MIN_BLOCKS return time_ok and blocks_ok def live_encoding(player, data): if data['type'] == 'start_task': player.task_start_time = datetime.now().isoformat() player.help_received = player.participant.vars.get('help_received_p3', 0) letters = ''.join(random.sample(string.ascii_uppercase, k=3)) player.current_letters = letters effective_training = get_effective_training_duration(player) return { player.id_in_group: { 'type': 'new_letters', 'letters': list(letters), 'start_time': player.task_start_time, 'total_duration': C.TOTAL_DURATION, 'training_duration': effective_training, 'in_paid_phase': False, } } elif data['type'] == 'resume_task': if player.current_letters: effective_training = get_effective_training_duration(player) return { player.id_in_group: { 'type': 'new_letters', 'letters': list(player.current_letters), 'resume': True, 'in_paid_phase': player.in_paid_phase, 'training_duration': effective_training, } } elif data['type'] == 'check_time': if not player.task_start_time: return start = datetime.fromisoformat(player.task_start_time) elapsed = (datetime.now() - start).total_seconds() effective_training = get_effective_training_duration(player) # Check if total time is up if elapsed >= C.TOTAL_DURATION: player.time_is_up = True final_points = calculate_points(player) return { player.id_in_group: { 'type': 'time_up', 'total_sets': player.total_sets_completed, 'paid_sets': player.paid_sets, 'payoff_points': final_points, } } # Check if training just completed if not player.in_paid_phase and check_training_complete(player): player.in_paid_phase = True player.training_end_time = datetime.now().isoformat() new_letters = ''.join(random.sample(string.ascii_uppercase, k=3)) player.current_letters = new_letters return { player.id_in_group: { 'type': 'training_complete', 'letters': list(new_letters), 'remaining': C.TOTAL_DURATION - elapsed, } } return { player.id_in_group: { 'type': 'time_remaining', 'remaining': C.TOTAL_DURATION - elapsed, 'elapsed': elapsed, 'training_duration': effective_training, 'training_sets': player.training_sets, 'in_paid_phase': player.in_paid_phase, } } elif data['type'] == 'submit_encoding': submitted_answers = data['answers'] current_letters = list(player.current_letters) incorrect_indices = [] all_correct = True for i, letter in enumerate(current_letters): correct_answer = C.ENCODING_TABLE_B[letter] try: is_correct = int(submitted_answers[i]) == correct_answer except (ValueError, TypeError): is_correct = False if not is_correct: all_correct = False incorrect_indices.append(i) if not all_correct: return { player.id_in_group: { 'type': 'encoding_error', 'incorrect_indices': incorrect_indices, } } start = datetime.fromisoformat(player.task_start_time) elapsed = (datetime.now() - start).total_seconds() completed = json.loads(player.completed_encodings) completed.append({ 'letters': current_letters, 'answers': submitted_answers, 'timestamp': datetime.now().isoformat(), 'elapsed_seconds': elapsed, 'paid': player.in_paid_phase, }) player.completed_encodings = json.dumps(completed) player.total_sets_completed += 1 player.total_correct += 3 if player.in_paid_phase: player.paid_sets += 1 else: player.training_sets += 1 # Check total time if elapsed >= C.TOTAL_DURATION: player.time_is_up = True final_points = calculate_points(player) return { player.id_in_group: { 'type': 'time_up', 'total_sets': player.total_sets_completed, 'paid_sets': player.paid_sets, 'payoff_points': final_points, } } # Check if training just completed after this block if not player.in_paid_phase and check_training_complete(player): player.in_paid_phase = True player.training_end_time = datetime.now().isoformat() new_letters = ''.join(random.sample(string.ascii_uppercase, k=3)) player.current_letters = new_letters return { player.id_in_group: { 'type': 'training_complete', 'letters': list(new_letters), 'remaining': C.TOTAL_DURATION - elapsed, } } new_letters = ''.join(random.sample(string.ascii_uppercase, k=3)) player.current_letters = new_letters return { player.id_in_group: { 'type': 'new_letters', 'letters': list(new_letters), 'remaining_time': C.TOTAL_DURATION - elapsed, 'in_paid_phase': player.in_paid_phase, } } # ── PAGES ───────────────────────────────────────────────────────────────────── class EncodingTaskB(Page): live_method = live_encoding @staticmethod def vars_for_template(player): help_received = player.participant.vars.get('help_received_p3', 0) reduction = (help_received // 15) * 30 effective_training = max(C.TRAINING_MIN_TIME - reduction, 0) return dict( encoding_table=C.ENCODING_TABLE_B, total_duration=C.TOTAL_DURATION, training_min_time=C.TRAINING_MIN_TIME, training_min_blocks=C.TRAINING_MIN_BLOCKS, effective_training=int(effective_training), help_received=help_received, reduction=reduction, ) @staticmethod def js_vars(player): task_start_time = player.field_maybe_none('task_start_time') total_sets_completed = player.field_maybe_none('total_sets_completed') training_sets = player.field_maybe_none('training_sets') paid_sets = player.field_maybe_none('paid_sets') in_paid_phase = player.field_maybe_none('in_paid_phase') time_is_up = player.field_maybe_none('time_is_up') help_received = player.participant.vars.get('help_received_p3', 0) reduction = (help_received // 15) * 30 effective_training = max(C.TRAINING_MIN_TIME - reduction, 0) points = player.participant.vars.get('points_p4_B', 0) return dict( encoding_table=C.ENCODING_TABLE_B, total_duration=C.TOTAL_DURATION, training_duration=effective_training, training_min_blocks=C.TRAINING_MIN_BLOCKS, task_start_time=task_start_time if task_start_time else '', total_sets_completed=total_sets_completed or 0, training_sets=training_sets or 0, paid_sets=paid_sets or 0, in_paid_phase=in_paid_phase or False, time_is_up=time_is_up or False, points_p4_B=points, ) @staticmethod def before_next_page(player, timeout_happened): player.participant.vars['encoding_table_b'] = C.ENCODING_TABLE_B if not player.time_is_up: calculate_points(player) @staticmethod def app_after_this_page(player, upcoming_apps): return 'P5_End' page_sequence = [EncodingTaskB]