from collections import defaultdict
import json
import random
import time

from otree.api import *

doc = """ Multilateral bargaining experiment involving bots and humans with a customizable game board and negotiation logic. """

# For the player sitting at the "bottom" (own id_in_group), which id_in_group
# is displayed on the left and which on the right.
POSITION_MAP = {
    1: {"bottom": 1, "left": 3, "right": 2},
    2: {"bottom": 2, "left": 1, "right": 3},
    3: {"bottom": 3, "left": 2, "right": 1},
}

# --- Group-level treatment cycling ---
# Each pattern gives the identity ('H'uman / 'A'I) of id_in_group 1..3.
TREATMENT_ORDER = ["AAH", "AHA", "HAA", "HHA", "AHH", "HAH", "AAA", "HHH"]
TREATMENT_AI_AGENT = "AI Agent"
TREATMENT_AI_DELEGATION = "AI Delegation"
DEFAULT_TREATMENT = TREATMENT_AI_DELEGATION

# Fixed colour per id_in_group; used whenever participant.vars has no colour.
COLOR_BY_ID = {1: "green", 2: "blue", 3: "purple"}
PLAYER_COLORS = ("green", "blue", "purple")

# Per-page time budget (seconds) of the round-1 identity pages
# (IdentityPreview, Identity_Preview_Reinforce, Identity_Preview_Correction)
# when timers are enabled.
IDENTITY_PAGE_TIMEOUT = 60
# Hard fallback for WaitStartGame. It must exceed the longest time a group
# member can legitimately spend on the three identity pages above, otherwise
# a fast player times out (and ends the group) while a slow one is still
# within their allowed time.
WAIT_START_TIMEOUT = 3 * IDENTITY_PAGE_TIMEOUT + 5


def assign_instruction_treatment(player: "Player"):
    """Return the participant's instruction treatment, assigning one if missing.

    Fallback assignment for sessions that start directly in triangle_bargain
    (e.g., test configs that skip the Introduction app). Blocks of three
    consecutive id_in_subsession values alternate between the two treatments.
    """
    existing = player.participant.vars.get("treatment")
    if existing:
        return existing
    block_index = (player.id_in_subsession - 1) // 3
    treatment = (
        TREATMENT_AI_AGENT if block_index % 2 == 0 else TREATMENT_AI_DELEGATION
    )
    player.participant.vars["treatment"] = treatment
    player.participant.treatment = treatment
    return treatment


def pattern_to_identities(pattern):
    """Map 'H'/'A' to 'Human'/'AI' for 3 players by id_in_group order 1..3."""
    mapping = {"H": "Human", "A": "AI"}
    return [mapping[ch] for ch in pattern]  # length 3


class Constants(BaseConstants):
    name_in_url = "botandhumanarguing"
    players_per_group = 3
    num_rounds = 10
    total_time = 120  # Total game time in seconds
    # Time (in seconds) a coalition must last to end the game
    coalition_time_threshold = 10
    initial_money = 0.255  # Starting worth of each point, in Euros


def timer(player, seconds):
    """Return seconds if timers are enabled, else None."""
    return seconds if player.session.config.get('enable_timers', False) else None


def creating_session(subsession):
    """Session-creation hook: pre-assign treatments and set up pattern cycling.

    Defined at module level because that is where oTree's no-self app format
    expects this hook; ``Subsession`` keeps an alias for any caller that
    reaches it through the class.
    """
    # Initialize treatment list & index ONLY once (round 1, first time)
    if subsession.round_number != 1:
        return
    s = subsession.session
    for p in subsession.get_players():
        assign_instruction_treatment(p)
    # If user provides a custom order in session config, use it
    order = s.config.get("treatment_order") or TREATMENT_ORDER
    # Do not overwrite if already set (e.g., multi-app sequence or reload)
    if "treatments" not in s.vars:
        s.vars["treatments"] = list(order)
    if "treatment_idx_by_instruction" not in s.vars:
        s.vars["treatment_idx_by_instruction"] = {}


def group_by_arrival_time_method(subsession, waiting_players):
    """Group three waiting players of the same treatment, or release a timeout.

    Returns a list of three players sharing an instruction treatment when one
    is available; otherwise a single-player list for the first player who has
    waited more than 420 seconds (flagged with participant.vars['timeout']);
    otherwise None (keep waiting).
    """
    now = time.time()

    # Stamp first arrival ONLY ONCE and bucket players by treatment arm.
    waiting_by_treatment = defaultdict(list)
    for p in waiting_players:
        p.participant.vars.setdefault('wait_page_arrival', now)
        assign_instruction_treatment(p)
    for p in waiting_players:
        treatment = p.participant.vars.get("treatment", DEFAULT_TREATMENT)
        waiting_by_treatment[treatment].append(p)

    # Preserve arrival priority: first waiting player's treatment gets checked first.
    for p in waiting_players:
        treatment = p.participant.vars.get("treatment", DEFAULT_TREATMENT)
        bucket = waiting_by_treatment[treatment]
        if len(bucket) >= 3:
            return bucket[:3]

    # Nobody can be grouped: pull out the first player who waited too long.
    for p in waiting_players:
        t0 = p.participant.vars['wait_page_arrival']
        if now - t0 > 420:
            p.participant.vars['timeout'] = True
            return [p]


class Subsession(BaseSubsession):
    # Aliases of the module-level hooks, kept so the functions remain
    # reachable as Subsession attributes exactly as before.
    creating_session = creating_session
    group_by_arrival_time_method = group_by_arrival_time_method


class Group(BaseGroup):
    timer_start = models.FloatField(initial=0)
    early_end = models.BooleanField(initial=False)
    cell_states = models.LongStringField(
        initial=json.dumps({}),
        doc="JSON string representing the states of each cell."
    )
    cell_history = models.LongStringField(
        initial=json.dumps([]),
        doc="List of all cell_states over time with timestamps."
    )
    coalition = models.BooleanField(initial=False, doc="Tracks coalition state.")
    coalition_start_time = models.FloatField(
        doc="Timestamp when the coalition was first detected."
    )
    end_game_time = models.FloatField(
        doc="Timestamp when the game was ended",
        null=True
    )
    end_game_reason = models.StringField(
        blank=True,
        doc="Why the game ended"
    )
    remaining_money = models.FloatField(initial=0)


class Player(BasePlayer):
    color = models.StringField(
        doc="The color assigned to the player (e.g., green, blue)."
    )
    identity = models.StringField(
        doc="assigned: 'Human' or 'AI'"
    )
    made_selection = models.BooleanField(
        initial=False,
        doc="Indicates if the player has made a selection."
    )
    was_in_coalition = models.BooleanField(initial=False)
    n_ai_opponents = models.IntegerField(initial=0)
    quiz_left_identity = models.StringField(
        choices=['Human', 'AI'],
        widget=widgets.RadioSelectHorizontal,
        label="Left opponent identity"
    )
    quiz_right_identity = models.StringField(
        choices=['Human', 'AI'],
        widget=widgets.RadioSelectHorizontal,
        label="Right opponent identity"
    )
    reinforce_left_identity = models.StringField(
        choices=['Human', 'AI'],
        widget=widgets.RadioSelectHorizontal,
        label="Left opponent identity (reinforcement)"
    )
    reinforce_right_identity = models.StringField(
        choices=['Human', 'AI'],
        widget=widgets.RadioSelectHorizontal,
        label="Right opponent identity (reinforcement)"
    )
    reinforce_failed = models.BooleanField(initial=False)


class WaitStartGame(Page):
    """Wait until everyone finishes the identity preview pages.

    Converted from WaitPage to a live-method Page so it can detect and
    propagate early_end if a group member dropped out before arriving here.
    """

    template_name = 'triangle_bargain/Wait_Round.html'

    @staticmethod
    def is_displayed(player):
        pv = player.participant.vars
        return player.round_number == 1 and not (
            pv.get('dropout') or pv.get('early_end')
        )

    @staticmethod
    def get_timeout_seconds(player):
        # Hard fallback: exceeds the combined budget of IdentityPreview,
        # Identity_Preview_Reinforce and Identity_Preview_Correction.
        return WAIT_START_TIMEOUT

    @staticmethod
    def live_method(player, data):
        """Track arrivals; advance everyone once all arrived or someone dropped."""
        group = player.group
        players = group.get_players()

        # If any group member has already dropped out, propagate and advance everyone.
        if any(
            p.participant.vars.get('early_end') or p.participant.vars.get('dropout')
            for p in players
        ):
            group.early_end = True
            for p in players:
                p.participant.vars['early_end'] = True
            return {p.id_in_group: {'advance': True} for p in players}

        arrival_key = 'arrived_waitstart'
        if data.get('arrived'):
            player.participant.vars[arrival_key] = True

        arrived_count = sum(
            1 for p in players if p.participant.vars.get(arrival_key, False)
        )
        if arrived_count == len(players):
            # Everyone is here: start the per-group game clock.
            group.timer_start = time.time()
            return {p.id_in_group: {'advance': True} for p in players}

        return {player.id_in_group: {'advance': False, 'waiting': arrived_count}}

    @staticmethod
    def before_next_page(player, timeout_happened):
        if timeout_happened:
            # Hard timeout fired: someone never arrived. End the whole group.
            grp = player.group
            grp.early_end = True
            for p in grp.get_players():
                p.participant.vars['early_end'] = True
            player.participant.vars['early_end'] = True

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        pv = player.participant.vars
        if player.group.early_end or pv.get('dropout') or pv.get('early_end'):
            return "End_Page"


class IdentityPreview(Page):
    """Round-1 page showing the identities of the left and right opponents."""

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def get_timeout_seconds(player):
        return timer(player, IDENTITY_PAGE_TIMEOUT)

    @staticmethod
    def vars_for_template(player: Player):
        # Use the same logic as Negotiate to determine left/right opponents.
        ids = POSITION_MAP[player.id_in_group]
        grp = player.group
        left_player = grp.get_player_by_id(ids["left"])
        right_player = grp.get_player_by_id(ids["right"])
        return dict(
            left_role=left_player.identity,
            right_role=right_player.identity,
        )


class WaitPageGroupInitial(WaitPage):
    """Round-1 wait page: uses group_by_arrival_time_method (PGG-style)."""

    template_name = 'triangle_bargain/Wait_Arrival.html'
    group_by_arrival_time = True

    @staticmethod
    def is_displayed(player):
        return player.round_number == 1

    @staticmethod
    def after_all_players_arrive(group):
        """Pick the next identity pattern for this group and store it per player."""
        players = group.get_players()
        # bail early on solos (timeout pulled out by grouping)
        if any(p.participant.vars.get('timeout') for p in players):
            return

        session = group.session

        # --- RUNTIME GUARD: initialize if missing (handles old/existing sessions) ---
        treatments = session.vars.get("treatments")
        if not treatments:
            treatments = list(session.config.get("treatment_order") or TREATMENT_ORDER)
            session.vars["treatments"] = treatments
        if "treatment_idx_by_instruction" not in session.vars:
            session.vars["treatment_idx_by_instruction"] = {}

        group_treatment = players[0].participant.vars.get("treatment", DEFAULT_TREATMENT)

        # ---- take the next pattern in the session cycle ----
        idx_by_instruction = session.vars["treatment_idx_by_instruction"]
        idx = idx_by_instruction.get(group_treatment, 0) % len(treatments)
        pattern = treatments[idx]  # e.g., "HAA"
        identities = pattern_to_identities(pattern)  # ["Human","AI","AI"]

        # advance the pointer for the next group in this instruction treatment
        idx_by_instruction[group_treatment] = (idx + 1) % len(treatments)
        session.vars["treatment_idx_by_instruction"] = idx_by_instruction

        # apply fixed colors and the chosen identities by id_in_group order 1..3
        for p in players:
            pv = p.participant.vars
            p.color = COLOR_BY_ID[p.id_in_group]
            pv.setdefault('color', p.color)
            pv.setdefault('treatment', group_treatment)
            p.participant.treatment = pv['treatment']
            # set once; stored in participant.vars so it persists across rounds/apps
            assigned_identity = identities[p.id_in_group - 1]
            pv['identity'] = assigned_identity
            p.identity = assigned_identity
            p.made_selection = False
            p.was_in_coalition = False

        # Vars assignment for number of AI in group for survey
        for p in players:
            n_ai_opponents = sum(
                1 for q in players
                if q.id_in_group != p.id_in_group and q.identity == 'AI'
            )
            p.participant.vars['n_ai_opponents'] = n_ai_opponents
            p.participant.vars['has_ai'] = n_ai_opponents > 0
            p.participant.has_ai = n_ai_opponents > 0
            p.n_ai_opponents = n_ai_opponents

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        # If grouping pulled them out for timeout, route them out now.
        if player.participant.vars.get('timeout') is True:
            return "End_Page"


# arrange human-ai identities each round — custom live-method wait page
class AssignRoundIdentities(Page):
    """Per-round sync page: waits for the group, then copies color/identity
    from participant.vars onto this round's Player rows."""

    template_name = 'triangle_bargain/Wait_Round.html'

    @staticmethod
    def get_timeout_seconds(player):
        # Always-on hard fallback: ejects truly disconnected players after 2 minutes.
        return 120

    @staticmethod
    def is_displayed(player):
        pv = player.participant.vars
        return not (pv.get('dropout') or pv.get('early_end'))

    @staticmethod
    def live_method(player, data):
        group = player.group
        players = group.get_players()

        # If any player in the group was already dropped (e.g. Negotiate timeout),
        # propagate early_end to all group members so app_after_this_page can route
        # everyone to End_Page, then advance them all immediately.
        if any(p.participant.vars.get('early_end') for p in players):
            group.early_end = True  # persist to DB so the next HTTP request sees it
            for p in players:
                p.participant.vars['early_end'] = True
            return {p.id_in_group: {'advance': True} for p in players}

        # Track arrivals per round (round-specific key avoids carry-over).
        arrival_key = f'arrived_r{player.round_number}'
        if data.get('arrived'):
            player.participant.vars[arrival_key] = True

        arrived_count = sum(
            1 for p in players if p.participant.vars.get(arrival_key, False)
        )
        if arrived_count == len(players):
            # All group members present: run round setup and signal advance.
            for p in players:
                pv = p.participant.vars
                p.color = pv.get('color', COLOR_BY_ID[p.id_in_group])
                p.identity = pv.get('identity')
                p.made_selection = False
                p.was_in_coalition = False
            group.timer_start = time.time()
            return {p.id_in_group: {'advance': True} for p in players}

        # Still waiting — report current count so JS can update the display.
        return {player.id_in_group: {'advance': False, 'waiting': arrived_count}}

    @staticmethod
    def before_next_page(player, timeout_happened):
        if timeout_happened:
            # Hard timeout fired: a player never arrived. End the group.
            grp = player.group
            grp.early_end = True
            for p in grp.get_players():
                p.participant.vars['early_end'] = True
            # Also update THIS player's own in-memory dict — get_players() returns
            # different Python objects, so the loop above doesn't update
            # player.participant.vars.
            player.participant.vars['early_end'] = True
            return

        # Normal advance via live_method signal. Re-read from participant.vars
        # as a safety net in case live_method ran for a different player.
        pv = player.participant.vars
        player.color = pv.get('color', COLOR_BY_ID[player.id_in_group])
        player.identity = pv.get('identity')

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        # Check group-level flag (set on same object in before_next_page) as well as
        # participant.vars, so routing works even if participant.vars caching is stale.
        pv = player.participant.vars
        if player.group.early_end or pv.get('dropout') or pv.get('early_end'):
            return "End_Page"


class EndRouter(Page):
    """Invisible page that instantly sends early_end/dropout players to End_Page.

    Placed right after AssignRoundIdentities so it catches players whose
    early_end flag was set in a previous round (where AssignRoundIdentities
    was already skipped and its app_after_this_page never fired).
    """

    template_name = 'triangle_bargain/End_Router.html'

    @staticmethod
    def is_displayed(player):
        pv = player.participant.vars
        return bool(pv.get('early_end') or pv.get('dropout'))

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        return "End_Page"


def parse_coalition(cell_states):
    """Return True if any cell in ``cell_states`` carries the 'red' flag."""
    return any("red" in cell for cell in cell_states.values())


class Negotiate(Page):
    """Main bargaining page; board state is exchanged through live_method."""

    # Drop out Logic for Individual and Group End
    @staticmethod
    def is_displayed(player):
        pv = player.participant.vars
        return not (pv.get('dropout') or pv.get('early_end'))

    @staticmethod
    def get_timeout_seconds(player):
        return timer(player, 120)

    @staticmethod
    def before_next_page(player, timeout_happened):
        if timeout_happened and not player.made_selection:
            # True abandonment: player never interacted. Mark them and end the group.
            player.participant.vars['dropout'] = True
            grp = player.group
            grp.early_end = True
            for p in grp.get_players():
                p.participant.vars['early_end'] = True

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        pv = player.participant.vars
        if pv.get('dropout') is True:
            return "End_Page"
        if pv.get('early_end') is True:
            return "End_Page"

    @staticmethod
    def vars_for_template(player: Player):
        # Re-sync this round's row from participant.vars when values exist there.
        pv = player.participant.vars
        if pv.get('color'):
            player.color = pv['color']
        if pv.get('identity'):
            player.identity = pv['identity']

        ids = POSITION_MAP[player.id_in_group]
        grp = player.group
        left = grp.get_player_by_id(ids["left"])
        right = grp.get_player_by_id(ids["right"])
        return dict(
            left_role=left.field_maybe_none('identity') or 'Human',
            right_role=right.field_maybe_none('identity') or 'Human',
        )

    @staticmethod
    def js_vars(player: Player):
        grp = player.group
        return dict(
            my_color=player.field_maybe_none('color') or COLOR_BY_ID[player.id_in_group],
            game_start=grp.timer_start,  # ← per-group timer
            total_time=Constants.total_time,
            initial_money=Constants.initial_money,
            my_id=player.id_in_group,
            participants_colors={
                p.id_in_group: (p.field_maybe_none('color') or COLOR_BY_ID[p.id_in_group])
                for p in grp.get_players()
            },
            coalition=grp.coalition,
            coalitionThreshold=Constants.coalition_time_threshold,
        )

    @staticmethod
    def live_method(player: Player, data: dict):
        """Handle 'end_game', 'undo_move' and 'move' messages from the client.

        Returns a dict keyed by id_in_group (0 = whole group) with the payload
        to send back: the end-game signal with each player's payoff, or the
        updated cell states plus the coalition flag.
        """
        group = player.group
        current_time = time.time()

        # 1) Immediately handle any end_game request:
        if data.get("end_game"):
            players = group.get_players()

            # a) record final state + event + reason in history
            final_states = json.loads(group.cell_states)
            # Identify the final coalition cell (with red)
            red_pos = next(
                (pos for pos, st in final_states.items() if "red" in st), None
            )
            red_states = final_states.get(red_pos, {}) if red_pos else {}

            # Record whether each player was part of the coalition
            for p in players:
                p.was_in_coalition = red_states.get(p.color, 0) == 1

            history = json.loads(group.cell_history or "[]")
            history.append({
                "timestamp": current_time,
                "cell_states": final_states,
                "event": "endgame",
                "reason": data.get("reason", ""),
                "all_identities": {p.id_in_group: p.identity for p in players},
            })
            group.cell_history = json.dumps(history)

            # b) stamp metadata
            group.end_game_time = current_time
            group.end_game_reason = data.get("reason", "")
            group.remaining_money = data.get("remaining_money", 0)

            # c) Compute payoffs: only members of the red (coalition) cell earn
            #    points, derived from that cell's "row,col" position.
            if red_pos:
                row_str, col_str = red_pos.split(",")
                row, col = int(row_str), int(col_str)
            for p in players:
                if not red_pos or red_states.get(p.color, 0) != 1:
                    p.payoff = 0
                    continue
                if p.id_in_group == 1:
                    base = row - 1
                elif p.id_in_group == 2:
                    base = col - 1
                else:
                    base = 14 - row - col
                p.payoff = base

            # d) broadcast the end signal to everyone
            return {
                p.id_in_group: {
                    "end_game": True,
                    "reason": group.end_game_reason,
                    "end_live": True,
                    "payoff": p.payoff,
                }
                for p in players
            }

        # 2) Handle undoing
        if "undo_move" in data:
            cell_states = json.loads(group.cell_states)
            # Remove player's selection from all cells
            for states in cell_states.values():
                states[player.color] = 0
                if player.color in states["order"]:
                    states["order"].remove(player.color)
            # Clear coalition flags where fewer than 2 are selected
            for states in cell_states.values():
                if sum(states[c] for c in PLAYER_COLORS) < 2:
                    states.pop("red", None)

            # Persist new state, recompute the coalition flag, broadcast.
            # NOTE(review): unlike the move path, undo neither logs a history
            # entry nor resets coalition_start_time — confirm this is intended.
            group.cell_states = json.dumps(cell_states)
            group.coalition = parse_coalition(cell_states)
            payload = {"cell_states": cell_states, "coalition": group.coalition}
            return {p.id_in_group: payload for p in group.get_players()}

        # 3) move & coalition logic
        # Unknown cells start empty; each default gets its own "order" list.
        cell_states = defaultdict(
            lambda: {"green": 0, "blue": 0, "purple": 0, "order": []},
            json.loads(group.cell_states),
        )

        if "move" in data:
            player.made_selection = True
            move = f'{data["move"][0]},{data["move"][1]}'
            participant_color = player.color

            # Clear previous selections for the participant's color
            for states in cell_states.values():
                if participant_color in states["order"]:
                    states["order"].remove(participant_color)
                    states[participant_color] = 0

            # Update the new cell state with the player's move
            cell_states[move][participant_color] = 1
            cell_states[move]["order"].append(participant_color)

            # A cell is a coalition ("red") iff at least two colors selected it.
            for states in cell_states.values():
                if sum(1 for c in PLAYER_COLORS if states[c] == 1) > 1:
                    states["red"] = 1
                else:
                    states.pop("red", None)

            # Save the updated cell states to the group
            group.cell_states = json.dumps(cell_states)

            # Record this move in the history (only for actual moves, so other
            # messages never create spurious "move" events).
            history = json.loads(group.cell_history)
            history.append({
                "timestamp": current_time,
                "cell_states": cell_states,
                "event": "move",
                "actor": {
                    "color": player.color,
                    "id_in_group": player.id_in_group,
                    "identity": player.identity,
                },
            })
            group.cell_history = json.dumps(history)

        # Coalition parsing and end-game logic
        had_coalition = group.coalition
        has_coalition = parse_coalition(cell_states)
        if has_coalition != had_coalition:
            # Stamp the start when a coalition appears, clear it when it dissolves.
            group.coalition_start_time = current_time if has_coalition else None
            group.coalition = has_coalition

        # broadcast cell_states & coalition flag (key 0 = whole group)
        return {
            0: {
                "cell_states": cell_states,
                "coalition": group.coalition,
            }
        }


class Results(Page):
    """Show each player their final payoff and metadata about how the game ended."""

    @staticmethod
    def get_timeout_seconds(player):
        # 60s when timers are enabled, otherwise no timeout.
        return timer(player, 60)

    @staticmethod
    def is_displayed(player: Player):
        return not player.participant.vars.get('early_end')

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        if player.group.early_end or player.participant.vars.get('early_end', False):
            return "End_Page"

    @staticmethod
    def vars_for_template(player: Player):
        """Return this round's result; in the last round also draw the bonus round."""
        group = player.group
        context = dict(
            payoff=player.payoff,
            # Both end-game fields stay null when the round ended without an
            # "end_game" message (e.g. page timeout), so read them null-safely.
            end_time=group.field_maybe_none('end_game_time'),
            reason=group.field_maybe_none('end_game_reason') or "",
            remaining_money=group.remaining_money,
            gain=player.payoff,
            was_in_coalition=player.was_in_coalition,
        )
        if player.round_number != Constants.num_rounds:
            return context

        pv = player.participant.vars
        # Draw the paying round once; later page reloads reuse the stored draw.
        if pv.get("bonus_round") is None:
            pv["bonus_round"] = random.randint(1, Constants.num_rounds)
        bonus_round = pv["bonus_round"]
        bonus_player = player.in_round(bonus_round)
        bonus_value = bonus_player.group.remaining_money
        final_gain = bonus_player.payoff * bonus_value

        # Set this as the final oTree payoff
        player.participant.payoff = final_gain

        # Save all these vars for later use
        pv["bonus_round"] = bonus_round
        pv["bonus_payoff"] = bonus_player.payoff
        pv["bonus_value"] = bonus_value
        pv["total_gain"] = final_gain

        context['total_gain'] = final_gain
        return context


class IdentityQuiz(Page):
    """Manipulation check on opponents' identities, shown in the final round."""

    form_model = 'player'
    form_fields = ['quiz_left_identity', 'quiz_right_identity']

    @staticmethod
    def get_timeout_seconds(player):
        return timer(player, 60)

    @staticmethod
    def is_displayed(player: Player):
        # Only once, in the last round (it is the last page of page_sequence).
        return player.round_number == Constants.num_rounds

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # Treat timeout as dropout (optional; remove this block if you don't want that)
        if timeout_happened:
            player.participant.vars['dropout'] = True
            return

        # Use the same logic as in Negotiate / IdentityPreview to get true identities
        ids = POSITION_MAP[player.id_in_group]
        grp = player.group
        correct_left = grp.get_player_by_id(ids["left"]).identity  # 'Human' or 'AI'
        correct_right = grp.get_player_by_id(ids["right"]).identity  # 'Human' or 'AI'

        # If any answer is wrong → mark as failed manipulation check
        if (
            player.quiz_left_identity != correct_left
            or player.quiz_right_identity != correct_right
        ):
            player.participant.vars['dropout_manip'] = True

    @staticmethod
    def app_after_this_page(player: Player, upcoming_apps):
        # If marked as dropout, send them to End_Page; otherwise continue
        pv = player.participant.vars
        if pv.get('dropout') is True or pv.get('dropout_manip') is True:
            return "End_Page"


class Identity_Preview_Reinforce(Page):
    """Round-1 check that the player can restate both opponents' identities."""

    form_model = 'player'
    form_fields = ['reinforce_left_identity', 'reinforce_right_identity']

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1

    @staticmethod
    def get_timeout_seconds(player):
        return timer(player, IDENTITY_PAGE_TIMEOUT)

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        grp = player.group
        if timeout_happened:
            # Inactive player → dropout (not paid); group members → early_end (paid).
            player.participant.vars['dropout'] = True
            grp.early_end = True
            for p in grp.get_players():
                if p.id_in_group != player.id_in_group:
                    p.participant.vars['early_end'] = True
            return

        ids = POSITION_MAP[player.id_in_group]
        correct_left = grp.get_player_by_id(ids["left"]).identity
        correct_right = grp.get_player_by_id(ids["right"]).identity

        # Flag an incorrect guess so the correction page is shown next.
        player.reinforce_failed = (
            player.reinforce_left_identity != correct_left
            or player.reinforce_right_identity != correct_right
        )

    @staticmethod
    def app_after_this_page(player: Player, upcoming_apps):
        pv = player.participant.vars
        if player.group.early_end or pv.get('dropout') or pv.get('early_end'):
            return "End_Page"


class Identity_Preview_Correction(Page):
    """Round-1 page re-showing the identities after a failed reinforcement check."""

    @staticmethod
    def is_displayed(player: Player):
        return player.round_number == 1 and player.reinforce_failed

    @staticmethod
    def get_timeout_seconds(player):
        return timer(player, IDENTITY_PAGE_TIMEOUT)

    @staticmethod
    def vars_for_template(player: Player):
        # Use the same logic as Negotiate to determine left/right opponents.
        ids = POSITION_MAP[player.id_in_group]
        grp = player.group
        left_player = grp.get_player_by_id(ids["left"])
        right_player = grp.get_player_by_id(ids["right"])
        return dict(
            left_role=left_player.identity,
            right_role=right_player.identity,
        )

    @staticmethod
    def app_after_this_page(player: Player, upcoming_apps):
        pv = player.participant.vars
        if player.group.early_end or pv.get('dropout') or pv.get('early_end'):
            return "End_Page"


# Define the sequence of pages for this app
page_sequence = [
    WaitPageGroupInitial,
    AssignRoundIdentities,
    EndRouter,
    IdentityPreview,
    Identity_Preview_Reinforce,
    Identity_Preview_Correction,
    WaitStartGame,
    Negotiate,
    Results,
    IdentityQuiz,
]