Skip to content

Commit

Permalink
feat: Added grpc based health check (#6441)
Browse files Browse the repository at this point in the history
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
  • Loading branch information
Sarthak1799 and hyperswitch-bot[bot] authored Nov 26, 2024
1 parent 8fbb766 commit e922f96
Show file tree
Hide file tree
Showing 13 changed files with 285 additions and 17 deletions.
1 change: 1 addition & 0 deletions config/config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -790,3 +790,4 @@ connector_list = "cybersource" # Supported connectors for network tokenization
[grpc_client.dynamic_routing_client] # Dynamic Routing Client Configuration
host = "localhost" # Client Host
port = 7000 # Client Port
service = "dynamo" # Service name
1 change: 1 addition & 0 deletions config/deployments/env_specific.toml
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,4 @@ check_token_status_url= "" # base url to check token status from token servic
[grpc_client.dynamic_routing_client] # Dynamic Routing Client Configuration
host = "localhost" # Client Host
port = 7000 # Client Port
service = "dynamo" # Service name
1 change: 1 addition & 0 deletions crates/api_models/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ v1 = ["common_utils/v1"]
v2 = ["common_utils/v2", "customer_v2"]
customer_v2 = ["common_utils/customer_v2"]
payment_methods_v2 = ["common_utils/payment_methods_v2"]
dynamic_routing = []

[dependencies]
actix-web = { version = "4.5.1", optional = true }
Expand Down
13 changes: 13 additions & 0 deletions crates/api_models/src/health_check.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::hash_map::HashMap;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RouterHealthCheckResponse {
pub database: bool,
Expand All @@ -9,10 +10,22 @@ pub struct RouterHealthCheckResponse {
#[cfg(feature = "olap")]
pub opensearch: bool,
pub outgoing_request: bool,
#[cfg(feature = "dynamic_routing")]
pub grpc_health_check: HealthCheckMap,
}

impl common_utils::events::ApiEventMetric for RouterHealthCheckResponse {}

/// gRPC based services eligible for Health check
#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HealthCheckServices {
/// Dynamic routing service
DynamicRoutingService,
}

pub type HealthCheckMap = HashMap<HealthCheckServices, bool>;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SchedulerHealthCheckResponse {
pub database: bool,
Expand Down
18 changes: 14 additions & 4 deletions crates/external_services/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,21 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(feature = "dynamic_routing")]
{
// Get the directory of the current crate
let proto_file = router_env::workspace_path()
.join("proto")
.join("success_rate.proto");

let proto_path = router_env::workspace_path().join("proto");
let success_rate_proto_file = proto_path.join("success_rate.proto");

let health_check_proto_file = proto_path.join("health_check.proto");
let out_dir = std::path::PathBuf::from(std::env::var("OUT_DIR")?);

// Compile the .proto file
tonic_build::compile_protos(proto_file).expect("Failed to compile success rate proto file");
tonic_build::configure()
.out_dir(out_dir)
.compile(
&[success_rate_proto_file, health_check_proto_file],
&[proto_path],
)
.expect("Failed to compile proto files");
}
Ok(())
}
35 changes: 34 additions & 1 deletion crates/external_services/src/grpc_client.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
/// Dyanimc Routing Client interface implementation
#[cfg(feature = "dynamic_routing")]
pub mod dynamic_routing;
/// gRPC based Heath Check Client interface implementation
#[cfg(feature = "dynamic_routing")]
pub mod health_check_client;
use std::{fmt::Debug, sync::Arc};

#[cfg(feature = "dynamic_routing")]
use dynamic_routing::{DynamicRoutingClientConfig, RoutingStrategy};
#[cfg(feature = "dynamic_routing")]
use health_check_client::HealthCheckClient;
#[cfg(feature = "dynamic_routing")]
use http_body_util::combinators::UnsyncBoxBody;
#[cfg(feature = "dynamic_routing")]
use hyper::body::Bytes;
#[cfg(feature = "dynamic_routing")]
use hyper_util::client::legacy::connect::HttpConnector;
use serde;
#[cfg(feature = "dynamic_routing")]
use tonic::Status;

#[cfg(feature = "dynamic_routing")]
/// Hyper based Client type for maintaining connection pool for all gRPC services
pub type Client = hyper_util::client::legacy::Client<HttpConnector, UnsyncBoxBody<Bytes, Status>>;

/// Struct contains all the gRPC Clients
#[derive(Debug, Clone)]
pub struct GrpcClients {
/// The routing client
#[cfg(feature = "dynamic_routing")]
pub dynamic_routing: RoutingStrategy,
/// Health Check client for all gRPC services
#[cfg(feature = "dynamic_routing")]
pub health_client: HealthCheckClient,
}
/// Type that contains the configs required to construct a gRPC client with its respective services.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize, Default)]
Expand All @@ -29,17 +49,30 @@ impl GrpcClientSettings {
/// This function will be called at service startup.
#[allow(clippy::expect_used)]
pub async fn get_grpc_client_interface(&self) -> Arc<GrpcClients> {
#[cfg(feature = "dynamic_routing")]
let client =
hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
.http2_only(true)
.build_http();

#[cfg(feature = "dynamic_routing")]
let dynamic_routing_connection = self
.dynamic_routing_client
.clone()
.get_dynamic_routing_connection()
.get_dynamic_routing_connection(client.clone())
.await
.expect("Failed to establish a connection with the Dynamic Routing Server");

#[cfg(feature = "dynamic_routing")]
let health_client = HealthCheckClient::build_connections(self, client)
.await
.expect("Failed to build gRPC connections");

Arc::new(GrpcClients {
#[cfg(feature = "dynamic_routing")]
dynamic_routing: dynamic_routing_connection,
#[cfg(feature = "dynamic_routing")]
health_client,
})
}
}
17 changes: 6 additions & 11 deletions crates/external_services/src/grpc_client/dynamic_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ use api_models::routing::{
};
use common_utils::{errors::CustomResult, ext_traits::OptionExt, transformers::ForeignTryFrom};
use error_stack::ResultExt;
use http_body_util::combinators::UnsyncBoxBody;
use hyper::body::Bytes;
use hyper_util::client::legacy::connect::HttpConnector;
use router_env::logger;
use serde;
use success_rate::{
Expand All @@ -18,7 +15,8 @@ use success_rate::{
InvalidateWindowsResponse, LabelWithStatus, UpdateSuccessRateWindowConfig,
UpdateSuccessRateWindowRequest, UpdateSuccessRateWindowResponse,
};
use tonic::Status;

use super::Client;
#[allow(
missing_docs,
unused_qualifications,
Expand All @@ -45,8 +43,6 @@ pub enum DynamicRoutingError {
SuccessRateBasedRoutingFailure(String),
}

type Client = hyper_util::client::legacy::Client<HttpConnector, UnsyncBoxBody<Bytes, Status>>;

/// Type that consists of all the services provided by the client
#[derive(Debug, Clone)]
pub struct RoutingStrategy {
Expand All @@ -64,6 +60,8 @@ pub enum DynamicRoutingClientConfig {
host: String,
/// The port of the client
port: u16,
/// Service name
service: String,
},
#[default]
/// If the dynamic routing client config has been disabled
Expand All @@ -74,13 +72,10 @@ impl DynamicRoutingClientConfig {
/// establish connection with the server
pub async fn get_dynamic_routing_connection(
self,
client: Client,
) -> Result<RoutingStrategy, Box<dyn std::error::Error>> {
let client =
hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
.http2_only(true)
.build_http();
let success_rate_client = match self {
Self::Enabled { host, port } => {
Self::Enabled { host, port, .. } => {
let uri = format!("http://{}:{}", host, port).parse::<tonic::transport::Uri>()?;
logger::info!("Connection established with dynamic routing gRPC Server");
Some(SuccessRateCalculatorClient::with_origin(client, uri))
Expand Down
149 changes: 149 additions & 0 deletions crates/external_services/src/grpc_client/health_check_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::{collections::HashMap, fmt::Debug};

use api_models::health_check::{HealthCheckMap, HealthCheckServices};
use common_utils::{errors::CustomResult, ext_traits::AsyncExt};
use error_stack::ResultExt;
pub use health_check::{
health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
HealthCheckResponse,
};
use router_env::logger;

#[allow(
missing_docs,
unused_qualifications,
clippy::unwrap_used,
clippy::as_conversions,
clippy::use_self
)]
pub mod health_check {
tonic::include_proto!("grpc.health.v1");
}

use super::{Client, DynamicRoutingClientConfig, GrpcClientSettings};

/// Result type for Dynamic Routing
pub type HealthCheckResult<T> = CustomResult<T, HealthCheckError>;
/// Dynamic Routing Errors
#[derive(Debug, Clone, thiserror::Error)]
pub enum HealthCheckError {
/// The required input is missing
#[error("Missing fields: {0} for building the Health check connection")]
MissingFields(String),
/// Error from gRPC Server
#[error("Error from gRPC Server : {0}")]
ConnectionError(String),
/// status is invalid
#[error("Invalid Status from server")]
InvalidStatus,
}

/// Health Check Client type
#[derive(Debug, Clone)]
pub struct HealthCheckClient {
/// Health clients for all gRPC based services
pub clients: HashMap<HealthCheckServices, HealthClient<Client>>,
}

impl HealthCheckClient {
/// Build connections to all gRPC services
pub async fn build_connections(
config: &GrpcClientSettings,
client: Client,
) -> Result<Self, Box<dyn std::error::Error>> {
let dynamic_routing_config = &config.dynamic_routing_client;
let connection = match dynamic_routing_config {
DynamicRoutingClientConfig::Enabled {
host,
port,
service,
} => Some((host.clone(), *port, service.clone())),
_ => None,
};

let mut client_map = HashMap::new();

if let Some(conn) = connection {
let uri = format!("http://{}:{}", conn.0, conn.1).parse::<tonic::transport::Uri>()?;
let health_client = HealthClient::with_origin(client, uri);

client_map.insert(HealthCheckServices::DynamicRoutingService, health_client);
}

Ok(Self {
clients: client_map,
})
}
/// Perform health check for all services involved
pub async fn perform_health_check(
&self,
config: &GrpcClientSettings,
) -> HealthCheckResult<HealthCheckMap> {
let dynamic_routing_config = &config.dynamic_routing_client;
let connection = match dynamic_routing_config {
DynamicRoutingClientConfig::Enabled {
host,
port,
service,
} => Some((host.clone(), *port, service.clone())),
_ => None,
};

let health_client = self
.clients
.get(&HealthCheckServices::DynamicRoutingService);

// SAFETY : This is a safe cast as there exists a valid
// integer value for this variant
#[allow(clippy::as_conversions)]
let expected_status = ServingStatus::Serving as i32;

let mut service_map = HealthCheckMap::new();

let health_check_succeed = connection
.as_ref()
.async_map(|conn| self.get_response_from_grpc_service(conn.2.clone(), health_client))
.await
.transpose()
.change_context(HealthCheckError::ConnectionError(
"error calling dynamic routing service".to_string(),
))
.map_err(|err| logger::error!(error=?err))
.ok()
.flatten()
.is_some_and(|resp| resp.status == expected_status);

connection.and_then(|_conn| {
service_map.insert(
HealthCheckServices::DynamicRoutingService,
health_check_succeed,
)
});

Ok(service_map)
}

async fn get_response_from_grpc_service(
&self,
service: String,
client: Option<&HealthClient<Client>>,
) -> HealthCheckResult<HealthCheckResponse> {
let request = tonic::Request::new(HealthCheckRequest { service });

let mut client = client
.ok_or(HealthCheckError::MissingFields(
"[health_client]".to_string(),
))?
.clone();

let response = client
.check(request)
.await
.change_context(HealthCheckError::ConnectionError(
"Failed to call dynamic routing service".to_string(),
))?
.into_inner();

Ok(response)
}
}
2 changes: 1 addition & 1 deletion crates/router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ v2 = ["customer_v2", "payment_methods_v2", "common_default", "api_models/v2", "d
v1 = ["common_default", "api_models/v1", "diesel_models/v1", "hyperswitch_domain_models/v1", "storage_impl/v1", "hyperswitch_interfaces/v1", "kgraph_utils/v1", "common_utils/v1"]
customer_v2 = ["api_models/customer_v2", "diesel_models/customer_v2", "hyperswitch_domain_models/customer_v2", "storage_impl/customer_v2"]
payment_methods_v2 = ["api_models/payment_methods_v2", "diesel_models/payment_methods_v2", "hyperswitch_domain_models/payment_methods_v2", "storage_impl/payment_methods_v2", "common_utils/payment_methods_v2"]
dynamic_routing = ["external_services/dynamic_routing", "storage_impl/dynamic_routing"]
dynamic_routing = ["external_services/dynamic_routing", "storage_impl/dynamic_routing", "api_models/dynamic_routing"]

# Partial Auth
# The feature reduces the overhead of the router authenticating the merchant for every request, and trusts on `x-merchant-id` header to be present in the request.
Expand Down
23 changes: 23 additions & 0 deletions crates/router/src/core/health_check.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#[cfg(feature = "olap")]
use analytics::health_check::HealthCheck;
#[cfg(feature = "dynamic_routing")]
use api_models::health_check::HealthCheckMap;
use api_models::health_check::HealthState;
use error_stack::ResultExt;
use router_env::logger;
Expand Down Expand Up @@ -28,6 +30,11 @@ pub trait HealthCheckInterface {
async fn health_check_opensearch(
&self,
) -> CustomResult<HealthState, errors::HealthCheckDBError>;

#[cfg(feature = "dynamic_routing")]
async fn health_check_grpc(
&self,
) -> CustomResult<HealthCheckMap, errors::HealthCheckGRPCServiceError>;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -158,4 +165,20 @@ impl HealthCheckInterface for app::SessionState {
logger::debug!("Outgoing request successful");
Ok(HealthState::Running)
}

#[cfg(feature = "dynamic_routing")]
async fn health_check_grpc(
&self,
) -> CustomResult<HealthCheckMap, errors::HealthCheckGRPCServiceError> {
let health_client = &self.grpc_client.health_client;
let grpc_config = &self.conf.grpc_client;

let health_check_map = health_client
.perform_health_check(grpc_config)
.await
.change_context(errors::HealthCheckGRPCServiceError::FailedToCallService)?;

logger::debug!("Health check successful");
Ok(health_check_map)
}
}
Loading

0 comments on commit e922f96

Please sign in to comment.