Skip to content

Commit

Permalink
Merge pull request #11 from aoki-h-jp/feature/1.1.1/faster-download
Browse files Browse the repository at this point in the history
Feature/1.1.1/faster download
  • Loading branch information
aoki-h-jp authored Sep 12, 2023
2 parents 42cd4ee + 4560906 commit 8675b34
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 26 deletions.
75 changes: 54 additions & 21 deletions bybit_bulk_downloader/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ class BybitBulkDownloader:
"klines",
)

def __init__(self, destination_dir=".", data_type="trading"):
def __init__(
self, destination_dir=".", data_type="trading", klines_category="linear"
):
"""
:param destination_dir: Directory to save the downloaded data.
:param data_type: Data type to download. Available data types are: "kline_for_metatrader4", "premium_index", "spot_index", "trading", "fundingRate", "klines".
:param klines_category: Klines category to download. Available categories are: "linear". ("spot", "inverse" is not supported yet.)
"""
self._destination_dir = destination_dir
self._data_type = data_type
self._klines_category = klines_category
self.session = HTTP()

def _get_url_from_bybit(self):
Expand Down Expand Up @@ -67,7 +71,7 @@ def _get_url_from_bybit(self):
return download_list

@staticmethod
def make_chunks(lst, n) -> list:
def make_chunks(lst: list, n: int) -> list:
"""
Make chunks
:param lst: Raw list
Expand All @@ -76,7 +80,7 @@ def make_chunks(lst, n) -> list:
"""
return [lst[i : i + n] for i in range(0, len(lst), n)]

def _download(self, url):
def _download(self, url: str):
"""
Execute the download.
:param url: URL
Expand Down Expand Up @@ -117,11 +121,11 @@ def _download(self, url):
os.remove(filepath)
print(f"[green]Deleted: {filepath}[/green]")

def download(self, url):
def download(self, url: str):
self._download(url)

@staticmethod
def generate_dates_until_today(start_year, start_month) -> list:
def generate_dates_until_today(start_year: int, start_month: int) -> list:
"""
Generate dates until today (2 months at a time)
:param start_year:
Expand Down Expand Up @@ -189,9 +193,9 @@ def _download_fundingrate(self):

@staticmethod
def generate_dates_by_minutes_limited(
start_year, start_month, interval_minutes
start_year: int, start_month: int, start_day: int, interval_minutes=1000
) -> (list, list):
start_date = datetime(start_year, start_month, 1)
start_date = datetime(start_year, start_month, start_day)
end_date = datetime.today()

# Generating the list
Expand All @@ -204,15 +208,19 @@ def generate_dates_by_minutes_limited(
start_dt = date_list_1000min[:-1]
return start_dt

def _download_klines(self, symbol):
def _download_klines(self, symbol: str):
"""
Download klines from Bybit
:param symbol: symbol
"""
if not os.path.exists(f"{self._destination_dir}/bybit_data/klines/{symbol}"):
os.makedirs(f"{self._destination_dir}/bybit_data/klines/{symbol}")
if not os.path.exists(
f"{self._destination_dir}/bybit_data/klines/{self._klines_category}/{symbol}"
):
os.makedirs(
f"{self._destination_dir}/bybit_data/klines/{self._klines_category}/{symbol}"
)

def _download(start_time):
def __download(start_time: datetime):
df_tmp = pd.DataFrame(
columns=[
"startTime",
Expand All @@ -225,7 +233,7 @@ def _download(start_time):
]
)
for d in self.session.get_kline(
category="linear",
category=self._klines_category,
symbol=symbol,
interval="1",
limit=1000,
Expand All @@ -243,21 +251,38 @@ def _download(start_time):
self._destination_dir,
"bybit_data",
"klines",
self._klines_category,
symbol,
str(int(start_time.timestamp())) + ".csv",
]
)
print(f"[green]Saving: {save_path}[/green]")
df_tmp.to_csv(save_path)

# the oldest data is 2020-03-25
print(f"[bold blue]Initial download: {symbol}[/bold blue]")
__download(datetime(2019, 1, 1))
df_init_path = sorted(
[
f"{self._destination_dir}/bybit_data/klines/{self._klines_category}/{symbol}/{file}"
for file in os.listdir(
f"{self._destination_dir}/bybit_data/klines/{self._klines_category}/{symbol}"
)
]
)[0]
df_init = pd.read_csv(df_init_path)
start_date = df_init["startTime"].iloc[0]
start_date = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S").date()

for start_time_chunk in self.make_chunks(
self.generate_dates_by_minutes_limited(2020, 3, 1000), self._CHUNK_SIZE
self.generate_dates_by_minutes_limited(
start_date.year, start_date.month, start_date.day, 1000
),
self._CHUNK_SIZE,
):
print(f"[bold blue]Downloading: {symbol}[/bold blue]")
print(start_time_chunk)
with ThreadPoolExecutor() as executor:
executor.map(_download, start_time_chunk)
executor.map(__download, start_time_chunk)

# merge downloaded csv
df = pd.DataFrame(
Expand All @@ -271,17 +296,23 @@ def _download(start_time):
"turnover",
]
)
for file in os.listdir(f"{self._destination_dir}/bybit_data/klines/{symbol}"):
for file in os.listdir(
f"{self._destination_dir}/bybit_data/klines/{self._klines_category}/{symbol}"
):
df_tmp = pd.read_csv(
f"{self._destination_dir}/bybit_data/klines/{symbol}/{file}"
f"{self._destination_dir}/bybit_data/klines/{self._klines_category}/{symbol}/{file}"
)
df = pd.concat([df, df_tmp])
os.remove(f"{self._destination_dir}/bybit_data/klines/{symbol}/{file}")
os.remove(
f"{self._destination_dir}/bybit_data/klines/{self._klines_category}/{symbol}/{file}"
)
df = df.sort_values("startTime")
df = df.drop_duplicates(subset=["startTime"])
df.to_csv(f"{self._destination_dir}/bybit_data/klines/{symbol}/1m.csv")
df.to_csv(
f"{self._destination_dir}/bybit_data/klines/{self._klines_category}/{symbol}/1m.csv"
)

def download_klines(self, symbol):
def download_klines(self, symbol: str):
self._download_klines(symbol)

def run_download(self):
Expand All @@ -297,7 +328,9 @@ def run_download(self):
elif self._data_type == "klines":
s_list = [
d["symbol"]
for d in self.session.get_tickers(category="linear")["result"]["list"]
for d in self.session.get_tickers(category=self._klines_category)[
"result"
]["list"]
if d["symbol"][-4:] == "USDT"
]
for symbol in track(
Expand Down
15 changes: 10 additions & 5 deletions tests/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ def dynamic_test_params():
Generate params for tests
:return:
"""
for data_type in BybitBulkDownloader._DATA_TYPE:
for data_type in [
"kline_for_metatrader4",
"premium_index",
"spot_index",
"trading",
]:
yield pytest.param(data_type)


Expand All @@ -33,7 +38,7 @@ def test_download(tmpdir, data_type):
)
if data_type == "kline_for_metatrader4":
single_download_url = "https://public.bybit.com/kline_for_metatrader4/ADAUSDT/2022/ADAUSDT_15_2022-09-01_2022-09-30.csv.gz"
downloader._download(single_download_url)
downloader.download(single_download_url)
# If exists csv file on destination dir, test is passed.
assert os.path.exists(
os.path.join(
Expand All @@ -45,7 +50,7 @@ def test_download(tmpdir, data_type):

elif data_type == "premium_index":
single_download_url = "https://public.bybit.com/premium_index/ADAUSD/ADAUSD2022-03-24_premium_index.csv.gz"
downloader._download(single_download_url)
downloader.download(single_download_url)
# If exists csv file on destination dir, test is passed.
assert os.path.exists(
os.path.join(
Expand All @@ -57,7 +62,7 @@ def test_download(tmpdir, data_type):

elif data_type == "spot_index":
single_download_url = "https://public.bybit.com/spot_index/ADAUSD/ADAUSD2022-03-24_index_price.csv.gz"
downloader._download(single_download_url)
downloader.download(single_download_url)
# If exists csv file on destination dir, test is passed.
assert os.path.exists(
os.path.join(
Expand All @@ -67,7 +72,7 @@ def test_download(tmpdir, data_type):

elif data_type == "trading":
single_download_url = "https://public.bybit.com/trading/10000LADYSUSDT/10000LADYSUSDT2023-05-11.csv.gz"
downloader._download(single_download_url)
downloader.download(single_download_url)
# If exists csv file on destination dir, test is passed.
print(
os.path.join(
Expand Down

0 comments on commit 8675b34

Please sign in to comment.