561 lines
18 KiB
Python
561 lines
18 KiB
Python
"""Generate a Leaflet detector layout map from SUMO net/additional files."""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import math
|
|
import os
|
|
import xml.etree.ElementTree as ET
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
from typing import Dict, List, Sequence, Tuple
|
|
|
|
import yaml
|
|
|
|
|
|
WGS84_A = 6378137.0
|
|
WGS84_E2 = 0.0066943799901413165
|
|
WGS84_EP2 = WGS84_E2 / (1.0 - WGS84_E2)
|
|
UTM_K0 = 0.9996
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Generate a Leaflet detector layout map.")
|
|
parser.add_argument(
|
|
"--config",
|
|
default="config_sumo_vsl.yaml",
|
|
help="Project config used to resolve the currently generated corridor detector file.",
|
|
)
|
|
parser.add_argument(
|
|
"--net-file",
|
|
default=None,
|
|
help="Override SUMO net file containing lane shapes and projection metadata.",
|
|
)
|
|
parser.add_argument(
|
|
"--detector-file",
|
|
default=None,
|
|
help="Override SUMO additional file containing inductionLoop detectors.",
|
|
)
|
|
parser.add_argument(
|
|
"--output",
|
|
default="results/detector_leaflet_map.html",
|
|
help="Output HTML path.",
|
|
)
|
|
return parser.parse_args()
|
|
|
|
|
|
def resolve_project_path(project_root: Path, path_str: str) -> Path:
|
|
path = Path(path_str)
|
|
return path if path.is_absolute() else (project_root / path).resolve()
|
|
|
|
|
|
def build_corridor_signature(
|
|
*,
|
|
net_path: Path,
|
|
route_path: Path,
|
|
corridor_edges: Sequence[str],
|
|
control_segment_length_m: float,
|
|
detector_spacing_m: float,
|
|
detector_start_offset_m: float,
|
|
) -> str:
|
|
payload = {
|
|
"net_file": str(net_path),
|
|
"net_mtime_ns": net_path.stat().st_mtime_ns,
|
|
"route_file": str(route_path),
|
|
"route_mtime_ns": route_path.stat().st_mtime_ns,
|
|
"corridor_edges": list(corridor_edges),
|
|
"control_segment_length_m": control_segment_length_m,
|
|
"detector_spacing_m": detector_spacing_m,
|
|
"detector_start_offset_m": detector_start_offset_m,
|
|
}
|
|
digest = hashlib.sha1(
|
|
json.dumps(payload, ensure_ascii=True, sort_keys=True).encode("utf-8")
|
|
).hexdigest()
|
|
return digest[:16]
|
|
|
|
|
|
def resolve_generated_inputs(args: argparse.Namespace) -> Tuple[Path, Path]:
|
|
project_root = Path.cwd()
|
|
if args.net_file and args.detector_file:
|
|
return resolve_project_path(project_root, args.net_file), resolve_project_path(project_root, args.detector_file)
|
|
|
|
config_path = resolve_project_path(project_root, args.config)
|
|
with open(config_path, "r", encoding="utf-8") as f:
|
|
config = yaml.safe_load(f)
|
|
|
|
sumo_cfg = config["sumo"]
|
|
env_cfg = config["environment"]
|
|
net_path = resolve_project_path(project_root, sumo_cfg["net_file"])
|
|
route_path = resolve_project_path(project_root, sumo_cfg["route_file"])
|
|
output_root = env_cfg.get("generated_asset_dir", "sumo_resource/generated_corridor")
|
|
output_dir = resolve_project_path(project_root, output_root)
|
|
signature = build_corridor_signature(
|
|
net_path=net_path,
|
|
route_path=route_path,
|
|
corridor_edges=env_cfg["control_edges"],
|
|
control_segment_length_m=float(env_cfg.get("control_segment_length_m", 1000.0)),
|
|
detector_spacing_m=float(env_cfg.get("detector_spacing_m", 100.0)),
|
|
detector_start_offset_m=float(
|
|
env_cfg.get(
|
|
"detector_start_offset_m",
|
|
float(env_cfg.get("detector_spacing_m", 100.0)) * 0.5,
|
|
)
|
|
),
|
|
)
|
|
|
|
artifact_dir = output_dir / signature
|
|
generated_net_path = artifact_dir / "experiment_corridor.net.xml"
|
|
generated_detector_path = artifact_dir / "experiment_corridor_metrics_il.add.xml"
|
|
|
|
if not generated_net_path.is_file():
|
|
raise FileNotFoundError(
|
|
f"Generated net file not found: {generated_net_path}. "
|
|
"Run the environment once to materialize corridor assets, or pass --net-file explicitly."
|
|
)
|
|
if not generated_detector_path.is_file():
|
|
raise FileNotFoundError(
|
|
f"Generated detector file not found: {generated_detector_path}. "
|
|
"Run the environment once to materialize corridor assets, or pass --detector-file explicitly."
|
|
)
|
|
return generated_net_path, generated_detector_path
|
|
|
|
|
|
def parse_shape(shape_text: str) -> List[Tuple[float, float]]:
|
|
points = []
|
|
for raw_point in shape_text.strip().split():
|
|
x_str, y_str = raw_point.split(",")
|
|
points.append((float(x_str), float(y_str)))
|
|
return points
|
|
|
|
|
|
def interpolate_along_polyline(
|
|
shape: Sequence[Tuple[float, float]],
|
|
distance_m: float,
|
|
) -> Tuple[float, float]:
|
|
if not shape:
|
|
raise ValueError("shape must not be empty")
|
|
if len(shape) == 1:
|
|
return shape[0]
|
|
|
|
segment_lengths = []
|
|
total_length = 0.0
|
|
for idx in range(len(shape) - 1):
|
|
x1, y1 = shape[idx]
|
|
x2, y2 = shape[idx + 1]
|
|
length = math.hypot(x2 - x1, y2 - y1)
|
|
segment_lengths.append(length)
|
|
total_length += length
|
|
|
|
if total_length <= 1e-9:
|
|
return shape[0]
|
|
|
|
clamped_distance = min(max(distance_m, 0.0), total_length)
|
|
traversed = 0.0
|
|
for idx, seg_length in enumerate(segment_lengths):
|
|
if seg_length <= 1e-9:
|
|
continue
|
|
if traversed + seg_length >= clamped_distance:
|
|
ratio = (clamped_distance - traversed) / seg_length
|
|
x1, y1 = shape[idx]
|
|
x2, y2 = shape[idx + 1]
|
|
return (x1 + (x2 - x1) * ratio, y1 + (y2 - y1) * ratio)
|
|
traversed += seg_length
|
|
|
|
return shape[-1]
|
|
|
|
|
|
def utm_to_latlon(easting: float, northing: float, zone: int, northern: bool) -> Tuple[float, float]:
|
|
x = easting - 500000.0
|
|
y = northing
|
|
if not northern:
|
|
y -= 10000000.0
|
|
|
|
m = y / UTM_K0
|
|
mu = m / (
|
|
WGS84_A
|
|
* (
|
|
1.0
|
|
- WGS84_E2 / 4.0
|
|
- 3.0 * WGS84_E2**2 / 64.0
|
|
- 5.0 * WGS84_E2**3 / 256.0
|
|
)
|
|
)
|
|
|
|
e1 = (1.0 - math.sqrt(1.0 - WGS84_E2)) / (1.0 + math.sqrt(1.0 - WGS84_E2))
|
|
j1 = 3.0 * e1 / 2.0 - 27.0 * e1**3 / 32.0
|
|
j2 = 21.0 * e1**2 / 16.0 - 55.0 * e1**4 / 32.0
|
|
j3 = 151.0 * e1**3 / 96.0
|
|
j4 = 1097.0 * e1**4 / 512.0
|
|
|
|
fp = (
|
|
mu
|
|
+ j1 * math.sin(2.0 * mu)
|
|
+ j2 * math.sin(4.0 * mu)
|
|
+ j3 * math.sin(6.0 * mu)
|
|
+ j4 * math.sin(8.0 * mu)
|
|
)
|
|
|
|
sin_fp = math.sin(fp)
|
|
cos_fp = math.cos(fp)
|
|
tan_fp = math.tan(fp)
|
|
|
|
c1 = WGS84_EP2 * cos_fp**2
|
|
t1 = tan_fp**2
|
|
n1 = WGS84_A / math.sqrt(1.0 - WGS84_E2 * sin_fp**2)
|
|
r1 = WGS84_A * (1.0 - WGS84_E2) / (1.0 - WGS84_E2 * sin_fp**2) ** 1.5
|
|
d = x / (n1 * UTM_K0)
|
|
|
|
q1 = n1 * tan_fp / r1
|
|
q2 = d**2 / 2.0
|
|
q3 = (
|
|
5.0
|
|
+ 3.0 * t1
|
|
+ 10.0 * c1
|
|
- 4.0 * c1**2
|
|
- 9.0 * WGS84_EP2
|
|
) * d**4 / 24.0
|
|
q4 = (
|
|
61.0
|
|
+ 90.0 * t1
|
|
+ 298.0 * c1
|
|
+ 45.0 * t1**2
|
|
- 252.0 * WGS84_EP2
|
|
- 3.0 * c1**2
|
|
) * d**6 / 720.0
|
|
lat = fp - q1 * (q2 - q3 + q4)
|
|
|
|
q5 = d
|
|
q6 = (1.0 + 2.0 * t1 + c1) * d**3 / 6.0
|
|
q7 = (
|
|
5.0
|
|
- 2.0 * c1
|
|
+ 28.0 * t1
|
|
- 3.0 * c1**2
|
|
+ 8.0 * WGS84_EP2
|
|
+ 24.0 * t1**2
|
|
) * d**5 / 120.0
|
|
lon_origin = math.radians((zone - 1) * 6 - 180 + 3)
|
|
lon = lon_origin + (q5 - q6 + q7) / cos_fp
|
|
|
|
return math.degrees(lat), math.degrees(lon)
|
|
|
|
|
|
def parse_projection(location_elem: ET.Element) -> Tuple[float, float, int, bool]:
|
|
net_offset_x, net_offset_y = [
|
|
float(value) for value in location_elem.attrib["netOffset"].split(",")
|
|
]
|
|
proj_parameter = location_elem.attrib.get("projParameter", "")
|
|
zone = None
|
|
northern = True
|
|
for token in proj_parameter.split():
|
|
if token.startswith("+zone="):
|
|
zone = int(token.split("=", 1)[1])
|
|
elif token == "+south":
|
|
northern = False
|
|
if zone is None:
|
|
raise ValueError(f"Unable to parse UTM zone from projParameter: {proj_parameter}")
|
|
return net_offset_x, net_offset_y, zone, northern
|
|
|
|
|
|
def load_lane_shapes(net_path: Path) -> Tuple[Dict[str, List[Tuple[float, float]]], Dict[str, List[Tuple[float, float]]], Tuple[float, float, int, bool]]:
|
|
tree = ET.parse(net_path)
|
|
root = tree.getroot()
|
|
location_elem = root.find("location")
|
|
if location_elem is None:
|
|
raise ValueError("No <location> element found in net file.")
|
|
|
|
lane_shapes: Dict[str, List[Tuple[float, float]]] = {}
|
|
edge_shapes: Dict[str, List[Tuple[float, float]]] = {}
|
|
|
|
for edge_elem in root.findall("edge"):
|
|
edge_id = edge_elem.attrib.get("id", "")
|
|
if edge_elem.attrib.get("function") == "internal":
|
|
continue
|
|
first_lane_shape = None
|
|
for lane_elem in edge_elem.findall("lane"):
|
|
lane_id = lane_elem.attrib["id"]
|
|
shape = parse_shape(lane_elem.attrib["shape"])
|
|
lane_shapes[lane_id] = shape
|
|
if first_lane_shape is None:
|
|
first_lane_shape = shape
|
|
if first_lane_shape is not None:
|
|
edge_shapes[edge_id] = first_lane_shape
|
|
|
|
return lane_shapes, edge_shapes, parse_projection(location_elem)
|
|
|
|
|
|
def load_detectors(detector_path: Path) -> List[dict]:
|
|
tree = ET.parse(detector_path)
|
|
root = tree.getroot()
|
|
detectors = []
|
|
for detector_elem in root.findall("inductionLoop"):
|
|
lane_id = detector_elem.attrib["lane"]
|
|
edge_id = lane_id.rsplit("_", 1)[0]
|
|
pos = float(detector_elem.attrib["pos"])
|
|
detectors.append(
|
|
{
|
|
"id": detector_elem.attrib["id"],
|
|
"lane_id": lane_id,
|
|
"edge_id": edge_id,
|
|
"pos_m": pos,
|
|
}
|
|
)
|
|
return detectors
|
|
|
|
|
|
def build_station_records(
|
|
detectors: Sequence[dict],
|
|
lane_shapes: Dict[str, List[Tuple[float, float]]],
|
|
projection: Tuple[float, float, int, bool],
|
|
) -> List[dict]:
|
|
net_offset_x, net_offset_y, zone, northern = projection
|
|
grouped: Dict[Tuple[str, str], List[dict]] = defaultdict(list)
|
|
|
|
for detector in detectors:
|
|
lane_shape = lane_shapes.get(detector["lane_id"])
|
|
if lane_shape is None:
|
|
continue
|
|
x, y = interpolate_along_polyline(lane_shape, detector["pos_m"])
|
|
projected_x = x - net_offset_x
|
|
projected_y = y - net_offset_y
|
|
lat, lon = utm_to_latlon(projected_x, projected_y, zone=zone, northern=northern)
|
|
detector_record = {
|
|
**detector,
|
|
"x": x,
|
|
"y": y,
|
|
"projected_x": projected_x,
|
|
"projected_y": projected_y,
|
|
"lat": lat,
|
|
"lon": lon,
|
|
}
|
|
station_key = (detector["edge_id"], f"{detector['pos_m']:.2f}")
|
|
grouped[station_key].append(detector_record)
|
|
|
|
station_records = []
|
|
for (edge_id, pos_label), items in sorted(
|
|
grouped.items(),
|
|
key=lambda kv: (kv[0][0], float(kv[0][1])),
|
|
):
|
|
lat = sum(item["lat"] for item in items) / len(items)
|
|
lon = sum(item["lon"] for item in items) / len(items)
|
|
station_records.append(
|
|
{
|
|
"edge_id": edge_id,
|
|
"pos_label_m": pos_label,
|
|
"pos_m": float(pos_label),
|
|
"lat": lat,
|
|
"lon": lon,
|
|
"lane_count": len(items),
|
|
"lane_ids": [item["lane_id"] for item in items],
|
|
"detector_ids": [item["id"] for item in items],
|
|
"projected_x": sum(item["projected_x"] for item in items) / len(items),
|
|
"projected_y": sum(item["projected_y"] for item in items) / len(items),
|
|
}
|
|
)
|
|
return station_records
|
|
|
|
|
|
def build_edge_features(
|
|
edge_shapes: Dict[str, List[Tuple[float, float]]],
|
|
station_records: Sequence[dict],
|
|
projection: Tuple[float, float, int, bool],
|
|
) -> List[dict]:
|
|
_, _, zone, northern = projection
|
|
net_offset_x, net_offset_y, _, _ = projection
|
|
station_edge_ids = {record["edge_id"] for record in station_records}
|
|
features = []
|
|
for edge_id in sorted(station_edge_ids):
|
|
shape = edge_shapes.get(edge_id)
|
|
if not shape:
|
|
continue
|
|
latlngs = []
|
|
for x, y in shape:
|
|
projected_x = x - net_offset_x
|
|
projected_y = y - net_offset_y
|
|
lat, lon = utm_to_latlon(projected_x, projected_y, zone=zone, northern=northern)
|
|
latlngs.append([lat, lon])
|
|
features.append({"edge_id": edge_id, "latlngs": latlngs})
|
|
return features
|
|
|
|
|
|
def render_html(
|
|
station_records: Sequence[dict],
|
|
edge_features: Sequence[dict],
|
|
projection: Tuple[float, float, int, bool],
|
|
) -> str:
|
|
_, _, zone, northern = projection
|
|
stations_json = json.dumps(station_records, ensure_ascii=False)
|
|
edges_json = json.dumps(edge_features, ensure_ascii=False)
|
|
total_loops = sum(record["lane_count"] for record in station_records)
|
|
hemisphere = "N" if northern else "S"
|
|
return f"""<!DOCTYPE html>
|
|
<html lang="zh-CN">
|
|
<head>
|
|
<meta charset="utf-8" />
|
|
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
|
<title>车检器布设示意图</title>
|
|
<link
|
|
rel="stylesheet"
|
|
href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"
|
|
integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY="
|
|
crossorigin=""
|
|
/>
|
|
<style>
|
|
html, body, #map {{
|
|
height: 100%;
|
|
margin: 0;
|
|
}}
|
|
body {{
|
|
font-family: "Microsoft YaHei", "PingFang SC", sans-serif;
|
|
}}
|
|
.panel {{
|
|
position: absolute;
|
|
top: 12px;
|
|
right: 12px;
|
|
z-index: 1000;
|
|
width: 300px;
|
|
background: rgba(255, 255, 255, 0.94);
|
|
border: 1px solid rgba(0, 0, 0, 0.12);
|
|
border-radius: 12px;
|
|
padding: 12px 14px;
|
|
box-shadow: 0 10px 24px rgba(0, 0, 0, 0.14);
|
|
backdrop-filter: blur(6px);
|
|
}}
|
|
.panel h1 {{
|
|
margin: 0 0 8px;
|
|
font-size: 18px;
|
|
}}
|
|
.panel p {{
|
|
margin: 6px 0;
|
|
font-size: 13px;
|
|
line-height: 1.45;
|
|
}}
|
|
.legend-dot {{
|
|
display: inline-block;
|
|
width: 10px;
|
|
height: 10px;
|
|
border-radius: 999px;
|
|
margin-right: 6px;
|
|
vertical-align: middle;
|
|
}}
|
|
.detector-popup code {{
|
|
white-space: pre-wrap;
|
|
word-break: break-all;
|
|
}}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<div id="map"></div>
|
|
<div class="panel">
|
|
<h1>车检器布设示意图</h1>
|
|
<p>站点数: <strong>{len(station_records)}</strong></p>
|
|
<p>感应线圈总数: <strong>{total_loops}</strong></p>
|
|
<p>坐标系: UTM Zone {zone}{hemisphere} 转 WGS84</p>
|
|
<p><span class="legend-dot" style="background:#f97316;"></span>车检器站点</p>
|
|
<p><span class="legend-dot" style="background:#2563eb;"></span>车检器所在道路中心线</p>
|
|
</div>
|
|
|
|
<script
|
|
src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"
|
|
integrity="sha256-20nQCchB9co0qIjJZRGuk2/Z9VM+kNiyxNV1lvTlZBo="
|
|
crossorigin=""
|
|
></script>
|
|
<script>
|
|
const stations = {stations_json};
|
|
const edges = {edges_json};
|
|
|
|
const map = L.map("map", {{
|
|
zoomControl: true,
|
|
preferCanvas: true
|
|
}});
|
|
|
|
const osm = L.tileLayer("https://tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png", {{
|
|
maxZoom: 19,
|
|
attribution: '© OpenStreetMap contributors'
|
|
}}).addTo(map);
|
|
|
|
const edgeLayer = L.layerGroup();
|
|
edges.forEach(edge => {{
|
|
L.polyline(edge.latlngs, {{
|
|
color: "#2563eb",
|
|
weight: 4,
|
|
opacity: 0.75
|
|
}}).bindTooltip(edge.edge_id, {{sticky: true}}).addTo(edgeLayer);
|
|
}});
|
|
edgeLayer.addTo(map);
|
|
|
|
const detectorLayer = L.layerGroup();
|
|
stations.forEach(station => {{
|
|
const popupHtml = `
|
|
<div class="detector-popup">
|
|
<strong>边: ${{station.edge_id}}</strong><br/>
|
|
位置: ${{station.pos_label_m}} m<br/>
|
|
车道数: ${{station.lane_count}}<br/>
|
|
投影坐标: (${{station.projected_x.toFixed(2)}}, ${{station.projected_y.toFixed(2)}})<br/>
|
|
线圈ID:<br/>
|
|
<code>${{station.detector_ids.join(", ")}}</code>
|
|
</div>
|
|
`;
|
|
L.circleMarker([station.lat, station.lon], {{
|
|
radius: 6,
|
|
color: "#9a3412",
|
|
weight: 1,
|
|
fillColor: "#f97316",
|
|
fillOpacity: 0.88
|
|
}})
|
|
.bindPopup(popupHtml)
|
|
.bindTooltip(`${{station.edge_id}} @ ${{station.pos_label_m}} m`, {{sticky: true}})
|
|
.addTo(detectorLayer);
|
|
}});
|
|
detectorLayer.addTo(map);
|
|
|
|
const bounds = L.featureGroup([
|
|
...edges.map(edge => L.polyline(edge.latlngs)),
|
|
...stations.map(station => L.marker([station.lat, station.lon]))
|
|
]).getBounds();
|
|
if (bounds.isValid()) {{
|
|
map.fitBounds(bounds.pad(0.08));
|
|
}} else {{
|
|
map.setView([24.53, 117.50], 12);
|
|
}}
|
|
|
|
L.control.layers(
|
|
{{"OpenStreetMap": osm}},
|
|
{{
|
|
"道路中心线": edgeLayer,
|
|
"车检器站点": detectorLayer
|
|
}},
|
|
{{collapsed: false}}
|
|
).addTo(map);
|
|
</script>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
|
|
def main() -> None:
|
|
args = parse_args()
|
|
net_path, detector_path = resolve_generated_inputs(args)
|
|
output_path = Path(args.output)
|
|
|
|
lane_shapes, edge_shapes, projection = load_lane_shapes(net_path)
|
|
detectors = load_detectors(detector_path)
|
|
station_records = build_station_records(detectors, lane_shapes, projection)
|
|
edge_features = build_edge_features(edge_shapes, station_records, projection)
|
|
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
output_path.write_text(
|
|
render_html(station_records, edge_features, projection),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
print(
|
|
f"Generated {output_path} with {len(station_records)} detector stations "
|
|
f"from {len(detectors)} induction loops using {detector_path}."
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|