Files
pit-router/app/web/api.py

312 lines
10 KiB
Python
Raw Normal View History

"""
Web UI API
智队中枢 Web 管理界面数据接口
"""
from flask import Blueprint, jsonify, request, Response
from flask_jwt_extended import jwt_required, get_jwt_identity
from datetime import datetime
from app.models import db, User, Session, Agent, Gateway, Message
import json
import uuid
# Blueprint that groups the web-UI data endpoints defined in this module;
# every route registered on it is served under the /api/web prefix.
web_api_bp = Blueprint('web_api', __name__, url_prefix='/api/web')
@web_api_bp.route('/stats')
@jwt_required()
def get_stats():
    """Return the counters shown on the web UI dashboard.

    Returns:
        A JSON object with the keys ``online_agents``, ``active_sessions``,
        ``today_messages``, ``online_gateways`` and ``total_channels``.
    """
    from app.models.channel_config import ChannelConfig

    # Midnight of the current UTC day. Microseconds have to be cleared as
    # well: otherwise the cut-off lands a fraction of a second after
    # midnight and messages created before it are left out of the count.
    day_start = datetime.utcnow().replace(
        hour=0, minute=0, second=0, microsecond=0
    )

    stats = {
        'online_agents': Agent.query.filter_by(status='online').count(),
        'active_sessions': Session.query.filter_by(status='active').count(),
        'today_messages': Message.query.filter(
            Message.created_at >= day_start
        ).count(),
        'online_gateways': Gateway.query.filter_by(status='online').count(),
        'total_channels': ChannelConfig.query.count(),
    }
    return jsonify(stats)
# ==================== 频道配置 API ====================
@web_api_bp.route('/channels', methods=['GET'])
@jwt_required()
def get_channels():
    """List every stored channel configuration.

    Returns:
        A JSON object whose ``channels`` key holds the ``to_dict()`` form of
        each ``ChannelConfig`` row.
    """
    from app.models.channel_config import ChannelConfig

    serialized = []
    for channel in ChannelConfig.query.all():
        serialized.append(channel.to_dict())
    return jsonify({'channels': serialized})
@web_api_bp.route('/channels', methods=['POST'])
@jwt_required()
def create_channel():
    """Create a channel configuration from the JSON request body.

    Body fields read: ``name`` (required), ``gateway_url``, ``auth_token``,
    ``reconnect_interval`` (default 5000), ``heartbeat_interval``
    (default 30000) and ``enabled`` (default True).

    Returns:
        201 with the new configuration's ``to_dict()`` on success.
        400 when the body is not a JSON object, when ``name`` is missing or
        empty, or when another configuration already uses that name.
    """
    from app.models.channel_config import ChannelConfig

    # get_json() can yield None or a non-dict (e.g. a JSON list); without
    # this guard the .get() calls below would raise and surface as a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求体必须是 JSON 对象'}), 400

    name = data.get('name')
    if not name:
        return jsonify({'error': '名称不能为空'}), 400

    # Reject duplicate names.
    if ChannelConfig.query.filter_by(name=name).first():
        return jsonify({'error': '名称已存在'}), 400

    config = ChannelConfig(
        id=str(uuid.uuid4()),
        name=name,
        gateway_url=data.get('gateway_url'),
        auth_token=data.get('auth_token'),
        reconnect_interval=data.get('reconnect_interval', 5000),
        heartbeat_interval=data.get('heartbeat_interval', 30000),
        enabled=data.get('enabled', True),
    )
    db.session.add(config)
    db.session.commit()
    return jsonify(config.to_dict()), 201
@web_api_bp.route('/channels/<channel_id>', methods=['GET'])
@jwt_required()
def get_channel(channel_id):
    """Fetch one channel configuration by primary key.

    Args:
        channel_id: Identifier taken from the URL path.

    Returns:
        The configuration's ``to_dict()`` as JSON, or a 404 JSON error when
        no row matches.
    """
    from app.models.channel_config import ChannelConfig

    found = ChannelConfig.query.get(channel_id)
    if found:
        return jsonify(found.to_dict())
    return jsonify({'error': '配置不存在'}), 404
@web_api_bp.route('/channels/<channel_id>', methods=['PUT'])
@jwt_required()
def update_channel(channel_id):
    """Apply a partial update to an existing channel configuration.

    Only fields present in the JSON body are changed. ``name`` and
    ``gateway_url`` are applied only when they carry a truthy value;
    ``auth_token``, ``reconnect_interval``, ``heartbeat_interval`` and
    ``enabled`` are applied whenever the key is present, so they can be
    reset to falsy values.

    Args:
        channel_id: Identifier taken from the URL path.

    Returns:
        The updated configuration's ``to_dict()`` as JSON.
        404 when the configuration does not exist.
        400 when the body is not a JSON object or the requested new name is
        already used by another configuration.
    """
    from app.models.channel_config import ChannelConfig

    config = ChannelConfig.query.get(channel_id)
    if not config:
        return jsonify({'error': '配置不存在'}), 404

    # get_json() can yield None or a non-dict (e.g. a JSON list); without
    # this guard the lookups below would raise and surface as a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求体必须是 JSON 对象'}), 400

    # Renaming: make sure the new name is not already taken.
    new_name = data.get('name')
    if new_name and new_name != config.name:
        if ChannelConfig.query.filter_by(name=new_name).first():
            return jsonify({'error': '名称已存在'}), 400
        config.name = new_name

    if data.get('gateway_url'):
        config.gateway_url = data['gateway_url']

    # These fields are copied on key presence rather than truthiness so
    # that values such as False, 0 or an empty token can be stored.
    for field in ('auth_token', 'reconnect_interval',
                  'heartbeat_interval', 'enabled'):
        if field in data:
            setattr(config, field, data[field])

    config.updated_at = datetime.utcnow()
    db.session.commit()
    return jsonify(config.to_dict())
@web_api_bp.route('/channels/<channel_id>', methods=['DELETE'])
@jwt_required()
def delete_channel(channel_id):
    """Remove one channel configuration.

    Args:
        channel_id: Identifier taken from the URL path.

    Returns:
        ``{"success": true}`` after the row is deleted and committed, or a
        404 JSON error when no row matches.
    """
    from app.models.channel_config import ChannelConfig

    target = ChannelConfig.query.get(channel_id)
    if target:
        db.session.delete(target)
        db.session.commit()
        return jsonify({'success': True})
    return jsonify({'error': '配置不存在'}), 404
@web_api_bp.route('/channels/<channel_id>/test', methods=['POST'])
@jwt_required()
def test_channel(channel_id):
    """Run a connection test for one channel and persist the outcome.

    The test itself is delegated to ``ChannelTester.test_connection()``,
    which is awaited to completion with ``asyncio.run``. Its result mapping
    is expected to carry a ``success`` entry and may carry an ``error``.

    Args:
        channel_id: Identifier taken from the URL path.

    Returns:
        The tester's result as JSON, or a 404 JSON error when the
        configuration does not exist.
    """
    import asyncio
    from app.models.channel_config import ChannelConfig, ChannelTester

    config = ChannelConfig.query.get(channel_id)
    if not config:
        return jsonify({'error': '配置不存在'}), 404

    # Run the test.
    result = asyncio.run(ChannelTester(config).test_connection())

    # Record the outcome on the configuration row.
    succeeded = result['success']
    if succeeded:
        config.status = 'online'
        config.last_connected = datetime.utcnow()
    else:
        config.status = 'offline'
        # NOTE(review): a failed test clears the previous timestamp —
        # confirm that losing the last successful time is intended.
        config.last_connected = None
    config.last_error = result.get('error')
    db.session.commit()

    return jsonify(result)
@web_api_bp.route('/channels/<channel_id>/test/stream')
@jwt_required()
def test_channel_stream(channel_id):
    """Stream the progress of a channel connection test as Server-Sent Events.

    The test is currently simulated: the generator sleeps between fixed
    progress messages and never opens a network connection. Each event is a
    ``data:`` line holding a JSON object with ``level`` and ``message``;
    the last event also carries ``complete``, ``success`` and, on the
    success path, ``response_time`` in milliseconds.

    Args:
        channel_id: Identifier taken from the URL path.

    Returns:
        A ``text/event-stream`` response, or a 404 JSON error when the
        configuration does not exist.
    """
    import time
    from app.models.channel_config import ChannelConfig

    config = ChannelConfig.query.get(channel_id)
    if not config:
        return jsonify({'error': '配置不存在'}), 404

    # Read the URL now: the generator body only runs while the response is
    # being sent, so it should not depend on the ORM object still being
    # usable at that point.
    gateway_url = config.gateway_url

    def sse(payload):
        """Format one payload as a Server-Sent Events data frame."""
        return f"data: {json.dumps(payload)}\n\n"

    # (delay in seconds before the event, event payload), in emission order:
    # connect, authenticate, heartbeat, test message.
    steps = (
        (0.0, {'level': 'info', 'message': f'正在连接 {gateway_url}...'}),
        (0.5, {'level': 'info', 'message': 'WebSocket 连接成功', 'success': True}),
        (0.3, {'level': 'info', 'message': '发送认证请求...'}),
        (0.2, {'level': 'success', 'message': '认证成功', 'success': True}),
        (0.3, {'level': 'info', 'message': '发送心跳...'}),
        (0.2, {'level': 'success', 'message': '心跳响应正常', 'success': True}),
        (0.3, {'level': 'info', 'message': '发送测试消息...'}),
        (0.2, {'level': 'success', 'message': '收到响应: Hello from Agent', 'success': True}),
    )

    def generate():
        start_time = time.time()
        try:
            for delay, payload in steps:
                if delay:
                    time.sleep(delay)
                yield sse(payload)

            response_time = int((time.time() - start_time) * 1000)
            # Final event tells the client the stream is finished.
            yield sse({
                'level': 'info',
                'message': '测试完成',
                'complete': True,
                'success': True,
                'response_time': response_time,
            })
        except Exception as e:
            # Report the failure to the client as a terminal event instead
            # of breaking the stream mid-way.
            yield sse({
                'level': 'error',
                'message': f'测试失败: {e}',
                'complete': True,
                'success': False,
            })

    return Response(generate(), mimetype='text/event-stream')
# ==================== 会话 API ====================
@web_api_bp.route('/sessions', methods=['GET'])
@jwt_required()
def get_sessions():
    """List sessions, most recently active first.

    Query parameters:
        agent_id: Optional; keep only sessions whose ``primary_agent_id``
            equals this value.
        status: Optional; keep only sessions with this status.

    Returns:
        A JSON object whose ``sessions`` key holds one summary per session.
    """
    def summarize(s):
        """Build the JSON-ready summary of one session row."""
        return {
            'id': s.id,
            'user': {'username': s.user.username} if s.user else None,
            'primary_agent': {'name': s.primary_agent.name} if s.primary_agent else None,
            'message_count': s.message_count,
            'status': s.status,
            'created_at': s.created_at.isoformat() if s.created_at else None,
            'last_active_at': s.last_active_at.isoformat() if s.last_active_at else None,
        }

    # Each filter is applied only when its query parameter is non-empty.
    requested_filters = (
        ('primary_agent_id', request.args.get('agent_id')),
        ('status', request.args.get('status')),
    )
    query = Session.query
    for column, wanted in requested_filters:
        if wanted:
            query = query.filter_by(**{column: wanted})

    rows = query.order_by(Session.last_active_at.desc()).all()
    return jsonify({'sessions': [summarize(row) for row in rows]})
@web_api_bp.route('/sessions/<session_id>', methods=['GET'])
@jwt_required()
def get_session(session_id):
    """Return the details of a single session.

    Args:
        session_id: Identifier taken from the URL path.

    Returns:
        A JSON object with a ``session`` key, or a 404 JSON error when no
        session matches.
    """
    record = Session.query.get(session_id)
    if not record:
        return jsonify({'error': '会话不存在'}), 404

    # Related rows are optional; each is serialized as null when absent.
    owner = record.user
    agent = record.primary_agent
    if owner:
        user_info = {'username': owner.username, 'id': owner.id}
    else:
        user_info = None
    if agent:
        agent_info = {'name': agent.name, 'id': agent.id}
    else:
        agent_info = None

    created = record.created_at
    last_active = record.last_active_at

    details = {
        'id': record.id,
        'user': user_info,
        'primary_agent': agent_info,
        'status': record.status,
        'message_count': record.message_count,
        'created_at': created.isoformat() if created else None,
        'last_active_at': last_active.isoformat() if last_active else None,
    }
    return jsonify({'session': details})
@web_api_bp.route('/sessions/<session_id>/messages', methods=['GET'])
@jwt_required()
def get_session_messages(session_id):
    """Return up to 100 messages of a session, ordered by creation time ascending.

    Args:
        session_id: Identifier taken from the URL path.

    Returns:
        A JSON object whose ``messages`` key holds the serialized messages,
        or a 404 JSON error when the session does not exist.
    """
    # The session row is fetched only to confirm that it exists.
    if not Session.query.get(session_id):
        return jsonify({'error': '会话不存在'}), 404

    oldest_first = Message.query.filter_by(session_id=session_id).order_by(
        Message.created_at.asc()
    )
    payload = []
    for msg in oldest_first.limit(100).all():
        payload.append({
            'id': msg.id,
            'sender_type': msg.sender_type,
            'sender_id': msg.sender_id,
            'content': msg.content,
            'message_type': msg.message_type,
            'status': msg.status,
            'created_at': msg.created_at.isoformat() if msg.created_at else None,
        })
    return jsonify({'messages': payload})
@web_api_bp.route('/sessions/<session_id>/close', methods=['PUT'])
@jwt_required()
def close_session(session_id):
    """Mark a session as closed.

    Sets ``status`` to ``'closed'``, stamps ``updated_at`` with the current
    UTC time and commits.

    Args:
        session_id: Identifier taken from the URL path.

    Returns:
        ``{"success": true}`` on success, or a 404 JSON error when the
        session does not exist.
    """
    target = Session.query.get(session_id)
    if target:
        target.status = 'closed'
        target.updated_at = datetime.utcnow()
        db.session.commit()
        return jsonify({'success': True})
    return jsonify({'error': '会话不存在'}), 404