添加 Python 实现 Agent 技术方案 v1.0

- 可行性评估:完全可行
- 核心架构设计:模型层、工具层、Agent运行时
- 关键模块代码示例
- 开发计划:约6周
This commit is contained in:
小白
2026-03-17 12:50:01 +08:00
parent 916392bfa8
commit b8eb01f562

View File

@@ -0,0 +1,899 @@
# OpenClaw Agent Python 实现技术方案
**版本**: v1.0
**分析日期**: 2026-03-17
**分析师**: 小白 🐶
---
## 一、可行性评估
### 1.1 结论:**完全可行** ✅
OpenClaw Agent 的核心功能可以用 Python 实现,主要原因:
1. **核心依赖可替换**
- TypeScript 的 `@mariozechner/pi-agent-core` → Python 的 `anthropic``openai``google-generativeai` SDK
- TypeScript 的 `@mariozechner/pi-ai` → Python 的统一模型抽象层
2. **功能模块化**
- Agent 核心运行时与具体语言无关
- 工具系统可独立实现
- 认证系统可复用
3. **已有先例**
- OpenAI Python SDK
- Anthropic Python SDK
- LangChain/LlamaIndex 等框架
### 1.2 难度评估
| 模块 | 难度 | 工作量 | 说明 |
|------|------|--------|------|
| Agent 核心运行时 | ⭐⭐⭐ | 2周 | 模型调用、流式处理、工具循环 |
| 工具系统 | ⭐⭐ | 1周 | Shell 执行、文件操作 |
| 浏览器自动化 | ⭐⭐ | 1周 | Playwright Python |
| 网页抓取 | ⭐ | 3天 | requests + BeautifulSoup |
| 认证系统 | ⭐ | 2天 | API Key 管理 |
| Gateway 客户端 | ⭐⭐ | 1周 | WebSocket 客户端 |
| **总计** | - | **约6周** | - |
---
## 二、核心架构设计
### 2.1 整体架构
```
┌─────────────────────────────────────────────────────────────────┐
│ Python Agent Framework │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Agent Core (agent/) │ │
│ │ ├── runner.py # Agent 运行时 │ │
│ │ ├── session.py # 会话管理 │ │
│ │ ├── context.py # 上下文管理 │ │
│ │ └── failover.py # 故障转移 │ │
│ └─────────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Model Layer (models/) │ │
│ │ ├── base.py # 模型抽象基类 │ │
│ │ ├── openai.py # OpenAI 实现 │ │
│ │ ├── anthropic.py # Anthropic 实现 │ │
│ │ ├── gemini.py # Google Gemini 实现 │ │
│ │ ├── ollama.py # Ollama 本地模型 │ │
│ │ └── registry.py # 模型注册表 │ │
│ └─────────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Tools Layer (tools/) │ │
│ │ ├── base.py # 工具基类 │ │
│ │ ├── bash.py # Shell 命令执行 │ │
│ │ ├── browser.py # 浏览器自动化 │ │
│ │ ├── web_fetch.py # 网页抓取 │ │
│ │ ├── web_search.py # 网页搜索 │ │
│ │ ├── memory.py # 记忆管理 │ │
│ │ ├── message.py # 消息发送 │ │
│ │ └── registry.py # 工具注册表 │ │
│ └─────────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Auth Layer (auth/) │ │
│ │ ├── profiles.py # 认证配置文件 │ │
│ │ ├── api_key.py # API Key 管理 │ │
│ │ └── store.py # 凭证存储 │ │
│ └─────────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Gateway Client (gateway/) │ │
│ │ ├── client.py # WebSocket 客户端 │ │
│ │ ├── methods.py # RPC 方法 │ │
│ │ └── events.py # 事件处理 │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### 2.2 目录结构
```
pyopenclaw/
├── pyproject.toml # 项目配置
├── README.md # 使用文档
├── src/
│ └── pyopenclaw/
│ ├── __init__.py
│ ├── agent/
│ │ ├── __init__.py
│ │ ├── runner.py
│ │ ├── session.py
│ │ ├── context.py
│ │ └── failover.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── openai.py
│ │ ├── anthropic.py
│ │ ├── gemini.py
│ │ ├── ollama.py
│ │ └── registry.py
│ ├── tools/
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── bash.py
│ │ ├── browser.py
│ │ ├── web_fetch.py
│ │ ├── web_search.py
│ │ ├── memory.py
│ │ ├── message.py
│ │ └── registry.py
│ ├── auth/
│ │ ├── __init__.py
│ │ ├── profiles.py
│ │ ├── api_key.py
│ │ └── store.py
│ ├── gateway/
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── methods.py
│ │ └── events.py
│ ├── config/
│ │ ├── __init__.py
│ │ ├── loader.py
│ │ └── schema.py
│ └── utils/
│ ├── __init__.py
│ ├── logging.py
│ └── helpers.py
├── tests/
│ ├── __init__.py
│ ├── test_agent.py
│ ├── test_models.py
│ └── test_tools.py
└── examples/
├── basic_usage.py
└── custom_tool.py
```
---
## 三、核心模块设计
### 3.1 模型抽象层 (models/base.py)
```python
from abc import ABC, abstractmethod
from typing import AsyncIterator, Any, Optional
from dataclasses import dataclass
from enum import Enum
class MessageRole(Enum):
SYSTEM = "system"
USER = "user"
ASSISTANT = "assistant"
TOOL = "tool"
@dataclass
class Message:
role: MessageRole
content: str | list[dict]
name: Optional[str] = None
tool_call_id: Optional[str] = None
tool_calls: Optional[list[dict]] = None
@dataclass
class ToolDefinition:
name: str
description: str
parameters: dict # JSON Schema
@dataclass
class ToolResult:
tool_call_id: str
content: str
is_error: bool = False
@dataclass
class StreamChunk:
delta: str
finish_reason: Optional[str] = None
tool_calls: Optional[list[dict]] = None
usage: Optional[dict] = None
class BaseModelProvider(ABC):
"""模型提供商抽象基类"""
@property
@abstractmethod
def provider_name(self) -> str:
"""提供商名称"""
pass
@abstractmethod
async def stream(
self,
messages: list[Message],
tools: Optional[list[ToolDefinition]] = None,
**kwargs
) -> AsyncIterator[StreamChunk]:
"""流式生成响应"""
pass
@abstractmethod
async def complete(
self,
messages: list[Message],
tools: Optional[list[ToolDefinition]] = None,
**kwargs
) -> Message:
"""一次性生成响应"""
pass
def supports_tools(self) -> bool:
"""是否支持工具调用"""
return True
def supports_vision(self) -> bool:
"""是否支持视觉"""
return False
```
### 3.2 OpenAI 实现 (models/openai.py)
```python
from openai import AsyncOpenAI
from .base import BaseModelProvider, Message, StreamChunk, ToolDefinition
from typing import AsyncIterator, Optional
class OpenAIProvider(BaseModelProvider):
def __init__(self, api_key: str, base_url: Optional[str] = None):
self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
@property
def provider_name(self) -> str:
return "openai"
async def stream(
self,
messages: list[Message],
tools: Optional[list[ToolDefinition]] = None,
**kwargs
) -> AsyncIterator[StreamChunk]:
# 转换消息格式
openai_messages = self._convert_messages(messages)
# 构建请求参数
params = {
"model": kwargs.get("model", "gpt-4"),
"messages": openai_messages,
"stream": True,
}
if tools:
params["tools"] = self._convert_tools(tools)
# 流式处理
async with self.client.chat.completions.create(**params) as stream:
async for chunk in stream:
delta = chunk.choices[0].delta
yield StreamChunk(
delta=delta.content or "",
finish_reason=chunk.choices[0].finish_reason,
tool_calls=delta.tool_calls,
usage=chunk.usage,
)
def _convert_messages(self, messages: list[Message]) -> list[dict]:
return [
{"role": m.role.value, "content": m.content}
for m in messages
]
def _convert_tools(self, tools: list[ToolDefinition]) -> list[dict]:
return [
{
"type": "function",
"function": {
"name": t.name,
"description": t.description,
"parameters": t.parameters,
}
}
for t in tools
]
```
### 3.3 Agent 运行时 (agent/runner.py)
```python
import asyncio
from typing import Optional, Callable, Any
from dataclasses import dataclass
from .session import SessionManager
from .context import ContextManager
from .failover import FailoverManager
from ..models.base import BaseModelProvider, Message, ToolDefinition, ToolResult
from ..tools.registry import ToolRegistry
@dataclass
class AgentConfig:
model: str
provider: str
max_tokens: int = 4096
temperature: float = 0.7
timeout_ms: int = 120000
max_retries: int = 3
@dataclass
class AgentResult:
content: str
tool_calls: list[dict]
usage: dict
finish_reason: str
class AgentRunner:
"""Agent 核心运行时"""
def __init__(
self,
provider: BaseModelProvider,
tools: ToolRegistry,
config: AgentConfig,
):
self.provider = provider
self.tools = tools
self.config = config
self.session = SessionManager()
self.context = ContextManager()
self.failover = FailoverManager(config)
async def run(
self,
prompt: str,
on_chunk: Optional[Callable[[str], None]] = None,
on_tool_call: Optional[Callable[[dict], None]] = None,
) -> AgentResult:
"""运行 Agent"""
# 添加用户消息到会话
self.session.add_message(Message(role="user", content=prompt))
# 工具调用循环
iterations = 0
max_iterations = self.config.max_retries * 3
while iterations < max_iterations:
iterations += 1
try:
# 获取上下文
messages = self.session.get_messages()
# 获取工具定义
tool_defs = self.tools.get_definitions()
# 调用模型
result = await self._run_with_timeout(
messages, tool_defs, on_chunk
)
# 检查是否需要工具调用
if result.tool_calls:
# 执行工具
tool_results = await self._execute_tools(
result.tool_calls, on_tool_call
)
# 添加助手消息和工具结果
self.session.add_message(Message(
role="assistant",
content=result.content,
tool_calls=result.tool_calls,
))
for tr in tool_results:
self.session.add_message(Message(
role="tool",
content=tr.content,
tool_call_id=tr.tool_call_id,
))
continue
# 完成
return result
except Exception as e:
# Failover 处理
handled = await self.failover.handle_error(e)
if not handled:
raise
raise RuntimeError("Max iterations exceeded")
async def _run_with_timeout(
self,
messages: list[Message],
tools: list[ToolDefinition],
on_chunk: Optional[Callable[[str], None]],
) -> AgentResult:
"""带超时的模型调用"""
content = ""
tool_calls = []
usage = {}
async def stream_with_timeout():
nonlocal content, tool_calls, usage
timeout = self.config.timeout_ms / 1000
async for chunk in asyncio.wait_for(
self.provider.stream(messages, tools, model=self.config.model),
timeout=timeout
):
content += chunk.delta
if chunk.tool_calls:
tool_calls.extend(chunk.tool_calls)
if chunk.usage:
usage = chunk.usage
if on_chunk and chunk.delta:
on_chunk(chunk.delta)
await stream_with_timeout()
return AgentResult(
content=content,
tool_calls=tool_calls,
usage=usage,
finish_reason="completed",
)
async def _execute_tools(
self,
tool_calls: list[dict],
on_tool_call: Optional[Callable[[dict], None]],
) -> list[ToolResult]:
"""执行工具调用"""
results = []
for tc in tool_calls:
tool_name = tc["function"]["name"]
tool_args = tc["function"]["arguments"]
if on_tool_call:
on_tool_call(tc)
try:
result = await self.tools.execute(tool_name, tool_args)
results.append(ToolResult(
tool_call_id=tc["id"],
content=result,
is_error=False,
))
except Exception as e:
results.append(ToolResult(
tool_call_id=tc["id"],
content=str(e),
is_error=True,
))
return results
```
### 3.4 工具基类 (tools/base.py)
```python
from abc import ABC, abstractmethod
from typing import Any
from pydantic import BaseModel
import json
class ToolResult:
def __init__(self, content: Any, is_error: bool = False):
self.content = content
self.is_error = is_error
def to_json(self) -> str:
if isinstance(self.content, str):
return self.content
return json.dumps(self.content, ensure_ascii=False, indent=2)
class BaseTool(ABC):
"""工具抽象基类"""
@property
@abstractmethod
def name(self) -> str:
"""工具名称"""
pass
@property
@abstractmethod
def description(self) -> str:
"""工具描述"""
pass
@property
def parameters_schema(self) -> dict:
"""参数 JSON Schema"""
return {}
@abstractmethod
async def execute(self, arguments: dict) -> ToolResult:
"""执行工具"""
pass
def to_definition(self) -> dict:
"""转换为工具定义"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters_schema,
}
}
```
### 3.5 Shell 执行工具 (tools/bash.py)
```python
import asyncio
import subprocess
from typing import Optional
from .base import BaseTool, ToolResult
class BashTool(BaseTool):
"""Shell 命令执行工具"""
@property
def name(self) -> str:
return "bash"
@property
def description(self) -> str:
return "Execute shell commands"
@property
def parameters_schema(self) -> dict:
return {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The shell command to execute",
},
"timeout": {
"type": "number",
"description": "Timeout in seconds",
"default": 30,
},
"cwd": {
"type": "string",
"description": "Working directory",
},
},
"required": ["command"],
}
async def execute(self, arguments: dict) -> ToolResult:
command = arguments["command"]
timeout = arguments.get("timeout", 30)
cwd = arguments.get("cwd")
try:
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=timeout
)
output = stdout.decode("utf-8", errors="replace")
error = stderr.decode("utf-8", errors="replace")
if process.returncode != 0:
return ToolResult(
content=f"Exit code: {process.returncode}\nStdout: {output}\nStderr: {error}",
is_error=True,
)
return ToolResult(content=output or "Command executed successfully")
except asyncio.TimeoutError:
return ToolResult(
content=f"Command timed out after {timeout} seconds",
is_error=True,
)
except Exception as e:
return ToolResult(content=str(e), is_error=True)
```
### 3.6 浏览器工具 (tools/browser.py)
```python
from playwright.async_api import async_playwright, Browser, Page
from .base import BaseTool, ToolResult
from typing import Optional
import base64
class BrowserTool(BaseTool):
"""浏览器自动化工具"""
def __init__(self):
self.browser: Optional[Browser] = None
self.page: Optional[Page] = None
@property
def name(self) -> str:
return "browser"
@property
def description(self) -> str:
return "Control a web browser for navigation, clicking, typing, and screenshots"
@property
def parameters_schema(self) -> dict:
return {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["navigate", "click", "type", "screenshot", "close"],
"description": "Browser action to perform",
},
"url": {
"type": "string",
"description": "URL to navigate to",
},
"selector": {
"type": "string",
"description": "CSS selector for click/type actions",
},
"text": {
"type": "string",
"description": "Text to type",
},
},
"required": ["action"],
}
async def _ensure_browser(self):
if not self.browser:
playwright = await async_playwright().start()
self.browser = await playwright.chromium.launch(headless=True)
self.page = await self.browser.new_page()
async def execute(self, arguments: dict) -> ToolResult:
action = arguments["action"]
try:
await self._ensure_browser()
if action == "navigate":
url = arguments["url"]
await self.page.goto(url)
return ToolResult(content=f"Navigated to {url}")
elif action == "click":
selector = arguments["selector"]
await self.page.click(selector)
return ToolResult(content=f"Clicked {selector}")
elif action == "type":
selector = arguments["selector"]
text = arguments["text"]
await self.page.fill(selector, text)
return ToolResult(content=f"Typed into {selector}")
elif action == "screenshot":
screenshot = await self.page.screenshot()
b64 = base64.b64encode(screenshot).decode()
return ToolResult(content=f"SCREENSHOT:data:image/png;base64,{b64}")
elif action == "close":
await self.browser.close()
self.browser = None
self.page = None
return ToolResult(content="Browser closed")
else:
return ToolResult(content=f"Unknown action: {action}", is_error=True)
except Exception as e:
return ToolResult(content=str(e), is_error=True)
```
---
## 四、使用示例
### 4.1 基本使用
```python
import asyncio
from pyopenclaw import AgentRunner, AgentConfig
from pyopenclaw.models import OpenAIProvider
from pyopenclaw.tools import ToolRegistry, BashTool, WebFetchTool
async def main():
# 创建模型提供商
provider = OpenAIProvider(api_key="sk-...")
# 注册工具
tools = ToolRegistry()
tools.register(BashTool())
tools.register(WebFetchTool())
# 配置 Agent
config = AgentConfig(
model="gpt-4",
provider="openai",
max_tokens=4096,
)
# 创建 Agent
agent = AgentRunner(provider, tools, config)
# 运行
def on_chunk(text):
print(text, end="", flush=True)
result = await agent.run(
"List files in the current directory",
on_chunk=on_chunk,
)
print(f"\n\nResult: {result.content}")
print(f"Usage: {result.usage}")
asyncio.run(main())
```
### 4.2 自定义工具
```python
from pyopenclaw.tools import BaseTool, ToolResult
class WeatherTool(BaseTool):
@property
def name(self):
return "get_weather"
@property
def description(self):
return "Get current weather for a city"
@property
def parameters_schema(self):
return {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "City name",
},
},
"required": ["city"],
}
async def execute(self, arguments):
city = arguments["city"]
# 调用天气 API
weather = await fetch_weather(city)
return ToolResult(content=f"Weather in {city}: {weather}")
# 使用
tools.register(WeatherTool())
```
---
## 五、依赖清单
```toml
# pyproject.toml
[project]
name = "pyopenclaw"
version = "0.1.0"
description = "Python implementation of OpenClaw Agent"
requires-python = ">=3.11"
dependencies = [
# 核心依赖
"pydantic>=2.0",
"httpx>=0.25",
"websockets>=12.0",
# 模型 SDK
"openai>=1.0",
"anthropic>=0.25",
"google-generativeai>=0.3",
# 工具依赖
"playwright>=1.40",
"beautifulsoup4>=4.12",
"lxml>=5.0",
# 可选依赖
"tiktoken>=0.5", # Token 计算
"aiofiles>=23.0", # 异步文件操作
]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
"pytest-asyncio>=0.21",
"mypy>=1.0",
"ruff>=0.1",
]
```
---
## 六、与 TypeScript 版本对比
| 功能 | TypeScript 版本 | Python 版本 | 兼容性 |
|------|----------------|-------------|--------|
| Agent 核心运行时 | pi-embedded-runner | agent/runner.py | ✅ 完全兼容 |
| 模型抽象 | @mariozechner/pi-ai | models/ | ✅ API 兼容 |
| 工具系统 | tools/*.ts | tools/*.py | ✅ 功能一致 |
| Gateway 客户端 | acp/client.ts | gateway/client.py | ✅ 协议兼容 |
| 浏览器自动化 | browser-tool.ts | tools/browser.py | ✅ Playwright 通用 |
| Shell 执行 | bash-tools.exec.ts | tools/bash.py | ✅ 功能一致 |
---
## 七、开发计划
### Phase 1: 核心框架 (2周)
- [ ] 模型抽象层 (models/)
- [ ] Agent 运行时 (agent/)
- [ ] 工具基类和注册表 (tools/base.py, tools/registry.py)
### Phase 2: 基础工具 (1周)
- [ ] Shell 执行工具 (tools/bash.py)
- [ ] 文件操作工具
- [ ] 网页抓取工具 (tools/web_fetch.py)
### Phase 3: 高级功能 (2周)
- [ ] 浏览器自动化 (tools/browser.py)
- [ ] 网页搜索 (tools/web_search.py)
- [ ] 记忆管理 (tools/memory.py)
### Phase 4: Gateway 集成 (1周)
- [ ] WebSocket 客户端 (gateway/client.py)
- [ ] ACP 协议支持
- [ ] 事件处理
---
## 八、结论
**Python 实现 OpenClaw Agent 完全可行**,主要优势:
1. **生态成熟**Python AI/ML 生态丰富
2. **开发效率**Python 语法简洁,开发速度快
3. **易于集成**:可与现有 Python 项目无缝集成
4. **学习成本低**Python 开发者众多
**建议**
- 先实现核心功能Agent 运行时 + 基础工具)
- 逐步添加高级功能(浏览器、搜索、记忆)
- 保持与 TypeScript 版本的 API 兼容
---
**报告结束** 🐶