Files
pit-router/app/services/scheduler.py

175 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Agent 调度器服务
实现多种调度策略
"""
from typing import List, Optional
from app.models import Agent
class BaseScheduler:
    """Base class defining the interface shared by all schedulers."""

    def select_agent(self, agents: List[Agent], context: dict = None) -> Optional[Agent]:
        """Select an agent; concrete subclasses provide the implementation.

        Args:
            agents: Candidate agents to choose from.
            context: Optional per-request data a strategy may consult.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()
class RoundRobinScheduler(BaseScheduler):
    """Round-robin scheduler that cycles through the online agents."""

    def __init__(self):
        # Running count of selections; reduced modulo the number of
        # online agents on every call.
        self._index = 0

    def select_agent(self, agents: List[Agent], context: dict = None) -> Optional[Agent]:
        """Return the next online agent in rotation.

        Args:
            agents: Candidate agents; only those whose ``status`` equals
                ``'online'`` are considered.
            context: Not used by this strategy.

        Returns:
            The chosen agent, or ``None`` when ``agents`` is empty or no
            agent is online.
        """
        if not agents:
            return None

        candidates = [agent for agent in agents if agent.status == 'online']
        if not candidates:
            return None

        # Pick first, then advance, so the very first call yields slot 0.
        chosen = candidates[self._index % len(candidates)]
        self._index += 1
        return chosen
class WeightedRoundRobinScheduler(BaseScheduler):
    """Weighted round-robin scheduler (the default strategy).

    Each call consumes one slot of a cycle whose length is the sum of the
    online agents' weights; an agent owns as many consecutive slots as its
    weight. The split is only exact while the set of online agents and
    their weights stay unchanged between calls.
    """

    def __init__(self):
        # Running count of selections; drives the rotation.
        self._index = 0
        # Not read anywhere in this class; retained so existing attribute
        # access on instances keeps working.
        self._weights = {}

    def select_agent(self, agents: List[Agent], context: dict = None) -> Optional[Agent]:
        """Return the online agent that owns the current weighted slot.

        Args:
            agents: Candidate agents; only those whose ``status`` equals
                ``'online'`` are considered.
            context: Not used by this strategy.

        Returns:
            The chosen agent, or ``None`` when ``agents`` is empty or no
            agent is online.
        """
        if not agents:
            return None

        online_agents = [a for a in agents if a.status == 'online']
        if not online_agents:
            return None

        # A missing (None) or negative weight counts as 0 so it can neither
        # crash the sum nor distort the cumulative walk below.
        weights = [max(a.weight or 0, 0) for a in online_agents]
        total_weight = sum(weights)

        # Every successful selection consumes exactly one slot.
        slot = self._index
        self._index += 1

        if total_weight <= 0:
            # No usable weights: rotate evenly instead of pinning all
            # traffic to the first online agent.
            return online_agents[slot % len(online_agents)]

        current_weight = slot % total_weight
        cumulative = 0
        for agent, weight in zip(online_agents, weights):
            cumulative += weight
            if current_weight < cumulative:
                return agent

        # Defensive fallback; with non-negative weights the loop above
        # always returns (could only be reached via float rounding).
        return online_agents[-1]
class LeastConnectionsScheduler(BaseScheduler):
    """Least-connections scheduler."""

    def select_agent(self, agents: List[Agent], context: dict = None) -> Optional[Agent]:
        """Return the eligible agent with the fewest current sessions.

        An agent is eligible when its ``status`` equals ``'online'`` and its
        ``current_sessions`` is strictly below its ``connection_limit``.
        Ties go to the agent that appears first in ``agents``.

        Args:
            agents: Candidate agents.
            context: Not used by this strategy.

        Returns:
            The chosen agent, or ``None`` when ``agents`` is empty or no
            agent is eligible.
        """
        if not agents:
            return None

        def has_capacity(agent):
            # Online and still below its own session cap.
            return agent.status == 'online' and agent.current_sessions < agent.connection_limit

        eligible = (agent for agent in agents if has_capacity(agent))
        return min(eligible, key=lambda agent: agent.current_sessions, default=None)
class LeastResponseTimeScheduler(BaseScheduler):
    """Fastest-response scheduler, approximated by heartbeat recency.

    No response time is measured here: the online agent whose
    ``last_heartbeat`` is most recent is treated as the most responsive.
    """

    def select_agent(self, agents: List[Agent], context: dict = None) -> Optional[Agent]:
        """Return the online agent with the most recent heartbeat.

        Args:
            agents: Candidate agents; only those whose ``status`` equals
                ``'online'`` are considered.
            context: Not used by this strategy.

        Returns:
            The chosen agent, or ``None`` when ``agents`` is empty or no
            agent is online. Agents without a heartbeat rank last.
        """
        if not agents:
            return None

        online_agents = [a for a in agents if a.status == 'online']
        if not online_agents:
            return None

        from datetime import datetime, timezone

        # datetime.utcnow() is deprecated and yields a naive value that
        # cannot be subtracted from an aware heartbeat. Keep both flavours
        # of "now" so each heartbeat is compared against a matching one.
        now_aware = datetime.now(timezone.utc)
        now_naive = now_aware.replace(tzinfo=None)

        def heartbeat_age(agent):
            heartbeat = agent.last_heartbeat
            if not heartbeat:
                return float('inf')
            # Naive heartbeats are taken to be UTC, as the original
            # utcnow()-based comparison assumed.
            reference = now_naive if heartbeat.utcoffset() is None else now_aware
            # Smaller age = fresher heartbeat = preferred.
            return (reference - heartbeat).total_seconds()

        return min(online_agents, key=heartbeat_age)
class CapabilityMatchScheduler(BaseScheduler):
    """Capability-matching scheduler.

    Narrows the candidates to online agents that offer every capability
    listed under ``context['capabilities']`` and then delegates the final
    pick to a weighted round-robin scheduler.
    """

    # Lazily created per instance so the weighted rotation state survives
    # across calls. Building a fresh WeightedRoundRobinScheduler on every
    # call would restart its index at 0 and always pick the same agent.
    _delegate = None

    def _weighted_delegate(self):
        """Return this instance's persistent weighted round-robin scheduler."""
        if self._delegate is None:
            self._delegate = WeightedRoundRobinScheduler()
        return self._delegate

    def select_agent(self, agents: List[Agent], context: dict = None) -> Optional[Agent]:
        """Return an online agent that satisfies the required capabilities.

        Args:
            agents: Candidate agents.
            context: Optional mapping; its ``'capabilities'`` entry lists
                the capabilities an agent must have. A missing or empty
                context is treated as "no special requirements".

        Returns:
            An agent chosen by weighted round-robin among the matching
            agents. When nothing is required, or when no online agent
            matches, the choice is made among all of ``agents`` instead.
            ``None`` when ``agents`` is empty or no agent is online.
        """
        if not agents:
            return None

        scheduler = self._weighted_delegate()

        required_capabilities = (context or {}).get('capabilities') or []
        if not required_capabilities:
            # No special requirements: plain weighted round-robin.
            return scheduler.select_agent(agents, context)

        # Online agents that provide every required capability.
        matching_agents = [
            agent for agent in agents
            if agent.status == 'online'
            and all(cap in (agent.capabilities or []) for cap in required_capabilities)
        ]

        # With no match, fall back to weighted round-robin over all agents
        # rather than failing the request.
        return scheduler.select_agent(matching_agents or agents, context)
class AgentScheduler:
    """Facade that builds a scheduler by strategy name and delegates to it."""

    # Strategy name -> scheduler class.
    STRATEGIES = {
        'round_robin': RoundRobinScheduler,
        'weighted_round_robin': WeightedRoundRobinScheduler,
        'least_connections': LeastConnectionsScheduler,
        'least_response_time': LeastResponseTimeScheduler,
        'capability_match': CapabilityMatchScheduler,
    }

    # Name recorded when the requested strategy is not recognised.
    DEFAULT_STRATEGY = 'weighted_round_robin'

    def __init__(self, strategy: str = 'weighted_round_robin'):
        """Create the scheduler for ``strategy``.

        An unknown name falls back to weighted round-robin. The fallback is
        logged, and the recorded strategy name is the one actually in use,
        so ``get_strategy()`` never reports a strategy that is not running.
        """
        scheduler_cls = self.STRATEGIES.get(strategy)
        if scheduler_cls is None:
            import logging

            logging.getLogger(__name__).warning(
                "Unknown scheduling strategy %r; falling back to %r",
                strategy,
                self.DEFAULT_STRATEGY,
            )
            strategy = self.DEFAULT_STRATEGY
            scheduler_cls = WeightedRoundRobinScheduler

        self._strategy_name = strategy
        self._scheduler = scheduler_cls()

    def select_agent(self, agents: List[Agent], context: dict = None) -> Optional[Agent]:
        """Select an agent using the configured strategy."""
        return self._scheduler.select_agent(agents, context)

    def get_strategy(self) -> str:
        """Return the name of the strategy currently in use."""
        return self._strategy_name

    @classmethod
    def get_available_strategies(cls) -> list:
        """Return the list of available strategy names."""
        return list(cls.STRATEGIES)