diff --git a/cmd/main.go b/cmd/main.go index 3e2eecab..0296f002 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -108,12 +108,17 @@ var command = cobra.Command{ return fmt.Errorf("init vsl client: %w", err) } - networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams, vslClient) + chainID, err := vslClient.ChainID(context.Background()) + if err != nil { + return fmt.Errorf("get chain id: %w", err) + } + + networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams[chainID.Int64()], vslClient) if err != nil { return fmt.Errorf("new network params caller: %w", err) } - settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement, vslClient) + settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement[chainID.Int64()], vslClient) if err != nil { return fmt.Errorf("new settlement caller: %w", err) } @@ -140,7 +145,7 @@ var command = cobra.Command{ } // Convert big.Int to int64; safe as long as the value fits in int64. - blockStartInt64 := blockStart.Int64() + blockStartInt64 := blockStart.Block.Int64() // Update the current block start for the network in Redis. err := parameter.UpdateBlockStart(cmd.Context(), redisClient, network.String(), blockStartInt64) diff --git a/config/config.go b/config/config.go index ebd92ca7..32c78847 100644 --- a/config/config.go +++ b/config/config.go @@ -124,9 +124,10 @@ func (e Endpoint) BuildEthereumOptions() []ethereum.Option { } type Database struct { - Driver database.Driver `mapstructure:"driver" validate:"required" default:"postgres"` - Partition *bool `mapstructure:"partition" validate:"required" default:"true"` - URI string `mapstructure:"uri" validate:"required" default:"postgres://postgres@localhost:5432/postgres"` + CoveragePeriod int `mapstructure:"coverage_period" validate:"min=3,max=12" default:"3"` + Driver database.Driver `mapstructure:"driver" validate:"required" default:"postgres"` + Partition *bool `mapstructure:"partition" validate:"required" default:"true"` + URI string `mapstructure:"uri" validate:"required" default:"postgres://postgres@localhost:5432/postgres"` } type Stream struct { diff --git a/config/config_test.go b/config/config_test.go index 603499eb..47f1c098 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -105,6 +105,7 @@ component: } }, "database": { + "coverage_period": 3, "driver": "postgres", "partition": true, "uri": "postgres://postgres@localhost:5432/postgres" @@ -189,6 +190,7 @@ global_indexer_endpoint = "https://gi.rss3.dev/" access_token = "test" [database] +coverage_period = 3 driver = "postgres" partition = true uri = "postgres://postgres@localhost:5432/postgres" @@ -316,9 +318,10 @@ var configFileExpected = &File{ }, }, Database: &Database{ - Driver: "postgres", - Partition: lo.ToPtr(true), - URI: "postgres://postgres@localhost:5432/postgres", + CoveragePeriod: 3, + Driver: "postgres", + Partition: lo.ToPtr(true), + URI: "postgres://postgres@localhost:5432/postgres", }, Stream: &Stream{ Enable: lo.ToPtr(false), diff --git a/config/parameter/data.go b/config/parameter/data.go index 40ce090a..6bb1b3f8 100644 --- a/config/parameter/data.go +++ b/config/parameter/data.go @@ -9,8 +9,14 @@ import ( // NumberOfMonthsToCover the number of months that a Node should cover data for const NumberOfMonthsToCover = 4 +type StartBlock struct { + Block *big.Int `json:"block"` + Timestamp int64 `json:"timestamp"` +} + type NetworkTolerance map[network.Network]uint64 -type NetworkStartBlock map[network.Network]*big.Int +type NetworkStartBlock map[network.Network]*StartBlock + type NetworkCoreWorkerDiskSpacePerMonth map[network.Network]uint // CurrentNetworkTolerance should be updated each epoch from vsl diff --git a/config/parameter/network.go b/config/parameter/network.go index 54ae5c96..a0dbbb85 100644 --- a/config/parameter/network.go +++ b/config/parameter/network.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "math/big" "strconv" "strings" @@ -27,9 +26,9 @@ type NetworkParamsData struct { // UnmarshalJSON defines a custom UnmarshalJSON method for NetworkParamsData func (npd *NetworkParamsData) UnmarshalJSON(data []byte) error { networkParam := struct { - NetworkTolerance map[string]uint64 `json:"network_tolerance"` - NetworkStartBlock map[string]*big.Int `json:"network_start_block"` - NetworkCoreWorkerDiskSpacePerMonth map[string]uint `json:"network_core_worker_disk_space_per_month"` + NetworkTolerance map[string]uint64 `json:"network_tolerance"` + NetworkStartBlock map[string]*StartBlock `json:"network_start_block"` + NetworkCoreWorkerDiskSpacePerMonth map[string]uint `json:"network_core_worker_disk_space_per_month"` }{} if err := json.Unmarshal(data, &networkParam); err != nil { @@ -37,7 +36,7 @@ func (npd *NetworkParamsData) UnmarshalJSON(data []byte) error { } npd.NetworkTolerance = make(map[network.Network]uint64) - npd.NetworkStartBlock = make(map[network.Network]*big.Int) + npd.NetworkStartBlock = make(map[network.Network]*StartBlock) npd.NetworkCoreWorkerDiskSpacePerMonth = make(map[network.Network]uint) for key, value := range networkParam.NetworkTolerance { @@ -227,10 +226,8 @@ func buildNetworkBlockStartCacheKey(network string) string { // InitVSLClient initializes the VSL client func InitVSLClient() (ethereum.Client, error) { - vslEndpoint := endpoint.MustGet(network.VSL) - // Initialize vsl ethereum client. - vslClient, err := ethereum.Dial(context.Background(), vslEndpoint) + vslClient, err := ethereum.Dial(context.Background(), endpoint.MustGet(network.VSL)) if err != nil { return nil, err } @@ -254,11 +251,8 @@ func CheckParamsTask(ctx context.Context, redisClient rueidis.Client, networkPar continue // Skip if the start block is not defined. } - // Convert big.Int to int64; safe as long as the value fits in int64. - blockStartInt64 := blockStart.Int64() - // Update the current block start for the network in Redis. - err := UpdateBlockStart(ctx, redisClient, n.String(), blockStartInt64) + err := UpdateBlockStart(ctx, redisClient, n.String(), blockStart.Block.Int64()) if err != nil { return fmt.Errorf("update current block start: %w", err) } diff --git a/deploy/config.example.yaml b/deploy/config.example.yaml index 9eec7634..b78ffc83 100644 --- a/deploy/config.example.yaml +++ b/deploy/config.example.yaml @@ -22,8 +22,13 @@ discovery: # Database configuration database: + # `coverage_period` is the number of months that the database will store the data, which cannot be less than three months. + coverage_period: 3 + # `driver` is the database driver used by the Node, currently only `postgres` is supported driver: postgres + # `partition` is used to enable the partition feature of the database. partition: true + # `uri` is the connection string of the database. uri: postgres://postgres:password@localhost:5432/postgres # Deploying a redis will significantly improve the indexing performance of some workers. diff --git a/deploy/min/config.min.yaml b/deploy/min/config.min.yaml index 1f4a7bec..08f13360 100644 --- a/deploy/min/config.min.yaml +++ b/deploy/min/config.min.yaml @@ -10,6 +10,7 @@ discovery: access_token: your_access_token database: + coverage_period: 3 driver: postgres partition: true uri: postgres://postgres:password@rss3_node_database:5432/postgres diff --git a/go.mod b/go.mod index 295dc726..9dbe8c73 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/rss3-network/node -go 1.21.4 +go 1.22 + +toolchain go1.22.1 require ( github.com/JohannesKaufmann/html-to-markdown v1.6.0 @@ -55,9 +57,11 @@ require ( github.com/adrianbrad/psqldocker v1.2.1 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 github.com/emirpasic/gods v1.18.1 + github.com/go-redsync/redsync/v4 v4.13.0 github.com/grafana/pyroscope-go v1.1.2 github.com/mitchellh/mapstructure v1.5.0 github.com/redis/rueidis v1.0.45 + github.com/redis/rueidis/rueidiscompat v1.0.45 github.com/rss3-network/protocol-go v0.5.3 github.com/spf13/afero v1.11.0 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 @@ -109,6 +113,8 @@ require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/grafana/pyroscope-go/godeltaprof v0.1.8 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/holiman/uint256 v1.2.4 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect diff --git a/go.sum b/go.sum index 1f65e57f..302adf87 100644 --- a/go.sum +++ b/go.sum @@ -82,6 +82,8 @@ github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5il github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/docker/cli v26.1.1+incompatible h1:bE1/uE2tCa08fMv+7ikLR/RDPoCqytwrLtkIkSzxLvw= @@ -136,14 +138,22 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27HYW8P9FDk5PbgA= github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= +github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI= github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redsync/redsync/v4 v4.13.0 h1:49X6GJfnbLGaIpBBREM/zA4uIMDXKAh1NDkvQ1EkZKA= +github.com/go-redsync/redsync/v4 v4.13.0/go.mod h1:HMW4Q224GZQz6x1Xc7040Yfgacukdzu7ifTDAKiyErQ= github.com/go-shiori/dom v0.0.0-20210627111528-4e4722cd0d65 h1:zx4B0AiwqKDQq+AgqxWeHwbbLJQeidq20hgfP+aMNWI= github.com/go-shiori/dom v0.0.0-20210627111528-4e4722cd0d65/go.mod h1:NPO1+buE6TYOWhUI98/hXLHHJhunIpXRuvDN4xjkCoE= github.com/go-shiori/go-readability v0.0.0-20231029095239-6b97d5aba789 h1:G6wSuUyCoLB9jrUokipsmFuRi8aJozt3phw/g9Sl4Xs= github.com/go-shiori/go-readability v0.0.0-20231029095239-6b97d5aba789/go.mod h1:2DpZlTJO/ycxp/vsc/C11oUyveStOgIXB88SYV1lncI= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -166,6 +176,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws= +github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -174,6 +186,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQuYXQ+6EN5stWmeY/AZqtM8xk9k= +github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= @@ -189,8 +203,13 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjw github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= github.com/hamba/avro v1.8.0 h1:eCVrLX7UYThA3R3yBZ+rpmafA5qTc3ZjpTz6gYJoVGU= github.com/hamba/avro v1.8.0/go.mod h1:NiGUcrLLT+CKfGu5REWQtD9OVPPYUGMVFiC+DE0lQfY= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= @@ -313,6 +332,8 @@ github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108 github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo/v2 v2.20.1 h1:YlVIbqct+ZmnEph770q9Q7NVAz4wwIiVNahee6JyUzo= +github.com/onsi/ginkgo/v2 v2.20.1/go.mod h1:lG9ey2Z29hR41WMVthyJBGUBcBhGOtoPF2VFMvBXFCI= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= @@ -348,8 +369,14 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= +github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/redis/rueidis v1.0.45 h1:j7hfcqfLLIqgTK3IkxBhXdeJcP34t3XLXvorDLqXfgM= github.com/redis/rueidis v1.0.45/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM= +github.com/redis/rueidis/mock v1.0.45 h1:r2LRiwOtFib8+NAuVxNmcdxL7IMUk3as9IGPzx2v5tA= +github.com/redis/rueidis/mock v1.0.45/go.mod h1:bjpk7ox5jwue03L8NpjgPBE91tLkm9Amla+XFYhaezc= +github.com/redis/rueidis/rueidiscompat v1.0.45 h1:G+3HIaETgtwr6aL6BOTFCa2DfpPDhxqcoiDn8cvd5Ps= +github.com/redis/rueidis/rueidiscompat v1.0.45/go.mod h1:JMw3cktmwNqsSl2npjcxrOfLa1L3rmj1Ox5aPHiDjOU= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= @@ -413,6 +440,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= +github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4= @@ -502,6 +531,8 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -620,6 +651,8 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/database/client.go b/internal/database/client.go index 323d0704..8756821e 100644 --- a/internal/database/client.go +++ b/internal/database/client.go @@ -3,6 +3,7 @@ package database import ( "context" "database/sql" + "time" "github.com/ethereum/go-ethereum/common" "github.com/pressly/goose/v3" @@ -28,6 +29,7 @@ type Client interface { SaveActivities(ctx context.Context, activities []*activityx.Activity) error FindActivity(ctx context.Context, query model.ActivityQuery) (*activityx.Activity, *int, error) FindActivities(ctx context.Context, query model.ActivitiesQuery) ([]*activityx.Activity, error) + DeleteExpiredActivities(ctx context.Context, network network.Network, timestamp time.Time) error } type Session interface { diff --git a/internal/database/dialer/postgres/client.go b/internal/database/dialer/postgres/client.go index 95070ce9..0e8bda43 100644 --- a/internal/database/dialer/postgres/client.go +++ b/internal/database/dialer/postgres/client.go @@ -293,6 +293,15 @@ func (c *client) FindActivities(ctx context.Context, query model.ActivitiesQuery return nil, fmt.Errorf("not implemented") } +// DeleteExpiredActivities deletes expired activities. +func (c *client) DeleteExpiredActivities(ctx context.Context, network networkx.Network, timestamp time.Time) error { + if c.partition { + return c.deleteExpiredActivitiesPartitioned(ctx, network, timestamp) + } + + return fmt.Errorf("not implemented") +} + // LoadDatasetFarcasterProfile loads a profile. func (c *client) LoadDatasetFarcasterProfile(ctx context.Context, fid int64) (*model.Profile, error) { var value table.DatasetFarcasterProfile diff --git a/internal/database/dialer/postgres/client_partitioned.go b/internal/database/dialer/postgres/client_partitioned.go index f08c1246..22e9b749 100644 --- a/internal/database/dialer/postgres/client_partitioned.go +++ b/internal/database/dialer/postgres/client_partitioned.go @@ -12,6 +12,7 @@ import ( "github.com/rss3-network/node/internal/database/dialer/postgres/table" "github.com/rss3-network/node/internal/database/model" activityx "github.com/rss3-network/protocol-go/schema/activity" + "github.com/rss3-network/protocol-go/schema/network" "github.com/samber/lo" "github.com/sourcegraph/conc/pool" "go.uber.org/zap" @@ -38,6 +39,19 @@ func (c *client) createPartitionTable(ctx context.Context, name, template string return nil } +// findPartitionTableExists check if a partition table exists. +func (c *client) findPartitionTableExists(ctx context.Context, name string) (bool, error) { + statement := fmt.Sprintf(`SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = '%s')`, name) + + var exists bool + + if err := c.database.WithContext(ctx).Raw(statement).Scan(&exists).Error; err != nil { + return false, err + } + + return exists, nil +} + // findIndexesPartitionTable finds partition table names of indexes in the past year. func (c *client) findIndexesPartitionTables(_ context.Context, index table.Index) []string { partitionedNames := make([]string, 0) @@ -499,6 +513,110 @@ func (c *client) findIndexesPartitioned(ctx context.Context, query model.Activit } } +// deleteExpiredActivitiesPartitioned deletes expired activities. +func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network network.Network, timestamp time.Time) error { + var ( + batchSize = 1000 + dropActivitiesTables = make([]string, 0) + checkTablesTimestamp = []time.Time{timestamp} + ) + + for i := 1; i <= 4; i++ { + dropActivitiesTables = append(dropActivitiesTables, c.buildActivitiesTableNames(network, timestamp.AddDate(0, -3*i, 0))) + checkTablesTimestamp = append(checkTablesTimestamp, timestamp.AddDate(0, -3*i, 0)) + } + + // Drop expired activities tables. + for _, name := range dropActivitiesTables { + zap.L().Info("dropping table", zap.String("table", name)) + + if err := c.database.WithContext(ctx).Exec(fmt.Sprintf(`DROP TABLE IF EXISTS "%s"`, name)).Error; err != nil { + zap.L().Error("failed to drop table", zap.Error(err), zap.String("table", name)) + + return fmt.Errorf("drop table: %w", err) + } + } + + for _, checkTimestamp := range checkTablesTimestamp { + activityTable := c.buildActivitiesTableNames(network, checkTimestamp) + + indexTable := c.buildIndexesTableNames(checkTimestamp) + + activityTableExists, err := c.findPartitionTableExists(ctx, activityTable) + if err != nil { + return fmt.Errorf("find partition table exists: %w", err) + } + + indexTableExists, err := c.findPartitionTableExists(ctx, indexTable) + if err != nil { + return fmt.Errorf("find partition table exists: %w", err) + } + + if !indexTableExists { + continue + } + + for { + done, err := c.batchDeleteExpiredActivities(ctx, network, timestamp, batchSize, &indexTable, lo.Ternary(activityTableExists, &activityTable, nil)) + if err != nil { + return fmt.Errorf("batch delete expired activities: %w", err) + } + + if done { + break + } + } + } + + return nil +} + +func (c *client) batchDeleteExpiredActivities(ctx context.Context, network network.Network, timestamp time.Time, batchSize int, indexTable *string, activityTable *string) (bool, error) { + databaseTransaction := c.database.WithContext(ctx).Debug().Begin() + defer func() { + _ = databaseTransaction.Rollback().Error + }() + + var transactionIDs []string + + // Find expired activities. + if err := c.database.WithContext(ctx).Table(*indexTable).Select("id").Where("network = ?", network.String()). + Where("timestamp < ?", timestamp).Limit(batchSize). + Pluck("id", &transactionIDs).Error; err != nil { + zap.L().Error("failed to find expired activities", zap.Error(err), zap.String("table", *indexTable)) + + return false, fmt.Errorf("find expired activities: %w", err) + } + + if len(transactionIDs) == 0 { + return true, nil + } + + // Delete expired indexes. + if err := databaseTransaction.Table(*indexTable).Where("id IN ?", transactionIDs).Delete(&table.Index{}).Error; err != nil { + zap.L().Error("failed to delete expired indexes", zap.Error(err), zap.String("table", *indexTable)) + + return false, fmt.Errorf("delete expired indexes: %w", err) + } + + if activityTable != nil { + // Delete expired activities. + if err := databaseTransaction.Table(*activityTable).Where("id IN ?", transactionIDs).Delete(&table.Activity{}).Error; err != nil { + zap.L().Error("failed to delete expired activities", zap.Error(err), zap.String("table", *activityTable)) + + return false, fmt.Errorf("delete expired activities: %w", err) + } + } + + if err := databaseTransaction.Commit().Error; err != nil { + zap.L().Error("failed to commit transaction", zap.Error(err)) + + return false, fmt.Errorf("commit transaction: %w", err) + } + + return false, nil +} + // buildFindIndexStatement builds the query index statement. func (c *client) buildFindIndexStatement(ctx context.Context, partitionedName string, query model.ActivityQuery) *gorm.DB { databaseStatement := c.database.WithContext(ctx).Table(partitionedName) @@ -576,3 +694,13 @@ func (c *client) buildFindIndexesStatement(ctx context.Context, partition string return databaseStatement.Order("timestamp DESC, index DESC").Limit(query.Limit) } + +// buildActivitiesTableNames builds the activities table names. +func (c *client) buildActivitiesTableNames(network network.Network, timestamp time.Time) string { + return fmt.Sprintf("%s_%s_%d_q%d", (*table.Activity).TableName(nil), network, timestamp.Year(), int(timestamp.Month()-1)/3+1) +} + +// buildIndexesTableNames builds the indexes table names. +func (c *client) buildIndexesTableNames(timestamp time.Time) string { + return fmt.Sprintf("%s_%d_q%d", (*table.Index).TableName(nil), timestamp.Year(), int(timestamp.Month()-1)/3+1) +} diff --git a/internal/engine/source/arweave/option.go b/internal/engine/source/arweave/option.go index da40962e..89f6ce05 100644 --- a/internal/engine/source/arweave/option.go +++ b/internal/engine/source/arweave/option.go @@ -31,7 +31,7 @@ func NewOption(n network.Network, parameters *config.Parameters) (*Option, error if parameters == nil { return &Option{ - BlockStart: parameter.CurrentNetworkStartBlock[n], + BlockStart: parameter.CurrentNetworkStartBlock[n].Block, ConcurrentBlockRequests: lo.ToPtr(defaultConcurrentBlockRequests), }, nil } @@ -50,7 +50,7 @@ func NewOption(n network.Network, parameters *config.Parameters) (*Option, error } if option.BlockStart == nil { - option.BlockStart = parameter.CurrentNetworkStartBlock[n] + option.BlockStart = parameter.CurrentNetworkStartBlock[n].Block } return &option, nil diff --git a/internal/engine/source/ethereum/option.go b/internal/engine/source/ethereum/option.go index ee7c89b0..9d713816 100644 --- a/internal/engine/source/ethereum/option.go +++ b/internal/engine/source/ethereum/option.go @@ -36,7 +36,7 @@ func NewOption(n network.Network, parameters *config.Parameters) (*Option, error if parameters == nil { return &Option{ - BlockStart: parameter.CurrentNetworkStartBlock[n], + BlockStart: parameter.CurrentNetworkStartBlock[n].Block, ConcurrentBlockRequests: lo.ToPtr(defaultConcurrentBlockRequests), BlockBatchSize: lo.ToPtr(defaultBlockBatchSize), ReceiptsBatchSize: lo.ToPtr(defaultReceiptsBatchSize), @@ -66,7 +66,7 @@ func NewOption(n network.Network, parameters *config.Parameters) (*Option, error } if option.BlockStart == nil { - option.BlockStart = parameter.CurrentNetworkStartBlock[n] + option.BlockStart = parameter.CurrentNetworkStartBlock[n].Block } return &option, nil diff --git a/internal/engine/source/farcaster/option.go b/internal/engine/source/farcaster/option.go index 0e49bd92..d0931093 100644 --- a/internal/engine/source/farcaster/option.go +++ b/internal/engine/source/farcaster/option.go @@ -19,7 +19,7 @@ func NewOption(n network.Network, parameters *config.Parameters) (*Option, error if parameters == nil { return &Option{ - TimestampStart: parameter.CurrentNetworkStartBlock[n], + TimestampStart: parameter.CurrentNetworkStartBlock[n].Block, }, nil } @@ -28,7 +28,7 @@ func NewOption(n network.Network, parameters *config.Parameters) (*Option, error } if option.TimestampStart == nil { - option.TimestampStart = parameter.CurrentNetworkStartBlock[n] + option.TimestampStart = parameter.CurrentNetworkStartBlock[n].Block } return &option, nil diff --git a/internal/node/monitor/cronjob.go b/internal/node/monitor/cronjob.go new file mode 100644 index 00000000..e5af38a3 --- /dev/null +++ b/internal/node/monitor/cronjob.go @@ -0,0 +1,93 @@ +package monitor + +import ( + "context" + "fmt" + "time" + + "github.com/go-redsync/redsync/v4" + rueidisx "github.com/go-redsync/redsync/v4/redis/rueidis" + "github.com/redis/rueidis" + "github.com/redis/rueidis/rueidiscompat" + "github.com/robfig/cron/v3" + "go.uber.org/zap" +) + +type CronJob struct { + crontab *cron.Cron + mutex *redsync.Mutex + timeout time.Duration +} + +var KeyPrefix = "cronjob:%s" + +func (c *CronJob) AddFunc(ctx context.Context, spec string, cmd func()) error { + _, err := c.crontab.AddFunc(spec, func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if err := c.mutex.Lock(); err != nil { + zap.L().Error("lock error", zap.String("key", c.mutex.Name()), zap.Error(err)) + + return + } + + defer func() { + if _, err := c.mutex.Unlock(); err != nil { + zap.L().Error("release lock error", zap.String("key", c.mutex.Name()), zap.Error(err)) + } + }() + + c.Renewal(ctx) + cmd() + }) + + return err +} + +func (c *CronJob) Renewal(ctx context.Context) { + go func(ctx context.Context) { + // Renewal lock every half of timeout. + ticker := time.NewTicker(c.timeout / 2) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + result, err := c.mutex.ExtendContext(ctx) + if err != nil { + zap.L().Error("extend lock error", zap.String("key", c.mutex.Name()), zap.Error(err)) + + continue + } + + if !result { + zap.L().Error("extend lock failed", zap.String("key", c.mutex.Name())) + + continue + } + } + } + }(ctx) +} + +func (c *CronJob) Start() { + c.crontab.Start() +} + +func (c *CronJob) Stop() { + c.crontab.Stop() +} + +func NewCronJob(client rueidis.Client, name string, timeout time.Duration) (*CronJob, error) { + pool := rueidisx.NewPool(rueidiscompat.NewAdapter(client)) + rs := redsync.New(pool) + + return &CronJob{ + crontab: cron.New(cron.WithLocation(time.UTC), cron.WithSeconds()), + mutex: rs.NewMutex(fmt.Sprintf(KeyPrefix, name), redsync.WithExpiry(timeout)), + timeout: timeout, + }, nil +} diff --git a/internal/node/monitor/maintainer.go b/internal/node/monitor/maintainer.go new file mode 100644 index 00000000..ae141d9f --- /dev/null +++ b/internal/node/monitor/maintainer.go @@ -0,0 +1,56 @@ +package monitor + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/rss3-network/node/config/parameter" + "go.uber.org/zap" +) + +func (m *Monitor) MaintainCoveragePeriod(ctx context.Context) error { + // Get the latest epoch from the chain. + epoch, err := m.settlementCaller.CurrentEpoch(&bind.CallOpts{Context: ctx}) + if err != nil { + zap.L().Error("failed to get current epoch", zap.Error(err)) + + return fmt.Errorf("failed to get current epoch: %w", err) + } + + // Read the network parameters from the chain. + params, err := m.networkParamsCaller.GetParams(&bind.CallOpts{}, epoch.Uint64()) + if err != nil { + zap.L().Error("failed to get network parameters", zap.Error(err)) + + return fmt.Errorf("failed to get network parameters: %w", err) + } + + var paramsData parameter.NetworkParamsData + + if err := json.Unmarshal([]byte(params), ¶msData); err != nil { + return fmt.Errorf("json unmarshal: %w", err) + } + + // Load the coverage period from the config file. + year, month, _ := time.Now().AddDate(0, -m.config.Database.CoveragePeriod, 0).Date() + + configTimestamp := time.Date(year, month, 1, 0, 0, 0, 0, time.Local) + + for network, start := range paramsData.NetworkStartBlock { + timestamp := time.Unix(start.Timestamp, 0) + + if timestamp.After(configTimestamp) { + timestamp = configTimestamp + } + + // Delete expired data. + if err := m.databaseClient.DeleteExpiredActivities(ctx, network, timestamp); err != nil { + zap.L().Error("delete coverage period", zap.Error(err), zap.String("network", network.String()), zap.Time("timestamp", timestamp)) + } + } + + return nil +} diff --git a/internal/node/monitor/server.go b/internal/node/monitor/server.go index 527e16c2..f965814e 100644 --- a/internal/node/monitor/server.go +++ b/internal/node/monitor/server.go @@ -6,19 +6,19 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/redis/rueidis" - "github.com/robfig/cron/v3" "github.com/rss3-network/node/config" "github.com/rss3-network/node/config/parameter" "github.com/rss3-network/node/internal/database" "github.com/rss3-network/node/provider/ethereum/contract/vsl" "github.com/rss3-network/protocol-go/schema/network" + "go.uber.org/zap" ) type Monitor struct { config *config.File - cron *cron.Cron databaseClient database.Client redisClient rueidis.Client networkParamsCaller *vsl.NetworkParamsCaller @@ -28,16 +28,13 @@ type Monitor struct { func (m *Monitor) Run(ctx context.Context) error { if m.databaseClient != nil && m.redisClient != nil { - _, err := m.cron.AddFunc("@every 15m", func() { - if err := m.MonitorWorkerStatus(ctx); err != nil { - return - } - }) + // Start the monitor cron job. + monitorWorkerStatus, err := NewCronJob(m.redisClient, "worker_status", 10*time.Minute) if err != nil { - return fmt.Errorf("add heartbeat cron job: %w", err) + return fmt.Errorf("new cron job: %w", err) } - _, err = m.cron.AddFunc("@every 5m", func() { + if err := monitorWorkerStatus.AddFunc(ctx, "@every 5m", func() { if err := parameter.CheckParamsTask(ctx, m.redisClient, m.networkParamsCaller); err != nil { return } @@ -45,13 +42,29 @@ func (m *Monitor) Run(ctx context.Context) error { if err := m.MonitorWorkerStatus(ctx); err != nil { return } - }) + }); err != nil { + return fmt.Errorf("add heartbeat cron job: %w", err) + } + + monitorWorkerStatus.Start() + // Start the database maintenance cron job. + databaseMaintenance, err := NewCronJob(m.redisClient, "database_maintenance", 5*24*time.Hour) if err != nil { - return fmt.Errorf("add heartbeat cron job: %w", err) + return fmt.Errorf("new cron job: %w", err) + } + + if err := databaseMaintenance.AddFunc(ctx, "0 0 0 * * *", func() { + if err := m.MaintainCoveragePeriod(ctx); err != nil { + zap.L().Error("maintain coverage period", zap.Error(err)) + + return + } + }); err != nil { + return fmt.Errorf("add database maintenance cron job: %w", err) } - m.cron.Start() + databaseMaintenance.Start() } stopChan := make(chan os.Signal, 1) @@ -119,7 +132,6 @@ func NewMonitor(_ context.Context, configFile *config.File, databaseClient datab instance := &Monitor{ config: configFile, - cron: cron.New(), databaseClient: databaseClient, redisClient: redisClient, clients: clients, diff --git a/provider/ethereum/contract/vsl/contract.go b/provider/ethereum/contract/vsl/contract.go index 2e0a2be7..83700deb 100644 --- a/provider/ethereum/contract/vsl/contract.go +++ b/provider/ethereum/contract/vsl/contract.go @@ -14,14 +14,27 @@ import ( // Settlement https://scan.rss3.io/address/0x0cE3159BF19F3C55B648D04E8f0Ae1Ae118D2A0B //go:generate go run --mod=mod github.com/ethereum/go-ethereum/cmd/abigen --abi ./abi/Settlement.abi --pkg vsl --type Settlement --out contract_settlement.go +const ( + ChainIDMainnet = 12553 + ChainIDTestnet = 2331 +) + var ( AddressL1RSS3 = common.HexToAddress("0xc98D64DA73a6616c42117b582e832812e7B8D57F") AddressL2RSS3 = common.HexToAddress("0x4200000000000000000000000000000000000042") AddressL1StandardBridge = common.HexToAddress("0x4cbab69108Aa72151EDa5A3c164eA86845f18438") AddressL2StandardBridge = common.HexToAddress("0x4200000000000000000000000000000000000010") AddressL1OptimismPortal = common.HexToAddress("0x6A12432491bbbE8d3babf75F759766774C778Db4") - AddressNetworkParams = common.HexToAddress("0x15176Aabdc4836c38947a67313d209204051C502") - AddressSettlement = common.HexToAddress("0x0cE3159BF19F3C55B648D04E8f0Ae1Ae118D2A0B") + + AddressNetworkParams = map[int64]common.Address{ + ChainIDMainnet: common.HexToAddress("0x15176Aabdc4836c38947a67313d209204051C502"), + ChainIDTestnet: common.HexToAddress("0x5d768cAef86d3DA8eC6009eE4B3d9b7Fe26A43CB"), + } + + AddressSettlement = map[int64]common.Address{ + ChainIDMainnet: common.HexToAddress("0x0cE3159BF19F3C55B648D04E8f0Ae1Ae118D2A0B"), + ChainIDTestnet: common.HexToAddress("0xA37a6Ef0c3635824be2b6c87A23F6Df5d0E2ba1b"), + } EventHashAddressL1StandardBridgeETHDepositInitiated = contract.EventHash("ETHDepositInitiated(address,address,uint256,bytes)") EventHashAddressL1StandardBridgeERC20DepositInitiated = contract.EventHash("ERC20DepositInitiated(address,address,address,address,uint256,bytes)")