Skip to content

Commit

Permalink
Merge pull request #77 from eWaterCycle/value-too-big
Browse files Browse the repository at this point in the history
Handle when model value is too big for message
  • Loading branch information
sverhoeven authored Aug 11, 2020
2 parents abead79 + 102b812 commit 30e199e
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 1 deletion.
37 changes: 37 additions & 0 deletions grpc4bmi/bmi_grpc_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import math
import os
import socket
from contextlib import closing
Expand All @@ -12,6 +13,7 @@
from google.rpc import error_details_pb2

from . import bmi_pb2, bmi_pb2_grpc
from .utils import GRPC_MAX_MESSAGE_LENGTH

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,6 +46,12 @@ def handle_error(exc):
raise


def _fits_in_message(array):
"""Tests whether array can be passed through a gRPC message with a max message size of 4Mb"""
array_size = array.size * array.itemsize
return array_size <= GRPC_MAX_MESSAGE_LENGTH


class BmiClient(Bmi):
"""
Client BMI interface, implementing BMI by forwarding every function call via GRPC to the server connected to the
Expand Down Expand Up @@ -209,13 +217,42 @@ def get_var_location(self, name: str) -> str:
handle_error(e)

def get_value(self, name, dest):
fits = _fits_in_message(dest)
if not fits:
return self._chunked_get_value(name, dest)
try:
response = self.stub.getValue(bmi_pb2.GetVarRequest(name=name))
numpy.copyto(src=BmiClient.make_array(response), dst=dest)
return dest
except grpc.RpcError as e:
handle_error(e)

def _chunked_get_value(self, name: str, dest: np.array) -> np.array:
# Make chunk one item smaller than maximum (4Mb)
chunk_size = math.floor(GRPC_MAX_MESSAGE_LENGTH / dest.dtype.itemsize) - dest.dtype.itemsize
chunks = []
log.info(f'Too many items ({dest.size}) for single call, '
f'using multiple get_value_at_indices() with into chunks of {chunk_size} items')
for i in range(0, dest.size, chunk_size):
start = i
stop = i + chunk_size
# Last chunk can be smaller
if stop > dest.size:
stop = dest.size
chunks.append(self._get_value_at_range(name, start, stop))

numpy.concatenate(chunks, out=dest)
return dest

def _get_value_at_range(self, name, start, stop):
log.info(f'Fetching value range {start} - {stop}')
try:
response = self.stub.getValueAtIndices(bmi_pb2.GetValueAtIndicesRequest(name=name,
indices=range(start, stop)))
return BmiClient.make_array(response)
except grpc.RpcError as e:
handle_error(e)

def get_value_ptr(self, name: str) -> np.ndarray:
"""Not possible, unable give reference to data structure in another process and possibly another machine"""
raise NotImplementedError("Array references cannot be transmitted through this GRPC channel")
Expand Down
4 changes: 4 additions & 0 deletions grpc4bmi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ def stage_config_file(filename, input_dir, input_mount_point, home_mounted=False
# Assume filename exists inside container or model does not need a file to initialize
pass
return fn


# grpc max message size is 4Mb
GRPC_MAX_MESSAGE_LENGTH = 4 * 1024 * 1024
21 changes: 20 additions & 1 deletion test/fake_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import numpy as np
from bmipy import Bmi

from grpc4bmi.utils import GRPC_MAX_MESSAGE_LENGTH


class SomeException(Exception):
pass
Expand Down Expand Up @@ -357,7 +359,7 @@ def get_var_itemsize(self, name):
return self.dtype.itemsize

def get_var_nbytes(self, name):
return self.dtype.itemsize * 3
return self.dtype.itemsize * self.value.size

def get_value(self, name, dest):
numpy.copyto(src=self.value, dst=dest)
Expand Down Expand Up @@ -390,3 +392,20 @@ def __init__(self):
super().__init__()
self.dtype = numpy.dtype('bool')
self.value = numpy.array((True, False, True), dtype=self.dtype)


class HugeModel(DTypeModel):
"""Model which has value which does not fit in message body
Can be run from command line with
..code-block:: bash
run-bmi-server --path $PWD/test --name fake_models.HugeModel --port 55555 --debug
"""
def __init__(self):
super().__init__()
self.dtype = numpy.dtype('float64')
# Create value which is bigger than 4Mb
dimension = (3 * GRPC_MAX_MESSAGE_LENGTH) // self.dtype.itemsize + 1000
self.value = numpy.ones((dimension,), dtype=self.dtype)
13 changes: 13 additions & 0 deletions test/test_subproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os

from grpc4bmi.reserve import reserve_values_at_indices, reserve_values, reserve_grid_shape, reserve_grid_padding
from test.fake_models import HugeModel
from test.flatbmiheat import FlatBmiHeat

from grpc4bmi.bmi_client_subproc import BmiClientSubProcess
Expand Down Expand Up @@ -143,6 +144,18 @@ def test_get_var_values():
del client


def test_get_value_huge():
os.environ["PYTHONPATH"] = os.path.dirname(os.path.abspath(__file__))
client = BmiClientSubProcess("fake_models.HugeModel")
local = HugeModel()
varname = local.get_output_var_names()[0]

result = client.get_value(varname, reserve_values(client, varname))
expected = local.get_value(varname, reserve_values(local, varname))
assert numpy.array_equal(result, expected)
del client


def test_get_var_ptr():
client, local = make_bmi_classes(True)
varname = local.get_output_var_names()[0]
Expand Down

0 comments on commit 30e199e

Please sign in to comment.