from otree.api import (
Currency,
cu,
currency_range,
models,
widgets,
BaseConstants,
BaseSubsession,
BaseGroup,
BasePlayer,
ExtraModel,
WaitPage,
Page,
read_csv,
)
import units
import shared_out
import csv
import json
import os
import random
import math
import time
import threading
from contextlib import contextmanager
doc = ''
class C(BaseConstants):
# built-in constants
NAME_IN_URL = 'hiring_funnel'
PLAYERS_PER_GROUP = None
NUM_ROUNDS = 13 # 1 practice + 12 main
# user-defined constants
NUM_MAIN_ROUNDS = 12
NUM_CANDIDATES = 10
MU = 50
SIGMA = 15
SIGMA_RESUME = 15
SIGMA_ZOOM = 15
SIGMA_INPERSON = 15
DEFAULT_REWARD = 4.0
BONUS_RATE = 0.05
MAX_REWARD = 10
GATE1_SELECT = 5
GATE2_SELECT = 3
CANDIDATE_LABELS = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J')
class Subsession(BaseSubsession):
pass
class Group(BaseGroup):
pass
class Player(BasePlayer):
# Prolific tracking (captured on first page)
prolific_pid = models.StringField(blank=True, label='Prolific ID')
prolific_pid_from_url = models.StringField(blank=True)
prolific_pid_mismatch = models.BooleanField(initial=False)
prolific_pid_link_missing = models.BooleanField(initial=False)
prolific_study_id = models.StringField(blank=True, label='Study ID (optional)')
prolific_session_id = models.StringField(blank=True, label='Session ID (optional)')
# Comprehension check questions
Q111_instructions_willing = models.IntegerField(
label='Sometimes survey takers rush through the questions without reading any instructions. Doing that ruins our results. Are you willing to read the instructions on the next pages?',
choices=[
(1, 'No'),
(2, 'Yes'),
],
widget=widgets.RadioSelect
)
# Comprehension error counters (number of incorrect attempts)
comp_mission_errors = models.IntegerField(initial=0)
comp_score_errors = models.IntegerField(initial=0)
comp_decision_errors = models.IntegerField(initial=0)
comp_advance_errors = models.IntegerField(initial=0)
comp_bonus_errors = models.IntegerField(initial=0)
# Comprehension trial counters (number of submissions until passing)
trial_preinstructions = models.IntegerField(initial=0)
trial_instructions_part1 = models.IntegerField(initial=0)
trial_instructions_part2 = models.IntegerField(initial=0)
trial_comprehension_page = models.IntegerField(initial=0)
# Per-question answer-attempt counters (submitted with a non-empty answer)
trial_bonus_answered = models.IntegerField(initial=0)
trial_decision_answered = models.IntegerField(initial=0)
trial_advance_answered = models.IntegerField(initial=0)
Q_ultimate_mission = models.IntegerField(
label='What is your ultimate objective in every single hiring case?',
choices=[
(1, 'To pick the candidate with the best Resume score.'),
(2, 'To pick the candidate with the highest True Quality.'),
(3, 'To pick the candidate I would want to have a beer with.'),
(4, 'To pick randomly and finish quickly.'),
],
widget=widgets.RadioSelect,
blank=True
)
Q_decision_process = models.IntegerField(
label='How will you make hiring decisions in this study?',
choices=[
(1, 'I will only see the Resume scores and must choose a hire.'),
(2, 'I will see only the In-person interview scores and must choose a hire.'),
(3, 'I will see scores across all three stages (Resume, Zoom, In-person) and then choose one hire.'),
(4, 'I must eliminate candidates based on their Resume before seeing any other scores.'),
],
widget=widgets.RadioSelect,
blank=True
)
Q_score_interpretation = models.IntegerField(
label='How should you interpret the scores (Resume, Zoom, In-person) you see?',
choices=[
(1, 'They are noisy clues that are usually helpful, but sometimes wrong.'),
(2, 'They are perfect facts. I can always trust them 100%.'),
(3, 'They are useless. I should just guess the true quality by rules of thumb.'),
],
widget=widgets.RadioSelect,
blank=True
)
Q_bonus_calculation = models.IntegerField(
label='How is your final cash bonus calculated?',
choices=[
(1, 'It is the AVERAGE true quality of all 12 people I hired.'),
(2, 'It depends on the true quality of the ONE random hire I made during the study.'),
(3, 'It depends only on the true quality of the very LAST candidate I hire in the final round.'),
],
widget=widgets.RadioSelect,
blank=True
)
Q_advance_candidate = models.IntegerField(
label='What happens when you advance a candidate to the next round?',
choices=[
(1, 'I immediately hire them.'),
(2, 'I get more information (a new score) about them.'),
(3, 'Their previous scores are deleted.'),
],
widget=widgets.RadioSelect,
blank=True
)
# Practice round data (per round stored at participant level, but accessible here)
practice_hired_candidate = models.StringField(blank=True)
practice_hired_trueq = models.IntegerField(blank=True)
practice_reward = models.StringField(blank=True)
# Main round decision data (per round)
gate1_choices = models.LongStringField(blank=True, help_text="JSON list of candidates selected at Gate 1")
gate2_choices = models.LongStringField(blank=True, help_text="JSON list of candidates selected at Gate 2")
gate1_initial_checked = models.LongStringField(blank=True, help_text="JSON list of checkboxes pre-selected on Gate 1 page load")
gate2_initial_checked = models.LongStringField(blank=True, help_text="JSON list of checkboxes pre-selected on Gate 2 page load")
gate2_initial_disabled_checked = models.LongStringField(blank=True, help_text="JSON list of disabled-yet-checked boxes on Gate 2 page load")
hired_candidate = models.StringField(blank=True, help_text="Final hired candidate label (A-J)")
hired_trueq = models.IntegerField(blank=True)
reward_str = models.StringField(blank=True, help_text="Currency string for this round")
gate1_decision_context = models.LongStringField(blank=True, help_text="JSON snapshot of Gate 1 decision context")
gate2_decision_context = models.LongStringField(blank=True, help_text="JSON snapshot of Gate 2 decision context")
hire_decision_context = models.LongStringField(blank=True, help_text="JSON snapshot of final hiring decision context")
# Stage duration tracking (seconds)
practice_stage1_secs = models.FloatField(blank=True)
practice_stage2_secs = models.FloatField(blank=True)
practice_stage3_secs = models.FloatField(blank=True)
main_stage1_secs = models.FloatField(blank=True)
main_stage2_secs = models.FloatField(blank=True)
main_stage3_secs = models.FloatField(blank=True)
practice_gate1_secs = models.FloatField(blank=True)
practice_gate1_after_secs = models.FloatField(blank=True)
practice_gate2_secs = models.FloatField(blank=True)
practice_gate2_after_secs = models.FloatField(blank=True)
practice_gate3_secs = models.FloatField(blank=True)
main_gate1_secs = models.FloatField(blank=True)
main_gate1_after_secs = models.FloatField(blank=True)
main_gate2_secs = models.FloatField(blank=True)
main_gate2_after_secs = models.FloatField(blank=True)
main_gate3_secs = models.FloatField(blank=True)
# Instruction/comprehension page durations (seconds)
preinstructions_secs = models.FloatField(blank=True)
instructions_part1_secs = models.FloatField(blank=True)
instructions_part2_secs = models.FloatField(blank=True)
comprehension_secs = models.FloatField(blank=True)
# Post-task survey
resume_weight = models.IntegerField(blank=True, min=0, max=100)
zoom_weight = models.IntegerField(blank=True, min=0, max=100)
inperson_weight = models.IntegerField(blank=True, min=0, max=100)
open_ended_explanation = models.LongStringField(blank=True)
# Demographics
has_hiring_experience = models.IntegerField(
label='Do you have any experience with a real-world hiring task?',
choices=[
(1, 'Yes, I have.'),
(2, 'No, I don\'t.'),
],
widget=widgets.RadioSelect,
blank=True
)
hiring_experience_type = models.StringField(
label='Which of these describe your hiring experience the best?',
choices=[
('it', 'Hiring in an IT company'),
('academic', 'Faculty hiring in an academic school'),
('other', 'None of these: Please describe it.'),
],
widget=widgets.RadioSelect,
blank=True
)
hiring_experience_other = models.LongStringField(
label='Please describe your hiring experience:',
blank=True
)
age_range = models.StringField(
label='What is your age range?',
choices=[
('18-25', '18-25'),
('26-35', '26-35'),
('36-45', '36-45'),
('45+', '45+'),
],
widget=widgets.RadioSelect,
blank=True
)
gender = models.StringField(
label='What is your gender?',
choices=[
('female', 'Female'),
('male', 'Male'),
('other', 'Other'),
('unwilling', 'Prefer not to say'),
],
widget=widgets.RadioSelect,
blank=True
)
device_type = models.StringField(
label='On what type of device are you completing this survey?',
choices=[
('computer', 'Computer'),
('smartphone', 'Smart Phone'),
('tablet', 'Tablet'),
('other', 'Other'),
],
widget=widgets.RadioSelect,
blank=True
)
survey_comments = models.LongStringField(
label='Do you have comments for us about this survey?',
blank=True
)
DATA_POOL_PATH = os.path.join(os.path.dirname(__file__), '..', 'pilot_data_clean_raw.csv')
PATCH_ASSIGNMENTS_PATH = os.path.join(os.path.dirname(__file__), '..', 'patch_assignments.csv')
SCORE_FIELDS = ('resume', 'zoom', 'inperson', 'true_q')
CONDITION_STANDARD = 'standard'
CONDITION_BLIND = 'blind'
# Temporary assignment override: set to CONDITION_BLIND or CONDITION_STANDARD.
# Set to None to restore alternating blind/standard assignment.
FORCED_CONDITION = os.environ.get('FORCED_CONDITION', None)
HOLD_MESSAGE_TEMPLATE = (
'We apologize for the inconvenience. The study is currently at capacity for new participants. '
'Please exit and return later to check availability, or check back in {minutes} minutes. '
'We appreciate your patience.'
)
def _warn_pool(message):
print(f"[hiring_funnel data pool] {message}")
_ASSIGNMENT_STATE_LOCK = threading.RLock()
@contextmanager
def _assignment_state_lock():
"""In-process lock for assignment state mutations (oTree 6 has no Django ORM transaction API)."""
_ASSIGNMENT_STATE_LOCK.acquire()
try:
yield
finally:
_ASSIGNMENT_STATE_LOCK.release()
def _get_config_int(config_dict, key, default=0, minimum=None):
raw = config_dict.get(key, default)
try:
value = int(raw)
except (TypeError, ValueError):
value = int(default)
if minimum is not None and value < minimum:
return minimum
return value
def _build_dataset_unique_id(dataset_row_number, dataset_cycle_number):
"""Stable dataset ID for analysis; shared by matched standard/blind participants."""
row = max(0, int(dataset_row_number or 0))
cycle = max(0, int(dataset_cycle_number or 0))
return f'DS{row:03d}_C{cycle:03d}'
def _coerce_forced_condition(raw_condition):
"""Normalize forced-condition values from constants/env strings."""
if raw_condition in (CONDITION_STANDARD, CONDITION_BLIND):
return raw_condition
if isinstance(raw_condition, str):
token = raw_condition.strip().lower()
if token in (CONDITION_STANDARD, CONDITION_BLIND):
return token
if token in ('condition_standard', 'standard_condition', '1'):
return CONDITION_STANDARD
if token in ('condition_blind', 'blind_condition', '2'):
return CONDITION_BLIND
if raw_condition == 1:
return CONDITION_STANDARD
if raw_condition == 2:
return CONDITION_BLIND
return None
def _get_assignment_config(session):
hold_wait_minutes = _get_config_int(
session.config,
'hold_wait_minutes',
default=os.environ.get('HOLD_WAIT_MINUTES', 10),
minimum=1,
)
return dict(
hold_wait_minutes=hold_wait_minutes,
)
def _empty_matching_state():
return dict(
standard_assigned_count=0,
completed_standard_codes=[],
available_standard_codes=[],
blind_to_standard_matches={},
standard_to_blind_matches={},
total_matched_pairs=0,
reclaimed_blind_codes=[],
blind_completion_marked=False,
next_expected_condition=CONDITION_STANDARD,
phase1_ended=False,
)
def _get_matching_state(session):
state = session.vars.get('matching_state')
if not isinstance(state, dict):
state = _empty_matching_state()
session.vars['matching_state'] = state
return state
def _save_matching_state(session, state):
state['completed_standard_count'] = len(state.get('completed_standard_codes', []))
session.vars['matching_state'] = state
# Top-level mirrors for easy visibility in oTree admin panel
session.vars['available_blind_slots'] = len(state.get('available_standard_codes', []))
session.vars['total_matched_pairs'] = int(state.get('total_matched_pairs', 0))
session.vars['completed_standard_count'] = len(state.get('completed_standard_codes', []))
def _matching_state_snapshot(session):
state = _get_matching_state(session)
return dict(
standard_assigned_count=int(state.get('standard_assigned_count', 0)),
completed_standard_count=len(state.get('completed_standard_codes', [])),
available_standard_count=len(state.get('available_standard_codes', [])),
blind_match_count=len(state.get('blind_to_standard_matches', {})),
next_expected_condition=state.get('next_expected_condition', CONDITION_STANDARD),
phase1_ended=bool(state.get('phase1_ended', False)),
)
def _debug_matching_state(session, event='state'):
if not get_config_bool(session.config, 'debug_matching_state', default=False):
return
snapshot = _matching_state_snapshot(session)
_warn_pool(
f"[matching_debug] event={event} "
f"standard_assigned={snapshot['standard_assigned_count']} "
f"completed_standard={snapshot['completed_standard_count']} "
f"available_standard={snapshot['available_standard_count']} "
f"blind_matches={snapshot['blind_match_count']} "
f"next_expected={snapshot['next_expected_condition']} "
f"phase1_ended={snapshot['phase1_ended']}"
)
def _get_session_participant_by_code(session, participant_code):
for participant in session.get_participants():
if participant.code == participant_code:
return participant
return None
def _get_app_player_for_round(participant, round_number):
"""Return this app's Player for the given participant/round in oTree 6."""
target_round = int(round_number)
try:
players = participant.get_players()
except Exception:
players = []
for app_player in players:
if isinstance(app_player, Player) and int(getattr(app_player, 'round_number', 0)) == target_round:
return app_player
for app_player in players:
if (
getattr(getattr(app_player, '_meta', None), 'app_label', None) == C.NAME_IN_URL
and int(getattr(app_player, 'round_number', 0)) == target_round
):
return app_player
return None
def _build_blind_replay_data_from_standard(standard_participant):
if not standard_participant.practice_data_json or not standard_participant.main_data_json:
return None
practice_data = get_round_data(standard_participant, 0)
practice_player = _get_app_player_for_round(standard_participant, 1)
if not practice_player:
return None
practice_labels = sanitize_choices(
parse_choice_list(practice_player.field_maybe_none('gate2_choices')),
get_candidate_labels_in_order(practice_data),
)
if len(practice_labels) != C.GATE2_SELECT:
return None
blind_practice = {label: dict(practice_data[label]) for label in practice_labels}
blind_main = []
for round_num in range(2, C.NUM_ROUNDS + 1):
round_data = get_round_data(standard_participant, round_num)
round_player = _get_app_player_for_round(standard_participant, round_num)
if not round_player:
return None
round_labels = sanitize_choices(
parse_choice_list(round_player.field_maybe_none('gate2_choices')),
get_candidate_labels_in_order(round_data),
)
if len(round_labels) != C.GATE2_SELECT:
return None
blind_main.append({label: dict(round_data[label]) for label in round_labels})
if len(blind_main) != C.NUM_MAIN_ROUNDS:
return None
return dict(practice=blind_practice, main=blind_main)
def _mark_standard_completed_if_ready(player: 'BasePlayer'):
participant = player.participant
if participant.vars.get('condition') != CONDITION_STANDARD:
return
if participant.vars.get('standard_completion_marked'):
return
if participant.vars.get('standard_reclaimed_for_rematch'):
return
final_round_player = _get_app_player_for_round(participant, C.NUM_ROUNDS)
if not final_round_player:
return
if not final_round_player.field_maybe_none('hired_candidate'):
return
replay_payload = _build_blind_replay_data_from_standard(participant)
if replay_payload is None:
return
participant.vars['blind_replay_practice_json'] = json.dumps(replay_payload['practice'])
participant.vars['blind_replay_main_json'] = json.dumps(replay_payload['main'])
with _assignment_state_lock():
state = _get_matching_state(player.session)
code = participant.code
completed_codes = state.get('completed_standard_codes', [])
if code not in completed_codes:
completed_codes.append(code)
available_codes = state.get('available_standard_codes', [])
standard_to_blind = state.get('standard_to_blind_matches', {})
if code not in standard_to_blind and code not in available_codes:
available_codes.append(code)
state['completed_standard_codes'] = completed_codes
state['available_standard_codes'] = available_codes
_save_matching_state(player.session, state)
_debug_matching_state(player.session, event=f'standard_completed:{participant.code}')
participant.vars['standard_completion_marked'] = True
participant.vars['standard_available_for_match'] = True
def _register_standard_completion_if_ready(participant, session, state):
"""Idempotently register a completed standard participant into the available blind-match pool."""
if participant.vars.get('condition') != CONDITION_STANDARD:
return False
if participant.vars.get('standard_completion_marked'):
return False
if participant.vars.get('standard_reclaimed_for_rematch'):
return False
final_round_player = _get_app_player_for_round(participant, C.NUM_ROUNDS)
if not final_round_player:
return False
if not final_round_player.field_maybe_none('hired_candidate'):
return False
replay_payload = _build_blind_replay_data_from_standard(participant)
if replay_payload is None:
return False
participant.vars['blind_replay_practice_json'] = json.dumps(replay_payload['practice'])
participant.vars['blind_replay_main_json'] = json.dumps(replay_payload['main'])
completed_codes = state.get('completed_standard_codes', [])
available_codes = state.get('available_standard_codes', [])
standard_to_blind = state.get('standard_to_blind_matches', {})
code = participant.code
if code not in completed_codes:
completed_codes.append(code)
if code not in standard_to_blind and code not in available_codes:
available_codes.append(code)
state['completed_standard_codes'] = completed_codes
state['available_standard_codes'] = available_codes
participant.vars['standard_completion_marked'] = True
participant.vars['standard_available_for_match'] = True
return True
def _reclaim_unfinished_blind_matches(session, state):
"""
Return Standard codes to available_standard_codes for any Blind participant
who was assigned but did not complete (no hired_candidate on round 13).
Only called when FORCED_CONDITION switches to Standard, i.e. after the
Blind batch is closed. Idempotent.
"""
blind_to_standard = state.get('blind_to_standard_matches', {})
standard_to_blind = state.get('standard_to_blind_matches', {})
available_codes = state.get('available_standard_codes', [])
reclaimed = []
for blind_code, standard_code in list(blind_to_standard.items()):
blind_participant = _get_session_participant_by_code(session, blind_code)
if not blind_participant:
continue
if blind_participant.vars.get('condition') != CONDITION_BLIND:
continue
final_player = _get_app_player_for_round(blind_participant, C.NUM_ROUNDS)
if not final_player:
continue
hired = final_player.field_maybe_none('hired_candidate')
if hired:
continue # Completed — do not reclaim
# Blind participant did not complete — return standard code to pool
if standard_code not in available_codes:
available_codes.append(standard_code)
reclaimed.append((blind_code, standard_code))
# Remove from match registries so the reclaimed standard code
# can be matched to a new Blind participant cleanly
blind_to_standard.pop(blind_code, None)
standard_to_blind.pop(standard_code, None)
# Mark the standard participant to prevent defensive completion scan
# from re-adding this code to available_codes a second time.
standard_participant = _get_session_participant_by_code(session, standard_code)
if standard_participant:
standard_participant.vars['standard_reclaimed_for_rematch'] = True
# Invalidate the dropout Blind participant so they cannot re-enter.
blind_participant.vars['assignment_finalized'] = False
blind_participant.vars['condition'] = ''
reclaimed_list = state.get('reclaimed_blind_codes', [])
if blind_code not in reclaimed_list:
reclaimed_list.append(blind_code)
state['reclaimed_blind_codes'] = reclaimed_list
if reclaimed:
state['available_standard_codes'] = available_codes
state['blind_to_standard_matches'] = blind_to_standard
state['standard_to_blind_matches'] = standard_to_blind
_warn_pool(f'[reclaim] Returned {len(reclaimed)} standard codes to pool: {reclaimed}')
return len(reclaimed)
def _mark_blind_completed_if_ready(player: 'BasePlayer'):
participant = player.participant
if participant.vars.get('condition') != CONDITION_BLIND:
return
if participant.vars.get('blind_completion_marked'):
return
final_round_player = _get_app_player_for_round(participant, C.NUM_ROUNDS)
if not final_round_player:
return
if not final_round_player.field_maybe_none('hired_candidate'):
return
with _assignment_state_lock():
state = _get_matching_state(player.session)
state['total_matched_pairs'] = int(state.get('total_matched_pairs', 0)) + 1
_save_matching_state(player.session, state)
participant.vars['blind_completion_marked'] = True
def _apply_assignment_to_participant(participant, condition, dataset_index, dataset_cycle_number, dataset_row_number,
matched_standard_code=None):
dataset_package = DATASET_PACKAGES[dataset_index] if DATASET_PACKAGES and 0 <= dataset_index < len(DATASET_PACKAGES) else None
def _generate_new_standard_sequence():
practice = generate_candidate_data()
main = [generate_candidate_data() for _ in range(C.NUM_MAIN_ROUNDS)]
return practice, main
if condition == CONDITION_STANDARD:
# Dynamic assignment: standard-condition participants must receive newly generated data.
if get_config_bool(participant.session.config, 'use_dynamic_condition_assignment', default=True) and not participant.session.config.get('is_patch_session', False):
practice_data, main_data = _generate_new_standard_sequence()
else:
if not dataset_package:
raise RuntimeError('No dataset package available for non-dynamic standard assignment.')
practice_data = dataset_package['standard_practice']
main_data = dataset_package['standard_main']
else:
if matched_standard_code:
standard_participant = _get_session_participant_by_code(participant.session, matched_standard_code)
if not standard_participant:
raise RuntimeError(f'Matched standard participant not found: {matched_standard_code}')
practice_json = standard_participant.vars.get('blind_replay_practice_json', '')
main_json = standard_participant.vars.get('blind_replay_main_json', '')
if not practice_json or not main_json:
raise RuntimeError('Matched standard participant has no replay payload for blind assignment.')
practice_data = json.loads(practice_json)
main_data = json.loads(main_json)
else:
raise RuntimeError('Unmatched blind assignment is not supported in forced-condition mode.')
participant.vars['condition'] = condition
participant.condition = condition
participant.vars['dataset_index'] = dataset_index
participant.vars['dataset_cycle_number'] = dataset_cycle_number
participant.vars['dataset_row_number'] = dataset_row_number
participant.vars['dataset_csv_row_number'] = dataset_package['csv_row_number'] if dataset_package else 0
dataset_unique_id = _build_dataset_unique_id(dataset_row_number, dataset_cycle_number)
participant.vars['dataset_unique_id'] = dataset_unique_id
participant.dataset_unique_id = dataset_unique_id
participant.vars['matched_standard_code'] = matched_standard_code or ''
participant.vars['assignment_finalized'] = True
participant.vars['assignment_hold'] = False
participant.vars['assignment_hold_message'] = ''
participant.practice_data_json = json.dumps(practice_data)
participant.main_data_json = json.dumps(main_data)
def _set_assignment_hold(participant, wait_minutes):
participant.vars['assignment_hold'] = True
participant.vars['assignment_hold_message'] = HOLD_MESSAGE_TEMPLATE.format(minutes=wait_minutes)
def _ensure_dynamic_assignment(player: 'BasePlayer'):
participant = player.participant
with _assignment_state_lock():
config = _get_assignment_config(player.session)
state = _get_matching_state(player.session)
reclaimed_codes = set(state.get('reclaimed_blind_codes', []))
if participant.code in reclaimed_codes:
_set_assignment_hold(participant, config['hold_wait_minutes'])
participant.vars['assignment_hold_message'] = (
'This study session is no longer available. '
'Please return your submission on Prolific.'
)
return dict(
assigned=False,
hold=True,
message=participant.vars['assignment_hold_message'],
)
if (
participant.vars.get('assignment_finalized') is True
and participant.vars.get('condition') in (CONDITION_STANDARD, CONDITION_BLIND)
):
participant.vars['assignment_hold'] = False
participant.vars['assignment_hold_message'] = ''
return dict(assigned=True, hold=False, message='')
was_phase1_ended = bool(state.get('phase1_ended', False))
# Three-phase dynamic thresholds/caps (unused in forced path above).
initial_threshold = _get_config_int(
player.session.config,
'initial_threshold',
default=os.environ.get('INITIAL_THRESHOLD', 10),
minimum=1,
)
standard_cap = _get_config_int(
player.session.config,
'standard_cap',
default=os.environ.get('STANDARD_CAP', 110),
minimum=1,
)
if initial_threshold >= standard_cap:
raise RuntimeError(
f'Configuration error: initial_threshold ({initial_threshold}) must be less than '
f'standard_cap ({standard_cap}).'
)
# Refresh completion pool defensively so blind availability is accurate even if a page hook was missed.
any_new_completion = False
for session_participant in player.session.get_participants():
if _register_standard_completion_if_ready(session_participant, player.session, state):
any_new_completion = True
# Forced assignment override for dynamic mode.
forced = _coerce_forced_condition(FORCED_CONDITION)
print(f"[FORCED_CONDITION_DEBUG] raw='{FORCED_CONDITION}' type={type(FORCED_CONDITION)} forced='{forced}'", flush=True)
_debug_matching_state(
player.session,
event=f'forced_resolved:{participant.code}:raw={FORCED_CONDITION}:resolved={forced}',
)
if forced in (CONDITION_STANDARD, CONDITION_BLIND):
# When switching to Standard, reclaim unfinished Blind participants first.
if forced == CONDITION_STANDARD:
_reclaim_unfinished_blind_matches(player.session, state)
if forced == CONDITION_STANDARD:
standard_assigned = int(state.get('standard_assigned_count', 0))
dataset_count = max(1, len(DATASET_PACKAGES))
dataset_index = standard_assigned % dataset_count
dataset_cycle_number = standard_assigned // dataset_count
dataset_row_number = dataset_index + 1
_apply_assignment_to_participant(
participant,
condition=CONDITION_STANDARD,
dataset_index=dataset_index,
dataset_cycle_number=dataset_cycle_number,
dataset_row_number=dataset_row_number,
)
state['standard_assigned_count'] = standard_assigned + 1
_save_matching_state(player.session, state)
_debug_matching_state(player.session, event=f'assigned_forced_standard:{participant.code}')
return dict(assigned=True, hold=False, message='')
if forced == CONDITION_BLIND:
available_codes = list(state.get('available_standard_codes', []))
if not available_codes:
_set_assignment_hold(participant, config['hold_wait_minutes'])
state['available_standard_codes'] = available_codes
_save_matching_state(player.session, state)
_debug_matching_state(player.session, event=f'hold_forced_blind_no_available:{participant.code}')
return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])
matched_standard_code = available_codes.pop(0)
standard_participant = _get_session_participant_by_code(player.session, matched_standard_code)
if not standard_participant:
_set_assignment_hold(participant, config['hold_wait_minutes'])
state['available_standard_codes'] = available_codes
_save_matching_state(player.session, state)
_debug_matching_state(player.session, event=f'hold_forced_blind_missing_standard:{participant.code}')
return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])
dataset_index = int(standard_participant.vars.get('dataset_index', 0))
dataset_cycle_number = int(standard_participant.vars.get('dataset_cycle_number', 0))
dataset_row_number = int(standard_participant.vars.get('dataset_row_number', dataset_index + 1))
_apply_assignment_to_participant(
participant,
condition=CONDITION_BLIND,
dataset_index=dataset_index,
dataset_cycle_number=dataset_cycle_number,
dataset_row_number=dataset_row_number,
matched_standard_code=matched_standard_code,
)
blind_to_standard = state.get('blind_to_standard_matches', {})
standard_to_blind = state.get('standard_to_blind_matches', {})
blind_to_standard[participant.code] = matched_standard_code
standard_to_blind[matched_standard_code] = participant.code
state['blind_to_standard_matches'] = blind_to_standard
state['standard_to_blind_matches'] = standard_to_blind
state['available_standard_codes'] = available_codes
standard_participant.vars['standard_available_for_match'] = False
_save_matching_state(player.session, state)
_debug_matching_state(player.session, event=f'assigned_forced_blind:{participant.code}')
return dict(assigned=True, hold=False, message='')
# Three-phase dynamic thresholds/caps.
initial_threshold = _get_config_int(
player.session.config,
'initial_threshold',
default=os.environ.get('INITIAL_THRESHOLD', 2),
minimum=1,
)
standard_cap = _get_config_int(
player.session.config,
'standard_cap',
default=os.environ.get('STANDARD_CAP', 4),
minimum=1,
)
if initial_threshold >= standard_cap:
raise RuntimeError(
f'Configuration error: initial_threshold ({initial_threshold}) must be less than '
f'standard_cap ({standard_cap}).'
)
if any_new_completion and state.get('phase1_ended') and not was_phase1_ended:
state['next_expected_condition'] = CONDITION_BLIND
completed_count = len(state.get('completed_standard_codes', []))
if completed_count >= initial_threshold:
state['phase1_ended'] = True
if not was_phase1_ended:
state['next_expected_condition'] = CONDITION_BLIND
elif state.get('next_expected_condition') not in (CONDITION_STANDARD, CONDITION_BLIND):
state['next_expected_condition'] = CONDITION_BLIND
standard_assigned = int(state.get('standard_assigned_count', 0))
available_codes = list(state.get('available_standard_codes', []))
next_expected = state.get('next_expected_condition', CONDITION_STANDARD)
phase1_ended = bool(state.get('phase1_ended', False))
selected_condition = None
matched_standard_code = None
flip_expected = False
keep_blind_expected = False
if not phase1_ended:
if standard_assigned >= standard_cap:
_set_assignment_hold(participant, config['hold_wait_minutes'])
_save_matching_state(player.session, state)
_debug_matching_state(player.session, event=f'hold_phase1:{participant.code}')
return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])
selected_condition = CONDITION_STANDARD
else:
if standard_assigned >= standard_cap:
# Phase 3: blind-only matching after cap.
state['next_expected_condition'] = CONDITION_BLIND
if available_codes:
selected_condition = CONDITION_BLIND
matched_standard_code = available_codes.pop(0)
else:
_set_assignment_hold(participant, config['hold_wait_minutes'])
state['available_standard_codes'] = available_codes
_save_matching_state(player.session, state)
_debug_matching_state(player.session, event=f'hold_phase3:{participant.code}')
return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])
elif next_expected == CONDITION_BLIND:
if available_codes:
selected_condition = CONDITION_BLIND
matched_standard_code = available_codes.pop(0)
flip_expected = True
else:
# No blind replay available: hold instead of assigning additional standard.
_set_assignment_hold(participant, config['hold_wait_minutes'])
state['available_standard_codes'] = available_codes
state['next_expected_condition'] = CONDITION_BLIND
_save_matching_state(player.session, state)
_debug_matching_state(player.session, event=f'hold_waiting_blind_data:{participant.code}')
return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])
else:
# next expected standard
selected_condition = CONDITION_STANDARD
flip_expected = True
# Hard guard: never allow any new Standard assignment once cap is reached.
if selected_condition == CONDITION_STANDARD and standard_assigned >= standard_cap:
_set_assignment_hold(participant, config['hold_wait_minutes'])
state['next_expected_condition'] = CONDITION_BLIND
_save_matching_state(player.session, state)
_debug_matching_state(player.session, event=f'hold_standard_cap_guard:{participant.code}')
return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])
if selected_condition == CONDITION_STANDARD:
# DATASET_PACKAGES is intentionally empty in forced-condition design.
# For dynamic standard assignment, data is freshly generated in
# _apply_assignment_to_participant, so these tracking fields are
# synthetic but stable.
dataset_index = 0
dataset_cycle_number = standard_assigned
dataset_row_number = 1
_apply_assignment_to_participant(
participant,
condition=CONDITION_STANDARD,
dataset_index=dataset_index,
dataset_cycle_number=dataset_cycle_number,
dataset_row_number=dataset_row_number,
)
state['standard_assigned_count'] = standard_assigned + 1
if phase1_ended:
if keep_blind_expected:
state['next_expected_condition'] = CONDITION_BLIND
elif flip_expected:
state['next_expected_condition'] = CONDITION_BLIND
else:
if not matched_standard_code:
_set_assignment_hold(participant, config['hold_wait_minutes'])
state['available_standard_codes'] = available_codes
_save_matching_state(player.session, state)
_debug_matching_state(player.session, event=f'hold_no_match:{participant.code}')
return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])
standard_participant = _get_session_participant_by_code(player.session, matched_standard_code)
if not standard_participant:
_set_assignment_hold(participant, config['hold_wait_minutes'])
state['available_standard_codes'] = available_codes
_save_matching_state(player.session, state)
_debug_matching_state(player.session, event=f'hold_missing_standard:{participant.code}')
return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])
dataset_index = int(standard_participant.vars.get('dataset_index', 0))
dataset_cycle_number = int(standard_participant.vars.get('dataset_cycle_number', 0))
dataset_row_number = int(standard_participant.vars.get('dataset_row_number', dataset_index + 1))
_apply_assignment_to_participant(
participant,
condition=CONDITION_BLIND,
dataset_index=dataset_index,
dataset_cycle_number=dataset_cycle_number,
dataset_row_number=dataset_row_number,
matched_standard_code=matched_standard_code,
)
blind_to_standard = state.get('blind_to_standard_matches', {})
standard_to_blind = state.get('standard_to_blind_matches', {})
blind_to_standard[participant.code] = matched_standard_code
standard_to_blind[matched_standard_code] = participant.code
state['blind_to_standard_matches'] = blind_to_standard
state['standard_to_blind_matches'] = standard_to_blind
state['available_standard_codes'] = available_codes
if flip_expected:
state['next_expected_condition'] = CONDITION_STANDARD
standard_participant.vars['standard_available_for_match'] = False
_save_matching_state(player.session, state)
_debug_matching_state(player.session, event=f'assigned_{selected_condition}:{participant.code}')
return dict(assigned=True, hold=False, message='')
def _load_dataset_packages():
return []
DATASET_PACKAGES = _load_dataset_packages()
def _load_patch_assignments():
"""Load ordered patch assignments from CSV for patch sessions."""
assignments = []
if not os.path.exists(PATCH_ASSIGNMENTS_PATH):
return assignments
with open(PATCH_ASSIGNMENTS_PATH, 'r', encoding='utf-8-sig', newline='') as csv_file:
reader = csv.DictReader(csv_file)
for row_num, row in enumerate(reader, start=2):
try:
patch_order = int(row.get('patch_order'))
missing_id_in_session = int(row.get('missing_id_in_session'))
dataset_index = int(row.get('dataset_index'))
dataset_row_number = int(row.get('dataset_row_number'))
dataset_cycle_number = int(row.get('dataset_cycle_number'))
except (TypeError, ValueError):
_warn_pool(f'Skipping patch row {row_num}: invalid integer field(s).')
continue
raw_condition = row.get('condition')
condition = (str(raw_condition).strip().lower() if raw_condition is not None else '')
if condition not in (CONDITION_STANDARD, CONDITION_BLIND):
_warn_pool(f'Skipping patch row {row_num}: invalid condition.')
continue
assignments.append(dict(
patch_order=patch_order,
missing_id_in_session=missing_id_in_session,
condition=condition,
dataset_index=dataset_index,
dataset_row_number=dataset_row_number,
dataset_cycle_number=dataset_cycle_number,
))
assignments.sort(key=lambda x: x['patch_order'])
_warn_pool(f'Loaded patch assignments: {len(assignments)} rows.')
return assignments
PATCH_ASSIGNMENTS = _load_patch_assignments()
def _get_patch_entry(participant):
patch_idx = max(0, int(participant.id_in_session) - 1)
if patch_idx >= len(PATCH_ASSIGNMENTS):
raise RuntimeError(
f'Patch assignment not found for id_in_session={participant.id_in_session}. '
f'Available patch rows={len(PATCH_ASSIGNMENTS)}.'
)
return PATCH_ASSIGNMENTS[patch_idx]
def _patch_assignment(participant):
"""Sequential patch assignment: participant 1->patch_order 1, etc."""
if not PATCH_ASSIGNMENTS:
raise RuntimeError('No patch assignments loaded from patch_assignments.csv.')
patch_entry = _get_patch_entry(participant)
dataset_index = patch_entry['dataset_index']
if dataset_index < 0 or dataset_index >= len(DATASET_PACKAGES):
raise RuntimeError(
f'Patch assignment has out-of-range dataset_index={dataset_index} '
f'for id_in_session={participant.id_in_session}.'
)
condition = patch_entry.get('condition')
if condition not in (CONDITION_STANDARD, CONDITION_BLIND):
raise RuntimeError(
f"Patch assignment missing/invalid 'condition' for id_in_session={participant.id_in_session}."
)
return (
condition,
dataset_index,
patch_entry.get('dataset_cycle_number', 0),
)
def _participant_assignment(participant):
"""Deterministic alternating assignment based on id_in_session.
Using zero-based `k = participant.id_in_session - 1`:
- even k => standard, odd k => blind
- dataset index cycles every 2 participants across ordered CSV rows
"""
if participant.session.config.get('is_patch_session', False):
if FORCED_CONDITION is not None:
raise RuntimeError('FORCED_CONDITION and is_patch_session cannot both be active.')
return _patch_assignment(participant)
if get_config_bool(participant.session.config, 'use_dynamic_condition_assignment', default=True):
# Assignment is deferred until participant entry (ProlificIntro) under serialized matching logic.
return None, None, None
k = max(0, int(participant.id_in_session) - 1)
forced = _coerce_forced_condition(FORCED_CONDITION)
if forced in (CONDITION_BLIND, CONDITION_STANDARD):
# In forced single-condition mode, advance one dataset row per participant.
condition = forced
dataset_index = 0
dataset_cycle_number = k
return condition, dataset_index, dataset_cycle_number
condition = CONDITION_STANDARD if (k % 2 == 0) else CONDITION_BLIND
dataset_index = 0
dataset_cycle_number = k // 2
return condition, dataset_index, dataset_cycle_number
# built-in hook function(s) (called automatically by oTree)
#
def creating_session(subsession: Subsession):
for player in subsession.get_players():
participant = player.participant
if subsession.round_number == 1:
if participant.label:
player.prolific_pid = participant.label
# Initialize dynamic matching state once per session.
if player.id_in_subsession == 1:
_get_assignment_config(subsession.session)
_save_matching_state(subsession.session, _get_matching_state(subsession.session))
participant.history_json = json.dumps([])
participant.selected_round = 0
participant.selected_candidate = ''
participant.selected_true_quality = 0
participant.selected_reward_string = '$0.00'
participant.dataset_unique_id = ''
participant.vars['condition'] = participant.vars.get('condition', '')
participant.vars['assignment_finalized'] = bool(participant.vars.get('assignment_finalized', False))
participant.vars['assignment_hold'] = False
participant.vars['assignment_hold_message'] = ''
participant.vars['actually_visited'] = False
participant.vars['blind_completion_marked'] = False
is_patch_session = participant.session.config.get('is_patch_session', False)
use_dynamic = get_config_bool(participant.session.config, 'use_dynamic_condition_assignment', default=True)
if use_dynamic and not is_patch_session:
# Deferred assignment mode: assign on first page entry after checking live state.
participant.vars['condition'] = ''
participant.vars['dataset_index'] = 0
participant.vars['dataset_cycle_number'] = 0
participant.vars['dataset_row_number'] = 0
participant.vars['dataset_csv_row_number'] = 0
participant.vars['dataset_unique_id'] = ''
participant.vars['matched_standard_code'] = ''
participant.vars['assignment_finalized'] = False
participant.practice_data_json = ''
participant.main_data_json = ''
continue
condition, dataset_index, dataset_cycle_number = _participant_assignment(participant)
dataset_package = DATASET_PACKAGES[dataset_index]
patch_entry = None
if participant.session.config.get('is_patch_session', False):
patch_entry = _get_patch_entry(participant)
if condition == CONDITION_STANDARD:
practice_data = dataset_package['standard_practice']
main_data = dataset_package['standard_main']
else:
practice_data = dataset_package['blind_practice']
main_data = dataset_package['blind_main']
participant.vars['condition'] = condition
participant.condition = condition
participant.vars['dataset_index'] = dataset_index
participant.vars['dataset_cycle_number'] = dataset_cycle_number
participant.vars['dataset_row_number'] = (
patch_entry['dataset_row_number'] if patch_entry else dataset_package['dataset_row_number']
)
participant.vars['dataset_csv_row_number'] = dataset_package['csv_row_number']
participant.vars['dataset_unique_id'] = _build_dataset_unique_id(
participant.vars['dataset_row_number'],
dataset_cycle_number,
)
participant.dataset_unique_id = participant.vars['dataset_unique_id']
participant.vars['matched_standard_code'] = ''
participant.vars['assignment_finalized'] = True
participant.practice_data_json = json.dumps(practice_data)
participant.main_data_json = json.dumps(main_data)
#
# the below function(s) are user-defined, not called by oTree
#
def generate_candidate_data():
"""Generate data for 10 candidates with true quality and noisy scores."""
result = {}
for i in range(C.NUM_CANDIDATES):
true_q = round(C.MU + random.gauss(0, 1) * C.SIGMA)
true_q = max(0, true_q)
# Generate noisy scores
resume = round(true_q + random.gauss(0, 1) * C.SIGMA_RESUME)
resume = max(0, resume)
zoom = round(true_q + random.gauss(0, 1) * C.SIGMA_ZOOM)
zoom = max(0, zoom)
inperson = round(true_q + random.gauss(0, 1) * C.SIGMA_INPERSON)
inperson = max(0, inperson)
result[C.CANDIDATE_LABELS[i]] = {
'true_q': true_q,
'resume': resume,
'zoom': zoom,
'inperson': inperson
}
return result
def calculate_reward(true_quality):
"""Calculate reward based on true quality of hired candidate."""
reward = min(C.MAX_REWARD, C.DEFAULT_REWARD + C.BONUS_RATE * true_quality)
return f"${reward:.2f}"
def calculate_quality_quantile(true_quality):
"""Calculate percentile of true quality under the normal distribution."""
z_score = (true_quality - C.MU) / C.SIGMA
# Standard normal CDF using error function: Phi(z) = 0.5 * (1 + erf(z / sqrt(2)))
return 0.5 * (1 + math.erf(z_score / math.sqrt(2))) * 100
def format_percentile(percentile):
"""Format percentile with ordinal suffix (e.g., 92.4th percentile)."""
rounded = round(percentile, 1)
int_part = int(rounded)
if 10 <= (int_part % 100) <= 20:
suffix = 'th'
else:
suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(int_part % 10, 'th')
return f"{rounded:.1f}{suffix} percentile"
def get_history(participant):
"""Get accumulated history of decisions."""
if participant.history_json:
return json.loads(participant.history_json)
return []
def add_to_history(participant, round_num, hired_candidate, true_q, reward_str, decision_context=None):
"""Add a decision to the history."""
history = get_history(participant)
entry = {
'round': round_num,
'hired_candidate': hired_candidate,
'true_q': true_q,
'reward': reward_str
}
if decision_context:
entry['decision_context'] = decision_context
history.append(entry)
participant.history_json = json.dumps(history)
def parse_choice_list(raw_value):
"""Parse stored JSON list fields safely."""
if not raw_value:
return []
try:
parsed = json.loads(raw_value)
return parsed if isinstance(parsed, list) else []
except (json.JSONDecodeError, TypeError):
return [raw_value] if isinstance(raw_value, str) and raw_value else []
def sanitize_choices(raw_choices, allowed_labels):
"""Keep unique choices in original order, restricted to allowed labels."""
if not isinstance(raw_choices, list):
return []
allowed = set(allowed_labels)
cleaned = []
seen = set()
for label in raw_choices:
if label in allowed and label not in seen:
cleaned.append(label)
seen.add(label)
return cleaned
def _get_form_feedback_store(participant):
"""Participant-scoped form feedback store to avoid session-wide leakage."""
store = participant.vars.get('form_feedback')
if not isinstance(store, dict):
store = {}
participant.vars['form_feedback'] = store
return store
def get_form_feedback(player: 'BasePlayer', gate_key: str):
key = f"r{player.round_number}_{gate_key}"
store = _get_form_feedback_store(player.participant)
payload = store.get(key, {})
if not isinstance(payload, dict):
return '', []
error = payload.get('error', '') or ''
choices = payload.get('choices', [])
if not isinstance(choices, list):
choices = []
return error, choices
def set_form_feedback(player: 'BasePlayer', gate_key: str, error: str, choices):
key = f"r{player.round_number}_{gate_key}"
store = _get_form_feedback_store(player.participant)
store[key] = {
'error': error or '',
'choices': choices if isinstance(choices, list) else [],
}
player.participant.vars['form_feedback'] = store
def clear_form_feedback(player: 'BasePlayer', gate_key: str):
key = f"r{player.round_number}_{gate_key}"
store = _get_form_feedback_store(player.participant)
store.pop(key, None)
player.participant.vars['form_feedback'] = store
def build_visible_scores(data, labels, include_zoom=False, include_inperson=False):
"""Build score snapshot for labels with only scores visible at the current decision point."""
snapshot = []
for label in labels:
row = {
'label': label,
'resume': data[label]['resume'],
}
if include_zoom:
row['zoom'] = data[label]['zoom']
if include_inperson:
row['inperson'] = data[label]['inperson']
snapshot.append(row)
return snapshot
def select_final_payment(participant):
"""Select one main round for final payment."""
history = get_history(participant)
# Main rounds in this app are oTree rounds 2..13 (practice is round 0 in history).
main_rounds = [h for h in history if h['round'] > 1]
if main_rounds:
selected = random.choice(main_rounds)
# History uses oTree round numbers (practice=0, main=2..13).
# Convert to participant-facing main round index (1..12) for display/export consistency.
participant.selected_round = selected['round'] - 1
participant.selected_reward_string = selected['reward']
participant.selected_candidate = selected['hired_candidate']
participant.selected_true_quality = selected['true_q']
def get_candidate_labels_in_order(data):
"""Return candidate labels in stored dataset order."""
if not isinstance(data, dict):
return []
return [str(label) for label in data.keys()]
def get_round_data(participant, round_num=None):
"""Get candidate data for a specific round.
round_num uses oTree numbering in this app: practice=0, main=2..13.
"""
if round_num is None or round_num == 0:
return json.loads(participant.practice_data_json)
main_data = json.loads(participant.main_data_json)
index = round_num - 2
if index < 0 or index >= len(main_data):
raise IndexError(f'Main round index out of bounds: round_num={round_num}, index={index}')
return main_data[index]
def get_condition2_finalists(participant, round_num=None):
"""Return finalist labels for blind condition in a given round."""
if round_num is None:
round_num = 0
data = get_round_data(participant, round_num)
if isinstance(data, dict):
labels = get_candidate_labels_in_order(data)
if len(labels) >= C.GATE2_SELECT:
return labels[:C.GATE2_SELECT]
return []
def _normalize_condition_name(raw_condition, default=CONDITION_BLIND):
if isinstance(raw_condition, str):
token = raw_condition.strip().lower()
if token in (CONDITION_STANDARD, CONDITION_BLIND):
return token
if token == '1':
return CONDITION_STANDARD
if token == '2':
return CONDITION_BLIND
if raw_condition == 1:
return CONDITION_STANDARD
if raw_condition == 2:
return CONDITION_BLIND
return default
def calculate_progress_percentage(player: 'BasePlayer', page_name: str = '') -> int:
"""Calculate condition-aware progress through the study as a percentage.
Tracks progress page-by-page for accurate progress display."""
condition = get_condition(player)
round_num = player.round_number
# Define page order for each condition in Round 1
round1_pages_both = ['ProlificIntro', 'Consent', 'PreInstructionsDisclaimer', 'InstructionsPart1', 'Instructions', 'Comprehension', 'PracticeIntro']
round1_pages_standard = ['PracticeGate1', 'PracticeGate1After', 'PracticeGate2', 'PracticeGate2After', 'PracticeGate3', 'PracticePerformance']
round1_pages_blind = ['PracticeStage3Decision', 'PracticePerformance']
# Main round pages (rounds 2-13)
main_pages_standard = ['MainGate1', 'MainGate1After', 'MainGate2', 'MainGate2After', 'MainGate3', 'MainRoundPerformance']
main_pages_blind = ['MainStage3Decision', 'MainRoundPerformance']
# Final pages (round 13 only)
final_pages = ['PostTaskWeights', 'WeightsJustification', 'Demographics', 'FinalPayoff', 'ProlificRedirect']
pages_completed = 0
if condition == CONDITION_STANDARD:
total_pages = 88 # 7+6 (round1) + 6*12 (main) + 5 (final)
if round_num == 1:
# Count pages in round 1
all_round1_pages = round1_pages_both + round1_pages_standard
if page_name in all_round1_pages:
pages_completed = all_round1_pages.index(page_name) + 1
else:
pages_completed = len(all_round1_pages)
elif round_num <= 13:
# Completed round 1, now in main rounds
pages_completed = 13 # All of round 1 (7+6)
main_round = round_num - 1 # rounds 2-13 map to main 1-12
# Add completed main rounds
pages_completed += (main_round - 1) * 6
# Add current page within this round
if page_name in main_pages_standard:
pages_completed += main_pages_standard.index(page_name) + 1
elif page_name in final_pages and round_num == 13:
pages_completed += 6 # All main round pages
pages_completed += final_pages.index(page_name) + 1
else:
pages_completed += 6 # Assume at end of round
else:
pages_completed = total_pages
else:
total_pages = 38 # 7+2 (round1) + 2*12 (main) + 5 (final) for blind condition
if round_num == 1:
all_round1_pages = round1_pages_both + round1_pages_blind
if page_name in all_round1_pages:
pages_completed = all_round1_pages.index(page_name) + 1
else:
pages_completed = len(all_round1_pages)
elif round_num <= 13:
pages_completed = 9 # All of round 1 (7+2)
main_round = round_num - 1
pages_completed += (main_round - 1) * 2
if page_name in main_pages_blind:
pages_completed += main_pages_blind.index(page_name) + 1
elif page_name in final_pages and round_num == 13:
pages_completed += 2
pages_completed += final_pages.index(page_name) + 1
else:
pages_completed += 2
else:
pages_completed = total_pages
progress = int((pages_completed / total_pages) * 100)
return max(0, min(progress, 100))
def get_condition(player: 'BasePlayer'):
"""Safely get participant's condition choice across rounds."""
stored = player.participant.vars.get('condition')
if stored is not None:
return _normalize_condition_name(stored)
# Fallback for legacy sessions where condition may exist as a participant field.
# Avoid direct getattr(..., default) because oTree participant field lookup can raise
# KeyError (not AttributeError) when the key is missing.
legacy_condition = None
try:
legacy_condition = player.participant.condition
except (AttributeError, KeyError):
legacy_condition = None
return _normalize_condition_name(legacy_condition)
STAGE_DURATION_FIELD_BY_PAGE = {
'PreInstructionsDisclaimer': 'preinstructions_secs',
'InstructionsPart1': 'instructions_part1_secs',
'Instructions': 'instructions_part2_secs',
'Comprehension': 'comprehension_secs',
'PracticeStage1Resume': 'practice_stage1_secs',
'PracticeStage2Zoom': 'practice_stage2_secs',
'PracticeStage3Decision': 'practice_stage3_secs',
'MainStage1Resume': 'main_stage1_secs',
'MainStage2Zoom': 'main_stage2_secs',
'MainStage3Decision': 'main_stage3_secs',
'PracticeGate1': 'practice_gate1_secs',
'PracticeGate1After': 'practice_gate1_after_secs',
'PracticeGate2': 'practice_gate2_secs',
'PracticeGate2After': 'practice_gate2_after_secs',
'PracticeGate3': 'practice_gate3_secs',
'MainGate1': 'main_gate1_secs',
'MainGate1After': 'main_gate1_after_secs',
'MainGate2': 'main_gate2_secs',
'MainGate2After': 'main_gate2_after_secs',
'MainGate3': 'main_gate3_secs',
}
def _stage_timer_key(player: 'BasePlayer', page_name: str) -> str:
return f'_stage_start_{player.round_number}_{page_name}'
def start_stage_timer(player: 'BasePlayer', page_name: str):
if page_name in STAGE_DURATION_FIELD_BY_PAGE:
player.participant.vars[_stage_timer_key(player, page_name)] = time.time()
def record_stage_duration(player: 'BasePlayer', page_name: str):
field_name = STAGE_DURATION_FIELD_BY_PAGE.get(page_name)
if not field_name:
return
start_ts = player.participant.vars.get(_stage_timer_key(player, page_name))
if start_ts is None:
return
duration_secs = max(0.0, round(time.time() - start_ts, 3))
setattr(player, field_name, duration_secs)
def get_config_bool(config_dict, key, default=False):
"""Parse boolean-like values from session config safely."""
raw = config_dict.get(key, default)
if isinstance(raw, bool):
return raw
if raw is None:
return default
return str(raw).strip().lower() in ('1', 'true', 'yes', 'y', 'on')
#
class BasePage(Page):
"""Base page class that automatically includes progress percentage."""
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
page_name = self.__class__.__name__
start_stage_timer(self.player, page_name)
context['progress_percentage'] = calculate_progress_percentage(self.player, page_name)
# Show "Round X of 12" during main rounds (applies to both conditions)
main_round_pages = {
'MainStage1Resume', 'MainStage2Zoom', 'MainStage3Decision',
'MainGate1', 'MainGate1After', 'MainGate2', 'MainGate2After', 'MainGate3',
'MainRoundPerformance'
}
is_main_round_page = page_name in main_round_pages and 2 <= self.player.round_number <= C.NUM_ROUNDS
context['show_round_counter'] = is_main_round_page
context['current_main_round'] = max(1, min(self.player.round_number - 1, C.NUM_MAIN_ROUNDS))
context['total_main_rounds'] = C.NUM_MAIN_ROUNDS
return context
class ProlificIntro(BasePage):
form_model = 'player'
form_fields = ['prolific_pid', 'prolific_pid_from_url', 'prolific_study_id', 'prolific_session_id']
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
@staticmethod
def vars_for_template(player: Player):
player.participant.vars['actually_visited'] = True
label = (player.participant.label or '').strip()
existing_pid = (player.field_maybe_none('prolific_pid') or '').strip()
pid_from_url = (player.field_maybe_none('prolific_pid_from_url') or '').strip()
if label and not existing_pid:
player.prolific_pid = label
elif pid_from_url and not existing_pid:
player.prolific_pid = pid_from_url
return dict(
prefill_prolific_pid=(player.field_maybe_none('prolific_pid') or '').strip(),
assignment_hold=bool(player.participant.vars.get('assignment_hold', False)),
assignment_hold_message=player.participant.vars.get('assignment_hold_message', ''),
)
@staticmethod
def error_message(player: Player, values):
if get_config_bool(player.session.config, 'use_dynamic_condition_assignment', default=True) and not player.session.config.get('is_patch_session', False):
assignment_result = _ensure_dynamic_assignment(player)
if assignment_result.get('hold'):
return assignment_result.get('message') or player.participant.vars.get('assignment_hold_message', '')
prolific_pid = (values.get('prolific_pid') or '').strip()
pid_from_url = (values.get('prolific_pid_from_url') or '').strip()
require_pid = get_config_bool(player.session.config, 'require_prolific_pid', default=True)
# Check if Prolific ID is required and validate it's not empty
if require_pid:
if not prolific_pid:
return 'Please provide your Prolific ID to continue.'
# Prolific IDs are typically 24-character alphanumeric strings
if len(prolific_pid) < 8:
return 'Prolific ID appears to be invalid. Please check your ID.'
if pid_from_url and prolific_pid != pid_from_url:
return 'The Prolific ID must match the PROLIFIC_PID in your study link.'
link_label = (player.participant.label or '').strip()
if link_label and prolific_pid != link_label:
return 'The Prolific ID must match the ID from your study link.'
@staticmethod
def before_next_page(player: Player, timeout_happened):
prolific_pid = (player.field_maybe_none('prolific_pid') or '').strip()
pid_from_url = (player.field_maybe_none('prolific_pid_from_url') or '').strip()
study_id = (player.field_maybe_none('prolific_study_id') or '').strip()
session_id = (player.field_maybe_none('prolific_session_id') or '').strip()
if pid_from_url and not prolific_pid:
prolific_pid = pid_from_url
player.prolific_pid = pid_from_url
if prolific_pid and not player.participant.label:
player.participant.label = prolific_pid
player.prolific_pid_mismatch = bool(pid_from_url and prolific_pid and pid_from_url != prolific_pid)
player.prolific_pid_link_missing = bool(not pid_from_url)
if prolific_pid:
player.participant.prolific_pid = prolific_pid
player.participant.vars['prolific_pid'] = prolific_pid
player.participant.vars['prolific_pid_from_url'] = pid_from_url
player.participant.vars['prolific_pid_mismatch'] = player.prolific_pid_mismatch
player.participant.vars['prolific_pid_link_missing'] = player.prolific_pid_link_missing
player.participant.vars['prolific_study_id'] = study_id
player.participant.vars['prolific_session_id'] = session_id
class Consent(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
class PreInstructionsDisclaimer(BasePage):
form_model = 'player'
form_fields = ['Q111_instructions_willing']
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
@staticmethod
def error_message(player: Player, values):
player.trial_preinstructions += 1
if values.get('Q111_instructions_willing') != 2:
return 'Please confirm that you are willing to read the instructions.'
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'PreInstructionsDisclaimer')
class InstructionsPart1(BasePage):
form_model = 'player'
form_fields = ['Q_ultimate_mission']
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
@staticmethod
def vars_for_template(player: Player):
condition = get_condition(player)
return dict(
is_standard=condition == CONDITION_STANDARD,
is_blind=condition == CONDITION_BLIND,
is_condition_1=condition == CONDITION_STANDARD,
is_condition_2=condition == CONDITION_BLIND,
mu=C.MU,
sigma=C.SIGMA,
sigma_resume=C.SIGMA_RESUME
)
@staticmethod
def error_message(player: Player, values):
player.trial_instructions_part1 += 1
mission = values.get('Q_ultimate_mission')
if mission is None:
return 'Please answer the question before continuing.'
if mission != 2:
player.comp_mission_errors += 1
return 'Please review your ultimate objective.'
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'InstructionsPart1')
class Instructions(BasePage):
form_model = 'player'
form_fields = ['Q_score_interpretation']
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
@staticmethod
def vars_for_template(player: Player):
condition = get_condition(player)
return dict(
is_standard=condition == CONDITION_STANDARD,
is_blind=condition == CONDITION_BLIND,
is_condition_1=condition == CONDITION_STANDARD,
is_condition_2=condition == CONDITION_BLIND,
mu=C.MU,
sigma=C.SIGMA,
sigma_resume=C.SIGMA_RESUME
)
@staticmethod
def error_message(player: Player, values):
player.trial_instructions_part2 += 1
interpretation = values.get('Q_score_interpretation')
if interpretation is None:
return 'Please answer the question before continuing.'
if interpretation != 1:
player.comp_score_errors += 1
return 'Please review the instructions about how to interpret scores. Scores are noisy signals, not perfect information.'
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'Instructions')
class Comprehension(BasePage):
form_model = 'player'
@staticmethod
def get_form_fields(player: Player):
condition = get_condition(player)
if condition == CONDITION_STANDARD:
return ['Q_advance_candidate', 'Q_bonus_calculation']
else:
return ['Q_decision_process', 'Q_bonus_calculation']
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
@staticmethod
def vars_for_template(player: Player):
condition = get_condition(player)
return dict(
is_standard=condition == CONDITION_STANDARD,
is_blind=condition == CONDITION_BLIND,
is_condition_1=condition == CONDITION_STANDARD,
is_condition_2=condition == CONDITION_BLIND,
)
@staticmethod
def error_message(player: Player, values):
condition = get_condition(player)
player.trial_comprehension_page += 1
if values.get('Q_bonus_calculation') is not None:
player.trial_bonus_answered += 1
if condition == CONDITION_STANDARD and values.get('Q_advance_candidate') is not None:
player.trial_advance_answered += 1
if condition == CONDITION_BLIND and values.get('Q_decision_process') is not None:
player.trial_decision_answered += 1
messages = []
# Require answers
if values.get('Q_bonus_calculation') is None:
messages.append('Please answer the bonus calculation question.')
if condition == CONDITION_STANDARD and values.get('Q_advance_candidate') is None:
messages.append('Please answer the advance candidate question.')
if condition == CONDITION_BLIND and values.get('Q_decision_process') is None:
messages.append('Please answer the decision process question.')
# Check Q_bonus_calculation (must be "ONE random hire")
if values.get('Q_bonus_calculation') is not None and values['Q_bonus_calculation'] != 2:
player.comp_bonus_errors += 1
messages.append('Please review how your bonus is calculated.')
# Standard-condition specific validation
if condition == CONDITION_STANDARD:
if values.get('Q_advance_candidate') is not None and values['Q_advance_candidate'] != 2:
player.comp_advance_errors += 1
messages.append('Please review what happens when you advance a candidate.')
# Blind-condition specific validation
if condition == CONDITION_BLIND:
if values.get('Q_decision_process') is not None and values['Q_decision_process'] != 3:
player.comp_decision_errors += 1
messages.append('Please review the decision process. You will see scores across all three stages and then choose one hire.')
if messages:
return ' '.join(messages)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'Comprehension')
class PracticeIntro(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
@staticmethod
def vars_for_template(player: Player):
condition = get_condition(player)
return dict(
is_standard=condition == CONDITION_STANDARD,
is_blind=condition == CONDITION_BLIND,
is_condition_1=condition == CONDITION_STANDARD,
)
# Practice Round Pages - Blind condition (stage-based)
class PracticeStage1Resume(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return False
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, 0)
finalist_labels = get_condition2_finalists(player.participant, 0)
candidates = []
for label in finalist_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': '-',
'inperson': '-'
})
return dict(
candidates=candidates,
is_practice=True,
round_num=0
)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'PracticeStage1Resume')
class PracticeStage2Zoom(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return False
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, 0)
finalist_labels = get_condition2_finalists(player.participant, 0)
candidates = []
for label in finalist_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': data[label]['zoom'],
'inperson': '-'
})
return dict(
candidates=candidates,
is_practice=True,
round_num=0
)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'PracticeStage2Zoom')
class PracticeStage3Decision(BasePage):
form_model = 'player'
form_fields = ['hired_candidate']
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1 and get_condition(player) == CONDITION_BLIND
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, 0)
finalist_labels = get_condition2_finalists(player.participant, 0)
candidates = []
for label in finalist_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': data[label]['zoom'],
'inperson': data[label]['inperson'],
'true_q': data[label]['true_q']
})
return dict(
candidates=candidates,
is_practice=True,
round_num=0
)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'PracticeStage3Decision')
data = get_round_data(player.participant, 0)
finalist_labels = get_condition2_finalists(player.participant, 0)
hired_label = player.hired_candidate
hired_data = data[hired_label]
true_q = hired_data['true_q']
reward_str = calculate_reward(true_q)
# Store visible score context at hiring decision (all 3 scores shown on this screen)
decision_context = dict(
decision_type='hire',
condition=get_condition(player),
display_round=0,
chosen_candidate=hired_label,
visible_candidates=build_visible_scores(data, finalist_labels, include_zoom=True, include_inperson=True),
)
player.hire_decision_context = json.dumps(decision_context)
# Store practice results
player.practice_hired_candidate = hired_label
player.practice_hired_trueq = true_q
player.practice_reward = reward_str
# Add to history (round 0 = practice)
add_to_history(player.participant, 0, hired_label, true_q, reward_str, decision_context=decision_context)
class PracticePerformance(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1
@staticmethod
def vars_for_template(player: Player):
true_q = player.field_maybe_none('practice_hired_trueq')
quality_percentile = ''
if true_q is not None:
quality_percentile = format_percentile(calculate_quality_quantile(true_q))
return dict(
hired_candidate=player.field_maybe_none('practice_hired_candidate') or '',
last_round_true_q=true_q or 0,
reward=player.field_maybe_none('practice_reward') or '',
quality_percentile=quality_percentile
)
# Main Round Pages - Blind condition (stage-based)
class MainStage1Resume(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return False
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, player.round_number)
finalist_labels = get_condition2_finalists(player.participant, player.round_number)
candidates = []
for label in finalist_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': '-',
'inperson': '-'
})
return dict(
candidates=candidates,
round_num=player.round_number - 1
)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'MainStage1Resume')
class MainStage2Zoom(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return False
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, player.round_number)
finalist_labels = get_condition2_finalists(player.participant, player.round_number)
candidates = []
for label in finalist_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': data[label]['zoom'],
'inperson': '-'
})
return dict(
candidates=candidates,
round_num=player.round_number - 1
)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'MainStage2Zoom')
class MainStage3Decision(BasePage):
form_model = 'player'
form_fields = ['hired_candidate']
@staticmethod
def is_displayed(player: Player):
return get_condition(player) == CONDITION_BLIND and player.round_number > 1
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, player.round_number)
finalist_labels = get_condition2_finalists(player.participant, player.round_number)
candidates = []
for label in finalist_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': data[label]['zoom'],
'inperson': data[label]['inperson'],
'true_q': data[label]['true_q']
})
return dict(
candidates=candidates,
round_num=player.round_number - 1
)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'MainStage3Decision')
data = get_round_data(player.participant, player.round_number)
finalist_labels = get_condition2_finalists(player.participant, player.round_number)
hired_label = player.hired_candidate
hired_data = data[hired_label]
true_q = hired_data['true_q']
reward_str = calculate_reward(true_q)
# Store visible score context at hiring decision (all 3 scores shown on this screen)
decision_context = dict(
decision_type='hire',
condition=get_condition(player),
display_round=player.round_number - 1,
chosen_candidate=hired_label,
visible_candidates=build_visible_scores(data, finalist_labels, include_zoom=True, include_inperson=True),
)
player.hire_decision_context = json.dumps(decision_context)
player.hired_trueq = true_q
player.reward_str = reward_str
add_to_history(player.participant, player.round_number, hired_label, true_q, reward_str, decision_context=decision_context)
class MainRoundPerformance(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return player.round_number > 1
@staticmethod
def vars_for_template(player: Player):
history = get_history(player.participant)
# Filter to only show main rounds (exclude practice which has round == 0)
main_history = [h for h in history if h['round'] > 0]
# Adjust round numbers for display (convert from internal numbering to user-facing)
for entry in main_history:
entry['display_round'] = entry['round'] - 1
entry['quality_percentile'] = format_percentile(calculate_quality_quantile(entry['true_q']))
last_item = main_history[-1] if main_history else None
return dict(
history=main_history,
last_round_true_q=last_item['true_q'] if last_item else 0,
last_round_quality_percentile=(
format_percentile(calculate_quality_quantile(last_item['true_q'])) if last_item else ''
),
round_num=player.round_number - 1
)
# Standard Condition Pages (Gate-based)
class PracticeGate1(BasePage):
form_model = 'player'
form_fields = ['gate1_choices', 'gate1_initial_checked']
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1 and get_condition(player) == CONDITION_STANDARD
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, 0)
all_labels = get_candidate_labels_in_order(data)
candidates = []
for label in all_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': '-',
'inperson': '-'
})
form_error, form_post_gate1_choices = get_form_feedback(player, 'gate1')
form_post_gate1_choices = sanitize_choices(form_post_gate1_choices, all_labels)
return dict(
candidates=candidates,
is_practice=True,
round_num=0,
gate_num=1,
required_count=C.GATE1_SELECT,
form_error=form_error,
form_post_gate1_choices=form_post_gate1_choices
)
@staticmethod
def error_message(player: Player, values):
data = get_round_data(player.participant, 0)
all_labels = get_candidate_labels_in_order(data)
gate1_raw = values.get('gate1_choices')
try:
gate1_choices = json.loads(gate1_raw) if gate1_raw else []
except (json.JSONDecodeError, TypeError):
gate1_choices = []
gate1_choices = sanitize_choices(gate1_choices, all_labels)
if len(gate1_choices) != C.GATE1_SELECT:
msg = f'Please select exactly {C.GATE1_SELECT} candidates.'
set_form_feedback(player, 'gate1', msg, gate1_choices)
return msg
clear_form_feedback(player, 'gate1')
player.gate1_choices = json.dumps(gate1_choices)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'PracticeGate1')
data = get_round_data(player.participant, 0)
all_labels = get_candidate_labels_in_order(data)
selected_gate1 = parse_choice_list(player.gate1_choices)
player.gate1_decision_context = json.dumps(dict(
decision_type='gate1',
condition=get_condition(player),
display_round=0,
selected_candidates=selected_gate1,
visible_candidates=build_visible_scores(data, all_labels, include_zoom=False, include_inperson=False),
))
class PracticeGate1After(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1 and get_condition(player) == CONDITION_STANDARD
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, 0)
all_labels = get_candidate_labels_in_order(data)
gate1_choices_json = player.field_maybe_none('gate1_choices')
# Handle both JSON array format and plain string format
if gate1_choices_json:
try:
gate1_choices = json.loads(gate1_choices_json)
except (json.JSONDecodeError, TypeError):
# If not JSON, treat as single string or empty
gate1_choices = [gate1_choices_json] if isinstance(gate1_choices_json, str) and gate1_choices_json else []
else:
gate1_choices = []
eliminated = [c for c in all_labels if c not in gate1_choices]
candidates = []
for label in all_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': '-',
'inperson': '-',
'eliminated': label in eliminated
})
return dict(
candidates=candidates,
gate1_choices=gate1_choices,
is_practice=True,
round_num=0,
gate_num=1,
advanced_count=len(gate1_choices)
)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'PracticeGate1After')
class PracticeGate2(BasePage):
form_model = 'player'
form_fields = ['gate2_choices', 'gate2_initial_checked', 'gate2_initial_disabled_checked']
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1 and get_condition(player) == CONDITION_STANDARD
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, 0)
all_labels = get_candidate_labels_in_order(data)
gate1_choices_json = player.field_maybe_none('gate1_choices')
gate1_choices = parse_choice_list(gate1_choices_json)
gate1_choices = sanitize_choices(gate1_choices, all_labels)
eliminated = [c for c in all_labels if c not in gate1_choices]
candidates = []
for label in all_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': data[label]['zoom'],
'inperson': '-',
'eliminated': label in eliminated
})
available_candidates = gate1_choices
form_error, form_post_gate2_choices = get_form_feedback(player, 'gate2')
form_post_gate2_choices = sanitize_choices(form_post_gate2_choices, available_candidates)
return dict(
candidates=candidates,
available_candidates=available_candidates,
is_practice=True,
round_num=0,
gate_num=2,
required_count=C.GATE2_SELECT,
form_error=form_error,
form_post_gate2_choices=form_post_gate2_choices
)
@staticmethod
def error_message(player: Player, values):
data = get_round_data(player.participant, 0)
all_labels = get_candidate_labels_in_order(data)
gate2_raw = values.get('gate2_choices')
try:
gate2_choices = json.loads(gate2_raw) if gate2_raw else []
except (json.JSONDecodeError, TypeError):
gate2_choices = []
gate2_choices = sanitize_choices(gate2_choices, all_labels)
if len(gate2_choices) != C.GATE2_SELECT:
msg = f'Please select exactly {C.GATE2_SELECT} candidates.'
set_form_feedback(player, 'gate2', msg, gate2_choices)
return msg
gate1_choices_json = player.field_maybe_none('gate1_choices')
gate1_choices = parse_choice_list(gate1_choices_json)
gate1_choices = sanitize_choices(gate1_choices, all_labels)
if not all(c in gate1_choices for c in gate2_choices):
msg = 'All selected candidates must be from Gate 1 selections.'
set_form_feedback(player, 'gate2', msg, gate2_choices)
return msg
clear_form_feedback(player, 'gate2')
player.gate2_choices = json.dumps(gate2_choices)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'PracticeGate2')
data = get_round_data(player.participant, 0)
gate1_choices = parse_choice_list(player.gate1_choices)
gate2_choices = parse_choice_list(player.gate2_choices)
player.gate2_decision_context = json.dumps(dict(
decision_type='gate2',
condition=get_condition(player),
display_round=0,
available_candidates=gate1_choices,
selected_candidates=gate2_choices,
visible_candidates=build_visible_scores(data, gate1_choices, include_zoom=True, include_inperson=False),
))
class PracticeGate2After(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1 and get_condition(player) == CONDITION_STANDARD and player.field_maybe_none('gate2_choices') is not None
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, 0)
all_labels = get_candidate_labels_in_order(data)
gate1_choices_json = player.field_maybe_none('gate1_choices')
# Handle both JSON array format and plain string format
if gate1_choices_json:
try:
gate1_choices = json.loads(gate1_choices_json)
except (json.JSONDecodeError, TypeError):
gate1_choices = [gate1_choices_json] if isinstance(gate1_choices_json, str) and gate1_choices_json else []
else:
gate1_choices = []
gate2_choices_json = player.field_maybe_none('gate2_choices')
# Handle both JSON array format and plain string format
if gate2_choices_json:
try:
gate2_choices = json.loads(gate2_choices_json)
except (json.JSONDecodeError, TypeError):
gate2_choices = [gate2_choices_json] if isinstance(gate2_choices_json, str) and gate2_choices_json else []
else:
gate2_choices = []
candidates = []
for label in all_labels:
eliminated = label not in gate2_choices
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': data[label]['zoom'],
'inperson': '-',
'eliminated': eliminated
})
return dict(
candidates=candidates,
gate2_choices=gate2_choices,
is_practice=True,
round_num=0,
gate_num=2,
advanced_count=len(gate2_choices)
)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'PracticeGate2After')
class PracticeGate3(BasePage):
form_model = 'player'
form_fields = ['hired_candidate']
@staticmethod
def is_displayed(player: Player):
return player.round_number == 1 and get_condition(player) == CONDITION_STANDARD
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, 0)
all_labels = get_candidate_labels_in_order(data)
gate2_choices = json.loads(player.gate2_choices) if player.gate2_choices else []
candidates = []
for label in all_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': data[label]['zoom'],
'inperson': data[label]['inperson'],
'true_q': data[label]['true_q'],
'available': label in gate2_choices
})
return dict(
candidates=candidates,
available_candidates=gate2_choices,
is_practice=True,
round_num=0,
gate_num=3
)
@staticmethod
def error_message(player: Player, values):
hired = values.get('hired_candidate')
gate2_choices = json.loads(player.gate2_choices) if player.gate2_choices else []
if hired and hired not in gate2_choices:
return 'Selected candidate must be from Gate 2 selections.'
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'PracticeGate3')
data = get_round_data(player.participant, 0)
gate2_choices = parse_choice_list(player.gate2_choices)
hired_label = player.hired_candidate
hired_data = data[hired_label]
true_q = hired_data['true_q']
reward_str = calculate_reward(true_q)
# Store finalist score context (3 finalists with all 3 revealed scores)
decision_context = dict(
decision_type='hire',
condition=get_condition(player),
display_round=0,
finalists=gate2_choices,
chosen_candidate=hired_label,
visible_candidates=build_visible_scores(data, gate2_choices, include_zoom=True, include_inperson=True),
)
player.hire_decision_context = json.dumps(decision_context)
player.practice_hired_candidate = hired_label
player.practice_hired_trueq = true_q
player.practice_reward = reward_str
add_to_history(player.participant, 0, hired_label, true_q, reward_str, decision_context=decision_context)
# Main Rounds - Standard condition (Funnel)
class MainGate1(BasePage):
form_model = 'player'
form_fields = ['gate1_choices', 'gate1_initial_checked']
@staticmethod
def is_displayed(player: Player):
return player.round_number > 1 and get_condition(player) == CONDITION_STANDARD
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, player.round_number)
all_labels = get_candidate_labels_in_order(data)
candidates = []
for label in all_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': '-',
'inperson': '-'
})
return dict(
candidates=candidates,
round_num=player.round_number - 1,
gate_num=1,
required_count=C.GATE1_SELECT,
form_error=get_form_feedback(player, 'gate1')[0],
form_post_gate1_choices=sanitize_choices(get_form_feedback(player, 'gate1')[1], all_labels)
)
@staticmethod
def error_message(player: Player, values):
data = get_round_data(player.participant, player.round_number)
all_labels = get_candidate_labels_in_order(data)
gate1_raw = values.get('gate1_choices')
try:
gate1_choices = json.loads(gate1_raw) if gate1_raw else []
except (json.JSONDecodeError, TypeError):
gate1_choices = []
gate1_choices = sanitize_choices(gate1_choices, all_labels)
if len(gate1_choices) != C.GATE1_SELECT:
msg = f'Please select exactly {C.GATE1_SELECT} candidates.'
set_form_feedback(player, 'gate1', msg, gate1_choices)
return msg
clear_form_feedback(player, 'gate1')
player.gate1_choices = json.dumps(gate1_choices)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'MainGate1')
data = get_round_data(player.participant, player.round_number)
all_labels = get_candidate_labels_in_order(data)
selected_gate1 = parse_choice_list(player.gate1_choices)
player.gate1_decision_context = json.dumps(dict(
decision_type='gate1',
condition=get_condition(player),
display_round=player.round_number - 1,
selected_candidates=selected_gate1,
visible_candidates=build_visible_scores(data, all_labels, include_zoom=False, include_inperson=False),
))
class MainGate1After(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return player.round_number > 1 and get_condition(player) == CONDITION_STANDARD and player.field_maybe_none('gate1_choices') is not None
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, player.round_number)
all_labels = get_candidate_labels_in_order(data)
gate1_choices_json = player.field_maybe_none('gate1_choices')
# Handle both JSON array format and plain string format
if gate1_choices_json:
try:
gate1_choices = json.loads(gate1_choices_json)
except (json.JSONDecodeError, TypeError):
gate1_choices = [gate1_choices_json] if isinstance(gate1_choices_json, str) and gate1_choices_json else []
else:
gate1_choices = []
eliminated = [c for c in all_labels if c not in gate1_choices]
candidates = []
for label in all_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': '-',
'inperson': '-',
'eliminated': label in eliminated
})
return dict(
candidates=candidates,
gate1_choices=gate1_choices,
round_num=player.round_number - 1,
gate_num=1,
advanced_count=len(gate1_choices)
)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'MainGate1After')
class MainGate2(BasePage):
form_model = 'player'
form_fields = ['gate2_choices', 'gate2_initial_checked', 'gate2_initial_disabled_checked']
@staticmethod
def is_displayed(player: Player):
return player.round_number > 1 and get_condition(player) == CONDITION_STANDARD and player.field_maybe_none('gate1_choices') is not None
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, player.round_number)
all_labels = get_candidate_labels_in_order(data)
gate1_choices_json = player.field_maybe_none('gate1_choices')
gate1_choices = parse_choice_list(gate1_choices_json)
gate1_choices = sanitize_choices(gate1_choices, all_labels)
eliminated = [c for c in all_labels if c not in gate1_choices]
candidates = []
for label in all_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': data[label]['zoom'],
'inperson': '-',
'eliminated': label in eliminated
})
return dict(
candidates=candidates,
available_candidates=gate1_choices,
round_num=player.round_number - 1,
gate_num=2,
required_count=C.GATE2_SELECT,
form_error=get_form_feedback(player, 'gate2')[0],
form_post_gate2_choices=sanitize_choices(get_form_feedback(player, 'gate2')[1], gate1_choices)
)
@staticmethod
def error_message(player: Player, values):
data = get_round_data(player.participant, player.round_number)
all_labels = get_candidate_labels_in_order(data)
gate2_raw = values.get('gate2_choices')
try:
gate2_choices = json.loads(gate2_raw) if gate2_raw else []
except (json.JSONDecodeError, TypeError):
gate2_choices = []
gate2_choices = sanitize_choices(gate2_choices, all_labels)
if len(gate2_choices) != C.GATE2_SELECT:
msg = f'Please select exactly {C.GATE2_SELECT} candidates.'
set_form_feedback(player, 'gate2', msg, gate2_choices)
return msg
gate1_choices_json = player.field_maybe_none('gate1_choices')
gate1_choices = parse_choice_list(gate1_choices_json)
gate1_choices = sanitize_choices(gate1_choices, all_labels)
if not all(c in gate1_choices for c in gate2_choices):
msg = 'All selected candidates must be from Gate 1 selections.'
set_form_feedback(player, 'gate2', msg, gate2_choices)
return msg
clear_form_feedback(player, 'gate2')
player.gate2_choices = json.dumps(gate2_choices)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'MainGate2')
data = get_round_data(player.participant, player.round_number)
gate1_choices = parse_choice_list(player.gate1_choices)
gate2_choices = parse_choice_list(player.gate2_choices)
player.gate2_decision_context = json.dumps(dict(
decision_type='gate2',
condition=get_condition(player),
display_round=player.round_number - 1,
available_candidates=gate1_choices,
selected_candidates=gate2_choices,
visible_candidates=build_visible_scores(data, gate1_choices, include_zoom=True, include_inperson=False),
))
class MainGate2After(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return player.round_number > 1 and get_condition(player) == CONDITION_STANDARD and player.field_maybe_none('gate2_choices') is not None
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, player.round_number)
all_labels = get_candidate_labels_in_order(data)
gate1_choices_json = player.field_maybe_none('gate1_choices')
# Handle both JSON array format and plain string format
if gate1_choices_json:
try:
gate1_choices = json.loads(gate1_choices_json)
except (json.JSONDecodeError, TypeError):
gate1_choices = [gate1_choices_json] if isinstance(gate1_choices_json, str) and gate1_choices_json else []
else:
gate1_choices = []
gate2_choices_json = player.field_maybe_none('gate2_choices')
# Handle both JSON array format and plain string format
if gate2_choices_json:
try:
gate2_choices = json.loads(gate2_choices_json)
except (json.JSONDecodeError, TypeError):
gate2_choices = [gate2_choices_json] if isinstance(gate2_choices_json, str) and gate2_choices_json else []
else:
gate2_choices = []
candidates = []
for label in all_labels:
is_eliminated = label not in gate2_choices
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': data[label]['zoom'],
'inperson': '-',
'eliminated': is_eliminated
})
return dict(
candidates=candidates,
gate2_choices=gate2_choices,
round_num=player.round_number - 1,
gate_num=2,
advanced_count=len(gate2_choices)
)
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'MainGate2After')
class MainGate3(BasePage):
form_model = 'player'
form_fields = ['hired_candidate']
@staticmethod
def is_displayed(player: Player):
return player.round_number > 1 and get_condition(player) == CONDITION_STANDARD
@staticmethod
def vars_for_template(player: Player):
data = get_round_data(player.participant, player.round_number)
all_labels = get_candidate_labels_in_order(data)
gate2_choices = json.loads(player.gate2_choices) if player.gate2_choices else []
candidates = []
for label in all_labels:
candidates.append({
'label': label,
'resume': data[label]['resume'],
'zoom': data[label]['zoom'],
'inperson': data[label]['inperson'],
'true_q': data[label]['true_q'],
'available': label in gate2_choices
})
return dict(
candidates=candidates,
available_candidates=gate2_choices,
round_num=player.round_number - 1,
gate_num=3
)
@staticmethod
def error_message(player: Player, values):
hired = values.get('hired_candidate')
gate2_choices = json.loads(player.gate2_choices) if player.gate2_choices else []
if hired and hired not in gate2_choices:
return 'Selected candidate must be from Gate 2 selections.'
@staticmethod
def before_next_page(player: Player, timeout_happened):
record_stage_duration(player, 'MainGate3')
data = get_round_data(player.participant, player.round_number)
gate2_choices = parse_choice_list(player.gate2_choices)
hired_label = player.hired_candidate
hired_data = data[hired_label]
true_q = hired_data['true_q']
reward_str = calculate_reward(true_q)
# Store finalist score context (3 finalists with all 3 revealed scores)
decision_context = dict(
decision_type='hire',
condition=get_condition(player),
display_round=player.round_number - 1,
finalists=gate2_choices,
chosen_candidate=hired_label,
visible_candidates=build_visible_scores(data, gate2_choices, include_zoom=True, include_inperson=True),
)
player.hire_decision_context = json.dumps(decision_context)
player.hired_trueq = true_q
player.reward_str = reward_str
add_to_history(player.participant, player.round_number, hired_label, true_q, reward_str, decision_context=decision_context)
# Post-task pages
class PostTaskWeights(BasePage):
form_model = 'player'
form_fields = ['resume_weight', 'zoom_weight', 'inperson_weight']
@staticmethod
def is_displayed(player: Player):
return player.round_number == C.NUM_ROUNDS
@staticmethod
def error_message(player: Player, values):
resume = values.get('resume_weight', 0) or 0
zoom = values.get('zoom_weight', 0) or 0
inperson = values.get('inperson_weight', 0) or 0
total = resume + zoom + inperson
if total != 100:
return f'Weights must sum to 100. Current total: {total}'
class WeightsJustification(BasePage):
form_model = 'player'
form_fields = ['open_ended_explanation']
@staticmethod
def is_displayed(player: Player):
return player.round_number == C.NUM_ROUNDS
@staticmethod
def error_message(player: Player, values):
explanation = (values.get('open_ended_explanation') or '').strip()
if not explanation:
return 'Please provide a brief explanation before continuing.'
class Demographics(BasePage):
form_model = 'player'
form_fields = ['age_range', 'gender', 'device_type', 'survey_comments']
@staticmethod
def is_displayed(player: Player):
return player.round_number == C.NUM_ROUNDS
@staticmethod
def error_message(player: Player, values):
# All fields are required except survey_comments
if not values.get('age_range'):
return 'Please select your age range.'
if not values.get('gender'):
return 'Please select your gender.'
if not values.get('device_type'):
return 'Please select the device type you are using.'
@staticmethod
def before_next_page(player: Player, timeout_happened):
_mark_standard_completed_if_ready(player)
_mark_blind_completed_if_ready(player)
class FinalPayoff(BasePage):
form_model = 'player'
@staticmethod
def is_displayed(player: Player):
return player.round_number == C.NUM_ROUNDS
@staticmethod
def before_next_page(player: Player, timeout_happened):
# Do not re-draw if it was already selected on page render.
if not player.participant.selected_round:
select_final_payment(player.participant)
@staticmethod
def vars_for_template(player: Player):
if not player.participant.selected_round:
select_final_payment(player.participant)
selected_reward_string = player.participant.selected_reward_string or '$0.00'
selected_reward_total = float(selected_reward_string.replace('$', ''))
bonus_only = max(0.0, selected_reward_total - C.DEFAULT_REWARD)
# Set player payoff once based on total payment (includes fixed base payment)
if player.participant.selected_reward_string and not player.payoff:
player.payoff = cu(selected_reward_total)
return dict(
selected_round=player.participant.selected_round,
selected_candidate=player.participant.selected_candidate,
selected_true_q=player.participant.selected_true_quality,
selected_reward=f"${bonus_only:.2f}",
payoff=player.payoff
)
class ProlificRedirect(BasePage):
@staticmethod
def is_displayed(player: Player):
return player.round_number == C.NUM_ROUNDS
@staticmethod
def vars_for_template(player: Player):
redirect_url = 'https://app.prolific.com/submissions/complete?cc=CIWZ6NQB'
completion_code = 'CIWZ6NQB'
return dict(
redirect_url=redirect_url,
completion_code=completion_code,
)
page_sequence = [
ProlificIntro,
Consent,
PreInstructionsDisclaimer,
InstructionsPart1,
Instructions,
Comprehension,
PracticeIntro,
# Standard condition practice
PracticeGate1,
PracticeGate1After,
PracticeGate2,
PracticeGate2After,
PracticeGate3,
# Blind condition practice
PracticeStage1Resume,
PracticeStage2Zoom,
PracticeStage3Decision,
# Both conditions practice performance
PracticePerformance,
# Standard condition main
MainGate1,
MainGate1After,
MainGate2,
MainGate2After,
MainGate3,
# Blind condition main
MainStage1Resume,
MainStage2Zoom,
MainStage3Decision,
# Both conditions main performance
MainRoundPerformance,
# Post-task
PostTaskWeights,
WeightsJustification,
Demographics,
FinalPayoff,
ProlificRedirect,
]