Skip to content

Commit

Permalink
#1693 DatasetService.updateProperties[V3] -> split between DatasetSer…
Browse files Browse the repository at this point in the history
…vice and DatasetServiceV3 with renaming.

VersionList removed in favor of VersionedSummary everywhere.
- explanatory comments
  • Loading branch information
dk1844 committed May 3, 2022
1 parent 09c326e commit 1c241d1
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 69 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class DatasetController @Autowired()(datasetService: DatasetService)
def replaceProperties(@AuthenticationPrincipal principal: UserDetails,
@PathVariable datasetName: String,
@RequestBody newProperties: Optional[Map[String, String]]): CompletableFuture[ResponseEntity[Option[Dataset]]] = {
datasetService.updatePropertiesV2(principal.getUsername, datasetName, newProperties.toScalaOption).map {
datasetService.updateProperties(principal.getUsername, datasetName, newProperties.toScalaOption).map {
case None => throw notFound()
case Some(dataset) =>
val location: URI = new URI(s"/api/dataset/${dataset.name}/${dataset.version}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ abstract class VersionedModelController[C <: VersionedModel with Product with Au
@GetMapping(Array("/list", "/list/{searchQuery}"))
@ResponseStatus(HttpStatus.OK)
def getList(@PathVariable searchQuery: Optional[String]): CompletableFuture[Seq[VersionedSummary]] = {
versionedModelService.getLatestVersionsSummary(searchQuery.toScalaOption)
versionedModelService.getLatestVersionsSummarySearch(searchQuery.toScalaOption)
}

@GetMapping(Array("/searchSuggestions"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ abstract class VersionedModelControllerV3[C <: VersionedModel with Product
@GetMapping(Array(""))
@ResponseStatus(HttpStatus.OK)
def getList(@RequestParam searchQuery: Optional[String]): CompletableFuture[Seq[VersionedSummary]] = {
versionedModelService.getLatestVersionsSummary(searchQuery.toScalaOption)
versionedModelService.getLatestVersionsSummarySearch(searchQuery.toScalaOption)
}

@GetMapping(Array("/{name}"))
@ResponseStatus(HttpStatus.OK)
def getVersionsList(@PathVariable name: String): CompletableFuture[VersionList] = {
versionedModelService.getAllVersionsValues(name) map {
def getVersionSummaryForEntity(@PathVariable name: String): CompletableFuture[VersionedSummary] = {
versionedModelService.getLatestVersionSummary(name) map {
case Some(entity) => entity
case None => throw notFound()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.mongodb.scala.model.Updates._
import org.mongodb.scala.model._
import org.mongodb.scala.result.UpdateResult
import za.co.absa.enceladus.model.menas._
import za.co.absa.enceladus.model.versionedModel.{VersionedModel, VersionedSummary, VersionList}
import za.co.absa.enceladus.model.versionedModel.{VersionedModel, VersionedSummary}

import scala.concurrent.Future
import scala.reflect.ClassTag
Expand Down Expand Up @@ -61,7 +61,7 @@ abstract class VersionedMongoRepository[C <: VersionedModel](mongoDb: MongoDatab
collection.distinct[String]("name", getNotDisabledFilter).toFuture().map(_.sorted)
}

def getLatestVersionsSummary(searchQuery: Option[String] = None): Future[Seq[VersionedSummary]] = {
def getLatestVersionsSummarySearch(searchQuery: Option[String] = None): Future[Seq[VersionedSummary]] = {
val searchFilter = searchQuery match {
case Some(search) => Filters.regex("name", search, "i")
case None => Filters.expr(true)
Expand All @@ -84,21 +84,26 @@ abstract class VersionedMongoRepository[C <: VersionedModel](mongoDb: MongoDatab
collection.find(getNameVersionFilter(name, Some(version))).headOption()
}

def getLatestVersionValue(name: String): Future[Option[Int]] = {
def getLatestVersionSummary(name: String): Future[Option[VersionedSummary]] = {
val pipeline = Seq(
filter(getNameFilter(name)),
Aggregates.group("$name", Accumulators.max("latestVersion", "$version"))
)
collection.aggregate[VersionedSummary](pipeline).headOption().map(_.map(_.latestVersion))
collection.aggregate[VersionedSummary](pipeline).headOption()
}

def getAllVersionsValues(name: String): Future[Option[VersionList]] = {
def getLatestVersionValue(name: String): Future[Option[Int]] = {
getLatestVersionSummary(name).map(_.map(_.latestVersion))
}

def getAllVersionsValues(name: String): Future[Seq[Int]] = {
val pipeline = Seq(
filter(getNameFilter(name)),
Aggregates.sort(Sorts.ascending("version")),
Aggregates.group("$name", Accumulators.push("versions", "$version")) // all versions into single array
)
collection.aggregate[VersionList](pipeline).headOption().map(_.map(vlist => VersionList("versions", vlist.versions)))
collection.aggregate[Seq[Int]](pipeline).headOption().map(_.getOrElse(Seq.empty)
)
}

def getAllVersions(name: String, inclDisabled: Boolean = false): Future[Seq[C]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class DatasetService @Autowired()(datasetMongoRepository: DatasetMongoRepository
.setHDFSPath(dataset.hdfsPath)
.setHDFSPublishPath(dataset.hdfsPublishPath)
.setConformance(dataset.conformance)
.setProperties(removeBlankProperties(dataset.properties))
.setProperties(removeBlankPropertiesOpt(dataset.properties))
.setDescription(dataset.description).asInstanceOf[Dataset]
})
}
Expand Down Expand Up @@ -113,7 +113,7 @@ class DatasetService @Autowired()(datasetMongoRepository: DatasetMongoRepository
schemaName = newDataset.schemaName,
schemaVersion = newDataset.schemaVersion,
conformance = List(),
properties = removeBlankProperties(newDataset.properties))
properties = removeBlankPropertiesOpt(newDataset.properties))
super.create(dataset, username)
}

Expand All @@ -124,28 +124,13 @@ class DatasetService @Autowired()(datasetMongoRepository: DatasetMongoRepository
}
}

def updateProperties(username: String, datasetName: String, datasetVersion: Int,
updatedProperties: Map[String, String]): Future[Option[(Dataset, Validation)]] = {
for {
successfulValidation <- validateProperties(updatedProperties).flatMap {
case validation if !validation.isValid => Future.failed(ValidationException(validation)) // warnings are ok for update
case validation => Future.successful(validation) // empty or with warnings
}

// updateFuture includes latest-check and version increase
update <- updateFuture(username, datasetName, datasetVersion) { latest =>
Future.successful(latest.copy(properties = Some(removeBlankProperties(updatedProperties))))
}
} yield update
}

// kept for API v2 usage only
def updatePropertiesV2(username: String, datasetName: String,
updatedProperties: Option[Map[String, String]]): Future[Option[Dataset]] = {
def updateProperties(username: String, datasetName: String,
updatedProperties: Option[Map[String, String]]): Future[Option[Dataset]] = {
for {
latestVersion <- getLatestVersionNumber(datasetName)
update <- update(username, datasetName, latestVersion) { latest =>
latest.copy(properties = removeBlankProperties(updatedProperties))
latest.copy(properties = removeBlankPropertiesOpt(updatedProperties))
}
} yield update.map(_._1) // v2 does not expect validation on update
}
Expand Down Expand Up @@ -453,7 +438,7 @@ object DatasetService {
* @param properties original properties
* @return properties without empty-string value entries
*/
def removeBlankProperties(properties: Option[Map[String, String]]): Option[Map[String, String]] = {
private[services] def removeBlankPropertiesOpt(properties: Option[Map[String, String]]): Option[Map[String, String]] = {
properties.map {
removeBlankProperties
}
Expand All @@ -465,7 +450,7 @@ object DatasetService {
* @param properties original properties
* @return properties without empty-string value entries
*/
private def removeBlankProperties(properties: Map[String, String]): Map[String, String] = {
private[services] def removeBlankProperties(properties: Map[String, String]): Map[String, String] = {
properties.filter { case (_, propValue) => propValue.nonEmpty }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.springframework.security.core.context.SecurityContextHolder
import org.springframework.security.core.userdetails.UserDetails
import za.co.absa.enceladus.model.{ModelVersion, Schema, UsedIn, Validation}
import za.co.absa.enceladus.model.menas._
import za.co.absa.enceladus.model.versionedModel.{VersionedModel, VersionedSummary, VersionList}
import za.co.absa.enceladus.model.versionedModel.{VersionedModel, VersionedSummary}
import za.co.absa.enceladus.rest_api.exceptions._
import za.co.absa.enceladus.rest_api.repositories.VersionedMongoRepository
import za.co.absa.enceladus.model.menas.audit._
Expand All @@ -38,8 +38,8 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit

private[services] val logger = LoggerFactory.getLogger(this.getClass)

def getLatestVersionsSummary(searchQuery: Option[String]): Future[Seq[VersionedSummary]] = {
versionedMongoRepository.getLatestVersionsSummary(searchQuery)
def getLatestVersionsSummarySearch(searchQuery: Option[String]): Future[Seq[VersionedSummary]] = {
versionedMongoRepository.getLatestVersionsSummarySearch(searchQuery)
}

def getLatestVersions(): Future[Seq[C]] = {
Expand All @@ -58,10 +58,6 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit
versionedMongoRepository.getAllVersions(name)
}

def getAllVersionsValues(name: String): Future[Option[VersionList]] = {
versionedMongoRepository.getAllVersionsValues(name)
}

def getLatestVersion(name: String): Future[Option[C]] = {
versionedMongoRepository.getLatestVersionValue(name).flatMap({
case Some(version) => getVersion(name, version)
Expand All @@ -80,6 +76,10 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit
versionedMongoRepository.getLatestVersionValue(name)
}

def getLatestVersionSummary(name: String): Future[Option[VersionedSummary]] = {
versionedMongoRepository.getLatestVersionSummary(name)
}

def exportSingleItem(name: String, version: Int): Future[String] = {
getVersion(name, version).flatMap({
case Some(item) => Future(item.exportItem())
Expand Down Expand Up @@ -202,6 +202,7 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit
}

private[rest_api] def create(item: C, username: String): Future[Option[(C, Validation)]] = {
// individual validations are deliberately not run in parallel - the latter may not be needed if the former fails
for {
validation <- for {
generalValidation <- validate(item)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule}
import za.co.absa.enceladus.model.{Dataset, Validation}
import za.co.absa.enceladus.rest_api.exceptions.ValidationException
import za.co.absa.enceladus.rest_api.repositories.{DatasetMongoRepository, OozieRepository}
import za.co.absa.enceladus.rest_api.services.DatasetService._
import za.co.absa.enceladus.rest_api.services.{DatasetService, MappingTableService, PropertyDefinitionService, SchemaService}

import scala.concurrent.Future
Expand Down Expand Up @@ -53,6 +55,7 @@ class DatasetServiceV3 @Autowired()(datasetMongoRepository: DatasetMongoReposito

// general entity validation is extendable for V3 - here with properties validation
override def validate(item: Dataset): Future[Validation] = {
// individual validations are deliberately not run in parallel - the latter may not be needed if the former fails
for {
originalValidation <- super.validate(item)
propertiesValidation <- validateProperties(item.propertiesAsMap)
Expand All @@ -73,6 +76,21 @@ class DatasetServiceV3 @Autowired()(datasetMongoRepository: DatasetMongoReposito
}
}

def updateProperties(username: String, datasetName: String, datasetVersion: Int,
updatedProperties: Map[String, String]): Future[Option[(Dataset, Validation)]] = {
for {
successfulValidation <- validateProperties(updatedProperties).flatMap {
case validation if !validation.isValid => Future.failed(ValidationException(validation)) // warnings are ok for update
case validation => Future.successful(validation) // empty or with warnings
}

// updateFuture includes latest-check and version increase
update <- updateFuture(username, datasetName, datasetVersion) { latest =>
Future.successful(latest.copy(properties = Some(removeBlankProperties(updatedProperties))))
}
} yield update
}

}


Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ package object implicits {
classOf[Run], classOf[Schema], classOf[SchemaField], classOf[SplineReference], classOf[RunSummary],
classOf[RunDatasetNameGroupedSummary], classOf[RunDatasetVersionGroupedSummary],
classOf[RuntimeConfig], classOf[OozieSchedule], classOf[OozieScheduleInstance], classOf[ScheduleTiming], classOf[DataFormat],
classOf[UserInfo], classOf[VersionedSummary], classOf[VersionList], classOf[MenasAttachment], classOf[MenasReference],
classOf[UserInfo], classOf[VersionedSummary], classOf[MenasAttachment], classOf[MenasReference],
classOf[PropertyDefinition], classOf[PropertyType], classOf[Essentiality],
classOf[LandingPageInformation], classOf[TodaysRunsStatistics],
classOf[DataFrameFilter]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import za.co.absa.enceladus.model.dataFrameFilter._
import za.co.absa.enceladus.model.properties.essentiality.Essentiality
import za.co.absa.enceladus.model.properties.propertyType.EnumPropertyType
import za.co.absa.enceladus.model.test.factories.{DatasetFactory, MappingTableFactory, PropertyDefinitionFactory, SchemaFactory}
import za.co.absa.enceladus.model.versionedModel.VersionList
import za.co.absa.enceladus.model.versionedModel.VersionedSummary
import za.co.absa.enceladus.model.{Dataset, UsedIn, Validation}
import za.co.absa.enceladus.rest_api.integration.controllers.{BaseRestApiTestV3, toExpected}
import za.co.absa.enceladus.rest_api.integration.fixtures._
Expand Down Expand Up @@ -148,9 +148,9 @@ class DatasetControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeA
parent = Some(DatasetFactory.toParent(datasetV1)))
datasetFixture.add(datasetV1, datasetV2)

val response = sendGet[VersionList](s"$apiUrl/datasetA")
val response = sendGet[VersionedSummary](s"$apiUrl/datasetA")
assertOk(response)
assert(response.getBody == VersionList("versions", Seq(1, 2)))
assert(response.getBody == VersionedSummary("datasetA", 2))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,14 +477,14 @@ class DatasetRepositoryIntegrationSuite extends BaseRepositoryTest {
"DatasetMongoRepository::getLatestVersions" should {
"return an empty Seq" when {
"no datasets exist and search query is provided" in {
val actual = await(datasetMongoRepository.getLatestVersionsSummary(Some("abc")))
val actual = await(datasetMongoRepository.getLatestVersionsSummarySearch(Some("abc")))
assert(actual.isEmpty)
}
"only disabled dataset exists" in {
val dataset1 = DatasetFactory.getDummyDataset(name = "dataset1", version = 1,
disabled = true, dateDisabled = Option(DatasetFactory.dummyZonedDateTime), userDisabled = Option("user"))
datasetFixture.add(dataset1)
assert(await(datasetMongoRepository.getLatestVersionsSummary(Some("dataset1"))).isEmpty)
assert(await(datasetMongoRepository.getLatestVersionsSummarySearch(Some("dataset1"))).isEmpty)
}
}

Expand All @@ -496,7 +496,7 @@ class DatasetRepositoryIntegrationSuite extends BaseRepositoryTest {
val dataset5 = DatasetFactory.getDummyDataset(name = "abc", version = 1)

datasetFixture.add(dataset2, dataset3, dataset4, dataset5)
val actual = await(datasetMongoRepository.getLatestVersionsSummary(Some("dataset2")))
val actual = await(datasetMongoRepository.getLatestVersionsSummarySearch(Some("dataset2")))

val expected = Seq(dataset3).map(DatasetFactory.toSummary)
assert(actual == expected)
Expand All @@ -508,7 +508,7 @@ class DatasetRepositoryIntegrationSuite extends BaseRepositoryTest {
val dataset5 = DatasetFactory.getDummyDataset(name = "abc", version = 1)

datasetFixture.add(dataset2, dataset3, dataset4, dataset5)
val actual = await(datasetMongoRepository.getLatestVersionsSummary(Some("tas")))
val actual = await(datasetMongoRepository.getLatestVersionsSummarySearch(Some("tas")))

val expected = Seq(dataset3, dataset4).map(DatasetFactory.toSummary)
assert(actual == expected)
Expand Down Expand Up @@ -544,7 +544,7 @@ class DatasetRepositoryIntegrationSuite extends BaseRepositoryTest {
val abc1 = DatasetFactory.getDummyDataset(name = "abc", version = 1)

datasetFixture.add(dataset1ver1, dataset1ver2, dataset2ver1, abc1)
val actual = await(datasetMongoRepository.getLatestVersionsSummary(Some("")))
val actual = await(datasetMongoRepository.getLatestVersionsSummarySearch(Some("")))

val expected = Seq(abc1, dataset1ver2, dataset2ver1).map(DatasetFactory.toSummary)
assert(actual == expected)
Expand All @@ -561,7 +561,7 @@ class DatasetRepositoryIntegrationSuite extends BaseRepositoryTest {
val dataset1ver2 = DatasetFactory.getDummyDataset(name = "dataset1", version = 2)
datasetFixture.add(dataset1ver2)

val actual = await(datasetMongoRepository.getLatestVersionsSummary(None))
val actual = await(datasetMongoRepository.getLatestVersionsSummarySearch(None))

val expected = Seq(dataset1ver2, dataset2ver2).map(DatasetFactory.toSummary)
assert(actual == expected)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class DatasetServiceTest extends VersionedModelServiceTest[Dataset] with Matcher
)

val dataset = DatasetFactory.getDummyDataset(name = "datasetA", properties = Some(properties))
DatasetService.removeBlankProperties(dataset.properties) shouldBe Some(Map("propKey1" -> "someValue"))
DatasetService.removeBlankPropertiesOpt(dataset.properties) shouldBe Some(Map("propKey1" -> "someValue"))
}

test("DatasetService.replacePrefixIfFound replaces field prefixes") {
Expand Down

0 comments on commit 1c241d1

Please sign in to comment.