"""Shared reward configuration and calculation for freeway VSL environments.""" from __future__ import annotations from dataclasses import dataclass from typing import Dict, Mapping, Sequence import numpy as np REWARD_COMPONENT_COLUMNS = ( "r_flow", "r_var", "r_brake", "r_penalty", ) REWARD_COMPONENT_LABELS = { "r_flow": "R_flow", "r_var": "R_var", "r_brake": "R_brake", "r_penalty": "R_penalty", } def init_reward_component_totals() -> Dict[str, float]: return {column: 0.0 for column in REWARD_COMPONENT_COLUMNS} def average_reward_components(totals: Mapping[str, float], steps: int) -> Dict[str, float]: denom = max(int(steps), 1) return {column: float(totals.get(column, 0.0)) / denom for column in REWARD_COMPONENT_COLUMNS} @dataclass(frozen=True) class RewardConfig: w_flow: float = 0.4 w_var: float = 0.3 w_brake_base: float = 0.1 w_brake_max: float = 0.3 w_penalty: float = 0.2 rho_critical: float = 44.75 k_sigmoid: float = 0.2 d_threshold: float = 5.0 d_max: float = 20.0 C_max: float = 4924.0 v_limit: float = 30.56 delta_vsl_max: float = 16.67 ttc_threshold_s: float = 2.3 bottleneck_window_size: int = 3 leader_gap_threshold_m: float = 100.0 @classmethod def from_dict( cls, raw_cfg: Mapping[str, object], *, speed_actions_ms: Sequence[float], ) -> "RewardConfig": default_delta_vsl_max = 60.0 / 3.6 if len(speed_actions_ms) > 0: default_delta_vsl_max = float(np.max(speed_actions_ms) - np.min(speed_actions_ms)) return cls( w_flow=float(raw_cfg.get("w_flow", 0.4)), w_var=float(raw_cfg.get("w_var", 0.3)), w_brake_base=float(raw_cfg.get("w_brake_base", 0.1)), w_brake_max=float(raw_cfg.get("w_brake_max", 0.3)), w_penalty=float(raw_cfg.get("w_penalty", 0.2)), rho_critical=float(raw_cfg.get("rho_critical", 44.75)), k_sigmoid=float(raw_cfg.get("k_sigmoid", 0.2)), d_threshold=float(raw_cfg.get("d_threshold", 5.0)), d_max=float(raw_cfg.get("d_max", 20.0)), C_max=float(raw_cfg.get("C_max", 4924.0)), v_limit=float(raw_cfg.get("v_limit", 30.56)), delta_vsl_max=float(raw_cfg.get("delta_vsl_max", default_delta_vsl_max)), ttc_threshold_s=float(raw_cfg.get("ttc_threshold_s", 2.3)), bottleneck_window_size=max(1, int(raw_cfg.get("bottleneck_window_size", 3))), leader_gap_threshold_m=float(raw_cfg.get("leader_gap_threshold_m", 100.0)), ) class RewardCalculator: """Four-term April reward: flow, speed variance, hard braking, and VSL smoothness.""" def __init__( self, *, config: RewardConfig, controlled_edge_start_index: int, evaluation_mode: bool = False, ): self.config = config self.controlled_edge_start_index = int(controlled_edge_start_index) self.evaluation_mode = bool(evaluation_mode) def calculate( self, *, info: Dict, current_edge_speeds: np.ndarray, prev_edge_speeds: np.ndarray, episode_index: int, ) -> float: _ = episode_index q_t = float(info.get("throughput", 0.0)) r_flow = q_t / max(float(self.config.C_max), 1e-6) speed_variance_norm = float(info.get("speed_variance_norm", 0.0)) r_var = -speed_variance_norm rho_t = float(info.get("density", 0.0)) w_brake = self.config.w_brake_base + ( self.config.w_brake_max - self.config.w_brake_base ) / (1.0 + np.exp(-self.config.k_sigmoid * (rho_t - self.config.rho_critical))) brake_decels = info.get("brake_decels", []) total_vehicles = max(int(info.get("num_vehicles", 0)), 1) brake_denominator = max(self.config.d_max - self.config.d_threshold, 1e-6) if brake_decels: sum_brake_penalty = sum( max(0.0, (float(d) - self.config.d_threshold) / brake_denominator) for d in brake_decels ) brake_penalty = sum_brake_penalty / total_vehicles else: brake_penalty = 0.0 r_brake = -brake_penalty vsl_change = np.abs( np.asarray(current_edge_speeds, dtype=float) - np.asarray(prev_edge_speeds, dtype=float) ) max_vsl_change = float(np.max(vsl_change)) if vsl_change.size > 0 else 0.0 r_penalty = -max_vsl_change / max(float(self.config.delta_vsl_max), 1e-6) info["r_flow"] = float(r_flow) info["r_var"] = float(r_var) info["r_brake"] = float(r_brake) info["r_penalty"] = float(r_penalty) reward = ( self.config.w_flow * r_flow + self.config.w_var * r_var + w_brake * r_brake + self.config.w_penalty * r_penalty ) return float(reward * 10.0)