Agent Protocol¶
The core abstraction for agents in DynaBots. An agent processes tasks and returns results.
Protocol Definition¶
dynabots_core.protocols.agent.Agent¶
Bases: Protocol
Protocol for agents that participate in DynaBots orchestration.
Required properties:
- name: Unique agent identifier
- capabilities: List of capability names this agent supports

Optional properties:
- domains: Domain keywords for intelligent routing (e.g., ["data", "etl"])
- description: Human-readable description

Required methods:
- process_task: Execute a task described in natural language
- health_check: Liveness check

Optional methods:
- execute_capability: Direct capability execution (legacy mode)
Source code in packages/core/dynabots_core/protocols/agent.py
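The required members listed above can be pictured as a small `typing.Protocol`. The sketch below is a reconstruction from that list only, not the actual source: `AgentSketch` and `EchoAgent` are hypothetical names, the optional members are left out, and the return type of `process_task` is loosened to `Any` so the block runs without `dynabots_core` installed.

```python
from typing import Any, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class AgentSketch(Protocol):
    """Hypothetical reconstruction of the required members only."""

    @property
    def name(self) -> str: ...

    @property
    def capabilities(self) -> List[str]: ...

    async def process_task(
        self, task_description: str, context: Dict[str, Any]
    ) -> Any: ...

    async def health_check(self) -> bool: ...


class EchoAgent:
    """Satisfies the sketch structurally; no inheritance is needed."""

    @property
    def name(self) -> str:
        return "EchoAgent"

    @property
    def capabilities(self) -> List[str]:
        return ["echo"]

    async def process_task(self, task_description, context):
        # Echo the request back; a real agent would return a TaskResult.
        return {"task_id": context.get("task_id"), "echo": task_description}

    async def health_check(self) -> bool:
        return True


print(isinstance(EchoAgent(), AgentSketch))
```

Because the protocol is structural, any class exposing these four members is accepted, whether or not it imports or subclasses anything from the library.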
capabilities (property)¶
List of capability names this agent supports.
name (property)¶
Unique agent identifier.
health_check() (async)¶
Check if this agent is healthy and ready to process tasks.
Returns:
| Type | Description |
|---|---|
| bool | True if healthy, False otherwise. |
Example
async def health_check(self) -> bool:
    try:
        await self.db.ping()
        return True
    except Exception:
        return False
process_task(task_description, context) (async)¶
Smart Mode: Execute a task described in natural language.
The agent uses its own LLM and tool schemas to decide how to accomplish the task. This is the preferred execution mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_description | str | Natural language description of what to do. | required |
| context | Dict[str, Any] | Execution context including: task_id (unique task identifier); workflow_id (parent workflow ID); parent_results (results from upstream tasks); user_context (user information: email, etc.). | required |
Returns:
| Type | Description |
|---|---|
| TaskResult | TaskResult with the execution outcome. |
Example
async def process_task(self, task_description, context):
    # Use your agent's LLM to understand the task
    plan = await self.llm.plan(task_description, self.tools)

    # Execute the plan
    result = await self.execute_plan(plan)

    return TaskResult.success(
        task_id=context["task_id"],
        data=result
    )
Simple Implementation¶
from dynabots_core import Agent, TaskResult
from typing import Any, Dict, List


class SimpleAgent:
    """A minimal agent implementation."""

    @property
    def name(self) -> str:
        return "SimpleAgent"

    @property
    def capabilities(self) -> List[str]:
        return ["greet", "answer"]

    async def process_task(
        self,
        task_description: str,
        context: Dict[str, Any],
    ) -> TaskResult:
        """Execute a task."""
        if "hello" in task_description.lower():
            return TaskResult.success(
                task_id=context.get("task_id", "unknown"),
                data={"message": "Hello!"}
            )
        return TaskResult.failure(
            task_id=context.get("task_id", "unknown"),
            error="Unknown task"
        )

    async def health_check(self) -> bool:
        return True
Advanced Implementation¶
With LLM-powered task understanding:
from typing import Any, Dict, List

from dynabots_core import Agent, TaskResult, LLMMessage
from dynabots_core.providers import OllamaProvider


class LLMAgent:
    """Agent that uses LLM for smart task understanding."""

    def __init__(self, llm=None):
        self.llm = llm or OllamaProvider(model="qwen2.5:7b")
        self._tools = [
            self._search_knowledge_base,
            self._compute_calculation,
            self._generate_summary,
        ]

    @property
    def name(self) -> str:
        return "LLMAgent"

    @property
    def capabilities(self) -> List[str]:
        return ["search", "compute", "summarize"]

    @property
    def domains(self) -> List[str]:
        return ["information", "analytics"]

    async def process_task(
        self,
        task_description: str,
        context: Dict[str, Any],
    ) -> TaskResult:
        """Use LLM to understand and execute task."""
        task_id = context.get("task_id", "unknown")

        # Build prompt with available tools (name and docstring of each)
        tools_info = "\n".join([
            f"- {tool.__name__}: {tool.__doc__}"
            for tool in self._tools
        ])
        prompt = f"""You are a helpful assistant.
Available tools:
{tools_info}
User request: {task_description}
Determine which tool(s) to use and execute them."""

        try:
            response = await self.llm.complete([
                LLMMessage(role="user", content=prompt)
            ])
            return TaskResult.success(
                task_id=task_id,
                data={"response": response.content}
            )
        except Exception as e:
            return TaskResult.failure(
                task_id=task_id,
                error=str(e)
            )

    async def health_check(self) -> bool:
        """Check if LLM is accessible."""
        try:
            response = await self.llm.complete([
                LLMMessage(role="user", content="ping")
            ])
            return bool(response.content)
        except Exception:
            return False

    async def _search_knowledge_base(self, query: str) -> str:
        """Search the knowledge base."""
        # Simulate search
        return f"Results for '{query}'"

    async def _compute_calculation(self, expr: str) -> str:
        """Compute a mathematical expression."""
        # WARNING: eval() executes arbitrary code; never pass it untrusted
        # or LLM-generated input outside of a demo.
        try:
            return str(eval(expr))
        except Exception as e:
            return f"Error: {e}"

    async def _generate_summary(self, text: str) -> str:
        """Generate a summary of text."""
        words = text.split()[:10]
        return " ".join(words) + "..."
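The `_compute_calculation` tool above calls `eval()`, which runs arbitrary Python and is risky when the expression comes from a user or an LLM. One possible replacement, sketched here under the assumption that only basic arithmetic is needed, walks the parsed expression tree and allows a fixed set of operators. `safe_eval` is a hypothetical helper, not part of DynaBots.

```python
import ast
import operator
from typing import Union

Number = Union[int, float]

# Whitelisted operators; anything else is rejected.
_BINARY = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY = {ast.USub: operator.neg, ast.UAdd: operator.pos}


def safe_eval(expr: str) -> Number:
    """Evaluate +, -, *, / on numeric literals; raise ValueError otherwise."""

    def walk(node: ast.AST) -> Number:
        if (
            isinstance(node, ast.Constant)
            and isinstance(node.value, (int, float))
            and not isinstance(node.value, bool)
        ):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY:
            return _BINARY[type(node.op)](walk(node.left), walk(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY:
            return _UNARY[type(node.op)](walk(node.operand))
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    # mode="eval" parses a single expression; .body is its root node.
    return walk(ast.parse(expr, mode="eval").body)


print(safe_eval("2 + 3 * 4"))
```

Function calls, attribute access, and names all fall through to the `ValueError` branch, so an input such as `__import__('os')` is rejected instead of executed.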
Domains and Capabilities¶
Declare capabilities and domains so that orchestration frameworks can route tasks intelligently.
Capabilities¶
List specific actions your agent can perform:
@property
def capabilities(self) -> List[str]:
    return [
        "fetch_data",
        "query_database",
        "transform_data",
        "export_csv",
    ]
Used by orchestration frameworks to route tasks to capable agents.
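How a framework matches tasks to capabilities is not specified here. As a rough illustration only, capability-based routing can be thought of as a membership filter over registered agents. `StubAgent` and `agents_with_capability` are hypothetical names used for this sketch.

```python
from typing import Iterable, List


class StubAgent:
    """Hypothetical stand-in exposing only name and capabilities."""

    def __init__(self, name: str, capabilities: List[str]):
        self.name = name
        self.capabilities = capabilities


def agents_with_capability(
    agents: Iterable[StubAgent], capability: str
) -> List[StubAgent]:
    """Return the agents that declare the requested capability."""
    return [agent for agent in agents if capability in agent.capabilities]


agents = [
    StubAgent("DataAgent", ["fetch_data", "query_database"]),
    StubAgent("ExportAgent", ["transform_data", "export_csv"]),
]

matches = agents_with_capability(agents, "export_csv")
print([agent.name for agent in matches])
```

The filter relies on exact string matches, which is one reason to keep capability names specific and consistent across agents.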
Domains¶
List topic areas your agent covers:
@property
def domains(self) -> List[str]:
    return [
        "data",       # General data domain
        "analytics",  # Analytics focus
        "etl",        # Extract-transform-load
    ]
Used for smart routing and competition (agents compete within domains).
Overlapping Domains¶
Multiple agents can cover the same domain. Orchestration frameworks (like ORC) use judges to determine which agent wins.
# DataAgent
domains = ["data", "analytics"]
# AnalyticsAgent (also handles analytics!)
domains = ["analytics", "reporting"]
# Both cover "analytics" → they compete
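To see which domains are contested before a judge gets involved, the declared domains can be inverted into a domain-to-agents map. This is a sketch under assumptions: `contested_domains` is a hypothetical helper, and the judging step itself belongs to the orchestration framework and is not shown.

```python
from collections import defaultdict
from typing import Dict, List

# Declared domains, taken from the two example agents.
agent_domains: Dict[str, List[str]] = {
    "DataAgent": ["data", "analytics"],
    "AnalyticsAgent": ["analytics", "reporting"],
}


def contested_domains(declared: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Map each domain claimed by more than one agent to its claimants."""
    by_domain: Dict[str, List[str]] = defaultdict(list)
    for agent_name, domains in declared.items():
        for domain in domains:
            by_domain[domain].append(agent_name)
    return {d: names for d, names in by_domain.items() if len(names) > 1}


print(contested_domains(agent_domains))
```

Only "analytics" appears in the result for this input, since "data" and "reporting" each have a single claimant.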
Task Context¶
The context dict passed to process_task() contains execution metadata:
async def process_task(self, task_description: str, context: Dict[str, Any]) -> TaskResult:
    task_id = context["task_id"]
    workflow_id = context.get("workflow_id")
    parent_results = context.get("parent_results", [])
    user_context = context.get("user_context")

    # Use context in your task execution
    ...
Standard fields:
- task_id: Unique task identifier
- workflow_id: Parent workflow (optional)
- parent_results: Results from upstream tasks (optional)
- user_context: User metadata (email, etc.) (optional)
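When exercising an agent in a unit test, it can help to build a context dict with the same shape by hand. The helper below is a sketch: `build_context` is a hypothetical name, the UUID-based `task_id` format is an assumption, and optional fields are simply omitted when not supplied.

```python
import uuid
from typing import Any, Dict, List, Optional


def build_context(
    workflow_id: Optional[str] = None,
    parent_results: Optional[List[Any]] = None,
    user_context: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a context dict containing task_id plus any optional fields."""
    # Assumption: any unique string works as a task_id.
    context: Dict[str, Any] = {"task_id": str(uuid.uuid4())}
    if workflow_id is not None:
        context["workflow_id"] = workflow_id
    if parent_results is not None:
        context["parent_results"] = parent_results
    if user_context is not None:
        context["user_context"] = user_context
    return context


ctx = build_context(workflow_id="wf-1", user_context={"email": "a@example.com"})
print(sorted(ctx))
```

Leaving optional keys out, instead of setting them to `None`, matches the `context.get(...)` access pattern and keeps defaults such as the empty list for `parent_results` meaningful.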
Health Checks¶
The health_check() method enables liveness detection.
async def health_check(self) -> bool:
    """Is this agent ready?"""
    try:
        # Check database connection
        await self.db.ping()
        # Check LLM connection
        await self.llm.complete([...])
        return True
    except Exception:
        return False
Orchestration frameworks periodically call health_check() to detect faulty agents.
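A health check that hangs on a slow dependency is nearly as unhelpful as one that fails. One way a caller might guard against this, sketched below with hypothetical names (`check_with_timeout`, `HealthyAgent`, `SlowAgent`), is to wrap the call in `asyncio.wait_for` and treat a timeout as unhealthy. How DynaBots orchestrators actually schedule these calls is not shown here.

```python
import asyncio


class HealthyAgent:
    """Hypothetical agent whose dependencies respond immediately."""

    async def health_check(self) -> bool:
        return True


class SlowAgent:
    """Hypothetical agent whose health check waits on a slow dependency."""

    async def health_check(self) -> bool:
        await asyncio.sleep(1.0)
        return True


async def check_with_timeout(agent, timeout: float = 2.0) -> bool:
    """Treat a slow or raising health check as unhealthy."""
    try:
        return bool(await asyncio.wait_for(agent.health_check(), timeout=timeout))
    except Exception:
        # Covers the timeout as well as errors raised by the check itself.
        return False


async def main() -> None:
    print(await check_with_timeout(HealthyAgent()))
    print(await check_with_timeout(SlowAgent(), timeout=0.05))


if __name__ == "__main__":
    asyncio.run(main())
```

The same wrapper can be used inside an agent's own `health_check()` around each dependency probe so that the method returns quickly.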
Type Checking¶
DynaBots protocols are runtime_checkable. Use isinstance() to validate:
from dynabots_core import Agent


def use_agent(obj):
    if not isinstance(obj, Agent):
        raise TypeError(f"{obj} does not implement Agent protocol")
    # Safe to use as Agent
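One caveat with `runtime_checkable` protocols in general: `isinstance()` only verifies that the members exist, not that their signatures match or that the methods are coroutines. The sketch below uses a stand-in protocol, `AgentLike`, in place of the real `Agent` (an assumption so the block runs on its own) and adds a stricter hypothetical check, `is_async_agent`.

```python
import inspect
from typing import Protocol, runtime_checkable


@runtime_checkable
class AgentLike(Protocol):
    """Stand-in for the real Agent protocol, reduced to its two methods."""

    async def process_task(self, task_description, context): ...

    async def health_check(self) -> bool: ...


class SyncImposter:
    """Right method names, but synchronous and with the wrong signature."""

    def process_task(self):
        return None

    def health_check(self):
        return True


class AsyncAgent:
    async def process_task(self, task_description, context):
        return None

    async def health_check(self) -> bool:
        return True


def is_async_agent(obj) -> bool:
    """Require the protocol members to exist and be coroutine functions."""
    return isinstance(obj, AgentLike) and all(
        inspect.iscoroutinefunction(getattr(obj, method))
        for method in ("process_task", "health_check")
    )


print(isinstance(SyncImposter(), AgentLike), is_async_agent(SyncImposter()))
```

A static type checker catches the signature mismatch as well; the runtime check is mainly useful at plugin or registration boundaries where objects arrive untyped.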
Execution Modes¶
Smart Mode (Recommended)¶
Your agent uses its own LLM to understand tasks:
async def process_task(self, task_description: str, context: dict) -> TaskResult:
    # Parse task with LLM
    plan = await self.llm.understand(task_description)

    # Execute plan
    result = await self.execute_plan(plan)

    return TaskResult.success(task_id=context["task_id"], data=result)
Pros:
- Flexible task understanding
- Use any tools
- Better for complex tasks
Legacy Mode (Optional)¶
Implement execute_capability() for direct routing:
from typing import Any, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class LegacyAgent(Protocol):
    async def execute_capability(
        self,
        capability: str,
        parameters: Dict[str, Any],
        context: Dict[str, Any],
    ) -> "TaskResult":
        """Execute a specific capability directly."""
        ...
Pros:
- Predictable, direct routing
- Good for well-defined operations
- No LLM overhead
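A concrete `execute_capability()` is often just a lookup table from capability name to handler. The following is a minimal sketch, not the library's implementation: `CsvAgent` is hypothetical, and `StubResult` is a stand-in for `TaskResult` whose real fields may differ.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class StubResult:
    """Stand-in for TaskResult (assumption); the real class may differ."""

    task_id: str
    ok: bool
    data: Any = None
    error: Optional[str] = None


class CsvAgent:
    """Hypothetical agent that dispatches capabilities without an LLM."""

    def __init__(self) -> None:
        # Capability name -> coroutine handler.
        self._handlers = {"export_csv": self._export_csv}

    async def execute_capability(
        self,
        capability: str,
        parameters: Dict[str, Any],
        context: Dict[str, Any],
    ) -> StubResult:
        task_id = context.get("task_id", "unknown")
        handler = self._handlers.get(capability)
        if handler is None:
            return StubResult(task_id, ok=False, error=f"Unsupported capability: {capability}")
        try:
            return StubResult(task_id, ok=True, data=await handler(**parameters))
        except Exception as exc:
            return StubResult(task_id, ok=False, error=str(exc))

    async def _export_csv(self, rows) -> str:
        # Join cells with commas and rows with newlines (no quoting).
        return "\n".join(",".join(str(cell) for cell in row) for row in rows)


result = asyncio.run(
    CsvAgent().execute_capability("export_csv", {"rows": [[1, 2], [3, 4]]}, {"task_id": "t1"})
)
print(result.ok, repr(result.data))
```

Unknown capabilities and handler errors are both reported through the result object instead of raised, which keeps the calling side uniform.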
Best Practices¶
- Stateless agents: Agents should be stateless. Store data in dedicated services.
- Health checks: Always implement health_check(). Return fast.
- Domains: Be specific with domains. Overlapping domains enable competition.
- Error handling: Return TaskResult.failure() with clear error messages.
- Metadata: Use TaskResult.metadata to attach domain-specific data.