from typing import Dict, List, Optional, Tuple, Any
import uuid
import random
from ..core.state import WorldState
from .room import Room
from .object_defs import BaseObject, StaticObject, InteractableObject, GrabbableObject, FurnitureObject, ItemObject, create_object_from_dict
[docs]
class EnvironmentManager:
"""
环境管理器 - 负责管理模拟环境中的所有实体(房间、物体、家具)
"""
[docs]
def __init__(self, world_state: WorldState, sim_config: Optional[Dict[str, Any]] = None):
"""
初始化环境管理器
Args:
world_state: 世界状态对象
sim_config: 全局配置字典,可选
"""
self.world_state = world_state
self.sim_config = sim_config or {}
[docs]
def load_scene(self, scene_data: Dict[str, Any]) -> bool:
"""
从场景数据加载环境
Args:
scene_data: 场景数据字典
Returns:
bool: 加载是否成功
"""
try:
self._clear_pending_locations()
self._load_rooms(scene_data)
self._load_objects(scene_data)
self._resolve_pending_locations()
return True
except Exception as e:
print(f"Failed to load scene: {e}")
import traceback
traceback.print_exc()
return False
[docs]
def load_scene_old(self, scene_data: Dict[str, Any]) -> bool:
"""
从场景数据加载环境
Args:
scene_data: 场景数据字典
Returns:
bool: 加载是否成功
"""
try:
# 清除之前的待处理位置
if hasattr(self, '_pending_locations'):
del self._pending_locations
self._pending_locations = []
# 加载房间 - 房间默认对智能体可见
rooms_data = scene_data.get('rooms', [])
for room_data in rooms_data:
room = Room.from_dict(room_data)
self.add_room(room)
# 处理房间之间的连接关系
for room_data in rooms_data:
room_id = room_data['id']
for connected_room_id in room_data.get('connected_to_room_ids', []):
self.connect_rooms(room_id, connected_room_id)
# 首先加载没有复杂依赖关系的物体(直接放在房间中的物体)
objects_data = scene_data.get('objects', [])
independent_objects = []
dependent_objects = []
# 将物体分为两组:直接在房间中的物体和在其他物体中的物体
for obj_data in objects_data:
location_id = obj_data.get('location_id', '')
preposition, real_id = self.parse_location_id(location_id)
if real_id in self.world_state.graph.room_ids:
independent_objects.append(obj_data)
else:
dependent_objects.append(obj_data)
# 先加载独立物体
for obj_data in independent_objects:
# 设置物体的发现状态
obj_type = obj_data.get('type', '').upper()
if obj_type in ['FURNITURE', 'ITEM', 'INTERACTABLE', 'GRABBABLE']:
# 直接从模拟器配置文件读取全局观察设置
from config.config_manager import ConfigManager
config_manager = ConfigManager()
simulator_config = config_manager.get_config('simulator_config', {})
global_observation = simulator_config.get('global_observation', False)
obj_data['is_discovered'] = global_observation
location_id = obj_data.get('location_id')
object_id = obj_data.get('id', 'unknown')
if location_id:
self.add_object(obj_data, location_id)
else:
print(f"Warning: Object {object_id} has no specified location")
# 然后在多轮中加载依赖物体,直到所有物体都被加载
remaining = dependent_objects
max_iterations = 10 # 避免无限循环
iteration = 0
while remaining and iteration < max_iterations:
iteration += 1
next_remaining = []
for obj_data in remaining:
object_id = obj_data.get('id', 'unknown')
location_id = obj_data.get('location_id')
preposition, real_id = self.parse_location_id(location_id)
# 检查位置是否存在
location_exists = self.world_state.graph.get_node(real_id) is not None
if location_exists:
# 设置物体的发现状态
obj_type = obj_data.get('type', '').upper()
if obj_type in ['FURNITURE', 'ITEM', 'INTERACTABLE', 'GRABBABLE']:
# 直接从模拟器配置文件读取全局观察设置
from config.config_manager import ConfigManager
config_manager = ConfigManager()
global_observation = config_manager.get_config('simulator_config', {}).get('global_observation', False)
obj_data['is_discovered'] = global_observation
self.add_object(obj_data, location_id)
else:
next_remaining.append(obj_data)
# 如果没有进展,跳出循环
if len(next_remaining) == len(remaining):
break
remaining = next_remaining
# 如果仍然有未加载的物体,尝试强制加载
if remaining:
for obj_data in remaining:
object_id = obj_data.get('id', 'unknown')
location_id = obj_data.get('location_id', '')
# 设置物体的发现状态
obj_type = obj_data.get('type', '').upper()
if obj_type in ['FURNITURE', 'ITEM', 'INTERACTABLE', 'GRABBABLE']:
# 直接从模拟器配置文件读取全局观察设置
from config.config_manager import ConfigManager
config_manager = ConfigManager()
global_observation = config_manager.get_config('simulator_config', {}).get('global_observation', False)
obj_data['is_discovered'] = global_observation
# 将物体放在第一个房间中
first_room_id = next(iter(self.world_state.graph.room_ids))
alternative_location = f"in:{first_room_id}"
self.add_object(obj_data, alternative_location)
# 解析待处理的位置关系(这个可能不再需要,因为我们已经处理了依赖关系)
if self._pending_locations:
resolved = []
still_pending = []
for object_id, location_id, preposition in self._pending_locations:
location = self.world_state.graph.get_node(location_id)
if location:
relation_type = preposition if preposition else "in"
self.world_state.graph.add_edge(location_id, object_id, {"type": relation_type})
resolved.append((obj_id, location_id))
else:
still_pending.append((obj_id, location_id, preposition))
if still_pending:
for obj_id, location_id, _ in still_pending:
print(f"Warning: Object {obj_id} references non-existent location {location_id}")
# 所有待处理的位置关系已成功解析
del self._pending_locations
return True
except Exception as e:
print(f"Error loading scene: {e}")
import traceback
traceback.print_exc()
return False
def _clear_pending_locations(self):
"""清除之前的待处理位置"""
if hasattr(self, '_pending_locations'):
del self._pending_locations
self._pending_locations = []
def _load_rooms(self, scene_data: Dict[str, Any]):
"""加载房间和房间连接关系"""
rooms_data = scene_data.get('rooms', [])
# 加载房间
for room_data in rooms_data:
room = Room.from_dict(room_data)
self.add_room(room)
# 处理房间之间的连接关系
for room_data in rooms_data:
room_id = room_data['id']
for connected_room_id in room_data.get('connected_to_room_ids', []):
self.connect_rooms(room_id, connected_room_id)
def _load_objects(self, scene_data: Dict[str, Any]):
"""加载所有物体"""
objects_data = scene_data.get('objects', [])
independent_objects, dependent_objects = self._categorize_objects(objects_data)
self._load_independent_objects(independent_objects)
self._load_dependent_objects_iteratively(dependent_objects)
def _categorize_objects(self, objects_data: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""将物体分为独立物体(在房间中)和依赖物体(在其他物体中)"""
independent_objects = []
dependent_objects = []
for obj_data in objects_data:
location_id = obj_data.get('location_id', '')
preposition, real_id = self.parse_location_id(location_id)
if real_id in self.world_state.graph.room_ids:
independent_objects.append(obj_data)
else:
dependent_objects.append(obj_data)
return independent_objects, dependent_objects
def _load_independent_objects(self, independent_objects: List[Dict[str, Any]]):
"""加载独立物体(直接在房间中的物体)"""
for obj_data in independent_objects:
self._set_object_discovery_state(obj_data)
location_id = obj_data.get('location_id')
object_id = obj_data.get('id', 'unknown')
if location_id:
self.add_object(obj_data, location_id)
else:
print(f"Warning: Object {object_id} has no specified location")
def _load_dependent_objects_iteratively(self, dependent_objects: List[Dict[str, Any]]):
"""迭代加载依赖物体,直到所有物体都被加载"""
remaining = dependent_objects
max_iterations = 10 # 避免无限循环
iteration = 0
while remaining and iteration < max_iterations:
iteration += 1
next_remaining = []
for obj_data in remaining:
if self._try_load_dependent_object(obj_data):
# 成功加载
continue
else:
# 未能加载,保留到下一轮
next_remaining.append(obj_data)
# 如果没有进展,跳出循环
if len(next_remaining) == len(remaining):
break
remaining = next_remaining
# 强制加载剩余的物体
if remaining:
self._force_load_remaining_objects(remaining)
def _try_load_dependent_object(self, obj_data: Dict[str, Any]) -> bool:
"""尝试加载一个依赖物体"""
object_id = obj_data.get('id', 'unknown')
location_id = obj_data.get('location_id')
preposition, real_id = self.parse_location_id(location_id)
# 检查位置是否存在
location_exists = self.world_state.graph.get_node(real_id) is not None
if location_exists:
self._set_object_discovery_state(obj_data)
self.add_object(obj_data, location_id)
return True
return False
def _force_load_remaining_objects(self, remaining: List[Dict[str, Any]]):
"""强制加载剩余的无法解析位置的物体"""
if not self.world_state.graph.room_ids:
return
first_room_id = next(iter(self.world_state.graph.room_ids))
alternative_location = f"in:{first_room_id}"
for obj_data in remaining:
self._set_object_discovery_state(obj_data)
self.add_object(obj_data, alternative_location)
def _set_object_discovery_state(self, obj_data: Dict[str, Any]):
"""设置物体的发现状态"""
obj_type = obj_data.get('type', '').upper()
if obj_type in ['FURNITURE', 'ITEM', 'INTERACTABLE', 'GRABBABLE']:
# 直接从模拟器配置文件读取全局观察设置
from config.config_manager import ConfigManager
config_manager = ConfigManager()
global_observation = config_manager.get_config('simulator_config', {}).get('global_observation', False)
obj_data['is_discovered'] = global_observation
# 清空合作标记(在场景重新加载时)
if 'states' in obj_data and 'cooperative_modified_attributes' in obj_data['states']:
obj_data['states']['cooperative_modified_attributes'] = []
def _resolve_pending_locations(self):
"""解析待处理的位置关系"""
if not self._pending_locations:
return
resolved = []
still_pending = []
for object_id, location_id, preposition in self._pending_locations:
location = self.world_state.graph.get_node(location_id)
if location:
relation_type = "in" if preposition == "in" else "on"
self.world_state.graph.add_edge(location_id, object_id, {"type": relation_type})
resolved.append((object_id, location_id))
else:
still_pending.append((object_id, location_id, preposition))
if still_pending:
self._pending_locations = still_pending
[docs]
def add_room(self, room: Room) -> bool:
"""
添加房间到环境
Args:
room: 房间对象
Returns:
bool: 添加是否成功
"""
if self.world_state.graph.get_node(room.id):
print(f"Room ID already exists: {room.id}")
return False
# 添加房间到环境图
room_dict = room.to_dict()
# 确保房间有明确的type属性
if 'type' not in room_dict:
room_dict['type'] = 'ROOM'
self.world_state.graph.add_node(room.id, room_dict, is_room=True)
return True
[docs]
def parse_location_id(self, location_id: str):
if isinstance(location_id, str) and ':' in location_id:
preposition, real_id = location_id.split(':', 1)
return preposition, real_id
return None, location_id
[docs]
def add_object(self, obj_data: Dict[str, Any], location_id: str) -> Optional[str]:
"""
添加物体到环境
Args:
obj_data: 物体数据字典
location_id: 物体位置ID (房间ID或容器物体ID)
Returns:
str: 添加成功则返回物体ID,否则返回None
"""
try:
if 'id' not in obj_data:
obj_data['id'] = str(uuid.uuid4())
obj_id = obj_data['id']
# 解析位置
preposition, real_location_id = self.parse_location_id(location_id)
location = self.world_state.graph.get_node(real_location_id)
if not location:
# 如果位置不存在但是我们当前正在加载场景,可能是物体定义的顺序问题
# 我们先将物体添加到图中,稍后再处理位置关系
print(f"Location does not exist: {real_location_id}")
# 我们仍然创建对象并添加到图中,但是不建立位置关系
obj = create_object_from_dict(obj_data)
obj.location_id = location_id # 保留原始位置ID以便稍后处理
self.world_state.graph.add_node(obj.id, obj.to_dict())
# 将此对象标记为需要稍后解析位置
if not hasattr(self, '_pending_locations'):
self._pending_locations = []
self._pending_locations.append((obj.id, real_location_id, preposition))
return obj.id
# 正常情况下,位置存在,直接添加物体和关系
obj = create_object_from_dict(obj_data)
obj.location_id = location_id
self.world_state.graph.add_node(obj.id, obj.to_dict())
# 添加关系边
relation_type = preposition if preposition else "in"
self.world_state.graph.add_edge(real_location_id, obj.id, {"type": relation_type})
return obj.id
except Exception as e:
print(f"Error adding object: {e}")
return None
[docs]
def connect_rooms(self, room_id1: str, room_id2: str) -> bool:
"""
连接两个房间
Args:
room_id1: 第一个房间ID
room_id2: 第二个房间ID
Returns:
bool: 连接是否成功
"""
# 检查两个房间是否都存在
room1 = self.world_state.graph.get_node(room_id1)
room2 = self.world_state.graph.get_node(room_id2)
if not room1 or not room2:
print(f"Room does not exist: {room_id1 if not room1 else room_id2}")
return False
# 建立双向连接关系
self.world_state.graph.add_edge(room_id1, room_id2, {"type": "connected"})
self.world_state.graph.add_edge(room_id2, room_id1, {"type": "connected"})
return True
[docs]
def get_room_by_id(self, room_id: str) -> Optional[Dict[str, Any]]:
"""
获取房间数据
Args:
room_id: 房间ID
Returns:
Dict: 房间数据字典,如果不存在则返回None
"""
_, real_room_id = self.parse_location_id(room_id)
if real_room_id not in self.world_state.graph.room_ids:
return None
return self.world_state.graph.get_node(real_room_id)
[docs]
def get_object_by_id(self, object_id: str) -> Optional[Dict[str, Any]]:
"""
获取物体数据
Args:
object_id: 物体ID
Returns:
Dict: 物体数据字典,如果不存在则返回None
"""
_, real_object_id = self.parse_location_id(object_id)
return self.world_state.graph.get_node(real_object_id)
[docs]
def get_objects_in_room(self, room_id: str, recursive: bool = True) -> List[Dict[str, Any]]:
"""
获取房间中的所有物体
Args:
room_id: 房间ID
recursive: 是否递归获取容器中的物体
Returns:
List[Dict]: 物体数据字典列表
"""
object_ids = self.world_state.graph.get_objects_in_room(room_id, recursive)
objects = []
for obj_id in object_ids:
obj = self.world_state.graph.get_node(obj_id)
if obj:
objects.append(obj)
return objects
[docs]
def get_discovered_objects_in_room(self, room_id: str, recursive: bool = True) -> List[Dict[str, Any]]:
"""
获取房间中的所有已发现物体
Args:
room_id: 房间ID
recursive: 是否递归获取容器中的物体
Returns:
List[Dict]: 已发现物体数据字典列表
"""
all_objects = self.get_objects_in_room(room_id, recursive)
return [obj for obj in all_objects if obj.get('is_discovered', False)]
[docs]
def update_object_state(self, object_id: str, state_updates: Dict[str, Any]) -> bool:
"""
更新物体状态
Args:
object_id: 物体ID
state_updates: 状态更新字典
Returns:
bool: 更新是否成功
"""
obj = self.world_state.graph.get_node(object_id)
if not obj:
return False
if 'states' not in obj:
obj['states'] = {}
# 处理特殊的is_discovered字段
if 'is_discovered' in state_updates:
obj['is_discovered'] = state_updates.pop('is_discovered')
# 更新普通状态
obj['states'].update(state_updates)
return True
[docs]
def discover_object(self, object_id: str) -> bool:
"""
将物体标记为已发现
Args:
object_id: 物体ID
Returns:
bool: 更新是否成功
"""
obj = self.world_state.graph.get_node(object_id)
if not obj:
return False
obj['is_discovered'] = True
return True
[docs]
def discover_objects_in_room(self, room_id: str, percentage: float = 1.0) -> List[str]:
"""
随机发现房间中的某个百分比的物体
Args:
room_id: 房间ID
percentage: 要发现的物体百分比 (0.0-1.0)
Returns:
List[str]: 新发现的物体ID列表
"""
# 获取房间中所有未发现的物体
all_objects = self.get_objects_in_room(room_id)
undiscovered = [obj for obj in all_objects if not obj.get('is_discovered', False)]
# 计算要发现的物体数量
discover_count = max(1, int(len(undiscovered) * percentage))
discover_count = min(discover_count, len(undiscovered))
# 如果没有未发现的物体,返回空列表
if not undiscovered:
return []
# 随机选择要发现的物体
to_discover = random.sample(undiscovered, discover_count)
# 标记物体为已发现
discovered_ids = []
for obj in to_discover:
obj_id = obj.get('id')
self.discover_object(obj_id)
discovered_ids.append(obj_id)
return discovered_ids
[docs]
def move_object(self, object_id: str, new_location_id: str) -> bool:
"""
移动物体到新位置(location_id始终为房间ID,on/in关系通过图结构维护)
"""
obj = self.world_state.graph.get_node(object_id)
if not obj:
return False
preposition, real_location_id = self.parse_location_id(new_location_id)
if real_location_id in self.world_state.graph.room_ids:
room_id = real_location_id
else:
room_id = self.get_object_room(real_location_id)
if not room_id:
return False
obj['location_id'] = new_location_id
# 维护图结构的边关系
# 先移除旧的边
for from_id in list(self.world_state.graph.edges):
if object_id in self.world_state.graph.edges[from_id]:
self.world_state.graph.remove_edge(from_id, object_id)
# 建立新边
relation_type = preposition if preposition else "in"
self.world_state.graph.add_edge(real_location_id, object_id, {"type": relation_type})
return True
[docs]
def get_object_location(self, object_id: str) -> Optional[Dict[str, Any]]:
"""
获取物体位置
Args:
object_id: 物体ID
Returns:
Dict: 位置数据字典,如果物体不存在则返回None
"""
obj = self.world_state.graph.get_node(object_id)
if not obj or 'location_id' not in obj:
return None
_, real_location_id = self.parse_location_id(obj['location_id'])
return self.world_state.graph.get_node(real_location_id)
[docs]
def find_path(self, start_room_id: str, end_room_id: str) -> Optional[List[str]]:
"""
查找从一个房间到另一个房间的路径
Args:
start_room_id: 起始房间ID
end_room_id: 目标房间ID
Returns:
List[str]: 路径房间ID列表,如果不存在路径则返回None
"""
return self.world_state.graph.find_path(start_room_id, end_room_id)
[docs]
def is_object_accessible(self, object_id: str, agent_id: str) -> Tuple[bool, str]:
"""
检查物体是否对智能体可访问
Args:
object_id: 物体ID
agent_id: 智能体ID
Returns:
Tuple[bool, str]: (是否可访问, 原因消息)
"""
# 首先检查物体是否已被发现
obj = self.world_state.graph.get_node(object_id)
if obj and not obj.get('is_discovered', False):
return False, f"Object {obj.get('name', object_id)} has not been discovered yet"
# 检查智能体是否与物体在同一位置
is_near, reason = self.is_agent_near_object(agent_id, object_id)
if not is_near:
return False, reason
return self.world_state.is_object_accessible_to_agent(object_id, agent_id)
[docs]
def get_object_room(self, object_id: str) -> Optional[str]:
"""
获取物体所在的房间ID
Args:
object_id: 物体ID
Returns:
str: 房间ID,如果物体不存在或不在任何房间则返回None
"""
obj = self.world_state.graph.get_node(object_id)
if not obj:
return None
if 'location_id' in obj:
preposition, real_location_id = self.parse_location_id(obj['location_id'])
if real_location_id in self.world_state.graph.room_ids:
return real_location_id
else:
return self.get_object_room(real_location_id)
return self.world_state.graph.get_room_for_object(object_id)
[docs]
def is_agent_near_object(self, agent_id: str, object_id: str) -> Tuple[bool, str]:
"""
检查智能体是否与物体在同一房间
Args:
agent_id: 智能体ID
object_id: 物体ID
Returns:
Tuple[bool, str]: (是否在同一房间, 消息)
"""
agent = self.world_state.get_agent(agent_id)
if not agent:
return False, f"Agent does not exist: {agent_id}"
# 获取智能体所在的房间
agent_room_id = agent.get('location_id')
if not agent_room_id:
return False, f"Agent {agent_id} is not in any room"
# 如果物体在智能体的库存中,则视为在同一位置
if 'inventory' in agent and object_id in agent['inventory']:
return True, "Object is in agent's inventory"
# 获取物体所在的房间
object_room_id = self.get_object_room(object_id)
if not object_room_id:
return False, f"Cannot determine location of object {object_id}"
# 检查是否在同一房间
if agent_room_id != object_room_id:
obj = self.world_state.graph.get_node(object_id)
obj_name = obj.get('name', object_id) if obj else object_id
room = self.world_state.graph.get_node(object_room_id)
room_name = room.get('name', object_room_id) if room else object_room_id
return False, f"Agent must go to {room_name} first to interact with {obj_name}"
return True, "Agent and object are in the same room"
[docs]
def update_object_attributes(self, object_id: str, updates: Dict[str, Any]) -> bool:
"""
通用物体属性更新方法,可更新任意顶层字段(如 holders, location_id 等)
"""
obj = self.world_state.graph.get_node(object_id)
if not obj:
return False
obj.update(updates)
return True