Recover from transient gossip failures
Currently, if there is a transient gossip failure in any node, the
recovery process depends on other nodes propagating the information
indirectly. If these transient failures affect all the nodes that this
node has in its memberlist, then this node will be permanently cut off
from the gossip channel. Added node state management code in networkdb
to address these problems by trying to rejoin the cluster via the
failed nodes when there is a failure. This also necessitates adding new
messages, called node event messages, to differentiate between node
leave and node failure.

Signed-off-by: Jana Radhakrishnan <[email protected]>
mrjana committed Sep 19, 2016
1 parent 971c5e0 commit 716810d
Showing 7 changed files with 685 additions and 90 deletions.
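Only two of the seven changed files are expanded in this view; the node state management described above lives mostly in the unexpanded ones. As a rough illustration of the leave-versus-failure distinction, the following is a minimal sketch of how a receiver might handle the new node event messages. The handler name and the failedNodes/leftNodes maps are assumptions for illustration, not the actual implementation.

// Illustrative sketch only (assumed names: handleNodeEvent, failedNodes,
// leftNodes). A node that broadcasts NodeEventTypeLeave is recorded as
// having left cleanly; a join clears any failure bookkeeping.
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) {
    nDB.Lock()
    defer nDB.Unlock()

    switch nEvent.Type {
    case NodeEventTypeLeave:
        // The peer announced its departure, so a later memberlist
        // down notification for it is a clean exit, not a failure.
        if n, ok := nDB.nodes[nEvent.NodeName]; ok {
            nDB.leftNodes[nEvent.NodeName] = n
        }
    case NodeEventTypeJoin:
        // The peer is (re)joining: drop any failed/left bookkeeping.
        delete(nDB.failedNodes, nEvent.NodeName)
        delete(nDB.leftNodes, nEvent.NodeName)
    }
}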
52 changes: 52 additions & 0 deletions networkdb/broadcast.go
@@ -1,10 +1,15 @@
package networkdb

import (
"fmt"
"time"

"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
)

const broadcastTimeout = 5 * time.Second

type networkEventMessage struct {
id string
node string
@@ -44,6 +49,53 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim
return nil
}

type nodeEventMessage struct {
msg []byte
notify chan<- struct{}
}

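// Invalidates always returns false so that a queued node event is never
// dropped in favor of a newer one; every event gets gossiped.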
func (m *nodeEventMessage) Invalidates(other memberlist.Broadcast) bool {
return false
}

func (m *nodeEventMessage) Message() []byte {
return m.msg
}

func (m *nodeEventMessage) Finished() {
if m.notify != nil {
close(m.notify)
}
}

func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error {
nEvent := NodeEvent{
Type: event,
LTime: nDB.networkClock.Increment(),
NodeName: nDB.config.NodeName,
}

raw, err := encodeMessage(MessageTypeNodeEvent, &nEvent)
if err != nil {
return err
}

notifyCh := make(chan struct{})
nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
msg: raw,
notify: notifyCh,
})

// Wait for the broadcast
select {
case <-notifyCh:
case <-time.After(broadcastTimeout):
return fmt.Errorf("timed out broadcasting node event")
}

return nil
}
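sendNodeEvent only queues the message on nDB.nodeBroadcasts, a second memberlist.TransmitLimitedQueue that clusterInit sets up in cluster.go below. memberlist puts queued broadcasts on the wire by calling the delegate's GetBroadcasts hook; the delegate itself is in delegate.go, which is not expanded in this view. A minimal sketch of how it might drain both the node event queue and the existing network event queue, assuming a delegate type that wraps the NetworkDB:

// Sketch under stated assumptions; not the actual delegate.go.
type delegate struct {
    nDB *NetworkDB
}

func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
    // Drain node events ahead of network events so membership news
    // (join/leave/failure) propagates first.
    msgs := d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)
    msgs = append(msgs, d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)...)
    return msgs
}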

type tableEventMessage struct {
id string
tname string
105 changes: 94 additions & 11 deletions networkdb/cluster.go
@@ -7,14 +7,19 @@ import (
"fmt"
"math/big"
rnd "math/rand"
"net"
"strings"
"time"

"github.com/Sirupsen/logrus"
"github.com/hashicorp/memberlist"
)

-const reapInterval = 30 * time.Second
+const (
+reapInterval = 60 * time.Second
+reapPeriod = 5 * time.Second
+retryInterval = 1 * time.Second
+)

type logWriter struct{}

@@ -111,6 +116,13 @@ func (nDB *NetworkDB) clusterInit() error {
RetransmitMult: config.RetransmitMult,
}

nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(nDB.nodes)
},
RetransmitMult: config.RetransmitMult,
}

mlist, err := memberlist.Create(config)
if err != nil {
return fmt.Errorf("failed to create memberlist: %v", err)
@@ -124,9 +136,10 @@ func (nDB *NetworkDB) clusterInit() error {
interval time.Duration
fn func()
}{
-{reapInterval, nDB.reapState},
+{reapPeriod, nDB.reapState},
{config.GossipInterval, nDB.gossip},
{config.PushPullInterval, nDB.bulkSyncTables},
{retryInterval, nDB.reconnectNode},
} {
t := time.NewTicker(trigger.interval)
go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
@@ -136,19 +149,49 @@ func (nDB *NetworkDB) clusterInit() error {
return nil
}

func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
t := time.NewTicker(retryInterval)
defer t.Stop()

for {
select {
case <-t.C:
if _, err := nDB.memberlist.Join(members); err != nil {
logrus.Errorf("Failed to join memberlist %s on retry: %v", members, err)
continue
}
return
case <-stop:
return
}
}
}

func (nDB *NetworkDB) clusterJoin(members []string) error {
mlist := nDB.memberlist

if _, err := mlist.Join(members); err != nil {
// In case of failure, keep retrying the join until it succeeds or the cluster is shut down.
go nDB.retryJoin(members, nDB.stopCh)

return fmt.Errorf("could not join node to memberlist: %v", err)
}

if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
return fmt.Errorf("failed to send node join: %v", err)
}

return nil
}

func (nDB *NetworkDB) clusterLeave() error {
mlist := nDB.memberlist

if err := nDB.sendNodeEvent(NodeEventTypeLeave); err != nil {
return fmt.Errorf("failed to send node leave: %v", err)
}

if err := mlist.Leave(time.Second); err != nil {
return err
}
@@ -180,6 +223,42 @@ func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, sto
}
}

func (nDB *NetworkDB) reconnectNode() {
nDB.RLock()
if len(nDB.failedNodes) == 0 {
nDB.RUnlock()
return
}

nodes := make([]*node, 0, len(nDB.failedNodes))
for _, n := range nDB.failedNodes {
nodes = append(nodes, n)
}
nDB.RUnlock()

// Update all the local state to a new time to force an update on
// the node we are trying to rejoin, just in case that node
// still has these in leaving/deleting state. This is to
// facilitate fast convergence after recovering from a gossip
// failure.
nDB.updateLocalStateTime()

node := nodes[randomOffset(len(nodes))]
addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}

if _, err := nDB.memberlist.Join([]string{addr.String()}); err != nil {
return
}

if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
logrus.Errorf("failed to send node join during reconnect: %v", err)
return
}

logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
nDB.bulkSync([]string{node.Name}, true)
}
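reconnectNode consumes nDB.failedNodes, but nothing expanded in this view populates it. Presumably the memberlist event delegate (event_delegate.go, one of the unexpanded files) records a peer as failed when memberlist reports it down without a preceding node leave event. A minimal sketch under that assumption, with illustrative names:

// Illustrative sketch, not the actual event_delegate.go.
type eventDelegate struct {
    nDB *NetworkDB
}

func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
    e.nDB.Lock()
    defer e.nDB.Unlock()

    n, ok := e.nDB.nodes[mn.Name]
    if !ok {
        return
    }
    delete(e.nDB.nodes, mn.Name)

    if _, left := e.nDB.leftNodes[mn.Name]; left {
        // A node leave event was seen for this peer: clean exit.
        delete(e.nDB.leftNodes, mn.Name)
        return
    }

    // No leave event was seen: treat this as a transient failure and
    // keep the node around so reconnectNode can rejoin through it.
    e.nDB.failedNodes[mn.Name] = n
}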

func (nDB *NetworkDB) reapState() {
nDB.reapNetworks()
nDB.reapTableEntries()
@@ -288,7 +367,7 @@ func (nDB *NetworkDB) gossip() {
}

// Send the compound message
-if err := nDB.memberlist.SendToUDP(mnode, compound); err != nil {
+if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil {
logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
}
}
@@ -323,7 +402,7 @@ func (nDB *NetworkDB) bulkSyncTables() {
continue
}

-completed, err := nDB.bulkSync(nid, nodes, false)
+completed, err := nDB.bulkSync(nodes, false)
if err != nil {
logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err)
continue
@@ -350,7 +429,7 @@ func (nDB *NetworkDB) bulkSyncTables() {
}
}

-func (nDB *NetworkDB) bulkSync(nid string, nodes []string, all bool) ([]string, error) {
+func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
if !all {
// If not all, then just pick one.
nodes = nDB.mRandomNodes(1, nodes)
@@ -388,7 +467,12 @@ func (nDB *NetworkDB) bulkSync(nid string, nodes []string, all bool) ([]string,
func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error {
var msgs [][]byte

logrus.Debugf("%s: Initiating bulk sync for networks %v with node %s", nDB.config.NodeName, networks, node)
var unsolMsg string
if unsolicited {
unsolMsg = "unsolicited"
}

logrus.Debugf("%s: Initiating %s bulk sync for networks %v with node %s", nDB.config.NodeName, unsolMsg, networks, node)

nDB.RLock()
mnode := nDB.nodes[node]
@@ -404,15 +488,14 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
return false
}

-// Do not bulk sync state which is in the
-// process of getting deleted.
+eType := TableEventTypeCreate
if entry.deleting {
-return false
+eType = TableEventTypeDelete
}

params := strings.Split(path[1:], "/")
tEvent := TableEvent{
-Type: TableEventTypeCreate,
+Type: eType,
LTime: entry.ltime,
NodeName: entry.node,
NetworkID: nid,
@@ -454,7 +537,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
nDB.bulkSyncAckTbl[node] = ch
nDB.Unlock()

-err = nDB.memberlist.SendToTCP(mnode, buf)
+err = nDB.memberlist.SendToTCP(&mnode.Node, buf)
if err != nil {
nDB.Lock()
delete(nDB.bulkSyncAckTbl, node)
