-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Multithreading improvement #84
Conversation
# Conflicts: # tap_mambu/sync.py # tap_mambu/tap_mambu_refactor/main.py
…into 'release/40' [ECDDC-592] Refactored credit_arrangements stream, implemented unit tests See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!68
Refactored 7 more streams (singer-io#72) See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!71
…ion logic to a helper module Refactored unit tests to use the new Generator/Processor selection logic, and also to reduce code duplication
…ease/40' [ECDDC-653] Refactored main.py and unit tests See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!72
…_breakdown_stream # Conflicts: # tap_mambu/tap_mambu_refactor/main.py
…to 'release/40' [ECDDC-646] New Sonar config version See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!65
[ECDDC-591] Fixed issue with users deduplication key See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!78
…' into 'release/40' [ECDDC-603] Added interest accrual breakdown stream See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!74
…se/40' [ECDDC-649] Added task_link_key field to tasks stream See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!75
[ECDDC-652] Adjusted Snyk dev test See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!67
…ease/40' [ECDDC-653] Finished implementing catalog automatic fields checker test See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!77
…-extraction' into 'release/45' [ECDDC-695] Implement multithreaded child streams extraction See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!118
# Conflicts: # tap_mambu/helpers/client.py # tap_mambu/sync.py # tap_mambu/tap_processors/processor.py
[ECDDC-726] Merge master into release 45 See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!119
…se/45' [ECDDC-727] Added unit tests for multithreading See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!115
…nto 'release/45' [ECDDC-729] Refactored offset and bookmark multithreaded generators See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!120
… 'release/45' [ECDDC-716] Deposit accounts missing records See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!123
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some changes requested and comments. I think the biggest blocker here is the usage of set
for deduping (for performance and consistency reasons).
|
||
|
||
class MultithreadedRequestsPool: | ||
_dispatcher = ThreadPoolExecutor(max_workers=100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have any metrics on the CPU usage of this? It does look like it will only create threads if it needs, and we think that most of them will just be waiting on network I/O, so CPU usage shouldn't be too high, but it'd be good to see some performance metrics here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Anecdotally, it's using quite a bit while running the tap-tester tests, so I'm a bit concerned about load in a multi-tenant environment. Would be nice to make the degree of parallelism configurable as a max through config or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be an idea, yes. Maybe we should talk more about this once I finish up fixing stuff after this review
… to one using a custom dict class "HashableDict" which implements a hash and unique keys for each dict (generated from the json dump data)
[ECDDC-726] Resolve issues in the PR See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!131
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few more changes for performance requested, we can talk about this more in depth if you'd like.
|
||
def __eq__(self, other): | ||
if isinstance(other, HashableDict): | ||
return self.__key() == other.__key() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably use the hash instead of the key because equality is a required property of __hash__
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I'm not sure what you mean by equality being a required property of hash.
Also, it feels like changing equality from key to hash(key) could result in collisions for records that are not the same but result in the same hash.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's my mistake. I misread the statement in that Python documentation (and it seems like I need to review my Data Structures textbook 😅). At first read I interpreted this to also mean that hashes that are equal map to objects that are equal.
The only required property is that objects which compare equal have the same hash value
@@ -130,11 +133,11 @@ def error_check_and_fix(self, final_buffer: set, temp_buffer: set, futures: list | |||
final_buffer = self.check_and_get_set_reunion(final_buffer, temp_buffer, self.artificial_limit) | |||
except RuntimeError: # if errors are found | |||
LOGGER.exception("Discrepancies found in extracted data, and errors couldn't be corrected." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be exception level? I'd be concerned that a data set with a lot of activity on it during the sync could cause quite a bit of noise with all of the call stacks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed in Slack, this can be included in the next PR.
… compute a hash for
[ECDDC-726] Replaced all json.dumps with tuple conversions, as they are faster to compute a hash for See merge request mambucom/product/ecosystem/mambu-marketplace/connectors/singer/tap-mambu!132
Description of change
page_size
default value from500
to200
page_size
andmax_threads
for some streams in order to improve performance and avoid causing extreme loadsRollback steps