Data Generation Tools
The OmniEmbodied Framework includes data generation tools for creating custom training and evaluation datasets. The tools automate the generation of scenarios, tasks, and evaluation criteria, and validate the quality of what they produce.
Overview
The data generation system provides:
Automated Pipeline: End-to-end generation from raw concepts to complete scenarios
Multi-Stage Generation: Clue generation → Scene generation → Task generation → Validation
Quality Control: Automated validation and consistency checking
Scalable Processing: Concurrent generation with progress tracking
Customizable Generators: Extensible architecture for domain-specific needs
The system generates three types of outputs:
Scenarios: Complete environment descriptions with objects and spatial relationships
Tasks: Goal-oriented activities with success criteria and verification rules
Evaluation Data: Structured datasets for benchmarking agent performance
Core Components
Data Generation Pipeline
The Pipeline class orchestrates the complete generation process:
from data_generation.pipeline import Pipeline
# Initialize pipeline
pipeline = Pipeline()
# Configure generation parameters
config = {
    "num_items": 100,
    "max_workers": 4,
    "output_format": "json"
}
# Run end-to-end generation
results = pipeline.run(
    input_file="concepts.txt",
    config=config
)
print(f"Generated {results['completed']} scenarios")
print(f"Total token usage: {results['token_usage']}")
Pipeline Architecture
The pipeline processes data through multiple stages:
Raw Concepts → Clue Generation → Scene Generation → Task Generation → Validation

Output of each stage:
Raw Concepts: Text Input
Clue Generation: Object Lists, Properties
Scene Generation: Environment, Layout
Task Generation: Task Definition, Success Criteria
Validation: Quality Check, Consistency
Each stage builds upon the previous output, ensuring consistency and coherence throughout the generation process.
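As a rough illustration of how each stage consumes the previous stage's output, the sketch below chains four stand-in functions. The function names, dictionary keys, and placeholder logic are assumptions made for illustration only; the framework's own generators are not shown here.

```python
from typing import Any, Callable, Dict, List

Stage = Callable[[Dict[str, Any]], Dict[str, Any]]


def clue_stage(data: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical stand-in: derive an object list from the concept text.
    words = data["concept"].split()
    return {**data, "objects": [f"{word}_object" for word in words]}


def scene_stage(data: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical stand-in: place every object in a single room.
    return {**data, "rooms": [{"id": "room_1", "objects": list(data["objects"])}]}


def task_stage(data: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical stand-in: build one task that targets the first object.
    target = data["objects"][0]
    return {**data, "task": {"description": f"Find the {target}", "target": target}}


def validation_stage(data: Dict[str, Any]) -> Dict[str, Any]:
    # Consistency check: the task target must exist somewhere in the scene.
    placed = {obj for room in data["rooms"] for obj in room["objects"]}
    return {**data, "is_valid": data["task"]["target"] in placed}


def run_stages(concept: str, stages: List[Stage]) -> Dict[str, Any]:
    # Each stage receives the accumulated output of all earlier stages.
    data: Dict[str, Any] = {"concept": concept}
    for stage in stages:
        data = stage(data)
    return data


result = run_stages(
    "kitchen cooking",
    [clue_stage, scene_stage, task_stage, validation_stage],
)
print(result["task"]["description"], result["is_valid"])
```

Because every stage returns a new dictionary that includes the earlier keys, the validation stage can compare the task against the scene it was derived from.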
Generation Stages
Clue Generation
The ClueGenerator creates object lists and properties from concept descriptions:
Features:
Object identification and categorization
Property assignment (color, size, material, etc.)
Relationship inference
Context-aware object selection
from data_generation.generators.clue_generator import ClueGenerator
# Initialize generator
clue_gen = ClueGenerator(config)
# Generate object clues
clues = clue_gen.generate(
    concept="kitchen cooking scenario",
    num_objects=15
)
# Example output:
# {
#     "objects": [
#         {"name": "cooking_pot", "color": "silver", "size": "medium"},
#         {"name": "wooden_spoon", "color": "brown", "size": "small"},
#         {"name": "ingredients", "type": "vegetables", "count": "multiple"}
#     ],
#     "relationships": ["pot_on_stove", "ingredients_in_fridge"]
# }
Scene Generation
The SceneGenerator creates complete environment descriptions:
Features:
Room layout generation
Object placement and spatial relationships
Realistic environmental constraints
Multi-room scene support
from data_generation.generators.scene_generator import SceneGenerator
# Initialize generator
scene_gen = SceneGenerator(config)
# Generate scene from clues
scene = scene_gen.generate(
    clues=clues,
    scene_type="kitchen"
)
# Example output structure:
# {
#     "scene_id": "kitchen_001",
#     "rooms": [
#         {
#             "id": "kitchen",
#             "type": "kitchen",
#             "objects": [...]
#         }
#     ],
#     "objects": [...],
#     "spatial_relationships": [...]
# }
Task Generation
The TaskGenerator creates goal-oriented tasks with verification criteria:
Features:
Task description generation
Success criteria definition
Multi-step task planning
Verification rule creation
from data_generation.generators.task_generator import TaskGenerator
# Initialize generator
task_gen = TaskGenerator(config)
# Generate task from scene
task = task_gen.generate(
    scene=scene,
    task_type="cooking_task"
)
# Example output:
# {
#     "task_id": "cook_001",
#     "description": "Prepare a simple meal using the ingredients in the kitchen",
#     "success_criteria": [
#         "Ingredients are removed from refrigerator",
#         "Cooking pot is placed on stove",
#         "Ingredients are added to pot"
#     ],
#     "verification": {
#         "type": "state_check",
#         "conditions": [...]
#     }
# }
Configuration and Customization
Pipeline Configuration
Configure the generation pipeline through YAML files:
# pipeline.yaml
pipeline:
  max_workers: 4              # Concurrent processing threads
  retry_attempts: 3           # Retries for failed generations
  timeout_seconds: 300        # Timeout per generation step
output:
  format: "json"              # Output format (json, yaml)
  compression: true           # Compress output files
  validation: true            # Enable quality validation
generation:
  num_scenarios: 100          # Number of scenarios to generate
  scene_complexity: "medium"  # Scenario complexity level
  task_variety: "high"        # Task type diversity
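The retry_attempts setting can be pictured with a small retry wrapper. This is a minimal sketch, assuming that the setting means "retry up to N more times after the first failure"; run_with_retries and flaky_step are hypothetical names, and per-step timeouts are left out to keep the example short.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_with_retries(
    step: Callable[[], T],
    retry_attempts: int = 3,
    delay_seconds: float = 0.0,
) -> T:
    """Call step once, then retry up to retry_attempts more times on failure."""
    last_error: Optional[Exception] = None
    for attempt in range(retry_attempts + 1):
        try:
            return step()
        except Exception as error:  # A real pipeline would catch narrower errors.
            last_error = error
            if attempt < retry_attempts:
                time.sleep(delay_seconds)
    raise RuntimeError(f"step failed after {retry_attempts} retries") from last_error


attempts = {"count": 0}


def flaky_step() -> str:
    # Fails twice, then succeeds, to exercise the retry loop.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated generation failure")
    return "scene generated"


outcome = run_with_retries(flaky_step, retry_attempts=3)
print(outcome, "after", attempts["count"], "calls")
```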
Generator Configuration
Configure individual generators:
# clue_gen_config.yaml
clue_generator:
  max_objects: 20               # Maximum objects per scenario
  object_categories:            # Allowed object types
    - "furniture"
    - "tools"
    - "consumables"
  property_richness: "high"     # Detail level for object properties

# scene_gen_config.yaml
scene_generator:
  room_types:                   # Supported room types
    - "kitchen"
    - "living_room"
    - "bedroom"
  spatial_complexity: "medium"  # Layout complexity
  object_density: 0.7           # Object placement density

# task_gen_config.yaml
task_generator:
  task_categories:              # Task types to generate
    - "direct_command"
    - "attribute_reasoning"
    - "multi_step"
  difficulty_distribution:      # Difficulty level distribution
    easy: 0.3
    medium: 0.5
    hard: 0.2
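To make the difficulty_distribution weights concrete, the sketch below turns them into per-level task counts using largest-remainder rounding, so the counts always add up to the requested total. The helper name allocate_by_distribution is hypothetical, and this is one possible interpretation of the setting rather than the framework's documented behavior.

```python
from typing import Dict


def allocate_by_distribution(total: int, distribution: Dict[str, float]) -> Dict[str, int]:
    """Split total items across levels using largest-remainder rounding."""
    if abs(sum(distribution.values()) - 1.0) > 1e-9:
        raise ValueError("distribution weights must sum to 1.0")
    exact = {level: total * weight for level, weight in distribution.items()}
    counts = {level: int(value) for level, value in exact.items()}
    leftover = total - sum(counts.values())
    # Hand the remaining items to the levels with the largest fractional parts.
    by_remainder = sorted(exact, key=lambda level: exact[level] - counts[level], reverse=True)
    for level in by_remainder[:leftover]:
        counts[level] += 1
    return counts


weights = {"easy": 0.3, "medium": 0.5, "hard": 0.2}
counts_100 = allocate_by_distribution(100, weights)
counts_7 = allocate_by_distribution(7, weights)
print(counts_100)
print(counts_7)
```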
Custom Generators
Extend base generators for custom requirements:
from data_generation.generators.base_generator import BaseGenerator
class CustomSceneGenerator(BaseGenerator):
    def __init__(self, config):
        super().__init__(config)
        self.domain_knowledge = self._load_domain_data()

    def generate(self, clues, **kwargs):
        # Custom generation logic
        base_scene = super().generate(clues, **kwargs)
        # Apply domain-specific modifications
        enhanced_scene = self._apply_domain_constraints(base_scene)
        specialized_scene = self._add_domain_objects(enhanced_scene)
        return specialized_scene

    def _apply_domain_constraints(self, scene):
        # Implement domain-specific spatial constraints
        return scene

    def _add_domain_objects(self, scene):
        # Add specialized objects for the domain
        return scene
Quality Control and Validation
Task Validation
The TaskValidator ensures generated content meets quality standards:
from data_generation.utils.task_validator import TaskValidator
# Initialize validator
validator = TaskValidator()
# Validate generated task
validation_result = validator.validate_task(
task=generated_task,
scene=scene_data
)
if not validation_result.is_valid:
    print(f"Validation failed: {validation_result.errors}")
    print(f"Suggestions: {validation_result.suggestions}")
Validation Checks:
Logical Consistency: Task requirements match scene contents
Feasibility: Tasks can be completed given available objects
Completeness: All necessary information is present
Clarity: Task descriptions are unambiguous
Verification: Success criteria are measurable
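Two of these checks, completeness and logical consistency, are simple enough to sketch without an LLM. The example below is an illustration under stated assumptions: CheckResult, check_task, and the required_objects field are hypothetical and are not part of the TaskValidator API described above.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class CheckResult:
    is_valid: bool
    errors: List[str] = field(default_factory=list)


# Assumed minimal set of fields a task needs (hypothetical).
REQUIRED_TASK_FIELDS = ("task_id", "description", "success_criteria")


def check_task(task: Dict[str, Any], scene: Dict[str, Any]) -> CheckResult:
    errors: List[str] = []
    # Completeness: every required field must be present and non-empty.
    for name in REQUIRED_TASK_FIELDS:
        if not task.get(name):
            errors.append(f"missing field: {name}")
    # Logical consistency: objects the task relies on must exist in the scene.
    scene_objects = {obj["name"] for obj in scene.get("objects", [])}
    for name in task.get("required_objects", []):
        if name not in scene_objects:
            errors.append(f"object not in scene: {name}")
    return CheckResult(is_valid=not errors, errors=errors)


scene = {"objects": [{"name": "cooking_pot"}, {"name": "wooden_spoon"}]}
good_task = {
    "task_id": "cook_001",
    "description": "Place the pot on the stove",
    "success_criteria": ["Cooking pot is placed on stove"],
    "required_objects": ["cooking_pot"],
}
bad_task = {
    "task_id": "cook_002",
    "description": "",
    "success_criteria": ["Knife is on the counter"],
    "required_objects": ["knife"],
}
good_result = check_task(good_task, scene)
bad_result = check_task(bad_task, scene)
print(good_result.is_valid, bad_result.errors)
```

Checks such as clarity and feasibility generally need more context than a field lookup, so they are left out of this sketch.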
Automated Quality Metrics
Built-in quality assessment:
# Quality metrics computed automatically
quality_metrics = {
    'coherence_score': 0.87,     # Logical consistency
    'complexity_score': 0.65,    # Appropriate difficulty
    'completeness_score': 0.92,  # Information completeness
    'feasibility_score': 0.89,   # Task feasibility
    'diversity_score': 0.78      # Content variety
}
# Filter by quality thresholds
high_quality_tasks = validator.filter_by_quality(
    tasks=all_tasks,
    min_coherence=0.8,
    min_feasibility=0.85
)
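Threshold filtering of this kind can be sketched in a few lines. The version below is a stand-in, not the filter_by_quality implementation: it assumes each task carries a quality_metrics dictionary keyed like the example above, and it treats a missing metric as 0.0 so that unscored tasks are dropped.

```python
from typing import Any, Dict, List


def filter_by_thresholds(
    tasks: List[Dict[str, Any]],
    thresholds: Dict[str, float],
) -> List[Dict[str, Any]]:
    """Keep tasks whose metrics meet or exceed every threshold."""
    kept = []
    for task in tasks:
        metrics = task.get("quality_metrics", {})
        if all(metrics.get(name, 0.0) >= minimum for name, minimum in thresholds.items()):
            kept.append(task)
    return kept


all_tasks = [
    {"task_id": "t1", "quality_metrics": {"coherence_score": 0.87, "feasibility_score": 0.89}},
    {"task_id": "t2", "quality_metrics": {"coherence_score": 0.91, "feasibility_score": 0.70}},
    {"task_id": "t3", "quality_metrics": {"coherence_score": 0.95}},  # No feasibility score.
]
high_quality = filter_by_thresholds(
    all_tasks,
    {"coherence_score": 0.8, "feasibility_score": 0.85},
)
print([task["task_id"] for task in high_quality])
```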
Batch Processing and Workflows
Large-Scale Generation
Generate datasets at scale with progress monitoring:
from data_generation.pipeline import Pipeline
# Configure large-scale generation
pipeline = Pipeline()
# Generate 1000 scenarios with progress tracking
results = pipeline.run_batch(
    concepts_file="domain_concepts.txt",
    num_scenarios=1000,
    batch_size=50,
    progress_callback=lambda p: print(f"Progress: {p:.1%}")
)
# Results include detailed statistics
print(f"Success rate: {results['success_rate']:.2%}")
print(f"Average quality score: {results['avg_quality']:.2f}")
print(f"Token usage: {results['total_tokens']:,}")
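The combination of batching, worker threads, and a progress callback could look roughly like the following. This is a sketch built on the standard library's ThreadPoolExecutor; run_in_batches is a hypothetical helper, and how run_batch schedules work internally is not documented here.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_in_batches(
    items: List[T],
    worker: Callable[[T], R],
    batch_size: int = 50,
    max_workers: int = 4,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> List[R]:
    """Process items batch by batch, reporting the completed fraction after each batch."""
    results: List[R] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            # pool.map returns results in the same order as the inputs.
            results.extend(pool.map(worker, batch))
            if progress_callback is not None:
                progress_callback(len(results) / len(items))
    return results


progress_log: List[float] = []
squares = run_in_batches(
    list(range(5)),
    lambda value: value * value,
    batch_size=2,
    max_workers=2,
    progress_callback=progress_log.append,
)
print(squares, progress_log)
```

Reporting progress once per batch keeps the callback cheap; a smaller batch_size gives finer-grained updates at the cost of less parallel work per batch.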
Resume and Incremental Generation
Resume interrupted generations and add to existing datasets:
# Resume interrupted generation
pipeline = Pipeline()
results = pipeline.resume(
    checkpoint_file="generation_checkpoint.json",
    output_dir="data/scenarios"
)
# Incremental generation (add to existing dataset)
additional_results = pipeline.extend_dataset(
    existing_dir="data/scenarios",
    additional_count=200,
    maintain_distribution=True  # Keep same task type distribution
)
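The idea behind resuming from a checkpoint can be shown with a small JSON file that records finished items. The sketch below assumes a checkpoint layout of {"completed": [...]}, which is an invented format for illustration; the actual contents of generation_checkpoint.json are not specified in this document.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, List


def generate_with_checkpoint(
    item_ids: List[str],
    generate: Callable[[str], None],
    checkpoint_file: Path,
) -> List[str]:
    """Process item_ids, skipping any already recorded in checkpoint_file."""
    done: List[str] = []
    if checkpoint_file.exists():
        done = json.loads(checkpoint_file.read_text())["completed"]
    completed = set(done)
    processed_now: List[str] = []
    for item_id in item_ids:
        if item_id in completed:
            continue
        generate(item_id)
        done.append(item_id)
        completed.add(item_id)
        processed_now.append(item_id)
        # Persist after each item so a restart can skip finished work.
        checkpoint_file.write_text(json.dumps({"completed": done}))
    return processed_now


with tempfile.TemporaryDirectory() as tmp_dir:
    checkpoint = Path(tmp_dir) / "generation_checkpoint.json"
    first_run = generate_with_checkpoint(["s1", "s2"], lambda item_id: None, checkpoint)
    # A second run over a longer list only processes the new item.
    second_run = generate_with_checkpoint(["s1", "s2", "s3"], lambda item_id: None, checkpoint)

print(first_run, second_run)
```

A production version would likely write the checkpoint atomically (write to a temporary file, then rename) so that an interruption during the write cannot corrupt it.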
Distributed Generation
Scale across multiple machines:
# Distributed generation coordinator
from data_generation.distributed import DistributedPipeline
coordinator = DistributedPipeline()
# Configure worker nodes
workers = [
    {"host": "worker1.local", "threads": 8},
    {"host": "worker2.local", "threads": 8},
    {"host": "worker3.local", "threads": 8}
]
# Distribute generation across workers
results = coordinator.run_distributed(
    concepts="large_concept_set.txt",
    workers=workers,
    total_scenarios=5000
)
Data Management and Export
Dataset Organization
Generated data is automatically organized:
data/
├── clue/                    # Generated object clues
│   ├── batch_001.json
│   └── batch_002.json
├── scene/                   # Generated scenes
│   ├── scene_001.json
│   └── scene_002.json
├── task/                    # Generated tasks
│   ├── task_001.json
│   └── task_002.json
└── metadata/                # Generation metadata
    ├── generation_log.json
    └── quality_metrics.json
Export Formats
Export to various formats for different use cases:
from data_generation.exporters import DatasetExporter
exporter = DatasetExporter()
# Export for evaluation framework
exporter.export_evaluation_format(
    input_dir="data/",
    output_file="evaluation_dataset.json",
    split_ratio={"train": 0.7, "test": 0.3}
)
# Export for training
exporter.export_training_format(
    input_dir="data/",
    output_file="training_dataset.jsonl",
    include_trajectories=True
)
# Export statistics and analysis
exporter.export_analysis(
    input_dir="data/",
    output_file="dataset_analysis.html",
    include_visualizations=True
)
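A split_ratio such as {"train": 0.7, "test": 0.3} implies a reproducible partition of the dataset. The sketch below shows one way to do that with a seeded shuffle; split_dataset and its seed parameter are hypothetical, and the exporter may split differently.

```python
import random
from typing import Dict, List, Sequence


def split_dataset(
    items: Sequence[str],
    split_ratio: Dict[str, float],
    seed: int = 0,
) -> Dict[str, List[str]]:
    """Shuffle items with a fixed seed and cut them into named splits."""
    if abs(sum(split_ratio.values()) - 1.0) > 1e-9:
        raise ValueError("split ratios must sum to 1.0")
    shuffled = list(items)
    random.Random(seed).shuffle(shuffled)
    names = list(split_ratio)
    splits: Dict[str, List[str]] = {}
    start = 0
    for index, name in enumerate(names):
        if index == len(names) - 1:
            end = len(shuffled)  # The last split takes whatever remains.
        else:
            end = start + round(len(shuffled) * split_ratio[name])
        splits[name] = shuffled[start:end]
        start = end
    return splits


task_ids = [f"task_{number:03d}" for number in range(10)]
splits = split_dataset(task_ids, {"train": 0.7, "test": 0.3}, seed=42)
print(len(splits["train"]), len(splits["test"]))
```

Fixing the seed means the same input list always yields the same train and test sets, which helps when comparing results across runs.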
Performance Optimization
Generation Efficiency
Optimize generation speed and resource usage:
performance:
  # Parallel processing
  max_workers: 8              # Concurrent threads
  batch_size: 20              # Items per batch
  # Caching
  enable_llm_cache: true      # Cache LLM responses
  cache_expiry_hours: 24      # Cache duration
  # Resource management
  memory_limit_mb: 4096       # Memory limit per worker
  timeout_seconds: 120        # Generation timeout
  # Quality vs speed trade-offs
  validation_level: "basic"   # basic, standard, comprehensive
  quality_threshold: 0.75     # Minimum quality score
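The enable_llm_cache and cache_expiry_hours settings suggest a cache keyed by prompt with time-based expiry. The following is a minimal in-memory sketch of that idea; ResponseCache and cached_call are hypothetical names, and the framework's real cache (its storage, key format, and eviction policy) may differ.

```python
import hashlib
import time
from typing import Callable, Dict, Optional, Tuple


class ResponseCache:
    """In-memory prompt-to-response cache with time-based expiry."""

    def __init__(self, expiry_hours: float = 24.0, clock: Callable[[], float] = time.time):
        self._expiry_seconds = expiry_hours * 3600
        self._clock = clock
        self._entries: Dict[str, Tuple[float, str]] = {}

    @staticmethod
    def _key(prompt: str) -> str:
        # Hash the prompt so keys have a fixed size regardless of prompt length.
        return hashlib.sha256(prompt.encode("utf-8")).hexdigest()

    def get(self, prompt: str) -> Optional[str]:
        key = self._key(prompt)
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, response = entry
        if self._clock() - stored_at > self._expiry_seconds:
            del self._entries[key]  # Entry is stale; drop it.
            return None
        return response

    def put(self, prompt: str, response: str) -> None:
        self._entries[self._key(prompt)] = (self._clock(), response)


def cached_call(cache: ResponseCache, prompt: str, call_llm: Callable[[str], str]) -> str:
    """Return a cached response if one is fresh, otherwise call the model and store it."""
    cached = cache.get(prompt)
    if cached is not None:
        return cached
    response = call_llm(prompt)
    cache.put(prompt, response)
    return response
```

Passing the clock in as a parameter makes expiry easy to test without waiting; repeated identical prompts within the expiry window then cost no additional tokens.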
Token Usage Optimization
Manage LLM API costs effectively:
# Monitor token usage
usage_tracker = pipeline.get_token_usage()
print(f"Clue generation: {usage_tracker['clue_tokens']:,} tokens")
print(f"Scene generation: {usage_tracker['scene_tokens']:,} tokens")
print(f"Task generation: {usage_tracker['task_tokens']:,} tokens")
print(f"Estimated cost: ${usage_tracker['estimated_cost']:.2f}")
# Optimize prompts for efficiency
pipeline.enable_prompt_optimization(
    target_reduction=0.2,  # 20% token reduction target
    preserve_quality=True  # Maintain generation quality
)
Best Practices
Generation Strategy
Planning Generation:
Start with small test batches to validate quality
Use diverse seed concepts for variety
Balance task difficulty distribution
Plan for dataset size requirements early
Quality Management:
Set appropriate quality thresholds
Review samples manually before large-scale generation
Use validation metrics to filter results
Maintain generation logs for reproducibility
Resource Management:
Monitor token usage and costs
Use appropriate parallelism for available hardware
Implement checkpointing for long generations
Plan storage requirements for large datasets
Troubleshooting
Common Issues:
Low Quality Outputs: Adjust prompts or tune the model temperature (lower it for more consistent outputs, raise it if outputs lack variety)
Generation Failures: Check LLM connectivity and increase timeouts
Memory Issues: Reduce batch size or worker count
Slow Processing: Increase parallelism or optimize prompts
Debugging Tools:
# Enable debug logging
pipeline.enable_debug_logging()
# Inspect failed generations
failures = pipeline.get_failed_items()
for failure in failures:
    print(f"Item: {failure['item']}")
    print(f"Error: {failure['error']}")
    print(f"Stage: {failure['stage']}")
# Validate configuration
config_check = pipeline.validate_configuration()
if not config_check.is_valid:
    print(f"Config errors: {config_check.errors}")
API Reference
For complete API documentation, see:
data_generation.pipeline.Pipeline
data_generation.generators.clue_generator.ClueGenerator
data_generation.utils.task_validator.TaskValidator