From 5813e09445cb7520aab49f6bf3c25a269c01e9b0 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Tue, 17 Oct 2023 14:06:20 +0200 Subject: [PATCH] fix: avoid FuturesUnordered (#1647) ## Description Workaround for #1646 and likely makes more sense in the context of Tokio usage anyway. In general `JoinSet` is simply used instead. --- .github/workflows/ci.yml | 2 +- Cargo.lock | 351 +++++++++++++++-------------- iroh-bytes/Cargo.toml | 2 +- iroh-gossip/Cargo.toml | 2 +- iroh-gossip/src/net/util.rs | 38 +++- iroh-metrics/Cargo.toml | 2 +- iroh-net/Cargo.toml | 4 +- iroh-net/src/derp/http/client.rs | 34 +-- iroh-net/src/netcheck/reportgen.rs | 71 +++--- iroh-sync/Cargo.toml | 2 +- iroh-test/Cargo.toml | 2 +- iroh/Cargo.toml | 2 +- iroh/src/downloader.rs | 30 ++- iroh/src/sync_engine/gossip.rs | 23 +- iroh/src/sync_engine/live.rs | 57 ++--- 15 files changed, 331 insertions(+), 291 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d3f64ef23d..4bb569787a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,7 @@ env: RUST_BACKTRACE: 1 RUSTFLAGS: -Dwarnings RUSTDOCFLAGS: -Dwarnings - MSRV: "1.70" + MSRV: "1.71" jobs: build_and_test_nix: diff --git a/Cargo.lock b/Cargo.lock index 094f3258a3..1eaa8b49da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -52,9 +52,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.1.1" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea5d730647d4fadd988536d06fecce94b7b4f2a7efdae548f1cf4b63205518ab" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" dependencies = [ "memchr", ] @@ -94,9 +94,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.5.0" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" dependencies = [ "anstyle", "anstyle-parse", @@ -108,15 +108,15 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84bf0a05bbb2a83e5eb6fa36bb6e87baa08193c35ff52bbf6b38d8af2890e46" +checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anstyle-parse" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" dependencies = [ "utf8parse", ] @@ -132,9 +132,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "2.1.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle", "windows-sys 0.48.0", @@ -202,13 +202,13 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.73" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -391,9 +391,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" [[package]] name = "block-buffer" @@ -410,6 +410,15 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78a6932c88f1d2c29533a3b8a5f5a2f84cc19c3339b431677c3160c5c2e6ca85" +[[package]] +name = "bounded_join_set" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9a551ab5b908bdda1554a7045f624c46cafed4032669dc588b75f2f44afd644" +dependencies = [ + "tokio", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -418,9 +427,9 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "byteorder" -version = "1.4.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" @@ -442,9 +451,9 @@ dependencies = [ [[package]] name = "cargo-platform" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cfa25e60aea747ec7e1124f238816749faa93759c6ff5b31f1ccdda137f4479" +checksum = "12024c4645c97566567129c204f65d5815a8c9aecf30fcbe682b2fe034996d36" dependencies = [ "serde", ] @@ -549,9 +558,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.5" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "824956d0dca8334758a5b7f7e50518d66ea319330cbceedcf76905c2f6ab30e3" +checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" dependencies = [ "clap_builder", "clap_derive", @@ -559,9 +568,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.5" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "122ec64120a49b4563ccaedcbea7818d069ed8e9aa6d829b82d8a4128936b2ab" +checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstream", "anstyle", @@ -578,7 +587,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -707,18 +716,18 @@ checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" [[package]] name = "const_format" -version = "0.2.31" +version = "0.2.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c990efc7a285731f9a4378d81aff2f0e85a2c8781a05ef0f8baa8dac54d0ff48" +checksum = "e3a214c7af3d04997541b18d432afaff4c455e79e2029079647e72fc2bd27673" dependencies = [ "const_format_proc_macros", ] [[package]] name = "const_format_proc_macros" -version = "0.2.31" +version = "0.2.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e026b6ce194a874cb9cf32cd5772d1ef9767cc8fcb5765948d74f37a9d8b2bf6" +checksum = "c7f6ff08fd20f4f299298a28e2dfa8a8ba1036e6cd2460ac1de7b425d76f2500" dependencies = [ "proc-macro2", "quote", @@ -988,7 +997,7 @@ checksum = "83fdaf97f4804dcebfa5862639bc9ce4121e82140bec2a987ac5140294865b5b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -1012,7 +1021,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -1023,7 +1032,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -1033,7 +1042,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.0", + "hashbrown 0.14.1", "lock_api", "once_cell", "parking_lot_core", @@ -1115,15 +1124,16 @@ checksum = "5fe87ce4529967e0ba1dcf8450bab64d97dfd5010a6256187ffe2e43e6f0e049" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] name = "deranged" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946" +checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" dependencies = [ + "powerfmt", "serde", ] @@ -1144,7 +1154,7 @@ checksum = "df541e0e2a8069352be228ce4b85a1da6f59bfd325e56f57e4b241babbc3f832" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", "unicode-xid", ] @@ -1227,7 +1237,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -1275,9 +1285,9 @@ dependencies = [ [[package]] name = "ed25519" -version = "2.2.2" +version = "2.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60f6d271ca33075c88028be6f04d502853d63a5ece419d269c15315d4fc1cf1d" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" dependencies = [ "pkcs8", "serde", @@ -1318,9 +1328,9 @@ checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" [[package]] name = "elliptic-curve" -version = "0.13.5" +version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "968405c8fdc9b3bf4df0a6638858cc0b52462836ab6b1c87377785dd09cf1c0b" +checksum = "d97ca172ae9dc9f9b779a6e3a65d308f2af74e5b8c921299075bdb4a0370e914" dependencies = [ "base16ct", "crypto-bigint", @@ -1376,15 +1386,15 @@ dependencies = [ [[package]] name = "enum-ordinalize" -version = "3.1.13" +version = "3.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4f76552f53cefc9a7f64987c3701b99d982f7690606fd67de1d09712fbf52f1" +checksum = "1bf1fa3f06bbff1ea5b1a9c7b14aa992a39657db60a2759457328d7e058f49ee" dependencies = [ "num-bigint", "num-traits", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -1410,25 +1420,14 @@ checksum = "76a5aa24577083f8190ad401e376b55887c7cd9083ae95d83ceec5d28ea78125" [[package]] name = "errno" -version = "0.3.3" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" +checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" dependencies = [ - "errno-dragonfly", "libc", "windows-sys 0.48.0", ] -[[package]] -name = "errno-dragonfly" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "error-code" version = "2.3.1" @@ -1441,9 +1440,9 @@ dependencies = [ [[package]] name = "fallible-iterator" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" [[package]] name = "fastrand" @@ -1480,9 +1479,9 @@ checksum = "d0870c84016d4b481be5c9f323c24f65e31e901ae618f0e80f4308fb00de1d2d" [[package]] name = "flate2" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" dependencies = [ "crc32fast", "miniz_oxide", @@ -1584,7 +1583,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -1755,9 +1754,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" dependencies = [ "ahash 0.8.3", "allocator-api2", @@ -2034,12 +2033,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.1" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad227c3af19d4914570ad36d30409928b75967c298feb9ea1969db3a610bb14e" +checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.1", "serde", ] @@ -2237,7 +2236,7 @@ dependencies = [ "ed25519-dalek", "futures", "genawaiter", - "indexmap 2.0.1", + "indexmap 2.0.2", "iroh-blake3", "iroh-metrics", "iroh-net", @@ -2290,6 +2289,7 @@ dependencies = [ "aead", "anyhow", "backoff", + "bounded_join_set", "bytes", "clap", "criterion", @@ -2355,7 +2355,6 @@ dependencies = [ "tracing", "tracing-subscriber", "trust-dns-resolver", - "ucd-parse", "url", "webpki-roots", "wg", @@ -2471,15 +2470,15 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.148" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libm" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" [[package]] name = "linked-hash-map" @@ -2489,9 +2488,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] name = "linux-raw-sys" -version = "0.4.7" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128" +checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" [[package]] name = "lock_api" @@ -2515,7 +2514,7 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21" dependencies = [ - "hashbrown 0.14.0", + "hashbrown 0.14.1", ] [[package]] @@ -2577,9 +2576,9 @@ checksum = "df39d232f5c40b0891c10216992c2f250c054105cb1e56f0fc9032db6203ecc1" [[package]] name = "memchr" -version = "2.6.3" +version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] name = "memoffset" @@ -2870,9 +2869,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", "libm", @@ -2906,7 +2905,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -3005,7 +3004,7 @@ dependencies = [ "proc-macro-error 1.0.4", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -3139,7 +3138,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -3170,7 +3169,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -3333,6 +3332,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -3341,9 +3346,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "precis-core" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65ec1da47f90d677a62a9cc338a71be0fa7af7a15dd0914879acd59d0b2e172c" +checksum = "d73e9dd26361c32e7cd13d1032bb01c4e26a23287274e8a4e2f228cf2c9ff77b" dependencies = [ "precis-tools", "ucd-parse", @@ -3352,9 +3357,9 @@ dependencies = [ [[package]] name = "precis-profiles" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "053f4c98a1da64c07d41082f3c304e8958598a9562ba99f63065004f3307d49f" +checksum = "688124d96df311c37d794fb574bb5f75cdc86d8c5e514d3816d770a3dd0e1568" dependencies = [ "lazy_static", "precis-core", @@ -3364,9 +3369,9 @@ dependencies = [ [[package]] name = "precis-tools" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317d05520f469febc928408153e5a65f3ffac1499674b0b65be15306bd322704" +checksum = "d07ecadec70b0f560f09abf815ae0ee1a940d38d2354c938ba7229ac7c9f5f52" dependencies = [ "lazy_static", "regex", @@ -3460,9 +3465,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.67" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] @@ -3487,24 +3492,24 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] name = "proptest" -version = "1.2.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e35c06b98bf36aba164cc17cb25f7e232f5c4aeea73baa14b8a9f0d92dbfa65" +checksum = "7c003ac8c77cb07bb74f5f198bce836a689bcd5a42574612bf14d17bfd08c20e" dependencies = [ "bit-set", - "bitflags 1.3.2", - "byteorder", + "bit-vec", + "bitflags 2.4.1", "lazy_static", "num-traits", "rand", "rand_chacha", "rand_xorshift", - "regex-syntax 0.6.29", + "regex-syntax 0.7.5", "rusty-fork", "tempfile", "unarray", @@ -3752,9 +3757,9 @@ dependencies = [ [[package]] name = "rcgen" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4426f9f4d65c83b570885bee479ba4c5e78d7a5286c8a58e3d2570462121b447" +checksum = "52c4f3084aa3bc7dfbba4eff4fab2a54db4324965d8872ab933565e6fbd83bc6" dependencies = [ "pem 3.0.2", "ring", @@ -3818,14 +3823,14 @@ checksum = "7f7473c2cfcf90008193dd0e3e16599455cb601a9fce322b5bb55de799664925" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] name = "reflink-copy" -version = "0.1.8" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9e3947399fd46f412918bafde71ec68f9b3505f11ef082eeb80bc7fdf4d7caf" +checksum = "8c9bd37fcf997c2d9ec7ebdff893c396677664164cf72105b063ac4a483702d3" dependencies = [ "cfg-if", "ioctl-sys", @@ -3834,14 +3839,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.8", - "regex-syntax 0.7.5", + "regex-automata 0.4.3", + "regex-syntax 0.8.2", ] [[package]] @@ -3855,15 +3860,21 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.5", + "regex-syntax 0.8.2", ] +[[package]] +name = "regex-lite" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -3876,11 +3887,17 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "regex-syntax" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + [[package]] name = "reqwest" -version = "0.11.20" +version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ "base64 0.21.4", "bytes", @@ -3904,6 +3921,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "system-configuration", "tokio", "tokio-rustls", "tower-service", @@ -4035,11 +4053,11 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.14" +version = "0.38.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "747c788e9ce8e92b12cd485c49ddf90723550b654b32508f979b71a7b1ecda4f" +checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "errno", "libc", "linux-raw-sys", @@ -4113,7 +4131,7 @@ version = "12.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "994eca4bca05c87e86e15d90fc7a91d1be64b4482b38cb2d27474568fe7c9db9" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "cfg-if", "clipboard-win", "fd-lock", @@ -4224,18 +4242,18 @@ checksum = "4c309e515543e67811222dbc9e3dd7e1056279b782e1dacffe4242b718734fb6" [[package]] name = "semver" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad977052201c6de01a8ef2aa3378c4bd23217a056337d1d6da40468d267a4fb0" +checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" dependencies = [ "serde", ] [[package]] name = "serde" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" +checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" dependencies = [ "serde_derive", ] @@ -4251,13 +4269,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" +checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -4266,7 +4284,7 @@ version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" dependencies = [ - "indexmap 2.0.1", + "indexmap 2.0.2", "itoa", "ryu", "serde", @@ -4312,7 +4330,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.0.1", + "indexmap 2.0.2", "serde", "serde_json", "serde_with_macros", @@ -4328,7 +4346,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -4360,9 +4378,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1b21f559e07218024e7e9f90f96f601825397de0e25420135f7f952453fed0b" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" dependencies = [ "lazy_static", ] @@ -4518,9 +4536,9 @@ dependencies = [ [[package]] name = "ssh-key" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "728fdf5286c394f12d83eaad51190af629aade5494813ce6b57f7425f1ca51b7" +checksum = "2180b3bc4955efd5661a97658d3cf4c8107e0d132f619195afe9486c13cca313" dependencies = [ "ed25519-dalek", "p256", @@ -4580,7 +4598,7 @@ dependencies = [ "proc-macro2", "quote", "struct_iterable_internal", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -4598,7 +4616,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta-derive", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -4609,7 +4627,7 @@ checksum = "a60bcaff7397072dca0017d1db428e30d5002e00b6847703e2e42005c95fbe00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -4624,7 +4642,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" dependencies = [ - "strum_macros 0.25.2", + "strum_macros 0.25.3", ] [[package]] @@ -4642,22 +4660,22 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.25.2" +version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059" +checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" dependencies = [ "heck", "proc-macro2", "quote", "rustversion", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] name = "stun-rs" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4e54b2dabc0934dd9c6916bc60c6b412082a5509b9fdffe361137c5d08667b0" +checksum = "78517bf347f802eba0204cdecf5ff10fb7056c914a3b2d9f2011f231cb1438b5" dependencies = [ "bounded-integer", "byteorder", @@ -4710,9 +4728,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.37" +version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ "proc-macro2", "quote", @@ -4811,14 +4829,14 @@ dependencies = [ "proc-macro2", "quote", "structmeta", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] name = "testdir" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48b7965698cfb3d1ac1e6e54b4b45f5caa9e89bda223c8cf723d9cf53d7cefa7" +checksum = "480060a2e7e1d3c779d3dea588a81c0df78b6a6322b7ce25c0d2ec14a0d5d869" dependencies = [ "anyhow", "backtrace", @@ -4845,7 +4863,7 @@ checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -4860,12 +4878,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe" +checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" dependencies = [ "deranged", "itoa", + "powerfmt", "serde", "time-core", "time-macros", @@ -4913,9 +4932,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", @@ -4949,7 +4968,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -5069,7 +5088,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.0.1", + "indexmap 2.0.2", "serde", "serde_spanned", "toml_datetime", @@ -5138,11 +5157,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "ee2ef2af84856a50c1d430afce2fdded0a4ec7eda868db86409b4543df0797f9" dependencies = [ - "cfg-if", "log", "pin-project-lite", "tracing-attributes", @@ -5151,20 +5169,20 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", "valuable", @@ -5278,12 +5296,11 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "ucd-parse" -version = "0.1.10" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc2d0556a998f4c55500ce1730901ba32bafbe820068cbdc091421525d61253b" +checksum = "212c59636157b18c2f57eed2799e6606c52fc49c6a11685ffb0d08f06e55f428" dependencies = [ - "once_cell", - "regex", + "regex-lite", ] [[package]] @@ -5438,7 +5455,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", "wasm-bindgen-shared", ] @@ -5472,7 +5489,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5749,9 +5766,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.15" +version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c2e3184b9c4e92ad5167ca73039d0c42476302ab603e2fec4487511f38ccefc" +checksum = "a3b801d0e0a6726477cc207f60162da452f3a95adb368399bef20a946e06f65c" dependencies = [ "memchr", ] diff --git a/iroh-bytes/Cargo.toml b/iroh-bytes/Cargo.toml index 879dc0fe98..d7cf1fbb7d 100644 --- a/iroh-bytes/Cargo.toml +++ b/iroh-bytes/Cargo.toml @@ -9,7 +9,7 @@ authors = ["dignifiedquire ", "n0 team"] repository = "https://github.com/n0-computer/iroh" # Sadly this also needs to be updated in .github/workflows/ci.yml -rust-version = "1.70" +rust-version = "1.71" [dependencies] anyhow = { version = "1", features = ["backtrace"] } diff --git a/iroh-gossip/Cargo.toml b/iroh-gossip/Cargo.toml index 23f1f476f0..66c911c0bb 100644 --- a/iroh-gossip/Cargo.toml +++ b/iroh-gossip/Cargo.toml @@ -9,7 +9,7 @@ authors = ["n0 team"] repository = "https://github.com/n0-computer/iroh-sync" # Sadly this also needs to be updated in .github/workflows/ci.yml -rust-version = "1.70" +rust-version = "1.71" [dependencies] # proto dependencies (required) diff --git a/iroh-gossip/src/net/util.rs b/iroh-gossip/src/net/util.rs index 6d91223206..cb761f2edc 100644 --- a/iroh-gossip/src/net/util.rs +++ b/iroh-gossip/src/net/util.rs @@ -4,13 +4,15 @@ use std::{collections::HashMap, io, pin::Pin, time::Instant}; use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::{Bytes, BytesMut}; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::future::BoxFuture; use iroh_net::{key::PublicKey, MagicEndpoint, PeerAddr}; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + task::JoinSet, time::{sleep_until, Sleep}, }; use tokio_util::sync::CancellationToken; +use tracing::error; use crate::proto::util::TimerMap; @@ -88,7 +90,7 @@ pub type DialFuture = BoxFuture<'static, (PublicKey, anyhow::Result, + pending: JoinSet<(PublicKey, anyhow::Result)>, pending_peers: HashMap, } @@ -113,16 +115,14 @@ impl Dialer { let cancel = CancellationToken::new(); self.pending_peers.insert(peer_id, cancel.clone()); let endpoint = self.endpoint.clone(); - let fut = async move { + self.pending.spawn(async move { let res = tokio::select! { biased; _ = cancel.cancelled() => Err(anyhow!("Cancelled")), res = endpoint.connect(PeerAddr::new(peer_id), alpn_protocol) => res }; (peer_id, res) - } - .boxed(); - self.pending.push(fut.boxed()); + }); } /// Abort a pending dial @@ -141,8 +141,22 @@ impl Dialer { pub async fn next_conn(&mut self) -> (PublicKey, anyhow::Result) { match self.pending_peers.is_empty() { false => { - let (peer_id, res) = self.pending.next().await.unwrap(); - self.pending_peers.remove(&peer_id); + let (peer_id, res) = loop { + match self.pending.join_next().await { + Some(Ok((peer_id, res))) => { + self.pending_peers.remove(&peer_id); + break (peer_id, res); + } + Some(Err(e)) => { + error!("next conn error: {:?}", e); + } + None => { + error!("no more pending conns available"); + futures::future::pending().await + } + } + }; + (peer_id, res) } true => futures::future::pending().await, @@ -162,11 +176,15 @@ impl futures::Stream for Dialer { mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - match self.pending.poll_next_unpin(cx) { - std::task::Poll::Ready(Some((peer_id, result))) => { + match self.pending.poll_join_next(cx) { + std::task::Poll::Ready(Some(Ok((peer_id, result)))) => { self.pending_peers.remove(&peer_id); std::task::Poll::Ready(Some((peer_id, result))) } + std::task::Poll::Ready(Some(Err(e))) => { + error!("dialer error: {:?}", e); + std::task::Poll::Pending + } _ => std::task::Poll::Pending, } } diff --git a/iroh-metrics/Cargo.toml b/iroh-metrics/Cargo.toml index 09edc44df7..52f4ed1fa9 100644 --- a/iroh-metrics/Cargo.toml +++ b/iroh-metrics/Cargo.toml @@ -9,7 +9,7 @@ authors = ["arqu ", "n0 team"] repository = "https://github.com/n0-computer/iroh" # Sadly this also needs to be updated in .github/workflows/ci.yml -rust-version = "1.70" +rust-version = "1.71" [dependencies] prometheus-client = { version = "0.21.0", optional = true } diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index 5f87564f18..466d9c04d1 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -9,7 +9,7 @@ authors = ["dignifiedquire ", "n0 team"] repository = "https://github.com/n0-computer/iroh" # Sadly this also needs to be updated in .github/workflows/ci.yml -rust-version = "1.70" +rust-version = "1.71" [dependencies] anyhow = { version = "1", features = ["backtrace"] } @@ -58,7 +58,6 @@ tokio-util = { version = "0.7", features = ["io-util", "io", "codec"] } tokio-rustls = { version = "0.24" } tokio-rustls-acme = { version = "0.2" } tokio-stream = { version = "0.1", features = ["sync"]} -ucd-parse = "=0.1.10" # pinned to avoid having to bump MSRV to 1.70 (recursive dep of stun-rs) url = { version = "2.4", features = ["serde"] } webpki = { package = "rustls-webpki", version = "0.101.4", features = ["std"] } webpki-roots = "0.25" @@ -79,6 +78,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = tr # metrics iroh-metrics = { version = "0.7.0", path = "../iroh-metrics", default-features = false } +bounded_join_set = "0.1.0" [target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies] netlink-packet-core = "0.7.0" diff --git a/iroh-net/src/derp/http/client.rs b/iroh-net/src/derp/http/client.rs index b302970b6d..ab59530139 100644 --- a/iroh-net/src/derp/http/client.rs +++ b/iroh-net/src/derp/http/client.rs @@ -8,7 +8,6 @@ use std::time::Duration; use anyhow::bail; use bytes::Bytes; use futures::future::BoxFuture; -use futures::StreamExt; use hyper::upgrade::{Parts, Upgraded}; use hyper::{header::UPGRADE, Body, Request}; use iroh_metrics::inc; @@ -642,31 +641,32 @@ impl Client { } // usually 1 IPv4, 1 IPv6 and 2x http const DIAL_PARALLELISM: usize = 4; - - let this = self.clone(); - let mut dials = futures::stream::iter(reg.nodes.clone().into_iter()) - .map(|node| { - let this = this.clone(); - let target = target.clone(); - async move { - if node.stun_only { - return Err(ClientError::StunOnlyNodesFound(target)); - } - - this.dial_node(&node).await.map(|c| (c, node)) + let mut dials = bounded_join_set::JoinSet::new(DIAL_PARALLELISM); + + for node in reg.nodes.clone().into_iter() { + let this = self.clone(); + let target = target.clone(); + dials.spawn(async move { + if node.stun_only { + return Err(ClientError::StunOnlyNodesFound(target)); } - }) - .buffer_unordered(DIAL_PARALLELISM); + + this.dial_node(&node).await.map(|c| (c, node)) + }); + } let mut errs = Vec::new(); - while let Some(res) = dials.next().await { + while let Some(res) = dials.join_next().await { match res { - Ok((conn, node)) => { + Ok(Ok((conn, node))) => { // return on the first successfull one trace!("dialed region"); return Ok((conn, node)); } Err(e) => { + warn!("dial join error: {:?}", e); + } + Ok(Err(e)) => { errs.push(e); } } diff --git a/iroh-net/src/netcheck/reportgen.rs b/iroh-net/src/netcheck/reportgen.rs index 1e91f8794e..b039e27e26 100644 --- a/iroh-net/src/netcheck/reportgen.rs +++ b/iroh-net/src/netcheck/reportgen.rs @@ -23,12 +23,11 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, bail, Context, Result}; -use futures::stream::FuturesUnordered; -use futures::StreamExt; use iroh_metrics::inc; use rand::seq::IteratorRandom; use tokio::net::UdpSocket; use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinSet; use tokio::time::{self, Instant}; use tracing::{debug, debug_span, error, info_span, instrument, trace, warn, Instrument, Span}; @@ -233,7 +232,7 @@ impl Actor { let mut port_mapping = self.prepare_portmapper_task(); let mut captive_task = self.prepare_captive_portal_task(); - let mut probes = self.prepare_probes_task().await?; + let mut probes = self.spawn_probes_task().await?; let total_timer = tokio::time::sleep(OVERALL_PROBE_TIMEOUT); tokio::pin!(total_timer); @@ -253,6 +252,7 @@ impl Actor { _ = &mut probe_timer => { warn!("probes timed out"); + probes.abort_all(); self.handle_abort_probes(); } @@ -265,12 +265,17 @@ impl Actor { trace!("portmapper future done"); } - // Drive the probes. - set_result = probes.next(), if self.outstanding_tasks.probes => { + // Check for probes finishing. + set_result = probes.join_next(), if self.outstanding_tasks.probes => { match set_result { - Some(Ok(report)) => self.handle_probe_report(report), - Some(Err(_)) => (), - None => self.handle_abort_probes(), + Some(Ok(Ok(report))) => self.handle_probe_report(report), + Some(Ok(Err(_))) => (), + Some(Err(e)) => { + warn!("probes task error: {:?}", e); + } + None => { + self.handle_abort_probes(); + } } } @@ -551,8 +556,8 @@ impl Actor { /// Probes operate like the following: /// /// - A future is created for each probe in all probe sets. - /// - All probes in a set are grouped in [`FuturesUnordered`]. - /// - All those probe sets are grouped in one overall [`FuturesUnordered`]. + /// - All probes in a set are grouped in [`JoinSet`]. + /// - All those probe sets are grouped in one overall [`JoinSet`]. /// - This future is polled by the main actor loop to make progress. /// - Once a probe future is polled: /// - Many probes start with a delay, they sleep during this time. @@ -564,9 +569,7 @@ impl Actor { /// failure permanent. Probes in a probe set are essentially retries. /// - Once there are [`ProbeReport`]s from enough regions, all remaining probes are /// aborted. That is, the main actor loop stops polling them. - async fn prepare_probes_task( - &mut self, - ) -> Result>>>>> { + async fn spawn_probes_task(&mut self) -> Result>> { let if_state = interfaces::State::new().await; let plan = match self.last_report { Some(ref report) => ProbePlan::with_last_report(&self.derp_map, &if_state, report), @@ -587,9 +590,9 @@ impl Actor { }; // A collection of futures running probe sets. - let probes = FuturesUnordered::default(); + let mut probes = JoinSet::default(); for probe_set in plan.iter() { - let mut set = FuturesUnordered::default(); + let mut set = JoinSet::default(); for probe in probe_set { let reportstate = self.addr(); let stun_sock4 = self.stun_sock4.clone(); @@ -599,42 +602,44 @@ impl Actor { let netcheck = self.netcheck.clone(); let pinger = pinger.clone(); - set.push(Box::pin(async move { - run_probe( - reportstate, - stun_sock4, - stun_sock6, - derp_node, - probe, - netcheck, - pinger, - ) - .await - })); + set.spawn(run_probe( + reportstate, + stun_sock4, + stun_sock6, + derp_node, + probe, + netcheck, + pinger, + )); } // Add the probe set to all futures of probe sets. Handle aborting a probe set // if needed, only normal errors means the set continues. - probes.push(Box::pin(async move { + probes.spawn(async move { // Hack because ProbeSet is not it's own type yet. let mut probe_proto = None; - while let Some(res) = set.next().await { + while let Some(res) = set.join_next().await { match res { - Ok(report) => return Ok(report), - Err(ProbeError::Error(err, probe)) => { + Ok(Ok(report)) => return Ok(report), + Ok(Err(ProbeError::Error(err, probe))) => { probe_proto = Some(probe.proto()); warn!(?probe, "probe failed: {:#}", err); continue; } - Err(ProbeError::AbortSet(err, probe)) => { + Ok(Err(ProbeError::AbortSet(err, probe))) => { debug!(?probe, "probe set aborted: {:#}", err); + set.abort_all(); return Err(err); } + Err(err) => { + warn!("fatal probe set error, aborting: {:#}", err); + continue; + } } } warn!(?probe_proto, "no successfull probes in ProbeSet"); Err(anyhow!("All probes in ProbeSet failed")) - })); + }); } self.outstanding_tasks.probes = true; diff --git a/iroh-sync/Cargo.toml b/iroh-sync/Cargo.toml index 90af0d1b43..d357e8d9e0 100644 --- a/iroh-sync/Cargo.toml +++ b/iroh-sync/Cargo.toml @@ -9,7 +9,7 @@ authors = ["n0 team"] repository = "https://github.com/n0-computer/iroh" # Sadly this also needs to be updated in .github/workflows/ci.yml -rust-version = "1.70" +rust-version = "1.71" [dependencies] anyhow = "1" diff --git a/iroh-test/Cargo.toml b/iroh-test/Cargo.toml index 66bfcb7eb1..6fc067b9cd 100644 --- a/iroh-test/Cargo.toml +++ b/iroh-test/Cargo.toml @@ -10,7 +10,7 @@ repository = "https://github.com/n0-computer/iroh" publish = true # Sadly this also needs to be updated in .github/workflows/ci.yml -rust-version = "1.70" +rust-version = "1.71" [dependencies] anyhow = "1" diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index db39f4aaab..e312b291e6 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -10,7 +10,7 @@ repository = "https://github.com/n0-computer/iroh" default-run = "iroh" # Sadly this also needs to be updated in .github/workflows/ci.yml -rust-version = "1.70" +rust-version = "1.71" [dependencies] anyhow = { version = "1", features = ["backtrace"] } diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs index 2e4b1f0392..06a424d601 100644 --- a/iroh/src/downloader.rs +++ b/iroh/src/downloader.rs @@ -36,10 +36,13 @@ use std::{ }; use bao_tree::ChunkRanges; -use futures::{future::LocalBoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::{future::LocalBoxFuture, FutureExt, StreamExt}; use iroh_bytes::{protocol::RangeSpecSeq, store::Store, Hash, HashAndFormat, TempTag}; use iroh_net::{key::PublicKey, MagicEndpoint}; -use tokio::sync::{mpsc, oneshot}; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinSet, +}; use tokio_util::{sync::CancellationToken, time::delay_queue}; use tracing::{debug, error_span, trace, warn, Instrument}; @@ -440,8 +443,8 @@ enum PeerState { }, } -/// Type of future that performs a download request. -type DownloadFut = LocalBoxFuture<'static, (DownloadKind, Result)>; +/// Type that is returned from a download request. +type DownloadRes = (DownloadKind, Result); #[derive(Debug)] struct Service { @@ -463,7 +466,7 @@ struct Service { /// request. This map allows deduplication of efforts. current_requests: HashMap, /// Downloads underway. - in_progress_downloads: FuturesUnordered, + in_progress_downloads: JoinSet, /// Requests scheduled to be downloaded at a later time. scheduled_requests: HashMap, /// Queue of scheduled requests. @@ -486,7 +489,7 @@ impl, D: Dialer> Service { peers: HashMap::default(), goodbye_peer_queue: delay_queue::DelayQueue::default(), current_requests: HashMap::default(), - in_progress_downloads: FuturesUnordered::default(), + in_progress_downloads: Default::default(), scheduled_requests: HashMap::default(), scheduled_request_queue: delay_queue::DelayQueue::default(), } @@ -512,9 +515,16 @@ impl, D: Dialer> Service { None => return self.shutdown().await, } } - Some((kind, result)) = self.in_progress_downloads.next() => { - trace!("tick: download completed"); - self.on_download_completed(kind, result); + Some(res) = self.in_progress_downloads.join_next() => { + match res { + Ok((kind, result)) => { + trace!("tick: download completed"); + self.on_download_completed(kind, result); + } + Err(e) => { + warn!("download issue: {:?}", e); + } + } } Some(expired) = self.scheduled_request_queue.next(), if !at_capacity => { trace!("tick: scheduled request ready"); @@ -1001,7 +1011,7 @@ impl, D: Dialer> Service { (kind, res) }; - self.in_progress_downloads.push(fut.boxed_local()); + self.in_progress_downloads.spawn_local(fut); } /// Schedule a request for later processing. diff --git a/iroh/src/sync_engine/gossip.rs b/iroh/src/sync_engine/gossip.rs index ce5f9a2e8d..61b41a1179 100644 --- a/iroh/src/sync_engine/gossip.rs +++ b/iroh/src/sync_engine/gossip.rs @@ -1,17 +1,17 @@ -use std::{collections::HashSet, future::Future, pin::Pin}; +use std::collections::HashSet; use anyhow::{anyhow, Context, Result}; -use futures::{ - stream::{FuturesUnordered, StreamExt}, - FutureExt, -}; +use futures::{stream::StreamExt, FutureExt}; use iroh_gossip::{ net::{Event, Gossip}, proto::TopicId, }; use iroh_net::key::PublicKey; use iroh_sync::{actor::SyncHandle, ContentStatus, NamespaceId}; -use tokio::sync::{broadcast::error::RecvError, mpsc}; +use tokio::{ + sync::{broadcast::error::RecvError, mpsc}, + task::JoinSet, +}; use tracing::{debug, error, trace}; use super::live::{Op, ToLiveActor}; @@ -29,8 +29,6 @@ pub enum ToGossipActor { }, } -type JoinFut = Pin)> + Send + 'static>>; - /// This actor subscribes to all gossip events. When receiving entries, they are inserted in the /// replica (if open). Other events are forwarded to the main actor to be handled there. pub struct GossipActor { @@ -41,7 +39,7 @@ pub struct GossipActor { to_sync_actor: mpsc::Sender, joined: HashSet, want_join: HashSet, - pending_joins: FuturesUnordered, + pending_joins: JoinSet<(NamespaceId, Result)>, } impl GossipActor { @@ -84,7 +82,7 @@ impl GossipActor { break; } } - res = self.pending_joins.next(), if !self.pending_joins.is_empty() => { + Some(res) = self.pending_joins.join_next(), if !self.pending_joins.is_empty() => { trace!(?i, "tick: pending_joins"); let (namespace, res) = res.context("pending_joins closed")?; match res { @@ -119,10 +117,9 @@ impl GossipActor { .gossip .join(namespace.into(), peers) .await? - .map(move |res| (namespace, res)) - .boxed(); + .map(move |res| (namespace, res)); self.want_join.insert(namespace); - self.pending_joins.push(fut); + self.pending_joins.spawn(fut); } ToGossipActor::Leave { namespace } => { self.gossip.quit(namespace.into()).await?; diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index 6ab18dc41b..549c46e252 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -7,11 +7,7 @@ use std::{ use crate::downloader::{DownloadKind, Downloader, PeerRole}; use anyhow::{Context, Result}; -use futures::{ - future::BoxFuture, - stream::{FuturesUnordered, StreamExt}, - FutureExt, -}; +use futures::FutureExt; use iroh_bytes::{store::EntryStatus, Hash}; use iroh_gossip::{net::Gossip, proto::TopicId}; use iroh_net::{key::PublicKey, MagicEndpoint, PeerAddr}; @@ -24,7 +20,10 @@ use iroh_sync::{ ContentStatus, InsertOrigin, NamespaceId, SignedEntry, }; use serde::{Deserialize, Serialize}; -use tokio::sync::{self, mpsc, oneshot}; +use tokio::{ + sync::{self, mpsc, oneshot}, + task::JoinSet, +}; use tracing::{debug, error, instrument, trace, warn, Instrument, Span}; use super::gossip::ToGossipActor; @@ -114,16 +113,13 @@ pub enum Event { SyncFinished(SyncEvent), } -type SyncConnectFut = BoxFuture< - 'static, - ( - NamespaceId, - PublicKey, - SyncReason, - Result, - ), ->; -type SyncAcceptFut = BoxFuture<'static, Result>; +type SyncConnectRes = ( + NamespaceId, + PublicKey, + SyncReason, + Result, +); +type SyncAcceptRes = Result; // Currently peers might double-sync in both directions. pub struct LiveActor { @@ -146,12 +142,11 @@ pub struct LiveActor { gossip_actor_tx: mpsc::Sender, /// Running sync futures (from connect). - #[allow(clippy::type_complexity)] - running_sync_connect: FuturesUnordered, + running_sync_connect: JoinSet, /// Running sync futures (from accept). - running_sync_accept: FuturesUnordered, + running_sync_accept: JoinSet, /// Runnning download futures. - pending_downloads: FuturesUnordered>>, + pending_downloads: JoinSet>, // Subscribers to actor events subscribers: SubscribersMap, @@ -222,7 +217,7 @@ impl LiveActor { error!(?err, "Failed to process replica event"); } } - res = self.running_sync_connect.next(), if !self.running_sync_connect.is_empty() => { + Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => { trace!(?i, "tick: on_sync_via_connect_finished"); let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?; if let Err(err) = self.on_sync_via_connect_finished(namespace, peer, reason, res).await { @@ -230,14 +225,14 @@ impl LiveActor { } } - res = self.running_sync_accept.next(), if !self.running_sync_accept.is_empty() => { + Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => { trace!(?i, "tick: on_sync_via_accept_finished"); let res = res.context("running_sync_accept closed")?; if let Err(err) = self.on_sync_via_accept_finished(res).await { error!(?err, "Failed to process incoming sync request"); } } - res = self.pending_downloads.next(), if !self.pending_downloads.is_empty() => { + Some(res) = self.pending_downloads.join_next(), if !self.pending_downloads.is_empty() => { trace!(?i, "tick: pending_downloads"); let res = res.context("pending_downloads closed")?; if let Some((namespace, hash)) = res { @@ -358,9 +353,8 @@ impl LiveActor { let res = connect_and_sync(&endpoint, &sync, namespace, PeerAddr::new(peer)).await; (namespace, peer, reason, res) } - .instrument(Span::current()) - .boxed(); - self.running_sync_connect.push(fut); + .instrument(Span::current()); + self.running_sync_connect.spawn(fut); } async fn shutdown(&mut self) -> anyhow::Result<()> { @@ -622,13 +616,12 @@ impl LiveActor { .downloader .queue(DownloadKind::Blob { hash }, vec![(from, role).into()]) .await; - let fut = async move { + + self.pending_downloads.spawn(async move { // NOTE: this ignores the result for now, simply keeping the option let res = handle.await.ok(); res.map(|_| (namespace, hash)) - } - .boxed(); - self.pending_downloads.push(fut); + }); } } } @@ -665,8 +658,8 @@ impl LiveActor { }; debug!("incoming connection"); let sync = self.sync.clone(); - let fut = async move { handle_connection(sync, conn, accept_request_cb).await }.boxed(); - self.running_sync_accept.push(fut); + self.running_sync_accept + .spawn(async move { handle_connection(sync, conn, accept_request_cb).await }); } pub fn accept_sync_request(