Skip to content

Commit

Permalink
pr review adjustments
Browse files Browse the repository at this point in the history
  • Loading branch information
ramfox committed Dec 16, 2022
1 parent c34c51c commit 944ef06
Show file tree
Hide file tree
Showing 16 changed files with 135 additions and 158 deletions.
25 changes: 14 additions & 11 deletions iroh-gateway/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,36 @@
use std::result;
use std::{result, time::Duration};

use anyhow::Result;
use futures::stream::{BoxStream, StreamExt};
use futures::stream::Stream;
use iroh_rpc_client::{create_server, GatewayServer, ServerError, ServerSocket};
use iroh_rpc_types::gateway::{
GatewayAddr, GatewayRequest, GatewayService, VersionRequest, VersionResponse, WatchRequest,
WatchResponse,
use iroh_rpc_types::{
gateway::{GatewayAddr, GatewayRequest, GatewayService},
VersionRequest, VersionResponse, WatchRequest, WatchResponse,
};
use tracing::info;

const VERSION: &str = env!("CARGO_PKG_VERSION");
const WAIT: Duration = Duration::from_secs(1);

#[derive(Default, Debug, Clone)]
pub struct Gateway {}

impl Gateway {
#[tracing::instrument(skip(self))]
fn watch(self, _: WatchRequest) -> BoxStream<'static, WatchResponse> {
fn watch(self, _: WatchRequest) -> impl Stream<Item = WatchResponse> {
async_stream::stream! {
loop {
yield WatchResponse { version: env!("CARGO_PKG_VERSION").to_string() };
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
yield WatchResponse { version: VERSION.to_string() };
tokio::time::sleep(WAIT).await;
}
}
.boxed()
}

#[tracing::instrument(skip(self))]
async fn version(self, _: VersionRequest) -> VersionResponse {
let version = env!("CARGO_PKG_VERSION").to_string();
VersionResponse { version }
VersionResponse {
version: VERSION.to_string(),
}
}
}

Expand Down
26 changes: 18 additions & 8 deletions iroh-p2p/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use std::time::Duration;

use anyhow::{anyhow, ensure, Context, Result};
use bytes::Bytes;
use cid::Cid;
use futures::StreamExt;
use futures::{stream::BoxStream, TryFutureExt};
use futures::{
stream::{BoxStream, Stream},
TryFutureExt,
};
use iroh_bitswap::Block;
use iroh_rpc_client::{create_server, Lookup, P2pServer, ServerError, ServerSocket};
use iroh_rpc_types::{p2p::*, RpcError, RpcResult};
use iroh_rpc_types::{
p2p::*, RpcError, RpcResult, VersionRequest, VersionResponse, WatchRequest, WatchResponse,
};
use libp2p::gossipsub::{
error::{PublishError, SubscriptionError},
MessageId, TopicHash,
Expand All @@ -22,6 +29,9 @@ use tracing::{debug, info, trace};

use super::node::DEFAULT_PROVIDER_LIMIT;

const VERSION: &str = env!("CARGO_PKG_VERSION");
const WAIT: Duration = Duration::from_secs(1);

#[derive(Clone)]
pub(crate) struct P2p {
sender: Sender<RpcMessage>,
Expand All @@ -33,20 +43,20 @@ impl P2p {
}

#[tracing::instrument(skip(self))]
fn watch(self, _: WatchRequest) -> BoxStream<'static, WatchResponse> {
fn watch(self, _: WatchRequest) -> impl Stream<Item = WatchResponse> {
async_stream::stream! {
loop {
yield WatchResponse { version: env!("CARGO_PKG_VERSION").to_string() };
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
yield WatchResponse { version: VERSION.to_string() };
tokio::time::sleep(WAIT).await;
}
}
.boxed()
}

#[tracing::instrument(skip(self))]
async fn version(self, _: VersionRequest) -> VersionResponse {
let version = env!("CARGO_PKG_VERSION").to_string();
VersionResponse { version }
VersionResponse {
version: VERSION.to_string(),
}
}

#[tracing::instrument(skip(self))]
Expand Down
2 changes: 1 addition & 1 deletion iroh-rpc-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl Client {

let mut stream = futures::stream::select_all(streams);
while let Some(s) = stream.next().await {
status.update(s).unwrap();
status.update(s).expect("unknown service");
yield status.clone();
}
}
Expand Down
7 changes: 3 additions & 4 deletions iroh-rpc-client/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fmt;
use anyhow::Result;
use async_stream::stream;
use futures::{Stream, StreamExt};
use iroh_rpc_types::gateway::*;
use iroh_rpc_types::{gateway::*, VersionRequest, WatchRequest};

use crate::{status::StatusType, ServiceStatus};

Expand Down Expand Up @@ -42,7 +42,6 @@ impl GatewayClient {
};
ServiceStatus {
name: "gateway",
number: 1,
status,
version,
}
Expand All @@ -58,11 +57,11 @@ impl GatewayClient {
Ok(mut res) => {
while let Some(v) = res.next().await {
let (status, version) = v.map_or((StatusType::Down, String::new()), |v| (StatusType::Serving, v.version));
yield ServiceStatus::new("gateway", 1, status, version);
yield ServiceStatus::new("gateway", status, version);
}
},
Err(_) => {
yield ServiceStatus::new("gateway", 1, StatusType::Down, "");
yield ServiceStatus::new("gateway", StatusType::Down, "");
}
}
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
Expand Down
10 changes: 4 additions & 6 deletions iroh-rpc-client/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use async_stream::stream;
use bytes::Bytes;
use cid::Cid;
use futures::{Stream, StreamExt};
use iroh_rpc_types::p2p;
use iroh_rpc_types::p2p::*;
use iroh_rpc_types::{p2p::*, VersionRequest, WatchRequest};
use libp2p::gossipsub::{MessageId, TopicHash};
use libp2p::{Multiaddr, PeerId};
use std::collections::{HashMap, HashSet};
Expand All @@ -27,7 +26,7 @@ impl P2pClient {

#[tracing::instrument(skip(self))]
pub async fn version(&self) -> Result<String> {
let res = self.client.rpc(p2p::VersionRequest).await?;
let res = self.client.rpc(VersionRequest).await?;
Ok(res.version)
}

Expand Down Expand Up @@ -278,7 +277,6 @@ impl P2pClient {
};
ServiceStatus {
name: "p2p",
number: 1,
status,
version,
}
Expand All @@ -294,11 +292,11 @@ impl P2pClient {
Ok(mut res) => {
while let Some(v) = res.next().await {
let (status, version) = v.map_or((StatusType::Down, String::new()), |v| (StatusType::Serving, v.version));
yield ServiceStatus::new("p2p", 1, status, version);
yield ServiceStatus::new("p2p", status, version);
}
},
Err(_) => {
yield ServiceStatus::new("p2p", 1, StatusType::Down, "");
yield ServiceStatus::new("p2p", StatusType::Down, "");
}
}
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
Expand Down
49 changes: 13 additions & 36 deletions iroh-rpc-client/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,24 @@ pub enum StatusType {
Serving,
/// Indicates that the service is down.
Down,
/// Indicates that the service not serving data, but the service is not down.
// TODO(ramfox): NotServing is currently unused
NotServing,
}

#[derive(Debug, Clone, PartialEq, Eq)]
/// The status of an individual rpc service
pub struct ServiceStatus {
/// name of the service: "gateway", "p2p", or "store"
pub(crate) name: &'static str,
pub(crate) number: usize,
pub(crate) status: StatusType,
pub(crate) version: String,
}

impl ServiceStatus {
pub fn new<I: Into<String>>(
name: &'static str,
number: usize,
status: StatusType,
version: I,
) -> Self {
pub fn new<I: Into<String>>(name: &'static str, status: StatusType, version: I) -> Self {
Self {
name,
number,
status,
version: version.into(),
}
Expand All @@ -40,24 +36,19 @@ impl ServiceStatus {
self.name
}

pub fn number(&self) -> usize {
self.number
}

pub fn status(&self) -> StatusType {
self.status.clone()
}

pub fn version(&self) -> String {
self.version.clone()
pub fn version(&self) -> &str {
&self.version
}
}

impl Default for ServiceStatus {
fn default() -> Self {
Self {
name: "",
number: 1,
status: StatusType::Unknown,
version: String::new(),
}
Expand Down Expand Up @@ -133,9 +124,9 @@ impl Iterator for ClientStatusIterator<'_> {
impl Default for ClientStatus {
fn default() -> Self {
Self {
gateway: ServiceStatus::new(gateway::NAME, 1, StatusType::Unknown, ""),
p2p: ServiceStatus::new(network::NAME, 1, StatusType::Unknown, ""),
store: ServiceStatus::new(store::NAME, 1, StatusType::Unknown, ""),
gateway: ServiceStatus::new(gateway::NAME, StatusType::Unknown, ""),
p2p: ServiceStatus::new(network::NAME, StatusType::Unknown, ""),
store: ServiceStatus::new(store::NAME, StatusType::Unknown, ""),
}
}
}
Expand All @@ -148,7 +139,6 @@ mod tests {
fn service_status_default() {
let expect = ServiceStatus {
name: "",
number: 1,
status: StatusType::Unknown,
version: String::new(),
};
Expand All @@ -159,13 +149,12 @@ mod tests {
fn service_status_new() {
let expect = ServiceStatus {
name: "test",
number: 15,
status: StatusType::Serving,
version: "v0.1.0".to_string(),
};
assert_eq!(
expect,
ServiceStatus::new("test", 15, StatusType::Serving, "v0.1.0")
ServiceStatus::new("test", StatusType::Serving, "v0.1.0")
);
}

Expand All @@ -174,19 +163,16 @@ mod tests {
let expect = ClientStatus {
gateway: ServiceStatus {
name: crate::gateway::NAME,
number: 1,
status: StatusType::Unknown,
version: "".to_string(),
},
p2p: ServiceStatus {
name: crate::network::NAME,
number: 1,
status: StatusType::Unknown,
version: "".to_string(),
},
store: ServiceStatus {
name: crate::store::NAME,
number: 1,
status: StatusType::Unknown,
version: "".to_string(),
},
Expand All @@ -200,29 +186,26 @@ mod tests {
let expect = ClientStatus {
gateway: ServiceStatus {
name: "test",
number: 1,
status: StatusType::Unknown,
version: "test".to_string(),
},
p2p: ServiceStatus {
name: "test",
number: 1,
status: StatusType::Unknown,
version: "test".to_string(),
},
store: ServiceStatus {
name: "test",
number: 1,
status: StatusType::Unknown,
version: "test".to_string(),
},
};
assert_eq!(
expect,
ClientStatus::new(
Some(ServiceStatus::new("test", 1, StatusType::Unknown, "test")),
Some(ServiceStatus::new("test", 1, StatusType::Unknown, "test")),
Some(ServiceStatus::new("test", 1, StatusType::Unknown, "test"))
Some(ServiceStatus::new("test", StatusType::Unknown, "test")),
Some(ServiceStatus::new("test", StatusType::Unknown, "test")),
Some(ServiceStatus::new("test", StatusType::Unknown, "test"))
)
);
}
Expand All @@ -231,19 +214,16 @@ mod tests {
fn status_table_update() {
let gateway = Some(ServiceStatus::new(
gateway::NAME,
1,
StatusType::Unknown,
"v0.1.0",
));
let mut p2p = Some(ServiceStatus::new(
network::NAME,
1,
StatusType::Unknown,
"v0.1.0",
));
let mut store = Some(ServiceStatus::new(
store::NAME,
1,
StatusType::Unknown,
"v0.1.0",
));
Expand All @@ -268,19 +248,16 @@ mod tests {
vec![
ServiceStatus {
name: crate::store::NAME,
number: 1,
status: StatusType::Unknown,
version: "".to_string(),
},
ServiceStatus {
name: crate::network::NAME,
number: 1,
status: StatusType::Unknown,
version: "".to_string(),
},
ServiceStatus {
name: crate::gateway::NAME,
number: 1,
status: StatusType::Unknown,
version: "".to_string(),
},
Expand Down
Loading

0 comments on commit 944ef06

Please sign in to comment.