Source code for OmniSimulator.agent.agent_manager

from typing import Dict, List, Optional, Any
import uuid
import random

from ..core.state import WorldState
from .agent import Agent

[docs] class AgentManager: """智能体管理器 - 负责管理所有智能体"""
[docs] def __init__(self, world_state: WorldState, config: Optional[Dict[str, Any]] = None): """ 初始化智能体管理器 Args: world_state: 世界状态对象 config: 全局配置字典,可选 """ self.world_state = world_state self.config = config or {} self.agents: Dict[str, Agent] = {} # agent_id -> Agent实例 # 如果配置中明确指定了agent_count,则自动生成agent if 'agent_count' in self.config and self.config.get('agent_count', 0) > 0: self._init_agents_from_config()
def _init_agents_from_config(self): agent_count = self.config.get('agent_count', 0) agent_types = self.config.get('agent_types', []) agent_init_mode = self.config.get('agent_init_mode', 'default') # 如果agent_count为0,则不创建任何智能体 if agent_count <= 0: return # 获取所有房间ID room_ids = list(self.world_state.graph.room_ids) if not room_ids: return # default模式:全部放第一个房间(排序保证一致性) if agent_init_mode == 'default': init_room = sorted(room_ids)[0] for i in range(agent_count): agent_type = agent_types[i % len(agent_types)] if agent_types else {} agent_id = f"agent_{i+1}" agent_name = f"Agent_{i+1}" agent_data = { "id": agent_id, "name": agent_name, "location_id": init_room, **agent_type } self.add_agent(agent_data) # random模式:随机分配房间 elif agent_init_mode == 'random': for i in range(agent_count): agent_type = agent_types[i % len(agent_types)] if agent_types else {} agent_id = f"agent_{i+1}" agent_name = f"Agent_{i+1}" room_id = random.choice(room_ids) agent_data = { "id": agent_id, "name": agent_name, "location_id": room_id, **agent_type } self.add_agent(agent_data)
[docs] def add_agent(self, agent_data: Dict[str, Any]) -> Optional[str]: """ 添加智能体 Args: agent_data: 智能体数据字典 Returns: str: 添加成功则返回智能体ID,否则返回None """ try: # 如果没有指定ID,则生成一个 if 'id' not in agent_data: agent_data['id'] = str(uuid.uuid4()) # 创建智能体对象 # Agent.from_dict会处理max_grasp_limit的默认值以及properties的加载 agent = Agent.from_dict(agent_data) # 检查位置是否存在 if not self.world_state.graph.get_node(agent.location_id): print(f"位置不存在: {agent.location_id}") return None # 添加智能体到世界状态 (字典表示) self.world_state.add_agent(agent.id, agent.to_dict()) # 同时将智能体添加到图节点中,类型设为"agent" (字典表示) agent_dict = agent.to_dict() agent_dict['type'] = 'AGENT' # 确保智能体节点有明确的类型 self.world_state.graph.add_node(agent.id, agent_dict) # 建立智能体与位置的关系 self.world_state.graph.add_edge(agent.location_id, agent.id, {"type": "in"}) # 存储智能体实例 self.agents[agent.id] = agent # 初始化near_objects为空 agent.near_objects = set() return agent.id except Exception as e: print(f"添加智能体时出错: {e}") return None
[docs] def get_agent(self, agent_id: str) -> Optional[Agent]: """ 获取智能体实例 Args: agent_id: 智能体ID Returns: Agent: 智能体实例,如果不存在则返回None """ return self.agents.get(agent_id)
[docs] def update_agent(self, agent_id: str, update_data: Dict[str, Any]) -> bool: """ 更新智能体数据 Args: agent_id: 智能体ID update_data: 更新数据字典 Returns: bool: 更新是否成功 """ agent = self.get_agent(agent_id) if not agent: return False # 更新智能体实例 for key, value in update_data.items(): if hasattr(agent, key): # 对于特殊字段,需要确保正确的数据类型 if key == 'near_objects': # 确保near_objects是集合 setattr(agent, key, set(value) if isinstance(value, (list, tuple)) else value) elif key == 'abilities': # 确保abilities是集合 setattr(agent, key, set(value) if isinstance(value, (list, tuple)) else value) elif key == 'ability_sources': # 确保ability_sources的值是集合 if isinstance(value, dict): converted_sources = {} for ability, sources in value.items(): converted_sources[ability] = set(sources) if isinstance(sources, (list, tuple)) else sources setattr(agent, key, converted_sources) else: setattr(agent, key, value) else: setattr(agent, key, value) # 更新世界状态中的智能体数据 self.world_state.update_agent(agent_id, agent.to_dict()) return True
[docs] def move_agent(self, agent_id: str, new_location_id: str) -> bool: """ 移动智能体到新位置 Args: agent_id: 智能体ID new_location_id: 新位置ID Returns: bool: 移动是否成功 """ agent = self.get_agent(agent_id) if not agent: return False # 只允许移动到房间 if new_location_id not in self.world_state.graph.room_ids: return False old_location_id = agent.location_id agent.move_to(new_location_id) self.world_state.update_agent(agent_id, agent.to_dict()) if old_location_id: self.world_state.graph.remove_edge(old_location_id, agent_id) self.world_state.graph.add_edge(new_location_id, agent_id, {"type": "in"}) agent.update_near_objects() return True
[docs] def add_to_inventory(self, agent_id: str, object_id: str) -> bool: """ 将物体添加到智能体库存 Args: agent_id: 智能体ID object_id: 物体ID Returns: bool: 添加是否成功 """ agent = self.get_agent(agent_id) if not agent: return False # 尝试抓取物体 success = agent.grab_object(object_id) if success: # 更新世界状态中的智能体数据 self.world_state.update_agent(agent_id, agent.to_dict()) return success
[docs] def remove_from_inventory(self, agent_id: str, object_id: str) -> bool: """ 从智能体库存移除物体 Args: agent_id: 智能体ID object_id: 物体ID Returns: bool: 移除是否成功 """ agent = self.get_agent(agent_id) if not agent: return False # 尝试放下物体 success = agent.drop_object(object_id) if success: # 更新世界状态中的智能体数据 self.world_state.update_agent(agent_id, agent.to_dict()) return success
[docs] def get_agent_inventory(self, agent_id: str) -> List[str]: """ 获取智能体库存 Args: agent_id: 智能体ID Returns: List[str]: 物体ID列表 """ agent = self.get_agent(agent_id) if not agent: return [] return agent.inventory.copy()
[docs] def get_all_agents(self) -> Dict[str, Agent]: """ 获取所有智能体 Returns: Dict[str, Agent]: 智能体ID到智能体实例的映射 """ return self.agents.copy()