diff --git a/src/config.ts b/src/config.ts index 568aa0c..cb0bf34 100644 --- a/src/config.ts +++ b/src/config.ts @@ -76,7 +76,10 @@ addDefaults(config, { port: 5432, max: 10, idleTimeoutMillis: 10000, - maxTries: 3 + maxTries: 3, + maxActiveRequests: 0, + timeout: 60000, + highLoadThreshold: 10 }, postgresReadOnly: { enabled: false, @@ -138,7 +141,11 @@ addDefaults(config, { expiryTime: 24 * 60 * 60, getTimeout: 40, maxConnections: 15000, - maxWriteConnections: 1000 + maxWriteConnections: 1000, + commandsQueueMaxLength: 3000, + stopWritingAfterResponseTime: 50, + responseTimePause: 1000, + disableHashCache: false }, redisRead: { enabled: false, diff --git a/src/databases/IDatabase.ts b/src/databases/IDatabase.ts index 9cc82d4..717c700 100644 --- a/src/databases/IDatabase.ts +++ b/src/databases/IDatabase.ts @@ -7,6 +7,8 @@ export interface IDatabase { init(): Promise; prepare(type: QueryType, query: string, params?: any[], options?: QueryOption): Promise; + + highLoad(): boolean; } export type QueryType = "get" | "all" | "run"; \ No newline at end of file diff --git a/src/databases/Mysql.ts b/src/databases/Mysql.ts index d15b692..a68a12c 100644 --- a/src/databases/Mysql.ts +++ b/src/databases/Mysql.ts @@ -32,4 +32,8 @@ export class Mysql implements IDatabase { } } + highLoad() { + return false; + } + } diff --git a/src/databases/Postgres.ts b/src/databases/Postgres.ts index db72021..7f65c9f 100644 --- a/src/databases/Postgres.ts +++ b/src/databases/Postgres.ts @@ -3,7 +3,7 @@ import { IDatabase, QueryOption, QueryType } from "./IDatabase"; import { Client, Pool, QueryResult, types } from "pg"; import fs from "fs"; -import { CustomPostgresConfig, CustomPostgresReadOnlyConfig } from "../types/config.model"; +import { CustomPostgresReadOnlyConfig, CustomWritePostgresConfig } from "../types/config.model"; import { timeoutPomise, PromiseWithState, savePromiseState, nextFulfilment } from "../utils/promise"; // return numeric (pg_type oid=1700) as float @@ -22,7 +22,7 @@ export interface DatabaseConfig { fileNamePrefix: string, readOnly: boolean, createDbIfNotExists: boolean, - postgres: CustomPostgresConfig, + postgres: CustomWritePostgresConfig, postgresReadOnly: CustomPostgresReadOnlyConfig } @@ -105,6 +105,11 @@ export class Postgres implements IDatabase { Logger.debug(`prepare (postgres): type: ${type}, query: ${query}, params: ${params}`); + if (this.config.postgres.maxActiveRequests && this.isReadQuery(type) + && this.activePostgresRequests > this.config.postgres.maxActiveRequests) { + throw new Error("Too many active postgres requests"); + } + const pendingQueries: PromiseWithState>[] = []; let tries = 0; let lastPool: Pool = null; @@ -120,6 +125,7 @@ export class Postgres implements IDatabase { pendingQueries.push(savePromiseState(lastPool.query({ text: query, values: params }))); const currentPromises = [...pendingQueries]; if (options.useReplica && maxTries() - tries > 1) currentPromises.push(savePromiseState(timeoutPomise(this.config.postgresReadOnly.readTimeout))); + else if (this.config.postgres.timeout) currentPromises.push(savePromiseState(timeoutPomise(this.config.postgres.timeout))); const queryResult = await nextFulfilment(currentPromises); this.activePostgresRequests--; @@ -150,12 +156,12 @@ export class Postgres implements IDatabase { } } + this.activePostgresRequests--; Logger.error(`prepare (postgres) try ${tries}: ${err}`); } } while (this.isReadQuery(type) && tries < maxTries() && this.activePostgresRequests < this.config.postgresReadOnly.stopRetryThreshold); - this.activePostgresRequests--; throw new Error(`prepare (postgres): ${type} ${query} failed after ${tries} tries`); } @@ -229,4 +235,8 @@ export class Postgres implements IDatabase { return result; } + + highLoad() { + return this.activePostgresRequests > this.config.postgres.highLoadThreshold; + } } diff --git a/src/databases/Sqlite.ts b/src/databases/Sqlite.ts index f5dd371..ac03753 100644 --- a/src/databases/Sqlite.ts +++ b/src/databases/Sqlite.ts @@ -95,6 +95,10 @@ export class Sqlite implements IDatabase { private static processUpgradeQuery(query: string): string { return query.replace(/^.*--!sqlite-ignore/gm, ""); } + + highLoad() { + return false; + } } export interface SqliteConfig { diff --git a/src/index.ts b/src/index.ts index 88c6562..ec44960 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,6 +4,7 @@ import { createServer } from "./app"; import { Logger } from "./utils/logger"; import { startAllCrons } from "./cronjob"; import { getCommit } from "./utils/getCommit"; +import { connectionPromise } from "./utils/redis"; async function init() { process.on("unhandledRejection", (error: any) => { @@ -14,6 +15,7 @@ async function init() { try { await initDb(); + await connectionPromise; } catch (e) { Logger.error(`Init Db: ${e}`); process.exit(1); diff --git a/src/routes/getSkipSegments.ts b/src/routes/getSkipSegments.ts index 6c93822..4dfe589 100644 --- a/src/routes/getSkipSegments.ts +++ b/src/routes/getSkipSegments.ts @@ -291,7 +291,7 @@ async function chooseSegments(videoID: VideoID, service: Service, segments: DBSe //Segments with less than -1 votes are already ignored before this function is called async function buildSegmentGroups(segments: DBSegment[]): Promise { const reputationPromises = segments.map(segment => - segment.userID ? getReputation(segment.userID).catch((e) => Logger.error(e)) : null); + segment.userID && !db.highLoad() ? getReputation(segment.userID).catch((e) => Logger.error(e)) : null); //Create groups of segments that are similar to eachother //Segments must be sorted by their startTime so that we can build groups chronologically: diff --git a/src/routes/getSkipSegmentsByHash.ts b/src/routes/getSkipSegmentsByHash.ts index 454f232..d6ea9de 100644 --- a/src/routes/getSkipSegmentsByHash.ts +++ b/src/routes/getSkipSegmentsByHash.ts @@ -3,6 +3,7 @@ import { getSegmentsByHash } from "./getSkipSegments"; import { Request, Response } from "express"; import { ActionType, Category, SegmentUUID, VideoIDHash, Service } from "../types/segments.model"; import { getService } from "../utils/getService"; +import { Logger } from "../utils/logger"; export async function getSkipSegmentsByHash(req: Request, res: Response): Promise { let hashPrefix = req.params.prefix as VideoIDHash; @@ -67,10 +68,16 @@ export async function getSkipSegmentsByHash(req: Request, res: Response): Promis // Get all video id's that match hash prefix const segments = await getSegmentsByHash(req, hashPrefix, categories, actionTypes, requiredSegments, service); - const output = Object.entries(segments).map(([videoID, data]) => ({ - videoID, - hash: data.hash, - segments: data.segments, - })); - return res.status(output.length === 0 ? 404 : 200).json(output); + try { + const output = Object.entries(segments).map(([videoID, data]) => ({ + videoID, + hash: data.hash, + segments: data.segments, + })); + return res.status(output.length === 0 ? 404 : 200).json(output); + } catch(e) { + Logger.error(`skip segments by hash error: ${e}`); + + return res.status(500).send("Internal server error"); + } } diff --git a/src/routes/getTopCategoryUsers.ts b/src/routes/getTopCategoryUsers.ts index 21bc9eb..74853b3 100644 --- a/src/routes/getTopCategoryUsers.ts +++ b/src/routes/getTopCategoryUsers.ts @@ -34,7 +34,7 @@ async function generateTopCategoryUsersStats(sortBy: string, category: string) { userNames.push(row.userName); viewCounts.push(row.viewCount); totalSubmissions.push(row.totalSubmissions); - minutesSaved.push(row.minutesSaved); + minutesSaved.push(category === "chapter" ? 0 : row.minutesSaved); } } @@ -56,6 +56,10 @@ export async function getTopCategoryUsers(req: Request, res: Response): Promise< return res.sendStatus(400); } + if (db.highLoad()) { + return res.status(503).send("Disabled for load reasons"); + } + //setup which sort type to use let sortBy = ""; if (sortType == 0) { diff --git a/src/routes/getTopUsers.ts b/src/routes/getTopUsers.ts index 552aaf9..f77bc8b 100644 --- a/src/routes/getTopUsers.ts +++ b/src/routes/getTopUsers.ts @@ -88,6 +88,10 @@ export async function getTopUsers(req: Request, res: Response): Promise { - const userCountQuery = `(SELECT COUNT(*) FROM (SELECT DISTINCT "userID" from "sponsorTimes") t) "userCount",`; - const row = await db.prepare("get", `SELECT ${req.query.countContributingUsers ? userCountQuery : ""} COUNT(*) as "totalSubmissions", - SUM("views") as "viewCount", SUM(("endTime" - "startTime") / 60 * "views") as "minutesSaved" FROM "sponsorTimes" WHERE "shadowHidden" != 1 AND "votes" >= 0 AND "actionType" != 'chapter'`, []); + const row = await getStats(!!req.query.countContributingUsers); + lastFetch = row; if (row !== undefined) { const extensionUsers = chromeUsersCache + firefoxUsersCache; @@ -43,6 +56,18 @@ export async function getTotalStats(req: Request, res: Response): Promise } } +function getStats(countContributingUsers: boolean): Promise { + if (db.highLoad()) { + return Promise.resolve(lastFetch); + } else { + const userCountQuery = `(SELECT COUNT(*) FROM (SELECT DISTINCT "userID" from "sponsorTimes") t) "userCount",`; + + return db.prepare("get", `SELECT ${countContributingUsers ? userCountQuery : ""} COUNT(*) as "totalSubmissions", + SUM("views") as "viewCount", SUM(("endTime" - "startTime") / 60 * "views") as "minutesSaved" FROM "sponsorTimes" WHERE "shadowHidden" != 1 AND "votes" >= 0 AND "actionType" != 'chapter'`, []); + } +} + + function updateExtensionUsers() { if (config.userCounterURL) { axios.get(`${config.userCounterURL}/api/v1/userCount`) @@ -68,7 +93,7 @@ function updateExtensionUsers() { const userDownloadsStartIndex = body.indexOf(matchingString); if (userDownloadsStartIndex >= 0) { const closingQuoteIndex = body.indexOf('"', userDownloadsStartIndex + matchingStringLen); - const userDownloadsStr = body.substr(userDownloadsStartIndex + matchingStringLen, closingQuoteIndex - userDownloadsStartIndex).replace(",","").replace(".",""); + const userDownloadsStr = body.substr(userDownloadsStartIndex + matchingStringLen, closingQuoteIndex - userDownloadsStartIndex).replace(",", "").replace(".", ""); chromeUsersCache = parseInt(userDownloadsStr); } else { diff --git a/src/routes/postSkipSegments.ts b/src/routes/postSkipSegments.ts index f3efaac..aaf8eda 100644 --- a/src/routes/postSkipSegments.ts +++ b/src/routes/postSkipSegments.ts @@ -266,9 +266,10 @@ async function checkEachSegmentValid(rawIP: IPAddress, paramUserID: UserID, user pass: false, errorCode: 403, errorMessage: - `Users have voted that new segments aren't needed for the following category: ` + + `Users have voted that all the segments required for this video have already been submitted for the following category: ` + `'${segments[i].category}'\n` + - `${lockedCategoryList[lockIndex].reason?.length !== 0 ? `\nReason: '${lockedCategoryList[lockIndex].reason}'` : ""}\n` + + `${lockedCategoryList[lockIndex].reason?.length !== 0 ? `\nReason: '${lockedCategoryList[lockIndex].reason}\n'` : ""}` + + `You may need to refresh if you don't see the segments.\n` + `${(segments[i].category === "sponsor" ? "\nMaybe the segment you are submitting is a different category that you have not enabled and is not a sponsor. " + "Categories that aren't sponsor, such as self-promotion can be enabled in the options.\n" : "")}` + `\nIf you believe this is incorrect, please contact someone on chat.sponsor.ajay.app, discord.gg/SponsorBlock or matrix.to/#/#sponsor:ajay.app` diff --git a/src/types/config.model.ts b/src/types/config.model.ts index 3450b72..4adaa51 100644 --- a/src/types/config.model.ts +++ b/src/types/config.model.ts @@ -7,6 +7,9 @@ interface RedisConfig extends redis.RedisClientOptions { getTimeout: number; maxConnections: number; maxWriteConnections: number; + stopWritingAfterResponseTime: number; + responseTimePause: number; + disableHashCache: boolean; } interface RedisReadOnlyConfig extends redis.RedisClientOptions { @@ -19,6 +22,12 @@ export interface CustomPostgresConfig extends PoolConfig { maxTries: number; } +export interface CustomWritePostgresConfig extends CustomPostgresConfig { + maxActiveRequests: number; + timeout: number; + highLoadThreshold: number; +} + export interface CustomPostgresReadOnlyConfig extends CustomPostgresConfig { weight: number; readTimeout: number; @@ -71,7 +80,7 @@ export interface SBSConfig { redisRead?: RedisReadOnlyConfig; redisRateLimit: boolean; maxRewardTimePerSegmentInSeconds?: number; - postgres?: CustomPostgresConfig; + postgres?: CustomWritePostgresConfig; postgresReadOnly?: CustomPostgresReadOnlyConfig; dumpDatabase?: DumpDatabase; diskCacheURL: string; diff --git a/src/utils/getHashCache.ts b/src/utils/getHashCache.ts index 7ff0ef0..0c15cc0 100644 --- a/src/utils/getHashCache.ts +++ b/src/utils/getHashCache.ts @@ -3,6 +3,7 @@ import { shaHashKey } from "../utils/redisKeys"; import { HashedValue } from "../types/hash.model"; import { Logger } from "../utils/logger"; import { getHash } from "../utils/getHash"; +import { config } from "../config"; const defaultedHashTimes = 5000; const cachedHashTimes = defaultedHashTimes - 1; @@ -19,20 +20,25 @@ export async function getHashCache(value: T, times = defaulted async function getFromRedis(key: HashedValue): Promise { const redisKey = shaHashKey(key); - try { - const reply = await redis.get(redisKey); + if (!config.redis?.disableHashCache) { + try { + const reply = await redis.get(redisKey); - if (reply) { - Logger.debug(`Got data from redis: ${reply}`); - return reply as T & HashedValue; + if (reply) { + Logger.debug(`Got data from redis: ${reply}`); + return reply as T & HashedValue; + } + } catch (err) { + Logger.error(err as string); } - } catch (err) { - Logger.error(err as string); } // Otherwise, calculate it const data = getHash(key, cachedHashTimes); - redis.set(redisKey, data).catch((err) => Logger.error(err)); + + if (!config.redis?.disableHashCache) { + redis.set(redisKey, data).catch((err) => Logger.error(err)); + } return data as T & HashedValue; } \ No newline at end of file diff --git a/src/utils/redis.ts b/src/utils/redis.ts index 68240ea..01761ae 100644 --- a/src/utils/redis.ts +++ b/src/utils/redis.ts @@ -39,21 +39,24 @@ let writeRequests = 0; const readResponseTime: number[] = []; const writeResponseTime: number[] = []; +let lastResponseTimeLimit = 0; const maxStoredTimes = 200; +export let connectionPromise = Promise.resolve(); + if (config.redis?.enabled) { Logger.info("Connected to redis"); const client = createClient(config.redis); const readClient = config.redisRead?.enabled ? createClient(config.redisRead) : null; - void client.connect(); // void as we don't care about the promise - void readClient?.connect(); + connectionPromise = client.connect(); + void readClient?.connect(); // void as we don't care about the promise exportClient = client as RedisSB; const get = client.get.bind(client); const getRead = readClient?.get?.bind(readClient); exportClient.get = (key) => new Promise((resolve, reject) => { - if (activeRequests > config.redis.maxConnections) { + if (config.redis.maxConnections && activeRequests > config.redis.maxConnections) { reject("Too many active requests"); return; } @@ -69,8 +72,13 @@ if (config.redis?.enabled) { activeRequests--; resolve(reply); - readResponseTime.push(Date.now() - start); + const responseTime = Date.now() - start; + readResponseTime.push(responseTime); if (readResponseTime.length > maxStoredTimes) readResponseTime.shift(); + if (config.redis.stopWritingAfterResponseTime + && responseTime > config.redis.stopWritingAfterResponseTime) { + lastResponseTimeLimit = Date.now(); + } }).catch((err) => { if (chosenGet === get) { lastClientFail = Date.now(); @@ -83,10 +91,12 @@ if (config.redis?.enabled) { }); }); - const setFun = >(func: (...args: T) => Promise , params: T): Promise => + const setFun = >(func: (...args: T) => Promise, params: T): Promise => new Promise((resolve, reject) => { - if (activeRequests > config.redis.maxWriteConnections) { - reject("Too many active requests"); + if ((config.redis.maxWriteConnections && activeRequests > config.redis.maxWriteConnections) + || (config.redis.responseTimePause + && Date.now() - lastResponseTimeLimit < config.redis.responseTimePause)) { + reject("Too many active requests to write"); return; } @@ -108,8 +118,10 @@ if (config.redis?.enabled) { }); }); - exportClient.set = (key, value) => setFun(client.set.bind(client), [key, value]); - exportClient.setEx = (key, seconds, value) => setFun(client.setEx.bind(client), [key, seconds, value]); + const set = client.set.bind(client); + const setEx = client.setEx.bind(client); + exportClient.set = (key, value) => setFun(set, [key, value]); + exportClient.setEx = (key, seconds, value) => setFun(setEx, [key, seconds, value]); exportClient.increment = (key) => new Promise((resolve, reject) => void client.multi() .incr(key)