from otree.api import * import random import json import base64 import requests import time from datetime import datetime, timedelta, timezone import os # for testing on devserver from twilio.jwt.access_token import AccessToken from twilio.jwt.access_token.grants import VideoGrant from twilio.rest import Client from django.shortcuts import render from otree.models.participant import Participant from dotenv import load_dotenv, find_dotenv load_dotenv(find_dotenv()) def create_twilio_room(name): client = Client(os.environ["TWILIO_API_KEY"], os.environ["TWILIO_API_SECRET"]) room = client.video.rooms.create( unique_name=name, type="group", # or 'group-small' record_participants_on_connect=True, # start recording when participant joins ) return room.sid def generate_twilio_token(identity: str, room_name: str) -> str: from twilio.jwt.access_token import AccessToken from twilio.jwt.access_token.grants import VideoGrant import os account_sid = os.environ["TWILIO_ACCOUNT_SID"] api_key_sid = os.environ["TWILIO_API_KEY"] api_key_secret = os.environ["TWILIO_API_SECRET"] print("Account SID (sub):", account_sid) print("API Key SID (iss):", api_key_sid) token = AccessToken(account_sid, api_key_sid, api_key_secret, identity=identity) token.ttl = 3600 # optional: 1 hour token.add_grant(VideoGrant(room=room_name)) return token.to_jwt() doc = """ Alternating Individual and Joint Pac-Man Phases with Valuation. There are 16 rounds. Participants alternate between individual and joint phases. Each round uses unique color pairings and randomized value mappings. """ class Constants(BaseConstants): name_in_url = "pacman_demo" players_per_group = 2 num_rounds = 20 # 16 rounds total: 4 practice + 16 main color_pairs = [ ["red", "cyan"], ["yellow", "magenta"], ["lime", "pink"], ["orange", "teal"], ["white", "hotpink"], ["greenyellow", "blue"], ["aqua", "orange"], ["lightgray", "deeppink"], ["limegreen", "salmon"], ["gold", "turquoise"], ["darkorange", "chartreuse"], ["coral", "dodgerblue"], ["tomato", "mediumseagreen"], ["plum", "deepskyblue"], ["slategray", "khaki"], ["maroon", "aquamarine"], ["orchid", "mediumturquoise"], ["steelblue", "palegreen"], ["crimson", "peachpuff"], ["lightsalmon", "mediumorchid"], ] maze_joint = [ "1111111111111", "1000000000001", "1011010101101", "1010010100101", "1000110110001", "1010000000101", "1010101010101", "1010000000101", "1000110110001", "1010010100101", "1011010101101", "1000000000001", "1111111111111", ] maze_ind = maze_joint class Subsession(BaseSubsession): pass def creating_session(subsession): print("🔍 Number of players in this subsession:", len(subsession.get_players())) # ---------- EVENT LOGGING HELPER ---------- def log_event(player, label: str): """ Append a time-stamped event to this participant's timeline. label examples: 'ind_phase_start', 'ind_phase_end', 'ind_rating_start', 'ind_rating_end', 'joint_phase_start', 'joint_phase_end', 'joint_outcome_fruit', 'joint_outcome_ghost', 'joint_outcome_timeout', 'joint_rating_start', 'joint_rating_end', ... """ now_abs = time.time() # seconds since epoch # reference: start of video / room video_start = player.participant.vars.get("video_start_ts", now_abs) events = player.participant.vars.get("event_log", []) events.append( { "round": player.round_number, "event": label, "t_abs": now_abs, "t_rel": now_abs - video_start, # seconds since video start } ) player.participant.vars["event_log"] = events class Group(BaseGroup): shared_row = models.IntegerField(initial=6) shared_col = models.IntegerField(initial=5) class Player(BasePlayer): discussion_condition = models.StringField() assigned_color_pair = models.StringField() sync_condition = models.StringField() metronome_condition = models.StringField() assigned_strong = models.StringField() assigned_weak = models.StringField() pac_column = models.IntegerField() round_outcome = models.StringField(blank=True) keypress_data = models.LongStringField(blank=True) option1_value_ind = models.FloatField(blank=True) option1_confidence_ind = models.FloatField(blank=True) option2_value_ind = models.FloatField(blank=True) option2_confidence_ind = models.FloatField(blank=True) option1_value_joint = models.FloatField(blank=True) option1_confidence_joint = models.FloatField(blank=True) option2_value_joint = models.FloatField(blank=True) option2_confidence_joint = models.FloatField(blank=True) noisy_value_high = models.LongStringField() noisy_value_low = models.LongStringField() noise_value = models.FloatField(blank=True) display_value = models.IntegerField() metronome_value = models.IntegerField() sync_window = models.IntegerField() presentation_no = models.IntegerField() pacman_ind_rt = models.FloatField() valuation_ind_rt = models.FloatField() pacman_joint_rt = models.FloatField() valuation_joint_rt = models.FloatField() keypress_log_individual = models.LongStringField() keypress_log_joint = models.LongStringField() sync_attempts_log = models.LongStringField() # Event timeline (JSON) for this participant (written on final round) event_timeline = models.LongStringField(blank=True) # Twilio linkage twilio_room_sid = models.StringField(blank=True) twilio_room_name = models.StringField(blank=True) video_start_ts = models.FloatField(blank=True) def live_method(player, data): print("Received data from player", player.id_in_group, ":", data) if "keypress_log" not in player.participant.vars: player.participant.vars["keypress_log"] = [] if "sync_attempts" not in player.participant.vars: player.participant.vars["sync_attempts"] = {} # If it's a final outcome message (e.g., fruit or ghost), broadcast it to both if "outcome" in data: outcome = data["outcome"] print(f"Outcome reported by player {player.id_in_group}: {outcome}") for p in player.group.get_players(): p.round_outcome = outcome return { p.id_in_group: {"outcome": outcome} for p in player.group.get_players() } if not isinstance(data, dict) or "key" not in data: return {p.id_in_group: {} for p in player.group.get_players()} key = data["key"].lower() deltas = {"w": (-1, 0), "s": (1, 0), "a": (0, -1), "d": (0, 1)} if key not in deltas: print("Invalid key:", key) return {p.id_in_group: {} for p in player.group.get_players()} # Log key press with timestamp keypress_log = player.participant.vars.get("keypress_log", []) keypress_log.append( { "round": player.round_number, "key": key, "ts": int(time.time() * 1000), } ) player.participant.vars["keypress_log"] = keypress_log group = player.group now_ms = int(time.time() * 1000) session_vars = player.session.vars pend_key = f"pend_moves_{group.id}" pending = session_vars.get(pend_key, {}) # Look up condition for this round condition = player.participant.vars["condition_array"][player.round_number - 1] # Get maze layout for wall checks maze = Constants.maze_joint sync_window = player.participant.vars["sync_windows"][player.round_number - 1] if condition == "control": # In control condition, accept any input and move Pac-Man immediately if valid dr, dc = deltas[key] new_row = group.shared_row + dr new_col = group.shared_col + dc if ( 0 <= new_row < len(maze) and 0 <= new_col < len(maze[0]) and maze[new_row][new_col] == "0" ): group.shared_row = new_row group.shared_col = new_col else: print("Blocked by wall (control):", new_row, new_col) return { p.id_in_group: { "row": group.shared_row, "col": group.shared_col, } for p in group.get_players() } elif condition == "sync": if "sync_attempts" not in player.participant.vars: player.participant.vars["sync_attempts"] = {} round_sync_log = player.participant.vars["sync_attempts"].setdefault( player.round_number, [] ) # Synchrony condition: require matching keys within window pending[str(player.id_in_group)] = {"key": key, "ts": now_ms} session_vars[pend_key] = pending if len(pending) < Constants.players_per_group: return {p.id_in_group: {} for p in group.get_players()} vals = list(pending.values()) response = {} if vals[0]["key"] == vals[1]["key"]: dt = abs(vals[0]["ts"] - vals[1]["ts"]) if dt <= sync_window: dr, dc = deltas[key] new_row = group.shared_row + dr new_col = group.shared_col + dc round_sync_log.append( { "timestamp": now_ms, "key": key, "dt": dt, "success": True, } ) if ( 0 <= new_row < len(maze) and 0 <= new_col < len(maze[0]) and maze[new_row][new_col] == "0" ): group.shared_row = new_row group.shared_col = new_col response = {"row": new_row, "col": new_col} else: print("Blocked by wall (sync):", new_row, new_col) else: print("Too slow – not synchronous:", dt, "ms") round_sync_log.append( { "timestamp": now_ms, "key": key, "dt": dt, "success": False, "reason": "too_slow", } ) else: print("Key mismatch:", vals[0]["key"], vals[1]["key"]) round_sync_log.append( { "timestamp": now_ms, "key1": vals[0]["key"], "key2": vals[1]["key"], "dt": abs(vals[0]["ts"] - vals[1]["ts"]), "success": False, "reason": "mismatch", } ) session_vars[pend_key] = {} # Reset after each attempt return {p.id_in_group: response for p in group.get_players()} else: print(f"⚠ Unknown condition: {condition}") return {p.id_in_group: {} for p in group.get_players()} class StartWait(WaitPage): group_by_arrival_time = True body_text = "Waiting for your partner to be ready...Thank you for your patience" @staticmethod def is_displayed(player: Player): return player.round_number == 1 @staticmethod def after_all_players_arrive(group: Group): players = group.get_players() # --- CONSTANTS FOR DESIGN --- num_practice = 4 # 4 joint-practice rounds num_full = Constants.num_rounds - num_practice # 16 full trials # Assign some conditions discussion_condition = random.choice(["advance", "on_the_fly"]) for p in players: p.participant.vars["discussion_condition"] = discussion_condition # Assign strong/weak roles players[0].participant.belief_condition = "strong" players[1].participant.belief_condition = "weak" # ---------- COLOR PAIRS (20 rounds) ---------- color_pairs = Constants.color_pairs.copy() random.shuffle(color_pairs) color_pairs = color_pairs[: Constants.num_rounds] # ---------- JOINT CONDITION ARRAY (4 practice + 16 full) ---------- # 4 practice rounds: 2 control, 2 sync practice = ["control", "sync"] * 2 # ['control','sync','control','sync'] # Alternate starting block randomly for the 16 full trials first = random.choice(["sync", "control"]) second = "control" if first == "sync" else "sync" blocks = [first, second, first, second] # 4 blocks full_conditions = [block for block in blocks for _ in range(4)] # 4 x 4 = 16 condition_array = practice + full_conditions # length 4 + 16 = 20 # ---------- VALUE PAIRS FOR FULL TRIALS ONLY ---------- value_pairs = [ ([80, 70], [80, 70]), ([70, 60], [70, 60]), ([60, 50], [60, 50]), ([50, 40], [50, 40]), ] # Build 16 values for full trials (4 blocks × 4 trials) full_strong_array, full_weak_array = [], [] for _ in range(4): block = random.sample(value_pairs, k=4) full_strong_array += [s for s, _ in block] full_weak_array += [w for _, w in block] # Pad with dummy values (None) for the first 4 practice rounds strong_array = [None] * num_practice + full_strong_array weak_array = [None] * num_practice + full_weak_array # ---------- STARTING POSITIONS (20 rounds, including practice) ---------- column_list, pac_list = [], [] for _ in range(Constants.num_rounds): col = random.choice([1, 11]) column_list.append(col) pac_list.append(7 if col == 1 else 5) # ---------- NOISES / DISPLAY / PRESENTATIONS / METRONOMES / SYNC WINDOWS ---------- full_noises = [1.34] * num_full random.shuffle(full_noises) full_display_values = [100] * num_full random.shuffle(full_display_values) full_presentations = [16] * num_full random.shuffle(full_presentations) full_metronomes = [1000] * num_full random.shuffle(full_metronomes) full_sync_windows = [1000] * num_full random.shuffle(full_sync_windows) full_metro_cond = ["fade"] * num_full random.shuffle(full_metro_cond) practice_noises = [1.34] * num_practice practice_display_values = [100] * num_practice practice_presentations = [16] * num_practice practice_metronomes = [1000] * num_practice practice_sync_windows = [1000] * num_practice practice_metro_cond = ["fade"] * num_practice noises = practice_noises + full_noises display_values = practice_display_values + full_display_values presentations = practice_presentations + full_presentations metronomes = practice_metronomes + full_metronomes sync_windows = practice_sync_windows + full_sync_windows metronome_condition = practice_metro_cond + full_metro_cond # ---------- SAVE TO PARTICIPANT.VARS ---------- for p in players: p.participant.vars.update( { "color_pairs": color_pairs, # len 20 "condition_array": condition_array, # len 20 "strong_array": strong_array, # len 20 (first 4 None) "weak_array": weak_array, # len 20 (first 4 None) "column_list": column_list, # len 20 "pac_list": pac_list, # len 20 "noises": noises, # len 20 "display_values": display_values, # len 20 "metronomes": metronomes, # len 20 "metronome_condition": metronome_condition, # len 20 "sync_windows": sync_windows, # len 20 "presentations": presentations, # len 20 } ) # ---------- Twilio token generation ---------- room_name = f"twilio_room_{group.id_in_subsession}_{datetime.utcnow().timestamp()}" room_sid = create_twilio_room(room_name) # Global reference time for this video/room video_start_ts = time.time() for p in players: identity = f"player_{p.id_in_group}_g{group.id_in_subsession}" token = generate_twilio_token(identity, room_name) p.participant.vars["twilio_token"] = token p.participant.vars["twilio_room"] = room_name p.participant.vars["video_start_ts"] = video_start_ts p.participant.vars.setdefault("event_log", []) # also store in DB for easier joining p.twilio_room_sid = room_sid p.twilio_room_name = room_name p.video_start_ts = video_start_ts class TwilioVideoRoom(Page): template_name = "pacman_demo/TwilioVideoRoom.html" @staticmethod def is_displayed(player: Player): return player.round_number == 1 @staticmethod def vars_for_template(player: Player): return { "twilio_token": player.participant.vars["twilio_token"], "room_name": player.participant.vars["twilio_room"], "identity": f"player_{player.id_in_group}", } class TwilioPopupPage(Page): template_name = "pacman_demo/TwilioPopupPage.html" @staticmethod def is_displayed(player: Player): return False # Not part of the regular sequence @staticmethod def vars_for_template(player: Player): return { "twilio_token": player.participant.vars["twilio_token"], "room_name": player.participant.vars["twilio_room"], "identity": f"player_{player.id_in_group}", } class DiscussionConditionInfo(Page): template_name = "pacman_demo/ConditionAssign.html" @staticmethod def is_displayed(player: Player): return player.round_number == 1 # Only show at start @staticmethod def vars_for_template(player: Player): return { "discussion_condition": player.participant.vars["discussion_condition"] } class TrialWait(WaitPage): body_text = "Waiting for your partner to be ready..." @staticmethod def after_all_players_arrive(group: Group): # runs once when ALL players in the group have reached this page player = group.get_players()[0] round_idx = player.round_number - 1 group.shared_col = player.participant.vars["pac_list"][round_idx] class Pacman_prac(Page): template_name = "pacman_demo/Pacman_prac.html" timeout_seconds = 60 # 1 minute @staticmethod def is_displayed(player: Player): return player.round_number == 1 @staticmethod def vars_for_template(player: Player): return { "pac_column": 7, "maze": Constants.maze_ind, } class Pacman_ind(Page): template_name = "pacman_demo/Pacman_ind.html" timeout_seconds = 120 # 2 minutes @staticmethod def is_displayed(player: Player): # Skip individual phase in first 4 rounds (joint practice) return player.round_number > 4 @staticmethod def vars_for_template(player: Player): round_idx = player.round_number - 1 pair = player.participant.vars["color_pairs"][round_idx] noise_value = player.participant.vars["noises"][round_idx] belief_condition = player.participant.belief_condition # LOG: individual phase start log_event(player, "ind_phase_start") # Presentations depend on role if belief_condition == "strong": presentation_no = 12 else: presentation_no = 4 player.participant.vars[f"pacman_ind_start_{player.round_number}"] = time.time() # Get the base value pair (vh, vl) for this round: [vh, vl] vh_raw, vl_raw = player.participant.vars[ "strong_array" if belief_condition == "strong" else "weak_array" ][round_idx] half_n = int(presentation_no / 2) if belief_condition == "strong": # Strong: sees true vh/vl mapping value_high = [vh_raw for _ in range(half_n)] value_low = [vl_raw for _ in range(half_n)] else: # Weak: reversed mapping – sees vl as “high” and vh as “low” value_high = [vl_raw for _ in range(half_n)] value_low = [vh_raw for _ in range(half_n)] # store per-round arrays + presentation count player.participant.vars[f"noisy_high_round_{round_idx}"] = value_high player.participant.vars[f"noisy_low_round_{round_idx}"] = value_low player.participant.vars[f"presentation_no_round_{round_idx}"] = presentation_no return { "option1_color": pair[0], "option2_color": pair[1], "round_number": player.round_number, "pac_column": player.participant.vars["pac_list"][round_idx], "value_high": json.dumps(value_high), "value_low": json.dumps(value_low), "display_value": player.participant.vars["display_values"][round_idx], "column_number": player.participant.vars["column_list"][round_idx], "presentation_no": presentation_no, "belief_condition": player.participant.belief_condition, "maze": Constants.maze_ind, } @staticmethod def before_next_page(player: Player, timeout_happened): round_idx = player.round_number - 1 pair = player.participant.vars["color_pairs"][round_idx] condition = player.participant.vars["condition_array"][round_idx] belief_condition = player.participant.belief_condition vh_raw, vl_raw = player.participant.vars[ "strong_array" if belief_condition == "strong" else "weak_array" ][round_idx] # Retrieve saved noisy arrays value_high = player.participant.vars[f"noisy_high_round_{round_idx}"] value_low = player.participant.vars[f"noisy_low_round_{round_idx}"] # Save to database player.discussion_condition = player.participant.vars["discussion_condition"] player.assigned_color_pair = json.dumps(pair) player.sync_condition = condition player.assigned_strong = str(vh_raw) player.assigned_weak = str(vl_raw) player.noisy_value_high = json.dumps(value_high) player.noisy_value_low = json.dumps(value_low) player.pac_column = player.participant.vars["pac_list"][round_idx] player.noise_value = player.participant.vars["noises"][round_idx] player.display_value = player.participant.vars["display_values"][round_idx] player.presentation_no = player.participant.vars[ f"presentation_no_round_{round_idx}" ] start = player.participant.vars.get( f"pacman_ind_start_{player.round_number}" ) if start: rt = time.time() - start player.pacman_ind_rt = rt # LOG: individual phase end log_event(player, "ind_phase_end") class Valuation_ind(Page): template_name = "pacman_demo/Valuation_ind.html" timeout_seconds = 120 # 2 minutes form_model = "player" form_fields = [ "option1_value_ind", "option1_confidence_ind", "option2_value_ind", "option2_confidence_ind", ] @staticmethod def is_displayed(player: Player): # Skip valuation in first 4 rounds (joint practice) return player.round_number > 4 @staticmethod def vars_for_template(player: Player): round_idx = player.round_number - 1 pair = player.participant.vars["color_pairs"][round_idx] # LOG: individual rating start log_event(player, "ind_rating_start") player.participant.vars[f"valuation_ind_start_{player.round_number}"] = time.time() return { "option1_color": pair[0], "option2_color": pair[1], "round_number": player.round_number, "phase": "individual", "belief_condition": player.participant.belief_condition, } @staticmethod def before_next_page(player: Player, timeout_happened): # retrieve stored start time start = player.participant.vars.get( f"valuation_ind_start_{player.round_number}" ) if start: duration = time.time() - start player.valuation_ind_rt = duration # LOG: individual rating end log_event(player, "ind_rating_end") class AdvanceDiscussion(Page): template_name = "pacman_demo/AdvanceDiscussion.html" timeout_seconds = 60 # 1 minute @staticmethod def is_displayed(player: Player): return player.participant.vars["discussion_condition"] == "advance" class Pacman_joint(Page): live_page = True template_name = "pacman_demo/Pacman_joint.html" timeout_seconds = 60 # 1 minute live_method = live_method @staticmethod def is_displayed(player: Player): return True @staticmethod def vars_for_template(player: Player): round_idx = player.round_number - 1 pair = player.participant.vars["color_pairs"][round_idx] condition = player.participant.vars["condition_array"][round_idx] # mark practice rounds (1–4) is_practice = player.round_number <= 4 # In practice rounds, always target the first color in the pair if is_practice: target_color = pair[0] else: target_idx = random.randint(0, 1) target_color = pair[target_idx] # LOG: joint phase start log_event(player, "joint_phase_start") player.participant.vars[f"pacman_joint_start_{player.round_number}"] = time.time() # Ensure server-side Pac-Man starts at the correct column player.group.shared_col = player.participant.vars["pac_list"][round_idx] return { "option1_color": pair[0], "option2_color": pair[1], "column_number": player.participant.vars["column_list"][round_idx], "pac_column": player.participant.vars["pac_list"][round_idx], "round_number": player.round_number, "maze": Constants.maze_joint, "belief_condition": player.participant.belief_condition, "sync_condition": condition, "metronome_value": player.participant.vars["metronomes"][round_idx], "metronome_condition": player.participant.vars["metronome_condition"][round_idx], "sync_window": player.participant.vars["sync_windows"][round_idx], "target_color": target_color, "is_practice": is_practice, # flag for template } @staticmethod def before_next_page(player: Player, timeout_happened): round_idx = player.round_number - 1 player.metronome_value = player.participant.vars["metronomes"][round_idx] player.metronome_condition = player.participant.vars["metronome_condition"][round_idx] player.sync_window = player.participant.vars["sync_windows"][round_idx] # Save keypress log player.keypress_log_joint = json.dumps( player.participant.vars.get("keypress_log", []) ) # Save sync attempt log sync_attempts = player.participant.vars.get("sync_attempts", {}).get( player.round_number, [] ) player.sync_attempts_log = json.dumps(sync_attempts) # --- compute RT --- start = player.participant.vars.get( f"pacman_joint_start_{player.round_number}" ) if start: rt = time.time() - start player.pacman_joint_rt = rt # LOG: joint phase end log_event(player, "joint_phase_end") class OutcomePage(Page): template_name = "pacman_demo/OutcomePage.html" @staticmethod def is_displayed(player: Player): return True # Or restrict to just joint rounds if you want @staticmethod def vars_for_template(player: Player): # Safe read (won't raise if null) outcome = player.field_maybe_none("round_outcome") if outcome is None: outcome = "timeout" # persist so it's saved to DB player.round_outcome = outcome # LOG: outcome event for this round log_event(player, f"joint_outcome_{outcome}") return {"outcome": outcome} class Valuation_joint(Page): template_name = "pacman_demo/Valuation_joint.html" timeout_seconds = 120 # 2 minutes form_model = "player" form_fields = [ "option1_value_joint", "option1_confidence_joint", "option2_value_joint", "option2_confidence_joint", ] @staticmethod def is_displayed(player: Player): return True @staticmethod def vars_for_template(player: Player): round_idx = player.round_number - 1 pair = player.participant.vars["color_pairs"][round_idx] # LOG: joint rating start log_event(player, "joint_rating_start") player.participant.vars[ f"valuation_joint_start_{player.round_number}" ] = time.time() return { "option1_color": pair[0], "option2_color": pair[1], "round_number": player.round_number, "phase": "joint", "belief_condition": player.participant.belief_condition, } @staticmethod def before_next_page(player: Player, timeout_happened): # --- compute RT --- start = player.participant.vars.get( f"valuation_joint_start_{player.round_number}" ) if start: rt = time.time() - start player.valuation_joint_rt = rt # LOG: joint rating end log_event(player, "joint_rating_end") class SaveKeypressLog(Page): timeout_seconds = 1 template_name = "global/Page.html" @staticmethod def before_next_page(player: Player, timeout_happened): # Get all stored keypresses for this participant all_presses = player.participant.vars.get("keypress_log", []) print(all_presses) # Keep only the presses from this round round_presses = [ entry for entry in all_presses if entry["round"] == player.round_number ] # Save JSON for this player player.keypress_data = json.dumps(round_presses) # On final round, also persist the full event timeline if player.round_number == Constants.num_rounds: events = player.participant.vars.get("event_log", []) player.event_timeline = json.dumps(events) page_sequence = [ StartWait, TwilioVideoRoom, TrialWait, Pacman_prac, Pacman_ind, Valuation_ind, # AdvanceDiscussion, TrialWait, Pacman_joint, OutcomePage, SaveKeypressLog, Valuation_joint, ]