Files
pit-router/app/models/channel_config.py

186 lines
6.1 KiB
Python

"""
频道配置模型
智队频道插件配置
"""
from app.extensions import db
from datetime import datetime
import uuid
class ChannelConfig(db.Model):
"""智队频道插件配置"""
__tablename__ = 'channel_configs'
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
name = db.Column(db.String(80), nullable=False, unique=True)
gateway_url = db.Column(db.String(256), nullable=False)
auth_token = db.Column(db.String(256), nullable=True)
reconnect_interval = db.Column(db.Integer, default=5000)
heartbeat_interval = db.Column(db.Integer, default=30000)
enabled = db.Column(db.Boolean, default=True)
# 状态
status = db.Column(db.String(20), default='offline')
last_connected = db.Column(db.DateTime, nullable=True)
last_error = db.Column(db.Text, nullable=True)
# 时间戳
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'gateway_url': self.gateway_url,
'auth_token': '••••••••' if self.auth_token else None,
'reconnect_interval': self.reconnect_interval,
'heartbeat_interval': self.heartbeat_interval,
'enabled': self.enabled,
'status': self.status,
'last_connected': self.last_connected.isoformat() if self.last_connected else None,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None,
}
class ChannelTester:
"""频道连接测试器"""
def __init__(self, config):
self.config = config
self.results = []
self.start_time = None
self.ws = None
async def test_connection(self):
"""执行连接测试"""
import time
import asyncio
self.start_time = time.time()
self.results = []
try:
# 1. WebSocket 连接测试
await self._test_websocket()
# 2. 认证测试
await self._test_auth()
# 3. 心跳测试
await self._test_heartbeat()
# 4. 消息收发测试
await self._test_message()
except Exception as e:
self._log('ERROR', str(e), success=False)
return {
'success': all(r['success'] for r in self.results),
'results': self.results,
'response_time': int((time.time() - self.start_time) * 1000),
'tested_at': datetime.utcnow().isoformat()
}
async def _test_websocket(self):
"""测试 WebSocket 连接"""
import websockets
self._log('INFO', f'正在连接 {self.config.gateway_url}...')
try:
self.ws = await websockets.connect(
self.config.gateway_url,
extra_headers={'Authorization': f'Bearer {self.config.auth_token}'} if self.config.auth_token else {}
)
self._log('INFO', 'WebSocket 连接已建立', success=True)
except Exception as e:
self._log('ERROR', f'连接失败: {e}', success=False)
raise
async def _test_auth(self):
"""测试认证"""
import asyncio
self._log('INFO', '发送认证请求...')
try:
# 发送认证消息
await self.ws.send(json.dumps({
'type': 'auth',
'token': self.config.auth_token
}))
# 等待响应
response = await asyncio.wait_for(self.ws.recv(), timeout=10)
data = json.loads(response)
if data.get('type') == 'auth_success':
self._log('INFO', f"认证成功: agent_id={data.get('agent_id')}", success=True)
else:
self._log('ERROR', f"认证失败: {data}", success=False)
raise Exception('认证失败')
except Exception as e:
self._log('ERROR', f'认证错误: {e}', success=False)
raise
async def _test_heartbeat(self):
"""测试心跳"""
import asyncio
self._log('INFO', '发送心跳...')
try:
await self.ws.send(json.dumps({'type': 'ping'}))
response = await asyncio.wait_for(self.ws.recv(), timeout=10)
data = json.loads(response)
if data.get('type') == 'pong':
self._log('INFO', '心跳响应: pong', success=True)
else:
self._log('WARNING', f'心跳响应异常: {data}', success=True)
except Exception as e:
self._log('ERROR', f'心跳错误: {e}', success=False)
async def _test_message(self):
"""测试消息收发"""
import asyncio
self._log('INFO', '发送测试消息...')
try:
test_msg = {
'type': 'message',
'content': '__TEST__'
}
await self.ws.send(json.dumps(test_msg))
response = await asyncio.wait_for(self.ws.recv(), timeout=30)
data = json.loads(response)
self._log('INFO', f"收到响应: {data.get('content', data)[:50]}", success=True)
except Exception as e:
self._log('WARNING', f'消息测试跳过: {e}', success=True)
await self.ws.close()
self._log('INFO', '测试完成')
def _log(self, level, message, success=None):
"""记录日志"""
entry = {
'level': level,
'message': message,
'timestamp': datetime.utcnow().isoformat(),
'success': success if success is not None else level == 'INFO'
}
self.results.append(entry)
# 需要导入 json
import json