411 lines
12 KiB
Python
411 lines
12 KiB
Python
"""Generate a Leaflet map for corridor speed-limit segments."""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
from typing import Dict, List, Sequence, Tuple
|
|
|
|
import yaml
|
|
|
|
|
|
# Ellipsoid and projection constants consumed by utm_to_latlon().
# Semi-major axis of the WGS84 ellipsoid, in metres.
WGS84_A = 6378137.0
# First eccentricity squared (e^2) of the WGS84 ellipsoid.
WGS84_E2 = 0.0066943799901413165
# Second eccentricity squared, derived as e'^2 = e^2 / (1 - e^2).
WGS84_EP2 = WGS84_E2 / (1.0 - WGS84_E2)
# UTM scale factor; utm_to_latlon() divides projected distances by it.
UTM_K0 = 0.9996
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the map generator.

    Returns:
        A namespace with two string attributes: ``config`` (project config
        path) and ``output`` (destination HTML path). Both have defaults,
        so the script can be run without arguments.
    """
    cli = argparse.ArgumentParser(description="Generate a Leaflet speed-limit layout map.")
    # (flag, default value, help text) for every supported option.
    option_table = (
        ("--config", "config_sumo_vsl.yaml", "Project config path."),
        ("--output", "results/speed_limit_leaflet_map.html", "Output HTML path."),
    )
    for flag, fallback, description in option_table:
        cli.add_argument(flag, default=fallback, help=description)
    return cli.parse_args()
|
|
|
|
|
|
def resolve_project_path(project_root: Path, path_str: str) -> Path:
    """Turn a configured path string into a ``Path``.

    Args:
        project_root: Directory that relative paths are anchored to.
        path_str: Path as written in the config or on the command line.

    Returns:
        ``path_str`` unchanged (as a ``Path``) when it is already absolute;
        otherwise ``project_root / path_str`` after ``resolve()``.
    """
    candidate = Path(path_str)
    if candidate.is_absolute():
        return candidate
    return (project_root / candidate).resolve()
|
|
|
|
|
|
def parse_shape(shape_text: str) -> List[Tuple[float, float]]:
    """Parse a SUMO ``shape`` attribute into a list of planar points.

    The attribute is a whitespace-separated list of comma-separated
    positions. Networks that carry elevation write ``x,y,z`` triples; the
    z component is discarded here because the map is drawn in 2D.

    Args:
        shape_text: Raw attribute value, e.g. ``"0.00,1.50 10.00,1.50"``.

    Returns:
        The ``(x, y)`` pairs in their original order; an empty list for an
        empty or whitespace-only string.

    Raises:
        ValueError: If a position does not have two or three components, or
            a component is not a valid float.
    """
    points: List[Tuple[float, float]] = []
    # str.split() with no argument already ignores leading/trailing blanks.
    for raw_point in shape_text.split():
        components = raw_point.split(",")
        if len(components) not in (2, 3):
            raise ValueError(
                f"Malformed shape point {raw_point!r}: expected 'x,y' or 'x,y,z'."
            )
        points.append((float(components[0]), float(components[1])))
    return points
|
|
|
|
|
|
def parse_projection(location_elem: ET.Element) -> Tuple[float, float, int, bool]:
    """Extract the network offset and UTM zone from a ``<location>`` element.

    Args:
        location_elem: Element carrying a ``netOffset`` attribute (``"x,y"``)
            and, optionally, a ``projParameter`` attribute made of
            whitespace-separated tokens.

    Returns:
        ``(net_offset_x, net_offset_y, zone, northern)``. ``zone`` comes from
        the last ``+zone=N`` token; ``northern`` is ``False`` only when a
        ``+south`` token is present.

    Raises:
        KeyError: If ``netOffset`` is missing.
        ValueError: If ``netOffset`` is not two floats, a zone value is not an
            integer, or no ``+zone=`` token exists.
    """
    net_offset_x, net_offset_y = tuple(
        map(float, location_elem.attrib["netOffset"].split(","))
    )

    proj_parameter = location_elem.attrib.get("projParameter", "")
    tokens = proj_parameter.split()

    zone_prefix = "+zone="
    zone_values = [
        int(token[len(zone_prefix):]) for token in tokens if token.startswith(zone_prefix)
    ]
    if not zone_values:
        raise ValueError(f"Unable to parse UTM zone from projParameter: {proj_parameter}")

    # When several +zone tokens appear, the last one takes effect.
    return net_offset_x, net_offset_y, zone_values[-1], "+south" not in tokens
|
|
|
|
|
|
def utm_to_latlon(easting: float, northing: float, zone: int, northern: bool) -> Tuple[float, float]:
    """Convert a UTM position to geographic latitude/longitude in degrees.

    The inverse projection is evaluated as a truncated series using the
    module-level ``WGS84_*`` ellipsoid constants and ``UTM_K0``.

    Args:
        easting: UTM easting; 500000.0 is subtracted to get the offset from
            the zone's central meridian.
        northing: UTM northing; 10000000.0 is subtracted first when
            ``northern`` is false.
        zone: UTM zone number; the central meridian is taken as
            ``(zone - 1) * 6 - 180 + 3`` degrees.
        northern: Whether the position is in the northern hemisphere.

    Returns:
        ``(latitude, longitude)`` in degrees.
    """
    east_offset = easting - 500000.0
    north_offset = northing if northern else northing - 10000000.0

    # Rectifying latitude obtained from the meridional arc length.
    arc_length = north_offset / UTM_K0
    mu = arc_length / (
        WGS84_A
        * (
            1.0
            - WGS84_E2 / 4.0
            - 3.0 * WGS84_E2**2 / 64.0
            - 5.0 * WGS84_E2**3 / 256.0
        )
    )

    minor_ratio = math.sqrt(1.0 - WGS84_E2)
    e1 = (1.0 - minor_ratio) / (1.0 + minor_ratio)

    # (angle multiplier, coefficient) pairs of the footpoint-latitude series.
    # Terms are accumulated in this order to keep the summation order fixed.
    footpoint_terms = (
        (2.0, 3.0 * e1 / 2.0 - 27.0 * e1**3 / 32.0),
        (4.0, 21.0 * e1**2 / 16.0 - 55.0 * e1**4 / 32.0),
        (6.0, 151.0 * e1**3 / 96.0),
        (8.0, 1097.0 * e1**4 / 512.0),
    )
    footpoint = mu
    for multiple, coefficient in footpoint_terms:
        footpoint = footpoint + coefficient * math.sin(multiple * mu)

    sin_phi = math.sin(footpoint)
    cos_phi = math.cos(footpoint)
    tan_phi = math.tan(footpoint)

    c1 = WGS84_EP2 * cos_phi**2
    t1 = tan_phi**2
    # 1 - e^2 * sin^2(phi) is shared by both radii of curvature below.
    curvature_base = 1.0 - WGS84_E2 * sin_phi**2
    prime_vertical_radius = WGS84_A / math.sqrt(curvature_base)
    meridian_radius = WGS84_A * (1.0 - WGS84_E2) / curvature_base ** 1.5
    d = east_offset / (prime_vertical_radius * UTM_K0)

    # Latitude: footpoint latitude minus a correction series in d.
    lat_scale = prime_vertical_radius * tan_phi / meridian_radius
    lat_term2 = d**2 / 2.0
    lat_term4 = (
        5.0
        + 3.0 * t1
        + 10.0 * c1
        - 4.0 * c1**2
        - 9.0 * WGS84_EP2
    ) * d**4 / 24.0
    lat_term6 = (
        61.0
        + 90.0 * t1
        + 298.0 * c1
        + 45.0 * t1**2
        - 252.0 * WGS84_EP2
        - 3.0 * c1**2
    ) * d**6 / 720.0
    lat = footpoint - lat_scale * (lat_term2 - lat_term4 + lat_term6)

    # Longitude: central meridian plus a series in d divided by cos(phi).
    lon_term1 = d
    lon_term3 = (1.0 + 2.0 * t1 + c1) * d**3 / 6.0
    lon_term5 = (
        5.0
        - 2.0 * c1
        + 28.0 * t1
        - 3.0 * c1**2
        + 8.0 * WGS84_EP2
        + 24.0 * t1**2
    ) * d**5 / 120.0
    central_meridian = math.radians((zone - 1) * 6 - 180 + 3)
    lon = central_meridian + (lon_term1 - lon_term3 + lon_term5) / cos_phi

    return math.degrees(lat), math.degrees(lon)
|
|
|
|
|
|
def load_corridor_edges(config_path: Path) -> Tuple[Path, List[str]]:
    """Read the network file location and corridor edge IDs from the config.

    Args:
        config_path: YAML file providing ``sumo.net_file`` and
            ``environment.control_edges``.

    Returns:
        ``(net_path, control_edges)``. A relative ``net_file`` is resolved
        against the current working directory; ``control_edges`` is copied
        into a new list.
    """
    with open(config_path, "r", encoding="utf-8") as stream:
        settings = yaml.safe_load(stream)

    net_file_setting = settings["sumo"]["net_file"]
    net_path = resolve_project_path(Path.cwd(), net_file_setting)
    edge_ids = list(settings["environment"]["control_edges"])
    return net_path, edge_ids
|
|
|
|
|
|
def load_edge_geometry_and_speed(
    net_path: Path,
    control_edges: Sequence[str],
) -> Tuple[List[dict], Tuple[float, float, int, bool]]:
    """Load geometry and speed limit of the corridor edges from a net file.

    Args:
        net_path: Path of the network XML file.
        control_edges: Edge IDs of the corridor, in driving order.

    Returns:
        ``(ordered_edges, projection)``. ``ordered_edges`` follows the order
        of ``control_edges`` and skips IDs that are absent from the file,
        marked ``function="internal"``, or have no ``<lane>`` children. Each
        entry has the keys ``edge_id``, ``speed_ms`` (minimum over the
        edge's lanes), ``speed_kmh`` and ``shape`` (points of the first
        lane). ``projection`` is the result of ``parse_projection`` on the
        file's ``<location>`` element.

    Raises:
        ValueError: If the file has no ``<location>`` element.
        KeyError: If a lane of a selected edge lacks ``speed`` or ``shape``.
    """
    root = ET.parse(net_path).getroot()
    location_elem = root.find("location")
    if location_elem is None:
        raise ValueError("No <location> element found in net file.")

    # Build the membership set once: the loop below visits every edge of the
    # network, so testing against the list would be a linear scan per edge.
    wanted_ids = set(control_edges)

    edge_lookup: Dict[str, dict] = {}
    for edge_elem in root.findall("edge"):
        edge_id = edge_elem.attrib.get("id", "")
        if edge_id not in wanted_ids or edge_elem.attrib.get("function") == "internal":
            continue
        lane_elems = edge_elem.findall("lane")
        if not lane_elems:
            continue
        # The most restrictive lane defines the edge's speed limit.
        speed_ms = min(float(lane.attrib["speed"]) for lane in lane_elems)
        edge_lookup[edge_id] = {
            "edge_id": edge_id,
            "speed_ms": speed_ms,
            "speed_kmh": speed_ms * 3.6,
            "shape": parse_shape(lane_elems[0].attrib["shape"]),
        }

    # Re-emit in corridor order rather than file order.
    ordered_edges = [edge_lookup[edge_id] for edge_id in control_edges if edge_id in edge_lookup]
    return ordered_edges, parse_projection(location_elem)
|
|
|
|
|
|
def aggregate_speed_groups(ordered_edges: Sequence[dict]) -> List[dict]:
    """Merge consecutive edges that share a speed limit into groups.

    Args:
        ordered_edges: Edge records in corridor order, each with the keys
            ``edge_id``, ``speed_ms``, ``speed_kmh`` and ``shape``.

    Returns:
        One dict per run of equal-speed edges (speeds within 1e-9 m/s of the
        run's first edge), with the keys ``speed_ms``, ``speed_kmh``,
        ``start_edge_id``, ``end_edge_id``, ``edge_ids`` and ``shape``. The
        group shape is the concatenation of the member shapes; the first
        vertex of an appended edge is dropped only when it repeats the
        group's current last vertex, so no real geometry is lost when
        neighbouring edges do not touch. Input shapes are not mutated.
    """
    groups: List[dict] = []

    for edge in ordered_edges:
        current_group = groups[-1] if groups else None
        if current_group is None or abs(edge["speed_ms"] - current_group["speed_ms"]) > 1e-9:
            groups.append(
                {
                    "speed_ms": edge["speed_ms"],
                    "speed_kmh": edge["speed_kmh"],
                    "start_edge_id": edge["edge_id"],
                    "end_edge_id": edge["edge_id"],
                    "edge_ids": [edge["edge_id"]],
                    # Copy so later extensions never touch the edge record.
                    "shape": list(edge["shape"]),
                }
            )
            continue

        current_group["end_edge_id"] = edge["edge_id"]
        current_group["edge_ids"].append(edge["edge_id"])

        merged_shape = current_group["shape"]
        new_points = edge["shape"]
        # Only skip the joint vertex when it truly duplicates the previous end.
        if merged_shape and new_points and new_points[0] == merged_shape[-1]:
            new_points = new_points[1:]
        merged_shape.extend(new_points)

    return groups
|
|
|
|
|
|
def convert_groups_to_latlngs(
    groups: Sequence[dict],
    projection: Tuple[float, float, int, bool],
) -> List[dict]:
    """Project each group's network-space shape to latitude/longitude.

    Args:
        groups: Aggregated speed groups with the keys ``speed_ms``,
            ``speed_kmh``, ``start_edge_id``, ``end_edge_id``, ``edge_ids``
            and ``shape``.
        projection: ``(net_offset_x, net_offset_y, zone, northern)``.

    Returns:
        One JSON-ready dict per group, numbered from 1 in ``group_index``,
        with ``edge_count`` added and ``shape`` replaced by ``latlngs``, a
        list of ``[lat, lon]`` pairs.
    """
    net_offset_x, net_offset_y, zone, northern = projection

    features = []
    for position, group in enumerate(groups, start=1):
        # The net offset is subtracted before the coordinates are handed to
        # the UTM inverse.
        path = [
            list(
                utm_to_latlon(
                    x - net_offset_x,
                    y - net_offset_y,
                    zone=zone,
                    northern=northern,
                )
            )
            for x, y in group["shape"]
        ]
        member_ids = group["edge_ids"]
        features.append(
            {
                "group_index": position,
                "speed_ms": group["speed_ms"],
                "speed_kmh": group["speed_kmh"],
                "start_edge_id": group["start_edge_id"],
                "end_edge_id": group["end_edge_id"],
                "edge_ids": member_ids,
                "edge_count": len(member_ids),
                "latlngs": path,
            }
        )
    return features
|
|
|
|
|
|
def color_for_speed(speed_kmh: float) -> str:
    """Pick the hex color used to draw a speed limit.

    Args:
        speed_kmh: Speed limit in km/h.

    Returns:
        The color of the first band whose inclusive upper bound is not
        exceeded (85, 95, 105 km/h), or the final color for anything above.
    """
    # (inclusive upper bound in km/h, color), checked in ascending order.
    bands = (
        (85, "#dc2626"),
        (95, "#ea580c"),
        (105, "#ca8a04"),
    )
    for upper_bound, hex_color in bands:
        if speed_kmh <= upper_bound:
            return hex_color
    return "#16a34a"
|
|
|
|
|
|
def render_html(groups: Sequence[dict]) -> str:
    """Render the standalone Leaflet page for the aggregated speed groups.

    Args:
        groups: JSON-serializable group dicts with the keys read below:
            ``group_index``, ``speed_kmh``, ``start_edge_id``,
            ``end_edge_id``, ``edge_count``, plus ``latlngs`` and
            ``edge_ids`` which the embedded script reads.

    Returns:
        A complete HTML document. The groups are embedded as a JSON literal
        and drawn client-side as colored polylines; a side panel lists one
        legend row per distinct speed (rounded to 2 decimals) and one
        summary row per group.
    """
    groups_json = json.dumps(groups, ensure_ascii=False)

    # One legend entry per distinct speed, in ascending order.
    distinct_speeds = sorted({round(group["speed_kmh"], 2) for group in groups})
    legend_html = "\n".join(
        f'<p><span class="legend-swatch" style="background:{color_for_speed(speed)};"></span>{speed:.0f} km/h</p>'
        for speed in distinct_speeds
    )

    # One summary bullet per group, in corridor order.
    summary_html = "\n".join(
        "<li>"
        f"G{group['group_index']}: {group['start_edge_id']} -> {group['end_edge_id']} | "
        f"{group['speed_kmh']:.0f} km/h | {group['edge_count']} edges"
        "</li>"
        for group in groups
    )

    # Literal braces of the CSS/JS below are doubled because this is an f-string.
    return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
  <meta charset="utf-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>路段限速示意图</title>
  <link
    rel="stylesheet"
    href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"
    integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY="
    crossorigin=""
  />
  <style>
    html, body, #map {{
      height: 100%;
      margin: 0;
    }}
    body {{
      font-family: "Microsoft YaHei", "PingFang SC", sans-serif;
    }}
    .panel {{
      position: absolute;
      top: 12px;
      right: 12px;
      z-index: 1000;
      width: 360px;
      max-height: calc(100vh - 24px);
      overflow: auto;
      background: rgba(255, 255, 255, 0.95);
      border: 1px solid rgba(0, 0, 0, 0.12);
      border-radius: 12px;
      padding: 12px 14px;
      box-shadow: 0 10px 24px rgba(0, 0, 0, 0.14);
      backdrop-filter: blur(6px);
    }}
    .panel h1 {{
      margin: 0 0 8px;
      font-size: 18px;
    }}
    .panel p {{
      margin: 6px 0;
      font-size: 13px;
      line-height: 1.45;
    }}
    .panel ul {{
      margin: 8px 0 0;
      padding-left: 18px;
      font-size: 12px;
      line-height: 1.45;
    }}
    .panel li {{
      margin: 4px 0;
    }}
    .legend-swatch {{
      display: inline-block;
      width: 14px;
      height: 8px;
      border-radius: 999px;
      margin-right: 8px;
      vertical-align: middle;
    }}
  </style>
</head>
<body>
  <div id="map"></div>
  <div class="panel">
    <h1>主走廊路段限速示意图</h1>
    <p>聚合段数: <strong>{len(groups)}</strong></p>
    <p>连续相同原始限速的 control edge 已合并显示。</p>
    {legend_html}
    <ul>
      {summary_html}
    </ul>
  </div>

  <script
    src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"
    integrity="sha256-20nQCchB9co0qIjJZRGuk2/Z9VM+kNiyxNV1lvTlZBo="
    crossorigin=""
  ></script>
  <script>
    const groups = {groups_json};
    const map = L.map("map", {{
      zoomControl: true,
      preferCanvas: true
    }});

    const osm = L.tileLayer("https://tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png", {{
      maxZoom: 19,
      attribution: '© OpenStreetMap contributors'
    }}).addTo(map);

    const layers = [];
    groups.forEach(group => {{
      const speedColor = group.speed_kmh <= 85 ? "#dc2626"
        : group.speed_kmh <= 95 ? "#ea580c"
        : group.speed_kmh <= 105 ? "#ca8a04"
        : "#16a34a";
      const line = L.polyline(group.latlngs, {{
        color: speedColor,
        weight: 6,
        opacity: 0.9
      }});
      line.bindPopup(`
        <strong>限速: ${{group.speed_kmh.toFixed(0)}} km/h</strong><br/>
        分组: G${{group.group_index}}<br/>
        起点边: ${{group.start_edge_id}}<br/>
        终点边: ${{group.end_edge_id}}<br/>
        聚合边数: ${{group.edge_count}}<br/>
        边列表: ${{group.edge_ids.join(", ")}}
      `);
      line.bindTooltip(`G${{group.group_index}} | ${{group.speed_kmh.toFixed(0)}} km/h`, {{sticky: true}});
      line.addTo(map);
      layers.push(line);
    }});

    const bounds = L.featureGroup(layers).getBounds();
    if (bounds.isValid()) {{
      map.fitBounds(bounds.pad(0.08));
    }} else {{
      map.setView([24.53, 117.50], 12);
    }}
  </script>
</body>
</html>
"""
|
|
|
|
|
|
def main() -> None:
    """Run the pipeline: config -> net file -> speed groups -> HTML map.

    Reads the CLI options, loads the corridor edges and their geometry,
    merges consecutive equal-speed edges, converts them to lat/lon, and
    writes the rendered page to the requested output path (creating parent
    directories as needed). Prints a one-line summary when done.
    """
    cli_args = parse_args()

    config_file = resolve_project_path(Path.cwd(), cli_args.config)
    net_file, corridor_edge_ids = load_corridor_edges(config_file)
    corridor_edges, projection = load_edge_geometry_and_speed(net_file, corridor_edge_ids)
    group_features = convert_groups_to_latlngs(
        aggregate_speed_groups(corridor_edges),
        projection,
    )

    destination = Path(cli_args.output)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(render_html(group_features), encoding="utf-8")
    print(f"Generated {destination} with {len(group_features)} aggregated speed-limit groups.")


if __name__ == "__main__":
    main()
|