From db225f8a8400035e05be80dec7b2fd1db335a9ca Mon Sep 17 00:00:00 2001 From: Ajay Date: Thu, 8 Feb 2024 21:30:27 -0500 Subject: [PATCH] Reuse running redis connections and handle redis race condition --- src/utils/redis.ts | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/src/utils/redis.ts b/src/utils/redis.ts index f7dacf2..6e012d1 100644 --- a/src/utils/redis.ts +++ b/src/utils/redis.ts @@ -67,6 +67,9 @@ const writeResponseTime: number[] = []; let lastResponseTimeLimit = 0; const maxStoredTimes = 200; +const activeRequestPromises: Record> = {}; +// Used to handle race conditions +const resetKeys: Set = new Set(); const cache = config.redis.clientCacheSize ? new LRUCache({ maxSize: config.redis.clientCacheSize, sizeCalculation: (value) => value.length, @@ -81,6 +84,7 @@ export class TooManyActiveConnectionsError extends Error {} export let connectionPromise: Promise = Promise.resolve(); + if (config.redis?.enabled) { Logger.info("Connected to redis"); const client = createClient(config.redis); @@ -107,16 +111,34 @@ if (config.redis?.enabled) { memoryCacheUncachedMisses = 0; } - return exportClient.get(key).then((reply) => { + if (activeRequestPromises[key as string] !== undefined) { + return activeRequestPromises[key as string]; + } + + const request = exportClient.get(key).then((reply) => { if (reply === null) return null; const decompressed = uncompress(Buffer.from(reply, "base64")).then((decompressed) => decompressed.toString("utf-8")); if (cache && shouldClientCacheKey(key)) { - decompressed.then((d) => cache.set(key, d)).catch(Logger.error); + decompressed.then((d) => { + if (!resetKeys.has(key)) { + cache.set(key, d); + } + + resetKeys.delete(key); + }).catch(Logger.error); + } else { + resetKeys.delete(key); } return decompressed; }); + + activeRequestPromises[key as string] = request; + + void request.then(() => delete activeRequestPromises[key as string]); + + return request; }; exportClient.setCompressed = (key, value, options) => { return compress(Buffer.from(value as string, "utf-8")).then((compressed) => @@ -309,10 +331,17 @@ async function setupCacheClientListener(cacheClient: RedisClientType, cacheConnectionClientId = String(await cacheClient.clientId()); cacheClient.subscribe("__redis__:invalidate", (messages) => { + const key = messages?.[0]; + lastInvalidationMessage = Date.now(); - if (cache.delete(messages?.[0])) { + if (cache.delete(key)) { lastInvalidation = Date.now(); } + + // To tell it to not save the result of this currently running request + if (key && activeRequestPromises[key] !== undefined) { + resetKeys.add(key); + } }).catch(Logger.error); }