#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ OpenClaw Gateway 连接器 用于连接多个 OpenClaw 实例并转发消息 作者:小白 🐶 """ import websocket import json import threading import time from typing import Dict, Optional, Callable, Any class OpenClawConnector: """OpenClaw Gateway 连接器""" def __init__(self, gateway_url: str, token: str, name: str = "default"): self.gateway_url = gateway_url self.token = token self.name = name self.ws: Optional[websocket.WebSocketApp] = None self.connected = False self.message_callback: Optional[Callable[[str, dict], None]] = None self._reconnect_delay = 1 self._max_reconnect_delay = 30 self._should_reconnect = True def connect(self): """连接到 OpenClaw Gateway""" if not self.gateway_url or not self.token: print(f"[Connector:{self.name}] Missing gateway_url or token") return url = f"{self.gateway_url}?token={self.token}" print(f"[Connector:{self.name}] Connecting to {self.gateway_url}...") try: self.ws = websocket.WebSocketApp( url, on_open=self._on_open, on_message=self._on_message, on_error=self._on_error, on_close=self._on_close ) # 启动后台线程 thread = threading.Thread(target=self._run_forever, daemon=True) thread.start() except Exception as e: print(f"[Connector:{self.name}] Connection error: {e}") def _run_forever(self): """后台运行 WebSocket""" while self._should_reconnect: try: self.ws.run_forever() except Exception as e: print(f"[Connector:{self.name}] WebSocket error: {e}") # 重连逻辑 if self._should_reconnect: print(f"[Connector:{self.name}] Reconnecting in {self._reconnect_delay}s...") time.sleep(self._reconnect_delay) self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay) def _on_open(self, ws): self.connected = True self._reconnect_delay = 1 print(f"[Connector:{self.name}] ✅ Connected") def _on_message(self, ws, message): """收到 Gateway 消息""" try: data = json.loads(message) print(f"[Connector:{self.name}] Received: {data.get('method', 'unknown')}") if self.message_callback: self.message_callback(self.name, data) except json.JSONDecodeError: print(f"[Connector:{self.name}] Invalid JSON: {message[:100]}") def _on_error(self, ws, error): print(f"[Connector:{self.name}] Error: {error}") def _on_close(self, ws, close_status_code, close_msg): self.connected = False print(f"[Connector:{self.name}] Disconnected: {close_status_code} {close_msg}") def send_message(self, message: str, session_key: str = None) -> bool: """发送消息到 Gateway""" if not self.connected or not self.ws: print(f"[Connector:{self.name}] Not connected") return False payload = { "method": "agent.turn", "id": str(int(time.time() * 1000)), "params": { "message": message, "sessionKey": session_key or f"webchat:{self.name}", "deliver": False # 不直接发送,等待响应 } } try: self.ws.send(json.dumps(payload)) print(f"[Connector:{self.name}] Sent message: {message[:50]}...") return True except Exception as e: print(f"[Connector:{self.name}] Send error: {e}") return False def disconnect(self): """断开连接""" self._should_reconnect = False if self.ws: self.ws.close() class MultiGatewayManager: """多 Gateway 连接管理器""" def __init__(self): self.gateways: Dict[str, OpenClawConnector] = {} self._lock = threading.Lock() def add_gateway(self, name: str, url: str, token: str) -> OpenClawConnector: """添加 Gateway 连接""" with self._lock: if name in self.gateways: self.gateways[name].disconnect() connector = OpenClawConnector(url, token, name) self.gateways[name] = connector return connector def connect_all(self): """连接所有 Gateway""" for connector in self.gateways.values(): connector.connect() def get_gateway(self, name: str) -> Optional[OpenClawConnector]: return self.gateways.get(name) def list_gateways(self) -> list: return list(self.gateways.keys()) def get_status(self) -> Dict[str, Any]: """获取所有 Gateway 状态""" return { name: { "connected": connector.connected, "url": connector.gateway_url } for name, connector in self.gateways.items() } def disconnect_all(self): """断开所有连接""" for connector in self.gateways.values(): connector.disconnect() self.gateways.clear() # 全局单例 gateway_manager = MultiGatewayManager()