Source code for OmniSimulator.action.actions.basic_actions

from typing import Dict, Optional, Tuple, Any, List, ClassVar
import re
import random
import logging

from ...core.enums import ActionType, ActionStatus, ObjectType
from .base_action import BaseAction
from ...utils.action_validators import ActionValidator
from ...utils.weight_calculator import calculate_container_weight, has_children

# Module-level logger, named after this module, used by the action classes below.
logger = logging.getLogger(__name__)

class GotoAction(BaseAction):
    """GOTO action: move into a room or step up to an object.

    Command syntax is ``GOTO <target_id>``. The id is resolved as a room
    first; only when no room matches is it looked up as an object.
    """

    action_type = ActionType.GOTO
    command_pattern = r'^GOTO\s+(\w+)$'

    @classmethod
    def _parse_command(cls, match, agent_id: str):
        """Build the action from a ``command_pattern`` match (group 1 is the target id)."""
        return cls(agent_id, None, match.group(1))

    def _validate(self, agent, world_state, env_manager, agent_manager):
        """Check that the target exists and may be approached from the current room.

        Returns:
            Tuple: (is_valid, message)
        """
        if not self.target_id:
            return False, "导航动作需要指定目标位置"

        # Rooms take precedence over objects when resolving the target id.
        room = env_manager.get_room_by_id(self.target_id)
        if room:
            if agent.location_id == self.target_id:
                return True, f"Agent is already in {room['name']}"
            return True, "Action is valid"

        thing = env_manager.get_object_by_id(self.target_id)
        if not thing:
            return False, f"Target location does not exist: {self.target_id}"

        # An object can only be approached once it has been discovered...
        if not thing.get('is_discovered', False):
            return False, f"Target object not discovered: {thing.get('name', self.target_id)}"

        # ...and only from inside the room that holds it.
        holder_room_id = env_manager.get_object_room(self.target_id)
        if holder_room_id == agent.location_id:
            return True, "Action is valid"

        holder_room = env_manager.get_room_by_id(holder_room_id)
        holder_room_name = holder_room.get('name', holder_room_id) if holder_room else holder_room_id
        return False, f"Must go to {holder_room_name} first, then approach {thing.get('name', self.target_id)}"

    def _execute(self, agent, world_state, env_manager, agent_manager):
        """Carry out the GOTO action.

        Args:
            agent: the acting agent
            world_state: world state object
            env_manager: environment manager
            agent_manager: agent manager

        Returns:
            Tuple: (status, feedback message, result data)
        """
        logger.debug(f"执行GOTO动作 - 智能体: {agent.id}, 目标: {self.target_id}")

        # Resolve the target: room first, object only as a fallback.
        room = env_manager.get_room_by_id(self.target_id)
        thing = None if room else env_manager.get_object_by_id(self.target_id)

        # Snapshot the pre-move state for the debug trace.
        location_before = agent.location_id
        if hasattr(agent, 'near_objects'):
            nearby_before = set(agent.near_objects)
        else:
            nearby_before = set()
        logger.debug(f"GOTO execution before - location: {location_before}, nearby objects count: {len(nearby_before)}")

        if room:
            return self._enter_room(agent, room, env_manager, agent_manager)
        if thing:
            return self._approach_object(agent, thing, env_manager, agent_manager)

        logger.error(f"Target location does not exist: {self.target_id}")
        return ActionStatus.FAILURE, f"Target location does not exist: {self.target_id}", None

    def _enter_room(self, agent, room, env_manager, agent_manager):
        """Move the agent into ``room`` through ``agent_manager.move_agent``."""
        logger.debug(f"Target is room: {self.target_id}")
        moved = agent_manager.move_agent(agent.id, self.target_id)
        if not moved:
            logger.warning(f"GOTO room failed: {self.target_id}")
            return ActionStatus.FAILURE, "Movement failed", None

        room_label = room.get('name', self.target_id)
        # Refresh the nearby-object set now that the agent has changed rooms.
        if hasattr(agent, 'near_objects'):
            agent.update_near_objects(env_manager=env_manager)
        logger.debug(f"GOTO success - new location: {self.target_id}, nearby objects count: {len(agent.near_objects)}")
        result = {"new_location_id": self.target_id}
        return ActionStatus.SUCCESS, f"{agent.name} successfully moved to {room_label}", result

    def _approach_object(self, agent, thing, env_manager, agent_manager):
        """Bring the agent next to ``thing``; ``location_id`` is left unchanged."""
        logger.debug(f"目标是物体: {self.target_id}")

        if not thing.get('is_discovered', False):
            logger.warning(f"Target object not discovered: {self.target_id}")
            return ActionStatus.FAILURE, f"Target object not discovered: {thing.get('name', self.target_id)}", None

        holder_room_id = env_manager.get_object_room(self.target_id)
        if holder_room_id != agent.location_id:
            logger.warning(f"Object not in current room - object: {self.target_id}, object room: {holder_room_id}, agent room: {agent.location_id}")
            return ActionStatus.FAILURE, "Target object not in current room", None

        agent.update_near_objects(self.target_id, env_manager)

        # Guarantee the target itself ends up in the nearby set even when the
        # update above did not put it there.
        if self.target_id not in agent.near_objects:
            logger.warning(f"Target object not in near_objects after update - object: {self.target_id}, near_objects: {agent.near_objects}")
            agent.near_objects.add(self.target_id)
            logger.debug(f"Force add target object to near_objects: {self.target_id}")

        obj_name = thing.get('name', self.target_id)
        current_room_id = agent.location_id
        current_room = env_manager.get_room_by_id(current_room_id)
        room_name = current_room.get('name', current_room_id) if current_room else current_room_id
        logger.debug(f"GOTO object success - object: {obj_name}, room: {room_name}, nearby objects count: {len(agent.near_objects)}")

        # Persist the agent so the updated nearby set is saved.
        agent_manager.update_agent(agent.id, agent.to_dict())
        result = {"near_object_id": self.target_id}
        return ActionStatus.SUCCESS, f"{agent.name} approached {obj_name} (in {room_name})", result
class GrabAction(BaseAction):
    """GRAB action: pick up an object and add it to the agent's inventory.

    Command syntax is ``GRAB <object_id>``.
    """

    action_type = ActionType.GRAB
    command_pattern = r'^GRAB\s+(\w+)$'

    @classmethod
    def _parse_command(cls, match, agent_id: str):
        """Build the action from a ``command_pattern`` match (group 1 is the object id)."""
        target_id = match.group(1)
        return cls(agent_id, None, target_id)

    @staticmethod
    def _provided_abilities(properties):
        """Return the object's ``provides_abilities`` entry as an iterable of names.

        A single string is wrapped in a list; a missing or empty entry yields
        an empty list.
        """
        abilities = properties.get('provides_abilities', [])
        if not abilities:
            return []
        if isinstance(abilities, str):
            return [abilities]
        return abilities

    def _validate(self, agent, world_state, env_manager, agent_manager):
        """Check that the object can be grabbed and is not held by another agent.

        Returns:
            Tuple: (is_valid, message)
        """
        if not self.target_id:
            return False, "Grab action requires a target object"

        # Basic checks are delegated to the shared validator.
        result = ActionValidator.validate_grab_action(env_manager, agent, self.target_id)
        if not result:
            return False, result.message

        # Reject the grab when any other agent already holds the object.
        for other_agent in agent_manager.get_all_agents().values():
            if other_agent.id != agent.id and self.target_id in other_agent.inventory:
                return False, f"Object is already held by {other_agent.name}, cannot grab again"

        return True, "Action is valid"

    def _execute(self, agent, world_state, env_manager, agent_manager):
        """Carry out the GRAB action.

        The object is added to the agent's inventory first and then moved in
        the environment. If the environment move fails, the inventory change
        and any granted abilities are rolled back.

        Args:
            agent: the acting agent
            world_state: world state object
            env_manager: environment manager
            agent_manager: agent manager

        Returns:
            Tuple: (status, feedback message, result data)
        """
        logger.debug(f"Executing GRAB action - agent: {agent.id}, target: {self.target_id}")

        # Snapshot the pre-grab state for the debug trace at the end.
        inventory_before = set(agent.inventory)
        near_objects_before = set(agent.near_objects) if hasattr(agent, 'near_objects') else set()
        logger.debug(f"GRAB before execution - inventory: {inventory_before}, nearby objects count: {len(near_objects_before)}")

        obj = env_manager.get_object_by_id(self.target_id)
        if not obj:
            logger.warning(f"Object does not exist: {self.target_id}")
            return ActionStatus.FAILURE, f"Object does not exist: {self.target_id}", None

        obj_name = obj.get('name', self.target_id)

        if not obj.get('is_discovered', False):
            logger.warning(f"Object not discovered: {self.target_id}")
            return ActionStatus.FAILURE, f"Object not discovered: {obj_name}", None

        # The agent must already be next to the object.
        if self.target_id not in agent.near_objects:
            logger.warning(f"Object not in nearby list - object: {self.target_id}, nearby: {agent.near_objects}")
            return ActionStatus.FAILURE, f"Agent must approach {obj_name} before grabbing", None

        # Remember the container the object sits in before it is moved.
        container_id = None
        if 'location_id' in obj:
            from ...utils.parse_location import parse_location_id
            _, container_id = parse_location_id(obj['location_id'])
            logger.debug(f"Object current container: {container_id}")

        properties = obj.get('properties', {})

        # In cooperative mode the object's own properties are used as-is;
        # otherwise a container's weight is replaced by its total weight
        # including children.
        is_cooperative = hasattr(agent, 'corporate_mode_object_id') and agent.corporate_mode_object_id is not None
        if is_cooperative:
            effective_properties = properties
            logger.info(f"Cooperative grab: {obj_name}, skip weight check")
        elif has_children(env_manager, self.target_id):
            total_weight = calculate_container_weight(env_manager, self.target_id)
            effective_properties = properties.copy()
            effective_properties['weight'] = total_weight
            logger.info(f"Grab container {obj_name}: total weight {total_weight}kg (including children)")
        else:
            effective_properties = properties

        can_carry, reason = agent.can_carry(effective_properties)
        if not can_carry:
            logger.warning(f"Cannot carry object: {reason}")
            return ActionStatus.FAILURE, reason, None

        success, message = agent.grab_object(self.target_id, effective_properties)
        if not success:
            logger.warning(f"Grab failed: {message}")
            return ActionStatus.FAILURE, message, None

        logger.debug(f"Grab successful - object: {obj_name}")

        # Grant any abilities the object provides while it is held.
        abilities = self._provided_abilities(properties)
        for ability in abilities:
            logger.debug(f"Object provides ability: {ability}")
            agent.add_ability_from_object(ability, self.target_id)

        agent_manager.update_agent(agent.id, agent.to_dict())

        env_success = env_manager.move_object(self.target_id, agent.id)
        if not env_success:
            logger.error(f"Failed to update object position - object: {self.target_id}")
            # Roll back with the SAME properties the grab was recorded with.
            # Using the raw properties here would pass a different weight for
            # containers with children than the one given to grab_object.
            rollback_ok, rollback_message = agent.drop_object(self.target_id, effective_properties)
            if not rollback_ok:
                logger.error(f"Rollback of grab failed - object: {self.target_id}, reason: {rollback_message}")
            for ability in abilities:
                agent.remove_ability_from_object(ability, self.target_id)
            agent_manager.update_agent(agent.id, agent.to_dict())
            return ActionStatus.FAILURE, "Failed to update object position", None

        # After a successful grab the nearby set is refreshed around the
        # object's former container when one is known.
        # NOTE(review): this refresh happens after the last update_agent call;
        # confirm whether near_objects needs to be persisted again here.
        if container_id:
            logger.debug(f"Update nearby list, include original container: {container_id}")
            agent.update_near_objects(container_id, env_manager)
        else:
            agent.update_near_objects()

        inventory_after = set(agent.inventory)
        near_objects_after = set(agent.near_objects) if hasattr(agent, 'near_objects') else set()
        logger.debug(f"GRAB after execution - inventory: {inventory_after}, nearby objects count: {len(near_objects_after)}")
        logger.debug(f"Inventory changes: added {inventory_after - inventory_before}, removed {inventory_before - inventory_after}")
        logger.debug(f"Nearby changes: added {near_objects_after - near_objects_before}, removed {near_objects_before - near_objects_after}")

        return ActionStatus.SUCCESS, f"{agent.name} successfully grabbed {obj_name}", {
            "object_id": self.target_id
        }
class PlaceAction(BaseAction):
    """PLACE action: put a held object on or in a location.

    Command syntax is ``PLACE <object_id> <on|in> <location_id>``. The
    pattern also matches a bare ``PLACE <object_id>``, which validation
    then rejects.
    """

    action_type = ActionType.PLACE
    command_pattern = r'^PLACE\s+(\w+)(?:\s+(on|in)\s+(\w+))?$'

    @classmethod
    def _parse_command(cls, match, agent_id: str):
        """Build the action; relation and location go into ``params`` when both are given."""
        obj_id, rel, location_id = match.groups()
        params = {}
        if rel and location_id:
            params['relation'] = rel
            params['location_id'] = location_id
        return cls(agent_id, None, obj_id, params)

    def _validate(self, agent, world_state, env_manager, agent_manager):
        """Check the command is complete and the destination is in the agent's room.

        Returns:
            Tuple: (is_valid, message)
        """
        if not self.target_id:
            return False, "PLACE action requires target object"

        relation = self.params.get('relation')
        location_id = self.params.get('location_id')
        if not relation or not location_id:
            return False, "PLACE action must specify placement method (on/in) and location, format: PLACE <object_id> <on|in> <location_id>"

        # Basic checks are delegated to the shared validator.
        result = ActionValidator.validate_place_action(env_manager, agent, self.target_id, location_id, relation)
        if not result:
            return False, result.message

        # Extra check: a destination that is not a room must be located in
        # the agent's current room.
        if location_id != agent.location_id and location_id not in world_state.graph.room_ids:
            location_room = env_manager.get_object_room(location_id)
            if location_room != agent.location_id:
                location = env_manager.get_object_by_id(location_id)
                location_name = location.get('name', location_id) if location else location_id
                room = env_manager.get_room_by_id(location_room)
                room_name = room.get('name', location_room) if room else location_room
                # Use the requested relation so "in" placements are not reported as "on".
                return False, f"Agent must go to {room_name} first to place object {relation} {location_name}"

        return True, "Action is valid"

    def _execute(self, agent, world_state, env_manager, agent_manager):
        """Carry out the PLACE action.

        The object is removed from the agent's inventory first and then moved
        in the environment. If the environment move fails, the inventory
        change and removed abilities are restored.

        Args:
            agent: the acting agent
            world_state: world state object
            env_manager: environment manager
            agent_manager: agent manager

        Returns:
            Tuple: (status, feedback message, result data)
        """
        obj = env_manager.get_object_by_id(self.target_id)
        if not obj:
            return ActionStatus.FAILURE, f"Object does not exist: {self.target_id}", None

        obj_name = obj.get('name', self.target_id)
        obj_properties = obj.get('properties', {})

        relation = self.params.get('relation')
        destination_id = self.params.get('location_id')
        if not relation or not destination_id:
            # Without this guard the code below would build the string "None:None".
            return ActionStatus.FAILURE, "PLACE action must specify placement method (on/in) and location, format: PLACE <object_id> <on|in> <location_id>", None

        # The environment expects the location prefixed with its relation.
        location_id = f"{relation}:{destination_id}"

        # Resolve a display name: the current room's name when the destination
        # is the agent's room, otherwise the destination object's name.
        if destination_id == agent.location_id:
            room = env_manager.get_room_by_id(agent.location_id)
            location_name = room.get('name', agent.location_id) if room else agent.location_id
        else:
            location = env_manager.get_object_by_id(destination_id)
            location_name = location.get('name', destination_id) if location else destination_id

        # Outside cooperative mode, a container's weight is replaced by its
        # total weight including children before it is dropped.
        is_cooperative = hasattr(agent, 'corporate_mode_object_id') and agent.corporate_mode_object_id is not None
        if not is_cooperative and has_children(env_manager, self.target_id):
            total_weight = calculate_container_weight(env_manager, self.target_id)
            effective_properties = obj_properties.copy()
            effective_properties['weight'] = total_weight
        else:
            effective_properties = obj_properties

        success, message = agent.drop_object(self.target_id, effective_properties)
        if not success:
            return ActionStatus.FAILURE, message, None

        # Only after the drop succeeded are the object's abilities revoked.
        abilities = obj_properties.get('provides_abilities', [])
        if isinstance(abilities, str):
            abilities = [abilities]
        for ability in abilities or []:
            agent.remove_ability_from_object(ability, self.target_id)

        agent_manager.update_agent(agent.id, agent.to_dict())

        env_success = env_manager.move_object(self.target_id, location_id)
        if not env_success:
            # Roll back with the SAME properties the drop was recorded with.
            # Re-grabbing with the raw properties would pass a different
            # weight for containers with children.
            rollback_ok, rollback_message = agent.grab_object(self.target_id, effective_properties)
            if not rollback_ok:
                logger.error(f"Rollback of place failed - object: {self.target_id}, reason: {rollback_message}")
            for ability in abilities or []:
                agent.add_ability_from_object(ability, self.target_id)
            agent_manager.update_agent(agent.id, agent.to_dict())
            return ActionStatus.FAILURE, f"Cannot place {obj_name} {relation} {location_name}", None

        # After placing, refresh the nearby set around the object just put down.
        agent.update_near_objects(self.target_id, env_manager)

        return ActionStatus.SUCCESS, f"{agent.name} placed {obj_name} {relation} {location_name}", {
            "object_id": self.target_id,
            "location_id": location_id
        }
class LookAction(BaseAction):
    """LOOK action: inspect the current room or a single object.

    Command syntax is ``LOOK``, ``LOOK AROUND`` or ``LOOK <object_id>``.
    """

    action_type = ActionType.LOOK
    command_pattern = r'^LOOK(?:\s+(\w+))?$'

    @classmethod
    def _parse_command(cls, match, agent_id: str):
        """Build the action; ``AROUND`` (any case) selects room scope with no target."""
        target = match.group(1)
        if target and target.upper() == "AROUND":
            return cls(agent_id, None, None, {'scope': "room"})
        return cls(agent_id, None, target, {})

    def _validate(self, agent, world_state, env_manager, agent_manager):
        """Run the shared object-interaction check when a target object is given.

        Returns:
            Tuple: (is_valid, message)
        """
        if not self.target_id:
            return True, "Action is valid"

        outcome = ActionValidator.validate_basic_object_interaction(env_manager, agent, self.target_id)
        if outcome:
            return True, "Action is valid"
        return False, outcome.message

    def _execute(self, agent, world_state, env_manager, agent_manager):
        """Carry out the LOOK action.

        Returns:
            Tuple: (status, feedback message, result data)
        """
        scope = self.params.get('scope', 'target')

        # With neither room scope nor a target, default to looking around the
        # room; the choice is recorded in params.
        if scope != 'room' and not self.target_id:
            self.params['scope'] = 'room'
            scope = 'room'

        if scope == 'room':
            here_id = agent.location_id
            here = env_manager.get_room_by_id(here_id)
            if not here:
                return ActionStatus.FAILURE, "Current location is not a valid room", None

            here_name = here.get('name', here_id)

            # Report only objects that have already been discovered.
            visible_objects = [
                {
                    "id": item.get('id'),
                    "name": item.get('name'),
                    "type": item.get('type'),
                    "states": item.get('states', {})
                }
                for item in env_manager.get_objects_in_room(here_id)
                if item.get('is_discovered', False)
            ]

            payload = {
                "room_id": here_id,
                "room_name": here_name,
                "visible_objects": visible_objects
            }
            return ActionStatus.SUCCESS, f"{agent.name} looked around {here_name}", payload

        # Single-object inspection.
        subject = env_manager.get_object_by_id(self.target_id)
        if not subject:
            return ActionStatus.FAILURE, f"Object does not exist: {self.target_id}", None

        subject_name = subject.get('name', self.target_id)

        # For an open container, list the discovered objects reachable through
        # its graph edges.
        contained_objects = []
        is_container = subject.get('properties', {}).get('is_container', False)
        if is_container and subject.get('states', {}).get('is_open', False):
            for source_id, outgoing in world_state.graph.edges.items():
                if source_id != self.target_id:
                    continue
                for child_id in outgoing:
                    child = env_manager.get_object_by_id(child_id)
                    if not child or not child.get('is_discovered', False):
                        continue
                    contained_objects.append({
                        "id": child.get('id'),
                        "name": child.get('name'),
                        "type": child.get('type')
                    })

        payload = {
            "object_id": self.target_id,
            "object_name": subject_name,
            "object_type": subject.get('type'),
            "object_states": subject.get('states', {}),
            "contained_objects": contained_objects
        }
        return ActionStatus.SUCCESS, f"{agent.name} looked at {subject_name}", payload
class ExploreAction(BaseAction):
    """EXPLORE action: discover undiscovered objects in the agent's current room.

    Command syntax is ``EXPLORE``, ``EXPLORE THOROUGH`` or
    ``EXPLORE <room_id>``.
    """

    action_type = ActionType.EXPLORE
    command_pattern = r'^EXPLORE(?:\s+(\w+))?$'

    @classmethod
    def _parse_command(cls, match, agent_id: str):
        """Build the action from the optional argument.

        ``THOROUGH`` (any case) selects thorough exploration. Any other word
        is treated as a room id and also selects thorough exploration. With
        no argument the level is left unset so ``_execute`` reads it from
        configuration.
        """
        argument = match.group(1)
        params = {}
        if argument:
            if argument.upper() != "THOROUGH":
                # Anything that is not the keyword is assumed to be a room id.
                params['room_id'] = argument
            params['exploration_level'] = "thorough"
        return cls(agent_id, None, None, params)

    def _validate(self, agent, world_state, env_manager, agent_manager):
        """Reject exploring any room other than the one the agent is in.

        Returns:
            Tuple: (is_valid, message)
        """
        room_id = self.params.get('room_id')
        if room_id and room_id != agent.location_id:
            room = env_manager.get_room_by_id(room_id)
            room_name = room.get('name', room_id) if room else room_id
            return False, f"Agent must go to {room_name} before exploring that room"
        return True, "Action is valid"

    def _execute(self, agent, world_state, env_manager, agent_manager):
        """Carry out the EXPLORE action.

        In ``thorough`` mode every undiscovered object in the room is
        revealed. In any other mode a random 50-80% share is revealed, but
        always at least one object when any remain, so a lone undiscovered
        object can still be found.

        Returns:
            Tuple: (status, feedback message, result data)
        """
        # Room to explore; defaults to the agent's current room.
        room_id = self.params.get('room_id', agent.location_id)
        room = env_manager.get_room_by_id(room_id)
        if not room:
            return ActionStatus.FAILURE, "Specified location is not a valid room", None

        room_name = room.get('name', room_id)
        graph = world_state.graph

        # Collect undiscovered objects, skipping room nodes and agents.
        undiscovered_objects = []
        for obj in env_manager.get_objects_in_room(room_id):
            obj_type = obj.get('type', '').upper()
            if obj.get('is_discovered', False):
                continue
            if obj.get('id') in graph.room_ids or obj_type == 'AGENT':
                continue
            undiscovered_objects.append(obj)

        # The level comes from the command when given, otherwise from
        # sim_config['explore_mode'], which falls back to 'thorough'.
        exploration_level = self.params.get('exploration_level')
        if not exploration_level:
            sim_config = getattr(env_manager, 'sim_config', {}) or getattr(world_state, 'sim_config', {}) or {}
            exploration_level = sim_config.get('explore_mode', 'thorough')

        logger.debug(f"EXPLORE - exploration level: {exploration_level}, undiscovered objects count: {len(undiscovered_objects)}")

        if exploration_level == 'thorough':
            discovery_count = len(undiscovered_objects)
            to_discover = undiscovered_objects
        else:
            discovery_percentage = random.uniform(0.5, 0.8)
            discovery_count = int(len(undiscovered_objects) * discovery_percentage)
            # int() truncates to 0 when exactly one object remains, which
            # would make that object impossible to discover; guarantee progress.
            if undiscovered_objects and discovery_count == 0:
                discovery_count = 1
            to_discover = random.sample(undiscovered_objects, discovery_count)

        logger.debug(f"EXPLORE - objects to be discovered: {len(to_discover)}")

        # Mark each selected object as discovered and describe where it sits.
        discovered_objects = []
        for obj in to_discover:
            obj_id = obj.get('id')
            env_manager.update_object_state(obj_id, {"is_discovered": True})

            # Find the first non-room parent that is already discovered and
            # report the relation (on / in / at) to it.
            container_info = ""
            for potential_container_id, edges in graph.edges.items():
                if obj_id not in edges:
                    continue
                if potential_container_id == room_id or potential_container_id in graph.room_ids:
                    continue
                container = graph.get_node(potential_container_id)
                if container and container.get('is_discovered', False):
                    relation_type = edges[obj_id][0].get('type', 'at') if edges[obj_id] else 'at'
                    relation_text = relation_type if relation_type in ('on', 'in') else 'at'
                    container_info = f"({relation_text} {container.get('name', potential_container_id)})"
                    break

            discovered_objects.append({
                "id": obj_id,
                "name": obj.get('name'),
                "type": obj.get('type'),
                "container_info": container_info
            })

        # Make sure near_objects is a set before adding to it.
        current_near = getattr(agent, 'near_objects', None)
        if current_near is None:
            agent.near_objects = set()
        elif not isinstance(current_near, set):
            agent.near_objects = set(current_near if isinstance(current_near, (list, tuple)) else [])

        # Exploring only reveals objects. Just the room is added to the nearby
        # set; objects still have to be approached with GOTO.
        agent.near_objects.add(room_id)

        # Persist the agent so the nearby set is saved.
        agent_manager.update_agent(agent.id, agent.to_dict())

        # Status depends on how much was found.
        if not discovered_objects:
            return ActionStatus.SUCCESS, f"{agent.name} explored {room_name} but found no new objects", {
                "room_id": room_id,
                "discovery_count": 0
            }

        if discovery_count < len(undiscovered_objects) and exploration_level != 'thorough':
            return ActionStatus.PARTIAL, f"{agent.name} discovered {len(discovered_objects)} new objects in {room_name}", {
                "room_id": room_id,
                "discovered_objects": discovered_objects,
                "discovery_count": len(discovered_objects)
            }

        return ActionStatus.SUCCESS, f"{agent.name} thoroughly explored {room_name} and discovered {len(discovered_objects)} new objects", {
            "room_id": room_id,
            "discovered_objects": discovered_objects,
            "discovery_count": len(discovered_objects),
            "is_complete": True
        }