diff --git a/downloader/downloader.py b/downloader/downloader.py
index 4f631d9..c46403d 100644
--- a/downloader/downloader.py
+++ b/downloader/downloader.py
@@ -10,16 +10,15 @@
 # import third-party libraries
 import requests
+# import my libraries
+from exceptions import (BinacneBulkDownloaderDownloadError,
+                        BinanceBulkDownloaderParamsError)
 from rich import print
 from rich.progress import track
 
-# import my libraries
-from .exceptions import (BinacneBulkDownloaderDownloadError,
-                         BinanceBulkDownloaderParamsError)
-
 
 class BinanceBulkDownloader:
-    _CHUNK_SIZE = 10
+    _CHUNK_SIZE = 100
     _BINANCE_DATA_S3_BUCKET_URL = (
         "https://s3-ap-northeast-1.amazonaws.com/data.binance.vision"
     )
 
@@ -127,6 +126,8 @@ def __init__(
         self._data_frequency = data_frequency
         self._asset = asset
         self._timeperiod_per_file = timeperiod_per_file
+        self.marker = None
+        self.is_truncated = True
 
     def _check_params(self) -> None:
         """
@@ -171,20 +172,15 @@ def _check_params(self) -> None:
                 f"data_frequency 1s is not supported for {self._asset}."
             )
 
-    def _get_file_list_from_s3_bucket(
-        self, prefix, marker=None, print_info=True, accumulated_length=0
-    ) -> list:
+    def _get_file_list_from_s3_bucket(self, prefix, marker=None, is_truncated=False) -> list:
         """
-        Get files from s3 bucket
+        Get file list from s3 bucket
         :param prefix: s3 bucket prefix
         :param marker: marker
-        :param print_info: print info
-        :param accumulated_length: accumulated length from previous recursive calls
+        :param is_truncated: whether the previous listing page was truncated
         :return: list of files
         """
-        if print_info:
-            print(f"[bold blue]Getting file list[/bold blue]: " + prefix)
-        files = []
+        print(f"[bold blue]Get file list[/bold blue]: {prefix}")
         params = {"prefix": prefix, "max-keys": 1000}
         if marker:
             params["marker"] = marker
@@ -192,28 +188,19 @@ def _get_file_list_from_s3_bucket(
         response = requests.get(self._BINANCE_DATA_S3_BUCKET_URL, params=params)
         tree = ElementTree.fromstring(response.content)
 
+        files = []
         for content in tree.findall(
             "{http://s3.amazonaws.com/doc/2006-03-01/}Contents"
         ):
             key = content.find("{http://s3.amazonaws.com/doc/2006-03-01/}Key").text
-            files.append(key)
+            if key.endswith(".zip"):
+                files.append(key)
+            self.marker = key
 
-        accumulated_length += len(files)
-        print(f"[green]Found {accumulated_length} files[/green]")
-        # Check if there are more files to retrieve
-        is_truncated = tree.find(
+        is_truncated_element = tree.find(
             "{http://s3.amazonaws.com/doc/2006-03-01/}IsTruncated"
-        ).text
-        if is_truncated == "true":
-            last_key = files[-1]
-            files.extend(
-                self._get_file_list_from_s3_bucket(
-                    prefix,
-                    marker=last_key,
-                    print_info=False,
-                    accumulated_length=accumulated_length,
-                )
-            )
+        )
+        self.is_truncated = is_truncated_element.text == "true"
 
         return files
 
@@ -319,21 +306,26 @@ def make_chunks(lst, n) -> list:
         """
         return [lst[i : i + n] for i in range(0, len(lst), n)]
 
-    def run_download(self) -> None:
+    def run_download(self):
         """
-        Execute download concurrently
+        Download concurrently
        :return: None
         """
         print(f"[bold blue]Downloading {self._data_type}[/bold blue]")
-        zip_files = [
-            prefix
-            for prefix in self._get_file_list_from_s3_bucket(self._build_prefix())
-            if prefix.endswith(".zip")
-        ]
-        # TODO: Filters by data_frequency if needed
-        for prefix_chunk in track(
-            self.make_chunks(zip_files, self._CHUNK_SIZE),
-            description="Downloading",
-        ):
-            with ThreadPoolExecutor() as executor:
-                executor.map(self._download, prefix_chunk)
+
+        while self.is_truncated:
+            file_list = self._get_file_list_from_s3_bucket(
+                self._build_prefix(), self.marker, self.is_truncated
+            )
+            if self._data_type in self._DATA_FREQUENCY_REQUIRED_BY_DATA_TYPE:
+                file_list = [
+                    prefix
+                    for prefix in file_list
+                    if prefix.count(self._data_frequency) == 2
+                ]
+            for prefix_chunk in track(
+                self.make_chunks(file_list, self._CHUNK_SIZE),
+                description="Downloading",
+            ):
+                with ThreadPoolExecutor() as executor:
+                    executor.map(self._download, prefix_chunk)
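---

Notes:

The listing now pages through the bucket with S3's ListObjects marker protocol: each response carries up to 1000 keys plus an IsTruncated flag, and the last key seen becomes the marker for the next request. A minimal standalone sketch of that loop, using the same endpoint and XML namespace as the diff (list_all_keys is a hypothetical helper, not part of this PR):

    # Marker/IsTruncated paging against the public bucket listing.
    from xml.etree import ElementTree

    import requests

    S3_URL = "https://s3-ap-northeast-1.amazonaws.com/data.binance.vision"
    NS = "{http://s3.amazonaws.com/doc/2006-03-01/}"

    def list_all_keys(prefix):
        """Yield every key under prefix, one 1000-key page at a time."""
        marker = None
        while True:
            params = {"prefix": prefix, "max-keys": 1000}
            if marker:
                params["marker"] = marker  # resume after the last key seen
            response = requests.get(S3_URL, params=params)
            tree = ElementTree.fromstring(response.content)
            keys = [
                content.find(NS + "Key").text
                for content in tree.findall(NS + "Contents")
            ]
            yield from keys
            if not keys or tree.find(NS + "IsTruncated").text != "true":
                return
            marker = keys[-1]

Keeping marker and is_truncated as instance state, instead of recursing as before, bounds memory to one page and lets run_download interleave listing with downloading.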
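run_download still fans each page out over a thread pool, _CHUNK_SIZE keys at a time. Because ThreadPoolExecutor's context manager joins all workers on exit, every chunk completes before the progress bar advances. A reduced sketch of that pattern (fetch stands in for self._download; the keys are illustrative):

    from concurrent.futures import ThreadPoolExecutor

    def make_chunks(lst, n):
        # Split lst into consecutive slices of at most n items.
        return [lst[i : i + n] for i in range(0, len(lst), n)]

    def fetch(key):
        print("downloading", key)

    keys = [f"data/file-{i:03d}.zip" for i in range(250)]
    for chunk in make_chunks(keys, 100):
        with ThreadPoolExecutor() as executor:
            # shutdown(wait=True) on exit: all tasks in the chunk finish here.
            executor.map(fetch, chunk)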
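The prefix.count(self._data_frequency) == 2 filter assumes a matching frequency appears exactly twice in a key: once as a directory segment and once in the file name. Under the usual Binance Vision layout that holds, e.g.:

    key = "data/spot/monthly/klines/BTCUSDT/1h/BTCUSDT-1h-2023-01.zip"
    assert key.count("1h") == 2

    # Caveat: substring counting is loose. "1mo" contains "1m", so a 1m
    # request would also keep monthly keys; matching exact path segments
    # (e.g. self._data_frequency in key.split("/")) would be stricter.
    mo_key = "data/spot/monthly/klines/BTCUSDT/1mo/BTCUSDT-1mo-2023-01.zip"
    assert mo_key.count("1m") == 2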