diff --git a/gnes/cli/parser.py b/gnes/cli/parser.py
index b8680890..4265432c 100644
--- a/gnes/cli/parser.py
+++ b/gnes/cli/parser.py
@@ -106,7 +106,7 @@ def set_composer_flask_parser(parser=None):
 
 
 def set_service_parser(parser=None):
-    from ..service.base import SocketType, BaseService
+    from ..service.base import SocketType, BaseService, ParallelType
     import random
     if not parser:
         parser = set_base_parser()
@@ -134,8 +134,15 @@ def set_service_parser(parser=None):
     parser.add_argument('--read_only', action='store_true', default=False,
                         help='do not allow the service to modify the model, '
                              'dump_interval will be ignored')
-    parser.add_argument('--concurrency_backend', type=str, choices=['thread', 'process'], default='thread',
-                        help='concurrency backend of the service')
+    parser.add_argument('--parallel_backend', type=str, choices=['thread', 'process'], default='thread',
+                        help='parallel backend of the service')
+    parser.add_argument('--num_parallel', type=int, default=1,
+                        help='number of parallel services running at the same time, '
+                             '`port_in` and `port_out` will be set to random, '
+                             'and routers will be added automatically when necessary')
+    parser.add_argument('--parallel_type', type=ParallelType.from_string, choices=list(ParallelType),
+                        default=ParallelType.PUSH_NONBLOCK,
+                        help='parallel type of the concurrent services')
     return parser
 
 
@@ -172,7 +179,6 @@ def set_preprocessor_service_parser(parser=None):
     if not parser:
         parser = set_base_parser()
     set_loadable_service_parser(parser)
-    parser.set_defaults(read_only=True)
     return parser
 
 
@@ -181,6 +187,8 @@ def set_router_service_parser(parser=None):
     if not parser:
         parser = set_base_parser()
     set_loadable_service_parser(parser)
+    parser.add_argument('--num_part', type=int, default=1,
+                        help='explicitly set the number of parts of message')
     parser.set_defaults(read_only=True)
     return parser
diff --git a/gnes/encoder/audio/mfcc.py b/gnes/encoder/audio/mfcc.py
index 7ff4fb44..e1d32c4a 100644
--- a/gnes/encoder/audio/mfcc.py
+++ b/gnes/encoder/audio/mfcc.py
@@ -15,10 +15,12 @@
 
 # pylint: disable=low-comment-ratio
 
+from typing import List
+
+import numpy as np
+
 from ..base import BaseAudioEncoder
 from ...helper import batching
-import numpy as np
-from typing import List
 
 
 class MfccEncoder(BaseAudioEncoder):
@@ -38,9 +40,8 @@ def encode(self, data: List['np.array'], *args, **kwargs) -> np.ndarray:
 
         max_lenth = max([len(mf) for mf in mfccs])
-        mfccs = [np.concatenate((mf, np.zeros((max_lenth - mf.shape[0], self.n_mfcc), dtype=np.float32)), axis=0) if mf.shape[0] < max_lenth else mf for mf in mfccs]
         mfccs = [mfcc.reshape((1, -1)) for mfcc in mfccs]
         mfccs = np.squeeze(np.array(mfccs), axis=1)
-        return mfccs
\ No newline at end of file
+        return mfccs
diff --git a/gnes/encoder/numeric/vlad.py b/gnes/encoder/numeric/vlad.py
index 4aefe8ba..425cdf49 100644
--- a/gnes/encoder/numeric/vlad.py
+++ b/gnes/encoder/numeric/vlad.py
@@ -16,8 +16,10 @@
 
 # pylint: disable=low-comment-ratio
 
-import numpy as np
 import copy
+
+import numpy as np
+
 from ..base import BaseNumericEncoder
 from ...helper import batching, train_required
diff --git a/gnes/encoder/video/mixture_core/model.py b/gnes/encoder/video/mixture_core/model.py
index 6a4a0631..cb29f0c5 100644
--- a/gnes/encoder/video/mixture_core/model.py
+++ b/gnes/encoder/video/mixture_core/model.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import math
+
 import tensorflow as tf
 import tensorflow.contrib.slim as slim
@@ -52,11 +53,11 @@ def __init__(self, feature_size,
 
     @staticmethod
     def rand_init(feature_size):
-        return tf.random_normal_initializer(stddev=1/math.sqrt(feature_size))
+        return tf.random_normal_initializer(stddev=1 / math.sqrt(feature_size))
 
     def build_model(self):
         self.feeds = tf.placeholder(tf.float32, [None, None, self.input_size])
-        #self.inputs = self.feeds
+        # self.inputs = self.feeds
         self.inputs = tf.layers.dense(self.feeds, self.feature_size)
         self.weights = tf.placeholder(tf.float32, [None, self.vocab_size])
         self.max_frames = tf.shape(self.inputs)[1]
@@ -83,7 +84,7 @@ def build_fvnet(self):
         covar_weights = tf.square(covar_weights)
         eps = tf.constant([1e-6])
-        covar_weights = tf.add(covar_weights,eps)
+        covar_weights = tf.add(covar_weights, eps)
 
         tf.summary.histogram("cluster_weights", cluster_weights)
         activation = tf.matmul(reshaped_input, cluster_weights)
@@ -111,7 +112,7 @@ def build_fvnet(self):
 
         a = tf.multiply(a_sum, cluster_weights2)
 
-        activation = tf.transpose(activation,perm=[0, 2, 1])
+        activation = tf.transpose(activation, perm=[0, 2, 1])
 
         reshaped_input = tf.reshape(reshaped_input, [-1, self.max_frames, self.feature_size])
@@ -131,15 +132,15 @@ def build_fvnet(self):
         fv2 = tf.divide(fv2, tf.square(covar_weights))
         fv2 = tf.subtract(fv2, a_sum)
-        fv2 = tf.reshape(fv2, [-1, self.cluster_size*self.feature_size])
+        fv2 = tf.reshape(fv2, [-1, self.cluster_size * self.feature_size])
 
         fv2 = tf.nn.l2_normalize(fv2, 1)
-        fv2 = tf.reshape(fv2, [-1, self.cluster_size*self.feature_size])
+        fv2 = tf.reshape(fv2, [-1, self.cluster_size * self.feature_size])
         fv2 = tf.nn.l2_normalize(fv2, 1)
 
         fv1 = tf.subtract(fv1, a)
         fv1 = tf.divide(fv1, covar_weights)
         fv1 = tf.nn.l2_normalize(fv1, 1)
-        fv1 = tf.reshape(fv1, [-1, self.cluster_size*self.feature_size])
+        fv1 = tf.reshape(fv1, [-1, self.cluster_size * self.feature_size])
         fv1 = tf.nn.l2_normalize(fv1, 1)
 
         self.repre = tf.concat([fv1, fv2], 1)
@@ -197,9 +198,9 @@ def build_loss(self):
         logits = tf.cast(self.label, tf.float32)
         if self.use_weights:
             logits = logits * self.weights
-        self.loss = - tf.log(tf.reduce_sum(logits * self.probabilities, axis=1)+1e-9)
+        self.loss = - tf.log(tf.reduce_sum(logits * self.probabilities, axis=1) + 1e-9)
         self.loss = tf.reduce_mean(self.loss)
-        self.pred =tf.argmax(self.probabilities, 1)
+        self.pred = tf.argmax(self.probabilities, 1)
         self.avg_diff = tf.cast(tf.equal(tf.argmax(self.label, 1), self.pred), tf.float32)
         self.avg_diff = tf.reduce_mean(self.avg_diff)
@@ -230,7 +231,7 @@ def build_loss(self):
             self.probabilities2 = tf.nn.softmax(self.probabilities2)
 
             self.loss += tf.reduce_mean(-tf.log(
-                tf.reduce_sum(logits2*self.probabilities2, axis=1)+1e-9))
+                tf.reduce_sum(logits2 * self.probabilities2, axis=1) + 1e-9))
             self.pred2 = tf.argmax(self.probabilities2, 1)
             self.avg_diff2 = tf.cast(tf.equal(tf.argmax(self.label_2, 1), self.pred2), tf.float32)
             self.avg_diff2 = tf.reduce_mean(self.avg_diff2)
@@ -242,4 +243,3 @@ def build_loss(self):
         self.eval_res = {'loss': self.loss, 'avg_diff': self.avg_diff}
         if self.use_2nd_label:
             self.eval_res['avg_diff2'] = self.avg_diff2
-
diff --git a/gnes/preprocessor/audio/audio_vanilla.py b/gnes/preprocessor/audio/audio_vanilla.py
index d40e7ca3..8f04abc4 100644
--- a/gnes/preprocessor/audio/audio_vanilla.py
+++ b/gnes/preprocessor/audio/audio_vanilla.py
@@ -13,8 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .base import BaseAudioPreprocessor
 import numpy as np
+
+from .base import BaseAudioPreprocessor
 from ..helper import get_video_length_from_raw, get_audio
 from ...proto import array2blob
@@ -43,4 +44,4 @@ def apply(self, doc: 'gnes_pb2.Document') -> None:
             else:
                 self.logger.info('bad document: no audio extracted')
         else:
-            self.logger.error('bad document: "raw_bytes" is empty!')
\ No newline at end of file
+            self.logger.error('bad document: "raw_bytes" is empty!')
diff --git a/gnes/preprocessor/helper.py b/gnes/preprocessor/helper.py
index 9d62ab6d..45ac6ebd 100644
--- a/gnes/preprocessor/helper.py
+++ b/gnes/preprocessor/helper.py
@@ -15,14 +15,15 @@
 
 # pylint: disable=low-comment-ratio
 
+import datetime
 import io
+import os
 import subprocess as sp
+from datetime import timedelta
 from typing import List, Callable
-import os
+
 import cv2
 import numpy as np
-import datetime
-from datetime import timedelta
 from PIL import Image
 
 from ..helper import set_logger
@@ -32,12 +33,13 @@ def get_video_length(video_path):
     import re
-    process = sp.Popen(['ffmpeg', '-i', video_path],
+    process = sp.Popen(['ffmpeg', '-i', video_path],
                        stdout=sp.PIPE, stderr=sp.STDOUT)
     stdout, _ = process.communicate()
     stdout = str(stdout)
-    matches = re.search(r"Duration:\s{1}(?P<hours>\d+?):(?P<minutes>\d+?):(?P<seconds>\d+\.\d+?),", stdout, re.DOTALL).groupdict()
+    matches = re.search(r"Duration:\s{1}(?P<hours>\d+?):(?P<minutes>\d+?):(?P<seconds>\d+\.\d+?),", stdout,
+                        re.DOTALL).groupdict()
     h = float(matches['hours'])
     m = float(matches['minutes'])
     s = float(matches['seconds'])
@@ -120,7 +122,9 @@ def split_mp4_random(video_path, avg_length, max_clip_second=10):
     prefix = os.path.basename(video_path).replace('.mp4', '')
     for i in range(num_part):
         i_len = len(ts_group[i])
-        cmd = 'ffmpeg' + ''.join(ts_group[i]) + '-filter_complex "{}concat=n={}:v=1:a=1" -strict -2 {}_{}.mp4 -y'.format(''.join(['[{}]'.format(k) for k in range(i_len)]), i_len, prefix, i)
+        cmd = 'ffmpeg' + ''.join(
+            ts_group[i]) + '-filter_complex "{}concat=n={}:v=1:a=1" -strict -2 {}_{}.mp4 -y'.format(
+            ''.join(['[{}]'.format(k) for k in range(i_len)]), i_len, prefix, i)
         os.system(cmd)
@@ -174,7 +178,8 @@ def get_video_frames(buffer_data: bytes, image_format: str = 'cv2',
                 cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                 frames.append(image)
             except Exception as e:
-                logger.warning("The decoded cv2 image from keyframe buffer can not be converted to RGB: %s" % str(e))
+                logger.warning(
+                    "The decoded cv2 image from keyframe buffer can not be converted to RGB: %s" % str(e))
     else:
         logger.error("The image format [%s] is not supported so far!"
                      % image_format)
         raise NotImplementedError
diff --git a/gnes/preprocessor/image/segmentation.py b/gnes/preprocessor/image/segmentation.py
index d66978c8..5bdda122 100644
--- a/gnes/preprocessor/image/segmentation.py
+++ b/gnes/preprocessor/image/segmentation.py
@@ -15,10 +15,10 @@
 
 import io
 import os
+from typing import List
 
 import numpy as np
 from PIL import Image
-from typing import List
 
 from .base import BaseImagePreprocessor
 from ...proto import array2blob
diff --git a/gnes/preprocessor/image/sliding_window.py b/gnes/preprocessor/image/sliding_window.py
index 63cadeaf..31ea4b65 100644
--- a/gnes/preprocessor/image/sliding_window.py
+++ b/gnes/preprocessor/image/sliding_window.py
@@ -90,7 +90,8 @@ def _get_all_sliding_window(self, img: 'np.ndarray'):
         return [np.array(Image.fromarray(img).resize((self.target_img_size, self.target_img_size)))
                 for img in expanded_input], center_point_list
 
-    def _get_slid_offset_nd(self, all_subareas: List[List[int]], index: List[List[int]], center_point: List[float]) -> List[int]:
+    def _get_slid_offset_nd(self, all_subareas: List[List[int]], index: List[List[int]], center_point: List[float]) -> \
+            List[int]:
         location_list = self._get_location(all_subareas, center_point)
         location = [i for i in range(len(location_list)) if location_list[i] is True][0]
         return index[location][:2]
@@ -104,9 +105,11 @@ def _get_location(all_subareas: List[List[int]], center_point: List[float]) -> L
            if center_point[0] in range(int(area[0]), int(area[2])) and center_point[1] in range(int(area[1]),
                                                                                                 int(area[3])):
                location_list.append(True)
-           elif center_point[0] in range(int(area[0]), int(area[2])) and y_boundary == area[3] and center_point[1] > y_boundary:
+           elif center_point[0] in range(int(area[0]), int(area[2])) and y_boundary == area[3] and center_point[
+               1] > y_boundary:
               location_list.append(True)
-           elif center_point[1] in range(int(area[1]), int(area[3])) and x_boundary == area[2] and center_point[0] > x_boundary:
+           elif center_point[1] in range(int(area[1]), int(area[3])) and x_boundary == area[2] and center_point[
+               0] > x_boundary:
               location_list.append(True)
           else:
               location_list.append(False)
diff --git a/gnes/preprocessor/video/ffmpeg.py b/gnes/preprocessor/video/ffmpeg.py
index 8dc7a07d..d2674933 100644
--- a/gnes/preprocessor/video/ffmpeg.py
+++ b/gnes/preprocessor/video/ffmpeg.py
@@ -131,13 +131,13 @@ def apply(self, doc: 'gnes_pb2.Document') -> None:
             if self.segment_interval == -1:
                 sub_videos = [frames]
             else:
-                sub_videos = [frames[_: _+self.segment_interval]
+                sub_videos = [frames[_: _ + self.segment_interval]
                               for _ in range(0, len(frames), self.segment_interval)]
         # cut by num: should specify how many chunks for each doc
         elif self.segment_method == 'cut_by_num':
             if self.segment_num >= 2:
-                _interval = int(len(frames)/self.segment_num)
-                sub_videos = [frames[_: _+_interval]
+                _interval = int(len(frames) / self.segment_num)
+                sub_videos = [frames[_: _ + _interval]
                               for _ in range(0, len(frames), _interval)]
             else:
                 sub_videos = [frames]
diff --git a/gnes/router/base.py b/gnes/router/base.py
index 174ab360..55b9368c 100644
--- a/gnes/router/base.py
+++ b/gnes/router/base.py
@@ -36,4 +36,6 @@ def apply(self, msg: 'gnes_pb2.Message', accum_msgs: List['gnes_pb2.Message'], *
         if len(msg.envelope.num_part) > 1:
             msg.envelope.num_part.pop()
         else:
-            self.logger.error('can not reduce the message further, as num_part="%s"' % msg.envelope.num_part)
+            self.logger.warning(
+                'message envelope says num_part=%s, means no further message reducing. '
+                'ignore this if you explicitly set "num_part" in RouterService' % msg.envelope.num_part)
diff --git a/gnes/service/base.py b/gnes/service/base.py
index b47925a6..e551dbd6 100644
--- a/gnes/service/base.py
+++ b/gnes/service/base.py
@@ -13,11 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import copy
 # pylint: disable=low-comment-ratio
 import multiprocessing
+import random
 import threading
 import time
 import types
+from contextlib import ExitStack
 from enum import Enum
 from typing import Tuple, List, Union, Type
@@ -26,6 +29,7 @@
 from termcolor import colored
 
 from ..base import TrainableBase, T
+from ..cli.parser import resolve_yaml_path
 from ..helper import set_logger
 from ..proto import gnes_pb2, add_route, send_message, recv_message
@@ -47,6 +51,21 @@ class ReduceOp(BetterEnum):
     ALWAYS_ONE = 1
 
 
+class ParallelType(BetterEnum):
+    PUSH_BLOCK = 0
+    PUSH_NONBLOCK = 1
+    PUB_BLOCK = 2
+    PUB_NONBLOCK = 3
+
+    @property
+    def is_push(self):
+        return self.value == 0 or self.value == 1
+
+    @property
+    def is_block(self):
+        return self.value == 0 or self.value == 2
+
+
 class SocketType(BetterEnum):
     PULL_BIND = 0
     PULL_CONNECT = 1
@@ -167,7 +186,7 @@ def __call__(cls, *args, **kwargs):
         _cls = {
             'thread': threading.Thread,
             'process': multiprocessing.Process
-        }[args[0].concurrency_backend]
+        }[args[0].parallel_backend]
 
         # rebuild the class according to mro
         for c in cls.mro()[-2::-1]:
@@ -384,3 +403,68 @@ def send_ctrl_message(address: str, msg: 'gnes_pb2.Message', timeout: int):
     r = recv_message(sock, timeout)
     sock.close()
     return r
+
+
+class ServiceManager:
+    def __init__(self, service_cls, args):
+        self.logger = set_logger(self.__class__.__name__, args.verbose)
+
+        self.services = []  # type: List['BaseService']
+        if args.num_parallel > 1:
+            from .router import RouterService
+            _head_router = copy.deepcopy(args)
+            _head_router.port_ctrl = self._get_random_port()
+            port_out = self._get_random_port()
+            _head_router.port_out = port_out
+
+            _tail_router = copy.deepcopy(args)
+            port_in = self._get_random_port()
+            _tail_router.port_in = port_in
+            _tail_router.port_ctrl = self._get_random_port()
+
+            _tail_router.socket_in = SocketType.PULL_BIND
+
+            if args.parallel_type.is_push:
+                _head_router.socket_out = SocketType.PUSH_BIND
+            else:
+                _head_router.socket_out = SocketType.PUB_BIND
+                _head_router.yaml_path = resolve_yaml_path(
+                    '!PublishRouter {parameter: {num_part: %d}}' % args.num_parallel)
+
+            if args.parallel_type.is_block:
+                _tail_router.yaml_path = resolve_yaml_path('BaseReduceRouter')
+                _tail_router.num_part = args.num_parallel
+
+            self.services.append(RouterService(_head_router))
+            self.services.append(RouterService(_tail_router))
+
+            for j in range(args.num_parallel):
+                _args = copy.deepcopy(args)
+                _args.port_in = port_out
+                _args.port_out = port_in
+                _args.port_ctrl = self._get_random_port()
+                _args.socket_out = SocketType.PUSH_CONNECT
+                if args.parallel_type.is_push:
+                    _args.socket_in = SocketType.PULL_CONNECT
+                else:
+                    _args.socket_in = SocketType.SUB_CONNECT
+                self.services.append(service_cls(_args))
+            self.logger.info('num_parallel=%d, add a router with port_in=%d and a router with port_out=%d' % (
+                args.num_parallel, _head_router.port_in, _tail_router.port_out))
+        else:
+            self.services.append(service_cls(args))
+
+    @staticmethod
+    def _get_random_port(min_port: int = 49152, max_port: int = 65536) -> int:
+        return random.randrange(min_port, max_port)
+
+    def __enter__(self):
+        self.stack = ExitStack()
+        for s in self.services:
+            self.stack.enter_context(s)
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.stack.close()
+
+    def join(self):
+        for s in self.services:
+            s.join()
diff --git a/gnes/service/grpc.py b/gnes/service/grpc.py
index f89fdf22..2e87a43c 100644
--- a/gnes/service/grpc.py
+++ b/gnes/service/grpc.py
@@ -64,10 +64,11 @@ def __init__(self, args):
         self.logger = set_logger(self.__class__.__name__, self.args.verbose)
         self.ctx = zmq.Context()
         self.ctx.setsockopt(zmq.LINGER, 0)
-        self.receiver, _ = build_socket(self.ctx, self.args.host_in, self.args.port_in, self.args.socket_in,
-                                        getattr(self, 'identity', None))
-        self.sender, _ = build_socket(self.ctx, self.args.host_out, self.args.port_out, self.args.socket_out,
-                                      getattr(self, 'identity', None))
+        self.receiver, recv_addr = build_socket(self.ctx, self.args.host_in, self.args.port_in, self.args.socket_in,
+                                                getattr(self, 'identity', None))
+        self.sender, send_addr = build_socket(self.ctx, self.args.host_out, self.args.port_out, self.args.socket_out,
+                                              getattr(self, 'identity', None))
+        self.logger.info('send via %s, receive via %s' % (send_addr, recv_addr))
 
     def __enter__(self):
         return self
diff --git a/gnes/service/router.py b/gnes/service/router.py
index 272f7682..89f2e593 100644
--- a/gnes/service/router.py
+++ b/gnes/service/router.py
@@ -32,7 +32,8 @@ def post_init(self):
         self._pending = defaultdict(list)  # type: Dict[str, List]
 
     def _is_msg_complete(self, msg: 'gnes_pb2.Message', num_req: int) -> bool:
-        return (hasattr(self.args, 'num_part') and num_req == self.args.num_part[-1]) or (num_req == msg.envelope.num_part[-1])
+        return (num_req == msg.envelope.num_part[-1]) or \
+               (hasattr(self.args, 'num_part') and num_req == self.args.num_part)
 
     @handler.register(NotImplementedError)
     def _handler_default(self, msg: 'gnes_pb2.Message'):
diff --git a/tests/test_service_mgr.py b/tests/test_service_mgr.py
new file mode 100644
index 00000000..bf955d68
--- /dev/null
+++ b/tests/test_service_mgr.py
@@ -0,0 +1,86 @@
+import os
+import unittest.mock
+
+import grpc
+
+from gnes.cli.parser import set_router_service_parser, set_grpc_frontend_parser
+from gnes.proto import gnes_pb2_grpc, RequestGenerator
+from gnes.service.base import ServiceManager, SocketType, ParallelType
+from gnes.service.grpc import GRPCFrontend
+from gnes.service.router import RouterService
+
+
+class TestServiceManager(unittest.TestCase):
+    def setUp(self):
+        self.all_bytes = [b'abc', b'def', b'cde'] * 10
+        self.all_bytes2 = [b'abc', b'def', b'cde']
+        os.unsetenv('http_proxy')
+        os.unsetenv('https_proxy')
+
+    def _test_multiple_router(self, backend='thread'):
+        a = set_router_service_parser().parse_args([
+            '--yaml_path', 'BaseRouter',
+            '--num_parallel', '5',
+            '--parallel_backend', backend
+        ])
+        with ServiceManager(RouterService, a):
+            pass
+
+    def _test_grpc_multiple_router(self, backend='thread'):
+        args = set_grpc_frontend_parser().parse_args([
+            '--grpc_host', '127.0.0.1',
+        ])
+
+        p_args = set_router_service_parser().parse_args([
+            '--port_in', str(args.port_out),
+            '--port_out', str(args.port_in),
+            '--socket_in', str(SocketType.PULL_CONNECT),
+            '--socket_out', str(SocketType.PUSH_CONNECT),
+            '--yaml_path', 'BaseRouter',
+            '--num_parallel', '5',
+            '--parallel_backend', backend
+        ])
+
+        with ServiceManager(RouterService, p_args) as sm, GRPCFrontend(args), grpc.insecure_channel(
+                '%s:%s' % (args.grpc_host, args.grpc_port),
+                options=[('grpc.max_send_message_length', 70 * 1024 * 1024),
+                         ('grpc.max_receive_message_length', 70 * 1024 * 1024)]) as channel:
+            stub = gnes_pb2_grpc.GnesRPCStub(channel)
+            resp = stub.Call(list(RequestGenerator.query(b'abc', 1))[0])
+            self.assertEqual(resp.request_id, '0')
+
+    def _test_grpc_multiple_pub(self, backend='thread'):
+        args = set_grpc_frontend_parser().parse_args([
+            '--grpc_host', '127.0.0.1',
+        ])
+
+        p_args = set_router_service_parser().parse_args([
+            '--port_in', str(args.port_out),
+            '--port_out', str(args.port_in),
+            '--socket_in', str(SocketType.PULL_CONNECT),
+            '--socket_out', str(SocketType.PUSH_CONNECT),
+            '--yaml_path', 'BaseRouter',
+            '--num_parallel', '5',
+            '--parallel_backend', backend,
+            '--parallel_type', str(ParallelType.PUB_BLOCK)
+        ])
+
+        with ServiceManager(RouterService, p_args) as sm, GRPCFrontend(args), grpc.insecure_channel(
+                '%s:%s' % (args.grpc_host, args.grpc_port),
+                options=[('grpc.max_send_message_length', 70 * 1024 * 1024),
+                         ('grpc.max_receive_message_length', 70 * 1024 * 1024)]) as channel:
+            stub = gnes_pb2_grpc.GnesRPCStub(channel)
+            resp = stub.Call(list(RequestGenerator.query(b'abc', 1))[0])
+            self.assertEqual(resp.request_id, '0')
+
+    def test_grpc_with_pub(self):
+        self._test_grpc_multiple_pub('thread')
+        self._test_grpc_multiple_pub('process')
+
+    def test_grpc_with_multi_service(self):
+        self._test_grpc_multiple_router('thread')
+        self._test_grpc_multiple_router('process')
+
+    def test_multiple_router(self):
+        self._test_multiple_router('thread')
+        self._test_multiple_router('process')
diff --git a/tests/test_stream_grpc.py b/tests/test_stream_grpc.py
index 0ae92da3..1e7258b9 100644
--- a/tests/test_stream_grpc.py
+++ b/tests/test_stream_grpc.py
@@ -37,8 +37,9 @@ class TestStreamgRPC(unittest.TestCase):
     def setUp(self):
         self.all_bytes = [b'abc', b'def', b'cde'] * 10
         self.all_bytes2 = [b'abc', b'def', b'cde']
+        os.unsetenv('http_proxy')
+        os.unsetenv('https_proxy')
 
-    # @unittest.mock.patch.dict(os.environ, {'http_proxy': '', 'https_proxy': ''})
     def test_grpc_frontend(self):
         args = set_grpc_frontend_parser().parse_args([
             '--grpc_host', '127.0.0.1',
@@ -62,7 +63,6 @@ def test_grpc_frontend(self):
         self.assertEqual(resp.request_id, str(len(self.all_bytes)))  # idx start with 0, but +1 for final FLUSH
 
-    # @unittest.mock.patch.dict(os.environ, {'http_proxy': '', 'https_proxy': ''})
     def test_async_block(self):
         args = set_grpc_frontend_parser().parse_args([
             '--grpc_host', '127.0.0.1',