import time from otree import settings from otree.api import * from .image_utils import encode_image doc = """ Introduction to study and examples. """ #CLASSES ##SESSION def get_task_module(player): from . import task_decoding session = player.session return task_decoding class Constants(BaseConstants): name_in_url = "transcription_example" players_per_group = None num_rounds = 1 instructions_template = __name__ + "/instructions_short.html" captcha_length = 3 ##SUBSESSION class Subsession(BaseSubsession): pass def creating_session(subsession: Subsession): session = subsession.session defaults = dict( retry_delay=1.0, puzzle_delay=1.0, attempts_per_puzzle=1, max_iterations=None ) session.params = {} for param in defaults: session.params[param] = session.config.get(param, defaults[param]) ##GROUP class Group(BaseGroup): pass ##PLAYER class Player(BasePlayer): iteration = models.IntegerField(initial=0) trial_attempt = models.IntegerField(initial=0) trial_correct = models.IntegerField(initial=0) trial_failed = models.IntegerField(initial=0) ##PUZZLE class Puzzle(ExtraModel): """A model to keep record of all generated puzzles""" player = models.Link(Player) iteration = models.IntegerField(initial=0) attempts = models.IntegerField(initial=0) timestamp = models.FloatField(initial=0) # can be either simple text, or a json-encoded definition of the puzzle, etc. text = models.LongStringField() # solution may be the same as text, if it's simply a transcription task solution = models.LongStringField() response = models.LongStringField() response_timestamp = models.FloatField() is_correct = models.BooleanField() def calculate_dependent(independent): if independent == 0: return 1 elif independent == 1: return 5 elif independent == 2: return 10 else: return 15 # Return -1 for out-of-range values or handle it differently based on your requirements def generate_puzzle(player: Player) -> Puzzle: """Create new puzzle for a player""" task_module = get_task_module(player) WORD_LENGTH=calculate_dependent(player.iteration) fields = task_module.generate_puzzle_fields(WORD_LENGTH) player.iteration += 1 return Puzzle.create( player=player, iteration=player.iteration, timestamp=time.time(), **fields ) def get_current_puzzle(player): puzzles = Puzzle.filter(player=player, iteration=player.iteration) if puzzles: [puzzle] = puzzles return puzzle def encode_puzzle(puzzle: Puzzle): """Create data describing puzzle to send to client""" task_module = get_task_module(puzzle.player) # noqa # generate image for the puzzle image = task_module.render_image(puzzle) data = encode_image(image) return dict(image=data) def get_progress(player: Player): """Return current player progress""" return dict( trial_attempt=player.trial_attempt, trial_correct=player.trial_correct, trial_incorrect=player.trial_failed, iteration=player.iteration, ) def play_game(player: Player, message: dict): """Main game workflow""" session = player.session my_id = player.id_in_group total_games = 3 params = session.params task_module = get_task_module(player) now = time.time() current = get_current_puzzle(player) message_type = message['type'] max_correct = session.config['max_correct'] # page loaded if message_type == 'load': p = get_progress(player) if current: return { my_id: dict(type='status', progress=p, puzzle=encode_puzzle(current)) } else: return {my_id: dict(type='status', progress=p)} if message_type == "cheat": return {my_id: dict(type='solution', solution=current.solution)} # client requested new puzzle if message_type == "next": if current is not None: if current.response is None: raise RuntimeError("trying to skip over unsolved puzzle") if now < current.timestamp + params["puzzle_delay"]: raise RuntimeError("retrying too fast") if player.trial_correct == total_games: return { my_id: dict( type='status', progress=get_progress(player), iterations_left=0 ) } # generate new puzzle z = generate_puzzle(player) p = get_progress(player) return {my_id: dict(type='puzzle', puzzle=encode_puzzle(z), progress=p)} # client gives an answer to current puzzle if message_type == "answer": if current is None: raise RuntimeError("trying to answer no puzzle") if current.response is not None: # it's a retry if now < current.response_timestamp + params["retry_delay"]: raise RuntimeError("retrying too fast") # check answer answer = message["answer"] if answer == "" or answer is None: raise ValueError("bogus answer") current.response = answer current.is_correct = task_module.is_correct(answer, current) current.response_timestamp = now current.attempts += 1 # update player progress if current.is_correct: player.trial_correct += 1 else: player.trial_failed += 1 player.trial_attempt += 1 retries_left = 1 p = get_progress(player) return { my_id: dict( type='feedback', is_correct=current.is_correct, retries_left=retries_left, progress=p, ) } raise RuntimeError("unrecognized message from client") #PAGES class Screen1(Page): pass class Screen2(Page): pass class Instructions(Page): pass class Trial(Page): timeout_seconds = None live_method = play_game @staticmethod def js_vars(player: Player): return dict(params=player.session.params) @staticmethod def vars_for_template(player: Player): task_module = get_task_module(player) return dict(DEBUG=True, input_type=task_module.INPUT_TYPE) #SEQUENCE page_sequence = [Screen1, Screen2, Instructions, Trial]