from otree.api import *
import random
import json
from math import lgamma, exp


doc = """
Joint Inference experiment: subjects estimate a proportion from their own draw,
then see two source reports (one from their box, one from a different box with
independent Uniform(0,1) proportion) and elicit (i) a posterior on identity via
paired-lottery price list, (ii) a posterior on the box proportion.
"""


class C(BaseConstants):
    """Session-wide constants for the joint-inference app."""

    NAME_IN_URL = 'joint_inference'
    PLAYERS_PER_GROUP = None

    # Every trial is one oTree round: practice trials first, then paid trials.
    NUM_PRACTICE_TRIALS = 2
    # NOTE(review): the own-sample sizes for the paid trials are chosen in
    # creating_session (currently half with n_i=10 and half with n_i=50).
    NUM_MAIN_TRIALS = 8
    NUM_TRIALS = NUM_PRACTICE_TRIALS + NUM_MAIN_TRIALS
    NUM_ROUNDS = NUM_TRIALS

    # Number of balls in each source report and in each box.
    SOURCE_DRAW_SIZE = 10
    BOX_SIZE = 100

    # Lottery win probabilities (in percent) for the price list:
    # 0, 5, 10, ..., 100 -> 21 rows.
    PRICE_LIST_PCTS = list(range(0, 101, 5))

    # Maximum number of points a single trial can pay.
    MAX_PAY_PER_TRIAL = 100
    PARTICIPATION_FEE = 5


# ============================================================================
# Bayesian helpers (Beta-Binomial closed form)
# ============================================================================

def log_beta(a, b):
    """Return log B(a, b), the log of the Beta function, via log-gamma."""
    return lgamma(a) + lgamma(b) - lgamma(a + b)


def beta_binom_logpmf(k, n, alpha, beta):
    """Return the log pmf of BetaBinomial(n, alpha, beta) evaluated at k."""
    # log of the binomial coefficient C(n, k)
    log_n_choose_k = lgamma(n + 1) - lgamma(k + 1) - lgamma(n - k + 1)
    log_numerator = log_beta(alpha + k, beta + n - k)
    log_denominator = log_beta(alpha, beta)
    return log_n_choose_k + log_numerator - log_denominator


def bayesian_qa(k_i, n_i, k_a, k_b, n_source=10):
    """Return the posterior probability that source A drew from the subject's box.

    The subject's own draw (k_i reds out of n_i) updates a uniform prior to
    Beta(1 + k_i, 1 + n_i - k_i); each report (k_a, k_b reds out of n_source)
    is then scored under the resulting Beta-Binomial predictive distribution.
    """
    post_alpha = 1 + k_i
    post_beta = 1 + n_i - k_i
    log_fa = beta_binom_logpmf(k_a, n_source, post_alpha, post_beta)
    log_fb = beta_binom_logpmf(k_b, n_source, post_alpha, post_beta)

    # Subtract the larger log-likelihood before exponentiating so that the
    # normalisation stays numerically stable.
    shift = max(log_fa, log_fb)
    weight_a = exp(log_fa - shift)
    weight_b = exp(log_fb - shift)
    return weight_a / (weight_a + weight_b)


def bayesian_posterior_red(k_i, n_i, k_a, k_b, n_source=10):
    """Return the posterior mean number of red balls in the box (out of C.BOX_SIZE).

    The two "A is from my box" / "B is from my box" scenarios are mixed with
    the weight returned by bayesian_qa.
    """
    qa = bayesian_qa(k_i, n_i, k_a, k_b, n_source)
    expected_reds = 1 + k_i + qa * k_a + (1 - qa) * k_b
    total_observations = 2 + n_i + n_source
    p_mean = expected_reds / total_observations
    return p_mean * C.BOX_SIZE
# ============================================================================
# Trial generation
# ============================================================================

# Own-sample sizes used in the paid trials. Each level gets an equal share of
# the C.NUM_MAIN_TRIALS paid trials (earlier levels absorb any remainder).
_PAID_N_I_LEVELS = (10, 50)

# Hardcoded practice trials with asymmetric source reports to demonstrate the
# mechanism, keyed by round number.
_PRACTICE_TRIALS = {
    # Practice 1: n_i=10, subject draws 7/10, A=8/10, B=2/10
    1: dict(is_practice=True, n_i=10, p=0.75, p_imp=0.20,
            k_i=7, k_a=8, k_b=2, a_is_partner=True),
    # Practice 2: n_i=50, subject draws 15/50, A=9/10 (imposter), B=4/10 (partner)
    2: dict(is_practice=True, n_i=50, p=0.37, p_imp=0.85,
            k_i=15, k_a=9, k_b=4, a_is_partner=False),
}


def _binomial_draw(n, p):
    """Return the number of successes in n independent Bernoulli(p) draws."""
    return sum(1 for _ in range(n) if random.random() < p)


def _coloured_balls(num_red, total):
    """Return a shuffled list of 'red'/'blue' strings with num_red reds."""
    balls = ['red' if i < num_red else 'blue' for i in range(total)]
    random.shuffle(balls)
    return balls


def _build_paid_n_i_order():
    """Return a freshly shuffled list of n_i values, one per paid trial."""
    per_level, remainder = divmod(C.NUM_MAIN_TRIALS, len(_PAID_N_I_LEVELS))
    order = []
    for idx, level in enumerate(_PAID_N_I_LEVELS):
        order.extend([level] * (per_level + (1 if idx < remainder else 0)))
    random.shuffle(order)
    return order


def generate_trial(round_number, n_i=None):
    """Generate one trial's parameters.

    Args:
        round_number: 1-based round; rounds up to C.NUM_PRACTICE_TRIALS are
            flagged as practice.
        n_i: size of the subject's own draw. When omitted, practice rounds use
            10 (round 1) or 50 (otherwise) and paid rounds use 10.

    Returns:
        dict with keys is_practice, n_i, p, p_imp, k_i, k_a, k_b, a_is_partner.
    """
    is_practice = round_number <= C.NUM_PRACTICE_TRIALS
    if n_i is None:
        if is_practice:
            n_i = 10 if round_number == 1 else 50
        else:
            n_i = 10

    # Red proportions of the subject's box (p) and of the other box (p_imp).
    p = random.randint(1, 99) / 100.0
    p_imp = random.randint(1, 99) / 100.0

    k_i = _binomial_draw(n_i, p)
    k_partner = _binomial_draw(C.SOURCE_DRAW_SIZE, p)
    k_imposter = _binomial_draw(C.SOURCE_DRAW_SIZE, p_imp)

    # Randomise which label (A or B) carries the report from the subject's box.
    a_is_partner = random.random() < 0.5
    if a_is_partner:
        k_a, k_b = k_partner, k_imposter
    else:
        k_a, k_b = k_imposter, k_partner

    return dict(
        is_practice=is_practice,
        n_i=n_i,
        p=p,
        p_imp=p_imp,
        k_i=k_i,
        k_a=k_a,
        k_b=k_b,
        a_is_partner=a_is_partner,
    )


# ============================================================================
# oTree models
# ============================================================================

class Subsession(BaseSubsession):
    pass


def _trial_for_round(player, round_number):
    """Return the trial dict for this player and round.

    Rounds listed in _PRACTICE_TRIALS are scripted; paid rounds take their
    n_i from the participant's shuffled 'paid_ns_order'.
    """
    scripted = _PRACTICE_TRIALS.get(round_number)
    if scripted is not None:
        return dict(scripted)
    if round_number <= C.NUM_PRACTICE_TRIALS:
        return generate_trial(round_number)
    paid_idx = round_number - C.NUM_PRACTICE_TRIALS - 1
    n_i = player.participant.vars['paid_ns_order'][paid_idx]
    return generate_trial(round_number, n_i=n_i)


def _store_trial(player, trial):
    """Copy a trial dict onto the player and pre-draw all payment randomness."""
    player.is_practice = trial['is_practice']
    player.n_i = trial['n_i']
    player.p_true = trial['p']
    player.p_imp = trial['p_imp']
    player.k_i = trial['k_i']
    player.k_a = trial['k_a']
    player.k_b = trial['k_b']
    player.a_is_partner = trial['a_is_partner']

    # which question pays this trial
    player.paid_question = random.choice(['Q1', 'Q2', 'Q3'])
    # for Q2 paired lottery: which row do we resolve and what's the random draw
    player.q2_resolution_row = random.randint(0, len(C.PRICE_LIST_PCTS) - 1)
    player.q2_lottery_draw = random.random()

    # build ball sequences (for display)
    player.sample_json = json.dumps(_coloured_balls(trial['k_i'], trial['n_i']))
    a_balls = _coloured_balls(trial['k_a'], C.SOURCE_DRAW_SIZE)
    b_balls = _coloured_balls(trial['k_b'], C.SOURCE_DRAW_SIZE)
    player.source_a_json = json.dumps(a_balls)
    player.source_b_json = json.dumps(b_balls)


def creating_session(subsession: Subsession):
    """Populate every player's trial parameters for this round."""
    round_number = subsession.round_number
    for player in subsession.get_players():
        # On round 1, build a per-participant shuffled order of n_i for the
        # paid trials; later rounds read it back from participant.vars.
        if round_number == 1:
            player.participant.vars['paid_ns_order'] = _build_paid_n_i_order()
        _store_trial(player, _trial_for_round(player, round_number))


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    # Trial latents and what the subject observes
    is_practice = models.BooleanField()
    n_i = models.IntegerField()
    p_true = models.FloatField()
    p_imp = models.FloatField()
    k_i = models.IntegerField()
    k_a = models.IntegerField()
    k_b = models.IntegerField()
    a_is_partner = models.BooleanField()
    sample_json = models.LongStringField()
    source_a_json = models.LongStringField()
    source_b_json = models.LongStringField()

    # Registration
    student_name = models.StringField(blank=True)
    student_id = models.StringField(blank=True)

    # Elicitations
    q1_estimate = models.IntegerField(min=1, max=99, label="How many of the 100 balls in Box 1 are red?")
    # JSON array of 'A' / 'B' choices; compute_trial_payoff indexes it by price-list
    # row, so it is expected to hold one entry per row of C.PRICE_LIST_PCTS.
    q2_choices_json = models.LongStringField(blank=True)
    q3_estimate = models.IntegerField(min=1, max=99, label="How many of the 100 balls in Box 1 are red?")

    # Post-experiment survey
    survey_unclear = models.LongStringField(blank=True, label="Was anything in the experiment unclear or confusing?")
    survey_strategy = models.LongStringField(blank=True, label="What strategy did you use to solve these problems?")

    # Payment bookkeeping
    paid_question = models.StringField()
    q2_resolution_row = models.IntegerField()
    q2_lottery_draw = models.FloatField()
    trial_payoff = models.CurrencyField(initial=0)
    trial_summary_json = models.LongStringField(blank=True)


# ============================================================================
# Pages
# ============================================================================

def _instruction_vars():
    """Template variables shared by all instruction pages."""
    return dict(
        num_practice=C.NUM_PRACTICE_TRIALS,
        num_main=C.NUM_MAIN_TRIALS,
        num_total=C.NUM_TRIALS,
    )


def _trial_num(player):
    """Return the paid-trial number for this round (0 during practice)."""
    return max(0, player.round_number - C.NUM_PRACTICE_TRIALS)


def _reveal_vars(player):
    """Template variables shared by the pages that show both source reports."""
    return dict(
        source_a=json.loads(player.source_a_json),
        source_b=json.loads(player.source_b_json),
        sample=json.loads(player.sample_json),
        k_a=player.k_a,
        k_b=player.k_b,
        k_i=player.k_i,
        n_i=player.n_i,
        trial_num=_trial_num(player),
    )


class Registration(Page):
    form_model = 'player'
    form_fields = ['student_name', 'student_id']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class Instructions(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        return _instruction_vars()


class InstructionsPartner(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        return _instruction_vars()


class Instructions2(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        return _instruction_vars()


class TrialIntro(Page):
    @staticmethod
    def vars_for_template(player: Player):
        if player.is_practice:
            label = f"Practice Trial {player.round_number} of {C.NUM_PRACTICE_TRIALS}"
        else:
            tnum = player.round_number - C.NUM_PRACTICE_TRIALS
            label = f"Trial {tnum} of {C.NUM_MAIN_TRIALS}"
        return dict(
            label=label,
            n_i=player.n_i,
            is_practice=player.is_practice,
            trial_num=_trial_num(player),
        )


class OwnDraw(Page):
    form_model = 'player'
    form_fields = ['q1_estimate']

    @staticmethod
    def vars_for_template(player: Player):
        # Only the subject's own sample is passed here; the source reports are
        # first handed to the template on SourcesReveal.
        return dict(
            n_i=player.n_i,
            k_i=player.k_i,
            sample=json.loads(player.sample_json),
            trial_num=_trial_num(player),
        )


class SourcesReveal(Page):
    @staticmethod
    def vars_for_template(player: Player):
        return _reveal_vars(player)


class IdentityBet(Page):
    form_model = 'player'
    form_fields = ['q2_choices_json']

    @staticmethod
    def vars_for_template(player: Player):
        context = _reveal_vars(player)
        context['pcts'] = C.PRICE_LIST_PCTS
        return context


class Posterior(Page):
    form_model = 'player'
    form_fields = ['q3_estimate']

    @staticmethod
    def vars_for_template(player: Player):
        context = _reveal_vars(player)
        context['q1'] = player.q1_estimate
        return context

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        compute_trial_payoff(player)


def _quadratic_pay(guess, true):
    """Quadratic scoring: full pay for an exact guess, zero at 20+ balls off."""
    return max(0, int(C.MAX_PAY_PER_TRIAL * (1.0 - ((guess - true) / 20.0) ** 2)))


def _load_q2_choices(player):
    """Return the price-list choices, one entry per row of C.PRICE_LIST_PCTS.

    A missing, empty, malformed or too-short submission falls back to 'A' for
    the affected rows so that payoff computation never crashes the page.
    """
    fallback = ['A'] * len(C.PRICE_LIST_PCTS)
    raw = player.field_maybe_none('q2_choices_json')
    if not raw:
        return fallback
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass.
        return fallback
    if not isinstance(parsed, list):
        return fallback
    # Pad rows the client did not send; extra rows are harmless.
    return parsed + fallback[len(parsed):]


def compute_trial_payoff(player: Player):
    """Compute and store this trial's payoff silently (no feedback to subject).

    Writes trial_summary_json for every trial; trial_payoff and payoff are
    only written for paid (non-practice) trials.
    """
    true_red = round(player.p_true * C.BOX_SIZE)

    q1_pay = _quadratic_pay(player.q1_estimate, true_red)
    q3_pay = _quadratic_pay(player.q3_estimate, true_red)

    choices = _load_q2_choices(player)
    chosen = choices[player.q2_resolution_row]
    pct = C.PRICE_LIST_PCTS[player.q2_resolution_row]
    if chosen == 'A':
        # Option A = bet that Report A is from the partner
        q2_pay = C.MAX_PAY_PER_TRIAL if player.a_is_partner else 0
    else:
        # Option B = known-probability lottery that wins with probability pct%
        q2_pay = C.MAX_PAY_PER_TRIAL if (player.q2_lottery_draw * 100 < pct) else 0

    if player.paid_question == 'Q1':
        pay = q1_pay
    elif player.paid_question == 'Q2':
        pay = q2_pay
    else:
        pay = q3_pay

    if not player.is_practice:
        player.trial_payoff = cu(pay)
        player.payoff = cu(pay)

    # Store per-trial detail for FinalResults summary
    player.trial_summary_json = json.dumps(dict(
        round_number=player.round_number,
        is_practice=player.is_practice,
        n_i=player.n_i,
        true_red=true_red,
        k_i=player.k_i,
        k_a=player.k_a,
        k_b=player.k_b,
        a_is_partner=player.a_is_partner,
        q1=player.q1_estimate,
        q3=player.q3_estimate,
        q2_row_pct=pct,
        q2_choice=chosen,
        q1_pay=int(q1_pay),
        q2_pay=int(q2_pay),
        q3_pay=int(q3_pay),
        paid_q=player.paid_question,
        pay=int(pay),
    ))


class TrialResolution(Page):
    """Show feedback only for practice trials."""

    @staticmethod
    def is_displayed(player: Player):
        return player.is_practice

    @staticmethod
    def vars_for_template(player: Player):
        # Payoff already computed in Posterior.before_next_page
        return json.loads(player.trial_summary_json)


class PostSurvey(Page):
    form_model = 'player'
    form_fields = ['survey_unclear', 'survey_strategy']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == C.NUM_ROUNDS


class FinalResults(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def vars_for_template(player: Player):
        # Gather all trial summaries; rounds without a stored summary are skipped.
        rounds = player.in_all_rounds()
        trials = []
        for p in rounds:
            summary = p.field_maybe_none('trial_summary_json')
            if summary:
                trials.append(json.loads(summary))
        total = sum(p.trial_payoff for p in rounds)
        return dict(
            trials=trials,
            total_payoff=total,
        )


class ThankYou(Page):
    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == C.NUM_ROUNDS


page_sequence = [
    Registration,
    Instructions,
    InstructionsPartner,
    Instructions2,
    TrialIntro,
    OwnDraw,
    SourcesReveal,
    IdentityBet,
    Posterior,
    TrialResolution,
    FinalResults,
    PostSurvey,
    ThankYou,
]