Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1362 from antsystem/feat/add-hash-circler-superno…
Browse files Browse the repository at this point in the history
…de-locator

add hash circler locator
  • Loading branch information
lowzj authored Jun 8, 2020
2 parents 07b4ad9 + 0fdd9e7 commit 80eae52
Show file tree
Hide file tree
Showing 8 changed files with 727 additions and 0 deletions.
191 changes: 191 additions & 0 deletions dfget/locator/hashcircler_locator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package locator

import (
"context"
"fmt"
"sort"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/pkg/algorithm"
"github.com/dragonflyoss/Dragonfly/pkg/hashcircler"
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
"github.com/dragonflyoss/Dragonfly/pkg/queue"

"github.com/sirupsen/logrus"
)

const (
addEv = "add"
deleteEv = "delete"
)

type SuperNodeEvent struct {
evType string
node string
}

func NewEnableEvent(node string) *SuperNodeEvent {
return &SuperNodeEvent{
evType: addEv,
node: node,
}
}

func NewDisableEvent(node string) *SuperNodeEvent {
return &SuperNodeEvent{
evType: deleteEv,
node: node,
}
}

// hashCirclerLocator is an implementation of SupernodeLocator. And it provides ability to select a supernode
// by input key. It allows some supernodes disabled, on this condition the disable supernode will not be selected.
type hashCirclerLocator struct {
hc hashcircler.HashCircler
nodes []string
groupName string
group *SupernodeGroup

// evQueue will puts/polls SuperNodeEvent to disable/enable supernode.
evQueue queue.Queue
}

func NewHashCirclerLocator(groupName string, nodes []string, eventQueue queue.Queue) (SupernodeLocator, error) {
nodes = algorithm.DedupStringArr(nodes)
if len(nodes) == 0 {
return nil, fmt.Errorf("nodes should not be nil")
}

sort.Strings(nodes)

group := &SupernodeGroup{
Name: groupName,
Nodes: []*Supernode{},
Infos: make(map[string]string),
}
keys := []string{}
for _, node := range nodes {
ip, port := netutils.GetIPAndPortFromNode(node, config.DefaultSupernodePort)
if ip == "" {
continue
}
supernode := &Supernode{
Schema: config.DefaultSupernodeSchema,
IP: ip,
Port: port,
GroupName: groupName,
}

group.Nodes = append(group.Nodes, supernode)
keys = append(keys, supernode.String())
}

hc, err := hashcircler.NewConsistentHashCircler(keys, nil)
if err != nil {
return nil, err
}

h := &hashCirclerLocator{
hc: hc,
evQueue: eventQueue,
groupName: groupName,
group: group,
}

go h.eventLoop(context.Background())

return h, nil
}

func (h *hashCirclerLocator) Get() *Supernode {
// not implementation
return nil
}

func (h *hashCirclerLocator) Next() *Supernode {
// not implementation
return nil
}

func (h *hashCirclerLocator) Select(key interface{}) *Supernode {
s, err := h.hc.Hash(key.(string))
if err != nil {
logrus.Errorf("failed to get supernode: %v", err)
return nil
}

for _, sp := range h.group.Nodes {
if s == sp.String() {
return sp
}
}

return nil
}

func (h *hashCirclerLocator) GetGroup(name string) *SupernodeGroup {
if h.group == nil || h.group.Name != name {
return nil
}

return h.group
}

func (h *hashCirclerLocator) All() []*SupernodeGroup {
return []*SupernodeGroup{h.group}
}

func (h *hashCirclerLocator) Size() int {
return len(h.group.Nodes)
}

func (h *hashCirclerLocator) Report(node string, metrics *SupernodeMetrics) {
return
}

func (h *hashCirclerLocator) Refresh() bool {
return true
}

func (h *hashCirclerLocator) eventLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
}

if ev, ok := h.evQueue.PollTimeout(time.Second); ok {
h.handleEvent(ev.(*SuperNodeEvent))
}
}
}

func (h *hashCirclerLocator) handleEvent(ev *SuperNodeEvent) {
switch ev.evType {
case addEv:
h.hc.Add(ev.node)
case deleteEv:
h.hc.Delete(ev.node)
default:
}

return
}
125 changes: 125 additions & 0 deletions dfget/locator/hashcircler_locator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package locator

import (
"time"

"github.com/dragonflyoss/Dragonfly/pkg/queue"

"github.com/go-check/check"
)

type hashCirclerLocatorTestSuite struct {
}

func init() {
check.Suite(&hashCirclerLocatorTestSuite{})
}

var testGroupName1 = "test-group1"

func (s *hashCirclerLocatorTestSuite) TestHashCirclerLocator(c *check.C) {
evQ := queue.NewQueue(0)
nodes := []string{"1.1.1.1:8002", "2.2.2.2:8002", "3.3.3.3:8002"}
hl, err := NewHashCirclerLocator(testGroupName1, nodes, evQ)
c.Assert(err, check.IsNil)

c.Assert(hl.Get(), check.IsNil)
c.Assert(hl.Next(), check.IsNil)

groups := hl.All()
c.Assert(len(groups), check.Equals, 1)
c.Assert(len(groups[0].Nodes), check.Equals, 3)
c.Assert(groups[0].Nodes[0].String(), check.Equals, nodes[0])
c.Assert(groups[0].Nodes[1].String(), check.Equals, nodes[1])
c.Assert(groups[0].Nodes[2].String(), check.Equals, nodes[2])

keys := []string{"x", "y", "z", "a", "b", "c", "m", "n", "p", "q", "j", "k", "i", "e", "f", "g"}
originSp := make([]string, len(keys))

for i, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
originSp[i] = sp.String()
}

// select again, the supernode should be equal
for i, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
c.Assert(originSp[i], check.Equals, sp.String())
}

// disable nodes[0]
evQ.Put(NewDisableEvent(nodes[0]))
time.Sleep(time.Second * 2)
// select again, if originSp is not nodes[0], it should not be changed.
for i, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
if originSp[i] == nodes[0] {
c.Assert(originSp[i], check.Not(check.Equals), sp.String())
continue
}

c.Assert(originSp[i], check.Equals, sp.String())
}

// disable nodes[1]
evQ.Put(NewDisableEvent(nodes[1]))
time.Sleep(time.Second * 2)
// select again, all select node should be nodes[2]
for _, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
c.Assert(nodes[2], check.Equals, sp.String())
}

// enable nodes[0], disable nodes[2]
evQ.Put(NewDisableEvent(nodes[2]))
evQ.Put(NewEnableEvent(nodes[0]))
time.Sleep(time.Second * 2)
for _, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
c.Assert(nodes[0], check.Equals, sp.String())
}

// enable nodes[1]
evQ.Put(NewEnableEvent(nodes[1]))
time.Sleep(time.Second * 2)
for i, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
if originSp[i] == nodes[2] {
c.Assert(originSp[i], check.Not(check.Equals), sp.String())
continue
}

c.Assert(originSp[i], check.Equals, sp.String())
}

// enable nodes[2], select node should be equal with origin one
evQ.Put(NewEnableEvent(nodes[2]))
time.Sleep(time.Second * 2)
for i, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
c.Assert(originSp[i], check.Equals, sp.String())
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/dragonflyoss/Dragonfly
go 1.12

require (
github.com/HuKeping/rbtree v0.0.0-20200208030951-29f0b79e84ed
github.com/PuerkitoBio/purell v0.0.0-20170829232023-f619812e3caf // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/asaskevich/govalidator v0.0.0-20170903095215-73945b6115bf // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/HuKeping/rbtree v0.0.0-20200208030951-29f0b79e84ed h1:YKqpA6qf8Bh73vj8Rv9SBB5OU558f2c1A889nCVUSLE=
github.com/HuKeping/rbtree v0.0.0-20200208030951-29f0b79e84ed/go.mod h1:bODsl3NElqKlgf1UkBLj67fYmY5DsqkKrrYm/kMT/6Y=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/PuerkitoBio/purell v0.0.0-20170829232023-f619812e3caf h1:ePmEKucT6HqNzbxw/yeyfoHplmyGDQUW76ppv4igW7Q=
github.com/PuerkitoBio/purell v0.0.0-20170829232023-f619812e3caf/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
Expand Down
22 changes: 22 additions & 0 deletions pkg/algorithm/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package algorithm

import (
"math/rand"
"sort"
"time"
)

Expand Down Expand Up @@ -100,3 +101,24 @@ func GCD(x, y int) int {
}
return x
}

// DedupStringArr removes duplicate string in array.
func DedupStringArr(input []string) []string {
if len(input) == 0 {
return []string{}
}

out := make([]string, len(input))
copy(out, input)
sort.Strings(out)

idx := 0
for i := 1; i < len(input); i++ {
if out[idx] != out[i] {
idx++
out[idx] = out[i]
}
}

return out[:idx+1]
}
Loading

0 comments on commit 80eae52

Please sign in to comment.