diff --git a/.circleci/config.yml b/.circleci/config.yml
index f344ae6..b586bf2 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -1,8 +1,20 @@
-version: 2
-jobs:
-  build:
+version: 2.1
+orbs:
+  slack: circleci/slack@3.4.2
+
+executors:
+  docker-executor:
     docker:
       - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
+
+jobs:
+  build:
+    executor: docker-executor
+    steps:
+      - run: echo "CI Done"
+
+  ensure_env:
+    executor: docker-executor
     steps:
       - checkout
       - run:
@@ -12,39 +24,93 @@ jobs:
           source /usr/local/share/virtualenvs/tap-mambu/bin/activate
           pip install -U 'pip<19.2' 'setuptools<51.0.0'
           pip install .[dev]
-      - add_ssh_keys
-      - run:
-          name: 'JSON Validator'
-          command: |
-            source /usr/local/share/virtualenvs/tap-tester/bin/activate
-            stitch-validate-json tap_mambu/helpers/schemas/*.json
+          aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox /usr/local/share/virtualenvs/dev_env.sh
+      - slack/notify-on-failure:
+          only_for_branches: master
+      - persist_to_workspace:
+          root: /usr/local/share/virtualenvs
+          paths:
+            - tap-mambu
+            - dev_env.sh
+
+  run_unit_tests:
+    executor: docker-executor
+    steps:
+      - checkout
+      - attach_workspace:
+          at: /usr/local/share/virtualenvs
       - run:
-          when: always
           name: 'Unit Tests'
          command: |
             source /usr/local/share/virtualenvs/tap-mambu/bin/activate
+            pip install parameterized
             pytest tests/unittests
+      - store_test_results:
+          path: test_output/report.xml
+      - store_artifacts:
+          path: htmlcov
+
+  run_integration_tests:
+    executor: docker-executor
+    parallelism: 5
+    steps:
+      - checkout
+      - attach_workspace:
+          at: /usr/local/share/virtualenvs
       - run:
-          name: 'Integration Tests'
+          name: 'Run Integration Tests'
+          no_output_timeout: 30m
           command: |
-            aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
-            source dev_env.sh
+            source /usr/local/share/virtualenvs/dev_env.sh
+            mkdir /tmp/${CIRCLE_PROJECT_REPONAME}
+            export STITCH_CONFIG_DIR=/tmp/${CIRCLE_PROJECT_REPONAME}
             source /usr/local/share/virtualenvs/tap-tester/bin/activate
-            run-test --tap=tap-mambu tests
+            circleci tests glob "tests/test_*.py" | circleci tests split > ./tests-to-run
+            if [ -s ./tests-to-run ]; then
+              for test_file in $(cat ./tests-to-run)
+              do
+                echo $test_file > $STITCH_CONFIG_DIR/tap_test.txt
+                run-test --tap=${CIRCLE_PROJECT_REPONAME} $test_file
+              done
+            fi
+      - slack/notify-on-failure:
+          only_for_branches: master
+      - store_artifacts:
+          path: /tmp/tap-mambu
+
 workflows:
   version: 2
-  commit:
+  commit: &commit_jobs
     jobs:
+      - ensure_env:
+          context:
+            - circleci-user
+            - tier-1-tap-user
+      - run_unit_tests:
+          context:
+            - circleci-user
+            - tier-1-tap-user
+          requires:
+            - ensure_env
+      - run_integration_tests:
+          context:
+            - circleci-user
+            - tier-1-tap-user
+          requires:
+            - ensure_env
       - build:
-          context: circleci-user
+          context:
+            - circleci-user
+            - tier-1-tap-user
+          requires:
+            - run_unit_tests
+            - run_integration_tests
   build_daily:
+    <<: *commit_jobs
     triggers:
       - schedule:
-          cron: "0 0 * * *"
+          cron: "0 1 * * *"
           filters:
             branches:
               only:
                 - master
-    jobs:
-      - build:
-          context: circleci-user
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7058feb..e8cfd7c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,13 @@
 # Changelog
 
+## 4.2.0
+  * Performance improvements [#113](https://github.com/singer-io/tap-mambu/pull/113)
+    * Implement date windowing (default size = 1 day) and pagination for multi-threaded streams
+    * Limit generator buffer growth to finite boundaries
+    * Revise bookmark strategy for multi-threaded generators to address data discrepancies
+    * Segregate LoanAccounts sub-stream bookmarking to rectify data inconsistencies
+    * Eliminate performance metrics to reduce overhead
+
 ## 4.1.0
   * Change clients stream to full table sync [#111](https://github.com/singer-io/tap-mambu/pull/111)
diff --git a/README.md b/README.md
index 22af6b1..72c8042 100644
--- a/README.md
+++ b/README.md
@@ -254,10 +254,13 @@ This tap:
         "lookback_window": 30,
         "user_agent": "tap-mambu ",
         "page_size": "500",
-        "apikey_audit": "AUDIT_TRAIL_APIKEY"
+        "apikey_audit": "AUDIT_TRAIL_APIKEY",
+        "window_size": 7
     }
     ```
-
+
+    Note: The `window_size` parameter defaults to 1 day, which may cause slow historical syncs for streams utilizing the multi-threaded implementation. Conversely, a larger `window_size` could lead to potential out-of-memory issues. It is advisable to select an optimal `window_size` based on the `start_date` and the volume of data to mitigate these concerns.
+
 Optionally, also create a `state.json` file. `currently_syncing` is an optional attribute used for identifying the last object to be synced in case the job is interrupted mid-stream. The next run would begin where the last job left off.
 
     ```json
diff --git a/setup.py b/setup.py
index 17baaa1..77fc623 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@ from setuptools import setup, find_packages
 
 setup(name='tap-mambu',
-      version='4.1.0',
+      version='4.2.0',
       description='Singer.io tap for extracting data from the Mambu 2.0 API',
       author='jeff.huth@bytecode.io',
       classifiers=['Programming Language :: Python :: 3 :: Only'],
diff --git a/tap_mambu/__init__.py b/tap_mambu/__init__.py
index 287a4ae..71236bd 100644
--- a/tap_mambu/__init__.py
+++ b/tap_mambu/__init__.py
@@ -35,7 +35,8 @@ def main():
                      parsed_args.config['subdomain'],
                      parsed_args.config.get('apikey_audit'),
                      int(parsed_args.config.get('page_size', DEFAULT_PAGE_SIZE)),
-                     user_agent=parsed_args.config['user_agent']) as client:
+                     user_agent=parsed_args.config['user_agent'],
+                     window_size=parsed_args.config.get('window_size')) as client:
 
         state = {}
         if parsed_args.state:
diff --git a/tap_mambu/helpers/client.py b/tap_mambu/helpers/client.py
index 12c2284..1905c73 100644
--- a/tap_mambu/helpers/client.py
+++ b/tap_mambu/helpers/client.py
@@ -1,9 +1,11 @@
 import backoff
 import requests
 import requests.adapters
-from requests.exceptions import ConnectionError
+from requests.exceptions import ConnectionError, ChunkedEncodingError
 from singer import metrics, get_logger
+from urllib3.exceptions import ProtocolError
 
+from tap_mambu.helpers.constants import DEFAULT_DATE_WINDOW_SIZE
 
 LOGGER = get_logger()
 
 class ClientError(Exception):
@@ -118,13 +120,21 @@ def __init__(self,
                  subdomain,
                  apikey_audit,
                  page_size,
-                 user_agent=''):
+                 user_agent='',
+                 window_size=DEFAULT_DATE_WINDOW_SIZE):
         self.__username = username
         self.__password = password
         self.__subdomain = subdomain
         base_url = "https://{}.mambu.com/api".format(subdomain)
         self.base_url = base_url
         self.page_size = page_size
+        try:
+            self.window_size = int(float(window_size)) if window_size else DEFAULT_DATE_WINDOW_SIZE
+            if self.window_size <= 0:
+                raise ValueError()
+        except ValueError:
+            raise Exception("The entered window size '{}' is invalid; it should be a valid positive integer.".format(window_size))
+
         self.__user_agent = f'MambuTap-{user_agent}' if user_agent else 'MambuTap'
         self.__apikey = apikey
         self.__session = requests.Session()
@@ -178,7 +188,8 @@ def check_access(self):
         return True
 
@backoff.on_exception(backoff.expo, - (MambuInternalServiceError, ConnectionError, MambuApiLimitError), + (MambuInternalServiceError, ConnectionError, + ChunkedEncodingError, MambuApiLimitError, ProtocolError), max_tries=7, factor=3) def request(self, method, path=None, url=None, json=None, version=None, apikey_type=None, **kwargs): diff --git a/tap_mambu/helpers/constants.py b/tap_mambu/helpers/constants.py index fef71d5..bb05042 100644 --- a/tap_mambu/helpers/constants.py +++ b/tap_mambu/helpers/constants.py @@ -1 +1,2 @@ DEFAULT_PAGE_SIZE = 200 +DEFAULT_DATE_WINDOW_SIZE = 1 diff --git a/tap_mambu/helpers/exceptions.py b/tap_mambu/helpers/exceptions.py index ed1ad8a..46ef2f8 100644 --- a/tap_mambu/helpers/exceptions.py +++ b/tap_mambu/helpers/exceptions.py @@ -3,3 +3,6 @@ class NoDeduplicationCapabilityException(Exception): class NoDeduplicationKeyException(Exception): pass + +class MambuGeneratorThreadNotAlive(Exception): + pass diff --git a/tap_mambu/helpers/metrics_plotter.py b/tap_mambu/helpers/metrics_plotter.py deleted file mode 100644 index 3324f73..0000000 --- a/tap_mambu/helpers/metrics_plotter.py +++ /dev/null @@ -1,34 +0,0 @@ -# pragma pylint: disable=protected-access -import matplotlib.pyplot as plt -from matplotlib.lines import Line2D - -from .perf_metrics import PerformanceMetrics as PerfMetrics - - -# noinspection PyProtectedMember -def show_thread_graph(): - all_timestamps = [(*generator_time, "r", "Generator") for generator_time in PerfMetrics._metrics["generator"]] + \ - [(*processor_time, "b", "Processor") for processor_time in PerfMetrics._metrics["processor"]] - counter = 0 - total_time = 0 - for timestamp in sorted(all_timestamps, key=lambda ts: ts[0]): - start_time = round(timestamp[0] - PerfMetrics._metrics_start_time, 1) - end_time = round(timestamp[1] - PerfMetrics._metrics_start_time, 1) - plt.plot([start_time, end_time], - [counter, counter], color=timestamp[2], label=timestamp[3], linestyle="-") - counter += 1 - if end_time > total_time: - total_time = end_time - plt.title(f"Total execution time: {total_time}s") - plt.ylabel("Timestamp") - plt.legend([Line2D([0], [0], color="r", lw=4), Line2D([0], [0], color="b", lw=4)], ["Generator", "Processor"]) - - plt.show() - - -# noinspection PyProtectedMember -def show_request_duration_graph(): - data_points = [record[1] - record[0] for record in PerfMetrics._metrics["generator"]] - x = list(range(len(data_points))) - plt.bar(x, data_points) - plt.show() \ No newline at end of file diff --git a/tap_mambu/helpers/multithreaded_requests.py b/tap_mambu/helpers/multithreaded_requests.py index 724f58a..979b277 100644 --- a/tap_mambu/helpers/multithreaded_requests.py +++ b/tap_mambu/helpers/multithreaded_requests.py @@ -1,7 +1,6 @@ import singer from concurrent.futures import Future, ThreadPoolExecutor from typing import List -from .perf_metrics import PerformanceMetrics LOGGER = singer.get_logger() @@ -24,16 +23,15 @@ def run(client, stream_name, f'{endpoint_api_version}): {client.base_url}/{endpoint_path}?{endpoint_querystring}' f' - body = {endpoint_body}') - with PerformanceMetrics(metric_name="generator"): - response = client.request( - method=endpoint_api_method, - path=endpoint_path, - version=endpoint_api_version, - apikey_type=endpoint_api_key_type, - params=endpoint_querystring, - endpoint=stream_name, - json=endpoint_body - ) + response = client.request( + method=endpoint_api_method, + path=endpoint_path, + version=endpoint_api_version, + apikey_type=endpoint_api_key_type, + params=endpoint_querystring, + 
endpoint=stream_name, + json=endpoint_body + ) LOGGER.info(f'(generator) Stream {stream_name} - extracted records: {len(response)}') return response diff --git a/tap_mambu/helpers/perf_metrics.py b/tap_mambu/helpers/perf_metrics.py deleted file mode 100644 index 3330699..0000000 --- a/tap_mambu/helpers/perf_metrics.py +++ /dev/null @@ -1,77 +0,0 @@ -import math -import time - - -class PerformanceMetrics: - _metrics_start_time = time.monotonic() - _metrics = dict(generator=list(), - processor=list(), - generator_wait=list(), - processor_wait=list()) - _generator_batch_size = 500 - - def __init__(self, metric_name): - self.start_time = None - if metric_name not in self._metrics: - raise ValueError("One argument must be True, but only one!") - self._metric_name = metric_name - - def __enter__(self): - self.start_time = time.monotonic() - - def __exit__(self, exc_type, exc_val, exc_tb): - metric = (self.start_time, time.monotonic()) - self._metrics[self._metric_name].append(metric) - - @classmethod - def reset_metrics(cls): - cls._metrics_start_time = time.monotonic() - cls._metrics = dict(generator=list(), - processor=list(), - generator_wait=list(), - processor_wait=list()) - - @classmethod - def set_generator_batch_size(cls, batch_size): - if any(cls._metrics.values()): - raise RuntimeError("You cannot change batch size after measuring metrics!") - cls._generator_batch_size = batch_size - - @property - def generator_batch_size(self): - return self._generator_batch_size - - @staticmethod - def get_sum(metric): - if not metric: - return 0 - return sum([record[1] - record[0] for record in metric]) - - @staticmethod - def get_avg_with_98th(metric): - if not metric: - return 0, 0 - values_total = sorted([record[1] - record[0] for record in metric], reverse=True) - values_98th = values_total[:math.ceil(len(values_total) * 2 / 100)] - - average = sum(values_total) / len(values_total) - average_98th = sum(values_98th) / len(values_98th) - - return average, average_98th - - @classmethod - def get_statistics(cls): - extraction_duration = time.monotonic() - cls._metrics_start_time - - generator_avg, generator_avg_98th = cls.get_avg_with_98th(cls._metrics["generator"]) - processor_avg, processor_avg_98th = cls.get_avg_with_98th(cls._metrics["processor"]) - generator_wait = cls.get_sum(cls._metrics["generator_wait"]) - processor_wait = cls.get_sum(cls._metrics["processor_wait"]) - - return dict(generator=generator_avg / cls._generator_batch_size, - generator_98th=generator_avg_98th / cls._generator_batch_size, - processor=processor_avg, processor_98th=processor_avg_98th, - generator_wait=generator_wait, - processor_wait=processor_wait, - records=len(cls._metrics["processor"]) // extraction_duration, - extraction=extraction_duration) diff --git a/tap_mambu/sync.py b/tap_mambu/sync.py index 14653b1..4d4b2e4 100644 --- a/tap_mambu/sync.py +++ b/tap_mambu/sync.py @@ -5,7 +5,6 @@ from .helpers.datetime_utils import get_timezone_info from .helpers.generator_processor_pairs import get_generator_processor_for_stream, get_stream_subtypes from .helpers.multithreaded_requests import MultithreadedRequestsPool -from .helpers.perf_metrics import PerformanceMetrics LOGGER = singer.get_logger() @@ -38,8 +37,6 @@ def sync_all_streams(client, config, catalog, state): get_timezone_info(client) - PerformanceMetrics.set_generator_batch_size(int(config.get("page_size", DEFAULT_PAGE_SIZE))) - selected_streams = get_selected_streams(catalog) LOGGER.info('selected_streams: {}'.format(selected_streams)) @@ -75,7 +72,6 @@ def 
sync_all_streams(client, config, catalog, state): LOGGER.info('START Syncing: {}, Type: {}'.format(stream_name, sub_type)) update_currently_syncing(state, stream_name) - PerformanceMetrics.reset_metrics() total_records = sync_endpoint( client=client, catalog=catalog, @@ -91,21 +87,4 @@ def sync_all_streams(client, config, catalog, state): total_records)) LOGGER.info('FINISHED Syncing: {}'.format(stream_name)) - statistics = PerformanceMetrics.get_statistics() - - if statistics['generator'] and statistics['generator_98th']: - LOGGER.info(f"Average Generator Records/s: {round(1/statistics['generator'])} " - f"[98th percentile: {round(1/statistics['generator_98th'])}]") - - if statistics['processor'] and statistics['processor_98th']: - LOGGER.info(f"Average Processor Records/s: {round(1/statistics['processor'])} " - f"[98th percentile: {round(1/statistics['processor_98th'])}]") - - LOGGER.info(f"Total Generator Wait (s): {round(statistics['generator_wait'], 1)} ") - - LOGGER.info(f"Total Processor Wait (s): {round(statistics['processor_wait'], 1)} ") - - LOGGER.info(f"Average Records/s: {statistics['records']}") - LOGGER.info(f"Total Duration: {statistics['extraction']}") - MultithreadedRequestsPool.shutdown() diff --git a/tap_mambu/tap_generators/activities_generator.py b/tap_mambu/tap_generators/activities_generator.py index 4133bae..afa7140 100644 --- a/tap_mambu/tap_generators/activities_generator.py +++ b/tap_mambu/tap_generators/activities_generator.py @@ -1,6 +1,6 @@ from .multithreaded_bookmark_generator import MultithreadedBookmarkDayByDayGenerator -from ..helpers import get_bookmark -from ..helpers.datetime_utils import datetime_to_utc_str, str_to_localized_datetime, utc_now +from ..helpers.datetime_utils import datetime_to_utc_str +from datetime import datetime class ActivitiesGenerator(MultithreadedBookmarkDayByDayGenerator): @@ -9,12 +9,12 @@ def _init_endpoint_config(self): self.endpoint_path = "activities" self.endpoint_api_method = "GET" self.endpoint_api_version = "v1" - - self.endpoint_params["from"] = datetime_to_utc_str(str_to_localized_datetime( - get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)))[:10] - self.endpoint_params["to"] = datetime_to_utc_str(utc_now())[:10] self.endpoint_bookmark_field = "timestamp" + def modify_request_params(self, start, end): + self.static_params["from"] = datetime.strftime(start, '%Y-%m-%d') + self.static_params["to"] = datetime.strftime(end, '%Y-%m-%d') + @staticmethod def unpack_activity(record): record.update(record["activity"]) diff --git a/tap_mambu/tap_generators/child_generator.py b/tap_mambu/tap_generators/child_generator.py index 1c3c5e0..52f9b9a 100644 --- a/tap_mambu/tap_generators/child_generator.py +++ b/tap_mambu/tap_generators/child_generator.py @@ -1,4 +1,5 @@ from .generator import TapGenerator +from typing import List class ChildGenerator(TapGenerator): @@ -10,3 +11,7 @@ def _init_endpoint_config(self): super(ChildGenerator, self)._init_endpoint_config() self.endpoint_path = f"{self.endpoint_parent_id}" # include parent id in endpoint path + def _init_buffers(self): + self.buffer: List = list() + self.max_buffer_size = 1000 + diff --git a/tap_mambu/tap_generators/clients_generator.py b/tap_mambu/tap_generators/clients_generator.py index c9ed17e..fe42767 100644 --- a/tap_mambu/tap_generators/clients_generator.py +++ b/tap_mambu/tap_generators/clients_generator.py @@ -1,8 +1,24 @@ -from .generator import TapGenerator +from .multithreaded_bookmark_generator import MultithreadedBookmarkGenerator +from 
..helpers.datetime_utils import datetime_to_local_str
 
 
-class ClientsGenerator(TapGenerator):
+class ClientsGenerator(MultithreadedBookmarkGenerator):
+    def _init_config(self):
+        super()._init_config()
+        self.max_threads = 10
+
     def _init_endpoint_config(self):
         super(ClientsGenerator, self)._init_endpoint_config()
-        self.endpoint_path = "clients"
-        self.endpoint_api_method = "GET"
+        self.endpoint_path = "clients:search"
+        self.endpoint_bookmark_field = "lastModifiedDate"
+        self.endpoint_sorting_criteria = {
+            "field": "lastModifiedDate",
+            "order": "ASC"
+        }
+
+    def prepare_batch_params(self):
+        super(ClientsGenerator, self).prepare_batch_params()
+        self.endpoint_filter_criteria[0]["value"] = datetime_to_local_str(self.endpoint_intermediary_bookmark_value)
+
+    def write_sub_stream_bookmark(self, start):
+        pass
diff --git a/tap_mambu/tap_generators/communications_generator.py b/tap_mambu/tap_generators/communications_generator.py
index 561559d..9ece42b 100644
--- a/tap_mambu/tap_generators/communications_generator.py
+++ b/tap_mambu/tap_generators/communications_generator.py
@@ -1,6 +1,6 @@
 from .multithreaded_bookmark_generator import MultithreadedBookmarkGenerator
-from ..helpers import get_bookmark
-from ..helpers.datetime_utils import datetime_to_local_str, str_to_localized_datetime
+from ..helpers.datetime_utils import datetime_to_local_str, datetime_to_utc_str
+from datetime import datetime
 
 
 class CommunicationsGenerator(MultithreadedBookmarkGenerator):
@@ -12,17 +12,23 @@ def _init_endpoint_config(self):
             "paginationDetails": "OFF"
         }
         self.endpoint_bookmark_field = "creationDate"
-        self.endpoint_filter_criteria = [
+
+    def modify_request_params(self, start, end):
+        self.endpoint_body = [
+            {
+                "field": self.endpoint_bookmark_field,
+                "operator": "AFTER",
+                "value": datetime_to_utc_str(start)
+            },
+            {
+                "field": self.endpoint_bookmark_field,
+                "operator": "BEFORE",
+                "value": datetime_to_utc_str(end)
+            },
             {
                 "field": "state",
                 "operator": "EQUALS",
                 "value": "SENT"
-            },
-            {
-                "field": "creationDate",
-                "operator": "AFTER",
-                "value": datetime_to_local_str(str_to_localized_datetime(
-                    get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)))
             }
         ]
 
@@ -31,4 +37,4 @@ def _init_endpoint_body(self):
 
     def prepare_batch_params(self):
         super(CommunicationsGenerator, self).prepare_batch_params()
-        self.endpoint_filter_criteria[1]["value"] = datetime_to_local_str(self.endpoint_intermediary_bookmark_value)
+        self.endpoint_filter_criteria[0]["value"] = datetime_to_local_str(self.endpoint_intermediary_bookmark_value)
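For orientation, the body such a date-windowed stream ends up posting for a single window looks roughly like the sketch below. This is illustrative only; the exact timestamp format comes from the tap's `datetime_to_utc_str`/`datetime_to_local_str` helpers, and the values are made up.

```python
# Illustrative only: approximate request body for one date window of the
# communications stream after modify_request_params runs. The AFTER/BEFORE
# pair bounds the window on the bookmark field; stream-specific criteria
# (the SENT state filter above) ride along after the window bounds.
window_body = [
    {"field": "creationDate", "operator": "AFTER", "value": "2023-01-01T00:00:00.000000Z"},
    {"field": "creationDate", "operator": "BEFORE", "value": "2023-01-02T00:00:00.000000Z"},
    {"field": "state", "operator": "EQUALS", "value": "SENT"},
]
```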
"lastModifiedDate", - "operator": "AFTER", - "value": datetime_to_utc_str(str_to_localized_datetime( - get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date))) - } - ] - self.endpoint_bookmark_field = "lastModifiedDate" def prepare_batch_params(self): super(DepositAccountsGenerator, self).prepare_batch_params() diff --git a/tap_mambu/tap_generators/deposit_transactions_generator.py b/tap_mambu/tap_generators/deposit_transactions_generator.py index aa83ff4..d528023 100644 --- a/tap_mambu/tap_generators/deposit_transactions_generator.py +++ b/tap_mambu/tap_generators/deposit_transactions_generator.py @@ -1,26 +1,13 @@ from .multithreaded_bookmark_generator import MultithreadedBookmarkGenerator -from ..helpers import get_bookmark -from ..helpers.datetime_utils import datetime_to_utc_str, str_to_localized_datetime +from ..helpers.datetime_utils import datetime_to_utc_str class DepositTransactionsGenerator(MultithreadedBookmarkGenerator): - def _init_config(self): - super()._init_config() - self.max_threads = 5 - def _init_endpoint_config(self): super(DepositTransactionsGenerator, self)._init_endpoint_config() self.endpoint_path = "deposits/transactions:search" self.endpoint_bookmark_field = "creationDate" self.endpoint_sorting_criteria["field"] = "id" - self.endpoint_filter_criteria = [ - { - "field": "creationDate", - "operator": "AFTER", - "value": datetime_to_utc_str(str_to_localized_datetime( - get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date))) - } - ] def prepare_batch_params(self): super(DepositTransactionsGenerator, self).prepare_batch_params() diff --git a/tap_mambu/tap_generators/generator.py b/tap_mambu/tap_generators/generator.py index dbd5fa9..4af7e69 100644 --- a/tap_mambu/tap_generators/generator.py +++ b/tap_mambu/tap_generators/generator.py @@ -4,7 +4,6 @@ from ..helpers import transform_json from ..helpers.hashable_dict import HashableDict -from ..helpers.perf_metrics import PerformanceMetrics LOGGER = get_logger() @@ -16,6 +15,10 @@ def __init__(self, stream_name, client, config, state, sub_type): self.config = config self.state = state self.sub_type = sub_type + self.date_windowing = False + self.date_window_size = client.window_size + self.start_windows_datetime_str = None + self.sub_stream_name = stream_name # Define parameters inside init self.params = dict() @@ -54,6 +57,7 @@ def _init_endpoint_body(self): def _init_buffers(self): self.buffer: List = list() + self.max_buffer_size = 10000 def _init_params(self): self.time_extracted = None @@ -63,6 +67,11 @@ def _init_params(self): self.params = self.static_params def _all_fetch_batch_steps(self): + # Large buffer size can impact memory utilization of connector + # so empty the buffer once it reaches default max limit + if len(self.buffer) > self.max_buffer_size: + return + self.prepare_batch() raw_batch = self.fetch_batch() self.buffer = transform_json(raw_batch, self.stream_name) @@ -85,7 +94,6 @@ def next(self): return self.buffer.pop(0) def __next__(self): - # with PerformanceMetrics(metric_name="processor_wait"): return self.next() def prepare_batch(self): @@ -108,17 +116,25 @@ def fetch_batch(self): f'{self.endpoint_api_version}): {self.client.base_url}/{self.endpoint_path}?{endpoint_querystring}') LOGGER.info(f'(generator) Stream {self.stream_name} - body = {self.endpoint_body}') - with PerformanceMetrics(metric_name="generator"): - response = self.client.request( - method=self.endpoint_api_method, - path=self.endpoint_path, - version=self.endpoint_api_version, - 
+            apikey_type=self.endpoint_api_key_type,
+            params=endpoint_querystring,
+            endpoint=self.stream_name,
+            json=self.endpoint_body
+        )
         self.time_extracted = utils.now()
         LOGGER.info(f'(generator) Stream {self.stream_name} - extracted records: {len(response)}')
 
         return self.transform_batch(response)
+
+    def get_default_start_value(self):
+        return None
+
+    def set_last_sync_completed(self, end_time):
+        pass
+
+    def wait_for_sibling_to_catchup(self):
+        pass
diff --git a/tap_mambu/tap_generators/gl_journal_entries_generator.py b/tap_mambu/tap_generators/gl_journal_entries_generator.py
index 6b54305..9070d6d 100644
--- a/tap_mambu/tap_generators/gl_journal_entries_generator.py
+++ b/tap_mambu/tap_generators/gl_journal_entries_generator.py
@@ -1,13 +1,8 @@
 from .multithreaded_bookmark_generator import MultithreadedBookmarkGenerator
-from ..helpers import get_bookmark
-from ..helpers.datetime_utils import datetime_to_utc_str, str_to_localized_datetime, utc_now
+from ..helpers.datetime_utils import datetime_to_utc_str
 
 
 class GlJournalEntriesGenerator(MultithreadedBookmarkGenerator):
-    def _init_config(self):
-        super()._init_config()
-        self.max_threads = 5
-
     def _init_endpoint_config(self):
         super(GlJournalEntriesGenerator, self)._init_endpoint_config()
         self.endpoint_path = "gljournalentries:search"
@@ -16,15 +11,6 @@ def _init_endpoint_config(self):
             "field": "entryId",
             "order": "ASC"
         }
-        self.endpoint_filter_criteria = [
-            {
-                "field": "creationDate",
-                "operator": "BETWEEN",
-                "value": datetime_to_utc_str(str_to_localized_datetime(
-                    get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date))),
-                "secondValue": datetime_to_utc_str(utc_now())
-            }
-        ]
 
     def prepare_batch_params(self):
         super(GlJournalEntriesGenerator, self).prepare_batch_params()
diff --git a/tap_mambu/tap_generators/groups_generator.py b/tap_mambu/tap_generators/groups_generator.py
index bcabdcd..8e38037 100644
--- a/tap_mambu/tap_generators/groups_generator.py
+++ b/tap_mambu/tap_generators/groups_generator.py
@@ -1,6 +1,5 @@
 from .multithreaded_bookmark_generator import MultithreadedBookmarkGenerator
-from ..helpers import get_bookmark
-from ..helpers.datetime_utils import datetime_to_local_str, str_to_localized_datetime
+from ..helpers.datetime_utils import datetime_to_local_str
 
 
 class GroupsGenerator(MultithreadedBookmarkGenerator):
@@ -12,14 +11,6 @@ def _init_endpoint_config(self):
             "field": "lastModifiedDate",
             "order": "ASC"
         }
-        self.endpoint_filter_criteria = [
-            {
-                "field": "lastModifiedDate",
-                "operator": "AFTER",
-                "value": datetime_to_local_str(str_to_localized_datetime(
-                    get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)))
-            }
-        ]
 
     def prepare_batch_params(self):
         super(GroupsGenerator, self).prepare_batch_params()
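The `max_buffer_size` guard added to `generator.py` above amounts to simple back-pressure. A minimal standalone sketch of the rule (not the tap's actual loop, names simplified):

```python
# Minimal sketch of the back-pressure check in _all_fetch_batch_steps: while
# the processor lags, skip fetching so the buffer stays within finite bounds.
buffer: list = []
MAX_BUFFER_SIZE = 10000  # default set in TapGenerator._init_buffers

def all_fetch_batch_steps(fetch_batch):
    if len(buffer) > MAX_BUFFER_SIZE:  # consumer is behind; don't grow memory
        return
    buffer.extend(fetch_batch())
```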
diff --git a/tap_mambu/tap_generators/installments_generator.py b/tap_mambu/tap_generators/installments_generator.py
index 10e4c99..a9810f3 100644
--- a/tap_mambu/tap_generators/installments_generator.py
+++ b/tap_mambu/tap_generators/installments_generator.py
@@ -3,6 +3,10 @@
 
 
 class InstallmentsGenerator(MultithreadedOffsetGenerator):
+    def __init__(self, stream_name, client, config, state, sub_type):
+        super(InstallmentsGenerator, self).__init__(stream_name, client, config, state, sub_type)
+        self.date_windowing = False
+
     def _init_endpoint_config(self):
         super(InstallmentsGenerator, self)._init_endpoint_config()
         self.endpoint_path = "installments"
diff --git a/tap_mambu/tap_generators/interest_accrual_breakdown_generator.py b/tap_mambu/tap_generators/interest_accrual_breakdown_generator.py
index 7b094f6..a9bd7ab 100644
--- a/tap_mambu/tap_generators/interest_accrual_breakdown_generator.py
+++ b/tap_mambu/tap_generators/interest_accrual_breakdown_generator.py
@@ -1,6 +1,5 @@
 from .multithreaded_bookmark_generator import MultithreadedBookmarkDayByDayGenerator
-from ..helpers import get_bookmark
-from ..helpers.datetime_utils import datetime_to_utc_str, str_to_localized_datetime
+from ..helpers.datetime_utils import datetime_to_utc_str
 
 
 class InterestAccrualBreakdownGenerator(MultithreadedBookmarkDayByDayGenerator):
@@ -12,14 +11,6 @@ def _init_endpoint_config(self):
             "field": "entryId",
             "order": "ASC"
         }
-        self.endpoint_filter_criteria = [
-            {
-                "field": "creationDate",
-                "operator": "AFTER",
-                "value": datetime_to_utc_str(str_to_localized_datetime(
-                    get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)))[:10]
-            }
-        ]
 
     def prepare_batch_params(self):
         super(InterestAccrualBreakdownGenerator, self).prepare_batch_params()
diff --git a/tap_mambu/tap_generators/loan_accounts_generator.py b/tap_mambu/tap_generators/loan_accounts_generator.py
index 1d8ee5b..5ea57e0 100644
--- a/tap_mambu/tap_generators/loan_accounts_generator.py
+++ b/tap_mambu/tap_generators/loan_accounts_generator.py
@@ -1,44 +1,66 @@
-import abc
+from .multithreaded_bookmark_generator import MultithreadedBookmarkGenerator
+from ..helpers.datetime_utils import datetime_to_utc_str, str_to_datetime
+from ..helpers import get_bookmark, write_bookmark
 
-from .generator import TapGenerator
-from ..helpers import get_bookmark
-from ..helpers.datetime_utils import str_to_localized_datetime, datetime_to_local_str
 
+class LoanAccountsLMGenerator(MultithreadedBookmarkGenerator):
+    def __init__(self, stream_name, client, config, state, sub_type):
+        super(LoanAccountsLMGenerator, self).__init__(stream_name, client, config, state, sub_type)
+        self.max_threads = 3
+        self.sub_stream_name = "loan_accounts_lmg"
 
-class LoanAccountsGenerator(TapGenerator):
-    @abc.abstractmethod
     def _init_endpoint_config(self):
-        super(LoanAccountsGenerator, self)._init_endpoint_config()
+        super(LoanAccountsLMGenerator, self)._init_endpoint_config()
         self.endpoint_path = "loans:search"
+        self.endpoint_bookmark_field = "lastModifiedDate"
         self.endpoint_sorting_criteria = {
             "field": "id",
             "order": "ASC"
         }
 
+    def prepare_batch_params(self):
+        super(LoanAccountsLMGenerator, self).prepare_batch_params()
+        self.endpoint_filter_criteria[0]["value"] = datetime_to_utc_str(
+            self.endpoint_intermediary_bookmark_value)
+
+    def write_sub_stream_bookmark(self, start):
+        write_bookmark(self.state, self.sub_stream_name, self.sub_type, start)
+
+    def get_default_start_value(self):
+        # Historical syncs will use the start date as the first date window
+        # Incremental syncs will use the last stream bookmark value
+        # Interrupted syncs will use the last window of the sub-stream as the first date window
+        stream_bookmark = get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)
+        sub_stream_bookmark = get_bookmark(self.state, self.sub_stream_name, self.sub_type, stream_bookmark)
+        if self.compare_bookmark_values(sub_stream_bookmark, stream_bookmark):
+            start_value = stream_bookmark
+        else:
+            start_value = sub_stream_bookmark
+        truncated_start_date = datetime_to_utc_str(str_to_datetime(start_value).replace(hour=0, minute=0, second=0))
+        return truncated_start_date
+
+    def remove_sub_stream_bookmark(self):
+        # Remove the sub-stream bookmark once extraction has finished up to the current date
+        if self.sub_stream_name in self.state.get("bookmarks", {}):
+            del self.state["bookmarks"][self.sub_stream_name]
+
+    def set_intermediary_bookmark(self, record_bookmark_value):
+        if self.endpoint_intermediary_bookmark_value is None or \
+                self.compare_bookmark_values(record_bookmark_value,
+                                             self.endpoint_intermediary_bookmark_value):
+            self.endpoint_intermediary_bookmark_value = record_bookmark_value
+            self.endpoint_intermediary_bookmark_offset = 1
+            return
+
+        if record_bookmark_value == self.endpoint_intermediary_bookmark_value:
+            self.endpoint_intermediary_bookmark_offset += 1
+            return
 
-class LoanAccountsLMGenerator(LoanAccountsGenerator):
-    def _init_endpoint_config(self):
-        super()._init_endpoint_config()
-        self.endpoint_bookmark_field = "lastModifiedDate"
-        self.endpoint_filter_criteria = [
-            {
-                "field": "lastModifiedDate",
-                "operator": "AFTER",
-                "value": datetime_to_local_str(str_to_localized_datetime(
-                    get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)))
-            }
-        ]
 
+class LoanAccountsADGenerator(LoanAccountsLMGenerator):
+    def __init__(self, stream_name, client, config, state, sub_type):
+        super(LoanAccountsADGenerator, self).__init__(stream_name, client, config, state, sub_type)
+        self.sub_stream_name = "loan_accounts_adg"
 
-class LoanAccountsADGenerator(LoanAccountsGenerator):
     def _init_endpoint_config(self):
-        super()._init_endpoint_config()
+        super(LoanAccountsADGenerator, self)._init_endpoint_config()
         self.endpoint_bookmark_field = "lastAccountAppraisalDate"
-        self.endpoint_filter_criteria = [
-            {
-                "field": "lastAccountAppraisalDate",
-                "operator": "AFTER",
-                "value": datetime_to_local_str(str_to_localized_datetime(
-                    get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)))
-            }
-        ]
diff --git a/tap_mambu/tap_generators/loan_repayments_generator.py b/tap_mambu/tap_generators/loan_repayments_generator.py
index b13fae1..4a9281a 100644
--- a/tap_mambu/tap_generators/loan_repayments_generator.py
+++ b/tap_mambu/tap_generators/loan_repayments_generator.py
@@ -2,8 +2,10 @@
 
 
 class LoanRepaymentsGenerator(ChildGenerator):
+
     def _init_endpoint_config(self):
         super(LoanRepaymentsGenerator, self)._init_endpoint_config()
         self.endpoint_api_version = "v1"
         self.endpoint_api_method = "GET"
-        self.endpoint_path = f"loans/{self.endpoint_parent_id}/repayments"  # include parent id in endpoint path
+        # include parent id in endpoint path
+        self.endpoint_path = f"loans/{self.endpoint_parent_id}/repayments"
diff --git a/tap_mambu/tap_generators/loan_transactions_generator.py b/tap_mambu/tap_generators/loan_transactions_generator.py
index 7c30f5b..dfaf2e4 100644
--- a/tap_mambu/tap_generators/loan_transactions_generator.py
+++ b/tap_mambu/tap_generators/loan_transactions_generator.py
@@ -1,6 +1,5 @@
 from .multithreaded_bookmark_generator import MultithreadedBookmarkGenerator
-from ..helpers import get_bookmark
-from ..helpers.datetime_utils import datetime_to_utc_str, str_to_localized_datetime
+from ..helpers.datetime_utils import datetime_to_utc_str
 
 
 class LoanTransactionsGenerator(MultithreadedBookmarkGenerator):
@@ -9,14 +8,6 @@ def _init_endpoint_config(self):
         self.endpoint_path = "loans/transactions:search"
         self.endpoint_bookmark_field = "creationDate"
         self.endpoint_sorting_criteria["field"] = "id"
-        self.endpoint_filter_criteria = [
-            {
-                "field": "creationDate",
-                "operator": "AFTER",
-
"value": datetime_to_utc_str(str_to_localized_datetime( - get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date))) - } - ] def prepare_batch_params(self): super(LoanTransactionsGenerator, self).prepare_batch_params() diff --git a/tap_mambu/tap_generators/multithreaded_offset_generator.py b/tap_mambu/tap_generators/multithreaded_offset_generator.py index 297cccd..f5a3fd7 100644 --- a/tap_mambu/tap_generators/multithreaded_offset_generator.py +++ b/tap_mambu/tap_generators/multithreaded_offset_generator.py @@ -4,16 +4,23 @@ import backoff from singer import get_logger +from datetime import datetime, timedelta from .generator import TapGenerator -from ..helpers import transform_json +from ..helpers import transform_json, get_bookmark, write_bookmark +from ..helpers.datetime_utils import str_to_localized_datetime, datetime_to_utc_str, utc_now, str_to_datetime from ..helpers.multithreaded_requests import MultithreadedRequestsPool -from ..helpers.perf_metrics import PerformanceMetrics +from ..helpers.exceptions import MambuGeneratorThreadNotAlive LOGGER = get_logger() class MultithreadedOffsetGenerator(TapGenerator): + def __init__(self, stream_name, client, config, state, sub_type): + super(MultithreadedOffsetGenerator, self).__init__(stream_name, client, config, state, sub_type) + self.date_windowing = True + self.start_windows_datetime_str = self.start_date + def _init_params(self): self.time_extracted = None self.static_params = dict(self.endpoint_params) @@ -118,15 +125,75 @@ def preprocess_batches(self, final_buffer): self.preprocess_record(raw_record) self.last_batch_size = len(self.last_batch_set) + def write_sub_stream_bookmark(self, start): + write_bookmark(self.state, self.sub_stream_name, self.sub_type, start) + + def get_default_start_value(self): + return get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date) + + def remove_sub_stream_bookmark(self): + pass + + def set_last_sync_completed(self, end_time): + last_bookmark = get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date) + if end_time < str_to_datetime(last_bookmark): + write_bookmark(self.state, self.stream_name, + self.sub_type, datetime_to_utc_str(end_time)) + @backoff.on_exception(backoff.expo, RuntimeError, max_tries=5) def _all_fetch_batch_steps(self): - futures = self.queue_batches() - final_buffer, stop_iteration = self.collect_batches(futures) - self.preprocess_batches(final_buffer) + if self.date_windowing: + last_sync_window_start = self.get_default_start_value() + + if last_sync_window_start: + truncated_start_date = str_to_datetime( + last_sync_window_start).replace(hour=0, minute=0, second=0) + start = str_to_localized_datetime( + datetime_to_utc_str(truncated_start_date)) + else: + start = str_to_localized_datetime(self.get_default_start_value()) + + end_datetime = datetime_to_utc_str(utc_now() + timedelta(days=1)) + end = str_to_localized_datetime(end_datetime) + temp = start + timedelta(days=self.date_window_size) + stop_iteration = True + final_buffer = [] + while start < end: + # Empty the current buffer before moving to next window to make sure all records + # of current date window are processed to reduce memory pressure and improve bookmarking + while len(self.buffer): + time.sleep(1) + self.write_sub_stream_bookmark(datetime_to_utc_str(start)) + self.modify_request_params(start - timedelta(minutes=5), temp) + final_buffer, stop_iteration = self.collect_batches( + self.queue_batches()) + self.preprocess_batches(final_buffer) + if not 
final_buffer or stop_iteration:
+                    self.offset = 0
+                    self.start_windows_datetime_str = start
+                    start = temp
+                    temp = start + timedelta(days=self.date_window_size)
+        else:
+            final_buffer, stop_iteration = self.collect_batches(self.queue_batches())
+            self.preprocess_batches(final_buffer)
         if not final_buffer or stop_iteration:
             return False
         return True
 
+    def modify_request_params(self, start, end):
+        self.endpoint_body['filterCriteria'] = [
+            {
+                "field": self.endpoint_bookmark_field,
+                "operator": "AFTER",
+                "value": datetime_to_utc_str(start)
+            },
+            {
+                "field": self.endpoint_bookmark_field,
+                "operator": "BEFORE",
+                "value": datetime_to_utc_str(end)
+            }
+        ]
+
     def error_check_and_fix(self, final_buffer: set, temp_buffer: set, futures: list):
         try:
             final_buffer = self.check_and_get_set_reunion(final_buffer, temp_buffer, self.artificial_limit)
@@ -150,9 +217,10 @@ def __iter__(self):
 
     def next(self):
         if not self.buffer and not self.end_of_file:
-            with PerformanceMetrics(metric_name="processor_wait"):
-                while not self.buffer and not self.end_of_file:
-                    time.sleep(0.01)
+            while not self.buffer and not self.end_of_file:
+                if not self.fetch_batch_thread.is_alive():
+                    raise MambuGeneratorThreadNotAlive("Generator stopped running prematurely")
+                time.sleep(0.01)
         if not self.buffer and self.end_of_file:
             raise StopIteration()
         return self.buffer.pop(0)
diff --git a/tap_mambu/tap_generators/users_generator.py b/tap_mambu/tap_generators/users_generator.py
index c418e4e..32f1cc4 100644
--- a/tap_mambu/tap_generators/users_generator.py
+++ b/tap_mambu/tap_generators/users_generator.py
@@ -2,6 +2,10 @@
 
 
 class UsersGenerator(MultithreadedOffsetGenerator):
+    def __init__(self, stream_name, client, config, state, sub_type):
+        super(UsersGenerator, self).__init__(stream_name, client, config, state, sub_type)
+        self.date_windowing = False
+
     def _init_endpoint_config(self):
         super(UsersGenerator, self)._init_endpoint_config()
         self.endpoint_path = "users"
diff --git a/tap_mambu/tap_processors/deduplication_processor.py b/tap_mambu/tap_processors/deduplication_processor.py
index 5d3c59c..b98e1d4 100644
--- a/tap_mambu/tap_processors/deduplication_processor.py
+++ b/tap_mambu/tap_processors/deduplication_processor.py
@@ -6,7 +6,6 @@
 from .processor import TapProcessor
 from ..helpers import convert
 from ..helpers.exceptions import NoDeduplicationCapabilityException, NoDeduplicationKeyException
-from ..helpers.perf_metrics import PerformanceMetrics
 
 LOGGER = get_logger()
 
@@ -86,9 +85,8 @@ def process_records(self):
 
             # Process the record
             record = self.generator_values[record_key]
-            with PerformanceMetrics(metric_name="processor"):
-                is_processed = self.process_record(record, record_key.time_extracted,
-                                                   record_key.endpoint_bookmark_field)
+            is_processed = self.process_record(record, record_key.time_extracted,
+                                               record_key.endpoint_bookmark_field)
             if is_processed:
                 record_count += 1
diff --git a/tap_mambu/tap_processors/multithreaded_parent_processor.py b/tap_mambu/tap_processors/multithreaded_parent_processor.py
index dbeaabd..892d13d 100644
--- a/tap_mambu/tap_processors/multithreaded_parent_processor.py
+++ b/tap_mambu/tap_processors/multithreaded_parent_processor.py
@@ -15,6 +15,10 @@ def process_records(self):
 
         for future in futures.as_completed(self.futures):
             record_count += future.result()
+
+        for generator in self.generators:
+            generator.set_last_sync_completed(self.generators[0].start_windows_datetime_str)
+            generator.remove_sub_stream_bookmark()
 
         return record_count
 
     def _process_child_records(self, record):
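For readability, here is a rough standalone walk-through of how the windowing loop above slices the sync range. It is illustrative and simplified: the real loop also drains the buffer, writes the sub-stream bookmark, and only advances once a window's batches are collected. `start` is assumed to be timezone-aware.

```python
from datetime import datetime, timedelta, timezone

# Each request window spans [start - 5 minutes, start + window_size days);
# the five-minute overlap guards against records committed near the window
# edge by lagging threads, and iteration stops at utc_now() + 1 day.
def iter_windows(start: datetime, window_size_days: int):
    end = datetime.now(timezone.utc) + timedelta(days=1)
    while start < end:
        window_end = start + timedelta(days=window_size_days)
        yield start - timedelta(minutes=5), window_end
        start = window_end
```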
diff --git a/tap_mambu/tap_processors/processor.py b/tap_mambu/tap_processors/processor.py
index c12839b..f6585e0 100644
--- a/tap_mambu/tap_processors/processor.py
+++ b/tap_mambu/tap_processors/processor.py
@@ -5,8 +5,8 @@
 from ..helpers import convert, get_bookmark, write_bookmark
 from ..helpers.transformer import Transformer
 from ..helpers.exceptions import NoDeduplicationCapabilityException
-from ..helpers.perf_metrics import PerformanceMetrics
 from ..helpers.datetime_utils import utc_now, str_to_datetime, datetime_to_utc_str, str_to_localized_datetime
+from ..helpers.schema import STREAMS
 
 LOGGER = get_logger()
 
@@ -44,7 +44,14 @@ def _init_endpoint_config(self):
                                                       "you need to use the deduplication processor")
 
     def _init_bookmarks(self):
-        self.last_bookmark_value = get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)
+        # With date windowing in the multi-threaded generators we cannot rely on the
+        # stream bookmark alone: if a sync is interrupted, a faster thread may have
+        # written a newer bookmark than records a lagging thread had yet to emit, and
+        # the next sync would then skip those parent records and their child records.
+        # In that scenario we resume from the last date window where extraction was interrupted.
+        last_bookmark = self.generators[0].get_default_start_value()
+
+        self.last_bookmark_value = last_bookmark or get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)
         self.max_bookmark_value = self.last_bookmark_value
 
     def write_schema(self):
@@ -61,20 +68,22 @@ def process_records(self):
         with metrics.record_counter(self.stream_name) as counter:
             for record in self.generators[0]:
                 # Process the record
-                with PerformanceMetrics(metric_name="processor"):
-                    is_processed = self.process_record(record, utils.now(),
-                                                       self.generators[0].endpoint_bookmark_field)
+                is_processed = self.process_record(record, utils.now(),
+                                                   self.generators[0].endpoint_bookmark_field)
                 if is_processed:
                     record_count += 1
                     self._process_child_records(record)
                     counter.increment()
+
         return record_count
 
     def process_streams_from_generators(self):
         self.write_schema()
         record_count = self.process_records()
-        self.write_bookmark()
+        if STREAMS.get(self.stream_name).get("replication_method") == "INCREMENTAL":
+            self.write_bookmark()
+
         return record_count
 
     # This function is provided for processors with child streams, must be overridden if child streams are to be used
@@ -100,6 +109,9 @@ def _is_record_past_bookmark(self, transformed_record, bookmark_field):
         if str_to_localized_datetime(transformed_record[bookmark_field]) >= \
                 str_to_localized_datetime(self.last_bookmark_value):
             return True
+        else:
+            LOGGER.info(
+                f"Skipped record older than bookmark: {self.stream_name} {transformed_record.get('id')}")
 
         return False
 
     def process_record(self, record, time_extracted, bookmark_field):
diff --git a/tests/base.py b/tests/base.py
index fd68374..6b51a83 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -249,7 +249,8 @@ def get_properties(self, original_properties=True):
             'start_date': '2017-01-01T00:00:00Z',
             'username': os.environ['TAP_MAMBU_USERNAME'],
             'subdomain': os.environ['TAP_MAMBU_SUBDOMAIN'],
-            'page_size': '100'
+            'page_size': '90',
+            'window_size': 30
         }
 
         if not original_properties:
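The resume logic in `_init_bookmarks` above builds on the sub-stream bookmarks introduced earlier. A hypothetical state for an interrupted `loan_accounts` sync (the value shape is approximated from the `write_bookmark`/`get_bookmark` calls in this diff and may differ in detail) might look like:

```python
# Hypothetical state: the stream bookmark may already be ahead, having been
# written by faster threads, so the next run resumes from the sub-stream
# window bookmark instead. The sub-stream keys ("loan_accounts_lmg" /
# "loan_accounts_adg") are removed once a sync completes to the current date.
state = {
    "currently_syncing": "loan_accounts",
    "bookmarks": {
        "loan_accounts": "2023-03-01T10:15:00.000000Z",
        "loan_accounts_lmg": "2023-02-20T00:00:00.000000Z",
    },
}
```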
and the # final bookmark value for message in second_sync_messages: - lower_bound = strptime_to_utc(simulated_bookmark_value) + lower_bound = strptime_to_utc(simulated_bookmark_value) - timedelta(minutes=5) upper_bound = strptime_to_utc(second_sync_bookmark_value) record = message.get('data') actual_values = [strptime_to_utc(record.get(replication_key)) diff --git a/tests/test_sync_canary.py b/tests/test_sync_canary.py deleted file mode 100644 index 380ed7b..0000000 --- a/tests/test_sync_canary.py +++ /dev/null @@ -1,35 +0,0 @@ -""" -Test that with no fields selected for a stream automatic fields are still replicated -""" -from tap_tester import connections -from base import MambuBaseTest - -class SyncCanaryTest(MambuBaseTest): - """ - Smoke test - """ - - @staticmethod - def name(): - return "tap_tester_mambu_sync_canary_test" - - def test_run(self): - """ - Run tap in check mode, then select all streams and all fields within streams. Run a sync and - verify exit codes do not throw errors. This is meant to be a smoke test for the tap. If this - is failing do not expect any other tests to pass. - """ - conn_id = connections.ensure_connection(self) - self.run_and_verify_check_mode(conn_id) - - self.select_and_verify_fields(conn_id) - - record_count_by_stream = self.run_and_verify_sync(conn_id) - - - # Assert all expected streams synced at least one record - for stream in self.expected_streams(): - with self.subTest(stream=stream): - self.assertGreater(record_count_by_stream.get(stream, 0), - 0, - msg="{} did not sync any records".format(stream)) diff --git a/tests/unittests/test_window_size.py b/tests/unittests/test_window_size.py new file mode 100644 index 0000000..ad95729 --- /dev/null +++ b/tests/unittests/test_window_size.py @@ -0,0 +1,86 @@ +from parameterized import parameterized +import mock + +from tap_mambu.helpers.client import MambuClient +from tap_mambu.helpers.constants import DEFAULT_DATE_WINDOW_SIZE, DEFAULT_PAGE_SIZE + +config = { + "username": "YOUR_USERNAME", + "password": "YOUR_PASSWORD", + "apikey": "YOUR_APIKEY", + "subdomain": "YOUR_SUBDOMAIN", + "start_date": "2019-01-01T00:00:00Z", + "lookback_window": 30, + "user_agent": "tap-mambu ", + "page_size": "500", + "apikey_audit": "AUDIT_TRAIL_APIKEY"} + + +@mock.patch("tap_mambu.helpers.client.MambuClient.check_access") +class TestGetWindowSize(): + def test_non_value(self, mock_check_access): + """ + Test if no window size is not passed in the config, then set it to the default value. + """ + # Verify that the default window size value is set. + with MambuClient(config.get('username'), + config.get('password'), + config.get('apikey'), + config['subdomain'], + config.get('apikey_audit'), + int(config.get('page_size', DEFAULT_PAGE_SIZE)), + user_agent=config['user_agent'], + window_size=config.get('window_size')) as client: + # Verify window size value is expected + assert client.window_size == DEFAULT_DATE_WINDOW_SIZE + + @parameterized.expand([ + ["None_value", None, DEFAULT_DATE_WINDOW_SIZE], + ["integer_value", 10, 10], + ["float_value", 100.5, 100], + ["string_integer", "10", 10], + ["string_float", "100.5", 100], + ]) + def test_window_size_values(self, mock_check_access, name, date_window_size, expected_value): + """ + Test that for the valid value of window size, + No exception is raised and the expected value is set. 
+        """
+        with MambuClient(config.get('username'),
+                         config.get('password'),
+                         config.get('apikey'),
+                         config['subdomain'],
+                         config.get('apikey_audit'),
+                         int(config.get('page_size', DEFAULT_PAGE_SIZE)),
+                         user_agent=config['user_agent'],
+                         window_size=date_window_size) as client:
+            # Verify window size value is expected
+            assert client.window_size == expected_value
+
+    @parameterized.expand([
+        ["zero_string", "0"],
+        ["less_than_1_string", "0.5"],
+        ["negative_value", -10],
+        ["string_negative_value", "-100"],
+        ["string_alphabet", "abc"],
+    ])
+    def test_invalid_value(self, mock_check_access, name, date_window_size):
+        """
+        Test that an exception is raised for invalid values.
+        """
+        actual_exc_string = ""
+        expected_exc_string = "The entered window size '{}' is invalid; it should be a valid positive integer.".format(date_window_size)
+        try:
+            MambuClient(config.get('username'),
+                        config.get('password'),
+                        config.get('apikey'),
+                        config['subdomain'],
+                        config.get('apikey_audit'),
+                        int(config.get('page_size', DEFAULT_PAGE_SIZE)),
+                        user_agent=config['user_agent'],
+                        window_size=date_window_size)
+        except Exception as e:
+            # Verify that the exception message is expected.
+            actual_exc_string = str(e)
+
+        assert actual_exc_string == expected_exc_string
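As a usage note, the coercion rule these tests exercise (from `helpers/client.py`) can be restated in a standalone sketch:

```python
# Standalone restatement of MambuClient's window_size handling: floats and
# numeric strings are truncated to int, a missing value falls back to the
# default, and anything non-numeric or <= 0 is rejected.
def parse_window_size(window_size, default=1):
    try:
        size = int(float(window_size)) if window_size else default
        if size <= 0:
            raise ValueError()
    except ValueError:
        raise Exception(f"The entered window size '{window_size}' is invalid; "
                        "it should be a valid positive integer.")
    return size

assert parse_window_size(None) == 1       # default window of 1 day
assert parse_window_size("100.5") == 100  # numeric strings are truncated
```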