from otree.api import *
import random
import json
import base64
import requests
import time
from datetime import datetime, timedelta, timezone
import os

# for testing on devserver
from twilio.jwt.access_token import AccessToken
from twilio.jwt.access_token.grants import VideoGrant
from twilio.rest import Client
from django.shortcuts import render
from otree.models.participant import Participant
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

###### TO DO ######


# ======================================================
# TWILIO
# ======================================================
def create_twilio_room(name):
    """Create a recorded Twilio group video room and return its SID.

    Credentials are read from the ``TWILIO_API_KEY`` and ``TWILIO_API_SECRET``
    environment variables; a missing variable raises ``KeyError``.
    """
    client = Client(os.environ["TWILIO_API_KEY"], os.environ["TWILIO_API_SECRET"])
    room = client.video.rooms.create(
        unique_name=name,
        type="group",  # or 'group-small'
        record_participants_on_connect=True,  # start recording when participant joins
    )
    return room.sid


def generate_twilio_token(identity: str, room_name: str) -> str:
    """Build a one-hour Twilio access token granting video access to one room.

    Reads ``TWILIO_ACCOUNT_SID``, ``TWILIO_API_KEY`` and ``TWILIO_API_SECRET``
    from the environment (``KeyError`` if any is missing). The credentials are
    deliberately not printed, so they do not end up in the server log.
    """
    account_sid = os.environ["TWILIO_ACCOUNT_SID"]
    api_key_sid = os.environ["TWILIO_API_KEY"]
    api_key_secret = os.environ["TWILIO_API_SECRET"]

    token = AccessToken(account_sid, api_key_sid, api_key_secret, identity=identity)
    token.ttl = 3600  # optional: 1 hour
    token.add_grant(VideoGrant(room=room_name))
    return token.to_jwt()


def option_idx_from_pos(row: int, col: int, option_col: int):
    """Map a maze cell to the option located there.

    Returns 0 for row 3 and 1 for row 9 when ``col`` equals ``option_col``;
    returns ``None`` for every other cell.
    """
    if col == option_col:
        if row == 3:
            return 0
        if row == 9:
            return 1
    return None


def _is_open_cell(maze, row: int, col: int) -> bool:
    """Return True when (row, col) lies inside ``maze`` and is a walkable "0" cell."""
    return 0 <= row < len(maze) and 0 <= col < len(maze[0]) and maze[row][col] == "0"


doc = """
Alternating Individual and Joint Pac-Man Phases with Valuation.
There are 22 rounds (4 practice + 18 main). Participants alternate between individual and joint phases.
Each round uses unique color pairings and randomized value mappings.

UPDATED: adds a 'parallel' condition (individual baseline shown side-by-side).
"""


# ======================================================
# CONSTANTS
# ======================================================
class Constants(BaseConstants):
    """App-wide constants: round count, colour pairs and the maze layout."""

    name_in_url = "pacman_demo_1scale"
    players_per_group = 2
    num_rounds = 22  # 4 practice + 18 main

    # 28 candidate pairs; StartWait shuffles them and keeps one per round.
    color_pairs = [
        ["red", "cyan"],
        ["yellow", "magenta"],
        ["lime", "pink"],
        ["orange", "teal"],
        ["white", "hotpink"],
        ["greenyellow", "blue"],
        ["aqua", "orange"],
        ["lightgray", "deeppink"],
        ["limegreen", "salmon"],
        ["gold", "turquoise"],
        ["darkorange", "chartreuse"],
        ["coral", "dodgerblue"],
        ["tomato", "mediumseagreen"],
        ["plum", "deepskyblue"],
        ["slategray", "khaki"],
        ["maroon", "aquamarine"],
        ["orchid", "mediumturquoise"],
        ["steelblue", "palegreen"],
        ["crimson", "peachpuff"],
        ["lightsalmon", "mediumorchid"],
        ["mediumvioletred", "palegoldenrod"],
        ["springgreen", "orchid"],
        ["darkseagreen", "indianred"],
        ["goldenrod", "mediumorchid"],
        ["lawngreen", "lightcoral"],
        ["khaki", "hotpink"],
        ["yellowgreen", "plum"],
        ["salmon", "chartreuse"],
    ]

    # "1" = wall, "0" = open cell (see the wall checks in live_method).
    maze_joint = [
        "1111111111111",
        "1000000000001",
        "1011010101101",
        "1010010100101",
        "1000110110001",
        "1010000000101",
        "1010101010101",
        "1010000000101",
        "1000110110001",
        "1010010100101",
        "1011010101101",
        "1000000000001",
        "1111111111111",
    ]
    maze_ind = maze_joint


# ======================================================
# SESSION / SUBSESSION
# ======================================================
class Subsession(BaseSubsession):
    pass


def creating_session(subsession):
    """Print the number of players in the subsession (debug aid only)."""
    print("🔍 Number of players in this subsession:", len(subsession.get_players()))


# ======================================================
# EVENT LOGGING HELPER
# ======================================================
def log_event(player, label: str):
    """Append a timestamped event to ``participant.vars["event_log"]``.

    Each entry stores the round, the label, the absolute epoch time and the
    time relative to ``participant.vars["video_start_ts"]``. If no video start
    has been recorded, the relative time is 0.
    """
    now_abs = time.time()  # seconds since epoch
    participant_vars = player.participant.vars
    video_start = participant_vars.get("video_start_ts", now_abs)

    events = participant_vars.get("event_log", [])
    events.append(
        {
            "round": player.round_number,
            "event": label,
            "t_abs": now_abs,
            "t_rel": now_abs - video_start,
        }
    )
    participant_vars["event_log"] = events


# ======================================================
# MODELS
# ======================================================
class Group(BaseGroup):
    """Server-side avatar state for one dyad in one round."""

    # Shared avatar (control / sync conditions)
    shared_row = models.IntegerField(initial=6)
    shared_col = models.IntegerField(initial=5)

    # PARALLEL: each player has their own avatar (same maze, separate state)
    p1_row = models.IntegerField(initial=6)
    p1_col = models.IntegerField(initial=5)
    p2_row = models.IntegerField(initial=6)
    p2_col = models.IntegerField(initial=5)

    # PARALLEL: track completion
    p1_done = models.BooleanField(initial=False)
    p2_done = models.BooleanField(initial=False)

    # (optional) store target index/color choice per round so both see same one
    # 0 -> option1 color, 1 -> option2 color
    target_idx = models.IntegerField(initial=0)

    # ask group to return if one player did not proceed with reliable audio from both sides
    audio_gate_failed = models.BooleanField(initial=False)
    # is_dropout = models.BooleanField(initial=False)


class Player(BasePlayer):
    """Per-round record of conditions, stimuli, responses and timings."""

    discussion_condition = models.StringField()
    assigned_color_pair = models.StringField()
    sync_condition = models.StringField()
    metronome_condition = models.StringField()
    assigned_p1 = models.StringField()
    assigned_p2 = models.StringField()
    pac_column = models.IntegerField()
    round_outcome = models.StringField(blank=True)
    collected_option_idx = models.IntegerField(initial=-1)  # -1 none/timeout/ghost, else 0/1
    keypress_data = models.LongStringField(blank=True)

    # single bipolar "relative value + certainty" scale
    rel_value_ind = models.IntegerField(min=-100, max=100, blank=True)
    rel_value_joint = models.IntegerField(min=-100, max=100, blank=True)

    noisy_high_stream = models.LongStringField()
    noisy_low_stream = models.LongStringField()
    value_mapping = models.IntegerField()
    same_belief_trial = models.IntegerField()
    noise_value = models.FloatField(blank=True)
    display_value = models.IntegerField()
    metronome_value = models.IntegerField()
    sync_window = models.IntegerField()
    presentation_no = models.IntegerField()

    pacman_ind_rt = models.FloatField()
    valuation_ind_rt = models.FloatField()
    pacman_joint_rt = models.FloatField()
    valuation_joint_rt = models.FloatField()

    keypress_log_individual = models.LongStringField()
    keypress_log_joint = models.LongStringField()
    sync_attempts_log = models.LongStringField()
    event_timeline = models.LongStringField(blank=True)

    twilio_room_sid = models.StringField(blank=True)
    twilio_room_name = models.StringField(blank=True)
    twilio_identity = models.StringField(blank=True)
    video_start_ts = models.FloatField(blank=True)

    # video checks
    can_hear_partner = models.BooleanField(
        choices=[[True, "Yes"], [False, "No"]],
        widget=widgets.RadioSelectHorizontal,
        blank=True,
    )
    can_see_partner = models.BooleanField(
        choices=[[True, "Yes"], [False, "No"]],
        widget=widgets.RadioSelectHorizontal,
        blank=True,
    )

    # Disabled tech-check fields (kept for reference):
    # tech_ok = models.BooleanField(initial=False)
    # tech_fail_reason = models.LongStringField(blank=True)
    # tech_audio_max_rms = models.FloatField(blank=True)
    # tech_video_is_live = models.BooleanField(initial=False)
    # tech_attempts = models.IntegerField(initial=0)


# ======================================================
# LIVE METHOD
# ======================================================
def live_method(player, data):
    """Handle live messages from the joint Pac-Man page.

    Three message shapes are recognised, checked in this order:

    * ``{"outcome_for_you": ...}`` - parallel condition; the outcome is stored
      on the sender only and echoed back to the sender (the partner gets None).
    * ``{"outcome": ...}`` - control/sync; the outcome is stored on and
      broadcast to both players.
    * ``{"key": "w"|"a"|"s"|"d"}`` - a movement request, resolved according to
      the round's condition (control, sync or parallel).

    Returns a dict keyed by ``id_in_group`` with the payload for each player.
    Anything else (including a non-string ``key``) gets an empty payload.
    """
    print("Received data from player", player.id_in_group, ":", data)

    participant_vars = player.participant.vars
    if "keypress_log" not in participant_vars:
        participant_vars["keypress_log"] = []
    if "sync_attempts" not in participant_vars:
        participant_vars["sync_attempts"] = {}

    group = player.group
    round_idx = player.round_number - 1
    is_message = isinstance(data, dict)

    # --------------------------------------------------
    # FINAL OUTCOMES
    # - control/sync: broadcast outcome to BOTH (existing behavior)
    # - parallel: outcome_for_you ends ONLY the sender
    # --------------------------------------------------

    # (A) PARALLEL: per-participant outcome
    if is_message and "outcome_for_you" in data:
        outcome = data["outcome_for_you"]
        print(f"[PARALLEL] outcome_for_you reported by player {player.id_in_group}: {outcome}")

        # store on THIS player's row only (so OutcomePage is correct for them)
        player.round_outcome = outcome

        # if fruit, map to option1/option2 for THIS player
        if outcome == "fruit":
            option_col = int(participant_vars["column_list"][round_idx])
            # IMPORTANT: use the position for THIS player (parallel has 2 avatars)
            if player.id_in_group == 1:
                idx = option_idx_from_pos(group.p1_row, group.p1_col, option_col)
            else:
                idx = option_idx_from_pos(group.p2_row, group.p2_col, option_col)
            if idx is not None:
                player.collected_option_idx = idx

        # return outcome_for_you ONLY to sender (others get None)
        return {
            p.id_in_group: {
                "outcome_for_you": outcome if p.id_in_group == player.id_in_group else None
            }
            for p in group.get_players()
        }

    # (B) CONTROL/SYNC: joint outcome to both (existing behavior)
    if is_message and "outcome" in data:
        outcome = data["outcome"]
        members = group.get_players()

        # save outcome for both
        for p in members:
            p.round_outcome = outcome

        # if fruit, infer option index from the *shared* server position
        if outcome == "fruit":
            option_col = int(participant_vars["column_list"][round_idx])
            idx = option_idx_from_pos(group.shared_row, group.shared_col, option_col)
            if idx is not None:
                for p in members:
                    p.collected_option_idx = idx

        return {p.id_in_group: {"outcome": outcome} for p in members}

    if not is_message or "key" not in data:
        return {p.id_in_group: {} for p in group.get_players()}

    deltas = {"w": (-1, 0), "s": (1, 0), "a": (0, -1), "d": (0, 1)}
    raw_key = data["key"]
    # A non-string key (e.g. a number or null from a faulty client) has no
    # .lower(); reject it like any other invalid key instead of crashing.
    key = raw_key.lower() if isinstance(raw_key, str) else None
    if key not in deltas:
        print("Invalid key:", raw_key if key is None else key)
        return {p.id_in_group: {} for p in group.get_players()}

    # Log key press with timestamp
    keypress_log = participant_vars.get("keypress_log", [])
    keypress_log.append(
        {"round": player.round_number, "key": key, "ts": int(time.time() * 1000)}
    )
    participant_vars["keypress_log"] = keypress_log

    now_ms = int(time.time() * 1000)
    session_vars = player.session.vars
    pend_key = f"pend_moves_{group.id}"
    pending = session_vars.get(pend_key, {})

    # Look up condition for this round
    condition = participant_vars["condition_array"][round_idx]
    # condition = "parallel"

    # Maze layout for wall checks
    maze = Constants.maze_joint
    sync_window = participant_vars["sync_windows"][round_idx]
    dr, dc = deltas[key]

    # --------------------------------------------------
    # CONTROL (shared avatar)
    # --------------------------------------------------
    if condition == "control":
        new_row = group.shared_row + dr
        new_col = group.shared_col + dc
        if _is_open_cell(maze, new_row, new_col):
            group.shared_row = new_row
            group.shared_col = new_col
        else:
            print("Blocked by wall (control):", new_row, new_col)

        return {
            p.id_in_group: {"row": group.shared_row, "col": group.shared_col}
            for p in group.get_players()
        }

    # --------------------------------------------------
    # SYNC (shared avatar; requires matching key within window)
    # --------------------------------------------------
    elif condition == "sync":
        round_sync_log = participant_vars["sync_attempts"].setdefault(player.round_number, [])

        # Remember this player's latest press; resolve once both have pressed.
        pending[str(player.id_in_group)] = {"key": key, "ts": now_ms}
        session_vars[pend_key] = pending

        if len(pending) < Constants.players_per_group:
            return {p.id_in_group: {} for p in group.get_players()}

        first, second = list(pending.values())[:2]
        dt = abs(first["ts"] - second["ts"])
        response = {}

        if first["key"] != second["key"]:
            print("Key mismatch:", first["key"], second["key"])
            round_sync_log.append(
                {
                    "timestamp": now_ms,
                    "key1": first["key"],
                    "key2": second["key"],
                    "dt": dt,
                    "success": False,
                    "reason": "mismatch",
                }
            )
        elif dt > sync_window:
            print("Too slow – not synchronous:", dt, "ms")
            round_sync_log.append(
                {"timestamp": now_ms, "key": key, "dt": dt, "success": False, "reason": "too_slow"}
            )
        else:
            new_row = group.shared_row + dr
            new_col = group.shared_col + dc
            round_sync_log.append({"timestamp": now_ms, "key": key, "dt": dt, "success": True})
            if _is_open_cell(maze, new_row, new_col):
                group.shared_row = new_row
                group.shared_col = new_col
                response = {"row": new_row, "col": new_col}
            else:
                print("Blocked by wall (sync):", new_row, new_col)

        session_vars[pend_key] = {}  # Reset after each attempt
        return {p.id_in_group: response for p in group.get_players()}

    # --------------------------------------------------
    # PARALLEL (two independent avatars, same maze, same target duplicated)
    # - each keypress moves only YOUR avatar
    # - both see both avatars updating from the server
    # - the mover's round_outcome is set to "fruit" when they reach an option cell
    # --------------------------------------------------
    elif condition == "parallel":
        is_p1 = player.id_in_group == 1

        # --- move ONLY the mover's avatar ---
        if is_p1:
            cur_r, cur_c = group.p1_row, group.p1_col
        else:
            cur_r, cur_c = group.p2_row, group.p2_col

        new_r = cur_r + dr
        new_c = cur_c + dc

        if _is_open_cell(maze, new_r, new_c):
            if is_p1:
                group.p1_row, group.p1_col = new_r, new_c
            else:
                group.p2_row, group.p2_col = new_r, new_c
        else:
            print("Blocked by wall (parallel):", new_r, new_c)

        # --- server checks fruit for THIS player only ---
        outcome_for_you = None
        option_col = int(participant_vars["column_list"][round_idx])

        if is_p1:
            idx = option_idx_from_pos(group.p1_row, group.p1_col, option_col)
        else:
            idx = option_idx_from_pos(group.p2_row, group.p2_col, option_col)

        if idx is not None:
            if is_p1:
                group.p1_done = True
            else:
                group.p2_done = True
            player.collected_option_idx = idx
            player.round_outcome = "fruit"
            outcome_for_you = "fruit"

        # (optional) ghost check per player, if you want:
        # if player hits ghost -> only that player ends
        # NOTE: you'd need server-side ghost positions for full authority; skip if ghosts are client-only.

        payload = {
            "p1_row": group.p1_row,
            "p1_col": group.p1_col,
            "p2_row": group.p2_row,
            "p2_col": group.p2_col,
            "p1_done": group.p1_done,
            "p2_done": group.p2_done,
            # IMPORTANT: only the mover gets a non-None value here
            "outcome_for_you": outcome_for_you,
        }

        # Send same positions to both, but outcome_for_you only to the mover
        out = {}
        for p in group.get_players():
            msg = dict(payload)
            if p.id_in_group != player.id_in_group:
                msg["outcome_for_you"] = None
            out[p.id_in_group] = msg
        return out


# ======================================================
# PAGES
# ======================================================
class StartWait(WaitPage):
    """Round-1 arrival wait page that randomises the whole session design.

    Once a dyad has formed, it draws all per-round parameters, stores them in
    each participant's ``vars`` and creates the Twilio room and tokens.
    """

    group_by_arrival_time = True
    body_text = "Waiting for your partner to be ready...Thank you for your patience"

    @staticmethod
    def is_displayed(player: Player):
        # return player.round_number == 1 and player.tech_ok
        return player.round_number == 1

    @staticmethod
    def after_all_players_arrive(group: Group):
        players = group.get_players()

        # --- CONSTANTS FOR DESIGN ---
        num_practice = 4
        num_full = Constants.num_rounds - num_practice
        trials_per_block = 6

        # Assign some conditions
        discussion_condition = random.choice(["advance", "on_the_fly"])
        for p in players:
            p.participant.vars["discussion_condition"] = discussion_condition

        # Assign p1/p2 roles
        players[0].participant.p_label = "p1"
        players[1].participant.p_label = "p2"

        # ---------- COLOR PAIRS (one shuffled pair per round) ----------
        color_pairs = Constants.color_pairs.copy()
        random.shuffle(color_pairs)
        color_pairs = color_pairs[: Constants.num_rounds]

        # ==========================================================
        # CONDITIONS
        # 4 practice (2 control + 2 sync) + 18 test (3 blocks of 6)
        # Test blocks follow one row of a Latin square over 3 conditions
        # (e.g., sync, parallel, control).
        # ==========================================================

        # ---- Practice: 2 control + 2 sync ----
        practice = ["control", "control", "sync", "sync"]
        random.shuffle(practice)  # comment out if you want fixed practice order

        # ---- Latin square for 3 conditions ----
        latin_orders = [
            ["sync", "parallel", "control"],
            ["parallel", "control", "sync"],
            ["control", "sync", "parallel"],
        ]

        # pick one row (counterbalancing across groups)
        order = random.choice(latin_orders)

        # one pass through the chosen order -> 3 blocks total
        block_order_3 = order

        # expand each block to `trials_per_block` trials
        main = []
        for cond in block_order_3:
            main += [cond] * trials_per_block

        condition_array = practice + main

        # sanity checks
        assert len(condition_array) == Constants.num_rounds
        assert len(practice) == num_practice
        assert len(main) == num_full

        # ---------- VALUE PAIRS FOR FULL TRIALS ONLY ----------
        # Each entry is (p1 values, p2 values); the last three swap the order
        # for p2, i.e. the two players receive different pairs.
        value_pairs = [
            ([70, 60], [70, 60]),
            ([60, 50], [60, 50]),
            ([50, 40], [50, 40]),
            ([70, 60], [60, 70]),
            ([60, 50], [50, 60]),
            ([50, 40], [40, 50]),
        ]

        full_p1_array, full_p2_array, full_val_array, full_same_belief_array = [], [], [], []

        for _ in range(num_full // 6):
            block = random.sample(value_pairs, k=6)
            value_mapping = [0, 0, 0, 1, 1, 1]
            random.shuffle(value_mapping)

            full_p1_array += [s for s, _ in block]
            full_p2_array += [w for _, w in block]
            full_val_array += value_mapping
            # 1 = same-belief trial, 0 = different-belief trial
            full_same_belief_array += [1 if s == w else 0 for s, w in block]

        p1_array = [None] * num_practice + full_p1_array
        p2_array = [None] * num_practice + full_p2_array
        val_array = [None] * num_practice + full_val_array
        same_belief_array = [None] * num_practice + full_same_belief_array

        # ---------- STARTING POSITIONS (unchanged) ----------
        column_list, pac_list = [], []
        for _ in range(Constants.num_rounds):
            col = random.choice([1, 11])
            column_list.append(col)
            pac_list.append(7 if col == 1 else 5)

        # ==========================================================
        # Target index per round (which of the 2 fruits is the target),
        # shared by both players.
        # practice: always option1 (0), main: random 0/1 (matches your old logic)
        # ==========================================================
        target_idx_by_round = []
        for rn in range(1, Constants.num_rounds + 1):
            if rn <= num_practice:
                target_idx_by_round.append(0)
            else:
                target_idx_by_round.append(random.randint(0, 1))

        # ---------- NOISES / DISPLAY / PRESENTATIONS / METRONOMES / SYNC WINDOWS ----------
        # All lists are currently constant; the shuffles only matter once the
        # lists contain different values.
        full_noises = [1.34] * num_full
        random.shuffle(full_noises)

        full_display_values = [100] * num_full
        random.shuffle(full_display_values)

        full_presentations = [16] * num_full
        random.shuffle(full_presentations)

        full_metronomes = [1000] * num_full
        random.shuffle(full_metronomes)

        full_sync_windows = [1000] * num_full
        random.shuffle(full_sync_windows)

        full_metro_cond = ["fade"] * num_full
        random.shuffle(full_metro_cond)

        practice_noises = [1.34] * num_practice
        practice_display_values = [100] * num_practice
        practice_presentations = [16] * num_practice
        practice_metronomes = [1000] * num_practice
        practice_sync_windows = [1000] * num_practice
        practice_metro_cond = ["fade"] * num_practice

        noises = practice_noises + full_noises
        display_values = practice_display_values + full_display_values
        presentations = practice_presentations + full_presentations
        metronomes = practice_metronomes + full_metronomes
        sync_windows = practice_sync_windows + full_sync_windows
        metronome_condition = practice_metro_cond + full_metro_cond

        # ---------- SAVE TO PARTICIPANT.VARS ----------
        for p in players:
            p.participant.vars.update(
                {
                    "color_pairs": color_pairs,
                    "condition_array": condition_array,
                    "p1_array": p1_array,
                    "p2_array": p2_array,
                    "val_array": val_array,
                    "same_belief_array": same_belief_array,
                    "column_list": column_list,
                    "pac_list": pac_list,
                    "noises": noises,
                    "display_values": display_values,
                    "metronomes": metronomes,
                    "metronome_condition": metronome_condition,
                    "sync_windows": sync_windows,
                    "presentations": presentations,
                    # NEW for parallel:
                    "target_idx_by_round": target_idx_by_round,
                }
            )

        # ---------- Twilio token generation ----------
        # Timezone-aware "now": datetime.utcnow() is deprecated and its naive
        # result is interpreted as local time by .timestamp().
        room_stamp = datetime.now(timezone.utc).timestamp()
        room_name = f"twilio_room_{group.id_in_subsession}_{room_stamp}"
        room_sid = create_twilio_room(room_name)
        # room_name = '123' #use to disable video
        # room_sid = '123' #use to disable video

        video_start_ts = time.time()

        for p in players:
            identity = f"player_{p.id_in_group}_g{group.id_in_subsession}"
            token = generate_twilio_token(identity, room_name)
            # token = '123' #use to disable video

            p.participant.vars["twilio_token"] = token
            p.participant.vars["twilio_room"] = room_name
            p.participant.vars["video_start_ts"] = video_start_ts
            p.participant.vars.setdefault("event_log", [])
            p.participant.vars["twilio_identity"] = identity

            p.twilio_room_sid = room_sid
            p.twilio_room_name = room_name
            p.twilio_identity = identity
            p.video_start_ts = video_start_ts


# Disabled video tech-check page (kept for reference):
#
# #video check
# class TechCheck(Page):
#     template_name = "pacman_demo_1scale/TechCheck.html"
#     form_model = "player"
#     form_fields = ["tech_ok", "tech_fail_reason", "tech_audio_max_rms", "tech_video_is_live"]
#
#     @staticmethod
#     def is_displayed(player):
#         return player.round_number == 1
#
#     @staticmethod
#     def error_message(player, values):
#         if not values.get("tech_ok", False):
#             player.tech_attempts += 1
#             if player.tech_attempts >= 2:
#                 # lock them out (stays on TechCheck unless you add ReturnStudy)
#                 return "Tech check failed twice. Please return the study (you cannot continue)."
#             return "Tech check failed. Please fix the issue and click 'Run test' again."
#         return None


class TwilioVideoRoom(Page):
    """Round-1 page that hands the Twilio token and room name to the template."""

    template_name = "pacman_demo_1scale/TwilioVideoRoom.html"

    @staticmethod
    def is_displayed(player: Player):
        # return player.round_number == 1 and player.tech_ok
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        # NOTE(review): the token is issued for identity
        # "player_<id>_g<group>" (see StartWait) while the template receives
        # "player_<id>" - confirm the template does not compare the two.
        return {
            "twilio_token": player.participant.vars["twilio_token"],
            "room_name": player.participant.vars["twilio_room"],
            "identity": f"player_{player.id_in_group}",
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        log_event(player, "twilio_room_opened")


class TwilioPopupPage(Page):
    """Popup variant of the video page; currently never displayed."""

    template_name = "pacman_demo_1scale/TwilioPopupPage.html"
    timeout_seconds = 120

    @staticmethod
    def is_displayed(player: Player):
        return False

    @staticmethod
    def vars_for_template(player: Player):
        return {
            "twilio_token": player.participant.vars["twilio_token"],
            "room_name": player.participant.vars["twilio_room"],
            "identity": f"player_{player.id_in_group}",
        }


class PartnerVideoWait(WaitPage):
    """Round-1 wait page that flags the dyad as failed if the partner never arrives."""

    # NOTE(review): the text below promises 3 minutes but the timeout is 300 s
    # (5 minutes) - confirm which is intended.
    # NOTE(review): confirm that oTree honours timeout_seconds /
    # before_next_page on a WaitPage; if not, the failure flag is never set here.
    timeout_seconds = 300
    body_text = (
        "Waiting for your partner to open the video popup.\n\n"
        "If they do not open within 3 minutes, you will be sent to the end of the task "
        "and will receive a partial payment."
    )

    @staticmethod
    def is_displayed(player):
        return player.round_number == 1

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        if timeout_happened:
            player.group.audio_gate_failed = True
            # persist + make it apply to BOTH players
            for p in player.group.get_players():
                p.participant.vars["audio_gate_failed"] = True

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        # redirect BOTH (because both will evaluate this, and the flag is on the group)
        if player.group.audio_gate_failed:
            return upcoming_apps[-1]


class AudioGate(Page):
    """Round-1 self-report: can the participant hear and see the partner?"""

    template_name = "pacman_demo_1scale/AudioGate.html"
    form_model = "player"
    form_fields = ["can_hear_partner", "can_see_partner"]
    timeout_seconds = 180
    # A timeout counts as "cannot hear / cannot see".
    timeout_submission = {"can_hear_partner": False, "can_see_partner": False}

    @staticmethod
    def is_displayed(player):
        # Don't even show if the dyad already failed at PartnerVideoWait
        return player.round_number == 1 and not player.group.audio_gate_failed

    @staticmethod
    def error_message(player, values):
        # Both fields are blank=True, so enforce an answer here.
        if values.get("can_hear_partner") is None or values.get("can_see_partner") is None:
            return "Please answer both questions before continuing."

    @staticmethod
    def before_next_page(player, timeout_happened):
        # Don't decide failure here (per-person); do it in the WaitPage below.
        pass


class AudioGateWait(WaitPage):
    """Combine both AudioGate answers into a dyad-level pass/fail flag."""

    @staticmethod
    def is_displayed(player):
        # Only wait if AudioGate actually ran
        return player.round_number == 1 and not player.group.audio_gate_failed

    @staticmethod
    def after_all_players_arrive(group: Group):
        members = group.get_players()

        # If either participant cannot hear, fail dyad.
        # field_maybe_none: the field is blank=True, so it may still be null.
        if any(p.field_maybe_none("can_hear_partner") is False for p in members):
            group.audio_gate_failed = True

        # persist across apps
        for p in members:
            p.participant.vars["audio_gate_failed"] = group.audio_gate_failed

    # @staticmethod
    # def app_after_this_page(player, upcoming_apps):
    #     # Redirect BOTH after both have answered
    #     if player.group.audio_gate_failed:
    #         return upcoming_apps[-1]


class AudioGateRoute(Page):
    """One-second auto-advancing page that routes failed dyads to the last app."""

    template_name = "global/Page.html"
    timeout_seconds = 1
    timeout_submission = {}

    @staticmethod
    def is_displayed(player):
        return player.round_number == 1

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        if player.group.audio_gate_failed:
            return upcoming_apps[-1]  # debrief app (last)
        return None


class DiscussionConditionInfo(Page):
    """Round-1 page that shows the dyad its discussion condition."""

    template_name = "pacman_demo_1scale/ConditionAssign.html"

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        return {"discussion_condition": player.participant.vars["discussion_condition"]}


class TrialWait(WaitPage):
    """Per-round sync point that resets avatar positions, done flags and target."""

    body_text = "Waiting for your partner to be ready..."

    @staticmethod
    def after_all_players_arrive(group: Group):
        p1 = group.get_players()[0]
        round_idx = p1.round_number - 1

        # reset shared start (control/sync)
        start_row = 6
        start_col = p1.participant.vars["pac_list"][round_idx]
        group.shared_row = start_row
        group.shared_col = start_col

        # reset parallel starts (duplicated)
        group.p1_row = start_row
        group.p1_col = start_col
        group.p2_row = start_row
        group.p2_col = start_col

        # reset done flags each round
        group.p1_done = False
        group.p2_done = False

        # store target idx for this round so both see same target
        group.target_idx = int(p1.participant.vars["target_idx_by_round"][round_idx])


class Pacman_prac(Page):
    """Round-1 practice maze with a fixed start column."""

    template_name = "pacman_demo_1scale/Pacman_prac.html"
    timeout_seconds = 120

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def vars_for_template(player: Player):
        return {"pac_column": 7, "maze": Constants.maze_ind}


class Pacman_ind(Page):
    """Individual Pac-Man phase, shown in the main rounds only (round > 4)."""

    template_name = "pacman_demo_1scale/Pacman_ind.html"
    timeout_seconds = 120

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number > 4

    @staticmethod
    def vars_for_template(player: Player):
        participant_vars = player.participant.vars
        round_idx = player.round_number - 1
        pair = participant_vars["color_pairs"][round_idx]
        p_label = player.participant.p_label

        log_event(player, "ind_phase_start")

        presentation_no = 6
        participant_vars[f"pacman_ind_start_{player.round_number}"] = time.time()

        # Each player reads the value pair assigned to their own role.
        values_key = "p1_array" if p_label == "p1" else "p2_array"
        vh_raw, vl_raw = participant_vars[values_key][round_idx]

        # Each value is repeated for half of the presentations.
        half_n = int(presentation_no / 2)
        high_stream = [vh_raw for _ in range(half_n)]
        low_stream = [vl_raw for _ in range(half_n)]

        # value_mapping 0: option1 gets the first value; 1: the second value.
        value_mapping = participant_vars["val_array"][round_idx]
        if value_mapping == 0:
            option1_stream = high_stream
            option2_stream = low_stream
        else:
            option1_stream = low_stream
            option2_stream = high_stream

        # Cached so before_next_page can store exactly what was shown.
        participant_vars[f"noisy_high_round_{round_idx}"] = high_stream
        participant_vars[f"noisy_low_round_{round_idx}"] = low_stream
        participant_vars[f"presentation_no_round_{round_idx}"] = presentation_no

        return {
            "option1_color": pair[0],
            "option2_color": pair[1],
            "round_number": player.round_number,
            "pac_column": participant_vars["pac_list"][round_idx],
            "high_stream": json.dumps(high_stream),
            "low_stream": json.dumps(low_stream),
            "option1_stream": json.dumps(option1_stream),
            "option2_stream": json.dumps(option2_stream),
            "val_mapping": value_mapping,
            "same_belief_trial": participant_vars["same_belief_array"][round_idx],
            "display_value": participant_vars["display_values"][round_idx],
            "column_number": participant_vars["column_list"][round_idx],
            "presentation_no": presentation_no,
            "p_label": player.participant.p_label,
            "maze": Constants.maze_ind,
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        participant_vars = player.participant.vars
        round_idx = player.round_number - 1
        pair = participant_vars["color_pairs"][round_idx]
        condition = participant_vars["condition_array"][round_idx]
        p_label = player.participant.p_label

        values_key = "p1_array" if p_label == "p1" else "p2_array"
        vh_raw, vl_raw = participant_vars[values_key][round_idx]

        high_stream = participant_vars[f"noisy_high_round_{round_idx}"]
        low_stream = participant_vars[f"noisy_low_round_{round_idx}"]

        # Copy this round's design parameters onto the player row.
        player.discussion_condition = participant_vars["discussion_condition"]
        player.assigned_color_pair = json.dumps(pair)
        player.sync_condition = condition
        player.assigned_p1 = str(vh_raw)
        player.assigned_p2 = str(vl_raw)
        player.noisy_high_stream = json.dumps(high_stream)
        player.noisy_low_stream = json.dumps(low_stream)
        player.value_mapping = participant_vars["val_array"][round_idx]
        player.same_belief_trial = participant_vars["same_belief_array"][round_idx]
        player.pac_column = participant_vars["pac_list"][round_idx]
        player.noise_value = participant_vars["noises"][round_idx]
        player.display_value = participant_vars["display_values"][round_idx]
        player.presentation_no = participant_vars[f"presentation_no_round_{round_idx}"]

        start = participant_vars.get(f"pacman_ind_start_{player.round_number}")
        if start:
            player.pacman_ind_rt = time.time() - start

        log_event(player, "ind_phase_end")


# ==========================================================
# VALUATION: single bipolar slider (rel_value_ind)
# ==========================================================
class Valuation_ind(Page):
    """Individual-phase rating page (main rounds only)."""

    template_name = "pacman_demo_1scale/Valuation_ind.html"
    timeout_seconds = 120
    form_model = "player"
    form_fields = ["rel_value_ind"]

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number > 4

    @staticmethod
    def vars_for_template(player: Player):
        round_idx = player.round_number - 1
        pair = player.participant.vars["color_pairs"][round_idx]

        log_event(player, "ind_rating_start")
        player.participant.vars[f"valuation_ind_start_{player.round_number}"] = time.time()

        return {
            "option1_color": pair[0],
            "option2_color": pair[1],
            "round_number": player.round_number,
            "phase": "individual",
            "p_label": player.participant.p_label,
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # NEW: if this page times out, mark dyad dropout
        # if timeout_happened:
        #     player.group.is_dropout = True
        #     player.participant.vars["is_dropout"] = player.group.is_dropout

        start = player.participant.vars.get(f"valuation_ind_start_{player.round_number}")
        if start:
            player.valuation_ind_rt = time.time() - start

        # The field is blank=True, so it can be null after a blank submit;
        # reading it directly would raise, hence field_maybe_none.
        if player.field_maybe_none("rel_value_ind") is None:
            player.rel_value_ind = 0

        log_event(player, "ind_rating_end")

    # @staticmethod
    # def app_after_this_page(player: Player, upcoming_apps):
    #     # NEW: if dyad dropout (or audio gate failed earlier), jump to debrief app
    #     if player.group.is_dropout:
    #         return upcoming_apps[-1]


class AdvanceDiscussion(Page):
    """Discussion page shown only to dyads in the "advance" discussion condition."""

    template_name = "pacman_demo_1scale/AdvanceDiscussion.html"
    timeout_seconds = 60

    @staticmethod
    def is_displayed(player: Player):
        return player.participant.vars["discussion_condition"] == "advance"


class Pacman_joint(Page):
    """Joint Pac-Man phase; movement is resolved server-side by live_method."""

    live_page = True
    template_name = "pacman_demo_1scale/Pacman_joint.html"
    timeout_seconds = 60
    live_method = live_method

    @staticmethod
    def is_displayed(player: Player):
        return True

    @staticmethod
    def vars_for_template(player: Player):
        participant_vars = player.participant.vars
        group = player.group
        round_idx = player.round_number - 1
        pair = participant_vars["color_pairs"][round_idx]
        condition = participant_vars["condition_array"][round_idx]
        # condition = "parallel"
        is_practice = player.round_number <= 4
        # is_practice = False

        # IMPORTANT: keep the "which fruit is the target" logic the same across conditions.
        # We now store target_idx on the group in TrialWait, so everyone sees the same.
        target_idx = group.target_idx
        target_color = pair[target_idx]

        log_event(player, "joint_phase_start")
        participant_vars[f"pacman_joint_start_{player.round_number}"] = time.time()

        # For control/sync: ensure shared start uses pac_list
        # NOTE(review): this runs on every render, so a page reload mid-trial
        # resets shared_col but not shared_row - confirm this is intended.
        group.shared_col = participant_vars["pac_list"][round_idx]

        # For parallel: positions/fruits are already set in TrialWait (group fields)

        return {
            "option1_color": pair[0],
            "option2_color": pair[1],
            "column_number": participant_vars["column_list"][round_idx],
            "pac_column": participant_vars["pac_list"][round_idx],
            "round_number": player.round_number,
            "maze": Constants.maze_joint,
            "p_label": player.participant.p_label,
            "sync_condition": condition,
            "metronome_value": participant_vars["metronomes"][round_idx],
            "metronome_condition": participant_vars["metronome_condition"][round_idx],
            "sync_window": participant_vars["sync_windows"][round_idx],
            "target_color": target_color,
            "is_practice": is_practice,
            # NEW: give JS everything it needs to draw parallel side-by-side
            "p1_row": group.p1_row,
            "p1_col": group.p1_col,
            "p2_row": group.p2_row,
            "p2_col": group.p2_col,
            "p1_done": group.p1_done,
            "p2_done": group.p2_done,
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        participant_vars = player.participant.vars
        round_idx = player.round_number - 1

        player.metronome_value = participant_vars["metronomes"][round_idx]
        player.metronome_condition = participant_vars["metronome_condition"][round_idx]
        player.sync_window = participant_vars["sync_windows"][round_idx]

        # The keypress log holds every round so far, not just this one.
        player.keypress_log_joint = json.dumps(participant_vars.get("keypress_log", []))

        sync_attempts = participant_vars.get("sync_attempts", {}).get(player.round_number, [])
        player.sync_attempts_log = json.dumps(sync_attempts)

        start = participant_vars.get(f"pacman_joint_start_{player.round_number}")
        if start:
            player.pacman_joint_rt = time.time() - start

        log_event(player, "joint_phase_end")


class OutcomePage(Page):
    """Show the joint-phase outcome; a missing outcome is recorded as "timeout"."""

    template_name = "pacman_demo_1scale/OutcomePage.html"

    @staticmethod
    def is_displayed(player: Player):
        return True

    @staticmethod
    def vars_for_template(player: Player):
        outcome = player.field_maybe_none("round_outcome")
        if outcome is None:
            outcome = "timeout"
            player.round_outcome = outcome

        log_event(player, f"joint_outcome_{outcome}")
        return {"outcome": outcome}


class Valuation_joint(Page):
    """Joint-phase rating page (main rounds only)."""

    template_name = "pacman_demo_1scale/Valuation_joint.html"
    timeout_seconds = 120
    form_model = "player"
    form_fields = ["rel_value_joint"]

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number > 4

    @staticmethod
    def vars_for_template(player: Player):
        round_idx = player.round_number - 1
        pair = player.participant.vars["color_pairs"][round_idx]

        log_event(player, "joint_rating_start")
        player.participant.vars[f"valuation_joint_start_{player.round_number}"] = time.time()

        return {
            "option1_color": pair[0],
            "option2_color": pair[1],
            "round_number": player.round_number,
            "phase": "joint",
            "p_label": player.participant.p_label,
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # NEW: if this page times out, mark dyad dropout
        # if timeout_happened:
        #     player.group.is_dropout = True
        #     player.participant.vars["is_dropout"] = player.group.is_dropout

        start = player.participant.vars.get(f"valuation_joint_start_{player.round_number}")
        if start:
            player.valuation_joint_rt = time.time() - start

        # The field is blank=True, so it can be null after a blank submit;
        # reading it directly would raise, hence field_maybe_none.
        if player.field_maybe_none("rel_value_joint") is None:
            player.rel_value_joint = 0

        log_event(player, "joint_rating_end")

    # @staticmethod
    # def app_after_this_page(player: Player, upcoming_apps):
    #     # NEW: if dyad dropout (or audio gate failed earlier), jump to debrief app
    #     if player.group.is_dropout:
    #         return upcoming_apps[-1]  # assumes debrief is last app; adjust if not


class SaveKeypressLog(Page):
    """One-second auto-advancing page that persists this round's keypresses."""

    timeout_seconds = 1
    template_name = "global/Page.html"

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        participant_vars = player.participant.vars

        if player.round_number == 10:
            participant_vars["enough_rounds"] = True

        all_presses = participant_vars.get("keypress_log", [])
        round_presses = [entry for entry in all_presses if entry["round"] == player.round_number]
        player.keypress_data = json.dumps(round_presses)

        # The full event timeline is written once, in the final round.
        if player.round_number == Constants.num_rounds:
            events = participant_vars.get("event_log", [])
            player.event_timeline = json.dumps(events)


class BetweenBlock(Page):
    template_name = "pacman_demo_1scale/BetweenBlock.html"
    timeout_seconds = 30  # optional: force a short pause so they actually see it

    @staticmethod
    def is_displayed(player: Player):
        num_practice = 4
        trials_per_block = 6
        num_test_trials = 18

        # only after practice
        if player.round_number <= num_practice:
            return False

        # first round of each 6-trial test block:
        # test blocks start at rounds 5, 11, 17 (for 4 practice + 18 test)
        first_test_round = num_practice + 1
        is_block_start = ((player.round_number - first_test_round) % trials_per_block) == 0

        # safety: don't show beyond test trials
        last_test_round = num_practice + num_test_trials
        return is_block_start and (player.round_number <= last_test_round)

    @staticmethod
    def vars_for_template(player: Player):
        num_practice = 4
trials_per_block = 6 first_test_round = num_practice + 1 block_index0 = (player.round_number - first_test_round) // trials_per_block block_no = block_index0 + 1 # 1..6 total_blocks = 3 # upcoming condition for this round/block condition = player.participant.vars["condition_array"][player.round_number - 1] titles = { "sync": "Synchrony block", "control": "Shared-control block", "parallel": "Parallel block", } reminders = { "sync": "You may talk to coordinate on the fruit. Pac-Man moves only if you both press the SAME key at (around) the SAME time.", "control": "You may talk to coordinate on the fruit. Pac-Man moves with either player’s keypress (no synchrony needed).", "parallel": "You must not talk to each other. You each control your own Pac-Man.", } return dict( block_no=block_no, total_blocks=total_blocks, condition=condition, block_title=titles.get(condition, "New block"), reminder=reminders.get(condition, ""), ) page_sequence = [ StartWait, #TechCheck, TwilioVideoRoom, PartnerVideoWait, AudioGate, AudioGateWait, AudioGateRoute, TrialWait, Pacman_prac, BetweenBlock, Pacman_ind, Valuation_ind, # AdvanceDiscussion, TrialWait, Pacman_joint, OutcomePage, SaveKeypressLog, Valuation_joint, ]