Files
gitea-mirror/src/pages/api/sse/index.ts

108 lines
3.1 KiB
TypeScript

import type { APIRoute } from "astro";
import { getNewEvents } from "@/lib/events";
export const GET: APIRoute = async ({ request }) => {
const url = new URL(request.url);
const userId = url.searchParams.get("userId");
if (!userId) {
return new Response("Missing userId", { status: 400 });
}
const channel = `mirror-status:${userId}`;
let isClosed = false;
const POLL_INTERVAL = 5000; // Poll every 5 seconds (reduced from 2 seconds for low-traffic usage)
const stream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
let lastEventTime: Date | undefined = undefined;
let pollIntervalId: ReturnType<typeof setInterval> | null = null;
// Function to send a message to the client
const sendMessage = (message: string) => {
if (isClosed) return;
try {
controller.enqueue(encoder.encode(message));
} catch (err) {
console.error("Stream enqueue error:", err);
}
};
// Function to poll for new events
const pollForEvents = async () => {
if (isClosed) return;
try {
console.log(`Polling for events for user ${userId} in channel ${channel}`);
// Get new events from SQLite
const events = await getNewEvents({
userId,
channel,
lastEventTime,
});
console.log(`Found ${events.length} new events`);
// Send events to client
if (events.length > 0) {
// Update last event time
lastEventTime = events[events.length - 1].createdAt;
// Send each event to the client
for (const event of events) {
console.log(`Sending event: ${JSON.stringify(event.payload)}`);
sendMessage(`data: ${JSON.stringify(event.payload)}\n\n`);
}
}
} catch (err) {
console.error("Error polling for events:", err);
sendMessage(`data: {"error": "Error polling for events"}\n\n`);
}
};
// Send initial connection message
sendMessage(": connected\n\n");
// Start polling for events
pollForEvents();
// Set up polling interval
pollIntervalId = setInterval(pollForEvents, POLL_INTERVAL);
// Send a heartbeat every 30 seconds to keep the connection alive
const heartbeatInterval = setInterval(() => {
if (!isClosed) {
sendMessage(": heartbeat\n\n");
} else {
clearInterval(heartbeatInterval);
}
}, 30000);
// Handle client disconnection
request.signal?.addEventListener("abort", () => {
if (!isClosed) {
isClosed = true;
if (pollIntervalId) {
clearInterval(pollIntervalId);
}
controller.close();
}
});
},
cancel() {
// Extra safety in case cancel is triggered
isClosed = true;
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
};