ctm-dqn/envs/reward_system.py

107 lines
3.2 KiB
Python

"""Shared reward configuration and calculation for freeway VSL environments."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Mapping, Sequence
import numpy as np
# Keys of the individual reward components. The helpers in this module use
# this tuple to initialise and average per-component totals, and the reward
# calculator writes values under the same keys into the step ``info`` dict.
REWARD_COMPONENT_COLUMNS = (
    "r_throughput",
    "r_safety",
)
# Maps each component key to a label string (presumably used for
# display/reporting elsewhere -- confirm against callers).
REWARD_COMPONENT_LABELS = {
    "r_throughput": "R_throughput",
    "r_safety": "R_safety",
}
def clip01(value: float) -> float:
    """Return ``value`` limited to the closed interval [0, 1] as a Python float."""
    lower_bound, upper_bound = 0.0, 1.0
    bounded = np.clip(value, lower_bound, upper_bound)
    return float(bounded)
def init_reward_component_totals() -> Dict[str, float]:
    """Return a fresh dict with one ``0.0`` entry per reward component key."""
    # 0.0 is immutable, so sharing it across keys via fromkeys is safe.
    return dict.fromkeys(REWARD_COMPONENT_COLUMNS, 0.0)
def average_reward_components(totals: Mapping[str, float], steps: int) -> Dict[str, float]:
    """Divide each accumulated reward component by the number of steps.

    Args:
        totals: Accumulated value per component key; keys that are absent
            are treated as ``0.0``.
        steps: Number of steps the totals were accumulated over. Values
            below 1 are replaced by 1 so the division is always defined.

    Returns:
        A new dict holding the per-step average for every key in
        ``REWARD_COMPONENT_COLUMNS``.
    """
    divisor = int(steps)
    if divisor < 1:
        # Guard against division by zero (or a negative step count).
        divisor = 1

    averaged: Dict[str, float] = {}
    for name in REWARD_COMPONENT_COLUMNS:
        averaged[name] = float(totals.get(name, 0.0)) / divisor
    return averaged
@dataclass(frozen=True)
class RewardConfig:
    """Immutable bundle of reward parameters."""

    # Multiplier applied to the final weighted reward.
    reward_scale: float = 10.0
    # Weight of the throughput component in the weighted sum.
    throughput_weight: float = 0.75
    # Weight of the safety component in the weighted sum.
    safety_weight: float = 0.20
    # Reference value the observed throughput is divided by (the name
    # suggests vehicles per hour -- confirm against the environment).
    throughput_ref_vehph: float = 3908.1
    # Factor applied to the stop rate before it is clipped into a penalty.
    safety_stop_weight: float = 1.0
    # The three fields below are not read by the code visible in this module.
    bottleneck_window_size: int = 3
    v_limit: float = 33.33
    leader_gap_threshold_m: float = 100.0

    @classmethod
    def from_dict(
        cls,
        raw_cfg: Mapping[str, object],
        *,
        speed_actions_ms: Sequence[float],
    ) -> "RewardConfig":
        """Build a config from a raw mapping, falling back to defaults.

        Args:
            raw_cfg: Mapping of option name to raw value. Missing keys use
                the fallback listed below; present values are converted
                with ``float`` (or ``int`` for the window size).
            speed_actions_ms: Accepted as part of the signature but not used.

        Returns:
            A new ``RewardConfig``. ``bottleneck_window_size`` is forced to
            be at least 1.
        """
        del speed_actions_ms  # intentionally unused

        def at_least_one(raw_value: object) -> int:
            return max(1, int(raw_value))

        # (option name, fallback when missing, converter), in field order.
        option_table = (
            ("reward_scale", 10.0, float),
            ("throughput_weight", 0.75, float),
            ("safety_weight", 0.20, float),
            ("throughput_ref_vehph", 3908.1, float),
            ("safety_stop_weight", 1.0, float),
            ("bottleneck_window_size", 3, at_least_one),
            ("v_limit", 33.33, float),
            ("leader_gap_threshold_m", 100.0, float),
        )
        resolved = {
            option: convert(raw_cfg.get(option, fallback))
            for option, fallback, convert in option_table
        }
        return cls(**resolved)
class RewardCalculator:
    """Computes a two-term (throughput and safety) reward for freeway VSL control."""

    def __init__(
        self,
        *,
        config: RewardConfig,
        controlled_edge_start_index: int,
        evaluation_mode: bool = False,
    ):
        """Store the reward configuration and bookkeeping flags.

        Args:
            config: Reward parameters used by ``calculate``.
            controlled_edge_start_index: Stored as an ``int``; not read by
                ``calculate``.
            evaluation_mode: Stored as a ``bool``; not read by ``calculate``.
        """
        self.config = config
        self.controlled_edge_start_index = int(controlled_edge_start_index)
        self.evaluation_mode = bool(evaluation_mode)

    def calculate(
        self,
        *,
        info: Dict,
        current_edge_speeds: np.ndarray,
        prev_edge_speeds: np.ndarray,
        episode_index: int,
    ) -> float:
        """Return the scaled reward for one step and record its components.

        Args:
            info: Step info dict. ``"throughput"`` and ``"stop_rate"`` are
                read from it (each defaulting to ``0.0``); the keys
                ``"r_throughput"``, ``"r_safety"`` and
                ``"safety_penalty_norm"`` are written back into it.
            current_edge_speeds: Accepted but unused.
            prev_edge_speeds: Accepted but unused.
            episode_index: Accepted but unused.

        Returns:
            ``(throughput_weight * r_throughput + safety_weight * r_safety)``
            multiplied by ``reward_scale``, as a Python float.
        """
        del current_edge_speeds, prev_edge_speeds, episode_index  # unused
        cfg = self.config

        # Throughput term: observed value relative to the reference, in [0, 1].
        observed_flow = float(info.get("throughput", 0.0))
        flow_reference = max(cfg.throughput_ref_vehph, 1e-6)  # avoid dividing by ~0
        throughput_term = clip01(observed_flow / flow_reference)

        # Safety term: weighted stop rate clipped to [0, 1], applied as a penalty.
        stop_fraction = clip01(float(info.get("stop_rate", 0.0)))
        penalty = clip01(cfg.safety_stop_weight * stop_fraction)
        safety_term = -penalty

        # Expose the individual components to the caller through ``info``.
        for key, component in (
            ("r_throughput", throughput_term),
            ("r_safety", safety_term),
            ("safety_penalty_norm", penalty),
        ):
            info[key] = float(component)

        weighted = cfg.throughput_weight * throughput_term + cfg.safety_weight * safety_term
        return float(weighted * cfg.reward_scale)