Files
pit-router/app/services/message_queue.py

156 lines
5.0 KiB
Python

"""
消息队列服务
实现消息缓存、重试机制
"""
import json
import logging
from datetime import datetime
from typing import Optional, List

from app.models import db, Message
from app.extensions import redis_client
class MessageQueue:
    """Redis-backed message queue with a pending list and a retry list.

    Every message is stored as a Redis hash at ``MESSAGE_PREFIX + id``; the
    pending and retry queues are Redis lists that carry message ids only.
    All public methods are classmethods that talk to the module-level
    ``redis_client``. Failures are logged and reported through the return
    value instead of being raised.
    """

    # Redis key names / prefix
    PENDING_QUEUE = "pit:messages:pending"
    RETRY_QUEUE = "pit:messages:retry"
    MESSAGE_PREFIX = "pit:message:"

    # Configuration
    MAX_RETRY = 3
    RETRY_DELAY = 5  # seconds; not referenced by any method of this class

    _logger = logging.getLogger(__name__)

    @staticmethod
    def _to_text(value):
        """Return *value* decoded as UTF-8 if it is ``bytes``, else unchanged.

        Redis clients return ``bytes`` unless they are configured to decode
        responses; normalising here lets the rest of the class work with
        either configuration.
        """
        if isinstance(value, bytes):
            return value.decode("utf-8")
        return value

    @classmethod
    def _message_key(cls, message_id) -> str:
        """Build the Redis hash key for *message_id* (str, bytes or int)."""
        return f"{cls.MESSAGE_PREFIX}{cls._to_text(message_id)}"

    @classmethod
    def _decode_hash(cls, raw) -> dict:
        """Return a copy of the hash *raw* with str keys and str values."""
        return {cls._to_text(k): cls._to_text(v) for k, v in raw.items()}

    @classmethod
    def _load_message(cls, message_id) -> Optional[dict]:
        """Fetch the stored hash for *message_id*; ``None`` when it is empty."""
        raw = redis_client.hgetall(cls._message_key(message_id))
        if not raw:
            return None
        return cls._decode_hash(raw)

    @classmethod
    def enqueue(cls, message: "Message") -> bool:
        """Store *message* as a hash and append its id to the pending queue.

        Returns:
            True on success, False if building the payload or any Redis
            call raised.
        """
        try:
            # Store the message details.
            redis_client.hset(cls._message_key(message.id), mapping={
                'id': message.id,
                'session_id': message.session_id,
                'sender_type': message.sender_type,
                'sender_id': message.sender_id,
                'content': message.content or '',
                'status': message.status,
                # A missing count is stored as 0; the literal 'None' would
                # make int() fail when the message is read back.
                'retry_count': str(message.retry_count or 0),
                'created_at': message.created_at.isoformat() if message.created_at else '',
            })
            # Append the id to the pending queue.
            redis_client.rpush(cls.PENDING_QUEUE, message.id)
            return True
        except Exception:
            cls._logger.exception("Failed to enqueue message")
            return False

    @classmethod
    def dequeue(cls) -> Optional[dict]:
        """Pop the oldest id from the pending queue and return its data.

        Returns:
            A dict with ``id``, ``session_id``, ``sender_type``,
            ``sender_id``, ``content``, ``status`` and ``retry_count`` (int),
            or ``None`` when the queue is empty, the message hash is missing,
            or an error occurred.
        """
        try:
            message_id = redis_client.lpop(cls.PENDING_QUEUE)
            if not message_id:
                return None
            data = cls._load_message(message_id)
            if data is None:
                cls._logger.warning(
                    "Dequeued message %s has no stored data",
                    cls._to_text(message_id),
                )
                return None
            return {
                'id': data.get('id'),
                'session_id': data.get('session_id'),
                'sender_type': data.get('sender_type'),
                'sender_id': data.get('sender_id'),
                'content': data.get('content'),
                'status': data.get('status'),
                'retry_count': int(data.get('retry_count') or 0),
            }
        except Exception:
            cls._logger.exception("Failed to dequeue message")
            return None

    @classmethod
    def ack(cls, message_id: str) -> bool:
        """Acknowledge a message by deleting its stored hash.

        Returns:
            True if the delete call completed, False if it raised.
        """
        try:
            redis_client.delete(cls._message_key(message_id))
            return True
        except Exception:
            cls._logger.exception("Failed to ack message %s", message_id)
            return False

    @classmethod
    def retry(cls, message_id: str) -> bool:
        """Schedule *message_id* for another attempt.

        Increments the stored ``retry_count`` and appends the id to the retry
        queue. Once the count has reached ``MAX_RETRY`` the hash's ``status``
        is set to ``'failed'`` instead and the message is not re-queued.

        Returns:
            True if the message was added to the retry queue; False if the
            message has no ``retry_count`` field, the retry limit was
            reached, or an error occurred.
        """
        try:
            message_key = cls._message_key(message_id)
            stored_count = redis_client.hget(message_key, 'retry_count')
            if stored_count is None:
                return False
            retry_count = int(cls._to_text(stored_count) or 0)
            if retry_count >= cls.MAX_RETRY:
                # Retry limit reached: mark as failed; the hash is left in place.
                redis_client.hset(message_key, 'status', 'failed')
                return False
            # Bump the retry counter.
            redis_client.hset(message_key, 'retry_count', str(retry_count + 1))
            # Append to the retry queue.
            redis_client.rpush(cls.RETRY_QUEUE, message_id)
            return True
        except Exception:
            cls._logger.exception("Failed to retry message %s", message_id)
            return False

    @classmethod
    def get_pending_count(cls) -> int:
        """Return the length of the pending queue, or 0 if the call fails."""
        try:
            return redis_client.llen(cls.PENDING_QUEUE)
        except Exception:
            cls._logger.exception("Failed to read pending queue length")
            return 0

    @classmethod
    def get_retry_count(cls) -> int:
        """Return the length of the retry queue, or 0 if the call fails."""
        try:
            return redis_client.llen(cls.RETRY_QUEUE)
        except Exception:
            cls._logger.exception("Failed to read retry queue length")
            return 0

    @classmethod
    def process_retry_queue(cls) -> List[dict]:
        """Drain the retry queue and return the data of each queued message.

        Ids whose hash no longer exists are skipped. If an error occurs
        part-way through, the messages collected so far are returned.

        Returns:
            A list of dicts with ``id``, ``session_id``, ``content`` and
            ``retry_count`` (int), in queue order.
        """
        messages = []
        try:
            while True:
                message_id = redis_client.lpop(cls.RETRY_QUEUE)
                if not message_id:
                    break
                data = cls._load_message(message_id)
                if data is None:
                    continue
                messages.append({
                    'id': data.get('id'),
                    'session_id': data.get('session_id'),
                    'content': data.get('content'),
                    'retry_count': int(data.get('retry_count') or 0),
                })
        except Exception:
            cls._logger.exception("Failed to process retry queue")
        return messages