PriceFeed subscription

This commit is contained in:
Aleksandr Kraiz
2022-05-21 11:14:10 +04:00
parent c81601dece
commit 246070d064
12 changed files with 214 additions and 138 deletions

View File

@@ -251,7 +251,7 @@ ASSET_PAIRS_CONFIG_UPDATES_SUBSCRIBE = 'apcus',
BROKER_TRADABLE_ATOMIC_SWAP_ASSETS_BALANCE_UPDATES_SUBSCRIBE = 'btasabus', // Need for Orion Bridge
```
Example:
### Swap Info
```ts
import { v4 as uuidv4 } from "uuid";
@@ -344,3 +344,36 @@ orionUnit.orionAggregator.ws.unsubscribe("apcu");
// Broker tradable atomic swap assets balance unsubscribe
orionUnit.orionAggregator.ws.unsubscribe("btasabu");
```
## Price Feed Websocket Stream
> :warning: **Currently supported only one subscription per subscription type**
```ts
orionUnit.priceFeed.ws.subscribe(
"allTickers",
(tickers) => {
console.log(tickers);
},
undefined
);
orionUnit.priceFeed.ws.unsubscribe("allTickers");
orionUnit.priceFeed.ws.subscribe(
"ticker",
(ticker) => {
console.log(tricker);
},
"ORN-USDT"
);
orionUnit.priceFeed.ws.unsubscribe("ticker");
orionUnit.priceFeed.ws.subscribe(
"lastPrice",
({ pair, price }) => {
console.log(`Price: ${price}`);
},
"ORN-USDT"
);
orionUnit.priceFeed.ws.unsubscribe("lastPrice");
```

View File

@@ -1,6 +1,6 @@
{
"name": "@orionprotocol/sdk",
"version": "0.8.2",
"version": "0.9.0",
"description": "Orion Protocol SDK",
"main": "./lib/esm/index.js",
"module": "./lib/esm/index.js",

View File

@@ -1,11 +1,15 @@
import fetchWithValidation from '../../fetchWithValidation';
import candlesSchema from './schemas/candlesSchema';
import { PriceFeedWS } from './ws';
class PriceFeed {
private apiUrl: string;
readonly ws: PriceFeedWS;
constructor(apiUrl: string) {
this.apiUrl = apiUrl;
this.ws = new PriceFeedWS(this.wsUrl);
this.getCandles = this.getCandles.bind(this);
}
@@ -17,7 +21,7 @@ class PriceFeed {
interval: '5m' | '30m' | '1h' | '1d',
exchange: string,
) => {
const url = new URL(`${this.apiUrl}/candles/candles`);
const url = new URL(this.candlesUrl);
url.searchParams.append('symbol', symbol);
url.searchParams.append('timeStart', timeStart.toString());
url.searchParams.append('timeEnd', timeEnd.toString());
@@ -33,28 +37,15 @@ class PriceFeed {
get wsUrl() {
const url = new URL(this.apiUrl);
const wsProtocol = url.protocol === 'https:' ? 'wss' : 'ws';
return `${wsProtocol}://${url.host + url.pathname}`;
return `${wsProtocol}://${url.host + url.pathname}/ws2`;
}
get candlesUrl() {
return `${this.apiUrl}/candles/candles`;
}
get allTickersWSUrl() {
return `${this.wsUrl}/ws2/allTickers`;
}
get tickerWSUrl() {
return `${this.wsUrl}/ws2/ticker/`;
}
get lastPriceWSUrl() {
return `${this.wsUrl}/ws2/lastPrice/`;
}
}
export * as schemas from './schemas';
export * as ws from './ws';
export {
PriceFeed,

View File

@@ -1,39 +0,0 @@
import { z } from 'zod';
import WebSocket from 'isomorphic-ws';
import tickerInfoSchema from './schemas/tickerInfoSchema';
const schema = z.array(z.union([
z.number(),
tickerInfoSchema,
]));
export default class PriceFeedAllTickersWS {
private pairsWebSocket: WebSocket;
private heartbeatInterval: ReturnType<typeof setInterval>;
constructor(
url: string,
updateData: (pairs: z.infer<typeof tickerInfoSchema>[]) => void,
) {
this.pairsWebSocket = new WebSocket(url);
this.heartbeatInterval = setInterval(() => {
this.pairsWebSocket.send('heartbeat');
}, 15000);
this.pairsWebSocket.onmessage = (e) => {
if (e.data === 'pong') return;
const json: unknown = JSON.parse(e.data.toString());
const data = schema.parse(json);
data.shift(); // Unnecessary timestamp
const tickersData = z.array(tickerInfoSchema).parse(data);
updateData(tickersData);
};
}
kill() {
clearInterval(this.heartbeatInterval);
this.pairsWebSocket.close();
}
}

View File

@@ -1,38 +0,0 @@
import WebSocket from 'isomorphic-ws';
import { z } from 'zod';
const schema = z.tuple([
z.number(), // unix timestamp
z.string(), // pair
z.number(), // price
]);
export default class PriceFeedLastPriceWS {
private pairsWebSocket: WebSocket;
private heartbeatInterval: ReturnType<typeof setInterval>;
constructor(
url: string,
pair: string,
updateData: (price: number) => void,
) {
this.pairsWebSocket = new WebSocket(url + pair);
this.heartbeatInterval = setInterval(() => {
this.pairsWebSocket.send('heartbeat');
}, 15000);
this.pairsWebSocket.onmessage = (e) => {
if (e.data === 'pong') return;
const json: unknown = JSON.parse(e.data.toString());
const [,, price] = schema.parse(json);
updateData(price);
};
}
kill() {
clearInterval(this.heartbeatInterval);
this.pairsWebSocket.close();
}
}

View File

@@ -0,0 +1,113 @@
import WebSocket from 'isomorphic-ws';
import { z } from 'zod';
import { v4 as uuidv4 } from 'uuid';
import priceFeedSubscriptions from './priceFeedSubscriptions';
import { tickerInfoSchema } from './schemas';
import priceSchema from './schemas/priceSchema';
type TickerInfo = {
pairName: string;
lastPrice: string;
openPrice: string;
highPrice: string;
lowPrice: string;
volume24h: string;
}
const allTickersSchema = z.unknown().array()
.transform((tickers) => {
const data = [...tickers];
data.shift();
const parsedDate = tickerInfoSchema.array().parse(data);
return parsedDate.reduce<
Partial<
Record<
string,
TickerInfo
>
>
>((prev, pairData) => ({
...prev,
[pairData.pairName]: pairData,
}), {});
});
export const subscriptions = {
[priceFeedSubscriptions.ALL_TICKERS]: {
schema: allTickersSchema,
payload: false as const,
},
[priceFeedSubscriptions.TICKER]: {
schema: z.tuple([z.number(), tickerInfoSchema]),
payload: true as const,
},
[priceFeedSubscriptions.LAST_PRICE]: {
schema: priceSchema,
payload: true as const,
},
};
export type SubscriptionType = keyof typeof subscriptions
export type Payload<T extends SubscriptionType> = typeof subscriptions[T] extends { payload: true } ? string : undefined;
export type ResponseSchemaType<T extends SubscriptionType> = typeof subscriptions[T]['schema'];
export default class PriceFeedSubscription<S extends SubscriptionType> {
public readonly id: string;
private readonly callback: (data: z.infer<ResponseSchemaType<S>>) => void;
private readonly payload: Payload<S>;
private ws?: WebSocket;
private readonly url: string;
private heartbeatInterval?: ReturnType<typeof setInterval>;
private readonly type: S;
constructor(
type: S,
url: string,
callback: (data: z.infer<ResponseSchemaType<S>>) => void,
payload: Payload<S>,
) {
this.id = uuidv4();
this.url = url;
this.type = type;
this.payload = payload;
this.callback = callback;
this.init();
}
init() {
const { payload, url, type } = this;
this.ws = new WebSocket(`${url}/${type}${payload ? `/${payload.toString()}` : ''}`);
this.ws.onmessage = (e) => {
if (e.data === 'pong') return;
const json: unknown = JSON.parse(e.data.toString());
const subscription = subscriptions[type];
const parseResult = subscription.schema.safeParse(json);
if (parseResult.success === false) {
const errorsMessage = parseResult.error.errors.map((error) => error.message).join(', ');
throw new Error(`Can't recognize PriceFeed "${type}" subscription message "${e.data.toString()}": ${errorsMessage}`);
}
this.callback(parseResult.data);
};
this.ws.onclose = () => this.init();
this.heartbeatInterval = setInterval(() => {
this.ws?.send('heartbeat');
}, 15000);
}
kill() {
if (this.heartbeatInterval) clearInterval(this.heartbeatInterval);
this.ws?.close();
}
}

View File

@@ -1,40 +0,0 @@
import WebSocket from 'isomorphic-ws';
import { z } from 'zod';
import tickerInfoSchema from './schemas/tickerInfoSchema';
const schema = z.tuple([
z.number(), // timestamp
tickerInfoSchema,
]);
export default class PriceFeedTickerWS {
priceWebSocket: WebSocket;
private heartbeatInterval: ReturnType<typeof setInterval>;
constructor(
symbol: string,
url: string,
updateData: (pair: z.infer<typeof tickerInfoSchema>) => void,
) {
this.priceWebSocket = new WebSocket(`${url}${symbol}`);
this.heartbeatInterval = setInterval(() => {
this.priceWebSocket.send('heartbeat');
}, 15000);
this.priceWebSocket.onmessage = (e) => {
if (e.data === 'pong') return;
const data: unknown = JSON.parse(e.data.toString());
const [, tickerData] = schema.parse(data);
if (tickerData === undefined) return;
updateData(tickerData);
};
}
kill() {
clearInterval(this.heartbeatInterval);
this.priceWebSocket.close();
}
}

View File

@@ -1,5 +1,37 @@
export { default as PriceFeedAllTickersWS } from './PriceFeedAllTickersWS';
export { default as PriceFeedTickerWS } from './PriceFeedTickerWS';
export { default as PriceFeedLastPriceWS } from './PriceFeedLastPriceWS';
import { z } from 'zod';
import PriceFeedSubscription, { Payload, ResponseSchemaType, SubscriptionType } from './PriceFeedSubscription';
export * as schemas from './schemas';
export class PriceFeedWS {
private subscriptions: Partial<{
[S in SubscriptionType]: PriceFeedSubscription<S>
}> = { };
private url: string;
constructor(url: string) {
this.url = url;
}
subscribe<S extends SubscriptionType>(
type: S,
callback: (data: z.infer<ResponseSchemaType<S>>) => void,
payload: Payload<S>,
) {
if (this.subscriptions[type]) throw new Error(`Subscription already exists for '${type}'. Please unsubscribe first.`);
this.subscriptions = {
...this.subscriptions,
[type]: new PriceFeedSubscription(
type,
this.url,
callback,
payload,
),
};
}
unsubscribe<S extends SubscriptionType>(type: S) {
this.subscriptions[type]?.kill();
delete this.subscriptions[type];
}
}

View File

@@ -0,0 +1,7 @@
const priceFeedSubscriptions = {
TICKER: 'ticker',
ALL_TICKERS: 'allTickers',
LAST_PRICE: 'lastPrice',
} as const;
export default priceFeedSubscriptions;

View File

@@ -0,0 +1,9 @@
import { z } from 'zod';
const priceSchema = z.tuple([
z.number(), // unix timestamp
z.string(), // pair
z.number(), // price
]).transform(([, pair, price]) => ({ pair, price }));
export default priceSchema;

View File

@@ -7,6 +7,13 @@ const tickerInfoSchema = z.tuple([
z.string(), // high price
z.string(), // low price
z.string(), // volume 24h
]);
]).transform(([pairName, lastPrice, openPrice, highPrice, lowPrice, volume24h]) => ({
pairName,
lastPrice,
openPrice,
highPrice,
lowPrice,
volume24h,
}));
export default tickerInfoSchema;

View File

@@ -2,6 +2,7 @@ const path = require("path");
module.exports = (env, argv) => {
return {
mode: "production",
entry: {
index: path.resolve(__dirname, "./lib/esm/index.js")
},