From abe31137551f2b170c7a28fd9e74c69e99a4cd1f Mon Sep 17 00:00:00 2001 From: Arunavo Ray Date: Thu, 22 May 2025 14:33:03 +0530 Subject: [PATCH] feat: enhance job resilience with new database schema and recovery mechanisms - Added new fields to the mirror_jobs table for job resilience, including job_type, batch_id, total_items, completed_items, item_ids, completed_item_ids, in_progress, started_at, completed_at, and last_checkpoint. - Implemented database migration scripts to update the mirror_jobs table schema. - Introduced processWithResilience utility for handling item processing with checkpointing and recovery capabilities. - Updated API routes for mirroring organizations and repositories to utilize the new resilience features. - Created recovery system to detect and resume interrupted jobs on application startup. - Added middleware to initialize the recovery system when the server starts. --- docker-entrypoint.sh | 34 ++++- package.json | 7 +- scripts/manage-db.ts | 22 +++ scripts/update-mirror-jobs-table.ts | 133 +++++++++++++++++ src/lib/db/index.ts | 12 ++ src/lib/db/schema.ts | 12 ++ src/lib/helpers.ts | 206 ++++++++++++++++++++++++- src/lib/recovery.ts | 224 ++++++++++++++++++++++++++++ src/lib/utils/concurrency.ts | 200 +++++++++++++++++++++++-- src/middleware.ts | 22 +++ src/pages/api/job/mirror-org.ts | 29 ++-- src/pages/api/job/mirror-repo.ts | 29 ++-- src/pages/api/job/sync-repo.ts | 29 ++-- 13 files changed, 893 insertions(+), 66 deletions(-) create mode 100644 scripts/update-mirror-jobs-table.ts create mode 100644 src/lib/recovery.ts create mode 100644 src/middleware.ts diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh index 37bb5fd..836381e 100644 --- a/docker-entrypoint.sh +++ b/docker-entrypoint.sh @@ -111,9 +111,28 @@ if [ ! 
-f "/app/data/gitea-mirror.db" ]; then status TEXT NOT NULL DEFAULT 'imported', message TEXT NOT NULL, timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + + -- New fields for job resilience + job_type TEXT NOT NULL DEFAULT 'mirror', + batch_id TEXT, + total_items INTEGER, + completed_items INTEGER DEFAULT 0, + item_ids TEXT, -- JSON array as text + completed_item_ids TEXT DEFAULT '[]', -- JSON array as text + in_progress INTEGER NOT NULL DEFAULT 0, -- Boolean as integer + started_at TIMESTAMP, + completed_at TIMESTAMP, + last_checkpoint TIMESTAMP, + FOREIGN KEY (user_id) REFERENCES users(id) ); + CREATE INDEX IF NOT EXISTS idx_mirror_jobs_user_id ON mirror_jobs(user_id); + CREATE INDEX IF NOT EXISTS idx_mirror_jobs_batch_id ON mirror_jobs(batch_id); + CREATE INDEX IF NOT EXISTS idx_mirror_jobs_in_progress ON mirror_jobs(in_progress); + CREATE INDEX IF NOT EXISTS idx_mirror_jobs_job_type ON mirror_jobs(job_type); + CREATE INDEX IF NOT EXISTS idx_mirror_jobs_timestamp ON mirror_jobs(timestamp); + CREATE TABLE IF NOT EXISTS events ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, @@ -138,8 +157,19 @@ else bun dist/scripts/manage-db.js fix fi - # Since the application is not used by anyone yet, we've removed the schema updates and migrations - echo "Database already exists, no migrations needed." + # Run database migrations + echo "Running database migrations..." + + # Update mirror_jobs table with new columns for resilience + if [ -f "dist/scripts/update-mirror-jobs-table.js" ]; then + echo "Updating mirror_jobs table..." + bun dist/scripts/update-mirror-jobs-table.js + elif [ -f "scripts/update-mirror-jobs-table.ts" ]; then + echo "Updating mirror_jobs table using TypeScript script..." + bun scripts/update-mirror-jobs-table.ts + else + echo "Warning: Could not find mirror_jobs table update script." 
+ fi fi # Start the application diff --git a/package.json b/package.json index a3fdaa1..8e8734a 100644 --- a/package.json +++ b/package.json @@ -6,20 +6,21 @@ "bun": ">=1.2.9" }, "scripts": { - "setup": "bun install && bun run manage-db init", + "setup": "bun install && bun run manage-db init && bun run update-db", "dev": "bunx --bun astro dev", - "dev:clean": "bun run cleanup-db && bun run manage-db init && bunx --bun astro dev", + "dev:clean": "bun run cleanup-db && bun run manage-db init && bun run update-db && bunx --bun astro dev", "build": "bunx --bun astro build", "cleanup-db": "rm -f gitea-mirror.db data/gitea-mirror.db", "manage-db": "bun scripts/manage-db.ts", "init-db": "bun scripts/manage-db.ts init", + "update-db": "bun scripts/update-mirror-jobs-table.ts", "check-db": "bun scripts/manage-db.ts check", "fix-db": "bun scripts/manage-db.ts fix", "reset-users": "bun scripts/manage-db.ts reset-users", "cleanup-events": "bun scripts/cleanup-events.ts", "preview": "bunx --bun astro preview", "start": "bun dist/server/entry.mjs", - "start:fresh": "bun run cleanup-db && bun run manage-db init && bun dist/server/entry.mjs", + "start:fresh": "bun run cleanup-db && bun run manage-db init && bun run update-db && bun dist/server/entry.mjs", "test": "bunx --bun vitest run", "test:watch": "bunx --bun vitest", "astro": "bunx --bun astro" diff --git a/scripts/manage-db.ts b/scripts/manage-db.ts index d57734e..eb2cff5 100644 --- a/scripts/manage-db.ts +++ b/scripts/manage-db.ts @@ -145,9 +145,31 @@ async function ensureTablesExist() { status TEXT NOT NULL DEFAULT 'imported', message TEXT NOT NULL, timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + + -- New fields for job resilience + job_type TEXT NOT NULL DEFAULT 'mirror', + batch_id TEXT, + total_items INTEGER, + completed_items INTEGER DEFAULT 0, + item_ids TEXT, -- JSON array as text + completed_item_ids TEXT DEFAULT '[]', -- JSON array as text + in_progress INTEGER NOT NULL DEFAULT 0, -- Boolean as integer + 
started_at TIMESTAMP, + completed_at TIMESTAMP, + last_checkpoint TIMESTAMP, + FOREIGN KEY (user_id) REFERENCES users(id) ) `); + + // Create indexes for better performance + db.exec(` + CREATE INDEX IF NOT EXISTS idx_mirror_jobs_user_id ON mirror_jobs(user_id); + CREATE INDEX IF NOT EXISTS idx_mirror_jobs_batch_id ON mirror_jobs(batch_id); + CREATE INDEX IF NOT EXISTS idx_mirror_jobs_in_progress ON mirror_jobs(in_progress); + CREATE INDEX IF NOT EXISTS idx_mirror_jobs_job_type ON mirror_jobs(job_type); + CREATE INDEX IF NOT EXISTS idx_mirror_jobs_timestamp ON mirror_jobs(timestamp); + `); break; case "events": db.exec(` diff --git a/scripts/update-mirror-jobs-table.ts b/scripts/update-mirror-jobs-table.ts new file mode 100644 index 0000000..6563382 --- /dev/null +++ b/scripts/update-mirror-jobs-table.ts @@ -0,0 +1,133 @@ +#!/usr/bin/env bun +/** + * Script to update the mirror_jobs table with new columns for resilience + */ + +import { Database } from "bun:sqlite"; +import fs from "fs"; +import path from "path"; + +// Define the database paths +const dataDir = path.join(process.cwd(), "data"); +const dbPath = path.join(dataDir, "gitea-mirror.db"); + +// Ensure data directory exists +if (!fs.existsSync(dataDir)) { + fs.mkdirSync(dataDir, { recursive: true }); + console.log(`Created data directory at ${dataDir}`); +} + +// Check if database exists +if (!fs.existsSync(dbPath)) { + console.error(`Database file not found at ${dbPath}`); + console.error("Please run 'bun run init-db' first to create the database."); + process.exit(1); +} + +// Connect to the database +const db = new Database(dbPath); + +// Enable foreign keys +db.exec("PRAGMA foreign_keys = ON;"); + +// Function to check if a column exists in a table +function columnExists(tableName: string, columnName: string): boolean { + const result = db.query( + `PRAGMA table_info(${tableName})` + ).all() as { name: string }[]; + + return result.some(column => column.name === columnName); +} + +// Main function to 
update the mirror_jobs table +async function updateMirrorJobsTable() { + console.log("Checking mirror_jobs table for missing columns..."); + + // Start a transaction + db.exec("BEGIN TRANSACTION;"); + + try { + // Check and add each new column if it doesn't exist + const columnsToAdd = [ + { name: "job_type", definition: "TEXT NOT NULL DEFAULT 'mirror'" }, + { name: "batch_id", definition: "TEXT" }, + { name: "total_items", definition: "INTEGER" }, + { name: "completed_items", definition: "INTEGER DEFAULT 0" }, + { name: "item_ids", definition: "TEXT" }, // JSON array as text + { name: "completed_item_ids", definition: "TEXT DEFAULT '[]'" }, // JSON array as text + { name: "in_progress", definition: "INTEGER NOT NULL DEFAULT 0" }, // Boolean as integer + { name: "started_at", definition: "TIMESTAMP" }, + { name: "completed_at", definition: "TIMESTAMP" }, + { name: "last_checkpoint", definition: "TIMESTAMP" } + ]; + + let columnsAdded = 0; + + for (const column of columnsToAdd) { + if (!columnExists("mirror_jobs", column.name)) { + console.log(`Adding column '${column.name}' to mirror_jobs table...`); + db.exec(`ALTER TABLE mirror_jobs ADD COLUMN ${column.name} ${column.definition};`); + columnsAdded++; + } + } + + // Commit the transaction + db.exec("COMMIT;"); + + if (columnsAdded > 0) { + console.log(`✅ Added ${columnsAdded} new columns to mirror_jobs table.`); + } else { + console.log("✅ All required columns already exist in mirror_jobs table."); + } + + // Create indexes for better performance + console.log("Creating indexes for mirror_jobs table..."); + + // Only create indexes if they don't exist + const indexesResult = db.query( + `SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='mirror_jobs'` + ).all() as { name: string }[]; + + const existingIndexes = indexesResult.map(idx => idx.name); + + const indexesToCreate = [ + { name: "idx_mirror_jobs_user_id", columns: "user_id" }, + { name: "idx_mirror_jobs_batch_id", columns: "batch_id" }, + { 
name: "idx_mirror_jobs_in_progress", columns: "in_progress" }, + { name: "idx_mirror_jobs_job_type", columns: "job_type" }, + { name: "idx_mirror_jobs_timestamp", columns: "timestamp" } + ]; + + let indexesCreated = 0; + + for (const index of indexesToCreate) { + if (!existingIndexes.includes(index.name)) { + console.log(`Creating index '${index.name}'...`); + db.exec(`CREATE INDEX ${index.name} ON mirror_jobs(${index.columns});`); + indexesCreated++; + } + } + + if (indexesCreated > 0) { + console.log(`✅ Created ${indexesCreated} new indexes for mirror_jobs table.`); + } else { + console.log("✅ All required indexes already exist for mirror_jobs table."); + } + + console.log("Mirror jobs table update completed successfully."); + } catch (error) { + // Rollback the transaction in case of error + db.exec("ROLLBACK;"); + console.error("❌ Error updating mirror_jobs table:", error); + process.exit(1); + } finally { + // Close the database connection + db.close(); + } +} + +// Run the update function +updateMirrorJobsTable().catch(error => { + console.error("Unhandled error:", error); + process.exit(1); +}); diff --git a/src/lib/db/index.ts b/src/lib/db/index.ts index 041ab2e..d5929dc 100644 --- a/src/lib/db/index.ts +++ b/src/lib/db/index.ts @@ -189,6 +189,18 @@ export const mirrorJobs = sqliteTable("mirror_jobs", { timestamp: integer("timestamp", { mode: "timestamp" }) .notNull() .default(new Date()), + + // New fields for job resilience + jobType: text("job_type").notNull().default("mirror"), + batchId: text("batch_id"), + totalItems: integer("total_items"), + completedItems: integer("completed_items").default(0), + itemIds: text("item_ids", { mode: "json" }).$type(), + completedItemIds: text("completed_item_ids", { mode: "json" }).$type().default([]), + inProgress: integer("in_progress", { mode: "boolean" }).notNull().default(false), + startedAt: integer("started_at", { mode: "timestamp" }), + completedAt: integer("completed_at", { mode: "timestamp" }), + 
lastCheckpoint: integer("last_checkpoint", { mode: "timestamp" }), }); export const organizations = sqliteTable("organizations", { diff --git a/src/lib/db/schema.ts b/src/lib/db/schema.ts index db4c40d..56b0fe7 100644 --- a/src/lib/db/schema.ts +++ b/src/lib/db/schema.ts @@ -111,6 +111,18 @@ export const mirrorJobSchema = z.object({ status: repoStatusEnum.default("imported"), message: z.string(), timestamp: z.date().default(() => new Date()), + + // New fields for job resilience + jobType: z.enum(["mirror", "sync", "retry"]).default("mirror"), + batchId: z.string().uuid().optional(), // Group related jobs together + totalItems: z.number().optional(), // Total number of items to process + completedItems: z.number().optional(), // Number of items completed + itemIds: z.array(z.string()).optional(), // IDs of items to process + completedItemIds: z.array(z.string()).optional(), // IDs of completed items + inProgress: z.boolean().default(false), // Whether the job is currently running + startedAt: z.date().optional(), // When the job started + completedAt: z.date().optional(), // When the job completed + lastCheckpoint: z.date().optional(), // Last time progress was saved }); export type MirrorJob = z.infer<typeof mirrorJobSchema>; diff --git a/src/lib/helpers.ts b/src/lib/helpers.ts index 779e6f8..6c008b1 100644 --- a/src/lib/helpers.ts +++ b/src/lib/helpers.ts @@ -12,6 +12,11 @@ export async function createMirrorJob({ message, status, details, + jobType, + batchId, + totalItems, + itemIds, + inProgress, }: { userId: string; organizationId?: string; @@ -21,6 +26,11 @@ export async function createMirrorJob({ details?: string; message: string; status: RepoStatus; + jobType?: "mirror" | "sync" | "retry"; + batchId?: string; + totalItems?: number; + itemIds?: string[]; + inProgress?: boolean; }) { const jobId = uuidv4(); const currentTimestamp = new Date(); @@ -32,11 +42,22 @@ repositoryName, organizationId, organizationName, - configId: uuidv4(), details, 
message: message, status: status, timestamp: currentTimestamp, + + // New resilience fields + jobType: jobType || "mirror", + batchId: batchId || undefined, + totalItems: totalItems || undefined, + completedItems: 0, + itemIds: itemIds || undefined, + completedItemIds: [], + inProgress: inProgress !== undefined ? inProgress : false, + startedAt: inProgress ? currentTimestamp : undefined, + completedAt: undefined, + lastCheckpoint: undefined, }; try { @@ -57,3 +78,186 @@ throw new Error("Error creating mirror job"); } } + +/** + * Updates the progress of a mirror job + */ +export async function updateMirrorJobProgress({ + jobId, + completedItemId, + status, + message, + details, + inProgress, + isCompleted, +}: { + jobId: string; + completedItemId?: string; + status?: RepoStatus; + message?: string; + details?: string; + inProgress?: boolean; + isCompleted?: boolean; +}) { + try { + // Get the current job + const [job] = await db + .select() + .from(mirrorJobs) + .where(mirrorJobs.id === jobId); + + if (!job) { + throw new Error(`Mirror job with ID ${jobId} not found`); + } + + // Update the job with new progress + const updates: Record<string, any> = { + lastCheckpoint: new Date(), + }; + + // Add completed item if provided + if (completedItemId) { + const completedItemIds = job.completedItemIds || []; + if (!completedItemIds.includes(completedItemId)) { + updates.completedItemIds = [...completedItemIds, completedItemId]; + updates.completedItems = (job.completedItems || 0) + 1; + } + } + + // Update status if provided + if (status) { + updates.status = status; + } + + // Update message if provided + if (message) { + updates.message = message; + } + + // Update details if provided + if (details) { + updates.details = details; + } + + // Update in-progress status if provided + if (inProgress !== undefined) { + updates.inProgress = inProgress; + } + + // Mark as completed if specified + if (isCompleted) { + updates.inProgress = false; + 
updates.completedAt = new Date(); + } + + // Update the job in the database + await db + .update(mirrorJobs) + .set(updates) + .where(mirrorJobs.id === jobId); + + // Publish the event + const updatedJob = { + ...job, + ...updates, + }; + + await publishEvent({ + userId: job.userId, + channel: `mirror-status:${job.userId}`, + payload: updatedJob, + }); + + return updatedJob; + } catch (error) { + console.error("Error updating mirror job progress:", error); + throw new Error("Error updating mirror job progress"); + } +} + +/** + * Finds interrupted jobs that need to be resumed + */ +export async function findInterruptedJobs() { + try { + // Find jobs that are marked as in-progress but haven't been updated recently + const cutoffTime = new Date(); + cutoffTime.setMinutes(cutoffTime.getMinutes() - 10); // Consider jobs inactive after 10 minutes without updates + + const interruptedJobs = await db + .select() + .from(mirrorJobs) + .where( + mirrorJobs.inProgress === true && + (mirrorJobs.lastCheckpoint === null || + mirrorJobs.lastCheckpoint < cutoffTime) + ); + + return interruptedJobs; + } catch (error) { + console.error("Error finding interrupted jobs:", error); + return []; + } +} + +/** + * Resumes an interrupted job + */ +export async function resumeInterruptedJob(job: any) { + try { + console.log(`Resuming interrupted job: ${job.id}`); + + // Skip if job doesn't have the necessary data to resume + if (!job.itemIds || !job.completedItemIds) { + console.log(`Cannot resume job ${job.id}: missing item data`); + + // Mark the job as failed + await updateMirrorJobProgress({ + jobId: job.id, + status: "failed", + message: "Job interrupted and could not be resumed", + details: "The job was interrupted and did not have enough information to resume", + inProgress: false, + isCompleted: true, + }); + + return null; + } + + // Calculate remaining items + const remainingItemIds = job.itemIds.filter( + (id: string) => !job.completedItemIds.includes(id) + ); + + if 
(remainingItemIds.length === 0) { + console.log(`Job ${job.id} has no remaining items, marking as completed`); + + // Mark the job as completed + await updateMirrorJobProgress({ + jobId: job.id, + status: "mirrored", + message: "Job completed after resuming", + inProgress: false, + isCompleted: true, + }); + + return null; + } + + // Update the job to show it's being resumed + await updateMirrorJobProgress({ + jobId: job.id, + message: `Resuming job with ${remainingItemIds.length} remaining items`, + details: `Job was interrupted and is being resumed. ${job.completedItemIds.length} of ${job.itemIds.length} items were already processed.`, + inProgress: true, + }); + + return { + job, + remainingItemIds, + }; + } catch (error) { + console.error(`Error resuming job ${job.id}:`, error); + return null; + } +} diff --git a/src/lib/recovery.ts b/src/lib/recovery.ts new file mode 100644 index 0000000..511cf78 --- /dev/null +++ b/src/lib/recovery.ts @@ -0,0 +1,224 @@ +/** + * Recovery mechanism for interrupted jobs + * This module handles detecting and resuming jobs that were interrupted by container restarts + */ + +import { findInterruptedJobs, resumeInterruptedJob } from './helpers'; +import { db, repositories, organizations } from './db'; +import { eq } from 'drizzle-orm'; +import { mirrorGithubRepoToGitea, mirrorGitHubOrgRepoToGiteaOrg, syncGiteaRepo } from './gitea'; +import { createGitHubClient } from './github'; +import { processWithResilience } from './utils/concurrency'; +import { repositoryVisibilityEnum, repoStatusEnum } from '@/types/Repository'; +import type { Repository } from './db/schema'; + +/** + * Initialize the recovery system + * This should be called when the application starts + */ +export async function initializeRecovery() { + console.log('Initializing recovery system...'); + + try { + // Find interrupted jobs + const interruptedJobs = await findInterruptedJobs(); + + if (interruptedJobs.length === 0) { + console.log('No interrupted jobs found.'); 
+ return; + } + + console.log(`Found ${interruptedJobs.length} interrupted jobs. Starting recovery...`); + + // Process each interrupted job + for (const job of interruptedJobs) { + const resumeData = await resumeInterruptedJob(job); + + if (!resumeData) { + console.log(`Job ${job.id} could not be resumed.`); + continue; + } + + const { job: updatedJob, remainingItemIds } = resumeData; + + // Handle different job types + switch (updatedJob.jobType) { + case 'mirror': + await recoverMirrorJob(updatedJob, remainingItemIds); + break; + case 'sync': + await recoverSyncJob(updatedJob, remainingItemIds); + break; + case 'retry': + await recoverRetryJob(updatedJob, remainingItemIds); + break; + default: + console.log(`Unknown job type: ${updatedJob.jobType}`); + } + } + + console.log('Recovery process completed.'); + } catch (error) { + console.error('Error during recovery process:', error); + } +} + +/** + * Recover a mirror job + */ +async function recoverMirrorJob(job: any, remainingItemIds: string[]) { + console.log(`Recovering mirror job ${job.id} with ${remainingItemIds.length} remaining items`); + + try { + // Get the config for this user + const [config] = await db + .select() + .from(repositories) + .where(eq(repositories.userId, job.userId)) + .limit(1); + + if (!config || !config.configId) { + throw new Error('Config not found for user'); + } + + // Get repositories to process + const repos = await db + .select() + .from(repositories) + .where(eq(repositories.id, remainingItemIds)); + + if (repos.length === 0) { + throw new Error('No repositories found for the remaining item IDs'); + } + + // Create GitHub client + const octokit = createGitHubClient(config.githubConfig.token); + + // Process repositories with resilience + await processWithResilience( + repos, + async (repo) => { + // Prepare repository data + const repoData = { + ...repo, + status: repoStatusEnum.parse("imported"), + organization: repo.organization ?? 
undefined, + lastMirrored: repo.lastMirrored ?? undefined, + errorMessage: repo.errorMessage ?? undefined, + forkedFrom: repo.forkedFrom ?? undefined, + visibility: repositoryVisibilityEnum.parse(repo.visibility), + mirroredLocation: repo.mirroredLocation || "", + }; + + // Mirror the repository based on whether it's in an organization + if (repo.organization && config.githubConfig.preserveOrgStructure) { + await mirrorGitHubOrgRepoToGiteaOrg({ + config, + octokit, + orgName: repo.organization, + repository: repoData, + }); + } else { + await mirrorGithubRepoToGitea({ + octokit, + repository: repoData, + config, + }); + } + + return repo; + }, + { + userId: job.userId, + jobType: 'mirror', + getItemId: (repo) => repo.id, + getItemName: (repo) => repo.name, + resumeFromJobId: job.id, + concurrencyLimit: 3, + maxRetries: 2, + retryDelay: 2000, + } + ); + } catch (error) { + console.error(`Error recovering mirror job ${job.id}:`, error); + } +} + +/** + * Recover a sync job + */ +async function recoverSyncJob(job: any, remainingItemIds: string[]) { + // Implementation similar to recoverMirrorJob but for sync operations + console.log(`Recovering sync job ${job.id} with ${remainingItemIds.length} remaining items`); + + try { + // Get the config for this user + const [config] = await db + .select() + .from(repositories) + .where(eq(repositories.userId, job.userId)) + .limit(1); + + if (!config || !config.configId) { + throw new Error('Config not found for user'); + } + + // Get repositories to process + const repos = await db + .select() + .from(repositories) + .where(eq(repositories.id, remainingItemIds)); + + if (repos.length === 0) { + throw new Error('No repositories found for the remaining item IDs'); + } + + // Process repositories with resilience + await processWithResilience( + repos, + async (repo) => { + // Prepare repository data + const repoData = { + ...repo, + status: repoStatusEnum.parse(repo.status), + organization: repo.organization ?? 
undefined, + lastMirrored: repo.lastMirrored ?? undefined, + errorMessage: repo.errorMessage ?? undefined, + forkedFrom: repo.forkedFrom ?? undefined, + visibility: repositoryVisibilityEnum.parse(repo.visibility), + }; + + // Sync the repository + await syncGiteaRepo({ + config, + repository: repoData, + }); + + return repo; + }, + { + userId: job.userId, + jobType: 'sync', + getItemId: (repo) => repo.id, + getItemName: (repo) => repo.name, + resumeFromJobId: job.id, + concurrencyLimit: 5, + maxRetries: 2, + retryDelay: 2000, + } + ); + } catch (error) { + console.error(`Error recovering sync job ${job.id}:`, error); + } +} + +/** + * Recover a retry job + */ +async function recoverRetryJob(job: any, remainingItemIds: string[]) { + // Implementation similar to recoverMirrorJob but for retry operations + console.log(`Recovering retry job ${job.id} with ${remainingItemIds.length} remaining items`); + + // This would be similar to recoverMirrorJob but with retry-specific logic + console.log('Retry job recovery not yet implemented'); +} diff --git a/src/lib/utils/concurrency.ts b/src/lib/utils/concurrency.ts index 7091c2a..fb962d9 100644 --- a/src/lib/utils/concurrency.ts +++ b/src/lib/utils/concurrency.ts @@ -1,6 +1,6 @@ /** * Utility for processing items in parallel with concurrency control - * + * * @param items Array of items to process * @param processItem Function to process each item * @param concurrencyLimit Maximum number of concurrent operations @@ -20,31 +20,31 @@ export async function processInParallel( // Process items in batches to control concurrency for (let i = 0; i < total; i += concurrencyLimit) { const batch = items.slice(i, i + concurrencyLimit); - + const batchPromises = batch.map(async (item) => { try { const result = await processItem(item); completed++; - + if (onProgress) { onProgress(completed, total, result); } - + return result; } catch (error) { completed++; - + if (onProgress) { onProgress(completed, total); } - + throw error; } }); // 
Wait for the current batch to complete before starting the next batch const batchResults = await Promise.allSettled(batchPromises); - + // Process results and handle errors for (const result of batchResults) { if (result.status === 'fulfilled') { @@ -60,7 +60,7 @@ /** * Utility for processing items in parallel with automatic retry for failed operations - * + * @param items Array of items to process * @param processItem Function to process each item * @param options Configuration options @@ -75,6 +75,10 @@ retryDelay?: number; onProgress?: (completed: number, total: number, result?: R) => void; onRetry?: (item: T, error: Error, attempt: number) => void; + jobId?: string; // Optional job ID for checkpointing + getItemId?: (item: T) => string; // Function to get a unique ID for each item + onCheckpoint?: (jobId: string, completedItemId: string) => Promise<void>; // Callback for checkpointing + checkpointInterval?: number; // How many items to process before checkpointing } = {} ): Promise<R[]> { const { @@ -82,24 +86,45 @@ maxRetries = 3, retryDelay = 1000, onProgress, - onRetry + onRetry, + jobId, + getItemId, + onCheckpoint, + checkpointInterval = 1 // Default to checkpointing after each item } = options; + // Track checkpoint counter + let itemsProcessedSinceLastCheckpoint = 0; + // Wrap the process function with retry logic const processWithRetryLogic = async (item: T): Promise<R> => { let lastError: Error | null = null; - + for (let attempt = 1; attempt <= maxRetries + 1; attempt++) { try { - return await processItem(item); + const result = await processItem(item); + + // Handle checkpointing if enabled + if (jobId && getItemId && onCheckpoint) { + const itemId = getItemId(item); + itemsProcessedSinceLastCheckpoint++; + + // Checkpoint based on the interval + if (itemsProcessedSinceLastCheckpoint >= checkpointInterval) { + await onCheckpoint(jobId, 
itemId); + itemsProcessedSinceLastCheckpoint = 0; + } + } + + return result; } catch (error) { lastError = error instanceof Error ? error : new Error(String(error)); - + if (attempt <= maxRetries) { if (onRetry) { onRetry(item, lastError, attempt); } - + // Exponential backoff const delay = retryDelay * Math.pow(2, attempt - 1); await new Promise(resolve => setTimeout(resolve, delay)); @@ -108,15 +133,160 @@ } } } - + // This should never be reached due to the throw in the catch block throw lastError || new Error('Unknown error occurred'); }; - return processInParallel( + const results = await processInParallel( items, processWithRetryLogic, concurrencyLimit, onProgress ); + + // Final checkpoint if there are remaining items since the last checkpoint + if (jobId && getItemId && onCheckpoint && itemsProcessedSinceLastCheckpoint > 0) { + // We don't have a specific item ID for the final checkpoint, so we'll use a placeholder + await onCheckpoint(jobId, 'final'); + } + + return results; +} + +/** + * Process items in parallel with resilience to container restarts + * This version supports resuming from a previous checkpoint + */ +export async function processWithResilience<T, R>( + items: T[], + processItem: (item: T) => Promise<R>, + options: { + concurrencyLimit?: number; + maxRetries?: number; + retryDelay?: number; + onProgress?: (completed: number, total: number, result?: R) => void; + onRetry?: (item: T, error: Error, attempt: number) => void; + userId: string; // Required for creating mirror jobs + jobType: "mirror" | "sync" | "retry"; + getItemId: (item: T) => string; // Required function to get a unique ID for each item + getItemName: (item: T) => string; // Required function to get a display name for each item + checkpointInterval?: number; + resumeFromJobId?: string; // Optional job ID to resume from + } +): Promise<R[]> { + const { + userId, + jobType, + getItemId, + getItemName, + resumeFromJobId, + checkpointInterval = 5, + 
...otherOptions + } = options; + + // Import helpers for job management + const { createMirrorJob, updateMirrorJobProgress } = await import('@/lib/helpers'); + + // Get item IDs for all items + const allItemIds = items.map(getItemId); + + // Create or resume a job + let jobId: string; + let completedItemIds: string[] = []; + let itemsToProcess = [...items]; + + if (resumeFromJobId) { + // We're resuming an existing job + jobId = resumeFromJobId; + + // Get the job from the database to find completed items + const { db, mirrorJobs } = await import('@/lib/db'); + const { eq } = await import('drizzle-orm'); + const [job] = await db + .select() + .from(mirrorJobs) + .where(eq(mirrorJobs.id, resumeFromJobId)); + + if (job && job.completedItemIds) { + completedItemIds = job.completedItemIds; + + // Filter out already completed items + itemsToProcess = items.filter(item => !completedItemIds.includes(getItemId(item))); + + console.log(`Resuming job ${jobId} with ${itemsToProcess.length} remaining items`); + + // Update the job to show it's being resumed + await updateMirrorJobProgress({ + jobId, + message: `Resuming job with ${itemsToProcess.length} remaining items`, + details: `Job is being resumed. ${completedItemIds.length} of ${items.length} items were already processed.`, + inProgress: true, + }); + } + } else { + // Create a new job + jobId = await createMirrorJob({ + userId, + message: `Started ${jobType} job with ${items.length} items`, + details: `Processing ${items.length} items in parallel with checkpointing`, + status: "mirroring", + jobType, + totalItems: items.length, + itemIds: allItemIds, + inProgress: true, + }); + + console.log(`Created new job ${jobId} with ${items.length} items`); + } + + // Define the checkpoint function + const onCheckpoint = async (jobId: string, completedItemId: string) => { + const itemName = items.find(item => getItemId(item) === completedItemId) + ? getItemName(items.find(item => getItemId(item) === completedItemId)!) 
+ : 'unknown'; + + await updateMirrorJobProgress({ + jobId, + completedItemId, + message: `Processed item: ${itemName}`, + }); + }; + + try { + // Process the items with checkpointing + const results = await processWithRetry( + itemsToProcess, + processItem, + { + ...otherOptions, + jobId, + getItemId, + onCheckpoint, + checkpointInterval, + } + ); + + // Mark the job as completed + await updateMirrorJobProgress({ + jobId, + status: "mirrored", + message: `Completed ${jobType} job with ${items.length} items`, + inProgress: false, + isCompleted: true, + }); + + return results; + } catch (error) { + // Mark the job as failed + await updateMirrorJobProgress({ + jobId, + status: "failed", + message: `Failed ${jobType} job: ${error instanceof Error ? error.message : String(error)}`, + inProgress: false, + isCompleted: true, + }); + + throw error; + } } diff --git a/src/middleware.ts b/src/middleware.ts new file mode 100644 index 0000000..cd8ebe2 --- /dev/null +++ b/src/middleware.ts @@ -0,0 +1,22 @@ +import { defineMiddleware } from 'astro:middleware'; +import { initializeRecovery } from './lib/recovery'; + +// Flag to track if recovery has been initialized +let recoveryInitialized = false; + +export const onRequest = defineMiddleware(async (context, next) => { + // Initialize recovery system only once when the server starts + if (!recoveryInitialized) { + console.log('Initializing recovery system from middleware...'); + try { + await initializeRecovery(); + console.log('Recovery system initialized successfully'); + } catch (error) { + console.error('Error initializing recovery system:', error); + } + recoveryInitialized = true; + } + + // Continue with the request + return next(); +}); diff --git a/src/pages/api/job/mirror-org.ts b/src/pages/api/job/mirror-org.ts index f855c5c..10a4166 100644 --- a/src/pages/api/job/mirror-org.ts +++ b/src/pages/api/job/mirror-org.ts @@ -6,8 +6,8 @@ import { createGitHubClient } from "@/lib/github"; import { mirrorGitHubOrgToGitea } 
from "@/lib/gitea"; import { repoStatusEnum } from "@/types/Repository"; import { type MembershipRole } from "@/types/organizations"; -import { processWithRetry } from "@/lib/utils/concurrency"; -import { createMirrorJob } from "@/lib/helpers"; +import { processWithResilience } from "@/lib/utils/concurrency"; +import { v4 as uuidv4 } from "uuid"; export const POST: APIRoute = async ({ request }) => { try { @@ -63,7 +63,7 @@ export const POST: APIRoute = async ({ request }) => { ); } - // Fire async mirroring without blocking response, using parallel processing + // Fire async mirroring without blocking response, using parallel processing with resilience setTimeout(async () => { if (!config.githubConfig.token) { throw new Error("GitHub token is missing in config."); @@ -76,8 +76,11 @@ export const POST: APIRoute = async ({ request }) => { // Using a lower concurrency for organizations since each org might contain many repos const CONCURRENCY_LIMIT = 2; - // Process organizations in parallel with retry capability - await processWithRetry( + // Generate a batch ID to group related organizations + const batchId = uuidv4(); + + // Process organizations in parallel with resilience to container restarts + await processWithResilience( orgs, async (org) => { // Prepare organization data @@ -92,16 +95,6 @@ export const POST: APIRoute = async ({ request }) => { // Log the start of mirroring console.log(`Starting mirror for organization: ${org.name}`); - // Create a mirror job entry to track progress - await createMirrorJob({ - userId: config.userId || "", - organizationId: org.id, - organizationName: org.name, - message: `Started mirroring organization: ${org.name}`, - details: `Organization ${org.name} is now in the mirroring queue.`, - status: "mirroring", - }); - // Mirror the organization await mirrorGitHubOrgToGitea({ config, @@ -112,9 +105,15 @@ export const POST: APIRoute = async ({ request }) => { return org; }, { + userId: config.userId || "", + jobType: "mirror", + 
batchId, + getItemId: (org) => org.id, + getItemName: (org) => org.name, concurrencyLimit: CONCURRENCY_LIMIT, maxRetries: 2, retryDelay: 3000, + checkpointInterval: 1, // Checkpoint after each organization onProgress: (completed, total, result) => { const percentComplete = Math.round((completed / total) * 100); console.log(`Organization mirroring progress: ${percentComplete}% (${completed}/${total})`); diff --git a/src/pages/api/job/mirror-repo.ts b/src/pages/api/job/mirror-repo.ts index 3cf5905..91cc9c7 100644 --- a/src/pages/api/job/mirror-repo.ts +++ b/src/pages/api/job/mirror-repo.ts @@ -8,8 +8,8 @@ import { mirrorGitHubOrgRepoToGiteaOrg, } from "@/lib/gitea"; import { createGitHubClient } from "@/lib/github"; -import { processWithRetry } from "@/lib/utils/concurrency"; -import { createMirrorJob } from "@/lib/helpers"; +import { processWithResilience } from "@/lib/utils/concurrency"; +import { v4 as uuidv4 } from "uuid"; export const POST: APIRoute = async ({ request }) => { try { @@ -65,7 +65,7 @@ export const POST: APIRoute = async ({ request }) => { ); } - // Start async mirroring in background with parallel processing + // Start async mirroring in background with parallel processing and resilience setTimeout(async () => { if (!config.githubConfig.token) { throw new Error("GitHub token is missing."); @@ -77,8 +77,11 @@ export const POST: APIRoute = async ({ request }) => { // Define the concurrency limit - adjust based on API rate limits const CONCURRENCY_LIMIT = 3; - // Process repositories in parallel with retry capability - await processWithRetry( + // Generate a batch ID to group related repositories + const batchId = uuidv4(); + + // Process repositories in parallel with resilience to container restarts + await processWithResilience( repos, async (repo) => { // Prepare repository data @@ -96,16 +99,6 @@ export const POST: APIRoute = async ({ request }) => { // Log the start of mirroring console.log(`Starting mirror for repository: ${repo.name}`); - // 
Create a mirror job entry to track progress - await createMirrorJob({ - userId: config.userId || "", - repositoryId: repo.id, - repositoryName: repo.name, - message: `Started mirroring repository: ${repo.name}`, - details: `Repository ${repo.name} is now in the mirroring queue.`, - status: "mirroring", - }); - // Mirror the repository based on whether it's in an organization if (repo.organization && config.githubConfig.preserveOrgStructure) { await mirrorGitHubOrgRepoToGiteaOrg({ @@ -125,9 +118,15 @@ export const POST: APIRoute = async ({ request }) => { return repo; }, { + userId: config.userId || "", + jobType: "mirror", + batchId, + getItemId: (repo) => repo.id, + getItemName: (repo) => repo.name, concurrencyLimit: CONCURRENCY_LIMIT, maxRetries: 2, retryDelay: 2000, + checkpointInterval: 1, // Checkpoint after each repository onProgress: (completed, total, result) => { const percentComplete = Math.round((completed / total) * 100); console.log(`Mirroring progress: ${percentComplete}% (${completed}/${total})`); diff --git a/src/pages/api/job/sync-repo.ts b/src/pages/api/job/sync-repo.ts index fb83047..3ebb4c0 100644 --- a/src/pages/api/job/sync-repo.ts +++ b/src/pages/api/job/sync-repo.ts @@ -5,8 +5,8 @@ import { eq, inArray } from "drizzle-orm"; import { repositoryVisibilityEnum, repoStatusEnum } from "@/types/Repository"; import { syncGiteaRepo } from "@/lib/gitea"; import type { SyncRepoResponse } from "@/types/sync"; -import { processWithRetry } from "@/lib/utils/concurrency"; -import { createMirrorJob } from "@/lib/helpers"; +import { processWithResilience } from "@/lib/utils/concurrency"; +import { v4 as uuidv4 } from "uuid"; export const POST: APIRoute = async ({ request }) => { try { @@ -62,13 +62,16 @@ export const POST: APIRoute = async ({ request }) => { ); } - // Start async mirroring in background with parallel processing + // Start async mirroring in background with parallel processing and resilience setTimeout(async () => { // Define the concurrency 
limit - adjust based on API rate limits const CONCURRENCY_LIMIT = 5; - // Process repositories in parallel with retry capability - await processWithRetry( + // Generate a batch ID to group related repositories + const batchId = uuidv4(); + + // Process repositories in parallel with resilience to container restarts + await processWithResilience( repos, async (repo) => { // Prepare repository data @@ -85,16 +88,6 @@ export const POST: APIRoute = async ({ request }) => { // Log the start of syncing console.log(`Starting sync for repository: ${repo.name}`); - // Create a mirror job entry to track progress - await createMirrorJob({ - userId: config.userId || "", - repositoryId: repo.id, - repositoryName: repo.name, - message: `Started syncing repository: ${repo.name}`, - details: `Repository ${repo.name} is now in the syncing queue.`, - status: "syncing", - }); - // Sync the repository await syncGiteaRepo({ config, @@ -104,9 +97,15 @@ export const POST: APIRoute = async ({ request }) => { return repo; }, { + userId: config.userId || "", + jobType: "sync", + batchId, + getItemId: (repo) => repo.id, + getItemName: (repo) => repo.name, concurrencyLimit: CONCURRENCY_LIMIT, maxRetries: 2, retryDelay: 2000, + checkpointInterval: 1, // Checkpoint after each repository onProgress: (completed, total, result) => { const percentComplete = Math.round((completed / total) * 100); console.log(`Syncing progress: ${percentComplete}% (${completed}/${total})`);