177 lines
5.5 KiB
Python
177 lines
5.5 KiB
Python
"""
|
||
调度器单元测试
|
||
"""
|
||
import pytest
|
||
from app.services.scheduler import (
|
||
AgentScheduler,
|
||
RoundRobinScheduler,
|
||
WeightedRoundRobinScheduler,
|
||
LeastConnectionsScheduler,
|
||
LeastResponseTimeScheduler,
|
||
CapabilityMatchScheduler,
|
||
)
|
||
from datetime import datetime, timedelta
|
||
|
||
|
||
class MockAgent:
|
||
"""模拟 Agent"""
|
||
def __init__(self, id, status='online', weight=10, priority=5,
|
||
current_sessions=0, connection_limit=5, capabilities=None,
|
||
last_heartbeat=None):
|
||
self.id = id
|
||
self.status = status
|
||
self.weight = weight
|
||
self.priority = priority
|
||
self.current_sessions = current_sessions
|
||
self.connection_limit = connection_limit
|
||
self.capabilities = capabilities or []
|
||
self.last_heartbeat = last_heartbeat
|
||
|
||
|
||
class TestRoundRobinScheduler:
|
||
"""轮询调度器测试"""
|
||
|
||
def test_select_agent(self):
|
||
"""测试选择 Agent"""
|
||
scheduler = RoundRobinScheduler()
|
||
agents = [
|
||
MockAgent(id='agent1'),
|
||
MockAgent(id='agent2'),
|
||
MockAgent(id='agent3'),
|
||
]
|
||
|
||
# 轮询选择
|
||
agent1 = scheduler.select_agent(agents)
|
||
agent2 = scheduler.select_agent(agents)
|
||
agent3 = scheduler.select_agent(agents)
|
||
agent4 = scheduler.select_agent(agents) # 循环回第一个
|
||
|
||
assert agent1.id == 'agent1'
|
||
assert agent2.id == 'agent2'
|
||
assert agent3.id == 'agent3'
|
||
assert agent4.id == 'agent1'
|
||
|
||
def test_no_online_agents(self):
|
||
"""测试无在线 Agent"""
|
||
scheduler = RoundRobinScheduler()
|
||
agents = [
|
||
MockAgent(id='agent1', status='offline'),
|
||
MockAgent(id='agent2', status='offline'),
|
||
]
|
||
|
||
agent = scheduler.select_agent(agents)
|
||
assert agent is None
|
||
|
||
|
||
class TestWeightedRoundRobinScheduler:
|
||
"""加权轮询调度器测试"""
|
||
|
||
def test_weight_distribution(self):
|
||
"""测试权重分布"""
|
||
scheduler = WeightedRoundRobinScheduler()
|
||
agents = [
|
||
MockAgent(id='agent1', weight=3),
|
||
MockAgent(id='agent2', weight=1),
|
||
]
|
||
|
||
# 统计选择次数
|
||
counts = {'agent1': 0, 'agent2': 0}
|
||
for _ in range(100):
|
||
agent = scheduler.select_agent(agents)
|
||
counts[agent.id] += 1
|
||
|
||
# agent1 权重 3,agent2 权重 1,比例应该约 3:1
|
||
assert counts['agent1'] > counts['agent2']
|
||
|
||
def test_zero_weight(self):
|
||
"""测试零权重"""
|
||
scheduler = WeightedRoundRobinScheduler()
|
||
agents = [
|
||
MockAgent(id='agent1', weight=0),
|
||
MockAgent(id='agent2', weight=0),
|
||
]
|
||
|
||
# 权重都为 0 时返回第一个
|
||
agent = scheduler.select_agent(agents)
|
||
assert agent is not None
|
||
|
||
|
||
class TestLeastConnectionsScheduler:
|
||
"""最少连接调度器测试"""
|
||
|
||
def test_select_least_connections(self):
|
||
"""测试选择最少连接"""
|
||
scheduler = LeastConnectionsScheduler()
|
||
agents = [
|
||
MockAgent(id='agent1', current_sessions=5),
|
||
MockAgent(id='agent2', current_sessions=2),
|
||
MockAgent(id='agent3', current_sessions=8),
|
||
]
|
||
|
||
agent = scheduler.select_agent(agents)
|
||
assert agent.id == 'agent2'
|
||
|
||
def test_all_at_limit(self):
|
||
"""测试所有 Agent 都达上限"""
|
||
scheduler = LeastConnectionsScheduler()
|
||
agents = [
|
||
MockAgent(id='agent1', current_sessions=5, connection_limit=5),
|
||
MockAgent(id='agent2', current_sessions=5, connection_limit=5),
|
||
]
|
||
|
||
agent = scheduler.select_agent(agents)
|
||
assert agent is None
|
||
|
||
|
||
class TestCapabilityMatchScheduler:
|
||
"""能力匹配调度器测试"""
|
||
|
||
def test_match_capabilities(self):
|
||
"""测试能力匹配"""
|
||
scheduler = CapabilityMatchScheduler()
|
||
agents = [
|
||
MockAgent(id='agent1', capabilities=['chat', 'code']),
|
||
MockAgent(id='agent2', capabilities=['chat']),
|
||
MockAgent(id='agent3', capabilities=['code', 'translate']),
|
||
]
|
||
|
||
# 需要 code 能力
|
||
agent = scheduler.select_agent(agents, {'capabilities': ['code']})
|
||
assert agent.id in ['agent1', 'agent3']
|
||
|
||
def test_no_matching_capabilities(self):
|
||
"""测试无匹配能力"""
|
||
scheduler = CapabilityMatchScheduler()
|
||
agents = [
|
||
MockAgent(id='agent1', capabilities=['chat']),
|
||
]
|
||
|
||
# 需要 code 能力但无 Agent 具备
|
||
agent = scheduler.select_agent(agents, {'capabilities': ['code']})
|
||
# 回退到加权轮询
|
||
assert agent is not None
|
||
|
||
|
||
class TestAgentScheduler:
|
||
"""Agent 调度器工厂测试"""
|
||
|
||
def test_available_strategies(self):
|
||
"""测试可用策略"""
|
||
strategies = AgentScheduler.get_available_strategies()
|
||
|
||
assert 'round_robin' in strategies
|
||
assert 'weighted_round_robin' in strategies
|
||
assert 'least_connections' in strategies
|
||
assert 'least_response_time' in strategies
|
||
assert 'capability_match' in strategies
|
||
|
||
def test_default_strategy(self):
|
||
"""测试默认策略"""
|
||
scheduler = AgentScheduler()
|
||
assert scheduler.get_strategy() == 'weighted_round_robin'
|
||
|
||
def test_custom_strategy(self):
|
||
"""测试自定义策略"""
|
||
scheduler = AgentScheduler('round_robin')
|
||
assert scheduler.get_strategy() == 'round_robin'
|