import json import time import random import csv from pathlib import Path from otree.api import Currency as c from otree.api import Page, WaitPage from .models import ( Constants, register_participant, generate_round_plan, save_completed_round, mark_participant_completed, get_historical_data_for_paired_round, get_partner_available_urns, ParticipantRegistry, CompletedRound ) class Consent(Page): form_model = 'player' form_fields = ['consent_agreed'] def is_displayed(self): return self.round_number == 1 def before_next_page(self): # If they don't consent, redirect to thank you page # Record consent status in participant.vars so it's available across rounds self.participant.vars['consent_given'] = bool(self.player.consent_agreed) def vars_for_template(self): return dict() class Intake(Page): form_model = 'player' form_fields = ['org_chart_position', 'role_title', 'first_name'] def is_displayed(self): consent = self.player.field_maybe_none('consent_agreed') return self.round_number == 1 and consent is True def vars_for_template(self): # Get currently taken position IDs from the registry taken_positions = [] # Only include registrations from the current subsession to avoid # showing positions taken in other (previous) sessions. for reg in ParticipantRegistry.filter(subsession=self.subsession): taken_positions.append(reg.node_id) team_names = _load_team_names() return dict( taken_positions_json=json.dumps(taken_positions), team_names=team_names, team_names_json=json.dumps(team_names), team_title_by_name_json=json.dumps(_load_team_title_by_name()), team_id_by_name_json=json.dumps(_load_team_id_by_name()), ) @staticmethod def live_method(player, data): """Handle real-time position reservation""" if data.get('type') == 'reserve_position': position_id = data.get('position_id') if position_id is not None: taken = player.session.vars.get('taken_positions', []) if position_id not in taken: taken.append(position_id) player.session.vars['taken_positions'] = taken return {0: {'type': 'position_taken', 'position_id': position_id}} elif data.get('type') == 'release_position': position_id = data.get('position_id') if position_id is not None: taken = player.session.vars.get('taken_positions', []) if position_id in taken: taken.remove(position_id) player.session.vars['taken_positions'] = taken return {0: {'type': 'position_released', 'position_id': position_id}} elif data.get('type') == 'get_taken_positions': # Include both session reservations and permanent registry taken = list(player.session.vars.get('taken_positions', [])) # Only include registry entries from the current subsession for reg in ParticipantRegistry.filter(subsession=player.subsession): if reg.node_id not in taken: taken.append(reg.node_id) return {player.id_in_group: {'type': 'taken_positions', 'positions': taken}} return {} def before_next_page(self): from .models import perform_draw, register_participant, generate_round_plan self.participant.vars['first_name'] = self.player.first_name self.participant.vars['role_title'] = self.player.role_title self.participant.vars['org_chart_position'] = self.player.org_chart_position # 1. Practice/Demo Round Info BEFORE registration # Prefer values pre-generated in Subsession.creating_session. demo_red = self.participant.vars.get('demo_red') demo_draws = self.participant.vars.get('demo_draws') if demo_red is None or demo_draws is None: demo_red = random.randint(4, 16) demo_draws = json.dumps(perform_draw(demo_red) + perform_draw(demo_red)) self.participant.vars['demo_red'] = demo_red self.participant.vars['demo_draws'] = demo_draws # 2. Register the participant and generate their round plan join_order, partners, num_rounds = register_participant( self.subsession, self.participant, self.player.org_chart_position, self.player.first_name, self.player.role_title, demo_red, demo_draws ) # Store assignment summary on Player for easy exports/testing. self.player.assigned_join_order = join_order partner_summaries = [] root_subsession = self.subsession.in_round(1) regs = list(ParticipantRegistry.filter(subsession=root_subsession)) reg_by_code = {r.participant_code: r for r in regs} for code in partners or []: reg = reg_by_code.get(code) if not reg: continue partner_summaries.append( { 'partner_code': code, 'partner_node_id': reg.node_id, 'partner_first_name': reg.first_name, 'partner_role_title': reg.role_title, } ) self.player.assigned_partners_json = json.dumps(partner_summaries) # Store metadata in participant.vars self.participant.vars['join_order'] = join_order self.participant.vars['partners'] = partners self.participant.vars['num_rounds'] = num_rounds # Generate the personalized round plan round_plan = generate_round_plan( self.subsession, join_order, partners, self.player.org_chart_position ) self.participant.vars['round_plan'] = round_plan # Set up ALL rounds immediately self._setup_all_rounds_data() # Initialize history from .models import initialize_participant_history initialize_participant_history( self.subsession, self.participant, self.participant.vars.get('drawing_room', {}) ) def _setup_all_rounds_data(self): """Pre-draw all urns and draws and store them in the database for ALL rounds.""" from .models import perform_draw # Setup all experiment rounds plan = self.participant.vars.get('round_plan', []) drawing_room = {} # Individual rounds are pre-generated at session creation time. pre_drawn_individual_rounds = self.participant.vars.get('pre_drawn_individual_rounds') or [] individual_index = 0 # For paired rounds, sample from each partner's available individual urns # without replacement (per partner) to avoid repeatedly using the same urn. partner_urn_pools = {} for i, entry in enumerate(plan): round_num = i + 1 round_data = { 'round_type': entry['round_type'], 'treatment': entry['treatment'], 'pair_mode': entry.get('pair_mode', ''), 'partner_code': entry.get('partner_code', ''), 'partner_first_name': entry.get('partner_first_name', ''), 'partner_role_title': entry.get('partner_role_title', ''), 'partner_node_id': entry.get('partner_node_id'), 'partner_relationship': entry.get('relationship', ''), } if entry['round_type'] == 'individual': if individual_index < len(pre_drawn_individual_rounds): pre = pre_drawn_individual_rounds[individual_index] or {} red_count = pre.get('urn_red_balls') individual_draws_json = pre.get('individual_draws_json') else: red_count = None individual_draws_json = None # Fallback for older sessions or if pre-draw data is missing. if red_count is None: red_count = random.randint(4, 16) if not individual_draws_json: individual_draws_json = json.dumps([perform_draw(red_count) for _ in range(2)]) round_data['urn_red_balls'] = red_count round_data['individual_draws_json'] = individual_draws_json individual_index += 1 elif entry['round_type'] == 'pair': partner_code = entry.get('partner_code') if partner_code and partner_code not in partner_urn_pools: urns = get_partner_available_urns(self.subsession, partner_code) or [] urn_pool = list(urns) random.shuffle(urn_pool) partner_urn_pools[partner_code] = urn_pool urn_pool = partner_urn_pools.get(partner_code) if partner_code else None if urn_pool: red_count = urn_pool.pop() else: red_count = random.randint(4, 16) round_data['urn_red_balls'] = red_count own_draws_json = json.dumps(perform_draw(red_count)) round_data['signal_draw_json'] = own_draws_json round_data['action_draw_json'] = own_draws_json if partner_code: hist_data = get_historical_data_for_paired_round( self.subsession, partner_code, entry.get('pair_mode'), red_count ) if hist_data: round_data['partner_historical_guess'] = hist_data['first_guess'] round_data['partner_historical_draw_json'] = hist_data['draws_json'] if entry.get('pair_mode') == 'signal': round_data['signal_partner_draw_json'] = hist_data['draws_json'] elif entry.get('pair_mode') == 'action': round_data['action_partner_guess'] = hist_data['first_guess'] elif entry.get('pair_mode') == 'action-known': round_data['action_known_draw_json'] = hist_data['draws_json'] round_data['action_known_partner_guess'] = hist_data['first_guess'] drawing_room[round_num] = round_data # Sync to Database for this specific round IMMEDIATELY p = self.player.in_round(round_num) # Apply common data p.round_type = round_data['round_type'] p.round_treatment = round_data['treatment'] p.pair_mode = round_data.get('pair_mode', '') p.partner_code = round_data.get('partner_code', '') p.partner_first_name = round_data.get('partner_first_name', '') p.partner_role_title = round_data.get('partner_role_title', '') p.partner_node_id = round_data.get('partner_node_id') p.partner_relationship = round_data.get('relationship', '') p.urn_red_balls = round_data.get('urn_red_balls') if p.round_type == 'individual': p.individual_draws_json = round_data.get('individual_draws_json', '') else: p.signal_draw_json = round_data.get('signal_draw_json', '') if p.pair_mode == 'action': p.action_draw_json = round_data.get('action_draw_json', '') if 'partner_historical_guess' in round_data: p.partner_historical_guess = round_data['partner_historical_guess'] p.partner_historical_draw_json = round_data['partner_historical_draw_json'] if p.pair_mode == 'signal': p.signal_partner_draw_json = round_data.get('signal_partner_draw_json', '') elif p.pair_mode == 'action': p.action_partner_guess = round_data.get('action_partner_guess') elif p.pair_mode == 'action-known': p.action_known_draw_json = round_data.get('action_known_draw_json', '') p.action_known_partner_guess = round_data.get('action_known_partner_guess') # Store in participant.vars as backup self.participant.vars['drawing_room'] = drawing_room def _setup_round_data(self, round_num): """Deprecated: using _setup_all_rounds_data instead""" pass _TEAM_DIRECTORY_CACHE = None def _normalize_person_name(name: str) -> str: """Normalize person names so Excel + org chart labels match. The Excel directory sometimes includes honorific prefixes like "Mr"/"Ms"/"Mrs". The org chart labels do not, so we strip those prefixes and normalize spaces. """ if not name: return '' s = str(name).strip() # Collapse internal whitespace. s = ' '.join(s.split()) lowered = s.lower() honorifics = ( 'mr ', 'mr. ', 'ms ', 'ms. ', 'mrs ', 'mrs. ', 'miss ', 'miss. ', 'dr ', 'dr. ', 'prof ', 'prof. ', ) for h in honorifics: if lowered.startswith(h): s = s[len(h):].strip() s = ' '.join(s.split()) break return s def _load_team_directory(): """Return (names, title_by_name, id_by_name) for Intake autocomplete/suggestions. Important: id_by_name must match the node IDs used by org_chart_utils, because partner matching and relationship logic uses those node IDs. """ global _TEAM_DIRECTORY_CACHE if _TEAM_DIRECTORY_CACHE is not None: return _TEAM_DIRECTORY_CACHE root = Path(__file__).resolve().parent.parent xlsx_path = root / 'DAINAAITeamStructure_Updated.xlsx' csv_path = root / 'org_chart_directed.csv' excel_names = [] # Keys are normalized names (matching org_chart_directed.csv labels). title_by_name = {} try: from openpyxl import load_workbook wb = load_workbook(xlsx_path, read_only=True, data_only=True) ws = wb[wb.sheetnames[0]] header_row = next(ws.iter_rows(min_row=1, max_row=1, values_only=True), None) header_list = [str(x).strip() if x is not None else '' for x in (header_row or [])] try: name_col = header_list.index('SAP_Name') + 1 except ValueError: name_col = 1 try: title_col = header_list.index('Title') + 1 except ValueError: title_col = 2 max_col = max(name_col, title_col) for row in ws.iter_rows(min_row=2, max_col=max_col, values_only=True): name_val = row[name_col - 1] if len(row) >= name_col else None if name_val is None: continue name = _normalize_person_name(name_val) if not name: continue excel_names.append(name) title_val = row[title_col - 1] if len(row) >= title_col else None if title_val is None: continue title = str(title_val).strip() if title and name not in title_by_name: # Keep the first title we see for this normalized name. title_by_name[name] = title except Exception: excel_names = [] # Manual overrides for any directory gaps. # Keys must be normalized names matching org_chart_directed.csv labels. title_by_name.setdefault( _normalize_person_name('Dmitri Chtchourov'), 'Head of Data Science & AI/ML', ) # Names and IDs must come from org_chart_utils (directed CSV) so matching is correct. try: from .org_chart_utils import get_all_node_labels, get_node_id_by_label chart_names = get_all_node_labels() or [] chart_name_set = set(chart_names) # Prefer Excel order for autocomplete, but only keep names that exist in the chart. names = [n for n in excel_names if n in chart_name_set] # Append any chart names missing from Excel. excel_name_set = set(names) names.extend([n for n in chart_names if n not in excel_name_set]) id_by_name = {n: get_node_id_by_label(n) for n in names if get_node_id_by_label(n) is not None} except Exception: # Absolute fallback if org_chart_utils can't load. names = [] id_by_name = {} _TEAM_DIRECTORY_CACHE = (names, title_by_name, id_by_name) return _TEAM_DIRECTORY_CACHE def _load_team_names(): return _load_team_directory()[0] def _load_team_title_by_name(): return _load_team_directory()[1] def _load_team_id_by_name(): return _load_team_directory()[2] class IntakeWaitPage(Page): """ In async mode, this page is essentially a pass-through. No need to wait for other participants. """ def is_displayed(self): # In async mode, skip this wait page entirely return False def vars_for_template(self): completed = self.session.vars.get('intake_completed_participants', []) total_players = Constants.players_per_group first_completion = self.session.vars.get('intake_first_completion_time') time_remaining = 300 # 5 minutes default if first_completion: elapsed = time.time() - first_completion time_remaining = max(0, 300 - elapsed) return dict( completed_count=len(completed), total_players=total_players, time_remaining=int(time_remaining), all_completed=len(completed) >= total_players, ) @staticmethod def live_method(player, data): """Handle polling for wait status""" if data.get('type') == 'check_status': completed = player.session.vars.get('intake_completed_participants', []) total_players = Constants.players_per_group first_completion = player.session.vars.get('intake_first_completion_time') time_remaining = 300 timed_out = False if first_completion: elapsed = time.time() - first_completion time_remaining = max(0, 300 - elapsed) timed_out = elapsed >= 300 all_completed = len(completed) >= total_players can_proceed = all_completed or (timed_out and len(completed) >= 2) return { player.id_in_group: { 'type': 'status_update', 'completed_count': len(completed), 'total_players': total_players, 'time_remaining': int(time_remaining), 'all_completed': all_completed, 'timed_out': timed_out, 'can_proceed': can_proceed, } } return {} def before_next_page(self): # Mark which participants are active for this session completed = self.session.vars.get('intake_completed_participants', []) self.session.vars['active_participants'] = completed self.session.vars['active_player_count'] = len(completed) class GeneralInstructions(Page): def is_displayed(self): consent = self.player.field_maybe_none('consent_agreed') return self.round_number == 1 and consent is True class ExampleDemo(Page): form_model = 'player' form_fields = ['demo_guess'] def is_displayed(self): consent = self.player.field_maybe_none('consent_agreed') return self.round_number == 1 and consent is True def vars_for_template(self): # Fetch practice data from Registry reg = None root_subsession = self.subsession.in_round(1) regs = [ r for r in ParticipantRegistry.filter(subsession=root_subsession) if r.participant_code == self.participant.code ] if regs: reg = regs[0] draws_json = reg.demo_draws_json if reg else self.participant.vars.get('demo_draws', '[]') red_count = reg.demo_urn_red if reg else self.participant.vars.get('demo_red', 10) draws = json.loads(draws_json) # Ensure the practice animation can show 2 rounds of 3 draws each. # For older sessions/rows that only stored 3 draws, pad to 6 once. try: if isinstance(draws, list) and len(draws) < 6: from .models import perform_draw draws = list(draws) + perform_draw(red_count, num_balls=(6 - len(draws))) draws_json = json.dumps(draws) self.participant.vars['demo_draws'] = draws_json if reg: reg.demo_draws_json = draws_json except Exception: pass demo_draws_csv = ','.join(str(x) for x in draws) return dict( demo_draws=draws, demo_draws_json=draws_json, demo_draws_csv=demo_draws_csv, demo_red_count=red_count, ) class ExperimentIntro(Page): def is_displayed(self): consent = self.player.field_maybe_none('consent_agreed') return self.round_number == 1 and consent is True def vars_for_template(self): return dict( total_rounds=self.participant.vars.get('num_rounds', 60), ) class IndividualRound(Page): form_model = 'player' form_fields = ['individual_guess_1', 'individual_guess_2'] def is_displayed(self): # Check if this round is beyond the participant's total rounds total_rounds = self.participant.vars.get('num_rounds', 60) if self.round_number > total_rounds: return False # Check round type from plan plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): return plan[self.round_number - 1]['round_type'] == 'individual' return False def before_next_page(self): # Ensure round data is set up self._ensure_round_setup() # Save this round to historical data save_completed_round( self.subsession, self.participant, round_index=self.round_number, round_type='individual', urn_red_balls=self.player.urn_red_balls, draws_json=self.player.individual_draws_json, first_guess=self.player.individual_guess_1, updated_guess=self.player.individual_guess_2 ) def _ensure_round_setup(self): """Sync this round's data from the pre-drawn drawing_room in participant.vars.""" if not self.player.round_type: drawing_room = self.participant.vars.get('drawing_room', {}) round_data = drawing_room.get(self.round_number) or drawing_room.get(str(self.round_number)) if round_data: self.player.round_type = round_data['round_type'] self.player.round_treatment = round_data['treatment'] self.player.pair_mode = round_data.get('pair_mode', '') self.player.partner_code = round_data.get('partner_code', '') self.player.urn_red_balls = round_data.get('urn_red_balls') self.player.individual_draws_json = round_data.get('individual_draws_json') def vars_for_template(self): self._ensure_round_setup() draws = json.loads(self.player.individual_draws_json or '[]') flat_draws = [] if draws and isinstance(draws, list) and isinstance(draws[0], list): for batch in draws: if isinstance(batch, list): flat_draws.extend(batch) elif isinstance(draws, list): flat_draws = draws individual_draws_csv = ','.join(str(x) for x in flat_draws if x is not None) return dict( draw_batches=draws, draw_batches_json=json.dumps(draws), individual_draws_csv=individual_draws_csv, urn_red_count=self.player.urn_red_balls if self.player.urn_red_balls is not None else 10, min_guess=4, max_guess=16, round_number=self.round_number, total_rounds=self.participant.vars.get('num_rounds', 60), ) def error_message(self, values): required_fields = ['individual_guess_1', 'individual_guess_2'] for field in required_fields: if values.get(field) is None: return 'Please enter your guess after each draw before continuing.' class SignalRound(Page): """ Signal round: Player sees their own draw, makes initial guess, then sees partner's historical draw and updates their guess. """ form_model = 'player' form_fields = ['signal_guess_initial', 'signal_guess_updated'] def is_displayed(self): # Check if this round is beyond the participant's total rounds total_rounds = self.participant.vars.get('num_rounds', 60) if self.round_number > total_rounds: return False # Check round type from plan plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): entry = plan[self.round_number - 1] return entry['round_type'] == 'pair' and entry.get('pair_mode') == 'signal' return False def before_next_page(self): self._ensure_round_setup() # Save this round to historical data save_completed_round( self.subsession, self.participant, round_index=self.round_number, round_type='signal', urn_red_balls=self.player.urn_red_balls, draws_json=self.player.signal_draw_json, first_guess=self.player.signal_guess_initial, updated_guess=self.player.signal_guess_updated ) def _ensure_round_setup(self): """Sync this round's data from the pre-drawn drawing_room in participant.vars.""" if not self.player.round_type: drawing_room = self.participant.vars.get('drawing_room', {}) round_data = drawing_room.get(self.round_number) or drawing_room.get(str(self.round_number)) if round_data: self.player.round_type = round_data['round_type'] self.player.round_treatment = round_data['treatment'] self.player.pair_mode = round_data.get('pair_mode', '') self.player.partner_code = round_data.get('partner_code', '') self.player.urn_red_balls = round_data.get('urn_red_balls') self.player.signal_draw_json = round_data.get('signal_draw_json') # Setup partner details plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): entry = plan[self.round_number - 1] self.player.partner_first_name = entry.get('partner_first_name', '') self.player.partner_role_title = entry.get('partner_role_title', '') self.player.partner_node_id = entry.get('partner_node_id') self.player.partner_relationship = entry.get('relationship', '') self.player.partner_label = f"{self.player.partner_first_name}, {self.player.partner_role_title}".strip(', ') # Fetch historical data if partner is assigned if self.player.partner_code: hist_data = get_historical_data_for_paired_round( self.subsession, self.player.partner_code, self.player.pair_mode, self.player.urn_red_balls ) if hist_data: self.player.partner_historical_guess = hist_data['first_guess'] self.player.signal_partner_draw_json = hist_data['draws_json'] def vars_for_template(self): self._ensure_round_setup() own_draws = json.loads(self.player.signal_draw_json or '[]') partner_draws_raw = json.loads(self.player.signal_partner_draw_json or '[]') # Historical data from individual rounds is stored as 2D array [[batch1], [batch2]] # Extract first batch if it's a 2D array, otherwise use as-is if partner_draws_raw and isinstance(partner_draws_raw[0], list): partner_draws = partner_draws_raw[0] # Use first batch of 3 balls else: partner_draws = partner_draws_raw partner_full_label = '' if self.player.partner_first_name or self.player.partner_role_title: partner_full_label = f"{self.player.partner_first_name}, {self.player.partner_role_title}".strip(', ') partner_display_label = partner_full_label or 'your partner' own_draws_csv = ','.join(str(x) for x in own_draws if x is not None) partner_draws_csv = ','.join(str(x) for x in partner_draws if x is not None) return dict( own_draws=own_draws, own_draws_json=json.dumps(own_draws), own_draws_csv=own_draws_csv, partner_draws=partner_draws, partner_draws_json=json.dumps(partner_draws), partner_draws_csv=partner_draws_csv, urn_red_count=self.player.urn_red_balls if self.player.urn_red_balls is not None else 10, min_guess=4, max_guess=16, partner_first_name=self.player.partner_first_name, partner_role_title=self.player.partner_role_title, partner_full_label=partner_full_label, partner_display_label=partner_display_label, partner_relationship=self.player.partner_relationship, round_number=self.round_number, total_rounds=self.participant.vars.get('num_rounds', 60), ) def error_message(self, values): if values.get('signal_guess_initial') is None or values.get('signal_guess_updated') is None: return 'Please submit your first guess and your updated guess before continuing.' class ActionRound(Page): """ Action round: Player sees their own draw, makes initial guess, then sees partner's historical first guess and updates their guess. """ form_model = 'player' form_fields = ['action_guess_initial', 'action_guess_updated'] def is_displayed(self): # Check if this round is beyond the participant's total rounds total_rounds = self.participant.vars.get('num_rounds', 60) if self.round_number > total_rounds: return False # Check round type from plan plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): entry = plan[self.round_number - 1] return entry['round_type'] == 'pair' and entry.get('pair_mode') == 'action' return False def before_next_page(self): self._ensure_round_setup() # Save this round to historical data # Use field_maybe_none to safely access nullable fields action_draw = self.player.field_maybe_none('action_draw_json') signal_draw = self.player.field_maybe_none('signal_draw_json') save_completed_round( self.subsession, self.participant, round_index=self.round_number, round_type='action', urn_red_balls=self.player.urn_red_balls, draws_json=action_draw or signal_draw, first_guess=self.player.action_guess_initial, updated_guess=self.player.action_guess_updated ) def _ensure_round_setup(self): """Sync this round's data from the pre-drawn drawing_room in participant.vars.""" if not self.player.round_type: drawing_room = self.participant.vars.get('drawing_room', {}) round_data = drawing_room.get(self.round_number) or drawing_room.get(str(self.round_number)) if round_data: self.player.round_type = round_data['round_type'] self.player.round_treatment = round_data['treatment'] self.player.pair_mode = round_data.get('pair_mode', '') self.player.partner_code = round_data.get('partner_code', '') self.player.urn_red_balls = round_data.get('urn_red_balls') self.player.signal_draw_json = round_data.get('signal_draw_json') self.player.action_draw_json = round_data.get('signal_draw_json') # Setup partner details plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): entry = plan[self.round_number - 1] self.player.partner_first_name = entry.get('partner_first_name', '') self.player.partner_role_title = entry.get('partner_role_title', '') self.player.partner_node_id = entry.get('partner_node_id') self.player.partner_relationship = entry.get('relationship', '') self.player.partner_label = f"{self.player.partner_first_name}, {self.player.partner_role_title}".strip(', ') # Fetch historical data if partner is assigned if self.player.partner_code: hist_data = get_historical_data_for_paired_round( self.subsession, self.player.partner_code, self.player.pair_mode, self.player.urn_red_balls ) if hist_data: self.player.partner_historical_guess = hist_data['first_guess'] self.player.action_partner_guess = hist_data['first_guess'] def vars_for_template(self): self._ensure_round_setup() own_draws = json.loads(self.player.signal_draw_json or '[]') partner_full_label = '' if self.player.partner_first_name or self.player.partner_role_title: partner_full_label = f"{self.player.partner_first_name}, {self.player.partner_role_title}".strip(', ') partner_display_label = partner_full_label or 'your partner' # In async mode, partner's guess comes from historical data partner_guess_initial = self.player.partner_historical_guess own_draws_csv = ','.join(str(x) for x in own_draws if x is not None) return dict( own_draws=own_draws, own_draws_json=json.dumps(own_draws), own_draws_csv=own_draws_csv, urn_red_count=self.player.urn_red_balls if self.player.urn_red_balls is not None else 10, min_guess=4, max_guess=16, partner_first_name=self.player.partner_first_name, partner_role_title=self.player.partner_role_title, partner_full_label=partner_full_label, partner_display_label=partner_display_label, partner_guess_initial=partner_guess_initial, partner_guess_initial_json=json.dumps(partner_guess_initial), partner_relationship=self.player.partner_relationship, round_number=self.round_number, total_rounds=self.participant.vars.get('num_rounds', 60), ) def error_message(self, values): if values.get('action_guess_initial') is None or values.get('action_guess_updated') is None: return 'Please submit your first guess and your updated guess before continuing.' class ActionKnownRound(Page): """ Action-known round: Player makes initial guess, then sees partner's draw. Player chooses to either match partner's (hidden) updated guess or submit their own. """ form_model = 'player' form_fields = ['action_guess_initial', 'action_known_choice', 'action_guess_updated'] def is_displayed(self): # Check if this round is beyond the participant's total rounds total_rounds = self.participant.vars.get('num_rounds', 60) if self.round_number > total_rounds: return False # Check round type from plan plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): entry = plan[self.round_number - 1] return entry['round_type'] == 'pair' and entry.get('pair_mode') == 'action-known' return False def before_next_page(self): self._ensure_round_setup() # If player chose to match, use partner's historical updated guess if self.player.action_known_choice == 'match': # Use updated_guess if available, otherwise fall back to first_guess partner_updated = self.player.field_maybe_none('partner_historical_updated_guess') partner_first = self.player.field_maybe_none('partner_historical_guess') if partner_updated is not None: self.player.action_guess_updated = partner_updated elif partner_first is not None: self.player.action_guess_updated = partner_first else: # Fallback to their own initial guess if no partner data self.player.action_guess_updated = self.player.action_guess_initial # Save this round to historical data action_known_draw = self.player.field_maybe_none('action_known_draw_json') signal_draw = self.player.field_maybe_none('signal_draw_json') save_completed_round( self.subsession, self.participant, round_index=self.round_number, round_type='action-known', urn_red_balls=self.player.urn_red_balls, draws_json=action_known_draw or signal_draw, first_guess=self.player.action_guess_initial, updated_guess=self.player.action_guess_updated ) # Store the own guess if they chose "own" if self.player.action_known_choice == 'own': self.player.action_known_own_guess = self.player.action_guess_updated def _ensure_round_setup(self): """Sync this round's data from the pre-drawn drawing_room in participant.vars.""" if not self.player.round_type: drawing_room = self.participant.vars.get('drawing_room', {}) round_data = drawing_room.get(self.round_number) or drawing_room.get(str(self.round_number)) if round_data: self.player.round_type = round_data['round_type'] self.player.round_treatment = round_data['treatment'] self.player.pair_mode = round_data.get('pair_mode', '') self.player.partner_code = round_data.get('partner_code', '') self.player.urn_red_balls = round_data.get('urn_red_balls') self.player.signal_draw_json = round_data.get('signal_draw_json') self.player.action_known_draw_json = round_data.get('signal_draw_json') # In action-known, partner's draw is shown # Setup partner details plan = self.participant.vars.get('round_plan', []) if self.round_number <= len(plan): entry = plan[self.round_number - 1] self.player.partner_first_name = entry.get('partner_first_name', '') self.player.partner_role_title = entry.get('partner_role_title', '') self.player.partner_node_id = entry.get('partner_node_id') self.player.partner_relationship = entry.get('relationship', '') self.player.partner_label = f"{self.player.partner_first_name}, {self.player.partner_role_title}".strip(', ') # Fetch historical data if partner is assigned if self.player.partner_code: hist_data = get_historical_data_for_paired_round( self.subsession, self.player.partner_code, self.player.pair_mode, self.player.urn_red_balls ) if hist_data: self.player.partner_historical_guess = hist_data['first_guess'] self.player.partner_historical_updated_guess = hist_data['first_guess'] self.player.partner_historical_draw_json = hist_data['draws_json'] self.player.action_known_partner_guess = hist_data['first_guess'] # Set action_known_draw_json to the partner's actual draw self.player.action_known_draw_json = hist_data['draws_json'] def vars_for_template(self): self._ensure_round_setup() own_draws = json.loads(self.player.signal_draw_json or '[]') partner_draws_raw = json.loads(self.player.partner_historical_draw_json or '[]') if partner_draws_raw and isinstance(partner_draws_raw[0], list): partner_draws = partner_draws_raw[0] else: partner_draws = partner_draws_raw partner_full_label = '' if self.player.partner_first_name or self.player.partner_role_title: partner_full_label = f"{self.player.partner_first_name}, {self.player.partner_role_title}".strip(', ') partner_display_label = partner_full_label or 'your partner' own_draws_csv = ','.join(str(x) for x in own_draws if x is not None) partner_draws_csv = ','.join(str(x) for x in partner_draws if x is not None) return dict( own_draws=own_draws, own_draws_json=json.dumps(own_draws), own_draws_csv=own_draws_csv, partner_draws=partner_draws, partner_draws_json=json.dumps(partner_draws), partner_draws_csv=partner_draws_csv, urn_red_count=self.player.urn_red_balls if self.player.urn_red_balls is not None else 10, min_guess=4, max_guess=16, partner_first_name=self.player.partner_first_name, partner_role_title=self.player.partner_role_title, partner_full_label=partner_full_label, partner_display_label=partner_display_label, partner_relationship=self.player.partner_relationship, round_number=self.round_number, total_rounds=self.participant.vars.get('num_rounds', 60), ) def error_message(self, values): if values.get('action_guess_initial') is None: return 'Please submit your first guess before continuing.' if values.get('action_known_choice') is None: return 'Please choose whether to match or submit your own guess.' if values.get('action_known_choice') == 'own' and values.get('action_guess_updated') is None: return 'Please submit your own guess before continuing.' class Results(Page): """Final results page shown after all rounds are complete.""" form_model = 'player' form_fields = ['participant_comments'] def is_displayed(self): total_rounds = self.participant.vars.get('num_rounds', 60) # Only show Results after the post-experiment survey, demographics, and Likert scale were started if self.round_number != total_rounds: return False return bool(self.participant.vars.get('likert_started')) def before_next_page(self): # Mark participant as completed in the registry mark_participant_completed(self.subsession, self.participant) # Participant completed their experiment; Results will show after survey and demographics def vars_for_template(self): return dict( total_rounds=self.participant.vars.get('num_rounds', 60), join_order=self.participant.vars.get('join_order', 1), ) class Survey(Page): form_model = 'player' # No direct form_fields here since we handle manually in before_next_page form_fields = [] def is_displayed(self): consent = self.participant.vars.get('consent_given') total_rounds = self.participant.vars.get('num_rounds', 13) if not (consent is True and self.round_number == total_rounds): return False if self.participant.vars.get('survey_started'): return False return True def vars_for_template(self): req = getattr(self, 'request', None) req_method = getattr(req, 'method', 'GET') if req is not None else 'GET' req_get = getattr(req, 'query_params', {}) if req is not None else {} # In oTree/Starlette, POSTed form data is available as self._form_data. req_post = getattr(self, '_form_data', {}) # Determine how many nomination rows to show. # - On normal GET, optionally accept counts from query params. # - On POST (e.g., if the page re-renders due to an error_message), infer # counts from submitted field names so typed values don't disappear. if req_method == 'POST': import re def infer_max_index(prefix: str) -> int: max_i = 0 for k in req_post.keys(): m = re.match(rf'^{prefix}_name_(\d+)$', k) if m: max_i = max(max_i, int(m.group(1))) return max_i advice_fields_to_show = max(3, min(10, infer_max_index('advice') or int(self.participant.vars.get('advice_fields_to_show', 3)))) friends_fields_to_show = max(3, min(10, infer_max_index('friends') or int(self.participant.vars.get('friends_fields_to_show', 3)))) insight_fields_to_show = max(3, min(10, infer_max_index('insight') or int(self.participant.vars.get('insight_fields_to_show', 3)))) else: advice_fields_to_show = int(req_get.get('advice_fields_to_show', self.participant.vars.get('advice_fields_to_show', 3)) or 3) friends_fields_to_show = int(req_get.get('friends_fields_to_show', self.participant.vars.get('friends_fields_to_show', 3)) or 3) insight_fields_to_show = int(req_get.get('insight_fields_to_show', self.participant.vars.get('insight_fields_to_show', 3)) or 3) advice_fields_to_show = max(3, min(10, advice_fields_to_show)) friends_fields_to_show = max(3, min(10, friends_fields_to_show)) insight_fields_to_show = max(3, min(10, insight_fields_to_show)) self.participant.vars['advice_fields_to_show'] = advice_fields_to_show self.participant.vars['friends_fields_to_show'] = friends_fields_to_show self.participant.vars['insight_fields_to_show'] = insight_fields_to_show advice_range = list(range(1, advice_fields_to_show + 1)) friends_range = list(range(1, friends_fields_to_show + 1)) insight_range = list(range(1, insight_fields_to_show + 1)) # Fetch existing JSON data from Registry if available reg = None root_subsession = self.subsession.in_round(1) regs = [ r for r in ParticipantRegistry.filter(subsession=root_subsession) if r.participant_code == self.participant.code ] if regs: reg = regs[0] if req_method == 'POST': # Preserve whatever the participant just typed if the page re-renders. def extract_entries_from_post(prefix: str): entries = [] for i in range(1, 11): name = (req_post.get(f'{prefix}_name_{i}', '') or '').strip() if name or i <= (advice_fields_to_show if prefix == 'advice' else friends_fields_to_show if prefix == 'friends' else insight_fields_to_show): entries.append({'name': name}) return entries advice_data = extract_entries_from_post('advice') friends_data = extract_entries_from_post('friends') insight_data = extract_entries_from_post('insight') else: advice_json = reg.advice_json if reg else '[]' friends_json = reg.friends_json if reg else '[]' insight_json = reg.insight_json if reg else '[]' advice_data = json.loads(advice_json or '[]') friends_data = json.loads(friends_json or '[]') insight_data = json.loads(insight_json or '[]') advice_entries = [] for i in advice_range: entry = advice_data[i-1] if i <= len(advice_data) else {'name': ''} if isinstance(entry, str): name_val = entry else: name_val = (entry or {}).get('name', '') advice_entries.append({'index': i, 'name': name_val}) friends_entries = [] for i in friends_range: entry = friends_data[i-1] if i <= len(friends_data) else {'name': ''} if isinstance(entry, str): name_val = entry else: name_val = (entry or {}).get('name', '') friends_entries.append({'index': i, 'name': name_val}) insight_entries = [] for i in insight_range: entry = insight_data[i-1] if i <= len(insight_data) else {'name': ''} if isinstance(entry, str): name_val = entry else: name_val = (entry or {}).get('name', '') insight_entries.append({'index': i, 'name': name_val}) return dict( advice_fields_to_show=advice_fields_to_show, friends_fields_to_show=friends_fields_to_show, insight_fields_to_show=insight_fields_to_show, advice_range=advice_range, friends_range=friends_range, insight_range=insight_range, advice_entries=advice_entries, friends_entries=friends_entries, insight_entries=insight_entries, team_names_json=json.dumps(_load_team_names()), ) # NOTE: No server-side empty-survey warning here. # The template already handles this via a client-side modal. # Keeping server-side warnings caused confusing false-empties and wiped inputs. def before_next_page(self): from .models import update_participant_registry req_post = getattr(self, '_form_data', {}) def extract_json(prefix): entries = [] for i in range(1, 11): name = (req_post.get(f'{prefix}_name_{i}', '') or '').strip() if name: entries.append({'name': name}) return json.dumps(entries) advice_json = extract_json('advice') friends_json = extract_json('friends') insight_json = extract_json('insight') update_participant_registry( self.subsession, self.participant, advice_json=advice_json, friends_json=friends_json, insight_json=insight_json, ) # Mirror onto Player so it's available in standard app exports. self.player.advice_json = advice_json self.player.friends_json = friends_json self.player.insight_json = insight_json self.participant.vars['survey_started'] = True self.participant.vars['survey_completed'] = True class Demographics(Page): form_model = 'player' form_fields = ['gender', 'gender_self_describe', 'tenure_years', 'work_location'] def is_displayed(self): consent = self.participant.vars.get('consent_given') total_rounds = self.participant.vars.get('num_rounds', 13) # Show on final round for participants who consented and started survey if not (consent is True and self.round_number == total_rounds): return False if not self.participant.vars.get('survey_started'): return False if self.participant.vars.get('demographics_started'): return False return True def error_message(self, values): # Check if all required fields are empty gender = values.get('gender', '').strip() tenure_years = values.get('tenure_years', '').strip() work_location = values.get('work_location', '').strip() # If gender is "Prefer to self-describe", the self-describe field is required if gender == 'Prefer to self-describe': gender_self_describe = values.get('gender_self_describe', '').strip() if not gender_self_describe: return 'Please describe your gender in the text field.' # Check if all fields are empty if not gender and not tenure_years and not work_location: # Check if they've already been warned (second attempt) if self.participant.vars.get('demographics_empty_warned'): # Allow the empty submission on second attempt self.participant.vars['demographics_empty_warned'] = False return None else: # First attempt — warn them with styled HTML message self.participant.vars['demographics_empty_warned'] = True return '
Click Submit again to confirm you want to proceed without completing the demographics.
Click Submit again to confirm you want to proceed without completing the survey.