Files
PIT_Channel/src/utils/queue.ts

192 lines
4.4 KiB
TypeScript

/**
* 消息队列模块
* @module utils/queue
*/
import type { QueuedMessage, PITRouterMessage, Logger } from "../types.js";
import { createLogger } from "./logger.js";
/**
* 消息队列配置
*/
export interface MessageQueueOptions {
/** 最大队列长度 */
maxSize: number;
/** 最大重试次数 */
maxRetries: number;
/** 重试延迟(毫秒) */
retryDelay: number;
/** 日志器 */
logger?: Logger;
}
/**
* 消息队列
*/
export class MessageQueue {
private queue: Map<string, QueuedMessage> = new Map();
private options: MessageQueueOptions;
private log: Logger;
constructor(options: Partial<MessageQueueOptions> = {}) {
this.options = {
maxSize: options.maxSize ?? 100,
maxRetries: options.maxRetries ?? 3,
retryDelay: options.retryDelay ?? 5000,
logger: options.logger,
};
this.log = this.options.logger ?? createLogger("queue");
}
/**
* 入队消息
* @param to 目标用户
* @param content 消息内容
* @returns 队列项
*/
enqueue(to: string, content: string | PITRouterMessage): QueuedMessage {
// 检查队列是否已满
if (this.queue.size >= this.options.maxSize) {
// 移除最旧的消息
const oldest = this.getOldest();
if (oldest) {
this.queue.delete(oldest.queueId);
this.log.warn("Queue full, dropped oldest message", { queueId: oldest.queueId });
}
}
const queueId = this.generateQueueId();
const messageId = this.generateMessageId();
const item: QueuedMessage = {
queueId,
messageId,
to,
content,
enqueuedAt: Date.now(),
retryCount: 0,
maxRetries: this.options.maxRetries,
acknowledged: false,
};
this.queue.set(queueId, item);
this.log.debug("Message enqueued", { queueId, to });
return item;
}
/**
* 出队消息
* @param queueId 队列 ID
* @returns 消息项或 undefined
*/
dequeue(queueId: string): QueuedMessage | undefined {
const item = this.queue.get(queueId);
if (item) {
this.queue.delete(queueId);
this.log.debug("Message dequeued", { queueId });
}
return item;
}
/**
* 获取待处理消息
* @returns 待处理的消息列表
*/
getPending(): QueuedMessage[] {
return Array.from(this.queue.values())
.filter(item => !item.acknowledged && item.retryCount < item.maxRetries)
.sort((a, b) => a.enqueuedAt - b.enqueuedAt);
}
/**
* 标记消息已确认
* @param messageId 消息 ID
*/
acknowledge(messageId: string): void {
for (const item of this.queue.values()) {
if (item.messageId === messageId) {
item.acknowledged = true;
this.log.debug("Message acknowledged", { queueId: item.queueId, messageId });
break;
}
}
}
/**
* 增加重试计数
* @param queueId 队列 ID
* @returns 是否可以重试
*/
incrementRetry(queueId: string): boolean {
const item = this.queue.get(queueId);
if (!item) return false;
item.retryCount++;
const canRetry = item.retryCount < item.maxRetries;
this.log.debug("Retry incremented", {
queueId,
retryCount: item.retryCount,
canRetry
});
return canRetry;
}
/**
* 清理已确认和过期的消息
*/
cleanup(): void {
const now = Date.now();
const maxAge = 24 * 60 * 60 * 1000; // 24 小时
for (const [queueId, item] of this.queue.entries()) {
if (item.acknowledged || now - item.enqueuedAt > maxAge) {
this.queue.delete(queueId);
this.log.debug("Cleaned up message", { queueId });
}
}
}
/**
* 获取队列长度
*/
get length(): number {
return this.queue.size;
}
/**
* 检查队列是否为空
*/
isEmpty(): boolean {
return this.queue.size === 0;
}
/**
* 清空队列
*/
clear(): void {
this.queue.clear();
this.log.info("Queue cleared");
}
private getOldest(): QueuedMessage | undefined {
let oldest: QueuedMessage | undefined;
for (const item of this.queue.values()) {
if (!oldest || item.enqueuedAt < oldest.enqueuedAt) {
oldest = item;
}
}
return oldest;
}
private generateQueueId(): string {
return `q-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
private generateMessageId(): string {
return `msg-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
}