""" Cell Transmission Model (CTM) for traffic flow simulation. """ import numpy as np from typing import Tuple, Optional class CTMModel: """Cell Transmission Model for highway traffic simulation.""" def __init__( self, num_cells: int = 10, cell_length: float = 500.0, free_flow_speed: float = 30.0, congestion_wave_speed: float = 5.0, max_density: float = 180.0, critical_density: float = 30.0, jam_density: float = 180.0, time_step: float = 10.0, ): """ Initialize CTM model. Args: num_cells: Number of road cells cell_length: Length of each cell (meters) free_flow_speed: Free flow speed (m/s) congestion_wave_speed: Congestion wave speed (m/s) max_density: Maximum density (vehicles/km) critical_density: Critical density (vehicles/km) jam_density: Jam density (vehicles/km) time_step: Simulation time step (seconds) """ self.num_cells = num_cells self.cell_length = cell_length self.free_flow_speed = free_flow_speed self.congestion_wave_speed = congestion_wave_speed self.max_density = max_density self.critical_density = critical_density self.jam_density = jam_density self.time_step = time_step # Calculate capacity self.capacity = self.critical_density * self.free_flow_speed # Initialize state self.densities = np.zeros(num_cells) self.speed_limits = np.ones(num_cells) * free_flow_speed def reset(self, initial_density: Optional[np.ndarray] = None) -> np.ndarray: """Reset the model state.""" if initial_density is not None: self.densities = initial_density.copy() else: self.densities = np.random.uniform(5, 20, self.num_cells) self.speed_limits = np.ones(self.num_cells) * self.free_flow_speed return self.get_state() def set_speed_limit(self, cell_idx: int, speed_limit: float): """Set speed limit for a specific cell.""" self.speed_limits[cell_idx] = np.clip( speed_limit, 0, self.free_flow_speed ) def get_state(self) -> np.ndarray: """Get current state (densities and speed limits).""" return np.concatenate([self.densities, self.speed_limits]) def _calculate_sending_flow(self, cell_idx: int) -> float: """ Calculate sending flow from a cell (vehicles per time step). Returns flow in vehicles that can leave the cell during one time step. """ density = self.densities[cell_idx] speed_limit = self.speed_limits[cell_idx] effective_speed = min(speed_limit, self.free_flow_speed) # Flow = density (veh/km) * speed (m/s) * 3.6 to get veh/h, then convert to veh/timestep # Simplified: density * speed * time_step / 1000 gives vehicles per time step sending_flow = min( density * effective_speed * self.time_step / 1000.0, self.capacity * self.time_step / 1000.0 ) return sending_flow def _calculate_receiving_flow(self, cell_idx: int) -> float: """ Calculate receiving flow to a cell (vehicles per time step). Returns flow in vehicles that can enter the cell during one time step. """ density = self.densities[cell_idx] # Receiving capacity based on available space # congestion_wave_speed (m/s) * (jam_density - density) (veh/km) * time_step / 1000 receiving_flow = min( self.capacity * self.time_step / 1000.0, self.congestion_wave_speed * (self.jam_density - density) * self.time_step / 1000.0 ) return receiving_flow def step(self, inflow: float, outflow: float) -> Tuple[np.ndarray, dict]: """ Perform one simulation step. Args: inflow: Inflow to the first cell (vehicles/hour) outflow: Outflow from the last cell (vehicles/hour) Returns: state: Updated state info: Dictionary with simulation metrics """ new_densities = self.densities.copy() flows = np.zeros(self.num_cells + 1) # Convert inflow from veh/h to vehicles per time step flows[0] = inflow * self.time_step / 3600.0 for i in range(self.num_cells): sending = self._calculate_sending_flow(i) if i < self.num_cells - 1: receiving = self._calculate_receiving_flow(i + 1) else: # Convert outflow from veh/h to vehicles per time step receiving = outflow * self.time_step / 3600.0 flows[i + 1] = min(sending, receiving) for i in range(self.num_cells): delta_vehicles = flows[i] - flows[i + 1] new_densities[i] = self.densities[i] + delta_vehicles / ( self.cell_length / 1000.0 ) new_densities[i] = np.clip(new_densities[i], 0, self.jam_density) self.densities = new_densities info = { "densities": self.densities.copy(), "flows": flows.copy(), "average_density": np.mean(self.densities), "total_vehicles": np.sum(self.densities) * self.cell_length / 1000.0, # flows[-1] is in vehicles per time step, convert to veh/h "throughput": flows[-1] * 3600.0 / self.time_step, } return self.get_state(), info