Skip to content

Agent Protocol

The core abstraction for agents in DynaBots. An agent processes tasks and returns results.


Protocol Definition

dynabots_core.protocols.agent.Agent

Bases: Protocol

Protocol for agents that participate in DynaBots orchestration.

Required properties: - name: Unique agent identifier - capabilities: List of capability names this agent supports

Optional properties: - domains: Domain keywords for intelligent routing (e.g., ["data", "etl"]) - description: Human-readable description

Required methods: - process_task: Execute a task described in natural language - health_check: Liveness check

Optional methods: - execute_capability: Direct capability execution (legacy mode)

Source code in packages/core/dynabots_core/protocols/agent.py
@runtime_checkable
class Agent(Protocol):
    """
    Protocol for agents that participate in DynaBots orchestration.

    Required properties:
    - name: Unique agent identifier
    - capabilities: List of capability names this agent supports

    Optional properties:
    - domains: Domain keywords for intelligent routing (e.g., ["data", "etl"])
    - description: Human-readable description

    Required methods:
    - process_task: Execute a task described in natural language
    - health_check: Liveness check

    Optional methods:
    - execute_capability: Direct capability execution (legacy mode)
    """

    @property
    def name(self) -> str:
        """Unique agent identifier."""
        ...

    @property
    def capabilities(self) -> List[str]:
        """List of capability names this agent supports."""
        ...

    async def process_task(
        self,
        task_description: str,
        context: Dict[str, Any],
    ) -> "TaskResult":
        """
        Smart Mode: Execute a task described in natural language.

        The agent uses its own LLM and tool schemas to decide how
        to accomplish the task. This is the preferred execution mode.

        Args:
            task_description: Natural language description of what to do.
            context: Execution context including:
                - task_id: Unique task identifier
                - workflow_id: Parent workflow ID
                - parent_results: Results from upstream tasks
                - user_context: User information (email, etc.)

        Returns:
            TaskResult with the execution outcome.

        Example:
            async def process_task(self, task_description, context):
                # Use your agent's LLM to understand the task
                plan = await self.llm.plan(task_description, self.tools)

                # Execute the plan
                result = await self.execute_plan(plan)

                return TaskResult.success(
                    task_id=context["task_id"],
                    data=result
                )
        """
        ...

    async def health_check(self) -> bool:
        """
        Check if this agent is healthy and ready to process tasks.

        Returns:
            True if healthy, False otherwise.

        Example:
            async def health_check(self) -> bool:
                try:
                    await self.db.ping()
                    return True
                except Exception:
                    return False
        """
        ...

capabilities property

List of capability names this agent supports.

name property

Unique agent identifier.

health_check() async

Check if this agent is healthy and ready to process tasks.

Returns:

Type Description
bool

True if healthy, False otherwise.

Example

async def health_check(self) -> bool: try: await self.db.ping() return True except Exception: return False

Source code in packages/core/dynabots_core/protocols/agent.py
async def health_check(self) -> bool:
    """
    Check if this agent is healthy and ready to process tasks.

    Returns:
        True if healthy, False otherwise.

    Example:
        async def health_check(self) -> bool:
            try:
                await self.db.ping()
                return True
            except Exception:
                return False
    """
    ...

process_task(task_description, context) async

Smart Mode: Execute a task described in natural language.

The agent uses its own LLM and tool schemas to decide how to accomplish the task. This is the preferred execution mode.

Parameters:

Name Type Description Default
task_description str

Natural language description of what to do.

required
context Dict[str, Any]

Execution context including: - task_id: Unique task identifier - workflow_id: Parent workflow ID - parent_results: Results from upstream tasks - user_context: User information (email, etc.)

required

Returns:

Type Description
TaskResult

TaskResult with the execution outcome.

Example

async def process_task(self, task_description, context): # Use your agent's LLM to understand the task plan = await self.llm.plan(task_description, self.tools)

# Execute the plan
result = await self.execute_plan(plan)

return TaskResult.success(
    task_id=context["task_id"],
    data=result
)
Source code in packages/core/dynabots_core/protocols/agent.py
async def process_task(
    self,
    task_description: str,
    context: Dict[str, Any],
) -> "TaskResult":
    """
    Smart Mode: Execute a task described in natural language.

    The agent uses its own LLM and tool schemas to decide how
    to accomplish the task. This is the preferred execution mode.

    Args:
        task_description: Natural language description of what to do.
        context: Execution context including:
            - task_id: Unique task identifier
            - workflow_id: Parent workflow ID
            - parent_results: Results from upstream tasks
            - user_context: User information (email, etc.)

    Returns:
        TaskResult with the execution outcome.

    Example:
        async def process_task(self, task_description, context):
            # Use your agent's LLM to understand the task
            plan = await self.llm.plan(task_description, self.tools)

            # Execute the plan
            result = await self.execute_plan(plan)

            return TaskResult.success(
                task_id=context["task_id"],
                data=result
            )
    """
    ...

Simple Implementation

from dynabots_core import Agent, TaskResult
from typing import Any, Dict, List

class SimpleAgent:
    """A minimal agent implementation."""

    @property
    def name(self) -> str:
        return "SimpleAgent"

    @property
    def capabilities(self) -> List[str]:
        return ["greet", "answer"]

    async def process_task(
        self,
        task_description: str,
        context: Dict[str, Any],
    ) -> TaskResult:
        """Execute a task."""
        if "hello" in task_description.lower():
            return TaskResult.success(
                task_id=context.get("task_id", "unknown"),
                data={"message": "Hello!"}
            )
        return TaskResult.failure(
            task_id=context.get("task_id", "unknown"),
            error="Unknown task"
        )

    async def health_check(self) -> bool:
        return True

Advanced Implementation

With LLM-powered task understanding:

import asyncio
from typing import Any, Dict, List
from dynabots_core import Agent, TaskResult, LLMMessage
from dynabots_core.providers import OllamaProvider

class LLMAgent:
    """Agent that uses LLM for smart task understanding."""

    def __init__(self, llm=None):
        self.llm = llm or OllamaProvider(model="qwen2.5:7b")
        self._tools = [
            self._search_knowledge_base,
            self._compute_calculation,
            self._generate_summary,
        ]

    @property
    def name(self) -> str:
        return "LLMAgent"

    @property
    def capabilities(self) -> List[str]:
        return ["search", "compute", "summarize"]

    @property
    def domains(self) -> List[str]:
        return ["information", "analytics"]

    async def process_task(
        self,
        task_description: str,
        context: Dict[str, Any],
    ) -> TaskResult:
        """Use LLM to understand and execute task."""
        task_id = context.get("task_id", "unknown")

        # Build prompt with available tools
        tools_info = "\n".join([
            f"- {tool.__name__}: {tool.__doc__}"
            for tool in self._tools
        ])

        prompt = f"""You are a helpful assistant.

Available tools:
{tools_info}

User request: {task_description}

Determine which tool(s) to use and execute them."""

        try:
            response = await self.llm.complete([
                LLMMessage(role="user", content=prompt)
            ])

            return TaskResult.success(
                task_id=task_id,
                data={"response": response.content}
            )
        except Exception as e:
            return TaskResult.failure(
                task_id=task_id,
                error=str(e)
            )

    async def health_check(self) -> bool:
        """Check if LLM is accessible."""
        try:
            response = await self.llm.complete([
                LLMMessage(role="user", content="ping")
            ])
            return bool(response.content)
        except Exception:
            return False

    async def _search_knowledge_base(self, query: str) -> str:
        """Search the knowledge base."""
        # Simulate search
        return f"Results for '{query}'"

    async def _compute_calculation(self, expr: str) -> str:
        """Compute a mathematical expression."""
        try:
            return str(eval(expr))
        except Exception as e:
            return f"Error: {e}"

    async def _generate_summary(self, text: str) -> str:
        """Generate a summary of text."""
        words = text.split()[:10]
        return " ".join(words) + "..."

Domains and Capabilities

Use these to enable intelligent routing.

Capabilities

List specific actions your agent can perform:

@property
def capabilities(self) -> List[str]:
    return [
        "fetch_data",
        "query_database",
        "transform_data",
        "export_csv",
    ]

Used by orchestration frameworks to route tasks to capable agents.

Domains

List topic areas your agent covers:

@property
def domains(self) -> List[str]:
    return [
        "data",        # General data domain
        "analytics",   # Analytics focus
        "etl",         # Extract-transform-load
    ]

Used for smart routing and competition (agents compete within domains).

Overlapping Domains

Multiple agents can cover the same domain. Orchestration frameworks (like ORC) use judges to determine which agent wins.

# DataAgent
domains = ["data", "analytics"]

# AnalyticsAgent (also handles analytics!)
domains = ["analytics", "reporting"]

# Both cover "analytics" → they compete

Task Context

The context dict passed to process_task() contains execution metadata:

async def process_task(self, task: str, context: Dict[str, Any]) -> TaskResult:
    task_id = context["task_id"]
    workflow_id = context.get("workflow_id")
    parent_results = context.get("parent_results", [])
    user_context = context.get("user_context")

    # Use context in your task execution
    ...

Standard fields:

  • task_id: Unique task identifier
  • workflow_id: Parent workflow (optional)
  • parent_results: Results from upstream tasks (optional)
  • user_context: User metadata (email, etc.) (optional)

Health Checks

The health_check() method enables liveness detection.

async def health_check(self) -> bool:
    """Is this agent ready?"""
    try:
        # Check database connection
        await self.db.ping()
        # Check LLM connection
        await self.llm.complete([...])
        return True
    except Exception:
        return False

Orchestration frameworks periodically call health_check() to detect faulty agents.


Type Checking

DynaBots protocols are runtime_checkable. Use isinstance() to validate:

from dynabots_core import Agent

def use_agent(obj):
    if not isinstance(obj, Agent):
        raise TypeError(f"{obj} does not implement Agent protocol")
    # Safe to use as Agent

Execution Modes

Your agent uses its own LLM to understand tasks:

async def process_task(self, task: str, context: dict) -> TaskResult:
    # Parse task with LLM
    plan = await self.llm.understand(task)
    # Execute plan
    result = await self.execute_plan(plan)
    return TaskResult.success(task_id=context["task_id"], data=result)

Pros: - Flexible task understanding - Use any tools - Better for complex tasks

Legacy Mode (Optional)

Implement execute_capability() for direct routing:

from typing import Any, Dict, List, Protocol, runtime_checkable

@runtime_checkable
class LegacyAgent(Protocol):
    async def execute_capability(
        self,
        capability: str,
        parameters: Dict[str, Any],
        context: Dict[str, Any],
    ) -> "TaskResult":
        """Execute a specific capability directly."""
        ...

Pros: - Predictable, direct routing - Good for well-defined operations - No LLM overhead


Best Practices

  1. Stateless agents: Agents should be stateless. Store data in dedicated services.
  2. Health checks: Always implement health_check(). Return fast.
  3. Domains: Be specific with domains. Overlapping domains enable competition.
  4. Error handling: Return TaskResult.failure() with clear error messages.
  5. Metadata: Use TaskResult.metadata to attach domain-specific data.

See Also