from otree.api import *

import itertools
import json
import os
import random
from datetime import datetime

from sqlalchemy import create_engine, text

doc = "Online market app"


# =====================================================================================
# CONSTANTS
# =====================================================================================
class C(BaseConstants):
    """Static configuration: shopping lists, markets and per-treatment schedules.

    Round 1 is the comprehension round (LIST0 / MARKET0). Rounds 2..NUM_ROUNDS
    are the eight decision rounds taken from the treatment's schedule.
    """

    NAME_IN_URL = 'onlinemarket'
    PLAYERS_PER_GROUP = None
    NUM_ROUNDS = 9

    # Numbers participants are asked to recall on the Memory2 page.
    MEMORY_NUMBERS = {17, 42, 89, 63, 25, 78, 34, 57, 91, 46}

    # Treatments
    TREAT_SIB = 'SIB'
    TREAT_ISB = 'ISB'

    # Item codes, in the order totals are tallied.
    ITEMS = ('A', 'B', 'C', 'D', 'E')

    # Rounds shown by PageA (no basket promotion) and PageAprime (promotion).
    BASELINE_ROUNDS = (2, 4, 6, 8)
    PROMO_ROUNDS = (3, 5, 7, 9)

    # Cheapest basket that satisfies LIST0 in MARKET0; used by the
    # comprehension check (3 x A_P3 + BETA + 4 x E_SINGLE = 180 + 60 + 40).
    COMPREHENSION_MIN_COST = 280

    # Deals available per item. Each (item, deal) pair maps to one Player
    # quantity field named ``q{item}_{deal}``. Combos are filed under the
    # first item they contain (ALPHA/BETA under B, GAMMA/DELTA under D).
    ITEM_DEALS = {
        'A': ['single', 'P3', 'P4', 'P5'],
        'B': ['single', 'ALPHA', 'BETA'],
        'C': ['single'],
        'D': ['single', 'P3', 'P5', 'GAMMA', 'DELTA'],
        'E': ['single', 'P4'],
    }

    LIST0 = dict(
        name="List 0 (Comprehension)",
        shopping=dict(A=9, B=1, C=1, D=0, E=4),
    )
    MARKET0 = dict(
        name="Market 0 (Comprehension)",
        endowment=330,
        percent=50,
        market=[
            dict(sku="A_SINGLE", label="Single A", gives=dict(A=1), price=25),
            dict(sku="A_P3", label="Pack of 3 A", gives=dict(A=3), price=60),
            dict(sku="B_SINGLE", label="Single B", gives=dict(B=1), price=20),
            dict(sku="BETA_1B1C", label="Combo Beta (1B + 1C)", gives=dict(B=1, C=1), price=60),
            dict(sku="C_SINGLE", label="Single C", gives=dict(C=1), price=50),
            dict(sku="D_SINGLE", label="Single D", gives=dict(D=1), price=15),
            # NOTE(review): label says "2D + 2E" but sku and gives say 1D + 1E.
            # Confirm which one is intended before the next session.
            dict(sku="DELTA_1D1E", label="Combo Delta (2D + 2E)", gives=dict(D=1, E=1), price=40),
            dict(sku="E_SINGLE", label="Single E", gives=dict(E=1), price=10),
            dict(sku="E_P4", label="Pack of 4 E", gives=dict(E=4), price=45),
        ],
    )

    LIST1 = dict(
        name="List 1",
        shopping=dict(A=14, B=4, C=8, D=9, E=6),
    )
    MARKET1 = dict(
        name="Market 1",
        endowment=849,
        promo_threshold=850,
        promo_threshold_save=40,
        percent=100,
        percent_promo=25,
        market=[
            dict(sku="A_SINGLE", label="Single A", gives=dict(A=1), price=27),
            dict(sku="A_P3", label="Pack of 3 A", gives=dict(A=3), price=70),
            dict(sku="A_P4", label="Pack of 4 A", gives=dict(A=4), price=102),
            dict(sku="A_P5", label="Pack of 5 A", gives=dict(A=5), price=121),
            dict(sku="B_SINGLE", label="Single B", gives=dict(B=1), price=30),
            dict(sku="C_SINGLE", label="Single C", gives=dict(C=1), price=20),
            dict(sku="ALPHA_1B1C", label="Combo Alpha (1B + 1C)", gives=dict(B=1, C=1), price=45),
            dict(sku="BETA_1B2C", label="Combo Beta (1B + 2C)", gives=dict(B=1, C=2), price=74),
            dict(sku="D_SINGLE", label="Single D", gives=dict(D=1), price=24),
            dict(sku="D_P3", label="Pack of 3 D", gives=dict(D=3), price=68),
            dict(sku="D_P5", label="Pack of 5 D", gives=dict(D=5), price=76),
            dict(sku="E_SINGLE", label="Single E", gives=dict(E=1), price=24),
            dict(sku="E_P4", label="Pack of 4 E", gives=dict(E=4), price=92),
            dict(sku="GAMMA_1D3E", label="Combo Gamma (1D + 3E)", gives=dict(D=1, E=3), price=62),
            dict(sku="DELTA_2D2E", label="Combo Delta (2D + 2E)", gives=dict(D=2, E=2), price=84),
        ],
    )

    LIST2 = dict(
        name="List 2",
        shopping=dict(A=15, B=4, C=6, D=13, E=9),
    )
    MARKET2 = dict(
        name="Market 2",
        endowment=999,
        promo_threshold=1000,
        promo_threshold_save=40,
        percent=100,
        percent_promo=25,
        market=[
            dict(sku="A_SINGLE", label="Single A", gives=dict(A=1), price=27),
            dict(sku="A_P3", label="Pack of 3 A", gives=dict(A=3), price=55),
            dict(sku="A_P4", label="Pack of 4 A", gives=dict(A=4), price=68),
            dict(sku="A_P5", label="Pack of 5 A", gives=dict(A=5), price=87),
            dict(sku="B_SINGLE", label="Single B", gives=dict(B=1), price=47),
            dict(sku="C_SINGLE", label="Single C", gives=dict(C=1), price=32),
            dict(sku="ALPHA_1B1C", label="Combo Alpha (1B + 1C)", gives=dict(B=1, C=1), price=63),
            dict(sku="BETA_1B2C", label="Combo Beta (1B + 2C)", gives=dict(B=1, C=2), price=94),
            dict(sku="D_SINGLE", label="Single D", gives=dict(D=1), price=27),
            dict(sku="D_P3", label="Pack of 3 D", gives=dict(D=3), price=68),
            dict(sku="D_P5", label="Pack of 5 D", gives=dict(D=5), price=99),
            dict(sku="E_SINGLE", label="Single E", gives=dict(E=1), price=20),
            dict(sku="E_P4", label="Pack of 4 E", gives=dict(E=4), price=68),
            dict(sku="GAMMA_1D3E", label="Combo Gamma (1D + 3E)", gives=dict(D=1, E=3), price=75),
            dict(sku="DELTA_2D2E", label="Combo Delta (2D + 2E)", gives=dict(D=2, E=2), price=90),
        ],
    )

    LIST3 = dict(
        name="List 3",
        shopping=dict(A=8, B=6, C=6, D=6, E=8),
    )
    MARKET3 = dict(
        name="Market 3",
        endowment=799,
        promo_threshold=800,
        promo_threshold_save=40,
        percent=100,
        percent_promo=25,
        market=[
            dict(sku="A_SINGLE", label="Single A", gives=dict(A=1), price=25),
            dict(sku="A_P3", label="Pack of 3 A", gives=dict(A=3), price=70),
            dict(sku="A_P4", label="Pack of 4 A", gives=dict(A=4), price=86),
            dict(sku="A_P5", label="Pack of 5 A", gives=dict(A=5), price=109),
            dict(sku="B_SINGLE", label="Single B", gives=dict(B=1), price=24),
            dict(sku="C_SINGLE", label="Single C", gives=dict(C=1), price=35),
            dict(sku="ALPHA_1B1C", label="Combo Alpha (1B + 1C)", gives=dict(B=1, C=1), price=55),
            # NOTE(review): sku suffix says 1B1C but label and gives say 1B + 2C.
            # Only the sku prefix is parsed, so behavior follows `gives`.
            dict(sku="BETA_1B1C", label="Combo Beta (1B + 2C)", gives=dict(B=1, C=2), price=75),
            dict(sku="D_SINGLE", label="Single D", gives=dict(D=1), price=20),
            dict(sku="D_P3", label="Pack of 3 D", gives=dict(D=3), price=67),
            dict(sku="D_P5", label="Pack of 5 D", gives=dict(D=5), price=83),
            dict(sku="E_SINGLE", label="Single E", gives=dict(E=1), price=30),
            dict(sku="E_P4", label="Pack of 4 E", gives=dict(E=4), price=110),
            # NOTE(review): sku suffix says 1D2E but label and gives say 1D + 3E.
            dict(sku="GAMMA_1D2E", label="Combo Gamma (1D + 3E)", gives=dict(D=1, E=3), price=99),
            dict(sku="DELTA_2D2E", label="Combo Delta (2D + 2E)", gives=dict(D=2, E=2), price=97),
        ],
    )

    LIST4 = dict(
        name="List 4",
        shopping=dict(A=12, B=3, C=8, D=3, E=9),
    )
    MARKET4 = dict(
        name="Market 4",
        endowment=699,
        promo_threshold=700,
        promo_threshold_save=40,
        percent=100,
        percent_promo=25,
        market=[
            dict(sku="A_SINGLE", label="Single A", gives=dict(A=1), price=25),
            dict(sku="A_P3", label="Pack of 3 A", gives=dict(A=3), price=59),
            dict(sku="A_P4", label="Pack of 4 A", gives=dict(A=4), price=79),
            dict(sku="A_P5", label="Pack of 5 A", gives=dict(A=5), price=96),
            dict(sku="B_SINGLE", label="Single B", gives=dict(B=1), price=29),
            dict(sku="C_SINGLE", label="Single C", gives=dict(C=1), price=20),
            dict(sku="ALPHA_1B1C", label="Combo Alpha (1B + 1C)", gives=dict(B=1, C=1), price=47),
            dict(sku="BETA_1B2C", label="Combo Beta (1B + 2C)", gives=dict(B=1, C=2), price=57),
            dict(sku="D_SINGLE", label="Single D", gives=dict(D=1), price=23),
            dict(sku="D_P3", label="Pack of 3 D", gives=dict(D=3), price=68),
            dict(sku="D_P5", label="Pack of 5 D", gives=dict(D=5), price=82),
            dict(sku="E_SINGLE", label="Single E", gives=dict(E=1), price=28),
            dict(sku="E_P4", label="Pack of 4 E", gives=dict(E=4), price=76),
            # NOTE(review): sku suffixes (2D1E / 1D1E) disagree with label and
            # gives (1D + 3E / 2D + 2E). Behavior follows `gives`.
            dict(sku="GAMMA_2D1E", label="Combo Gamma (1D + 3E)", gives=dict(D=1, E=3), price=84),
            dict(sku="DELTA_1D1E", label="Combo Delta (2D + 2E)", gives=dict(D=2, E=2), price=84),
        ],
    )

    # One (list, market) pair per decision round; index 0 is round 2.
    DECISION_SCHEDULE_SIB = [
        (LIST4, MARKET4),
        (LIST4, MARKET4),
        (LIST3, MARKET3),
        (LIST3, MARKET3),
        (LIST1, MARKET1),
        (LIST2, MARKET2),
        (LIST2, MARKET2),
        (LIST1, MARKET1),
    ]
    DECISION_SCHEDULE_ISB = [
        (LIST3, MARKET3),
        (LIST4, MARKET4),
        (LIST4, MARKET4),
        (LIST3, MARKET3),
        (LIST2, MARKET2),
        (LIST2, MARKET2),
        (LIST1, MARKET1),
        (LIST1, MARKET1),
    ]


# =====================================================================================
# TREATMENT (ROBUST) HELPERS
# =====================================================================================
def _assign_treatments(subsession):
    """Assign (round 1) or propagate (later rounds) each player's treatment.

    In round 1 players alternate SIB / ISB in ``get_players()`` order and the
    choice is stored in ``participant.vars['treatment']``. In later rounds the
    stored value is copied onto the round's Player row.
    """
    if subsession.round_number == 1:
        treatments = itertools.cycle([C.TREAT_SIB, C.TREAT_ISB])
        for player in subsession.get_players():
            tr = next(treatments)
            # participant.vars is the cross-round store read by
            # get_schedule_for_player().
            player.participant.vars['treatment'] = tr
            player.treatment = tr
    else:
        for player in subsession.get_players():
            player.treatment = player.participant.vars['treatment']


def creating_session(subsession):
    """Module-level session-creation hook: assign treatments."""
    _assign_treatments(subsession)


# =====================================================================================
# MARKET HELPERS
# =====================================================================================
def get_schedule_for_player(player):
    """Return the decision schedule for the player's treatment.

    Raises:
        ValueError: if no known treatment is stored for the participant.
            Previously this returned ``None`` and failed later with an
            unhelpful ``TypeError`` when the schedule was indexed.
    """
    tr = player.participant.vars.get('treatment')
    if tr == C.TREAT_SIB:
        return C.DECISION_SCHEDULE_SIB
    if tr == C.TREAT_ISB:
        return C.DECISION_SCHEDULE_ISB
    raise ValueError(
        f"Unknown or missing treatment {tr!r}; expected "
        f"{C.TREAT_SIB!r} or {C.TREAT_ISB!r}."
    )


def decision_list_market_for_round(player):
    """Return ``(list_dict, market_dict, decision_index)`` for the current round.

    Round 1 is the comprehension round (index 0); round ``r >= 2`` maps to
    schedule entry ``r - 2`` and decision index ``r - 1``.
    """
    if player.round_number == 1:
        return C.LIST0, C.MARKET0, 0
    idx = player.round_number - 2
    list_dict, market_dict = get_schedule_for_player(player)[idx]
    return list_dict, market_dict, idx + 1


def parse_deal_from_sku(sku):
    """Map a SKU to its deal code ('single', 'P3', 'ALPHA', ...) or ``None``."""
    if sku.endswith('SINGLE'):
        return 'single'
    for combo in ('ALPHA', 'BETA', 'GAMMA', 'DELTA'):
        if sku.startswith(combo):
            return combo
    for pack in ('P3', 'P4', 'P5'):
        if sku.endswith(pack):
            return pack
    return None


def item_from_sku(sku):
    """Return the item a SKU is filed under (combos use their first item)."""
    if sku.startswith(('ALPHA', 'BETA')):
        return 'B'
    if sku.startswith(('GAMMA', 'DELTA')):
        return 'D'
    return sku[0]


def build_prices_gives_labels(market_dict):
    """Return three dicts keyed by quantity field name: prices, gives, labels.

    SKUs whose deal cannot be parsed are skipped.
    """
    prices, gives, labels = {}, {}, {}
    for offer in market_dict['market']:
        sku = offer['sku']
        deal = parse_deal_from_sku(sku)
        if not deal:
            continue
        field = f'q{item_from_sku(sku)}_{deal}'
        prices[field] = offer['price']
        gives[field] = offer['gives']
        labels[field] = offer.get('label') or field
    return prices, gives, labels


def build_deal_to_sku(market_dict):
    """Return ``{(item, deal): sku}`` for every parseable offer in the market."""
    mapping = {}
    for offer in market_dict['market']:
        sku = offer['sku']
        deal = parse_deal_from_sku(sku)
        if not deal:
            continue
        mapping[(item_from_sku(sku), deal)] = sku
    return mapping


def build_gives_by_sku(market_dict):
    """Return ``{sku: gives}`` for every offer in the market."""
    return {offer['sku']: offer['gives'] for offer in market_dict['market']}


def tally_totals_from_values(values, item_deals, deal_to_sku, gives_by_sku):
    """Sum the units of each item delivered by the submitted quantities.

    Args:
        values: mapping of quantity field name to quantity (missing/None = 0).
        item_deals: mapping of item to its list of deal codes.
        deal_to_sku: mapping of ``(item, deal)`` to SKU for the active market.
        gives_by_sku: mapping of SKU to the units it delivers per purchase.

    Returns:
        Dict of item code to total units obtained.
    """
    totals = {item: 0 for item in C.ITEMS}
    for item in C.ITEMS:
        for deal in item_deals[item]:
            sku = deal_to_sku.get((item, deal))
            if not sku:
                # Deal not offered in this market.
                continue
            qty = values.get(f'q{item}_{deal}') or 0
            for given_item, units in gives_by_sku[sku].items():
                totals[given_item] += qty * units
    return totals


def set_list_satisfied_from_values(player, values):
    """Store on the player whether ``values`` meets the shopping list.

    Sets ``player.list_satisfied`` to 1 or 0 and returns the same as a bool.
    """
    list_dict, market_dict, _ = decision_list_market_for_round(player)
    target = list_dict['shopping']
    totals = tally_totals_from_values(
        values=values,
        item_deals=C.ITEM_DEALS,
        deal_to_sku=build_deal_to_sku(market_dict),
        gives_by_sku=build_gives_by_sku(market_dict),
    )
    ok = all(totals[item] >= target[item] for item in C.ITEMS)
    player.list_satisfied = 1 if ok else 0
    return ok


# =====================================================================================
# SERVER-TRUTH COST/PAYMENT HELPERS
# =====================================================================================
def _compute_subtotal_from_player(player, market_dict):
    """Return the pre-promotion basket cost from the player's saved quantities."""
    prices, _, _ = build_prices_gives_labels(market_dict)
    subtotal = 0
    for field, price in prices.items():
        qty = getattr(player, field, 0) or 0
        subtotal += int(qty) * int(price)
    return int(subtotal)


def _compute_costs_and_earnings(player):
    """Recompute costs and earnings on the server and store them on the player.

    Writes ``subtotal_cost``, ``total_cost``, ``endowment_left`` and
    ``earnings_if_selected``. The basket promotion applies only in
    ``C.PROMO_ROUNDS`` and only when the subtotal reaches the market's
    threshold; in that case the saving is deducted and ``percent_promo``
    replaces ``percent``. Earnings are zero unless ``list_satisfied == 1``
    and are never negative.
    """
    _, market_dict, _ = decision_list_market_for_round(player)

    endowment = int(market_dict.get('endowment') or 0)
    subtotal = _compute_subtotal_from_player(player, market_dict)

    promo_threshold = market_dict.get('promo_threshold')
    promo_save = int(market_dict.get('promo_threshold_save') or 0)
    promo_applied = (
        promo_threshold is not None
        and player.round_number in C.PROMO_ROUNDS
        and promo_save > 0
        and subtotal >= int(promo_threshold)
    )

    total = subtotal - promo_save if promo_applied else subtotal
    endowment_left = endowment - total

    if promo_applied:
        pct = float(market_dict.get('percent_promo') or market_dict.get('percent') or 0)
    else:
        pct = float(market_dict.get('percent') or 0)

    if int(getattr(player, 'list_satisfied', 0) or 0) == 1:
        earnings = max(0.0, (float(endowment_left) * pct) / 100.0)
    else:
        earnings = 0.0

    player.subtotal_cost = int(subtotal)
    player.total_cost = int(total)
    player.endowment_left = int(endowment_left)
    player.earnings_if_selected = float(earnings)


# =====================================================================================
# NUMERACY INCENTIVE HELPER
# =====================================================================================
_NUMERACY_ANSWERS = {1: 55, 2: 20, 3: 31, 4: 10}


def _numeracy_correct(qnum, value):
    """Return True if ``value`` is the correct answer to numeracy question ``qnum``."""
    if value is None:
        return False
    try:
        answer = int(value)
    except (TypeError, ValueError, OverflowError):
        return False
    return qnum in _NUMERACY_ANSWERS and answer == _NUMERACY_ANSWERS[qnum]


# =====================================================================================
# DB HELPERS (CLICK LOG)
# =====================================================================================
# Engines cached per normalized database URL.
_ENGINE = {}

# (id(engine), ensure-function name) pairs whose DDL already ran in this process.
_ENSURED_TABLES = set()


def _parse_ts(s):
    """Parse an ISO-8601 timestamp (a trailing 'Z' is accepted); None on failure."""
    if not s:
        return None
    try:
        return datetime.fromisoformat(s.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        return None


def _get_clicklog_engine():
    """Return a cached SQLAlchemy engine for the click-log database, or None.

    The URL is read from ``CLICKLOG_DATABASE_URL``, falling back to
    ``DATABASE_URL``. It is deliberately never printed: it normally embeds the
    database password, and this function runs on every logged click.
    """
    url = os.environ.get("CLICKLOG_DATABASE_URL") or os.environ.get("DATABASE_URL")
    if not url:
        return None
    # SQLAlchemy no longer accepts the legacy "postgres://" scheme.
    if url.startswith("postgres://"):
        url = "postgresql://" + url[len("postgres://"):]
    engine = _ENGINE.get(url)
    if engine is None:
        engine = create_engine(url, pool_pre_ping=True)
        _ENGINE[url] = engine
    return engine


def _ensure_qty_click_log_table(engine):
    """Create ``public.qty_click_log`` if it does not exist."""
    ddl = """
    CREATE TABLE IF NOT EXISTS public.qty_click_log (
        id bigserial PRIMARY KEY,
        session_code text,
        participant_code text,
        round_number integer,
        player_id_in_subsession integer,
        page_name text,
        field_name text,
        action text,
        new_value integer,
        total_cost integer,
        clicked_at timestamptz DEFAULT now()
    );
    """
    with engine.begin() as conn:
        conn.execute(text(ddl))


def _ensure_calc_eq_log_table(engine):
    """Create ``public.calc_eq_log`` if it does not exist."""
    ddl = """
    CREATE TABLE IF NOT EXISTS public.calc_eq_log (
        id bigserial PRIMARY KEY,
        session_code text,
        participant_code text,
        round_number integer,
        player_id_in_subsession integer,
        page_name text,
        equation text,
        result_display text,
        total_cost integer,
        clicked_at timestamptz DEFAULT now()
    );
    """
    with engine.begin() as conn:
        conn.execute(text(ddl))


def _ensure_table_once(engine, ensure_fn):
    """Run ``ensure_fn(engine)`` at most once per engine in this process.

    Avoids issuing a CREATE TABLE statement before every single insert.
    """
    key = (id(engine), ensure_fn.__name__)
    if key not in _ENSURED_TABLES:
        ensure_fn(engine)
        _ENSURED_TABLES.add(key)


def _forget_table(engine, ensure_fn):
    """Make the next insert re-run the DDL (used after a failed insert)."""
    _ENSURED_TABLES.discard((id(engine), ensure_fn.__name__))


def _common_log_fields(player, data):
    """Return the columns shared by both click-log tables."""
    return dict(
        session_code=player.session.code if player.session else None,
        participant_code=player.participant.code if player.participant else None,
        round_number=player.round_number,
        player_id_in_subsession=player.id_in_subsession,
        page_name=data.get("page_name"),
        total_cost=int(data.get("total_cost") or 0),
        clicked_at=_parse_ts(data.get("clicked_at")),
    )


def log_qty_click(player, data):
    """Best-effort insert of one quantity-click event; never raises."""
    engine = _get_clicklog_engine()
    if engine is None:
        print("NO ENGINE -> not writing to Postgres (set CLICKLOG_DATABASE_URL or DATABASE_URL).")
        return
    try:
        ins = text("""
            INSERT INTO public.qty_click_log
            (session_code, participant_code, round_number, player_id_in_subsession,
             page_name, field_name, action, new_value, total_cost, clicked_at)
            VALUES
            (:session_code, :participant_code, :round_number, :player_id_in_subsession,
             :page_name, :field_name, :action, :new_value, :total_cost,
             COALESCE(:clicked_at, now()));
        """)
        payload = _common_log_fields(player, data)
        payload.update(
            field_name=data.get("field_name"),
            action=data.get("action"),
            new_value=int(data.get("new_value") or 0),
        )
        _ensure_table_once(engine, _ensure_qty_click_log_table)
        with engine.begin() as conn:
            conn.execute(ins, payload)
    except Exception as e:
        # Logging must never break the participant's page.
        _forget_table(engine, _ensure_qty_click_log_table)
        print("QTY INSERT FAILED:", repr(e))


def log_calc_eq(player, data):
    """Best-effort insert of one calculator-equation event; never raises."""
    engine = _get_clicklog_engine()
    if engine is None:
        print("NO ENGINE -> not writing calc to Postgres.")
        return
    try:
        ins = text("""
            INSERT INTO public.calc_eq_log
            (session_code, participant_code, round_number, player_id_in_subsession,
             page_name, equation, result_display, total_cost, clicked_at)
            VALUES
            (:session_code, :participant_code, :round_number, :player_id_in_subsession,
             :page_name, :equation, :result_display, :total_cost,
             COALESCE(:clicked_at, now()));
        """)
        payload = _common_log_fields(player, data)
        payload.update(
            equation=data.get("equation"),
            result_display=data.get("result_display"),
        )
        _ensure_table_once(engine, _ensure_calc_eq_log_table)
        with engine.begin() as conn:
            conn.execute(ins, payload)
    except Exception as e:
        # Logging must never break the participant's page.
        _forget_table(engine, _ensure_calc_eq_log_table)
        print("CALC INSERT FAILED:", repr(e))


# =====================================================================================
# MODELS
# =====================================================================================
class Subsession(BaseSubsession):
    def creating_session(self):
        """Method-style session hook: assign treatments and pre-create log tables.

        NOTE(review): a module-level ``creating_session`` also exists; which of
        the two oTree invokes depends on the app format - confirm. Both now
        store the treatment the same way (``participant.vars``).
        """
        _assign_treatments(self)
        if self.round_number != 1:
            return
        engine = _get_clicklog_engine()
        if engine is None:
            print("ENGINE IS NONE -> SKIPPING TABLE CREATION")
            return
        try:
            print("Creating qty_click_log...")
            _ensure_qty_click_log_table(engine)
            print("Creating calc_eq_log...")
            _ensure_calc_eq_log_table(engine)
            print("CLICKLOG TABLES ENSURED OK")
        except Exception as e:
            print("CLICKLOG TABLE ENSURE FAILED:", repr(e))


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    treatment = models.StringField()

    # Task timing (epoch seconds / elapsed seconds).
    task_start_ts = models.FloatField(initial=0)
    task_time_spent = models.FloatField(initial=0)

    # Counters incremented from live_method events.
    reset_previous_count = models.IntegerField(initial=0)
    reset_zero_count = models.IntegerField(initial=0)

    # Purchase outcome; recomputed on the server in before_next_page.
    list_satisfied = models.IntegerField(initial=0)
    endowment_left = models.IntegerField(min=-100000, initial=0)
    overbudget_warning_shown = models.BooleanField(initial=False)
    total_cost0 = models.IntegerField(initial=0)
    total_cost = models.IntegerField(initial=0)
    subtotal_cost = models.IntegerField(initial=0)

    # Payment.
    payment_round = models.IntegerField(initial=0)
    task_paid = models.IntegerField(initial=0)
    earnings_if_selected = models.FloatField(initial=0)
    final_payout = models.FloatField(initial=0)

    # Memory task.
    memory_score = models.IntegerField()
    memory_bonus = models.FloatField()
    memory1 = models.IntegerField(required=True, min=10, max=99)
    memory2 = models.IntegerField(required=True, min=10, max=99)
    memory3 = models.IntegerField(required=True, min=10, max=99)
    memory4 = models.IntegerField(required=True, min=10, max=99)
    memory5 = models.IntegerField(required=True, min=10, max=99)
    memory6 = models.IntegerField(required=True, min=10, max=99)
    memory7 = models.IntegerField(required=True, min=10, max=99)
    memory8 = models.IntegerField(required=True, min=10, max=99)
    memory9 = models.IntegerField(required=True, min=10, max=99)
    memory10 = models.IntegerField(required=True, min=10, max=99)

    numeracy_bonus_q = models.IntegerField(initial=0)  # 1..4
    numeracy_bonus_correct = models.IntegerField(initial=0)  # 0/1
    numeracy_bonus = models.FloatField(initial=0.0)  # 0.0 or 1.0

    question1 = models.IntegerField(
        required=True,
        widget=widgets.RadioSelect,
        choices=[
            [1, 'Purchase a pack of 4 As for $450'],
            [2, 'Purchase a pack of 5 As for $490'],
            [3, 'Purchase a pack of 6 As for $480'],
            [4, 'I would need to know the proportion I can keep before deciding'],
        ],
        label="In a decision round with an endowment of $500, the shopping list requires 5 units of item A. "
              "Which of the following options would give you the highest payout?"
    )
    question2 = models.IntegerField(
        required=True,
        widget=widgets.RadioSelect,
        choices=[
            [1, 'Spend $580'],
            [2, 'Spend $600'],
            [3, 'Spend $620'],
            [4, 'Spend $650'],
        ],
        label="In a decision round with an endowment of $650 and a “Spend $600, Save $50” promotion, "
              "you meet all required quantities in all options. "
              "Which of the following options gives you the highest payout?"
    )

    gender = models.IntegerField(
        required=True,
        label="What is your gender?",
        choices=[
            [0, 'Male'],
            [1, 'Female'],
            [2, 'Others'],
        ]
    )
    age = models.IntegerField(required=True, min=15, max=80)
    collegelevel = models.IntegerField(
        required=True,
        label="What is your college level?",
        choices=[
            [0, "Undergraduate"],
            [1, "Graduate"],
        ]
    )
    major = models.StringField(required=True, label="What is your major at the University?")

    numeracy1 = models.IntegerField(
        required=True,
        label="Question 1: A bat and a ball cost $210 in total. "
              "The bat costs $100 more than the ball. How much does the ball cost?"
    )
    numeracy2 = models.IntegerField(
        required=True,
        label="Question 2: A store says: 'Buy 2 items, get 1 free.' "
              "If each item normally costs $30, what is the average price per item if you buy 3 items?"
    )
    numeracy3 = models.IntegerField(
        required=True,
        label="Question 3: You paid $400 to purchase 8 chicken pies and 8 tuna pies. "
              "Each chicken pie costs $12 more than each tuna pie. "
              "How much did 1 chicken pie cost?"
    )
    numeracy4 = models.IntegerField(
        required=True,
        label="Question 4: If it takes 10 machines 10 minutes to make 10 widgets, "
              "how long would it take 100 machines to make 100 widgets?"
    )

    shopping = models.IntegerField(
        required=True,
        label="How often do you shop online?",
        choices=[
            [0, 'Never'],
            [1, 'Less than once a month'],
            [2, 'Less than once a week'],
            [3, 'More than once a week']
        ]
    )
    confidence = models.IntegerField(
        required=True,
        widget=widgets.RadioSelectHorizontal,
        choices=[1, 2, 3, 4],
        label="Out of the 4 math questions, how many do you think you answered correctly?"
    )
    memory_estimate = models.IntegerField(
        required=True,
        min=0,
        max=10,
        label="Out of the numbers you entered, how many do you think are correct? (0–10)"
    )

    freeresponse1 = models.LongStringField(
        required=True,
        label="Please describe your strategy when shopping with and "
              "without basket promotion (e.g. Spend $800, save $30) available."
    )
    freeresponse2 = models.LongStringField(
        required=True,
        label="Did you notice that some purchase tasks differed only in whether a basket promotion was present? "
              "If yes, please describe your strategy when solving such similar problems."
    )
    freeresponse3 = models.LongStringField(
        required=True,
        label="Did you believe that there always exists a basket configuration that just meets the promotion requirement (without exceeding it)? "
              "(None / Some / All) "
              "Did you try to find it? Please describe how you decided when to stop searching for the EXACT basket and proceed."
    )


def endowment_left_error_message(player, value):
    """Validate the submitted ``endowment_left``.

    Round 1 (comprehension): going over budget is always rejected.
    Later rounds: the first over-budget submission in a round is rejected with
    a warning; a second submission in the same round is let through.
    """
    if value >= 0:
        return None
    if player.round_number == 1:
        return 'Your total cost exceeds your budget. Please reduce your purchase and try again.'

    # Per-round "already warned" flag.
    key = f'overbudget_warned_r{player.round_number}'
    if player.participant.vars.get(key, False):
        return None
    player.participant.vars[key] = True
    return (
        'Your total cost exceeds your budget. '
        'Click Next again if you want to proceed anyway. '
        '(Note: if this round is selected for payment, you will receive $0.)'
    )


def total_cost0_error_message(player, value):
    """Round 1 only: reject baskets that cost more than the cheapest valid basket.

    Over-budget baskets are not reported here; ``endowment_left`` validation
    covers them.
    """
    if player.round_number != 1:
        return None
    if value > C.MARKET0['endowment']:
        return None
    if value > C.COMPREHENSION_MIN_COST:
        return (
            'Your purchase does not minimize total cost. '
            'Please review the available options and try again.'
        )
    return None


def _list_not_satisfied_error(player):
    """Return the error shown when the basket misses the shopping list."""
    return (
        "Your basket does not meet the required quantities on the shopping list. "
        "Please adjust your purchase and try again."
    )


# Quantity fields: one IntegerField per (item, deal) pair, e.g. qA_P3.
for _item, _deals in C.ITEM_DEALS.items():
    for _deal in _deals:
        setattr(Player, f'q{_item}_{_deal}', models.IntegerField(min=0, initial=0))

# Every quantity field name, in ITEM_DEALS order.
_QUANTITY_FIELDS = [
    f'q{_item}_{_deal}'
    for _item, _deals in C.ITEM_DEALS.items()
    for _deal in _deals
]


# =====================================================================================
# PAGE HELPERS (shared by ComprehensionCheck, PageA and PageAprime)
# =====================================================================================
def _collect_quantity_values(player):
    """Return ``{field_name: quantity}`` for all quantity fields (None -> 0)."""
    return {name: getattr(player, name, 0) or 0 for name in _QUANTITY_FIELDS}


def _log_synthetic_click(player, data, action, field_name, new_value):
    """Log a non-quantity event (reset / page enter) into the click log."""
    log_qty_click(player, {
        "page_name": data.get("page_name"),
        "field_name": field_name,
        "action": action,
        "new_value": new_value,
        "total_cost": data.get("total_cost"),
        "clicked_at": data.get("clicked_at"),
    })


def _handle_live_event(player, data):
    """Dispatch one live_method payload; always returns ``{}`` (no reply).

    Recognized ``data['event']`` values: ``qty_click``, ``calc_eq``,
    ``reset_zero``, ``reset_previous`` and ``page_enter``. Anything else is
    ignored. Errors are printed, never raised, so logging cannot break a page.
    """
    try:
        if isinstance(data, dict):
            ev = data.get("event")
            if ev == "qty_click":
                log_qty_click(player, data)
            elif ev == "calc_eq":
                log_calc_eq(player, data)
            elif ev == "reset_zero":
                player.reset_zero_count = (player.reset_zero_count or 0) + 1
                _log_synthetic_click(
                    player, data, "reset_zero", None, player.reset_zero_count
                )
            elif ev == "reset_previous":
                player.reset_previous_count = (player.reset_previous_count or 0) + 1
                _log_synthetic_click(
                    player, data, "reset_previous", None, player.reset_previous_count
                )
            elif ev == "page_enter":
                _log_synthetic_click(player, data, "enter", "PAGE", None)
    except Exception as e:
        print("LIVE ERROR:", repr(e))
    return {}


def _ensure_task_timer_started(player):
    """Record the task start time on first display of the page."""
    if not player.task_start_ts:
        player.task_start_ts = datetime.now().timestamp()


def _finalize_task_time(player):
    """Store elapsed task time once, clamped to the range 0..3600 seconds."""
    if not player.task_time_spent and player.task_start_ts:
        elapsed = datetime.now().timestamp() - player.task_start_ts
        player.task_time_spent = min(max(elapsed, 0), 3600)


def _task_template_vars(player):
    """Build the template context shared by PageA and PageAprime.

    ``prev_values`` holds the previous round's quantities (empty in round 2,
    whose predecessor is the comprehension round).
    """
    _ensure_task_timer_started(player)
    player.overbudget_warning_shown = False

    list_dict, market_dict, decision_index = decision_list_market_for_round(player)
    prices, gives, labels = build_prices_gives_labels(market_dict)

    prev_values = {}
    if player.round_number >= 3:
        prev_values = _collect_quantity_values(
            player.in_round(player.round_number - 1)
        )

    return dict(
        target=list_dict['shopping'],
        market=market_dict,
        prices=prices,
        gives=gives,
        labels=labels,
        PRICES_JSON=json.dumps(prices),
        GIVES_JSON=json.dumps(gives),
        COST_FIELD='total_cost',
        list_round=decision_index,
        prev_values=prev_values,
    )


def _validate_basket(player, values):
    """Form-level check: block Next unless the shopping list is satisfied."""
    if not set_list_satisfied_from_values(player, values):
        return _list_not_satisfied_error(player)
    return None


def _store_server_truth(player):
    """Recompute list satisfaction, costs and earnings from saved quantities."""
    set_list_satisfied_from_values(player, _collect_quantity_values(player))
    _compute_costs_and_earnings(player)


# =====================================================================================
# PAGES
# =====================================================================================
class Instructions(Page):
    @staticmethod
    def is_displayed(player):
        return player.round_number == 1


class ComprehensionQuestions(Page):
    form_model = 'player'
    form_fields = ['question1', 'question2']

    @staticmethod
    def is_displayed(player):
        return player.round_number == 1

    @staticmethod
    def error_message(player, values):
        if values['question1'] != 3 or values['question2'] != 2:
            return "One or more answers are incorrect. Please review and try again."


class ComprehensionCheck(Page):
    form_model = 'player'
    form_fields = (
        [f'qA_{d}' for d in ['single', 'P3']]
        + [f'qB_{d}' for d in ['single', 'BETA']]
        + ['qC_single']
        + [f'qD_{d}' for d in ['single', 'DELTA']]
        + [f'qE_{d}' for d in ['single', 'P4']]
        + ['total_cost0', 'endowment_left']
    )

    @staticmethod
    def is_displayed(player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player):
        list_dict, market_dict, _ = decision_list_market_for_round(player)
        prices, gives, labels = build_prices_gives_labels(market_dict)
        return dict(
            target=list_dict['shopping'],
            market0=market_dict,
            PRICES_JSON=json.dumps(prices),
            GIVES_JSON=json.dumps(gives),
            LABELS_JSON=json.dumps(labels),
        )

    @staticmethod
    def live_method(player, data):
        return _handle_live_event(player, data)

    @staticmethod
    def error_message(player, values):
        return _validate_basket(player, values)

    @staticmethod
    def before_next_page(player, timeout_happened):
        _store_server_truth(player)
        # Keep total_cost0 in sync with the server-computed total.
        player.total_cost0 = int(player.total_cost or 0)


class PageA(Page):
    """Decision round without a basket promotion."""

    form_model = 'player'
    # earnings_if_selected is intentionally absent: the server computes it.
    form_fields = _QUANTITY_FIELDS + ['total_cost', 'endowment_left']

    @staticmethod
    def is_displayed(player):
        return player.round_number in C.BASELINE_ROUNDS

    @staticmethod
    def live_method(player, data):
        return _handle_live_event(player, data)

    @staticmethod
    def vars_for_template(player):
        return _task_template_vars(player)

    @staticmethod
    def error_message(player, values):
        return _validate_basket(player, values)

    @staticmethod
    def before_next_page(player, timeout_happened):
        _store_server_truth(player)
        _finalize_task_time(player)


class PageAprime(Page):
    """Decision round with the basket promotion available."""

    form_model = 'player'
    # earnings_if_selected is intentionally absent: the server computes it.
    form_fields = _QUANTITY_FIELDS + ['total_cost', 'endowment_left', 'subtotal_cost']

    @staticmethod
    def is_displayed(player):
        return player.round_number in C.PROMO_ROUNDS

    @staticmethod
    def live_method(player, data):
        return _handle_live_event(player, data)

    @staticmethod
    def vars_for_template(player):
        return _task_template_vars(player)

    @staticmethod
    def error_message(player, values):
        return _validate_basket(player, values)

    @staticmethod
    def before_next_page(player, timeout_happened):
        _store_server_truth(player)
        _finalize_task_time(player)


class FreeResponse(Page):
    form_model = 'player'
    form_fields = ['shopping', 'freeresponse1', 'freeresponse2', 'freeresponse3']

    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS


class Memory(Page):
    timeout_seconds = 30

    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS


class Memory2(Page):
    form_model = 'player'
    form_fields = [f'memory{i}' for i in range(1, 11)]

    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def error_message(player, values):
        # Defensive: every memory field must be filled in.
        if any(values.get(f'memory{i}') is None for i in range(1, 11)):
            return "Please fill in all memory number fields before proceeding."
        return None

    @staticmethod
    def before_next_page(player, timeout_happened):
        # A set, so a correct number entered twice scores only once.
        answers = {getattr(player, f'memory{i}') for i in range(1, 11)}
        player.memory_score = len(answers.intersection(C.MEMORY_NUMBERS))
        player.memory_bonus = player.memory_score * 0.125


class Confidence(Page):
    form_model = 'player'
    form_fields = ['memory_estimate']

    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS


class Confidence_Maths(Page):
    form_model = 'player'
    form_fields = ['confidence']

    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS


class Numeracy(Page):
    form_model = 'player'
    form_fields = ['numeracy1', 'numeracy2', 'numeracy3', 'numeracy4']

    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def before_next_page(player, timeout_happened):
        # One randomly drawn question determines the numeracy bonus.
        qnum = random.randint(1, 4)
        player.numeracy_bonus_q = qnum
        ok = _numeracy_correct(qnum, getattr(player, f"numeracy{qnum}"))
        player.numeracy_bonus_correct = 1 if ok else 0
        player.numeracy_bonus = 1.0 if ok else 0.0


class Survey(Page):
    form_model = 'player'
    form_fields = ['gender', 'age', 'collegelevel', 'major']

    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS


class Results(Page):
    @staticmethod
    def is_displayed(player):
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def vars_for_template(player):
        # Draw the paid round once; page reloads reuse the stored draw.
        if not player.payment_round:
            player.payment_round = random.randrange(2, C.NUM_ROUNDS + 1)
            player.task_paid = player.payment_round - 1

        base = 10.0
        paid_player = player.in_round(player.payment_round)
        # Recompute from stored quantities so the payout is server-truth.
        _compute_costs_and_earnings(paid_player)

        earn = float(paid_player.earnings_if_selected or 0.0)
        num_bonus = float(player.numeracy_bonus or 0.0)
        mem_bonus = float(player.memory_bonus or 0.0)
        player.final_payout = base + earn + num_bonus + mem_bonus

        return dict(
            payment_round=player.payment_round,
            earnings_selected=earn,
            numeracy_bonus=num_bonus,
            numeracy_bonus_q=player.numeracy_bonus_q,
            numeracy_bonus_correct=player.numeracy_bonus_correct,
            memory_bonus=mem_bonus,
            memory_score=player.memory_score,
            memory_estimate=player.memory_estimate,
            final_payout=float(player.final_payout or 0.0),
            task_paid=player.task_paid
        )


page_sequence = [
    Instructions,
    ComprehensionQuestions,
    ComprehensionCheck,
    PageA,
    PageAprime,
    FreeResponse,
    Memory,
    Memory2,
    Confidence,
    Numeracy,
    Confidence_Maths,
    Survey,
    Results,
]