From 246070d06432a73fda3a7a5563c273ce3883b911 Mon Sep 17 00:00:00 2001 From: Aleksandr Kraiz Date: Sat, 21 May 2022 11:14:10 +0400 Subject: [PATCH] PriceFeed subscription --- README.md | 35 +++++- package.json | 2 +- src/services/PriceFeed/index.ts | 21 +--- .../PriceFeed/ws/PriceFeedAllTickersWS.ts | 39 ------ .../PriceFeed/ws/PriceFeedLastPriceWS.ts | 38 ------ .../PriceFeed/ws/PriceFeedSubscription.ts | 113 ++++++++++++++++++ .../PriceFeed/ws/PriceFeedTickerWS.ts | 40 ------- src/services/PriceFeed/ws/index.ts | 38 +++++- .../PriceFeed/ws/priceFeedSubscriptions.ts | 7 ++ .../PriceFeed/ws/schemas/priceSchema.ts | 9 ++ .../PriceFeed/ws/schemas/tickerInfoSchema.ts | 9 +- webpack.config.js | 1 + 12 files changed, 214 insertions(+), 138 deletions(-) delete mode 100644 src/services/PriceFeed/ws/PriceFeedAllTickersWS.ts delete mode 100644 src/services/PriceFeed/ws/PriceFeedLastPriceWS.ts create mode 100644 src/services/PriceFeed/ws/PriceFeedSubscription.ts delete mode 100644 src/services/PriceFeed/ws/PriceFeedTickerWS.ts create mode 100644 src/services/PriceFeed/ws/priceFeedSubscriptions.ts create mode 100644 src/services/PriceFeed/ws/schemas/priceSchema.ts diff --git a/README.md b/README.md index ec6f49f..2297ad9 100644 --- a/README.md +++ b/README.md @@ -251,7 +251,7 @@ ASSET_PAIRS_CONFIG_UPDATES_SUBSCRIBE = 'apcus', BROKER_TRADABLE_ATOMIC_SWAP_ASSETS_BALANCE_UPDATES_SUBSCRIBE = 'btasabus', // Need for Orion Bridge ``` -Example: +### Swap Info ```ts import { v4 as uuidv4 } from "uuid"; @@ -344,3 +344,36 @@ orionUnit.orionAggregator.ws.unsubscribe("apcu"); // Broker tradable atomic swap assets balance unsubscribe orionUnit.orionAggregator.ws.unsubscribe("btasabu"); ``` + +## Price Feed Websocket Stream + +> :warning: **Currently supported only one subscription per subscription type** + +```ts +orionUnit.priceFeed.ws.subscribe( + "allTickers", + (tickers) => { + console.log(tickers); + }, + undefined +); +orionUnit.priceFeed.ws.unsubscribe("allTickers"); + +orionUnit.priceFeed.ws.subscribe( + "ticker", + (ticker) => { + console.log(tricker); + }, + "ORN-USDT" +); +orionUnit.priceFeed.ws.unsubscribe("ticker"); + +orionUnit.priceFeed.ws.subscribe( + "lastPrice", + ({ pair, price }) => { + console.log(`Price: ${price}`); + }, + "ORN-USDT" +); +orionUnit.priceFeed.ws.unsubscribe("lastPrice"); +``` diff --git a/package.json b/package.json index f7347d8..803b504 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@orionprotocol/sdk", - "version": "0.8.2", + "version": "0.9.0", "description": "Orion Protocol SDK", "main": "./lib/esm/index.js", "module": "./lib/esm/index.js", diff --git a/src/services/PriceFeed/index.ts b/src/services/PriceFeed/index.ts index 19cf7ca..3d726d8 100644 --- a/src/services/PriceFeed/index.ts +++ b/src/services/PriceFeed/index.ts @@ -1,11 +1,15 @@ import fetchWithValidation from '../../fetchWithValidation'; import candlesSchema from './schemas/candlesSchema'; +import { PriceFeedWS } from './ws'; class PriceFeed { private apiUrl: string; + readonly ws: PriceFeedWS; + constructor(apiUrl: string) { this.apiUrl = apiUrl; + this.ws = new PriceFeedWS(this.wsUrl); this.getCandles = this.getCandles.bind(this); } @@ -17,7 +21,7 @@ class PriceFeed { interval: '5m' | '30m' | '1h' | '1d', exchange: string, ) => { - const url = new URL(`${this.apiUrl}/candles/candles`); + const url = new URL(this.candlesUrl); url.searchParams.append('symbol', symbol); url.searchParams.append('timeStart', timeStart.toString()); url.searchParams.append('timeEnd', timeEnd.toString()); @@ -33,28 +37,15 @@ class PriceFeed { get wsUrl() { const url = new URL(this.apiUrl); const wsProtocol = url.protocol === 'https:' ? 'wss' : 'ws'; - return `${wsProtocol}://${url.host + url.pathname}`; + return `${wsProtocol}://${url.host + url.pathname}/ws2`; } get candlesUrl() { return `${this.apiUrl}/candles/candles`; } - - get allTickersWSUrl() { - return `${this.wsUrl}/ws2/allTickers`; - } - - get tickerWSUrl() { - return `${this.wsUrl}/ws2/ticker/`; - } - - get lastPriceWSUrl() { - return `${this.wsUrl}/ws2/lastPrice/`; - } } export * as schemas from './schemas'; -export * as ws from './ws'; export { PriceFeed, diff --git a/src/services/PriceFeed/ws/PriceFeedAllTickersWS.ts b/src/services/PriceFeed/ws/PriceFeedAllTickersWS.ts deleted file mode 100644 index cd879de..0000000 --- a/src/services/PriceFeed/ws/PriceFeedAllTickersWS.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { z } from 'zod'; -import WebSocket from 'isomorphic-ws'; -import tickerInfoSchema from './schemas/tickerInfoSchema'; - -const schema = z.array(z.union([ - z.number(), - tickerInfoSchema, -])); -export default class PriceFeedAllTickersWS { - private pairsWebSocket: WebSocket; - - private heartbeatInterval: ReturnType; - - constructor( - url: string, - updateData: (pairs: z.infer[]) => void, - ) { - this.pairsWebSocket = new WebSocket(url); - - this.heartbeatInterval = setInterval(() => { - this.pairsWebSocket.send('heartbeat'); - }, 15000); - - this.pairsWebSocket.onmessage = (e) => { - if (e.data === 'pong') return; - const json: unknown = JSON.parse(e.data.toString()); - const data = schema.parse(json); - data.shift(); // Unnecessary timestamp - const tickersData = z.array(tickerInfoSchema).parse(data); - - updateData(tickersData); - }; - } - - kill() { - clearInterval(this.heartbeatInterval); - this.pairsWebSocket.close(); - } -} diff --git a/src/services/PriceFeed/ws/PriceFeedLastPriceWS.ts b/src/services/PriceFeed/ws/PriceFeedLastPriceWS.ts deleted file mode 100644 index df957b0..0000000 --- a/src/services/PriceFeed/ws/PriceFeedLastPriceWS.ts +++ /dev/null @@ -1,38 +0,0 @@ -import WebSocket from 'isomorphic-ws'; -import { z } from 'zod'; - -const schema = z.tuple([ - z.number(), // unix timestamp - z.string(), // pair - z.number(), // price -]); -export default class PriceFeedLastPriceWS { - private pairsWebSocket: WebSocket; - - private heartbeatInterval: ReturnType; - - constructor( - url: string, - pair: string, - updateData: (price: number) => void, - ) { - this.pairsWebSocket = new WebSocket(url + pair); - - this.heartbeatInterval = setInterval(() => { - this.pairsWebSocket.send('heartbeat'); - }, 15000); - - this.pairsWebSocket.onmessage = (e) => { - if (e.data === 'pong') return; - const json: unknown = JSON.parse(e.data.toString()); - const [,, price] = schema.parse(json); - - updateData(price); - }; - } - - kill() { - clearInterval(this.heartbeatInterval); - this.pairsWebSocket.close(); - } -} diff --git a/src/services/PriceFeed/ws/PriceFeedSubscription.ts b/src/services/PriceFeed/ws/PriceFeedSubscription.ts new file mode 100644 index 0000000..ae93f54 --- /dev/null +++ b/src/services/PriceFeed/ws/PriceFeedSubscription.ts @@ -0,0 +1,113 @@ +import WebSocket from 'isomorphic-ws'; +import { z } from 'zod'; +import { v4 as uuidv4 } from 'uuid'; +import priceFeedSubscriptions from './priceFeedSubscriptions'; +import { tickerInfoSchema } from './schemas'; +import priceSchema from './schemas/priceSchema'; + +type TickerInfo = { + pairName: string; + lastPrice: string; + openPrice: string; + highPrice: string; + lowPrice: string; + volume24h: string; +} + +const allTickersSchema = z.unknown().array() + .transform((tickers) => { + const data = [...tickers]; + data.shift(); + const parsedDate = tickerInfoSchema.array().parse(data); + return parsedDate.reduce< + Partial< + Record< + string, + TickerInfo + > + > + >((prev, pairData) => ({ + ...prev, + [pairData.pairName]: pairData, + }), {}); + }); + +export const subscriptions = { + [priceFeedSubscriptions.ALL_TICKERS]: { + schema: allTickersSchema, + payload: false as const, + }, + [priceFeedSubscriptions.TICKER]: { + schema: z.tuple([z.number(), tickerInfoSchema]), + payload: true as const, + }, + [priceFeedSubscriptions.LAST_PRICE]: { + schema: priceSchema, + payload: true as const, + }, +}; + +export type SubscriptionType = keyof typeof subscriptions + +export type Payload = typeof subscriptions[T] extends { payload: true } ? string : undefined; + +export type ResponseSchemaType = typeof subscriptions[T]['schema']; + +export default class PriceFeedSubscription { + public readonly id: string; + + private readonly callback: (data: z.infer>) => void; + + private readonly payload: Payload; + + private ws?: WebSocket; + + private readonly url: string; + + private heartbeatInterval?: ReturnType; + + private readonly type: S; + + constructor( + type: S, + url: string, + callback: (data: z.infer>) => void, + payload: Payload, + ) { + this.id = uuidv4(); + this.url = url; + this.type = type; + this.payload = payload; + this.callback = callback; + + this.init(); + } + + init() { + const { payload, url, type } = this; + this.ws = new WebSocket(`${url}/${type}${payload ? `/${payload.toString()}` : ''}`); + + this.ws.onmessage = (e) => { + if (e.data === 'pong') return; + const json: unknown = JSON.parse(e.data.toString()); + const subscription = subscriptions[type]; + const parseResult = subscription.schema.safeParse(json); + if (parseResult.success === false) { + const errorsMessage = parseResult.error.errors.map((error) => error.message).join(', '); + throw new Error(`Can't recognize PriceFeed "${type}" subscription message "${e.data.toString()}": ${errorsMessage}`); + } + this.callback(parseResult.data); + }; + + this.ws.onclose = () => this.init(); + + this.heartbeatInterval = setInterval(() => { + this.ws?.send('heartbeat'); + }, 15000); + } + + kill() { + if (this.heartbeatInterval) clearInterval(this.heartbeatInterval); + this.ws?.close(); + } +} diff --git a/src/services/PriceFeed/ws/PriceFeedTickerWS.ts b/src/services/PriceFeed/ws/PriceFeedTickerWS.ts deleted file mode 100644 index 8bae67f..0000000 --- a/src/services/PriceFeed/ws/PriceFeedTickerWS.ts +++ /dev/null @@ -1,40 +0,0 @@ -import WebSocket from 'isomorphic-ws'; -import { z } from 'zod'; -import tickerInfoSchema from './schemas/tickerInfoSchema'; - -const schema = z.tuple([ - z.number(), // timestamp - tickerInfoSchema, -]); - -export default class PriceFeedTickerWS { - priceWebSocket: WebSocket; - - private heartbeatInterval: ReturnType; - - constructor( - symbol: string, - url: string, - updateData: (pair: z.infer) => void, - ) { - this.priceWebSocket = new WebSocket(`${url}${symbol}`); - - this.heartbeatInterval = setInterval(() => { - this.priceWebSocket.send('heartbeat'); - }, 15000); - - this.priceWebSocket.onmessage = (e) => { - if (e.data === 'pong') return; - const data: unknown = JSON.parse(e.data.toString()); - const [, tickerData] = schema.parse(data); - - if (tickerData === undefined) return; - updateData(tickerData); - }; - } - - kill() { - clearInterval(this.heartbeatInterval); - this.priceWebSocket.close(); - } -} diff --git a/src/services/PriceFeed/ws/index.ts b/src/services/PriceFeed/ws/index.ts index 5a1359d..41e8c28 100644 --- a/src/services/PriceFeed/ws/index.ts +++ b/src/services/PriceFeed/ws/index.ts @@ -1,5 +1,37 @@ -export { default as PriceFeedAllTickersWS } from './PriceFeedAllTickersWS'; -export { default as PriceFeedTickerWS } from './PriceFeedTickerWS'; -export { default as PriceFeedLastPriceWS } from './PriceFeedLastPriceWS'; +import { z } from 'zod'; +import PriceFeedSubscription, { Payload, ResponseSchemaType, SubscriptionType } from './PriceFeedSubscription'; export * as schemas from './schemas'; +export class PriceFeedWS { + private subscriptions: Partial<{ + [S in SubscriptionType]: PriceFeedSubscription + }> = { }; + + private url: string; + + constructor(url: string) { + this.url = url; + } + + subscribe( + type: S, + callback: (data: z.infer>) => void, + payload: Payload, + ) { + if (this.subscriptions[type]) throw new Error(`Subscription already exists for '${type}'. Please unsubscribe first.`); + this.subscriptions = { + ...this.subscriptions, + [type]: new PriceFeedSubscription( + type, + this.url, + callback, + payload, + ), + }; + } + + unsubscribe(type: S) { + this.subscriptions[type]?.kill(); + delete this.subscriptions[type]; + } +} diff --git a/src/services/PriceFeed/ws/priceFeedSubscriptions.ts b/src/services/PriceFeed/ws/priceFeedSubscriptions.ts new file mode 100644 index 0000000..9600d13 --- /dev/null +++ b/src/services/PriceFeed/ws/priceFeedSubscriptions.ts @@ -0,0 +1,7 @@ +const priceFeedSubscriptions = { + TICKER: 'ticker', + ALL_TICKERS: 'allTickers', + LAST_PRICE: 'lastPrice', +} as const; + +export default priceFeedSubscriptions; diff --git a/src/services/PriceFeed/ws/schemas/priceSchema.ts b/src/services/PriceFeed/ws/schemas/priceSchema.ts new file mode 100644 index 0000000..089fd87 --- /dev/null +++ b/src/services/PriceFeed/ws/schemas/priceSchema.ts @@ -0,0 +1,9 @@ +import { z } from 'zod'; + +const priceSchema = z.tuple([ + z.number(), // unix timestamp + z.string(), // pair + z.number(), // price +]).transform(([, pair, price]) => ({ pair, price })); + +export default priceSchema; diff --git a/src/services/PriceFeed/ws/schemas/tickerInfoSchema.ts b/src/services/PriceFeed/ws/schemas/tickerInfoSchema.ts index 41263f0..c6196da 100644 --- a/src/services/PriceFeed/ws/schemas/tickerInfoSchema.ts +++ b/src/services/PriceFeed/ws/schemas/tickerInfoSchema.ts @@ -7,6 +7,13 @@ const tickerInfoSchema = z.tuple([ z.string(), // high price z.string(), // low price z.string(), // volume 24h -]); +]).transform(([pairName, lastPrice, openPrice, highPrice, lowPrice, volume24h]) => ({ + pairName, + lastPrice, + openPrice, + highPrice, + lowPrice, + volume24h, +})); export default tickerInfoSchema; diff --git a/webpack.config.js b/webpack.config.js index 3ab5eec..715cd18 100644 --- a/webpack.config.js +++ b/webpack.config.js @@ -2,6 +2,7 @@ const path = require("path"); module.exports = (env, argv) => { return { + mode: "production", entry: { index: path.resolve(__dirname, "./lib/esm/index.js") },