From f6f83fcbe43fb93b0f0391b6f392a4e9895cefa5 Mon Sep 17 00:00:00 2001 From: Ajay Date: Tue, 20 Sep 2022 23:41:36 -0400 Subject: [PATCH] Add concurrent request limit --- src/config.ts | 6 ++++-- src/databases/Postgres.ts | 32 +++++++++++++++++++++++++++++++- src/types/config.model.ts | 1 + 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/config.ts b/src/config.ts index 39debed..70e1fc3 100644 --- a/src/config.ts +++ b/src/config.ts @@ -76,7 +76,8 @@ addDefaults(config, { port: 5432, max: 10, idleTimeoutMillis: 10000, - maxTries: 3 + maxTries: 3, + maxConcurrentRequests: 3500 }, postgresReadOnly: { enabled: false, @@ -89,7 +90,8 @@ addDefaults(config, { max: 10, idleTimeoutMillis: 10000, maxTries: 3, - fallbackOnFail: true + fallbackOnFail: true, + maxConcurrentRequests: 3500 }, dumpDatabase: { enabled: false, diff --git a/src/databases/Postgres.ts b/src/databases/Postgres.ts index 389257a..01f59b1 100644 --- a/src/databases/Postgres.ts +++ b/src/databases/Postgres.ts @@ -33,6 +33,9 @@ export class Postgres implements IDatabase { private poolRead: Pool; private lastPoolReadFail = 0; + private concurrentRequests = 0; + private concurrentReadRequests = 0; + constructor(private config: DatabaseConfig) {} async init(): Promise { @@ -99,8 +102,23 @@ export class Postgres implements IDatabase { Logger.debug(`prepare (postgres): type: ${type}, query: ${query}, params: ${params}`); - const pendingQueries: PromiseWithState>[] = []; + if (this.config.readOnly) { + if (this.concurrentReadRequests > this.config.postgresReadOnly?.maxConcurrentRequests) { + Logger.error(`prepare (postgres): cancelling read query because too many concurrent requests, query: ${query}`); + throw new Error("Too many concurrent requests"); + } + this.concurrentReadRequests++; + } else { + if (this.concurrentRequests > this.config.postgres.maxConcurrentRequests) { + Logger.error(`prepare (postgres): cancelling query because too many concurrent requests, query: ${query}`); + throw new Error("Too many concurrent requests"); + } + + this.concurrentRequests++; + } + + const pendingQueries: PromiseWithState>[] = []; let tries = 0; let lastPool: Pool = null; const maxTries = () => (lastPool === this.pool @@ -116,6 +134,12 @@ export class Postgres implements IDatabase { if (options.useReplica && maxTries() - tries > 1) currentPromises.push(savePromiseState(timeoutPomise(this.config.postgresReadOnly.readTimeout))); const queryResult = await nextFulfilment(currentPromises); + if (this.config.readOnly) { + this.concurrentReadRequests--; + } else { + this.concurrentRequests--; + } + switch (type) { case "get": { const value = queryResult.rows[0]; @@ -143,6 +167,12 @@ export class Postgres implements IDatabase { } } while (this.isReadQuery(type) && tries < maxTries()); + if (this.config.readOnly) { + this.concurrentReadRequests--; + } else { + this.concurrentRequests--; + } + throw new Error(`prepare (postgres): ${type} ${query} failed after ${tries} tries`); } diff --git a/src/types/config.model.ts b/src/types/config.model.ts index 4e89a02..406643c 100644 --- a/src/types/config.model.ts +++ b/src/types/config.model.ts @@ -10,6 +10,7 @@ interface RedisConfig extends redis.RedisClientOptions { export interface CustomPostgresConfig extends PoolConfig { enabled: boolean; maxTries: number; + maxConcurrentRequests: number; } export interface CustomPostgresReadOnlyConfig extends CustomPostgresConfig {