import time import json import random from otree import settings from otree.api import * from .image_utils import encode_image from . import task_sliders c = Currency doc = """ """ class Constants(BaseConstants): name_in_url = "sliders_intro" players_per_group = None num_rounds = 1 # payment_per_round = cu(1) instructions_template = __name__ + "/instructions.html" class Subsession(BaseSubsession): pass def creating_session(subsession: Subsession): session = subsession.session defaults = dict( trial_delay=1.0, retry_delay=0.1, num_sliders=9, num_columns=3, attempts_per_slider=10 ) session.params = {} for param in defaults: session.params[param] = session.config.get(param, defaults[param]) class Group(BaseGroup): pass # GAME 1: class Player(BasePlayer): # only suported 1 iteration for now consent = models.BooleanField( label="Do you agree to the above?", choices=[ [True, "Yes"], [False, "No"], ] ) iteration = models.IntegerField(initial=0) num_correct = models.IntegerField(initial=0) num_correct_a = models.IntegerField(initial=0) num_correct_b = models.IntegerField(initial=0) payoff_a = models.FloatField(initial=0) payoff_b = models.FloatField(initial=0) potential_payoff_b = models.FloatField(initial=0) combined_payoff_a = models.FloatField(initial=0) combined_payoff_b = models.FloatField(initial=0) prevcombined_payoff_a = models.FloatField(initial=0) combined_pot_payoff_b = models.FloatField(initial=0) final_payoff = models.FloatField(initial=0) elapsed_time = models.FloatField(initial=0) rejection_pr_a_noeffort = models.FloatField(initial=0) rejection_pr_b_noeffort = models.FloatField(initial=0) rejection_pr_a = models.FloatField(initial=0) rejection_pr_b = models.FloatField(initial=0) combined_rejection = models.FloatField(initial=0) ever_accept = models.BooleanField() has_dropped = models.BooleanField(initial=False) ever_accept_int = models.StringField ever_accept_zeroone = models.IntegerField(initial=0) foundjob = models.BooleanField(initial=False) rejection_a = models.BooleanField(initial=True) rejection_b = models.BooleanField(initial=True) search = models.BooleanField( label="", choices=[ [True, "Search for a new job"], [False, "Work extra hard to try to get a contract extension"], ] ) # puzzle-specific stuff class Puzzle(ExtraModel): """A model to keep record of sliders setup""" player = models.Link(Player) iteration = models.IntegerField() timestamp = models.FloatField() num_sliders = models.IntegerField() layout = models.LongStringField() response_timestamp = models.FloatField() num_correct = models.IntegerField(initial=0) is_solved = models.BooleanField(initial=False) class Slider(ExtraModel): """A model to keep record of each slider""" puzzle = models.Link(Puzzle) idx = models.IntegerField() target = models.IntegerField() value = models.IntegerField() is_correct = models.BooleanField(initial=False) attempts = models.IntegerField(initial=0) def generate_puzzle(player: Player) -> Puzzle: """Create new puzzle for a player""" params = player.session.params num = params['num_sliders'] layout = task_sliders.generate_layout(params) puzzle = Puzzle.create( player=player, iteration=player.iteration, timestamp=time.time(), num_sliders=num, layout=json.dumps(layout) ) for i in range(num): target, initial = task_sliders.generate_slider() Slider.create( puzzle=puzzle, idx=i, target=target, value=initial ) return puzzle def get_current_puzzle(player): puzzles = Puzzle.filter(player=player, iteration=player.iteration) if puzzles: [puzzle] = puzzles return puzzle def get_slider(puzzle, idx): sliders = Slider.filter(puzzle=puzzle, idx=idx) if sliders: [puzzle] = sliders return puzzle def encode_puzzle(puzzle: Puzzle): """Create data describing puzzle to send to client""" layout = json.loads(puzzle.layout) sliders = Slider.filter(puzzle=puzzle) # generate image for the puzzle image = task_sliders.render_image(layout, targets=[s.target for s in sliders]) return dict( image=encode_image(image), size=layout['size'], grid=layout['grid'], sliders={s.idx: {'value': s.value, 'is_correct': s.is_correct} for s in sliders} ) def get_progress(player: Player): """Return current player progress""" return dict( iteration=player.iteration, solved=player.num_correct ) def handle_response(puzzle, slider, value): slider.value = task_sliders.snap_value(value, slider.target) slider.is_correct = slider.value == slider.target puzzle.num_correct = len(Slider.filter(puzzle=puzzle, is_correct=True)) puzzle.is_solved = puzzle.num_correct == puzzle.num_sliders def play_game(player: Player, message: dict): """Main game workflow Implemented as reactive scheme: receive message from browser, react, respond. Generic game workflow, from server point of view: - receive: {'type': 'load'} -- empty message means page loaded - check if it's game start or page refresh midgame - respond: {'type': 'status', 'progress': ...} - respond: {'type': 'status', 'progress': ..., 'puzzle': data} in case of midgame page reload - receive: {'type': 'new'} -- request for a new puzzle - generate new sliders - respond: {'type': 'puzzle', 'puzzle': data} - receive: {'type': 'value', 'slider': ..., 'value': ...} -- submitted value of a slider - slider: the index of the slider - value: the value of slider in pixels - check if the answer is correct - respond: {'type': 'feedback', 'slider': ..., 'value': ..., 'is_correct': ..., 'is_completed': ...} - slider: the index of slider submitted - value: the value aligned to slider steps - is_corect: if submitted value is correct - is_completed: if all sliders are correct """ session = player.session my_id = player.id_in_group params = session.params now = time.time() # the current puzzle or none puzzle = get_current_puzzle(player) message_type = message['type'] if message_type == 'load': p = get_progress(player) if puzzle: return {my_id: dict(type='status', progress=p, puzzle=encode_puzzle(puzzle))} else: return {my_id: dict(type='status', progress=p)} if message_type == "new": if puzzle is not None: raise RuntimeError("trying to create 2nd puzzle") player.iteration += 1 z = generate_puzzle(player) p = get_progress(player) return {my_id: dict(type='puzzle', puzzle=encode_puzzle(z), progress=p)} if message_type == "value": if puzzle is None: raise RuntimeError("missing puzzle") if puzzle.response_timestamp and now < puzzle.response_timestamp + params["retry_delay"]: raise RuntimeError("retrying too fast") slider = get_slider(puzzle, int(message["slider"])) if slider is None: raise RuntimeError("missing slider") if slider.attempts >= params['attempts_per_slider']: raise RuntimeError("too many slider motions") value = int(message["value"]) handle_response(puzzle, slider, value) puzzle.response_timestamp = now slider.attempts += 1 player.num_correct = puzzle.num_correct p = get_progress(player) return { my_id: dict( type='feedback', slider=slider.idx, value=slider.value, is_correct=slider.is_correct, is_completed=puzzle.is_solved, progress=p, ) } if message_type == "cheat" and settings.DEBUG: return {my_id: dict(type='solution', solution={s.idx: s.target for s in Slider.filter(puzzle=puzzle)})} raise RuntimeError("unrecognized message from client") #### INTRO PAGES #### class intro(Page): pass class Consent(Page): form_model = "player" form_fields = ['consent'] def consent_error_message(player, value): if value ==False: return 'If you do not consent, you cannot proceed with the game.' class context(Page): pass class incentives(Page): pass class session1(Page): pass class session2(Page): pass class choice(Page): pass class game1(Page): pass class game2(Page): pass class practice(Page): pass class prob(Page): pass class payoff1(Page): pass class payoff2(Page): pass class payoff3(Page): pass class ComprehensionTest(Page): pass class foundjob(Page): pass #### GAME A #### class Game_A(Page): timeout_seconds = 40 live_method = play_game @staticmethod def js_vars(player: Player): return dict( params=player.session.params, slider_size=task_sliders.SLIDER_BBOX, ) @staticmethod def vars_for_template(player: Player): return dict( params=player.session.params, DEBUG=settings.DEBUG ) @staticmethod def before_next_page(player: Player, timeout_happened): puzzle = get_current_puzzle(player) if puzzle and puzzle.response_timestamp: player.elapsed_time = puzzle.response_timestamp - puzzle.timestamp player.num_correct_a = puzzle.num_correct player.rejection_pr_a_noeffort = random.uniform(0, 1) player.rejection_pr_a = player.rejection_pr_a_noeffort - (((puzzle.num_correct - 15) / 30) * 0.1) if player.round_number == 1: if player.rejection_pr_a < 0.07: player.rejection_a = False else: prev_player = player.in_round(player.round_number - 1) if prev_player.rejection_a == False: player.rejection_a = False elif player.rejection_pr_a < 0.07: player.rejection_a = False if not player.rejection_a: player.payoff_a = player.num_correct_a * 1 all_players = player.in_all_rounds() for temp_player in all_players: player.combined_payoff_a += temp_player.payoff_a player.combined_payoff_b += temp_player.payoff_b player.combined_pot_payoff_b += temp_player.potential_payoff_b class Results_A(Page): pass class AppWaitPage(WaitPage): @staticmethod def is_displayed(player: Player): return player.round_number == 1 pass page_sequence = [Consent, intro, incentives, context, practice, Game_A, Results_A, ComprehensionTest]