import time from otree import settings from otree.api import * from . import task_matrix from .image_utils import encode_image from datetime import datetime doc = """ Count character in characters keep_groups_as_before Variable: Wenn man möchte, dass Gruppen in der zweiten Phase zusammenbleiben sollen, dann auf True setzen Wenn Gruppen in der zweiten Phase gemischt werden sollen, dann auf False setzen. """ class C(BaseConstants): NAME_IN_URL = 'real_effort_task_' +datetime.now().strftime("%Y%m%dT%H%M%S%f") PLAYERS_PER_GROUP = 2 NUM_ROUNDS = 1 WB_INTRO_1 = __name__ + '/wbintro1.html' WB_INTRO_2 = __name__ + '/wbintro2.html' INSTRUCTION_TEMPLATE = __name__ + "/instructions.html" WAIT_TEMPLATE = __name__ + "/waittext.html" RESULT_TEMPLATE = __name__ + "/result_template.html" ##Puzzle specifics TEXT_SIZE = 20 DEVIATION_GROUP = 4 ## Deviation for Group stuff NO_SUBMIT_PENALTY = DEVIATION_GROUP * 5 PAYOFF_PER_PUZZLE = cu(5) #in norm appropriateness cu(1.5) PAYOFF_PER_PUZZLE_SP1 = cu(4) ##in norm appropriateness cu(1.2) PAYOFF_PER_PUZZLE_SP2 = cu(0) TIME_PER_PUZZLE = 120 HEIGHT = 15 WIDTH = 20 IGNORED_CHARS = "012345689" COUNTED_CHARS = "7" AMOUNT_OF_COUNTED_CHARS = 1 AMOUNT_EMPTY_FIELDS = 0 class Subsession(BaseSubsession): pass def creating_session(subsession: Subsession): session = subsession.session defaults = dict( second_time_counting2 = True, keep_groups_as_before2 = True #ändern bei anderen Experimenten ) for param in defaults: session.params[param] = session.config.get(param, defaults[param]) #print("params session group2", session.params) session.vars['second_time_counting2']=True class Group(BaseGroup): solution = models.IntegerField(initial=0) response = models.IntegerField(initial=0) is_correct = models.BooleanField(initial=False) solo_play = models.BooleanField(initial=False) class Player(BasePlayer): iteration = models.IntegerField(initial=0) num_trials = models.IntegerField(initial=0) num_correct = models.IntegerField(initial=0) num_failed = models.IntegerField(initial=0) response = models.IntegerField(initial=-1) solution = models.IntegerField(initial=0) is_correct = models.BooleanField(initial=False, doc='Nur für 1x Zählen und Alleine Zählen aussagekräftig') ## nur für solospiele #in_team = models.BooleanField(Initial=True) #besonders relevant für zweiten Teil #spieler1 = models.BooleanField (initial=False) #relevant für zweiten Teil match = models.IntegerField(initial=-1) # Puzzlestuff class Puzzle(ExtraModel): player = models.Link(Player) iteration = models.IntegerField(initial=0) attempts = models.IntegerField(initial=0) timestamp = models.FloatField(initial=0) text = models.LongStringField() # entw. Text oder Json (wie in Slider) solution = models.LongStringField() # genauso wie text response = models.LongStringField() response_timestamp = models.FloatField() is_correct = models.BooleanField() def gen_puzzle(player: Player) -> Puzzle: """ generate new puzzle for a player""" ## ignored_chars = player.session.config['ignored_chars'] if 'ignored_chars' in player.session.config else C.IGNORED_CHARS counted_char = player.session.config['counted_char'] if 'counted_char' in player.session.config else C.COUNTED_CHARS width = player.session.config['width'] if 'width' in player.session.config else C.WIDTH height = player.session.config['height'] if 'height' in player.session.config else C.HEIGHT text_size = player.session.config['text_size'] if 'text_size' in player.session.config else C.TEXT_SIZE amount_of_counted_chars = player.session.config['amount_of_counted_chars'] if 'amount_of_counted_chars' in player.session.config else C.AMOUNT_OF_COUNTED_CHARS fields = task_matrix.generate_puzzle_fields(ignored_chars=ignored_chars, counted_char=counted_char, width=width, height=height, text_size= text_size, amount_of_counted_chars=amount_of_counted_chars, amount_empty_fields=C.AMOUNT_EMPTY_FIELDS ) ## Schriftgröße kann hier mitgegeben werden player.iteration += 1 return Puzzle.create( player=player, iteration=player.iteration, timestamp=time.time(), **fields # text & solution ) def get_cur_puzzle(player: Player): puzzles = Puzzle.filter(player=player, iteration=player.iteration) if puzzles: [puzzle] = puzzles return puzzle def enc_puzzle(puzzle: Puzzle): image = task_matrix.render_image(puzzle) return dict(image=encode_image(image)) def get_progess(player:Player): return dict( num_trials= player.num_trials, num_correct = player.num_correct, num_incorrect = player.num_failed, iteration = player.iteration ) def play_game(player: Player, msg: dict): ## siehe slider session = player.session my_id = player.id_in_group params = session.params now = time.time() current = get_cur_puzzle(player) msg_type = msg['type'] if msg_type == 'load': prog = get_progess(player) if current: return { my_id: dict(type='status',progress=prog, puzzle=enc_puzzle(current)) } else: return {my_id: dict(type='status',progress=prog)} if msg_type == 'cheat' and settings.DEBUG: return {my_id: dict(type='solution', solution=current.solution)} if msg_type == 'next': if current is not None: if current.response is None: raise RuntimeError('trying to skip over unsolved puzzle') if now < current.timestamp + params["puzzle_delay"]: raise RuntimeError('retrying too fast') return {my_id: dict(type='status', progress=get_progess(player), iteration_left=0)} puzz = gen_puzzle(player) prog = get_progess(player) return {my_id: dict(type='puzzle', puzzle=enc_puzzle(puzz), progress=prog)} if msg_type == 'answer': if current is None: raise RuntimeError('trying to answer no puzzle') if current.response is not None: # if current.attempts >= params['attempts_per_puzzle']: # raise RuntimeError('no more attempts allowed') # herausgenommen da jeder ja nur einen Versuch hat # if now < current.response_timestamp + params['retry_delay']: # raise RuntimeError('retrying too fast') # herausgenommen, da die Zahl submittet wird, wenn gewünscht dann retry hoch player.num_trials -= 1 if current.is_correct: player.num_correct -= 1 else: player.num_failed -= 1 answer = msg['answer'] if answer == '' or answer is None: raise ValueError('Answer is empty ') current.response = answer current.is_correct = task_matrix.is_correct(answer, current) player.solution = int(current.solution) current.response_timestamp = now current.attempts +=1 if current.is_correct: player.num_correct += 1 else: player.num_failed +=1 player.num_trials += 1 player.response = int(answer) tries_left = params['attempts_per_puzzle'] - current.attempts prog = get_progess(player) return { my_id: dict( type='feedback', is_correct=current.is_correct, retries_left=tries_left, progress=prog, ) } raise RuntimeError("unrecognized message from client") def calculate_group_findings(group: Group): together_test =group.get_players()[0].participant.count_numbers_together if together_test is not None and together_test: group.response = 0 group.solution = 0 for p in group.get_players(): if p.response == -1 and p.solution == 0: ## nicht geantwortet p.response -= C.NO_SUBMIT_PENALTY p.solution = C.NO_SUBMIT_PENALTY group.solution += p.solution group.response += p.response group.is_correct = abs(group.solution - group.response) <= C.DEVIATION_GROUP for p in group.get_players(): p.is_correct = group.is_correct else: ## fürs alleine spielen for p in group.get_players(): if p.response == -1 and p.solution == 0: # nicht geantwortet p.response -= C.NO_SUBMIT_PENALTY p.solution = C.NO_SUBMIT_PENALTY p.is_correct = abs(p.solution - p.response) <= (C.DEVIATION_GROUP/2) #halb so viel Abweichung group.is_correct = False ## Gruppe kann nicht recht haben wenn es keine Gruppe geht, schau auf den Spieler for p in group.get_players(): if p.participant.count_numbers_together: p.payoff = C.PAYOFF_PER_PUZZLE if p.is_correct else cu(0) #print(f'payoff:{p.payoff} other') #print(f'payoff:{C.PAYOFF_PER_PUZZLE} Constant') else: # solo if p.participant.vars['spieler'] is not None: if p.participant.vars['spieler'] =='Spieler1': p.payoff = C.PAYOFF_PER_PUZZLE_SP1 if p.is_correct else cu(0) else: p.payoff = C.PAYOFF_PER_PUZZLE_SP2 if p.is_correct else cu(0) def find_transition_index(array): for i,element in enumerate(array): if not isinstance(element, list): return i return -1 #Nur für Experiment wb_norm_app: Wenn 2 Leute da sind und gleiche GruppenIDs haben, dann wird #gematcht def group_by_arrival_time_method(subsession, waiting_players): from collections import defaultdict if subsession.session.params['second_time_counting2']: ### beim zweiten mal: if not subsession.session.params['keep_groups_as_before2']: ## wenn ich das erste mal reinkomme subsession.set_group_matrix(subsession.session.mymatrix) else: players_grouped_by_matched_before = defaultdict(list) for p in waiting_players: players_grouped_by_matched_before[p.participant.matched_before].append(p) for g in players_grouped_by_matched_before.values(): if len(g) >= 2: print('pair found for 2nd', "",g[0],"und",g[1]) return [g[0],g[1]] else: #einfahc matchen players = [p for p in waiting_players] if len(players) >= 2: subsession.session.vars['mymatrix'] = subsession.get_group_matrix() return [players[0],players[1]] # PAGES class Intro(Page): @staticmethod def vars_for_template(player: Player): ## only for testing, normally it should come from the app before #Todo: Note if clause removed because not needed #if len(player.session.params) == 0 or player.session.params['second_time_counting2'] is None: #player.session.params['second_time_counting2'] = False player.session.params['second_time_counting2'] = True #hart setzen player.session.vars['second_time_counting2'] = True second_time_counting = True #player.session.params['second_time_counting2'] hart setzen auf True second_time = second_time_counting #is not None and second_time_counting sp1 = False if player.session.second_time_counting2: if player.participant.vars['spieler'] == 'Spieler1': sp1 = True #print("Intro CountNumbersGroup2 spielen wir zusammen?", player.participant.count_numbers_together) #print ("vars spieler:", (player.participant.vars['spieler'])) return dict( second_time = second_time, together = player.participant.vars['count_numbers_together'], myround = player.round_number==1, spieler1= sp1 ) def is_displayed(player: Player): print("player session group2 intro", player.session.vars['second_time_counting2'], "player subsession group2 intro", player.subsession.session.vars['second_time_counting2']) return not (player.subsession.session.vars['second_time_counting2'] and player.round_number>1) class Spiel(Page): timeout_seconds = C.TIME_PER_PUZZLE if not settings.DEBUG else 10 live_method = play_game @staticmethod def js_vars(player: Player): return dict(params=player.session.params) @staticmethod def vars_for_template(player: Player): return dict(DEBUG=settings.DEBUG, input_type=task_matrix.INPUT_TYPE, placeholder=task_matrix.INPUT_HINT, together = player.participant.vars['count_numbers_together'] ) @staticmethod def before_next_page(player: Player, timeout_happened): if not timeout_happened and not player.session.params['max_iterations']: raise RuntimeError("malicious page submission") '''if C.PLAYERS_PER_GROUP is not None and C.PLAYERS_PER_GROUP > 1: ### An der Stelle Zusammenrechnen der Gruppenberechnung machen, bzw in der Gruppe speichern current = get_cur_puzzle(player) player.group.solution += int (current.solution) player.group.response += int (current.response) ''' @staticmethod def is_displayed(player: Player): print("hier drin Wert von second_time_counting und Rolle participant",player.session.second_time_counting, player.participant.spieler) if player.session.second_time_counting2: if player.round_number > 1: return False else: if player.participant.count_numbers_together==False and player.participant.spieler== 'Spieler2': return False else: return True else: return True #return not (player.session.second_time_counting == True and player.round_number > 1) class Group_wait_stage(WaitPage): @staticmethod def after_all_players_arrive(group: Group): calculate_group_findings(group) def vars_for_template(player: Player): return dict(body_text="Bitte warten Sie, bis der*die Teampartner*in die Aufgabe beendet hat.") #return dict (body_text= "Please wait until your partner has completed the task.") def is_displayed(player: Player): return not (player.session.second_time_counting2 == True and player.round_number > 1) ''' class Group_Stage(Page): @staticmethod def is_displayed(player: Player): return C.PLAYERS_PER_GROUP is not None and C.PLAYERS_PER_GROUP > 1 ### nur wenn es eine Gruppe gibt @staticmethod def vars_for_template(player: Player): return dict()## hier dict füllen wenn ihr was anzeigen wollt für den spaß ''' class Results(Page): @staticmethod def vars_for_template(player: Player): cor = player.group.is_correct if(player.session.second_time_counting2 and player.participant.vars['count_numbers_together']==False): cor=player.is_correct sp1 = False if player.session.second_time_counting2: if player.participant.vars['spieler'] == 'Spieler1': sp1=True return dict( correct = cor, payoff = player.payoff if cor else "keine Auszahlung für diese Aufgabe.", #"no payment for this task" together = player.participant.vars['count_numbers_together'], spieler1 = sp1, myround_final = player.round_number==C.NUM_ROUNDS ) #für zweiten teil überspringen def is_displayed(player: Player): return not (player.session.second_time_counting2 == True and player.round_number > 1) #For Experiment wb_norm_app use ShuffleWaitPageWBNormApp instead of ShuffleWaitPage(WaitPage); use according page_sequence class ShuffleWaitPage(WaitPage): group_by_arrival_time = True def is_displayed(player: Player): #print(player.session.second_time_counting) return player.round_number == 1 def vars_for_template(player: Player): return dict(body_text="Bitte warten Sie, bis das Experiment weitergeht.") #return dict (body_text= "Please wait until the experiment can continue.") page_sequence = [ ShuffleWaitPage , Intro, Spiel, Group_wait_stage, Results] #page_sequence = [ShuffleWaitPageWBNormApp, Intro, Spiel, Group_wait_stage, Results]