"""oTree app in which each participant answers A/B questions about prompts.

Prompts are loaded once, at import time, from ``combined_prompt_bank.csv``
(located next to this file).  Every participant is assigned one domain
('risk', 'time' or 'social') and ``C.NUM_ROUNDS`` distinct prompts from that
domain.  In each round the participant's answers are compared with the
``orig_q*`` answers stored for the prompt, and at the end one round is drawn
at random to decide whether the prize is paid.
"""
from otree.api import *

import csv
import random
from pathlib import Path

from .parameters import (
    BASE_PARTICIPATION_FEE,
    PRIZE_AMOUNT,
    TARGET_REPS,
    RISK_QUESTIONS,
    TIME_QUESTIONS,
    SOCIAL_QUESTIONS,
)


def load_prompt_bank():
    """Read ``combined_prompt_bank.csv`` and return its rows as a list of dicts.

    The file is opened with ``utf-8-sig`` so that a leading byte-order mark
    does not end up inside the first column name.
    """
    path = Path(__file__).with_name('combined_prompt_bank.csv')
    with path.open(newline='', encoding='utf-8-sig') as f:
        return list(csv.DictReader(f))


PROMPT_BANK = load_prompt_bank()

REQUIRED_COLUMNS = {'prompt_id', 'domain', 'prompt_text'}
VALID_DOMAINS = {'risk', 'time', 'social'}

# Fail at import time if the prompt bank is unusable, rather than part-way
# through a session.
if not PROMPT_BANK:
    raise ValueError('combined_prompt_bank.csv is empty.')

# DictReader gives every row the same header-derived keys, so checking the
# first row is enough to detect a missing column.
missing = REQUIRED_COLUMNS - set(PROMPT_BANK[0].keys())
if missing:
    raise ValueError(f'Missing required CSV columns: {missing}')

for row in PROMPT_BANK:
    if row['domain'] not in VALID_DOMAINS:
        raise ValueError(f"Invalid domain '{row['domain']}' in CSV.")


doc = ''

# Domain name -> question list imported from ``parameters``.
_QUESTIONS_BY_DOMAIN = {
    'risk': RISK_QUESTIONS,
    'time': TIME_QUESTIONS,
    'social': SOCIAL_QUESTIONS,
}


class C(BaseConstants):
    NAME_IN_URL = 'prompt_implementation'
    PLAYERS_PER_GROUP = None
    NUM_ROUNDS = TARGET_REPS


class Subsession(BaseSubsession):
    pass


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    # Set to the participant code in creating_session.
    respondent_id = models.StringField(blank=True)

    # assigned prompt/task info for this round
    prompt_id = models.StringField(blank=True)
    domain = models.StringField(blank=True)
    prompt_text = models.LongStringField(blank=True)

    # original participant answers for this round (copied from the CSV row's
    # orig_q* columns, stripped and upper-cased, in creating_session)
    orig_q1 = models.StringField(blank=True)
    orig_q2 = models.StringField(blank=True)
    orig_q3 = models.StringField(blank=True)
    orig_q4 = models.StringField(blank=True)
    orig_q5 = models.StringField(blank=True)
    orig_q6 = models.StringField(blank=True)
    orig_q7 = models.StringField(blank=True)
    orig_q8 = models.StringField(blank=True)
    orig_q9 = models.StringField(blank=True)
    orig_q10 = models.StringField(blank=True)
    orig_q11 = models.StringField(blank=True)

    # participant answers for this round (optional A/B radio buttons; only
    # the first len(questions) of them are shown for the player's domain)
    answer_q1 = models.StringField(
        choices=['A', 'B'],
        widget=widgets.RadioSelect,
        blank=True,
    )
    answer_q2 = models.StringField(
        choices=['A', 'B'],
        widget=widgets.RadioSelect,
        blank=True,
    )
    answer_q3 = models.StringField(
        choices=['A', 'B'],
        widget=widgets.RadioSelect,
        blank=True,
    )
    answer_q4 = models.StringField(
        choices=['A', 'B'],
        widget=widgets.RadioSelect,
        blank=True,
    )
    answer_q5 = models.StringField(
        choices=['A', 'B'],
        widget=widgets.RadioSelect,
        blank=True,
    )
    answer_q6 = models.StringField(
        choices=['A', 'B'],
        widget=widgets.RadioSelect,
        blank=True,
    )
    answer_q7 = models.StringField(
        choices=['A', 'B'],
        widget=widgets.RadioSelect,
        blank=True,
    )
    answer_q8 = models.StringField(
        choices=['A', 'B'],
        widget=widgets.RadioSelect,
        blank=True,
    )
    answer_q9 = models.StringField(
        choices=['A', 'B'],
        widget=widgets.RadioSelect,
        blank=True,
    )
    answer_q10 = models.StringField(
        choices=['A', 'B'],
        widget=widgets.RadioSelect,
        blank=True,
    )
    answer_q11 = models.StringField(
        choices=['A', 'B'],
        widget=widgets.RadioSelect,
        blank=True,
    )

    # scoring for this round (written by score_round)
    total_matches = models.IntegerField(initial=0)
    match_rate = models.FloatField(initial=0)

    # final payment info (written on the FinalResults page)
    paying_round = models.IntegerField(initial=0)
    prize_draw = models.FloatField(initial=0)
    won_prize = models.BooleanField(initial=False)
    selected_match_rate = models.FloatField(initial=0)
    final_payoff_amount = models.CurrencyField(initial=0)
    base_fee_paid = models.CurrencyField(initial=0)
    prize_amount_possible = models.CurrencyField(initial=0)
    prize_won_amount = models.CurrencyField(initial=0)
    paying_domain = models.StringField(blank=True)
    paying_prompt_id = models.StringField(blank=True)

    # demographics collected on the Survey page (all optional)
    grade = models.StringField(
        choices=['Freshman', 'Sophomore', 'Junior', 'Senior', 'Other'],
        blank=True
    )
    age = models.IntegerField(blank=True)
    ethnicity = models.StringField(
        choices=[
            'American Indian or Alaska Native',
            'Asian',
            'Black or African American',
            'Hispanic or Latino',
            'Native Hawaiian or Other Pacific Islander',
            'White',
            'Multiracial',
            'Other',
            'Prefer not to say',
        ],
        blank=True
    )
    gender = models.StringField(
        choices=[
            'Female',
            'Male',
            'Non-binary',
            'Other',
            'Prefer not to say',
        ],
        blank=True
    )


def _assign_domains_and_prompts(players):
    """Give every player a domain and ``C.NUM_ROUNDS`` distinct prompts.

    The assignment is stored in ``participant.vars`` under the keys
    ``'assigned_domain'`` and ``'assigned_prompts'``.

    Raises:
        ValueError: if any domain has fewer than ``C.NUM_ROUNDS`` prompts.
    """
    base_domains = ['risk', 'time', 'social']
    prompts_by_domain = {
        domain: [row for row in PROMPT_BANK if row['domain'] == domain]
        for domain in base_domains
    }

    # safety check: each domain must have at least NUM_ROUNDS unique prompts
    for domain, rows in prompts_by_domain.items():
        if len(rows) < C.NUM_ROUNDS:
            raise ValueError(
                f"Not enough prompts in domain '{domain}'. "
                f"Need at least {C.NUM_ROUNDS}, found {len(rows)}."
            )

    # make domain assignment as balanced as possible: full triples first,
    # then the leftover players get distinct randomly chosen domains
    full_cycles, remainder = divmod(len(players), len(base_domains))
    domain_cycle = base_domains * full_cycles
    domain_cycle += random.sample(base_domains, remainder)
    random.shuffle(domain_cycle)

    for player, assigned_domain in zip(players, domain_cycle):
        # random.sample draws without replacement, so no prompt repeats
        # across this participant's rounds
        assigned_prompts = random.sample(
            prompts_by_domain[assigned_domain], C.NUM_ROUNDS
        )
        participant_vars = player.participant.vars
        participant_vars['assigned_domain'] = assigned_domain
        participant_vars['assigned_prompts'] = assigned_prompts


def creating_session(subsession):
    """Assign domains/prompts in round 1 and fill in each round's prompt data.

    In round 1 every participant receives a domain and a list of prompts.
    In every round (including round 1) the player's ``respondent_id``,
    ``prompt_id``, ``domain``, ``prompt_text`` and ``orig_q*`` fields are
    populated from that assignment.
    """
    if subsession.round_number == 1:
        _assign_domains_and_prompts(subsession.get_players())

    round_index = subsession.round_number - 1
    for player in subsession.get_players():
        participant_vars = player.participant.vars
        prompt_record = participant_vars['assigned_prompts'][round_index]

        player.respondent_id = player.participant.code
        player.prompt_id = prompt_record['prompt_id']
        player.domain = participant_vars['assigned_domain']
        player.prompt_text = prompt_record['prompt_text']

        questions = get_questions_for_domain(player.domain)
        for i in range(1, len(questions) + 1):
            # A missing or empty CSV cell becomes '' rather than None.
            original_answer = prompt_record.get(f'orig_q{i}', '') or ''
            setattr(player, f'orig_q{i}', original_answer.strip().upper())


def _read_optional_field(player, field_name):
    """Return ``player.<field_name>``, tolerating a value of ``None``.

    oTree 5 documents that plain attribute access raises when the stored
    value is ``None`` and offers ``field_maybe_none`` for that case.  The
    answer fields are ``blank=True`` with no initial value, so an unanswered
    question must be read this way.  Falls back to ``getattr`` when the
    player object has no ``field_maybe_none`` method.
    """
    reader = getattr(player, 'field_maybe_none', None)
    if reader is not None:
        return reader(field_name)
    return getattr(player, field_name)


def score_round(player):
    """Count answers equal to the stored original answers for this round.

    Writes ``player.total_matches`` (number of equal answers) and
    ``player.match_rate`` (that number divided by the number of questions
    for the player's domain, or 0 when the domain has no questions).
    """
    questions = get_questions_for_domain(player.domain)
    total = sum(
        1
        for i in range(1, len(questions) + 1)
        if _read_optional_field(player, f'answer_q{i}')
        == _read_optional_field(player, f'orig_q{i}')
    )
    player.total_matches = total
    # get_questions_for_domain returns [] for an unknown domain; avoid
    # dividing by zero in that case.
    player.match_rate = total / len(questions) if questions else 0


def get_questions_for_domain(domain):
    """Return the question list for ``domain``, or ``[]`` if it is unknown."""
    return _QUESTIONS_BY_DOMAIN.get(domain, [])


def build_display_questions(player):
    """Return one dict per question for the task-page template.

    Each dict has the keys ``field_name`` (the ``answer_q<i>`` form field,
    numbered from 1), ``text`` and ``name`` (copied from the question).
    """
    questions = get_questions_for_domain(player.domain)
    return [
        dict(
            field_name=f'answer_q{i}',
            text=q['text'],
            name=q['name'],
        )
        for i, q in enumerate(questions, start=1)
    ]


def _task_form_fields(player):
    """Return the ``answer_q*`` field names used by the player's domain."""
    questions = get_questions_for_domain(player.domain)
    return [f'answer_q{i}' for i in range(1, len(questions) + 1)]


def _task_template_vars(player):
    """Return the template variables shared by the three task pages."""
    return dict(
        prompt_text=player.prompt_text,
        domain=player.domain,
        questions=build_display_questions(player),
        round_number=player.round_number,
        total_rounds=C.NUM_ROUNDS,
    )


class Instructions(Page):
    """Shown once, in round 1."""

    @staticmethod
    def is_displayed(player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player):
        return dict(
            total_rounds=C.NUM_ROUNDS,
        )


class RiskTaskPage(Page):
    """Task page shown to players whose domain is 'risk'."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player):
        return player.domain == 'risk'

    @staticmethod
    def get_form_fields(player):
        return _task_form_fields(player)

    @staticmethod
    def vars_for_template(player):
        return _task_template_vars(player)

    @staticmethod
    def before_next_page(player, timeout_happened):
        score_round(player)


class TimeTaskPage(Page):
    """Task page shown to players whose domain is 'time'."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player):
        return player.domain == 'time'

    @staticmethod
    def get_form_fields(player):
        return _task_form_fields(player)

    @staticmethod
    def vars_for_template(player):
        return _task_template_vars(player)

    @staticmethod
    def before_next_page(player, timeout_happened):
        score_round(player)


class SocialTaskPage(Page):
    """Task page shown to players whose domain is 'social'."""

    form_model = 'player'

    @staticmethod
    def is_displayed(player):
        return player.domain == 'social'

    @staticmethod
    def get_form_fields(player):
        return _task_form_fields(player)

    @staticmethod
    def vars_for_template(player):
        return _task_template_vars(player)

    @staticmethod
    def before_next_page(player, timeout_happened):
        score_round(player)


def _determine_payment_once(player):
    """Draw the paying round and the prize outcome, at most once.

    The outcome is stored in ``participant.vars``; if ``'paying_round'`` is
    already present nothing is redrawn, so reloading the results page cannot
    change the payment.  One round is chosen uniformly at random and the
    prize is won when a uniform draw from [0, 1) is below that round's
    ``match_rate``.
    """
    participant_vars = player.participant.vars
    if 'paying_round' in participant_vars:
        return

    selected_round = random.choice(player.in_all_rounds())
    draw = random.random()
    won_prize = draw < selected_round.match_rate

    final_payoff = BASE_PARTICIPATION_FEE
    prize_won_amount = cu(0)
    if won_prize:
        final_payoff += PRIZE_AMOUNT
        prize_won_amount = PRIZE_AMOUNT

    participant_vars['paying_round'] = selected_round.round_number
    participant_vars['selected_match_rate'] = selected_round.match_rate
    participant_vars['prize_draw'] = draw
    participant_vars['won_prize'] = won_prize
    participant_vars['final_payoff'] = final_payoff
    participant_vars['paying_domain'] = selected_round.domain
    participant_vars['paying_prompt_id'] = selected_round.prompt_id
    participant_vars['prize_won_amount'] = prize_won_amount


class FinalResults(Page):
    """Shown in the last round; determines and displays the payment."""

    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def vars_for_template(player):
        _determine_payment_once(player)
        participant_vars = player.participant.vars

        # Copy the stored outcome onto the last-round player record.
        player.paying_round = participant_vars['paying_round']
        player.prize_draw = participant_vars['prize_draw']
        player.won_prize = participant_vars['won_prize']
        player.selected_match_rate = participant_vars['selected_match_rate']
        player.final_payoff_amount = participant_vars['final_payoff']
        player.base_fee_paid = BASE_PARTICIPATION_FEE
        player.prize_amount_possible = PRIZE_AMOUNT
        player.prize_won_amount = participant_vars['prize_won_amount']
        player.paying_domain = participant_vars['paying_domain']
        player.paying_prompt_id = participant_vars['paying_prompt_id']
        player.payoff = player.final_payoff_amount

        paying_round_number = player.paying_round
        paying_round = player.in_round(paying_round_number)

        return dict(
            paying_round=paying_round_number,
            paying_domain_label=paying_round.domain.capitalize(),
            paying_prompt=paying_round.prompt_text,
            paying_match_rate_percent=round(paying_round.match_rate * 100, 1),
            won_prize=player.won_prize,
            final_payoff=player.final_payoff_amount,
            base_fee=player.base_fee_paid,
            prize_amount=player.prize_amount_possible,
        )


class Survey(Page):
    """Optional demographic questions, shown in the last round."""

    form_model = 'player'
    form_fields = [
        'grade',
        'age',
        'ethnicity',
        'gender',
    ]

    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS


class ThankYou(Page):
    """Closing page, shown in the last round."""

    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS


page_sequence = [
    Instructions,
    RiskTaskPage,
    TimeTaskPage,
    SocialTaskPage,
    FinalResults,
    Survey,
    ThankYou,
]