# ctm-dqn/sumo_vsl_environment.py
# (470 lines, 18 KiB, Python)
#
# Note carried over from the source viewer: this file contains Unicode
# characters that might be confused with other characters.

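"""
Variable speed limit (VSL) reinforcement-learning environment based on SUMO + TraCI (optimized version).

Optimizations:
1. State compression: each zone is aggregated to [mean_speed, mean_occ, flow], plus the zone speed limits and time features.
2. Reward function: multi-objective (throughput + speed uniformity + speed-limit smoothness penalty).
3. Action space: MultiDiscrete (each zone picks its speed independently), so it no longer grows exponentially.
4. Batched TraCI access: fewer individual calls.
5. Uses a slimmed-down route file.
"""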
import os
import sys
import numpy as np
from typing import Tuple, Dict, List, Optional
from collections import defaultdict
# Make sure traci/sumolib can be imported: when a ``sumo`` Python package is
# installed, put its ``SUMO_HOME/tools`` directory at the front of sys.path.
try:
    import sumo as _sumo_pkg
except ImportError:
    # No ``sumo`` package available; rely on traci already being importable.
    pass
else:
    _tools = os.path.join(_sumo_pkg.SUMO_HOME, "tools")
    if _tools not in sys.path:
        sys.path.insert(0, _tools)
import traci
from sumo_network_parser import SUMONetworkParser
class SUMOVSLEnvironment:
    """Variable speed limit (VSL) RL environment driven by SUMO through TraCI.

    One RL step applies a speed limit to every edge of each control zone,
    advances SUMO by ``steps_per_action`` simulation steps and returns a
    compact observation together with a multi-objective reward.

    Observation layout (length ``state_dim``):
        * per zone: normalised mean speed, mean occupancy, normalised flow
        * per zone: current speed limit / ``free_flow_speed``
        * time progress, ``sin(2*pi*progress)``, ``cos(2*pi*progress)``
        * last reward / 10

    Action: one integer per zone, each indexing ``speed_actions_kmh``.
    """

    # Vehicle count per zone at which the normalised flow feature saturates.
    _MAX_ZONE_FLOW = 200.0

    def __init__(self, config: dict):
        """Read the configuration, parse the network and build zone mappings.

        Args:
            config: dict with a ``"sumo"`` section (file paths, step length,
                begin/end time, optional ``gui`` / ``no_warnings``) and an
                ``"environment"`` section (``control_interval``,
                ``speed_actions_kmh``, ``free_flow_speed``, ``zone_edges``
                and an optional ``reward`` sub-dict).

        Raises:
            KeyError: if a required configuration key is missing.
            ValueError: if a value that is later used as a divisor is not
                positive, or ``d_max`` is not greater than ``d_threshold``.
        """
        sumo_cfg = config["sumo"]
        env_cfg = config["environment"]

        # SUMO parameters
        self.net_file = sumo_cfg["net_file"]
        self.route_file = sumo_cfg["route_file"]
        self.detector_add_file = sumo_cfg["detector_add_file"]
        self.enex_add_file = sumo_cfg["enex_add_file"]
        self.step_length = sumo_cfg["step_length"]
        self.begin_time = sumo_cfg["begin_time"]
        self.end_time = sumo_cfg["end_time"]
        self.use_gui = sumo_cfg.get("gui", False)
        self.no_warnings = sumo_cfg.get("no_warnings", True)

        # Environment parameters
        self.control_interval = env_cfg["control_interval"]  # seconds
        if self.step_length <= 0 or self.control_interval <= 0:
            raise ValueError("step_length and control_interval must be positive")
        # Round instead of truncating: float ratios such as 0.3 / 0.1 evaluate
        # to 2.999..., which int() would silently turn into one step too few.
        self.steps_per_action = max(
            1, self._whole_steps(self.control_interval, self.step_length)
        )
        self.episode_length = self._whole_steps(
            self.end_time - self.begin_time, self.control_interval
        )

        # Selectable speed limits
        self.speed_actions_kmh = np.array(env_cfg["speed_actions_kmh"], dtype=float)
        self.speed_actions_ms = self.speed_actions_kmh / 3.6
        self.num_speed_actions = len(self.speed_actions_kmh)
        # NOTE(review): compared against m/s values everywhere below, so this
        # is presumably expressed in m/s -- confirm against the config.
        self.free_flow_speed = env_cfg["free_flow_speed"]
        if self.free_flow_speed <= 0:
            raise ValueError("free_flow_speed must be positive")

        # Zone partition
        self.zone_edges: List[List[str]] = env_cfg["zone_edges"]
        self.num_control_zones = len(self.zone_edges)

        # Reward parameters
        self.reward_cfg = env_cfg.get("reward", {})
        self.w1 = self.reward_cfg.get("w_flow", 0.4)
        self.w2 = self.reward_cfg.get("w_var", 0.3)
        self.w_base = self.reward_cfg.get("w_brake_base", 0.1)
        self.w_max = self.reward_cfg.get("w_brake_max", 0.5)
        self.w4 = self.reward_cfg.get("w_penalty", 0.2)
        self.rho_critical = self.reward_cfg.get("rho_critical", 35.0)
        self.k_sigmoid = self.reward_cfg.get("k_sigmoid", 0.2)
        self.d_th = self.reward_cfg.get("d_threshold", 3.0)
        self.d_max = self.reward_cfg.get("d_max", 8.0)
        self.C_max = self.reward_cfg.get("C_max", 6000.0)
        self.v_limit = self.reward_cfg.get("v_limit", 33.33)
        self.delta_vsl_max = self.reward_cfg.get("delta_vsl_max", 60.0 / 3.6)
        # These values are divisors in the reward; reject bad ones up front
        # instead of failing (or silently misbehaving) in the middle of a run.
        if self.d_max <= self.d_th:
            raise ValueError("reward.d_max must be greater than reward.d_threshold")
        for key, value in (
            ("C_max", self.C_max),
            ("v_limit", self.v_limit),
            ("delta_vsl_max", self.delta_vsl_max),
        ):
            if value <= 0:
                raise ValueError(f"reward.{key} must be positive, got {value}")

        # Parse the network topology
        self.parser = SUMONetworkParser(
            detector_add_file=self.detector_add_file,
            net_file=self.net_file,
        )

        # zone index -> edges, and edge -> zone index
        self.all_zone_edges = list(self.zone_edges)
        self.zone_of_edge: Dict[str, int] = {
            edge_id: zi
            for zi, edges in enumerate(self.zone_edges)
            for edge_id in edges
        }

        # zone index -> detector IDs (edges unknown to the parser are skipped)
        self.zone_detector_ids: List[List[str]] = []
        for edges in self.zone_edges:
            det_ids: List[str] = []
            for edge_id in edges:
                edge = self.parser.edge_info.get(edge_id)
                if edge:
                    det_ids.extend(edge.detectors.values())
            self.zone_detector_ids.append(det_ids)

        # Flat list of every detector (used for metrics)
        self.all_detector_ids = [
            det_id for dets in self.zone_detector_ids for det_id in dets
        ]

        # Total length of all zone edges known to the parser, in km. It is
        # constant for the lifetime of the environment, so compute it once.
        self._zone_length_km = sum(
            self.parser.edge_info[edge_id].length
            for edges in self.zone_edges
            for edge_id in edges
            if edge_id in self.parser.edge_info
        ) / 1000.0

        # Action space: MultiDiscrete, one head of num_speed_actions per zone.
        self.action_dims = [self.num_speed_actions] * self.num_control_zones

        # State size: 3 features + 1 speed limit per zone, 3 time features,
        # 1 last-reward feature.
        self.features_per_zone = 3
        self._state_dim = (self.features_per_zone + 1) * self.num_control_zones + 3 + 1

        # Runtime state
        self.current_step = 0
        self._sumo_running = False
        self._episode_count = 0
        self.current_zone_speeds = np.full(self.num_control_zones, self.free_flow_speed)
        self._prev_zone_speeds = np.full(self.num_control_zones, self.free_flow_speed)
        self._last_reward = 0.0
        self._interval_arrived = 0
        self._interval_departed = 0
        self._detector_lane_ids: Dict[str, str] = {}
        self.episode_metrics: List[Dict] = []

        print(f"SUMO VSL Environment (优化版) 初始化完成:")
        print(f" Edges: {self.parser.num_edges}, Zones: {self.num_control_zones}")
        print(f" Action: MultiDiscrete {self.action_dims} (每zone {self.num_speed_actions} 档)")
        print(f" State dim: {self._state_dim}")
        print(f" Episode length: {self.episode_length} steps ({self.control_interval}s each)")
        print(f" SUMO steps per action: {self.steps_per_action}")
        print(f" 奖励权重: flow={self.w1}, var={self.w2}, brake={self.w_base}-{self.w_max}, penalty={self.w4}")
        det_count = sum(len(d) for d in self.zone_detector_ids)
        print(f" 检测器总数: {det_count} (分布在 {self.num_control_zones} 个zone)")

    @staticmethod
    def _whole_steps(duration: float, step: float) -> int:
        """Return ``duration / step`` rounded to the nearest whole step count."""
        return int(round(duration / step))

    @property
    def state_dim(self) -> int:
        """Length of the observation vector."""
        return self._state_dim

    @property
    def action_dim(self) -> int:
        """Legacy interface: number of selectable speeds per zone."""
        return self.num_speed_actions

    # ==================== SUMO process management ====================
    def _start_sumo(self, seed: Optional[int] = None):
        """Start a SUMO simulation, closing a still-running one first.

        Args:
            seed: optional random seed forwarded to SUMO via ``--seed``.
        """
        if self._sumo_running:
            self._close_sumo()

        binary_name = "sumo-gui" if self.use_gui else "sumo"
        try:
            import sumolib
            sumo_binary = sumolib.checkBinary(binary_name)
        except Exception:
            # Fall back to the bare executable name when sumolib cannot
            # provide the binary.
            sumo_binary = binary_name

        cmd = [
            sumo_binary,
            "-n", self.net_file,
            "-r", self.route_file,
            "-a", f"{self.detector_add_file},{self.enex_add_file}",
            "--step-length", str(self.step_length),
            "-b", str(self.begin_time),
            "-e", str(self.end_time),
            "--collision.action", "warn",
            "--quit-on-end", "true",
        ]
        if self.no_warnings:
            cmd += ["--no-warnings", "true"]
        if seed is not None:
            cmd += ["--seed", str(seed)]
        if self.use_gui:
            cmd += ["--start", "true", "--gui-settings-file", "sumo_resource/gui.settings.xml"]

        traci.start(cmd, label=f"vsl_{self._episode_count}")
        self._sumo_running = True

    def _close_sumo(self):
        """Close the SUMO simulation if one is running."""
        if not self._sumo_running:
            return
        try:
            traci.close()
        except Exception:
            # Deliberate best effort: a failure while shutting down must not
            # prevent the next episode from starting.
            pass
        finally:
            self._sumo_running = False

    # ==================== RL interface ====================
    def reset(self, seed: Optional[int] = None) -> np.ndarray:
        """Reset the runtime state, start a new SUMO run and return the first state.

        Args:
            seed: optional random seed forwarded to SUMO.
        """
        self._episode_count += 1
        self.current_step = 0
        self.episode_metrics = []
        self.current_zone_speeds = np.full(self.num_control_zones, self.free_flow_speed)
        self._prev_zone_speeds = np.full(self.num_control_zones, self.free_flow_speed)
        self._last_reward = 0.0
        self._interval_arrived = 0
        self._interval_departed = 0
        self._start_sumo(seed=seed)
        return self._collect_state()

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, Dict]:
        """Run one RL interaction step.

        Args:
            action: shape ``(num_control_zones,)``, each value in
                ``[0, num_speed_actions)``.

        Returns:
            ``(state, reward, done, info)``. The simulation is closed once
            ``done`` is True.

        Raises:
            RuntimeError: if no simulation is running (``reset`` not called,
                or the episode has already finished).
            ValueError / IndexError: for a malformed action, see
                ``_decode_action``.
        """
        if not self._sumo_running:
            raise RuntimeError("step() requires a running simulation; call reset() first")

        # 1. Decode first so an invalid action leaves the stored limits intact,
        #    then remember the previous limits and apply the new ones.
        zone_speeds = self._decode_action(action)
        self._prev_zone_speeds = self.current_zone_speeds.copy()
        self.current_zone_speeds = zone_speeds
        self._apply_vsl(zone_speeds)

        # 2. Advance SUMO for one control interval.
        arrived = 0
        departed = 0
        for _ in range(self.steps_per_action):
            traci.simulationStep()
            arrived += traci.simulation.getArrivedNumber()
            departed += traci.simulation.getDepartedNumber()
        self._interval_arrived = arrived
        self._interval_departed = departed

        # 3. Read every detector once and share the readings between metrics
        #    and state.
        zone_readings = self._read_zone_detectors()

        # 4. Metrics and reward.
        info = self._collect_metrics(zone_readings)
        reward = self._calculate_reward(info)
        self._last_reward = reward

        # 5. Termination check.
        self.current_step += 1
        done = self.current_step >= self.episode_length

        # 6. Build the state only now, so that its time features and its
        #    last-reward feature describe the step that has just finished.
        state = self._collect_state(zone_readings)

        # 7. Bookkeeping.
        info["reward"] = reward
        info["step"] = self.current_step
        info["zone_speeds_kmh"] = (zone_speeds * 3.6).tolist()
        self.episode_metrics.append(info)

        if done:
            self._close_sumo()
        return state, reward, done, info

    def close(self):
        """Close the environment."""
        self._close_sumo()

    # ==================== Action decoding ====================
    def _decode_action(self, action: np.ndarray) -> np.ndarray:
        """Decode a MultiDiscrete action into per-zone speed limits (m/s).

        Args:
            action: iterable of ``num_control_zones`` integer indices into
                ``speed_actions_ms``.

        Raises:
            ValueError: if the number of entries differs from
                ``num_control_zones``.
            IndexError: if an index is outside ``[0, num_speed_actions)``;
                negative values are rejected rather than wrapping around.
        """
        indices = [int(a) for a in action]
        if len(indices) != self.num_control_zones:
            raise ValueError(
                f"action must have {self.num_control_zones} entries, got {len(indices)}"
            )
        for zi, idx in enumerate(indices):
            if not 0 <= idx < self.num_speed_actions:
                raise IndexError(
                    f"action[{zi}]={idx} is outside [0, {self.num_speed_actions})"
                )
        return np.array([self.speed_actions_ms[idx] for idx in indices], dtype=float)

    def _apply_vsl(self, zone_speeds: np.ndarray):
        """Apply the per-zone speed limits to every edge of each zone via TraCI."""
        for zi, speed in enumerate(zone_speeds):
            for edge_id in self.all_zone_edges[zi]:
                traci.edge.setMaxSpeed(edge_id, float(speed))

    # ==================== Detector access ====================
    def _detector_lane(self, det_id: str) -> str:
        """Return the lane ID of an induction loop, asking TraCI only once per detector."""
        lane_id = self._detector_lane_ids.get(det_id)
        if lane_id is None:
            lane_id = traci.inductionloop.getLaneID(det_id)
            self._detector_lane_ids[det_id] = lane_id
        return lane_id

    def _read_zone_detectors(self) -> List[Tuple[List[float], List[float], int]]:
        """Read the last-interval values of every induction loop, grouped by zone.

        Returns:
            One ``(speeds, occupancies, vehicle_count)`` tuple per zone.
            A detector whose mean speed is not positive contributes the
            current max speed of its lane instead. Detectors whose TraCI
            query fails are left out.
        """
        readings = []
        for det_ids in self.zone_detector_ids:
            speeds: List[float] = []
            occupancies: List[float] = []
            count = 0
            for det_id in det_ids:
                try:
                    spd = traci.inductionloop.getLastIntervalMeanSpeed(det_id)
                    occ = traci.inductionloop.getLastIntervalOccupancy(det_id)
                    cnt = traci.inductionloop.getLastIntervalVehicleNumber(det_id)
                    if spd <= 0:
                        spd = traci.lane.getMaxSpeed(self._detector_lane(det_id))
                except (traci.TraCIException, traci.FatalTraCIError):
                    # Best effort: skip detectors TraCI cannot answer for, but
                    # let programming errors surface.
                    continue
                speeds.append(spd)
                occupancies.append(occ)
                count += cnt
            readings.append((speeds, occupancies, count))
        return readings

    # ==================== State collection (compressed) ====================
    def _collect_state(self, zone_readings=None) -> np.ndarray:
        """Build the compressed observation vector.

        Layout:
            for each zone:
                mean speed of its detectors / free_flow_speed, clipped to [0, 1.5]
                    (1.0 when the zone has no readings)
                mean occupancy / 100, clipped to [0, 1]
                vehicle count / 200, capped at 1
            for each zone:
                current speed limit / free_flow_speed
            time_progress (current step / episode length)
            sin(2*pi*time_progress)
            cos(2*pi*time_progress)
            last reward / 10

        Args:
            zone_readings: optional result of ``_read_zone_detectors`` to
                reuse; the detectors are read when omitted.
        """
        if zone_readings is None:
            zone_readings = self._read_zone_detectors()

        state_parts = []
        for speeds, occupancies, flow in zone_readings:
            mean_speed_norm = (np.mean(speeds) / self.free_flow_speed) if speeds else 1.0
            mean_occ = (np.mean(occupancies) / 100.0) if occupancies else 0.0
            flow_norm = min(flow / self._MAX_ZONE_FLOW, 1.0)
            state_parts.extend([
                np.clip(mean_speed_norm, 0.0, 1.5),
                np.clip(mean_occ, 0.0, 1.0),
                flow_norm,
            ])

        # Zone speed limits
        for zi in range(self.num_control_zones):
            state_parts.append(self.current_zone_speeds[zi] / self.free_flow_speed)

        # Time features
        time_progress = self.current_step / max(self.episode_length, 1)
        state_parts.append(time_progress)
        state_parts.append(np.sin(2 * np.pi * time_progress))
        state_parts.append(np.cos(2 * np.pi * time_progress))

        # Last reward
        state_parts.append(self._last_reward / 10.0)
        return np.array(state_parts, dtype=np.float32)

    # ==================== Metric collection ====================
    def _collect_metrics(self, zone_readings=None) -> Dict:
        """Collect traffic metrics for the control interval that just ended.

        Args:
            zone_readings: optional result of ``_read_zone_detectors`` to
                reuse; the detectors are read when omitted.

        Returns:
            Dict with throughput, arrival/departure counts, per-zone and
            overall speed and occupancy statistics, vehicle count, density,
            hard-braking decelerations and the simulation time.
        """
        if zone_readings is None:
            zone_readings = self._read_zone_detectors()
        traci_errors = (traci.TraCIException, traci.FatalTraCIError) if False else None

        info: Dict = {}
        # Throughput, scaled from one control interval to vehicles per hour
        info["throughput"] = self._interval_arrived * (3600.0 / self.control_interval)
        info["arrived_count"] = self._interval_arrived
        info["departed_count"] = self._interval_departed

        # Per-zone speed and occupancy (used by the reward)
        zone_speeds = []
        zone_occs = []
        all_speeds = []
        for speeds, occs, _count in zone_readings:
            zone_speeds.append(np.mean(speeds) if speeds else self.free_flow_speed)
            zone_occs.append(np.mean(occs) if occs else 0.0)
            all_speeds.extend(speeds)
        info["zone_speeds_ms"] = zone_speeds
        info["zone_occupancies"] = zone_occs
        info["mean_speed"] = np.mean(all_speeds) if all_speeds else 0.0
        info["mean_speed_kmh"] = info["mean_speed"] * 3.6
        info["mean_occupancy"] = np.mean(zone_occs) if zone_occs else 0.0
        # Speed standard deviation (used by the uniformity term)
        info["speed_std"] = np.std(all_speeds) if len(all_speeds) > 1 else 0.0

        # Vehicles currently reported by TraCI
        try:
            info["num_vehicles"] = traci.vehicle.getIDCount()
        except (traci.TraCIException, traci.FatalTraCIError):
            info["num_vehicles"] = 0

        # Density (vehicles per km of zone edges)
        info["density"] = (
            info["num_vehicles"] / self._zone_length_km
            if self._zone_length_km > 0 else 0.0
        )

        # Hard-braking events: decelerations stronger than d_th. A TraCI
        # failure ends the scan and keeps what was collected so far.
        brake_decels = []
        try:
            for veh_id in traci.vehicle.getIDList():
                accel = traci.vehicle.getAcceleration(veh_id)
                if accel < -self.d_th:
                    brake_decels.append(abs(accel))
        except (traci.TraCIException, traci.FatalTraCIError):
            pass
        info["brake_decels"] = brake_decels
        info["num_hard_brakes"] = len(brake_decels)

        try:
            info["sim_time"] = traci.simulation.getTime()
        except (traci.TraCIException, traci.FatalTraCIError):
            info["sim_time"] = 0.0
        return info

    # ==================== Reward function (multi-objective) ====================
    def _calculate_reward(self, info: Dict) -> float:
        """Combine the four reward terms into one scalar.

        ``R = 10 * (w1*R_flow + w2*R_var + w3(rho)*R_brake + w4*R_penalty)``

        * R_flow: throughput / C_max
        * R_var: -(speed standard deviation) / v_limit
        * R_brake: -(sum of normalised hard-brake excesses) / vehicle count,
          weighted by w3, a sigmoid of the density that rises from w_base
          towards w_max around rho_critical
        * R_penalty: -(largest per-zone speed-limit change) / delta_vsl_max

        Args:
            info: metrics dict providing ``throughput``, ``speed_std``,
                ``density``, ``brake_decels`` and ``num_vehicles``.
        """
        # 1. Traffic efficiency
        R_flow = info["throughput"] / self.C_max

        # 2. Speed stability
        R_var = -info["speed_std"] / self.v_limit

        # 3. Hard braking, with a density-adaptive weight
        rho_t = info["density"]
        w3 = self.w_base + (self.w_max - self.w_base) / (
            1 + np.exp(-self.k_sigmoid * (rho_t - self.rho_critical))
        )
        brake_decels = info["brake_decels"]
        total_vehicles = max(info["num_vehicles"], 1)  # avoid division by zero
        if brake_decels:
            # Sum the individual penalties first, then divide by the number
            # of vehicles.
            brake_span = self.d_max - self.d_th
            sum_brake_penalty = sum(
                max(0, (d - self.d_th) / brake_span) for d in brake_decels
            )
            brake_penalty = sum_brake_penalty / total_vehicles
        else:
            brake_penalty = 0.0
        R_brake = -brake_penalty

        # 4. Control smoothness: largest change of any zone's limit
        vsl_change = np.abs(self.current_zone_speeds - self._prev_zone_speeds)
        # np.max raises on an empty array, so treat "no zones" as "no change".
        max_vsl_change = np.max(vsl_change) if vsl_change.size else 0.0
        R_penalty = -max_vsl_change / self.delta_vsl_max

        reward = self.w1 * R_flow + self.w2 * R_var + w3 * R_brake + self.w4 * R_penalty
        return float(reward * 10.0)