Skip to content

Commit

Permalink
Merge branch 'master' into 'local-master'
Browse files Browse the repository at this point in the history
Pull changes from Stitch

See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!32
  • Loading branch information
Radu Marinoiu committed Feb 14, 2022
2 parents 14cbfff + 03f420a commit 986619a
Show file tree
Hide file tree
Showing 18 changed files with 567 additions and 31 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog

## 2.3.3
* Enable sorting of `gl_journal_entries` response to prevent missing records due to pagination [#66](https://github.com/singer-io/tap-mambu/pull/66)

## 2.3.2
* Fix issue with handling the `account_appraisal_date` for `loan_accounts` stream. [#64](https://github.com/singer-io/tap-mambu/pull/64)

## 2.3.1
* Adjust bookmarking of `loan_accounts` to use `modified_date` *or* `account_appraisal_date` [#62](https://github.com/singer-io/tap-mambu/pull/62)
* Stream `loan_accounts` and child stream `loan_repayments` refactored into new pattern

## 2.3.0
* Added `original_account_key` field to the `loan_accounts` stream [#60](https://github.com/singer-io/tap-mambu/pull/60)
* Fix audit trail duplicated records [#59](https://github.com/singer-io/tap-mambu/pull/59)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup, find_packages

setup(name='tap-mambu',
version='2.3.0',
version='2.3.3',
description='Singer.io tap for extracting data from the Mambu 2.0 API',
author='[email protected]',
classifiers=['Programming Language :: Python :: 3 :: Only'],
Expand Down
2 changes: 1 addition & 1 deletion tap_mambu/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
'loan_accounts': {
'key_properties': ['id'],
'replication_method': 'INCREMENTAL',
'replication_keys': ['last_modified_date']
'replication_keys': ['last_modified_date', 'last_account_appraisal_date']
},
'loan_repayments': {
'key_properties': ['encoded_key'],
Expand Down
7 changes: 7 additions & 0 deletions tap_mambu/schemas/loan_accounts.json
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,13 @@
],
"format": "date-time"
},
"last_account_appraisal_date": {
"type": [
"null",
"string"
],
"format": "date-time"
},
"penalty_settings": {
"type": [
"null",
Expand Down
56 changes: 36 additions & 20 deletions tap_mambu/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import singer
from singer import Transformer, metadata, metrics, utils
from singer.utils import strftime, strptime_to_utc

from tap_mambu.tap_mambu_refactor import sync_endpoint_refactor
from tap_mambu.transform import transform_json, transform_activities

LOGGER = singer.get_logger()
Expand Down Expand Up @@ -742,6 +744,10 @@ def sync(client, config, catalog, state):
'api_version': 'v1',
'api_method': 'POST',
'body': {
"sortDetails": {
"sortingColumn": "ENTRY_ID",
"sortingOrder": "ASCENDING"
},
"filterConstraints": [
{
"filterSelection": "CREATION_DATE",
Expand Down Expand Up @@ -899,26 +905,36 @@ def sync(client, config, catalog, state):
if sub_type_param:
endpoint_config['params']['type'] = sub_type

total_records = sync_endpoint(
client=client,
catalog=catalog,
state=state,
start_date=start_date,
stream_name=stream_name,
path=path,
endpoint_config=endpoint_config,
api_version=endpoint_config.get('api_version', 'v2'),
api_method=endpoint_config.get('api_method', 'GET'),
static_params=endpoint_config.get('params', {}),
sub_type=sub_type,
bookmark_query_field=endpoint_config.get('bookmark_query_field'),
bookmark_field=endpoint_config.get('bookmark_field'),
bookmark_type=endpoint_config.get('bookmark_type'),
data_key=endpoint_config.get('data_key', None),
body=endpoint_config.get('body', None),
id_fields=endpoint_config.get('id_fields'),
apikey_type=endpoint_config.get('apikey_type', None)
)
if stream_name in ["loan_accounts"]:
total_records = sync_endpoint_refactor(
client=client,
catalog=catalog,
state=state,
stream_name=stream_name,
sub_type=sub_type,
config=config
)
else:
total_records = sync_endpoint(
client=client,
catalog=catalog,
state=state,
start_date=start_date,
stream_name=stream_name,
path=path,
endpoint_config=endpoint_config,
api_version=endpoint_config.get('api_version', 'v2'),
api_method=endpoint_config.get('api_method', 'GET'),
static_params=endpoint_config.get('params', {}),
sub_type=sub_type,
bookmark_query_field=endpoint_config.get('bookmark_query_field'),
bookmark_field=endpoint_config.get('bookmark_field'),
bookmark_type=endpoint_config.get('bookmark_type'),
data_key=endpoint_config.get('data_key', None),
body=endpoint_config.get('body', None),
id_fields=endpoint_config.get('id_fields'),
apikey_type=endpoint_config.get('apikey_type', None)
)

update_currently_syncing(state, None)
LOGGER.info('Synced: {}, total_records: {}'.format(
Expand Down
128 changes: 128 additions & 0 deletions tap_mambu/tap_mambu_refactor/Helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import re
from singer import write_state, Transformer, metadata


def get_bookmark(state, stream, sub_type, default):
    """Return the saved bookmark for *stream* (and *sub_type*), or *default*.

    A sub_type of 'self' means the bookmark is stored directly under the
    stream key; any other sub_type is nested one level deeper.
    """
    if state is None or 'bookmarks' not in state:
        return default

    stream_bookmarks = state.get('bookmarks', {})
    if sub_type == 'self':
        return stream_bookmarks.get(stream, default)
    return stream_bookmarks.get(stream, {}).get(sub_type, default)


def transform_datetime(this_dttm):
    """Normalise a datetime string by delegating to singer's Transformer."""
    with Transformer() as transformer:
        return transformer._transform_datetime(this_dttm)


def write_bookmark(state, stream, sub_type, value):
    """Store *value* as the bookmark for *stream* and persist the state.

    A sub_type of 'self' stores the value directly under the stream key;
    any other sub_type stores it one level deeper under that sub_type.
    Mutates *state* in place and emits it via singer's write_state.
    """
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    if stream not in state['bookmarks']:
        state['bookmarks'][stream] = {}
    if sub_type == 'self':
        state['bookmarks'][stream] = value
    else:
        # Assign unconditionally; the original pre-created the sub_type key
        # with {} first, which was dead code since it was always overwritten.
        state['bookmarks'][stream][sub_type] = value
    write_state(state)


# camelCase -> snake_case for a single field name.
def convert(name):
    partially_split = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', partially_split).lower()


# Recursively snake_case every dict key found inside a JSON array.
def convert_array(arr):
    return [convert_array(item) if isinstance(item, list)
            else convert_json(item) if isinstance(item, dict)
            else item
            for item in arr]


# Return a copy of a JSON object with all keys converted to snake_case,
# descending into nested objects and arrays.
def convert_json(this_json):
    converted = {}
    for key, value in this_json.items():
        new_key = convert(key)
        if isinstance(value, dict):
            converted[new_key] = convert_json(value)
        elif isinstance(value, list):
            converted[new_key] = convert_array(value)
        else:
            converted[new_key] = value
    return converted


def remove_custom_nodes(this_json):
    """Recursively drop every dict key whose name begins with '_'.

    Lists are walked element by element; scalars pass through unchanged.
    """
    if isinstance(this_json, dict):
        return {key: remove_custom_nodes(value)
                for key, value in this_json.items()
                if key[:1] != '_'}
    if isinstance(this_json, list):
        return [remove_custom_nodes(element) for element in this_json]
    return this_json


def add_cust_field(key, record, cust_field_sets):
    """Flatten one custom-field dict into field_set_id/id/value entries."""
    for field_id, field_value in record.items():
        cust_field_sets.append({
            'field_set_id': key,
            'id': field_id,
            'value': field_value,
        })


# Generalize/abstract custom fields ('_'-prefixed keys) into a uniform list
# of key/value pairs stored under each record's 'custom_fields' key.
def convert_custom_fields(this_json):
    for record in this_json:
        flattened = []
        for key, value in record.items():
            if key[:1] != '_':
                continue
            if isinstance(value, dict):
                add_cust_field(key, value, flattened)
            elif isinstance(value, list):
                for element in value:
                    add_cust_field(key, element, flattened)
        record['custom_fields'] = flattened
    return this_json


# Run all transforms: denests _embedded, removes _embedded/_links, and
# converst camelCase to snake_case for fieldname keys.
def transform_json(this_json, path):
new_json = remove_custom_nodes(convert_custom_fields(this_json))
out = {}
out[path] = new_json
transformed_json = convert_json(out)
return transformed_json[path]


def transform_activities(this_json):
    """Flatten each record's nested 'activity' dict onto the record itself.

    Copies the activity's items up, then removes the 'activity' key.
    Mutates and returns the same list.
    """
    for record in this_json:
        record.update(record['activity'])
        del record['activity']
    return this_json


# Review the catalog and return the tap_stream_ids whose root metadata
# (the empty-tuple breadcrumb) carries selected=True.
def get_selected_streams(catalog):
    def _is_selected(stream):
        root = metadata.to_map(stream.metadata).get(())
        return bool(root) and root.get('selected') is True

    return list({stream.tap_stream_id
                 for stream in catalog.streams
                 if _is_selected(stream)})
Empty file.
107 changes: 107 additions & 0 deletions tap_mambu/tap_mambu_refactor/TapGenerators/generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from abc import ABC
from typing import List
from singer import utils

from ..Helpers import write_bookmark, transform_json, get_bookmark


class TapGenerator(ABC):
    """Base paginated-record generator for Mambu API streams.

    Iterating an instance yields one transformed record at a time, lazily
    fetching pages of ``client.page_size`` records via offset/limit
    pagination.  Behaviour is driven by ``endpoint_config`` (path, HTTP
    method, params, body, bookmark fields, data_key, ...).
    """

    def __init__(self, stream_name, client, config, state, sub_type, endpoint_config=None):
        # stream_name: Singer stream id; client: Mambu API client;
        # config: tap config dict (start_date is read from it);
        # state: mutable Singer state dict; sub_type: stream sub-type
        # ('self' selects the flat bookmark layout);
        # endpoint_config: per-endpoint overrides, defaults to {}.
        self.stream_name = stream_name
        self.client = client
        self.config = config
        self.state = state
        self.sub_type = sub_type
        self.endpoint_config = endpoint_config
        self._init_config()
        self._init_endpoint_config()
        self._init_buffers()
        self._init_bookmarks()
        self._init_params()

    def _init_config(self):
        # Fallback bookmark value for datetime-typed streams.
        self.start_date = self.config.get('start_date')

    def _init_endpoint_config(self):
        # Normalise a missing endpoint_config to an empty dict so the
        # .get(...) lookups below are always safe.
        if self.endpoint_config is None:
            self.endpoint_config = {}

    def _init_buffers(self):
        # In-memory buffer holding the current (already transformed) page.
        self.buffer: List = list()

    def _init_bookmarks(self):
        self.bookmark_query_field = self.endpoint_config.get('bookmark_query_field')
        self.bookmark_type = self.endpoint_config.get('bookmark_type')
        self.bookmark_field = self.endpoint_config.get('bookmark_field')
        # Integer bookmarks default to 0; anything else defaults to the
        # configured start_date (treated as a datetime string).
        if self.bookmark_type == "integer":
            self.last_bookmark_value = get_bookmark(self.state, self.stream_name, self.sub_type, 0)
        else:
            self.last_bookmark_value = get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)
        # NOTE(review): max_bookmark_value is initialised here but never
        # advanced anywhere in this class — presumably subclasses or callers
        # update it before write_bookmark() runs; confirm.
        self.max_bookmark_value = self.last_bookmark_value

    def _init_params(self):
        self.time_extracted = None
        self.static_params = self.endpoint_config.get('params', {})
        self.offset = 0
        # Page size doubles as the "was that the last page?" threshold.
        self.limit = self.client.page_size
        self.params = self.static_params

    def __all_fetch_batch_steps(self):
        # Fetch and transform one page into self.buffer; last_batch_size is
        # what __next__ uses to decide whether another page may exist.
        self.prepare_batch()
        raw_batch = self.fetch_batch()
        self.buffer = self.transform_batch(raw_batch)
        self.last_batch_size = len(self.buffer)

    def __iter__(self):
        # Prime the first page so __next__ always has last_batch_size set.
        self.__all_fetch_batch_steps()
        return self

    def __next__(self):
        if not self.buffer:
            # A short page means the previous fetch exhausted the endpoint.
            if self.last_batch_size < self.limit:
                raise StopIteration()
            self.offset += self.limit
            # self.write_bookmark()
            self.__all_fetch_batch_steps()
            if not self.buffer:
                raise StopIteration()
        return self.buffer.pop(0)

    def write_bookmark(self):
        # Persist the running maximum bookmark; no-op for unbookmarked streams.
        if self.bookmark_field:
            write_bookmark(self.state,
                           self.stream_name,
                           self.sub_type,
                           self.max_bookmark_value)

    def prepare_batch(self):
        # Rebuild query params with the current pagination window; static
        # params are re-applied last so they always win.
        self.params = {
            "offset": self.offset,
            "limit": self.limit,
            **self.static_params
        }

    def fetch_batch(self):
        # Filter server-side by the last bookmark when the endpoint supports it.
        if self.bookmark_query_field:
            self.params[self.bookmark_query_field] = self.last_bookmark_value
        response = self.client.request(
            method=self.endpoint_config.get('api_method', 'GET'),
            path=self.endpoint_config.get('path'),
            version=self.endpoint_config.get('api_version', 'v2'),
            apikey_type=self.endpoint_config.get('apikey_type', None),
            params='&'.join([f'{key}={value}' for (key, value) in self.params.items()]),
            endpoint=self.stream_name,
            json=self.endpoint_config.get('body', None))
        self.time_extracted = utils.now()
        # Single-object responses are wrapped so callers always see a list.
        if isinstance(response, dict):
            return [response]
        return response

    def transform_batch(self, batch):
        data_key = self.endpoint_config.get('data_key', None)
        transformed_batch = list()
        if data_key is None:
            transformed_batch = transform_json(batch, self.stream_name)
        elif data_key in batch:
            transformed_batch = transform_json(batch, data_key)[data_key]
        # NOTE(review): when data_key is set but absent from the batch this
        # silently yields an empty page, which also ends iteration — confirm
        # that is the intended behaviour.
        return transformed_batch
Loading

0 comments on commit 986619a

Please sign in to comment.