from otree.api import * import pandas as pd import os import random import math import time import json doc = """ Your app description """ _MATH_QUESTION_SETS = {} _WORD_BANKS = {} def load_math_questions(period=1): global _MATH_QUESTION_SETS if period not in _MATH_QUESTION_SETS: sheet_map = { 1: 'question period 1', 2: 'question period 2', 3: 'question period 3', } path = os.path.join(os.path.dirname(__file__), 'math_questions_hard3.xlsx') df = pd.read_excel(path, sheet_name=sheet_map[period], header=None) questions = [] for _, row in df.iterrows(): q_num = int(row[0]) q_text = str(row[1]).strip() q_answer = row[2] if isinstance(q_answer, float) and q_answer == int(q_answer): q_answer = int(q_answer) questions.append(dict( num=q_num, text=q_text, answer=str(q_answer).strip(), )) _MATH_QUESTION_SETS[period] = questions return _MATH_QUESTION_SETS[period] def load_word_bank(task_key): global _WORD_BANKS if task_key not in _WORD_BANKS: config = C.WORD_TASKS[task_key] path = os.path.join(os.path.dirname(__file__), config['word_file']) with open(path, 'r', encoding='utf-8') as f: _WORD_BANKS[task_key] = {line.strip().lower() for line in f if line.strip()} return _WORD_BANKS[task_key] class C(BaseConstants): NAME_IN_URL = 'v1' PLAYERS_PER_GROUP = 6 NUM_ROUNDS = 1 TASK_TIMEOUT = 180 QUESTIONS_PER_PAGE = 20 SCORE_BY_LENGTH = { 5: 1, 6: 2, 7: 3, 8: 5, # 8+ handled below } WORD_TASKS = { 'easy_1': dict( word_file='b1_easy.txt', board_image='b1_easy.jpeg', ), 'easy_2': dict( word_file='b2_easy.txt', board_image='b2_easy.jpeg', ), 'easy_3': dict( word_file='b3_easy.txt', board_image='b3_easy.jpg', ), } MATH_ADVICE = { 1: "If you doubt you will have time to finish the page you are working on, scan the page for easy problems you can solve quickly to complete more problems.", 2: "Multiply using decomposition (break the problem into smaller pieces). Example: 14 x 7 = 10 x 7 + 4 x 7", 3: "Try to enjoy working the problems and you will be more productive.", 4: "The solution to question nine (754-682=?) of the upcoming task is 72.", 5: "Exploit hard-wired operations you already know. Example: 4 x 127 is close to 4 x 125 = 500, so just add the remaining 4 x 2 = 8 to arrive at the correct answer of 508.", 6: "Use the input boxes provided to store intermediate answers so you don’t need to keep everything in your head.", 7: "Any number multiplied by 1 is itself.", 8: "The solution to question seventeen (128/8=?) of the upcoming task is 16.", } VERBAL_ADVICE = { 1: "Type fast and don’t second guess. A word that feels valid may be valid. There is no cost for an incorrect submission beyond a moment of time.", 2: "Look for letters that form common prefixes (UN-, RE-, OUT-, OVER-, PRE-).", 3: "Look for longer words first. Points scale with word length, so grab the big-point words in the beginning and find the easier, shorter words when time is running out at the end.", 4: "Try the word 'ALERTNESS' in the upcoming puzzle.", 5: "If there is an ‘S’ in the puzzle, make plural forms of the words you find.", 6: "Look for letters that form common endings (-ING, -TION, -ED, -LY, -NESS).", 7: "Do your best and have fun!", 8: "Try the word 'ALTERNATE' in the upcoming puzzle.", } SHARING_COSTS = [0, 10, 25, 50] # cumulative cost in pennies: 1st free, 2nd $0.10, etc. # COMP CHECK ANSWERS B_Instruction_cqa = 'Earn $0.20 for each point earned' p3_pay_cqa_emp = 'Earn $0.20 for each point earned on a word-creation task' p3_pay_cqa_man = ('Earn $0.15 for each correct answer solving math problems in addition to earnings tied to the' ' performance of group employees and overall Business Unit performance') p3_senior_cqa = 'Earns both of the above' share_cost1a = '$0.25' share_cost2a = '$0.25' share_cost3a = 'Nothing' share_cost4a = '$1.00' share_cost5a = '$0.20' def score_word(word: str) -> int: """Return points for a *valid* word based on its length.""" n = len(word) if n >= 8: return C.SCORE_BY_LENGTH[8] return C.SCORE_BY_LENGTH.get(n, 0) def normalize_word(raw: str) -> str: return raw.strip().lower() class Subsession(BaseSubsession): load_math_questions() class Group(BaseGroup): promoted_subA_gid = models.IntegerField(initial=0) promoted_subB_gid = models.IntegerField(initial=0) promoted_subA_reason = models.StringField(initial="") promoted_subB_reason = models.StringField(initial="") class Player(BasePlayer): # COMPENSATION & PERFORMANCE VARS pennies_earned = models.IntegerField(initial=0) part2_accepted_words = models.LongStringField(initial="") part2_score = models.IntegerField(initial=0) math_p3_score1 = models.IntegerField(initial=0) math_p3_score2 = models.IntegerField(initial=0) p3_bee1_score = models.IntegerField(initial=0) p3_bee1_accepted_words = models.LongStringField(initial="") p3_bee2_score = models.IntegerField(initial=0) p3_bee2_accepted_words = models.LongStringField(initial="") # LOGIC VARS w_score = models.IntegerField(initial=0) g_id = models.IntegerField(initial=0) treatment = models.StringField(initial="") subgroup_rank = models.IntegerField(initial=0) subgroup_label = models.StringField(initial="") is_promoted = models.BooleanField(initial=False) senior = models.BooleanField(initial=False) B_Instruction_cq = models.StringField( choices=['Earn $3.00 regardless of performance on the task', 'Earn $0.20 for each point earned', 'Earn $0.50 for each valid word but lose $0.10 for each invalid word'], widget=widgets.RadioSelect, label='Which of the following correctly explains your payment for Part 2?', initial='filler') p3_pay_cq = models.StringField( choices=['Earn $0.15 for each correct answer solving math problems in addition to earnings tied to the' ' performance of group employees and overall Business Unit performance', 'Earn $0.20 for each point' ' earned on a word-creation task', 'Earn $3.00 regardless of my performance'], widget=widgets.RadioSelect, label='Which of the following correctly explains your pay during Part 3 based on your current role?', initial='filler') p3_senior_cq = models.StringField( choices=['Earns a higher rate of $0.40 for each math problem solved, but the problems are much more difficult', 'Earns a percentage of the total pay earned by all participants in Business Unit A', 'Earns both of' ' the above'], widget=widgets.RadioSelect, label='The top performing manager, accounting for their group’s performance will be promoted to the role of' ' senior manager, who:', initial='filler') share_cost1 = models.StringField( choices=['$0.25', '$0.20', '$0.15'], widget=widgets.RadioSelect, label='How much would it cost to share 3 pieces of advice with the other manager and 1 piece of advice with the' ' employees in your group?', initial='filler') share_cost2 = models.StringField( choices=['$0.25', '$0.20', '$0.15'], widget=widgets.RadioSelect, label='How much would it cost to share 1 piece of advice with the other manager and 3 pieces of advice with the' ' employees in your group?', initial='filler') share_cost3 = models.StringField( choices=['$0.10', 'Nothing', '$0.20'], widget=widgets.RadioSelect, label='How much would it cost to share 1 piece of advice with the other manager and 1 piece of advice with the' ' employees in your group?', initial='filler') share_cost4 = models.StringField( choices=['$0.50', '$1.00', 'You cannot share this much'], widget=widgets.RadioSelect, label='How much would it cost to share 4 pieces of advice with the other manager and 4 pieces of advice with' ' the employees in your group?', initial='filler') share_cost5 = models.StringField( choices=['$0.15', '$0.25', '$0.20'], widget=widgets.RadioSelect, label='How much would it cost to view 4 pieces of advice shared with you?', initial='filler') pages_completed = models.IntegerField(initial=0) task_timed_out = models.BooleanField(initial=False) all_answers_json = models.LongStringField(blank=True, initial="") # SHARING VARS sharing_m2m_math = models.StringField(blank=True, initial="") sharing_m2m_math_count_manager_task = models.IntegerField(initial=0) sharing_m2m_verbal = models.StringField(blank=True, initial="") sharing_m2m_verbal_count_emp_task = models.IntegerField(initial=0) sharing_m2m_combined_order = models.StringField(blank=True, initial="") sharing_m2m_count = models.IntegerField(initial=0) sharing_m2e_verbal = models.StringField(blank=True, initial="") sharing_m2e_count = models.IntegerField(initial=0) sharing_cost = models.IntegerField(initial=0) # RECEIVING VARS m2m_advice_available_count = models.IntegerField(initial=0) # how many were shared TO this player m2m_advice_accepted_count = models.IntegerField(blank=True, label='', initial=None) # how many this player chose to view m2m_received_advice = models.StringField(blank=True, initial="") m2e_advice_available_count = models.IntegerField(initial=0) m2e_received_advice = models.StringField(blank=True, initial="") m2e_advice_accepted_count = models.IntegerField(blank=True, label='', initial=None) accept_cost = models.IntegerField(initial=0) # FUNCTIONS def B_Instruction_cq_error_message(player, B_Instruction_cq): if B_Instruction_cq != C.B_Instruction_cqa: return 'Incorrect. Answer correctly to continue.' def math_task_get_timeout(player, start_key): start = player.participant.vars.get(start_key, time.time()) elapsed = time.time() - start remaining = C.TASK_TIMEOUT - elapsed return max(3, int(remaining) + 2) def math_task_vars_for_template(player, period): questions = load_math_questions(period) return dict( questions_json=json.dumps(questions), questions_per_page=C.QUESTIONS_PER_PAGE, timeout=C.TASK_TIMEOUT, ) def math_task_live(player, data, period, start_key, page_key, answers_key, score_key): """ Shared live_method logic for all math task pages. period: which question set (1, 2, or 3) start_key: participant.vars key for start time (e.g., 'math_start_time_p2') page_key: participant.vars key for current page (e.g., 'math_current_page_p2') answers_key: participant.vars key for stored answers (e.g., 'math_answers_p2') score_key: player field name for the score (e.g., 'math_p2_score') """ action = data.get('type') questions = load_math_questions(period) if action == 'load_state': stored = player.participant.vars.get(answers_key, {}) current_page = player.participant.vars.get(page_key, 0) start = player.participant.vars.get(start_key, time.time()) elapsed = time.time() - start remaining = max(0, C.TASK_TIMEOUT - elapsed) start_idx = current_page * C.QUESTIONS_PER_PAGE end_idx = start_idx + C.QUESTIONS_PER_PAGE page_questions = questions[start_idx:end_idx] current_answers = {} for q in page_questions: key = str(q['num']) if key in stored: current_answers[key] = stored[key] return { player.id_in_group: dict( type='state_restored', current_page=current_page, current_answers=current_answers, total_correct=getattr(player, score_key), time_remaining=int(remaining), ) } elif action == 'submit_page': answers = data.get('answers', {}) current_page = data.get('current_page', 0) start = current_page * C.QUESTIONS_PER_PAGE end = start + C.QUESTIONS_PER_PAGE page_questions = questions[start:end] incorrect = [] for q in page_questions: submitted = str(answers.get(str(q['num']), "")).strip() if submitted != q['answer']: incorrect.append(q['num']) stored = player.participant.vars.get(answers_key, {}) stored.update(answers) player.participant.vars[answers_key] = stored player.participant.vars[page_key] = current_page total_correct = sum( 1 for q in questions if str(stored.get(str(q['num']), "")).strip() == q['answer'] ) setattr(player, score_key, total_correct) if incorrect: return { player.id_in_group: dict( type='feedback', success=False, incorrect=incorrect, total_correct=total_correct, message=f"The following question(s) are incorrect: {', '.join(str(n) for n in incorrect)}. Please correct them to proceed." ) } else: next_page = current_page + 1 player.pages_completed = next_page player.participant.vars[page_key] = next_page max_pages = math.ceil(len(questions) / C.QUESTIONS_PER_PAGE) if next_page >= max_pages: player.all_answers_json = json.dumps(stored) return { player.id_in_group: dict( type='feedback', success=True, finished=True, total_correct=total_correct, ) } else: next_start = next_page * C.QUESTIONS_PER_PAGE next_end = next_start + C.QUESTIONS_PER_PAGE next_questions = questions[next_start:next_end] return { player.id_in_group: dict( type='feedback', success=True, finished=False, next_page=next_page, next_questions=next_questions, total_correct=total_correct, ) } elif action == 'timeout': answers = data.get('answers', {}) player.task_timed_out = True stored = player.participant.vars.get(answers_key, {}) stored.update(answers) player.participant.vars[answers_key] = stored total_correct = sum( 1 for q in questions if str(stored.get(str(q['num']), "")).strip() == q['answer'] ) setattr(player, score_key, total_correct) player.all_answers_json = json.dumps(stored) return { player.id_in_group: dict( type='timeout_ack', total_correct=total_correct, ) } elif action == 'save_progress': answers = data.get('answers', {}) current_page = data.get('current_page', 0) stored = player.participant.vars.get(answers_key, {}) stored.update(answers) player.participant.vars[answers_key] = stored player.participant.vars[page_key] = current_page total_correct = sum( 1 for q in questions if str(stored.get(str(q['num']), "")).strip() == q['answer'] ) setattr(player, score_key, total_correct) return {player.id_in_group: dict(type='progress_saved')} def math_task_before_next(player, period, answers_key, score_key, timeout_happened): """Shared before_next_page logic.""" if timeout_happened: player.task_timed_out = True questions = load_math_questions(period) stored = player.participant.vars.get(answers_key, {}) total_correct = sum( 1 for q in questions if str(stored.get(str(q['num']), "")).strip() == q['answer'] ) setattr(player, score_key, total_correct) player.all_answers_json = json.dumps(stored) def word_task_live(player, data, task_key, score_field, words_field): """Shared live_method logic for all spelling bee pages.""" bank = load_word_bank(task_key) current_score = getattr(player, score_field) current_words = getattr(player, words_field) if data.get('type') == 'init': return { player.id_in_group: { "accepted_words": current_words, "score": current_score, } } if data.get('type') == 'submit': raw_word = data["word"] word = normalize_word(raw_word) if word in bank: accepted_list = current_words.split(", ") if current_words else [] if word in accepted_list: return {player.id_in_group: { "ok": "false", "accepted_words": current_words, "score": current_score, }} else: w_score = score_word(word) if current_words == "": setattr(player, words_field, word) else: setattr(player, words_field, f"{current_words}, {word}") new_score = current_score + w_score setattr(player, score_field, new_score) player.w_score = w_score return {player.id_in_group: { "ok": "true", "accepted_words": getattr(player, words_field), "score": new_score, "w_score": w_score, }} else: return {player.id_in_group: { "ok": "false", "accepted_words": current_words, "score": current_score, }} def word_task_vars(player, task_key, score_field, words_field): """Shared vars_for_template logic.""" config = C.WORD_TASKS[task_key] return dict( accepts=getattr(player, words_field), pts=getattr(player, score_field), board_image=config['board_image'], ) # PAGES class B_Instructions(Page): form_model = 'player' form_fields = ['B_Instruction_cq'] @staticmethod def before_next_page(player: Player, timeout_happened): player.g_id = player.participant.vars['g_id'] player.treatment = player.participant.vars['treatment'] player.pennies_earned = player.participant.vars['pennies_earned'] class B(Page): timeout_seconds = C.TASK_TIMEOUT @staticmethod def vars_for_template(player): return word_task_vars(player, 'easy_1', 'part2_score', 'part2_accepted_words') @staticmethod def live_method(player, data): return word_task_live(player, data, 'easy_1', 'part2_score', 'part2_accepted_words') @staticmethod def before_next_page(player, timeout_happened): player.w_score = 0 player.pennies_earned += player.part2_score * 20 class RankingWaitPage(WaitPage): @staticmethod def after_all_players_arrive(group: Group): players = group.get_players() # Split into subgroups based on g_id subgroup_a = [p for p in players if p.g_id in [1, 2, 3]] subgroup_b = [p for p in players if p.g_id in [4, 5, 6]] # Rank within each subgroup by part2_score (descending) for subgroup, label in [(subgroup_a, "A"), (subgroup_b, "B")]: sorted_players = sorted(subgroup, key=lambda p: p.part2_score, reverse=True) for rank, p in enumerate(sorted_players, start=1): p.subgroup_rank = rank p.subgroup_label = label treatment = players[0].treatment if treatment == 'performance': # Best part2_score in each subgroup promo_a = min(subgroup_a, key=lambda p: p.subgroup_rank) promo_b = min(subgroup_b, key=lambda p: p.subgroup_rank) group.promoted_subA_gid = promo_a.g_id group.promoted_subB_gid = promo_b.g_id group.promoted_subA_reason = "Strong Performance on the Part 2 Task" group.promoted_subB_reason = "Strong Performance on the Part 2 Task" elif treatment == 'potential': # g_id 1 and 4 are promoted (top performers on second Part 1 task: math) group.promoted_subA_gid = 1 group.promoted_subB_gid = 4 group.promoted_subA_reason = "Assessed Potential for the upcoming Part 3 Task" group.promoted_subB_reason = "Assessed Potential for the upcoming Part 3 Task" elif treatment == 'mixed': # Subgroup A: promote by performance; Subgroup B: promote by potential promo_a = min(subgroup_a, key=lambda p: p.subgroup_rank) group.promoted_subA_gid = promo_a.g_id group.promoted_subB_gid = 4 group.promoted_subA_reason = "Strong Performance on the Part 2 Task" group.promoted_subB_reason = "Assessed Potential for the upcoming Part 3 Task" if random.choice([True, False]): group.promoted_subA_gid = 1 promo_b = min(subgroup_b, key=lambda p: p.subgroup_rank) group.promoted_subB_gid = promo_b.g_id group.promoted_subA_reason = "Assessed Potential for the upcoming Part 3 Task" group.promoted_subB_reason = "Strong Performance on the Part 2 Task" # Mark promoted players for p in players: if p.g_id == group.promoted_subA_gid or p.g_id == group.promoted_subB_gid: p.is_promoted = True class Results(Page): @staticmethod def vars_for_template(player: Player): # Gather subgroup members' scores for display group_players = player.group.get_players() subgroup_members = [ p for p in group_players if p.subgroup_label == player.subgroup_label ] subgroup_sorted = sorted(subgroup_members, key=lambda p: p.subgroup_rank) subgroup_data = [ dict( g_id=p.g_id, score=p.part2_score, rank=p.subgroup_rank, is_you=(p == player), ) for p in subgroup_sorted ] return dict( g_id=player.g_id, subgroup_label=player.subgroup_label, my_rank=player.subgroup_rank, my_score=player.part2_score, subgroup_data=subgroup_data, ) class Promotion(Page): @staticmethod def vars_for_template(player: Player): group = player.group promotion_table = [ dict( promoted_gid=group.promoted_subA_gid, reason=group.promoted_subA_reason, is_you=(player.g_id == group.promoted_subA_gid), ), dict( promoted_gid=group.promoted_subB_gid, reason=group.promoted_subB_reason, is_you=(player.g_id == group.promoted_subB_gid), ), ] return dict( promotion_table=promotion_table, ) class p3_instruct(Page): form_model = 'player' form_fields = ['p3_pay_cq', 'p3_senior_cq'] @staticmethod def error_message(player, values): if player.is_promoted: if values['p3_senior_cq'] != C.p3_senior_cqa or values['p3_pay_cq'] != C.p3_pay_cqa_man: return 'One or more answers are incorrect. Answer correctly to continue.' elif not player.is_promoted: if values['p3_senior_cq'] != C.p3_senior_cqa or values['p3_pay_cq'] != C.p3_pay_cqa_emp: return 'One or more answers are incorrect. Answer correctly to continue.' class p3_info_sharing(Page): form_model = 'player' form_fields = ['share_cost1', 'share_cost2', 'share_cost3', 'share_cost4', 'share_cost5'] @staticmethod def error_message(player, values): if (values['share_cost1'] != C.share_cost1a or values['share_cost2'] != C.share_cost2a or values['share_cost3'] != C.share_cost3a or values['share_cost4'] != C.share_cost4a or values['share_cost5'] != C.share_cost5a): return 'One or more answers are incorrect. Answer correctly to continue.' class m2m_sharing(Page): form_model = 'player' form_fields = ['sharing_m2m_math', 'sharing_m2m_verbal', 'sharing_m2m_combined_order'] @staticmethod def is_displayed(player: Player): return player.is_promoted @staticmethod def vars_for_template(player: Player): group = player.group if player.g_id in [1, 2, 3]: math_advice_items = [ dict(id="m" + str(key), text=value) for key, value in C.MATH_ADVICE.items() if key <= 4 ] else: math_advice_items = [ dict(id="m" + str(key), text=value) for key, value in C.MATH_ADVICE.items() if key >= 5 ] verbal_advice_items = [ dict(id="v" + str(key), text=value) for key, value in C.VERBAL_ADVICE.items() ] promotion_table = [ dict( promoted_gid=group.promoted_subA_gid, reason=group.promoted_subA_reason, is_you=(player.g_id == group.promoted_subA_gid), ), dict( promoted_gid=group.promoted_subB_gid, reason=group.promoted_subB_reason, is_you=(player.g_id == group.promoted_subB_gid), ), ] return dict( math_advice_items=math_advice_items, verbal_advice_items=verbal_advice_items, promotion_table=promotion_table, ) @staticmethod def error_message(player: Player, values): math_ids = [x for x in values['sharing_m2m_math'].split(",") if x.strip()] verbal_ids = [x for x in values['sharing_m2m_verbal'].split(",") if x.strip()] total = len(math_ids) + len(verbal_ids) if total > 4: return "You may share at most 4 pieces of advice with the other manager." @staticmethod def before_next_page(player: Player, timeout_happened): math_ids = [x for x in player.sharing_m2m_math.split(",") if x.strip()] verbal_ids = [x for x in player.sharing_m2m_verbal.split(",") if x.strip()] combined = [x for x in player.sharing_m2m_combined_order.split(",") if x.strip()] player.sharing_m2m_math_count_manager_task = len(math_ids) player.sharing_m2m_verbal_count_emp_task = len(verbal_ids) total_shared = len(math_ids) + len(verbal_ids) player.sharing_m2m_count = total_shared if len(combined) > 0: player.sharing_cost += C.SHARING_COSTS[total_shared-1] player.pennies_earned -= player.sharing_cost class m2m_sharing_wait(WaitPage): title_text = "Wait Page" body_text = "Please wait while the other manager decides which advice to share." @staticmethod def is_displayed(player: Player): return player.is_promoted class m2m_receiving(Page): form_model = 'player' form_fields = ['m2m_advice_accepted_count'] @staticmethod def is_displayed(player: Player): return player.is_promoted @staticmethod def vars_for_template(player: Player): group = player.group # Find the OTHER promoted player in the group other_manager = None for p in group.get_players(): if p.is_promoted and p.id_in_group != player.id_in_group: other_manager = p break available_count = 0 if other_manager: combined = [x for x in other_manager.sharing_m2m_combined_order.split(",") if x.strip()] available_count = len(combined) player.m2m_advice_available_count = available_count return dict( available_count=available_count, ) @staticmethod def get_form_fields(player: Player): # Find the other manager group = player.group other_manager = None for p in group.get_players(): if p.is_promoted and p.id_in_group != player.id_in_group: other_manager = p break if other_manager: combined = [x.strip() for x in other_manager.sharing_m2m_combined_order.split(",") if x.strip()] if len(combined) > 0: return ['m2m_advice_accepted_count'] return [] # no form fields when nothing was shared @staticmethod def error_message(player: Player, values): if 'm2m_advice_accepted_count' not in values: return # no validation needed count = values['m2m_advice_accepted_count'] if count is None: return "Please enter a number." if count < 0: return "You cannot accept a negative number of items." if count > player.m2m_advice_available_count: return f"The other manager only shared {player.m2m_advice_available_count} item(s)." @staticmethod def before_next_page(player: Player, timeout_happened): group = player.group accepted = player.field_maybe_none('m2m_advice_accepted_count') if accepted is None: accepted = 0 # Calculate receiving cost cost = 0 for i in range(accepted): cost += 5 player.accept_cost = cost player.pennies_earned -= cost # Retrieve the actual advice from the other manager other_manager = None for p in group.get_players(): if p.is_promoted and p.id_in_group != player.id_in_group: other_manager = p break if other_manager and accepted > 0: combined = [x for x in other_manager.sharing_m2m_combined_order.split(",") if x.strip()] revealed = combined[:accepted] # rank order preserved! player.m2m_received_advice = ",".join(revealed) class m2m_advice(Page): @staticmethod def is_displayed(player: Player): return (player.is_promoted and player.field_maybe_none('m2m_advice_accepted_count') is not None and player.m2m_advice_accepted_count > 0) @staticmethod def vars_for_template(player: Player): revealed_ids = [x.strip() for x in player.m2m_received_advice.split(",") if x.strip()] # Build lookup with prefixed keys to match what was stored all_advice = {} for key, value in C.MATH_ADVICE.items(): all_advice["m" + str(key)] = value for key, value in C.VERBAL_ADVICE.items(): all_advice["v" + str(key)] = value revealed_items = [] for idx, aid in enumerate(revealed_ids): category = "Math" if aid.startswith("m") else "Verbal" revealed_items.append(dict( rank=idx + 1, id=aid, category=category, category_css=category.lower(), text=all_advice.get(aid, "Unknown advice"), )) return dict(revealed_items=revealed_items) class m2e_sharing(Page): form_model = 'player' form_fields = ['sharing_m2e_verbal'] @staticmethod def is_displayed(player: Player): return player.is_promoted @staticmethod def vars_for_template(player: Player): verbal_advice_items = [ dict(id="v" + str(key), text=value) for key, value in C.VERBAL_ADVICE.items() ] return dict( verbal_advice_items=verbal_advice_items, ) @staticmethod def error_message(player: Player, values): verbal_ids = [x for x in values['sharing_m2e_verbal'].split(",") if x.strip()] if len(verbal_ids) > 4: return "You may share at most 4 pieces of advice with your employees." @staticmethod def before_next_page(player: Player, timeout_happened): verbal_ids = [x for x in player.sharing_m2e_verbal.split(",") if x.strip()] player.sharing_m2e_count = len(verbal_ids) count = len(verbal_ids) if count > 0: player.sharing_cost += C.SHARING_COSTS[count - 1] player.pennies_earned -= player.sharing_cost class m2e_sharing_wait(WaitPage): title_text = "Wait Page" body_text = "Please wait while managers decide what advice to share." class m2e_receiving(Page): form_model = 'player' form_fields = ['m2e_advice_accepted_count'] @staticmethod def is_displayed(player: Player): return not player.is_promoted @staticmethod def vars_for_template(player: Player): group = player.group my_manager = None if player.g_id < 4: for p in group.get_players(): if p.is_promoted and p.g_id < 4: my_manager = p break elif player.g_id > 3: for p in group.get_players(): if p.is_promoted and p.g_id > 3: my_manager = p break available_count = 0 if my_manager: combined = [x for x in my_manager.sharing_m2e_verbal.split(",") if x.strip()] available_count = len(combined) player.m2e_advice_available_count = available_count return dict( available_count=available_count, ) @staticmethod def get_form_fields(player: Player): group = player.group my_manager = None if player.g_id < 4: for p in group.get_players(): if p.is_promoted and p.g_id < 4: my_manager = p break elif player.g_id > 3: for p in group.get_players(): if p.is_promoted and p.g_id > 3: my_manager = p break if my_manager: combined = [x.strip() for x in my_manager.sharing_m2e_verbal.split(",") if x.strip()] if len(combined) > 0: return ['m2e_advice_accepted_count'] return [] # no form fields when nothing was shared @staticmethod def error_message(player: Player, values): if 'm2e_advice_accepted_count' not in values: return # no validation needed count = values['m2e_advice_accepted_count'] if count is None: return "Please enter a number." if count < 0: return "You cannot accept a negative number of items." if count > player.m2e_advice_available_count: return f"Your manager only shared {player.m2e_advice_available_count} item(s)." @staticmethod def before_next_page(player: Player, timeout_happened): group = player.group accepted = player.field_maybe_none('m2e_advice_accepted_count') if accepted is None: accepted = 0 # Calculate receiving cost cost = 0 for i in range(accepted): cost += 5 player.accept_cost = cost player.pennies_earned -= cost # Retrieve the actual advice from the other manager my_manager = None if player.g_id < 4: for p in group.get_players(): if p.is_promoted and p.g_id < 4: my_manager = p break elif player.g_id > 3: for p in group.get_players(): if p.is_promoted and p.g_id > 3: my_manager = p break if my_manager and accepted > 0: combined = [x for x in my_manager.sharing_m2e_verbal.split(",") if x.strip()] revealed = combined[:accepted] # rank order preserved! player.m2e_received_advice = ",".join(revealed) class m2e_advice(Page): @staticmethod def is_displayed(player: Player): return (not player.is_promoted and player.field_maybe_none('m2e_advice_accepted_count') is not None and player.m2e_advice_accepted_count > 0) @staticmethod def vars_for_template(player: Player): revealed_ids = [x.strip() for x in player.m2e_received_advice.split(",") if x.strip()] # Build lookup with prefixed keys to match what was stored all_advice = {} for key, value in C.VERBAL_ADVICE.items(): all_advice["v" + str(key)] = value revealed_items = [] for idx, aid in enumerate(revealed_ids): revealed_items.append(dict( rank=idx + 1, id=aid, text=all_advice.get(aid, "Unknown advice"), )) return dict(revealed_items=revealed_items) class p3_task_ready(Page): @staticmethod def before_next_page(player, timeout_happened): player.participant.vars['math_start_time_p2'] = time.time() player.participant.vars['math_current_page_p2'] = 0 player.participant.vars['math_answers_p2'] = {} class M(Page): timeout_seconds = C.TASK_TIMEOUT timer_text = 'Remaining time to work on the task:' @staticmethod def is_displayed(player: Player): return player.is_promoted @staticmethod def get_timeout_seconds(player): return math_task_get_timeout(player, 'math_start_time_p2') @staticmethod def vars_for_template(player): return math_task_vars_for_template(player, period=1) @staticmethod def live_method(player, data): return math_task_live( player, data, period=1, start_key='math_start_time_p2', page_key='math_current_page_p2', answers_key='math_answers_p2', score_key='math_p3_score1', ) @staticmethod def before_next_page(player, timeout_happened): player.pennies_earned += player.math_p3_score1 * 15 math_task_before_next( player, period=1, answers_key='math_answers_p2', score_key='math_p3_score1', timeout_happened=timeout_happened, ) class B2(Page): timeout_seconds = C.TASK_TIMEOUT @staticmethod def is_displayed(player): return not player.is_promoted @staticmethod def vars_for_template(player): return word_task_vars(player, 'easy_2', 'p3_bee1_score', 'p3_bee1_accepted_words') @staticmethod def live_method(player, data): return word_task_live(player, data, 'easy_2', 'p3_bee1_score', 'p3_bee1_accepted_words') @staticmethod def before_next_page(player, timeout_happened): player.w_score = 0 player.pennies_earned += player.p3_bee1_score * 20 class senior_wait(WaitPage): body_text = "Please wait while the promotion to senior manager is decided." @staticmethod def after_all_players_arrive(group: Group): players = group.get_players() # Split into subgroups subgroup_a = [p for p in players if p.g_id in [1, 2, 3]] manager_a_bonus = 0 subgroup_b = [p for p in players if p.g_id in [4, 5, 6]] manager_b_bonus = 0 # Calculate subgroup A total earnings subgroup_a_earnings = 0 for p in subgroup_a: if p.is_promoted: subgroup_a_earnings += p.math_p3_score1 * 15 else: subgroup_a_earnings += p.p3_bee1_score * 20 manager_a_bonus += p.p3_bee1_score * 4 # Calculate subgroup B total earnings subgroup_b_earnings = 0 for p in subgroup_b: if p.is_promoted: subgroup_b_earnings += p.math_p3_score1 * 15 else: subgroup_b_earnings += p.p3_bee1_score * 20 manager_b_bonus += p.p3_bee1_score * 4 BU_earnings = subgroup_a_earnings + subgroup_b_earnings manager_a_bonus += round(BU_earnings * 0.02) manager_b_bonus += round(BU_earnings * 0.02) # Find each subgroup's manager manager_a = None manager_b = None for p in subgroup_a: if p.is_promoted: manager_a = p manager_a.pennies_earned += manager_a_bonus for p in subgroup_b: if p.is_promoted: manager_b = p manager_b.pennies_earned += manager_b_bonus # Determine which manager becomes senior # In case of a tie, you can pick either — here we default to subgroup A if subgroup_a_earnings >= subgroup_b_earnings: if manager_a: manager_a.senior = True else: if manager_b: manager_b.senior = True class p3_feedback(Page): @staticmethod def vars_for_template(player: Player): if player.is_promoted: result = f'{player.math_p3_score1} problems solved' if player.senior: message = ('Based on your performance in the past period, accounting for your group’s performance, you' ' have been promoted to Senior Manager. You will earn $0.40 per math problem solved in this' ' final 3-minute period. You will also earn 3% of the total pay of all the participants in' ' Business Unit A.') else: message = ("You were not promoted to Senior Manager. You will remain as a manager with the same task" " and pay structure in this final 3-minute period.") else: result = f'{player.p3_bee1_score} points' message = ('You will remain as an employee with the same task and pay structure in this final 3-minute' ' period.') return dict( total_correct=result, message=message, ) @staticmethod def before_next_page(player, timeout_happened): player.participant.vars['math_start_time_p3b'] = time.time() player.participant.vars['math_current_page_p3b'] = 0 player.participant.vars['math_answers_p3b'] = {} class M2(Page): timeout_seconds = C.TASK_TIMEOUT timer_text = 'Remaining time to work on the task:' @staticmethod def is_displayed(player): return player.is_promoted and not player.senior @staticmethod def get_timeout_seconds(player): return math_task_get_timeout(player, 'math_start_time_p3b') @staticmethod def vars_for_template(player): return math_task_vars_for_template(player, period=2) @staticmethod def live_method(player, data): return math_task_live( player, data, period=2, start_key='math_start_time_p3b', page_key='math_current_page_p3b', answers_key='math_answers_p3b', score_key='math_p3_score2', ) @staticmethod def before_next_page(player, timeout_happened): player.pennies_earned += player.math_p3_score2 * 15 math_task_before_next( player, period=2, answers_key='math_answers_p3b', score_key='math_p3_score2', timeout_happened=timeout_happened, ) class M2_senior(Page): timeout_seconds = C.TASK_TIMEOUT timer_text = 'Remaining time to work on the task:' @staticmethod def is_displayed(player): return player.senior @staticmethod def get_timeout_seconds(player): return math_task_get_timeout(player, 'math_start_time_p3b') @staticmethod def vars_for_template(player): return math_task_vars_for_template(player, period=3) @staticmethod def live_method(player, data): return math_task_live( player, data, period=3, start_key='math_start_time_p3b', page_key='math_current_page_p3b', answers_key='math_answers_p3b', score_key='math_p3_score2', ) @staticmethod def before_next_page(player, timeout_happened): player.pennies_earned += player.math_p3_score2 * 40 math_task_before_next( player, period=3, answers_key='math_answers_p3b', score_key='math_p3_score2', timeout_happened=timeout_happened, ) class B3(Page): timeout_seconds = C.TASK_TIMEOUT @staticmethod def is_displayed(player: Player): return not player.is_promoted @staticmethod def vars_for_template(player): return word_task_vars(player, 'easy_3', 'p3_bee2_score', 'p3_bee2_accepted_words') @staticmethod def live_method(player, data): return word_task_live(player, data, 'easy_3', 'p3_bee2_score', 'p3_bee2_accepted_words') @staticmethod def before_next_page(player, timeout_happened): player.w_score = 0 player.pennies_earned += player.p3_bee2_score * 20 class calc_pay(WaitPage): body_text = "Please wait while your pay is calculated." @staticmethod def after_all_players_arrive(group: Group): players = group.get_players() # Split into subgroups subgroup_a = [p for p in players if p.g_id in [1, 2, 3]] manager_a_bonus = 0 subgroup_b = [p for p in players if p.g_id in [4, 5, 6]] manager_b_bonus = 0 # Calculate subgroup A total earnings subgroup_a_earnings = 0 for p in subgroup_a: if p.is_promoted and not p.senior: subgroup_a_earnings += p.math_p3_score2 * 15 elif not p.is_promoted: subgroup_a_earnings += p.p3_bee2_score * 20 manager_a_bonus += p.p3_bee2_score * 4 else: subgroup_a_earnings += p.math_p3_score2 * 40 # Calculate subgroup B total earnings subgroup_b_earnings = 0 for p in subgroup_b: if p.is_promoted and not p.senior: subgroup_b_earnings += p.math_p3_score2 * 15 elif not p.is_promoted: subgroup_b_earnings += p.p3_bee2_score * 20 manager_b_bonus += p.p3_bee2_score * 4 else: subgroup_b_earnings += p.math_p3_score2 * 40 BU_earnings = subgroup_a_earnings + subgroup_b_earnings manager_a_bonus += round(BU_earnings * 0.02) manager_b_bonus += round(BU_earnings * 0.02) senior_bonus = round(BU_earnings * 0.03) # Find each subgroup's manager manager_a = None manager_b = None for p in subgroup_a: if p.is_promoted: manager_a = p if p.senior: manager_a.pennies_earned += senior_bonus else: manager_a.pennies_earned += manager_a_bonus for p in subgroup_b: if p.is_promoted: manager_b = p if p.senior: manager_b.pennies_earned += senior_bonus else: manager_b.pennies_earned += manager_b_bonus class p3_feedback2(Page): @staticmethod def vars_for_template(player: Player): if player.is_promoted: result = f'{player.math_p3_score2} problems solved' else: result = f'{player.p3_bee2_score} points' return dict( total_correct=result, ) @staticmethod def before_next_page(player, timeout_happened): player.participant.pennies_earned = player.pennies_earned player.participant.is_promoted = player.is_promoted @staticmethod def app_after_this_page(player, upcoming_apps): return upcoming_apps[1] page_sequence = [B_Instructions, B, RankingWaitPage, Results, Promotion, p3_instruct, p3_info_sharing, m2m_sharing, m2m_sharing_wait, m2m_receiving, m2m_advice, m2e_sharing, m2e_sharing_wait, m2e_receiving, m2e_advice, p3_task_ready, M, B2, senior_wait, p3_feedback, M2, M2_senior, B3, calc_pay, p3_feedback2]