import time from otree import settings from otree.api import * from . import task_matrix from .image_utils import encode_image from datetime import datetime doc = """ Count character in characters """ class C(BaseConstants): NAME_IN_URL = 'real_effort_task_' +datetime.now().strftime("%Y%m%dT%H%M%S%f") PLAYERS_PER_GROUP = 2 NUM_ROUNDS = 3 WB_INTRO_1 = __name__ + '/wbintro1.html' WB_INTRO_2 = __name__ + '/wbintro2.html' INSTRUCTION_TEMPLATE = __name__ + "/instructions.html" WAIT_TEMPLATE = __name__ + "/waittext.html" RESULT_TEMPLATE = __name__ + "/result_template.html" ##Puzzle specifics TEXT_SIZE = 20 DEVIATION_GROUP = 4 ## Deviation for Group stuff NO_SUBMIT_PENALTY = DEVIATION_GROUP * 5 PAYOFF_PER_PUZZLE = cu(5) PAYOFF_PER_PUZZLE_SP1 = cu(4) PAYOFF_PER_PUZZLE_SP2 = cu(3) TIME_PER_PUZZLE = 120 HEIGHT = 15 WIDTH = 20 IGNORED_CHARS = "012345689" COUNTED_CHARS = "7" AMOUNT_OF_COUNTED_CHARS = 1 AMOUNT_EMPTY_FIELDS = 0 class Subsession(BaseSubsession): pass def creating_session(subsession: Subsession): session = subsession.session defaults = dict( retry_delay=1.0, puzzle_delay=1.0, attempts_per_puzzle=1, second_time_counting = False ) session.params = {} for param in defaults: session.params[param] = session.config.get(param, defaults[param]) class Group(BaseGroup): solution = models.IntegerField(initial=0) response = models.IntegerField(initial=0) is_correct = models.BooleanField(initial=False) solo_play = models.BooleanField(initial=False) class Player(BasePlayer): iteration = models.IntegerField(initial=0) num_trials = models.IntegerField(initial=0) num_correct = models.IntegerField(initial=0) num_failed = models.IntegerField(initial=0) response = models.IntegerField(initial=-1) solution = models.IntegerField(initial=0) is_correct = models.BooleanField(initial=False, doc='Nur für 1x Zählen und Alleine Zählen aussagekräftig') ## nur für solospiele #in_team = models.BooleanField(Initial=True) #besonders relevant für zweiten Teil #spieler1 = models.BooleanField (initial=False) #relevant für zweiten Teil # Puzzlestuff class Puzzle(ExtraModel): player = models.Link(Player) iteration = models.IntegerField(initial=0) attempts = models.IntegerField(initial=0) timestamp = models.FloatField(initial=0) text = models.LongStringField() # entw. Text oder Json (wie in Slider) solution = models.LongStringField() # genauso wie text response = models.LongStringField() response_timestamp = models.FloatField() is_correct = models.BooleanField() def gen_puzzle(player: Player) -> Puzzle: """ generate new puzzle for a player""" ## ignored_chars = player.session.config['ignored_chars'] if 'ignored_chars' in player.session.config else C.IGNORED_CHARS counted_char = player.session.config['counted_char'] if 'counted_char' in player.session.config else C.COUNTED_CHARS width = player.session.config['width'] if 'width' in player.session.config else C.WIDTH height = player.session.config['height'] if 'height' in player.session.config else C.HEIGHT text_size = player.session.config['text_size'] if 'text_size' in player.session.config else C.TEXT_SIZE amount_of_counted_chars = player.session.config['amount_of_counted_chars'] if 'amount_of_counted_chars' in player.session.config else C.AMOUNT_OF_COUNTED_CHARS fields = task_matrix.generate_puzzle_fields(ignored_chars=ignored_chars, counted_char=counted_char, width=width, height=height, text_size= text_size, amount_of_counted_chars=amount_of_counted_chars, amount_empty_fields=C.AMOUNT_EMPTY_FIELDS ) ## Schriftgröße kann hier mitgegeben werden player.iteration += 1 return Puzzle.create( player=player, iteration=player.iteration, timestamp=time.time(), **fields # text & solution ) def get_cur_puzzle(player: Player): puzzles = Puzzle.filter(player=player, iteration=player.iteration) if puzzles: [puzzle] = puzzles return puzzle def enc_puzzle(puzzle: Puzzle): image = task_matrix.render_image(puzzle) return dict(image=encode_image(image)) def get_progess(player:Player): return dict( num_trials= player.num_trials, num_correct = player.num_correct, num_incorrect = player.num_failed, iteration = player.iteration ) def play_game(player: Player, msg: dict): ## siehe slider session = player.session my_id = player.id_in_group params = session.params now = time.time() current = get_cur_puzzle(player) msg_type = msg['type'] if msg_type == 'load': prog = get_progess(player) if current: return { my_id: dict(type='status',progress=prog, puzzle=enc_puzzle(current)) } else: return {my_id: dict(type='status',progress=prog)} if msg_type == 'cheat' and settings.DEBUG: return {my_id: dict(type='solution', solution=current.solution)} if msg_type == 'next': if current is not None: if current.response is None: raise RuntimeError('trying to skip over unsolved puzzle') if now < current.timestamp + params["puzzle_delay"]: raise RuntimeError('retrying too fast') return {my_id: dict(type='status', progress=get_progess(player), iteration_left=0)} puzz = gen_puzzle(player) prog = get_progess(player) return {my_id: dict(type='puzzle', puzzle=enc_puzzle(puzz), progress=prog)} if msg_type == 'answer': if current is None: raise RuntimeError('trying to answer no puzzle') if current.response is not None: # if current.attempts >= params['attempts_per_puzzle']: # raise RuntimeError('no more attempts allowed') # herausgenommen da jeder ja nur einen Versuch hat # if now < current.response_timestamp + params['retry_delay']: # raise RuntimeError('retrying too fast') # herausgenommen, da die Zahl submittet wird, wenn gewünscht dann retry hoch player.num_trials -= 1 if current.is_correct: player.num_correct -= 1 else: player.num_failed -= 1 answer = msg['answer'] if answer == '' or answer is None: raise ValueError('Answer is empty ') current.response = answer current.is_correct = task_matrix.is_correct(answer, current) player.solution = int(current.solution) current.response_timestamp = now current.attempts +=1 if current.is_correct: player.num_correct += 1 else: player.num_failed +=1 player.num_trials += 1 player.response = int(answer) tries_left = params['attempts_per_puzzle'] - current.attempts prog = get_progess(player) return { my_id: dict( type='feedback', is_correct=current.is_correct, retries_left=tries_left, progress=prog, ) } raise RuntimeError("unrecognized message from client") def calculate_group_findings(group: Group): together_test =group.get_players()[0].participant.count_numbers_together if together_test is not None and together_test: group.response = 0 group.solution = 0 for p in group.get_players(): if p.response == -1 and p.solution == 0: ## nicht geantwortet p.response -= C.NO_SUBMIT_PENALTY p.solution = C.NO_SUBMIT_PENALTY group.solution += p.solution group.response += p.response group.is_correct = abs(group.solution - group.response) <= C.DEVIATION_GROUP for p in group.get_players(): p.is_correct = group.is_correct else: ## fürs alleine spielen for p in group.get_players(): if p.response == -1 and p.solution == 0: # nicht geantwortet p.response -= C.NO_SUBMIT_PENALTY p.solution = C.NO_SUBMIT_PENALTY p.is_correct = abs(p.solution - p.response) <= (C.DEVIATION_GROUP/2) #halb so viel Abweichung group.is_correct = False ## Gruppe kann nicht recht haben wenn es keine Gruppe geht, schau auf den Spieler for p in group.get_players(): if p.participant.count_numbers_together: p.payoff = C.PAYOFF_PER_PUZZLE if p.is_correct else cu(0) else: # solo if p.participant.vars['spieler'] is not None: if p.participant.vars['spieler'] =='Spieler1': p.payoff = C.PAYOFF_PER_PUZZLE_SP1 if p.is_correct else cu(0) else: p.payoff = C.PAYOFF_PER_PUZZLE_SP2 if p.is_correct else cu(0) # PAGES class Intro(Page): @staticmethod def vars_for_template(player: Player): ## only for testing, normally it should come from the app before if len(player.participant.vars) == 0 or player.participant.vars['count_numbers_together'] is None: player.participant.vars['count_numbers_together'] = True if len(player.session.params) == 0 or player.session.params['second_time_counting'] is None: player.session.params['second_time_counting'] = False second_time_counting = player.session.params['second_time_counting'] second_time = second_time_counting is not None and second_time_counting sp1 = False if player.session.second_time_counting: if player.participant.vars['spieler'] == 'Spieler1': sp1 = True return dict( second_time = second_time, together = player.participant.vars['count_numbers_together'], myround = player.round_number==1, spieler1= sp1 ) def is_displayed(player: Player): return not (player.session.second_time_counting==True and player.round_number>1) class Spiel(Page): timeout_seconds = C.TIME_PER_PUZZLE live_method = play_game @staticmethod def js_vars(player: Player): return dict(params=player.session.params) @staticmethod def vars_for_template(player: Player): return dict(DEBUG=settings.DEBUG, input_type=task_matrix.INPUT_TYPE, placeholder=task_matrix.INPUT_HINT, together = player.participant.vars['count_numbers_together'] ) @staticmethod def before_next_page(player: Player, timeout_happened): if not timeout_happened and not player.session.params['max_iterations']: raise RuntimeError("malicious page submission") '''if C.PLAYERS_PER_GROUP is not None and C.PLAYERS_PER_GROUP > 1: ### An der Stelle Zusammenrechnen der Gruppenberechnung machen, bzw in der Gruppe speichern current = get_cur_puzzle(player) player.group.solution += int (current.solution) player.group.response += int (current.response) ''' @staticmethod def is_displayed(player: Player): if player.session.second_time_counting: if player.round_number > 1: return False else: if player.participant.count_numbers_together==False and player.participant.spieler== 'Spieler2': return False else: return True else: return True #return not (player.session.second_time_counting == True and player.round_number > 1) class Group_wait_stage(WaitPage): @staticmethod def after_all_players_arrive(group: Group): calculate_group_findings(group) def vars_for_template(player: Player): return dict (body_text= "Bitte warten Sie, bis Ihr(e) Teampartner*in mit dem Zählen fertig ist") def is_displayed(player: Player): return not (player.session.second_time_counting == True and player.round_number > 1) ''' class Group_Stage(Page): @staticmethod def is_displayed(player: Player): return C.PLAYERS_PER_GROUP is not None and C.PLAYERS_PER_GROUP > 1 ### nur wenn es eine Gruppe gibt @staticmethod def vars_for_template(player: Player): return dict()## hier dict füllen wenn ihr was anzeigen wollt für den spaß ''' class Results(Page): @staticmethod def vars_for_template(player: Player): cor = player.group.is_correct if(player.session.second_time_counting and player.participant.vars['count_numbers_together']==False): cor=player.is_correct sp1 = False if player.session.second_time_counting: if player.participant.vars['spieler'] == 'Spieler1': sp1=True return dict( correct = cor, payoff = player.payoff if cor else "keine Entlohnung", together = player.participant.vars['count_numbers_together'], spieler1 = sp1, myround_final = player.round_number==C.NUM_ROUNDS ) #für zweiten teil überspringen def is_displayed(player: Player): return not (player.session.second_time_counting == True and player.round_number > 1) class ShuffleWaitPage(WaitPage): wait_for_all_groups = True def after_all_players_arrive(subsession: Subsession): if subsession.session.params['second_time_counting'] == True: subsession.set_group_matrix(subsession.session.mymatrix) print(subsession.get_group_matrix()) def is_displayed(player: Player): return player.session.second_time_counting == True and player.round_number == 1 def vars_for_template(player: Player): return dict (body_text= "Bitte warten Sie, bis das Experiment weitergeht") page_sequence = [ShuffleWaitPage, Intro, Spiel, Group_wait_stage, Results]