Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh-net)!: improve dns behaviour by staggering requests #2313

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iroh-dns-server/examples/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn main() -> anyhow::Result<()> {
TxtAttrs::<String>::lookup_by_id(&resolver, &node_id, origin).await?
}
Command::Domain { domain } => {
TxtAttrs::<String>::lookup_by_domain(&resolver, &domain).await?
TxtAttrs::<String>::lookup_by_name(&resolver, &domain).await?
}
};
println!("resolved node {}", resolved.node_id());
Expand Down
9 changes: 3 additions & 6 deletions iroh-dns-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ mod tests {
};
use iroh_net::{
discovery::pkarr_publish::PkarrRelayClient,
dns::{
node_info::{lookup_by_id, NodeInfo},
DnsResolver,
},
dns::{node_info::NodeInfo, DnsResolver, ResolverExt},
key::SecretKey,
};
use pkarr::{PkarrClient, SignedPacket};
Expand Down Expand Up @@ -168,7 +165,7 @@ mod tests {
pkarr.publish(&signed_packet).await?;

let resolver = test_resolver(nameserver);
let res = lookup_by_id(&resolver, &node_id, origin).await?;
let res = resolver.lookup_by_id(&node_id, origin).await?;

assert_eq!(res.node_id, node_id);
assert_eq!(res.info.relay_url.map(Url::from), Some(relay_url));
Expand Down Expand Up @@ -204,7 +201,7 @@ mod tests {

// resolve via DNS from our server, which will lookup from our DHT
let resolver = test_resolver(nameserver);
let res = lookup_by_id(&resolver, &node_id, origin).await?;
let res = resolver.lookup_by_id(&node_id, origin).await?;

assert_eq!(res.node_id, node_id);
assert_eq!(res.info.relay_url.map(Url::from), Some(relay_url));
Expand Down
6 changes: 3 additions & 3 deletions iroh-net/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ mod test_dns_pkarr {

use crate::{
discovery::pkarr_publish::PkarrPublisher,
dns::node_info::{lookup_by_id, NodeInfo},
dns::{node_info::NodeInfo, ResolverExt},
relay::{RelayMap, RelayMode},
test_utils::{
dns_server::{create_dns_resolver, run_dns_server},
Expand Down Expand Up @@ -590,7 +590,7 @@ mod test_dns_pkarr {
state.upsert(signed_packet)?;

let resolver = create_dns_resolver(nameserver)?;
let resolved = lookup_by_id(&resolver, &node_info.node_id, &origin).await?;
let resolved = resolver.lookup_by_id(&node_info.node_id, &origin).await?;

assert_eq!(resolved, node_info.into());

Expand Down Expand Up @@ -620,7 +620,7 @@ mod test_dns_pkarr {
publisher.update_addr_info(&addr_info);
// wait until our shared state received the update from pkarr publishing
dns_pkarr_server.on_node(&node_id, timeout).await?;
let resolved = lookup_by_id(&resolver, &node_id, &origin).await?;
let resolved = resolver.lookup_by_id(&node_id, &origin).await?;

let expected = NodeAddr {
info: addr_info,
Expand Down
9 changes: 6 additions & 3 deletions iroh-net/src/discovery/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use futures_lite::stream::Boxed as BoxStream;

use crate::{
discovery::{Discovery, DiscoveryItem},
dns, Endpoint, NodeId,
dns::ResolverExt,
Endpoint, NodeId,
};

/// The n0 testing DNS node origin
pub const N0_DNS_NODE_ORIGIN: &str = "dns.iroh.link";
const DNS_STAGGERING_MS: &[u64] = &[200, 300];

/// DNS node discovery
///
Expand Down Expand Up @@ -53,8 +55,9 @@ impl Discovery for DnsDiscovery {
let resolver = ep.dns_resolver().clone();
let origin_domain = self.origin_domain.clone();
let fut = async move {
let node_addr =
dns::node_info::lookup_by_id(&resolver, &node_id, &origin_domain).await?;
let node_addr = resolver
.lookup_by_id_staggered(&node_id, &origin_domain, DNS_STAGGERING_MS)
.await?;
Ok(DiscoveryItem {
provenance: "dns",
last_updated: None,
Expand Down
222 changes: 214 additions & 8 deletions iroh-net/src/dns.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
//! This module exports a DNS resolver, which is also the default resolver used in the
//! [`crate::Endpoint`] if no custom resolver is configured.
//!
//! It also exports [`ResolverExt`]: A extension trait over [`DnsResolver`] to perform DNS queries
//! by ipv4, ipv6, name and node_id. See the [`node_info`] module documentation for details on how
//! iroh node records are structured.

use std::fmt::Write;
use std::net::{IpAddr, Ipv6Addr};
use std::time::Duration;

use anyhow::Result;
use futures_lite::Future;
use futures_lite::{Future, StreamExt};
use hickory_resolver::{AsyncResolver, IntoName, TokioAsyncResolver};
use iroh_base::key::NodeId;
use iroh_base::node_addr::NodeAddr;
use once_cell::sync::Lazy;

pub mod node_info;
Expand Down Expand Up @@ -78,14 +85,14 @@ fn create_default_resolver() -> Result<TokioAsyncResolver> {
/// Extension trait to [`DnsResolver`].
pub trait ResolverExt {
/// Perform an ipv4 lookup with a timeout.
fn lookup_ipv4<N: IntoName + Clone>(
fn lookup_ipv4<N: IntoName>(
&self,
host: N,
timeout: Duration,
) -> impl Future<Output = Result<impl Iterator<Item = IpAddr>>>;

/// Perform an ipv6 lookup with a timeout.
fn lookup_ipv6<N: IntoName + Clone>(
fn lookup_ipv6<N: IntoName>(
&self,
host: N,
timeout: Duration,
Expand All @@ -97,10 +104,69 @@ pub trait ResolverExt {
host: N,
timeout: Duration,
) -> impl Future<Output = Result<impl Iterator<Item = IpAddr>>>;

/// Looks up node info by DNS name.
fn lookup_by_name(&self, name: &str) -> impl Future<Output = Result<NodeAddr>>;

/// Looks up node info by [`NodeId`] and origin domain name.
fn lookup_by_id(
&self,
node_id: &NodeId,
origin: &str,
) -> impl Future<Output = Result<NodeAddr>>;

/// Perform an ipv4 lookup with a timeout in a staggered fashion.
///
/// The first call is done immediately, with added calls according to the staggering strategy.
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
fn lookup_ipv4_staggered<N: IntoName + Clone>(
&self,
host: N,
timeout: Duration,
delays_ms: &[u64],
) -> impl Future<Output = Result<impl Iterator<Item = IpAddr>>>;
dignifiedquire marked this conversation as resolved.
Show resolved Hide resolved

/// Perform an ipv6 lookup with a timeout in a staggered fashion.
///
/// The first call is done immediately, with added calls according to the staggering strategy.
fn lookup_ipv6_staggered<N: IntoName + Clone>(
&self,
host: N,
timeout: Duration,
delays_ms: &[u64],
) -> impl Future<Output = Result<impl Iterator<Item = IpAddr>>>;

/// Race an ipv4 and ipv6 lookup with a timeout in a staggered fashion.
///
/// The first call is done immediately, with added calls according to the staggering strategy.
fn lookup_ipv4_ipv6_staggered<N: IntoName + Clone>(
&self,
host: N,
timeout: Duration,
delays_ms: &[u64],
) -> impl Future<Output = Result<impl Iterator<Item = IpAddr>>>;

/// Looks up node info by DNS name in a staggered fashion.
///
/// The first call is done immediately, with added calls according to the staggering strategy.
fn lookup_by_name_staggered(
&self,
name: &str,
delays_ms: &[u64],
) -> impl Future<Output = Result<NodeAddr>>;

/// Looks up node info by [`NodeId`] and origin domain name.
///
/// The first call is done immediately, with added calls according to the staggering strategy.
fn lookup_by_id_staggered(
&self,
node_id: &NodeId,
origin: &str,
delays_ms: &[u64],
) -> impl Future<Output = Result<NodeAddr>>;
}

impl ResolverExt for DnsResolver {
async fn lookup_ipv4<N: IntoName + Clone>(
async fn lookup_ipv4<N: IntoName>(
&self,
host: N,
timeout: Duration,
Expand All @@ -109,7 +175,7 @@ impl ResolverExt for DnsResolver {
Ok(addrs.into_iter().map(|ip| IpAddr::V4(ip.0)))
}

async fn lookup_ipv6<N: IntoName + Clone>(
async fn lookup_ipv6<N: IntoName>(
&self,
host: N,
timeout: Duration,
Expand Down Expand Up @@ -142,9 +208,90 @@ impl ResolverExt for DnsResolver {
}
}
}

/// Looks up node info by DNS name.
///
/// The resource records returned for `name` must either contain an [`node_info::IROH_TXT_NAME`] TXT
/// record or be a CNAME record that leads to an [`node_info::IROH_TXT_NAME`] TXT record.
async fn lookup_by_name(&self, name: &str) -> Result<NodeAddr> {
let attrs = node_info::TxtAttrs::<node_info::IrohAttr>::lookup_by_name(self, name).await?;
let info: node_info::NodeInfo = attrs.into();
Ok(info.into())
}

/// Looks up node info by [`NodeId`] and origin domain name.
async fn lookup_by_id(&self, node_id: &NodeId, origin: &str) -> Result<NodeAddr> {
let attrs =
node_info::TxtAttrs::<node_info::IrohAttr>::lookup_by_id(self, node_id, origin).await?;
let info: node_info::NodeInfo = attrs.into();
Ok(info.into())
}

/// Perform an ipv4 lookup with a timeout in a staggered fashion.
///
/// The first call is done immediately, with added calls according to the staggering strategy.
/// The timeout is passed to every individual call to [`ResolverExt::lookup_ipv4`].
async fn lookup_ipv4_staggered<N: IntoName + Clone>(
&self,
host: N,
timeout: Duration,
delays_ms: &[u64],
) -> Result<impl Iterator<Item = IpAddr>> {
let f = || self.lookup_ipv4(host.clone(), timeout);
stagger_call(f, delays_ms).await
}

/// Perform an ipv6 lookup with a timeout in a staggered fashion.
///
/// The first call is done immediately, with added calls according to the staggering strategy.
/// The timeout is passed to every individual call to [`ResolverExt::lookup_ipv6`].
async fn lookup_ipv6_staggered<N: IntoName + Clone>(
&self,
host: N,
timeout: Duration,
delays_ms: &[u64],
) -> Result<impl Iterator<Item = IpAddr>> {
let f = || self.lookup_ipv6(host.clone(), timeout);
stagger_call(f, delays_ms).await
}

/// Race an ipv4 and ipv6 lookup with a timeout in a staggered fashion.
///
/// The first call is done immediately, with added calls according to the staggering strategy.
/// The timeout is passed to every individual call to [`ResolverExt::lookup_ipv4_ipv6`].
async fn lookup_ipv4_ipv6_staggered<N: IntoName + Clone>(
&self,
host: N,
timeout: Duration,
delays_ms: &[u64],
) -> Result<impl Iterator<Item = IpAddr>> {
let f = || self.lookup_ipv4_ipv6(host.clone(), timeout);
stagger_call(f, delays_ms).await
}

/// Looks up node info by DNS name in a staggered fashion.
///
/// The first call is done immediately, with added calls according to the staggering strategy.
async fn lookup_by_name_staggered(&self, name: &str, delays_ms: &[u64]) -> Result<NodeAddr> {
let f = || self.lookup_by_name(name);
stagger_call(f, delays_ms).await
}

/// Looks up node info by [`NodeId`] and origin domain name.
///
/// The first call is done immediately, with added calls according to the staggering strategy.
async fn lookup_by_id_staggered(
&self,
node_id: &NodeId,
origin: &str,
delays_ms: &[u64],
) -> Result<NodeAddr> {
let f = || self.lookup_by_id(node_id, origin);
stagger_call(f, delays_ms).await
}
}

/// Helper enum to give a unified type to the iterators of [`ResolverExt::lookup_ipv4_ipv6`]
/// Helper enum to give a unified type to the iterators of [`ResolverExt::lookup_ipv4_ipv6`].
enum LookupIter<A, B> {
Ipv4(A),
Ipv6(B),
Expand All @@ -163,11 +310,53 @@ impl<A: Iterator<Item = IpAddr>, B: Iterator<Item = IpAddr>> Iterator for Lookup
}
}

/// Staggers calls to the future F with the given delays.
///
/// The first call is performed immediately. The first call to succeed generates an Ok result
/// ignoring any previous error. If all calls fail, an error sumarizing all errors is returned.
async fn stagger_call<T, F: Fn() -> Fut, Fut: Future<Output = Result<T>>>(
f: F,
delays_ms: &[u64],
) -> Result<T> {
let mut calls = futures_buffered::FuturesUnorderedBounded::new(delays_ms.len() + 1);
// NOTE: we add the 0 delay here to have a uniform set of futures. This is more performant than
// using alternatives that allow futures of different types.
for delay in std::iter::once(&0u64).chain(delays_ms) {
let delay = std::time::Duration::from_millis(*delay);
let fut = f();
let staggered_fut = async move {
tokio::time::sleep(delay).await;
fut.await
};
calls.push(staggered_fut)
}

let mut errors = vec![];
while let Some(call_result) = calls.next().await {
match call_result {
Ok(t) => return Ok(t),
Err(e) => errors.push(e),
}
}

anyhow::bail!(
"no calls succeed: [ {}]",
errors.into_iter().fold(String::new(), |mut summary, e| {
write!(summary, "{e} ").expect("infallible");
summary
})
)
}

#[cfg(test)]
pub(crate) mod tests {
use std::sync::atomic::AtomicUsize;

use crate::defaults::NA_RELAY_HOSTNAME;

use super::*;
const TIMEOUT: Duration = Duration::from_secs(5);
const STAGGERING_DELAYS: &[u64] = &[200, 300];

#[tokio::test]
#[cfg_attr(target_os = "windows", ignore = "flaky")]
Expand All @@ -181,16 +370,33 @@ pub(crate) mod tests {
}

#[tokio::test]
#[cfg_attr(target_os = "windows", ignore = "flaky")]
async fn test_dns_lookup_ipv4_ipv6() {
let _logging = iroh_test::logging::setup();
let resolver = default_resolver();
let res: Vec<_> = resolver
.lookup_ipv4_ipv6(NA_RELAY_HOSTNAME, Duration::from_secs(5))
.lookup_ipv4_ipv6_staggered(NA_RELAY_HOSTNAME, TIMEOUT, STAGGERING_DELAYS)
.await
.unwrap()
.collect();
assert!(!res.is_empty());
dbg!(res);
}

#[tokio::test]
async fn stagger_basic() {
let _logging = iroh_test::logging::setup();
const CALL_RESULTS: &[Result<u8, u8>] = &[Err(2), Ok(3), Ok(5), Ok(7)];
static DONE_CALL: AtomicUsize = AtomicUsize::new(0);
let f = || {
let r_pos = DONE_CALL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
async move {
tracing::info!(r_pos, "call");
CALL_RESULTS[r_pos].map_err(|e| anyhow::anyhow!("{e}"))
}
};

let delays = [1000, 15];
let result = stagger_call(f, &delays).await.unwrap();
assert_eq!(result, 5)
}
}
Loading
Loading