feat: enhance job resilience with new database schema and recovery mechanisms

- Added new fields to the mirror_jobs table for job resilience, including job_type, batch_id, total_items, completed_items, item_ids, completed_item_ids, in_progress, started_at, completed_at, and last_checkpoint.
- Implemented database migration scripts to update the mirror_jobs table schema.
- Introduced processWithResilience utility for handling item processing with checkpointing and recovery capabilities.
- Updated API routes for mirroring organizations and repositories to utilize the new resilience features.
- Created recovery system to detect and resume interrupted jobs on application startup.
- Added middleware to initialize the recovery system when the server starts.
This commit is contained in:
Arunavo Ray
2025-05-22 14:33:03 +05:30
parent f4bc28e6c2
commit abe3113755
13 changed files with 893 additions and 66 deletions

View File

@@ -111,9 +111,28 @@ if [ ! -f "/app/data/gitea-mirror.db" ]; then
status TEXT NOT NULL DEFAULT 'imported', status TEXT NOT NULL DEFAULT 'imported',
message TEXT NOT NULL, message TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- New fields for job resilience
job_type TEXT NOT NULL DEFAULT 'mirror',
batch_id TEXT,
total_items INTEGER,
completed_items INTEGER DEFAULT 0,
item_ids TEXT, -- JSON array as text
completed_item_ids TEXT DEFAULT '[]', -- JSON array as text
in_progress INTEGER NOT NULL DEFAULT 0, -- Boolean as integer
started_at TIMESTAMP,
completed_at TIMESTAMP,
last_checkpoint TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) FOREIGN KEY (user_id) REFERENCES users(id)
); );
-- Single-column indexes on mirror_jobs: user_id, batch_id, in_progress, job_type and timestamp.
-- IF NOT EXISTS keeps each statement safe to re-run.
CREATE INDEX IF NOT EXISTS idx_mirror_jobs_user_id ON mirror_jobs(user_id);
CREATE INDEX IF NOT EXISTS idx_mirror_jobs_batch_id ON mirror_jobs(batch_id);
CREATE INDEX IF NOT EXISTS idx_mirror_jobs_in_progress ON mirror_jobs(in_progress);
CREATE INDEX IF NOT EXISTS idx_mirror_jobs_job_type ON mirror_jobs(job_type);
CREATE INDEX IF NOT EXISTS idx_mirror_jobs_timestamp ON mirror_jobs(timestamp);
CREATE TABLE IF NOT EXISTS events ( CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
user_id TEXT NOT NULL, user_id TEXT NOT NULL,
@@ -138,8 +157,19 @@ else
bun dist/scripts/manage-db.js fix bun dist/scripts/manage-db.js fix
fi fi
# Since the application is not used by anyone yet, we've removed the schema updates and migrations # Run database migrations
echo "Database already exists, no migrations needed." echo "Running database migrations..."
# Apply the mirror_jobs resilience-column migration.
# The compiled script under dist/ is preferred; the TypeScript source is used
# only when no compiled script is present. If neither exists, warn and go on.
if test -f "dist/scripts/update-mirror-jobs-table.js"; then
  echo "Updating mirror_jobs table..."
  bun dist/scripts/update-mirror-jobs-table.js
else
  if test -f "scripts/update-mirror-jobs-table.ts"; then
    echo "Updating mirror_jobs table using TypeScript script..."
    bun scripts/update-mirror-jobs-table.ts
  else
    echo "Warning: Could not find mirror_jobs table update script."
  fi
fi
fi fi
# Start the application # Start the application

View File

@@ -6,20 +6,21 @@
"bun": ">=1.2.9" "bun": ">=1.2.9"
}, },
"scripts": { "scripts": {
"setup": "bun install && bun run manage-db init", "setup": "bun install && bun run manage-db init && bun run update-db",
"dev": "bunx --bun astro dev", "dev": "bunx --bun astro dev",
"dev:clean": "bun run cleanup-db && bun run manage-db init && bunx --bun astro dev", "dev:clean": "bun run cleanup-db && bun run manage-db init && bun run update-db && bunx --bun astro dev",
"build": "bunx --bun astro build", "build": "bunx --bun astro build",
"cleanup-db": "rm -f gitea-mirror.db data/gitea-mirror.db", "cleanup-db": "rm -f gitea-mirror.db data/gitea-mirror.db",
"manage-db": "bun scripts/manage-db.ts", "manage-db": "bun scripts/manage-db.ts",
"init-db": "bun scripts/manage-db.ts init", "init-db": "bun scripts/manage-db.ts init",
"update-db": "bun scripts/update-mirror-jobs-table.ts",
"check-db": "bun scripts/manage-db.ts check", "check-db": "bun scripts/manage-db.ts check",
"fix-db": "bun scripts/manage-db.ts fix", "fix-db": "bun scripts/manage-db.ts fix",
"reset-users": "bun scripts/manage-db.ts reset-users", "reset-users": "bun scripts/manage-db.ts reset-users",
"cleanup-events": "bun scripts/cleanup-events.ts", "cleanup-events": "bun scripts/cleanup-events.ts",
"preview": "bunx --bun astro preview", "preview": "bunx --bun astro preview",
"start": "bun dist/server/entry.mjs", "start": "bun dist/server/entry.mjs",
"start:fresh": "bun run cleanup-db && bun run manage-db init && bun dist/server/entry.mjs", "start:fresh": "bun run cleanup-db && bun run manage-db init && bun run update-db && bun dist/server/entry.mjs",
"test": "bunx --bun vitest run", "test": "bunx --bun vitest run",
"test:watch": "bunx --bun vitest", "test:watch": "bunx --bun vitest",
"astro": "bunx --bun astro" "astro": "bunx --bun astro"

View File

@@ -145,9 +145,31 @@ async function ensureTablesExist() {
status TEXT NOT NULL DEFAULT 'imported', status TEXT NOT NULL DEFAULT 'imported',
message TEXT NOT NULL, message TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- New fields for job resilience
job_type TEXT NOT NULL DEFAULT 'mirror',
batch_id TEXT,
total_items INTEGER,
completed_items INTEGER DEFAULT 0,
item_ids TEXT, -- JSON array as text
completed_item_ids TEXT DEFAULT '[]', -- JSON array as text
in_progress INTEGER NOT NULL DEFAULT 0, -- Boolean as integer
started_at TIMESTAMP,
completed_at TIMESTAMP,
last_checkpoint TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) FOREIGN KEY (user_id) REFERENCES users(id)
) )
`); `);
// Create indexes for better performance
db.exec(`
CREATE INDEX IF NOT EXISTS idx_mirror_jobs_user_id ON mirror_jobs(user_id);
CREATE INDEX IF NOT EXISTS idx_mirror_jobs_batch_id ON mirror_jobs(batch_id);
CREATE INDEX IF NOT EXISTS idx_mirror_jobs_in_progress ON mirror_jobs(in_progress);
CREATE INDEX IF NOT EXISTS idx_mirror_jobs_job_type ON mirror_jobs(job_type);
CREATE INDEX IF NOT EXISTS idx_mirror_jobs_timestamp ON mirror_jobs(timestamp);
`);
break; break;
case "events": case "events":
db.exec(` db.exec(`

View File

@@ -0,0 +1,133 @@
#!/usr/bin/env bun
/**
* Script to update the mirror_jobs table with new columns for resilience
*/
import { Database } from "bun:sqlite";
import fs from "fs";
import path from "path";
// Location of the database this script migrates: <cwd>/data/gitea-mirror.db.
const dataDir = path.join(process.cwd(), "data");
const dbPath = path.join(dataDir, "gitea-mirror.db");

// Create the data directory when it is missing.
const dataDirPresent = fs.existsSync(dataDir);
if (dataDirPresent === false) {
  fs.mkdirSync(dataDir, { recursive: true });
  console.log(`Created data directory at ${dataDir}`);
}

// This script only alters an existing database; stop with exit code 1 when
// there is no database file to migrate.
const dbFilePresent = fs.existsSync(dbPath);
if (dbFilePresent === false) {
  const messages = [
    `Database file not found at ${dbPath}`,
    "Please run 'bun run init-db' first to create the database.",
  ];
  for (const line of messages) {
    console.error(line);
  }
  process.exit(1);
}

// Open the database and switch on foreign-key enforcement for this connection.
const db = new Database(dbPath);
db.exec("PRAGMA foreign_keys = ON;");
/**
 * Reports whether a table already has a column with the given name.
 *
 * The column list comes from `PRAGMA table_info`. The table name is
 * interpolated into the statement text, so it must be a trusted identifier.
 *
 * @param tableName - Table to inspect.
 * @param columnName - Column name to look for (exact match).
 * @returns `true` when a column with that name is listed, otherwise `false`.
 */
function columnExists(tableName: string, columnName: string): boolean {
  const listedColumns = db
    .query(`PRAGMA table_info(${tableName})`)
    .all() as { name: string }[];

  for (const listed of listedColumns) {
    if (listed.name === columnName) {
      return true;
    }
  }
  return false;
}
/**
 * Brings an existing `mirror_jobs` table up to the job-resilience schema.
 *
 * Step 1 adds every missing resilience column inside one transaction.
 * Step 2 creates any missing indexes after that transaction has committed.
 *
 * On failure the transaction is rolled back only if it is still open, the
 * error is logged, the database connection is closed, and the process exits
 * with code 1.
 */
async function updateMirrorJobsTable() {
  console.log("Checking mirror_jobs table for missing columns...");

  // True between BEGIN and COMMIT. Issuing ROLLBACK with no open transaction
  // makes SQLite raise its own error, which would hide the real failure.
  let transactionOpen = false;
  let failed = false;

  try {
    db.exec("BEGIN TRANSACTION;");
    transactionOpen = true;

    // Check and add each new column if it doesn't exist
    const columnsToAdd = [
      { name: "job_type", definition: "TEXT NOT NULL DEFAULT 'mirror'" },
      { name: "batch_id", definition: "TEXT" },
      { name: "total_items", definition: "INTEGER" },
      { name: "completed_items", definition: "INTEGER DEFAULT 0" },
      { name: "item_ids", definition: "TEXT" }, // JSON array as text
      { name: "completed_item_ids", definition: "TEXT DEFAULT '[]'" }, // JSON array as text
      { name: "in_progress", definition: "INTEGER NOT NULL DEFAULT 0" }, // Boolean as integer
      { name: "started_at", definition: "TIMESTAMP" },
      { name: "completed_at", definition: "TIMESTAMP" },
      { name: "last_checkpoint", definition: "TIMESTAMP" }
    ];

    let columnsAdded = 0;
    for (const column of columnsToAdd) {
      if (!columnExists("mirror_jobs", column.name)) {
        console.log(`Adding column '${column.name}' to mirror_jobs table...`);
        db.exec(`ALTER TABLE mirror_jobs ADD COLUMN ${column.name} ${column.definition};`);
        columnsAdded++;
      }
    }

    db.exec("COMMIT;");
    transactionOpen = false;

    if (columnsAdded > 0) {
      console.log(`✅ Added ${columnsAdded} new columns to mirror_jobs table.`);
    } else {
      console.log("✅ All required columns already exist in mirror_jobs table.");
    }

    // Create indexes for better performance
    console.log("Creating indexes for mirror_jobs table...");

    // Only create indexes that are not already present on mirror_jobs
    const indexesResult = db.query(
      `SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='mirror_jobs'`
    ).all() as { name: string }[];
    const existingIndexes = indexesResult.map(idx => idx.name);

    const indexesToCreate = [
      { name: "idx_mirror_jobs_user_id", columns: "user_id" },
      { name: "idx_mirror_jobs_batch_id", columns: "batch_id" },
      { name: "idx_mirror_jobs_in_progress", columns: "in_progress" },
      { name: "idx_mirror_jobs_job_type", columns: "job_type" },
      { name: "idx_mirror_jobs_timestamp", columns: "timestamp" }
    ];

    let indexesCreated = 0;
    for (const index of indexesToCreate) {
      if (!existingIndexes.includes(index.name)) {
        console.log(`Creating index '${index.name}'...`);
        db.exec(`CREATE INDEX ${index.name} ON mirror_jobs(${index.columns});`);
        indexesCreated++;
      }
    }

    if (indexesCreated > 0) {
      console.log(`✅ Created ${indexesCreated} new indexes for mirror_jobs table.`);
    } else {
      console.log("✅ All required indexes already exist for mirror_jobs table.");
    }

    console.log("Mirror jobs table update completed successfully.");
  } catch (error) {
    failed = true;
    if (transactionOpen) {
      try {
        db.exec("ROLLBACK;");
      } catch (rollbackError) {
        console.error("❌ Failed to roll back mirror_jobs table update:", rollbackError);
      }
    }
    console.error("❌ Error updating mirror_jobs table:", error);
  } finally {
    // Close the database connection
    db.close();
  }

  // Exit only after the connection has been closed; calling process.exit()
  // inside the catch would terminate before the finally block runs.
  if (failed) {
    process.exit(1);
  }
}
// Script entry point: run the migration. Any rejection that reaches this
// point is logged and ends the process with exit code 1.
const reportUnhandled = (error: unknown) => {
  console.error("Unhandled error:", error);
  process.exit(1);
};

updateMirrorJobsTable().catch(reportUnhandled);

View File

@@ -189,6 +189,18 @@ export const mirrorJobs = sqliteTable("mirror_jobs", {
timestamp: integer("timestamp", { mode: "timestamp" }) timestamp: integer("timestamp", { mode: "timestamp" })
.notNull() .notNull()
.default(new Date()), .default(new Date()),
// New fields for job resilience
jobType: text("job_type").notNull().default("mirror"),
batchId: text("batch_id"),
totalItems: integer("total_items"),
completedItems: integer("completed_items").default(0),
itemIds: text("item_ids", { mode: "json" }).$type<string[]>(),
completedItemIds: text("completed_item_ids", { mode: "json" }).$type<string[]>().default([]),
inProgress: integer("in_progress", { mode: "boolean" }).notNull().default(false),
startedAt: integer("started_at", { mode: "timestamp" }),
completedAt: integer("completed_at", { mode: "timestamp" }),
lastCheckpoint: integer("last_checkpoint", { mode: "timestamp" }),
}); });
export const organizations = sqliteTable("organizations", { export const organizations = sqliteTable("organizations", {

View File

@@ -111,6 +111,18 @@ export const mirrorJobSchema = z.object({
status: repoStatusEnum.default("imported"), status: repoStatusEnum.default("imported"),
message: z.string(), message: z.string(),
timestamp: z.date().default(() => new Date()), timestamp: z.date().default(() => new Date()),
// New fields for job resilience
jobType: z.enum(["mirror", "sync", "retry"]).default("mirror"),
batchId: z.string().uuid().optional(), // Group related jobs together
totalItems: z.number().optional(), // Total number of items to process
completedItems: z.number().optional(), // Number of items completed
itemIds: z.array(z.string()).optional(), // IDs of items to process
completedItemIds: z.array(z.string()).optional(), // IDs of completed items
inProgress: z.boolean().default(false), // Whether the job is currently running
startedAt: z.date().optional(), // When the job started
completedAt: z.date().optional(), // When the job completed
lastCheckpoint: z.date().optional(), // Last time progress was saved
}); });
export type MirrorJob = z.infer<typeof mirrorJobSchema>; export type MirrorJob = z.infer<typeof mirrorJobSchema>;

View File

@@ -12,6 +12,11 @@ export async function createMirrorJob({
message, message,
status, status,
details, details,
jobType,
batchId,
totalItems,
itemIds,
inProgress,
}: { }: {
userId: string; userId: string;
organizationId?: string; organizationId?: string;
@@ -21,6 +26,11 @@ export async function createMirrorJob({
details?: string; details?: string;
message: string; message: string;
status: RepoStatus; status: RepoStatus;
jobType?: "mirror" | "sync" | "retry";
batchId?: string;
totalItems?: number;
itemIds?: string[];
inProgress?: boolean;
}) { }) {
const jobId = uuidv4(); const jobId = uuidv4();
const currentTimestamp = new Date(); const currentTimestamp = new Date();
@@ -32,11 +42,22 @@ export async function createMirrorJob({
repositoryName, repositoryName,
organizationId, organizationId,
organizationName, organizationName,
configId: uuidv4(),
details, details,
message: message, message: message,
status: status, status: status,
timestamp: currentTimestamp, timestamp: currentTimestamp,
// New resilience fields
jobType: jobType || "mirror",
batchId: batchId || undefined,
totalItems: totalItems || undefined,
completedItems: 0,
itemIds: itemIds || undefined,
completedItemIds: [],
inProgress: inProgress !== undefined ? inProgress : false,
startedAt: inProgress ? currentTimestamp : undefined,
completedAt: undefined,
lastCheckpoint: undefined,
}; };
try { try {
@@ -57,3 +78,186 @@ export async function createMirrorJob({
throw new Error("Error creating mirror job"); throw new Error("Error creating mirror job");
} }
} }
/**
 * Records progress on a mirror job and publishes the updated job.
 *
 * Loads the job row, merges the supplied changes, stamps `lastCheckpoint`
 * with the current time, writes the row back, and publishes the merged job
 * on the `mirror-status:<userId>` channel.
 *
 * @param jobId - ID of the mirror job to update.
 * @param completedItemId - Item to record as completed. Ignored when it is
 *   already listed in the job's `completedItemIds`.
 * @param status - New status; left unchanged when omitted.
 * @param message - New message; left unchanged when omitted or empty.
 * @param details - New details; left unchanged when omitted or empty.
 * @param inProgress - New in-progress flag; left unchanged when omitted.
 * @param isCompleted - When true, forces `inProgress` to false and stamps
 *   `completedAt`.
 * @returns The job row merged with the applied updates.
 * @throws Error "Error updating mirror job progress" on any failure,
 *   including a job that cannot be found.
 */
export async function updateMirrorJobProgress({
  jobId,
  completedItemId,
  status,
  message,
  details,
  inProgress,
  isCompleted,
}: {
  jobId: string;
  completedItemId?: string;
  status?: RepoStatus;
  message?: string;
  details?: string;
  inProgress?: boolean;
  isCompleted?: boolean;
}) {
  try {
    // Loaded here so the block does not depend on the file's import list.
    const { eq } = await import("drizzle-orm");

    // A JavaScript `===` between a column object and a string evaluates to
    // `false` instead of producing a SQL predicate, so the filter must be
    // built with `eq`.
    const [job] = await db
      .select()
      .from(mirrorJobs)
      .where(eq(mirrorJobs.id, jobId));

    if (!job) {
      throw new Error(`Mirror job with ID ${jobId} not found`);
    }

    // Every update counts as a checkpoint.
    const updates: Record<string, any> = {
      lastCheckpoint: new Date(),
    };

    // Add the completed item once; repeated checkpoints for the same item
    // must not inflate the counter.
    // NOTE(review): this is a read-modify-write of the stored array, so two
    // overlapping calls for the same job could lose one item — confirm
    // whether callers serialize checkpoints.
    if (completedItemId) {
      const completedItemIds = job.completedItemIds || [];
      if (!completedItemIds.includes(completedItemId)) {
        updates.completedItemIds = [...completedItemIds, completedItemId];
        updates.completedItems = (job.completedItems || 0) + 1;
      }
    }

    if (status) {
      updates.status = status;
    }

    if (message) {
      updates.message = message;
    }

    if (details) {
      updates.details = details;
    }

    if (inProgress !== undefined) {
      updates.inProgress = inProgress;
    }

    // Completion overrides any inProgress value passed by the caller.
    if (isCompleted) {
      updates.inProgress = false;
      updates.completedAt = new Date();
    }

    await db
      .update(mirrorJobs)
      .set(updates)
      .where(eq(mirrorJobs.id, jobId));

    // Publish the merged view of the job
    const updatedJob = {
      ...job,
      ...updates,
    };

    await publishEvent({
      userId: job.userId,
      channel: `mirror-status:${job.userId}`,
      payload: updatedJob,
    });

    return updatedJob;
  } catch (error) {
    console.error("Error updating mirror job progress:", error);
    throw new Error("Error updating mirror job progress");
  }
}
/**
 * Finds jobs that look interrupted and may need to be resumed.
 *
 * A job qualifies when it is flagged in-progress and either has never
 * checkpointed or last checkpointed more than 10 minutes ago.
 *
 * @returns The matching job rows, or an empty array when the query fails
 *   (the error is logged, not rethrown).
 */
export async function findInterruptedJobs() {
  // Jobs with no checkpoint for this long are considered inactive.
  const STALE_AFTER_MS = 10 * 60 * 1000;

  try {
    // Loaded here so the block does not depend on the file's import list.
    const { and, eq, isNull, lt, or } = await import("drizzle-orm");

    // Millisecond arithmetic is independent of the local time zone.
    const cutoffTime = new Date(Date.now() - STALE_AFTER_MS);

    // JavaScript `===`, `&&`, `||` and `<` applied to column objects collapse
    // to a plain boolean, so the condition must be composed from the ORM's
    // operator helpers.
    const interruptedJobs = await db
      .select()
      .from(mirrorJobs)
      .where(
        and(
          eq(mirrorJobs.inProgress, true),
          or(
            isNull(mirrorJobs.lastCheckpoint),
            lt(mirrorJobs.lastCheckpoint, cutoffTime)
          )
        )
      );

    return interruptedJobs;
  } catch (error) {
    console.error("Error finding interrupted jobs:", error);
    return [];
  }
}
/**
 * Prepares an interrupted job for resumption.
 *
 * Outcomes:
 * - The job lacks `itemIds` or `completedItemIds`: it is marked failed and
 *   `null` is returned.
 * - Every item is already completed: it is marked "mirrored" and `null` is
 *   returned.
 * - Otherwise the job is flagged in-progress again and the remaining item
 *   IDs are returned together with the job that was passed in.
 *
 * Any error is logged and results in `null`.
 *
 * @param job - Job row as returned by the interrupted-jobs lookup.
 * @returns `{ job, remainingItemIds }` when there is work left, else `null`.
 */
export async function resumeInterruptedJob(job: any) {
  try {
    console.log(`Resuming interrupted job: ${job.id}`);

    // Without both lists there is no way to tell what is still outstanding.
    const hasItemData = Boolean(job.itemIds) && Boolean(job.completedItemIds);
    if (!hasItemData) {
      console.log(`Cannot resume job ${job.id}: missing item data`);

      await updateMirrorJobProgress({
        jobId: job.id,
        status: "failed",
        message: "Job interrupted and could not be resumed",
        details: "The job was interrupted and did not have enough information to resume",
        inProgress: false,
        isCompleted: true,
      });

      return null;
    }

    // Items still to process: everything not yet recorded as completed.
    const isOutstanding = (id: string) => !job.completedItemIds.includes(id);
    const remainingItemIds = job.itemIds.filter(isOutstanding);

    const nothingLeft = remainingItemIds.length === 0;
    if (nothingLeft) {
      console.log(`Job ${job.id} has no remaining items, marking as completed`);

      await updateMirrorJobProgress({
        jobId: job.id,
        status: "mirrored",
        message: "Job completed after resuming",
        inProgress: false,
        isCompleted: true,
      });

      return null;
    }

    // Flag the job as running again before handing it back to the caller.
    await updateMirrorJobProgress({
      jobId: job.id,
      message: `Resuming job with ${remainingItemIds.length} remaining items`,
      details: `Job was interrupted and is being resumed. ${job.completedItemIds.length} of ${job.itemIds.length} items were already processed.`,
      inProgress: true,
    });

    return { job, remainingItemIds };
  } catch (error) {
    console.error(`Error resuming job ${job.id}:`, error);
    return null;
  }
}

224
src/lib/recovery.ts Normal file
View File

@@ -0,0 +1,224 @@
/**
* Recovery mechanism for interrupted jobs
* This module handles detecting and resuming jobs that were interrupted by container restarts
*/
import { findInterruptedJobs, resumeInterruptedJob } from './helpers';
import { db, repositories, organizations } from './db';
import { eq } from 'drizzle-orm';
import { mirrorGithubRepoToGitea, mirrorGitHubOrgRepoToGiteaOrg, syncGiteaRepo } from './gitea';
import { createGitHubClient } from './github';
import { processWithResilience } from './utils/concurrency';
import { repositoryVisibilityEnum, repoStatusEnum } from '@/types/Repository';
import type { Repository } from './db/schema';
/**
 * Entry point of the recovery system; intended to run at application start.
 *
 * Looks up interrupted jobs, prepares each one for resumption, and hands it
 * to the recovery routine matching its job type. Jobs are handled one after
 * another. Errors are logged and never propagate to the caller.
 */
export async function initializeRecovery() {
  console.log('Initializing recovery system...');

  try {
    const interruptedJobs = await findInterruptedJobs();

    const nothingToRecover = interruptedJobs.length === 0;
    if (nothingToRecover) {
      console.log('No interrupted jobs found.');
      return;
    }

    console.log(`Found ${interruptedJobs.length} interrupted jobs. Starting recovery...`);

    for (const job of interruptedJobs) {
      const resumeData = await resumeInterruptedJob(job);

      if (!resumeData) {
        console.log(`Job ${job.id} could not be resumed.`);
        continue;
      }

      const updatedJob = resumeData.job;
      const remainingItemIds = resumeData.remainingItemIds;
      const kind = updatedJob.jobType;

      // Route the job to the recovery routine for its type.
      if (kind === 'mirror') {
        await recoverMirrorJob(updatedJob, remainingItemIds);
      } else if (kind === 'sync') {
        await recoverSyncJob(updatedJob, remainingItemIds);
      } else if (kind === 'retry') {
        await recoverRetryJob(updatedJob, remainingItemIds);
      } else {
        console.log(`Unknown job type: ${updatedJob.jobType}`);
      }
    }

    console.log('Recovery process completed.');
  } catch (error) {
    console.error('Error during recovery process:', error);
  }
}
/**
 * Recovers an interrupted mirror job by mirroring its remaining repositories.
 *
 * Loads the repositories whose IDs are still outstanding and processes them
 * through `processWithResilience`, resuming under the existing job ID.
 * Errors are logged and not rethrown.
 *
 * @param job - The interrupted job row (reads `id` and `userId`).
 * @param remainingItemIds - Repository IDs that have not been completed yet.
 */
async function recoverMirrorJob(job: any, remainingItemIds: string[]) {
  console.log(`Recovering mirror job ${job.id} with ${remainingItemIds.length} remaining items`);

  try {
    // Loaded here so the block does not depend on the file's import list.
    const { inArray } = await import('drizzle-orm');

    // Get the config for this user
    // NOTE(review): this row is read from `repositories`, yet it is used
    // below as a config object (`config.githubConfig`). It looks like the
    // query should target the configs table — confirm and correct.
    const [config] = await db
      .select()
      .from(repositories)
      .where(eq(repositories.userId, job.userId))
      .limit(1);

    if (!config || !config.configId) {
      throw new Error('Config not found for user');
    }

    // Get repositories to process. Matching one column against a list of IDs
    // needs `inArray`; `eq` compares against a single value only.
    const repos = await db
      .select()
      .from(repositories)
      .where(inArray(repositories.id, remainingItemIds));

    if (repos.length === 0) {
      throw new Error('No repositories found for the remaining item IDs');
    }

    // Create GitHub client
    const octokit = createGitHubClient(config.githubConfig.token);

    // Process repositories with resilience
    await processWithResilience(
      repos,
      async (repo) => {
        // Prepare repository data: nullable columns become undefined and the
        // status is reset to "imported" before mirroring.
        const repoData = {
          ...repo,
          status: repoStatusEnum.parse("imported"),
          organization: repo.organization ?? undefined,
          lastMirrored: repo.lastMirrored ?? undefined,
          errorMessage: repo.errorMessage ?? undefined,
          forkedFrom: repo.forkedFrom ?? undefined,
          visibility: repositoryVisibilityEnum.parse(repo.visibility),
          mirroredLocation: repo.mirroredLocation || "",
        };

        // Mirror the repository based on whether it's in an organization
        if (repo.organization && config.githubConfig.preserveOrgStructure) {
          await mirrorGitHubOrgRepoToGiteaOrg({
            config,
            octokit,
            orgName: repo.organization,
            repository: repoData,
          });
        } else {
          await mirrorGithubRepoToGitea({
            octokit,
            repository: repoData,
            config,
          });
        }

        return repo;
      },
      {
        userId: job.userId,
        jobType: 'mirror',
        getItemId: (repo) => repo.id,
        getItemName: (repo) => repo.name,
        resumeFromJobId: job.id,
        concurrencyLimit: 3,
        maxRetries: 2,
        retryDelay: 2000,
      }
    );
  } catch (error) {
    console.error(`Error recovering mirror job ${job.id}:`, error);
  }
}
/**
 * Recovers an interrupted sync job by syncing its remaining repositories.
 *
 * Loads the repositories whose IDs are still outstanding and processes them
 * through `processWithResilience`, resuming under the existing job ID.
 * Errors are logged and not rethrown.
 *
 * @param job - The interrupted job row (reads `id` and `userId`).
 * @param remainingItemIds - Repository IDs that have not been completed yet.
 */
async function recoverSyncJob(job: any, remainingItemIds: string[]) {
  console.log(`Recovering sync job ${job.id} with ${remainingItemIds.length} remaining items`);

  try {
    // Loaded here so the block does not depend on the file's import list.
    const { inArray } = await import('drizzle-orm');

    // Get the config for this user
    // NOTE(review): this row is read from `repositories`, yet it is passed
    // to `syncGiteaRepo` as `config`. It looks like the query should target
    // the configs table — confirm and correct.
    const [config] = await db
      .select()
      .from(repositories)
      .where(eq(repositories.userId, job.userId))
      .limit(1);

    if (!config || !config.configId) {
      throw new Error('Config not found for user');
    }

    // Get repositories to process. Matching one column against a list of IDs
    // needs `inArray`; `eq` compares against a single value only.
    const repos = await db
      .select()
      .from(repositories)
      .where(inArray(repositories.id, remainingItemIds));

    if (repos.length === 0) {
      throw new Error('No repositories found for the remaining item IDs');
    }

    // Process repositories with resilience
    await processWithResilience(
      repos,
      async (repo) => {
        // Prepare repository data: nullable columns become undefined; the
        // stored status is kept as-is.
        const repoData = {
          ...repo,
          status: repoStatusEnum.parse(repo.status),
          organization: repo.organization ?? undefined,
          lastMirrored: repo.lastMirrored ?? undefined,
          errorMessage: repo.errorMessage ?? undefined,
          forkedFrom: repo.forkedFrom ?? undefined,
          visibility: repositoryVisibilityEnum.parse(repo.visibility),
        };

        // Sync the repository
        await syncGiteaRepo({
          config,
          repository: repoData,
        });

        return repo;
      },
      {
        userId: job.userId,
        jobType: 'sync',
        getItemId: (repo) => repo.id,
        getItemName: (repo) => repo.name,
        resumeFromJobId: job.id,
        concurrencyLimit: 5,
        maxRetries: 2,
        retryDelay: 2000,
      }
    );
  } catch (error) {
    console.error(`Error recovering sync job ${job.id}:`, error);
  }
}
/**
 * Placeholder for recovering an interrupted retry job.
 *
 * Currently this only logs the job and states that retry recovery is not
 * implemented; no items are processed and the job row is not modified here.
 *
 * @param job - The interrupted job row (reads `id`).
 * @param remainingItemIds - Item IDs that have not been completed yet.
 */
async function recoverRetryJob(job: any, remainingItemIds: string[]) {
  const notices = [
    `Recovering retry job ${job.id} with ${remainingItemIds.length} remaining items`,
    'Retry job recovery not yet implemented',
  ];
  for (const notice of notices) {
    console.log(notice);
  }
}

View File

@@ -75,6 +75,10 @@ export async function processWithRetry<T, R>(
retryDelay?: number; retryDelay?: number;
onProgress?: (completed: number, total: number, result?: R) => void; onProgress?: (completed: number, total: number, result?: R) => void;
onRetry?: (item: T, error: Error, attempt: number) => void; onRetry?: (item: T, error: Error, attempt: number) => void;
jobId?: string; // Optional job ID for checkpointing
getItemId?: (item: T) => string; // Function to get a unique ID for each item
onCheckpoint?: (jobId: string, completedItemId: string) => Promise<void>; // Callback for checkpointing
checkpointInterval?: number; // How many items to process before checkpointing
} = {} } = {}
): Promise<R[]> { ): Promise<R[]> {
const { const {
@@ -82,16 +86,37 @@ export async function processWithRetry<T, R>(
maxRetries = 3, maxRetries = 3,
retryDelay = 1000, retryDelay = 1000,
onProgress, onProgress,
onRetry onRetry,
jobId,
getItemId,
onCheckpoint,
checkpointInterval = 1 // Default to checkpointing after each item
} = options; } = options;
// Track checkpoint counter
let itemsProcessedSinceLastCheckpoint = 0;
// Wrap the process function with retry logic // Wrap the process function with retry logic
const processWithRetryLogic = async (item: T): Promise<R> => { const processWithRetryLogic = async (item: T): Promise<R> => {
let lastError: Error | null = null; let lastError: Error | null = null;
for (let attempt = 1; attempt <= maxRetries + 1; attempt++) { for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
try { try {
return await processItem(item); const result = await processItem(item);
// Handle checkpointing if enabled
if (jobId && getItemId && onCheckpoint) {
const itemId = getItemId(item);
itemsProcessedSinceLastCheckpoint++;
// Checkpoint based on the interval
if (itemsProcessedSinceLastCheckpoint >= checkpointInterval) {
await onCheckpoint(jobId, itemId);
itemsProcessedSinceLastCheckpoint = 0;
}
}
return result;
} catch (error) { } catch (error) {
lastError = error instanceof Error ? error : new Error(String(error)); lastError = error instanceof Error ? error : new Error(String(error));
@@ -113,10 +138,155 @@ export async function processWithRetry<T, R>(
throw lastError || new Error('Unknown error occurred'); throw lastError || new Error('Unknown error occurred');
}; };
return processInParallel( const results = await processInParallel(
items, items,
processWithRetryLogic, processWithRetryLogic,
concurrencyLimit, concurrencyLimit,
onProgress onProgress
); );
// Final checkpoint if there are remaining items since the last checkpoint
if (jobId && getItemId && onCheckpoint && itemsProcessedSinceLastCheckpoint > 0) {
// We don't have a specific item ID for the final checkpoint, so we'll use a placeholder
await onCheckpoint(jobId, 'final');
}
return results;
}
/**
 * Processes items in parallel while recording progress on a mirror job, so
 * an interrupted run can later be resumed from its last checkpoint.
 *
 * A new job is created unless `resumeFromJobId` is given. When resuming,
 * items already listed in the job's `completedItemIds` are skipped. On
 * success the job is marked "mirrored"; on failure it is marked "failed"
 * and the original error is rethrown.
 *
 * @param items - All items belonging to the job, including completed ones
 *   when resuming.
 * @param processItem - Worker invoked for each item still to process.
 * @param options - `userId`, `jobType`, `getItemId` and `getItemName` are
 *   required. `batchId` is stored on newly created jobs. `resumeFromJobId`
 *   resumes an existing job. `checkpointInterval` defaults to 5. All other
 *   options are forwarded to `processWithRetry`.
 * @returns The results for the items processed in this run.
 * @throws When `resumeFromJobId` does not match an existing job, or when
 *   processing fails.
 */
export async function processWithResilience<T, R>(
  items: T[],
  processItem: (item: T) => Promise<R>,
  options: {
    concurrencyLimit?: number;
    maxRetries?: number;
    retryDelay?: number;
    onProgress?: (completed: number, total: number, result?: R) => void;
    onRetry?: (item: T, error: Error, attempt: number) => void;
    userId: string; // Required for creating mirror jobs
    jobType: "mirror" | "sync" | "retry";
    getItemId: (item: T) => string; // Required function to get a unique ID for each item
    getItemName: (item: T) => string; // Required function to get a display name for each item
    checkpointInterval?: number;
    resumeFromJobId?: string; // Optional job ID to resume from
    batchId?: string; // Optional ID grouping related jobs; stored on newly created jobs
  }
): Promise<R[]> {
  const {
    userId,
    jobType,
    getItemId,
    getItemName,
    resumeFromJobId,
    // Pulled out so it reaches createMirrorJob and is not forwarded to
    // processWithRetry as an unknown option.
    batchId,
    checkpointInterval = 5,
    ...otherOptions
  } = options;

  // Import helpers for job management
  const { createMirrorJob, updateMirrorJobProgress } = await import('@/lib/helpers');

  // Get item IDs for all items
  const allItemIds = items.map(getItemId);

  // Create or resume a job
  let jobId: string;
  let completedItemIds: string[] = [];
  let itemsToProcess = [...items];

  if (resumeFromJobId) {
    // We're resuming an existing job
    jobId = resumeFromJobId;

    // Get the job from the database to find completed items
    const { db, mirrorJobs } = await import('@/lib/db');
    const { eq } = await import('drizzle-orm');
    const [job] = await db
      .select()
      .from(mirrorJobs)
      .where(eq(mirrorJobs.id, resumeFromJobId));

    // Without the job row every later checkpoint would fail, and those
    // failures would be counted as item failures. Stop before doing any work.
    if (!job) {
      throw new Error(`Cannot resume job ${resumeFromJobId}: job not found`);
    }

    completedItemIds = job.completedItemIds ?? [];

    // Filter out already completed items
    itemsToProcess = items.filter(item => !completedItemIds.includes(getItemId(item)));

    console.log(`Resuming job ${jobId} with ${itemsToProcess.length} remaining items`);

    // Update the job to show it's being resumed
    await updateMirrorJobProgress({
      jobId,
      message: `Resuming job with ${itemsToProcess.length} remaining items`,
      details: `Job is being resumed. ${completedItemIds.length} of ${items.length} items were already processed.`,
      inProgress: true,
    });
  } else {
    // Create a new job
    jobId = await createMirrorJob({
      userId,
      message: `Started ${jobType} job with ${items.length} items`,
      details: `Processing ${items.length} items in parallel with checkpointing`,
      status: "mirroring",
      jobType,
      batchId,
      totalItems: items.length,
      itemIds: allItemIds,
      inProgress: true,
    });

    console.log(`Created new job ${jobId} with ${items.length} items`);
  }

  // Define the checkpoint function
  const onCheckpoint = async (jobId: string, completedItemId: string) => {
    const completedItem = items.find(item => getItemId(item) === completedItemId);

    if (completedItem === undefined) {
      // The ID is not one of this job's items (for example a placeholder
      // passed for a closing checkpoint). Refresh the checkpoint timestamp
      // without recording a bogus completed item.
      await updateMirrorJobProgress({ jobId });
      return;
    }

    await updateMirrorJobProgress({
      jobId,
      completedItemId,
      message: `Processed item: ${getItemName(completedItem)}`,
    });
  };

  try {
    // Process the items with checkpointing
    const results = await processWithRetry(
      itemsToProcess,
      processItem,
      {
        ...otherOptions,
        jobId,
        getItemId,
        onCheckpoint,
        checkpointInterval,
      }
    );

    // Mark the job as completed
    await updateMirrorJobProgress({
      jobId,
      status: "mirrored",
      message: `Completed ${jobType} job with ${items.length} items`,
      inProgress: false,
      isCompleted: true,
    });

    return results;
  } catch (error) {
    // Mark the job as failed. A failure while recording that state must not
    // replace the error that actually caused the job to fail.
    try {
      await updateMirrorJobProgress({
        jobId,
        status: "failed",
        message: `Failed ${jobType} job: ${error instanceof Error ? error.message : String(error)}`,
        inProgress: false,
        isCompleted: true,
      });
    } catch (updateError) {
      console.error(`Could not mark job ${jobId} as failed:`, updateError);
    }

    throw error;
  }
}

22
src/middleware.ts Normal file
View File

@@ -0,0 +1,22 @@
import { defineMiddleware } from 'astro:middleware';
import { initializeRecovery } from './lib/recovery';
// Set once recovery has been started, so it runs at most once per server process.
let recoveryInitialized = false;

/**
 * Request middleware that starts the recovery system on the first request
 * handled by this server process.
 *
 * The flag is set before recovery starts, so requests arriving while
 * recovery is still running cannot start it a second time. Recovery runs in
 * the background: it may process whole interrupted jobs, and the request
 * that triggers it should not have to wait for that work to finish.
 */
export const onRequest = defineMiddleware(async (context, next) => {
  if (!recoveryInitialized) {
    recoveryInitialized = true;
    console.log('Initializing recovery system from middleware...');

    // Deliberately not awaited; the outcome is reported through the logs.
    void initializeRecovery()
      .then(() => {
        console.log('Recovery system initialized successfully');
      })
      .catch((error) => {
        console.error('Error initializing recovery system:', error);
      });
  }

  // Continue with the request
  return next();
});

View File

@@ -6,8 +6,8 @@ import { createGitHubClient } from "@/lib/github";
import { mirrorGitHubOrgToGitea } from "@/lib/gitea"; import { mirrorGitHubOrgToGitea } from "@/lib/gitea";
import { repoStatusEnum } from "@/types/Repository"; import { repoStatusEnum } from "@/types/Repository";
import { type MembershipRole } from "@/types/organizations"; import { type MembershipRole } from "@/types/organizations";
import { processWithRetry } from "@/lib/utils/concurrency"; import { processWithResilience } from "@/lib/utils/concurrency";
import { createMirrorJob } from "@/lib/helpers"; import { v4 as uuidv4 } from "uuid";
export const POST: APIRoute = async ({ request }) => { export const POST: APIRoute = async ({ request }) => {
try { try {
@@ -63,7 +63,7 @@ export const POST: APIRoute = async ({ request }) => {
); );
} }
// Fire async mirroring without blocking response, using parallel processing // Fire async mirroring without blocking response, using parallel processing with resilience
setTimeout(async () => { setTimeout(async () => {
if (!config.githubConfig.token) { if (!config.githubConfig.token) {
throw new Error("GitHub token is missing in config."); throw new Error("GitHub token is missing in config.");
@@ -76,8 +76,11 @@ export const POST: APIRoute = async ({ request }) => {
// Using a lower concurrency for organizations since each org might contain many repos // Using a lower concurrency for organizations since each org might contain many repos
const CONCURRENCY_LIMIT = 2; const CONCURRENCY_LIMIT = 2;
// Process organizations in parallel with retry capability // Generate a batch ID to group related organizations
await processWithRetry( const batchId = uuidv4();
// Process organizations in parallel with resilience to container restarts
await processWithResilience(
orgs, orgs,
async (org) => { async (org) => {
// Prepare organization data // Prepare organization data
@@ -92,16 +95,6 @@ export const POST: APIRoute = async ({ request }) => {
// Log the start of mirroring // Log the start of mirroring
console.log(`Starting mirror for organization: ${org.name}`); console.log(`Starting mirror for organization: ${org.name}`);
// Create a mirror job entry to track progress
await createMirrorJob({
userId: config.userId || "",
organizationId: org.id,
organizationName: org.name,
message: `Started mirroring organization: ${org.name}`,
details: `Organization ${org.name} is now in the mirroring queue.`,
status: "mirroring",
});
// Mirror the organization // Mirror the organization
await mirrorGitHubOrgToGitea({ await mirrorGitHubOrgToGitea({
config, config,
@@ -112,9 +105,15 @@ export const POST: APIRoute = async ({ request }) => {
return org; return org;
}, },
{ {
userId: config.userId || "",
jobType: "mirror",
batchId,
getItemId: (org) => org.id,
getItemName: (org) => org.name,
concurrencyLimit: CONCURRENCY_LIMIT, concurrencyLimit: CONCURRENCY_LIMIT,
maxRetries: 2, maxRetries: 2,
retryDelay: 3000, retryDelay: 3000,
checkpointInterval: 1, // Checkpoint after each organization
onProgress: (completed, total, result) => { onProgress: (completed, total, result) => {
const percentComplete = Math.round((completed / total) * 100); const percentComplete = Math.round((completed / total) * 100);
console.log(`Organization mirroring progress: ${percentComplete}% (${completed}/${total})`); console.log(`Organization mirroring progress: ${percentComplete}% (${completed}/${total})`);

View File

@@ -8,8 +8,8 @@ import {
mirrorGitHubOrgRepoToGiteaOrg, mirrorGitHubOrgRepoToGiteaOrg,
} from "@/lib/gitea"; } from "@/lib/gitea";
import { createGitHubClient } from "@/lib/github"; import { createGitHubClient } from "@/lib/github";
import { processWithRetry } from "@/lib/utils/concurrency"; import { processWithResilience } from "@/lib/utils/concurrency";
import { createMirrorJob } from "@/lib/helpers"; import { v4 as uuidv4 } from "uuid";
export const POST: APIRoute = async ({ request }) => { export const POST: APIRoute = async ({ request }) => {
try { try {
@@ -65,7 +65,7 @@ export const POST: APIRoute = async ({ request }) => {
); );
} }
// Start async mirroring in background with parallel processing // Start async mirroring in background with parallel processing and resilience
setTimeout(async () => { setTimeout(async () => {
if (!config.githubConfig.token) { if (!config.githubConfig.token) {
throw new Error("GitHub token is missing."); throw new Error("GitHub token is missing.");
@@ -77,8 +77,11 @@ export const POST: APIRoute = async ({ request }) => {
// Define the concurrency limit - adjust based on API rate limits // Define the concurrency limit - adjust based on API rate limits
const CONCURRENCY_LIMIT = 3; const CONCURRENCY_LIMIT = 3;
// Process repositories in parallel with retry capability // Generate a batch ID to group related repositories
await processWithRetry( const batchId = uuidv4();
// Process repositories in parallel with resilience to container restarts
await processWithResilience(
repos, repos,
async (repo) => { async (repo) => {
// Prepare repository data // Prepare repository data
@@ -96,16 +99,6 @@ export const POST: APIRoute = async ({ request }) => {
// Log the start of mirroring // Log the start of mirroring
console.log(`Starting mirror for repository: ${repo.name}`); console.log(`Starting mirror for repository: ${repo.name}`);
// Create a mirror job entry to track progress
await createMirrorJob({
userId: config.userId || "",
repositoryId: repo.id,
repositoryName: repo.name,
message: `Started mirroring repository: ${repo.name}`,
details: `Repository ${repo.name} is now in the mirroring queue.`,
status: "mirroring",
});
// Mirror the repository based on whether it's in an organization // Mirror the repository based on whether it's in an organization
if (repo.organization && config.githubConfig.preserveOrgStructure) { if (repo.organization && config.githubConfig.preserveOrgStructure) {
await mirrorGitHubOrgRepoToGiteaOrg({ await mirrorGitHubOrgRepoToGiteaOrg({
@@ -125,9 +118,15 @@ export const POST: APIRoute = async ({ request }) => {
return repo; return repo;
}, },
{ {
userId: config.userId || "",
jobType: "mirror",
batchId,
getItemId: (repo) => repo.id,
getItemName: (repo) => repo.name,
concurrencyLimit: CONCURRENCY_LIMIT, concurrencyLimit: CONCURRENCY_LIMIT,
maxRetries: 2, maxRetries: 2,
retryDelay: 2000, retryDelay: 2000,
checkpointInterval: 1, // Checkpoint after each repository
onProgress: (completed, total, result) => { onProgress: (completed, total, result) => {
const percentComplete = Math.round((completed / total) * 100); const percentComplete = Math.round((completed / total) * 100);
console.log(`Mirroring progress: ${percentComplete}% (${completed}/${total})`); console.log(`Mirroring progress: ${percentComplete}% (${completed}/${total})`);

View File

@@ -5,8 +5,8 @@ import { eq, inArray } from "drizzle-orm";
import { repositoryVisibilityEnum, repoStatusEnum } from "@/types/Repository"; import { repositoryVisibilityEnum, repoStatusEnum } from "@/types/Repository";
import { syncGiteaRepo } from "@/lib/gitea"; import { syncGiteaRepo } from "@/lib/gitea";
import type { SyncRepoResponse } from "@/types/sync"; import type { SyncRepoResponse } from "@/types/sync";
import { processWithRetry } from "@/lib/utils/concurrency"; import { processWithResilience } from "@/lib/utils/concurrency";
import { createMirrorJob } from "@/lib/helpers"; import { v4 as uuidv4 } from "uuid";
export const POST: APIRoute = async ({ request }) => { export const POST: APIRoute = async ({ request }) => {
try { try {
@@ -62,13 +62,16 @@ export const POST: APIRoute = async ({ request }) => {
); );
} }
// Start async mirroring in background with parallel processing // Start async mirroring in background with parallel processing and resilience
setTimeout(async () => { setTimeout(async () => {
// Define the concurrency limit - adjust based on API rate limits // Define the concurrency limit - adjust based on API rate limits
const CONCURRENCY_LIMIT = 5; const CONCURRENCY_LIMIT = 5;
// Process repositories in parallel with retry capability // Generate a batch ID to group related repositories
await processWithRetry( const batchId = uuidv4();
// Process repositories in parallel with resilience to container restarts
await processWithResilience(
repos, repos,
async (repo) => { async (repo) => {
// Prepare repository data // Prepare repository data
@@ -85,16 +88,6 @@ export const POST: APIRoute = async ({ request }) => {
// Log the start of syncing // Log the start of syncing
console.log(`Starting sync for repository: ${repo.name}`); console.log(`Starting sync for repository: ${repo.name}`);
// Create a mirror job entry to track progress
await createMirrorJob({
userId: config.userId || "",
repositoryId: repo.id,
repositoryName: repo.name,
message: `Started syncing repository: ${repo.name}`,
details: `Repository ${repo.name} is now in the syncing queue.`,
status: "syncing",
});
// Sync the repository // Sync the repository
await syncGiteaRepo({ await syncGiteaRepo({
config, config,
@@ -104,9 +97,15 @@ export const POST: APIRoute = async ({ request }) => {
return repo; return repo;
}, },
{ {
userId: config.userId || "",
jobType: "sync",
batchId,
getItemId: (repo) => repo.id,
getItemName: (repo) => repo.name,
concurrencyLimit: CONCURRENCY_LIMIT, concurrencyLimit: CONCURRENCY_LIMIT,
maxRetries: 2, maxRetries: 2,
retryDelay: 2000, retryDelay: 2000,
checkpointInterval: 1, // Checkpoint after each repository
onProgress: (completed, total, result) => { onProgress: (completed, total, result) => {
const percentComplete = Math.round((completed / total) * 100); const percentComplete = Math.round((completed / total) * 100);
console.log(`Syncing progress: ${percentComplete}% (${completed}/${total})`); console.log(`Syncing progress: ${percentComplete}% (${completed}/${total})`);