Skip to content

Commit

Permalink
Python wRPC client subscription prototype (kaspanet#62)
Browse files Browse the repository at this point in the history
* change python package name to 'kaspa'

* Introduce Py wRPC client wrapping Inner struct that contains KaspaRpcClient

* scaffolding for Python wRPC client Inner struct

* Python wRPC subscribtions prototype

* Python wRPC subscription callback args/kwargs

* lint

* minor refactor, handling of UTXO change notification

* properly gate python code in kaspa-rpc-core

* Attempt to fix test suite CI failure

* Subscribe UTXOs Changed

* subscriptions

* wRPC client disconnect

* unregister callbacks

* fix failing kaspad build
  • Loading branch information
smartgoo committed Sep 16, 2024
1 parent 27a540c commit e749a08
Show file tree
Hide file tree
Showing 15 changed files with 620 additions and 44 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pbkdf2 = "0.12.2"
portable-atomic = { version = "1.5.1", features = ["float"] }
prost = "0.12.1"
# prost = "0.13.1"
pyo3 = { version = "0.21.0", features = ["extension-module", "multiple-pymethods"] }
pyo3 = { version = "0.21.0", features = ["multiple-pymethods"] }
pyo3-asyncio-0-21 = { version = "0.21", features = ["attributes", "tokio-runtime"] }
rand = "0.8.5"
rand_chacha = "0.3.1"
Expand All @@ -240,6 +240,7 @@ seqlock = "0.2.0"
serde = { version = "1.0.190", features = ["derive", "rc"] }
serde_bytes = "0.11.12"
serde_json = "1.0.107"
serde-pyobject = "0.3.0"
serde_repr = "0.1.18"
serde-value = "0.7.0"
serde-wasm-bindgen = "0.6.1"
Expand Down Expand Up @@ -344,4 +345,4 @@ debug = true
strip = false

[workspace.lints.clippy]
empty_docs = "allow"
empty_docs = "allow"
3 changes: 2 additions & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition.workspace = true
include.workspace = true

[lib]
name = "kaspapy"
name = "kaspa"
crate-type = ["cdylib"]

[dependencies]
Expand All @@ -20,6 +20,7 @@ pyo3.workspace = true
[features]
default = []
py-sdk = [
"pyo3/extension-module",
"kaspa-addresses/py-sdk",
"kaspa-wallet-keys/py-sdk",
"kaspa-wrpc-python/py-sdk",
Expand Down
4 changes: 2 additions & 2 deletions python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ Rusty-Kaspa/Rust bindings for Python, using [PyO3](https://pyo3.rs/v0.20.0/) and
See Python files in `./python/examples`.

# Project Layout
The Python package `kaspapy` is built from the `kaspa-python` crate, which is located at `./python`.
The Python package `kaspa` is built from the `kaspa-python` crate, which is located at `./python`.

As such, the `kaspapy` function in `./python/src/lib.rs` is a good starting point. This function uses PyO3 to add functionality to the package.
As such, the `kaspa` function in `./python/src/lib.rs` is a good starting point. This function uses PyO3 to add functionality to the package.
2 changes: 1 addition & 1 deletion python/examples/addresses.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kaspapy import (
from kaspa import (
PrivateKey,
)

Expand Down
42 changes: 38 additions & 4 deletions python/examples/rpc.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,37 @@
import asyncio
import json
import time
import os

from kaspapy import RpcClient
from kaspa import RpcClient


async def main():
client = await RpcClient.connect(url = "ws://localhost:17110")
print(f'Client is connected: {client.is_connected()}')
def subscription_callback(event, name, **kwargs):
print(f'{name} | {event}')

async def rpc_subscriptions(client):
# client.add_event_listener('all', subscription_callback, callback_id=1, kwarg1='Im a kwarg!!')
client.add_event_listener('all', subscription_callback, name="all")

await client.subscribe_virtual_daa_score_changed()
await client.subscribe_virtual_chain_changed(True)
await client.subscribe_block_added()
await client.subscribe_new_block_template()

await asyncio.sleep(5)

client.remove_event_listener('all')
print('Removed all event listeners. Sleeping for 5 seconds before unsubscribing. Should see nothing print.')

await asyncio.sleep(5)

await client.unsubscribe_virtual_daa_score_changed()
await client.unsubscribe_virtual_chain_changed(True)
await client.unsubscribe_block_added()
await client.unsubscribe_new_block_template()


async def rpc_calls(client):
get_server_info_response = await client.get_server_info()
print(get_server_info_response)

Expand All @@ -24,6 +47,17 @@ async def main():
get_balances_by_addresses_response = await client.get_balances_by_addresses_call(get_balances_by_addresses_request)
print(get_balances_by_addresses_response)

async def main():
rpc_host = os.environ.get("KASPA_RPC_HOST")
client = RpcClient(url = f"ws://{rpc_host}:17210")
await client.connect()
print(f'Client is connected: {client.is_connected()}')

await rpc_calls(client)
await rpc_subscriptions(client)

await client.disconnect()


if __name__ == "__main__":
asyncio.run(main())
2 changes: 1 addition & 1 deletion python/examples/test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kaspapy import PrivateKeyGenerator
from kaspa import PrivateKeyGenerator

if __name__ == "__main__":
x = PrivateKeyGenerator('xprv9s21ZrQH143K2hP7m1bU4ZT6tWgX1Qn2cWvtLVDX6sTJVyg3XBa4p1So4s7uEvVFGyBhQWWRe8JeLPeDZ462LggxkkJpZ9z1YMzmPahnaZA', False, 1)
Expand Down
6 changes: 3 additions & 3 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = ["maturin>=1.0,<2.0"]
build-backend = "maturin"

[project]
name = "kaspapy"
name = "kaspa"
description = "Kaspa Python Bindings"
version = "0.1.0"
requires-python = ">=3.8"
Expand All @@ -23,8 +23,8 @@ dependencies = []
# changelog = ""

[package.metadata.maturin]
name = "kaspapy"
name = "kaspa"
description = "Kaspa Python Bindings"

[tool.maturin]
name = "kaspapy"
name = "kaspa"
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cfg_if::cfg_if! {
use pyo3::prelude::*;

#[pymodule]
fn kaspapy(m: &Bound<'_, PyModule>) -> PyResult<()> {
fn kaspa(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<kaspa_addresses::Address>()?;

m.add_class::<kaspa_wallet_keys::privkeygen::PrivateKeyGenerator>()?;
Expand Down
6 changes: 5 additions & 1 deletion rpc/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ wasm32-sdk = [
"kaspa-consensus-client/wasm32-sdk",
"kaspa-consensus-wasm/wasm32-sdk"
]
py-sdk = ["pyo3"]
py-sdk = [
"pyo3",
"serde-pyobject"
]

[dependencies]
kaspa-addresses.workspace = true
Expand Down Expand Up @@ -46,6 +49,7 @@ paste.workspace = true
rand.workspace = true
pyo3 = { workspace = true, optional = true }
serde-wasm-bindgen.workspace = true
serde-pyobject = { workspace = true, optional = true }
serde.workspace = true
smallvec.workspace = true
thiserror.workspace = true
Expand Down
21 changes: 21 additions & 0 deletions rpc/core/src/api/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ use kaspa_notify::{
Subscription,
},
};
#[cfg(feature = "py-sdk")]
use pyo3::prelude::*;
use serde::{Deserialize, Serialize};
#[cfg(feature = "py-sdk")]
use serde_pyobject::to_pyobject;
use std::sync::Arc;
use wasm_bindgen::JsValue;
use workflow_serializer::prelude::*;
Expand Down Expand Up @@ -62,6 +66,23 @@ impl Notification {
Notification::VirtualChainChanged(v) => to_value(&v),
}
}

#[cfg(feature = "py-sdk")]
pub fn to_pyobject(&self, py: Python) -> PyResult<PyObject> {
let bound_obj = match self {
Notification::BlockAdded(v) => to_pyobject(py, &v),
Notification::FinalityConflict(v) => to_pyobject(py, &v),
Notification::FinalityConflictResolved(v) => to_pyobject(py, &v),
Notification::NewBlockTemplate(v) => to_pyobject(py, &v),
Notification::PruningPointUtxoSetOverride(v) => to_pyobject(py, &v),
Notification::UtxosChanged(v) => to_pyobject(py, &v),
Notification::VirtualDaaScoreChanged(v) => to_pyobject(py, &v),
Notification::SinkBlueScoreChanged(v) => to_pyobject(py, &v),
Notification::VirtualChainChanged(v) => to_pyobject(py, &v),
};

Ok(bound_obj.unwrap().to_object(py))
}
}

impl NotificationTrait for Notification {
Expand Down
6 changes: 6 additions & 0 deletions rpc/macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ pub fn build_wrpc_python_interface(input: TokenStream) -> TokenStream {
wrpc::python::build_wrpc_python_interface(input)
}

#[proc_macro]
#[proc_macro_error]
pub fn build_wrpc_python_subscriptions(input: TokenStream) -> TokenStream {
wrpc::python::build_wrpc_python_subscriptions(input)
}

#[proc_macro]
#[proc_macro_error]
pub fn declare_typescript_wasm_interface(input: TokenStream) -> TokenStream {
Expand Down
93 changes: 91 additions & 2 deletions rpc/macros/src/wrpc/python.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::handler::*;
use proc_macro2::TokenStream;
use convert_case::{Case, Casing};
use proc_macro2::{Ident, Span, TokenStream};
use quote::{quote, ToTokens};
use regex::Regex;
use std::convert::Into;
use syn::{
parse::{Parse, ParseStream},
Expand Down Expand Up @@ -41,7 +43,7 @@ impl ToTokens for RpcTable {
#[pymethods]
impl RpcClient {
fn #fn_call(&self, py: Python, request: Py<PyDict>) -> PyResult<Py<PyAny>> {
let client = self.client.clone();
let client = self.inner.client.clone();

let request : #request_type = serde_pyobject::from_pyobject(request.into_bound(py)).unwrap();

Expand Down Expand Up @@ -71,3 +73,90 @@ pub fn build_wrpc_python_interface(input: proc_macro::TokenStream) -> proc_macro
// println!("MACRO: {}", ts.to_string());
ts.into()
}

#[derive(Debug)]
struct RpcSubscriptions {
handlers: ExprArray,
}

impl Parse for RpcSubscriptions {
fn parse(input: ParseStream) -> Result<Self> {
let parsed = Punctuated::<Expr, Token![,]>::parse_terminated(input).unwrap();
if parsed.len() != 1 {
return Err(Error::new_spanned(parsed, "usage: build_wrpc_python_!([getInfo, ..])".to_string()));
}

let mut iter = parsed.iter();
// Intake enum variants as an array
let handlers = get_handlers(iter.next().unwrap().clone())?;

Ok(RpcSubscriptions { handlers })
}
}

impl ToTokens for RpcSubscriptions {
fn to_tokens(&self, tokens: &mut TokenStream) {
let mut targets = Vec::new();

for handler in self.handlers.elems.iter() {
// TODO docs (name, docs)
let (name, _) = match handler {
syn::Expr::Path(expr_path) => (expr_path.path.to_token_stream().to_string(), &expr_path.attrs),
_ => {
continue;
}
};

let name = format!("Notify{}", name.as_str());
let regex = Regex::new(r"^Notify").unwrap();
let blank = regex.replace(&name, "");
let subscribe = regex.replace(&name, "Subscribe");
let unsubscribe = regex.replace(&name, "Unsubscribe");
let scope = Ident::new(&blank, Span::call_site());
let sub_scope = Ident::new(format!("{blank}Scope").as_str(), Span::call_site());
let fn_subscribe_snake = Ident::new(&subscribe.to_case(Case::Snake), Span::call_site());
let fn_unsubscribe_snake = Ident::new(&unsubscribe.to_case(Case::Snake), Span::call_site());

targets.push(quote! {
#[pymethods]
impl RpcClient {
fn #fn_subscribe_snake(&self, py: Python) -> PyResult<Py<PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
client.start_notify(listener_id, Scope::#scope(#sub_scope {})).await?;
Ok(())
}}
} else {
Err(PyErr::new::<PyException, _>("RPC subscribe on a closed connection"))
}
}

fn #fn_unsubscribe_snake(&self, py: Python) -> PyResult<Py<PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
client.stop_notify(listener_id, Scope::#scope(#sub_scope {})).await?;
Ok(())
}}
} else {
Err(PyErr::new::<PyException, _>("RPC unsubscribe on a closed connection"))
}
}
}
});
}

quote! {
#(#targets)*
}
.to_tokens(tokens);
}
}

pub fn build_wrpc_python_subscriptions(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
let rpc_table = parse_macro_input!(input as RpcSubscriptions);
let ts = rpc_table.to_token_stream();
// println!("MACRO: {}", ts.to_string());
ts.into()
}
11 changes: 9 additions & 2 deletions rpc/wrpc/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,30 @@ license.workspace = true
repository.workspace = true

[features]
default = ["py-sdk"]
default = []
py-sdk = [
"pyo3/extension-module",
"kaspa-addresses/py-sdk",
"kaspa-rpc-core/py-sdk",
"kaspa-wrpc-client/py-sdk",
]

[dependencies]
ahash.workspace = true
cfg-if.workspace = true
futures.workspace = true
kaspa-addresses.workspace = true
kaspa-consensus-core.workspace = true
kaspa-notify.workspace = true
kaspa-rpc-core.workspace = true
kaspa-rpc-macros.workspace = true
kaspa-wrpc-client.workspace = true
kaspa-python-macros.workspace = true
pyo3.workspace = true
pyo3-asyncio-0-21.workspace = true
serde_json.workspace = true
serde-pyobject = "0.3.0"
serde-pyobject.workspace = true
thiserror.workspace = true
workflow-core.workspace = true
workflow-log.workspace = true
workflow-rpc.workspace = true
Loading

0 comments on commit e749a08

Please sign in to comment.