""" 消息服务 处理消息相关的业务逻辑 """ from typing import Optional, List from datetime import datetime from app.models import db, Message, Session from app.services.message_queue import MessageQueue import uuid class MessageService: """消息服务""" def __init__(self): self.queue = MessageQueue() def create_message( self, session_id: str, sender_type: str, sender_id: str, content: str, message_type: str = 'text', content_type: str = 'markdown', reply_to: str = None ) -> Message: """创建消息""" message = Message( id=str(uuid.uuid4()), session_id=session_id, sender_type=sender_type, sender_id=sender_id, message_type=message_type, content=content, content_type=content_type, reply_to=reply_to, status='sent', ack_status='pending' ) db.session.add(message) # 更新会话统计 session = Session.query.get(session_id) if session: session.message_count += 1 session.last_active_at = datetime.utcnow() db.session.commit() # 入队等待确认 self.queue.enqueue(message) return message def get_message(self, message_id: str) -> Optional[Message]: """获取消息""" return Message.query.get(message_id) def get_session_messages( self, session_id: str, limit: int = 50, offset: int = 0 ) -> List[Message]: """获取会话消息列表""" return Message.query.filter_by(session_id=session_id)\ .order_by(Message.created_at.asc())\ .offset(offset)\ .limit(limit)\ .all() def acknowledge_message(self, message_id: str) -> Optional[Message]: """确认消息已送达""" message = Message.query.get(message_id) if not message: return None message.ack_status = 'acknowledged' message.status = 'delivered' message.delivered_at = datetime.utcnow() db.session.commit() # 从队列中移除 self.queue.ack(message_id) return message def mark_as_read(self, message_id: str) -> Optional[Message]: """标记消息已读""" message = Message.query.get(message_id) if not message: return None message.status = 'read' db.session.commit() return message def mark_session_read(self, session_id: str, user_id: str) -> int: """标记会话所有消息已读""" # 更新数据库 count = Message.query.filter_by( session_id=session_id, status='delivered' ).update({'status': 'read'}) # 更新会话未读数 session = Session.query.get(session_id) if session: session.unread_count = 0 db.session.commit() return count def retry_message(self, message_id: str) -> Optional[Message]: """重试发送消息""" message = Message.query.get(message_id) if not message: return None # 检查重试次数 if message.retry_count >= 3: message.status = 'failed' db.session.commit() return None # 增加重试计数 message.retry_count += 1 message.status = 'sent' db.session.commit() # 加入重试队列 self.queue.retry(message_id) return message def get_pending_messages(self) -> List[dict]: """获取待处理消息""" return self.queue.process_retry_queue() def get_message_stats(self, session_id: str = None) -> dict: """获取消息统计""" query = Message.query if session_id: query = query.filter_by(session_id=session_id) total = query.count() sent = query.filter_by(status='sent').count() delivered = query.filter_by(status='delivered').count() read = query.filter_by(status='read').count() failed = query.filter_by(status='failed').count() return { 'total': total, 'sent': sent, 'delivered': delivered, 'read': read, 'failed': failed, 'pending': self.queue.get_pending_count(), 'retry': self.queue.get_retry_count(), }