Skip to content

TaskResult

The standard outcome format for all task executions, with rich semantics for workflow control.


Protocol Definition

dynabots_core.value_objects.task_result.TaskResult dataclass

Standardized task result with conditional execution support.

This is the core value object that enables smart workflow execution. Every task returns a TaskResult, which tells downstream tasks whether they should execute or skip.

Attributes:

Name Type Description
task_id str

Unique identifier for the task

outcome TaskOutcome

Execution outcome (success, failure, etc.)

data Any

Result data (any type)

metadata Dict[str, Any]

Additional metadata

should_continue bool

Whether downstream tasks should execute

skip_reason Optional[str]

Reason for skipping (if applicable)

timestamp datetime

When the result was created

duration_ms Optional[int]

Execution duration in milliseconds

error_message Optional[str]

Error message (if failure)

Example

# Task that finds missing items
missing = analyze_gaps(notes, emails)

if not missing:
    return TaskResult(
        task_id="analyze",
        outcome=TaskOutcome.NO_ACTION_NEEDED,
        data=None,
        should_continue=False,
        skip_reason="No missing items found"
    )
else:
    return TaskResult.success(
        task_id="analyze",
        data={"missing_items": missing}
    )

Source code in packages/core/dynabots_core/value_objects/task_result.py
@dataclass(frozen=True)
class TaskResult:
    """
    Standardized task result with conditional execution support.

    This is the core value object that enables smart workflow execution.
    Every task returns a TaskResult, which tells downstream tasks whether
    they should execute or skip.

    Attributes:
        task_id: Unique identifier for the task
        outcome: Execution outcome (success, failure, etc.)
        data: Result data (any type)
        metadata: Additional metadata
        should_continue: Whether downstream tasks should execute
        skip_reason: Reason for skipping (if applicable)
        timestamp: When the result was created
        duration_ms: Execution duration in milliseconds
        error_message: Error message (if failure)

    Example:
        # Task that finds missing items
        missing = analyze_gaps(notes, emails)

        if not missing:
            return TaskResult(
                task_id="analyze",
                outcome=TaskOutcome.NO_ACTION_NEEDED,
                data=None,
                should_continue=False,
                skip_reason="No missing items found"
            )
        else:
            return TaskResult.success(
                task_id="analyze",
                data={"missing_items": missing}
            )
    """

    task_id: str
    outcome: TaskOutcome
    data: Any
    metadata: Dict[str, Any] = field(default_factory=dict)

    # Conditional execution control
    should_continue: bool = True
    skip_reason: Optional[str] = None

    # Audit fields
    # Defaults to the timezone-aware UTC time at which the instance is built.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    duration_ms: Optional[int] = None
    error_message: Optional[str] = None

    @property
    def is_actionable(self) -> bool:
        """
        Does this result require downstream action?

        Returns False if:
        - No action needed
        - Task failed
        - Task was skipped

        Every other outcome (SUCCESS, PARTIAL) is actionable.
        """
        return self.outcome not in (
            TaskOutcome.NO_ACTION_NEEDED,
            TaskOutcome.FAILURE,
            TaskOutcome.SKIPPED,
        )

    @property
    def is_success(self) -> bool:
        """Did the task succeed? True only for SUCCESS (PARTIAL does not count)."""
        return self.outcome == TaskOutcome.SUCCESS

    @property
    def is_failure(self) -> bool:
        """Did the task fail?"""
        return self.outcome == TaskOutcome.FAILURE

    @property
    def is_skipped(self) -> bool:
        """Was the task skipped?"""
        return self.outcome == TaskOutcome.SKIPPED

    @property
    def is_no_action_needed(self) -> bool:
        """Did the task determine no action was needed?"""
        return self.outcome == TaskOutcome.NO_ACTION_NEEDED

    def get_context_for_downstream(self) -> Dict[str, Any]:
        """
        Extract context needed by downstream tasks.

        The result's own fields (``task_id``, ``outcome``, ``data``,
        ``is_actionable``, ``should_continue``) are emitted first; metadata
        entries are then merged in at the top level.

        Returns:
            Dictionary with task result data formatted for downstream
            consumption. A metadata key that collides with one of the
            result's own keys is dropped, so metadata can never misreport
            fields such as ``should_continue`` or ``data``.
        """
        context: Dict[str, Any] = {
            "task_id": self.task_id,
            "outcome": self.outcome.value,
            "data": self.data,
            "is_actionable": self.is_actionable,
            "should_continue": self.should_continue,
        }
        # setdefault keeps the authoritative value when metadata reuses a core key.
        for key, value in self.metadata.items():
            context.setdefault(key, value)
        return context

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary for logging/storage.

        Returns:
            Dictionary holding every field, with ``outcome`` as its string
            value and ``timestamp`` as an ISO-format string, plus the derived
            ``is_actionable`` and ``is_success`` flags. ``data`` and
            ``metadata`` are included as-is (not copied or converted).
        """
        return {
            "task_id": self.task_id,
            "outcome": self.outcome.value,
            "data": self.data,
            "metadata": self.metadata,
            "should_continue": self.should_continue,
            "skip_reason": self.skip_reason,
            "timestamp": self.timestamp.isoformat(),
            "duration_ms": self.duration_ms,
            "error_message": self.error_message,
            "is_actionable": self.is_actionable,
            "is_success": self.is_success,
        }

    @classmethod
    def success(
        cls,
        task_id: str,
        data: Any,
        metadata: Optional[Dict[str, Any]] = None,
        duration_ms: Optional[int] = None,
    ) -> "TaskResult":
        """
        Create a successful result.

        Args:
            task_id: Identifier of the task that produced the result.
            data: Result payload.
            metadata: Optional extra metadata; an empty dict is used when omitted.
            duration_ms: Optional execution duration in milliseconds.

        Returns:
            A SUCCESS result with ``should_continue=True``.
        """
        return cls(
            task_id=task_id,
            outcome=TaskOutcome.SUCCESS,
            data=data,
            metadata=metadata or {},
            should_continue=True,
            duration_ms=duration_ms,
        )

    @classmethod
    def no_action_needed(
        cls,
        task_id: str,
        reason: str,
        metadata: Optional[Dict[str, Any]] = None,
        duration_ms: Optional[int] = None,
    ) -> "TaskResult":
        """
        Create a 'no action needed' result that skips downstream tasks.

        Args:
            task_id: Identifier of the task that produced the result.
            reason: Why nothing had to be done; stored in ``skip_reason``.
            metadata: Optional extra metadata; an empty dict is used when omitted.
            duration_ms: Optional execution duration in milliseconds.

        Returns:
            A NO_ACTION_NEEDED result with ``data=None`` and
            ``should_continue=False``.
        """
        return cls(
            task_id=task_id,
            outcome=TaskOutcome.NO_ACTION_NEEDED,
            data=None,
            metadata=metadata or {},
            should_continue=False,
            skip_reason=reason,
            duration_ms=duration_ms,
        )

    @classmethod
    def failure(
        cls,
        task_id: str,
        error: str,
        metadata: Optional[Dict[str, Any]] = None,
        duration_ms: Optional[int] = None,
    ) -> "TaskResult":
        """
        Create a failure result.

        Args:
            task_id: Identifier of the task that produced the result.
            error: Error text; stored in ``error_message`` (``skip_reason``
                stays None).
            metadata: Optional extra metadata; an empty dict is used when omitted.
            duration_ms: Optional execution duration in milliseconds.

        Returns:
            A FAILURE result with ``data=None`` and ``should_continue=False``.
        """
        return cls(
            task_id=task_id,
            outcome=TaskOutcome.FAILURE,
            data=None,
            metadata=metadata or {},
            should_continue=False,
            error_message=error,
            duration_ms=duration_ms,
        )

    @classmethod
    def skipped(
        cls,
        task_id: str,
        reason: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> "TaskResult":
        """
        Create a skipped result.

        Args:
            task_id: Identifier of the task that was skipped.
            reason: Why the task was skipped; stored in ``skip_reason``.
            metadata: Optional extra metadata; an empty dict is used when omitted.

        Returns:
            A SKIPPED result with ``data=None``, ``should_continue=False`` and
            ``duration_ms=0``.
        """
        return cls(
            task_id=task_id,
            outcome=TaskOutcome.SKIPPED,
            data=None,
            metadata=metadata or {},
            should_continue=False,
            skip_reason=reason,
            duration_ms=0,
        )

    @classmethod
    def partial(
        cls,
        task_id: str,
        data: Any,
        reason: str,
        metadata: Optional[Dict[str, Any]] = None,
        duration_ms: Optional[int] = None,
    ) -> "TaskResult":
        """
        Create a partial success result.

        Args:
            task_id: Identifier of the task that produced the result.
            data: The portion of the payload that was produced.
            reason: Why the result is only partial; stored in ``skip_reason``.
            metadata: Optional extra metadata; an empty dict is used when omitted.
            duration_ms: Optional execution duration in milliseconds.

        Returns:
            A PARTIAL result with ``should_continue=True``.
        """
        return cls(
            task_id=task_id,
            outcome=TaskOutcome.PARTIAL,
            data=data,
            metadata=metadata or {},
            should_continue=True,
            skip_reason=reason,
            duration_ms=duration_ms,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "TaskResult":
        """
        Create a TaskResult from a dictionary such as the one ``to_dict()`` emits.

        Args:
            data: Mapping with required keys ``task_id`` and ``outcome`` (the
                outcome's string value). Optional keys: ``data``,
                ``metadata``, ``should_continue``, ``skip_reason``,
                ``timestamp``, ``duration_ms`` and ``error_message``. Other
                keys (e.g. the derived ``is_actionable`` / ``is_success``
                flags) are ignored.

        Returns:
            A new TaskResult. ``timestamp`` is restored from an ISO-format
            string or a ``datetime``; when it is absent (or of another type)
            the creation-time default applies. ``metadata`` is shallow-copied,
            and a missing or null value becomes an empty dict.

        Raises:
            KeyError: If ``task_id`` or ``outcome`` is missing.
            ValueError: If ``outcome`` is not a valid TaskOutcome value, or
                ``timestamp`` is a string ``datetime.fromisoformat`` cannot
                parse.
        """
        # Only pass timestamp when the payload supplies one, so the dataclass
        # default (current UTC time) still applies to payloads without it.
        restored: Dict[str, Any] = {}
        raw_timestamp = data.get("timestamp")
        if isinstance(raw_timestamp, datetime):
            restored["timestamp"] = raw_timestamp
        elif isinstance(raw_timestamp, str):
            restored["timestamp"] = datetime.fromisoformat(raw_timestamp)

        return cls(
            task_id=data["task_id"],
            outcome=TaskOutcome(data["outcome"]),
            data=data.get("data"),
            # "or {}" covers an explicit null; dict() avoids aliasing the input.
            metadata=dict(data.get("metadata") or {}),
            should_continue=data.get("should_continue", True),
            skip_reason=data.get("skip_reason"),
            duration_ms=data.get("duration_ms"),
            error_message=data.get("error_message"),
            **restored,
        )

is_actionable property

Does this result require downstream action?

Returns False if:

- No action needed
- Task failed
- Task was skipped

is_failure property

Did the task fail?

is_no_action_needed property

Did the task determine no action was needed?

is_skipped property

Was the task skipped?

is_success property

Did the task succeed?

failure(task_id, error, metadata=None, duration_ms=None) classmethod

Create a failure result.

Source code in packages/core/dynabots_core/value_objects/task_result.py
@classmethod
def failure(
    cls,
    task_id: str,
    error: str,
    metadata: Optional[Dict[str, Any]] = None,
    duration_ms: Optional[int] = None,
) -> "TaskResult":
    """Build a FAILURE result: no data, ``error`` kept as ``error_message``, ``should_continue`` off."""
    # A missing (or empty) metadata argument becomes a fresh empty dict.
    extra = metadata if metadata else {}
    return cls(
        outcome=TaskOutcome.FAILURE,
        task_id=task_id,
        data=None,
        error_message=error,
        should_continue=False,
        duration_ms=duration_ms,
        metadata=extra,
    )

from_dict(data) classmethod

Create a TaskResult from a dictionary.

Source code in packages/core/dynabots_core/value_objects/task_result.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TaskResult":
    """
    Create a TaskResult from a dictionary such as the one ``to_dict()`` emits.

    Args:
        data: Mapping with required keys ``task_id`` and ``outcome`` (the
            outcome's string value). Optional keys: ``data``, ``metadata``,
            ``should_continue``, ``skip_reason``, ``timestamp``,
            ``duration_ms`` and ``error_message``. Other keys are ignored.

    Returns:
        A new TaskResult. ``timestamp`` is restored from an ISO-format string
        or a ``datetime``; when it is absent (or of another type) the
        creation-time default applies. ``metadata`` is shallow-copied, and a
        missing or null value becomes an empty dict.

    Raises:
        KeyError: If ``task_id`` or ``outcome`` is missing.
        ValueError: If ``outcome`` is not a valid TaskOutcome value, or
            ``timestamp`` is a string ``datetime.fromisoformat`` cannot parse.
    """
    # Only pass timestamp when the payload supplies one, so the dataclass
    # default still applies to payloads without it.
    restored: Dict[str, Any] = {}
    raw_timestamp = data.get("timestamp")
    if isinstance(raw_timestamp, datetime):
        restored["timestamp"] = raw_timestamp
    elif isinstance(raw_timestamp, str):
        restored["timestamp"] = datetime.fromisoformat(raw_timestamp)

    return cls(
        task_id=data["task_id"],
        outcome=TaskOutcome(data["outcome"]),
        data=data.get("data"),
        # "or {}" covers an explicit null; dict() avoids aliasing the input.
        metadata=dict(data.get("metadata") or {}),
        should_continue=data.get("should_continue", True),
        skip_reason=data.get("skip_reason"),
        duration_ms=data.get("duration_ms"),
        error_message=data.get("error_message"),
        **restored,
    )

get_context_for_downstream()

Extract context needed by downstream tasks.

Returns:

Type Description
Dict[str, Any]

Dictionary with task result data formatted for downstream consumption.

Source code in packages/core/dynabots_core/value_objects/task_result.py
def get_context_for_downstream(self) -> Dict[str, Any]:
    """
    Extract context needed by downstream tasks.

    The result's own fields (``task_id``, ``outcome``, ``data``,
    ``is_actionable``, ``should_continue``) are emitted first; metadata
    entries are then merged in at the top level.

    Returns:
        Dictionary with task result data formatted for downstream
        consumption. A metadata key that collides with one of the result's
        own keys is dropped, so metadata can never misreport fields such as
        ``should_continue`` or ``data``.
    """
    context: Dict[str, Any] = {
        "task_id": self.task_id,
        "outcome": self.outcome.value,
        "data": self.data,
        "is_actionable": self.is_actionable,
        "should_continue": self.should_continue,
    }
    # setdefault keeps the authoritative value when metadata reuses a core key.
    for key, value in self.metadata.items():
        context.setdefault(key, value)
    return context

no_action_needed(task_id, reason, metadata=None, duration_ms=None) classmethod

Create a 'no action needed' result that skips downstream tasks.

Source code in packages/core/dynabots_core/value_objects/task_result.py
@classmethod
def no_action_needed(
    cls,
    task_id: str,
    reason: str,
    metadata: Optional[Dict[str, Any]] = None,
    duration_ms: Optional[int] = None,
) -> "TaskResult":
    """Build a NO_ACTION_NEEDED result: no data, ``reason`` kept as ``skip_reason``, ``should_continue`` off."""
    # A missing (or empty) metadata argument becomes a fresh empty dict.
    extra = metadata if metadata else {}
    return cls(
        outcome=TaskOutcome.NO_ACTION_NEEDED,
        task_id=task_id,
        data=None,
        skip_reason=reason,
        should_continue=False,
        duration_ms=duration_ms,
        metadata=extra,
    )

partial(task_id, data, reason, metadata=None, duration_ms=None) classmethod

Create a partial success result.

Source code in packages/core/dynabots_core/value_objects/task_result.py
@classmethod
def partial(
    cls,
    task_id: str,
    data: Any,
    reason: str,
    metadata: Optional[Dict[str, Any]] = None,
    duration_ms: Optional[int] = None,
) -> "TaskResult":
    """Build a PARTIAL result: keeps ``data``, stores ``reason`` as ``skip_reason``, ``should_continue`` on."""
    # A missing (or empty) metadata argument becomes a fresh empty dict.
    extra = metadata if metadata else {}
    return cls(
        outcome=TaskOutcome.PARTIAL,
        task_id=task_id,
        data=data,
        skip_reason=reason,
        should_continue=True,
        duration_ms=duration_ms,
        metadata=extra,
    )

skipped(task_id, reason, metadata=None) classmethod

Create a skipped result.

Source code in packages/core/dynabots_core/value_objects/task_result.py
@classmethod
def skipped(
    cls,
    task_id: str,
    reason: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> "TaskResult":
    """Build a SKIPPED result: no data, ``reason`` kept as ``skip_reason``, zero duration, ``should_continue`` off."""
    # A missing (or empty) metadata argument becomes a fresh empty dict.
    extra = metadata if metadata else {}
    return cls(
        outcome=TaskOutcome.SKIPPED,
        task_id=task_id,
        data=None,
        skip_reason=reason,
        should_continue=False,
        duration_ms=0,
        metadata=extra,
    )

success(task_id, data, metadata=None, duration_ms=None) classmethod

Create a successful result.

Source code in packages/core/dynabots_core/value_objects/task_result.py
@classmethod
def success(
    cls,
    task_id: str,
    data: Any,
    metadata: Optional[Dict[str, Any]] = None,
    duration_ms: Optional[int] = None,
) -> "TaskResult":
    """Build a SUCCESS result carrying ``data``, with ``should_continue`` on."""
    # A missing (or empty) metadata argument becomes a fresh empty dict.
    extra = metadata if metadata else {}
    return cls(
        outcome=TaskOutcome.SUCCESS,
        task_id=task_id,
        data=data,
        should_continue=True,
        duration_ms=duration_ms,
        metadata=extra,
    )

to_dict()

Convert to dictionary for logging/storage.

Source code in packages/core/dynabots_core/value_objects/task_result.py
def to_dict(self) -> Dict[str, Any]:
    """Flatten the result into a plain dict for logging/storage.

    ``outcome`` is written as its string value and ``timestamp`` as an
    ISO-format string; the derived ``is_actionable`` / ``is_success`` flags
    are appended after the stored fields.
    """
    # Stored fields, in declaration order.
    serialized: Dict[str, Any] = {
        "task_id": self.task_id,
        "outcome": self.outcome.value,
        "data": self.data,
        "metadata": self.metadata,
        "should_continue": self.should_continue,
        "skip_reason": self.skip_reason,
    }
    # Audit fields.
    serialized["timestamp"] = self.timestamp.isoformat()
    serialized["duration_ms"] = self.duration_ms
    serialized["error_message"] = self.error_message
    # Derived convenience flags.
    serialized["is_actionable"] = self.is_actionable
    serialized["is_success"] = self.is_success
    return serialized

dynabots_core.value_objects.task_result.TaskOutcome

Bases: Enum

Task execution outcome.

Source code in packages/core/dynabots_core/value_objects/task_result.py
class TaskOutcome(Enum):
    """Task execution outcome.

    Each member's value is the string that ``TaskResult.to_dict()`` writes
    under ``"outcome"`` and that ``TaskResult.from_dict()`` parses back.
    """

    # Built by TaskResult.success(); should_continue=True.
    SUCCESS = "success"
    # Built by TaskResult.failure(); should_continue=False, not actionable.
    FAILURE = "failure"
    # Built by TaskResult.no_action_needed(); should_continue=False, not actionable.
    NO_ACTION_NEEDED = "no_action_needed"
    # Built by TaskResult.partial(); should_continue=True and still actionable.
    PARTIAL = "partial"
    # Built by TaskResult.skipped(); should_continue=False, not actionable.
    SKIPPED = "skipped"

Outcomes

Every TaskResult has one of five outcomes:

SUCCESS

Task completed successfully with data:

result = TaskResult.success(
    task_id="task_001",
    data={"records": 42, "details": "..."}
)

print(result.is_success)        # True
print(result.data)              # {"records": 42, ...}
print(result.should_continue)   # True (downstream tasks run)

FAILURE

Task failed with an error:

result = TaskResult.failure(
    task_id="task_002",
    error="Database connection failed"
)

print(result.is_failure)        # True
print(result.error_message)     # "Database connection failed"
print(result.should_continue)   # False (stop execution)

NO_ACTION_NEEDED

Nothing to do. Skip downstream tasks but don't mark as error:

result = TaskResult.no_action_needed(
    task_id="task_003",
    reason="Data is already up to date"
)

print(result.is_no_action_needed)  # True
print(result.skip_reason)          # "Data is already up to date"
print(result.should_continue)      # False (downstream skip)

Perfect for conditional workflows:

# Example: Email notification task
if user_already_notified():
    return TaskResult.no_action_needed(
        task_id="notify",
        reason="User was already notified yesterday"
    )
else:
    await send_email(user)
    return TaskResult.success(task_id="notify", data={"sent": True})

PARTIAL

Partial success. Some work done, some failed:

result = TaskResult.partial(
    task_id="task_004",
    data={"processed": 35, "failed": 5},
    reason="Timeout after 30s, processed 35/40 records"
)

print(result.outcome)           # TaskOutcome.PARTIAL
print(result.data)              # {"processed": 35, "failed": 5}
print(result.should_continue)   # True (has usable data)

Good for resilient workflows—downstream tasks work with partial data.

SKIPPED

Task was skipped intentionally:

result = TaskResult.skipped(
    task_id="task_005",
    reason="Feature flag disabled for this user"
)

print(result.is_skipped)        # True
print(result.should_continue)   # False (don't continue)

Creating Results

Factory Methods

Use the class methods for clarity:

# Success
TaskResult.success(task_id="...", data={...})

# Failure
TaskResult.failure(task_id="...", error="...")

# No action needed
TaskResult.no_action_needed(task_id="...", reason="...")

# Partial
TaskResult.partial(task_id="...", data={...}, reason="...")

# Skipped
TaskResult.skipped(task_id="...", reason="...")

Direct Constructor

Or use the constructor directly:

from dynabots_core import TaskResult, TaskOutcome

result = TaskResult(
    task_id="task_001",
    outcome=TaskOutcome.SUCCESS,
    data={"count": 42},
    should_continue=True,
)

Properties

Core

result = await agent.process_task(task, context)

print(result.task_id)           # Unique task identifier
print(result.outcome)            # TaskOutcome enum
print(result.data)              # Task-specific data

Status Checks

result.is_success              # outcome == SUCCESS
result.is_failure              # outcome == FAILURE
result.is_no_action_needed     # outcome == NO_ACTION_NEEDED
result.is_skipped              # outcome == SKIPPED

result.is_actionable           # False for FAILURE, NO_ACTION_NEEDED, SKIPPED
                               # True for SUCCESS, PARTIAL

Conditional Execution

# In orchestration framework
result = await agent.process_task(task, context)

if result.should_continue:
    # Run next task with result data
    next_result = await next_task(result.data)
else:
    # Stop execution
    # NOTE: results built with TaskResult.failure() leave skip_reason as None
    # and carry their text in error_message, so this prints "Stopped: None"
    # for failures.
    print(f"Stopped: {result.skip_reason}")
    return

Audit Fields

result.timestamp               # When result was created (defaults to current UTC time)
result.duration_ms            # How long task took (milliseconds); None if not supplied
result.error_message          # Error if FAILURE
result.skip_reason            # Reason if NO_ACTION_NEEDED/SKIPPED/PARTIAL
result.metadata               # Custom data

Metadata

Attach domain-specific data:

result = TaskResult.success(
    task_id="fetch_data",
    data={"records": 42},
    metadata={
        "source": "database",
        "query_time_ms": 150,
        "cache_hit": False,
        "custom_field": "value"
    },
    duration_ms=200
)

print(result.metadata["query_time_ms"])  # 150

Use metadata to:

- Track performance metrics
- Store execution details
- Pass custom data to downstream tasks


Serialization

To Dictionary

result = TaskResult.success(task_id="task_001", data={"count": 42})

data = result.to_dict()
# {
#     "task_id": "task_001",
#     "outcome": "success",
#     "data": {"count": 42},
#     "metadata": {},
#     "should_continue": True,
#     "skip_reason": None,
#     "timestamp": "2024-03-26T...",
#     "duration_ms": None,
#     "error_message": None,
#     "is_actionable": True,
#     "is_success": True,
# }

From Dictionary

data = {
    "task_id": "task_001",
    "outcome": "success",
    "data": {"count": 42},
    ...
}

result = TaskResult.from_dict(data)

Good for storage and transmission.


Context for Downstream

Extract context needed by next task:

result = await agent.process_task(task, context)

# Get data formatted for downstream
downstream_context = result.get_context_for_downstream()
# {
#     "task_id": "task_001",
#     "outcome": "success",
#     "data": {...},
#     "is_actionable": True,
#     "should_continue": True,
#     ...custom metadata...
# }

# Pass to next task
next_result = await next_agent.process_task(
    next_task,
    downstream_context
)

Example: Conditional Workflow

import asyncio
from dynabots_core import TaskResult

async def workflow():
    """Run a fetch -> validate -> process chain, halting at the first result that says stop."""

    # Step 1: fetch
    print("Task 1: Fetching data...")
    fetched = TaskResult.success(task_id="fetch", data={"records": 100})
    if not fetched.should_continue:
        print(f"Stopped at fetch: {fetched.skip_reason}")
        return

    # Step 2: validate (nothing to validate => no_action_needed halts the chain)
    print("Task 2: Validating...")
    if fetched.data["records"] != 0:
        validated = TaskResult.success(task_id="validate", data={"valid": 100})
    else:
        validated = TaskResult.no_action_needed(
            task_id="validate",
            reason="No records to validate",
        )
    if not validated.should_continue:
        print(f"Stopped at validate: {validated.skip_reason}")
        return

    # Step 3: process; any exception is converted into a failure result
    print("Task 3: Processing...")
    try:
        # Simulated processing
        doubled = validated.data["valid"] * 2
        processed = TaskResult.success(task_id="process", data={"processed": doubled})
    except Exception as exc:
        processed = TaskResult.failure(task_id="process", error=str(exc))

    if not processed.is_success:
        print(f"Failed: {processed.error_message}")
    else:
        print(f"Success! Processed: {processed.data['processed']}")

asyncio.run(workflow())

Output:

Task 1: Fetching data...
Task 2: Validating...
Task 3: Processing...
Success! Processed: 200

Best Practices

  1. Use factory methods: TaskResult.success() is clearer than constructor.
  2. Always set task_id: Track which task produced each result.
  3. Clear skip reasons: Help operators understand conditional skips.
  4. Metadata for context: Use metadata for execution details.
  5. Consistent outcomes: Always return the appropriate outcome (don't return success for a partial result).

See Also