From 6d13ff29ca0a532f1e6a9e882f19de6c1c4fcb2f Mon Sep 17 00:00:00 2001 From: Arunavo Ray Date: Tue, 20 May 2025 19:09:48 +0530 Subject: [PATCH] feat: migrate from Redis to SQLite for event handling and notifications --- .env.example | 1 + README.md | 77 ++++++++------------- docker-compose.dev.yml | 15 +---- docker-compose.yml | 13 ---- docker-entrypoint.sh | 14 ++++ package.json | 2 + scripts/check-events.ts | 38 +++++++++++ scripts/cleanup-redis.ts | 33 +++++++++ scripts/manage-db.ts | 37 ++++++++++ scripts/migrate-db.ts | 53 +++++++++++++++ src/lib/db/index.ts | 12 ++++ src/lib/db/schema.ts | 12 ++++ src/lib/events.ts | 130 ++++++++++++++++++++++++++++++++++++ src/lib/helpers.ts | 10 ++- src/lib/redis.ts | 39 ----------- src/pages/api/sse/index.ts | 130 ++++++++++++------------------------ src/pages/api/test-event.ts | 56 ++++++++++++++++ 17 files changed, 470 insertions(+), 202 deletions(-) create mode 100644 scripts/check-events.ts create mode 100644 scripts/cleanup-redis.ts create mode 100644 scripts/migrate-db.ts create mode 100644 src/lib/events.ts delete mode 100644 src/lib/redis.ts create mode 100644 src/pages/api/test-event.ts diff --git a/.env.example b/.env.example index 4d6d768..292522d 100644 --- a/.env.example +++ b/.env.example @@ -8,6 +8,7 @@ NODE_ENV=production HOST=0.0.0.0 PORT=4321 DATABASE_URL=sqlite://data/gitea-mirror.db +# Note: Redis is no longer required as SQLite is used for all functionality # Security JWT_SECRET=change-this-to-a-secure-random-string-in-production diff --git a/README.md b/README.md index 64b63da..1d3bd39 100644 --- a/README.md +++ b/README.md @@ -124,19 +124,15 @@ docker compose -f docker-compose.dev.yml up -d ##### Using Pre-built Images from GitHub Container Registry -If you want to run the container directly without Docker Compose, you'll need to set up a Redis instance separately: +If you want to run the container directly without Docker Compose: ```bash -# First, start a Redis container -docker run -d --name gitea-mirror-redis redis:alpine - # Pull the latest multi-architecture image docker pull ghcr.io/arunavo4/gitea-mirror:latest -# Run the application with a link to the Redis container -# Note: The REDIS_URL environment variable is required and must point to the Redis container -docker run -d -p 4321:4321 --link gitea-mirror-redis:redis \ - -e REDIS_URL=redis://redis:6379 \ +# Run the application with a volume for persistent data +docker run -d -p 4321:4321 \ + -v gitea-mirror-data:/app/data \ ghcr.io/arunavo4/gitea-mirror:latest ``` @@ -254,7 +250,7 @@ Key configuration options include: - Scheduling options for automatic mirroring > [!IMPORTANT] -> **Redis is a required component for Gitea Mirror** as it's used for job queuing and caching. +> **SQLite is the only database required for Gitea Mirror**, handling both data storage and real-time event notifications. ## 🚀 Development @@ -360,8 +356,7 @@ docker compose -f docker-compose.dev.yml up -d - **Frontend**: Astro, React, Shadcn UI, Tailwind CSS v4 - **Backend**: Bun -- **Database**: SQLite (default) or PostgreSQL -- **Caching/Queue**: Redis +- **Database**: SQLite (handles both data storage and event notifications) - **API Integration**: GitHub API (Octokit), Gitea API ## Contributing @@ -439,48 +434,34 @@ Try the following steps: > external: true > ``` -### Redis Connection Issues - -> [!CAUTION] -> If the application fails to connect to Redis with errors like `ECONNREFUSED 127.0.0.1:6379`, ensure: -> -> 1. 
The Redis container is running: -> ```bash -> docker ps | grep redis -> ``` -> 2. The `REDIS_URL` environment variable is correctly set to `redis://redis:6379` in your Docker Compose file. -> 3. Both the application and Redis containers are on the same Docker network. -> 4. If running without Docker Compose, ensure you've started a Redis container and linked it properly: -> ```bash -> # Start Redis container -> docker run -d --name gitea-mirror-redis redis:alpine -> # Run application with link to Redis -> docker run -d -p 4321:4321 --link gitea-mirror-redis:redis \ -> -e REDIS_URL=redis://redis:6379 \ -> ghcr.io/arunavo4/gitea-mirror:latest -> ``` - - -#### Improving Redis Connection Resilience +### Database Persistence > [!TIP] -> For better Redis connection handling, you can modify the `src/lib/redis.ts` file to include retry logic and better error handling: +> The application uses SQLite for all data storage and event notifications. Make sure the database file is properly mounted when using Docker: +> +> ```bash +> # Run with a volume for persistent data storage +> docker run -d -p 4321:4321 \ +> -v gitea-mirror-data:/app/data \ +> ghcr.io/arunavo4/gitea-mirror:latest +> ``` -```typescript -import { RedisClient } from "bun"; -// Connect to Redis using REDIS_URL environment variable or default to redis://redis:6379 -const redisUrl = process.env.REDIS_URL ?? "redis://redis:6379"; +#### Database Maintenance -console.log(`Connecting to Redis at: ${redisUrl}`); - -const redis = new RedisClient(redisUrl, { autoReconnect: true }); - -redis.onconnect = () => console.log("Redis client connected"); -redis.onclose = err => { - if (err) console.error("Redis client error:", err); -}; -``` +> [!TIP] +> For database maintenance, you can use the provided scripts: +> +> ```bash +> # Check database integrity +> bun run check-db +> +> # Fix database issues +> bun run fix-db +> +> # Reset user accounts (for development) +> bun run reset-users +> ``` > [!NOTE] diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 16a097c..2dfa3fa 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -51,7 +51,6 @@ services: - gitea-mirror-data:/app/data depends_on: - gitea - - redis environment: - NODE_ENV=development - DATABASE_URL=file:data/gitea-mirror.db @@ -75,7 +74,6 @@ services: - GITEA_ORGANIZATION=${GITEA_ORGANIZATION:-github-mirrors} - GITEA_ORG_VISIBILITY=${GITEA_ORG_VISIBILITY:-public} - DELAY=${DELAY:-3600} - - REDIS_URL=redis://redis:6379 healthcheck: test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:4321/"] interval: 30s @@ -85,16 +83,7 @@ services: networks: - gitea-network - redis: - image: redis:7-alpine - container_name: redis - restart: unless-stopped - ports: - - "6379:6379" - volumes: - - redis-data:/data - networks: - - gitea-network + # Define named volumes for data persistence volumes: @@ -102,8 +91,6 @@ volumes: gitea-config: # Gitea config volume gitea-mirror-data: # Gitea Mirror database volume - redis-data: - # Define networks networks: gitea-network: diff --git a/docker-compose.yml b/docker-compose.yml index 76064fe..c1b5da7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,8 +19,6 @@ services: - "4321:4321" volumes: - gitea-mirror-data:/app/data - depends_on: - - redis environment: - NODE_ENV=production - DATABASE_URL=file:data/gitea-mirror.db @@ -44,7 +42,6 @@ services: - GITEA_ORGANIZATION=${GITEA_ORGANIZATION:-github-mirrors} - GITEA_ORG_VISIBILITY=${GITEA_ORG_VISIBILITY:-public} - DELAY=${DELAY:-3600} - - 
REDIS_URL=redis://redis:6379 healthcheck: test: ["CMD", "wget", "--no-verbose", "--tries=3", "--spider", "http://localhost:4321/"] interval: 30s @@ -53,16 +50,6 @@ services: start_period: 15s profiles: ["production"] - redis: - image: redis:7-alpine - container_name: redis - restart: unless-stopped - ports: - - "6379:6379" - volumes: - - redis-data:/data - # Define named volumes for database persistence volumes: gitea-mirror-data: # Database volume - redis-data: diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh index c746662..37bb5fd 100644 --- a/docker-entrypoint.sh +++ b/docker-entrypoint.sh @@ -113,6 +113,20 @@ if [ ! -f "/app/data/gitea-mirror.db" ]; then timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ); + + CREATE TABLE IF NOT EXISTS events ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + channel TEXT NOT NULL, + payload TEXT NOT NULL, + read INTEGER NOT NULL DEFAULT 0, + created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), + FOREIGN KEY (user_id) REFERENCES users(id) + ); + + CREATE INDEX IF NOT EXISTS idx_events_user_channel ON events(user_id, channel); + CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at); + CREATE INDEX IF NOT EXISTS idx_events_read ON events(read); EOF echo "Database initialized with required tables." fi diff --git a/package.json b/package.json index ac64334..dc0be06 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,8 @@ "check-db": "bun scripts/manage-db.ts check", "fix-db": "bun scripts/manage-db.ts fix", "reset-users": "bun scripts/manage-db.ts reset-users", + "migrate-db": "bun scripts/migrate-db.ts", + "cleanup-redis": "bun scripts/cleanup-redis.ts", "preview": "bunx --bun astro preview", "start": "bun dist/server/entry.mjs", "start:fresh": "bun run cleanup-db && bun run manage-db init && bun dist/server/entry.mjs", diff --git a/scripts/check-events.ts b/scripts/check-events.ts new file mode 100644 index 0000000..7f53976 --- /dev/null +++ b/scripts/check-events.ts @@ -0,0 +1,38 @@ +#!/usr/bin/env bun +/** + * Script to check events in the database + */ + +import { Database } from "bun:sqlite"; +import path from "path"; +import fs from "fs"; + +// Define the database path +const dataDir = path.join(process.cwd(), "data"); +if (!fs.existsSync(dataDir)) { + console.error("Data directory not found:", dataDir); + process.exit(1); +} + +const dbPath = path.join(dataDir, "gitea-mirror.db"); +if (!fs.existsSync(dbPath)) { + console.error("Database file not found:", dbPath); + process.exit(1); +} + +// Open the database +const db = new Database(dbPath); + +// Check if the events table exists +const tableExists = db.query("SELECT name FROM sqlite_master WHERE type='table' AND name='events'").get(); + +if (!tableExists) { + console.error("Events table does not exist"); + process.exit(1); +} + +// Get all events +const events = db.query("SELECT * FROM events").all(); + +console.log("Events in the database:"); +console.log(JSON.stringify(events, null, 2)); diff --git a/scripts/cleanup-redis.ts b/scripts/cleanup-redis.ts new file mode 100644 index 0000000..dfa0146 --- /dev/null +++ b/scripts/cleanup-redis.ts @@ -0,0 +1,33 @@ +#!/usr/bin/env bun +/** + * Cleanup script to remove Redis-related files and code + * This script should be run when migrating from Redis to SQLite + */ + +import fs from "fs"; +import path from "path"; + +// Files to remove +const filesToRemove = [ + "src/lib/redis.ts" +]; + +// Remove files +console.log("Removing Redis-related files..."); +for (const 
file of filesToRemove) { + const filePath = path.join(process.cwd(), file); + if (fs.existsSync(filePath)) { + fs.unlinkSync(filePath); + console.log(`Removed: ${file}`); + } else { + console.log(`File not found: ${file}`); + } +} + +console.log("\nRedis cleanup completed successfully"); +console.log("\nReminder: You should also remove Redis from your Docker Compose files and environment variables."); +console.log("The following files have been updated to use SQLite instead of Redis:"); +console.log("- src/lib/helpers.ts"); +console.log("- src/pages/api/sse/index.ts"); +console.log("\nNew files created:"); +console.log("- src/lib/events.ts"); diff --git a/scripts/manage-db.ts b/scripts/manage-db.ts index 33f24e7..d57734e 100644 --- a/scripts/manage-db.ts +++ b/scripts/manage-db.ts @@ -35,6 +35,7 @@ async function ensureTablesExist() { "repositories", "organizations", "mirror_jobs", + "events", ]; for (const table of requiredTables) { @@ -148,6 +149,24 @@ async function ensureTablesExist() { ) `); break; + case "events": + db.exec(` + CREATE TABLE events ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + channel TEXT NOT NULL, + payload TEXT NOT NULL, + read INTEGER NOT NULL DEFAULT 0, + created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), + FOREIGN KEY (user_id) REFERENCES users(id) + ) + `); + db.exec(` + CREATE INDEX idx_events_user_channel ON events(user_id, channel); + CREATE INDEX idx_events_created_at ON events(created_at); + CREATE INDEX idx_events_read ON events(read); + `); + break; } console.log(`✅ Table '${table}' created successfully.`); } @@ -362,6 +381,24 @@ async function initializeDatabase() { ) `); + db.exec(` + CREATE TABLE IF NOT EXISTS events ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + channel TEXT NOT NULL, + payload TEXT NOT NULL, + read INTEGER NOT NULL DEFAULT 0, + created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), + FOREIGN KEY (user_id) REFERENCES users(id) + ) + `); + + db.exec(` + CREATE INDEX IF NOT EXISTS idx_events_user_channel ON events(user_id, channel); + CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at); + CREATE INDEX IF NOT EXISTS idx_events_read ON events(read); + `); + // Insert default config if none exists const configCountResult = db.query(`SELECT COUNT(*) as count FROM configs`).get(); const configCount = configCountResult?.count || 0; diff --git a/scripts/migrate-db.ts b/scripts/migrate-db.ts new file mode 100644 index 0000000..03d9d09 --- /dev/null +++ b/scripts/migrate-db.ts @@ -0,0 +1,53 @@ +#!/usr/bin/env bun +/** + * Database migration script to add the events table + * This script should be run when upgrading from a version that used Redis + */ + +import { Database } from "bun:sqlite"; +import fs from "fs"; +import path from "path"; + +// Define the database path +const dataDir = path.join(process.cwd(), "data"); +if (!fs.existsSync(dataDir)) { + fs.mkdirSync(dataDir, { recursive: true }); +} + +const dbPath = path.join(dataDir, "gitea-mirror.db"); +if (!fs.existsSync(dbPath)) { + console.error("Database file not found:", dbPath); + process.exit(1); +} + +// Open the database +const db = new Database(dbPath); + +// Check if the events table already exists +const tableExists = db.query("SELECT name FROM sqlite_master WHERE type='table' AND name='events'").get(); + +if (tableExists) { + console.log("Events table already exists, skipping migration"); + process.exit(0); +} + +// Create the events table +console.log("Creating events table..."); +db.exec(` +CREATE TABLE events ( + id TEXT PRIMARY KEY, 
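+  -- populated by publishEvent() in src/lib/events.ts; marked read once delivered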
+  user_id TEXT NOT NULL,
+  channel TEXT NOT NULL,
+  payload TEXT NOT NULL,
+  read INTEGER NOT NULL DEFAULT 0,
+  created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')),
+  FOREIGN KEY (user_id) REFERENCES users(id)
+);
+
+-- Create indexes for efficient querying
+CREATE INDEX idx_events_user_channel ON events(user_id, channel);
+CREATE INDEX idx_events_created_at ON events(created_at);
+CREATE INDEX idx_events_read ON events(read);
+`);
+
+console.log("Migration completed successfully");
diff --git a/src/lib/db/index.ts b/src/lib/db/index.ts
index d1167ff..041ab2e 100644
--- a/src/lib/db/index.ts
+++ b/src/lib/db/index.ts
@@ -66,6 +66,18 @@ export const users = sqliteTable("users", {
     .default(new Date()),
 });
 
+// New table for event notifications (replacing Redis pub/sub)
+export const events = sqliteTable("events", {
+  id: text("id").primaryKey(),
+  userId: text("user_id").notNull().references(() => users.id),
+  channel: text("channel").notNull(),
+  // Stored as a JSON string; (de)serialization happens in src/lib/events.ts
+  payload: text("payload").notNull(),
+  read: integer("read", { mode: "boolean" }).notNull().default(false),
+  createdAt: integer("created_at", { mode: "timestamp" })
+    .notNull()
+    .default(new Date()),
+});
+
 const githubSchema = configSchema.shape.githubConfig;
 const giteaSchema = configSchema.shape.giteaConfig;
 const scheduleSchema = configSchema.shape.scheduleConfig;
diff --git a/src/lib/db/schema.ts b/src/lib/db/schema.ts
index dec34aa..db4c40d 100644
--- a/src/lib/db/schema.ts
+++ b/src/lib/db/schema.ts
@@ -140,3 +140,15 @@ export const organizationSchema = z.object({
 });
 
 export type Organization = z.infer<typeof organizationSchema>;
+
+// Event schema (for SQLite-based pub/sub)
+export const eventSchema = z.object({
+  id: z.string().uuid().optional(),
+  userId: z.string().uuid(),
+  channel: z.string().min(1),
+  payload: z.any(),
+  read: z.boolean().default(false),
+  createdAt: z.date().default(() => new Date()),
+});
+
+export type Event = z.infer<typeof eventSchema>;
diff --git a/src/lib/events.ts b/src/lib/events.ts
new file mode 100644
index 0000000..901f055
--- /dev/null
+++ b/src/lib/events.ts
@@ -0,0 +1,130 @@
+import { v4 as uuidv4 } from "uuid";
+import { db, events } from "./db";
+import { eq, and, gt, lt, inArray } from "drizzle-orm";
+
+/**
+ * Publishes an event to a specific channel for a user
+ * This replaces Redis pub/sub with SQLite storage
+ */
+export async function publishEvent({
+  userId,
+  channel,
+  payload,
+}: {
+  userId: string;
+  channel: string;
+  payload: any;
+}): Promise<string> {
+  try {
+    const eventId = uuidv4();
+    console.log(`Publishing event to channel ${channel} for user ${userId}`);
+
+    // Insert the event into the SQLite database
+    await db.insert(events).values({
+      id: eventId,
+      userId,
+      channel,
+      payload: JSON.stringify(payload),
+      createdAt: new Date(),
+    });
+
+    console.log(`Event published successfully with ID ${eventId}`);
+    return eventId;
+  } catch (error) {
+    console.error("Error publishing event:", error);
+    throw new Error("Failed to publish event");
+  }
+}
+
+/**
+ * Gets new events for a specific user and channel
+ * This replaces Redis subscribe with SQLite polling
+ */
+export async function getNewEvents({
+  userId,
+  channel,
+  lastEventTime,
+}: {
+  userId: string;
+  channel: string;
+  lastEventTime?: Date;
+}): Promise<any[]> {
+  try {
+    console.log(`Getting new events for user ${userId} in channel ${channel}`);
+    if (lastEventTime) {
+      console.log(`Looking for events after ${lastEventTime.toISOString()}`);
+    }
+
+    // Build all filters up front: chaining a second .where() call would
+    // overwrite the first filter, so the optional time filter has to be
+    // part of the same AND clause
+    const conditions = [
+      eq(events.userId, userId),
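+      // the unread filter below is what gives at-most-once delivery to pollers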
+      eq(events.channel, channel),
+      eq(events.read, false),
+    ];
+
+    // Add time filter if provided
+    if (lastEventTime) {
+      conditions.push(gt(events.createdAt, lastEventTime));
+    }
+
+    // Execute the query
+    const newEvents = await db
+      .select()
+      .from(events)
+      .where(and(...conditions))
+      .orderBy(events.createdAt);
+    console.log(`Found ${newEvents.length} new events`);
+
+    // Mark events as read by id, so rows inserted between the select and the
+    // update are not marked before they have actually been delivered
+    if (newEvents.length > 0) {
+      console.log(`Marking ${newEvents.length} events as read`);
+      await db
+        .update(events)
+        .set({ read: true })
+        .where(inArray(events.id, newEvents.map((event) => event.id)));
+    }
+
+    // Parse the payloads
+    return newEvents.map(event => ({
+      ...event,
+      payload: JSON.parse(event.payload as string),
+    }));
+  } catch (error) {
+    console.error("Error getting new events:", error);
+    return [];
+  }
+}
+
+/**
+ * Cleans up old events to prevent the database from growing too large
+ * Should be called periodically (e.g., daily via a cron job)
+ */
+export async function cleanupOldEvents(maxAgeInDays: number = 7): Promise<number> {
+  try {
+    const cutoffDate = new Date();
+    cutoffDate.setDate(cutoffDate.getDate() - maxAgeInDays);
+
+    // Delete read events whose created_at falls before the cutoff date
+    const result = await db
+      .delete(events)
+      .where(
+        and(
+          eq(events.read, true),
+          lt(events.createdAt, cutoffDate)
+        )
+      );
+
+    return result.changes || 0;
+  } catch (error) {
+    console.error("Error cleaning up old events:", error);
+    return 0;
+  }
+}
diff --git a/src/lib/helpers.ts b/src/lib/helpers.ts
index 69469ff..779e6f8 100644
--- a/src/lib/helpers.ts
+++ b/src/lib/helpers.ts
@@ -1,7 +1,7 @@
 import type { RepoStatus } from "@/types/Repository";
 import { db, mirrorJobs } from "./db";
 import { v4 as uuidv4 } from "uuid";
-import { redisPublisher } from "./redis";
+import { publishEvent } from "./events";
 
 export async function createMirrorJob({
   userId,
@@ -40,10 +40,16 @@ export async function createMirrorJob({
   };
 
   try {
+    // Insert the job into the database
     await db.insert(mirrorJobs).values(job);
 
+    // Publish the event using SQLite instead of Redis
     const channel = `mirror-status:${userId}`;
-    await redisPublisher.publish(channel, JSON.stringify(job));
+    await publishEvent({
+      userId,
+      channel,
+      payload: job
+    });
 
     return jobId;
   } catch (error) {
diff --git a/src/lib/redis.ts b/src/lib/redis.ts
deleted file mode 100644
index 14fa4de..0000000
--- a/src/lib/redis.ts
+++ /dev/null
@@ -1,39 +0,0 @@
-import { RedisClient } from "bun";
-
-// Connect to Redis using REDIS_URL environment variable or default to redis://redis:6379
-// This ensures we have a fallback URL when running with Docker Compose
-const redisUrl = process.env.REDIS_URL ??
"redis://localhost:6379"; - -console.log(`Connecting to Redis at: ${redisUrl}`); - -// Configure Redis client with connection options and retry logic -function createClient() { - const client = new RedisClient(redisUrl, { - autoReconnect: true, - connectTimeout: 30000, // Increase timeout to 30 seconds - retryStrategy: (attempt: number) => { - // Exponential backoff with jitter - const delay = Math.min(Math.pow(2, attempt) * 100, 10000); - console.log(`Redis connection attempt ${attempt}, retrying in ${delay}ms`); - return delay; - }, - }); - - // Set up event handlers - client.onconnect = () => console.log("Redis client connected successfully"); - client.onclose = (err: Error | null) => { - if (err) { - console.error("Redis connection error:", err); - console.log("Redis will attempt to reconnect automatically"); - } else { - console.log("Redis connection closed"); - } - }; - - return client; -} - -// Create Redis clients with improved error handling -export const redis = createClient(); -export const redisPublisher = createClient(); -export const redisSubscriber = createClient(); diff --git a/src/pages/api/sse/index.ts b/src/pages/api/sse/index.ts index dcc0180..bd0e801 100644 --- a/src/pages/api/sse/index.ts +++ b/src/pages/api/sse/index.ts @@ -1,5 +1,5 @@ import type { APIRoute } from "astro"; -import { redisSubscriber } from "@/lib/redis"; +import { getNewEvents } from "@/lib/events"; export const GET: APIRoute = async ({ request }) => { const url = new URL(request.url); @@ -11,13 +11,13 @@ export const GET: APIRoute = async ({ request }) => { const channel = `mirror-status:${userId}`; let isClosed = false; - let connectionAttempts = 0; - const MAX_ATTEMPTS = 5; - const RETRY_DELAY = 1000; // 1 second + const POLL_INTERVAL = 2000; // Poll every 2 seconds const stream = new ReadableStream({ start(controller) { const encoder = new TextEncoder(); + let lastEventTime: Date | undefined = undefined; + let pollIntervalId: ReturnType | null = null; // Function to send a message to the client const sendMessage = (message: string) => { @@ -29,98 +29,63 @@ export const GET: APIRoute = async ({ request }) => { } }; - // Function to handle Redis connection and subscription - const connectToRedis = () => { + // Function to poll for new events + const pollForEvents = async () => { if (isClosed) return; try { - // Set up message handler for Bun's Redis client - redisSubscriber.onmessage = (message, channelName) => { - if (isClosed || channelName !== channel) return; - sendMessage(`data: ${message}\n\n`); - }; + console.log(`Polling for events for user ${userId} in channel ${channel}`); - // Send initial connection message - sendMessage(": connecting to Redis...\n\n"); + // Get new events from SQLite + const events = await getNewEvents({ + userId, + channel, + lastEventTime, + }); - // Use a try-catch block specifically for the subscribe operation - let subscribed = false; - try { - // Bun's Redis client expects a string for the channel - // We need to wrap this in a try-catch because it can throw if Redis is down - subscribed = redisSubscriber.subscribe(channel); + console.log(`Found ${events.length} new events`); - if (subscribed) { - // If we get here, subscription was successful - sendMessage(": connected\n\n"); + // Send events to client + if (events.length > 0) { + // Update last event time + lastEventTime = events[events.length - 1].createdAt; - // Reset connection attempts on successful connection - connectionAttempts = 0; - - // Send a heartbeat every 30 seconds to keep the connection alive - 
-              const heartbeatInterval = setInterval(() => {
-                if (!isClosed) {
-                  sendMessage(": heartbeat\n\n");
-                } else {
-                  clearInterval(heartbeatInterval);
-                }
-              }, 30000);
-            } else {
-              throw new Error("Failed to subscribe to Redis channel");
-            }
-
-          } catch (subscribeErr) {
-            // Handle subscription error
-            console.error("Redis subscribe error:", subscribeErr);
-
-            // Retry connection if we haven't exceeded max attempts
-            if (connectionAttempts < MAX_ATTEMPTS) {
-              connectionAttempts++;
-              const nextRetryDelay = RETRY_DELAY * Math.pow(2, connectionAttempts - 1);
-              console.log(`Retrying Redis connection (attempt ${connectionAttempts}/${MAX_ATTEMPTS}) in ${nextRetryDelay}ms...`);
-
-              // Send retry message to client
-              sendMessage(`: retrying connection (${connectionAttempts}/${MAX_ATTEMPTS}) in ${nextRetryDelay}ms...\n\n`);
-
-              // Wait before retrying
-              setTimeout(connectToRedis, nextRetryDelay);
-            } else {
-              // Max retries exceeded, send error but keep the connection open
-              console.error("Max Redis connection attempts exceeded");
-              sendMessage(`data: {"error": "Redis connection failed after ${MAX_ATTEMPTS} attempts"}\n\n`);
-
-              // Set up a longer retry after max attempts
-              setTimeout(() => {
-                connectionAttempts = 0; // Reset counter for a fresh start
-                sendMessage(": attempting to reconnect after cooling period...\n\n");
-                connectToRedis();
-              }, 30000); // Try again after 30 seconds
+            // Send each event to the client
+            for (const event of events) {
+              console.log(`Sending event: ${JSON.stringify(event.payload)}`);
+              sendMessage(`data: ${JSON.stringify(event.payload)}\n\n`);
             }
           }
         } catch (err) {
-          // This catches any other errors outside the subscribe operation
-          console.error("Redis connection error:", err);
-          sendMessage(`data: {"error": "Redis connection error"}\n\n`);
-
-          // Still attempt to retry
-          if (connectionAttempts < MAX_ATTEMPTS) {
-            connectionAttempts++;
-            setTimeout(connectToRedis, RETRY_DELAY * Math.pow(2, connectionAttempts - 1));
-          }
+          console.error("Error polling for events:", err);
+          sendMessage(`data: {"error": "Error polling for events"}\n\n`);
         }
       };
 
-      // Start the initial connection
-      connectToRedis();
+      // Send initial connection message
+      sendMessage(": connected\n\n");
+
+      // Start polling for events
+      pollForEvents();
+
+      // Set up polling interval
+      pollIntervalId = setInterval(pollForEvents, POLL_INTERVAL);
+
+      // Send a heartbeat every 30 seconds to keep the connection alive
+      const heartbeatInterval = setInterval(() => {
+        if (!isClosed) {
+          sendMessage(": heartbeat\n\n");
+        } else {
+          clearInterval(heartbeatInterval);
+        }
+      }, 30000);
 
       // Handle client disconnection
       request.signal?.addEventListener("abort", () => {
         if (!isClosed) {
           isClosed = true;
-          try {
-            redisSubscriber.unsubscribe(channel);
-          } catch (err) {
-            console.error("Error unsubscribing from Redis:", err);
+          if (pollIntervalId) {
+            clearInterval(pollIntervalId);
           }
+          // Stop the heartbeat as well so no timer outlives the stream
+          clearInterval(heartbeatInterval);
           controller.close();
         }
@@ -128,14 +93,7 @@
     },
     cancel() {
       // Extra safety in case cancel is triggered
-      if (!isClosed) {
-        isClosed = true;
-        try {
-          redisSubscriber.unsubscribe(channel);
-        } catch (err) {
-          console.error("Error unsubscribing from Redis:", err);
-        }
-      }
+      // Setting isClosed turns the poll and heartbeat callbacks into no-ops;
+      // the abort handler above clears the interval timers themselves
+      isClosed = true;
     },
   });
 
diff --git a/src/pages/api/test-event.ts b/src/pages/api/test-event.ts
new file mode 100644
index 0000000..90f2c7d
--- /dev/null
+++ b/src/pages/api/test-event.ts
@@ -0,0 +1,56 @@
+import type { APIRoute } from "astro";
+import { publishEvent } from "@/lib/events";
+import { v4 as uuidv4 } from "uuid";
+
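+// Test endpoint: publishes a synthetic mirror-status event so the
+// SQLite event -> SSE pipeline can be exercised without running a real mirror job.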
+export const POST: APIRoute = async ({ request }) => {
+  try {
+    const body = await request.json();
+    const { userId, message, status } = body;
+
+    if (!userId || !message || !status) {
+      return new Response(
+        JSON.stringify({
+          error: "Missing required fields: userId, message, status",
+        }),
+        { status: 400, headers: { "Content-Type": "application/json" } }
+      );
+    }
+
+    // Create a test event
+    const eventData = {
+      id: uuidv4(),
+      userId,
+      repositoryId: uuidv4(),
+      repositoryName: "test-repo",
+      message,
+      status,
+      timestamp: new Date(),
+    };
+
+    // Publish the event
+    const channel = `mirror-status:${userId}`;
+    await publishEvent({
+      userId,
+      channel,
+      payload: eventData,
+    });
+
+    return new Response(
+      JSON.stringify({
+        success: true,
+        message: "Event published successfully",
+        event: eventData,
+      }),
+      { status: 200, headers: { "Content-Type": "application/json" } }
+    );
+  } catch (error) {
+    console.error("Error publishing test event:", error);
+    return new Response(
+      JSON.stringify({
+        error: "Failed to publish event",
+        details: error instanceof Error ? error.message : String(error),
+      }),
+      { status: 500, headers: { "Content-Type": "application/json" } }
+    );
+  }
+};
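+
+// Example (hypothetical placeholder values): publish a test event from a
+// shell and watch it arrive on the SSE stream:
+//
+//   curl -X POST http://localhost:4321/api/test-event \
+//     -H "Content-Type: application/json" \
+//     -d '{"userId":"<user-uuid>","message":"Test event","status":"<status>"}'
+//
+// A client connected to /api/sse?userId=<user-uuid> should receive the payload
+// on the next poll, i.e. within POLL_INTERVAL (about 2 seconds).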