# Listing metadata: Python source, 177 lines, 6.9 KiB.
"""Minimal reward blueprint for the current freeway VSL study."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Iterable, Tuple
|
|
|
|
|
|
@dataclass(frozen=True)
class RewardTerm:
    """Immutable description of one reward component and how to render it."""

    # Rendered as the `###` heading of the term's Markdown section.
    name: str
    # Rendered inside backticks on the "Symbol" bullet.
    symbol: str
    # Free-text bullets describing what the term does and why it exists.
    objective: str
    rationale: str
    # Formula text; rendered verbatim inside backticks on the "Formula" bullet.
    formula_tex: str
    # Signal names; each one is rendered inside backticks, comma-separated.
    required_signals: Tuple[str, ...]

    def to_markdown(self) -> str:
        """Render the term as a Markdown heading followed by five bullets.

        Returns:
            The section text joined with newlines, without a trailing newline.
        """
        quoted_signals = ", ".join(f"`{signal}`" for signal in self.required_signals)
        rows = (
            f"### {self.name}",
            f"- Symbol: `{self.symbol}`",
            f"- Objective: {self.objective}",
            f"- Rationale: {self.rationale}",
            f"- Formula: `{self.formula_tex}`",
            "- Required signals: " + quoted_signals,
        )
        return "\n".join(rows)
|
|
|
|
|
|
@dataclass(frozen=True)
class RewardBlueprint:
    """Immutable bundle of reward terms plus the text needed to document them."""

    # Rendered as the top-level `#` heading.
    name: str
    # Body text of the "Scenario" section.
    scenario_summary: str
    # Body text of the "Primary Objective" section.
    primary_objective: str
    # Rendered as a bullet list under "Design Principles".
    design_principles: Tuple[str, ...]
    # Each term contributes its own `to_markdown()` output under "Reward Terms".
    terms: Tuple[RewardTerm, ...]
    # Rendered inside backticks under "Global Formula".
    global_formula_tex: str
    # Optional bullet lists; their sections are omitted entirely when empty.
    excluded_metrics: Tuple[str, ...] = ()
    implementation_notes: Tuple[str, ...] = ()

    def term_names(self) -> Tuple[str, ...]:
        """Return the `name` of every term, in declaration order."""
        return tuple(entry.name for entry in self.terms)

    def to_markdown(self) -> str:
        """Render the whole blueprint as a Markdown document.

        Returns:
            The document text with trailing whitespace stripped and exactly
            one newline appended.
        """
        out = [f"# {self.name}", ""]

        # Fixed single-body sections, each followed by a blank separator line.
        for heading, body in (
            ("## Scenario", self.scenario_summary),
            ("## Primary Objective", self.primary_objective),
            ("## Global Formula", f"`{self.global_formula_tex}`"),
        ):
            out += [heading, body, ""]

        out.append("## Design Principles")
        out += [f"- {principle}" for principle in self.design_principles]
        out += ["", "## Reward Terms"]
        for entry in self.terms:
            out += [entry.to_markdown(), ""]

        if self.excluded_metrics:
            out.append("## Metrics To Avoid As Primary Reward Drivers")
            out += [f"- {metric}" for metric in self.excluded_metrics]
            out.append("")

        if self.implementation_notes:
            out.append("## Implementation Notes")
            out += [f"- {note}" for note in self.implementation_notes]

        # Whichever section came last may have left a trailing blank line.
        return "\n".join(out).rstrip() + "\n"
|
|
|
|
|
|
def build_tac_mappo_reward_blueprint() -> RewardBlueprint:
    """Assemble the four-term reward blueprint for corridor VSL.

    The terms are, in order: flow, variance, brake, penalty.

    Returns:
        A freshly constructed `RewardBlueprint`; nothing is cached between calls.
    """
    flow_term = RewardTerm(
        name="flow",
        symbol="R_flow",
        objective="Normalize throughput into a bounded positive reward term.",
        rationale=(
            "Throughput should contribute directly but remain scale-bounded so that different flow levels "
            "do not dominate the total reward by themselves."
        ),
        formula_tex=r"R_{\mathrm{flow}}(t)=q_t/C_{\max}",
        required_signals=("throughput", "reference capacity"),
    )

    variance_term = RewardTerm(
        name="variance",
        symbol="R_var",
        objective="Penalize spatial speed dispersion along the controlled corridor.",
        rationale=(
            "Large speed dispersion indicates unstable local traffic states. A negative variance term "
            "rewards smoother corridor-wide speed profiles."
        ),
        formula_tex=r"R_{\mathrm{var}}(t)=-\sigma_v^2(t)/v_{\max}^2",
        required_signals=("relative speed samples", "speed limit"),
    )

    brake_term = RewardTerm(
        name="brake",
        symbol="R_brake",
        objective="Penalize hard braking events with a density-adaptive weight.",
        rationale=(
            "Hard braking is a stronger safety signal than coarse stop counting. The density-adaptive weight "
            "keeps the safety term more influential when the corridor is crowded."
        ),
        # Two adjacent raw literals: the sum prefix and the clipped, normalized summand.
        formula_tex=(
            r"R_{\mathrm{brake}}(t)=-\frac{1}{N(t)}\sum_i"
            r"\max\!\left(0,\frac{d_i-d_{\mathrm{th}}}{d_{\max}-d_{\mathrm{th}}}\right)"
        ),
        required_signals=("vehicle acceleration", "hard-brake threshold", "vehicle count", "density"),
    )

    penalty_term = RewardTerm(
        name="penalty",
        symbol="R_penalty",
        objective="Penalize abrupt VSL changes between consecutive control steps.",
        rationale=(
            "A separate smoothness penalty discourages aggressive limit jumps and helps stabilize policy "
            "updates without mixing it into the traffic-state terms."
        ),
        formula_tex=r"R_{\mathrm{penalty}}(t)=-\max_i |\Delta v_i(t)| / \Delta v_{\max}",
        required_signals=("current VSL", "previous VSL", "action limits"),
    )

    scenario = (
        "The study controls a segmented freeway VSL corridor under fixed control intervals. "
        "The reward should stay simple, bounded, and decomposable so that each term has a clear "
        "traffic interpretation."
    )
    objective = (
        "Improve corridor throughput and smoothness while suppressing hard braking and abrupt control changes."
    )
    principles = (
        "Keep the number of reward terms small enough to remain interpretable.",
        "Use throughput as the only positive term.",
        "Use separate negative terms for speed dispersion, hard braking, and control smoothness.",
        "Let the braking weight vary with density instead of keeping it fixed.",
        "Avoid cross-term utility transformations that blur the meaning of each signal.",
    )
    # Weighted sum of the four terms; only the brake weight is written as a function of rho_t.
    weighted_sum_tex = (
        r"R(t)=w_{\mathrm{flow}}R_{\mathrm{flow}}(t)+w_{\mathrm{var}}R_{\mathrm{var}}(t)"
        r"+w_{\mathrm{brake}}(\rho_t)R_{\mathrm{brake}}(t)+w_{\mathrm{penalty}}R_{\mathrm{penalty}}(t)"
    )
    avoided = (
        "Full nonlinear utility nesting that hides the contribution of each traffic signal.",
        "Too many correlated penalties such as TTC, stop count, shockwave, and occupancy all at once.",
        "Instantaneous outflow spikes as the only efficiency signal.",
    )
    notes = (
        "The runtime code multiplies the weighted sum by 10 to keep the reward scale consistent.",
        "The brake weight follows a logistic density gate.",
        "Throughput, variance, braking, and penalty are all logged separately for analysis.",
    )

    return RewardBlueprint(
        name="Four-Term Reward Blueprint For TAC-MAPPO",
        scenario_summary=scenario,
        primary_objective=objective,
        design_principles=principles,
        terms=(flow_term, variance_term, brake_term, penalty_term),
        global_formula_tex=weighted_sum_tex,
        excluded_metrics=avoided,
        implementation_notes=notes,
    )
|
|
|
|
|
|
def build_reward_blueprint_markdown() -> str:
    """Build the TAC-MAPPO reward blueprint and return its Markdown rendering."""
    blueprint = build_tac_mappo_reward_blueprint()
    return blueprint.to_markdown()
|
|
|
|
|
|
def iter_required_signals() -> Iterable[str]:
    """Return the distinct signal names required across all blueprint terms.

    Returns:
        A tuple of unique signal names in sorted order.
    """
    blueprint = build_tac_mappo_reward_blueprint()
    # A set comprehension drops signals that more than one term requires.
    distinct = {signal for entry in blueprint.terms for signal in entry.required_signals}
    return tuple(sorted(distinct))
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: write the rendered blueprint to stdout.
    rendered_blueprint = build_reward_blueprint_markdown()
    print(rendered_blueprint)
|