Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add back iroh status -w feature using new rpc streams #602

Merged
merged 7 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions iroh-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use crate::IpfsPath;
use crate::P2pApi;
use anyhow::{ensure, Context, Result};
use cid::Cid;
use futures::stream::BoxStream;
use futures::stream::{BoxStream, LocalBoxStream};
use futures::{StreamExt, TryStreamExt};
use iroh_resolver::resolver::Resolver;
use iroh_rpc_client::Client;
use iroh_rpc_client::StatusTable;
use iroh_rpc_client::{Client, ClientStatus};
use iroh_unixfs::{
builder::Entry as UnixfsEntry,
content_loader::{FullLoader, FullLoaderConfig},
Expand Down Expand Up @@ -168,12 +167,12 @@ impl Api {
Ok(stream.boxed())
}

pub async fn check(&self) -> StatusTable {
pub async fn check(&self) -> ClientStatus {
self.client.check().await
}

pub async fn watch(&self) -> BoxStream<'static, StatusTable> {
self.client.clone().watch().await.boxed()
pub async fn watch(&self) -> LocalBoxStream<'static, ClientStatus> {
ramfox marked this conversation as resolved.
Show resolved Hide resolved
self.client.clone().watch().await.boxed_local()
}

/// The `add_stream` method encodes the entry into a DAG and adds
Expand Down
2 changes: 1 addition & 1 deletion iroh-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub use crate::p2p::PeerIdOrAddr;
pub use bytes::Bytes;
pub use cid::Cid;
pub use iroh_resolver::resolver::Path as IpfsPath;
pub use iroh_rpc_client::{Lookup, ServiceStatus, StatusRow, StatusTable};
pub use iroh_rpc_client::{ClientStatus, Lookup, ServiceStatus, ServiceType, StatusType};
pub use iroh_unixfs::builder::{
Config as UnixfsConfig, DirectoryBuilder, Entry as UnixfsEntry, FileBuilder, SymlinkBuilder,
};
Expand Down
1 change: 1 addition & 0 deletions iroh-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ rust-version.workspace = true
anyhow.workspace = true
async-recursion.workspace = true
async-trait.workspace = true
async-stream.workspace = true
axum.workspace = true
bytes.workspace = true
cid.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions iroh-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ pub mod response;
mod rpc;
pub mod templates;
mod text;

pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION");
4 changes: 3 additions & 1 deletion iroh-gateway/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use iroh_metrics::config::Config as MetricsConfig;

use crate::VERSION;

pub fn metrics_config_with_compile_time_info(cfg: MetricsConfig) -> MetricsConfig {
// compile time configuration
cfg.with_service_name(env!("CARGO_PKG_NAME").to_string())
dignifiedquire marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -11,5 +13,5 @@ pub fn metrics_config_with_compile_time_info(cfg: MetricsConfig) -> MetricsConfi
)
.to_string(),
)
.with_version(env!("CARGO_PKG_VERSION").to_string())
.with_version(VERSION.to_string())
}
26 changes: 21 additions & 5 deletions iroh-gateway/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,35 @@
use std::result;

use anyhow::Result;
use iroh_rpc_client::{create_server, GatewayServer, ServerError, ServerSocket};
use iroh_rpc_types::gateway::{
GatewayAddr, GatewayRequest, GatewayService, VersionRequest, VersionResponse,
use futures::stream::Stream;
use iroh_rpc_client::{create_server, GatewayServer, ServerError, ServerSocket, HEALTH_POLL_WAIT};
use iroh_rpc_types::{
gateway::{GatewayAddr, GatewayRequest, GatewayService},
VersionRequest, VersionResponse, WatchRequest, WatchResponse,
};
use tracing::info;

use crate::VERSION;

#[derive(Default, Debug, Clone)]
pub struct Gateway {}

impl Gateway {
#[tracing::instrument(skip(self))]
fn watch(self, _: WatchRequest) -> impl Stream<Item = WatchResponse> {
async_stream::stream! {
loop {
yield WatchResponse { version: VERSION.to_string() };
tokio::time::sleep(HEALTH_POLL_WAIT).await;
}
}
}

#[tracing::instrument(skip(self))]
async fn version(self, _: VersionRequest) -> VersionResponse {
let version = env!("CARGO_PKG_VERSION").to_string();
VersionResponse { version }
VersionResponse {
version: VERSION.to_string(),
}
}
}

Expand All @@ -31,6 +46,7 @@ async fn dispatch(
) -> result::Result<(), ServerError> {
use GatewayRequest::*;
match req {
Watch(req) => s.server_streaming(req, chan, target, Gateway::watch).await,
Version(req) => s.rpc(req, chan, target, Gateway::version).await,
}
}
Expand Down
2 changes: 2 additions & 0 deletions iroh-p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ mod swarm;
pub use self::config::*;
pub use self::keys::{DiskStorage, Keychain, MemoryStorage};
pub use self::node::*;

pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION");
3 changes: 2 additions & 1 deletion iroh-p2p/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::VERSION;
use iroh_metrics::config::Config as MetricsConfig;

pub fn metrics_config_with_compile_time_info(cfg: MetricsConfig) -> MetricsConfig {
Expand All @@ -11,5 +12,5 @@ pub fn metrics_config_with_compile_time_info(cfg: MetricsConfig) -> MetricsConfi
)
.to_string(),
)
.with_version(env!("CARGO_PKG_VERSION").to_string())
.with_version(VERSION.to_string())
}
30 changes: 25 additions & 5 deletions iroh-p2p/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ use anyhow::{anyhow, ensure, Context, Result};
use bytes::Bytes;
use cid::Cid;
use futures::StreamExt;
use futures::{stream::BoxStream, TryFutureExt};
use futures::{
stream::{BoxStream, Stream},
TryFutureExt,
};
use iroh_bitswap::Block;
use iroh_rpc_client::{create_server, Lookup, P2pServer, ServerError, ServerSocket};
use iroh_rpc_types::{p2p::*, RpcError, RpcResult};
use iroh_rpc_client::{
create_server, Lookup, P2pServer, ServerError, ServerSocket, HEALTH_POLL_WAIT,
};
use iroh_rpc_types::{
p2p::*, RpcError, RpcResult, VersionRequest, VersionResponse, WatchRequest, WatchResponse,
};
use libp2p::gossipsub::{
error::{PublishError, SubscriptionError},
MessageId, TopicHash,
Expand All @@ -21,6 +28,7 @@ use tokio::sync::oneshot;
use tracing::{debug, info, trace};

use super::node::DEFAULT_PROVIDER_LIMIT;
use crate::VERSION;

#[derive(Clone)]
pub(crate) struct P2p {
Expand All @@ -32,10 +40,21 @@ impl P2p {
Self { sender }
}

#[tracing::instrument(skip(self))]
fn watch(self, _: WatchRequest) -> impl Stream<Item = WatchResponse> {
async_stream::stream! {
loop {
yield WatchResponse { version: VERSION.to_string() };
tokio::time::sleep(HEALTH_POLL_WAIT).await;
}
}
}

#[tracing::instrument(skip(self))]
async fn version(self, _: VersionRequest) -> VersionResponse {
let version = env!("CARGO_PKG_VERSION").to_string();
VersionResponse { version }
VersionResponse {
version: VERSION.to_string(),
}
}

#[tracing::instrument(skip(self))]
Expand Down Expand Up @@ -519,6 +538,7 @@ impl P2p {
async fn dispatch(s: P2pServer, req: P2pRequest, chan: ServerSocket<P2pService>, target: P2p) -> result::Result<(), ServerError> {
use P2pRequest::*;
match req {
Watch(req) => s.server_streaming(req, chan, target, P2p::watch).await,
Version(req) => s.rpc(req, chan, target, P2p::version).await,
Shutdown(req) => s.rpc_map_err(req, chan, target, P2p::shutdown).await,
FetchBitswap(req) => s.rpc_map_err(req, chan, target, P2p::fetch_bitswap).await,
Expand Down
35 changes: 21 additions & 14 deletions iroh-rpc-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use super::config::Config;
use super::gateway::GatewayClient;
use super::network::P2pClient;
use super::store::StoreClient;
use crate::config::Config;
use crate::gateway::GatewayClient;
use crate::network::P2pClient;
use crate::status::{ClientStatus, ServiceStatus, ServiceType};
use crate::store::StoreClient;
use anyhow::{Context, Result};
use futures::{Stream, StreamExt};

Expand Down Expand Up @@ -143,47 +144,53 @@ impl Client {
self.store.get().context("missing rpc store connection")
}

pub async fn check(&self) -> super::status::StatusTable {
pub async fn check(&self) -> crate::status::ClientStatus {
let g = if let Some(ref g) = self.gateway {
Some(g.check().await)
let (s, v) = g.check().await;
Some(ServiceStatus::new(ServiceType::Gateway, s, v))
} else {
None
};
let p = if let Some(ref p) = self.p2p.get() {
Some(p.check().await)
let (s, v) = p.check().await;
Some(ServiceStatus::new(ServiceType::P2p, s, v))
} else {
None
};
let s = if let Some(ref s) = self.store.get() {
Some(s.check().await)
let (s, v) = s.check().await;
Some(ServiceStatus::new(ServiceType::Store, s, v))
} else {
None
};
super::status::StatusTable::new(g, p, s)
ClientStatus::new(g, p, s)
}

pub async fn watch(self) -> impl Stream<Item = super::status::StatusTable> {
pub async fn watch(self) -> impl Stream<Item = ClientStatus> {
async_stream::stream! {
let mut status_table: super::status::StatusTable = Default::default();
let mut status: ClientStatus = Default::default();
let mut streams = Vec::new();

if let Some(ref g) = self.gateway {
let g = g.watch().await;
let g = g.map(|(status, version)| ServiceStatus::new(ServiceType::Gateway, status, version));
streams.push(g.boxed());
}
if let Some(ref p) = self.p2p.get() {
let p = p.watch().await;
let p = p.map(|(status, version)| ServiceStatus::new(ServiceType::P2p, status, version));
streams.push(p.boxed());
}
if let Some(ref s) = self.store.get() {
let s = s.watch().await;
let s = s.map(|(status, version)| ServiceStatus::new(ServiceType::Store, status, version));
streams.push(s.boxed());
}

let mut stream = futures::stream::select_all(streams);
while let Some(status) = stream.next().await {
status_table.update(status).unwrap();
yield status_table.clone();
while let Some(s) = stream.next().await {
status.update(s);
yield status.clone();
}
}
}
Expand Down
43 changes: 24 additions & 19 deletions iroh-rpc-client/src/gateway.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use super::status::StatusRow;
use super::ServiceStatus;
use anyhow::Result;
use futures::Stream;
use iroh_rpc_types::gateway::*;
use std::fmt;

pub(crate) const NAME: &str = "gateway";
use anyhow::Result;
use async_stream::stream;
use futures::{Stream, StreamExt};
use iroh_rpc_types::{gateway::*, VersionRequest, WatchRequest};

use crate::{StatusType, HEALTH_POLL_WAIT};

#[derive(Clone)]
pub struct GatewayClient {
Expand Down Expand Up @@ -33,22 +33,27 @@ impl GatewayClient {
}

#[tracing::instrument(skip(self))]
pub async fn check(&self) -> StatusRow {
let status: ServiceStatus = self
.version()
.await
.map(|_| ServiceStatus::Serving)
.unwrap_or_else(|_e| ServiceStatus::Unknown);
StatusRow {
name: "gateway",
number: 1,
status,
pub async fn check(&self) -> (StatusType, String) {
match self.version().await {
Ok(version) => (StatusType::Serving, version),
Err(_) => (StatusType::Down, String::new()),
}
}

#[tracing::instrument(skip(self))]
pub async fn watch(&self) -> impl Stream<Item = StatusRow> {
// todo
futures::stream::pending()
pub async fn watch(&self) -> impl Stream<Item = (StatusType, String)> {
let client = self.client.clone();
stream! {
loop {
let res = client.server_streaming(WatchRequest).await;
if let Ok(mut res) = res {
while let Some(Ok(version)) = res.next().await {
yield (StatusType::Serving, version.version);
}
}
yield (StatusType::Down, String::new());
tokio::time::sleep(HEALTH_POLL_WAIT).await;
}
}
}
}
2 changes: 1 addition & 1 deletion iroh-rpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use quic_rpc::{
transport::{combined, http2, CombinedChannelTypes, Http2ChannelTypes, MemChannelTypes},
RpcClient, RpcServer, Service,
};
pub use status::{ServiceStatus, StatusRow, StatusTable};
pub use status::{ClientStatus, ServiceStatus, ServiceType, StatusType, HEALTH_POLL_WAIT};
pub use store::StoreClient;

/// The types of channels used by the client and server.
Expand Down
Loading