import csv
import json
import os
import random
from datetime import datetime, timezone

import numpy as np
from otree.api import *

from .lexicon_en import Lexicon as LexiconEN
from .lexicon_fr import Lexicon as LexiconFR


# A forecast earns the bonus when it lies within this absolute distance of the
# realized return (both expressed as fractions, so 0.01 = 1 percentage point).
_FORECAST_TOLERANCE = 0.01
# Slack added to the tolerance comparison: in binary floating point
# 0.05 - 0.04 evaluates to 0.010000000000000002, which would otherwise fail
# a "<= 0.01" test even though the forecast is exactly on the boundary.
_FLOAT_EPSILON = 1e-9
# Bonus added to the round payoff for an accurate participant forecast.
_FORECAST_BONUS = 50


# Helper function language access
def get_language_and_lexicon(player, strict=True):
    """Return the ``(Lexicon, language_code)`` pair for this participant.

    Args:
        player: The current player; the language is read from
            ``player.participant.vars['language']``.
        strict: When true (default) the participant var must exist (a missing
            key raises ``KeyError``). When false, a missing/empty participant
            var falls back to ``player.field_maybe_none('language')``.
            NOTE(review): ``Player`` in this file declares no ``language``
            field, so the non-strict fallback presumably only works for
            callers whose player model has one - confirm before using it.

    Returns:
        ``(LexiconFR, 'fr')`` or ``(LexiconEN, 'en')``.

    Raises:
        ValueError: If the resolved language is neither ``'fr'`` nor ``'en'``.
    """
    if strict:
        lang = player.participant.vars['language']
    else:
        lang = (
            player.participant.vars.get('language')
            or player.field_maybe_none('language')
        )

    if lang not in ['fr', 'en']:
        raise ValueError(f"Unsupported or missing language: {lang}")

    if lang == 'fr':
        return LexiconFR, 'fr'
    return LexiconEN, 'en'


class C(BaseConstants):
    NAME_IN_URL = 'investment_decisions'
    PLAYERS_PER_GROUP = None

    # Configuration: Change these numbers for testing vs. real experiment
    # Phase lengths (customize for your experiment)
    block1_rounds = 5  # First no-treatment phase rounds
    block2_rounds = 5  # Treatment_1 (autonomous) phase rounds
    block3_rounds = 5  # Treatment_2/3 phase rounds (GROUP-DEPENDENT)
    block4_rounds = 5  # Final no-treatment phase rounds
    # Total rounds must equal sum of blocks
    NUM_ROUNDS = block1_rounds + block2_rounds + block3_rounds + block4_rounds

    # Available decision IDs from CSV (32 unique investment scenarios)
    AVAILABLE_DECISION_IDS = list(range(1, 33))  # decisions 1-32 from CSV

    # Round structure: Define which rounds belong to which phase
    # Block types: 'no-treatment', 'treatment_1' (autonomous),
    # 'treatment_2' (allocation), 'treatment_3' (allocation+forecast)
    ROUND_STRUCTURE = {
        'no_treatment_1': list(range(1, block1_rounds + 1)),
        'treatment_1': list(
            range(block1_rounds + 1, block1_rounds + block2_rounds + 1)
        ),
        'treatment_2_or_3': list(
            range(
                block1_rounds + block2_rounds + 1,
                block1_rounds + block2_rounds + block3_rounds + 1,
            )
        ),
        'no_treatment_2': list(
            range(
                block1_rounds + block2_rounds + block3_rounds + 1,
                NUM_ROUNDS + 1,
            )
        ),
    }

    RA_DEFAULT = 5  # Default risk aversion if lottery not completed during testing

    @staticmethod
    def get_block_type_for_round(round_number, treatment_group):
        """Determine block type based on round number and treatment group.

        Args:
            round_number: The current round (1-indexed)
            treatment_group: 1 or 2 (determines treatment_2 vs treatment_3);
                any value other than 1 selects 'treatment_3'.

        Returns:
            Block type string: 'no-treatment', 'treatment_1', 'treatment_2',
            or 'treatment_3'

        Raises:
            ValueError: If the round is not listed in ROUND_STRUCTURE.
        """
        if round_number in C.ROUND_STRUCTURE['no_treatment_1']:
            return 'no-treatment'
        if round_number in C.ROUND_STRUCTURE['treatment_1']:
            return 'treatment_1'
        if round_number in C.ROUND_STRUCTURE['treatment_2_or_3']:
            return 'treatment_2' if treatment_group == 1 else 'treatment_3'
        if round_number in C.ROUND_STRUCTURE['no_treatment_2']:
            return 'no-treatment'
        raise ValueError(f"Round {round_number} not mapped in ROUND_STRUCTURE")

    @staticmethod
    def is_last_round_of_block(round_number):
        """Check if the current round is the last round of any block.

        Args:
            round_number: The current round (1-indexed)

        Returns:
            bool: True if this is the last round of a block, False otherwise
        """
        last_rounds = [
            C.block1_rounds,  # Last round of block 1
            C.block1_rounds + C.block2_rounds,  # Last round of block 2
            C.block1_rounds + C.block2_rounds + C.block3_rounds,  # Last round of block 3
            C.NUM_ROUNDS,  # Last round of block 4
        ]
        return round_number in last_rounds


# EXAMPLE 36-ROUND CONFIGURATION:
#   NUM_ROUNDS = 36
#   block1_rounds = 9   # Rounds 1-9: Baseline (no treatment)
#   block2_rounds = 9   # Rounds 10-18: Treatment_1 (autonomous)
#   block3_rounds = 9   # Rounds 19-27: Treatment_2 (group 1) or Treatment_3 (group 2)
#   block4_rounds = 9   # Rounds 28-36: Final baseline (no treatment)
# NOTE: decisions are sampled without replacement, so this configuration also
# requires AVAILABLE_DECISION_IDS to contain at least 36 IDs.


class Subsession(BaseSubsession):
    pass


class Group(BaseGroup):
    pass


def utc_now_iso():
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def append_page_timestamp(player, field_name, page_name):
    """Record the current UTC time for ``page_name`` in a JSON player field.

    The field named ``field_name`` holds a JSON object mapping page names to
    ISO timestamps. For ``'page_shown_at_utc'`` the first recorded value is
    kept (later calls for the same page are ignored); for any other field the
    entry is overwritten.

    Args:
        player: Player whose field is updated.
        field_name: Name of the player field storing the JSON object.
        page_name: Key under which the timestamp is stored.
    """
    raw = player.field_maybe_none(field_name) or '{}'
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        payload = {}
    # Valid JSON that is not an object (e.g. a list) cannot take page keys;
    # treat it like corrupt data and start over.
    if not isinstance(payload, dict):
        payload = {}

    if field_name == 'page_shown_at_utc' and page_name in payload:
        return

    payload[page_name] = utc_now_iso()
    setattr(player, field_name, json.dumps(payload, ensure_ascii=True))


class Player(BasePlayer):
    treatment_group = models.IntegerField()  # 1 or 2 (determines treatment_2 vs treatment_3)
    decision_id = models.IntegerField()  # Stores the randomized decision ID for this round
    block_type = models.StringField()  # Stores the block type for this round (no-treatment, treatment_1, etc.)
    return_expectation = models.FloatField(min=None, blank=True)  # Allow any numeric value (positive or negative)
    portfolio_allocation = models.FloatField(min=0, max=100, blank=True)  # Set by robot in autonomous treatment
    confidence = models.IntegerField(
        min=0,
        max=100,
        blank=True
    )
    realized_return = models.FloatField()  # The actual return for this investment decision
    payoff_this_round = models.FloatField()  # Store final payoff
    risk_aversion = models.FloatField(blank=True)  # Store risk aversion from the lottery task
    forecasted_return = models.FloatField()
    optimal_allocation = models.FloatField(blank=True)

    # Post-decision questionnaire fields (Affective Slider specification: 0.0-1.0 scale)
    arousal = models.FloatField(min=0, max=1, blank=True)  # Affective slider: arousal (sleepy to wide-awake)
    valence = models.FloatField(min=0, max=1, blank=True)  # Affective slider: valence (unhappy to happy)
    mental_demand = models.IntegerField(min=0, max=100, blank=True)  # NASA TLX: mental demand
    slider_order = models.StringField(blank=True)  # Track randomized order: 'arousal_first' or 'valence_first'

    # Post-result questionnaire fields (shown only at last round of each block)
    arousal_post_result = models.FloatField(min=0, max=1, blank=True)  # Affective slider: arousal after results
    valence_post_result = models.FloatField(min=0, max=1, blank=True)  # Affective slider: valence after results
    mental_demand_post_result = models.IntegerField(min=0, max=100, blank=True)  # NASA TLX: mental demand after results
    slider_order_post_result = models.StringField(blank=True)  # Track randomized order: 'arousal_first' or 'valence_first'

    page_shown_at_utc = models.LongStringField(blank=True)
    page_submitted_at_utc = models.LongStringField(blank=True)

    # Intentionally left undecorated: the file calls it as
    # ``Player.set_payoff(player)``, and the bound form ``player.set_payoff()``
    # keeps working as well.
    def set_payoff(player):
        """Compute and store ``payoff_this_round`` for this round.

        Reads ``endowment_investment`` from the session config, the player's
        ``portfolio_allocation`` (percent) and ``realized_return`` (fraction).
        """
        endowment = player.session.config['endowment_investment']
        invested_tokens = endowment * player.portfolio_allocation / 100
        non_invested_tokens = endowment - invested_tokens

        # Payoff formula: (non-invested part stays the same) + (invested part grows with realized return)
        player.payoff_this_round = non_invested_tokens + (invested_tokens * (1 + player.realized_return))
        print(f"✅ Payoff calculated for round {player.round_number}: {player.payoff_this_round}")

    @staticmethod
    def calculate_optimal_allocation(player):
        """Compute Markowitz-style optimal allocation based on risk aversion and best forecast.

        Uses ``rf`` and ``sigma`` from the session config, the best forecast
        stored in ``participant.forecast_returns`` for ``player.decision_id``,
        and the player's ``risk_aversion`` (``C.RA_DEFAULT`` when unset or 0).

        Returns:
            The allocation as a percentage, clamped to the range 0-100.

        Raises:
            ValueError: If no forecast is stored for ``player.decision_id``.
        """
        # Retrieve risk-free rate and volatility from the session config
        risk_free_rate = player.session.config['rf']
        volatility = player.session.config['sigma']

        # Get the forecasted return from participant vars
        forecasted_return = player.participant.forecast_returns.get(player.decision_id)
        if forecasted_return is None:
            # Previously this surfaced as an opaque TypeError on `None - float`.
            raise ValueError(
                f"No best forecast available for decision_id={player.decision_id}"
            )

        risk_aversion = player.field_maybe_none('risk_aversion') or C.RA_DEFAULT  # Temporary default

        # Markowitz allocation formula: w* = (E[R] - Rf) / (A * sigma^2)
        if risk_aversion > 0 and volatility > 0:
            optimal_allocation = (forecasted_return - risk_free_rate) / (risk_aversion * volatility**2)
        else:
            optimal_allocation = 0.5  # Default to 50% if something goes wrong

        # Ensure allocation is between 0 and 100%
        optimal_allocation = max(0, min(optimal_allocation, 1))

        print(f"📈 Optimal allocation for participant {player.participant.id_in_session}, "
              f"round {player.round_number}: {optimal_allocation:.2%}")

        return optimal_allocation * 100  # Convert to percentage


# FUNCTIONS
def validate_round_structure():
    """Validate the round configuration in ``C``.

    Checks that the block lengths sum to NUM_ROUNDS, that ROUND_STRUCTURE
    covers rounds 1..NUM_ROUNDS exactly once, and that enough decision IDs
    exist to give every round a distinct decision.

    Raises:
        ValueError: If any of the checks fails.
    """
    # Check that all blocks sum to NUM_ROUNDS
    total = C.block1_rounds + C.block2_rounds + C.block3_rounds + C.block4_rounds
    if total != C.NUM_ROUNDS:
        raise ValueError(f"Phase lengths ({C.block1_rounds} + {C.block2_rounds} + {C.block3_rounds} + {C.block4_rounds} = {total}) "
                         f"don't match NUM_ROUNDS={C.NUM_ROUNDS}")

    # Check that all rounds are covered exactly once
    all_rounds = []
    for phase_rounds in C.ROUND_STRUCTURE.values():
        all_rounds.extend(phase_rounds)

    if sorted(all_rounds) != list(range(1, C.NUM_ROUNDS + 1)):
        raise ValueError(f"ROUND_STRUCTURE doesn't cover all rounds 1-{C.NUM_ROUNDS} exactly once")

    # Decisions are drawn without replacement (one per round), so the pool
    # must be at least as large as the number of rounds. Checking here gives
    # a clear message instead of random.sample's generic ValueError.
    available = len(C.AVAILABLE_DECISION_IDS)
    if C.NUM_ROUNDS > available:
        raise ValueError(
            f"NUM_ROUNDS={C.NUM_ROUNDS} exceeds the {available} available decision IDs; "
            f"cannot assign a distinct decision to every round"
        )

    print(f"✅ Round structure validated: {C.block1_rounds} + {C.block2_rounds} + {C.block3_rounds} + {C.block4_rounds} = {C.NUM_ROUNDS} rounds")


def _load_returns_from_csv():
    """Read realized returns and best forecasts from ``_static/realized_returns.csv``.

    Returns:
        ``(realized_returns, forecast_returns)``: two dicts keyed by integer
        ``decision_id``, built from the ``realized_return`` and
        ``best_forecast`` columns.

    Raises:
        ValueError: If an ID in ``C.AVAILABLE_DECISION_IDS`` has no CSV row.
    """
    # Load from centralized _static folder (one directory above this app)
    file_path = os.path.join(
        os.path.dirname(os.path.dirname(__file__)), '_static', 'realized_returns.csv'
    )

    realized_returns = {}
    forecast_returns = {}
    # utf-8-sig strips a leading BOM so the first header is read correctly.
    with open(file_path, newline='', encoding='utf-8-sig') as csvfile:
        for row in csv.DictReader(csvfile):
            decision_id = int(row['decision_id'])
            realized_returns[decision_id] = float(row['realized_return'])
            forecast_returns[decision_id] = float(row['best_forecast'])

    # Fail at session creation rather than mid-experiment: a sampled ID with
    # no row would otherwise yield a None return and break payoff calculation.
    missing = [d for d in C.AVAILABLE_DECISION_IDS if d not in realized_returns]
    if missing:
        raise ValueError(f"realized_returns.csv has no rows for decision IDs: {missing}")

    return realized_returns, forecast_returns


def creating_session(subsession: Subsession):
    """Setup decision IDs, treatment groups, and load data from CSV.

    All work happens in round 1 only: the configuration is validated, players
    are split as evenly as possible between treatment groups 1 and 2, and each
    participant receives a random round -> decision mapping plus the return
    and forecast lookup tables.
    """
    if subsession.round_number != 1:
        return

    # Validate configuration first
    validate_round_structure()

    # BALANCED TREATMENT GROUP ASSIGNMENT
    players = subsession.get_players()
    num_players = len(players)

    # Create balanced list: half get group 1, half get group 2
    treatment_groups = [1] * (num_players // 2) + [2] * (num_players // 2)

    # Handle odd number of participants
    if num_players % 2 == 1:
        treatment_groups.append(random.choice([1, 2]))

    # Randomize the assignment order
    random.shuffle(treatment_groups)

    print(f"🎲 Balanced randomization: {treatment_groups.count(1)} participants in Group 1 (treatment_2), "
          f"{treatment_groups.count(2)} in Group 2 (treatment_3)")

    # Load realized returns and forecasts from CSV
    realized_returns_dict, forecast_returns_dict = _load_returns_from_csv()

    # For each participant, assign treatment group and randomly select decisions
    for p, group in zip(players, treatment_groups):
        # Store treatment group in participant vars (this persists across all rounds)
        p.participant.vars['treatment_group'] = group

        # NOTE(review): per the oTree docs in_all_rounds() returns the players
        # up to and including the current round, so in round 1 this likely
        # only touches the round-1 player - confirm. Later rounds get the
        # field set in InvestmentDecisionPage.is_displayed.
        for player_in_round in p.in_all_rounds():
            player_in_round.treatment_group = group

        # Randomly sample decision IDs (without replacement)
        selected_decisions = random.sample(C.AVAILABLE_DECISION_IDS, C.NUM_ROUNDS)

        # Create mapping: {round_number: decision_id}
        round_to_decision = dict(zip(range(1, C.NUM_ROUNDS + 1), selected_decisions))

        # Store in participant vars
        p.participant.round_to_decision = round_to_decision
        p.participant.realized_returns = realized_returns_dict
        p.participant.forecast_returns = forecast_returns_dict

        print(f"👤 Participant {p.id_in_subsession}: Group {group}")
        print(f" Decisions: {selected_decisions}")


# PAGES
class InvestmentDecisionPage(Page):
    form_model = 'player'

    @staticmethod
    def get_form_fields(player):
        """Determine form fields based on block type for this round."""
        # Always get from participant vars (persists across rounds)
        treatment_group = player.participant.vars['treatment_group']
        block_type = C.get_block_type_for_round(player.round_number, treatment_group)

        # Treatment 1 (autonomous) - show forecast, but robot decides allocation
        if block_type == 'treatment_1':
            return ['return_expectation']  # Show only forecast, hide allocation

        # All other block types show allocation and expectation
        return ['portfolio_allocation', 'return_expectation']

    @staticmethod
    def error_message(player, values):
        """Validate that required fields are filled when form is shown.

        The expected return is always required and limited to one decimal
        place; the allocation is required in every block except the
        autonomous one ('treatment_1'), where it is not part of the form.

        Returns:
            An error string for the first failed check, or None if valid.
        """
        # Always get from participant vars (persists across rounds)
        treatment_group = player.participant.vars['treatment_group']
        block_type = C.get_block_type_for_round(player.round_number, treatment_group)

        forecast = values.get('return_expectation')
        if forecast is None:
            return 'Please enter your expected return'
        # Tolerance absorbs float error, e.g. 0.3 * 10 is not exactly 3.
        if abs(forecast * 10 - round(forecast * 10)) > 1e-9:
            return 'Expected return must have at most one decimal place'

        # For treatment_1 (autonomous), the allocation is NOT validated
        if block_type != 'treatment_1' and values.get('portfolio_allocation') is None:
            return 'Please set your portfolio allocation using the slider'

        return None

    @staticmethod
    def before_next_page(player, timeout_happened):
        """Calculate allocation and payoff before moving to results."""
        append_page_timestamp(player, 'page_submitted_at_utc', InvestmentDecisionPage.__name__)

        # Ensure decision_id is set BEFORE anything that depends on it
        # (the optimal allocation looks up the forecast by decision_id).
        player.decision_id = player.participant.round_to_decision[player.round_number]

        # Computed once; used for the autonomous allocation and the data export.
        optimal_allocation = Player.calculate_optimal_allocation(player)

        # If autonomous treatment, the robot's allocation is the optimal one
        if player.block_type == 'treatment_1':
            player.portfolio_allocation = optimal_allocation

        # Retrieve realized return
        realized = player.participant.realized_returns.get(player.decision_id)
        print(f"🎯 Round {player.round_number}: decision_id={player.decision_id}, realized_return from dict = {realized}")
        player.realized_return = realized

        # Calculate payoff
        Player.set_payoff(player)

        # Store optimal allocation for data export
        player.optimal_allocation = optimal_allocation

        # Save investment decisions to participant vars
        if 'decisions' not in player.participant.vars:
            player.participant.vars['decisions'] = []

        player.participant.vars['decisions'].append({
            'app': 'investment',
            'round': player.round_number,
            'decision_id': player.decision_id,
            'block_type': player.block_type,
            'expected_return': player.field_maybe_none('return_expectation'),
            'best_forecast': player.participant.forecast_returns.get(player.decision_id),
            'allocation': player.portfolio_allocation,
            'payoff': player.payoff_this_round,
        })

        print(f"👀 player.return_expectation: {player.field_maybe_none('return_expectation')}")

    @staticmethod
    def is_displayed(player: Player):
        """Assign decision_id, block_type, and realized return dynamically before displaying the page.

        Always returns True; the method is used for its side effect of
        populating the per-round player fields from participant vars.
        """
        player.decision_id = player.participant.round_to_decision[player.round_number]

        # Get block type dynamically based on treatment group from participant vars
        treatment_group = player.participant.vars['treatment_group']
        player.block_type = C.get_block_type_for_round(player.round_number, treatment_group)

        # Also store treatment_group on player for this round
        player.treatment_group = treatment_group

        player.realized_return = player.participant.realized_returns.get(player.decision_id, 0)
        return True

    @staticmethod
    def vars_for_template(player):
        """Build the template context for the decision page."""
        append_page_timestamp(player, 'page_shown_at_utc', InvestmentDecisionPage.__name__)

        form_fields = InvestmentDecisionPage.get_form_fields(player)
        allocation_in_form = 'portfolio_allocation' in form_fields

        block_type = player.block_type
        optimal_allocation = Player.calculate_optimal_allocation(player)

        # Get best forecast return from participant vars (fraction -> percent)
        best_forecast = round(player.participant.forecast_returns.get(player.decision_id) * 100, 2)

        Lexicon, lang = get_language_and_lexicon(player)

        return dict(
            Lexicon=Lexicon,
            lang=lang,
            round_number=player.round_number,
            decision_id=player.decision_id,
            block_type=block_type,
            graph_url=f"/static/graphs/graph{player.decision_id}.png",
            optimal_allocation=optimal_allocation,
            best_forecast=best_forecast,
            show_form=bool(form_fields),
            show_allocation=allocation_in_form,
        )


class PostDecisionQuestionnaire(Page):
    """Post-decision questionnaire with affective sliders and NASA TLX mental demand."""

    form_model = 'player'
    form_fields = ['confidence', 'arousal', 'valence', 'mental_demand', 'slider_order']

    @staticmethod
    def error_message(player, values):
        """Validate that all fields are filled.

        ``slider_order`` is submitted with the form but not required here.
        """
        if values.get('confidence') is None:
            return 'Please select your confidence level'
        if values.get('arousal') is None:
            return 'Please interact with the arousal slider'
        if values.get('valence') is None:
            return 'Please interact with the valence slider'
        if values.get('mental_demand') is None:
            return 'Please interact with the mental demand slider'
        return None

    @staticmethod
    def vars_for_template(player):
        append_page_timestamp(player, 'page_shown_at_utc', PostDecisionQuestionnaire.__name__)
        Lexicon, lang = get_language_and_lexicon(player)

        return dict(
            Lexicon=Lexicon,
            lang=lang,
            round_number=player.round_number,
        )

    @staticmethod
    def before_next_page(player, timeout_happened):
        append_page_timestamp(player, 'page_submitted_at_utc', PostDecisionQuestionnaire.__name__)


class InvestmentResultsPage(Page):
    @staticmethod
    def vars_for_template(player):
        """Build the template context for the results page.

        The displayed forecast is the participant's own ``return_expectation``
        (percent); when that is missing, the best forecast from participant
        vars is shown instead. The forecast bonus is only paid when the
        participant's own forecast exists and is within tolerance.
        """
        append_page_timestamp(player, 'page_shown_at_utc', InvestmentResultsPage.__name__)
        Lexicon, lang = get_language_and_lexicon(player)

        own_forecast = player.field_maybe_none('return_expectation')
        forecasted_return = own_forecast
        # Fallback for legacy rows where autonomous rounds cleared return_expectation.
        if forecasted_return is None:
            forecasted_return = player.participant.forecast_returns.get(player.decision_id, 0) * 100

        # Check forecast accuracy. The epsilon keeps forecasts that sit exactly
        # on the tolerance boundary from failing due to float rounding
        # (0.05 - 0.04 == 0.010000000000000002 in binary floating point).
        forecast_is_accurate = (
            abs(player.realized_return - forecasted_return / 100)
            <= _FORECAST_TOLERANCE + _FLOAT_EPSILON
        )
        if forecast_is_accurate:
            forecast_correct = 'within' if lang == 'en' else ''
        else:
            forecast_correct = 'not within' if lang == 'en' else 'pas'

        # No bonus when the accuracy was judged on the fallback forecast.
        if forecast_is_accurate and own_forecast is not None:
            payoff_forecast = _FORECAST_BONUS
        else:
            payoff_forecast = 0
        payoff_total = player.payoff_this_round + payoff_forecast
        print(forecast_correct, payoff_forecast)

        return dict(
            Lexicon=Lexicon,
            lang=lang,
            round_number=player.round_number,
            decision_id=player.decision_id,
            block_type=player.block_type,
            graph_url=f"/static/graphs/graph{player.decision_id}_r.png",
            realized_return=player.realized_return * 100,  # convert to percentage
            allocation=player.portfolio_allocation,
            forecasted_return=forecasted_return,
            forecast_correct=forecast_correct,
            payoff_investment=player.payoff_this_round,
            payoff_forecast=payoff_forecast,
            payoff_total=payoff_total,
        )

    @staticmethod
    def before_next_page(player, timeout_happened):
        append_page_timestamp(player, 'page_submitted_at_utc', InvestmentResultsPage.__name__)


class PostResultsQuestionnaire(Page):
    """Post-result questionnaire with affective sliders. Only shown on the last round of each block."""

    form_model = 'player'
    form_fields = ['arousal_post_result', 'valence_post_result', 'slider_order_post_result']

    @staticmethod
    def is_displayed(player):
        """Only show this page on the last round of each block."""
        return C.is_last_round_of_block(player.round_number)

    @staticmethod
    def error_message(player, values):
        """Validate that all affective fields are filled."""
        if values.get('arousal_post_result') is None:
            return 'Please interact with the arousal slider'
        if values.get('valence_post_result') is None:
            return 'Please interact with the valence slider'
        return None

    @staticmethod
    def vars_for_template(player):
        append_page_timestamp(player, 'page_shown_at_utc', PostResultsQuestionnaire.__name__)
        Lexicon, lang = get_language_and_lexicon(player)

        return dict(
            Lexicon=Lexicon,
            lang=lang,
            round_number=player.round_number,
        )

    @staticmethod
    def before_next_page(player, timeout_happened):
        append_page_timestamp(player, 'page_submitted_at_utc', PostResultsQuestionnaire.__name__)


class BlockTransitionPage(Page):
    """Transition page shown after each block to explain the next phase."""

    @staticmethod
    def is_displayed(player):
        return C.is_last_round_of_block(player.round_number)

    @staticmethod
    def vars_for_template(player):
        """Pick the transition title/body matching the block that just ended."""
        append_page_timestamp(player, 'page_shown_at_utc', BlockTransitionPage.__name__)
        Lexicon, lang = get_language_and_lexicon(player)

        end_block_1 = C.block1_rounds
        end_block_2 = C.block1_rounds + C.block2_rounds
        end_block_3 = C.block1_rounds + C.block2_rounds + C.block3_rounds

        transition_title = Lexicon.transition_title
        transition_body = ''

        if player.round_number == end_block_1:
            transition_body = Lexicon.transition_block1_to_block2
        elif player.round_number == end_block_2:
            # Block 3 differs by treatment group: 1 -> allocation only,
            # otherwise allocation + forecast.
            treatment_group = player.participant.vars.get('treatment_group')
            if treatment_group == 1:
                transition_body = Lexicon.transition_block2_to_block3_allocation
            else:
                transition_body = Lexicon.transition_block2_to_block3_allocation_forecast
        elif player.round_number == end_block_3:
            transition_body = Lexicon.transition_block3_to_block4
        else:
            # Remaining case given is_displayed: the end of the final block.
            transition_title = Lexicon.transition_final_title
            transition_body = Lexicon.transition_block4_to_questionnaire

        return dict(
            Lexicon=Lexicon,
            lang=lang,
            transition_title=transition_title,
            transition_body=transition_body,
        )

    @staticmethod
    def before_next_page(player, timeout_happened):
        append_page_timestamp(player, 'page_submitted_at_utc', BlockTransitionPage.__name__)


page_sequence = [
    InvestmentDecisionPage,
    PostDecisionQuestionnaire,
    InvestmentResultsPage,
    PostResultsQuestionnaire,
    BlockTransitionPage,
]