import time
import json
import random

from otree import settings
from otree.api import *

from .image_utils import encode_image
from . import task_sliders

c = Currency

doc = """
"""

# Parameters of the job-offer draw performed after each slider task.
# An offer is obtained when the effort-adjusted draw falls below OFFER_THRESHOLD.
OFFER_THRESHOLD = 0.05
# Number of correct sliders at which the effort adjustment is exactly zero.
EFFORT_BENCHMARK = 15
# Per-round payoffs recorded when the respective offer is held.
JOB_A_PAYOFF = 55
JOB_B_PAYOFF = 50


class Constants(BaseConstants):
    name_in_url = "search_norej"
    players_per_group = None
    num_rounds = 8
    # payment_per_round = cu(1)
    instructions_template = __name__ + "/instructions.html"


class Subsession(BaseSubsession):
    pass


def creating_session(subsession: Subsession):
    """Populate ``session.params`` with the slider-task settings.

    Each parameter is read from the session config; a missing key falls back
    to the default listed below.
    """
    session = subsession.session
    defaults = dict(
        trial_delay=1.0,
        retry_delay=0.1,
        num_sliders=30,
        num_columns=3,
        attempts_per_slider=10,
    )
    session.params = {
        name: session.config.get(name, fallback)
        for name, fallback in defaults.items()
    }


class Group(BaseGroup):
    # this variant
    average_effort = models.FloatField()
    count_offer = models.IntegerField()
    count_reject = models.IntegerField()
    num_searching = models.IntegerField()
    num_in_session = models.IntegerField()


# GAME 1:
class Player(BasePlayer):
    # only supported 1 iteration for now
    iteration = models.IntegerField(initial=0)
    num_correct = models.IntegerField(initial=0)
    num_correct_a = models.IntegerField(initial=0)
    num_correct_b = models.IntegerField(initial=0)
    num_correct_benchmark = models.IntegerField(initial=0)
    payoff_a = models.FloatField(initial=0)
    payoff_b = models.FloatField(initial=0)
    prev_payoff_a = models.FloatField(initial=0)
    final_payoff = models.FloatField(initial=0)
    elapsed_time = models.FloatField(initial=0)
    rejection_pr_a_noeffort = models.FloatField(initial=0)
    rejection_pr_b_noeffort = models.FloatField(initial=0)
    rejection_pr_a = models.FloatField(initial=0)
    rejection_pr_b = models.FloatField(initial=0)
    rejection_pr_public_a = models.FloatField(initial=0)
    rejection_pr_public_b = models.FloatField(initial=0)
    foundjob = models.BooleanField(initial=False)
    rejection_a = models.BooleanField(initial=True)
    rejection_b = models.BooleanField(initial=True)
    search = models.BooleanField(
        label="",
        choices=[
            [True, "Search for a new job"],
            [False, "Focus on current job, don't search"],
        ],
    )
    search_fb = models.BooleanField(initial=False)
    # Running totals of payoff_a / payoff_b over all rounds; written by
    # CombinedResults in the last round.
    tempvar_a = models.FloatField(initial=0)
    tempvar_b = models.FloatField(initial=0)
    session_payoff = models.IntegerField()


# puzzle-specific stuff


class Puzzle(ExtraModel):
    """A model to keep record of sliders setup"""

    player = models.Link(Player)
    iteration = models.IntegerField()
    timestamp = models.FloatField()

    num_sliders = models.IntegerField()
    layout = models.LongStringField()

    response_timestamp = models.FloatField()
    num_correct = models.IntegerField(initial=0)
    is_solved = models.BooleanField(initial=False)


class Slider(ExtraModel):
    """A model to keep record of each slider"""

    puzzle = models.Link(Puzzle)
    idx = models.IntegerField()
    target = models.IntegerField()
    value = models.IntegerField()
    is_correct = models.BooleanField(initial=False)
    attempts = models.IntegerField(initial=0)


def generate_puzzle(player: Player) -> Puzzle:
    """Create a new puzzle, with all of its sliders, for a player.

    The number of sliders and the layout come from ``session.params``; the
    puzzle is stamped with the player's current iteration and the current time.
    """
    params = player.session.params
    num = params['num_sliders']
    layout = task_sliders.generate_layout(params)
    puzzle = Puzzle.create(
        player=player,
        iteration=player.iteration,
        timestamp=time.time(),
        num_sliders=num,
        layout=json.dumps(layout),
    )
    for i in range(num):
        target, initial = task_sliders.generate_slider()
        Slider.create(puzzle=puzzle, idx=i, target=target, value=initial)
    return puzzle


def get_current_puzzle(player):
    """Return the puzzle of the player's current iteration, or None if absent.

    Raises ValueError (from the unpacking) if more than one puzzle matches.
    """
    puzzles = Puzzle.filter(player=player, iteration=player.iteration)
    if not puzzles:
        return None
    [puzzle] = puzzles
    return puzzle


def get_slider(puzzle, idx):
    """Return the slider with index ``idx`` of ``puzzle``, or None if absent.

    Raises ValueError (from the unpacking) if more than one slider matches.
    """
    sliders = Slider.filter(puzzle=puzzle, idx=idx)
    if not sliders:
        return None
    [slider] = sliders
    return slider


def encode_puzzle(puzzle: Puzzle):
    """Create data describing puzzle to send to client"""
    layout = json.loads(puzzle.layout)
    sliders = Slider.filter(puzzle=puzzle)
    # generate image for the puzzle
    image = task_sliders.render_image(layout, targets=[s.target for s in sliders])
    return dict(
        image=encode_image(image),
        size=layout['size'],
        grid=layout['grid'],
        sliders={
            s.idx: {'value': s.value, 'is_correct': s.is_correct} for s in sliders
        },
    )


def get_progress(player: Player):
    """Return current player progress"""
    return dict(iteration=player.iteration, solved=player.num_correct)


def handle_response(puzzle, slider, value):
    """Store a submitted slider value and refresh the puzzle's solved counters."""
    slider.value = task_sliders.snap_value(value, slider.target)
    slider.is_correct = slider.value == slider.target
    puzzle.num_correct = len(Slider.filter(puzzle=puzzle, is_correct=True))
    puzzle.is_solved = puzzle.num_correct == puzzle.num_sliders


def play_game(player: Player, message: dict):
    """Main game workflow

    Implemented as reactive scheme: receive message from browser, react, respond.

    Generic game workflow, from server point of view:
    - receive: {'type': 'load'} -- empty message means page loaded
    - check if it's game start or page refresh midgame
    - respond: {'type': 'status', 'progress': ...}
    - respond: {'type': 'status', 'progress': ..., 'puzzle': data}
      in case of midgame page reload

    - receive: {'type': 'new'} -- request for a new puzzle
    - generate new sliders
    - respond: {'type': 'puzzle', 'puzzle': data}

    - receive: {'type': 'value', 'slider': ..., 'value': ...}
      -- submitted value of a slider
      - slider: the index of the slider
      - value: the value of slider in pixels
    - check if the answer is correct
    - respond: {'type': 'feedback', 'slider': ..., 'value': ...,
      'is_correct': ..., 'is_completed': ...}
      - slider: the index of slider submitted
      - value: the value aligned to slider steps
      - is_correct: if submitted value is correct
      - is_completed: if all sliders are correct

    Raises RuntimeError for requests that violate the workflow (second puzzle,
    missing puzzle/slider, retrying too fast, too many attempts) and for
    unrecognized message types.
    """
    session = player.session
    my_id = player.id_in_group
    params = session.params

    now = time.time()
    # the current puzzle or none
    puzzle = get_current_puzzle(player)

    message_type = message['type']

    if message_type == 'load':
        p = get_progress(player)
        if puzzle:
            return {
                my_id: dict(type='status', progress=p, puzzle=encode_puzzle(puzzle))
            }
        return {my_id: dict(type='status', progress=p)}

    if message_type == "new":
        if puzzle is not None:
            raise RuntimeError("trying to create 2nd puzzle")

        player.iteration += 1
        z = generate_puzzle(player)
        p = get_progress(player)
        return {my_id: dict(type='puzzle', puzzle=encode_puzzle(z), progress=p)}

    if message_type == "value":
        if puzzle is None:
            raise RuntimeError("missing puzzle")
        if (
            puzzle.response_timestamp
            and now < puzzle.response_timestamp + params["retry_delay"]
        ):
            raise RuntimeError("retrying too fast")

        slider = get_slider(puzzle, int(message["slider"]))

        if slider is None:
            raise RuntimeError("missing slider")
        if slider.attempts >= params['attempts_per_slider']:
            raise RuntimeError("too many slider motions")

        value = int(message["value"])
        handle_response(puzzle, slider, value)
        puzzle.response_timestamp = now
        slider.attempts += 1
        player.num_correct = puzzle.num_correct

        p = get_progress(player)
        return {
            my_id: dict(
                type='feedback',
                slider=slider.idx,
                value=slider.value,
                is_correct=slider.is_correct,
                is_completed=puzzle.is_solved,
                progress=p,
            )
        }

    if message_type == "cheat" and settings.DEBUG:
        return {
            my_id: dict(
                type='solution',
                solution={s.idx: s.target for s in Slider.filter(puzzle=puzzle)},
            )
        }

    raise RuntimeError("unrecognized message from client")


def _slider_js_vars(player: Player):
    """Return the js_vars shared by both slider-task pages."""
    return dict(
        params=player.session.params,
        slider_size=task_sliders.SLIDER_BBOX,
    )


def _slider_template_vars(player: Player):
    """Return the template variables shared by both slider-task pages."""
    return dict(params=player.session.params, DEBUG=settings.DEBUG)


def _draw_rejection_probability(num_correct):
    """Return ``(base_draw, adjusted)`` for one offer draw.

    ``base_draw`` is uniform on [0, 1]; ``adjusted`` adds 1/3 of a percentage
    point for every correct slider short of EFFORT_BENCHMARK (and subtracts
    the same for every slider beyond it).
    """
    base_draw = random.uniform(0, 1)
    adjusted = base_draw + ((EFFORT_BENCHMARK - num_correct) * (1 / 3) * 0.01)
    return base_draw, adjusted


def _not_searching_without_job(player: Player):
    """Display rule shared by Game_B and Results_B.

    True when the player chose not to search; after round 1 the page is
    additionally hidden once ``foundjob`` is set.
    """
    if player.round_number > 1 and player.foundjob:
        return False
    return not player.search


#### INTRO PAGES ####


class Introduction(Page):
    pass


class Choice(Page):
    form_model = "player"
    form_fields = ["search"]


#### GAME A ####


class Game_A(Page):
    timeout_seconds = 40
    live_method = play_game

    @staticmethod
    def is_displayed(player: Player):
        return player.search

    @staticmethod
    def js_vars(player: Player):
        return _slider_js_vars(player)

    @staticmethod
    def vars_for_template(player: Player):
        return _slider_template_vars(player)

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record the effort for job A and resolve whether an offer is held."""
        puzzle = get_current_puzzle(player)
        # NOTE(review): nothing is recorded or resolved unless at least one
        # slider value was submitted -- confirm this matches the design.
        if not (puzzle and puzzle.response_timestamp):
            return

        player.elapsed_time = puzzle.response_timestamp - puzzle.timestamp
        player.num_correct_a = puzzle.num_correct
        player.rejection_pr_public_a = puzzle.num_correct / 3
        (
            player.rejection_pr_a_noeffort,
            player.rejection_pr_a,
        ) = _draw_rejection_probability(puzzle.num_correct)

        # An offer held in the previous round carries over; otherwise the
        # adjusted draw has to fall below the threshold.
        carried_over = (
            player.round_number > 1
            and not player.in_round(player.round_number - 1).rejection_a
        )
        if carried_over or player.rejection_pr_a < OFFER_THRESHOLD:
            player.rejection_a = False

        if not player.rejection_a:
            player.payoff_a = JOB_A_PAYOFF


class Results_A(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.search


#### GAME B ####


class Game_B(Page):
    timeout_seconds = 40
    live_method = play_game

    @staticmethod
    def is_displayed(player: Player):
        return _not_searching_without_job(player)

    @staticmethod
    def js_vars(player: Player):
        return _slider_js_vars(player)

    @staticmethod
    def vars_for_template(player: Player):
        return _slider_template_vars(player)

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record the effort for job B and resolve whether an offer is held."""
        puzzle = get_current_puzzle(player)
        # NOTE(review): nothing is recorded or resolved unless at least one
        # slider value was submitted -- confirm this matches the design.
        if not (puzzle and puzzle.response_timestamp):
            return

        player.elapsed_time = puzzle.response_timestamp - puzzle.timestamp
        player.num_correct_b = puzzle.num_correct
        player.rejection_pr_public_b = puzzle.num_correct / 3
        (
            player.rejection_pr_b_noeffort,
            player.rejection_pr_b,
        ) = _draw_rejection_probability(puzzle.num_correct)

        # An offer held in the previous round carries over; otherwise the
        # adjusted draw has to fall below the threshold.
        carried_over = (
            player.round_number > 1
            and not player.in_round(player.round_number - 1).rejection_b
        )
        if carried_over or player.rejection_pr_b < OFFER_THRESHOLD:
            player.rejection_b = False

        if not player.rejection_b:
            player.payoff_b = JOB_B_PAYOFF


class Results_B(Page):
    @staticmethod
    def is_displayed(player: Player):
        return _not_searching_without_job(player)


class CombinedResults(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == Constants.num_rounds

    @staticmethod
    def vars_for_template(player: Player):
        """Aggregate the per-round payoffs into the final payoff.

        Stores the totals on the player, sets ``final_payoff`` according to
        which job accumulated the larger total, and writes ``r2_po`` and
        ``session_payoff`` on the participant.
        """
        all_players = player.in_all_rounds()
        # Recompute the totals from scratch so that re-rendering the page
        # does not add the same rounds to the stored values again.
        total_a = sum(p.payoff_a for p in all_players)
        total_b = sum(p.payoff_b for p in all_players)
        player.tempvar_a = total_a
        player.tempvar_b = total_b

        if total_a > total_b:
            player.final_payoff = JOB_A_PAYOFF
        elif total_b > total_a:
            player.final_payoff = JOB_B_PAYOFF
        elif total_a == 0:
            player.final_payoff = 0
        # A tie with positive totals leaves final_payoff unchanged.

        player.participant.r2_po = player.final_payoff
        # r1_po is not set anywhere in this module.
        player.participant.session_payoff = (
            player.participant.r2_po + player.participant.r1_po
        )


class ComprehensionTest(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class thanks(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == Constants.num_rounds


page_sequence = [
    Introduction,
    ComprehensionTest,
    Choice,
    Game_A,
    Game_B,
    Results_A,
    Results_B,
    CombinedResults,
    thanks,
]