177 lines
5.5 KiB
Python
177 lines
5.5 KiB
Python
|
|
"""
|
|||
|
|
调度器单元测试
|
|||
|
|
"""
|
|||
|
|
import pytest
|
|||
|
|
from app.services.scheduler import (
|
|||
|
|
AgentScheduler,
|
|||
|
|
RoundRobinScheduler,
|
|||
|
|
WeightedRoundRobinScheduler,
|
|||
|
|
LeastConnectionsScheduler,
|
|||
|
|
LeastResponseTimeScheduler,
|
|||
|
|
CapabilityMatchScheduler,
|
|||
|
|
)
|
|||
|
|
from datetime import datetime, timedelta
|
|||
|
|
|
|||
|
|
|
|||
|
|
class MockAgent:
    """Mock Agent: a plain attribute holder handed to the schedulers under test.

    Every constructor argument is stored on the instance under the same name.
    """

    def __init__(self, id, status='online', weight=10, priority=5,
                 current_sessions=0, connection_limit=5, capabilities=None,
                 last_heartbeat=None):
        # Any falsy ``capabilities`` value (None, empty list, ...) is swapped
        # for a fresh empty list; a non-empty value is stored as-is (no copy).
        if not capabilities:
            capabilities = []

        self.id, self.status = id, status
        self.weight, self.priority = weight, priority
        self.current_sessions, self.connection_limit = (
            current_sessions,
            connection_limit,
        )
        self.capabilities, self.last_heartbeat = capabilities, last_heartbeat
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TestRoundRobinScheduler:
    """Tests for the round-robin scheduler."""

    def test_select_agent(self):
        """Repeated selection walks the agents in order and wraps around."""
        rr = RoundRobinScheduler()
        pool = [MockAgent(id=name) for name in ('agent1', 'agent2', 'agent3')]

        # Four picks over three agents: the fourth should cycle back to the
        # first one.
        picked_ids = [rr.select_agent(pool).id for _ in range(4)]

        assert picked_ids == ['agent1', 'agent2', 'agent3', 'agent1']

    def test_no_online_agents(self):
        """Selecting from a pool where every agent is offline yields None."""
        rr = RoundRobinScheduler()
        offline_pool = [
            MockAgent(id=name, status='offline')
            for name in ('agent1', 'agent2')
        ]

        assert rr.select_agent(offline_pool) is None
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TestWeightedRoundRobinScheduler:
    """Tests for the weighted round-robin scheduler."""

    def test_weight_distribution(self):
        """The heavier agent is selected more often than the lighter one."""
        wrr = WeightedRoundRobinScheduler()
        heavy = MockAgent(id='agent1', weight=3)
        light = MockAgent(id='agent2', weight=1)
        pool = [heavy, light]

        # Tally how often each agent is chosen over 100 selections.
        tally = {'agent1': 0, 'agent2': 0}
        remaining = 100
        while remaining:
            chosen = wrr.select_agent(pool)
            tally[chosen.id] += 1
            remaining -= 1

        # Weights are 3 and 1, so the split is expected to be roughly 3:1;
        # only the ordering of the two counts is actually asserted.
        assert tally['agent2'] < tally['agent1']

    def test_zero_weight(self):
        """An agent is still returned when every weight is zero."""
        wrr = WeightedRoundRobinScheduler()
        pool = [MockAgent(id=name, weight=0) for name in ('agent1', 'agent2')]

        # The original note says the first agent is returned when all weights
        # are 0; this test only checks that some agent comes back.
        assert wrr.select_agent(pool) is not None
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TestLeastConnectionsScheduler:
    """Tests for the least-connections scheduler."""

    def test_select_least_connections(self):
        """The agent with the fewest current sessions is selected."""
        lc = LeastConnectionsScheduler()
        session_load = (('agent1', 5), ('agent2', 2), ('agent3', 8))
        pool = [
            MockAgent(id=name, current_sessions=sessions)
            for name, sessions in session_load
        ]

        chosen = lc.select_agent(pool)

        assert chosen.id == 'agent2'

    def test_all_at_limit(self):
        """No agent is returned when every agent is at its connection limit."""
        lc = LeastConnectionsScheduler()
        saturated_pool = [
            MockAgent(id=name, current_sessions=5, connection_limit=5)
            for name in ('agent1', 'agent2')
        ]

        assert lc.select_agent(saturated_pool) is None
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TestCapabilityMatchScheduler:
    """Tests for the capability-matching scheduler."""

    def test_match_capabilities(self):
        """Only an agent that has the requested capability is selected."""
        matcher = CapabilityMatchScheduler()
        skills_by_agent = (
            ('agent1', ['chat', 'code']),
            ('agent2', ['chat']),
            ('agent3', ['code', 'translate']),
        )
        pool = [
            MockAgent(id=name, capabilities=skills)
            for name, skills in skills_by_agent
        ]

        # Ask for the 'code' capability; agent1 and agent3 both have it.
        requirements = {'capabilities': ['code']}
        chosen = matcher.select_agent(pool, requirements)

        assert chosen.id in ('agent1', 'agent3')

    def test_no_matching_capabilities(self):
        """An agent is still returned when nobody has the capability."""
        matcher = CapabilityMatchScheduler()
        pool = [MockAgent(id='agent1', capabilities=['chat'])]

        # 'code' is requested but no agent offers it. Per the original note
        # the scheduler falls back to weighted round-robin; this test only
        # checks that some agent is returned.
        requirements = {'capabilities': ['code']}
        chosen = matcher.select_agent(pool, requirements)

        assert chosen is not None
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TestAgentScheduler:
    """Tests for the AgentScheduler factory."""

    def test_available_strategies(self):
        """Every expected strategy name is listed as available."""
        available = AgentScheduler.get_available_strategies()
        expected_names = (
            'round_robin',
            'weighted_round_robin',
            'least_connections',
            'least_response_time',
            'capability_match',
        )

        for strategy_name in expected_names:
            assert strategy_name in available

    def test_default_strategy(self):
        """A scheduler built without arguments reports weighted round-robin."""
        default_name = AgentScheduler().get_strategy()

        assert default_name == 'weighted_round_robin'

    def test_custom_strategy(self):
        """A scheduler built with a strategy name reports that same name."""
        chosen_name = AgentScheduler('round_robin').get_strategy()

        assert chosen_name == 'round_robin'
|