
Release/32 #64

Merged: 52 commits, Feb 3, 2022
Commits:

ff52377  Nov 22, 2021  Added Refactored Tap Boilerplate Classes
b738db8  Nov 22, 2021  ECDDC-476
efb3ea0  Nov 23, 2021  Implemented some logic, need to rethink inheritance, especially initi…
33459da  Nov 23, 2021  Pushing more changes, will test after this
72e3caa  Nov 25, 2021  Intermediary commit
906b3cc  Nov 25, 2021  Added last_account_appraisal_date to loan_accounts.json
30cbdfe  Nov 26, 2021  Implemented bookmark resume and ABC for Generators, with Loan Account…
c451325  Nov 29, 2021  Commit before fixing Appraisal Order Bug
284f56a  Nov 29, 2021  Finished implementation of Loan Accounts Generator/Processor
0039dd6  Dec 6, 2021   Small Bugfixes
6d82085  Dec 6, 2021   Merge branch 'master' into release/32
2c8fca6  Dec 6, 2021   Merge branch 'release/32' into feature/ECDCC-476_Extract-loans
19800f3  Dec 6, 2021   Small fixes suggested by Alex via PR Comments
f8617c0  Dec 6, 2021   Solved __iter__ method and __next__ method duplicated code according …
f7c4d20  Dec 7, 2021   Added a comment so we don't forget about the deduplication_key in the…
21c8ef2  Dec 7, 2021   Merge branch 'feature/ECDCC-476_Extract-loans' into 'release/32'
6868cbd  Dec 7, 2021   Also added record_count
2a58d92  Dec 7, 2021   Merge branch 'feature/ECDCC-476_Extract-loans' into 'release/32'
7bc95df  Dec 16, 2021  adjusted the last_datetime var from sync_endpoint func to use the loo… (DownstreamDataTeam)
3eb4038  Dec 17, 2021  Merge branch 'feature/ECDCC-500_lookback_window_bugfix' into 'release…
062e19d  Dec 20, 2021  Revert "Merge branch 'feature/ECDCC-500_lookback_window_bugfix' into …
fa82719  Nov 22, 2021  Added Refactored Tap Boilerplate Classes
7b11dd8  Nov 22, 2021  ECDDC-476
952efdc  Nov 23, 2021  Implemented some logic, need to rethink inheritance, especially initi…
12d3628  Nov 23, 2021  Pushing more changes, will test after this
b7ee002  Nov 25, 2021  Intermediary commit
4d106f4  Nov 25, 2021  Added last_account_appraisal_date to loan_accounts.json
c4de88f  Nov 26, 2021  Implemented bookmark resume and ABC for Generators, with Loan Account…
139a7a7  Nov 29, 2021  Commit before fixing Appraisal Order Bug
8531705  Nov 29, 2021  Finished implementation of Loan Accounts Generator/Processor
3c39c6f  Dec 6, 2021   Small Bugfixes
80ab4f6  Dec 6, 2021   Small fixes suggested by Alex via PR Comments
c9afcbe  Dec 6, 2021   Solved __iter__ method and __next__ method duplicated code according …
0bbfd3a  Dec 7, 2021   Added a comment so we don't forget about the deduplication_key in the…
2a5e45b  Dec 7, 2021   Also added record_count
532e55b  Dec 16, 2021  adjusted the last_datetime var from sync_endpoint func to use the loo… (DownstreamDataTeam)
e7e3653  Dec 20, 2021  Revert "Merge branch 'feature/ECDCC-500_lookback_window_bugfix' into …
6f04d6d  Jan 3, 2022   Merge remote-tracking branch 'origin/release/32' into release/32 (DownstreamDataTeam)
b4dec65  Jan 5, 2022   Started the loan_repayments refactor (DownstreamDataTeam)
a9a4717  Jan 5, 2022   Finished the refactor (DownstreamDataTeam)
7762921  Jan 11, 2022  Fixed missing deduplication_key error (DownstreamDataTeam)
cd8d18a  Jan 14, 2022  Fixed up loan repayments refactor
96475ee  Jan 18, 2022  Added gitlab-ci config
20632f3  Jan 18, 2022  Revert "Added gitlab-ci config"
4d835f7  Jan 18, 2022  Merge branch 'feature/ECDDC-519_loan_repayments_refactor' into 'relea…
6a923a4  Feb 2, 2022   Bugfix for records ignored when filtering by modified_date on records…
a70b1e1  Feb 2, 2022   Merge branch 'bugfix/ECDDC-559-release-32-fix-loan-accounts-appraisal…
b50a8d4  Feb 3, 2022   Fixed processor to take bookmark into account correctly, fixed tap-te…
936a08a  Feb 3, 2022   Also added replication key to tests
e3f4a05  Feb 3, 2022   Adjusted bookmark tests to accept multiple replication keys
cba648e  Feb 3, 2022   Merge branch 'bugfix/ECDDC-559-release-32-fix-loan-accounts-appraisal…
1e78c8b  Feb 3, 2022   Merge branch 'master' into release/32 (DownstreamDataTeam)
tap_mambu/schema.py (2 changes: 1 addition & 1 deletion)

@@ -61,7 +61,7 @@
     'loan_accounts': {
         'key_properties': ['id'],
         'replication_method': 'INCREMENTAL',
-        'replication_keys': ['last_modified_date']
+        'replication_keys': ['last_modified_date', 'last_account_appraisal_date']
     },
     'loan_repayments': {
         'key_properties': ['encoded_key'],
tap_mambu/tap_mambu_refactor/TapProcessors/processor.py (34 changes: 27 additions & 7 deletions)

@@ -43,18 +43,38 @@ def process_streams_from_generators(self, generators):
                     self.generator_values.pop(iterator)
             if not self.generator_values:
                 break
-            # Find lowest value in the list
+            # Find lowest value in the list, and if two or more are equal, find max bookmark
             min_record_key = None
             min_record_value = None
+            min_record_bookmark = None
             for iterator in self.generator_values:
-                if min_record_value is None \
+                bookmark_field = convert(iterator.endpoint_config.get('bookmark_field', ''))
+                # Different record
+                if min_record_key is None \
                         or min_record_value > self.generator_values[iterator][self.deduplication_key]:
                     min_record_key = iterator
                     min_record_value = self.generator_values[iterator][self.deduplication_key]
+                    if not bookmark_field:
+                        continue
+                    min_record_bookmark = self.generator_values[iterator][bookmark_field]
+                # Same record
+                elif min_record_value == self.generator_values[iterator][self.deduplication_key]:
+                    if not bookmark_field:
+                        continue
+
+                    # Check the new bookmark against the min_record_key's bookmark
+                    min_record_dttm = strptime_to_utc(min_record_bookmark)
+                    record_dttm = strptime_to_utc(self.generator_values[iterator][bookmark_field])
+                    # If min_record has bookmark smaller than record, then replace min_record (we want highest bookmark)
+                    if min_record_dttm < record_dttm:
+                        min_record_key = iterator
+                        min_record_value = self.generator_values[iterator][self.deduplication_key]
+                        min_record_bookmark = self.generator_values[iterator][bookmark_field]

             # Process the record
             record = self.generator_values[min_record_key]
-            if self.process_record(record, min_record_key.time_extracted):
+            if self.process_record(record, min_record_key.time_extracted,
+                                   min_record_key.endpoint_config.get('bookmark_field', '')):
                 record_count += 1
                 record_count += self._process_child_records(record)

@@ -70,10 +90,10 @@ def process_streams_from_generators(self, generators):
     def _process_child_records(self, record):
         return 0

-    def __is_record_past_bookmark(self, transformed_record):
+    def __is_record_past_bookmark(self, transformed_record, bookmark_field):
         is_record_past_bookmark = False
         bookmark_type = self.generators[0].endpoint_config.get('bookmark_type')
-        bookmark_field = convert(self.generators[0].endpoint_config.get('bookmark_field', ''))
+        bookmark_field = convert(bookmark_field)

         # Reset max_bookmark_value to new value if higher
         if bookmark_field and (bookmark_field in transformed_record):

@@ -101,13 +121,13 @@ def __is_record_past_bookmark(self, transformed_record):

         return is_record_past_bookmark

-    def process_record(self, record, time_extracted):
+    def process_record(self, record, time_extracted, bookmark_field):
         with Transformer() as transformer:
             transformed_record = transformer.transform(record,
                                                        self.schema,
                                                        self.stream_metadata)

-        if self.__is_record_past_bookmark(transformed_record):
+        if self.__is_record_past_bookmark(transformed_record, bookmark_field):
             write_record(self.stream_name,
                          transformed_record,
                          time_extracted=time_extracted)
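The heart of this change is the tie-break in the merge loop: the processor still emits the record with the lowest deduplication key across all generators, but when two generators yield the same record it now keeps the copy carrying the most recent bookmark. A minimal standalone sketch of that rule, using plain dicts in place of the tap's generator machinery (the function name, sample records, and field values are illustrative, not taken from the tap):

from datetime import datetime

def pick_record(candidates, dedup_key, bookmark_field):
    # Pick the record with the lowest dedup_key; on a tie, prefer the copy
    # whose bookmark_field holds the most recent timestamp (mirrors the
    # min_record / max-bookmark logic in the diff above).
    chosen = None
    for record in candidates:
        if chosen is None or record[dedup_key] < chosen[dedup_key]:
            chosen = record
        elif (record[dedup_key] == chosen[dedup_key]
              and datetime.fromisoformat(record[bookmark_field])
                  > datetime.fromisoformat(chosen[bookmark_field])):
            chosen = record
    return chosen

candidates = [
    {"id": "acc-1", "last_modified_date": "2022-01-01T00:00:00"},
    {"id": "acc-1", "last_modified_date": "2022-02-01T00:00:00"},  # same record, newer bookmark
]
print(pick_record(candidates, "id", "last_modified_date"))
# {'id': 'acc-1', 'last_modified_date': '2022-02-01T00:00:00'}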
tests/base.py (2 changes: 1 addition & 1 deletion)

@@ -144,7 +144,7 @@ def expected_metadata(self):
             },
             self.REPLICATION_METHOD: "INCREMENTAL",
             self.REPLICATION_KEYS: {
-                "last_modified_date"
+                "last_modified_date", "last_account_appraisal_date"
             }
         },
         "loan_repayments": {
tests/test_bookmarks.py (23 changes: 16 additions & 7 deletions)

@@ -82,7 +82,7 @@ def test_run(self):
                 second_sync_messages = second_sync_records.get(stream, {}).get('messages', [])

                 if replication_method == self.INCREMENTAL:
-                    replication_key = self.expected_replication_keys().get(stream).pop()
+                    replication_keys = self.expected_replication_keys().get(stream)

                     first_sync_bookmark_value = first_sync_bookmarks['bookmarks'][stream]
                     second_sync_bookmark_value = second_sync_bookmarks['bookmarks'][stream]

@@ -92,29 +92,38 @@ def test_run(self):
                     self.assertEqual(first_sync_bookmark_value,
                                      second_sync_bookmark_value)

-                    # Verify that first sync records fall betwen the start date and the final
+                    # Verify that first sync records fall between the start date and the final
                     # bookmark value
                     for message in first_sync_messages:
                         lower_bound = strptime_to_utc(self.get_properties()['start_date'])
-                        actual_value = strptime_to_utc(message.get('data').get(replication_key))
                         upper_bound = strptime_to_utc(first_sync_bookmark_value)
+                        record = message.get('data')
+                        actual_values = [strptime_to_utc(record.get(replication_key))
+                                         for replication_key in replication_keys if replication_key in record]
+
+                        self.assertNotEqual(actual_values, [], msg="No replication key found in record")

                         self.assertTrue(
-                            lower_bound <= actual_value <= upper_bound,
+                            any([lower_bound <= actual_value <= upper_bound for actual_value in actual_values]),
                             msg="First sync records fall outside of expected sync window"
                         )

                     # Verify the second sync records fall between simulated bookmark value and the
                     # final bookmark value
                     for message in second_sync_messages:
                         lower_bound = strptime_to_utc(simulated_bookmark_value)
-                        actual_value = strptime_to_utc(message.get('data',{}).get(replication_key))
                         upper_bound = strptime_to_utc(second_sync_bookmark_value)
+                        record = message.get('data')
+                        actual_values = [strptime_to_utc(record.get(replication_key))
+                                         for replication_key in replication_keys if replication_key in record]
+
+                        self.assertNotEqual(actual_values, [], msg="No replication key found in record")

                         self.assertTrue(
-                            lower_bound <= actual_value <= upper_bound,
+                            any([lower_bound <= actual_value <= upper_bound for actual_value in actual_values]),
                            msg="Second sync records fall outside of expected sync window"
                         )

                     # Verify the number of records in the 2nd sync is less then the first
                     self.assertLess(second_sync_count, first_sync_count)

Review comment from @dmosorast (Contributor), Feb 3, 2022, on the actual_values change above:

This is sort of an interesting case, since I don't think we've ever used two fields for a replication key before. This use case is a sort of key1 OR key2 approach, rather than a key1 AND key2 approach (like would be the case for a composite primary key).

The replication key metadata in a SaaS tap is mostly informational, so that may not be an issue, but it's an interesting piece that I hadn't considered. What folks have done in the past where the max of two fields is the replication key would be to consolidate them into a single manufactured field (like row_updated or something) and bookmark based on that, but this approach seems more fine grained as far as state goes, so I'm leaning towards it being good as-is. I just wanted to raise that as a consideration on my end.

Tests passed locally, so I'm good to release this now.
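For contrast with the approach this PR takes, here is a hedged sketch of the "single manufactured field" alternative dmosorast describes: the row_updated name comes from his comment, but the helper itself is hypothetical and not part of this tap, which instead declares both date fields as replication keys and bookmarks on whichever is present.

from datetime import datetime, timezone

DATE_FIELDS = ("last_modified_date", "last_account_appraisal_date")

def add_row_updated(record):
    # Consolidate several date fields into one manufactured replication key,
    # the historical alternative mentioned in the review comment above.
    values = [
        datetime.fromisoformat(record[field]).astimezone(timezone.utc)
        for field in DATE_FIELDS
        if record.get(field)  # a record may carry only some of the fields
    ]
    record["row_updated"] = max(values).isoformat() if values else None
    return record

rec = add_row_updated({
    "id": "acc-1",
    "last_modified_date": "2022-01-05T10:00:00+00:00",
    "last_account_appraisal_date": "2022-02-01T08:30:00+00:00",
})
print(rec["row_updated"])  # -> 2022-02-01T08:30:00+00:00

The trade-off dmosorast points at: a manufactured field gives a single, simple bookmark, while the per-key approach in this PR keeps state more fine grained.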
tests/test_start_date.py (2 changes: 1 addition & 1 deletion)

@@ -37,7 +37,7 @@ def get_replication_key_values(self, stream, records):
         for record in records:
             # Build the primary key for this record, maintaining the same order for the fields
             record_rep_key = [record[field]
-                              for field in sorted(self.expected_replication_keys()[stream])]
+                              for field in sorted(self.expected_replication_keys()[stream]) if field in record]
             # Cast to a tuple to make it hashable
             all_records.append(record_rep_key[0])
         return all_records
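The guard added above matters because a record need not carry every declared replication key: for example, a loan account that has never been appraised may lack last_account_appraisal_date, and the unguarded record[field] lookup would raise a KeyError. A quick illustration with a hypothetical record:

# Hypothetical record: only one of the two declared replication keys present.
record = {"id": "acc-1", "last_modified_date": "2022-01-05T10:00:00.000000Z"}
replication_keys = sorted(["last_modified_date", "last_account_appraisal_date"])

guarded = [record[field] for field in replication_keys if field in record]
print(guarded)  # ['2022-01-05T10:00:00.000000Z']

# Without the guard the comprehension fails on the missing field:
# [record[field] for field in replication_keys]
# -> KeyError: 'last_account_appraisal_date'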