Add client-side caching

This commit is contained in:
Ajay
2024-02-05 13:11:44 -05:00
parent 547632341a
commit 14da10bd8a
6 changed files with 81 additions and 5 deletions

14
package-lock.json generated
View File

@@ -17,6 +17,7 @@
"express-rate-limit": "^6.7.0", "express-rate-limit": "^6.7.0",
"form-data": "^4.0.0", "form-data": "^4.0.0",
"lodash": "^4.17.21", "lodash": "^4.17.21",
"lru-cache": "^10.2.0",
"lz4-napi": "^2.2.0", "lz4-napi": "^2.2.0",
"pg": "^8.8.0", "pg": "^8.8.0",
"rate-limit-redis": "^3.0.1", "rate-limit-redis": "^3.0.1",
@@ -3715,6 +3716,14 @@
"url": "https://github.com/sponsors/sindresorhus" "url": "https://github.com/sponsors/sindresorhus"
} }
}, },
"node_modules/lru-cache": {
"version": "10.2.0",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.2.0.tgz",
"integrity": "sha512-2bIM8x+VAf6JT4bKAljS1qUWgMsqZRPGJS6FSahIMPVvctcNhyVp7AJu7quxOW9jwkryBReKZY5tY5JYv2n/7Q==",
"engines": {
"node": "14 || >=16.14"
}
},
"node_modules/luxon": { "node_modules/luxon": {
"version": "1.28.1", "version": "1.28.1",
"resolved": "https://registry.npmjs.org/luxon/-/luxon-1.28.1.tgz", "resolved": "https://registry.npmjs.org/luxon/-/luxon-1.28.1.tgz",
@@ -8608,6 +8617,11 @@
"is-unicode-supported": "^0.1.0" "is-unicode-supported": "^0.1.0"
} }
}, },
"lru-cache": {
"version": "10.2.0",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.2.0.tgz",
"integrity": "sha512-2bIM8x+VAf6JT4bKAljS1qUWgMsqZRPGJS6FSahIMPVvctcNhyVp7AJu7quxOW9jwkryBReKZY5tY5JYv2n/7Q=="
},
"luxon": { "luxon": {
"version": "1.28.1", "version": "1.28.1",
"resolved": "https://registry.npmjs.org/luxon/-/luxon-1.28.1.tgz", "resolved": "https://registry.npmjs.org/luxon/-/luxon-1.28.1.tgz",

View File

@@ -27,6 +27,7 @@
"express-rate-limit": "^6.7.0", "express-rate-limit": "^6.7.0",
"form-data": "^4.0.0", "form-data": "^4.0.0",
"lodash": "^4.17.21", "lodash": "^4.17.21",
"lru-cache": "^10.2.0",
"lz4-napi": "^2.2.0", "lz4-napi": "^2.2.0",
"pg": "^8.8.0", "pg": "^8.8.0",
"rate-limit-redis": "^3.0.1", "rate-limit-redis": "^3.0.1",

View File

@@ -166,7 +166,8 @@ addDefaults(config, {
commandsQueueMaxLength: 3000, commandsQueueMaxLength: 3000,
stopWritingAfterResponseTime: 50, stopWritingAfterResponseTime: 50,
responseTimePause: 1000, responseTimePause: 1000,
disableHashCache: false disableHashCache: false,
clientCacheLength: 2000
}, },
redisRead: { redisRead: {
enabled: false, enabled: false,

View File

@@ -11,6 +11,7 @@ interface RedisConfig extends redis.RedisClientOptions {
stopWritingAfterResponseTime: number; stopWritingAfterResponseTime: number;
responseTimePause: number; responseTimePause: number;
disableHashCache: boolean; disableHashCache: boolean;
clientCacheLength: number;
} }
interface RedisReadOnlyConfig extends redis.RedisClientOptions { interface RedisReadOnlyConfig extends redis.RedisClientOptions {

View File

@@ -1,12 +1,14 @@
import { config } from "../config"; import { config } from "../config";
import { Logger } from "./logger"; import { Logger } from "./logger";
import { SetOptions, createClient } from "redis"; import { RedisClientType, SetOptions, createClient } from "redis";
import { RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply } from "@redis/client/dist/lib/commands"; import { RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply } from "@redis/client/dist/lib/commands";
import { RedisClientOptions } from "@redis/client/dist/lib/client"; import { RedisClientOptions } from "@redis/client/dist/lib/client";
import { RedisReply } from "rate-limit-redis"; import { RedisReply } from "rate-limit-redis";
import { db } from "../databases/databases"; import { db } from "../databases/databases";
import { Postgres } from "../databases/Postgres"; import { Postgres } from "../databases/Postgres";
import { compress, uncompress } from "lz4-napi"; import { compress, uncompress } from "lz4-napi";
import { LRUCache } from "lru-cache";
import { shouldClientCacheKey } from "./redisKeys";
export interface RedisStats { export interface RedisStats {
activeRequests: number; activeRequests: number;
@@ -16,7 +18,7 @@ export interface RedisStats {
} }
interface RedisSB { interface RedisSB {
get(key: RedisCommandArgument): Promise<string>; get(key: RedisCommandArgument, useClientCache?: boolean): Promise<string>;
getCompressed(key: RedisCommandArgument): Promise<string>; getCompressed(key: RedisCommandArgument): Promise<string>;
set(key: RedisCommandArgument, value: RedisCommandArgument, options?: SetOptions): Promise<string>; set(key: RedisCommandArgument, value: RedisCommandArgument, options?: SetOptions): Promise<string>;
setCompressed(key: RedisCommandArgument, value: RedisCommandArgument, options?: SetOptions): Promise<string>; setCompressed(key: RedisCommandArgument, value: RedisCommandArgument, options?: SetOptions): Promise<string>;
@@ -65,10 +67,25 @@ if (config.redis?.enabled) {
void readClient?.connect(); // void as we don't care about the promise void readClient?.connect(); // void as we don't care about the promise
exportClient = client as unknown as RedisSB; exportClient = client as unknown as RedisSB;
const cacheClient = config.redis.clientCacheLength ? createClient(config.redis) : null;
const cache = config.redis.clientCacheLength ? new LRUCache<RedisCommandArgument, string>({
max: config.redis.clientCacheLength
}) : null;
exportClient.getCompressed = (key) => { exportClient.getCompressed = (key) => {
if (cache && cacheClient && cache.has(key)) {
return Promise.resolve(cache.get(key));
}
return exportClient.get(key).then((reply) => { return exportClient.get(key).then((reply) => {
if (reply === null) return null; if (reply === null) return null;
return uncompress(Buffer.from(reply, "base64")).then((decompressed) => decompressed.toString("utf-8"));
const decompressed = uncompress(Buffer.from(reply, "base64")).then((decompressed) => decompressed.toString("utf-8"));
if (cache && shouldClientCacheKey(key)) {
decompressed.then((d) => cache.set(key, d)).catch(Logger.error);
}
return decompressed;
}); });
}; };
exportClient.setCompressed = (key, value, options) => { exportClient.setCompressed = (key, value, options) => {
@@ -111,7 +128,7 @@ if (config.redis?.enabled) {
lastResponseTimeLimit = Date.now(); lastResponseTimeLimit = Date.now();
} }
}).catch((err) => { }).catch((err) => {
if (chosenGet === get) { if (chosenGet === get || chosenGet === cacheClient?.get) {
lastClientFail = Date.now(); lastClientFail = Date.now();
} else { } else {
lastReadFail = Date.now(); lastReadFail = Date.now();
@@ -179,6 +196,24 @@ if (config.redis?.enabled) {
readClient?.on("reconnect", () => { readClient?.on("reconnect", () => {
Logger.info("Redis Read-Only: trying to reconnect"); Logger.info("Redis Read-Only: trying to reconnect");
}); });
if (cacheClient) {
/* istanbul ignore next */
cacheClient.on("error", function(error) {
lastClientFail = Date.now();
Logger.error(`Redis Cache Client Error: ${error}`);
});
/* istanbul ignore next */
cacheClient.on("reconnect", () => {
Logger.info("Redis cache client: trying to reconnect");
cache?.clear();
});
setupClientCache(client as RedisClientType,
readClient as RedisClientType,
cacheClient as RedisClientType,
cache).catch(Logger.error);
}
} }
function pickChoice<T>(client: T, readClient: T): T { function pickChoice<T>(client: T, readClient: T): T {
@@ -202,4 +237,23 @@ export function getRedisStats(): RedisStats {
}; };
} }
async function setupClientCache(client: RedisClientType,
readClient: RedisClientType,
cacheClient: RedisClientType,
cache: LRUCache<RedisCommandArgument, string>) {
await cacheClient.connect();
const clientId = await cacheClient.sendCommand(["CLIENT", "ID"]);
cacheClient.subscribe("__redis__:invalidate", (messages) => {
cache.delete(messages[0]);
}).catch(Logger.error);
await client.sendCommand(["CLIENT", "TRACKING", "ON", "REDIRECT", String(clientId)]);
if (readClient) {
await readClient.sendCommand(["CLIENT", "TRACKING", "ON", "REDIRECT", String(clientId)]);
}
}
export default exportClient; export default exportClient;

View File

@@ -3,6 +3,7 @@ import { Feature, HashedUserID, UserID } from "../types/user.model";
import { HashedValue } from "../types/hash.model"; import { HashedValue } from "../types/hash.model";
import { Logger } from "./logger"; import { Logger } from "./logger";
import { BrandingUUID } from "../types/branding.model"; import { BrandingUUID } from "../types/branding.model";
import { RedisCommandArgument } from "@redis/client/dist/lib/commands";
export const skipSegmentsKey = (videoID: VideoID, service: Service): string => export const skipSegmentsKey = (videoID: VideoID, service: Service): string =>
`segments.v6.${service}.videoID.${videoID}`; `segments.v6.${service}.videoID.${videoID}`;
@@ -65,4 +66,8 @@ export function videoLabelsHashKey(hashedVideoIDPrefix: VideoIDHash, service: Se
export function userFeatureKey (userID: HashedUserID, feature: Feature): string { export function userFeatureKey (userID: HashedUserID, feature: Feature): string {
return `user.v2.${userID}.feature.${feature}`; return `user.v2.${userID}.feature.${feature}`;
}
export function shouldClientCacheKey(key: RedisCommandArgument): boolean {
return (key as string).startsWith("segments.");
} }