feat: implement exponential backoff for SSE reconnections
This commit is contained in:
@@ -56,6 +56,7 @@ import type {
|
||||
TaskCommentRead,
|
||||
TaskRead,
|
||||
} from "@/api/generated/model";
|
||||
import { createExponentialBackoff } from "@/lib/backoff";
|
||||
import { cn } from "@/lib/utils";
|
||||
|
||||
type Board = BoardRead;
|
||||
@@ -123,6 +124,13 @@ const EMOJI_GLYPHS: Record<string, string> = {
|
||||
":brain:": "🧠",
|
||||
};
|
||||
|
||||
const SSE_RECONNECT_BACKOFF = {
|
||||
baseMs: 1_000,
|
||||
factor: 2,
|
||||
jitter: 0.2,
|
||||
maxMs: 5 * 60_000,
|
||||
} as const;
|
||||
|
||||
const MARKDOWN_TABLE_COMPONENTS: Components = {
|
||||
table: ({ node: _node, className, ...props }) => (
|
||||
<div className="my-3 overflow-x-auto">
|
||||
@@ -344,6 +352,8 @@ export default function BoardDetailPage() {
|
||||
if (!isSignedIn || !boardId || !board) return;
|
||||
let isCancelled = false;
|
||||
const abortController = new AbortController();
|
||||
const backoff = createExponentialBackoff(SSE_RECONNECT_BACKOFF);
|
||||
let reconnectTimeout: number | undefined;
|
||||
|
||||
const connect = async () => {
|
||||
try {
|
||||
@@ -372,6 +382,11 @@ export default function BoardDetailPage() {
|
||||
while (!isCancelled) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (value && value.length) {
|
||||
// Consider the stream "healthy" once we receive any bytes (including pings),
|
||||
// then reset the backoff for future reconnects.
|
||||
backoff.reset();
|
||||
}
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
buffer = buffer.replace(/\r\n/g, "\n");
|
||||
let boundary = buffer.indexOf("\n\n");
|
||||
@@ -414,23 +429,37 @@ export default function BoardDetailPage() {
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
if (!isCancelled) {
|
||||
setTimeout(connect, 3000);
|
||||
// Reconnect handled below.
|
||||
}
|
||||
|
||||
if (!isCancelled) {
|
||||
if (reconnectTimeout !== undefined) {
|
||||
window.clearTimeout(reconnectTimeout);
|
||||
}
|
||||
const delay = backoff.nextDelayMs();
|
||||
reconnectTimeout = window.setTimeout(() => {
|
||||
reconnectTimeout = undefined;
|
||||
void connect();
|
||||
}, delay);
|
||||
}
|
||||
};
|
||||
|
||||
connect();
|
||||
void connect();
|
||||
return () => {
|
||||
isCancelled = true;
|
||||
abortController.abort();
|
||||
if (reconnectTimeout !== undefined) {
|
||||
window.clearTimeout(reconnectTimeout);
|
||||
}
|
||||
};
|
||||
}, [boardId, isSignedIn]);
|
||||
}, [board, boardId, isSignedIn]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!isSignedIn || !boardId || !board) return;
|
||||
let isCancelled = false;
|
||||
const abortController = new AbortController();
|
||||
const backoff = createExponentialBackoff(SSE_RECONNECT_BACKOFF);
|
||||
let reconnectTimeout: number | undefined;
|
||||
|
||||
const connect = async () => {
|
||||
try {
|
||||
@@ -458,6 +487,9 @@ export default function BoardDetailPage() {
|
||||
while (!isCancelled) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (value && value.length) {
|
||||
backoff.reset();
|
||||
}
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
buffer = buffer.replace(/\r\n/g, "\n");
|
||||
let boundary = buffer.indexOf("\n\n");
|
||||
@@ -529,16 +561,28 @@ export default function BoardDetailPage() {
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
if (!isCancelled) {
|
||||
setTimeout(connect, 3000);
|
||||
// Reconnect handled below.
|
||||
}
|
||||
|
||||
if (!isCancelled) {
|
||||
if (reconnectTimeout !== undefined) {
|
||||
window.clearTimeout(reconnectTimeout);
|
||||
}
|
||||
const delay = backoff.nextDelayMs();
|
||||
reconnectTimeout = window.setTimeout(() => {
|
||||
reconnectTimeout = undefined;
|
||||
void connect();
|
||||
}, delay);
|
||||
}
|
||||
};
|
||||
|
||||
connect();
|
||||
void connect();
|
||||
return () => {
|
||||
isCancelled = true;
|
||||
abortController.abort();
|
||||
if (reconnectTimeout !== undefined) {
|
||||
window.clearTimeout(reconnectTimeout);
|
||||
}
|
||||
};
|
||||
}, [board, boardId, isSignedIn]);
|
||||
|
||||
@@ -564,6 +608,8 @@ export default function BoardDetailPage() {
|
||||
if (!isSignedIn || !boardId || !board) return;
|
||||
let isCancelled = false;
|
||||
const abortController = new AbortController();
|
||||
const backoff = createExponentialBackoff(SSE_RECONNECT_BACKOFF);
|
||||
let reconnectTimeout: number | undefined;
|
||||
|
||||
const connect = async () => {
|
||||
try {
|
||||
@@ -590,6 +636,9 @@ export default function BoardDetailPage() {
|
||||
while (!isCancelled) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (value && value.length) {
|
||||
backoff.reset();
|
||||
}
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
buffer = buffer.replace(/\r\n/g, "\n");
|
||||
let boundary = buffer.indexOf("\n\n");
|
||||
@@ -668,16 +717,28 @@ export default function BoardDetailPage() {
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
if (!isCancelled) {
|
||||
setTimeout(connect, 3000);
|
||||
// Reconnect handled below.
|
||||
}
|
||||
|
||||
if (!isCancelled) {
|
||||
if (reconnectTimeout !== undefined) {
|
||||
window.clearTimeout(reconnectTimeout);
|
||||
}
|
||||
const delay = backoff.nextDelayMs();
|
||||
reconnectTimeout = window.setTimeout(() => {
|
||||
reconnectTimeout = undefined;
|
||||
void connect();
|
||||
}, delay);
|
||||
}
|
||||
};
|
||||
|
||||
connect();
|
||||
void connect();
|
||||
return () => {
|
||||
isCancelled = true;
|
||||
abortController.abort();
|
||||
if (reconnectTimeout !== undefined) {
|
||||
window.clearTimeout(reconnectTimeout);
|
||||
}
|
||||
};
|
||||
}, [board, boardId, isSignedIn, selectedTask?.id, pushLiveFeed]);
|
||||
|
||||
@@ -685,6 +746,8 @@ export default function BoardDetailPage() {
|
||||
if (!isSignedIn || !boardId) return;
|
||||
let isCancelled = false;
|
||||
const abortController = new AbortController();
|
||||
const backoff = createExponentialBackoff(SSE_RECONNECT_BACKOFF);
|
||||
let reconnectTimeout: number | undefined;
|
||||
|
||||
const connect = async () => {
|
||||
try {
|
||||
@@ -713,6 +776,9 @@ export default function BoardDetailPage() {
|
||||
while (!isCancelled) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (value && value.length) {
|
||||
backoff.reset();
|
||||
}
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
buffer = buffer.replace(/\r\n/g, "\n");
|
||||
let boundary = buffer.indexOf("\n\n");
|
||||
@@ -755,16 +821,28 @@ export default function BoardDetailPage() {
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
if (!isCancelled) {
|
||||
setTimeout(connect, 3000);
|
||||
// Reconnect handled below.
|
||||
}
|
||||
|
||||
if (!isCancelled) {
|
||||
if (reconnectTimeout !== undefined) {
|
||||
window.clearTimeout(reconnectTimeout);
|
||||
}
|
||||
const delay = backoff.nextDelayMs();
|
||||
reconnectTimeout = window.setTimeout(() => {
|
||||
reconnectTimeout = undefined;
|
||||
void connect();
|
||||
}, delay);
|
||||
}
|
||||
};
|
||||
|
||||
connect();
|
||||
void connect();
|
||||
return () => {
|
||||
isCancelled = true;
|
||||
abortController.abort();
|
||||
if (reconnectTimeout !== undefined) {
|
||||
window.clearTimeout(reconnectTimeout);
|
||||
}
|
||||
};
|
||||
}, [board, boardId, isSignedIn]);
|
||||
|
||||
|
||||
48
frontend/src/lib/backoff.ts
Normal file
48
frontend/src/lib/backoff.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
export type ExponentialBackoffOptions = {
|
||||
baseMs?: number;
|
||||
factor?: number;
|
||||
maxMs?: number;
|
||||
jitter?: number;
|
||||
};
|
||||
|
||||
export type ExponentialBackoff = {
|
||||
nextDelayMs: () => number;
|
||||
reset: () => void;
|
||||
attempt: () => number;
|
||||
};
|
||||
|
||||
const clampMs = (value: number, { min, max }: { min: number; max: number }): number => {
|
||||
if (Number.isNaN(value) || !Number.isFinite(value)) return min;
|
||||
return Math.min(max, Math.max(min, Math.trunc(value)));
|
||||
};
|
||||
|
||||
export const createExponentialBackoff = (
|
||||
options: ExponentialBackoffOptions = {},
|
||||
): ExponentialBackoff => {
|
||||
const baseMs = clampMs(options.baseMs ?? 1_000, { min: 50, max: 60_000 });
|
||||
const factor = options.factor ?? 2;
|
||||
const maxMs = clampMs(options.maxMs ?? 5 * 60_000, { min: baseMs, max: 60 * 60_000 });
|
||||
const jitter = options.jitter ?? 0.2;
|
||||
|
||||
let attempt = 0;
|
||||
|
||||
return {
|
||||
nextDelayMs: () => {
|
||||
const raw = baseMs * Math.pow(factor, attempt);
|
||||
const capped = Math.min(maxMs, raw);
|
||||
const normalized = clampMs(capped, { min: baseMs, max: maxMs });
|
||||
|
||||
// K8s-style jitter: only add extra random delay (no negative jitter),
|
||||
// which avoids thundering-herd reconnects.
|
||||
const jitterFactor = Math.max(0, jitter);
|
||||
const delay = normalized + Math.floor(Math.random() * jitterFactor * normalized);
|
||||
|
||||
attempt = Math.min(attempt + 1, 64);
|
||||
return clampMs(delay, { min: baseMs, max: maxMs });
|
||||
},
|
||||
reset: () => {
|
||||
attempt = 0;
|
||||
},
|
||||
attempt: () => attempt,
|
||||
};
|
||||
};
|
||||
Reference in New Issue
Block a user