239 lines
11 KiB
Python
239 lines
11 KiB
Python
"""Independent rule-based VSL controllers used as non-learning baselines."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Mapping
|
|
|
|
import numpy as np
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class RuleVSLConfig:
|
|
speed_actions_kmh: np.ndarray
|
|
free_flow_speed_kmh: float
|
|
total_edge_count: int
|
|
controlled_edge_start_index: int
|
|
num_controlled_edges: int
|
|
occupancy_release_pct: float = 12.0
|
|
occupancy_moderate_pct: float = 20.0
|
|
occupancy_high_pct: float = 30.0
|
|
occupancy_severe_pct: float = 40.0
|
|
speed_moderate_ratio: float = 0.75
|
|
speed_high_ratio: float = 0.60
|
|
speed_severe_ratio: float = 0.45
|
|
bottleneck_occupancy_pct: float = 25.0
|
|
bottleneck_high_occupancy_pct: float = 35.0
|
|
bottleneck_speed_ratio: float = 0.75
|
|
bottleneck_high_speed_ratio: float = 0.60
|
|
bottleneck_lookahead_segments: int = 3
|
|
speed_drop_warn_kmh: float = 10.0
|
|
speed_drop_severe_kmh: float = 18.0
|
|
harmonization_target_warn_kmh: float = 80.0
|
|
harmonization_target_severe_kmh: float = 60.0
|
|
temporal_step_limit: int = 1
|
|
spatial_step_limit: int = 1
|
|
|
|
@classmethod
|
|
def from_env(cls, env, raw_cfg: Mapping[str, object] | None = None) -> "RuleVSLConfig":
|
|
raw_cfg = dict(raw_cfg or {})
|
|
return cls(
|
|
speed_actions_kmh=np.asarray(env.speed_actions_kmh, dtype=float),
|
|
free_flow_speed_kmh=float(env.free_flow_speed) * 3.6,
|
|
total_edge_count=int(env.num_edges),
|
|
controlled_edge_start_index=int(env.controlled_edge_start_index),
|
|
num_controlled_edges=int(env.num_controlled_edges),
|
|
occupancy_release_pct=float(raw_cfg.get("occupancy_release_pct", 12.0)),
|
|
occupancy_moderate_pct=float(raw_cfg.get("occupancy_moderate_pct", 20.0)),
|
|
occupancy_high_pct=float(raw_cfg.get("occupancy_high_pct", 30.0)),
|
|
occupancy_severe_pct=float(raw_cfg.get("occupancy_severe_pct", 40.0)),
|
|
speed_moderate_ratio=float(raw_cfg.get("speed_moderate_ratio", 0.75)),
|
|
speed_high_ratio=float(raw_cfg.get("speed_high_ratio", 0.60)),
|
|
speed_severe_ratio=float(raw_cfg.get("speed_severe_ratio", 0.45)),
|
|
bottleneck_occupancy_pct=float(raw_cfg.get("bottleneck_occupancy_pct", 25.0)),
|
|
bottleneck_high_occupancy_pct=float(raw_cfg.get("bottleneck_high_occupancy_pct", 35.0)),
|
|
bottleneck_speed_ratio=float(raw_cfg.get("bottleneck_speed_ratio", 0.75)),
|
|
bottleneck_high_speed_ratio=float(raw_cfg.get("bottleneck_high_speed_ratio", 0.60)),
|
|
bottleneck_lookahead_segments=max(1, int(raw_cfg.get("bottleneck_lookahead_segments", 3))),
|
|
speed_drop_warn_kmh=float(raw_cfg.get("speed_drop_warn_kmh", 10.0)),
|
|
speed_drop_severe_kmh=float(raw_cfg.get("speed_drop_severe_kmh", 18.0)),
|
|
harmonization_target_warn_kmh=float(raw_cfg.get("harmonization_target_warn_kmh", 80.0)),
|
|
harmonization_target_severe_kmh=float(raw_cfg.get("harmonization_target_severe_kmh", 60.0)),
|
|
temporal_step_limit=max(0, int(raw_cfg.get("temporal_step_limit", 1))),
|
|
spatial_step_limit=max(0, int(raw_cfg.get("spatial_step_limit", 1))),
|
|
)
|
|
|
|
|
|
class BaseRuleVSLAgent:
|
|
"""Base class for one-principle rule-based VSL policies."""
|
|
|
|
policy_name = "base_rule_vsl"
|
|
|
|
def __init__(self, config: RuleVSLConfig):
|
|
self.config = config
|
|
self.speed_actions_kmh = np.asarray(config.speed_actions_kmh, dtype=float)
|
|
if self.speed_actions_kmh.ndim != 1 or self.speed_actions_kmh.size == 0:
|
|
raise ValueError("speed_actions_kmh must be a non-empty 1-D sequence")
|
|
self.max_action = int(self.speed_actions_kmh.size - 1)
|
|
self.previous_actions = np.full(config.num_controlled_edges, self.max_action, dtype=np.int64)
|
|
|
|
@classmethod
|
|
def from_env(cls, env, raw_cfg: Mapping[str, object] | None = None):
|
|
return cls(RuleVSLConfig.from_env(env, raw_cfg))
|
|
|
|
def reset_episode(self):
|
|
self.previous_actions[:] = self.max_action
|
|
|
|
def load(self, path: str):
|
|
_ = path
|
|
|
|
def save(self, path: str):
|
|
_ = path
|
|
|
|
def select_action(self, state: np.ndarray, deterministic: bool = True):
|
|
_ = deterministic
|
|
state_array = np.asarray(state, dtype=np.float32).reshape(-1)
|
|
speeds_kmh, occupancies_pct = self._parse_state(state_array)
|
|
target_actions = self._compute_targets(speeds_kmh, occupancies_pct)
|
|
target_actions = self._apply_temporal_smoothing(target_actions)
|
|
target_actions = self._apply_spatial_smoothing(target_actions)
|
|
self.previous_actions = target_actions.astype(np.int64, copy=True)
|
|
return self.previous_actions.copy(), 0.0, 0.0
|
|
|
|
def _compute_targets(self, speeds_kmh: np.ndarray, occupancies_pct: np.ndarray) -> np.ndarray:
|
|
_ = speeds_kmh, occupancies_pct
|
|
return np.full(self.config.num_controlled_edges, self.max_action, dtype=np.int64)
|
|
|
|
def _parse_state(self, state: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
|
n_edges = self.config.total_edge_count
|
|
required = n_edges * 3
|
|
if state.size < required:
|
|
raise ValueError(f"{self.policy_name} expected at least {required} state values, got {state.size}")
|
|
edge_features = state[:required].reshape(n_edges, 3)
|
|
speeds_kmh = np.clip(edge_features[:, 0], 0.0, 1.5) * self.config.free_flow_speed_kmh
|
|
occupancies_pct = np.clip(edge_features[:, 1], 0.0, 1.0) * 100.0
|
|
return speeds_kmh, occupancies_pct
|
|
|
|
def _nearest_action_at_or_below(self, speed_kmh: float) -> int:
|
|
eligible = np.where(self.speed_actions_kmh <= float(speed_kmh) + 1e-9)[0]
|
|
if eligible.size == 0:
|
|
return 0
|
|
return int(eligible[-1])
|
|
|
|
def _apply_temporal_smoothing(self, targets: np.ndarray) -> np.ndarray:
|
|
limit = self.config.temporal_step_limit
|
|
if limit <= 0:
|
|
return targets.astype(np.int64)
|
|
lower = self.previous_actions - limit
|
|
upper = self.previous_actions + limit
|
|
return np.clip(targets, lower, upper).astype(np.int64)
|
|
|
|
def _apply_spatial_smoothing(self, targets: np.ndarray) -> np.ndarray:
|
|
limit = self.config.spatial_step_limit
|
|
if limit <= 0 or targets.size <= 1:
|
|
return targets.astype(np.int64)
|
|
|
|
smoothed = targets.astype(np.int64, copy=True)
|
|
for idx in range(1, smoothed.size):
|
|
if smoothed[idx] > smoothed[idx - 1] + limit:
|
|
smoothed[idx] = smoothed[idx - 1] + limit
|
|
for idx in range(smoothed.size - 2, -1, -1):
|
|
if smoothed[idx] > smoothed[idx + 1] + limit:
|
|
smoothed[idx] = smoothed[idx + 1] + limit
|
|
return np.clip(smoothed, 0, self.max_action).astype(np.int64)
|
|
|
|
|
|
class OccupancyRuleVSLAgent(BaseRuleVSLAgent):
|
|
"""Smulders-style local occupancy/speed hysteresis rule."""
|
|
|
|
policy_name = "occ_rule_vsl"
|
|
|
|
def _compute_targets(self, speeds_kmh: np.ndarray, occupancies_pct: np.ndarray) -> np.ndarray:
|
|
cfg = self.config
|
|
targets = self.previous_actions.copy()
|
|
start = cfg.controlled_edge_start_index
|
|
|
|
for local_idx in range(cfg.num_controlled_edges):
|
|
edge_idx = start + local_idx
|
|
speed = float(speeds_kmh[edge_idx])
|
|
occ = float(occupancies_pct[edge_idx])
|
|
if occ >= cfg.occupancy_severe_pct or speed <= cfg.free_flow_speed_kmh * cfg.speed_severe_ratio:
|
|
targets[local_idx] = self._nearest_action_at_or_below(40.0)
|
|
elif occ >= cfg.occupancy_high_pct or speed <= cfg.free_flow_speed_kmh * cfg.speed_high_ratio:
|
|
targets[local_idx] = self._nearest_action_at_or_below(60.0)
|
|
elif occ >= cfg.occupancy_moderate_pct or speed <= cfg.free_flow_speed_kmh * cfg.speed_moderate_ratio:
|
|
targets[local_idx] = self._nearest_action_at_or_below(80.0)
|
|
elif occ <= cfg.occupancy_release_pct and speed >= cfg.free_flow_speed_kmh * 0.85:
|
|
targets[local_idx] = self.max_action
|
|
|
|
return targets
|
|
|
|
|
|
class BottleneckRuleVSLAgent(BaseRuleVSLAgent):
|
|
"""Hegyi-style downstream bottleneck pre-control rule."""
|
|
|
|
policy_name = "bottleneck_rule_vsl"
|
|
|
|
def _compute_targets(self, speeds_kmh: np.ndarray, occupancies_pct: np.ndarray) -> np.ndarray:
|
|
cfg = self.config
|
|
start = cfg.controlled_edge_start_index
|
|
targets = np.full(cfg.num_controlled_edges, self.max_action, dtype=np.int64)
|
|
|
|
for local_idx in range(cfg.num_controlled_edges):
|
|
edge_idx = start + local_idx
|
|
lookahead_end = min(
|
|
cfg.total_edge_count,
|
|
edge_idx + cfg.bottleneck_lookahead_segments + 1,
|
|
)
|
|
if edge_idx + 1 >= lookahead_end:
|
|
continue
|
|
|
|
downstream_occs = occupancies_pct[edge_idx + 1 : lookahead_end]
|
|
downstream_speeds = speeds_kmh[edge_idx + 1 : lookahead_end]
|
|
if downstream_occs.size == 0:
|
|
continue
|
|
|
|
bottleneck_occ = float(np.max(downstream_occs))
|
|
bottleneck_speed = float(np.min(downstream_speeds))
|
|
if (
|
|
bottleneck_occ >= cfg.bottleneck_high_occupancy_pct
|
|
or bottleneck_speed <= cfg.free_flow_speed_kmh * cfg.bottleneck_high_speed_ratio
|
|
):
|
|
targets[local_idx] = self._nearest_action_at_or_below(60.0)
|
|
elif (
|
|
bottleneck_occ >= cfg.bottleneck_occupancy_pct
|
|
or bottleneck_speed <= cfg.free_flow_speed_kmh * cfg.bottleneck_speed_ratio
|
|
):
|
|
targets[local_idx] = self._nearest_action_at_or_below(80.0)
|
|
|
|
return targets
|
|
|
|
|
|
class HarmonizationRuleVSLAgent(BaseRuleVSLAgent):
|
|
"""Allaby-style speed-harmonization rule driven by downstream speed drop."""
|
|
|
|
policy_name = "harmonization_rule_vsl"
|
|
|
|
def _compute_targets(self, speeds_kmh: np.ndarray, occupancies_pct: np.ndarray) -> np.ndarray:
|
|
_ = occupancies_pct
|
|
cfg = self.config
|
|
start = cfg.controlled_edge_start_index
|
|
targets = np.full(cfg.num_controlled_edges, self.max_action, dtype=np.int64)
|
|
|
|
for local_idx in range(cfg.num_controlled_edges - 1):
|
|
edge_idx = start + local_idx
|
|
speed_drop = float(speeds_kmh[edge_idx] - speeds_kmh[edge_idx + 1])
|
|
if speed_drop >= cfg.speed_drop_severe_kmh:
|
|
targets[local_idx] = self._nearest_action_at_or_below(cfg.harmonization_target_severe_kmh)
|
|
elif speed_drop >= cfg.speed_drop_warn_kmh:
|
|
targets[local_idx] = self._nearest_action_at_or_below(cfg.harmonization_target_warn_kmh)
|
|
|
|
return targets
|
|
|
|
|
|
RULE_VSL_AGENT_CLASSES = {
|
|
"occ_rule_vsl": OccupancyRuleVSLAgent,
|
|
"bottleneck_rule_vsl": BottleneckRuleVSLAgent,
|
|
"harmonization_rule_vsl": HarmonizationRuleVSLAgent,
|
|
}
|