import itertools
import math
import random

import numpy as np
from otree.api import (
    models,
    widgets,
    BaseConstants,
    BaseSubsession,
    BaseGroup,
    BasePlayer,
    Currency as c,
    currency_range,
    safe_json,
)

author = 'Thomas Graeber'
# Revised and expanded by Tsahi Halyo

doc = """
Assign treatment status.
"""


def divisors(n, setting):
    """Return divisors of ``n`` found by trial division up to ``sqrt(n)``.

    Args:
        n: Positive integer to factor.
        setting: When equal to 1 the trivial divisors ``1`` and ``n`` are put
            at the front of the result; for any other value only the
            non-trivial divisors are returned.

    Returns:
        A list of ints.  Each non-trivial divisor ``i`` is immediately
        followed by its cofactor ``n // i`` (the list is not sorted).  The
        square root of a perfect square appears only once.
    """
    divs = [1, n] if setting == 1 else []
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            cofactor = n // i  # integer division: keep every entry an int
            divs.append(i)
            if cofactor != i:
                divs.append(cofactor)
    return divs


def _disguised_expression(target_percent):
    """Build a string ``'(a x b) / c - d'`` (or ``'... + d'``) equal to ``target_percent``.

    ``a`` is drawn from 4..9, ``b`` from the even numbers 10..18 and ``c`` is a
    non-trivial single-digit divisor of ``a * b`` (one always exists because
    ``b`` is even).  ``d`` is the integer offset that makes the expression
    evaluate to ``target_percent``.
    """
    # Draw order (b, a, divisor) is the same as in the original inline code.
    factor_b = random.choice([2 * x for x in range(5, 10)])
    factor_a = random.randint(4, 9)
    product = factor_a * factor_b
    divisor = random.choice([x for x in divisors(product, 0) if x < 10])
    offset = product // divisor - target_percent
    sign = '-' if offset >= 0 else '+'
    return '({} x {}) / {} {} {}'.format(
        factor_a, factor_b, divisor, sign, abs(offset)
    )


class Constants(BaseConstants):
    """Contains constants of the current experiment app."""

    name_in_url = 'beliefs_treatment'
    players_per_group = None

    # 'baseline' lists the admissible task parameters; 'weights' holds, in the
    # same order, the sampling weights passed to random.choices in
    # Player.create_task.
    task_settings = {
        'baseline': {
            'n': [1, 3, 5],
            'priors': [0.01, 0.05, 0.10, 0.30, 0.50, 0.70, 0.90, 0.95, 0.99],
            'discriminabilities': [0.65, 0.75, 0.90],
        },
        'weights': {
            'n': [0.50, 0.4, 0.1],
            'priors': [0.05, 0.05, 0.05, 0.2, 0.3, 0.2, 0.05, 0.05, 0.05],
            'discriminabilities': [0.5, 0.25, 0.25],
        },
    }

    baseline_rounds = 6
    rounds_specs = {
        'baseline': [baseline_rounds, 0],
        'low_default': [6, 0],
    }
    num_inconsis = 0

    treatment_conditions = ['baseline', 'low_default']
    treatment_weights = [1, 0]

    # Number of distinct (n, prior, discriminability) combinations.
    combinations_grether = (
        len(task_settings['baseline']['n'])
        * len(task_settings['baseline']['priors'])
        * len(task_settings['baseline']['discriminabilities'])
    )

    num_rounds = sum(rounds_specs['baseline']) + num_inconsis


class Subsession(BaseSubsession):
    """Contains subsession-level objects of the current experiment app."""

    def creating_session(self):
        """Initialise participant variables and flag each player's paying round.

        In round 1 every participant gets fresh bookkeeping variables and a
        uniformly drawn paying round.  In every round, players whose current
        round equals their paying round get ``on_paying_round = True``.
        """
        # NOTE(review): `lt` and `num_players` are module-level globals that
        # nothing in this file reads any more (they served a treatment
        # assignment that has been disabled).  They are kept in case another
        # module imports them -- confirm and remove.
        global lt, num_players
        players = self.get_players()
        num_players = len(players)
        lt = list(range(num_players))
        random.shuffle(lt)

        if self.round_number == 1:
            for p in players:
                p.participant.vars['load_num_correct'] = 0
                p.participant.vars['failed_comprehension'] = False
                p.participant.vars['paying_round'] = random.randint(
                    1, Constants.num_rounds
                )
                p.participant.vars['disqualified'] = False

        # NOTE: the former per-round task generation done here (condition /
        # ROCL sequences, pre-drawn samples) was commented out; tasks are now
        # built by Player.create_task.

        for p in players:
            if p.round_number == p.participant.vars['paying_round']:
                p.on_paying_round = True


class Group(BaseGroup):
    """Contains group-level objects of the current experiment app."""

    pass


class Player(BasePlayer):
    """Per-round player record: task parameters, treatment flags and answers."""

    treat_group = models.BooleanField()

    task_identifier = models.StringField()
    task_number = models.IntegerField()
    task_type = models.StringField()
    task_type_beta = models.IntegerField(
        choices=[[1, 'Load'], [2, 'Complex Probability'], [3, 'Baseline']]
    )
    on_paying_round = models.BooleanField(initial=False)

    random_payoff = models.CurrencyField()
    expected_payoff = models.CurrencyField()
    squared_deviation = models.IntegerField()

    prior = models.FloatField()
    bayesian_posterior = models.FloatField()
    rocl_probability = models.FloatField()
    sample_size = models.IntegerField()
    number_blacks = models.IntegerField()
    number_whites = models.IntegerField()
    diagnosticity = models.FloatField()
    blackish_urn_drawn = models.BooleanField()

    treatment_condition = models.StringField()
    default_condition = models.StringField()
    is_rocl = models.BooleanField()
    is_complex = models.BooleanField()
    is_load = models.BooleanField()
    complex_prob = models.StringField()
    complex_diag = models.StringField()

    # Timer
    timed_out = models.BooleanField(initial=False, blank=True)
    # Time spent with page open
    time_on_p = models.IntegerField(initial=0, blank=True)
    # Time from opening page to beginning next page
    time_before_next_p = models.IntegerField(initial=0, blank=True)
    # Did they close/refresh/redirect from the page
    closed_p = models.IntegerField(initial=0, blank=True)

    load_sum = models.IntegerField(initial=0, blank=True)
    load_correct = models.BooleanField()
    load_guess = models.IntegerField()
    load_num_correct = models.IntegerField(initial=0, blank=True)

    failed_comprehension = models.BooleanField(initial=False)
    replaced_by_machine = models.BooleanField(initial=False)
    wtp_draw = models.FloatField()

    qn_draws_got_wrong = models.BooleanField(initial=False)
    qn_alloc_got_wrong = models.BooleanField(initial=False)
    qn_incent_got_wrong = models.BooleanField(initial=False)
    qn_machine1_got_wrong = models.BooleanField(initial=False)
    qn_machine2_got_wrong = models.BooleanField(initial=False)
    qn_timer = models.BooleanField(initial=False)
    qn_bonus = models.BooleanField(initial=False, blank=True)

    qn_draws = models.IntegerField(
        choices=[
            [0, 'The number of "bag A" cards is the same in all tasks.'],
            [1, 'The exact number of cards corresponding to each bag may vary across tasks.'],
        ],
        widget=widgets.RadioSelect,
        blank=False,
        label='Which statement about the number of cards corresponding to each bag is correct?'
    )

    qn_alloc = models.IntegerField(
        choices=[
            [1, 'The exact fractions of red and blue balls in each bag may vary across tasks.'],
            [0, 'The fraction of red balls in each bag is the same in all tasks, and bag A always contains the most red balls.'],
        ],
        widget=widgets.RadioSelect,
        blank=False,
        label="Which statement about the allocation of red and blue balls in the bags is correct?"
    )

    qn_select = models.IntegerField(
        choices=[
            [1, 'The computer draws 1 card from a deck of 100 cards at random. Each card has the label of one of the bags written on it, '
                'e.g. "bag A". The bag corresponding to the label on drawn card gets selected.'],
            [0, 'The computer selects a bag at random, but this does not depend on the cards.']
        ],
        widget=widgets.RadioSelect,
        blank=False,
        label="Which statement about how the bag is selected is correct?"
    )

    qn_incent = models.IntegerField(
        choices=[
            [0, 'The probability of receiving the prize is independent of how good my guess was '
                'and which bag was actually selected.'],
            [1, 'If I get selected for payment and this part of the study counts, then one of the tasks'
                ' is randomly picked by the computer. The higher the probability that I assigned to the actually'
                ' drawn bag in that task, the higher the probability that I receive the prize.']
        ],
        widget=widgets.RadioSelect,
        blank=False,
        label="Which statement about your bonus payment is correct?"
    )

    wtp = models.FloatField()
    confidence = models.FloatField()
    confidence_upper_bound = models.FloatField()
    confidence_lower_bound = models.FloatField()

    answer = models.FloatField(
        min=0,
        max=100,
        blank=False,
    )

    answer_b = models.FloatField(
        min=0,
        max=100,
        blank=False,
    )

    def create_task(self):
        """Draw (or replay) this round's task and store it on the player.

        A fresh task samples prior, diagnosticity and sample size with the
        weights in ``Constants.task_settings['weights']``, draws the urn from
        the prior and then draws the ball sample from that urn.

        The second-to-last round replays round 1 and the last round replays
        round 2.  The replay copies the urn that generated the sample as well,
        so the state used for payment stays consistent with the sample shown.
        """
        self.task_type = "baseline"

        replay_source = {
            Constants.num_rounds - 1: 1,
            Constants.num_rounds: 2,
        }.get(self.round_number)

        # Only replay a strictly earlier round; with very few rounds the
        # "replay" target could otherwise be the current round itself.
        if replay_source is not None and replay_source < self.round_number:
            earlier = self.in_round(replay_source)
            self.prior = earlier.prior
            self.diagnosticity = earlier.diagnosticity
            self.sample_size = earlier.sample_size
            self.number_blacks = earlier.number_blacks
            self.number_whites = earlier.number_whites
            self.blackish_urn_drawn = earlier.blackish_urn_drawn
        else:
            values = Constants.task_settings['baseline']
            weights = Constants.task_settings['weights']
            self.prior = random.choices(
                values['priors'], weights=weights['priors']
            )[0]
            self.diagnosticity = random.choices(
                values['discriminabilities'],
                weights=weights['discriminabilities'],
            )[0]
            self.sample_size = random.choices(
                values['n'], weights=weights['n']
            )[0]

            self.blackish_urn_drawn = (random.random() < self.prior)
            if self.blackish_urn_drawn:
                p_black = self.diagnosticity
            else:
                p_black = 1 - self.diagnosticity

            # The probabilities must be passed as `p=`: the third positional
            # parameter of np.random.choice is `replace`, so passing the list
            # positionally silently produced 50/50 draws.
            draws = np.random.choice(
                [1, 0], size=self.sample_size, p=[p_black, 1 - p_black]
            )
            self.number_blacks = int(np.sum(draws))
            self.number_whites = self.sample_size - self.number_blacks

        self.task_identifier = 'ball_urns_p{}_d{}_n{}_b{}_{}'.format(
            self.prior,
            self.diagnosticity,
            self.sample_size,
            self.number_blacks,
            self.task_type,
        )

    def set_treatment_beta(self):
        """Set ``is_load`` / ``is_complex`` from ``task_type_beta`` (1 = load, 2 = complex)."""
        self.is_load = (self.task_type_beta == 1)
        self.is_complex = (self.task_type_beta == 2)

    def set_treatment(self):
        """Reset the treatment flags and apply the participant's treatment.

        Only participants with ``participant.vars['treat'] == 1`` get a
        treatment.  Their task type is fixed in round 1 and reused afterwards.
        """
        self.is_rocl = False
        self.is_complex = False
        self.is_load = False

        if self.participant.vars['treat'] == 1:
            if self.round_number == 1:
                # The random split between 'cprob' and 'load' was disabled in
                # the original code (`if r <= 1 or True`), so every treated
                # participant is assigned 'cprob'.
                self.participant.vars['task_type'] = 'cprob'

            assigned = self.participant.vars['task_type']
            if assigned == 'cprob':
                self.is_complex = True
                self.task_type = 'complex'
            if assigned == 'load':
                self.is_load = True
                self.task_type = 'load'

    def specification(self):
        """Return the task description as a dict.

        In the complex treatment the chip counts are arithmetic expressions
        (strings); they are regenerated on every call.  Otherwise they are
        integer counts out of 100.
        """
        if self.is_complex:
            self.set_complex_rate()
            self.set_complex_diag()
            a_chips = self.complex_prob
            b_chips = '100 - ' + '(' + self.complex_prob + ')'
            red_chips_a = self.complex_diag
            blue_chips_a = '100 - ' + '(' + self.complex_diag + ')'
        else:
            a_chips = int(round(self.prior * 100, 0))
            b_chips = int(100 - round(self.prior * 100, 0))
            red_chips_a = int(round(self.diagnosticity * 100, 0))
            blue_chips_a = int(100 - round(self.diagnosticity * 100, 0))

        return {
            'n_chips': 100,
            'a_chips': a_chips,
            'b_chips': b_chips,
            'red_chips_a': red_chips_a,
            'blue_chips_a': blue_chips_a,
            'n_sample': self.sample_size,
            'red_sample': int(round(self.number_blacks, 0)),
            'blue_sample': int(round(self.number_whites, 0)),
            # round() rather than bare int(): avoids truncating values such as
            # 28.999999999999996 to 28.
            'prior': int(round(self.prior * 100)),
            'task_type': self.task_type,
        }

    def resolve_machine(self):
        """Draw a random price and decide whether the machine replaces the player.

        The price is uniform on ``[0, session.config['endowment']]``.  The
        player is replaced when ``wtp`` exceeds the price and an independent
        draw falls below ``session.config['machine_probability']`` percent.
        """
        self.wtp_draw = round(
            random.uniform(0, self.session.config['endowment']), 2
        )
        machine_prob = self.session.config['machine_probability'] / 100
        if self.wtp > self.wtp_draw and random.random() < machine_prob:
            self.replaced_by_machine = True

    def other_cards(self):
        """Split the cards that are not "bag A" cards into two counts.

        Returns:
            ``[x // 9, x // 9 + x % 9]`` where ``x`` is 100 minus the number
            of bag-A cards implied by ``self.prior``.
        """
        # Derived from the prior directly instead of via specification():
        # that call re-randomises the complex expressions and returns a
        # string for 'a_chips' in the complex treatment.
        bag_a = int(round(self.prior * 100, 0))
        remaining = 100 - bag_a
        return [remaining // 9, remaining // 9 + remaining % 9]

    def set_payoffs(self):
        """Resolve the quadratic scoring rule and record the possible payoff.

        ``answer`` is on a 0-100 scale (see the field's min/max) and is scored
        as the stated chance that the blackish urn was drawn: the probability
        of winning the prize is one minus the squared error, rescaled to
        [0, 1].  On the paying round the resulting payoff (prize plus a bonus
        of 2 for a correct load task) is appended to
        ``participant.vars['possible_payoffs']``.
        """
        if self.blackish_urn_drawn:
            # Error is measured against 100, not 1, because answer is in percent.
            p_win_prize = 1 - 0.0001 * (self.answer - 100) ** 2
        else:
            p_win_prize = 1 - 0.0001 * self.answer ** 2
        won_prize = random.random() < p_win_prize

        if self.on_paying_round:
            possible_payoff = 0
            if won_prize:
                possible_payoff += self.session.config['prize']
            if self.is_load and self.load_correct == 1:
                possible_payoff += 2
            self.participant.vars.setdefault('possible_payoffs', []).append(
                round(possible_payoff, 2)
            )

    def set_complex_rate(self):
        """Store in ``complex_prob`` an arithmetic expression equal to the prior in percent."""
        self.complex_prob = _disguised_expression(int(round(self.prior * 100)))

    def set_complex_diag(self):
        """Store in ``complex_diag`` an arithmetic expression equal to the diagnosticity in percent."""
        self.complex_diag = _disguised_expression(
            int(round(self.diagnosticity * 100))
        )