""" Cell Transmission Model (CTM) for traffic flow simulation. """ import numpy as np from typing import Tuple, Optional class CTMModel: """Cell Transmission Model for highway traffic simulation.""" def __init__( self, num_cells: int = 10, cell_length: float = 500.0, free_flow_speed: float = 30.0, congestion_wave_speed: float = 5.0, max_density: float = 180.0, critical_density: float = 30.0, jam_density: float = 180.0, time_step: float = 10.0, ): """ Initialize CTM model. Args: num_cells: Number of road cells cell_length: Length of each cell (meters) free_flow_speed: Free flow speed (m/s) congestion_wave_speed: Congestion wave speed (m/s) max_density: Maximum density (vehicles/km) critical_density: Critical density (vehicles/km) jam_density: Jam density (vehicles/km) time_step: Simulation time step (seconds) """ self.num_cells = num_cells self.cell_length = cell_length self.free_flow_speed = free_flow_speed self.congestion_wave_speed = congestion_wave_speed self.max_density = max_density self.critical_density = critical_density self.jam_density = jam_density self.time_step = time_step # Calculate capacity self.capacity = self.critical_density * self.free_flow_speed # Initialize state self.densities = np.zeros(num_cells) self.speed_limits = np.ones(num_cells) * free_flow_speed def reset(self, initial_density: Optional[np.ndarray] = None) -> np.ndarray: """Reset the model state.""" if initial_density is not None: self.densities = initial_density.copy() else: self.densities = np.random.uniform(5, 20, self.num_cells) self.speed_limits = np.ones(self.num_cells) * self.free_flow_speed return self.get_state() def set_speed_limit(self, cell_idx: int, speed_limit: float): """Set speed limit for a specific cell.""" self.speed_limits[cell_idx] = np.clip( speed_limit, 0, self.free_flow_speed ) def get_state(self) -> np.ndarray: """Get current state (densities and speed limits).""" return np.concatenate([self.densities, self.speed_limits]) def _calculate_sending_flow(self, cell_idx: int) -> float: """Calculate sending flow from a cell.""" density = self.densities[cell_idx] speed_limit = self.speed_limits[cell_idx] effective_speed = min(speed_limit, self.free_flow_speed) sending_flow = min( density * effective_speed, self.capacity ) return sending_flow def _calculate_receiving_flow(self, cell_idx: int) -> float: """Calculate receiving flow to a cell.""" density = self.densities[cell_idx] receiving_flow = min( self.capacity, self.congestion_wave_speed * (self.jam_density - density) ) return receiving_flow def step(self, inflow: float, outflow: float) -> Tuple[np.ndarray, dict]: """ Perform one simulation step. Args: inflow: Inflow to the first cell (vehicles/hour) outflow: Outflow from the last cell (vehicles/hour) Returns: state: Updated state info: Dictionary with simulation metrics """ new_densities = self.densities.copy() flows = np.zeros(self.num_cells + 1) flows[0] = inflow / 3600.0 * self.time_step for i in range(self.num_cells): sending = self._calculate_sending_flow(i) if i < self.num_cells - 1: receiving = self._calculate_receiving_flow(i + 1) else: receiving = outflow / 3600.0 * self.time_step flows[i + 1] = min(sending, receiving) for i in range(self.num_cells): delta_vehicles = flows[i] - flows[i + 1] new_densities[i] = self.densities[i] + delta_vehicles / ( self.cell_length / 1000.0 ) new_densities[i] = np.clip(new_densities[i], 0, self.jam_density) self.densities = new_densities info = { "densities": self.densities.copy(), "flows": flows.copy(), "average_density": np.mean(self.densities), "total_vehicles": np.sum(self.densities) * self.cell_length / 1000.0, "throughput": flows[-1] * 3600.0 / self.time_step, } return self.get_state(), info