agg ws improvements

This commit is contained in:
Aleksandr Kraiz
2023-05-25 00:15:05 +04:00
parent c920972d08
commit 6c928690b9
7 changed files with 441 additions and 253 deletions

View File

@@ -1,4 +1,10 @@
import { WebSocket } from 'ws';
import Orion from '../Orion/index.js';
import { v4 as uuidV4 } from 'uuid';
jest.setTimeout(50000);
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
describe('Aggregator', () => {
test('Handle error aus', async () => {
@@ -39,7 +45,7 @@ describe('Aggregator', () => {
bscUnit.aggregator.ws.unsubscribe(subId);
bscUnit.aggregator.ws.destroy()
reject(new Error('Timeout'));
}, 10000);
}, 5000);
const payload = 'BTCUSDF';
subId = bscUnit.aggregator.ws.subscribe('aobus', {
payload,
@@ -53,4 +59,120 @@ describe('Aggregator', () => {
})
});
});
test('Breaking connection', async () => {
const WS_PORT = 8080;
const wsServer = new WebSocket.Server({ port: WS_PORT });
wsServer.on('connection', (ws) => {
ws.on('message', (message) => { // message type — Buffer | ArrayBuffer | Buffer[]
// Parse message json
const parsedMessage = JSON.parse(message.toString());
console.log('CLIENT -> SERVER', parsedMessage);
// Respond
ws.send(JSON.stringify({
S: 'BTCUSDF',
ob: {
a: [
['26287.4', '2.6', ['BINANCE'], [['BUY', 'BTCUSDF']]],
['26287.3', '0.172', ['BINANCE'], [['BUY', 'BTCUSDF']]],
['26287.2', '2.33', ['BINANCE'], [['BUY', 'BTCUSDF']]],
['26287.1', '0.746', ['BINANCE'], [['BUY', 'BTCUSDF']]],
['26287', '2.635', ['BINANCE'], [['BUY', 'BTCUSDF']]],
],
b: [
['26276.7', '13.397', ['BINANCE'], [['SELL', 'BTCUSDF']]],
['26276.6', '0.003', ['BINANCE'], [['SELL', 'BTCUSDF']]],
['26276.5', '0.023', ['BINANCE'], [['SELL', 'BTCUSDF']]],
['26276.4', '0.001', ['BINANCE'], [['SELL', 'BTCUSDF']]],
['26276.3', '2.334', ['BINANCE'], [['SELL', 'BTCUSDF']]],
]
},
T: 'aobu',
_: 1684941717661
}));
})
// ws.on('close', () => {
// console.log('Connection closed');
// })
ws.on('error', (error) => {
console.log('Error', error);
})
// Send initial message
ws.send(JSON.stringify({
T: 'i',
i: uuidV4(),
_: 1684941718016
}));
})
const orion = new Orion('testing', {
networks: {
97: {
services: {
aggregator: {
ws: `ws://localhost:${WS_PORT}`
}
}
}
}
});
const bscUnit = orion.getUnit('bsc');
let subId: string | undefined;
// Make subscription and wait for response
await new Promise((resolve) => {
subId = bscUnit.aggregator.ws.subscribe('aobus', {
payload: 'BTCUSDF',
callback: () => {
console.log('Received data');
resolve(true);
}
});
})
const terminateAllClients = () => Promise.all(Array.from(wsServer.clients).map((client) => {
return new Promise((resolve) => {
client.on('close', resolve);
client.terminate();
});
}));
// Disconnect client from server
await terminateAllClients();
console.log('Disconnected', bscUnit.aggregator.ws.subscriptions);
expect(wsServer.clients.size).toEqual(0);
await delay(1000);
expect(bscUnit.aggregator.ws.subscriptions).toEqual({});
// Await for reconnection
await new Promise((resolve) => {
bscUnit.aggregator.ws.subscribe('aobus', {
payload: 'BTCUSDF',
callback: () => {
console.log('Reconnected', bscUnit.aggregator.ws.subscriptions);
resolve(true);
}
});
});
await new Promise((resolve) => {
if (subId !== undefined) bscUnit.aggregator.ws.unsubscribe(subId);
bscUnit.aggregator.ws.destroy()
wsServer.clients.forEach((client) => {
client.terminate();
});
wsServer.close(() => {
resolve(true);
});
})
});
});

View File

@@ -19,11 +19,12 @@ import type { fullOrderSchema, orderUpdateSchema } from './schemas/addressUpdate
import cfdAddressUpdateSchema from './schemas/cfdAddressUpdateSchema.js';
import futuresTradeInfoSchema from './schemas/futuresTradeInfoSchema.js';
import { objectKeys } from '../../../utils/objectKeys.js';
// import assertError from '../../../utils/assertError.js';
// import errorSchema from './schemas/errorSchema';
const UNSUBSCRIBE = 'u';
type SwapSubscriptionRequest = {
type SwapInfoSubscriptionPayload = {
// d: string, // swap request UUID, set by client side
i: string // asset in
o: string // asset out
@@ -33,6 +34,13 @@ type SwapSubscriptionRequest = {
is?: boolean // instant settlement
}
type FuturesTradeInfoPayload = {
s: string // wallet address
i: string // pair
a: number // amount
p?: number // price
}
type BrokerTradableAtomicSwapBalanceSubscription = {
callback: (balances: Partial<Record<string, number>>) => void
}
@@ -63,17 +71,12 @@ type AggregatedOrderbookSubscription = {
}
type SwapInfoSubscription = {
payload: SwapSubscriptionRequest
payload: SwapInfoSubscriptionPayload
callback: (swapInfo: SwapInfo) => void
}
type FuturesTradeInfoSubscription = {
payload: {
s: string
i: string
a: number
p?: number
}
payload: FuturesTradeInfoPayload
callback: (futuresTradeInfo: FuturesTradeInfo) => void
errorCb?: (message: string) => void
}
@@ -161,6 +164,11 @@ const isSubType = (subType: string): subType is keyof Subscription => Object.val
const unknownMessageTypeRegex = /An unknown message type: '(.*)', json: (.*)/;
const nonExistentMessageRegex = /Could not cancel nonexistent subscription: (.*)/;
// type Message = {
// message: Json
// resolve: () => void
// };
class AggregatorWS {
private ws?: WebSocket | undefined;
@@ -170,7 +178,7 @@ class AggregatorWS {
// https://stackoverflow.com/questions/19304157/getting-the-reason-why-websockets-closed-with-close-code-1006
private isClosedIntentionally = false;
private subscriptions: Partial<{
readonly subscriptions: Partial<{
[K in keyof Subscription]: Partial<Record<string, Subscription[K]>>
}> = {};
@@ -186,6 +194,8 @@ class AggregatorWS {
return this.wsUrl;
}
readonly instanceId = uuidv4();
constructor(
wsUrl: string,
logger?: (msg: string) => void,
@@ -196,8 +206,12 @@ class AggregatorWS {
this.logger = logger;
this.onInit = onInit;
this.onError = onError;
console.log(`Created Aggregator WS instance ${this.instanceId}`);
}
// readonly messageQueue: Message[] = [];
private sendRaw(data: BufferLike) {
if (this.ws?.readyState === 1) {
this.ws.send(data);
@@ -224,39 +238,62 @@ class AggregatorWS {
type: T,
subscription: Subscription[T],
) {
if (!this.ws) this.init();
const isExclusive = exclusiveSubscriptions.some((t) => t === type);
const subs = this.subscriptions[type];
if (isExclusive && subs && Object.keys(subs).length > 0) {
throw new Error(`Subscription '${type}' already exists. Please unsubscribe first.`);
}
const id = type === 'aobus'
? ((subscription as any).payload as string) // TODO: Refactor!!!
: uuidv4();
const subRequest: Json = {};
subRequest['T'] = type;
subRequest['id'] = id;
// TODO Refactor this
if ('payload' in subscription) {
if (typeof subscription.payload === 'string') {
subRequest['S'] = subscription.payload;
} else {
subRequest['S'] = {
d: id,
...subscription.payload,
};
const makeSubscription = () => {
const isExclusive = exclusiveSubscriptions.some((t) => t === type);
const subs = this.subscriptions[type];
if (isExclusive && subs && Object.keys(subs).length > 0) {
throw new Error(`Subscription '${type}' already exists. Please unsubscribe first.`);
}
const subRequest: Json = {};
subRequest['T'] = type;
subRequest['id'] = id;
if ('payload' in subscription) {
if (typeof subscription.payload === 'string') {
subRequest['S'] = subscription.payload;
} else { // SwapInfoSubscriptionPayload | FuturesTradeInfoPayload
subRequest['S'] = { ...subscription.payload }
if (!('s' in subscription.payload)) { // SwapInfoSubscriptionPayload
subRequest['S'] = {
...subRequest['S'],
d: id,
};
}
}
}
this.send(subRequest);
const subKey = isExclusive ? 'default' : id;
this.subscriptions[type] = {
...this.subscriptions[type],
[subKey]: subscription,
};
}
this.send(subRequest);
// if (!this.ws) {
// this.initAsync()
// .then(() => {
// console.log(`Aggregator WS ${this.instanceId} is initialized`);
// makeSubscription();
// })
// .catch((err) => {
// assertError(err);
// this.onError?.(err.message);
// });
// } else makeSubscription();
const subKey = isExclusive ? 'default' : id;
this.subscriptions[type] = {
...this.subscriptions[type],
[subKey]: subscription,
};
if (!this.ws) {
this.init();
console.log(`Aggregator WS ${this.instanceId} is initialized`);
}
makeSubscription();
return id;
}
@@ -314,6 +351,23 @@ class AggregatorWS {
delete this.ws;
}
// private initPromise: Promise<void> | null = null;
// private initAsync() {
// if (!this.initPromise) {
// this.initPromise = new Promise<void>((resolve, reject) => {
// try {
// this.init();
// resolve();
// } catch (err) {
// reject(err);
// }
// });
// }
// return this.initPromise;
// }
private init(isReconnect = false) {
this.isClosedIntentionally = false;
this.ws = new WebSocket(this.wsUrl);
@@ -328,7 +382,10 @@ class AggregatorWS {
// Re-subscribe to all subscriptions
if (isReconnect) {
const subscriptionsToReconnect = this.subscriptions;
this.subscriptions = {};
objectKeys(this.subscriptions).forEach((subType) => {
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete this.subscriptions[subType];
});
Object.keys(subscriptionsToReconnect)
.filter(isSubType)
.forEach((subType) => {
@@ -400,10 +457,10 @@ class AggregatorWS {
}
break;
case MessageType.PING_PONG:
this.sendRaw(data.toString());
this.sendRaw(data);
break;
case MessageType.UNSUBSCRIPTION_DONE:
// To implement
// const { id } = json;
break;
case MessageType.SWAP_INFO: {
const baseSwapInfo: SwapInfoBase = {

6
src/utils/assertError.ts Normal file
View File

@@ -0,0 +1,6 @@
export default function assertError(errorCandidate: unknown): asserts errorCandidate is Error {
if (!(errorCandidate instanceof Error)) {
throw Error(`Assertion failed: errorCandidate is not an Error. Content: ${JSON.stringify(errorCandidate)}`);
}
}