""" 频道配置模型 智队频道插件配置 """ from app.extensions import db from datetime import datetime import uuid class ChannelConfig(db.Model): """智队频道插件配置""" __tablename__ = 'channel_configs' id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4())) name = db.Column(db.String(80), nullable=False, unique=True) gateway_url = db.Column(db.String(256), nullable=False) auth_token = db.Column(db.String(256), nullable=True) reconnect_interval = db.Column(db.Integer, default=5000) heartbeat_interval = db.Column(db.Integer, default=30000) enabled = db.Column(db.Boolean, default=True) # 状态 status = db.Column(db.String(20), default='offline') last_connected = db.Column(db.DateTime, nullable=True) last_error = db.Column(db.Text, nullable=True) # 时间戳 created_at = db.Column(db.DateTime, default=datetime.utcnow) updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def to_dict(self): return { 'id': self.id, 'name': self.name, 'gateway_url': self.gateway_url, 'auth_token': '••••••••' if self.auth_token else None, 'reconnect_interval': self.reconnect_interval, 'heartbeat_interval': self.heartbeat_interval, 'enabled': self.enabled, 'status': self.status, 'last_connected': self.last_connected.isoformat() if self.last_connected else None, 'created_at': self.created_at.isoformat() if self.created_at else None, 'updated_at': self.updated_at.isoformat() if self.updated_at else None, } class ChannelTester: """频道连接测试器""" def __init__(self, config): self.config = config self.results = [] self.start_time = None self.ws = None async def test_connection(self): """执行连接测试""" import time import asyncio self.start_time = time.time() self.results = [] try: # 1. WebSocket 连接测试 await self._test_websocket() # 2. 认证测试 await self._test_auth() # 3. 心跳测试 await self._test_heartbeat() # 4. 消息收发测试 await self._test_message() except Exception as e: self._log('ERROR', str(e), success=False) return { 'success': all(r['success'] for r in self.results), 'results': self.results, 'response_time': int((time.time() - self.start_time) * 1000), 'tested_at': datetime.utcnow().isoformat() } async def _test_websocket(self): """测试 WebSocket 连接""" import websockets self._log('INFO', f'正在连接 {self.config.gateway_url}...') try: self.ws = await websockets.connect( self.config.gateway_url, extra_headers={'Authorization': f'Bearer {self.config.auth_token}'} if self.config.auth_token else {} ) self._log('INFO', 'WebSocket 连接已建立', success=True) except Exception as e: self._log('ERROR', f'连接失败: {e}', success=False) raise async def _test_auth(self): """测试认证""" import asyncio self._log('INFO', '发送认证请求...') try: # 发送认证消息 await self.ws.send(json.dumps({ 'type': 'auth', 'token': self.config.auth_token })) # 等待响应 response = await asyncio.wait_for(self.ws.recv(), timeout=10) data = json.loads(response) if data.get('type') == 'auth_success': self._log('INFO', f"认证成功: agent_id={data.get('agent_id')}", success=True) else: self._log('ERROR', f"认证失败: {data}", success=False) raise Exception('认证失败') except Exception as e: self._log('ERROR', f'认证错误: {e}', success=False) raise async def _test_heartbeat(self): """测试心跳""" import asyncio self._log('INFO', '发送心跳...') try: await self.ws.send(json.dumps({'type': 'ping'})) response = await asyncio.wait_for(self.ws.recv(), timeout=10) data = json.loads(response) if data.get('type') == 'pong': self._log('INFO', '心跳响应: pong', success=True) else: self._log('WARNING', f'心跳响应异常: {data}', success=True) except Exception as e: self._log('ERROR', f'心跳错误: {e}', success=False) async def _test_message(self): """测试消息收发""" import asyncio self._log('INFO', '发送测试消息...') try: test_msg = { 'type': 'message', 'content': '__TEST__' } await self.ws.send(json.dumps(test_msg)) response = await asyncio.wait_for(self.ws.recv(), timeout=30) data = json.loads(response) self._log('INFO', f"收到响应: {data.get('content', data)[:50]}", success=True) except Exception as e: self._log('WARNING', f'消息测试跳过: {e}', success=True) await self.ws.close() self._log('INFO', '测试完成') def _log(self, level, message, success=None): """记录日志""" entry = { 'level': level, 'message': message, 'timestamp': datetime.utcnow().isoformat(), 'success': success if success is not None else level == 'INFO' } self.results.append(entry) # 需要导入 json import json