feat: Implement comprehensive job recovery and resume process improvements

- Added a startup recovery script to handle interrupted jobs before application startup.
- Enhanced recovery system with database connection validation and stale job cleanup.
- Improved middleware to check for recovery needs and handle recovery during requests.
- Updated health check endpoint to include recovery system status and metrics.
- Introduced test scripts for verifying recovery functionality and job state management.
- Enhanced logging and error handling throughout the recovery process.
This commit is contained in:
Arunavo Ray
2025-05-24 13:45:25 +05:30
parent 98610482ae
commit a988be1028
10 changed files with 1006 additions and 111 deletions

View File

@@ -4,101 +4,274 @@
*/
import { findInterruptedJobs, resumeInterruptedJob } from './helpers';
import { db, repositories, organizations } from './db';
import { eq } from 'drizzle-orm';
import { db, repositories, organizations, mirrorJobs } from './db';
import { eq, and, lt } from 'drizzle-orm';
import { mirrorGithubRepoToGitea, mirrorGitHubOrgRepoToGiteaOrg, syncGiteaRepo } from './gitea';
import { createGitHubClient } from './github';
import { processWithResilience } from './utils/concurrency';
import { repositoryVisibilityEnum, repoStatusEnum } from '@/types/Repository';
import type { Repository } from './db/schema';
// Recovery state tracking
let recoveryInProgress = false;
let lastRecoveryAttempt: Date | null = null;
/**
* Initialize the recovery system
* This should be called when the application starts
* Validates database connection before attempting recovery
*/
export async function initializeRecovery() {
console.log('Initializing recovery system...');
async function validateDatabaseConnection(): Promise<boolean> {
try {
// Find interrupted jobs
const interruptedJobs = await findInterruptedJobs();
if (interruptedJobs.length === 0) {
console.log('No interrupted jobs found.');
return;
}
console.log(`Found ${interruptedJobs.length} interrupted jobs. Starting recovery...`);
// Process each interrupted job
for (const job of interruptedJobs) {
const resumeData = await resumeInterruptedJob(job);
if (!resumeData) {
console.log(`Job ${job.id} could not be resumed.`);
continue;
}
const { job: updatedJob, remainingItemIds } = resumeData;
// Handle different job types
switch (updatedJob.jobType) {
case 'mirror':
await recoverMirrorJob(updatedJob, remainingItemIds);
break;
case 'sync':
await recoverSyncJob(updatedJob, remainingItemIds);
break;
case 'retry':
await recoverRetryJob(updatedJob, remainingItemIds);
break;
default:
console.log(`Unknown job type: ${updatedJob.jobType}`);
}
}
console.log('Recovery process completed.');
// Simple query to test database connectivity
await db.select().from(mirrorJobs).limit(1);
return true;
} catch (error) {
console.error('Error during recovery process:', error);
console.error('Database connection validation failed:', error);
return false;
}
}
/**
* Recover a mirror job
* Cleans up stale jobs that are too old to recover
*/
async function cleanupStaleJobs(): Promise<void> {
try {
const staleThreshold = new Date();
staleThreshold.setHours(staleThreshold.getHours() - 24); // Jobs older than 24 hours
const staleJobs = await db
.select()
.from(mirrorJobs)
.where(
and(
eq(mirrorJobs.inProgress, true),
lt(mirrorJobs.startedAt, staleThreshold)
)
);
if (staleJobs.length > 0) {
console.log(`Found ${staleJobs.length} stale jobs to clean up`);
// Mark stale jobs as failed
await db
.update(mirrorJobs)
.set({
inProgress: false,
completedAt: new Date(),
status: "failed",
message: "Job marked as failed due to being stale (older than 24 hours)"
})
.where(
and(
eq(mirrorJobs.inProgress, true),
lt(mirrorJobs.startedAt, staleThreshold)
)
);
console.log(`Cleaned up ${staleJobs.length} stale jobs`);
}
} catch (error) {
console.error('Error cleaning up stale jobs:', error);
}
}
/**
* Initialize the recovery system with enhanced error handling and resilience
* This should be called when the application starts
*/
export async function initializeRecovery(options: {
maxRetries?: number;
retryDelay?: number;
skipIfRecentAttempt?: boolean;
} = {}): Promise<boolean> {
const { maxRetries = 3, retryDelay = 5000, skipIfRecentAttempt = true } = options;
// Prevent concurrent recovery attempts
if (recoveryInProgress) {
console.log('Recovery already in progress, skipping...');
return false;
}
// Skip if recent attempt (within last 5 minutes) unless forced
if (skipIfRecentAttempt && lastRecoveryAttempt) {
const timeSinceLastAttempt = Date.now() - lastRecoveryAttempt.getTime();
if (timeSinceLastAttempt < 5 * 60 * 1000) {
console.log('Recent recovery attempt detected, skipping...');
return false;
}
}
recoveryInProgress = true;
lastRecoveryAttempt = new Date();
console.log('Initializing recovery system...');
let attempt = 0;
while (attempt < maxRetries) {
try {
attempt++;
console.log(`Recovery attempt ${attempt}/${maxRetries}`);
// Validate database connection first
const dbConnected = await validateDatabaseConnection();
if (!dbConnected) {
throw new Error('Database connection validation failed');
}
// Clean up stale jobs first
await cleanupStaleJobs();
// Find interrupted jobs
const interruptedJobs = await findInterruptedJobs();
if (interruptedJobs.length === 0) {
console.log('No interrupted jobs found.');
recoveryInProgress = false;
return true;
}
console.log(`Found ${interruptedJobs.length} interrupted jobs. Starting recovery...`);
// Process each interrupted job with individual error handling
let successCount = 0;
let failureCount = 0;
for (const job of interruptedJobs) {
try {
const resumeData = await resumeInterruptedJob(job);
if (!resumeData) {
console.log(`Job ${job.id} could not be resumed.`);
failureCount++;
continue;
}
const { job: updatedJob, remainingItemIds } = resumeData;
// Handle different job types
switch (updatedJob.jobType) {
case 'mirror':
await recoverMirrorJob(updatedJob, remainingItemIds);
break;
case 'sync':
await recoverSyncJob(updatedJob, remainingItemIds);
break;
case 'retry':
await recoverRetryJob(updatedJob, remainingItemIds);
break;
default:
console.log(`Unknown job type: ${updatedJob.jobType}`);
failureCount++;
continue;
}
successCount++;
} catch (jobError) {
console.error(`Error recovering individual job ${job.id}:`, jobError);
failureCount++;
// Mark the job as failed if recovery fails
try {
await db
.update(mirrorJobs)
.set({
inProgress: false,
completedAt: new Date(),
status: "failed",
message: `Job recovery failed: ${jobError instanceof Error ? jobError.message : String(jobError)}`
})
.where(eq(mirrorJobs.id, job.id));
} catch (updateError) {
console.error(`Failed to mark job ${job.id} as failed:`, updateError);
}
}
}
console.log(`Recovery process completed. Success: ${successCount}, Failures: ${failureCount}`);
recoveryInProgress = false;
return true;
} catch (error) {
console.error(`Recovery attempt ${attempt} failed:`, error);
if (attempt < maxRetries) {
console.log(`Retrying in ${retryDelay}ms...`);
await new Promise(resolve => setTimeout(resolve, retryDelay));
} else {
console.error('All recovery attempts failed');
recoveryInProgress = false;
return false;
}
}
}
recoveryInProgress = false;
return false;
}
/**
* Recover a mirror job with enhanced error handling
*/
async function recoverMirrorJob(job: any, remainingItemIds: string[]) {
console.log(`Recovering mirror job ${job.id} with ${remainingItemIds.length} remaining items`);
try {
// Get the config for this user
const [config] = await db
// Get the config for this user with better error handling
const configs = await db
.select()
.from(repositories)
.where(eq(repositories.userId, job.userId))
.limit(1);
if (!config || !config.configId) {
throw new Error('Config not found for user');
if (configs.length === 0) {
throw new Error(`No configuration found for user ${job.userId}`);
}
// Get repositories to process
const config = configs[0];
if (!config.configId) {
throw new Error(`Configuration missing configId for user ${job.userId}`);
}
// Get repositories to process with validation
const repos = await db
.select()
.from(repositories)
.where(eq(repositories.id, remainingItemIds));
if (repos.length === 0) {
throw new Error('No repositories found for the remaining item IDs');
console.warn(`No repositories found for remaining item IDs: ${remainingItemIds.join(', ')}`);
// Mark job as completed since there's nothing to process
await db
.update(mirrorJobs)
.set({
inProgress: false,
completedAt: new Date(),
status: "mirrored",
message: "Job completed - no repositories found to process"
})
.where(eq(mirrorJobs.id, job.id));
return;
}
// Create GitHub client
const octokit = createGitHubClient(config.githubConfig.token);
// Process repositories with resilience
console.log(`Found ${repos.length} repositories to process for recovery`);
// Validate GitHub configuration before creating client
if (!config.githubConfig?.token) {
throw new Error('GitHub token not found in configuration');
}
// Create GitHub client with error handling
let octokit;
try {
octokit = createGitHubClient(config.githubConfig.token);
} catch (error) {
throw new Error(`Failed to create GitHub client: ${error instanceof Error ? error.message : String(error)}`);
}
// Process repositories with resilience and reduced concurrency for recovery
await processWithResilience(
repos,
async (repo) => {
// Prepare repository data
// Prepare repository data with validation
const repoData = {
...repo,
status: repoStatusEnum.parse("imported"),
@@ -106,10 +279,10 @@ async function recoverMirrorJob(job: any, remainingItemIds: string[]) {
lastMirrored: repo.lastMirrored ?? undefined,
errorMessage: repo.errorMessage ?? undefined,
forkedFrom: repo.forkedFrom ?? undefined,
visibility: repositoryVisibilityEnum.parse(repo.visibility),
visibility: repositoryVisibilityEnum.parse(repo.visibility || "public"),
mirroredLocation: repo.mirroredLocation || "",
};
// Mirror the repository based on whether it's in an organization
if (repo.organization && config.githubConfig.preserveOrgStructure) {
await mirrorGitHubOrgRepoToGiteaOrg({
@@ -125,7 +298,7 @@ async function recoverMirrorJob(job: any, remainingItemIds: string[]) {
config,
});
}
return repo;
},
{
@@ -134,66 +307,102 @@ async function recoverMirrorJob(job: any, remainingItemIds: string[]) {
getItemId: (repo) => repo.id,
getItemName: (repo) => repo.name,
resumeFromJobId: job.id,
concurrencyLimit: 3,
maxRetries: 2,
retryDelay: 2000,
concurrencyLimit: 2, // Reduced concurrency for recovery to be more stable
maxRetries: 3, // Increased retries for recovery
retryDelay: 3000, // Longer delay for recovery
}
);
console.log(`Successfully recovered mirror job ${job.id}`);
} catch (error) {
console.error(`Error recovering mirror job ${job.id}:`, error);
// Mark the job as failed
try {
await db
.update(mirrorJobs)
.set({
inProgress: false,
completedAt: new Date(),
status: "failed",
message: `Mirror job recovery failed: ${error instanceof Error ? error.message : String(error)}`
})
.where(eq(mirrorJobs.id, job.id));
} catch (updateError) {
console.error(`Failed to mark mirror job ${job.id} as failed:`, updateError);
}
throw error; // Re-throw to be handled by the caller
}
}
/**
* Recover a sync job
* Recover a sync job with enhanced error handling
*/
async function recoverSyncJob(job: any, remainingItemIds: string[]) {
// Implementation similar to recoverMirrorJob but for sync operations
console.log(`Recovering sync job ${job.id} with ${remainingItemIds.length} remaining items`);
try {
// Get the config for this user
const [config] = await db
// Get the config for this user with better error handling
const configs = await db
.select()
.from(repositories)
.where(eq(repositories.userId, job.userId))
.limit(1);
if (!config || !config.configId) {
throw new Error('Config not found for user');
if (configs.length === 0) {
throw new Error(`No configuration found for user ${job.userId}`);
}
// Get repositories to process
const config = configs[0];
if (!config.configId) {
throw new Error(`Configuration missing configId for user ${job.userId}`);
}
// Get repositories to process with validation
const repos = await db
.select()
.from(repositories)
.where(eq(repositories.id, remainingItemIds));
if (repos.length === 0) {
throw new Error('No repositories found for the remaining item IDs');
console.warn(`No repositories found for remaining item IDs: ${remainingItemIds.join(', ')}`);
// Mark job as completed since there's nothing to process
await db
.update(mirrorJobs)
.set({
inProgress: false,
completedAt: new Date(),
status: "mirrored",
message: "Job completed - no repositories found to process"
})
.where(eq(mirrorJobs.id, job.id));
return;
}
// Process repositories with resilience
console.log(`Found ${repos.length} repositories to process for sync recovery`);
// Process repositories with resilience and reduced concurrency for recovery
await processWithResilience(
repos,
async (repo) => {
// Prepare repository data
// Prepare repository data with validation
const repoData = {
...repo,
status: repoStatusEnum.parse(repo.status),
status: repoStatusEnum.parse(repo.status || "imported"),
organization: repo.organization ?? undefined,
lastMirrored: repo.lastMirrored ?? undefined,
errorMessage: repo.errorMessage ?? undefined,
forkedFrom: repo.forkedFrom ?? undefined,
visibility: repositoryVisibilityEnum.parse(repo.visibility),
visibility: repositoryVisibilityEnum.parse(repo.visibility || "public"),
};
// Sync the repository
await syncGiteaRepo({
config,
repository: repoData,
});
return repo;
},
{
@@ -202,23 +411,94 @@ async function recoverSyncJob(job: any, remainingItemIds: string[]) {
getItemId: (repo) => repo.id,
getItemName: (repo) => repo.name,
resumeFromJobId: job.id,
concurrencyLimit: 5,
maxRetries: 2,
retryDelay: 2000,
concurrencyLimit: 3, // Reduced concurrency for recovery
maxRetries: 3, // Increased retries for recovery
retryDelay: 3000, // Longer delay for recovery
}
);
console.log(`Successfully recovered sync job ${job.id}`);
} catch (error) {
console.error(`Error recovering sync job ${job.id}:`, error);
// Mark the job as failed
try {
await db
.update(mirrorJobs)
.set({
inProgress: false,
completedAt: new Date(),
status: "failed",
message: `Sync job recovery failed: ${error instanceof Error ? error.message : String(error)}`
})
.where(eq(mirrorJobs.id, job.id));
} catch (updateError) {
console.error(`Failed to mark sync job ${job.id} as failed:`, updateError);
}
throw error; // Re-throw to be handled by the caller
}
}
/**
* Recover a retry job
* Recover a retry job with enhanced error handling
*/
async function recoverRetryJob(job: any, remainingItemIds: string[]) {
// Implementation similar to recoverMirrorJob but for retry operations
console.log(`Recovering retry job ${job.id} with ${remainingItemIds.length} remaining items`);
// This would be similar to recoverMirrorJob but with retry-specific logic
console.log('Retry job recovery not yet implemented');
try {
// For now, retry jobs are treated similarly to mirror jobs
// In the future, this could have specific retry logic
await recoverMirrorJob(job, remainingItemIds);
console.log(`Successfully recovered retry job ${job.id}`);
} catch (error) {
console.error(`Error recovering retry job ${job.id}:`, error);
// Mark the job as failed
try {
await db
.update(mirrorJobs)
.set({
inProgress: false,
completedAt: new Date(),
status: "failed",
message: `Retry job recovery failed: ${error instanceof Error ? error.message : String(error)}`
})
.where(eq(mirrorJobs.id, job.id));
} catch (updateError) {
console.error(`Failed to mark retry job ${job.id} as failed:`, updateError);
}
throw error; // Re-throw to be handled by the caller
}
}
/**
* Get recovery system status
*/
export function getRecoveryStatus() {
return {
inProgress: recoveryInProgress,
lastAttempt: lastRecoveryAttempt,
};
}
/**
* Force recovery to run (bypassing recent attempt check)
*/
export async function forceRecovery(): Promise<boolean> {
return initializeRecovery({ skipIfRecentAttempt: false });
}
/**
* Check if there are any jobs that need recovery
*/
export async function hasJobsNeedingRecovery(): Promise<boolean> {
try {
const interruptedJobs = await findInterruptedJobs();
return interruptedJobs.length > 0;
} catch (error) {
console.error('Error checking for jobs needing recovery:', error);
return false;
}
}