Technical Plan: Implementing the OpenClaw Agent Core in Python
Version: v1.0
Created: 2026-03-17
Author: 小白 🐶
1. Project Overview
1.1 Goals
Implement the core features of the OpenClaw Agent in Python, building a lightweight, easily extensible AI agent framework.
1.2 Core Feature Scope
| Feature Module | OpenClaw Implementation | Python Priority |
|---|---|---|
| Agent run loop | pi-embedded-runner/run.ts | P0 - core |
| Tool system | pi-tools.ts | P0 - core |
| Session management | SessionManager | P0 - core |
| LLM client | Multi-provider support | P0 - core |
| Context management | Context window monitoring/compaction | P1 - important |
| Failover | Auth rotation / model fallback | P1 - important |
| Memory system | Vector store / memory retrieval | P2 - optional |
| Plugin system | Channel/tool/hook plugins | P2 - optional |
| Gateway service | WebSocket/HTTP server | P3 - later |
2. Architecture Analysis
2.1 OpenClaw Agent Core Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Agent run loop                                                  │
│ runEmbeddedPiAgent() → main loop                                │
│ ├── Resolve auth profiles (resolveAuthProfileOrder)             │
│ ├── Initialize context engines (ensureContextEnginesInitialized)│
│ ├── Build payloads (buildEmbeddedRunPayloads)                   │
│ └── Attempt loop (runEmbeddedAttempt)                           │
│     ├── Model call (streamSimple)                               │
│     ├── Tool execution (executeTool)                            │
│     └── Result handling (handleResult)                          │
├─────────────────────────────────────────────────────────────────┤
│ Tool system                                                     │
│ AnyAgentTool = { name, description, parameters, execute }       │
│ ├── File tools (read, write, edit)                              │
│ ├── Shell tools (exec, process)                                 │
│ ├── Browser tool (browser)                                      │
│ ├── Message tool (message)                                      │
│ └── Custom tools (skills)                                       │
├─────────────────────────────────────────────────────────────────┤
│ Session management                                              │
│ SessionManager                                                  │
│ ├── Session history (messages)                                  │
│ ├── Tool call records (tool_calls)                              │
│ ├── Context compaction (compaction)                             │
│ └── Persistent storage (session_file)                           │
├─────────────────────────────────────────────────────────────────┤
│ LLM client                                                      │
│ Multi-provider support                                          │
│ ├── OpenAI (GPT-4, GPT-4o)                                      │
│ ├── Anthropic (Claude)                                          │
│ ├── Google (Gemini)                                             │
│ ├── Local models (Ollama)                                       │
│ └── Other providers                                             │
└─────────────────────────────────────────────────────────────────┘
2.2 Core Data Structures
// Agent run parameters
interface RunEmbeddedPiAgentParams {
  sessionId: string;
  sessionFile: string;
  workspaceDir: string;
  prompt: string;
  images?: ImageContent[];
  provider?: string;
  model?: string;
  timeoutMs: number;
  abortSignal?: AbortSignal;
  tools?: AnyAgentTool[];
}
// Agent run result
interface EmbeddedPiRunResult {
  payloads?: Array<{
    text?: string;
    mediaUrl?: string;
    isError?: boolean;
  }>;
  meta: {
    durationMs: number;
    agentMeta?: {
      sessionId: string;
      provider: string;
      model: string;
      usage?: { input: number; output: number; total: number };
    };
    error?: { kind: string; message: string };
  };
}
// Tool definition
interface AgentTool<TParams, TResult> {
  name: string;
  description: string;
  parameters: JSONSchema;
  execute: (params: TParams, context: ToolContext) => Promise<TResult>;
}
3. Python Implementation Plan
3.1 Technology Stack
| Component | Python Choice | Rationale |
|---|---|---|
| Async framework | asyncio + aiohttp | Native support, good performance |
| LLM client | openai SDK | Officially supported, compatible with many providers |
| Data validation | Pydantic v2 | Type safety, excellent performance |
| Config management | pydantic-settings | Environment variable support |
| JSON Schema | jsonschema / pydantic | Tool parameter validation |
| Filesystem | aiofiles | Async file I/O |
| Process management | asyncio.subprocess | Shell tool implementation |
| Vector store | chromadb / faiss | Memory system (optional) |
3.2 Project Layout
py_agent/
├── src/
│   ├── agent/
│   │   ├── __init__.py
│   │   ├── runner.py             # Agent run loop
│   │   ├── context.py            # Context management
│   │   ├── failover.py           # Failover handling
│   │   └── types.py              # Type definitions
│   ├── tools/
│   │   ├── __init__.py
│   │   ├── base.py               # Tool base class
│   │   ├── file_tools.py         # File tools
│   │   ├── shell_tools.py        # Shell tools
│   │   ├── browser_tools.py      # Browser tools
│   │   └── registry.py           # Tool registry
│   ├── session/
│   │   ├── __init__.py
│   │   ├── manager.py            # Session manager
│   │   ├── history.py            # History tracking
│   │   ├── compaction.py         # Context compaction
│   │   └── storage.py            # Persistent storage
│   ├── llm/
│   │   ├── __init__.py
│   │   ├── client.py             # LLM client base class
│   │   ├── openai_client.py      # OpenAI client
│   │   ├── anthropic_client.py   # Anthropic client
│   │   ├── gemini_client.py      # Gemini client
│   │   └── ollama_client.py      # Ollama client
│   ├── memory/
│   │   ├── __init__.py
│   │   ├── store.py              # Memory store
│   │   ├── embedder.py           # Vector embeddings
│   │   └── retriever.py          # Memory retrieval
│   ├── config/
│   │   ├── __init__.py
│   │   ├── settings.py           # Settings management
│   │   └── auth.py               # Auth configuration
│   └── utils/
│       ├── __init__.py
│       ├── logging.py            # Logging helpers
│       └── retry.py              # Retry helpers
├── examples/
│   ├── simple_agent.py           # Minimal example
│   ├── tool_agent.py             # Tool-calling example
│   └── multi_model.py            # Multi-model example
├── tests/
│   ├── test_agent.py
│   ├── test_tools.py
│   └── test_session.py
├── pyproject.toml
├── requirements.txt
└── README.md
3.3 Core Code
3.3.1 Type definitions (types.py)

from __future__ import annotations  # PEP 604 unions (str | ...) on Python < 3.10

from enum import Enum
from typing import Any, Dict, List, Literal, Optional

from pydantic import BaseModel, Field


class MessageRole(str, Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"


class ImageContent(BaseModel):
    type: Literal["image"]
    url: Optional[str] = None
    base64: Optional[str] = None
    mime_type: Optional[str] = None


class ContentBlock(BaseModel):
    type: Literal["text", "image"]
    text: Optional[str] = None
    image: Optional[ImageContent] = None


class Message(BaseModel):
    role: MessageRole
    content: str | List[ContentBlock]
    tool_calls: Optional[List[Dict[str, Any]]] = None
    tool_call_id: Optional[str] = None


class ToolParameter(BaseModel):
    type: str
    description: Optional[str] = None
    enum: Optional[List[str]] = None
    default: Optional[Any] = None


class ToolDefinition(BaseModel):
    name: str
    description: str
    parameters: Dict[str, ToolParameter]
    required: List[str] = Field(default_factory=list)


class ToolResult(BaseModel):
    tool_call_id: str
    name: str
    result: Any
    is_error: bool = False


class Usage(BaseModel):
    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0
    total_tokens: int = 0


class AgentMeta(BaseModel):
    session_id: str
    provider: str
    model: str
    usage: Optional[Usage] = None
    prompt_tokens: Optional[int] = None


class AgentResult(BaseModel):
    text: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    is_error: bool = False
    meta: AgentMeta
    duration_ms: int = 0


class RunParams(BaseModel):
    session_id: str
    prompt: str
    images: Optional[List[ImageContent]] = None
    provider: Optional[str] = None
    model: Optional[str] = None
    timeout_ms: int = 300000
    max_retries: int = 3
    tools: Optional[List[ToolDefinition]] = None
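As a quick sanity check on these Pydantic models, here is a minimal, self-contained sketch (re-declaring a trimmed-down Message rather than importing the package) showing the dict round-trip that SessionManager relies on when persisting sessions:

```python
from enum import Enum
from typing import Optional

from pydantic import BaseModel


class MessageRole(str, Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"


class Message(BaseModel):
    role: MessageRole
    content: str
    tool_call_id: Optional[str] = None


# Round-trip: model -> dict -> model, as SessionManager does on save/load
msg = Message(role=MessageRole.USER, content="hello")
data = msg.model_dump()
restored = Message(**data)
assert restored == msg
# A str-enum compares equal to its value, so JSON serialization is safe
assert data["role"] == "user"
```

Because MessageRole subclasses str, the dumped role serializes cleanly with json.dumps and validates back into the enum on load.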
3.3.2 Tool base class (tools/base.py)

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

from ..agent.types import ToolDefinition


class ToolContext:
    """Execution context passed to every tool."""

    def __init__(
        self,
        session_id: str,
        workspace_dir: str,
        abort_signal: Optional[Any] = None,
    ):
        self.session_id = session_id
        self.workspace_dir = workspace_dir
        self.abort_signal = abort_signal


class BaseTool(ABC):
    """Base class for all tools."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Tool name."""

    @property
    @abstractmethod
    def description(self) -> str:
        """Tool description."""

    @property
    @abstractmethod
    def parameters(self) -> Dict[str, Any]:
        """JSON Schema properties for the parameters."""

    @property
    def required(self) -> List[str]:
        """Names of required parameters."""
        return []

    def to_definition(self) -> ToolDefinition:
        """Convert to a ToolDefinition."""
        return ToolDefinition(
            name=self.name,
            description=self.description,
            parameters=self.parameters,
            required=self.required,
        )

    @abstractmethod
    async def execute(
        self,
        params: Dict[str, Any],
        context: ToolContext,
    ) -> Any:
        """Run the tool."""


class ToolRegistry:
    """Registry mapping tool names to instances."""

    def __init__(self):
        self._tools: Dict[str, BaseTool] = {}

    def register(self, tool: BaseTool) -> None:
        """Register a tool."""
        self._tools[tool.name] = tool

    def get(self, name: str) -> Optional[BaseTool]:
        """Look up a tool by name."""
        return self._tools.get(name)

    def list_tools(self) -> List[ToolDefinition]:
        """List all registered tool definitions."""
        return [tool.to_definition() for tool in self._tools.values()]

    async def execute(
        self,
        name: str,
        params: Dict[str, Any],
        context: ToolContext,
    ) -> Any:
        """Execute a tool by name."""
        tool = self.get(name)
        if not tool:
            raise ValueError(f"Tool not found: {name}")
        return await tool.execute(params, context)
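The registry pattern above can be exercised with a minimal, self-contained sketch (MiniRegistry and the echo tool below are hypothetical stand-ins, not part of the proposed API):

```python
import asyncio
from typing import Any, Callable, Dict


class MiniRegistry:
    """Tiny stand-in for ToolRegistry: name -> async callable."""

    def __init__(self) -> None:
        self._tools: Dict[str, Callable[..., Any]] = {}

    def register(self, name: str, fn: Callable[..., Any]) -> None:
        self._tools[name] = fn

    async def execute(self, name: str, params: Dict[str, Any]) -> Any:
        # Same dispatch contract as ToolRegistry.execute
        if name not in self._tools:
            raise ValueError(f"Tool not found: {name}")
        return await self._tools[name](**params)


async def echo(text: str) -> str:
    return text.upper()


async def main() -> str:
    reg = MiniRegistry()
    reg.register("echo", echo)
    return await reg.execute("echo", {"text": "hi"})


print(asyncio.run(main()))  # HI
```

Unknown tool names fail fast with a ValueError, which the runner later surfaces back to the model as a tool error message.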
3.3.3 File tools (tools/file_tools.py)

from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, List

import aiofiles

from .base import BaseTool, ToolContext


class ReadTool(BaseTool):
    """Read a file from the workspace."""

    @property
    def name(self) -> str:
        return "read"

    @property
    def description(self) -> str:
        return "Read the contents of a file. Supports text files and images."

    @property
    def parameters(self) -> Dict[str, Any]:
        return {
            "path": {"type": "string", "description": "Path to the file to read"},
            "offset": {"type": "integer", "description": "Line number to start reading from"},
            "limit": {"type": "integer", "description": "Maximum number of lines to read"},
        }

    @property
    def required(self) -> List[str]:
        return ["path"]

    async def execute(
        self,
        params: Dict[str, Any],
        context: ToolContext,
    ) -> str:
        path = Path(context.workspace_dir) / params["path"]
        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        offset = params.get("offset", 1)
        limit = params.get("limit", 2000)
        lines: List[str] = []
        async with aiofiles.open(path, "r") as f:
            lineno = 0
            # aiofiles handles are async iterators, so iterate with `async for`
            async for line in f:
                lineno += 1
                if lineno >= offset:
                    lines.append(line)
                    if len(lines) >= limit:
                        break
        return "".join(lines)


class WriteTool(BaseTool):
    """Write a file in the workspace."""

    @property
    def name(self) -> str:
        return "write"

    @property
    def description(self) -> str:
        return "Write content to a file. Creates the file if it doesn't exist."

    @property
    def parameters(self) -> Dict[str, Any]:
        return {
            "path": {"type": "string", "description": "Path to the file to write"},
            "content": {"type": "string", "description": "Content to write to the file"},
        }

    @property
    def required(self) -> List[str]:
        return ["path", "content"]

    async def execute(
        self,
        params: Dict[str, Any],
        context: ToolContext,
    ) -> str:
        path = Path(context.workspace_dir) / params["path"]
        content = params["content"]
        # Create parent directories as needed
        path.parent.mkdir(parents=True, exist_ok=True)
        async with aiofiles.open(path, "w") as f:
            await f.write(content)
        return f"Successfully wrote {len(content)} characters to {path}"


class EditTool(BaseTool):
    """Edit a file by exact text replacement."""

    @property
    def name(self) -> str:
        return "edit"

    @property
    def description(self) -> str:
        return "Edit a file by replacing exact text."

    @property
    def parameters(self) -> Dict[str, Any]:
        return {
            "path": {"type": "string", "description": "Path to the file to edit"},
            "oldText": {"type": "string", "description": "Exact text to find and replace"},
            "newText": {"type": "string", "description": "New text to replace with"},
        }

    @property
    def required(self) -> List[str]:
        return ["path", "oldText", "newText"]

    async def execute(
        self,
        params: Dict[str, Any],
        context: ToolContext,
    ) -> str:
        path = Path(context.workspace_dir) / params["path"]
        old_text = params["oldText"]
        new_text = params["newText"]
        async with aiofiles.open(path, "r") as f:
            content = await f.read()
        if old_text not in content:
            raise ValueError(f"Text not found in file: {old_text[:50]}...")
        # Replace only the first occurrence
        new_content = content.replace(old_text, new_text, 1)
        async with aiofiles.open(path, "w") as f:
            await f.write(new_content)
        return f"Successfully edited {path}"
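The edit tool's contract (the old text must occur verbatim, and only the first occurrence is replaced) comes down to str.replace with count=1, which can be checked in isolation:

```python
# Mirror EditTool's core logic on an in-memory string
content = "a = 1\nb = 1\n"
old_text, new_text = "= 1", "= 2"

if old_text not in content:
    raise ValueError(f"Text not found in file: {old_text[:50]}...")

# count=1 limits the replacement to the first match only
new_content = content.replace(old_text, new_text, 1)
print(new_content)
```

Limiting to one replacement keeps edits predictable when the target string happens to appear more than once in the file.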
3.3.4 Shell tools (tools/shell_tools.py)

from __future__ import annotations

import asyncio
from typing import Any, Dict, List

from .base import BaseTool, ToolContext


class ExecTool(BaseTool):
    """Execute shell commands."""

    @property
    def name(self) -> str:
        return "exec"

    @property
    def description(self) -> str:
        return "Execute shell commands."

    @property
    def parameters(self) -> Dict[str, Any]:
        return {
            "command": {"type": "string", "description": "Shell command to execute"},
            "timeout": {"type": "integer", "description": "Timeout in seconds"},
            "cwd": {"type": "string", "description": "Working directory"},
        }

    @property
    def required(self) -> List[str]:
        return ["command"]

    async def execute(
        self,
        params: Dict[str, Any],
        context: ToolContext,
    ) -> Dict[str, Any]:
        command = params["command"]
        timeout = params.get("timeout", 60)
        cwd = params.get("cwd", context.workspace_dir)
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd,
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            process.kill()
            await process.wait()  # reap the killed process
            raise TimeoutError(f"Command timed out after {timeout}s")
        return {
            "exit_code": process.returncode,
            "stdout": stdout.decode("utf-8", errors="replace"),
            "stderr": stderr.decode("utf-8", errors="replace"),
        }
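The subprocess-plus-timeout pattern used by ExecTool can be exercised standalone (run_cmd below is an illustrative helper, not part of the proposed API):

```python
import asyncio


async def run_cmd(command: str, timeout: float = 5.0) -> dict:
    # Same pattern as ExecTool: shell subprocess + wait_for timeout
    process = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout)
    except asyncio.TimeoutError:
        process.kill()
        await process.wait()  # reap the killed process
        raise TimeoutError(f"Command timed out after {timeout}s")
    return {
        "exit_code": process.returncode,
        "stdout": stdout.decode("utf-8", errors="replace"),
        "stderr": stderr.decode("utf-8", errors="replace"),
    }


result = asyncio.run(run_cmd("echo hello"))
print(result["exit_code"], result["stdout"].strip())  # 0 hello
```

Note that `await process.wait()` after `kill()` matters: without it the killed child can linger as a zombie until the event loop exits.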
3.3.5 LLM client base class (llm/client.py)

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from typing import Any, Dict, List, Optional

from ..agent.types import Message, ToolDefinition


class LLMClient(ABC):
    """Base class for LLM clients."""

    def __init__(
        self,
        model: str,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
    ):
        self.model = model
        self.api_key = api_key
        self.base_url = base_url

    @abstractmethod
    async def chat(
        self,
        messages: List[Message],
        tools: Optional[List[ToolDefinition]] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Send a chat request."""

    @abstractmethod
    def stream_chat(
        self,
        messages: List[Message],
        tools: Optional[List[ToolDefinition]] = None,
        **kwargs: Any,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Streaming chat request.

        Declared as a plain (non-async) method: concrete implementations
        are async generators, so calling them returns an AsyncIterator
        directly without awaiting.
        """

    @abstractmethod
    def count_tokens(self, text: str) -> int:
        """Count tokens in a text."""
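The stream_chat signature hinges on one Python subtlety: an async generator function is called like a plain function and returns an AsyncIterator immediately, with no await. A small self-contained sketch (fake_stream is a hypothetical stand-in for a real client):

```python
import asyncio
from collections.abc import AsyncIterator


async def fake_stream(text: str) -> AsyncIterator[dict]:
    # An `async def` containing `yield` is an async generator:
    # calling it returns an AsyncIterator directly (no `await` needed).
    for word in text.split():
        yield {"type": "text", "content": word}


async def collect() -> list:
    chunks = []
    # Consume the iterator with `async for`
    async for chunk in fake_stream("hello streaming world"):
        chunks.append(chunk["content"])
    return chunks


print(asyncio.run(collect()))  # ['hello', 'streaming', 'world']
```

This is why the abstract method above is a plain `def` returning AsyncIterator: it matches what calling an async-generator implementation actually produces.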
3.3.6 OpenAI client (llm/openai_client.py)

from __future__ import annotations

from collections.abc import AsyncIterator
from typing import Any, Dict, List, Optional

from openai import AsyncOpenAI

from ..agent.types import Message, ToolDefinition
from .client import LLMClient


class OpenAIClient(LLMClient):
    """OpenAI-backed client."""

    def __init__(
        self,
        model: str = "gpt-4o",
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
    ):
        super().__init__(model, api_key, base_url)
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)

    def _convert_messages(self, messages: List[Message]) -> List[Dict[str, Any]]:
        """Convert to the OpenAI message format."""
        result = []
        for msg in messages:
            item: Dict[str, Any] = {"role": msg.role.value}
            if isinstance(msg.content, str):
                item["content"] = msg.content
            else:
                # Multimodal content
                content = []
                for block in msg.content:
                    if block.type == "text":
                        content.append({"type": "text", "text": block.text})
                    elif block.type == "image" and block.image:
                        content.append({
                            "type": "image_url",
                            "image_url": {
                                "url": block.image.url
                                or f"data:{block.image.mime_type};base64,{block.image.base64}"
                            },
                        })
                item["content"] = content
            if msg.tool_calls:
                item["tool_calls"] = msg.tool_calls
            if msg.tool_call_id:
                item["tool_call_id"] = msg.tool_call_id
            result.append(item)
        return result

    def _convert_tools(
        self, tools: Optional[List[ToolDefinition]]
    ) -> Optional[List[Dict[str, Any]]]:
        """Convert to the OpenAI tool format (a full JSON Schema object)."""
        if not tools:
            return None
        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    # OpenAI expects a complete object schema, not bare properties
                    "parameters": {
                        "type": "object",
                        "properties": {
                            name: param.model_dump(exclude_none=True)
                            for name, param in tool.parameters.items()
                        },
                        "required": tool.required,
                    },
                },
            }
            for tool in tools
        ]

    async def chat(
        self,
        messages: List[Message],
        tools: Optional[List[ToolDefinition]] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Send a chat request."""
        converted_tools = self._convert_tools(tools)
        if converted_tools:
            kwargs["tools"] = converted_tools
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=self._convert_messages(messages),
            **kwargs,
        )
        choice = response.choices[0]
        return {
            "content": choice.message.content,
            "tool_calls": [
                {
                    "id": tc.id,
                    "name": tc.function.name,
                    "arguments": tc.function.arguments,
                }
                for tc in (choice.message.tool_calls or [])
            ],
            "usage": {
                "input_tokens": response.usage.prompt_tokens,
                "output_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            },
            "finish_reason": choice.finish_reason,
        }

    async def stream_chat(
        self,
        messages: List[Message],
        tools: Optional[List[ToolDefinition]] = None,
        **kwargs: Any,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Streaming chat request (async generator)."""
        converted_tools = self._convert_tools(tools)
        if converted_tools:
            kwargs["tools"] = converted_tools
        stream = await self.client.chat.completions.create(
            model=self.model,
            messages=self._convert_messages(messages),
            stream=True,
            **kwargs,
        )
        async for chunk in stream:
            if not chunk.choices:
                continue  # e.g. trailing usage-only chunks
            delta = chunk.choices[0].delta
            if delta.content:
                yield {"type": "text", "content": delta.content}
            if delta.tool_calls:
                yield {
                    "type": "tool_call",
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "name": tc.function.name,
                            "arguments": tc.function.arguments,
                        }
                        for tc in delta.tool_calls
                    ],
                }

    def count_tokens(self, text: str) -> int:
        """Rough token estimate (~4 characters per token)."""
        return len(text) // 4
3.3.7 Session manager (session/manager.py)

from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Dict, List

import aiofiles

from ..agent.types import Message, MessageRole


class SessionManager:
    """Manages session history and persistence."""

    def __init__(
        self,
        session_id: str,
        session_dir: str = ".sessions",
        max_history: int = 100,
    ):
        self.session_id = session_id
        self.session_dir = Path(session_dir)
        self.max_history = max_history
        self.messages: List[Message] = []
        self.metadata: Dict[str, Any] = {}
        self._file_path = self.session_dir / f"{session_id}.json"

    async def load(self) -> None:
        """Load the session from disk, if present."""
        if self._file_path.exists():
            async with aiofiles.open(self._file_path, "r") as f:
                data = json.loads(await f.read())
            self.messages = [Message(**m) for m in data.get("messages", [])]
            self.metadata = data.get("metadata", {})

    async def save(self) -> None:
        """Persist the session to disk."""
        self.session_dir.mkdir(parents=True, exist_ok=True)
        data = {
            "session_id": self.session_id,
            "messages": [m.model_dump() for m in self.messages],
            "metadata": self.metadata,
        }
        async with aiofiles.open(self._file_path, "w") as f:
            await f.write(json.dumps(data, indent=2, ensure_ascii=False))

    def add_message(self, message: Message) -> None:
        """Append a message, trimming old history if needed."""
        self.messages.append(message)
        if len(self.messages) > self.max_history:
            # Always keep system messages; drop the oldest of the rest
            system_msgs = [m for m in self.messages if m.role == MessageRole.SYSTEM]
            other_msgs = [m for m in self.messages if m.role != MessageRole.SYSTEM]
            keep = self.max_history - len(system_msgs)
            self.messages = system_msgs + other_msgs[-keep:]

    def get_messages(self) -> List[Message]:
        """Return a copy of all messages."""
        return self.messages.copy()

    def clear(self) -> None:
        """Reset the session."""
        self.messages = []
        self.metadata = {}
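The trimming policy in add_message can be sketched in isolation with plain dicts (trim below is an illustrative helper, not part of the proposed API):

```python
# Drop the oldest non-system messages once history exceeds max_history,
# always keeping system prompts.
def trim(messages: list[dict], max_history: int) -> list[dict]:
    if len(messages) <= max_history:
        return messages
    system = [m for m in messages if m["role"] == "system"]
    other = [m for m in messages if m["role"] != "system"]
    return system + other[-(max_history - len(system)):]


history = [{"role": "system", "content": "sys"}] + [
    {"role": "user", "content": f"msg {i}"} for i in range(10)
]
trimmed = trim(history, max_history=4)
print([m["content"] for m in trimmed])  # ['sys', 'msg 7', 'msg 8', 'msg 9']
```

One caveat worth noting for the real implementation: naive trimming can orphan tool messages from their assistant tool_calls turn, which is one motivation for the dedicated compaction module (session/compaction.py).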
3.3.8 Agent runner (agent/runner.py)

from __future__ import annotations

import asyncio
import json
import time

from ..llm.client import LLMClient
from ..session.manager import SessionManager
from ..tools.base import ToolContext, ToolRegistry
from .context import ContextManager
from .failover import FailoverHandler
from .types import (
    AgentMeta,
    AgentResult,
    Message,
    MessageRole,
    RunParams,
    Usage,
)


class AgentRunner:
    """Drives the agent loop: LLM call → tool execution → LLM call …"""

    def __init__(
        self,
        llm_client: LLMClient,
        tool_registry: ToolRegistry,
        session_manager: SessionManager,
    ):
        self.llm = llm_client
        self.tools = tool_registry
        self.session = session_manager
        self.context = ContextManager()
        self.failover = FailoverHandler()

    async def run(self, params: RunParams) -> AgentResult:
        """Run the agent until it produces a final answer or errors out."""
        start_time = time.time()
        # Load the session and append the user message
        await self.session.load()
        self.session.add_message(
            Message(role=MessageRole.USER, content=params.prompt)
        )
        # Explicit params.tools wins over the registry
        tool_defs = params.tools or self.tools.list_tools()
        max_iterations = params.max_retries + 3
        for _ in range(max_iterations):
            try:
                # Call the LLM with the full history
                messages = self.session.get_messages()
                response = await asyncio.wait_for(
                    self.llm.chat(messages, tools=tool_defs),
                    timeout=params.timeout_ms / 1000,
                )
                usage = Usage(**response.get("usage", {}))
                tool_calls = response.get("tool_calls", [])
                if tool_calls:
                    # Record the assistant turn that requested tools
                    self.session.add_message(Message(
                        role=MessageRole.ASSISTANT,
                        content=response.get("content") or "",
                        tool_calls=tool_calls,
                    ))
                    tool_context = ToolContext(
                        session_id=params.session_id,
                        workspace_dir=self.session.metadata.get("workspace_dir", "."),
                    )
                    for tc in tool_calls:
                        try:
                            tool_args = json.loads(tc["arguments"])
                            result = await self.tools.execute(
                                tc["name"], tool_args, tool_context,
                            )
                            content = str(result)
                        except Exception as e:
                            # Surface tool failures back to the model
                            content = f"Error: {e}"
                        self.session.add_message(Message(
                            role=MessageRole.TOOL,
                            content=content,
                            tool_call_id=tc["id"],
                        ))
                    # Loop again so the LLM can consume the tool results
                    continue
                # No tool calls: this is the final answer
                self.session.add_message(Message(
                    role=MessageRole.ASSISTANT,
                    content=response.get("content") or "",
                ))
                await self.session.save()
                duration_ms = int((time.time() - start_time) * 1000)
                return AgentResult(
                    text=response.get("content"),
                    meta=AgentMeta(
                        session_id=params.session_id,
                        provider=self.llm.__class__.__name__,
                        model=self.llm.model,
                        usage=usage,
                    ),
                    duration_ms=duration_ms,
                )
            except asyncio.TimeoutError:
                self.failover.handle_timeout()
            except Exception as e:
                if not self.failover.handle_error(e):
                    return AgentResult(
                        is_error=True,
                        text=f"Error: {e}",
                        meta=AgentMeta(
                            session_id=params.session_id,
                            provider=self.llm.__class__.__name__,
                            model=self.llm.model,
                        ),
                    )
        # Ran out of iterations without a final answer
        return AgentResult(
            is_error=True,
            text="Error: Maximum iterations reached",
            meta=AgentMeta(
                session_id=params.session_id,
                provider=self.llm.__class__.__name__,
                model=self.llm.model,
            ),
        )
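The control flow of run() — model turn, tool execution, second model turn — can be traced end to end with a stub LLM (StubLLM, the add tool, and the canned responses below are all hypothetical):

```python
import asyncio
import json


class StubLLM:
    """First turn requests a tool call; second turn answers with the result."""

    def __init__(self) -> None:
        self.turn = 0

    async def chat(self, messages: list) -> dict:
        self.turn += 1
        if self.turn == 1:
            return {
                "content": None,
                "tool_calls": [
                    {"id": "c1", "name": "add", "arguments": json.dumps({"a": 2, "b": 3})}
                ],
            }
        # The last message is the tool result appended by the loop
        return {"content": f"The result is {messages[-1]['content']}", "tool_calls": []}


async def add(a: int, b: int) -> int:
    return a + b


async def run_agent(prompt: str) -> str:
    messages = [{"role": "user", "content": prompt}]
    llm = StubLLM()
    for _ in range(5):  # bounded loop, like max_iterations
        resp = await llm.chat(messages)
        if resp["tool_calls"]:
            messages.append({"role": "assistant", "tool_calls": resp["tool_calls"]})
            for tc in resp["tool_calls"]:
                result = await add(**json.loads(tc["arguments"]))
                messages.append({"role": "tool", "content": str(result)})
            continue  # let the model see the tool results
        return resp["content"]
    return "Error: Maximum iterations reached"


print(asyncio.run(run_agent("add 2 and 3")))  # The result is 5
```

The `continue` after appending tool results is the heart of the loop: the model gets a second turn with the results in context, exactly as in AgentRunner.run above.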
4. Feasibility Assessment
4.1 Technical Feasibility
| Feature | Feasibility | Difficulty | Notes |
|---|---|---|---|
| Agent run loop | ✅ Fully feasible | Medium | Native asyncio support |
| Tool system | ✅ Fully feasible | Low | Decorator + registry pattern |
| Session management | ✅ Fully feasible | Low | Simple JSON file storage |
| LLM client | ✅ Fully feasible | Low | Mature OpenAI SDK |
| Context compaction | ✅ Feasible | Medium | Needs token counting and summarization |
| Failover | ✅ Feasible | Medium | Needs a retry strategy design |
| Memory system | ✅ Feasible | High | Needs a vector database |
| Plugin system | ⚠️ Partially feasible | High | Python dynamic-import constraints |
| Gateway service | ✅ Feasible | Medium | FastAPI + WebSocket |
4.2 Effort Estimate
| Phase | Scope | Time | Priority |
|---|---|---|---|
| Phase 1 | Core framework + basic tools | 2-3 weeks | P0 |
| Phase 2 | Multi-model support + session management | 1-2 weeks | P0 |
| Phase 3 | Context compaction + failover | 1-2 weeks | P1 |
| Phase 4 | Memory system | 1 week | P2 |
| Phase 5 | Gateway service | 1-2 weeks | P3 |
| Total | | 6-9 weeks | |
4.3 Risks and Challenges
| Risk | Impact | Mitigation |
|---|---|---|
| Inaccurate token counting | Medium | Use tiktoken or API-reported usage |
| Complex streaming handling | Medium | Follow existing library implementations |
| Concurrency control | Low | asyncio semaphores |
| Plugin hot-reload | Low | importlib |
| Memory footprint | Low | Periodically trim history |
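For the concurrency-control mitigation above, an asyncio.Semaphore caps how many tool or LLM calls are in flight at once. A small self-contained sketch (the counter bookkeeping is only there to observe the cap):

```python
import asyncio


async def limited(sem: asyncio.Semaphore, counter: dict, i: int) -> int:
    # Acquire a slot before doing work; at most `sem` holders at a time
    async with sem:
        counter["now"] += 1
        counter["peak"] = max(counter["peak"], counter["now"])
        await asyncio.sleep(0.01)  # simulate an LLM or tool call
        counter["now"] -= 1
    return i


async def main() -> dict:
    sem = asyncio.Semaphore(2)  # at most 2 concurrent calls
    counter = {"now": 0, "peak": 0}
    await asyncio.gather(*(limited(sem, counter, i) for i in range(6)))
    return counter


print(asyncio.run(main())["peak"])  # 2
```

Six tasks are launched, but the observed peak concurrency never exceeds the semaphore's limit of 2.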
5. Comparison with OpenClaw
5.1 Feature Comparison
| Feature | OpenClaw (TS) | PyAgent | Notes |
|---|---|---|---|
| Multi-channel support | ✅ 20+ channels | ❌ | Can be added later |
| Multi-model support | ✅ | ✅ | OpenAI-compatible |
| Tool system | ✅ | ✅ | Core feature |
| ACP protocol | ✅ | ❌ | Possible later |
| Plugin system | ✅ Full | ⚠️ Simplified | Python constraints |
| Gateway service | ✅ | ⚠️ Optional | FastAPI-based |
| Context compaction | ✅ | ✅ | Core feature |
| Memory system | ✅ | ✅ | Vector storage |
5.2 Architecture Comparison
OpenClaw (TypeScript)          PyAgent (Python)
─────────────────────          ────────────────
CLI → Gateway → Agent          Direct call → Agent
        ↓                              ↓
  Plugin System                  Tool Registry
        ↓                              ↓
  Channel Layer                  (extensible)
        ↓
  ACP Protocol
6. Roadmap
6.1 Phase 1: Core Framework (2-3 weeks)
- Project setup
  - Create the project structure
  - Configure pyproject.toml
  - Set up CI/CD
- Type system
  - Core type definitions
  - Pydantic models
- Tool system
  - Tool base class
  - File tools (read, write, edit)
  - Shell tool (exec)
- LLM client
  - Client base class
  - OpenAI client
  - Streaming responses
- Session management
  - SessionManager
  - File persistence
- Agent runner
  - Main loop
  - Tool-call handling
6.2 Phase 2: Feature Completion (1-2 weeks)
- Multi-model support
  - Anthropic client
  - Gemini client
  - Ollama client
- Configuration management
  - Pydantic Settings
  - Environment variables
- Error handling
  - Retry mechanism
  - Timeout handling
6.3 Phase 3: Advanced Features (2-3 weeks)
- Context management
  - Token counting
  - Compaction strategy
- Memory system
  - Vector storage
  - Memory retrieval
- Gateway service (optional)
  - FastAPI server
  - WebSocket support
7. Example Code
7.1 Basic Usage

import asyncio

from py_agent import AgentRunner, OpenAIClient, RunParams, ToolRegistry
from py_agent.session import SessionManager
from py_agent.tools import ExecTool, ReadTool, WriteTool

# Build the tool registry
tools = ToolRegistry()
tools.register(ReadTool())
tools.register(WriteTool())
tools.register(ExecTool())

# Create the LLM client
llm = OpenAIClient(
    model="gpt-4o",
    api_key="your-api-key",
)

# Create the session manager
session = SessionManager(
    session_id="my-session",
    session_dir="./sessions",
)

# Create the agent runner
agent = AgentRunner(
    llm_client=llm,
    tool_registry=tools,
    session_manager=session,
)


async def main() -> None:
    # run() takes a RunParams model (see agent/types.py)
    result = await agent.run(RunParams(
        session_id="my-session",
        prompt="Create a Python file implementing a simple calculator",
    ))
    print(result.text)


asyncio.run(main())

7.2 Streaming Responses

# Note: stream_run is a planned AgentRunner API, not yet implemented in 3.3.8
async for chunk in agent.stream_run(
    session_id="my-session",
    prompt="Explain asynchronous programming in Python",
):
    if chunk["type"] == "text":
        print(chunk["content"], end="", flush=True)
    elif chunk["type"] == "tool_call":
        print(f"\n[tool call: {chunk['tool_calls'][0]['name']}]")
8. Summary
8.1 Conclusion
Implementing the OpenClaw Agent core in Python is entirely feasible.
Key advantages:
- Mature technology: Python's ecosystem is mature, with solid async support
- Concise code: Python code is compact and easy to maintain
- Extensible: the modular design makes extension straightforward
- High development velocity: fast iteration in Python
8.2 Recommendations
- Implement the core first: agent run loop, tool system, session management
- Add advanced features incrementally: context compaction, memory system
- Stay lean: there is no need to replicate OpenClaw fully; focus on the core
- Invest in tests: unit and integration test coverage
8.3 Next Steps
- Flesh out this document
- Implement the core code
- Write test cases
- Write user documentation
- Publish to PyPI
Document version: v1.0
Last updated: 2026-03-17
Author: 小白 🐶