394 lines
13 KiB
Markdown
394 lines
13 KiB
Markdown
# PIT Channel 插件技术方案
|
||
|
||
**版本**: v2.1
|
||
**创建日期**: 2026-03-14
|
||
**变更内容**: 代码实现完成,修复技术方案问题
|
||
|
||
---
|
||
|
||
# 1. 概述
|
||
|
||
## 1.1 目标
|
||
|
||
开发一个 OpenClaw Channel 插件,使 OpenClaw Agent 能够连接到 PIT Router(个人智能体团队路由服务),实现多 Agent 协作。
|
||
|
||
## 1.2 核心功能
|
||
|
||
- ✅ 连接到 PIT Router WebSocket 服务
|
||
- ✅ 接收来自 PIT Router 的消息
|
||
- ✅ 发送 Agent 响应到 PIT Router
|
||
- ✅ 支持多账户配置
|
||
- ✅ 支持消息分块(智能 Markdown 分块)
|
||
- ✅ 支持媒体消息
|
||
- ✅ Web UI 配置管理
|
||
- ✅ 插件版本管理与更新
|
||
- ✅ 消息队列与离线重发
|
||
- ✅ 心跳超时检测
|
||
- ✅ 消息确认机制
|
||
- ✅ 指标监控
|
||
|
||
---
|
||
|
||
# 2. 架构设计
|
||
|
||
## 2.1 整体架构
|
||
|
||
```
|
||
┌─────────────────────────────────────────────────────────────────────┐
|
||
│ PIT Router (Flask) │
|
||
│ ┌─────────────────────────────────────────────────────────────┐ │
|
||
│ │ WebSocket Server (Flask-SocketIO) │ │
|
||
│ │ - 用户连接管理 │ │
|
||
│ │ - 消息路由 │ │
|
||
│ │ - Agent 调度 │ │
|
||
│ └─────────────────────────────────────────────────────────────┘ │
|
||
└──────────────────────────┬──────────────────────────────────────┘
|
||
│ WebSocket
|
||
▼
|
||
┌─────────────────────────────────────────────────────────────────────┐
|
||
│ OpenClaw Gateway │
|
||
│ ┌─────────────────────────────────────────────────────────────┐ │
|
||
│ │ PIT Channel Plugin │ │
|
||
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
|
||
│ │ │ Gateway │ │ Outbound │ │ Config │ │ │
|
||
│ │ │ (入站消息) │ │ (出站消息) │ │ (配置管理) │ │ │
|
||
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
|
||
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
|
||
│ │ │ Web UI │ │ Update │ │ Utils │ │ │
|
||
│ │ │ (配置界面) │ │ (版本管理) │ │ (工具模块) │ │ │
|
||
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
|
||
│ └─────────────────────────────────────────────────────────────┘ │
|
||
└─────────────────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
## 2.2 消息流
|
||
|
||
```
|
||
用户消息 → PIT Router → Gateway → Channel → Agent
|
||
↓
|
||
响应 ← PIT Router ← Gateway ← Channel ← Outbound
|
||
```
|
||
|
||
## 2.3 模块依赖
|
||
|
||
```
|
||
┌─────────────┐
|
||
│ channel │ ← 主入口,管理 Gateway 映射
|
||
├─────────────┤
|
||
│ gateway │ ← WebSocket 连接、心跳、重连、消息队列
|
||
├─────────────┤
|
||
│ outbound │ ← 消息发送、分块
|
||
├─────────────┤
|
||
│ config │ ← 配置解析、账户管理
|
||
├─────────────┤
|
||
│ webui │ ← HTTP 路由、API、页面
|
||
├─────────────┤
|
||
│ update │ ← 版本检查、迁移、回滚
|
||
├─────────────┤
|
||
│ utils │ ← 日志、指标、队列、分块、负载处理
|
||
└─────────────┘
|
||
```
|
||
|
||
---
|
||
|
||
# 3. 插件结构
|
||
|
||
## 3.1 目录结构
|
||
|
||
```
|
||
pit-bot-plugin/
|
||
├── src/
|
||
│ ├── index.ts # 插件入口
|
||
│ ├── channel.ts # Channel 实现(主文件)
|
||
│ ├── config.ts # 配置处理
|
||
│ ├── outbound.ts # 消息发送
|
||
│ ├── gateway.ts # WebSocket 连接 + 消息队列
|
||
│ ├── types.ts # 类型定义
|
||
│ ├── webui/ # Web UI 模块
|
||
│ │ ├── index.ts
|
||
│ │ ├── routes.ts # HTTP 路由注册
|
||
│ │ ├── api.ts # REST API 处理
|
||
│ │ └── static.ts # 静态资源服务
|
||
│ ├── update/ # 更新模块
|
||
│ │ ├── index.ts
|
||
│ │ ├── version.ts # 版本检查
|
||
│ │ ├── migrate.ts # 配置迁移
|
||
│ │ └── rollback.ts # 回滚机制
|
||
│ └── utils/
|
||
│ ├── index.ts
|
||
│ ├── logger.ts # 日志模块
|
||
│ ├── metrics.ts # 指标监控
|
||
│ ├── queue.ts # 消息队列
|
||
│ ├── chunker.ts # 智能分块
|
||
│ └── payload.ts # 消息负载处理
|
||
├── package.json
|
||
├── tsconfig.json
|
||
├── CHANGELOG.md
|
||
└── openclaw.plugin.json
|
||
```
|
||
|
||
---
|
||
|
||
# 4. 核心实现
|
||
|
||
## 4.1 消息队列 (queue.ts)
|
||
|
||
```typescript
|
||
/**
|
||
* 消息队列 - 解决离线消息和重发问题
|
||
*/
|
||
export class MessageQueue {
|
||
private queue: Map<string, QueuedMessage> = new Map();
|
||
|
||
// 入队
|
||
enqueue(to: string, content: string | PITRouterMessage): QueuedMessage {
|
||
// 队列满时删除最旧消息
|
||
if (this.queue.size >= this.options.maxSize) {
|
||
const oldest = this.getOldest();
|
||
if (oldest) this.queue.delete(oldest.queueId);
|
||
}
|
||
// ... 添加到队列
|
||
}
|
||
|
||
// 发送失败时增加重试计数
|
||
incrementRetry(queueId: string): boolean {
|
||
const item = this.queue.get(queueId);
|
||
if (!item) return false;
|
||
item.retryCount++;
|
||
return item.retryCount < item.maxRetries;
|
||
}
|
||
|
||
// 获取待重发的消息
|
||
getPending(): QueuedMessage[] {
|
||
return Array.from(this.queue.values())
|
||
.filter(item => !item.acknowledged && item.retryCount < item.maxRetries);
|
||
}
|
||
}
|
||
```
|
||
|
||
## 4.2 心跳超时检测 (gateway.ts)
|
||
|
||
```typescript
|
||
/**
|
||
* 修复:心跳超时处理
|
||
*/
|
||
function startHeartbeat(): void {
|
||
heartbeatTimer = setInterval(() => {
|
||
if (!connected) return;
|
||
|
||
const pingId = `ping-${Date.now()}`;
|
||
send({ type: "request", id: pingId, method: "ping" });
|
||
|
||
// 设置 pong 响应超时
|
||
heartbeatTimeoutTimer = setTimeout(() => {
|
||
log.warn("Heartbeat timeout, disconnecting");
|
||
ws?.close(); // 超时则断开连接
|
||
}, heartbeatTimeout);
|
||
}, heartbeatInterval);
|
||
}
|
||
```
|
||
|
||
## 4.3 智能分块 (chunker.ts)
|
||
|
||
```typescript
|
||
/**
|
||
* 智能 Markdown 分块
|
||
* 保护代码块、URL、链接不被拆断
|
||
*/
|
||
function chunkMarkdown(text: string, limit: number): string[] {
|
||
// 1. 识别保护区域(代码块、URL、链接)
|
||
const protectedRanges = findProtectedRanges(text);
|
||
|
||
// 2. 按优先级分割:段落 → 标题 → 列表 → 句子 → 词 → 强制
|
||
let splitAt = findSplitBefore(text, "\n\n", limit, protectedRanges);
|
||
if (splitAt > limit * 0.3) return chunks;
|
||
|
||
// ... 继续其他优先级
|
||
}
|
||
```
|
||
|
||
## 4.4 消息确认机制 (gateway.ts)
|
||
|
||
```typescript
|
||
/**
|
||
* 发送消息并等待确认
|
||
*/
|
||
async function sendWithAck(
|
||
message: PITRouterMessage,
|
||
messageId: string
|
||
): Promise<{ success: boolean; messageId?: string; error?: string }> {
|
||
// 未连接时入队
|
||
if (!connected) {
|
||
queue.enqueue(message.params as { to: string; content: string }, message);
|
||
return { success: true, messageId, error: "Queued for retry" };
|
||
}
|
||
|
||
// 发送消息
|
||
if (!send(message)) {
|
||
queue.enqueue(message.params as { to: string; content: string }, message);
|
||
return { success: true, messageId, error: "Queued for retry" };
|
||
}
|
||
|
||
// 等待确认(带超时)
|
||
return new Promise((resolve) => {
|
||
const timeout = setTimeout(() => {
|
||
pendingAcks.delete(messageId);
|
||
metrics.recordMessageAckTimeout();
|
||
resolve({ success: false, messageId, error: "Ack timeout" });
|
||
}, ackTimeout);
|
||
|
||
pendingAcks.set(messageId, {
|
||
resolve: () => { clearTimeout(timeout); resolve({ success: true, messageId }); },
|
||
reject: (reason) => { clearTimeout(timeout); resolve({ success: false, messageId, error: reason }); },
|
||
});
|
||
});
|
||
}
|
||
```
|
||
|
||
## 4.5 指标监控 (metrics.ts)
|
||
|
||
```typescript
|
||
/**
|
||
* 指标收集器
|
||
*/
|
||
export class MetricsCollector {
|
||
private metrics: PITMetrics = {
|
||
connectionSuccess: 0,
|
||
connectionFailure: 0,
|
||
messagesSent: 0,
|
||
messagesReceived: 0,
|
||
messagesAcked: 0,
|
||
messagesAckTimeout: 0,
|
||
averageLatency: 0,
|
||
queueLength: 0,
|
||
updatedAt: Date.now(),
|
||
};
|
||
|
||
private latencySamples: number[] = [];
|
||
|
||
recordMessageAcked(latency: number): void {
|
||
this.metrics.messagesAcked++;
|
||
this.latencySamples.push(latency);
|
||
if (this.latencySamples.length > 100) this.latencySamples.shift();
|
||
this.metrics.averageLatency = this.calculateAverage();
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
# 5. Web UI 配置管理
|
||
|
||
## 5.1 HTTP 路由
|
||
|
||
|路由|方法|说明|
|
||
|------|----------|-----------------|
|
||
|`/plugins/pit-bot`|GET|Web UI 入口|
|
||
|`/plugins/pit-bot/api/config`|GET/PUT|配置管理|
|
||
|`/plugins/pit-bot/api/status`|GET|获取状态|
|
||
|`/plugins/pit-bot/api/metrics`|GET|获取指标|
|
||
|`/plugins/pit-bot/api/connect`|POST|连接 PIT Router|
|
||
|`/plugins/pit-bot/api/disconnect`|POST|断开连接|
|
||
|
||
## 5.2 页面功能
|
||
|
||
- 连接状态显示(实时更新)
|
||
- 配置表单编辑(Router URL、Token、间隔等)
|
||
- 连接/断开按钮
|
||
- 指标展示(发送/接收计数、队列长度、延迟)
|
||
|
||
---
|
||
|
||
# 6. 插件更新机制
|
||
|
||
## 6.1 版本管理
|
||
|
||
遵循语义化版本 (SemVer 2.0.0)
|
||
|
||
## 6.2 配置迁移 (migrate.ts)
|
||
|
||
```typescript
|
||
const configMigrations: Record<string, Migration[]> = {
|
||
"1.1.0": [
|
||
(config) => ({
|
||
...config,
|
||
reconnectInterval: config.reconnectInterval ?? 5000,
|
||
}),
|
||
],
|
||
"2.0.0": [
|
||
(config) => {
|
||
const { routerUrl, ...rest } = config;
|
||
return { ...rest, gateway: { url: routerUrl } };
|
||
},
|
||
],
|
||
};
|
||
```
|
||
|
||
## 6.3 回滚机制 (rollback.ts)
|
||
|
||
```typescript
|
||
const configBackups = new Map<string, PITAccountConfig>();
|
||
|
||
function backupConfig(accountId: string, config: PITAccountConfig): void {
|
||
configBackups.set(accountId, { ...config });
|
||
}
|
||
|
||
function restoreConfig(accountId: string): PITAccountConfig | null {
|
||
return configBackups.get(accountId) ?? null;
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
# 7. 配置示例
|
||
|
||
```yaml
|
||
channels:
|
||
pit-bot:
|
||
enabled: true
|
||
routerUrl: "ws://localhost:9000/ws"
|
||
authToken: "${PIT_ROUTER_TOKEN}"
|
||
reconnectInterval: 5000
|
||
heartbeatInterval: 30000
|
||
heartbeatTimeout: 10000
|
||
ackTimeout: 30000
|
||
maxQueueSize: 100
|
||
|
||
accounts:
|
||
work:
|
||
enabled: true
|
||
routerUrl: "ws://work.example.com:9000/ws"
|
||
authToken: "${PIT_WORK_TOKEN}"
|
||
name: "工作 Agent"
|
||
```
|
||
|
||
---
|
||
|
||
# 8. 测试计划
|
||
|
||
|模块|测试用例|
|
||
|------|--------------------|
|
||
|chunker|长消息分块、Markdown 保护、URL 不拆断|
|
||
|gateway|重连、心跳超时、消息确认、队列重发|
|
||
|config|配置解析、环境变量、验证|
|
||
|queue|入队、出队、重试、清理|
|
||
|metrics|计数、平均延迟计算|
|
||
|
||
---
|
||
|
||
# 9. 修复的问题
|
||
|
||
|问题|修复方案|
|
||
|------|----------|
|
||
|onMessage 回调未传递|在 GatewayContext 中添加 onMessage 参数|
|
||
|心跳无超时处理|添加 heartbeatTimeoutTimer,超时断开连接|
|
||
|消息分块破坏 URL|智能识别保护区域(代码块、URL、链接)|
|
||
|发送失败无重试|实现 MessageQueue,离线消息自动重发|
|
||
|无确认机制|sendWithAck + pendingAcks + 超时处理|
|
||
|无监控|实现 MetricsCollector 收集各类指标|
|
||
|日志不规范|统一 Logger 模块,区分 debug/info/warn/error|
|
||
|
||
---
|
||
|
||
*文档版本: v2.1 | 更新日期: 2026-03-14*
|