diff --git a/package-lock.json b/package-lock.json index 30318be..4793763 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,6 +17,7 @@ "express-rate-limit": "^6.7.0", "form-data": "^4.0.0", "lodash": "^4.17.21", + "lru-cache": "^10.2.0", "lz4-napi": "^2.2.0", "pg": "^8.8.0", "rate-limit-redis": "^3.0.1", @@ -3715,6 +3716,14 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/lru-cache": { + "version": "10.2.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.2.0.tgz", + "integrity": "sha512-2bIM8x+VAf6JT4bKAljS1qUWgMsqZRPGJS6FSahIMPVvctcNhyVp7AJu7quxOW9jwkryBReKZY5tY5JYv2n/7Q==", + "engines": { + "node": "14 || >=16.14" + } + }, "node_modules/luxon": { "version": "1.28.1", "resolved": "https://registry.npmjs.org/luxon/-/luxon-1.28.1.tgz", @@ -8608,6 +8617,11 @@ "is-unicode-supported": "^0.1.0" } }, + "lru-cache": { + "version": "10.2.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.2.0.tgz", + "integrity": "sha512-2bIM8x+VAf6JT4bKAljS1qUWgMsqZRPGJS6FSahIMPVvctcNhyVp7AJu7quxOW9jwkryBReKZY5tY5JYv2n/7Q==" + }, "luxon": { "version": "1.28.1", "resolved": "https://registry.npmjs.org/luxon/-/luxon-1.28.1.tgz", diff --git a/package.json b/package.json index 77ecee7..a6e728f 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ "express-rate-limit": "^6.7.0", "form-data": "^4.0.0", "lodash": "^4.17.21", + "lru-cache": "^10.2.0", "lz4-napi": "^2.2.0", "pg": "^8.8.0", "rate-limit-redis": "^3.0.1", diff --git a/src/config.ts b/src/config.ts index 8e458d8..c51fd0b 100644 --- a/src/config.ts +++ b/src/config.ts @@ -166,7 +166,8 @@ addDefaults(config, { commandsQueueMaxLength: 3000, stopWritingAfterResponseTime: 50, responseTimePause: 1000, - disableHashCache: false + disableHashCache: false, + clientCacheLength: 2000 }, redisRead: { enabled: false, diff --git a/src/types/config.model.ts b/src/types/config.model.ts index 9849165..8992cf4 100644 --- a/src/types/config.model.ts +++ b/src/types/config.model.ts @@ -11,6 +11,7 @@ interface RedisConfig extends redis.RedisClientOptions { stopWritingAfterResponseTime: number; responseTimePause: number; disableHashCache: boolean; + clientCacheLength: number; } interface RedisReadOnlyConfig extends redis.RedisClientOptions { diff --git a/src/utils/redis.ts b/src/utils/redis.ts index ba09f10..877886c 100644 --- a/src/utils/redis.ts +++ b/src/utils/redis.ts @@ -1,12 +1,14 @@ import { config } from "../config"; import { Logger } from "./logger"; -import { SetOptions, createClient } from "redis"; +import { RedisClientType, SetOptions, createClient } from "redis"; import { RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply } from "@redis/client/dist/lib/commands"; import { RedisClientOptions } from "@redis/client/dist/lib/client"; import { RedisReply } from "rate-limit-redis"; import { db } from "../databases/databases"; import { Postgres } from "../databases/Postgres"; import { compress, uncompress } from "lz4-napi"; +import { LRUCache } from "lru-cache"; +import { shouldClientCacheKey } from "./redisKeys"; export interface RedisStats { activeRequests: number; @@ -16,7 +18,7 @@ export interface RedisStats { } interface RedisSB { - get(key: RedisCommandArgument): Promise; + get(key: RedisCommandArgument, useClientCache?: boolean): Promise; getCompressed(key: RedisCommandArgument): Promise; set(key: RedisCommandArgument, value: RedisCommandArgument, options?: SetOptions): Promise; setCompressed(key: RedisCommandArgument, value: RedisCommandArgument, options?: SetOptions): Promise; @@ -65,10 +67,25 @@ if (config.redis?.enabled) { void readClient?.connect(); // void as we don't care about the promise exportClient = client as unknown as RedisSB; + const cacheClient = config.redis.clientCacheLength ? createClient(config.redis) : null; + const cache = config.redis.clientCacheLength ? new LRUCache({ + max: config.redis.clientCacheLength + }) : null; + exportClient.getCompressed = (key) => { + if (cache && cacheClient && cache.has(key)) { + return Promise.resolve(cache.get(key)); + } + return exportClient.get(key).then((reply) => { if (reply === null) return null; - return uncompress(Buffer.from(reply, "base64")).then((decompressed) => decompressed.toString("utf-8")); + + const decompressed = uncompress(Buffer.from(reply, "base64")).then((decompressed) => decompressed.toString("utf-8")); + if (cache && shouldClientCacheKey(key)) { + decompressed.then((d) => cache.set(key, d)).catch(Logger.error); + } + + return decompressed; }); }; exportClient.setCompressed = (key, value, options) => { @@ -111,7 +128,7 @@ if (config.redis?.enabled) { lastResponseTimeLimit = Date.now(); } }).catch((err) => { - if (chosenGet === get) { + if (chosenGet === get || chosenGet === cacheClient?.get) { lastClientFail = Date.now(); } else { lastReadFail = Date.now(); @@ -179,6 +196,24 @@ if (config.redis?.enabled) { readClient?.on("reconnect", () => { Logger.info("Redis Read-Only: trying to reconnect"); }); + + if (cacheClient) { + /* istanbul ignore next */ + cacheClient.on("error", function(error) { + lastClientFail = Date.now(); + Logger.error(`Redis Cache Client Error: ${error}`); + }); + /* istanbul ignore next */ + cacheClient.on("reconnect", () => { + Logger.info("Redis cache client: trying to reconnect"); + cache?.clear(); + }); + + setupClientCache(client as RedisClientType, + readClient as RedisClientType, + cacheClient as RedisClientType, + cache).catch(Logger.error); + } } function pickChoice(client: T, readClient: T): T { @@ -202,4 +237,23 @@ export function getRedisStats(): RedisStats { }; } +async function setupClientCache(client: RedisClientType, + readClient: RedisClientType, + cacheClient: RedisClientType, + cache: LRUCache) { + + await cacheClient.connect(); + + const clientId = await cacheClient.sendCommand(["CLIENT", "ID"]); + cacheClient.subscribe("__redis__:invalidate", (messages) => { + cache.delete(messages[0]); + }).catch(Logger.error); + + await client.sendCommand(["CLIENT", "TRACKING", "ON", "REDIRECT", String(clientId)]); + + if (readClient) { + await readClient.sendCommand(["CLIENT", "TRACKING", "ON", "REDIRECT", String(clientId)]); + } +} + export default exportClient; diff --git a/src/utils/redisKeys.ts b/src/utils/redisKeys.ts index 9d5ca3c..7c9da12 100644 --- a/src/utils/redisKeys.ts +++ b/src/utils/redisKeys.ts @@ -3,6 +3,7 @@ import { Feature, HashedUserID, UserID } from "../types/user.model"; import { HashedValue } from "../types/hash.model"; import { Logger } from "./logger"; import { BrandingUUID } from "../types/branding.model"; +import { RedisCommandArgument } from "@redis/client/dist/lib/commands"; export const skipSegmentsKey = (videoID: VideoID, service: Service): string => `segments.v6.${service}.videoID.${videoID}`; @@ -65,4 +66,8 @@ export function videoLabelsHashKey(hashedVideoIDPrefix: VideoIDHash, service: Se export function userFeatureKey (userID: HashedUserID, feature: Feature): string { return `user.v2.${userID}.feature.${feature}`; +} + +export function shouldClientCacheKey(key: RedisCommandArgument): boolean { + return (key as string).startsWith("segments."); } \ No newline at end of file