"""Independent rule-based VSL controllers used as non-learning baselines.""" from __future__ import annotations from dataclasses import dataclass from typing import Mapping import numpy as np @dataclass(frozen=True) class RuleVSLConfig: speed_actions_kmh: np.ndarray free_flow_speed_kmh: float total_edge_count: int controlled_edge_start_index: int num_controlled_edges: int occupancy_release_pct: float = 12.0 occupancy_moderate_pct: float = 20.0 occupancy_high_pct: float = 30.0 occupancy_severe_pct: float = 40.0 speed_moderate_ratio: float = 0.75 speed_high_ratio: float = 0.60 speed_severe_ratio: float = 0.45 bottleneck_occupancy_pct: float = 25.0 bottleneck_high_occupancy_pct: float = 35.0 bottleneck_speed_ratio: float = 0.75 bottleneck_high_speed_ratio: float = 0.60 bottleneck_lookahead_segments: int = 3 speed_drop_warn_kmh: float = 10.0 speed_drop_severe_kmh: float = 18.0 harmonization_target_warn_kmh: float = 80.0 harmonization_target_severe_kmh: float = 60.0 temporal_step_limit: int = 1 spatial_step_limit: int = 1 @classmethod def from_env(cls, env, raw_cfg: Mapping[str, object] | None = None) -> "RuleVSLConfig": raw_cfg = dict(raw_cfg or {}) return cls( speed_actions_kmh=np.asarray(env.speed_actions_kmh, dtype=float), free_flow_speed_kmh=float(env.free_flow_speed) * 3.6, total_edge_count=int(env.num_edges), controlled_edge_start_index=int(env.controlled_edge_start_index), num_controlled_edges=int(env.num_controlled_edges), occupancy_release_pct=float(raw_cfg.get("occupancy_release_pct", 12.0)), occupancy_moderate_pct=float(raw_cfg.get("occupancy_moderate_pct", 20.0)), occupancy_high_pct=float(raw_cfg.get("occupancy_high_pct", 30.0)), occupancy_severe_pct=float(raw_cfg.get("occupancy_severe_pct", 40.0)), speed_moderate_ratio=float(raw_cfg.get("speed_moderate_ratio", 0.75)), speed_high_ratio=float(raw_cfg.get("speed_high_ratio", 0.60)), speed_severe_ratio=float(raw_cfg.get("speed_severe_ratio", 0.45)), bottleneck_occupancy_pct=float(raw_cfg.get("bottleneck_occupancy_pct", 25.0)), bottleneck_high_occupancy_pct=float(raw_cfg.get("bottleneck_high_occupancy_pct", 35.0)), bottleneck_speed_ratio=float(raw_cfg.get("bottleneck_speed_ratio", 0.75)), bottleneck_high_speed_ratio=float(raw_cfg.get("bottleneck_high_speed_ratio", 0.60)), bottleneck_lookahead_segments=max(1, int(raw_cfg.get("bottleneck_lookahead_segments", 3))), speed_drop_warn_kmh=float(raw_cfg.get("speed_drop_warn_kmh", 10.0)), speed_drop_severe_kmh=float(raw_cfg.get("speed_drop_severe_kmh", 18.0)), harmonization_target_warn_kmh=float(raw_cfg.get("harmonization_target_warn_kmh", 80.0)), harmonization_target_severe_kmh=float(raw_cfg.get("harmonization_target_severe_kmh", 60.0)), temporal_step_limit=max(0, int(raw_cfg.get("temporal_step_limit", 1))), spatial_step_limit=max(0, int(raw_cfg.get("spatial_step_limit", 1))), ) class BaseRuleVSLAgent: """Base class for one-principle rule-based VSL policies.""" policy_name = "base_rule_vsl" def __init__(self, config: RuleVSLConfig): self.config = config self.speed_actions_kmh = np.asarray(config.speed_actions_kmh, dtype=float) if self.speed_actions_kmh.ndim != 1 or self.speed_actions_kmh.size == 0: raise ValueError("speed_actions_kmh must be a non-empty 1-D sequence") self.max_action = int(self.speed_actions_kmh.size - 1) self.previous_actions = np.full(config.num_controlled_edges, self.max_action, dtype=np.int64) @classmethod def from_env(cls, env, raw_cfg: Mapping[str, object] | None = None): return cls(RuleVSLConfig.from_env(env, raw_cfg)) def reset_episode(self): self.previous_actions[:] = self.max_action def load(self, path: str): _ = path def save(self, path: str): _ = path def select_action(self, state: np.ndarray, deterministic: bool = True): _ = deterministic state_array = np.asarray(state, dtype=np.float32).reshape(-1) speeds_kmh, occupancies_pct = self._parse_state(state_array) target_actions = self._compute_targets(speeds_kmh, occupancies_pct) target_actions = self._apply_temporal_smoothing(target_actions) target_actions = self._apply_spatial_smoothing(target_actions) self.previous_actions = target_actions.astype(np.int64, copy=True) return self.previous_actions.copy(), 0.0, 0.0 def _compute_targets(self, speeds_kmh: np.ndarray, occupancies_pct: np.ndarray) -> np.ndarray: _ = speeds_kmh, occupancies_pct return np.full(self.config.num_controlled_edges, self.max_action, dtype=np.int64) def _parse_state(self, state: np.ndarray) -> tuple[np.ndarray, np.ndarray]: n_edges = self.config.total_edge_count required = n_edges * 3 if state.size < required: raise ValueError(f"{self.policy_name} expected at least {required} state values, got {state.size}") edge_features = state[:required].reshape(n_edges, 3) speeds_kmh = np.clip(edge_features[:, 0], 0.0, 1.5) * self.config.free_flow_speed_kmh occupancies_pct = np.clip(edge_features[:, 1], 0.0, 1.0) * 100.0 return speeds_kmh, occupancies_pct def _nearest_action_at_or_below(self, speed_kmh: float) -> int: eligible = np.where(self.speed_actions_kmh <= float(speed_kmh) + 1e-9)[0] if eligible.size == 0: return 0 return int(eligible[-1]) def _apply_temporal_smoothing(self, targets: np.ndarray) -> np.ndarray: limit = self.config.temporal_step_limit if limit <= 0: return targets.astype(np.int64) lower = self.previous_actions - limit upper = self.previous_actions + limit return np.clip(targets, lower, upper).astype(np.int64) def _apply_spatial_smoothing(self, targets: np.ndarray) -> np.ndarray: limit = self.config.spatial_step_limit if limit <= 0 or targets.size <= 1: return targets.astype(np.int64) smoothed = targets.astype(np.int64, copy=True) for idx in range(1, smoothed.size): if smoothed[idx] > smoothed[idx - 1] + limit: smoothed[idx] = smoothed[idx - 1] + limit for idx in range(smoothed.size - 2, -1, -1): if smoothed[idx] > smoothed[idx + 1] + limit: smoothed[idx] = smoothed[idx + 1] + limit return np.clip(smoothed, 0, self.max_action).astype(np.int64) class OccupancyRuleVSLAgent(BaseRuleVSLAgent): """Smulders-style local occupancy/speed hysteresis rule.""" policy_name = "occ_rule_vsl" def _compute_targets(self, speeds_kmh: np.ndarray, occupancies_pct: np.ndarray) -> np.ndarray: cfg = self.config targets = self.previous_actions.copy() start = cfg.controlled_edge_start_index for local_idx in range(cfg.num_controlled_edges): edge_idx = start + local_idx speed = float(speeds_kmh[edge_idx]) occ = float(occupancies_pct[edge_idx]) if occ >= cfg.occupancy_severe_pct or speed <= cfg.free_flow_speed_kmh * cfg.speed_severe_ratio: targets[local_idx] = self._nearest_action_at_or_below(40.0) elif occ >= cfg.occupancy_high_pct or speed <= cfg.free_flow_speed_kmh * cfg.speed_high_ratio: targets[local_idx] = self._nearest_action_at_or_below(60.0) elif occ >= cfg.occupancy_moderate_pct or speed <= cfg.free_flow_speed_kmh * cfg.speed_moderate_ratio: targets[local_idx] = self._nearest_action_at_or_below(80.0) elif occ <= cfg.occupancy_release_pct and speed >= cfg.free_flow_speed_kmh * 0.85: targets[local_idx] = self.max_action return targets class BottleneckRuleVSLAgent(BaseRuleVSLAgent): """Hegyi-style downstream bottleneck pre-control rule.""" policy_name = "bottleneck_rule_vsl" def _compute_targets(self, speeds_kmh: np.ndarray, occupancies_pct: np.ndarray) -> np.ndarray: cfg = self.config start = cfg.controlled_edge_start_index targets = np.full(cfg.num_controlled_edges, self.max_action, dtype=np.int64) for local_idx in range(cfg.num_controlled_edges): edge_idx = start + local_idx lookahead_end = min( cfg.total_edge_count, edge_idx + cfg.bottleneck_lookahead_segments + 1, ) if edge_idx + 1 >= lookahead_end: continue downstream_occs = occupancies_pct[edge_idx + 1 : lookahead_end] downstream_speeds = speeds_kmh[edge_idx + 1 : lookahead_end] if downstream_occs.size == 0: continue bottleneck_occ = float(np.max(downstream_occs)) bottleneck_speed = float(np.min(downstream_speeds)) if ( bottleneck_occ >= cfg.bottleneck_high_occupancy_pct or bottleneck_speed <= cfg.free_flow_speed_kmh * cfg.bottleneck_high_speed_ratio ): targets[local_idx] = self._nearest_action_at_or_below(60.0) elif ( bottleneck_occ >= cfg.bottleneck_occupancy_pct or bottleneck_speed <= cfg.free_flow_speed_kmh * cfg.bottleneck_speed_ratio ): targets[local_idx] = self._nearest_action_at_or_below(80.0) return targets class HarmonizationRuleVSLAgent(BaseRuleVSLAgent): """Allaby-style speed-harmonization rule driven by downstream speed drop.""" policy_name = "harmonization_rule_vsl" def _compute_targets(self, speeds_kmh: np.ndarray, occupancies_pct: np.ndarray) -> np.ndarray: _ = occupancies_pct cfg = self.config start = cfg.controlled_edge_start_index targets = np.full(cfg.num_controlled_edges, self.max_action, dtype=np.int64) for local_idx in range(cfg.num_controlled_edges - 1): edge_idx = start + local_idx speed_drop = float(speeds_kmh[edge_idx] - speeds_kmh[edge_idx + 1]) if speed_drop >= cfg.speed_drop_severe_kmh: targets[local_idx] = self._nearest_action_at_or_below(cfg.harmonization_target_severe_kmh) elif speed_drop >= cfg.speed_drop_warn_kmh: targets[local_idx] = self._nearest_action_at_or_below(cfg.harmonization_target_warn_kmh) return targets RULE_VSL_AGENT_CLASSES = { "occ_rule_vsl": OccupancyRuleVSLAgent, "bottleneck_rule_vsl": BottleneckRuleVSLAgent, "harmonization_rule_vsl": HarmonizationRuleVSLAgent, }