884 lines
29 KiB
Python
884 lines
29 KiB
Python
"""Build derived SUMO assets for segment-based VSL experiments."""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import xml.etree.ElementTree as ET
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable, List, Sequence, Tuple
|
|
|
|
try:
|
|
import sumo as _sumo_pkg
|
|
|
|
_tools = os.path.join(_sumo_pkg.SUMO_HOME, "tools")
|
|
if _tools not in sys.path:
|
|
sys.path.insert(0, _tools)
|
|
except ImportError:
|
|
pass
|
|
|
|
import sumolib
|
|
|
|
|
|
EPS = 1e-6
|
|
BUILD_LOCK_TIMEOUT_S = 300.0
|
|
BUILD_LOCK_STALE_S = 900.0
|
|
BUILD_LOCK_POLL_S = 0.2
|
|
DERIVED_ASSET_VERSION = 2
|
|
|
|
|
|
@dataclass
|
|
class CorridorArtifacts:
|
|
net_file: str
|
|
route_file: str
|
|
detector_add_file: str
|
|
enex_add_file: str
|
|
control_segments: List[Dict]
|
|
detector_cells: List[Dict]
|
|
physical_edge_ids: List[str]
|
|
controlled_length_m: float
|
|
|
|
|
|
def prepare_experiment_corridor_assets(
|
|
*,
|
|
net_file: str,
|
|
route_file: str,
|
|
corridor_edges: Sequence[str],
|
|
control_segment_length_m: float,
|
|
detector_spacing_m: float,
|
|
detector_start_offset_m: float,
|
|
output_root: str | None = None,
|
|
) -> CorridorArtifacts:
|
|
"""Generate cached derived assets for the configured experiment corridor."""
|
|
if not corridor_edges:
|
|
raise ValueError("corridor_edges must not be empty")
|
|
|
|
project_root = Path.cwd()
|
|
net_path = (project_root / net_file).resolve()
|
|
route_path = (project_root / route_file).resolve()
|
|
output_dir = (
|
|
(project_root / output_root).resolve()
|
|
if output_root
|
|
else (project_root / "sumo_resource" / "generated_corridor").resolve()
|
|
)
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
signature = _build_signature(
|
|
net_path=net_path,
|
|
route_path=route_path,
|
|
corridor_edges=corridor_edges,
|
|
control_segment_length_m=control_segment_length_m,
|
|
detector_spacing_m=detector_spacing_m,
|
|
detector_start_offset_m=detector_start_offset_m,
|
|
)
|
|
artifact_dir = output_dir / signature
|
|
artifact_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
manifest_path = artifact_dir / "manifest.json"
|
|
ready_artifacts = _load_ready_corridor_artifacts(manifest_path, project_root)
|
|
if ready_artifacts is not None:
|
|
return ready_artifacts
|
|
|
|
lock_path = artifact_dir / ".build.lock"
|
|
with _artifact_build_lock(lock_path):
|
|
ready_artifacts = _load_ready_corridor_artifacts(manifest_path, project_root)
|
|
if ready_artifacts is not None:
|
|
return ready_artifacts
|
|
|
|
_clear_incomplete_artifact_dir(artifact_dir, preserve_paths={lock_path})
|
|
|
|
plain_dir = artifact_dir / "plain"
|
|
plain_dir.mkdir(parents=True, exist_ok=True)
|
|
plain_prefix = plain_dir / "base"
|
|
_export_plain_net(net_path, plain_prefix)
|
|
|
|
base_net = sumolib.net.readNet(str(net_path))
|
|
corridor_specs = _build_corridor_edge_specs(base_net, corridor_edges)
|
|
total_length = sum(edge["length_m"] for edge in corridor_specs)
|
|
if total_length <= 0:
|
|
raise ValueError("Corridor length must be positive")
|
|
|
|
split_positions_by_edge = _compute_split_positions(
|
|
corridor_specs=corridor_specs,
|
|
control_segment_length_m=control_segment_length_m,
|
|
)
|
|
split_map, generated_edge_ids = _rewrite_plain_network(
|
|
plain_prefix=plain_prefix,
|
|
artifact_dir=artifact_dir,
|
|
corridor_specs=corridor_specs,
|
|
split_positions_by_edge=split_positions_by_edge,
|
|
)
|
|
|
|
generated_net = artifact_dir / "experiment_corridor.net.xml"
|
|
_build_net_from_plain(
|
|
artifact_dir=artifact_dir,
|
|
generated_net=generated_net,
|
|
)
|
|
|
|
derived_net = sumolib.net.readNet(str(generated_net))
|
|
physical_edge_ids, physical_edge_ranges = _build_physical_edge_ranges(
|
|
derived_net=derived_net,
|
|
corridor_specs=corridor_specs,
|
|
split_map=split_map,
|
|
)
|
|
control_segments = _build_control_segments(
|
|
physical_edge_ranges=physical_edge_ranges,
|
|
control_segment_length_m=control_segment_length_m,
|
|
total_length_m=total_length,
|
|
)
|
|
|
|
generated_route = artifact_dir / "experiment_corridor.rou.xml"
|
|
_rewrite_route_file(
|
|
route_path=route_path,
|
|
generated_route=generated_route,
|
|
split_map=split_map,
|
|
)
|
|
|
|
generated_detector = artifact_dir / "experiment_corridor_metrics_il.add.xml"
|
|
detector_cells = _build_detector_additional(
|
|
generated_detector=generated_detector,
|
|
derived_net=derived_net,
|
|
control_segments=control_segments,
|
|
physical_edge_ranges=physical_edge_ranges,
|
|
detector_spacing_m=detector_spacing_m,
|
|
detector_start_offset_m=detector_start_offset_m,
|
|
)
|
|
_attach_first_detector_groups(control_segments, detector_cells)
|
|
|
|
generated_enex = artifact_dir / "experiment_corridor_metrics_enex.add.xml"
|
|
_build_enex_additional(
|
|
generated_enex=generated_enex,
|
|
detector_cells=detector_cells,
|
|
)
|
|
|
|
manifest = {
|
|
"signature": signature,
|
|
"net_file": _relpath_str(generated_net, project_root),
|
|
"route_file": _relpath_str(generated_route, project_root),
|
|
"detector_add_file": _relpath_str(generated_detector, project_root),
|
|
"enex_add_file": _relpath_str(generated_enex, project_root),
|
|
"control_segments": control_segments,
|
|
"detector_cells": detector_cells,
|
|
"physical_edge_ids": physical_edge_ids,
|
|
"controlled_length_m": total_length,
|
|
}
|
|
_write_json_atomic(manifest_path, manifest)
|
|
|
|
return CorridorArtifacts(
|
|
net_file=manifest["net_file"],
|
|
route_file=manifest["route_file"],
|
|
detector_add_file=manifest["detector_add_file"],
|
|
enex_add_file=manifest["enex_add_file"],
|
|
control_segments=control_segments,
|
|
detector_cells=detector_cells,
|
|
physical_edge_ids=physical_edge_ids,
|
|
controlled_length_m=total_length,
|
|
)
|
|
|
|
|
|
def _build_signature(
|
|
*,
|
|
net_path: Path,
|
|
route_path: Path,
|
|
corridor_edges: Sequence[str],
|
|
control_segment_length_m: float,
|
|
detector_spacing_m: float,
|
|
detector_start_offset_m: float,
|
|
) -> str:
|
|
payload = {
|
|
"derived_asset_version": DERIVED_ASSET_VERSION,
|
|
"net_file": str(net_path),
|
|
"net_mtime_ns": net_path.stat().st_mtime_ns,
|
|
"route_file": str(route_path),
|
|
"route_mtime_ns": route_path.stat().st_mtime_ns,
|
|
"corridor_edges": list(corridor_edges),
|
|
"control_segment_length_m": control_segment_length_m,
|
|
"detector_spacing_m": detector_spacing_m,
|
|
"detector_start_offset_m": detector_start_offset_m,
|
|
}
|
|
digest = hashlib.sha1(
|
|
json.dumps(payload, ensure_ascii=True, sort_keys=True).encode("utf-8")
|
|
).hexdigest()
|
|
return digest[:16]
|
|
|
|
|
|
def _lane_permission_tokens(lane: sumolib.net.lane.Lane) -> set[str]:
|
|
permissions = lane.getPermissions()
|
|
if permissions is None:
|
|
return set()
|
|
if isinstance(permissions, str):
|
|
return {token for token in permissions.split() if token}
|
|
return {str(token).strip() for token in permissions if str(token).strip()}
|
|
|
|
|
|
def _is_emergency_only_lane(lane: sumolib.net.lane.Lane) -> bool:
|
|
return _lane_permission_tokens(lane) == {"emergency"}
|
|
|
|
|
|
def _get_traffic_lane_indices(edge: sumolib.net.edge.Edge) -> List[int]:
|
|
traffic_lane_indices = [
|
|
lane.getIndex() for lane in edge.getLanes() if not _is_emergency_only_lane(lane)
|
|
]
|
|
if traffic_lane_indices:
|
|
return traffic_lane_indices
|
|
return [lane.getIndex() for lane in edge.getLanes()]
|
|
|
|
|
|
def _load_ready_corridor_artifacts(
|
|
manifest_path: Path,
|
|
project_root: Path,
|
|
) -> CorridorArtifacts | None:
|
|
if not manifest_path.is_file():
|
|
return None
|
|
|
|
try:
|
|
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
|
|
except (json.JSONDecodeError, OSError):
|
|
return None
|
|
|
|
required_keys = (
|
|
"net_file",
|
|
"route_file",
|
|
"detector_add_file",
|
|
"enex_add_file",
|
|
"control_segments",
|
|
"detector_cells",
|
|
"physical_edge_ids",
|
|
"controlled_length_m",
|
|
)
|
|
if any(key not in manifest for key in required_keys):
|
|
return None
|
|
|
|
for key in ("net_file", "route_file", "detector_add_file", "enex_add_file"):
|
|
candidate = (project_root / manifest[key]).resolve()
|
|
if not candidate.is_file():
|
|
return None
|
|
|
|
return CorridorArtifacts(
|
|
net_file=manifest["net_file"],
|
|
route_file=manifest["route_file"],
|
|
detector_add_file=manifest["detector_add_file"],
|
|
enex_add_file=manifest["enex_add_file"],
|
|
control_segments=manifest["control_segments"],
|
|
detector_cells=manifest["detector_cells"],
|
|
physical_edge_ids=manifest["physical_edge_ids"],
|
|
controlled_length_m=float(manifest["controlled_length_m"]),
|
|
)
|
|
|
|
|
|
@contextmanager
|
|
def _artifact_build_lock(lock_path: Path):
|
|
start_time = time.monotonic()
|
|
while True:
|
|
try:
|
|
fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
|
|
try:
|
|
payload = json.dumps(
|
|
{"pid": os.getpid(), "created_at": time.time()},
|
|
ensure_ascii=True,
|
|
)
|
|
os.write(fd, payload.encode("utf-8"))
|
|
finally:
|
|
os.close(fd)
|
|
break
|
|
except FileExistsError:
|
|
try:
|
|
age_s = time.time() - lock_path.stat().st_mtime
|
|
except FileNotFoundError:
|
|
continue
|
|
|
|
if age_s > BUILD_LOCK_STALE_S:
|
|
try:
|
|
lock_path.unlink()
|
|
continue
|
|
except FileNotFoundError:
|
|
continue
|
|
except OSError:
|
|
pass
|
|
|
|
if time.monotonic() - start_time > BUILD_LOCK_TIMEOUT_S:
|
|
raise TimeoutError(f"Timed out waiting for corridor artifact lock: {lock_path}")
|
|
time.sleep(BUILD_LOCK_POLL_S)
|
|
|
|
try:
|
|
yield
|
|
finally:
|
|
try:
|
|
lock_path.unlink()
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
|
|
def _clear_incomplete_artifact_dir(artifact_dir: Path, preserve_paths: set[Path] | None = None):
|
|
preserve_resolved = {path.resolve() for path in (preserve_paths or set())}
|
|
for child in artifact_dir.iterdir():
|
|
if child.resolve() in preserve_resolved:
|
|
continue
|
|
if child.is_dir():
|
|
shutil.rmtree(child)
|
|
else:
|
|
child.unlink()
|
|
|
|
|
|
def _write_json_atomic(path: Path, payload: Dict):
|
|
temp_path = path.with_name(f"{path.name}.tmp.{os.getpid()}")
|
|
temp_path.write_text(
|
|
json.dumps(payload, ensure_ascii=True, indent=2),
|
|
encoding="utf-8",
|
|
)
|
|
os.replace(temp_path, path)
|
|
|
|
|
|
def _export_plain_net(net_path: Path, plain_prefix: Path):
|
|
required = [
|
|
plain_prefix.with_suffix(".nod.xml"),
|
|
plain_prefix.with_suffix(".edg.xml"),
|
|
plain_prefix.with_suffix(".con.xml"),
|
|
plain_prefix.with_suffix(".tll.xml"),
|
|
plain_prefix.with_suffix(".typ.xml"),
|
|
]
|
|
if all(path.exists() for path in required):
|
|
return
|
|
|
|
subprocess.run(
|
|
[
|
|
"netconvert",
|
|
"--sumo-net-file",
|
|
str(net_path),
|
|
"--plain-output-prefix",
|
|
str(plain_prefix),
|
|
],
|
|
check=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
)
|
|
|
|
|
|
def _build_corridor_edge_specs(
|
|
base_net: sumolib.net.Net,
|
|
corridor_edges: Sequence[str],
|
|
) -> List[Dict]:
|
|
specs = []
|
|
cumulative = 0.0
|
|
for edge_id in corridor_edges:
|
|
edge = base_net.getEdge(edge_id)
|
|
length = float(edge.getLength())
|
|
specs.append(
|
|
{
|
|
"edge_id": edge_id,
|
|
"length_m": length,
|
|
"start_m": cumulative,
|
|
"end_m": cumulative + length,
|
|
}
|
|
)
|
|
cumulative += length
|
|
return specs
|
|
|
|
|
|
def _compute_split_positions(
|
|
*,
|
|
corridor_specs: Sequence[Dict],
|
|
control_segment_length_m: float,
|
|
) -> Dict[str, List[float]]:
|
|
if control_segment_length_m <= 0:
|
|
raise ValueError("control_segment_length_m must be positive")
|
|
|
|
total_length = corridor_specs[-1]["end_m"]
|
|
boundaries = []
|
|
boundary = control_segment_length_m
|
|
while boundary < total_length - EPS:
|
|
boundaries.append(boundary)
|
|
boundary += control_segment_length_m
|
|
|
|
split_positions_by_edge: Dict[str, List[float]] = {}
|
|
for spec in corridor_specs:
|
|
local_positions = []
|
|
for boundary in boundaries:
|
|
if spec["start_m"] + EPS < boundary < spec["end_m"] - EPS:
|
|
local_positions.append(boundary - spec["start_m"])
|
|
if local_positions:
|
|
split_positions_by_edge[spec["edge_id"]] = local_positions
|
|
return split_positions_by_edge
|
|
|
|
|
|
def _rewrite_plain_network(
|
|
*,
|
|
plain_prefix: Path,
|
|
artifact_dir: Path,
|
|
corridor_specs: Sequence[Dict],
|
|
split_positions_by_edge: Dict[str, List[float]],
|
|
) -> Tuple[Dict[str, List[str]], List[str]]:
|
|
nodes_path = plain_prefix.with_suffix(".nod.xml")
|
|
edges_path = plain_prefix.with_suffix(".edg.xml")
|
|
con_path = plain_prefix.with_suffix(".con.xml")
|
|
tll_path = plain_prefix.with_suffix(".tll.xml")
|
|
typ_path = plain_prefix.with_suffix(".typ.xml")
|
|
|
|
node_tree = ET.parse(nodes_path)
|
|
edge_tree = ET.parse(edges_path)
|
|
con_tree = ET.parse(con_path)
|
|
|
|
node_root = node_tree.getroot()
|
|
edge_root = edge_tree.getroot()
|
|
con_root = con_tree.getroot()
|
|
|
|
node_elems = {node.get("id"): node for node in node_root.findall("node")}
|
|
edge_elems = {edge.get("id"): edge for edge in edge_root.findall("edge")}
|
|
|
|
split_map: Dict[str, List[str]] = {}
|
|
generated_edge_ids: List[str] = []
|
|
|
|
for spec in corridor_specs:
|
|
edge_id = spec["edge_id"]
|
|
split_positions = split_positions_by_edge.get(edge_id, [])
|
|
if not split_positions:
|
|
split_map[edge_id] = [edge_id]
|
|
generated_edge_ids.append(edge_id)
|
|
continue
|
|
|
|
edge_elem = edge_elems[edge_id]
|
|
split_result = _split_edge_element(
|
|
edge_elem=edge_elem,
|
|
node_elems=node_elems,
|
|
split_positions=split_positions,
|
|
node_root=node_root,
|
|
)
|
|
split_map[edge_id] = [part["edge_id"] for part in split_result]
|
|
generated_edge_ids.extend(split_map[edge_id])
|
|
|
|
insert_index = list(edge_root).index(edge_elem)
|
|
edge_root.remove(edge_elem)
|
|
for offset, part in enumerate(split_result):
|
|
edge_root.insert(insert_index + offset, part["edge_elem"])
|
|
|
|
_rewrite_connections(con_root, split_map, edge_elems)
|
|
|
|
derived_nodes = artifact_dir / "derived.nod.xml"
|
|
derived_edges = artifact_dir / "derived.edg.xml"
|
|
derived_cons = artifact_dir / "derived.con.xml"
|
|
derived_tll = artifact_dir / "derived.tll.xml"
|
|
derived_typ = artifact_dir / "derived.typ.xml"
|
|
|
|
node_tree.write(derived_nodes, encoding="utf-8", xml_declaration=True)
|
|
edge_tree.write(derived_edges, encoding="utf-8", xml_declaration=True)
|
|
con_tree.write(derived_cons, encoding="utf-8", xml_declaration=True)
|
|
ET.parse(tll_path).write(derived_tll, encoding="utf-8", xml_declaration=True)
|
|
ET.parse(typ_path).write(derived_typ, encoding="utf-8", xml_declaration=True)
|
|
|
|
return split_map, generated_edge_ids
|
|
|
|
|
|
def _split_edge_element(
|
|
*,
|
|
edge_elem: ET.Element,
|
|
node_elems: Dict[str, ET.Element],
|
|
split_positions: Sequence[float],
|
|
node_root: ET.Element,
|
|
) -> List[Dict]:
|
|
from_node = node_elems[edge_elem.get("from")]
|
|
to_node = node_elems[edge_elem.get("to")]
|
|
points = _extract_edge_shape_points(edge_elem, from_node, to_node)
|
|
total_length = _polyline_length(points)
|
|
|
|
distances = [0.0] + list(split_positions) + [total_length]
|
|
part_defs = []
|
|
previous_node_id = edge_elem.get("from")
|
|
|
|
for idx, (start_dist, end_dist) in enumerate(zip(distances[:-1], distances[1:])):
|
|
if idx == len(distances) - 2:
|
|
next_node_id = edge_elem.get("to")
|
|
else:
|
|
split_point = _interpolate_point_on_polyline(points, end_dist)
|
|
next_node_id = f"{edge_elem.get('id')}__split_node_{idx + 1}"
|
|
if next_node_id not in {node.get("id") for node in node_root.findall("node")}:
|
|
new_node = ET.Element(
|
|
"node",
|
|
{
|
|
"id": next_node_id,
|
|
"x": f"{split_point[0]:.2f}",
|
|
"y": f"{split_point[1]:.2f}",
|
|
"type": "priority",
|
|
},
|
|
)
|
|
node_root.append(new_node)
|
|
|
|
part_edge = ET.fromstring(ET.tostring(edge_elem, encoding="unicode"))
|
|
part_edge_id = f"{edge_elem.get('id')}__part_{idx + 1}"
|
|
part_edge.set("id", part_edge_id)
|
|
part_edge.set("from", previous_node_id)
|
|
part_edge.set("to", next_node_id)
|
|
part_shape = _slice_polyline(points, start_dist, end_dist)
|
|
part_edge.set("shape", _shape_to_string(part_shape))
|
|
|
|
part_defs.append(
|
|
{
|
|
"edge_id": part_edge_id,
|
|
"edge_elem": part_edge,
|
|
}
|
|
)
|
|
previous_node_id = next_node_id
|
|
|
|
return part_defs
|
|
|
|
|
|
def _extract_edge_shape_points(
|
|
edge_elem: ET.Element,
|
|
from_node: ET.Element,
|
|
to_node: ET.Element,
|
|
) -> List[Tuple[float, float]]:
|
|
shape_attr = edge_elem.get("shape", "").strip()
|
|
if shape_attr:
|
|
points = []
|
|
for raw_pair in shape_attr.split():
|
|
x_str, y_str = raw_pair.split(",")
|
|
points.append((float(x_str), float(y_str)))
|
|
return points
|
|
|
|
return [
|
|
(float(from_node.get("x")), float(from_node.get("y"))),
|
|
(float(to_node.get("x")), float(to_node.get("y"))),
|
|
]
|
|
|
|
|
|
def _polyline_length(points: Sequence[Tuple[float, float]]) -> float:
|
|
length = 0.0
|
|
for a, b in zip(points[:-1], points[1:]):
|
|
length += _distance(a, b)
|
|
return length
|
|
|
|
|
|
def _interpolate_point_on_polyline(
|
|
points: Sequence[Tuple[float, float]],
|
|
target_dist: float,
|
|
) -> Tuple[float, float]:
|
|
if target_dist <= 0:
|
|
return points[0]
|
|
|
|
traversed = 0.0
|
|
for a, b in zip(points[:-1], points[1:]):
|
|
segment_length = _distance(a, b)
|
|
if traversed + segment_length >= target_dist - EPS:
|
|
ratio = 0.0 if segment_length <= EPS else (target_dist - traversed) / segment_length
|
|
return (a[0] + (b[0] - a[0]) * ratio, a[1] + (b[1] - a[1]) * ratio)
|
|
traversed += segment_length
|
|
|
|
return points[-1]
|
|
|
|
|
|
def _slice_polyline(
|
|
points: Sequence[Tuple[float, float]],
|
|
start_dist: float,
|
|
end_dist: float,
|
|
) -> List[Tuple[float, float]]:
|
|
sliced = [_interpolate_point_on_polyline(points, start_dist)]
|
|
traversed = 0.0
|
|
for a, b in zip(points[:-1], points[1:]):
|
|
segment_length = _distance(a, b)
|
|
next_traversed = traversed + segment_length
|
|
if traversed > end_dist - EPS:
|
|
break
|
|
if start_dist + EPS < next_traversed and traversed + EPS < end_dist:
|
|
if traversed >= start_dist - EPS and next_traversed <= end_dist + EPS:
|
|
sliced.append(b)
|
|
traversed = next_traversed
|
|
sliced.append(_interpolate_point_on_polyline(points, end_dist))
|
|
|
|
deduped = []
|
|
for point in sliced:
|
|
if not deduped or _distance(point, deduped[-1]) > EPS:
|
|
deduped.append(point)
|
|
return deduped
|
|
|
|
|
|
def _shape_to_string(points: Sequence[Tuple[float, float]]) -> str:
|
|
return " ".join(f"{x:.2f},{y:.2f}" for x, y in points)
|
|
|
|
|
|
def _distance(a: Tuple[float, float], b: Tuple[float, float]) -> float:
|
|
dx = a[0] - b[0]
|
|
dy = a[1] - b[1]
|
|
return (dx * dx + dy * dy) ** 0.5
|
|
|
|
|
|
def _rewrite_connections(
|
|
con_root: ET.Element,
|
|
split_map: Dict[str, List[str]],
|
|
edge_elems: Dict[str, ET.Element],
|
|
):
|
|
new_connections = []
|
|
for connection in con_root.findall("connection"):
|
|
updated = ET.fromstring(ET.tostring(connection, encoding="unicode"))
|
|
from_edge = updated.get("from")
|
|
to_edge = updated.get("to")
|
|
if from_edge in split_map and len(split_map[from_edge]) > 1:
|
|
updated.set("from", split_map[from_edge][-1])
|
|
if to_edge in split_map and len(split_map[to_edge]) > 1:
|
|
updated.set("to", split_map[to_edge][0])
|
|
new_connections.append(updated)
|
|
|
|
for edge_id, part_ids in split_map.items():
|
|
if len(part_ids) <= 1:
|
|
continue
|
|
lane_count = len(edge_elems[edge_id].findall("lane"))
|
|
for from_part, to_part in zip(part_ids[:-1], part_ids[1:]):
|
|
for lane_idx in range(lane_count):
|
|
new_connections.append(
|
|
ET.Element(
|
|
"connection",
|
|
{
|
|
"from": from_part,
|
|
"to": to_part,
|
|
"fromLane": str(lane_idx),
|
|
"toLane": str(lane_idx),
|
|
},
|
|
)
|
|
)
|
|
|
|
for connection in list(con_root):
|
|
con_root.remove(connection)
|
|
for connection in new_connections:
|
|
con_root.append(connection)
|
|
|
|
|
|
def _build_net_from_plain(
|
|
*,
|
|
artifact_dir: Path,
|
|
generated_net: Path,
|
|
):
|
|
subprocess.run(
|
|
[
|
|
"netconvert",
|
|
"-n",
|
|
str(artifact_dir / "derived.nod.xml"),
|
|
"-e",
|
|
str(artifact_dir / "derived.edg.xml"),
|
|
"-x",
|
|
str(artifact_dir / "derived.con.xml"),
|
|
"-i",
|
|
str(artifact_dir / "derived.tll.xml"),
|
|
"-t",
|
|
str(artifact_dir / "derived.typ.xml"),
|
|
"-o",
|
|
str(generated_net),
|
|
],
|
|
check=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
)
|
|
|
|
|
|
def _build_physical_edge_ranges(
|
|
*,
|
|
derived_net: sumolib.net.Net,
|
|
corridor_specs: Sequence[Dict],
|
|
split_map: Dict[str, List[str]],
|
|
) -> Tuple[List[str], List[Dict]]:
|
|
physical_edge_ids: List[str] = []
|
|
physical_edge_ranges: List[Dict] = []
|
|
cumulative = 0.0
|
|
for spec in corridor_specs:
|
|
for part_id in split_map[spec["edge_id"]]:
|
|
edge = derived_net.getEdge(part_id)
|
|
length = float(edge.getLength())
|
|
physical_edge_ids.append(part_id)
|
|
physical_edge_ranges.append(
|
|
{
|
|
"edge_id": part_id,
|
|
"start_m": cumulative,
|
|
"end_m": cumulative + length,
|
|
"length_m": length,
|
|
}
|
|
)
|
|
cumulative += length
|
|
return physical_edge_ids, physical_edge_ranges
|
|
|
|
|
|
def _build_control_segments(
|
|
*,
|
|
physical_edge_ranges: Sequence[Dict],
|
|
control_segment_length_m: float,
|
|
total_length_m: float,
|
|
) -> List[Dict]:
|
|
segments = []
|
|
start = 0.0
|
|
index = 0
|
|
while start < total_length_m - EPS:
|
|
end = min(start + control_segment_length_m, total_length_m)
|
|
member_edges = [
|
|
edge["edge_id"]
|
|
for edge in physical_edge_ranges
|
|
if edge["start_m"] < end - EPS and edge["end_m"] > start + EPS
|
|
]
|
|
segments.append(
|
|
{
|
|
"segment_id": f"corridor_seg_{index + 1:02d}",
|
|
"segment_index": index,
|
|
"start_distance_m": round(start, 6),
|
|
"end_distance_m": round(end, 6),
|
|
"length_m": round(end - start, 6),
|
|
"edge_ids": member_edges,
|
|
"first_detector_group_ids": [],
|
|
}
|
|
)
|
|
start = end
|
|
index += 1
|
|
return segments
|
|
|
|
|
|
def _rewrite_route_file(
|
|
*,
|
|
route_path: Path,
|
|
generated_route: Path,
|
|
split_map: Dict[str, List[str]],
|
|
):
|
|
route_tree = ET.parse(route_path)
|
|
route_root = route_tree.getroot()
|
|
|
|
for elem in route_root.iter():
|
|
edges_attr = elem.get("edges")
|
|
if not edges_attr:
|
|
continue
|
|
expanded = []
|
|
for edge_id in edges_attr.split():
|
|
expanded.extend(split_map.get(edge_id, [edge_id]))
|
|
elem.set("edges", " ".join(expanded))
|
|
|
|
route_tree.write(generated_route, encoding="utf-8", xml_declaration=True)
|
|
|
|
|
|
def _build_detector_additional(
|
|
*,
|
|
generated_detector: Path,
|
|
derived_net: sumolib.net.Net,
|
|
control_segments: List[Dict],
|
|
physical_edge_ranges: Sequence[Dict],
|
|
detector_spacing_m: float,
|
|
detector_start_offset_m: float,
|
|
) -> List[Dict]:
|
|
root = ET.Element("additional")
|
|
detector_cells: List[Dict] = []
|
|
per_edge_pos_index: Dict[str, int] = {}
|
|
|
|
for segment in control_segments:
|
|
segment_length = float(segment["length_m"])
|
|
detector_offset = detector_start_offset_m
|
|
segment_pos_index = 0
|
|
while detector_offset < segment_length - EPS:
|
|
global_distance = float(segment["start_distance_m"]) + detector_offset
|
|
edge_range = _locate_edge_range(global_distance, physical_edge_ranges)
|
|
if edge_range is None:
|
|
detector_offset += detector_spacing_m
|
|
segment_pos_index += 1
|
|
continue
|
|
|
|
edge = derived_net.getEdge(edge_range["edge_id"])
|
|
traffic_lane_indices = _get_traffic_lane_indices(edge)
|
|
|
|
local_position = global_distance - edge_range["start_m"]
|
|
edge_pos_index = per_edge_pos_index.get(edge_range["edge_id"], 0)
|
|
detector_ids = []
|
|
for lane_idx in traffic_lane_indices:
|
|
det_id = f"{edge_range['edge_id']}_{lane_idx}_metrics_{edge_pos_index}"
|
|
detector_ids.append(det_id)
|
|
ET.SubElement(
|
|
root,
|
|
"inductionLoop",
|
|
{
|
|
"id": det_id,
|
|
"lane": f"{edge_range['edge_id']}_{lane_idx}",
|
|
"pos": f"{local_position:.2f}",
|
|
"freq": "60",
|
|
"file": "./metrics_il_output.xml",
|
|
},
|
|
)
|
|
|
|
detector_cells.append(
|
|
{
|
|
"segment_id": segment["segment_id"],
|
|
"segment_index": int(segment["segment_index"]),
|
|
"pos_index": int(segment_pos_index),
|
|
"distance_m": round(global_distance, 6),
|
|
"segment_position_m": round(detector_offset, 6),
|
|
"edge_id": edge_range["edge_id"],
|
|
"position_m": round(local_position, 6),
|
|
"detector_ids": detector_ids,
|
|
}
|
|
)
|
|
per_edge_pos_index[edge_range["edge_id"]] = edge_pos_index + 1
|
|
detector_offset += detector_spacing_m
|
|
segment_pos_index += 1
|
|
|
|
ET.ElementTree(root).write(generated_detector, encoding="utf-8", xml_declaration=True)
|
|
return detector_cells
|
|
|
|
|
|
def _locate_edge_range(global_distance: float, physical_edge_ranges: Sequence[Dict]) -> Dict | None:
|
|
for edge_range in physical_edge_ranges:
|
|
if edge_range["start_m"] + EPS < global_distance <= edge_range["end_m"] + EPS:
|
|
return edge_range
|
|
if physical_edge_ranges and abs(global_distance - physical_edge_ranges[0]["start_m"]) <= EPS:
|
|
return physical_edge_ranges[0]
|
|
return None
|
|
|
|
|
|
def _attach_first_detector_groups(control_segments: List[Dict], detector_cells: Sequence[Dict]):
|
|
first_by_segment = {}
|
|
for cell in detector_cells:
|
|
first_by_segment.setdefault(cell["segment_id"], cell["detector_ids"])
|
|
for segment in control_segments:
|
|
segment["first_detector_group_ids"] = list(first_by_segment.get(segment["segment_id"], []))
|
|
|
|
|
|
def _build_enex_additional(
|
|
*,
|
|
generated_enex: Path,
|
|
detector_cells: Sequence[Dict],
|
|
):
|
|
root = ET.Element("additional")
|
|
if detector_cells:
|
|
first_cell = detector_cells[0]
|
|
last_cell = detector_cells[-1]
|
|
enex = ET.SubElement(
|
|
root,
|
|
"entryExitDetector",
|
|
{
|
|
"id": "experiment_corridor_enexdet",
|
|
"period": "60",
|
|
"file": "./metrics_enex_output.xml",
|
|
"openEntry": "true",
|
|
},
|
|
)
|
|
for det_id in first_cell["detector_ids"]:
|
|
lane_id = det_id.rsplit("_metrics_", 1)[0]
|
|
ET.SubElement(
|
|
enex,
|
|
"detEntry",
|
|
{
|
|
"lane": lane_id,
|
|
"pos": f"{first_cell['position_m']:.2f}",
|
|
},
|
|
)
|
|
for det_id in last_cell["detector_ids"]:
|
|
lane_id = det_id.rsplit("_metrics_", 1)[0]
|
|
ET.SubElement(
|
|
enex,
|
|
"detExit",
|
|
{
|
|
"lane": lane_id,
|
|
"pos": f"{last_cell['position_m']:.2f}",
|
|
},
|
|
)
|
|
ET.ElementTree(root).write(generated_enex, encoding="utf-8", xml_declaration=True)
|
|
|
|
|
|
def _relpath_str(path: Path, project_root: Path) -> str:
|
|
return os.path.relpath(path, project_root).replace("\\", "/")
|