from otree.api import *
from tax_instr import set_timer_for_waiting, exclude_participants
from random import shuffle, randint, choices, choice
from collections import defaultdict
from markupsafe import Markup
from itertools import cycle
from typing import List
# Module-level description string for this app.
# NOTE(review): the text below still reads like placeholder wording - confirm
# whether a real description should be supplied.
doc = """
Your app description
"""
class C(BaseConstants):
    """Constants of the tax-declaration game.

    Groups size and round count, names of the dynamically generated belief
    fields, the user-facing question texts, the monetary parameters of the
    game and the per-page time limits.
    """

    NAME_IN_URL = 'tax_game'
    PLAYERS_PER_GROUP = 4
    NUM_ROUNDS = 10
    CUSTOM_POINTS_NAME = 'E'

    # Belief-elicitation field names: one field per *other* group member,
    # numbered from 1 (ee_1 ... and ne_1 ...).
    ee_fields = [f'ee_{k}' for k in range(1, PLAYERS_PER_GROUP)]
    ne_fields = [f'ne_{k}' for k in range(1, PLAYERS_PER_GROUP)]

    # Question texts for the `ge` belief; the key is stored per participant
    # as `ge_label` and looked up again by the GE page.
    ge_labels = {
        'double': "Da 0 a 100, dove 0 significa “completamente improbabile” e 100 significa “assolutamente probabile”, quanto ritiene probabile che il governo raddoppi le tasse pagate prima di redistribuirle?",
        'keep': "Da 0 a 100, dove 0 significa “completamente improbabile” e 100 significa “assolutamente probabile”, quanto ritiene probabile che il governo trattenga metà delle tasse pagate prima di redistribuire le restanti?",
    }

    # Conditional-contribution fields and, index-aligned, their labels.
    cc_fields = [
        'high_contr_high_belief',
        'low_contr_high_belief',
        'high_contr_low_belief',
        'low_contr_low_belief',
    ]
    cc_fields_labels = [
        Markup(
            '''La maggioranza dei membri del suo gruppo ha dichiarato tutto
il reddito e pensa che tutti debbano dichiararlo interamente.'''
        ),
        Markup(
            '''La maggioranza dei membri del suo gruppo ha dichiarato tutto
il reddito ma pensa che tutti debbano dichiararne meno della metà.'''
        ),
        Markup(
            '''La maggioranza dei membri del suo gruppo ha dichiarato meno della metà
del reddito ma pensa che tutti debbano dichiararlo interamente.'''
        ),
        Markup(
            '''La maggioranza dei membri del suo gruppo ha dichiarato meno della metà
del reddito e pensa che tutti debbano dichiararne meno della metà.'''
        ),
    ]

    # Rounds in which the Check, GE and CC pages are displayed.
    check_round = [7]
    ge_rounds = [1, 10]
    cc_rounds = [1, 10]

    # Belief bonus parameters.
    # NOTE(review): penalty_per_belief_error is not referenced by the payoff
    # helpers visible in this file - confirm whether it is used elsewhere.
    max_bonus_per_belief = 5
    penalty_per_belief_error = 1

    # Monetary parameters of the declaration game.
    endowment = 200
    tax_rate = 0.4
    penalty_rate = 2
    audit_probability = 0.05

    # Treatments: possible values of the `info` flag and the multiplier
    # applied to the collected taxes, keyed by treatment name.
    info = [False, True]
    multipliers = {'low': 0.5, 'high': 2}

    # Time limit in seconds for each timed page; every page gets two minutes.
    timers = dict.fromkeys(
        [
            'pne',
            'ee',
            'ne',
            'ge',
            'check',
            'income_declaration',
            'cc',
            'results',
        ],
        60 * 2,
    )

    # NOTE(review): not read directly in this file; presumably consumed by
    # exclude_participants, which receives this class - confirm.
    max_minutes_on_grouping_page = 10
class Subsession(BaseSubsession):
    """Subsession model; this app declares no subsession-level fields."""
class Group(BaseGroup):
    # Group-level record for one round of the tax game.

    # Set to True by TaskPage.before_next_page when a member times out.
    has_dropout = models.BooleanField(initial=False)
    # Treatment values, copied to every round by set_group_treatment from the
    # participant of the player with id_in_group 1.
    info = models.BooleanField()
    multiplier = models.FloatField()
    # Sum of the taxes paid in the round (written by set_payoffs).
    tot = models.FloatField()
    # NOTE(review): no assignment to this field is visible in this file -
    # confirm whether it is populated elsewhere.
    tot_with_multiplier = models.FloatField()
    # Amount redistributed to each player (written by set_payoffs).
    share = models.FloatField()
class Player(BasePlayer):
    # Per-round record of one participant: declaration, audit outcome and the
    # belief / conditional-contribution answers.

    # Set to True by TaskPage.before_next_page when this player times out.
    dropout = models.BooleanField(initial=False)
    # Answer entered on the Check page.
    check = models.IntegerField(
        label="Per favore inserisca il numero 7 qui sotto."
    )
    # Declared income, between 0 and C.endowment.
    income = models.IntegerField(
        label='',
        # An earlier, now disabled, label asked for a value from 0 to C.endowment.
        min=0,
        max=C.endowment
    )
    # Tax due on the declared income (written by set_tax_paid).
    tax_paid = models.FloatField()
    # Audit outcome and resulting penalty (written by set_penalty).
    audited = models.BooleanField()
    penalty = models.FloatField(initial=0)
    # Round payoff of the game before penalties (written by set_payoffs).
    pgg_payoff = models.FloatField()
    # Answer to the PNE page, between 0 and C.endowment.
    pne = models.IntegerField(
        label=f'Per favore risponda alla seguente domanda inserendo un valore da 0 a {C.endowment}.',
        min=0, max=C.endowment
    )
    # Belief about the government, stored as 0..10 and displayed as 0..100
    # in steps of ten.
    ge = models.IntegerField(
        choices=[(n, f'{n * 10}') for n in range(0, 11)],
        widget=widgets.RadioSelectHorizontal
    )
    # The remaining fields are generated by writing into the class namespace
    # through locals(): one integer field per name listed in C.ee_fields,
    # C.ne_fields and C.cc_fields.
    for n, field in enumerate(C.ee_fields):
        locals()[field] = models.IntegerField(label=f'Box {n + 1}', min=0, max=C.endowment)
    del field
    for n, field in enumerate(C.ne_fields):
        locals()[field] = models.IntegerField(label=f'Box {n + 1}', min=0, max=C.endowment)
    del field
    for n, field in enumerate(C.cc_fields):
        # Labels are index-aligned with the field names.
        locals()[field] = models.IntegerField(label=C.cc_fields_labels[n], min=0, max=C.endowment)
    # Remove the loop variables so they do not remain as class attributes.
    del n, field
# FUNCTIONS
def creating_session(subsession: Subsession):
    """Draw the round to be paid, once, while the first round is created.

    The draw is uniform over 1..C.NUM_ROUNDS and is stored on the session as
    ``round_to_pay``. Calls for any later round do nothing.
    """
    if subsession.round_number != 1:
        return
    subsession.session.round_to_pay = randint(1, C.NUM_ROUNDS)
def group_by_arrival_time_method(subsession: Subsession, waiting_players: List[Player]) -> List[Player]:
    """Form a group from waiting players that share the same treatment.

    Every waiting player is first passed to ``set_timer_for_waiting`` with
    the key ``'waiting_group_start'``. Players are then bucketed by the pair
    (participant.info, participant.multiplier); as soon as one bucket holds
    C.PLAYERS_PER_GROUP players, those players are returned as a group.

    If no bucket is full, the result of ``exclude_participants`` is returned
    when it is truthy (presumably the players who waited too long - confirm
    against tax_instr); otherwise the function returns None.
    """
    group_size = C.PLAYERS_PER_GROUP

    for waiting in waiting_players:
        set_timer_for_waiting(waiting, 'waiting_group_start')

    buckets = defaultdict(list)
    for waiting in waiting_players:
        part = waiting.participant
        bucket = buckets[f'{part.info}_{part.multiplier}']
        bucket.append(waiting)
        if len(bucket) >= group_size:
            return bucket[:group_size]

    excluded = exclude_participants(waiting_players, C, 'waiting_group_start')
    return excluded or None
def set_group_treatment(group: Group):
    """Copy the treatment onto the group for every round.

    The treatment is read from the participant of the player with
    id_in_group 1: ``info`` as is, ``multiplier`` translated through
    C.multipliers. When ``info`` is false, the group members are shuffled and
    the keys of C.ge_labels are dealt to their participants in rotation as
    ``ge_label``.
    """
    first = group.get_player_by_id(1).participant
    info = first.info
    multiplier = C.multipliers[first.multiplier]

    for round_group in group.in_rounds(1, C.NUM_ROUNDS):
        round_group.info = info
        round_group.multiplier = multiplier

    if info:
        return

    members = group.get_players()
    shuffle(members)
    # zip stops with the members; cycle repeats the label keys as needed.
    for member, label_key in zip(members, cycle(C.ge_labels)):
        member.participant.ge_label = label_key
def set_tax_paid(player: Player):
    """Store the tax due on the declared income, rounded to two decimals.

    Nothing is written when the player has no declared income.
    """
    if player.field_maybe_none('income') is None:
        return
    player.tax_paid = round(player.income * C.tax_rate, 2)
def set_penalty(group: Group, const: C):
    """Draw the audit for every active player and fine under-declaration.

    Players whose participant is flagged as dropout are skipped. Each
    remaining player is audited with probability ``const.audit_probability``
    (previously a hard-coded 0.05 that ignored the constant shown to
    participants). An audited player who declared less than the endowment is
    fined ``undeclared * tax_rate * penalty_rate``; otherwise the penalty is
    left at its existing value.

    Args:
        group: the group whose players are audited.
        const: object exposing ``endowment``, ``tax_rate``, ``penalty_rate``
            and ``audit_probability``.
    """
    endowment = const.endowment
    tax_rate = const.tax_rate
    penalty_rate = const.penalty_rate
    audit_probability = const.audit_probability
    # Same kind of draw as before (one `choices` call per player), only the
    # weights now follow the configured probability.
    weights = [audit_probability, 1 - audit_probability]

    for p in group.get_players():
        if p.participant.dropout:
            continue
        audited = choices([True, False], weights=weights, k=1)[0]
        p.audited = audited
        undeclared = endowment - p.income
        if audited and undeclared > 0:
            p.penalty = undeclared * tax_rate * penalty_rate
def get_belief_accuracy(belief, decision):
    """Return the absolute distance between a belief and the actual decision.

    Despite the name, a larger value means a less accurate belief.
    """
    error = decision - belief
    return abs(error)
def get_ee_payoff(player: Player, incomes: List[int], const: C):
    """Return the bonus earned by the player's ``ee_*`` beliefs.

    ``incomes`` is sorted from largest to smallest and the k-th largest value
    is compared with the player's ``ee_k`` field. Each comparison is worth
    ``const.max_bonus_per_belief`` minus the absolute error; only strictly
    positive amounts are added to the total.

    NOTE(review): ``penalty_per_belief_error`` is not applied here; the
    deduction is implicitly one point per unit of error.
    """
    ranked = sorted(incomes, reverse=True)
    bonuses = (
        const.max_bonus_per_belief
        - get_belief_accuracy(getattr(player, f'ee_{rank}'), actual)
        for rank, actual in enumerate(ranked, start=1)
    )
    return sum(bonus for bonus in bonuses if bonus > 0)
def get_ne_payoff(player: Player, norms: list, const: C):
    """Return the bonus earned by the player's ``ne_*`` beliefs.

    ``norms`` is sorted from largest to smallest and the k-th largest value
    is compared with the player's ``ne_k`` field. Each comparison is worth
    ``const.max_bonus_per_belief`` minus the absolute error; only strictly
    positive amounts are added to the total.

    NOTE(review): ``penalty_per_belief_error`` is not applied here; the
    deduction is implicitly one point per unit of error.
    """
    ranked = sorted(norms, reverse=True)
    bonuses = (
        const.max_bonus_per_belief
        - get_belief_accuracy(getattr(player, f'ne_{rank}'), actual)
        for rank, actual in enumerate(ranked, start=1)
    )
    return sum(bonus for bonus in bonuses if bonus > 0)
def get_cc_payoff(player: Player, high_contr: bool, high_belief: bool):
    """Return the player's conditional-contribution answer for a scenario.

    The scenario is identified by two flags; the value is read from the
    field named ``<high|low>_contr_<high|low>_belief``.
    """
    contr = 'high' if high_contr else 'low'
    belief = 'high' if high_belief else 'low'
    return getattr(player, f'{contr}_contr_{belief}_belief')
def set_cc_payoffs(player: Player, const: C, round_to_pay: int):
    """Store the participant's conditional-contribution payoff.

    When ``round_to_pay`` is one of ``const.cc_rounds``, the scenario that
    actually occurred in that round is determined from the other group
    members: "high contribution" means strictly more than half of them
    declared exactly the endowment, "high belief" means strictly more than
    half of them answered exactly the endowment on ``pne``. The player's
    answer for that scenario becomes ``participant.cc_payoff``.

    When the paid round has no conditional-contribution task, the payoff is
    defaulted to 0 unless a value is already stored. Previously nothing was
    assigned in that case, although the caller reads ``cc_payoff``
    unconditionally afterwards.

    Args:
        player: any round's player object of the participant being paid.
        const: object exposing ``cc_rounds`` and ``endowment``.
        round_to_pay: number of the round selected for payment.
    """
    participant = player.participant
    if round_to_pay not in const.cc_rounds:
        # setdefault keeps any value written earlier instead of overwriting it.
        participant.vars.setdefault('cc_payoff', 0)
        return

    full_amount = const.endowment
    paid_player = player.in_round(round_to_pay)
    others = paid_player.get_others_in_group()
    half = len(others) / 2
    high_contr = sum(o.income == full_amount for o in others) > half
    high_belief = sum(o.pne == full_amount for o in others) > half
    participant.cc_payoff = get_cc_payoff(paid_player, high_contr, high_belief)
def set_beliefs_payoff(group: Group, round_to_pay: int, const: C):
    """Compute both belief bonuses for every player of the paid round.

    For each player of the group in ``round_to_pay``, the other members'
    ``income`` values feed ``get_ee_payoff`` and their ``pne`` values feed
    ``get_ne_payoff``; the results are stored on the participant as
    ``ee_payoff`` and ``ne_payoff``.

    NOTE(review): the fields are read directly, so every player of that
    round is expected to have them filled in - confirm how dropouts are
    meant to be handled.
    """
    paid_group = group.in_round(round_to_pay)
    for member in paid_group.get_players():
        target = member.participant
        peers = member.get_others_in_group()
        declared = [peer.income for peer in peers]
        target.ee_payoff = get_ee_payoff(member, declared, const)
        norms = [peer.pne for peer in peers]
        target.ne_payoff = get_ne_payoff(member, norms, const)
def set_payoffs(group: Group, const: C):
    """Compute the round's redistribution and each active player's payoff.

    The taxes of all players that have a ``tax_paid`` value are summed,
    multiplied by the group's multiplier and split evenly among those
    players. The total, the multiplied total and the share are stored on the
    group. Every player whose participant is not a dropout receives
    ``endowment - tax_paid + share``.

    In the last round the SVO payoffs and the participants' final payoffs
    are computed as well.

    Changes from the previous version: ``group.tot_with_multiplier`` is now
    populated, and a round in which nobody paid any tax no longer raises
    ZeroDivisionError (total and share are 0 in that case).

    Args:
        group: the group for the current round.
        const: object exposing ``endowment`` and ``NUM_ROUNDS``.
    """
    ps = group.get_players()
    endowment = const.endowment
    taxes = [p.tax_paid for p in ps if p.field_maybe_none('tax_paid') is not None]

    tot = sum(taxes) if taxes else 0.0
    tot_with_multiplier = tot * group.multiplier
    # Guard the division: with no contributors there is nothing to share.
    share = round(tot_with_multiplier / len(taxes), 2) if taxes else 0.0

    group.tot = tot
    group.tot_with_multiplier = tot_with_multiplier
    group.share = share

    for p in ps:
        if not p.participant.dropout:
            p.pgg_payoff = round(endowment - p.tax_paid + share, 2)

    if group.round_number == const.NUM_ROUNDS:
        set_svo_payoffs(ps)
        round_to_pay = group.session.round_to_pay
        set_participants_payoffs(ps, group, const, round_to_pay)
def set_svo_payoffs(players: List[Player]):
    """Pair the players at random and pay one SVO decision per pair.

    The list is shuffled in place (the caller's list is reordered) and
    consecutive players form pairs; with an odd number of players the last
    one stays unpaired. In each pair one member is drawn at random: that
    participant is flagged ``svo_ego_selected`` and the question stored
    under ``vars['svo_temp_question_selected']`` decides the payoffs - the
    drawn member receives ``ego_payoff``, the partner ``alter_payoff``. Both
    participants record the question number and each other's
    ``id_in_session``.
    """
    shuffle(players)
    for pair in zip(players[0::2], players[1::2]):
        chosen = choice(pair)
        chosen.participant.svo_ego_selected = True
        question = chosen.participant.vars['svo_temp_question_selected']

        first, second = pair
        for member, partner in ((first, second), (second, first)):
            target = member.participant
            target.svo_question_selected = question['num']
            target.svo_partner = partner.participant.id_in_session
            if member == chosen:
                target.svo_payoff = question['ego_payoff']
            else:
                target.svo_payoff = question['alter_payoff']
def set_participants_payoffs(ps: List[Player], group: Group, const: C, round_to_pay=0):
    """Copy the paid round's outcome onto each participant.

    The belief bonuses are computed first for the whole group. Then, for
    every player in ``ps``, the data of ``round_to_pay`` is stored on the
    participant: the audit flag, the game payoff net of the penalty
    (``tax_game_payoff``), and in ``vars`` the declared income, the
    undeclared amount (``tax_evaded``) and the penalty. Finally the
    conditional-contribution payoff is set and the three bonus components
    are summed into ``ee_ne_cc_payoff``.

    NOTE(review): the paid round's fields are read directly, so they are
    expected to be filled in for every player in ``ps`` - confirm how
    dropouts are meant to be handled.
    """
    set_beliefs_payoff(group, round_to_pay, const)

    for current in ps:
        target = current.participant
        paid = current.in_round(round_to_pay)

        was_audited = paid.audited
        fine = paid.penalty

        target.audited = was_audited
        target.tax_game_payoff = paid.pgg_payoff - fine
        target.vars['income'] = paid.income
        target.vars['tax_evaded'] = const.endowment - paid.income
        target.vars['penalty'] = fine

        set_cc_payoffs(current, const, round_to_pay)
        bonus_total = target.ee_payoff + target.ne_payoff + target.cc_payoff
        target.ee_ne_cc_payoff = bonus_total
def set_error_if_not_descending(values: dict) -> str:
    """Return an error message unless the values are in non-increasing order.

    The values are checked in the dict's own order; equal neighbours are
    accepted. Returns None when the order is valid.
    """
    ordered = list(values.values())
    for earlier, later in zip(ordered, ordered[1:]):
        if earlier < later:
            return "Inserisca i valori in ordine decrescente (dal più grande al più piccolo)."
    return None
# PAGES
class WaitingLastPart(WaitPage):
    """First-round wait page that forms groups as participants arrive.

    Once a group is complete its treatment is assigned. Participants flagged
    as ``excluded`` are sent to the last upcoming app.
    """

    group_by_arrival_time = True

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def after_all_players_arrive(group: Group):
        return set_group_treatment(group)

    @staticmethod
    def app_after_this_page(player: Player, upcoming_apps):
        if not player.participant.excluded:
            return None
        return upcoming_apps[-1]
class TaskPage(Page):
    """Base class of the timed task pages.

    Supplies the tax rate and audit probability to the template as
    percentages, flags the player, the participant and the group when the
    page times out, and routes dropouts to the last upcoming app.
    """

    @staticmethod
    def vars_for_template(player: Player):
        return {
            'tax_rate': f'{C.tax_rate:.0%}',
            'audit_probability': f'{C.audit_probability:.0%}',
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        if not timeout_happened:
            return
        # A timeout is treated as the participant having dropped out.
        player.dropout = True
        player.participant.dropout = True
        player.group.has_dropout = True

    @staticmethod
    def app_after_this_page(player: Player, upcoming_apps):
        if player.participant.dropout:
            return upcoming_apps[-1]
        return None
class PNE(TaskPage):
    """Timed page collecting the ``pne`` answer."""

    form_model = 'player'
    form_fields = ['pne']
    timeout_seconds = C.timers['pne']

    @staticmethod
    def js_vars(player: Player):
        return {
            'currency': C.CUSTOM_POINTS_NAME,
            'fields': ['pne'],
            'timeout_seconds': C.timers['pne'],
        }
class EE(TaskPage):
    """Timed page collecting the ``ee_*`` beliefs, which must be entered in
    descending order."""

    form_model = 'player'
    form_fields = C.ee_fields
    timeout_seconds = C.timers['ee']

    @staticmethod
    def vars_for_template(player: Player):
        base_vars = super(EE, EE).vars_for_template(player)
        # Highest attainable bonus: the full bonus for each other member.
        max_bonus = C.max_bonus_per_belief * (C.PLAYERS_PER_GROUP - 1)
        return dict(**base_vars, max_beliefs_payoff=max_bonus)

    @staticmethod
    def js_vars(player: Player):
        return {
            'currency': C.CUSTOM_POINTS_NAME,
            'fields': C.ee_fields,
            'timeout_seconds': C.timers['ee'],
        }

    @staticmethod
    def error_message(player: Player, values):
        return set_error_if_not_descending(values)
class NE(TaskPage):
    """Timed page collecting the ``ne_*`` beliefs, which must be entered in
    descending order."""

    form_model = 'player'
    form_fields = C.ne_fields
    timeout_seconds = C.timers['ne']

    @staticmethod
    def vars_for_template(player: Player):
        const = C
        # Resolve the parent implementation relative to this class; the
        # previous code named EE here, a copy-paste leftover that only worked
        # because both classes share the same parent.
        _vars = super(NE, NE).vars_for_template(player)
        return dict(
            **_vars,
            # Highest attainable bonus: the full bonus for each other member.
            max_beliefs_payoff=const.max_bonus_per_belief * (const.PLAYERS_PER_GROUP - 1)
        )

    @staticmethod
    def js_vars(player: Player):
        return dict(
            currency=C.CUSTOM_POINTS_NAME,
            fields=C.ne_fields,
            timeout_seconds=C.timers['ne']
        )

    @staticmethod
    def error_message(player: Player, values):
        return set_error_if_not_descending(values)
class GE(TaskPage):
    """Timed page collecting the ``ge`` belief.

    Shown only in the rounds listed in C.ge_rounds and only to participants
    whose ``info`` flag is false. The question text is selected by the
    participant's ``ge_label``.
    """

    form_model = 'player'
    form_fields = ['ge']
    timeout_seconds = C.timers['ge']

    @staticmethod
    def vars_for_template(player: Player):
        # Resolve the parent implementation relative to this class; the
        # previous code named EE here, a copy-paste leftover that only worked
        # because both classes share the same parent.
        _vars = super(GE, GE).vars_for_template(player)
        return dict(
            **_vars,
            label=C.ge_labels[player.participant.ge_label]
        )

    @staticmethod
    def js_vars(player: Player):
        return dict(
            timeout_seconds=C.timers['ge']
        )

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number in C.ge_rounds and not player.participant.info
class Check(TaskPage):
    """Timed page collecting the ``check`` answer, shown only in the rounds
    listed in C.check_round."""

    form_model = 'player'
    form_fields = ['check']
    timeout_seconds = C.timers['check']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number in C.check_round

    @staticmethod
    def js_vars(player: Player):
        return {'timeout_seconds': C.timers['check']}
class IncomeDeclaration(TaskPage):
    """Timed page on which the player declares an income.

    After a regular submission the tax due is computed; on timeout only the
    inherited dropout handling runs.
    """

    form_model = 'player'
    form_fields = ['income']
    timeout_seconds = C.timers['income_declaration']

    @staticmethod
    def js_vars(player: Player):
        return {
            'currency': C.CUSTOM_POINTS_NAME,
            'fields': ['income'],
            'tax_rate': C.tax_rate,
            'timeout_seconds': C.timers['income_declaration'],
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        super(IncomeDeclaration, IncomeDeclaration).before_next_page(player, timeout_happened)
        if timeout_happened:
            return
        set_tax_paid(player)
class CC(TaskPage):
    """Timed page collecting the four conditional-contribution answers,
    shown only in the rounds listed in C.cc_rounds."""

    form_model = 'player'
    form_fields = C.cc_fields
    timeout_seconds = C.timers['cc']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number in C.cc_rounds

    @staticmethod
    def js_vars(player: Player):
        return {
            'currency': C.CUSTOM_POINTS_NAME,
            'fields': C.cc_fields,
            'timeout_seconds': C.timers['cc'],
        }
class ResultsWaitPage(WaitPage):
    """Wait page that settles the round once the whole group has arrived.

    Audits and penalties are drawn first, then the payoffs are computed.
    Participants flagged as dropout or excluded are sent to the last
    upcoming app.
    """

    @staticmethod
    def after_all_players_arrive(group: Group):
        set_penalty(group, C)
        set_payoffs(group, C)

    @staticmethod
    def app_after_this_page(player: Player, upcoming_apps):
        who = player.participant
        if not (who.dropout or who.excluded):
            return None
        return upcoming_apps[-1]
class Results(TaskPage):
    """Timed results page.

    NOTE(review): the timeout handling is inherited from TaskPage, so a
    timeout on this page also flags the player as a dropout - confirm that
    this is intended.
    """

    timeout_seconds = C.timers['results']

    @staticmethod
    def js_vars(player: Player):
        return {'timeout_seconds': C.timers['results']}
# Order in which the pages are listed for each round. Per their is_displayed
# methods, WaitingLastPart is shown in round 1 only, GE, Check and CC in the
# rounds listed in C (GE additionally requires the participant's info flag to
# be false).
page_sequence = [
    WaitingLastPart,
    GE,
    Check,
    PNE,
    EE,
    NE,
    IncomeDeclaration,
    CC,
    ResultsWaitPage,
    Results
]