from otree.api import * import random # Import centralized scenario text. from scenario_text import SCENARIOS class C(BaseConstants): NAME_IN_URL = 'wtp' PLAYERS_PER_GROUP = None # Canonical scenario pairs: X, Y (used for the tree and diff definition) PAIRS = [ ('A', 'B'), ('A', 'C'), ('B', 'C'), ] NUM_PAIRS = len(PAIRS) # All possible amount combinations (payment_X, payment_Y) in cents AMOUNTS = [ (500, 500), (500, 525), (500, 550), (500, 575), (500, 600), (500, 650), (500, 700), (500, 750), (500, 800), (500, 850), (500, 900), (500, 950), (500, 1000), (525, 500), (550, 500), (575, 500), (600, 500), (650, 500), (700, 500), (750, 500), (800, 500), (850, 500), (900, 500), (950, 500), (1000, 500), ] NUM_AMOUNTS = len(AMOUNTS) # Max questions per pair (from your tree) MAX_QUESTIONS_PER_PAIR = 5 # Upper bound on total rounds (oTree requires fixed NUM_ROUNDS) NUM_ROUNDS = NUM_PAIRS * MAX_QUESTIONS_PER_PAIR # 3 * 5 = 15 BASE_CENTS = 500 # difference = payment_X - payment_Y (in cents) DIFF_TO_INDEX = {(a - b): i for i, (a, b) in enumerate(C.AMOUNTS)} # Grid of possible payment levels (in cents) for the "one-step-down" rule AMOUNT_LEVELS = sorted({x for xy in C.AMOUNTS for x in xy}) def prev_amount_level(x: int) -> int: """ Return the next lower amount level in AMOUNT_LEVELS. If x is already at the minimum, return the minimum. """ prev = AMOUNT_LEVELS[0] for lvl in AMOUNT_LEVELS: if lvl >= x: break prev = lvl return prev # Tree transitions in differences (X - Y), in cents # Branch 'X' = participant chose Scenario X # Branch 'Y' = participant chose Scenario Y # None = leaf: stop asking for that pair TREE = { 0: {'X': -250, 'Y': 250}, # left side (prefers X) -250: {'X': -400, 'Y': -100}, -400: {'X': -500, 'Y': -300}, -100: {'X': -200, 'Y': -50}, -200: {'X': None, 'Y': -150}, -50: {'X': -75, 'Y': -25}, # extremes when Y gets 10 -500: {'X': None, 'Y': -450}, -300: {'X': -350, 'Y': None}, # leaves -150: {'X': None, 'Y': None}, -75: {'X': None, 'Y': None}, -25: {'X': None, 'Y': None}, -450: {'X': None, 'Y': None}, -350: {'X': None, 'Y': None}, # right side (prefers Y) 250: {'X': 100, 'Y': 400}, 100: {'X': 50, 'Y': 200}, 50: {'X': 25, 'Y': 75}, 400: {'X': 300, 'Y': 500}, # extremes when X gets 10 500: {'X': 450, 'Y': None}, 300: {'X': None, 'Y': 350}, # leaves 25: {'X': None, 'Y': None}, 75: {'X': None, 'Y': None}, 200: {'X': 150, 'Y': None}, 150: {'X': None, 'Y': None}, 450: {'X': None, 'Y': None}, } class Subsession(BaseSubsession): pass class Group(BaseGroup): pass class Player(BasePlayer): pair_pos = models.IntegerField() pair_index = models.IntegerField() amount_index = models.IntegerField() scenario_left = models.StringField() scenario_right = models.StringField() amount_left_cents = models.IntegerField() amount_right_cents = models.IntegerField() left_is_X = models.BooleanField() choice = models.IntegerField( choices=[[1, "Option 1"], [2, "Option 2"]], widget=widgets.RadioSelect, blank=True, # IMPORTANT: allow None (timeouts), we handle it safely ) pair_label = models.StringField() pair_order_str = models.StringField() # ---- New outputs you wanted ---- # Preferred scenario at equal pay (first diff=0 screen) for each canonical pair pref_AB = models.StringField() pref_AC = models.StringField() pref_BC = models.StringField() # Conservative WTP per pair (in cents), computed from last displayed screen + last choice # using your rule anchored at BASE_CENTS. wtp_AB_cents = models.IntegerField() wtp_AC_cents = models.IntegerField() wtp_BC_cents = models.IntegerField() # ------------------------------- # ---- Original MPD fields (kept for implementation and backward compatibility) ---- mpd_AB_XminusY_cents = models.IntegerField() mpd_AC_XminusY_cents = models.IntegerField() mpd_BC_XminusY_cents = models.IntegerField() # ------------------------------------------------------------------------------- impl_used_wtp = models.BooleanField() impl_pair_index = models.IntegerField() impl_pair_label = models.StringField() impl_amount_index = models.IntegerField() impl_scenario_left = models.StringField() impl_scenario_right = models.StringField() impl_amount_left_cents = models.IntegerField() impl_amount_right_cents = models.IntegerField() impl_implied_choice = models.IntegerField() def scenario_id_from_label(label: str) -> int: """ Map scenario letter used in WTP (A/B/C) to scenario id used in SCENARIOS (1/2/3). """ mapping = {'A': 1, 'B': 2, 'C': 3} return mapping[label] def init_state(participant): """ Ensure we have in participant.vars: - 'order': random order of canonical pair indices [0,1,2] - 'states': per pair position dict: diff: current canonical (X - Y) n: number of questions asked done: whether this pair position is done pref_branch_at_equal: 'X' or 'Y' based on the first answered diff=0 question last_diff_shown: last diff that was displayed for this pair position last_branch: last canonical branch ('X' or 'Y') chosen on that last displayed screen - 'left_is_X_by_pair': per canonical pair index, whether X is shown on the left """ if 'order' not in participant.vars: order = list(range(C.NUM_PAIRS)) random.shuffle(order) participant.vars['order'] = order if 'states' not in participant.vars: states = [] for _ in range(C.NUM_PAIRS): states.append(dict( diff=0, n=0, done=False, pref_branch_at_equal=None, last_diff_shown=None, last_branch=None, )) participant.vars['states'] = states if 'left_is_X_by_pair' not in participant.vars: left_is_X_by_pair = [] for _ in range(C.NUM_PAIRS): left_is_X_by_pair.append(random.choice([True, False])) participant.vars['left_is_X_by_pair'] = left_is_X_by_pair def get_active_pair_pos(participant): """ Return first pair-position (0,1,2) that still has questions to ask. None if all pairs are done. """ init_state(participant) states = participant.vars['states'] for pos, st in enumerate(states): if (not st['done']) and (st['n'] < C.MAX_QUESTIONS_PER_PAIR): return pos return None def compute_mpd_canonical(participant): """ Returns a dict: {'AB': mpd_A_minus_B, 'AC': mpd_A_minus_C, 'BC': mpd_B_minus_C} MPD is defined canonically as X minus Y (in cents) at the final state of the tree. """ init_state(participant) order = participant.vars['order'] states = participant.vars['states'] mpd_by_pair = {} for pos, pair_index in enumerate(order): st = states[pos] final_diff = st['diff'] # canonical X - Y if pair_index == 0: mpd_by_pair['AB'] = final_diff elif pair_index == 1: mpd_by_pair['AC'] = final_diff elif pair_index == 2: mpd_by_pair['BC'] = final_diff return mpd_by_pair def compute_pref_and_conservative_wtp(participant): """ Returns: { 'AB': {'pref': 'A'/'B', 'wtp_cents': int or None}, 'AC': {'pref': 'A'/'C', 'wtp_cents': int or None}, 'BC': {'pref': 'B'/'C', 'wtp_cents': int or None}, } pref is determined from the first answered question at diff=0. wtp_cents is computed using your rule on the last displayed question: - Identify non-preferred scenario amount (canonical) on that last screen. - If last choice == preferred: wtp = nonpref_amount - BASE Else (chose non-preferred): wtp = prev_amount_level(nonpref_amount) - BASE """ init_state(participant) order = participant.vars['order'] states = participant.vars['states'] out = {} for pos, pair_index in enumerate(order): Xlab, Ylab = C.PAIRS[pair_index] st = states[pos] pref_branch = st.get('pref_branch_at_equal') if pref_branch == 'X': pref = Xlab elif pref_branch == 'Y': pref = Ylab else: pref = '' last_diff = st.get('last_diff_shown') last_branch = st.get('last_branch') if last_diff is None or last_branch is None or pref == '': wtp = None else: idx = DIFF_TO_INDEX[last_diff] Xpay_cents, Ypay_cents = C.AMOUNTS[idx] if pref == Xlab: nonpref_amount = Ypay_cents chose_pref = (last_branch == 'X') else: nonpref_amount = Xpay_cents chose_pref = (last_branch == 'Y') if chose_pref: wtp = nonpref_amount - C.BASE_CENTS else: wtp = prev_amount_level(nonpref_amount) - C.BASE_CENTS if pair_index == 0: out['AB'] = dict(pref=pref, wtp_cents=wtp) elif pair_index == 1: out['AC'] = dict(pref=pref, wtp_cents=wtp) elif pair_index == 2: out['BC'] = dict(pref=pref, wtp_cents=wtp) return out class WTPInstructions(Page): def is_displayed(self): return self.round_number == 1 def vars_for_template(self): return { "scenarios": SCENARIOS, 'sA': SCENARIOS[1], 'sB': SCENARIOS[2], 'sC': SCENARIOS[3], } class WTPDecision(Page): form_model = 'player' form_fields = ['choice'] @staticmethod def is_displayed(player): return get_active_pair_pos(player.participant) is not None @staticmethod def vars_for_template(player): part = player.participant init_state(part) order = part.vars['order'] states = part.vars['states'] left_is_X_by_pair = part.vars['left_is_X_by_pair'] pair_pos = get_active_pair_pos(part) state = states[pair_pos] diff = state['diff'] idx = DIFF_TO_INDEX[diff] Xpay_cents, Ypay_cents = C.AMOUNTS[idx] pair_index = order[pair_pos] X, Y = C.PAIRS[pair_index] left_is_X = left_is_X_by_pair[pair_index] if left_is_X: scenario_left, scenario_right = X, Y amount_left, amount_right = Xpay_cents, Ypay_cents else: scenario_left, scenario_right = Y, X amount_left, amount_right = Ypay_cents, Xpay_cents player.pair_pos = pair_pos player.pair_index = pair_index player.amount_index = idx player.scenario_left = scenario_left player.scenario_right = scenario_right player.left_is_X = left_is_X player.amount_left_cents = amount_left player.amount_right_cents = amount_right player.pair_label = f"{scenario_left}_{scenario_right}" order_labels = [] for idx2 in order: Xlab, Ylab = C.PAIRS[idx2] order_labels.append(f"{Xlab}{Ylab}") player.pair_order_str = "-".join(order_labels) left_id = scenario_id_from_label(scenario_left) right_id = scenario_id_from_label(scenario_right) left_title = SCENARIOS[left_id].get("title", "") right_title = SCENARIOS[right_id].get("title", "") left_rules_html = SCENARIOS[left_id].get("rappels_html", "") right_rules_html = SCENARIOS[right_id].get("rappels_html", "") return dict( scenario_left=scenario_left, scenario_right=scenario_right, amount_left=amount_left / 100, amount_right=amount_right / 100, pair_number=pair_pos + 1, scenario_left_rappels_html=left_rules_html, scenario_right_rappels_html=right_rules_html, scenario_left_title = left_title, scenario_right_title = right_title, ) @staticmethod def error_message(player, values): if values.get('choice') is None: return "Veuillez sélectionner une option avant de continuer." @staticmethod def before_next_page(player, timeout_happened): part = player.participant init_state(part) states = part.vars['states'] pair_pos = player.pair_pos state = states[pair_pos] current_diff = state['diff'] state['n'] += 1 # SAFE READ (prevents crash if unanswered) choice = player.field_maybe_none('choice') # If no choice (timeout or other), force a fallback that keeps tree consistent. if choice is None: choice = 1 # default to left option player.choice = choice part.vars.setdefault('wtp_missing_choice_count', 0) part.vars['wtp_missing_choice_count'] += 1 if not state['done']: # Determine canonical branch (X or Y), correcting for left/right display if player.left_is_X: branch = 'X' if choice == 1 else 'Y' else: branch = 'Y' if choice == 1 else 'X' # Store preference at equal pay from the first diff=0 decision if current_diff == 0 and state.get('pref_branch_at_equal') is None: state['pref_branch_at_equal'] = branch # Always record the last displayed screen and last choice for this pair state['last_diff_shown'] = current_diff state['last_branch'] = branch next_diff = TREE.get(current_diff, {}).get(branch) if next_diff is None: state['done'] = True else: state['diff'] = next_diff if state['n'] >= C.MAX_QUESTIONS_PER_PAIR: state['done'] = True states[pair_pos] = state part.vars['states'] = states class WTPScenarioDraw(Page): """ Last page of the WTP app: decides final scenario and base payment. Uses ONLY the global flag use_wtp drawn in welcome. This version additionally stores: - preferred scenario for each pair (from first diff=0) - conservative WTP per your rule (from last displayed screen + last choice) """ @staticmethod def is_displayed(player: Player): return player.round_number == C.NUM_ROUNDS @staticmethod def vars_for_template(player: Player): part = player.participant # Original canonical MPDs (for implementation and compatibility) mpd_can = compute_mpd_canonical(part) part.vars['mpd_AB_XminusY_cents'] = mpd_can.get('AB') part.vars['mpd_AC_XminusY_cents'] = mpd_can.get('AC') part.vars['mpd_BC_XminusY_cents'] = mpd_can.get('BC') player.mpd_AB_XminusY_cents = mpd_can.get('AB') player.mpd_AC_XminusY_cents = mpd_can.get('AC') player.mpd_BC_XminusY_cents = mpd_can.get('BC') # New preferred + conservative WTP outputs out = compute_pref_and_conservative_wtp(part) part.vars['pref_AB'] = out.get('AB', {}).get('pref') part.vars['pref_AC'] = out.get('AC', {}).get('pref') part.vars['pref_BC'] = out.get('BC', {}).get('pref') part.vars['wtp_AB_cents'] = out.get('AB', {}).get('wtp_cents') part.vars['wtp_AC_cents'] = out.get('AC', {}).get('wtp_cents') part.vars['wtp_BC_cents'] = out.get('BC', {}).get('wtp_cents') player.pref_AB = part.vars['pref_AB'] or '' player.pref_AC = part.vars['pref_AC'] or '' player.pref_BC = part.vars['pref_BC'] or '' player.wtp_AB_cents = part.vars['wtp_AB_cents'] player.wtp_AC_cents = part.vars['wtp_AC_cents'] player.wtp_BC_cents = part.vars['wtp_BC_cents'] def to_euros(c): if c is None: return None return c / 100 # Display both (optional) return dict( mpd_AB=to_euros(mpd_can.get('AB')), mpd_AC=to_euros(mpd_can.get('AC')), mpd_BC=to_euros(mpd_can.get('BC')), pref_AB=player.pref_AB, pref_AC=player.pref_AC, pref_BC=player.pref_BC, wtp_AB=to_euros(player.wtp_AB_cents), wtp_AC=to_euros(player.wtp_AC_cents), wtp_BC=to_euros(player.wtp_BC_cents), ) @staticmethod def before_next_page(player: Player, timeout_happened): part = player.participant # Single flag everywhere use_wtp = bool(part.vars.get('use_wtp', False)) player.impl_used_wtp = use_wtp # ------------------------------------------------------------ # Case 1: WTP does NOT apply # ------------------------------------------------------------ if not use_wtp: scen = random.choice(['A', 'B', 'C']) part.vars['scenario_final'] = scen part.vars['implemented_scenario_id'] = scenario_id_from_label(scen) base_cents = 500 part.vars['guaranteed_payment_cents'] = base_cents # backward compatibility part.vars['wtp_base_payment_cents'] = base_cents # canonical key # No drawn WTP payment question part.vars['wtp_pay_pair_label'] = None part.vars['wtp_pay_amount_index'] = None part.vars['wtp_pay_left_is_X'] = None part.vars['wtp_pay_implied_choice'] = None part.vars['wtp_pay_scenario_left'] = None part.vars['wtp_pay_scenario_right'] = None player.impl_pair_index = None player.impl_pair_label = '' player.impl_amount_index = None player.impl_scenario_left = '' player.impl_scenario_right = '' player.impl_amount_left_cents = None player.impl_amount_right_cents = None player.impl_implied_choice = None return # ------------------------------------------------------------ # Case 2: WTP applies (original MPD-based implementation, canonical) # ------------------------------------------------------------ mpd_AB = part.vars.get('mpd_AB_XminusY_cents') mpd_AC = part.vars.get('mpd_AC_XminusY_cents') mpd_BC = part.vars.get('mpd_BC_XminusY_cents') mpd_by_pairindex = {0: mpd_AB, 1: mpd_AC, 2: mpd_BC} pair_index = random.choice([0, 1, 2]) Xlab, Ylab = C.PAIRS[pair_index] mpd_pair = mpd_by_pairindex.get(pair_index) # Fallback if MPD missing if mpd_pair is None: scen = random.choice(['A', 'B', 'C']) part.vars['scenario_final'] = scen part.vars['implemented_scenario_id'] = scenario_id_from_label(scen) base_cents = 500 part.vars['guaranteed_payment_cents'] = base_cents part.vars['wtp_base_payment_cents'] = base_cents part.vars['wtp_pay_pair_label'] = None part.vars['wtp_pay_amount_index'] = None part.vars['wtp_pay_left_is_X'] = None part.vars['wtp_pay_implied_choice'] = None part.vars['wtp_pay_scenario_left'] = None part.vars['wtp_pay_scenario_right'] = None part.vars['wtp_pay_amount_left_cents'] = None part.vars['wtp_pay_amount_right_cents'] = None player.impl_pair_index = None player.impl_pair_label = '' player.impl_amount_index = None player.impl_scenario_left = '' player.impl_scenario_right = '' player.impl_amount_left_cents = None player.impl_amount_right_cents = None player.impl_implied_choice = None return init_state(part) left_is_X_by_pair = part.vars['left_is_X_by_pair'] left_is_X = left_is_X_by_pair[pair_index] amount_index = random.randrange(C.NUM_AMOUNTS) Xpay_cents, Ypay_cents = C.AMOUNTS[amount_index] # Canonical decision rule (in X-Y space), invariant to left/right diff_XminusY = Xpay_cents - Ypay_cents if diff_XminusY >= mpd_pair: chosen_scen = Xlab pay_cents = Xpay_cents implied_choice_canonical = 'X' else: chosen_scen = Ylab pay_cents = Ypay_cents implied_choice_canonical = 'Y' # Map chosen canonical scenario into left/right implied choice for bookkeeping (1=left, 2=right) if left_is_X: scen_left, scen_right = Xlab, Ylab pay_left, pay_right = Xpay_cents, Ypay_cents implied_choice = 1 if implied_choice_canonical == 'X' else 2 else: scen_left, scen_right = Ylab, Xlab pay_left, pay_right = Ypay_cents, Xpay_cents implied_choice = 2 if implied_choice_canonical == 'X' else 1 part.vars['scenario_final'] = chosen_scen part.vars['implemented_scenario_id'] = scenario_id_from_label(chosen_scen) part.vars['guaranteed_payment_cents'] = pay_cents # backward compatibility part.vars['wtp_base_payment_cents'] = pay_cents # canonical key # Store the drawn payment question details part.vars['wtp_pay_pair_label'] = f"{Xlab}{Ylab}" # e.g. "AB" part.vars['wtp_pay_amount_index'] = amount_index part.vars['wtp_pay_left_is_X'] = left_is_X part.vars['wtp_pay_implied_choice'] = implied_choice part.vars['wtp_pay_scenario_left'] = scen_left part.vars['wtp_pay_scenario_right'] = scen_right part.vars['wtp_pay_amount_left_cents'] = pay_left part.vars['wtp_pay_amount_right_cents'] = pay_right # Player bookkeeping fields player.impl_pair_index = pair_index player.impl_pair_label = f"{Xlab}{Ylab}" player.impl_amount_index = amount_index player.impl_scenario_left = scen_left player.impl_scenario_right = scen_right player.impl_amount_left_cents = pay_left player.impl_amount_right_cents = pay_right player.impl_implied_choice = implied_choice page_sequence = [WTPInstructions, WTPDecision, WTPScenarioDraw]