Replace OMP with std::thread #3309

Merged on Sep 20, 2022 (10 commits)
64 changes: 63 additions & 1 deletion faster_tokenizer/faster_tokenizer/core/encoding.cc
@@ -22,6 +22,7 @@ limitations under the License. */
#ifdef WITH_OMP
#include <omp.h>
#endif

namespace paddlenlp {
namespace faster_tokenizer {
namespace core {
@@ -600,6 +601,23 @@ bool TruncateEncodings(Encoding* encoding,
return true;
}

void MultiThreadPadEncodings(std::vector<Encoding>* encodings,
const PadMethod& method,
size_t pad_length,
size_t start_index,
size_t step_index) {
auto batch_size = encodings->size();
size_t end_index = start_index + step_index;
if (end_index > batch_size) end_index = batch_size;
for (size_t i = start_index; i < end_index; ++i) {
auto& encoding = (*encodings)[i];
encoding.Pad(pad_length,
method.pad_id_,
method.pad_token_type_id_,
method.pad_token_,
method.direction_);
}
}
void PadEncodings(std::vector<Encoding>* encodings, const PadMethod& method) {
if (encodings == nullptr || encodings->empty()) {
return;
@@ -619,7 +637,6 @@ void PadEncodings(std::vector<Encoding>* encodings, const PadMethod& method) {
auto batch_size = encodings->size();
#ifdef WITH_OMP
#pragma omp parallel for if (batch_size >= 4 && omp_get_max_threads() > 1)
for (int i = 0; i < batch_size; ++i) {
auto& encoding = (*encodings)[i];
encoding.Pad(pad_length,
@@ -628,6 +645,51 @@ void PadEncodings(std::vector<Encoding>* encodings, const PadMethod& method) {
method.pad_token_,
method.direction_);
}
#else
auto func = std::bind(&MultiThreadPadEncodings,
encodings,
std::ref(method),
pad_length,
std::placeholders::_1,
std::placeholders::_2);
RunMultiThread(func, batch_size);
#endif
}

int GetThreadNum(size_t batch_size) {
const char* env_var = std::getenv("OMP_NUM_THREADS");
// std::getenv returns nullptr when OMP_NUM_THREADS is unset; guard it before atoi.
int thread_num = (env_var == nullptr) ? 0 : std::atoi(env_var);
if (batch_size == 0) {
thread_num = 1;
VLOG(3) << "batch_size == 0, we set OMP_NUM_THREADS = 1";
} else {
// Cap the thread count so that each thread handles roughly 4 samples or more.
int best_num = static_cast<int>(std::ceil(batch_size / 4.0));
if (thread_num > best_num) {
thread_num = best_num;
VLOG(3) << "OMP_NUM_THREADS > batch_size/4, we set OMP_NUM_THREADS = "
"batch_size/4";
} else if (thread_num <= 0) {
thread_num = best_num;
VLOG(3) << "OMP_NUM_THREADS <= 0, we set OMP_NUM_THREADS = batch_size/4";
}
}
return thread_num;
}

void RunMultiThread(std::function<void(size_t, size_t)> func,
size_t batch_size) {
int thread_num = GetThreadNum(batch_size);
std::vector<std::thread> vectorOfThread;
size_t start_index = 0;
size_t step_index =
static_cast<size_t>(std::ceil(batch_size / static_cast<float>(thread_num)));

// Launch the workers; each one owns the half-open chunk
// [start_index, start_index + step_index) of the batch.
for (int thread_index = 0; thread_index < thread_num; thread_index++) {
vectorOfThread.emplace_back(func, start_index, step_index);
start_index = start_index + step_index;
}
for (int thread_index = 0; thread_index < thread_num; thread_index++) {
vectorOfThread[thread_index].join();
}
}

} // namespace core
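
For readers comparing the two paths: the std::thread branch splits a batch into contiguous chunks and hands each chunk's (start_index, step_index) pair to a worker. With OMP_NUM_THREADS unset or set higher than needed, a batch of 10 yields ceil(10/4) = 3 threads with a step of ceil(10/3) = 4, i.e. chunks [0,4), [4,8), [8,10). The following stand-alone sketch mirrors the GetThreadNum / RunMultiThread pattern; PickThreadNum, worker, and the toy data are illustrative names, not part of faster_tokenizer.

// Sketch only: mirrors the chunked std::thread dispatch added in this PR.
#include <algorithm>
#include <cmath>
#include <cstdlib>
#include <iostream>
#include <thread>
#include <vector>

// Same heuristic as GetThreadNum: honor OMP_NUM_THREADS, but never spawn
// more than ceil(batch_size / 4) threads, and always use at least one.
int PickThreadNum(size_t batch_size) {
  const char* env = std::getenv("OMP_NUM_THREADS");
  int thread_num = (env == nullptr) ? 0 : std::atoi(env);
  if (batch_size == 0) return 1;
  int best_num = static_cast<int>(std::ceil(batch_size / 4.0));
  if (thread_num <= 0 || thread_num > best_num) thread_num = best_num;
  return thread_num;
}

int main() {
  std::vector<int> data(10, 0);
  // The worker receives (start_index, step_index), exactly like the
  // std::function<void(size_t, size_t)> passed to RunMultiThread.
  auto worker = [&data](size_t start_index, size_t step_index) {
    size_t end_index = std::min(start_index + step_index, data.size());
    for (size_t i = start_index; i < end_index; ++i) data[i] += 1;
  };

  int thread_num = PickThreadNum(data.size());
  size_t step_index = static_cast<size_t>(
      std::ceil(data.size() / static_cast<float>(thread_num)));
  std::vector<std::thread> pool;
  size_t start_index = 0;
  for (int t = 0; t < thread_num; ++t) {
    pool.emplace_back(worker, start_index, step_index);
    start_index += step_index;
  }
  for (auto& t : pool) t.join();

  for (int v : data) std::cout << v << ' ';  // prints ten 1s
  std::cout << '\n';
  return 0;
}
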
10 changes: 10 additions & 0 deletions faster_tokenizer/faster_tokenizer/core/encoding.h
@@ -21,6 +21,12 @@ limitations under the License. */
#include "faster_tokenizer/core/base.h"
#include "faster_tokenizer/utils/utils.h"

#include <cmath>
#include <cstdlib>
#include <functional>
#include <thread>

namespace paddlenlp {
namespace faster_tokenizer {
namespace core {
Expand Down Expand Up @@ -122,6 +128,10 @@ bool FASTERTOKENIZER_DECL TruncateEncodings(Encoding* encoding,
void FASTERTOKENIZER_DECL PadEncodings(std::vector<Encoding>* encoding,
const PadMethod& method);

int FASTERTOKENIZER_DECL GetThreadNum(size_t batch_size);

void FASTERTOKENIZER_DECL
RunMultiThread(std::function<void(size_t, size_t)> func, size_t batch_size);
} // namespace core
} // namespace faster_tokenizer
} // namespace paddlenlp
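
The #else branches in this PR adapt functions that take extra leading arguments to the std::function<void(size_t, size_t)> signature RunMultiThread expects, by fixing those arguments with std::bind and leaving the chunk bounds as placeholders. A minimal sketch of that binding pattern follows; Worker and Process are made-up stand-ins for Tokenizer and its MultiThread* members, not library code.

// Sketch only: shows the std::bind + placeholders adaptation pattern.
#include <algorithm>
#include <functional>
#include <iostream>
#include <vector>

struct Worker {
  // The leading parameters (input, output) get fixed by std::bind; the
  // trailing (start_index, step_index) pair stays open as placeholders.
  void Process(const std::vector<int>& input, std::vector<int>* output,
               size_t start_index, size_t step_index) const {
    size_t end_index = std::min(start_index + step_index, input.size());
    for (size_t i = start_index; i < end_index; ++i) (*output)[i] = input[i] * 2;
  }
};

int main() {
  Worker worker;
  std::vector<int> input{1, 2, 3, 4};
  std::vector<int> output(input.size());
  std::function<void(size_t, size_t)> func =
      std::bind(&Worker::Process, &worker, std::cref(input), &output,
                std::placeholders::_1, std::placeholders::_2);
  // RunMultiThread would invoke func once per thread with its own chunk;
  // here it is called directly over the whole range.
  func(0, input.size());
  for (int v : output) std::cout << v << ' ';  // prints 2 4 6 8
  std::cout << '\n';
  return 0;
}
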
87 changes: 83 additions & 4 deletions faster_tokenizer/faster_tokenizer/core/tokenizer.cc
@@ -26,6 +26,7 @@ limitations under the License. */
#include "faster_tokenizer/postprocessors/postprocessors.h"
#include "faster_tokenizer/pretokenizers/pretokenizers.h"


#ifdef WITH_OMP
#include <omp.h>
#endif
@@ -248,23 +249,49 @@ void Tokenizer::EncodePairStrings(const EncodeInput& encode_input,
}
}

void Tokenizer::MultiThreadEncodeBatchStrings(
const std::vector<EncodeInput>& batch_encode_input,
std::vector<Encoding>* encodings,
bool add_special_tokens,
size_t start_index,
size_t step_index) const {
auto batch_size = batch_encode_input.size();
size_t end_index = start_index + step_index;
if (end_index > batch_size) end_index = batch_size;
for (size_t i = start_index; i < end_index; ++i) {
EncodePairStrings(
batch_encode_input[i], &(*encodings)[i], add_special_tokens);
}
}

void Tokenizer::EncodeBatchStrings(
const std::vector<EncodeInput>& batch_encode_input,
std::vector<Encoding>* encodings,
bool add_special_tokens) const {
auto batch_size = batch_encode_input.size();
encodings->resize(batch_size);

#ifdef WITH_OMP
// (TODO:zhoushunjie): Simply use the batch size to estimate the workload of
// tokenization.
// Use workload to determine whether create omp threads. Need to optimize the
// workload estimation.
#pragma omp parallel for if (batch_size >= 4 && omp_get_max_threads() > 1)
for (int i = 0; i < batch_size; ++i) {
EncodePairStrings(
batch_encode_input[i], &(*encodings)[i], add_special_tokens);
}
#else
auto func = std::bind(&Tokenizer::MultiThreadEncodeBatchStrings,
this,
std::ref(batch_encode_input),
encodings,
add_special_tokens,
std::placeholders::_1,
std::placeholders::_2);
RunMultiThread(func, batch_size);
#endif

if (use_padding_) {
PadEncodings(encodings, pad_method_);
}
@@ -289,6 +316,23 @@ void Tokenizer::EncodePairStringsCharOffsets(const EncodeInput& encode_input,
PostProcess(&encoding, &pair_encoding, add_special_tokens, encodings);
}

void Tokenizer::MultiThreadEncodeBatchStringsCharOffsets(
const std::vector<EncodeInput>& batch_encode_input,
std::vector<Encoding>* encodings,
bool add_special_tokens,
size_t start_index,
size_t step_index) const {
auto batch_size = batch_encode_input.size();
size_t end_index = start_index + step_index;
if (end_index > batch_size) end_index = batch_size;
for (size_t i = start_index; i < end_index; ++i) {
Encoding encoding;
EncodePairStringsCharOffsets(
batch_encode_input[i], &encoding, add_special_tokens);
(*encodings)[i] = std::move(encoding);
}
}

void Tokenizer::EncodeBatchStringsCharOffsets(
const std::vector<EncodeInput>& batch_encode_input,
std::vector<Encoding>* encodings,
@@ -301,13 +345,23 @@ void Tokenizer::EncodeBatchStringsCharOffsets(
// Use workload to determine whether create omp threads. Need to optimize the
// workload estimation.
#pragma omp parallel for if (batch_size >= 4 && omp_get_max_threads() > 1)
for (int i = 0; i < batch_size; ++i) {
Encoding encoding;
EncodePairStringsCharOffsets(
batch_encode_input[i], &encoding, add_special_tokens);
(*encodings)[i] = std::move(encoding);
}
#else
auto func = std::bind(&Tokenizer::MultiThreadEncodeBatchStringsCharOffsets,
this,
std::ref(batch_encode_input),
encodings,
add_special_tokens,
std::placeholders::_1,
std::placeholders::_2);
RunMultiThread(func, batch_size);
#endif

if (use_padding_) {
PadEncodings(encodings, pad_method_);
}
Expand Down Expand Up @@ -404,22 +458,47 @@ void Tokenizer::Decode(const std::vector<uint32_t>& token_ids,
}
}


void Tokenizer::MultiThreadDecodeBatch(
const std::vector<std::vector<uint32_t>>& batch_token_ids,
std::vector<std::string>* results,
bool skip_special_tokens,
size_t start_index,
size_t step_index) const {
auto batch_size = batch_token_ids.size();
size_t end_index = start_index + step_index;
if (end_index > batch_size) end_index = batch_size;
for (size_t i = start_index; i < end_index; ++i) {
Decode(batch_token_ids[i], &(*results)[i], skip_special_tokens);
}
}

void Tokenizer::DecodeBatch(
const std::vector<std::vector<uint32_t>>& batch_token_ids,
std::vector<std::string>* results,
bool skip_special_tokens) const {
auto batch_size = batch_token_ids.size();
results->resize(batch_size);
#ifdef WITH_OMP
// (TODO:zhoushunjie): Simply use the batch size to estimate the workload of
// tokenization.
// Use workload to determine whether create omp threads. Need to optimize the
// workload estimation.
#pragma omp parallel for if (batch_token_ids.size() >= 4 && \
omp_get_max_threads() > 1)
for (int i = 0; i < batch_token_ids.size(); ++i) {
Decode(batch_token_ids[i], &(*results)[i], skip_special_tokens);
}
#else
auto func = std::bind(&Tokenizer::MultiThreadDecodeBatch,
this,
std::ref(batch_token_ids),
results,
skip_special_tokens,
std::placeholders::_1,
std::placeholders::_2);
RunMultiThread(func, batch_size);
#endif
}

bool Tokenizer::GetUseTruncation() const { return use_truncation_; }
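
As a design note, the same chunked dispatch can also be written with a capturing lambda instead of std::bind, which avoids the placeholder machinery. Below is a self-contained sketch along the lines of DecodeBatch; RunChunks is a local stand-in for RunMultiThread and the "decode" step is a toy, not the tokenizer's real decoding.

// Sketch only: lambda-based alternative to the std::bind dispatch.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

void RunChunks(std::function<void(size_t, size_t)> func, size_t batch_size) {
  size_t thread_num = std::max<size_t>(1, (batch_size + 3) / 4);  // ceil(n/4)
  size_t step_index = (batch_size + thread_num - 1) / thread_num;
  std::vector<std::thread> pool;
  for (size_t start_index = 0; start_index < batch_size; start_index += step_index)
    pool.emplace_back(func, start_index, step_index);
  for (auto& t : pool) t.join();
}

int main() {
  std::vector<std::vector<uint32_t>> batch_token_ids{{1, 2}, {3}, {4, 5, 6}};
  std::vector<std::string> results(batch_token_ids.size());
  // A capturing lambda fixes the surrounding state by reference and leaves
  // only the chunk bounds as parameters.
  auto func = [&](size_t start_index, size_t step_index) {
    size_t end_index = std::min(start_index + step_index, batch_token_ids.size());
    for (size_t i = start_index; i < end_index; ++i)
      results[i] = std::to_string(batch_token_ids[i].size()) + " tokens";
  };
  RunChunks(func, batch_token_ids.size());
  for (const auto& r : results) std::cout << r << '\n';
  return 0;
}
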
20 changes: 20 additions & 0 deletions faster_tokenizer/faster_tokenizer/core/tokenizer.h
@@ -160,10 +160,24 @@ class FASTERTOKENIZER_DECL Tokenizer {
bool add_special_tokens,
Encoding* result_encoding) const;

void MultiThreadEncodeBatchStrings(
const std::vector<EncodeInput>& batch_encode_input,
std::vector<Encoding>* encodings,
bool add_special_tokens,
size_t start_index,
size_t step_index) const;

void EncodeBatchStrings(const std::vector<EncodeInput>& batch_encode_input,
std::vector<Encoding>* encodings,
bool add_special_tokens = true) const;

void MultiThreadEncodeBatchStringsCharOffsets(
const std::vector<EncodeInput>& batch_encode_input,
std::vector<Encoding>* encodings,
bool add_special_tokens,
size_t start_index,
size_t step_index) const;

void EncodeBatchStringsCharOffsets(
const std::vector<EncodeInput>& batch_encode_input,
std::vector<Encoding>* encodings,
@@ -194,6 +208,12 @@ class FASTERTOKENIZER_DECL Tokenizer {
void Decode(const std::vector<uint32_t>& token_ids,
std::string* result,
bool skip_special_tokens = true) const;
void MultiThreadDecodeBatch(
const std::vector<std::vector<uint32_t>>& batch_token_ids,
std::vector<std::string>* results,
bool skip_special_tokens,
size_t start_index,
size_t step_index) const;
void DecodeBatch(const std::vector<std::vector<uint32_t>>& batch_token_ids,
std::vector<std::string>* results,
bool skip_special_tokens = true) const;