DREAMv1 Examples

Learn how to build various types of multi-agent systems with DREAMv1 through practical examples. From simple agent communication to complex hierarchical systems, these examples demonstrate DREAMv1's capabilities.

Basic Examples

Get started with these simple examples showcasing core functionality.

Simple Two-Agent System

Create a basic system with two agents that communicate with each other.

import dreamv1 as dv1

# Bring up the control program that creates both agents
mcp = dv1.MasterControlProgram()

# Describe the two agents first, then ask the MCP to create them
sender_spec = {"name": "Sender", "role": "Sends messages"}
receiver_spec = {"name": "Receiver", "role": "Receives and processes messages"}

sender = mcp.create_agent("basic", sender_spec)
receiver = mcp.create_agent("basic", receiver_spec)

# Define a message handler for the receiver
def handle_message(message):
    """Print the content of an incoming message and return an acknowledgement.

    Args:
        message: Object exposing the received payload as ``message.content``.

    Returns:
        The fixed acknowledgement string "Message acknowledged".
    """
    body = message.content
    print(f"Received: {body}")
    acknowledgement = "Message acknowledged"
    return acknowledgement

import time

# Register the handler with the receiver
receiver.set_message_handler(handle_message)

# Start the system
mcp.start()

# Everything after start() runs under try/finally so the MCP is always
# stopped, even if sending the message raises.
try:
    # Send a message
    sender.send_message(
        receiver.id,
        "Hello from sender agent!",
        message_type="greeting"
    )

    # Give the receiver a moment to run its handler before shutting down
    # (presumably delivery is asynchronous — TODO confirm)
    time.sleep(1)
finally:
    # Stop the system
    mcp.stop()

Reactive Agent

Create an agent that reacts to specific events with predefined actions.

import dreamv1 as dv1

# Bring up the control program for this example
mcp = dv1.MasterControlProgram()

# Spec for the single agent of type "reactive" used below
weather_monitor_spec = {
    "name": "WeatherMonitor",
    "role": "Monitors weather conditions and reacts to changes"
}
reactive_agent = mcp.create_agent("reactive", weather_monitor_spec)

# One small handler per event; each receives the event's data dict and prints
# a line describing it.
def _alert_high_temperature(data):
    print(f"Alert: High temperature {data['value']}°C")


def _alert_low_temperature(data):
    print(f"Alert: Low temperature {data['value']}°C")


def _report_rain(data):
    print(f"Weather update: Rain detected, intensity: {data['intensity']}")


# Event name -> handler to invoke for that event
reactions = {
    "temperature_high": _alert_high_temperature,
    "temperature_low": _alert_low_temperature,
    "rain_detected": _report_rain
}

# Register reactions
for event, reaction in reactions.items():
    reactive_agent.register_reaction(event, reaction)

# Start the system
mcp.start()

# try/finally guarantees the MCP is stopped even if a reaction raises
try:
    # Simulate some weather events
    reactive_agent.trigger_event("temperature_high", {"value": 35})
    reactive_agent.trigger_event("rain_detected", {"intensity": "moderate"})
finally:
    # Stop the system
    mcp.stop()

Intermediate Examples

Build more complex systems with specialized agent types and hierarchical structures.

Executive-Worker Pattern

Create a simple hierarchy with an executive agent coordinating worker agents.

import dreamv1 as dv1

# Bring up the control program for this example
mcp = dv1.MasterControlProgram()

# The executive is the agent that later hands tasks to the workers
executive_spec = {
    "name": "TaskManager",
    "role": "Coordinates and delegates tasks"
}
executive = mcp.create_agent("executive", executive_spec)

# Build three "specialist" workers, each with its own "process_task" capability
workers = []
for worker_number in range(3):
    worker_spec = {
        "name": f"Worker-{worker_number}",
        "role": "Processes assigned tasks"
    }
    worker = mcp.create_agent("specialist", worker_spec)

    # worker_id is bound as a default argument so each function keeps the
    # number of the worker it was created for.
    def process_task(task_data, worker_id=worker_number):
        print(f"Worker {worker_id} processing task: {task_data['description']}")
        outcome = {"status": "completed", "result": f"Task {task_data['id']} result"}
        return outcome

    # Attach the capability and keep the worker for later task assignment
    worker.register_capability("process_task", process_task)
    workers.append(worker)

# Start the system
mcp.start()

# Define task assignment strategy for the executive
def assign_tasks(tasks):
    """Distribute tasks over the module-level ``workers`` list in round-robin order.

    For each task, prints which worker it goes to and has ``executive`` send
    the task to that worker as a "task_assignment" message.

    Args:
        tasks: Iterable of task dicts; each must provide an "id" key.
    """
    for position, job in enumerate(tasks):
        # Wrap around once every worker has been given a task
        slot = position % len(workers)
        print(f"Executive: Assigning task {job['id']} to Worker-{slot}")
        recipient = workers[slot]
        executive.send_message(recipient.id, job, message_type="task_assignment")

import time

# Create some tasks
tasks = [
    {"id": "task-1", "description": "Process data file", "priority": "high"},
    {"id": "task-2", "description": "Generate report", "priority": "medium"},
    {"id": "task-3", "description": "Update database", "priority": "low"}
]

# The MCP is already running at this point, so everything that can raise is
# wrapped in try/finally to guarantee it is stopped again.
try:
    # Register the strategy
    executive.set_task_assignment_strategy(assign_tasks)

    # Assign tasks
    executive.assign_tasks(tasks)

    # Wait for processing to complete
    time.sleep(2)
finally:
    # Stop the system
    mcp.stop()

Memory-Based Agent

Create agents that store and retrieve information from different memory systems.

import dreamv1 as dv1

# Memory configuration: a working memory with "size" 100 and a long-term
# memory persisted to a file
memory_settings = {
    "working_memory": {"size": 100},
    "long_term_memory": {"persistence": "file", "path": "./agent_memory.db"}
}

# Initialize the MCP with the memory systems enabled
mcp = dv1.MasterControlProgram({"memory": memory_settings})

# Create an agent with memory access
memory_agent_spec = {
    "name": "MemoryAgent",
    "role": "Processes and remembers information"
}
memory_agent = mcp.create_agent("memory_enabled", memory_agent_spec)

# Start the system
mcp.start()

# All memory traffic happens under try/finally so the MCP is stopped even if
# a store or retrieve call raises.
try:
    # Store information in different memory types
    memory_agent.memory.store(
        "working_memory",
        "current_task",
        {"id": "task-1", "status": "in_progress"}
    )

    # NOTE(review): "abc123" is a dummy value. long_term_memory is configured
    # with file persistence, so confirm how that store protects its contents
    # before keeping real credentials in it.
    memory_agent.memory.store(
        "long_term_memory",
        "important_config",
        {"api_key": "abc123", "endpoint": "https://api.example.com"},
        metadata={"category": "configuration", "importance": "high"}
    )

    # Retrieve from working memory
    current_task = memory_agent.memory.retrieve("working_memory", "current_task")
    print(f"Current task: {current_task}")

    # Retrieve from long-term memory
    config = memory_agent.memory.retrieve("long_term_memory", "important_config")
    print(f"Retrieved configuration: {config}")

    # Query memory by metadata instead of by key
    configurations = memory_agent.memory.retrieve(
        "long_term_memory",
        query={"metadata.category": "configuration"}
    )
    print(f"All configurations: {configurations}")
finally:
    # Stop the system
    mcp.stop()

Advanced Examples

Sophisticated multi-agent systems with complex behaviors and interactions.

Self-Organizing Team

Create a team of agents that dynamically organize based on workload and capabilities.

import dreamv1 as dv1
import random

# Capability names every team member will be given
capabilities = ["data_processing", "report_generation", "client_communication", "forecasting"]

# Starts empty; populated later in this example
specialty_matrix = {}

# Initialize the MCP
mcp = dv1.MasterControlProgram()

# Create a coordinator agent
coordinator_spec = {
    "name": "TeamCoordinator",
    "role": "Monitors team performance and reassigns roles as needed"
}
coordinator = mcp.create_agent("executive", coordinator_spec)

# Create a team of agents with different specialties
team = []
for i in range(5):
    agent = mcp.create_agent("adaptive", {
        "name": f"TeamMember-{i}",
        "role": "Flexible team member with adjustable specialties"
    })

    # Assign random efficiency levels for each capability
    agent_specialties = {}
    for capability in capabilities:
        efficiency = random.uniform(0.5, 1.0)
        agent_specialties[capability] = efficiency

        # Everything the function needs from the loops is bound as a default
        # argument. A plain closure would look up `agent` when the capability
        # is executed (after the loop has finished), so every capability would
        # report the name of the last agent created.
        def execute_capability(data, cap=capability, eff=efficiency, agent_name=agent.name):
            # Higher efficiency shortens the task's base time
            time_required = data["base_time"] / eff
            print(f"Agent {agent_name} executing {cap} with {eff:.2f} efficiency. Time: {time_required:.2f}s")
            return {"success": True, "time_taken": time_required}

        agent.register_capability(capability, execute_capability)

    # Record this agent's efficiencies: agent id -> {capability: efficiency}
    specialty_matrix[agent.id] = agent_specialties
    team.append(agent)

# Configure the coordinator with team information
coordinator.set_team_data(team, specialty_matrix)

# Start the system
mcp.start()

# Task queue: one row per task as (id, required capability, base time, priority)
tasks = [
    {"id": task_id, "capability": required, "base_time": base_time, "priority": priority}
    for task_id, required, base_time, priority in (
        ("task-1", "data_processing", 10, "high"),
        ("task-2", "report_generation", 15, "medium"),
        ("task-3", "client_communication", 5, "high"),
        ("task-4", "forecasting", 20, "low"),
        ("task-5", "data_processing", 12, "medium"),
    )
]

# Function for optimal task assignment
def optimal_assignment(task):
    """Pick the agent with the highest efficiency for the task's capability.

    Scans the module-level ``specialty_matrix`` (agent id -> {capability:
    efficiency}). Only efficiencies strictly greater than 0 qualify, and on a
    tie the agent encountered first wins.

    Args:
        task: Task dict; its "capability" key selects the efficiency to compare.

    Returns:
        The id of the best agent, or None when no agent qualifies.
    """
    needed = task["capability"]
    winner, top_score = None, 0

    for candidate_id, skills in specialty_matrix.items():
        score = skills[needed]
        if score > top_score:
            winner, top_score = candidate_id, score

    return winner

import time

# The MCP is already running at this point, so everything that can raise is
# wrapped in try/finally to guarantee it is stopped again.
try:
    # Register assignment strategy
    # NOTE(review): the Executive-Worker example registers its strategy with
    # set_task_assignment_strategy() and dispatches with assign_tasks(list);
    # confirm which method names the "executive" agent type really exposes.
    coordinator.set_assignment_strategy(optimal_assignment)

    # Process tasks
    for task in tasks:
        coordinator.assign_task(task)

    # Wait for processing to complete
    time.sleep(3)

    # Adapt team based on workload
    coordinator.analyze_performance()
    coordinator.rebalance_team()
finally:
    # Stop the system
    mcp.stop()

Design Patterns

Common patterns for building effective multi-agent systems.

Monitoring and Reporting Pattern

Set up a monitoring system where specialized agents report to a central dashboard.

import dreamv1 as dv1

# Monitoring configuration handed to the MCP
monitoring_settings = {
    "enabled": True,
    "metrics_interval": 5,  # seconds
    "persistence": "memory"
}

# Initialize the MCP with monitoring enabled
mcp = dv1.MasterControlProgram({"monitoring": monitoring_settings})

# Create a dashboard agent
dashboard_spec = {
    "name": "SystemDashboard",
    "role": "Collects and visualizes system metrics"
}
dashboard = mcp.create_agent("monitoring", dashboard_spec)

# The metric samplers defined below call random.*, so the module has to be
# imported before any of them can be invoked.
import random

# Create monitoring agents for different subsystems
monitors = []
subsystems = ["database", "api", "frontend", "background_tasks"]

for subsystem in subsystems:
    monitor = mcp.create_agent("sensor", {
        "name": f"{subsystem.capitalize()}Monitor",
        "role": f"Monitors {subsystem} performance and health"
    })

    # Configure what this monitor tracks: each entry is a zero-argument
    # callable that produces one simulated reading
    metrics = {
        "health": lambda: random.choice(["healthy", "degraded", "failing"]),
        "response_time": lambda: random.uniform(10, 500),  # ms
        "error_rate": lambda: random.uniform(0, 0.05),  # 0-5%
        "throughput": lambda: random.randint(10, 1000)  # requests/minute
    }

    # Register metrics under subsystem-prefixed names, e.g. "database_health"
    for metric_name, metric_func in metrics.items():
        monitor.register_metric(f"{subsystem}_{metric_name}", metric_func)

    monitors.append(monitor)

# Configure dashboard to receive reports
def process_report(report):
    """Print a report's source followed by one status-marked line per metric.

    A metric named "*_health" is marked with a warning when its value is
    "degraded" and with a cross for any other value except "healthy". A
    metric named "*_error_rate" is marked with a warning above 0.01 and with
    a cross from 0.03 upwards. Everything else gets a check mark.

    Args:
        report: Dict with a "source" entry and a "metrics" dict mapping
            metric names to values.
    """
    print(f"Dashboard received report from {report['source']}:")

    for metric, value in report['metrics'].items():
        if metric.endswith("_health") and value != "healthy":
            status = "⚠️" if value == "degraded" else "✕"
        elif metric.endswith("_error_rate") and value > 0.01:
            status = "⚠️" if value < 0.03 else "✕"
        else:
            # Healthy, within limits, or a metric with no threshold
            status = "✓"
        print(f"  {status} {metric}: {value}")

# Imported before the system starts: the registered metric functions use
# `random`, and the run loop below uses `time`.
import random
import time

dashboard.set_report_handler(process_report)

# Start the system
mcp.start()

# try/finally guarantees the MCP is stopped even if scheduling or the final
# health report raises.
try:
    # Configure monitors to send regular reports
    for monitor in monitors:
        monitor.schedule_reporting(dashboard.id, interval=10)  # report every 10 seconds

    # Simulate system running for a period
    print("System monitoring active. Press Ctrl+C to stop...")
    try:
        time.sleep(30)  # Run for 30 seconds
    except KeyboardInterrupt:
        # Ctrl+C only ends the wait early; the summary below is still printed
        pass

    # Get overall system health report
    system_health = dashboard.generate_health_report()
    print("\nSystem Health Summary:")
    for subsystem, status in system_health.items():
        print(f"{subsystem}: {status['status']} - {status['description']}")
finally:
    # Stop the system
    mcp.stop()

Integrations

Examples of integrating DREAMv1 with external services and libraries.

LLM Integration

Create agents that leverage large language models for decision making.

import dreamv1 as dv1
from dreamv1.integrations import llm

import os

# Initialize the MCP with LLM integration.
# The key is read from the OPENAI_API_KEY environment variable so it does not
# have to be pasted into source code; the placeholder is only a fallback that
# keeps the example self-explanatory when the variable is not set.
mcp = dv1.MasterControlProgram({
    "integrations": {
        "llm": {
            "provider": "openai",
            "model": "gpt-4",
            "api_key": os.environ.get("OPENAI_API_KEY", "YOUR_API_KEY")
        }
    }
})

# Create an LLM-powered agent
assistant = mcp.create_agent("llm_powered", {
    "name": "ResearchAssistant",
    "role": "Answers research questions with LLM assistance",
    "system_prompt": """You are a helpful research assistant specialized in AI and 
    machine learning. Provide accurate, concise answers to questions in these fields."""
})

# Create a user agent to interact with the assistant
user = mcp.create_agent("basic", {
    "name": "User",
    "role": "Human user asking questions"
})

# Start the system
mcp.start()

# Function to process questions from the user
def process_user_question(question):
    """Send one question from ``user`` to ``assistant`` and print the exchange.

    The question is sent as a "question" message with
    ``wait_for_response=True``; the ``content`` of the returned message is
    printed as the assistant's answer.

    Args:
        question: Text of the question to ask.
    """
    print(f"\nUser asks: {question}")

    # send_message returns the reply because wait_for_response is set
    reply = user.send_message(
        assistant.id,
        question,
        message_type="question",
        wait_for_response=True,
    )

    answer_text = reply.content
    print(f"Assistant answers: {answer_text}")

# Ask some questions
questions = [
    "What is the difference between hierarchical and flat multi-agent systems?",
    "What are some applications of hierarchical multi-agent systems in robotics?",
    "How do hierarchical systems handle task delegation differently from non-hierarchical ones?"
]

# Each question waits on a response from the assistant; try/finally makes
# sure the MCP is stopped even if one of those calls raises.
try:
    for question in questions:
        process_user_question(question)
finally:
    # Stop the system
    mcp.stop()

Web API Integration

Create a web service that exposes a multi-agent system via a REST API.

import dreamv1 as dv1
from dreamv1.integrations import web
from fastapi import FastAPI
import uvicorn

# Bring up the control program behind the web API
mcp = dv1.MasterControlProgram()

# Agent that will expose the "process_data" capability
data_processor_spec = {
    "name": "DataProcessor",
    "role": "Processes data payloads"
}
data_processor = mcp.create_agent("specialist", data_processor_spec)

def process_data(data):
    """Summarise a payload dict.

    Args:
        data: Payload dict; an optional "items" entry holds the items to count.

    Returns:
        Dict with "processed" (always True), "input_size" (length of
        ``str(data)``) and "summary" (text stating the number of items).
    """
    payload_size = len(str(data))
    item_count = len(data.get('items', []))
    return {
        "processed": True,
        "input_size": payload_size,
        "summary": f"Processed {item_count} items"
    }

# Expose process_data on the data processor under the name "process_data"
data_processor.register_capability("process_data", process_data)

# Create a query agent
query_agent_spec = {
    "name": "QueryAgent",
    "role": "Handles data queries"
}
query_agent = mcp.create_agent("specialist", query_agent_spec)

def execute_query(query_params):
    """Return a canned query response that echoes the given parameters.

    Args:
        query_params: Query parameters; returned unchanged under "query".

    Returns:
        Dict with two fixed sample rows under "results", their number under
        "count", and ``query_params`` under "query".
    """
    sample_rows = [
        {"id": row_id, "name": f"Sample Result {row_id}"}
        for row_id in (1, 2)
    ]
    return {
        "results": sample_rows,
        "count": len(sample_rows),
        "query": query_params
    }

# Expose execute_query on the query agent under the name "execute_query"
query_agent.register_capability("execute_query", execute_query)

# Start the system. This is a module-level statement, so it runs whenever
# this module is imported, not only when it is executed as a script.
mcp.start()

# Create a FastAPI application
app = FastAPI(title="DREAMv1 API Example")

# Create an API gateway from the MCP and the FastAPI app; its post()/get()
# decorators are used below to declare the endpoints
api_gateway = web.APIGateway(mcp, app)

# Register endpoints
@api_gateway.post("/process")
async def process_endpoint(data: dict):
    """Process data through the data processor agent"""
    # Hand the request body to the data processor's "process_data" capability
    # and return whatever the gateway gives back.
    return await api_gateway.execute_capability(
        agent_id=data_processor.id,
        capability="process_data",
        data=data,
    )

@api_gateway.get("/query")
async def query_endpoint(q: str = "", limit: int = 10):
    """Execute a query through the query agent"""
    # Bundle the two query-string parameters into the payload for the agent
    query_payload = {"q": q, "limit": limit}
    return await api_gateway.execute_capability(
        agent_id=query_agent.id,
        capability="execute_query",
        data=query_payload,
    )

@api_gateway.get("/agents")
async def list_agents():
    """List all active agents in the system"""
    agents = mcp.list_agents()

    # Reduce each agent to the three fields exposed by the API
    summaries = []
    for entry in agents:
        summaries.append({"id": entry.id, "name": entry.name, "role": entry.role})

    return {"agents": summaries, "count": len(agents)}

# Run the web server (for actual deployment, use a production server)
if __name__ == "__main__":
    print("Starting DREAMv1 Web API...")
    print("API documentation available at http://127.0.0.1:8000/docs")
    try:
        # Blocks here for as long as the server is running
        uvicorn.run(app, host="127.0.0.1", port=8000)
    finally:
        # Stop the MCP that was started at import time once the server exits
        mcp.stop()

# When `app` is served by an external ASGI server instead of this __main__
# block, nothing here stops the MCP; in a real application, manage the MCP
# lifecycle separately (for example from the server's startup/shutdown hooks).

Next Steps

Ready to build your own multi-agent system? Review the architecture documentation for a deeper understanding of DREAMv1's components, or check the API reference for detailed information about each class and method.

If you build something interesting with DREAMv1, please consider contributing your example to help others learn!