#!/usr/bin/env python3
"""Walk one oTree participant through the experiment and save every page as PDF.

The script opens the participant start URL on a locally running server
(``http://127.0.0.1:8000``) in headless Chromium, auto-fills each page's form,
clicks the first usable "next"-style button, prints every distinct page to its
own PDF and finally merges the parts into the file given by ``--output``.
"""
import argparse
import re
import sqlite3
import time
from pathlib import Path

import pikepdf
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright

ROOT = Path(__file__).resolve().parent
DB_PATH = ROOT / "db.sqlite3"

# Base address of the server the pages are captured from.
BASE_URL = "http://127.0.0.1:8000/"

# Hard stop for the capture loop: maximum number of distinct pages to capture.
MAX_STEPS = 220

# Number of consecutive iterations without a new page before the loop gives up.
MAX_STAGNATION = 14

# Selectors tried, in this order, to advance to the next page.
NEXT_BUTTON_SELECTORS = (
    "#submitBtn",
    "button[type='submit']",
    "button.otree-btn-next",
    "button.btn-primary",
    "a.btn-primary",
    "button:has-text('Next')",
    "button:has-text('next')",
    "button:has-text('Submit')",
    "button:has-text('Start Task')",
    "#button_start",
    "input[type='submit']",
)


def latest_participant_code(session_id: int | None = None) -> str:
    """Return the code of the first participant of a session.

    Args:
        session_id: Row id in ``otree_session``. When ``None`` the session
            with the highest id is used.

    Returns:
        The ``code`` of the participant with the lowest ``id_in_session``.

    Raises:
        RuntimeError: If no session exists, or the session has no participants.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()
        if session_id is None:
            cur.execute("SELECT id FROM otree_session ORDER BY id DESC LIMIT 1")
            row = cur.fetchone()
            if not row:
                raise RuntimeError("No oTree session found. Create a session first.")
            session_id = row[0]

        cur.execute(
            """
            SELECT code
            FROM otree_participant
            WHERE session_id = ?
            ORDER BY id_in_session ASC
            LIMIT 1
            """,
            (session_id,),
        )
        participant = cur.fetchone()
    finally:
        # Close on every path, including the "no session" error above.
        conn.close()

    if not participant:
        raise RuntimeError("No participants found in selected session.")
    return participant[0]


def slugify(value: str) -> str:
    """Turn *value* into a lowercase ``a-z0-9_`` slug; return ``"page"`` if empty."""
    value = value.strip().lower()
    value = re.sub(r"[^a-z0-9]+", "_", value)
    return value.strip("_") or "page"


def fill_fields(page) -> None:
    """Fill every form control on the current page with a plausible value.

    Runs a script in the page that:

    * ticks all unchecked checkboxes,
    * puts ``'N/A'`` into empty textareas and ``'worker_12345'`` into empty
      text inputs,
    * applies the hard-coded ``known`` answers to ``<select>`` elements and
      radio groups with a matching ``name``,
    * sets empty number inputs and all range inputs to 5 clamped to their
      ``min``/``max`` (defaults 0 and 10) and fires ``input``/``change``,
    * picks the first radio of any still-unanswered group and the first
      non-empty option of any still-empty ``<select>``,
    * enables ``#button_start`` if it exists.

    Args:
        page: Playwright page whose form should be filled.
    """
    page.evaluate(
        """
        () => {
          document.querySelectorAll('input[type="checkbox"]').forEach(el => {
            if (!el.checked) el.click();
          });
          document.querySelectorAll('textarea').forEach(el => {
            if (!el.value) el.value = 'N/A';
          });
          document.querySelectorAll('input[type="text"]').forEach(el => {
            if (!el.value) el.value = 'worker_12345';
          });

          const known = {
            quiz_q1: '1',
            quiz_q2: '3',
            quiz_q3: '2',
            quiz_q4: '3',
            quiz_q5: '3',
            belief_q1: '4',
            belief_q2: '1',
            belief_q3: '3',
            attention_check: '1',
          };
          Object.entries(known).forEach(([name, val]) => {
            const target = document.querySelector(`[name="${name}"]`);
            if (!target) return;
            if (target.tagName === 'SELECT') {
              target.value = val;
              target.dispatchEvent(new Event('change', { bubbles: true }));
            }
          });

          document.querySelectorAll('input[type="number"]').forEach(el => {
            if (!el.value) {
              const min = el.min === '' ? 0 : Number(el.min);
              const max = el.max === '' ? 10 : Number(el.max);
              const v = Number.isFinite(max) ? Math.max(min, Math.min(max, 5)) : 5;
              el.value = String(v);
            }
            el.dispatchEvent(new Event('input', { bubbles: true }));
            el.dispatchEvent(new Event('change', { bubbles: true }));
          });

          document.querySelectorAll('input[type="range"]').forEach(el => {
            const min = el.min === '' ? 0 : Number(el.min);
            const max = el.max === '' ? 10 : Number(el.max);
            const v = Number.isFinite(max) ? Math.max(min, Math.min(max, 5)) : 5;
            el.value = String(v);
            el.dispatchEvent(new Event('input', { bubbles: true }));
            el.dispatchEvent(new Event('change', { bubbles: true }));
          });

          const groups = {};
          document.querySelectorAll('input[type="radio"]').forEach(el => {
            if (el.name) groups[el.name] = groups[el.name] || [];
            if (el.name) groups[el.name].push(el);
          });
          Object.values(groups).forEach(group => {
            const checked = group.find(el => el.checked);
            if (!checked && group.length) {
              const name = group[0].name;
              const forced = known[name];
              if (forced) {
                const picked = group.find(el => String(el.value) === forced);
                (picked || group[0]).click();
              } else {
                group[0].click();
              }
            }
          });

          document.querySelectorAll('select').forEach(el => {
            if (!el.value || el.value === '') {
              const opt = Array.from(el.options).find(o => o.value !== '');
              if (opt) {
                el.value = opt.value;
                el.dispatchEvent(new Event('change', { bubbles: true }));
              }
            }
          });

          const startBtn = document.querySelector('#button_start');
          if (startBtn) startBtn.disabled = false;
        }
        """
    )


def hide_debug_blocks(page) -> None:
    """Remove debug/info panels from the page before it is printed.

    Deletes every ``<table>`` whose text contains 'Debug info', 'Basic info',
    'Session code' or 'Participant label', then, for each element whose text
    is exactly 'Debug info' or 'Basic info', deletes its closest enclosing
    ``table``, ``.card``, ``.panel``, ``.container`` or ``.otree-body``
    (never ``<body>`` itself).

    NOTE(review): ``.container`` / ``.otree-body`` look like they could also
    wrap the real page content; confirm this never strips more than the
    debug panel.

    Args:
        page: Playwright page to clean up.
    """
    page.evaluate(
        """
        () => {
          const labels = ['Debug info', 'Basic info', 'Session code', 'Participant label'];
          const hasLabel = (text) => labels.some(l => (text || '').includes(l));

          document.querySelectorAll('table').forEach(tbl => {
            const txt = (tbl.innerText || '').trim();
            if (hasLabel(txt)) tbl.remove();
          });

          document.querySelectorAll('th, td, div, p, h4, h5, strong').forEach(el => {
            const txt = (el.textContent || '').trim();
            if (txt === 'Debug info' || txt === 'Basic info') {
              const block = el.closest('table, .card, .panel, .container, .otree-body');
              if (block && block.tagName !== 'BODY') block.remove();
            }
          });
        }
        """
    )


def click_visible(page, selector: str) -> bool:
    """Click the first visible and enabled element matching *selector*.

    Args:
        page: Playwright page to search.
        selector: Playwright selector to look up.

    Returns:
        ``True`` if an element was clicked, ``False`` if none qualified.
    """
    locator = page.locator(selector)
    for index in range(locator.count()):
        item = locator.nth(index)
        if item.is_visible() and item.is_enabled():
            item.click()
            return True
    return False


def capture_page_pdf(page, out_path: Path) -> None:
    """Print the current page to a single-sheet, margin-free PDF.

    The sheet is sized to the document's scroll dimensions, clamped to
    1200-2000 px wide and 1200-12000 px high.

    Args:
        page: Playwright page to print.
        out_path: Destination PDF file.
    """
    dims = page.evaluate(
        """
        () => {
          const w = Math.max(
            document.documentElement.scrollWidth || 0,
            document.body ? document.body.scrollWidth : 0,
            1200
          );
          const h = Math.max(
            document.documentElement.scrollHeight || 0,
            document.body ? document.body.scrollHeight : 0,
            1200
          );
          return {w, h};
        }
        """
    )
    width_px = int(min(max(dims["w"], 1000), 2000))
    height_px = int(min(max(dims["h"], 1200), 12000))
    page.pdf(
        path=str(out_path),
        print_background=True,
        width=f"{width_px}px",
        height=f"{height_px}px",
        margin={"top": "0px", "right": "0px", "bottom": "0px", "left": "0px"},
    )


def merge_pdfs(parts: list[Path], output: Path) -> None:
    """Concatenate the pages of *parts*, in order, into a new PDF at *output*."""
    with pikepdf.Pdf.new() as merged:
        for part in parts:
            with pikepdf.Pdf.open(part) as src:
                merged.pages.extend(src.pages)
        merged.save(output)


def _click_next(page) -> bool:
    """Click the first usable button from NEXT_BUTTON_SELECTORS, if any."""
    # any() short-circuits, so at most one button is clicked per call.
    return any(click_visible(page, selector) for selector in NEXT_BUTTON_SELECTORS)


def _capture_all_pages(page, parts_dir: Path) -> list[Path]:
    """Step through the experiment, writing one PDF per distinct page.

    A page counts as new when its (URL path, title, first 500 characters of
    body text) has not been seen before. The loop ends after MAX_STEPS
    captured pages or MAX_STAGNATION consecutive iterations without a new one.

    Args:
        page: Playwright page already navigated to the participant start URL.
        parts_dir: Existing directory that receives the per-page PDFs.

    Returns:
        Paths of the written PDFs in capture order.
    """
    captures: list[Path] = []
    snapshots: set[str] = set()
    stagnation = 0
    step = 1

    while step <= MAX_STEPS and stagnation < MAX_STAGNATION:
        try:
            page.wait_for_load_state("networkidle", timeout=1200)
        except PlaywrightTimeoutError:
            # Best effort only: pages that never go idle are captured as-is.
            pass
        # Playwright's wait instead of time.sleep(), as its docs recommend.
        page.wait_for_timeout(200)

        hide_debug_blocks(page)

        title = slugify(page.title() or "untitled")
        path = slugify(page.url.split("?")[0].replace(BASE_URL, ""))
        body = page.locator("body").inner_text(timeout=1000)[:500]
        key = f"{path}|{title}|{body}"

        if key not in snapshots:
            file_name = f"{step:03d}_{path}_{title}.pdf"
            out_path = parts_dir / file_name
            capture_page_pdf(page, out_path)
            captures.append(out_path)
            print(f"Captured {file_name}")
            snapshots.add(key)
            step += 1
            stagnation = 0
        else:
            stagnation += 1

        fill_fields(page)
        if _click_next(page):
            # Give the navigation triggered by the click time to start.
            page.wait_for_timeout(500)

    return captures


def main() -> None:
    """Command-line entry point.

    Options:
        --participant-code: Participant to walk through; defaults to the first
            participant of the selected session.
        --session-id: Session to take the participant from; 0 (the default)
            means the most recent session.
        --output: Path of the merged PDF, resolved relative to this script's
            directory. Required.
        --keep-parts: Keep the per-page PDFs instead of deleting them.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--participant-code", default="")
    parser.add_argument("--session-id", type=int, default=0)
    parser.add_argument("--output", required=True)
    parser.add_argument("--keep-parts", action="store_true")
    args = parser.parse_args()

    participant_code = args.participant_code or latest_participant_code(
        args.session_id or None
    )

    output = (ROOT / args.output).resolve()
    parts_dir = output.with_suffix("")
    if parts_dir == output:
        # --output without an extension: the parts directory would occupy the
        # output path itself and make the final save fail.
        parts_dir = output.with_name(output.name + "_parts")
    parts_dir.mkdir(exist_ok=True, parents=True)
    # Drop parts left over from an earlier run so numbering starts clean.
    for stale in parts_dir.glob("*.pdf"):
        stale.unlink()

    url = f"{BASE_URL}InitializeParticipant/{participant_code}"
    print(f"Using participant URL: {url}")

    with sync_playwright() as pw:
        browser = pw.chromium.launch(headless=True)
        try:
            context = browser.new_context(viewport={"width": 1440, "height": 2200})
            page = context.new_page()
            page.goto(url, wait_until="domcontentloaded", timeout=60000)
            captures = _capture_all_pages(page, parts_dir)
        finally:
            browser.close()

    merge_pdfs(captures, output)
    print(f"Merged {len(captures)} pages into {output}")

    if not args.keep_parts:
        for part in captures:
            part.unlink(missing_ok=True)
        try:
            parts_dir.rmdir()
        except OSError as exc:
            # The directory still holds other files; the merged PDF is already
            # written, so report instead of failing the run.
            print(f"Could not remove parts directory {parts_dir}: {exc}")


if __name__ == "__main__":
    main()