From c951743db65aac003e257e40c476388b48138a65 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 3 Jan 2023 13:35:17 -0500 Subject: [PATCH 1/4] Force removal of Pyo3 Object to avoid memory leak This commit fixes the memory leak happening in the dora API operator. This is seemingly due to pyo3 leaking memory on objects created in Rust. Using standard `drop` did not drop the memory on the `PyBytes` included in the `PyDict`. See: https://github.com/PyO3/pyo3/issues/1801 Fixes https://github.com/dora-rs/dora/issues/163 --- binaries/runtime/src/operator/python.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index b4849cca3..a00f1755f 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -12,10 +12,10 @@ use dora_operator_api_python::metadata_to_pydict; use dora_operator_api_types::DoraStatus; use eyre::{bail, eyre, Context}; use pyo3::{ - pyclass, + ffi, pyclass, types::IntoPyDict, types::{PyBytes, PyDict}, - Py, Python, + AsPyPointer, Py, Python, }; use std::{ borrow::Cow, @@ -139,14 +139,21 @@ pub fn spawn( let status_enum = Python::with_gil(|py| { let input_dict = PyDict::new(py); + let bytes = PyBytes::new(py, &input.data()); input_dict.set_item("id", input.id.as_str())?; - input_dict.set_item("data", PyBytes::new(py, &input.data()))?; + input_dict.set_item("data", bytes)?; input_dict.set_item("metadata", metadata_to_pydict(input.metadata(), py))?; - operator + let status_enum = operator .call_method1(py, "on_input", (input_dict, send_output.clone())) - .map_err(traceback) + .map_err(traceback); + + unsafe { + ffi::Py_DECREF(bytes.as_ptr()); + ffi::Py_DECREF(input_dict.as_ptr()); + } + status_enum })?; let status_val = Python::with_gil(|py| status_enum.getattr(py, "value")) .wrap_err("on_input must have enum return value")?; From e7611e188ee02d69bd954fae6019a8d20249fd97 Mon Sep 17 00:00:00 2001 From: 
haixuanTao Date: Tue, 3 Jan 2023 16:21:52 -0500 Subject: [PATCH 2/4] Update requirements and fix `malloc` in plot --- apis/python/node/Cargo.toml | 2 +- examples/python-dataflow/plot.py | 3 --- examples/python-dataflow/requirements.txt | 10 +++++----- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index fdffc2780..7de836e79 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -1,6 +1,6 @@ [package] +version = "0.1.2" name = "dora-node-api-python" -version.workspace = true edition = "2021" license = "Apache-2.0" diff --git a/examples/python-dataflow/plot.py b/examples/python-dataflow/plot.py index 57a2a293f..0161f93b2 100644 --- a/examples/python-dataflow/plot.py +++ b/examples/python-dataflow/plot.py @@ -81,6 +81,3 @@ def on_input( return DoraStatus.STOP return DoraStatus.CONTINUE - - def __del__(self): - cv2.destroyAllWindows() diff --git a/examples/python-dataflow/requirements.txt b/examples/python-dataflow/requirements.txt index 55f71178a..3245eada7 100644 --- a/examples/python-dataflow/requirements.txt +++ b/examples/python-dataflow/requirements.txt @@ -2,17 +2,20 @@ # Usage: pip install -r requirements.txt # Base ---------------------------------------- +gitpython +ipython # interactive notebook matplotlib>=3.2.2 numpy>=1.18.5 opencv-python>=4.1.1 Pillow>=7.1.2 +psutil # system resources PyYAML>=5.3.1 requests>=2.23.0 scipy>=1.4.1 -torch>=1.7.0 +thop>=0.1.1 # FLOPs computation +torch>=1.7.0 # see https://pytorch.org/get-started/locally (recommended) torchvision>=0.8.1 tqdm>=4.64.0 -protobuf<=3.20.1 # https://github.com/ultralytics/yolov5/issues/8012 # Logging ------------------------------------- tensorboard>=2.4.1 @@ -35,9 +38,6 @@ seaborn>=0.11.0 # openvino-dev # OpenVINO export # Extras -------------------------------------- -ipython # interactive notebook -psutil # system utilization -thop>=0.1.1 # FLOPs computation # albumentations>=1.0.3 # pycocotools>=2.0 # 
COCO mAP # roboflow From 14b29b7edda2ca9ddc76c59f71f54302e56018a1 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 10 Jan 2023 12:11:12 +0100 Subject: [PATCH 3/4] Use `py.new_pool()` to bound pyo3 variable To alleviate the unbounded memory growth, we're replacing the manual `Py_DECREF` calls with a scoped `GILPool`, as described in the recently updated pyo3 documentation. See: https://github.com/PyO3/pyo3/pull/2864 --- binaries/runtime/src/operator/python.rs | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index a00f1755f..975eb2aef 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -10,7 +10,7 @@ use dora_message::uhlc; use dora_node_api::communication::Publisher; use dora_operator_api_python::metadata_to_pydict; use dora_operator_api_types::DoraStatus; -use eyre::{bail, eyre, Context}; +use eyre::{bail, eyre, Context, Result}; use pyo3::{ ffi, pyclass, types::IntoPyDict, @@ -137,7 +137,9 @@ pub fn spawn( }; input.metadata.parameters.open_telemetry_context = Cow::Owned(string_cx); - let status_enum = Python::with_gil(|py| { + let status = Python::with_gil(|py| -> Result { + let pool = unsafe { py.new_pool() }; + let py = pool.python(); let input_dict = PyDict::new(py); let bytes = PyBytes::new(py, &input.data()); @@ -147,18 +149,15 @@ pub fn spawn( let status_enum = operator .call_method1(py, "on_input", (input_dict, send_output.clone())) - .map_err(traceback); - - unsafe { - ffi::Py_DECREF(bytes.as_ptr()); - ffi::Py_DECREF(input_dict.as_ptr()); - } - status_enum + .map_err(traceback)?; + + let status_val = status_enum + .getattr(py, "value") + .wrap_err("on_input must have enum return value")?; + status_val + .extract(py) + .wrap_err("on_input has invalid return value") })?; - let status_val = Python::with_gil(|py| status_enum.getattr(py, "value")) - .wrap_err("on_input must have enum return 
value")?; - let status: i32 = Python::with_gil(|py| status_val.extract(py)) - .wrap_err("on_input has invalid return value")?; match status { s if s == DoraStatus::Continue as i32 => {} // ok s if s == DoraStatus::Stop as i32 => break StopReason::ExplicitStop, From 835b5f3950ab6583fdd4fa8fd32335a587df6be8 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 10 Jan 2023 17:27:41 +0100 Subject: [PATCH 4/4] add documentation of scoped unsafe `GILPool` --- binaries/runtime/src/operator/python.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 975eb2aef..900a30249 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -12,10 +12,10 @@ use dora_operator_api_python::metadata_to_pydict; use dora_operator_api_types::DoraStatus; use eyre::{bail, eyre, Context, Result}; use pyo3::{ - ffi, pyclass, + pyclass, types::IntoPyDict, types::{PyBytes, PyDict}, - AsPyPointer, Py, Python, + Py, Python, }; use std::{ borrow::Cow, @@ -138,6 +138,15 @@ pub fn spawn( input.metadata.parameters.open_telemetry_context = Cow::Owned(string_cx); let status = Python::with_gil(|py| -> Result { + // We need to create a new scoped `GILPool` because the dora-runtime + // is currently started through a `start_runtime` wrapper function, + // which is annotated with `#[pyfunction]`. This attribute creates an + // initial `GILPool` that lasts for the entire lifetime of the `dora-runtime`. + // However, we want the `PyBytes` created below to be freed earlier. + // creating a new scoped `GILPool` tied to this closure, will free `PyBytes` + // at the end of the closure. + // See https://github.com/PyO3/pyo3/pull/2864 and + // https://github.com/PyO3/pyo3/issues/2853 for more details. let pool = unsafe { py.new_pool() }; let py = pool.python(); let input_dict = PyDict::new(py);