from typing import Dict, List, Set, Tuple, Optional, Any
import queue
class EnvironmentGraph:
"""环境图类 - 用于表示环境中的实体(房间、物体)及其关系"""
def __init__(self):
"""初始化环境图"""
# 节点存储 - 房间和物体都是节点
self.nodes = {} # node_id -> node_data
# 边存储 - 从节点到节点的关系
self.edges = {} # from_id -> {to_id -> [relation1, relation2, ...]}
# 房间ID集合
self.room_ids = set()
def add_node(self, node_id: str, node_data: Dict, is_room: bool = False) -> None:
"""添加节点到图"""
self.nodes[node_id] = node_data
if is_room:
self.room_ids.add(node_id)
# 初始化边字典
if node_id not in self.edges:
self.edges[node_id] = {}
def add_edge(self, from_id: str, to_id: str, relation: Dict) -> None:
"""添加边到图"""
if from_id not in self.edges:
self.edges[from_id] = {}
if to_id not in self.edges[from_id]:
self.edges[from_id][to_id] = []
self.edges[from_id][to_id].append(relation)
def remove_edge(self, from_id: str, to_id: str, relation_type: Optional[str] = None) -> None:
"""从图中移除边"""
if from_id not in self.edges or to_id not in self.edges[from_id]:
return
if relation_type is None:
# 移除所有关系
del self.edges[from_id][to_id]
else:
# 移除指定类型的关系
self.edges[from_id][to_id] = [
rel for rel in self.edges[from_id][to_id]
if rel.get('type') != relation_type
]
if not self.edges[from_id][to_id]:
del self.edges[from_id][to_id]
def get_node(self, node_id: str) -> Optional[Dict]:
"""获取节点数据"""
return self.nodes.get(node_id)
def get_edges(self, from_id: str, to_id: Optional[str] = None) -> List[Dict]:
"""获取边关系"""
if from_id not in self.edges:
return []
if to_id is not None:
return self.edges[from_id].get(to_id, [])
# 获取从from_id出发的所有边
all_edges = []
for target_id, relations in self.edges[from_id].items():
for relation in relations:
edge_info = relation.copy()
edge_info['from_id'] = from_id
edge_info['to_id'] = target_id
all_edges.append(edge_info)
return all_edges
def get_incoming_edges(self, to_id: str) -> Dict[str, Dict]:
"""获取指向to_id的所有边"""
incoming = {}
for from_id, targets in self.edges.items():
if to_id in targets:
# 简化:只取第一个关系
incoming[from_id] = targets[to_id][0]
return incoming
def get_room_for_object(self, object_id: str) -> Optional[str]:
"""获取物体所在的房间ID"""
if object_id in self.room_ids:
return object_id # 如果是房间,返回自己
# 使用辅助函数递归查找
visited = set()
return self._find_container_room(object_id, visited)
def _find_container_room(self, object_id: str, visited: Set[str]) -> Optional[str]:
"""递归查找物体所在的房间"""
if object_id in visited:
return None # 防止循环引用
visited.add(object_id)
# 查找直接包含该物体的容器
containers = self.get_incoming_edges(object_id)
for container_id, edge_data in containers.items():
# 如果容器是房间,直接返回
if container_id in self.room_ids:
return container_id
# 否则递归查找容器的房间
room_id = self._find_container_room(container_id, visited)
if room_id:
return room_id
return None
def get_objects_in_room(self, room_id: str, recursive: bool = True) -> List[str]:
"""获取房间中的所有物体ID"""
if room_id not in self.room_ids:
return []
# 获取直接在房间中的物体
direct_objects = list(self.edges.get(room_id, {}).keys())
if not recursive:
return direct_objects
# 递归获取容器中的物体
all_objects = direct_objects.copy()
visited = set()
def collect_contained_objects(container_id: str, visited: Set[str] = None) -> List[str]:
"""递归收集容器中的所有物体"""
if visited is None:
visited = set()
if container_id in visited:
return [] # 防止循环引用
visited.add(container_id)
contained_objects = []
for object_id in self.edges.get(container_id, {}).keys():
if object_id not in self.room_ids and object_id not in visited:
contained_objects.append(object_id)
# 递归获取这个物体中的物体(如果它是容器)
contained_objects.extend(collect_contained_objects(object_id, visited))
return contained_objects
# 对每个直接物体,递归获取其中的物体
for obj_id in direct_objects:
if obj_id not in self.room_ids: # 跳过房间
all_objects.extend(collect_contained_objects(obj_id, visited))
return all_objects
def find_path(self, start_room_id: str, end_room_id: str) -> Optional[List[str]]:
"""
查找从起始房间到目标房间的路径
Args:
start_room_id: 起始房间ID
end_room_id: 目标房间ID
Returns:
List[str]: 房间ID路径,如果不存在则返回None
"""
if start_room_id not in self.room_ids or end_room_id not in self.room_ids:
return None
if start_room_id == end_room_id:
return [start_room_id]
# 使用BFS查找最短路径
visited = {start_room_id}
q = queue.Queue()
q.put((start_room_id, [start_room_id]))
while not q.empty():
current_id, path = q.get()
# 获取相连的房间
for next_room_id in self.edges.get(current_id, {}):
if next_room_id in self.room_ids and next_room_id not in visited:
new_path = path + [next_room_id]
if next_room_id == end_room_id:
return new_path
visited.add(next_room_id)
q.put((next_room_id, new_path))
return None
def describe_agent_natural_language(self, agent_id: str, agent: Dict) -> str:
"""
用自然语言详细描述智能体状态
Args:
agent_id: 智能体ID
agent: 智能体数据字典
Returns:
str: 智能体描述文本
"""
# 获取基本信息,确保所有属性存在默认值
name = agent.get('name', agent_id)
location_id = agent.get('location_id', 'Unknown Location')
# 获取位置名称
location_node = self.get_node(location_id)
location_name = location_node.get('name', location_id) if location_node else location_id
# 基本信息部分
lines = [f"▶ Agent: {name} (ID: {agent_id})"]
lines.append(f" • Location: {location_name} (ID: {location_id})")
# 物理属性部分
properties = agent.get('properties', {})
max_grasp = agent.get('max_grasp_limit', 'Unknown')
inventory = agent.get('inventory', [])
inventory_count = len(inventory)
current_weight = agent.get('current_weight', 0)
max_weight = properties.get('max_weight', 'Unknown')
lines.append(" • Physical Properties:")
lines.append(f" - Load: {current_weight}/{max_weight}kg")
lines.append(f" - Grasp Capacity: {inventory_count}/{max_grasp} items")
# 其他关键属性
for key, value in properties.items():
if key not in ['max_weight', 'max_length', 'max_width', 'max_height']:
lines.append(f" - {key}: {value}")
# 能力信息部分
abilities = agent.get('abilities', set())
if not isinstance(abilities, (set, list)):
abilities = set()
ability_sources = agent.get('ability_sources', {})
if abilities:
lines.append(" • Abilities:")
for ability in sorted(abilities):
sources = ability_sources.get(ability, set())
if not isinstance(sources, (set, list)):
sources = set()
source_names = []
for source_id in sources:
source_obj = self.get_node(source_id)
source_name = source_obj.get('name', source_id) if source_obj else source_id
source_names.append(f"{source_name} (ID: {source_id})")
source_str = f", Source: {', '.join(source_names)}" if source_names else ""
lines.append(f" - {ability}{source_str}")
else:
lines.append(" • Abilities: None")
# 库存物品部分
if inventory:
lines.append(" • Inventory:")
for obj_id in inventory:
if obj_id.startswith('corp:'):
# 合作持有的物体
real_obj_id = obj_id.split(':', 1)[1]
obj = self.get_node(real_obj_id)
if obj:
obj_name = obj.get('name', real_obj_id)
obj_type = obj.get('type', 'Unknown Type')
holders = obj.get('holders', [])
holder_ids = ', '.join(holders)
lines.append(f" - {obj_name} (ID: {real_obj_id}, Type: {obj_type})")
lines.append(f" Cooperative Holders: {holder_ids}")
else:
# 普通物品
obj = self.get_node(obj_id)
if obj:
obj_name = obj.get('name', obj_id)
obj_type = obj.get('type', 'Unknown Type')
properties = obj.get('properties', {})
lines.append(f" - {obj_name} (ID: {obj_id}, Type: {obj_type})")
# 物品能力
if 'provides_abilities' in properties:
abilities = properties['provides_abilities']
if isinstance(abilities, list) and abilities:
lines.append(f" Provides Abilities: {', '.join(abilities)}")
elif isinstance(abilities, str):
lines.append(f" Provides Abilities: {abilities}")
# 物品其他属性
for key, value in properties.items():
if key != 'provides_abilities':
lines.append(f" {key}: {value}")
else:
lines.append(" • Inventory: None")
# 近邻物体部分
# near_objects = agent.get('near_objects', set())
# if not isinstance(near_objects, (set, list)):
# near_objects = set()
# if near_objects:
# lines.append(" • 近邻物体(位置接近的物体):")
# for obj_id in sorted(near_objects):
# obj = self.get_node(obj_id)
# if obj:
# obj_name = obj.get('name', obj_id)
# obj_type = obj.get('type', '未知类型')
# lines.append(f" - {obj_name} (ID: {obj_id}, 类型: {obj_type})")
# else:
# lines.append(" • 近邻物体(位置接近的物体):无")
# 合作模式部分
corporate_mode = agent.get('corporate_mode_object_id')
if corporate_mode:
obj = self.get_node(corporate_mode)
if obj:
obj_name = obj.get('name', corporate_mode)
lines.append(f" • Cooperative Mode: Currently cooperatively holding {obj_name} (ID: {corporate_mode})")
else:
lines.append(f" • Cooperative Mode: Currently cooperatively holding object (ID: {corporate_mode})")
return '\n'.join(lines)
def describe_room_natural_language(self, room_id: str, agents: Optional[Dict[str, Dict]] = None, sim_config: Optional[Dict[str, Any]] = None) -> str:
"""
用自然语言描述房间内容
Args:
room_id: 房间ID
agents: 智能体字典 {agent_id -> agent_data}
sim_config: 模拟器配置
Returns:
str: 房间描述文本
"""
if sim_config is None:
sim_config = getattr(self, 'sim_config', {}) or {}
show_properties = sim_config.get('nlp_show_object_properties', True)
only_discovered = sim_config.get('nlp_only_show_discovered', True)
if room_id not in self.room_ids:
return f"Room {room_id} does not exist."
room_node = self.get_node(room_id)
room_name = room_node.get('name', room_id) if room_node else room_id
# 获取所有物体(递归)
all_objects = self.get_objects_in_room(room_id, recursive=True)
if only_discovered:
all_objects = [oid for oid in all_objects if (self.get_node(oid) or {}).get('is_discovered', False)]
# 获取房间内所有agent
agent_lines = []
agent_ids_in_room = set()
if agents:
for agent_id, agent in agents.items():
if agent.get('location_id') == room_id:
agent_ids_in_room.add(agent_id)
# 简要描述智能体
name = agent.get('name', agent_id)
inventory = agent.get('inventory', [])
inventory_count = len(inventory)
abilities = agent.get('abilities', set())
if not isinstance(abilities, (set, list)):
abilities = set()
abilities_count = len(abilities)
agent_lines.append(f"- Agent: {name} (ID: {agent_id})")
agent_lines.append(f" Items held: {inventory_count}, Abilities: {abilities_count}")
# 构建物体的空间关系映射(obj_id -> (relation, container_id))
spatial_rel = {}
for from_id, to_dict in self.edges.items():
for to_id, rels in to_dict.items():
for rel in rels:
if rel.get('type') in ('in', 'on'):
spatial_rel[to_id] = (rel.get('type'), from_id)
# 递归输出物体(不包括agent和被agent持有的物体,也不包括被合作持有的物体)
def output_objects(parent_id, indent=0, visited=None):
if visited is None:
visited = set()
lines = []
# 只找直接属于parent_id的物体,且不是AGENT也不是ROOM,且(如需)is_discovered为True
children = [
oid for oid in self.edges.get(parent_id, {})
if (self.get_node(oid) or {}).get('type', None) not in ('AGENT', 'ROOM')
and (not only_discovered or (self.get_node(oid) or {}).get('is_discovered', False))
]
# 排除被agent持有的物体
inventory_objs = set()
if agents:
for agent in agents.values():
inventory_objs.update(agent.get('inventory', []))
# 排除被合作持有的物体
children = [oid for oid in children if oid not in inventory_objs and not ((self.get_node(oid) or {}).get('holders', []) )]
for oid in children:
if oid in visited:
continue # 防止递归死循环
visited.add(oid)
obj = self.get_node(oid)
if not obj:
continue
obj_name = obj.get('name', oid)
obj_type = obj.get('type', 'Unknown Type')
# 基本信息
prefix = ' ' * indent
lines.append(f"{prefix}◆ {obj_name} (ID: {oid}, Type: {obj_type})")
# 空间关系
if oid in spatial_rel:
rel, container_id = spatial_rel[oid]
container = self.get_node(container_id)
container_name = container.get('name', container_id) if container else container_id
rel_text = "inside" if rel == "in" else "on top of" if rel == "on" else "at"
lines.append(f"{prefix} • Location: {rel_text} {container_name} (ID: {container_id})")
# 状态信息
states = obj.get('states', {})
if states:
lines.append(f"{prefix} • States:")
for k, v in states.items():
lines.append(f"{prefix} - {k}: {v}")
# 属性信息
if show_properties:
props = obj.get('properties', {})
if props:
lines.append(f"{prefix} • Properties:")
# 物理属性(尺寸和重量)
size = props.get('size')
if size is not None and isinstance(size, (list, tuple)) and len(size) == 3:
length, width, height = size # [长,宽,高]
lines.append(f"{prefix} - Size: {length}m x {width}m x {height}m (L x W x H)")
else:
if 'length' in props:
lines.append(f"{prefix} - Length: {props['length']}m")
if 'width' in props:
lines.append(f"{prefix} - Width: {props['width']}m")
if 'height' in props:
lines.append(f"{prefix} - Height: {props['height']}m")
if 'weight' in props:
lines.append(f"{prefix} - Weight: {props['weight']}kg")
# 物体提供的能力
if 'provides_abilities' in props:
abilities = props['provides_abilities']
if isinstance(abilities, list) and abilities:
lines.append(f"{prefix} - Provides Abilities: {', '.join(abilities)}")
elif isinstance(abilities, str):
lines.append(f"{prefix} - Provides Abilities: {abilities}")
# 其他所有属性
for k, v in props.items():
if k not in ['size', 'length', 'width', 'height', 'weight', 'provides_abilities']:
lines.append(f"{prefix} - {k}: {v}")
# 递归输出子物体
sub_lines = output_objects(oid, indent+1, visited)
if sub_lines:
lines.append(f"{prefix} • Contains Objects:")
lines.extend(sub_lines)
return lines
# 构建完整描述
desc_lines = [f"▶ Room: {room_name} (ID: {room_id})"]
# 房间属性
room_props = room_node.get('properties', {})
if room_props:
desc_lines.append(" • Room Properties:")
for k, v in room_props.items():
desc_lines.append(f" - {k}: {v}")
# 房间连接
connected_rooms = []
for to_id, rels in self.edges.get(room_id, {}).items():
for rel in rels:
if rel.get('type') == 'connected_to' and to_id in self.room_ids:
room = self.get_node(to_id)
room_name = room.get('name', to_id) if room else to_id
connected_rooms.append(f"{room_name} (ID: {to_id})")
if connected_rooms:
desc_lines.append(" • Connected Rooms: " + ", ".join(connected_rooms))
# 智能体
if agent_lines:
desc_lines.append(" • Agents:")
desc_lines.extend([' ' + line for line in agent_lines])
# 物体
obj_lines = output_objects(room_id)
if obj_lines:
desc_lines.append(" • Objects:")
desc_lines.extend([" " + line for line in obj_lines])
else:
desc_lines.append(" • Objects: Not yet discovered")
# 被合作持有的物体
cooperative_objs = []
for obj_id, obj in self.nodes.items():
holders = obj.get('holders', [])
if holders and len(holders) > 1:
# 检查所有holder都在本房间
if agents and all(agents.get(aid, {}).get('location_id') == room_id for aid in holders if aid in agents):
obj_name = obj.get('name', obj_id)
obj_type = obj.get('type', 'Unknown Type')
holder_names = []
for hid in holders:
if hid in agents:
holder_names.append(f"{agents[hid]['name']} (ID: {hid})")
else:
holder_names.append(hid)
cooperative_objs.append(f"- {obj_name} (ID: {obj_id}, Type: {obj_type})")
cooperative_objs.append(f" Holders: {', '.join(holder_names)}")
if cooperative_objs:
desc_lines.append(" • Cooperatively Held Objects:")
desc_lines.extend([' ' + line for line in cooperative_objs])
return '\n'.join(desc_lines)
def describe_environment_natural_language(self, agents: Optional[Dict[str, Dict]] = None, sim_config: Optional[Dict[str, Any]] = None) -> str:
"""
用自然语言描述整个环境(所有房间及其内容)和所有智能体状态。
Args:
agents: 智能体字典 {agent_id -> agent_data}
sim_config: 模拟器配置
- nlp_show_object_properties: 是否输出家具和物品的详细属性
- nlp_only_show_discovered: 只描述已发现内容
Returns:
str: 环境和智能体描述文本
"""
if sim_config is None:
sim_config = getattr(self, 'sim_config', {}) or {}
sections = []
# 环境概述
sections.append("================ Environment Overview ================")
room_count = len(self.room_ids)
object_count = sum(1 for node in self.nodes.values() if node.get('type') not in ['ROOM', 'AGENT'])
agent_count = len(agents) if agents else 0
sections.append(f"Total: {room_count} rooms, {object_count} objects, {agent_count} agents")
sections.append("")
# 房间描述
sections.append("================ Room Details ================")
for room_id in self.room_ids:
room_desc = self.describe_room_natural_language(room_id, agents, sim_config=sim_config)
sections.append(room_desc)
sections.append("") # 空行分隔不同房间
# 智能体详细描述
if agents:
sections.append("================ Agent Details ================")
for agent_id, agent in agents.items():
agent_desc = self.describe_agent_natural_language(agent_id, agent)
sections.append(agent_desc)
sections.append("") # 空行分隔不同智能体
return '\n'.join(sections)
[docs]
class WorldState:
"""世界状态类 - 维护整个模拟世界的状态"""
[docs]
def __init__(self):
self.graph = EnvironmentGraph()
self.agents = {} # agent_id -> agent_data
[docs]
def add_agent(self, agent_id: str, agent_data: Dict) -> None:
"""添加智能体到世界状态"""
self.agents[agent_id] = agent_data
[docs]
def update_agent(self, agent_id: str, update_data: Dict) -> None:
"""更新智能体状态"""
if agent_id in self.agents:
self.agents[agent_id].update(update_data)
[docs]
def get_agent(self, agent_id: str) -> Optional[Dict]:
"""获取智能体数据"""
return self.agents.get(agent_id)
[docs]
def get_object(self, object_id: str) -> Optional[Dict]:
"""获取物体数据"""
return self.graph.get_node(object_id)
[docs]
def is_object_accessible_to_agent(self, object_id: str, agent_id: str) -> Tuple[bool, str]:
"""检查物体是否对智能体可访问"""
agent = self.get_agent(agent_id)
if not agent:
return False, f"Agent {agent_id} does not exist"
obj = self.get_object(object_id)
if not obj:
return False, f"Object {object_id} does not exist"
# 注释掉检查同一房间的代码,由上层功能执行
# 检查智能体和物体是否在同一房间
# agent_room = agent.get("location_id")
# obj_room = self.graph.get_room_for_object(object_id)
#
# if agent_room != obj_room:
# return False, f"智能体 {agent_id} 不在物体 {object_id} 所在的房间"
# 检查物体是否在封闭容器中
# 只有 "in" 关系的物品需要检查容器是否打开
# "on" 关系的物品可以直接拿取,不需要打开容器
# 查找物体的直接容器和关系类型
for potential_container_id, edges_dict in self.graph.edges.items():
if object_id in edges_dict:
relations_list = edges_dict[object_id]
# relations_list 是一个关系列表,每个关系是一个字典
if isinstance(relations_list, list) and relations_list:
# 取第一个关系(通常一个物体只有一个位置关系)
relation = relations_list[0]
relation_type = relation.get("type", "in")
# 只有 "in" 关系才需要检查容器是否打开
# "on" 关系的物品可以直接拿取
if relation_type == "in":
container = self.get_object(potential_container_id)
if container and "states" in container:
# 检查容器是否有 is_open 状态且为 False
if container["states"].get("is_open") is False:
return False, f"Object {object_id} is in closed container {container['name']}"
return True, "Object is accessible"
[docs]
def describe_agent_natural_language(self, agent_id: str, agent: Dict = None) -> str:
"""转发到EnvironmentGraph的describe_agent_natural_language方法"""
if agent is None:
agent = self.get_agent(agent_id)
if not agent:
return f"Agent {agent_id} does not exist"
return self.graph.describe_agent_natural_language(agent_id, agent)
[docs]
def describe_room_natural_language(self, room_id: str, agents: Optional[Dict[str, Dict]] = None, sim_config: Optional[Dict[str, Any]] = None) -> str:
"""转发到EnvironmentGraph的describe_room_natural_language方法"""
if agents is None:
agents = self.agents
return self.graph.describe_room_natural_language(room_id, agents, sim_config)
[docs]
def describe_environment_natural_language(self, agents: Optional[Dict[str, Dict]] = None, sim_config: Optional[Dict[str, Any]] = None) -> str:
"""转发到EnvironmentGraph的describe_environment_natural_language方法"""
if agents is None:
agents = self.agents
return self.graph.describe_environment_natural_language(agents, sim_config)