"""Generate a Leaflet detector layout map from SUMO net/additional files.""" from __future__ import annotations import argparse import hashlib import json import math import os import xml.etree.ElementTree as ET from collections import defaultdict from pathlib import Path from typing import Dict, List, Sequence, Tuple import yaml WGS84_A = 6378137.0 WGS84_E2 = 0.0066943799901413165 WGS84_EP2 = WGS84_E2 / (1.0 - WGS84_E2) UTM_K0 = 0.9996 def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Generate a Leaflet detector layout map.") parser.add_argument( "--config", default="config_sumo_vsl.yaml", help="Project config used to resolve the currently generated corridor detector file.", ) parser.add_argument( "--net-file", default=None, help="Override SUMO net file containing lane shapes and projection metadata.", ) parser.add_argument( "--detector-file", default=None, help="Override SUMO additional file containing inductionLoop detectors.", ) parser.add_argument( "--output", default="results/detector_leaflet_map.html", help="Output HTML path.", ) return parser.parse_args() def resolve_project_path(project_root: Path, path_str: str) -> Path: path = Path(path_str) return path if path.is_absolute() else (project_root / path).resolve() def build_corridor_signature( *, net_path: Path, route_path: Path, corridor_edges: Sequence[str], control_segment_length_m: float, detector_spacing_m: float, detector_start_offset_m: float, ) -> str: payload = { "net_file": str(net_path), "net_mtime_ns": net_path.stat().st_mtime_ns, "route_file": str(route_path), "route_mtime_ns": route_path.stat().st_mtime_ns, "corridor_edges": list(corridor_edges), "control_segment_length_m": control_segment_length_m, "detector_spacing_m": detector_spacing_m, "detector_start_offset_m": detector_start_offset_m, } digest = hashlib.sha1( json.dumps(payload, ensure_ascii=True, sort_keys=True).encode("utf-8") ).hexdigest() return digest[:16] def resolve_generated_inputs(args: argparse.Namespace) -> Tuple[Path, Path]: project_root = Path.cwd() if args.net_file and args.detector_file: return resolve_project_path(project_root, args.net_file), resolve_project_path(project_root, args.detector_file) config_path = resolve_project_path(project_root, args.config) with open(config_path, "r", encoding="utf-8") as f: config = yaml.safe_load(f) sumo_cfg = config["sumo"] env_cfg = config["environment"] net_path = resolve_project_path(project_root, sumo_cfg["net_file"]) route_path = resolve_project_path(project_root, sumo_cfg["route_file"]) output_root = env_cfg.get("generated_asset_dir", "sumo_resource/generated_corridor") output_dir = resolve_project_path(project_root, output_root) signature = build_corridor_signature( net_path=net_path, route_path=route_path, corridor_edges=env_cfg["control_edges"], control_segment_length_m=float(env_cfg.get("control_segment_length_m", 1000.0)), detector_spacing_m=float(env_cfg.get("detector_spacing_m", 100.0)), detector_start_offset_m=float( env_cfg.get( "detector_start_offset_m", float(env_cfg.get("detector_spacing_m", 100.0)) * 0.5, ) ), ) artifact_dir = output_dir / signature generated_net_path = artifact_dir / "experiment_corridor.net.xml" generated_detector_path = artifact_dir / "experiment_corridor_metrics_il.add.xml" if not generated_net_path.is_file(): raise FileNotFoundError( f"Generated net file not found: {generated_net_path}. " "Run the environment once to materialize corridor assets, or pass --net-file explicitly." ) if not generated_detector_path.is_file(): raise FileNotFoundError( f"Generated detector file not found: {generated_detector_path}. " "Run the environment once to materialize corridor assets, or pass --detector-file explicitly." ) return generated_net_path, generated_detector_path def parse_shape(shape_text: str) -> List[Tuple[float, float]]: points = [] for raw_point in shape_text.strip().split(): x_str, y_str = raw_point.split(",") points.append((float(x_str), float(y_str))) return points def interpolate_along_polyline( shape: Sequence[Tuple[float, float]], distance_m: float, ) -> Tuple[float, float]: if not shape: raise ValueError("shape must not be empty") if len(shape) == 1: return shape[0] segment_lengths = [] total_length = 0.0 for idx in range(len(shape) - 1): x1, y1 = shape[idx] x2, y2 = shape[idx + 1] length = math.hypot(x2 - x1, y2 - y1) segment_lengths.append(length) total_length += length if total_length <= 1e-9: return shape[0] clamped_distance = min(max(distance_m, 0.0), total_length) traversed = 0.0 for idx, seg_length in enumerate(segment_lengths): if seg_length <= 1e-9: continue if traversed + seg_length >= clamped_distance: ratio = (clamped_distance - traversed) / seg_length x1, y1 = shape[idx] x2, y2 = shape[idx + 1] return (x1 + (x2 - x1) * ratio, y1 + (y2 - y1) * ratio) traversed += seg_length return shape[-1] def utm_to_latlon(easting: float, northing: float, zone: int, northern: bool) -> Tuple[float, float]: x = easting - 500000.0 y = northing if not northern: y -= 10000000.0 m = y / UTM_K0 mu = m / ( WGS84_A * ( 1.0 - WGS84_E2 / 4.0 - 3.0 * WGS84_E2**2 / 64.0 - 5.0 * WGS84_E2**3 / 256.0 ) ) e1 = (1.0 - math.sqrt(1.0 - WGS84_E2)) / (1.0 + math.sqrt(1.0 - WGS84_E2)) j1 = 3.0 * e1 / 2.0 - 27.0 * e1**3 / 32.0 j2 = 21.0 * e1**2 / 16.0 - 55.0 * e1**4 / 32.0 j3 = 151.0 * e1**3 / 96.0 j4 = 1097.0 * e1**4 / 512.0 fp = ( mu + j1 * math.sin(2.0 * mu) + j2 * math.sin(4.0 * mu) + j3 * math.sin(6.0 * mu) + j4 * math.sin(8.0 * mu) ) sin_fp = math.sin(fp) cos_fp = math.cos(fp) tan_fp = math.tan(fp) c1 = WGS84_EP2 * cos_fp**2 t1 = tan_fp**2 n1 = WGS84_A / math.sqrt(1.0 - WGS84_E2 * sin_fp**2) r1 = WGS84_A * (1.0 - WGS84_E2) / (1.0 - WGS84_E2 * sin_fp**2) ** 1.5 d = x / (n1 * UTM_K0) q1 = n1 * tan_fp / r1 q2 = d**2 / 2.0 q3 = ( 5.0 + 3.0 * t1 + 10.0 * c1 - 4.0 * c1**2 - 9.0 * WGS84_EP2 ) * d**4 / 24.0 q4 = ( 61.0 + 90.0 * t1 + 298.0 * c1 + 45.0 * t1**2 - 252.0 * WGS84_EP2 - 3.0 * c1**2 ) * d**6 / 720.0 lat = fp - q1 * (q2 - q3 + q4) q5 = d q6 = (1.0 + 2.0 * t1 + c1) * d**3 / 6.0 q7 = ( 5.0 - 2.0 * c1 + 28.0 * t1 - 3.0 * c1**2 + 8.0 * WGS84_EP2 + 24.0 * t1**2 ) * d**5 / 120.0 lon_origin = math.radians((zone - 1) * 6 - 180 + 3) lon = lon_origin + (q5 - q6 + q7) / cos_fp return math.degrees(lat), math.degrees(lon) def parse_projection(location_elem: ET.Element) -> Tuple[float, float, int, bool]: net_offset_x, net_offset_y = [ float(value) for value in location_elem.attrib["netOffset"].split(",") ] proj_parameter = location_elem.attrib.get("projParameter", "") zone = None northern = True for token in proj_parameter.split(): if token.startswith("+zone="): zone = int(token.split("=", 1)[1]) elif token == "+south": northern = False if zone is None: raise ValueError(f"Unable to parse UTM zone from projParameter: {proj_parameter}") return net_offset_x, net_offset_y, zone, northern def load_lane_shapes(net_path: Path) -> Tuple[Dict[str, List[Tuple[float, float]]], Dict[str, List[Tuple[float, float]]], Tuple[float, float, int, bool]]: tree = ET.parse(net_path) root = tree.getroot() location_elem = root.find("location") if location_elem is None: raise ValueError("No element found in net file.") lane_shapes: Dict[str, List[Tuple[float, float]]] = {} edge_shapes: Dict[str, List[Tuple[float, float]]] = {} for edge_elem in root.findall("edge"): edge_id = edge_elem.attrib.get("id", "") if edge_elem.attrib.get("function") == "internal": continue first_lane_shape = None for lane_elem in edge_elem.findall("lane"): lane_id = lane_elem.attrib["id"] shape = parse_shape(lane_elem.attrib["shape"]) lane_shapes[lane_id] = shape if first_lane_shape is None: first_lane_shape = shape if first_lane_shape is not None: edge_shapes[edge_id] = first_lane_shape return lane_shapes, edge_shapes, parse_projection(location_elem) def load_detectors(detector_path: Path) -> List[dict]: tree = ET.parse(detector_path) root = tree.getroot() detectors = [] for detector_elem in root.findall("inductionLoop"): lane_id = detector_elem.attrib["lane"] edge_id = lane_id.rsplit("_", 1)[0] pos = float(detector_elem.attrib["pos"]) detectors.append( { "id": detector_elem.attrib["id"], "lane_id": lane_id, "edge_id": edge_id, "pos_m": pos, } ) return detectors def build_station_records( detectors: Sequence[dict], lane_shapes: Dict[str, List[Tuple[float, float]]], projection: Tuple[float, float, int, bool], ) -> List[dict]: net_offset_x, net_offset_y, zone, northern = projection grouped: Dict[Tuple[str, str], List[dict]] = defaultdict(list) for detector in detectors: lane_shape = lane_shapes.get(detector["lane_id"]) if lane_shape is None: continue x, y = interpolate_along_polyline(lane_shape, detector["pos_m"]) projected_x = x - net_offset_x projected_y = y - net_offset_y lat, lon = utm_to_latlon(projected_x, projected_y, zone=zone, northern=northern) detector_record = { **detector, "x": x, "y": y, "projected_x": projected_x, "projected_y": projected_y, "lat": lat, "lon": lon, } station_key = (detector["edge_id"], f"{detector['pos_m']:.2f}") grouped[station_key].append(detector_record) station_records = [] for (edge_id, pos_label), items in sorted( grouped.items(), key=lambda kv: (kv[0][0], float(kv[0][1])), ): lat = sum(item["lat"] for item in items) / len(items) lon = sum(item["lon"] for item in items) / len(items) station_records.append( { "edge_id": edge_id, "pos_label_m": pos_label, "pos_m": float(pos_label), "lat": lat, "lon": lon, "lane_count": len(items), "lane_ids": [item["lane_id"] for item in items], "detector_ids": [item["id"] for item in items], "projected_x": sum(item["projected_x"] for item in items) / len(items), "projected_y": sum(item["projected_y"] for item in items) / len(items), } ) return station_records def build_edge_features( edge_shapes: Dict[str, List[Tuple[float, float]]], station_records: Sequence[dict], projection: Tuple[float, float, int, bool], ) -> List[dict]: _, _, zone, northern = projection net_offset_x, net_offset_y, _, _ = projection station_edge_ids = {record["edge_id"] for record in station_records} features = [] for edge_id in sorted(station_edge_ids): shape = edge_shapes.get(edge_id) if not shape: continue latlngs = [] for x, y in shape: projected_x = x - net_offset_x projected_y = y - net_offset_y lat, lon = utm_to_latlon(projected_x, projected_y, zone=zone, northern=northern) latlngs.append([lat, lon]) features.append({"edge_id": edge_id, "latlngs": latlngs}) return features def render_html( station_records: Sequence[dict], edge_features: Sequence[dict], projection: Tuple[float, float, int, bool], ) -> str: _, _, zone, northern = projection stations_json = json.dumps(station_records, ensure_ascii=False) edges_json = json.dumps(edge_features, ensure_ascii=False) total_loops = sum(record["lane_count"] for record in station_records) hemisphere = "N" if northern else "S" return f""" 车检器布设示意图

车检器布设示意图

站点数: {len(station_records)}

感应线圈总数: {total_loops}

坐标系: UTM Zone {zone}{hemisphere} 转 WGS84

车检器站点

车检器所在道路中心线

""" def main() -> None: args = parse_args() net_path, detector_path = resolve_generated_inputs(args) output_path = Path(args.output) lane_shapes, edge_shapes, projection = load_lane_shapes(net_path) detectors = load_detectors(detector_path) station_records = build_station_records(detectors, lane_shapes, projection) edge_features = build_edge_features(edge_shapes, station_records, projection) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text( render_html(station_records, edge_features, projection), encoding="utf-8", ) print( f"Generated {output_path} with {len(station_records)} detector stations " f"from {len(detectors)} induction loops using {detector_path}." ) if __name__ == "__main__": main()