Skip to content

Commit

Permalink
Merge pull request #7 from aoki-h-jp/feature/1.0.1/filter-frequency
Browse files Browse the repository at this point in the history
Feature/1.0.1/filter frequency
  • Loading branch information
aoki-h-jp authored Aug 25, 2023
2 parents 354e664 + 4eea155 commit 063401e
Showing 1 changed file with 36 additions and 44 deletions.
80 changes: 36 additions & 44 deletions downloader/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@

# import third-party libraries
import requests
# import my libraries
from exceptions import (BinacneBulkDownloaderDownloadError,
BinanceBulkDownloaderParamsError)
from rich import print
from rich.progress import track

# import my libraries
from .exceptions import (BinacneBulkDownloaderDownloadError,
BinanceBulkDownloaderParamsError)


class BinanceBulkDownloader:
_CHUNK_SIZE = 10
_CHUNK_SIZE = 100
_BINANCE_DATA_S3_BUCKET_URL = (
"https://s3-ap-northeast-1.amazonaws.com/data.binance.vision"
)
Expand Down Expand Up @@ -127,6 +126,8 @@ def __init__(
self._data_frequency = data_frequency
self._asset = asset
self._timeperiod_per_file = timeperiod_per_file
self.marker = None
self.is_truncated = True

def _check_params(self) -> None:
"""
Expand Down Expand Up @@ -171,49 +172,35 @@ def _check_params(self) -> None:
f"data_frequency 1s is not supported for {self._asset}."
)

def _get_file_list_from_s3_bucket(
self, prefix, marker=None, print_info=True, accumulated_length=0
) -> list:
def _get_file_list_from_s3_bucket(self, prefix, marker=None, is_truncated=False):
"""
Get files from s3 bucket
Get file list from s3 bucket
:param prefix: s3 bucket prefix
:param marker: marker
:param print_info: print info
:param accumulated_length: accumulated length from previous recursive calls
:param is_truncated: is truncated
:return: list of files
"""
if print_info:
print(f"[bold blue]Getting file list[/bold blue]: " + prefix)
files = []
print(f"[bold blue]Get file list[/bold blue]: " + prefix)
params = {"prefix": prefix, "max-keys": 1000}
if marker:
params["marker"] = marker

response = requests.get(self._BINANCE_DATA_S3_BUCKET_URL, params=params)
tree = ElementTree.fromstring(response.content)

files = []
for content in tree.findall(
"{http://s3.amazonaws.com/doc/2006-03-01/}Contents"
):
key = content.find("{http://s3.amazonaws.com/doc/2006-03-01/}Key").text
files.append(key)
if key.endswith(".zip"):
files.append(key)
self.marker = key

accumulated_length += len(files)
print(f"[green]Found {accumulated_length} files[/green]")
# Check if there are more files to retrieve
is_truncated = tree.find(
is_truncated_element = tree.find(
"{http://s3.amazonaws.com/doc/2006-03-01/}IsTruncated"
).text
if is_truncated == "true":
last_key = files[-1]
files.extend(
self._get_file_list_from_s3_bucket(
prefix,
marker=last_key,
print_info=False,
accumulated_length=accumulated_length,
)
)
)
self.is_truncated = is_truncated_element.text == "true"

return files

Expand Down Expand Up @@ -319,21 +306,26 @@ def make_chunks(lst, n) -> list:
"""
return [lst[i : i + n] for i in range(0, len(lst), n)]

def run_download(self) -> None:
def run_download(self):
"""
Execute download concurrently
Download concurrently
:return: None
"""
print(f"[bold blue]Downloading {self._data_type}[/bold blue]")
zip_files = [
prefix
for prefix in self._get_file_list_from_s3_bucket(self._build_prefix())
if prefix.endswith(".zip")
]
        # TODO: Filter by data_frequency if needed
for prefix_chunk in track(
self.make_chunks(zip_files, self._CHUNK_SIZE),
description="Downloading",
):
with ThreadPoolExecutor() as executor:
executor.map(self._download, prefix_chunk)

while self.is_truncated:
file_list_generator = self._get_file_list_from_s3_bucket(
self._build_prefix(), self.marker, self.is_truncated
)
if self._data_type in self._DATA_FREQUENCY_REQUIRED_BY_DATA_TYPE:
file_list_generator = [
prefix
for prefix in file_list_generator
if prefix.count(self._data_frequency) == 2
]
for prefix_chunk in track(
self.make_chunks(file_list_generator, self._CHUNK_SIZE),
description="Downloading",
):
with ThreadPoolExecutor() as executor:
executor.map(self._download, prefix_chunk)

0 comments on commit 063401e

Please sign in to comment.