diff --git a/src/services/Aggregator/ws/index.ts b/src/services/Aggregator/ws/index.ts index 7f5b795..1b83aba 100644 --- a/src/services/Aggregator/ws/index.ts +++ b/src/services/Aggregator/ws/index.ts @@ -23,6 +23,23 @@ import { objectKeys } from '../../../utils/objectKeys.js'; // import errorSchema from './schemas/errorSchema'; const UNSUBSCRIBE = 'u'; +const SERVER_PING_INTERVAL = 30000; +const HEARBEAT_THRESHOLD = 5000; + +const messageSchema = z.union([ + initMessageSchema, + pingPongMessageSchema, + addressUpdateSchema, + cfdAddressUpdateSchema, + assetPairsConfigSchema, + assetPairConfigSchema, + brokerMessageSchema, + orderBookSchema, + swapInfoSchema, + futuresTradeInfoSchema, + errorSchema, + unsubscriptionDoneSchema, +]); type SwapInfoSubscriptionPayload = { // d: string, // swap request UUID, set by client side @@ -201,6 +218,8 @@ class AggregatorWS { private readonly wsUrl: string; + private isAlive = false; + get api() { return this.wsUrl; } @@ -211,28 +230,51 @@ class AggregatorWS { this.wsUrl = wsUrl; } - // readonly messageQueue: Message[] = []; - - private sendRaw(data: BufferLike) { - if (this.ws?.readyState === 1) { - this.ws.send(data); - } else if (this.ws?.readyState === 0) { - setTimeout(() => { - this.sendRaw(data); - }, 50); + private messageQueue: BufferLike[] = []; + private sendWsMessage(message: BufferLike) { + if (this.ws?.readyState === WebSocket.OPEN) { + this.ws.send(message); + } else { + this.messageQueue.push(message); } } - private send(jsonObject: Json) { - if (this.ws?.readyState === WebSocket.OPEN) { - const jsonData = JSON.stringify(jsonObject); - this.ws.send(jsonData); - this.logger?.(`Sent: ${jsonData}`); - } else { - setTimeout(() => { - this.send(jsonObject); - }, 50); + private readonly handleWsOpen = () => { + for (const message of this.messageQueue) { + this.ws?.send(message); } + this.messageQueue = []; + this.setupHeartbeat(); + } + + private sendRaw(data: BufferLike) { + this.sendWsMessage(data); + } + + private send(jsonObject: Json) { + const jsonData = JSON.stringify(jsonObject); + this.sendWsMessage(jsonData); + this.logger?.(`Sent: ${jsonData}`); + } + + private hearbeatIntervalId: NodeJS.Timer | undefined; + private setupHeartbeat() { + const heartbeat = () => { + if (this.isAlive) { + this.isAlive = false; + } else { + this.logger?.('Heartbeat timeout'); + this.isClosedIntentionally = true; + this.ws?.close(4000); + } + }; + + this.hearbeatIntervalId = setInterval(heartbeat, SERVER_PING_INTERVAL + HEARBEAT_THRESHOLD); + } + + private clearHeartbeat() { + this.isAlive = false; + clearInterval(this.hearbeatIntervalId); } subscribe( @@ -413,10 +455,12 @@ class AggregatorWS { this.ws.onclose = (event) => { this.onWSClose?.(event); this.logger?.(`AggregatorWS: connection closed ${this.isClosedIntentionally ? 'intentionally' : ''}`); + this.clearHeartbeat(); if (!this.isClosedIntentionally) this.init(true); }; this.ws.onopen = (e) => { this.onWSOpen?.(e); + this.handleWsOpen(); // Re-subscribe to all subscriptions if (isReconnect) { Object.keys(this.subscriptions) @@ -435,26 +479,12 @@ class AggregatorWS { this.logger?.(`AggregatorWS: connection opened${isReconnect ? ' (reconnect)' : ''}`); }; this.ws.onmessage = (e) => { + this.isAlive = true; const { data } = e; if (typeof data !== 'string') throw new Error('AggregatorWS: received non-string message'); this.logger?.(`AggregatorWS: received message: ${data}`); const rawJson: unknown = JSON.parse(data); - const messageSchema = z.union([ - initMessageSchema, - pingPongMessageSchema, - addressUpdateSchema, - cfdAddressUpdateSchema, - assetPairsConfigSchema, - assetPairConfigSchema, - brokerMessageSchema, - orderBookSchema, - swapInfoSchema, - futuresTradeInfoSchema, - errorSchema, - unsubscriptionDoneSchema, - ]); - const json = messageSchema.parse(rawJson); switch (json.T) {