ctm-dqn/utils/experiment_corridor.py

866 lines
28 KiB
Python

"""Build derived SUMO assets for segment-based VSL experiments."""
from __future__ import annotations
import hashlib
import json
import os
import shutil
import subprocess
import sys
import time
import xml.etree.ElementTree as ET
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Sequence, Tuple
try:
import sumo as _sumo_pkg
_tools = os.path.join(_sumo_pkg.SUMO_HOME, "tools")
if _tools not in sys.path:
sys.path.insert(0, _tools)
except ImportError:
pass
import sumolib
EPS = 1e-6
BUILD_LOCK_TIMEOUT_S = 300.0
BUILD_LOCK_STALE_S = 900.0
BUILD_LOCK_POLL_S = 0.2
@dataclass
class CorridorArtifacts:
net_file: str
route_file: str
detector_add_file: str
enex_add_file: str
control_segments: List[Dict]
detector_cells: List[Dict]
physical_edge_ids: List[str]
controlled_length_m: float
def prepare_experiment_corridor_assets(
*,
net_file: str,
route_file: str,
corridor_edges: Sequence[str],
control_segment_length_m: float,
detector_spacing_m: float,
detector_start_offset_m: float,
output_root: str | None = None,
) -> CorridorArtifacts:
"""Generate cached derived assets for the configured experiment corridor."""
if not corridor_edges:
raise ValueError("corridor_edges must not be empty")
project_root = Path.cwd()
net_path = (project_root / net_file).resolve()
route_path = (project_root / route_file).resolve()
output_dir = (
(project_root / output_root).resolve()
if output_root
else (project_root / "sumo_resource" / "generated_corridor").resolve()
)
output_dir.mkdir(parents=True, exist_ok=True)
signature = _build_signature(
net_path=net_path,
route_path=route_path,
corridor_edges=corridor_edges,
control_segment_length_m=control_segment_length_m,
detector_spacing_m=detector_spacing_m,
detector_start_offset_m=detector_start_offset_m,
)
artifact_dir = output_dir / signature
artifact_dir.mkdir(parents=True, exist_ok=True)
manifest_path = artifact_dir / "manifest.json"
ready_artifacts = _load_ready_corridor_artifacts(manifest_path, project_root)
if ready_artifacts is not None:
return ready_artifacts
lock_path = artifact_dir / ".build.lock"
with _artifact_build_lock(lock_path):
ready_artifacts = _load_ready_corridor_artifacts(manifest_path, project_root)
if ready_artifacts is not None:
return ready_artifacts
_clear_incomplete_artifact_dir(artifact_dir, preserve_paths={lock_path})
plain_dir = artifact_dir / "plain"
plain_dir.mkdir(parents=True, exist_ok=True)
plain_prefix = plain_dir / "base"
_export_plain_net(net_path, plain_prefix)
base_net = sumolib.net.readNet(str(net_path))
corridor_specs = _build_corridor_edge_specs(base_net, corridor_edges)
total_length = sum(edge["length_m"] for edge in corridor_specs)
if total_length <= 0:
raise ValueError("Corridor length must be positive")
split_positions_by_edge = _compute_split_positions(
corridor_specs=corridor_specs,
control_segment_length_m=control_segment_length_m,
)
split_map, generated_edge_ids = _rewrite_plain_network(
plain_prefix=plain_prefix,
artifact_dir=artifact_dir,
corridor_specs=corridor_specs,
split_positions_by_edge=split_positions_by_edge,
)
generated_net = artifact_dir / "experiment_corridor.net.xml"
_build_net_from_plain(
artifact_dir=artifact_dir,
generated_net=generated_net,
)
derived_net = sumolib.net.readNet(str(generated_net))
physical_edge_ids, physical_edge_ranges = _build_physical_edge_ranges(
derived_net=derived_net,
corridor_specs=corridor_specs,
split_map=split_map,
)
control_segments = _build_control_segments(
physical_edge_ranges=physical_edge_ranges,
control_segment_length_m=control_segment_length_m,
total_length_m=total_length,
)
generated_route = artifact_dir / "experiment_corridor.rou.xml"
_rewrite_route_file(
route_path=route_path,
generated_route=generated_route,
split_map=split_map,
)
generated_detector = artifact_dir / "experiment_corridor_metrics_il.add.xml"
detector_cells = _build_detector_additional(
generated_detector=generated_detector,
derived_net=derived_net,
control_segments=control_segments,
physical_edge_ranges=physical_edge_ranges,
detector_spacing_m=detector_spacing_m,
detector_start_offset_m=detector_start_offset_m,
)
_attach_first_detector_groups(control_segments, detector_cells)
generated_enex = artifact_dir / "experiment_corridor_metrics_enex.add.xml"
_build_enex_additional(
generated_enex=generated_enex,
detector_cells=detector_cells,
)
manifest = {
"signature": signature,
"net_file": _relpath_str(generated_net, project_root),
"route_file": _relpath_str(generated_route, project_root),
"detector_add_file": _relpath_str(generated_detector, project_root),
"enex_add_file": _relpath_str(generated_enex, project_root),
"control_segments": control_segments,
"detector_cells": detector_cells,
"physical_edge_ids": physical_edge_ids,
"controlled_length_m": total_length,
}
_write_json_atomic(manifest_path, manifest)
return CorridorArtifacts(
net_file=manifest["net_file"],
route_file=manifest["route_file"],
detector_add_file=manifest["detector_add_file"],
enex_add_file=manifest["enex_add_file"],
control_segments=control_segments,
detector_cells=detector_cells,
physical_edge_ids=physical_edge_ids,
controlled_length_m=total_length,
)
def _build_signature(
*,
net_path: Path,
route_path: Path,
corridor_edges: Sequence[str],
control_segment_length_m: float,
detector_spacing_m: float,
detector_start_offset_m: float,
) -> str:
payload = {
"net_file": str(net_path),
"net_mtime_ns": net_path.stat().st_mtime_ns,
"route_file": str(route_path),
"route_mtime_ns": route_path.stat().st_mtime_ns,
"corridor_edges": list(corridor_edges),
"control_segment_length_m": control_segment_length_m,
"detector_spacing_m": detector_spacing_m,
"detector_start_offset_m": detector_start_offset_m,
}
digest = hashlib.sha1(
json.dumps(payload, ensure_ascii=True, sort_keys=True).encode("utf-8")
).hexdigest()
return digest[:16]
def _load_ready_corridor_artifacts(
manifest_path: Path,
project_root: Path,
) -> CorridorArtifacts | None:
if not manifest_path.is_file():
return None
try:
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return None
required_keys = (
"net_file",
"route_file",
"detector_add_file",
"enex_add_file",
"control_segments",
"detector_cells",
"physical_edge_ids",
"controlled_length_m",
)
if any(key not in manifest for key in required_keys):
return None
for key in ("net_file", "route_file", "detector_add_file", "enex_add_file"):
candidate = (project_root / manifest[key]).resolve()
if not candidate.is_file():
return None
return CorridorArtifacts(
net_file=manifest["net_file"],
route_file=manifest["route_file"],
detector_add_file=manifest["detector_add_file"],
enex_add_file=manifest["enex_add_file"],
control_segments=manifest["control_segments"],
detector_cells=manifest["detector_cells"],
physical_edge_ids=manifest["physical_edge_ids"],
controlled_length_m=float(manifest["controlled_length_m"]),
)
@contextmanager
def _artifact_build_lock(lock_path: Path):
start_time = time.monotonic()
while True:
try:
fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
try:
payload = json.dumps(
{"pid": os.getpid(), "created_at": time.time()},
ensure_ascii=True,
)
os.write(fd, payload.encode("utf-8"))
finally:
os.close(fd)
break
except FileExistsError:
try:
age_s = time.time() - lock_path.stat().st_mtime
except FileNotFoundError:
continue
if age_s > BUILD_LOCK_STALE_S:
try:
lock_path.unlink()
continue
except FileNotFoundError:
continue
except OSError:
pass
if time.monotonic() - start_time > BUILD_LOCK_TIMEOUT_S:
raise TimeoutError(f"Timed out waiting for corridor artifact lock: {lock_path}")
time.sleep(BUILD_LOCK_POLL_S)
try:
yield
finally:
try:
lock_path.unlink()
except FileNotFoundError:
pass
def _clear_incomplete_artifact_dir(artifact_dir: Path, preserve_paths: set[Path] | None = None):
preserve_resolved = {path.resolve() for path in (preserve_paths or set())}
for child in artifact_dir.iterdir():
if child.resolve() in preserve_resolved:
continue
if child.is_dir():
shutil.rmtree(child)
else:
child.unlink()
def _write_json_atomic(path: Path, payload: Dict):
temp_path = path.with_name(f"{path.name}.tmp.{os.getpid()}")
temp_path.write_text(
json.dumps(payload, ensure_ascii=True, indent=2),
encoding="utf-8",
)
os.replace(temp_path, path)
def _export_plain_net(net_path: Path, plain_prefix: Path):
required = [
plain_prefix.with_suffix(".nod.xml"),
plain_prefix.with_suffix(".edg.xml"),
plain_prefix.with_suffix(".con.xml"),
plain_prefix.with_suffix(".tll.xml"),
plain_prefix.with_suffix(".typ.xml"),
]
if all(path.exists() for path in required):
return
subprocess.run(
[
"netconvert",
"--sumo-net-file",
str(net_path),
"--plain-output-prefix",
str(plain_prefix),
],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
def _build_corridor_edge_specs(
base_net: sumolib.net.Net,
corridor_edges: Sequence[str],
) -> List[Dict]:
specs = []
cumulative = 0.0
for edge_id in corridor_edges:
edge = base_net.getEdge(edge_id)
length = float(edge.getLength())
specs.append(
{
"edge_id": edge_id,
"length_m": length,
"start_m": cumulative,
"end_m": cumulative + length,
}
)
cumulative += length
return specs
def _compute_split_positions(
*,
corridor_specs: Sequence[Dict],
control_segment_length_m: float,
) -> Dict[str, List[float]]:
if control_segment_length_m <= 0:
raise ValueError("control_segment_length_m must be positive")
total_length = corridor_specs[-1]["end_m"]
boundaries = []
boundary = control_segment_length_m
while boundary < total_length - EPS:
boundaries.append(boundary)
boundary += control_segment_length_m
split_positions_by_edge: Dict[str, List[float]] = {}
for spec in corridor_specs:
local_positions = []
for boundary in boundaries:
if spec["start_m"] + EPS < boundary < spec["end_m"] - EPS:
local_positions.append(boundary - spec["start_m"])
if local_positions:
split_positions_by_edge[spec["edge_id"]] = local_positions
return split_positions_by_edge
def _rewrite_plain_network(
*,
plain_prefix: Path,
artifact_dir: Path,
corridor_specs: Sequence[Dict],
split_positions_by_edge: Dict[str, List[float]],
) -> Tuple[Dict[str, List[str]], List[str]]:
nodes_path = plain_prefix.with_suffix(".nod.xml")
edges_path = plain_prefix.with_suffix(".edg.xml")
con_path = plain_prefix.with_suffix(".con.xml")
tll_path = plain_prefix.with_suffix(".tll.xml")
typ_path = plain_prefix.with_suffix(".typ.xml")
node_tree = ET.parse(nodes_path)
edge_tree = ET.parse(edges_path)
con_tree = ET.parse(con_path)
node_root = node_tree.getroot()
edge_root = edge_tree.getroot()
con_root = con_tree.getroot()
node_elems = {node.get("id"): node for node in node_root.findall("node")}
edge_elems = {edge.get("id"): edge for edge in edge_root.findall("edge")}
split_map: Dict[str, List[str]] = {}
generated_edge_ids: List[str] = []
for spec in corridor_specs:
edge_id = spec["edge_id"]
split_positions = split_positions_by_edge.get(edge_id, [])
if not split_positions:
split_map[edge_id] = [edge_id]
generated_edge_ids.append(edge_id)
continue
edge_elem = edge_elems[edge_id]
split_result = _split_edge_element(
edge_elem=edge_elem,
node_elems=node_elems,
split_positions=split_positions,
node_root=node_root,
)
split_map[edge_id] = [part["edge_id"] for part in split_result]
generated_edge_ids.extend(split_map[edge_id])
insert_index = list(edge_root).index(edge_elem)
edge_root.remove(edge_elem)
for offset, part in enumerate(split_result):
edge_root.insert(insert_index + offset, part["edge_elem"])
_rewrite_connections(con_root, split_map, edge_elems)
derived_nodes = artifact_dir / "derived.nod.xml"
derived_edges = artifact_dir / "derived.edg.xml"
derived_cons = artifact_dir / "derived.con.xml"
derived_tll = artifact_dir / "derived.tll.xml"
derived_typ = artifact_dir / "derived.typ.xml"
node_tree.write(derived_nodes, encoding="utf-8", xml_declaration=True)
edge_tree.write(derived_edges, encoding="utf-8", xml_declaration=True)
con_tree.write(derived_cons, encoding="utf-8", xml_declaration=True)
ET.parse(tll_path).write(derived_tll, encoding="utf-8", xml_declaration=True)
ET.parse(typ_path).write(derived_typ, encoding="utf-8", xml_declaration=True)
return split_map, generated_edge_ids
def _split_edge_element(
*,
edge_elem: ET.Element,
node_elems: Dict[str, ET.Element],
split_positions: Sequence[float],
node_root: ET.Element,
) -> List[Dict]:
from_node = node_elems[edge_elem.get("from")]
to_node = node_elems[edge_elem.get("to")]
points = _extract_edge_shape_points(edge_elem, from_node, to_node)
total_length = _polyline_length(points)
distances = [0.0] + list(split_positions) + [total_length]
part_defs = []
previous_node_id = edge_elem.get("from")
for idx, (start_dist, end_dist) in enumerate(zip(distances[:-1], distances[1:])):
if idx == len(distances) - 2:
next_node_id = edge_elem.get("to")
else:
split_point = _interpolate_point_on_polyline(points, end_dist)
next_node_id = f"{edge_elem.get('id')}__split_node_{idx + 1}"
if next_node_id not in {node.get("id") for node in node_root.findall("node")}:
new_node = ET.Element(
"node",
{
"id": next_node_id,
"x": f"{split_point[0]:.2f}",
"y": f"{split_point[1]:.2f}",
"type": "priority",
},
)
node_root.append(new_node)
part_edge = ET.fromstring(ET.tostring(edge_elem, encoding="unicode"))
part_edge_id = f"{edge_elem.get('id')}__part_{idx + 1}"
part_edge.set("id", part_edge_id)
part_edge.set("from", previous_node_id)
part_edge.set("to", next_node_id)
part_shape = _slice_polyline(points, start_dist, end_dist)
part_edge.set("shape", _shape_to_string(part_shape))
part_defs.append(
{
"edge_id": part_edge_id,
"edge_elem": part_edge,
}
)
previous_node_id = next_node_id
return part_defs
def _extract_edge_shape_points(
edge_elem: ET.Element,
from_node: ET.Element,
to_node: ET.Element,
) -> List[Tuple[float, float]]:
shape_attr = edge_elem.get("shape", "").strip()
if shape_attr:
points = []
for raw_pair in shape_attr.split():
x_str, y_str = raw_pair.split(",")
points.append((float(x_str), float(y_str)))
return points
return [
(float(from_node.get("x")), float(from_node.get("y"))),
(float(to_node.get("x")), float(to_node.get("y"))),
]
def _polyline_length(points: Sequence[Tuple[float, float]]) -> float:
length = 0.0
for a, b in zip(points[:-1], points[1:]):
length += _distance(a, b)
return length
def _interpolate_point_on_polyline(
points: Sequence[Tuple[float, float]],
target_dist: float,
) -> Tuple[float, float]:
if target_dist <= 0:
return points[0]
traversed = 0.0
for a, b in zip(points[:-1], points[1:]):
segment_length = _distance(a, b)
if traversed + segment_length >= target_dist - EPS:
ratio = 0.0 if segment_length <= EPS else (target_dist - traversed) / segment_length
return (a[0] + (b[0] - a[0]) * ratio, a[1] + (b[1] - a[1]) * ratio)
traversed += segment_length
return points[-1]
def _slice_polyline(
points: Sequence[Tuple[float, float]],
start_dist: float,
end_dist: float,
) -> List[Tuple[float, float]]:
sliced = [_interpolate_point_on_polyline(points, start_dist)]
traversed = 0.0
for a, b in zip(points[:-1], points[1:]):
segment_length = _distance(a, b)
next_traversed = traversed + segment_length
if traversed > end_dist - EPS:
break
if start_dist + EPS < next_traversed and traversed + EPS < end_dist:
if traversed >= start_dist - EPS and next_traversed <= end_dist + EPS:
sliced.append(b)
traversed = next_traversed
sliced.append(_interpolate_point_on_polyline(points, end_dist))
deduped = []
for point in sliced:
if not deduped or _distance(point, deduped[-1]) > EPS:
deduped.append(point)
return deduped
def _shape_to_string(points: Sequence[Tuple[float, float]]) -> str:
return " ".join(f"{x:.2f},{y:.2f}" for x, y in points)
def _distance(a: Tuple[float, float], b: Tuple[float, float]) -> float:
dx = a[0] - b[0]
dy = a[1] - b[1]
return (dx * dx + dy * dy) ** 0.5
def _rewrite_connections(
con_root: ET.Element,
split_map: Dict[str, List[str]],
edge_elems: Dict[str, ET.Element],
):
new_connections = []
for connection in con_root.findall("connection"):
updated = ET.fromstring(ET.tostring(connection, encoding="unicode"))
from_edge = updated.get("from")
to_edge = updated.get("to")
if from_edge in split_map and len(split_map[from_edge]) > 1:
updated.set("from", split_map[from_edge][-1])
if to_edge in split_map and len(split_map[to_edge]) > 1:
updated.set("to", split_map[to_edge][0])
new_connections.append(updated)
for edge_id, part_ids in split_map.items():
if len(part_ids) <= 1:
continue
lane_count = len(edge_elems[edge_id].findall("lane"))
for from_part, to_part in zip(part_ids[:-1], part_ids[1:]):
for lane_idx in range(lane_count):
new_connections.append(
ET.Element(
"connection",
{
"from": from_part,
"to": to_part,
"fromLane": str(lane_idx),
"toLane": str(lane_idx),
},
)
)
for connection in list(con_root):
con_root.remove(connection)
for connection in new_connections:
con_root.append(connection)
def _build_net_from_plain(
*,
artifact_dir: Path,
generated_net: Path,
):
subprocess.run(
[
"netconvert",
"-n",
str(artifact_dir / "derived.nod.xml"),
"-e",
str(artifact_dir / "derived.edg.xml"),
"-x",
str(artifact_dir / "derived.con.xml"),
"-i",
str(artifact_dir / "derived.tll.xml"),
"-t",
str(artifact_dir / "derived.typ.xml"),
"-o",
str(generated_net),
],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
def _build_physical_edge_ranges(
*,
derived_net: sumolib.net.Net,
corridor_specs: Sequence[Dict],
split_map: Dict[str, List[str]],
) -> Tuple[List[str], List[Dict]]:
physical_edge_ids: List[str] = []
physical_edge_ranges: List[Dict] = []
cumulative = 0.0
for spec in corridor_specs:
for part_id in split_map[spec["edge_id"]]:
edge = derived_net.getEdge(part_id)
length = float(edge.getLength())
physical_edge_ids.append(part_id)
physical_edge_ranges.append(
{
"edge_id": part_id,
"start_m": cumulative,
"end_m": cumulative + length,
"length_m": length,
}
)
cumulative += length
return physical_edge_ids, physical_edge_ranges
def _build_control_segments(
*,
physical_edge_ranges: Sequence[Dict],
control_segment_length_m: float,
total_length_m: float,
) -> List[Dict]:
segments = []
start = 0.0
index = 0
while start < total_length_m - EPS:
end = min(start + control_segment_length_m, total_length_m)
member_edges = [
edge["edge_id"]
for edge in physical_edge_ranges
if edge["start_m"] < end - EPS and edge["end_m"] > start + EPS
]
segments.append(
{
"segment_id": f"corridor_seg_{index + 1:02d}",
"segment_index": index,
"start_distance_m": round(start, 6),
"end_distance_m": round(end, 6),
"length_m": round(end - start, 6),
"edge_ids": member_edges,
"first_detector_group_ids": [],
}
)
start = end
index += 1
return segments
def _rewrite_route_file(
*,
route_path: Path,
generated_route: Path,
split_map: Dict[str, List[str]],
):
route_tree = ET.parse(route_path)
route_root = route_tree.getroot()
for elem in route_root.iter():
edges_attr = elem.get("edges")
if not edges_attr:
continue
expanded = []
for edge_id in edges_attr.split():
expanded.extend(split_map.get(edge_id, [edge_id]))
elem.set("edges", " ".join(expanded))
route_tree.write(generated_route, encoding="utf-8", xml_declaration=True)
def _build_detector_additional(
*,
generated_detector: Path,
derived_net: sumolib.net.Net,
control_segments: List[Dict],
physical_edge_ranges: Sequence[Dict],
detector_spacing_m: float,
detector_start_offset_m: float,
) -> List[Dict]:
root = ET.Element("additional")
detector_cells: List[Dict] = []
per_edge_pos_index: Dict[str, int] = {}
for segment in control_segments:
segment_length = float(segment["length_m"])
detector_offset = detector_start_offset_m
segment_pos_index = 0
while detector_offset < segment_length - EPS:
global_distance = float(segment["start_distance_m"]) + detector_offset
edge_range = _locate_edge_range(global_distance, physical_edge_ranges)
if edge_range is None:
detector_offset += detector_spacing_m
segment_pos_index += 1
continue
edge = derived_net.getEdge(edge_range["edge_id"])
traffic_lane_indices = [
lane.getIndex()
for lane in edge.getLanes()
if "emergency" not in lane.getPermissions()
]
if not traffic_lane_indices:
traffic_lane_indices = [lane.getIndex() for lane in edge.getLanes()]
local_position = global_distance - edge_range["start_m"]
edge_pos_index = per_edge_pos_index.get(edge_range["edge_id"], 0)
detector_ids = []
for lane_idx in traffic_lane_indices:
det_id = f"{edge_range['edge_id']}_{lane_idx}_metrics_{edge_pos_index}"
detector_ids.append(det_id)
ET.SubElement(
root,
"inductionLoop",
{
"id": det_id,
"lane": f"{edge_range['edge_id']}_{lane_idx}",
"pos": f"{local_position:.2f}",
"freq": "60",
"file": "./metrics_il_output.xml",
},
)
detector_cells.append(
{
"segment_id": segment["segment_id"],
"segment_index": int(segment["segment_index"]),
"pos_index": int(segment_pos_index),
"distance_m": round(global_distance, 6),
"segment_position_m": round(detector_offset, 6),
"edge_id": edge_range["edge_id"],
"position_m": round(local_position, 6),
"detector_ids": detector_ids,
}
)
per_edge_pos_index[edge_range["edge_id"]] = edge_pos_index + 1
detector_offset += detector_spacing_m
segment_pos_index += 1
ET.ElementTree(root).write(generated_detector, encoding="utf-8", xml_declaration=True)
return detector_cells
def _locate_edge_range(global_distance: float, physical_edge_ranges: Sequence[Dict]) -> Dict | None:
for edge_range in physical_edge_ranges:
if edge_range["start_m"] + EPS < global_distance <= edge_range["end_m"] + EPS:
return edge_range
if physical_edge_ranges and abs(global_distance - physical_edge_ranges[0]["start_m"]) <= EPS:
return physical_edge_ranges[0]
return None
def _attach_first_detector_groups(control_segments: List[Dict], detector_cells: Sequence[Dict]):
first_by_segment = {}
for cell in detector_cells:
first_by_segment.setdefault(cell["segment_id"], cell["detector_ids"])
for segment in control_segments:
segment["first_detector_group_ids"] = list(first_by_segment.get(segment["segment_id"], []))
def _build_enex_additional(
*,
generated_enex: Path,
detector_cells: Sequence[Dict],
):
root = ET.Element("additional")
if detector_cells:
first_cell = detector_cells[0]
last_cell = detector_cells[-1]
enex = ET.SubElement(
root,
"entryExitDetector",
{
"id": "experiment_corridor_enexdet",
"period": "60",
"file": "./metrics_enex_output.xml",
"openEntry": "true",
},
)
for det_id in first_cell["detector_ids"]:
lane_id = det_id.rsplit("_metrics_", 1)[0]
ET.SubElement(
enex,
"detEntry",
{
"lane": lane_id,
"pos": f"{first_cell['position_m']:.2f}",
},
)
for det_id in last_cell["detector_ids"]:
lane_id = det_id.rsplit("_metrics_", 1)[0]
ET.SubElement(
enex,
"detExit",
{
"lane": lane_id,
"pos": f"{last_cell['position_m']:.2f}",
},
)
ET.ElementTree(root).write(generated_enex, encoding="utf-8", xml_declaration=True)
def _relpath_str(path: Path, project_root: Path) -> str:
return os.path.relpath(path, project_root).replace("\\", "/")