""" 基于边(Edge)的VSL强化学习环境 每条边独立控制限速,状态为每条边的平均速度/占有率/流量 """ import os import sys import numpy as np from typing import Tuple, Dict, List, Optional try: import sumo as _sumo_pkg _tools = os.path.join(_sumo_pkg.SUMO_HOME, "tools") if _tools not in sys.path: sys.path.insert(0, _tools) except ImportError: pass import traci from sumo_network_parser import SUMONetworkParser class SUMOEdgeVSLEnvironment: """基于边的VSL环境""" def __init__(self, config: dict): sumo_cfg = config["sumo"] env_cfg = config["environment"] # SUMO参数 self.net_file = sumo_cfg["net_file"] self.route_file = sumo_cfg["route_file"] self.detector_add_file = sumo_cfg["detector_add_file"] self.enex_add_file = sumo_cfg["enex_add_file"] self.step_length = sumo_cfg["step_length"] self.begin_time = sumo_cfg["begin_time"] self.end_time = sumo_cfg["end_time"] self.use_gui = sumo_cfg.get("gui", False) self.no_warnings = sumo_cfg.get("no_warnings", True) # 环境参数 self.control_interval = env_cfg["control_interval"] self.steps_per_action = int(self.control_interval / self.step_length) self.warmup_time = 900 # 跳过前15分钟 self.episode_length = int((self.end_time - self.begin_time - self.warmup_time) / self.control_interval) # 速度选项 self.speed_actions_kmh = np.array(env_cfg["speed_actions_kmh"], dtype=float) self.speed_actions_ms = self.speed_actions_kmh / 3.6 self.num_speed_actions = len(self.speed_actions_kmh) self.free_flow_speed = env_cfg["free_flow_speed"] # 控制边列表 self.control_edges: List[str] = env_cfg["control_edges"] self.num_edges = len(self.control_edges) # 奖励参数 self.reward_cfg = env_cfg.get("reward", {}) self.w1 = self.reward_cfg.get("w_flow", 0.4) self.w2 = self.reward_cfg.get("w_var", 0.3) self.w_base = self.reward_cfg.get("w_brake_base", 0.1) self.w_max = self.reward_cfg.get("w_brake_max", 0.5) self.w4 = self.reward_cfg.get("w_penalty", 0.2) self.rho_critical = self.reward_cfg.get("rho_critical", 35.0) self.k_sigmoid = self.reward_cfg.get("k_sigmoid", 0.2) self.d_th = self.reward_cfg.get("d_threshold", 3.0) self.d_max = self.reward_cfg.get("d_max", 8.0) self.C_max = self.reward_cfg.get("C_max", 6000.0) self.v_limit = self.reward_cfg.get("v_limit", 33.33) self.delta_vsl_max = self.reward_cfg.get("delta_vsl_max", 60.0 / 3.6) # 解析网络 self.parser = SUMONetworkParser( detector_add_file=self.detector_add_file, net_file=self.net_file, ) # 为每条边构建第一组检测器映射(所有车道) self.edge_detector_map: Dict[str, List[str]] = {} for edge_id in self.control_edges: ei = self.parser.edge_info.get(edge_id) if ei and ei.detectors: # 获取第一组检测器(pos_index=0)的所有交通车道 first_group_dets = [] for lane_idx in ei.traffic_lane_indices: det_id = ei.detectors.get((lane_idx, 0)) if det_id: first_group_dets.append(det_id) self.edge_detector_map[edge_id] = first_group_dets # 动作空间: 每条边独立选速度 self.action_dims = [self.num_speed_actions] * self.num_edges # 状态维度: 每条边3个特征 + 每条边限速 + 时间特征3 + 上一步奖励1 self.features_per_edge = 3 self._state_dim = (self.features_per_edge + 1) * self.num_edges + 3 + 1 # 运行时状态 self.current_step = 0 self._sumo_running = False self._episode_count = 0 self.current_edge_speeds = np.full(self.num_edges, self.free_flow_speed) self._prev_edge_speeds = np.full(self.num_edges, self.free_flow_speed) self._last_reward = 0.0 self.episode_metrics: List[Dict] = [] print(f"SUMO Edge VSL Environment 初始化完成:") print(f" 控制边数: {self.num_edges}") print(f" Action: MultiDiscrete {self.action_dims}") print(f" State dim: {self._state_dim}") print(f" Episode length: {self.episode_length} steps") @property def state_dim(self) -> int: return self._state_dim @property def action_dim(self) -> int: return self.num_speed_actions def _start_sumo(self, seed: Optional[int] = None): if self._sumo_running: self._close_sumo() binary_name = "sumo-gui" if self.use_gui else "sumo" try: import sumolib sumo_binary = sumolib.checkBinary(binary_name) except Exception: sumo_binary = binary_name cmd = [ sumo_binary, "-n", self.net_file, "-r", self.route_file, "-a", f"{self.detector_add_file},{self.enex_add_file}", "--step-length", str(self.step_length), "-b", str(self.begin_time), "-e", str(self.end_time), "--collision.action", "warn", "--quit-on-end", "true", ] if self.no_warnings: cmd += ["--no-warnings", "true"] if seed is not None: cmd += ["--seed", str(seed)] if self.use_gui: cmd += ["--start", "true", "--gui-settings-file", "sumo_resource/gui.settings.xml"] traci.start(cmd, label=f"vsl_{self._episode_count}") self._sumo_running = True def _close_sumo(self): if self._sumo_running: try: traci.close() except Exception: pass self._sumo_running = False def reset(self, seed: Optional[int] = None) -> np.ndarray: self._episode_count += 1 self.current_step = 0 self.episode_metrics = [] self.current_edge_speeds = np.full(self.num_edges, self.free_flow_speed) self._prev_edge_speeds = np.full(self.num_edges, self.free_flow_speed) self._last_reward = 0.0 self._start_sumo(seed=seed) # 跳过前15分钟(900秒) warmup_steps = int(900 / self.control_interval) for _ in range(warmup_steps): for _ in range(self.steps_per_action): traci.simulationStep() return self._collect_state() def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, Dict]: self._prev_edge_speeds = self.current_edge_speeds.copy() edge_speeds = self._decode_action(action) self.current_edge_speeds = edge_speeds self._apply_vsl(edge_speeds) self._interval_arrived = 0 self._interval_departed = 0 for _ in range(self.steps_per_action): traci.simulationStep() self._interval_arrived += traci.simulation.getArrivedNumber() self._interval_departed += traci.simulation.getDepartedNumber() # 只读取一次检测器数据 detector_data = self._get_edge_detector_data() state = self._collect_state(detector_data) info = self._collect_metrics(detector_data) reward = self._calculate_reward(info) self._last_reward = reward self.current_step += 1 done = self.current_step >= self.episode_length info["reward"] = reward info["step"] = self.current_step info["edge_speeds_kmh"] = (edge_speeds * 3.6).tolist() self.episode_metrics.append(info) if done: self._close_sumo() return state, reward, done, info def close(self): self._close_sumo() def _decode_action(self, action: np.ndarray) -> np.ndarray: return np.array([self.speed_actions_ms[int(a)] for a in action]) def _apply_vsl(self, edge_speeds: np.ndarray): for i, edge_id in enumerate(self.control_edges): traci.edge.setMaxSpeed(edge_id, float(edge_speeds[i])) def _get_edge_detector_data(self) -> Tuple[List[float], List[float], List[int], List[float]]: """获取所有控制边的检测器数据(速度、占有率、车辆数、有效速度)""" speeds, occs, counts, valid_speeds = [], [], [], [] for edge_id in self.control_edges: det_ids = self.edge_detector_map.get(edge_id, []) if not det_ids: speeds.append(self.free_flow_speed) occs.append(0.0) counts.append(0) continue # 获取所有车道的数据 lane_speeds, lane_occs, lane_counts = [], [], [] for det_id in det_ids: try: spd = traci.inductionloop.getLastIntervalMeanSpeed(det_id) occ = traci.inductionloop.getLastIntervalOccupancy(det_id) cnt = traci.inductionloop.getLastIntervalVehicleNumber(det_id) if spd > 0: lane_speeds.append(spd) lane_occs.append(occ) lane_counts.append(cnt) except Exception: pass # 聚合 speeds.append(np.mean(lane_speeds) if lane_speeds else self.free_flow_speed) occs.append(np.mean(lane_occs) if lane_occs else 0.0) counts.append(sum(lane_counts)) if lane_speeds: valid_speeds.append(np.mean(lane_speeds)) return speeds, occs, counts, valid_speeds def _collect_state(self, detector_data: Optional[Tuple[List[float], List[float], List[int], List[float]]] = None) -> np.ndarray: """收集状态:每条边的平均速度/占有率/流量""" state_parts = [] if detector_data is None: speeds, occs, counts, _ = self._get_edge_detector_data() else: speeds, occs, counts, _ = detector_data for spd, occ, cnt in zip(speeds, occs, counts): mean_speed_norm = np.clip(spd / self.free_flow_speed, 0.0, 1.5) mean_occ = np.clip(occ / 100.0, 0.0, 1.0) flow_norm = min(cnt / 50.0, 1.0) state_parts.extend([mean_speed_norm, mean_occ, flow_norm]) # 边限速 for i in range(self.num_edges): state_parts.append(self.current_edge_speeds[i] / self.free_flow_speed) # 时间特征 time_progress = self.current_step / max(self.episode_length, 1) state_parts.append(time_progress) state_parts.append(np.sin(2 * np.pi * time_progress)) state_parts.append(np.cos(2 * np.pi * time_progress)) # 上一步奖励 state_parts.append(self._last_reward / 10.0) return np.array(state_parts, dtype=np.float32) def _collect_metrics(self, detector_data: Tuple[List[float], List[float], List[int], List[float]]) -> Dict: """收集交通指标""" info = {} # 吞吐量 throughput = self._interval_arrived * (3600.0 / self.control_interval) info["throughput"] = throughput info["arrived_count"] = self._interval_arrived info["departed_count"] = self._interval_departed # 每条边的速度和占有率 edge_speeds, edge_occs, _, valid_speeds = detector_data info["edge_speeds_ms"] = edge_speeds info["edge_occupancies"] = edge_occs info["mean_speed"] = np.mean(valid_speeds) if valid_speeds else 0.0 info["mean_speed_kmh"] = info["mean_speed"] * 3.6 info["mean_occupancy"] = np.mean(edge_occs) if edge_occs else 0.0 info["speed_std"] = np.std(valid_speeds) if len(valid_speeds) > 1 else 0.0 # 车辆数和密度 try: info["num_vehicles"] = traci.vehicle.getIDCount() except Exception: info["num_vehicles"] = 0 total_length_km = sum(self.parser.edge_info[e].length for e in self.control_edges if e in self.parser.edge_info) / 1000.0 info["density"] = info["num_vehicles"] / total_length_km if total_length_km > 0 else 0.0 # 急刹车 brake_decels = [] try: for veh_id in traci.vehicle.getIDList(): accel = traci.vehicle.getAcceleration(veh_id) if accel < -self.d_th: brake_decels.append(abs(accel)) except Exception: pass info["brake_decels"] = brake_decels info["num_hard_brakes"] = len(brake_decels) try: info["sim_time"] = traci.simulation.getTime() except Exception: info["sim_time"] = 0.0 return info def _calculate_reward(self, info: Dict) -> float: """计算奖励""" # 通行效率 q_t = info["throughput"] R_flow = q_t / self.C_max # 速度稳定性 speed_std = info["speed_std"] R_var = -speed_std / self.v_limit # 急刹车惩罚(密度自适应) rho_t = info["density"] w3 = self.w_base + (self.w_max - self.w_base) / (1 + np.exp(-self.k_sigmoid * (rho_t - self.rho_critical))) brake_decels = info["brake_decels"] total_vehicles = max(info["num_vehicles"], 1) if brake_decels: sum_brake_penalty = sum([max(0, (d - self.d_th) / (self.d_max - self.d_th)) for d in brake_decels]) brake_penalty = sum_brake_penalty / total_vehicles else: brake_penalty = 0.0 R_brake = -brake_penalty # 控制平滑度 vsl_change = np.abs(self.current_edge_speeds - self._prev_edge_speeds) max_vsl_change = np.max(vsl_change) R_penalty = -max_vsl_change / self.delta_vsl_max reward = self.w1 * R_flow + self.w2 * R_var + w3 * R_brake + self.w4 * R_penalty return float(reward * 10.0)