From 2131b39ac20cf869d10ec16d79614e464dfa2b08 Mon Sep 17 00:00:00 2001
From: Hirotaka Aoki <113173839+aoki-h-jp@users.noreply.github.com>
Date: Fri, 25 Aug 2023 12:41:44 +0900
Subject: [PATCH 1/2] Filter frequency

---
 downloader/downloader.py | 73 +++++++++++++++-------------------------
 1 file changed, 28 insertions(+), 45 deletions(-)

diff --git a/downloader/downloader.py b/downloader/downloader.py
index 4f631d9..6ae1ee7 100644
--- a/downloader/downloader.py
+++ b/downloader/downloader.py
@@ -14,12 +14,12 @@
 from rich.progress import track
 
 # import my libraries
-from .exceptions import (BinacneBulkDownloaderDownloadError,
+from exceptions import (BinacneBulkDownloaderDownloadError,
                          BinanceBulkDownloaderParamsError)
 
 
 class BinanceBulkDownloader:
-    _CHUNK_SIZE = 10
+    _CHUNK_SIZE = 100
     _BINANCE_DATA_S3_BUCKET_URL = (
         "https://s3-ap-northeast-1.amazonaws.com/data.binance.vision"
     )
@@ -127,6 +127,8 @@ def __init__(
         self._data_frequency = data_frequency
         self._asset = asset
         self._timeperiod_per_file = timeperiod_per_file
+        self.marker = None
+        self.is_truncated = True
 
     def _check_params(self) -> None:
         """
@@ -171,20 +173,15 @@ def _check_params(self) -> None:
                     f"data_frequency 1s is not supported for {self._asset}."
                 )
 
-    def _get_file_list_from_s3_bucket(
-        self, prefix, marker=None, print_info=True, accumulated_length=0
-    ) -> list:
+    def _get_file_list_from_s3_bucket(self, prefix, marker=None, is_truncated=False):
         """
-        Get files from s3 bucket
+        Get file list from s3 bucket
         :param prefix: s3 bucket prefix
         :param marker: marker
-        :param print_info: print info
-        :param accumulated_length: accumulated length from previous recursive calls
+        :param is_truncated: is truncated
         :return: list of files
         """
-        if print_info:
-            print(f"[bold blue]Getting file list[/bold blue]: " + prefix)
-        files = []
+        print(f"[bold blue]Get file list[/bold blue]: " + prefix)
         params = {"prefix": prefix, "max-keys": 1000}
         if marker:
             params["marker"] = marker
@@ -192,28 +189,15 @@
         response = requests.get(self._BINANCE_DATA_S3_BUCKET_URL, params=params)
         tree = ElementTree.fromstring(response.content)
 
-        for content in tree.findall(
-            "{http://s3.amazonaws.com/doc/2006-03-01/}Contents"
-        ):
+        files = []
+        for content in tree.findall("{http://s3.amazonaws.com/doc/2006-03-01/}Contents"):
             key = content.find("{http://s3.amazonaws.com/doc/2006-03-01/}Key").text
-            files.append(key)
+            if key.endswith(".zip"):
+                files.append(key)
+                self.marker = key
 
-        accumulated_length += len(files)
-        print(f"[green]Found {accumulated_length} files[/green]")
-        # Check if there are more files to retrieve
-        is_truncated = tree.find(
-            "{http://s3.amazonaws.com/doc/2006-03-01/}IsTruncated"
-        ).text
-        if is_truncated == "true":
-            last_key = files[-1]
-            files.extend(
-                self._get_file_list_from_s3_bucket(
-                    prefix,
-                    marker=last_key,
-                    print_info=False,
-                    accumulated_length=accumulated_length,
-                )
-            )
+        is_truncated_element = tree.find("{http://s3.amazonaws.com/doc/2006-03-01/}IsTruncated")
+        self.is_truncated = is_truncated_element.text == "true"
 
         return files
 
@@ -319,21 +303,20 @@ def make_chunks(lst, n) -> list:
         """
         return [lst[i : i + n] for i in range(0, len(lst), n)]
 
-    def run_download(self) -> None:
+    def run_download(self):
         """
-        Execute download concurrently
+        Download concurrently
         :return: None
         """
         print(f"[bold blue]Downloading {self._data_type}[/bold blue]")
-        zip_files = [
-            prefix
-            for prefix in self._get_file_list_from_s3_bucket(self._build_prefix())
-            if prefix.endswith(".zip")
-        ]
-        # TODO: Filters by data_frequency if needed
-        for prefix_chunk in track(
-            self.make_chunks(zip_files, self._CHUNK_SIZE),
-            description="Downloading",
-        ):
-            with ThreadPoolExecutor() as executor:
-                executor.map(self._download, prefix_chunk)
+
+        while self.is_truncated:
+            file_list_generator = self._get_file_list_from_s3_bucket(self._build_prefix(), self.marker, self.is_truncated)
+            if self._data_type in self._DATA_FREQUENCY_REQUIRED_BY_DATA_TYPE:
+                file_list_generator = [prefix for prefix in file_list_generator if prefix.count(self._data_frequency) == 2]
+            for prefix_chunk in track(
+            self.make_chunks(file_list_generator, self._CHUNK_SIZE),
+            description="Downloading",
+            ):
+                with ThreadPoolExecutor() as executor:
+                    executor.map(self._download, prefix_chunk)

From 4eea155cc2ca909ac0eb03e00003a95fd05e5e77 Mon Sep 17 00:00:00 2001
From: aoki-h-jp
Date: Fri, 25 Aug 2023 03:42:14 +0000
Subject: [PATCH 2/2] Apply Code Formatter Change

---
 downloader/downloader.py | 29 +++++++++++++++++++----------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/downloader/downloader.py b/downloader/downloader.py
index 6ae1ee7..c46403d 100644
--- a/downloader/downloader.py
+++ b/downloader/downloader.py
@@ -10,12 +10,11 @@
 
 # import third-party libraries
 import requests
-from rich import print
-from rich.progress import track
-
 # import my libraries
 from exceptions import (BinacneBulkDownloaderDownloadError,
-                         BinanceBulkDownloaderParamsError)
+                        BinanceBulkDownloaderParamsError)
+from rich import print
+from rich.progress import track
 
 
 class BinanceBulkDownloader:
@@ -190,13 +189,17 @@ def _get_file_list_from_s3_bucket(self, prefix, marker=None, is_truncated=False)
         tree = ElementTree.fromstring(response.content)
 
         files = []
-        for content in tree.findall("{http://s3.amazonaws.com/doc/2006-03-01/}Contents"):
+        for content in tree.findall(
+            "{http://s3.amazonaws.com/doc/2006-03-01/}Contents"
+        ):
             key = content.find("{http://s3.amazonaws.com/doc/2006-03-01/}Key").text
             if key.endswith(".zip"):
                 files.append(key)
                 self.marker = key
 
-        is_truncated_element = tree.find("{http://s3.amazonaws.com/doc/2006-03-01/}IsTruncated")
+        is_truncated_element = tree.find(
+            "{http://s3.amazonaws.com/doc/2006-03-01/}IsTruncated"
+        )
         self.is_truncated = is_truncated_element.text == "true"
 
         return files
@@ -311,12 +314,18 @@ def run_download(self):
         print(f"[bold blue]Downloading {self._data_type}[/bold blue]")
 
         while self.is_truncated:
-            file_list_generator = self._get_file_list_from_s3_bucket(self._build_prefix(), self.marker, self.is_truncated)
+            file_list_generator = self._get_file_list_from_s3_bucket(
+                self._build_prefix(), self.marker, self.is_truncated
+            )
             if self._data_type in self._DATA_FREQUENCY_REQUIRED_BY_DATA_TYPE:
-                file_list_generator = [prefix for prefix in file_list_generator if prefix.count(self._data_frequency) == 2]
+                file_list_generator = [
+                    prefix
+                    for prefix in file_list_generator
+                    if prefix.count(self._data_frequency) == 2
+                ]
             for prefix_chunk in track(
-            self.make_chunks(file_list_generator, self._CHUNK_SIZE),
-            description="Downloading",
+                self.make_chunks(file_list_generator, self._CHUNK_SIZE),
+                description="Downloading",
             ):
                 with ThreadPoolExecutor() as executor:
                     executor.map(self._download, prefix_chunk)