import time import json import random from otree import settings from otree.api import * from .image_utils import encode_image from . import task_sliders c = Currency doc = """ """ class Constants(BaseConstants): name_in_url = "mr" players_per_group = None num_rounds = 16 # payment_per_round = cu(1) instructions_template = __name__ + "/instructions.html" class Subsession(BaseSubsession): pass def creating_session(subsession: Subsession): session = subsession.session defaults = dict( trial_delay=1.0, retry_delay=0.1, num_sliders=30, num_columns=3, attempts_per_slider=10 ) session.params = {} for param in defaults: session.params[param] = session.config.get(param, defaults[param]) class Group(BaseGroup): pass # GAME 1: class Player(BasePlayer): # only suported 1 iteration for now iteration = models.IntegerField(initial=0) num_correct = models.IntegerField(initial=0) num_correct_a = models.IntegerField(initial=0) num_correct_b = models.IntegerField(initial=0) num_correct_benchmark = models.IntegerField(initial=0) payoff_a = models.FloatField(initial=0) payoff_b = models.FloatField(initial=0) prev_payoff_a = models.FloatField(initial=0) final_payoff = models.FloatField(initial=0) elapsed_time = models.FloatField(initial=0) rejection_pr_a_noeffort = models.FloatField(initial=0) rejection_pr_b_noeffort = models.FloatField(initial=0) rejection_pr_a = models.FloatField(initial=0) rejection_pr_b = models.FloatField(initial=0) rejection_pr_public_a = models.FloatField(initial=0) rejection_pr_public_b = models.FloatField(initial=0) foundjob = models.BooleanField(initial=False) rejection_a = models.BooleanField(initial=True) rejection_b = models.BooleanField(initial=True) search = models.BooleanField( label="", choices=[ [True, "Search for a new job"], [False, "Focus on current job, don't search"], ] ) session_payoff = models.IntegerField() tempvar_b = models.FloatField(initial=0) # puzzle-specific stuff class Puzzle(ExtraModel): """A model to keep record of sliders setup""" player = models.Link(Player) iteration = models.IntegerField() timestamp = models.FloatField() num_sliders = models.IntegerField() layout = models.LongStringField() response_timestamp = models.FloatField() num_correct = models.IntegerField(initial=0) is_solved = models.BooleanField(initial=False) class Slider(ExtraModel): """A model to keep record of each slider""" puzzle = models.Link(Puzzle) idx = models.IntegerField() target = models.IntegerField() value = models.IntegerField() is_correct = models.BooleanField(initial=False) attempts = models.IntegerField(initial=0) def generate_puzzle(player: Player) -> Puzzle: """Create new puzzle for a player""" params = player.session.params num = params['num_sliders'] layout = task_sliders.generate_layout(params) puzzle = Puzzle.create( player=player, iteration=player.iteration, timestamp=time.time(), num_sliders=num, layout=json.dumps(layout) ) for i in range(num): target, initial = task_sliders.generate_slider() Slider.create( puzzle=puzzle, idx=i, target=target, value=initial ) return puzzle def get_current_puzzle(player): puzzles = Puzzle.filter(player=player, iteration=player.iteration) if puzzles: [puzzle] = puzzles return puzzle def get_slider(puzzle, idx): sliders = Slider.filter(puzzle=puzzle, idx=idx) if sliders: [puzzle] = sliders return puzzle def encode_puzzle(puzzle: Puzzle): """Create data describing puzzle to send to client""" layout = json.loads(puzzle.layout) sliders = Slider.filter(puzzle=puzzle) # generate image for the puzzle image = task_sliders.render_image(layout, targets=[s.target for s in sliders]) return dict( image=encode_image(image), size=layout['size'], grid=layout['grid'], sliders={s.idx: {'value': s.value, 'is_correct': s.is_correct} for s in sliders} ) def get_progress(player: Player): """Return current player progress""" return dict( iteration=player.iteration, solved=player.num_correct ) def handle_response(puzzle, slider, value): slider.value = task_sliders.snap_value(value, slider.target) slider.is_correct = slider.value == slider.target puzzle.num_correct = len(Slider.filter(puzzle=puzzle, is_correct=True)) puzzle.is_solved = puzzle.num_correct == puzzle.num_sliders def play_game(player: Player, message: dict): """Main game workflow Implemented as reactive scheme: receive message from browser, react, respond. Generic game workflow, from server point of view: - receive: {'type': 'load'} -- empty message means page loaded - check if it's game start or page refresh midgame - respond: {'type': 'status', 'progress': ...} - respond: {'type': 'status', 'progress': ..., 'puzzle': data} in case of midgame page reload - receive: {'type': 'new'} -- request for a new puzzle - generate new sliders - respond: {'type': 'puzzle', 'puzzle': data} - receive: {'type': 'value', 'slider': ..., 'value': ...} -- submitted value of a slider - slider: the index of the slider - value: the value of slider in pixels - check if the answer is correct - respond: {'type': 'feedback', 'slider': ..., 'value': ..., 'is_correct': ..., 'is_completed': ...} - slider: the index of slider submitted - value: the value aligned to slider steps - is_corect: if submitted value is correct - is_completed: if all sliders are correct """ session = player.session my_id = player.id_in_group params = session.params now = time.time() # the current puzzle or none puzzle = get_current_puzzle(player) message_type = message['type'] if message_type == 'load': p = get_progress(player) if puzzle: return {my_id: dict(type='status', progress=p, puzzle=encode_puzzle(puzzle))} else: return {my_id: dict(type='status', progress=p)} if message_type == "new": if puzzle is not None: raise RuntimeError("trying to create 2nd puzzle") player.iteration += 1 z = generate_puzzle(player) p = get_progress(player) return {my_id: dict(type='puzzle', puzzle=encode_puzzle(z), progress=p)} if message_type == "value": if puzzle is None: raise RuntimeError("missing puzzle") if puzzle.response_timestamp and now < puzzle.response_timestamp + params["retry_delay"]: raise RuntimeError("retrying too fast") slider = get_slider(puzzle, int(message["slider"])) if slider is None: raise RuntimeError("missing slider") if slider.attempts >= params['attempts_per_slider']: raise RuntimeError("too many slider motions") value = int(message["value"]) handle_response(puzzle, slider, value) puzzle.response_timestamp = now slider.attempts += 1 player.num_correct = puzzle.num_correct p = get_progress(player) return { my_id: dict( type='feedback', slider=slider.idx, value=slider.value, is_correct=slider.is_correct, is_completed=puzzle.is_solved, progress=p, ) } if message_type == "cheat" and settings.DEBUG: return {my_id: dict(type='solution', solution={s.idx: s.target for s in Slider.filter(puzzle=puzzle)})} raise RuntimeError("unrecognized message from client") #### INTRO PAGES #### class Introduction(Page): @staticmethod def before_next_page(player: Player, timeout_happened): if player.round_number > 1: prev_player = player.in_round(player.round_number - 1) player.num_correct_benchmark = prev_player.num_correct_benchmark player.prev_payoff_a = prev_player.payoff_a if player.prev_payoff_a > 0: player.foundjob = True class Choice(Page): form_model = "player" form_fields = ["search"] @staticmethod def is_displayed(player: Player): if player.round_number > 1: return player.foundjob == False else: return player.round_number == 1 pass class JobFound(Page): @staticmethod def vars_for_template(player: Player): if player.round_number > 1: prev_player = player.in_round(player.round_number - 1) player.num_correct_benchmark = prev_player.num_correct_benchmark @staticmethod def is_displayed(player: Player): if player.round_number > 1: return player.foundjob == True pass #### GAME A #### class Game_A(Page): timeout_seconds = 40 live_method = play_game @staticmethod def is_displayed(player: Player): if player.round_number > 1: return player.foundjob == True or (player.foundjob == False and player.search == True) elif player.round_number == 1: return player.search == 1 @staticmethod def js_vars(player: Player): return dict( params=player.session.params, slider_size=task_sliders.SLIDER_BBOX, ) @staticmethod def vars_for_template(player: Player): return dict( params=player.session.params, DEBUG=settings.DEBUG ) @staticmethod def before_next_page(player: Player, timeout_happened): puzzle = get_current_puzzle(player) if puzzle and puzzle.response_timestamp: player.elapsed_time = puzzle.response_timestamp - puzzle.timestamp player.num_correct_a = puzzle.num_correct player.rejection_pr_public_a = puzzle.num_correct/6 player.rejection_pr_a_noeffort = random.uniform(0, 1) player.rejection_pr_a = player.rejection_pr_a_noeffort+((15-(puzzle.num_correct/2))*(1/3)*0.01) if player.round_number == 1: if player.rejection_pr_a < 0.05: player.rejection_a = False player.num_correct_benchmark = player.num_correct_a-4 else: prev_player = player.in_round(player.round_number - 1) if prev_player.rejection_a == False: player.rejection_a = False player.num_correct_benchmark = prev_player.num_correct_benchmark elif player.rejection_pr_a < 0.05: player.rejection_a = False player.num_correct_benchmark = player.num_correct_a-4 if not player.rejection_a and puzzle.num_correct >= player.num_correct_benchmark: player.payoff_a = 55 # all_players = player.in_all_rounds() # for temp_player in all_players: # player.combined_payoff_a += temp_player.payoff_a # player.combined_payoff_b += temp_player.payoff_b # player.combined_pot_payoff_b += temp_player.potential_payoff_b class Results_A(Page): @staticmethod def is_displayed(player: Player): if not player.foundjob: return player.search == 1 elif player.round_number == 1: return player.search == 1 pass class Results_A_JobFound(Page): @staticmethod def is_displayed(player: Player): if player.round_number > 1: return player.foundjob == 1 pass #### GAME B #### class Game_B(Page): timeout_seconds = 40 live_method = play_game @staticmethod def is_displayed(player: Player): if player.round_number == 1: return player.search == False elif player.round_number > 1 and not player.foundjob: return player.search == False @staticmethod def js_vars(player: Player): return dict( params=player.session.params, slider_size=task_sliders.SLIDER_BBOX, ) @staticmethod def vars_for_template(player: Player): return dict( params=player.session.params, DEBUG=settings.DEBUG ) @staticmethod def before_next_page(player: Player, timeout_happened): puzzle = get_current_puzzle(player) if puzzle and puzzle.response_timestamp: player.elapsed_time = puzzle.response_timestamp - puzzle.timestamp player.num_correct_b = puzzle.num_correct player.rejection_pr_public_b = puzzle.num_correct/6 player.rejection_pr_b_noeffort = random.uniform(0, 1) player.rejection_pr_b = player.rejection_pr_b_noeffort+((15-(puzzle.num_correct/2))*(1/3)*0.01) if player.round_number == 1: if player.rejection_pr_b < 0.05: player.rejection_b = False else: prev_player = player.in_round(player.round_number - 1) if prev_player.rejection_b == False: player.rejection_b = False elif player.rejection_pr_b < 0.05: player.rejection_b = False if not player.rejection_b: player.payoff_b = 50 # all_players = player.in_all_rounds() # for temp_player in all_players: # player.combined_payoff_a += temp_player.payoff_a # player.combined_payoff_b += temp_player.payoff_b # player.combined_pot_payoff_b += temp_player.potential_payoff_b class Results_B(Page): @staticmethod def is_displayed(player: Player): if player.round_number == 1: return player.search == False elif player.round_number > 1 and not player.foundjob: return player.search == False pass class CombinedResults(Page): @staticmethod def is_displayed(player: Player): return player.round_number == Constants.num_rounds @staticmethod def vars_for_template(player: Player): all_players = player.in_all_rounds() for temp_player in all_players: player.tempvar_b += temp_player.payoff_b if player.payoff_a > 0 and player.tempvar_b == 0: player.final_payoff = 55 elif player.payoff_a == 0 and player.tempvar_b > 0: player.final_payoff = 50 elif player.payoff_a == 0 and player.tempvar_b == 0: player.final_payoff = 0 elif player.payoff_a > 0 and player.tempvar_b > 0 and player.payoff_a > player.tempvar_b: player.final_payoff = 55 elif player.payoff_a > 0 and player.tempvar_b > 0 and player.payoff_a < player.tempvar_b: player.final_payoff = 50 player.participant.r1_po = player.final_payoff #player.participant.session_payoff = player.participant.r2_po+player.participant.r1_po class ComprehensionTest(Page): pass @staticmethod def is_displayed(player: Player): return player.round_number == 1 class thanks(Page): def is_displayed(player: Player): return player.round_number == Constants.num_rounds pass page_sequence = [Introduction, ComprehensionTest, JobFound, Choice, Game_A, Results_A, Results_A_JobFound, Game_B, Results_B, CombinedResults]