From 7becebfe6b389dc1ecbf0bbb1dbcc369d851116d Mon Sep 17 00:00:00 2001 From: Maple-YZ Date: Sat, 25 Apr 2026 08:23:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E5=9F=BA=E4=BA=8E=E6=A8=A1?= =?UTF-8?q?=E6=8B=9F=E8=BF=90=E8=A1=8C=E7=9A=84=E5=A5=96=E5=8A=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config_sumo_vsl.yaml | 14 ++- envs/edge_vsl_env.py | 49 +++++++- envs/reward_design_blueprint.py | 168 ------------------------- envs/reward_system.py | 160 ++++++++++++++++-------- run_all_training.py | 4 +- scripts/plot_live_training.py | 4 +- training/registry.py | 108 +++++++++++------ training/train_appo.py | 56 ++++----- training/train_dcmappo.py | 8 +- training/train_ddpg.py | 35 +++--- training/train_gpro.py | 8 +- training/train_mappo.py | 8 +- training/train_no_control.py | 209 ++++++++++++++++++++++++++++++++ training/train_ppo.py | 63 +++++----- training/train_sac.py | 8 +- training/train_tcamappo.py | 8 +- training/train_td3.py | 35 +++--- training/train_value_based.py | 8 +- utils/reward_baseline.py | 138 +++++++++++++++++++++ 19 files changed, 717 insertions(+), 374 deletions(-) delete mode 100644 envs/reward_design_blueprint.py create mode 100644 training/train_no_control.py create mode 100644 utils/reward_baseline.py diff --git a/config_sumo_vsl.yaml b/config_sumo_vsl.yaml index 4aa252c..d7f9c45 100644 --- a/config_sumo_vsl.yaml +++ b/config_sumo_vsl.yaml @@ -63,15 +63,17 @@ environment: free_flow_speed: 30.56 reward: - efficiency_alpha: 2.1159 - safety_beta: 7.8553 - efficiency_exponent: 0.50 - safety_exponent: 0.50 + mode: "paired_no_control" + baseline_dir: "" + baseline_key: "step" + baseline_wait_timeout_s: 3600.0 + baseline_poll_interval_s: 1.0 + travel_time_weight: 0.5 + ttc_weight: 0.5 + travel_time_min_denominator_s: 60.0 ttc_threshold_s: 2.3 bottleneck_window_size: 3 - - v_limit: 30.56 leader_gap_threshold_m: 100.0 training: diff --git a/envs/edge_vsl_env.py b/envs/edge_vsl_env.py index 3ce444d..e592040 100644 --- a/envs/edge_vsl_env.py +++ b/envs/edge_vsl_env.py @@ -21,6 +21,7 @@ import traci.constants as tc from envs.network_parser import SUMONetworkParser from envs.reward_system import RewardCalculator, RewardConfig from utils.experiment_corridor import prepare_experiment_corridor_assets +from utils.reward_baseline import resolve_baseline_dir, wait_for_episode_baseline class SUMOEdgeVSLEnvironment: @@ -41,6 +42,7 @@ class SUMOEdgeVSLEnvironment: runtime_cfg = config.get("runtime", {}) self.runtime_output_dir = runtime_cfg.get("output_dir") self.runtime_metrics_subdir = runtime_cfg.get("metrics_subdir", "sumo_metrics") + self.run_timestamp = runtime_cfg.get("run_timestamp") self.evaluation_mode = bool(runtime_cfg.get("evaluation_mode", False)) self.runtime_detector_add_file: Optional[str] = None self.runtime_enex_add_file: Optional[str] = None @@ -163,6 +165,8 @@ class SUMOEdgeVSLEnvironment: controlled_edge_start_index=self.controlled_edge_start_index, evaluation_mode=self.evaluation_mode, ) + self.reward_baseline_dir = resolve_baseline_dir(config, self.run_timestamp) + self.reward_calculator.set_step_baseline({}) self.action_dims = [self.num_speed_actions] * self.num_controlled_edges self.features_per_edge = 3 @@ -182,6 +186,8 @@ class SUMOEdgeVSLEnvironment: self._interval_mainline_travel_times: List[float] = [] self._episode_rng = np.random.default_rng() self._incident_state: Dict[str, object] = {} + self._reward_baseline_episode = 0 + self._reward_baseline_loaded_step = 0 print("SUMO Edge VSL Environment initialized") print(f" Control segments: {self.num_edges}") @@ -498,6 +504,38 @@ class SUMOEdgeVSLEnvironment: ) info["incident_lane_index"] = int(incident.get("lane_index", -1)) + def _load_episode_reward_baseline(self, episode: int) -> Dict[int, Dict[str, float]]: + mode = str(self.reward_config.mode).lower() + if mode in {"paired_no_control", "episode_baseline"}: + return wait_for_episode_baseline( + baseline_dir=self.reward_baseline_dir, + episode=episode, + min_step=self._reward_baseline_loaded_step + 1, + timeout_s=self.reward_config.baseline_wait_timeout_s, + poll_interval_s=self.reward_config.baseline_poll_interval_s, + ) + return {} + + def _sync_episode_reward_baseline(self, min_step: int) -> None: + mode = str(self.reward_config.mode).lower() + if mode not in {"paired_no_control", "episode_baseline"}: + return + if self._reward_baseline_episode != self._episode_count: + self._reward_baseline_episode = self._episode_count + self._reward_baseline_loaded_step = 0 + self.reward_calculator.set_step_baseline({}) + if self._reward_baseline_loaded_step >= min_step: + return + baseline = wait_for_episode_baseline( + baseline_dir=self.reward_baseline_dir, + episode=self._episode_count, + min_step=min_step, + timeout_s=self.reward_config.baseline_wait_timeout_s, + poll_interval_s=self.reward_config.baseline_poll_interval_s, + ) + self.reward_calculator.set_step_baseline(baseline) + self._reward_baseline_loaded_step = max(baseline) if baseline else 0 + def _start_sumo(self, seed: Optional[int] = None): if self._sumo_running: self._close_sumo() @@ -617,6 +655,13 @@ class SUMOEdgeVSLEnvironment: self._completed_mainline_travel_times = [] self._interval_mainline_travel_times = [] self._reset_incident_runtime(seed) + mode = str(self.reward_config.mode).lower() + if mode in {"paired_no_control", "episode_baseline"}: + self._reward_baseline_episode = self._episode_count + self._reward_baseline_loaded_step = 0 + self.reward_calculator.set_step_baseline({}) + else: + self.reward_calculator.set_step_baseline(self._load_episode_reward_baseline(self._episode_count)) self._start_sumo(seed=seed) warmup_steps = int(self.warmup_time / self.control_interval) @@ -659,6 +704,8 @@ class SUMOEdgeVSLEnvironment: detector_data = self._get_edge_detector_data() state = self._collect_state(detector_data) info = self._collect_runtime_metrics(detector_data) + info["step"] = self.current_step + 1 + self._sync_episode_reward_baseline(int(info["step"])) info["detector_cells"] = self._collect_all_detector_cells() if self.collect_detector_cells else [] reward = self._calculate_reward(info) self._last_reward = reward @@ -1040,7 +1087,7 @@ class SUMOEdgeVSLEnvironment: info["relative_speed_samples"] = relative_speed_samples info["relative_speed_variance"] = float(np.var(relative_speed_samples)) if relative_speed_samples else 0.0 info["speed_variance_norm"] = ( - info["relative_speed_variance"] / max(self.reward_config.v_limit ** 2, 1e-6) + info["relative_speed_variance"] / max(self.free_flow_speed ** 2, 1e-6) ) info["ttc_samples"] = ttc_samples positive_ttc_samples = [value for value in ttc_samples if value > 0.0] diff --git a/envs/reward_design_blueprint.py b/envs/reward_design_blueprint.py deleted file mode 100644 index f6dc5b9..0000000 --- a/envs/reward_design_blueprint.py +++ /dev/null @@ -1,168 +0,0 @@ -"""Minimal reward blueprint for the current freeway VSL study.""" - -from __future__ import annotations - -from dataclasses import dataclass -from typing import Iterable, Tuple - - -@dataclass(frozen=True) -class RewardTerm: - name: str - symbol: str - objective: str - rationale: str - formula_tex: str - required_signals: Tuple[str, ...] - - def to_markdown(self) -> str: - return "\n".join( - [ - f"### {self.name}", - f"- Symbol: `{self.symbol}`", - f"- Objective: {self.objective}", - f"- Rationale: {self.rationale}", - f"- Formula: `{self.formula_tex}`", - "- Required signals: " + ", ".join(f"`{value}`" for value in self.required_signals), - ] - ) - - -@dataclass(frozen=True) -class RewardBlueprint: - name: str - scenario_summary: str - primary_objective: str - design_principles: Tuple[str, ...] - terms: Tuple[RewardTerm, ...] - global_formula_tex: str - excluded_metrics: Tuple[str, ...] = () - implementation_notes: Tuple[str, ...] = () - - def term_names(self) -> Tuple[str, ...]: - return tuple(term.name for term in self.terms) - - def to_markdown(self) -> str: - lines = [ - f"# {self.name}", - "", - "## Scenario", - self.scenario_summary, - "", - "## Primary Objective", - self.primary_objective, - "", - "## Global Formula", - f"`{self.global_formula_tex}`", - "", - "## Design Principles", - ] - lines.extend(f"- {item}" for item in self.design_principles) - lines.append("") - lines.append("## Reward Terms") - for term in self.terms: - lines.append(term.to_markdown()) - lines.append("") - if self.excluded_metrics: - lines.append("## Metrics To Avoid As Primary Reward Drivers") - lines.extend(f"- {item}" for item in self.excluded_metrics) - lines.append("") - if self.implementation_notes: - lines.append("## Implementation Notes") - lines.extend(f"- {item}" for item in self.implementation_notes) - return "\n".join(lines).rstrip() + "\n" - - -def build_tca_mappo_reward_blueprint() -> RewardBlueprint: - """Build the current multiplicative-MAUT reward blueprint for corridor VSL.""" - - terms = ( - RewardTerm( - name="efficiency", - symbol="R_efficiency", - objective="Map normalized corridor running efficiency into a bounded utility term.", - rationale=( - "The raw efficiency indicator should not enter the reward linearly. A monotone concave utility " - "function emphasizes recovery from low-efficiency states while avoiding over-rewarding already " - "high-speed regimes." - ), - formula_tex=( - r"R_{\mathrm{efficiency}}(t)=1-\exp\!\left(-\alpha E(t)\right),\ " - r"E(t)=\begin{cases}\mathrm{clip}(\bar{v}(t)/v_{\max},0,1),&N(t)>0\\0,&N(t)=0\end{cases}" - ), - required_signals=("controlled-corridor mean speed", "controlled-corridor active vehicle count", "speed limit"), - ), - RewardTerm( - name="safety", - symbol="R_safety", - objective="Map TTC-based following risk into a bounded safety utility term.", - rationale=( - "For freeway VSL, rear-end conflict risk is better captured by time-to-collision than by " - "stop counting. An exponential safety utility makes the reward sensitive to the spread of " - "short-TTC following states while preserving boundedness and interpretability." - ), - formula_tex=( - r"R_{\mathrm{safety}}(t)=\exp\!\left(-\beta S(t)\right),\ " - r"S(t)=\frac{1}{N(t)}\sum_{i=1}^{N(t)}\max\!\left(0,1-\frac{\mathrm{TTC}_i(t)}{\tau_{\mathrm{ttc}}}\right)" - ), - required_signals=( - "controlled-corridor vehicle speeds", - "leader speeds", - "leader gaps", - "TTC threshold", - ), - ), - ) - - return RewardBlueprint( - name="Multiplicative MAUT Reward Blueprint For TCA-MAPPO", - scenario_summary=( - "The study controls a segmented freeway VSL corridor under fixed control intervals. " - "For architecture comparison, the reward should emphasize a small number of stable, " - "interpretable traffic goals while avoiding overly simplistic linear compensation between " - "efficiency and safety." - ), - primary_objective=( - "Improve corridor running efficiency and traffic safety through a nonlinear utility-based reward." - ), - design_principles=( - "Prefer stable per-step traffic signals over highly time-dependent outflow spikes.", - "Map raw traffic indicators into utility space before aggregation.", - "Avoid full linear substitutability between efficiency and safety.", - "Use a compact nonlinear aggregation that remains easy to interpret and implement.", - "Use a freeway-native rear-end risk surrogate instead of coarse stop counting.", - "Avoid auxiliary regularizers that change the objective across training stages.", - ), - terms=terms, - global_formula_tex=( - r"R(t)=R_{\mathrm{efficiency}}(t)^{\lambda_{\mathrm{eff}}}" - r"R_{\mathrm{safety}}(t)^{\lambda_{\mathrm{safe}}}" - ), - excluded_metrics=( - "Instantaneous outflow as the primary step reward because it is strongly modulated by simulation time.", - "Too many correlated penalties such as shockwave, occupancy, braking, and variance all entering together.", - "Purely linear reward aggregation when safety should not be fully compensated by efficiency gains.", - ), - implementation_notes=( - "The efficiency utility is built from normalized controlled-corridor mean speed.", - "The safety utility is built from a TTC-thresholded average following-risk indicator.", - "The exponents are normalized internally before multiplicative aggregation.", - "Throughput and bottleneck occupancy can still be logged for diagnosis even if they are no longer part of the reward.", - "Training and evaluate now share the same fixed reward structure.", - ), - ) - - -def build_reward_blueprint_markdown() -> str: - return build_tca_mappo_reward_blueprint().to_markdown() - - -def iter_required_signals() -> Iterable[str]: - signals = set() - for term in build_tca_mappo_reward_blueprint().terms: - signals.update(term.required_signals) - return tuple(sorted(signals)) - - -if __name__ == "__main__": - print(build_reward_blueprint_markdown()) diff --git a/envs/reward_system.py b/envs/reward_system.py index f5a7986..40e04e3 100644 --- a/envs/reward_system.py +++ b/envs/reward_system.py @@ -1,4 +1,4 @@ -"""Shared reward configuration and calculation for freeway VSL environments.""" +"""Shared reward calculation for freeway VSL environments.""" from __future__ import annotations @@ -9,22 +9,18 @@ import numpy as np REWARD_COMPONENT_COLUMNS = ( - "r_efficiency", - "r_safety", - "r_utility", + "r_travel_time_improvement", + "r_ttc_improvement", + "r_improvement", ) REWARD_COMPONENT_LABELS = { - "r_efficiency": "R_efficiency", - "r_safety": "R_safety", - "r_utility": "R_utility", + "r_travel_time_improvement": "R_travel_time", + "r_ttc_improvement": "R_ttc", + "r_improvement": "R_total", } -def clip01(value: float) -> float: - return float(np.clip(value, 0.0, 1.0)) - - def init_reward_component_totals() -> Dict[str, float]: return {column: 0.0 for column in REWARD_COMPONENT_COLUMNS} @@ -36,14 +32,17 @@ def average_reward_components(totals: Mapping[str, float], steps: int) -> Dict[s @dataclass(frozen=True) class RewardConfig: - efficiency_alpha: float = 2.1159 - safety_beta: float = 7.8553 - efficiency_exponent: float = 0.50 - safety_exponent: float = 0.50 ttc_threshold_s: float = 2.3 bottleneck_window_size: int = 3 - v_limit: float = 33.33 leader_gap_threshold_m: float = 100.0 + mode: str = "paired_no_control" + baseline_dir: str = "" + baseline_key: str = "step" + baseline_wait_timeout_s: float = 3600.0 + baseline_poll_interval_s: float = 1.0 + travel_time_weight: float = 0.5 + ttc_weight: float = 0.5 + travel_time_min_denominator_s: float = 60.0 @classmethod def from_dict( @@ -53,21 +52,26 @@ class RewardConfig: speed_actions_ms: Sequence[float], ) -> "RewardConfig": _ = speed_actions_ms - return cls( - efficiency_alpha=float(raw_cfg.get("efficiency_alpha", 2.1159)), - safety_beta=float(raw_cfg.get("safety_beta", 7.8553)), - efficiency_exponent=float(raw_cfg.get("efficiency_exponent", 0.50)), - safety_exponent=float(raw_cfg.get("safety_exponent", 0.50)), ttc_threshold_s=float(raw_cfg.get("ttc_threshold_s", 2.3)), bottleneck_window_size=max(1, int(raw_cfg.get("bottleneck_window_size", 3))), - v_limit=float(raw_cfg.get("v_limit", 33.33)), leader_gap_threshold_m=float(raw_cfg.get("leader_gap_threshold_m", 100.0)), + mode=str(raw_cfg.get("mode", "paired_no_control")).strip().lower(), + baseline_dir=str(raw_cfg.get("baseline_dir", "") or ""), + baseline_key=str(raw_cfg.get("baseline_key", "step")).strip().lower(), + baseline_wait_timeout_s=float(raw_cfg.get("baseline_wait_timeout_s", 3600.0)), + baseline_poll_interval_s=float(raw_cfg.get("baseline_poll_interval_s", 1.0)), + travel_time_weight=float(raw_cfg.get("travel_time_weight", 0.5)), + ttc_weight=float(raw_cfg.get("ttc_weight", 0.5)), + travel_time_min_denominator_s=max( + float(raw_cfg.get("travel_time_min_denominator_s", 60.0)), + 1e-6, + ), ) class RewardCalculator: - """Encapsulates a multiplicative MAUT reward for freeway VSL control.""" + """Counterfactual reward relative to synchronized no-control episodes.""" def __init__( self, @@ -79,14 +83,10 @@ class RewardCalculator: self.config = config self.controlled_edge_start_index = int(controlled_edge_start_index) self.evaluation_mode = bool(evaluation_mode) + self.baseline_by_step: Dict[int, Mapping[str, float]] = {} - def _normalized_preference_exponents(self) -> tuple[float, float]: - eff = max(float(self.config.efficiency_exponent), 0.0) - safe = max(float(self.config.safety_exponent), 0.0) - total = eff + safe - if total <= 1e-8: - return 0.5, 0.5 - return eff / total, safe / total + def set_step_baseline(self, baseline_by_step: Mapping[int, Mapping[str, float]]) -> None: + self.baseline_by_step = dict(baseline_by_step or {}) def calculate( self, @@ -98,26 +98,86 @@ class RewardCalculator: ) -> float: _ = current_edge_speeds, prev_edge_speeds, episode_index - mean_speed = max(float(info.get("mean_speed", 0.0)), 0.0) - num_vehicles = max(int(info.get("num_vehicles", 0)), 0) - efficiency_norm = clip01(mean_speed / max(self.config.v_limit, 1e-6)) if num_vehicles > 0 else 0.0 - r_efficiency = 1.0 - float(np.exp(-max(self.config.efficiency_alpha, 0.0) * efficiency_norm)) - - ttc_risk_rate = clip01(float(info.get("ttc_risk_rate", 0.0))) - safety_risk = ttc_risk_rate - r_safety = float(np.exp(-max(self.config.safety_beta, 0.0) * safety_risk)) - lambda_eff, lambda_safe = self._normalized_preference_exponents() - r_utility = float( - np.power(max(r_efficiency, 1e-8), lambda_eff) - * np.power(max(r_safety, 1e-8), lambda_safe) + current_ttc_risk = _clip(float(info.get("ttc_risk_rate", 0.0)), 0.0, 1.0) + current_travel_time = float( + info.get("mainline_travel_time_cumulative_mean_s", np.nan) ) - info["r_efficiency"] = float(r_efficiency) - info["r_safety"] = float(r_safety) - info["r_utility"] = float(r_utility) - info["efficiency_norm"] = float(efficiency_norm) - info["safety_risk_norm"] = float(safety_risk) + baseline = self._get_baseline(info) + baseline_ttc_risk = _safe_baseline_float(baseline, "ttc_risk_rate") + baseline_travel_time = _safe_baseline_float( + baseline, + "mainline_travel_time_cumulative_mean_s", + ) + + travel_time_improvement = self._travel_time_improvement( + current_travel_time=current_travel_time, + baseline_travel_time=baseline_travel_time, + ) + ttc_improvement = self._ttc_improvement( + current_ttc_risk=current_ttc_risk, + baseline_ttc_risk=baseline_ttc_risk, + ) + reward = float( + self.config.travel_time_weight * travel_time_improvement + + self.config.ttc_weight * ttc_improvement + ) + + info["reward_mode"] = self.config.mode + info["safety_risk_norm"] = float(current_ttc_risk) info["ttc_threshold_s"] = float(self.config.ttc_threshold_s) - info["efficiency_lambda"] = float(lambda_eff) - info["safety_lambda"] = float(lambda_safe) - return float(r_utility) + info["baseline_mainline_travel_time_cumulative_mean_s"] = float(baseline_travel_time) + info["baseline_ttc_risk_rate"] = float(baseline_ttc_risk) + info["travel_time_relative_denominator_s"] = float( + self._travel_time_denominator(baseline_travel_time) + ) + info["r_travel_time_improvement"] = float(travel_time_improvement) + info["r_ttc_improvement"] = float(ttc_improvement) + info["r_improvement"] = float(reward) + + if self.config.mode in {"paired_no_control", "episode_baseline"}: + return reward + return 0.0 + + def _get_baseline(self, info: Mapping[str, object]) -> Mapping[str, float] | None: + lookup_key = "sim_time" if self.config.baseline_key == "sim_time" else "step" + try: + key_value = int(round(float(info.get(lookup_key, 0.0)))) + except (TypeError, ValueError): + key_value = 0 + return self.baseline_by_step.get(key_value) + + def _travel_time_denominator(self, baseline_travel_time: float) -> float: + if not np.isfinite(baseline_travel_time): + return float("nan") + return max(float(baseline_travel_time), self.config.travel_time_min_denominator_s) + + def _travel_time_improvement( + self, + *, + current_travel_time: float, + baseline_travel_time: float, + ) -> float: + denominator = self._travel_time_denominator(baseline_travel_time) + if not np.isfinite(current_travel_time) or not np.isfinite(denominator): + return 0.0 + return _clip((baseline_travel_time - current_travel_time) / denominator, -1.0, 1.0) + + @staticmethod + def _ttc_improvement(*, current_ttc_risk: float, baseline_ttc_risk: float) -> float: + if not np.isfinite(baseline_ttc_risk): + return 0.0 + return _clip(baseline_ttc_risk - current_ttc_risk, -1.0, 1.0) + + +def _safe_baseline_float(baseline: Mapping[str, float] | None, key: str) -> float: + if baseline is None: + return float("nan") + try: + return float(baseline.get(key, np.nan)) + except (TypeError, ValueError): + return float("nan") + + +def _clip(value: float, lower: float, upper: float) -> float: + return float(np.clip(value, lower, upper)) diff --git a/run_all_training.py b/run_all_training.py index 9b65efb..4099d5d 100644 --- a/run_all_training.py +++ b/run_all_training.py @@ -130,7 +130,9 @@ def print_failure_excerpt(name, log_path, returncode): if tail_lines: print(" Last log lines:") for line in tail_lines: - print(f" {line}") + encoding = sys.stdout.encoding or "utf-8" + safe_line = line.encode(encoding, errors="replace").decode(encoding, errors="replace") + print(f" {safe_line}") else: print(" No readable log content found.") diff --git a/scripts/plot_live_training.py b/scripts/plot_live_training.py index bd83b34..4c111cc 100644 --- a/scripts/plot_live_training.py +++ b/scripts/plot_live_training.py @@ -57,8 +57,8 @@ MODEL_COLORS = { "td3": "#17becf", "sctd3": "#bcbd22", } -EFFICIENCY_COLUMN = "r_efficiency" -EFFICIENCY_LABEL = REWARD_COMPONENT_LABELS.get(EFFICIENCY_COLUMN, "Running Efficiency") +EFFICIENCY_COLUMN = "r_travel_time_improvement" +EFFICIENCY_LABEL = REWARD_COMPONENT_LABELS.get(EFFICIENCY_COLUMN, "Travel Time Improvement") def parse_args(): diff --git a/training/registry.py b/training/registry.py index 7eae428..1232bb7 100644 --- a/training/registry.py +++ b/training/registry.py @@ -1,52 +1,80 @@ """Central registry for training entry functions.""" -from typing import Callable, Dict, List -from training.train_appo import train_sumo_appo -from training.train_dcmappo import train_sumo_dcmappo -from training.train_dcqmix import train_sumo_dcqmix -from training.train_d3pg import train_sumo_d3pg -from training.train_ddqn import train_sumo_ddqn -from training.train_ddpg import train_sumo_ddpg -from training.train_dqn import train_sumo_dqn -from training.train_gpro import train_sumo_gpro -from training.train_madqn import train_sumo_madqn -from training.train_mappo import train_sumo_mappo -from training.train_ppo import train_sumo_ppo -from training.train_qmix import train_sumo_qmix -from training.train_sac import train_sumo_sac -from training.train_sctd3 import train_sumo_sctd3 -from training.train_tcamappo import train_sumo_tcamappo -from training.train_td3 import train_sumo_td3 +from __future__ import annotations + +import importlib +from typing import Callable, Dict, List, Tuple -# DEFAULT_MODELS: List[str] = ["ppo"] -DEFAULT_MODELS: List[str] = ["ppo", "gpro", "mappo", "tcamappo", "dcmappo", "dqn", "madqn", "ddqn", "qmix", "dcqmix", "ddpg", "d3pg", "sac", "td3"] -ALL_MODELS: List[str] = ["ppo", "gpro", "appo", "mappo", "tcamappo", "dcmappo", "dqn", "madqn", "ddqn", "qmix", "dcqmix", "ddpg", "d3pg", "sac", "td3", "sctd3"] - - -TRAINERS: Dict[str, Callable] = { - "ppo": train_sumo_ppo, - "gpro": train_sumo_gpro, - "appo": train_sumo_appo, - "mappo": train_sumo_mappo, - "tcamappo": train_sumo_tcamappo, - "dcmappo": train_sumo_dcmappo, - "dqn": train_sumo_dqn, - "madqn": train_sumo_madqn, - "ddqn": train_sumo_ddqn, - "qmix": train_sumo_qmix, - "dcqmix": train_sumo_dcqmix, - "ddpg": train_sumo_ddpg, - "d3pg": train_sumo_d3pg, - "sac": train_sumo_sac, - "td3": train_sumo_td3, - "sctd3": train_sumo_sctd3, +TRAINER_SPECS: Dict[str, Tuple[str, str]] = { + "no_control": ("training.train_no_control", "train_sumo_no_control"), + "ppo": ("training.train_ppo", "train_sumo_ppo"), + "gpro": ("training.train_gpro", "train_sumo_gpro"), + "appo": ("training.train_appo", "train_sumo_appo"), + "mappo": ("training.train_mappo", "train_sumo_mappo"), + "tcamappo": ("training.train_tcamappo", "train_sumo_tcamappo"), + "dcmappo": ("training.train_dcmappo", "train_sumo_dcmappo"), + "dqn": ("training.train_dqn", "train_sumo_dqn"), + "madqn": ("training.train_madqn", "train_sumo_madqn"), + "ddqn": ("training.train_ddqn", "train_sumo_ddqn"), + "qmix": ("training.train_qmix", "train_sumo_qmix"), + "dcqmix": ("training.train_dcqmix", "train_sumo_dcqmix"), + "ddpg": ("training.train_ddpg", "train_sumo_ddpg"), + "d3pg": ("training.train_d3pg", "train_sumo_d3pg"), + "sac": ("training.train_sac", "train_sumo_sac"), + "td3": ("training.train_td3", "train_sumo_td3"), + "sctd3": ("training.train_sctd3", "train_sumo_sctd3"), } +DEFAULT_MODELS: List[str] = [ + "no_control", + "ppo", + "gpro", + "mappo", + "tcamappo", + "dcmappo", + "dqn", + "madqn", + "ddqn", + "qmix", + "dcqmix", + "ddpg", + "d3pg", + "sac", + "td3", +] +ALL_MODELS: List[str] = list(TRAINER_SPECS.keys()) + + +def get_trainer(model_name: str) -> Callable: + model = normalize_model_name(model_name) + module_name, function_name = TRAINER_SPECS[model] + module = importlib.import_module(module_name) + return getattr(module, function_name) + + +class _TrainerRegistry(dict): + def __contains__(self, key): + return str(key).strip().lower() in TRAINER_SPECS + + def __getitem__(self, key): + return get_trainer(str(key)) + + def keys(self): + return TRAINER_SPECS.keys() + + def items(self): + for key in TRAINER_SPECS: + yield key, get_trainer(key) + + +TRAINERS: Dict[str, Callable] = _TrainerRegistry() + + def normalize_model_name(name: str) -> str: model = name.strip().lower() - if model not in TRAINERS: + if model not in TRAINER_SPECS: raise ValueError(f"Unsupported model name: {name}") return model diff --git a/training/train_appo.py b/training/train_appo.py index 4208439..f30c2e5 100644 --- a/training/train_appo.py +++ b/training/train_appo.py @@ -1,6 +1,6 @@ """ -基于 SUMO+TraCI 的 APPO 训练脚本 -使用微观仿真环境训练 VSL 控制策略 +鍩轰簬 SUMO+TraCI 鐨?APPO 璁粌鑴氭湰 +浣跨敤寰浠跨湡鐜璁粌 VSL 鎺у埗绛栫暐 """ import os import sys @@ -26,7 +26,7 @@ from utils.seeding import derive_seed, resolve_base_seed, set_global_seed def train_sumo_appo(log_dir=None, checkpoint_dir=None, run_timestamp=None): - """SUMO 环境下的 APPO 训练主函数""" + """Train APPO on the SUMO VSL environment.""" with open("config_sumo_vsl.yaml", "r", encoding="utf-8") as f: config = yaml.safe_load(f) @@ -36,7 +36,7 @@ def train_sumo_appo(log_dir=None, checkpoint_dir=None, run_timestamp=None): set_global_seed(base_seed) start_episode = 1 - _, checkpoint_dir, log_dir = resolve_run_dirs( + resolved_run_timestamp, checkpoint_dir, log_dir = resolve_run_dirs( "appo", log_dir=log_dir, checkpoint_dir=checkpoint_dir, @@ -47,6 +47,7 @@ def train_sumo_appo(log_dir=None, checkpoint_dir=None, run_timestamp=None): runtime_config = copy.deepcopy(config) runtime_config.setdefault("runtime", {})["output_dir"] = log_dir runtime_config["runtime"]["evaluation_mode"] = False + runtime_config["runtime"]["run_timestamp"] = resolved_run_timestamp write_shared_run_config( runtime_config, log_dir=log_dir, @@ -61,15 +62,14 @@ def train_sumo_appo(log_dir=None, checkpoint_dir=None, run_timestamp=None): action_dims = [env.action_dim] * env.num_controlled_edges print("=" * 70) - print("APPO训练 - SUMO+TraCI VSL 环境") + print("APPO training - SUMO+TraCI VSL") print("=" * 70) - print(f" 状态维度: {state_dim}") - print(f" 动作空间: {action_dims}") - print(f" Episode 步数: {env.episode_length}") - print(f" 控制间隔: {env.control_interval}s") - print(f" 隐藏维度: {agent_config.get('hidden_dim', 128)}") - print(f" 学习率: {agent_config.get('learning_rate', 3e-4)}") - print(f" 设备: {agent_config.get('device', 'cuda')}") + print(f" State dim: {state_dim}") + print(f" Action dims: {action_dims}") + print(f" Episode length: {env.episode_length}") + print(f" Control interval: {env.control_interval}s") + print(f" Learning rate: {agent_config.get('learning_rate', 3e-4)}") + print(f" Device: {agent_config.get('device', 'cuda')}") print() agent = APPOAgent( @@ -95,12 +95,12 @@ def train_sumo_appo(log_dir=None, checkpoint_dir=None, run_timestamp=None): total_episodes=train_config["num_episodes"] ) - # 训练参数 + # 璁粌鍙傛暟 num_episodes = train_config["num_episodes"] save_freq = train_config.get("save_freq", 50) log_freq = train_config.get("log_freq", 10) - # 统计变量 + # 缁熻鍙橀噺 episode_rewards = [] episode_throughputs = [] episode_mean_speeds = [] @@ -111,11 +111,11 @@ def train_sumo_appo(log_dir=None, checkpoint_dir=None, run_timestamp=None): entropies = [] best_reward = -float("inf") - print("开始训练...\n") + print("Starting training...\n") try: for episode in range(start_episode, num_episodes + 1): - # 每个 episode 使用不同 seed 引入随机性 + # 姣忎釜 episode 浣跨敤涓嶅悓 seed 寮曞叆闅忔満鎬? seed = derive_seed(base_seed, episode) state = env.reset(seed=seed) episode_reward = 0 @@ -158,7 +158,7 @@ def train_sumo_appo(log_dir=None, checkpoint_dir=None, run_timestamp=None): pbar.close() - # GAE 计算和策略更新 + # GAE 璁$畻鍜岀瓥鐣ユ洿鏂? if done: next_value = 0.0 else: @@ -168,7 +168,7 @@ def train_sumo_appo(log_dir=None, checkpoint_dir=None, run_timestamp=None): train_stats = agent.update(next_value) - # 记录统计 + # 璁板綍缁熻 avg_tp = episode_throughput / max(step, 1) avg_speed = episode_speed / max(step, 1) avg_speed_variance_norm = episode_speed_variance_norm / max(step, 1) @@ -200,7 +200,7 @@ def train_sumo_appo(log_dir=None, checkpoint_dir=None, run_timestamp=None): ttc_risk=episode_ttc_risk, ) - # 保存最佳模型 + # 淇濆瓨鏈€浣虫ā鍨? episode_summary = { "episode": episode, "reward": float(episode_reward), @@ -229,7 +229,7 @@ def train_sumo_appo(log_dir=None, checkpoint_dir=None, run_timestamp=None): best_reward = episode_reward agent.save(os.path.join(checkpoint_dir, "model_best.pt")) - # 定期日志 + # 瀹氭湡鏃ュ織 if episode % log_freq == 0: recent_rewards = episode_rewards[-log_freq:] print(f"\nEpisode {episode}/{num_episodes}") @@ -249,20 +249,20 @@ def train_sumo_appo(log_dir=None, checkpoint_dir=None, run_timestamp=None): print(f" Value Loss: {train_stats['value_loss']:.4f}") print(f" Entropy: {train_stats['entropy']:.4f}") - # 定期保存 + # 瀹氭湡淇濆瓨 if episode % save_freq == 0: agent.save(os.path.join(checkpoint_dir, f"model_ep{episode}.pt")) except KeyboardInterrupt: - print("\n训练被中断,保存当前模型...") + print("\nTraining interrupted, saving current model...") agent.save(os.path.join(checkpoint_dir, "model_interrupted.pt")) finally: env.close() - # 最终保存 + # 鏈€缁堜繚瀛? agent.save(os.path.join(checkpoint_dir, f"model_ep{num_episodes}.pt")) - # 绘制训练曲线 + # 缁樺埗璁粌鏇茬嚎 plot_training_curves( episode_rewards, episode_throughputs, episode_mean_speeds, episode_speed_variance_norms, episode_ttc_risks, policy_losses, value_losses, @@ -270,8 +270,8 @@ def train_sumo_appo(log_dir=None, checkpoint_dir=None, run_timestamp=None): ) print("=" * 70) - print("训练完成!") - print(f" 最佳奖励: {best_reward:.2f}") - print(f" 模型目录: {checkpoint_dir}") - print(f" 日志目录: {log_dir}") + print("Training complete") + print(f" Best reward: {best_reward:.2f}") + print(f" Model dir: {checkpoint_dir}") + print(f" Log dir: {log_dir}") print("=" * 70) diff --git a/training/train_dcmappo.py b/training/train_dcmappo.py index 0e71392..025b743 100644 --- a/training/train_dcmappo.py +++ b/training/train_dcmappo.py @@ -1,4 +1,4 @@ -""" +""" Directional Corridor MAPPO training script for SUMO + TraCI VSL. """ import os @@ -30,7 +30,7 @@ def train_sumo_dcmappo(log_dir=None, checkpoint_dir=None, run_timestamp=None): base_seed = resolve_base_seed(train_config) set_global_seed(base_seed) - _, checkpoint_dir, log_dir = resolve_run_dirs( + resolved_run_timestamp, checkpoint_dir, log_dir = resolve_run_dirs( "dcmappo", log_dir=log_dir, checkpoint_dir=checkpoint_dir, @@ -41,6 +41,7 @@ def train_sumo_dcmappo(log_dir=None, checkpoint_dir=None, run_timestamp=None): runtime_config = copy.deepcopy(config) runtime_config.setdefault("runtime", {})["output_dir"] = log_dir runtime_config["runtime"]["evaluation_mode"] = False + runtime_config["runtime"]["run_timestamp"] = resolved_run_timestamp write_shared_run_config( runtime_config, log_dir=log_dir, @@ -263,3 +264,6 @@ def train_sumo_dcmappo(log_dir=None, checkpoint_dir=None, run_timestamp=None): print(f" Model dir: {checkpoint_dir}") print(f" Log dir: {log_dir}") print("=" * 70) + + + diff --git a/training/train_ddpg.py b/training/train_ddpg.py index c7c3a34..7ee856a 100644 --- a/training/train_ddpg.py +++ b/training/train_ddpg.py @@ -1,6 +1,6 @@ """ -基于 SUMO+TraCI 的 DDPG 训练脚本 -使用 Stable-Baselines3 的 DDPG 算法 +鍩轰簬 SUMO+TraCI 鐨?DDPG 璁粌鑴氭湰 +浣跨敤 Stable-Baselines3 鐨?DDPG 绠楁硶 """ import os import copy @@ -23,7 +23,7 @@ from utils.seeding import derive_seed, resolve_base_seed, set_global_seed def train_sumo_ddpg(log_dir=None, checkpoint_dir=None, run_timestamp=None): - """SUMO 环境下的 DDPG 训练主函数""" + """Train DDPG on the SUMO VSL environment.""" with open("config_sumo_vsl.yaml", "r", encoding="utf-8") as f: config = yaml.safe_load(f) @@ -32,7 +32,7 @@ def train_sumo_ddpg(log_dir=None, checkpoint_dir=None, run_timestamp=None): base_seed = resolve_base_seed(train_config) set_global_seed(base_seed) - _, checkpoint_dir, log_dir = resolve_run_dirs( + resolved_run_timestamp, checkpoint_dir, log_dir = resolve_run_dirs( "ddpg", log_dir=log_dir, checkpoint_dir=checkpoint_dir, @@ -43,6 +43,7 @@ def train_sumo_ddpg(log_dir=None, checkpoint_dir=None, run_timestamp=None): runtime_config = copy.deepcopy(config) runtime_config.setdefault("runtime", {})["output_dir"] = log_dir runtime_config["runtime"]["evaluation_mode"] = False + runtime_config["runtime"]["run_timestamp"] = resolved_run_timestamp write_shared_run_config( runtime_config, log_dir=log_dir, @@ -57,14 +58,14 @@ def train_sumo_ddpg(log_dir=None, checkpoint_dir=None, run_timestamp=None): action_dims = [env.action_dim] * env.num_controlled_edges print("=" * 70) - print("DDPG训练 - SUMO+TraCI VSL 环境") + print("DDPG training - SUMO+TraCI VSL") print("=" * 70) - print(f" 状态维度: {state_dim}") - print(f" 动作空间: {action_dims}") - print(f" Episode 步数: {env.episode_length}") - print(f" 控制间隔: {env.control_interval}s") - print(f" 学习率: {agent_config.get('learning_rate', 3e-4)}") - print(f" 设备: {agent_config.get('device', 'cuda')}") + print(f" State dim: {state_dim}") + print(f" Action dims: {action_dims}") + print(f" Episode length: {env.episode_length}") + print(f" Control interval: {env.control_interval}s") + print(f" Learning rate: {agent_config.get('learning_rate', 3e-4)}") + print(f" Device: {agent_config.get('device', 'cuda')}") print() agent = DDPGAgent( @@ -94,7 +95,7 @@ def train_sumo_ddpg(log_dir=None, checkpoint_dir=None, run_timestamp=None): episode_ttc_risks = [] best_reward = -float("inf") - print("开始训练...\n") + print("Starting training...\n") try: for episode in range(1, num_episodes + 1): @@ -191,7 +192,7 @@ def train_sumo_ddpg(log_dir=None, checkpoint_dir=None, run_timestamp=None): agent.save(os.path.join(checkpoint_dir, f"model_ep{episode}")) except KeyboardInterrupt: - print("\n训练被中断,保存当前模型...") + print("\nTraining interrupted, saving current model...") agent.save(os.path.join(checkpoint_dir, "model_interrupted")) finally: env.close() @@ -204,8 +205,8 @@ def train_sumo_ddpg(log_dir=None, checkpoint_dir=None, run_timestamp=None): ) print("=" * 70) - print("训练完成!") - print(f" 最佳奖励: {best_reward:.2f}") - print(f" 模型目录: {checkpoint_dir}") - print(f" 日志目录: {log_dir}") + print("Training complete") + print(f" Best reward: {best_reward:.2f}") + print(f" Model dir: {checkpoint_dir}") + print(f" Log dir: {log_dir}") print("=" * 70) diff --git a/training/train_gpro.py b/training/train_gpro.py index ebd24e0..3327d16 100644 --- a/training/train_gpro.py +++ b/training/train_gpro.py @@ -1,4 +1,4 @@ -"""GRPO-inspired PPO training entrypoint for corridor VSL control.""" +"""GRPO-inspired PPO training entrypoint for corridor VSL control.""" import os import copy import yaml @@ -29,7 +29,7 @@ def train_sumo_gpro(log_dir=None, checkpoint_dir=None, run_timestamp=None): base_seed = resolve_base_seed(train_config) set_global_seed(base_seed) - _, checkpoint_dir, log_dir = resolve_run_dirs( + resolved_run_timestamp, checkpoint_dir, log_dir = resolve_run_dirs( "gpro", log_dir=log_dir, checkpoint_dir=checkpoint_dir, @@ -40,6 +40,7 @@ def train_sumo_gpro(log_dir=None, checkpoint_dir=None, run_timestamp=None): runtime_config = copy.deepcopy(config) runtime_config.setdefault("runtime", {})["output_dir"] = log_dir runtime_config["runtime"]["evaluation_mode"] = False + runtime_config["runtime"]["run_timestamp"] = resolved_run_timestamp write_shared_run_config( runtime_config, log_dir=log_dir, @@ -282,3 +283,6 @@ def train_sumo_gpro(log_dir=None, checkpoint_dir=None, run_timestamp=None): print(f" Checkpoints: {checkpoint_dir}") print(f" Logs: {log_dir}") print("=" * 70) + + + diff --git a/training/train_mappo.py b/training/train_mappo.py index 7e11cba..1a53f2e 100644 --- a/training/train_mappo.py +++ b/training/train_mappo.py @@ -1,4 +1,4 @@ -""" +""" MAPPO training script for SUMO + TraCI VSL. """ import os @@ -31,7 +31,7 @@ def train_sumo_mappo(log_dir=None, checkpoint_dir=None, run_timestamp=None): base_seed = resolve_base_seed(train_config) set_global_seed(base_seed) - _, checkpoint_dir, log_dir = resolve_run_dirs( + resolved_run_timestamp, checkpoint_dir, log_dir = resolve_run_dirs( "mappo", log_dir=log_dir, checkpoint_dir=checkpoint_dir, @@ -42,6 +42,7 @@ def train_sumo_mappo(log_dir=None, checkpoint_dir=None, run_timestamp=None): runtime_config = copy.deepcopy(config) runtime_config.setdefault("runtime", {})["output_dir"] = log_dir runtime_config["runtime"]["evaluation_mode"] = False + runtime_config["runtime"]["run_timestamp"] = resolved_run_timestamp write_shared_run_config( runtime_config, log_dir=log_dir, @@ -260,3 +261,6 @@ def train_sumo_mappo(log_dir=None, checkpoint_dir=None, run_timestamp=None): print(f" Model dir: {checkpoint_dir}") print(f" Log dir: {log_dir}") print("=" * 70) + + + diff --git a/training/train_no_control.py b/training/train_no_control.py new file mode 100644 index 0000000..6fbefb5 --- /dev/null +++ b/training/train_no_control.py @@ -0,0 +1,209 @@ +"""No-control baseline runner for synchronized reward baselines.""" + +from __future__ import annotations + +import copy +import os + +import matplotlib +import numpy as np +import yaml +from tqdm import tqdm + +matplotlib.use("Agg") + +from envs.edge_vsl_env import SUMOEdgeVSLEnvironment +from envs.reward_system import REWARD_COMPONENT_COLUMNS, average_reward_components, init_reward_component_totals +from utils.config import get_training_config +from utils.episode_artifacts import save_training_episode_artifacts +from utils.logger import TrainingLogger +from utils.plot import plot_training_curves +from utils.reward_baseline import EpisodeBaselineWriter, resolve_baseline_dir +from utils.run_dirs import resolve_run_dirs, write_shared_run_config +from utils.seeding import derive_seed, resolve_base_seed, set_global_seed + + +def _select_no_control_action(env: SUMOEdgeVSLEnvironment) -> np.ndarray: + if env.num_controlled_edges <= 0: + return np.zeros(0, dtype=np.int64) + return np.full(env.num_controlled_edges, env.action_dim - 1, dtype=np.int64) + + +def _baseline_row(episode: int, seed: int | None, reward: float, info: dict) -> dict: + return { + "episode": int(episode), + "step": int(info.get("step", 0)), + "seed": "" if seed is None else int(seed), + "sim_time": float(info.get("sim_time", np.nan)), + "reward": float(reward), + "mean_speed_kmh": float(info.get("mean_speed_kmh", np.nan)), + "num_vehicles": int(info.get("num_vehicles", 0)), + "mainline_completed_count": int(info.get("mainline_completed_count", 0)), + "mainline_interval_travel_time_mean_s": float( + info.get("mainline_interval_travel_time_mean_s", np.nan) + ), + "mainline_travel_time_cumulative_mean_s": float( + info.get("mainline_travel_time_cumulative_mean_s", np.nan) + ), + "ttc_risk_rate": float(info.get("ttc_risk_rate", np.nan)), + } + + +def train_sumo_no_control(log_dir=None, checkpoint_dir=None, run_timestamp=None): + with open("config_sumo_vsl.yaml", "r", encoding="utf-8") as f: + config = yaml.safe_load(f) + + train_config = get_training_config(config) + base_seed = resolve_base_seed(train_config) + set_global_seed(base_seed) + + resolved_run_timestamp, checkpoint_dir, log_dir = resolve_run_dirs( + "no_control", + log_dir=log_dir, + checkpoint_dir=checkpoint_dir, + run_timestamp=run_timestamp, + ) + os.makedirs(checkpoint_dir, exist_ok=True) + os.makedirs(log_dir, exist_ok=True) + + runtime_config = copy.deepcopy(config) + runtime_config.setdefault("runtime", {})["output_dir"] = log_dir + runtime_config["runtime"]["evaluation_mode"] = False + runtime_config["runtime"]["run_timestamp"] = resolved_run_timestamp + reward_cfg = runtime_config.setdefault("environment", {}).setdefault("reward", {}) + reward_cfg["mode"] = "absolute" + reward_cfg["baseline_dir"] = resolve_baseline_dir(runtime_config, resolved_run_timestamp) + baseline_dir = reward_cfg["baseline_dir"] + + write_shared_run_config( + runtime_config, + log_dir=log_dir, + checkpoint_dir=checkpoint_dir, + run_timestamp=run_timestamp, + ) + + logger = TrainingLogger(log_dir, "no_control") + env = SUMOEdgeVSLEnvironment(runtime_config) + + num_episodes = train_config["num_episodes"] + log_freq = train_config.get("log_freq", 10) + snapshot_interval = int(train_config.get("artifact_snapshot_interval", 50)) + episode_rewards = [] + episode_throughputs = [] + episode_mean_speeds = [] + episode_speed_variance_norms = [] + episode_ttc_risks = [] + + print("=" * 70) + print("NO_CONTROL baseline runner - synchronized reward baseline") + print("=" * 70) + print(f" Episode steps: {env.episode_length}") + print(f" Baseline dir: {baseline_dir}") + print(f" Global seed: {base_seed if base_seed is not None else 'None (random)'}") + print() + + try: + for episode in range(1, num_episodes + 1): + seed = derive_seed(base_seed, episode) + env.reset(seed=seed) + + baseline_writer = EpisodeBaselineWriter(baseline_dir=baseline_dir, episode=episode) + episode_reward = 0.0 + episode_throughput = 0.0 + episode_speed = 0.0 + episode_speed_variance_norm = 0.0 + episode_ttc_risk = 0.0 + episode_reward_components = init_reward_component_totals() + done = False + step = 0 + + pbar = tqdm(total=env.episode_length, desc=f"NO_CONTROL Ep {episode}/{num_episodes}", leave=False) + while not done: + action = _select_no_control_action(env) + _, reward, done, info = env.step(action, apply_control=True) + baseline_writer.append(_baseline_row(episode, seed, reward, info)) + + episode_reward += reward + episode_throughput += info["throughput"] + episode_speed += info["mean_speed_kmh"] + episode_speed_variance_norm += info["speed_variance_norm"] + episode_ttc_risk += float(info.get("ttc_risk_rate", 0.0)) + for column in REWARD_COMPONENT_COLUMNS: + episode_reward_components[column] += float(info.get(column, 0.0)) + step += 1 + + pbar.set_postfix(r=f"{episode_reward:.1f}", v=f"{info['mean_speed_kmh']:.1f}") + pbar.update(1) + + pbar.close() + + avg_tp = episode_throughput / max(step, 1) + avg_speed = episode_speed / max(step, 1) + avg_speed_variance_norm = episode_speed_variance_norm / max(step, 1) + avg_reward_components = average_reward_components(episode_reward_components, step) + + episode_rewards.append(episode_reward) + episode_throughputs.append(avg_tp) + episode_mean_speeds.append(avg_speed) + episode_speed_variance_norms.append(avg_speed_variance_norm) + episode_ttc_risks.append(episode_ttc_risk) + + logger.log( + episode, + episode_reward, + avg_tp, + avg_speed, + speed_variance_norm=avg_speed_variance_norm, + reward_components=avg_reward_components, + ttc_risk=episode_ttc_risk, + ) + + episode_summary = { + "episode": int(episode), + "reward": float(episode_reward), + "avg_throughput": float(avg_tp), + "avg_mean_speed_kmh": float(avg_speed), + "avg_speed_variance_norm": float(avg_speed_variance_norm), + "ttc_risk": float(episode_ttc_risk), + "baseline_dir": baseline_dir, + } + for column, value in avg_reward_components.items(): + episode_summary[f"avg_{column}"] = float(value) + + save_training_episode_artifacts( + log_dir=log_dir, + episode=episode, + episode_metrics=env.episode_metrics, + control_edges=env.control_edges, + summary=episode_summary, + snapshot_interval=snapshot_interval, + ) + + if episode % log_freq == 0: + recent_rewards = episode_rewards[-log_freq:] + print(f"\nNO_CONTROL episode {episode}/{num_episodes}") + print(f" Reward: {episode_reward:.2f} (Avg: {np.mean(recent_rewards):.2f})") + print(f" Mean Speed: {avg_speed:.1f} km/h") + + finally: + env.close() + + plot_training_curves( + episode_rewards, + episode_throughputs, + episode_mean_speeds, + episode_speed_variance_norms, + episode_ttc_risks, + save_path=os.path.join(log_dir, "training_curves.png"), + ) + return { + "model": "no_control", + "log_dir": log_dir, + "checkpoint_dir": checkpoint_dir, + "baseline_dir": baseline_dir, + } + + +if __name__ == "__main__": + train_sumo_no_control() + diff --git a/training/train_ppo.py b/training/train_ppo.py index 51cfa8d..725e51b 100644 --- a/training/train_ppo.py +++ b/training/train_ppo.py @@ -1,6 +1,6 @@ """ -基于 SUMO+TraCI 的 PPO 训练脚本 -使用微观仿真环境训练 VSL 控制策略 +鍩轰簬 SUMO+TraCI 鐨?PPO 璁粌鑴氭湰 +浣跨敤寰浠跨湡鐜璁粌 VSL 鎺у埗绛栫暐 """ import os import sys @@ -26,8 +26,8 @@ from utils.seeding import derive_seed, resolve_base_seed, set_global_seed def train_sumo_ppo(log_dir=None, checkpoint_dir=None, run_timestamp=None): - """SUMO 环境下的 PPO 训练主函数""" - # 加载配置 + """Train PPO on the SUMO VSL environment.""" + # 鍔犺浇閰嶇疆 with open("config_sumo_vsl.yaml", "r", encoding="utf-8") as f: config = yaml.safe_load(f) @@ -37,7 +37,7 @@ def train_sumo_ppo(log_dir=None, checkpoint_dir=None, run_timestamp=None): set_global_seed(base_seed) start_episode = 1 - _, checkpoint_dir, log_dir = resolve_run_dirs( + resolved_run_timestamp, checkpoint_dir, log_dir = resolve_run_dirs( "ppo", log_dir=log_dir, checkpoint_dir=checkpoint_dir, @@ -48,6 +48,7 @@ def train_sumo_ppo(log_dir=None, checkpoint_dir=None, run_timestamp=None): runtime_config = copy.deepcopy(config) runtime_config.setdefault("runtime", {})["output_dir"] = log_dir runtime_config["runtime"]["evaluation_mode"] = False + runtime_config["runtime"]["run_timestamp"] = resolved_run_timestamp write_shared_run_config( runtime_config, log_dir=log_dir, @@ -57,26 +58,24 @@ def train_sumo_ppo(log_dir=None, checkpoint_dir=None, run_timestamp=None): logger = TrainingLogger(log_dir, "ppo") - # 创建环境 + # 鍒涘缓鐜 env = SUMOEdgeVSLEnvironment(runtime_config) state_dim = env.state_dim action_dims = [env.action_dim] * env.num_controlled_edges print("=" * 70) - print("PPO 训练 - SUMO+TraCI VSL 环境") + print("PPO training - SUMO+TraCI VSL") print("=" * 70) - print(f" 状态维度: {state_dim}") - print(f" 控制边数: {env.num_edges}") - print(f" 每边动作数: {env.action_dim}") - print(f" Episode 步数: {env.episode_length}") - print(f" 控制间隔: {env.control_interval}s") - print(f" 隐藏层: {agent_config.get('hidden_layers', [512, 256])}") - print(f" 学习率: {agent_config.get('learning_rate', 3e-4)}") - print(f" 设备: {agent_config.get('device', 'cuda')}") + print(f" State dim: {state_dim}") + print(f" Action dims: {action_dims}") + print(f" Episode length: {env.episode_length}") + print(f" Control interval: {env.control_interval}s") + print(f" Learning rate: {agent_config.get('learning_rate', 3e-4)}") + print(f" Device: {agent_config.get('device', 'cuda')}") print() - # 创建 PPO 智能体 + # 鍒涘缓 PPO 鏅鸿兘浣? agent = PPOAgent( state_dim=state_dim, action_dims=action_dims, @@ -95,12 +94,12 @@ def train_sumo_ppo(log_dir=None, checkpoint_dir=None, run_timestamp=None): total_episodes=train_config["num_episodes"], ) - # 训练参数 + # 璁粌鍙傛暟 num_episodes = train_config["num_episodes"] save_freq = train_config.get("save_freq", 50) log_freq = train_config.get("log_freq", 10) - # 统计变量 + # 缁熻鍙橀噺 episode_rewards = [] episode_throughputs = [] episode_mean_speeds = [] @@ -111,11 +110,11 @@ def train_sumo_ppo(log_dir=None, checkpoint_dir=None, run_timestamp=None): entropies = [] best_reward = -float("inf") - print("开始训练...\n") + print("Starting training...\n") try: for episode in range(start_episode, num_episodes + 1): - # 每个 episode 使用不同 seed 引入随机性 + # 姣忎釜 episode 浣跨敤涓嶅悓 seed 寮曞叆闅忔満鎬? seed = derive_seed(base_seed, episode) state = env.reset(seed=seed) episode_reward = 0 @@ -158,7 +157,7 @@ def train_sumo_ppo(log_dir=None, checkpoint_dir=None, run_timestamp=None): pbar.close() - # GAE 计算和策略更新 + # GAE 璁$畻鍜岀瓥鐣ユ洿鏂? if done: next_value = 0.0 else: @@ -168,7 +167,7 @@ def train_sumo_ppo(log_dir=None, checkpoint_dir=None, run_timestamp=None): train_stats = agent.update(next_value) - # 记录统计 + # 璁板綍缁熻 avg_tp = episode_throughput / max(step, 1) avg_speed = episode_speed / max(step, 1) avg_speed_variance_norm = episode_speed_variance_norm / max(step, 1) @@ -200,7 +199,7 @@ def train_sumo_ppo(log_dir=None, checkpoint_dir=None, run_timestamp=None): ttc_risk=episode_ttc_risk, ) - # 保存最佳模型 + # 淇濆瓨鏈€浣虫ā鍨? episode_summary = { "episode": episode, "reward": float(episode_reward), @@ -229,7 +228,7 @@ def train_sumo_ppo(log_dir=None, checkpoint_dir=None, run_timestamp=None): best_reward = episode_reward agent.save(os.path.join(checkpoint_dir, "model_best.pt")) - # 定期日志 + # 瀹氭湡鏃ュ織 if episode % log_freq == 0: recent_rewards = episode_rewards[-log_freq:] print(f"\nEpisode {episode}/{num_episodes}") @@ -249,20 +248,20 @@ def train_sumo_ppo(log_dir=None, checkpoint_dir=None, run_timestamp=None): print(f" Value Loss: {train_stats['value_loss']:.4f}") print(f" Entropy: {train_stats['entropy']:.4f}") - # 定期保存 + # 瀹氭湡淇濆瓨 if episode % save_freq == 0: agent.save(os.path.join(checkpoint_dir, f"model_ep{episode}.pt")) except KeyboardInterrupt: - print("\n训练被中断,保存当前模型...") + print("\nTraining interrupted, saving current model...") agent.save(os.path.join(checkpoint_dir, "model_interrupted.pt")) finally: env.close() - # 最终保存 + # 鏈€缁堜繚瀛? agent.save(os.path.join(checkpoint_dir, f"model_ep{num_episodes}.pt")) - # 绘制训练曲线 + # 缁樺埗璁粌鏇茬嚎 plot_training_curves( episode_rewards, episode_throughputs, episode_mean_speeds, episode_speed_variance_norms, episode_ttc_risks, policy_losses, value_losses, @@ -270,8 +269,8 @@ def train_sumo_ppo(log_dir=None, checkpoint_dir=None, run_timestamp=None): ) print("=" * 70) - print("训练完成!") - print(f" 最佳奖励: {best_reward:.2f}") - print(f" 模型目录: {checkpoint_dir}") - print(f" 日志目录: {log_dir}") + print("Training complete") + print(f" Best reward: {best_reward:.2f}") + print(f" Model dir: {checkpoint_dir}") + print(f" Log dir: {log_dir}") print("=" * 70) diff --git a/training/train_sac.py b/training/train_sac.py index 5dc43f5..8adfe09 100644 --- a/training/train_sac.py +++ b/training/train_sac.py @@ -1,4 +1,4 @@ -"""SUMO SAC training entrypoint.""" +"""SUMO SAC training entrypoint.""" import os import copy import yaml @@ -29,7 +29,7 @@ def train_sumo_sac(log_dir=None, checkpoint_dir=None, run_timestamp=None): base_seed = resolve_base_seed(train_config) set_global_seed(base_seed) - _, checkpoint_dir, log_dir = resolve_run_dirs( + resolved_run_timestamp, checkpoint_dir, log_dir = resolve_run_dirs( "sac", log_dir=log_dir, checkpoint_dir=checkpoint_dir, @@ -40,6 +40,7 @@ def train_sumo_sac(log_dir=None, checkpoint_dir=None, run_timestamp=None): runtime_config = copy.deepcopy(config) runtime_config.setdefault("runtime", {})["output_dir"] = log_dir runtime_config["runtime"]["evaluation_mode"] = False + runtime_config["runtime"]["run_timestamp"] = resolved_run_timestamp write_shared_run_config( runtime_config, log_dir=log_dir, @@ -224,3 +225,6 @@ def train_sumo_sac(log_dir=None, checkpoint_dir=None, run_timestamp=None): print(f" Checkpoints: {checkpoint_dir}") print(f" Logs: {log_dir}") print("=" * 70) + + + diff --git a/training/train_tcamappo.py b/training/train_tcamappo.py index ff3ae15..cb0a1bf 100644 --- a/training/train_tcamappo.py +++ b/training/train_tcamappo.py @@ -1,4 +1,4 @@ -""" +""" Temporal Credit Assignment MAPPO training script for SUMO + TraCI VSL. """ import copy @@ -31,7 +31,7 @@ def train_sumo_tcamappo(log_dir=None, checkpoint_dir=None, run_timestamp=None): base_seed = resolve_base_seed(train_config) set_global_seed(base_seed) - _, checkpoint_dir, log_dir = resolve_run_dirs( + resolved_run_timestamp, checkpoint_dir, log_dir = resolve_run_dirs( "tcamappo", log_dir=log_dir, checkpoint_dir=checkpoint_dir, @@ -42,6 +42,7 @@ def train_sumo_tcamappo(log_dir=None, checkpoint_dir=None, run_timestamp=None): runtime_config = copy.deepcopy(config) runtime_config.setdefault("runtime", {})["output_dir"] = log_dir runtime_config["runtime"]["evaluation_mode"] = False + runtime_config["runtime"]["run_timestamp"] = resolved_run_timestamp write_shared_run_config( runtime_config, log_dir=log_dir, @@ -270,3 +271,6 @@ def train_sumo_tcamappo(log_dir=None, checkpoint_dir=None, run_timestamp=None): print(f" Model dir: {checkpoint_dir}") print(f" Log dir: {log_dir}") print("=" * 70) + + + diff --git a/training/train_td3.py b/training/train_td3.py index ed62b3d..5a9447c 100644 --- a/training/train_td3.py +++ b/training/train_td3.py @@ -1,6 +1,6 @@ """ -基于 SUMO+TraCI 的 TD3 训练脚本 -使用 Stable-Baselines3 的 TD3 算法 +鍩轰簬 SUMO+TraCI 鐨?TD3 璁粌鑴氭湰 +浣跨敤 Stable-Baselines3 鐨?TD3 绠楁硶 """ import os import copy @@ -32,7 +32,7 @@ def train_sumo_td3( display_name: str = "TD3", agent_class=TD3Agent, ): - """SUMO 环境下的 TD3 训练主函数""" + """Train TD3 on the SUMO VSL environment.""" with open("config_sumo_vsl.yaml", "r", encoding="utf-8") as f: config = yaml.safe_load(f) @@ -41,7 +41,7 @@ def train_sumo_td3( base_seed = resolve_base_seed(train_config) set_global_seed(base_seed) - _, checkpoint_dir, log_dir = resolve_run_dirs( + resolved_run_timestamp, checkpoint_dir, log_dir = resolve_run_dirs( model_name, log_dir=log_dir, checkpoint_dir=checkpoint_dir, @@ -52,6 +52,7 @@ def train_sumo_td3( runtime_config = copy.deepcopy(config) runtime_config.setdefault("runtime", {})["output_dir"] = log_dir runtime_config["runtime"]["evaluation_mode"] = False + runtime_config["runtime"]["run_timestamp"] = resolved_run_timestamp write_shared_run_config( runtime_config, log_dir=log_dir, @@ -66,14 +67,14 @@ def train_sumo_td3( action_dims = [env.action_dim] * env.num_controlled_edges print("=" * 70) - print(f"{display_name}训练 - SUMO+TraCI VSL 环境") + print(f"{display_name} training - SUMO+TraCI VSL") print("=" * 70) - print(f" 状态维度: {state_dim}") - print(f" 动作空间: {action_dims}") - print(f" Episode 步数: {env.episode_length}") - print(f" 控制间隔: {env.control_interval}s") - print(f" 学习率: {agent_config.get('learning_rate', 3e-4)}") - print(f" 设备: {agent_config.get('device', 'cuda')}") + print(f" State dim: {state_dim}") + print(f" Action dims: {action_dims}") + print(f" Episode length: {env.episode_length}") + print(f" Control interval: {env.control_interval}s") + print(f" Learning rate: {agent_config.get('learning_rate', 3e-4)}") + print(f" Device: {agent_config.get('device', 'cuda')}") print() common_kwargs = dict( @@ -119,7 +120,7 @@ def train_sumo_td3( episode_ttc_risks = [] best_reward = -float("inf") - print("开始训练...\n") + print("Starting training...\n") try: for episode in range(1, num_episodes + 1): @@ -224,7 +225,7 @@ def train_sumo_td3( agent.save(os.path.join(checkpoint_dir, f"model_ep{episode}")) except KeyboardInterrupt: - print("\n训练被中断,保存当前模型...") + print("\nTraining interrupted, saving current model...") agent.save(os.path.join(checkpoint_dir, "model_interrupted")) finally: env.close() @@ -237,8 +238,8 @@ def train_sumo_td3( ) print("=" * 70) - print("训练完成!") - print(f" 最佳奖励: {best_reward:.2f}") - print(f" 模型目录: {checkpoint_dir}") - print(f" 日志目录: {log_dir}") + print("Training complete") + print(f" Best reward: {best_reward:.2f}") + print(f" Model dir: {checkpoint_dir}") + print(f" Log dir: {log_dir}") print("=" * 70) diff --git a/training/train_value_based.py b/training/train_value_based.py index 87cc938..2c796a5 100644 --- a/training/train_value_based.py +++ b/training/train_value_based.py @@ -1,4 +1,4 @@ -"""Shared training loop for value-based VSL agents.""" +"""Shared training loop for value-based VSL agents.""" from __future__ import annotations import copy @@ -76,7 +76,7 @@ def train_sumo_value_based( base_seed = resolve_base_seed(train_config) set_global_seed(base_seed) - _, checkpoint_dir, log_dir = resolve_run_dirs( + resolved_run_timestamp, checkpoint_dir, log_dir = resolve_run_dirs( model_key, log_dir=log_dir, checkpoint_dir=checkpoint_dir, @@ -88,6 +88,7 @@ def train_sumo_value_based( runtime_config = copy.deepcopy(config) runtime_config.setdefault("runtime", {})["output_dir"] = log_dir runtime_config["runtime"]["evaluation_mode"] = False + runtime_config["runtime"]["run_timestamp"] = resolved_run_timestamp write_shared_run_config( runtime_config, log_dir=log_dir, @@ -266,3 +267,6 @@ def train_sumo_value_based( print(f" Model dir: {checkpoint_dir}") print(f" Log dir: {log_dir}") print("=" * 70) + + + diff --git a/utils/reward_baseline.py b/utils/reward_baseline.py new file mode 100644 index 0000000..e78d2c4 --- /dev/null +++ b/utils/reward_baseline.py @@ -0,0 +1,138 @@ +"""Step-level no-control reward baseline exchange utilities.""" + +from __future__ import annotations + +import csv +import os +import tempfile +import time +from pathlib import Path +from typing import Dict, Iterable, Mapping + + +BASELINE_COLUMNS = [ + "episode", + "step", + "seed", + "sim_time", + "reward", + "mean_speed_kmh", + "num_vehicles", + "mainline_completed_count", + "mainline_interval_travel_time_mean_s", + "mainline_travel_time_cumulative_mean_s", + "ttc_risk_rate", +] + + +def resolve_baseline_dir(config: Mapping[str, object], run_timestamp: str | None = None) -> str: + runtime_cfg = config.get("runtime", {}) if isinstance(config, Mapping) else {} + reward_cfg = (config.get("environment", {}) or {}).get("reward", {}) if isinstance(config, Mapping) else {} + baseline_dir = str( + runtime_cfg.get("reward_baseline_dir") + or reward_cfg.get("baseline_dir") + or "" + ).strip() + if baseline_dir: + return os.path.abspath(baseline_dir) + + timestamp = run_timestamp or str(runtime_cfg.get("run_timestamp", "") or "default") + return os.path.abspath(os.path.join("runs", timestamp, "reward_baseline")) + + +def episode_baseline_path(baseline_dir: str, episode: int) -> str: + return os.path.join(os.path.abspath(baseline_dir), f"episode_{int(episode):04d}.csv") + + +def write_episode_baseline( + *, + baseline_dir: str, + episode: int, + rows: Iterable[Mapping[str, object]], +) -> str: + os.makedirs(baseline_dir, exist_ok=True) + target_path = episode_baseline_path(baseline_dir, episode) + fd, temp_path = tempfile.mkstemp( + prefix=f"episode_{int(episode):04d}_", + suffix=".tmp", + dir=baseline_dir, + text=True, + ) + try: + with os.fdopen(fd, "w", newline="", encoding="utf-8-sig") as f: + writer = csv.DictWriter(f, fieldnames=BASELINE_COLUMNS) + writer.writeheader() + for row in rows: + writer.writerow({column: row.get(column, "") for column in BASELINE_COLUMNS}) + os.replace(temp_path, target_path) + finally: + if os.path.exists(temp_path): + os.remove(temp_path) + return target_path + + +class EpisodeBaselineWriter: + def __init__(self, *, baseline_dir: str, episode: int): + self.baseline_dir = os.path.abspath(baseline_dir) + self.episode = int(episode) + self.rows: list[Mapping[str, object]] = [] + + def append(self, row: Mapping[str, object]) -> str: + self.rows.append(dict(row)) + return write_episode_baseline( + baseline_dir=self.baseline_dir, + episode=self.episode, + rows=self.rows, + ) + + +def read_episode_baseline(path: str) -> Dict[int, Dict[str, float]]: + baseline_by_step: Dict[int, Dict[str, float]] = {} + with open(path, "r", newline="", encoding="utf-8-sig") as f: + reader = csv.DictReader(f) + for row in reader: + try: + step = int(float(row.get("step", ""))) + except (TypeError, ValueError): + continue + baseline_by_step[step] = { + "mean_speed_kmh": _safe_float(row.get("mean_speed_kmh")), + "mainline_completed_count": _safe_float(row.get("mainline_completed_count")), + "mainline_interval_travel_time_mean_s": _safe_float( + row.get("mainline_interval_travel_time_mean_s") + ), + "mainline_travel_time_cumulative_mean_s": _safe_float( + row.get("mainline_travel_time_cumulative_mean_s") + ), + "ttc_risk_rate": _safe_float(row.get("ttc_risk_rate")), + } + return baseline_by_step + + +def wait_for_episode_baseline( + *, + baseline_dir: str, + episode: int, + min_step: int, + timeout_s: float, + poll_interval_s: float, +) -> Dict[int, Dict[str, float]]: + path = episode_baseline_path(baseline_dir, episode) + deadline = time.monotonic() + max(float(timeout_s), 0.0) + while True: + if os.path.isfile(path): + baseline = read_episode_baseline(path) + if baseline and max(baseline) >= int(min_step): + return baseline + if time.monotonic() >= deadline: + raise TimeoutError( + f"Timed out waiting for reward baseline episode={episode} step>={min_step}: {path}" + ) + time.sleep(max(float(poll_interval_s), 0.05)) + + +def _safe_float(value: object) -> float: + try: + return float(value) + except (TypeError, ValueError): + return float("nan")