Skip to content

Commit

Permalink
sinks/util/service/net/unix: use sinks/util/unix/UnixEither and move …
Browse files Browse the repository at this point in the history
…impls there
  • Loading branch information
jpovixwm committed Nov 14, 2024
1 parent 503cfc9 commit f30e104
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 38 deletions.
4 changes: 2 additions & 2 deletions src/sinks/util/service/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
};

#[cfg(unix)]
use std::path::PathBuf;
use {crate::sinks::util::unix::UnixEither, std::path::PathBuf};

Check failure on line 15 in src/sinks/util/service/net/mod.rs

View workflow job for this annotation

GitHub Actions / Checks

unresolved import `crate::sinks::util::unix`

use crate::{
internal_events::{
Expand All @@ -33,7 +33,7 @@ pub use self::unix::{UnixConnectorConfig, UnixMode};
use self::tcp::TcpConnector;
use self::udp::UdpConnector;
#[cfg(unix)]
use self::unix::{UnixConnector, UnixEither};
use self::unix::UnixConnector;

use futures_util::{future::BoxFuture, FutureExt};
use snafu::{ResultExt, Snafu};
Expand Down
36 changes: 3 additions & 33 deletions src/sinks/util/service/net/unix.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
use std::{
io,
os::fd::{AsFd, BorrowedFd},
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};

use snafu::ResultExt;
use tokio::{
io::AsyncWriteExt,
net::{UnixDatagram, UnixStream},
};
use tokio::net::{UnixDatagram, UnixStream};

use vector_lib::configurable::configurable_component;

use crate::net;
use crate::{net, sinks::util::unix::UnixEither};

Check failure on line 8 in src/sinks/util/service/net/unix.rs

View workflow job for this annotation

GitHub Actions / Checks

unresolved import `crate::sinks::util::unix`

use super::{net_error::*, ConnectorType, NetError, NetworkConnector};

Expand Down Expand Up @@ -74,29 +67,6 @@ impl UnixConnectorConfig {
}
}

pub(super) enum UnixEither {
Datagram(UnixDatagram),
Stream(UnixStream),
}

impl UnixEither {
pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
match self {
Self::Datagram(datagram) => datagram.send(buf).await,
Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()),
}
}
}

impl AsFd for UnixEither {
fn as_fd(&self) -> BorrowedFd<'_> {
match self {
Self::Datagram(datagram) => datagram.as_fd(),
Self::Stream(stream) => stream.as_fd(),
}
}
}

#[derive(Clone)]
pub(super) struct UnixConnector {
path: PathBuf,
Expand Down
30 changes: 27 additions & 3 deletions src/sinks/util/unix.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
use std::{path::PathBuf, pin::Pin, time::Duration};
use std::{
io,
os::fd::{AsFd, BorrowedFd},
path::PathBuf,
pin::Pin,
time::Duration,
};

use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use futures::{stream::BoxStream, SinkExt, StreamExt};
use snafu::{ResultExt, Snafu};
use tokio::{
io::AsyncWriteExt,
net::{UnixDatagram, UnixStream},
time::sleep,
};
Expand Down Expand Up @@ -91,11 +98,29 @@ impl UnixSinkConfig {
}
}

enum UnixEither {
pub enum UnixEither {
Datagram(UnixDatagram),
Stream(UnixStream),
}

impl UnixEither {
pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
match self {
Self::Datagram(datagram) => datagram.send(buf).await,
Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()),
}
}
}

impl AsFd for UnixEither {
fn as_fd(&self) -> BorrowedFd<'_> {
match self {
Self::Datagram(datagram) => datagram.as_fd(),
Self::Stream(stream) => stream.as_fd(),
}
}
}

#[derive(Debug, Clone)]
struct UnixConnector {
pub path: PathBuf,
Expand Down Expand Up @@ -243,7 +268,6 @@ where
Ok(())
}

// Same as UdpSink
async fn run_datagram(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let bytes_sent = register!(BytesSent::from(Protocol::UNIX));
let mut input = input.peekable();
Expand Down

0 comments on commit f30e104

Please sign in to comment.