140 lines
4.9 KiB
Python
140 lines
4.9 KiB
Python
"""
|
|
Cell Transmission Model (CTM) for traffic flow simulation.
|
|
"""
|
|
import numpy as np
|
|
from typing import Tuple, Optional
|
|
|
|
|
|
class CTMModel:
    """Cell Transmission Model for highway traffic simulation.

    The road is a chain of ``num_cells`` equally long cells. Each cell has a
    density (vehicles/km) and a speed limit (m/s). On every ``step`` the flow
    across each cell boundary is the minimum of what the upstream cell can
    send and what the downstream cell can receive; densities are then
    updated from the net number of vehicles entering each cell.

    Units used throughout: densities in vehicles/km, speeds in m/s, lengths
    in metres, ``time_step`` in seconds. Boundary flows are kept internally
    in vehicles per time step.
    """

    def __init__(
        self,
        num_cells: int = 10,
        cell_length: float = 500.0,
        free_flow_speed: float = 30.0,
        congestion_wave_speed: float = 5.0,
        max_density: float = 180.0,
        critical_density: float = 30.0,
        jam_density: float = 180.0,
        time_step: float = 10.0,
    ):
        """
        Initialize CTM model.

        Args:
            num_cells: Number of road cells
            cell_length: Length of each cell (meters)
            free_flow_speed: Free flow speed (m/s)
            congestion_wave_speed: Congestion wave speed (m/s)
            max_density: Maximum density (vehicles/km). Stored only; the
                dynamics in this class use ``jam_density`` as the upper bound.
            critical_density: Critical density (vehicles/km)
            jam_density: Jam density (vehicles/km)
            time_step: Simulation time step (seconds)
        """
        self.num_cells = num_cells
        self.cell_length = cell_length
        self.free_flow_speed = free_flow_speed
        self.congestion_wave_speed = congestion_wave_speed
        self.max_density = max_density
        self.critical_density = critical_density
        self.jam_density = jam_density
        self.time_step = time_step

        # Capacity in (vehicles/km) * (m/s); multiplied by time_step / 1000
        # wherever it is used, which yields vehicles per time step.
        self.capacity = self.critical_density * self.free_flow_speed

        # State: empty road, every cell at the free-flow speed limit.
        # dtype=float is explicit so integer arguments cannot produce an
        # integer array that would truncate later assignments.
        self.densities = np.zeros(num_cells)
        self.speed_limits = np.full(num_cells, free_flow_speed, dtype=float)

    def reset(self, initial_density: Optional[np.ndarray] = None) -> np.ndarray:
        """Reset the model state.

        Args:
            initial_density: Per-cell densities (vehicles/km), one value per
                cell. Copied and converted to float. When omitted, densities
                are drawn uniformly from [5, 20).

        Returns:
            The state vector produced by ``get_state``.

        Raises:
            ValueError: If ``initial_density`` does not have shape
                ``(num_cells,)``.
        """
        if initial_density is not None:
            # np.array copies and forces float; an integer input would
            # otherwise make ``step`` truncate every updated density.
            densities = np.array(initial_density, dtype=float)
            if densities.shape != (self.num_cells,):
                raise ValueError(
                    f"initial_density must have shape ({self.num_cells},), "
                    f"got {densities.shape}"
                )
            self.densities = densities
        else:
            self.densities = np.random.uniform(5, 20, self.num_cells)

        self.speed_limits = np.full(
            self.num_cells, self.free_flow_speed, dtype=float
        )
        return self.get_state()

    def set_speed_limit(self, cell_idx: int, speed_limit: float) -> None:
        """Set speed limit for a specific cell, clipped to [0, free_flow_speed]."""
        self.speed_limits[cell_idx] = np.clip(
            speed_limit, 0, self.free_flow_speed
        )

    def get_state(self) -> np.ndarray:
        """Get current state: densities followed by speed limits."""
        return np.concatenate([self.densities, self.speed_limits])

    def _calculate_sending_flow(self, cell_idx: int) -> float:
        """Calculate sending flow from a cell (vehicles per time step).

        The result is the smallest of: the free-flow demand at the cell's
        effective speed, the capacity, and the number of vehicles actually
        in the cell.
        """
        density = self.densities[cell_idx]
        effective_speed = min(self.speed_limits[cell_idx], self.free_flow_speed)

        demand = density * effective_speed * self.time_step / 1000.0
        capacity_per_step = self.capacity * self.time_step / 1000.0
        # A cell cannot emit more vehicles than it holds. This bound only
        # bites when effective_speed * time_step exceeds cell_length.
        vehicles_in_cell = density * self.cell_length / 1000.0

        return min(demand, capacity_per_step, vehicles_in_cell)

    def _calculate_receiving_flow(self, cell_idx: int) -> float:
        """Calculate receiving flow to a cell (vehicles per time step).

        The result is the smaller of the capacity and the space left before
        jam density, propagated at the congestion wave speed. It is never
        negative.
        """
        density = self.densities[cell_idx]

        capacity_per_step = self.capacity * self.time_step / 1000.0
        free_space = (
            self.congestion_wave_speed
            * (self.jam_density - density)
            * self.time_step
            / 1000.0
        )
        # Floor at zero so an over-jammed cell blocks inflow instead of
        # producing a negative (backward) flow.
        return max(0.0, min(capacity_per_step, free_space))

    def step(self, inflow: float, outflow: float) -> Tuple[np.ndarray, dict]:
        """
        Perform one simulation step.

        Args:
            inflow: Inflow demand to the first cell (vehicles/hour).
                Negative values are treated as zero. The flow that actually
                enters is limited by what the first cell can receive.
            outflow: Outflow limit from the last cell (vehicles/hour).
                Negative values are treated as zero.

        Returns:
            state: Updated state
            info: Dictionary with simulation metrics: ``densities``,
                ``flows`` (vehicles per time step across each of the
                ``num_cells + 1`` boundaries), ``average_density``,
                ``total_vehicles`` and ``throughput`` (vehicles/hour leaving
                the last cell).
        """
        hours_per_step = self.time_step / 3600.0
        cell_km = self.cell_length / 1000.0
        flows = np.zeros(self.num_cells + 1)

        # Convert inflow from veh/h to vehicles per time step, then bound it
        # by the space in the first cell. Without the bound the surplus would
        # be silently discarded by the jam-density clip below while still
        # being reported in flows[0].
        demand = max(inflow, 0.0) * hours_per_step
        if self.num_cells > 0:
            flows[0] = min(demand, self._calculate_receiving_flow(0))
        else:
            flows[0] = demand

        # Convert outflow from veh/h to vehicles per time step.
        exit_capacity = max(outflow, 0.0) * hours_per_step

        last = self.num_cells - 1
        for i in range(self.num_cells):
            sending = self._calculate_sending_flow(i)
            if i < last:
                receiving = self._calculate_receiving_flow(i + 1)
            else:
                receiving = exit_capacity
            flows[i + 1] = min(sending, receiving)

        # Net vehicles gained per cell = flow in minus flow out, converted
        # back to a density change. The clip is a safety net only.
        net_vehicles = flows[:-1] - flows[1:]
        self.densities = np.clip(
            self.densities + net_vehicles / cell_km, 0, self.jam_density
        )

        info = {
            "densities": self.densities.copy(),
            "flows": flows.copy(),
            "average_density": np.mean(self.densities),
            "total_vehicles": np.sum(self.densities) * cell_km,
            # flows[-1] is in vehicles per time step, convert to veh/h
            "throughput": flows[-1] * 3600.0 / self.time_step,
        }

        return self.get_state(), info
|