"""
任务执行器 - 执行单个任务的详细步骤
"""
import logging
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

from .trajectory_recorder import TrajectoryRecorder
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)
class ActionStatus(Enum):
    """Status labels for a single executed action.

    Every member's value is identical to its name, so ``ActionStatus[name]``
    and ``ActionStatus(name)`` resolve to the same member.
    """

    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    TIMEOUT = "TIMEOUT"
    INVALID = "INVALID"
class TaskExecutor:
    """Task executor: runs the step-by-step loop for a single task.

    The executor repeatedly calls ``agent_adapter.step()``, records every
    executed action through the trajectory recorder, polls the simulator for
    task completion, and finally summarizes the run into a result dict.
    """

    def __init__(self, simulator, agent_adapter, trajectory_recorder: "TrajectoryRecorder"):
        """Initialize the task executor.

        Args:
            simulator: Simulator instance; queried for task-completion status.
            agent_adapter: Agent adapter; must provide ``set_task()`` and ``step()``.
            trajectory_recorder: Recorder receiving action-execution and
                task-completion events.
        """
        self.simulator = simulator
        self.agent_adapter = agent_adapter
        self.trajectory_recorder = trajectory_recorder
        logger.debug("🔧 任务执行器初始化完成")

    def execute_task(self, task: Dict[str, Any], task_index: int,
                     max_steps: int = 50) -> Dict[str, Any]:
        """Execute a single task.

        Args:
            task: Task information; the ``task_description`` and
                ``task_category`` keys are read when present.
            task_index: Task index (1-based).
            max_steps: Maximum number of agent steps to run.

        Returns:
            Dict: Task execution result (see ``_generate_task_result``).
        """
        logger.info(f"🎯 开始执行任务 {task_index}: {task.get('task_description', 'Unknown')}")
        start_time = datetime.now()

        # Hand the task description to the agent.
        task_description = task.get('task_description', '')
        self.agent_adapter.set_task(task_description)

        # Expose the current task index on the agent (used when it records LLM interactions).
        if hasattr(self.agent_adapter, 'agent') and hasattr(self.agent_adapter.agent, '__dict__'):
            self.agent_adapter.agent.current_task_index = task_index

        execution_result = self._execute_step_loop(task, task_index, max_steps)

        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        # Pass max_steps through so "timeout" reflects the real step budget.
        task_result = self._generate_task_result(
            task, task_index, execution_result, start_time, end_time, duration,
            max_steps=max_steps,
        )
        logger.info(f"✅ 任务 {task_index} 执行完成,状态: {task_result['status']}")
        return task_result

    def _execute_step_loop(self, task: Dict[str, Any], task_index: int,
                           max_steps: int) -> Dict[str, Any]:
        """Run the agent step loop for one task (core execution logic).

        Each iteration lets the agent decide and execute one action, records
        it, updates the counters, and checks for DONE / simulator completion.
        The loop ends after ``max_steps`` iterations or as soon as
        ``_should_terminate`` returns True.

        Returns:
            Dict with ``total_steps``, ``successful_steps``, ``failed_steps``,
            ``llm_interactions``, ``done_command_step`` and
            ``actual_completion_step`` (both 0-based loop indices, -1 if never
            reached), plus an ``execution_log`` summary dict.
        """
        successful_steps = 0
        failed_steps = 0
        llm_interactions = 0
        done_command_step = -1
        actual_completion_step = -1
        # Number of iterations actually run. Tracked explicitly because the loop
        # variable is unbound when max_steps <= 0.
        steps_executed = 0
        step_start_time = datetime.now()

        for step in range(max_steps):
            step_number = step + 1
            steps_executed = step_number
            logger.debug(f"🔄 执行步骤 {step + 1}/{max_steps}")
            # Set once this step has been added to successful/failed_steps, so a
            # later bookkeeping error does not count the same step twice.
            step_counted = False
            try:
                # 1. Let the agent decide and execute one action.
                status, message, result = self.agent_adapter.step()
                # 2. Read back the action the agent just executed.
                action = self._get_last_action_from_agent()
                # 3. Count LLM interactions (statistics only; not recorded here).
                if self._get_llm_interaction_info():
                    llm_interactions += 1

                # Normalize the status to its name. This makes enum members from
                # other modules, as well as plain strings, comparable.
                status_str = status.name if hasattr(status, 'name') else str(status)

                # 4. Record the executed action in the trajectory.
                agent_id = self._get_agent_id()
                try:
                    self.trajectory_recorder.record_action_execution(
                        task_index, step_number, action, status_str,
                        message, result or {}, agent_id
                    )
                except Exception as record_error:
                    logger.error(f"记录轨迹失败: {record_error}")

                # 5. Update statistics.
                if status_str == ActionStatus.SUCCESS.name:
                    successful_steps += 1
                else:
                    failed_steps += 1
                step_counted = True

                # 6. Remember the first step at which the agent emitted DONE.
                if "DONE" in action.upper() and done_command_step == -1:
                    done_command_step = step

                # 7. Remember the first step at which the simulator reports completion.
                completion_status = self._check_task_completion(task, step)
                if completion_status and actual_completion_step == -1:
                    actual_completion_step = step
                    self._record_task_completion(task_index, step)

                # 8. Stop when the termination rule fires.
                if self._should_terminate(action, completion_status):
                    logger.debug(f"🏁 任务在步骤 {step + 1} 结束")
                    break
            except Exception as e:
                logger.error(f"❌ 步骤 {step + 1} 执行失败: {e}")
                if not step_counted:
                    failed_steps += 1
                    # The action itself was not recorded; record an ERROR entry instead.
                    self._record_error_step(task_index, step_number, e)

        command_success_rate = (
            successful_steps / steps_executed if steps_executed > 0 else 0.0
        )
        execution_log = {
            'task_index': task_index,
            'task_description': task.get('task_description', ''),
            'task_category': task.get('task_category', ''),
            'agent_type': 'single',
            'start_time': step_start_time.isoformat(),
            'end_time': datetime.now().isoformat(),
            'total_steps': steps_executed,
            'successful_steps': successful_steps,
            'failed_steps': failed_steps,
            'llm_interactions': llm_interactions,
            'done_command_step': done_command_step,
            'actual_completion_step': actual_completion_step,
            'command_success_rate': command_success_rate,
        }
        return {
            'total_steps': steps_executed,
            'successful_steps': successful_steps,
            'failed_steps': failed_steps,
            'llm_interactions': llm_interactions,
            'done_command_step': done_command_step,
            'actual_completion_step': actual_completion_step,
            'execution_log': execution_log,
        }

    def _record_error_step(self, task_index: int, step_number: int, error: Exception) -> None:
        """Record a step that raised as an ``ERROR`` action; never raises.

        Uses the same 1-based step numbering and string status format as the
        normal action records.
        """
        agent_id = self._get_agent_id()
        try:
            self.trajectory_recorder.record_action_execution(
                task_index, step_number, "ERROR", ActionStatus.FAILED.name,
                str(error), {}, agent_id
            )
        except Exception as record_error:
            logger.error(f"记录轨迹失败: {record_error}")

    def _get_llm_interaction_info(self) -> Optional[Dict[str, Any]]:
        """Return the adapter's latest LLM interaction info, for statistics only.

        Returns:
            The truthy value returned by ``agent_adapter.get_llm_interaction_info()``
            if the adapter provides that method, otherwise None. It also returns
            None on error, after logging a warning.
        """
        try:
            if hasattr(self.agent_adapter, 'get_llm_interaction_info'):
                llm_info = self.agent_adapter.get_llm_interaction_info()
                if llm_info:
                    return llm_info
        except Exception as e:
            logger.warning(f"获取LLM交互信息失败: {e}")
        return None

    def _get_agent_id(self) -> str:
        """Return the agent id, or ``'unknown'`` if it cannot be determined.

        Checks ``agent_adapter.agent.agent_id`` (single-agent mode) and then
        ``agent_adapter.primary_agent.agent_id`` (multi-agent mode).
        """
        try:
            if hasattr(self.agent_adapter, 'agent') and hasattr(self.agent_adapter.agent, 'agent_id'):
                return self.agent_adapter.agent.agent_id
            elif hasattr(self.agent_adapter, 'primary_agent') and hasattr(self.agent_adapter.primary_agent, 'agent_id'):
                return self.agent_adapter.primary_agent.agent_id
            else:
                return 'unknown'
        except Exception:
            return 'unknown'

    def _get_last_action_from_agent(self) -> str:
        """Return the action the agent executed last, always as a string.

        The result is ``agent.history[-1]['action']`` when available, otherwise
        ``agent.current_action``. For a centralized ``COORDINATE`` entry the
        per-agent commands are extracted from ``coordination_details``: a
        shared ``CORP_*`` command is returned as-is, and anything else becomes
        ``"agent_1:<a1>, agent_2:<a2>"``.

        ``None`` (or no information at all) yields ``"UNKNOWN"``. Other
        non-string values are converted with ``str()``, so callers can safely
        use string methods on the result.
        """
        try:
            if hasattr(self.agent_adapter, 'agent'):
                agent = self.agent_adapter.agent
                if hasattr(agent, 'history') and agent.history:
                    last_entry = agent.history[-1]
                    if isinstance(last_entry, dict) and 'action' in last_entry:
                        action = last_entry['action']
                        # Centralized agents log COORDINATE; recover the real commands.
                        if action == 'COORDINATE' and 'coordination_details' in last_entry:
                            coordination_details = last_entry['coordination_details']
                            agent_1_action = coordination_details.get('agent_1', {}).get('action', 'UNKNOWN')
                            agent_2_action = coordination_details.get('agent_2', {}).get('action', 'UNKNOWN')
                            # Both agents issued the same cooperative command.
                            if agent_1_action == agent_2_action and agent_1_action.startswith('CORP_'):
                                return agent_1_action
                            elif agent_1_action != 'UNKNOWN' or agent_2_action != 'UNKNOWN':
                                return f"agent_1:{agent_1_action}, agent_2:{agent_2_action}"
                        return "UNKNOWN" if action is None else str(action)
                # No usable history entry: fall back to the current action.
                if hasattr(agent, 'current_action'):
                    current = agent.current_action
                    return "UNKNOWN" if current is None else str(current)
            return "UNKNOWN"
        except Exception as e:
            logger.warning(f"获取最后动作失败: {e}")
            return "UNKNOWN"

    def _record_task_completion(self, task_index: int, step: int):
        """Forward a task-completion event to the trajectory recorder.

        Errors are logged, never raised.
        """
        try:
            self.trajectory_recorder.record_task_completion(task_index, step)
        except Exception as e:
            logger.error(f"记录任务完成失败: {e}")

    def _check_task_completion(self, task: Dict[str, Any], step: int) -> Optional[Dict[str, Any]]:
        """Ask the simulator whether the task has been completed.

        Exactly one source is consulted, in priority order:

        1. ``simulator.task_verifier.get_current_completion_status()``, read
           via ``['completed_tasks']``;
        2. ``simulator.action_handler.get_task_verification_status()``, read
           via ``['completion_summary']['completed_tasks']``;
        3. ``simulator.get_task_verification_status()``, read via
           ``['summary']['completed_tasks']``.

        Returns:
            A dict with ``completed``, ``step``, ``validation_result`` and
            ``timestamp`` when the consulted source reports at least one
            completed task. Returns None otherwise, or on error after logging a
            warning.
        """
        try:
            simulator = self.simulator
            if hasattr(simulator, 'task_verifier') and simulator.task_verifier:
                status = simulator.task_verifier.get_current_completion_status()
                completed = bool(status) and status.get('completed_tasks', 0) > 0
            elif hasattr(simulator, 'action_handler') and simulator.action_handler:
                status = simulator.action_handler.get_task_verification_status()
                completed = bool(status) and status.get('completion_summary', {}).get('completed_tasks', 0) > 0
            elif hasattr(simulator, 'get_task_verification_status'):
                status = simulator.get_task_verification_status()
                completed = bool(status) and status.get('summary', {}).get('completed_tasks', 0) > 0
            else:
                return None

            if completed:
                return {
                    'completed': True,
                    'step': step,
                    'validation_result': status,
                    'timestamp': datetime.now().isoformat()
                }
        except Exception as e:
            logger.warning(f"检查任务完成状态失败: {e}")
        return None

    def _should_terminate(self, action: str, completion_status: Optional[Dict[str, Any]]) -> bool:
        """Decide whether the step loop should stop.

        In centralized mode, when the agent's ``last_llm_interaction`` carries
        an ``extracted_action``, stop only if it contains both
        ``agent_1=DONE`` and ``agent_2=DONE``; a single DONE keeps the loop
        running. In every other case, stop when ``action`` contains ``DONE``
        (case-insensitive).

        ``completion_status`` is accepted for compatibility only. Simulator
        completion alone never terminates the loop; the agent must issue DONE.
        """
        agent = getattr(self.agent_adapter, 'agent', None)
        if hasattr(self.agent_adapter, 'agent') and hasattr(agent, 'mode'):
            if agent.mode == "centralized" and hasattr(agent, 'last_llm_interaction'):
                last_interaction = agent.last_llm_interaction
                if last_interaction and 'extracted_action' in last_interaction:
                    extracted_action = last_interaction['extracted_action']
                    if 'agent_1=DONE' in extracted_action and 'agent_2=DONE' in extracted_action:
                        logger.info("🏁 中心化模式:两个智能体都输出DONE,任务终止")
                        return True
                    # Only one agent said DONE: keep going.
                    return False

        # Single-agent rule: any DONE terminates.
        return "DONE" in action.upper()

    def _calculate_evaluation_metrics(self, actually_completed: bool, model_claimed_done: bool) -> Dict[str, bool]:
        """Classify the run into one of four confusion-matrix cells.

        Args:
            actually_completed: Completion as verified by the simulator.
            model_claimed_done: Whether the model emitted DONE.

        Returns:
            Dict with ``true_positive``, ``false_positive``, ``true_negative``
            and ``false_negative``; exactly one of them is True.
        """
        return {
            # Model said done, and the simulator confirms it.
            'true_positive': model_claimed_done and actually_completed,
            # Model said done, but the simulator says the task is not complete.
            'false_positive': model_claimed_done and not actually_completed,
            # Model did not say done, and the task is not complete.
            'true_negative': not model_claimed_done and not actually_completed,
            # Model did not say done, yet the simulator says the task is complete.
            'false_negative': not model_claimed_done and actually_completed
        }

    def _generate_task_result(self, task: Dict[str, Any], task_index: int,
                              execution_result: Dict[str, Any],
                              start_time: datetime, end_time: datetime,
                              duration: float, max_steps: int = 50) -> Dict[str, Any]:
        """Build the final task-result dict.

        Args:
            task: Task information (description/category are copied).
            task_index: Task index (1-based).
            execution_result: Dict returned by ``_execute_step_loop``.
            start_time: Task start time.
            end_time: Task end time.
            duration: Elapsed seconds.
            max_steps: Step budget the loop ran with. Used to tell ``timeout``
                apart from ``failed``; defaults to 50 for existing callers.

        Returns:
            Dict whose ``status`` is one of:
            - ``completed``: the simulator verified completion;
            - ``timeout``: not completed and the full step budget was used;
            - ``failed``: neither of the above.
        """
        total_steps = execution_result['total_steps']
        successful_steps = execution_result['successful_steps']
        failed_steps = execution_result['failed_steps']

        command_success_rate = successful_steps / total_steps if total_steps > 0 else 0.0

        actual_completion_step = execution_result['actual_completion_step']
        done_command_step = execution_result['done_command_step']
        # The simulator's verdict is authoritative for actual completion.
        actually_completed = actual_completion_step != -1
        # The model "claims" completion by emitting a DONE command.
        model_claimed_done = done_command_step != -1

        evaluation_result = self._calculate_evaluation_metrics(actually_completed, model_claimed_done)

        if actually_completed:
            status = 'completed'
        elif total_steps >= max_steps:
            status = 'timeout'
        else:
            status = 'failed'

        return {
            'task_index': task_index,
            'task_description': task.get('task_description', ''),
            'task_category': task.get('task_category', 'unknown'),
            'status': status,
            'task_executed': True,
            'subtask_completed': actually_completed,
            'model_claimed_done': model_claimed_done,
            'actual_completion_step': actual_completion_step,
            'done_command_step': done_command_step,
            'total_steps': total_steps,
            'successful_steps': successful_steps,
            'failed_steps': failed_steps,
            'command_success_rate': command_success_rate,
            'evaluation_result': evaluation_result,
            'start_time': start_time.isoformat(),
            'end_time': end_time.isoformat(),
            'duration_seconds': duration,
            'llm_interactions': execution_result['llm_interactions']
        }