Skip to content

Commit

Permalink
Make Infrastructure trait async (WIP)
Browse files Browse the repository at this point in the history
dkregistry-rs and shiplift need newer version of tokio (see
camallo/dkregistry-rs#138 and softprops/shiplift#191)
  • Loading branch information
schrieveslaach committed Jan 13, 2020
1 parent 5e4e7a8 commit 230ab70
Show file tree
Hide file tree
Showing 8 changed files with 874 additions and 404 deletions.
714 changes: 603 additions & 111 deletions api/Cargo.lock

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ repository = "https://github.com/aixigo/PREvant/"
edition = "2018"

[dependencies]
base64 = "0.10"
async-trait = "0.1.22"
base64 = "0.11"
cached = "0.9"
chrono = { version = "0.4", features = ["serde"] }
clap = "2.33"
crossbeam = "0.7"
crossbeam-utils = "0.6"
dkregistry = "0.3"
dkregistry = { git = "https://github.com/schrieveslaach/dkregistry-rs.git", branch = "master" }
env_logger = "0.6"
failure = "0.1"
futures = "0.1"
futures = { version = "0.3", features = ["compat"] }
handlebars = "1.1"
http-api-problem = { version = "0.12", features = ["with_rocket"] }
kube = { version = "0.17", features = ["openapi"] }
Expand All @@ -29,7 +30,7 @@ serde_json = "1.0"
serde_regex = "0.4"
serde-value = "0.6"
serde_yaml = "0.8"
tokio = "0.1"
tokio = "0.2"
tokio-core = "0.1"
toml = "0.4"
rayon = "1.0"
Expand All @@ -43,7 +44,8 @@ url = "1.7"
yansi = "0.5"

[dependencies.shiplift]
version = "0.6"
git = "https://github.com/schrieveslaach/shiplift.git"
branch = "update-hyper"
default-features = false
features = ["unix-socket", "chrono"]

Expand Down
92 changes: 58 additions & 34 deletions api/src/apps/apps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use std::collections::HashSet;
use std::convert::From;
use std::sync::Mutex;
use std::time::Duration;
use tokio::runtime::Runtime;
use yansi::Paint;

cached_key_result! {
Expand Down Expand Up @@ -142,7 +143,9 @@ impl AppsService {
&self,
request_info: &RequestInfo,
) -> Result<MultiMap<String, Service>, AppsServiceError> {
let mut services = self.infrastructure.get_services()?;
let mut runtime = Runtime::new().expect("Should create runtime");

let mut services = runtime.block_on(self.infrastructure.get_services())?;

let mut all_services: Vec<&mut Service> = services
.iter_all_mut()
Expand Down Expand Up @@ -170,13 +173,13 @@ impl AppsService {
}
}

fn configs_to_replicate(
async fn configs_to_replicate(
&self,
services_to_deploy: &Vec<ServiceConfig>,
app_name: &String,
replicate_from_app_name: &String,
) -> Result<Vec<ServiceConfig>, AppsServiceError> {
let running_services = self.infrastructure.get_configs_of_app(&app_name)?;
let running_services = self.infrastructure.get_configs_of_app(&app_name).await?;
let running_service_names = running_services
.iter()
.filter(|c| c.container_type() == &ContainerType::Instance)
Expand All @@ -190,7 +193,8 @@ impl AppsService {

Ok(self
.infrastructure
.get_configs_of_app(&replicate_from_app_name)?
.get_configs_of_app(&replicate_from_app_name)
.await?
.into_iter()
.filter(|config| !service_names.contains(config.service_name()))
.filter(|config| !running_service_names.contains(config.service_name()))
Expand Down Expand Up @@ -226,15 +230,16 @@ impl AppsService {
Some(guard) => guard,
};

let mut runtime = Runtime::new().expect("Should create runtime");
let mut configs: Vec<ServiceConfig> = service_configs.clone();

let replicate_from_app_name = replicate_from.unwrap_or_else(|| String::from("master"));
if &replicate_from_app_name != app_name {
configs.extend(self.configs_to_replicate(
configs.extend(runtime.block_on(self.configs_to_replicate(
service_configs,
app_name,
&replicate_from_app_name,
)?);
))?);
}

for config in configs.iter_mut() {
Expand All @@ -258,12 +263,12 @@ impl AppsService {

AppsService::merge_companions_by_service_name(service_companions, &mut configs);
AppsService::merge_companions_by_service_name(
self.get_application_companion_configs(app_name, &configs)?,
runtime.block_on(self.get_application_companion_configs(app_name, &configs))?,
&mut configs,
);

let images_service = ImagesService::new();
let mappings = images_service.resolve_image_ports(&configs)?;
let mappings = runtime.block_on(images_service.resolve_image_ports(&configs))?;
for config in configs.iter_mut() {
if let Some(port) = mappings.get(config) {
config.set_port(port.clone());
Expand All @@ -276,11 +281,11 @@ impl AppsService {
index1.cmp(&index2)
});

let services = self.infrastructure.deploy_services(
let services = runtime.block_on(self.infrastructure.deploy_services(
app_name,
&configs,
&self.config.container_config(),
)?;
))?;

Ok(services)
}
Expand Down Expand Up @@ -322,7 +327,7 @@ impl AppsService {
.collect()
}

fn get_application_companion_configs(
async fn get_application_companion_configs(
&self,
app_name: &String,
service_configs: &Vec<ServiceConfig>,
Expand All @@ -332,7 +337,8 @@ impl AppsService {
// TODO: make sure that service companions are included!
for config in self
.infrastructure
.get_configs_of_app(app_name)?
.get_configs_of_app(app_name)
.await?
.into_iter()
.filter(|config| {
service_configs
Expand Down Expand Up @@ -368,11 +374,15 @@ impl AppsService {

/// Deletes all services for the given `app_name`.
pub fn delete_app(&self, app_name: &String) -> Result<Vec<Service>, AppsServiceError> {
match self.infrastructure.get_services()?.get_vec(app_name) {
let mut runtime = Runtime::new().expect("Should create runtime");
match runtime
.block_on(self.infrastructure.get_services())?
.get_vec(app_name)
{
None => Err(AppsServiceError::AppNotFound {
app_name: app_name.clone(),
}),
Some(_) => Ok(self.infrastructure.stop_services(app_name)?),
Some(_) => Ok(runtime.block_on(self.infrastructure.stop_services(app_name))?),
}
}

Expand All @@ -383,10 +393,13 @@ impl AppsService {
since: &Option<DateTime<FixedOffset>>,
limit: usize,
) -> Result<Option<LogChunk>, AppsServiceError> {
match self
.infrastructure
.get_logs(app_name, service_name, since, limit)?
{
let mut runtime = Runtime::new().expect("Should create runtime");
match runtime.block_on(self.infrastructure.get_logs(
app_name,
service_name,
since,
limit,
))? {
None => Ok(None),
Some(ref logs) if logs.is_empty() => Ok(None),
Some(logs) => Ok(Some(LogChunk::from(logs))),
Expand All @@ -399,10 +412,11 @@ impl AppsService {
service_name: &String,
status: ServiceStatus,
) -> Result<Option<Service>, AppsServiceError> {
match self
.infrastructure
.change_status(app_name, service_name, status)?
{
let mut runtime = Runtime::new().expect("Should create runtime");
match runtime.block_on(
self.infrastructure
.change_status(app_name, service_name, status),
)? {
Some(service) => {
let mut cache = WEB_HOST_META.lock().unwrap();
(*cache).cache_remove(service.id());
Expand Down Expand Up @@ -634,9 +648,11 @@ mod tests {

apps.create_or_update(&String::from("master"), None, &service_configs!("mariadb"))?;

let configs = apps
.infrastructure
.get_configs_of_app(&String::from("master"))?;
let mut runtime = Runtime::new().expect("Should create runtime");
let configs = runtime.block_on(
apps.infrastructure
.get_configs_of_app(&String::from("master")),
)?;
assert_eq!(configs.len(), 1);

let volumes = configs.get(0).unwrap().volumes().unwrap();
Expand Down Expand Up @@ -670,9 +686,11 @@ mod tests {
&service_configs!("mariadb"),
)?;

let configs = apps
.infrastructure
.get_configs_of_app(&String::from("master-1.x"))?;
let mut runtime = Runtime::new().expect("Should create runtime");
let configs = runtime.block_on(
apps.infrastructure
.get_configs_of_app(&String::from("master-1.x")),
)?;
assert_eq!(configs.len(), 1);

let volumes = configs.get(0).unwrap().volumes();
Expand Down Expand Up @@ -781,9 +799,12 @@ Log msg 3 of service-a of app master
assert_contains_service!(services, "openid", ContainerType::Instance);
assert_contains_service!(services, "db", ContainerType::Instance);

let openid_configs: Vec<ServiceConfig> = apps
.infrastructure
.get_configs_of_app(&String::from("master"))?
let mut runtime = Runtime::new().expect("Should create runtime");
let openid_configs: Vec<ServiceConfig> = runtime
.block_on(
apps.infrastructure
.get_configs_of_app(&String::from("master")),
)?
.into_iter()
.filter(|config| config.service_name() == "openid")
.collect();
Expand Down Expand Up @@ -830,9 +851,12 @@ Log msg 3 of service-a of app master
assert_eq!(services.len(), 1);
assert_contains_service!(services, "openid", ContainerType::Instance);

let openid_configs: Vec<ServiceConfig> = apps
.infrastructure
.get_configs_of_app(&String::from("master"))?
let mut runtime = Runtime::new().expect("Should create runtime");
let openid_configs: Vec<ServiceConfig> = runtime
.block_on(
apps.infrastructure
.get_configs_of_app(&String::from("master")),
)?
.into_iter()
.filter(|config| config.service_name() == "openid")
.collect();
Expand Down
Loading

0 comments on commit 230ab70

Please sign in to comment.