149 lines
4.9 KiB
Python
149 lines
4.9 KiB
Python
"""Shared reward configuration and calculation for freeway VSL environments."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Mapping, Sequence
|
|
|
|
import numpy as np
|
|
|
|
|
|
# Keys under which the individual (unweighted) reward terms are stored and
# aggregated. The order here is the order used whenever the components are
# iterated (e.g. when initialising or averaging per-episode totals).
REWARD_COMPONENT_COLUMNS = ("r_flow", "r_var", "r_brake", "r_penalty")

# Display label for each reward component key.
REWARD_COMPONENT_LABELS = {
    "r_flow": "R_flow",
    "r_var": "R_var",
    "r_brake": "R_brake",
    "r_penalty": "R_penalty",
}
|
|
|
|
|
|
def init_reward_component_totals() -> Dict[str, float]:
    """Return a fresh accumulator with every reward component set to 0.0.

    A new dict is created on each call, keyed by the entries of
    ``REWARD_COMPONENT_COLUMNS`` in their declared order.
    """
    return dict.fromkeys(REWARD_COMPONENT_COLUMNS, 0.0)
|
|
|
|
|
|
def average_reward_components(totals: Mapping[str, float], steps: int) -> Dict[str, float]:
    """Convert accumulated reward components into per-step averages.

    Args:
        totals: Accumulated value per component. Components missing from the
            mapping are treated as 0.0; keys that are not listed in
            ``REWARD_COMPONENT_COLUMNS`` are ignored.
        steps: Number of steps the totals were accumulated over. Values
            below 1 are clamped to 1 so the division is always defined.

    Returns:
        A new dict with one float per entry of ``REWARD_COMPONENT_COLUMNS``.
    """
    step_count = max(int(steps), 1)
    averages: Dict[str, float] = {}
    for name in REWARD_COMPONENT_COLUMNS:
        averages[name] = float(totals.get(name, 0.0)) / step_count
    return averages
|
|
|
|
|
|
@dataclass(frozen=True)
class RewardConfig:
    """Immutable weights and normalisation constants for the VSL reward.

    The field defaults declared here are the single source of truth:
    ``from_dict`` falls back to them for every key that is absent from (or
    explicitly ``None`` in) the raw configuration.
    """

    # Weight of the flow term (throughput / C_max).
    w_flow: float = 0.4
    # Weight of the speed-variance term (negated normalised variance).
    w_var: float = 0.3
    # Lower and upper asymptote of the density-dependent hard-braking weight.
    w_brake_base: float = 0.1
    w_brake_max: float = 0.3
    # Weight of the VSL-change (smoothness) penalty term.
    w_penalty: float = 0.2
    # Density at which the braking weight sits halfway between base and max.
    rho_critical: float = 44.75
    # Steepness of the logistic transition of the braking weight.
    k_sigmoid: float = 0.2
    # Decelerations at or below d_threshold are not penalised; a deceleration
    # equal to d_max contributes a per-event penalty of exactly 1.0.
    d_threshold: float = 5.0
    d_max: float = 20.0
    # Throughput value used to normalise the flow term.
    C_max: float = 4924.0
    # NOTE(review): the next four fields are not read by the reward
    # calculation visible in this file; presumably they are consumed by the
    # environment -- confirm against callers.
    v_limit: float = 30.56
    # Normaliser for the largest per-edge speed-limit change between steps.
    delta_vsl_max: float = 16.67
    ttc_threshold_s: float = 2.3
    bottleneck_window_size: int = 3
    leader_gap_threshold_m: float = 100.0

    @classmethod
    def from_dict(
        cls,
        raw_cfg: Mapping[str, object],
        *,
        speed_actions_ms: Sequence[float],
    ) -> "RewardConfig":
        """Build a config from a raw mapping, filling gaps with the defaults.

        Args:
            raw_cfg: Mapping of field name to raw value. Missing keys and
                keys whose value is ``None`` fall back to the dataclass
                default. Present values are coerced with ``float`` (``int``
                for ``bottleneck_window_size``).
            speed_actions_ms: Available speed actions. When non-empty, the
                fallback for ``delta_vsl_max`` is ``max - min`` of these
                values; when empty or ``None`` the fallback is ``60 / 3.6``.

        Returns:
            A new ``RewardConfig``. ``bottleneck_window_size`` is clamped to
            be at least 1.

        Raises:
            TypeError, ValueError: If a supplied value cannot be converted
                to the numeric type of its field.
        """

        def lookup(key: str, fallback: object = None) -> object:
            # Treat an explicit None (e.g. an empty YAML entry) like a
            # missing key. The dataclass default is available as a class
            # attribute, so defaults are declared in exactly one place.
            value = raw_cfg.get(key)
            if value is not None:
                return value
            return getattr(cls, key) if fallback is None else fallback

        # len() rather than truthiness so NumPy arrays are accepted.
        if speed_actions_ms is not None and len(speed_actions_ms) > 0:
            default_delta_vsl_max = float(np.max(speed_actions_ms) - np.min(speed_actions_ms))
        else:
            default_delta_vsl_max = 60.0 / 3.6

        float_fields = (
            "w_flow",
            "w_var",
            "w_brake_base",
            "w_brake_max",
            "w_penalty",
            "rho_critical",
            "k_sigmoid",
            "d_threshold",
            "d_max",
            "C_max",
            "v_limit",
            "ttc_threshold_s",
            "leader_gap_threshold_m",
        )
        kwargs = {name: float(lookup(name)) for name in float_fields}
        kwargs["delta_vsl_max"] = float(lookup("delta_vsl_max", default_delta_vsl_max))
        kwargs["bottleneck_window_size"] = max(1, int(lookup("bottleneck_window_size")))
        return cls(**kwargs)
|
|
|
|
|
|
class RewardCalculator:
    """Four-term April reward: flow, speed variance, hard braking, and VSL smoothness.

    The scalar reward is a weighted sum of four components, scaled by 10:

    * ``r_flow``    -- throughput divided by ``C_max``;
    * ``r_var``     -- negated normalised speed variance;
    * ``r_brake``   -- negated hard-braking penalty averaged over vehicles,
      weighted by a logistic function of density;
    * ``r_penalty`` -- negated largest per-edge speed change divided by
      ``delta_vsl_max``.
    """

    def __init__(
        self,
        *,
        config: RewardConfig,
        controlled_edge_start_index: int,
        evaluation_mode: bool = False,
    ):
        """Store the configuration and environment flags.

        Args:
            config: Object exposing the reward weights and constants as
                attributes.
            controlled_edge_start_index: Stored as an int; not read by
                ``calculate``.
            evaluation_mode: Stored as a bool; not read by ``calculate``.
        """
        self.config = config
        self.controlled_edge_start_index = int(controlled_edge_start_index)
        self.evaluation_mode = bool(evaluation_mode)

    @staticmethod
    def _as_decel_list(raw_decels) -> list:
        """Normalise the ``brake_decels`` entry to a plain list.

        NumPy arrays are flattened explicitly because their truth value is
        ambiguous for more than one element; ``None`` and other falsy values
        mean "no braking events".
        """
        if raw_decels is None:
            return []
        if isinstance(raw_decels, np.ndarray):
            return raw_decels.ravel().tolist()
        if not raw_decels:
            return []
        return list(raw_decels)

    def calculate(
        self,
        *,
        info: Dict,
        current_edge_speeds: np.ndarray,
        prev_edge_speeds: np.ndarray,
        episode_index: int,
    ) -> float:
        """Compute the step reward and record its components in ``info``.

        Args:
            info: Step statistics. Reads ``throughput``,
                ``speed_variance_norm``, ``density``, ``brake_decels`` and
                ``num_vehicles`` (each optional, defaulting to zero/empty).
                Mutated in place: ``r_flow``, ``r_var``, ``r_brake`` and
                ``r_penalty`` are written as unweighted floats.
            current_edge_speeds: Speed values for the current step.
            prev_edge_speeds: Speed values for the previous step; must be
                broadcast-compatible with ``current_edge_speeds``.
            episode_index: Accepted for interface compatibility; unused.

        Returns:
            The weighted sum of the four components multiplied by 10.
        """
        _ = episode_index
        cfg = self.config

        # Flow: throughput normalised by capacity (guarded against C_max <= 0).
        q_t = float(info.get("throughput", 0.0))
        r_flow = q_t / max(float(cfg.C_max), 1e-6)

        # Variance: the caller supplies an already-normalised value.
        r_var = -float(info.get("speed_variance_norm", 0.0))

        # Braking weight rises from w_brake_base towards w_brake_max along a
        # logistic curve centred on rho_critical.
        rho_t = float(info.get("density", 0.0))
        w_brake = cfg.w_brake_base + (cfg.w_brake_max - cfg.w_brake_base) / (
            1.0 + np.exp(-cfg.k_sigmoid * (rho_t - cfg.rho_critical))
        )

        # Braking penalty: linear excess over d_threshold, scaled so that a
        # deceleration of d_max counts as 1.0, averaged over all vehicles.
        decels = self._as_decel_list(info.get("brake_decels"))
        total_vehicles = max(int(info.get("num_vehicles", 0)), 1)
        brake_denominator = max(cfg.d_max - cfg.d_threshold, 1e-6)
        if decels:
            sum_brake_penalty = sum(
                max(0.0, (float(d) - cfg.d_threshold) / brake_denominator)
                for d in decels
            )
            brake_penalty = sum_brake_penalty / total_vehicles
        else:
            brake_penalty = 0.0
        r_brake = -brake_penalty

        # Smoothness: largest absolute speed change on any edge, normalised.
        vsl_change = np.abs(
            np.asarray(current_edge_speeds, dtype=float)
            - np.asarray(prev_edge_speeds, dtype=float)
        )
        max_vsl_change = float(np.max(vsl_change)) if vsl_change.size > 0 else 0.0
        r_penalty = -max_vsl_change / max(float(cfg.delta_vsl_max), 1e-6)

        # Expose the unweighted components to the caller for logging.
        info["r_flow"] = float(r_flow)
        info["r_var"] = float(r_var)
        info["r_brake"] = float(r_brake)
        info["r_penalty"] = float(r_penalty)

        reward = (
            cfg.w_flow * r_flow
            + cfg.w_var * r_var
            + w_brake * r_brake
            + cfg.w_penalty * r_penalty
        )
        return float(reward * 10.0)
|