ctm-dqn/agents/rule_vsl_agent.py

239 lines
11 KiB
Python

"""Independent rule-based VSL controllers used as non-learning baselines."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Mapping
import numpy as np
@dataclass(frozen=True)
class RuleVSLConfig:
speed_actions_kmh: np.ndarray
free_flow_speed_kmh: float
total_edge_count: int
controlled_edge_start_index: int
num_controlled_edges: int
occupancy_release_pct: float = 12.0
occupancy_moderate_pct: float = 20.0
occupancy_high_pct: float = 30.0
occupancy_severe_pct: float = 40.0
speed_moderate_ratio: float = 0.75
speed_high_ratio: float = 0.60
speed_severe_ratio: float = 0.45
bottleneck_occupancy_pct: float = 25.0
bottleneck_high_occupancy_pct: float = 35.0
bottleneck_speed_ratio: float = 0.75
bottleneck_high_speed_ratio: float = 0.60
bottleneck_lookahead_segments: int = 3
speed_drop_warn_kmh: float = 10.0
speed_drop_severe_kmh: float = 18.0
harmonization_target_warn_kmh: float = 80.0
harmonization_target_severe_kmh: float = 60.0
temporal_step_limit: int = 1
spatial_step_limit: int = 1
@classmethod
def from_env(cls, env, raw_cfg: Mapping[str, object] | None = None) -> "RuleVSLConfig":
raw_cfg = dict(raw_cfg or {})
return cls(
speed_actions_kmh=np.asarray(env.speed_actions_kmh, dtype=float),
free_flow_speed_kmh=float(env.free_flow_speed) * 3.6,
total_edge_count=int(env.num_edges),
controlled_edge_start_index=int(env.controlled_edge_start_index),
num_controlled_edges=int(env.num_controlled_edges),
occupancy_release_pct=float(raw_cfg.get("occupancy_release_pct", 12.0)),
occupancy_moderate_pct=float(raw_cfg.get("occupancy_moderate_pct", 20.0)),
occupancy_high_pct=float(raw_cfg.get("occupancy_high_pct", 30.0)),
occupancy_severe_pct=float(raw_cfg.get("occupancy_severe_pct", 40.0)),
speed_moderate_ratio=float(raw_cfg.get("speed_moderate_ratio", 0.75)),
speed_high_ratio=float(raw_cfg.get("speed_high_ratio", 0.60)),
speed_severe_ratio=float(raw_cfg.get("speed_severe_ratio", 0.45)),
bottleneck_occupancy_pct=float(raw_cfg.get("bottleneck_occupancy_pct", 25.0)),
bottleneck_high_occupancy_pct=float(raw_cfg.get("bottleneck_high_occupancy_pct", 35.0)),
bottleneck_speed_ratio=float(raw_cfg.get("bottleneck_speed_ratio", 0.75)),
bottleneck_high_speed_ratio=float(raw_cfg.get("bottleneck_high_speed_ratio", 0.60)),
bottleneck_lookahead_segments=max(1, int(raw_cfg.get("bottleneck_lookahead_segments", 3))),
speed_drop_warn_kmh=float(raw_cfg.get("speed_drop_warn_kmh", 10.0)),
speed_drop_severe_kmh=float(raw_cfg.get("speed_drop_severe_kmh", 18.0)),
harmonization_target_warn_kmh=float(raw_cfg.get("harmonization_target_warn_kmh", 80.0)),
harmonization_target_severe_kmh=float(raw_cfg.get("harmonization_target_severe_kmh", 60.0)),
temporal_step_limit=max(0, int(raw_cfg.get("temporal_step_limit", 1))),
spatial_step_limit=max(0, int(raw_cfg.get("spatial_step_limit", 1))),
)
class BaseRuleVSLAgent:
"""Base class for one-principle rule-based VSL policies."""
policy_name = "base_rule_vsl"
def __init__(self, config: RuleVSLConfig):
self.config = config
self.speed_actions_kmh = np.asarray(config.speed_actions_kmh, dtype=float)
if self.speed_actions_kmh.ndim != 1 or self.speed_actions_kmh.size == 0:
raise ValueError("speed_actions_kmh must be a non-empty 1-D sequence")
self.max_action = int(self.speed_actions_kmh.size - 1)
self.previous_actions = np.full(config.num_controlled_edges, self.max_action, dtype=np.int64)
@classmethod
def from_env(cls, env, raw_cfg: Mapping[str, object] | None = None):
return cls(RuleVSLConfig.from_env(env, raw_cfg))
def reset_episode(self):
self.previous_actions[:] = self.max_action
def load(self, path: str):
_ = path
def save(self, path: str):
_ = path
def select_action(self, state: np.ndarray, deterministic: bool = True):
_ = deterministic
state_array = np.asarray(state, dtype=np.float32).reshape(-1)
speeds_kmh, occupancies_pct = self._parse_state(state_array)
target_actions = self._compute_targets(speeds_kmh, occupancies_pct)
target_actions = self._apply_temporal_smoothing(target_actions)
target_actions = self._apply_spatial_smoothing(target_actions)
self.previous_actions = target_actions.astype(np.int64, copy=True)
return self.previous_actions.copy(), 0.0, 0.0
def _compute_targets(self, speeds_kmh: np.ndarray, occupancies_pct: np.ndarray) -> np.ndarray:
_ = speeds_kmh, occupancies_pct
return np.full(self.config.num_controlled_edges, self.max_action, dtype=np.int64)
def _parse_state(self, state: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
n_edges = self.config.total_edge_count
required = n_edges * 3
if state.size < required:
raise ValueError(f"{self.policy_name} expected at least {required} state values, got {state.size}")
edge_features = state[:required].reshape(n_edges, 3)
speeds_kmh = np.clip(edge_features[:, 0], 0.0, 1.5) * self.config.free_flow_speed_kmh
occupancies_pct = np.clip(edge_features[:, 1], 0.0, 1.0) * 100.0
return speeds_kmh, occupancies_pct
def _nearest_action_at_or_below(self, speed_kmh: float) -> int:
eligible = np.where(self.speed_actions_kmh <= float(speed_kmh) + 1e-9)[0]
if eligible.size == 0:
return 0
return int(eligible[-1])
def _apply_temporal_smoothing(self, targets: np.ndarray) -> np.ndarray:
limit = self.config.temporal_step_limit
if limit <= 0:
return targets.astype(np.int64)
lower = self.previous_actions - limit
upper = self.previous_actions + limit
return np.clip(targets, lower, upper).astype(np.int64)
def _apply_spatial_smoothing(self, targets: np.ndarray) -> np.ndarray:
limit = self.config.spatial_step_limit
if limit <= 0 or targets.size <= 1:
return targets.astype(np.int64)
smoothed = targets.astype(np.int64, copy=True)
for idx in range(1, smoothed.size):
if smoothed[idx] > smoothed[idx - 1] + limit:
smoothed[idx] = smoothed[idx - 1] + limit
for idx in range(smoothed.size - 2, -1, -1):
if smoothed[idx] > smoothed[idx + 1] + limit:
smoothed[idx] = smoothed[idx + 1] + limit
return np.clip(smoothed, 0, self.max_action).astype(np.int64)
class OccupancyRuleVSLAgent(BaseRuleVSLAgent):
"""Smulders-style local occupancy/speed hysteresis rule."""
policy_name = "occ_rule_vsl"
def _compute_targets(self, speeds_kmh: np.ndarray, occupancies_pct: np.ndarray) -> np.ndarray:
cfg = self.config
targets = self.previous_actions.copy()
start = cfg.controlled_edge_start_index
for local_idx in range(cfg.num_controlled_edges):
edge_idx = start + local_idx
speed = float(speeds_kmh[edge_idx])
occ = float(occupancies_pct[edge_idx])
if occ >= cfg.occupancy_severe_pct or speed <= cfg.free_flow_speed_kmh * cfg.speed_severe_ratio:
targets[local_idx] = self._nearest_action_at_or_below(40.0)
elif occ >= cfg.occupancy_high_pct or speed <= cfg.free_flow_speed_kmh * cfg.speed_high_ratio:
targets[local_idx] = self._nearest_action_at_or_below(60.0)
elif occ >= cfg.occupancy_moderate_pct or speed <= cfg.free_flow_speed_kmh * cfg.speed_moderate_ratio:
targets[local_idx] = self._nearest_action_at_or_below(80.0)
elif occ <= cfg.occupancy_release_pct and speed >= cfg.free_flow_speed_kmh * 0.85:
targets[local_idx] = self.max_action
return targets
class BottleneckRuleVSLAgent(BaseRuleVSLAgent):
"""Hegyi-style downstream bottleneck pre-control rule."""
policy_name = "bottleneck_rule_vsl"
def _compute_targets(self, speeds_kmh: np.ndarray, occupancies_pct: np.ndarray) -> np.ndarray:
cfg = self.config
start = cfg.controlled_edge_start_index
targets = np.full(cfg.num_controlled_edges, self.max_action, dtype=np.int64)
for local_idx in range(cfg.num_controlled_edges):
edge_idx = start + local_idx
lookahead_end = min(
cfg.total_edge_count,
edge_idx + cfg.bottleneck_lookahead_segments + 1,
)
if edge_idx + 1 >= lookahead_end:
continue
downstream_occs = occupancies_pct[edge_idx + 1 : lookahead_end]
downstream_speeds = speeds_kmh[edge_idx + 1 : lookahead_end]
if downstream_occs.size == 0:
continue
bottleneck_occ = float(np.max(downstream_occs))
bottleneck_speed = float(np.min(downstream_speeds))
if (
bottleneck_occ >= cfg.bottleneck_high_occupancy_pct
or bottleneck_speed <= cfg.free_flow_speed_kmh * cfg.bottleneck_high_speed_ratio
):
targets[local_idx] = self._nearest_action_at_or_below(60.0)
elif (
bottleneck_occ >= cfg.bottleneck_occupancy_pct
or bottleneck_speed <= cfg.free_flow_speed_kmh * cfg.bottleneck_speed_ratio
):
targets[local_idx] = self._nearest_action_at_or_below(80.0)
return targets
class HarmonizationRuleVSLAgent(BaseRuleVSLAgent):
"""Allaby-style speed-harmonization rule driven by downstream speed drop."""
policy_name = "harmonization_rule_vsl"
def _compute_targets(self, speeds_kmh: np.ndarray, occupancies_pct: np.ndarray) -> np.ndarray:
_ = occupancies_pct
cfg = self.config
start = cfg.controlled_edge_start_index
targets = np.full(cfg.num_controlled_edges, self.max_action, dtype=np.int64)
for local_idx in range(cfg.num_controlled_edges - 1):
edge_idx = start + local_idx
speed_drop = float(speeds_kmh[edge_idx] - speeds_kmh[edge_idx + 1])
if speed_drop >= cfg.speed_drop_severe_kmh:
targets[local_idx] = self._nearest_action_at_or_below(cfg.harmonization_target_severe_kmh)
elif speed_drop >= cfg.speed_drop_warn_kmh:
targets[local_idx] = self._nearest_action_at_or_below(cfg.harmonization_target_warn_kmh)
return targets
RULE_VSL_AGENT_CLASSES = {
"occ_rule_vsl": OccupancyRuleVSLAgent,
"bottleneck_rule_vsl": BottleneckRuleVSLAgent,
"harmonization_rule_vsl": HarmonizationRuleVSLAgent,
}