From ca182bb13164394f225b910933cb40e79257add5 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 19 Apr 2023 20:21:54 +0800 Subject: [PATCH 01/21] Provide a way to access logs through the CLI --- Cargo.lock | 630 ++++++++++++++++++++++++-- binaries/cli/Cargo.toml | 1 + binaries/cli/src/logs.rs | 39 ++ binaries/cli/src/main.rs | 9 +- binaries/coordinator/src/lib.rs | 90 +++- binaries/daemon/src/lib.rs | 23 + binaries/daemon/src/spawn.rs | 54 ++- libraries/core/src/daemon_messages.rs | 5 + libraries/core/src/topics.rs | 6 + 9 files changed, 803 insertions(+), 54 deletions(-) create mode 100644 binaries/cli/src/logs.rs diff --git a/Cargo.lock b/Cargo.lock index 89190f76..dbe82c3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aes" version = "0.8.2" @@ -44,6 +50,15 @@ dependencies = [ "libc", ] +[[package]] +name = "ansi_colours" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7db9d9767fde724f83933a716ee182539788f293828244e9d999695ce0f7ba1e" +dependencies = [ + "rgb", +] + [[package]] name = "anyhow" version = "1.0.69" @@ -235,7 +250,7 @@ version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf6b26f6a6f8410e3b9531cbd1886399b99842701da77d4b4cf2013f7708f20f" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -480,6 +495,43 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bat" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd4b13b0233143ae151a66e0135d715b65f631d1028c40502cc88182bcb9f4fa" +dependencies = [ + "ansi_colours", + "atty", + "bincode", + "bugreport", + "bytesize", + "clap 4.1.11", + "clircle", + "console", + "content_inspector", + "dirs 5.0.0", + "encoding", + "flate2", + "git2", + "globset", + "grep-cli", + "nu-ansi-term 0.47.0", + "once_cell", + "path_abs", + "plist", + "regex", + "semver", + "serde", + "serde_yaml 0.8.23", + "shell-words", + "syntect", + "thiserror", + "unicode-width", + "walkdir", + "wild", +] + [[package]] name = "benchmark-example-node" version = "0.2.2" @@ -518,6 +570,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07d1372b52fa64b8af7cf6e3848cfa2dadb81c21e810f144bdd9c26e21895235" + [[package]] name = "bitvec" version = "1.0.1" @@ -574,6 +632,29 @@ dependencies = [ "once_cell", ] +[[package]] +name = "bstr" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3d4260bcc2e8fc9df1eac4919a720effeb63a3f0952f5bf4944adfa18897f09" +dependencies = [ + "memchr", + "once_cell", + "regex-automata", + "serde", +] + +[[package]] +name = "bugreport" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "535120b8182547808081a66f1f77a64533c780b23da26763e0ee34dfb94f98c9" +dependencies = [ + "git-version", + "shell-escape", + "sys-info", +] + [[package]] name = "bumpalo" version = 
"3.12.0" @@ -586,6 +667,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" +[[package]] +name = "bytemuck" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17febce684fd15d89027105661fec94afb475cb995fbc59d2865198446ba2eea" + [[package]] name = "byteorder" version = "1.4.3" @@ -598,6 +685,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "bytesize" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38fcc2979eff34a4b84e1cf9a1e3da42a7d44b3b690a40cdcb23e3d556cfb2e5" + [[package]] name = "cache-padded" version = "1.2.0" @@ -624,6 +717,9 @@ name = "cc" version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" +dependencies = [ + "jobserver", +] [[package]] name = "cesu8" @@ -666,7 +762,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71655c45cb9845d3270c9d6df84ebe72b4dad3c2ba3f7023ad47c144e4e473a5" dependencies = [ "atty", - "bitflags", + "bitflags 1.3.2", "clap_derive 3.2.18", "clap_lex 0.2.4", "indexmap", @@ -678,17 +774,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.0.3" +version = "4.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3943ff31339d6d148b4e9cfb9b1eb0f814989ed21dfede3c2d2e11f3d9497f60" +checksum = "42dfd32784433290c51d92c438bb72ea5063797fc3cc9a21a8c4346bebbb2098" dependencies = [ - "atty", - "bitflags", - "clap_derive 4.0.1", + "bitflags 2.2.0", + "clap_derive 4.1.9", "clap_lex 0.3.0", + "is-terminal", "once_cell", "strsim", "termcolor", + "terminal_size", ] [[package]] @@ -706,9 +803,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.0.1" +version = "4.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca689d7434ce44517a12a89456b2be4d1ea1cafcd8f581978c03d45f5a5c12a7" +checksum = "fddf67631444a3a3e3e5ac51c36a5e01335302de677bd78759eaa90ab1f46644" dependencies = [ "heck 0.4.0", "proc-macro-error", @@ -735,6 +832,18 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "clircle" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e68bbd985a63de680ab4d1ad77b6306611a8f961b282c8b5ab513e6de934e396" +dependencies = [ + "cfg-if", + "libc", + "serde", + "winapi", +] + [[package]] name = "codespan-reporting" version = "0.11.1" @@ -785,6 +894,19 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3d79fbe8970a77e3e34151cc13d3b3e248aa0faaecb9f6091fa07ebefe5ad60" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "unicode-width", + "windows-sys 0.42.0", +] + [[package]] name = "const-oid" version = "0.9.2" @@ -813,6 +935,15 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "content_inspector" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7bda66e858c683005a53a9a60c69a4aca7eeaa45d124526e389f7aec8e62f38" +dependencies = [ + "memchr", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -838,6 +969,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" 
+version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-channel" version = "0.5.4" @@ -888,7 +1028,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67" dependencies = [ - "bitflags", + "bitflags 1.3.2", "crossterm_winapi", "libc", "mio", @@ -1073,7 +1213,16 @@ version = "4.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" dependencies = [ - "dirs-sys", + "dirs-sys 0.3.7", +] + +[[package]] +name = "dirs" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dece029acd3353e3a58ac2e3eb3c8d6c35827a892edc6cc4138ef9c33df46ecd" +dependencies = [ + "dirs-sys 0.4.0", ] [[package]] @@ -1097,6 +1246,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "dirs-sys" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04414300db88f70d74c5ff54e50f9e1d1737d9a5b90f53fcf2e95ca2a9ab554b" +dependencies = [ + "libc", + "redox_users", + "windows-sys 0.45.0", +] + [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -1113,7 +1273,8 @@ name = "dora-cli" version = "0.2.2" dependencies = [ "atty", - "clap 4.0.3", + "bat", + "clap 4.1.11", "communication-layer-request-reply", "ctrlc", "dora-core", @@ -1213,7 +1374,7 @@ dependencies = [ name = "dora-examples" version = "0.0.0" dependencies = [ - "clap 4.0.3", + "clap 4.1.11", "dora-coordinator", "dora-core", "dora-daemon", @@ -1362,7 +1523,7 @@ dependencies = [ name = "dora-runtime" version = "0.2.2" dependencies = [ - "clap 4.0.3", + "clap 4.1.11", "dora-core", "dora-download", "dora-metrics", @@ -1417,6 +1578,76 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + +[[package]] +name = "encoding" +version = "0.2.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b0d943856b990d12d3b55b359144ff341533e516d94098b1d3fc1ac666d36ec" +dependencies = [ + "encoding-index-japanese", + "encoding-index-korean", + "encoding-index-simpchinese", + "encoding-index-singlebyte", + "encoding-index-tradchinese", +] + +[[package]] +name = "encoding-index-japanese" +version = "1.20141219.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04e8b2ff42e9a05335dbf8b5c6f7567e5591d0d916ccef4e0b1710d32a0d0c91" +dependencies = [ + "encoding_index_tests", +] + +[[package]] +name = "encoding-index-korean" +version = "1.20141219.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dc33fb8e6bcba213fe2f14275f0963fd16f0a02c878e3095ecfdf5bee529d81" +dependencies = [ + "encoding_index_tests", +] + +[[package]] +name = "encoding-index-simpchinese" +version = "1.20141219.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d87a7194909b9118fc707194baa434a4e3b0fb6a5a757c73c3adb07aa25031f7" +dependencies = [ + "encoding_index_tests", +] + +[[package]] +name = "encoding-index-singlebyte" +version = 
"1.20141219.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3351d5acffb224af9ca265f435b859c7c01537c0849754d3db3fdf2bfe2ae84a" +dependencies = [ + "encoding_index_tests", +] + +[[package]] +name = "encoding-index-tradchinese" +version = "1.20141219.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd0e20d5688ce3cab59eb3ef3a2083a5c77bf496cb798dc6fcdb75f323890c18" +dependencies = [ + "encoding_index_tests", +] + +[[package]] +name = "encoding_index_tests" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a246d82be1c9d791c5dfde9a2bd045fc3cbba3fa2b11ad558f27d01712f00569" + [[package]] name = "encoding_rs" version = "0.8.31" @@ -1450,6 +1681,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "errno" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" +dependencies = [ + "errno-dragonfly", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "errno-dragonfly" version = "0.1.2" @@ -1544,10 +1786,20 @@ version = "23.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77f5399c2c9c50ae9418e522842ad362f61ee48b346ac106807bd355a8a7c619" dependencies = [ - "bitflags", + "bitflags 1.3.2", "rustc_version", ] +[[package]] +name = "flate2" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "flume" version = "0.10.14" @@ -1771,12 +2023,38 @@ dependencies = [ "syn", ] +[[package]] +name = "git2" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf7f68c2995f392c49fffb4f95ae2c873297830eb25c6bc4c114ce8f4562acc" +dependencies = [ + "bitflags 1.3.2", + "libc", + "libgit2-sys", + "log", + "url", +] + [[package]] name = "glob" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" +[[package]] +name = "globset" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "029d74589adefde59de1a0c4f4732695c32805624aec7b68d91503d4dba79afc" +dependencies = [ + "aho-corasick", + "bstr", + "fnv", + "log", + "regex", +] + [[package]] name = "gloo-timers" version = "0.2.3" @@ -1789,6 +2067,23 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "grep-cli" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d19fc6687bc64b6719a839cd24f2c700bcb05ffeb684d19da6a637c2455a7ba1" +dependencies = [ + "atty", + "bstr", + "globset", + "lazy_static", + "log", + "regex", + "same-file", + "termcolor", + "winapi-util", +] + [[package]] name = "h2" version = "0.3.17" @@ -2035,7 +2330,7 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" dependencies = [ - "bitflags", + "bitflags 1.3.2", "inotify-sys", "libc", ] @@ -2064,7 +2359,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6055ce38cac9b10ac819ed4a509d92ccbc60808152c19ff9121c98198964272" dependencies = [ - "bitflags", + "bitflags 1.3.2", "crossterm", "dyn-clone", "lazy_static", @@ -2144,7 +2439,7 @@ checksum = 
"21b6b32576413a8e69b90e952e4a026476040d81017b80445deda5f2d3921857" dependencies = [ "hermit-abi 0.3.1", "io-lifetimes", - "rustix", + "rustix 0.36.9", "windows-sys 0.45.0", ] @@ -2183,6 +2478,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" +[[package]] +name = "jobserver" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.56" @@ -2225,7 +2529,7 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" dependencies = [ - "bitflags", + "bitflags 1.3.2", "libc", ] @@ -2317,6 +2621,18 @@ version = "0.2.139" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" +[[package]] +name = "libgit2-sys" +version = "0.14.2+1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f3d95f6b51075fe9810a7ae22c7095f12b98005ab364d8544797a825ce946a4" +dependencies = [ + "cc", + "libc", + "libz-sys", + "pkg-config", +] + [[package]] name = "libloading" version = "0.7.4" @@ -2333,6 +2649,27 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33a33a362ce288760ec6a508b94caaec573ae7d3bbbd91b87aa0bad4456839db" +[[package]] +name = "libz-sys" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "line-wrap" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f30344350a2a51da54c1d53be93fade8a237e545dbcc4bdbe635413f2117cab9" +dependencies = [ + "safemem", +] + [[package]] name = "link-cplusplus" version = "1.0.6" @@ -2354,6 +2691,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" +[[package]] +name = "linux-raw-sys" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f508063cc7bb32987c71511216bd5a32be15bccb6a80b52df8b9d7f01fc3aa2" + [[package]] name = "lock_api" version = "0.4.6" @@ -2458,6 +2801,15 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "miniz_oxide" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.8.4" @@ -2532,7 +2884,7 @@ version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4916f159ed8e5de0082076562152a76b7a1f64a01fd9d1e0fea002c37624faf" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cc", "cfg-if", "libc", @@ -2545,7 +2897,7 @@ version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f866317acbd3a240710c63f065ffb1e4fd466259045ccb504130b7f668f35c6" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cc", "cfg-if", "libc", @@ -2558,7 
+2910,7 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cfg-if", "libc", "memoffset 0.7.1", @@ -2578,7 +2930,7 @@ version = "5.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58ea850aa68a06e48fdb069c0ec44d0d64c8dbffa49bf3b6f7f0a901fdea1ba9" dependencies = [ - "bitflags", + "bitflags 1.3.2", "crossbeam-channel", "filetime", "fsevent-sys", @@ -2609,6 +2961,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1df031e117bca634c262e9bd3173776844b6c17a90b3741c9163663b4385af76" +dependencies = [ + "windows-sys 0.45.0", +] + [[package]] name = "num" version = "0.4.0" @@ -2728,6 +3089,28 @@ version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" +[[package]] +name = "onig" +version = "6.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c4b31c8722ad9171c6d77d3557db078cab2bd50afcc9d09c8b315c59df8ca4f" +dependencies = [ + "bitflags 1.3.2", + "libc", + "once_cell", + "onig_sys", +] + +[[package]] +name = "onig_sys" +version = "69.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b829e3d7e9cc74c7e315ee8edb185bf4190da5acde74afd7fc59c35b1f086e7" +dependencies = [ + "cc", + "pkg-config", +] + [[package]] name = "opaque-debug" version = "0.2.3" @@ -2931,6 +3314,15 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" +[[package]] +name = "path_abs" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05ef02f6342ac01d8a93b65f96db53fe68a92a15f41144f97fb00a9e669633c3" +dependencies = [ + "std_prelude", +] + [[package]] name = "pem-rfc7468" version = "0.6.0" @@ -3053,6 +3445,26 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" + +[[package]] +name = "plist" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bd9647b268a3d3e14ff09c23201133a62589c658db02bb7388c7246aafe0590" +dependencies = [ + "base64 0.21.0", + "indexmap", + "line-wrap", + "quick-xml", + "serde", + "time", +] + [[package]] name = "pnet" version = "0.31.0" @@ -3324,6 +3736,15 @@ dependencies = [ "syn", ] +[[package]] +name = "quick-xml" +version = "0.28.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce5e73202a820a31f8a0ee32ada5e21029c81fd9e3ebf668a40832e4219d9d1" +dependencies = [ + "memchr", +] + [[package]] name = "quinn" version = "0.9.3" @@ -3472,7 +3893,7 @@ version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ae183fc1b06c149f0c1793e1eb447c8b04bfe46d48e9e48bfb8d2d7ed64ecf0" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -3547,6 +3968,15 @@ dependencies = [ "winreg", ] +[[package]] +name = "rgb" +version = "0.8.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20ec2d3e3fc7a92ced357df9cebd5a10b6fb2aa1ee797bf7e9ce2f17dffc8f59" +dependencies = [ + 
"bytemuck", +] + [[package]] name = "ring" version = "0.16.20" @@ -3640,11 +4070,25 @@ version = "0.36.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd5c6ff11fecd55b40746d1995a02f2eb375bf8c00d192d521ee09f42bef37bc" dependencies = [ - "bitflags", - "errno", + "bitflags 1.3.2", + "errno 0.2.8", + "io-lifetimes", + "libc", + "linux-raw-sys 0.1.4", + "windows-sys 0.45.0", +] + +[[package]] +name = "rustix" +version = "0.37.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b24138615de35e32031d041a09032ef3487a616d901ca4db224e7d557efae2" +dependencies = [ + "bitflags 1.3.2", + "errno 0.3.1", "io-lifetimes", "libc", - "linux-raw-sys", + "linux-raw-sys 0.3.2", "windows-sys 0.45.0", ] @@ -3687,6 +4131,12 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" +[[package]] +name = "safemem" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" + [[package]] name = "safer-ffi" version = "0.1.0-rc1" @@ -3764,7 +4214,7 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -3921,6 +4371,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "shell-escape" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f" + +[[package]] +name = "shell-words" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" + [[package]] name = "shellexpand" version = "2.1.0" @@ -3936,7 +4398,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd1c7ddea665294d484c39fd0c0d2b7e35bbfe10035c5fe1854741a57f6880e1" dependencies = [ - "dirs", + "dirs 4.0.0", ] [[package]] @@ -4032,6 +4494,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "std_prelude" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8207e78455ffdf55661170876f88daf85356e4edd54e0a3dbc79586ca1e50cbe" + [[package]] name = "stop-token" version = "0.7.0" @@ -4067,6 +4535,39 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syntect" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6c454c27d9d7d9a84c7803aaa3c50cd088d2906fe3c6e42da3209aa623576a8" +dependencies = [ + "bincode", + "bitflags 1.3.2", + "flate2", + "fnv", + "lazy_static", + "once_cell", + "onig", + "plist", + "regex-syntax", + "serde", + "serde_derive", + "serde_json", + "thiserror", + "walkdir", + "yaml-rust", +] + +[[package]] +name = "sys-info" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b3a0d0aba8bf96a0e1ddfdc352fc53b3df7f39318c71854910c3c4b024ae52c" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "sysinfo" version = "0.24.5" @@ -4103,7 +4604,7 @@ dependencies = [ "cfg-if", "fastrand", "redox_syscall", - "rustix", + "rustix 0.36.9", "windows-sys 
0.42.0", ] @@ -4116,6 +4617,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "terminal_size" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e6bf6f19e9f8ed8d4048dc22981458ebcf406d67e94cd422e5ecd73d63b3237" +dependencies = [ + "rustix 0.37.3", + "windows-sys 0.48.0", +] + [[package]] name = "textwrap" version = "0.16.0" @@ -4173,6 +4684,33 @@ dependencies = [ "threadpool", ] +[[package]] +name = "time" +version = "0.3.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0cbfecb4d19b5ea75bb31ad904eb5b9fa13f21079c3b92017ebdf4999a5890" +dependencies = [ + "itoa", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" + +[[package]] +name = "time-macros" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd80a657e71da814b8e5d60d3374fc6d35045062245d80224748ae522dd76f36" +dependencies = [ + "time-core", +] + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -4428,7 +4966,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" dependencies = [ "matchers", - "nu-ansi-term", + "nu-ansi-term 0.46.0", "once_cell", "regex", "sharded-slab", @@ -4500,9 +5038,9 @@ checksum = "0fdbf052a0783de01e944a6ce7a8cb939e295b1e7be835a1112c3b9a7f047a5a" [[package]] name = "unicode-width" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" +checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" [[package]] name = "unindent" @@ -4610,6 +5148,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" @@ -4738,7 +5282,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa61ff77f695a94d9c8558e0bb5c362a8fd1f27c74663770fbc633acbafedbb6" dependencies = [ "core-foundation", - "dirs", + "dirs 4.0.0", "jni", "log", "ndk-context", @@ -4795,6 +5339,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "653f141f39ec16bba3c5abe400a0c60da7468261cc2cbf36805022876bc721a8" +[[package]] +name = "wild" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b116685a6be0c52f5a103334cbff26db643826c7b3735fc0a3ba9871310a74" +dependencies = [ + "glob", +] + [[package]] name = "winapi" version = "0.3.9" @@ -4885,6 +5438,15 @@ dependencies = [ "windows-targets 0.42.1", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.0", +] + [[package]] name = "windows-targets" version = "0.42.1" diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index e7bab9c8..aea69bef 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -35,3 +35,4 @@ notify = "5.1.0" ctrlc = "3.2.5" tracing = "0.1.36" dora-tracing = { workspace = true, optional 
= true } +bat = "0.23.0" diff --git a/binaries/cli/src/logs.rs b/binaries/cli/src/logs.rs new file mode 100644 index 00000000..f96d06a2 --- /dev/null +++ b/binaries/cli/src/logs.rs @@ -0,0 +1,39 @@ +use dora_core::topics::{ControlRequest, ControlRequestReply}; +use eyre::{bail, Context, Result}; +use uuid::Uuid; + +use crate::control_connection; +use bat::{Input, PrettyPrinter}; + +pub fn logs(uuid: Option, name: Option, node: String) -> Result<()> { + let mut control_session = None; + let connection = control_connection(&mut control_session)?; + let logs = { + let reply_raw = connection + .request(&serde_json::to_vec(&ControlRequest::Logs { + uuid, + name, + node: node.clone(), + })?) + .wrap_err("failed to send DaemonConnected message")?; + + let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match reply { + ControlRequestReply::Logs { logs } => logs, + other => bail!("unexpected reply to daemon connection check: {other:?}"), + } + }; + + PrettyPrinter::new() + .header(true) + .grid(true) + .line_numbers(true) + .paging_mode(bat::PagingMode::Always) + .inputs(vec![Input::from_bytes(&logs) + .name("Logs") // TODO: Make a better name + .title(format!("Logs from {node}.").as_str())]) + .print() + .unwrap(); + + Ok(()) +} diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 6c7daf04..b7e0154f 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -16,6 +16,7 @@ mod attach; mod build; mod check; mod graph; +mod logs; mod template; mod up; @@ -85,7 +86,12 @@ enum Command { List, // Planned for future releases: // Dashboard, - // Logs, + Logs { + uuid: Option, + #[clap(long)] + name: Option, + node: String, + }, // Metrics, // Stats, // Get, @@ -166,6 +172,7 @@ fn run() -> eyre::Result<()> { coordinator_path.as_deref(), daemon_path.as_deref(), )?, + Command::Logs { uuid, name, node } => logs::logs(uuid, name, node)?, Command::Start { dataflow, name, diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 4a51dce5..654c3f6b 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -79,6 +79,25 @@ pub async fn start( Ok((port, future)) } +fn resolve_name( + name: String, + running_dataflows: &HashMap, +) -> eyre::Result { + let uuids: Vec<_> = running_dataflows + .iter() + .filter(|(_, v)| v.name.as_deref() == Some(name.as_str())) + .map(|(k, _)| k) + .copied() + .collect(); + if uuids.is_empty() { + bail!("no running dataflow with name `{name}`"); + } else if let [uuid] = uuids.as_slice() { + Ok(*uuid) + } else { + bail!("multiple dataflows found with name `{name}`"); + } +} + async fn start_inner( listener: TcpListener, tasks: &FuturesUnordered>, @@ -325,20 +344,7 @@ async fn start_inner( } ControlRequest::StopByName { name } => { let stop = async { - let uuids: Vec<_> = running_dataflows - .iter() - .filter(|(_, v)| v.name.as_deref() == Some(name.as_str())) - .map(|(k, _)| k) - .copied() - .collect(); - let dataflow_uuid = if uuids.is_empty() { - bail!("no running dataflow with name `{name}`"); - } else if let [uuid] = uuids.as_slice() { - *uuid - } else { - bail!("multiple dataflows found with name `{name}`"); - }; - + let dataflow_uuid = resolve_name(name, &running_dataflows)?; stop_dataflow( &running_dataflows, dataflow_uuid, @@ -350,6 +356,24 @@ async fn start_inner( stop.await .map(|uuid| ControlRequestReply::DataflowStopped { uuid }) } + ControlRequest::Logs { uuid, name, node } => { + let dataflow_uuid = if let Some(uuid) = uuid { + uuid + } else if let 
Some(name) = name { + resolve_name(name, &running_dataflows)? + } else { + bail!("No uuid") + }; + + retrieve_logs( + &running_dataflows, + dataflow_uuid, + node.into(), + &mut daemon_connections, + ) + .await + .map(|logs| ControlRequestReply::Logs { logs }) + } ControlRequest::Destroy => { tracing::info!("Received destroy command"); @@ -586,6 +610,44 @@ async fn reload_dataflow( Ok(()) } +async fn retrieve_logs( + running_dataflows: &HashMap, + dataflow_id: Uuid, + node_id: NodeId, + daemon_connections: &mut HashMap, +) -> eyre::Result> { + let Some(dataflow) = running_dataflows.get(&dataflow_id) else { + bail!("No running dataflow found with UUID `{dataflow_id}`") + }; + let message = serde_json::to_vec(&DaemonCoordinatorEvent::Logs { + dataflow_id, + node_id, + })?; + let mut reply_logs = Vec::new(); + for machine_id in &dataflow.machines { + let daemon_connection = daemon_connections + .get_mut(machine_id) + .wrap_err("no daemon connection")?; // TODO: take from dataflow spec + tcp_send(daemon_connection, &message) + .await + .wrap_err("failed to send reload message to daemon")?; + + // wait for reply + let reply_raw = tcp_receive(daemon_connection) + .await + .wrap_err("failed to receive reload reply from daemon")?; + match serde_json::from_slice(&reply_raw) + .wrap_err("failed to deserialize reload reply from daemon")? + { + DaemonCoordinatorReply::Logs { logs } => reply_logs = logs, + other => bail!("unexpected reply after sending reload: {other:?}"), + } + } + tracing::info!("successfully reloaded dataflow `{dataflow_id}`"); + + Ok(reply_logs) +} + async fn start_dataflow( dataflow: Descriptor, working_dir: PathBuf, diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index e43a676c..0e1a4483 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -28,6 +28,8 @@ use std::{ time::Duration, }; use tcp_utils::{tcp_receive, tcp_send}; +use tokio::fs::File; +use tokio::io::AsyncReadExt; use tokio::net::TcpStream; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::{mpsc, oneshot}; @@ -303,6 +305,27 @@ impl Daemon { } (None, RunStatus::Continue) } + DaemonCoordinatorEvent::Logs { + dataflow_id, + node_id, + } => { + // read file + let logs = async { + println!("logs/{node_id}.txt"); + let mut file = File::open(format!("logs/{node_id}.txt")) + .await + .wrap_err("Could not open log file")?; + + let mut contents = vec![]; + file.read_to_end(&mut contents) + .await + .wrap_err("Could not read content of log file")?; + Result::, eyre::Report>::Ok(contents) + } + .await + .expect("Could not retrieve logs"); + (DaemonCoordinatorReply::Logs { logs }, RunStatus::Continue) + } DaemonCoordinatorEvent::ReloadDataflow { dataflow_id, node_id, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index f4ca5909..655bb21c 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -10,7 +10,12 @@ use dora_core::{ use dora_download::download_file; use eyre::WrapErr; use std::{env::consts::EXE_EXTENSION, path::Path, process::Stdio}; -use tokio::sync::mpsc; +use tokio::{ + fs::File, + io::{AsyncBufReadExt, AsyncWriteExt}, + sync::mpsc, +}; +use tracing::info; pub async fn spawn_node( dataflow_id: DataflowId, @@ -152,20 +157,59 @@ pub async fn spawn_node( } } - command.spawn().wrap_err(format!( - "failed to run runtime {}/{}", - runtime_config.node.dataflow_id, runtime_config.node.node_id - ))? 
+ command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .wrap_err(format!( + "failed to run runtime {}/{}", + runtime_config.node.dataflow_id, runtime_config.node.node_id + ))? } }; tokio::spawn(async move { + // let hlc = HLC::default(); + // let timestamp = hlc.new_timestamp().to_string(); + // let time = timestamp + // .split('.') + // .next() + // .expect("Could not extract date from timestamp."); // TODO: Add time within log file name + + let mut file = File::create(format!("logs/{node_id}.txt")) + .await + .expect("Failed to create log file"); + + let mut stdout_lines = + (tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"))) + .lines(); + + while let Ok(Some(line)) = stdout_lines.next_line().await { + file.write(line.as_bytes()).await.unwrap(); + file.write(b"\n").await.unwrap(); + info!(line); + } + + let mut stderr_lines = + (tokio::io::BufReader::new(child.stderr.take().expect("failed to take stderr"))) + .lines(); + + while let Ok(Some(line)) = stderr_lines.next_line().await { + file.write(line.as_bytes()).await.unwrap(); + file.write(b"\n").await.unwrap(); + info!(line); + } + + file.sync_all().await.unwrap(); + let exit_status = NodeExitStatus::from(child.wait().await); let event = DoraEvent::SpawnedNodeResult { dataflow_id, node_id, exit_status, }; + let _ = daemon_tx.send(event.into()).await; }); Ok(()) diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 8035571c..be2bad69 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -226,6 +226,10 @@ pub enum InterDaemonEvent { dataflow_id: DataflowId, inputs: BTreeSet<(NodeId, DataId)>, }, + Logs { + dataflow_id: Uuid, + node_id: NodeId, + }, } #[derive(Debug, serde::Deserialize, serde::Serialize)] @@ -235,6 +239,7 @@ pub enum DaemonCoordinatorReply { StopResult(Result<(), String>), DestroyResult(Result<(), String>), WatchdogAck, + Logs { logs: Vec }, } pub type DataflowId = Uuid; diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index b9b897cf..542c55bc 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -42,6 +42,11 @@ pub enum ControlRequest { StopByName { name: String, }, + Logs { + uuid: Option, + name: Option, + node: String, + }, Destroy, List, DaemonConnected, @@ -59,6 +64,7 @@ pub enum ControlRequestReply { DestroyOk, DaemonConnected(bool), ConnectedMachines(BTreeSet), + Logs { logs: Vec }, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] From d925d05a2f8907870ebe0e881f348525c0b721c7 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 24 Apr 2023 14:03:47 +0800 Subject: [PATCH 02/21] Use temp_dir for log files --- binaries/daemon/src/lib.rs | 6 ++-- binaries/daemon/src/spawn.rs | 59 +++++++++++++++++++++++------------- 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 0e1a4483..4301e858 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -19,6 +19,7 @@ use futures_concurrency::stream::Merge; use inter_daemon::InterDaemonConnection; use shared_memory_server::ShmemConf; use std::collections::HashSet; +use std::env::temp_dir; use std::{ borrow::Cow, collections::{BTreeMap, BTreeSet, HashMap}, @@ -311,8 +312,9 @@ impl Daemon { } => { // read file let logs = async { - println!("logs/{node_id}.txt"); - let mut file = File::open(format!("logs/{node_id}.txt")) + let log_dir = temp_dir(); + + let mut file = 
File::open(log_dir.join(format!("{dataflow_id}-{node_id}.txt"))) .await .wrap_err("Could not open log file")?; diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 655bb21c..1db615b1 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -9,7 +9,11 @@ use dora_core::{ }; use dora_download::download_file; use eyre::WrapErr; -use std::{env::consts::EXE_EXTENSION, path::Path, process::Stdio}; +use std::{ + env::{consts::EXE_EXTENSION, temp_dir}, + path::{Path, PathBuf}, + process::Stdio, +}; use tokio::{ fs::File, io::{AsyncBufReadExt, AsyncWriteExt}, @@ -170,38 +174,51 @@ pub async fn spawn_node( }; tokio::spawn(async move { - // let hlc = HLC::default(); - // let timestamp = hlc.new_timestamp().to_string(); - // let time = timestamp - // .split('.') - // .next() - // .expect("Could not extract date from timestamp."); // TODO: Add time within log file name + let log_dir = temp_dir(); - let mut file = File::create(format!("logs/{node_id}.txt")) - .await - .expect("Failed to create log file"); + let (tx, mut rx) = mpsc::channel(1); + let mut file = File::create( + &log_dir + .join(PathBuf::from(format!("{dataflow_id}-{node_id}.txt")).with_extension("txt")), + ) + .await + .expect("Failed to create log file"); let mut stdout_lines = (tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"))) .lines(); - while let Ok(Some(line)) = stdout_lines.next_line().await { - file.write(line.as_bytes()).await.unwrap(); - file.write(b"\n").await.unwrap(); - info!(line); - } + let stdout_tx = tx.clone(); + + // Stdout listener stream + tokio::spawn(async move { + while let Ok(Some(line)) = stdout_lines.next_line().await { + stdout_tx.send(line).await.unwrap(); + } + }); let mut stderr_lines = (tokio::io::BufReader::new(child.stderr.take().expect("failed to take stderr"))) .lines(); - while let Ok(Some(line)) = stderr_lines.next_line().await { - file.write(line.as_bytes()).await.unwrap(); - file.write(b"\n").await.unwrap(); - info!(line); - } + let stderr_tx = tx.clone(); - file.sync_all().await.unwrap(); + // Stderr listener stream + tokio::spawn(async move { + while let Ok(Some(line)) = stderr_lines.next_line().await { + stderr_tx.send(line).await.unwrap(); + } + }); + + // Log to file stream. 
+ tokio::spawn(async move { + while let Some(line) = rx.recv().await { + file.write(line.as_bytes()).await.unwrap(); + file.write(b"\n").await.unwrap(); + info!(line); + } + file.sync_all().await.unwrap(); + }); let exit_status = NodeExitStatus::from(child.wait().await); let event = DoraEvent::SpawnedNodeResult { From cb39cb00a1e239affc6a1fb4075d8e414ac453a9 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 24 Apr 2023 14:04:42 +0800 Subject: [PATCH 03/21] Improve error message for logs --- Cargo.lock | 25 +++++++++++++++++-------- binaries/cli/src/logs.rs | 23 +++++++++++++---------- binaries/coordinator/src/lib.rs | 11 ++++++----- 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbe82c3e..fa128169 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1770,7 +1770,7 @@ checksum = "8a3de6e8d11b22ff9edc6d916f890800597d60f8b2da1caf2955c274638d6412" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.2.12", "windows-sys 0.45.0", ] @@ -3303,7 +3303,7 @@ checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.2.12", "smallvec", "windows-sys 0.32.0", ] @@ -3896,6 +3896,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_users" version = "0.4.2" @@ -3903,7 +3912,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7776223e2696f1aa4c6b0170e83212f47296a00424305117d013dfe86fb0fe55" dependencies = [ "getrandom", - "redox_syscall", + "redox_syscall 0.2.12", "thiserror", ] @@ -4597,15 +4606,15 @@ checksum = "c02424087780c9b71cc96799eaeddff35af2bc513278cda5c99fc1f5d026d3c1" [[package]] name = "tempfile" -version = "3.4.0" +version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af18f7ae1acd354b992402e9ec5864359d693cd8a79dcbef59f76891701c1e95" +checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998" dependencies = [ "cfg-if", "fastrand", - "redox_syscall", - "rustix 0.36.9", - "windows-sys 0.42.0", + "redox_syscall 0.3.5", + "rustix 0.37.3", + "windows-sys 0.45.0", ] [[package]] diff --git a/binaries/cli/src/logs.rs b/binaries/cli/src/logs.rs index f96d06a2..45a990c2 100644 --- a/binaries/cli/src/logs.rs +++ b/binaries/cli/src/logs.rs @@ -10,17 +10,20 @@ pub fn logs(uuid: Option, name: Option, node: String) -> Result<() let connection = control_connection(&mut control_session)?; let logs = { let reply_raw = connection - .request(&serde_json::to_vec(&ControlRequest::Logs { - uuid, - name, - node: node.clone(), - })?) 
- .wrap_err("failed to send DaemonConnected message")?; + .request( + &serde_json::to_vec(&ControlRequest::Logs { + uuid, + name, + node: node.clone(), + }) + .wrap_err("")?, + ) + .wrap_err("failed to send Logs request message")?; let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match reply { ControlRequestReply::Logs { logs } => logs, - other => bail!("unexpected reply to daemon connection check: {other:?}"), + other => bail!("unexpected reply to daemon logs: {other:?}"), } }; @@ -28,12 +31,12 @@ pub fn logs(uuid: Option, name: Option, node: String) -> Result<() .header(true) .grid(true) .line_numbers(true) - .paging_mode(bat::PagingMode::Always) + .paging_mode(bat::PagingMode::QuitIfOneScreen) .inputs(vec![Input::from_bytes(&logs) - .name("Logs") // TODO: Make a better name + .name("Logs") .title(format!("Logs from {node}.").as_str())]) .print() - .unwrap(); + .wrap_err("Something went wrong with viewing log file")?; Ok(()) } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 654c3f6b..fd2b45f9 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -621,29 +621,30 @@ async fn retrieve_logs( }; let message = serde_json::to_vec(&DaemonCoordinatorEvent::Logs { dataflow_id, - node_id, + node_id: node_id.clone(), })?; let mut reply_logs = Vec::new(); + for machine_id in &dataflow.machines { let daemon_connection = daemon_connections .get_mut(machine_id) .wrap_err("no daemon connection")?; // TODO: take from dataflow spec tcp_send(daemon_connection, &message) .await - .wrap_err("failed to send reload message to daemon")?; + .wrap_err("failed to send logs message to daemon")?; // wait for reply let reply_raw = tcp_receive(daemon_connection) .await - .wrap_err("failed to receive reload reply from daemon")?; + .wrap_err("failed to retrieve logs reply from daemon")?; match serde_json::from_slice(&reply_raw) - .wrap_err("failed to deserialize reload reply from daemon")? + .wrap_err("failed to deserialize logs reply from daemon")? { DaemonCoordinatorReply::Logs { logs } => reply_logs = logs, other => bail!("unexpected reply after sending reload: {other:?}"), } } - tracing::info!("successfully reloaded dataflow `{dataflow_id}`"); + tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`"); Ok(reply_logs) } From 3b6b7663b9715c555c2940abf3c0fd973ebe8962 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 24 Apr 2023 14:04:51 +0800 Subject: [PATCH 04/21] Make cli less verbose --- binaries/cli/src/main.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index b7e0154f..4cf62707 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -44,7 +44,9 @@ enum Command { open: bool, }, /// Run build commands provided in the given dataflow. - Build { dataflow: PathBuf }, + Build { + dataflow: PathBuf, + }, /// Generate a new project, node or operator. Choose the language between Rust, Python, C or C++. 
New { #[clap(flatten)] @@ -87,9 +89,7 @@ enum Command { // Planned for future releases: // Dashboard, Logs { - uuid: Option, - #[clap(long)] - name: Option, + dataflow: String, node: String, }, // Metrics, @@ -172,7 +172,11 @@ fn run() -> eyre::Result<()> { coordinator_path.as_deref(), daemon_path.as_deref(), )?, - Command::Logs { uuid, name, node } => logs::logs(uuid, name, node)?, + Command::Logs { dataflow, node } => { + let uuid = Uuid::parse_str(&dataflow).ok(); + let name = if uuid.is_some() { None } else { Some(dataflow) }; + logs::logs(uuid, name, node)? + } Command::Start { dataflow, name, From fcea39a3ffb61ae5794316ac5c92299f008b8636 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 24 Apr 2023 14:17:45 +0800 Subject: [PATCH 05/21] Fix clippy issue about partial file writing --- binaries/daemon/src/spawn.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 1db615b1..8fcf3b14 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -213,8 +213,8 @@ pub async fn spawn_node( // Log to file stream. tokio::spawn(async move { while let Some(line) = rx.recv().await { - file.write(line.as_bytes()).await.unwrap(); - file.write(b"\n").await.unwrap(); + file.write_all(line.as_bytes()).await.unwrap(); + file.write_all(b"\n").await.unwrap(); info!(line); } file.sync_all().await.unwrap(); From ca0dcdba208c283f5285d90d281cfb3d34bcf4ad Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 24 Apr 2023 14:58:25 +0800 Subject: [PATCH 06/21] Query node-corresponding daemon for logs --- binaries/coordinator/src/lib.rs | 55 ++++++++++++++++++----------- binaries/coordinator/src/run/mod.rs | 11 ++++-- 2 files changed, 43 insertions(+), 23 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index fd2b45f9..d977126d 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -7,7 +7,8 @@ use dora_core::{ config::{NodeId, OperatorId}, coordinator_messages::RegisterResult, daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply}, - descriptor::Descriptor, + descriptor::{self, Descriptor, ResolvedNode}, + message::Metadata, topics::{ control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT, @@ -522,6 +523,7 @@ struct RunningDataflow { machines: BTreeSet, /// IDs of machines that are waiting until all nodes are started. pending_machines: BTreeSet, + nodes: Vec, } impl PartialEq for RunningDataflow { @@ -625,24 +627,33 @@ async fn retrieve_logs( })?; let mut reply_logs = Vec::new(); - for machine_id in &dataflow.machines { - let daemon_connection = daemon_connections - .get_mut(machine_id) - .wrap_err("no daemon connection")?; // TODO: take from dataflow spec - tcp_send(daemon_connection, &message) - .await - .wrap_err("failed to send logs message to daemon")?; + let nodes = &dataflow.nodes; + let machine_ids: Vec = nodes + .iter() + .filter(|node| node.id == node_id) + .map(|node| node.deploy.machine.clone()) + .collect(); - // wait for reply - let reply_raw = tcp_receive(daemon_connection) - .await - .wrap_err("failed to retrieve logs reply from daemon")?; - match serde_json::from_slice(&reply_raw) - .wrap_err("failed to deserialize logs reply from daemon")? 
- { - DaemonCoordinatorReply::Logs { logs } => reply_logs = logs, - other => bail!("unexpected reply after sending reload: {other:?}"), - } + let machine_id = machine_ids + .first() + .wrap_err("Did not find node in dataflow")?; + + let daemon_connection = daemon_connections + .get_mut(machine_id.as_str()) + .wrap_err("no daemon connection")?; // TODO: take from dataflow spec + tcp_send(daemon_connection, &message) + .await + .wrap_err("failed to send logs message to daemon")?; + + // wait for reply + let reply_raw = tcp_receive(daemon_connection) + .await + .wrap_err("failed to retrieve logs reply from daemon")?; + match serde_json::from_slice(&reply_raw) + .wrap_err("failed to deserialize logs reply from daemon")? + { + DaemonCoordinatorReply::Logs { logs } => reply_logs = logs, + other => bail!("unexpected reply after sending reload: {other:?}"), } tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`"); @@ -655,8 +666,11 @@ async fn start_dataflow( name: Option, daemon_connections: &mut HashMap, ) -> eyre::Result { - let SpawnedDataflow { uuid, machines } = - spawn_dataflow(dataflow, working_dir, daemon_connections).await?; + let SpawnedDataflow { + uuid, + machines, + nodes, + } = spawn_dataflow(path, working_dir, daemon_connections).await?; Ok(RunningDataflow { uuid, name, @@ -666,6 +680,7 @@ async fn start_dataflow( BTreeSet::new() }, machines, + nodes, }) } diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 6647548a..4e2843a1 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -5,7 +5,7 @@ use crate::{ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, SpawnDataflowNodes}, - descriptor::Descriptor, + descriptor::{Descriptor, ResolvedNode}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use std::{ @@ -39,7 +39,7 @@ pub(super) async fn spawn_dataflow( let spawn_command = SpawnDataflowNodes { dataflow_id: uuid, working_dir, - nodes, + nodes: nodes.clone(), communication: dataflow.communication, machine_listen_ports, }; @@ -54,7 +54,11 @@ pub(super) async fn spawn_dataflow( tracing::info!("successfully spawned dataflow `{uuid}`"); - Ok(SpawnedDataflow { uuid, machines }) + Ok(SpawnedDataflow { + uuid, + machines, + nodes, + }) } async fn spawn_dataflow_on_machine( @@ -85,4 +89,5 @@ async fn spawn_dataflow_on_machine( pub struct SpawnedDataflow { pub uuid: Uuid, pub machines: BTreeSet, + pub nodes: Vec, } From 9707c1c66bf5cdf84c766c5f62fb81b6728932b7 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 24 Apr 2023 15:46:22 +0800 Subject: [PATCH 07/21] Make sure that logs are been synced to disk before exiting --- binaries/coordinator/src/lib.rs | 9 +++--- binaries/daemon/src/lib.rs | 5 ++- binaries/daemon/src/spawn.rs | 55 +++++++++++++++++++++------------ 3 files changed, 44 insertions(+), 25 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index d977126d..5647dead 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -7,7 +7,7 @@ use dora_core::{ config::{NodeId, OperatorId}, coordinator_messages::RegisterResult, daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply}, - descriptor::{self, Descriptor, ResolvedNode}, + descriptor::ResolvedNode, message::Metadata, topics::{ control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, @@ -625,7 +625,6 @@ async fn retrieve_logs( dataflow_id, node_id: node_id.clone(), })?; - let mut reply_logs = Vec::new(); 
let nodes = &dataflow.nodes; let machine_ids: Vec = nodes @@ -649,12 +648,12 @@ async fn retrieve_logs( let reply_raw = tcp_receive(daemon_connection) .await .wrap_err("failed to retrieve logs reply from daemon")?; - match serde_json::from_slice(&reply_raw) + let reply_logs = match serde_json::from_slice(&reply_raw) .wrap_err("failed to deserialize logs reply from daemon")? { - DaemonCoordinatorReply::Logs { logs } => reply_logs = logs, + DaemonCoordinatorReply::Logs { logs } => logs, other => bail!("unexpected reply after sending reload: {other:?}"), - } + }; tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`"); Ok(reply_logs) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 4301e858..ee8aca33 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -326,7 +326,10 @@ impl Daemon { } .await .expect("Could not retrieve logs"); - (DaemonCoordinatorReply::Logs { logs }, RunStatus::Continue) + ( + Some(DaemonCoordinatorReply::Logs { logs }), + RunStatus::Continue, + ) } DaemonCoordinatorEvent::ReloadDataflow { dataflow_id, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 8fcf3b14..34398603 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -176,7 +176,7 @@ pub async fn spawn_node( tokio::spawn(async move { let log_dir = temp_dir(); - let (tx, mut rx) = mpsc::channel(1); + let (tx, mut rx) = mpsc::channel(10); let mut file = File::create( &log_dir .join(PathBuf::from(format!("{dataflow_id}-{node_id}.txt")).with_extension("txt")), @@ -193,7 +193,10 @@ pub async fn spawn_node( // Stdout listener stream tokio::spawn(async move { while let Ok(Some(line)) = stdout_lines.next_line().await { - stdout_tx.send(line).await.unwrap(); + let sent = stdout_tx.send(Some(line)).await; + if sent.is_err() { + break; + } } }); @@ -201,33 +204,47 @@ pub async fn spawn_node( (tokio::io::BufReader::new(child.stderr.take().expect("failed to take stderr"))) .lines(); - let stderr_tx = tx.clone(); - // Stderr listener stream + let stderr_tx = tx.clone(); tokio::spawn(async move { while let Ok(Some(line)) = stderr_lines.next_line().await { - stderr_tx.send(line).await.unwrap(); + let sent = stderr_tx.send(Some(line)).await; + if sent.is_err() { + break; + } } }); - // Log to file stream. + let exit_status_tx = tx.clone(); tokio::spawn(async move { - while let Some(line) = rx.recv().await { - file.write_all(line.as_bytes()).await.unwrap(); - file.write_all(b"\n").await.unwrap(); - info!(line); - } - file.sync_all().await.unwrap(); + let exit_status = NodeExitStatus::from(child.wait().await); + let event = DoraEvent::SpawnedNodeResult { + dataflow_id, + node_id, + exit_status, + }; + + let _ = daemon_tx.send(event.into()).await; + exit_status_tx.send(None).await.unwrap(); }); - let exit_status = NodeExitStatus::from(child.wait().await); - let event = DoraEvent::SpawnedNodeResult { - dataflow_id, - node_id, - exit_status, - }; + // Log to file stream. + tokio::spawn(async move { + while let Some(Some(line)) = rx.recv().await { + file.write_all(line.as_bytes()) + .await + .expect("Could not log stdout/stderr to file"); + file.write_all(b"\n") + .await + .expect("Could not add newline to log file."); + info!(line); - let _ = daemon_tx.send(event.into()).await; + // Make sure that all data has been synced to disk. 
+ file.sync_all().await.unwrap(); + } + }) + .await + .expect("Could not write logs to file"); }); Ok(()) } From 29d7ed728e5474b763b4e307f0866d8877bc5b3a Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 24 Apr 2023 18:09:03 +0800 Subject: [PATCH 08/21] Refactor spawning of log thread to avoid deadlock --- binaries/daemon/src/spawn.rs | 129 +++++++++++++++++------------------ 1 file changed, 61 insertions(+), 68 deletions(-) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 34398603..443f72d2 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -19,7 +19,7 @@ use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt}, sync::mpsc, }; -use tracing::info; +use tracing::debug; pub async fn spawn_node( dataflow_id: DataflowId, @@ -173,78 +173,71 @@ pub async fn spawn_node( } }; + let log_dir = temp_dir(); + + let (tx, mut rx) = mpsc::channel(10); + let mut file = File::create( + &log_dir.join(PathBuf::from(format!("{dataflow_id}-{node_id}.txt")).with_extension("txt")), + ) + .await + .expect("Failed to create log file"); + + let mut stdout_lines = + (tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"))).lines(); + + let stdout_tx = tx.clone(); + + // Stdout listener stream tokio::spawn(async move { - let log_dir = temp_dir(); - - let (tx, mut rx) = mpsc::channel(10); - let mut file = File::create( - &log_dir - .join(PathBuf::from(format!("{dataflow_id}-{node_id}.txt")).with_extension("txt")), - ) - .await - .expect("Failed to create log file"); - - let mut stdout_lines = - (tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"))) - .lines(); - - let stdout_tx = tx.clone(); - - // Stdout listener stream - tokio::spawn(async move { - while let Ok(Some(line)) = stdout_lines.next_line().await { - let sent = stdout_tx.send(Some(line)).await; - if sent.is_err() { - break; - } + while let Ok(Some(line)) = stdout_lines.next_line().await { + let sent = stdout_tx.send(Some(line)).await; + if sent.is_err() { + break; } - }); - - let mut stderr_lines = - (tokio::io::BufReader::new(child.stderr.take().expect("failed to take stderr"))) - .lines(); - - // Stderr listener stream - let stderr_tx = tx.clone(); - tokio::spawn(async move { - while let Ok(Some(line)) = stderr_lines.next_line().await { - let sent = stderr_tx.send(Some(line)).await; - if sent.is_err() { - break; - } - } - }); + } + }); - let exit_status_tx = tx.clone(); - tokio::spawn(async move { - let exit_status = NodeExitStatus::from(child.wait().await); - let event = DoraEvent::SpawnedNodeResult { - dataflow_id, - node_id, - exit_status, - }; + let mut stderr_lines = + (tokio::io::BufReader::new(child.stderr.take().expect("failed to take stderr"))).lines(); - let _ = daemon_tx.send(event.into()).await; - exit_status_tx.send(None).await.unwrap(); - }); - - // Log to file stream. - tokio::spawn(async move { - while let Some(Some(line)) = rx.recv().await { - file.write_all(line.as_bytes()) - .await - .expect("Could not log stdout/stderr to file"); - file.write_all(b"\n") - .await - .expect("Could not add newline to log file."); - info!(line); - - // Make sure that all data has been synced to disk. 
- file.sync_all().await.unwrap(); + // Stderr listener stream + let stderr_tx = tx.clone(); + tokio::spawn(async move { + while let Ok(Some(line)) = stderr_lines.next_line().await { + let sent = stderr_tx.send(Some(line)).await; + if sent.is_err() { + break; } - }) - .await - .expect("Could not write logs to file"); + } + }); + + let exit_status_tx = tx.clone(); + tokio::spawn(async move { + let exit_status = NodeExitStatus::from(child.wait().await); + let event = DoraEvent::SpawnedNodeResult { + dataflow_id, + node_id, + exit_status, + }; + + exit_status_tx.send(None).await.unwrap(); + let _ = daemon_tx.send(event.into()).await; + }); + + // Log to file stream. + tokio::spawn(async move { + while let Some(Some(line)) = rx.recv().await { + file.write_all(line.as_bytes()) + .await + .expect("Could not log stdout/stderr to file"); + file.write_all(b"\n") + .await + .expect("Could not add newline to log file."); + debug!("{dataflow_id}/{node_id} logged {line}"); + + // Make sure that all data has been synced to disk. + file.sync_all().await.unwrap(); + } }); Ok(()) } From 5573e4b5c0117a642596f5db9881412676a77768 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 25 Apr 2023 16:03:45 +0800 Subject: [PATCH 09/21] Pipe custom node stdout --- binaries/daemon/src/spawn.rs | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 443f72d2..a37361ac 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -97,13 +97,18 @@ pub async fn spawn_node( command.env(key, value.to_string()); } } - command.spawn().wrap_err_with(move || { - format!( - "failed to run `{}` with args `{}`", - n.source, - n.args.as_deref().unwrap_or_default() - ) - })? + command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .wrap_err_with(move || { + format!( + "failed to run `{}` with args `{}`", + n.source, + n.args.as_deref().unwrap_or_default() + ) + })? } dora_core::descriptor::CoreNodeKind::Runtime(n) => { let has_python_operator = n @@ -134,7 +139,6 @@ pub async fn spawn_node( eyre::bail!("Runtime can not mix Python Operator with other type of operator."); }; command.current_dir(working_dir); - command.stdin(Stdio::null()); let runtime_config = RuntimeConfig { node: NodeConfig { @@ -181,7 +185,6 @@ pub async fn spawn_node( ) .await .expect("Failed to create log file"); - let mut stdout_lines = (tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"))).lines(); @@ -233,7 +236,7 @@ pub async fn spawn_node( file.write_all(b"\n") .await .expect("Could not add newline to log file."); - debug!("{dataflow_id}/{node_id} logged {line}"); + debug!("{dataflow_id}/{} logged {line}", node.id.clone()); // Make sure that all data has been synced to disk. 
file.sync_all().await.unwrap(); From 19606141db080adad742310725b289b17de4b3db Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 26 Apr 2023 14:05:57 +0800 Subject: [PATCH 10/21] archive `RunningDataflow` in order to query logs --- binaries/coordinator/src/lib.rs | 59 +++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 6 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 5647dead..691e6352 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -83,6 +83,7 @@ pub async fn start( fn resolve_name( name: String, running_dataflows: &HashMap, + archived_dataflows: &HashMap, ) -> eyre::Result { let uuids: Vec<_> = running_dataflows .iter() @@ -90,8 +91,21 @@ fn resolve_name( .map(|(k, _)| k) .copied() .collect(); + let archived_uuids: Vec<_> = archived_dataflows + .iter() + .filter(|(_, v)| v.name.as_deref() == Some(name.as_str())) + .map(|(k, _)| k) + .copied() + .collect(); + if uuids.is_empty() { - bail!("no running dataflow with name `{name}`"); + if archived_uuids.is_empty() { + bail!("no dataflow with name `{name}`"); + } else if let [uuid] = archived_uuids.as_slice() { + Ok(*uuid) + } else { + bail!("multiple archived dataflows found with name `{name}`"); + } } else if let [uuid] = uuids.as_slice() { Ok(*uuid) } else { @@ -136,6 +150,7 @@ async fn start_inner( let mut events = (abortable_events, daemon_events).merge(); let mut running_dataflows: HashMap = HashMap::new(); + let mut archived_dataflows: HashMap = HashMap::new(); let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new(); while let Some(event) = events.next().await { @@ -237,6 +252,11 @@ async fn start_inner( DataflowEvent::DataflowFinishedOnMachine { machine_id, result } => { match running_dataflows.entry(uuid) { std::collections::hash_map::Entry::Occupied(mut entry) => { + // Archive finished dataflow + if archived_dataflows.get(&uuid).is_none() { + archived_dataflows + .insert(uuid, ArchivedDataflow::from(entry.get())); + } entry.get_mut().machines.remove(&machine_id); match result { Ok(()) => { @@ -345,7 +365,8 @@ async fn start_inner( } ControlRequest::StopByName { name } => { let stop = async { - let dataflow_uuid = resolve_name(name, &running_dataflows)?; + let dataflow_uuid = + resolve_name(name, &running_dataflows, &archived_dataflows)?; stop_dataflow( &running_dataflows, dataflow_uuid, @@ -361,13 +382,14 @@ async fn start_inner( let dataflow_uuid = if let Some(uuid) = uuid { uuid } else if let Some(name) = name { - resolve_name(name, &running_dataflows)? + resolve_name(name, &running_dataflows, &archived_dataflows)? } else { bail!("No uuid") }; retrieve_logs( &running_dataflows, + &archived_dataflows, dataflow_uuid, node.into(), &mut daemon_connections, @@ -526,6 +548,26 @@ struct RunningDataflow { nodes: Vec, } +#[allow(dead_code)] // Keeping the communication layer for later use. +struct ArchivedDataflow { + name: Option, + uuid: Uuid, + /// The IDs of the machines that the dataflow is running on. 
+ machines: BTreeSet, + nodes: Vec, +} + +impl From<&RunningDataflow> for ArchivedDataflow { + fn from(dataflow: &RunningDataflow) -> ArchivedDataflow { + ArchivedDataflow { + name: dataflow.name.clone(), + uuid: dataflow.uuid, + machines: dataflow.machines.clone(), + nodes: dataflow.nodes.clone(), + } + } +} + impl PartialEq for RunningDataflow { fn eq(&self, other: &Self) -> bool { self.name == other.name && self.uuid == other.uuid && self.machines == other.machines @@ -614,19 +656,24 @@ async fn reload_dataflow( async fn retrieve_logs( running_dataflows: &HashMap, + archived_dataflows: &HashMap, dataflow_id: Uuid, node_id: NodeId, daemon_connections: &mut HashMap, ) -> eyre::Result> { - let Some(dataflow) = running_dataflows.get(&dataflow_id) else { - bail!("No running dataflow found with UUID `{dataflow_id}`") + let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) { + dataflow.nodes.clone() + } else if let Some(dataflow) = running_dataflows.get(&dataflow_id) { + dataflow.nodes.clone() + } else { + bail!("No dataflow found with UUID `{dataflow_id}`") }; + let message = serde_json::to_vec(&DaemonCoordinatorEvent::Logs { dataflow_id, node_id: node_id.clone(), })?; - let nodes = &dataflow.nodes; let machine_ids: Vec = nodes .iter() .filter(|node| node.id == node_id) From e3079a88bd9a9d6718b2a3e380eb3e288f976f3e Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 28 Apr 2023 16:47:52 +0800 Subject: [PATCH 11/21] Refactor to multi-daemon branch --- binaries/cli/src/logs.rs | 13 ++++++++----- binaries/cli/src/main.rs | 4 +++- libraries/core/src/daemon_messages.rs | 8 ++++---- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/binaries/cli/src/logs.rs b/binaries/cli/src/logs.rs index 45a990c2..5f879615 100644 --- a/binaries/cli/src/logs.rs +++ b/binaries/cli/src/logs.rs @@ -1,15 +1,18 @@ +use communication_layer_request_reply::TcpRequestReplyConnection; use dora_core::topics::{ControlRequest, ControlRequestReply}; use eyre::{bail, Context, Result}; use uuid::Uuid; -use crate::control_connection; use bat::{Input, PrettyPrinter}; -pub fn logs(uuid: Option, name: Option, node: String) -> Result<()> { - let mut control_session = None; - let connection = control_connection(&mut control_session)?; +pub fn logs( + session: &mut TcpRequestReplyConnection, + uuid: Option, + name: Option, + node: String, +) -> Result<()> { let logs = { - let reply_raw = connection + let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Logs { uuid, diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 4cf62707..9f91f3b2 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -175,7 +175,9 @@ fn run() -> eyre::Result<()> { Command::Logs { dataflow, node } => { let uuid = Uuid::parse_str(&dataflow).ok(); let name = if uuid.is_some() { None } else { Some(dataflow) }; - logs::logs(uuid, name, node)? + let mut session = + connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?; + logs::logs(&mut *session, uuid, name, node)? 
} Command::Start { dataflow, diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index be2bad69..ce8f8d6d 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -209,6 +209,10 @@ pub enum DaemonCoordinatorEvent { node_id: NodeId, operator_id: Option, }, + Logs { + dataflow_id: DataflowId, + node_id: NodeId, + }, Destroy, Watchdog, } @@ -226,10 +230,6 @@ pub enum InterDaemonEvent { dataflow_id: DataflowId, inputs: BTreeSet<(NodeId, DataId)>, }, - Logs { - dataflow_id: Uuid, - node_id: NodeId, - }, } #[derive(Debug, serde::Deserialize, serde::Serialize)] From e451a6bd24cdcb17781ed3043f57b3a8d3762981 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 28 Apr 2023 16:48:41 +0800 Subject: [PATCH 12/21] Bail if log node has been found on more than one machine --- binaries/coordinator/src/lib.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 691e6352..6def1a86 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -7,8 +7,7 @@ use dora_core::{ config::{NodeId, OperatorId}, coordinator_messages::RegisterResult, daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply}, - descriptor::ResolvedNode, - message::Metadata, + descriptor::{Descriptor, ResolvedNode}, topics::{ control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT, @@ -548,7 +547,6 @@ struct RunningDataflow { nodes: Vec, } -#[allow(dead_code)] // Keeping the communication layer for later use. struct ArchivedDataflow { name: Option, uuid: Uuid, @@ -659,7 +657,7 @@ async fn retrieve_logs( archived_dataflows: &HashMap, dataflow_id: Uuid, node_id: NodeId, - daemon_connections: &mut HashMap, + daemon_connections: &mut HashMap, ) -> eyre::Result> { let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) { dataflow.nodes.clone() @@ -680,19 +678,27 @@ async fn retrieve_logs( .map(|node| node.deploy.machine.clone()) .collect(); - let machine_id = machine_ids - .first() - .wrap_err("Did not find node in dataflow")?; + let machine_id = if let [machine_id] = &machine_ids[..] { + machine_id + } else if machine_ids.is_empty() { + bail!("No machine contains {}/{}", dataflow_id, node_id) + } else { + bail!( + "More than one machine contains {}/{}. 
However, it should only be present on one.", + dataflow_id, + node_id + ) + }; let daemon_connection = daemon_connections .get_mut(machine_id.as_str()) .wrap_err("no daemon connection")?; // TODO: take from dataflow spec - tcp_send(daemon_connection, &message) + tcp_send(&mut daemon_connection.stream, &message) .await .wrap_err("failed to send logs message to daemon")?; // wait for reply - let reply_raw = tcp_receive(daemon_connection) + let reply_raw = tcp_receive(&mut daemon_connection.stream) .await .wrap_err("failed to retrieve logs reply from daemon")?; let reply_logs = match serde_json::from_slice(&reply_raw) @@ -716,7 +722,7 @@ async fn start_dataflow( uuid, machines, nodes, - } = spawn_dataflow(path, working_dir, daemon_connections).await?; + } = spawn_dataflow(dataflow, working_dir, daemon_connections).await?; Ok(RunningDataflow { uuid, name, From 0854f9149a0f757abbc7cbb3029c8d588dccd536 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 2 May 2023 15:24:03 +0800 Subject: [PATCH 13/21] Make a centralised `log_path` function --- binaries/daemon/src/lib.rs | 3 ++- binaries/daemon/src/log.rs | 8 ++++++++ binaries/daemon/src/spawn.rs | 4 ++-- 3 files changed, 12 insertions(+), 3 deletions(-) create mode 100644 binaries/daemon/src/log.rs diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index ee8aca33..e79290fb 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -39,6 +39,7 @@ use uuid::Uuid; mod coordinator; mod inter_daemon; +mod log; mod node_communication; mod spawn; mod tcp_utils; @@ -314,7 +315,7 @@ impl Daemon { let logs = async { let log_dir = temp_dir(); - let mut file = File::open(log_dir.join(format!("{dataflow_id}-{node_id}.txt"))) + let mut file = File::open(log_dir.join(log::log_path(&dataflow_id, &node_id))) .await .wrap_err("Could not open log file")?; diff --git a/binaries/daemon/src/log.rs b/binaries/daemon/src/log.rs new file mode 100644 index 00000000..62119994 --- /dev/null +++ b/binaries/daemon/src/log.rs @@ -0,0 +1,8 @@ +use std::path::PathBuf; + +use dora_core::config::NodeId; +use uuid::Uuid; + +pub fn log_path(dataflow_id: &Uuid, node_id: &NodeId) -> PathBuf { + PathBuf::from(format!("{dataflow_id}-{node_id}.txt")) +} diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index a37361ac..fb510fff 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -1,5 +1,5 @@ use crate::{ - node_communication::spawn_listener_loop, node_inputs, runtime_node_inputs, + log, node_communication::spawn_listener_loop, node_inputs, runtime_node_inputs, runtime_node_outputs, DoraEvent, Event, NodeExitStatus, }; use dora_core::{ @@ -181,7 +181,7 @@ pub async fn spawn_node( let (tx, mut rx) = mpsc::channel(10); let mut file = File::create( - &log_dir.join(PathBuf::from(format!("{dataflow_id}-{node_id}.txt")).with_extension("txt")), + &log_dir.join(PathBuf::from(log::log_path(&dataflow_id, &node_id)).with_extension("txt")), ) .await .expect("Failed to create log file"); From 7f82f32a9807ad6f4dc5973ef98a48f04e1d3ee7 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 2 May 2023 15:39:53 +0800 Subject: [PATCH 14/21] Make logs not fail in the daemon --- binaries/coordinator/src/lib.rs | 4 ++-- binaries/daemon/src/lib.rs | 4 ++-- libraries/core/src/daemon_messages.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 6def1a86..89b1dc0a 100644 --- a/binaries/coordinator/src/lib.rs +++ 
b/binaries/coordinator/src/lib.rs @@ -704,12 +704,12 @@ async fn retrieve_logs( let reply_logs = match serde_json::from_slice(&reply_raw) .wrap_err("failed to deserialize logs reply from daemon")? { - DaemonCoordinatorReply::Logs { logs } => logs, + DaemonCoordinatorReply::Logs(logs) => logs, other => bail!("unexpected reply after sending reload: {other:?}"), }; tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`"); - Ok(reply_logs) + reply_logs.map_err(|err| eyre!(err)) } async fn start_dataflow( diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index e79290fb..469de8c7 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -326,9 +326,9 @@ impl Daemon { Result::, eyre::Report>::Ok(contents) } .await - .expect("Could not retrieve logs"); + .map_err(|err| format!("{err:?}")); ( - Some(DaemonCoordinatorReply::Logs { logs }), + Some(DaemonCoordinatorReply::Logs(logs)), RunStatus::Continue, ) } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index ce8f8d6d..e51c1586 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -239,7 +239,7 @@ pub enum DaemonCoordinatorReply { StopResult(Result<(), String>), DestroyResult(Result<(), String>), WatchdogAck, - Logs { logs: Vec }, + Logs(Result, String>), } pub type DataflowId = Uuid; From 5c98327655512598b9c7a1057af173423af04033 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 2 May 2023 19:01:25 +0800 Subject: [PATCH 15/21] Reply logs in a new thread in order to not block daemon when reading logs --- binaries/daemon/src/lib.rs | 89 +++++++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 34 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 469de8c7..7f38d5f7 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -33,8 +33,10 @@ use tokio::fs::File; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot::Sender; use tokio::sync::{mpsc, oneshot}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; +use tracing::log::error; use uuid::Uuid; mod coordinator; @@ -206,8 +208,7 @@ impl Daemon { while let Some(event) = events.next().await { match event { Event::Coordinator(CoordinatorEvent { event, reply_tx }) => { - let (reply, status) = self.handle_coordinator_event(event).await?; - let _ = reply_tx.send(reply); + let status = self.handle_coordinator_event(event, reply_tx).await?; match status { RunStatus::Continue => {} @@ -258,8 +259,9 @@ impl Daemon { async fn handle_coordinator_event( &mut self, event: DaemonCoordinatorEvent, - ) -> eyre::Result<(Option, RunStatus)> { - let (reply, status) = match event { + reply_tx: Sender>, + ) -> eyre::Result { + let status = match event { DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes { dataflow_id, working_dir, @@ -291,7 +293,10 @@ impl Daemon { } let reply = DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}"))); - (Some(reply), RunStatus::Continue) + let _ = reply_tx.send(Some(reply)).map_err(|_| { + error!("could not send `SpawnResult` reply from daemon to coordinator") + }); + RunStatus::Continue } DaemonCoordinatorEvent::AllNodesReady { dataflow_id } => { match self.running.get_mut(&dataflow_id) { @@ -305,32 +310,39 @@ impl Daemon { ); } } - (None, RunStatus::Continue) + let _ = reply_tx.send(None).map_err(|_| { + error!("could not send `AllNodesReady` reply from daemon to 
coordinator") + }); + RunStatus::Continue } DaemonCoordinatorEvent::Logs { dataflow_id, node_id, } => { - // read file - let logs = async { - let log_dir = temp_dir(); - - let mut file = File::open(log_dir.join(log::log_path(&dataflow_id, &node_id))) - .await - .wrap_err("Could not open log file")?; - - let mut contents = vec![]; - file.read_to_end(&mut contents) - .await - .wrap_err("Could not read content of log file")?; - Result::, eyre::Report>::Ok(contents) - } - .await - .map_err(|err| format!("{err:?}")); - ( - Some(DaemonCoordinatorReply::Logs(logs)), - RunStatus::Continue, - ) + tokio::spawn(async move { + let logs = async { + let log_dir = temp_dir(); + + let mut file = + File::open(log_dir.join(log::log_path(&dataflow_id, &node_id))) + .await + .wrap_err("Could not open log file")?; + + let mut contents = vec![]; + file.read_to_end(&mut contents) + .await + .wrap_err("Could not read content of log file")?; + Result::, eyre::Report>::Ok(contents) + } + .await + .map_err(|err| format!("{err:?}")); + let _ = reply_tx + .send(Some(DaemonCoordinatorReply::Logs(logs))) + .map_err(|_| { + error!("could not send logs reply from daemon to coordinator") + }); + }); + RunStatus::Continue } DaemonCoordinatorEvent::ReloadDataflow { dataflow_id, @@ -340,7 +352,10 @@ impl Daemon { let result = self.send_reload(dataflow_id, node_id, operator_id).await; let reply = DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}"))); - (Some(reply), RunStatus::Continue) + let _ = reply_tx + .send(Some(reply)) + .map_err(|_| error!("could not send reload reply from daemon to coordinator")); + RunStatus::Continue } DaemonCoordinatorEvent::StopDataflow { dataflow_id } => { let stop = async { @@ -354,19 +369,25 @@ impl Daemon { let reply = DaemonCoordinatorReply::StopResult( stop.await.map_err(|err| format!("{err:?}")), ); - (Some(reply), RunStatus::Continue) + let _ = reply_tx + .send(Some(reply)) + .map_err(|_| error!("could not send stop reply from daemon to coordinator")); + RunStatus::Continue } DaemonCoordinatorEvent::Destroy => { tracing::info!("received destroy command -> exiting"); let reply = DaemonCoordinatorReply::DestroyResult(Ok(())); - (Some(reply), RunStatus::Exit) + let _ = reply_tx + .send(Some(reply)) + .map_err(|_| error!("could not send destroy reply from daemon to coordinator")); + RunStatus::Exit + } + DaemonCoordinatorEvent::Watchdog => { + let _ = reply_tx.send(Some(DaemonCoordinatorReply::WatchdogAck)); + RunStatus::Continue } - DaemonCoordinatorEvent::Watchdog => ( - Some(DaemonCoordinatorReply::WatchdogAck), - RunStatus::Continue, - ), }; - Ok((reply, status)) + Ok(status) } async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> { From 98f70189778d160a9f1d6e2a09b7f5080af0e866 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 2 May 2023 20:23:55 +0800 Subject: [PATCH 16/21] Resolve node name dependant on providing the archived nodes or not --- binaries/coordinator/src/lib.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 89b1dc0a..436239cd 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -79,10 +79,12 @@ pub async fn start( Ok((port, future)) } +// Resolve the dataflow name. +// Search for archived dataflows if they are provided. 
fn resolve_name( name: String, running_dataflows: &HashMap, - archived_dataflows: &HashMap, + archived_dataflows: Option<&HashMap>, ) -> eyre::Result { let uuids: Vec<_> = running_dataflows .iter() @@ -90,12 +92,16 @@ fn resolve_name( .map(|(k, _)| k) .copied() .collect(); - let archived_uuids: Vec<_> = archived_dataflows - .iter() - .filter(|(_, v)| v.name.as_deref() == Some(name.as_str())) - .map(|(k, _)| k) - .copied() - .collect(); + let archived_uuids: Vec<_> = if let Some(archived_dataflows) = archived_dataflows { + archived_dataflows + .iter() + .filter(|(_, v)| v.name.as_deref() == Some(name.as_str())) + .map(|(k, _)| k) + .copied() + .collect() + } else { + vec![] + }; if uuids.is_empty() { if archived_uuids.is_empty() { @@ -364,8 +370,7 @@ async fn start_inner( } ControlRequest::StopByName { name } => { let stop = async { - let dataflow_uuid = - resolve_name(name, &running_dataflows, &archived_dataflows)?; + let dataflow_uuid = resolve_name(name, &running_dataflows, None)?; stop_dataflow( &running_dataflows, dataflow_uuid, @@ -381,7 +386,7 @@ async fn start_inner( let dataflow_uuid = if let Some(uuid) = uuid { uuid } else if let Some(name) = name { - resolve_name(name, &running_dataflows, &archived_dataflows)? + resolve_name(name, &running_dataflows, Some(&archived_dataflows))? } else { bail!("No uuid") }; @@ -549,9 +554,6 @@ struct RunningDataflow { struct ArchivedDataflow { name: Option, - uuid: Uuid, - /// The IDs of the machines that the dataflow is running on. - machines: BTreeSet, nodes: Vec, } @@ -559,8 +561,6 @@ impl From<&RunningDataflow> for ArchivedDataflow { fn from(dataflow: &RunningDataflow) -> ArchivedDataflow { ArchivedDataflow { name: dataflow.name.clone(), - uuid: dataflow.uuid, - machines: dataflow.machines.clone(), nodes: dataflow.nodes.clone(), } } From b8fbfbd4a12a9e6d6a1d4ab163e0c242aa3f222d Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 2 May 2023 20:55:16 +0800 Subject: [PATCH 17/21] Print log on std stdout/err if logging thread failed --- binaries/daemon/src/spawn.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index fb510fff..1e33b509 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -195,7 +195,7 @@ pub async fn spawn_node( while let Ok(Some(line)) = stdout_lines.next_line().await { let sent = stdout_tx.send(Some(line)).await; if sent.is_err() { - break; + println!("Could not log: {line}"); } } }); @@ -209,7 +209,7 @@ pub async fn spawn_node( while let Ok(Some(line)) = stderr_lines.next_line().await { let sent = stderr_tx.send(Some(line)).await; if sent.is_err() { - break; + eprintln!("Could not log: {line}"); } } }); From 1caf7c7178ffbaf9109efbe5326dcf98d5f63748 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 2 May 2023 22:18:53 +0800 Subject: [PATCH 18/21] Make daemon not panic if logs are not written --- binaries/daemon/src/lib.rs | 2 +- binaries/daemon/src/spawn.rs | 26 ++++++++++++++------------ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 7f38d5f7..512ee9cc 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -36,7 +36,7 @@ use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot::Sender; use tokio::sync::{mpsc, oneshot}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; -use tracing::log::error; +use tracing::error; use uuid::Uuid; mod coordinator; diff --git a/binaries/daemon/src/spawn.rs 
b/binaries/daemon/src/spawn.rs index 1e33b509..40952a74 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -19,7 +19,7 @@ use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt}, sync::mpsc, }; -use tracing::debug; +use tracing::{debug, error}; pub async fn spawn_node( dataflow_id: DataflowId, @@ -193,7 +193,7 @@ pub async fn spawn_node( // Stdout listener stream tokio::spawn(async move { while let Ok(Some(line)) = stdout_lines.next_line().await { - let sent = stdout_tx.send(Some(line)).await; + let sent = stdout_tx.send(line.clone()).await; if sent.is_err() { println!("Could not log: {line}"); } @@ -207,14 +207,13 @@ pub async fn spawn_node( let stderr_tx = tx.clone(); tokio::spawn(async move { while let Ok(Some(line)) = stderr_lines.next_line().await { - let sent = stderr_tx.send(Some(line)).await; + let sent = stderr_tx.send(line.clone()).await; if sent.is_err() { eprintln!("Could not log: {line}"); } } }); - let exit_status_tx = tx.clone(); tokio::spawn(async move { let exit_status = NodeExitStatus::from(child.wait().await); let event = DoraEvent::SpawnedNodeResult { @@ -223,23 +222,26 @@ pub async fn spawn_node( exit_status, }; - exit_status_tx.send(None).await.unwrap(); let _ = daemon_tx.send(event.into()).await; }); // Log to file stream. tokio::spawn(async move { - while let Some(Some(line)) = rx.recv().await { - file.write_all(line.as_bytes()) + while let Some(line) = rx.recv().await { + let _ = file + .write_all(line.as_bytes()) .await - .expect("Could not log stdout/stderr to file"); - file.write_all(b"\n") + .map_err(|err| error!("Could not log {line} to file due to {err}")); + let _ = file + .write(b"\n") .await - .expect("Could not add newline to log file."); + .map_err(|err| error!("Could not add newline to log file due to {err}")); debug!("{dataflow_id}/{} logged {line}", node.id.clone()); - // Make sure that all data has been synced to disk. 
- file.sync_all().await.unwrap(); + let _ = file + .sync_all() + .await + .map_err(|err| error!("Could not sync logs to file due to {err}")); } }); Ok(()) From 664e8d48bab6ca032af573f9404d054c7de90c67 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 2 May 2023 22:49:55 +0800 Subject: [PATCH 19/21] Minor refactoring --- binaries/cli/src/logs.rs | 2 +- binaries/coordinator/src/lib.rs | 6 +++--- binaries/daemon/src/lib.rs | 6 +++++- binaries/daemon/src/spawn.rs | 11 +++++------ libraries/core/src/topics.rs | 2 +- 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/binaries/cli/src/logs.rs b/binaries/cli/src/logs.rs index 5f879615..20230a94 100644 --- a/binaries/cli/src/logs.rs +++ b/binaries/cli/src/logs.rs @@ -25,7 +25,7 @@ pub fn logs( let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match reply { - ControlRequestReply::Logs { logs } => logs, + ControlRequestReply::Logs(logs) => logs, other => bail!("unexpected reply to daemon logs: {other:?}"), } }; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 436239cd..95f70db3 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -399,7 +399,7 @@ async fn start_inner( &mut daemon_connections, ) .await - .map(|logs| ControlRequestReply::Logs { logs }) + .map(|logs| ControlRequestReply::Logs(logs)) } ControlRequest::Destroy => { tracing::info!("Received destroy command"); @@ -692,7 +692,7 @@ async fn retrieve_logs( let daemon_connection = daemon_connections .get_mut(machine_id.as_str()) - .wrap_err("no daemon connection")?; // TODO: take from dataflow spec + .wrap_err("no daemon connection")?; tcp_send(&mut daemon_connection.stream, &message) .await .wrap_err("failed to send logs message to daemon")?; @@ -705,7 +705,7 @@ async fn retrieve_logs( .wrap_err("failed to deserialize logs reply from daemon")? 
{ DaemonCoordinatorReply::Logs(logs) => logs, - other => bail!("unexpected reply after sending reload: {other:?}"), + other => bail!("unexpected reply after sending logs: {other:?}"), }; tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`"); diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 512ee9cc..7a774137 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -383,7 +383,11 @@ impl Daemon { RunStatus::Exit } DaemonCoordinatorEvent::Watchdog => { - let _ = reply_tx.send(Some(DaemonCoordinatorReply::WatchdogAck)); + let _ = reply_tx + .send(Some(DaemonCoordinatorReply::WatchdogAck)) + .map_err(|_| { + error!("could not send WatchdogAck reply from daemon to coordinator") + }); RunStatus::Continue } }; diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 40952a74..91853aab 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -11,7 +11,7 @@ use dora_download::download_file; use eyre::WrapErr; use std::{ env::{consts::EXE_EXTENSION, temp_dir}, - path::{Path, PathBuf}, + path::Path, process::Stdio, }; use tokio::{ @@ -180,11 +180,10 @@ pub async fn spawn_node( let log_dir = temp_dir(); let (tx, mut rx) = mpsc::channel(10); - let mut file = File::create( - &log_dir.join(PathBuf::from(log::log_path(&dataflow_id, &node_id)).with_extension("txt")), - ) - .await - .expect("Failed to create log file"); + let mut file = + File::create(&log_dir.join(log::log_path(&dataflow_id, &node_id).with_extension("txt"))) + .await + .expect("Failed to create log file"); let mut stdout_lines = (tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"))).lines(); diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 542c55bc..a77318c0 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -64,7 +64,7 @@ pub enum ControlRequestReply { DestroyOk, DaemonConnected(bool), ConnectedMachines(BTreeSet), - Logs { logs: Vec }, + Logs(Vec), } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] From 87403141241f3d6958d0c4512fcd93a676565f04 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 3 May 2023 19:11:50 +0800 Subject: [PATCH 20/21] Wait for logs to be logged before exiting daemon In previous iteration of logging, the logging thread was not waited for and the daemon would exit before finishing to log all lines causing a panic. See: https://github.com/dora-rs/dora/actions/runs/4805449241/jobs/8551873559#step:11:619 --- binaries/coordinator/src/lib.rs | 3 ++- binaries/daemon/src/spawn.rs | 8 ++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 95f70db3..c9ad5ed5 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -109,7 +109,8 @@ fn resolve_name( } else if let [uuid] = archived_uuids.as_slice() { Ok(*uuid) } else { - bail!("multiple archived dataflows found with name `{name}`"); + // TOOD: Index the archived dataflows in order to return logs based on the index. 
+ bail!("multiple archived dataflows found with name `{name}`, Please provide the UUID instead."); } } else if let [uuid] = uuids.as_slice() { Ok(*uuid) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 91853aab..0755282a 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -17,7 +17,7 @@ use std::{ use tokio::{ fs::File, io::{AsyncBufReadExt, AsyncWriteExt}, - sync::mpsc, + sync::{mpsc, oneshot}, }; use tracing::{debug, error}; @@ -213,14 +213,15 @@ pub async fn spawn_node( } }); + let (log_finish_tx, log_finish_rx) = oneshot::channel(); tokio::spawn(async move { let exit_status = NodeExitStatus::from(child.wait().await); + let _ = log_finish_rx.await; let event = DoraEvent::SpawnedNodeResult { dataflow_id, node_id, exit_status, }; - let _ = daemon_tx.send(event.into()).await; }); @@ -242,6 +243,9 @@ pub async fn spawn_node( .await .map_err(|err| error!("Could not sync logs to file due to {err}")); } + let _ = log_finish_tx + .send(()) + .map_err(|_| error!("Could not inform that log file thread finished")); }); Ok(()) } From 68a690cd2eedfeb83aacd991033f6bb50a20f962 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 3 May 2023 19:45:02 +0800 Subject: [PATCH 21/21] Improve error messages when nodes fails as they are no longer reported within stdout. --- binaries/daemon/src/lib.rs | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 7a774137..1192086d 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -866,15 +866,24 @@ impl Daemon { } NodeExitStatus::IoError(err) => { let err = eyre!(err).wrap_err(format!( - "I/O error while waiting for node `{dataflow_id}/{node_id}`" + " + I/O error while waiting for node `{dataflow_id}/{node_id}. + + Check logs using: dora logs {dataflow_id} {node_id} + " )); tracing::error!("{err:?}"); Some(err) } NodeExitStatus::ExitCode(code) => { - let err = - eyre!("node {dataflow_id}/{node_id} finished with exit code {code}"); - tracing::warn!("{err}"); + let err = eyre!( + " + {dataflow_id}/{node_id} failed with exit code {code}. + + Check logs using: dora logs {dataflow_id} {node_id} + " + ); + tracing::error!("{err}"); Some(err) } NodeExitStatus::Signal(signal) => { @@ -896,15 +905,24 @@ impl Daemon { other => other.to_string().into(), }; let err = eyre!( - "node {dataflow_id}/{node_id} finished because of signal `{signal}`" + " + {dataflow_id}/{node_id} failed with signal `{signal}` + + Check logs using: dora logs {dataflow_id} {node_id} + " ); - tracing::warn!("{err}"); + tracing::error!("{err}"); Some(err) } NodeExitStatus::Unknown => { - let err = - eyre!("node {dataflow_id}/{node_id} finished with unknown exit code"); - tracing::warn!("{err}"); + let err = eyre!( + " + {dataflow_id}/{node_id} failed with unknown exit code + + Check logs using: dora logs {dataflow_id} {node_id} + " + ); + tracing::error!("{err}"); Some(err) } };