Skip to content

Commit

Permalink
Add support for ReDap catalog projection and filtering to python SDK (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
zehiko authored Jan 13, 2025
1 parent 38cc75b commit 0e26484
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 12 deletions.
8 changes: 6 additions & 2 deletions examples/python/remote/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@

register_cmd.add_argument("storage_url", help="Storage URL to register")

print_cmd.add_argument("--columns", nargs="*", help="Define which columns to print")
print_cmd.add_argument("--recording-ids", nargs="*", help="Select specific recordings to print")

args = parser.parse_args()

# Register the new rrd
conn = rr.remote.connect("http://0.0.0.0:51234")

catalog = pl.from_arrow(conn.query_catalog().read_all())

if args.subcommand == "print":
catalog = pl.from_arrow(conn.query_catalog(args.columns, args.recording_ids).read_all())
print(catalog)

elif args.subcommand == "register":
Expand All @@ -39,6 +41,8 @@
print(f"Registered new recording with ID: {id}")

elif args.subcommand == "update":
catalog = pl.from_arrow(conn.query_catalog().read_all())

id = (
catalog.filter(catalog["rerun_recording_id"].str.starts_with(args.id))
.select(pl.first("rerun_recording_id"))
Expand Down
16 changes: 14 additions & 2 deletions rerun_py/rerun_bindings/rerun_bindings.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -577,8 +577,20 @@ class StorageNodeClient:
Required-feature: `remote`
"""

def query_catalog(self) -> pa.RecordBatchReader:
"""Get the metadata for all recordings in the storage node."""
def query_catalog(
self, columns: Optional[list[str]] = None, recording_ids: Optional[list[str]] = None
) -> pa.RecordBatchReader:
"""
Get the metadata recordings in the storage node.
Parameters
----------
columns : Optional[list[str]], optional
The columns to include in the output, by default None.
recording_ids : Optional[list[str]]
Filter specific recordings by Recording Id
"""
...

def register(self, storage_url: str, metadata: Optional[TableLike] = None) -> str:
Expand Down
37 changes: 29 additions & 8 deletions rerun_py/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use re_log_types::{EntityPathFilter, StoreInfo, StoreSource};
use re_protos::{
common::v0::RecordingId,
remote_store::v0::{
storage_node_client::StorageNodeClient, CatalogFilter, FetchRecordingRequest,
QueryCatalogRequest, QueryRequest, RecordingType, RegisterRecordingRequest,
UpdateCatalogRequest,
storage_node_client::StorageNodeClient, CatalogFilter, ColumnProjection,
FetchRecordingRequest, QueryCatalogRequest, QueryRequest, RecordingType,
RegisterRecordingRequest, UpdateCatalogRequest,
},
};
use re_sdk::{ApplicationId, ComponentName, StoreId, StoreKind, Time, Timeline};
Expand Down Expand Up @@ -190,13 +190,34 @@ impl PyStorageNodeClient {

#[pymethods]
impl PyStorageNodeClient {
/// Get the metadata for all recordings in the storage node.
fn query_catalog(&mut self) -> PyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
/// Get the metadata for recordings in the storage node.
///
/// Parameters
/// ----------
/// columns : Optional[list[str]]
/// The columns to fetch. If `None`, fetch all columns.
/// recording_ids : Optional[list[str]]
/// Fetch metadata of only specific recordings. If `None`, fetch for all.
#[pyo3(signature = (
columns = None,
recording_ids = None,
))]
fn query_catalog(
&mut self,
columns: Option<Vec<String>>,
recording_ids: Option<Vec<String>>,
) -> PyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
let reader = self.runtime.block_on(async {
// TODO(jleibs): Support column projection and filtering
let column_projection = columns.map(|columns| ColumnProjection { columns });
let filter = recording_ids.map(|recording_ids| CatalogFilter {
recording_ids: recording_ids
.into_iter()
.map(|id| RecordingId { id })
.collect(),
});
let request = QueryCatalogRequest {
column_projection: None,
filter: None,
column_projection,
filter,
};

let transport_chunks = self
Expand Down

0 comments on commit 0e26484

Please sign in to comment.