Skip to content

Commit

Permalink
finagle: Make sure deadlines are respected in MethodBuilder global ti…
Browse files Browse the repository at this point in the history
…meout

Problem / Solution

Let's ensure that deadlines are respected in MethodBuilder. Deadlines should override global (or total) request timeout.

Differential Revision: https://phabricator.twitter.biz/D799473
  • Loading branch information
vkostyukov authored and jenkins committed Dec 15, 2021
1 parent 8dc79ec commit 135bb45
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import com.twitter.finagle.filter.RequestLogger
import com.twitter.finagle.naming.BindingFactory
import com.twitter.finagle.param._
import com.twitter.finagle.stack.nilStack
import com.twitter.finagle.stats.{
Client,
RelativeNameMarkingStatsReceiver,
RoleConfiguredStatsReceiver
}
import com.twitter.finagle.stats.Client
import com.twitter.finagle.stats.RelativeNameMarkingStatsReceiver
import com.twitter.finagle.stats.RoleConfiguredStatsReceiver
import com.twitter.finagle.util.Showable

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,6 @@ final class MethodBuilder[Req, Rep] private[finagle] (
brfParam: BackupRequestFilter.Param,
classifier: service.ResponseClassifier
): MethodBuilder[Req, Rep] = {
val stackClassifier =
if (!params.contains[param.ResponseClassifier]) None
else Some(params[param.ResponseClassifier].responseClassifier)
val idempotentedStackClassifier = idempotentify(stackClassifier, classifier)

val configClassifier = config.retry.underlyingClassifier
val idempotentedConfigClassifier = idempotentify(configClassifier, classifier)

Expand Down Expand Up @@ -386,7 +381,8 @@ final class MethodBuilder[Req, Rep] private[finagle] (
//
private[this] def newService(methodName: Option[String]): Service[Req, Rep] = {
materialize()
filters(methodName).andThen(wrappedService(methodName))
val withStats = configured(param.Stats(statsReceiver(methodName)))
withStats.filters(methodName).andThen(withStats.wrappedService(methodName))
}

/**
Expand Down Expand Up @@ -422,9 +418,11 @@ final class MethodBuilder[Req, Rep] private[finagle] (
): ServicePerEndpoint = {
materialize()

val withStats = configured(param.Stats(statsReceiver(methodName)))

builder
.servicePerEndpoint(wrappedService(methodName))
.filtered(filters(methodName))
.servicePerEndpoint(withStats.wrappedService(methodName))
.filtered(withStats.filters(methodName))
}

//
Expand Down Expand Up @@ -458,7 +456,7 @@ final class MethodBuilder[Req, Rep] private[finagle] (
methodPool.materialize(params)
}

private[this] def filters(methodName: Option[String]): Filter.TypeAgnostic = {
private def filters(methodName: Option[String]): Filter.TypeAgnostic = {
// Ordering of filters:
// Requests start at the top and traverse down.
// Responses flow back from the bottom up.
Expand All @@ -477,7 +475,6 @@ final class MethodBuilder[Req, Rep] private[finagle] (
// - Backup Requests
// - Service (Finagle client's stack, including Per Request Timeout)

val stats = statsReceiver(methodName)
val retries = withRetry
val timeouts = withTimeout

Expand All @@ -488,11 +485,11 @@ final class MethodBuilder[Req, Rep] private[finagle] (

config.traceInitializer
.andThen(config.filter)
.andThen(retries.logicalStatsFilter(stats))
.andThen(retries.logicalStatsFilter)
.andThen(retries.logFailuresFilter(clientName, methodName))
.andThen(failureSource)
.andThen(timeouts.totalFilter)
.andThen(retries.filter(stats))
.andThen(retries.filter)
.andThen(timeouts.perRequestFilter)
}

Expand Down Expand Up @@ -531,7 +528,11 @@ final class MethodBuilder[Req, Rep] private[finagle] (
private[this] def addToRegistry(name: Option[String]): Unit = {
val entry = registryEntry()
val keyPrefix = registryKeyPrefix(name)
ClientRegistry.register(entry, keyPrefix :+ "statsReceiver", statsReceiver(name).toString)

ClientRegistry.register(
entry,
keyPrefix :+ "statsReceiver",
params[param.Stats].statsReceiver.toString)

withTimeout.registryEntries.foreach {
case (suffix, value) =>
Expand All @@ -543,12 +544,11 @@ final class MethodBuilder[Req, Rep] private[finagle] (
}
}

private[this] def wrappedService(name: Option[String]): Service[Req, Rep] = {
private def wrappedService(name: Option[String]): Service[Req, Rep] = {
addToRegistry(name)
methodPool.open()

val backupRequestParams = params +
param.Stats(statsReceiver(name)) +
param.ResponseClassifier(config.retry.responseClassifier)

// register BackupRequestFilter under the same prefixes as other method entries
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package com.twitter.finagle.client

import com.twitter.finagle.{Filter, Service, param}
import com.twitter.finagle.service.{RequeueFilter, _}
import com.twitter.finagle.stats.{DenylistStatsReceiver, StatsReceiver}
import com.twitter.logging.{Level, Logger}
import com.twitter.util.{Future, Stopwatch, Throw, Try}
import com.twitter.finagle.Filter
import com.twitter.finagle.Service
import com.twitter.finagle.param
import com.twitter.finagle.service.RequeueFilter
import com.twitter.finagle.service._
import com.twitter.finagle.stats.DenylistStatsReceiver
import com.twitter.logging.Level
import com.twitter.logging.Logger
import com.twitter.util.Future
import com.twitter.util.Stopwatch
import com.twitter.util.Throw
import com.twitter.util.Try

/**
* @see [[BaseMethodBuilder]]
Expand Down Expand Up @@ -32,7 +39,7 @@ private[finagle] class MethodBuilderRetry[Req, Rep] private[client] (mb: MethodB
def disabled: MethodBuilder[Req, Rep] =
forClassifier(Disabled)

private[client] def filter(scopedStats: StatsReceiver): Filter.TypeAgnostic = {
private[client] def filter: Filter.TypeAgnostic = {
val classifier = mb.config.retry.responseClassifier
val maxRetries = mb.config.retry.maxRetries
if (classifier eq Disabled)
Expand All @@ -46,15 +53,16 @@ private[finagle] class MethodBuilderRetry[Req, Rep] private[client] (mb: MethodB
new RetryFilter[Req1, Rep1](
withoutRequeues,
mb.params[param.HighResTimer].timer,
scopedStats,
mb.params[param.Stats].statsReceiver,
mb.params[Retries.Budget].retryBudget
)
}
}
}
}

private[client] def logicalStatsFilter(stats: StatsReceiver): Filter.TypeAgnostic = {
private[client] def logicalStatsFilter: Filter.TypeAgnostic = {
val stats = mb.params[param.Stats].statsReceiver
val timeUnit = mb.params[StatsFilter.Param].unit
StatsFilter.typeAgnostic(
new DenylistStatsReceiver(stats.scope(LogicalScope), LogicalStatsDenylistFn),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.twitter.finagle.client

import com.twitter.finagle.{Filter, Service, SimpleFilter}
import com.twitter.finagle.Filter
import com.twitter.finagle.Service
import com.twitter.finagle.SimpleFilter
import com.twitter.util.tunable.Tunable
import com.twitter.util.{Duration, Future}
import com.twitter.util.Duration
import com.twitter.util.Future
import scala.collection.mutable

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,27 @@ import com.twitter.util.Future
import com.twitter.util.Timer
import com.twitter.util.tunable.Tunable

private[finagle] object DeadlineOnlyToggle {
private[twitter] object DeadlineOnlyToggle {
private val enableToggle = CoreToggles("com.twitter.finagle.service.DeadlineOnly")
private var zoneEnabled = ServerInfo().zone.getOrElse("") == "atla"
private val zoneEnabled = ServerInfo().zone.contains("atla")
@volatile private var overridden: Option[Boolean] = None

//exposed for testing
def setEnabledZone(enabled: Boolean): Unit = zoneEnabled = enabled

def apply(trace: Tracing): Boolean = zoneEnabled && {
trace.idOption.flatMap(_._traceId) match {
case Some(spanId) => enableToggle(spanId.toLong.hashCode())
case None => false
/**
* This is exposed only to be used in selected tests. If you found this method by accident,
* there is a VERY high change you won't want to call it.
*/
def unsafeOverride(enabled: Option[Boolean]): Unit = overridden = enabled

def apply(trace: Tracing): Boolean = {
overridden match {
case Some(o) => o
case None =>
zoneEnabled && {
trace.idOption.flatMap(_._traceId) match {
case Some(spanId) => enableToggle(spanId.toLong.hashCode())
case None => false
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class MethodBuilderTest
intercept[GlobalRequestTimeoutException] {
awaitResult(client(1))
}
assert(stats.stat("current_deadline")().size > 0)
assert(stats.stat("timeout", "a_client", "current_deadline")().size > 0)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package com.twitter.finagle.context

import com.twitter.finagle.service.DeadlineOnlyToggle
import com.twitter.finagle.tracing.Flags
import com.twitter.finagle.tracing.SpanId
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.tracing.TraceId
import com.twitter.util.Time
import com.twitter.util.Duration
import com.twitter.util.Return
Expand Down Expand Up @@ -34,15 +30,9 @@ class DeadlineTest extends AnyFunSuite with AssertionsForJUnit with ScalaCheckDr
assert(Deadline.current == Some(sampled))
assert(Deadline.currentToggled == None)

com.twitter.finagle.toggle.flag.overrides
.let("com.twitter.finagle.service.DeadlineOnly", 1.0) {
val traceId = TraceId(Some(SpanId(42)), Some(SpanId(32)), SpanId(22), None, Flags(12))

Trace.letId(traceId) {
DeadlineOnlyToggle.setEnabledZone(true)
assert(Deadline.currentToggled == Some(sampled))
}
}
DeadlineOnlyToggle.unsafeOverride(Some(true))
try assert(Deadline.currentToggled == Some(sampled))
finally DeadlineOnlyToggle.unsafeOverride(None)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import com.twitter.finagle.context.Contexts
import com.twitter.finagle.context.Deadline
import com.twitter.finagle.service.TimeoutFilterTest.TunableTimeoutHelper
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.finagle.toggle.flag
import com.twitter.finagle.tracing.Annotation
import com.twitter.finagle.tracing.BufferingTracer
import com.twitter.finagle.tracing.Record
Expand Down Expand Up @@ -462,18 +461,9 @@ class TimeoutFilterTest extends AnyFunSuite with Matchers with MockitoSugar {
}

private def toggleOnCtx(fn: => Unit): Unit = {
// condition:
// 1. service zone == atla
// 2. _traceId exist
// 3. toggled up
DeadlineOnlyToggle.setEnabledZone(true)

flag.overrides.let("com.twitter.finagle.service.DeadlineOnly", 1.0) {
val traceContext1 = TraceId(Some(SpanId(0xabc)), None, SpanId(0x123), None)
Trace.letId(traceContext1) {
fn
}
}
DeadlineOnlyToggle.unsafeOverride(Some(true))
try fn
finally DeadlineOnlyToggle.unsafeOverride(None)
}

private def fakeTraceRecord(deadline: Deadline): Annotation.BinaryAnnotation = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package com.twitter.finagle.thriftmux
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle._
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.context.Deadline
import com.twitter.finagle.mux.Request
import com.twitter.finagle.mux.Response
import com.twitter.finagle.service.DeadlineOnlyToggle
import com.twitter.finagle.service.ReqRep
import com.twitter.finagle.service.ResponseClass
import com.twitter.finagle.stats._
Expand All @@ -25,6 +28,18 @@ import org.scalatest.funsuite.AnyFunSuite

class MethodBuilderTest extends AnyFunSuite with Eventually {

private class SlowTestService(implicit val timer: Timer) extends TestService.MethodPerEndpoint {
def query(x: String): Future[String] = {
Future.sleep(50.millis).before { Future.value(x) }
}
def question(y: String): Future[String] = {
Future.sleep(50.millis).before { Future.value(y) }
}
def inquiry(z: String): Future[String] = {
Future.sleep(50.millis).before { Future.value(z) }
}
}

def await[T](a: Awaitable[T], d: Duration = 5.seconds): T =
Await.result(a, d)

Expand Down Expand Up @@ -106,17 +121,7 @@ class MethodBuilderTest extends AnyFunSuite with Eventually {

test("methodBuilder timeouts from Stack") {
implicit val timer: Timer = DefaultTimer
val service = new TestService.MethodPerEndpoint {
def query(x: String): Future[String] = {
Future.sleep(50.millis).before { Future.value(x) }
}
def question(y: String): Future[String] = {
Future.sleep(50.millis).before { Future.value(y) }
}
def inquiry(z: String): Future[String] = {
Future.sleep(50.millis).before { Future.value(z) }
}
}
val service = new SlowTestService
val server =
serverImpl.serveIface(new InetSocketAddress(InetAddress.getLoopbackAddress, 0), service)

Expand All @@ -133,17 +138,7 @@ class MethodBuilderTest extends AnyFunSuite with Eventually {

test("methodBuilder timeouts from ClientBuilder") {
implicit val timer: Timer = DefaultTimer
val service = new TestService.MethodPerEndpoint {
def query(x: String): Future[String] = {
Future.sleep(50.millis).before { Future.value(x) }
}
def question(y: String): Future[String] = {
Future.sleep(50.millis).before { Future.value(y) }
}
def inquiry(z: String): Future[String] = {
Future.sleep(50.millis).before { Future.value(z) }
}
}
val service = new SlowTestService
val server =
serverImpl.serveIface(new InetSocketAddress(InetAddress.getLoopbackAddress, 0), service)

Expand All @@ -161,6 +156,46 @@ class MethodBuilderTest extends AnyFunSuite with Eventually {
testMethodBuilderTimeouts(stats, server, mb)
}

private def toggleOnCtx(fn: => Unit): Unit = {
DeadlineOnlyToggle.unsafeOverride(Some(true))
try fn
finally DeadlineOnlyToggle.unsafeOverride(None)
}

test("methodBuilder prefers deadlines when they are in the context") {
implicit val timer: Timer = DefaultTimer
val service = new SlowTestService
val server =
serverImpl.serveIface(new InetSocketAddress(InetAddress.getLoopbackAddress, 0), service)

val stats = new InMemoryStatsReceiver()
val client = clientImpl
.withStatsReceiver(stats)
.configured(param.Timer(timer))

val mb = client
.withLabel("a_label")
.methodBuilder(Name.bound(Address(server.boundAddress.asInstanceOf[InetSocketAddress])))

val shortTimeoutSvcPerEndpoint: Service[TestService.Query.Args, TestService.Query.SuccessType] =
mb.withTimeoutTotal(1000.millis) // very large timeout that should never fire
.servicePerEndpoint[TestService.ServicePerEndpoint]("fast")
.query

toggleOnCtx {
Contexts.broadcast.let(Deadline, Deadline.ofTimeout(5.millis)) {
// short deadline should fire
intercept[GlobalRequestTimeoutException] {
await(shortTimeoutSvcPerEndpoint(TestService.Query.Args("shorty")))
}
}
}

eventually {
assert(stats.counter("a_label", "fast", "deadline_only")() == 1)
}
}

test("methodBuilder timeouts from configured ClientBuilder") {
implicit val timer = new MockTimer
val sleepTime = new AtomicReference[Duration](Duration.Bottom)
Expand Down

0 comments on commit 135bb45

Please sign in to comment.