"""Drive a participant through a multi-page web study by auto-submitting forms.

The script GETs the current page, extracts its form fields, POSTs them back
with placeholder answers, and follows wherever the server sends it, until a
URL containing "Redirect" is reached, the run gets stuck, or the page budget
is exhausted.
"""

import sys
import time
from html.parser import HTMLParser

import requests


class FormFieldParser(HTMLParser):
    """Collect form fields from an HTML document.

    After ``feed()``:

    * ``hidden_fields`` maps the name of every ``<input type="hidden">`` to
      its value.
    * ``visible_fields`` maps every other named input/select/textarea to a
      ``{"type": ..., "value": ...}`` dict.
    """

    # Input types that never contribute a value to the submitted form here.
    _IGNORED_INPUT_TYPES = ("submit", "button", "image", "file")

    def __init__(self):
        super().__init__()
        self.hidden_fields = {}
        self.visible_fields = {}
        self._in_select = None        # name of the <select> being parsed
        self._in_textarea = None      # name of the <textarea> being parsed
        self._textarea_value = ""     # text accumulated for that <textarea>
        self._select_options = []     # non-empty option values of that <select>

    def handle_starttag(self, tag, attrs):
        """Record inputs and open select/textarea capture state."""
        attr_map = dict(attrs)
        # HTMLParser already lower-cases tag and attribute names.
        if tag == "input":
            self._handle_input(attr_map)
        elif tag == "select":
            name = attr_map.get("name")
            if name:
                self._in_select = name
                self._select_options = []
        elif tag == "option" and self._in_select:
            # Valueless or empty options (typically placeholders) are skipped.
            value = attr_map.get("value") or ""
            if value:
                self._select_options.append(value)
        elif tag == "textarea":
            name = attr_map.get("name")
            if name:
                self._in_textarea = name
                self._textarea_value = ""

    def _handle_input(self, attr_map):
        """Store one ``<input>`` element according to its type."""
        name = attr_map.get("name")
        if not name:
            return
        # A valueless attribute (e.g. ``<input type>``) is reported as None by
        # HTMLParser; normalise it so ``.lower()`` cannot fail and no None
        # leaks into the collected values.
        input_type = (attr_map.get("type") or "text").lower()
        value = attr_map.get("value") or ""

        if input_type == "hidden":
            self.hidden_fields[name] = value
        elif input_type in self._IGNORED_INPUT_TYPES:
            return
        elif input_type == "checkbox":
            self.visible_fields[name] = {"type": "checkbox", "value": value or "on"}
        elif input_type == "radio":
            # Only the first radio button of a group is kept.
            if name not in self.visible_fields:
                self.visible_fields[name] = {"type": "radio", "value": value or "1"}
        else:
            self.visible_fields[name] = {"type": input_type, "value": value}

    def handle_data(self, data):
        """Accumulate text while inside a named ``<textarea>``."""
        if self._in_textarea is not None:
            self._textarea_value += data

    def handle_endtag(self, tag):
        """Finalise the select/textarea that is being closed."""
        if tag == "select" and self._in_select:
            # Pick the first non-empty option, or "1" when there is none.
            chosen = self._select_options[0] if self._select_options else "1"
            self.visible_fields[self._in_select] = {"type": "select", "value": chosen}
            self._in_select = None
            self._select_options = []
        elif tag == "textarea" and self._in_textarea is not None:
            self.visible_fields[self._in_textarea] = {
                "type": "textarea",
                "value": self._textarea_value.strip(),
            }
            self._in_textarea = None


def parse_form_fields(html_text):
    """Parse *html_text* and return ``(hidden_fields, visible_fields)``."""
    parser = FormFieldParser()
    parser.feed(html_text)
    return parser.hidden_fields, parser.visible_fields


def build_post_data(hf, vf):
    """Build the POST payload from hidden fields *hf* and visible fields *vf*.

    Hidden fields are copied as-is. A visible field keeps its existing value
    when it has one; otherwise number/radio fields get "1", checkbox/select
    fields keep their (empty) value, and everything else gets "test".
    """
    data = dict(hf)
    for name, info in vf.items():
        field_type = info["type"]
        value = info["value"]
        if value:
            data[name] = value
        elif field_type in ("number", "radio"):
            data[name] = "1"
        elif field_type in ("checkbox", "select"):
            data[name] = value
        else:
            data[name] = "test"
    return data


KEY_PAGES = (
    "Cases",
    "ReviewStatements",
    "Feedback",
    "Redirect",
    "Consent",
    "Instructions",
    "Demographics",
)
MAX_PAGES = 150
DELAY = 0.2
MAX_RETRIES = 3
REQUEST_TIMEOUT = 15
APP_NAME = "welfare_study"
START_URL = "http://localhost:8000/p/gcya43h4/welfare_study/Consent/3"


def page_label(url):
    """Return the part of *url* after the app-name segment.

    Falls back to the last path segment (or the whole URL when that segment
    is empty) if the app name does not occur in the URL.
    """
    parts = url.rstrip("/").split("/")
    try:
        idx = parts.index(APP_NAME)
    except ValueError:
        return url.split("/")[-1] or url
    return "/".join(parts[idx + 1:])


def is_key_page(url):
    """Return True when *url* contains any of the ``KEY_PAGES`` keywords."""
    return any(kw in url for kw in KEY_PAGES)


def _request_with_retries(send, verb):
    """Call ``send()`` up to ``MAX_RETRIES`` times.

    Returns the response, or None when every attempt raised a connection
    error or timeout (the failure has already been printed in that case).
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            return send()
        except (requests.ConnectionError, requests.Timeout) as e:
            print(f" [retry {attempt}/{MAX_RETRIES}] {verb} failed: {e}")
            if attempt == MAX_RETRIES:
                print("FAILURE: could not connect after retries.")
                return None
            time.sleep(1)
    return None


def _fallback_post(session, url, data):
    """Best-effort POST of *data* to *url*.

    Returns the final URL after redirects, or *url* unchanged when the
    request fails, so the caller still sees the page as stuck.
    """
    try:
        resp = session.post(
            url, data=data, timeout=REQUEST_TIMEOUT, allow_redirects=True
        )
    except (requests.ConnectionError, requests.Timeout) as e:
        print(f" fallback POST failed: {e}")
        return url
    return resp.url


def advance_participant(start_url=START_URL, max_pages=MAX_PAGES):
    """Submit page after page until the redirect page is reached.

    Args:
        start_url: URL of the first page to fetch.
        max_pages: maximum number of GET/POST cycles before giving up.

    Returns:
        ``(success, page_count)``. ``success`` is True only when a URL
        containing "Redirect" was reached. ``page_count`` is the number of
        completed GET/POST cycles, including repeated attempts on a page
        that did not advance.
    """
    session = requests.Session()
    current_url = start_url
    page_count = 0
    stuck_count = 0

    print(f"Starting at: {current_url}")
    print(f"Max pages: {max_pages}, delay: {DELAY}s")
    print("-" * 60)

    while page_count < max_pages:
        resp = _request_with_retries(
            lambda: session.get(current_url, timeout=REQUEST_TIMEOUT), "GET"
        )
        if resp is None:
            return False, page_count
        if resp.status_code == 500:
            # Report the error but still try to proceed with whatever came back.
            print(f" WARNING: 500 error on {current_url}")
            print(f" Response snippet: {resp.text[:200]}")

        html = resp.text
        landed_url = resp.url
        label = page_label(landed_url)

        if "Redirect" in landed_url:
            print(f" Page {page_count}: -> {label} *** REACHED REDIRECT ***")
            print("-" * 60)
            print(f"Total pages traversed: {page_count}")
            print("SUCCESS")
            return True, page_count

        hidden, visible = parse_form_fields(html)
        if page_count % 20 == 0 or is_key_page(landed_url):
            vis_names = list(visible.keys())[:5]
            print(
                f" Page {page_count}: {label} "
                f"(hidden={len(hidden)}, visible={len(visible)}: {vis_names})"
            )

        post_data = build_post_data(hidden, visible)
        post_resp = _request_with_retries(
            lambda: session.post(
                landed_url,
                data=post_data,
                timeout=REQUEST_TIMEOUT,
                allow_redirects=True,
            ),
            "POST",
        )
        if post_resp is None:
            return False, page_count

        new_url = post_resp.url
        if new_url == landed_url:
            stuck_count += 1
            if stuck_count == 1:
                # First fallback: resend only the hidden fields.
                new_url = _fallback_post(session, landed_url, dict(hidden))
            if new_url == landed_url and stuck_count >= 3:
                print(
                    f" STUCK on {label} after {stuck_count} attempts. "
                    "Trying empty POST..."
                )
                csrf = hidden.get("csrfmiddlewaretoken", "")
                if csrf:
                    new_url = _fallback_post(
                        session, landed_url, {"csrfmiddlewaretoken": csrf}
                    )
            if new_url == landed_url and stuck_count >= 5:
                print(
                    f" FAILURE: stuck on {label} for {stuck_count} "
                    "consecutive attempts."
                )
                print(f"Total pages traversed: {page_count}")
                print("FAILURE")
                return False, page_count

        if new_url != landed_url:
            # Reset on any advance, including one achieved by a fallback POST,
            # so the next page that stalls starts its own fallback sequence.
            stuck_count = 0

        current_url = new_url
        page_count += 1
        time.sleep(DELAY)

    print(f"Total pages traversed: {page_count}")
    print("FAILURE: reached max page limit.")
    return False, page_count


if __name__ == "__main__":
    success, _ = advance_participant()
    sys.exit(0 if success else 1)