From d8afb7918c503e308891abf35a60836cd15c1c56 Mon Sep 17 00:00:00 2001 From: Aleksandr Kraiz Date: Fri, 2 Jun 2023 12:11:41 +0400 Subject: [PATCH] Fix reconnections --- package-lock.json | 4 +-- package.json | 2 +- src/services/Aggregator/ws/index.ts | 42 ++++++++++++++++++++--------- 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/package-lock.json b/package-lock.json index 699c798..2e12cdf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@orionprotocol/sdk", - "version": "0.19.17", + "version": "0.19.18", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@orionprotocol/sdk", - "version": "0.19.17", + "version": "0.19.18", "license": "ISC", "dependencies": { "@babel/runtime": "^7.21.0", diff --git a/package.json b/package.json index b467192..35a2493 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@orionprotocol/sdk", - "version": "0.19.17", + "version": "0.19.18", "description": "Orion Protocol SDK", "main": "./lib/index.cjs", "module": "./lib/index.js", diff --git a/src/services/Aggregator/ws/index.ts b/src/services/Aggregator/ws/index.ts index d85d83a..bf860fc 100644 --- a/src/services/Aggregator/ws/index.ts +++ b/src/services/Aggregator/ws/index.ts @@ -190,6 +190,8 @@ class AggregatorWS { public logger: ((message: string) => void) | undefined + private subIdReplacements: Partial> = {} + private readonly wsUrl: string; get api() { @@ -276,6 +278,7 @@ class AggregatorWS { this.logger?.(`Resubscribing to ${type} with id ${id}. Subscription request: ${JSON.stringify(subRequest)}`); const prevSub = this.subscriptions[type]?.[prevSubscriptionId]; if (prevSub) { + this.subIdReplacements[prevSubscriptionId] = id; // Save mapping for future use (unsubscribe) this.subscriptions[type] = { ...this.subscriptions[type], [subKey]: { @@ -310,17 +313,32 @@ class AggregatorWS { return id; } + /** + * Returns newest subscription id for given id. Subscription id can be changed during resubscription. + * This function ensure that old subscription id will be replaced with newest one. + * @param id Id of subscription + * @returns Newest subscription id + */ + getNewestSubscriptionId(id: string): string { + const newId = this.subIdReplacements[id]; + if (newId !== undefined) { + return this.getNewestSubscriptionId(newId); + } + return id; + } + unsubscribe(subscription: keyof typeof UnsubscriptionType | string, details?: string) { + const newestSubId = this.getNewestSubscriptionId(subscription); this.send({ T: UNSUBSCRIBE, - S: subscription, + S: newestSubId, ...(details !== undefined) && { d: details }, }); - if (subscription.includes('0x')) { // is wallet address (ADDRESS_UPDATE) + if (newestSubId.includes('0x')) { // is wallet address (ADDRESS_UPDATE) const auSubscriptions = this.subscriptions[SubscriptionType.ADDRESS_UPDATES_SUBSCRIBE]; if (auSubscriptions) { - const targetAuSub = Object.entries(auSubscriptions).find(([, value]) => value?.payload === subscription); + const targetAuSub = Object.entries(auSubscriptions).find(([, value]) => value?.payload === newestSubId); if (targetAuSub) { const [key] = targetAuSub; delete this.subscriptions[SubscriptionType.ADDRESS_UPDATES_SUBSCRIBE]?.[key]; @@ -329,30 +347,30 @@ class AggregatorWS { const aufSubscriptions = this.subscriptions[SubscriptionType.CFD_ADDRESS_UPDATES_SUBSCRIBE]; if (aufSubscriptions) { - const targetAufSub = Object.entries(aufSubscriptions).find(([, value]) => value?.payload === subscription); + const targetAufSub = Object.entries(aufSubscriptions).find(([, value]) => value?.payload === newestSubId); if (targetAufSub) { const [key] = targetAufSub; delete this.subscriptions[SubscriptionType.CFD_ADDRESS_UPDATES_SUBSCRIBE]?.[key]; } } - } else if (uuidValidate(subscription)) { + } else if (uuidValidate(newestSubId)) { // is swap info subscription (contains hyphen) - delete this.subscriptions[SubscriptionType.SWAP_SUBSCRIBE]?.[subscription]; - delete this.subscriptions[SubscriptionType.ASSET_PAIR_CONFIG_UPDATES_SUBSCRIBE]?.[subscription]; - delete this.subscriptions[SubscriptionType.FUTURES_TRADE_INFO_SUBSCRIBE]?.[subscription]; + delete this.subscriptions[SubscriptionType.SWAP_SUBSCRIBE]?.[newestSubId]; + delete this.subscriptions[SubscriptionType.ASSET_PAIR_CONFIG_UPDATES_SUBSCRIBE]?.[newestSubId]; + delete this.subscriptions[SubscriptionType.FUTURES_TRADE_INFO_SUBSCRIBE]?.[newestSubId]; // !!! swap info subscription is uuid that contains hyphen - } else if (subscription.includes('-') && subscription.split('-').length === 2) { // is pair name(AGGREGATED_ORDER_BOOK_UPDATE) + } else if (newestSubId.includes('-') && newestSubId.split('-').length === 2) { // is pair name(AGGREGATED_ORDER_BOOK_UPDATE) const aobSubscriptions = this.subscriptions[SubscriptionType.AGGREGATED_ORDER_BOOK_UPDATES_SUBSCRIBE]; if (aobSubscriptions) { - const targetAobSub = Object.entries(aobSubscriptions).find(([, value]) => value?.payload === subscription); + const targetAobSub = Object.entries(aobSubscriptions).find(([, value]) => value?.payload === newestSubId); if (targetAobSub) { const [key] = targetAobSub; delete this.subscriptions[SubscriptionType.AGGREGATED_ORDER_BOOK_UPDATES_SUBSCRIBE]?.[key]; } } - } else if (subscription === UnsubscriptionType.ASSET_PAIRS_CONFIG_UPDATES_UNSUBSCRIBE) { + } else if (newestSubId === UnsubscriptionType.ASSET_PAIRS_CONFIG_UPDATES_UNSUBSCRIBE) { delete this.subscriptions[SubscriptionType.ASSET_PAIRS_CONFIG_UPDATES_SUBSCRIBE]?.['default']; - } else if (subscription === UnsubscriptionType.BROKER_TRADABLE_ATOMIC_SWAP_ASSETS_BALANCE_UPDATES_UNSUBSCRIBE) { + } else if (newestSubId === UnsubscriptionType.BROKER_TRADABLE_ATOMIC_SWAP_ASSETS_BALANCE_UPDATES_UNSUBSCRIBE) { delete this.subscriptions[SubscriptionType.BROKER_TRADABLE_ATOMIC_SWAP_ASSETS_BALANCE_UPDATES_SUBSCRIBE]?.['default']; } }