Source code for OmniSimulator.action.action_manager

from typing import Dict, Optional, Tuple, Any, List, Type

from ..core.enums import ActionType, ActionStatus
from .actions.basic_actions import (
    GotoAction, GrabAction, PlaceAction, LookAction, ExploreAction
)
from .actions.corporate_actions import CorpGrabAction, CorpGotoAction, CorpPlaceAction
from .actions.base_action import BaseAction

[docs] class ActionManager: """ 动作管理器 - 负责动作的创建、验证和执行 这个类整合了原有的ActionParser, ActionValidator和ActionExecutor的功能, 采用了面向对象的设计模式,使代码更清晰、易于维护和扩展。 主要职责: 1. 解析命令字符串,创建动作对象 2. 验证动作是否可以执行 3. 执行动作并返回结果 4. 维护协作动作的状态 """ # 类级别的字典,存储每个智能体特定的动作映射 # 格式: {agent_id: {action_name: action_class}} agent_action_classes = {}
[docs] def __init__(self, world_state, env_manager, agent_manager, scene_abilities=None): """ 初始化动作管理器 Args: world_state: 世界状态对象 env_manager: 环境管理器 agent_manager: 智能体管理器 scene_abilities: 场景支持的能力列表,用于注册不需要工具的属性动作 """ self.world_state = world_state self.env_manager = env_manager self.agent_manager = agent_manager # 保存合作动作状态 if not hasattr(self.world_state, 'pending_corporate_actions'): setattr(self.world_state, 'pending_corporate_actions', {}) # 注册所有可用的基础动作类 self.action_classes = { 'GOTO': GotoAction, 'GRAB': GrabAction, 'PLACE': PlaceAction, 'LOOK': LookAction, 'EXPLORE': ExploreAction, 'CORP_GRAB': CorpGrabAction, 'CORP_GOTO': CorpGotoAction, 'CORP_PLACE': CorpPlaceAction } # 根据场景abilities注册不需要工具的属性动作 if scene_abilities: try: from .actions.attribute_actions import AttributeAction from .actions.corporate_attribute_actions import CorporateAttributeAction # 确保CSV文件已加载 AttributeAction.load_from_csv() CorporateAttributeAction.load_from_csv() # 只注册场景中包含的且不需要工具的动作 self._register_scene_no_tool_actions(scene_abilities) except (ImportError, Exception) as e: print(f"加载属性动作配置失败: {e}")
[docs] @classmethod def register_ability_action(cls, action_name: str, agent_id: str): """ 为特定智能体注册能力相关动作(仅限需要工具的动作) Args: action_name: 动作名称 agent_id: 智能体ID """ # 确保属性动作类和配置已加载 from .actions.attribute_actions import AttributeAction if not AttributeAction.action_configs: AttributeAction.load_from_csv() # 确保合作属性动作类和配置已加载 from .actions.corporate_attribute_actions import CorporateAttributeAction if not CorporateAttributeAction.action_configs: CorporateAttributeAction.load_from_csv() # 检查动作是否在配置中且需要工具 action_name_lower = action_name.lower() if action_name_lower in AttributeAction.action_configs: config = AttributeAction.action_configs[action_name_lower] requires_tool = config.get("requires_tool", True) # 只注册需要工具的动作 if requires_tool: if agent_id not in cls.agent_action_classes: cls.agent_action_classes[agent_id] = {} # 注册普通属性动作 cls.agent_action_classes[agent_id][action_name.upper()] = AttributeAction # 注册合作属性动作 cls.agent_action_classes[agent_id][f"CORP_{action_name.upper()}"] = CorporateAttributeAction print(f"为智能体 {agent_id} 注册动作: {action_name} 和 CORP_{action_name}")
def _register_scene_no_tool_actions(self, scene_abilities: List[str]): """ 根据场景abilities注册不需要工具的属性动作 Args: scene_abilities: 场景支持的能力列表 """ from .actions.attribute_actions import AttributeAction from .actions.corporate_attribute_actions import CorporateAttributeAction registered_count = 0 registered_actions = [] for ability in scene_abilities: ability_lower = ability.lower() # 检查是否在配置中且不需要工具 if ability_lower in AttributeAction.action_configs: config = AttributeAction.action_configs[ability_lower] requires_tool = config.get('requires_tool', True) if not requires_tool: # 注册普通属性动作 action_name = ability_lower.upper() self.register_action_class(action_name, AttributeAction) registered_actions.append(action_name) # 注册合作属性动作 corp_action_name = f"CORP_{action_name}" self.register_action_class(corp_action_name, CorporateAttributeAction) registered_actions.append(corp_action_name) registered_count += 1 if registered_count > 0: print(f"根据场景abilities注册了 {registered_count} 个不需要工具的动作: {', '.join(registered_actions)}") else: print("场景中没有不需要工具的属性动作需要注册") def _get_agent_current_abilities(self, agent_id: str) -> set: """ 获取智能体当前具有的所有能力 Args: agent_id: 智能体ID Returns: set: 智能体当前能力集合 """ try: agent = self.agent_manager.get_agent(agent_id) if agent and hasattr(agent, 'abilities'): # 确保返回集合类型 if isinstance(agent.abilities, set): return agent.abilities elif isinstance(agent.abilities, (list, tuple)): return set(agent.abilities) else: return set() return set() except Exception as e: import logging logger = logging.getLogger(__name__) logger.warning(f"获取智能体 {agent_id} 能力时出错: {e}") return set() def _get_common_abilities(self, agent_ids: list) -> set: """ 获取多个智能体的共同能力 Args: agent_ids: 智能体ID列表 Returns: set: 所有智能体都具有的能力集合 """ if not agent_ids: return set() # 获取第一个智能体的能力作为基准 common_abilities = self._get_agent_current_abilities(agent_ids[0]) # 与其他智能体的能力求交集 for agent_id in agent_ids[1:]: agent_abilities = self._get_agent_current_abilities(agent_id) common_abilities = common_abilities.intersection(agent_abilities) return common_abilities
[docs] @classmethod def unregister_ability_action(cls, action_name: str, agent_id: str): """ 为特定智能体解绑能力相关动作(仅限需要工具的动作) Args: action_name: 动作名称 agent_id: 智能体ID """ # 确保属性动作类和配置已加载 from .actions.attribute_actions import AttributeAction if not AttributeAction.action_configs: AttributeAction.load_from_csv() # 检查动作是否在配置中且需要工具 action_name_lower = action_name.lower() if action_name_lower in AttributeAction.action_configs: config = AttributeAction.action_configs[action_name_lower] requires_tool = config.get("requires_tool", True) # 只解绑需要工具的动作 if requires_tool and agent_id in cls.agent_action_classes: # 解绑普通属性动作 if action_name.upper() in cls.agent_action_classes[agent_id]: del cls.agent_action_classes[agent_id][action_name.upper()] # 解绑合作属性动作 corp_action_name = f"CORP_{action_name.upper()}" if corp_action_name in cls.agent_action_classes[agent_id]: del cls.agent_action_classes[agent_id][corp_action_name] print(f"为智能体 {agent_id} 解绑动作: {action_name} 和 CORP_{action_name}")
[docs] def register_action_class(self, action_name: str, action_class: Type[BaseAction]): """ 注册单个动作类 Args: action_name: 动作名称,将自动转为大写 action_class: 动作类,必须是BaseAction的子类 Returns: self: 支持链式调用 """ if not issubclass(action_class, BaseAction): raise TypeError(f"动作类必须是BaseAction的子类: {action_class}") self.action_classes[action_name.upper()] = action_class return self
[docs] def register_action_classes(self, action_classes_dict: Dict[str, Type[BaseAction]]): """ 批量注册多个动作类 Args: action_classes_dict: 动作名称到动作类的映射字典 Returns: self: 支持链式调用 """ for name, cls in action_classes_dict.items(): self.register_action_class(name, cls) return self
[docs] def is_action_registered(self, action_name: str) -> bool: """ 检查动作是否已注册 Args: action_name: 动作名称 Returns: bool: 是否已注册 """ return action_name.upper() in self.action_classes
[docs] def parse_command(self, command_str: str, agent_id: str) -> Tuple[Optional[BaseAction], Optional[str]]: """ 解析命令字符串并创建相应的动作对象 Args: command_str: 命令字符串,如"GRAB cup_1" agent_id: 执行动作的智能体ID Returns: Tuple[Optional[BaseAction], Optional[str]]: (动作对象, 错误消息) 如果解析成功,错误消息为None;如果解析失败,动作对象为None """ # 去除命令两端的空白并提取动作名称 command = command_str.strip().split()[0].upper() # 首先检查智能体特定的动作类 agent_actions = self.agent_action_classes.get(agent_id, {}) if command in agent_actions: action_class = agent_actions[command] action = action_class.from_command(command_str, agent_id) if action: return action, None # 如果没有找到智能体特定的动作,再查找全局动作类 action_class = self.action_classes.get(command) if not action_class: return None, f"Unknown command: {command}" # 使用动作类解析命令字符串 action = action_class.from_command(command_str, agent_id) if not action: return None, f"Invalid command format: {command_str}" return action, None
[docs] def validate_action(self, action: BaseAction) -> Tuple[bool, str]: """ 验证动作是否有效 Args: action: 动作对象 Returns: Tuple[bool, str]: (是否有效, 原因消息) """ # 合作模式全局限制 agent = self.agent_manager.get_agent(action.agent_id) if agent and hasattr(agent, 'corporate_mode_object_id') and agent.corporate_mode_object_id: # 直接使用ActionType枚举比较 from ..core.enums import ActionType allowed_actions = [ActionType.CORP_GOTO, ActionType.CORP_PLACE] if action.action_type not in allowed_actions: return False, "In cooperative mode, only cooperative carrying or placing actions are allowed" return action.validate( self.world_state, self.env_manager, self.agent_manager )
[docs] def execute_action(self, action: BaseAction) -> Tuple[ActionStatus, str, Optional[Dict[str, Any]]]: """ 执行动作 Args: action: 动作对象 Returns: Tuple[ActionStatus, str, Optional[Dict]]: (执行状态, 反馈消息, 额外结果数据) """ return action.execute( self.world_state, self.env_manager, self.agent_manager )
[docs] def process_command(self, command_str: str, agent_id: str) -> Tuple[ActionStatus, str, Optional[Dict[str, Any]]]: """ 处理命令字符串,包括解析、验证和执行 Args: command_str: 命令字符串 agent_id: 执行命令的智能体ID Returns: Tuple[ActionStatus, str, Optional[Dict]]: (执行状态, 反馈消息, 额外结果数据) """ # 解析命令 action, error = self.parse_command(command_str, agent_id) if error or action is None: return ActionStatus.INVALID, error or "Failed to parse the command", None # 验证动作 valid, reason = self.validate_action(action) if not valid: return ActionStatus.INVALID, reason, None # 执行动作 return self.execute_action(action)
[docs] def get_agent_supported_actions_description(self, agent_ids: List[str]) -> str: """ 获取智能体支持的所有动作的字符串描述 Args: agent_ids: 智能体ID列表,支持单个或多个智能体 Returns: str: 包含所有支持动作的描述字符串(英文) """ from .actions.attribute_actions import AttributeAction # 确保CSV文件已加载 AttributeAction.load_from_csv() # 参数验证 if not agent_ids or not isinstance(agent_ids, list): return "Error: agent_ids must be a non-empty list" # 去重并保持顺序 unique_agent_ids = [] for agent_id in agent_ids: if agent_id not in unique_agent_ids: unique_agent_ids.append(agent_id) agent_ids = unique_agent_ids is_single_agent = len(agent_ids) == 1 descriptions = [] # 根据智能体数量设置标题 if is_single_agent: descriptions.append(f"=== SUPPORTED ACTIONS FOR {agent_ids[0].upper()} ===\n") else: agent_names = " & ".join([agent_id.upper() for agent_id in agent_ids]) descriptions.append(f"=== SUPPORTED ACTIONS FOR {agent_names} ===\n") # 1. 基础动作描述 descriptions.append("== Basic Actions ==") descriptions.append("GOTO <object_id>") descriptions.append(" - Move to a specific location or object") descriptions.append(" - Example: GOTO main_workbench_area") descriptions.append("") descriptions.append("GRAB <object_id>") descriptions.append(" - Pick up an object that is nearby") descriptions.append(" - Example: GRAB cup_1") descriptions.append("") descriptions.append("PLACE <object_id> <in|on> <container_id>") descriptions.append(" - Place a held object into or onto another object") descriptions.append(" - Example: PLACE cup_1 on table_1") descriptions.append("") # descriptions.append("LOOK <object_id>") # descriptions.append(" - Examine an object to get detailed information") # descriptions.append(" - Example: LOOK table_1") # descriptions.append("") descriptions.append("EXPLORE") descriptions.append(" - Explore current room to discover objects") descriptions.append(" - Example: EXPLORE") descriptions.append("") descriptions.append("DONE") descriptions.append(" - Finish the task") descriptions.append(" - Example: DONE") descriptions.append("") # 2. 全局可用的属性动作(不需要工具) global_attribute_actions = [] for action_name, action_class in self.action_classes.items(): if action_class == AttributeAction and not action_name.startswith('CORP_'): action_lower = action_name.lower() if action_lower in AttributeAction.action_configs: config = AttributeAction.action_configs[action_lower] # 只显示不需要工具的动作 requires_tool = config.get('requires_tool', True) if not requires_tool: description = config.get('description', 'No description available') global_attribute_actions.append((action_name, description)) if global_attribute_actions: descriptions.append("== Attribute Actions (No Tools Required) ==") for action_name, description in sorted(global_attribute_actions): descriptions.append(f"{action_name} <object_id>") descriptions.append(f" - {description}") descriptions.append(f" - Example: {action_name} device_1") descriptions.append("") # 3. 智能体特定的动作(需要工具/能力) # 收集所有相关智能体的特定动作 all_agent_specific_actions = {} # {action_name: (description, agents_list)} for current_agent_id in agent_ids: # 3.1 静态注册的动作(现有逻辑) if current_agent_id in self.agent_action_classes: for action_name, action_class in self.agent_action_classes[current_agent_id].items(): if action_class == AttributeAction and not action_name.startswith('CORP_'): action_lower = action_name.lower() if action_lower in AttributeAction.action_configs: config = AttributeAction.action_configs[action_lower] description = config.get('description', 'No description available') if action_name not in all_agent_specific_actions: all_agent_specific_actions[action_name] = (description, []) all_agent_specific_actions[action_name][1].append(current_agent_id) # 3.2 基于当前能力的动态动作(新增) agent_abilities = self._get_agent_current_abilities(current_agent_id) for ability in agent_abilities: ability_lower = ability.lower() if ability_lower in AttributeAction.action_configs: config = AttributeAction.action_configs[ability_lower] requires_tool = config.get('requires_tool', True) if requires_tool: action_name = ability.upper() description = config.get('description', 'No description available') if action_name not in all_agent_specific_actions: all_agent_specific_actions[action_name] = (description, []) if current_agent_id not in all_agent_specific_actions[action_name][1]: all_agent_specific_actions[action_name][1].append(current_agent_id) if all_agent_specific_actions: if not is_single_agent: descriptions.append("== Agent-Specific Actions (Tools Required) ==") for action_name, (description, agents_list) in sorted(all_agent_specific_actions.items()): agents_str = " & ".join(agents_list) descriptions.append(f"{action_name} <object_id>") descriptions.append(f" - {description}") descriptions.append(f" - Available to: {agents_str}") descriptions.append(f" - Example: {action_name} device_1") descriptions.append("") else: descriptions.append("== Agent-Specific Actions (Tools Required) ==") for action_name, (description, _) in sorted(all_agent_specific_actions.items()): descriptions.append(f"{action_name} <object_id>") descriptions.append(f" - {description}") descriptions.append(f" - Example: {action_name} device_1") descriptions.append("") # 4. 合作动作(只在多智能体时显示) if not is_single_agent: descriptions.append("== Cooperative Actions ==") if len(agent_ids) == 2: # 两个智能体的具体格式 agent_pair = ",".join(agent_ids) descriptions.append(f"CORP_GRAB {agent_pair} <object_id>") descriptions.append(" - Two agents cooperatively grab a heavy object") descriptions.append(f" - Example: CORP_GRAB {agent_pair} heavy_box_1") descriptions.append("") descriptions.append(f"CORP_GOTO {agent_pair} <location_id>") descriptions.append(" - Two agents move together while carrying an object") descriptions.append(f" - Example: CORP_GOTO {agent_pair} storage_area") descriptions.append("") descriptions.append(f"CORP_PLACE {agent_pair} <object_id> <in|on> <container_id>") descriptions.append(" - Two agents cooperatively place a heavy object") descriptions.append(f" - Example: CORP_PLACE {agent_pair} heavy_box_1 on table_1") descriptions.append("") else: # 多个智能体的通用格式 agent_list = ",".join(agent_ids) descriptions.append(f"CORP_GRAB {agent_list} <object_id>") descriptions.append(" - Multiple agents cooperatively grab a heavy object") descriptions.append(f" - Example: CORP_GRAB {agent_list} heavy_box_1") descriptions.append("") descriptions.append(f"CORP_GOTO {agent_list} <location_id>") descriptions.append(" - Multiple agents move together while carrying an object") descriptions.append(f" - Example: CORP_GOTO {agent_list} storage_area") descriptions.append("") descriptions.append(f"CORP_PLACE {agent_list} <object_id> <in|on> <container_id>") descriptions.append(" - Multiple agents cooperatively place a heavy object") descriptions.append(f" - Example: CORP_PLACE {agent_list} heavy_box_1 on table_1") descriptions.append("") # 5. 合作属性动作(只在多智能体时显示) if not is_single_agent: corp_attribute_actions = {} # {action_name: (description, agents_list)} # 收集全局合作属性动作 for action_name, action_class in self.action_classes.items(): if action_class.__name__ == 'CorporateAttributeAction' and action_name.startswith('CORP_'): base_action = action_name[5:].lower() if base_action in AttributeAction.action_configs: config = AttributeAction.action_configs[base_action] description = config.get('description', 'No description available') corp_attribute_actions[action_name] = (description, []) # 收集智能体特定的合作动作 for current_agent_id in agent_ids: if current_agent_id in self.agent_action_classes: for action_name, action_class in self.agent_action_classes[current_agent_id].items(): if action_class.__name__ == 'CorporateAttributeAction' and action_name.startswith('CORP_'): base_action = action_name[5:].lower() if base_action in AttributeAction.action_configs: config = AttributeAction.action_configs[base_action] description = config.get('description', 'No description available') if action_name not in corp_attribute_actions: corp_attribute_actions[action_name] = (description, []) corp_attribute_actions[action_name][1].append(current_agent_id) if corp_attribute_actions: descriptions.append("== Cooperative Attribute Actions ==") for action_name, (description, agents_list) in sorted(corp_attribute_actions.items()): if len(agent_ids) == 2: # 两个智能体的具体格式 agent_pair = ",".join(agent_ids) if agents_list: # 如果有特定智能体的动作 agents_str = " & ".join(agents_list) descriptions.append(f"{action_name} {agent_pair} <object_id>") descriptions.append(f" - {description} (cooperative)") descriptions.append(f" - Available to: {agents_str}") descriptions.append(f" - Example: {action_name} {agent_pair} device_1") else: # 全局动作 descriptions.append(f"{action_name} {agent_pair} <object_id>") descriptions.append(f" - {description} (cooperative)") descriptions.append(f" - Example: {action_name} {agent_pair} device_1") else: # 多个智能体的通用格式 agent_list = ",".join(agent_ids) descriptions.append(f"{action_name} {agent_list} <object_id>") descriptions.append(f" - {description} (cooperative)") descriptions.append(f" - Example: {action_name} {agent_list} device_1") descriptions.append("") # 6. 基于共同能力的协作动作(新增部分) if not is_single_agent: common_abilities = self._get_common_abilities(agent_ids) if common_abilities: descriptions.append("== Cooperative Ability Actions ==") for ability in sorted(common_abilities): ability_lower = ability.lower() if ability_lower in AttributeAction.action_configs: config = AttributeAction.action_configs[ability_lower] requires_tool = config.get('requires_tool', True) # 只显示需要工具的协作能力动作 if requires_tool: action_name = f"CORP_{ability.upper()}" description = config.get('description', 'No description available') if len(agent_ids) == 2: # 两个智能体的具体格式 agent_pair = ",".join(agent_ids) descriptions.append(f"{action_name} {agent_pair} <object_id>") descriptions.append(f" - {description} (cooperative)") descriptions.append(f" - Both agents have the required ability") descriptions.append(f" - Example: {action_name} {agent_pair} device_1") else: # 多个智能体的通用格式 agent_list = ",".join(agent_ids) descriptions.append(f"{action_name} {agent_list} <object_id>") descriptions.append(f" - {description} (cooperative)") descriptions.append(f" - All agents have the required ability") descriptions.append(f" - Example: {action_name} {agent_list} device_1") descriptions.append("") descriptions.append("=== END OF ACTIONS ===") return "\n".join(descriptions)