mirror of
https://github.com/ajayyy/SponsorBlockServer.git
synced 2025-12-12 06:27:10 +03:00
Wait for any successful query instead of just most recent
This commit is contained in:
@@ -1,10 +1,10 @@
|
|||||||
import { Logger } from "../utils/logger";
|
import { Logger } from "../utils/logger";
|
||||||
import { IDatabase, QueryOption, QueryType } from "./IDatabase";
|
import { IDatabase, QueryOption, QueryType } from "./IDatabase";
|
||||||
import { Client, Pool, types } from "pg";
|
import { Client, Pool, QueryResult, types } from "pg";
|
||||||
|
|
||||||
import fs from "fs";
|
import fs from "fs";
|
||||||
import { CustomPostgresConfig, CustomPostgresReadOnlyConfig } from "../types/config.model";
|
import { CustomPostgresConfig, CustomPostgresReadOnlyConfig } from "../types/config.model";
|
||||||
import { promiseTimeout } from "../utils/promiseTimeout";
|
import { timeoutPomise, PromiseWithState, savePromiseState, nextFulfilment } from "../utils/promiseTimeout";
|
||||||
|
|
||||||
// return numeric (pg_type oid=1700) as float
|
// return numeric (pg_type oid=1700) as float
|
||||||
types.setTypeParser(1700, function(val) {
|
types.setTypeParser(1700, function(val) {
|
||||||
@@ -104,6 +104,8 @@ export class Postgres implements IDatabase {
|
|||||||
|
|
||||||
Logger.debug(`prepare (postgres): type: ${type}, query: ${query}, params: ${params}`);
|
Logger.debug(`prepare (postgres): type: ${type}, query: ${query}, params: ${params}`);
|
||||||
|
|
||||||
|
const pendingQueries: PromiseWithState<QueryResult<any>>[] = [];
|
||||||
|
|
||||||
let tries = 0;
|
let tries = 0;
|
||||||
let lastPool: Pool = null;
|
let lastPool: Pool = null;
|
||||||
do {
|
do {
|
||||||
@@ -111,7 +113,11 @@ export class Postgres implements IDatabase {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
lastPool = this.getPool(type, options);
|
lastPool = this.getPool(type, options);
|
||||||
const queryResult = await promiseTimeout(lastPool.query({ text: query, values: params }), options.useReplica ? this.readTimeout : null);
|
|
||||||
|
pendingQueries.push(savePromiseState(lastPool.query({ text: query, values: params })));
|
||||||
|
const currentPromises = [...pendingQueries];
|
||||||
|
if (options.useReplica) currentPromises.push(savePromiseState(timeoutPomise(this.readTimeout)));
|
||||||
|
const queryResult = await nextFulfilment(currentPromises);
|
||||||
|
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case "get": {
|
case "get": {
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ import { Logger } from "../utils/logger";
|
|||||||
import { QueryCacher } from "../utils/queryCacher";
|
import { QueryCacher } from "../utils/queryCacher";
|
||||||
import { getReputation } from "../utils/reputation";
|
import { getReputation } from "../utils/reputation";
|
||||||
import { getService } from "../utils/getService";
|
import { getService } from "../utils/getService";
|
||||||
import { promiseTimeout } from "../utils/promiseTimeout";
|
import { promiseOrTimeout } from "../utils/promiseTimeout";
|
||||||
|
|
||||||
|
|
||||||
async function prepareCategorySegments(req: Request, videoID: VideoID, service: Service, segments: DBSegment[], cache: SegmentCache = { shadowHiddenSegmentIPs: {} }, useCache: boolean): Promise<Segment[]> {
|
async function prepareCategorySegments(req: Request, videoID: VideoID, service: Service, segments: DBSegment[], cache: SegmentCache = { shadowHiddenSegmentIPs: {} }, useCache: boolean): Promise<Segment[]> {
|
||||||
@@ -40,7 +40,7 @@ async function prepareCategorySegments(req: Request, videoID: VideoID, service:
|
|||||||
const fetchData = () => privateDB.prepare("all", 'SELECT "hashedIP" FROM "sponsorTimes" WHERE "videoID" = ? AND "timeSubmitted" = ? AND "service" = ?',
|
const fetchData = () => privateDB.prepare("all", 'SELECT "hashedIP" FROM "sponsorTimes" WHERE "videoID" = ? AND "timeSubmitted" = ? AND "service" = ?',
|
||||||
[videoID, segment.timeSubmitted, service], { useReplica: true }) as Promise<{ hashedIP: HashedIP }[]>;
|
[videoID, segment.timeSubmitted, service], { useReplica: true }) as Promise<{ hashedIP: HashedIP }[]>;
|
||||||
try {
|
try {
|
||||||
cache.shadowHiddenSegmentIPs[videoID][segment.timeSubmitted] = await promiseTimeout(QueryCacher.get(fetchData, shadowHiddenIPKey(videoID, segment.timeSubmitted, service)), 150);
|
cache.shadowHiddenSegmentIPs[videoID][segment.timeSubmitted] = await promiseOrTimeout(QueryCacher.get(fetchData, shadowHiddenIPKey(videoID, segment.timeSubmitted, service)), 150);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
// give up on shadowhide for now
|
// give up on shadowhide for now
|
||||||
cache.shadowHiddenSegmentIPs[videoID][segment.timeSubmitted] = null;
|
cache.shadowHiddenSegmentIPs[videoID][segment.timeSubmitted] = null;
|
||||||
|
|||||||
@@ -1,11 +1,50 @@
|
|||||||
export function promiseTimeout<T>(promise: Promise<T>, timeout: number): Promise<T> {
|
export class PromiseTimeoutError<T> extends Error {
|
||||||
|
promise?: Promise<T>;
|
||||||
|
|
||||||
|
constructor(promise?: Promise<T>) {
|
||||||
|
super("Promise timed out");
|
||||||
|
|
||||||
|
this.promise = promise;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface PromiseWithState<T> extends Promise<T> {
|
||||||
|
isResolved: boolean;
|
||||||
|
isRejected: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function promiseOrTimeout<T>(promise: Promise<T>, timeout?: number): Promise<T> {
|
||||||
|
return Promise.race([timeoutPomise<T>(timeout), promise]);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function timeoutPomise<T>(timeout?: number): Promise<T> {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
if (timeout) {
|
if (timeout) {
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
reject(new Error("Promise timed out"));
|
reject(new PromiseTimeoutError());
|
||||||
}, timeout);
|
}, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
promise.then(resolve, reject);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function savePromiseState<T>(promise: Promise<T>): PromiseWithState<T> {
|
||||||
|
const p = promise as PromiseWithState<T>;
|
||||||
|
p.isResolved = false;
|
||||||
|
p.isRejected = false;
|
||||||
|
|
||||||
|
p.then(() => {
|
||||||
|
p.isResolved = true;
|
||||||
|
}).catch(() => {
|
||||||
|
p.isRejected = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allows rejection or resolve
|
||||||
|
* Allows past resolves too, but not past rejections
|
||||||
|
*/
|
||||||
|
export function nextFulfilment<T>(promises: PromiseWithState<T>[]): Promise<T> {
|
||||||
|
return Promise.race(promises.filter((p) => !p.isRejected));
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user