Add concurrent request limit

This commit is contained in:
Ajay
2022-09-20 23:41:36 -04:00
parent f6c68ec29c
commit f6f83fcbe4
3 changed files with 36 additions and 3 deletions

View File

@@ -76,7 +76,8 @@ addDefaults(config, {
port: 5432, port: 5432,
max: 10, max: 10,
idleTimeoutMillis: 10000, idleTimeoutMillis: 10000,
maxTries: 3 maxTries: 3,
maxConcurrentRequests: 3500
}, },
postgresReadOnly: { postgresReadOnly: {
enabled: false, enabled: false,
@@ -89,7 +90,8 @@ addDefaults(config, {
max: 10, max: 10,
idleTimeoutMillis: 10000, idleTimeoutMillis: 10000,
maxTries: 3, maxTries: 3,
fallbackOnFail: true fallbackOnFail: true,
maxConcurrentRequests: 3500
}, },
dumpDatabase: { dumpDatabase: {
enabled: false, enabled: false,

View File

@@ -33,6 +33,9 @@ export class Postgres implements IDatabase {
private poolRead: Pool; private poolRead: Pool;
private lastPoolReadFail = 0; private lastPoolReadFail = 0;
private concurrentRequests = 0;
private concurrentReadRequests = 0;
constructor(private config: DatabaseConfig) {} constructor(private config: DatabaseConfig) {}
async init(): Promise<void> { async init(): Promise<void> {
@@ -99,8 +102,23 @@ export class Postgres implements IDatabase {
Logger.debug(`prepare (postgres): type: ${type}, query: ${query}, params: ${params}`); Logger.debug(`prepare (postgres): type: ${type}, query: ${query}, params: ${params}`);
const pendingQueries: PromiseWithState<QueryResult<any>>[] = []; if (this.config.readOnly) {
if (this.concurrentReadRequests > this.config.postgresReadOnly?.maxConcurrentRequests) {
Logger.error(`prepare (postgres): cancelling read query because too many concurrent requests, query: ${query}`);
throw new Error("Too many concurrent requests");
}
this.concurrentReadRequests++;
} else {
if (this.concurrentRequests > this.config.postgres.maxConcurrentRequests) {
Logger.error(`prepare (postgres): cancelling query because too many concurrent requests, query: ${query}`);
throw new Error("Too many concurrent requests");
}
this.concurrentRequests++;
}
const pendingQueries: PromiseWithState<QueryResult<any>>[] = [];
let tries = 0; let tries = 0;
let lastPool: Pool = null; let lastPool: Pool = null;
const maxTries = () => (lastPool === this.pool const maxTries = () => (lastPool === this.pool
@@ -116,6 +134,12 @@ export class Postgres implements IDatabase {
if (options.useReplica && maxTries() - tries > 1) currentPromises.push(savePromiseState(timeoutPomise(this.config.postgresReadOnly.readTimeout))); if (options.useReplica && maxTries() - tries > 1) currentPromises.push(savePromiseState(timeoutPomise(this.config.postgresReadOnly.readTimeout)));
const queryResult = await nextFulfilment(currentPromises); const queryResult = await nextFulfilment(currentPromises);
if (this.config.readOnly) {
this.concurrentReadRequests--;
} else {
this.concurrentRequests--;
}
switch (type) { switch (type) {
case "get": { case "get": {
const value = queryResult.rows[0]; const value = queryResult.rows[0];
@@ -143,6 +167,12 @@ export class Postgres implements IDatabase {
} }
} while (this.isReadQuery(type) && tries < maxTries()); } while (this.isReadQuery(type) && tries < maxTries());
if (this.config.readOnly) {
this.concurrentReadRequests--;
} else {
this.concurrentRequests--;
}
throw new Error(`prepare (postgres): ${type} ${query} failed after ${tries} tries`); throw new Error(`prepare (postgres): ${type} ${query} failed after ${tries} tries`);
} }

View File

@@ -10,6 +10,7 @@ interface RedisConfig extends redis.RedisClientOptions {
export interface CustomPostgresConfig extends PoolConfig { export interface CustomPostgresConfig extends PoolConfig {
enabled: boolean; enabled: boolean;
maxTries: number; maxTries: number;
maxConcurrentRequests: number;
} }
export interface CustomPostgresReadOnlyConfig extends CustomPostgresConfig { export interface CustomPostgresReadOnlyConfig extends CustomPostgresConfig {