feat: enhance event management by adding duplicate removal, cleanup functionality, and improving activity logging

This commit is contained in:
Arunavo Ray
2025-05-24 13:25:58 +05:30
parent 546db472e5
commit 98610482ae
15 changed files with 603 additions and 305 deletions

View File

@@ -1,6 +1,6 @@
import { v4 as uuidv4 } from "uuid";
import { db, events } from "./db";
import { eq, and, gt, lt } from "drizzle-orm";
import { eq, and, gt, lt, inArray } from "drizzle-orm";
/**
* Publishes an event to a specific channel for a user
@@ -10,21 +10,58 @@ export async function publishEvent({
userId,
channel,
payload,
deduplicationKey,
}: {
userId: string;
channel: string;
payload: any;
deduplicationKey?: string; // Optional key to prevent duplicate events
}): Promise<string> {
try {
const eventId = uuidv4();
console.log(`Publishing event to channel ${channel} for user ${userId}`);
// Check for duplicate events if deduplication key is provided
if (deduplicationKey) {
const existingEvent = await db
.select()
.from(events)
.where(
and(
eq(events.userId, userId),
eq(events.channel, channel),
eq(events.read, false)
)
)
.limit(10); // Check recent unread events
// Check if any existing event has the same deduplication key in payload
const isDuplicate = existingEvent.some(event => {
try {
const eventPayload = JSON.parse(event.payload as string);
return eventPayload.deduplicationKey === deduplicationKey;
} catch {
return false;
}
});
if (isDuplicate) {
console.log(`Skipping duplicate event with key: ${deduplicationKey}`);
return eventId; // Return a valid ID but don't create the event
}
}
// Add deduplication key to payload if provided
const eventPayload = deduplicationKey
? { ...payload, deduplicationKey }
: payload;
// Insert the event into the SQLite database
await db.insert(events).values({
id: eventId,
userId,
channel,
payload: JSON.stringify(payload),
payload: JSON.stringify(eventPayload),
createdAt: new Date(),
});
@@ -103,6 +140,78 @@ export async function getNewEvents({
}
}
/**
* Removes duplicate events based on deduplication keys
* This can be called periodically to clean up any duplicates that may have slipped through
*/
export async function removeDuplicateEvents(userId?: string): Promise<{ duplicatesRemoved: number }> {
try {
console.log("Removing duplicate events...");
// Build the base query
let query = db.select().from(events);
if (userId) {
query = query.where(eq(events.userId, userId));
}
const allEvents = await query;
const duplicateIds: string[] = [];
const seenKeys = new Set<string>();
// Group events by user and channel, then check for duplicates
const eventsByUserChannel = new Map<string, typeof allEvents>();
for (const event of allEvents) {
const key = `${event.userId}-${event.channel}`;
if (!eventsByUserChannel.has(key)) {
eventsByUserChannel.set(key, []);
}
eventsByUserChannel.get(key)!.push(event);
}
// Check each group for duplicates
for (const [, events] of eventsByUserChannel) {
const channelSeenKeys = new Set<string>();
// Sort by creation time (keep the earliest)
events.sort((a, b) => new Date(a.createdAt).getTime() - new Date(b.createdAt).getTime());
for (const event of events) {
try {
const payload = JSON.parse(event.payload as string);
if (payload.deduplicationKey) {
if (channelSeenKeys.has(payload.deduplicationKey)) {
duplicateIds.push(event.id);
} else {
channelSeenKeys.add(payload.deduplicationKey);
}
}
} catch {
// Skip events with invalid JSON
}
}
}
// Remove duplicates
if (duplicateIds.length > 0) {
console.log(`Removing ${duplicateIds.length} duplicate events`);
// Delete in batches to avoid query size limits
const batchSize = 100;
for (let i = 0; i < duplicateIds.length; i += batchSize) {
const batch = duplicateIds.slice(i, i + batchSize);
await db.delete(events).where(inArray(events.id, batch));
}
}
console.log(`Removed ${duplicateIds.length} duplicate events`);
return { duplicatesRemoved: duplicateIds.length };
} catch (error) {
console.error("Error removing duplicate events:", error);
return { duplicatesRemoved: 0 };
}
}
/**
* Cleans up old events to prevent the database from growing too large
* Should be called periodically (e.g., daily via a cron job)

View File

@@ -295,15 +295,8 @@ export async function getOrCreateGiteaOrg({
if (orgRes.ok) {
const org = await orgRes.json();
await createMirrorJob({
userId: config.userId,
organizationId: org.id,
organizationName: orgName,
status: "imported",
message: `Organization ${orgName} fetched successfully`,
details: `Organization ${orgName} was fetched from GitHub`,
});
// Note: Organization events are handled by the main mirroring process
// to avoid duplicate events
return org.id;
}
@@ -325,13 +318,8 @@ export async function getOrCreateGiteaOrg({
throw new Error(`Failed to create Gitea org: ${await createRes.text()}`);
}
await createMirrorJob({
userId: config.userId,
organizationName: orgName,
status: "imported",
message: `Organization ${orgName} created successfully`,
details: `Organization ${orgName} was created in Gitea`,
});
// Note: Organization creation events are handled by the main mirroring process
// to avoid duplicate events
const newOrg = await createRes.json();
return newOrg.id;
@@ -417,15 +405,8 @@ export async function mirrorGitHubRepoToGiteaOrg({
})
.where(eq(repositories.id, repository.id!));
// Append log for "mirroring" status
await createMirrorJob({
userId: config.userId,
repositoryId: repository.id,
repositoryName: repository.name,
message: `Started mirroring repository: ${repository.name}`,
details: `Repository ${repository.name} is now in the mirroring state.`,
status: "mirroring",
});
// Note: "mirroring" status events are handled by the concurrency system
// to avoid duplicate events during batch operations
const apiUrl = `${config.giteaConfig.url}/api/v1/repos/migrate`;

View File

@@ -17,6 +17,7 @@ export async function createMirrorJob({
totalItems,
itemIds,
inProgress,
skipDuplicateEvent,
}: {
userId: string;
organizationId?: string;
@@ -31,6 +32,7 @@ export async function createMirrorJob({
totalItems?: number;
itemIds?: string[];
inProgress?: boolean;
skipDuplicateEvent?: boolean; // Option to skip event publishing for internal operations
}) {
const jobId = uuidv4();
const currentTimestamp = new Date();
@@ -64,13 +66,27 @@ export async function createMirrorJob({
// Insert the job into the database
await db.insert(mirrorJobs).values(job);
// Publish the event using SQLite instead of Redis
const channel = `mirror-status:${userId}`;
await publishEvent({
userId,
channel,
payload: job
});
// Publish the event using SQLite instead of Redis (unless skipped)
if (!skipDuplicateEvent) {
const channel = `mirror-status:${userId}`;
// Create deduplication key based on the operation
let deduplicationKey: string | undefined;
if (repositoryId && status) {
deduplicationKey = `repo-${repositoryId}-${status}`;
} else if (organizationId && status) {
deduplicationKey = `org-${organizationId}-${status}`;
} else if (batchId) {
deduplicationKey = `batch-${batchId}-${status}`;
}
await publishEvent({
userId,
channel,
payload: job,
deduplicationKey
});
}
return jobId;
} catch (error) {
@@ -156,16 +172,27 @@ export async function updateMirrorJobProgress({
.set(updates)
.where(mirrorJobs.id === jobId);
// Publish the event
// Publish the event with deduplication
const updatedJob = {
...job,
...updates,
};
// Create deduplication key for progress updates
let deduplicationKey: string | undefined;
if (completedItemId) {
deduplicationKey = `progress-${jobId}-${completedItemId}`;
} else if (isCompleted) {
deduplicationKey = `completed-${jobId}`;
} else {
deduplicationKey = `update-${jobId}-${Date.now()}`;
}
await publishEvent({
userId: job.userId,
channel: `mirror-status:${job.userId}`,
payload: updatedJob,
deduplicationKey
});
return updatedJob;

View File

@@ -181,7 +181,7 @@ export async function processWithResilience<T, R>(
getItemId,
getItemName,
resumeFromJobId,
checkpointInterval = 5,
checkpointInterval = 10, // Increased from 5 to reduce event frequency
...otherOptions
} = options;