Handle reconnects with client-side caching

Also upgrades redis to fix a library bug
This commit is contained in:
Ajay
2024-02-06 00:52:42 -05:00
parent 14da10bd8a
commit 5b1b362bf0
3 changed files with 114 additions and 84 deletions

View File

@@ -55,9 +55,12 @@ const writeResponseTime: number[] = [];
let lastResponseTimeLimit = 0;
const maxStoredTimes = 200;
// For redis
let cacheConnectionClientId = "";
export class TooManyActiveConnectionsError extends Error {}
export let connectionPromise = Promise.resolve();
export let connectionPromise: Promise<unknown> = Promise.resolve();
if (config.redis?.enabled) {
Logger.info("Connected to redis");
@@ -67,7 +70,7 @@ if (config.redis?.enabled) {
void readClient?.connect(); // void as we don't care about the promise
exportClient = client as unknown as RedisSB;
const cacheClient = config.redis.clientCacheLength ? createClient(config.redis) : null;
let cacheClient = null as RedisClientType | null;
const cache = config.redis.clientCacheLength ? new LRUCache<RedisCommandArgument, string>({
max: config.redis.clientCacheLength
}) : null;
@@ -184,7 +187,7 @@ if (config.redis?.enabled) {
Logger.error(`Redis Error: ${error}`);
});
/* istanbul ignore next */
client.on("reconnect", () => {
client.on("reconnecting", () => {
Logger.info("Redis: trying to reconnect");
});
/* istanbul ignore next */
@@ -193,26 +196,54 @@ if (config.redis?.enabled) {
Logger.error(`Redis Read-Only Error: ${error}`);
});
/* istanbul ignore next */
readClient?.on("reconnect", () => {
readClient?.on("reconnecting", () => {
Logger.info("Redis Read-Only: trying to reconnect");
});
if (cacheClient) {
// It needs to recreate itself when the connection fails as the queue connection doesn't properly restart
const createCacheClient = () => {
cacheClient = createClient(config.redis) as RedisClientType;
/* istanbul ignore next */
cacheClient.on("error", function(error) {
cacheClient.on("error", function (error) {
lastClientFail = Date.now();
Logger.error(`Redis Cache Client Error: ${error}`);
});
/* istanbul ignore next */
cacheClient.on("reconnect", () => {
cacheClient.on("reconnecting", () => {
Logger.info("Redis cache client: trying to reconnect");
cache?.clear();
void cacheClient.disconnect();
setTimeout(() => createCacheClient(), 1000);
});
setupClientCache(client as RedisClientType,
readClient as RedisClientType,
cacheClient as RedisClientType,
cache).catch(Logger.error);
// eslint-disable-next-line @typescript-eslint/no-misused-promises
cacheClient.on("ready", async () => {
cache?.clear();
await setupCacheClientListener(cacheClient as RedisClientType, cache);
void setupCacheClientTracking(client as RedisClientType, cacheClient as RedisClientType);
void setupCacheClientTracking(readClient as RedisClientType, cacheClient as RedisClientType);
});
void cacheClient.connect();
};
if (config.redis.clientCacheLength) {
createCacheClient();
client.on("ready", () => {
if (cacheClient.isReady) {
void setupCacheClientTracking(client as RedisClientType, cacheClient as RedisClientType);
}
});
readClient?.on("ready", () => {
if (cacheClient.isReady) {
void setupCacheClientTracking(readClient as RedisClientType, cacheClient as RedisClientType);
}
});
}
}
@@ -237,23 +268,22 @@ export function getRedisStats(): RedisStats {
};
}
async function setupClientCache(client: RedisClientType,
readClient: RedisClientType,
cacheClient: RedisClientType,
async function setupCacheClientListener(cacheClient: RedisClientType,
cache: LRUCache<RedisCommandArgument, string>) {
await cacheClient.connect();
cacheConnectionClientId = String(await cacheClient.clientId());
const clientId = await cacheClient.sendCommand(["CLIENT", "ID"]);
cacheClient.subscribe("__redis__:invalidate", (messages) => {
cache.delete(messages[0]);
}).catch(Logger.error);
}
await client.sendCommand(["CLIENT", "TRACKING", "ON", "REDIRECT", String(clientId)]);
async function setupCacheClientTracking(client: RedisClientType,
cacheClient: RedisClientType) {
if (readClient) {
await readClient.sendCommand(["CLIENT", "TRACKING", "ON", "REDIRECT", String(clientId)]);
}
if (!client || !cacheClient.isReady) return;
await client.sendCommand(["CLIENT", "TRACKING", "ON", "REDIRECT", cacheConnectionClientId]);
}
export default exportClient;