from otree.api import * from tax_instr import set_timer_for_waiting, exclude_participants from random import shuffle, randint, choices, choice from collections import defaultdict from markupsafe import Markup from itertools import cycle from typing import List doc = """ Your app description """ class C(BaseConstants): NAME_IN_URL = 'tax_game' PLAYERS_PER_GROUP = 4 NUM_ROUNDS = 10 # 10 CUSTOM_POINTS_NAME = 'E' # beliefs elicitation fields ee_fields = [f'ee_{m + 1}' for m in range(PLAYERS_PER_GROUP - 1)] ne_fields = [f'ne_{n + 1}' for n in range(PLAYERS_PER_GROUP - 1)] ge_labels = dict( double="Da 0 a 100, dove 0 significa “completamente improbabile” e 100 significa “assolutamente probabile”, quanto ritiene probabile che il governo raddoppi le tasse pagate prima di redistribuirle?", keep="Da 0 a 100, dove 0 significa “completamente improbabile” e 100 significa “assolutamente probabile”, quanto ritiene probabile che il governo trattenga metà delle tasse pagate prima di redistribuire le restanti?" ) # condition contribution fields cc_fields = ['high_contr_high_belief', 'low_contr_high_belief', 'high_contr_low_belief', 'low_contr_low_belief'] cc_fields_labels = [ Markup( '''La maggioranza dei membri del suo gruppo ha dichiarato tutto il reddito e pensa che tutti debbano dichiararlo interamente.''' ), Markup( '''La maggioranza dei membri del suo gruppo ha dichiarato tutto il reddito ma pensa che tutti debbano dichiararne meno della metà.''' ), Markup( '''La maggioranza dei membri del suo gruppo ha dichiarato meno della metà del reddito ma pensa che tutti debbano dichiararlo interamente.''' ), Markup( '''La maggioranza dei membri del suo gruppo ha dichiarato meno della metà del reddito e pensa che tutti debbano dichiararne meno della metà.''' ) ] check_round = [7] ge_rounds = [1, 10] # [1, 10] cc_rounds = [1, 10] # [1, 10] max_bonus_per_belief = 5 penalty_per_belief_error = 1 endowment = 200 tax_rate = 0.4 penalty_rate = 2 audit_probability = 0.05 # Treatments: info = [False, True] multipliers = dict(low=0.5, high=2) timers = dict( pne=60 * 2, ee=60 * 2, ne=60 * 2, ge=60 * 2, check=60 * 2, income_declaration=60 * 2, cc=60 * 2, results=60 * 2 ) max_minutes_on_grouping_page = 10 class Subsession(BaseSubsession): pass class Group(BaseGroup): has_dropout = models.BooleanField(initial=False) info = models.BooleanField() multiplier = models.FloatField() tot = models.FloatField() tot_with_multiplier = models.FloatField() share = models.FloatField() class Player(BasePlayer): dropout = models.BooleanField(initial=False) check = models.IntegerField( label="Per favore inserisca il numero 7 qui sotto." ) income = models.IntegerField( label='', # label=f'Scelga quanto reddito dichiarare inserendo un valore da 0 a {C.endowment}:', min=0, max=C.endowment ) tax_paid = models.FloatField() audited = models.BooleanField() penalty = models.FloatField(initial=0) pgg_payoff = models.FloatField() pne = models.IntegerField( label=f'Per favore risponda alla seguente domanda inserendo un valore da 0 a {C.endowment}.', min=0, max=C.endowment ) ge = models.IntegerField( choices=[(n, f'{n * 10}') for n in range(0, 11)], widget=widgets.RadioSelectHorizontal ) for n, field in enumerate(C.ee_fields): locals()[field] = models.IntegerField(label=f'Box {n + 1}', min=0, max=C.endowment) del field for n, field in enumerate(C.ne_fields): locals()[field] = models.IntegerField(label=f'Box {n + 1}', min=0, max=C.endowment) del field for n, field in enumerate(C.cc_fields): locals()[field] = models.IntegerField(label=C.cc_fields_labels[n], min=0, max=C.endowment) del n, field # FUNCTIONS def creating_session(subsession: Subsession): if subsession.round_number == 1: const = C session = subsession.session session.round_to_pay = randint(1, const.NUM_ROUNDS) def group_by_arrival_time_method(subsession: Subsession, waiting_players: List[Player]) -> List[Player]: const = C ps_per_group = const.PLAYERS_PER_GROUP for p in waiting_players: set_timer_for_waiting(p, 'waiting_group_start') d = defaultdict(list) for p in waiting_players: participant = p.participant t = f'{participant.info}_{participant.multiplier}' g = d[t] g.append(p) if len(g) >= ps_per_group: return g[:ps_per_group] excluded = exclude_participants(waiting_players, const, 'waiting_group_start') if excluded: return excluded def set_group_treatment(group: Group): const = C p = group.get_player_by_id(1).participant info, multiplier = p.info, const.multipliers[p.multiplier] for g in group.in_rounds(1, const.NUM_ROUNDS): g.info, g.multiplier = info, multiplier if not info: ge_labels = cycle(list(const.ge_labels)) ps = group.get_players() shuffle(ps) for p in ps: p.participant.ge_label = next(ge_labels) def set_tax_paid(player: Player): if player.field_maybe_none('income') is not None: player.tax_paid = round(player.income * C.tax_rate, 2) def set_penalty(group: Group, const: C): endowment = const.endowment tax_rate = const.tax_rate penalty_rate = const.penalty_rate for p in [player for player in group.get_players() if not player.participant.dropout]: audited = choices([True, False], weights=[0.05, 0.95], k=1)[0] p.audited = audited tax_evaded = endowment - p.income if audited and tax_evaded > 0: p.penalty = tax_evaded * tax_rate * penalty_rate def get_belief_accuracy(belief, decision): return abs(decision - belief) def get_ee_payoff(player: Player, incomes: List[int], const: C): payoff = 0 incomes = sorted(incomes, reverse=True) for n, o in enumerate(incomes): accuracy = get_belief_accuracy(getattr(player, f'ee_{n + 1}'), o) this_payoff = const.max_bonus_per_belief - accuracy if this_payoff > 0: payoff += this_payoff return payoff def get_ne_payoff(player: Player, norms: list, const: C): payoff = 0 norms = sorted(norms, reverse=True) for n, o in enumerate(norms): accuracy = get_belief_accuracy(getattr(player, f'ne_{n + 1}'), o) this_payoff = const.max_bonus_per_belief - accuracy if this_payoff > 0: payoff += this_payoff return payoff def get_cc_payoff(player: Player, high_contr: bool, high_belief: bool): if high_contr: return player.high_contr_high_belief if high_belief else player.high_contr_low_belief else: return player.low_contr_high_belief if high_belief else player.low_contr_low_belief def set_cc_payoffs(player: Player, const: C, round_to_pay: int): if round_to_pay in const.cc_rounds: min_high_contr = const.endowment participant = player.participant player = player.in_round(round_to_pay) others = player.get_others_in_group() num_others = len(others) high_contr = len([o.income for o in others if o.income == min_high_contr]) > num_others / 2 high_belief = len([o.pne for o in others if o.pne == min_high_contr]) > num_others / 2 participant.cc_payoff = get_cc_payoff(player, high_contr, high_belief) def set_beliefs_payoff(group: Group, round_to_pay: int, const: C): for p in group.in_round(round_to_pay).get_players(): participant = p.participant others = p.get_others_in_group() participant.ee_payoff = get_ee_payoff(p, [o.income for o in others], const) participant.ne_payoff = get_ne_payoff(p, [o.pne for o in others], const) def set_payoffs(group: Group, const: C): ps = group.get_players() endowment = const.endowment taxes = [p.tax_paid for p in ps if p.field_maybe_none('tax_paid') is not None] tot = sum(taxes) share = round(tot * group.multiplier / len(taxes), 2) group.tot = tot group.share = share for p in ps: if not p.participant.dropout: p.pgg_payoff = round(endowment - p.tax_paid + share, 2) if group.round_number == const.NUM_ROUNDS: set_svo_payoffs(ps) round_to_pay = group.session.round_to_pay set_participants_payoffs(ps, group, const, round_to_pay) def set_svo_payoffs(players: List[Player]): shuffle(players) pairs = list(zip(*[iter(players)] * 2)) for pair in pairs: selected = choice(pair) selected.participant.svo_ego_selected = True question_selected = selected.participant.vars['svo_temp_question_selected'] for n, p in enumerate(pair): participant = p.participant participant.svo_question_selected = question_selected['num'] participant.svo_partner = pair[n - 1].participant.id_in_session participant.svo_payoff = question_selected['ego_payoff'] \ if p == selected else question_selected['alter_payoff'] def set_participants_payoffs(ps: List[Player], group: Group, const: C, round_to_pay=0): set_beliefs_payoff(group, round_to_pay, const) endowment = const.endowment for p in ps: participant = p.participant player_to_pay = p.in_round(round_to_pay) audited = player_to_pay.audited penalty = player_to_pay.penalty participant.audited = audited participant.tax_game_payoff = player_to_pay.pgg_payoff - penalty participant.vars['income'] = player_to_pay.income participant.vars['tax_evaded'] = endowment - player_to_pay.income participant.vars['penalty'] = penalty set_cc_payoffs(p, const, round_to_pay) participant.ee_ne_cc_payoff = participant.ee_payoff + participant.ne_payoff + participant.cc_payoff def set_error_if_not_descending(values: dict) -> str: values = list(values.values()) if sorted(values, reverse=True) != values: return "Inserisca i valori in ordine decrescente (dal più grande al più piccolo)." # PAGES class WaitingLastPart(WaitPage): group_by_arrival_time = True @staticmethod def after_all_players_arrive(group: Group): return set_group_treatment(group) @staticmethod def is_displayed(player: Player): return player.round_number == 1 @staticmethod def app_after_this_page(player: Player, upcoming_apps): if player.participant.excluded: return upcoming_apps[-1] class TaskPage(Page): @staticmethod def vars_for_template(player: Player): const = C return dict( tax_rate=f'{const.tax_rate:.0%}', audit_probability=f'{const.audit_probability:.0%}' ) @staticmethod def before_next_page(player: Player, timeout_happened): if timeout_happened: player.dropout = True player.participant.dropout = True player.group.has_dropout = True @staticmethod def app_after_this_page(player: Player, upcoming_apps): participant = player.participant if participant.dropout: return upcoming_apps[-1] class PNE(TaskPage): form_model = 'player' form_fields = ['pne'] timeout_seconds = C.timers['pne'] @staticmethod def js_vars(player: Player): return dict( currency=C.CUSTOM_POINTS_NAME, fields=['pne'], timeout_seconds=C.timers['pne'] ) class EE(TaskPage): form_model = 'player' form_fields = C.ee_fields timeout_seconds = C.timers['ee'] @staticmethod def vars_for_template(player: Player): const = C _vars = super(EE, EE).vars_for_template(player) return dict( **_vars, max_beliefs_payoff=const.max_bonus_per_belief * (const.PLAYERS_PER_GROUP - 1) ) @staticmethod def js_vars(player: Player): const = C return dict( currency=const.CUSTOM_POINTS_NAME, fields=const.ee_fields, timeout_seconds=C.timers['ee'] ) @staticmethod def error_message(player: Player, values): return set_error_if_not_descending(values) class NE(TaskPage): form_model = 'player' form_fields = C.ne_fields timeout_seconds = C.timers['ne'] @staticmethod def vars_for_template(player: Player): const = C _vars = super(EE, EE).vars_for_template(player) return dict( **_vars, max_beliefs_payoff=const.max_bonus_per_belief * (const.PLAYERS_PER_GROUP - 1) ) @staticmethod def js_vars(player: Player): return dict( currency=C.CUSTOM_POINTS_NAME, fields=C.ne_fields, timeout_seconds=C.timers['ne'] ) @staticmethod def error_message(player: Player, values): return set_error_if_not_descending(values) class GE(TaskPage): form_model = 'player' form_fields = ['ge'] timeout_seconds = C.timers['ge'] @staticmethod def vars_for_template(player: Player): _vars = super(EE, EE).vars_for_template(player) return dict( **_vars, label=C.ge_labels[player.participant.ge_label] ) @staticmethod def js_vars(player: Player): return dict( timeout_seconds=C.timers['ge'] ) @staticmethod def is_displayed(player: Player): return player.round_number in C.ge_rounds and not player.participant.info class Check(TaskPage): form_model = 'player' form_fields = ['check'] timeout_seconds = C.timers['check'] # @staticmethod # def vars_for_template(player: Player): # _vars = super(EE, EE).vars_for_template(player) # return dict( # **_vars, # label=C.ge_labels[player.participant.ge_label] # ) @staticmethod def js_vars(player: Player): return dict( timeout_seconds=C.timers['check'] ) @staticmethod def is_displayed(player: Player): return player.round_number in C.check_round class IncomeDeclaration(TaskPage): form_model = 'player' form_fields = ['income'] timeout_seconds = C.timers['income_declaration'] @staticmethod def js_vars(player: Player): const = C return dict( currency=const.CUSTOM_POINTS_NAME, fields=['income'], tax_rate=const.tax_rate, timeout_seconds=C.timers['income_declaration'] ) @staticmethod def before_next_page(player: Player, timeout_happened): super(IncomeDeclaration, IncomeDeclaration).before_next_page(player, timeout_happened) if not timeout_happened: set_tax_paid(player) class CC(TaskPage): form_model = 'player' form_fields = C.cc_fields timeout_seconds = C.timers['cc'] @staticmethod def js_vars(player: Player): return dict( currency=C.CUSTOM_POINTS_NAME, fields=C.cc_fields, timeout_seconds=C.timers['cc'] ) @staticmethod def is_displayed(player: Player): return player.round_number in C.cc_rounds class ResultsWaitPage(WaitPage): @staticmethod def after_all_players_arrive(group: Group): const = C set_penalty(group, const) set_payoffs(group, const) @staticmethod def app_after_this_page(player: Player, upcoming_apps): participant = player.participant if participant.dropout or participant.excluded: return upcoming_apps[-1] class Results(TaskPage): timeout_seconds = C.timers['results'] @staticmethod def js_vars(player: Player): return dict( timeout_seconds=C.timers['results'] ) page_sequence = [ WaitingLastPart, GE, Check, PNE, EE, NE, IncomeDeclaration, CC, ResultsWaitPage, Results ]