Skip to content
This repository has been archived by the owner on Nov 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #83 from ovh/backoff
Browse files Browse the repository at this point in the history
feat(sink): add exponential backoff delay on error
  • Loading branch information
d33d33 authored Sep 13, 2018
2 parents 5e75df7 + 2e99166 commit 5bbb184
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 44 deletions.
64 changes: 44 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
[package]
name = "beamium"
version = "1.9.2"
version = "1.9.3"
authors = [ "d33d33 <[email protected]>" ]

build = "build.rs"

[dependencies]
backoff = "0.1.2"
bytes = "0.4"
clap = "2.29.0"
humantime="1.1.1"
yaml-rust = "0.4.0"
cast = "0.2.2"
nix = "0.7.0"
Expand All @@ -25,7 +27,7 @@ slog-scope = "4.0.1"
slog-syslog = "0.11.0"
flate2 = "1.0.1"
ctrlc = "3.0"
tokio-timer="0.1"
tokio-timer="0.2.6"

[dependencies.slog]
version = "2.1"
Expand Down
23 changes: 14 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,20 @@ labels: # Labels definitions (Optional)
Beamium can be customized through parameters. See available parameters bellow:
``` yaml
parameters: # Parameters definitions (Optional)
source-dir: sources # Beamer data source directory (Optional, default: sources)
sink-dir: sinks # Beamer data sink directory (Optional, default: sinks)
scan-period: 1000 # Delay(ms) between source/sink scan (Optional, default: 1000)
batch-count: 250 # Maximum number of files to process in a batch (Optional, default: 250)
batch-size: 200000 # Maximum batch size (Optional, default: 200000)
log-file: beamium.log # Log file (Optional, default: beamium.log)
log-level: 4 # Log level (Optional, default: info)
timeout: 500 # Http timeout (seconds) (Optional, default: 500)
router-parallel: 1 # Routing threads (Optional, default: 1)
source-dir: sources # Beamer data source directory (Optional, default: sources)
sink-dir: sinks # Beamer data sink directory (Optional, default: sinks)
scan-period: 1000 # Delay(ms) between source/sink scan (Optional, default: 1000)
batch-count: 250 # Maximum number of files to process in a batch (Optional, default: 250)
batch-size: 200000 # Maximum batch size (Optional, default: 200000)
log-file: beamium.log # Log file (Optional, default: beamium.log)
log-level: 4 # Log level (Optional, default: info)
timeout: 500 # Http timeout (seconds) (Optional, default: 500)
router-parallel: 1 # Routing threads (Optional, default: 1)
backoff: # Backoff configuration - slow down push on errors (Optional)
initial: 500ms # Initial interval (Optional, default: 500ms)
max: 1m # Max interval (Optional, default: 1m)
multiplier: 1.5 # Interval multiplier (Optional, default: 1.5)
randomization: 0.3 # Randomization factor - delay = interval * 0.3 (Optional, default: 0.3)
```
## Contributing
Expand Down
76 changes: 76 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! It set defaults and then load config from '/etc', local dir and provided path.
use cast;
use humantime::parse_duration;
use hyper;
use regex;
use slog;
Expand All @@ -16,9 +17,12 @@ use std::io;
use std::io::Read;
use std::path::Path;
use std::string::String;
use std::time::Duration;
use yaml_rust::{ScanError, YamlLoader};

pub const REST_TIME: u64 = 10;
pub const BACKOFF_WARN: Duration = Duration::from_millis(1000);
pub const CHUNK_SIZE: usize = 1024 * 1024;

#[derive(Debug, Clone)]
/// Config root.
Expand Down Expand Up @@ -75,6 +79,16 @@ pub struct Parameters {
pub syslog: bool,
pub timeout: u64,
pub router_parallel: u64,
pub backoff: Backoff,
}

#[derive(Debug, Clone)]
/// Backoff config.
pub struct Backoff {
pub initial: Duration,
pub max: Duration,
pub multiplier: f64,
pub randomization: f64,
}

#[derive(Debug)]
Expand Down Expand Up @@ -178,6 +192,12 @@ pub fn load_config(config_path: &str) -> Result<Config, ConfigError> {
syslog: false,
timeout: 300,
router_parallel: 1,
backoff: Backoff {
initial: Duration::from_millis(500),
max: Duration::from_millis(60_000),
multiplier: 1.5,
randomization: 0.3,
},
},
};

Expand Down Expand Up @@ -512,6 +532,62 @@ fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), Co
.map_err(|_| "parameters.router-parallel is invalid".to_string())?;
config.parameters.router_parallel = router_parallel;
}

if !doc["parameters"]["backoff"].is_badvalue() {
if !doc["parameters"]["backoff"]["initial"].is_badvalue() {
let v = &doc["parameters"]["backoff"]["initial"];
let initial = v
.as_i64()
.and_then(|initial| cast::u64(initial).ok())
.map(|initial| Duration::from_millis(initial))
.or_else(|| v.as_str().and_then(|initial| parse_duration(initial).ok()))
.ok_or_else(|| {
"parameters.backoff.initial should be a duration string".to_string()
})?;
config.parameters.backoff.initial = initial;
}

if !doc["parameters"]["backoff"]["max"].is_badvalue() {
let v = &doc["parameters"]["backoff"]["max"];
let max = v
.as_i64()
.and_then(|max| cast::u64(max).ok())
.map(|max| Duration::from_millis(max))
.or_else(|| v.as_str().and_then(|max| parse_duration(max).ok()))
.ok_or_else(|| {
"parameters.backoff.max should be a duration string".to_string()
})?;
config.parameters.backoff.max = max;
}

if !doc["parameters"]["backoff"]["multiplier"].is_badvalue() {
let v = &doc["parameters"]["backoff"]["multiplier"];
let multiplier = v.as_f64().ok_or_else(|| {
"parameters.backoff.multiplier should be a number".to_string()
})?;

if multiplier < 0.0 {
return Err(ConfigError::from(
"parameters.backoff.multiplier is negative",
));
}
config.parameters.backoff.multiplier = multiplier;
}

if !doc["parameters"]["backoff"]["randomization"].is_badvalue() {
let v = &doc["parameters"]["backoff"]["randomization"];
let randomization = v.as_f64().ok_or_else(|| {
"parameters.backoff.randomization should be a number".to_string()
})?;

if randomization < 0.0 || randomization > 1.0 {
return Err(ConfigError::from(
"parameters.backoff.randomization should in [0-1]",
));
}
config.parameters.backoff.randomization = randomization;
}
}
}
}
Ok(())
Expand Down
Loading

0 comments on commit 5bbb184

Please sign in to comment.