This commit is contained in:
Aleksandr Kraiz
2022-06-04 18:17:16 +04:00
parent 201d41e94b
commit 40a39f7cd2
4 changed files with 80 additions and 69 deletions

View File

@@ -65,7 +65,7 @@ const UNSUBSCRIBE = 'u';
// https://github.com/orionprotocol/orion-aggregator/tree/feature/OP-1752-symmetric-swap#swap-info-subscribe
type SwapSubscriptionRequest = {
d: string, // swap request UUID, set by client side
// d: string, // swap request UUID, set by client side
i: string, // asset in
o: string, // asset out
a: number // amount IN/OUT
@@ -133,12 +133,17 @@ type Subscription = {
[SubscriptionType.SWAP_SUBSCRIBE]: SwapInfoSubscription
}
const exclusiveSubscriptions = [
SubscriptionType.BROKER_TRADABLE_ATOMIC_SWAP_ASSETS_BALANCE_UPDATES_SUBSCRIBE,
SubscriptionType.ASSET_PAIRS_CONFIG_UPDATES_SUBSCRIBE,
] as const;
type WsMessage = string | ArrayBufferLike | Blob | ArrayBufferView;
class OrionAggregatorWS {
private ws: WebSocket | undefined;
private subscriptions: Partial<{
[K in keyof Subscription]: Subscription[K]
[K in keyof Subscription]: Partial<Record<string, Subscription[K]>>
}> = {};
public onInit?: () => void;
@@ -176,22 +181,40 @@ class OrionAggregatorWS {
subscribe<T extends typeof SubscriptionType[keyof typeof SubscriptionType]>(
type: T,
subscription: Subscription[T],
strict = false,
) {
if (!this.ws) this.init();
const subscriptionExists = type in this.subscriptions;
if (strict && subscriptionExists) throw new Error(`Subscription '${type}' already exists. Please unsubscribe first.`);
const isExclusive = exclusiveSubscriptions.some((t) => t === type);
const subs = this.subscriptions[type];
if (isExclusive && subs && Object.keys(subs).length > 0) {
throw new Error(`Subscription '${type}' already exists. Please unsubscribe first.`);
}
const id = uuidv4();
this.send({
T: type,
id,
...('payload' in subscription) && {
S: subscription.payload,
},
});
const subRequest: Partial<Record<string, unknown>> = {};
subRequest.T = type;
subRequest.id = id;
this.subscriptions[type] = subscription;
// TODO Refactor this
if ('payload' in subscription) {
if (typeof subscription.payload === 'string') {
subRequest.S = subscription.payload;
} else {
subRequest.S = {
d: id,
...subscription.payload,
};
}
}
this.send(subRequest);
const subKey = isExclusive ? 'default' : id;
this.subscriptions[type] = {
...this.subscriptions[type],
[subKey]: subscription,
};
return id;
}
unsubscribe(subscription: keyof typeof UnsubscriptionType | string) {
@@ -201,16 +224,30 @@ class OrionAggregatorWS {
});
if (subscription.includes('0x')) { // is wallet address (ADDRESS_UPDATE)
delete this.subscriptions[SubscriptionType.ADDRESS_UPDATES_SUBSCRIBE];
const auSubscriptions = this.subscriptions[SubscriptionType.ADDRESS_UPDATES_SUBSCRIBE];
if (auSubscriptions) {
const targetAuSub = Object.entries(auSubscriptions).find(([, value]) => value?.payload === subscription);
if (targetAuSub) {
const [key] = targetAuSub;
delete this.subscriptions[SubscriptionType.ADDRESS_UPDATES_SUBSCRIBE]?.[key];
}
}
} else if (uuidValidate(subscription)) { // is swap info subscription (contains hyphen)
delete this.subscriptions[SubscriptionType.SWAP_SUBSCRIBE];
delete this.subscriptions[SubscriptionType.SWAP_SUBSCRIBE]?.[subscription];
// !!! swap info subscription is uuid that contains hyphen
} else if (subscription.includes('-') && subscription.split('-').length === 2) { // is pair name(AGGREGATED_ORDER_BOOK_UPDATE)
delete this.subscriptions[SubscriptionType.AGGREGATED_ORDER_BOOK_UPDATES_SUBSCRIBE];
const aobSubscriptions = this.subscriptions[SubscriptionType.AGGREGATED_ORDER_BOOK_UPDATES_SUBSCRIBE];
if (aobSubscriptions) {
const targetAobSub = Object.entries(aobSubscriptions).find(([, value]) => value?.payload === subscription);
if (targetAobSub) {
const [key] = targetAobSub;
delete this.subscriptions[SubscriptionType.AGGREGATED_ORDER_BOOK_UPDATES_SUBSCRIBE]?.[key];
}
}
} else if (subscription === UnsubscriptionType.ASSET_PAIRS_CONFIG_UPDATES_UNSUBSCRIBE) {
delete this.subscriptions[SubscriptionType.ASSET_PAIRS_CONFIG_UPDATES_SUBSCRIBE];
delete this.subscriptions[SubscriptionType.ASSET_PAIRS_CONFIG_UPDATES_SUBSCRIBE]?.default;
} else if (subscription === UnsubscriptionType.BROKER_TRADABLE_ATOMIC_SWAP_ASSETS_BALANCE_UPDATES_UNSUBSCRIBE) {
delete this.subscriptions[SubscriptionType.BROKER_TRADABLE_ATOMIC_SWAP_ASSETS_BALANCE_UPDATES_SUBSCRIBE];
delete this.subscriptions[SubscriptionType.BROKER_TRADABLE_ATOMIC_SWAP_ASSETS_BALANCE_UPDATES_SUBSCRIBE]?.default;
}
}
@@ -292,7 +329,7 @@ class OrionAggregatorWS {
switch (json.k) { // kind
case 'exactSpend':
this.subscriptions[SubscriptionType.SWAP_SUBSCRIBE]?.callback({
this.subscriptions[SubscriptionType.SWAP_SUBSCRIBE]?.[json.S]?.callback({
kind: json.k,
marketAmountOut: json.mo,
availableAmountIn: json.aa,
@@ -301,7 +338,7 @@ class OrionAggregatorWS {
break;
case 'exactReceive':
this.subscriptions[SubscriptionType.SWAP_SUBSCRIBE]?.callback({
this.subscriptions[SubscriptionType.SWAP_SUBSCRIBE]?.[json.S]?.callback({
kind: json.k,
...baseSwapInfo,
marketAmountIn: json.mi,
@@ -340,7 +377,7 @@ class OrionAggregatorWS {
}, []);
this.subscriptions[
SubscriptionType.AGGREGATED_ORDER_BOOK_UPDATES_SUBSCRIBE
]?.callback(
]?.[json.id]?.callback(
mapOrderbookItems(ob.a),
mapOrderbookItems(ob.b),
S,
@@ -358,7 +395,7 @@ class OrionAggregatorWS {
}), {});
this.subscriptions[
SubscriptionType.ASSET_PAIRS_CONFIG_UPDATES_SUBSCRIBE
]?.callback({
]?.default?.callback({
kind: json.k === 'i' ? 'initial' : 'update',
data: priceUpdates,
});
@@ -392,7 +429,7 @@ class OrionAggregatorWS {
this.subscriptions[
SubscriptionType.ADDRESS_UPDATES_SUBSCRIBE
]?.callback({
]?.[json.id]?.callback({
kind: 'initial',
orders: fullOrders,
balances,
@@ -410,7 +447,7 @@ class OrionAggregatorWS {
this.subscriptions[
SubscriptionType.ADDRESS_UPDATES_SUBSCRIBE
]?.callback({
]?.[json.id]?.callback({
kind: 'update',
order: orderUpdate,
balances,
@@ -430,7 +467,7 @@ class OrionAggregatorWS {
this.subscriptions[
SubscriptionType.BROKER_TRADABLE_ATOMIC_SWAP_ASSETS_BALANCE_UPDATES_SUBSCRIBE
]?.callback(brokerBalances);
]?.default?.callback(brokerBalances);
}
break;
default:

View File

@@ -100,7 +100,7 @@ export default class PriceFeedSubscription<T extends SubscriptionType = Subscrip
const subscription = subscriptions[type];
const parseResult = subscription.schema.safeParse(json);
if (parseResult.success === false) {
const errorsMessage = parseResult.error.errors.map((error) => error.message).join(', ');
const errorsMessage = parseResult.error.errors.map((error) => `[${error.path.join('.')}] ${error.message}`).join(', ');
throw new Error(`Can't recognize PriceFeed "${type}" subscription message "${e.data.toString()}": ${errorsMessage}`);
}
this.callback(parseResult.data);