Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(socket sink): support unix datagram mode #21762

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
The `socket` sink now supports the `unix_mode` configuration option that specifies the Unix socket mode to use. Valid values:

- `Stream` (default) - Stream-oriented (`SOCK_STREAM`)
- `Datagram` - Datagram-oriented (`SOCK_DGRAM`)

This option only applies when `mode = "unix"`.

authors: jpovixwm
74 changes: 60 additions & 14 deletions src/sinks/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ impl SinkConfig for SocketSinkConfig {

#[cfg(test)]
mod test {
#[cfg(unix)]
use std::path::PathBuf;
use std::{
future::ready,
net::{SocketAddr, UdpSocket},
};
#[cfg(unix)]
use std::{os::unix::net::UnixDatagram, path::PathBuf};

use futures::stream::StreamExt;
use futures_util::stream;
Expand Down Expand Up @@ -196,14 +196,42 @@ mod test {
crate::test_util::test_generate_config::<SocketSinkConfig>();
}

async fn test_udp(addr: SocketAddr) {
let receiver = UdpSocket::bind(addr).unwrap();
enum DatagramSocket {
Udp(UdpSocket),
#[cfg(unix)]
Unix(UnixDatagram),
}

enum DatagramSocketAddr {
Udp(SocketAddr),
#[cfg(unix)]
Unix(PathBuf),
}

async fn test_datagram(datagram_addr: DatagramSocketAddr) {
let receiver = match &datagram_addr {
DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
#[cfg(unix)]
DatagramSocketAddr::Unix(path) => {
DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
}
};

let config = SocketSinkConfig {
mode: Mode::Udp(UdpMode {
config: UdpSinkConfig::from_address(addr.to_string()),
encoding: JsonSerializerConfig::default().into(),
}),
mode: match &datagram_addr {
DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
config: UdpSinkConfig::from_address(addr.to_string()),
encoding: JsonSerializerConfig::default().into(),
}),
#[cfg(unix)]
DatagramSocketAddr::Unix(path) => Mode::Unix(UnixMode {
config: UnixSinkConfig::new(
path.to_path_buf(),
crate::sinks::util::service::net::UnixMode::Datagram,
),
encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
}),
},
acknowledgements: Default::default(),
};

Expand All @@ -218,9 +246,13 @@ mod test {
.expect("Running sink failed");

let mut buf = [0; 256];
let (size, _src_addr) = receiver
.recv_from(&mut buf)
.expect("Did not receive message");
let size = match &receiver {
DatagramSocket::Udp(sock) => {
sock.recv_from(&mut buf).expect("Did not receive message").0
}
#[cfg(unix)]
DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
};

let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
Expand All @@ -234,14 +266,25 @@ mod test {
async fn udp_ipv4() {
trace_init();

test_udp(next_addr()).await;
test_datagram(DatagramSocketAddr::Udp(next_addr())).await;
}

#[tokio::test]
async fn udp_ipv6() {
trace_init();

test_udp(next_addr_v6()).await;
test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await;
}

#[cfg(unix)]
#[tokio::test]
async fn unix_datagram() {
trace_init();

test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
"unix_datagram_socket_test",
)))
.await;
}

#[tokio::test]
Expand Down Expand Up @@ -292,7 +335,10 @@ mod test {

let config = SocketSinkConfig {
mode: Mode::Unix(UnixMode {
config: UnixSinkConfig::new(out_path),
config: UnixSinkConfig::new(
out_path,
crate::sinks::util::service::net::UnixMode::Stream,
),
encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
}),
acknowledgements: Default::default(),
Expand Down
106 changes: 106 additions & 0 deletions src/sinks/util/datagram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#[cfg(unix)]
use std::path::PathBuf;

use bytes::BytesMut;
use futures::{stream::BoxStream, StreamExt};
use futures_util::stream::Peekable;
use tokio::net::UdpSocket;
#[cfg(unix)]
use tokio::net::UnixDatagram;
use tokio_util::codec::Encoder;
use vector_lib::internal_event::RegisterInternalEvent;
use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle};
use vector_lib::EstimatedJsonEncodedSizeOf;

use crate::{
codecs::Transformer,
event::{Event, EventStatus, Finalizable},
internal_events::{SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError},
};

#[cfg(unix)]
use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError};

pub enum DatagramSocket {
Udp(UdpSocket),
#[cfg(unix)]
Unix(UnixDatagram, PathBuf),
}

pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>>(
input: &mut Peekable<BoxStream<'_, Event>>,
mut socket: DatagramSocket,
transformer: &Transformer,
encoder: &mut E,
bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
) {
while let Some(mut event) = input.next().await {
let byte_size = event.estimated_json_encoded_size_of();

transformer.transform(&mut event);

let finalizers = event.take_finalizers();
let mut bytes = BytesMut::new();

// Errors are handled by `Encoder`.
if encoder.encode(event, &mut bytes).is_err() {
continue;
}

match send_datagram(&mut socket, &bytes).await {
Ok(()) => {
emit!(SocketEventsSent {
mode: match socket {
DatagramSocket::Udp(_) => SocketMode::Udp,
#[cfg(unix)]
DatagramSocket::Unix(..) => SocketMode::Unix,
},
count: 1,
byte_size,
});

bytes_sent.emit(ByteSize(bytes.len()));
finalizers.update_status(EventStatus::Delivered);
}
Err(error) => {
match socket {
DatagramSocket::Udp(_) => emit!(SocketSendError {
mode: SocketMode::Udp,
error
}),
#[cfg(unix)]
DatagramSocket::Unix(_, path) => {
emit!(UnixSocketSendError {
path: path.as_path(),
error: &error
})
}
};
finalizers.update_status(EventStatus::Errored);
return;
}
}
}
}

async fn send_datagram(socket: &mut DatagramSocket, buf: &[u8]) -> tokio::io::Result<()> {
let sent = match socket {
DatagramSocket::Udp(udp) => udp.send(buf).await,
#[cfg(unix)]
DatagramSocket::Unix(uds, _) => uds.send(buf).await,
}?;
if sent != buf.len() {
match socket {
DatagramSocket::Udp(_) => emit!(UdpSendIncompleteError {
data_size: buf.len(),
sent,
}),
#[cfg(unix)]
DatagramSocket::Unix(..) => emit!(UnixSendIncompleteError {
data_size: buf.len(),
sent,
}),
}
}
Ok(())
}
1 change: 1 addition & 0 deletions src/sinks/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod batch;
pub mod buffer;
pub mod builder;
pub mod compressor;
pub mod datagram;
pub mod encoding;
pub mod http;
pub mod metadata;
Expand Down
71 changes: 16 additions & 55 deletions src/sinks/util/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,22 @@ use std::{
};

use async_trait::async_trait;
use bytes::BytesMut;
use futures::{stream::BoxStream, FutureExt, StreamExt};
use snafu::{ResultExt, Snafu};
use tokio::{net::UdpSocket, time::sleep};
use tokio_util::codec::Encoder;
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle, Protocol, Registered};
use vector_lib::EstimatedJsonEncodedSizeOf;
use vector_lib::internal_event::{BytesSent, Protocol, Registered};

use super::SinkBuildError;
use super::{
datagram::{send_datagrams, DatagramSocket},
SinkBuildError,
};
use crate::{
codecs::Transformer,
dns,
event::{Event, EventStatus, Finalizable},
internal_events::{
SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError,
UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError,
},
event::Event,
internal_events::{UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError},
net,
sinks::{
util::{retries::ExponentialBackoff, StreamSink},
Expand Down Expand Up @@ -198,58 +196,21 @@ where

let mut encoder = self.encoder.clone();
while Pin::new(&mut input).peek().await.is_some() {
let mut socket = self.connector.connect_backoff().await;
while let Some(mut event) = input.next().await {
let byte_size = event.estimated_json_encoded_size_of();

self.transformer.transform(&mut event);

let finalizers = event.take_finalizers();
let mut bytes = BytesMut::new();

// Errors are handled by `Encoder`.
if encoder.encode(event, &mut bytes).is_err() {
continue;
}

match udp_send(&mut socket, &bytes).await {
Ok(()) => {
emit!(SocketEventsSent {
mode: SocketMode::Udp,
count: 1,
byte_size,
});

self.bytes_sent.emit(ByteSize(bytes.len()));
finalizers.update_status(EventStatus::Delivered);
}
Err(error) => {
emit!(SocketSendError {
mode: SocketMode::Udp,
error
});
finalizers.update_status(EventStatus::Errored);
break;
}
}
}
let socket = self.connector.connect_backoff().await;
send_datagrams(
&mut input,
DatagramSocket::Udp(socket),
&self.transformer,
&mut encoder,
&self.bytes_sent,
)
.await;
}

Ok(())
}
}

async fn udp_send(socket: &mut UdpSocket, buf: &[u8]) -> tokio::io::Result<()> {
let sent = socket.send(buf).await?;
if sent != buf.len() {
emit!(UdpSendIncompleteError {
data_size: buf.len(),
sent,
});
}
Ok(())
}

pub(super) const fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
match remote_addr {
SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
Expand Down
Loading