Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Circuit breakers #302

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 76 additions & 68 deletions docs/configuration.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions docs/ec_vs_other_software.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ this job, because of the JRE runtime. This means higher memory footprint and lat
Linkerd v2 was rewritten in Rust to get better performance. Unfortunately, just like Istio - it's Kubernetes only.

### Consul Connect
[Consul Connect](https://www.consul.io/docs/connect/index.html) is a simple way to deploy Envoy to current
[Consul Connect](https://www.consul.io/docs/connect) is a simple way to deploy Envoy to current
Consul based infrastructure.
The problem with Consul Connect is that versions prior to 1.6.0 had very limited traffic control capabilities.
We want to have a fallback to instances from other DCs, canary deployment and other features specific to our
Expand All @@ -35,4 +35,4 @@ Control Plane implementation on. They're not a sufficient Control Plane by thems
Discovery Service.

Envoy Control is based on Java Control Plane and integrates with Consul by default. It also adds features like
Cross DC Synchronization or Permission management.
Cross DC Synchronization or Permission management.
2 changes: 1 addition & 1 deletion docs/features/multi_dc_support.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ The state is available in `LocalClusterStateChanges#latestServiceState`.

Then build a `RemoteServices` class providing:

* [AsyncControlPlaneClient](https://github.com/allegro/envoy-control/blob/master/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/AsyncRestTemplateControlPlaneClient.kt) - an HTTP client
* [AsyncControlPlaneClient](https://github.com/allegro/envoy-control/blob/master/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RestTemplateControlPlaneClient.kt) - an HTTP client
* [ControlPlaneInstanceFetcher](https://github.com/allegro/envoy-control/blob/master/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/synchronization/SimpleConsulInstanceFetcher.kt) - the strategy of retrieving other Envoy Control from given cluster
* `remoteClusters` - list of remote clusters

Expand Down
2 changes: 1 addition & 1 deletion docs/integrations/consul.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ first-class integration with Consul.

Popular Service Mesh solutions provide integration with Consul by polling periodically the state of all services.
Assuming we polled the state each second in order to minimize change propagation latency, we would have to send a request
for a [list of services](https://www.consul.io/api/catalog.html#list-services) and then a
for a [list of services](https://www.consul.io/api/catalog#list-services) and then a
[request per each service](https://www.consul.io/api/catalog.html#list-nodes-for-service).
With 1,000 services, this would generate 1,000 rps per one instance of Control Plane.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.google.protobuf.Value
import com.google.protobuf.util.Durations
import io.envoyproxy.controlplane.server.exception.RequestException
import io.grpc.Status
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.CircuitBreakerProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.EgressProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.util.StatusCodeFilterParser
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.util.StatusCodeFilterSettings
Expand Down Expand Up @@ -77,20 +79,7 @@ fun Value?.toOutgoing(properties: SnapshotProperties): Outgoing {
val allServiceDependenciesIdentifier = properties.outgoingPermissions.allServicesDependencies.identifier
val rawDependencies = this?.field("dependencies")?.list().orEmpty().map(::toRawDependency)
val allServicesDependencies = toAllServiceDependencies(rawDependencies, allServiceDependenciesIdentifier)
val defaultSettingsFromProperties = DependencySettings(
handleInternalRedirect = properties.egress.handleInternalRedirect,
timeoutPolicy = Outgoing.TimeoutPolicy(
idleTimeout = Durations.fromMillis(properties.egress.commonHttp.idleTimeout.toMillis()),
connectionIdleTimeout = Durations.fromMillis(properties.egress.commonHttp.connectionIdleTimeout.toMillis()),
requestTimeout = Durations.fromMillis(properties.egress.commonHttp.requestTimeout.toMillis())
),
retryPolicy = RetryPolicy(
numberRetries = properties.retryPolicy.numberOfRetries,
retryHostPredicate = properties.retryPolicy.retryHostPredicate,
hostSelectionRetryMaxAttempts = properties.retryPolicy.hostSelectionRetryMaxAttempts,
retryBackOff = properties.retryPolicy.retryBackOff
)
)
val defaultSettingsFromProperties = createDefaultDependencySettingFromEgressProperties(properties.egress)
val allServicesDefaultSettings = allServicesDependencies?.value.toSettings(defaultSettingsFromProperties)
val services = rawDependencies.filter { it.service != null && it.service != allServiceDependenciesIdentifier }
.map {
Expand All @@ -114,6 +103,47 @@ fun Value?.toOutgoing(properties: SnapshotProperties): Outgoing {
)
}

private fun createDefaultDependencySettingFromEgressProperties(egress: EgressProperties) : DependencySettings {
return DependencySettings(
handleInternalRedirect = egress.handleInternalRedirect,
timeoutPolicy = egress.commonHttp.let {
Outgoing.TimeoutPolicy(
idleTimeout = Durations.fromMillis(it.idleTimeout.toMillis()),
connectionIdleTimeout = Durations.fromMillis(it.connectionIdleTimeout.toMillis()),
requestTimeout = Durations.fromMillis(it.requestTimeout.toMillis())
)
},
retryPolicy = egress.retryPolicy.let { RetryPolicy(
numberRetries = it.numberOfRetries,
retryHostPredicate = it.retryHostPredicate,
hostSelectionRetryMaxAttempts = it.hostSelectionRetryMaxAttempts,
retryBackOff = it.retryBackOff
) },
circuitBreakers = egress.commonHttp.circuitBreakers.let { properties ->
CircuitBreakers(defaultThreshold = properties.defaultThreshold.toCircuitBreaker(),
highThreshold = properties.highThreshold.toCircuitBreaker())
}
)
}

fun CircuitBreakerProperties.toCircuitBreaker(): CircuitBreaker {
return CircuitBreaker(
priority = this.priority,
maxRequests = this.maxRequests,
maxPendingRequests = this.maxPendingRequests,
maxConnections = this.maxConnections,
maxRetries = this.maxRetries,
maxConnectionPools = this.maxConnectionPools,
trackRemaining = this.trackRemaining,
retryBudget = this.retryBudget?.let {
RetryBudget(
budgetPercent = it.budgetPercent,
minRetryConcurrency = it.minRetryConcurrency
)
}
)
}

@Suppress("ComplexCondition")
private fun toRawDependency(it: Value): RawDependency {
val service = it.field("service")?.stringValue
Expand Down Expand Up @@ -193,11 +223,13 @@ private fun Value?.toSettings(defaultSettings: DependencySettings): DependencySe
defaultSettings.retryPolicy
)
}
val circuitBreakers = this?.field("circuitBreakers")?.toCircuitBreakers(defaultSettings.circuitBreakers)

val shouldAllBeDefault = handleInternalRedirect == null &&
rewriteHostHeader == null &&
timeoutPolicy == null &&
retryPolicy == null
retryPolicy == null &&
circuitBreakers == null

return if (shouldAllBeDefault) {
defaultSettings
Expand All @@ -206,11 +238,44 @@ private fun Value?.toSettings(defaultSettings: DependencySettings): DependencySe
handleInternalRedirect = handleInternalRedirect ?: defaultSettings.handleInternalRedirect,
timeoutPolicy = timeoutPolicy ?: defaultSettings.timeoutPolicy,
rewriteHostHeader = rewriteHostHeader ?: defaultSettings.rewriteHostHeader,
retryPolicy = retryPolicy ?: defaultSettings.retryPolicy
retryPolicy = retryPolicy ?: defaultSettings.retryPolicy,
circuitBreakers = circuitBreakers ?: defaultSettings.circuitBreakers
)
}
}

private fun Value?.toCircuitBreakers(defaultCircuitBreakers: CircuitBreakers): CircuitBreakers {
return CircuitBreakers(
defaultThreshold = this?.field("defaultThreshold")?.toCircuitBreaker(defaultCircuitBreakers.defaultThreshold)
?: defaultCircuitBreakers.defaultThreshold,
highThreshold = this?.field("highThreshold")?.toCircuitBreaker(defaultCircuitBreakers.highThreshold)
?: defaultCircuitBreakers.highThreshold
)
}

private fun Value?.toCircuitBreaker(defaultCircuitBreaker: CircuitBreaker?): CircuitBreaker {
return CircuitBreaker(priority = this?.field("priority")?.stringValue?.let { RoutingPriority.fromString(it) }
?: defaultCircuitBreaker?.priority,
maxRequests = this?.field("maxRequests")?.numberValue?.toInt() ?: defaultCircuitBreaker?.maxRequests,
maxPendingRequests = this?.field("maxPendingRequests")?.numberValue?.toInt()
?: defaultCircuitBreaker?.maxPendingRequests,
maxConnections = this?.field("maxConnections")?.numberValue?.toInt() ?: defaultCircuitBreaker?.maxConnections,
maxRetries = this?.field("maxRetries")?.numberValue?.toInt() ?: defaultCircuitBreaker?.maxRetries,
maxConnectionPools = this?.field("maxConnectionPools")?.numberValue?.toInt()
?: defaultCircuitBreaker?.maxConnectionPools,
trackRemaining = this?.field("trackRemaining")?.boolValue ?: defaultCircuitBreaker?.trackRemaining,
retryBudget = this?.field("retryBudget")?.toRetryBudget(defaultCircuitBreaker?.retryBudget)
?: defaultCircuitBreaker?.retryBudget
)
}
private fun Value?.toRetryBudget(defaultRetryBudget: RetryBudget?): RetryBudget {
return RetryBudget(
budgetPercent = this?.field("budgetPercent")?.numberValue ?: defaultRetryBudget?.budgetPercent,
minRetryConcurrency = this?.field("minRetryConcurrency")?.numberValue?.toInt()
?: defaultRetryBudget?.minRetryConcurrency
)
}

private fun mapProtoToRetryPolicy(value: Value, defaultRetryPolicy: RetryPolicy): RetryPolicy {
return RetryPolicy(
retryOn = value.field("retryOn")?.listValue?.valuesList?.map { it.stringValue },
Expand Down Expand Up @@ -537,9 +602,42 @@ data class DependencySettings(
val handleInternalRedirect: Boolean = false,
val timeoutPolicy: Outgoing.TimeoutPolicy = Outgoing.TimeoutPolicy(),
val rewriteHostHeader: Boolean = false,
val retryPolicy: RetryPolicy = RetryPolicy()
val retryPolicy: RetryPolicy = RetryPolicy(),
val circuitBreakers: CircuitBreakers = CircuitBreakers()
)

data class CircuitBreakers(
val defaultThreshold: CircuitBreaker? = null,
val highThreshold: CircuitBreaker? = null
)

data class CircuitBreaker(
val priority: RoutingPriority? = null,
val maxRequests: Int? = null,
val maxPendingRequests: Int? = null,
val maxConnections: Int? = null,
val maxRetries: Int? = null,
val maxConnectionPools: Int? = null,
val trackRemaining: Boolean? = null,
val retryBudget: RetryBudget? = null
)

data class RetryBudget(val budgetPercent: Double? = null, val minRetryConcurrency: Int? = null)

enum class RoutingPriority {
DEFAULT, HIGH, UNRECOGNIZED;

companion object {
fun fromString(value: String): RoutingPriority {
return when (value.toUpperCase()) {
"DEFAULT" -> DEFAULT
"HIGH" -> HIGH
else -> UNRECOGNIZED
}
}
}
}

data class RetryPolicy(
val retryOn: List<String>? = null,
val hostSelectionRetryMaxAttempts: Long? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.OAuth
import pl.allegro.tech.servicemesh.envoycontrol.groups.PathMatchingType
import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryBackOff
import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryHostPredicate
import pl.allegro.tech.servicemesh.envoycontrol.groups.RoutingPriority
import java.net.URI
import java.time.Duration

Expand All @@ -35,7 +36,6 @@ class SnapshotProperties {
var jwt = JwtFilterProperties()
var requireServiceName = false
var rateLimit = RateLimitProperties()
var retryPolicy = RetryPolicyProperties()
}

class MetricsProperties {
Expand Down Expand Up @@ -251,6 +251,7 @@ class EgressProperties {
var hostHeaderRewriting = HostHeaderRewritingProperties()
var headersToRemove = mutableListOf<String>()
var domains = mutableListOf<String>()
var retryPolicy = RetryPolicyProperties()
}

class IngressProperties {
Expand All @@ -265,19 +266,27 @@ class CommonHttpProperties {
var idleTimeout: Duration = Duration.ofSeconds(120)
var connectionIdleTimeout: Duration = Duration.ofSeconds(120)
var requestTimeout: Duration = Duration.ofSeconds(120)
var circuitBreakers: CircuitBreakers = CircuitBreakers()
var circuitBreakers: CircuitBreakersProperties = CircuitBreakersProperties()
}

class CircuitBreakersProperties {
var highThreshold = CircuitBreakerProperties(RoutingPriority.HIGH)
var defaultThreshold = CircuitBreakerProperties(RoutingPriority.DEFAULT)
}

class CircuitBreakers {
var highThreshold = Threshold("HIGH")
var defaultThreshold = Threshold("DEFAULT")
class CircuitBreakerProperties(var priority: RoutingPriority = RoutingPriority.DEFAULT) {
var maxRequests: Int = 1024
var maxPendingRequests: Int = 1024
var maxConnections: Int = 1024
var maxRetries: Int = 3
var maxConnectionPools: Int? = null
var trackRemaining: Boolean = false
var retryBudget: RetryBudgetProperties? = RetryBudgetProperties()
}

class Threshold(var priority: String) {
var maxConnections = 1024
var maxPendingRequests = 1024
var maxRequests = 1024
var maxRetries = 3
class RetryBudgetProperties {
malfoj89 marked this conversation as resolved.
Show resolved Hide resolved
var budgetPercent: Double = 20.0
var minRetryConcurrency: Int = 3
}

class Http2Properties {
Expand Down
Loading