feat: Implement graceful shutdown and enhanced job recovery

- Added shutdown handler in docker-entrypoint.sh to manage application termination signals.
- Introduced shutdown manager to track active jobs and ensure state persistence during shutdown.
- Enhanced cleanup service to support stopping and status retrieval.
- Integrated signal handlers for proper response to termination signals (SIGTERM, SIGINT, SIGHUP).
- Updated middleware to initialize shutdown manager and cleanup service.
- Created integration tests for graceful shutdown functionality, verifying job state preservation and recovery.
- Documented graceful shutdown process and configuration in GRACEFUL_SHUTDOWN.md and SHUTDOWN_PROCESS.md.
- Added new scripts for testing shutdown behavior and cleanup.
This commit is contained in:
Arunavo Ray
2025-05-24 23:06:28 +05:30
parent 4404af7d40
commit daf4ab6a93
10 changed files with 1243 additions and 12 deletions

View File

@@ -181,30 +181,41 @@ export async function runAutomaticCleanup(): Promise<CleanupResult[]> {
}
}
// Service state tracking
let cleanupIntervalId: NodeJS.Timeout | null = null;
let initialCleanupTimeoutId: NodeJS.Timeout | null = null;
let cleanupServiceRunning = false;
/**
* Start the cleanup service with periodic execution
* This should be called when the application starts
*/
export function startCleanupService() {
if (cleanupServiceRunning) {
console.log('⚠️ Cleanup service already running, skipping start');
return;
}
console.log('Starting background cleanup service...');
// Run cleanup every hour
const CLEANUP_INTERVAL = 60 * 60 * 1000; // 1 hour in milliseconds
// Run initial cleanup after 5 minutes to allow app to fully start
setTimeout(() => {
initialCleanupTimeoutId = setTimeout(() => {
runAutomaticCleanup().catch(error => {
console.error('Error in initial cleanup run:', error);
});
}, 5 * 60 * 1000); // 5 minutes
// Set up periodic cleanup
setInterval(() => {
cleanupIntervalId = setInterval(() => {
runAutomaticCleanup().catch(error => {
console.error('Error in periodic cleanup run:', error);
});
}, CLEANUP_INTERVAL);
cleanupServiceRunning = true;
console.log(`✅ Cleanup service started. Will run every ${CLEANUP_INTERVAL / 1000 / 60} minutes.`);
}
@@ -212,7 +223,36 @@ export function startCleanupService() {
* Stop the cleanup service (for testing or shutdown)
*/
export function stopCleanupService() {
// Note: In a real implementation, you'd want to track the interval ID
// and clear it here. For now, this is a placeholder.
console.log('Cleanup service stop requested (not implemented)');
if (!cleanupServiceRunning) {
console.log('Cleanup service is not running');
return;
}
console.log('🛑 Stopping cleanup service...');
// Clear the periodic interval
if (cleanupIntervalId) {
clearInterval(cleanupIntervalId);
cleanupIntervalId = null;
}
// Clear the initial timeout
if (initialCleanupTimeoutId) {
clearTimeout(initialCleanupTimeoutId);
initialCleanupTimeoutId = null;
}
cleanupServiceRunning = false;
console.log('✅ Cleanup service stopped');
}
/**
* Get cleanup service status
*/
export function getCleanupServiceStatus() {
return {
running: cleanupServiceRunning,
hasInterval: cleanupIntervalId !== null,
hasInitialTimeout: initialCleanupTimeoutId !== null,
};
}

240
src/lib/shutdown-manager.ts Normal file
View File

@@ -0,0 +1,240 @@
/**
* Shutdown Manager for Graceful Application Termination
*
* This module provides centralized shutdown coordination for the gitea-mirror application.
* It ensures that:
* - In-progress jobs are properly saved to the database
* - Database connections are closed cleanly
* - Background services are stopped gracefully
* - No data loss occurs during container restarts
*/
import { db, mirrorJobs } from './db';
import { eq, and } from 'drizzle-orm';
import type { MirrorJob } from './db/schema';
// Shutdown state tracking
let shutdownInProgress = false;
let shutdownStartTime: Date | null = null;
let shutdownCallbacks: Array<() => Promise<void>> = [];
let activeJobs = new Set<string>();
let shutdownTimeout: NodeJS.Timeout | null = null;
// Configuration
const SHUTDOWN_TIMEOUT = 30000; // 30 seconds max shutdown time
const JOB_SAVE_TIMEOUT = 10000; // 10 seconds to save job state
/**
* Register a callback to be executed during shutdown
*/
export function registerShutdownCallback(callback: () => Promise<void>): void {
shutdownCallbacks.push(callback);
}
/**
* Register an active job that needs to be tracked during shutdown
*/
export function registerActiveJob(jobId: string): void {
activeJobs.add(jobId);
console.log(`Registered active job: ${jobId} (${activeJobs.size} total active jobs)`);
}
/**
* Unregister a job when it completes normally
*/
export function unregisterActiveJob(jobId: string): void {
activeJobs.delete(jobId);
console.log(`Unregistered job: ${jobId} (${activeJobs.size} remaining active jobs)`);
}
/**
* Check if shutdown is currently in progress
*/
export function isShuttingDown(): boolean {
return shutdownInProgress;
}
/**
* Get shutdown status information
*/
export function getShutdownStatus() {
return {
inProgress: shutdownInProgress,
startTime: shutdownStartTime,
activeJobs: Array.from(activeJobs),
registeredCallbacks: shutdownCallbacks.length,
};
}
/**
* Save the current state of an active job to the database
*/
async function saveJobState(jobId: string): Promise<void> {
try {
console.log(`Saving state for job ${jobId}...`);
// Update the job to mark it as interrupted but not failed
await db
.update(mirrorJobs)
.set({
inProgress: false,
lastCheckpoint: new Date(),
message: 'Job interrupted by application shutdown - will resume on restart',
})
.where(eq(mirrorJobs.id, jobId));
console.log(`✅ Saved state for job ${jobId}`);
} catch (error) {
console.error(`❌ Failed to save state for job ${jobId}:`, error);
throw error;
}
}
/**
* Save all active jobs to the database
*/
async function saveAllActiveJobs(): Promise<void> {
if (activeJobs.size === 0) {
console.log('No active jobs to save');
return;
}
console.log(`Saving state for ${activeJobs.size} active jobs...`);
const savePromises = Array.from(activeJobs).map(async (jobId) => {
try {
await Promise.race([
saveJobState(jobId),
new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error(`Timeout saving job ${jobId}`)), JOB_SAVE_TIMEOUT);
})
]);
} catch (error) {
console.error(`Failed to save job ${jobId} within timeout:`, error);
// Continue with other jobs even if one fails
}
});
await Promise.allSettled(savePromises);
console.log('✅ Completed saving all active jobs');
}
/**
* Execute all registered shutdown callbacks
*/
async function executeShutdownCallbacks(): Promise<void> {
if (shutdownCallbacks.length === 0) {
console.log('No shutdown callbacks to execute');
return;
}
console.log(`Executing ${shutdownCallbacks.length} shutdown callbacks...`);
const callbackPromises = shutdownCallbacks.map(async (callback, index) => {
try {
await callback();
console.log(`✅ Shutdown callback ${index + 1} completed`);
} catch (error) {
console.error(`❌ Shutdown callback ${index + 1} failed:`, error);
// Continue with other callbacks even if one fails
}
});
await Promise.allSettled(callbackPromises);
console.log('✅ Completed all shutdown callbacks');
}
/**
* Perform graceful shutdown of the application
*/
export async function gracefulShutdown(signal: string = 'UNKNOWN'): Promise<void> {
if (shutdownInProgress) {
console.log('⚠️ Shutdown already in progress, ignoring additional signal');
return;
}
shutdownInProgress = true;
shutdownStartTime = new Date();
console.log(`\n🛑 Graceful shutdown initiated by signal: ${signal}`);
console.log(`📊 Shutdown status: ${activeJobs.size} active jobs, ${shutdownCallbacks.length} callbacks`);
// Set up shutdown timeout
shutdownTimeout = setTimeout(() => {
console.error(`❌ Shutdown timeout reached (${SHUTDOWN_TIMEOUT}ms), forcing exit`);
process.exit(1);
}, SHUTDOWN_TIMEOUT);
try {
// Step 1: Save all active job states
console.log('\n📝 Step 1: Saving active job states...');
await saveAllActiveJobs();
// Step 2: Execute shutdown callbacks (stop services, close connections, etc.)
console.log('\n🔧 Step 2: Executing shutdown callbacks...');
await executeShutdownCallbacks();
// Step 3: Close database connections
console.log('\n💾 Step 3: Closing database connections...');
// Note: Drizzle with bun:sqlite doesn't require explicit connection closing
// but we'll add this for completeness and future database changes
console.log('\n✅ Graceful shutdown completed successfully');
// Clear the timeout since we completed successfully
if (shutdownTimeout) {
clearTimeout(shutdownTimeout);
shutdownTimeout = null;
}
// Exit with success code
process.exit(0);
} catch (error) {
console.error('\n❌ Error during graceful shutdown:', error);
// Clear the timeout
if (shutdownTimeout) {
clearTimeout(shutdownTimeout);
shutdownTimeout = null;
}
// Exit with error code
process.exit(1);
}
}
/**
* Initialize the shutdown manager
* This should be called early in the application lifecycle
*/
export function initializeShutdownManager(): void {
console.log('🔧 Initializing shutdown manager...');
// Reset state in case of re-initialization
shutdownInProgress = false;
shutdownStartTime = null;
activeJobs.clear();
shutdownCallbacks = []; // Reset callbacks too
// Clear any existing timeout
if (shutdownTimeout) {
clearTimeout(shutdownTimeout);
shutdownTimeout = null;
}
console.log('✅ Shutdown manager initialized');
}
/**
* Force immediate shutdown (for emergencies)
*/
export function forceShutdown(exitCode: number = 1): void {
console.error('🚨 Force shutdown requested');
if (shutdownTimeout) {
clearTimeout(shutdownTimeout);
}
process.exit(exitCode);
}

141
src/lib/signal-handlers.ts Normal file
View File

@@ -0,0 +1,141 @@
/**
* Signal Handlers for Graceful Shutdown
*
* This module sets up proper signal handling for container environments.
* It ensures the application responds correctly to SIGTERM, SIGINT, and other signals.
*/
import { gracefulShutdown, isShuttingDown } from './shutdown-manager';
// Track if signal handlers have been registered
let signalHandlersRegistered = false;
/**
* Setup signal handlers for graceful shutdown
* This should be called early in the application lifecycle
*/
export function setupSignalHandlers(): void {
if (signalHandlersRegistered) {
console.log('⚠️ Signal handlers already registered, skipping');
return;
}
console.log('🔧 Setting up signal handlers for graceful shutdown...');
// Handle SIGTERM (Docker stop, Kubernetes termination)
process.on('SIGTERM', () => {
console.log('\n📡 Received SIGTERM signal');
if (!isShuttingDown()) {
gracefulShutdown('SIGTERM').catch((error) => {
console.error('Error during SIGTERM shutdown:', error);
process.exit(1);
});
}
});
// Handle SIGINT (Ctrl+C)
process.on('SIGINT', () => {
console.log('\n📡 Received SIGINT signal');
if (!isShuttingDown()) {
gracefulShutdown('SIGINT').catch((error) => {
console.error('Error during SIGINT shutdown:', error);
process.exit(1);
});
}
});
// Handle SIGHUP (terminal hangup)
process.on('SIGHUP', () => {
console.log('\n📡 Received SIGHUP signal');
if (!isShuttingDown()) {
gracefulShutdown('SIGHUP').catch((error) => {
console.error('Error during SIGHUP shutdown:', error);
process.exit(1);
});
}
});
// Handle uncaught exceptions
process.on('uncaughtException', (error) => {
console.error('\n💥 Uncaught Exception:', error);
console.error('Stack trace:', error.stack);
if (!isShuttingDown()) {
console.log('Initiating emergency shutdown due to uncaught exception...');
gracefulShutdown('UNCAUGHT_EXCEPTION').catch((shutdownError) => {
console.error('Error during emergency shutdown:', shutdownError);
process.exit(1);
});
} else {
// If already shutting down, force exit
console.error('Uncaught exception during shutdown, forcing exit');
process.exit(1);
}
});
// Handle unhandled promise rejections
process.on('unhandledRejection', (reason, promise) => {
console.error('\n💥 Unhandled Promise Rejection at:', promise);
console.error('Reason:', reason);
if (!isShuttingDown()) {
console.log('Initiating emergency shutdown due to unhandled rejection...');
gracefulShutdown('UNHANDLED_REJECTION').catch((shutdownError) => {
console.error('Error during emergency shutdown:', shutdownError);
process.exit(1);
});
} else {
// If already shutting down, force exit
console.error('Unhandled rejection during shutdown, forcing exit');
process.exit(1);
}
});
// Handle process warnings (for debugging)
process.on('warning', (warning) => {
console.warn('⚠️ Process Warning:', warning.name);
console.warn('Message:', warning.message);
if (warning.stack) {
console.warn('Stack:', warning.stack);
}
});
signalHandlersRegistered = true;
console.log('✅ Signal handlers registered successfully');
}
/**
* Remove signal handlers (for testing)
*/
export function removeSignalHandlers(): void {
if (!signalHandlersRegistered) {
return;
}
console.log('🔧 Removing signal handlers...');
process.removeAllListeners('SIGTERM');
process.removeAllListeners('SIGINT');
process.removeAllListeners('SIGHUP');
process.removeAllListeners('uncaughtException');
process.removeAllListeners('unhandledRejection');
process.removeAllListeners('warning');
signalHandlersRegistered = false;
console.log('✅ Signal handlers removed');
}
/**
* Check if signal handlers are registered
*/
export function areSignalHandlersRegistered(): boolean {
return signalHandlersRegistered;
}
/**
* Send a test signal to the current process (for testing)
*/
export function sendTestSignal(signal: NodeJS.Signals = 'SIGTERM'): void {
console.log(`🧪 Sending test signal: ${signal}`);
process.kill(process.pid, signal);
}

View File

@@ -102,6 +102,16 @@ export async function processWithRetry<T, R>(
for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
try {
// Check for shutdown before processing each item (only in production)
try {
const { isShuttingDown } = await import('@/lib/shutdown-manager');
if (isShuttingDown()) {
throw new Error('Processing interrupted by application shutdown');
}
} catch (importError) {
// Ignore import errors during testing
}
const result = await processItem(item);
// Handle checkpointing if enabled
@@ -185,9 +195,24 @@ export async function processWithResilience<T, R>(
...otherOptions
} = options;
// Import helpers for job management
// Import helpers for job management and shutdown handling
const { createMirrorJob, updateMirrorJobProgress } = await import('@/lib/helpers');
// Import shutdown manager (with fallback for testing)
let registerActiveJob: (jobId: string) => void = () => {};
let unregisterActiveJob: (jobId: string) => void = () => {};
let isShuttingDown: () => boolean = () => false;
try {
const shutdownManager = await import('@/lib/shutdown-manager');
registerActiveJob = shutdownManager.registerActiveJob;
unregisterActiveJob = shutdownManager.unregisterActiveJob;
isShuttingDown = shutdownManager.isShuttingDown;
} catch (importError) {
// Use fallback functions during testing
console.log('Using fallback shutdown manager functions (testing mode)');
}
// Get item IDs for all items
const allItemIds = items.map(getItemId);
@@ -240,6 +265,9 @@ export async function processWithResilience<T, R>(
console.log(`Created new job ${jobId} with ${items.length} items`);
}
// Register the job with the shutdown manager
registerActiveJob(jobId);
// Define the checkpoint function
const onCheckpoint = async (jobId: string, completedItemId: string) => {
const itemName = items.find(item => getItemId(item) === completedItemId)
@@ -254,6 +282,12 @@ export async function processWithResilience<T, R>(
};
try {
// Check if shutdown is in progress before starting
if (isShuttingDown()) {
console.log(`⚠️ Shutdown in progress, aborting job ${jobId}`);
throw new Error('Job aborted due to application shutdown');
}
// Process the items with checkpointing
const results = await processWithRetry(
itemsToProcess,
@@ -276,17 +310,27 @@ export async function processWithResilience<T, R>(
isCompleted: true,
});
// Unregister the job from shutdown manager
unregisterActiveJob(jobId);
return results;
} catch (error) {
// Mark the job as failed
// Mark the job as failed (unless it was interrupted by shutdown)
const isShutdownError = error instanceof Error && error.message.includes('shutdown');
await updateMirrorJobProgress({
jobId,
status: "failed",
message: `Failed ${jobType} job: ${error instanceof Error ? error.message : String(error)}`,
status: isShutdownError ? "imported" : "failed", // Keep as imported if shutdown interrupted
message: isShutdownError
? 'Job interrupted by application shutdown - will resume on restart'
: `Failed ${jobType} job: ${error instanceof Error ? error.message : String(error)}`,
inProgress: false,
isCompleted: true,
isCompleted: !isShutdownError, // Don't mark as completed if shutdown interrupted
});
// Unregister the job from shutdown manager
unregisterActiveJob(jobId);
throw error;
}
}

View File

@@ -1,13 +1,30 @@
import { defineMiddleware } from 'astro:middleware';
import { initializeRecovery, hasJobsNeedingRecovery, getRecoveryStatus } from './lib/recovery';
import { startCleanupService } from './lib/cleanup-service';
import { startCleanupService, stopCleanupService } from './lib/cleanup-service';
import { initializeShutdownManager, registerShutdownCallback } from './lib/shutdown-manager';
import { setupSignalHandlers } from './lib/signal-handlers';
// Flag to track if recovery has been initialized
let recoveryInitialized = false;
let recoveryAttempted = false;
let cleanupServiceStarted = false;
let shutdownManagerInitialized = false;
export const onRequest = defineMiddleware(async (context, next) => {
// Initialize shutdown manager and signal handlers first
if (!shutdownManagerInitialized) {
try {
console.log('🔧 Initializing shutdown manager and signal handlers...');
initializeShutdownManager();
setupSignalHandlers();
shutdownManagerInitialized = true;
console.log('✅ Shutdown manager and signal handlers initialized');
} catch (error) {
console.error('❌ Failed to initialize shutdown manager:', error);
// Continue anyway - this shouldn't block the application
}
}
// Initialize recovery system only once when the server starts
// This is a fallback in case the startup script didn't run
if (!recoveryInitialized && !recoveryAttempted) {
@@ -60,6 +77,13 @@ export const onRequest = defineMiddleware(async (context, next) => {
try {
console.log('Starting automatic database cleanup service...');
startCleanupService();
// Register cleanup service shutdown callback
registerShutdownCallback(async () => {
console.log('🛑 Shutting down cleanup service...');
stopCleanupService();
});
cleanupServiceStarted = true;
} catch (error) {
console.error('Failed to start cleanup service:', error);