Skip to content

Commit

Permalink
fix: speed improvements
Browse files — browse the repository at this point in the history
  • Loading branch information
standy66 committed Mar 21, 2019
1 parent 33ff295 commit 1cb3d46
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 84 deletions.
17 changes: 11 additions & 6 deletions misc/greeter/failing_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,31 @@

async def worker(channel):
stub = GreeterStub(channel)
data = random_payload()
data = "\0" * 100000

@async_generator
async def gen():
for _ in range(4):
await yield_(greeter_pb2.HelloRequest(name=data))


print("stream_unary")
assert (await stub.SayHelloToManyAtOnce(gen())).message == data * 4
print("Done")


async def main():
async with purerpc.insecure_channel("localhost", 50055) as channel:
async with anyio.create_task_group() as task_group:
for _ in range(20):
for _ in range(50):
await task_group.spawn(worker, channel)


if __name__ == "__main__":
for i in range(50):
print("Warming up")
for i in range(20):
anyio.run(main)
print("Testing")
import time
start = time.time()
for i in range(20):
anyio.run(main)
print("took", time.time() - start)

4 changes: 1 addition & 3 deletions misc/greeter/failing_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@
GreeterServicer = greeter_grpc.GreeterServicer
class Servicer(GreeterServicer):
async def SayHello(self, message):
return greeter_pb2.HelloReply(message="Hello, " + message.name)
return greeter_pb2.HelloReply(message=message.name)

@async_generator
async def SayHelloGoodbye(self, message):
await yield_(greeter_pb2.HelloReply(message=message.name))
await yield_(greeter_pb2.HelloReply(message=message.name))

async def SayHelloToManyAtOnce(self, messages):
print("Got SayHelloToManyAtOnce")
names = []
async for message in messages:
names.append(message.name)
print("Returning from SayHelloToManyAtOnce")
return greeter_pb2.HelloReply(message="".join(names))

@async_generator
Expand Down
58 changes: 21 additions & 37 deletions misc/greeter/test_perf.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,40 @@
import time
import curio
import cProfile
import anyio
import sys
import argparse
import contextlib
import multiprocessing

import purerpc
from async_generator import async_generator, yield_
from generated.greeter_pb2 import HelloRequest, HelloReply
from generated.greeter_grpc import GreeterServicer, GreeterStub


@contextlib.contextmanager
def run_purerpc_service_in_process(service):
queue = multiprocessing.Queue()
def target_fn():
server = purerpc.Server(port=0)
server.add_service(service)
socket = server._create_socket_and_listen()
queue.put(socket.getsockname()[1])
queue.close()
queue.join_thread()
curio.run(server._run_async_server, socket)

process = multiprocessing.Process(target=target_fn)
process.start()
port = queue.get()
queue.close()
queue.join_thread()
try:
yield port
finally:
process.terminate()
process.join()
from purerpc.test_utils import run_purerpc_service_in_process


class Greeter(GreeterServicer):
async def SayHello(self, message):
return HelloReply(message=message.name)

@async_generator
async def SayHelloToMany(self, input_messages):
async for message in input_messages:
yield HelloReply(message=message.name)
await yield_(HelloReply(message=message.name))


async def do_load_unary(stub, num_requests, message_size):
async def do_load_unary(result_queue, stub, num_requests, message_size):
message = "0" * message_size
start = time.time()
for _ in range(num_requests):
result = (await stub.SayHello(HelloRequest(name=message))).message
assert (len(result) == message_size)
avg_latency = (time.time() - start) / num_requests
return avg_latency
await result_queue.put(avg_latency)


async def do_load_stream(stub, num_requests, message_size):
async def do_load_stream(result_queue, stub, num_requests, message_size):
message = "0" * message_size
stream = await stub.SayHelloToMany()
start = time.time()
Expand All @@ -63,7 +45,7 @@ async def do_load_stream(stub, num_requests, message_size):
avg_latency = (time.time() - start) / num_requests
await stream.close()
await stream.receive_message()
return avg_latency
await result_queue.put(avg_latency)


async def worker(port, queue, num_concurrent_streams, num_requests_per_stream,
Expand All @@ -77,16 +59,18 @@ async def worker(port, queue, num_concurrent_streams, num_requests_per_stream,
else:
raise ValueError(f"Unknown load type: {load_type}")
for _ in range(num_rounds):
tasks = []
start = time.time()
async with curio.TaskGroup() as task_group:
task_results = anyio.create_queue(sys.maxsize)
async with anyio.create_task_group() as task_group:
for _ in range(num_concurrent_streams):
task = await task_group.spawn(load_fn(stub, num_requests_per_stream, message_size))
tasks.append(task)
await task_group.spawn(load_fn, task_results, stub, num_requests_per_stream, message_size)
end = time.time()
rps = num_concurrent_streams * num_requests_per_stream / (end - start)
queue.put(rps)
queue.put([task.result for task in tasks])
results = []
for _ in range(num_concurrent_streams):
results.append(await task_results.get())
queue.put(results)
queue.close()
queue.join_thread()

Expand All @@ -107,9 +91,9 @@ async def worker(port, queue, num_concurrent_streams, num_requests_per_stream,
with run_purerpc_service_in_process(Greeter().service) as port:
def target_fn(worker_id):
queue = queues[worker_id]
curio.run(worker, port, queue, args.num_concurrent_streams,
args.num_requests_per_stream, args.num_rounds, args.message_size,
args.load_type)
anyio.run(worker, port, queue, args.num_concurrent_streams,
args.num_requests_per_stream, args.num_rounds, args.message_size,
args.load_type)

processes = []
for worker_id in range(args.num_workers):
Expand Down
139 changes: 139 additions & 0 deletions misc/greeter/test_perf_grpcio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# from gevent import monkey
# monkey.patch_all()
#
# import grpc._cython.cygrpc
# grpc._cython.cygrpc.init_grpc_gevent()


import time
import anyio
import sys
import argparse
import functools
from queue import Queue
import multiprocessing




import grpc
from generated.greeter_pb2 import HelloRequest, HelloReply
from generated.greeter_pb2_grpc import GreeterServicer, GreeterStub, add_GreeterServicer_to_server

from purerpc.test_utils import run_grpc_service_in_process


class Greeter(GreeterServicer):
    """grpcio benchmark servicer: every reply echoes the request's name reversed."""

    def SayHello(self, message, context):
        # Unary-unary endpoint.
        reversed_name = message.name[::-1]
        return HelloReply(message=reversed_name)

    def SayHelloToMany(self, messages, context):
        # Stream-stream endpoint: one reply per request, in arrival order.
        yield from (HelloReply(message=request.name[::-1]) for request in messages)


def do_load_unary(result_queue, stub, num_requests, message_size):
    """Issue ``num_requests`` chained asynchronous unary SayHello calls.

    Each completed future immediately launches the next request, so exactly
    one call from this invocation is in flight at any time.  When the final
    response arrives, the average per-request latency in seconds is pushed
    onto ``result_queue``.  Returns immediately; completion is signalled
    solely through the queue.
    """
    payload = "0" * message_size
    remaining = num_requests
    start = time.time()

    def on_done(completed):
        nonlocal remaining
        remaining -= 1
        # The server reverses the name, so the reply has the same length.
        assert len(completed.result().message) == message_size
        if remaining > 0:
            follow_up = stub.SayHello.future(HelloRequest(name=payload))
            follow_up.add_done_callback(on_done)
        else:
            result_queue.put((time.time() - start) / num_requests)

    first = stub.SayHello.future(HelloRequest(name=payload))
    first.add_done_callback(on_done)


# def do_load_stream(result_queue, stub, num_requests, message_size):
# message = "0" * message_size
# stream = await stub.SayHelloToMany()
# start = time.time()
# for _ in range(num_requests):
# await stream.send_message(HelloRequest(name=message))
# result = await stream.receive_message()
# assert (len(result.message) == message_size)
# avg_latency = (time.time() - start) / num_requests
# await stream.close()
# await stream.receive_message()
# await result_queue.put(avg_latency)


def worker(port, queue, num_concurrent_streams, num_requests_per_stream,
           num_rounds, message_size, load_type):
    """Benchmark driver for one process.

    Per round: fan out ``num_concurrent_streams`` load generators against the
    server on localhost:``port``, then put the round's requests-per-second
    followed by the list of per-stream average latencies onto ``queue``.
    """
    with grpc.insecure_channel("localhost:{}".format(port)) as channel:
        stub = GreeterStub(channel)
        # Guard clause: only the unary load generator is implemented here
        # (the streaming variant is still commented out above).
        if load_type != "unary":
            raise ValueError(f"Unknown load type: {load_type}")
        load_fn = do_load_unary
        for _ in range(num_rounds):
            round_start = time.time()
            latency_queue = Queue()
            for _ in range(num_concurrent_streams):
                load_fn(latency_queue, stub, num_requests_per_stream, message_size)
            # Draining the queue doubles as the join point for the async calls.
            latencies = [latency_queue.get() for _ in range(num_concurrent_streams)]
            elapsed = time.time() - round_start
            queue.put(num_concurrent_streams * num_requests_per_stream / elapsed)
            queue.put(latencies)
    queue.close()
    queue.join_thread()


if __name__ == "__main__":
    # CLI mirrors test_perf.py so the grpcio and purerpc benchmarks are
    # directly comparable.
    parser = argparse.ArgumentParser()
    parser.add_argument("--message_size", type=int, default=1000)
    parser.add_argument("--num_workers", type=int, default=3)
    parser.add_argument("--num_concurrent_streams", type=int, default=100)
    parser.add_argument("--num_requests_per_stream", type=int, default=50)
    parser.add_argument("--num_rounds", type=int, default=10)
    parser.add_argument("--load_type", choices=["unary", "stream"], required=True)
    args = parser.parse_args()

    # One result queue per worker process.
    queues = [multiprocessing.Queue() for _ in range(args.num_workers)]

    with run_grpc_service_in_process(functools.partial(add_GreeterServicer_to_server, Greeter())) as port:
        def target_fn(worker_id):
            # Runs in the child process; the fork gives it its own queue slot.
            worker(port, queues[worker_id], args.num_concurrent_streams,
                   args.num_requests_per_stream, args.num_rounds,
                   args.message_size, args.load_type)

        processes = []
        for worker_id in range(args.num_workers):
            proc = multiprocessing.Process(target=target_fn, args=(worker_id,))
            proc.start()
            processes.append(proc)

        # Each round every worker puts two items, in order: its RPS figure,
        # then its list of per-stream latencies (seconds).
        for round_id in range(args.num_rounds):
            total_rps = 0
            latencies = []
            for queue in queues:
                total_rps += queue.get()
                latencies.extend(queue.get())
            avg_latency = 1000 * sum(latencies) / len(latencies)
            max_latency = 1000 * max(latencies)
            print(f"Round {round_id}, RPS: {total_rps}, avg latency: {avg_latency} ms, "
                  f"max latency: {max_latency} ms")

        for queue in queues:
            queue.close()
            queue.join_thread()

        for proc in processes:
            proc.join()
4 changes: 2 additions & 2 deletions misc/pypy_tests/bytearray_perf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ def main():
from purerpc.grpclib.buffers import ByteBuffer
b = b"\x00" * 50
x = ByteBuffer()
for i in range(5000000):
for i in range(500000):
for j in range(50):
x.append(b)
x.popleft(2000)


if __name__ == "__main__":
main()
main()
1 change: 1 addition & 0 deletions src/purerpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
from purerpc.rpc import Cardinality, RPCSignature, Stream
from purerpc.grpclib.status import Status, StatusCode
from purerpc.grpclib.exceptions import *
from purerpc._version import __version__
3 changes: 1 addition & 2 deletions src/purerpc/grpc_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class GRPCSocket(async_exit_stack.AsyncExitStack):
StreamClass = GRPCStream

def __init__(self, config: GRPCConfiguration, sock,
receive_buffer_size=16384):
receive_buffer_size=1024*1024):
super().__init__()
self._grpc_connection = GRPCConnection(config=config)
self._socket = SocketWrapper(self._grpc_connection, sock)
Expand Down Expand Up @@ -236,7 +236,6 @@ async def _listen(self):
events = self._grpc_connection.receive_data(data)
await self._socket.flush()
for event in events:
# TODO: implement this
if isinstance(event, h2.events.WindowUpdated):
if event.stream_id == 0:
for stream in self._streams.values():
Expand Down
Loading

0 comments on commit 1cb3d46

Please sign in to comment.