diff --git a/scripts/README.md b/scripts/README.md index 7aa3a7d..81a8df9 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -60,40 +60,36 @@ The database file should be located in the `./data/gitea-mirror.db` directory. I The following scripts help manage events in the SQLite database: -### Event Inspection (check-events.ts) +> **Note**: For a more user-friendly approach, you can use the cleanup button in the Activity Log page of the web interface to delete all activities with a single click. -Displays all events currently stored in the database. -```bash -bun scripts/check-events.ts -``` ### Event Cleanup (cleanup-events.ts) -Removes old events from the database to prevent it from growing too large. +Removes old events and duplicate events from the database to prevent it from growing too large. ```bash -# Remove events older than 7 days (default) +# Remove events older than 7 days (default) and duplicates bun scripts/cleanup-events.ts -# Remove events older than X days +# Remove events older than X days and duplicates bun scripts/cleanup-events.ts 14 ``` This script can be scheduled to run periodically (e.g., daily) using cron or another scheduler. When using Docker, this is automatically scheduled to run daily. -### Mark Events as Read (mark-events-read.ts) +### Remove Duplicate Events (remove-duplicate-events.ts) -Marks all unread events as read. +Specifically removes duplicate events based on deduplication keys without affecting old events. ```bash -bun scripts/mark-events-read.ts +# Remove duplicate events for all users +bun scripts/remove-duplicate-events.ts + +# Remove duplicate events for a specific user +bun scripts/remove-duplicate-events.ts ``` -### Make Events Appear Older (make-events-old.ts) - -For testing purposes, this script modifies event timestamps to make them appear older. - ### Mirror Jobs Cleanup (cleanup-mirror-jobs.ts) Removes old mirror jobs from the database to prevent it from growing too large. @@ -108,10 +104,20 @@ bun scripts/cleanup-mirror-jobs.ts 14 This script can be scheduled to run periodically (e.g., daily) using cron or another scheduler. When using Docker, this is automatically scheduled to run daily. +### Fix Interrupted Jobs (fix-interrupted-jobs.ts) + +Fixes interrupted jobs that might be preventing cleanup by marking them as failed. + ```bash -bun scripts/make-events-old.ts +# Fix all interrupted jobs +bun scripts/fix-interrupted-jobs.ts + +# Fix interrupted jobs for a specific user +bun scripts/fix-interrupted-jobs.ts ``` +Use this script if you're having trouble cleaning up activities due to "interrupted" jobs that won't delete. + ## Deployment Scripts ### Docker Deployment diff --git a/scripts/check-events.ts b/scripts/check-events.ts deleted file mode 100644 index 7f53976..0000000 --- a/scripts/check-events.ts +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env bun -/** - * Script to check events in the database - */ - -import { Database } from "bun:sqlite"; -import path from "path"; -import fs from "fs"; - -// Define the database path -const dataDir = path.join(process.cwd(), "data"); -if (!fs.existsSync(dataDir)) { - console.error("Data directory not found:", dataDir); - process.exit(1); -} - -const dbPath = path.join(dataDir, "gitea-mirror.db"); -if (!fs.existsSync(dbPath)) { - console.error("Database file not found:", dbPath); - process.exit(1); -} - -// Open the database -const db = new Database(dbPath); - -// Check if the events table exists -const tableExists = db.query("SELECT name FROM sqlite_master WHERE type='table' AND name='events'").get(); - -if (!tableExists) { - console.error("Events table does not exist"); - process.exit(1); -} - -// Get all events -const events = db.query("SELECT * FROM events").all(); - -console.log("Events in the database:"); -console.log(JSON.stringify(events, null, 2)); diff --git a/scripts/cleanup-events.ts b/scripts/cleanup-events.ts index 7fafa4a..331bb9a 100644 --- a/scripts/cleanup-events.ts +++ b/scripts/cleanup-events.ts @@ -9,7 +9,7 @@ * Where [days] is the number of days to keep events (default: 7) */ -import { cleanupOldEvents } from "../src/lib/events"; +import { cleanupOldEvents, removeDuplicateEvents } from "../src/lib/events"; // Parse command line arguments const args = process.argv.slice(2); @@ -24,13 +24,20 @@ async function runCleanup() { try { console.log(`Starting event cleanup (retention: ${daysToKeep} days)...`); - // Call the cleanupOldEvents function from the events module + // First, remove duplicate events + console.log("Step 1: Removing duplicate events..."); + const duplicateResult = await removeDuplicateEvents(); + console.log(`- Duplicate events removed: ${duplicateResult.duplicatesRemoved}`); + + // Then, clean up old events + console.log("Step 2: Cleaning up old events..."); const result = await cleanupOldEvents(daysToKeep); console.log(`Cleanup summary:`); + console.log(`- Duplicate events removed: ${duplicateResult.duplicatesRemoved}`); console.log(`- Read events deleted: ${result.readEventsDeleted}`); console.log(`- Unread events deleted: ${result.unreadEventsDeleted}`); - console.log(`- Total events deleted: ${result.readEventsDeleted + result.unreadEventsDeleted}`); + console.log(`- Total events deleted: ${result.readEventsDeleted + result.unreadEventsDeleted + duplicateResult.duplicatesRemoved}`); console.log("Event cleanup completed successfully"); } catch (error) { diff --git a/scripts/fix-interrupted-jobs.ts b/scripts/fix-interrupted-jobs.ts new file mode 100644 index 0000000..7ab358a --- /dev/null +++ b/scripts/fix-interrupted-jobs.ts @@ -0,0 +1,74 @@ +#!/usr/bin/env bun +/** + * Script to fix interrupted jobs that might be preventing cleanup + * This script marks all in-progress jobs as failed to allow them to be deleted + * + * Usage: + * bun scripts/fix-interrupted-jobs.ts [userId] + * + * Where [userId] is optional - if provided, only fixes jobs for that user + */ + +import { db, mirrorJobs } from "../src/lib/db"; +import { eq } from "drizzle-orm"; + +// Parse command line arguments +const args = process.argv.slice(2); +const userId = args.length > 0 ? args[0] : undefined; + +async function fixInterruptedJobs() { + try { + console.log("Checking for interrupted jobs..."); + + // Build the query + let query = db + .select() + .from(mirrorJobs) + .where(eq(mirrorJobs.inProgress, true)); + + if (userId) { + console.log(`Filtering for user: ${userId}`); + query = query.where(eq(mirrorJobs.userId, userId)); + } + + // Find all in-progress jobs + const inProgressJobs = await query; + + if (inProgressJobs.length === 0) { + console.log("No interrupted jobs found."); + return; + } + + console.log(`Found ${inProgressJobs.length} interrupted jobs:`); + inProgressJobs.forEach(job => { + console.log(`- Job ${job.id}: ${job.message} (${job.repositoryName || job.organizationName || 'Unknown'})`); + }); + + // Mark all in-progress jobs as failed + let updateQuery = db + .update(mirrorJobs) + .set({ + inProgress: false, + completedAt: new Date(), + status: "failed", + message: "Job interrupted and marked as failed by cleanup script" + }) + .where(eq(mirrorJobs.inProgress, true)); + + if (userId) { + updateQuery = updateQuery.where(eq(mirrorJobs.userId, userId)); + } + + await updateQuery; + + console.log(`✅ Successfully marked ${inProgressJobs.length} interrupted jobs as failed.`); + console.log("These jobs can now be deleted through the normal cleanup process."); + + } catch (error) { + console.error("Error fixing interrupted jobs:", error); + process.exit(1); + } +} + +// Run the fix +fixInterruptedJobs(); diff --git a/scripts/make-events-old.ts b/scripts/make-events-old.ts deleted file mode 100644 index 1b3ff9a..0000000 --- a/scripts/make-events-old.ts +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env bun -/** - * Script to make events appear older for testing cleanup - */ - -import { db, events } from "../src/lib/db"; - -async function makeEventsOld() { - try { - console.log("Making events appear older..."); - - // Calculate a timestamp from 2 days ago - const oldDate = new Date(); - oldDate.setDate(oldDate.getDate() - 2); - - // Update all events to have an older timestamp - const result = await db - .update(events) - .set({ createdAt: oldDate }); - - console.log(`Updated ${result.changes || 0} events to appear older`); - } catch (error) { - console.error("Error updating event timestamps:", error); - process.exit(1); - } -} - -// Run the function -makeEventsOld(); diff --git a/scripts/mark-events-read.ts b/scripts/mark-events-read.ts deleted file mode 100644 index aff5454..0000000 --- a/scripts/mark-events-read.ts +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env bun -/** - * Script to mark all events as read - */ - -import { db, events } from "../src/lib/db"; -import { eq } from "drizzle-orm"; - -async function markEventsAsRead() { - try { - console.log("Marking all events as read..."); - - // Update all events to mark them as read - const result = await db - .update(events) - .set({ read: true }) - .where(eq(events.read, false)); - - console.log(`Marked ${result.changes || 0} events as read`); - } catch (error) { - console.error("Error marking events as read:", error); - process.exit(1); - } -} - -// Run the function -markEventsAsRead(); diff --git a/scripts/remove-duplicate-events.ts b/scripts/remove-duplicate-events.ts new file mode 100644 index 0000000..d0333d0 --- /dev/null +++ b/scripts/remove-duplicate-events.ts @@ -0,0 +1,44 @@ +#!/usr/bin/env bun +/** + * Script to remove duplicate events from the database + * This script identifies and removes events with duplicate deduplication keys + * + * Usage: + * bun scripts/remove-duplicate-events.ts [userId] + * + * Where [userId] is optional - if provided, only removes duplicates for that user + */ + +import { removeDuplicateEvents } from "../src/lib/events"; + +// Parse command line arguments +const args = process.argv.slice(2); +const userId = args.length > 0 ? args[0] : undefined; + +async function runDuplicateRemoval() { + try { + if (userId) { + console.log(`Starting duplicate event removal for user: ${userId}...`); + } else { + console.log("Starting duplicate event removal for all users..."); + } + + // Call the removeDuplicateEvents function + const result = await removeDuplicateEvents(userId); + + console.log(`Duplicate removal summary:`); + console.log(`- Duplicate events removed: ${result.duplicatesRemoved}`); + + if (result.duplicatesRemoved > 0) { + console.log("Duplicate event removal completed successfully"); + } else { + console.log("No duplicate events found"); + } + } catch (error) { + console.error("Error running duplicate event removal:", error); + process.exit(1); + } +} + +// Run the duplicate removal +runDuplicateRemoval(); diff --git a/scripts/update-mirror-jobs-table.ts b/scripts/update-mirror-jobs-table.ts deleted file mode 100644 index 6563382..0000000 --- a/scripts/update-mirror-jobs-table.ts +++ /dev/null @@ -1,133 +0,0 @@ -#!/usr/bin/env bun -/** - * Script to update the mirror_jobs table with new columns for resilience - */ - -import { Database } from "bun:sqlite"; -import fs from "fs"; -import path from "path"; - -// Define the database paths -const dataDir = path.join(process.cwd(), "data"); -const dbPath = path.join(dataDir, "gitea-mirror.db"); - -// Ensure data directory exists -if (!fs.existsSync(dataDir)) { - fs.mkdirSync(dataDir, { recursive: true }); - console.log(`Created data directory at ${dataDir}`); -} - -// Check if database exists -if (!fs.existsSync(dbPath)) { - console.error(`Database file not found at ${dbPath}`); - console.error("Please run 'bun run init-db' first to create the database."); - process.exit(1); -} - -// Connect to the database -const db = new Database(dbPath); - -// Enable foreign keys -db.exec("PRAGMA foreign_keys = ON;"); - -// Function to check if a column exists in a table -function columnExists(tableName: string, columnName: string): boolean { - const result = db.query( - `PRAGMA table_info(${tableName})` - ).all() as { name: string }[]; - - return result.some(column => column.name === columnName); -} - -// Main function to update the mirror_jobs table -async function updateMirrorJobsTable() { - console.log("Checking mirror_jobs table for missing columns..."); - - // Start a transaction - db.exec("BEGIN TRANSACTION;"); - - try { - // Check and add each new column if it doesn't exist - const columnsToAdd = [ - { name: "job_type", definition: "TEXT NOT NULL DEFAULT 'mirror'" }, - { name: "batch_id", definition: "TEXT" }, - { name: "total_items", definition: "INTEGER" }, - { name: "completed_items", definition: "INTEGER DEFAULT 0" }, - { name: "item_ids", definition: "TEXT" }, // JSON array as text - { name: "completed_item_ids", definition: "TEXT DEFAULT '[]'" }, // JSON array as text - { name: "in_progress", definition: "INTEGER NOT NULL DEFAULT 0" }, // Boolean as integer - { name: "started_at", definition: "TIMESTAMP" }, - { name: "completed_at", definition: "TIMESTAMP" }, - { name: "last_checkpoint", definition: "TIMESTAMP" } - ]; - - let columnsAdded = 0; - - for (const column of columnsToAdd) { - if (!columnExists("mirror_jobs", column.name)) { - console.log(`Adding column '${column.name}' to mirror_jobs table...`); - db.exec(`ALTER TABLE mirror_jobs ADD COLUMN ${column.name} ${column.definition};`); - columnsAdded++; - } - } - - // Commit the transaction - db.exec("COMMIT;"); - - if (columnsAdded > 0) { - console.log(`✅ Added ${columnsAdded} new columns to mirror_jobs table.`); - } else { - console.log("✅ All required columns already exist in mirror_jobs table."); - } - - // Create indexes for better performance - console.log("Creating indexes for mirror_jobs table..."); - - // Only create indexes if they don't exist - const indexesResult = db.query( - `SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='mirror_jobs'` - ).all() as { name: string }[]; - - const existingIndexes = indexesResult.map(idx => idx.name); - - const indexesToCreate = [ - { name: "idx_mirror_jobs_user_id", columns: "user_id" }, - { name: "idx_mirror_jobs_batch_id", columns: "batch_id" }, - { name: "idx_mirror_jobs_in_progress", columns: "in_progress" }, - { name: "idx_mirror_jobs_job_type", columns: "job_type" }, - { name: "idx_mirror_jobs_timestamp", columns: "timestamp" } - ]; - - let indexesCreated = 0; - - for (const index of indexesToCreate) { - if (!existingIndexes.includes(index.name)) { - console.log(`Creating index '${index.name}'...`); - db.exec(`CREATE INDEX ${index.name} ON mirror_jobs(${index.columns});`); - indexesCreated++; - } - } - - if (indexesCreated > 0) { - console.log(`✅ Created ${indexesCreated} new indexes for mirror_jobs table.`); - } else { - console.log("✅ All required indexes already exist for mirror_jobs table."); - } - - console.log("Mirror jobs table update completed successfully."); - } catch (error) { - // Rollback the transaction in case of error - db.exec("ROLLBACK;"); - console.error("❌ Error updating mirror_jobs table:", error); - process.exit(1); - } finally { - // Close the database connection - db.close(); - } -} - -// Run the update function -updateMirrorJobsTable().catch(error => { - console.error("Unhandled error:", error); - process.exit(1); -}); diff --git a/src/components/activity/ActivityLog.tsx b/src/components/activity/ActivityLog.tsx index 09be2fc..fe7abef 100644 --- a/src/components/activity/ActivityLog.tsx +++ b/src/components/activity/ActivityLog.tsx @@ -1,12 +1,21 @@ -import { useCallback, useEffect, useState } from 'react'; +import { useCallback, useEffect, useState, useRef } from 'react'; import { Button } from '@/components/ui/button'; -import { ChevronDown, Download, RefreshCw, Search } from 'lucide-react'; +import { ChevronDown, Download, RefreshCw, Search, Trash2 } from 'lucide-react'; import { DropdownMenu, DropdownMenuContent, DropdownMenuItem, DropdownMenuTrigger, } from '../ui/dropdown-menu'; +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, + DialogTrigger, +} from '../ui/dialog'; import { apiRequest, formatDate } from '@/lib/utils'; import { useAuth } from '@/hooks/useAuth'; import type { MirrorJob } from '@/lib/db/schema'; @@ -30,12 +39,30 @@ import { useNavigation } from '@/components/layout/MainLayout'; type MirrorJobWithKey = MirrorJob & { _rowKey: string }; -function genKey(job: MirrorJob): string { - return `${ - job.id ?? (typeof crypto !== 'undefined' - ? crypto.randomUUID() - : Math.random().toString(36).slice(2)) - }-${job.timestamp}`; +// Maximum number of activities to keep in memory to prevent performance issues +const MAX_ACTIVITIES = 1000; + +// More robust key generation to prevent collisions +function genKey(job: MirrorJob, index?: number): string { + const baseId = job.id || `temp-${Date.now()}-${Math.random().toString(36).slice(2)}`; + const timestamp = job.timestamp instanceof Date ? job.timestamp.getTime() : new Date(job.timestamp).getTime(); + const indexSuffix = index !== undefined ? `-${index}` : ''; + return `${baseId}-${timestamp}${indexSuffix}`; +} + +// Create a deep clone without structuredClone for better browser compatibility +function deepClone(obj: T): T { + if (obj === null || typeof obj !== 'object') return obj; + if (obj instanceof Date) return new Date(obj.getTime()) as T; + if (Array.isArray(obj)) return obj.map(item => deepClone(item)) as T; + + const cloned = {} as T; + for (const key in obj) { + if (Object.prototype.hasOwnProperty.call(obj, key)) { + cloned[key] = deepClone(obj[key]); + } + } + return cloned; } export function ActivityLog() { @@ -46,6 +73,16 @@ export function ActivityLog() { const [activities, setActivities] = useState([]); const [isLoading, setIsLoading] = useState(false); + const [showCleanupDialog, setShowCleanupDialog] = useState(false); + + // Ref to track if component is mounted to prevent state updates after unmount + const isMountedRef = useRef(true); + + useEffect(() => { + return () => { + isMountedRef.current = false; + }; + }, []); const { filter, setFilter } = useFilterParams({ searchTerm: '', @@ -57,12 +94,41 @@ export function ActivityLog() { /* ----------------------------- SSE hook ----------------------------- */ const handleNewMessage = useCallback((data: MirrorJob) => { - const withKey: MirrorJobWithKey = { - ...structuredClone(data), - _rowKey: genKey(data), - }; + if (!isMountedRef.current) return; - setActivities((prev) => [withKey, ...prev]); + setActivities((prev) => { + // Create a deep clone of the new activity + const clonedData = deepClone(data); + + // Check if this activity already exists to prevent duplicates + const existingIndex = prev.findIndex(activity => + activity.id === clonedData.id || + (activity.repositoryId === clonedData.repositoryId && + activity.organizationId === clonedData.organizationId && + activity.message === clonedData.message && + Math.abs(new Date(activity.timestamp).getTime() - new Date(clonedData.timestamp).getTime()) < 1000) + ); + + if (existingIndex !== -1) { + // Update existing activity instead of adding duplicate + const updated = [...prev]; + updated[existingIndex] = { + ...clonedData, + _rowKey: prev[existingIndex]._rowKey, // Keep the same key + }; + return updated; + } + + // Add new activity with unique key + const withKey: MirrorJobWithKey = { + ...clonedData, + _rowKey: genKey(clonedData, prev.length), + }; + + // Limit the number of activities to prevent memory issues + const newActivities = [withKey, ...prev]; + return newActivities.slice(0, MAX_ACTIVITIES); + }); }, []); const { connected } = useSSE({ @@ -88,20 +154,37 @@ export function ActivityLog() { return false; } - const data: MirrorJobWithKey[] = res.activities.map((a) => ({ - ...structuredClone(a), - _rowKey: genKey(a), - })); + // Process activities with robust cloning and unique keys + const data: MirrorJobWithKey[] = res.activities.map((activity, index) => { + const clonedActivity = deepClone(activity); + return { + ...clonedActivity, + _rowKey: genKey(clonedActivity, index), + }; + }); - setActivities(data); + // Sort by timestamp (newest first) to ensure consistent ordering + data.sort((a, b) => { + const timeA = new Date(a.timestamp).getTime(); + const timeB = new Date(b.timestamp).getTime(); + return timeB - timeA; + }); + + if (isMountedRef.current) { + setActivities(data); + } return true; } catch (err) { - toast.error( - err instanceof Error ? err.message : 'Failed to fetch activities.', - ); + if (isMountedRef.current) { + toast.error( + err instanceof Error ? err.message : 'Failed to fetch activities.', + ); + } return false; } finally { - setIsLoading(false); + if (isMountedRef.current) { + setIsLoading(false); + } } }, [user?.id]); // Only depend on user.id, not entire user object @@ -210,6 +293,50 @@ export function ActivityLog() { link.click(); }; + const handleCleanupClick = () => { + setShowCleanupDialog(true); + }; + + const confirmCleanup = async () => { + if (!user?.id) return; + + try { + setIsLoading(true); + setShowCleanupDialog(false); + + // Use fetch directly to avoid potential axios issues + const response = await fetch('/api/activities/cleanup', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ userId: user.id }), + }); + + if (!response.ok) { + const errorData = await response.json().catch(() => ({ error: 'Unknown error occurred' })); + throw new Error(errorData.error || `HTTP ${response.status}: ${response.statusText}`); + } + + const res = await response.json(); + + if (res.success) { + // Clear the activities from the UI + setActivities([]); + toast.success(`All activities cleaned up successfully. Deleted ${res.result.mirrorJobsDeleted} mirror jobs and ${res.result.eventsDeleted} events.`); + } else { + toast.error(res.error || 'Failed to cleanup activities.'); + } + } catch (error) { + console.error('Error cleaning up activities:', error); + toast.error(error instanceof Error ? error.message : 'Failed to cleanup activities.'); + } finally { + setIsLoading(false); + } + }; + + const cancelCleanup = () => { + setShowCleanupDialog(false); + }; + /* ------------------------------ UI ------------------------------ */ return ( @@ -308,6 +435,17 @@ export function ActivityLog() { > + + {/* cleanup all activities */} + {/* activity list */} @@ -317,6 +455,30 @@ export function ActivityLog() { filter={filter} setFilter={setFilter} /> + + {/* cleanup confirmation dialog */} + + + + Delete All Activities + + Are you sure you want to delete ALL activities? This action cannot be undone and will remove all mirror jobs and events from the database. + + + + + + + + ); } diff --git a/src/lib/events.ts b/src/lib/events.ts index 91591d3..ce660c0 100644 --- a/src/lib/events.ts +++ b/src/lib/events.ts @@ -1,6 +1,6 @@ import { v4 as uuidv4 } from "uuid"; import { db, events } from "./db"; -import { eq, and, gt, lt } from "drizzle-orm"; +import { eq, and, gt, lt, inArray } from "drizzle-orm"; /** * Publishes an event to a specific channel for a user @@ -10,21 +10,58 @@ export async function publishEvent({ userId, channel, payload, + deduplicationKey, }: { userId: string; channel: string; payload: any; + deduplicationKey?: string; // Optional key to prevent duplicate events }): Promise { try { const eventId = uuidv4(); console.log(`Publishing event to channel ${channel} for user ${userId}`); + // Check for duplicate events if deduplication key is provided + if (deduplicationKey) { + const existingEvent = await db + .select() + .from(events) + .where( + and( + eq(events.userId, userId), + eq(events.channel, channel), + eq(events.read, false) + ) + ) + .limit(10); // Check recent unread events + + // Check if any existing event has the same deduplication key in payload + const isDuplicate = existingEvent.some(event => { + try { + const eventPayload = JSON.parse(event.payload as string); + return eventPayload.deduplicationKey === deduplicationKey; + } catch { + return false; + } + }); + + if (isDuplicate) { + console.log(`Skipping duplicate event with key: ${deduplicationKey}`); + return eventId; // Return a valid ID but don't create the event + } + } + + // Add deduplication key to payload if provided + const eventPayload = deduplicationKey + ? { ...payload, deduplicationKey } + : payload; + // Insert the event into the SQLite database await db.insert(events).values({ id: eventId, userId, channel, - payload: JSON.stringify(payload), + payload: JSON.stringify(eventPayload), createdAt: new Date(), }); @@ -103,6 +140,78 @@ export async function getNewEvents({ } } +/** + * Removes duplicate events based on deduplication keys + * This can be called periodically to clean up any duplicates that may have slipped through + */ +export async function removeDuplicateEvents(userId?: string): Promise<{ duplicatesRemoved: number }> { + try { + console.log("Removing duplicate events..."); + + // Build the base query + let query = db.select().from(events); + if (userId) { + query = query.where(eq(events.userId, userId)); + } + + const allEvents = await query; + const duplicateIds: string[] = []; + const seenKeys = new Set(); + + // Group events by user and channel, then check for duplicates + const eventsByUserChannel = new Map(); + + for (const event of allEvents) { + const key = `${event.userId}-${event.channel}`; + if (!eventsByUserChannel.has(key)) { + eventsByUserChannel.set(key, []); + } + eventsByUserChannel.get(key)!.push(event); + } + + // Check each group for duplicates + for (const [, events] of eventsByUserChannel) { + const channelSeenKeys = new Set(); + + // Sort by creation time (keep the earliest) + events.sort((a, b) => new Date(a.createdAt).getTime() - new Date(b.createdAt).getTime()); + + for (const event of events) { + try { + const payload = JSON.parse(event.payload as string); + if (payload.deduplicationKey) { + if (channelSeenKeys.has(payload.deduplicationKey)) { + duplicateIds.push(event.id); + } else { + channelSeenKeys.add(payload.deduplicationKey); + } + } + } catch { + // Skip events with invalid JSON + } + } + } + + // Remove duplicates + if (duplicateIds.length > 0) { + console.log(`Removing ${duplicateIds.length} duplicate events`); + + // Delete in batches to avoid query size limits + const batchSize = 100; + for (let i = 0; i < duplicateIds.length; i += batchSize) { + const batch = duplicateIds.slice(i, i + batchSize); + await db.delete(events).where(inArray(events.id, batch)); + } + } + + console.log(`Removed ${duplicateIds.length} duplicate events`); + return { duplicatesRemoved: duplicateIds.length }; + } catch (error) { + console.error("Error removing duplicate events:", error); + return { duplicatesRemoved: 0 }; + } +} + /** * Cleans up old events to prevent the database from growing too large * Should be called periodically (e.g., daily via a cron job) diff --git a/src/lib/gitea.ts b/src/lib/gitea.ts index 1b7f540..bce2be6 100644 --- a/src/lib/gitea.ts +++ b/src/lib/gitea.ts @@ -295,15 +295,8 @@ export async function getOrCreateGiteaOrg({ if (orgRes.ok) { const org = await orgRes.json(); - - await createMirrorJob({ - userId: config.userId, - organizationId: org.id, - organizationName: orgName, - status: "imported", - message: `Organization ${orgName} fetched successfully`, - details: `Organization ${orgName} was fetched from GitHub`, - }); + // Note: Organization events are handled by the main mirroring process + // to avoid duplicate events return org.id; } @@ -325,13 +318,8 @@ export async function getOrCreateGiteaOrg({ throw new Error(`Failed to create Gitea org: ${await createRes.text()}`); } - await createMirrorJob({ - userId: config.userId, - organizationName: orgName, - status: "imported", - message: `Organization ${orgName} created successfully`, - details: `Organization ${orgName} was created in Gitea`, - }); + // Note: Organization creation events are handled by the main mirroring process + // to avoid duplicate events const newOrg = await createRes.json(); return newOrg.id; @@ -417,15 +405,8 @@ export async function mirrorGitHubRepoToGiteaOrg({ }) .where(eq(repositories.id, repository.id!)); - // Append log for "mirroring" status - await createMirrorJob({ - userId: config.userId, - repositoryId: repository.id, - repositoryName: repository.name, - message: `Started mirroring repository: ${repository.name}`, - details: `Repository ${repository.name} is now in the mirroring state.`, - status: "mirroring", - }); + // Note: "mirroring" status events are handled by the concurrency system + // to avoid duplicate events during batch operations const apiUrl = `${config.giteaConfig.url}/api/v1/repos/migrate`; diff --git a/src/lib/helpers.ts b/src/lib/helpers.ts index 6c008b1..c163b32 100644 --- a/src/lib/helpers.ts +++ b/src/lib/helpers.ts @@ -17,6 +17,7 @@ export async function createMirrorJob({ totalItems, itemIds, inProgress, + skipDuplicateEvent, }: { userId: string; organizationId?: string; @@ -31,6 +32,7 @@ export async function createMirrorJob({ totalItems?: number; itemIds?: string[]; inProgress?: boolean; + skipDuplicateEvent?: boolean; // Option to skip event publishing for internal operations }) { const jobId = uuidv4(); const currentTimestamp = new Date(); @@ -64,13 +66,27 @@ export async function createMirrorJob({ // Insert the job into the database await db.insert(mirrorJobs).values(job); - // Publish the event using SQLite instead of Redis - const channel = `mirror-status:${userId}`; - await publishEvent({ - userId, - channel, - payload: job - }); + // Publish the event using SQLite instead of Redis (unless skipped) + if (!skipDuplicateEvent) { + const channel = `mirror-status:${userId}`; + + // Create deduplication key based on the operation + let deduplicationKey: string | undefined; + if (repositoryId && status) { + deduplicationKey = `repo-${repositoryId}-${status}`; + } else if (organizationId && status) { + deduplicationKey = `org-${organizationId}-${status}`; + } else if (batchId) { + deduplicationKey = `batch-${batchId}-${status}`; + } + + await publishEvent({ + userId, + channel, + payload: job, + deduplicationKey + }); + } return jobId; } catch (error) { @@ -156,16 +172,27 @@ export async function updateMirrorJobProgress({ .set(updates) .where(mirrorJobs.id === jobId); - // Publish the event + // Publish the event with deduplication const updatedJob = { ...job, ...updates, }; + // Create deduplication key for progress updates + let deduplicationKey: string | undefined; + if (completedItemId) { + deduplicationKey = `progress-${jobId}-${completedItemId}`; + } else if (isCompleted) { + deduplicationKey = `completed-${jobId}`; + } else { + deduplicationKey = `update-${jobId}-${Date.now()}`; + } + await publishEvent({ userId: job.userId, channel: `mirror-status:${job.userId}`, payload: updatedJob, + deduplicationKey }); return updatedJob; diff --git a/src/lib/utils/concurrency.ts b/src/lib/utils/concurrency.ts index fb962d9..1bc6eac 100644 --- a/src/lib/utils/concurrency.ts +++ b/src/lib/utils/concurrency.ts @@ -181,7 +181,7 @@ export async function processWithResilience( getItemId, getItemName, resumeFromJobId, - checkpointInterval = 5, + checkpointInterval = 10, // Increased from 5 to reduce event frequency ...otherOptions } = options; diff --git a/src/pages/api/activities/cleanup.ts b/src/pages/api/activities/cleanup.ts new file mode 100644 index 0000000..4044ddf --- /dev/null +++ b/src/pages/api/activities/cleanup.ts @@ -0,0 +1,115 @@ +import type { APIRoute } from "astro"; +import { db, mirrorJobs, events } from "@/lib/db"; +import { eq, count } from "drizzle-orm"; + +export const POST: APIRoute = async ({ request }) => { + try { + let body; + try { + body = await request.json(); + } catch (jsonError) { + console.error("Invalid JSON in request body:", jsonError); + return new Response( + JSON.stringify({ error: "Invalid JSON in request body." }), + { status: 400, headers: { "Content-Type": "application/json" } } + ); + } + + const { userId } = body || {}; + + if (!userId) { + return new Response( + JSON.stringify({ error: "Missing 'userId' in request body." }), + { status: 400, headers: { "Content-Type": "application/json" } } + ); + } + + // Start a transaction to ensure all operations succeed or fail together + const result = await db.transaction(async (tx) => { + // Count activities before deletion + const mirrorJobsCountResult = await tx + .select({ count: count() }) + .from(mirrorJobs) + .where(eq(mirrorJobs.userId, userId)); + + const eventsCountResult = await tx + .select({ count: count() }) + .from(events) + .where(eq(events.userId, userId)); + + const totalMirrorJobs = mirrorJobsCountResult[0]?.count || 0; + const totalEvents = eventsCountResult[0]?.count || 0; + + console.log(`Found ${totalMirrorJobs} mirror jobs and ${totalEvents} events to delete for user ${userId}`); + + // First, mark all in-progress jobs as completed/failed to allow deletion + await tx + .update(mirrorJobs) + .set({ + inProgress: false, + completedAt: new Date(), + status: "failed", + message: "Job interrupted and cleaned up by user" + }) + .where(eq(mirrorJobs.userId, userId)); + + console.log(`Updated in-progress jobs to allow deletion`); + + // Delete all mirror jobs for the user (now that none are in progress) + await tx + .delete(mirrorJobs) + .where(eq(mirrorJobs.userId, userId)); + + // Delete all events for the user + await tx + .delete(events) + .where(eq(events.userId, userId)); + + return { + mirrorJobsDeleted: totalMirrorJobs, + eventsDeleted: totalEvents, + totalMirrorJobs, + totalEvents, + }; + }); + + console.log(`Cleaned up activities for user ${userId}:`, result); + + return new Response( + JSON.stringify({ + success: true, + message: "All activities cleaned up successfully.", + result: { + mirrorJobsDeleted: result.mirrorJobsDeleted, + eventsDeleted: result.eventsDeleted, + }, + }), + { status: 200, headers: { "Content-Type": "application/json" } } + ); + } catch (error) { + console.error("Error cleaning up activities:", error); + + // Provide more specific error messages + let errorMessage = "An unknown error occurred."; + if (error instanceof Error) { + errorMessage = error.message; + + // Check for common database errors + if (error.message.includes("FOREIGN KEY constraint failed")) { + errorMessage = "Cannot delete activities due to database constraints. Some jobs may still be referenced by other records."; + } else if (error.message.includes("database is locked")) { + errorMessage = "Database is currently locked. Please try again in a moment."; + } else if (error.message.includes("no such table")) { + errorMessage = "Database tables are missing. Please check your database setup."; + } + } + + return new Response( + JSON.stringify({ + success: false, + error: errorMessage, + }), + { status: 500, headers: { "Content-Type": "application/json" } } + ); + } +}; diff --git a/src/pages/api/job/mirror-repo.ts b/src/pages/api/job/mirror-repo.ts index 91cc9c7..6b38220 100644 --- a/src/pages/api/job/mirror-repo.ts +++ b/src/pages/api/job/mirror-repo.ts @@ -126,7 +126,7 @@ export const POST: APIRoute = async ({ request }) => { concurrencyLimit: CONCURRENCY_LIMIT, maxRetries: 2, retryDelay: 2000, - checkpointInterval: 1, // Checkpoint after each repository + checkpointInterval: 5, // Checkpoint every 5 repositories to reduce event frequency onProgress: (completed, total, result) => { const percentComplete = Math.round((completed / total) * 100); console.log(`Mirroring progress: ${percentComplete}% (${completed}/${total})`);