Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enrichment extracting canonical properties into dedicated contexts #471

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import enrichments.{EventEnrichments => EE}
import enrichments.{MiscEnrichments => ME}
import enrichments.registry._
import enrichments.registry.apirequest.ApiRequestEnrichment
import enrichments.registry.extractor.ExtractorEnrichment
import enrichments.registry.pii.PiiPseudonymizerEnrichment
import enrichments.registry.sqlquery.SqlQueryEnrichment
import enrichments.web.{PageEnrichments => WPE}
Expand Down Expand Up @@ -92,8 +93,15 @@ object EnrichmentManager {
enriched.pii = pii.asString
}
}
_ <- extractor[F](processor, raw, enriched, registry.extractor)
} yield enriched

def extractor[F[_]: Monad](processor: Processor, rawEvent: RawEvent, enriched: EnrichedEvent, enrichment: Option[ExtractorEnrichment]): EitherT[F, BadRow, Unit] =
enrichment match {
case Some(extractor) => extractor.process(processor, rawEvent, enriched)
case None => EitherT.rightT[F, BadRow](())
}

/**
* Run all the enrichments and aggregate the errors if any
* @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments

import cats.Monad
import cats.data.{EitherT, NonEmptyList, ValidatedNel}
import cats.data.{EitherT, ValidatedNel, NonEmptyList}

import cats.effect.Clock
import cats.implicits._

import io.circe._
import io.circe.syntax._

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SelfDescribingData, SchemaKey}
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.iglu.client.Client
Expand All @@ -37,6 +37,7 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.Enrichm
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, CirceUtils}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry._
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest.ApiRequestEnrichment
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor.ExtractorEnrichment
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.pii.PiiPseudonymizerEnrichment
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.SqlQueryEnrichment

Expand Down Expand Up @@ -109,6 +110,8 @@ object EnrichmentRegistry {
): EitherT[F, String, EnrichmentRegistry[F]] =
confs.foldLeft(EitherT.pure[F, String](EnrichmentRegistry[F]())) { (er, e) =>
e match {
case c: ExtractorConf =>
er.map(_.copy(extractor = Some(c.enrichment)))
case c: ApiRequestConf =>
for {
enrichment <- EitherT.right(c.enrichment[F](blocker))
Expand Down Expand Up @@ -250,5 +253,6 @@ final case class EnrichmentRegistry[F[_]](
uaParser: Option[UaParserEnrichment] = None,
userAgentUtils: Option[UserAgentUtilsEnrichment] = None,
weather: Option[WeatherEnrichment[F]] = None,
yauaa: Option[YauaaEnrichment] = None
yauaa: Option[YauaaEnrichment] = None,
extractor: Option[ExtractorEnrichment] = None
)
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequ
HttpApi
}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.{CreateSqlQueryEnrichment, Rdbms, SqlQueryEnrichment}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor.{ ExtractorEnrichment, Extractable }
import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF

sealed trait EnrichmentConf {
Expand Down Expand Up @@ -209,4 +210,8 @@ object EnrichmentConf {
) extends EnrichmentConf {
def enrichment: YauaaEnrichment = YauaaEnrichment(cacheSize)
}

final case class ExtractorConf(schemaKey: SchemaKey, entities: Set[Extractable], erase: Boolean) extends EnrichmentConf {
def enrichment: ExtractorEnrichment = ExtractorEnrichment(entities, erase)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor

import io.circe.{JsonObject, Json, Decoder}
import cats.implicits._

import com.snowplowanalytics.iglu.core.{SchemaVer, SelfDescribingData, SchemaKey}

import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

sealed trait Extractable extends Product with Serializable {
def schemaKey: SchemaKey
def keys: List[(String, TypedField)]

private def getters =
keys.map { case (key, v) => Extractable.EventClass.getMethod(key) -> v }
private def erasers =
keys.map { case (key, f) => Extractable.EventClass.getMethod("set" ++ key.capitalize, f.manifest.runtimeClass) }

def getJson(event: EnrichedEvent): Either[Throwable, JsonObject] =
getters
.traverse { case (getter, to) => to.cast(getter.invoke(event)).map { value => to.name -> value } }
.map { kvs => JsonObject.fromIterable(kvs.collect { case (k, Some(v)) => (k, v) }) }

def process(event: EnrichedEvent): Either[Throwable, Option[SelfDescribingData[Json]]] =
getJson(event) match {
case Right(o) if o.isEmpty => Right(None)
case Right(o) => Right(Some(SelfDescribingData(schemaKey, Json.fromJsonObject(o))))
case Left(error) => Left(error)
}

def erase(event: EnrichedEvent): Unit = {
erasers.foreach { eraser => eraser.invoke(event, null) }
}
}

object Extractable {

type Extractables = List[Extractable]

def All: Extractables = List(MaxMind)

implicit def extractableCirceDecoder: Decoder[Extractable] =
Decoder[String].map(_.toLowerCase).emap { e =>
All
.find(_.toString.toLowerCase == e.toLowerCase)
.toRight(s"$e is an unknown entity to extract. Try: ${All.map(_.toString.toLowerCase).mkString(", ")} or all")
}

implicit def extractablesCirceDecoder: Decoder[Extractables] =
Decoder[List[Extractable]]
.handleErrorWith(e => Decoder[String].emap(s => if (s.toLowerCase == "all") All.asRight else e.show.asLeft))

private val EventClass = classOf[EnrichedEvent]

case object MaxMind extends Extractable {
val schemaKey = SchemaKey("com.maxmind", "context", "jsonschema", SchemaVer.Full(1,0,0))

def keys = List(
"geo_country" -> TypedField.Str("country"),
"geo_region" -> TypedField.Str("region"),
"geo_city" -> TypedField.Str("city"),
"geo_zipcode" -> TypedField.Str("zipcode"),
"geo_latitude" -> TypedField.Flo("latitude"),
"geo_longitude" -> TypedField.Flo("longitude"),
"geo_region_name" -> TypedField.Str("region_name"),

"ip_isp" -> TypedField.Str("isp"),
"ip_organization" -> TypedField.Str("organization"),
"ip_domain" -> TypedField.Str("domain"),
"ip_netspeed" -> TypedField.Str("netspeed")
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor

import java.time.Instant

import cats.Applicative
import cats.implicits._
import cats.data.{EitherT, ValidatedNel, NonEmptyList}

import io.circe.{Error, Json}
import io.circe.parser.parse
import io.circe.syntax._

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SelfDescribingData, SchemaKey}
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.badrows.FailureDetails.{EnrichmentFailureMessage, EnrichmentFailure}
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor, Payload, Failure}
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent.toRawEvent
import com.snowplowanalytics.snowplow.enrich.common.enrichments.MiscEnrichments
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor.ExtractorEnrichment.failedReflection
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{ParseableEnrichment, EnrichmentConf}
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent.toPartiallyEnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.CirceUtils

final case class ExtractorEnrichment(entities: Set[Extractable], erase: Boolean) {
def process[F[_]: Applicative](processor: Processor, rawEvent: RawEvent, event: EnrichedEvent): EitherT[F, BadRow, Unit] = {
// Create beforehand because later it could be mutated
val badRowPayload = Payload.EnrichmentPayload(toPartiallyEnrichedEvent(event), toRawEvent(rawEvent))
entities.toList.flatTraverse { entity =>
entity.process(event) match {
case Left(value) =>
Left((entity.toString.toLowerCase, value))
case Right(Some(json)) =>
if (erase) entity.erase(event) else () // Unsafe mutation
Right(List(json))
case Right(None) =>
Right(Nil)
}
} match {
case Left(error) =>
EitherT.leftT(failedReflection(processor, badRowPayload)(error))
case Right(entities) if entities.isEmpty =>
EitherT.rightT[F, BadRow](())
case Right(entities) =>
val contexts = if (event.derived_contexts == null)
SelfDescribingData(MiscEnrichments.ContextsSchema, List.empty[Json]).asRight
else
parse(event.derived_contexts)
.flatMap(_.as[SelfDescribingData[Json]])
.flatMap { contexts => contexts.data.as[List[Json]].map(data => SelfDescribingData(contexts.schema, data)) }

contexts
.map { contexts => SelfDescribingData(contexts.schema, (contexts.data ++ entities.map(_.normalize)).asJson) }
.leftMap(ExtractorEnrichment.failedDecoding(processor, badRowPayload))
.toEitherT[F]
.map(contexts => event.setDerived_contexts(contexts.asString)) // Unsafe mutation
}
}
}

object ExtractorEnrichment extends ParseableEnrichment {

def failedDecoding(processor: Processor, payload: Payload.EnrichmentPayload)(error: Error): BadRow = {
val message = EnrichmentFailureMessage.Simple(s"Cannot decode derived_contexts. ${error.show}")
val enrichmentFailure = EnrichmentFailure(None, message)
val messages = NonEmptyList.one(enrichmentFailure)
val failure = Failure.EnrichmentFailures(Instant.now(), messages)
BadRow.EnrichmentFailures(processor, failure, payload)
}

def failedReflection(processor: Processor, payload: Payload.EnrichmentPayload)(error: (String, Throwable)): BadRow = {
val message = EnrichmentFailureMessage.Simple(s"Failed to extract a property for ${error._1}. ${error._2}")
val enrichmentFailure = EnrichmentFailure(None, message)
val messages = NonEmptyList.one(enrichmentFailure)
val failure = Failure.EnrichmentFailures(Instant.now(), messages)
BadRow.EnrichmentFailures(processor, failure, payload)
}


val supportedSchema: SchemaCriterion =
SchemaCriterion(
"com.snowplowanalytics.snowplow.enrichments",
"extractor_enrichment_config",
"jsonschema",
1,
0,
0
)

def parse(config: Json, schemaKey: SchemaKey, localMode: Boolean): ValidatedNel[String, EnrichmentConf] = {
isParseable(config, schemaKey)
.toValidatedNel
.andThen { _ =>
val erase = CirceUtils.extract[Boolean](config, "parameters", "erase")
val extract = CirceUtils.extract[Extractable.Extractables](config, "parameters", "extract")

(erase, extract).mapN { (er, ex) =>
EnrichmentConf.ExtractorConf(schemaKey, ex.toSet, er)
}.toValidatedNel
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor

import io.circe.{Encoder, Json}

import cats.implicits._

sealed trait TypedField {
type Target
def manifest: Manifest[Target]
def name: String
def encoder: Encoder[Target]

def cast(anyRef: AnyRef): Either[Throwable, Option[Json]] =
Either.catchNonFatal(anyRef.asInstanceOf[Target]).map(Option.apply).nested.map(encoder.apply).value
}

object TypedField {
case class Str(name: String) extends TypedField {
type Target = String
def manifest: Manifest[String] = implicitly[Manifest[String]]
val encoder = Encoder[String]
}
case class Flo(name: String) extends TypedField {
type Target = java.lang.Float
def manifest: Manifest[java.lang.Float] = implicitly[Manifest[java.lang.Float]]
val encoder = Encoder[java.lang.Float]
}
}

Loading