Source code for OmniSimulator.core.engine

import os
import yaml
from typing import Dict, Optional, Any, Tuple, List

from .state import WorldState

[docs] class SimulationEngine: """模拟引擎类 - 整个模拟器的核心控制器""" def _deep_merge_dict(self, target: dict, source: dict): """深度合并字典""" for key, value in source.items(): if key in target and isinstance(target[key], dict) and isinstance(value, dict): self._deep_merge_dict(target[key], value) else: target[key] = value
[docs] def __init__(self, config: Optional[Dict[str, Any]] = None, scene_abilities: List[str] = None): """ 初始化模拟引擎 Args: config: 全局配置字典,可选 scene_abilities: 场景特定的能力列表,用于动态注册需要工具的动作 """ # 设置默认配置 default_config = { 'explore_mode': 'thorough', # 设置默认探索模式为彻底探索 } # 合并用户提供的配置与默认配置 self.config = default_config.copy() if config: self._deep_merge_dict(self.config, config) self.world_state = WorldState() self.env_manager = None self.agent_manager = None self.action_handler = None self.task_config = None self.scene_abilities = scene_abilities or [] # 验证系统 self.task_verifier = None # 可视化系统 self.visualization_manager = None self._load_global_config()
def _load_global_config(self): """加载全局配置文件""" try: # 使用根目录下的config/simulator/simulator_config.yaml config_path = os.path.join(os.path.dirname(__file__), '..', '..', 'config', 'simulator', 'simulator_config.yaml') if os.path.exists(config_path): with open(config_path, 'r', encoding='utf-8') as f: global_config = yaml.safe_load(f) or {} # 合并全局配置到当前配置(用户配置优先) for key, value in global_config.items(): if key not in self.config: self.config[key] = value elif isinstance(value, dict) and isinstance(self.config.get(key), dict): # 深度合并字典,但保持用户配置优先 merged_dict = value.copy() self._deep_merge_dict(merged_dict, self.config[key]) self.config[key] = merged_dict else: # 如果用户没有提供该配置,使用全局配置 if key not in self.config: self.config[key] = value except Exception as e: print(f"加载全局配置失败: {e}") def _initialize_visualization(self): """初始化可视化系统(仅在需要时)""" try: # 检查是否启用可视化 viz_config = self.config.get('visualization', {}) if not viz_config.get('enabled', False): return True # 延迟导入以避免循环依赖 from visualization.visualization_manager import VisualizationManager self.visualization_manager = VisualizationManager( self.world_state, self.agent_manager, self.env_manager, engine=self ) # 更新可视化管理器的配置 self.visualization_manager.config = self.config self.visualization_manager.visualization_config = self.config.get('visualization', {}) # 初始化可视化管理器 if not self.visualization_manager.initialize(): print("可视化管理器初始化失败") return False # 启动可视化系统 if self.visualization_manager: success = self.visualization_manager.start() if success: print(f"可视化系统已启动: {self.visualization_manager.get_server_url()}") else: print("可视化系统启动失败") return False return True except ImportError: print("可视化模块不可用,跳过可视化初始化") return True except Exception as e: print(f"初始化可视化系统失败: {e}") return True # 可视化失败不应该影响核心功能
[docs] def initialize(self, scene_file: str, agent_file: Optional[str] = None) -> bool: """ 初始化模拟器 Args: scene_file: 场景文件路径 agent_file: 智能体文件路径(已废弃,不再使用) Returns: bool: 是否成功初始化 """ try: # 导入需要的模块 # 注意:这些导入放在这里而不是文件顶部,是为了避免循环导入 from ..environment.environment_manager import EnvironmentManager from ..environment.scene_parser import SceneParser from ..agent.agent_manager import AgentManager from ..action.action_handler import ActionHandler # 创建环境管理器 self.env_manager = EnvironmentManager(self.world_state, sim_config=self.config) # 创建场景解析器并加载场景 scene_parser = SceneParser() scene_data = scene_parser.parse_scene_file(scene_file) if not scene_data: print(f"无法解析场景文件: {scene_file}") return False # 使用环境管理器从场景数据加载环境 success = self.env_manager.load_scene(scene_data) if not success: print("加载场景失败") return False # 创建智能体管理器(只通过yaml配置初始化) self.agent_manager = AgentManager(self.world_state, self.config) # 从场景数据中提取abilities scene_abilities = None if scene_data and 'abilities' in scene_data: self.scene_abilities = scene_data['abilities'] scene_abilities = self.scene_abilities # 创建动作处理器,传递scene_abilities self.action_handler = ActionHandler( self.world_state, self.env_manager, self.agent_manager, scene_abilities=scene_abilities, config=self.config ) # 注册需要工具的动作 if scene_abilities: self._register_scene_specific_actions() # 初始化可视化系统 self._initialize_visualization() return True except Exception as e: print(f"初始化失败: {e}") import traceback traceback.print_exc() return False
[docs] def initialize_with_task(self, task_file: str) -> bool: """ 使用任务文件初始化模拟器(包括场景和智能体) Args: task_file: 任务文件路径 Returns: bool: 是否成功初始化 """ try: # 导入需要的模块 from ..environment.scene_parser import SceneParser from ..environment.scene_validator import SceneValidator # 解析任务文件 scene_parser = SceneParser() task_data = scene_parser.parse_file(task_file) if not task_data: print(f"无法解析任务文件: {task_file}") return False # 验证任务数据 is_valid, errors = SceneValidator.validate_agent_config(task_data) if not is_valid: print("任务数据无效:") for error in errors: print(f" - {error}") return False # 保存任务配置 self.task_config = task_data # 注意:abilities现在从scene.json读取,不再从task.json读取 # 获取关联的场景文件 scene_uid = task_data.get("scene_uid") if not scene_uid: print("任务数据缺少scene_uid字段") return False # 构建场景文件路径 scene_file = self._find_scene_file(scene_uid) if not scene_file: print(f"找不到场景文件: {scene_uid}") return False # 初始化模拟器 success = self.initialize(scene_file) if not success: return False # 加载智能体 if "agents_config" in task_data: success = self.load_agents(task_data["agents_config"]) if not success: print("加载智能体失败") return False # 初始化可视化系统 self._initialize_visualization() return True except Exception as e: print(f"使用任务初始化失败: {e}") import traceback traceback.print_exc() return False
[docs] def initialize_with_data(self, data: Dict[str, Any]) -> bool: """ 使用数据字典初始化模拟器 Args: data: 包含场景、任务和动作配置的数据字典 格式: { 'scene': scene_data, # 场景数据 'task': task_data, # 任务数据 (可选) 'actions': action_config_data # 动作配置数据 (可选) } Returns: bool: 是否成功初始化 """ try: # 导入需要的模块 from ..environment.environment_manager import EnvironmentManager from ..environment.scene_validator import SceneValidator from ..agent.agent_manager import AgentManager from ..action.action_handler import ActionHandler # 获取各部分数据 scene_data = data.get('scene') task_data = data.get('task') action_config = data.get('actions') if not scene_data: print("数据中缺少场景信息") return False # 验证场景数据 is_valid, errors = SceneValidator.validate_scene(scene_data) if not is_valid: print("场景数据无效:") for error in errors: print(f" - {error}") return False # 创建环境管理器 self.env_manager = EnvironmentManager(self.world_state, sim_config=self.config) # 直接从场景数据加载环境 success = self.env_manager.load_scene(scene_data) if not success: print("从数据加载场景失败") return False # 创建智能体管理器 self.agent_manager = AgentManager(self.world_state, self.config) # 如果有任务数据,加载智能体 if task_data: # 验证任务数据 is_valid, errors = SceneValidator.validate_agent_config(task_data) if not is_valid: print("任务数据无效:") for error in errors: print(f" - {error}") return False # 保存任务配置 self.task_config = task_data # 加载智能体 if "agents_config" in task_data: success = self.load_agents(task_data["agents_config"]) if not success: print("加载智能体失败") return False # 从场景数据中提取abilities scene_abilities = None if scene_data and 'abilities' in scene_data: self.scene_abilities = scene_data['abilities'] scene_abilities = self.scene_abilities # 创建动作处理器,传递scene_abilities self.action_handler = ActionHandler( self.world_state, self.env_manager, self.agent_manager, scene_abilities=scene_abilities, config=self.config ) # 注册需要工具的动作 if scene_abilities: self._register_scene_specific_actions() # 如果有动作配置,处理自定义动作配置 if action_config: self._load_action_config(action_config) # 如果有任务数据,设置任务验证器(验证数据现在在task.json中) if task_data: self.task_verifier = self._create_task_verifier(task_data) # 初始化可视化系统 self._initialize_visualization() return True except Exception as e: print(f"使用数据初始化失败: {e}") import traceback traceback.print_exc() return False
def _load_action_config(self, action_config: Dict[str, Any]): """ 加载自定义动作配置 Args: action_config: 动作配置数据 """ try: # 处理属性动作配置 if 'attribute_actions' in action_config: from ..action.actions.attribute_actions import AttributeAction from ..action.actions.corporate_attribute_actions import CorporateAttributeAction # 可以在这里处理自定义的属性动作配置 # 例如:动态更新action_configs字典 custom_actions = action_config['attribute_actions'] for action_name, config in custom_actions.items(): AttributeAction.action_configs[action_name] = config CorporateAttributeAction.action_configs[action_name] = config # 如果不需要工具,添加到全局动作类 if not config.get('requires_tool', True): AttributeAction.no_tool_actions.add(action_name) CorporateAttributeAction.no_tool_actions.add(action_name) # 重新注册不需要工具的动作 self.action_handler.action_manager.register_action_class( action_name.upper(), AttributeAction ) self.action_handler.action_manager.register_action_class( f"CORP_{action_name.upper()}", CorporateAttributeAction ) print(f"已加载 {len(custom_actions)} 个自定义属性动作") except Exception as e: print(f"Failed to load action configuration: {e}") import traceback traceback.print_exc() def _find_scene_file(self, scene_uid: str) -> Optional[str]: """ Find scene file based on scene UID Args: scene_uid: Scene unique identifier Returns: str: Scene file path, returns None if not found """ # 获取项目根目录 current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(os.path.dirname(current_dir)) # Search in root data/scene/ directory base_dirs = [ os.path.join(project_root, "data", "scene"), os.path.join(project_root, "data", "task") ] for base_dir in base_dirs: # Try different file extensions for ext in [".json", ".yaml", ".yml"]: file_path = os.path.join(base_dir, f"{scene_uid}{ext}") if os.path.exists(file_path): return file_path return None
[docs] def validate_environment(self) -> Tuple[bool, List[str]]: """ 验证当前环境是否合法 Returns: Tuple[bool, List[str]]: (是否合法, 错误信息列表) """ from ..environment.scene_validator import SceneValidator if not self.env_manager: return False, ["Simulator not initialized"] # 构建场景数据结构 scene_data = { "rooms": [], "objects": [] } # 收集所有房间 for room_id in self.world_state.graph.room_ids: room_data = self.world_state.graph.get_node(room_id) if room_data: scene_data["rooms"].append(room_data) # 收集所有物体 for node_id, node_data in self.world_state.graph.nodes.items(): if node_id not in self.world_state.graph.room_ids and node_data.get("type", "").upper() not in ["AGENT"]: # 补充location_id信息 if "location_id" not in node_data: edges = self.world_state.graph.get_incoming_edges(node_id) if edges: parent_id, edge_data = next(iter(edges.items())) node_data["location_id"] = f"{edge_data.get('type', 'in')}:{parent_id}" scene_data["objects"].append(node_data) # 确保所有物体的位置引用都是有效的 location_errors = [] object_ids = set(obj.get("id") for obj in scene_data["objects"]) room_ids = set(room.get("id") for room in scene_data["rooms"]) all_valid_ids = object_ids.union(room_ids) # 检查每个物体的location_id引用 for obj in scene_data["objects"]: location_id = obj.get("location_id", "") if location_id and ":" in location_id: _, target_id = SceneValidator._parse_location_id(location_id) if target_id not in all_valid_ids: location_errors.append(f"物体 {obj.get('id')} 的位置 {location_id} 不存在") # 结合所有验证结果 _, validator_errors = SceneValidator.validate_scene(scene_data) # 合并所有错误 all_errors = validator_errors + location_errors # 去重 unique_errors = [] for error in all_errors: if error not in unique_errors: unique_errors.append(error) return len(unique_errors) == 0, unique_errors
[docs] def load_agents(self, agents_config: List[Dict[str, Any]]) -> bool: """ 加载智能体配置 Args: agents_config: 智能体配置列表 Returns: bool: 是否成功加载 """ if not self.agent_manager: print("智能体管理器未初始化") return False try: # 获取所有房间ID room_ids = list(self.world_state.graph.room_ids) if not room_ids: print("环境中没有房间") return False # 默认使用第一个房间 default_room = sorted(room_ids)[0] # 清空现有智能体(确保不会有重复) self._remove_existing_agents() # 加载每个智能体 for i, agent_config in enumerate(agents_config): # 确保有名称和位置 if "name" not in agent_config: agent_config["name"] = f"智能体{i+1}号" # 智能体ID处理:如果未提供,则根据序号生成 if "id" not in agent_config: agent_config["id"] = f"agent_{i+1}" # 确保有位置信息 if "location_id" not in agent_config: agent_config["location_id"] = default_room # 添加智能体 agent_id = self.agent_manager.add_agent(agent_config) if not agent_id: print(f"添加智能体失败: {agent_config.get('name')}") continue return True except Exception as e: print(f"Failed to load agent: {e}") import traceback traceback.print_exc() return False
def _remove_existing_agents(self): """ 移除所有现有智能体 """ if not self.agent_manager: return # 获取所有智能体ID agent_ids = list(self.agent_manager.get_all_agents().keys()) # 移除每个智能体 for agent_id in agent_ids: # 从世界状态中移除 if agent_id in self.world_state.agents: # 先获取位置,移除边关系 agent = self.agent_manager.get_agent(agent_id) if agent and agent.location_id: try: self.world_state.graph.remove_edge(agent.location_id, agent_id) except: pass # 忽略可能的错误 # 从世界状态中删除 del self.world_state.agents[agent_id] # 从图中删除节点 try: self.world_state.graph.remove_node(agent_id) except: pass # 忽略可能的错误 # 清空智能体管理器中的智能体 self.agent_manager.agents.clear()
[docs] def process_command(self, agent_id: str, command: str): """ 处理智能体的命令 Args: agent_id: 智能体ID command: 命令字符串 Returns: Tuple: (执行状态, 反馈消息, 结果数据) """ if not self.action_handler: print("模拟器未初始化") return None return self.action_handler.process_command(agent_id, command)
[docs] def get_agent_info(self, agent_id: str) -> Optional[Dict[str, Any]]: """ 获取智能体信息 Args: agent_id: 智能体ID Returns: Dict: 智能体信息字典 """ if not self.agent_manager: return None agent = self.agent_manager.get_agent(agent_id) if not agent: return None return agent.to_dict()
[docs] def get_visualization_status(self) -> Dict[str, Any]: """ 获取可视化系统状态 Returns: Dict: 可视化系统状态信息 """ if self.visualization_manager: return self.visualization_manager.get_status() else: return { 'enabled': False, 'running': False, 'server_url': None, 'message': '可视化系统未初始化' }
[docs] def get_visualization_url(self) -> Optional[str]: """ 获取可视化Web界面URL Returns: str: 可视化URL,如果未启用则返回None """ if self.visualization_manager: return self.visualization_manager.get_server_url() return None
[docs] def stop_visualization(self): """停止可视化系统""" if self.visualization_manager: self.visualization_manager.stop()
[docs] def restart_visualization(self) -> bool: """ 重启可视化系统 Returns: bool: 是否成功重启 """ if self.visualization_manager: self.visualization_manager.stop() return self.visualization_manager.start() return False
def __del__(self): """析构函数 - 确保可视化系统正确关闭""" try: if self.visualization_manager: self.visualization_manager.stop() except: pass # 忽略析构时的错误
[docs] def get_object_info(self, object_id: str) -> Optional[Dict[str, Any]]: """ 获取物体信息 Args: object_id: 物体ID Returns: Dict: 物体信息字典 """ if not self.env_manager: return None obj = self.env_manager.get_object_by_id(object_id) return obj
[docs] def get_room_info(self, room_id: str) -> Optional[Dict[str, Any]]: """ 获取房间信息 Args: room_id: 房间ID Returns: Dict: 房间信息字典 """ if not self.env_manager: return None room = self.env_manager.get_room_by_id(room_id) return room
[docs] def get_task_info(self) -> Optional[Dict[str, Any]]: """ 获取当前任务信息 Returns: Dict: 任务信息字典 """ return self.task_config
def _register_scene_specific_actions(self): """ 根据场景abilities注册特定的动作 """ if not self.action_handler or not self.scene_abilities: return try: from ..action.actions.attribute_actions import AttributeAction from ..action.actions.corporate_attribute_actions import CorporateAttributeAction # 注册场景特定的属性动作 AttributeAction.register_task_specific_actions( self.action_handler.action_manager, self.scene_abilities ) CorporateAttributeAction.register_task_specific_actions( self.action_handler.action_manager, self.scene_abilities ) print(f"Registered actions based on scene abilities: {self.scene_abilities}") except Exception as e: print(f"Failed to register scene-specific actions: {e}") import traceback traceback.print_exc() def _create_task_verifier(self, task_data: Dict[str, Any]): """ 创建任务验证器 Args: task_data: 任务数据 Returns: TaskVerifier: 任务验证器实例 """ try: from ..utils.task_verifier import TaskVerifier verifier = TaskVerifier(task_data, self.env_manager, self.config) print(f"已创建任务验证器,包含 {len(task_data.get('tasks', []))} 个任务") return verifier except Exception as e: print(f"创建任务验证器失败: {e}") import traceback traceback.print_exc() return None
[docs] def update_scene_abilities(self, new_abilities: List[str]): """ 更新场景能力并重新注册相关动作 Args: new_abilities: 新的能力列表 """ self.scene_abilities = new_abilities if self.action_handler: self._register_scene_specific_actions()
[docs] def get_task_verification_status(self) -> Optional[Dict[str, Any]]: """ 获取任务验证状态 Returns: Optional[Dict[str, Any]]: 验证状态,如果未启用验证则返回None """ if not self.task_verifier: return None try: summary = self.task_verifier.get_completion_summary() completion_list = self.task_verifier.get_subtask_completion_list() return { 'summary': summary, 'completion_list': completion_list, 'enabled': True } except Exception as e: print(f"获取任务验证状态失败: {e}") return None
[docs] def set_task_data(self, task_data: Dict[str, Any]): """ 设置任务数据并创建验证器 Args: task_data: 任务数据,来自task.json文件 """ self.task_config = task_data if self.env_manager: self.task_verifier = self._create_task_verifier(task_data) # 同时设置ActionHandler的任务验证器 if self.action_handler: self.action_handler.set_task_verifier(task_data)
[docs] def get_agent_supported_actions_description(self, agent_ids: List[str]) -> str: """ 获取智能体支持的所有动作的字符串描述 Args: agent_ids: 智能体ID列表,支持单个或多个智能体 Returns: str: 包含所有支持动作的描述字符串(英文) """ if not self.action_handler: return "Action handler not initialized" return self.action_handler.get_agent_supported_actions_description(agent_ids)