From 165774c0a249b8a391e17a2fdfb1b93826de7cba Mon Sep 17 00:00:00 2001 From: pipixi Date: Wed, 27 Nov 2024 22:22:31 +0800 Subject: [PATCH] [Binance] Support rolling window --- .../BinanceStreamingMarketDataService.java | 77 +++++++++++++++++++ .../binance/BinanceSubscriptionType.java | 1 + 2 files changed, 78 insertions(+) diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index 61cf2b7f3e2..20a7075e7c2 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -1,6 +1,7 @@ package info.bitrich.xchangestream.binance; import static info.bitrich.xchangestream.binance.BinanceSubscriptionType.KLINE; +import static info.bitrich.xchangestream.binance.BinanceSubscriptionType.TICKER_WINDOW; import static info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper.getObjectMapper; import com.fasterxml.jackson.core.type.TypeReference; @@ -71,6 +72,7 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer LoggerFactory.getLogger(BinanceStreamingMarketDataService.class); private static final JavaType TICKER_TYPE = getTickerType(); + private static final JavaType WINDOW_TICKER_TYPE = getWindowTickerType(); private static final JavaType BOOK_TICKER_TYPE = getBookTickerType(); private static final JavaType TRADE_TYPE = getTradeType(); private static final JavaType DEPTH_TYPE = getDepthType(); @@ -84,6 +86,7 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer private final int oderBookFetchLimitParameter; private final Map> tickerSubscriptions; + private final Map> rollingWindowTickerSubscriptions; private final Map> bookTickerSubscriptions; private final Map> orderbookSubscriptions; private final Map> tradeSubscriptions; @@ -91,6 +94,8 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer private final Map>> klineSubscriptions; private final Map> orderBookRawUpdatesSubscriptions; + private Observable> allRollingWindowTickerSubscriptions; + /** * A scheduler for initialisation of binance order book snapshots, which is delegated to a @@ -125,6 +130,7 @@ public BinanceStreamingMarketDataService( this.marketDataService = marketDataService; this.onApiCall = onApiCall; this.tickerSubscriptions = new ConcurrentHashMap<>(); + this.rollingWindowTickerSubscriptions = new ConcurrentHashMap<>(); this.bookTickerSubscriptions = new ConcurrentHashMap<>(); this.orderbookSubscriptions = new ConcurrentHashMap<>(); this.tradeSubscriptions = new ConcurrentHashMap<>(); @@ -211,6 +217,34 @@ public Observable getRawTicker(Instrument instrument) { instrument, s -> triggerObservableBody(rawTickerStream(instrument)).share()); } + public Observable rollingWindow(Instrument instrument, KlineInterval windowSize) { + if (!service.isLiveSubscriptionEnabled() + && !service.getProductSubscription().getTicker().contains(instrument)) { + throw new UpFrontSubscriptionRequiredException(); + } + if(windowSize.equals(KlineInterval.h1) || windowSize.equals(KlineInterval.h4) || windowSize.equals(KlineInterval.d1)) { + return rollingWindowTickerSubscriptions.computeIfAbsent( + instrument, s -> triggerObservableBody(rollingWindowStream(instrument, windowSize)).share()); + }else { + throw new UnsupportedOperationException("RollingWindow not supported for other window size!"); + } + } + + public Observable> allRollingWindow(KlineInterval windowSize) { + if (!service.isLiveSubscriptionEnabled()) { + throw new UpFrontSubscriptionRequiredException(); + } + if(windowSize.equals(KlineInterval.h1) || windowSize.equals(KlineInterval.h4) || windowSize.equals(KlineInterval.d1)) { + if(null != allRollingWindowTickerSubscriptions){ + return allRollingWindowTickerSubscriptions.share(); + } + allRollingWindowTickerSubscriptions = triggerObservableBody(allRollingWindowStream(windowSize)).share(); + return allRollingWindowTickerSubscriptions; + }else { + throw new UnsupportedOperationException("RollingWindow not supported for other window size!"); + } + } + public Observable getRawBookTicker(Instrument instrument) { if (!service.isLiveSubscriptionEnabled() && !service.getProductSubscription().getTicker().contains(instrument)) { @@ -371,6 +405,11 @@ public void unsubscribeKline(Instrument instrument, KlineInterval klineInterval) unsubscribe(instrument, KLINE, klineInterval); } + + public void unsubscribeAllRollingWindow(KlineInterval klineInterval) { + unsubscribe(null, TICKER_WINDOW, klineInterval); + } + private void unsubscribe( Instrument instrument, BinanceSubscriptionType subscriptionType, @@ -396,6 +435,13 @@ private void unsubscribe( case TICKER: tickerSubscriptions.remove(instrument); break; + case TICKER_WINDOW: + if(null == instrument){ + allRollingWindowTickerSubscriptions = null; + }else { + rollingWindowTickerSubscriptions.remove(instrument); + } + break; case BOOK_TICKER: bookTickerSubscriptions.remove(instrument); break; @@ -416,6 +462,9 @@ private String getChannelId( Instrument instrument, BinanceSubscriptionType subscriptionType, KlineInterval klineInterval) { + if (instrument == null && subscriptionType == TICKER_WINDOW) { + return "!" + subscriptionType.getType() + klineInterval.code() + "@arr"; + } return getChannelPrefix(instrument) + "@" + subscriptionType.getType() @@ -455,6 +504,27 @@ private Observable rawTickerStream(Instrument instrument) { .map(transaction -> transaction.getData().getTicker()); } + private Observable rollingWindowStream(Instrument instrument, KlineInterval windowSize) { + return this.service + .subscribeChannel(this.getChannelPrefix(instrument) + "@" + BinanceSubscriptionType.TICKER_WINDOW.getType() + windowSize.code(), new Object[0]) + .map( + (it) -> this.readTransaction(it, TICKER_TYPE, "ticker")) + .filter( + transaction -> + BinanceAdapters.adaptSymbol( + transaction.getData().getSymbol(), instrument instanceof FuturesContract) + .equals(instrument)) + .map(transaction -> transaction.getData().getTicker()); + } + + private Observable> allRollingWindowStream(KlineInterval windowSize) { + return this.service + .subscribeChannel("!" + BinanceSubscriptionType.TICKER_WINDOW.getType() + windowSize.code() + "@arr", new Object[0]) + .map( + (it) -> this.>readTransaction(it, WINDOW_TICKER_TYPE , "ticker")) + .map( transaction -> transaction.getData().stream().map(TickerBinanceWebsocketTransaction::getTicker).collect(Collectors.toList())); + } + private Observable rawBookTickerStream(Instrument instrument) { return service .subscribeChannel( @@ -694,6 +764,13 @@ private static JavaType getTickerType() { new TypeReference>() {}); } + private static JavaType getWindowTickerType() { + return getObjectMapper() + .getTypeFactory() + .constructType( + new TypeReference>>() {}); + } + private static JavaType getBookTickerType() { return getObjectMapper() .getTypeFactory() diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceSubscriptionType.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceSubscriptionType.java index 5a4b91d9b84..2b09105d33e 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceSubscriptionType.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceSubscriptionType.java @@ -6,6 +6,7 @@ public enum BinanceSubscriptionType { FUNDING_RATES("markPrice"), TRADE("trade"), TICKER("ticker"), + TICKER_WINDOW("ticker_"), BOOK_TICKER("bookTicker"), KLINE("kline");