"""Models for the oTree app served under ``Ambiguity_New``.

Every participant works through the tasks listed in ``Constants.TASKS``.
In each round of a task the participant is shown a "known" probability
``p`` and answers Known (K), Unknown (U) or Indifferent (I).  After a K or
U answer ``p`` is moved by bisection on the interval ``[lo, hi]``; a task
ends on the first I answer or after ``Constants.MAX_ROUNDS_PER_TASK``
rounds.  The per-task state is kept in ``participant.vars`` so that it is
shared by all oTree rounds of the same participant.
"""

from otree.api import *


class Constants(BaseConstants):
    """App-wide settings."""

    name_in_url = 'Ambiguity_New'
    players_per_group = None

    # Three tasks with at most 6 rounds each = 18 rounds in total.
    num_rounds = 18
    MAX_ROUNDS_PER_TASK = 6

    # ``start_p`` is the share of purple balls shown first in each task:
    #   task1: 0.50 (2 colours, purple wins)
    #   task2: 0.10 (10 colours, purple wins)
    #   task3: 0.10 (10 colours, non-purple wins, so the initial winning
    #          chance of the known box is 1 - 0.10 = 0.90)
    TASKS = [
        {'key': 'task1', 'start_p': 0.50, 'mode': '2color', 'prize': 'purple'},
        {'key': 'task2', 'start_p': 0.10, 'mode': '10color', 'prize': 'purple'},
        {'key': 'task3', 'start_p': 0.10, 'mode': '10color', 'prize': 'nonpurple'},
    ]


class Subsession(BaseSubsession):
    def creating_session(self):
        """Initialise the per-participant task state for round-1 players.

        Writes these keys into ``participant.vars``: ``amb_state`` (one
        bisection state dict per task), ``amb_order`` (task keys in play
        order), ``amb_task_index`` (position in that order) and the
        payment bookkeeping keys ``amb_last_choice``,
        ``amb_indiff_reached`` and ``amb_payoff_set``.
        """
        for p in self.get_players():
            if p.round_number != 1:
                continue

            pv = p.participant.vars
            pv['amb_state'] = {
                task['key']: dict(
                    lo=0.0,
                    hi=1.0,
                    p=task['start_p'],
                    rounds_done=0,
                    done=False,
                    indiff=None,
                )
                for task in Constants.TASKS
            }
            pv['amb_order'] = [task['key'] for task in Constants.TASKS]
            pv['amb_task_index'] = 0

            # Payment-related placeholders.
            pv['amb_last_choice'] = {}
            pv['amb_indiff_reached'] = {}
            pv['amb_payoff_set'] = False


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    # Per-round record of what was shown and chosen.
    active_task = models.StringField()
    round_in_task = models.IntegerField()
    shown_known_prob = models.FloatField()

    choice = models.StringField(
        choices=[['K', 'Known'], ['U', 'Unknown'], ['I', 'Indifferent']],
        widget=widgets.RadioSelect,
    )

    # Indifference point of each task, written in the round the task ends.
    indiff_task1 = models.FloatField()
    indiff_task2 = models.FloatField()
    indiff_task3 = models.FloatField()

    # Outcome of the payment step (filled by set_ambiguity_payoff).
    amb_selected_task = models.StringField(blank=True)
    amb_selected_box = models.StringField(blank=True)  # 'U' or 'K'
    amb_selected_prob = models.FloatField(blank=True, null=True)
    amb_selected_draw = models.FloatField(blank=True, null=True)
    amb_result = models.StringField(blank=True)  # 'win' or 'lose'
    amb_payoff = models.CurrencyField(blank=True, null=True)

    # Control questions.
    q1a = models.IntegerField(
        choices=[[1, 'Box U'], [2, 'Box K'], [3, 'Either Box U or Box K']],
        label=' a) From which box was the ball drawn?',
    )
    q1b = models.CurrencyField(
        label='b) Please enter the amount you will earn. ',
        min=0,
    )
    q2 = models.IntegerField(
        choices=[[1, '0%'], [2, '50%'], [3, '75%'], [4, '100%']],
        label=' What is the chance of winning $5 in this case?',
    )
    incorrect_attempts = models.IntegerField(initial=0)

    def _get_active_key(self):
        """Return the key of the current task, or None past the last task."""
        order = self.participant.vars['amb_order']
        idx = self.participant.vars['amb_task_index']
        return order[idx] if idx < len(order) else None

    def _get_state(self):
        """Return the state dict of the current task.

        Raises KeyError when no task is active, because the lookup is then
        done with a key of None.
        """
        return self.participant.vars['amb_state'][self._get_active_key()]

    def prepare_round(self):
        """Pick the task, round number and probability to show this round.

        Advances ``amb_task_index`` past every task that is finished or has
        used up its rounds.  When a task remains, fills ``active_task``,
        ``round_in_task`` and ``shown_known_prob``; when none remains,
        returns without touching those fields.
        """
        pv = self.participant.vars
        n_tasks = len(Constants.TASKS)

        # Skip over tasks that are already complete.
        while pv['amb_task_index'] < n_tasks:
            st = pv['amb_state'][self._get_active_key()]
            exhausted = st['rounds_done'] >= Constants.MAX_ROUNDS_PER_TASK
            if not (st['done'] or exhausted):
                break
            pv['amb_task_index'] += 1

        if pv['amb_task_index'] >= n_tasks:
            return  # all tasks are finished

        key = self._get_active_key()
        st = pv['amb_state'][key]
        self.active_task = key
        self.round_in_task = st['rounds_done'] + 1
        self.shown_known_prob = st['p']

    def update_after_choice(self):
        """Apply this round's choice to the active task's bisection state.

        Does nothing if the task is already done.  Otherwise it counts the
        round, stores the choice in ``amb_last_choice`` and then either

        * on 'I': fixes the indifference point at the shown ``p`` and ends
          the task, or
        * on 'K'/'U': tightens ``lo`` or ``hi`` to ``p`` and sets the next
          ``p`` to the interval midpoint; once the round limit is reached
          that midpoint becomes the indifference point.

        When the task ends, the indifference point is also written to the
        matching ``indiff_task*`` field of this round's Player.
        """
        pv = self.participant.vars
        key = self.active_task
        st = pv['amb_state'][key]

        if st['done']:
            return

        st['rounds_done'] += 1

        # Remember the most recent actual choice in this task for payment.
        pv['amb_last_choice'][key] = self.choice

        if self.choice == 'I':
            st['indiff'] = st['p']
            st['done'] = True
            pv['amb_indiff_reached'][key] = True
        else:
            pv['amb_indiff_reached'][key] = False

            # In task1/task2 purple wins, so choosing U raises p.  In task3
            # non-purple wins, so the direction is reversed: choosing K
            # raises p.
            raises_p = 'K' if key == 'task3' else 'U'
            if self.choice == raises_p:
                st['lo'] = max(st['lo'], st['p'])
            elif self.choice in ('K', 'U'):
                st['hi'] = min(st['hi'], st['p'])

            # The interval midpoint is the next probability to show.
            st['p'] = (st['lo'] + st['hi']) / 2

            # Out of rounds without an 'I': settle on the current midpoint.
            if st['rounds_done'] >= Constants.MAX_ROUNDS_PER_TASK:
                st['indiff'] = st['p']
                st['done'] = True

        if st['done'] and key in ('task1', 'task2', 'task3'):
            setattr(self, 'indiff_' + key, st['indiff'])

    def set_ambiguity_payoff(self):
        """Resolve the ambiguity payment once and record it.

        Reads ``amb_task_to_pay``, ``amb_box_if_indiff`` and
        ``amb_random_draw`` from ``participant.vars``; none of them is set
        in this module, so they are expected to be provided elsewhere.

        The paid box is ``amb_box_if_indiff`` when the selected task ended
        with an 'I' answer, otherwise the last actual choice in that task.
        Box K wins with the task's indifference point (task1/task2) or one
        minus it (task3); any other box wins with probability 0.5.  The
        participant wins when the draw is at most that probability.

        Raises:
            KeyError: ``amb_task_to_pay`` is missing or names no known task.
            TypeError: Box K is paid but the selected task has no
                indifference point yet.
        """
        pv = self.participant.vars

        # Avoid setting the payoff twice across rounds/pages.
        if pv.get('amb_payoff_set', False):
            return

        selected_task = pv.get('amb_task_to_pay')
        states = pv['amb_state']
        if selected_task not in states:
            raise KeyError(
                "participant.vars['amb_task_to_pay'] is {!r}; expected one "
                "of {}".format(selected_task, sorted(states))
            )
        indiff = states[selected_task].get('indiff')

        # Decide which box is used for payment.
        if pv.get('amb_indiff_reached', {}).get(selected_task, False):
            selected_box = pv.get('amb_box_if_indiff')
        else:
            selected_box = pv.get('amb_last_choice', {}).get(selected_task)

        # Winning probability: 0.5 unless Box K of a known task is paid.
        selected_prob = 0.5
        if selected_box == 'K' and selected_task in ('task1', 'task2', 'task3'):
            if indiff is None:
                raise TypeError(
                    "task {!r} has no indifference point yet; cannot pay "
                    "from Box K".format(selected_task)
                )
            selected_prob = 1 - indiff if selected_task == 'task3' else indiff

        # NOTE(review): a missing 'amb_random_draw' falls back to 0.0, which
        # always wins -- confirm that this default is intended.
        draw = pv.get('amb_random_draw', 0.0)

        # NOTE(review): `c` has to come from the star import of otree.api;
        # confirm it exists in the installed oTree version (newer releases
        # name the currency helper `cu`).
        if draw <= selected_prob:
            payoff = c(15)
            result = 'win'
        else:
            payoff = c(0)
            result = 'lose'

        self.amb_selected_task = selected_task
        self.amb_selected_box = selected_box
        self.amb_selected_prob = selected_prob
        self.amb_selected_draw = draw
        self.amb_result = result
        self.amb_payoff = payoff

        pv['payoff_ambig'] = payoff
        pv['amb_selected_task'] = selected_task
        pv['amb_selected_box'] = selected_box
        pv['amb_selected_prob'] = selected_prob
        pv['amb_selected_draw'] = draw
        pv['amb_result'] = result
        pv['amb_final_bonus'] = payoff
        pv['amb_payoff_set'] = True