import time
import json
import random

from otree import settings
from otree.api import *

from .image_utils import encode_image
from . import task_sliders

c = Currency

doc = """
"""


class Constants(BaseConstants):
    name_in_url = "po"
    players_per_group = None
    num_rounds = 8
    # payment_per_round = cu(1)
    instructions_template = __name__ + "/instructions.html"


class Subsession(BaseSubsession):
    pass


def creating_session(subsession: Subsession):
    """Store the slider-task parameters on the session.

    Each parameter is read from the session config; a value missing there
    falls back to the default listed below.
    """
    session = subsession.session
    defaults = dict(
        trial_delay=1.0,
        retry_delay=0.1,
        num_sliders=30,
        num_columns=3,
        attempts_per_slider=10,
    )
    session.params = {
        name: session.config.get(name, fallback) for name, fallback in defaults.items()
    }


class Group(BaseGroup):
    # this variant
    average_effort = models.FloatField()
    count_offer_total = models.IntegerField(initial=0)
    count_offer = models.IntegerField(initial=0)
    count_reject = models.IntegerField()
    num_searching = models.IntegerField()
    num_in_session = models.IntegerField()
    # GAME 1:


class Player(BasePlayer):
    # only supported 1 iteration for now
    iteration = models.IntegerField(initial=0)
    num_correct = models.IntegerField(initial=0)
    num_correct_a = models.IntegerField(initial=0)
    num_correct_b = models.IntegerField(initial=0)
    num_correct_benchmark = models.IntegerField(initial=0)
    payoff_a = models.FloatField(initial=0)
    payoff_b = models.FloatField(initial=0)
    prev_payoff_a = models.FloatField(initial=0)
    final_payoff = models.FloatField(initial=0)
    elapsed_time = models.FloatField(initial=0)
    rejection_pr_a_noeffort = models.FloatField(initial=0)
    rejection_pr_b_noeffort = models.FloatField(initial=0)
    rejection_pr_a = models.FloatField(initial=0)
    rejection_pr_b = models.FloatField(initial=0)
    rejection_pr_public_a = models.FloatField(initial=0)
    rejection_pr_public_b = models.FloatField(initial=0)
    rejection_fb = models.BooleanField(initial=True)
    foundjob = models.BooleanField(initial=False)
    rejection_a = models.BooleanField(initial=True)
    rejection_b = models.BooleanField(initial=True)
    search = models.BooleanField(
        label="",
        choices=[
            [True, "Search for a new job"],
            [False, "Focus on current job, don't search"],
        ]
    )
    search_fb = models.BooleanField(initial=False)
    tempvar_b = models.FloatField(initial=0)
    session_payoff = models.IntegerField()
    cumuloutcome_rand = models.FloatField(initial=0)
    cumuloutcome = models.BooleanField()
    count_offer_prev = models.IntegerField()


# puzzle-specific stuff


class Puzzle(ExtraModel):
    """A model to keep record of sliders setup"""

    player = models.Link(Player)
    iteration = models.IntegerField()
    timestamp = models.FloatField()

    num_sliders = models.IntegerField()
    layout = models.LongStringField()

    response_timestamp = models.FloatField()
    num_correct = models.IntegerField(initial=0)
    is_solved = models.BooleanField(initial=False)


class Slider(ExtraModel):
    """A model to keep record of each slider"""

    puzzle = models.Link(Puzzle)
    idx = models.IntegerField()
    target = models.IntegerField()
    value = models.IntegerField()
    is_correct = models.BooleanField(initial=False)
    attempts = models.IntegerField(initial=0)


def generate_puzzle(player: Player) -> Puzzle:
    """Create new puzzle for a player"""
    params = player.session.params
    num = params['num_sliders']
    layout = task_sliders.generate_layout(params)
    puzzle = Puzzle.create(
        player=player,
        iteration=player.iteration,
        timestamp=time.time(),
        num_sliders=num,
        layout=json.dumps(layout),
    )
    for i in range(num):
        target, initial = task_sliders.generate_slider()
        Slider.create(puzzle=puzzle, idx=i, target=target, value=initial)
    return puzzle


def get_current_puzzle(player):
    """Return the puzzle of the player's current iteration, or None if there is none."""
    puzzles = Puzzle.filter(player=player, iteration=player.iteration)
    if puzzles:
        # exactly one puzzle per (player, iteration) is expected;
        # the unpacking fails loudly otherwise
        [puzzle] = puzzles
        return puzzle


def get_slider(puzzle, idx):
    """Return the slider with index `idx` of the given puzzle, or None if there is none."""
    sliders = Slider.filter(puzzle=puzzle, idx=idx)
    if sliders:
        [slider] = sliders
        return slider


def encode_puzzle(puzzle: Puzzle):
    """Create data describing puzzle to send to client"""
    layout = json.loads(puzzle.layout)
    sliders = Slider.filter(puzzle=puzzle)
    # generate image for the puzzle
    image = task_sliders.render_image(layout, targets=[s.target for s in sliders])
    return dict(
        image=encode_image(image),
        size=layout['size'],
        grid=layout['grid'],
        sliders={s.idx: {'value': s.value, 'is_correct': s.is_correct} for s in sliders},
    )


def get_progress(player: Player):
    """Return current player progress"""
    return dict(
        iteration=player.iteration,
        solved=player.num_correct,
    )


def handle_response(puzzle, slider, value):
    """Apply a submitted slider value and refresh the puzzle's solved counters."""
    slider.value = task_sliders.snap_value(value, slider.target)
    slider.is_correct = slider.value == slider.target
    puzzle.num_correct = len(Slider.filter(puzzle=puzzle, is_correct=True))
    puzzle.is_solved = puzzle.num_correct == puzzle.num_sliders


def play_game(player: Player, message: dict):
    """Main game workflow
    Implemented as reactive scheme: receive message from browser, react, respond.

    Generic game workflow, from server point of view:
    - receive: {'type': 'load'} -- empty message means page loaded
    - check if it's game start or page refresh midgame
    - respond: {'type': 'status', 'progress': ...}
    - respond: {'type': 'status', 'progress': ..., 'puzzle': data}
      in case of midgame page reload

    - receive: {'type': 'new'} -- request for a new puzzle
    - generate new sliders
    - respond: {'type': 'puzzle', 'puzzle': data}

    - receive: {'type': 'value', 'slider': ..., 'value': ...} -- submitted value of a slider
      - slider: the index of the slider
      - value: the value of slider in pixels
    - check if the answer is correct
    - respond: {'type': 'feedback', 'slider': ..., 'value': ..., 'is_correct': ..., 'is_completed': ...}
      - slider: the index of slider submitted
      - value: the value aligned to slider steps
      - is_correct: if submitted value is correct
      - is_completed: if all sliders are correct

    Raises RuntimeError on protocol violations (second puzzle requested,
    missing puzzle/slider, retrying too fast, too many attempts) and on
    unrecognized message types.
    """
    session = player.session
    my_id = player.id_in_group
    params = session.params

    now = time.time()

    # the current puzzle or none
    puzzle = get_current_puzzle(player)

    message_type = message['type']

    if message_type == 'load':
        p = get_progress(player)
        if puzzle:
            return {my_id: dict(type='status', progress=p, puzzle=encode_puzzle(puzzle))}
        else:
            return {my_id: dict(type='status', progress=p)}

    if message_type == "new":
        if puzzle is not None:
            raise RuntimeError("trying to create 2nd puzzle")

        player.iteration += 1
        z = generate_puzzle(player)
        p = get_progress(player)

        return {my_id: dict(type='puzzle', puzzle=encode_puzzle(z), progress=p)}

    if message_type == "value":
        if puzzle is None:
            raise RuntimeError("missing puzzle")
        # throttle: reject submissions arriving sooner than retry_delay after the last one
        if puzzle.response_timestamp and now < puzzle.response_timestamp + params["retry_delay"]:
            raise RuntimeError("retrying too fast")

        slider = get_slider(puzzle, int(message["slider"]))

        if slider is None:
            raise RuntimeError("missing slider")
        if slider.attempts >= params['attempts_per_slider']:
            raise RuntimeError("too many slider motions")

        value = int(message["value"])
        handle_response(puzzle, slider, value)
        puzzle.response_timestamp = now
        slider.attempts += 1
        player.num_correct = puzzle.num_correct

        p = get_progress(player)
        return {
            my_id: dict(
                type='feedback',
                slider=slider.idx,
                value=slider.value,
                is_correct=slider.is_correct,
                is_completed=puzzle.is_solved,
                progress=p,
            )
        }

    if message_type == "cheat" and settings.DEBUG:
        return {
            my_id: dict(
                type='solution',
                solution={s.idx: s.target for s in Slider.filter(puzzle=puzzle)},
            )
        }

    raise RuntimeError("unrecognized message from client")


#### INTRO PAGES ####


class Introduction(Page):
    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Draw the round's random number and carry job state over from the previous round."""
        player.cumuloutcome_rand = random.uniform(0, 1)
        if player.round_number == 1:
            # A single comparison, so the field is always set: a draw of
            # exactly 0.5 previously matched neither `< 0.5` nor `> 0.5` and
            # left cumuloutcome null, which later rounds then copied.
            player.cumuloutcome = player.cumuloutcome_rand > 0.5
        else:
            prev_player = player.in_round(player.round_number - 1)
            player.num_correct_benchmark = prev_player.num_correct_benchmark
            player.prev_payoff_a = prev_player.payoff_a
            player.cumuloutcome = prev_player.cumuloutcome
            # a positive game-A payoff in the previous round marks the job as found
            if player.prev_payoff_a > 0:
                player.foundjob = True


class Choice(Page):
    form_model = "player"
    form_fields = ["search"]

    @staticmethod
    def is_displayed(player: Player):
        """Shown in round 1, and in later rounds to players without a job."""
        return player.round_number == 1 or not player.foundjob


class JobFound(Page):
    @staticmethod
    def vars_for_template(player: Player):
        # re-copies the benchmark from the previous round; returns nothing to the template
        if player.round_number > 1:
            prev_player = player.in_round(player.round_number - 1)
            player.num_correct_benchmark = prev_player.num_correct_benchmark

    @staticmethod
    def is_displayed(player: Player):
        """Shown from round 2 on to players who have found a job."""
        return player.round_number > 1 and bool(player.foundjob)


#### GAME A ####


class Game_A(Page):
    timeout_seconds = 37

    live_method = play_game

    @staticmethod
    def is_displayed(player: Player):
        """Shown to players who already have a job (round 2+) or who chose to search."""
        if player.round_number > 1 and player.foundjob:
            # `search` is not read here: these players skip the Choice page
            return True
        return bool(player.search)

    @staticmethod
    def js_vars(player: Player):
        return dict(
            params=player.session.params,
            slider_size=task_sliders.SLIDER_BBOX,
        )

    @staticmethod
    def vars_for_template(player: Player):
        return dict(
            params=player.session.params,
            DEBUG=settings.DEBUG,
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record effort, draw the rejection outcome and set the game-A payoff."""
        puzzle = get_current_puzzle(player)

        # NOTE(review): the original indentation was lost. Everything that
        # dereferences `puzzle` is assumed to sit inside this guard (it would
        # crash on a missing puzzle otherwise) -- confirm against the source.
        if puzzle and puzzle.response_timestamp:
            player.elapsed_time = puzzle.response_timestamp - puzzle.timestamp
            player.num_correct_a = puzzle.num_correct
            player.rejection_pr_public_a = puzzle.num_correct / 3
            player.rejection_pr_a_noeffort = random.uniform(0, 1)
            # uniform draw shifted by effort: fewer correct sliders -> larger value
            player.rejection_pr_a = player.rejection_pr_a_noeffort + (
                (15 - puzzle.num_correct) * (1 / 3) * 0.01
            )

            if player.round_number == 1:
                # an offer is made when the shifted draw falls below 0.05
                if player.rejection_pr_a < 0.05:
                    player.rejection_a = False
                    player.rejection_fb = False
                    player.num_correct_benchmark = player.num_correct_a - 4
            else:
                prev_player = player.in_round(player.round_number - 1)
                if not prev_player.rejection_a:
                    # offer already held: keep it together with its benchmark
                    player.rejection_a = False
                    player.num_correct_benchmark = prev_player.num_correct_benchmark
                elif player.rejection_pr_a < 0.05:
                    player.rejection_a = False
                    player.rejection_fb = False
                    player.num_correct_benchmark = player.num_correct_a - 4

            if not player.rejection_a and puzzle.num_correct >= player.num_correct_benchmark:
                player.payoff_a = 55

        # all_players = player.in_all_rounds()
        # for temp_player in all_players:
        #     player.combined_payoff_a += temp_player.payoff_a
        #     player.combined_payoff_b += temp_player.payoff_b
        #     player.combined_pot_payoff_b += temp_player.potential_payoff_b

        # NOTE(review): placed at method level (it does not use `puzzle`);
        # confirm it was not meant to be inside the guard above.
        # `search` is null for players who skipped the Choice page.
        if player.field_maybe_none('search'):  ### check
            player.search_fb = player.search


class ResultsWaitPage_a(WaitPage):
    # this variant
    @staticmethod
    def after_all_players_arrive(group: Group):
        """Aggregate effort, search and offer counts over the group's players."""
        players = group.get_players()
        num_correct = [p.num_correct for p in players]
        group.average_effort = sum(num_correct) / len(num_correct)
        search_fb = [p.search_fb for p in players]
        group.num_in_session = len(num_correct)
        group.num_searching = sum(search_fb)
        rejection_fb = [p.rejection_fb for p in players]
        group.count_offer = len(search_fb) - sum(rejection_fb)
        # rejection_a = [p.rejection_a for p in players]
        # group.count_offer_total = len(search_fb)-sum(rejection_a)
        group.count_reject = sum(rejection_fb) - (group.num_in_session - group.num_searching)
        # in_all_rounds() is summed as returned, so this round's count_offer
        # (set just above) is part of the running total
        for round_group in group.in_all_rounds():
            group.count_offer_total += round_group.count_offer

    # @staticmethod
    # def is_displayed(player: Player):
    #     if player.round_number > 1:
    #         return player.foundjob == True or (player.foundjob == False and player.search == True)
    #     elif player.round_number == 1:
    #         return player.search == 1

    # @staticmethod
    # We need the wait page to show only if the other player is searching
    # def is_displayed(player: Player):
    #     if player.round_number > 1:
    #         return player.foundjob == False


class Results_A(Page):
    @staticmethod
    def is_displayed(player: Player):
        """Shown to players without a job who chose to search."""
        if not player.foundjob or player.round_number == 1:
            return bool(player.search)
        return False


class Results_A_JobFound(Page):
    @staticmethod
    def is_displayed(player: Player):
        """Shown from round 2 on to players who have found a job."""
        return player.round_number > 1 and bool(player.foundjob)


#### GAME B ####


class Game_B(Page):
    timeout_seconds = 40

    live_method = play_game

    @staticmethod
    def is_displayed(player: Player):
        """Shown to players without a job who chose not to search."""
        if player.round_number == 1 or not player.foundjob:
            return not player.search
        return False

    @staticmethod
    def js_vars(player: Player):
        return dict(
            params=player.session.params,
            slider_size=task_sliders.SLIDER_BBOX,
        )

    @staticmethod
    def vars_for_template(player: Player):
        return dict(
            params=player.session.params,
            DEBUG=settings.DEBUG,
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record effort, draw the rejection outcome and set the game-B payoff."""
        puzzle = get_current_puzzle(player)

        # NOTE(review): the original indentation was lost. Everything below is
        # assumed to sit inside this guard, mirroring Game_A -- confirm.
        if puzzle and puzzle.response_timestamp:
            player.elapsed_time = puzzle.response_timestamp - puzzle.timestamp
            player.num_correct_b = puzzle.num_correct
            player.rejection_pr_public_b = puzzle.num_correct / 3
            player.rejection_pr_b_noeffort = random.uniform(0, 1)
            # uniform draw shifted by effort: fewer correct sliders -> larger value
            player.rejection_pr_b = player.rejection_pr_b_noeffort + (
                (15 - puzzle.num_correct) * (1 / 3) * 0.01
            )

            if player.round_number == 1:
                if player.rejection_pr_b < 0.05:
                    player.rejection_b = False
            else:
                prev_player = player.in_round(player.round_number - 1)
                if not prev_player.rejection_b:
                    player.rejection_b = False
                elif player.rejection_pr_b < 0.05:
                    player.rejection_b = False

            if not player.rejection_b:
                player.payoff_b = 50

        # all_players = player.in_all_rounds()
        # for temp_player in all_players:
        #     player.combined_payoff_a += temp_player.payoff_a
        #     player.combined_payoff_b += temp_player.payoff_b
        #     player.combined_pot_payoff_b += temp_player.potential_payoff_b


class Results_B(Page):
    @staticmethod
    def is_displayed(player: Player):
        """Shown to players without a job who chose not to search."""
        if player.round_number == 1 or not player.foundjob:
            return not player.search
        return False


class CombinedResults(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == Constants.num_rounds

    @staticmethod
    def vars_for_template(player: Player):
        """Compute the final payoff from this round's payoff_a and the summed payoff_b."""
        # Recompute the total from scratch on every render. Accumulating into
        # the stored field made a page refresh double the sum, which could
        # flip final_payoff (e.g. 50 -> 100 turns a 55 outcome into 50).
        player.tempvar_b = sum(p.payoff_b for p in player.in_all_rounds())

        if player.payoff_a > 0 and player.tempvar_b == 0:
            player.final_payoff = 55
        elif player.payoff_a == 0 and player.tempvar_b > 0:
            player.final_payoff = 50
        elif player.payoff_a == 0 and player.tempvar_b == 0:
            player.final_payoff = 0
        elif player.payoff_a > 0 and player.tempvar_b > 0 and player.payoff_a > player.tempvar_b:
            player.final_payoff = 55
        elif player.payoff_a > 0 and player.tempvar_b > 0 and player.payoff_a < player.tempvar_b:
            player.final_payoff = 50

        player.participant.r3_po = player.final_payoff
        # player.participant.session_payoff = player.participant.r6_po+player.participant.r5_po+player.participant.r4_po+player.participant.r3_po+player.participant.r2_po+player.participant.r1_po


class ComprehensionTest(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class AppWaitPage(WaitPage):
    @staticmethod
    def is_displayed(player: Player):
        # last round only; tied to the constant instead of a literal 8
        return player.round_number == Constants.num_rounds


page_sequence = [
    Introduction,
    ComprehensionTest,
    JobFound,
    Choice,
    Game_A,
    Game_B,
    ResultsWaitPage_a,
    Results_A,
    Results_A_JobFound,
    Results_B,
    CombinedResults,
]