""" 消息队列服务 实现消息缓存、重试机制 """ from datetime import datetime from typing import Optional, List from app.models import db, Message from app.extensions import redis_client import json class MessageQueue: """消息队列管理""" # Redis 键前缀 PENDING_QUEUE = "pit:messages:pending" RETRY_QUEUE = "pit:messages:retry" MESSAGE_PREFIX = "pit:message:" # 配置 MAX_RETRY = 3 RETRY_DELAY = 5 # 秒 @classmethod def enqueue(cls, message: Message) -> bool: """消息入队""" try: # 存储消息详情 message_key = f"{cls.MESSAGE_PREFIX}{message.id}" redis_client.hset(message_key, mapping={ 'id': message.id, 'session_id': message.session_id, 'sender_type': message.sender_type, 'sender_id': message.sender_id, 'content': message.content or '', 'status': message.status, 'retry_count': str(message.retry_count), 'created_at': message.created_at.isoformat() if message.created_at else '', }) # 加入待处理队列 redis_client.rpush(cls.PENDING_QUEUE, message.id) return True except Exception as e: print(f"Failed to enqueue message: {e}") return False @classmethod def dequeue(cls) -> Optional[dict]: """消息出队""" try: message_id = redis_client.lpop(cls.PENDING_QUEUE) if not message_id: return None message_key = f"{cls.MESSAGE_PREFIX}{message_id}" message_data = redis_client.hgetall(message_key) if not message_data: return None return { 'id': message_data.get('id'), 'session_id': message_data.get('session_id'), 'sender_type': message_data.get('sender_type'), 'sender_id': message_data.get('sender_id'), 'content': message_data.get('content'), 'status': message_data.get('status'), 'retry_count': int(message_data.get('retry_count', 0)), } except Exception as e: print(f"Failed to dequeue message: {e}") return None @classmethod def ack(cls, message_id: str) -> bool: """消息确认""" try: message_key = f"{cls.MESSAGE_PREFIX}{message_id}" redis_client.delete(message_key) return True except Exception as e: print(f"Failed to ack message: {e}") return False @classmethod def retry(cls, message_id: str) -> bool: """消息重试""" try: message_key = f"{cls.MESSAGE_PREFIX}{message_id}" retry_count = redis_client.hget(message_key, 'retry_count') if retry_count is None: return False retry_count = int(retry_count) if retry_count >= cls.MAX_RETRY: # 超过最大重试次数,标记为失败 redis_client.hset(message_key, 'status', 'failed') return False # 增加重试计数 redis_client.hset(message_key, 'retry_count', str(retry_count + 1)) # 加入重试队列 redis_client.rpush(cls.RETRY_QUEUE, message_id) return True except Exception as e: print(f"Failed to retry message: {e}") return False @classmethod def get_pending_count(cls) -> int: """获取待处理消息数量""" try: return redis_client.llen(cls.PENDING_QUEUE) except: return 0 @classmethod def get_retry_count(cls) -> int: """获取重试消息数量""" try: return redis_client.llen(cls.RETRY_QUEUE) except: return 0 @classmethod def process_retry_queue(cls) -> List[dict]: """处理重试队列""" messages = [] try: while True: message_id = redis_client.lpop(cls.RETRY_QUEUE) if not message_id: break message_key = f"{cls.MESSAGE_PREFIX}{message_id}" message_data = redis_client.hgetall(message_key) if message_data: messages.append({ 'id': message_data.get('id'), 'session_id': message_data.get('session_id'), 'content': message_data.get('content'), 'retry_count': int(message_data.get('retry_count', 0)), }) except Exception as e: print(f"Failed to process retry queue: {e}") return messages