
Technical Plan: Implementing OpenClaw Agent Core Features in Python

Version: v1.0
Created: 2026-03-17
Author: 小白 🐶


1. Project Overview

1.1 Goal

Implement the core features of the OpenClaw Agent in Python, building a lightweight, easily extensible AI agent framework.

1.2 Core Feature Scope

Module               OpenClaw implementation                 Priority
Agent run loop       pi-embedded-runner/run.ts               P0 - core
Tool system          pi-tools.ts                             P0 - core
Session management   SessionManager                          P0 - core
LLM client           multi-provider support                  P0 - core
Context management   context window monitoring/compaction    P1 - important
Failover             auth rotation / model fallback          P1 - important
Memory system        vector store / memory retrieval         P2 - optional
Plugin system        channel/tool/hook plugins               P2 - optional
Gateway service      WebSocket/HTTP server                   P3 - later

2. Architecture Analysis

2.1 OpenClaw Agent Core Architecture

┌─────────────────────────────────────────────────────────────────┐
│                     Agent run loop                              │
│  runEmbeddedPiAgent() → main loop                               │
│    ├── resolve auth profiles (resolveAuthProfileOrder)          │
│    ├── init context engines (ensureContextEnginesInitialized)   │
│    ├── build payloads (buildEmbeddedRunPayloads)                │
│    └── attempt loop (runEmbeddedAttempt)                        │
│         ├── model call (streamSimple)                           │
│         ├── tool execution (executeTool)                        │
│         └── result handling (handleResult)                      │
├─────────────────────────────────────────────────────────────────┤
│                     Tool system                                 │
│  AnyAgentTool = { name, description, parameters, execute }      │
│    ├── file tools (read, write, edit)                           │
│    ├── shell tools (exec, process)                              │
│    ├── browser tool (browser)                                   │
│    ├── message tool (message)                                   │
│    └── custom tools (skills)                                    │
├─────────────────────────────────────────────────────────────────┤
│                     Session management                          │
│  SessionManager                                                 │
│    ├── session history (messages)                               │
│    ├── tool call records (tool_calls)                           │
│    ├── context compaction (compaction)                          │
│    └── persistent storage (session_file)                        │
├─────────────────────────────────────────────────────────────────┤
│                     LLM client                                  │
│  Multi-provider support                                         │
│    ├── OpenAI (GPT-4, GPT-4o)                                   │
│    ├── Anthropic (Claude)                                       │
│    ├── Google (Gemini)                                          │
│    ├── local models (Ollama)                                    │
│    └── other providers                                          │
└─────────────────────────────────────────────────────────────────┘

2.2 Core Data Structures

// Agent run parameters
interface RunEmbeddedPiAgentParams {
  sessionId: string;
  sessionFile: string;
  workspaceDir: string;
  prompt: string;
  images?: ImageContent[];
  provider?: string;
  model?: string;
  timeoutMs: number;
  abortSignal?: AbortSignal;
  tools?: AnyAgentTool[];
}

// Agent run result
interface EmbeddedPiRunResult {
  payloads?: Array<{
    text?: string;
    mediaUrl?: string;
    isError?: boolean;
  }>;
  meta: {
    durationMs: number;
    agentMeta?: {
      sessionId: string;
      provider: string;
      model: string;
      usage?: { input: number; output: number; total: number };
    };
    error?: { kind: string; message: string };
  };
}

// Tool definition
interface AgentTool<TParams, TResult> {
  name: string;
  description: string;
  parameters: JSONSchema;
  execute: (params: TParams, context: ToolContext) => Promise<TResult>;
}

3. Python Implementation Plan

3.1 Technology Choices

Component        Python choice           Rationale
Async framework  asyncio + aiohttp       native support, good performance
LLM client       openai SDK              official, compatible with many providers
Data validation  Pydantic v2             type safety, excellent performance
Configuration    pydantic-settings       environment variable support
JSON Schema      jsonschema / pydantic   tool parameter validation
Filesystem       aiofiles                async file I/O
Processes        asyncio.subprocess      shell tool implementation
Vector store     chromadb / faiss        memory system (optional)

3.2 Project Layout

py_agent/
├── src/
│   ├── agent/
│   │   ├── __init__.py
│   │   ├── runner.py          # agent run loop
│   │   ├── context.py         # context management
│   │   ├── failover.py        # failover handling
│   │   └── types.py           # type definitions
│   ├── tools/
│   │   ├── __init__.py
│   │   ├── base.py            # tool base classes
│   │   ├── file_tools.py      # file tools
│   │   ├── shell_tools.py     # shell tools
│   │   ├── browser_tools.py   # browser tools
│   │   └── registry.py        # tool registry
│   ├── session/
│   │   ├── __init__.py
│   │   ├── manager.py         # session manager
│   │   ├── history.py         # history tracking
│   │   ├── compaction.py      # context compaction
│   │   └── storage.py         # persistent storage
│   ├── llm/
│   │   ├── __init__.py
│   │   ├── client.py          # LLM client base class
│   │   ├── openai_client.py   # OpenAI client
│   │   ├── anthropic_client.py # Anthropic client
│   │   ├── gemini_client.py   # Gemini client
│   │   └── ollama_client.py   # Ollama client
│   ├── memory/
│   │   ├── __init__.py
│   │   ├── store.py           # memory store
│   │   ├── embedder.py        # vector embeddings
│   │   └── retriever.py       # memory retrieval
│   ├── config/
│   │   ├── __init__.py
│   │   ├── settings.py        # settings management
│   │   └── auth.py            # auth configuration
│   └── utils/
│       ├── __init__.py
│       ├── logging.py         # logging helpers
│       └── retry.py           # retry helpers
├── examples/
│   ├── simple_agent.py        # minimal example
│   ├── tool_agent.py          # tool-calling example
│   └── multi_model.py         # multi-model example
├── tests/
│   ├── test_agent.py
│   ├── test_tools.py
│   └── test_session.py
├── pyproject.toml
├── requirements.txt
└── README.md

3.3 Core Implementation

3.3.1 Type definitions (types.py)

from typing import Optional, List, Dict, Any, Literal
from pydantic import BaseModel, Field
from enum import Enum

class MessageRole(str, Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"

class ImageContent(BaseModel):
    type: Literal["image"]
    url: Optional[str] = None
    base64: Optional[str] = None
    mime_type: Optional[str] = None

class ContentBlock(BaseModel):
    type: Literal["text", "image"]
    text: Optional[str] = None
    image: Optional[ImageContent] = None

class Message(BaseModel):
    role: MessageRole
    content: str | List[ContentBlock]
    tool_calls: Optional[List[Dict[str, Any]]] = None
    tool_call_id: Optional[str] = None

class ToolParameter(BaseModel):
    type: str
    description: Optional[str] = None
    enum: Optional[List[str]] = None
    default: Optional[Any] = None

class ToolDefinition(BaseModel):
    name: str
    description: str
    parameters: Dict[str, ToolParameter]
    required: List[str] = Field(default_factory=list)

class ToolResult(BaseModel):
    tool_call_id: str
    name: str
    result: Any
    is_error: bool = False

class Usage(BaseModel):
    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0
    total_tokens: int = 0

class AgentMeta(BaseModel):
    session_id: str
    provider: str
    model: str
    usage: Optional[Usage] = None
    prompt_tokens: Optional[int] = None

class AgentResult(BaseModel):
    text: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    is_error: bool = False
    meta: AgentMeta
    duration_ms: int = 0

class RunParams(BaseModel):
    session_id: str
    prompt: str
    images: Optional[List[ImageContent]] = None
    provider: Optional[str] = None
    model: Optional[str] = None
    timeout_ms: int = 300000
    max_retries: int = 3
    tools: Optional[List[ToolDefinition]] = None

3.3.2 Tool base classes (tools/base.py)

from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from ..agent.types import ToolDefinition, ToolResult

class ToolContext:
    """Execution context passed to tools."""
    def __init__(
        self,
        session_id: str,
        workspace_dir: str,
        abort_signal: Optional[Any] = None,
    ):
        self.session_id = session_id
        self.workspace_dir = workspace_dir
        self.abort_signal = abort_signal

class BaseTool(ABC):
    """Base class for tools."""
    
    @property
    @abstractmethod
    def name(self) -> str:
        """Tool name."""
        pass
    
    @property
    @abstractmethod
    def description(self) -> str:
        """Tool description."""
        pass
    
    @property
    @abstractmethod
    def parameters(self) -> Dict[str, Any]:
        """Parameter JSON Schema."""
        pass
    
    @property
    def required(self) -> list[str]:
        """Required parameter names."""
        return []
    
    def to_definition(self) -> ToolDefinition:
        """Convert to a ToolDefinition."""
        return ToolDefinition(
            name=self.name,
            description=self.description,
            parameters=self.parameters,
            required=self.required,
        )
    
    @abstractmethod
    async def execute(
        self,
        params: Dict[str, Any],
        context: ToolContext,
    ) -> Any:
        """Execute the tool."""
        pass

class ToolRegistry:
    """Tool registry."""
    
    def __init__(self):
        self._tools: Dict[str, BaseTool] = {}
    
    def register(self, tool: BaseTool) -> None:
        """Register a tool."""
        self._tools[tool.name] = tool
    
    def get(self, name: str) -> Optional[BaseTool]:
        """Look up a tool by name."""
        return self._tools.get(name)
    
    def list_tools(self) -> list[ToolDefinition]:
        """List all tool definitions."""
        return [tool.to_definition() for tool in self._tools.values()]
    
    async def execute(
        self,
        name: str,
        params: Dict[str, Any],
        context: ToolContext,
    ) -> Any:
        """Execute a tool by name."""
        tool = self.get(name)
        if not tool:
            raise ValueError(f"Tool not found: {name}")
        return await tool.execute(params, context)
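The register-then-dispatch flow above can be sketched end to end. The snippet below is self-contained (it re-declares trimmed stand-ins for ToolContext, BaseTool, and ToolRegistry rather than importing them) and uses a hypothetical EchoTool to show the dispatch path:

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict

# Standalone, simplified mirrors of the classes above.
class ToolContext:
    def __init__(self, session_id: str, workspace_dir: str):
        self.session_id = session_id
        self.workspace_dir = workspace_dir

class BaseTool(ABC):
    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    async def execute(self, params: Dict[str, Any], context: ToolContext) -> Any: ...

class EchoTool(BaseTool):
    """A trivial tool: returns its input, prefixed with the session id."""
    @property
    def name(self) -> str:
        return "echo"

    async def execute(self, params: Dict[str, Any], context: ToolContext) -> str:
        return f"[{context.session_id}] {params['text']}"

class ToolRegistry:
    def __init__(self):
        self._tools: Dict[str, BaseTool] = {}

    def register(self, tool: BaseTool) -> None:
        self._tools[tool.name] = tool

    async def execute(self, name: str, params: Dict[str, Any], context: ToolContext) -> Any:
        tool = self._tools.get(name)
        if not tool:
            raise ValueError(f"Tool not found: {name}")
        return await tool.execute(params, context)

registry = ToolRegistry()
registry.register(EchoTool())
ctx = ToolContext(session_id="s1", workspace_dir=".")
result = asyncio.run(registry.execute("echo", {"text": "hi"}, ctx))
print(result)  # [s1] hi
```

This is the same lookup-by-name dispatch the runner relies on in section 3.3.8: the LLM emits a tool name plus arguments, and the registry routes them to the matching `execute` coroutine.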

3.3.3 File tools (tools/file_tools.py)

import aiofiles
from pathlib import Path
from typing import Any, Dict
from .base import BaseTool, ToolContext

class ReadTool(BaseTool):
    """File read tool."""
    
    @property
    def name(self) -> str:
        return "read"
    
    @property
    def description(self) -> str:
        return "Read the contents of a file. Supports text files and images."
    
    @property
    def parameters(self) -> Dict[str, Any]:
        return {
            "path": {
                "type": "string",
                "description": "Path to the file to read",
            },
            "offset": {
                "type": "integer",
                "description": "Line number to start reading from",
            },
            "limit": {
                "type": "integer",
                "description": "Maximum number of lines to read",
            },
        }
    
    @property
    def required(self) -> list[str]:
        return ["path"]
    
    async def execute(
        self,
        params: Dict[str, Any],
        context: ToolContext,
    ) -> str:
        path = Path(context.workspace_dir) / params["path"]
        
        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        
        offset = params.get("offset", 1)
        limit = params.get("limit", 2000)
        
        async with aiofiles.open(path, "r") as f:
            lines = []
            line_no = 0
            # aiofiles handles are async iterators, so iterate with async for
            async for line in f:
                line_no += 1
                if line_no >= offset:
                    lines.append(line)
                if len(lines) >= limit:
                    break
            return "".join(lines)

class WriteTool(BaseTool):
    """File write tool."""
    
    @property
    def name(self) -> str:
        return "write"
    
    @property
    def description(self) -> str:
        return "Write content to a file. Creates the file if it doesn't exist."
    
    @property
    def parameters(self) -> Dict[str, Any]:
        return {
            "path": {
                "type": "string",
                "description": "Path to the file to write",
            },
            "content": {
                "type": "string",
                "description": "Content to write to the file",
            },
        }
    
    @property
    def required(self) -> list[str]:
        return ["path", "content"]
    
    async def execute(
        self,
        params: Dict[str, Any],
        context: ToolContext,
    ) -> str:
        path = Path(context.workspace_dir) / params["path"]
        content = params["content"]
        
        # Create parent directories if needed
        path.parent.mkdir(parents=True, exist_ok=True)
        
        async with aiofiles.open(path, "w") as f:
            await f.write(content)
        
        return f"Successfully wrote {len(content)} characters to {path}"

class EditTool(BaseTool):
    """File edit tool."""
    
    @property
    def name(self) -> str:
        return "edit"
    
    @property
    def description(self) -> str:
        return "Edit a file by replacing exact text."
    
    @property
    def parameters(self) -> Dict[str, Any]:
        return {
            "path": {
                "type": "string",
                "description": "Path to the file to edit",
            },
            "oldText": {
                "type": "string",
                "description": "Exact text to find and replace",
            },
            "newText": {
                "type": "string",
                "description": "New text to replace with",
            },
        }
    
    @property
    def required(self) -> list[str]:
        return ["path", "oldText", "newText"]
    
    async def execute(
        self,
        params: Dict[str, Any],
        context: ToolContext,
    ) -> str:
        path = Path(context.workspace_dir) / params["path"]
        old_text = params["oldText"]
        new_text = params["newText"]
        
        async with aiofiles.open(path, "r") as f:
            content = await f.read()
        
        if old_text not in content:
            raise ValueError(f"Text not found in file: {old_text[:50]}...")
        
        new_content = content.replace(old_text, new_text, 1)
        
        async with aiofiles.open(path, "w") as f:
            await f.write(new_content)
        
        return f"Successfully edited {path}"

3.3.4 Shell tool (tools/shell_tools.py)

import asyncio
from typing import Any, Dict
from .base import BaseTool, ToolContext

class ExecTool(BaseTool):
    """Shell command execution tool."""
    
    @property
    def name(self) -> str:
        return "exec"
    
    @property
    def description(self) -> str:
        return "Execute shell commands."
    
    @property
    def parameters(self) -> Dict[str, Any]:
        return {
            "command": {
                "type": "string",
                "description": "Shell command to execute",
            },
            "timeout": {
                "type": "integer",
                "description": "Timeout in seconds",
            },
            "cwd": {
                "type": "string",
                "description": "Working directory",
            },
        }
    
    @property
    def required(self) -> list[str]:
        return ["command"]
    
    async def execute(
        self,
        params: Dict[str, Any],
        context: ToolContext,
    ) -> Dict[str, Any]:
        command = params["command"]
        timeout = params.get("timeout", 60)
        cwd = params.get("cwd", context.workspace_dir)
        
        try:
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=cwd,
            )
            
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout,
            )
            
            return {
                "exit_code": process.returncode,
                "stdout": stdout.decode("utf-8", errors="replace"),
                "stderr": stderr.decode("utf-8", errors="replace"),
            }
        except asyncio.TimeoutError:
            process.kill()
            await process.wait()  # reap the killed process
            raise TimeoutError(f"Command timed out after {timeout}s")

3.3.5 LLM client base (llm/client.py)

from abc import ABC, abstractmethod
from typing import AsyncIterator, List, Optional, Dict, Any
from ..agent.types import Message, ToolDefinition, Usage

class LLMClient(ABC):
    """LLM 客户端基类"""
    
    def __init__(
        self,
        model: str,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
    ):
        self.model = model
        self.api_key = api_key
        self.base_url = base_url
    
    @abstractmethod
    async def chat(
        self,
        messages: List[Message],
        tools: Optional[List[ToolDefinition]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """发送聊天请求"""
        pass
    
    @abstractmethod
    async def stream_chat(
        self,
        messages: List[Message],
        tools: Optional[List[ToolDefinition]] = None,
        **kwargs,
    ) -> AsyncIterator[Dict[str, Any]]:
        """流式聊天请求"""
        pass
    
    @abstractmethod
    def count_tokens(self, text: str) -> int:
        """计算 token 数量"""
        pass

3.3.6 OpenAI client (llm/openai_client.py)

from typing import AsyncIterator, List, Optional, Dict, Any
from openai import AsyncOpenAI
from .client import LLMClient
from ..agent.types import Message, ToolDefinition

class OpenAIClient(LLMClient):
    """OpenAI client."""
    
    def __init__(
        self,
        model: str = "gpt-4o",
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
    ):
        super().__init__(model, api_key, base_url)
        self.client = AsyncOpenAI(
            api_key=api_key,
            base_url=base_url,
        )
    
    def _convert_messages(self, messages: List[Message]) -> List[Dict[str, Any]]:
        """Convert to the OpenAI message format."""
        result = []
        for msg in messages:
            item = {"role": msg.role.value}
            if isinstance(msg.content, str):
                item["content"] = msg.content
            else:
                # Multimodal content
                content = []
                for block in msg.content:
                    if block.type == "text":
                        content.append({"type": "text", "text": block.text})
                    elif block.type == "image" and block.image:
                        content.append({
                            "type": "image_url",
                            "image_url": {"url": block.image.url or f"data:{block.image.mime_type};base64,{block.image.base64}"},
                        })
                item["content"] = content
            
            if msg.tool_calls:
                item["tool_calls"] = msg.tool_calls
            if msg.tool_call_id:
                item["tool_call_id"] = msg.tool_call_id
            
            result.append(item)
        return result
    
    def _convert_tools(self, tools: Optional[List[ToolDefinition]]) -> Optional[List[Dict[str, Any]]]:
        """Convert to the OpenAI tool format."""
        if not tools:
            return None
        
        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    # ToolDefinition stores per-parameter specs; the API expects
                    # a single JSON Schema object.
                    "parameters": {
                        "type": "object",
                        "properties": {
                            k: v.model_dump(exclude_none=True)
                            for k, v in tool.parameters.items()
                        },
                        "required": tool.required,
                    },
                },
            }
            for tool in tools
        ]
    
    async def chat(
        self,
        messages: List[Message],
        tools: Optional[List[ToolDefinition]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Send a chat request."""
        # Omit `tools` entirely when none are given, rather than sending null.
        converted = self._convert_tools(tools)
        if converted:
            kwargs["tools"] = converted
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=self._convert_messages(messages),
            **kwargs,
        )
        
        choice = response.choices[0]
        return {
            "content": choice.message.content,
            "tool_calls": [
                {
                    "id": tc.id,
                    "name": tc.function.name,
                    "arguments": tc.function.arguments,
                }
                for tc in (choice.message.tool_calls or [])
            ],
            "usage": {
                "input_tokens": response.usage.prompt_tokens,
                "output_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            },
            "finish_reason": choice.finish_reason,
        }
    
    async def stream_chat(
        self,
        messages: List[Message],
        tools: Optional[List[ToolDefinition]] = None,
        **kwargs,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Send a streaming chat request."""
        converted = self._convert_tools(tools)
        if converted:
            kwargs["tools"] = converted
        stream = await self.client.chat.completions.create(
            model=self.model,
            messages=self._convert_messages(messages),
            stream=True,
            **kwargs,
        )
        
        async for chunk in stream:
            if not chunk.choices:
                continue  # some chunks carry only usage data
            delta = chunk.choices[0].delta
            if delta.content:
                yield {
                    "type": "text",
                    "content": delta.content,
                }
            if delta.tool_calls:
                yield {
                    "type": "tool_call",
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "name": tc.function.name,
                            "arguments": tc.function.arguments,
                        }
                        for tc in delta.tool_calls
                    ],
                }
    
    def count_tokens(self, text: str) -> int:
        """Rough token estimate."""
        # Rough heuristic: ~4 characters per token on average
        return len(text) // 4
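The 4-characters-per-token heuristic above is only a rough estimate. As a sketch, count_tokens could delegate to the tiktoken library when it is installed and fall back to the heuristic otherwise (treating tiktoken as an optional dependency is an assumption of this sketch):

```python
def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Count tokens with tiktoken if available; otherwise estimate ~4 chars/token."""
    try:
        import tiktoken  # optional dependency
        try:
            enc = tiktoken.encoding_for_model(model)
        except KeyError:
            # Unknown model name: fall back to a widely used encoding
            enc = tiktoken.get_encoding("cl100k_base")
        return len(enc.encode(text))
    except ImportError:
        return max(1, len(text) // 4)

print(count_tokens("hello world"))
```

Accurate counts matter most for the compaction logic in section 6.3, where a few percent of error can overflow the context window; the API-reported usage numbers remain the ground truth after each call.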

3.3.7 Session manager (session/manager.py)

import json
import aiofiles
from pathlib import Path
from typing import List, Optional, Dict, Any
from ..agent.types import Message

class SessionManager:
    """Session manager."""
    
    def __init__(
        self,
        session_id: str,
        session_dir: str = ".sessions",
        max_history: int = 100,
    ):
        self.session_id = session_id
        self.session_dir = Path(session_dir)
        self.max_history = max_history
        self.messages: List[Message] = []
        self.metadata: Dict[str, Any] = {}
        
        self._file_path = self.session_dir / f"{session_id}.json"
    
    async def load(self) -> None:
        """Load the session from disk."""
        if self._file_path.exists():
            async with aiofiles.open(self._file_path, "r") as f:
                data = json.loads(await f.read())
                self.messages = [Message(**m) for m in data.get("messages", [])]
                self.metadata = data.get("metadata", {})
    
    async def save(self) -> None:
        """Persist the session to disk."""
        self.session_dir.mkdir(parents=True, exist_ok=True)
        data = {
            "session_id": self.session_id,
            "messages": [m.model_dump() for m in self.messages],
            "metadata": self.metadata,
        }
        async with aiofiles.open(self._file_path, "w") as f:
            await f.write(json.dumps(data, indent=2, ensure_ascii=False))
    
    def add_message(self, message: Message) -> None:
        """Append a message, trimming history if needed."""
        self.messages.append(message)
        # Cap history length
        if len(self.messages) > self.max_history:
            # Always keep system messages
            system_msgs = [m for m in self.messages if m.role == "system"]
            other_msgs = [m for m in self.messages if m.role != "system"]
            self.messages = system_msgs + other_msgs[-(self.max_history - len(system_msgs)):]
    
    def get_messages(self) -> List[Message]:
        """Return a copy of all messages."""
        return self.messages.copy()
    
    def clear(self) -> None:
        """Clear the session."""
        self.messages = []
        self.metadata = {}
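The session file written by save() is plain JSON. A stdlib-only sketch of the same round trip (without aiofiles or Pydantic, which the real manager uses):

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory

def save_session(path: Path, session_id: str, messages: list, metadata: dict) -> None:
    """Write the session as pretty-printed JSON, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    data = {"session_id": session_id, "messages": messages, "metadata": metadata}
    path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")

def load_session(path: Path) -> dict:
    """Read the session file back into a dict."""
    return json.loads(path.read_text(encoding="utf-8"))

with TemporaryDirectory() as d:
    f = Path(d) / "my-session.json"
    msgs = [{"role": "user", "content": "hello"}]
    save_session(f, "my-session", msgs, {"workspace_dir": "."})
    loaded = load_session(f)

print(loaded["messages"][0]["content"])  # hello
```

Because the format is plain JSON, sessions can be inspected and repaired with any text editor, which is useful while the storage layer is still evolving.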

3.3.8 Agent runner (agent/runner.py)

import asyncio
import json
import time
from typing import Optional, List, Dict, Any
from .types import (
    RunParams,
    AgentResult,
    AgentMeta,
    Message,
    MessageRole,
    ToolDefinition,
    Usage,
)
from .context import ContextManager
from .failover import FailoverHandler
from ..tools.base import ToolRegistry, ToolContext
from ..session.manager import SessionManager
from ..llm.client import LLMClient

class AgentRunner:
    """Agent runner."""
    
    def __init__(
        self,
        llm_client: LLMClient,
        tool_registry: ToolRegistry,
        session_manager: SessionManager,
    ):
        self.llm = llm_client
        self.tools = tool_registry
        self.session = session_manager
        self.context = ContextManager()
        self.failover = FailoverHandler()
    
    async def run(self, params: RunParams) -> AgentResult:
        """Run the agent loop."""
        start_time = time.time()
        
        # Load the session
        await self.session.load()
        
        # Append the user message
        user_message = Message(
            role=MessageRole.USER,
            content=params.prompt,
        )
        self.session.add_message(user_message)
        
        # Resolve tool definitions
        tool_defs = self.tools.list_tools() if not params.tools else params.tools
        
        # Main loop
        max_iterations = params.max_retries + 3
        for iteration in range(max_iterations):
            try:
                # Call the LLM
                messages = self.session.get_messages()
                response = await asyncio.wait_for(
                    self.llm.chat(messages, tool_defs),
                    timeout=params.timeout_ms / 1000,
                )
                
                # Record token usage
                usage = Usage(**response.get("usage", {}))
                
                # Check for tool calls
                tool_calls = response.get("tool_calls", [])
                if tool_calls:
                    # Append the assistant message
                    assistant_message = Message(
                        role=MessageRole.ASSISTANT,
                        content=response.get("content") or "",
                        tool_calls=tool_calls,
                    )
                    self.session.add_message(assistant_message)
                    
                    # Execute the requested tools
                    tool_context = ToolContext(
                        session_id=params.session_id,
                        workspace_dir=self.session.metadata.get("workspace_dir", "."),
                    )
                    
                    for tc in tool_calls:
                        try:
                            tool_args = json.loads(tc["arguments"])
                            result = await self.tools.execute(
                                tc["name"],
                                tool_args,
                                tool_context,
                            )
                            
                            # Append the tool result
                            tool_message = Message(
                                role=MessageRole.TOOL,
                                content=str(result),
                                tool_call_id=tc["id"],
                            )
                            self.session.add_message(tool_message)
                        except Exception as e:
                            # Tool execution failed
                            error_message = Message(
                                role=MessageRole.TOOL,
                                content=f"Error: {str(e)}",
                                tool_call_id=tc["id"],
                            )
                            self.session.add_message(error_message)
                    
                    # Loop again so the LLM can consume the tool results
                    continue
                
                # No tool calls: return the final result
                assistant_message = Message(
                    role=MessageRole.ASSISTANT,
                    content=response.get("content") or "",
                )
                self.session.add_message(assistant_message)
                
                # Persist the session
                await self.session.save()
                
                duration_ms = int((time.time() - start_time) * 1000)
                return AgentResult(
                    text=response.get("content"),
                    meta=AgentMeta(
                        session_id=params.session_id,
                        provider=self.llm.__class__.__name__,
                        model=self.llm.model,
                        usage=usage,
                    ),
                    duration_ms=duration_ms,
                )
                
            except asyncio.TimeoutError:
                # Timeout: notify the failover handler, then retry
                self.failover.handle_timeout()
            except Exception as e:
                # Other errors: retry only if the failover handler allows it
                should_retry = self.failover.handle_error(e)
                if not should_retry:
                    return AgentResult(
                        is_error=True,
                        meta=AgentMeta(
                            session_id=params.session_id,
                            provider=self.llm.__class__.__name__,
                            model=self.llm.model,
                        ),
                        text=f"Error: {str(e)}",
                    )
        
        # Maximum iterations reached
        return AgentResult(
            is_error=True,
            meta=AgentMeta(
                session_id=params.session_id,
                provider=self.llm.__class__.__name__,
                model=self.llm.model,
            ),
            text="Error: Maximum iterations reached",
        )

4. Feasibility Assessment

4.1 Technical Feasibility

Feature              Feasibility          Notes
Agent run loop       fully feasible       native asyncio support
Tool system          fully feasible       decorator + registry pattern
Session management   fully feasible       simple JSON file storage
LLM client           fully feasible       mature OpenAI SDK
Context compaction   feasible             needs token counting and summarization
Failover             feasible             needs a retry strategy design
Memory system        feasible             needs a vector database integration
Plugin system        ⚠️ partly feasible   Python dynamic imports have limits
Gateway service      feasible             FastAPI + WebSocket
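The plugin-system caveat above mostly concerns dynamic imports. One common approach is loading plugin files via importlib; a minimal sketch, where the plugin file contents and the PLUGIN_NAME/run convention are illustrative assumptions:

```python
import importlib.util
import sys
import tempfile
from pathlib import Path

def load_plugin(path: Path):
    """Load a Python source file as a module and return it."""
    spec = importlib.util.spec_from_file_location(path.stem, path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[path.stem] = module  # register so the plugin's own imports work
    spec.loader.exec_module(module)
    return module

# Demo: write a tiny plugin file to a temp dir, then load it.
with tempfile.TemporaryDirectory() as d:
    plugin_file = Path(d) / "hello_plugin.py"
    plugin_file.write_text(
        "PLUGIN_NAME = 'hello'\n"
        "\n"
        "def run():\n"
        "    return 'hi from plugin'\n"
    )
    plugin = load_plugin(plugin_file)

print(plugin.PLUGIN_NAME, plugin.run())  # hello hi from plugin
```

This covers cold loading; true hot reloading (replacing an already-imported module without stale references) is where the limits mentioned in the table show up.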

4.2 Effort Estimate

Phase     Scope                                  Time       Priority
Phase 1   core framework + basic tools           2-3 weeks  P0
Phase 2   multi-model support + session mgmt     1-2 weeks  P0
Phase 3   context compaction + failover          1-2 weeks  P1
Phase 4   memory system                          1 week     P2
Phase 5   Gateway service                        1-2 weeks  P3
Total                                            6-9 weeks

4.3 Risks and Challenges

Risk                            Mitigation
Inaccurate token counting       use the tiktoken library or API-reported usage
Complex streaming handling      follow existing library implementations
Concurrency control             asyncio semaphores
Plugin hot reloading            importlib
Memory footprint                periodically prune history

5. Comparison with OpenClaw

5.1 Feature Comparison

Feature              OpenClaw (TS)    PyAgent          Notes
Multi-channel        20+ channels                      extensible later
Multi-model                                            OpenAI-compatible
Tool system                                            core feature
ACP protocol                                           can be implemented later
Plugin system        full             ⚠️ simplified    Python limitations
Gateway service                       ⚠️ optional      FastAPI implementation
Context compaction                                     core feature
Memory system                                          vector store

5.2 Architecture Comparison

OpenClaw (TypeScript)          PyAgent (Python)
─────────────────────          ────────────────
CLI → Gateway → Agent          direct calls → Agent
    ↓                              ↓
Plugin System                  Tool Registry
    ↓                              ↓
Channel Layer                  (extensible)
    ↓
ACP Protocol

6. Implementation Roadmap

6.1 Phase 1: Core Framework (2-3 weeks)

  1. Project bootstrap

    • create the project layout
    • configure pyproject.toml
    • set up CI/CD
  2. Type system

    • core type definitions
    • Pydantic models
  3. Tool system

    • tool base classes
    • file tools (read, write, edit)
    • shell tool (exec)
  4. LLM client

    • client base class
    • OpenAI client
    • streaming responses
  5. Session management

    • SessionManager
    • file persistence
  6. Agent runner

    • main loop
    • tool-call handling
6.2 Phase 2: Feature Completion (1-2 weeks)

  1. Multi-model support

    • Anthropic client
    • Gemini client
    • Ollama client
  2. Configuration management

    • Pydantic Settings
    • environment variables
  3. Error handling

    • retry mechanism
    • timeout handling
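The retry mechanism planned for utils/retry.py can be a small async helper with exponential backoff. A sketch, with delays shortened for illustration and a flaky function simulating a transient failure:

```python
import asyncio

async def retry_async(fn, *, attempts: int = 3, base_delay: float = 0.01):
    """Call an async fn, retrying on any exception with exponential backoff."""
    last_exc = None
    for attempt in range(attempts):
        try:
            return await fn()
        except Exception as exc:
            last_exc = exc
            if attempt < attempts - 1:
                # 0.01s, 0.02s, 0.04s, ... between attempts
                await asyncio.sleep(base_delay * (2 ** attempt))
    raise last_exc

calls = 0

async def flaky():
    """Fails twice, then succeeds — simulates a transient network error."""
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = asyncio.run(retry_async(flaky))
print(result, "after", calls, "calls")  # ok after 3 calls
```

A production version would retry only on retryable errors (timeouts, rate limits) and add jitter; this pairs naturally with the FailoverHandler's decision of whether to retry at all.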

6.3 Phase 3: Advanced Features (2-3 weeks)

  1. Context management

    • token counting
    • compaction strategy
  2. Memory system

    • vector store
    • memory retrieval
  3. Gateway service (optional)

    • FastAPI server
    • WebSocket support
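One simple compaction strategy keeps system messages plus as many recent messages as fit a token budget. The sketch below uses the rough 4-characters-per-token estimate and plain dict messages for illustration:

```python
def estimate_tokens(text: str) -> int:
    """Rough estimate: ~4 characters per token."""
    return max(1, len(text) // 4)

def compact(messages: list[dict], budget: int) -> list[dict]:
    """Keep system messages, then as many recent messages as fit the budget."""
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    used = sum(estimate_tokens(m["content"]) for m in system)
    kept: list[dict] = []
    for m in reversed(rest):  # walk newest-first
        cost = estimate_tokens(m["content"])
        if used + cost > budget:
            break
        kept.append(m)
        used += cost
    return system + list(reversed(kept))  # restore chronological order

history = [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "x" * 400},  # ~100 tokens, oldest
    {"role": "user", "content": "y" * 40},   # ~10 tokens
    {"role": "user", "content": "z" * 40},   # ~10 tokens, newest
]
trimmed = compact(history, budget=30)
print([m["content"][:1] for m in trimmed])  # ['Y', 'y', 'z']
```

Dropping the oldest oversized message whole (rather than truncating it) keeps every surviving message intact; a fuller implementation would summarize the dropped span instead of discarding it.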

7. Example Code

7.1 Basic Usage

import asyncio

from py_agent import AgentRunner, OpenAIClient, ToolRegistry
from py_agent.agent.types import RunParams
from py_agent.tools import ReadTool, WriteTool, ExecTool
from py_agent.session import SessionManager

# Create the tool registry
tools = ToolRegistry()
tools.register(ReadTool())
tools.register(WriteTool())
tools.register(ExecTool())

# Create the LLM client
llm = OpenAIClient(
    model="gpt-4o",
    api_key="your-api-key",
)

# Create the session manager
session = SessionManager(
    session_id="my-session",
    session_dir="./sessions",
)

# Create the agent runner
agent = AgentRunner(
    llm_client=llm,
    tool_registry=tools,
    session_manager=session,
)

# Run the agent (AgentRunner.run takes a RunParams object)
async def main():
    result = await agent.run(RunParams(
        session_id="my-session",
        prompt="Create a Python file implementing a simple calculator",
    ))
    print(result.text)

asyncio.run(main())

7.2 Streaming Responses

stream_run below is the planned streaming counterpart of run; it is not yet part of the runner shown in 3.3.8:

async for chunk in agent.stream_run(
    session_id="my-session",
    prompt="Explain Python's async programming model",
):
    if chunk["type"] == "text":
        print(chunk["content"], end="", flush=True)
    elif chunk["type"] == "tool_call":
        print(f"\n[tool call: {chunk['tool_calls'][0]['name']}]")

8. Summary

8.1 Conclusion

Implementing the OpenClaw Agent core in Python is entirely feasible.

Key advantages:

  1. Mature technology: the Python ecosystem is mature, with solid async support
  2. Concise code: Python code is compact and easy to maintain
  3. Strong extensibility: the modular design makes extension straightforward
  4. High development velocity: Python enables fast iteration

8.2 Recommendations

  1. Implement the core first: agent run loop, tool system, session management
  2. Add advanced features incrementally: context compaction, memory system
  3. Keep it simple: there is no need to replicate all of OpenClaw; focus on the core
  4. Invest in testing: unit and integration test coverage

8.3 Next Steps

  1. Refine this document
  2. Implement the core code
  3. Write test cases
  4. Write usage documentation
  5. Publish to PyPI

Document version: v1.0
Last updated: 2026-03-17
Author: 小白 🐶
