from otree.api import * import json import re import math from collections import defaultdict def process_annotations(annotation_data): # Load the JSON data data = json.loads(annotation_data) # Initialize empty lists processed_data = [] # Loop through each annotation for annotation in data: target = annotation.get('target', {}) selector = target.get('selector', {}) # Extract path data from selector if available selector_value = selector.get('value', '') if 'path d="' in selector_value: path_data = selector_value.split('path d="')[1].split('"')[0] processed_data.append({ "type": "path", "path": path_data, }) elif 'xywh=pixel:' in selector_value: parts = selector_value.split('xywh=pixel:')[1].split(',') if len(parts) >= 2: x = float(parts[0]) y = float(parts[1]) processed_data.append({ "type": "point", "x": x, "y": y, }) return processed_data def extract_svg_paths(json_str): data = json.loads(json_str) path_ds = [] for annotation in data: target = annotation.get('target', {}) selector = target.get('selector', {}) selector_value = selector.get('value', '') if 'path d="' in selector_value: path_d = selector_value.split('path d="')[1].split('"')[0] path_ds.append(path_d) return path_ds def parse_svg_path(path_string): """Extract raw (x, y) points from SVG and pixel annotations.""" data = json.loads(path_string) coordinates = [] for annotation in data: selector_value = annotation.get("target", {}).get("selector", {}).get("value", "") if 'path d="' in selector_value: coordinates += extract_from_path(selector_value) elif 'xywh=pixel:' in selector_value: coordinates += extract_from_pixel(selector_value) return coordinates def extract_from_path(selector_value): """Handles SVG path extraction (M and L commands only).""" path_data = selector_value.split('path d="')[1].split('"')[0] tokens = re.findall(r'[MLZmlz]|-?\d+\.?\d*', path_data) i = 0 coords = [] prev_x = prev_y = None while i < len(tokens): if tokens[i] in ('M', 'L'): x = float(tokens[i + 1]) y = float(tokens[i + 2]) if tokens[i] == 'M': prev_x, prev_y = x, y elif tokens[i] == 'L' and prev_x is not None: coords.append({'x': prev_x, 'y': prev_y}) coords.append({'x': x, 'y': y}) prev_x, prev_y = x, y i += 3 else: i += 1 return coords def extract_from_pixel(selector_value): parts = selector_value.split('xywh=pixel:')[1].split(',') if len(parts) >= 2: x = float(parts[0]) y = float(parts[1]) return [{'x': x, 'y': y}] return [] def find_two_distinct(points, from_start=True): """Find first two points with different x-values, from start or end.""" seen = [] direction = 1 if from_start else -1 idx = 0 while len(seen) < 2 and abs(idx) < len(points): p = points[idx] if from_start else points[-1 + idx] if not seen or p[0] != seen[-1][0]: seen.append(p) idx += direction return seen if len(seen) == 2 else None def interpolate_points(points, step=1): # print(points) interpolated = [] # points.sort(key=lambda p: p[0]) # sort by x value # print(f"Interpolating points: {points}") if len(points) < 2: x, y = points[0] points.append((x+10,y)) points.append((x-10,y)) # print(points) for i in range(1, len(points)): x0, y0 = points[i - 1] x1, y1 = points[i] if x0 == x1: continue x_start, x_end = sorted([x0, x1]) start_x = int(math.floor(x_start / step)) * step end_x = int(math.ceil(x_end / step)) * step # Include the starting point interpolated.append((x0, y0)) # Add interpolated points between for xi in range(start_x, end_x, step): if xi == x0 or xi == x1: continue m = (y1 - y0) / (x1 - x0) yi = y0 + m * (xi - x0) # print(f"Interpolating: x={xi}, y={yi} from ({x0}, {y0}) to ({x1}, {y1})") interpolated.append((xi, yi)) return interpolated def extrapolate_endpoints(points, x_min=62, x_max=890, step=1): """Extend curve to x_min and x_max via linear extrapolation.""" if not points: return [] # Step 1: Sort points by x points.sort(key=lambda p: p[0]) # Step 2: Get two distinct-x points on each end left_pair = find_two_distinct(points, from_start=True) right_pair = find_two_distinct(points, from_start=False) extrapolated = [] if left_pair: (x0, y0), (x1, y1) = left_pair slope = (y1 - y0) / (x1 - x0) if x1 != x0 else 0 y_extrap = y0 + slope * (x_min - x0) extrapolated.append((x_min, y_extrap)) # Interpolate between extrapolated left point and nearest real point x_interp = min(x0, x1) x_start, x_end = sorted([x_interp, x_min]) start_x = int(math.floor(x_start / step)) * step end_x = int(math.ceil(x_end / step)) * step for xi in range(start_x, end_x, step): if xi in (x_interp, x_min): continue m = (y_extrap - y0) / (x_min - x_interp) if (x_min - x_interp) != 0 else 0 yi = y0 + m * (xi - x_interp) extrapolated.append((xi, yi)) if right_pair: (x0, y0), (x1, y1) = right_pair slope = (y1 - y0) / (x1 - x0) if x1 != x0 else 0 y_extrap = y1 + slope * (x_max - x1) extrapolated.append((x_max, y_extrap)) # Interpolate between nearest real point and extrapolated right point x_start, x_end = sorted([x1, x_max]) start_x = int(math.floor(x_start / step)) * step end_x = int(math.ceil(x_end / step)) * step for xi in range(start_x, end_x, step): if xi in (x1, x_max): continue m = (y_extrap - y1) / (x_max - x1) if (x_max - x1) != 0 else 0 yi = y1 + m * (xi - x1) extrapolated.append((xi, yi)) # Step 3: Combine and sort extended_points = points + extrapolated extended_points.sort(key=lambda p: p[0]) # print(f"Extended points: {extended_points}") return extended_points def sort_clean_coordinates(coordinates, min_x, max_x, min_y, max_y, round_x=False): """ - Groups points by x. - If round_x=True, rounds x before grouping to merge near-duplicates. - Averages y-values for each x group. - Normalizes into target coordinate space. """ grouped = defaultdict(list) for point in coordinates: x_key = round(point['x']) if round_x else point['x'] grouped[x_key].append(point['y']) # print(f"Grouped points: {grouped}") averaged_points = [{'x': x, 'y': sum(ys) / len(ys)} for x, ys in grouped.items()] averaged_points.sort(key=lambda p: p['x']) norm_coords = [ [((point['x'] - 62) / (890 - 62) * (max_x - min_x) + min_x), ((-point['y'] + 494) / (494 - 48) * (max_y - min_y) + min_y)] for point in averaged_points ] return norm_coords def process_svg_path(json_str, min_x, max_x, min_y, max_y, round_x=False): """ Extracts, interpolates, extrapolates, and normalizes annotation points. round_x controls whether to merge close x-values (True) or keep exact (False). """ annotations = json.loads(json_str) all_points = [] # Process each annotation separately to preserve path structure for annotation in annotations: selector_value = annotation.get("target", {}).get("selector", {}).get("value", "") annotation_points = [] if 'path d="' in selector_value: annotation_points = [(pt['x'], pt['y']) for pt in extract_from_path(selector_value)] elif "xywh=pixel:" in selector_value: annotation_points = [(pt['x'], pt['y']) for pt in extract_from_pixel(selector_value)] if annotation_points: # Interpolate each annotation separately (preserves path order) interpolated_points = interpolate_points(annotation_points) all_points.extend(interpolated_points) # Sort all interpolated points by x-coordinate for proper curve shape all_points.sort(key=lambda p: p[0]) # Extrapolate once to extend the combined curve to full width all_points = extrapolate_endpoints(all_points) # Keep only visible range all_points = [(x, y) for x, y in all_points if 62 <= x <= 890] # Convert to dicts and average duplicates here (final step) all_points_dict = [{'x': x, 'y': y} for x, y in all_points] return sort_clean_coordinates(all_points_dict, min_x, max_x, min_y, max_y, round_x=round_x) def anno_types(path_string): data = json.loads(path_string) types = [] points = 0 paths = 0 for annotation in data: target = annotation.get('target', {}) selector = target.get('selector', {}) selector_value = selector.get('value', '') if 'path d="' in selector_value: types.append('path') paths += 1 elif 'xywh=pixel:' in selector_value: types.append('point') points += 1 return types, points, paths