ctm-dqn/scripts/generate_speed_limit_leafle...

411 lines
12 KiB
Python

"""Generate a Leaflet map for corridor speed-limit segments."""
from __future__ import annotations
import argparse
import json
import math
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List, Sequence, Tuple
import yaml
WGS84_A = 6378137.0
WGS84_E2 = 0.0066943799901413165
WGS84_EP2 = WGS84_E2 / (1.0 - WGS84_E2)
UTM_K0 = 0.9996
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Generate a Leaflet speed-limit layout map.")
parser.add_argument(
"--config",
default="config_sumo_vsl.yaml",
help="Project config path.",
)
parser.add_argument(
"--output",
default="results/speed_limit_leaflet_map.html",
help="Output HTML path.",
)
return parser.parse_args()
def resolve_project_path(project_root: Path, path_str: str) -> Path:
path = Path(path_str)
return path if path.is_absolute() else (project_root / path).resolve()
def parse_shape(shape_text: str) -> List[Tuple[float, float]]:
points = []
for raw_point in shape_text.strip().split():
x_str, y_str = raw_point.split(",")
points.append((float(x_str), float(y_str)))
return points
def parse_projection(location_elem: ET.Element) -> Tuple[float, float, int, bool]:
net_offset_x, net_offset_y = [
float(value) for value in location_elem.attrib["netOffset"].split(",")
]
proj_parameter = location_elem.attrib.get("projParameter", "")
zone = None
northern = True
for token in proj_parameter.split():
if token.startswith("+zone="):
zone = int(token.split("=", 1)[1])
elif token == "+south":
northern = False
if zone is None:
raise ValueError(f"Unable to parse UTM zone from projParameter: {proj_parameter}")
return net_offset_x, net_offset_y, zone, northern
def utm_to_latlon(easting: float, northing: float, zone: int, northern: bool) -> Tuple[float, float]:
x = easting - 500000.0
y = northing
if not northern:
y -= 10000000.0
m = y / UTM_K0
mu = m / (
WGS84_A
* (
1.0
- WGS84_E2 / 4.0
- 3.0 * WGS84_E2**2 / 64.0
- 5.0 * WGS84_E2**3 / 256.0
)
)
e1 = (1.0 - math.sqrt(1.0 - WGS84_E2)) / (1.0 + math.sqrt(1.0 - WGS84_E2))
j1 = 3.0 * e1 / 2.0 - 27.0 * e1**3 / 32.0
j2 = 21.0 * e1**2 / 16.0 - 55.0 * e1**4 / 32.0
j3 = 151.0 * e1**3 / 96.0
j4 = 1097.0 * e1**4 / 512.0
fp = (
mu
+ j1 * math.sin(2.0 * mu)
+ j2 * math.sin(4.0 * mu)
+ j3 * math.sin(6.0 * mu)
+ j4 * math.sin(8.0 * mu)
)
sin_fp = math.sin(fp)
cos_fp = math.cos(fp)
tan_fp = math.tan(fp)
c1 = WGS84_EP2 * cos_fp**2
t1 = tan_fp**2
n1 = WGS84_A / math.sqrt(1.0 - WGS84_E2 * sin_fp**2)
r1 = WGS84_A * (1.0 - WGS84_E2) / (1.0 - WGS84_E2 * sin_fp**2) ** 1.5
d = x / (n1 * UTM_K0)
q1 = n1 * tan_fp / r1
q2 = d**2 / 2.0
q3 = (
5.0
+ 3.0 * t1
+ 10.0 * c1
- 4.0 * c1**2
- 9.0 * WGS84_EP2
) * d**4 / 24.0
q4 = (
61.0
+ 90.0 * t1
+ 298.0 * c1
+ 45.0 * t1**2
- 252.0 * WGS84_EP2
- 3.0 * c1**2
) * d**6 / 720.0
lat = fp - q1 * (q2 - q3 + q4)
q5 = d
q6 = (1.0 + 2.0 * t1 + c1) * d**3 / 6.0
q7 = (
5.0
- 2.0 * c1
+ 28.0 * t1
- 3.0 * c1**2
+ 8.0 * WGS84_EP2
+ 24.0 * t1**2
) * d**5 / 120.0
lon_origin = math.radians((zone - 1) * 6 - 180 + 3)
lon = lon_origin + (q5 - q6 + q7) / cos_fp
return math.degrees(lat), math.degrees(lon)
def load_corridor_edges(config_path: Path) -> Tuple[Path, List[str]]:
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
project_root = Path.cwd()
net_path = resolve_project_path(project_root, config["sumo"]["net_file"])
control_edges = list(config["environment"]["control_edges"])
return net_path, control_edges
def load_edge_geometry_and_speed(
net_path: Path,
control_edges: Sequence[str],
) -> Tuple[List[dict], Tuple[float, float, int, bool]]:
root = ET.parse(net_path).getroot()
location_elem = root.find("location")
if location_elem is None:
raise ValueError("No <location> element found in net file.")
edge_lookup: Dict[str, dict] = {}
for edge_elem in root.findall("edge"):
edge_id = edge_elem.attrib.get("id", "")
if edge_elem.attrib.get("function") == "internal" or edge_id not in control_edges:
continue
lane_elems = edge_elem.findall("lane")
if not lane_elems:
continue
lane_speeds = [float(lane.attrib["speed"]) for lane in lane_elems]
speed_ms = min(lane_speeds)
edge_lookup[edge_id] = {
"edge_id": edge_id,
"speed_ms": speed_ms,
"speed_kmh": speed_ms * 3.6,
"shape": parse_shape(lane_elems[0].attrib["shape"]),
}
ordered_edges = [edge_lookup[edge_id] for edge_id in control_edges if edge_id in edge_lookup]
return ordered_edges, parse_projection(location_elem)
def aggregate_speed_groups(ordered_edges: Sequence[dict]) -> List[dict]:
groups: List[dict] = []
current_group = None
for edge in ordered_edges:
if current_group is None or abs(edge["speed_ms"] - current_group["speed_ms"]) > 1e-9:
current_group = {
"speed_ms": edge["speed_ms"],
"speed_kmh": edge["speed_kmh"],
"start_edge_id": edge["edge_id"],
"end_edge_id": edge["edge_id"],
"edge_ids": [edge["edge_id"]],
"shape": list(edge["shape"]),
}
groups.append(current_group)
continue
current_group["end_edge_id"] = edge["edge_id"]
current_group["edge_ids"].append(edge["edge_id"])
current_group["shape"].extend(edge["shape"][1:])
return groups
def convert_groups_to_latlngs(
groups: Sequence[dict],
projection: Tuple[float, float, int, bool],
) -> List[dict]:
net_offset_x, net_offset_y, zone, northern = projection
converted = []
for idx, group in enumerate(groups, start=1):
latlngs = []
for x, y in group["shape"]:
lat, lon = utm_to_latlon(
x - net_offset_x,
y - net_offset_y,
zone=zone,
northern=northern,
)
latlngs.append([lat, lon])
converted.append(
{
"group_index": idx,
"speed_ms": group["speed_ms"],
"speed_kmh": group["speed_kmh"],
"start_edge_id": group["start_edge_id"],
"end_edge_id": group["end_edge_id"],
"edge_ids": group["edge_ids"],
"edge_count": len(group["edge_ids"]),
"latlngs": latlngs,
}
)
return converted
def color_for_speed(speed_kmh: float) -> str:
if speed_kmh <= 85:
return "#dc2626"
if speed_kmh <= 95:
return "#ea580c"
if speed_kmh <= 105:
return "#ca8a04"
return "#16a34a"
def render_html(groups: Sequence[dict]) -> str:
groups_json = json.dumps(groups, ensure_ascii=False)
legend_rows = []
for speed_kmh in sorted({round(group["speed_kmh"], 2) for group in groups}):
color = color_for_speed(speed_kmh)
legend_rows.append(
f'<p><span class="legend-swatch" style="background:{color};"></span>{speed_kmh:.0f} km/h</p>'
)
summary_rows = []
for group in groups:
summary_rows.append(
(
"<li>"
f"G{group['group_index']}: {group['start_edge_id']} -> {group['end_edge_id']} | "
f"{group['speed_kmh']:.0f} km/h | {group['edge_count']} edges"
"</li>"
)
)
legend_html = "\n".join(legend_rows)
summary_html = "\n".join(summary_rows)
return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>路段限速示意图</title>
<link
rel="stylesheet"
href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"
integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY="
crossorigin=""
/>
<style>
html, body, #map {{
height: 100%;
margin: 0;
}}
body {{
font-family: "Microsoft YaHei", "PingFang SC", sans-serif;
}}
.panel {{
position: absolute;
top: 12px;
right: 12px;
z-index: 1000;
width: 360px;
max-height: calc(100vh - 24px);
overflow: auto;
background: rgba(255, 255, 255, 0.95);
border: 1px solid rgba(0, 0, 0, 0.12);
border-radius: 12px;
padding: 12px 14px;
box-shadow: 0 10px 24px rgba(0, 0, 0, 0.14);
backdrop-filter: blur(6px);
}}
.panel h1 {{
margin: 0 0 8px;
font-size: 18px;
}}
.panel p {{
margin: 6px 0;
font-size: 13px;
line-height: 1.45;
}}
.panel ul {{
margin: 8px 0 0;
padding-left: 18px;
font-size: 12px;
line-height: 1.45;
}}
.panel li {{
margin: 4px 0;
}}
.legend-swatch {{
display: inline-block;
width: 14px;
height: 8px;
border-radius: 999px;
margin-right: 8px;
vertical-align: middle;
}}
</style>
</head>
<body>
<div id="map"></div>
<div class="panel">
<h1>主走廊路段限速示意图</h1>
<p>聚合段数: <strong>{len(groups)}</strong></p>
<p>连续相同原始限速的 control edge 已合并显示。</p>
{legend_html}
<ul>
{summary_html}
</ul>
</div>
<script
src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"
integrity="sha256-20nQCchB9co0qIjJZRGuk2/Z9VM+kNiyxNV1lvTlZBo="
crossorigin=""
></script>
<script>
const groups = {groups_json};
const map = L.map("map", {{
zoomControl: true,
preferCanvas: true
}});
const osm = L.tileLayer("https://tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png", {{
maxZoom: 19,
attribution: '&copy; OpenStreetMap contributors'
}}).addTo(map);
const layers = [];
groups.forEach(group => {{
const speedColor = group.speed_kmh <= 85 ? "#dc2626"
: group.speed_kmh <= 95 ? "#ea580c"
: group.speed_kmh <= 105 ? "#ca8a04"
: "#16a34a";
const line = L.polyline(group.latlngs, {{
color: speedColor,
weight: 6,
opacity: 0.9
}});
line.bindPopup(`
<strong>限速: ${{group.speed_kmh.toFixed(0)}} km/h</strong><br/>
分组: G${{group.group_index}}<br/>
起点边: ${{group.start_edge_id}}<br/>
终点边: ${{group.end_edge_id}}<br/>
聚合边数: ${{group.edge_count}}<br/>
边列表: ${{group.edge_ids.join(", ")}}
`);
line.bindTooltip(`G${{group.group_index}} | ${{group.speed_kmh.toFixed(0)}} km/h`, {{sticky: true}});
line.addTo(map);
layers.push(line);
}});
const bounds = L.featureGroup(layers).getBounds();
if (bounds.isValid()) {{
map.fitBounds(bounds.pad(0.08));
}} else {{
map.setView([24.53, 117.50], 12);
}}
</script>
</body>
</html>
"""
def main() -> None:
args = parse_args()
config_path = resolve_project_path(Path.cwd(), args.config)
net_path, control_edges = load_corridor_edges(config_path)
ordered_edges, projection = load_edge_geometry_and_speed(net_path, control_edges)
groups = aggregate_speed_groups(ordered_edges)
group_features = convert_groups_to_latlngs(groups, projection)
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(render_html(group_features), encoding="utf-8")
print(f"Generated {output_path} with {len(group_features)} aggregated speed-limit groups.")
if __name__ == "__main__":
main()