mirror of
https://github.com/ajayyy/SponsorBlockServer.git
synced 2025-12-07 20:17:02 +03:00
Reuse running redis connections and handle redis race condition
This commit is contained in:
@@ -67,6 +67,9 @@ const writeResponseTime: number[] = [];
|
|||||||
let lastResponseTimeLimit = 0;
|
let lastResponseTimeLimit = 0;
|
||||||
const maxStoredTimes = 200;
|
const maxStoredTimes = 200;
|
||||||
|
|
||||||
|
const activeRequestPromises: Record<string, Promise<string>> = {};
|
||||||
|
// Used to handle race conditions
|
||||||
|
const resetKeys: Set<RedisCommandArgument> = new Set();
|
||||||
const cache = config.redis.clientCacheSize ? new LRUCache<RedisCommandArgument, string>({
|
const cache = config.redis.clientCacheSize ? new LRUCache<RedisCommandArgument, string>({
|
||||||
maxSize: config.redis.clientCacheSize,
|
maxSize: config.redis.clientCacheSize,
|
||||||
sizeCalculation: (value) => value.length,
|
sizeCalculation: (value) => value.length,
|
||||||
@@ -81,6 +84,7 @@ export class TooManyActiveConnectionsError extends Error {}
|
|||||||
|
|
||||||
export let connectionPromise: Promise<unknown> = Promise.resolve();
|
export let connectionPromise: Promise<unknown> = Promise.resolve();
|
||||||
|
|
||||||
|
|
||||||
if (config.redis?.enabled) {
|
if (config.redis?.enabled) {
|
||||||
Logger.info("Connected to redis");
|
Logger.info("Connected to redis");
|
||||||
const client = createClient(config.redis);
|
const client = createClient(config.redis);
|
||||||
@@ -107,16 +111,34 @@ if (config.redis?.enabled) {
|
|||||||
memoryCacheUncachedMisses = 0;
|
memoryCacheUncachedMisses = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return exportClient.get(key).then((reply) => {
|
if (activeRequestPromises[key as string] !== undefined) {
|
||||||
|
return activeRequestPromises[key as string];
|
||||||
|
}
|
||||||
|
|
||||||
|
const request = exportClient.get(key).then((reply) => {
|
||||||
if (reply === null) return null;
|
if (reply === null) return null;
|
||||||
|
|
||||||
const decompressed = uncompress(Buffer.from(reply, "base64")).then((decompressed) => decompressed.toString("utf-8"));
|
const decompressed = uncompress(Buffer.from(reply, "base64")).then((decompressed) => decompressed.toString("utf-8"));
|
||||||
if (cache && shouldClientCacheKey(key)) {
|
if (cache && shouldClientCacheKey(key)) {
|
||||||
decompressed.then((d) => cache.set(key, d)).catch(Logger.error);
|
decompressed.then((d) => {
|
||||||
|
if (!resetKeys.has(key)) {
|
||||||
|
cache.set(key, d);
|
||||||
|
}
|
||||||
|
|
||||||
|
resetKeys.delete(key);
|
||||||
|
}).catch(Logger.error);
|
||||||
|
} else {
|
||||||
|
resetKeys.delete(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
return decompressed;
|
return decompressed;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
activeRequestPromises[key as string] = request;
|
||||||
|
|
||||||
|
void request.then(() => delete activeRequestPromises[key as string]);
|
||||||
|
|
||||||
|
return request;
|
||||||
};
|
};
|
||||||
exportClient.setCompressed = (key, value, options) => {
|
exportClient.setCompressed = (key, value, options) => {
|
||||||
return compress(Buffer.from(value as string, "utf-8")).then((compressed) =>
|
return compress(Buffer.from(value as string, "utf-8")).then((compressed) =>
|
||||||
@@ -309,10 +331,17 @@ async function setupCacheClientListener(cacheClient: RedisClientType,
|
|||||||
cacheConnectionClientId = String(await cacheClient.clientId());
|
cacheConnectionClientId = String(await cacheClient.clientId());
|
||||||
|
|
||||||
cacheClient.subscribe("__redis__:invalidate", (messages) => {
|
cacheClient.subscribe("__redis__:invalidate", (messages) => {
|
||||||
|
const key = messages?.[0];
|
||||||
|
|
||||||
lastInvalidationMessage = Date.now();
|
lastInvalidationMessage = Date.now();
|
||||||
if (cache.delete(messages?.[0])) {
|
if (cache.delete(key)) {
|
||||||
lastInvalidation = Date.now();
|
lastInvalidation = Date.now();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// To tell it to not save the result of this currently running request
|
||||||
|
if (key && activeRequestPromises[key] !== undefined) {
|
||||||
|
resetKeys.add(key);
|
||||||
|
}
|
||||||
}).catch(Logger.error);
|
}).catch(Logger.error);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user