feat: migrate from Redis to SQLite for event handling and notifications

This commit is contained in:
Arunavo Ray
2025-05-20 19:09:48 +05:30
parent c179953649
commit 6d13ff29ca
17 changed files with 470 additions and 202 deletions

View File

@@ -1,5 +1,5 @@
import type { APIRoute } from "astro";
import { redisSubscriber } from "@/lib/redis";
import { getNewEvents } from "@/lib/events";
export const GET: APIRoute = async ({ request }) => {
const url = new URL(request.url);
@@ -11,13 +11,13 @@ export const GET: APIRoute = async ({ request }) => {
const channel = `mirror-status:${userId}`;
let isClosed = false;
let connectionAttempts = 0;
const MAX_ATTEMPTS = 5;
const RETRY_DELAY = 1000; // 1 second
const POLL_INTERVAL = 2000; // Poll every 2 seconds
const stream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
let lastEventTime: Date | undefined = undefined;
let pollIntervalId: ReturnType<typeof setInterval> | null = null;
// Function to send a message to the client
const sendMessage = (message: string) => {
@@ -29,98 +29,63 @@ export const GET: APIRoute = async ({ request }) => {
}
};
// Function to handle Redis connection and subscription
const connectToRedis = () => {
// Function to poll for new events
const pollForEvents = async () => {
if (isClosed) return;
try {
// Set up message handler for Bun's Redis client
redisSubscriber.onmessage = (message, channelName) => {
if (isClosed || channelName !== channel) return;
sendMessage(`data: ${message}\n\n`);
};
console.log(`Polling for events for user ${userId} in channel ${channel}`);
// Send initial connection message
sendMessage(": connecting to Redis...\n\n");
// Get new events from SQLite
const events = await getNewEvents({
userId,
channel,
lastEventTime,
});
// Use a try-catch block specifically for the subscribe operation
let subscribed = false;
try {
// Bun's Redis client expects a string for the channel
// We need to wrap this in a try-catch because it can throw if Redis is down
subscribed = redisSubscriber.subscribe(channel);
console.log(`Found ${events.length} new events`);
if (subscribed) {
// If we get here, subscription was successful
sendMessage(": connected\n\n");
// Send events to client
if (events.length > 0) {
// Update last event time
lastEventTime = events[events.length - 1].createdAt;
// Reset connection attempts on successful connection
connectionAttempts = 0;
// Send a heartbeat every 30 seconds to keep the connection alive
const heartbeatInterval = setInterval(() => {
if (!isClosed) {
sendMessage(": heartbeat\n\n");
} else {
clearInterval(heartbeatInterval);
}
}, 30000);
} else {
throw new Error("Failed to subscribe to Redis channel");
}
} catch (subscribeErr) {
// Handle subscription error
console.error("Redis subscribe error:", subscribeErr);
// Retry connection if we haven't exceeded max attempts
if (connectionAttempts < MAX_ATTEMPTS) {
connectionAttempts++;
const nextRetryDelay = RETRY_DELAY * Math.pow(2, connectionAttempts - 1);
console.log(`Retrying Redis connection (attempt ${connectionAttempts}/${MAX_ATTEMPTS}) in ${nextRetryDelay}ms...`);
// Send retry message to client
sendMessage(`: retrying connection (${connectionAttempts}/${MAX_ATTEMPTS}) in ${nextRetryDelay}ms...\n\n`);
// Wait before retrying
setTimeout(connectToRedis, nextRetryDelay);
} else {
// Max retries exceeded, send error but keep the connection open
console.error("Max Redis connection attempts exceeded");
sendMessage(`data: {"error": "Redis connection failed after ${MAX_ATTEMPTS} attempts"}\n\n`);
// Set up a longer retry after max attempts
setTimeout(() => {
connectionAttempts = 0; // Reset counter for a fresh start
sendMessage(": attempting to reconnect after cooling period...\n\n");
connectToRedis();
}, 30000); // Try again after 30 seconds
// Send each event to the client
for (const event of events) {
console.log(`Sending event: ${JSON.stringify(event.payload)}`);
sendMessage(`data: ${JSON.stringify(event.payload)}\n\n`);
}
}
} catch (err) {
// This catches any other errors outside the subscribe operation
console.error("Redis connection error:", err);
sendMessage(`data: {"error": "Redis connection error"}\n\n`);
// Still attempt to retry
if (connectionAttempts < MAX_ATTEMPTS) {
connectionAttempts++;
setTimeout(connectToRedis, RETRY_DELAY * Math.pow(2, connectionAttempts - 1));
}
console.error("Error polling for events:", err);
sendMessage(`data: {"error": "Error polling for events"}\n\n`);
}
};
// Start the initial connection
connectToRedis();
// Send initial connection message
sendMessage(": connected\n\n");
// Start polling for events
pollForEvents();
// Set up polling interval
pollIntervalId = setInterval(pollForEvents, POLL_INTERVAL);
// Send a heartbeat every 30 seconds to keep the connection alive
const heartbeatInterval = setInterval(() => {
if (!isClosed) {
sendMessage(": heartbeat\n\n");
} else {
clearInterval(heartbeatInterval);
}
}, 30000);
// Handle client disconnection
request.signal?.addEventListener("abort", () => {
if (!isClosed) {
isClosed = true;
try {
redisSubscriber.unsubscribe(channel);
} catch (err) {
console.error("Error unsubscribing from Redis:", err);
if (pollIntervalId) {
clearInterval(pollIntervalId);
}
controller.close();
}
@@ -128,14 +93,7 @@ export const GET: APIRoute = async ({ request }) => {
},
cancel() {
// Extra safety in case cancel is triggered
if (!isClosed) {
isClosed = true;
try {
redisSubscriber.unsubscribe(channel);
} catch (err) {
console.error("Error unsubscribing from Redis:", err);
}
}
isClosed = true;
},
});