# ctm-dqn/envs/edge_vsl_env.py
# (643 lines, 24 KiB, Python)

"""SUMO VSL environment with 1000 m corridor control segments."""
import os
import sys
import numpy as np
import xml.etree.ElementTree as ET
from typing import Tuple, Dict, List, Optional, Set
# Make SUMO's bundled "tools" directory (which ships traci/sumolib) importable
# when SUMO was installed through the `sumo` pip package.
try:
    import sumo as _sumo_pkg
except ImportError:
    pass
else:
    _tools = os.path.join(_sumo_pkg.SUMO_HOME, "tools")
    if _tools not in sys.path:
        sys.path.insert(0, _tools)
import traci
import traci.constants as tc
from envs.network_parser import SUMONetworkParser
from utils.experiment_corridor import prepare_experiment_corridor_assets
class SUMOEdgeVSLEnvironment:
    """Edge-based VSL environment over derived corridor control segments.

    The configured corridor edges are split into control segments by
    ``prepare_experiment_corridor_assets``. Each step selects one entry of
    ``speed_actions_kmh`` per segment and writes it as the max speed of every
    physical edge in that segment. The first ``passive_prefix_segment_count``
    segments are observed but never controlled.
    """

    def __init__(self, config: dict):
        """Read configuration, build corridor assets and derived bookkeeping.

        Args:
            config: Mapping with ``"sumo"`` and ``"environment"`` sections; an
                optional ``"runtime"`` section controls where per-episode
                detector outputs are written and overrides a few env options.

        Raises:
            ValueError: If ``reward.d_max`` is not greater than
                ``reward.d_threshold`` (the brake-penalty normaliser would
                divide by zero or flip sign).
        """
        sumo_cfg = config["sumo"]
        env_cfg = config["environment"]
        self.net_file = sumo_cfg["net_file"]
        self.route_file = sumo_cfg["route_file"]
        self.detector_add_file = sumo_cfg["detector_add_file"]
        self.enex_add_file = sumo_cfg["enex_add_file"]
        self.step_length = sumo_cfg["step_length"]
        self.begin_time = sumo_cfg["begin_time"]
        self.end_time = sumo_cfg["end_time"]
        self.use_gui = sumo_cfg.get("gui", False)
        self.no_warnings = sumo_cfg.get("no_warnings", True)

        runtime_cfg = config.get("runtime", {})
        self.runtime_output_dir = runtime_cfg.get("output_dir")
        self.runtime_metrics_subdir = runtime_cfg.get("metrics_subdir", "sumo_metrics")
        self.runtime_detector_add_file: Optional[str] = None
        self.runtime_enex_add_file: Optional[str] = None
        # Runtime settings take precedence over the environment section.
        self.collect_detector_cells = runtime_cfg.get(
            "collect_detector_cells",
            env_cfg.get("collect_detector_cells", False),
        )
        self.use_vehicle_subscriptions = runtime_cfg.get(
            "use_vehicle_subscriptions",
            env_cfg.get("use_vehicle_subscriptions", True),
        )

        self.control_interval = env_cfg["control_interval"]
        self.steps_per_action = self._interval_to_steps(self.control_interval, self.step_length)
        # Uncontrolled simulation time (s) run by reset() before the first observation.
        self.warmup_time = env_cfg.get("warmup_time", 900)
        self.episode_length = int(
            (self.end_time - self.begin_time - self.warmup_time) / self.control_interval
        )

        self.speed_actions_kmh = np.array(env_cfg["speed_actions_kmh"], dtype=float)
        self.speed_actions_ms = self.speed_actions_kmh / 3.6
        self.num_speed_actions = len(self.speed_actions_kmh)
        self.free_flow_speed = env_cfg["free_flow_speed"]
        self.base_corridor_edges: List[str] = env_cfg["control_edges"]
        self.control_segment_length_m = float(env_cfg.get("control_segment_length_m", 1000.0))
        self.detector_spacing_m = float(env_cfg.get("detector_spacing_m", 100.0))
        self.passive_prefix_segment_count = int(env_cfg.get("passive_prefix_segment_count", 0))

        corridor_assets = prepare_experiment_corridor_assets(
            net_file=self.net_file,
            route_file=self.route_file,
            corridor_edges=self.base_corridor_edges,
            control_segment_length_m=self.control_segment_length_m,
            detector_spacing_m=self.detector_spacing_m,
            output_root=env_cfg.get("generated_asset_dir"),
        )
        # The generated assets replace the files named in the config.
        self.net_file = corridor_assets.net_file
        self.route_file = corridor_assets.route_file
        self.detector_add_file = corridor_assets.detector_add_file
        self.enex_add_file = corridor_assets.enex_add_file
        # Templates are rewritten per episode when a runtime output dir is set.
        self._detector_add_template = self.detector_add_file
        self._enex_add_template = self.enex_add_file
        self.control_segments: List[Dict] = corridor_assets.control_segments
        self.detector_cell_defs: List[Dict] = corridor_assets.detector_cells
        # "control_edges" holds segment ids, not physical SUMO edge ids.
        self.control_edges: List[str] = [
            segment["segment_id"] for segment in self.control_segments
        ]
        self.segment_edge_map: Dict[str, List[str]] = {
            segment["segment_id"]: list(segment["edge_ids"])
            for segment in self.control_segments
        }
        self.segment_detector_map: Dict[str, List[str]] = {
            segment["segment_id"]: list(segment.get("first_detector_group_ids", []))
            for segment in self.control_segments
        }
        self.num_edges = len(self.control_edges)
        self.passive_prefix_segment_count = min(self.passive_prefix_segment_count, self.num_edges)
        self.passive_segment_indices = set(range(self.passive_prefix_segment_count))
        self.passive_segments: List[str] = self.control_edges[: self.passive_prefix_segment_count]
        self.active_control_segments: List[str] = self.control_edges[self.passive_prefix_segment_count :]
        self.physical_control_edges: List[str] = list(corridor_assets.physical_edge_ids)
        self.control_edge_set = set(self.physical_control_edges)

        self.reward_cfg = env_cfg.get("reward", {})
        self.w1 = self.reward_cfg.get("w_flow", 0.4)
        self.w2 = self.reward_cfg.get("w_var", 0.3)
        self.w_base = self.reward_cfg.get("w_brake_base", 0.1)
        self.w_max = self.reward_cfg.get("w_brake_max", 0.5)
        self.w4 = self.reward_cfg.get("w_penalty", 0.2)
        self.rho_critical = self.reward_cfg.get("rho_critical", 35.0)
        self.k_sigmoid = self.reward_cfg.get("k_sigmoid", 0.2)
        self.d_th = self.reward_cfg.get("d_threshold", 3.0)
        self.d_max = self.reward_cfg.get("d_max", 8.0)
        if self.d_max <= self.d_th:
            raise ValueError(
                f"reward.d_max ({self.d_max}) must be greater than "
                f"reward.d_threshold ({self.d_th})"
            )
        self.C_max = self.reward_cfg.get("C_max", 6000.0)
        self.v_limit = self.reward_cfg.get("v_limit", 33.33)
        self.delta_vsl_max = self.reward_cfg.get("delta_vsl_max", 60.0 / 3.6)

        self.parser = SUMONetworkParser(
            detector_add_file=self._detector_add_template,
            net_file=self.net_file,
        )
        self.controlled_length_km = corridor_assets.controlled_length_m / 1000.0
        self.default_edge_speeds = self._build_default_segment_speeds()
        self.action_dims = [self.num_speed_actions] * self.num_edges
        self.features_per_edge = 3
        # Per segment: 3 detector features + current limit; then 3 time features + last reward.
        self._state_dim = (self.features_per_edge + 1) * self.num_edges + 3 + 1

        self.current_step = 0
        self._sumo_running = False
        self._episode_count = 0
        self.current_edge_speeds = self.default_edge_speeds.copy()
        self._prev_edge_speeds = self.default_edge_speeds.copy()
        self._last_reward = 0.0
        self._interval_arrived = 0
        self._interval_departed = 0
        self.episode_metrics: List[Dict] = []
        self._tracked_vehicle_ids: Set[str] = set()

        print("SUMO Edge VSL Environment initialized")
        print(f" Control segments: {self.num_edges}")
        print(f" Passive prefix segments: {self.passive_prefix_segment_count}")
        print(f" Physical corridor edges: {len(self.physical_control_edges)}")
        print(f" Action: MultiDiscrete {self.action_dims}")
        print(f" State dim: {self._state_dim}")
        print(f" Episode length: {self.episode_length} steps")

    @property
    def state_dim(self) -> int:
        """Length of the observation vector returned by reset()/step()."""
        return self._state_dim

    @property
    def action_dim(self) -> int:
        """Number of discrete speed choices available to each segment."""
        return self.num_speed_actions

    @staticmethod
    def _interval_to_steps(interval: float, step_length: float) -> int:
        """Return how many simulation steps of ``step_length`` cover ``interval``.

        Rounds instead of truncating, because ratios like ``0.3 / 0.1`` evaluate
        to ``2.999...`` in floating point. Never returns fewer than 1 step.
        """
        return max(1, int(round(interval / step_length)))

    def _build_default_segment_speeds(self) -> np.ndarray:
        """Return one default speed limit per control segment.

        Each value is the minimum parser-reported speed limit over the
        segment's edges. It falls back to ``free_flow_speed`` when none of the
        edges is known to the parser. Values share the unit of
        ``speed_actions_ms``.
        """
        default_speeds = []
        for segment in self.control_segments:
            edge_speeds = []
            for edge_id in segment["edge_ids"]:
                edge_info = self.parser.edge_info.get(edge_id)
                if edge_info is not None:
                    edge_speeds.append(float(edge_info.speed_limit))
            default_speeds.append(
                float(min(edge_speeds)) if edge_speeds else float(self.free_flow_speed)
            )
        return np.array(default_speeds, dtype=float)

    def _start_sumo(self, seed: Optional[int] = None):
        """(Re)start a SUMO process over TraCI for the current episode."""
        if self._sumo_running:
            self._close_sumo()
        self._prepare_runtime_additional_files()
        binary_name = "sumo-gui" if self.use_gui else "sumo"
        try:
            import sumolib
            sumo_binary = sumolib.checkBinary(binary_name)
        except Exception:
            # Fall back to resolving the binary from PATH.
            sumo_binary = binary_name
        detector_add_file = self.runtime_detector_add_file or self.detector_add_file
        enex_add_file = self.runtime_enex_add_file or self.enex_add_file
        cmd = [
            sumo_binary,
            "-n",
            self.net_file,
            "-r",
            self.route_file,
            "-a",
            f"{detector_add_file},{enex_add_file}",
            "--step-length",
            str(self.step_length),
            "-b",
            str(self.begin_time),
            "-e",
            str(self.end_time),
            "--collision.action",
            "warn",
            "--quit-on-end",
            "true",
        ]
        if self.no_warnings:
            cmd += ["--no-warnings", "true"]
        if seed is not None:
            cmd += ["--seed", str(seed)]
        if self.use_gui:
            cmd += ["--start", "true", "--gui-settings-file", "sumo_resource/gui.settings.xml"]
        traci.start(cmd, label=f"vsl_{self._episode_count}")
        self._sumo_running = True

    @staticmethod
    def _to_sumo_path(path: str) -> str:
        """Return an absolute path using forward slashes."""
        return os.path.abspath(path).replace("\\", "/")

    def _rewrite_additional_file(self, template_path: str, runtime_add_path: str, output_xml_path: str):
        """Copy an additional-file template, pointing every ``file`` attribute at ``output_xml_path``."""
        tree = ET.parse(template_path)
        root = tree.getroot()
        for elem in root.iter():
            if "file" in elem.attrib:
                elem.set("file", output_xml_path)
        tree.write(runtime_add_path, encoding="utf-8", xml_declaration=True)

    def _prepare_runtime_additional_files(self):
        """Write per-episode copies of the detector/E1-E3 additional files.

        Without a runtime output dir, the template files are used unchanged.
        """
        if not self.runtime_output_dir:
            self.runtime_detector_add_file = None
            self.runtime_enex_add_file = None
            return
        output_dir = os.path.join(
            os.path.abspath(self.runtime_output_dir),
            self.runtime_metrics_subdir,
        )
        os.makedirs(output_dir, exist_ok=True)
        suffix = f"ep{self._episode_count:04d}"
        detector_output_file = self._to_sumo_path(
            os.path.join(output_dir, f"metrics_il_output_{suffix}.xml")
        )
        enex_output_file = self._to_sumo_path(
            os.path.join(output_dir, f"metrics_enex_output_{suffix}.xml")
        )
        detector_add_file = os.path.join(output_dir, f"runtime_metrics_il_{suffix}.add.xml")
        enex_add_file = os.path.join(output_dir, f"runtime_metrics_enex_{suffix}.add.xml")
        self._rewrite_additional_file(
            self._detector_add_template,
            detector_add_file,
            detector_output_file,
        )
        self._rewrite_additional_file(
            self._enex_add_template,
            enex_add_file,
            enex_output_file,
        )
        self.runtime_detector_add_file = detector_add_file
        self.runtime_enex_add_file = enex_add_file

    def _close_sumo(self):
        """Close the TraCI connection (best effort) and forget subscriptions."""
        if self._sumo_running:
            try:
                traci.close()
            except Exception:
                pass
            self._sumo_running = False
        self._tracked_vehicle_ids.clear()

    def reset(self, seed: Optional[int] = None) -> np.ndarray:
        """Start a new episode, run the uncontrolled warmup and return the first state."""
        self._episode_count += 1
        self.current_step = 0
        self.episode_metrics = []
        self.current_edge_speeds = self.default_edge_speeds.copy()
        self._prev_edge_speeds = self.default_edge_speeds.copy()
        self._last_reward = 0.0
        self._interval_arrived = 0
        self._interval_departed = 0
        self._tracked_vehicle_ids.clear()
        self._start_sumo(seed=seed)
        # Warmup uses the same whole-interval quantisation as episode_length.
        warmup_intervals = int(self.warmup_time / self.control_interval)
        for _ in range(warmup_intervals * self.steps_per_action):
            traci.simulationStep()
        return self._collect_state()

    def step(
        self,
        action: Optional[np.ndarray],
        apply_control: bool = True,
    ) -> Tuple[np.ndarray, float, bool, Dict]:
        """Advance the simulation by one control interval.

        Args:
            action: One speed-action index per control segment. Required when
                ``apply_control`` is True; ignored otherwise.
            apply_control: When False, no limits are written to SUMO and the
                default segment speeds are reported.

        Returns:
            ``(state, reward, done, info)``. SUMO is closed once ``done``.

        Raises:
            ValueError: If ``apply_control`` is True and ``action`` is None or
                malformed (see ``_decode_action``).
        """
        self._prev_edge_speeds = self.current_edge_speeds.copy()
        if apply_control:
            if action is None:
                raise ValueError("action must be provided when apply_control=True")
            edge_speeds = self._decode_action(action)
            if self.passive_prefix_segment_count > 0:
                # Passive segments always report their default limit.
                edge_speeds[: self.passive_prefix_segment_count] = self.default_edge_speeds[
                    : self.passive_prefix_segment_count
                ]
        else:
            # NOTE(review): limits written by an earlier controlled step remain
            # active in SUMO while defaults are reported here - confirm callers
            # never mix controlled and uncontrolled steps in one episode.
            edge_speeds = self.default_edge_speeds.copy()
        self.current_edge_speeds = edge_speeds
        if apply_control:
            self._apply_vsl(edge_speeds)

        self._interval_arrived = 0
        self._interval_departed = 0
        for _ in range(self.steps_per_action):
            traci.simulationStep()
            self._interval_arrived += traci.simulation.getArrivedNumber()
            self._interval_departed += traci.simulation.getDepartedNumber()

        detector_data = self._get_edge_detector_data()
        state = self._collect_state(detector_data)
        info = self._collect_runtime_metrics(detector_data)
        info["detector_cells"] = self._collect_all_detector_cells() if self.collect_detector_cells else []
        reward = self._calculate_reward(info)
        self._last_reward = reward
        self.current_step += 1
        done = self.current_step >= self.episode_length
        info["reward"] = reward
        info["step"] = self.current_step
        info["edge_speeds_kmh"] = (edge_speeds * 3.6).tolist()
        info["action_applied_mask"] = [
            bool(apply_control and idx not in self.passive_segment_indices)
            for idx in range(self.num_edges)
        ]
        self.episode_metrics.append(info)
        if done:
            self._close_sumo()
        return state, reward, done, info

    def close(self):
        """Shut down the SUMO connection if one is open."""
        self._close_sumo()

    def _decode_action(self, action: np.ndarray) -> np.ndarray:
        """Map one action index per segment to a speed from ``speed_actions_ms``.

        Raises:
            ValueError: If the number of entries differs from ``num_edges`` or
                an index is outside ``[0, len(speed_actions_ms))``. Negative
                indices would otherwise silently wrap to another speed.
        """
        indices = [int(a) for a in action]
        if len(indices) != self.num_edges:
            raise ValueError(
                f"expected {self.num_edges} action entries, got {len(indices)}"
            )
        num_choices = len(self.speed_actions_ms)
        for idx in indices:
            if not 0 <= idx < num_choices:
                raise ValueError(
                    f"action index {idx} out of range [0, {num_choices})"
                )
        # Fancy indexing returns a fresh float array the caller may modify.
        return self.speed_actions_ms[np.asarray(indices, dtype=int)]

    def _apply_vsl(self, edge_speeds: np.ndarray):
        """Write each active segment's limit to all of its physical edges."""
        for idx, segment_id in enumerate(self.control_edges):
            if idx in self.passive_segment_indices:
                continue
            for edge_id in self.segment_edge_map.get(segment_id, []):
                traci.edge.setMaxSpeed(edge_id, float(edge_speeds[idx]))

    @staticmethod
    def _read_detector_group(det_ids: List[str]) -> Tuple[List[float], List[float], List[int]]:
        """Read last-interval speed/occupancy/count from each induction loop.

        Non-positive speeds and negative occupancies/counts are dropped.
        Detectors whose TraCI query fails are skipped.
        """
        lane_speeds: List[float] = []
        lane_occs: List[float] = []
        lane_counts: List[int] = []
        for det_id in det_ids:
            try:
                spd = traci.inductionloop.getLastIntervalMeanSpeed(det_id)
                occ = traci.inductionloop.getLastIntervalOccupancy(det_id)
                cnt = traci.inductionloop.getLastIntervalVehicleNumber(det_id)
            except Exception:
                continue
            if spd > 0:
                lane_speeds.append(spd)
            if occ >= 0:
                lane_occs.append(occ)
            if cnt >= 0:
                lane_counts.append(cnt)
        return lane_speeds, lane_occs, lane_counts

    def _get_edge_detector_data(self) -> Tuple[List[float], List[float], List[int], List[float]]:
        """Aggregate each segment's first detector group.

        Returns:
            ``(speeds, occupancies, counts, valid_speeds)``. Segments without
            a speed reading report ``free_flow_speed`` in ``speeds`` and are
            omitted from ``valid_speeds``.
        """
        speeds, occs, counts, valid_speeds = [], [], [], []
        for segment_id in self.control_edges:
            det_ids = self.segment_detector_map.get(segment_id, [])
            if not det_ids:
                speeds.append(self.free_flow_speed)
                occs.append(0.0)
                counts.append(0)
                continue
            lane_speeds, lane_occs, lane_counts = self._read_detector_group(det_ids)
            speeds.append(np.mean(lane_speeds) if lane_speeds else self.free_flow_speed)
            occs.append(np.mean(lane_occs) if lane_occs else 0.0)
            counts.append(sum(lane_counts))
            if lane_speeds:
                valid_speeds.append(np.mean(lane_speeds))
        return speeds, occs, counts, valid_speeds

    def _collect_all_detector_cells(self) -> List[Dict]:
        """Return one row of aggregated detector readings per detector cell.

        Missing speed/occupancy readings are reported as NaN.
        """
        detector_rows = []
        for cell in self.detector_cell_defs:
            lane_speeds, lane_occs, lane_counts = self._read_detector_group(cell["detector_ids"])
            detector_rows.append(
                {
                    "edge_index": int(cell["segment_index"]),
                    "edge_id": cell["segment_id"],
                    "physical_edge_id": cell["edge_id"],
                    "pos_index": int(cell["pos_index"]),
                    "position_m": float(cell["segment_position_m"]),
                    "distance_m": float(cell["distance_m"]),
                    "speed_ms": float(np.mean(lane_speeds)) if lane_speeds else np.nan,
                    "occupancy": float(np.mean(lane_occs)) if lane_occs else np.nan,
                    "vehicle_count": int(sum(lane_counts)),
                }
            )
        return detector_rows

    def _collect_state(
        self,
        detector_data: Optional[Tuple[List[float], List[float], List[int], List[float]]] = None,
    ) -> np.ndarray:
        """Build the float32 observation vector of length ``state_dim``.

        The layout is per-segment ``[speed/free_flow (clip 0-1.5),
        occupancy/100 (clip 0-1), min(count/50, 1)]``, then each segment's
        current limit divided by ``free_flow_speed``, then ``[progress,
        sin, cos]`` of episode progress, then ``last_reward / 10``.
        Detectors are queried when ``detector_data`` is None.
        """
        state_parts = []
        if detector_data is None:
            speeds, occs, counts, _ = self._get_edge_detector_data()
        else:
            speeds, occs, counts, _ = detector_data
        for spd, occ, cnt in zip(speeds, occs, counts):
            mean_speed_norm = np.clip(spd / self.free_flow_speed, 0.0, 1.5)
            mean_occ = np.clip(occ / 100.0, 0.0, 1.0)
            flow_norm = min(cnt / 50.0, 1.0)
            state_parts.extend([mean_speed_norm, mean_occ, flow_norm])
        for idx in range(self.num_edges):
            state_parts.append(self.current_edge_speeds[idx] / self.free_flow_speed)
        time_progress = self.current_step / max(self.episode_length, 1)
        state_parts.append(time_progress)
        state_parts.append(np.sin(2 * np.pi * time_progress))
        state_parts.append(np.cos(2 * np.pi * time_progress))
        state_parts.append(self._last_reward / 10.0)
        return np.array(state_parts, dtype=np.float32)

    def _sync_vehicle_subscriptions(self, current_vehicle_ids: Set[str]):
        """Subscribe newly seen vehicles to road id and acceleration.

        Only successfully subscribed vehicles are tracked, so a failed
        subscription is retried on the next call. Vehicles that have left
        the simulation are forgotten.
        """
        if not self.use_vehicle_subscriptions:
            return
        tracked = self._tracked_vehicle_ids & current_vehicle_ids
        for veh_id in current_vehicle_ids - self._tracked_vehicle_ids:
            try:
                traci.vehicle.subscribe(veh_id, (tc.VAR_ROAD_ID, tc.VAR_ACCELERATION))
            except Exception:
                continue
            tracked.add(veh_id)
        self._tracked_vehicle_ids = tracked

    def _collect_controlled_vehicle_metrics(self) -> Tuple[int, List[float]]:
        """Count vehicles on corridor edges and collect their hard-brake decelerations.

        Returns:
            ``(controlled_vehicle_count, brake_decels)``. ``brake_decels``
            holds ``|accel|`` for every controlled vehicle with
            ``accel < -d_th``.
        """
        try:
            current_vehicle_ids = set(traci.vehicle.getIDList())
        except Exception:
            return 0, []
        if not current_vehicle_ids:
            self._sync_vehicle_subscriptions(set())
            return 0, []
        self._sync_vehicle_subscriptions(current_vehicle_ids)
        controlled_vehicle_count = 0
        brake_decels: List[float] = []
        if self.use_vehicle_subscriptions:
            for veh_id in current_vehicle_ids:
                try:
                    results = traci.vehicle.getSubscriptionResults(veh_id) or {}
                except Exception:
                    continue
                road_id = results.get(tc.VAR_ROAD_ID)
                if road_id not in self.control_edge_set:
                    continue
                controlled_vehicle_count += 1
                accel = results.get(tc.VAR_ACCELERATION)
                if accel is not None and accel < -self.d_th:
                    brake_decels.append(abs(accel))
            if controlled_vehicle_count > 0:
                return controlled_vehicle_count, brake_decels
        # Direct per-vehicle queries: used when subscriptions are disabled or
        # the subscription pass found no controlled vehicles.
        for veh_id in current_vehicle_ids:
            try:
                road_id = traci.vehicle.getRoadID(veh_id)
            except Exception:
                continue
            if road_id not in self.control_edge_set:
                continue
            controlled_vehicle_count += 1
            try:
                accel = traci.vehicle.getAcceleration(veh_id)
            except Exception:
                continue
            if accel < -self.d_th:
                brake_decels.append(abs(accel))
        return controlled_vehicle_count, brake_decels

    def _collect_interval_metrics(
        self,
        detector_data: Tuple[List[float], List[float], List[int], List[float]],
    ) -> Dict:
        """Return throughput and detector-derived metrics shared by both collectors."""
        edge_speeds, edge_occs, _, valid_speeds = detector_data
        info = {
            # Arrivals in this interval scaled to an hourly rate.
            "throughput": self._interval_arrived * (3600.0 / self.control_interval),
            "arrived_count": self._interval_arrived,
            "departed_count": self._interval_departed,
            "edge_speeds_ms": edge_speeds,
            "edge_occupancies": edge_occs,
            "mean_speed": np.mean(valid_speeds) if valid_speeds else 0.0,
        }
        info["mean_speed_kmh"] = info["mean_speed"] * 3.6
        info["mean_occupancy"] = np.mean(edge_occs) if edge_occs else 0.0
        info["speed_std"] = np.std(valid_speeds) if len(valid_speeds) > 1 else 0.0
        return info

    @staticmethod
    def _read_sim_time() -> float:
        """Return the current simulation time, or 0.0 if TraCI is unavailable."""
        try:
            return traci.simulation.getTime()
        except Exception:
            return 0.0

    def _collect_runtime_metrics(
        self,
        detector_data: Tuple[List[float], List[float], List[int], List[float]],
    ) -> Dict:
        """Return the per-step metrics dict used by step() and the reward.

        Vehicle counts, density and hard brakes are restricted to vehicles on
        the physical corridor edges.
        """
        info = self._collect_interval_metrics(detector_data)
        controlled_vehicle_count, brake_decels = self._collect_controlled_vehicle_metrics()
        info["num_vehicles"] = controlled_vehicle_count
        info["density"] = (
            controlled_vehicle_count / self.controlled_length_km
            if self.controlled_length_km > 0
            else 0.0
        )
        info["brake_decels"] = brake_decels
        info["num_hard_brakes"] = len(brake_decels)
        info["sim_time"] = self._read_sim_time()
        return info

    def _collect_metrics(
        self,
        detector_data: Tuple[List[float], List[float], List[int], List[float]],
    ) -> Dict:
        """Network-wide variant of ``_collect_runtime_metrics``.

        Not called from within this class; it is kept for compatibility.
        NOTE(review): ``num_vehicles`` counts every vehicle in the network,
        but density divides by the controlled corridor length. Confirm this
        is intended before relying on it.
        """
        info = self._collect_interval_metrics(detector_data)
        try:
            info["num_vehicles"] = traci.vehicle.getIDCount()
        except Exception:
            info["num_vehicles"] = 0
        info["density"] = (
            info["num_vehicles"] / self.controlled_length_km
            if self.controlled_length_km > 0
            else 0.0
        )
        brake_decels = []
        try:
            for veh_id in traci.vehicle.getIDList():
                accel = traci.vehicle.getAcceleration(veh_id)
                if accel < -self.d_th:
                    brake_decels.append(abs(accel))
        except Exception:
            pass
        info["brake_decels"] = brake_decels
        info["num_hard_brakes"] = len(brake_decels)
        info["sim_time"] = self._read_sim_time()
        return info

    def _calculate_reward(self, info: Dict) -> float:
        """Combine flow, speed-variance, braking and VSL-change terms into one reward.

        ``reward = 10 * (w1*r_flow + w2*r_var + w3(rho)*r_brake + w4*r_penalty)``.
        ``w3`` moves sigmoidally from ``w_base`` to ``w_max`` as density
        exceeds ``rho_critical``. The component terms are written back into
        ``info``.
        """
        r_flow = info["throughput"] / self.C_max
        r_var = -info["speed_std"] / self.v_limit
        rho_t = info["density"]
        w3 = self.w_base + (self.w_max - self.w_base) / (
            1 + np.exp(-self.k_sigmoid * (rho_t - self.rho_critical))
        )
        brake_decels = info["brake_decels"]
        total_vehicles = max(info["num_vehicles"], 1)
        if brake_decels:
            # d_max > d_th is enforced in __init__, so the span is positive.
            brake_span = self.d_max - self.d_th
            sum_brake_penalty = sum(
                max(0, (d - self.d_th) / brake_span) for d in brake_decels
            )
            brake_penalty = sum_brake_penalty / total_vehicles
        else:
            brake_penalty = 0.0
        r_brake = -brake_penalty
        vsl_change = np.abs(self.current_edge_speeds - self._prev_edge_speeds)
        # np.max raises on an empty array (no control segments).
        max_vsl_change = np.max(vsl_change) if vsl_change.size else 0.0
        r_penalty = -max_vsl_change / self.delta_vsl_max
        info["r_flow"] = float(r_flow)
        info["r_var"] = float(r_var)
        info["r_brake"] = float(r_brake)
        info["r_penalty"] = float(r_penalty)
        reward = self.w1 * r_flow + self.w2 * r_var + w3 * r_brake + self.w4 * r_penalty
        return float(reward * 10.0)