Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TDL-22683] Use Date Windowing Pagination #113

Merged
merged 70 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 68 commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
d51282c
Adding None support to HashableDict
cosimon Dec 20, 2023
7801007
Initial date windowing logic for activities stream.
shantanu73 Feb 20, 2024
e3851a8
Changes:
shantanu73 Feb 26, 2024
16d3fcb
Changes:
shantanu73 Mar 6, 2024
859d923
Changing number of threads from 20 to 1 to fix memory issue.
shantanu73 Mar 6, 2024
db922e5
Changes:
shantanu73 Mar 6, 2024
01d315d
Changes:
shantanu73 Mar 16, 2024
28fbc3a
Fixed issue with initializing offset and infinite loop in date window…
shantanu73 Mar 17, 2024
e33022b
Remove performance metrics
cosimon Mar 19, 2024
887e0b3
Remove performance metrics
cosimon Mar 19, 2024
1b399dc
Initializing final_buffer variable.
shantanu73 Mar 20, 2024
da24d92
Fixed BEFORE date for loan_transactions & gl_journal_entries streams.
shantanu73 Mar 20, 2024
a8d46dd
Fixed BEFORE date for loan_transactions & gl_journal_entries streams.…
shantanu73 Mar 20, 2024
6d6ae66
Limit max buffer size
Mar 21, 2024
162e542
Add generator change
Mar 21, 2024
b5c3941
Changing default date window, from 3 to 5.
shantanu73 Mar 22, 2024
58a98b8
Added generic date windowing for all MultiThreadedOffsetGenerator str…
shantanu73 Mar 22, 2024
3403ea7
Implemented modify_request_params method for Activities stream.
shantanu73 Mar 22, 2024
aedd6aa
Implemented modify_request_params method for Installments stream.
shantanu73 Mar 22, 2024
a9ff3ad
Disabled Date windowing for Users stream.
shantanu73 Mar 22, 2024
db07f7a
Removed endpoint filter criteria for Clients stream.
shantanu73 Mar 22, 2024
554720b
Implemented modify_request_params method for Communications stream.
shantanu73 Mar 22, 2024
7711832
Removed endpoint filter criteria for DepositAccounts stream.
shantanu73 Mar 22, 2024
11e44de
Removed endpoint filter criteria for DepositTransactions stream.
shantanu73 Mar 22, 2024
09473ff
Removed modify_request_params implementation from GL Journal Accounts…
shantanu73 Mar 22, 2024
232a7ea
Removed endpoint filter criteria for Groups stream.
shantanu73 Mar 22, 2024
70fc616
Removed endpoint filter criteria for Interest Accrual Breakdown stream.
shantanu73 Mar 22, 2024
7856961
Removed modify_request_params implementation from Loan Transactions s…
shantanu73 Mar 22, 2024
b4cf6d5
Changes:
shantanu73 Mar 22, 2024
69f2435
Fixed bug in communications stream date windowing implementation.
shantanu73 Mar 25, 2024
2be5cf7
Set date_windowing as False and restored original implementation for …
shantanu73 Mar 25, 2024
fac0fa0
Merge branch 'master' into TDL-22683-date-windowing-pagination
shantanu73 Mar 26, 2024
33bfbd1
Changed loan_accounts stream implementation to MultithreadedBookmarkG…
shantanu73 Mar 26, 2024
0d57fb7
Added backoff for intermittent ProtocolError in Mambu APIs.
shantanu73 Mar 28, 2024
08bc5f7
- Raise exception when thread is dead
shantanu73 Mar 28, 2024
1c9aed4
Adding frequent bookmarks in case of multi-threaded parent streams.
shantanu73 Apr 3, 2024
fdd1019
Changes:
shantanu73 Apr 3, 2024
e8789a4
Fix memory limit for loan_repayments
Apr 4, 2024
527617b
Merge branch 'TDL-22683-date-windowing-pagination' of github.com:sing…
Apr 4, 2024
f559a63
Fix memory issue for child generators
Apr 4, 2024
ec82db9
Reduce max buffer limit of parent stream
Apr 4, 2024
9857841
Reducing max threads for Loan accounts generator.
Apr 5, 2024
1b8973d
Changes:
shantanu73 Apr 5, 2024
b8584a7
Merge branch 'TDL-22683-date-windowing-pagination' of github.com:sing…
shantanu73 Apr 5, 2024
6cd984a
Changed BEFORE value of modify_request_params to have 1 min overlap i…
shantanu73 Apr 5, 2024
ec3bc76
Add backoff for ChunkedEncodingError
RushiT0122 Apr 7, 2024
1dbdd6b
Update loan_accounts bookmarking strategy
RushiT0122 Apr 9, 2024
56eca6c
Interrupted sync enhancements
RushiT0122 Apr 10, 2024
8684a68
Minor fix
RushiT0122 Apr 10, 2024
f7175ef
- Update bookmarking to fix missing child records
RushiT0122 Apr 12, 2024
b1e9138
Minor fixes
RushiT0122 Apr 12, 2024
0efeb77
Remove wait for thread implementation
RushiT0122 Apr 14, 2024
ba83d27
- Empty buffer before next date window
RushiT0122 Apr 15, 2024
179ad28
Remove frequent bookmarking
RushiT0122 Apr 17, 2024
0aabdaf
Reduce integration test execution time
RushiT0122 May 2, 2024
77aa759
Fix integration tests
May 3, 2024
8cb2a43
Merge cobnflicts
May 3, 2024
5b691a9
Parallelise integration tests
May 3, 2024
cd3ae43
fix config.yml
May 3, 2024
a680071
fix config.yml
May 3, 2024
8eae933
Read window_ize from config
May 6, 2024
fa9d9e3
Minor window size fixes
May 6, 2024
c37fedf
- Fix bookmarking for FULL_TABLE multi-threaded streams
May 6, 2024
f101dff
- Add 30 days window size to reduce integration test execution time
May 6, 2024
993883a
Update window_size implementation
May 8, 2024
8a25625
Add window_size unittests
May 8, 2024
4fe5582
Update README file
May 8, 2024
9483813
Install parameterized madule for unittests
May 8, 2024
f82eac5
Fix review comments
May 8, 2024
da2f222
Bump version 4.2.0
shantanu73 May 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 86 additions & 20 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
version: 2
jobs:
build:
version: 2.1
orbs:
slack: circleci/[email protected]

executors:
docker-executor:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester

jobs:
build:
executor: docker-executor
steps:
- run: echo "CI Done"

ensure_env:
executor: docker-executor
steps:
- checkout
- run:
Expand All @@ -12,39 +24,93 @@ jobs:
source /usr/local/share/virtualenvs/tap-mambu/bin/activate
pip install -U 'pip<19.2' 'setuptools<51.0.0'
pip install .[dev]
- add_ssh_keys
- run:
name: 'JSON Validator'
command: |
source /usr/local/share/virtualenvs/tap-tester/bin/activate
stitch-validate-json tap_mambu/helpers/schemas/*.json
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox /usr/local/share/virtualenvs/dev_env.sh
- slack/notify-on-failure:
only_for_branches: master
- persist_to_workspace:
root: /usr/local/share/virtualenvs
paths:
- tap-mambu
- dev_env.sh

run_unit_tests:
executor: docker-executor
steps:
- checkout
- attach_workspace:
at: /usr/local/share/virtualenvs
- run:
when: always
name: 'Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-mambu/bin/activate
pip install parameterized
pytest tests/unittests
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov

run_integration_tests:
executor: docker-executor
parallelism: 5
steps:
- checkout
- attach_workspace:
at: /usr/local/share/virtualenvs
- run:
name: 'Integration Tests'
name: 'Run Integration Tests'
no_output_timeout: 30m
command: |
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
source dev_env.sh
source /usr/local/share/virtualenvs/dev_env.sh
mkdir /tmp/${CIRCLE_PROJECT_REPONAME}
export STITCH_CONFIG_DIR=/tmp/${CIRCLE_PROJECT_REPONAME}
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-test --tap=tap-mambu tests
circleci tests glob "tests/test_*.py" | circleci tests split > ./tests-to-run
if [ -s ./tests-to-run ]; then
for test_file in $(cat ./tests-to-run)
do
echo $test_file > $STITCH_CONFIG_DIR/tap_test.txt
run-test --tap=${CIRCLE_PROJECT_REPONAME} $test_file
done
fi
- slack/notify-on-failure:
only_for_branches: master
- store_artifacts:
path: /tmp/tap-mambu

workflows:
version: 2
commit:
commit: &commit_jobs
jobs:
- ensure_env:
context:
- circleci-user
- tier-1-tap-user
- run_unit_tests:
context:
- circleci-user
- tier-1-tap-user
requires:
- ensure_env
- run_integration_tests:
context:
- circleci-user
- tier-1-tap-user
requires:
- ensure_env
- build:
context: circleci-user
context:
- circleci-user
- tier-1-tap-user
requires:
- run_unit_tests
- run_integration_tests
build_daily:
<<: *commit_jobs
triggers:
- schedule:
cron: "0 0 * * *"
cron: "0 1 * * *"
filters:
branches:
only:
- master
jobs:
- build:
context: circleci-user
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,13 @@ This tap:
"lookback_window": 30,
"user_agent": "tap-mambu <api_user_email@your_company.com>",
"page_size": "500",
"apikey_audit": "AUDIT_TRAIL_APIKEY"
"apikey_audit": "AUDIT_TRAIL_APIKEY",
"window_size": 7
}
```


Note: The `window_size` parameter defaults to 1 day, which may cause slowdowns in historical sync for streams utilizing multi-threaded implementation. Conversely, using a larger `window_size` could lead to potential `out-of-memory` issues. It is advisable to select an optimal `window_size` based on the `start_date` and volume of data to mitigate these concerns.

Optionally, also create a `state.json` file. `currently_syncing` is an optional attribute used for identifying the last object to be synced in case the job is interrupted mid-stream. The next run would begin where the last job left off.

```json
Expand Down
3 changes: 2 additions & 1 deletion tap_mambu/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def main():
parsed_args.config['subdomain'],
parsed_args.config.get('apikey_audit'),
int(parsed_args.config.get('page_size', DEFAULT_PAGE_SIZE)),
user_agent=parsed_args.config['user_agent']) as client:
user_agent=parsed_args.config['user_agent'],
window_size=parsed_args.config.get('window_size')) as client:

state = {}
if parsed_args.state:
Expand Down
17 changes: 14 additions & 3 deletions tap_mambu/helpers/client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import backoff
import requests
import requests.adapters
from requests.exceptions import ConnectionError
from requests.exceptions import ConnectionError, ChunkedEncodingError
from singer import metrics, get_logger

from urllib3.exceptions import ProtocolError
from tap_mambu.helpers.constants import DEFAULT_DATE_WINDOW_SIZE

LOGGER = get_logger()
class ClientError(Exception):
Expand Down Expand Up @@ -118,13 +120,21 @@ def __init__(self,
subdomain,
apikey_audit,
page_size,
user_agent=''):
user_agent='',
window_size=DEFAULT_DATE_WINDOW_SIZE):
self.__username = username
self.__password = password
self.__subdomain = subdomain
base_url = "https://{}.mambu.com/api".format(subdomain)
self.base_url = base_url
self.page_size = page_size
try:
self.window_size = int(float(window_size)) if window_size else DEFAULT_DATE_WINDOW_SIZE

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we do integer typecasting directly instead of, converting to float first?

Copy link
Contributor

@RushiT0122 RushiT0122 May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

>>> int("10.0")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: invalid literal for int() with base 10: '10.0'

if self.window_size <= 0:
raise ValueError()
except ValueError:
raise Exception("The entered window size '{}' is invalid; it should be a valid non-zero integer.".format(window_size))

self.__user_agent = f'MambuTap-{user_agent}' if user_agent else 'MambuTap'
self.__apikey = apikey
self.__session = requests.Session()
Expand Down Expand Up @@ -178,7 +188,8 @@ def check_access(self):
return True

@backoff.on_exception(backoff.expo,
(MambuInternalServiceError, ConnectionError, MambuApiLimitError),
(MambuInternalServiceError, ConnectionError,
ChunkedEncodingError, MambuApiLimitError, ProtocolError),
max_tries=7,
factor=3)
def request(self, method, path=None, url=None, json=None, version=None, apikey_type=None, **kwargs):
Expand Down
1 change: 1 addition & 0 deletions tap_mambu/helpers/constants.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DEFAULT_PAGE_SIZE = 200
DEFAULT_DATE_WINDOW_SIZE = 1
3 changes: 3 additions & 0 deletions tap_mambu/helpers/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ class NoDeduplicationCapabilityException(Exception):

class NoDeduplicationKeyException(Exception):
pass

class MambuGeneratorThreadNotAlive(Exception):
pass
34 changes: 0 additions & 34 deletions tap_mambu/helpers/metrics_plotter.py

This file was deleted.

20 changes: 9 additions & 11 deletions tap_mambu/helpers/multithreaded_requests.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import singer
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List
from .perf_metrics import PerformanceMetrics


LOGGER = singer.get_logger()
Expand All @@ -24,16 +23,15 @@ def run(client, stream_name,
f'{endpoint_api_version}): {client.base_url}/{endpoint_path}?{endpoint_querystring}'
f' - body = {endpoint_body}')

with PerformanceMetrics(metric_name="generator"):
response = client.request(
method=endpoint_api_method,
path=endpoint_path,
version=endpoint_api_version,
apikey_type=endpoint_api_key_type,
params=endpoint_querystring,
endpoint=stream_name,
json=endpoint_body
)
response = client.request(
method=endpoint_api_method,
path=endpoint_path,
version=endpoint_api_version,
apikey_type=endpoint_api_key_type,
params=endpoint_querystring,
endpoint=stream_name,
json=endpoint_body
)

LOGGER.info(f'(generator) Stream {stream_name} - extracted records: {len(response)}')
return response
Expand Down
77 changes: 0 additions & 77 deletions tap_mambu/helpers/perf_metrics.py

This file was deleted.

Loading