"""
Training environment for DQN-based speed limit control.
"""

from typing import Dict, Tuple

import numpy as np

from ctm_model import CTMModel


class TrafficEnvironment:
    """Traffic environment for speed limit control.

    Wraps a ``CTMModel`` instance behind a ``reset()`` / ``step(action)``
    interface. Each action is an index into ``speed_actions``, an evenly
    spaced grid of speed limits between ``min_speed_limit`` and
    ``max_speed_limit``; the selected limit is applied to every cell.
    """

    # Divisor used to scale ``info["throughput"]`` inside the reward.
    # NOTE(review): presumably a nominal capacity in veh/h -- confirm.
    THROUGHPUT_NORMALIZER = 2000.0

    # Outflow value handed to ``CTMModel.step`` on every step.
    # NOTE(review): presumably the downstream capacity -- confirm against
    # CTMModel's expectations for this argument.
    DOWNSTREAM_OUTFLOW = 2000.0

    # Period of the "sine" demand pattern, expressed in the unit of
    # ``current_step * time_step / 3600`` (hours if time_step is in seconds).
    SINE_PERIOD = 2.0

    def __init__(self, config: dict):
        """Initialize the environment from a nested configuration dict.

        Args:
            config: Mapping with two sub-dicts:

                * ``config["environment"]`` -- CTM parameters
                  (``num_cells``, ``cell_length``, ``free_flow_speed``,
                  ``congestion_wave_speed``, ``max_density``,
                  ``critical_density``, ``jam_density``, ``time_step``),
                  demand settings (``demand_mean``, ``demand_std``,
                  ``demand_pattern``), the action grid
                  (``min_speed_limit``, ``max_speed_limit``,
                  ``num_speed_actions``) and ``episode_length``.
                * ``config["reward"]`` -- ``throughput_weight``,
                  ``speed_weight``, ``density_weight`` and
                  ``action_change_weight``.

        Raises:
            KeyError: If any of the keys listed above is missing.
        """
        env_config = config["environment"]
        reward_config = config["reward"]

        self.ctm = CTMModel(
            num_cells=env_config["num_cells"],
            cell_length=env_config["cell_length"],
            free_flow_speed=env_config["free_flow_speed"],
            congestion_wave_speed=env_config["congestion_wave_speed"],
            max_density=env_config["max_density"],
            critical_density=env_config["critical_density"],
            jam_density=env_config["jam_density"],
            time_step=env_config["time_step"],
        )

        # Demand generation settings.
        self.demand_mean = env_config["demand_mean"]
        self.demand_std = env_config["demand_std"]
        self.demand_pattern = env_config["demand_pattern"]

        # Action space and episode settings.
        self.min_speed_limit = env_config["min_speed_limit"]
        self.max_speed_limit = env_config["max_speed_limit"]
        self.num_speed_actions = env_config["num_speed_actions"]
        self.episode_length = env_config["episode_length"]
        self.time_step = env_config["time_step"]

        # Reward term weights (signs are whatever the config supplies).
        self.throughput_weight = reward_config["throughput_weight"]
        self.speed_weight = reward_config["speed_weight"]
        self.density_weight = reward_config["density_weight"]
        self.action_change_weight = reward_config["action_change_weight"]

        # Discrete action index -> speed limit value.
        self.speed_actions = np.linspace(
            self.min_speed_limit, self.max_speed_limit, self.num_speed_actions
        )

        self.current_step = 0
        self.previous_action = None
        self.episode_metrics = []

    def reset(self) -> np.ndarray:
        """Reset the CTM and all episode bookkeeping.

        Returns:
            The state reported by ``self.ctm.get_state()`` after the reset.
        """
        self.ctm.reset()
        self.current_step = 0
        self.previous_action = None
        self.episode_metrics = []
        return self.ctm.get_state()

    def _generate_demand(self) -> float:
        """Generate the inflow demand for the current step.

        Supported ``demand_pattern`` values:

        * ``"constant"`` -- always ``demand_mean``.
        * ``"sine"`` -- ``demand_mean`` plus a sinusoid of amplitude
          ``demand_std`` and period ``SINE_PERIOD``.
        * ``"random"`` -- a draw from ``N(demand_mean, demand_std)``.

        Any other value falls back to ``demand_mean``.

        Returns:
            The demand as a built-in ``float``, clamped to be non-negative.
        """
        if self.demand_pattern == "sine":
            t = self.current_step * self.time_step / 3600.0
            demand = self.demand_mean + self.demand_std * np.sin(
                2 * np.pi * t / self.SINE_PERIOD
            )
        elif self.demand_pattern == "random":
            demand = np.random.normal(self.demand_mean, self.demand_std)
        else:
            # "constant" and unrecognised patterns both use the mean.
            demand = self.demand_mean

        # Always hand back a plain float (previously an int 0 could leak out
        # of the clamp, and numpy scalars out of the other branches).
        return float(max(0.0, demand))

    def _average_speed(self) -> float:
        """Return the mean per-cell speed over the first ``num_cells`` cells.

        Each cell's speed is the smaller of its speed limit and
        ``free_flow_speed * (1 - density / jam_density)``.

        Every cell is included. Earlier code skipped cells whose density was
        not positive while still dividing by ``num_cells``, so an empty
        (free-flowing) cell counted as a speed of zero and dragged the
        average down. An empty cell now contributes
        ``min(speed_limit, free_flow_speed)``.

        Raises:
            ZeroDivisionError: If the model has zero cells.
        """
        num_cells = self.ctm.num_cells
        densities = np.asarray(self.ctm.densities, dtype=float)[:num_cells]
        limits = np.asarray(self.ctm.speed_limits, dtype=float)[:num_cells]

        density_speeds = self.ctm.free_flow_speed * (
            1.0 - densities / self.ctm.jam_density
        )
        cell_speeds = np.minimum(limits, density_speeds)

        # Convert to a Python float before dividing so that a zero-cell model
        # raises instead of silently producing NaN.
        return float(cell_speeds.sum()) / num_cells

    def _calculate_reward(self, info: Dict, action: int) -> float:
        """Calculate the reward for the step that produced ``info``.

        The reward is the weighted sum of three normalised terms:

        * ``info["throughput"] / THROUGHPUT_NORMALIZER``
        * average cell speed ``/ max_speed_limit``
        * ``info["average_density"] / critical_density``

        plus ``action_change_weight`` whenever ``action`` differs from the
        previous step's action (never on the first step of an episode).

        Args:
            info: Metrics dict returned by ``CTMModel.step``; must contain
                ``"throughput"`` and ``"average_density"``.
            action: Index of the action taken this step.

        Returns:
            The scalar reward.
        """
        throughput = info["throughput"]
        avg_density = info["average_density"]
        avg_speed = self._average_speed()

        reward = (
            self.throughput_weight * (throughput / self.THROUGHPUT_NORMALIZER)
            + self.speed_weight * (avg_speed / self.max_speed_limit)
            + self.density_weight * (avg_density / self.ctm.critical_density)
        )

        if self.previous_action is not None and action != self.previous_action:
            reward += self.action_change_weight

        return reward

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict]:
        """Execute one step in the environment.

        Applies the speed limit selected by ``action`` to every cell,
        generates demand, advances the CTM by one step and computes the
        reward.

        Args:
            action: Index into ``self.speed_actions``.

        Returns:
            A tuple ``(next_state, reward, done, info)`` where ``info`` is
            the dict returned by ``CTMModel.step`` with ``"reward"`` and
            ``"step"`` added. The same dict object is appended to
            ``self.episode_metrics``.
        """
        speed_limit = self.speed_actions[action]
        for cell in range(self.ctm.num_cells):
            self.ctm.set_speed_limit(cell, speed_limit)

        # Demand is sampled using the step counter *before* it is advanced.
        inflow = self._generate_demand()
        next_state, info = self.ctm.step(inflow, self.DOWNSTREAM_OUTFLOW)

        reward = self._calculate_reward(info, action)

        self.current_step += 1
        done = self.current_step >= self.episode_length

        self.episode_metrics.append(info)
        self.previous_action = action

        # ``info`` is the same object stored in episode_metrics above, so
        # these keys are visible there as well.
        info["reward"] = reward
        info["step"] = self.current_step

        return next_state, reward, done, info

    @property
    def state_dim(self) -> int:
        """Get state dimension (twice the number of cells).

        NOTE(review): presumably matches the length of
        ``self.ctm.get_state()`` -- confirm against CTMModel.
        """
        return self.ctm.num_cells * 2

    @property
    def action_dim(self) -> int:
        """Get action dimension (number of discrete speed-limit actions)."""
        return self.num_speed_actions