from otree.api import *
import random
import json


class C(BaseConstants):
    """App-wide constants for the Part 4 prediction task."""

    NAME_IN_URL = 'part4_prediction'
    PLAYERS_PER_GROUP = None
    NUM_ROUNDS = 1

    # Durations shown in the tables
    T_SHORT = 5
    T_MED = 10
    T_LONG = 15
    OTHERS_TIME = 10

    # Part 4 incentives (if this part is selected for payment)
    PART4_GUARANTEED_BONUS = cu(0)
    PART4_MAX_BONUS = cu(3)  # up to 3 euros from the MPL mechanism

    # B performance rank fixed across the 3 times (draw per participant)
    B_PERF_RANK_MIN = 1
    B_PERF_RANK_MAX = 10

    # Restrict A and B productivity draws
    AB_PROD_MIN = 25
    AB_PROD_MAX = 75

    # MPL grid (rows). 0..100 means p in {0%, 1%, ..., 100%}
    MPL_MIN = 0
    MPL_MAX = 100


# Lookup tables indexed by Player.payoff_question_index:
# 0..2 = A short/med/long, 3..5 = B short/med/long.
_TRUE_RANK_FIELDS = (
    'true_A_rank_short',
    'true_A_rank_med',
    'true_A_rank_long',
    'S_true_B_rank',
    'M_true_B_rank',
    'L_true_B_rank',
)
_ELICITATION_PREFIXES = (
    'A_short',
    'A_med',
    'A_long',
    'B_short',
    'B_med',
    'B_long',
)


def _payoff_question_slot(idx):
    """Map payoff_question_index to a 0..5 slot; anything outside 0..4 maps to 5."""
    return idx if idx in (0, 1, 2, 3, 4) else 5


def is_session1(player) -> bool:
    """Return the truthiness of the 'session1' session-config entry (default False)."""
    return bool(player.session.config.get('session1', False))


def productivity_set():
    """Return the list of admissible productivities, 1..100 inclusive."""
    return list(range(1, 101))


def rank_desc(values):
    """
    Rank values in descending order (rank 1 = highest value).

    Returns a list of ranks aligned with the input. Tied values all receive
    the rank of their first occurrence in the sorted order.
    """
    sorted_vals = sorted(values, reverse=True)
    return [sorted_vals.index(v) + 1 for v in values]


def sample_opponents(exclude_set, n=9):
    """
    Draw n distinct opponent productivities from 1..100, excluding exclude_set.

    The draw is unconditional (uniform without replacement).
    """
    candidates = [x for x in productivity_set() if x not in exclude_set]
    return random.sample(candidates, n)


def split_opponents_by_threshold(exclude_set, threshold_perf):
    """
    Partition candidate productivities by their performance relative to B.

    An opponent with productivity p has performance C.OTHERS_TIME * p, and
    threshold_perf is B's performance (prod_B * tB). Returns three lists:
      - above: performance >  threshold_perf (strictly beats B)
      - tie:   performance == threshold_perf (ties B)
      - below: performance <  threshold_perf (strictly loses to B)
    Productivities in exclude_set are skipped.
    """
    above, tie, below = [], [], []
    for p in productivity_set():
        if p in exclude_set:
            continue
        perf = C.OTHERS_TIME * p
        if perf > threshold_perf:
            above.append(p)
        elif perf < threshold_perf:
            below.append(p)
        else:
            tie.append(p)
    return above, tie, below


def draw_opponents_for_B_threshold_method(prod_B, tB, b_perf_rank, exclude_set, n=9):
    """
    Constructively draw n opponents so that B ends up at b_perf_rank.

    Steps:
      - compute B's performance threshold (prod_B * tB);
      - draw (b_perf_rank - 1) opponents who beat B (above the threshold),
        topping up with ties when there are not enough strict beaters;
      - draw the remaining opponents from those who do not beat B
        (below the threshold, plus any ties not used above).

    Fallbacks:
      - if even above + ties cannot supply enough beaters, n opponents are
        drawn uniformly from all non-excluded productivities;
      - if the non-beating pool is too small, the non-beaters are drawn from
        all non-excluded productivities not already chosen as beaters.
      In both fallbacks B's realised performance rank need not equal
      b_perf_rank.

    NOTE(review): with OTHERS_TIME = 10 and productivities capped at 100, no
    opponent can beat or tie B once prod_B * tB exceeds 1000 (e.g.
    prod_B >= 67 with tB = 15), so the first fallback is reached whenever
    b_perf_rank > 1 in that region. Confirm this is acceptable for the design.

    Returns the list of beaters followed by the list of non-beaters.
    """
    threshold_perf = prod_B * tB
    need_beating = b_perf_rank - 1
    need_not_beating = n - need_beating

    above, tie, below = split_opponents_by_threshold(exclude_set, threshold_perf)

    if need_beating <= len(above):
        # Enough strict beaters: ties all stay on the non-beating side.
        chosen_beating = random.sample(above, need_beating)
        spare_ties = tie
    else:
        # Not enough strict beaters: take all of them and top up with ties.
        shortfall = need_beating - len(above)
        if shortfall > len(tie):
            all_pool = [p for p in productivity_set() if p not in exclude_set]
            return random.sample(all_pool, n)
        promoted_ties = random.sample(tie, shortfall)
        chosen_beating = above + promoted_ties
        spare_ties = [p for p in tie if p not in promoted_ties]

    not_beating_pool = below + spare_ties
    if need_not_beating > len(not_beating_pool):
        all_pool = [
            p
            for p in productivity_set()
            if p not in exclude_set and p not in chosen_beating
        ]
        chosen_not_beating = random.sample(all_pool, need_not_beating)
    else:
        chosen_not_beating = random.sample(not_beating_pool, need_not_beating)

    return chosen_beating + chosen_not_beating


def clamp01(x: float) -> float:
    """Clamp x to the closed interval [0, 1]."""
    if x < 0:
        return 0.0
    if x > 1:
        return 1.0
    return x


class Subsession(BaseSubsession):
    def creating_session(self):
        """Initialise every player's simulated world (method-style hook)."""
        for pl in self.get_players():
            pl.ensure_initialized()


def creating_session(subsession):
    """
    Module-level session-creation hook.

    The pages in this file use the function-based ("no-self") oTree format,
    which looks up creating_session as a module-level function. Initialisation
    is idempotent, so this is safe even if the method above is also invoked.
    """
    for pl in subsession.get_players():
        pl.ensure_initialized()


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    initialized = models.BooleanField(initial=False)
    show_A_first = models.BooleanField(initial=False)

    prod_A = models.IntegerField()
    prod_B = models.IntegerField()
    b_perf_rank = models.IntegerField()

    # Opponents stored as JSON strings so the simulated "world" stays stable
    opp_A_json = models.LongStringField(initial='[]')
    opp_B_short_json = models.LongStringField(initial='[]')
    opp_B_med_json = models.LongStringField(initial='[]')
    opp_B_long_json = models.LongStringField(initial='[]')

    # True outcomes (ranks)
    true_A_rank_short = models.IntegerField()
    true_A_rank_med = models.IntegerField()
    true_A_rank_long = models.IntegerField()
    S_true_B_rank = models.IntegerField()
    M_true_B_rank = models.IntegerField()
    L_true_B_rank = models.IntegerField()

    # Which question is selected for payment:
    # 0..2 = A short/med/long, 3..5 = B short/med/long
    payoff_question_index = models.IntegerField()

    # MPL randomization within the selected question
    payoff_rank_selected = models.IntegerField()  # 1..10
    mpl_row_selected = models.IntegerField()  # 0..100 (p in percent)
    mpl_p_selected = models.FloatField()  # p in [0,1]
    mpl_choice = models.StringField(initial='')  # 'A' or 'B'
    mpl_event_happened = models.BooleanField()  # (true_rank == payoff_rank_selected)
    mpl_win = models.BooleanField()  # whether the prize is won under the mechanism

    # - part4_score: reported probability for the selected rank (in [0,1])
    # - part4_bonus: total Part 4 payment
    #   (C.PART4_GUARANTEED_BONUS + MPL prize of 0 or C.PART4_MAX_BONUS)
    part4_score = models.FloatField(initial=0)
    part4_bonus = models.CurrencyField(initial=cu(0))

    # Payment components, stored separately for the dataset
    part4_guaranteed = models.CurrencyField(initial=cu(0))
    part4_perf_bonus = models.CurrencyField(initial=cu(0))

    # ---- ELICITATION FIELDS ----
    # Generates A_{short,med,long}_p1..p10 and B_{short,med,long}_p1..p10 by
    # writing into the class-body namespace; loop variables are deleted so
    # they do not become class attributes.
    for tname in ['short', 'med', 'long']:
        for r in range(1, 11):
            locals()[f'A_{tname}_p{r}'] = models.IntegerField(min=0, max=100, initial=0)
    del tname, r

    for tname in ['short', 'med', 'long']:
        for r in range(1, 11):
            locals()[f'B_{tname}_p{r}'] = models.IntegerField(min=0, max=100, initial=0)
    del tname, r

    def ensure_initialized(self):
        """Run set_simulation() once; later calls are no-ops."""
        if not self.initialized:
            self.set_simulation()

    def set_simulation(self):
        """Draw productivities, opponents, true ranks and the paying question/rank."""
        self.show_A_first = random.choice([True, False])

        # Draw A and B productivities in [AB_PROD_MIN, AB_PROD_MAX], distinct
        restricted = list(range(C.AB_PROD_MIN, C.AB_PROD_MAX + 1))
        self.prod_A = random.choice(restricted)
        restricted.remove(self.prod_A)
        self.prod_B = random.choice(restricted)

        # Fixed performance rank for B across times
        self.b_perf_rank = random.randint(C.B_PERF_RANK_MIN, C.B_PERF_RANK_MAX)

        exclude = {self.prod_A, self.prod_B}

        # A: one fixed opponent set, reused for all 3 times
        oppA = sample_opponents(exclude, 9)
        self.opp_A_json = json.dumps(oppA)
        self.true_A_rank_short = self._true_A_perf_rank_given_opponents(C.T_SHORT, oppA)
        self.true_A_rank_med = self._true_A_perf_rank_given_opponents(C.T_MED, oppA)
        self.true_A_rank_long = self._true_A_perf_rank_given_opponents(C.T_LONG, oppA)

        # B: per time, construct opponents via threshold method
        oppB_short = draw_opponents_for_B_threshold_method(
            self.prod_B, C.T_SHORT, self.b_perf_rank, exclude, n=9
        )
        oppB_med = draw_opponents_for_B_threshold_method(
            self.prod_B, C.T_MED, self.b_perf_rank, exclude, n=9
        )
        oppB_long = draw_opponents_for_B_threshold_method(
            self.prod_B, C.T_LONG, self.b_perf_rank, exclude, n=9
        )
        self.opp_B_short_json = json.dumps(oppB_short)
        self.opp_B_med_json = json.dumps(oppB_med)
        self.opp_B_long_json = json.dumps(oppB_long)

        # True B productivity ranks among the 10 productivities
        self.S_true_B_rank = self._prod_rank_B_given_opponents(oppB_short)
        self.M_true_B_rank = self._prod_rank_B_given_opponents(oppB_med)
        self.L_true_B_rank = self._prod_rank_B_given_opponents(oppB_long)

        # Choose paying question
        self.payoff_question_index = random.randint(0, 5)
        # Within that question, choose a paying rank (1..10) for the MPL mechanism
        self.payoff_rank_selected = random.randint(1, 10)

        self.initialized = True

    def _true_A_perf_rank_given_opponents(self, tA, oppA):
        """Return A's performance rank (prod_A * tA) against opponents working OTHERS_TIME."""
        perf_list = [self.prod_A * tA] + [x * C.OTHERS_TIME for x in oppA]
        return rank_desc(perf_list)[0]

    def _prod_rank_B_given_opponents(self, opp):
        """Return B's productivity rank among B and the given opponents."""
        prod_list = [self.prod_B] + opp
        return rank_desc(prod_list)[0]

    def _get_true_rank_for_payoff_question(self) -> int:
        """Return the stored true rank for the question chosen by payoff_question_index."""
        slot = _payoff_question_slot(self.payoff_question_index)
        return getattr(self, _TRUE_RANK_FIELDS[slot])

    def _get_reported_prob_for_selected_rank(self) -> float:
        """
        Return q in [0,1] from the elicited distribution corresponding to:
          - payoff_question_index (A/B and short/med/long)
          - payoff_rank_selected (1..10)
        """
        slot = _payoff_question_slot(self.payoff_question_index)
        r = self.payoff_rank_selected
        v = getattr(self, f'{_ELICITATION_PREFIXES[slot]}_p{r}')
        return clamp01(v / 100.0)

    def _run_mpl_for_selected_rank(self, true_rank: int, q: float):
        """
        Resolve the single-response MPL for the selected rank.

          - Event E is: (true_rank == payoff_rank_selected)
          - Option A: win the prize if E occurs
          - Option B: win the prize with objective probability p
          - One row p is drawn at random from the grid (MPL_MIN..MPL_MAX percent)
          - The reported q is used as the switch point: choose A iff q >= p

        This mirrors asking all rows and taking the switch point under
        monotonicity, without adding extra input fields. Results are written
        to the mpl_* fields.
        """
        r = self.payoff_rank_selected
        self.mpl_event_happened = (true_rank == r)

        row = random.randint(C.MPL_MIN, C.MPL_MAX)
        p = row / 100.0
        self.mpl_row_selected = row
        self.mpl_p_selected = p

        # Implied switch-point rule: choose A when q >= p, else B
        if q >= p:
            self.mpl_choice = 'A'
            self.mpl_win = bool(self.mpl_event_happened)
        else:
            self.mpl_choice = 'B'
            self.mpl_win = (random.random() < p)

    def _part4_payoff_already_computed(self) -> bool:
        """Return True once the MPL row has been drawn and stored."""
        try:
            return self.mpl_row_selected is not None
        except TypeError:
            # oTree 5 raises TypeError when a still-null field is read.
            return False

    def compute_part4_payoff(self):
        """
        Compute and store the Part 4 payment (idempotent).

        Payment = C.PART4_GUARANTEED_BONUS + MPL prize (0 or C.PART4_MAX_BONUS).
        NOTE(review): earlier comments mentioned a guaranteed 6 euros, but
        PART4_GUARANTEED_BONUS is currently cu(0) - confirm the intended value.

        Mechanism for the prize:
          1) a paying question is already selected (payoff_question_index in 0..5)
          2) a paying rank is already selected (payoff_rank_selected in 1..10)
          3) for that rank, run the MPL:
             - draw a row p from the grid
             - choose A iff reported q >= p
             - resolve A using the true event, B using an objective random draw

        The random draws happen only on the first call; later calls return
        immediately so that re-rendering the Results page cannot change the
        outcome.

        Stores the total in:
          - player.part4_bonus
          - player.payoff
          - participant.part4_payoff (only if that attribute already exists)
          - participant.vars['part4_payoff']
        """
        if self._part4_payoff_already_computed():
            return

        true_rank = self._get_true_rank_for_payoff_question()
        q = self._get_reported_prob_for_selected_rank()

        # part4_score holds the reported probability for the selected rank
        self.part4_score = q

        self._run_mpl_for_selected_rank(true_rank, q)

        self.part4_guaranteed = C.PART4_GUARANTEED_BONUS
        if self.mpl_win:
            self.part4_perf_bonus = C.PART4_MAX_BONUS
        else:
            self.part4_perf_bonus = cu(0)

        total = self.part4_guaranteed + self.part4_perf_bonus
        self.part4_bonus = total

        # Keep oTree payoff consistent with the Part 4 total
        self.payoff = total

        # Store on participant as part4_payoff
        part = self.participant
        if hasattr(part, 'part4_payoff'):
            part.part4_payoff = total
        part.vars['part4_payoff'] = float(total)  # robust for analysis / debugging


# -------------------- PAGES --------------------

class Intro(Page):
    @staticmethod
    def vars_for_template(player):
        player.ensure_initialized()
        is_session1_flag = is_session1(player)
        return dict(
            t_short=C.T_SHORT,
            t_med=C.T_MED,
            t_long=C.T_LONG,
            is_session1=is_session1_flag,
        )


def fields_for(prefix):
    """Return the ten form-field names '<prefix>_p1' .. '<prefix>_p10'."""
    return [f'{prefix}_p{r}' for r in range(1, 11)]


def validate_rows(values, who_prefix):
    """
    Check that each of the short/med/long rows sums to 100.

    Returns an error message for the first offending row, or None if all
    three rows are valid.
    """
    for tname in ['short', 'med', 'long']:
        s = sum(values[f'{who_prefix}_{tname}_p{r}'] for r in range(1, 11))
        if s != 100:
            return f'Pour la ligne "{tname}", les probabilités doivent totaliser 100 (actuellement {s}).'
    return None


class PredictA(Page):
    form_model = 'player'
    form_fields = fields_for('A_short') + fields_for('A_med') + fields_for('A_long')

    @staticmethod
    def is_displayed(player):
        player.ensure_initialized()
        return player.show_A_first

    @staticmethod
    def vars_for_template(player):
        player.ensure_initialized()
        return dict(
            prod_A=player.prod_A,
            t_short=C.T_SHORT,
            t_med=C.T_MED,
            t_long=C.T_LONG,
        )

    @staticmethod
    def error_message(player, values):
        return validate_rows(values, 'A')


class PredictB(Page):
    form_model = 'player'
    form_fields = fields_for('B_short') + fields_for('B_med') + fields_for('B_long')

    @staticmethod
    def is_displayed(player):
        player.ensure_initialized()
        return not player.show_A_first

    @staticmethod
    def vars_for_template(player):
        player.ensure_initialized()
        return dict(
            prod_B=player.prod_B,
            b_perf_rank=player.b_perf_rank,
            b_above=player.b_perf_rank - 1,
            t_short=C.T_SHORT,
            t_med=C.T_MED,
            t_long=C.T_LONG,
        )

    @staticmethod
    def error_message(player, values):
        return validate_rows(values, 'B')


class PredictA_if_second(PredictA):
    # Same page as PredictA, shown second when B was shown first.
    template_name = 'predictions/PredictA.html'

    @staticmethod
    def is_displayed(player):
        player.ensure_initialized()
        return not player.show_A_first


class PredictB_if_second(PredictB):
    # Same page as PredictB, shown second when A was shown first.
    template_name = 'predictions/PredictB.html'

    @staticmethod
    def is_displayed(player):
        player.ensure_initialized()
        return player.show_A_first


class Results(Page):
    @staticmethod
    def vars_for_template(player):
        player.ensure_initialized()
        # Idempotent: the MPL draw happens only the first time this runs.
        player.compute_part4_payoff()
        return dict(
            part4_bonus=player.part4_bonus,
            part4_score=player.part4_score,
            payoff_question_index=player.payoff_question_index,
            payoff_rank_selected=player.payoff_rank_selected,
            mpl_row_selected=player.mpl_row_selected,
            mpl_choice=player.mpl_choice,
            mpl_win=player.mpl_win,
        )


page_sequence = [
    Intro,
    PredictA,
    PredictB,
    PredictA_if_second,
    PredictB_if_second,
    Results,
]