"""Generate a Leaflet map for corridor speed-limit segments.""" from __future__ import annotations import argparse import json import math import xml.etree.ElementTree as ET from pathlib import Path from typing import Dict, List, Sequence, Tuple import yaml WGS84_A = 6378137.0 WGS84_E2 = 0.0066943799901413165 WGS84_EP2 = WGS84_E2 / (1.0 - WGS84_E2) UTM_K0 = 0.9996 def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Generate a Leaflet speed-limit layout map.") parser.add_argument( "--config", default="config_sumo_vsl.yaml", help="Project config path.", ) parser.add_argument( "--output", default="results/speed_limit_leaflet_map.html", help="Output HTML path.", ) return parser.parse_args() def resolve_project_path(project_root: Path, path_str: str) -> Path: path = Path(path_str) return path if path.is_absolute() else (project_root / path).resolve() def parse_shape(shape_text: str) -> List[Tuple[float, float]]: points = [] for raw_point in shape_text.strip().split(): x_str, y_str = raw_point.split(",") points.append((float(x_str), float(y_str))) return points def parse_projection(location_elem: ET.Element) -> Tuple[float, float, int, bool]: net_offset_x, net_offset_y = [ float(value) for value in location_elem.attrib["netOffset"].split(",") ] proj_parameter = location_elem.attrib.get("projParameter", "") zone = None northern = True for token in proj_parameter.split(): if token.startswith("+zone="): zone = int(token.split("=", 1)[1]) elif token == "+south": northern = False if zone is None: raise ValueError(f"Unable to parse UTM zone from projParameter: {proj_parameter}") return net_offset_x, net_offset_y, zone, northern def utm_to_latlon(easting: float, northing: float, zone: int, northern: bool) -> Tuple[float, float]: x = easting - 500000.0 y = northing if not northern: y -= 10000000.0 m = y / UTM_K0 mu = m / ( WGS84_A * ( 1.0 - WGS84_E2 / 4.0 - 3.0 * WGS84_E2**2 / 64.0 - 5.0 * WGS84_E2**3 / 256.0 ) ) e1 = (1.0 - math.sqrt(1.0 - WGS84_E2)) / (1.0 + math.sqrt(1.0 - WGS84_E2)) j1 = 3.0 * e1 / 2.0 - 27.0 * e1**3 / 32.0 j2 = 21.0 * e1**2 / 16.0 - 55.0 * e1**4 / 32.0 j3 = 151.0 * e1**3 / 96.0 j4 = 1097.0 * e1**4 / 512.0 fp = ( mu + j1 * math.sin(2.0 * mu) + j2 * math.sin(4.0 * mu) + j3 * math.sin(6.0 * mu) + j4 * math.sin(8.0 * mu) ) sin_fp = math.sin(fp) cos_fp = math.cos(fp) tan_fp = math.tan(fp) c1 = WGS84_EP2 * cos_fp**2 t1 = tan_fp**2 n1 = WGS84_A / math.sqrt(1.0 - WGS84_E2 * sin_fp**2) r1 = WGS84_A * (1.0 - WGS84_E2) / (1.0 - WGS84_E2 * sin_fp**2) ** 1.5 d = x / (n1 * UTM_K0) q1 = n1 * tan_fp / r1 q2 = d**2 / 2.0 q3 = ( 5.0 + 3.0 * t1 + 10.0 * c1 - 4.0 * c1**2 - 9.0 * WGS84_EP2 ) * d**4 / 24.0 q4 = ( 61.0 + 90.0 * t1 + 298.0 * c1 + 45.0 * t1**2 - 252.0 * WGS84_EP2 - 3.0 * c1**2 ) * d**6 / 720.0 lat = fp - q1 * (q2 - q3 + q4) q5 = d q6 = (1.0 + 2.0 * t1 + c1) * d**3 / 6.0 q7 = ( 5.0 - 2.0 * c1 + 28.0 * t1 - 3.0 * c1**2 + 8.0 * WGS84_EP2 + 24.0 * t1**2 ) * d**5 / 120.0 lon_origin = math.radians((zone - 1) * 6 - 180 + 3) lon = lon_origin + (q5 - q6 + q7) / cos_fp return math.degrees(lat), math.degrees(lon) def load_corridor_edges(config_path: Path) -> Tuple[Path, List[str]]: with open(config_path, "r", encoding="utf-8") as f: config = yaml.safe_load(f) project_root = Path.cwd() net_path = resolve_project_path(project_root, config["sumo"]["net_file"]) control_edges = list(config["environment"]["control_edges"]) return net_path, control_edges def load_edge_geometry_and_speed( net_path: Path, control_edges: Sequence[str], ) -> Tuple[List[dict], Tuple[float, float, int, bool]]: root = ET.parse(net_path).getroot() location_elem = root.find("location") if location_elem is None: raise ValueError("No element found in net file.") edge_lookup: Dict[str, dict] = {} for edge_elem in root.findall("edge"): edge_id = edge_elem.attrib.get("id", "") if edge_elem.attrib.get("function") == "internal" or edge_id not in control_edges: continue lane_elems = edge_elem.findall("lane") if not lane_elems: continue lane_speeds = [float(lane.attrib["speed"]) for lane in lane_elems] speed_ms = min(lane_speeds) edge_lookup[edge_id] = { "edge_id": edge_id, "speed_ms": speed_ms, "speed_kmh": speed_ms * 3.6, "shape": parse_shape(lane_elems[0].attrib["shape"]), } ordered_edges = [edge_lookup[edge_id] for edge_id in control_edges if edge_id in edge_lookup] return ordered_edges, parse_projection(location_elem) def aggregate_speed_groups(ordered_edges: Sequence[dict]) -> List[dict]: groups: List[dict] = [] current_group = None for edge in ordered_edges: if current_group is None or abs(edge["speed_ms"] - current_group["speed_ms"]) > 1e-9: current_group = { "speed_ms": edge["speed_ms"], "speed_kmh": edge["speed_kmh"], "start_edge_id": edge["edge_id"], "end_edge_id": edge["edge_id"], "edge_ids": [edge["edge_id"]], "shape": list(edge["shape"]), } groups.append(current_group) continue current_group["end_edge_id"] = edge["edge_id"] current_group["edge_ids"].append(edge["edge_id"]) current_group["shape"].extend(edge["shape"][1:]) return groups def convert_groups_to_latlngs( groups: Sequence[dict], projection: Tuple[float, float, int, bool], ) -> List[dict]: net_offset_x, net_offset_y, zone, northern = projection converted = [] for idx, group in enumerate(groups, start=1): latlngs = [] for x, y in group["shape"]: lat, lon = utm_to_latlon( x - net_offset_x, y - net_offset_y, zone=zone, northern=northern, ) latlngs.append([lat, lon]) converted.append( { "group_index": idx, "speed_ms": group["speed_ms"], "speed_kmh": group["speed_kmh"], "start_edge_id": group["start_edge_id"], "end_edge_id": group["end_edge_id"], "edge_ids": group["edge_ids"], "edge_count": len(group["edge_ids"]), "latlngs": latlngs, } ) return converted def color_for_speed(speed_kmh: float) -> str: if speed_kmh <= 85: return "#dc2626" if speed_kmh <= 95: return "#ea580c" if speed_kmh <= 105: return "#ca8a04" return "#16a34a" def render_html(groups: Sequence[dict]) -> str: groups_json = json.dumps(groups, ensure_ascii=False) legend_rows = [] for speed_kmh in sorted({round(group["speed_kmh"], 2) for group in groups}): color = color_for_speed(speed_kmh) legend_rows.append( f'

{speed_kmh:.0f} km/h

' ) summary_rows = [] for group in groups: summary_rows.append( ( "
  • " f"G{group['group_index']}: {group['start_edge_id']} -> {group['end_edge_id']} | " f"{group['speed_kmh']:.0f} km/h | {group['edge_count']} edges" "
  • " ) ) legend_html = "\n".join(legend_rows) summary_html = "\n".join(summary_rows) return f""" 路段限速示意图

    主走廊路段限速示意图

    聚合段数: {len(groups)}

    连续相同原始限速的 control edge 已合并显示。

    {legend_html}
    """ def main() -> None: args = parse_args() config_path = resolve_project_path(Path.cwd(), args.config) net_path, control_edges = load_corridor_edges(config_path) ordered_edges, projection = load_edge_geometry_and_speed(net_path, control_edges) groups = aggregate_speed_groups(ordered_edges) group_features = convert_groups_to_latlngs(groups, projection) output_path = Path(args.output) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(render_html(group_features), encoding="utf-8") print(f"Generated {output_path} with {len(group_features)} aggregated speed-limit groups.") if __name__ == "__main__": main()