From f9429360f2465e8e76d511e45be9c3e7c28903b0 Mon Sep 17 00:00:00 2001 From: Aleksandr Kraiz Date: Fri, 3 Jun 2022 00:44:49 +0400 Subject: [PATCH] Schemas improvements / strict subscribe --- src/services/OrionAggregator/ws/index.ts | 16 ++++++++++++---- .../ws/schemas/addressUpdateSchema.ts | 1 + .../ws/schemas/assetPairsConfigSchema.ts | 1 + .../ws/schemas/orderBookSchema.ts | 1 + .../ws/schemas/unsubscriptionDoneSchema.ts | 1 + 5 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/services/OrionAggregator/ws/index.ts b/src/services/OrionAggregator/ws/index.ts index 64d0a24..03fb922 100644 --- a/src/services/OrionAggregator/ws/index.ts +++ b/src/services/OrionAggregator/ws/index.ts @@ -132,6 +132,7 @@ type Subscription = { [SubscriptionType.BROKER_TRADABLE_ATOMIC_SWAP_ASSETS_BALANCE_UPDATES_SUBSCRIBE]: BrokerTradableAtomicSwapBalanceSubscription, [SubscriptionType.SWAP_SUBSCRIBE]: SwapInfoSubscription } + class OrionAggregatorWS { private ws: WebSocket | undefined; @@ -139,9 +140,9 @@ class OrionAggregatorWS { [K in keyof Subscription]: Subscription[K] }> = {}; - private onInit?: () => void; + public onInit?: () => void; - private onError?: (err: string) => void; + public onError?: (err: string) => void; private readonly wsUrl: string; @@ -174,11 +175,16 @@ class OrionAggregatorWS { subscribe( type: T, subscription: Subscription[T], + strict = false, ) { if (!this.ws) this.init(); + const subscriptionExists = type in this.subscriptions; + if (strict && subscriptionExists) throw new Error(`Subscription '${type}' already exists. Please unsubscribe first.`); + + const id = uuidv4; this.send({ T: type, - id: uuidv4(), + id, ...('payload' in subscription) && { S: subscription.payload, }, @@ -189,7 +195,6 @@ class OrionAggregatorWS { unsubscribe(subscription: keyof typeof UnsubscriptionType | string) { this.send({ - id: uuidv4(), T: UNSUBSCRIBE, S: subscription, }); @@ -255,6 +260,9 @@ class OrionAggregatorWS { case MessageType.PING_PONG: this.sendRaw(data.toString()); break; + case MessageType.UNSUBSCRIPTION_DONE: + // To implement + break; case MessageType.SWAP_INFO: { const baseSwapInfo: SwapInfoBase = { swapRequestId: json.S, diff --git a/src/services/OrionAggregator/ws/schemas/addressUpdateSchema.ts b/src/services/OrionAggregator/ws/schemas/addressUpdateSchema.ts index c9363aa..8189096 100644 --- a/src/services/OrionAggregator/ws/schemas/addressUpdateSchema.ts +++ b/src/services/OrionAggregator/ws/schemas/addressUpdateSchema.ts @@ -6,6 +6,7 @@ import balancesSchema from './balancesSchema'; import baseMessageSchema from './baseMessageSchema'; const baseAddressUpdate = baseMessageSchema.extend({ + id: z.string(), T: z.literal(MessageType.ADDRESS_UPDATE), S: z.string(), // subscription uc: z.array(z.enum(['b', 'o'])), // update content diff --git a/src/services/OrionAggregator/ws/schemas/assetPairsConfigSchema.ts b/src/services/OrionAggregator/ws/schemas/assetPairsConfigSchema.ts index 48e563f..04fcbb5 100644 --- a/src/services/OrionAggregator/ws/schemas/assetPairsConfigSchema.ts +++ b/src/services/OrionAggregator/ws/schemas/assetPairsConfigSchema.ts @@ -3,6 +3,7 @@ import MessageType from '../MessageType'; import baseMessageSchema from './baseMessageSchema'; const assetPairsConfigSchema = baseMessageSchema.extend({ + id: z.string(), T: z.literal(MessageType.ASSET_PAIRS_CONFIG_UPDATE), k: z.enum(['i', 'u']), u: z.array( diff --git a/src/services/OrionAggregator/ws/schemas/orderBookSchema.ts b/src/services/OrionAggregator/ws/schemas/orderBookSchema.ts index d0021d1..6346fc5 100644 --- a/src/services/OrionAggregator/ws/schemas/orderBookSchema.ts +++ b/src/services/OrionAggregator/ws/schemas/orderBookSchema.ts @@ -13,6 +13,7 @@ export const orderBookItemSchema = z.tuple([ ]); export const orderBookSchema = baseMessageSchema.extend({ + id: z.string(), T: z.literal(MessageType.AGGREGATED_ORDER_BOOK_UPDATE), S: z.string(), ob: z.object({ diff --git a/src/services/OrionAggregator/ws/schemas/unsubscriptionDoneSchema.ts b/src/services/OrionAggregator/ws/schemas/unsubscriptionDoneSchema.ts index 4dbe81d..f8d33b2 100644 --- a/src/services/OrionAggregator/ws/schemas/unsubscriptionDoneSchema.ts +++ b/src/services/OrionAggregator/ws/schemas/unsubscriptionDoneSchema.ts @@ -3,6 +3,7 @@ import MessageType from '../MessageType'; import baseMessageSchema from './baseMessageSchema'; const unsubscriptionDoneSchema = baseMessageSchema.extend({ + id: z.string(), T: z.literal(MessageType.UNSUBSCRIPTION_DONE), });