-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enhancement(socket sink): support unix datagram mode #21762
base: master
Are you sure you want to change the base?
enhancement(socket sink): support unix datagram mode #21762
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/ci-run-component-features
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @jpovixwm, I did a quick pass. Left some suggestions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jpovixwm!
@pront I'm not sure how to best resolve the build errors. But then we can either:
|
@jpovixwm I am OK with (1) |
Verified locally with
Hope it works this time🤞 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/ci-run-component-features
…datagram-mode-in-socket-sink
Turns out that the above workflow doesn't run on fork branches. I manually created: https://github.com/vectordotdev/vector/actions/runs/11860591369 |
I'm unable to reproduce the unit test failure on my machine, which makes it more difficult for me to fix it. let receiver = std::os::unix::net::UnixDatagram::bind(out_path.clone()).unwrap();
let handle = tokio::task::spawn_blocking(move || {
let mut output_lines = Vec::<String>::with_capacity(num_lines);
for _ in 0..num_lines {
let mut buf = [0; 100];
let (size, _) = receiver
.recv_from(&mut buf)
.expect("Did not receive message");
let line = String::from_utf8_lossy(&buf[..size]).to_string();
output_lines.push(line);
}
output_lines
}); Tokio's |
Let me check this out locally and run the failing check. |
Edited: Hmm, |
@pront can you tell me the invocation that fails for you locally? |
You can do:
I did:
|
user@hostname:~/vector/src/sinks/util$ git status && git rev-parse HEAD
On branch feature/5269-support-unix-datagram-mode-in-socket-sink
Your branch is up to date with 'remotes/jpovixwm/feature/5269-support-unix-datagram-mode-in-socket-sink'.
nothing to commit, working tree clean
543d33f393a294342fda20beca8040020840d4ca
user@hostname:~/vector/src/sinks/util$ cargo test
Finished `test` profile [unoptimized + debuginfo] target(s) in 2.21s
Running unittests src/lib.rs (/home/user/vector/target/debug/deps/vector-8dfd964da1bd40f7)
running 1638 tests
test api::schema::components::sink::tests::sort_component_id_asc ... ok
[...]
test transforms::log_to_metric::tests::parse_failure ... ok
test result: ok. 1635 passed; 0 failed; 3 ignored; 0 measured; 0 filtered out; finished in 19.14s
Running tests/e2e/mod.rs (/home/user/vector/target/debug/deps/e2e-6fdf1afbfae3c336)
running 0 tests
test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
Running tests/integration/lib.rs (/home/user/vector/target/debug/deps/integration-fc9efbfa10dceb61)
running 0 tests
test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
Doc-tests vector
running 2 tests
test src/expiring_hash_map.rs - expiring_hash_map::ExpiringHashMap<K,V>::next_expired (line 141) ... ignored
test src/expiring_hash_map.rs - expiring_hash_map::ExpiringHashMap<K,V>::next_expired (line 158) ... ok
test result: ok. 1 passed; 0 failed; 1 ignored; 0 measured; 0 filtered out; finished in 42.10s
user@hostname:~/vector/src/sinks/util$ cargo test --color=always --package vector --lib sinks::util::unix::tests::basic_unix_datagram_sink --no-fail-fast --config env.RUSTC_BOOTSTRAP=\"1\" -- --format=json --exact -Z unstable-options --show-output
Finished `test` profile [unoptimized + debuginfo] target(s) in 0.42s
Running unittests src/lib.rs (/home/user/vector/target/debug/deps/vector-8dfd964da1bd40f7)
{ "type": "suite", "event": "started", "test_count": 1 }
{ "type": "test", "event": "started", "name": "sinks::util::unix::tests::basic_unix_datagram_sink" }
{ "type": "test", "name": "sinks::util::unix::tests::basic_unix_datagram_sink", "event": "ok" }
{ "type": "suite", "event": "ok", "passed": 1, "failed": 0, "ignored": 0, "measured": 0, "filtered_out": 1637, "exec_time": 0.056496991 } "Works on my machine". I'm afraid there's not much I can do on my end. I guess I could try it with a proper Linux VM if I can find the time. user@hostname:~/vector$ uname -r
5.15.153.1-microsoft-standard-WSL2
user@hostname:~/vector$ lsb_release -d
Description: Ubuntu 22.04.5 LTS So maybe my setup is a bit exotic. |
Your setup looks fine to me. I can devote some of my time later this week since I have a macOS machine here. |
Can you share the Vector config you used to test this end to end? I think we need to tweak the UX a bit. Currently it only accepts I have:
and a listener:
and the behavior looks good. |
FYI, I tried this locally and it passed. However, I didn't dig into it to understand the root cause of this. |
Sure. It's nothing special: sources:
dummy_logs:
type: "demo_logs"
format: "syslog"
count: 3
sinks:
emit_console:
inputs:
- "dummy_logs"
type: "console"
encoding:
codec: "text"
emit_syslog:
inputs:
- "dummy_logs"
type: "socket"
mode: "unix"
# address: "127.0.0.1:5006"
unix_mode: "Datagram"
path: "/tmp/test_socket"
encoding:
codec: "text"
framing:
method: "bytes" Listener:
It also accepts |
Thanks for sharing, adding these details to the PR test plan. I also did a very similar test on macOS and it worked well.
Yes, exactly.
It's just a bit unusual from a config UX perspective. I agree this is out of scope for this PR. So the only open question is the difference in behavior for |
If it fixes the failing unit test in a reliable way, I'll push it tomorrow (or actually later today, depending on the time zone). |
…datagram-mode-in-socket-sink
Hope this helps:
Which means the test is less flakey, but still flakey. |
I'd also say that the buffer should be larger than 100 bytes (even just 101 bytes should be alright) so that in the future it can catch a potential regression where some additional garbage data is sent to the socket after the expected payload. For example, if you replace the bytes framer with a newline delimited one, there will be a newline character after the first 100 expected bytes. |
It's not flaky for me, so I'm essentially fixing it blind, but still: user@hostname:~/vector$ git status && git rev-parse HEAD
On branch feature/5269-support-unix-datagram-mode-in-socket-sink
Your branch is ahead of 'remotes/jpovixwm/feature/5269-support-unix-datagram-mode-in-socket-sink' by 1 commit.
(use "git push" to publish your local commits)
nothing to commit, working tree clean
09dd31b6da377cffbaaabf407c8f036e3881ee5a
user@hostname:~/vector$ rust-flaker vector basic_unix_datagram_sink
Finished `test` profile [unoptimized + debuginfo] target(s) in 0.39s
Found test binary: target/debug/deps/vector-390aa0364275b128
Running test once...
running 1 test
test sinks::util::unix::tests::basic_unix_datagram_sink ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 1637 filtered out; finished in 0.09s
Running test repeatedly...
1 runs so far, 0 failures (100.00% pass rate). 153.486541ms avg, 153.486541ms max, 153.486541ms min
[...]
10201 runs so far, 0 failures (100.00% pass rate). 145.68302ms avg, 227.653805ms max, 114.913275ms min FYI: rust-flaker |
Sorry, this is still flakey and not ready to be merged. Which is a shame, since this is a great PR. There is an interesting trace if you are motivated to investigate:
Otherwise, we can leave this open for me or someone else who has access to a macOS machine to pick it up in the future since it's impossible to iterate on it without one. |
It's what I suspected earlier: joshtriplett/io-mux#7 |
Are you willing to dig into this? An alternative is to document this big difference in semantics with macOS and exclude it with |
Yeah. I looked into it today and got this locally: let sent = match socket {
DatagramSocket::Udp(udp) => udp.send(buf).await?,
#[cfg(unix)]
DatagramSocket::Unix(uds, _) => {
let mut backoff = ExponentialBackoff::from_millis(2)
.factor(5)
.max_delay(Duration::from_secs(60));
loop {
match uds.send(buf).await {
Ok(sent) => break sent,
Err(error) if error.raw_os_error() == Some(libc::ENOBUFS) => {
let delay = backoff.next().unwrap();
warn!(
message = "Throttling due to ENOBUFS.",
delay_secs = delay.as_secs_f64(),
internal_log_rate_limit = true
);
sleep(delay).await
}
Err(error) => return Err(error),
}
}
}
}; I'm testing my changes in FreeBSD 14 via Vagrant by building Vector like this:
(This won't build out of the box due to the dependency on heim-disk, so I had to remove a bunch of code from Without this explicit handling of ENOBUFS, I was able to reproduce the flakiness we saw earlier on macOS. But with the loop in place, the unit test reliably succeeds. I have yet to check whether this issue also affects UDP. If it does, I think this retrying logic should be applied to both UDP and Unix Datagram. BTW, do you think the number of ENOBUFS' encountered should be exposed as a metric from the sink, or is it enough to just log a warning? (the current message is provisional, I'll replace it with something clearer) |
First of all, thanks! Secondly, with these changes, we now have a lot of new scope in this PR. What do you think about targeting supporting this new
👍
Yes, good idea. I can see this as something a user would like to monitor. However, I am happy to just wait for you finalize everything in PR and then review again. Up to you. |
I'm OK with that. Though, to be fair, the follow-up PR may never come (from me, anyway) because I don't really have anything to gain from this feature being available on macOS. I've just pushed b648cfe containing minimal changes to disable this feature on macOS. Regarding the unit tests, I only gated Lastly, on UDP: I didn't manage to get a single ENOBUFS on FreeBSD when I tried that. Instead, the packets were silently dropped, resulting in equally flaky tests but with no obvious way forward. But for UDP that's probably acceptable, because it should be fairly obvious to anyone that UDP doesn't make any guarantees. |
Summary
Adds a
unix_mode
option to thesocket
sink.The implementation is somewhat based on how the
statsd
sink handles this, but not to the full extent, as that felt like too big of a refactor for me to deal with.Change Type
Is this a breaking change?
How did you test this PR?
make test
on x86_64 Ubuntu 22.04 running within WSL, and thento ensure I didn't forget any of the
#[cfg(unix)]
attributes.Other than this, I also performed some basic tests with a trivial UDS listener implemented in Python.
Listener:
nc -lkuU /tmp/test_socket
(from package netcat-openbsd)Does this PR include user facing changes?
Checklist
Cargo.lock
), pleaserun
dd-rust-license-tool write
to regenerate the license inventory and commit the changes (if any). More details here.References