from otree.api import (
    Currency,
    cu,
    currency_range,
    models,
    widgets,
    BaseConstants,
    BaseSubsession,
    BaseGroup,
    BasePlayer,
    ExtraModel,
    WaitPage,
    Page,
    read_csv,
)

import units
import shared_out
import csv
import json
import os
import random
import math
import time
import threading
from contextlib import contextmanager

doc = ''


class C(BaseConstants):
    # built-in constants
    NAME_IN_URL = 'hiring_funnel'
    PLAYERS_PER_GROUP = None
    NUM_ROUNDS = 13  # 1 practice + 12 main

    # user-defined constants
    NUM_MAIN_ROUNDS = 12
    NUM_CANDIDATES = 10
    MU = 50
    SIGMA = 15
    SIGMA_RESUME = 15
    SIGMA_ZOOM = 15
    SIGMA_INPERSON = 15
    DEFAULT_REWARD = 4.0
    BONUS_RATE = 0.05
    MAX_REWARD = 10
    GATE1_SELECT = 5
    GATE2_SELECT = 3
    CANDIDATE_LABELS = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J')


class Subsession(BaseSubsession):
    pass


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    # Prolific tracking (captured on first page)
    prolific_pid = models.StringField(blank=True, label='Prolific ID')
    prolific_pid_from_url = models.StringField(blank=True)
    prolific_pid_mismatch = models.BooleanField(initial=False)
    prolific_pid_link_missing = models.BooleanField(initial=False)
    prolific_study_id = models.StringField(blank=True, label='Study ID (optional)')
    prolific_session_id = models.StringField(blank=True, label='Session ID (optional)')

    # Comprehension check questions
    Q111_instructions_willing = models.IntegerField(
        label='Sometimes survey takers rush through the questions without reading any instructions. Doing that ruins our results. Are you willing to read the instructions on the next pages?',
        choices=[
            (1, 'No'),
            (2, 'Yes'),
        ],
        widget=widgets.RadioSelect
    )

    # Comprehension error counters (number of incorrect attempts)
    comp_mission_errors = models.IntegerField(initial=0)
    comp_score_errors = models.IntegerField(initial=0)
    comp_decision_errors = models.IntegerField(initial=0)
    comp_advance_errors = models.IntegerField(initial=0)
    comp_bonus_errors = models.IntegerField(initial=0)

    # Comprehension trial counters (number of submissions until passing)
    trial_preinstructions = models.IntegerField(initial=0)
    trial_instructions_part1 = models.IntegerField(initial=0)
    trial_instructions_part2 = models.IntegerField(initial=0)
    trial_comprehension_page = models.IntegerField(initial=0)

    # Per-question answer-attempt counters (submitted with a non-empty answer)
    trial_bonus_answered = models.IntegerField(initial=0)
    trial_decision_answered = models.IntegerField(initial=0)
    trial_advance_answered = models.IntegerField(initial=0)

    Q_ultimate_mission = models.IntegerField(
        label='What is your ultimate objective in every single hiring case?',
        choices=[
            (1, 'To pick the candidate with the best Resume score.'),
            (2, 'To pick the candidate with the highest True Quality.'),
            (3, 'To pick the candidate I would want to have a beer with.'),
            (4, 'To pick randomly and finish quickly.'),
        ],
        widget=widgets.RadioSelect,
        blank=True
    )
    Q_decision_process = models.IntegerField(
        label='How will you make hiring decisions in this study?',
        choices=[
            (1, 'I will only see the Resume scores and must choose a hire.'),
            (2, 'I will see only the In-person interview scores and must choose a hire.'),
            (3, 'I will see scores across all three stages (Resume, Zoom, In-person) and then choose one hire.'),
            (4, 'I must eliminate candidates based on their Resume before seeing any other scores.'),
        ],
        widget=widgets.RadioSelect,
        blank=True
    )
    Q_score_interpretation = models.IntegerField(
        label='How should you interpret the scores (Resume, Zoom, In-person) you see?',
        choices=[
            (1, 'They are noisy clues that are usually helpful, but sometimes wrong.'),
            (2, 'They are perfect facts. I can always trust them 100%.'),
            (3, 'They are useless. I should just guess the true quality by rules of thumb.'),
        ],
        widget=widgets.RadioSelect,
        blank=True
    )
    Q_bonus_calculation = models.IntegerField(
        label='How is your final cash bonus calculated?',
        choices=[
            (1, 'It is the AVERAGE true quality of all 12 people I hired.'),
            (2, 'It depends on the true quality of the ONE random hire I made during the study.'),
            (3, 'It depends only on the true quality of the very LAST candidate I hire in the final round.'),
        ],
        widget=widgets.RadioSelect,
        blank=True
    )
    Q_advance_candidate = models.IntegerField(
        label='What happens when you advance a candidate to the next round?',
        choices=[
            (1, 'I immediately hire them.'),
            (2, 'I get more information (a new score) about them.'),
            (3, 'Their previous scores are deleted.'),
        ],
        widget=widgets.RadioSelect,
        blank=True
    )

    # Practice round data (per round stored at participant level, but accessible here)
    practice_hired_candidate = models.StringField(blank=True)
    practice_hired_trueq = models.IntegerField(blank=True)
    practice_reward = models.StringField(blank=True)

    # Main round decision data (per round)
    gate1_choices = models.LongStringField(blank=True, help_text="JSON list of candidates selected at Gate 1")
    gate2_choices = models.LongStringField(blank=True, help_text="JSON list of candidates selected at Gate 2")
    gate1_initial_checked = models.LongStringField(blank=True, help_text="JSON list of checkboxes pre-selected on Gate 1 page load")
    gate2_initial_checked = models.LongStringField(blank=True, help_text="JSON list of checkboxes pre-selected on Gate 2 page load")
    gate2_initial_disabled_checked = models.LongStringField(blank=True, help_text="JSON list of disabled-yet-checked boxes on Gate 2 page load")
    hired_candidate = models.StringField(blank=True, help_text="Final hired candidate label (A-J)")
    hired_trueq = models.IntegerField(blank=True)
    reward_str = models.StringField(blank=True, help_text="Currency string for this round")
    gate1_decision_context = models.LongStringField(blank=True, help_text="JSON snapshot of Gate 1 decision context")
    gate2_decision_context = models.LongStringField(blank=True, help_text="JSON snapshot of Gate 2 decision context")
    hire_decision_context = models.LongStringField(blank=True, help_text="JSON snapshot of final hiring decision context")

    # Stage duration tracking (seconds)
    practice_stage1_secs = models.FloatField(blank=True)
    practice_stage2_secs = models.FloatField(blank=True)
    practice_stage3_secs = models.FloatField(blank=True)
    main_stage1_secs = models.FloatField(blank=True)
    main_stage2_secs = models.FloatField(blank=True)
    main_stage3_secs = models.FloatField(blank=True)
    practice_gate1_secs = models.FloatField(blank=True)
    practice_gate1_after_secs = models.FloatField(blank=True)
    practice_gate2_secs = models.FloatField(blank=True)
    practice_gate2_after_secs = models.FloatField(blank=True)
    practice_gate3_secs = models.FloatField(blank=True)
    main_gate1_secs = models.FloatField(blank=True)
    main_gate1_after_secs = models.FloatField(blank=True)
    main_gate2_secs = models.FloatField(blank=True)
    main_gate2_after_secs = models.FloatField(blank=True)
    main_gate3_secs = models.FloatField(blank=True)

    # Instruction/comprehension page durations (seconds)
    preinstructions_secs = models.FloatField(blank=True)
    instructions_part1_secs = models.FloatField(blank=True)
    instructions_part2_secs = models.FloatField(blank=True)
    comprehension_secs = models.FloatField(blank=True)

    # Post-task survey
    resume_weight = models.IntegerField(blank=True, min=0, max=100)
    zoom_weight = models.IntegerField(blank=True, min=0, max=100)
    inperson_weight = models.IntegerField(blank=True, min=0, max=100)
    open_ended_explanation = models.LongStringField(blank=True)

    # Demographics
    has_hiring_experience = models.IntegerField(
        label='Do you have any experience with a real-world hiring task?',
        choices=[
            (1, 'Yes, I have.'),
            (2, 'No, I don\'t.'),
        ],
        widget=widgets.RadioSelect,
        blank=True
    )
    hiring_experience_type = models.StringField(
        label='Which of these describe your hiring experience the best?',
        choices=[
            ('it', 'Hiring in an IT company'),
            ('academic', 'Faculty hiring in an academic school'),
            ('other', 'None of these: Please describe it.'),
        ],
        widget=widgets.RadioSelect,
        blank=True
    )
    hiring_experience_other = models.LongStringField(
        label='Please describe your hiring experience:',
        blank=True
    )
    age_range = models.StringField(
        label='What is your age range?',
        choices=[
            ('18-25', '18-25'),
            ('26-35', '26-35'),
            ('36-45', '36-45'),
            ('45+', '45+'),
        ],
        widget=widgets.RadioSelect,
        blank=True
    )
    gender = models.StringField(
        label='What is your gender?',
        choices=[
            ('female', 'Female'),
            ('male', 'Male'),
            ('other', 'Other'),
            ('unwilling', 'Prefer not to say'),
        ],
        widget=widgets.RadioSelect,
        blank=True
    )
    device_type = models.StringField(
        label='On what type of device are you completing this survey?',
        choices=[
            ('computer', 'Computer'),
            ('smartphone', 'Smart Phone'),
            ('tablet', 'Tablet'),
            ('other', 'Other'),
        ],
        widget=widgets.RadioSelect,
        blank=True
    )
    survey_comments = models.LongStringField(
        label='Do you have comments for us about this survey?',
        blank=True
    )


DATA_POOL_PATH = os.path.join(os.path.dirname(__file__), '..', 'pilot_data_clean_raw.csv')
PATCH_ASSIGNMENTS_PATH = os.path.join(os.path.dirname(__file__), '..', 'patch_assignments.csv')
SCORE_FIELDS = ('resume', 'zoom', 'inperson', 'true_q')
CONDITION_STANDARD = 'standard'
CONDITION_BLIND = 'blind'
# Temporary assignment override: set to CONDITION_BLIND or CONDITION_STANDARD.
# Set to None to restore alternating blind/standard assignment.
FORCED_CONDITION = os.environ.get('FORCED_CONDITION', None)
HOLD_MESSAGE_TEMPLATE = (
    'We apologize for the inconvenience. The study is currently at capacity for new participants. '
    'Please exit and return later to check availability, or check back in {minutes} minutes. '
    'We appreciate your patience.'
)


def _warn_pool(message):
    """Print a diagnostic line prefixed with the data-pool tag."""
    print(f"[hiring_funnel data pool] {message}")


_ASSIGNMENT_STATE_LOCK = threading.RLock()


@contextmanager
def _assignment_state_lock():
    """In-process lock for assignment state mutations (oTree 6 has no Django ORM transaction API)."""
    # The RLock is re-entrant, so helpers called while the lock is held may
    # enter this context manager again on the same thread.
    with _ASSIGNMENT_STATE_LOCK:
        yield


def _get_config_int(config_dict, key, default=0, minimum=None):
    """Read ``key`` from ``config_dict`` as an int.

    Falls back to ``default`` when the configured value is missing or not
    convertible. If ``default`` itself cannot be converted (for example it was
    read from a malformed environment variable), a warning is printed and
    ``minimum`` (or 0 when no minimum is given) is used instead of raising.
    The result is clamped so that it is never below ``minimum``.
    """
    raw = config_dict.get(key, default)
    try:
        value = int(raw)
    except (TypeError, ValueError):
        try:
            value = int(default)
        except (TypeError, ValueError):
            value = minimum if minimum is not None else 0
            _warn_pool(
                f'Config key {key!r}: value {raw!r} and default {default!r} are not integers; '
                f'using {value}.'
            )
    if minimum is not None and value < minimum:
        return minimum
    return value


def _build_dataset_unique_id(dataset_row_number, dataset_cycle_number):
    """Stable dataset ID for analysis; shared by matched standard/blind participants."""
    row = max(0, int(dataset_row_number or 0))
    cycle = max(0, int(dataset_cycle_number or 0))
    return f'DS{row:03d}_C{cycle:03d}'


def _coerce_forced_condition(raw_condition):
    """Normalize forced-condition values from constants/env strings.

    Returns CONDITION_STANDARD, CONDITION_BLIND, or None when the value is
    not recognised.
    """
    if raw_condition in (CONDITION_STANDARD, CONDITION_BLIND):
        return raw_condition
    if isinstance(raw_condition, str):
        token = raw_condition.strip().lower()
        if token in (CONDITION_STANDARD, CONDITION_BLIND):
            return token
        if token in ('condition_standard', 'standard_condition', '1'):
            return CONDITION_STANDARD
        if token in ('condition_blind', 'blind_condition', '2'):
            return CONDITION_BLIND
    if raw_condition == 1:
        return CONDITION_STANDARD
    if raw_condition == 2:
        return CONDITION_BLIND
    return None


def _get_assignment_config(session):
    """Return assignment settings resolved from the session config and environment."""
    hold_wait_minutes = _get_config_int(
        session.config,
        'hold_wait_minutes',
        default=os.environ.get('HOLD_WAIT_MINUTES', 10),
        minimum=1,
    )
    return dict(
        hold_wait_minutes=hold_wait_minutes,
    )


def _empty_matching_state():
    """Return a fresh matching-state dict with all counters and registries empty."""
    return dict(
        standard_assigned_count=0,
        completed_standard_codes=[],
        available_standard_codes=[],
        blind_to_standard_matches={},
        standard_to_blind_matches={},
        total_matched_pairs=0,
        reclaimed_blind_codes=[],
        blind_completion_marked=False,
        next_expected_condition=CONDITION_STANDARD,
        phase1_ended=False,
    )


def _get_matching_state(session):
    """Return the matching state stored in ``session.vars``, creating it if absent or invalid."""
    state = session.vars.get('matching_state')
    if not isinstance(state, dict):
        state = _empty_matching_state()
        session.vars['matching_state'] = state
    return state


def _save_matching_state(session, state):
    """Write ``state`` back to ``session.vars`` and refresh the derived counters."""
    state['completed_standard_count'] = len(state.get('completed_standard_codes', []))
    session.vars['matching_state'] = state
    # Top-level mirrors for easy visibility in oTree admin panel
    session.vars['available_blind_slots'] = len(state.get('available_standard_codes', []))
    session.vars['total_matched_pairs'] = int(state.get('total_matched_pairs', 0))
    session.vars['completed_standard_count'] = len(state.get('completed_standard_codes', []))


def _matching_state_snapshot(session):
    """Return a small dict of counts/flags summarising the current matching state."""
    state = _get_matching_state(session)
    return dict(
        standard_assigned_count=int(state.get('standard_assigned_count', 0)),
        completed_standard_count=len(state.get('completed_standard_codes', [])),
        available_standard_count=len(state.get('available_standard_codes', [])),
        blind_match_count=len(state.get('blind_to_standard_matches', {})),
        next_expected_condition=state.get('next_expected_condition', CONDITION_STANDARD),
        phase1_ended=bool(state.get('phase1_ended', False)),
    )


def _debug_matching_state(session, event='state'):
    """Print a one-line state summary when ``debug_matching_state`` is enabled in the session config."""
    if not get_config_bool(session.config, 'debug_matching_state', default=False):
        return
    snapshot = _matching_state_snapshot(session)
    _warn_pool(
        f"[matching_debug] event={event} "
        f"standard_assigned={snapshot['standard_assigned_count']} "
        f"completed_standard={snapshot['completed_standard_count']} "
        f"available_standard={snapshot['available_standard_count']} "
        f"blind_matches={snapshot['blind_match_count']} "
        f"next_expected={snapshot['next_expected_condition']} "
        f"phase1_ended={snapshot['phase1_ended']}"
    )


def _get_session_participant_by_code(session, participant_code):
    """Return the session participant whose ``code`` matches, or None."""
    for participant in session.get_participants():
        if participant.code == participant_code:
            return participant
    return None


def _get_app_player_for_round(participant, round_number):
    """Return this app's Player for the given participant/round in oTree 6."""
    target_round = int(round_number)
    try:
        players = participant.get_players()
    except Exception:
        # Best effort: a participant whose players cannot be listed is
        # treated as having none.
        players = []
    # First pass: match on this module's Player class.
    for app_player in players:
        if isinstance(app_player, Player) and int(getattr(app_player, 'round_number', 0)) == target_round:
            return app_player
    # Second pass: fall back to matching on the model's app label.
    for app_player in players:
        if (
            getattr(getattr(app_player, '_meta', None), 'app_label', None) == C.NAME_IN_URL
            and int(getattr(app_player, 'round_number', 0)) == target_round
        ):
            return app_player
    return None


def _build_blind_replay_data_from_standard(standard_participant):
    """Build the candidate data a blind participant replays from a standard participant.

    For the practice round and every main round, only the candidates the
    standard participant selected at Gate 2 are kept. Returns
    ``dict(practice=..., main=[...])`` or None when any round is missing or
    does not have exactly ``C.GATE2_SELECT`` valid Gate 2 choices.
    """
    if not standard_participant.practice_data_json or not standard_participant.main_data_json:
        return None

    practice_data = get_round_data(standard_participant, 0)
    practice_player = _get_app_player_for_round(standard_participant, 1)
    if not practice_player:
        return None
    practice_labels = sanitize_choices(
        parse_choice_list(practice_player.field_maybe_none('gate2_choices')),
        get_candidate_labels_in_order(practice_data),
    )
    if len(practice_labels) != C.GATE2_SELECT:
        return None
    blind_practice = {label: dict(practice_data[label]) for label in practice_labels}

    blind_main = []
    for round_num in range(2, C.NUM_ROUNDS + 1):
        round_data = get_round_data(standard_participant, round_num)
        round_player = _get_app_player_for_round(standard_participant, round_num)
        if not round_player:
            return None
        round_labels = sanitize_choices(
            parse_choice_list(round_player.field_maybe_none('gate2_choices')),
            get_candidate_labels_in_order(round_data),
        )
        if len(round_labels) != C.GATE2_SELECT:
            return None
        blind_main.append({label: dict(round_data[label]) for label in round_labels})

    if len(blind_main) != C.NUM_MAIN_ROUNDS:
        return None
    return dict(practice=blind_practice, main=blind_main)


def _mark_standard_completed_if_ready(player: 'BasePlayer'):
    """Register the player's participant as a completed standard participant.

    Delegates to ``_register_standard_completion_if_ready`` while holding the
    assignment lock, then persists the matching state. Does nothing when the
    participant is not in the standard condition, is already marked, was
    reclaimed for rematch, or has not finished the final round.
    """
    participant = player.participant
    # Cheap pre-checks so pages that cannot register anything skip the lock.
    if participant.vars.get('condition') != CONDITION_STANDARD:
        return
    if participant.vars.get('standard_completion_marked'):
        return

    with _assignment_state_lock():
        state = _get_matching_state(player.session)
        if not _register_standard_completion_if_ready(participant, player.session, state):
            return
        _save_matching_state(player.session, state)
        _debug_matching_state(player.session, event=f'standard_completed:{participant.code}')


def _register_standard_completion_if_ready(participant, session, state):
    """Idempotently register a completed standard participant into the available blind-match pool.

    Mutates ``state`` and the participant's vars but does not save the state;
    the caller is responsible for calling ``_save_matching_state``.
    Returns True when the participant was newly registered, False otherwise.
    """
    if participant.vars.get('condition') != CONDITION_STANDARD:
        return False
    if participant.vars.get('standard_completion_marked'):
        return False
    if participant.vars.get('standard_reclaimed_for_rematch'):
        return False

    final_round_player = _get_app_player_for_round(participant, C.NUM_ROUNDS)
    if not final_round_player:
        return False
    if not final_round_player.field_maybe_none('hired_candidate'):
        return False

    replay_payload = _build_blind_replay_data_from_standard(participant)
    if replay_payload is None:
        return False

    participant.vars['blind_replay_practice_json'] = json.dumps(replay_payload['practice'])
    participant.vars['blind_replay_main_json'] = json.dumps(replay_payload['main'])

    completed_codes = state.get('completed_standard_codes', [])
    available_codes = state.get('available_standard_codes', [])
    standard_to_blind = state.get('standard_to_blind_matches', {})
    code = participant.code
    if code not in completed_codes:
        completed_codes.append(code)
    # Only offer the code for matching if it is not already matched or queued.
    if code not in standard_to_blind and code not in available_codes:
        available_codes.append(code)
    state['completed_standard_codes'] = completed_codes
    state['available_standard_codes'] = available_codes

    participant.vars['standard_completion_marked'] = True
    participant.vars['standard_available_for_match'] = True
    return True


def _reclaim_unfinished_blind_matches(session, state):
    """Return Standard codes to ``available_standard_codes`` for unfinished Blind matches.

    A Blind participant counts as unfinished when their final-round
    (``C.NUM_ROUNDS``) player has no ``hired_candidate``. Such participants are
    removed from the match registries, invalidated, and recorded in
    ``reclaimed_blind_codes``.

    Only called when FORCED_CONDITION switches to Standard, i.e. after the
    Blind batch is closed. Idempotent. Mutates ``state`` without saving it.

    Returns the number of matches reclaimed.
    """
    blind_to_standard = state.get('blind_to_standard_matches', {})
    standard_to_blind = state.get('standard_to_blind_matches', {})
    available_codes = state.get('available_standard_codes', [])
    reclaimed = []

    # Iterate over a copy because entries are popped inside the loop.
    for blind_code, standard_code in list(blind_to_standard.items()):
        blind_participant = _get_session_participant_by_code(session, blind_code)
        if not blind_participant:
            continue
        if blind_participant.vars.get('condition') != CONDITION_BLIND:
            continue
        final_player = _get_app_player_for_round(blind_participant, C.NUM_ROUNDS)
        if not final_player:
            continue
        hired = final_player.field_maybe_none('hired_candidate')
        if hired:
            continue  # Completed — do not reclaim

        # Blind participant did not complete — return standard code to pool
        if standard_code not in available_codes:
            available_codes.append(standard_code)
        reclaimed.append((blind_code, standard_code))

        # Remove from match registries so the reclaimed standard code
        # can be matched to a new Blind participant cleanly
        blind_to_standard.pop(blind_code, None)
        standard_to_blind.pop(standard_code, None)

        # Mark the standard participant to prevent defensive completion scan
        # from re-adding this code to available_codes a second time.
        standard_participant = _get_session_participant_by_code(session, standard_code)
        if standard_participant:
            standard_participant.vars['standard_reclaimed_for_rematch'] = True

        # Invalidate the dropout Blind participant so they cannot re-enter.
        blind_participant.vars['assignment_finalized'] = False
        blind_participant.vars['condition'] = ''
        reclaimed_list = state.get('reclaimed_blind_codes', [])
        if blind_code not in reclaimed_list:
            reclaimed_list.append(blind_code)
        state['reclaimed_blind_codes'] = reclaimed_list

    if reclaimed:
        state['available_standard_codes'] = available_codes
        state['blind_to_standard_matches'] = blind_to_standard
        state['standard_to_blind_matches'] = standard_to_blind
        _warn_pool(f'[reclaim] Returned {len(reclaimed)} standard codes to pool: {reclaimed}')
    return len(reclaimed)


def _mark_blind_completed_if_ready(player: 'BasePlayer'):
    """Count a finished Blind participant towards ``total_matched_pairs`` exactly once."""
    participant = player.participant
    if participant.vars.get('condition') != CONDITION_BLIND:
        return
    if participant.vars.get('blind_completion_marked'):
        return
    final_round_player = _get_app_player_for_round(participant, C.NUM_ROUNDS)
    if not final_round_player:
        return
    if not final_round_player.field_maybe_none('hired_candidate'):
        return

    with _assignment_state_lock():
        # Re-check under the lock so two overlapping calls for the same
        # participant cannot both increment the pair counter.
        if participant.vars.get('blind_completion_marked'):
            return
        state = _get_matching_state(player.session)
        state['total_matched_pairs'] = int(state.get('total_matched_pairs', 0)) + 1
        _save_matching_state(player.session, state)
        participant.vars['blind_completion_marked'] = True


def _apply_assignment_to_participant(participant, condition, dataset_index, dataset_cycle_number,
                                     dataset_row_number, matched_standard_code=None):
    """Write the condition, dataset tracking fields and round data onto ``participant``.

    Standard participants receive freshly generated data (dynamic assignment
    outside patch sessions) or the data of ``DATASET_PACKAGES[dataset_index]``.
    Blind participants receive the replay payload stored on the matched
    standard participant.

    Raises:
        RuntimeError: if no dataset package exists for a non-dynamic standard
            assignment, if the matched standard participant is missing or has
            no replay payload, or if a blind assignment has no match.
    """
    dataset_package = (
        DATASET_PACKAGES[dataset_index]
        if DATASET_PACKAGES and 0 <= dataset_index < len(DATASET_PACKAGES)
        else None
    )

    def _generate_new_standard_sequence():
        practice = generate_candidate_data()
        main = [generate_candidate_data() for _ in range(C.NUM_MAIN_ROUNDS)]
        return practice, main

    if condition == CONDITION_STANDARD:
        # Dynamic assignment: standard-condition participants must receive newly generated data.
        if (
            get_config_bool(participant.session.config, 'use_dynamic_condition_assignment', default=True)
            and not participant.session.config.get('is_patch_session', False)
        ):
            practice_data, main_data = _generate_new_standard_sequence()
        else:
            if not dataset_package:
                raise RuntimeError('No dataset package available for non-dynamic standard assignment.')
            practice_data = dataset_package['standard_practice']
            main_data = dataset_package['standard_main']
    else:
        if not matched_standard_code:
            raise RuntimeError('Unmatched blind assignment is not supported in forced-condition mode.')
        standard_participant = _get_session_participant_by_code(participant.session, matched_standard_code)
        if not standard_participant:
            raise RuntimeError(f'Matched standard participant not found: {matched_standard_code}')
        practice_json = standard_participant.vars.get('blind_replay_practice_json', '')
        main_json = standard_participant.vars.get('blind_replay_main_json', '')
        if not practice_json or not main_json:
            raise RuntimeError('Matched standard participant has no replay payload for blind assignment.')
        practice_data = json.loads(practice_json)
        main_data = json.loads(main_json)

    participant.vars['condition'] = condition
    participant.condition = condition
    participant.vars['dataset_index'] = dataset_index
    participant.vars['dataset_cycle_number'] = dataset_cycle_number
    participant.vars['dataset_row_number'] = dataset_row_number
    participant.vars['dataset_csv_row_number'] = dataset_package['csv_row_number'] if dataset_package else 0
    dataset_unique_id = _build_dataset_unique_id(dataset_row_number, dataset_cycle_number)
    participant.vars['dataset_unique_id'] = dataset_unique_id
    participant.dataset_unique_id = dataset_unique_id
    participant.vars['matched_standard_code'] = matched_standard_code or ''
    participant.vars['assignment_finalized'] = True
    participant.vars['assignment_hold'] = False
    participant.vars['assignment_hold_message'] = ''
    participant.practice_data_json = json.dumps(practice_data)
    participant.main_data_json = json.dumps(main_data)


def _set_assignment_hold(participant, wait_minutes):
    """Flag ``participant`` as on hold and store the capacity message."""
    participant.vars['assignment_hold'] = True
    participant.vars['assignment_hold_message'] = HOLD_MESSAGE_TEMPLATE.format(minutes=wait_minutes)


def _ensure_dynamic_assignment(player: 'BasePlayer'):
    """Assign the player's participant to a condition, or put them on hold.

    Runs entirely under the assignment lock. Order of decisions:

    1. Participants whose blind match was reclaimed are held permanently.
    2. Participants with a finalized assignment are returned unchanged.
    3. Completed standard participants are (re-)registered for matching.
    4. If FORCED_CONDITION resolves to a condition, that condition is used.
    5. Otherwise the three-phase scheme applies: standard only until
       ``initial_threshold`` standards have completed, then alternating
       blind/standard, then blind only once ``standard_cap`` is reached.

    Returns:
        dict with keys ``assigned`` (bool), ``hold`` (bool), ``message`` (str).

    Raises:
        RuntimeError: in the non-forced path when ``initial_threshold`` is not
            less than ``standard_cap``; also propagated from
            ``_apply_assignment_to_participant``.
    """
    participant = player.participant
    with _assignment_state_lock():
        config = _get_assignment_config(player.session)
        state = _get_matching_state(player.session)

        reclaimed_codes = set(state.get('reclaimed_blind_codes', []))
        if participant.code in reclaimed_codes:
            _set_assignment_hold(participant, config['hold_wait_minutes'])
            participant.vars['assignment_hold_message'] = (
                'This study session is no longer available. '
                'Please return your submission on Prolific.'
            )
            return dict(
                assigned=False,
                hold=True,
                message=participant.vars['assignment_hold_message'],
            )

        if (
            participant.vars.get('assignment_finalized') is True
            and participant.vars.get('condition') in (CONDITION_STANDARD, CONDITION_BLIND)
        ):
            participant.vars['assignment_hold'] = False
            participant.vars['assignment_hold_message'] = ''
            return dict(assigned=True, hold=False, message='')

        was_phase1_ended = bool(state.get('phase1_ended', False))

        # Refresh completion pool defensively so blind availability is accurate even if a page hook was missed.
        for session_participant in player.session.get_participants():
            _register_standard_completion_if_ready(session_participant, player.session, state)

        # Forced assignment override for dynamic mode.
        forced = _coerce_forced_condition(FORCED_CONDITION)
        # NOTE(review): this print is unconditional; the gated debug line below carries the same values.
        print(f"[FORCED_CONDITION_DEBUG] raw='{FORCED_CONDITION}' type={type(FORCED_CONDITION)} forced='{forced}'", flush=True)
        _debug_matching_state(
            player.session,
            event=f'forced_resolved:{participant.code}:raw={FORCED_CONDITION}:resolved={forced}',
        )

        if forced == CONDITION_STANDARD:
            # When switching to Standard, reclaim unfinished Blind participants first.
            _reclaim_unfinished_blind_matches(player.session, state)

            standard_assigned = int(state.get('standard_assigned_count', 0))
            dataset_count = max(1, len(DATASET_PACKAGES))
            dataset_index = standard_assigned % dataset_count
            dataset_cycle_number = standard_assigned // dataset_count
            dataset_row_number = dataset_index + 1
            _apply_assignment_to_participant(
                participant,
                condition=CONDITION_STANDARD,
                dataset_index=dataset_index,
                dataset_cycle_number=dataset_cycle_number,
                dataset_row_number=dataset_row_number,
            )
            state['standard_assigned_count'] = standard_assigned + 1
            _save_matching_state(player.session, state)
            _debug_matching_state(player.session, event=f'assigned_forced_standard:{participant.code}')
            return dict(assigned=True, hold=False, message='')

        if forced == CONDITION_BLIND:
            available_codes = list(state.get('available_standard_codes', []))
            if not available_codes:
                _set_assignment_hold(participant, config['hold_wait_minutes'])
                state['available_standard_codes'] = available_codes
                _save_matching_state(player.session, state)
                _debug_matching_state(player.session, event=f'hold_forced_blind_no_available:{participant.code}')
                return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])

            matched_standard_code = available_codes.pop(0)
            standard_participant = _get_session_participant_by_code(player.session, matched_standard_code)
            if not standard_participant:
                # The unresolvable code stays removed from the pool.
                _set_assignment_hold(participant, config['hold_wait_minutes'])
                state['available_standard_codes'] = available_codes
                _save_matching_state(player.session, state)
                _debug_matching_state(player.session, event=f'hold_forced_blind_missing_standard:{participant.code}')
                return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])

            dataset_index = int(standard_participant.vars.get('dataset_index', 0))
            dataset_cycle_number = int(standard_participant.vars.get('dataset_cycle_number', 0))
            dataset_row_number = int(standard_participant.vars.get('dataset_row_number', dataset_index + 1))
            _apply_assignment_to_participant(
                participant,
                condition=CONDITION_BLIND,
                dataset_index=dataset_index,
                dataset_cycle_number=dataset_cycle_number,
                dataset_row_number=dataset_row_number,
                matched_standard_code=matched_standard_code,
            )
            blind_to_standard = state.get('blind_to_standard_matches', {})
            standard_to_blind = state.get('standard_to_blind_matches', {})
            blind_to_standard[participant.code] = matched_standard_code
            standard_to_blind[matched_standard_code] = participant.code
            state['blind_to_standard_matches'] = blind_to_standard
            state['standard_to_blind_matches'] = standard_to_blind
            state['available_standard_codes'] = available_codes
            standard_participant.vars['standard_available_for_match'] = False
            _save_matching_state(player.session, state)
            _debug_matching_state(player.session, event=f'assigned_forced_blind:{participant.code}')
            return dict(assigned=True, hold=False, message='')

        # Three-phase dynamic thresholds/caps. Resolved only here, so a
        # threshold misconfiguration cannot fail the forced path, which does
        # not use these values.
        initial_threshold = _get_config_int(
            player.session.config,
            'initial_threshold',
            default=os.environ.get('INITIAL_THRESHOLD', 2),
            minimum=1,
        )
        standard_cap = _get_config_int(
            player.session.config,
            'standard_cap',
            default=os.environ.get('STANDARD_CAP', 4),
            minimum=1,
        )
        if initial_threshold >= standard_cap:
            raise RuntimeError(
                f'Configuration error: initial_threshold ({initial_threshold}) must be less than '
                f'standard_cap ({standard_cap}).'
            )

        completed_count = len(state.get('completed_standard_codes', []))
        if completed_count >= initial_threshold:
            state['phase1_ended'] = True
            if not was_phase1_ended:
                # First call after phase 1 ends: start the alternation with Blind.
                state['next_expected_condition'] = CONDITION_BLIND
        # Normalise a corrupted alternation pointer; valid values are left untouched.
        if state.get('next_expected_condition') not in (CONDITION_STANDARD, CONDITION_BLIND):
            state['next_expected_condition'] = CONDITION_BLIND

        standard_assigned = int(state.get('standard_assigned_count', 0))
        available_codes = list(state.get('available_standard_codes', []))
        next_expected = state.get('next_expected_condition', CONDITION_STANDARD)
        phase1_ended = bool(state.get('phase1_ended', False))

        selected_condition = None
        matched_standard_code = None
        flip_expected = False

        if not phase1_ended:
            if standard_assigned >= standard_cap:
                _set_assignment_hold(participant, config['hold_wait_minutes'])
                _save_matching_state(player.session, state)
                _debug_matching_state(player.session, event=f'hold_phase1:{participant.code}')
                return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])
            selected_condition = CONDITION_STANDARD
        elif standard_assigned >= standard_cap:
            # Phase 3: blind-only matching after cap.
            state['next_expected_condition'] = CONDITION_BLIND
            if available_codes:
                selected_condition = CONDITION_BLIND
                matched_standard_code = available_codes.pop(0)
            else:
                _set_assignment_hold(participant, config['hold_wait_minutes'])
                state['available_standard_codes'] = available_codes
                _save_matching_state(player.session, state)
                _debug_matching_state(player.session, event=f'hold_phase3:{participant.code}')
                return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])
        elif next_expected == CONDITION_BLIND:
            if available_codes:
                selected_condition = CONDITION_BLIND
                matched_standard_code = available_codes.pop(0)
                flip_expected = True
            else:
                # No blind replay available: hold instead of assigning additional standard.
                _set_assignment_hold(participant, config['hold_wait_minutes'])
                state['available_standard_codes'] = available_codes
                state['next_expected_condition'] = CONDITION_BLIND
                _save_matching_state(player.session, state)
                _debug_matching_state(player.session, event=f'hold_waiting_blind_data:{participant.code}')
                return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])
        else:
            # next expected standard
            selected_condition = CONDITION_STANDARD
            flip_expected = True

        # Hard guard: never allow any new Standard assignment once cap is reached.
        if selected_condition == CONDITION_STANDARD and standard_assigned >= standard_cap:
            _set_assignment_hold(participant, config['hold_wait_minutes'])
            state['next_expected_condition'] = CONDITION_BLIND
            _save_matching_state(player.session, state)
            _debug_matching_state(player.session, event=f'hold_standard_cap_guard:{participant.code}')
            return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])

        if selected_condition == CONDITION_STANDARD:
            # DATASET_PACKAGES is intentionally empty in forced-condition design.
            # For dynamic standard assignment, data is freshly generated in
            # _apply_assignment_to_participant, so these tracking fields are
            # synthetic but stable.
            dataset_index = 0
            dataset_cycle_number = standard_assigned
            dataset_row_number = 1
            _apply_assignment_to_participant(
                participant,
                condition=CONDITION_STANDARD,
                dataset_index=dataset_index,
                dataset_cycle_number=dataset_cycle_number,
                dataset_row_number=dataset_row_number,
            )
            state['standard_assigned_count'] = standard_assigned + 1
            # After an alternating-phase Standard, the next participant should be Blind.
            if phase1_ended and flip_expected:
                state['next_expected_condition'] = CONDITION_BLIND
        else:
            if not matched_standard_code:
                _set_assignment_hold(participant, config['hold_wait_minutes'])
                state['available_standard_codes'] = available_codes
                _save_matching_state(player.session, state)
                _debug_matching_state(player.session, event=f'hold_no_match:{participant.code}')
                return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])

            standard_participant = _get_session_participant_by_code(player.session, matched_standard_code)
            if not standard_participant:
                _set_assignment_hold(participant, config['hold_wait_minutes'])
                state['available_standard_codes'] = available_codes
                _save_matching_state(player.session, state)
                _debug_matching_state(player.session, event=f'hold_missing_standard:{participant.code}')
                return dict(assigned=False, hold=True, message=participant.vars['assignment_hold_message'])

            dataset_index = int(standard_participant.vars.get('dataset_index', 0))
            dataset_cycle_number = int(standard_participant.vars.get('dataset_cycle_number', 0))
            dataset_row_number = int(standard_participant.vars.get('dataset_row_number', dataset_index + 1))
            _apply_assignment_to_participant(
                participant,
                condition=CONDITION_BLIND,
                dataset_index=dataset_index,
                dataset_cycle_number=dataset_cycle_number,
                dataset_row_number=dataset_row_number,
                matched_standard_code=matched_standard_code,
            )
            blind_to_standard = state.get('blind_to_standard_matches', {})
            standard_to_blind = state.get('standard_to_blind_matches', {})
            blind_to_standard[participant.code] = matched_standard_code
            standard_to_blind[matched_standard_code] = participant.code
            state['blind_to_standard_matches'] = blind_to_standard
            state['standard_to_blind_matches'] = standard_to_blind
            state['available_standard_codes'] = available_codes
            if flip_expected:
                state['next_expected_condition'] = CONDITION_STANDARD
            standard_participant.vars['standard_available_for_match'] = False

        _save_matching_state(player.session, state)
        _debug_matching_state(player.session, event=f'assigned_{selected_condition}:{participant.code}')
        return dict(assigned=True, hold=False, message='')


def _load_dataset_packages():
    """Return the list of pre-built dataset packages (currently always empty)."""
    return []


DATASET_PACKAGES = _load_dataset_packages()


def _load_patch_assignments():
    """Load ordered patch assignments from CSV for patch sessions.

    Returns a list of dicts sorted by ``patch_order``; an empty list when the
    CSV file does not exist. Rows with a non-integer numeric field or an
    unknown condition are skipped with a warning.
    """
    assignments = []
    if not os.path.exists(PATCH_ASSIGNMENTS_PATH):
        return assignments

    # 'utf-8-sig' strips a BOM if present.
    with open(PATCH_ASSIGNMENTS_PATH, 'r', encoding='utf-8-sig', newline='') as csv_file:
        reader = csv.DictReader(csv_file)
        # start=2: row 1 of the file is the header.
        for row_num, row in enumerate(reader, start=2):
            try:
                patch_order = int(row.get('patch_order'))
                missing_id_in_session = int(row.get('missing_id_in_session'))
                dataset_index = int(row.get('dataset_index'))
                dataset_row_number = int(row.get('dataset_row_number'))
                dataset_cycle_number = int(row.get('dataset_cycle_number'))
            except (TypeError, ValueError):
                _warn_pool(f'Skipping patch row {row_num}: invalid integer field(s).')
                continue

            raw_condition = row.get('condition')
            condition = (str(raw_condition).strip().lower() if raw_condition is not None else '')
            if condition not in (CONDITION_STANDARD, CONDITION_BLIND):
                _warn_pool(f'Skipping patch row {row_num}: invalid condition.')
                continue

            assignments.append(dict(
                patch_order=patch_order,
                missing_id_in_session=missing_id_in_session,
                condition=condition,
                dataset_index=dataset_index,
                dataset_row_number=dataset_row_number,
                dataset_cycle_number=dataset_cycle_number,
            ))

    assignments.sort(key=lambda x: x['patch_order'])
    _warn_pool(f'Loaded patch assignments: {len(assignments)} rows.')
    return assignments
PATCH_ASSIGNMENTS = _load_patch_assignments()


def _get_patch_entry(participant):
    """Return the patch row for a participant, indexed by 1-based id_in_session.

    Raises:
        RuntimeError: if fewer patch rows were loaded than the participant's id.
    """
    patch_idx = max(0, int(participant.id_in_session) - 1)
    if patch_idx >= len(PATCH_ASSIGNMENTS):
        raise RuntimeError(
            f'Patch assignment not found for id_in_session={participant.id_in_session}. '
            f'Available patch rows={len(PATCH_ASSIGNMENTS)}.'
        )
    return PATCH_ASSIGNMENTS[patch_idx]


def _patch_assignment(participant):
    """Sequential patch assignment: participant 1->patch_order 1, etc.

    Returns:
        Tuple ``(condition, dataset_index, dataset_cycle_number)``.

    Raises:
        RuntimeError: if no patch rows are loaded, the row's dataset_index is
            outside DATASET_PACKAGES, or its condition is not a known one.
    """
    if not PATCH_ASSIGNMENTS:
        raise RuntimeError('No patch assignments loaded from patch_assignments.csv.')
    patch_entry = _get_patch_entry(participant)
    dataset_index = patch_entry['dataset_index']
    if dataset_index < 0 or dataset_index >= len(DATASET_PACKAGES):
        raise RuntimeError(
            f'Patch assignment has out-of-range dataset_index={dataset_index} '
            f'for id_in_session={participant.id_in_session}.'
        )
    condition = patch_entry.get('condition')
    if condition not in (CONDITION_STANDARD, CONDITION_BLIND):
        raise RuntimeError(
            f"Patch assignment missing/invalid 'condition' for id_in_session={participant.id_in_session}."
        )
    return (
        condition,
        dataset_index,
        patch_entry.get('dataset_cycle_number', 0),
    )


def _participant_assignment(participant):
    """Deterministic assignment based on id_in_session.

    Returns a tuple ``(condition, dataset_index, dataset_cycle_number)``:

    - patch session: delegated to ``_patch_assignment``;
    - dynamic assignment enabled: ``(None, None, None)`` (assignment is deferred);
    - forced single condition: that condition, ``dataset_index = 0`` and
      ``dataset_cycle_number = k``;
    - otherwise, with zero-based ``k = participant.id_in_session - 1``:
      even k => standard, odd k => blind, ``dataset_index = 0`` and
      ``dataset_cycle_number = k // 2``.

    Raises:
        RuntimeError: if FORCED_CONDITION is set together with is_patch_session.
    """
    if participant.session.config.get('is_patch_session', False):
        if FORCED_CONDITION is not None:
            raise RuntimeError('FORCED_CONDITION and is_patch_session cannot both be active.')
        return _patch_assignment(participant)

    if get_config_bool(participant.session.config, 'use_dynamic_condition_assignment', default=True):
        # Assignment is deferred until participant entry (ProlificIntro) under serialized matching logic.
        return None, None, None

    k = max(0, int(participant.id_in_session) - 1)
    forced = _coerce_forced_condition(FORCED_CONDITION)
    if forced in (CONDITION_BLIND, CONDITION_STANDARD):
        # In forced single-condition mode, advance one dataset row per participant.
        condition = forced
        dataset_index = 0
        dataset_cycle_number = k
        return condition, dataset_index, dataset_cycle_number

    condition = CONDITION_STANDARD if (k % 2 == 0) else CONDITION_BLIND
    dataset_index = 0
    dataset_cycle_number = k // 2
    return condition, dataset_index, dataset_cycle_number


# built-in hook function(s) (called automatically by oTree)
#
def creating_session(subsession: Subsession):
    """Initialise participant state in round 1 and, unless deferred, assign data.

    In deferred (dynamic, non-patch) mode only empty placeholders are written;
    otherwise the condition and dataset come from ``_participant_assignment``.

    Raises:
        RuntimeError: if the assigned dataset_index has no entry in
            DATASET_PACKAGES (previously surfaced as a bare IndexError).
    """
    if subsession.round_number != 1:
        return

    for player in subsession.get_players():
        participant = player.participant
        if participant.label:
            player.prolific_pid = participant.label

        # Initialize dynamic matching state once per session.
        if player.id_in_subsession == 1:
            _get_assignment_config(subsession.session)
            _save_matching_state(subsession.session, _get_matching_state(subsession.session))

        participant.history_json = json.dumps([])
        participant.selected_round = 0
        participant.selected_candidate = ''
        participant.selected_true_quality = 0
        participant.selected_reward_string = '$0.00'
        participant.dataset_unique_id = ''
        participant.vars['condition'] = participant.vars.get('condition', '')
        participant.vars['assignment_finalized'] = bool(participant.vars.get('assignment_finalized', False))
        participant.vars['assignment_hold'] = False
        participant.vars['assignment_hold_message'] = ''
        participant.vars['actually_visited'] = False
        participant.vars['blind_completion_marked'] = False

        is_patch_session = participant.session.config.get('is_patch_session', False)
        use_dynamic = get_config_bool(participant.session.config, 'use_dynamic_condition_assignment', default=True)
        if use_dynamic and not is_patch_session:
            # Deferred assignment mode: assign on first page entry after checking live state.
            participant.vars['condition'] = ''
            participant.vars['dataset_index'] = 0
            participant.vars['dataset_cycle_number'] = 0
            participant.vars['dataset_row_number'] = 0
            participant.vars['dataset_csv_row_number'] = 0
            participant.vars['dataset_unique_id'] = ''
            participant.vars['matched_standard_code'] = ''
            participant.vars['assignment_finalized'] = False
            participant.practice_data_json = ''
            participant.main_data_json = ''
            continue

        condition, dataset_index, dataset_cycle_number = _participant_assignment(participant)
        # Fail with an actionable message instead of an opaque IndexError when
        # no pre-built dataset package exists for this index.
        if not 0 <= dataset_index < len(DATASET_PACKAGES):
            raise RuntimeError(
                f'No dataset package for dataset_index={dataset_index} '
                f'(id_in_session={participant.id_in_session}); '
                f'{len(DATASET_PACKAGES)} package(s) loaded.'
            )
        dataset_package = DATASET_PACKAGES[dataset_index]
        patch_entry = None
        if participant.session.config.get('is_patch_session', False):
            patch_entry = _get_patch_entry(participant)

        if condition == CONDITION_STANDARD:
            practice_data = dataset_package['standard_practice']
            main_data = dataset_package['standard_main']
        else:
            practice_data = dataset_package['blind_practice']
            main_data = dataset_package['blind_main']

        participant.vars['condition'] = condition
        participant.condition = condition
        participant.vars['dataset_index'] = dataset_index
        participant.vars['dataset_cycle_number'] = dataset_cycle_number
        participant.vars['dataset_row_number'] = (
            patch_entry['dataset_row_number'] if patch_entry else dataset_package['dataset_row_number']
        )
        participant.vars['dataset_csv_row_number'] = dataset_package['csv_row_number']
        participant.vars['dataset_unique_id'] = _build_dataset_unique_id(
            participant.vars['dataset_row_number'],
            dataset_cycle_number,
        )
        participant.dataset_unique_id = participant.vars['dataset_unique_id']
        participant.vars['matched_standard_code'] = ''
        participant.vars['assignment_finalized'] = True
        participant.practice_data_json = json.dumps(practice_data)
        participant.main_data_json = json.dumps(main_data)


#
# the below function(s) are user-defined, not called by oTree
#
def generate_candidate_data():
    """Generate data for 10 candidates with true quality and noisy scores.

    Returns a dict keyed by candidate label; each value holds ``true_q``,
    ``resume``, ``zoom`` and ``inperson``, all rounded and floored at 0.
    """
    result = {}
    for i in range(C.NUM_CANDIDATES):
        true_q = round(C.MU + random.gauss(0, 1) * C.SIGMA)
        true_q = max(0, true_q)

        # Generate noisy scores
        resume = round(true_q + random.gauss(0, 1) * C.SIGMA_RESUME)
        resume = max(0, resume)
        zoom = round(true_q + random.gauss(0, 1) * C.SIGMA_ZOOM)
        zoom = max(0, zoom)
        inperson = round(true_q + random.gauss(0, 1) * C.SIGMA_INPERSON)
        inperson = max(0, inperson)

        result[C.CANDIDATE_LABELS[i]] = {
            'true_q': true_q,
            'resume': resume,
            'zoom': zoom,
            'inperson': inperson
        }
    return result


def calculate_reward(true_quality):
    """Return the reward for a hire as a ``$x.xx`` string, capped at C.MAX_REWARD."""
    reward = min(C.MAX_REWARD, C.DEFAULT_REWARD + C.BONUS_RATE * true_quality)
    return f"${reward:.2f}"


def calculate_quality_quantile(true_quality):
    """Calculate percentile (0-100) of true quality under Normal(C.MU, C.SIGMA)."""
    z_score = (true_quality - C.MU) / C.SIGMA
    # Standard normal CDF using error function: Phi(z) = 0.5 * (1 + erf(z / sqrt(2)))
    return 0.5 * (1 + math.erf(z_score / math.sqrt(2))) * 100


def format_percentile(percentile):
    """Format percentile with ordinal suffix (e.g., 92.4th percentile).

    The suffix is chosen from the integer part of the value rounded to one decimal.
    """
    rounded = round(percentile, 1)
    int_part = int(rounded)
    if 10 <= (int_part % 100) <= 20:
        suffix = 'th'
    else:
        suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(int_part % 10, 'th')
    return f"{rounded:.1f}{suffix} percentile"


def get_history(participant):
    """Get accumulated history of decisions (empty list when nothing is stored)."""
    if participant.history_json:
        return json.loads(participant.history_json)
    return []


def add_to_history(participant, round_num, hired_candidate, true_q, reward_str, decision_context=None):
    """Append one decision to the participant's JSON history.

    ``decision_context`` is stored on the entry only when it is truthy.
    """
    history = get_history(participant)
    entry = {
        'round': round_num,
        'hired_candidate': hired_candidate,
        'true_q': true_q,
        'reward': reward_str
    }
    if decision_context:
        entry['decision_context'] = decision_context
    history.append(entry)
    participant.history_json = json.dumps(history)


def parse_choice_list(raw_value):
    """Parse stored JSON list fields safely.

    Returns the decoded list, ``[]`` for empty input or non-list JSON, and
    ``[raw_value]`` when a non-empty string is not valid JSON.
    """
    if not raw_value:
        return []
    try:
        parsed = json.loads(raw_value)
        return parsed if isinstance(parsed, list) else []
    except (json.JSONDecodeError, TypeError):
        return [raw_value] if isinstance(raw_value, str) and raw_value else []


def sanitize_choices(raw_choices, allowed_labels):
    """Keep unique choices in original order, restricted to allowed labels."""
    if not isinstance(raw_choices, list):
        return []
    allowed = set(allowed_labels)
    cleaned = []
    seen = set()
    for label in raw_choices:
        if label in allowed and label not in seen:
            cleaned.append(label)
            seen.add(label)
    return cleaned


def _get_form_feedback_store(participant):
    """Participant-scoped form feedback store to avoid session-wide leakage."""
    store = participant.vars.get('form_feedback')
    if not isinstance(store, dict):
        store = {}
        participant.vars['form_feedback'] = store
    return store


def get_form_feedback(player: 'BasePlayer', gate_key: str):
    """Return ``(error, choices)`` stored for this round and gate, or ``('', [])``."""
    key = f"r{player.round_number}_{gate_key}"
    store = _get_form_feedback_store(player.participant)
    payload = store.get(key, {})
    if not isinstance(payload, dict):
        return '', []
    error = payload.get('error', '') or ''
    choices = payload.get('choices', [])
    if not isinstance(choices, list):
        choices = []
    return error, choices


def set_form_feedback(player: 'BasePlayer', gate_key: str, error: str, choices):
    """Store an error message and the submitted choices for this round and gate."""
    key = f"r{player.round_number}_{gate_key}"
    store = _get_form_feedback_store(player.participant)
    store[key] = {
        'error': error or '',
        'choices': choices if isinstance(choices, list) else [],
    }
    # Re-assign so the change to the nested dict is written back to participant.vars.
    player.participant.vars['form_feedback'] = store


def clear_form_feedback(player: 'BasePlayer', gate_key: str):
    """Remove any stored feedback for this round and gate."""
    key = f"r{player.round_number}_{gate_key}"
    store = _get_form_feedback_store(player.participant)
    store.pop(key, None)
    player.participant.vars['form_feedback'] = store


def build_visible_scores(data, labels, include_zoom=False, include_inperson=False):
    """Build score snapshot for labels with only scores visible at the current decision point."""
    snapshot = []
    for label in labels:
        row = {
            'label': label,
            'resume': data[label]['resume'],
        }
        if include_zoom:
            row['zoom'] = data[label]['zoom']
        if include_inperson:
            row['inperson'] = data[label]['inperson']
        snapshot.append(row)
    return snapshot


def select_final_payment(participant):
    """Select one main round at random for final payment.

    Does nothing when the history contains no main-round entry.
    """
    history = get_history(participant)
    # Main rounds in this app are oTree rounds 2..13 (practice is round 0 in history).
    main_rounds = [h for h in history if h['round'] > 1]
    if main_rounds:
        selected = random.choice(main_rounds)
        # History uses oTree round numbers (practice=0, main=2..13).
        # Convert to participant-facing main round index (1..12) for display/export consistency.
        participant.selected_round = selected['round'] - 1
        participant.selected_reward_string = selected['reward']
        participant.selected_candidate = selected['hired_candidate']
        participant.selected_true_quality = selected['true_q']


def get_candidate_labels_in_order(data):
    """Return candidate labels in stored dataset order."""
    if not isinstance(data, dict):
        return []
    return [str(label) for label in data.keys()]


def get_round_data(participant, round_num=None):
    """Get candidate data for a specific round.

    round_num uses oTree numbering in this app: practice=0, main=2..13.

    Raises:
        IndexError: if round_num does not map to a stored main round.
    """
    if round_num is None or round_num == 0:
        return json.loads(participant.practice_data_json)

    main_data = json.loads(participant.main_data_json)
    index = round_num - 2
    if index < 0 or index >= len(main_data):
        raise IndexError(f'Main round index out of bounds: round_num={round_num}, index={index}')
    return main_data[index]


def get_condition2_finalists(participant, round_num=None):
    """Return finalist labels for blind condition in a given round.

    These are the first C.GATE2_SELECT labels in stored order, or ``[]`` when
    the round data is not a dict or holds fewer labels than that.
    """
    if round_num is None:
        round_num = 0
    data = get_round_data(participant, round_num)
    if isinstance(data, dict):
        labels = get_candidate_labels_in_order(data)
        if len(labels) >= C.GATE2_SELECT:
            return labels[:C.GATE2_SELECT]
    return []


def _normalize_condition_name(raw_condition, default=CONDITION_BLIND):
    """Map a stored condition (name, '1'/'2' or 1/2) to a canonical condition name.

    Anything unrecognised, including ``None`` and ``''``, yields ``default``.
    """
    if isinstance(raw_condition, str):
        token = raw_condition.strip().lower()
        if token in (CONDITION_STANDARD, CONDITION_BLIND):
            return token
        if token == '1':
            return CONDITION_STANDARD
        if token == '2':
            return CONDITION_BLIND
    if raw_condition == 1:
        return CONDITION_STANDARD
    if raw_condition == 2:
        return CONDITION_BLIND
    return default


def calculate_progress_percentage(player: 'BasePlayer', page_name: str = '') -> int:
    """Calculate condition-aware progress through the study as a percentage.

    Tracks progress page-by-page for accurate progress display. The total page
    count is derived from the page lists below so it cannot drift from them
    (the standard total was previously hard-coded as 88 although the lists add
    up to 90, which made the bar hit 100% before the last pages).

    Returns:
        Integer percentage clamped to 0..100.
    """
    condition = get_condition(player)
    round_num = player.round_number

    # Define page order for each condition in Round 1
    round1_pages_both = ['ProlificIntro', 'Consent', 'PreInstructionsDisclaimer', 'InstructionsPart1',
                         'Instructions', 'Comprehension', 'PracticeIntro']
    round1_pages_standard = ['PracticeGate1', 'PracticeGate1After', 'PracticeGate2', 'PracticeGate2After',
                             'PracticeGate3', 'PracticePerformance']
    round1_pages_blind = ['PracticeStage3Decision', 'PracticePerformance']

    # Main round pages (rounds 2-13)
    main_pages_standard = ['MainGate1', 'MainGate1After', 'MainGate2', 'MainGate2After',
                           'MainGate3', 'MainRoundPerformance']
    main_pages_blind = ['MainStage3Decision', 'MainRoundPerformance']

    # Final pages (last round only)
    final_pages = ['PostTaskWeights', 'WeightsJustification', 'Demographics', 'FinalPayoff', 'ProlificRedirect']

    if condition == CONDITION_STANDARD:
        round1_pages = round1_pages_both + round1_pages_standard
        main_pages = main_pages_standard
    else:
        round1_pages = round1_pages_both + round1_pages_blind
        main_pages = main_pages_blind

    pages_per_main_round = len(main_pages)
    total_pages = len(round1_pages) + pages_per_main_round * C.NUM_MAIN_ROUNDS + len(final_pages)

    if round_num == 1:
        if page_name in round1_pages:
            pages_completed = round1_pages.index(page_name) + 1
        else:
            pages_completed = len(round1_pages)
    elif round_num <= C.NUM_ROUNDS:
        # All of round 1 plus every fully completed main round (oTree round 2 is main round 1).
        pages_completed = len(round1_pages) + (round_num - 2) * pages_per_main_round
        if page_name in main_pages:
            pages_completed += main_pages.index(page_name) + 1
        elif page_name in final_pages and round_num == C.NUM_ROUNDS:
            pages_completed += pages_per_main_round + final_pages.index(page_name) + 1
        else:
            pages_completed += pages_per_main_round  # Assume at end of round
    else:
        pages_completed = total_pages

    progress = int((pages_completed / total_pages) * 100)
    return max(0, min(progress, 100))


def get_condition(player: 'BasePlayer'):
    """Safely get participant's condition choice across rounds."""
    stored = player.participant.vars.get('condition')
    if stored is not None:
        return _normalize_condition_name(stored)

    # Fallback for legacy sessions where condition may exist as a participant field.
    # Avoid direct getattr(..., default) because oTree participant field lookup can raise
    # KeyError (not AttributeError) when the key is missing.
    legacy_condition = None
    try:
        legacy_condition = player.participant.condition
    except (AttributeError, KeyError):
        legacy_condition = None
    return _normalize_condition_name(legacy_condition)


# Maps a page class name to the Player field that stores time spent on that page.
STAGE_DURATION_FIELD_BY_PAGE = {
    'PreInstructionsDisclaimer': 'preinstructions_secs',
    'InstructionsPart1': 'instructions_part1_secs',
    'Instructions': 'instructions_part2_secs',
    'Comprehension': 'comprehension_secs',
    'PracticeStage1Resume': 'practice_stage1_secs',
    'PracticeStage2Zoom': 'practice_stage2_secs',
    'PracticeStage3Decision': 'practice_stage3_secs',
    'MainStage1Resume': 'main_stage1_secs',
    'MainStage2Zoom': 'main_stage2_secs',
    'MainStage3Decision': 'main_stage3_secs',
    'PracticeGate1': 'practice_gate1_secs',
    'PracticeGate1After': 'practice_gate1_after_secs',
    'PracticeGate2': 'practice_gate2_secs',
    'PracticeGate2After': 'practice_gate2_after_secs',
    'PracticeGate3': 'practice_gate3_secs',
    'MainGate1': 'main_gate1_secs',
    'MainGate1After': 'main_gate1_after_secs',
    'MainGate2': 'main_gate2_secs',
    'MainGate2After': 'main_gate2_after_secs',
    'MainGate3': 'main_gate3_secs',
}


def _stage_timer_key(player: 'BasePlayer', page_name: str) -> str:
    """Return the participant.vars key holding the start time for this round and page."""
    return f'_stage_start_{player.round_number}_{page_name}'


def start_stage_timer(player: 'BasePlayer', page_name: str):
    """Record the current time as the page's start, for timed pages only."""
    if page_name in STAGE_DURATION_FIELD_BY_PAGE:
        player.participant.vars[_stage_timer_key(player, page_name)] = time.time()


def record_stage_duration(player: 'BasePlayer', page_name: str):
    """Write seconds elapsed since the page's start time to its duration field.

    No-op for pages without a duration field or without a recorded start time.
    """
    field_name = STAGE_DURATION_FIELD_BY_PAGE.get(page_name)
    if not field_name:
        return
    start_ts = player.participant.vars.get(_stage_timer_key(player, page_name))
    if start_ts is None:
        return
    duration_secs = max(0.0, round(time.time() - start_ts, 3))
    setattr(player, field_name, duration_secs)


def get_config_bool(config_dict, key, default=False):
    """Parse boolean-like values from session config safely.

    Booleans pass through, ``None`` yields ``default``, and any other value is
    true only if its text is one of 1/true/yes/y/on (case-insensitive).
    """
    raw = config_dict.get(key, default)
    if isinstance(raw, bool):
        return raw
    if raw is None:
        return default
    return str(raw).strip().lower() in ('1', 'true', 'yes', 'y', 'on')


class BasePage(Page):
    """Base page class that automatically includes progress percentage.

    Building the template context also starts the stage timer for the page.
    """

    def get_context_data(self, **kwargs):
        context = super().get_context_data(**kwargs)
        page_name = self.__class__.__name__
        start_stage_timer(self.player, page_name)
        context['progress_percentage'] = calculate_progress_percentage(self.player, page_name)

        # Show "Round X of 12" during main rounds (applies to both conditions)
        main_round_pages = {
            'MainStage1Resume', 'MainStage2Zoom', 'MainStage3Decision',
            'MainGate1', 'MainGate1After', 'MainGate2', 'MainGate2After', 'MainGate3',
            'MainRoundPerformance'
        }
        is_main_round_page = page_name in main_round_pages and 2 <= self.player.round_number <= C.NUM_ROUNDS
        context['show_round_counter'] = is_main_round_page
        context['current_main_round'] = max(1, min(self.player.round_number - 1, C.NUM_MAIN_ROUNDS))
        context['total_main_rounds'] = C.NUM_MAIN_ROUNDS
        return context


class ProlificIntro(BasePage):
    """First page: collects Prolific identifiers and triggers dynamic assignment."""

    form_model = 'player'
    form_fields = ['prolific_pid', 'prolific_pid_from_url', 'prolific_study_id', 'prolific_session_id']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        player.participant.vars['actually_visited'] = True
        label = (player.participant.label or '').strip()
        existing_pid = (player.field_maybe_none('prolific_pid') or '').strip()
        pid_from_url = (player.field_maybe_none('prolific_pid_from_url') or '').strip()
        # Prefill only when nothing is stored yet; the participant label wins over the URL value.
        if label and not existing_pid:
            player.prolific_pid = label
        elif pid_from_url and not existing_pid:
            player.prolific_pid = pid_from_url
        return dict(
            prefill_prolific_pid=(player.field_maybe_none('prolific_pid') or '').strip(),
            assignment_hold=bool(player.participant.vars.get('assignment_hold', False)),
            assignment_hold_message=player.participant.vars.get('assignment_hold_message', ''),
        )

    @staticmethod
    def error_message(player: Player, values):
        # In dynamic (non-patch) mode the participant cannot proceed while on hold.
        if get_config_bool(player.session.config, 'use_dynamic_condition_assignment', default=True) and not player.session.config.get('is_patch_session', False):
            assignment_result = _ensure_dynamic_assignment(player)
            if assignment_result.get('hold'):
                return assignment_result.get('message') or player.participant.vars.get('assignment_hold_message', '')

        prolific_pid = (values.get('prolific_pid') or '').strip()
        pid_from_url = (values.get('prolific_pid_from_url') or '').strip()
        require_pid = get_config_bool(player.session.config, 'require_prolific_pid', default=True)

        # Check if Prolific ID is required and validate it's not empty
        if require_pid:
            if not prolific_pid:
                return 'Please provide your Prolific ID to continue.'
            # Prolific IDs are typically 24-character alphanumeric strings
            if len(prolific_pid) < 8:
                return 'Prolific ID appears to be invalid. Please check your ID.'
            if pid_from_url and prolific_pid != pid_from_url:
                return 'The Prolific ID must match the PROLIFIC_PID in your study link.'
            link_label = (player.participant.label or '').strip()
            if link_label and prolific_pid != link_label:
                return 'The Prolific ID must match the ID from your study link.'

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        prolific_pid = (player.field_maybe_none('prolific_pid') or '').strip()
        pid_from_url = (player.field_maybe_none('prolific_pid_from_url') or '').strip()
        study_id = (player.field_maybe_none('prolific_study_id') or '').strip()
        session_id = (player.field_maybe_none('prolific_session_id') or '').strip()

        # Fall back to the URL-provided ID when the form value is empty.
        if pid_from_url and not prolific_pid:
            prolific_pid = pid_from_url
            player.prolific_pid = pid_from_url

        if prolific_pid and not player.participant.label:
            player.participant.label = prolific_pid

        player.prolific_pid_mismatch = bool(pid_from_url and prolific_pid and pid_from_url != prolific_pid)
        player.prolific_pid_link_missing = bool(not pid_from_url)

        if prolific_pid:
            player.participant.prolific_pid = prolific_pid
        player.participant.vars['prolific_pid'] = prolific_pid
        player.participant.vars['prolific_pid_from_url'] = pid_from_url
        player.participant.vars['prolific_pid_mismatch'] = player.prolific_pid_mismatch
        player.participant.vars['prolific_pid_link_missing'] = player.prolific_pid_link_missing
        player.participant.vars['prolific_study_id'] = study_id
        player.participant.vars['prolific_session_id'] = session_id


class Consent(BasePage):
    """Consent page, shown in round 1 only."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1


class PreInstructionsDisclaimer(BasePage):
    """Asks whether the participant is willing to read the instructions."""

    form_model = 'player'
    form_fields = ['Q111_instructions_willing']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def error_message(player: Player, values):
        # Every submission counts as a trial, whether or not it passes.
        player.trial_preinstructions += 1
        if values.get('Q111_instructions_willing') != 2:
            return 'Please confirm that you are willing to read the instructions.'

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        record_stage_duration(player, 'PreInstructionsDisclaimer')


class InstructionsPart1(BasePage):
    """First instructions page with the 'ultimate mission' comprehension question."""

    form_model = 'player'
    form_fields = ['Q_ultimate_mission']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        condition = get_condition(player)
        return dict(
            is_standard=condition == CONDITION_STANDARD,
            is_blind=condition == CONDITION_BLIND,
            is_condition_1=condition == CONDITION_STANDARD,
            is_condition_2=condition == CONDITION_BLIND,
            mu=C.MU,
            sigma=C.SIGMA,
            sigma_resume=C.SIGMA_RESUME
        )

    @staticmethod
    def error_message(player: Player, values):
        player.trial_instructions_part1 += 1
        mission = values.get('Q_ultimate_mission')
        if mission is None:
            return 'Please answer the question before continuing.'
        if mission != 2:
            player.comp_mission_errors += 1
            return 'Please review your ultimate objective.'

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        record_stage_duration(player, 'InstructionsPart1')


class Instructions(BasePage):
    """Second instructions page with the score-interpretation comprehension question."""

    form_model = 'player'
    form_fields = ['Q_score_interpretation']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        condition = get_condition(player)
        return dict(
            is_standard=condition == CONDITION_STANDARD,
            is_blind=condition == CONDITION_BLIND,
            is_condition_1=condition == CONDITION_STANDARD,
            is_condition_2=condition == CONDITION_BLIND,
            mu=C.MU,
            sigma=C.SIGMA,
            sigma_resume=C.SIGMA_RESUME
        )

    @staticmethod
    def error_message(player: Player, values):
        player.trial_instructions_part2 += 1
        interpretation = values.get('Q_score_interpretation')
        if interpretation is None:
            return 'Please answer the question before continuing.'
        if interpretation != 1:
            player.comp_score_errors += 1
            return 'Please review the instructions about how to interpret scores. Scores are noisy signals, not perfect information.'

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        record_stage_duration(player, 'Instructions')


class Comprehension(BasePage):
    """Condition-specific comprehension check; all answers must be correct to proceed."""

    form_model = 'player'

    @staticmethod
    def get_form_fields(player: Player):
        condition = get_condition(player)
        if condition == CONDITION_STANDARD:
            return ['Q_advance_candidate', 'Q_bonus_calculation']
        else:
            return ['Q_decision_process', 'Q_bonus_calculation']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        condition = get_condition(player)
        return dict(
            is_standard=condition == CONDITION_STANDARD,
            is_blind=condition == CONDITION_BLIND,
            is_condition_1=condition == CONDITION_STANDARD,
            is_condition_2=condition == CONDITION_BLIND,
        )

    @staticmethod
    def error_message(player: Player, values):
        condition = get_condition(player)
        player.trial_comprehension_page += 1

        # Count submissions that actually contained an answer, per question.
        if values.get('Q_bonus_calculation') is not None:
            player.trial_bonus_answered += 1
        if condition == CONDITION_STANDARD and values.get('Q_advance_candidate') is not None:
            player.trial_advance_answered += 1
        if condition == CONDITION_BLIND and values.get('Q_decision_process') is not None:
            player.trial_decision_answered += 1

        messages = []

        # Require answers
        if values.get('Q_bonus_calculation') is None:
            messages.append('Please answer the bonus calculation question.')
        if condition == CONDITION_STANDARD and values.get('Q_advance_candidate') is None:
            messages.append('Please answer the advance candidate question.')
        if condition == CONDITION_BLIND and values.get('Q_decision_process') is None:
            messages.append('Please answer the decision process question.')

        # Check Q_bonus_calculation (must be "ONE random hire")
        if values.get('Q_bonus_calculation') is not None and values['Q_bonus_calculation'] != 2:
            player.comp_bonus_errors += 1
            messages.append('Please review how your bonus is calculated.')

        # Standard-condition specific validation
        if condition == CONDITION_STANDARD:
            if values.get('Q_advance_candidate') is not None and values['Q_advance_candidate'] != 2:
                player.comp_advance_errors += 1
                messages.append('Please review what happens when you advance a candidate.')

        # Blind-condition specific validation
        if condition == CONDITION_BLIND:
            if values.get('Q_decision_process') is not None and values['Q_decision_process'] != 3:
                player.comp_decision_errors += 1
                messages.append('Please review the decision process. You will see scores across all three stages and then choose one hire.')

        if messages:
            return ' '.join(messages)

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        record_stage_duration(player, 'Comprehension')


class PracticeIntro(BasePage):
    """Introduction to the practice round, shown in round 1 only."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        condition = get_condition(player)
        return dict(
            is_standard=condition == CONDITION_STANDARD,
            is_blind=condition == CONDITION_BLIND,
            is_condition_1=condition == CONDITION_STANDARD,
        )


# Practice Round Pages - Blind condition (stage-based)
class PracticeStage1Resume(BasePage):
    """Practice stage showing resume scores only; never displayed (is_displayed is False)."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        return False

    @staticmethod
    def vars_for_template(player: Player):
        data = get_round_data(player.participant, 0)
        finalist_labels = get_condition2_finalists(player.participant, 0)
        candidates = []
        for label in finalist_labels:
            candidates.append({
                'label': label,
                'resume': data[label]['resume'],
                'zoom': '-',
                'inperson': '-'
            })
        return dict(
            candidates=candidates,
            is_practice=True,
            round_num=0
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        record_stage_duration(player, 'PracticeStage1Resume')


class PracticeStage2Zoom(BasePage):
    """Practice stage showing resume and zoom scores; never displayed (is_displayed is False)."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        return False

    @staticmethod
    def vars_for_template(player: Player):
        data = get_round_data(player.participant, 0)
        finalist_labels = get_condition2_finalists(player.participant, 0)
        candidates = []
        for label in finalist_labels:
            candidates.append({
                'label': label,
                'resume': data[label]['resume'],
                'zoom': data[label]['zoom'],
                'inperson': '-'
            })
        return dict(
            candidates=candidates,
            is_practice=True,
            round_num=0
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        record_stage_duration(player, 'PracticeStage2Zoom')


class PracticeStage3Decision(BasePage):
    """Practice hiring decision for the blind condition, with all three scores shown."""

    form_model = 'player'
    form_fields = ['hired_candidate']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1 and get_condition(player) == CONDITION_BLIND

    @staticmethod
    def vars_for_template(player: Player):
        data = get_round_data(player.participant, 0)
        finalist_labels = get_condition2_finalists(player.participant, 0)
        candidates = []
        for label in finalist_labels:
            candidates.append({
                'label': label,
                'resume': data[label]['resume'],
                'zoom': data[label]['zoom'],
                'inperson': data[label]['inperson'],
                'true_q': data[label]['true_q']
            })
        return dict(
            candidates=candidates,
            is_practice=True,
            round_num=0
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        record_stage_duration(player, 'PracticeStage3Decision')
        data = get_round_data(player.participant, 0)
        finalist_labels = get_condition2_finalists(player.participant, 0)
        hired_label = player.hired_candidate
        hired_data = data[hired_label]
        true_q = hired_data['true_q']
        reward_str = calculate_reward(true_q)

        # Store visible score context at hiring decision (all 3 scores shown on this screen)
        decision_context = dict(
            decision_type='hire',
            condition=get_condition(player),
            display_round=0,
            chosen_candidate=hired_label,
            visible_candidates=build_visible_scores(data, finalist_labels, include_zoom=True, include_inperson=True),
        )
        player.hire_decision_context = json.dumps(decision_context)

        # Store practice results
        player.practice_hired_candidate = hired_label
        player.practice_hired_trueq = true_q
        player.practice_reward = reward_str

        # Add to history (round 0 = practice)
        add_to_history(player.participant, 0, hired_label, true_q, reward_str, decision_context=decision_context)


class PracticePerformance(BasePage):
    """Shows the outcome of the practice hire."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        true_q = player.field_maybe_none('practice_hired_trueq')
        quality_percentile = ''
        if true_q is not None:
            quality_percentile = format_percentile(calculate_quality_quantile(true_q))
        return dict(
            hired_candidate=player.field_maybe_none('practice_hired_candidate') or '',
            last_round_true_q=true_q or 0,
            reward=player.field_maybe_none('practice_reward') or '',
            quality_percentile=quality_percentile
        )


# Main Round Pages - Blind condition (stage-based)
class MainStage1Resume(BasePage):
    """Main-round stage showing resume scores only; never displayed (is_displayed is False)."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        return False

    @staticmethod
    def vars_for_template(player: Player):
        data = get_round_data(player.participant, player.round_number)
        finalist_labels = get_condition2_finalists(player.participant, player.round_number)
        candidates = []
        for label in finalist_labels:
            candidates.append({
                'label': label,
                'resume': data[label]['resume'],
                'zoom': '-',
                'inperson': '-'
            })
        return dict(
            candidates=candidates,
            round_num=player.round_number - 1
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        record_stage_duration(player, 'MainStage1Resume')


class MainStage2Zoom(BasePage):
    """Main-round stage showing resume and zoom scores; never displayed (is_displayed is False)."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        return False

    @staticmethod
    def vars_for_template(player: Player):
        data = get_round_data(player.participant, player.round_number)
        finalist_labels = get_condition2_finalists(player.participant, player.round_number)
        candidates = []
        for label in finalist_labels:
            candidates.append({
                'label': label,
                'resume': data[label]['resume'],
                'zoom': data[label]['zoom'],
                'inperson': '-'
            })
        return dict(
            candidates=candidates,
            round_num=player.round_number - 1
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        record_stage_duration(player, 'MainStage2Zoom')
def _parse_submitted_choices(raw):
    """Decode a posted JSON choice list; empty or malformed input means no selection."""
    try:
        return json.loads(raw) if raw else []
    except (json.JSONDecodeError, TypeError):
        return []


def _parse_stored_choices(raw):
    """Decode a stored gate-choice value for display.

    Valid JSON is returned as decoded. A non-empty string that is not valid
    JSON is treated as a single label. Any falsy value yields an empty list.
    """
    if not raw:
        return []
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return [raw] if isinstance(raw, str) else []


def _stored_gate2_choices(player):
    """Return the player's saved Gate 2 selection, or [] when none is saved.

    Reads through ``field_maybe_none`` because oTree raises when a field whose
    value is None is accessed directly, which would defeat the empty fallback.
    """
    raw = player.field_maybe_none('gate2_choices')
    return json.loads(raw) if raw else []


class MainStage3Decision(BasePage):
    """Blind-condition main-round page on which the participant picks a hire."""

    form_model = 'player'
    form_fields = ['hired_candidate']

    @staticmethod
    def is_displayed(player: Player):
        """Show in rounds after the first, for the blind condition only."""
        return get_condition(player) == CONDITION_BLIND and player.round_number > 1

    @staticmethod
    def vars_for_template(player: Player):
        """Pass every score, plus true quality, of each finalist this round."""
        data = get_round_data(player.participant, player.round_number)
        finalist_labels = get_condition2_finalists(player.participant, player.round_number)
        candidates = [
            {
                'label': label,
                'resume': data[label]['resume'],
                'zoom': data[label]['zoom'],
                'inperson': data[label]['inperson'],
                'true_q': data[label]['true_q'],
            }
            for label in finalist_labels
        ]
        # Internal round 1 is practice, so main rounds display as round_number - 1.
        return dict(candidates=candidates, round_num=player.round_number - 1)

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record the hire, its reward, and the scores visible at decision time."""
        record_stage_duration(player, 'MainStage3Decision')
        data = get_round_data(player.participant, player.round_number)
        finalist_labels = get_condition2_finalists(player.participant, player.round_number)
        hired_label = player.hired_candidate
        true_q = data[hired_label]['true_q']
        reward_str = calculate_reward(true_q)

        # Store visible score context at hiring decision (all 3 scores shown on this screen)
        decision_context = dict(
            decision_type='hire',
            condition=get_condition(player),
            display_round=player.round_number - 1,
            chosen_candidate=hired_label,
            visible_candidates=build_visible_scores(
                data, finalist_labels, include_zoom=True, include_inperson=True
            ),
        )
        player.hire_decision_context = json.dumps(decision_context)
        player.hired_trueq = true_q
        player.reward_str = reward_str
        add_to_history(
            player.participant, player.round_number, hired_label, true_q, reward_str,
            decision_context=decision_context,
        )


class MainRoundPerformance(BasePage):
    """Feedback page shown after each main round, with the hiring history so far."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        """Show in every round after the first."""
        return player.round_number > 1

    @staticmethod
    def vars_for_template(player: Player):
        """Return the main-round history and the latest round's outcome.

        NOTE: the history entries are annotated in place with
        ``display_round`` and ``quality_percentile``, so those keys end up on
        whatever objects ``get_history`` returned.
        """
        history = get_history(player.participant)
        # Filter to only show main rounds (exclude practice which has round == 0)
        main_history = [h for h in history if h['round'] > 0]

        # Adjust round numbers for display (convert from internal numbering to user-facing)
        for entry in main_history:
            entry['display_round'] = entry['round'] - 1
            entry['quality_percentile'] = format_percentile(
                calculate_quality_quantile(entry['true_q'])
            )

        last_item = main_history[-1] if main_history else None
        return dict(
            history=main_history,
            last_round_true_q=last_item['true_q'] if last_item else 0,
            # Already computed for this entry in the loop above.
            last_round_quality_percentile=last_item['quality_percentile'] if last_item else '',
            round_num=player.round_number - 1,
        )


# Standard Condition Pages (Gate-based)
class PracticeGate1(BasePage):
    """Practice Gate 1: choose which candidates advance, seeing resume scores only."""

    form_model = 'player'
    form_fields = ['gate1_choices', 'gate1_initial_checked']

    @staticmethod
    def is_displayed(player: Player):
        """Show only in round 1 and only for the standard condition."""
        return player.round_number == 1 and get_condition(player) == CONDITION_STANDARD

    @staticmethod
    def vars_for_template(player: Player):
        """Pass resume scores for all candidates plus any pending form feedback."""
        data = get_round_data(player.participant, 0)
        all_labels = get_candidate_labels_in_order(data)
        candidates = [
            {'label': label, 'resume': data[label]['resume'], 'zoom': '-', 'inperson': '-'}
            for label in all_labels
        ]
        form_error, posted_choices = get_form_feedback(player, 'gate1')
        return dict(
            candidates=candidates,
            is_practice=True,
            round_num=0,
            gate_num=1,
            required_count=C.GATE1_SELECT,
            form_error=form_error,
            form_post_gate1_choices=sanitize_choices(posted_choices, all_labels),
        )

    @staticmethod
    def error_message(player: Player, values):
        """Require exactly C.GATE1_SELECT valid labels; store the selection on success."""
        data = get_round_data(player.participant, 0)
        all_labels = get_candidate_labels_in_order(data)
        gate1_choices = sanitize_choices(
            _parse_submitted_choices(values.get('gate1_choices')), all_labels
        )
        if len(gate1_choices) != C.GATE1_SELECT:
            msg = f'Please select exactly {C.GATE1_SELECT} candidates.'
            # Keep the message and the attempted selection for the re-render.
            set_form_feedback(player, 'gate1', msg, gate1_choices)
            return msg
        clear_form_feedback(player, 'gate1')
        player.gate1_choices = json.dumps(gate1_choices)

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record the Gate 1 selection together with the scores that were visible."""
        record_stage_duration(player, 'PracticeGate1')
        data = get_round_data(player.participant, 0)
        all_labels = get_candidate_labels_in_order(data)
        selected_gate1 = parse_choice_list(player.gate1_choices)
        player.gate1_decision_context = json.dumps(dict(
            decision_type='gate1',
            condition=get_condition(player),
            display_round=0,
            selected_candidates=selected_gate1,
            visible_candidates=build_visible_scores(
                data, all_labels, include_zoom=False, include_inperson=False
            ),
        ))


class PracticeGate1After(BasePage):
    """Practice page showing which candidates advanced past Gate 1."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        """Show only in round 1 and only for the standard condition."""
        return player.round_number == 1 and get_condition(player) == CONDITION_STANDARD

    @staticmethod
    def vars_for_template(player: Player):
        """Mark every candidate as advanced or eliminated after Gate 1."""
        data = get_round_data(player.participant, 0)
        all_labels = get_candidate_labels_in_order(data)
        # Handle both JSON array format and plain string format
        gate1_choices = _parse_stored_choices(player.field_maybe_none('gate1_choices'))
        candidates = [
            {
                'label': label,
                'resume': data[label]['resume'],
                'zoom': '-',
                'inperson': '-',
                'eliminated': label not in gate1_choices,
            }
            for label in all_labels
        ]
        return dict(
            candidates=candidates,
            gate1_choices=gate1_choices,
            is_practice=True,
            round_num=0,
            gate_num=1,
            advanced_count=len(gate1_choices),
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record how long the participant spent on this page."""
        record_stage_duration(player, 'PracticeGate1After')


class PracticeGate2(BasePage):
    """Practice Gate 2: narrow the Gate 1 survivors, seeing resume and Zoom scores."""

    form_model = 'player'
    form_fields = ['gate2_choices', 'gate2_initial_checked', 'gate2_initial_disabled_checked']

    @staticmethod
    def is_displayed(player: Player):
        """Show only in round 1 and only for the standard condition."""
        return player.round_number == 1 and get_condition(player) == CONDITION_STANDARD

    @staticmethod
    def vars_for_template(player: Player):
        """Pass resume/Zoom scores, elimination flags and any pending form feedback."""
        data = get_round_data(player.participant, 0)
        all_labels = get_candidate_labels_in_order(data)
        gate1_choices = sanitize_choices(
            parse_choice_list(player.field_maybe_none('gate1_choices')), all_labels
        )
        candidates = [
            {
                'label': label,
                'resume': data[label]['resume'],
                'zoom': data[label]['zoom'],
                'inperson': '-',
                'eliminated': label not in gate1_choices,
            }
            for label in all_labels
        ]
        form_error, posted_choices = get_form_feedback(player, 'gate2')
        return dict(
            candidates=candidates,
            available_candidates=gate1_choices,
            is_practice=True,
            round_num=0,
            gate_num=2,
            required_count=C.GATE2_SELECT,
            form_error=form_error,
            # Only Gate 1 survivors may be re-checked on the re-rendered form.
            form_post_gate2_choices=sanitize_choices(posted_choices, gate1_choices),
        )

    @staticmethod
    def error_message(player: Player, values):
        """Require exactly C.GATE2_SELECT labels, all drawn from the Gate 1 selection."""
        data = get_round_data(player.participant, 0)
        all_labels = get_candidate_labels_in_order(data)
        gate2_choices = sanitize_choices(
            _parse_submitted_choices(values.get('gate2_choices')), all_labels
        )
        if len(gate2_choices) != C.GATE2_SELECT:
            msg = f'Please select exactly {C.GATE2_SELECT} candidates.'
            set_form_feedback(player, 'gate2', msg, gate2_choices)
            return msg

        gate1_choices = sanitize_choices(
            parse_choice_list(player.field_maybe_none('gate1_choices')), all_labels
        )
        if not all(c in gate1_choices for c in gate2_choices):
            msg = 'All selected candidates must be from Gate 1 selections.'
            set_form_feedback(player, 'gate2', msg, gate2_choices)
            return msg

        clear_form_feedback(player, 'gate2')
        player.gate2_choices = json.dumps(gate2_choices)

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record the Gate 2 selection together with the scores that were visible."""
        record_stage_duration(player, 'PracticeGate2')
        data = get_round_data(player.participant, 0)
        gate1_choices = parse_choice_list(player.gate1_choices)
        gate2_choices = parse_choice_list(player.gate2_choices)
        player.gate2_decision_context = json.dumps(dict(
            decision_type='gate2',
            condition=get_condition(player),
            display_round=0,
            available_candidates=gate1_choices,
            selected_candidates=gate2_choices,
            visible_candidates=build_visible_scores(
                data, gate1_choices, include_zoom=True, include_inperson=False
            ),
        ))


class PracticeGate2After(BasePage):
    """Practice page showing which candidates advanced past Gate 2."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        """Show in round 1 for the standard condition once a Gate 2 selection exists."""
        return (
            player.round_number == 1
            and get_condition(player) == CONDITION_STANDARD
            and player.field_maybe_none('gate2_choices') is not None
        )

    @staticmethod
    def vars_for_template(player: Player):
        """Mark every candidate as advanced or eliminated after Gate 2."""
        data = get_round_data(player.participant, 0)
        all_labels = get_candidate_labels_in_order(data)
        # Handle both JSON array format and plain string format
        gate2_choices = _parse_stored_choices(player.field_maybe_none('gate2_choices'))
        candidates = [
            {
                'label': label,
                'resume': data[label]['resume'],
                'zoom': data[label]['zoom'],
                'inperson': '-',
                'eliminated': label not in gate2_choices,
            }
            for label in all_labels
        ]
        return dict(
            candidates=candidates,
            gate2_choices=gate2_choices,
            is_practice=True,
            round_num=0,
            gate_num=2,
            advanced_count=len(gate2_choices),
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record how long the participant spent on this page."""
        record_stage_duration(player, 'PracticeGate2After')


class PracticeGate3(BasePage):
    """Practice Gate 3: hire one of the Gate 2 finalists with all scores shown."""

    form_model = 'player'
    form_fields = ['hired_candidate']

    @staticmethod
    def is_displayed(player: Player):
        """Show only in round 1 and only for the standard condition."""
        return player.round_number == 1 and get_condition(player) == CONDITION_STANDARD

    @staticmethod
    def vars_for_template(player: Player):
        """Pass all scores for every candidate, flagging which ones can be hired."""
        data = get_round_data(player.participant, 0)
        all_labels = get_candidate_labels_in_order(data)
        gate2_choices = _stored_gate2_choices(player)
        candidates = [
            {
                'label': label,
                'resume': data[label]['resume'],
                'zoom': data[label]['zoom'],
                'inperson': data[label]['inperson'],
                'true_q': data[label]['true_q'],
                'available': label in gate2_choices,
            }
            for label in all_labels
        ]
        return dict(
            candidates=candidates,
            available_candidates=gate2_choices,
            is_practice=True,
            round_num=0,
            gate_num=3,
        )

    @staticmethod
    def error_message(player: Player, values):
        """Reject a hire that is not among the Gate 2 finalists."""
        hired = values.get('hired_candidate')
        gate2_choices = _stored_gate2_choices(player)
        if hired and hired not in gate2_choices:
            return 'Selected candidate must be from Gate 2 selections.'

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record the practice hire, its reward, and the finalists' visible scores."""
        record_stage_duration(player, 'PracticeGate3')
        data = get_round_data(player.participant, 0)
        gate2_choices = parse_choice_list(player.gate2_choices)
        hired_label = player.hired_candidate
        true_q = data[hired_label]['true_q']
        reward_str = calculate_reward(true_q)

        # Store finalist score context (3 finalists with all 3 revealed scores)
        decision_context = dict(
            decision_type='hire',
            condition=get_condition(player),
            display_round=0,
            finalists=gate2_choices,
            chosen_candidate=hired_label,
            visible_candidates=build_visible_scores(
                data, gate2_choices, include_zoom=True, include_inperson=True
            ),
        )
        player.hire_decision_context = json.dumps(decision_context)
        player.practice_hired_candidate = hired_label
        player.practice_hired_trueq = true_q
        player.practice_reward = reward_str
        # Round 0 in the history denotes the practice round.
        add_to_history(
            player.participant, 0, hired_label, true_q, reward_str,
            decision_context=decision_context,
        )


# Main Rounds - Standard condition (Funnel)
class MainGate1(BasePage):
    """Main-round Gate 1: choose which candidates advance, seeing resume scores only."""

    form_model = 'player'
    form_fields = ['gate1_choices', 'gate1_initial_checked']

    @staticmethod
    def is_displayed(player: Player):
        """Show in rounds after the first, for the standard condition only."""
        return player.round_number > 1 and get_condition(player) == CONDITION_STANDARD

    @staticmethod
    def vars_for_template(player: Player):
        """Pass resume scores for all candidates plus any pending form feedback."""
        data = get_round_data(player.participant, player.round_number)
        all_labels = get_candidate_labels_in_order(data)
        candidates = [
            {'label': label, 'resume': data[label]['resume'], 'zoom': '-', 'inperson': '-'}
            for label in all_labels
        ]
        # Read the feedback once so the error and the choices come from the same read.
        form_error, posted_choices = get_form_feedback(player, 'gate1')
        return dict(
            candidates=candidates,
            round_num=player.round_number - 1,
            gate_num=1,
            required_count=C.GATE1_SELECT,
            form_error=form_error,
            form_post_gate1_choices=sanitize_choices(posted_choices, all_labels),
        )

    @staticmethod
    def error_message(player: Player, values):
        """Require exactly C.GATE1_SELECT valid labels; store the selection on success."""
        data = get_round_data(player.participant, player.round_number)
        all_labels = get_candidate_labels_in_order(data)
        gate1_choices = sanitize_choices(
            _parse_submitted_choices(values.get('gate1_choices')), all_labels
        )
        if len(gate1_choices) != C.GATE1_SELECT:
            msg = f'Please select exactly {C.GATE1_SELECT} candidates.'
            set_form_feedback(player, 'gate1', msg, gate1_choices)
            return msg
        clear_form_feedback(player, 'gate1')
        player.gate1_choices = json.dumps(gate1_choices)

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record the Gate 1 selection together with the scores that were visible."""
        record_stage_duration(player, 'MainGate1')
        data = get_round_data(player.participant, player.round_number)
        all_labels = get_candidate_labels_in_order(data)
        selected_gate1 = parse_choice_list(player.gate1_choices)
        player.gate1_decision_context = json.dumps(dict(
            decision_type='gate1',
            condition=get_condition(player),
            display_round=player.round_number - 1,
            selected_candidates=selected_gate1,
            visible_candidates=build_visible_scores(
                data, all_labels, include_zoom=False, include_inperson=False
            ),
        ))


class MainGate1After(BasePage):
    """Main-round page showing which candidates advanced past Gate 1."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        """Show after round 1 for the standard condition once a Gate 1 selection exists."""
        return (
            player.round_number > 1
            and get_condition(player) == CONDITION_STANDARD
            and player.field_maybe_none('gate1_choices') is not None
        )

    @staticmethod
    def vars_for_template(player: Player):
        """Mark every candidate as advanced or eliminated after Gate 1."""
        data = get_round_data(player.participant, player.round_number)
        all_labels = get_candidate_labels_in_order(data)
        # Handle both JSON array format and plain string format
        gate1_choices = _parse_stored_choices(player.field_maybe_none('gate1_choices'))
        candidates = [
            {
                'label': label,
                'resume': data[label]['resume'],
                'zoom': '-',
                'inperson': '-',
                'eliminated': label not in gate1_choices,
            }
            for label in all_labels
        ]
        return dict(
            candidates=candidates,
            gate1_choices=gate1_choices,
            round_num=player.round_number - 1,
            gate_num=1,
            advanced_count=len(gate1_choices),
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record how long the participant spent on this page."""
        record_stage_duration(player, 'MainGate1After')


class MainGate2(BasePage):
    """Main-round Gate 2: narrow the Gate 1 survivors, seeing resume and Zoom scores."""

    form_model = 'player'
    form_fields = ['gate2_choices', 'gate2_initial_checked', 'gate2_initial_disabled_checked']

    @staticmethod
    def is_displayed(player: Player):
        """Show after round 1 for the standard condition once a Gate 1 selection exists."""
        return (
            player.round_number > 1
            and get_condition(player) == CONDITION_STANDARD
            and player.field_maybe_none('gate1_choices') is not None
        )

    @staticmethod
    def vars_for_template(player: Player):
        """Pass resume/Zoom scores, elimination flags and any pending form feedback."""
        data = get_round_data(player.participant, player.round_number)
        all_labels = get_candidate_labels_in_order(data)
        gate1_choices = sanitize_choices(
            parse_choice_list(player.field_maybe_none('gate1_choices')), all_labels
        )
        candidates = [
            {
                'label': label,
                'resume': data[label]['resume'],
                'zoom': data[label]['zoom'],
                'inperson': '-',
                'eliminated': label not in gate1_choices,
            }
            for label in all_labels
        ]
        # Read the feedback once so the error and the choices come from the same read.
        form_error, posted_choices = get_form_feedback(player, 'gate2')
        return dict(
            candidates=candidates,
            available_candidates=gate1_choices,
            round_num=player.round_number - 1,
            gate_num=2,
            required_count=C.GATE2_SELECT,
            form_error=form_error,
            form_post_gate2_choices=sanitize_choices(posted_choices, gate1_choices),
        )

    @staticmethod
    def error_message(player: Player, values):
        """Require exactly C.GATE2_SELECT labels, all drawn from the Gate 1 selection."""
        data = get_round_data(player.participant, player.round_number)
        all_labels = get_candidate_labels_in_order(data)
        gate2_choices = sanitize_choices(
            _parse_submitted_choices(values.get('gate2_choices')), all_labels
        )
        if len(gate2_choices) != C.GATE2_SELECT:
            msg = f'Please select exactly {C.GATE2_SELECT} candidates.'
            set_form_feedback(player, 'gate2', msg, gate2_choices)
            return msg

        gate1_choices = sanitize_choices(
            parse_choice_list(player.field_maybe_none('gate1_choices')), all_labels
        )
        if not all(c in gate1_choices for c in gate2_choices):
            msg = 'All selected candidates must be from Gate 1 selections.'
            set_form_feedback(player, 'gate2', msg, gate2_choices)
            return msg

        clear_form_feedback(player, 'gate2')
        player.gate2_choices = json.dumps(gate2_choices)

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record the Gate 2 selection together with the scores that were visible."""
        record_stage_duration(player, 'MainGate2')
        data = get_round_data(player.participant, player.round_number)
        gate1_choices = parse_choice_list(player.gate1_choices)
        gate2_choices = parse_choice_list(player.gate2_choices)
        player.gate2_decision_context = json.dumps(dict(
            decision_type='gate2',
            condition=get_condition(player),
            display_round=player.round_number - 1,
            available_candidates=gate1_choices,
            selected_candidates=gate2_choices,
            visible_candidates=build_visible_scores(
                data, gate1_choices, include_zoom=True, include_inperson=False
            ),
        ))


class MainGate2After(BasePage):
    """Main-round page showing which candidates advanced past Gate 2."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        """Show after round 1 for the standard condition once a Gate 2 selection exists."""
        return (
            player.round_number > 1
            and get_condition(player) == CONDITION_STANDARD
            and player.field_maybe_none('gate2_choices') is not None
        )

    @staticmethod
    def vars_for_template(player: Player):
        """Mark every candidate as advanced or eliminated after Gate 2."""
        data = get_round_data(player.participant, player.round_number)
        all_labels = get_candidate_labels_in_order(data)
        # Handle both JSON array format and plain string format
        gate2_choices = _parse_stored_choices(player.field_maybe_none('gate2_choices'))
        candidates = [
            {
                'label': label,
                'resume': data[label]['resume'],
                'zoom': data[label]['zoom'],
                'inperson': '-',
                'eliminated': label not in gate2_choices,
            }
            for label in all_labels
        ]
        return dict(
            candidates=candidates,
            gate2_choices=gate2_choices,
            round_num=player.round_number - 1,
            gate_num=2,
            advanced_count=len(gate2_choices),
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record how long the participant spent on this page."""
        record_stage_duration(player, 'MainGate2After')


class MainGate3(BasePage):
    """Main-round Gate 3: hire one of the Gate 2 finalists with all scores shown."""

    form_model = 'player'
    form_fields = ['hired_candidate']

    @staticmethod
    def is_displayed(player: Player):
        """Show in rounds after the first, for the standard condition only."""
        return player.round_number > 1 and get_condition(player) == CONDITION_STANDARD

    @staticmethod
    def vars_for_template(player: Player):
        """Pass all scores for every candidate, flagging which ones can be hired."""
        data = get_round_data(player.participant, player.round_number)
        all_labels = get_candidate_labels_in_order(data)
        gate2_choices = _stored_gate2_choices(player)
        candidates = [
            {
                'label': label,
                'resume': data[label]['resume'],
                'zoom': data[label]['zoom'],
                'inperson': data[label]['inperson'],
                'true_q': data[label]['true_q'],
                'available': label in gate2_choices,
            }
            for label in all_labels
        ]
        return dict(
            candidates=candidates,
            available_candidates=gate2_choices,
            round_num=player.round_number - 1,
            gate_num=3,
        )

    @staticmethod
    def error_message(player: Player, values):
        """Reject a hire that is not among the Gate 2 finalists."""
        hired = values.get('hired_candidate')
        gate2_choices = _stored_gate2_choices(player)
        if hired and hired not in gate2_choices:
            return 'Selected candidate must be from Gate 2 selections.'

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Record the hire, its reward, and the finalists' visible scores."""
        record_stage_duration(player, 'MainGate3')
        data = get_round_data(player.participant, player.round_number)
        gate2_choices = parse_choice_list(player.gate2_choices)
        hired_label = player.hired_candidate
        true_q = data[hired_label]['true_q']
        reward_str = calculate_reward(true_q)

        # Store finalist score context (3 finalists with all 3 revealed scores)
        decision_context = dict(
            decision_type='hire',
            condition=get_condition(player),
            display_round=player.round_number - 1,
            finalists=gate2_choices,
            chosen_candidate=hired_label,
            visible_candidates=build_visible_scores(
                data, gate2_choices, include_zoom=True, include_inperson=True
            ),
        )
        player.hire_decision_context = json.dumps(decision_context)
        player.hired_trueq = true_q
        player.reward_str = reward_str
        add_to_history(
            player.participant, player.round_number, hired_label, true_q, reward_str,
            decision_context=decision_context,
        )


# Post-task pages
class PostTaskWeights(BasePage):
    """Final-round page asking how much weight each score type received."""

    form_model = 'player'
    form_fields = ['resume_weight', 'zoom_weight', 'inperson_weight']

    @staticmethod
    def is_displayed(player: Player):
        """Show only in the last round."""
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def error_message(player: Player, values):
        """Require the three weights to sum to 100; missing values count as 0."""
        resume = values.get('resume_weight', 0) or 0
        zoom = values.get('zoom_weight', 0) or 0
        inperson = values.get('inperson_weight', 0) or 0
        total = resume + zoom + inperson
        if total != 100:
            return f'Weights must sum to 100. Current total: {total}'


class WeightsJustification(BasePage):
    """Final-round page collecting a free-text explanation of the weights."""

    form_model = 'player'
    form_fields = ['open_ended_explanation']

    @staticmethod
    def is_displayed(player: Player):
        """Show only in the last round."""
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def error_message(player: Player, values):
        """Reject an explanation that is missing or only whitespace."""
        explanation = (values.get('open_ended_explanation') or '').strip()
        if not explanation:
            return 'Please provide a brief explanation before continuing.'
class Demographics(BasePage):
    """Final-round demographics questionnaire."""

    form_model = 'player'
    form_fields = ['age_range', 'gender', 'device_type', 'survey_comments']

    @staticmethod
    def is_displayed(player: Player):
        """Show only in the last round."""
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def error_message(player: Player, values):
        """Return the message for the first required field left empty, if any."""
        # All fields are required except survey_comments
        required = (
            ('age_range', 'Please select your age range.'),
            ('gender', 'Please select your gender.'),
            ('device_type', 'Please select the device type you are using.'),
        )
        for field_name, message in required:
            if not values.get(field_name):
                return message

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Run the completion-marking helper for each condition."""
        _mark_standard_completed_if_ready(player)
        _mark_blind_completed_if_ready(player)


class FinalPayoff(BasePage):
    """Final-round page that reveals the round selected for payment."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player: Player):
        """Show only in the last round."""
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def vars_for_template(player: Player):
        """Make sure a payment round is drawn, set the payoff once, and return display values."""
        participant = player.participant
        if not participant.selected_round:
            select_final_payment(participant)

        selected_reward_string = participant.selected_reward_string or '$0.00'
        selected_reward_total = float(selected_reward_string.replace('$', ''))
        # The bonus shown is whatever exceeds the fixed base payment, floored at 0.
        bonus_only = max(0.0, selected_reward_total - C.DEFAULT_REWARD)

        # Set player payoff once based on total payment (includes fixed base payment)
        if participant.selected_reward_string and not player.payoff:
            player.payoff = cu(selected_reward_total)

        return dict(
            selected_round=participant.selected_round,
            selected_candidate=participant.selected_candidate,
            selected_true_q=participant.selected_true_quality,
            selected_reward=f"${bonus_only:.2f}",
            payoff=player.payoff,
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Draw the payment round here only if page render has not already done so."""
        # Do not re-draw if it was already selected on page render.
        participant = player.participant
        if participant.selected_round:
            return
        select_final_payment(participant)


class ProlificRedirect(BasePage):
    """Last page: hands the participant the Prolific completion link and code."""

    @staticmethod
    def is_displayed(player: Player):
        """Show only in the last round."""
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def vars_for_template(player: Player):
        """Return the completion URL and the matching completion code."""
        return dict(
            redirect_url='https://app.prolific.com/submissions/complete?cc=CIWZ6NQB',
            completion_code='CIWZ6NQB',
        )


# Order in which pages are offered; each page's is_displayed decides
# whether it is actually shown for a given player and round.
page_sequence = [
    ProlificIntro,
    Consent,
    PreInstructionsDisclaimer,
    InstructionsPart1,
    Instructions,
    Comprehension,
    PracticeIntro,
    # Standard condition practice
    PracticeGate1,
    PracticeGate1After,
    PracticeGate2,
    PracticeGate2After,
    PracticeGate3,
    # Blind condition practice
    PracticeStage1Resume,
    PracticeStage2Zoom,
    PracticeStage3Decision,
    # Both conditions practice performance
    PracticePerformance,
    # Standard condition main
    MainGate1,
    MainGate1After,
    MainGate2,
    MainGate2After,
    MainGate3,
    # Blind condition main
    MainStage1Resume,
    MainStage2Zoom,
    MainStage3Decision,
    # Both conditions main performance
    MainRoundPerformance,
    # Post-task
    PostTaskWeights,
    WeightsJustification,
    Demographics,
    FinalPayoff,
    ProlificRedirect,
]