Files
PIT_Channel/docs/PIT_Channel_Technical_Spec.md

394 lines
13 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# PIT Channel 插件技术方案
**版本**: v2.1
**创建日期**: 2026-03-14
**变更内容**: 代码实现完成,修复技术方案问题
---
# 1. 概述
## 1.1 目标
开发一个 OpenClaw Channel 插件,使 OpenClaw Agent 能够连接到 PIT Router个人智能体团队路由服务实现多 Agent 协作。
## 1.2 核心功能
- ✅ 连接到 PIT Router WebSocket 服务
- ✅ 接收来自 PIT Router 的消息
- ✅ 发送 Agent 响应到 PIT Router
- ✅ 支持多账户配置
- ✅ 支持消息分块(智能 Markdown 分块)
- ✅ 支持媒体消息
- ✅ Web UI 配置管理
- ✅ 插件版本管理与更新
- ✅ 消息队列与离线重发
- ✅ 心跳超时检测
- ✅ 消息确认机制
- ✅ 指标监控
---
# 2. 架构设计
## 2.1 整体架构
```
┌─────────────────────────────────────────────────────────────────────┐
│ PIT Router (Flask) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ WebSocket Server (Flask-SocketIO) │ │
│ │ - 用户连接管理 │ │
│ │ - 消息路由 │ │
│ │ - Agent 调度 │ │
│ └─────────────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────┘
│ WebSocket
┌─────────────────────────────────────────────────────────────────────┐
│ OpenClaw Gateway │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ PIT Channel Plugin │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Gateway │ │ Outbound │ │ Config │ │ │
│ │ │ (入站消息) │ │ (出站消息) │ │ (配置管理) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Web UI │ │ Update │ │ Utils │ │ │
│ │ │ (配置界面) │ │ (版本管理) │ │ (工具模块) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
```
## 2.2 消息流
```
用户消息 → PIT Router → Gateway → Channel → Agent
响应 ← PIT Router ← Gateway ← Channel ← Outbound
```
## 2.3 模块依赖
```
┌─────────────┐
│ channel │ ← 主入口,管理 Gateway 映射
├─────────────┤
│ gateway │ ← WebSocket 连接、心跳、重连、消息队列
├─────────────┤
│ outbound │ ← 消息发送、分块
├─────────────┤
│ config │ ← 配置解析、账户管理
├─────────────┤
│ webui │ ← HTTP 路由、API、页面
├─────────────┤
│ update │ ← 版本检查、迁移、回滚
├─────────────┤
│ utils │ ← 日志、指标、队列、分块、负载处理
└─────────────┘
```
---
# 3. 插件结构
## 3.1 目录结构
```
pit-bot-plugin/
├── src/
│ ├── index.ts # 插件入口
│ ├── channel.ts # Channel 实现(主文件)
│ ├── config.ts # 配置处理
│ ├── outbound.ts # 消息发送
│ ├── gateway.ts # WebSocket 连接 + 消息队列
│ ├── types.ts # 类型定义
│ ├── webui/ # Web UI 模块
│ │ ├── index.ts
│ │ ├── routes.ts # HTTP 路由注册
│ │ ├── api.ts # REST API 处理
│ │ └── static.ts # 静态资源服务
│ ├── update/ # 更新模块
│ │ ├── index.ts
│ │ ├── version.ts # 版本检查
│ │ ├── migrate.ts # 配置迁移
│ │ └── rollback.ts # 回滚机制
│ └── utils/
│ ├── index.ts
│ ├── logger.ts # 日志模块
│ ├── metrics.ts # 指标监控
│ ├── queue.ts # 消息队列
│ ├── chunker.ts # 智能分块
│ └── payload.ts # 消息负载处理
├── package.json
├── tsconfig.json
├── CHANGELOG.md
└── openclaw.plugin.json
```
---
# 4. 核心实现
## 4.1 消息队列 (queue.ts)
```typescript
/**
* 消息队列 - 解决离线消息和重发问题
*/
export class MessageQueue {
private queue: Map<string, QueuedMessage> = new Map();
// 入队
enqueue(to: string, content: string | PITRouterMessage): QueuedMessage {
// 队列满时删除最旧消息
if (this.queue.size >= this.options.maxSize) {
const oldest = this.getOldest();
if (oldest) this.queue.delete(oldest.queueId);
}
// ... 添加到队列
}
// 发送失败时增加重试计数
incrementRetry(queueId: string): boolean {
const item = this.queue.get(queueId);
if (!item) return false;
item.retryCount++;
return item.retryCount < item.maxRetries;
}
// 获取待重发的消息
getPending(): QueuedMessage[] {
return Array.from(this.queue.values())
.filter(item => !item.acknowledged && item.retryCount < item.maxRetries);
}
}
```
## 4.2 心跳超时检测 (gateway.ts)
```typescript
/**
* 修复:心跳超时处理
*/
function startHeartbeat(): void {
heartbeatTimer = setInterval(() => {
if (!connected) return;
const pingId = `ping-${Date.now()}`;
send({ type: "request", id: pingId, method: "ping" });
// 设置 pong 响应超时
heartbeatTimeoutTimer = setTimeout(() => {
log.warn("Heartbeat timeout, disconnecting");
ws?.close(); // 超时则断开连接
}, heartbeatTimeout);
}, heartbeatInterval);
}
```
## 4.3 智能分块 (chunker.ts)
```typescript
/**
* 智能 Markdown 分块
* 保护代码块、URL、链接不被拆断
*/
function chunkMarkdown(text: string, limit: number): string[] {
// 1. 识别保护区域代码块、URL、链接
const protectedRanges = findProtectedRanges(text);
// 2. 按优先级分割:段落 → 标题 → 列表 → 句子 → 词 → 强制
let splitAt = findSplitBefore(text, "\n\n", limit, protectedRanges);
if (splitAt > limit * 0.3) return chunks;
// ... 继续其他优先级
}
```
## 4.4 消息确认机制 (gateway.ts)
```typescript
/**
* 发送消息并等待确认
*/
async function sendWithAck(
message: PITRouterMessage,
messageId: string
): Promise<{ success: boolean; messageId?: string; error?: string }> {
// 未连接时入队
if (!connected) {
queue.enqueue(message.params as { to: string; content: string }, message);
return { success: true, messageId, error: "Queued for retry" };
}
// 发送消息
if (!send(message)) {
queue.enqueue(message.params as { to: string; content: string }, message);
return { success: true, messageId, error: "Queued for retry" };
}
// 等待确认(带超时)
return new Promise((resolve) => {
const timeout = setTimeout(() => {
pendingAcks.delete(messageId);
metrics.recordMessageAckTimeout();
resolve({ success: false, messageId, error: "Ack timeout" });
}, ackTimeout);
pendingAcks.set(messageId, {
resolve: () => { clearTimeout(timeout); resolve({ success: true, messageId }); },
reject: (reason) => { clearTimeout(timeout); resolve({ success: false, messageId, error: reason }); },
});
});
}
```
## 4.5 指标监控 (metrics.ts)
```typescript
/**
* 指标收集器
*/
export class MetricsCollector {
private metrics: PITMetrics = {
connectionSuccess: 0,
connectionFailure: 0,
messagesSent: 0,
messagesReceived: 0,
messagesAcked: 0,
messagesAckTimeout: 0,
averageLatency: 0,
queueLength: 0,
updatedAt: Date.now(),
};
private latencySamples: number[] = [];
recordMessageAcked(latency: number): void {
this.metrics.messagesAcked++;
this.latencySamples.push(latency);
if (this.latencySamples.length > 100) this.latencySamples.shift();
this.metrics.averageLatency = this.calculateAverage();
}
}
```
---
# 5. Web UI 配置管理
## 5.1 HTTP 路由
|路由|方法|说明|
|------|----------|-----------------|
|`/plugins/pit-bot`|GET|Web UI 入口|
|`/plugins/pit-bot/api/config`|GET/PUT|配置管理|
|`/plugins/pit-bot/api/status`|GET|获取状态|
|`/plugins/pit-bot/api/metrics`|GET|获取指标|
|`/plugins/pit-bot/api/connect`|POST|连接 PIT Router|
|`/plugins/pit-bot/api/disconnect`|POST|断开连接|
## 5.2 页面功能
- 连接状态显示(实时更新)
- 配置表单编辑Router URL、Token、间隔等
- 连接/断开按钮
- 指标展示(发送/接收计数、队列长度、延迟)
---
# 6. 插件更新机制
## 6.1 版本管理
遵循语义化版本 (SemVer 2.0.0)
## 6.2 配置迁移 (migrate.ts)
```typescript
const configMigrations: Record<string, Migration[]> = {
"1.1.0": [
(config) => ({
...config,
reconnectInterval: config.reconnectInterval ?? 5000,
}),
],
"2.0.0": [
(config) => {
const { routerUrl, ...rest } = config;
return { ...rest, gateway: { url: routerUrl } };
},
],
};
```
## 6.3 回滚机制 (rollback.ts)
```typescript
const configBackups = new Map<string, PITAccountConfig>();
function backupConfig(accountId: string, config: PITAccountConfig): void {
configBackups.set(accountId, { ...config });
}
function restoreConfig(accountId: string): PITAccountConfig | null {
return configBackups.get(accountId) ?? null;
}
```
---
# 7. 配置示例
```yaml
channels:
pit-bot:
enabled: true
routerUrl: "ws://localhost:9000/ws"
authToken: "${PIT_ROUTER_TOKEN}"
reconnectInterval: 5000
heartbeatInterval: 30000
heartbeatTimeout: 10000
ackTimeout: 30000
maxQueueSize: 100
accounts:
work:
enabled: true
routerUrl: "ws://work.example.com:9000/ws"
authToken: "${PIT_WORK_TOKEN}"
name: "工作 Agent"
```
---
# 8. 测试计划
|模块|测试用例|
|------|--------------------|
|chunker|长消息分块、Markdown 保护、URL 不拆断|
|gateway|重连、心跳超时、消息确认、队列重发|
|config|配置解析、环境变量、验证|
|queue|入队、出队、重试、清理|
|metrics|计数、平均延迟计算|
---
# 9. 修复的问题
|问题|修复方案|
|------|----------|
|onMessage 回调未传递|在 GatewayContext 中添加 onMessage 参数|
|心跳无超时处理|添加 heartbeatTimeoutTimer超时断开连接|
|消息分块破坏 URL|智能识别保护区域代码块、URL、链接|
|发送失败无重试|实现 MessageQueue离线消息自动重发|
|无确认机制|sendWithAck + pendingAcks + 超时处理|
|无监控|实现 MetricsCollector 收集各类指标|
|日志不规范|统一 Logger 模块,区分 debug/info/warn/error|
---
*文档版本: v2.1 | 更新日期: 2026-03-14*