Agent Harness Quick Start Guide
Agent Harness is a Python package for managing compositional execution state in long-horizon agentic code generation loops. It tracks data dependencies between steps, generates patches from execution feedback, and selectively re-executes only the steps affected by a change.
Installation
Core Concepts
Execution State Graph (ESG): A directed acyclic graph representing code generation steps and their data dependencies.
Step: A unit of code execution with inputs, outputs, and execution status.
Dependency Tracking: Automatic analysis of variable reads/writes to infer data flow between steps.
Feedback Loop: A cycle that diagnoses execution failures, generates patches, and selectively re-executes the affected steps.
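To make these concepts concrete, here is a minimal, self-contained sketch of a dependency graph of steps. `ToyStep`, `ToyGraph`, and their fields are hypothetical names chosen for illustration and are not part of the Agent Harness API. The sketch assumes only that edges point from a producing step to a consuming step.

```python
from dataclasses import dataclass, field


@dataclass
class ToyStep:
    """Hypothetical stand-in for a step: source plus execution bookkeeping."""
    step_id: str
    source: str
    status: str = "pending"  # illustrative values: pending / succeeded / failed / stale
    outputs: dict = field(default_factory=dict)


class ToyGraph:
    """Hypothetical DAG of steps; edges point from producer to consumer."""

    def __init__(self):
        self.steps = {}
        self.parents = {}  # step_id -> set of step_ids it depends on

    def add_step(self, step_id, source):
        self.steps[step_id] = ToyStep(step_id, source)
        self.parents.setdefault(step_id, set())

    def add_edge(self, src, dst):
        self.parents[dst].add(src)

    def ancestors(self, step_id):
        """Return every step reachable by following dependency edges backwards."""
        seen, stack = set(), list(self.parents[step_id])
        while stack:
            current = stack.pop()
            if current not in seen:
                seen.add(current)
                stack.extend(self.parents[current])
        return seen


toy = ToyGraph()
toy.add_step("load", "data = [1, 2, 3]")
toy.add_step("total", "s = sum(data)")
toy.add_step("report", "msg = f'total={s}'")
toy.add_edge("load", "total")
toy.add_edge("total", "report")
toy_ancestors = toy.ancestors("report")
print(sorted(toy_ancestors))
```

Storing parents rather than children keeps the ancestor query a simple backward traversal; a real implementation would likely index both directions.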
Basic Usage
Example 1: Build an Execution Graph and Execute Steps
from agent_harness.esg import ExecutionStateGraph
from agent_harness.executor import Executor
# Initialize the execution state graph and executor
esg = ExecutionStateGraph()
executor = Executor()
# Add and execute a step: define a function
step1_source = """
def add(a, b):
    return a + b
"""
step1_id = esg.add_step("step_1", step1_source)
result1 = executor.execute("step_1", step1_source)
esg.record_output("step_1", result1.namespace)
# Add and execute a step: use the function
step2_source = """
result = add(5, 3)
"""
step2_id = esg.add_step("step_2", step2_source)
# Declare the dependency explicitly: step_2 calls add(), which step_1 defines
esg.add_edge("step_1", "step_2")
result2 = executor.execute("step_2", step2_source)
esg.record_output("step_2", result2.namespace)
print(f"Step 2 result: {result2.namespace}") # Output: {'result': 8}
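The example above relies on later steps seeing names defined by earlier ones. The following sketch shows one way that behavior could be obtained with Python's built-in `exec` and a shared namespace dictionary. `ToyResult` and `run_step` are hypothetical names, and nothing here is claimed to reflect how `Executor` is actually implemented.

```python
import traceback
from dataclasses import dataclass, field


@dataclass
class ToyResult:
    """Hypothetical result record mirroring the success/namespace/error fields."""
    success: bool
    namespace: dict = field(default_factory=dict)
    error: str = ""


def run_step(source, shared_namespace):
    """Execute source against a shared namespace and report what happened."""
    before = set(shared_namespace)
    try:
        exec(source, shared_namespace)
    except Exception:
        return ToyResult(success=False, error=traceback.format_exc())
    # Report only names this step introduced, skipping interpreter internals.
    # Names that were merely reassigned are not reported in this simplified sketch.
    new_names = {
        name: value
        for name, value in shared_namespace.items()
        if name not in before and not name.startswith("__")
    }
    return ToyResult(success=True, namespace=new_names)


shared = {}
first = run_step("def add(a, b):\n    return a + b\n", shared)
second = run_step("result = add(5, 3)\n", shared)
failed = run_step("y = undefined_variable + 5\n", shared)
print(second.namespace)
print(failed.success)
```

Catching the exception and returning it as data, rather than letting it propagate, is what allows a later diagnosis stage to inspect the failure.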
Example 2: Analyze Dependencies and Track Data Flow
from agent_harness.dependency_analyzer import get_reads, get_writes, infer_edges
from agent_harness.esg import ExecutionStateGraph
# Analyze variable reads and writes
code1 = "x = 10; y = x + 5"
code2 = "z = y * 2"
writes1 = get_writes(code1) # {'x', 'y'}
reads1 = get_reads(code1) # {'x'}
writes2 = get_writes(code2) # {'z'}
reads2 = get_reads(code2) # {'y'}
print(f"Step 1 writes: {writes1}, reads: {reads1}")
print(f"Step 2 writes: {writes2}, reads: {reads2}")
# Infer edges based on data flow
steps = [
    ("step_1", code1),
    ("step_2", code2),
]
edges = infer_edges(steps)
print(f"Inferred edges: {edges}") # [('step_1', 'step_2')]
# Build ESG with inferred dependencies
esg = ExecutionStateGraph()
esg.add_step("step_1", code1)
esg.add_step("step_2", code2)
for src, dst in edges:
    esg.add_edge(src, dst)
# Get ancestors for a step (dependencies needed to execute it)
ancestors = esg.get_ancestors("step_2", variable_names={'y'})
print(f"Ancestors of step_2: {ancestors}") # ['step_1']
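For intuition about how read/write analysis of this kind can work, here is a sketch built on the standard-library `ast` module. The helpers `toy_reads`, `toy_writes`, and `toy_infer_edges` are hypothetical and are not the library's `get_reads`, `get_writes`, or `infer_edges`. This simplified version looks only at plain variable names and ignores names bound by `def`, `class`, and `import`.

```python
import ast


def toy_writes(source):
    """Names assigned anywhere in the source (Store context)."""
    tree = ast.parse(source)
    return {
        node.id
        for node in ast.walk(tree)
        if isinstance(node, ast.Name) and isinstance(node.ctx, ast.Store)
    }


def toy_reads(source):
    """Names loaded anywhere in the source (Load context)."""
    tree = ast.parse(source)
    return {
        node.id
        for node in ast.walk(tree)
        if isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load)
    }


def toy_infer_edges(steps):
    """Link each step to the most recent earlier step that wrote a name it reads."""
    last_writer = {}
    edges = []
    for step_id, source in steps:
        for name in sorted(toy_reads(source)):
            writer = last_writer.get(name)
            if writer is not None and (writer, step_id) not in edges:
                edges.append((writer, step_id))
        # Register this step's writes only after resolving its reads,
        # so a step never depends on itself.
        for name in toy_writes(source):
            last_writer[name] = step_id
    return edges


toy_edges = toy_infer_edges([
    ("step_1", "x = 10; y = x + 5"),
    ("step_2", "z = y * 2"),
])
print(toy_edges)
```

Tracking only the most recent writer of each name means a step depends on the definition it would actually observe at run time, not on every earlier step that ever assigned that name.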
Example 3: Diagnose Failures and Generate Patches
from agent_harness.esg import ExecutionStateGraph
from agent_harness.executor import Executor
from agent_harness.feedback_interpreter import diagnose
from agent_harness.patch_generator import generate_patch
# Setup: Build a graph with a failing step
esg = ExecutionStateGraph()
executor = Executor()
step1_source = "x = 10"
step1_id = esg.add_step("step_1", step1_source)
result1 = executor.execute("step_1", step1_source)
esg.record_output("step_1", result1.namespace)
# Step 2 has an error (undefined variable)
step2_source = "y = undefined_variable + 5"
step2_id = esg.add_step("step_2", step2_source)
esg.add_edge("step_1", "step_2")
result2 = executor.execute("step_2", step2_source)
esg.record_output("step_2", result2.namespace)
print(f"Step 2 success: {result2.success}") # False
print(f"Step 2 error: {result2.error}") # NameError details
# Diagnose the failure
diagnosis = diagnose(result2, esg)
print(f"Diagnosis: {diagnosis.issue}")
print(f"Affected steps: {diagnosis.affected_steps}")
# Generate a patch
patch = generate_patch(
    diagnosis=diagnosis,
    esg=esg,
    executor=executor,
    step_id="step_2",
)
print(f"Patch suggestion: {patch.patched_source}")
# Mark step as stale and replay affected subgraph
esg.mark_stale("step_2")
steps_to_replay = esg.replay_from("step_2")
print(f"Steps to replay: {steps_to_replay}")
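Selective re-execution comes down to two graph operations: finding everything downstream of a changed step, and ordering those steps so each runs after its dependencies. The sketch below illustrates the idea with the standard-library `graphlib` module (Python 3.9+). `descendants` and `replay_order` are hypothetical helpers, and the actual behavior of `mark_stale` and `replay_from` may differ.

```python
from graphlib import TopologicalSorter


def descendants(children, start):
    """Collect every step downstream of start by following edges forward."""
    seen, stack = set(), list(children.get(start, ()))
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(children.get(current, ()))
    return seen


def replay_order(edges, changed_step):
    """Return the changed step and its dependents in a dependency-respecting order."""
    children, parents = {}, {}
    for src, dst in edges:
        children.setdefault(src, set()).add(dst)
        parents.setdefault(dst, set()).add(src)
    stale = descendants(children, changed_step) | {changed_step}
    # Restrict the graph to stale steps; fresh ancestors keep their cached outputs.
    subgraph = {step: parents.get(step, set()) & stale for step in stale}
    return list(TopologicalSorter(subgraph).static_order())


toy_edges = [
    ("load", "clean"),
    ("clean", "fit"),
    ("clean", "plot"),
    ("load", "summary"),
]
order = replay_order(toy_edges, "clean")
print(order)
```

In this toy graph, changing `clean` leaves `load` and `summary` untouched, which is the saving that selective replay is meant to provide over re-running every step.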
Key Classes and Functions
ExecutionStateGraph
- **`add_step(step_id, source)`**: Add a code generation step to the graph
- **`record_output(step_id, namespace)`**: Record execution output for a step
- **`add_edge(src_step_id, dst_step_id)`**: Add a dependency edge
- **`get_ancestors(step_id, variable_names)`**: Get all dependency steps needed to execute a step
- **`mark_stale(step_id)`**: Mark a step and its dependents as stale
- **`replay_from(step_id