TaskResult¶
The standard outcome format for all task executions, with rich semantics for workflow control.
Protocol Definition¶
dynabots_core.value_objects.task_result.TaskResult (dataclass)¶
Standardized task result with conditional execution support.
This is the core value object that enables smart workflow execution. Every task returns a TaskResult, which tells downstream tasks whether they should execute or skip.
Attributes:
| Name | Type | Description |
|---|---|---|
| task_id | str | Unique identifier for the task |
| outcome | TaskOutcome | Execution outcome (success, failure, etc.) |
| data | Any | Result data (any type) |
| metadata | Dict[str, Any] | Additional metadata |
| should_continue | bool | Whether downstream tasks should execute |
| skip_reason | Optional[str] | Reason for skipping (if applicable) |
| timestamp | datetime | When the result was created |
| duration_ms | Optional[int] | Execution duration in milliseconds |
| error_message | Optional[str] | Error message (if failure) |
Example:
# Task that finds missing items
missing = analyze_gaps(notes, emails)
if not missing:
    return TaskResult(
        task_id="analyze",
        outcome=TaskOutcome.NO_ACTION_NEEDED,
        data=None,
        should_continue=False,
        skip_reason="No missing items found"
    )
else:
    return TaskResult.success(
        task_id="analyze",
        data={"missing_items": missing}
    )
Source code in packages/core/dynabots_core/value_objects/task_result.py
is_actionable (property)¶
Does this result require downstream action?
Returns False if:
- No action needed
- Task failed
- Task was skipped
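The rule above can be sketched as a small lookup. This is an illustration only: `Outcome` is a hypothetical stand-in for `TaskOutcome`, and its member values (other than `"success"`, which appears in the serialized example on this page) are assumptions, not the library's definitions.

```python
from enum import Enum


class Outcome(Enum):
    """Hypothetical stand-in for TaskOutcome; member values are assumed."""

    SUCCESS = "success"
    FAILURE = "failure"
    NO_ACTION_NEEDED = "no_action_needed"
    PARTIAL = "partial"
    SKIPPED = "skipped"


# The three outcomes listed above as not requiring downstream action.
NON_ACTIONABLE = {Outcome.NO_ACTION_NEEDED, Outcome.FAILURE, Outcome.SKIPPED}


def is_actionable(outcome: Outcome) -> bool:
    """Return True when the outcome calls for downstream action."""
    return outcome not in NON_ACTIONABLE


# Enum members iterate in definition order.
actionable = [o.name for o in Outcome if is_actionable(o)]
print(actionable)  # ['SUCCESS', 'PARTIAL']
```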
is_failure (property)¶
Did the task fail?
is_no_action_needed (property)¶
Did the task determine no action was needed?
is_skipped (property)¶
Was the task skipped?
is_success (property)¶
Did the task succeed?
failure(task_id, error, metadata=None, duration_ms=None) (classmethod)¶
Create a failure result.
from_dict(data) (classmethod)¶
Create a TaskResult from a dictionary.
get_context_for_downstream()¶
Extract context needed by downstream tasks.
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary with task result data formatted for downstream consumption. |
no_action_needed(task_id, reason, metadata=None, duration_ms=None) (classmethod)¶
Create a 'no action needed' result that skips downstream tasks.
partial(task_id, data, reason, metadata=None, duration_ms=None) (classmethod)¶
Create a partial success result.
skipped(task_id, reason, metadata=None) (classmethod)¶
Create a skipped result.
success(task_id, data, metadata=None, duration_ms=None) (classmethod)¶
Create a successful result.
to_dict()¶
Convert to dictionary for logging/storage.
dynabots_core.value_objects.task_result.TaskOutcome¶
Bases: Enum
Task execution outcome.
Outcomes¶
Every TaskResult has one of five outcomes:
SUCCESS¶
Task completed successfully with data:
result = TaskResult.success(
task_id="task_001",
data={"records": 42, "details": "..."}
)
print(result.is_success) # True
print(result.data) # {"records": 42, ...}
print(result.should_continue) # True (downstream tasks run)
FAILURE¶
Task failed with an error:
result = TaskResult.failure(
task_id="task_002",
error="Database connection failed"
)
print(result.is_failure) # True
print(result.error_message) # "Database connection failed"
print(result.should_continue) # False (stop execution)
NO_ACTION_NEEDED¶
There is nothing to do. Downstream tasks are skipped, but the result is not treated as an error:
result = TaskResult.no_action_needed(
task_id="task_003",
reason="Data is already up to date"
)
print(result.is_no_action_needed) # True
print(result.skip_reason) # "Data is already up to date"
print(result.should_continue) # False (downstream skip)
Perfect for conditional workflows:
# Example: email notification task
# (wrapped in a function so `return` and `await` are valid; `notify` is an illustrative name)
async def notify(user):
    if user_already_notified():
        return TaskResult.no_action_needed(
            task_id="notify",
            reason="User was already notified yesterday"
        )
    else:
        await send_email(user)
        return TaskResult.success(task_id="notify", data={"sent": True})
PARTIAL¶
Partial success, where some work completed and some failed:
result = TaskResult.partial(
task_id="task_004",
data={"processed": 35, "failed": 5},
reason="Timeout after 30s, processed 35/40 records"
)
print(result.outcome) # TaskOutcome.PARTIAL
print(result.data) # {"processed": 35, "failed": 5}
print(result.should_continue) # True (has usable data)
Good for resilient workflows: downstream tasks can still work with the partial data.
SKIPPED¶
Task was skipped intentionally:
result = TaskResult.skipped(
task_id="task_005",
reason="Feature flag disabled for this user"
)
print(result.is_skipped) # True
print(result.should_continue) # False (don't continue)
Creating Results¶
Factory Methods¶
Use the class methods for clarity:
# Success
TaskResult.success(task_id="...", data={...})
# Failure
TaskResult.failure(task_id="...", error="...")
# No action needed
TaskResult.no_action_needed(task_id="...", reason="...")
# Partial
TaskResult.partial(task_id="...", data={...}, reason="...")
# Skipped
TaskResult.skipped(task_id="...", reason="...")
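To illustrate why the factory methods read more clearly than the constructor, here is a minimal sketch of how such class methods could wrap a dataclass constructor. `SketchResult` is a simplified, hypothetical stand-in written for this page; it is not the code in `task_result.py`, it stores the outcome as a plain string rather than a `TaskOutcome` member, and it covers only three of the five factories.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass
class SketchResult:
    """Simplified stand-in for TaskResult; not the library's implementation."""

    task_id: str
    outcome: str  # the real class stores a TaskOutcome enum member
    data: Any = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    should_continue: bool = True
    skip_reason: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.now)
    duration_ms: Optional[int] = None
    error_message: Optional[str] = None

    @classmethod
    def success(cls, task_id, data, metadata=None, duration_ms=None):
        # Success carries data and lets downstream tasks run.
        return cls(task_id, "success", data=data,
                   metadata=metadata or {}, duration_ms=duration_ms)

    @classmethod
    def failure(cls, task_id, error, metadata=None, duration_ms=None):
        # Failure records the error message and stops the chain.
        return cls(task_id, "failure", metadata=metadata or {},
                   should_continue=False, error_message=error,
                   duration_ms=duration_ms)

    @classmethod
    def no_action_needed(cls, task_id, reason, metadata=None, duration_ms=None):
        # Not an error, but downstream tasks are skipped.
        return cls(task_id, "no_action_needed", metadata=metadata or {},
                   should_continue=False, skip_reason=reason,
                   duration_ms=duration_ms)


ok = SketchResult.success(task_id="fetch", data={"records": 42})
bad = SketchResult.failure(task_id="fetch", error="Database connection failed")
idle = SketchResult.no_action_needed(task_id="sync", reason="Data is already up to date")
print(ok.should_continue, bad.should_continue, idle.skip_reason)
```

Each factory fixes the outcome and the matching `should_continue` default in one place, so callers cannot accidentally pair a failure with `should_continue=True`.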
Direct Constructor¶
Or use the constructor directly:
from dynabots_core import TaskResult, TaskOutcome
result = TaskResult(
task_id="task_001",
outcome=TaskOutcome.SUCCESS,
data={"count": 42},
should_continue=True,
)
Properties¶
Core¶
result = await agent.process_task(task, context)
print(result.task_id) # Unique task identifier
print(result.outcome) # TaskOutcome enum
print(result.data) # Task-specific data
Status Checks¶
result.is_success # outcome == SUCCESS
result.is_failure # outcome == FAILURE
result.is_no_action_needed # outcome == NO_ACTION_NEEDED
result.is_skipped # outcome == SKIPPED
result.is_actionable # False for FAILURE, NO_ACTION_NEEDED, SKIPPED
# True for SUCCESS, PARTIAL
Conditional Execution¶
# In orchestration framework
result = await agent.process_task(task, context)
if result.should_continue:
    # Run next task with result data
    next_result = await next_task(result.data)
else:
    # Stop execution
    print(f"Stopped: {result.skip_reason}")
    return
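The same check generalizes to a chain of tasks. The sketch below is one possible shape for such a loop, not the framework's orchestrator: `StepResult`, `run_chain`, and the three step functions are hypothetical names, and `StepResult` exposes only the fields the loop reads.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List, Optional


@dataclass
class StepResult:
    """Stand-in exposing only the TaskResult fields this loop reads."""

    task_id: str
    data: Any = None
    should_continue: bool = True
    skip_reason: Optional[str] = None


Step = Callable[[Any], Awaitable[StepResult]]


async def run_chain(steps: List[Step], initial: Any = None) -> List[StepResult]:
    """Run steps in order, feeding each result's data to the next step."""
    results: List[StepResult] = []
    payload = initial
    for step in steps:
        result = await step(payload)
        results.append(result)
        if not result.should_continue:
            break  # downstream steps are skipped
        payload = result.data
    return results


async def fetch(_: Any) -> StepResult:
    return StepResult("fetch", data={"records": 0})


async def validate(data: Any) -> StepResult:
    if data["records"] == 0:
        return StepResult("validate", should_continue=False,
                          skip_reason="No records to validate")
    return StepResult("validate", data=data)


async def process(data: Any) -> StepResult:
    return StepResult("process", data={"processed": data["records"]})


results = asyncio.run(run_chain([fetch, validate, process]))
print([r.task_id for r in results])  # ['fetch', 'validate']
```

Returning the list of results, rather than only the last one, keeps the skip reason of the stopping task available for logging.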
Audit Fields¶
result.timestamp # When result was created
result.duration_ms # How long task took (milliseconds)
result.error_message # Error if FAILURE
result.skip_reason # Reason if NO_ACTION_NEEDED/SKIPPED
result.metadata # Custom data
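Since `duration_ms` is an optional integer number of milliseconds, the caller has to measure it. One way to do that, sketched here with a hypothetical `timed` helper that is not part of dynabots_core, is to wrap the work in `time.perf_counter()` calls and pass the result to a factory method's `duration_ms` parameter.

```python
import time
from typing import Any, Callable, Tuple


def timed(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, int]:
    """Call fn and return (value, elapsed whole milliseconds)."""
    start = time.perf_counter()
    value = fn(*args, **kwargs)
    duration_ms = int((time.perf_counter() - start) * 1000)
    return value, duration_ms


# Example workload: sum the integers 0..999.
value, duration_ms = timed(sum, range(1000))
print(value, duration_ms >= 0)
```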
Metadata¶
Attach domain-specific data:
result = TaskResult.success(
task_id="fetch_data",
data={"records": 42},
metadata={
"source": "database",
"query_time_ms": 150,
"cache_hit": False,
"custom_field": "value"
},
duration_ms=200
)
print(result.metadata["query_time_ms"]) # 150
Use metadata to:
- Track performance metrics
- Store execution details
- Pass custom data to downstream tasks
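As an example of the first use, metrics stored in metadata can be aggregated across several results. The sketch below works on plain dictionaries shaped like `result.metadata`; `sum_metric` and the sample values other than the first entry are made up for illustration.

```python
from typing import Any, Dict, Iterable


def sum_metric(metadata_items: Iterable[Dict[str, Any]], key: str) -> float:
    """Add up one numeric metadata field, skipping entries that lack it."""
    return sum(m[key] for m in metadata_items if key in m)


# Hand-written metadata dictionaries, shaped like result.metadata above.
collected = [
    {"source": "database", "query_time_ms": 150, "cache_hit": False},
    {"source": "cache", "query_time_ms": 5, "cache_hit": True},
    {"source": "api"},  # no timing recorded for this task
]

total_query_ms = sum_metric(collected, "query_time_ms")
print(total_query_ms)  # 155
```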
Serialization¶
To Dictionary¶
result = TaskResult.success(task_id="task_001", data={"count": 42})
data = result.to_dict()
# {
# "task_id": "task_001",
# "outcome": "success",
# "data": {"count": 42},
# "should_continue": True,
# "timestamp": "2024-03-26T...",
# "duration_ms": None,
# "error_message": None,
# "skip_reason": None,
# "is_actionable": True,
# "is_success": True,
# }
From Dictionary¶
data = {
"task_id": "task_001",
"outcome": "success",
"data": {"count": 42},
# ...
}
result = TaskResult.from_dict(data)
Good for storage and transmission.
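Because the dictionary shown above contains only strings, booleans, `None`, and nested data, it can be passed through JSON when the `data` payload is itself JSON-compatible. The sketch below uses a hand-written dictionary shaped like that output instead of calling the library, so whether `from_dict` accepts the decoded form unchanged remains an assumption.

```python
import json
from datetime import datetime, timezone

# Hand-written record shaped like the to_dict() output shown above.
record = {
    "task_id": "task_001",
    "outcome": "success",
    "data": {"count": 42},
    "should_continue": True,
    "timestamp": datetime(2024, 3, 26, tzinfo=timezone.utc).isoformat(),
    "duration_ms": None,
    "error_message": None,
    "skip_reason": None,
}

encoded = json.dumps(record)   # str, ready for a file, queue, or database
decoded = json.loads(encoded)  # back to a plain dict

# The ISO 8601 timestamp string parses back into a datetime.
restored_at = datetime.fromisoformat(decoded["timestamp"])
print(decoded == record, restored_at.year)  # True 2024
```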
Context for Downstream¶
Extract context needed by next task:
result = await agent.process_task(task, context)
# Get data formatted for downstream
downstream_context = result.get_context_for_downstream()
# {
# "task_id": "task_001",
# "outcome": "success",
# "data": {...},
# "is_actionable": True,
# "should_continue": True,
# ...custom metadata...
# }
# Pass to next task
next_result = await next_agent.process_task(
next_task,
downstream_context
)
Example: Conditional Workflow¶
import asyncio
from dynabots_core import TaskResult

async def workflow():
    """Example workflow with conditional execution."""
    # Task 1: Fetch data
    print("Task 1: Fetching data...")
    result1 = TaskResult.success(
        task_id="fetch",
        data={"records": 100}
    )
    if not result1.should_continue:
        print(f"Stopped at fetch: {result1.skip_reason}")
        return

    # Task 2: Validate data
    print("Task 2: Validating...")
    if result1.data["records"] == 0:
        result2 = TaskResult.no_action_needed(
            task_id="validate",
            reason="No records to validate"
        )
    else:
        result2 = TaskResult.success(
            task_id="validate",
            data={"valid": 100}
        )
    if not result2.should_continue:
        print(f"Stopped at validate: {result2.skip_reason}")
        return

    # Task 3: Process data
    print("Task 3: Processing...")
    try:
        # Simulate processing
        processed = result2.data["valid"] * 2
        result3 = TaskResult.success(
            task_id="process",
            data={"processed": processed}
        )
    except Exception as e:
        result3 = TaskResult.failure(
            task_id="process",
            error=str(e)
        )
    if result3.is_success:
        print(f"Success! Processed: {result3.data['processed']}")
    else:
        print(f"Failed: {result3.error_message}")

asyncio.run(workflow())
Output:
Task 1: Fetching data...
Task 2: Validating...
Task 3: Processing...
Success! Processed: 200
Best Practices¶
- Use factory methods: TaskResult.success() is clearer than the constructor.
- Always set task_id: Track which task produced each result.
- Clear skip reasons: Help operators understand conditional skips.
- Metadata for context: Use metadata for execution details.
- Consistent outcomes: Always return the appropriate outcome (don't return success on partial).
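The last point can be made mechanical by deriving the outcome from the counts a task already tracks. The helper below is a hypothetical sketch, not part of dynabots_core: it returns lowercase outcome names as plain strings, and a real task would call the matching `TaskResult` factory method instead.

```python
def pick_outcome(processed: int, failed: int) -> str:
    """Map work counts to an outcome name (illustrative policy only)."""
    if processed == 0 and failed == 0:
        return "no_action_needed"  # nothing was attempted
    if failed == 0:
        return "success"           # everything attempted succeeded
    if processed == 0:
        return "failure"           # everything attempted failed
    return "partial"               # mixed result: do not report success


# Counts taken from the PARTIAL example earlier on this page.
print(pick_outcome(processed=35, failed=5))  # partial
```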