diff --git a/README.md b/README.md index ba8272a..dda7702 100644 --- a/README.md +++ b/README.md @@ -13,13 +13,15 @@ ctm/ ├── ctm_model.py # 元胞传输模型实现 ├── dqn_agent.py # DQN智能体与经验回放 ├── environment.py # 训练环境 -├── vectorized_env.py # 并行环境包装器 ├── demand_loader.py # CSV流量数据加载器 ├── train.py # 训练脚本 ├── test.py # 测试/评估脚本 ├── utils.py # 工具函数 -├── checkpoints/ # 模型检查点(自动创建) -└── logs/ # 训练日志和图表(自动创建) +├── latest/ # 最新训练运行目录(自动创建) +│ ├── checkpoints/ # 模型检查点 +│ ├── logs/ # 训练日志和图表 +│ └── config.yaml # 训练时使用的配置 +└── runs/ # 历史训练运行归档(自动创建) ``` ## 主要特性 @@ -28,12 +30,13 @@ ctm/ - **DQN智能体**: 基于深度强化学习的限速控制 - **灵活配置**: 通过YAML配置文件轻松调整参数 - **训练与测试**: 独立的训练和评估模式 +- **Baseline对比**: 自动对比无控制策略的性能 - **可视化**: 自动生成训练结果和交通模式图表 - **检查点保存**: 训练过程中定期保存模型 -- **并行训练**: 支持多环境并行训练,显著提升训练效率 - **CSV流量输入**: 支持从CSV文件读取真实交通流量数据 - **随机种子固定**: 确保训练结果可重复 - **最佳模型保存**: 自动保存效果最好的模型 +- **运行管理**: 自动管理训练运行,最新运行保存在latest/,历史运行归档到runs/ ## 安装 @@ -100,7 +103,7 @@ python main.py --mode test --model checkpoints/model_best.pt - `save_freq`: 模型检查点保存频率(默认:50) - `log_freq`: 日志记录频率(默认:10) - `random_seed`: 随机种子(默认:42) -- `num_parallel_envs`: 并行环境数量(默认:4) +- `train_freq`: 训练频率,每N步训练一次(默认:1) ### 奖励函数权重 - `throughput_weight`: 通行量奖励权重(默认:1.0) @@ -112,59 +115,7 @@ python main.py --mode test --model checkpoints/model_best.pt ## 高级功能 -### 1. 并行环境训练 - -系统支持多环境并行训练,可显著提高训练效率。 - -#### 配置方法 - -在 `config.yaml` 中设置: - -```yaml -training: - num_parallel_envs: 4 # 使用4个并行环境 -``` - -#### 推荐配置 - -- **CPU训练**: 2-4个并行环境 -- **GPU训练**: 4-8个并行环境 -- **高性能GPU**: 8-16个并行环境 - -#### 性能对比 - -| 并行环境数 | 相对速度 | 内存占用 | 推荐场景 | -|----------|---------|---------|---------| -| 1 | 1x | 低 | 调试、小规模实验 | -| 2 | ~1.8x | 中 | CPU训练 | -| 4 | ~3.5x | 中高 | 标准训练 | -| 8 | ~6.5x | 高 | GPU训练 | - -#### 使用示例 - -```yaml -training: - num_episodes: 500 - num_parallel_envs: 4 -agent: - batch_size: 128 # 建议增加批量大小 - buffer_size: 100000 -``` - -运行训练: -```bash -python main.py --mode train -``` - -输出示例: -``` -Random seed set to: 42 -Created 4 parallel environments -Using 4 parallel environments -Starting training for 500 episodes... -``` - -### 2. CSV流量输入 +### 1. CSV流量输入 系统支持从CSV文件读取真实交通流量数据。 @@ -283,8 +234,8 @@ agent: **问题:训练速度慢** ```yaml training: - num_parallel_envs: 4 # 启用并行环境 num_episodes: 200 # 减少episode数量 + train_freq: 5 # 降低训练频率 environment: episode_length: 180 # 减少episode长度 ``` @@ -305,21 +256,6 @@ agent: hidden_layers: [256, 256, 128] # 增加网络容量 ``` -### 并行训练相关 - -**问题:并行训练速度提升不明显** -- 检查GPU利用率是否已经很高 -- 瓶颈可能在网络训练而非数据收集 -- 尝试减少训练频率 - -**问题:内存不足** -```yaml -training: - num_parallel_envs: 2 # 减少并行环境数 -agent: - buffer_size: 50000 # 减小缓冲区 -``` - ### CSV流量输入相关 **问题:找不到CSV文件** @@ -335,6 +271,20 @@ agent: ## 技术细节 +### 运行管理 +系统自动管理训练运行,保持目录整洁: +- 最新训练保存在 `latest/` 目录 +- 每次新训练开始时,旧的 `latest/` 会自动移动到 `runs/run_YYYYMMDD_HHMMSS/` +- 每个运行包含:配置文件、模型检查点、训练日志和图表 +- 测试时自动使用 `latest/checkpoints/model_best.pt` 和 `latest/config.yaml` + +### Baseline 对比测试 +测试模式会自动运行 baseline 对比实验: +- Baseline 使用固定的最大速度限制(无动态控制) +- 自动计算 DQN 相对于 baseline 的性能提升百分比 +- 对比指标包括:平均奖励、平均吞吐量 +- 帮助评估 DQN 控制策略的实际效果 + ### 随机种子固定 系统在训练开始时会固定所有随机种子(Python、NumPy、PyTorch),确保: - 训练结果可重复 @@ -344,8 +294,9 @@ agent: ### 最佳模型保存 训练过程中会自动跟踪并保存效果最好的模型: - 基于episode总奖励评估 -- 保存为 `checkpoints/model_best.pt` +- 保存为 `latest/checkpoints/model_best.pt` - 训练结束时显示最佳奖励值 +- 训练完成后自动使用最佳模型进行测试 --- diff --git a/config.yaml b/config.yaml index 0d4b864..19dfe6e 100644 --- a/config.yaml +++ b/config.yaml @@ -57,7 +57,6 @@ training: checkpoint_dir: "checkpoints" # Checkpoint directory log_dir: "logs" # Log directory random_seed: 42 # Random seed for reproducibility - num_parallel_envs: 1 # Number of parallel environments (1 = no parallelization) train_freq: 5 # Train every N steps (higher = faster but less stable) # Testing Parameters diff --git a/ctm_model.py b/ctm_model.py index 72a2fbe..0ddcc74 100644 --- a/ctm_model.py +++ b/ctm_model.py @@ -69,24 +69,36 @@ class CTMModel: return np.concatenate([self.densities, self.speed_limits]) def _calculate_sending_flow(self, cell_idx: int) -> float: - """Calculate sending flow from a cell.""" + """ + Calculate sending flow from a cell (vehicles per time step). + + Returns flow in vehicles that can leave the cell during one time step. + """ density = self.densities[cell_idx] speed_limit = self.speed_limits[cell_idx] effective_speed = min(speed_limit, self.free_flow_speed) + # Flow = density (veh/km) * speed (m/s) * 3.6 to get veh/h, then convert to veh/timestep + # Simplified: density * speed * time_step / 1000 gives vehicles per time step sending_flow = min( - density * effective_speed, - self.capacity + density * effective_speed * self.time_step / 1000.0, + self.capacity * self.time_step / 1000.0 ) return sending_flow def _calculate_receiving_flow(self, cell_idx: int) -> float: - """Calculate receiving flow to a cell.""" + """ + Calculate receiving flow to a cell (vehicles per time step). + + Returns flow in vehicles that can enter the cell during one time step. + """ density = self.densities[cell_idx] + # Receiving capacity based on available space + # congestion_wave_speed (m/s) * (jam_density - density) (veh/km) * time_step / 1000 receiving_flow = min( - self.capacity, - self.congestion_wave_speed * (self.jam_density - density) + self.capacity * self.time_step / 1000.0, + self.congestion_wave_speed * (self.jam_density - density) * self.time_step / 1000.0 ) return receiving_flow @@ -105,14 +117,16 @@ class CTMModel: new_densities = self.densities.copy() flows = np.zeros(self.num_cells + 1) - flows[0] = inflow / 3600.0 * self.time_step + # Convert inflow from veh/h to vehicles per time step + flows[0] = inflow * self.time_step / 3600.0 for i in range(self.num_cells): sending = self._calculate_sending_flow(i) if i < self.num_cells - 1: receiving = self._calculate_receiving_flow(i + 1) else: - receiving = outflow / 3600.0 * self.time_step + # Convert outflow from veh/h to vehicles per time step + receiving = outflow * self.time_step / 3600.0 flows[i + 1] = min(sending, receiving) @@ -130,6 +144,7 @@ class CTMModel: "flows": flows.copy(), "average_density": np.mean(self.densities), "total_vehicles": np.sum(self.densities) * self.cell_length / 1000.0, + # flows[-1] is in vehicles per time step, convert to veh/h "throughput": flows[-1] * 3600.0 / self.time_step, } diff --git a/main.py b/main.py index 45ffced..6e97461 100644 --- a/main.py +++ b/main.py @@ -42,15 +42,17 @@ def main(): # Auto-detect config from model path if using default model config_path = args.config model_path = args.model + output_dir = None # If using latest model and default config, use latest config instead if model_path.startswith("latest/") and args.config == "config.yaml": latest_config = os.path.join("latest", "config.yaml") + output_dir = os.path.join("latest", "logs") if os.path.exists(latest_config): config_path = latest_config print(f"Auto-detected config from latest run: {config_path}") - test(config_path, model_path) + test(config_path, model_path, output_dir=output_dir) if __name__ == "__main__": diff --git a/parallel_env.py b/parallel_env.py deleted file mode 100644 index 7cd3826..0000000 --- a/parallel_env.py +++ /dev/null @@ -1,138 +0,0 @@ -""" -Multiprocessing-based parallel environment for true parallelization. -""" -import numpy as np -from typing import List, Tuple, Dict -from multiprocessing import Process, Pipe -from environment import TrafficEnvironment - - -def worker(remote, parent_remote, config): - """ - Worker process for running environment. - - Args: - remote: Child end of pipe - parent_remote: Parent end of pipe (closed in child) - config: Environment configuration - """ - parent_remote.close() - env = TrafficEnvironment(config) - - while True: - try: - cmd, data = remote.recv() - - if cmd == 'step': - state, reward, done, info = env.step(data) - if done: - state = env.reset() - remote.send((state, reward, done, info)) - - elif cmd == 'reset': - state = env.reset() - remote.send(state) - - elif cmd == 'get_metrics': - remote.send(env.episode_metrics) - - elif cmd == 'close': - remote.close() - break - - except EOFError: - break - - -class ParallelEnvironment: - """Multiprocessing-based parallel environment wrapper.""" - - def __init__(self, config: dict, num_envs: int = 4): - """ - Initialize parallel environment with multiprocessing. - - Args: - config: Configuration dictionary - num_envs: Number of parallel environments - """ - self.num_envs = num_envs - self.config = config - - # Create pipes and processes - self.remotes, self.work_remotes = zip(*[Pipe() for _ in range(num_envs)]) - self.processes = [] - - for work_remote, remote in zip(self.work_remotes, self.remotes): - proc = Process( - target=worker, - args=(work_remote, remote, config) - ) - proc.daemon = True - proc.start() - self.processes.append(proc) - work_remote.close() - - # Get dimensions from config - temp_env = TrafficEnvironment(config) - self.state_dim = temp_env.state_dim - self.action_dim = temp_env.action_dim - - print(f"Created {num_envs} parallel environments with multiprocessing") - - def reset(self) -> np.ndarray: - """ - Reset all environments in parallel. - - Returns: - Array of initial states, shape: (num_envs, state_dim) - """ - for remote in self.remotes: - remote.send(('reset', None)) - - states = [remote.recv() for remote in self.remotes] - return np.array(states) - - def step(self, actions: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray, List[Dict]]: - """ - Step all environments in parallel. - - Args: - actions: Array of actions, shape: (num_envs,) - - Returns: - states: Array of next states, shape: (num_envs, state_dim) - rewards: Array of rewards, shape: (num_envs,) - dones: Array of done flags, shape: (num_envs,) - infos: List of info dictionaries - """ - # Send actions to all workers - for remote, action in zip(self.remotes, actions): - remote.send(('step', action)) - - # Receive results from all workers - results = [remote.recv() for remote in self.remotes] - states, rewards, dones, infos = zip(*results) - - return np.array(states), np.array(rewards), np.array(dones), list(infos) - - def get_episode_metrics(self) -> List[List[Dict]]: - """ - Get episode metrics from all environments. - - Returns: - List of episode metrics for each environment - """ - for remote in self.remotes: - remote.send(('get_metrics', None)) - - metrics = [remote.recv() for remote in self.remotes] - return metrics - - def close(self): - """Close all worker processes.""" - for remote in self.remotes: - remote.send(('close', None)) - - for proc in self.processes: - proc.join() - diff --git a/train.py b/train.py index bc8a46b..f92d39b 100644 --- a/train.py +++ b/train.py @@ -9,7 +9,6 @@ from tqdm import tqdm import matplotlib.pyplot as plt from utils import load_config, create_run_directory from environment import TrafficEnvironment -from parallel_env import ParallelEnvironment from dqn_agent import DQNAgent from test import test @@ -38,14 +37,8 @@ def train(config_path: str = "config.yaml"): set_random_seed(random_seed) print(f"Random seed set to: {random_seed}") - # Create environment (parallel or single) - num_parallel_envs = config["training"].get("num_parallel_envs", 1) - if num_parallel_envs > 1: - env = ParallelEnvironment(config, num_envs=num_parallel_envs) - print(f"Using {num_parallel_envs} parallel environments with multiprocessing") - else: - env = TrafficEnvironment(config) - print("Using single environment") + env = TrafficEnvironment(config) + print("Using single environment") agent = DQNAgent( state_dim=env.state_dim, action_dim=env.action_dim, @@ -81,88 +74,43 @@ def train(config_path: str = "config.yaml"): print(f"Train frequency: every {train_freq} steps") # Check if using parallel environments - is_vectorized = num_parallel_envs > 1 + # is_vectorized = num_parallel_envs > 1 for episode in tqdm(range(num_episodes), desc="Training"): states = env.reset() step_count = 0 # Track steps for training frequency + episode_reward = 0 + episode_loss = 0 + loss_count = 0 + state = states + step_count = 0 - if is_vectorized: - # Parallel environment training - episode_rewards_vec = np.zeros(num_parallel_envs) - episode_loss = 0 - loss_count = 0 - done_flags = np.zeros(num_parallel_envs, dtype=bool) + while True: + action = agent.select_action(state, training=True) + next_state, reward, done, info = env.step(action) - while not np.all(done_flags): - # Select actions for all environments - actions = np.array([agent.select_action(states[i], training=True) - for i in range(num_parallel_envs)]) + agent.store_transition(state, action, reward, next_state, done) - next_states, rewards, dones, infos = env.step(actions) + # Train agent every train_freq steps + step_count += 1 + if step_count % train_freq == 0: + loss = agent.train() + if loss > 0: + episode_loss += loss + loss_count += 1 - # Store transitions for all environments - for i in range(num_parallel_envs): - if not done_flags[i]: - agent.store_transition(states[i], actions[i], rewards[i], - next_states[i], dones[i]) - episode_rewards_vec[i] += rewards[i] - if dones[i]: - done_flags[i] = True + episode_reward += reward + state = next_state - # Train agent every train_freq steps - step_count += 1 - if step_count % train_freq == 0: - loss = agent.train() - if loss > 0: - episode_loss += loss - loss_count += 1 - - states = next_states - - episode_reward = np.mean(episode_rewards_vec) - else: - # Single environment training - episode_reward = 0 - episode_loss = 0 - loss_count = 0 - state = states - step_count = 0 - - while True: - action = agent.select_action(state, training=True) - next_state, reward, done, info = env.step(action) - - agent.store_transition(state, action, reward, next_state, done) - - # Train agent every train_freq steps - step_count += 1 - if step_count % train_freq == 0: - loss = agent.train() - if loss > 0: - episode_loss += loss - loss_count += 1 - - episode_reward += reward - state = next_state - - if done: - break + if done: + break agent.end_episode() episode_rewards.append(episode_reward) episode_losses.append(episode_loss / max(1, loss_count)) - # Calculate average throughput - if is_vectorized: - all_metrics = env.get_episode_metrics() - avg_throughput = np.mean([ - np.mean([m["throughput"] for m in metrics]) - for metrics in all_metrics if len(metrics) > 0 - ]) - else: - avg_throughput = np.mean([m["throughput"] for m in env.episode_metrics]) + avg_throughput = np.mean([m["throughput"] for m in env.episode_metrics]) episode_throughputs.append(avg_throughput) # Save best model based on episode reward