Commit: Async proxy pool, Event manager, Custom access log, Expose loop to plugins (#645)

* Async proxy pool

* Async proxy pool

* Late upstream initialization and exception guards

* Close upstream proxy connection on client connection close

* Refactor into EventManager

* Fix tests accounting in the event manager

* Ensure each process initializes logger

* pragma no cover

* Teardown connection when proxy pool upstream proxy closes

* Add ability to customize access log format and add additional context to it

* Maintain total size for response bytes in access logs

* Fix tests broken due to new plugin methods missing mock

* Update pubsub_eventing to use EventManager to avoid entire bootstrapping step
abhinavsingh authored Oct 31, 2021
1 parent 567d616 commit d4ee4fa
Showing 23 changed files with 518 additions and 262 deletions.
31 changes: 10 additions & 21 deletions examples/pubsub_eventing.py
```diff
@@ -9,22 +9,17 @@
     :license: BSD, see LICENSE for more details.
 """
 import time
-import threading
 import multiprocessing
 import logging
 
 from typing import Dict, Any
 
-from proxy.core.event import EventQueue, EventSubscriber, EventDispatcher, eventNames
+from proxy.core.event import EventQueue, EventSubscriber, eventNames
+from proxy.core.event.manager import EventManager
 
 # Enable debug logging to view core event logs
 logging.basicConfig(level=logging.DEBUG)
 
-# Eventing requires a multiprocess safe queue
-# so that events can be safely published and received
-# between processes.
-manager = multiprocessing.Manager()
-
 main_publisher_request_id = '1234'
 process_publisher_request_id = '12345'
 num_events_received = [0, 0]
@@ -59,17 +54,13 @@ def on_event(payload: Dict[str, Any]) -> None:
 if __name__ == '__main__':
     start_time = time.time()
 
-    # Start dispatcher thread
-    dispatcher_queue = EventQueue(manager.Queue())
-    dispatcher_shutdown_event = threading.Event()
-    dispatcher = EventDispatcher(
-        shutdown=dispatcher_shutdown_event,
-        event_queue=dispatcher_queue)
-    dispatcher_thread = threading.Thread(target=dispatcher.run)
-    dispatcher_thread.start()
+    # Start dispatcher thread using EventManager
+    event_manager = EventManager()
+    event_manager.start_event_dispatcher()
+    assert event_manager.event_queue
 
     # Create a subscriber
-    subscriber = EventSubscriber(dispatcher_queue)
+    subscriber = EventSubscriber(event_manager.event_queue)
     # Internally, subscribe will start a separate thread
     # to receive incoming published messages
     subscriber.subscribe(on_event)
@@ -79,13 +70,13 @@ def on_event(payload: Dict[str, Any]) -> None:
     publisher_shutdown_event = multiprocessing.Event()
     publisher = multiprocessing.Process(
         target=publisher_process, args=(
-            publisher_shutdown_event, dispatcher_queue, ))
+            publisher_shutdown_event, event_manager.event_queue, ))
     publisher.start()
 
     try:
         while True:
             # Dispatch event from main process
-            dispatcher_queue.publish(
+            event_manager.event_queue.publish(
                 request_id=main_publisher_request_id,
                 event_name=eventNames.WORK_STARTED,
                 event_payload={'time': time.time()},
@@ -100,8 +91,6 @@ def on_event(payload: Dict[str, Any]) -> None:
         # Stop subscriber thread
         subscriber.unsubscribe()
         # Signal dispatcher to shutdown
-        dispatcher_shutdown_event.set()
-        # Wait for dispatcher shutdown
-        dispatcher_thread.join()
+        event_manager.stop_event_dispatcher()
     print('Received {0} events from main thread, {1} events from another process, in {2} seconds'.format(
         num_events_received[0], num_events_received[1], time.time() - start_time))
```
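For context, the `publisher_process` target passed to `multiprocessing.Process` above lives outside the shown hunks. A hypothetical sketch of such a target, in the context of the example file, assuming `publish()` accepts the same keyword arguments visible in the main-process loop:

```python
# Hypothetical sketch of the publisher_process target referenced above;
# the real implementation is outside the shown hunks.
import multiprocessing.synchronize

def publisher_process(
        shutdown_event: multiprocessing.synchronize.Event,
        dispatcher_queue: EventQueue) -> None:
    # Keep publishing from the child process until the main
    # process signals shutdown.
    while not shutdown_event.is_set():
        dispatcher_queue.publish(
            request_id=process_publisher_request_id,
            event_name=eventNames.WORK_STARTED,
            event_payload={'time': time.time()},
        )
```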
9 changes: 8 additions & 1 deletion proxy/common/constants.py
```diff
@@ -60,8 +60,15 @@
 DEFAULT_IPV6_HOSTNAME = ipaddress.IPv6Address('::1')
 DEFAULT_KEY_FILE = None
 DEFAULT_LOG_FILE = None
-DEFAULT_LOG_FORMAT = '%(asctime)s - pid:%(process)d [%(levelname)-.1s] %(funcName)s:%(lineno)d - %(message)s'
+DEFAULT_LOG_FORMAT = '%(asctime)s - pid:%(process)d [%(levelname)-.1s] %(filename)s:%(funcName)s:%(lineno)d - %(message)s'
 DEFAULT_LOG_LEVEL = 'INFO'
+DEFAULT_HTTP_ACCESS_LOG_FORMAT = '{client_ip}:{client_port} - ' + \
+    '{request_method} {server_host}:{server_port}{request_path} - ' + \
+    '{response_code} {response_reason} - {response_bytes} bytes - ' + \
+    '{connection_time_ms} ms'
+DEFAULT_HTTPS_ACCESS_LOG_FORMAT = '{client_ip}:{client_port} - ' + \
+    '{request_method} {server_host}:{server_port} - ' + \
+    '{response_bytes} bytes - {connection_time_ms} ms'
 DEFAULT_NUM_WORKERS = 0
 DEFAULT_OPEN_FILE_LIMIT = 1024
 DEFAULT_PAC_FILE = None
```
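These access-log constants are plain `str.format` templates, so customizing the log line means swapping the template and supplying matching context keys. A sketch of how such a template renders — the context values below are made up for illustration; the real context is assembled by the HTTP handler, and per the commit message plugins can add additional keys to it:

```python
# Sketch only: render the HTTP access log template with a context dict
# whose keys mirror the placeholders above.
log_format = (
    '{client_ip}:{client_port} - '
    '{request_method} {server_host}:{server_port}{request_path} - '
    '{response_code} {response_reason} - {response_bytes} bytes - '
    '{connection_time_ms} ms'
)
context = {
    'client_ip': '127.0.0.1', 'client_port': 50042,
    'request_method': 'GET',
    'server_host': 'example.org', 'server_port': 80,
    'request_path': '/', 'response_code': 200,
    'response_reason': 'OK', 'response_bytes': 512,
    'connection_time_ms': 12.34,
}
print(log_format.format(**context))
# 127.0.0.1:50042 - GET example.org:80/ - 200 OK - 512 bytes - 12.34 ms
```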
37 changes: 30 additions & 7 deletions proxy/common/utils.py
```diff
@@ -9,15 +9,17 @@
     :license: BSD, see LICENSE for more details.
 """
 import ssl
-import contextlib
-import socket
+import logging
 import functools
 import ipaddress
+import socket
+import contextlib
 
 from types import TracebackType
 from typing import Optional, Dict, Any, List, Tuple, Type, Callable
 
 from .constants import HTTP_1_1, COLON, WHITESPACE, CRLF, DEFAULT_TIMEOUT
+from .constants import DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_LOG_LEVEL
 
 
 def text_(s: Any, encoding: str = 'utf-8', errors: str = 'strict') -> Any:
@@ -89,14 +91,14 @@ def build_http_pkt(line: List[bytes],
                    headers: Optional[Dict[bytes, bytes]] = None,
                    body: Optional[bytes] = None) -> bytes:
     """Build and returns a HTTP request or response packet."""
-    req = WHITESPACE.join(line) + CRLF
+    pkt = WHITESPACE.join(line) + CRLF
     if headers is not None:
         for k in headers:
-            req += build_http_header(k, headers[k]) + CRLF
-    req += CRLF
+            pkt += build_http_header(k, headers[k]) + CRLF
+    pkt += CRLF
     if body:
-        req += body
-    return req
+        pkt += body
+    return pkt
@@ -226,3 +228,24 @@ def get_available_port() -> int:
         sock.bind(('', 0))
         _, port = sock.getsockname()
     return int(port)
+
+
+def setup_logger(
+        log_file: Optional[str] = DEFAULT_LOG_FILE,
+        log_level: str = DEFAULT_LOG_LEVEL,
+        log_format: str = DEFAULT_LOG_FORMAT) -> None:
+    ll = getattr(
+        logging,
+        {'D': 'DEBUG',
+         'I': 'INFO',
+         'W': 'WARNING',
+         'E': 'ERROR',
+         'C': 'CRITICAL'}[log_level.upper()[0]])
+    if log_file:
+        logging.basicConfig(
+            filename=log_file,
+            filemode='a',
+            level=ll,
+            format=log_format)
+    else:
+        logging.basicConfig(level=ll, format=log_format)
```
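`setup_logger` resolves the level from the first letter of `log_level`, so 'info', 'INFO' and 'I' all map to `logging.INFO`. It exists because `Acceptor.run()` and `Threadless.run()` execute in child processes, which do not inherit the parent's logging configuration. A small usage sketch:

```python
import logging

# 'warning', 'Warning' and 'w' all resolve via their first letter.
setup_logger(log_file=None, log_level='warning')
logging.getLogger(__name__).warning('configured in this process')
```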
3 changes: 3 additions & 0 deletions proxy/core/acceptor/acceptor.py
```diff
@@ -27,6 +27,7 @@
 from ..event import EventQueue, eventNames
 from ...common.constants import DEFAULT_THREADLESS
 from ...common.flag import flags
+from ...common.utils import setup_logger
 
 logger = logging.getLogger(__name__)
@@ -133,6 +134,8 @@ def run_once(self) -> None:
             self.start_work(conn, addr)
 
     def run(self) -> None:
+        setup_logger(self.flags.log_file, self.flags.log_level,
+                     self.flags.log_format)
         self.selector = selectors.DefaultSelector()
         fileno = recv_handle(self.work_queue)
         self.work_queue.close()
```
60 changes: 6 additions & 54 deletions proxy/core/acceptor/pool.py
```diff
@@ -12,18 +12,18 @@
 import logging
 import multiprocessing
 import socket
-import threading
 
 from multiprocessing import connection
 from multiprocessing.reduction import send_handle
 from typing import List, Optional, Type
 
 from .acceptor import Acceptor
 from .work import Work
 
-from ..event import EventQueue, EventDispatcher
+from ..event import EventQueue
 
 from ...common.flag import flags
-from ...common.constants import DEFAULT_BACKLOG, DEFAULT_ENABLE_EVENTS
-from ...common.constants import DEFAULT_IPV6_HOSTNAME, DEFAULT_NUM_WORKERS, DEFAULT_PORT
+from ...common.constants import DEFAULT_BACKLOG, DEFAULT_IPV6_HOSTNAME, DEFAULT_NUM_WORKERS, DEFAULT_PORT
 
 logger = logging.getLogger(__name__)
@@ -37,14 +37,6 @@
     default=DEFAULT_BACKLOG,
     help='Default: 100. Maximum number of pending connections to proxy server')
 
-flags.add_argument(
-    '--enable-events',
-    action='store_true',
-    default=DEFAULT_ENABLE_EVENTS,
-    help='Default: False. Enables core to dispatch lifecycle events. '
-    'Plugins can be used to subscribe for core events.'
-)
-
 flags.add_argument(
     '--hostname',
     type=str,
@@ -79,31 +71,16 @@ class AcceptorPool:
         pool.shutdown()
 
     `work_klass` must implement `work.Work` class.
-
-    Optionally, AcceptorPool also initialize a global event queue.
-    It is a multiprocess safe queue which can be used to build pubsub patterns
-    for message sharing or signaling.
-
-    TODO(abhinavsingh): Decouple event queue setup & teardown into its own class.
     """
 
     def __init__(self, flags: argparse.Namespace,
-                 work_klass: Type[Work]) -> None:
+                 work_klass: Type[Work], event_queue: Optional[EventQueue] = None) -> None:
         self.flags = flags
         self.socket: Optional[socket.socket] = None
        self.acceptors: List[Acceptor] = []
         self.work_queues: List[connection.Connection] = []
         self.work_klass = work_klass
-
-        self.event_queue: Optional[EventQueue] = None
-        self.event_dispatcher: Optional[EventDispatcher] = None
-        self.event_dispatcher_thread: Optional[threading.Thread] = None
-        self.event_dispatcher_shutdown: Optional[threading.Event] = None
-        self.manager: Optional[multiprocessing.managers.SyncManager] = None
-
-        if self.flags.enable_events:
-            self.manager = multiprocessing.Manager()
-            self.event_queue = EventQueue(self.manager.Queue())
+        self.event_queue: Optional[EventQueue] = event_queue
 
     def listen(self) -> None:
         self.socket = socket.socket(self.flags.family, socket.SOCK_STREAM)
@@ -137,42 +114,17 @@ def start_workers(self) -> None:
             self.work_queues.append(work_queue[0])
         logger.info('Started %d workers' % self.flags.num_workers)
 
-    def start_event_dispatcher(self) -> None:
-        self.event_dispatcher_shutdown = threading.Event()
-        assert self.event_dispatcher_shutdown
-        assert self.event_queue
-        self.event_dispatcher = EventDispatcher(
-            shutdown=self.event_dispatcher_shutdown,
-            event_queue=self.event_queue
-        )
-        self.event_dispatcher_thread = threading.Thread(
-            target=self.event_dispatcher.run
-        )
-        self.event_dispatcher_thread.start()
-        logger.debug('Thread ID: %d', self.event_dispatcher_thread.ident)
-
     def shutdown(self) -> None:
         logger.info('Shutting down %d workers' % self.flags.num_workers)
         for acceptor in self.acceptors:
             acceptor.running.set()
-        if self.flags.enable_events:
-            assert self.event_dispatcher_shutdown
-            assert self.event_dispatcher_thread
-            self.event_dispatcher_shutdown.set()
-            self.event_dispatcher_thread.join()
-            logger.debug(
-                'Shutdown of global event dispatcher thread %d successful',
-                self.event_dispatcher_thread.ident)
         for acceptor in self.acceptors:
             acceptor.join()
         logger.debug('Acceptors shutdown')
 
     def setup(self) -> None:
         """Listen on port, setup workers and pass server socket to workers."""
         self.listen()
-        if self.flags.enable_events:
-            logger.info('Core Event enabled')
-            self.start_event_dispatcher()
         self.start_workers()
         # Send server socket to all acceptor processes.
         assert self.socket is not None
```
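The net effect: `AcceptorPool` no longer owns the event dispatcher lifecycle; the caller starts an `EventManager` and injects its queue. A hedged sketch of the implied wiring — `flags` and `SomeWorkKlass` are stand-ins (a parsed `argparse.Namespace` and a `work.Work` subclass), and the `AcceptorPool` import path is assumed:

```python
from proxy.core.acceptor import AcceptorPool  # import path assumed
from proxy.core.event import EventManager

# Caller owns the dispatcher lifecycle now.
event_manager = EventManager()
event_manager.start_event_dispatcher()

pool = AcceptorPool(
    flags=flags,                            # parsed argparse.Namespace (stand-in)
    work_klass=SomeWorkKlass,               # any work.Work subclass (stand-in)
    event_queue=event_manager.event_queue,  # optional; None disables eventing
)
pool.setup()
try:
    ...  # serve until interrupted
finally:
    pool.shutdown()
    event_manager.stop_event_dispatcher()
```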
3 changes: 3 additions & 0 deletions proxy/core/acceptor/threadless.py
```diff
@@ -26,6 +26,7 @@
 from ..connection import TcpClientConnection
 from ..event import EventQueue, eventNames
 
+from ...common.utils import setup_logger
 from ...common.types import Readables, Writables
 from ...common.constants import DEFAULT_TIMEOUT
@@ -179,6 +180,8 @@ def run_once(self) -> None:
         self.cleanup_inactive()
 
     def run(self) -> None:
+        setup_logger(self.flags.log_file, self.flags.log_level,
+                     self.flags.log_format)
         try:
             self.selector = selectors.DefaultSelector()
             self.selector.register(self.client_queue, selectors.EVENT_READ)
```
9 changes: 6 additions & 3 deletions proxy/core/connection/server.py
```diff
@@ -10,9 +10,11 @@
 """
 import ssl
 import socket
+
 from typing import Optional, Union, Tuple
 
 from .connection import TcpConnection, tcpConnectionTypes, TcpConnectionUninitializedException
+
 from ...common.utils import new_socket_connection
 
 
@@ -23,6 +25,7 @@ def __init__(self, host: str, port: int):
         super().__init__(tcpConnectionTypes.SERVER)
         self._conn: Optional[Union[ssl.SSLSocket, socket.socket]] = None
         self.addr: Tuple[str, int] = (host, int(port))
+        self.closed = True
 
     @property
     def connection(self) -> Union[ssl.SSLSocket, socket.socket]:
@@ -31,9 +34,9 @@ def connection(self) -> Union[ssl.SSLSocket, socket.socket]:
         return self._conn
 
     def connect(self) -> None:
-        if self._conn is not None:
-            return
-        self._conn = new_socket_connection(self.addr)
+        if self._conn is None:
+            self._conn = new_socket_connection(self.addr)
+        self.closed = False
 
     def wrap(self, hostname: str, ca_file: Optional[str]) -> None:
         ctx = ssl.create_default_context(
```
2 changes: 2 additions & 0 deletions proxy/core/event/__init__.py
```diff
@@ -12,11 +12,13 @@
 from .names import EventNames, eventNames
 from .dispatcher import EventDispatcher
 from .subscriber import EventSubscriber
+from .manager import EventManager
 
 __all__ = [
     'eventNames',
     'EventNames',
     'EventQueue',
     'EventDispatcher',
     'EventSubscriber',
+    'EventManager',
 ]
```
2 changes: 1 addition & 1 deletion proxy/core/event/dispatcher.py
```diff
@@ -35,7 +35,7 @@ class EventDispatcher:
     module is not-recommended. Python native multiprocessing queue
     doesn't provide a fanout functionality which core dispatcher module
     implements so that several plugins can consume same published
-    event at a time.
+    event concurrently.
 
     When --enable-events is used, a multiprocessing.Queue is created and
     attached to global argparse. This queue can then be used for
```