# Source path: ctm-dqn/envs/reward_design_blueprint.py
#
# Repository listing metadata: 177 lines, 6.9 KiB, Python

"""Minimal reward blueprint for the current freeway VSL study."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable, Tuple
@dataclass(frozen=True)
class RewardTerm:
    """Immutable description of a single component of the composite reward.

    All fields are descriptive text; ``required_signals`` names the inputs
    that the term's formula depends on.
    """

    name: str
    symbol: str
    objective: str
    rationale: str
    formula_tex: str
    required_signals: Tuple[str, ...]

    def to_markdown(self) -> str:
        """Render this term as a level-3 Markdown heading plus a bullet list.

        Returns:
            The rendered section, with lines joined by ``"\\n"`` and no
            trailing newline.
        """
        # Each signal is wrapped in backticks so it renders as inline code.
        quoted_signals = ", ".join(
            f"`{signal}`" for signal in self.required_signals
        )
        heading = f"### {self.name}"
        bullets = (
            f"- Symbol: `{self.symbol}`",
            f"- Objective: {self.objective}",
            f"- Rationale: {self.rationale}",
            f"- Formula: `{self.formula_tex}`",
            "- Required signals: " + quoted_signals,
        )
        return "\n".join((heading, *bullets))
@dataclass(frozen=True)
class RewardBlueprint:
    """Immutable, documentation-oriented description of a complete reward design.

    Bundles the scenario text, the design principles, the individual reward
    terms and the combined formula, and can render all of it as Markdown.
    """

    name: str
    scenario_summary: str
    primary_objective: str
    design_principles: Tuple[str, ...]
    terms: Tuple[RewardTerm, ...]
    global_formula_tex: str
    excluded_metrics: Tuple[str, ...] = ()
    implementation_notes: Tuple[str, ...] = ()

    def term_names(self) -> Tuple[str, ...]:
        """Return the ``name`` of every term, in declaration order."""
        return tuple([entry.name for entry in self.terms])

    def to_markdown(self) -> str:
        """Render the whole blueprint as a Markdown document.

        The two optional sections (excluded metrics, implementation notes)
        are emitted only when their tuples are non-empty.

        Returns:
            The document text, ending with exactly one newline.
        """

        def bulleted(items):
            # One Markdown bullet per entry.
            return [f"- {entry}" for entry in items]

        doc = [
            f"# {self.name}",
            "",
            "## Scenario",
            self.scenario_summary,
            "",
            "## Primary Objective",
            self.primary_objective,
            "",
            "## Global Formula",
            f"`{self.global_formula_tex}`",
            "",
            "## Design Principles",
            *bulleted(self.design_principles),
            "",
            "## Reward Terms",
        ]

        # Every term section is followed by a blank separator line.
        for entry in self.terms:
            doc += [entry.to_markdown(), ""]

        if self.excluded_metrics:
            doc += [
                "## Metrics To Avoid As Primary Reward Drivers",
                *bulleted(self.excluded_metrics),
                "",
            ]

        if self.implementation_notes:
            doc += [
                "## Implementation Notes",
                *bulleted(self.implementation_notes),
            ]

        # Drop any trailing blank separators, then terminate with one newline.
        body = "\n".join(doc).rstrip()
        return body + "\n"
def build_tac_mappo_reward_blueprint() -> RewardBlueprint:
    """Assemble the four-term reward blueprint for corridor VSL control.

    Returns:
        A ``RewardBlueprint`` whose terms are, in order: ``flow``,
        ``variance``, ``brake`` and ``penalty``.
    """
    # Positive term: normalized throughput.
    flow_term = RewardTerm(
        name="flow",
        symbol="R_flow",
        objective="Normalize throughput into a bounded positive reward term.",
        rationale=(
            "Throughput should contribute directly but remain scale-bounded so that different flow levels "
            "do not dominate the total reward by themselves."
        ),
        formula_tex=r"R_{\mathrm{flow}}(t)=q_t/C_{\max}",
        required_signals=("throughput", "reference capacity"),
    )

    # Negative term: spatial speed dispersion.
    variance_term = RewardTerm(
        name="variance",
        symbol="R_var",
        objective="Penalize spatial speed dispersion along the controlled corridor.",
        rationale=(
            "Large speed dispersion indicates unstable local traffic states. A negative variance term "
            "rewards smoother corridor-wide speed profiles."
        ),
        formula_tex=r"R_{\mathrm{var}}(t)=-\sigma_v^2(t)/v_{\max}^2",
        required_signals=("relative speed samples", "speed limit"),
    )

    # Negative term: hard braking; the formula is split over two raw literals.
    brake_formula = (
        r"R_{\mathrm{brake}}(t)=-\frac{1}{N(t)}\sum_i"
        r"\max\!\left(0,\frac{d_i-d_{\mathrm{th}}}{d_{\max}-d_{\mathrm{th}}}\right)"
    )
    brake_term = RewardTerm(
        name="brake",
        symbol="R_brake",
        objective="Penalize hard braking events with a density-adaptive weight.",
        rationale=(
            "Hard braking is a stronger safety signal than coarse stop counting. The density-adaptive weight "
            "keeps the safety term more influential when the corridor is crowded."
        ),
        formula_tex=brake_formula,
        required_signals=("vehicle acceleration", "hard-brake threshold", "vehicle count", "density"),
    )

    # Negative term: abrupt changes of the speed limit between control steps.
    penalty_term = RewardTerm(
        name="penalty",
        symbol="R_penalty",
        objective="Penalize abrupt VSL changes between consecutive control steps.",
        rationale=(
            "A separate smoothness penalty discourages aggressive limit jumps and helps stabilize policy "
            "updates without mixing it into the traffic-state terms."
        ),
        formula_tex=r"R_{\mathrm{penalty}}(t)=-\max_i |\Delta v_i(t)| / \Delta v_{\max}",
        required_signals=("current VSL", "previous VSL", "action limits"),
    )

    scenario = (
        "The study controls a segmented freeway VSL corridor under fixed control intervals. "
        "The reward should stay simple, bounded, and decomposable so that each term has a clear "
        "traffic interpretation."
    )
    objective = (
        "Improve corridor throughput and smoothness while suppressing hard braking and abrupt control changes."
    )
    principles = (
        "Keep the number of reward terms small enough to remain interpretable.",
        "Use throughput as the only positive term.",
        "Use separate negative terms for speed dispersion, hard braking, and control smoothness.",
        "Let the braking weight vary with density instead of keeping it fixed.",
        "Avoid cross-term utility transformations that blur the meaning of each signal.",
    )
    # Weighted sum of the four terms; only the brake weight depends on density.
    combined_formula = (
        r"R(t)=w_{\mathrm{flow}}R_{\mathrm{flow}}(t)+w_{\mathrm{var}}R_{\mathrm{var}}(t)"
        r"+w_{\mathrm{brake}}(\rho_t)R_{\mathrm{brake}}(t)+w_{\mathrm{penalty}}R_{\mathrm{penalty}}(t)"
    )
    avoided = (
        "Full nonlinear utility nesting that hides the contribution of each traffic signal.",
        "Too many correlated penalties such as TTC, stop count, shockwave, and occupancy all at once.",
        "Instantaneous outflow spikes as the only efficiency signal.",
    )
    notes = (
        "The runtime code multiplies the weighted sum by 10 to keep the reward scale consistent.",
        "The brake weight follows a logistic density gate.",
        "Throughput, variance, braking, and penalty are all logged separately for analysis.",
    )

    return RewardBlueprint(
        name="Four-Term Reward Blueprint For TAC-MAPPO",
        scenario_summary=scenario,
        primary_objective=objective,
        design_principles=principles,
        terms=(flow_term, variance_term, brake_term, penalty_term),
        global_formula_tex=combined_formula,
        excluded_metrics=avoided,
        implementation_notes=notes,
    )
def build_reward_blueprint_markdown() -> str:
    """Build the TAC-MAPPO reward blueprint and return its Markdown rendering."""
    blueprint = build_tac_mappo_reward_blueprint()
    return blueprint.to_markdown()
def iter_required_signals() -> Iterable[str]:
    """Collect the distinct signals required by all terms of the blueprint.

    Returns:
        A tuple of the unique signal names in sorted order.
    """
    blueprint = build_tac_mappo_reward_blueprint()
    # A set comprehension removes signals shared by several terms.
    unique_signals = {
        signal
        for term in blueprint.terms
        for signal in term.required_signals
    }
    ordered = sorted(unique_signals)
    return tuple(ordered)
if __name__ == "__main__":
    # Script entry point: print the rendered blueprint to standard output.
    rendered_blueprint = build_reward_blueprint_markdown()
    print(rendered_blueprint)