Skip to content

Commit

Permalink
feat(k8s): add http forwarding to the proxy library
Browse files Browse the repository at this point in the history
This consists of a layer7 proxy using the kube-rs text request.
Once kube-rs/kube#972 is released we'll tweak this commit
to make use of the raw request, thus avoid multiple data conversions.

Signed-off-by: Tiago Castro <[email protected]>
  • Loading branch information
tiagolobocastro committed Aug 4, 2022
1 parent c0c58e8 commit 845b683
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions k8s/proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ futures = "0.3.21"
anyhow = "1.0.44"
tracing = "0.1.28"
shutdown = { path = "../../utils/shutdown" }
serde_json = "1.0.82"
hyper = { version = "0.14.20", features = [ "client", "http1", "http2", "tcp", "stream" ] }

[dev-dependencies]
tracing-subscriber = "0.3.15"
27 changes: 27 additions & 0 deletions k8s/proxy/examples/http-forward.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use hyper::service::Service;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

let selector = k8s_proxy::TargetSelector::svc_label("app", "api-rest");
let target = k8s_proxy::Target::new(selector, "http", "mayastor");
let uri = k8s_proxy::HttpForward::new(target).await?.uri().await?;

let proxy = k8s_proxy::HttpProxy::try_default().await?;
let mut svc = hyper::service::service_fn(|request: hyper::Request<hyper::body::Body>| {
let mut proxy = proxy.clone();
async move { proxy.call(request).await }
});

let request = hyper::Request::builder()
.method("GET")
.uri(&format!("{}/v0/nodes", uri))
.body(hyper::Body::empty())
.unwrap();

let result = svc.call(request).await?;
tracing::info!(?result, "http request complete");

Ok(())
}
242 changes: 242 additions & 0 deletions k8s/proxy/src/http_forward.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
use crate::{
pod_selection::{AnyReady, PodSelection},
vx::{Pod, Service},
};
use hyper::{body, Response};
use kube::{
api::{Api, ListParams},
ResourceExt,
};
use std::{future::Future, pin::Pin};

/// Used to retrieve the `hyper::Uri` that can be used to proxy with the kubeapi server.
/// This uri may then be used with `HttpProxy` which is a `tower::Service`.
/// # Example
/// ```ignore
/// let selector = k8s_proxy::TargetSelector::svc_label("app", "api-rest");
/// let target = k8s_proxy::Target::new(selector, "http", "mayastor");
/// let hf = k8s_proxy::HttpForward::new(target).await?;
///
/// let uri = hf.uri().await?;
/// tracing::info!(%uri, "generated kube-api");
/// ```
#[derive(Clone)]
pub struct HttpForward {
target: crate::Target,
pod_api: Api<Pod>,
svc_api: Api<Service>,
}

impl HttpForward {
/// Return a new `Self`.
/// # Arguments
/// * `target` - the target we'll forward to
pub async fn new(target: crate::Target) -> anyhow::Result<Self> {
let client = kube::Client::try_default().await?;
let namespace = target.namespace.name_any();

Ok(Self {
target,
pod_api: Api::namespaced(client.clone(), &namespace),
svc_api: Api::namespaced(client, &namespace),
})
}

/// Returns the `hyper::Uri` that can be used to proxy with the kubeapi server.
pub async fn uri(self) -> anyhow::Result<hyper::Uri> {
let target = self.finder().find(&self.target).await?;
let uri = hyper::Uri::try_from(target)?;
tracing::info!(%uri, "generated kube-api");
Ok(uri)
}

fn finder(&self) -> TargetFinder {
TargetFinder {
pod_api: &self.pod_api,
svc_api: &self.svc_api,
}
}
}

/// A `tower::Service` that proxies requests to services/pods via the kubeapi server.
/// The client must connect using the appropriate `hyper::Uri`, which can be easily
/// generated using `HttpForward::uri`.
/// # Example
/// ```ignore
/// let selector = k8s_proxy::TargetSelector::svc_label("app", "api-rest");
/// let target = k8s_proxy::Target::new(selector, "http", "mayastor");
/// let pf = k8s_proxy::HttpForward::new(target).await?;
///
/// let uri = pf.uri().await?;
/// tracing::info!(%uri, "generated kube-api");
///
/// let proxy = k8s_proxy::HttpProxy::try_default().await?;
/// let mut svc = hyper::service::service_fn(|request: hyper::Request<hyper::body::Body>| {
/// let mut proxy = proxy.clone();
/// async move { proxy.call(request).await }
/// });
///
/// let request = hyper::Request::builder()
/// .method("GET")
/// .uri(&format!("{}/v0/nodes", uri))
/// .body(hyper::Body::empty())
/// .unwrap();
///
/// let result = svc.call(request).await?;
/// tracing::info!(?result, "http request complete");
/// ```
#[derive(Clone)]
pub struct HttpProxy {
client: kube::Client,
}
impl HttpProxy {
/// Returns a new `HttpProxy` using the provided `kube::Client`.
pub fn new(client: kube::Client) -> Self {
Self { client }
}
/// Tries to return a default `HttpProxy` with a default `kube::Client`.
pub async fn try_default() -> anyhow::Result<Self> {
Ok(Self {
client: kube::Client::try_default().await?,
})
}
}

impl hyper::service::Service<hyper::Request<body::Body>> for HttpProxy {
type Response = Response<body::Body>;
type Error = kube::Error;
type Future = Pin<Box<dyn Future<Output = Result<Response<body::Body>, kube::Error>> + Send>>;

fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}

fn call(&mut self, request: hyper::Request<body::Body>) -> Self::Future {
let client = self.client.clone();
Box::pin(async move {
let (parts, body) = request.into_parts();

let body_bytes = body::to_bytes(body).await.unwrap();
let bytes = body_bytes.to_vec();
let request = hyper::Request::from_parts(parts, bytes);
match client.request_text(request).await {
Ok(r) => Ok(Response::new(body::Body::from(r))),
Err(error) => match error {
kube::Error::Api(response) => {
// undo the debug print which created response.message
let message = serde_json::from_str::<serde_json::Value>(&response.message)
.map_err(kube::Error::SerdeError)?
.as_str()
.unwrap_or("")
.to_string();

Response::builder()
.status(response.code)
.body(body::Body::from(message))
.map_err(kube::Error::HttpError)
}
_ => Err(error),
},
}
})
}
}

/// Finds an `HttpTarget` which is either a pod/service name and port.
#[derive(Clone)]
struct TargetFinder<'a> {
pod_api: &'a Api<Pod>,
svc_api: &'a Api<Service>,
}
impl<'a> TargetFinder<'a> {
/// Finds the `HttpTarget` according to the specified target.
/// # Arguments
/// * `target` - the target to be found
async fn find(&self, target: &crate::Target) -> anyhow::Result<HttpTarget> {
let pod_api = self.pod_api;
let svc_api = self.svc_api;

let target = target.clone();
let namespace = target.namespace;
match target.selector {
crate::TargetSelector::PodName(name) => Ok(HttpTarget::new(
TargetName::Pod(name),
target.port,
namespace,
)),
crate::TargetSelector::PodLabel(selector) => {
let pods = pod_api.list(&Self::pod_params(&selector)).await?;
let pod = AnyReady {}.select(&pods.items, &selector)?;
Ok(HttpTarget::new(
TargetName::Pod(pod.name_any()),
target.port,
namespace,
))
}
crate::TargetSelector::ServiceLabel(selector) => {
let services = svc_api.list(&Self::svc_params(&selector)).await?;
let service = match services.items.into_iter().next() {
Some(service) => Ok(service),
None => Err(anyhow::anyhow!("Service '{}' not found", selector)),
}?;

Ok(HttpTarget::new(
TargetName::Service(service.name_any()),
target.port,
namespace,
))
}
}
}
fn pod_params(selector: &str) -> ListParams {
ListParams::default()
.labels(selector)
.fields("status.phase=Running")
}
fn svc_params(selector: &str) -> ListParams {
ListParams::default().labels(selector)
}
}

enum TargetName {
Pod(String),
Service(String),
}

/// A target which is can either be a pod or a service.
/// The port can be specified by name or number.
struct HttpTarget {
name: TargetName,
port: crate::Port,
namespace: crate::NameSpace,
}
impl HttpTarget {
fn new(name: TargetName, port: crate::Port, namespace: crate::NameSpace) -> Self {
Self {
name,
port,
namespace,
}
}
}

impl TryFrom<HttpTarget> for hyper::Uri {
type Error = hyper::http::uri::InvalidUri;

fn try_from(value: HttpTarget) -> Result<Self, Self::Error> {
let (resource, name) = match value.name {
TargetName::Pod(name) => ("pods", name),
TargetName::Service(name) => ("services", name),
};
let port = value.port.any();
let namespace = value.namespace.name_any();

hyper::Uri::try_from(format!(
"/api/v1/namespaces/{}/{}/{}:{}/proxy",
namespace, resource, name, port
))
}
}
10 changes: 10 additions & 0 deletions k8s/proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
//! The different proxies can be used to communicate with in-cluster pods/services using the
//! kubernetes api-server.
mod http_forward;
mod pod_selection;
mod port_forward;

/// Layer 7 proxies.
pub use http_forward::{HttpForward, HttpProxy};
/// Layer 4 proxies.
pub use port_forward::PortForward;

Expand Down Expand Up @@ -80,6 +83,13 @@ impl Port {
Port::Name(_) => None,
}
}
/// Returns the port as a string.
pub(crate) fn any(&self) -> String {
match self {
Port::Number(number) => number.to_string(),
Port::Name(name) => name.clone(),
}
}
}

/// A kubernetes target.
Expand Down

0 comments on commit 845b683

Please sign in to comment.