from otree.api import * from . import ret_functions import json import random import string doc = """Two-minute continuous decoding task""" # ============================================================ # CONSTANTS # ============================================================ class C(BaseConstants): NAME_IN_URL = 'decoding' PLAYERS_PER_GROUP = None NUM_ROUNDS = 1 task_time = 120 # total duration in seconds PracticePay = 1 # ============================================================ # MODELS # ============================================================ class Subsession(BaseSubsession): task_dict = models.LongStringField() # the mapping number → letter sequence = models.LongStringField() # the fixed order of numbers class Group(BaseGroup): pass class Player(BasePlayer): question = models.StringField() # current number to decode correct_answer = models.StringField() # current correct answer current_index = models.IntegerField(initial=0) # position in sequence EffortPractice = models.IntegerField(initial=0) task_finished = models.BooleanField(initial=False) EarningsPractice = models.FloatField(initial=0) key = models.LongStringField() # store task_dict for template # ============================================================ # CREATE SHARED TASK DICT # ============================================================ def creating_session(subsession): import json, random, string # Only generate once per subsession if subsession.field_maybe_none('task_dict') is None: letters = random.sample(string.ascii_uppercase, 24) numbers = random.sample(range(100, 1000), 24) task_dict = dict(zip([str(n) for n in numbers], letters)) subsession.task_dict = json.dumps(task_dict) # fixed sequence (all players use the same) sequence = list(task_dict.keys()) random.shuffle(sequence) # optional: shuffle for random order subsession.sequence = json.dumps(sequence) task_dict = json.loads(subsession.task_dict) sequence = json.loads(subsession.sequence) # Initialize each player at the first number in sequence for player in subsession.get_players(): player.current_index = 0 first_number = sequence[player.current_index] player.question = first_number player.correct_answer = task_dict[first_number] player.key = subsession.task_dict # ============================================================ # LIVE HANDLER # ============================================================ def live_decoding(player, data): return live_decoding_impl(player, data) def live_decoding_impl(player, data): if player.task_finished: return {} msg_type = data.get('type') if msg_type == 'ping': return {} if msg_type == 'end': player.task_finished = True player.EarningsPractice = C.PracticePay * (player.EffortPractice or 0) return {player.id_in_group: {'type': 'end', 'EarningsPractice': player.EarningsPractice}} if msg_type == 'answer_submit': submitted = data.get('answer', '').strip().upper() correct_answer = player.correct_answer.upper() if player.correct_answer else '' import json task_dict = json.loads(player.key) sequence = json.loads(player.subsession.sequence) # check answer if submitted == correct_answer: player.EffortPractice += 1 feedback_type = 'correct' # advance in sequence player.current_index += 1 if player.current_index >= len(sequence): player.current_index = 0 # optional: loop back to start else: feedback_type = 'incorrect' next_number = sequence[player.current_index] player.question = next_number player.correct_answer = task_dict[next_number] task_pairs = sorted(list(task_dict.items()), key=lambda x: x[1]) return { player.id_in_group: { 'type': feedback_type, 'EffortPractice': player.EffortPractice, 'question': player.question, 'task_pairs': task_pairs, 'message': 'Incorrect. Try again!' if feedback_type == 'incorrect' else '', } } # ============================================================ # MAIN PAGE (continuous decoding task) # ============================================================ class Screen14_PracticeIntro(Page): def vars_for_template(self): return dict( PracticePay=C.PracticePay, ) def is_displayed(player): # Only Workers see this page return player.participant.vars.get('role') == "Worker" and player.round_number == 1 class Screen15_DecodingPractice(Page): live_method = live_decoding timeout_seconds = C.task_time timer_text = "" # no top timer stay_on_page = True def is_displayed(player): return player.participant.vars.get('role') == "Worker" def vars_for_template(player): import json task_dict = json.loads(player.key) sorted_pairs = sorted(list(task_dict.items()), key=lambda x: x[1]) return dict( question=player.question, task_pairs=sorted_pairs, EffortPractice=player.EffortPractice or 0, task_time=C.task_time, ) # ============================================================ # RESULTS PAGE # ============================================================ class Screen16_EndPractice(Page): def is_displayed(player): return player.participant.vars.get('role') == "Worker" def vars_for_template(player): return dict( EffortPractice=player.EffortPractice, EarningsPractice = player.EarningsPractice, ) class PracticeFinalSync(WaitPage): wait_for_all_groups = True body_text = "Please wait. You'll continue once all participants are ready." @staticmethod def is_displayed(player): # Everyone goes here: Customers wait; Workers arrive after finishing practice. return True # page_sequence = [Screen14_PracticeIntro, Screen15_DecodingPractice, Screen16_EndPractice] def custom_export(players): # Header row yield [ 'session_code', 'participant_code', 'round_number', 'id_in_group', 'role', 'EffortPractice', 'EarningsPractice', ] # Data rows – one row per player per round for p in players: yield [ p.session.code, p.participant.code, p.round_number, p.id_in_group, p.participant.vars.get('role'), p.EffortPractice, p.EarningsPractice, ]