Merge branch 'master' of https://github.com/ajayyy/SponsorBlockServer into submitPrivateVideo

This commit is contained in:
Michael C
2022-11-21 14:40:46 -05:00
15 changed files with 134 additions and 37 deletions

View File

@@ -76,7 +76,10 @@ addDefaults(config, {
port: 5432, port: 5432,
max: 10, max: 10,
idleTimeoutMillis: 10000, idleTimeoutMillis: 10000,
maxTries: 3 maxTries: 3,
maxActiveRequests: 0,
timeout: 60000,
highLoadThreshold: 10
}, },
postgresReadOnly: { postgresReadOnly: {
enabled: false, enabled: false,
@@ -138,7 +141,11 @@ addDefaults(config, {
expiryTime: 24 * 60 * 60, expiryTime: 24 * 60 * 60,
getTimeout: 40, getTimeout: 40,
maxConnections: 15000, maxConnections: 15000,
maxWriteConnections: 1000 maxWriteConnections: 1000,
commandsQueueMaxLength: 3000,
stopWritingAfterResponseTime: 50,
responseTimePause: 1000,
disableHashCache: false
}, },
redisRead: { redisRead: {
enabled: false, enabled: false,

View File

@@ -7,6 +7,8 @@ export interface IDatabase {
init(): Promise<void>; init(): Promise<void>;
prepare(type: QueryType, query: string, params?: any[], options?: QueryOption): Promise<any | any[] | void>; prepare(type: QueryType, query: string, params?: any[], options?: QueryOption): Promise<any | any[] | void>;
highLoad(): boolean;
} }
export type QueryType = "get" | "all" | "run"; export type QueryType = "get" | "all" | "run";

View File

@@ -32,4 +32,8 @@ export class Mysql implements IDatabase {
} }
} }
highLoad() {
return false;
}
} }

View File

@@ -3,7 +3,7 @@ import { IDatabase, QueryOption, QueryType } from "./IDatabase";
import { Client, Pool, QueryResult, types } from "pg"; import { Client, Pool, QueryResult, types } from "pg";
import fs from "fs"; import fs from "fs";
import { CustomPostgresConfig, CustomPostgresReadOnlyConfig } from "../types/config.model"; import { CustomPostgresReadOnlyConfig, CustomWritePostgresConfig } from "../types/config.model";
import { timeoutPomise, PromiseWithState, savePromiseState, nextFulfilment } from "../utils/promise"; import { timeoutPomise, PromiseWithState, savePromiseState, nextFulfilment } from "../utils/promise";
// return numeric (pg_type oid=1700) as float // return numeric (pg_type oid=1700) as float
@@ -22,7 +22,7 @@ export interface DatabaseConfig {
fileNamePrefix: string, fileNamePrefix: string,
readOnly: boolean, readOnly: boolean,
createDbIfNotExists: boolean, createDbIfNotExists: boolean,
postgres: CustomPostgresConfig, postgres: CustomWritePostgresConfig,
postgresReadOnly: CustomPostgresReadOnlyConfig postgresReadOnly: CustomPostgresReadOnlyConfig
} }
@@ -105,6 +105,11 @@ export class Postgres implements IDatabase {
Logger.debug(`prepare (postgres): type: ${type}, query: ${query}, params: ${params}`); Logger.debug(`prepare (postgres): type: ${type}, query: ${query}, params: ${params}`);
if (this.config.postgres.maxActiveRequests && this.isReadQuery(type)
&& this.activePostgresRequests > this.config.postgres.maxActiveRequests) {
throw new Error("Too many active postgres requests");
}
const pendingQueries: PromiseWithState<QueryResult<any>>[] = []; const pendingQueries: PromiseWithState<QueryResult<any>>[] = [];
let tries = 0; let tries = 0;
let lastPool: Pool = null; let lastPool: Pool = null;
@@ -120,6 +125,7 @@ export class Postgres implements IDatabase {
pendingQueries.push(savePromiseState(lastPool.query({ text: query, values: params }))); pendingQueries.push(savePromiseState(lastPool.query({ text: query, values: params })));
const currentPromises = [...pendingQueries]; const currentPromises = [...pendingQueries];
if (options.useReplica && maxTries() - tries > 1) currentPromises.push(savePromiseState(timeoutPomise(this.config.postgresReadOnly.readTimeout))); if (options.useReplica && maxTries() - tries > 1) currentPromises.push(savePromiseState(timeoutPomise(this.config.postgresReadOnly.readTimeout)));
else if (this.config.postgres.timeout) currentPromises.push(savePromiseState(timeoutPomise(this.config.postgres.timeout)));
const queryResult = await nextFulfilment(currentPromises); const queryResult = await nextFulfilment(currentPromises);
this.activePostgresRequests--; this.activePostgresRequests--;
@@ -150,12 +156,12 @@ export class Postgres implements IDatabase {
} }
} }
this.activePostgresRequests--;
Logger.error(`prepare (postgres) try ${tries}: ${err}`); Logger.error(`prepare (postgres) try ${tries}: ${err}`);
} }
} while (this.isReadQuery(type) && tries < maxTries() } while (this.isReadQuery(type) && tries < maxTries()
&& this.activePostgresRequests < this.config.postgresReadOnly.stopRetryThreshold); && this.activePostgresRequests < this.config.postgresReadOnly.stopRetryThreshold);
this.activePostgresRequests--;
throw new Error(`prepare (postgres): ${type} ${query} failed after ${tries} tries`); throw new Error(`prepare (postgres): ${type} ${query} failed after ${tries} tries`);
} }
@@ -229,4 +235,8 @@ export class Postgres implements IDatabase {
return result; return result;
} }
highLoad() {
return this.activePostgresRequests > this.config.postgres.highLoadThreshold;
}
} }

View File

@@ -95,6 +95,10 @@ export class Sqlite implements IDatabase {
private static processUpgradeQuery(query: string): string { private static processUpgradeQuery(query: string): string {
return query.replace(/^.*--!sqlite-ignore/gm, ""); return query.replace(/^.*--!sqlite-ignore/gm, "");
} }
highLoad() {
return false;
}
} }
export interface SqliteConfig { export interface SqliteConfig {

View File

@@ -4,6 +4,7 @@ import { createServer } from "./app";
import { Logger } from "./utils/logger"; import { Logger } from "./utils/logger";
import { startAllCrons } from "./cronjob"; import { startAllCrons } from "./cronjob";
import { getCommit } from "./utils/getCommit"; import { getCommit } from "./utils/getCommit";
import { connectionPromise } from "./utils/redis";
async function init() { async function init() {
process.on("unhandledRejection", (error: any) => { process.on("unhandledRejection", (error: any) => {
@@ -14,6 +15,7 @@ async function init() {
try { try {
await initDb(); await initDb();
await connectionPromise;
} catch (e) { } catch (e) {
Logger.error(`Init Db: ${e}`); Logger.error(`Init Db: ${e}`);
process.exit(1); process.exit(1);

View File

@@ -291,7 +291,7 @@ async function chooseSegments(videoID: VideoID, service: Service, segments: DBSe
//Segments with less than -1 votes are already ignored before this function is called //Segments with less than -1 votes are already ignored before this function is called
async function buildSegmentGroups(segments: DBSegment[]): Promise<OverlappingSegmentGroup[]> { async function buildSegmentGroups(segments: DBSegment[]): Promise<OverlappingSegmentGroup[]> {
const reputationPromises = segments.map(segment => const reputationPromises = segments.map(segment =>
segment.userID ? getReputation(segment.userID).catch((e) => Logger.error(e)) : null); segment.userID && !db.highLoad() ? getReputation(segment.userID).catch((e) => Logger.error(e)) : null);
//Create groups of segments that are similar to eachother //Create groups of segments that are similar to eachother
//Segments must be sorted by their startTime so that we can build groups chronologically: //Segments must be sorted by their startTime so that we can build groups chronologically:

View File

@@ -3,6 +3,7 @@ import { getSegmentsByHash } from "./getSkipSegments";
import { Request, Response } from "express"; import { Request, Response } from "express";
import { ActionType, Category, SegmentUUID, VideoIDHash, Service } from "../types/segments.model"; import { ActionType, Category, SegmentUUID, VideoIDHash, Service } from "../types/segments.model";
import { getService } from "../utils/getService"; import { getService } from "../utils/getService";
import { Logger } from "../utils/logger";
export async function getSkipSegmentsByHash(req: Request, res: Response): Promise<Response> { export async function getSkipSegmentsByHash(req: Request, res: Response): Promise<Response> {
let hashPrefix = req.params.prefix as VideoIDHash; let hashPrefix = req.params.prefix as VideoIDHash;
@@ -67,10 +68,16 @@ export async function getSkipSegmentsByHash(req: Request, res: Response): Promis
// Get all video id's that match hash prefix // Get all video id's that match hash prefix
const segments = await getSegmentsByHash(req, hashPrefix, categories, actionTypes, requiredSegments, service); const segments = await getSegmentsByHash(req, hashPrefix, categories, actionTypes, requiredSegments, service);
const output = Object.entries(segments).map(([videoID, data]) => ({ try {
videoID, const output = Object.entries(segments).map(([videoID, data]) => ({
hash: data.hash, videoID,
segments: data.segments, hash: data.hash,
})); segments: data.segments,
return res.status(output.length === 0 ? 404 : 200).json(output); }));
return res.status(output.length === 0 ? 404 : 200).json(output);
} catch(e) {
Logger.error(`skip segments by hash error: ${e}`);
return res.status(500).send("Internal server error");
}
} }

View File

@@ -34,7 +34,7 @@ async function generateTopCategoryUsersStats(sortBy: string, category: string) {
userNames.push(row.userName); userNames.push(row.userName);
viewCounts.push(row.viewCount); viewCounts.push(row.viewCount);
totalSubmissions.push(row.totalSubmissions); totalSubmissions.push(row.totalSubmissions);
minutesSaved.push(row.minutesSaved); minutesSaved.push(category === "chapter" ? 0 : row.minutesSaved);
} }
} }
@@ -56,6 +56,10 @@ export async function getTopCategoryUsers(req: Request, res: Response): Promise<
return res.sendStatus(400); return res.sendStatus(400);
} }
if (db.highLoad()) {
return res.status(503).send("Disabled for load reasons");
}
//setup which sort type to use //setup which sort type to use
let sortBy = ""; let sortBy = "";
if (sortType == 0) { if (sortType == 0) {

View File

@@ -88,6 +88,10 @@ export async function getTopUsers(req: Request, res: Response): Promise<Response
return res.sendStatus(400); return res.sendStatus(400);
} }
if (db.highLoad()) {
return res.status(503).send("Disabled for load reasons");
}
const stats = await getTopUsersWithCache(sortBy, categoryStatsEnabled); const stats = await getTopUsersWithCache(sortBy, categoryStatsEnabled);
//send this result //send this result

View File

@@ -12,13 +12,26 @@ let firefoxUsersCache = 0;
let apiUsersCache = 0; let apiUsersCache = 0;
let lastUserCountCheck = 0; let lastUserCountCheck = 0;
interface DBStatsData {
userCount: number,
viewCount: number,
totalSubmissions: number,
minutesSaved: number
}
let lastFetch: DBStatsData = {
userCount: 0,
viewCount: 0,
totalSubmissions: 0,
minutesSaved: 0
};
updateExtensionUsers(); updateExtensionUsers();
export async function getTotalStats(req: Request, res: Response): Promise<void> { export async function getTotalStats(req: Request, res: Response): Promise<void> {
const userCountQuery = `(SELECT COUNT(*) FROM (SELECT DISTINCT "userID" from "sponsorTimes") t) "userCount",`;
const row = await db.prepare("get", `SELECT ${req.query.countContributingUsers ? userCountQuery : ""} COUNT(*) as "totalSubmissions", const row = await getStats(!!req.query.countContributingUsers);
SUM("views") as "viewCount", SUM(("endTime" - "startTime") / 60 * "views") as "minutesSaved" FROM "sponsorTimes" WHERE "shadowHidden" != 1 AND "votes" >= 0 AND "actionType" != 'chapter'`, []); lastFetch = row;
if (row !== undefined) { if (row !== undefined) {
const extensionUsers = chromeUsersCache + firefoxUsersCache; const extensionUsers = chromeUsersCache + firefoxUsersCache;
@@ -43,6 +56,18 @@ export async function getTotalStats(req: Request, res: Response): Promise<void>
} }
} }
function getStats(countContributingUsers: boolean): Promise<DBStatsData> {
if (db.highLoad()) {
return Promise.resolve(lastFetch);
} else {
const userCountQuery = `(SELECT COUNT(*) FROM (SELECT DISTINCT "userID" from "sponsorTimes") t) "userCount",`;
return db.prepare("get", `SELECT ${countContributingUsers ? userCountQuery : ""} COUNT(*) as "totalSubmissions",
SUM("views") as "viewCount", SUM(("endTime" - "startTime") / 60 * "views") as "minutesSaved" FROM "sponsorTimes" WHERE "shadowHidden" != 1 AND "votes" >= 0 AND "actionType" != 'chapter'`, []);
}
}
function updateExtensionUsers() { function updateExtensionUsers() {
if (config.userCounterURL) { if (config.userCounterURL) {
axios.get(`${config.userCounterURL}/api/v1/userCount`) axios.get(`${config.userCounterURL}/api/v1/userCount`)
@@ -68,7 +93,7 @@ function updateExtensionUsers() {
const userDownloadsStartIndex = body.indexOf(matchingString); const userDownloadsStartIndex = body.indexOf(matchingString);
if (userDownloadsStartIndex >= 0) { if (userDownloadsStartIndex >= 0) {
const closingQuoteIndex = body.indexOf('"', userDownloadsStartIndex + matchingStringLen); const closingQuoteIndex = body.indexOf('"', userDownloadsStartIndex + matchingStringLen);
const userDownloadsStr = body.substr(userDownloadsStartIndex + matchingStringLen, closingQuoteIndex - userDownloadsStartIndex).replace(",","").replace(".",""); const userDownloadsStr = body.substr(userDownloadsStartIndex + matchingStringLen, closingQuoteIndex - userDownloadsStartIndex).replace(",", "").replace(".", "");
chromeUsersCache = parseInt(userDownloadsStr); chromeUsersCache = parseInt(userDownloadsStr);
} }
else { else {

View File

@@ -266,9 +266,10 @@ async function checkEachSegmentValid(rawIP: IPAddress, paramUserID: UserID, user
pass: false, pass: false,
errorCode: 403, errorCode: 403,
errorMessage: errorMessage:
`Users have voted that new segments aren't needed for the following category: ` + `Users have voted that all the segments required for this video have already been submitted for the following category: ` +
`'${segments[i].category}'\n` + `'${segments[i].category}'\n` +
`${lockedCategoryList[lockIndex].reason?.length !== 0 ? `\nReason: '${lockedCategoryList[lockIndex].reason}'` : ""}\n` + `${lockedCategoryList[lockIndex].reason?.length !== 0 ? `\nReason: '${lockedCategoryList[lockIndex].reason}\n'` : ""}` +
`You may need to refresh if you don't see the segments.\n` +
`${(segments[i].category === "sponsor" ? "\nMaybe the segment you are submitting is a different category that you have not enabled and is not a sponsor. " + `${(segments[i].category === "sponsor" ? "\nMaybe the segment you are submitting is a different category that you have not enabled and is not a sponsor. " +
"Categories that aren't sponsor, such as self-promotion can be enabled in the options.\n" : "")}` + "Categories that aren't sponsor, such as self-promotion can be enabled in the options.\n" : "")}` +
`\nIf you believe this is incorrect, please contact someone on chat.sponsor.ajay.app, discord.gg/SponsorBlock or matrix.to/#/#sponsor:ajay.app` `\nIf you believe this is incorrect, please contact someone on chat.sponsor.ajay.app, discord.gg/SponsorBlock or matrix.to/#/#sponsor:ajay.app`

View File

@@ -7,6 +7,9 @@ interface RedisConfig extends redis.RedisClientOptions {
getTimeout: number; getTimeout: number;
maxConnections: number; maxConnections: number;
maxWriteConnections: number; maxWriteConnections: number;
stopWritingAfterResponseTime: number;
responseTimePause: number;
disableHashCache: boolean;
} }
interface RedisReadOnlyConfig extends redis.RedisClientOptions { interface RedisReadOnlyConfig extends redis.RedisClientOptions {
@@ -19,6 +22,12 @@ export interface CustomPostgresConfig extends PoolConfig {
maxTries: number; maxTries: number;
} }
export interface CustomWritePostgresConfig extends CustomPostgresConfig {
maxActiveRequests: number;
timeout: number;
highLoadThreshold: number;
}
export interface CustomPostgresReadOnlyConfig extends CustomPostgresConfig { export interface CustomPostgresReadOnlyConfig extends CustomPostgresConfig {
weight: number; weight: number;
readTimeout: number; readTimeout: number;
@@ -71,7 +80,7 @@ export interface SBSConfig {
redisRead?: RedisReadOnlyConfig; redisRead?: RedisReadOnlyConfig;
redisRateLimit: boolean; redisRateLimit: boolean;
maxRewardTimePerSegmentInSeconds?: number; maxRewardTimePerSegmentInSeconds?: number;
postgres?: CustomPostgresConfig; postgres?: CustomWritePostgresConfig;
postgresReadOnly?: CustomPostgresReadOnlyConfig; postgresReadOnly?: CustomPostgresReadOnlyConfig;
dumpDatabase?: DumpDatabase; dumpDatabase?: DumpDatabase;
diskCacheURL: string; diskCacheURL: string;

View File

@@ -3,6 +3,7 @@ import { shaHashKey } from "../utils/redisKeys";
import { HashedValue } from "../types/hash.model"; import { HashedValue } from "../types/hash.model";
import { Logger } from "../utils/logger"; import { Logger } from "../utils/logger";
import { getHash } from "../utils/getHash"; import { getHash } from "../utils/getHash";
import { config } from "../config";
const defaultedHashTimes = 5000; const defaultedHashTimes = 5000;
const cachedHashTimes = defaultedHashTimes - 1; const cachedHashTimes = defaultedHashTimes - 1;
@@ -19,20 +20,25 @@ export async function getHashCache<T extends string>(value: T, times = defaulted
async function getFromRedis<T extends string>(key: HashedValue): Promise<T & HashedValue> { async function getFromRedis<T extends string>(key: HashedValue): Promise<T & HashedValue> {
const redisKey = shaHashKey(key); const redisKey = shaHashKey(key);
try { if (!config.redis?.disableHashCache) {
const reply = await redis.get(redisKey); try {
const reply = await redis.get(redisKey);
if (reply) { if (reply) {
Logger.debug(`Got data from redis: ${reply}`); Logger.debug(`Got data from redis: ${reply}`);
return reply as T & HashedValue; return reply as T & HashedValue;
}
} catch (err) {
Logger.error(err as string);
} }
} catch (err) {
Logger.error(err as string);
} }
// Otherwise, calculate it // Otherwise, calculate it
const data = getHash(key, cachedHashTimes); const data = getHash(key, cachedHashTimes);
redis.set(redisKey, data).catch((err) => Logger.error(err));
if (!config.redis?.disableHashCache) {
redis.set(redisKey, data).catch((err) => Logger.error(err));
}
return data as T & HashedValue; return data as T & HashedValue;
} }

View File

@@ -39,21 +39,24 @@ let writeRequests = 0;
const readResponseTime: number[] = []; const readResponseTime: number[] = [];
const writeResponseTime: number[] = []; const writeResponseTime: number[] = [];
let lastResponseTimeLimit = 0;
const maxStoredTimes = 200; const maxStoredTimes = 200;
export let connectionPromise = Promise.resolve();
if (config.redis?.enabled) { if (config.redis?.enabled) {
Logger.info("Connected to redis"); Logger.info("Connected to redis");
const client = createClient(config.redis); const client = createClient(config.redis);
const readClient = config.redisRead?.enabled ? createClient(config.redisRead) : null; const readClient = config.redisRead?.enabled ? createClient(config.redisRead) : null;
void client.connect(); // void as we don't care about the promise connectionPromise = client.connect();
void readClient?.connect(); void readClient?.connect(); // void as we don't care about the promise
exportClient = client as RedisSB; exportClient = client as RedisSB;
const get = client.get.bind(client); const get = client.get.bind(client);
const getRead = readClient?.get?.bind(readClient); const getRead = readClient?.get?.bind(readClient);
exportClient.get = (key) => new Promise((resolve, reject) => { exportClient.get = (key) => new Promise((resolve, reject) => {
if (activeRequests > config.redis.maxConnections) { if (config.redis.maxConnections && activeRequests > config.redis.maxConnections) {
reject("Too many active requests"); reject("Too many active requests");
return; return;
} }
@@ -69,8 +72,13 @@ if (config.redis?.enabled) {
activeRequests--; activeRequests--;
resolve(reply); resolve(reply);
readResponseTime.push(Date.now() - start); const responseTime = Date.now() - start;
readResponseTime.push(responseTime);
if (readResponseTime.length > maxStoredTimes) readResponseTime.shift(); if (readResponseTime.length > maxStoredTimes) readResponseTime.shift();
if (config.redis.stopWritingAfterResponseTime
&& responseTime > config.redis.stopWritingAfterResponseTime) {
lastResponseTimeLimit = Date.now();
}
}).catch((err) => { }).catch((err) => {
if (chosenGet === get) { if (chosenGet === get) {
lastClientFail = Date.now(); lastClientFail = Date.now();
@@ -83,10 +91,12 @@ if (config.redis?.enabled) {
}); });
}); });
const setFun = <T extends Array<any>>(func: (...args: T) => Promise<string> , params: T): Promise<string> => const setFun = <T extends Array<any>>(func: (...args: T) => Promise<string>, params: T): Promise<string> =>
new Promise((resolve, reject) => { new Promise((resolve, reject) => {
if (activeRequests > config.redis.maxWriteConnections) { if ((config.redis.maxWriteConnections && activeRequests > config.redis.maxWriteConnections)
reject("Too many active requests"); || (config.redis.responseTimePause
&& Date.now() - lastResponseTimeLimit < config.redis.responseTimePause)) {
reject("Too many active requests to write");
return; return;
} }
@@ -108,8 +118,10 @@ if (config.redis?.enabled) {
}); });
}); });
exportClient.set = (key, value) => setFun(client.set.bind(client), [key, value]); const set = client.set.bind(client);
exportClient.setEx = (key, seconds, value) => setFun(client.setEx.bind(client), [key, seconds, value]); const setEx = client.setEx.bind(client);
exportClient.set = (key, value) => setFun(set, [key, value]);
exportClient.setEx = (key, seconds, value) => setFun(setEx, [key, seconds, value]);
exportClient.increment = (key) => new Promise((resolve, reject) => exportClient.increment = (key) => new Promise((resolve, reject) =>
void client.multi() void client.multi()
.incr(key) .incr(key)