From 781c956ee85cf9afbfdc827af4ffe39a749e1d07 Mon Sep 17 00:00:00 2001 From: Starnop Date: Tue, 19 Mar 2019 10:12:40 +0800 Subject: [PATCH] feature: implement the peer mgr Signed-off-by: Starnop --- apis/swagger.yml | 5 +- apis/types/peer_info.go | 21 +++ common/util/assert.go | 16 ++- common/util/assert_test.go | 8 ++ supernode/daemon/daemon.go | 1 + supernode/daemon/mgr/filter.go | 183 ++++++++++++++++++++++++++ supernode/daemon/mgr/filter_test.go | 134 +++++++++++++++++++ supernode/daemon/mgr/peer_mgr.go | 168 +++++++++++++++++++++-- supernode/daemon/mgr/peer_mgr_test.go | 179 +++++++++++++++++++++++++ supernode/daemon/mgr/store.go | 60 +++++++++ supernode/daemon/mgr/store_test.go | 112 ++++++++++++++++ supernode/errors/errors.go | 111 ++++++++++++++++ supernode/result/result_info.go | 68 ++++++++++ supernode/server/peer_bridge.go | 70 ++++++++++ supernode/server/router.go | 9 +- supernode/server/server.go | 11 +- 16 files changed, 1140 insertions(+), 16 deletions(-) create mode 100644 supernode/daemon/mgr/filter.go create mode 100644 supernode/daemon/mgr/filter_test.go create mode 100644 supernode/daemon/mgr/peer_mgr_test.go create mode 100644 supernode/daemon/mgr/store.go create mode 100644 supernode/daemon/mgr/store_test.go create mode 100644 supernode/errors/errors.go create mode 100644 supernode/result/result_info.go create mode 100644 supernode/server/peer_bridge.go diff --git a/apis/swagger.yml b/apis/swagger.yml index 5f8c1f483..6bce8fcfd 100644 --- a/apis/swagger.yml +++ b/apis/swagger.yml @@ -482,7 +482,10 @@ definitions: version: type: "string" description: "version number of dfget binary" - + created: + type : "string" + format : "date-time" + description: "the time to join the P2P network" TaskCreateRequest: type: "object" description: "" diff --git a/apis/types/peer_info.go b/apis/types/peer_info.go index cfe7232b9..91015960a 100644 --- a/apis/types/peer_info.go +++ b/apis/types/peer_info.go @@ -27,6 +27,10 @@ type PeerInfo struct { // Format: ipv4 IP strfmt.IPv4 `json:"IP,omitempty"` + // the time to join the P2P network + // Format: date-time + Created strfmt.DateTime `json:"created,omitempty"` + // host name of peer client node, as a valid RFC 1123 hostname. // Min Length: 1 // Format: hostname @@ -52,6 +56,10 @@ func (m *PeerInfo) Validate(formats strfmt.Registry) error { res = append(res, err) } + if err := m.validateCreated(formats); err != nil { + res = append(res, err) + } + if err := m.validateHostName(formats); err != nil { res = append(res, err) } @@ -79,6 +87,19 @@ func (m *PeerInfo) validateIP(formats strfmt.Registry) error { return nil } +func (m *PeerInfo) validateCreated(formats strfmt.Registry) error { + + if swag.IsZero(m.Created) { // not required + return nil + } + + if err := validate.FormatOf("created", "body", "date-time", m.Created.String(), formats); err != nil { + return err + } + + return nil +} + func (m *PeerInfo) validateHostName(formats strfmt.Registry) error { if swag.IsZero(m.HostName) { // not required diff --git a/common/util/assert.go b/common/util/assert.go index c50af13c3..88ba37be5 100644 --- a/common/util/assert.go +++ b/common/util/assert.go @@ -22,6 +22,7 @@ import ( "encoding/json" "reflect" "strconv" + "unicode" ) // Max returns the larger of x or y. @@ -42,7 +43,12 @@ func Min(x, y int32) int32 { // IsEmptyStr returns whether the string s is empty. func IsEmptyStr(s string) bool { - return s == "" + for _, v := range s { + if !unicode.IsSpace(v) { + return false + } + } + return true } // IsEmptySlice returns whether the slice values is empty. @@ -73,6 +79,14 @@ func IsPositive(value int64) bool { return value > 0 } +// IsNatural returns whether the value>=0. +func IsNatural(value string) bool { + if v, err := strconv.Atoi(value); err == nil { + return v >= 0 + } + return false +} + // IsNumeric returns whether the value is a numeric. // If the bitSize of value below 0 or above 64 an error is returned. func IsNumeric(value string) bool { diff --git a/common/util/assert_test.go b/common/util/assert_test.go index 4ac0ce18e..795cc73e2 100644 --- a/common/util/assert_test.go +++ b/common/util/assert_test.go @@ -48,6 +48,8 @@ func (suite *AssertSuite) TestMin(c *check.C) { func (suite *AssertSuite) TestIsEmptyStr(c *check.C) { c.Assert(IsEmptyStr(""), check.Equals, true) + c.Assert(IsEmptyStr(" "), check.Equals, true) + c.Assert(IsEmptyStr("\n "), check.Equals, true) c.Assert(IsEmptyStr("x"), check.Equals, false) } @@ -77,6 +79,12 @@ func (suite *AssertSuite) TestIsPositive(c *check.C) { c.Assert(IsPositive(-1), check.Equals, false) } +func (suite *AssertSuite) TestIsNatural(c *check.C) { + c.Assert(IsNatural("0"), check.Equals, true) + c.Assert(IsNatural("1"), check.Equals, true) + c.Assert(IsNatural("-1"), check.Equals, false) +} + func (suite *AssertSuite) TestIsNumeric(c *check.C) { c.Assert(IsNumeric("0"), check.Equals, true) c.Assert(IsNumeric("1"), check.Equals, true) diff --git a/supernode/daemon/daemon.go b/supernode/daemon/daemon.go index 25cbbacd8..0ea319bc6 100644 --- a/supernode/daemon/daemon.go +++ b/supernode/daemon/daemon.go @@ -3,6 +3,7 @@ package daemon import ( "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/server" + "github.com/sirupsen/logrus" ) diff --git a/supernode/daemon/mgr/filter.go b/supernode/daemon/mgr/filter.go new file mode 100644 index 000000000..1875c61ea --- /dev/null +++ b/supernode/daemon/mgr/filter.go @@ -0,0 +1,183 @@ +package mgr + +import ( + "net/http" + "sort" + "strconv" + "strings" + + "github.com/dragonflyoss/Dragonfly/common/util" + errorType "github.com/dragonflyoss/Dragonfly/supernode/errors" + + "github.com/pkg/errors" +) + +const ( + // PAGENUM identity the page number of the data. + // The default value is 0. + PAGENUM = "pageNum" + + // PAGESIZE identity the page size of the data. + // If this value equals 0, return all values. + PAGESIZE = "pageSize" + + // SORTKEY identity the sort key of the data. + // Each mgr needs to define acceptable values based on its own implementation. + SORTKEY = "sortKey" + + // SORTDIRECT identity the sort direct of the data. + // The value can only be ASC or DESC. + SORTDIRECT = "sortDirect" + + // ASCDIRECT means to sort the records in ascending order. + ASCDIRECT = "ASC" + + // DESCDIRECT means to sort the records in descending order. + DESCDIRECT = "DESC" +) + +var sortDirectMap = map[string]bool{ + ASCDIRECT: true, + DESCDIRECT: true, +} + +// PageFilter is a struct. +type PageFilter struct { + pageNum int + pageSize int + sortKey []string + sortDirect string +} + +// ParseFilter gets filter params from request and returns a map[string][]string. +func ParseFilter(req *http.Request, sortKeyMap map[string]bool) (pageFilter *PageFilter, err error) { + v := req.URL.Query() + pageFilter = &PageFilter{} + + // pageNum + pageNum, err := stoi(v.Get(PAGENUM)) + if err != nil { + return nil, errors.Wrapf(errorType.ErrInvalidValue, "pageNum %s is not a number: %v", pageNum, err) + } + pageFilter.pageNum = pageNum + + // pageSize + pageSize, err := stoi(v.Get(PAGESIZE)) + if err != nil { + return nil, errors.Wrapf(errorType.ErrInvalidValue, "pageSize %s is not a number: %v", pageSize, err) + } + pageFilter.pageSize = pageSize + + // sortDirect + sortDirect := v.Get(SORTDIRECT) + if sortDirect == "" { + sortDirect = ASCDIRECT + } + pageFilter.sortDirect = sortDirect + + // sortKey + if sortKey, ok := v[SORTKEY]; ok { + pageFilter.sortKey = sortKey + } + + err = validateFilter(pageFilter, sortKeyMap) + if err != nil { + return nil, err + } + + return +} + +func stoi(str string) (int, error) { + if util.IsEmptyStr(str) { + return 0, nil + } + + result, err := strconv.Atoi(str) + if err != nil || result < 0 { + return -1, err + } + return result, nil +} + +func validateFilter(pageFilter *PageFilter, sortKeyMap map[string]bool) error { + // pageNum + if pageFilter.pageNum < 0 { + return errors.Wrapf(errorType.ErrInvalidValue, "pageNum %s is not a natural number: %v", pageFilter.pageNum) + } + + // pageSize + if pageFilter.pageSize < 0 { + return errors.Wrapf(errorType.ErrInvalidValue, "pageSize %s is not a natural number: %v", pageFilter.pageSize) + } + + // sortDirect + if _, ok := sortDirectMap[strings.ToUpper(pageFilter.sortDirect)]; !ok { + return errors.Wrapf(errorType.ErrInvalidValue, "unexpected sortDirect %s", pageFilter.sortDirect) + } + + // sortKey + if len(pageFilter.sortKey) == 0 || sortKeyMap == nil { + return nil + } + for _, value := range pageFilter.sortKey { + if v, ok := sortKeyMap[value]; !ok || !v { + return errors.Wrapf(errorType.ErrInvalidValue, "unexpected sortKey %s", value) + } + } + + return nil +} + +// getPageValues get some pages of metaSlice after ordering it. +// The less is a function that reports whether the element with +// index i should sort before the element with index j. +// +// Eg: +// people := []struct { +// Name string +// Age int +// }{ +// {"Gopher", 7}, +// {"Alice", 55}, +// {"Vera", 24}, +// {"Bob", 75}, +// } +// +// If you want to sort it by age, and the less function should be defined as follows: +// +// less := func(i, j int) bool { return people[i].Age < people[j].Age } +func getPageValues(metaSlice []interface{}, pageNum, pageSize int, + less func(i, j int) bool) []interface{} { + + if metaSlice == nil { + return nil + } + if less == nil { + return metaSlice + } + + // sort the data slice + sort.Slice(metaSlice, less) + + if pageSize == 0 { + return metaSlice + } + + sliceLength := len(metaSlice) + start := pageNum * pageSize + end := (pageNum + 1) * pageSize + + if sliceLength < start { + return nil + } + if sliceLength < end { + return metaSlice[start:sliceLength] + } + return metaSlice[start:end] +} + +// isDESC returns whether the sortDirect is desc. +func isDESC(str string) bool { + return strings.ToUpper(str) == DESCDIRECT +} diff --git a/supernode/daemon/mgr/filter_test.go b/supernode/daemon/mgr/filter_test.go new file mode 100644 index 000000000..2844fbcbf --- /dev/null +++ b/supernode/daemon/mgr/filter_test.go @@ -0,0 +1,134 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mgr + +import ( + "net/http" + "net/url" + + "github.com/dragonflyoss/Dragonfly/common/util" + + "github.com/go-check/check" +) + +func init() { + check.Suite(&FilterTestSuite{}) +} + +type FilterTestSuite struct { +} + +func (f *FilterTestSuite) TestParseFilter(c *check.C) { + var cases = []struct { + name string + req *http.Request + excepted *PageFilter + errNil bool + }{ + { + name: "normal test", + req: &http.Request{ + URL: &url.URL{ + RawQuery: "pageNum=1&pageSize=10&sortDirect=ASC&sortKey=aaa", + }, + }, + excepted: &PageFilter{ + pageNum: 1, + pageSize: 10, + sortDirect: "ASC", + sortKey: []string{"aaa"}, + }, + errNil: true, + }, + { + name: "err pageNum test", + req: &http.Request{ + URL: &url.URL{ + RawQuery: "pageNum=-1&pageSize=10&sortDirect=ASC", + }, + }, + excepted: nil, + errNil: false, + }, + { + name: "err sortDirect test", + req: &http.Request{ + URL: &url.URL{ + RawQuery: "pageNum=1&pageSize=10&sortDirect=ACS", + }, + }, + excepted: nil, + errNil: false, + }, + { + name: "err sortKey test", + req: &http.Request{ + URL: &url.URL{ + RawQuery: "pageNum=1&pageSize=10&sortDirect=ACS&sortKey=ccc", + }, + }, + excepted: nil, + errNil: false, + }, + } + + for _, v := range cases { + result, err := ParseFilter(v.req, map[string]bool{ + "aaa": true, + "bbb": true, + "ccc": false, + }) + c.Check(result, check.DeepEquals, v.excepted) + c.Check(util.IsNil(err), check.Equals, v.errNil) + } +} + +func (f *FilterTestSuite) TestStoi(c *check.C) { + var cases = []struct { + str string + excepted int + errNil bool + }{ + {"", 0, true}, + {"0", 0, true}, + {"25", 25, true}, + {"aaa", -1, false}, + } + + for _, v := range cases { + result, err := stoi(v.str) + c.Check(result, check.Equals, v.excepted) + c.Check(util.IsNil(err), check.Equals, v.errNil) + } +} + +func (f *FilterTestSuite) TestIsDESC(c *check.C) { + var cases = []struct { + str string + excepted bool + }{ + {"desc", true}, + {"dEsc", true}, + {"DESC", true}, + {"dsce", false}, + } + + for _, v := range cases { + result := isDESC(v.str) + c.Check(result, check.Equals, v.excepted) + } +} diff --git a/supernode/daemon/mgr/peer_mgr.go b/supernode/daemon/mgr/peer_mgr.go index f30d884de..844a5bd33 100644 --- a/supernode/daemon/mgr/peer_mgr.go +++ b/supernode/daemon/mgr/peer_mgr.go @@ -2,8 +2,15 @@ package mgr import ( "context" + "fmt" + "time" "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/common/util" + errorType "github.com/dragonflyoss/Dragonfly/supernode/errors" + + "github.com/go-openapi/strfmt" + "github.com/pkg/errors" ) // PeerMgr as an interface defines all operations against Peer. @@ -11,7 +18,7 @@ import ( type PeerMgr interface { // Register a peer with specified peerInfo. // Supernode will generate a unique peerID for every Peer with PeerInfo provided. - Register(ctx context.Context, peerInfo *types.PeerInfo) (peerID string, err error) + Register(ctx context.Context, peerCreateRequest *types.PeerCreateRequest) (peerCreateResponse *types.PeerCreateResponse, err error) // DeRegister offline a peer service and // NOTE: update the info related for scheduler. @@ -21,14 +28,153 @@ type PeerMgr interface { Get(ctx context.Context, peerID string) (*types.PeerInfo, error) // List return a list of peers info with filter. - List(ctx context.Context, filter map[string]string) (peerList []*types.PeerInfo, err error) - - // Update the status of specified peer. - // - // Supernode will update the status of peer in the following situations: - // 1) When an exception occurs to the peer server. - // 2) When peer sends a request take the server offline. - // - // NOTE: update the info related for scheduler. - Update(ctx context.Context, peerID string, peerInfo *types.PeerInfo) error + List(ctx context.Context, filter *PageFilter) (peerList []*types.PeerInfo, err error) +} + +// PeerManager is an implement of the interface of PeerMgr. +type PeerManager struct { + peerStore *Store +} + +// NewPeerManager return a new PeerManager Object. +func NewPeerManager() (*PeerManager, error) { + return &PeerManager{ + peerStore: NewStore(), + }, nil +} + +// Register a peer and genreate a unique ID as returned. +func (pm *PeerManager) Register(ctx context.Context, peerCreateRequest *types.PeerCreateRequest) (peerCreateResponse *types.PeerCreateResponse, err error) { + if peerCreateRequest == nil { + return nil, errors.Wrap(errorType.ErrEmptyValue, "peer create request") + } + + ipString := peerCreateRequest.IP.String() + if !util.IsValidIP(ipString) { + return nil, errors.Wrapf(errorType.ErrInvalidValue, "peer IP: %s", ipString) + } + + id := generatePeerID(peerCreateRequest) + peerInfo := &types.PeerInfo{ + ID: id, + IP: peerCreateRequest.IP, + HostName: peerCreateRequest.HostName, + Port: peerCreateRequest.Port, + Version: peerCreateRequest.Version, + Created: strfmt.DateTime(time.Now()), + } + pm.peerStore.Put(id, peerInfo) + + return &types.PeerCreateResponse{ + ID: id, + }, nil +} + +// DeRegister a peer from p2p network. +func (pm *PeerManager) DeRegister(ctx context.Context, peerID string) error { + if _, err := pm.getPeerInfo(peerID); err != nil { + return err + } + + pm.peerStore.Delete(peerID) + return nil +} + +// Get returns the peerInfo of the specified peerID. +func (pm *PeerManager) Get(ctx context.Context, peerID string) (*types.PeerInfo, error) { + return pm.getPeerInfo(peerID) +} + +// List returns all filtered peerInfo by filter. +func (pm *PeerManager) List(ctx context.Context, filter *PageFilter) ( + peerList []*types.PeerInfo, err error) { + + // when filter is nil, return all values. + if filter == nil { + listResult := pm.peerStore.List() + peerList, err = pm.assertPeerInfoSlice(listResult) + if err != nil { + return nil, err + } + return + } + + // validate the filter + err = validateFilter(filter, nil) + if err != nil { + return nil, err + } + + listResult := pm.peerStore.List() + + // For PeerInfo, there is no need to sort by field; + // and the order of insertion is used by default. + less := getLessFunc(listResult, isDESC(filter.sortDirect)) + peerList, err = pm.assertPeerInfoSlice(getPageValues(listResult, filter.pageNum, filter.pageSize, less)) + + return +} + +// getPeerInfo gets peer info with specified peerID and +// returns the underlying PeerInfo value. +func (pm *PeerManager) getPeerInfo(peerID string) (*types.PeerInfo, error) { + // return error if peerID is empty + if util.IsEmptyStr(peerID) { + return nil, errors.Wrap(errorType.ErrEmptyValue, "peerID") + } + + // get value form store + v, err := pm.peerStore.Get(peerID) + if err != nil { + return nil, err + } + + // type assertion + if info, ok := v.(*types.PeerInfo); ok { + return info, nil + } + return nil, errors.Wrapf(errorType.ErrConvertFailed, "peerID %s: %v", peerID, v) +} + +func (pm *PeerManager) assertPeerInfoSlice(s []interface{}) ([]*types.PeerInfo, error) { + peerInfos := make([]*types.PeerInfo, 0) + for _, v := range s { + // type assertion + info, ok := v.(*types.PeerInfo) + if !ok { + return nil, errors.Wrapf(errorType.ErrConvertFailed, "value %v", v) + } + peerInfos = append(peerInfos, info) + } + return peerInfos, nil +} + +func getLessFunc(listResult []interface{}, desc bool) (less func(i, j int) bool) { + lessTemp := func(i, j int) bool { + peeri, ok := listResult[i].(*types.PeerInfo) + if !ok { + return false + } + peerj := listResult[j].(*types.PeerInfo) + if !ok { + return false + } + return time.Time(peeri.Created).Before(time.Time(peerj.Created)) + } + if desc { + less = func(i, j int) bool { + return lessTemp(j, i) + } + } + + if less == nil { + less = lessTemp + } + return +} + +// generatePeerID generates an ID with hostname and ip. +// Use timestamp to ensure the uniqueness. +func generatePeerID(peerInfo *types.PeerCreateRequest) string { + return fmt.Sprintf("%s-%s-%d", peerInfo.HostName.String(), peerInfo.IP.String(), time.Now().UnixNano()) } diff --git a/supernode/daemon/mgr/peer_mgr_test.go b/supernode/daemon/mgr/peer_mgr_test.go new file mode 100644 index 000000000..f117d1fa6 --- /dev/null +++ b/supernode/daemon/mgr/peer_mgr_test.go @@ -0,0 +1,179 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mgr + +import ( + "context" + "time" + + "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/supernode/errors" + + "github.com/go-check/check" +) + +func init() { + check.Suite(&PeerMgrTestSuite{}) +} + +type PeerMgrTestSuite struct { +} + +func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) { + peerManager, _ := NewPeerManager() + + // register + request := &types.PeerCreateRequest{ + IP: "192.168.10.11", + HostName: "foo", + Port: 65001, + Version: "v0.3.0", + } + resp, err := peerManager.Register(context.Background(), request) + c.Check(err, check.IsNil) + + // get + id := resp.ID + info, err := peerManager.Get(context.Background(), id) + c.Check(err, check.IsNil) + expected := &types.PeerInfo{ + ID: id, + IP: request.IP, + HostName: request.HostName, + Port: request.Port, + Version: request.Version, + Created: info.Created, + } + c.Check(info, check.DeepEquals, expected) + + // list + infoList, err := peerManager.List(context.Background(), nil) + c.Check(err, check.IsNil) + c.Check(infoList, check.DeepEquals, []*types.PeerInfo{expected}) + + // deRegister + err = peerManager.DeRegister(context.Background(), id) + c.Check(err, check.IsNil) + + // get + info, err = peerManager.Get(context.Background(), id) + c.Check(errors.IsDataNotFound(err), check.Equals, true) + c.Check(info, check.IsNil) +} + +func (s *PeerMgrTestSuite) TestGet(c *check.C) { + peerManager, _ := NewPeerManager() + + // register + request := &types.PeerCreateRequest{ + IP: "192.168.10.11", + HostName: "foo", + Port: 65001, + Version: "v0.3.0", + } + resp, err := peerManager.Register(context.Background(), request) + c.Check(err, check.IsNil) + + // get with empty peerID + info, err := peerManager.Get(context.Background(), "") + c.Check(errors.IsEmptyValue(err), check.Equals, true) + c.Check(info, check.IsNil) + + // get with not exist peerID + info, err = peerManager.Get(context.Background(), "fooerror") + c.Check(errors.IsDataNotFound(err), check.Equals, true) + c.Check(info, check.IsNil) + + // get normally + id := resp.ID + info, err = peerManager.Get(context.Background(), id) + c.Check(err, check.IsNil) + expected := &types.PeerInfo{ + ID: id, + IP: request.IP, + HostName: request.HostName, + Port: request.Port, + Version: request.Version, + Created: info.Created, + } + c.Check(info, check.DeepEquals, expected) +} + +func (s *PeerMgrTestSuite) TestList(c *check.C) { + peerManager, _ := NewPeerManager() + // the first data + request := &types.PeerCreateRequest{ + IP: "192.168.10.11", + HostName: "foo", + Port: 65001, + Version: "v0.3.0", + } + resp, err := peerManager.Register(context.Background(), request) + id := resp.ID + info, err := peerManager.Get(context.Background(), id) + + // the second data + request = &types.PeerCreateRequest{ + IP: "192.168.10.11", + HostName: "foo2", + Port: 65001, + Version: "v0.3.0", + } + resp, err = peerManager.Register(context.Background(), request) + id = resp.ID + info2, err := peerManager.Get(context.Background(), id) + + // get all + infoList, err := peerManager.List(context.Background(), nil) + c.Check(err, check.IsNil) + interfaceSlice := make([]interface{}, len(infoList)) + for k, v := range infoList { + interfaceSlice[k] = v + } + c.Check(getPageValues(interfaceSlice, 0, 0, func(i, j int) bool { + peeri := interfaceSlice[i].(*types.PeerInfo) + peerj := interfaceSlice[j].(*types.PeerInfo) + return time.Time(peeri.Created).Before(time.Time(peerj.Created)) + }), check.DeepEquals, []interface{}{info, info2}) + + // get with pageNum=0 && pageSize=1 && sortDirect=asc + infoList, err = peerManager.List(context.Background(), &PageFilter{ + pageNum: 0, + pageSize: 1, + sortDirect: "asc", + }) + c.Check(err, check.IsNil) + c.Check(infoList, check.DeepEquals, []*types.PeerInfo{info}) + + // get with pageNum=0 && pageSize=1 && sortDirect=desc + infoList, err = peerManager.List(context.Background(), &PageFilter{ + pageNum: 0, + pageSize: 1, + sortDirect: "desc", + }) + c.Check(err, check.IsNil) + c.Check(infoList, check.DeepEquals, []*types.PeerInfo{info2}) + + // get with pageNum=1 && pageSize=1 && sortDirect=asc + infoList, err = peerManager.List(context.Background(), &PageFilter{ + pageNum: 1, + pageSize: 1, + sortDirect: "asc", + }) + c.Check(err, check.IsNil) + c.Check(infoList, check.DeepEquals, []*types.PeerInfo{info2}) +} diff --git a/supernode/daemon/mgr/store.go b/supernode/daemon/mgr/store.go new file mode 100644 index 000000000..6903e453e --- /dev/null +++ b/supernode/daemon/mgr/store.go @@ -0,0 +1,60 @@ +package mgr + +import ( + "sync" + + errorType "github.com/dragonflyoss/Dragonfly/supernode/errors" + + "github.com/pkg/errors" +) + +// Store maintains some metadata information in memory. +type Store struct { + metaMap sync.Map +} + +// NewStore returns a new Store. +func NewStore() *Store { + return &Store{} +} + +// Put a key-value pair into the store. +func (s *Store) Put(key string, value interface{}) error { + s.metaMap.Store(key, value) + return nil +} + +// Get a key-value pair from the store. +func (s *Store) Get(key string) (interface{}, error) { + v, ok := s.metaMap.Load(key) + if !ok { + return nil, errors.Wrapf(errorType.ErrDataNotFound, "key: %v", key) + } + + return v, nil +} + +// Delete a key-value pair from the store with sepcified key. +func (s *Store) Delete(key string) error { + _, ok := s.metaMap.Load(key) + if !ok { + return errors.Wrapf(errorType.ErrDataNotFound, "key: %v", key) + } + + s.metaMap.Delete(key) + + return nil +} + +// List returns all key-value pairs in the store. +// And the order of results is random. +func (s *Store) List() []interface{} { + metaSlice := make([]interface{}, 0) + rangeFunc := func(key, value interface{}) bool { + metaSlice = append(metaSlice, value) + return true + } + s.metaMap.Range(rangeFunc) + + return metaSlice +} diff --git a/supernode/daemon/mgr/store_test.go b/supernode/daemon/mgr/store_test.go new file mode 100644 index 000000000..cfda2b4e2 --- /dev/null +++ b/supernode/daemon/mgr/store_test.go @@ -0,0 +1,112 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mgr + +import ( + "testing" + + "github.com/dragonflyoss/Dragonfly/supernode/errors" + + "github.com/go-check/check" +) + +func Test(t *testing.T) { + check.TestingT(t) +} + +func init() { + check.Suite(&StoreTestSuite{}) +} + +type StoreTestSuite struct { +} + +type SortStruct struct { + intField int + stringField string +} + +func (s *StoreTestSuite) TestStore(c *check.C) { + store := NewStore() + + cases := []SortStruct{ + {1, "c"}, + {3, "a"}, + {2, "b"}, + } + + for _, v := range cases { + err := store.Put(v.stringField, v) + c.Check(err, check.IsNil) + + value, err := store.Get(v.stringField) + c.Check(err, check.IsNil) + ss, ok := value.(SortStruct) + c.Check(ok, check.Equals, true) + c.Check(ss, check.DeepEquals, v) + } + + result := store.List() + pageResult := getPageValues(result, 0, 0, func(i, j int) bool { + tempA := result[i].(SortStruct) + tempB := result[j].(SortStruct) + return tempA.intField < tempB.intField + }) + c.Check(pageResult, check.DeepEquals, []interface{}{ + SortStruct{1, "c"}, + SortStruct{2, "b"}, + SortStruct{3, "a"}, + }) + + pageResult = getPageValues(result, 0, 0, func(i, j int) bool { + tempA := result[i].(SortStruct) + tempB := result[j].(SortStruct) + return tempA.stringField < tempB.stringField + }) + c.Check(pageResult, check.DeepEquals, []interface{}{ + SortStruct{3, "a"}, + SortStruct{2, "b"}, + SortStruct{1, "c"}, + }) + + pageResult = getPageValues(result, 0, 2, func(i, j int) bool { + tempA := result[i].(SortStruct) + tempB := result[j].(SortStruct) + return tempA.intField < tempB.intField + }) + c.Check(pageResult, check.DeepEquals, []interface{}{ + SortStruct{1, "c"}, + SortStruct{2, "b"}, + }) + + pageResult = getPageValues(result, 1, 2, func(i, j int) bool { + tempA := result[i].(SortStruct) + tempB := result[j].(SortStruct) + return tempA.intField < tempB.intField + }) + c.Check(pageResult, check.DeepEquals, []interface{}{ + SortStruct{3, "a"}, + }) + + for _, v := range cases { + err := store.Delete(v.stringField) + c.Check(err, check.IsNil) + _, err = store.Get(v.stringField) + c.Check(errors.IsDataNotFound(err), check.Equals, true) + } + +} diff --git a/supernode/errors/errors.go b/supernode/errors/errors.go new file mode 100644 index 000000000..82dbc8169 --- /dev/null +++ b/supernode/errors/errors.go @@ -0,0 +1,111 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package errors defines all exceptions happened in dfget's runtime. +package errors + +import ( + errHandler "github.com/pkg/errors" +) + +var ( + // ErrDataNotFound represents the data cannot be found.. + ErrDataNotFound = SuperError{codeDataNotFound, "data not found"} + + // ErrInvalidValue represents the value is invalid. + ErrInvalidValue = SuperError{codeInvalidValue, "invalid value"} + + // ErrEmptyValue represents the value is empty or nil. + ErrEmptyValue = SuperError{codeEmptyValue, "empty value"} + + // ErrNotInitialized represents the object is not initialized. + ErrNotInitialized = SuperError{codeNotInitialized, "not initialized"} + + // ErrConvertFailed represents failed to convert. + ErrConvertFailed = SuperError{codeConvertFailed, "convert failed"} + + // ErrRangeNotSatisfiable represents the length of file is insufficient. + ErrRangeNotSatisfiable = SuperError{codeRangeNotSatisfiable, "range not satisfiable"} + + // ErrSystemError represents the error is a system error.. + ErrSystemError = SuperError{codeSystemError, "system error"} +) + +const ( + codeDataNotFound = iota + codeEmptyValue + codeInvalidValue + codeNotInitialized + codeConvertFailed + codeRangeNotSatisfiable + codeSystemError +) + +// SuperError represents a error created by supernode. +type SuperError struct { + Code int + Msg string +} + +func (s SuperError) Error() string { + return s.Msg +} + +// IsNilError check the error is nil or not. +func IsNilError(err error) bool { + return err == nil +} + +// IsDataNotFound check the error is the data cannot be found. +func IsDataNotFound(err error) bool { + return checkError(err, codeDataNotFound) +} + +// IsEmptyValue check the error is the value is empty or nil. +func IsEmptyValue(err error) bool { + return checkError(err, codeEmptyValue) +} + +// IsInvalidValue check the error is the value is invalid or not. +func IsInvalidValue(err error) bool { + return checkError(err, codeInvalidValue) +} + +// IsNotInitialized check the error is the object is not initialized or not. +func IsNotInitialized(err error) bool { + return checkError(err, codeNotInitialized) +} + +// IsConvertFailed check the error is a conversion error or not. +func IsConvertFailed(err error) bool { + return checkError(err, codeConvertFailed) +} + +// IsRangeNotSatisfiable check the error is a +// range not exist error or not. +func IsRangeNotSatisfiable(err error) bool { + return checkError(err, codeRangeNotSatisfiable) +} + +// IsSystemError check the error is a system error or not. +func IsSystemError(err error) bool { + return checkError(err, codeSystemError) +} + +func checkError(err error, code int) bool { + e, ok := errHandler.Cause(err).(SuperError) + return ok && e.Code == code +} diff --git a/supernode/result/result_info.go b/supernode/result/result_info.go new file mode 100644 index 000000000..88b8295ee --- /dev/null +++ b/supernode/result/result_info.go @@ -0,0 +1,68 @@ +package server + +import ( + "fmt" + + "github.com/dragonflyoss/Dragonfly/common/constants" + "github.com/dragonflyoss/Dragonfly/supernode/errors" +) + +// ResultInfo identify a struct that will returned to the client. +type ResultInfo struct { + code int + msg string + data interface{} +} + +// NewResultInfoWithError returns a new ResultInfo with error only. +// And it will fill the result code according to the type of error. +func NewResultInfoWithError(err error) ResultInfo { + if errors.IsEmptyValue(err) || + errors.IsInvalidValue(err) { + return NewResultInfoWithCodeError(constants.CodeParamError, err) + } + + if errors.IsDataNotFound(err) { + return NewResultInfoWithCodeError(constants.CodeTargetNotFound, err) + } + + // IsConvertFailed + return NewResultInfoWithCodeError(constants.CodeSystemError, err) +} + +// NewResultInfoWithCodeError returns a new ResultInfo with code and error. +// And it will get the err.Error() as the value of ResultInfo.msg. +func NewResultInfoWithCodeError(code int, err error) ResultInfo { + msg := err.Error() + return NewResultInfoWithCodeMsg(code, msg) +} + +// NewResultInfoWithCode returns a new ResultInfo with code +// and it will get the default msg corresponding to the code as the value of ResultInfo.msg. +func NewResultInfoWithCode(code int) ResultInfo { + msg := constants.GetMsgByCode(code) + return NewResultInfoWithCodeMsg(code, msg) +} + +// NewResultInfoWithCodeMsg returns a new ResultInfo with code and specified msg. +func NewResultInfoWithCodeMsg(code int, msg string) ResultInfo { + return NewResultInfo(code, msg, nil) +} + +// NewResultInfo returns a new ResultInfo. +func NewResultInfo(code int, msg string, data interface{}) ResultInfo { + return ResultInfo{ + code: code, + msg: msg, + data: data, + } +} + +func (r ResultInfo) Error() string { + return fmt.Sprintf("{\"Code\":%d,\"Msg\":\"%s\"}", r.code, r.msg) +} + +// SuccessCode return whether the code equals SuccessCode. +func (r ResultInfo) SuccessCode() bool { + return r.code == constants.Success +} diff --git a/supernode/server/peer_bridge.go b/supernode/server/peer_bridge.go new file mode 100644 index 000000000..649ab532c --- /dev/null +++ b/supernode/server/peer_bridge.go @@ -0,0 +1,70 @@ +package server + +import ( + "context" + "encoding/json" + "net/http" + + "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" + errTypes "github.com/dragonflyoss/Dragonfly/supernode/errors" + + "github.com/go-openapi/strfmt" + "github.com/gorilla/mux" + "github.com/pkg/errors" +) + +func (s *Server) registerPeer(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) { + reader := req.Body + request := &types.PeerCreateRequest{} + if err := json.NewDecoder(reader).Decode(request); err != nil { + return errors.Wrap(errTypes.ErrInvalidValue, err.Error()) + } + + if err := request.Validate(strfmt.NewFormats()); err != nil { + return errors.Wrap(errTypes.ErrInvalidValue, err.Error()) + } + + resp, err := s.PeerMgr.Register(ctx, request) + if err != nil { + return err + } + return EncodeResponse(rw, http.StatusCreated, resp) +} + +// TODO: update the progress info. +func (s *Server) deRegisterPeer(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) { + id := mux.Vars(req)["id"] + + if err = s.PeerMgr.DeRegister(ctx, id); err != nil { + return err + } + + rw.WriteHeader(http.StatusOK) + return nil +} + +func (s *Server) getPeer(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) { + id := mux.Vars(req)["id"] + + peer, err := s.PeerMgr.Get(ctx, id) + if err != nil { + return err + } + + return EncodeResponse(rw, http.StatusOK, peer) +} + +// TODO: parse filter +func (s *Server) listPeers(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) { + filter, err := mgr.ParseFilter(req, nil) + if err != nil { + return err + } + peerList, err := s.PeerMgr.List(ctx, filter) + if err != nil { + return err + } + + return EncodeResponse(rw, http.StatusOK, peerList) +} diff --git a/supernode/server/router.go b/supernode/server/router.go index a2b4928ca..c6b71a21e 100644 --- a/supernode/server/router.go +++ b/supernode/server/router.go @@ -7,6 +7,7 @@ import ( "net/http/pprof" "github.com/dragonflyoss/Dragonfly/apis/types" + result "github.com/dragonflyoss/Dragonfly/supernode/result" "github.com/gorilla/mux" ) @@ -20,6 +21,12 @@ func initRoute(s *Server) *mux.Router { handlers := []*HandlerSpec{ // system {Method: http.MethodGet, Path: "/_ping", HandlerFunc: s.ping}, + + // peer + {Method: http.MethodPost, Path: "/peers", HandlerFunc: s.registerPeer}, + {Method: http.MethodDelete, Path: "/peers/{id}", HandlerFunc: s.deRegisterPeer}, + {Method: http.MethodGet, Path: "/peers/{id}", HandlerFunc: s.getPeer}, + {Method: http.MethodGet, Path: "/peers", HandlerFunc: s.listPeers}, } // register API @@ -82,7 +89,7 @@ func HandleErrorResponse(w http.ResponseWriter, err error) { // By default, daemon side returns code 500 if error happens. code = http.StatusInternalServerError - errMsg = err.Error() + errMsg = result.NewResultInfoWithError(err).Error() w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) diff --git a/supernode/server/server.go b/supernode/server/server.go index c63a59ffc..0d6afa84b 100644 --- a/supernode/server/server.go +++ b/supernode/server/server.go @@ -7,19 +7,26 @@ import ( "time" "github.com/dragonflyoss/Dragonfly/supernode/config" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" "github.com/sirupsen/logrus" ) // Server is server instance. type Server struct { - Config *config.Config + Config *config.Config + PeerMgr mgr.PeerMgr } // New creates a brand new server instance. func New(cfg *config.Config) (*Server, error) { + peerMgr, err := mgr.NewPeerManager() + if err != nil { + return nil, err + } return &Server{ - Config: cfg, + Config: cfg, + PeerMgr: peerMgr, }, nil }