import random
import time
from cProfile import label  # NOTE(review): not referenced in this file; looks like an accidental auto-import

from otree import settings
from otree.api import *

from . import task_matrix
from .image_utils import encode_image

doc = """
Your app description
"""


class C(BaseConstants):
    # Session-wide constants of the app.
    NAME_IN_URL = 'unmasking_corporate_hypocrisy'
    PLAYERS_PER_GROUP = 2
    NUM_ROUNDS = 3  # one trial round + 2 game rounds
    COMPANY_OWNER_ROLE = "Firmeninhaber:in"
    EMPLOYEE_ROLE = "Arbeitnehmer:in"

    DONATION_PROMISE = 25  # percentage that is promised as a donation
    DONATION_BONUS_PERCENTAGE = DONATION_PROMISE  # percentage the company owner takes for themselves
    DONATION_BONUS = 2  # multiplier applied when the donation is withheld
    TASK_POINTS = 30  # Taler generated per correctly solved task
    TASK_THRESHOLD = 5  # minimum number of solved tasks for the employee salary
    EMPLOYEE_START_SALARY = 350
    COMPANY_OWNER_START_SALARY = 50
    GAME_TIME = 240  # seconds (used as page timeout)
    TRIAL_TIME = 60  # seconds (used as page timeout)
    BIASED_NO_INFORMATION = True  # True: "no information" can only occur if the company owner donated
    #BIASED_INFORMATION = True  # "no information" only if the company owner kept the money
    NO_INFORMATION_PERCENTAGE = 0.2

    ### Number task specifics
    ## changeable
    HEIGHT = 10
    WIDTH = 6
    IGNORED_CHARS = "012345689"  # can be changed later; any character can be used (for example: ←)
    COUNTED_CHARS = "7"
    AMOUNT_OF_COUNTED_CHARS = 1  # multiplier for the amount of 7s (baseline 1 ^= 1/10 probability per image)
    ## fixed
    TEXT_SIZE = 20
    DEVIATION_GROUP = 0  # deviation for group handling
    NO_SUBMIT_PENALTY = DEVIATION_GROUP * 5
    PAYOFF_PER_PUZZLE = cu(TASK_POINTS)
    TIME_PER_PUZZLE = GAME_TIME
    AMOUNT_EMPTY_FIELDS = 0


class Subsession(BaseSubsession):
    pass


class Group(BaseGroup):
    # NOTE(review): `donated` is never written anywhere in this file.
    donated = models.BooleanField(initial=False)
    # Company earnings generated by the employee in this round.
    earnings = models.CurrencyField(initial=0)
    # Amount donated in this round (0 when the owner keeps the money).
    donations = models.CurrencyField(initial=0)


class Player(BasePlayer):
    completed_task = models.IntegerField(initial=0)
    earnings = models.CurrencyField(initial=0)
    decision_to_keep = models.BooleanField(
        label="Ich möchte ___",
        choices=[
            [True, 'das Geld im Unternehmen behalten.'],
            [False, ' das Geld spenden.'],
        ],
    )
    received_information = models.BooleanField(initial=False)

    ## control questions
    control_company_owner_1 = models.IntegerField(
        label='Ihre Rolle in diesem Experiment ist ___',
        choices=[[1, C.COMPANY_OWNER_ROLE], [2, C.EMPLOYEE_ROLE]],
    )
    control_company_owner_2 = models.IntegerField(
        label="Ihr/e Arbeitnehmer:in hat 4 Aufgaben korrekt gelöst und 120 Taler für das Unternehmen generiert. Welcher Anteil dieser 120 Taler wird Ihre zusätzliche Auszahlung sein?",
        choices=[[1, '24 Taler (20%)'], [2, '30 Taler (25%)'], [3, '60 Taler (50%)']],
    )
    #control_company_owner_3 = models.IntegerField(label="Die Entscheidung, nicht an die Wohltätigkeitsorganisation zu spenden und stattdessen das Geld im Unternehmen zu behalten, um weiteren Profit zu generieren, wird ___",
    #   choices = [ [1,'ihre eigene Auszahlung erhöhen.'], [2, 'keinen Einfluss auf ihre Auszahlung haben.'], [3,'ihre eigene Auszahlung verringern.']])
    control_employee_1 = models.IntegerField(
        label='Ihre Rolle in diesem Experiment ist ___',
        choices=[[1, C.COMPANY_OWNER_ROLE], [2, C.EMPLOYEE_ROLE]],
    )
    control_employee_2 = models.IntegerField(
        label='Wer entscheidet über die Verwendung des Unternehmensgewinns?',
        choices=[
            [1, 'Sie, als Arbeitnehmender'],
            [2, 'Ihr Teampartner, als Firmeninhaber:in'],
        ],
    )
    control_employee_3 = models.IntegerField(
        label='Angenommen Sie lösen 10 Aufgaben korrekt. Wie hoch ist Ihre Auszahlung?',
        choices=[[1, '200 Taler'], [2, '350 Taler'], [3, '500 Taler']],
    )

    ### Number task progress
    iteration = models.IntegerField(initial=0)
    num_trials = models.IntegerField(initial=0)
    num_correct = models.IntegerField(initial=0)
    num_failed = models.IntegerField(initial=0)
    # NOTE(review): integer initial value on a string field; left unchanged.
    solution = models.LongStringField(initial=0)
    response = models.IntegerField(initial=0)
    is_correct = models.BooleanField(initial=False)


### Session creation ###
def creating_session(subsession: Subsession):
    """Populate ``session.params`` from the session config, falling back to defaults."""
    session = subsession.session
    defaults = dict(
        retry_delay=1.0,
        puzzle_delay=1.0,
        attempts_per_puzzle=1,
        max_iterations=300,
    )
    # For each known parameter use the configured value if present, else the default.
    session.params = {
        name: session.config.get(name, fallback) for name, fallback in defaults.items()
    }


### Puzzle model and helpers
class Puzzle(ExtraModel):
    # One generated puzzle together with the player's latest response to it.
    player = models.Link(Player)
    iteration = models.IntegerField(initial=0)
    attempts = models.IntegerField(initial=0)
    timestamp = models.FloatField(initial=0)

    text = models.LongStringField()  # either text or JSON (as in the slider task)
    solution = models.LongStringField()  # same format as text
    response = models.LongStringField()
    response_timestamp = models.FloatField()
    is_correct = models.BooleanField()


def gen_puzzle(player: Player) -> Puzzle:
    """Generate and store a new puzzle for ``player``, advancing their iteration counter."""
    # A font size could also be passed to the generator here.
    fields = task_matrix.generate_puzzle_fields(
        ignored_chars=C.IGNORED_CHARS,
        counted_char=C.COUNTED_CHARS,
        width=C.WIDTH,
        height=C.HEIGHT,
    )
    player.iteration += 1
    return Puzzle.create(
        player=player,
        iteration=player.iteration,
        timestamp=time.time(),
        **fields,  # text & solution
    )


def get_cur_puzzle(player: Player):
    """Return the puzzle of the player's current iteration, or ``None`` if there is none."""
    puzzles = Puzzle.filter(player=player, iteration=player.iteration)
    if not puzzles:
        return None
    # Exactly one puzzle per iteration is expected; unpacking fails loudly otherwise.
    [puzzle] = puzzles
    return puzzle


def enc_puzzle(puzzle: Puzzle):
    """Render ``puzzle`` as an image and wrap the encoded image in a dict for the client."""
    image = task_matrix.render_image(puzzle)
    return dict(image=encode_image(image))


def get_progess(player: Player):
    """Return the player's task counters as a dict that is sent to the client."""
    return dict(
        num_trials=player.num_trials,
        num_correct=player.num_correct,
        num_incorrect=player.num_failed,
        iteration=player.iteration,
    )


def play_game(player: Player, msg: dict):
    """Live-page handler of the number task (modelled on the slider task).

    Handles the client message types ``load``, ``cheat`` (only with
    ``settings.DEBUG``), ``next`` and ``answer``. Returns a dict mapping the
    player's ``id_in_group`` to the reply message.

    Raises:
        RuntimeError: on protocol violations (unknown message type, skipping an
            unanswered puzzle, requesting/retrying too fast, no attempts left,
            answering or revealing without a current puzzle).
        ValueError: if the submitted answer is empty.
    """
    params = player.session.params
    my_id = player.id_in_group
    now = time.time()

    current = get_cur_puzzle(player)
    msg_type = msg['type']

    if msg_type == 'load':
        reply = dict(type='status', progress=get_progess(player))
        if current:
            reply['puzzle'] = enc_puzzle(current)
        return {my_id: reply}

    if msg_type == 'cheat' and settings.DEBUG:
        if current is None:
            # Previously this crashed with an AttributeError on ``None``.
            raise RuntimeError('no puzzle to reveal')
        return {my_id: dict(type='solution', solution=current.solution)}

    if msg_type == 'next':
        if current is not None:
            if current.response is None:
                raise RuntimeError('trying to skip over unsolved puzzle')
            if now < current.timestamp + params["puzzle_delay"]:
                raise RuntimeError('retrying too fast')
            if current.iteration == params['max_iterations']:
                # Key is ``progress`` like in every other message
                # (it used to be misspelled ``progess`` here).
                return {
                    my_id: dict(
                        type='status',
                        progress=get_progess(player),
                        iterations_left=0,
                    )
                }
        new_puzzle = gen_puzzle(player)
        return {
            my_id: dict(
                type='puzzle',
                puzzle=enc_puzzle(new_puzzle),
                progress=get_progess(player),
            )
        }

    if msg_type == 'answer':
        if current is None:
            raise RuntimeError('trying to answer no puzzle')

        if current.response is not None:  # this is a retry
            if current.attempts >= params['attempts_per_puzzle']:
                raise RuntimeError('no more attempts allowed')
            if now < current.response_timestamp + params['retry_delay']:
                raise RuntimeError('retrying too fast')
            # Undo the progress update of the previous attempt.
            player.num_trials -= 1
            if current.is_correct:
                player.num_correct -= 1
            else:
                player.num_failed -= 1

        answer = msg['answer']
        if answer == '' or answer is None:
            raise ValueError('Answer is empty ')

        current.response = answer
        current.is_correct = task_matrix.is_correct(answer, current)
        current.response_timestamp = now
        current.attempts += 1

        # Record this attempt in the player's progress.
        if current.is_correct:
            player.num_correct += 1
        else:
            player.num_failed += 1
        player.num_trials += 1

        tries_left = params['attempts_per_puzzle'] - current.attempts
        feedback_msg = {
            my_id: dict(
                type='feedback',
                is_correct=current.is_correct,
                retries_left=tries_left,
                progress=get_progess(player),
            )
        }
        # Optional extension for sending the next puzzle along with the feedback:
        # if current.is_correct or current.attempts >= params['attempts_per_puzzle']:
        #     puzz = gen_puzzle(player)
        #     prog = get_progess(player)
        #     feedback_msg[my_id].update({
        #         'next_puzzle': enc_puzzle(puzz),  # send the new puzzle along
        #     })
        return feedback_msg

    raise RuntimeError("unrecognized message from client")


### Extra calculations
def calculate_company_owner_embezzlement(player: Player):
    """Return ``(bonus_earnings, earnings)`` for a company owner.

    ``bonus_earnings`` is the extra amount the owner would receive for keeping
    the money; ``earnings`` is the owner's current ``player.earnings``.
    """
    #bonus_earnings = (C.DONATION_BONUS_PERCENTAGE / 100) * C.DONATION_BONUS * (player.group.earnings - player.earnings)
    # NOTE(review): 0.75 presumably is the share left after the owner's 25% cut — confirm.
    bonus_earnings = (C.DONATION_BONUS_PERCENTAGE / 100) * C.DONATION_BONUS * player.group.earnings * 0.75
    earnings = player.earnings
    return bonus_earnings, earnings


def _collect_wrong_answers(values: dict, solutions: dict, messages: dict) -> dict:
    """Return ``{field_name: message}`` for every control question answered incorrectly."""
    return {
        field_name: messages[field_name]
        for field_name, expected in solutions.items()
        if values[field_name] != expected
    }


def _task_template_vars() -> dict:
    """Template variables shared by the trial and the real task page."""
    return dict(
        DEBUG=settings.DEBUG,
        input_type=task_matrix.INPUT_TYPE,
        placeholder=task_matrix.INPUT_HINT,
    )


def _reject_early_submission(player: Player, timeout_happened) -> None:
    """Raise if a task page is submitted before its timeout while no iteration limit is set."""
    if not timeout_happened and not player.session.params['max_iterations']:
        raise RuntimeError("malicious page submission")


# PAGES
class SyncPage(WaitPage):
    """Waits for all groups and (re)builds the group matrix."""

    wait_for_all_groups = True

    @staticmethod
    def after_all_players_arrive(subsession: Subsession):
        # Regrouping only happens in the first round of the app.
        if subsession.round_number != 1:
            subsession.group_like_round(1)
            return

        allowed_ids = []
        excluded_ids = []
        for participant in subsession.session.get_participants():
            if participant.vars.get('allowed_in_main_experiment', False):
                allowed_ids.append(participant.id_in_session)
            else:
                excluded_ids.append(participant.id_in_session)

        # Chunk the allowed participants by group size; a leftover participant
        # ends up in a final, smaller group.
        size = C.PLAYERS_PER_GROUP
        matrix = [allowed_ids[start:start + size] for start in range(0, len(allowed_ids), size)]
        # Every excluded participant gets an "idle" group of their own.
        matrix.extend([participant_id] for participant_id in excluded_ids)

        subsession.set_group_matrix(matrix)


class Instructions(Page):
    """Role-specific instructions, shown in the first round only."""

    @staticmethod
    def vars_for_template(player):
        is_employee = player.role == C.EMPLOYEE_ROLE
        return dict(
            is_employee=is_employee,
            player_letter="B" if is_employee else "A",
        )

    @staticmethod
    def is_displayed(player):
        return player.round_number == 1


class ControlCompanyOwner(Page):
    """Control questions for the company owner (first round only)."""

    form_model = 'player'
    form_fields = ['control_company_owner_1', 'control_company_owner_2']

    @staticmethod
    def is_displayed(player):
        return player.round_number == 1 and player.role == C.COMPANY_OWNER_ROLE

    @staticmethod
    def error_message(player, values):
        solutions = dict(
            control_company_owner_1=1,
            control_company_owner_2=2,
            # control_company_owner_3 = 1
        )
        messages = dict(
            control_company_owner_1="Ihre Rolle in diesem Experiment ist Firmeninhaber:in.",
            control_company_owner_2="Sie erhalten 25 % des von Ihrem Arbeitnehmer/Ihrer Arbeitnehmerin generierten Gewinns.",
            # control_company_owner_3 = "Die Entscheidung das Geld im Unternehmen zu behalten anstatt es zu spenden, generiert zusätzlichen Gewinn für das Unternehmen und erhöht dadurch Ihre Auszahlung.",
        )
        return _collect_wrong_answers(values, solutions, messages)


class ControlEmployee(Page):
    """Control questions for the employee (first round only)."""

    form_model = 'player'
    form_fields = ['control_employee_1', 'control_employee_2', 'control_employee_3']

    @staticmethod
    def is_displayed(player):
        return player.round_number == 1 and player.role == C.EMPLOYEE_ROLE

    @staticmethod
    def error_message(player, values):
        solutions = dict(
            control_employee_1=2,
            control_employee_2=2,
            control_employee_3=2,
        )
        messages = dict(
            control_employee_1="Ihre Rolle in diesem Experiment ist Arbeitnehmer.",
            # Typo fix: "Teamartner" -> "Teampartner" (matches the question's wording).
            control_employee_2="Ihr Teampartner, als Firmeninhaber:in entscheidet über die Verwendung des Unternehmensgewinns.",
            control_employee_3="Sie erhalten ein festes Gehalt in Höhe von 350 Talern.\n Um dieses Gehalt zu erhalten, müssen Sie 5 Aufgaben korrekt lösen. Jede zusätzlich gelöste Aufgabe hat keinen Einfluss auf Ihre persönliche Auszahlung am Ende des Experiments.",
        )
        return _collect_wrong_answers(values, solutions, messages)


class InfoRealRounds(Page):
    """Information shown before each round after the trial round."""

    @staticmethod
    def is_displayed(player):
        return player.round_number > 1

    @staticmethod
    def vars_for_template(player):
        previous_earnings = player.in_round(player.round_number - 1).earnings
        return dict(
            round=player.round_number - 1,
            earning_before=previous_earnings,
            earnings_possible=previous_earnings + C.EMPLOYEE_START_SALARY,
            is_employee=player.role == C.EMPLOYEE_ROLE,
        )


class EmployeeInfo(Page):
    """Announces the upcoming task (or trial run) to the employee."""

    @staticmethod
    def vars_for_template(player):
        trial_run = player.round_number == 1
        return dict(
            title='Proberunde' if trial_run else 'Aufgabe',
            trial_run=trial_run,
            game_time=C.GAME_TIME // 60,
        )

    @staticmethod
    def is_displayed(player: Player):
        return player.role == C.EMPLOYEE_ROLE


class EmployeeGameTrial(Page):
    """Number task for the employee in the trial round (round 1)."""

    timeout_seconds = C.TRIAL_TIME
    live_method = play_game

    @staticmethod
    def js_vars(player: Player):
        return dict(params=player.session.params)

    @staticmethod
    def vars_for_template(player: Player):
        return _task_template_vars()

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        _reject_early_submission(player, timeout_happened)

    @staticmethod
    def is_displayed(player: Player):
        return player.role == C.EMPLOYEE_ROLE and player.round_number == 1


class EmployeeGame(Page):
    """Number task for the employee in the real rounds (round > 1)."""

    timeout_seconds = C.GAME_TIME
    live_method = play_game

    @staticmethod
    def js_vars(player: Player):
        return dict(params=player.session.params)

    @staticmethod
    def vars_for_template(player: Player):
        return _task_template_vars()

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        _reject_early_submission(player, timeout_happened)

    @staticmethod
    def is_displayed(player: Player):
        return player.role == C.EMPLOYEE_ROLE and player.round_number > 1


class CompanyOwnerNumTaskWait(WaitPage):
    """Company owner waits for the employee; afterwards the round's base earnings are computed."""

    body_text = f"Bitte warten Sie bis Ihr Teammitglied die Aufgabe abgeschlossen hat. Dies wird etwa {C.GAME_TIME // 60} Minute(n) dauern."

    @staticmethod
    def is_displayed(player):
        return player.role == C.COMPANY_OWNER_ROLE

    @staticmethod
    def after_all_players_arrive(group):
        for player in group.get_players():
            if player.role == C.COMPANY_OWNER_ROLE:
                # A group may consist of the owner alone; then nothing was solved.
                other_players = player.get_others_in_group()
                amount_correct_counted = other_players[0].num_correct if other_players else 0
                # Company earnings generated by the employee.
                player.group.earnings = amount_correct_counted * C.TASK_POINTS
                # Owner gets the start salary plus their percentage of the company earnings.
                player.earnings = C.COMPANY_OWNER_START_SALARY + player.group.earnings*(C.DONATION_BONUS_PERCENTAGE/100)
            else:
                # Employee gets the fixed salary only if the threshold was reached.
                threshold_achieved = player.num_correct >= C.TASK_THRESHOLD
                player.earnings = C.EMPLOYEE_START_SALARY if threshold_achieved else 0


class CompanyOwnerChoice(Page):
    """Company owner decides between keeping the money and donating it."""

    form_model = 'player'
    form_fields = ['decision_to_keep']

    @staticmethod
    def is_displayed(player):
        # TODO: should this also be shown in the trial round? If so, adjust the condition.
        return player.role == C.COMPANY_OWNER_ROLE and player.round_number > 1

    @staticmethod
    def vars_for_template(player):
        # Same guard as in CompanyOwnerNumTaskWait: an owner without a partner
        # must not crash the page with an IndexError.
        partners = player.get_others_in_group()
        amount_solved_correct = partners[0].num_correct if partners else 0
        threshold_achieved = amount_solved_correct >= C.TASK_THRESHOLD
        bonus_earnings, earnings = calculate_company_owner_embezzlement(player)
        return dict(
            correct_solved=amount_solved_correct,
            threshold_achieved=threshold_achieved,  # probably not needed by the template
            earnings=earnings,
            bonus_earnings=bonus_earnings,
            possible_earnings=earnings + bonus_earnings,
        )

    @staticmethod
    def before_next_page(player, timeout_happened):
        group = player.group
        if player.decision_to_keep:
            # Owner keeps the money: so far they have 25% of the company
            # earnings; add the bonus computed on the company's remaining money.
            bonus_earnings, _ = calculate_company_owner_embezzlement(player)
            player.earnings += bonus_earnings
            #player.earnings += (C.DONATION_PROMISE/100)*player.earnings
            group.donations = 0
        else:
            # Owner donates.
            group.donations = (C.DONATION_PROMISE/100)*group.earnings
            # NOTE(review): the original indentation of this statement was lost;
            # it is placed in the donation branch here — confirm whether it
            # should run for both decisions. It does not affect payoffs.
            group.earnings -= player.earnings


class EmployeeWaitForDecision(WaitPage):
    """Employee waits until the company owner has made their decision."""

    @staticmethod
    def is_displayed(player):
        return player.role == C.EMPLOYEE_ROLE

    @staticmethod
    def after_all_players_arrive(group):
        pass  # nothing to compute; waiting is sufficient


class ShowDecisionResultEmployee(Page):
    """Shows the owner's decision to the employee, or withholds it."""

    @staticmethod
    def is_displayed(player):
        return player.role == C.EMPLOYEE_ROLE and player.round_number > 1

    @staticmethod
    def vars_for_template(player):
        decision_to_keep = player.get_others_in_group()[0].decision_to_keep
        # This method runs on every page render. Seeding the draw with the
        # participant and round keeps the outcome stable across reloads, so
        # neither the screen nor ``received_information`` can flip.
        draw = random.Random(f"{player.participant.code}:{player.round_number}").random()
        # Information is shown if the owner kept the money and BIASED_NO_INFORMATION
        # is set, or if the draw exceeds the no-information probability.
        information_scenario = (decision_to_keep and C.BIASED_NO_INFORMATION) or draw > C.NO_INFORMATION_PERCENTAGE
        #information_scenario = (not decision_to_keep and C.BIASED_INFORMATION) or random.random() > C.NO_INFORMATION_PERCENTAGE
        player.received_information = information_scenario
        return dict(
            keep_in_company=decision_to_keep,
            no_information_scenario=not information_scenario,
        )


class RoundResult(Page):
    """Result of a single round; not shown in the trial run."""

    @staticmethod
    def is_displayed(player):
        return player.round_number > 1


class GameResult(Page):
    """Total earnings over all real rounds, shown in the last round."""

    @staticmethod
    def vars_for_template(player):
        # Sum every round after the trial round (round 1). For NUM_ROUNDS == 3
        # this equals "previous round + current round" as before.
        earnings = sum(p.earnings for p in player.in_rounds(2, player.round_number))
        return dict(
            earnings=earnings,
        )

    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS


class PayoffWaitPage(WaitPage):
    """Writes the round earnings into ``player.payoff`` for all real rounds."""

    @staticmethod
    def is_displayed(player):
        return player.round_number > 1

    @staticmethod
    def after_all_players_arrive(group):
        for player in group.get_players():
            player.participant.vars['employee'] = player.role == C.EMPLOYEE_ROLE
            player.payoff = player.earnings


class EndScreen(Page):
    """Final screen, shown in the last round."""

    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS


page_sequence = [
    SyncPage,
    Instructions,
    ## Instructions / control questions
    ControlEmployee,
    # NOTE(review): the original formatting was lost; ControlCompanyOwner may
    # have been commented out in the original sequence — confirm.
    ControlCompanyOwner,
    InfoRealRounds,
    EmployeeInfo,
    EmployeeGameTrial,
    EmployeeGame,
    CompanyOwnerNumTaskWait,
    CompanyOwnerChoice,
    EmployeeWaitForDecision,
    ShowDecisionResultEmployee,
    RoundResult,
    PayoffWaitPage,
    GameResult,
    EndScreen,
]