### Standard Imports import json import random import time import csv from otree import settings from otree.api import * ## Imports von unseren eigenen Dateien from . import slider_task ## Slider Verarbeitung from .image_utils import encode_image ## Bild Verarbeitung from collections import defaultdict doc = """ Your app description """ class C(BaseConstants): NAME_IN_URL = 'taxevasion' PLAYERS_PER_GROUP = None NUM_ROUNDS = 3 DONATION_ORGANISATION = __name__ + '/assets/donation_orga.csv' DONATION_ORGANISATION_TEMPLATE = __name__ + '/show_donation_orgas.html' TAX_SCREEN = __name__ + '/show_tax_screen.html' TAX_CHECK_PROB = 0.1 instructions_template = __name__ + "/instructions.html" MONEY_PER_SLIDER = 1500 AUTO_SUBMIT_TIME = 2400 ##120 ## danach schickt er die Antwort automatisch ab TEST_AUTO_SUBMIT_TIME = 2400##50 ## zum Testen die Slider Zeit TREATMENTS = ['control', 'info' , 'selection' ] TEXT_UEBER_VERDIENST = "Sie können nur maximal Ihr verdientes Geld als Bemessungsgrundlage verwenden " def read_csv(): f = open(C.DONATION_ORGANISATION, encoding='utf-8-sig') rows = list(csv.DictReader(f, delimiter=';')) # random.shuffle(rows) return rows class Subsession(BaseSubsession): HIGH_RATE = models.BooleanField(doc="Boolean zum überprüfen ob wir mit dem Hohen oder dem niedrigen Steuersatz spielen") TAX_RATE = models.FloatField(doc="", initial=0.25) INFO_TEXT = models.StringField(initial="") TREATMENT = models.StringField(initial=C.TREATMENTS[0]) pass class Group(BaseGroup): pass def creating_session(subsession: Subsession): session = subsession.session # hol sie aus dem Object ## Standard Varianten hier setzen defaults = dict( num_sliders=36, # anzahl der Slider num_columns=4, # anzahl der Spalten attempts_per_slider=5, # anzahl versuche pro Slider (mach sehr groß wenn nicht gebraucht) ## Delays hinzufügen? trial_delay=1, retry_delay=1 / 10, ## Zeit zwischen falscher Eingabe und neu Versuch ) session.params = {} for param in defaults: session.params[param] = session.config.get(param, defaults[param]) if 'treatment' in subsession.session.config: if subsession.session.config['treatment'] in C.TREATMENTS: subsession.TREATMENT = subsession.session.config['treatment'] else: raise ValueError('Treatment must be one of the following: {}'.format(C.TREATMENTS)) ## ganze donation org donation_orgas = read_csv() for org in donation_orgas: DonationOrganisation.create(subsession=subsession, **org) ''' control=False, #control == Tax_Screen keine Orgas info=False, #info == Show_donation_orga (nicht klickbar); Tax_screen ; Info Text auf blub setzen selection=False ## Selection == Klickbar orgas , und Tax_screen ''' class DonationOrganisation(ExtraModel): subsession = models.Link(Subsession) IdentNumber = models.IntegerField() Category = models.StringField() Title = models.StringField() Short_Description = models.LongStringField() DonationOrganisation = models.StringField() Founding = models.IntegerField() Usecase = models.StringField() Address_Name = models.StringField() Address_Street = models.StringField() Address_PLZ = models.StringField() Bank_Number = models.StringField() Bank_Name = models.StringField() Bank_BLZ = models.StringField() class DonationAmount(ExtraModel): subsession = models.Link(Subsession) DonationOrganisation = models.Link(DonationOrganisation) ID_Orga = models.IntegerField() ## IdentNumber Title = models.StringField() Amount = models.CurrencyField() ## Todo: Zuordnung Spieler und Organisation def to_dict(dorga: DonationOrganisation): return dict( ID = dorga.IdentNumber, Category=dorga.Category, Title = dorga.Title, Short_Description = dorga.Short_Description, DonationOrganisation = dorga.DonationOrganisation, Founding = dorga.Founding, Usecase = dorga.Usecase, Address_Name = dorga.Address_Name, Address_Street = dorga.Address_Street, Address_PLZ = dorga.Address_PLZ, Bank_Number = dorga.Bank_Number, Bank_Name = dorga.Bank_Name, Bank_BLZ = dorga.Bank_BLZ ) class Player(BasePlayer): taxable_income = models.FloatField( label="Wert, den Sie als Bemessungsgrundlage für Ihre Steuer angeben möchten") payoff_before_taxes = models.FloatField(initial=0) ## evtl Float wenns schöner für euch ist payoff_after_taxes = models.FloatField() taxes = models.FloatField(initial=0) winnings = models.FloatField() tax_checked = models.BooleanField() penalty = models.FloatField(initial=0) # profit = models.CurrencyField(initial=0) iteration = models.IntegerField(initial=0) ## nur dafür getestet num_correct = models.IntegerField(initial=0) ## anzahl richtiger time_elapsed = models.FloatField(initial=0) ## Vergangene Zeit num_false = models.IntegerField(initial=0) selected_donation_orgs = models.LongStringField(initial="") no_donation_orga_selected = models.BooleanField(initial=False) ### Extra Klasse für Spezifizierungen class Puzzle(ExtraModel): ## Idee ist für die anderen Grundspiele identisch """Model for Slider Setup""" player = models.Link(Player) iteration = models.IntegerField() timestamp = models.FloatField() response_timestamp = models.FloatField() num_sliders = models.IntegerField() layout = models.LongStringField() num_correct = models.IntegerField(initial=0) is_solved = models.BooleanField(initial=False) class Slider(ExtraModel): """Model pro Slider""" puzzle = models.Link(Puzzle) id_slider = models.IntegerField() target = models.IntegerField() value = models.IntegerField() is_correct = models.BooleanField(initial=False) attempts = models.IntegerField(initial=0) ### Ende Extraklassen ### Zusätzliche Methoden def gen_puzzle(player: Player) -> Puzzle: # kriegt nen Spieler -> gibt nen Puzzle """Create new Puzzle for a Player""" params = player.session.params num_sliders = params['num_sliders'] layout = slider_task.generate_layout(params) ## kommt aus der extra python die oben importiert wurde puzzle = Puzzle.create( player=player, iteration=player.iteration, timestamp=time.time(), # start Zeit num_sliders=num_sliders, layout=json.dumps(layout) ) for i in range(num_sliders): target, initial = slider_task.generate_slider() Slider.create( puzzle=puzzle, id_slider=i, target=target, value=initial ) return puzzle def get_cur_puzzle(player: Player): puzzles = Puzzle.filter(player=player, iteration=player.iteration) if puzzles: [puzzle] = puzzles return puzzle def get_slider(puzzle: Puzzle, id_slider): sliders = Slider.filter(puzzle=puzzle, id_slider=id_slider) if sliders: [slider] = sliders return slider def enc_puzzle(puzzle: Puzzle): """Sent to Client - description of puzzle""" layout = json.loads(puzzle.layout) sliders = Slider.filter(puzzle=puzzle) ## Bild draus machen image = slider_task.render_image(layout, targets=[s.target for s in sliders]) return dict( image=encode_image(image), size=layout['size'], grid=layout['grid'], sliders={s.id_slider: {'value': s.value, 'is_correct': s.is_correct} for s in sliders} ) def get_progress(player: Player): """Return Progress of current Player""" return dict( iteration=player.iteration, solved=player.num_correct ) def handle_resp(puzzle, slider, value): slider.value = slider_task.snap_value(value, slider.target) slider.is_correct = slider.value == slider.target ## nur korrekt wenn es genau gleich ist puzzle.num_correct = len(Slider.filter(puzzle=puzzle, is_correct=True)) puzzle.is_solved = puzzle.num_correct == puzzle.num_sliders ### Ende Zusätzliche Methoden ## eigentliche Arbeit: def play_game(player: Player, msg: dict): """ Game workflow: Receive Message from browser, use data , give answer Serverpov for workflow: - receive: {'type': 'load'} -- empty message means page loaded - check if it's game start or page refresh midgame - respond: {'type': 'status', 'progress': ...} - respond: {'type': 'status', 'progress': ..., 'puzzle': data} in case of midgame page reload - receive: {'type': 'new'} -- request for a new puzzle - generate new sliders - respond: {'type': 'puzzle', 'puzzle': data} - receive: {'type': 'value', 'slider': ..., 'value': ...} -- submitted value of a slider - slider: the index of the slider - value: the value of slider in pixels - check if the answer is correct - respond: {'type': 'feedback', 'slider': ..., 'value': ..., 'is_correct': ..., 'is_completed': ...} - slider: the index of slider submitted - value: the value aligned to slider steps - is_corect: if submitted value is correct - is_completed: if all sliders are correct """ session = player.session my_id = player.id_in_group params = session.params now = time.time() puzzle = get_cur_puzzle(player) ## entweder leer(none) oder das aktuelle msg_type = msg['type'] if msg_type == 'load': prog = get_progress(player) id_dict = dict(type='status', progress=prog) if puzzle: ## falls n puzzle vorhanden ist, häng es dran id_dict['puzzle'] = enc_puzzle(puzzle) return {my_id: id_dict} if msg_type == 'new': if puzzle is not None: raise RuntimeError('Trying to create another Puzzle') player.iteration += 1 puzz = gen_puzzle(player) prog = get_progress(player) return {my_id: dict(type='puzzle', puzzle=enc_puzzle(puzz), progress=prog)} if msg_type == 'value': if puzzle is None: raise RuntimeError('Puzzle is missing') ## Spieler versucht es zu schnell wieder (retry delay, kleiner machen falls euch das häufiger passiert) #if puzzle.response_timestamp and now < puzzle.response_timestamp + params['retry_delay']: #raise RuntimeError('Retrying to fast') slider = get_slider(puzzle, int(msg['slider'])) if slider is None: raise RuntimeError('Slider is missing') if slider.attempts >= params['attempts_per_slider']: raise RuntimeError('Too many slider motions') value = int(msg['value']) handle_resp(puzzle, slider, value) puzzle.response_timestamp = now slider.attempts += 1 player.num_correct = puzzle.num_correct prog = get_progress(player) return { my_id: dict( type='feedback', slider=slider.id_slider, value=slider.value, is_correct=slider.is_correct, is_completed=puzzle.is_solved, progress=prog ) } if msg_type == 'cheat' and settings.DEBUG: ## nur für euch damit ihr es nicht machen müsst return {my_id: dict(type='solution', solution={s.id_slider: s.target for s in Slider.filter(puzzle=puzzle)})} # sonst, heißt er kennt den msg_type nicht raise RuntimeError('unrecognized message from Client') def set_all_donation_orgas(player: Player): return [dorga.Title for dorga in DonationOrganisation.filter(subsession=player.subsession)], # PAGES class Spiel(Page): timeout_seconds = C.AUTO_SUBMIT_TIME if not settings.DEBUG else C.TEST_AUTO_SUBMIT_TIME live_method = play_game @staticmethod def js_vars(player: Player): return dict( params=player.session.params, slider_size=slider_task.SLIDER_BBOX, ) @staticmethod def vars_for_template(player: Player): return dict( params=player.session.params, DEBUG=settings.DEBUG ) @staticmethod def before_next_page(player: Player, timeout_happened): puzzle = get_cur_puzzle(player) if puzzle and puzzle.response_timestamp: player.time_elapsed = puzzle.response_timestamp - puzzle.timestamp player.num_correct = puzzle.num_correct ## todo: hier das money gegebenenfalls über setting setzen für die 1500 taler von aussen player.payoff_before_taxes =player.num_correct * C.MONEY_PER_SLIDER player.num_false = puzzle.num_sliders - puzzle.num_correct # PAGES class DonationOverview(Page): form_model = 'player' form_fields = ['taxable_income'] @staticmethod def live_method(player: Player, data: dict): if 'selected_donation_orgs' in data: # Save the selected organization IDs directly to the player model # You can also perform any immediate processing here if needed player.selected_donation_orgs = json.dumps(data['selected_donation_orgs']) print(player.selected_donation_orgs, data['selected_donation_orgs']) @staticmethod def error_message(player, values): player.no_donation_orga_selected = False if values['taxable_income'] > player.payoff_before_taxes: return C.TEXT_UEBER_VERDIENST if player.subsession.TREATMENT == C.TREATMENTS[0]: ## Controltreatment extra logic ## todo:Behnud brauchst du hier noch was? return False if player.subsession.TREATMENT == C.TREATMENTS[2] and (player.selected_donation_orgs == "" or len(player.selected_donation_orgs) == 0): player.no_donation_orga_selected = True return False return False @staticmethod def vars_for_template(player: Player): #print(f"\n\n\n{player.subsession.TREATMENT} : {player.subsession.TREATMENT == C.TREATMENTS[0]}\n\n\n") info = player.subsession.TREATMENT == C.TREATMENTS[1] if info: player.selected_donation_orgs = json.dumps(set_all_donation_orgas(player)) ### Hier selected orgs alle setzten return dict( tax_rate=player.subsession.TAX_RATE * 100, organisation= [to_dict(dorga) for dorga in DonationOrganisation.filter(subsession=player.subsession)], info_text = player.subsession.INFO_TEXT, controi_treatment = player.subsession.TREATMENT == C.TREATMENTS[0], ## TODO Wieder das Not entfernen info = player.subsession.TREATMENT == C.TREATMENTS[1], selection = player.subsession.TREATMENT == C.TREATMENTS[2], ) class ResultsWaitPage(WaitPage): @staticmethod def is_displayed(player: Player): player.taxes = player.taxable_income * player.subsession.TAX_RATE player.winnings = player.payoff_before_taxes - player.taxes player.tax_checked = random.random() < C.TAX_CHECK_PROB ## wahrscheinlichkeit für einen steuercheck player.penalty = (player.payoff_before_taxes - player.taxable_income) * player.tax_checked player.payoff_after_taxes= player.winnings - player.penalty player.payoff = max(player.payoff_after_taxes , 0) return True @staticmethod def after_all_players_arrive(group: Group): if group.subsession.TREATMENT == C.TREATMENTS[0]: ### Control ### Todo: Behnud: willst du hier generell irgendwas in den Donation Amounts speichern? pass else: for player in group.get_players(): if not player.no_donation_orga_selected: selected_orgs_titles = json.loads(player.selected_donation_orgs) ## todo: Selected Orgs nochmal anpassen #selected_orgs = DonationOrganisation.filter(subsession=player.subsession, Title__in=selected_orgs_titles) selected_orgs = [org for org in DonationOrganisation.filter(subsession=player.subsession) if org.Title in selected_orgs_titles] print(selected_orgs_titles) ## amount berechnen amount_this_round = player.taxes / len(selected_orgs_titles) for org in selected_orgs: DonationAmount.create(subsession=player.subsession,Title=org.Title, Amount = amount_this_round, DonationOrganisation=org) else: ### Hier ggibt es keinen Amount pass # Perform your logic with selected_orgs_ids class Results(Page): @staticmethod def vars_for_template(player): pass class ResultsFinal(Page): @staticmethod def is_displayed(player: Player): return player.round_number == C.NUM_ROUNDS @staticmethod def vars_for_template(player): return dict( table_data = prepare_data_For_final(player), total_payoff = player.participant.payoff_plus_participation_fee() ) def prepare_data_For_final(player: Player): # < !--{{Runde}} - {{Bemessungsgrundlage}} - {{Einkommen}} - - {{Geprüft}} --> runden = [] for p in player.in_all_rounds(): liste = [p.round_number, int(p.taxable_income), int(p.payoff_before_taxes), "Ja" if p.tax_checked else "Nein", int(p.payoff_after_taxes) ] runden.append(liste) return runden def custom_export(players): donation_sums = defaultdict(float) # Loop through all DonationAmount records and sum the amount for donation in DonationAmount.filter(): ## id ändern donation_sums[donation.Title] += donation.Amount # Yield the header row yield ['DonationID','DonationOrga', 'Amount'] # Yield the summed amounts for each organization index = 0 for org_title, amount_sum in donation_sums.items(): index+=1 yield [ index, org_title, amount_sum] yield [ index+1, 'Experimentator', generate_skipped_donations(players)] def generate_skipped_donations(players): amount_ = 0 for player in players: for player_round in player.in_all_rounds(): if player_round.no_donation_orga_selected: amount_ += int(player_round.taxes) return amount_ page_sequence = [Spiel, DonationOverview, ResultsWaitPage, Results, ResultsFinal] ''' Todo: Behnud: wir haben jetzt die IDs in dem Player gespeichert die er ausgewählt hat , was willst du damit tun, reicht dir das pro runde zur auswertung? '''