Commit: Add python multiprocessing draft

Timoeller committed Dec 4, 2020
1 parent a6171ec commit 270996c
Showing 1 changed file with 52 additions and 4 deletions.
56 changes: 52 additions & 4 deletions farm/data_handler/processor.py
@@ -51,6 +51,10 @@
from farm.utils import MLFlowLogger as MlLogger
from farm.utils import try_get


import os
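# HuggingFace tokenizers use internal (Rust) parallelism that can warn about or deadlock on
# forked worker processes, so it is disabled globally before any multiprocessing starts.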
os.environ["TOKENIZERS_PARALLELISM"] = "false"

ID_NAMES = ["example_id", "external_id", "doc_id", "id"]


@@ -1633,21 +1637,64 @@ def __init__(
            logger.info("Initialized processor without tasks. Supply `metric` and `label_list` to the constructor for "
                        "using the default task or add a custom task later via processor.add_task()")

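    # Worker for the multiprocessing experiment in dataset_from_dicts() below. Illustrative chunk
    # shape only (placeholder names, not real FARM data): grouper() pairs each tokenized dict with
    # an ascending index, so a chunk of size 2 arrives as
    #   [(0, tokenized_dict_0), (1, tokenized_dict_1)]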
    def _fill_baskets_from_chunk(self, chunk):
        """
        Create SampleBaskets for one chunk (= subset) of tokenized dicts. In multiprocessing:
        * we read in all dicts from a file
        * split all dicts into chunks
        * feed *one chunk* to *one process*
          => the *one chunk* gets converted into SampleBaskets (that's what we do here)
        * all per-chunk baskets get collected and concatenated

        :param chunk: Instead of only a list of dicts we also supply an ascending index for each,
                      i.e. [(0, dict), (1, dict), ...]
        :type chunk: list of tuples
        :return: list of SampleBaskets
        """
        dicts_tokenized = [d[1] for d in chunk]
        indices = [d[0] for d in chunk]
        baskets = self._fill_baskets(dicts_tokenized, indices)
        return baskets

    def dataset_from_dicts(self, dicts, indices=None, return_baskets=False):
        """ Overrides the method from the base class because Question Answering processing is quite different.
        This method allows documents and questions to be tokenized earlier. SampleBaskets are then initialized
        with one document and one question each. """
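        # Input sketch (illustrative values only): each incoming dict is a QA request such as
        #   {"questions": ["Who wrote FARM?"], "text": "FARM is a framework ..."}
        # convert_qa_input_dict() below normalizes the supported input formats to one standard format.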


        import time  # temporary timing instrumentation for this draft

        # convert to standard format
        dicts = [convert_qa_input_dict(x) for x in dicts]

        # tokenize
        start = time.time()
        dicts_tokenized = _apply_tokenization_batch(dicts, self.tokenizer)
        print(f"tokenized after {time.time() - start} seconds")

        # split into passages and featurize
        # Experiment: multiprocess the featurization. Each worker turns one chunk of
        # (index, tokenized dict) tuples into SampleBaskets via _fill_baskets_from_chunk().
        import torch.multiprocessing as mp
        from contextlib import ExitStack
        from functools import partial
        from farm.data_handler.utils import grouper
        with ExitStack() as stack:
            # draft values: 8 worker processes, 5 tokenized dicts per chunk
            p = stack.enter_context(mp.Pool(processes=8))
            results = p.imap(
                partial(self._fill_baskets_from_chunk),
                grouper(dicts_tokenized, 5),
                chunksize=1,
            )
            # imap yields the per-chunk results in submission order, so the collected
            # baskets stay aligned with the ascending indices produced by grouper
            baskets = []
            for b in results:
                baskets.extend(b)
            self.baskets = baskets
        # end of multiprocessing experiment

        # without multiprocessing:
        # self.baskets = self._fill_baskets(dicts_tokenized, indices)
        print(f"featurized after {time.time() - start} seconds")
        if 0 in indices:
            self._log_samples(2)
        # This mode is for inference where we need to keep baskets
@@ -2761,6 +2808,7 @@ def _apply_tokenization_batch(dicts,tokenizer):
"external_id": external_id}
raw_basket.append(raw)
raw_baskets_batch.append(raw_basket)

return raw_baskets_batch

def _apply_tokenization(dictionary, tokenizer, answer_types_list=[]):
