import json import time import random import csv from pathlib import Path from otree.api import Currency as c from otree.api import Page, WaitPage from .models import ( Constants, register_participant, generate_round_plan, save_completed_round, mark_participant_completed, get_historical_data_for_paired_round, get_partner_available_urns, ParticipantRegistry, CompletedRound ) class Consent(Page): form_model = 'player' form_fields = ['consent_agreed'] def is_displayed(self): return self.round_number == 1 def before_next_page(self): # If they don't consent, redirect to thank you page # Record consent status in participant.vars so it's available across rounds self.participant.vars['consent_given'] = bool(self.player.consent_agreed) def vars_for_template(self): return dict() class Intake(Page): form_model = 'player' form_fields = ['org_chart_position', 'role_title', 'first_name'] def is_displayed(self): consent = self.player.field_maybe_none('consent_agreed') return self.round_number == 1 and consent is True def vars_for_template(self): # Get currently taken position IDs from the registry taken_positions = [] # Only include registrations from the current subsession to avoid # showing positions taken in other (previous) sessions. for reg in ParticipantRegistry.filter(subsession=self.subsession): taken_positions.append(reg.node_id) team_names = _load_team_names() return dict( taken_positions_json=json.dumps(taken_positions), team_names=team_names, team_names_json=json.dumps(team_names), team_title_by_name_json=json.dumps(_load_team_title_by_name()), team_id_by_name_json=json.dumps(_load_team_id_by_name()), ) @staticmethod def live_method(player, data): """Handle real-time position reservation""" if data.get('type') == 'reserve_position': position_id = data.get('position_id') if position_id is not None: taken = player.session.vars.get('taken_positions', []) if position_id not in taken: taken.append(position_id) player.session.vars['taken_positions'] = taken return {0: {'type': 'position_taken', 'position_id': position_id}} elif data.get('type') == 'release_position': position_id = data.get('position_id') if position_id is not None: taken = player.session.vars.get('taken_positions', []) if position_id in taken: taken.remove(position_id) player.session.vars['taken_positions'] = taken return {0: {'type': 'position_released', 'position_id': position_id}} elif data.get('type') == 'get_taken_positions': # Include both session reservations and permanent registry taken = list(player.session.vars.get('taken_positions', [])) # Only include registry entries from the current subsession for reg in ParticipantRegistry.filter(subsession=player.subsession): if reg.node_id not in taken: taken.append(reg.node_id) return {player.id_in_group: {'type': 'taken_positions', 'positions': taken}} return {} def before_next_page(self): from .models import perform_draw, register_participant, generate_round_plan self.participant.vars['first_name'] = self.player.first_name self.participant.vars['role_title'] = self.player.role_title self.participant.vars['org_chart_position'] = self.player.org_chart_position # 1. Practice/Demo Round Info BEFORE registration # Prefer values pre-generated in Subsession.creating_session. demo_red = self.participant.vars.get('demo_red') demo_draws = self.participant.vars.get('demo_draws') if demo_red is None or demo_draws is None: demo_red = random.randint(4, 16) demo_draws = json.dumps(perform_draw(demo_red) + perform_draw(demo_red)) self.participant.vars['demo_red'] = demo_red self.participant.vars['demo_draws'] = demo_draws # 2. Register the participant and generate their round plan join_order, partners, num_rounds = register_participant( self.subsession, self.participant, self.player.org_chart_position, self.player.first_name, self.player.role_title, demo_red, demo_draws ) # Store assignment summary on Player for easy exports/testing. self.player.assigned_join_order = join_order partner_summaries = [] root_subsession = self.subsession.in_round(1) regs = list(ParticipantRegistry.filter(subsession=root_subsession)) reg_by_code = {r.participant_code: r for r in regs} for code in partners or []: reg = reg_by_code.get(code) if not reg: continue partner_summaries.append( { 'partner_code': code, 'partner_node_id': reg.node_id, 'partner_first_name': reg.first_name, 'partner_role_title': reg.role_title, } ) self.player.assigned_partners_json = json.dumps(partner_summaries) # Store metadata in participant.vars self.participant.vars['join_order'] = join_order self.participant.vars['partners'] = partners self.participant.vars['num_rounds'] = num_rounds # Generate the personalized round plan round_plan = generate_round_plan( self.subsession, join_order, partners, self.player.org_chart_position ) self.participant.vars['round_plan'] = round_plan # Set up ALL rounds immediately self._setup_all_rounds_data() # Initialize history from .models import initialize_participant_history initialize_participant_history( self.subsession, self.participant, self.participant.vars.get('drawing_room', {}) ) def _setup_all_rounds_data(self): """Pre-draw all urns and draws and store them in the database for ALL rounds.""" from .models import perform_draw # Setup all experiment rounds plan = self.participant.vars.get('round_plan', []) drawing_room = {} # Individual rounds are pre-generated at session creation time. pre_drawn_individual_rounds = self.participant.vars.get('pre_drawn_individual_rounds') or [] individual_index = 0 # For paired rounds, sample from each partner's available individual urns # without replacement (per partner) to avoid repeatedly using the same urn. partner_urn_pools = {} for i, entry in enumerate(plan): round_num = i + 1 round_data = { 'round_type': entry['round_type'], 'treatment': entry['treatment'], 'pair_mode': entry.get('pair_mode', ''), 'partner_code': entry.get('partner_code', ''), 'partner_first_name': entry.get('partner_first_name', ''), 'partner_role_title': entry.get('partner_role_title', ''), 'partner_node_id': entry.get('partner_node_id'), 'partner_relationship': entry.get('relationship', ''), } if entry['round_type'] == 'individual': if individual_index < len(pre_drawn_individual_rounds): pre = pre_drawn_individual_rounds[individual_index] or {} red_count = pre.get('urn_red_balls') individual_draws_json = pre.get('individual_draws_json') else: red_count = None individual_draws_json = None # Fallback for older sessions or if pre-draw data is missing. if red_count is None: red_count = random.randint(4, 16) if not individual_draws_json: individual_draws_json = json.dumps([perform_draw(red_count) for _ in range(2)]) round_data['urn_red_balls'] = red_count round_data['individual_draws_json'] = individual_draws_json individual_index += 1 elif entry['round_type'] == 'pair': partner_code = entry.get('partner_code') if partner_code and partner_code not in partner_urn_pools: urns = get_partner_available_urns(self.subsession, partner_code) or [] urn_pool = list(urns) random.shuffle(urn_pool) partner_urn_pools[partner_code] = urn_pool urn_pool = partner_urn_pools.get(partner_code) if partner_code else None if urn_pool: red_count = urn_pool.pop() else: red_count = random.randint(4, 16) round_data['urn_red_balls'] = red_count own_draws_json = json.dumps(perform_draw(red_count)) round_data['signal_draw_json'] = own_draws_json round_data['action_draw_json'] = own_draws_json if partner_code: hist_data = get_historical_data_for_paired_round( self.subsession, partner_code, entry.get('pair_mode'), red_count ) if hist_data: round_data['partner_historical_guess'] = hist_data['first_guess'] round_data['partner_historical_draw_json'] = hist_data['draws_json'] if entry.get('pair_mode') == 'signal': round_data['signal_partner_draw_json'] = hist_data['draws_json'] elif entry.get('pair_mode') == 'action': round_data['action_partner_guess'] = hist_data['first_guess'] elif entry.get('pair_mode') == 'action-known': round_data['action_known_draw_json'] = hist_data['draws_json'] round_data['action_known_partner_guess'] = hist_data['first_guess'] drawing_room[round_num] = round_data # Sync to Database for this specific round IMMEDIATELY p = self.player.in_round(round_num) # Apply common data p.round_type = round_data['round_type'] p.round_treatment = round_data['treatment'] p.pair_mode = round_data.get('pair_mode', '') p.partner_code = round_data.get('partner_code', '') p.partner_first_name = round_data.get('partner_first_name', '') p.partner_role_title = round_data.get('partner_role_title', '') p.partner_node_id = round_data.get('partner_node_id') p.partner_relationship = round_data.get('relationship', '') p.urn_red_balls = round_data.get('urn_red_balls') if p.round_type == 'individual': p.individual_draws_json = round_data.get('individual_draws_json', '') else: p.signal_draw_json = round_data.get('signal_draw_json', '') if p.pair_mode == 'action': p.action_draw_json = round_data.get('action_draw_json', '') if 'partner_historical_guess' in round_data: p.partner_historical_guess = round_data['partner_historical_guess'] p.partner_historical_draw_json = round_data['partner_historical_draw_json'] if p.pair_mode == 'signal': p.signal_partner_draw_json = round_data.get('signal_partner_draw_json', '') elif p.pair_mode == 'action': p.action_partner_guess = round_data.get('action_partner_guess') elif p.pair_mode == 'action-known': p.action_known_draw_json = round_data.get('action_known_draw_json', '') p.action_known_partner_guess = round_data.get('action_known_partner_guess') # Store in participant.vars as backup self.participant.vars['drawing_room'] = drawing_room def _setup_round_data(self, round_num): """Deprecated: using _setup_all_rounds_data instead""" pass _TEAM_DIRECTORY_CACHE = None def _normalize_person_name(name: str) -> str: """Normalize person names so Excel + org chart labels match. The Excel directory sometimes includes honorific prefixes like "Mr"/"Ms"/"Mrs". The org chart labels do not, so we strip those prefixes and normalize spaces. """ if not name: return '' s = str(name).strip() # Collapse internal whitespace. s = ' '.join(s.split()) lowered = s.lower() honorifics = ( 'mr ', 'mr. ', 'ms ', 'ms. ', 'mrs ', 'mrs. ', 'miss ', 'miss. ', 'dr ', 'dr. ', 'prof ', 'prof. ', ) for h in honorifics: if lowered.startswith(h): s = s[len(h):].strip() s = ' '.join(s.split()) break return s def _load_team_directory(): """Return (names, title_by_name, id_by_name) for Intake autocomplete/suggestions. Important: id_by_name must match the node IDs used by org_chart_utils, because partner matching and relationship logic uses those node IDs. """ global _TEAM_DIRECTORY_CACHE if _TEAM_DIRECTORY_CACHE is not None: return _TEAM_DIRECTORY_CACHE root = Path(__file__).resolve().parent.parent xlsx_path = root / 'DAINAAITeamStructure_Updated.xlsx' csv_path = root / 'org_chart_directed.csv' excel_names = [] # Keys are normalized names (matching org_chart_directed.csv labels). title_by_name = {} try: from openpyxl import load_workbook wb = load_workbook(xlsx_path, read_only=True, data_only=True) ws = wb[wb.sheetnames[0]] header_row = next(ws.iter_rows(min_row=1, max_row=1, values_only=True), None) header_list = [str(x).strip() if x is not None else '' for x in (header_row or [])] try: name_col = header_list.index('SAP_Name') + 1 except ValueError: name_col = 1 try: title_col = header_list.index('Title') + 1 except ValueError: title_col = 2 max_col = max(name_col, title_col) for row in ws.iter_rows(min_row=2, max_col=max_col, values_only=True): name_val = row[name_col - 1] if len(row) >= name_col else None if name_val is None: continue name = _normalize_person_name(name_val) if not name: continue excel_names.append(name) title_val = row[title_col - 1] if len(row) >= title_col else None if title_val is None: continue title = str(title_val).strip() if title and name not in title_by_name: # Keep the first title we see for this normalized name. title_by_name[name] = title except Exception: excel_names = [] # Manual overrides for any directory gaps. # Keys must be normalized names matching org_chart_directed.csv labels. title_by_name.setdefault( _normalize_person_name('Dmitri Chtchourov'), 'Head of Data Science & AI/ML', ) # Names and IDs must come from org_chart_utils (directed CSV) so matching is correct. try: from .org_chart_utils import get_all_node_labels, get_node_id_by_label chart_names = get_all_node_labels() or [] chart_name_set = set(chart_names) # Prefer Excel order for autocomplete, but only keep names that exist in the chart. names = [n for n in excel_names if n in chart_name_set] # Append any chart names missing from Excel. excel_name_set = set(names) names.extend([n for n in chart_names if n not in excel_name_set]) id_by_name = {n: get_node_id_by_label(n) for n in names if get_node_id_by_label(n) is not None} except Exception: # Absolute fallback if org_chart_utils can't load. names = [] id_by_name = {} _TEAM_DIRECTORY_CACHE = (names, title_by_name, id_by_name) return _TEAM_DIRECTORY_CACHE def _load_team_names(): return _load_team_directory()[0] def _load_team_title_by_name(): return _load_team_directory()[1] def _load_team_id_by_name(): return _load_team_directory()[2] class IntakeWaitPage(Page): """ In async mode, this page is essentially a pass-through. No need to wait for other participants. """ def is_displayed(self): # In async mode, skip this wait page entirely return False def vars_for_template(self): completed = self.session.vars.get('intake_completed_participants', []) total_players = Constants.players_per_group first_completion = self.session.vars.get('intake_first_completion_time') time_remaining = 300 # 5 minutes default if first_completion: elapsed = time.time() - first_completion time_remaining = max(0, 300 - elapsed) return dict( completed_count=len(completed), total_players=total_players, time_remaining=int(time_remaining), all_completed=len(completed) >= total_players, ) @staticmethod def live_method(player, data): """Handle polling for wait status""" if data.get('type') == 'check_status': completed = player.session.vars.get('intake_completed_participants', []) total_players = Constants.players_per_group first_completion = player.session.vars.get('intake_first_completion_time') time_remaining = 300 timed_out = False if first_completion: elapsed = time.time() - first_completion time_remaining = max(0, 300 - elapsed) timed_out = elapsed >= 300 all_completed = len(completed) >= total_players can_proceed = all_completed or (timed_out and len(completed) >= 2) return { player.id_in_group: { 'type': 'status_update', 'completed_count': len(completed), 'total_players': total_players, 'time_remaining': int(time_remaining), 'all_completed': all_completed, 'timed_out': timed_out, 'can_proceed': can_proceed, } } return {} def before_next_page(self): # Mark which participants are active for this session completed = self.session.vars.get('intake_completed_participants', []) self.session.vars['active_participants'] = completed self.session.vars['active_player_count'] = len(completed) class GeneralInstructions(Page): def is_displayed(self): consent = self.player.field_maybe_none('consent_agreed') return self.round_number == 1 and consent is True class ExampleDemo(Page): form_model = 'player' form_fields = ['demo_guess'] def is_displayed(self): consent = self.player.field_maybe_none('consent_agreed') return self.round_number == 1 and consent is True def vars_for_template(self): # Fetch practice data from Registry reg = None root_subsession = self.subsession.in_round(1) regs = [ r for r in ParticipantRegistry.filter(subsession=root_subsession) if r.participant_code == self.participant.code ] if regs: reg = regs[0] draws_json = reg.demo_draws_json if reg else self.participant.vars.get('demo_draws', '[]') red_count = reg.demo_urn_red if reg else self.participant.vars.get('demo_red', 10) draws = json.loads(draws_json) # Ensure the practice animation can show 2 rounds of 3 draws each. # For older sessions/rows that only stored 3 draws, pad to 6 once. try: if isinstance(draws, list) and len(draws) < 6: from .models import perform_draw draws = list(draws) + perform_draw(red_count, num_balls=(6 - len(draws))) draws_json = json.dumps(draws) self.participant.vars['demo_draws'] = draws_json if reg: reg.demo_draws_json = draws_json except Exception: pass demo_draws_csv = ','.join(str(x) for x in draws) return dict( demo_draws=draws, demo_draws_json=draws_json, demo_draws_csv=demo_draws_csv, demo_red_count=red_count, ) class ExperimentIntro(Page): def is_displayed(self): consent = self.player.field_maybe_none('consent_agreed') return self.round_number == 1 and consent is True def vars_for_template(self): return dict( total_rounds=self.participant.vars.get('num_rounds', 60), ) class IndividualRound(Page): form_model = 'player' form_fields = ['individual_guess_1', 'individual_guess_2'] def is_displayed(self): # Check if this round is beyond the participant's total rounds total_rounds = self.participant.vars.get('num_rounds', 60) if self.round_number > total_rounds: return False # Check round type from plan plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): return plan[self.round_number - 1]['round_type'] == 'individual' return False def before_next_page(self): # Ensure round data is set up self._ensure_round_setup() # Save this round to historical data save_completed_round( self.subsession, self.participant, round_index=self.round_number, round_type='individual', urn_red_balls=self.player.urn_red_balls, draws_json=self.player.individual_draws_json, first_guess=self.player.individual_guess_1, updated_guess=self.player.individual_guess_2 ) def _ensure_round_setup(self): """Sync this round's data from the pre-drawn drawing_room in participant.vars.""" if not self.player.round_type: drawing_room = self.participant.vars.get('drawing_room', {}) round_data = drawing_room.get(self.round_number) or drawing_room.get(str(self.round_number)) if round_data: self.player.round_type = round_data['round_type'] self.player.round_treatment = round_data['treatment'] self.player.pair_mode = round_data.get('pair_mode', '') self.player.partner_code = round_data.get('partner_code', '') self.player.urn_red_balls = round_data.get('urn_red_balls') self.player.individual_draws_json = round_data.get('individual_draws_json') def vars_for_template(self): self._ensure_round_setup() draws = json.loads(self.player.individual_draws_json or '[]') flat_draws = [] if draws and isinstance(draws, list) and isinstance(draws[0], list): for batch in draws: if isinstance(batch, list): flat_draws.extend(batch) elif isinstance(draws, list): flat_draws = draws individual_draws_csv = ','.join(str(x) for x in flat_draws if x is not None) return dict( draw_batches=draws, draw_batches_json=json.dumps(draws), individual_draws_csv=individual_draws_csv, urn_red_count=self.player.urn_red_balls if self.player.urn_red_balls is not None else 10, min_guess=4, max_guess=16, round_number=self.round_number, total_rounds=self.participant.vars.get('num_rounds', 60), ) def error_message(self, values): required_fields = ['individual_guess_1', 'individual_guess_2'] for field in required_fields: if values.get(field) is None: return 'Please enter your guess after each draw before continuing.' class SignalRound(Page): """ Signal round: Player sees their own draw, makes initial guess, then sees partner's historical draw and updates their guess. """ form_model = 'player' form_fields = ['signal_guess_initial', 'signal_guess_updated'] def is_displayed(self): # Check if this round is beyond the participant's total rounds total_rounds = self.participant.vars.get('num_rounds', 60) if self.round_number > total_rounds: return False # Check round type from plan plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): entry = plan[self.round_number - 1] return entry['round_type'] == 'pair' and entry.get('pair_mode') == 'signal' return False def before_next_page(self): self._ensure_round_setup() # Save this round to historical data save_completed_round( self.subsession, self.participant, round_index=self.round_number, round_type='signal', urn_red_balls=self.player.urn_red_balls, draws_json=self.player.signal_draw_json, first_guess=self.player.signal_guess_initial, updated_guess=self.player.signal_guess_updated ) def _ensure_round_setup(self): """Sync this round's data from the pre-drawn drawing_room in participant.vars.""" if not self.player.round_type: drawing_room = self.participant.vars.get('drawing_room', {}) round_data = drawing_room.get(self.round_number) or drawing_room.get(str(self.round_number)) if round_data: self.player.round_type = round_data['round_type'] self.player.round_treatment = round_data['treatment'] self.player.pair_mode = round_data.get('pair_mode', '') self.player.partner_code = round_data.get('partner_code', '') self.player.urn_red_balls = round_data.get('urn_red_balls') self.player.signal_draw_json = round_data.get('signal_draw_json') # Setup partner details plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): entry = plan[self.round_number - 1] self.player.partner_first_name = entry.get('partner_first_name', '') self.player.partner_role_title = entry.get('partner_role_title', '') self.player.partner_node_id = entry.get('partner_node_id') self.player.partner_relationship = entry.get('relationship', '') self.player.partner_label = f"{self.player.partner_first_name}, {self.player.partner_role_title}".strip(', ') # Fetch historical data if partner is assigned if self.player.partner_code: hist_data = get_historical_data_for_paired_round( self.subsession, self.player.partner_code, self.player.pair_mode, self.player.urn_red_balls ) if hist_data: self.player.partner_historical_guess = hist_data['first_guess'] self.player.signal_partner_draw_json = hist_data['draws_json'] def vars_for_template(self): self._ensure_round_setup() own_draws = json.loads(self.player.signal_draw_json or '[]') partner_draws_raw = json.loads(self.player.signal_partner_draw_json or '[]') # Historical data from individual rounds is stored as 2D array [[batch1], [batch2]] # Extract first batch if it's a 2D array, otherwise use as-is if partner_draws_raw and isinstance(partner_draws_raw[0], list): partner_draws = partner_draws_raw[0] # Use first batch of 3 balls else: partner_draws = partner_draws_raw partner_full_label = '' if self.player.partner_first_name or self.player.partner_role_title: partner_full_label = f"{self.player.partner_first_name}, {self.player.partner_role_title}".strip(', ') partner_display_label = partner_full_label or 'your partner' own_draws_csv = ','.join(str(x) for x in own_draws if x is not None) partner_draws_csv = ','.join(str(x) for x in partner_draws if x is not None) return dict( own_draws=own_draws, own_draws_json=json.dumps(own_draws), own_draws_csv=own_draws_csv, partner_draws=partner_draws, partner_draws_json=json.dumps(partner_draws), partner_draws_csv=partner_draws_csv, urn_red_count=self.player.urn_red_balls if self.player.urn_red_balls is not None else 10, min_guess=4, max_guess=16, partner_first_name=self.player.partner_first_name, partner_role_title=self.player.partner_role_title, partner_full_label=partner_full_label, partner_display_label=partner_display_label, partner_relationship=self.player.partner_relationship, round_number=self.round_number, total_rounds=self.participant.vars.get('num_rounds', 60), ) def error_message(self, values): if values.get('signal_guess_initial') is None or values.get('signal_guess_updated') is None: return 'Please submit your first guess and your updated guess before continuing.' class ActionRound(Page): """ Action round: Player sees their own draw, makes initial guess, then sees partner's historical first guess and updates their guess. """ form_model = 'player' form_fields = ['action_guess_initial', 'action_guess_updated'] def is_displayed(self): # Check if this round is beyond the participant's total rounds total_rounds = self.participant.vars.get('num_rounds', 60) if self.round_number > total_rounds: return False # Check round type from plan plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): entry = plan[self.round_number - 1] return entry['round_type'] == 'pair' and entry.get('pair_mode') == 'action' return False def before_next_page(self): self._ensure_round_setup() # Save this round to historical data # Use field_maybe_none to safely access nullable fields action_draw = self.player.field_maybe_none('action_draw_json') signal_draw = self.player.field_maybe_none('signal_draw_json') save_completed_round( self.subsession, self.participant, round_index=self.round_number, round_type='action', urn_red_balls=self.player.urn_red_balls, draws_json=action_draw or signal_draw, first_guess=self.player.action_guess_initial, updated_guess=self.player.action_guess_updated ) def _ensure_round_setup(self): """Sync this round's data from the pre-drawn drawing_room in participant.vars.""" if not self.player.round_type: drawing_room = self.participant.vars.get('drawing_room', {}) round_data = drawing_room.get(self.round_number) or drawing_room.get(str(self.round_number)) if round_data: self.player.round_type = round_data['round_type'] self.player.round_treatment = round_data['treatment'] self.player.pair_mode = round_data.get('pair_mode', '') self.player.partner_code = round_data.get('partner_code', '') self.player.urn_red_balls = round_data.get('urn_red_balls') self.player.signal_draw_json = round_data.get('signal_draw_json') self.player.action_draw_json = round_data.get('signal_draw_json') # Setup partner details plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): entry = plan[self.round_number - 1] self.player.partner_first_name = entry.get('partner_first_name', '') self.player.partner_role_title = entry.get('partner_role_title', '') self.player.partner_node_id = entry.get('partner_node_id') self.player.partner_relationship = entry.get('relationship', '') self.player.partner_label = f"{self.player.partner_first_name}, {self.player.partner_role_title}".strip(', ') # Fetch historical data if partner is assigned if self.player.partner_code: hist_data = get_historical_data_for_paired_round( self.subsession, self.player.partner_code, self.player.pair_mode, self.player.urn_red_balls ) if hist_data: self.player.partner_historical_guess = hist_data['first_guess'] self.player.action_partner_guess = hist_data['first_guess'] def vars_for_template(self): self._ensure_round_setup() own_draws = json.loads(self.player.signal_draw_json or '[]') partner_full_label = '' if self.player.partner_first_name or self.player.partner_role_title: partner_full_label = f"{self.player.partner_first_name}, {self.player.partner_role_title}".strip(', ') partner_display_label = partner_full_label or 'your partner' # In async mode, partner's guess comes from historical data partner_guess_initial = self.player.partner_historical_guess own_draws_csv = ','.join(str(x) for x in own_draws if x is not None) return dict( own_draws=own_draws, own_draws_json=json.dumps(own_draws), own_draws_csv=own_draws_csv, urn_red_count=self.player.urn_red_balls if self.player.urn_red_balls is not None else 10, min_guess=4, max_guess=16, partner_first_name=self.player.partner_first_name, partner_role_title=self.player.partner_role_title, partner_full_label=partner_full_label, partner_display_label=partner_display_label, partner_guess_initial=partner_guess_initial, partner_guess_initial_json=json.dumps(partner_guess_initial), partner_relationship=self.player.partner_relationship, round_number=self.round_number, total_rounds=self.participant.vars.get('num_rounds', 60), ) def error_message(self, values): if values.get('action_guess_initial') is None or values.get('action_guess_updated') is None: return 'Please submit your first guess and your updated guess before continuing.' class ActionKnownRound(Page): """ Action-known round: Player makes initial guess, then sees partner's draw. Player chooses to either match partner's (hidden) updated guess or submit their own. """ form_model = 'player' form_fields = ['action_guess_initial', 'action_known_choice', 'action_guess_updated'] def is_displayed(self): # Check if this round is beyond the participant's total rounds total_rounds = self.participant.vars.get('num_rounds', 60) if self.round_number > total_rounds: return False # Check round type from plan plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): entry = plan[self.round_number - 1] return entry['round_type'] == 'pair' and entry.get('pair_mode') == 'action-known' return False def before_next_page(self): self._ensure_round_setup() # If player chose to match, use partner's historical updated guess if self.player.action_known_choice == 'match': # Use updated_guess if available, otherwise fall back to first_guess partner_updated = self.player.field_maybe_none('partner_historical_updated_guess') partner_first = self.player.field_maybe_none('partner_historical_guess') if partner_updated is not None: self.player.action_guess_updated = partner_updated elif partner_first is not None: self.player.action_guess_updated = partner_first else: # Fallback to their own initial guess if no partner data self.player.action_guess_updated = self.player.action_guess_initial # Save this round to historical data action_known_draw = self.player.field_maybe_none('action_known_draw_json') signal_draw = self.player.field_maybe_none('signal_draw_json') save_completed_round( self.subsession, self.participant, round_index=self.round_number, round_type='action-known', urn_red_balls=self.player.urn_red_balls, draws_json=action_known_draw or signal_draw, first_guess=self.player.action_guess_initial, updated_guess=self.player.action_guess_updated ) # Store the own guess if they chose "own" if self.player.action_known_choice == 'own': self.player.action_known_own_guess = self.player.action_guess_updated def _ensure_round_setup(self): """Sync this round's data from the pre-drawn drawing_room in participant.vars.""" if not self.player.round_type: drawing_room = self.participant.vars.get('drawing_room', {}) round_data = drawing_room.get(self.round_number) or drawing_room.get(str(self.round_number)) if round_data: self.player.round_type = round_data['round_type'] self.player.round_treatment = round_data['treatment'] self.player.pair_mode = round_data.get('pair_mode', '') self.player.partner_code = round_data.get('partner_code', '') self.player.urn_red_balls = round_data.get('urn_red_balls') self.player.signal_draw_json = round_data.get('signal_draw_json') self.player.action_known_draw_json = round_data.get('signal_draw_json') # In action-known, partner's draw is shown # Setup partner details plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): entry = plan[self.round_number - 1] self.player.partner_first_name = entry.get('partner_first_name', '') self.player.partner_role_title = entry.get('partner_role_title', '') self.player.partner_node_id = entry.get('partner_node_id') self.player.partner_relationship = entry.get('relationship', '') self.player.partner_label = f"{self.player.partner_first_name}, {self.player.partner_role_title}".strip(', ') # Fetch historical data if partner is assigned if self.player.partner_code: hist_data = get_historical_data_for_paired_round( self.subsession, self.player.partner_code, self.player.pair_mode, self.player.urn_red_balls ) if hist_data: self.player.partner_historical_guess = hist_data['first_guess'] self.player.partner_historical_updated_guess = hist_data['first_guess'] self.player.partner_historical_draw_json = hist_data['draws_json'] self.player.action_known_partner_guess = hist_data['first_guess'] # Set action_known_draw_json to the partner's actual draw self.player.action_known_draw_json = hist_data['draws_json'] def vars_for_template(self): self._ensure_round_setup() own_draws = json.loads(self.player.signal_draw_json or '[]') partner_draws_raw = json.loads(self.player.partner_historical_draw_json or '[]') if partner_draws_raw and isinstance(partner_draws_raw[0], list): partner_draws = partner_draws_raw[0] else: partner_draws = partner_draws_raw partner_full_label = '' if self.player.partner_first_name or self.player.partner_role_title: partner_full_label = f"{self.player.partner_first_name}, {self.player.partner_role_title}".strip(', ') partner_display_label = partner_full_label or 'your partner' own_draws_csv = ','.join(str(x) for x in own_draws if x is not None) partner_draws_csv = ','.join(str(x) for x in partner_draws if x is not None) return dict( own_draws=own_draws, own_draws_json=json.dumps(own_draws), own_draws_csv=own_draws_csv, partner_draws=partner_draws, partner_draws_json=json.dumps(partner_draws), partner_draws_csv=partner_draws_csv, urn_red_count=self.player.urn_red_balls if self.player.urn_red_balls is not None else 10, min_guess=4, max_guess=16, partner_first_name=self.player.partner_first_name, partner_role_title=self.player.partner_role_title, partner_full_label=partner_full_label, partner_display_label=partner_display_label, partner_relationship=self.player.partner_relationship, round_number=self.round_number, total_rounds=self.participant.vars.get('num_rounds', 60), ) def error_message(self, values): if values.get('action_guess_initial') is None: return 'Please submit your first guess before continuing.' if values.get('action_known_choice') is None: return 'Please choose whether to match or submit your own guess.' if values.get('action_known_choice') == 'own' and values.get('action_guess_updated') is None: return 'Please submit your own guess before continuing.' class Results(Page): """Final results page shown after all rounds are complete.""" form_model = 'player' form_fields = ['participant_comments'] def is_displayed(self): total_rounds = self.participant.vars.get('num_rounds', 60) # Only show Results after the post-experiment survey, demographics, and Likert scale were started if self.round_number != total_rounds: return False return bool(self.participant.vars.get('likert_started')) def before_next_page(self): # Mark participant as completed in the registry mark_participant_completed(self.subsession, self.participant) # Participant completed their experiment; Results will show after survey and demographics def vars_for_template(self): return dict( total_rounds=self.participant.vars.get('num_rounds', 60), join_order=self.participant.vars.get('join_order', 1), ) class Survey(Page): form_model = 'player' # No direct form_fields here since we handle manually in before_next_page form_fields = [] def is_displayed(self): consent = self.participant.vars.get('consent_given') total_rounds = self.participant.vars.get('num_rounds', 13) if not (consent is True and self.round_number == total_rounds): return False if self.participant.vars.get('survey_started'): return False return True def vars_for_template(self): req = getattr(self, 'request', None) req_method = getattr(req, 'method', 'GET') if req is not None else 'GET' req_get = getattr(req, 'query_params', {}) if req is not None else {} # In oTree/Starlette, POSTed form data is available as self._form_data. req_post = getattr(self, '_form_data', {}) # Determine how many nomination rows to show. # - On normal GET, optionally accept counts from query params. # - On POST (e.g., if the page re-renders due to an error_message), infer # counts from submitted field names so typed values don't disappear. if req_method == 'POST': import re def infer_max_index(prefix: str) -> int: max_i = 0 for k in req_post.keys(): m = re.match(rf'^{prefix}_name_(\d+)$', k) if m: max_i = max(max_i, int(m.group(1))) return max_i advice_fields_to_show = max(3, min(10, infer_max_index('advice') or int(self.participant.vars.get('advice_fields_to_show', 3)))) friends_fields_to_show = max(3, min(10, infer_max_index('friends') or int(self.participant.vars.get('friends_fields_to_show', 3)))) insight_fields_to_show = max(3, min(10, infer_max_index('insight') or int(self.participant.vars.get('insight_fields_to_show', 3)))) else: advice_fields_to_show = int(req_get.get('advice_fields_to_show', self.participant.vars.get('advice_fields_to_show', 3)) or 3) friends_fields_to_show = int(req_get.get('friends_fields_to_show', self.participant.vars.get('friends_fields_to_show', 3)) or 3) insight_fields_to_show = int(req_get.get('insight_fields_to_show', self.participant.vars.get('insight_fields_to_show', 3)) or 3) advice_fields_to_show = max(3, min(10, advice_fields_to_show)) friends_fields_to_show = max(3, min(10, friends_fields_to_show)) insight_fields_to_show = max(3, min(10, insight_fields_to_show)) self.participant.vars['advice_fields_to_show'] = advice_fields_to_show self.participant.vars['friends_fields_to_show'] = friends_fields_to_show self.participant.vars['insight_fields_to_show'] = insight_fields_to_show advice_range = list(range(1, advice_fields_to_show + 1)) friends_range = list(range(1, friends_fields_to_show + 1)) insight_range = list(range(1, insight_fields_to_show + 1)) # Fetch existing JSON data from Registry if available reg = None root_subsession = self.subsession.in_round(1) regs = [ r for r in ParticipantRegistry.filter(subsession=root_subsession) if r.participant_code == self.participant.code ] if regs: reg = regs[0] if req_method == 'POST': # Preserve whatever the participant just typed if the page re-renders. def extract_entries_from_post(prefix: str): entries = [] for i in range(1, 11): name = (req_post.get(f'{prefix}_name_{i}', '') or '').strip() if name or i <= (advice_fields_to_show if prefix == 'advice' else friends_fields_to_show if prefix == 'friends' else insight_fields_to_show): entries.append({'name': name}) return entries advice_data = extract_entries_from_post('advice') friends_data = extract_entries_from_post('friends') insight_data = extract_entries_from_post('insight') else: advice_json = reg.advice_json if reg else '[]' friends_json = reg.friends_json if reg else '[]' insight_json = reg.insight_json if reg else '[]' advice_data = json.loads(advice_json or '[]') friends_data = json.loads(friends_json or '[]') insight_data = json.loads(insight_json or '[]') advice_entries = [] for i in advice_range: entry = advice_data[i-1] if i <= len(advice_data) else {'name': ''} if isinstance(entry, str): name_val = entry else: name_val = (entry or {}).get('name', '') advice_entries.append({'index': i, 'name': name_val}) friends_entries = [] for i in friends_range: entry = friends_data[i-1] if i <= len(friends_data) else {'name': ''} if isinstance(entry, str): name_val = entry else: name_val = (entry or {}).get('name', '') friends_entries.append({'index': i, 'name': name_val}) insight_entries = [] for i in insight_range: entry = insight_data[i-1] if i <= len(insight_data) else {'name': ''} if isinstance(entry, str): name_val = entry else: name_val = (entry or {}).get('name', '') insight_entries.append({'index': i, 'name': name_val}) return dict( advice_fields_to_show=advice_fields_to_show, friends_fields_to_show=friends_fields_to_show, insight_fields_to_show=insight_fields_to_show, advice_range=advice_range, friends_range=friends_range, insight_range=insight_range, advice_entries=advice_entries, friends_entries=friends_entries, insight_entries=insight_entries, team_names_json=json.dumps(_load_team_names()), ) # NOTE: No server-side empty-survey warning here. # The template already handles this via a client-side modal. # Keeping server-side warnings caused confusing false-empties and wiped inputs. def before_next_page(self): from .models import update_participant_registry req_post = getattr(self, '_form_data', {}) def extract_json(prefix): entries = [] for i in range(1, 11): name = (req_post.get(f'{prefix}_name_{i}', '') or '').strip() if name: entries.append({'name': name}) return json.dumps(entries) advice_json = extract_json('advice') friends_json = extract_json('friends') insight_json = extract_json('insight') update_participant_registry( self.subsession, self.participant, advice_json=advice_json, friends_json=friends_json, insight_json=insight_json, ) # Mirror onto Player so it's available in standard app exports. self.player.advice_json = advice_json self.player.friends_json = friends_json self.player.insight_json = insight_json self.participant.vars['survey_started'] = True self.participant.vars['survey_completed'] = True class Demographics(Page): form_model = 'player' form_fields = ['gender', 'gender_self_describe', 'tenure_years', 'work_location'] def is_displayed(self): consent = self.participant.vars.get('consent_given') total_rounds = self.participant.vars.get('num_rounds', 13) # Show on final round for participants who consented and started survey if not (consent is True and self.round_number == total_rounds): return False if not self.participant.vars.get('survey_started'): return False if self.participant.vars.get('demographics_started'): return False return True def error_message(self, values): # Check if all required fields are empty gender = values.get('gender', '').strip() tenure_years = values.get('tenure_years', '').strip() work_location = values.get('work_location', '').strip() # If gender is "Prefer to self-describe", the self-describe field is required if gender == 'Prefer to self-describe': gender_self_describe = values.get('gender_self_describe', '').strip() if not gender_self_describe: return 'Please describe your gender in the text field.' # Check if all fields are empty if not gender and not tenure_years and not work_location: # Check if they've already been warned (second attempt) if self.participant.vars.get('demographics_empty_warned'): # Allow the empty submission on second attempt self.participant.vars['demographics_empty_warned'] = False return None else: # First attempt — warn them with styled HTML message self.participant.vars['demographics_empty_warned'] = True return '
⚠️ You have not answered any demographic questions.

Click Submit again to confirm you want to proceed without completing the demographics.

' # Clear the warning flag if they've entered something self.participant.vars['demographics_empty_warned'] = False return None def vars_for_template(self): # Pass field values safely to template (handle None values) return dict( gender_value=self.player.field_maybe_none('gender') or '', gender_self_describe_value=self.player.field_maybe_none('gender_self_describe') or '', tenure_years_value=self.player.field_maybe_none('tenure_years') or '', work_location_value=self.player.field_maybe_none('work_location') or '', ) def before_next_page(self): from .models import update_participant_registry update_participant_registry( self.subsession, self.participant, gender=self.player.gender, gender_self_describe=self.player.gender_self_describe, tenure_years=self.player.tenure_years, work_location=self.player.work_location ) self.participant.vars['demographics_started'] = True class LikertScale(Page): form_model = 'player' form_fields = [ 'likert_1', 'likert_2', 'likert_3', 'likert_4', 'likert_5', 'likert_6', 'likert_7', 'likert_8', 'likert_9', 'likert_10', 'likert_11', 'likert_12', 'likert_13', 'likert_14', 'likert_15', ] def is_displayed(self): consent = self.participant.vars.get('consent_given') total_rounds = self.participant.vars.get('num_rounds', 13) # Show on final round for participants who consented and started demographics if not (consent is True and self.round_number == total_rounds): return False if not self.participant.vars.get('demographics_started'): return False if self.participant.vars.get('likert_started'): return False return True def vars_for_template(self): # Define the 15 statements and randomize their order statements = [ "Managers should make most decisions without consulting subordinates.", "Managers should seldom ask for the opinions of employees.", "It is frequently necessary for a manager to use authority and power when dealing with subordinates.", "People at lower levels should carry out higher level requests without questions.", "I believe that when in disagreement with adults, young people should defer to elders.", "Employees should not disagree with management decisions.", "Distinct reporting relationships are extremely important to me.", "A clear chain command is extremely important to me.", "Definite lines of authority are extremely important to me.", "Gaining respect is an important value that serves as a guiding principle in my life.", "It is extremely important for me to obtain status.", "It is extremely important for me to be looked up to by others.", "Managers should avoid off-the-job social contacts with employees.", "I believe that people in positions of power should try to increase their social distance from less powerful individuals.", "Organizations should have separate facilities, such as eating areas, for higher-level managers.", ] # Use a local random instance to avoid poisoning the global state rng = random.Random(hash(self.participant.code) % (2**32)) randomized_statements = list(enumerate(statements, 1)) rng.shuffle(randomized_statements) return dict( statements=randomized_statements, ) def error_message(self, values): # Check if all Likert fields are empty all_empty = True for i in range(1, 16): if values.get(f'likert_{i}'): all_empty = False break if all_empty: # Check if they've already been warned (second attempt) if self.participant.vars.get('likert_empty_warned'): # Allow the empty submission on second attempt self.participant.vars['likert_empty_warned'] = False return None else: # First attempt — warn them with styled HTML message self.participant.vars['likert_empty_warned'] = True return '
⚠️ You have not answered any of the statements.

Click Submit again to confirm you want to proceed without completing the survey.

' # Clear the warning flag if they've entered something self.participant.vars['likert_empty_warned'] = False return None def before_next_page(self): from .models import update_participant_registry # Use field_maybe_none to avoid errors when some Likert items are left blank likert_data = {f'likert_{i}': self.player.field_maybe_none(f'likert_{i}') for i in range(1, 16)} update_participant_registry( self.subsession, self.participant, **likert_data ) self.participant.vars['likert_started'] = True class PartnerRatings(Page): form_model = 'player' # No direct form_fields here since we handle manually in before_next_page form_fields = [] def _get_partner_codes_from_rounds(self): """Derive partners from actual paired-round data for this participant.""" codes = [] for r in self.player.in_all_rounds(): if r.field_maybe_none('round_type') == 'pair': code = r.field_maybe_none('partner_code') or '' if code: codes.append(code) # preserve stable order seen = set() unique = [] for code in codes: if code not in seen: unique.append(code) seen.add(code) return unique def is_displayed(self): consent = self.participant.vars.get('consent_given') total_rounds = self.participant.vars.get('num_rounds', 13) # Show on final round for participants who consented and started Likert if not (consent is True and self.round_number == total_rounds): return False if not self.participant.vars.get('likert_started'): return False if self.participant.vars.get('partner_ratings_started'): return False # Also check if they had any partners (based on paired rounds) return len(self._get_partner_codes_from_rounds()) > 0 def vars_for_template(self): partners_codes = self._get_partner_codes_from_rounds() partners_list = [] # Prefer partner metadata stored on the participant's paired rounds. # This avoids false "no partners" messages if the registry lookup fails. rounds = self.player.in_all_rounds() for code in partners_codes: name = '' role = '' for r in rounds: if (r.field_maybe_none('round_type') == 'pair') and ((r.field_maybe_none('partner_code') or '') == code): name = r.field_maybe_none('partner_first_name') or '' role = r.field_maybe_none('partner_role_title') or '' if name or role: break partners_list.append({'code': code, 'name': name, 'role': role}) return dict( partners_list=partners_list, range_1_10=list(range(1, 11)) ) def before_next_page(self): partners_codes = self._get_partner_codes_from_rounds() ratings = [] req_post = getattr(self, '_form_data', {}) for code in partners_codes: # The input name is rating_{code} val = req_post.get(f'rating_{code}') if val: ratings.append({ 'partner_code': code, 'rating': int(val) }) from .models import update_participant_registry ratings_json = json.dumps(ratings) update_participant_registry( self.subsession, self.participant, partner_ratings_json=ratings_json ) # Mirror onto Player so it's available in standard app exports. self.player.partner_ratings_json = ratings_json self.participant.vars['partner_ratings_started'] = True class ThankYou(Page): """Thank you page - different message based on consent.""" def is_displayed(self): consent = self.participant.vars.get('consent_given') # Only show this page for non-consenters. return consent is False and self.round_number == 1 def vars_for_template(self): consent = self.player.field_maybe_none('consent_agreed') return dict( consent_given=consent is True ) page_sequence = [ Consent, Intake, IntakeWaitPage, GeneralInstructions, ExampleDemo, ExperimentIntro, IndividualRound, SignalRound, ActionRound, ActionKnownRound, Survey, Demographics, LikertScale, PartnerRatings, Results, ThankYou, ]