import time, sys, requests
from html.parser import HTMLParser
class FormFieldParser(HTMLParser):
    """Collect form fields from HTML fed to the parser.

    After ``feed()``:

    * ``hidden_fields`` maps the name of every ``<input type="hidden">`` to
      its value.
    * ``visible_fields`` maps the name of every other recognised control to
      ``{"type": <kind>, "value": <value>}`` where ``<kind>`` is the input
      type, ``"select"`` or ``"textarea"``.

    Controls without a ``name`` are ignored, as are inputs of type
    ``submit``, ``button``, ``image`` and ``file``.

    ``HTMLParser`` reports an attribute written without a value (for example
    ``<input name="a" type>``) as ``None``; such attributes, and empty ones,
    are treated as absent, so a missing/empty ``type`` means ``"text"`` and
    a missing ``value`` means ``""``.
    """

    def __init__(self):
        super().__init__()
        self.hidden_fields = {}
        self.visible_fields = {}
        # Name of the <select>/<textarea> currently open, or None.
        self._in_select = None
        self._in_textarea = None
        # Text accumulated for the open <textarea>.
        self._textarea_value = ""
        # Non-empty option values seen for the open <select>.
        self._select_options = []

    def handle_starttag(self, tag, attrs):
        """Dispatch on the opening tag of input/select/option/textarea."""
        ad = dict(attrs)
        if tag == "input":
            self._handle_input(ad)
        elif tag == "select":
            name = ad.get("name")
            if name:
                self._in_select = name
                self._select_options = []
        elif tag == "option" and self._in_select:
            val = ad.get("value") or ""
            if val:
                self._select_options.append(val)
        elif tag == "textarea":
            name = ad.get("name")
            if name:
                self._in_textarea = name
                self._textarea_value = ""

    def _handle_input(self, ad):
        """Record one <input> element described by the attribute dict *ad*."""
        name = ad.get("name")
        if not name:
            return
        # `or` (rather than a .get default) also covers attributes that are
        # present but valueless, which HTMLParser reports as None.
        it = (ad.get("type") or "text").lower()
        val = ad.get("value") or ""
        if it == "hidden":
            self.hidden_fields[name] = val
        elif it in ("submit", "button", "image", "file"):
            return
        elif it == "checkbox":
            self.visible_fields[name] = {"type": "checkbox", "value": val or "on"}
        elif it == "radio":
            # Only the first radio button of a group is kept.
            if name not in self.visible_fields:
                self.visible_fields[name] = {"type": "radio", "value": val or "1"}
        else:
            self.visible_fields[name] = {"type": it, "value": val}

    def handle_data(self, data):
        """Accumulate text while inside a named <textarea>."""
        if self._in_textarea is not None:
            self._textarea_value += data

    def handle_endtag(self, tag):
        """Finalise the field when a <select> or <textarea> closes."""
        if tag == "select" and self._in_select:
            # First non-empty option value, or "1" when none was seen.
            chosen = self._select_options[0] if self._select_options else "1"
            self.visible_fields[self._in_select] = {"type": "select", "value": chosen}
            self._in_select = None
            self._select_options = []
        elif tag == "textarea" and self._in_textarea is not None:
            self.visible_fields[self._in_textarea] = {
                "type": "textarea",
                "value": self._textarea_value.strip(),
            }
            self._in_textarea = None
def parse_form_fields(html_text):
    """Parse *html_text* and return ``(hidden_fields, visible_fields)``.

    Both elements are the dictionaries populated by a fresh
    ``FormFieldParser`` after it has been fed the whole document.
    """
    extractor = FormFieldParser()
    extractor.feed(html_text)
    hidden = extractor.hidden_fields
    visible = extractor.visible_fields
    return hidden, visible
def build_post_data(hf, vf):
    """Build the POST payload from hidden and visible form fields.

    Args:
        hf: mapping of hidden field name -> value; copied as the base payload.
        vf: mapping of visible field name -> ``{"type": ..., "value": ...}``.

    Returns:
        A new dict. A visible field with a truthy value is sent as-is.
        Otherwise ``number``/``radio`` fields get ``"1"``, ``checkbox`` and
        ``select`` fields keep their (empty) value, and everything else gets
        ``"test"``. Visible fields override hidden ones of the same name.
    """
    payload = dict(hf)
    for field_name, spec in vf.items():
        kind = spec["type"]
        chosen = spec["value"]
        # Only substitute a placeholder when the value is empty and the
        # field is not one whose empty value is passed through unchanged.
        if not chosen and kind not in ("checkbox", "select"):
            chosen = "1" if kind in ("number", "radio") else "test"
        payload[field_name] = chosen
    return payload
# URL substrings that make is_key_page() report a page as "key";
# advance_participant() always logs such pages.
KEY_PAGES = (
    "Cases",
    "ReviewStatements",
    "Feedback",
    "Redirect",
    "Consent",
    "Instructions",
    "Demographics",
)
# Upper bound on main-loop iterations in advance_participant().
MAX_PAGES = 150
# Seconds slept at the end of every main-loop iteration.
DELAY = 0.2
# Attempts per GET/POST before a connection error or timeout is fatal.
MAX_RETRIES = 3
# First URL fetched by advance_participant().
START_URL = "http://localhost:8000/p/gcya43h4/welfare_study/Consent/3"
def page_label(url):
    """Return a short label for *url* suitable for log output.

    If the path contains a ``welfare_study`` segment, the label is everything
    after that segment (trailing slashes ignored). Otherwise it is the text
    after the last ``/`` of the unmodified URL, or the whole URL when that
    text is empty.
    """
    segments = url.rstrip("/").split("/")
    if "welfare_study" in segments:
        start = segments.index("welfare_study") + 1
        return "/".join(segments[start:])
    tail = url.split("/")[-1]
    return tail if tail else url
def is_key_page(url):
    """Return True when any entry of ``KEY_PAGES`` occurs as a substring of *url*."""
    return any(marker in url for marker in KEY_PAGES)
def _send_with_retries(method, url, verb, **kwargs):
    """Call ``method(url, timeout=15, **kwargs)`` with up to MAX_RETRIES attempts.

    Connection errors and timeouts are retried after a one-second pause.
    Returns the response, or None once the final attempt has failed (the
    failure message has already been printed). *verb* is only used in the
    retry log line.
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            return method(url, timeout=15, **kwargs)
        except (requests.ConnectionError, requests.Timeout) as exc:
            print(f" [retry {attempt}/{MAX_RETRIES}] {verb} failed: {exc}")
            if attempt == MAX_RETRIES:
                print("FAILURE: could not connect after retries.")
                return None
            time.sleep(1)
    return None


def _fallback_post(session, url, data):
    """Make one best-effort POST of *data* and return the URL it landed on.

    A connection error or timeout is logged and reported as "did not move"
    by returning *url* unchanged.
    """
    try:
        return session.post(url, data=data, timeout=15, allow_redirects=True).url
    except (requests.ConnectionError, requests.Timeout) as exc:
        print(f" Fallback POST failed: {exc}")
        return url


def advance_participant():
    """Walk the participant from START_URL until a "Redirect" page is reached.

    Each iteration GETs the current URL, parses its form, POSTs auto-filled
    data back to it and follows the resulting URL. When a POST leaves the
    participant on the same URL, fallbacks are tried: hidden fields only on
    the first stuck attempt, the CSRF token only from the third; the run is
    abandoned after five consecutive stuck attempts on a page.

    Returns:
        ``(success, page_count)``. ``success`` is True only when a URL
        containing "Redirect" was reached. ``page_count`` is the number of
        completed loop iterations, which includes retries of a stuck page.
    """
    current_url = START_URL
    page_count = 0
    stuck_count = 0
    print(f"Starting at: {current_url}")
    print(f"Max pages: {MAX_PAGES}, delay: {DELAY}s")
    print("-" * 60)

    # The context manager closes the session's connections on every exit path.
    with requests.Session() as session:
        while page_count < MAX_PAGES:
            resp = _send_with_retries(session.get, current_url, "GET")
            if resp is None:
                return False, page_count
            if resp.status_code == 500:
                # Logged only; the page is still parsed and submitted.
                print(f" WARNING: 500 error on {current_url}")
                print(f" Response snippet: {resp.text[:200]}")

            landed_url = resp.url
            label = page_label(landed_url)
            if "Redirect" in landed_url:
                print(f" Page {page_count}: -> {label} *** REACHED REDIRECT ***")
                print("-" * 60)
                print(f"Total pages traversed: {page_count}")
                print("SUCCESS")
                return True, page_count

            hidden, visible = parse_form_fields(resp.text)
            if page_count % 20 == 0 or is_key_page(landed_url):
                vis_names = list(visible)[:5]
                print(f" Page {page_count}: {label} (hidden={len(hidden)}, visible={len(visible)}: {vis_names})")

            post_resp = _send_with_retries(
                session.post,
                landed_url,
                "POST",
                data=build_post_data(hidden, visible),
                allow_redirects=True,
            )
            if post_resp is None:
                return False, page_count

            new_url = post_resp.url
            if new_url == landed_url:
                stuck_count += 1
                if stuck_count == 1:
                    # First fallback: submit the hidden fields only.
                    new_url = _fallback_post(session, landed_url, dict(hidden))
                elif stuck_count >= 3:
                    print(f" STUCK on {label} after {stuck_count} attempts. Trying empty POST...")
                    csrf = hidden.get("csrfmiddlewaretoken", "")
                    if csrf:
                        new_url = _fallback_post(
                            session, landed_url, {"csrfmiddlewaretoken": csrf}
                        )
                if new_url == landed_url and stuck_count >= 5:
                    print(f" FAILURE: stuck on {label} for {stuck_count} consecutive attempts.")
                    print(f"Total pages traversed: {page_count}")
                    print("FAILURE")
                    return False, page_count

            if new_url != landed_url:
                # Progress was made (directly or via a fallback), so the
                # next page starts with a fresh stuck counter.
                stuck_count = 0

            current_url = new_url
            page_count += 1
            time.sleep(DELAY)

    print(f"Total pages traversed: {page_count}")
    print("FAILURE: reached max page limit.")
    return False, page_count
if __name__ == "__main__":
    # Process exit status: 0 when the run succeeded, 1 otherwise.
    reached_end, _pages = advance_participant()
    exit_code = 0 if reached_end else 1
    sys.exit(exit_code)