Skip to content

Commit

Permalink
Cache Serailize API (flyteorg#47)
Browse files Browse the repository at this point in the history
* WIP: add reservation apis

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* add missing files

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* added create DAO

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* Add get dao

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* wip

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* fix tests

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* wired reservation manager

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* add todos

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* add more tests

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* add more tests

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* add more logging

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* add logging and stats

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* fix lint

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* add more comments

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* First -> Take

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* WIP: add createOrupdate API

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* Added boilerplate automation (flyteorg#41)

Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* update boilerplate code (flyteorg#40)

Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* Upgrade gorm to v1.21.9 (flyteorg#42)

Signed-off-by: Daniel Rammer <[email protected]>

* Added boilerplate automation (flyteorg#43)

Signed-off-by: Yuvraj <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* add more instructinos

Use upsert

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* add more comments

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* refactor a bit

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* add timer

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* fix lint / tests

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* add comments & tests

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* fix lint

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* add docs

Signed-off-by: Chao-Han Tsai <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* Fix connection error handling (flyteorg#45)

Signed-off-by: Daniel Rammer <[email protected]>

* Update code of conduct (flyteorg#46)

Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* separated ReservationManager CreateOrUpdate function into individual Create and Update functions

Signed-off-by: Daniel Rammer <[email protected]>

* fixed race condition on Reservation repository Create function

Signed-off-by: Daniel Rammer <[email protected]>

* changed reservation expiration to use heartbeatInterval and heartbeatGracePeriodMultiplier

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint and unit test errors

Signed-off-by: Daniel Rammer <[email protected]>

* added unit tests for extending reservation and update failure

Signed-off-by: Daniel Rammer <[email protected]>

* removed ExtendReservation API mocks

Signed-off-by: Daniel Rammer <[email protected]>

* added ExpiresAt and HeartbeatInterval fiedls to ReservationStatus return based on new API

Signed-off-by: Daniel Rammer <[email protected]>

* implemented ReleaseReservation

Signed-off-by: Daniel Rammer <[email protected]>

* added unit tests for reservation transformer

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint errors

Signed-off-by: Daniel Rammer <[email protected]>

* implemented unit tests for ReleaseReservation API call

Signed-off-by: Daniel Rammer <[email protected]>

* updated reservation API to only work with reservations - not actual artifacts.

Signed-off-by: Daniel Rammer <[email protected]>

* Fix error type check to detect uniqueConstraintViolation

Signed-off-by: Haytham Abuelfutuh <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* Revert "Fix error type check to detect uniqueConstraintViolation"

This reverts commit a237bf0.

Signed-off-by: Daniel Rammer <[email protected]>

* Fix gorm wrong error type cast (flyteorg#48)

* Fix gorm wrong error type cast

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Fix unit tests

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Support unwrapping errors to detect connection problems

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* cleanup

Signed-off-by: Haytham Abuelfutuh <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>

* added support for heartbeat_interval definition in reservation manager

Signed-off-by: Daniel Rammer <[email protected]>

* updated test and fixed lint errors

Signed-off-by: Daniel Rammer <[email protected]>

* removed unnecessary dependencies from go.mod

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl version - change before merging

Signed-off-by: Daniel Rammer <[email protected]>

* adding reservation model to migration

Signed-off-by: Daniel Rammer <[email protected]>

* udpated dockerfile go template to reflect current master fixing rebase overwrites

Signed-off-by: Daniel Rammer <[email protected]>

* and again .. with a space

Signed-off-by: Daniel Rammer <[email protected]>

* add docs on exports

Signed-off-by: Daniel Rammer <[email protected]>

* changed configuration to use config.Duration from flytestdlib instead of defining on seconds

Signed-off-by: Daniel Rammer <[email protected]>

* added owner id to reservation gorm impl delete function

Signed-off-by: Daniel Rammer <[email protected]>

* if reservation is missing on release reservation (meaning another entity acquired it) then gracefully fail

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl version

Signed-off-by: Daniel Rammer <[email protected]>

* remove flyteidl replace in go.mod and updating to latest version

Signed-off-by: Daniel Rammer <[email protected]>

Co-authored-by: Chao-Han Tsai <[email protected]>
Co-authored-by: Yuvraj <[email protected]>
Co-authored-by: Samhita Alla <[email protected]>
Co-authored-by: Haytham Abuelfutuh <[email protected]>
  • Loading branch information
5 people authored Nov 24, 2021
1 parent 7c443db commit a9d0f7c
Show file tree
Hide file tree
Showing 23 changed files with 1,303 additions and 28 deletions.
2 changes: 2 additions & 0 deletions datacalog/datacatalog_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ datacatalog:
storage-prefix: "metadata"
metrics-scope: "datacatalog"
profiler-port: 10254
heartbeat-grace-period-multiplier: 3
max-reservation-heartbeat: 10s
storage:
connection:
access-key: minio
Expand Down
2 changes: 1 addition & 1 deletion datacalog/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.13

require (
github.com/Selvatico/go-mocket v1.0.7
github.com/flyteorg/flyteidl v0.18.17
github.com/flyteorg/flyteidl v0.21.11
github.com/flyteorg/flytestdlib v0.3.13
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.4.3
Expand Down
16 changes: 6 additions & 10 deletions datacalog/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
Expand Down Expand Up @@ -135,7 +134,6 @@ github.com/coocood/freecache v1.1.1 h1:uukNF7QKCZEdZ9gAV7WQzvh0SbjwdMF6m3x3rxEka
github.com/coocood/freecache v1.1.1/go.mod h1:OKrEjkGVoxZhyWAJoeFi5BMLUJm2Tit0kpGkIr7NGYY=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
Expand Down Expand Up @@ -175,8 +173,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.18.17 h1:74pPZ9PzITuzq+CgjMPb9EcFI5bVkf8mM5m4xmmlTmY=
github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.21.11 h1:oH9YPoR7scO9GFF/I8D0gCTOB+JP5HRK7b7cLUBRz90=
github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down Expand Up @@ -307,7 +305,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xC
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.12.2/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
Expand Down Expand Up @@ -523,6 +521,7 @@ github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -533,7 +532,6 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35/go.mod h1:prYjPmNq4d1NPVmpShWobRqXY3q7Vp+80DqgxxUrUIA=
github.com/pquerna/ffjson v0.0.0-20190813045741-dac163c6c0a9/go.mod h1:YARuvh7BUWHNhzDq2OM5tzR2RiCcN2D7sapiKyCel/M=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
Expand Down Expand Up @@ -744,7 +742,6 @@ golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down Expand Up @@ -973,7 +970,6 @@ google.golang.org/genproto v0.0.0-20190530194941-fb225487d101/go.mod h1:z3L6/3dT
google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
Expand All @@ -989,6 +985,7 @@ google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
Expand All @@ -1015,7 +1012,6 @@ google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ij
google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
Expand All @@ -1026,6 +1022,7 @@ google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
Expand Down Expand Up @@ -1058,7 +1055,6 @@ gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU=
gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/kothar/go-backblaze.v0 v0.0.0-20190520213052-702d4e7eb465/go.mod h1:zJ2QpyDCYo1KvLXlmdnFlQAyF/Qfth0fB8239Qg7BIE=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/square/go-jose.v2 v2.4.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
Expand Down
5 changes: 3 additions & 2 deletions datacalog/pkg/manager/impl/artifact_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ func getTestArtifact() *datacatalog.Artifact {

func newMockDataCatalogRepo() *mocks.DataCatalogRepo {
return &mocks.DataCatalogRepo{
MockDatasetRepo: &mocks.DatasetRepo{},
MockArtifactRepo: &mocks.ArtifactRepo{},
MockDatasetRepo: &mocks.DatasetRepo{},
MockArtifactRepo: &mocks.ArtifactRepo{},
MockReservationRepo: &mocks.ReservationRepo{},
}
}

Expand Down
212 changes: 212 additions & 0 deletions datacalog/pkg/manager/impl/reservation_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package impl

import (
"context"
"time"

"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

"github.com/flyteorg/datacatalog/pkg/errors"
"github.com/flyteorg/datacatalog/pkg/repositories"
repo_errors "github.com/flyteorg/datacatalog/pkg/repositories/errors"
"github.com/flyteorg/datacatalog/pkg/repositories/models"
"github.com/flyteorg/datacatalog/pkg/repositories/transformers"

"github.com/flyteorg/datacatalog/pkg/manager/interfaces"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog"
)

type reservationMetrics struct {
scope promutils.Scope
reservationAcquired labeled.Counter
reservationReleased labeled.Counter
reservationAlreadyInProgress labeled.Counter
acquireReservationFailure labeled.Counter
releaseReservationFailure labeled.Counter
reservationDoesNotExist labeled.Counter
}

type NowFunc func() time.Time

type reservationManager struct {
repo repositories.RepositoryInterface
heartbeatGracePeriodMultiplier time.Duration
maxHeartbeatInterval time.Duration
now NowFunc
systemMetrics reservationMetrics
}

// Creates a new reservation manager with the specified properties
func NewReservationManager(
repo repositories.RepositoryInterface,
heartbeatGracePeriodMultiplier time.Duration,
maxHeartbeatInterval time.Duration,
nowFunc NowFunc, // Easier to mock time.Time for testing
reservationScope promutils.Scope,
) interfaces.ReservationManager {
systemMetrics := reservationMetrics{
scope: reservationScope,
reservationAcquired: labeled.NewCounter(
"reservation_acquired",
"Number of times a reservation was acquired",
reservationScope),
reservationReleased: labeled.NewCounter(
"reservation_released",
"Number of times a reservation was released",
reservationScope),
reservationAlreadyInProgress: labeled.NewCounter(
"reservation_already_in_progress",
"Number of times we try of acquire a reservation but the reservation is in progress",
reservationScope,
),
acquireReservationFailure: labeled.NewCounter(
"acquire_reservation_failure",
"Number of times we failed to acquire reservation",
reservationScope,
),
releaseReservationFailure: labeled.NewCounter(
"release_reservation_failure",
"Number of times we failed to release a reservation",
reservationScope,
),
reservationDoesNotExist: labeled.NewCounter(
"reservation_does_not_exist",
"Number of times we attempt to modify a reservation that does not exist",
reservationScope,
),
}

return &reservationManager{
repo: repo,
heartbeatGracePeriodMultiplier: heartbeatGracePeriodMultiplier,
maxHeartbeatInterval: maxHeartbeatInterval,
now: nowFunc,
systemMetrics: systemMetrics,
}
}

// Attempt to acquire a reservation for the specified artifact. If there is not active reservation, successfully
// acquire it. If you are the owner of the active reservation, extend it. If another owner, return the existing reservation.
func (r *reservationManager) GetOrExtendReservation(ctx context.Context, request *datacatalog.GetOrExtendReservationRequest) (*datacatalog.GetOrExtendReservationResponse, error) {
reservationID := request.ReservationId

// Use minimum of maxHeartbeatInterval and requested heartbeat interval
heartbeatInterval := r.maxHeartbeatInterval
requestHeartbeatInterval := request.GetHeartbeatInterval()
if requestHeartbeatInterval != nil && requestHeartbeatInterval.AsDuration() < heartbeatInterval {
heartbeatInterval = requestHeartbeatInterval.AsDuration()
}

reservation, err := r.tryAcquireReservation(ctx, reservationID, request.OwnerId, heartbeatInterval)
if err != nil {
r.systemMetrics.acquireReservationFailure.Inc(ctx)
return nil, err
}

return &datacatalog.GetOrExtendReservationResponse{
Reservation: &reservation,
}, nil
}

// tryAcquireReservation will fetch the reservation first and only create/update
// the reservation if it does not exist or has expired.
// This is an optimization to reduce the number of writes to db. We always need
// to do a GET here because we want to know who owns the reservation
// and show it to users on the UI. However, the reservation is held by a single
// task most of the times and there is no need to do a write.
func (r *reservationManager) tryAcquireReservation(ctx context.Context, reservationID *datacatalog.ReservationID, ownerID string, heartbeatInterval time.Duration) (datacatalog.Reservation, error) {
repo := r.repo.ReservationRepo()
reservationKey := transformers.FromReservationID(reservationID)
repoReservation, err := repo.Get(ctx, reservationKey)

reservationExists := true
if err != nil {
if errors.IsDoesNotExistError(err) {
// Reservation does not exist yet so let's create one
reservationExists = false
} else {
return datacatalog.Reservation{}, err
}
}

now := r.now()
newRepoReservation := models.Reservation{
ReservationKey: reservationKey,
OwnerID: ownerID,
ExpiresAt: now.Add(heartbeatInterval * r.heartbeatGracePeriodMultiplier),
}

// Conditional upsert on reservation. Race conditions are handled
// within the reservation repository Create and Update function calls.
var repoErr error
if !reservationExists {
repoErr = repo.Create(ctx, newRepoReservation, now)
} else if repoReservation.ExpiresAt.Before(now) || repoReservation.OwnerID == ownerID {
repoErr = repo.Update(ctx, newRepoReservation, now)
} else {
logger.Debugf(ctx, "Reservation: %+v is held by %s", reservationKey, repoReservation.OwnerID)

reservation, err := transformers.CreateReservation(&repoReservation, heartbeatInterval)
if err != nil {
return reservation, err
}

r.systemMetrics.reservationAlreadyInProgress.Inc(ctx)
return reservation, nil
}

if repoErr != nil {
if repoErr.Error() == repo_errors.AlreadyExists {
// Looks like someone else tried to obtain the reservation
// at the same time and they won. Let's find out who won.
rsv1, err := repo.Get(ctx, reservationKey)
if err != nil {
return datacatalog.Reservation{}, err
}

reservation, err := transformers.CreateReservation(&rsv1, heartbeatInterval)
if err != nil {
return reservation, err
}

r.systemMetrics.reservationAlreadyInProgress.Inc(ctx)
return reservation, nil
}

return datacatalog.Reservation{}, repoErr
}

// Reservation has been acquired or extended without error
reservation, err := transformers.CreateReservation(&newRepoReservation, heartbeatInterval)
if err != nil {
return reservation, err
}

r.systemMetrics.reservationAlreadyInProgress.Inc(ctx)
return reservation, nil
}

// Release an active reservation with the specified owner. If one does not exist, gracefully return.
func (r *reservationManager) ReleaseReservation(ctx context.Context, request *datacatalog.ReleaseReservationRequest) (*datacatalog.ReleaseReservationResponse, error) {
repo := r.repo.ReservationRepo()
reservationKey := transformers.FromReservationID(request.ReservationId)

err := repo.Delete(ctx, reservationKey, request.OwnerId)
if err != nil {
if errors.IsDoesNotExistError(err) {
logger.Warnf(ctx, "Reservation does not exist id: %+v, err %v", request.ReservationId, err)
r.systemMetrics.reservationDoesNotExist.Inc(ctx)
return &datacatalog.ReleaseReservationResponse{}, nil
}

logger.Errorf(ctx, "Failed to release reservation: %+v, err: %v", reservationKey, err)
r.systemMetrics.releaseReservationFailure.Inc(ctx)
return nil, err
}

r.systemMetrics.reservationReleased.Inc(ctx)
return &datacatalog.ReleaseReservationResponse{}, nil
}
Loading

0 comments on commit a9d0f7c

Please sign in to comment.