Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/1.0.1/filter frequency #7

Merged
merged 2 commits
Aug 25, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 36 additions & 44 deletions downloader/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@

# import third-party libraries
import requests
# import my libraries
from exceptions import (BinacneBulkDownloaderDownloadError,
BinanceBulkDownloaderParamsError)
from rich import print
from rich.progress import track

# import my libraries
from .exceptions import (BinacneBulkDownloaderDownloadError,
BinanceBulkDownloaderParamsError)


class BinanceBulkDownloader:
    """Bulk downloader for Binance public market-data archives on S3."""

    # Number of object keys downloaded concurrently per batch.
    # NOTE(review): the diff contained both 10 (old) and 100 (new);
    # 100 is the post-merge value.
    _CHUNK_SIZE = 100
    # Public S3 endpoint that serves data.binance.vision listings.
    _BINANCE_DATA_S3_BUCKET_URL = (
        "https://s3-ap-northeast-1.amazonaws.com/data.binance.vision"
    )
Expand Down Expand Up @@ -127,6 +126,8 @@ def __init__(
self._data_frequency = data_frequency
self._asset = asset
self._timeperiod_per_file = timeperiod_per_file
self.marker = None
self.is_truncated = True

def _check_params(self) -> None:
"""
Expand Down Expand Up @@ -171,49 +172,35 @@ def _check_params(self) -> None:
f"data_frequency 1s is not supported for {self._asset}."
)

def _get_file_list_from_s3_bucket(
self, prefix, marker=None, print_info=True, accumulated_length=0
) -> list:
def _get_file_list_from_s3_bucket(self, prefix, marker=None, is_truncated=False):
"""
Get files from s3 bucket
Get file list from s3 bucket
:param prefix: s3 bucket prefix
:param marker: marker
:param print_info: print info
:param accumulated_length: accumulated length from previous recursive calls
:param is_truncated: is truncated
:return: list of files
"""
if print_info:
print(f"[bold blue]Getting file list[/bold blue]: " + prefix)
files = []
print(f"[bold blue]Get file list[/bold blue]: " + prefix)
params = {"prefix": prefix, "max-keys": 1000}
if marker:
params["marker"] = marker

response = requests.get(self._BINANCE_DATA_S3_BUCKET_URL, params=params)
tree = ElementTree.fromstring(response.content)

files = []
for content in tree.findall(
"{http://s3.amazonaws.com/doc/2006-03-01/}Contents"
):
key = content.find("{http://s3.amazonaws.com/doc/2006-03-01/}Key").text
files.append(key)
if key.endswith(".zip"):
files.append(key)
self.marker = key

accumulated_length += len(files)
print(f"[green]Found {accumulated_length} files[/green]")
# Check if there are more files to retrieve
is_truncated = tree.find(
is_truncated_element = tree.find(
"{http://s3.amazonaws.com/doc/2006-03-01/}IsTruncated"
).text
if is_truncated == "true":
last_key = files[-1]
files.extend(
self._get_file_list_from_s3_bucket(
prefix,
marker=last_key,
print_info=False,
accumulated_length=accumulated_length,
)
)
)
self.is_truncated = is_truncated_element.text == "true"

return files

Expand Down Expand Up @@ -319,21 +306,26 @@ def make_chunks(lst, n) -> list:
"""
return [lst[i : i + n] for i in range(0, len(lst), n)]

def run_download(self) -> None:
def run_download(self):
"""
Execute download concurrently
Download concurrently
:return: None
"""
print(f"[bold blue]Downloading {self._data_type}[/bold blue]")
zip_files = [
prefix
for prefix in self._get_file_list_from_s3_bucket(self._build_prefix())
if prefix.endswith(".zip")
]
# TODO: Filters by data_frequency if needed
for prefix_chunk in track(
self.make_chunks(zip_files, self._CHUNK_SIZE),
description="Downloading",
):
with ThreadPoolExecutor() as executor:
executor.map(self._download, prefix_chunk)

while self.is_truncated:
file_list_generator = self._get_file_list_from_s3_bucket(
self._build_prefix(), self.marker, self.is_truncated
)
if self._data_type in self._DATA_FREQUENCY_REQUIRED_BY_DATA_TYPE:
file_list_generator = [
prefix
for prefix in file_list_generator
if prefix.count(self._data_frequency) == 2
]
for prefix_chunk in track(
self.make_chunks(file_list_generator, self._CHUNK_SIZE),
description="Downloading",
):
with ThreadPoolExecutor() as executor:
executor.map(self._download, prefix_chunk)