from otree.api import *
from markupsafe import Markup
import random
import json
import time


doc = """
Your app description
"""


class C(BaseConstants):
    """App-wide constants: treatment quotas, data files, timing and quiz content."""

    NAME_IN_URL = 'intro'
    PLAYERS_PER_GROUP = None
    NUM_ROUNDS = 1

    # Treatment assignment quotas
    SINGLE_QUOTA = 50
    SINGLE_TREATMENT = 'S1'
    # T2/T2E prioritized first; CG fills last (least costly if lab runs short)
    TEAM_TREATMENTS_LIST = ['T2', 'T2E', 'CG']

    DATA_FILE_PATH = f'{__name__}/static/{__name__}/data/'
    DATA_PRACTICE_FILE_NAME = 'practice.csv'
    DATA_FILE_NAME = 'data.csv'
    INTERFACE_DATA_RANDOM3 = 45  # The random3 value of the interface data

    ### Treatment arms (order used for practice rotation and task_group matching)
    # - S1: single participant; third-party estimate labeled AI
    # - CG: control; no third-party estimate in UI
    # - T2: team; AI third-party estimate
    # - T2E: team; third-party estimate labeled "Previous Participant's Estimate"
    #        in the task UI (same data as T2)
    ###
    TREATMENTS = ['S1', 'CG', 'T2', 'T2E']

    # NOTE(review): presumably BONUSES[i] is paid when the total error falls
    # below BONUS_THRESHOLD[i], with the last bonus as fallback - confirm
    # against the payoff code, which is not part of this module.
    BONUSES = [cu(2), cu(1.5), cu(1), cu(0.5), cu(0)]
    BONUS_THRESHOLD = [50, 60, 70, 80]

    # Attention check interval in seconds
    ATTENTION_CHECK_INTERVAL_SECONDS = 300  # 5 * 60
    # When will the warning text be shown?
    ATTENTION_CHECK_WARNING_SECONDS = 30  # 30
    # Timeout for the attention check in total (including warning seconds)
    ATTENTION_CHECK_TIMEOUT_SECONDS = 60  # 60

    # Waiting page threshold in seconds
    WAITING_PAGE_THRESHOLD_SECONDS = 4 * 60  # 4 minutes for lab version

    QUIZ_ERROR_MSG = "There is at least one mistake in your responses. Please try again"
    # Comprehension quiz: one entry per question, keyed by the Player field name.
    QUIZ = dict(
        q1=dict(
            label=Markup("Question 1: Example: Your team's estimates for 6 students were off by 15 points in total. How much bonus money would you earn?"),
            choices=["2.0", "1.5"],
            correct="2.0",
        ),
        q2=dict(
            label=Markup("Question 2: Example: You enter 46 and your partner enters 50 as the team's estimate. What will your team's final estimate be, which will be used for bonus calculation?"),
            choices=["46", "48"],
            correct="48",
        ),
    )


SINGLE_TREATMENTS = frozenset({'S1'})
# Third-party estimate UI: AI wording and label (T2, S1)
AI_THIRD_PARTY_LABEL_TREATMENTS = frozenset({'T2', 'S1'})
# "Previous Participant's Estimate" label in task UI (T2E)
PREVIOUS_PARTICIPANT_LABEL_TREATMENTS = frozenset({'T2E'})


def treatment_is_single(treatment: str) -> bool:
    """Return True when ``treatment`` is one of the single-participant arms."""
    return treatment in SINGLE_TREATMENTS


def treatment_has_ai(treatment: str) -> bool:
    """Return True for every treatment except the control arm ``'CG'``."""
    return treatment != 'CG'


def third_party_estimate_label(treatment: str) -> str:
    """Label for third-party estimate in task_group.

    Returns "Previous Participant's Estimate" for the arms in
    PREVIOUS_PARTICIPANT_LABEL_TREATMENTS and "AI's Estimate" otherwise.
    """
    if treatment in PREVIOUS_PARTICIPANT_LABEL_TREATMENTS:
        return "Previous Participant's Estimate"
    return "AI's Estimate"


def survey_third_party_is_ai(treatment: str) -> bool:
    """True when third-party framing is AI (T2, S1); False for any other value.

    A falsy ``treatment`` (``None`` or ``''``) is treated as the empty string
    and therefore yields False.
    """
    return (treatment or "") in AI_THIRD_PARTY_LABEL_TREATMENTS


def third_party_popover_body(treatment: str) -> Markup:
    """Bootstrap popover HTML for the third-party estimate box (AI vs previous-participant arms).

    Returns an empty ``Markup`` for treatments in neither label set.
    """
    # NOTE(review): the reviewed text of the two literals below contained only
    # plain text surrounded by blank lines; any HTML tags may have been lost in
    # extraction. Confirm the wrapping markup against the original file.
    t = treatment or ""
    if t in AI_THIRD_PARTY_LABEL_TREATMENTS:
        return Markup(
            "\n\nThis prediction is generated by an algorithm trained on past student records.\n\n"
        )
    if t in PREVIOUS_PARTICIPANT_LABEL_TREATMENTS:
        return Markup(
            "\n\nThis estimate was made by a participant in a pre-study.\n\n"
        )
    return Markup("")


class Subsession(BaseSubsession):
    pass


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    """Per-participant record: consent, demographics and quiz answers."""

    prolific_id = models.StringField(default=" ")

    consent = models.BooleanField(
        label="I declare being of age and accept of free will, after having read and fully understood the above paragraphs, to participate in the study.",
        choices=[(True, 'Yes'), (False, 'No')],
        widget=widgets.RadioSelect(),
        # initial=True
    )

    gender = models.IntegerField(
        label="1. Please indicate your gender:",
        choices=[
            (0, 'Female'),
            (1, 'Male'),
            (2, 'I prefer not to disclose'),
        ],
        widget=widgets.RadioSelect,
    )

    education = models.IntegerField(
        label="2. What is the highest degree or level of school you have completed? "
              "If currently enrolled, highest degree received.",
        choices=[
            (1, 'High School'),
            (2, 'Higher Secondary School'),
            (3, 'Undergraduate'),
            (4, 'Postgraduate'),
            (5, 'Doctorate'),
        ],
        widget=widgets.RadioSelect,
    )

    ap_familiarity = models.IntegerField(
        label=Markup("3. Rate your familiarity with AP tests"),
        choices=list(range(1, 11)),
        widget=widgets.RadioSelectHorizontal,
    )

    psat_familiarity = models.IntegerField(
        label=Markup("4. Rate your familiarity with PSAT tests"),
        choices=list(range(1, 11)),
        widget=widgets.RadioSelectHorizontal,
    )

    # Generate one answer field and one error counter per quiz question.
    # Inside a class body ``locals()`` is the class namespace, so assigning
    # into it defines class attributes named after the QUIZ keys.
    for k, v in C.QUIZ.items():
        locals()[k] = models.StringField(
            label=v['label'],
            choices=v['choices'],
            widget=widgets.RadioSelect,
        )
        locals()[f'{k}_num_errors'] = models.IntegerField(initial=0)
    # Remove the loop variables so they do not remain as class attributes.
    del k, v


# FUNCTIONS
def read_csv(file_name: str, file_path: str) -> list[dict]:
    """Read a CSV file into a list of row dicts.

    Args:
        file_name: Name of the file; appended directly to ``file_path``.
        file_path: Directory prefix, expected to end with a path separator.

    Returns:
        One dict per data row, keyed by the header row.
    """
    import csv

    # 'utf-8-sig' strips a leading BOM if one is present; the context manager
    # guarantees the handle is closed even if parsing fails.
    with open(f'{file_path}{file_name}', encoding='utf-8-sig') as f:
        return list(csv.DictReader(f))


def creating_session(subsession: Subsession):
    """Load task/practice data into ``session.vars`` and reset participant flags.

    Raises:
        ValueError: If no practice row has ``random3`` equal to
            ``C.INTERFACE_DATA_RANDOM3``.
    """
    session = subsession.session
    datas = read_csv(C.DATA_FILE_NAME, C.DATA_FILE_PATH)
    practice_datas = read_csv(C.DATA_PRACTICE_FILE_NAME, C.DATA_FILE_PATH)

    # Not shuffle the data, keep original order
    # random.shuffle(datas)
    # random.shuffle(practice_datas)

    # TODO: Modify to 5 rounds
    # Key is the round number, value is corresponding data
    session.vars['data'] = dict(enumerate(datas, start=1))
    # Keep the practice data for the first 3 rounds
    # (NOTE(review): every practice row is stored here, not only three.)
    session.vars['practice_data'] = dict(enumerate(practice_datas, start=1))

    interface_row = next(
        (d for d in practice_datas if int(d['random3']) == C.INTERFACE_DATA_RANDOM3),
        None,
    )
    if interface_row is None:
        raise ValueError(
            f'No row with random3 == {C.INTERFACE_DATA_RANDOM3} '
            f'in {C.DATA_PRACTICE_FILE_NAME}'
        )
    session.vars['interface_data'] = interface_row

    # The task orders are stored as JSON strings of the random3 values.
    task_order = [int(d['random3']) for d in datas]
    session.vars['task_order'] = json.dumps(task_order)
    practice_order = [int(d['random3']) for d in practice_datas]
    session.vars['practice_order'] = json.dumps(practice_order)

    debug = session.config.get('debug', False)
    for p in subsession.get_players():
        pvars = p.participant.vars
        pvars['is_dropout'] = False
        pvars['is_teammate_dropout'] = False
        pvars['warning_count'] = 0
        pvars['last_check_time'] = 0
        pvars['waiting_too_long'] = False
        if debug:
            # Debug sessions pre-fill consent.
            pvars['consent'] = True


def has_error(player: Player, values: dict) -> bool:
    """Check submitted quiz answers and count mistakes per question.

    For every key of ``values`` that is a quiz question with a wrong answer,
    the matching ``<key>_num_errors`` field on ``player`` is incremented.

    Returns:
        True if at least one quiz answer was wrong.
    """
    error = False
    for k, v in values.items():
        if k in C.QUIZ and v != C.QUIZ[k]['correct']:
            error = True
            counter = f'{k}_num_errors'
            setattr(player, counter, getattr(player, counter) + 1)
    return error


def get_player_treatment(player: Player) -> str:
    """Get treatment from participant (assigned in AssignTreatment wait page) or session config."""
    return player.participant.vars.get('treatment') or player.session.config.get('treatment', '')


def _assign_treatment(player: Player, treatment: str):
    """Write treatment to participant fields."""
    participant = player.participant
    participant.treatment = treatment
    participant.vars['treatment'] = treatment
    participant.vars['is_single'] = treatment_is_single(treatment)


def group_by_arrival_time_method(subsession: Subsession, waiting_players: list[Player]):
    """Assign treatment as participants arrive: first SINGLE_QUOTA get S1, then team treatments rotate.

    Returns:
        ``None`` when nobody is waiting, otherwise a one-element list holding
        the first waiting player, who has just been assigned a treatment.
    """
    if not waiting_players:
        return None

    session = subsession.session

    # Initialise counters once per session
    if 'single_count' not in session.vars:
        session.vars['single_count'] = 0
        session.vars['team_count'] = 0

    player = waiting_players[0]
    cfg_treatment = session.config.get('treatment')
    counter_key = None  # session.vars counter to bump after assignment, if any

    if cfg_treatment:
        # Fixed-treatment session (e.g. "AI and team (CG)"): bypass quota logic
        treatment = str(cfg_treatment)
    elif session.vars['single_count'] < C.SINGLE_QUOTA:
        # Phase 1: assign S1 until quota is reached
        treatment = C.SINGLE_TREATMENT
        counter_key = 'single_count'
    else:
        # Phase 2: team treatments in rotating pairs - two consecutive
        # arrivals share a treatment before the rotation advances.
        team_idx = session.vars['team_count'] // 2
        treatment = C.TEAM_TREATMENTS_LIST[team_idx % len(C.TEAM_TREATMENTS_LIST)]
        counter_key = 'team_count'

    _assign_treatment(player, treatment)
    # Count the arrival only after the assignment has succeeded.
    if counter_key is not None:
        session.vars[counter_key] += 1
    return [player]


def is_debug(player: Player) -> bool:
    """Return the session's ``debug`` config flag (False when unset)."""
    return player.session.config.get('debug', False)


def is_not_debug(player: Player) -> bool:
    """Return the negation of :func:`is_debug`."""
    return not is_debug(player)


# Custom page that always past timeout
class CustomTimeoutPage(Page):
    """Base page whose ``_is_past_timeout`` hook always reports True."""

    def _is_past_timeout(self):
        # NOTE(review): overrides a private oTree hook; confirm the effect
        # against the installed oTree version.
        return True


class AssignTreatment(WaitPage):
    """Wait page that hands arrivals to ``group_by_arrival_time_method``."""

    group_by_arrival_time = True


class Consent(CustomTimeoutPage):
    form_model = 'player'
    form_fields = ['consent']
    # is_displayed = is_not_debug

    @staticmethod
    def vars_for_template(player: Player):
        treatment = get_player_treatment(player)
        return dict(is_single=treatment_is_single(treatment))

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # Copy the submitted answer to the participant.
        player.participant.vars['consent'] = player.consent
        # Debug output of the stored consent value.
        print(player.participant.vars['consent'])
        print(player.participant.consent)

    @staticmethod
    def app_after_this_page(player: Player, upcoming_apps):
        # Outside debug sessions, a participant who declines goes to 'end'.
        if not player.consent and is_not_debug(player):
            return 'end'


class SurveyI(CustomTimeoutPage):
    form_model = 'player'
    form_fields = ['gender', 'education', 'ap_familiarity', 'psat_familiarity']
    # is_displayed = is_not_debug

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # Store the participant label on the player record as the Prolific ID.
        player.prolific_id = player.participant.label


class GeneralInstructionsI(CustomTimeoutPage):
    form_model = 'player'
    # is_displayed = is_not_debug

    @staticmethod
    def get_form_fields(player: Player):
        # Single-participant arms only get q1; all other arms get both questions.
        treatment = get_player_treatment(player)
        if treatment_is_single(treatment):
            return ['q1']
        return ['q1', 'q2']

    @staticmethod
    def vars_for_template(player: Player):
        treatment = get_player_treatment(player)
        return dict(is_single=treatment_is_single(treatment))

    @staticmethod
    def error_message(player: Player, values):
        # Reject the form (and count the mistakes) while any answer is wrong.
        if has_error(player, values):
            return C.QUIZ_ERROR_MSG


class GeneralInstructionsII(CustomTimeoutPage):
    pass
    # is_displayed = is_not_debug


class GeneralInstructionsIII(CustomTimeoutPage):
    pass
    # is_displayed = is_not_debug


page_sequence = [
    AssignTreatment,
    # Consent,
    GeneralInstructionsI,
    GeneralInstructionsII,
    GeneralInstructionsIII
]