Files
my_one_web/api/sessions.py
小白 2f620fa4d0 feat: 实现会话管理功能
- 实现会话列表 API
- 实现会话详情 API
- 实现终止会话 API
- 添加会话管理页面 UI
- 添加会话管理样式
- 添加会话管理交互脚本
2026-03-12 20:56:13 +08:00

93 lines
2.3 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
会话管理 API
作者:小白 🐶
"""
import json
import subprocess
from flask import jsonify, request
from . import api
@api.route('/sessions')
def get_sessions():
"""获取会话列表"""
try:
# 调用 sessions_list 命令
result = subprocess.run(
['sessions_list'],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
data = json.loads(result.stdout)
return jsonify({
'success': True,
'data': data
})
else:
return jsonify({
'success': False,
'error': result.stderr
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
})
@api.route('/sessions/<session_key>')
def get_session_detail(session_key):
"""获取会话详情"""
try:
# 获取会话历史
result = subprocess.run(
['sessions_history', '--session-key', session_key, '--limit', '20'],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
data = json.loads(result.stdout)
return jsonify({
'success': True,
'data': data
})
else:
return jsonify({
'success': False,
'error': result.stderr
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
})
@api.route('/sessions/<session_key>/kill', methods=['POST'])
def kill_session(session_key):
"""终止会话"""
try:
# 调用 subagents kill 命令
result = subprocess.run(
['subagents', 'kill', '--target', session_key],
capture_output=True,
text=True,
timeout=10
)
return jsonify({
'success': result.returncode == 0,
'message': '会话已终止' if result.returncode == 0 else '终止失败',
'output': result.stdout or result.stderr
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
})