Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Limit number of messages in mempool, instead of total byte sizes (#2073)
Browse files Browse the repository at this point in the history
  • Loading branch information
objmagic authored and huijunw committed Aug 1, 2017
1 parent 0523244 commit 1ae7d9b
Show file tree
Hide file tree
Showing 17 changed files with 82 additions and 4 deletions.
17 changes: 17 additions & 0 deletions heron/common/src/cpp/basics/mempool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,24 @@
////////////////////////////////////////////////////////////////////////////////
#include "basics/mempool.h"
#include "basics/basics.h"
#include "glog/logging.h"

<<<<<<< HEAD
MemPool<google::protobuf::Message>* __global_protobuf_pool__ =
new MemPool<google::protobuf::Message>();
=======
/* The initial value of pool size limit is 512. This value will
be overridden by value read from config during initialization
of stream manager. The reason why we do this is to avoid
circular dependency problem. See https://github.com/twitter/heron/pull/2073
for details */
MemPool<google::protobuf::Message>* __global_protobuf_pool__ =
new MemPool<google::protobuf::Message>(512);
>>>>>>> 8fc4455... Limit number of messages in mempool, instead of total byte sizes (#2073)
std::mutex __global_protobuf_pool_mutex__;

void __global_protobuf_pool_set_pool_max_number_of_messages__(sp_int32 _pool_limit) {
std::lock_guard<std::mutex> guard(__global_protobuf_pool_mutex__);
LOG(INFO) << "Set max size of each memory pool to " << _pool_limit;
__global_protobuf_pool__->set_pool_max_number_of_messages(_pool_limit);
}
20 changes: 18 additions & 2 deletions heron/common/src/cpp/basics/mempool.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,25 @@ class MemPool {
template<typename M>
void release(M* ptr) {
std::type_index type = typeid(M);
map_[type].push_back(static_cast<B*>(ptr));
sp_int32 size = mem_pool_map_[type].size();
// if pool size reaches the limit, release the memory
// otherwise put the memory into pool
if (size >= pool_limit_) {
delete ptr;
} else {
mem_pool_map_[type].push_back(static_cast<B*>(ptr));
}
}

void set_pool_max_number_of_messages(sp_int32 _pool_limit) {
pool_limit_ = _pool_limit;
}

private:
std::unordered_map<std::type_index, std::vector<B*>> map_;
// each type has its own separate mem pool entry
std::unordered_map<std::type_index, std::vector<B*>> mem_pool_map_;
// number of message in each mem pool should not exceed the pool_limit_
sp_int32 pool_limit_;
};

extern MemPool<google::protobuf::Message>* __global_protobuf_pool__;
Expand All @@ -107,5 +121,7 @@ void __global_protobuf_pool_release__(T* _m) {
__global_protobuf_pool__->release(_m);
}

void __global_protobuf_pool_set_pool_max_number_of_messages__(sp_int32 _pool_limit);

#endif

4 changes: 4 additions & 0 deletions heron/common/src/cpp/config/heron-internals-config-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ sp_int32 HeronInternalsConfigReader::GetHeronStreammgrStatefulBufferSizeMb() {
.as<int>();
}

sp_int32 HeronInternalsConfigReader::GetHeronStreammgrMempoolMaxMessageNumber() {
return config_[HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER].as<int>();
}

sp_int32 HeronInternalsConfigReader::GetHeronStreammgrXormgrRotatingmapNbuckets() {
return config_[HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS].as<int>();
}
Expand Down
3 changes: 3 additions & 0 deletions heron/common/src/cpp/config/heron-internals-config-reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ class HeronInternalsConfigReader : public YamlFileReader {
// checkpoint markers to arrive before giving up
sp_int32 GetHeronStreammgrStatefulBufferSizeMb();

// The max number of messages in the memory pool for each message type
sp_int32 GetHeronStreammgrMempoolMaxMessageNumber();

// Get the Nbucket value, for efficient acknowledgement
sp_int32 GetHeronStreammgrXormgrRotatingmapNbuckets();

Expand Down
2 changes: 2 additions & 0 deletions heron/common/src/cpp/config/heron-internals-config-vars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_FREQUENCY_
"heron.streammgr.cache.drain.frequency.ms";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB =
"heron.streammgr.cache.drain.size.mb";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER =
"heron.streammgr.mempool.max.message.number";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS =
"heron.streammgr.xormgr.rotatingmap.nbuckets";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CLIENT_RECONNECT_INTERVAL_SEC =
Expand Down
3 changes: 3 additions & 0 deletions heron/common/src/cpp/config/heron-internals-config-vars.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ class HeronInternalsConfigVars {
// The sized based threshold in MB for draining the tuple cache
static const sp_string HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB;

// The max number of messages in the memory pool for each message type
static const sp_string HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER;

// For efficient acknowledgement
static const sp_string HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS;

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/aurora/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.cache.drain.size.mb: 100
# For efficient acknowledgements
heron.streammgr.xormgr.rotatingmap.nbuckets: 3

# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The reconnect interval to other stream managers in secs for stream manager client
heron.streammgr.client.reconnect.interval.sec: 1

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/examples/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.cache.drain.size.mb: 100
# For efficient acknowledgements
heron.streammgr.xormgr.rotatingmap.nbuckets: 3

# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The reconnect interval to other stream managers in secs for stream manager client
heron.streammgr.client.reconnect.interval.sec: 1

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/kubernetes/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100
# For efficient acknowledgements
heron.streammgr.xormgr.rotatingmap.nbuckets: 3

# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The reconnect interval to other stream managers in secs for stream manager client
heron.streammgr.client.reconnect.interval.sec: 1

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/local/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.cache.drain.size.mb: 100
# For efficient acknowledgements
heron.streammgr.xormgr.rotatingmap.nbuckets: 3

# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The reconnect interval to other stream managers in secs for stream manager client
heron.streammgr.client.reconnect.interval.sec: 1

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/localzk/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.cache.drain.size.mb: 100
# For efficient acknowledgements
heron.streammgr.xormgr.rotatingmap.nbuckets: 3

# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The reconnect interval to other stream managers in secs for stream manager client
heron.streammgr.client.reconnect.interval.sec: 1

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/marathon/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.cache.drain.size.mb: 100
# For efficient acknowledgements
heron.streammgr.xormgr.rotatingmap.nbuckets: 3

# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The reconnect interval to other stream managers in secs for stream manager client
heron.streammgr.client.reconnect.interval.sec: 1

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/mesos/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.cache.drain.size.mb: 100
# For efficient acknowledgements
heron.streammgr.xormgr.rotatingmap.nbuckets: 3

# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The reconnect interval to other stream managers in secs for stream manager client
heron.streammgr.client.reconnect.interval.sec: 1

Expand Down
5 changes: 4 additions & 1 deletion heron/config/src/yaml/conf/slurm/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ heron.streammgr.stateful.buffer.size.mb: 100
heron.streammgr.cache.drain.size.mb: 100

# For efficient acknowledgements
heron.streammgr.xormgr.rotatingmap.nbuckets: 3
heron.streammgr.xormgr.rotatingmap.nbuckets: 3

# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The reconnect interval to other stream managers in secs for stream manager client
heron.streammgr.client.reconnect.interval.sec: 1
Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/test/test_heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ heron.streammgr.stateful.buffer.size.mb: 100
# The sized based threshold in MB for draining the tuple cache
heron.streammgr.cache.drain.size.mb: 100

# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# For efficient acknowledgement
heron.streammgr.xormgr.rotatingmap.nbuckets: 3

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/yarn/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.cache.drain.size.mb: 100
# For efficient acknowledgements
heron.streammgr.xormgr.rotatingmap.nbuckets: 3

# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The reconnect interval to other stream managers in secs for stream manager client
heron.streammgr.client.reconnect.interval.sec: 1

Expand Down
5 changes: 4 additions & 1 deletion heron/stmgr/src/cpp/manager/stmgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "manager/stream-consumers.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "basics/mempool.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
Expand Down Expand Up @@ -85,7 +86,9 @@ void StMgr::Init() {
LOG(INFO) << "Init Stmgr" << std::endl;
sp_int32 metrics_export_interval_sec =
config::HeronInternalsConfigReader::Instance()->GetHeronMetricsExportIntervalSec();

__global_protobuf_pool_set_pool_max_number_of_messages__(
heron::config::HeronInternalsConfigReader::Instance()
->GetHeronStreammgrMempoolMaxMessageNumber());
state_mgr_ = heron::common::HeronStateMgr::MakeStateMgr(zkhostport_, zkroot_, eventLoop_, false);
metrics_manager_client_ = new heron::common::MetricsMgrSt(
stmgr_host_, stmgr_port_, metricsmgr_port_, "__stmgr__", stmgr_id_,
Expand Down

0 comments on commit 1ae7d9b

Please sign in to comment.