Skip to content

Commit

Permalink
Merge pull request #4971 from pipixi/develop
Browse files Browse the repository at this point in the history
[Binance] Support rolling window
  • Loading branch information
timmolter authored Dec 1, 2024
2 parents c20907c + 165774c commit 757a96b
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package info.bitrich.xchangestream.binance;

import static info.bitrich.xchangestream.binance.BinanceSubscriptionType.KLINE;
import static info.bitrich.xchangestream.binance.BinanceSubscriptionType.TICKER_WINDOW;
import static info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper.getObjectMapper;

import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer
LoggerFactory.getLogger(BinanceStreamingMarketDataService.class);

private static final JavaType TICKER_TYPE = getTickerType();
private static final JavaType WINDOW_TICKER_TYPE = getWindowTickerType();
private static final JavaType BOOK_TICKER_TYPE = getBookTickerType();
private static final JavaType TRADE_TYPE = getTradeType();
private static final JavaType DEPTH_TYPE = getDepthType();
Expand All @@ -84,13 +86,16 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer
private final int oderBookFetchLimitParameter;

private final Map<Instrument, Observable<BinanceTicker24h>> tickerSubscriptions;
private final Map<Instrument, Observable<BinanceTicker24h>> rollingWindowTickerSubscriptions;
private final Map<Instrument, Observable<BinanceBookTicker>> bookTickerSubscriptions;
private final Map<Instrument, Observable<OrderBook>> orderbookSubscriptions;
private final Map<Instrument, Observable<BinanceRawTrade>> tradeSubscriptions;
private final Map<Instrument, Observable<List<OrderBookUpdate>>> orderBookUpdatesSubscriptions;
private final Map<Instrument, Map<KlineInterval, Observable<BinanceKline>>> klineSubscriptions;
private final Map<Instrument, Observable<DepthBinanceWebSocketTransaction>>
orderBookRawUpdatesSubscriptions;
private Observable<List<BinanceTicker24h>> allRollingWindowTickerSubscriptions;


/**
* A scheduler for initialisation of binance order book snapshots, which is delegated to a
Expand Down Expand Up @@ -125,6 +130,7 @@ public BinanceStreamingMarketDataService(
this.marketDataService = marketDataService;
this.onApiCall = onApiCall;
this.tickerSubscriptions = new ConcurrentHashMap<>();
this.rollingWindowTickerSubscriptions = new ConcurrentHashMap<>();
this.bookTickerSubscriptions = new ConcurrentHashMap<>();
this.orderbookSubscriptions = new ConcurrentHashMap<>();
this.tradeSubscriptions = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -211,6 +217,34 @@ public Observable<BinanceTicker24h> getRawTicker(Instrument instrument) {
instrument, s -> triggerObservableBody(rawTickerStream(instrument)).share());
}

public Observable<BinanceTicker24h> rollingWindow(Instrument instrument, KlineInterval windowSize) {
if (!service.isLiveSubscriptionEnabled()
&& !service.getProductSubscription().getTicker().contains(instrument)) {
throw new UpFrontSubscriptionRequiredException();
}
if(windowSize.equals(KlineInterval.h1) || windowSize.equals(KlineInterval.h4) || windowSize.equals(KlineInterval.d1)) {
return rollingWindowTickerSubscriptions.computeIfAbsent(
instrument, s -> triggerObservableBody(rollingWindowStream(instrument, windowSize)).share());
}else {
throw new UnsupportedOperationException("RollingWindow not supported for other window size!");
}
}

public Observable<List<BinanceTicker24h>> allRollingWindow(KlineInterval windowSize) {
if (!service.isLiveSubscriptionEnabled()) {
throw new UpFrontSubscriptionRequiredException();
}
if(windowSize.equals(KlineInterval.h1) || windowSize.equals(KlineInterval.h4) || windowSize.equals(KlineInterval.d1)) {
if(null != allRollingWindowTickerSubscriptions){
return allRollingWindowTickerSubscriptions.share();
}
allRollingWindowTickerSubscriptions = triggerObservableBody(allRollingWindowStream(windowSize)).share();
return allRollingWindowTickerSubscriptions;
}else {
throw new UnsupportedOperationException("RollingWindow not supported for other window size!");
}
}

public Observable<BinanceBookTicker> getRawBookTicker(Instrument instrument) {
if (!service.isLiveSubscriptionEnabled()
&& !service.getProductSubscription().getTicker().contains(instrument)) {
Expand Down Expand Up @@ -371,6 +405,11 @@ public void unsubscribeKline(Instrument instrument, KlineInterval klineInterval)
unsubscribe(instrument, KLINE, klineInterval);
}


public void unsubscribeAllRollingWindow(KlineInterval klineInterval) {
unsubscribe(null, TICKER_WINDOW, klineInterval);
}

private void unsubscribe(
Instrument instrument,
BinanceSubscriptionType subscriptionType,
Expand All @@ -396,6 +435,13 @@ private void unsubscribe(
case TICKER:
tickerSubscriptions.remove(instrument);
break;
case TICKER_WINDOW:
if(null == instrument){
allRollingWindowTickerSubscriptions = null;
}else {
rollingWindowTickerSubscriptions.remove(instrument);
}
break;
case BOOK_TICKER:
bookTickerSubscriptions.remove(instrument);
break;
Expand All @@ -416,6 +462,9 @@ private String getChannelId(
Instrument instrument,
BinanceSubscriptionType subscriptionType,
KlineInterval klineInterval) {
if (instrument == null && subscriptionType == TICKER_WINDOW) {
return "!" + subscriptionType.getType() + klineInterval.code() + "@arr";
}
return getChannelPrefix(instrument)
+ "@"
+ subscriptionType.getType()
Expand Down Expand Up @@ -455,6 +504,27 @@ private Observable<BinanceTicker24h> rawTickerStream(Instrument instrument) {
.map(transaction -> transaction.getData().getTicker());
}

private Observable<BinanceTicker24h> rollingWindowStream(Instrument instrument, KlineInterval windowSize) {
return this.service
.subscribeChannel(this.getChannelPrefix(instrument) + "@" + BinanceSubscriptionType.TICKER_WINDOW.getType() + windowSize.code(), new Object[0])
.map(
(it) -> this.<TickerBinanceWebsocketTransaction>readTransaction(it, TICKER_TYPE, "ticker"))
.filter(
transaction ->
BinanceAdapters.adaptSymbol(
transaction.getData().getSymbol(), instrument instanceof FuturesContract)
.equals(instrument))
.map(transaction -> transaction.getData().getTicker());
}

private Observable<List<BinanceTicker24h>> allRollingWindowStream(KlineInterval windowSize) {
return this.service
.subscribeChannel("!" + BinanceSubscriptionType.TICKER_WINDOW.getType() + windowSize.code() + "@arr", new Object[0])
.map(
(it) -> this.<List<TickerBinanceWebsocketTransaction>>readTransaction(it, WINDOW_TICKER_TYPE , "ticker"))
.map( transaction -> transaction.getData().stream().map(TickerBinanceWebsocketTransaction::getTicker).collect(Collectors.toList()));
}

private Observable<BinanceBookTicker> rawBookTickerStream(Instrument instrument) {
return service
.subscribeChannel(
Expand Down Expand Up @@ -694,6 +764,13 @@ private static JavaType getTickerType() {
new TypeReference<BinanceWebsocketTransaction<TickerBinanceWebsocketTransaction>>() {});
}

private static JavaType getWindowTickerType() {
return getObjectMapper()
.getTypeFactory()
.constructType(
new TypeReference<BinanceWebsocketTransaction<List<TickerBinanceWebsocketTransaction>>>() {});
}

private static JavaType getBookTickerType() {
return getObjectMapper()
.getTypeFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public enum BinanceSubscriptionType {
FUNDING_RATES("markPrice"),
TRADE("trade"),
TICKER("ticker"),
TICKER_WINDOW("ticker_"),
BOOK_TICKER("bookTicker"),
KLINE("kline");

Expand Down

0 comments on commit 757a96b

Please sign in to comment.