Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade scala to 2.13 #875

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
env:
PGPASSWORD: supersecret1
- name: Run tests
run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" sbt coverage +test
run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" sbt +test
env:
OER_KEY: ${{ secrets.OER_KEY }}
- name: Check Scala formatting
Expand All @@ -62,12 +62,4 @@ jobs:
sbt "project kafka" IntegrationTest/test
docker-compose -f integration-tests/enrich-kafka/docker-compose.yml down
- name: Run integration tests for enrich-nsq
run: sbt "project nsqDistroless" IntegrationTest/test
- name: Generate coverage report
run: sbt coverageReport
- name: Aggregate coverage data
run: sbt coverageAggregate
- name: Submit coveralls data
run: sbt coveralls
env:
COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
run: sbt "project nsqDistroless" IntegrationTest/test
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ object S3Client {

def mk[F[_]: Async]: Resource[F, Client[F]] =
for {
s3Client <- Resource.fromAutoCloseable(Sync[F].delay(S3AsyncClient.builder().region(getRegion).build()))
s3Client <- Resource.fromAutoCloseable(Sync[F].delay(S3AsyncClient.builder().region(getRegion()).build()))
store <- Resource.eval(S3Store.builder[F](s3Client).build.toEither.leftMap(_.head).pure[F].rethrow)
} yield new Client[F] {
def canDownload(uri: URI): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.thrift.TSerializer

import java.util.Base64

import com.snowplowanalytics.iglu.core.{ SelfDescribingData, SchemaKey, SchemaVer }
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._

import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
Expand All @@ -40,7 +40,9 @@ object CollectorPayloadGen {
generateRaw(nbGoodEvents, nbBadRows).map(_.toThrift).map(new TSerializer().serialize)

def generateRaw[F[_]: Sync](nbGoodEvents: Long, nbBadRows: Long): Stream[F, CollectorPayload] =
Stream.repeatEval(runGen(collectorPayloadGen(true))).take(nbGoodEvents) ++ Stream.repeatEval(runGen(collectorPayloadGen(false))).take(nbBadRows)
Stream.repeatEval(runGen(collectorPayloadGen(true))).take(nbGoodEvents) ++ Stream
.repeatEval(runGen(collectorPayloadGen(false)))
.take(nbBadRows)

private def collectorPayloadGen(valid: Boolean): Gen[CollectorPayload] =
for {
Expand Down Expand Up @@ -74,46 +76,74 @@ object CollectorPayloadGen {
aid <- Gen.const("enrich-kinesis-integration-tests").withKey("aid")
e <- Gen.const("ue").withKey("e")
tv <- Gen.oneOf("scala-tracker_1.0.0", "js_2.0.0", "go_1.2.3").withKey("tv")
uePx <-
if(valid)
ueGen.map(_.toString).map(str => base64Encoder.encodeToString(str.getBytes)).withKey("ue_px")
else
Gen.const("foo").withKey("ue_px")
uePx <- if (valid)
ueGen.map(_.toString).map(str => base64Encoder.encodeToString(str.getBytes)).withKey("ue_px")
else
Gen.const("foo").withKey("ue_px")
} yield SelfDescribingData(
SchemaKey("com.snowplowanalytics.snowplow", "payload_data", "jsonschema", SchemaVer.Full(1,0,4)),
List(asObject(List(p, aid, e, uePx, tv))).asJson
SchemaKey("com.snowplowanalytics.snowplow", "payload_data", "jsonschema", SchemaVer.Full(1, 0, 4)),
List(asObject(List(p, aid, e, uePx, tv))).asJson
).asJson.toString

private def ueGen =
for {
sdj <- Gen.oneOf(changeFormGen, clientSessionGen)
} yield SelfDescribingData(
SchemaKey("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", SchemaVer.Full(1,0,0)),
SchemaKey("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", SchemaVer.Full(1, 0, 0)),
sdj.asJson
).asJson


private def changeFormGen =
for {
formId <- strGen(32, Gen.alphaNumChar).withKey("formId")
formId <- strGen(32, Gen.alphaNumChar).withKey("formId")
elementId <- strGen(32, Gen.alphaNumChar).withKey("elementId")
nodeName <- Gen.oneOf(List("INPUT", "TEXTAREA", "SELECT")).withKey("nodeName")
`type` <- Gen.option(Gen.oneOf(List("button", "checkbox", "color", "date", "datetime", "datetime-local", "email", "file", "hidden", "image", "month", "number", "password", "radio", "range", "reset", "search", "submit", "tel", "text", "time", "url", "week"))).withKeyOpt("type")
value <- Gen.option(strGen(16, Gen.alphaNumChar)).withKeyNull("value")
nodeName <- Gen.oneOf(List("INPUT", "TEXTAREA", "SELECT")).withKey("nodeName")
`type` <- Gen
.option(
Gen.oneOf(
List(
"button",
"checkbox",
"color",
"date",
"datetime",
"datetime-local",
"email",
"file",
"hidden",
"image",
"month",
"number",
"password",
"radio",
"range",
"reset",
"search",
"submit",
"tel",
"text",
"time",
"url",
"week"
)
)
)
.withKeyOpt("type")
value <- Gen.option(strGen(16, Gen.alphaNumChar)).withKeyNull("value")
} yield SelfDescribingData(
SchemaKey("com.snowplowanalytics.snowplow", "change_form", "jsonschema", SchemaVer.Full(1,0,0)),
SchemaKey("com.snowplowanalytics.snowplow", "change_form", "jsonschema", SchemaVer.Full(1, 0, 0)),
asObject(List(formId, elementId, nodeName, `type`, value))
)

private def clientSessionGen =
for {
userId <- Gen.uuid.withKey("userId")
sessionId <- Gen.uuid.withKey("sessionId")
sessionIndex <- Gen.choose(0, 2147483647).withKey("sessionIndex")
userId <- Gen.uuid.withKey("userId")
sessionId <- Gen.uuid.withKey("sessionId")
sessionIndex <- Gen.choose(0, 2147483647).withKey("sessionIndex")
previousSessionId <- Gen.option(Gen.uuid).withKeyNull("previousSessionId")
storageMechanism <- Gen.oneOf(List("SQLITE", "COOKIE_1", "COOKIE_3", "LOCAL_STORAGE", "FLASH_LSO")).withKey("storageMechanism")
storageMechanism <- Gen.oneOf(List("SQLITE", "COOKIE_1", "COOKIE_3", "LOCAL_STORAGE", "FLASH_LSO")).withKey("storageMechanism")
} yield SelfDescribingData(
SchemaKey("com.snowplowanalytics.snowplow", "client_session", "jsonschema", SchemaVer.Full(1,0,1)),
SchemaKey("com.snowplowanalytics.snowplow", "client_session", "jsonschema", SchemaVer.Full(1, 0, 1)),
asObject(List(userId, sessionId, sessionIndex, previousSessionId, storageMechanism))
)

Expand Down Expand Up @@ -159,7 +189,7 @@ object CollectorPayloadGen {

implicit class GenOps[A](gen: Gen[A]) {
def withKey[B](name: String)(implicit enc: Encoder[A]): Gen[Option[(String, Json)]] =
gen.map { a => Some((name -> a.asJson)) }
gen.map(a => Some((name -> a.asJson)))
}

implicit class GenOptOps[A](gen: Gen[Option[A]]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import java.net.URI

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.control.NonFatal

import cats.Applicative
import cats.implicits._
Expand Down Expand Up @@ -259,8 +258,7 @@ object Assets {
def worthRetrying[F[_]: Applicative](e: Throwable): F[Boolean] =
e match {
case _: Clients.RetryableFailure => Applicative[F].pure(true)
case _: IllegalArgumentException => Applicative[F].pure(false)
case NonFatal(_) => Applicative[F].pure(false)
case _ => Applicative[F].pure(false)
}

def onError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ object Enrich {
payload.fold(_.asJson.noSpaces, _.map(_.toBadRowPayload.asJson.noSpaces).getOrElse("None"))

/** Log an error, turn the problematic `CollectorPayload` into `BadRow` and notify Sentry if configured */
def sendToSentry[F[_]: Sync: Clock](
def sendToSentry[F[_]: Sync](
original: Array[Byte],
sentry: Option[SentryClient],
processor: Processor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import cats.implicits._

import fs2.Stream

import scala.concurrent.ExecutionContext

import cats.effect.kernel.{Async, Resource, Sync}
import cats.effect.ExitCode

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2.io

import java.nio.file.{Files, Path}

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.io.{Source => SSource}

import cats.data.EitherT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.fs2.blackbox

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.specs2.mutable.Specification

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ trait Adapter {
formatter: FormatterFunc,
platform: String
): RawEventParameters = {
val params = formatter(parameters - ("nuid", "aid", "cv", "p"))
val params = formatter(parameters -- List("nuid", "aid", "cv", "p"))
val json = toUnstructEvent(SelfDescribingData(schema, params)).noSpaces
buildUnstructEventParams(tracker, platform, parameters, json)
}
Expand All @@ -182,7 +182,7 @@ trait Adapter {
"p" -> parameters.getOrElse("p", Option(platform)), // Required field
"ue_pr" -> Option(json)
) ++
parameters.filterKeys(AcceptedQueryParameters)
parameters.view.filterKeys(AcceptedQueryParameters).toMap

/**
* Creates a Snowplow unstructured event by nesting the provided JValue in a self-describing
Expand Down Expand Up @@ -375,7 +375,7 @@ trait Adapter {
*/
private[registry] def camelCase(snakeOrDash: String) =
snakeCaseOrDashTokenCapturingRegex.replaceAllIn(
Character.toLowerCase(snakeOrDash.charAt(0)) + snakeOrDash.substring(1),
Character.toString(Character.toLowerCase(snakeOrDash.charAt(0))) + snakeOrDash.substring(1),
m => m.group(1).capitalize
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry

import scala.annotation.tailrec

import cats.{Applicative, Functor, Monad}
import cats.{Applicative, Monad}
import cats.data.{NonEmptyList, ValidatedNel}
import cats.implicits._

Expand Down Expand Up @@ -531,7 +531,7 @@ case class GoogleAnalyticsAdapter(schemas: GoogleAnalyticsSchemas) extends Adapt
)
schemaVal = lookupSchema(
hitType.some,
unstructEventData.mapValues(_.schemaKey)
unstructEventData.view.mapValues(_.schemaKey).toMap
).toValidatedNel
simpleContexts = buildContexts(params, contextData, fieldToSchemaMap)
compositeContexts = buildCompositeContexts(
Expand Down Expand Up @@ -675,7 +675,9 @@ case class GoogleAnalyticsAdapter(schemas: GoogleAnalyticsSchemas) extends Adapt
// composite params have digits in their key
composite <- originalParams
.collect { case (k, Some(v)) => (k, v) }
.view
.filterKeys(k => k.exists(_.isDigit))
.toMap
.asRight
brokenDown <- composite.toList.sorted.map {
case (k, v) => breakDownCompField(k, v, indicator)
Expand All @@ -684,7 +686,9 @@ case class GoogleAnalyticsAdapter(schemas: GoogleAnalyticsSchemas) extends Adapt
// we additionally make sure we have a rectangular dataset
grouped = (partitioned._2 ++ removeConsecutiveDuplicates(partitioned._1)).flatten
.groupBy(_._1)
.view
.mapValues(_.map(_._2))
.toMap
translated <- {
val m = grouped
.foldLeft(
Expand Down Expand Up @@ -821,7 +825,7 @@ case class GoogleAnalyticsAdapter(schemas: GoogleAnalyticsSchemas) extends Adapt
case head => head :: transpose(l.collect { case _ :: tail => tail })
}

private def traverseMap[G[_]: Functor: Applicative, K, V](m: Map[K, G[V]]): G[Map[K, V]] =
private def traverseMap[G[_]: Applicative, K, V](m: Map[K, G[V]]): G[Map[K, V]] =
m.toList
.traverse {
case (name, vnel) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{Try, Success => TS, Failure => TF}

import cats.Monad
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{Try, Success => TS, Failure => TF}

import cats.Monad
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{Try, Success => TS, Failure => TF}

import cats.Monad
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ object RedirectAdapter extends Adapter {
case (None, Some(Some(co))) if co == "" => newCo.asRight
case (None, Some(Some(co))) => addToExistingCo(json, co).map(str => Map("co" -> str))
case (Some(Some(cx)), _) => addToExistingCx(json, cx).map(str => Map("cx" -> str))
case other => throw new IllegalStateException(s"Illegal state: $other")
}
} else
// Add URI redirect as an unstructured event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ object EnrichmentManager {
def setCollectorTstamp(event: EnrichedEvent, timestamp: Option[DateTime]): Either[FailureDetails.EnrichmentFailure, Unit] =
EE.formatCollectorTstamp(timestamp).map { t =>
event.collector_tstamp = t
().asRight
()
}

def setUseragent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ final case class AnonIpEnrichment(ipv4Octets: AnonIPv4Octets.AnonIPv4Octets, ipv
.map {
case _: Inet4Address => anonymizeIpV4(ip)
case ipv6: Inet6Address => anonymizeIpV6(ipv6.getHostAddress)
case _ => throw new IllegalStateException(s"Illegal state")
}
.getOrElse(tryAnonymizingInvalidIp(ip))
}.orNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import cats.data.ValidatedNel
import cats.syntax.either._
Expand Down Expand Up @@ -64,7 +64,7 @@ object YauaaEnrichment extends ParseableEnrichment {
s match {
case _ if s.isEmpty => s
case _ if s.length == 1 => s.toLowerCase
case _ => s.charAt(0).toLower + s.substring(1)
case _ => Character.toString(s.charAt(0).toLower) + s.substring(1)
}
}

Expand Down Expand Up @@ -112,7 +112,9 @@ final case class YauaaEnrichment(cacheSize: Option[Int]) extends Enrichment {
parsedUA.getAvailableFieldNamesSorted.asScala
.map(field => decapitalize(field) -> parsedUA.getValue(field))
.toMap
.view
.filterKeys(validFields)
.toMap
}

/** Yauaa 7.x added many new fields which are not in the 1-0-4 schema */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ object ApiRequestEnrichment extends ParseableEnrichment {
UUID.nameUUIDFromBytes(contentKey.getBytes).toString
}

def create[F[_]: Async: Clock](
def create[F[_]: Async](
schemaKey: SchemaKey,
inputs: List[Input],
api: HttpApi,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import cats.data.ValidatedNel
import cats.implicits._

import io.circe.{Json => JSON, DecodingFailure, Decoder}
import io.gatling.jsonpath.{JsonPath => GatlingJsonPath}
import com.jayway.jsonpath.{JsonPath => JaywayJsonPath}

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SelfDescribingData}
import com.snowplowanalytics.snowplow.badrows.igluSchemaCriterionDecoder
Expand All @@ -34,7 +34,7 @@ sealed trait Input extends Product with Serializable {

// We could short-circuit enrichment process on invalid JSONPath,
// but it won't give user meaningful error message
def validatedJsonPath: Either[String, GatlingJsonPath] =
def validatedJsonPath: Either[String, JaywayJsonPath] =
this match {
case json: Input.Json => compileQuery(json.jsonPath)
case _ => "No JSON Path given".asLeft
Expand Down
Loading
Loading