Add locks to different write operations

This commit is contained in:
Ajay
2023-07-23 23:21:50 -04:00
parent b2081fe155
commit 8bcc781da7
5 changed files with 68 additions and 3 deletions

View File

@@ -12,6 +12,7 @@ import { isUserVIP } from "../utils/isUserVIP";
import { Logger } from "../utils/logger"; import { Logger } from "../utils/logger";
import crypto from "crypto"; import crypto from "crypto";
import { QueryCacher } from "../utils/queryCacher"; import { QueryCacher } from "../utils/queryCacher";
import { acquireLock } from "../utils/redisLock";
enum BrandingType { enum BrandingType {
Title, Title,
@@ -42,6 +43,14 @@ export async function postBranding(req: Request, res: Response) {
const hashedVideoID = await getHashCache(videoID, 1); const hashedVideoID = await getHashCache(videoID, 1);
const hashedIP = await getHashCache(getIP(req) + config.globalSalt as IPAddress); const hashedIP = await getHashCache(getIP(req) + config.globalSalt as IPAddress);
const lock = await acquireLock(`postBranding:${videoID}.${hashedUserID}`);
if (!lock.status) {
res.status(429).send("Vote already in progress");
return;
}
await new Promise((resolve) => setTimeout(resolve, 3000));
const now = Date.now(); const now = Date.now();
const voteType = 1; const voteType = 1;
@@ -104,6 +113,7 @@ export async function postBranding(req: Request, res: Response) {
QueryCacher.clearBrandingCache({ videoID, hashedVideoID, service }); QueryCacher.clearBrandingCache({ videoID, hashedVideoID, service });
res.status(200).send("OK"); res.status(200).send("OK");
lock.unlock();
} catch (e) { } catch (e) {
Logger.error(e as string); Logger.error(e as string);
res.status(500).send("Internal Server Error"); res.status(500).send("Internal Server Error");

View File

@@ -24,6 +24,7 @@ import { canSubmit } from "../utils/permissions";
import { getVideoDetails, videoDetails } from "../utils/getVideoDetails"; import { getVideoDetails, videoDetails } from "../utils/getVideoDetails";
import * as youtubeID from "../utils/youtubeID"; import * as youtubeID from "../utils/youtubeID";
import { banUser } from "./shadowBanUser"; import { banUser } from "./shadowBanUser";
import { acquireLock } from "../utils/redisLock";
type CheckResult = { type CheckResult = {
pass: boolean, pass: boolean,
@@ -508,6 +509,12 @@ export async function postSkipSegments(req: Request, res: Response): Promise<Res
return res.status(userWarningCheckResult.errorCode).send(userWarningCheckResult.errorMessage); return res.status(userWarningCheckResult.errorCode).send(userWarningCheckResult.errorMessage);
} }
const lock = await acquireLock(`postSkipSegment:${videoID}.${userID}`);
if (!lock.status) {
res.status(429).send("Submission already in progress");
return;
}
const isVIP = (await isUserVIP(userID)); const isVIP = (await isUserVIP(userID));
const isTempVIP = (await isUserTempVIP(userID, videoID)); const isTempVIP = (await isUserTempVIP(userID, videoID));
const rawIP = getIP(req); const rawIP = getIP(req);
@@ -618,6 +625,8 @@ export async function postSkipSegments(req: Request, res: Response): Promise<Res
for (let i = 0; i < segments.length; i++) { for (let i = 0; i < segments.length; i++) {
sendWebhooks(apiVideoDetails, userID, videoID, UUIDs[i], segments[i], service).catch((e) => Logger.error(`call send webhooks ${e}`)); sendWebhooks(apiVideoDetails, userID, videoID, UUIDs[i], segments[i], service).catch((e) => Logger.error(`call send webhooks ${e}`));
} }
lock.unlock();
return res.json(newSegments); return res.json(newSegments);
} }

View File

@@ -15,6 +15,7 @@ import { QueryCacher } from "../utils/queryCacher";
import axios from "axios"; import axios from "axios";
import { getVideoDetails, videoDetails } from "../utils/getVideoDetails"; import { getVideoDetails, videoDetails } from "../utils/getVideoDetails";
import { deleteLockCategories } from "./deleteLockCategories"; import { deleteLockCategories } from "./deleteLockCategories";
import { acquireLock } from "../utils/redisLock";
const voteTypes = { const voteTypes = {
normal: 0, normal: 0,
@@ -335,6 +336,11 @@ export async function vote(ip: IPAddress, UUID: SegmentUUID, paramUserID: UserID
const nonAnonUserID = await getHashCache(paramUserID); const nonAnonUserID = await getHashCache(paramUserID);
const userID = await getHashCache(paramUserID + UUID); const userID = await getHashCache(paramUserID + UUID);
const lock = await acquireLock(`voteOnSponsorTime:${UUID}.${paramUserID}`);
if (!lock.status) {
return { status: 429, message: "Vote already in progress" };
}
// To force a non 200, change this early // To force a non 200, change this early
const finalResponse: FinalResponse = { const finalResponse: FinalResponse = {
blockVote: false, blockVote: false,
@@ -526,6 +532,9 @@ export async function vote(ip: IPAddress, UUID: SegmentUUID, paramUserID: UserID
finalResponse finalResponse
}).catch((e) => Logger.error(`Sending vote webhook: ${e}`)); }).catch((e) => Logger.error(`Sending vote webhook: ${e}`));
} }
lock.unlock();
return { status: finalResponse.finalStatus, message: finalResponse.finalMessage ?? undefined }; return { status: finalResponse.finalStatus, message: finalResponse.finalMessage ?? undefined };
} catch (err) { } catch (err) {
Logger.error(err as string); Logger.error(err as string);

View File

@@ -1,6 +1,6 @@
import { config } from "../config"; import { config } from "../config";
import { Logger } from "./logger"; import { Logger } from "./logger";
import { createClient } from "redis"; import { SetOptions, createClient } from "redis";
import { RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply } from "@redis/client/dist/lib/commands"; import { RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply } from "@redis/client/dist/lib/commands";
import { RedisClientOptions } from "@redis/client/dist/lib/client"; import { RedisClientOptions } from "@redis/client/dist/lib/client";
import { RedisReply } from "rate-limit-redis"; import { RedisReply } from "rate-limit-redis";
@@ -16,7 +16,7 @@ export interface RedisStats {
interface RedisSB { interface RedisSB {
get(key: RedisCommandArgument): Promise<string>; get(key: RedisCommandArgument): Promise<string>;
set(key: RedisCommandArgument, value: RedisCommandArgument): Promise<string>; set(key: RedisCommandArgument, value: RedisCommandArgument, options?: SetOptions): Promise<string>;
setEx(key: RedisCommandArgument, seconds: number, value: RedisCommandArgument): Promise<string>; setEx(key: RedisCommandArgument, seconds: number, value: RedisCommandArgument): Promise<string>;
del(...keys: [RedisCommandArgument]): Promise<number>; del(...keys: [RedisCommandArgument]): Promise<number>;
increment?(key: RedisCommandArgument): Promise<RedisCommandRawReply[]>; increment?(key: RedisCommandArgument): Promise<RedisCommandRawReply[]>;
@@ -125,7 +125,7 @@ if (config.redis?.enabled) {
const set = client.set.bind(client); const set = client.set.bind(client);
const setEx = client.setEx.bind(client); const setEx = client.setEx.bind(client);
exportClient.set = (key, value) => setFun(set, [key, value]); exportClient.set = (key, value, options) => setFun(set, [key, value, options]);
exportClient.setEx = (key, seconds, value) => setFun(setEx, [key, seconds, value]); exportClient.setEx = (key, seconds, value) => setFun(setEx, [key, seconds, value]);
exportClient.increment = (key) => new Promise((resolve, reject) => exportClient.increment = (key) => new Promise((resolve, reject) =>
void client.multi() void client.multi()

37
src/utils/redisLock.ts Normal file
View File

@@ -0,0 +1,37 @@
import redis from "../utils/redis";
import { Logger } from "./logger";
const defaultTimeout = 5000;
export type AcquiredLock = {
status: false
} | {
status: true;
unlock: () => void;
};
export async function acquireLock(key: string, timeout = defaultTimeout): Promise<AcquiredLock> {
try {
const result = await redis.set(key, "1", {
PX: timeout,
NX: true
});
if (result) {
return {
status: true,
unlock: () => void redis.del(key).catch((err) => Logger.error(err))
};
} else {
return {
status: false
};
}
} catch (e) {
Logger.error(e as string);
}
return {
status: false
};
}