Source code for OmniSimulator.action.action_handler

from typing import Dict, Optional, Tuple, Any, Type, List

from ..core.enums import ActionStatus
from .action_manager import ActionManager
from .actions.base_action import BaseAction

[docs] class ActionHandler: """ 动作处理器 - 负责处理智能体的动作请求 此类是模拟器的主要对外接口,用于处理智能体的动作请求。 它封装了ActionManager的内部复杂性,提供简单直观的API。 使用示例: ```python action_handler = ActionHandler(world_state, env_manager, agent_manager) status, message, result = action_handler.process_command('agent_1', 'GRAB cup_1') ``` """
[docs] def __init__(self, world_state, env_manager, agent_manager, scene_abilities=None, config=None): """ 初始化动作处理器 Args: world_state: 世界状态对象 env_manager: 环境管理器 agent_manager: 智能体管理器 scene_abilities: 场景支持的能力列表,用于注册不需要工具的属性动作 config: 配置字典,包含验证相关配置 """ self.action_manager = ActionManager( world_state, env_manager, agent_manager, scene_abilities=scene_abilities ) self.config = config or {} self.task_verifier = None # 将在设置验证数据时初始化
[docs] def process_command(self, agent_id: str, command_str: str) -> Tuple[ActionStatus, str, Optional[Dict[str, Any]]]: """ 处理智能体的命令 Args: agent_id: 智能体ID command_str: 命令字符串 Returns: Tuple[ActionStatus, str, Optional[Dict]]: (执行状态, 反馈消息, 额外结果数据) """ # 检查是否是done命令 if command_str.strip().lower() == "done": return self._handle_done_command() # 使用ActionManager处理命令 status, message, result = self.action_manager.process_command(command_str, agent_id) # 如果启用了任务验证,在动作执行后进行验证 if self._should_verify_tasks(): verification_result = self._perform_task_verification() if result is None: result = {} result["task_verification"] = verification_result return status, message, result
[docs] def register_action_class(self, action_name: str, action_class: Type[BaseAction]): """ 注册单个动作类 Args: action_name: 动作名称 action_class: 动作类 Returns: self: 支持链式调用 """ self.action_manager.register_action_class(action_name, action_class) return self
[docs] def register_action_classes(self, action_classes_dict: Dict[str, Type[BaseAction]]): """ 批量注册多个动作类 Args: action_classes_dict: 动作名称到动作类的映射字典 Returns: self: 支持链式调用 """ self.action_manager.register_action_classes(action_classes_dict) return self
[docs] def register_attribute_actions(self): """ 加载属性动作配置(仅为保持向后兼容) Returns: self: 支持链式调用 """ from .actions.attribute_actions import AttributeAction AttributeAction.load_from_csv() return self
[docs] def get_pending_corporate_actions(self) -> Dict[str, Any]: """ 获取待处理的合作动作 Returns: Dict[str, Any]: 动作ID到动作数据的映射 """ # 从world_state获取待处理的合作动作 return getattr(self.action_manager.world_state, 'pending_corporate_actions', {})
[docs] def set_task_verifier(self, task_data: Dict[str, Any]): """ 设置任务验证器 Args: task_data: 任务数据,来自task.json文件 """ verification_config = self.config.get("task_verification", {}) enabled = verification_config.get("enabled", False) mode = verification_config.get("mode", "disabled") # 只要启用了验证且模式不是disabled,就创建验证器 if task_data and enabled and mode != "disabled": from utils.task_verifier import TaskVerifier env_manager = self.action_manager.env_manager self.task_verifier = TaskVerifier(task_data, env_manager, verification_config)
def _should_verify_tasks(self) -> bool: """ 检查是否应该进行任务验证 Returns: bool: 是否应该验证 """ verification_config = self.config.get("task_verification", {}) enabled = verification_config.get("enabled", False) mode = verification_config.get("mode", "disabled") return enabled and mode == "step_by_step" def _perform_task_verification(self) -> Optional[Dict[str, Any]]: """ 执行任务验证 Returns: Optional[Dict[str, Any]]: 验证结果,如果未启用验证则返回None """ if not self.task_verifier: return None try: verification_config = self.config.get("task_verification", {}) return_subtask_status = verification_config.get("return_subtask_status", True) if return_subtask_status: # 获取任务完成情况摘要 summary = self.task_verifier.get_completion_summary() # 获取子任务完成状态列表 subtask_completion_list = self.task_verifier.get_subtask_completion_list() result = { "completion_summary": summary, "subtask_completion_list": subtask_completion_list, "timestamp": self._get_current_timestamp() } return result except Exception as e: import logging logger = logging.getLogger(__name__) logger.error(f"任务验证过程中发生错误: {e}") return {"error": f"Verification failed: {str(e)}"} return None def _get_current_timestamp(self) -> str: """获取当前时间戳""" import datetime return datetime.datetime.now().isoformat() def _handle_done_command(self) -> Tuple[ActionStatus, str, Optional[Dict[str, Any]]]: """ 处理done命令,进行全局任务验证 Returns: Tuple[ActionStatus, str, Optional[Dict]]: (执行状态, 反馈消息, 验证结果) """ verification_config = self.config.get("task_verification", {}) enabled = verification_config.get("enabled", False) mode = verification_config.get("mode", "disabled") if not enabled or mode == "disabled": return ActionStatus.SUCCESS, "Task verification is disabled", None if not self.task_verifier: return ActionStatus.INVALID, "Task verifier not initialized", None try: # 执行全局任务验证 verification_result = self._perform_task_verification() if verification_result: completion_summary = verification_result.get("completion_summary", {}) total_tasks = completion_summary.get("total_tasks", 0) completed_tasks = completion_summary.get("completed_tasks", 0) completion_rate = completion_summary.get("completion_rate", 0.0) message = f"DONE command executed - Global task verification: {completed_tasks}/{total_tasks} tasks completed ({completion_rate:.1%})" result_data = { "global_verification": verification_result, "is_all_completed": completion_rate >= 1.0, "completion_status": self.task_verifier.get_current_completion_status() } return ActionStatus.SUCCESS, message, result_data else: return ActionStatus.INVALID, "Task verification failed", None except Exception as e: import logging logger = logging.getLogger(__name__) logger.error(f"Error occurred while processing done command: {e}") return ActionStatus.INVALID, f"Verification failed: {str(e)}", None
[docs] def get_task_verification_status(self) -> Optional[Dict[str, Any]]: """ 手动获取任务验证状态(用于手动查询) Returns: Optional[Dict[str, Any]]: 验证状态,如果未启用验证则返回None """ if not self.task_verifier: return None return self._perform_task_verification()
[docs] def get_agent_supported_actions_description(self, agent_ids: List[str]) -> str: """ 获取智能体支持的所有动作的字符串描述 Args: agent_ids: 智能体ID列表,支持单个或多个智能体 Returns: str: 包含所有支持动作的描述字符串(英文) """ return self.action_manager.get_agent_supported_actions_description(agent_ids)