feat: message queue and heartbeat interval

This commit is contained in:
Oleg Nechiporenko
2023-06-15 19:38:37 +02:00
parent 18480ca2b8
commit de90673f8f

View File

@@ -23,6 +23,23 @@ import { objectKeys } from '../../../utils/objectKeys.js';
// import errorSchema from './schemas/errorSchema';
const UNSUBSCRIBE = 'u';
const SERVER_PING_INTERVAL = 30000;
const HEARBEAT_THRESHOLD = 5000;
const messageSchema = z.union([
initMessageSchema,
pingPongMessageSchema,
addressUpdateSchema,
cfdAddressUpdateSchema,
assetPairsConfigSchema,
assetPairConfigSchema,
brokerMessageSchema,
orderBookSchema,
swapInfoSchema,
futuresTradeInfoSchema,
errorSchema,
unsubscriptionDoneSchema,
]);
type SwapInfoSubscriptionPayload = {
// d: string, // swap request UUID, set by client side
@@ -201,6 +218,8 @@ class AggregatorWS {
private readonly wsUrl: string;
private isAlive = false;
get api() {
return this.wsUrl;
}
@@ -211,28 +230,51 @@ class AggregatorWS {
this.wsUrl = wsUrl;
}
// readonly messageQueue: Message[] = [];
private sendRaw(data: BufferLike) {
if (this.ws?.readyState === 1) {
this.ws.send(data);
} else if (this.ws?.readyState === 0) {
setTimeout(() => {
this.sendRaw(data);
}, 50);
private messageQueue: BufferLike[] = [];
private sendWsMessage(message: BufferLike) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(message);
} else {
this.messageQueue.push(message);
}
}
private send(jsonObject: Json) {
if (this.ws?.readyState === WebSocket.OPEN) {
const jsonData = JSON.stringify(jsonObject);
this.ws.send(jsonData);
this.logger?.(`Sent: ${jsonData}`);
} else {
setTimeout(() => {
this.send(jsonObject);
}, 50);
private readonly handleWsOpen = () => {
for (const message of this.messageQueue) {
this.ws?.send(message);
}
this.messageQueue = [];
this.setupHeartbeat();
}
private sendRaw(data: BufferLike) {
this.sendWsMessage(data);
}
private send(jsonObject: Json) {
const jsonData = JSON.stringify(jsonObject);
this.sendWsMessage(jsonData);
this.logger?.(`Sent: ${jsonData}`);
}
private hearbeatIntervalId: NodeJS.Timer | undefined;
private setupHeartbeat() {
const heartbeat = () => {
if (this.isAlive) {
this.isAlive = false;
} else {
this.logger?.('Heartbeat timeout');
this.isClosedIntentionally = true;
this.ws?.close(4000);
}
};
this.hearbeatIntervalId = setInterval(heartbeat, SERVER_PING_INTERVAL + HEARBEAT_THRESHOLD);
}
private clearHeartbeat() {
this.isAlive = false;
clearInterval(this.hearbeatIntervalId);
}
subscribe<T extends typeof SubscriptionType[keyof typeof SubscriptionType]>(
@@ -413,10 +455,12 @@ class AggregatorWS {
this.ws.onclose = (event) => {
this.onWSClose?.(event);
this.logger?.(`AggregatorWS: connection closed ${this.isClosedIntentionally ? 'intentionally' : ''}`);
this.clearHeartbeat();
if (!this.isClosedIntentionally) this.init(true);
};
this.ws.onopen = (e) => {
this.onWSOpen?.(e);
this.handleWsOpen();
// Re-subscribe to all subscriptions
if (isReconnect) {
Object.keys(this.subscriptions)
@@ -435,26 +479,12 @@ class AggregatorWS {
this.logger?.(`AggregatorWS: connection opened${isReconnect ? ' (reconnect)' : ''}`);
};
this.ws.onmessage = (e) => {
this.isAlive = true;
const { data } = e;
if (typeof data !== 'string') throw new Error('AggregatorWS: received non-string message');
this.logger?.(`AggregatorWS: received message: ${data}`);
const rawJson: unknown = JSON.parse(data);
const messageSchema = z.union([
initMessageSchema,
pingPongMessageSchema,
addressUpdateSchema,
cfdAddressUpdateSchema,
assetPairsConfigSchema,
assetPairConfigSchema,
brokerMessageSchema,
orderBookSchema,
swapInfoSchema,
futuresTradeInfoSchema,
errorSchema,
unsubscriptionDoneSchema,
]);
const json = messageSchema.parse(rawJson);
switch (json.T) {