feat: enhance SSE connection with reconnect logic and error handling

- Updated `useSSE` hook to include max reconnect attempts and exponential backoff for reconnections.
- Improved error handling for SSE messages and connection errors.
- Added connection status reset on successful connection.

fix: improve SQLite database connection handling

- Simplified database initialization and connection logic.
- Ensured the database file is created if it doesn't exist.

fix: enhance Redis client connection with retry strategy

- Implemented exponential backoff for Redis connection retries.
- Added event handlers for connection success and error handling.

feat: improve SSE API endpoint with robust Redis connection management

- Added connection retry logic for Redis in the SSE API.
- Implemented heartbeat messages to keep the connection alive.
- Enhanced error handling for Redis subscription and connection attempts.
This commit is contained in:
Arunavo Ray
2025-05-20 18:33:56 +05:30
parent eb2d76a4b7
commit c179953649
6 changed files with 483 additions and 73 deletions

View File

@@ -1,34 +1,61 @@
import { useEffect, useState, useRef } from "react";
import { useEffect, useState, useRef, useCallback } from "react";
import type { MirrorJob } from "@/lib/db/schema";
interface UseSSEOptions {
userId?: string;
onMessage: (data: MirrorJob) => void;
maxReconnectAttempts?: number;
reconnectDelay?: number;
}
export const useSSE = ({ userId, onMessage }: UseSSEOptions) => {
export const useSSE = ({
userId,
onMessage,
maxReconnectAttempts = 5,
reconnectDelay = 3000
}: UseSSEOptions) => {
const [connected, setConnected] = useState<boolean>(false);
const [reconnectCount, setReconnectCount] = useState<number>(0);
const onMessageRef = useRef(onMessage);
const eventSourceRef = useRef<EventSource | null>(null);
const reconnectTimeoutRef = useRef<number | null>(null);
// Update the ref when onMessage changes
useEffect(() => {
onMessageRef.current = onMessage;
}, [onMessage]);
useEffect(() => {
// Create a stable connect function that can be called for reconnection
const connect = useCallback(() => {
if (!userId) return;
// Clean up any existing connection
if (eventSourceRef.current) {
eventSourceRef.current.close();
}
// Clear any pending reconnect timeout
if (reconnectTimeoutRef.current) {
window.clearTimeout(reconnectTimeoutRef.current);
reconnectTimeoutRef.current = null;
}
// Create new EventSource connection
const eventSource = new EventSource(`/api/sse?userId=${userId}`);
eventSourceRef.current = eventSource;
const handleMessage = (event: MessageEvent) => {
try {
// Check if this is an error message from our server
if (event.data.startsWith('{"error":')) {
console.warn("SSE server error:", event.data);
return;
}
const parsedMessage: MirrorJob = JSON.parse(event.data);
// console.log("Received new log:", parsedMessage);
onMessageRef.current(parsedMessage); // Use ref instead of prop directly
onMessageRef.current(parsedMessage);
} catch (error) {
console.error("Error parsing message:", error);
console.error("Error parsing SSE message:", error);
}
};
@@ -36,19 +63,50 @@ export const useSSE = ({ userId, onMessage }: UseSSEOptions) => {
eventSource.onopen = () => {
setConnected(true);
setReconnectCount(0); // Reset reconnect counter on successful connection
console.log(`Connected to SSE for user: ${userId}`);
};
eventSource.onerror = () => {
console.error("SSE connection error");
eventSource.onerror = (error) => {
console.error("SSE connection error:", error);
setConnected(false);
eventSource.close();
};
eventSourceRef.current = null;
return () => {
eventSource.close();
// Attempt to reconnect if we haven't exceeded max attempts
if (reconnectCount < maxReconnectAttempts) {
const nextReconnectDelay = Math.min(reconnectDelay * Math.pow(1.5, reconnectCount), 30000);
console.log(`Attempting to reconnect in ${nextReconnectDelay}ms (attempt ${reconnectCount + 1}/${maxReconnectAttempts})`);
reconnectTimeoutRef.current = window.setTimeout(() => {
setReconnectCount(prev => prev + 1);
connect();
}, nextReconnectDelay);
} else {
console.error(`Failed to reconnect after ${maxReconnectAttempts} attempts`);
}
};
}, [userId]); // Only depends on userId now
}, [userId, maxReconnectAttempts, reconnectDelay, reconnectCount]);
// Set up the connection
useEffect(() => {
if (!userId) return;
connect();
// Cleanup function
return () => {
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
if (reconnectTimeoutRef.current) {
window.clearTimeout(reconnectTimeoutRef.current);
reconnectTimeoutRef.current = null;
}
};
}, [userId, connect]);
return { connected };
};

View File

@@ -1,7 +1,7 @@
import { z } from "zod";
import { sqliteTable, text, integer } from "drizzle-orm/sqlite-core";
import { Database } from "bun:sqlite";
import { drizzle } from "drizzle-orm/bun-sqlite";
import { sqliteTable, text, integer } from "drizzle-orm/sqlite-core";
import fs from "fs";
import path from "path";
import { configSchema } from "./schema";
@@ -13,38 +13,45 @@ if (!fs.existsSync(dataDir)) {
fs.mkdirSync(dataDir, { recursive: true });
}
const dbUrl =
process.env.DATABASE_URL || `file:${path.join(dataDir, "gitea-mirror.db")}`;
const dbPath = path.join(dataDir, "gitea-mirror.db");
// Create a SQLite database instance using Bun's native driver
// Create an empty database file if it doesn't exist
if (!fs.existsSync(dbPath)) {
fs.writeFileSync(dbPath, "");
}
// Create SQLite database instance using Bun's native driver
let sqlite: Database;
try {
// Create an empty database file if it doesn't exist
if (!fs.existsSync(path.join(dataDir, "gitea-mirror.db"))) {
fs.writeFileSync(path.join(dataDir, "gitea-mirror.db"), "");
}
sqlite = new Database(dbUrl);
sqlite = new Database(dbPath);
console.log("Successfully connected to SQLite database using Bun's native driver");
} catch (error) {
console.error("Error opening database:", error);
throw error;
}
// Simple async wrapper around Bun's SQLite API for compatibility
// Create drizzle instance with the SQLite client
export const db = drizzle({ client: sqlite });
// Simple async wrapper around SQLite API for compatibility
// This maintains backward compatibility with existing code
export const client = {
async execute(sql: string, params?: any[]) {
const stmt = sqlite.query(sql);
if (/^\s*select/i.test(sql)) {
const rows = stmt.all(params ?? []);
return { rows } as { rows: any[] };
try {
const stmt = sqlite.query(sql);
if (/^\s*select/i.test(sql)) {
const rows = stmt.all(params ?? []);
return { rows } as { rows: any[] };
}
stmt.run(params ?? []);
return { rows: [] } as { rows: any[] };
} catch (error) {
console.error(`Error executing SQL: ${sql}`, error);
throw error;
}
stmt.run(params ?? []);
return { rows: [] } as { rows: any[] };
},
};
// Create a drizzle instance
export const db = drizzle(sqlite);
// Define the tables
export const users = sqliteTable("users", {
id: text("id").primaryKey(),

View File

@@ -2,22 +2,38 @@ import { RedisClient } from "bun";
// Connect to Redis using REDIS_URL environment variable or default to redis://redis:6379
// This ensures we have a fallback URL when running with Docker Compose
const redisUrl = process.env.REDIS_URL ?? "redis://redis:6379";
const redisUrl = process.env.REDIS_URL ?? "redis://localhost:6379";
console.log(`Connecting to Redis at: ${redisUrl}`);
// Configure Redis client with connection options
// Configure Redis client with connection options and retry logic
function createClient() {
return new RedisClient(redisUrl, {
const client = new RedisClient(redisUrl, {
autoReconnect: true,
connectTimeout: 30000, // Increase timeout to 30 seconds
retryStrategy: (attempt: number) => {
// Exponential backoff with jitter
const delay = Math.min(Math.pow(2, attempt) * 100, 10000);
console.log(`Redis connection attempt ${attempt}, retrying in ${delay}ms`);
return delay;
},
});
// Set up event handlers
client.onconnect = () => console.log("Redis client connected successfully");
client.onclose = (err: Error | null) => {
if (err) {
console.error("Redis connection error:", err);
console.log("Redis will attempt to reconnect automatically");
} else {
console.log("Redis connection closed");
}
};
return client;
}
// Create Redis clients with improved error handling
export const redis = createClient();
export const redisPublisher = createClient();
export const redisSubscriber = createClient();
redis.onconnect = () => console.log("Connected to Redis server");
redis.onclose = (err) => {
if (err) console.error("Disconnected from Redis server:", err);
};

View File

@@ -11,49 +11,130 @@ export const GET: APIRoute = async ({ request }) => {
const channel = `mirror-status:${userId}`;
let isClosed = false;
let connectionAttempts = 0;
const MAX_ATTEMPTS = 5;
const RETRY_DELAY = 1000; // 1 second
const stream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
const handleMessage = (ch: string, message: string) => {
if (isClosed || ch !== channel) return;
// Function to send a message to the client
const sendMessage = (message: string) => {
if (isClosed) return;
try {
controller.enqueue(encoder.encode(`data: ${message}\n\n`));
controller.enqueue(encoder.encode(message));
} catch (err) {
console.error("Stream enqueue error:", err);
}
};
redisSubscriber.subscribe(channel, (err) => {
if (err) {
isClosed = true;
controller.error(err);
// Function to handle Redis connection and subscription
const connectToRedis = () => {
if (isClosed) return;
try {
// Set up message handler for Bun's Redis client
redisSubscriber.onmessage = (message, channelName) => {
if (isClosed || channelName !== channel) return;
sendMessage(`data: ${message}\n\n`);
};
// Send initial connection message
sendMessage(": connecting to Redis...\n\n");
// Use a try-catch block specifically for the subscribe operation
let subscribed = false;
try {
// Bun's Redis client expects a string for the channel
// We need to wrap this in a try-catch because it can throw if Redis is down
subscribed = redisSubscriber.subscribe(channel);
if (subscribed) {
// If we get here, subscription was successful
sendMessage(": connected\n\n");
// Reset connection attempts on successful connection
connectionAttempts = 0;
// Send a heartbeat every 30 seconds to keep the connection alive
const heartbeatInterval = setInterval(() => {
if (!isClosed) {
sendMessage(": heartbeat\n\n");
} else {
clearInterval(heartbeatInterval);
}
}, 30000);
} else {
throw new Error("Failed to subscribe to Redis channel");
}
} catch (subscribeErr) {
// Handle subscription error
console.error("Redis subscribe error:", subscribeErr);
// Retry connection if we haven't exceeded max attempts
if (connectionAttempts < MAX_ATTEMPTS) {
connectionAttempts++;
const nextRetryDelay = RETRY_DELAY * Math.pow(2, connectionAttempts - 1);
console.log(`Retrying Redis connection (attempt ${connectionAttempts}/${MAX_ATTEMPTS}) in ${nextRetryDelay}ms...`);
// Send retry message to client
sendMessage(`: retrying connection (${connectionAttempts}/${MAX_ATTEMPTS}) in ${nextRetryDelay}ms...\n\n`);
// Wait before retrying
setTimeout(connectToRedis, nextRetryDelay);
} else {
// Max retries exceeded, send error but keep the connection open
console.error("Max Redis connection attempts exceeded");
sendMessage(`data: {"error": "Redis connection failed after ${MAX_ATTEMPTS} attempts"}\n\n`);
// Set up a longer retry after max attempts
setTimeout(() => {
connectionAttempts = 0; // Reset counter for a fresh start
sendMessage(": attempting to reconnect after cooling period...\n\n");
connectToRedis();
}, 30000); // Try again after 30 seconds
}
}
} catch (err) {
// This catches any other errors outside the subscribe operation
console.error("Redis connection error:", err);
sendMessage(`data: {"error": "Redis connection error"}\n\n`);
// Still attempt to retry
if (connectionAttempts < MAX_ATTEMPTS) {
connectionAttempts++;
setTimeout(connectToRedis, RETRY_DELAY * Math.pow(2, connectionAttempts - 1));
}
}
});
};
redisSubscriber.on("message", handleMessage);
try {
controller.enqueue(encoder.encode(": connected\n\n"));
} catch (err) {
console.error("Initial enqueue error:", err);
}
// Start the initial connection
connectToRedis();
// Handle client disconnection
request.signal?.addEventListener("abort", () => {
if (!isClosed) {
isClosed = true;
redisSubscriber.off("message", handleMessage);
redisSubscriber.unsubscribe(channel);
try {
redisSubscriber.unsubscribe(channel);
} catch (err) {
console.error("Error unsubscribing from Redis:", err);
}
controller.close();
}
});
},
cancel() {
// extra safety in case cancel is triggered
// Extra safety in case cancel is triggered
if (!isClosed) {
isClosed = true;
redisSubscriber.unsubscribe(channel);
try {
redisSubscriber.unsubscribe(channel);
} catch (err) {
console.error("Error unsubscribing from Redis:", err);
}
}
},
});