""" oTree Experiment Screenshot Tool ================================= Automatically navigates through all pages of your experiment and captures full-page screenshots, then compiles them into a PDF. SETUP: pip3 install playwright Pillow img2pdf python3 -m playwright install chromium USAGE: 1. Start your oTree server or use your Heroku link 2. Go to Sessions -> New Session, create with 1 participant 3. Copy the P1 link and paste it as PARTICIPANT_URL below 4. Run: python3 screenshot_experiment.py OUTPUT: - screenshots/ folder with one PNG per page - experiment_appendix.pdf (all pages compiled) """ import asyncio import re from pathlib import Path from playwright.async_api import async_playwright # ── Configuration ───────────────────────────────────────────────────────────── OUTPUT_DIR = Path("screenshots") PDF_OUTPUT = "experiment_appendix.pdf" # Paste your P1 link here PARTICIPANT_URL = "https://effort-choice-experiment-41f0182cb990.herokuapp.com/InitializeParticipant/okb1og9j" # Page types to screenshot only ONCE (first occurrence only) SCREENSHOT_ONCE = [] #"Matrix", "SliderTask", "slider_task" # Page types to NEVER screenshot SKIP_ALWAYS = [] # Seconds to wait on matrix pages (matrix has 8s timer) TIMED_PAGE_WAIT = 9 # ───────────────────────────────────────────────────────────────────────────── OUTPUT_DIR.mkdir(exist_ok=True) async def screenshot_page(page, name: str, index: int): path = OUTPUT_DIR / f"{index:02d}_{name}.png" await page.wait_for_load_state("networkidle") await asyncio.sleep(1.0) await page.screenshot(path=str(path), full_page=True) print(f" ✓ {path.name}") return str(path) async def click_next(page, wait_for_timer: bool = False, auto_advance: bool = False): """Advance to the next page.""" current_url = page.url if auto_advance: # Slider pages auto-advance by themselves — just wait for URL to change print(f" ⏳ Waiting for auto-advance...") await page.wait_for_url(lambda url: url != current_url, timeout=30000) await page.wait_for_load_state("networkidle") return if wait_for_timer: print(f" ⏳ Waiting {TIMED_PAGE_WAIT}s for timer...") await asyncio.sleep(TIMED_PAGE_WAIT) # Try clicking the button btn = page.locator("button.otree-btn-next").first if await btn.count() > 0: await btn.click() try: await page.wait_for_url(lambda url: url != current_url, timeout=15000) await page.wait_for_load_state("networkidle") return except: pass # Fallback: submit form via JS form = page.locator("form") if await form.count() > 0: await page.evaluate("document.querySelector('form').submit()") try: await page.wait_for_url(lambda url: url != current_url, timeout=15000) await page.wait_for_load_state("networkidle") return except: pass raise RuntimeError("Could not advance to next page.") async def fill_comprehension(page): """Fill the comprehension check with correct answers.""" field = page.locator("[name=comp_decisions]") if await field.count() > 0 and await field.is_visible(): tag = await field.evaluate("el => el.tagName.toLowerCase()") await field.select_option("3") if tag == "select" else await field.fill("3") field = page.locator("[name=comp_scenarios]") if await field.count() > 0 and await field.is_visible(): tag = await field.evaluate("el => el.tagName.toLowerCase()") await field.select_option("1") if tag == "select" else await field.fill("1") for fname in ["comp_priming", "comp_reflexion"]: radio_true = page.locator(f"input[name={fname}][value='True'], input[name={fname}][value='1']").first if await radio_true.count() > 0 and await radio_true.is_visible(): await radio_true.check() else: cb = page.locator(f"input[name={fname}][type=checkbox]").first if await cb.count() > 0 and await cb.is_visible() and not await cb.is_checked(): await cb.check() async def fill_dummy_form(page): url = page.url if "Comprehension" in url or "comprehension" in url: await fill_comprehension(page) return for inp in await page.locator("input[type=number]").all(): if await inp.is_visible() and await inp.is_enabled(): min_val = float(await inp.get_attribute("min") or "0") max_val = float(await inp.get_attribute("max") or "100") await inp.fill(str(int((min_val + max_val) / 2))) for inp in await page.locator("input[type=range]").all(): if await inp.is_visible() and await inp.is_enabled(): min_val = float(await inp.get_attribute("min") or "0") max_val = float(await inp.get_attribute("max") or "100") await inp.fill(str(int((min_val + max_val) / 2))) for inp in await page.locator("input[type=text], textarea").all(): if await inp.is_visible() and await inp.is_enabled(): if not await inp.input_value(): await inp.fill("Test answer") for sel in await page.locator("select").all(): if await sel.is_visible() and await sel.is_enabled(): for opt in await sel.locator("option").all(): val = await opt.get_attribute("value") if val: await sel.select_option(val) break seen_names = set() for radio in await page.locator("input[type=radio]").all(): if await radio.is_visible(): name = await radio.get_attribute("name") if name and name not in seen_names: await radio.check() seen_names.add(name) for cb in await page.locator("input[type=checkbox]").all(): if await cb.is_visible() and not await cb.is_checked(): await cb.check() def is_timed_page(page_type: str) -> bool: """Matrix pages: have a button but need to wait 8s first.""" return "matrix" in page_type.lower() def is_auto_advance_page(page_type: str) -> bool: """Slider pages: no button, auto-advance on their own.""" return any(s in page_type.lower() for s in ["slidertask", "slider_task"]) async def run(): if "REPLACE_ME" in PARTICIPANT_URL: print("❌ Please set PARTICIPANT_URL before running.") return screenshots = [] seen_page_types = set() async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context(viewport={"width": 1280, "height": 900}) page = await context.new_page() print(f"\n🚀 Starting from: {PARTICIPANT_URL}\n") await page.goto(PARTICIPANT_URL, timeout=60000) await page.wait_for_load_state("networkidle") print("📸 Taking screenshots...\n") page_index = 1 max_pages = 300 last_url = None stuck_count = 0 while page_index <= max_pages: url = page.url if url == last_url: stuck_count += 1 if stuck_count >= 3: print(f" ⚠ Stuck on {url} — stopping.") break try: await click_next(page) except: break continue else: stuck_count = 0 last_url = url parts = [x for x in url.split("/") if x and not x.startswith("?")] page_name = "_".join(parts[-2:]) if len(parts) >= 2 else parts[-1] if parts else f"page{page_index}" raw_type = "_".join(parts[-2:]) if len(parts) >= 2 else parts[-1] if parts else page_name page_type = re.sub(r'_?\d+$', '', raw_type) skip = any(s.lower() in page_type.lower() for s in SKIP_ALWAYS) once_only = any(s.lower() in page_type.lower() for s in SCREENSHOT_ONCE) already_seen = page_type in seen_page_types if skip: print(f" ✗ Skipping: {page_type}") elif once_only and already_seen: print(f" ↷ Skipping duplicate: {page_type}") else: shot = await screenshot_page(page, page_name, page_index) screenshots.append(shot) seen_page_types.add(page_type) page_index += 1 try: await fill_dummy_form(page) await click_next( page, wait_for_timer=is_timed_page(page_type), auto_advance=is_auto_advance_page(page_type) ) except Exception as e: print(f" ⚠ Could not advance: {e}") break await browser.close() if screenshots: print(f"\n📄 Compiling {len(screenshots)} screenshots into '{PDF_OUTPUT}'...") try: import img2pdf with open(PDF_OUTPUT, "wb") as f: f.write(img2pdf.convert(screenshots)) except Exception: from PIL import Image imgs = [Image.open(s).convert("RGB") for s in screenshots] imgs[0].save(PDF_OUTPUT, save_all=True, append_images=imgs[1:]) print(f"✅ Done! PDF saved at: {PDF_OUTPUT}") else: print("⚠ No screenshots were taken.") if __name__ == "__main__": asyncio.run(run())