Skip to content

Commit

Permalink
feat: passing through metadata traceparent (#2172)
Browse files Browse the repository at this point in the history
* feat: automatically passing through metadata to action effects
  • Loading branch information
franciscolopezsancho authored Aug 1, 2024
1 parent 1b782ed commit df4074e
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kalix.javasdk.testkit.impl

import kalix.javasdk.Metadata
import kalix.javasdk.impl.GrpcDeferredCall
import kalix.javasdk.impl.MetadataImpl
import kalix.javasdk.impl.action.ActionEffectImpl
Expand All @@ -16,7 +17,8 @@ class ActionResultSpec extends AnyWordSpec with Matchers {
"Action Results" must {
"extract side effects" in {
val replyWithSideEffectResult = new ActionResultImpl[String](
ActionEffectImpl.Builder
ActionEffectImpl
.builder(Metadata.EMPTY)
.reply("reply")
.addSideEffect(SideEffectImpl(
GrpcDeferredCall[String, Any]("request", MetadataImpl.Empty, "full.service.Name", "MethodName", _ => ???),
Expand All @@ -27,13 +29,15 @@ class ActionResultSpec extends AnyWordSpec with Matchers {
}

"extract forward details" in {
val forwardResult = new ActionResultImpl[String](
ActionEffectImpl.Builder.forward(
val forwardResult = new ActionResultImpl[String](ActionEffectImpl
.builder(Metadata.EMPTY)
.forward(
GrpcDeferredCall[String, String]("request", MetadataImpl.Empty, "full.service.Name", "MethodName", _ => ???)))

forwardResult.isForward() should ===(true)
forwardResult.getForward().getMessage should ===("request")
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void _internalSetActionContext(Optional<ActionContext> context) {
}

public final Effect.Builder effects() {
return ActionEffectImpl.builder();
return ActionEffectImpl.builder(actionContext().metadata());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@

package kalix.javasdk.impl.action

import kalix.javasdk.{ DeferredCall, Metadata, SideEffect }
import kalix.javasdk.action.Action
import java.util
import java.util.concurrent.CompletionStage

import io.grpc.Status
import kalix.javasdk.StatusCode.ErrorCode
import kalix.javasdk._
import kalix.javasdk.action.Action
import kalix.javasdk.impl.StatusCodeConverter
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import kalix.javasdk.impl.telemetry.Telemetry

import java.util
import java.util.concurrent.CompletionStage
import scala.concurrent.{ ExecutionContext, Future }
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters.CompletionStageOps

import kalix.javasdk.HttpResponse

/** INTERNAL API */
object ActionEffectImpl {
sealed abstract class PrimaryEffect[T] extends Action.Effect[T] {
Expand Down Expand Up @@ -74,21 +72,22 @@ object ActionEffectImpl {
}
}

object Builder extends Action.Effect.Builder {
class Builder(val actionContextMetadata: Metadata) extends Action.Effect.Builder {

def reply[S](message: S): Action.Effect[S] = {
message match {
case httpResponse: HttpResponse =>
ReplyEffect(message, Some(Metadata.EMPTY.withStatusCode(httpResponse.getStatusCode)), Nil)
case _ => ReplyEffect(message, None, Nil)
ReplyEffect(message, Some(Metadata.EMPTY.withStatusCode(httpResponse.getStatusCode).addTracing()), Nil)
case _ => ReplyEffect(message, Some(Metadata.EMPTY.addTracing()), Nil)
}
}
def reply[S](message: S, metadata: Metadata): Action.Effect[S] = {
message match {
case httpResponse: HttpResponse =>
ReplyEffect(message, Some(metadata.withStatusCode(httpResponse.getStatusCode)), Nil)
case _ => ReplyEffect(message, Some(metadata), Nil)
ReplyEffect(message, Some(metadata.withStatusCode(httpResponse.getStatusCode).addTracing()), Nil)
case _ => ReplyEffect(message, Some(metadata.addTracing()), Nil)
}
ReplyEffect(message, Some(metadata), Nil)
ReplyEffect(message, Some(metadata.addTracing()), Nil)
}
def forward[S](serviceCall: DeferredCall[_, S]): Action.Effect[S] = ForwardEffect(serviceCall, Nil)
def error[S](description: String): Action.Effect[S] = ErrorEffect(description, None, Nil)
Expand All @@ -101,13 +100,26 @@ object ActionEffectImpl {
def asyncReply[S](futureMessage: CompletionStage[S]): Action.Effect[S] =
asyncReply(futureMessage, Metadata.EMPTY)
def asyncReply[S](futureMessage: CompletionStage[S], metadata: Metadata): Action.Effect[S] =
AsyncEffect(futureMessage.asScala.map(s => Builder.reply[S](s, metadata))(ExecutionContext.parasitic), Nil)
AsyncEffect(futureMessage.asScala.map(s => reply[S](s, metadata.addTracing()))(ExecutionContext.parasitic), Nil)
def asyncEffect[S](futureEffect: CompletionStage[Action.Effect[S]]): Action.Effect[S] =
AsyncEffect(futureEffect.asScala, Nil)
def ignore[S](): Action.Effect[S] =
IgnoreEffect()

import scala.jdk.OptionConverters._

implicit class TracingWrapper(metadata: Metadata) {
def addTracing(): Metadata = {
actionContextMetadata.traceContext().traceParent().toScala match {
case Some(traceparent) if !metadata.has(Telemetry.TRACE_PARENT_KEY) =>
metadata.add(Telemetry.TRACE_PARENT_KEY, traceparent)
case _ => metadata
}
}
}

}

def builder(): Action.Effect.Builder = Builder
def builder(context: Metadata): Action.Effect.Builder = new Builder(context)

}
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,13 @@ private[javasdk] final class ActionsImpl(_system: ActorSystem, services: Map[Str
} catch {
case NonFatal(ex) =>
// command handler threw an "unexpected" error
span.foreach(_.end())
Future.successful(handleUnexpectedException(service, in, ex))
} finally {
MDC.remove(Telemetry.TRACE_ID)
}
fut.andThen { case _ =>
span.foreach { s =>
s.end()
}
span.foreach(_.end())
}
case None =>
Future.successful(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ package kalix.javasdk.impl.action
import akka.NotUsed
import akka.stream.javadsl.Source
import com.google.protobuf.any.{ Any => ScalaPbAny }
import kalix.javasdk.action.Action
import kalix.javasdk.action.MessageEnvelope
import kalix.javasdk.action.{ Action, MessageEnvelope }
import kalix.javasdk.impl.AnySupport.ProtobufEmptyTypeUrl
import kalix.javasdk.impl.CommandHandler
import kalix.javasdk.impl.InvocationContext
import kalix.javasdk.impl.reflection.Reflect
import kalix.javasdk.impl.{ CommandHandler, InvocationContext }

// TODO: abstract away reactor dependency
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -56,7 +54,8 @@ class ReflectiveActionRouter[A <: Action](
.invoke(action, invocationContext)
.asInstanceOf[Action.Effect[_]]
}
case None if ignoreUnknown => ActionEffectImpl.Builder.ignore()
case None if ignoreUnknown =>
new ActionEffectImpl.Builder(invocationContext.metadata).ignore()
case None =>
throw new NoSuchElementException(
s"Couldn't find any method with input type [$inputTypeUrl] in Action [$action].")
Expand Down

0 comments on commit df4074e

Please sign in to comment.