Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/32 #62

Merged
merged 45 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
ff52377
Added Refactored Tap Boilerplate Classes
Nov 22, 2021
b738db8
ECDDC-476
Nov 22, 2021
efb3ea0
Implemented some logic, need to rethink inheritance, especially initi…
Nov 23, 2021
33459da
Pushing more changes, will test after this
Nov 23, 2021
72e3caa
Intermediary commit
Nov 25, 2021
906b3cc
Added last_account_appraisal_date to loan_accounts.json
Nov 25, 2021
30cbdfe
Implemented bookmark resume and ABC for Generators, with Loan Account…
Nov 26, 2021
c451325
Commit before fixing Appraisal Order Bug
Nov 29, 2021
284f56a
Finished implementation of Loan Accounts Generator/Processor
Nov 29, 2021
0039dd6
Small Bugfixes
Dec 6, 2021
6d82085
Merge branch 'master' into release/32
Dec 6, 2021
2c8fca6
Merge branch 'release/32' into feature/ECDCC-476_Extract-loans
Dec 6, 2021
19800f3
Small fixes suggested by Alex via PR Comments
Dec 6, 2021
f8617c0
Solved __iter__ method and __next__ method duplicated code according …
Dec 6, 2021
f7c4d20
Added a comment so we don't forget about the deduplication_key in the…
Dec 7, 2021
21c8ef2
Merge branch 'feature/ECDCC-476_Extract-loans' into 'release/32'
Dec 7, 2021
6868cbd
Also added record_count
Dec 7, 2021
2a58d92
Merge branch 'feature/ECDCC-476_Extract-loans' into 'release/32'
Dec 7, 2021
7bc95df
adjusted the last_datetime var from sync_endpoint func to use the loo…
DownstreamDataTeam Dec 16, 2021
3eb4038
Merge branch 'feature/ECDCC-500_lookback_window_bugfix' into 'release…
Dec 17, 2021
062e19d
Revert "Merge branch 'feature/ECDCC-500_lookback_window_bugfix' into …
Dec 20, 2021
fa82719
Added Refactored Tap Boilerplate Classes
Nov 22, 2021
7b11dd8
ECDDC-476
Nov 22, 2021
952efdc
Implemented some logic, need to rethink inheritance, especially initi…
Nov 23, 2021
12d3628
Pushing more changes, will test after this
Nov 23, 2021
b7ee002
Intermediary commit
Nov 25, 2021
4d106f4
Added last_account_appraisal_date to loan_accounts.json
Nov 25, 2021
c4de88f
Implemented bookmark resume and ABC for Generators, with Loan Account…
Nov 26, 2021
139a7a7
Commit before fixing Appraisal Order Bug
Nov 29, 2021
8531705
Finished implementation of Loan Accounts Generator/Processor
Nov 29, 2021
3c39c6f
Small Bugfixes
Dec 6, 2021
80ab4f6
Small fixes suggested by Alex via PR Comments
Dec 6, 2021
c9afcbe
Solved __iter__ method and __next__ method duplicated code according …
Dec 6, 2021
0bbfd3a
Added a comment so we don't forget about the deduplication_key in the…
Dec 7, 2021
2a5e45b
Also added record_count
Dec 7, 2021
532e55b
adjusted the last_datetime var from sync_endpoint func to use the loo…
DownstreamDataTeam Dec 16, 2021
e7e3653
Revert "Merge branch 'feature/ECDCC-500_lookback_window_bugfix' into …
Dec 20, 2021
6f04d6d
Merge remote-tracking branch 'origin/release/32' into release/32
DownstreamDataTeam Jan 3, 2022
b4dec65
Started the loan_repayments refactor
DownstreamDataTeam Jan 5, 2022
a9a4717
Finished the refactor
DownstreamDataTeam Jan 5, 2022
7762921
Fixed missing deduplication_key error
DownstreamDataTeam Jan 11, 2022
cd8d18a
Fixed up loan repayments refactor
Jan 14, 2022
96475ee
Added gitlab-ci config
Jan 18, 2022
20632f3
Revert "Added gitlab-ci config"
Jan 18, 2022
4d835f7
Merge branch 'feature/ECDDC-519_loan_repayments_refactor' into 'relea…
Jan 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions tap_mambu/schemas/loan_accounts.json
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,13 @@
],
"format": "date-time"
},
"last_account_appraisal_date": {
"type": [
"null",
"string"
],
"format": "date-time"
},
"penalty_settings": {
"type": [
"null",
Expand Down
52 changes: 32 additions & 20 deletions tap_mambu/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import singer
from singer import Transformer, metadata, metrics, utils
from singer.utils import strftime, strptime_to_utc

from tap_mambu.tap_mambu_refactor import sync_endpoint_refactor
from tap_mambu.transform import transform_json, transform_activities

LOGGER = singer.get_logger()
Expand Down Expand Up @@ -899,26 +901,36 @@ def sync(client, config, catalog, state):
if sub_type_param:
endpoint_config['params']['type'] = sub_type

total_records = sync_endpoint(
client=client,
catalog=catalog,
state=state,
start_date=start_date,
stream_name=stream_name,
path=path,
endpoint_config=endpoint_config,
api_version=endpoint_config.get('api_version', 'v2'),
api_method=endpoint_config.get('api_method', 'GET'),
static_params=endpoint_config.get('params', {}),
sub_type=sub_type,
bookmark_query_field=endpoint_config.get('bookmark_query_field'),
bookmark_field=endpoint_config.get('bookmark_field'),
bookmark_type=endpoint_config.get('bookmark_type'),
data_key=endpoint_config.get('data_key', None),
body=endpoint_config.get('body', None),
id_fields=endpoint_config.get('id_fields'),
apikey_type=endpoint_config.get('apikey_type', None)
)
if stream_name in ["loan_accounts"]:
total_records = sync_endpoint_refactor(
client=client,
catalog=catalog,
state=state,
stream_name=stream_name,
sub_type=sub_type,
config=config
)
else:
total_records = sync_endpoint(
client=client,
catalog=catalog,
state=state,
start_date=start_date,
stream_name=stream_name,
path=path,
endpoint_config=endpoint_config,
api_version=endpoint_config.get('api_version', 'v2'),
api_method=endpoint_config.get('api_method', 'GET'),
static_params=endpoint_config.get('params', {}),
sub_type=sub_type,
bookmark_query_field=endpoint_config.get('bookmark_query_field'),
bookmark_field=endpoint_config.get('bookmark_field'),
bookmark_type=endpoint_config.get('bookmark_type'),
data_key=endpoint_config.get('data_key', None),
body=endpoint_config.get('body', None),
id_fields=endpoint_config.get('id_fields'),
apikey_type=endpoint_config.get('apikey_type', None)
)

update_currently_syncing(state, None)
LOGGER.info('Synced: {}, total_records: {}'.format(
Expand Down
128 changes: 128 additions & 0 deletions tap_mambu/tap_mambu_refactor/Helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import re
from singer import write_state, Transformer, metadata


def get_bookmark(state, stream, sub_type, default):
    """Return the bookmark stored in ``state`` for a stream, or ``default``.

    Args:
        state: Tap state mapping; may be ``None`` or lack a ``'bookmarks'`` key.
        stream: Name of the stream whose bookmark is requested.
        sub_type: ``'self'`` to read the stream-level bookmark, otherwise the
            key of a per-sub-type bookmark nested under the stream.
        default: Value returned when no matching bookmark is stored.

    Returns:
        The stored bookmark value, or ``default`` when it is absent.
    """
    if state is None or 'bookmarks' not in state:
        return default

    bookmarks = state.get('bookmarks', {})
    if sub_type != 'self':
        # Sub-type bookmarks live one level deeper, keyed by the sub-type.
        return bookmarks.get(stream, {}).get(sub_type, default)
    return bookmarks.get(stream, default)


def transform_datetime(this_dttm):
    """Run a datetime value through singer's ``Transformer``.

    Delegates to the transformer's private ``_transform_datetime`` helper and
    returns whatever it produces.
    NOTE(review): the exact output format is defined by singer, not here;
    callers slice the result, so it is presumably a string - confirm.
    """
    with Transformer() as dt_transformer:
        return dt_transformer._transform_datetime(this_dttm)


def write_bookmark(state, stream, sub_type, value):
    """Store a bookmark in ``state`` and emit the updated state.

    Args:
        state: Tap state mapping, mutated in place.
        stream: Name of the stream the bookmark belongs to.
        sub_type: ``'self'`` to store ``value`` directly under the stream,
            otherwise the key under which ``value`` is nested for the stream.
        value: Bookmark value to record.
    """
    bookmarks = state.setdefault('bookmarks', {})
    if sub_type == 'self':
        bookmarks[stream] = value
    else:
        # Make sure the stream has a container before nesting the sub-type.
        bookmarks.setdefault(stream, {})[sub_type] = value
    write_state(state)


# Convert camelCase to snake_case
def convert(name):
    """Return ``name`` converted from camelCase to lower snake_case."""
    # First pass: break before a capital that starts a lowercase-run word.
    with_word_breaks = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Second pass: break any remaining lowercase/digit -> capital boundary.
    with_all_breaks = re.sub('([a-z0-9])([A-Z])', r'\1_\2', with_word_breaks)
    return with_all_breaks.lower()


# Convert keys in json array
def convert_array(arr):
    """Return a new list with the keys of any nested dicts snake_cased.

    Nested lists are processed recursively, dicts are handed to
    ``convert_json``, and every other element is copied through unchanged.
    """
    def _convert_item(item):
        if isinstance(item, list):
            return convert_array(item)
        if isinstance(item, dict):
            return convert_json(item)
        return item

    return [_convert_item(item) for item in arr]


# Convert keys in json
def convert_json(this_json):
    """Return a copy of a dict with every key converted to snake_case.

    Dict values are converted recursively and list values go through
    ``convert_array``; all other values are carried over as they are.
    """
    snake_cased = {}
    for original_key in this_json:
        new_key = convert(original_key)
        value = this_json[original_key]
        if isinstance(value, dict):
            value = convert_json(value)
        elif isinstance(value, list):
            value = convert_array(value)
        snake_cased[new_key] = value
    return snake_cased


def remove_custom_nodes(this_json):
    """Recursively drop every dict entry whose key starts with ``'_'``.

    Lists are processed element by element, dicts are rebuilt without their
    underscore-prefixed keys, and any other value is returned untouched.
    """
    if isinstance(this_json, list):
        return [remove_custom_nodes(item) for item in this_json]
    if isinstance(this_json, dict):
        return {
            key: remove_custom_nodes(value)
            for key, value in this_json.items()
            if key[:1] != '_'
        }
    return this_json


def add_cust_field(key, record, cust_field_sets):
    """Append one generic custom-field entry per item of ``record``.

    Args:
        key: Identifier stored as ``field_set_id`` on every appended entry.
        record: Mapping of custom field id to custom field value.
        cust_field_sets: List that receives the new entries (mutated in place).
    """
    cust_field_sets.extend(
        {'field_set_id': key, 'id': field_id, 'value': field_value}
        for field_id, field_value in record.items()
    )

# Convert custom fields and sets
# Generalize/Abstract custom fields to key/value pairs
def convert_custom_fields(this_json):
for record in this_json:
cust_field_sets = []
for key, value in record.items():
if key.startswith('_'):
if isinstance(value, dict):
add_cust_field(key, value, cust_field_sets)
elif isinstance(value, list):
for element in value:
add_cust_field(key, element, cust_field_sets)
record['custom_fields'] = cust_field_sets
return this_json


# Run all transforms: denests _embedded, removes _embedded/_links, and
# converts camelCase to snake_case for fieldname keys.
def transform_json(this_json, path):
    """Apply all record transforms and return the transformed payload.

    The records first get a flattened ``custom_fields`` list
    (``convert_custom_fields``), then lose every underscore-prefixed key
    (``remove_custom_nodes``), and finally have their keys snake_cased
    (``convert_json``).

    Args:
        this_json: Iterable of record dicts; mutated in place by the
            custom-field step.
        path: Name used as the temporary wrapper key for the payload.

    Returns:
        The transformed payload.
    """
    cleaned = remove_custom_nodes(convert_custom_fields(this_json))
    # Wrapping the payload in a dict lets convert_json dispatch on its type.
    transformed_json = convert_json({path: cleaned})
    # convert_json also snake_cases the wrapper key, so the lookup has to use
    # the converted form; indexing with the raw path raised KeyError for any
    # camelCase path.
    return transformed_json[convert(path)]


def transform_activities(this_json):
for record in this_json:
for key, value in record['activity'].items():
record[key] = value
del record['activity']
return this_json


# Review catalog and make a list of selected streams
def get_selected_streams(catalog):
    """Return the ``tap_stream_id`` of every stream selected in the catalog.

    A stream counts as selected when its root metadata entry (the ``()``
    breadcrumb) has ``selected`` set to exactly ``True``. The ids are
    de-duplicated through a set, so the order of the returned list is not
    defined.
    """
    def _is_selected(stream):
        root_metadata = metadata.to_map(stream.metadata).get(())
        return bool(root_metadata) and root_metadata.get('selected') is True

    return list({
        stream.tap_stream_id
        for stream in catalog.streams
        if _is_selected(stream)
    })
Empty file.
107 changes: 107 additions & 0 deletions tap_mambu/tap_mambu_refactor/TapGenerators/generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from abc import ABC
from typing import List
from singer import utils

from ..Helpers import write_bookmark, transform_json, get_bookmark


class TapGenerator(ABC):
    """Iterator that pages through one endpoint and yields transformed records.

    Construction runs the ``_init_*`` hooks in a fixed order (config, endpoint
    config, buffers, bookmarks, params) so subclasses can override individual
    steps. Iterating fetches one page at a time through
    ``prepare_batch`` -> ``fetch_batch`` -> ``transform_batch`` and hands the
    records out of ``self.buffer`` one by one.
    """

    def __init__(self, stream_name, client, config, state, sub_type, endpoint_config=None):
        """Store the collaborators and run the initialisation hooks.

        Args:
            stream_name: Name of the stream; also passed as ``endpoint`` to
                the client and used as the bookmark key.
            client: Object providing ``page_size`` and ``request(...)``.
            config: Tap configuration mapping (``start_date`` is read from it).
            state: Tap state mapping used for bookmarks.
            sub_type: Bookmark sub-type (``'self'`` for a stream-level bookmark).
            endpoint_config: Optional endpoint settings; defaults to ``{}``.
        """
        self.stream_name = stream_name
        self.client = client
        self.config = config
        self.state = state
        self.sub_type = sub_type
        self.endpoint_config = endpoint_config
        # Defined before the hooks run so that calling next() before iter()
        # ends the iteration cleanly instead of raising AttributeError.
        self.last_batch_size = 0
        self._init_config()
        self._init_endpoint_config()
        self._init_buffers()
        self._init_bookmarks()
        self._init_params()

    def _init_config(self):
        """Read the settings this generator needs from the tap config."""
        self.start_date = self.config.get('start_date')

    def _init_endpoint_config(self):
        """Fall back to an empty endpoint config when none was supplied."""
        if self.endpoint_config is None:
            self.endpoint_config = {}

    def _init_buffers(self):
        """Create the buffer holding records of the current page."""
        self.buffer: List = list()

    def _init_bookmarks(self):
        """Load bookmark settings and the last stored bookmark value.

        Integer bookmarks default to ``0``; any other bookmark type defaults
        to the configured ``start_date``.
        """
        self.bookmark_query_field = self.endpoint_config.get('bookmark_query_field')
        self.bookmark_type = self.endpoint_config.get('bookmark_type')
        self.bookmark_field = self.endpoint_config.get('bookmark_field')
        if self.bookmark_type == "integer":
            self.last_bookmark_value = get_bookmark(self.state, self.stream_name, self.sub_type, 0)
        else:
            self.last_bookmark_value = get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)
        self.max_bookmark_value = self.last_bookmark_value

    def _init_params(self):
        """Set up paging state and the request parameters."""
        self.time_extracted = None
        self.static_params = self.endpoint_config.get('params', {})
        self.offset = 0
        self.limit = self.client.page_size
        # Copy rather than alias: fetch_batch writes the bookmark query field
        # into self.params, which must not leak into the endpoint config.
        self.params = dict(self.static_params)

    def __all_fetch_batch_steps(self):
        """Fetch and transform one page, refilling ``self.buffer``."""
        self.prepare_batch()
        raw_batch = self.fetch_batch()
        self.buffer = self.transform_batch(raw_batch)
        self.last_batch_size = len(self.buffer)

    def __iter__(self):
        """Fetch the first page (at the current offset) and return ``self``."""
        self.__all_fetch_batch_steps()
        return self

    def __next__(self):
        """Return the next record, fetching the following page when needed.

        Raises:
            StopIteration: When the last page was shorter than ``limit`` or a
                newly fetched page is empty.
        """
        if not self.buffer:
            # A short page means the previous request reached the end.
            if self.last_batch_size < self.limit:
                raise StopIteration()
            self.offset += self.limit
            # No bookmark is written between pages: per review, the results
            # are sorted by id while the bookmark is a datetime, so a mid-run
            # bookmark would not be a safe resume point.
            self.__all_fetch_batch_steps()
            if not self.buffer:
                raise StopIteration()
        return self.buffer.pop(0)

    def write_bookmark(self):
        """Persist ``max_bookmark_value`` if this endpoint has a bookmark field."""
        if self.bookmark_field:
            write_bookmark(self.state,
                           self.stream_name,
                           self.sub_type,
                           self.max_bookmark_value)

    def prepare_batch(self):
        """Rebuild the request parameters for the page at ``self.offset``.

        Static endpoint parameters are applied last, so they override
        ``offset``/``limit`` if they define the same keys.
        """
        self.params = {
            "offset": self.offset,
            "limit": self.limit,
            **self.static_params
        }

    def fetch_batch(self):
        """Request one page from the client and return it as a list.

        The query string is built by joining ``key=value`` pairs as-is (no
        URL-encoding is applied here). A dict response is wrapped in a
        single-element list; any other response is returned unchanged.
        """
        if self.bookmark_query_field:
            self.params[self.bookmark_query_field] = self.last_bookmark_value
        response = self.client.request(
            method=self.endpoint_config.get('api_method', 'GET'),
            path=self.endpoint_config.get('path'),
            version=self.endpoint_config.get('api_version', 'v2'),
            apikey_type=self.endpoint_config.get('apikey_type', None),
            params='&'.join([f'{key}={value}' for (key, value) in self.params.items()]),
            endpoint=self.stream_name,
            json=self.endpoint_config.get('body', None))
        self.time_extracted = utils.now()
        if isinstance(response, dict):
            return [response]
        return response

    def transform_batch(self, batch):
        """Run ``transform_json`` over a raw page and return the records.

        Without a ``data_key`` the whole batch is transformed under the
        stream name. With a ``data_key`` the batch is transformed only when
        the key is contained in it; otherwise an empty list is returned.
        NOTE(review): ``fetch_batch`` wraps dict responses in a list, so
        ``data_key in batch`` is then a list-membership test - confirm this
        branch behaves as intended before relying on ``data_key``.
        """
        data_key = self.endpoint_config.get('data_key', None)
        transformed_batch = list()
        if data_key is None:
            transformed_batch = transform_json(batch, self.stream_name)
        elif data_key in batch:
            transformed_batch = transform_json(batch, data_key)[data_key]
        return transformed_batch
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import abc

from .generator import TapGenerator
from ..Helpers import transform_datetime, get_bookmark


class LoanAccountsGenerator(TapGenerator):
    """Base generator for the loan accounts search endpoint.

    Builds the shared endpoint configuration; the date field used for
    filtering and bookmarking is left blank here and is expected to be filled
    in by subclasses.
    """

    @abc.abstractmethod
    def _init_endpoint_config(self):
        """Build the ``loans:search`` endpoint configuration.

        Declared abstract even though it has a body: concrete subclasses are
        meant to override it and reach this implementation via ``super()``.
        """
        stored_bookmark = get_bookmark(self.state, 'loan_accounts', 'self', self.start_date)
        # Only the first ten characters of the transformed bookmark are used
        # (presumably the date part - confirm against transform_datetime).
        filter_value = transform_datetime(stored_bookmark)[:10]

        search_body = {
            "sortingCriteria": {
                "field": "id",
                "order": "ASC"
            },
            "filterCriteria": [
                {
                    "field": "",
                    "operator": "AFTER",
                    "value": filter_value
                }
            ]
        }
        loan_repayments_child = {
            'path': 'loans/{}/repayments',
            'api_version': 'v1',
            'api_method': 'GET',
            'params': {
                'detailsLevel': 'FULL',
                'paginationDetails': 'ON'
            },
            'id_fields': ['encoded_key'],
            'parent': 'loan_accounts'
        }
        self.endpoint_config = {
            'path': 'loans:search',
            'api_version': 'v2',
            'api_method': 'POST',
            'params': {
                'detailsLevel': 'FULL',
                'paginationDetails': 'ON'
            },
            'body': search_body,
            'bookmark_field': '',
            'bookmark_type': 'datetime',
            'id_fields': ['id'],
            'children': {
                'loan_repayments': loan_repayments_child
            }
        }

class LoanAccountsLMGenerator(LoanAccountsGenerator):
    """Loan accounts generator filtered and bookmarked on ``lastModifiedDate``."""

    def _init_endpoint_config(self):
        """Build the base config, then point filter and bookmark at the field."""
        super()._init_endpoint_config()
        date_field = "lastModifiedDate"
        first_filter = self.endpoint_config["body"]["filterCriteria"][0]
        first_filter["field"] = date_field
        self.endpoint_config["bookmark_field"] = date_field


class LoanAccountsADGenerator(LoanAccountsGenerator):
    """Loan accounts generator filtered and bookmarked on ``lastAccountAppraisalDate``."""

    def _init_endpoint_config(self):
        """Build the base config, then point filter and bookmark at the field."""
        super()._init_endpoint_config()
        date_field = "lastAccountAppraisalDate"
        first_filter = self.endpoint_config["body"]["filterCriteria"][0]
        first_filter["field"] = date_field
        self.endpoint_config["bookmark_field"] = date_field
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .generator import TapGenerator


class LoanRepaymentsGenerator(TapGenerator):
    """Placeholder subclass of ``TapGenerator`` that adds no behaviour of its own."""
    pass
Empty file.
Loading