157 lines
6.2 KiB
Python
157 lines
6.2 KiB
Python
"""Minimal reward blueprint for the current freeway VSL study."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Iterable, Tuple
|
|
|
|
|
|
@dataclass(frozen=True)
class RewardTerm:
    """One named component of the reward, with its formula and input signals.

    Instances are immutable (frozen dataclass).
    """

    name: str  # rendered as the level-3 heading of the term
    symbol: str  # rendered as inline code
    objective: str  # rendered on the "Objective" bullet
    rationale: str  # rendered on the "Rationale" bullet
    formula_tex: str  # rendered verbatim inside backticks
    required_signals: Tuple[str, ...]  # each entry is rendered as inline code

    def to_markdown(self) -> str:
        """Render the term as a level-3 Markdown section.

        Returns:
            The heading line followed by five bullet lines, joined with
            newlines; there is no trailing newline.
        """
        quoted_signals = [f"`{signal}`" for signal in self.required_signals]
        heading = f"### {self.name}"
        bullets = (
            f"- Symbol: `{self.symbol}`",
            f"- Objective: {self.objective}",
            f"- Rationale: {self.rationale}",
            f"- Formula: `{self.formula_tex}`",
            "- Required signals: " + ", ".join(quoted_signals),
        )
        return "\n".join((heading, *bullets))
|
|
|
|
|
|
@dataclass(frozen=True)
class RewardBlueprint:
    """Immutable description of a complete reward design.

    The blueprint bundles free-text context, the individual reward terms and
    the global formula, and can render itself as a Markdown document.
    """

    name: str  # rendered as the level-1 document title
    scenario_summary: str  # body of the "Scenario" section
    primary_objective: str  # body of the "Primary Objective" section
    design_principles: Tuple[str, ...]  # one bullet each under "Design Principles"
    terms: Tuple[RewardTerm, ...]  # rendered in order under "Reward Terms"
    global_formula_tex: str  # rendered verbatim inside backticks
    excluded_metrics: Tuple[str, ...] = ()  # section is omitted when empty
    implementation_notes: Tuple[str, ...] = ()  # section is omitted when empty

    def term_names(self) -> Tuple[str, ...]:
        """Return the ``name`` of every term, in declaration order."""
        return tuple(reward_term.name for reward_term in self.terms)

    def to_markdown(self) -> str:
        """Render the whole blueprint as a Markdown document.

        Returns:
            The document text, stripped of trailing whitespace and terminated
            by exactly one newline.
        """
        document = [
            f"# {self.name}",
            "",
            "## Scenario",
            self.scenario_summary,
            "",
            "## Primary Objective",
            self.primary_objective,
            "",
            "## Global Formula",
            f"`{self.global_formula_tex}`",
            "",
            "## Design Principles",
            *(f"- {principle}" for principle in self.design_principles),
            "",
            "## Reward Terms",
        ]

        # Each term section is followed by a blank separator line.
        for reward_term in self.terms:
            document += [reward_term.to_markdown(), ""]

        # Optional sections appear only when they have content.
        if self.excluded_metrics:
            document.append("## Metrics To Avoid As Primary Reward Drivers")
            document += [f"- {metric}" for metric in self.excluded_metrics]
            document.append("")

        if self.implementation_notes:
            document.append("## Implementation Notes")
            document += [f"- {note}" for note in self.implementation_notes]

        # Drop trailing blank lines, then end with a single newline.
        return "\n".join(document).rstrip() + "\n"
|
|
|
|
|
|
def build_tca_mappo_reward_blueprint() -> RewardBlueprint:
    """Build the current minimal reward blueprint for corridor VSL.

    Returns:
        A ``RewardBlueprint`` with two terms, ``efficiency`` and ``safety``,
        plus the scenario text, design principles, excluded metrics and
        implementation notes defined below.
    """
    # The per-term formulas use the same braced ``R_{\mathrm{...}}`` notation
    # as the global formula; an unbraced ``R_efficiency`` would subscript
    # only the first letter in TeX.
    efficiency_term = RewardTerm(
        name="efficiency",
        symbol="R_efficiency",
        objective="Maintain a high average running speed in the controlled corridor.",
        rationale=(
            "Instantaneous corridor outflow fluctuates strongly with simulation time and is noisy as a "
            "step reward. A normalized mean-speed term is smoother, reacts more directly to VSL control, "
            "and is easier for policy optimization to fit."
        ),
        formula_tex=(
            r"R_{\mathrm{efficiency}}(t)=\begin{cases}\mathrm{clip}(\bar{v}(t)/v_{\max},0,1),&N(t)>0\\0,&N(t)=0\end{cases}"
        ),
        required_signals=(
            "controlled-corridor mean speed",
            "controlled-corridor active vehicle count",
            "speed limit",
        ),
    )

    safety_term = RewardTerm(
        name="safety",
        symbol="R_safety",
        objective="Penalize corridor stopping with one compact safety term.",
        rationale=(
            "For this experiment, excessive stopping is already a strong proxy for unstable or "
            "unsafe traffic operation. Using only stop rate keeps the objective simpler and "
            "reduces interference from overlapping surrogate signals."
        ),
        formula_tex=r"R_{\mathrm{safety}}(t)=-\mathrm{clip}(w_s \hat{s}(t),0,1)",
        required_signals=("corridor stop rate",),
    )

    return RewardBlueprint(
        name="Minimal Efficiency-Safety Reward Blueprint For TCA-MAPPO",
        scenario_summary=(
            "The study controls a segmented freeway VSL corridor under fixed control intervals. "
            "For architecture comparison, the reward should emphasize a small number of stable, "
            "interpretable traffic goals."
        ),
        primary_objective=(
            "Improve corridor running efficiency and reduce unsafe interactions with a compact reward."
        ),
        design_principles=(
            "Prefer stable per-step traffic signals over highly time-dependent outflow spikes.",
            "Prefer direct operational efficiency metrics over too many intermediate bottleneck heuristics.",
            "Keep safety in the reward, but merge it into a single compact term.",
            "Avoid overlapping safety surrogates when one strong proxy is already available.",
            "Keep the reward simple enough that architecture comparison remains credible.",
            "Avoid auxiliary regularizers that change the objective across training stages.",
        ),
        terms=(efficiency_term, safety_term),
        global_formula_tex=(
            r"R(t)=\kappa [\lambda_{\mathrm{eff}} R_{\mathrm{efficiency}}(t)+\lambda_{\mathrm{safe}} R_{\mathrm{safety}}(t)]"
        ),
        excluded_metrics=(
            "Instantaneous outflow as the primary step reward because it is strongly modulated by simulation time.",
            "Too many correlated penalties such as shockwave, occupancy, braking, and variance all entering together.",
            "Control smoothness penalties that distort architecture comparison.",
        ),
        implementation_notes=(
            "The efficiency term is currently built from normalized controlled-corridor mean speed.",
            "The safety term is currently built only from corridor stop rate.",
            "Throughput and bottleneck occupancy can still be logged for diagnosis even if they are no longer part of the reward.",
            "Training and evaluate now share the same fixed reward structure.",
        ),
    )
|
|
|
|
|
|
def build_reward_blueprint_markdown() -> str:
    """Return the TCA-MAPPO reward blueprint rendered as Markdown text."""
    blueprint = build_tca_mappo_reward_blueprint()
    return blueprint.to_markdown()
|
|
|
|
|
|
def iter_required_signals() -> Iterable[str]:
    """Return every distinct signal name required by the blueprint's terms.

    Returns:
        A tuple of unique signal names in sorted order.
    """
    blueprint = build_tca_mappo_reward_blueprint()
    unique_signals = {
        signal
        for reward_term in blueprint.terms
        for signal in reward_term.required_signals
    }
    return tuple(sorted(unique_signals))
|
|
|
|
|
|
if __name__ == "__main__":
    # When run as a script, print the rendered blueprint to stdout.
    rendered_blueprint = build_reward_blueprint_markdown()
    print(rendered_blueprint)
|