375 lines
14 KiB
Python
375 lines
14 KiB
Python
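"""
Edge-based variable speed limit (VSL) reinforcement learning environment.

Each edge controls its own speed limit independently; the state consists of
every edge's mean speed, occupancy and flow.
"""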
|
||
|
||
import os
|
||
import sys
|
||
import numpy as np
|
||
from typing import Tuple, Dict, List, Optional
|
||
|
||
# Make the Python tools bundled with the pip-installed ``sumo`` package
# importable by putting its ``tools`` directory at the front of ``sys.path``.
# When that package is not installed, ``sys.path`` is left untouched.
try:
    import sumo as _sumo_pkg

    _tools = os.path.join(_sumo_pkg.SUMO_HOME, "tools")
    # Guard against duplicate entries when this module is imported repeatedly.
    if _tools not in sys.path:
        sys.path.insert(0, _tools)
except ImportError:
    # The ``sumo`` package is optional here; fall back to the existing path.
    pass
|
||
|
||
import traci
|
||
from envs.network_parser import SUMONetworkParser
|
||
|
||
|
||
class SUMOEdgeVSLEnvironment:
    """Edge-based variable speed limit (VSL) environment driven through TraCI.

    Every control edge receives its own speed limit, picked from a discrete
    set of speeds. The observation holds, per edge, the normalized mean
    speed, occupancy and vehicle count, followed by each edge's current
    limit, three time features and the previous reward.
    """

    def __init__(self, config: dict):
        """Read the configuration, parse the network and size the spaces.

        Args:
            config: Mapping with a ``"sumo"`` section (input files, timing,
                GUI/warning flags) and an ``"environment"`` section (control
                interval, speed actions, free-flow speed, control edges and
                an optional ``"reward"`` sub-section).

        Raises:
            KeyError: If a required configuration key is missing.
        """
        sumo_cfg = config["sumo"]
        env_cfg = config["environment"]

        # SUMO parameters
        self.net_file = sumo_cfg["net_file"]
        self.route_file = sumo_cfg["route_file"]
        self.detector_add_file = sumo_cfg["detector_add_file"]
        self.enex_add_file = sumo_cfg["enex_add_file"]
        self.step_length = sumo_cfg["step_length"]
        self.begin_time = sumo_cfg["begin_time"]
        self.end_time = sumo_cfg["end_time"]
        self.use_gui = sumo_cfg.get("gui", False)
        self.no_warnings = sumo_cfg.get("no_warnings", True)

        # Environment parameters
        self.control_interval = env_cfg["control_interval"]
        self.steps_per_action = self._steps_per_interval(
            self.control_interval, self.step_length
        )
        self.warmup_time = 900  # skip the first 15 minutes
        self.episode_length = int(
            (self.end_time - self.begin_time - self.warmup_time)
            / self.control_interval
        )

        # Speed options (configured in km/h, applied in m/s)
        self.speed_actions_kmh = np.array(env_cfg["speed_actions_kmh"], dtype=float)
        self.speed_actions_ms = self.speed_actions_kmh / 3.6
        self.num_speed_actions = len(self.speed_actions_kmh)
        # NOTE(review): used as the divisor for m/s speeds below, so this is
        # presumably in m/s -- confirm against the configuration files.
        self.free_flow_speed = env_cfg["free_flow_speed"]

        # Controlled edges
        self.control_edges: List[str] = env_cfg["control_edges"]
        self.num_edges = len(self.control_edges)

        # Reward parameters
        self.reward_cfg = env_cfg.get("reward", {})
        self.w1 = self.reward_cfg.get("w_flow", 0.4)
        self.w2 = self.reward_cfg.get("w_var", 0.3)
        self.w_base = self.reward_cfg.get("w_brake_base", 0.1)
        self.w_max = self.reward_cfg.get("w_brake_max", 0.5)
        self.w4 = self.reward_cfg.get("w_penalty", 0.2)
        self.rho_critical = self.reward_cfg.get("rho_critical", 35.0)
        self.k_sigmoid = self.reward_cfg.get("k_sigmoid", 0.2)
        self.d_th = self.reward_cfg.get("d_threshold", 3.0)
        self.d_max = self.reward_cfg.get("d_max", 8.0)
        self.C_max = self.reward_cfg.get("C_max", 6000.0)
        self.v_limit = self.reward_cfg.get("v_limit", 33.33)
        self.delta_vsl_max = self.reward_cfg.get("delta_vsl_max", 60.0 / 3.6)

        # Parse the network
        self.parser = SUMONetworkParser(
            detector_add_file=self.detector_add_file,
            net_file=self.net_file,
        )

        # For every edge, map the first detector group (across all lanes).
        self.edge_detector_map: Dict[str, List[str]] = {}
        for edge_id in self.control_edges:
            ei = self.parser.edge_info.get(edge_id)
            if not ei or not ei.detectors:
                continue

            # Use the smallest pos_index that actually exists on this edge, so
            # detectors numbered from metrics_1 are handled as well.
            available_pos_indices = sorted({
                pos_index
                for (lane_idx, pos_index) in ei.detectors
                if lane_idx in ei.traffic_lane_indices
            })
            if not available_pos_indices:
                continue

            first_pos_index = available_pos_indices[0]
            first_group_dets = []
            for lane_idx in ei.traffic_lane_indices:
                det_id = ei.detectors.get((lane_idx, first_pos_index))
                if det_id:
                    first_group_dets.append(det_id)
            self.edge_detector_map[edge_id] = first_group_dets

        # Action space: every edge picks a speed independently.
        self.action_dims = [self.num_speed_actions] * self.num_edges

        # State size: 3 features per edge + 1 limit per edge + 3 time
        # features + 1 previous reward.
        self.features_per_edge = 3
        self._state_dim = (self.features_per_edge + 1) * self.num_edges + 3 + 1

        # Runtime state
        self.current_step = 0
        self._sumo_running = False
        self._episode_count = 0
        self.current_edge_speeds = np.full(self.num_edges, self.free_flow_speed)
        self._prev_edge_speeds = np.full(self.num_edges, self.free_flow_speed)
        self._last_reward = 0.0
        # Defined up front so the metric code never meets a missing attribute.
        self._interval_arrived = 0
        self._interval_departed = 0
        self.episode_metrics: List[Dict] = []

        print("SUMO Edge VSL Environment 初始化完成:")
        print(f" 控制边数: {self.num_edges}")
        print(f" Action: MultiDiscrete {self.action_dims}")
        print(f" State dim: {self._state_dim}")
        print(f" Episode length: {self.episode_length} steps")

    @staticmethod
    def _steps_per_interval(control_interval: float, step_length: float) -> int:
        """Return how many simulation steps make up one control interval.

        The quotient is rounded rather than truncated, because float division
        can land just below the intended integer (``0.3 / 0.1`` is
        ``2.9999999999999996``). The result is at least 1 so that one control
        step always advances the simulation.
        """
        return max(1, int(round(control_interval / step_length)))

    def _warmup_sim_steps(self) -> int:
        """Return the number of simulation steps run before control starts.

        Whole control intervals that fit into ``self.warmup_time`` are
        counted, each lasting ``self.steps_per_action`` simulation steps.
        """
        return int(self.warmup_time / self.control_interval) * self.steps_per_action

    @property
    def state_dim(self) -> int:
        """Length of the observation vector."""
        return self._state_dim

    @property
    def action_dim(self) -> int:
        """Number of selectable speeds per edge."""
        return self.num_speed_actions

    def _start_sumo(self, seed: Optional[int] = None):
        """Launch SUMO through TraCI, closing any running instance first.

        Args:
            seed: Optional value passed to SUMO's ``--seed`` option.
        """
        if self._sumo_running:
            self._close_sumo()

        binary_name = "sumo-gui" if self.use_gui else "sumo"
        try:
            import sumolib
            sumo_binary = sumolib.checkBinary(binary_name)
        except Exception:
            # Best effort: fall back to resolving the bare binary name.
            sumo_binary = binary_name

        cmd = [
            sumo_binary, "-n", self.net_file, "-r", self.route_file,
            "-a", f"{self.detector_add_file},{self.enex_add_file}",
            "--step-length", str(self.step_length),
            "-b", str(self.begin_time), "-e", str(self.end_time),
            "--collision.action", "warn", "--quit-on-end", "true",
        ]
        if self.no_warnings:
            cmd += ["--no-warnings", "true"]
        if seed is not None:
            cmd += ["--seed", str(seed)]
        if self.use_gui:
            cmd += ["--start", "true", "--gui-settings-file", "sumo_resource/gui.settings.xml"]

        traci.start(cmd, label=f"vsl_{self._episode_count}")
        self._sumo_running = True

    def _close_sumo(self):
        """Close the TraCI connection if one is open; errors are ignored."""
        if self._sumo_running:
            try:
                traci.close()
            except Exception:
                # Deliberate best effort: shutdown problems must not mask the
                # caller's own control flow.
                pass
            self._sumo_running = False

    def reset(self, seed: Optional[int] = None) -> np.ndarray:
        """Start a new episode and return the first observation.

        Args:
            seed: Optional value passed to SUMO's ``--seed`` option.

        Returns:
            The state vector collected after the warm-up period.
        """
        self._episode_count += 1
        self.current_step = 0
        self.episode_metrics = []
        self.current_edge_speeds = np.full(self.num_edges, self.free_flow_speed)
        self._prev_edge_speeds = np.full(self.num_edges, self.free_flow_speed)
        self._last_reward = 0.0
        self._interval_arrived = 0
        self._interval_departed = 0
        self._start_sumo(seed=seed)

        # Skip the warm-up period using the same ``warmup_time`` that was
        # used to size the episode, so the two can never disagree.
        for _ in range(self._warmup_sim_steps()):
            traci.simulationStep()

        return self._collect_state()

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, Dict]:
        """Apply one speed-limit decision and simulate one control interval.

        Args:
            action: One index into the speed options per control edge.

        Returns:
            ``(state, reward, done, info)``; ``info`` is also appended to
            ``self.episode_metrics``.
        """
        self._prev_edge_speeds = self.current_edge_speeds.copy()
        edge_speeds = self._decode_action(action)
        self.current_edge_speeds = edge_speeds
        self._apply_vsl(edge_speeds)

        self._interval_arrived = 0
        self._interval_departed = 0
        for _ in range(self.steps_per_action):
            traci.simulationStep()
            self._interval_arrived += traci.simulation.getArrivedNumber()
            self._interval_departed += traci.simulation.getDepartedNumber()

        # Read the detectors only once and share the result.
        detector_data = self._get_edge_detector_data()
        state = self._collect_state(detector_data)
        info = self._collect_metrics(detector_data)
        reward = self._calculate_reward(info)
        self._last_reward = reward

        self.current_step += 1
        done = self.current_step >= self.episode_length

        info["reward"] = reward
        info["step"] = self.current_step
        info["edge_speeds_kmh"] = (edge_speeds * 3.6).tolist()
        self.episode_metrics.append(info)

        if done:
            self._close_sumo()

        return state, reward, done, info

    def close(self):
        """Shut down the SUMO instance, if any."""
        self._close_sumo()

    def _decode_action(self, action: np.ndarray) -> np.ndarray:
        """Translate per-edge action indices into speed limits in m/s."""
        return np.array([self.speed_actions_ms[int(a)] for a in action])

    def _apply_vsl(self, edge_speeds: np.ndarray):
        """Set the maximum speed of each control edge, in list order."""
        for i, edge_id in enumerate(self.control_edges):
            traci.edge.setMaxSpeed(edge_id, float(edge_speeds[i]))

    def _get_edge_detector_data(self) -> Tuple[List[float], List[float], List[int], List[float]]:
        """Read detector data (speed, occupancy, count, valid speed) per edge.

        Returns:
            ``(speeds, occs, counts, valid_speeds)``. The first three lists
            have one entry per control edge; an edge without usable readings
            reports the free-flow speed, zero occupancy and a zero count.
            ``valid_speeds`` only has entries for edges where at least one
            detector reported a positive mean speed.
        """
        speeds, occs, counts, valid_speeds = [], [], [], []

        for edge_id in self.control_edges:
            det_ids = self.edge_detector_map.get(edge_id, [])
            if not det_ids:
                speeds.append(self.free_flow_speed)
                occs.append(0.0)
                counts.append(0)
                continue

            # Gather the readings of every lane detector on this edge.
            lane_speeds, lane_occs, lane_counts = [], [], []
            for det_id in det_ids:
                try:
                    spd = traci.inductionloop.getLastIntervalMeanSpeed(det_id)
                    occ = traci.inductionloop.getLastIntervalOccupancy(det_id)
                    cnt = traci.inductionloop.getLastIntervalVehicleNumber(det_id)
                    # Non-positive speeds are left out of the speed average.
                    if spd > 0:
                        lane_speeds.append(spd)
                    lane_occs.append(occ)
                    lane_counts.append(cnt)
                except Exception:
                    # Deliberate best effort: one unreadable detector must not
                    # abort the whole control step.
                    pass

            # Aggregate over lanes.
            speeds.append(np.mean(lane_speeds) if lane_speeds else self.free_flow_speed)
            occs.append(np.mean(lane_occs) if lane_occs else 0.0)
            counts.append(sum(lane_counts))
            if lane_speeds:
                valid_speeds.append(np.mean(lane_speeds))

        return speeds, occs, counts, valid_speeds

    def _collect_state(self, detector_data: Optional[Tuple[List[float], List[float], List[int], List[float]]] = None) -> np.ndarray:
        """Build the observation: per-edge mean speed, occupancy and flow.

        Args:
            detector_data: Result of ``_get_edge_detector_data``; when omitted
                the detectors are read here.

        Returns:
            A ``float32`` vector: three normalized features per edge, then
            each edge's current limit relative to the free-flow speed, then
            the episode progress with its sine and cosine, then the previous
            reward divided by 10.
        """
        state_parts = []

        if detector_data is None:
            speeds, occs, counts, _ = self._get_edge_detector_data()
        else:
            speeds, occs, counts, _ = detector_data

        for spd, occ, cnt in zip(speeds, occs, counts):
            mean_speed_norm = np.clip(spd / self.free_flow_speed, 0.0, 1.5)
            mean_occ = np.clip(occ / 100.0, 0.0, 1.0)
            flow_norm = min(cnt / 50.0, 1.0)
            state_parts.extend([mean_speed_norm, mean_occ, flow_norm])

        # Current limit of each edge
        for i in range(self.num_edges):
            state_parts.append(self.current_edge_speeds[i] / self.free_flow_speed)

        # Time features
        time_progress = self.current_step / max(self.episode_length, 1)
        state_parts.append(time_progress)
        state_parts.append(np.sin(2 * np.pi * time_progress))
        state_parts.append(np.cos(2 * np.pi * time_progress))

        # Previous reward
        state_parts.append(self._last_reward / 10.0)

        return np.array(state_parts, dtype=np.float32)

    def _collect_metrics(self, detector_data: Tuple[List[float], List[float], List[int], List[float]]) -> Dict:
        """Collect traffic metrics for the control interval just simulated.

        Args:
            detector_data: Result of ``_get_edge_detector_data``.

        Returns:
            A dict with throughput, arrival/departure counts, per-edge speeds
            and occupancies, aggregate speed statistics, vehicle count,
            density, hard-braking decelerations and the simulation time.
        """
        info = {}

        # Throughput, scaled from the control interval to one hour
        throughput = self._interval_arrived * (3600.0 / self.control_interval)
        info["throughput"] = throughput
        info["arrived_count"] = self._interval_arrived
        info["departed_count"] = self._interval_departed

        # Speed and occupancy of every edge
        edge_speeds, edge_occs, _, valid_speeds = detector_data

        info["edge_speeds_ms"] = edge_speeds
        info["edge_occupancies"] = edge_occs
        info["mean_speed"] = np.mean(valid_speeds) if valid_speeds else 0.0
        info["mean_speed_kmh"] = info["mean_speed"] * 3.6
        info["mean_occupancy"] = np.mean(edge_occs) if edge_occs else 0.0
        info["speed_std"] = np.std(valid_speeds) if len(valid_speeds) > 1 else 0.0

        # Vehicle count and density
        try:
            info["num_vehicles"] = traci.vehicle.getIDCount()
        except Exception:
            info["num_vehicles"] = 0

        total_length_km = sum(
            self.parser.edge_info[e].length
            for e in self.control_edges
            if e in self.parser.edge_info
        ) / 1000.0
        info["density"] = info["num_vehicles"] / total_length_km if total_length_km > 0 else 0.0

        # Hard braking: decelerations stronger than the threshold
        brake_decels = []
        try:
            for veh_id in traci.vehicle.getIDList():
                accel = traci.vehicle.getAcceleration(veh_id)
                if accel < -self.d_th:
                    brake_decels.append(abs(accel))
        except Exception:
            # Best effort: keep whatever was gathered before the failure.
            pass
        info["brake_decels"] = brake_decels
        info["num_hard_brakes"] = len(brake_decels)

        try:
            info["sim_time"] = traci.simulation.getTime()
        except Exception:
            info["sim_time"] = 0.0

        return info

    def _calculate_reward(self, info: Dict) -> float:
        """Compute the reward and store its components in ``info``.

        Args:
            info: Metrics dict providing ``throughput``, ``speed_std``,
                ``density``, ``brake_decels`` and ``num_vehicles``. The keys
                ``r_flow``, ``r_var``, ``r_brake`` and ``r_penalty`` are
                written into it.

        Returns:
            The weighted sum of the four components multiplied by 10.
        """
        # Traffic efficiency
        q_t = info["throughput"]
        R_flow = q_t / self.C_max

        # Speed stability
        speed_std = info["speed_std"]
        R_var = -speed_std / self.v_limit

        # Hard-braking penalty; its weight grows with density via a sigmoid.
        rho_t = info["density"]
        w3 = self.w_base + (self.w_max - self.w_base) / (
            1 + np.exp(-self.k_sigmoid * (rho_t - self.rho_critical))
        )

        brake_decels = info["brake_decels"]
        total_vehicles = max(info["num_vehicles"], 1)
        if brake_decels:
            sum_brake_penalty = sum(
                max(0, (d - self.d_th) / (self.d_max - self.d_th))
                for d in brake_decels
            )
            brake_penalty = sum_brake_penalty / total_vehicles
        else:
            brake_penalty = 0.0
        R_brake = -brake_penalty

        # Control smoothness: largest limit change between consecutive steps
        vsl_change = np.abs(self.current_edge_speeds - self._prev_edge_speeds)
        # ``np.max`` raises on an empty array, so treat "no edges" as no change.
        max_vsl_change = np.max(vsl_change) if vsl_change.size else 0.0
        R_penalty = -max_vsl_change / self.delta_vsl_max

        info["r_flow"] = float(R_flow)
        info["r_var"] = float(R_var)
        info["r_brake"] = float(R_brake)
        info["r_penalty"] = float(R_penalty)
        reward = self.w1 * R_flow + self.w2 * R_var + w3 * R_brake + self.w4 * R_penalty
        return float(reward * 10.0)
|
||
|