from otree.api import *
import random
import json
import base64
import requests
import time
from datetime import datetime, timedelta, timezone
import os # for testing on devserver
from twilio.jwt.access_token import AccessToken
from twilio.jwt.access_token.grants import VideoGrant
from twilio.rest import Client
from django.shortcuts import render
from otree.models.participant import Participant
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
###### TO DO ######
# ======================================================
# TWILIO
# ======================================================
def create_twilio_room(name):
    """Create a Twilio Video room called *name* and return its SID.

    Credentials are read from the ``TWILIO_API_KEY`` and ``TWILIO_API_SECRET``
    environment variables; a missing variable raises ``KeyError``.
    The room is created with type ``"group"`` and with recording enabled as
    soon as a participant connects.
    """
    api_key = os.environ["TWILIO_API_KEY"]
    api_secret = os.environ["TWILIO_API_SECRET"]
    twilio = Client(api_key, api_secret)
    created = twilio.video.rooms.create(
        unique_name=name,
        type="group",  # or 'group-small'
        record_participants_on_connect=True,  # start recording when participant joins
    )
    return created.sid
def generate_twilio_token(identity: str, room_name: str) -> str:
    """Build a Twilio access token that lets *identity* join *room_name*.

    Reads ``TWILIO_ACCOUNT_SID``, ``TWILIO_API_KEY`` and ``TWILIO_API_SECRET``
    from the environment (``KeyError`` if any is missing), attaches a
    ``VideoGrant`` restricted to *room_name*, and returns the serialized JWT.

    ``AccessToken``, ``VideoGrant`` and ``os`` come from the module-level
    imports. Credential identifiers are deliberately not printed so they do
    not end up in server logs.
    """
    account_sid = os.environ["TWILIO_ACCOUNT_SID"]
    api_key_sid = os.environ["TWILIO_API_KEY"]
    api_key_secret = os.environ["TWILIO_API_SECRET"]
    token = AccessToken(account_sid, api_key_sid, api_key_secret, identity=identity)
    token.ttl = 3600  # token lifetime in seconds (1 hour)
    token.add_grant(VideoGrant(room=room_name))
    return token.to_jwt()
def option_idx_from_pos(row: int, col: int, option_col: int):
    """Map a maze cell to the option index located there.

    Options sit in column *option_col*: row 3 holds option 0 and row 9 holds
    option 1. Any other cell (wrong column or wrong row) yields ``None``.
    """
    if col == option_col:
        # Row -> option index; rows without an option fall back to None.
        return {3: 0, 9: 1}.get(row)
    return None
# Module-level description string for this app.
# NOTE(review): the text below says 24 rounds, while Constants.num_rounds is 22
# (4 practice + 18 main in StartWait) — confirm which is intended before editing.
doc = """
Alternating Individual and Joint Pac-Man Phases with Valuation.
There are 24 rounds. Participants alternate between individual and joint phases.
Each round uses unique color pairings and randomized value mappings.
UPDATED: adds a 'parallel' condition (individual baseline shown side-by-side).
"""
# ======================================================
# CONSTANTS
# ======================================================
class Constants(BaseConstants):
    # App-wide constants: URL name, dyad size, round count, colour pairs and maze layout.
    name_in_url = "pacman_demo_1scale"
    players_per_group = 2
    # 22 rounds total: 4 practice + 18 main (StartWait asserts this split).
    num_rounds = 22
    # Pool of [option1_color, option2_color] pairs; StartWait shuffles this list
    # and keeps the first `num_rounds` entries, one pair per round.
    color_pairs = [
        ["red", "cyan"],
        ["yellow", "magenta"],
        ["lime", "pink"],
        ["orange", "teal"],
        ["white", "hotpink"],
        ["greenyellow", "blue"],
        ["aqua", "orange"],
        ["lightgray", "deeppink"],
        ["limegreen", "salmon"],
        ["gold", "turquoise"],
        ["darkorange", "chartreuse"],
        ["coral", "dodgerblue"],
        ["tomato", "mediumseagreen"],
        ["plum", "deepskyblue"],
        ["slategray", "khaki"],
        ["maroon", "aquamarine"],
        ["orchid", "mediumturquoise"],
        ["steelblue", "palegreen"],
        ["crimson", "peachpuff"],
        ["lightsalmon", "mediumorchid"],
        ["mediumvioletred", "palegoldenrod"],
        ["springgreen", "orchid"],
        ["darkseagreen", "indianred"],
        ["goldenrod", "mediumorchid"],
        ["lawngreen", "lightcoral"],
        ["khaki", "hotpink"],
        ["yellowgreen", "plum"],
        ["salmon", "chartreuse"]
    ]
    # Maze grid, one string per row. live_method only lets an avatar move onto
    # cells containing "0"; every other cell is treated as blocked.
    maze_joint = [
        "1111111111111",
        "1000000000001",
        "1011010101101",
        "1010010100101",
        "1000110110001",
        "1010000000101",
        "1010101010101",
        "1010000000101",
        "1000110110001",
        "1010010100101",
        "1011010101101",
        "1000000000001",
        "1111111111111",
    ]
    # The individual phase reuses the very same list object as the joint maze.
    maze_ind = maze_joint
# ======================================================
# SESSION / SUBSESSION
# ======================================================
class Subsession(BaseSubsession):
    # No subsession-level fields are defined in this app.
    pass
def creating_session(subsession):
    """Print how many players the given subsession contains (debug output only)."""
    player_count = len(subsession.get_players())
    print("🔍 Number of players in this subsession:", player_count)
# ======================================================
# EVENT LOGGING HELPER
# ======================================================
def log_event(player, label: str):
    """Append a timestamped event to the participant's ``event_log``.

    Each entry records the current round, the event *label*, the absolute
    epoch time (``t_abs``) and the time relative to ``video_start_ts`` stored
    in ``participant.vars`` (``t_rel``). When no video start has been stored,
    the current time is used as the origin, so ``t_rel`` is 0.
    """
    stamp = time.time()  # seconds since epoch
    store = player.participant.vars
    origin = store.get("video_start_ts", stamp)
    entry = {
        "round": player.round_number,
        "event": label,
        "t_abs": stamp,
        "t_rel": stamp - origin,
    }
    history = store.get("event_log", [])
    history.append(entry)
    # Re-assign so the list is stored even when it was just created.
    store["event_log"] = history
# ======================================================
# MODELS
# ======================================================
class Group(BaseGroup):
    # Per-dyad state for the joint task. Positions are moved by live_method and
    # reset at the start of every round in TrialWait.after_all_players_arrive.

    # Shared avatar position used by the "control" and "sync" conditions.
    shared_row = models.IntegerField(initial=6)
    shared_col = models.IntegerField(initial=5)
    # PARALLEL: each player has their own avatar (same maze, separate state)
    p1_row = models.IntegerField(initial=6)
    p1_col = models.IntegerField(initial=5)
    p2_row = models.IntegerField(initial=6)
    p2_col = models.IntegerField(initial=5)
    # PARALLEL: track completion (set once that player's avatar reaches an option cell)
    p1_done = models.BooleanField(initial=False)
    p2_done = models.BooleanField(initial=False)
    # (optional) store target index/color choice per round so both see same one
    # 0 -> option1 color, 1 -> option2 color
    target_idx = models.IntegerField(initial=0)
    # Set to True when the dyad fails the video/audio gate (PartnerVideoWait timeout
    # or a "cannot hear" answer); routing pages then send both players to the last app.
    audio_gate_failed = models.BooleanField(initial=False)
    #is_dropout = models.BooleanField(initial=False)
class Player(BasePlayer):
    # Per-player, per-round record. Most design fields are copied from
    # participant.vars in Pacman_ind.before_next_page / Pacman_joint.before_next_page.

    # --- design / condition info (written in Pacman_ind.before_next_page) ---
    discussion_condition = models.StringField()
    assigned_color_pair = models.StringField()  # JSON-encoded [option1_color, option2_color]
    sync_condition = models.StringField()  # condition label for the round
    metronome_condition = models.StringField()
    # NOTE(review): despite the names, Pacman_ind stores this player's *high* value in
    # assigned_p1 and *low* value in assigned_p2 — confirm this is intended.
    assigned_p1 = models.StringField()
    assigned_p2 = models.StringField()
    pac_column = models.IntegerField()
    # --- joint-phase outcome (written by live_method) ---
    round_outcome = models.StringField(blank=True)
    collected_option_idx = models.IntegerField(initial=-1)  # -1 none/timeout/ghost, else 0/1
    keypress_data = models.LongStringField(blank=True)
    # single bipolar “relative value + certainty” scale
    rel_value_ind = models.IntegerField(min=-100, max=100, blank=True)
    rel_value_joint = models.IntegerField(min=-100, max=100, blank=True)
    # JSON-encoded value streams shown in the individual phase
    noisy_high_stream = models.LongStringField()
    noisy_low_stream = models.LongStringField()
    value_mapping = models.IntegerField()  # 0: option1 gets the high stream, otherwise option2
    same_belief_trial = models.IntegerField()  # 1 = same-belief trial, 0 = different-belief trial
    noise_value = models.FloatField(blank=True)
    display_value = models.IntegerField()
    metronome_value = models.IntegerField()
    sync_window = models.IntegerField()
    presentation_no = models.IntegerField()
    # --- response times (seconds, computed with time.time()) ---
    pacman_ind_rt = models.FloatField()
    valuation_ind_rt = models.FloatField()
    pacman_joint_rt = models.FloatField()
    valuation_joint_rt = models.FloatField()
    # --- JSON-encoded logs ---
    keypress_log_individual = models.LongStringField()
    keypress_log_joint = models.LongStringField()
    sync_attempts_log = models.LongStringField()
    event_timeline = models.LongStringField(blank=True)
    # --- Twilio video session info (written in StartWait.after_all_players_arrive) ---
    twilio_room_sid = models.StringField(blank=True)
    twilio_room_name = models.StringField(blank=True)
    twilio_identity = models.StringField(blank=True)
    video_start_ts = models.FloatField(blank=True)
    # video checks (answered on the AudioGate page)
    can_hear_partner = models.BooleanField(
        choices=[[True, "Yes"], [False, "No"]],
        widget=widgets.RadioSelectHorizontal,
        blank=True
    )
    can_see_partner = models.BooleanField(
        choices=[[True, "Yes"], [False, "No"]],
        widget=widgets.RadioSelectHorizontal,
        blank=True
    )
    # The string literal below holds disabled tech-check fields; it has no runtime effect.
    """ tech_ok = models.BooleanField(initial=False)
tech_fail_reason = models.LongStringField(blank=True)
tech_audio_max_rms = models.FloatField(blank=True)
tech_video_is_live = models.BooleanField(initial=False)
tech_attempts = models.IntegerField(initial=0) """
# ======================================================
# LIVE METHOD
# ======================================================
def live_method(player, data):
    """Handle one live message from the joint Pac-Man page.

    Message shapes handled, in this order:

    * ``{"outcome_for_you": ...}`` (parallel): stores the outcome on the sender
      only and echoes it back to the sender; the partner receives ``None``.
    * ``{"outcome": ...}`` (control/sync): stores the outcome on every group
      member and broadcasts it to all of them.
    * ``{"key": "w"|"a"|"s"|"d"}``: a movement request, processed according to
      the round's condition from ``participant.vars["condition_array"]``:
      ``control`` moves the shared avatar, ``sync`` moves it only when both
      players sent the same key within ``sync_window`` ms, ``parallel`` moves
      only the sender's own avatar.

    Anything else (non-dict payload, missing, non-string or unknown key) gets
    an empty dict for every player. A non-string ``key`` used to raise
    ``AttributeError`` on ``.lower()``; it is now rejected like any other
    invalid key.

    Returns a dict mapping ``id_in_group`` to that player's payload. If the
    condition is none of the three known labels, nothing is returned.
    """
    print("Received data from player", player.id_in_group, ":", data)
    if "keypress_log" not in player.participant.vars:
        player.participant.vars["keypress_log"] = []
    if "sync_attempts" not in player.participant.vars:
        player.participant.vars["sync_attempts"] = {}
    # --------------------------------------------------
    # FINAL OUTCOMES
    # - control/sync: broadcast outcome to BOTH (existing behavior)
    # - parallel: outcome_for_you ends ONLY the sender
    # --------------------------------------------------
    # (A) PARALLEL: per-participant outcome
    if isinstance(data, dict) and "outcome_for_you" in data:
        outcome = data["outcome_for_you"]
        print(f"[PARALLEL] outcome_for_you reported by player {player.id_in_group}: {outcome}")
        # store on THIS player's row only (so OutcomePage is correct for them)
        player.round_outcome = outcome
        # if fruit, map to option1/option2 for THIS player
        if outcome == "fruit":
            round_idx = player.round_number - 1
            option_col = int(player.participant.vars["column_list"][round_idx])
            # IMPORTANT: use the position for THIS player (parallel has 2 avatars)
            if player.id_in_group == 1:
                idx = option_idx_from_pos(player.group.p1_row, player.group.p1_col, option_col)
            else:
                idx = option_idx_from_pos(player.group.p2_row, player.group.p2_col, option_col)
            if idx is not None:
                player.collected_option_idx = idx
        # return outcome_for_you ONLY to sender (others get None)
        out = {}
        for p in player.group.get_players():
            out[p.id_in_group] = {"outcome_for_you": outcome if p.id_in_group == player.id_in_group else None}
        return out
    # (B) CONTROL/SYNC: joint outcome to both (existing behavior)
    if isinstance(data, dict) and "outcome" in data:
        outcome = data["outcome"]
        group = player.group
        round_idx = player.round_number - 1
        # save outcome for both
        for p in group.get_players():
            p.round_outcome = outcome
        # if fruit, infer option index from the *shared* server position
        if outcome == "fruit":
            option_col = int(player.participant.vars["column_list"][round_idx])
            idx = option_idx_from_pos(group.shared_row, group.shared_col, option_col)
            if idx is not None:
                for p in group.get_players():
                    p.collected_option_idx = idx
        return {p.id_in_group: {"outcome": outcome} for p in group.get_players()}
    if not isinstance(data, dict) or "key" not in data:
        return {p.id_in_group: {} for p in player.group.get_players()}
    raw_key = data["key"]
    # The payload comes from the browser, so the key may be any JSON value.
    if not isinstance(raw_key, str):
        print("Invalid key:", raw_key)
        return {p.id_in_group: {} for p in player.group.get_players()}
    key = raw_key.lower()
    deltas = {"w": (-1, 0), "s": (1, 0), "a": (0, -1), "d": (0, 1)}
    if key not in deltas:
        print("Invalid key:", key)
        return {p.id_in_group: {} for p in player.group.get_players()}
    # Log key press with timestamp
    keypress_log = player.participant.vars.get("keypress_log", [])
    keypress_log.append({"round": player.round_number, "key": key, "ts": int(time.time() * 1000)})
    player.participant.vars["keypress_log"] = keypress_log
    group = player.group
    now_ms = int(time.time() * 1000)
    session_vars = player.session.vars
    # Pending sync key presses are kept per group in session.vars.
    pend_key = f"pend_moves_{group.id}"
    pending = session_vars.get(pend_key, {})
    # Look up condition for this round
    condition = player.participant.vars["condition_array"][player.round_number - 1]
    #condition = "parallel"
    # Maze layout for wall checks
    maze = Constants.maze_joint
    sync_window = player.participant.vars["sync_windows"][player.round_number - 1]
    # --------------------------------------------------
    # CONTROL (shared avatar)
    # --------------------------------------------------
    if condition == "control":
        dr, dc = deltas[key]
        new_row = group.shared_row + dr
        new_col = group.shared_col + dc
        if 0 <= new_row < len(maze) and 0 <= new_col < len(maze[0]) and maze[new_row][new_col] == "0":
            group.shared_row = new_row
            group.shared_col = new_col
        else:
            print("Blocked by wall (control):", new_row, new_col)
        return {
            p.id_in_group: {"row": group.shared_row, "col": group.shared_col}
            for p in group.get_players()
        }
    # --------------------------------------------------
    # SYNC (shared avatar; requires matching key within window)
    # --------------------------------------------------
    elif condition == "sync":
        # Attempts are logged on the player whose press completes the pair.
        round_sync_log = player.participant.vars["sync_attempts"].setdefault(player.round_number, [])
        pending[str(player.id_in_group)] = {"key": key, "ts": now_ms}
        session_vars[pend_key] = pending
        if len(pending) < Constants.players_per_group:
            # Still waiting for the partner's key press.
            return {p.id_in_group: {} for p in group.get_players()}
        vals = list(pending.values())
        response = {}
        if vals[0]["key"] == vals[1]["key"]:
            dt = abs(vals[0]["ts"] - vals[1]["ts"])
            if dt <= sync_window:
                dr, dc = deltas[key]
                new_row = group.shared_row + dr
                new_col = group.shared_col + dc
                # Logged as a success even when the move is then blocked by a wall.
                round_sync_log.append({"timestamp": now_ms, "key": key, "dt": dt, "success": True})
                if 0 <= new_row < len(maze) and 0 <= new_col < len(maze[0]) and maze[new_row][new_col] == "0":
                    group.shared_row = new_row
                    group.shared_col = new_col
                    response = {"row": new_row, "col": new_col}
                else:
                    print("Blocked by wall (sync):", new_row, new_col)
            else:
                print("Too slow – not synchronous:", dt, "ms")
                round_sync_log.append(
                    {"timestamp": now_ms, "key": key, "dt": dt, "success": False, "reason": "too_slow"}
                )
        else:
            print("Key mismatch:", vals[0]["key"], vals[1]["key"])
            round_sync_log.append(
                {
                    "timestamp": now_ms,
                    "key1": vals[0]["key"],
                    "key2": vals[1]["key"],
                    "dt": abs(vals[0]["ts"] - vals[1]["ts"]),
                    "success": False,
                    "reason": "mismatch",
                }
            )
        session_vars[pend_key] = {}  # Reset after each attempt
        return {p.id_in_group: response for p in group.get_players()}
    # --------------------------------------------------
    # PARALLEL (two independent avatars, same maze, same target duplicated)
    # - each keypress moves only YOUR avatar
    # - both see both avatars updating from the server
    # - each player's round_outcome is set to "fruit" when their own avatar
    #   stands on an option cell
    # --------------------------------------------------
    elif condition == "parallel":
        dr, dc = deltas[key]
        # --- move ONLY the mover's avatar ---
        if player.id_in_group == 1:
            cur_r, cur_c = group.p1_row, group.p1_col
        else:
            cur_r, cur_c = group.p2_row, group.p2_col
        new_r = cur_r + dr
        new_c = cur_c + dc
        if 0 <= new_r < len(maze) and 0 <= new_c < len(maze[0]) and maze[new_r][new_c] == "0":
            if player.id_in_group == 1:
                group.p1_row, group.p1_col = new_r, new_c
            else:
                group.p2_row, group.p2_col = new_r, new_c
        else:
            print("Blocked by wall (parallel):", new_r, new_c)
        # --- server checks fruit for THIS player only ---
        outcome_for_you = None
        option_col = int(player.participant.vars["column_list"][player.round_number - 1])
        if player.id_in_group == 1:
            idx = option_idx_from_pos(group.p1_row, group.p1_col, option_col)
            if idx is not None:
                group.p1_done = True
                player.collected_option_idx = idx
                player.round_outcome = "fruit"
                outcome_for_you = "fruit"
        else:
            idx = option_idx_from_pos(group.p2_row, group.p2_col, option_col)
            if idx is not None:
                group.p2_done = True
                player.collected_option_idx = idx
                player.round_outcome = "fruit"
                outcome_for_you = "fruit"
        # (optional) ghost check per player, if you want:
        # if player hits ghost -> only that player ends
        # NOTE: you'd need server-side ghost positions for full authority; skip if ghosts are client-only.
        payload = {
            "p1_row": group.p1_row, "p1_col": group.p1_col,
            "p2_row": group.p2_row, "p2_col": group.p2_col,
            "p1_done": group.p1_done,
            "p2_done": group.p2_done,
            # IMPORTANT: only the mover gets a non-None value here
            "outcome_for_you": outcome_for_you,
        }
        # Send same positions to both, but outcome_for_you only to the mover
        out = {}
        for p in group.get_players():
            msg = dict(payload)
            if p.id_in_group != player.id_in_group:
                msg["outcome_for_you"] = None
            out[p.id_in_group] = msg
        return out
# ======================================================
# PAGES
# ======================================================
class StartWait(WaitPage):
    """Round-1 wait page that pairs players and builds the whole session plan.

    Once both players of a dyad have arrived, ``after_all_players_arrive``
    randomizes the per-round design (conditions, colour pairs, value pairs,
    start positions, targets and timing parameters), stores it in each
    participant's ``vars``, and sets up the Twilio video room and tokens.
    """

    group_by_arrival_time = True
    body_text = "Waiting for your partner to be ready...Thank you for your patience"

    @staticmethod
    def is_displayed(player: Player):
        #return player.round_number == 1 and player.tech_ok
        return player.round_number == 1

    @staticmethod
    def after_all_players_arrive(group: Group):
        """Randomize the design for this dyad and create its video room.

        The order of the ``random`` calls below is kept stable on purpose so
        that seeded runs stay reproducible.
        """
        players = group.get_players()
        # --- CONSTANTS FOR DESIGN ---
        num_practice = 4
        num_full = Constants.num_rounds - num_practice
        # Assign some conditions
        discussion_condition = random.choice(["advance", "on_the_fly"])
        for p in players:
            p.participant.vars["discussion_condition"] = discussion_condition
        # Assign p1/p2 roles
        players[0].participant.p_label = "p1"
        players[1].participant.p_label = "p2"
        # ---------- COLOR PAIRS (one per round) ----------
        color_pairs = Constants.color_pairs.copy()
        random.shuffle(color_pairs)
        color_pairs = color_pairs[: Constants.num_rounds]
        # ==========================================================
        # CONDITIONS
        # 4 practice (2 control + 2 sync) + 18 test (3 blocks of 6).
        # Test blocks follow one randomly chosen row of a Latin square
        # over the 3 conditions (e.g., sync, parallel, control).
        # ==========================================================
        # ---- Practice: 2 control + 2 sync ----
        practice = ["control", "control", "sync", "sync"]
        random.shuffle(practice)  # comment out if you want fixed practice order
        # ---- Latin square for 3 conditions ----
        latin_orders = [
            ["sync", "parallel", "control"],
            ["parallel", "control", "sync"],
            ["control", "sync", "parallel"],
        ]
        # pick one row (counterbalancing across groups)
        order = random.choice(latin_orders)
        # one pass through the chosen row -> 3 blocks total
        block_order_3 = order
        # expand each block to 6 trials
        main = []
        for cond in block_order_3:
            main += [cond] * 6
        condition_array = practice + main
        # sanity checks
        assert len(main) == 18
        assert len(condition_array) == Constants.num_rounds
        assert len(practice) == num_practice
        assert len(main) == num_full
        # ---------- VALUE PAIRS FOR FULL TRIALS ONLY ----------
        # Each entry is (values for p1, values for p2); the last three give the
        # two players the values in opposite order.
        value_pairs = [
            ([70, 60], [70, 60]),
            ([60, 50], [60, 50]),
            ([50, 40], [50, 40]),
            ([70, 60], [60, 70]),
            ([60, 50], [50, 60]),
            ([50, 40], [40, 50])
        ]
        full_p1_array, full_p2_array, full_val_array, full_same_belief_array = [], [], [], []
        for _ in range(num_full // 6):
            block = random.sample(value_pairs, k=6)
            value_mapping = [0, 0, 0, 1, 1, 1]
            random.shuffle(value_mapping)
            full_p1_array += [s for s, _ in block]
            full_p2_array += [w for _, w in block]
            full_val_array += value_mapping
            # 1 = same-belief trial, 0 = different-belief trial
            full_same_belief_array += [1 if s == w else 0 for s, w in block]
        # Practice rounds carry no values, hence the None padding.
        p1_array = [None] * num_practice + full_p1_array
        p2_array = [None] * num_practice + full_p2_array
        val_array = [None] * num_practice + full_val_array
        same_belief_array = [None] * num_practice + full_same_belief_array
        # ---------- STARTING POSITIONS (unchanged) ----------
        column_list, pac_list = [], []
        for _ in range(Constants.num_rounds):
            col = random.choice([1, 11])
            column_list.append(col)
            pac_list.append(7 if col == 1 else 5)
        # ==========================================================
        # PARALLEL: target index per round (which of the 2 fruits is the target)
        # practice: always option1 (0), main: random 0/1
        # ==========================================================
        target_idx_by_round = []
        for rn in range(1, Constants.num_rounds + 1):
            if rn <= num_practice:
                target_idx_by_round.append(0)
            else:
                target_idx_by_round.append(random.randint(0, 1))
        # ---------- NOISES / DISPLAY / PRESENTATIONS / METRONOMES / SYNC WINDOWS ----------
        # Every list currently holds one repeated value; the shuffles are kept
        # so the sequence of random draws does not change.
        full_noises = [1.34] * num_full
        random.shuffle(full_noises)
        full_display_values = [100] * num_full
        random.shuffle(full_display_values)
        full_presentations = [16] * num_full
        random.shuffle(full_presentations)
        full_metronomes = [1000] * num_full
        random.shuffle(full_metronomes)
        full_sync_windows = [1000] * num_full
        random.shuffle(full_sync_windows)
        full_metro_cond = ["fade"] * num_full
        random.shuffle(full_metro_cond)
        practice_noises = [1.34] * num_practice
        practice_display_values = [100] * num_practice
        practice_presentations = [16] * num_practice
        practice_metronomes = [1000] * num_practice
        practice_sync_windows = [1000] * num_practice
        practice_metro_cond = ["fade"] * num_practice
        noises = practice_noises + full_noises
        display_values = practice_display_values + full_display_values
        presentations = practice_presentations + full_presentations
        metronomes = practice_metronomes + full_metronomes
        sync_windows = practice_sync_windows + full_sync_windows
        metronome_condition = practice_metro_cond + full_metro_cond
        # ---------- SAVE TO PARTICIPANT.VARS ----------
        for p in players:
            p.participant.vars.update(
                {
                    "color_pairs": color_pairs,
                    "condition_array": condition_array,
                    "p1_array": p1_array,
                    "p2_array": p2_array,
                    "val_array": val_array,
                    "same_belief_array": same_belief_array,
                    "column_list": column_list,
                    "pac_list": pac_list,
                    "noises": noises,
                    "display_values": display_values,
                    "metronomes": metronomes,
                    "metronome_condition": metronome_condition,
                    "sync_windows": sync_windows,
                    "presentations": presentations,
                    # NEW for parallel:
                    "target_idx_by_round": target_idx_by_round,
                }
            )
        # ---------- Twilio token generation ----------
        # The timestamp only makes the room name unique. An aware UTC datetime
        # is used because datetime.utcnow() is deprecated and its naive value
        # would be read as local time by .timestamp().
        room_name = f"twilio_room_{group.id_in_subsession}_{datetime.now(timezone.utc).timestamp()}"
        room_sid = create_twilio_room(room_name)
        #room_name = '123' #use to disable video
        #room_sid = '123' #use to disable video
        video_start_ts = time.time()
        for p in players:
            identity = f"player_{p.id_in_group}_g{group.id_in_subsession}"
            token = generate_twilio_token(identity, room_name)
            #token = '123' #use to disable video
            p.participant.vars["twilio_token"] = token
            p.participant.vars["twilio_room"] = room_name
            p.participant.vars["video_start_ts"] = video_start_ts
            p.participant.vars.setdefault("event_log", [])
            p.participant.vars["twilio_identity"] = identity
            p.twilio_room_sid = room_sid
            p.twilio_room_name = room_name
            p.twilio_identity = identity
            p.video_start_ts = video_start_ts
""" #video check
class TechCheck(Page):
template_name = "pacman_demo_1scale/TechCheck.html"
form_model = "player"
form_fields = ["tech_ok", "tech_fail_reason", "tech_audio_max_rms", "tech_video_is_live"]
@staticmethod
def is_displayed(player):
return player.round_number == 1
@staticmethod
def error_message(player, values):
if not values.get("tech_ok", False):
player.tech_attempts += 1
if player.tech_attempts >= 2:
# lock them out (stays on TechCheck unless you add ReturnStudy)
return "Tech check failed twice. Please return the study (you cannot continue)."
return "Tech check failed. Please fix the issue and click 'Run test' again."
return None """
class TwilioVideoRoom(Page):
    """Round-1 page that passes the stored Twilio token and room name to its template."""

    template_name = "pacman_demo_1scale/TwilioVideoRoom.html"

    @staticmethod
    def is_displayed(player: Player):
        # Video setup happens only in the first round.
        in_first_round = player.round_number == 1
        return in_first_round

    @staticmethod
    def vars_for_template(player: Player):
        stored = player.participant.vars
        # NOTE(review): this identity ("player_<id>") differs from the one embedded
        # in the token ("player_<id>_g<group>") — confirm the template expects that.
        return dict(
            twilio_token=stored["twilio_token"],
            room_name=stored["twilio_room"],
            identity=f"player_{player.id_in_group}",
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # Record when the participant leaves this page.
        log_event(player, "twilio_room_opened")
class TwilioPopupPage(Page):
    """Popup variant of the video page; currently disabled (never displayed)."""

    template_name = "pacman_demo_1scale/TwilioPopupPage.html"
    timeout_seconds = 120

    @staticmethod
    def is_displayed(player: Player):
        # Always skipped.
        return False

    @staticmethod
    def vars_for_template(player: Player):
        stored = player.participant.vars
        return dict(
            twilio_token=stored["twilio_token"],
            room_name=stored["twilio_room"],
            identity=f"player_{player.id_in_group}",
        )
class PartnerVideoWait(WaitPage):
    """Round-1 wait page that holds a player until the partner has opened the video.

    On timeout the dyad is flagged via ``group.audio_gate_failed`` (mirrored in
    both participants' ``vars``), and flagged players are routed to the last
    upcoming app.
    """

    # NOTE(review): 300 s is 5 minutes, while body_text promises 3 minutes —
    # confirm which value is intended.
    # NOTE(review): confirm that oTree applies timeout_seconds / before_next_page
    # to WaitPage subclasses in the version this app runs on.
    timeout_seconds = 300
    # The line break inside the first sentence is written as an explicit "\n"
    # escape; a string literal split across two physical lines does not parse.
    body_text = (
        "Waiting for your partner to open the video popup.\n"
        "If they do not open within 3 minutes, you will be sent to the end of the task "
        "and will receive a partial payment."
    )

    @staticmethod
    def is_displayed(player):
        return player.round_number == 1

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        if timeout_happened:
            player.group.audio_gate_failed = True
            # persist + make it apply to BOTH players
            for p in player.group.get_players():
                p.participant.vars["audio_gate_failed"] = True

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        # redirect BOTH (because both will evaluate this, and the flag is on the group)
        if player.group.audio_gate_failed:
            return upcoming_apps[-1]
class AudioGate(Page):
    """Round-1 form asking whether the participant can hear and see the partner.

    Skipped when the dyad was already flagged as failed. On timeout both
    answers are submitted as ``False``.
    """

    template_name = "pacman_demo_1scale/AudioGate.html"
    form_model = "player"
    form_fields = ["can_hear_partner", "can_see_partner"]
    timeout_seconds = 180
    timeout_submission = {"can_hear_partner": False, "can_see_partner": False}

    @staticmethod
    def is_displayed(player):
        # Don't even show if the dyad already failed at PartnerVideoWait
        if player.round_number != 1:
            return False
        return not player.group.audio_gate_failed

    @staticmethod
    def error_message(player, values):
        # Both fields are declared blank=True, so completeness is enforced here.
        required = ("can_hear_partner", "can_see_partner")
        if any(values.get(field) is None for field in required):
            return "Please answer both questions before continuing."

    @staticmethod
    def before_next_page(player, timeout_happened):
        # The dyad-level pass/fail decision is made on the following wait page.
        pass
class AudioGateWait(WaitPage):
    """Wait for both AudioGate answers, then decide whether the dyad failed."""

    @staticmethod
    def is_displayed(player):
        # Only wait if AudioGate actually ran
        if player.round_number != 1:
            return False
        return not player.group.audio_gate_failed

    @staticmethod
    def after_all_players_arrive(group: Group):
        # One explicit "No" on the hearing question fails the whole dyad;
        # the "can see" answer is not evaluated here.
        for member in group.get_players():
            if member.can_hear_partner is False:
                group.audio_gate_failed = True
                break
        # Persist the flag across apps.
        for member in group.get_players():
            member.participant.vars["audio_gate_failed"] = group.audio_gate_failed
#@staticmethod
#def app_after_this_page(player, upcoming_apps):
# # Redirect BOTH after both have answered
# if player.group.audio_gate_failed:
# return upcoming_apps[-1]
class AudioGateRoute(Page):
    """One-second auto-advancing page that routes failed dyads to the last app."""

    template_name = "global/Page.html"
    timeout_seconds = 1
    timeout_submission = {}

    @staticmethod
    def is_displayed(player):
        in_first_round = player.round_number == 1
        return in_first_round

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        if not player.group.audio_gate_failed:
            return None
        # debrief app (last)
        return upcoming_apps[-1]
class DiscussionConditionInfo(Page):
    """Round-1 page that shows the participant the assigned discussion condition."""

    template_name = "pacman_demo_1scale/ConditionAssign.html"

    @staticmethod
    def is_displayed(player: Player):
        in_first_round = player.round_number == 1
        return in_first_round

    @staticmethod
    def vars_for_template(player: Player):
        assigned = player.participant.vars["discussion_condition"]
        return dict(discussion_condition=assigned)
class TrialWait(WaitPage):
    """Per-round sync point that resets the group's avatar state before a trial."""

    body_text = "Waiting for your partner to be ready..."

    @staticmethod
    def after_all_players_arrive(group: Group):
        first_player = group.get_players()[0]
        idx = first_player.round_number - 1
        # All avatars start in row 6, in the round's column from pac_list.
        start_row = 6
        start_col = first_player.participant.vars["pac_list"][idx]
        # Shared avatar (control/sync) first, then the two parallel avatars.
        for prefix in ("shared", "p1", "p2"):
            setattr(group, f"{prefix}_row", start_row)
            setattr(group, f"{prefix}_col", start_col)
        # Completion flags start fresh every round.
        group.p1_done = False
        group.p2_done = False
        # Stored on the group so both players see the same target this round.
        group.target_idx = int(first_player.participant.vars["target_idx_by_round"][idx])
class Pacman_prac(Page):
    """Round-1 Pac-Man practice page with a fixed start column."""

    template_name = "pacman_demo_1scale/Pacman_prac.html"
    timeout_seconds = 120

    @staticmethod
    def is_displayed(player: Player):
        in_first_round = player.round_number == 1
        return in_first_round

    @staticmethod
    def vars_for_template(player: Player):
        return dict(pac_column=7, maze=Constants.maze_ind)
class Pacman_ind(Page):
    """Individual Pac-Man phase, shown from round 5 onwards.

    ``vars_for_template`` builds the value streams for the round and stores
    them, together with a start timestamp, in ``participant.vars``;
    ``before_next_page`` copies the round's design values onto the player
    record and computes the response time.
    """

    template_name = "pacman_demo_1scale/Pacman_ind.html"
    timeout_seconds = 120

    @staticmethod
    def is_displayed(player: Player):
        # Rounds 1-4 are skipped.
        return player.round_number > 4

    @staticmethod
    def vars_for_template(player: Player):
        pvars = player.participant.vars
        idx = player.round_number - 1
        colors = pvars["color_pairs"][idx]
        role = player.participant.p_label
        log_event(player, "ind_phase_start")
        presentation_no = 6
        pvars[f"pacman_ind_start_{player.round_number}"] = time.time()

        # Each role reads its own (high, low) value pair for this round.
        values_key = "p1_array" if role == "p1" else "p2_array"
        high_value, low_value = pvars[values_key][idx]

        # Half of the presentations show the high value, half the low value.
        repeats = int(presentation_no / 2)
        high_stream = [high_value] * repeats
        low_stream = [low_value] * repeats

        # Mapping 0 puts the high stream on option 1; anything else swaps them.
        mapping = pvars["val_array"][idx]
        if mapping == 0:
            first_stream, second_stream = high_stream, low_stream
        else:
            first_stream, second_stream = low_stream, high_stream

        # Kept for before_next_page, which writes them to the player record.
        pvars[f"noisy_high_round_{idx}"] = high_stream
        pvars[f"noisy_low_round_{idx}"] = low_stream
        pvars[f"presentation_no_round_{idx}"] = presentation_no

        context = {
            "option1_color": colors[0],
            "option2_color": colors[1],
            "round_number": player.round_number,
            "pac_column": pvars["pac_list"][idx],
            "high_stream": json.dumps(high_stream),
            "low_stream": json.dumps(low_stream),
            "option1_stream": json.dumps(first_stream),
            "option2_stream": json.dumps(second_stream),
            "val_mapping": mapping,
            "same_belief_trial": pvars["same_belief_array"][idx],
            "display_value": pvars["display_values"][idx],
            "column_number": pvars["column_list"][idx],
            "presentation_no": presentation_no,
            "p_label": role,
            "maze": Constants.maze_ind,
        }
        return context

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        pvars = player.participant.vars
        idx = player.round_number - 1
        colors = pvars["color_pairs"][idx]
        round_condition = pvars["condition_array"][idx]
        role = player.participant.p_label
        values_key = "p1_array" if role == "p1" else "p2_array"
        high_value, low_value = pvars[values_key][idx]
        stored_high = pvars[f"noisy_high_round_{idx}"]
        stored_low = pvars[f"noisy_low_round_{idx}"]

        player.discussion_condition = pvars["discussion_condition"]
        player.assigned_color_pair = json.dumps(colors)
        player.sync_condition = round_condition
        player.assigned_p1 = str(high_value)
        player.assigned_p2 = str(low_value)
        player.noisy_high_stream = json.dumps(stored_high)
        player.noisy_low_stream = json.dumps(stored_low)
        player.value_mapping = pvars["val_array"][idx]
        player.same_belief_trial = pvars["same_belief_array"][idx]
        player.pac_column = pvars["pac_list"][idx]
        player.noise_value = pvars["noises"][idx]
        player.display_value = pvars["display_values"][idx]
        player.presentation_no = pvars[f"presentation_no_round_{idx}"]

        # The response time is only recorded when a start timestamp was stored.
        started_at = pvars.get(f"pacman_ind_start_{player.round_number}")
        if started_at:
            player.pacman_ind_rt = time.time() - started_at
        log_event(player, "ind_phase_end")
# ==========================================================
# VALUATION: single bipolar slider (rel_value_ind)
# ==========================================================
class Valuation_ind(Page):
    """Individual valuation page with the single bipolar ``rel_value_ind`` rating."""

    template_name = "pacman_demo_1scale/Valuation_ind.html"
    timeout_seconds = 120
    form_model = "player"
    form_fields = ["rel_value_ind"]

    @staticmethod
    def is_displayed(player: Player):
        # Shown from round 5 onwards.
        return player.round_number > 4

    @staticmethod
    def vars_for_template(player: Player):
        pvars = player.participant.vars
        colors = pvars["color_pairs"][player.round_number - 1]
        log_event(player, "ind_rating_start")
        # Start timestamp used for the response time in before_next_page.
        pvars[f"valuation_ind_start_{player.round_number}"] = time.time()
        return dict(
            option1_color=colors[0],
            option2_color=colors[1],
            round_number=player.round_number,
            phase="individual",
            p_label=player.participant.p_label,
        )

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        started_at = player.participant.vars.get(f"valuation_ind_start_{player.round_number}")
        if started_at:
            player.valuation_ind_rt = time.time() - started_at
        # A missing rating is stored as the scale midpoint.
        # NOTE(review): some oTree versions raise when a null field is read
        # directly — confirm this check behaves as intended on the deployed version.
        if player.rel_value_ind is None:
            player.rel_value_ind = 0
        log_event(player, "ind_rating_end")
#@staticmethod
#def app_after_this_page(player: Player, upcoming_apps):
# NEW: if dyad dropout (or audio gate failed earlier), jump to debrief app
#if player.group.is_dropout:
# return upcoming_apps[-1]
class AdvanceDiscussion(Page):
    """Timed discussion page shown only to dyads in the "advance" discussion condition."""

    template_name = "pacman_demo_1scale/AdvanceDiscussion.html"
    timeout_seconds = 60

    @staticmethod
    def is_displayed(player: Player):
        assigned = player.participant.vars["discussion_condition"]
        return assigned == "advance"
class Pacman_joint(Page):
    """Joint Pac-Man phase: a live page with a 60-second timeout."""

    live_page = True
    template_name = "pacman_demo_1scale/Pacman_joint.html"
    timeout_seconds = 60
    live_method = live_method

    @staticmethod
    def is_displayed(player: Player):
        """The joint phase is shown in every round."""
        return True

    @staticmethod
    def vars_for_template(player: Player):
        """Build the template context for the joint phase.

        Side effects: logs "joint_phase_start", stores the start time in
        participant.vars under "pacman_joint_start_<round>", and writes this
        round's pac_list entry to group.shared_col.
        """
        participant = player.participant
        group = player.group
        round_no = player.round_number
        idx = round_no - 1

        def this_round(key):
            # Per-round lists in participant.vars are indexed from zero.
            return participant.vars[key][idx]

        colors = this_round("color_pairs")
        condition = this_round("condition_array")
        # Debug overrides (disabled): condition = "parallel"; is_practice = False
        practice_round = round_no <= 4

        # The target fruit index is read from the group so that it is the
        # same for everyone in the group, whatever the condition.
        target_color = colors[group.target_idx]

        log_event(player, "joint_phase_start")
        participant.vars[f"pacman_joint_start_{round_no}"] = time.time()

        # Control/sync: the shared starting column comes from pac_list.
        # NOTE(review): this write happens on every render of the page --
        # confirm a reload mid-round cannot clobber live shared state.
        group.shared_col = this_round("pac_list")

        # Parallel: per-player positions and done flags are group fields,
        # passed through so the client can draw both boards side by side.
        return {
            "option1_color": colors[0],
            "option2_color": colors[1],
            "column_number": this_round("column_list"),
            "pac_column": this_round("pac_list"),
            "round_number": round_no,
            "maze": Constants.maze_joint,
            "p_label": participant.p_label,
            "sync_condition": condition,
            "metronome_value": this_round("metronomes"),
            "metronome_condition": this_round("metronome_condition"),
            "sync_window": this_round("sync_windows"),
            "target_color": target_color,
            "is_practice": practice_round,
            "p1_row": group.p1_row,
            "p1_col": group.p1_col,
            "p2_row": group.p2_row,
            "p2_col": group.p2_col,
            "p1_done": group.p1_done,
            "p2_done": group.p2_done,
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Copy this round's settings and logs onto the player and record the RT."""
        participant = player.participant
        round_no = player.round_number
        idx = round_no - 1

        # (player field, participant.vars key) pairs copied for this round.
        per_round_fields = (
            ("metronome_value", "metronomes"),
            ("metronome_condition", "metronome_condition"),
            ("sync_window", "sync_windows"),
        )
        for field_name, vars_key in per_round_fields:
            setattr(player, field_name, participant.vars[vars_key][idx])

        # The whole accumulated keypress log is serialized, not just this round's.
        player.keypress_log_joint = json.dumps(participant.vars.get("keypress_log", []))

        attempts_by_round = participant.vars.get("sync_attempts", {})
        player.sync_attempts_log = json.dumps(attempts_by_round.get(round_no, []))

        started_at = participant.vars.get(f"pacman_joint_start_{round_no}")
        if started_at:
            player.pacman_joint_rt = time.time() - started_at
        log_event(player, "joint_phase_end")
class OutcomePage(Page):
    """Shows the outcome of the joint round that was just played."""

    template_name = "pacman_demo_1scale/OutcomePage.html"

    @staticmethod
    def is_displayed(player: Player):
        """The outcome page is shown in every round."""
        return True

    @staticmethod
    def vars_for_template(player: Player):
        """Return the round outcome, recording "timeout" when none was stored.

        Also logs a "joint_outcome_<outcome>" event.
        """
        recorded = player.field_maybe_none("round_outcome")
        if recorded is not None:
            outcome = recorded
        else:
            # No outcome was stored for this round: persist it as a timeout.
            outcome = "timeout"
            player.round_outcome = outcome
        log_event(player, f"joint_outcome_{outcome}")
        return dict(outcome=outcome)
class Valuation_joint(Page):
    """Rating page for the joint phase (form field: rel_value_joint)."""

    template_name = "pacman_demo_1scale/Valuation_joint.html"
    timeout_seconds = 120
    form_model = "player"
    form_fields = ["rel_value_joint"]

    @staticmethod
    def is_displayed(player: Player):
        """Show the joint rating only from round 5 onwards.

        The round number is the only condition checked; no dropout or dyad
        flag is consulted here.
        """
        return player.round_number > 4

    @staticmethod
    def vars_for_template(player: Player):
        """Build the template context for the joint rating page.

        Also logs "joint_rating_start" and stores the start time in
        participant.vars under "valuation_joint_start_<round>".
        """
        round_idx = player.round_number - 1
        pair = player.participant.vars["color_pairs"][round_idx]
        log_event(player, "joint_rating_start")
        player.participant.vars[f"valuation_joint_start_{player.round_number}"] = time.time()
        return {
            "option1_color": pair[0],
            "option2_color": pair[1],
            "round_number": player.round_number,
            "phase": "joint",
            "p_label": player.participant.p_label,
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Store the rating response time and default a missing rating to 0.

        Args:
            player: the player leaving the joint rating page.
            timeout_happened: supplied by oTree; unused while the dropout
                handling below stays disabled.
        """
        # Disabled: if this page times out, mark dyad dropout.
        #if timeout_happened:
        #    player.group.is_dropout = True
        #    player.participant.vars["is_dropout"] = player.group.is_dropout
        start = player.participant.vars.get(f"valuation_joint_start_{player.round_number}")
        if start:
            player.valuation_joint_rt = time.time() - start
        # Use field_maybe_none (the same accessor OutcomePage uses for
        # round_outcome): plain attribute access on a null oTree field raises
        # instead of returning None, so a direct `is None` test cannot succeed.
        if player.field_maybe_none("rel_value_joint") is None:
            player.rel_value_joint = 0
        log_event(player, "joint_rating_end")

    # Disabled: if dyad dropout (or audio gate failed earlier), jump to the
    # debrief app (assumes debrief is the last app; adjust if not).
    #@staticmethod
    #def app_after_this_page(player: Player, upcoming_apps):
    #    if player.group.is_dropout:
    #        return upcoming_apps[-1]
class SaveKeypressLog(Page):
    """Page with a one-second timeout that saves this round's keypress data."""

    timeout_seconds = 1
    template_name = "global/Page.html"

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Save this round's keypresses; on the last round also save the event log.

        In round 10 the participant is additionally flagged with
        participant.vars["enough_rounds"] = True.
        """
        participant = player.participant
        round_no = player.round_number

        if round_no == 10:
            participant.vars["enough_rounds"] = True

        # Keep only the entries of the accumulated log that belong to this round.
        round_presses = []
        for press in participant.vars.get("keypress_log", []):
            if press["round"] == round_no:
                round_presses.append(press)
        player.keypress_data = json.dumps(round_presses)

        # The full event timeline is written once, in the final round.
        if round_no == Constants.num_rounds:
            player.event_timeline = json.dumps(participant.vars.get("event_log", []))
class BetweenBlock(Page):
    """Interstitial page shown at the first round of each test block."""

    template_name = "pacman_demo_1scale/BetweenBlock.html"
    timeout_seconds = 30  # optional: force a short pause so they actually see it

    @staticmethod
    def is_displayed(player: Player):
        """Return True on the first round of each test block.

        With 4 practice rounds, 6 trials per block and 18 test trials, the
        page is shown in rounds 5, 11 and 17.
        """
        num_practice = 4
        trials_per_block = 6
        num_test_trials = 18
        round_no = player.round_number

        # Never during practice.
        if round_no <= num_practice:
            return False

        # Safety: never beyond the last test trial.
        last_test_round = num_practice + num_test_trials
        if round_no > last_test_round:
            return False

        first_test_round = num_practice + 1
        return (round_no - first_test_round) % trials_per_block == 0

    @staticmethod
    def vars_for_template(player: Player):
        """Return the block number, block count and condition-specific text."""
        num_practice = 4
        trials_per_block = 6
        total_blocks = 3
        round_no = player.round_number

        # Blocks are numbered from 1, counting from the first test round.
        rounds_into_test = round_no - (num_practice + 1)
        block_no = rounds_into_test // trials_per_block + 1

        # Condition of the round (and block) about to start.
        condition = player.participant.vars["condition_array"][round_no - 1]

        # condition -> (block title, reminder shown to participants)
        block_texts = {
            "sync": (
                "Synchrony block",
                "You may talk to coordinate on the fruit. Pac-Man moves only if you both press the SAME key at (around) the SAME time.",
            ),
            "control": (
                "Shared-control block",
                "You may talk to coordinate on the fruit. Pac-Man moves with either player’s keypress (no synchrony needed).",
            ),
            "parallel": (
                "Parallel block",
                "You must not talk to each other. You each control your own Pac-Man.",
            ),
        }
        block_title, reminder = block_texts.get(condition, ("New block", ""))

        return {
            "block_no": block_no,
            "total_blocks": total_blocks,
            "condition": condition,
            "block_title": block_title,
            "reminder": reminder,
        }
# Order in which the pages of this app are presented. Entries that are
# commented out (TechCheck, AdvanceDiscussion) are currently disabled.
# NOTE(review): TrialWait is listed twice (before Pacman_prac and before
# Pacman_joint) -- confirm the installed oTree version accepts a repeated
# page class in page_sequence.
page_sequence = [
StartWait,
#TechCheck,
TwilioVideoRoom,
PartnerVideoWait,
AudioGate,
AudioGateWait,
AudioGateRoute,
TrialWait,
Pacman_prac,
BetweenBlock,
Pacman_ind,
Valuation_ind,
# AdvanceDiscussion,
TrialWait,
Pacman_joint,
OutcomePage,
# SaveKeypressLog comes before Valuation_joint in the sequence.
SaveKeypressLog,
Valuation_joint,
]