Files
PIT_Channel/docs/PIT_Channel_Technical_Spec.md

13 KiB
Raw Blame History

PIT Channel 插件技术方案

版本: v2.1
创建日期: 2026-03-14
变更内容: 代码实现完成,修复技术方案问题


1. 概述

1.1 目标

开发一个 OpenClaw Channel 插件,使 OpenClaw Agent 能够连接到 PIT Router个人智能体团队路由服务实现多 Agent 协作。

1.2 核心功能

  • 连接到 PIT Router WebSocket 服务
  • 接收来自 PIT Router 的消息
  • 发送 Agent 响应到 PIT Router
  • 支持多账户配置
  • 支持消息分块(智能 Markdown 分块)
  • 支持媒体消息
  • Web UI 配置管理
  • 插件版本管理与更新
  • 消息队列与离线重发
  • 心跳超时检测
  • 消息确认机制
  • 指标监控

2. 架构设计

2.1 整体架构

┌─────────────────────────────────────────────────────────────────────┐
│                         PIT Router (Flask)                          │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │  WebSocket Server (Flask-SocketIO)                          │   │
│  │  - 用户连接管理                                              │   │
│  │  - 消息路由                                                  │   │
│  │  - Agent 调度                                                │   │
│  └─────────────────────────────────────────────────────────────┘   │
└──────────────────────────┬──────────────────────────────────────┘
                           │ WebSocket
                           ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      OpenClaw Gateway                                │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │  PIT Channel Plugin                                         │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │   │
│  │  │  Gateway    │  │  Outbound   │  │   Config    │          │   │
│  │  │  (入站消息) │  │  (出站消息) │  │  (配置管理) │          │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘          │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │   │
│  │  │  Web UI     │  │  Update     │  │   Utils     │          │   │
│  │  │  (配置界面) │  │  (版本管理) │  │  (工具模块) │          │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘          │   │
│  └─────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘

2.2 消息流

用户消息 → PIT Router → Gateway → Channel → Agent
                                        ↓
响应 ← PIT Router ← Gateway ← Channel ← Outbound

2.3 模块依赖

┌─────────────┐
│   channel   │  ← 主入口,管理 Gateway 映射
├─────────────┤
│   gateway   │  ← WebSocket 连接、心跳、重连、消息队列
├─────────────┤
│  outbound   │  ← 消息发送、分块
├─────────────┤
│   config    │  ← 配置解析、账户管理
├─────────────┤
│   webui    │  ← HTTP 路由、API、页面
├─────────────┤
│   update   │  ← 版本检查、迁移、回滚
├─────────────┤
│   utils    │  ← 日志、指标、队列、分块、负载处理
└─────────────┘

3. 插件结构

3.1 目录结构

pit-bot-plugin/
├── src/
│   ├── index.ts              # 插件入口
│   ├── channel.ts            # Channel 实现(主文件)
│   ├── config.ts             # 配置处理
│   ├── outbound.ts           # 消息发送
│   ├── gateway.ts            # WebSocket 连接 + 消息队列
│   ├── types.ts              # 类型定义
│   ├── webui/                # Web UI 模块
│   │   ├── index.ts
│   │   ├── routes.ts         # HTTP 路由注册
│   │   ├── api.ts            # REST API 处理
│   │   └── static.ts         # 静态资源服务
│   ├── update/               # 更新模块
│   │   ├── index.ts
│   │   ├── version.ts        # 版本检查
│   │   ├── migrate.ts        # 配置迁移
│   │   └── rollback.ts       # 回滚机制
│   └── utils/
│       ├── index.ts
│       ├── logger.ts          # 日志模块
│       ├── metrics.ts        # 指标监控
│       ├── queue.ts          # 消息队列
│       ├── chunker.ts        # 智能分块
│       └── payload.ts        # 消息负载处理
├── package.json
├── tsconfig.json
├── CHANGELOG.md
└── openclaw.plugin.json

4. 核心实现

4.1 消息队列 (queue.ts)

/**
 * 消息队列 - 解决离线消息和重发问题
 */
export class MessageQueue {
  private queue: Map<string, QueuedMessage> = new Map();
  
  // 入队
  enqueue(to: string, content: string | PITRouterMessage): QueuedMessage {
    // 队列满时删除最旧消息
    if (this.queue.size >= this.options.maxSize) {
      const oldest = this.getOldest();
      if (oldest) this.queue.delete(oldest.queueId);
    }
    // ... 添加到队列
  }
  
  // 发送失败时增加重试计数
  incrementRetry(queueId: string): boolean {
    const item = this.queue.get(queueId);
    if (!item) return false;
    item.retryCount++;
    return item.retryCount < item.maxRetries;
  }
  
  // 获取待重发的消息
  getPending(): QueuedMessage[] {
    return Array.from(this.queue.values())
      .filter(item => !item.acknowledged && item.retryCount < item.maxRetries);
  }
}

4.2 心跳超时检测 (gateway.ts)

/**
 * 修复:心跳超时处理
 */
function startHeartbeat(): void {
  heartbeatTimer = setInterval(() => {
    if (!connected) return;
    
    const pingId = `ping-${Date.now()}`;
    send({ type: "request", id: pingId, method: "ping" });
    
    // 设置 pong 响应超时
    heartbeatTimeoutTimer = setTimeout(() => {
      log.warn("Heartbeat timeout, disconnecting");
      ws?.close();  // 超时则断开连接
    }, heartbeatTimeout);
  }, heartbeatInterval);
}

4.3 智能分块 (chunker.ts)

/**
 * 智能 Markdown 分块
 * 保护代码块、URL、链接不被拆断
 */
function chunkMarkdown(text: string, limit: number): string[] {
  // 1. 识别保护区域代码块、URL、链接
  const protectedRanges = findProtectedRanges(text);
  
  // 2. 按优先级分割:段落 → 标题 → 列表 → 句子 → 词 → 强制
  let splitAt = findSplitBefore(text, "\n\n", limit, protectedRanges);
  if (splitAt > limit * 0.3) return chunks;
  
  // ... 继续其他优先级
}

4.4 消息确认机制 (gateway.ts)

/**
 * 发送消息并等待确认
 */
async function sendWithAck(
  message: PITRouterMessage, 
  messageId: string
): Promise<{ success: boolean; messageId?: string; error?: string }> {
  // 未连接时入队
  if (!connected) {
    queue.enqueue(message.params as { to: string; content: string }, message);
    return { success: true, messageId, error: "Queued for retry" };
  }
  
  // 发送消息
  if (!send(message)) {
    queue.enqueue(message.params as { to: string; content: string }, message);
    return { success: true, messageId, error: "Queued for retry" };
  }
  
  // 等待确认(带超时)
  return new Promise((resolve) => {
    const timeout = setTimeout(() => {
      pendingAcks.delete(messageId);
      metrics.recordMessageAckTimeout();
      resolve({ success: false, messageId, error: "Ack timeout" });
    }, ackTimeout);
    
    pendingAcks.set(messageId, {
      resolve: () => { clearTimeout(timeout); resolve({ success: true, messageId }); },
      reject: (reason) => { clearTimeout(timeout); resolve({ success: false, messageId, error: reason }); },
    });
  });
}

4.5 指标监控 (metrics.ts)

/**
 * 指标收集器
 */
export class MetricsCollector {
  private metrics: PITMetrics = {
    connectionSuccess: 0,
    connectionFailure: 0,
    messagesSent: 0,
    messagesReceived: 0,
    messagesAcked: 0,
    messagesAckTimeout: 0,
    averageLatency: 0,
    queueLength: 0,
    updatedAt: Date.now(),
  };
  
  private latencySamples: number[] = [];
  
  recordMessageAcked(latency: number): void {
    this.metrics.messagesAcked++;
    this.latencySamples.push(latency);
    if (this.latencySamples.length > 100) this.latencySamples.shift();
    this.metrics.averageLatency = this.calculateAverage();
  }
}

5. Web UI 配置管理

5.1 HTTP 路由

路由 方法 说明
/plugins/pit-bot GET Web UI 入口
/plugins/pit-bot/api/config GET/PUT 配置管理
/plugins/pit-bot/api/status GET 获取状态
/plugins/pit-bot/api/metrics GET 获取指标
/plugins/pit-bot/api/connect POST 连接 PIT Router
/plugins/pit-bot/api/disconnect POST 断开连接

5.2 页面功能

  • 连接状态显示(实时更新)
  • 配置表单编辑Router URL、Token、间隔等
  • 连接/断开按钮
  • 指标展示(发送/接收计数、队列长度、延迟)

6. 插件更新机制

6.1 版本管理

遵循语义化版本 (SemVer 2.0.0)

6.2 配置迁移 (migrate.ts)

const configMigrations: Record<string, Migration[]> = {
  "1.1.0": [
    (config) => ({
      ...config,
      reconnectInterval: config.reconnectInterval ?? 5000,
    }),
  ],
  "2.0.0": [
    (config) => {
      const { routerUrl, ...rest } = config;
      return { ...rest, gateway: { url: routerUrl } };
    },
  ],
};

6.3 回滚机制 (rollback.ts)

const configBackups = new Map<string, PITAccountConfig>();

function backupConfig(accountId: string, config: PITAccountConfig): void {
  configBackups.set(accountId, { ...config });
}

function restoreConfig(accountId: string): PITAccountConfig | null {
  return configBackups.get(accountId) ?? null;
}

7. 配置示例

channels:
  pit-bot:
    enabled: true
    routerUrl: "ws://localhost:9000/ws"
    authToken: "${PIT_ROUTER_TOKEN}"
    reconnectInterval: 5000
    heartbeatInterval: 30000
    heartbeatTimeout: 10000
    ackTimeout: 30000
    maxQueueSize: 100
    
    accounts:
      work:
        enabled: true
        routerUrl: "ws://work.example.com:9000/ws"
        authToken: "${PIT_WORK_TOKEN}"
        name: "工作 Agent"

8. 测试计划

模块 测试用例
chunker 长消息分块、Markdown 保护、URL 不拆断
gateway 重连、心跳超时、消息确认、队列重发
config 配置解析、环境变量、验证
queue 入队、出队、重试、清理
metrics 计数、平均延迟计算

9. 修复的问题

问题 修复方案
onMessage 回调未传递 在 GatewayContext 中添加 onMessage 参数
心跳无超时处理 添加 heartbeatTimeoutTimer超时断开连接
消息分块破坏 URL 智能识别保护区域代码块、URL、链接
发送失败无重试 实现 MessageQueue离线消息自动重发
无确认机制 sendWithAck + pendingAcks + 超时处理
无监控 实现 MetricsCollector 收集各类指标
日志不规范 统一 Logger 模块,区分 debug/info/warn/error

文档版本: v2.1 | 更新日期: 2026-03-14