mirror of
https://github.com/RayLabsHQ/gitea-mirror.git
synced 2025-12-08 04:26:44 +03:00
Implement parallel processing with retry logic for repository mirroring and syncing operations
This commit is contained in:
229
src/lib/gitea.ts
229
src/lib/gitea.ts
@@ -601,11 +601,22 @@ export async function mirrorGitHubOrgToGitea({
|
|||||||
.from(repositories)
|
.from(repositories)
|
||||||
.where(eq(repositories.organization, organization.name));
|
.where(eq(repositories.organization, organization.name));
|
||||||
|
|
||||||
for (const repo of orgRepos) {
|
if (orgRepos.length === 0) {
|
||||||
await mirrorGitHubRepoToGiteaOrg({
|
console.log(`No repositories found for organization ${organization.name}`);
|
||||||
octokit,
|
return;
|
||||||
config,
|
}
|
||||||
repository: {
|
|
||||||
|
console.log(`Mirroring ${orgRepos.length} repositories for organization ${organization.name}`);
|
||||||
|
|
||||||
|
// Import the processWithRetry function
|
||||||
|
const { processWithRetry } = await import("@/lib/utils/concurrency");
|
||||||
|
|
||||||
|
// Process repositories in parallel with concurrency control
|
||||||
|
await processWithRetry(
|
||||||
|
orgRepos,
|
||||||
|
async (repo) => {
|
||||||
|
// Prepare repository data
|
||||||
|
const repoData = {
|
||||||
...repo,
|
...repo,
|
||||||
status: repo.status as RepoStatus,
|
status: repo.status as RepoStatus,
|
||||||
visibility: repo.visibility as RepositoryVisibility,
|
visibility: repo.visibility as RepositoryVisibility,
|
||||||
@@ -614,11 +625,37 @@ export async function mirrorGitHubOrgToGitea({
|
|||||||
organization: repo.organization ?? undefined,
|
organization: repo.organization ?? undefined,
|
||||||
forkedFrom: repo.forkedFrom ?? undefined,
|
forkedFrom: repo.forkedFrom ?? undefined,
|
||||||
mirroredLocation: repo.mirroredLocation || "",
|
mirroredLocation: repo.mirroredLocation || "",
|
||||||
|
};
|
||||||
|
|
||||||
|
// Log the start of mirroring
|
||||||
|
console.log(`Starting mirror for repository: ${repo.name} in organization ${organization.name}`);
|
||||||
|
|
||||||
|
// Mirror the repository
|
||||||
|
await mirrorGitHubRepoToGiteaOrg({
|
||||||
|
octokit,
|
||||||
|
config,
|
||||||
|
repository: repoData,
|
||||||
|
giteaOrgId,
|
||||||
|
orgName: organization.name,
|
||||||
|
});
|
||||||
|
|
||||||
|
return repo;
|
||||||
|
},
|
||||||
|
{
|
||||||
|
concurrencyLimit: 3, // Process 3 repositories at a time
|
||||||
|
maxRetries: 2,
|
||||||
|
retryDelay: 2000,
|
||||||
|
onProgress: (completed, total, result) => {
|
||||||
|
const percentComplete = Math.round((completed / total) * 100);
|
||||||
|
if (result) {
|
||||||
|
console.log(`Mirrored repository "${result.name}" in organization ${organization.name} (${completed}/${total}, ${percentComplete}%)`);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
giteaOrgId,
|
onRetry: (repo, error, attempt) => {
|
||||||
orgName: organization.name,
|
console.log(`Retrying repository ${repo.name} in organization ${organization.name} (attempt ${attempt}): ${error.message}`);
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
|
);
|
||||||
|
|
||||||
console.log(`Organization ${organization.name} mirrored successfully`);
|
console.log(`Organization ${organization.name} mirrored successfully`);
|
||||||
|
|
||||||
@@ -837,7 +874,15 @@ export const mirrorGitRepoIssuesToGitea = async ({
|
|||||||
(res) => res.data
|
(res) => res.data
|
||||||
);
|
);
|
||||||
|
|
||||||
console.log(`Mirroring ${issues.length} issues from ${repository.fullName}`);
|
// Filter out pull requests
|
||||||
|
const filteredIssues = issues.filter(issue => !(issue as any).pull_request);
|
||||||
|
|
||||||
|
console.log(`Mirroring ${filteredIssues.length} issues from ${repository.fullName}`);
|
||||||
|
|
||||||
|
if (filteredIssues.length === 0) {
|
||||||
|
console.log(`No issues to mirror for ${repository.fullName}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Get existing labels from Gitea
|
// Get existing labels from Gitea
|
||||||
const giteaLabelsRes = await superagent
|
const giteaLabelsRes = await superagent
|
||||||
@@ -851,58 +896,60 @@ export const mirrorGitRepoIssuesToGitea = async ({
|
|||||||
giteaLabels.map((label: any) => [label.name, label.id])
|
giteaLabels.map((label: any) => [label.name, label.id])
|
||||||
);
|
);
|
||||||
|
|
||||||
for (const issue of issues) {
|
// Import the processWithRetry function
|
||||||
if ((issue as any).pull_request) {
|
const { processWithRetry } = await import("@/lib/utils/concurrency");
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
const githubLabelNames =
|
// Process issues in parallel with concurrency control
|
||||||
issue.labels
|
await processWithRetry(
|
||||||
?.map((l) => (typeof l === "string" ? l : l.name))
|
filteredIssues,
|
||||||
.filter((l): l is string => !!l) || [];
|
async (issue) => {
|
||||||
|
const githubLabelNames =
|
||||||
|
issue.labels
|
||||||
|
?.map((l) => (typeof l === "string" ? l : l.name))
|
||||||
|
.filter((l): l is string => !!l) || [];
|
||||||
|
|
||||||
const giteaLabelIds: number[] = [];
|
const giteaLabelIds: number[] = [];
|
||||||
|
|
||||||
// Resolve or create labels in Gitea
|
// Resolve or create labels in Gitea
|
||||||
for (const name of githubLabelNames) {
|
for (const name of githubLabelNames) {
|
||||||
if (labelMap.has(name)) {
|
if (labelMap.has(name)) {
|
||||||
giteaLabelIds.push(labelMap.get(name)!);
|
giteaLabelIds.push(labelMap.get(name)!);
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
const created = await superagent
|
const created = await superagent
|
||||||
.post(
|
.post(
|
||||||
`${config.giteaConfig.url}/api/v1/repos/${repoOrigin}/${repository.name}/labels`
|
`${config.giteaConfig.url}/api/v1/repos/${repoOrigin}/${repository.name}/labels`
|
||||||
)
|
)
|
||||||
.set("Authorization", `token ${config.giteaConfig.token}`)
|
.set("Authorization", `token ${config.giteaConfig.token}`)
|
||||||
.send({ name, color: "#ededed" }); // Default color
|
.send({ name, color: "#ededed" }); // Default color
|
||||||
|
|
||||||
labelMap.set(name, created.body.id);
|
labelMap.set(name, created.body.id);
|
||||||
giteaLabelIds.push(created.body.id);
|
giteaLabelIds.push(created.body.id);
|
||||||
} catch (labelErr) {
|
} catch (labelErr) {
|
||||||
console.error(
|
console.error(
|
||||||
`Failed to create label "${name}" in Gitea: ${labelErr}`
|
`Failed to create label "${name}" in Gitea: ${labelErr}`
|
||||||
);
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
const originalAssignees =
|
const originalAssignees =
|
||||||
issue.assignees && issue.assignees.length > 0
|
issue.assignees && issue.assignees.length > 0
|
||||||
? `\n\nOriginally assigned to: ${issue.assignees
|
? `\n\nOriginally assigned to: ${issue.assignees
|
||||||
.map((a) => `@${a.login}`)
|
.map((a) => `@${a.login}`)
|
||||||
.join(", ")} on GitHub.`
|
.join(", ")} on GitHub.`
|
||||||
: "";
|
: "";
|
||||||
|
|
||||||
const issuePayload: any = {
|
const issuePayload: any = {
|
||||||
title: issue.title,
|
title: issue.title,
|
||||||
body: `Originally created by @${
|
body: `Originally created by @${
|
||||||
issue.user?.login
|
issue.user?.login
|
||||||
} on GitHub.${originalAssignees}\n\n${issue.body || ""}`,
|
} on GitHub.${originalAssignees}\n\n${issue.body || ""}`,
|
||||||
closed: issue.state === "closed",
|
closed: issue.state === "closed",
|
||||||
labels: giteaLabelIds,
|
labels: giteaLabelIds,
|
||||||
};
|
};
|
||||||
|
|
||||||
try {
|
// Create the issue in Gitea
|
||||||
const createdIssue = await superagent
|
const createdIssue = await superagent
|
||||||
.post(
|
.post(
|
||||||
`${config.giteaConfig.url}/api/v1/repos/${repoOrigin}/${repository.name}/issues`
|
`${config.giteaConfig.url}/api/v1/repos/${repoOrigin}/${repository.name}/issues`
|
||||||
@@ -922,41 +969,49 @@ export const mirrorGitRepoIssuesToGitea = async ({
|
|||||||
(res) => res.data
|
(res) => res.data
|
||||||
);
|
);
|
||||||
|
|
||||||
for (const comment of comments) {
|
// Process comments in parallel with concurrency control
|
||||||
try {
|
if (comments.length > 0) {
|
||||||
await superagent
|
await processWithRetry(
|
||||||
.post(
|
comments,
|
||||||
`${config.giteaConfig.url}/api/v1/repos/${repoOrigin}/${repository.name}/issues/${createdIssue.body.number}/comments`
|
async (comment) => {
|
||||||
)
|
await superagent
|
||||||
.set("Authorization", `token ${config.giteaConfig.token}`)
|
.post(
|
||||||
.send({
|
`${config.giteaConfig.url}/api/v1/repos/${repoOrigin}/${repository.name}/issues/${createdIssue.body.number}/comments`
|
||||||
body: `@${comment.user?.login} commented on GitHub:\n\n${comment.body}`,
|
)
|
||||||
});
|
.set("Authorization", `token ${config.giteaConfig.token}`)
|
||||||
} catch (commentErr) {
|
.send({
|
||||||
console.error(
|
body: `@${comment.user?.login} commented on GitHub:\n\n${comment.body}`,
|
||||||
`Failed to copy comment to Gitea for issue "${issue.title}": ${
|
});
|
||||||
commentErr instanceof Error
|
return comment;
|
||||||
? commentErr.message
|
},
|
||||||
: String(commentErr)
|
{
|
||||||
}`
|
concurrencyLimit: 5,
|
||||||
);
|
maxRetries: 2,
|
||||||
}
|
retryDelay: 1000,
|
||||||
|
onRetry: (comment, error, attempt) => {
|
||||||
|
console.log(`Retrying comment (attempt ${attempt}): ${error.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
|
||||||
if (err instanceof Error && (err as any).response) {
|
return issue;
|
||||||
console.error(
|
},
|
||||||
`Failed to create issue "${issue.title}" in Gitea: ${err.message}`
|
{
|
||||||
);
|
concurrencyLimit: 3, // Process 3 issues at a time
|
||||||
console.error(
|
maxRetries: 2,
|
||||||
`Response body: ${JSON.stringify((err as any).response.body)}`
|
retryDelay: 2000,
|
||||||
);
|
onProgress: (completed, total, result) => {
|
||||||
} else {
|
const percentComplete = Math.round((completed / total) * 100);
|
||||||
console.error(
|
if (result) {
|
||||||
`Failed to create issue "${issue.title}" in Gitea: ${
|
console.log(`Mirrored issue "${result.title}" (${completed}/${total}, ${percentComplete}%)`);
|
||||||
err instanceof Error ? err.message : String(err)
|
}
|
||||||
}`
|
},
|
||||||
);
|
onRetry: (issue, error, attempt) => {
|
||||||
|
console.log(`Retrying issue "${issue.title}" (attempt ${attempt}): ${error.message}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
);
|
||||||
|
|
||||||
|
console.log(`Completed mirroring ${filteredIssues.length} issues for ${repository.fullName}`);
|
||||||
};
|
};
|
||||||
|
|||||||
122
src/lib/utils/concurrency.ts
Normal file
122
src/lib/utils/concurrency.ts
Normal file
@@ -0,0 +1,122 @@
|
|||||||
|
/**
|
||||||
|
* Utility for processing items in parallel with concurrency control
|
||||||
|
*
|
||||||
|
* @param items Array of items to process
|
||||||
|
* @param processItem Function to process each item
|
||||||
|
* @param concurrencyLimit Maximum number of concurrent operations
|
||||||
|
* @param onProgress Optional callback for progress updates
|
||||||
|
* @returns Promise that resolves when all items are processed
|
||||||
|
*/
|
||||||
|
export async function processInParallel<T, R>(
|
||||||
|
items: T[],
|
||||||
|
processItem: (item: T) => Promise<R>,
|
||||||
|
concurrencyLimit: number = 5,
|
||||||
|
onProgress?: (completed: number, total: number, result?: R) => void
|
||||||
|
): Promise<R[]> {
|
||||||
|
const results: R[] = [];
|
||||||
|
let completed = 0;
|
||||||
|
const total = items.length;
|
||||||
|
|
||||||
|
// Process items in batches to control concurrency
|
||||||
|
for (let i = 0; i < total; i += concurrencyLimit) {
|
||||||
|
const batch = items.slice(i, i + concurrencyLimit);
|
||||||
|
|
||||||
|
const batchPromises = batch.map(async (item) => {
|
||||||
|
try {
|
||||||
|
const result = await processItem(item);
|
||||||
|
completed++;
|
||||||
|
|
||||||
|
if (onProgress) {
|
||||||
|
onProgress(completed, total, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
} catch (error) {
|
||||||
|
completed++;
|
||||||
|
|
||||||
|
if (onProgress) {
|
||||||
|
onProgress(completed, total);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for the current batch to complete before starting the next batch
|
||||||
|
const batchResults = await Promise.allSettled(batchPromises);
|
||||||
|
|
||||||
|
// Process results and handle errors
|
||||||
|
for (const result of batchResults) {
|
||||||
|
if (result.status === 'fulfilled') {
|
||||||
|
results.push(result.value);
|
||||||
|
} else {
|
||||||
|
console.error('Error processing item:', result.reason);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility for processing items in parallel with automatic retry for failed operations
|
||||||
|
*
|
||||||
|
* @param items Array of items to process
|
||||||
|
* @param processItem Function to process each item
|
||||||
|
* @param options Configuration options
|
||||||
|
* @returns Promise that resolves when all items are processed
|
||||||
|
*/
|
||||||
|
export async function processWithRetry<T, R>(
|
||||||
|
items: T[],
|
||||||
|
processItem: (item: T) => Promise<R>,
|
||||||
|
options: {
|
||||||
|
concurrencyLimit?: number;
|
||||||
|
maxRetries?: number;
|
||||||
|
retryDelay?: number;
|
||||||
|
onProgress?: (completed: number, total: number, result?: R) => void;
|
||||||
|
onRetry?: (item: T, error: Error, attempt: number) => void;
|
||||||
|
} = {}
|
||||||
|
): Promise<R[]> {
|
||||||
|
const {
|
||||||
|
concurrencyLimit = 5,
|
||||||
|
maxRetries = 3,
|
||||||
|
retryDelay = 1000,
|
||||||
|
onProgress,
|
||||||
|
onRetry
|
||||||
|
} = options;
|
||||||
|
|
||||||
|
// Wrap the process function with retry logic
|
||||||
|
const processWithRetryLogic = async (item: T): Promise<R> => {
|
||||||
|
let lastError: Error | null = null;
|
||||||
|
|
||||||
|
for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
|
||||||
|
try {
|
||||||
|
return await processItem(item);
|
||||||
|
} catch (error) {
|
||||||
|
lastError = error instanceof Error ? error : new Error(String(error));
|
||||||
|
|
||||||
|
if (attempt <= maxRetries) {
|
||||||
|
if (onRetry) {
|
||||||
|
onRetry(item, lastError, attempt);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exponential backoff
|
||||||
|
const delay = retryDelay * Math.pow(2, attempt - 1);
|
||||||
|
await new Promise(resolve => setTimeout(resolve, delay));
|
||||||
|
} else {
|
||||||
|
throw lastError;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should never be reached due to the throw in the catch block
|
||||||
|
throw lastError || new Error('Unknown error occurred');
|
||||||
|
};
|
||||||
|
|
||||||
|
return processInParallel(
|
||||||
|
items,
|
||||||
|
processWithRetryLogic,
|
||||||
|
concurrencyLimit,
|
||||||
|
onProgress
|
||||||
|
);
|
||||||
|
}
|
||||||
@@ -6,6 +6,8 @@ import { createGitHubClient } from "@/lib/github";
|
|||||||
import { mirrorGitHubOrgToGitea } from "@/lib/gitea";
|
import { mirrorGitHubOrgToGitea } from "@/lib/gitea";
|
||||||
import { repoStatusEnum } from "@/types/Repository";
|
import { repoStatusEnum } from "@/types/Repository";
|
||||||
import { type MembershipRole } from "@/types/organizations";
|
import { type MembershipRole } from "@/types/organizations";
|
||||||
|
import { processWithRetry } from "@/lib/utils/concurrency";
|
||||||
|
import { createMirrorJob } from "@/lib/helpers";
|
||||||
|
|
||||||
export const POST: APIRoute = async ({ request }) => {
|
export const POST: APIRoute = async ({ request }) => {
|
||||||
try {
|
try {
|
||||||
@@ -61,31 +63,73 @@ export const POST: APIRoute = async ({ request }) => {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fire async mirroring without blocking response
|
// Fire async mirroring without blocking response, using parallel processing
|
||||||
setTimeout(async () => {
|
setTimeout(async () => {
|
||||||
for (const org of orgs) {
|
if (!config.githubConfig.token) {
|
||||||
if (!config.githubConfig.token) {
|
throw new Error("GitHub token is missing in config.");
|
||||||
throw new Error("GitHub token is missing in config.");
|
}
|
||||||
}
|
|
||||||
|
|
||||||
const octokit = createGitHubClient(config.githubConfig.token);
|
// Create a single Octokit instance to be reused
|
||||||
|
const octokit = createGitHubClient(config.githubConfig.token);
|
||||||
|
|
||||||
try {
|
// Define the concurrency limit - adjust based on API rate limits
|
||||||
|
// Using a lower concurrency for organizations since each org might contain many repos
|
||||||
|
const CONCURRENCY_LIMIT = 2;
|
||||||
|
|
||||||
|
// Process organizations in parallel with retry capability
|
||||||
|
await processWithRetry(
|
||||||
|
orgs,
|
||||||
|
async (org) => {
|
||||||
|
// Prepare organization data
|
||||||
|
const orgData = {
|
||||||
|
...org,
|
||||||
|
status: repoStatusEnum.parse("imported"),
|
||||||
|
membershipRole: org.membershipRole as MembershipRole,
|
||||||
|
lastMirrored: org.lastMirrored ?? undefined,
|
||||||
|
errorMessage: org.errorMessage ?? undefined,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Log the start of mirroring
|
||||||
|
console.log(`Starting mirror for organization: ${org.name}`);
|
||||||
|
|
||||||
|
// Create a mirror job entry to track progress
|
||||||
|
await createMirrorJob({
|
||||||
|
userId: config.userId || "",
|
||||||
|
organizationId: org.id,
|
||||||
|
organizationName: org.name,
|
||||||
|
message: `Started mirroring organization: ${org.name}`,
|
||||||
|
details: `Organization ${org.name} is now in the mirroring queue.`,
|
||||||
|
status: "mirroring",
|
||||||
|
});
|
||||||
|
|
||||||
|
// Mirror the organization
|
||||||
await mirrorGitHubOrgToGitea({
|
await mirrorGitHubOrgToGitea({
|
||||||
config,
|
config,
|
||||||
octokit,
|
octokit,
|
||||||
organization: {
|
organization: orgData,
|
||||||
...org,
|
|
||||||
status: repoStatusEnum.parse("imported"),
|
|
||||||
membershipRole: org.membershipRole as MembershipRole,
|
|
||||||
lastMirrored: org.lastMirrored ?? undefined,
|
|
||||||
errorMessage: org.errorMessage ?? undefined,
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
} catch (error) {
|
|
||||||
console.error(`Mirror failed for organization ${org.name}:`, error);
|
return org;
|
||||||
|
},
|
||||||
|
{
|
||||||
|
concurrencyLimit: CONCURRENCY_LIMIT,
|
||||||
|
maxRetries: 2,
|
||||||
|
retryDelay: 3000,
|
||||||
|
onProgress: (completed, total, result) => {
|
||||||
|
const percentComplete = Math.round((completed / total) * 100);
|
||||||
|
console.log(`Organization mirroring progress: ${percentComplete}% (${completed}/${total})`);
|
||||||
|
|
||||||
|
if (result) {
|
||||||
|
console.log(`Successfully mirrored organization: ${result.name}`);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
onRetry: (org, error, attempt) => {
|
||||||
|
console.log(`Retrying organization ${org.name} (attempt ${attempt}): ${error.message}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
);
|
||||||
|
|
||||||
|
console.log("All organization mirroring tasks completed");
|
||||||
}, 0);
|
}, 0);
|
||||||
|
|
||||||
const responsePayload: MirrorOrgResponse = {
|
const responsePayload: MirrorOrgResponse = {
|
||||||
|
|||||||
@@ -8,6 +8,8 @@ import {
|
|||||||
mirrorGitHubOrgRepoToGiteaOrg,
|
mirrorGitHubOrgRepoToGiteaOrg,
|
||||||
} from "@/lib/gitea";
|
} from "@/lib/gitea";
|
||||||
import { createGitHubClient } from "@/lib/github";
|
import { createGitHubClient } from "@/lib/github";
|
||||||
|
import { processWithRetry } from "@/lib/utils/concurrency";
|
||||||
|
import { createMirrorJob } from "@/lib/helpers";
|
||||||
|
|
||||||
export const POST: APIRoute = async ({ request }) => {
|
export const POST: APIRoute = async ({ request }) => {
|
||||||
try {
|
try {
|
||||||
@@ -63,52 +65,84 @@ export const POST: APIRoute = async ({ request }) => {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start async mirroring in background
|
// Start async mirroring in background with parallel processing
|
||||||
setTimeout(async () => {
|
setTimeout(async () => {
|
||||||
for (const repo of repos) {
|
if (!config.githubConfig.token) {
|
||||||
if (!config.githubConfig.token) {
|
throw new Error("GitHub token is missing.");
|
||||||
throw new Error("GitHub token is missing.");
|
}
|
||||||
}
|
|
||||||
|
|
||||||
const octokit = createGitHubClient(config.githubConfig.token);
|
// Create a single Octokit instance to be reused
|
||||||
|
const octokit = createGitHubClient(config.githubConfig.token);
|
||||||
|
|
||||||
try {
|
// Define the concurrency limit - adjust based on API rate limits
|
||||||
|
const CONCURRENCY_LIMIT = 3;
|
||||||
|
|
||||||
|
// Process repositories in parallel with retry capability
|
||||||
|
await processWithRetry(
|
||||||
|
repos,
|
||||||
|
async (repo) => {
|
||||||
|
// Prepare repository data
|
||||||
|
const repoData = {
|
||||||
|
...repo,
|
||||||
|
status: repoStatusEnum.parse("imported"),
|
||||||
|
organization: repo.organization ?? undefined,
|
||||||
|
lastMirrored: repo.lastMirrored ?? undefined,
|
||||||
|
errorMessage: repo.errorMessage ?? undefined,
|
||||||
|
forkedFrom: repo.forkedFrom ?? undefined,
|
||||||
|
visibility: repositoryVisibilityEnum.parse(repo.visibility),
|
||||||
|
mirroredLocation: repo.mirroredLocation || "",
|
||||||
|
};
|
||||||
|
|
||||||
|
// Log the start of mirroring
|
||||||
|
console.log(`Starting mirror for repository: ${repo.name}`);
|
||||||
|
|
||||||
|
// Create a mirror job entry to track progress
|
||||||
|
await createMirrorJob({
|
||||||
|
userId: config.userId || "",
|
||||||
|
repositoryId: repo.id,
|
||||||
|
repositoryName: repo.name,
|
||||||
|
message: `Started mirroring repository: ${repo.name}`,
|
||||||
|
details: `Repository ${repo.name} is now in the mirroring queue.`,
|
||||||
|
status: "mirroring",
|
||||||
|
});
|
||||||
|
|
||||||
|
// Mirror the repository based on whether it's in an organization
|
||||||
if (repo.organization && config.githubConfig.preserveOrgStructure) {
|
if (repo.organization && config.githubConfig.preserveOrgStructure) {
|
||||||
await mirrorGitHubOrgRepoToGiteaOrg({
|
await mirrorGitHubOrgRepoToGiteaOrg({
|
||||||
config,
|
config,
|
||||||
octokit,
|
octokit,
|
||||||
orgName: repo.organization,
|
orgName: repo.organization,
|
||||||
repository: {
|
repository: repoData,
|
||||||
...repo,
|
|
||||||
status: repoStatusEnum.parse("imported"),
|
|
||||||
organization: repo.organization ?? undefined,
|
|
||||||
lastMirrored: repo.lastMirrored ?? undefined,
|
|
||||||
errorMessage: repo.errorMessage ?? undefined,
|
|
||||||
forkedFrom: repo.forkedFrom ?? undefined,
|
|
||||||
visibility: repositoryVisibilityEnum.parse(repo.visibility),
|
|
||||||
mirroredLocation: repo.mirroredLocation || "",
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
await mirrorGithubRepoToGitea({
|
await mirrorGithubRepoToGitea({
|
||||||
octokit,
|
octokit,
|
||||||
repository: {
|
repository: repoData,
|
||||||
...repo,
|
|
||||||
status: repoStatusEnum.parse("imported"),
|
|
||||||
organization: repo.organization ?? undefined,
|
|
||||||
lastMirrored: repo.lastMirrored ?? undefined,
|
|
||||||
errorMessage: repo.errorMessage ?? undefined,
|
|
||||||
forkedFrom: repo.forkedFrom ?? undefined,
|
|
||||||
visibility: repositoryVisibilityEnum.parse(repo.visibility),
|
|
||||||
mirroredLocation: repo.mirroredLocation || "",
|
|
||||||
},
|
|
||||||
config,
|
config,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} catch (error) {
|
|
||||||
console.error(`Mirror failed for repo ${repo.name}:`, error);
|
return repo;
|
||||||
|
},
|
||||||
|
{
|
||||||
|
concurrencyLimit: CONCURRENCY_LIMIT,
|
||||||
|
maxRetries: 2,
|
||||||
|
retryDelay: 2000,
|
||||||
|
onProgress: (completed, total, result) => {
|
||||||
|
const percentComplete = Math.round((completed / total) * 100);
|
||||||
|
console.log(`Mirroring progress: ${percentComplete}% (${completed}/${total})`);
|
||||||
|
|
||||||
|
if (result) {
|
||||||
|
console.log(`Successfully mirrored repository: ${result.name}`);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
onRetry: (repo, error, attempt) => {
|
||||||
|
console.log(`Retrying repository ${repo.name} (attempt ${attempt}): ${error.message}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
);
|
||||||
|
|
||||||
|
console.log("All repository mirroring tasks completed");
|
||||||
}, 0);
|
}, 0);
|
||||||
|
|
||||||
const responsePayload: MirrorRepoResponse = {
|
const responsePayload: MirrorRepoResponse = {
|
||||||
|
|||||||
@@ -10,6 +10,8 @@ import {
|
|||||||
import { createGitHubClient } from "@/lib/github";
|
import { createGitHubClient } from "@/lib/github";
|
||||||
import { repoStatusEnum, repositoryVisibilityEnum } from "@/types/Repository";
|
import { repoStatusEnum, repositoryVisibilityEnum } from "@/types/Repository";
|
||||||
import type { RetryRepoRequest, RetryRepoResponse } from "@/types/retry";
|
import type { RetryRepoRequest, RetryRepoResponse } from "@/types/retry";
|
||||||
|
import { processWithRetry } from "@/lib/utils/concurrency";
|
||||||
|
import { createMirrorJob } from "@/lib/helpers";
|
||||||
|
|
||||||
export const POST: APIRoute = async ({ request }) => {
|
export const POST: APIRoute = async ({ request }) => {
|
||||||
try {
|
try {
|
||||||
@@ -65,10 +67,21 @@ export const POST: APIRoute = async ({ request }) => {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start background retry
|
// Start background retry with parallel processing
|
||||||
setTimeout(async () => {
|
setTimeout(async () => {
|
||||||
for (const repo of repos) {
|
// Create a single Octokit instance to be reused if needed
|
||||||
try {
|
const octokit = config.githubConfig.token
|
||||||
|
? createGitHubClient(config.githubConfig.token)
|
||||||
|
: null;
|
||||||
|
|
||||||
|
// Define the concurrency limit - adjust based on API rate limits
|
||||||
|
const CONCURRENCY_LIMIT = 3;
|
||||||
|
|
||||||
|
// Process repositories in parallel with retry capability
|
||||||
|
await processWithRetry(
|
||||||
|
repos,
|
||||||
|
async (repo) => {
|
||||||
|
// Prepare repository data
|
||||||
const visibility = repositoryVisibilityEnum.parse(repo.visibility);
|
const visibility = repositoryVisibilityEnum.parse(repo.visibility);
|
||||||
const status = repoStatusEnum.parse(repo.status);
|
const status = repoStatusEnum.parse(repo.status);
|
||||||
const repoData = {
|
const repoData = {
|
||||||
@@ -81,6 +94,20 @@ export const POST: APIRoute = async ({ request }) => {
|
|||||||
forkedFrom: repo.forkedFrom ?? undefined,
|
forkedFrom: repo.forkedFrom ?? undefined,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Log the start of retry operation
|
||||||
|
console.log(`Starting retry for repository: ${repo.name}`);
|
||||||
|
|
||||||
|
// Create a mirror job entry to track progress
|
||||||
|
await createMirrorJob({
|
||||||
|
userId: config.userId || "",
|
||||||
|
repositoryId: repo.id,
|
||||||
|
repositoryName: repo.name,
|
||||||
|
message: `Started retry operation for repository: ${repo.name}`,
|
||||||
|
details: `Repository ${repo.name} is now in the retry queue.`,
|
||||||
|
status: "imported",
|
||||||
|
});
|
||||||
|
|
||||||
|
// Determine if the repository exists in Gitea
|
||||||
let owner = getGiteaRepoOwner({
|
let owner = getGiteaRepoOwner({
|
||||||
config,
|
config,
|
||||||
repository: repoData,
|
repository: repoData,
|
||||||
@@ -93,16 +120,21 @@ export const POST: APIRoute = async ({ request }) => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (present) {
|
if (present) {
|
||||||
|
// If the repository exists, sync it
|
||||||
await syncGiteaRepo({ config, repository: repoData });
|
await syncGiteaRepo({ config, repository: repoData });
|
||||||
console.log(`Synced existing repo: ${repo.name}`);
|
console.log(`Synced existing repo: ${repo.name}`);
|
||||||
} else {
|
} else {
|
||||||
|
// If the repository doesn't exist, mirror it
|
||||||
if (!config.githubConfig.token) {
|
if (!config.githubConfig.token) {
|
||||||
throw new Error("GitHub token is missing.");
|
throw new Error("GitHub token is missing.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!octokit) {
|
||||||
|
throw new Error("Octokit client is not initialized.");
|
||||||
|
}
|
||||||
|
|
||||||
console.log(`Importing repo: ${repo.name} ${owner}`);
|
console.log(`Importing repo: ${repo.name} ${owner}`);
|
||||||
|
|
||||||
const octokit = createGitHubClient(config.githubConfig.token);
|
|
||||||
if (repo.organization && config.githubConfig.preserveOrgStructure) {
|
if (repo.organization && config.githubConfig.preserveOrgStructure) {
|
||||||
await mirrorGitHubOrgRepoToGiteaOrg({
|
await mirrorGitHubOrgRepoToGiteaOrg({
|
||||||
config,
|
config,
|
||||||
@@ -124,10 +156,28 @@ export const POST: APIRoute = async ({ request }) => {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (err) {
|
|
||||||
console.error(`Failed to retry repo ${repo.name}:`, err);
|
return repo;
|
||||||
|
},
|
||||||
|
{
|
||||||
|
concurrencyLimit: CONCURRENCY_LIMIT,
|
||||||
|
maxRetries: 2,
|
||||||
|
retryDelay: 2000,
|
||||||
|
onProgress: (completed, total, result) => {
|
||||||
|
const percentComplete = Math.round((completed / total) * 100);
|
||||||
|
console.log(`Retry progress: ${percentComplete}% (${completed}/${total})`);
|
||||||
|
|
||||||
|
if (result) {
|
||||||
|
console.log(`Successfully processed repository: ${result.name}`);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
onRetry: (repo, error, attempt) => {
|
||||||
|
console.log(`Retrying repository ${repo.name} (attempt ${attempt}): ${error.message}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
);
|
||||||
|
|
||||||
|
console.log("All repository retry tasks completed");
|
||||||
}, 0);
|
}, 0);
|
||||||
|
|
||||||
const responsePayload: RetryRepoResponse = {
|
const responsePayload: RetryRepoResponse = {
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ import { eq, inArray } from "drizzle-orm";
|
|||||||
import { repositoryVisibilityEnum, repoStatusEnum } from "@/types/Repository";
|
import { repositoryVisibilityEnum, repoStatusEnum } from "@/types/Repository";
|
||||||
import { syncGiteaRepo } from "@/lib/gitea";
|
import { syncGiteaRepo } from "@/lib/gitea";
|
||||||
import type { SyncRepoResponse } from "@/types/sync";
|
import type { SyncRepoResponse } from "@/types/sync";
|
||||||
|
import { processWithRetry } from "@/lib/utils/concurrency";
|
||||||
|
import { createMirrorJob } from "@/lib/helpers";
|
||||||
|
|
||||||
export const POST: APIRoute = async ({ request }) => {
|
export const POST: APIRoute = async ({ request }) => {
|
||||||
try {
|
try {
|
||||||
@@ -60,26 +62,66 @@ export const POST: APIRoute = async ({ request }) => {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start async mirroring in background
|
// Start async mirroring in background with parallel processing
|
||||||
setTimeout(async () => {
|
setTimeout(async () => {
|
||||||
for (const repo of repos) {
|
// Define the concurrency limit - adjust based on API rate limits
|
||||||
try {
|
const CONCURRENCY_LIMIT = 5;
|
||||||
|
|
||||||
|
// Process repositories in parallel with retry capability
|
||||||
|
await processWithRetry(
|
||||||
|
repos,
|
||||||
|
async (repo) => {
|
||||||
|
// Prepare repository data
|
||||||
|
const repoData = {
|
||||||
|
...repo,
|
||||||
|
status: repoStatusEnum.parse(repo.status),
|
||||||
|
organization: repo.organization ?? undefined,
|
||||||
|
lastMirrored: repo.lastMirrored ?? undefined,
|
||||||
|
errorMessage: repo.errorMessage ?? undefined,
|
||||||
|
forkedFrom: repo.forkedFrom ?? undefined,
|
||||||
|
visibility: repositoryVisibilityEnum.parse(repo.visibility),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Log the start of syncing
|
||||||
|
console.log(`Starting sync for repository: ${repo.name}`);
|
||||||
|
|
||||||
|
// Create a mirror job entry to track progress
|
||||||
|
await createMirrorJob({
|
||||||
|
userId: config.userId || "",
|
||||||
|
repositoryId: repo.id,
|
||||||
|
repositoryName: repo.name,
|
||||||
|
message: `Started syncing repository: ${repo.name}`,
|
||||||
|
details: `Repository ${repo.name} is now in the syncing queue.`,
|
||||||
|
status: "syncing",
|
||||||
|
});
|
||||||
|
|
||||||
|
// Sync the repository
|
||||||
await syncGiteaRepo({
|
await syncGiteaRepo({
|
||||||
config,
|
config,
|
||||||
repository: {
|
repository: repoData,
|
||||||
...repo,
|
|
||||||
status: repoStatusEnum.parse(repo.status),
|
|
||||||
organization: repo.organization ?? undefined,
|
|
||||||
lastMirrored: repo.lastMirrored ?? undefined,
|
|
||||||
errorMessage: repo.errorMessage ?? undefined,
|
|
||||||
forkedFrom: repo.forkedFrom ?? undefined,
|
|
||||||
visibility: repositoryVisibilityEnum.parse(repo.visibility),
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
} catch (error) {
|
|
||||||
console.error(`Sync failed for repo ${repo.name}:`, error);
|
return repo;
|
||||||
|
},
|
||||||
|
{
|
||||||
|
concurrencyLimit: CONCURRENCY_LIMIT,
|
||||||
|
maxRetries: 2,
|
||||||
|
retryDelay: 2000,
|
||||||
|
onProgress: (completed, total, result) => {
|
||||||
|
const percentComplete = Math.round((completed / total) * 100);
|
||||||
|
console.log(`Syncing progress: ${percentComplete}% (${completed}/${total})`);
|
||||||
|
|
||||||
|
if (result) {
|
||||||
|
console.log(`Successfully synced repository: ${result.name}`);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
onRetry: (repo, error, attempt) => {
|
||||||
|
console.log(`Retrying sync for repository ${repo.name} (attempt ${attempt}): ${error.message}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
);
|
||||||
|
|
||||||
|
console.log("All repository syncing tasks completed");
|
||||||
}, 0);
|
}, 0);
|
||||||
|
|
||||||
const responsePayload: SyncRepoResponse = {
|
const responsePayload: SyncRepoResponse = {
|
||||||
|
|||||||
Reference in New Issue
Block a user