
refactor(base): better batch_size control
hanhxiao committed Aug 2, 2019
1 parent 58217d8 commit 8b72fd9
Showing 17 changed files with 79 additions and 97 deletions.
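The pattern repeated across these files: `batch_size` stops being a constructor argument that every encoder plumbs through `__init__` and becomes a class attribute, which the refactored `@batching` decorator in `gnes/helper.py` reads from the instance. A minimal sketch of the idea with a hypothetical `ToyEncoder` (not a real GNES class), assuming `@batching` falls back to `self.batch_size`, which is what the encoder changes below rely on:

    import numpy as np

    from gnes.helper import batching


    class ToyEncoder:
        # class-level default; subclasses simply override the attribute
        # (e.g. TFPQEncoder below uses 8192 where PQEncoder uses 2048)
        batch_size = 4

        @batching  # no per-call-site @batching(batch_size=...) any more
        def encode(self, vecs: np.ndarray) -> np.ndarray:
            # invoked once per mini-batch of self.batch_size rows;
            # ndarray results are concatenated along concat_axis=0
            return vecs * 2


    print(ToyEncoder().encode(np.ones((10, 3))).shape)  # (10, 3), fed in batches of 4, 4, 2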
4 changes: 1 addition & 3 deletions gnes/cli/parser.py
@@ -79,9 +79,7 @@ def set_composer_flask_parser(parser=None):
     parser = set_base_parser()
     set_composer_parser(parser)
     parser.add_argument('--flask', action='store_true', default=False,
-                        help='using Flask to serve GNES composer in interactive mode')
-    parser.add_argument('--cors', type=str, default='*',
-                        help='setting "Access-Control-Allow-Origin" for HTTP requests')
+                        help='using Flask to serve a composer in interactive mode, aka GNES board')
     parser.add_argument('--http_port', type=int, default=8080,
                         help='server port for receiving HTTP requests')
     return parser
5 changes: 1 addition & 4 deletions gnes/composer/flask.py
@@ -28,20 +28,18 @@ def _create_flask_app(self):
         try:
             from flask import Flask, request
             from flask_compress import Compress
-            from flask_cors import CORS
         except ImportError:
             raise ImportError('Flask or its dependencies are not fully installed, '
                               'they are required for serving HTTP requests.'
                               'Please use "pip install gnes[http]" to install it.')
 
         # support up to 10 concurrent HTTP requests
         app = Flask(__name__)
         args = set_composer_parser().parse_args([])
         default_html = YamlComposer(args).build_all()['html']
 
         @app.errorhandler(500)
         def exception_handler(error):
-            self.logger.error('unhandled error, i better quit and restart!')
+            self.logger.error('unhandled error, i better quit and restart! %s' % error)
             return '<h1>500 Internal Error</h1> ' \
                    'While we are fixing the issue, do you know you can deploy GNES board locally on your machine? ' \
                    'Simply run <pre>docker run -d -p 0.0.0.0:80:8080/tcp gnes/gnes compose --flask</pre>', 500
@@ -66,7 +64,6 @@ def _regenerate():
                 self.logger.error(e)
                 return '<h1>Bad YAML input</h1> please kindly check the format, indent and content of your YAML file!', 400
 
-        CORS(app, origins=self.args.cors)
         Compress().init_app(app)
         return app

3 changes: 1 addition & 2 deletions gnes/encoder/image/base.py
@@ -23,16 +23,15 @@
 
 
 class BasePytorchEncoder(BaseImageEncoder):
+    batch_size = 64
 
     def __init__(self, model_name: str,
                  layers: List[str],
                  model_dir: str,
-                 batch_size: int = 64,
                  use_cuda: bool = False,
                  *args, **kwargs):
         super().__init__(*args, **kwargs)
 
-        self.batch_size = batch_size
         self.model_dir = model_dir
         self.model_name = model_name
         self.layers = layers
23 changes: 12 additions & 11 deletions gnes/encoder/image/cvae.py
@@ -19,14 +19,13 @@
 from PIL import Image
 
 from ..base import BaseImageEncoder
-from ...helper import batch_iterator
 
 
 class CVAEEncoder(BaseImageEncoder):
+    batch_size = 64
 
     def __init__(self, model_dir: str,
                  latent_dim: int = 300,
-                 batch_size: int = 64,
                  select_method: str = 'MEAN',
                  l2_normalize: bool = False,
                  use_gpu: bool = True,
@@ -35,7 +34,6 @@ def __init__(self, model_dir: str,
 
         self.model_dir = model_dir
         self.latent_dim = latent_dim
-        self.batch_size = batch_size
         self.select_method = select_method
         self.l2_normalize = l2_normalize
         self.use_gpu = use_gpu
@@ -59,19 +57,22 @@ def post_init(self):
         self.saver.restore(self.sess, self.model_dir)
 
     def encode(self, img: List['np.ndarray'], *args, **kwargs) -> np.ndarray:
-        ret = []
         img = [(np.array(Image.fromarray(im).resize((120, 120)),
                          dtype=np.float32) / 255) for im in img]
-        for _im in batch_iterator(img, self.batch_size):
+
+        def _encode(_, data):
             _mean, _var = self.sess.run((self.mean, self.var),
-                                        feed_dict={self.inputs: _im})
+                                        feed_dict={self.inputs: data})
             if self.select_method == 'MEAN':
-                ret.append(_mean)
+                return _mean
             elif self.select_method == 'VAR':
-                ret.append(_var)
+                return _var
             elif self.select_method == 'MEAN_VAR':
-                ret.append(np.concatenate([_mean, _var]), axis=1)
-        v = np.concatenate(ret, axis=0).astype(np.float32)
+                return np.concatenate([_mean, _var], axis=1)
+            else:
+                raise NotImplementedError
+
+        v = _encode(None, img).astype(np.float32)
         if self.l2_normalize:
-            v = v / (v**2).sum(axis=1, keepdims=True)**0.5
+            v = v / (v ** 2).sum(axis=1, keepdims=True) ** 0.5
         return v
18 changes: 9 additions & 9 deletions gnes/encoder/image/inception.py
@@ -19,20 +19,19 @@
 from PIL import Image
 
 from ..base import BaseImageEncoder
-from ...helper import batching, batch_iterator, get_first_available_gpu
+from ...helper import batching, get_first_available_gpu
 
 
 class TFInceptionEncoder(BaseImageEncoder):
+    batch_size = 64
 
     def __init__(self, model_dir: str,
-                 batch_size: int = 64,
                  select_layer: str = 'PreLogitsFlatten',
                  use_cuda: bool = False,
                  *args, **kwargs):
         super().__init__(*args, **kwargs)
 
         self.model_dir = model_dir
-        self.batch_size = batch_size
         self.select_layer = select_layer
         self._use_cuda = use_cuda
         self.inception_size_x = 299
@@ -64,14 +63,15 @@ def post_init(self):
         self.saver = tf.train.Saver()
         self.saver.restore(self.sess, self.model_dir)
 
-    @batching
     def encode(self, img: List['np.ndarray'], *args, **kwargs) -> np.ndarray:
-        ret = []
         img = [(np.array(Image.fromarray(im).resize((self.inception_size_x,
                          self.inception_size_y)), dtype=np.float32) * 2 / 255. - 1.) for im
                in img]
-        for _im in batch_iterator(img, self.batch_size):
+
+        @batching
+        def _encode(_, data):
             _, end_points_ = self.sess.run((self.logits, self.end_points),
-                                           feed_dict={self.inputs: _im})
-            ret.append(end_points_[self.select_layer])
-        return np.concatenate(ret, axis=0).astype(np.float32)
+                                           feed_dict={self.inputs: data})
+            return end_points_[self.select_layer]
+
+        return _encode(None, img).astype(np.float32)
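Note the idiom introduced here (and in `cvae.py` and `incep_mixture.py` below): `batching` wraps a method-style callable whose first parameter is the `self` slot, so the nested `_encode` takes a throwaway `_` and is called as `_encode(None, img)`. Preprocessing (resize and rescale) now runs once over the whole input, and only the TensorFlow session run is mini-batched.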
4 changes: 3 additions & 1 deletion gnes/encoder/numeric/hash.py
@@ -23,6 +23,8 @@
 
 
 class HashEncoder(BaseNumericEncoder):
+    batch_size = 2048
+
     def __init__(self, num_bytes: int,
                  num_bits: int = 8,
                  num_idx: int = 3,
@@ -105,7 +107,7 @@ def hash(self, vecs):
         return np.concatenate(ret, axis=1).astype(np.uint32)
 
     @train_required
-    @batching(batch_size=2048)
+    @batching
     def encode(self, vecs: np.ndarray, *args, **kwargs) -> np.ndarray:
         if vecs.shape[1] != self.vec_dim:
             raise ValueError('input dimension error')
3 changes: 2 additions & 1 deletion gnes/encoder/numeric/pca.py
@@ -23,6 +23,8 @@
 
 
 class PCALocalEncoder(BaseNumericEncoder):
+    batch_size = 2048
+
     def __init__(self, output_dim: int, num_locals: int,
                  *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -32,7 +34,6 @@ def __init__(self, output_dim: int, num_locals: int,
         self.num_locals = num_locals
         self.pca_components = None
         self.mean = None
-        self.batch_size = 2048
 
     @batching(batch_size=get_optimal_sample_size, num_batch=1)
     def train(self, vecs: np.ndarray, *args, **kwargs) -> None:
4 changes: 3 additions & 1 deletion gnes/encoder/numeric/pq.py
@@ -23,6 +23,8 @@
 
 
 class PQEncoder(BaseBinaryEncoder):
+    batch_size = 2048
+
     def __init__(self, num_bytes: int, cluster_per_byte: int = 255, *args, **kwargs):
         super().__init__(*args, **kwargs)
         assert 1 < cluster_per_byte <= 255, 'cluster number should >1 and <= 255 (0 is reserved for NOP)'
@@ -49,7 +51,7 @@ def train(self, vecs: np.ndarray, *args, **kwargs):
                                                dim_per_byte])
 
     @train_required
-    @batching(batch_size=2048)
+    @batching
     def encode(self, vecs: np.ndarray, *args, **kwargs) -> np.ndarray:
         dim_per_byte = self._get_dim_per_byte(vecs)
 
6 changes: 2 additions & 4 deletions gnes/encoder/numeric/tf_pq.py
@@ -25,15 +25,13 @@
 
 
 class TFPQEncoder(PQEncoder):
+    batch_size = 8192
+
     @classmethod
     def pre_init(cls):
         import os
         os.environ['CUDA_VISIBLE_DEVICES'] = str(get_first_available_gpu())
 
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.batch_size = 8192
-
     def post_init(self):
         import tensorflow as tf
         self._graph = self._get_graph()
4 changes: 2 additions & 2 deletions gnes/encoder/text/elmo.py
@@ -26,14 +26,14 @@
 
 class ElmoEncoder(BaseTextEncoder):
     is_trained = True
+    batch_size = 64
 
-    def __init__(self, model_dir: str, batch_size: int = 64, pooling_layer: int = -1,
+    def __init__(self, model_dir: str, pooling_layer: int = -1,
                  pooling_strategy: str = 'REDUCE_MEAN', *args, **kwargs):
         super().__init__(*args, **kwargs)
 
         self.model_dir = model_dir
 
-        self.batch_size = batch_size
         if pooling_layer > 2:
             raise ValueError('pooling_layer = %d is not supported now!' %
                              pooling_layer)
3 changes: 0 additions & 3 deletions gnes/encoder/text/flair.py
@@ -28,13 +28,10 @@ class FlairEncoder(BaseTextEncoder):
     is_trained = True
 
     def __init__(self, model_name: str = 'multi-forward-fast',
-                 batch_size: int = 64,
                  pooling_strategy: str = 'REDUCE_MEAN', *args, **kwargs):
         super().__init__(*args, **kwargs)
 
         self.model_name = model_name
-
-        self.batch_size = batch_size
         self.pooling_strategy = pooling_strategy
 
     def post_init(self):
3 changes: 1 addition & 2 deletions gnes/encoder/text/gpt.py
@@ -26,18 +26,17 @@
 
 class GPTEncoder(BaseTextEncoder):
     is_trained = True
+    batch_size = 64
 
     def __init__(self,
                  model_dir: str,
-                 batch_size: int = 64,
                  use_cuda: bool = False,
                  pooling_strategy: str = 'REDUCE_MEAN',
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
 
         self.model_dir = model_dir
-        self.batch_size = batch_size
         self.pooling_strategy = pooling_strategy
         self._use_cuda = use_cuda
 
2 changes: 0 additions & 2 deletions gnes/encoder/text/w2v.py
@@ -29,13 +29,11 @@ class Word2VecEncoder(BaseTextEncoder):
 
     def __init__(self, model_dir: str,
                  skiprows: int = 1,
-                 batch_size: int = 64,
                  dimension: int = 300,
                  pooling_strategy: str = 'REDUCE_MEAN', *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.model_dir = model_dir
         self.skiprows = skiprows
-        self.batch_size = batch_size
         self.pooling_strategy = pooling_strategy
         self.dimension = dimension
 
35 changes: 18 additions & 17 deletions gnes/encoder/video/incep_mixture.py
@@ -19,14 +19,14 @@
 from PIL import Image
 
 from ..base import BaseVideoEncoder
-from ...helper import batching, batch_iterator, get_first_available_gpu
+from ...helper import batching, get_first_available_gpu
 
 
 class IncepMixtureEncoder(BaseVideoEncoder):
+    batch_size = 64
 
     def __init__(self, model_dir_inception: str,
                  model_dir_mixture: str,
-                 batch_size: int = 64,
                  select_layer: str = 'PreLogitsFlatten',
                  use_cuda: bool = False,
                  feature_size: int = 300,
@@ -41,7 +41,6 @@ def __init__(self, model_dir_inception: str,
         super().__init__(*args, **kwargs)
         self.model_dir_inception = model_dir_inception
         self.model_dir_mixture = model_dir_mixture
-        self.batch_size = batch_size
         self.select_layer = select_layer
         self.use_cuda = use_cuda
         self.cluster_size = cluster_size
@@ -102,31 +101,33 @@ def post_init(self):
         self.sess2.run(tf.global_variables_initializer())
         saver.restore(self.sess2, self.model_dir_mixture)
 
     @batching
     def encode(self, data: List['np.ndarray'], *args, **kwargs) -> np.ndarray:
-        ret = []
         v_len = [len(v) for v in data]
-        pos_start = [0] + [sum(v_len[:i+1]) for i in range(len(v_len)-1)]
-        pos_end = [sum(v_len[:i+1]) for i in range(len(v_len))]
+        pos_start = [0] + [sum(v_len[:i + 1]) for i in range(len(v_len) - 1)]
+        pos_end = [sum(v_len[:i + 1]) for i in range(len(v_len))]
         max_len = min(max(v_len), self.max_frames)
 
         img = [im for v in data for im in v]
         img = [(np.array(Image.fromarray(im).resize((self.inception_size_x,
                          self.inception_size_y)), dtype=np.float32) * 2 / 255. - 1.) for im
                in img]
-        for _im in batch_iterator(img, self.batch_size):
+
+        @batching(concat_axis=None)
+        def _encode1(_, data):
             _, end_points_ = self.sess.run((self.logits, self.end_points),
-                                           feed_dict={self.inputs: _im})
-            ret.append(end_points_[self.select_layer])
-        v = [_ for vi in ret for _ in vi]
+                                           feed_dict={self.inputs: data})
+            return end_points_[self.select_layer]
+
+        v = [_ for vi in _encode1(None, img) for _ in vi]
 
         v_input = [v[s:e] for s, e in zip(pos_start, pos_end)]
-        v_input = [(vi + [[0.0]*self.input_size]*(max_len-len(vi)))[:max_len] for vi in v_input]
+        v_input = [(vi + [[0.0] * self.input_size] * (max_len - len(vi)))[:max_len] for vi in v_input]
         v_input = [np.array(vi, dtype=np.float32) for vi in v_input]
 
-        ret = []
-        for _vi in batch_iterator(v_input, self.batch_size):
-            repre = self.sess2.run(self.mix_model.repre,
-                                   feed_dict={self.mix_model.feeds: v_input})
-            ret.append(repre)
-        return np.concatenate(ret, axis=1).astype(np.float32)
+        @batching
+        def _encode2(_, data):
+            return self.sess2.run(self.mix_model.repre,
+                                  feed_dict={self.mix_model.feeds: data})
+
+        return _encode2(None, v_input).astype(np.float32)
47 changes: 15 additions & 32 deletions gnes/helper.py
@@ -375,7 +375,7 @@ def pooling_torch(data_tensor, mask_tensor, pooling_strategy):
 
 def batching(func: Callable[[Any], np.ndarray] = None, *,
              batch_size: Union[int, Callable] = None, num_batch=None,
-             axis: int = 0):
+             iter_axis: int = 0, concat_axis: int = 0):
     def _batching(func):
         @wraps(func)
         def arg_wrapper(self, data, label=None, *args, **kwargs):
@@ -391,52 +391,35 @@ def arg_wrapper(self, data, label=None, *args, **kwargs):
             if hasattr(self, 'logger'):
                 self.logger.info(
                     'batching enabled for %s(). batch_size=%s\tnum_batch=%s\taxis=%s' % (
-                        func.__qualname__, b_size, num_batch, axis))
+                        func.__qualname__, b_size, num_batch, iter_axis))
 
-            total_size1 = get_size(data, axis)
+            total_size1 = get_size(data, iter_axis)
             total_size2 = b_size * num_batch if num_batch else None
 
             if total_size1 is not None and total_size2 is not None:
                 total_size = min(total_size1, total_size2)
             else:
                 total_size = total_size1 or total_size2
 
-            final_result = None
+            final_result = []
 
             done_size = 0
             if label is not None:
                 data = (data, label)
-            for b in batch_iterator(data[:total_size], b_size, axis):
+
+            for b in batch_iterator(data[:total_size], b_size, iter_axis):
                 if label is None:
                     r = func(self, b, *args, **kwargs)
                 else:
                     r = func(self, b[0], b[1], *args, **kwargs)
-                if isinstance(r, np.ndarray):
-                    # first result kicks in
-                    if final_result is None:
-                        if total_size is None:
-                            final_result = []
-                        else:
-                            d_shape = list(r.shape)
-                            d_shape[axis] = total_size
-                            final_result = np.zeros(d_shape, dtype=r.dtype)
-
-                    # fill the data into final_result
-                    cur_size = get_size(r)
-
-                    if total_size is None:
-                        final_result.append(r)
-                    else:
-                        final_result[done_size:(done_size + cur_size)] = r
-
-                    done_size += cur_size
-
-                    if total_size is not None and done_size >= total_size:
-                        break
-
-            if isinstance(final_result, list):
-                final_result = np.concatenate(final_result, 0)
-            return final_result
+
+                if r is not None:
+                    final_result.append(r)
+
+            if len(final_result) and concat_axis is not None and isinstance(final_result[0], np.ndarray):
+                final_result = np.concatenate(final_result, concat_axis)
+
+            if len(final_result):
+                return final_result
 
         return arg_wrapper
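The new contract, illustrated with hypothetical decorated functions (not from the repo): `iter_axis` controls how the input is sliced, `concat_axis` controls how per-batch results are combined. ndarray results are concatenated along `concat_axis`, `concat_axis=None` skips concatenation and returns the raw list of per-batch results, and non-ndarray results always come back as a list:

    import numpy as np

    from gnes.helper import batching


    @batching(batch_size=4)
    def double(_, vecs: np.ndarray) -> np.ndarray:
        # ndarray results: concatenated along concat_axis=0 (the default)
        return vecs * 2


    @batching(batch_size=4, concat_axis=None)
    def chunks(_, vecs: np.ndarray):
        # concat_axis=None: the list of per-batch arrays is returned as-is
        return vecs * 2


    x = np.ones((10, 3))
    print(double(None, x).shape)               # (10, 3)
    print([c.shape for c in chunks(None, x)])  # [(4, 3), (4, 3), (2, 3)]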

3 changes: 0 additions & 3 deletions gnes/router/map/simple.py
@@ -32,9 +32,6 @@ def apply(self, msg: 'gnes_pb2.Message', *args, **kwargs) -> Generator:
 
 
 class DocBatchRouter(BaseMapRouter):
-    def __init__(self, batch_size: int, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.batch_size = batch_size
 
     def apply(self, msg: 'gnes_pb2.Message', *args, **kwargs) -> Generator:
         if self.batch_size and self.batch_size > 0:
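With its `__init__` gone, `DocBatchRouter` no longer takes `batch_size` at construction time, yet `apply` still reads `self.batch_size`; the value is now presumably supplied as a class attribute or via YAML config on the base class (not visible in this diff).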
9 changes: 9 additions & 0 deletions tests/test_batching.py
@@ -90,3 +90,12 @@ def _test_fn(fn):
         _test_fn(b.foo2)
         _test_fn(b.foo3)
         self.assertEqual(b.train([1]), None)
+
+    def test_mini_batch(self):
+        x = list(range(10))
+
+        @batching(batch_size=4)
+        def _do_mini_batch(_, y):
+            return y
+
+        self.assertEqual(_do_mini_batch(None, x), [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]])
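The expected value doubles as documentation of the refactored decorator: each mini-batch result here is a plain Python list rather than an ndarray, so the concatenation step in `batching` is skipped and the results come back as a list of three mini-batches.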
