Build a Self-Improving AI Agent in Python: Walkthrough

Engineering


By Oxagen Team

  • Python
  • AI Agents
  • Tutorial
  • Self-Improvement
  • Code
  • LangGraph

This walkthrough builds a minimal but complete self-improving agent in Python: a LangGraph reasoning loop, a typed memory store via the Oxagen API, a reflection mechanism with a real verifier, and a nightly benchmark harness. Every component is wired together. The result is an agent that measurably improves on verifiable tasks across runs.

The implementation prioritizes correctness over brevity. Code that looks clean but silently degrades is the primary failure mode in production agents. Every design decision here is explained.

Prerequisites

pip install langgraph langchain-anthropic anthropic httpx pytest jsonschema

You will need an Anthropic API key and an Oxagen API key. Get the Oxagen key at oxagen.ai/guide.

import os
ANTHROPIC_KEY = os.environ["ANTHROPIC_KEY"]
OXAGEN_KEY = os.environ["OXAGEN_KEY"]
WORKSPACE_ID = os.environ["OXAGEN_WORKSPACE_ID"]

Step 1: The memory client

Before the reasoning loop, we need a memory client. This wraps the Oxagen API and exposes three operations: write a fact, query by text, and retrieve facts about a named entity.

import httpx
from typing import Any

OXAGEN_BASE = "https://api.oxagen.ai/v1"

class MemoryClient:
    """Typed memory store backed by the Oxagen knowledge graph.

    Wraps write, query, and entity retrieval operations.
    All operations are scoped to workspace_id.

    Args:
        api_key: Oxagen API key.
        workspace_id: Target workspace ID.
    """

    def __init__(self, api_key: str, workspace_id: str) -> None:
        self._headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        self._workspace_id = workspace_id

    def write_observation(
        self,
        content: str,
        entity_name: str | None = None,
        observation_type: str = "fact",
        confidence: float = 0.9,
    ) -> dict[str, Any]:
        """Write a typed observation to the workspace knowledge graph.

        Args:
            content: The observation text.
            entity_name: Optional entity this observation is about.
            observation_type: One of 'fact', 'episode', 'preference'.
            confidence: Extraction confidence, 0.0–1.0.

        Returns:
            Created observation record from the API.
        """
        payload: dict[str, Any] = {
            "workspace_id": self._workspace_id,
            "content": content,
            "observation_type": observation_type,
            "confidence": confidence,
        }
        if entity_name:
            payload["entity_name"] = entity_name

        resp = httpx.post(
            f"{OXAGEN_BASE}/memory/observations",
            json=payload,
            headers=self._headers,
        )
        resp.raise_for_status()
        return resp.json()

    def query(self, text: str, top_k: int = 5) -> list[dict[str, Any]]:
        """Semantic query over workspace memory.

        Args:
            text: Query string for hybrid retrieval.
            top_k: Maximum number of observations to return.

        Returns:
            List of observation records, ranked by relevance.
        """
        resp = httpx.post(
            f"{OXAGEN_BASE}/memory/query",
            json={
                "workspace_id": self._workspace_id,
                "query": text,
                "top_k": top_k,
            },
            headers=self._headers,
        )
        resp.raise_for_status()
        return resp.json().get("observations", [])

    def get_entity(self, name: str) -> dict[str, Any] | None:
        """Retrieve an entity and its observations by name.

        Args:
            name: Entity name or alias.

        Returns:
            Entity record with observations, or None if not found.
        """
        resp = httpx.get(
            f"{OXAGEN_BASE}/memory/entities",
            params={"workspace_id": self._workspace_id, "name": name},
            headers=self._headers,
        )
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        return resp.json()

Step 2: The reasoning loop

The reasoning loop is a LangGraph state graph. Nodes are Python functions. State carries the current task, the retrieved memory, the agent's output, and any critique from the reflection node.

from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage
from typing import TypedDict

llm = ChatAnthropic(
    model="claude-opus-4-1",
    api_key=ANTHROPIC_KEY,
)
memory = MemoryClient(OXAGEN_KEY, WORKSPACE_ID)


class AgentState(TypedDict):
    """State carried through the reasoning graph."""
    task: str
    context: str          # Retrieved memory
    output: str           # Agent's latest output
    critique: str         # Reflection node output (empty if none)
    iteration: int        # Reflection iteration count
    final: bool           # Whether this is the final output


def retrieve_node(state: AgentState) -> AgentState:
    """Retrieve relevant memory for the current task.

    Args:
        state: Current graph state.

    Returns:
        Updated state with context populated from memory.
    """
    observations = memory.query(state["task"], top_k=5)
    if observations:
        context = "\n".join(
            f"- {obs['content']} (confidence: {obs.get('confidence', 1.0):.2f})"
            for obs in observations
        )
    else:
        context = "No relevant memory found."
    return {**state, "context": context}


def generate_node(state: AgentState) -> AgentState:
    """Generate an output given the task and retrieved context.

    Args:
        state: Current graph state.

    Returns:
        Updated state with output populated.
    """
    prior_critique = (
        f"\n\nPrior critique to address:\n{state['critique']}"
        if state["critique"]
        else ""
    )

    messages = [
        SystemMessage(
            content="You are a precise, helpful assistant. Answer based on the "
                    "provided context. If the context is insufficient, say so "
                    "explicitly rather than guessing. Respond with a single JSON "
                    "object with keys 'answer' (string) and 'confidence' (number "
                    "from 0 to 1), plus an optional 'sources' list of strings."
        ),
        HumanMessage(
            content=f"Task: {state['task']}\n\n"
                    f"Relevant memory:\n{state['context']}"
                    f"{prior_critique}\n\n"
                    "Provide a precise, well-reasoned answer."
        ),
    ]

    response = llm.invoke(messages)
    return {**state, "output": response.content, "critique": ""}


def write_memory_node(state: AgentState) -> AgentState:
    """Write the final output as an observation to memory.

    Only writes on the final iteration to avoid polluting memory
    with intermediate reflection outputs.

    Args:
        state: Current graph state.

    Returns:
        State unchanged (side effect: observation written to memory).
    """
    if state["final"]:
        memory.write_observation(
            content=f"Task: {state['task']} | Answer: {state['output']}",
            observation_type="episode",
            confidence=0.85,
        )
    return state

Step 3: The reflection node with a real verifier

This is the critical design decision. The verifier runs before the critique. If the verifier passes, reflection is skipped. If it fails, the agent receives a structured critique.

The verifier used here is schema validation — the agent's output must parse as JSON matching a target schema. On code tasks, swap this for a test-runner verifier. On math tasks, check the answer against ground truth.

import json
from jsonschema import validate, ValidationError

ANSWER_SCHEMA = {
    "type": "object",
    "properties": {
        "answer": {"type": "string", "minLength": 1},
        "confidence": {"type": "number", "minimum": 0, "maximum": 1},
        "sources": {
            "type": "array",
            "items": {"type": "string"},
        },
    },
    "required": ["answer", "confidence"],
}

MAX_REFLECTION_ITERATIONS = 2


def verify_output(output: str) -> tuple[bool, str]:
    """Verify the agent's output against the answer schema.

    Args:
        output: Agent output string to verify.

    Returns:
        Tuple of (passes_verification, failure_reason).
        failure_reason is empty string if verification passes.
    """
    try:
        parsed = json.loads(output)
        validate(instance=parsed, schema=ANSWER_SCHEMA)
        return True, ""
    except json.JSONDecodeError as exc:
        return False, f"Output is not valid JSON: {exc}"
    except ValidationError as exc:
        return False, f"Schema validation failed: {exc.message}"


def reflection_node(state: AgentState) -> AgentState:
    """Reflect on the output if it fails verification.

    Only runs LLM critique if deterministic verification fails.
    This prevents the reflection collapse failure mode.

    Args:
        state: Current graph state.

    Returns:
        Updated state with critique, or final=True if verification passes.
    """
    passes, failure_reason = verify_output(state["output"])

    if passes or state["iteration"] >= MAX_REFLECTION_ITERATIONS:
        return {**state, "final": True}

    messages = [
        SystemMessage(
            content="You are a precise code reviewer. Identify the specific "
                    "issue with the provided output and give a short, actionable "
                    "correction. Do not rewrite the output."
        ),
        HumanMessage(
            content=f"Output that failed verification:\n{state['output']}\n\n"
                    f"Verification failure reason:\n{failure_reason}\n\n"
                    "What specifically needs to be fixed?"
        ),
    ]

    critique_response = llm.invoke(messages)
    return {
        **state,
        "critique": critique_response.content,
        "iteration": state["iteration"] + 1,
        "final": False,
    }


def should_reflect(state: AgentState) -> str:
    """Conditional edge: continue to reflection or end.

    Args:
        state: Current graph state.

    Returns:
        "reflect" to continue, "end" to terminate.
    """
    return "end" if state["final"] else "reflect"
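
Schema validation is only one choice of verifier. For math-style tasks with a known result, a ground-truth check is a drop-in replacement. A minimal sketch — the function name, tolerance, and numeric-answer convention are illustrative assumptions, not part of any API above:

```python
import json


def verify_against_ground_truth(
    output: str, expected: float, tol: float = 1e-6
) -> tuple[bool, str]:
    """Hypothetical verifier: compare the agent's numeric answer to a known result.

    Keeps the same (passes, failure_reason) contract as verify_output, so
    the reflection node can consume its failure reason unchanged.
    """
    try:
        parsed = json.loads(output)
        answer = float(parsed["answer"])
    except (json.JSONDecodeError, KeyError, TypeError, ValueError) as exc:
        return False, f"Could not extract a numeric answer: {exc}"
    if abs(answer - expected) <= tol:
        return True, ""
    return False, f"Answer {answer} does not match expected {expected}"
```

Because the return shape matches, swapping verifiers requires no change to the graph wiring.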

Step 4: Wire the graph

def build_agent() -> StateGraph:
    """Build and compile the self-improving agent graph.

    Returns:
        Compiled LangGraph ready for invocation.
    """
    graph = StateGraph(AgentState)

    graph.add_node("retrieve", retrieve_node)
    graph.add_node("generate", generate_node)
    graph.add_node("reflect", reflection_node)
    graph.add_node("write_memory", write_memory_node)

    graph.set_entry_point("retrieve")
    graph.add_edge("retrieve", "generate")
    graph.add_edge("generate", "reflect")
    graph.add_conditional_edges(
        "reflect",
        should_reflect,
        {"reflect": "generate", "end": "write_memory"},
    )
    graph.add_edge("write_memory", END)

    return graph.compile()


agent = build_agent()


def run_task(task: str) -> str:
    """Run a single task through the agent.

    Args:
        task: Natural language task string.

    Returns:
        Agent's final output string.
    """
    initial_state: AgentState = {
        "task": task,
        "context": "",
        "output": "",
        "critique": "",
        "iteration": 0,
        "final": False,
    }
    result = agent.invoke(initial_state)
    return result["output"]

Step 5: The benchmark harness

The benchmark harness runs the agent against a stable task set and logs the results. Run it nightly via a cron job or CI schedule.

import json
import datetime
from pathlib import Path

BENCHMARK_TASKS = [
    {
        "id": "T001",
        "task": 'Return JSON: {"answer": "The primary database is PostgreSQL 15.", '
                '"confidence": 0.9}',
        "verifier": "schema",
    },
    {
        "id": "T002",
        "task": 'Return JSON: {"answer": "The deployment runs on Cloud Run.", '
                '"confidence": 0.85, "sources": ["infra-notes"]}',
        "verifier": "schema",
    },
]

RESULTS_LOG = Path("benchmark_results.jsonl")


def run_benchmark() -> dict:
    """Run the full benchmark suite and log results.

    Returns:
        Summary dict with timestamp, pass_rate, and per-task results.
    """
    results = []
    for task_def in BENCHMARK_TASKS:
        output = run_task(task_def["task"])
        passes, reason = verify_output(output)
        results.append({
            "task_id": task_def["id"],
            "passes": passes,
            "failure_reason": reason if not passes else "",
        })

    pass_rate = sum(1 for r in results if r["passes"]) / len(results)
    summary = {
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "pass_rate": round(pass_rate, 3),
        "results": results,
    }

    with RESULTS_LOG.open("a") as f:
        f.write(json.dumps(summary) + "\n")

    print(f"Benchmark complete — pass rate: {pass_rate:.1%}")
    return summary


if __name__ == "__main__":
    run_benchmark()

Schedule this as a nightly cron job:

# crontab -e
0 2 * * * cd /path/to/agent && python benchmark.py
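
The nightly log is only useful if you read it back. A small reader for the JSONL results file can surface the pass-rate trend — this helper is a suggestion, not part of the harness above; the default file name matches RESULTS_LOG:

```python
import json
from pathlib import Path


def pass_rate_trend(log_path: str = "benchmark_results.jsonl") -> list[tuple[str, float]]:
    """Return (timestamp, pass_rate) pairs from the benchmark log, oldest first."""
    trend: list[tuple[str, float]] = []
    for line in Path(log_path).read_text().splitlines():
        if not line.strip():
            continue  # tolerate blank lines in the log
        entry = json.loads(line)
        trend.append((entry["timestamp"], entry["pass_rate"]))
    return trend
```

A pass rate that declines across runs is the signal to inspect recent memory writes for polluted episodes.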

What this agent does

On each task invocation, the agent:

  1. Queries workspace memory for relevant context
  2. Generates an output conditioned on the task and context
  3. Runs schema validation on the output (the deterministic verifier)
  4. If validation fails and the iteration budget is not exhausted, generates a targeted critique and retries
  5. On the final iteration, writes the task and output as an episode to memory

Over time, the memory store accumulates task-output episodes. Future similar tasks retrieve these episodes as context, and the agent's outputs improve because the context is richer. This is the memory-accumulation mechanism.

The reflection loop only fires on schema validation failures — this is the key design choice. Reflection without a verifier is expensive theater. Reflection with a schema validator is a real quality gate.

FAQ

What Python version does this require?

Python 3.10+. The union type hint syntax (str | None) requires 3.10; everything else runs on 3.9 if you rewrite those annotations with Optional.

Can I use a different LLM provider?

Yes. Replace ChatAnthropic with any LangChain-compatible LLM. The rest of the code is provider-agnostic.

How do I use a test-runner verifier instead of schema validation?

Replace the verify_output function with one that runs your test suite. Return (True, "") if tests pass, (False, failure_output) with the test failure output as the reason. The reflection node passes the failure reason to the critique step.
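
One possible shape for that test-runner verifier, invoking pytest as a subprocess — the function name and flags here are a sketch to adapt to your suite, and it assumes pytest is installed (it is in the prerequisites):

```python
import subprocess
import sys


def verify_with_tests(test_dir: str) -> tuple[bool, str]:
    """Sketch of a test-runner verifier: suite pass/fail is the signal.

    Keeps the same (passes, failure_reason) contract as verify_output.
    """
    proc = subprocess.run(
        [sys.executable, "-m", "pytest", test_dir, "-q"],
        capture_output=True,
        text=True,
    )
    if proc.returncode == 0:
        return True, ""
    # Hand the raw test output to the reflection step as the failure reason.
    return False, proc.stdout + proc.stderr
```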

How does memory accumulation produce improvement?

Each completed task writes an episode to memory. Future tasks with similar phrasing retrieve these episodes as context. The agent has examples of prior correct answers to condition on. This is the Reflexion-style cross-episode learning mechanism — without the parameter updates.

What if the agent writes wrong answers to memory?

Set confidence=0.85 or lower for agent-generated episodes and implement contradiction detection in the memory write step. The Oxagen API supports confidence-threshold filtering on retrieval — low-confidence observations can be filtered out of context when higher-confidence facts cover the same ground.
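
Client-side, that filtering is a one-liner applied to query results before they reach the prompt. A minimal version — the 0.7 threshold is an arbitrary example, not an Oxagen default:

```python
from typing import Any


def filter_context(
    observations: list[dict[str, Any]], min_confidence: float = 0.7
) -> list[dict[str, Any]]:
    """Drop low-confidence observations; missing confidence defaults to 1.0."""
    return [o for o in observations if o.get("confidence", 1.0) >= min_confidence]
```

Calling this on the result of memory.query inside retrieve_node keeps uncertain agent-written episodes out of the context window.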

Further reading


Oxagen is the ontology layer for AI agents — a typed, Neo4j-backed knowledge graph with an MCP-native API that agents like the one above connect to directly. Read the docs to get an API key, or book a demo for production deployment guidance.