From fc5026da1ee0021abfefcefaee2ec41c0583b2c2 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Fri, 2 Aug 2019 13:40:57 +0800 Subject: [PATCH 1/4] fix(board): improve gnes board 500 message --- gnes/composer/flask.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/gnes/composer/flask.py b/gnes/composer/flask.py index fe132c11..c94a671b 100644 --- a/gnes/composer/flask.py +++ b/gnes/composer/flask.py @@ -39,6 +39,13 @@ def _create_flask_app(self): args = set_composer_parser().parse_args([]) default_html = YamlComposer(args).build_all()['html'] + @app.errorhandler(500) + def exception_handler(error): + self.logger.error('unhandled error, i better quit and restart!') + return '

500 Internal Error

' \ + 'While we are fixing the issue, do you know you can deploy GNES board locally on your machine? ' \ + 'Simply run
docker run -d -p 0.0.0.0:80:8080/tcp gnes/gnes compose --flask
', 500 + @app.route('/', methods=['GET']) def _get_homepage(): return default_html From 58217d8cd3deaad6dbca6e8683e5baeea370593f Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Fri, 2 Aug 2019 13:52:55 +0800 Subject: [PATCH 2/4] refactor(base): moving is_trained to class attribute --- gnes/base/__init__.py | 4 ++-- gnes/encoder/text/bert.py | 4 ++-- gnes/encoder/text/elmo.py | 2 +- gnes/encoder/text/flair.py | 2 +- gnes/encoder/text/gpt.py | 3 ++- gnes/encoder/text/w2v.py | 3 ++- tests/test_compose.py | 7 +------ tests/test_load_dump_pipeline.py | 3 ++- 8 files changed, 13 insertions(+), 15 deletions(-) diff --git a/gnes/base/__init__.py b/gnes/base/__init__.py index 5945fd23..2213adaf 100644 --- a/gnes/base/__init__.py +++ b/gnes/base/__init__.py @@ -85,7 +85,8 @@ def __call__(cls, *args, **kwargs): obj = type.__call__(cls, *args, **kwargs) - # set attribute + # set attribute with priority + # gnes_config in YAML > class attribute > default_gnes_config for k, v in TrainableType.default_gnes_config.items(): if k in gnes_config: v = gnes_config[k] @@ -163,7 +164,6 @@ class TrainableBase(metaclass=TrainableType): store_args_kwargs = False def __init__(self, *args, **kwargs): - self.is_trained = False self.verbose = 'verbose' in kwargs and kwargs['verbose'] self.logger = set_logger(self.__class__.__name__, self.verbose) self._post_init_vars = set() diff --git a/gnes/encoder/text/bert.py b/gnes/encoder/text/bert.py index 9003a25d..c83be1c5 100644 --- a/gnes/encoder/text/bert.py +++ b/gnes/encoder/text/bert.py @@ -26,10 +26,10 @@ class BertEncoder(BaseTextEncoder): store_args_kwargs = True + is_trained = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.is_trained = True self._bc_encoder_args = args self._bc_encoder_kwargs = kwargs @@ -52,6 +52,7 @@ def encode(self, text: List[str], *args, **kwargs) -> np.ndarray: class BertEncoderServer(BaseTextEncoder): store_args_kwargs = True + is_trained = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -60,7 +61,6 @@ def __init__(self, *args, **kwargs): bert_args.append('-%s' % k) bert_args.append(str(v)) self._bert_args = bert_args - self.is_trained = True def post_init(self): from bert_serving.server import BertServer diff --git a/gnes/encoder/text/elmo.py b/gnes/encoder/text/elmo.py index 64352db3..b8eedca1 100644 --- a/gnes/encoder/text/elmo.py +++ b/gnes/encoder/text/elmo.py @@ -25,6 +25,7 @@ class ElmoEncoder(BaseTextEncoder): + is_trained = True def __init__(self, model_dir: str, batch_size: int = 64, pooling_layer: int = -1, pooling_strategy: str = 'REDUCE_MEAN', *args, **kwargs): @@ -38,7 +39,6 @@ def __init__(self, model_dir: str, batch_size: int = 64, pooling_layer: int = -1 pooling_layer) self.pooling_layer = pooling_layer self.pooling_strategy = pooling_strategy - self.is_trained = True def post_init(self): from elmoformanylangs import Embedder diff --git a/gnes/encoder/text/flair.py b/gnes/encoder/text/flair.py index a994a2a5..aacc95c3 100644 --- a/gnes/encoder/text/flair.py +++ b/gnes/encoder/text/flair.py @@ -25,6 +25,7 @@ class FlairEncoder(BaseTextEncoder): + is_trained = True def __init__(self, model_name: str = 'multi-forward-fast', batch_size: int = 64, @@ -35,7 +36,6 @@ def __init__(self, model_name: str = 'multi-forward-fast', self.batch_size = batch_size self.pooling_strategy = pooling_strategy - self.is_trained = True def post_init(self): from flair.embeddings import FlairEmbeddings diff --git a/gnes/encoder/text/gpt.py b/gnes/encoder/text/gpt.py index b7549aca..dcbd8455 100644 --- a/gnes/encoder/text/gpt.py +++ b/gnes/encoder/text/gpt.py @@ -25,6 +25,8 @@ class GPTEncoder(BaseTextEncoder): + is_trained = True + def __init__(self, model_dir: str, batch_size: int = 64, @@ -38,7 +40,6 @@ def __init__(self, self.batch_size = batch_size self.pooling_strategy = pooling_strategy self._use_cuda = use_cuda - self.is_trained = True def post_init(self): import torch diff --git a/gnes/encoder/text/w2v.py b/gnes/encoder/text/w2v.py index fac35fe1..493b9bcb 100644 --- a/gnes/encoder/text/w2v.py +++ b/gnes/encoder/text/w2v.py @@ -25,6 +25,8 @@ class Word2VecEncoder(BaseTextEncoder): + is_trained = True + def __init__(self, model_dir: str, skiprows: int = 1, batch_size: int = 64, @@ -35,7 +37,6 @@ def __init__(self, model_dir: str, self.skiprows = skiprows self.batch_size = batch_size self.pooling_strategy = pooling_strategy - self.is_trained = True self.dimension = dimension def post_init(self): diff --git a/tests/test_compose.py b/tests/test_compose.py index c785da4e..cc148d3c 100644 --- a/tests/test_compose.py +++ b/tests/test_compose.py @@ -35,12 +35,7 @@ def _test_topology(self, yaml_path: str, num_layer_before: int, num_layer_after: @unittest.SkipTest def test_flask_local(self): - yaml_path = os.path.join(self.dirname, 'yaml', 'topology1.yml') - args = set_composer_flask_parser().parse_args([ - '--flask', - '--yaml_path', yaml_path, - '--html_path', self.html_path - ]) + args = set_composer_flask_parser().parse_args(['--flask']) YamlComposerFlask(args).run() def test_flask(self): diff --git a/tests/test_load_dump_pipeline.py b/tests/test_load_dump_pipeline.py index 0a4d8143..3e464c59 100644 --- a/tests/test_load_dump_pipeline.py +++ b/tests/test_load_dump_pipeline.py @@ -5,9 +5,10 @@ class DummyTFEncoder(BaseEncoder): + is_trained = True + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.is_trained = True def post_init(self): import tensorflow as tf From 8b72fd9d8f5d30747c023e77dea7b999aa511b32 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Fri, 2 Aug 2019 16:02:35 +0800 Subject: [PATCH 3/4] refactor(base): better batch_size control --- gnes/cli/parser.py | 4 +-- gnes/composer/flask.py | 5 +-- gnes/encoder/image/base.py | 3 +- gnes/encoder/image/cvae.py | 23 +++++++------- gnes/encoder/image/inception.py | 18 +++++------ gnes/encoder/numeric/hash.py | 4 ++- gnes/encoder/numeric/pca.py | 3 +- gnes/encoder/numeric/pq.py | 4 ++- gnes/encoder/numeric/tf_pq.py | 6 ++-- gnes/encoder/text/elmo.py | 4 +-- gnes/encoder/text/flair.py | 3 -- gnes/encoder/text/gpt.py | 3 +- gnes/encoder/text/w2v.py | 2 -- gnes/encoder/video/incep_mixture.py | 35 ++++++++++----------- gnes/helper.py | 47 +++++++++-------------------- gnes/router/map/simple.py | 3 -- tests/test_batching.py | 9 ++++++ 17 files changed, 79 insertions(+), 97 deletions(-) diff --git a/gnes/cli/parser.py b/gnes/cli/parser.py index 154b70e8..d8f69fc7 100644 --- a/gnes/cli/parser.py +++ b/gnes/cli/parser.py @@ -79,9 +79,7 @@ def set_composer_flask_parser(parser=None): parser = set_base_parser() set_composer_parser(parser) parser.add_argument('--flask', action='store_true', default=False, - help='using Flask to serve GNES composer in interactive mode') - parser.add_argument('--cors', type=str, default='*', - help='setting "Access-Control-Allow-Origin" for HTTP requests') + help='using Flask to serve a composer in interactive mode, aka GNES board') parser.add_argument('--http_port', type=int, default=8080, help='server port for receiving HTTP requests') return parser diff --git a/gnes/composer/flask.py b/gnes/composer/flask.py index c94a671b..8660462a 100644 --- a/gnes/composer/flask.py +++ b/gnes/composer/flask.py @@ -28,20 +28,18 @@ def _create_flask_app(self): try: from flask import Flask, request from flask_compress import Compress - from flask_cors import CORS except ImportError: raise ImportError('Flask or its dependencies are not fully installed, ' 'they are required for serving HTTP requests.' 'Please use "pip install gnes[http]" to install it.') - # support up to 10 concurrent HTTP requests app = Flask(__name__) args = set_composer_parser().parse_args([]) default_html = YamlComposer(args).build_all()['html'] @app.errorhandler(500) def exception_handler(error): - self.logger.error('unhandled error, i better quit and restart!') + self.logger.error('unhandled error, i better quit and restart! %s' % error) return '

500 Internal Error

' \ 'While we are fixing the issue, do you know you can deploy GNES board locally on your machine? ' \ 'Simply run
docker run -d -p 0.0.0.0:80:8080/tcp gnes/gnes compose --flask
', 500 @@ -66,7 +64,6 @@ def _regenerate(): self.logger.error(e) return '

Bad YAML input

please kindly check the format, indent and content of your YAML file!', 400 - CORS(app, origins=self.args.cors) Compress().init_app(app) return app diff --git a/gnes/encoder/image/base.py b/gnes/encoder/image/base.py index 6a92c108..dbb451f8 100644 --- a/gnes/encoder/image/base.py +++ b/gnes/encoder/image/base.py @@ -23,16 +23,15 @@ class BasePytorchEncoder(BaseImageEncoder): + batch_size = 64 def __init__(self, model_name: str, layers: List[str], model_dir: str, - batch_size: int = 64, use_cuda: bool = False, *args, **kwargs): super().__init__(*args, **kwargs) - self.batch_size = batch_size self.model_dir = model_dir self.model_name = model_name self.layers = layers diff --git a/gnes/encoder/image/cvae.py b/gnes/encoder/image/cvae.py index 84c3f9dd..6b9aabdc 100644 --- a/gnes/encoder/image/cvae.py +++ b/gnes/encoder/image/cvae.py @@ -19,14 +19,13 @@ from PIL import Image from ..base import BaseImageEncoder -from ...helper import batch_iterator class CVAEEncoder(BaseImageEncoder): + batch_size = 64 def __init__(self, model_dir: str, latent_dim: int = 300, - batch_size: int = 64, select_method: str = 'MEAN', l2_normalize: bool = False, use_gpu: bool = True, @@ -35,7 +34,6 @@ def __init__(self, model_dir: str, self.model_dir = model_dir self.latent_dim = latent_dim - self.batch_size = batch_size self.select_method = select_method self.l2_normalize = l2_normalize self.use_gpu = use_gpu @@ -59,19 +57,22 @@ def post_init(self): self.saver.restore(self.sess, self.model_dir) def encode(self, img: List['np.ndarray'], *args, **kwargs) -> np.ndarray: - ret = [] img = [(np.array(Image.fromarray(im).resize((120, 120)), dtype=np.float32) / 255) for im in img] - for _im in batch_iterator(img, self.batch_size): + + def _encode(_, data): _mean, _var = self.sess.run((self.mean, self.var), - feed_dict={self.inputs: _im}) + feed_dict={self.inputs: data}) if self.select_method == 'MEAN': - ret.append(_mean) + return _mean elif self.select_method == 'VAR': - ret.append(_var) + return _var elif self.select_method == 'MEAN_VAR': - ret.append(np.concatenate([_mean, _var]), axis=1) - v = np.concatenate(ret, axis=0).astype(np.float32) + return np.concatenate([_mean, _var], axis=1) + else: + raise NotImplementedError + + v = _encode(None, img).astype(np.float32) if self.l2_normalize: - v = v / (v**2).sum(axis=1, keepdims=True)**0.5 + v = v / (v ** 2).sum(axis=1, keepdims=True) ** 0.5 return v diff --git a/gnes/encoder/image/inception.py b/gnes/encoder/image/inception.py index 3f5adc64..9919fba7 100644 --- a/gnes/encoder/image/inception.py +++ b/gnes/encoder/image/inception.py @@ -19,20 +19,19 @@ from PIL import Image from ..base import BaseImageEncoder -from ...helper import batching, batch_iterator, get_first_available_gpu +from ...helper import batching, get_first_available_gpu class TFInceptionEncoder(BaseImageEncoder): + batch_size = 64 def __init__(self, model_dir: str, - batch_size: int = 64, select_layer: str = 'PreLogitsFlatten', use_cuda: bool = False, *args, **kwargs): super().__init__(*args, **kwargs) self.model_dir = model_dir - self.batch_size = batch_size self.select_layer = select_layer self._use_cuda = use_cuda self.inception_size_x = 299 @@ -64,14 +63,15 @@ def post_init(self): self.saver = tf.train.Saver() self.saver.restore(self.sess, self.model_dir) - @batching def encode(self, img: List['np.ndarray'], *args, **kwargs) -> np.ndarray: - ret = [] img = [(np.array(Image.fromarray(im).resize((self.inception_size_x, self.inception_size_y)), dtype=np.float32) * 2 / 255. - 1.) for im in img] - for _im in batch_iterator(img, self.batch_size): + + @batching + def _encode(_, data): _, end_points_ = self.sess.run((self.logits, self.end_points), - feed_dict={self.inputs: _im}) - ret.append(end_points_[self.select_layer]) - return np.concatenate(ret, axis=0).astype(np.float32) + feed_dict={self.inputs: data}) + return end_points_[self.select_layer] + + return _encode(None, img).astype(np.float32) diff --git a/gnes/encoder/numeric/hash.py b/gnes/encoder/numeric/hash.py index be0264ee..52a4daca 100644 --- a/gnes/encoder/numeric/hash.py +++ b/gnes/encoder/numeric/hash.py @@ -23,6 +23,8 @@ class HashEncoder(BaseNumericEncoder): + batch_size = 2048 + def __init__(self, num_bytes: int, num_bits: int = 8, num_idx: int = 3, @@ -105,7 +107,7 @@ def hash(self, vecs): return np.concatenate(ret, axis=1).astype(np.uint32) @train_required - @batching(batch_size=2048) + @batching def encode(self, vecs: np.ndarray, *args, **kwargs) -> np.ndarray: if vecs.shape[1] != self.vec_dim: raise ValueError('input dimension error') diff --git a/gnes/encoder/numeric/pca.py b/gnes/encoder/numeric/pca.py index 2ee1cd1c..bda9f946 100644 --- a/gnes/encoder/numeric/pca.py +++ b/gnes/encoder/numeric/pca.py @@ -23,6 +23,8 @@ class PCALocalEncoder(BaseNumericEncoder): + batch_size = 2048 + def __init__(self, output_dim: int, num_locals: int, *args, **kwargs): super().__init__(*args, **kwargs) @@ -32,7 +34,6 @@ def __init__(self, output_dim: int, num_locals: int, self.num_locals = num_locals self.pca_components = None self.mean = None - self.batch_size = 2048 @batching(batch_size=get_optimal_sample_size, num_batch=1) def train(self, vecs: np.ndarray, *args, **kwargs) -> None: diff --git a/gnes/encoder/numeric/pq.py b/gnes/encoder/numeric/pq.py index c5a3a401..c3ec8b9d 100644 --- a/gnes/encoder/numeric/pq.py +++ b/gnes/encoder/numeric/pq.py @@ -23,6 +23,8 @@ class PQEncoder(BaseBinaryEncoder): + batch_size = 2048 + def __init__(self, num_bytes: int, cluster_per_byte: int = 255, *args, **kwargs): super().__init__(*args, **kwargs) assert 1 < cluster_per_byte <= 255, 'cluster number should >1 and <= 255 (0 is reserved for NOP)' @@ -49,7 +51,7 @@ def train(self, vecs: np.ndarray, *args, **kwargs): dim_per_byte]) @train_required - @batching(batch_size=2048) + @batching def encode(self, vecs: np.ndarray, *args, **kwargs) -> np.ndarray: dim_per_byte = self._get_dim_per_byte(vecs) diff --git a/gnes/encoder/numeric/tf_pq.py b/gnes/encoder/numeric/tf_pq.py index 1d051f11..377049da 100644 --- a/gnes/encoder/numeric/tf_pq.py +++ b/gnes/encoder/numeric/tf_pq.py @@ -25,15 +25,13 @@ class TFPQEncoder(PQEncoder): + batch_size = 8192 + @classmethod def pre_init(cls): import os os.environ['CUDA_VISIBLE_DEVICES'] = str(get_first_available_gpu()) - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.batch_size = 8192 - def post_init(self): import tensorflow as tf self._graph = self._get_graph() diff --git a/gnes/encoder/text/elmo.py b/gnes/encoder/text/elmo.py index b8eedca1..2186a3cb 100644 --- a/gnes/encoder/text/elmo.py +++ b/gnes/encoder/text/elmo.py @@ -26,14 +26,14 @@ class ElmoEncoder(BaseTextEncoder): is_trained = True + batch_size = 64 - def __init__(self, model_dir: str, batch_size: int = 64, pooling_layer: int = -1, + def __init__(self, model_dir: str, pooling_layer: int = -1, pooling_strategy: str = 'REDUCE_MEAN', *args, **kwargs): super().__init__(*args, **kwargs) self.model_dir = model_dir - self.batch_size = batch_size if pooling_layer > 2: raise ValueError('pooling_layer = %d is not supported now!' % pooling_layer) diff --git a/gnes/encoder/text/flair.py b/gnes/encoder/text/flair.py index aacc95c3..38c55ea9 100644 --- a/gnes/encoder/text/flair.py +++ b/gnes/encoder/text/flair.py @@ -28,13 +28,10 @@ class FlairEncoder(BaseTextEncoder): is_trained = True def __init__(self, model_name: str = 'multi-forward-fast', - batch_size: int = 64, pooling_strategy: str = 'REDUCE_MEAN', *args, **kwargs): super().__init__(*args, **kwargs) self.model_name = model_name - - self.batch_size = batch_size self.pooling_strategy = pooling_strategy def post_init(self): diff --git a/gnes/encoder/text/gpt.py b/gnes/encoder/text/gpt.py index dcbd8455..4d712fc2 100644 --- a/gnes/encoder/text/gpt.py +++ b/gnes/encoder/text/gpt.py @@ -26,10 +26,10 @@ class GPTEncoder(BaseTextEncoder): is_trained = True + batch_size = 64 def __init__(self, model_dir: str, - batch_size: int = 64, use_cuda: bool = False, pooling_strategy: str = 'REDUCE_MEAN', *args, @@ -37,7 +37,6 @@ def __init__(self, super().__init__(*args, **kwargs) self.model_dir = model_dir - self.batch_size = batch_size self.pooling_strategy = pooling_strategy self._use_cuda = use_cuda diff --git a/gnes/encoder/text/w2v.py b/gnes/encoder/text/w2v.py index 493b9bcb..45725cf1 100644 --- a/gnes/encoder/text/w2v.py +++ b/gnes/encoder/text/w2v.py @@ -29,13 +29,11 @@ class Word2VecEncoder(BaseTextEncoder): def __init__(self, model_dir: str, skiprows: int = 1, - batch_size: int = 64, dimension: int = 300, pooling_strategy: str = 'REDUCE_MEAN', *args, **kwargs): super().__init__(*args, **kwargs) self.model_dir = model_dir self.skiprows = skiprows - self.batch_size = batch_size self.pooling_strategy = pooling_strategy self.dimension = dimension diff --git a/gnes/encoder/video/incep_mixture.py b/gnes/encoder/video/incep_mixture.py index 2c5a8a3f..cab05042 100644 --- a/gnes/encoder/video/incep_mixture.py +++ b/gnes/encoder/video/incep_mixture.py @@ -19,14 +19,14 @@ from PIL import Image from ..base import BaseVideoEncoder -from ...helper import batching, batch_iterator, get_first_available_gpu +from ...helper import batching, get_first_available_gpu class IncepMixtureEncoder(BaseVideoEncoder): + batch_size = 64 def __init__(self, model_dir_inception: str, model_dir_mixture: str, - batch_size: int = 64, select_layer: str = 'PreLogitsFlatten', use_cuda: bool = False, feature_size: int = 300, @@ -41,7 +41,6 @@ def __init__(self, model_dir_inception: str, super().__init__(*args, **kwargs) self.model_dir_inception = model_dir_inception self.model_dir_mixture = model_dir_mixture - self.batch_size = batch_size self.select_layer = select_layer self.use_cuda = use_cuda self.cluster_size = cluster_size @@ -102,31 +101,33 @@ def post_init(self): self.sess2.run(tf.global_variables_initializer()) saver.restore(self.sess2, self.model_dir_mixture) - @batching def encode(self, data: List['np.ndarray'], *args, **kwargs) -> np.ndarray: ret = [] v_len = [len(v) for v in data] - pos_start = [0] + [sum(v_len[:i+1]) for i in range(len(v_len)-1)] - pos_end = [sum(v_len[:i+1]) for i in range(len(v_len))] + pos_start = [0] + [sum(v_len[:i + 1]) for i in range(len(v_len) - 1)] + pos_end = [sum(v_len[:i + 1]) for i in range(len(v_len))] max_len = min(max(v_len), self.max_frames) img = [im for v in data for im in v] img = [(np.array(Image.fromarray(im).resize((self.inception_size_x, self.inception_size_y)), dtype=np.float32) * 2 / 255. - 1.) for im in img] - for _im in batch_iterator(img, self.batch_size): + + @batching(concat_axis=None) + def _encode1(_, data): _, end_points_ = self.sess.run((self.logits, self.end_points), - feed_dict={self.inputs: _im}) - ret.append(end_points_[self.select_layer]) - v = [_ for vi in ret for _ in vi] + feed_dict={self.inputs: data}) + return end_points_[self.select_layer] + + v = [_ for vi in _encode1(None, img) for _ in vi] v_input = [v[s:e] for s, e in zip(pos_start, pos_end)] - v_input = [(vi + [[0.0]*self.input_size]*(max_len-len(vi)))[:max_len] for vi in v_input] + v_input = [(vi + [[0.0] * self.input_size] * (max_len - len(vi)))[:max_len] for vi in v_input] v_input = [np.array(vi, dtype=np.float32) for vi in v_input] - ret = [] - for _vi in batch_iterator(v_input, self.batch_size): - repre = self.sess2.run(self.mix_model.repre, - feed_dict={self.mix_model.feeds: v_input}) - ret.append(repre) - return np.concatenate(ret, axis=1).astype(np.float32) + @batching + def _encode2(_, data): + return self.sess2.run(self.mix_model.repre, + feed_dict={self.mix_model.feeds: data}) + + return _encode2(None, v_input).astype(np.float32) diff --git a/gnes/helper.py b/gnes/helper.py index f732ca68..522b8cc7 100644 --- a/gnes/helper.py +++ b/gnes/helper.py @@ -375,7 +375,7 @@ def pooling_torch(data_tensor, mask_tensor, pooling_strategy): def batching(func: Callable[[Any], np.ndarray] = None, *, batch_size: Union[int, Callable] = None, num_batch=None, - axis: int = 0): + iter_axis: int = 0, concat_axis: int = 0): def _batching(func): @wraps(func) def arg_wrapper(self, data, label=None, *args, **kwargs): @@ -391,9 +391,9 @@ def arg_wrapper(self, data, label=None, *args, **kwargs): if hasattr(self, 'logger'): self.logger.info( 'batching enabled for %s(). batch_size=%s\tnum_batch=%s\taxis=%s' % ( - func.__qualname__, b_size, num_batch, axis)) + func.__qualname__, b_size, num_batch, iter_axis)) - total_size1 = get_size(data, axis) + total_size1 = get_size(data, iter_axis) total_size2 = b_size * num_batch if num_batch else None if total_size1 is not None and total_size2 is not None: @@ -401,42 +401,25 @@ def arg_wrapper(self, data, label=None, *args, **kwargs): else: total_size = total_size1 or total_size2 - final_result = None + final_result = [] - done_size = 0 if label is not None: data = (data, label) - for b in batch_iterator(data[:total_size], b_size, axis): + + for b in batch_iterator(data[:total_size], b_size, iter_axis): if label is None: r = func(self, b, *args, **kwargs) else: r = func(self, b[0], b[1], *args, **kwargs) - if isinstance(r, np.ndarray): - # first result kicks in - if final_result is None: - if total_size is None: - final_result = [] - else: - d_shape = list(r.shape) - d_shape[axis] = total_size - final_result = np.zeros(d_shape, dtype=r.dtype) - - # fill the data into final_result - cur_size = get_size(r) - - if total_size is None: - final_result.append(r) - else: - final_result[done_size:(done_size + cur_size)] = r - - done_size += cur_size - - if total_size is not None and done_size >= total_size: - break - - if isinstance(final_result, list): - final_result = np.concatenate(final_result, 0) - return final_result + + if r is not None: + final_result.append(r) + + if len(final_result) and concat_axis is not None and isinstance(final_result[0], np.ndarray): + final_result = np.concatenate(final_result, concat_axis) + + if len(final_result): + return final_result return arg_wrapper diff --git a/gnes/router/map/simple.py b/gnes/router/map/simple.py index 1c2ee97f..08855dbd 100644 --- a/gnes/router/map/simple.py +++ b/gnes/router/map/simple.py @@ -32,9 +32,6 @@ def apply(self, msg: 'gnes_pb2.Message', *args, **kwargs) -> Generator: class DocBatchRouter(BaseMapRouter): - def __init__(self, batch_size: int, *args, **kwargs): - super().__init__(*args, **kwargs) - self.batch_size = batch_size def apply(self, msg: 'gnes_pb2.Message', *args, **kwargs) -> Generator: if self.batch_size and self.batch_size > 0: diff --git a/tests/test_batching.py b/tests/test_batching.py index e44385d6..b5f05065 100644 --- a/tests/test_batching.py +++ b/tests/test_batching.py @@ -90,3 +90,12 @@ def _test_fn(fn): _test_fn(b.foo2) _test_fn(b.foo3) self.assertEqual(b.train([1]), None) + + def test_mini_batch(self): + x = list(range(10)) + + @batching(batch_size=4) + def _do_mini_batch(_, y): + return y + + self.assertEqual(_do_mini_batch(None, x), [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]) From c430ef64eaa6960a9768b006c1959630ae4f18d4 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Fri, 2 Aug 2019 16:30:10 +0800 Subject: [PATCH 4/4] refactor(base): better batch_size control --- gnes/encoder/video/incep_mixture.py | 1 - tests/yaml/router-batch.yml | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/gnes/encoder/video/incep_mixture.py b/gnes/encoder/video/incep_mixture.py index cab05042..b3991ca1 100644 --- a/gnes/encoder/video/incep_mixture.py +++ b/gnes/encoder/video/incep_mixture.py @@ -102,7 +102,6 @@ def post_init(self): saver.restore(self.sess2, self.model_dir_mixture) def encode(self, data: List['np.ndarray'], *args, **kwargs) -> np.ndarray: - ret = [] v_len = [len(v) for v in data] pos_start = [0] + [sum(v_len[:i + 1]) for i in range(len(v_len) - 1)] pos_end = [sum(v_len[:i + 1]) for i in range(len(v_len))] diff --git a/tests/yaml/router-batch.yml b/tests/yaml/router-batch.yml index 597c11e3..2f4000c6 100644 --- a/tests/yaml/router-batch.yml +++ b/tests/yaml/router-batch.yml @@ -1,3 +1,3 @@ !DocBatchRouter -parameter: +gnes_config: batch_size: 2 \ No newline at end of file