Skip to content

Commit

Permalink
connect to new nodes / disconnect from unused nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
thirstycrow committed Feb 19, 2017
1 parent 55573df commit cfeab77
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 12 deletions.
34 changes: 24 additions & 10 deletions src/main/scala/redis/RedisCluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,32 @@ import redis.protocol.RedisReply
import redis.util.CRC16

import scala.annotation.tailrec
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
import scala.concurrent.stm.Ref
import scala.concurrent.{Await, Future, Promise}
import scala.util.control.NonFatal


case class RedisCluster(redisServers: Seq[RedisServer],
name: String = "RedisClientPool")
name: String = "RedisClientPool",
password: Option[String] = None)
(implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher
) extends RedisClientPoolLike(_system, redisDispatcher) with RedisCommands {

val log = Logging.getLogger(_system, this)

override val redisServerConnections = {
redisServers.map { server =>
makeRedisConnection(server, defaultActive = true)
} toMap
override val redisServerConnections = collection.mutable.Map {
redisServers.map(makeConnection): _*
}
refreshConnections()

def makeConnection(server: RedisServer) = {
makeRedisConnection(
server = server.copy(password = password, db = None),
defaultActive = true
)
}

def equalsHostPort(clusterNode:ClusterNode,server:RedisServer) = {
clusterNode.host == server.host && clusterNode.port == server.port
Expand All @@ -55,15 +60,17 @@ case class RedisCluster(redisServers: Seq[RedisServer],

val clusterSlotsRef:Ref[Option[Map[ClusterSlot, RedisConnection]]] = Ref(Option.empty[Map[ClusterSlot, RedisConnection]])
val lockClusterSlots = Ref(true)
Await.result(asyncRefreshClusterSlots(force=true), Duration(10,TimeUnit.SECONDS))
Await.result(asyncRefreshClusterSlots(force=true), 10.seconds)

def getClusterSlots(): Future[Map[ClusterSlot, RedisConnection]] = {

def resolveClusterSlots(retry:Int): Future[Map[ClusterSlot, RedisConnection]] = {
clusterSlots().map { clusterSlots =>
clusterSlots.flatMap { clusterSlot =>
val maybeServerConnection = redisServerConnections.find { case (server, _) => equalsHostPort(clusterSlot.master, server) }
maybeServerConnection.map { case (_, redisConnection) => (clusterSlot, redisConnection) }
clusterSlots.map { clusterSlot =>
val server = clusterSlot.master.hostAndPort
val connection = redisServerConnections
.getOrElseUpdate(server, makeConnection(server)._2)
(clusterSlot, connection)
}.toMap
}.recoverWith {
case e =>
Expand All @@ -83,6 +90,13 @@ case class RedisCluster(redisServers: Seq[RedisServer],
getClusterSlots().map { clusterSlot =>
log.info("refreshClusterSlots: " + clusterSlot.toString())
clusterSlotsRef.single.set(Some(clusterSlot))
val serverSet = clusterSlot.keysIterator.map(_.master.hostAndPort).toSet
redisServerConnections.keys.foreach { server =>
if (!serverSet.contains(server)) {
redisServerConnections.remove(server)
//.map(connection => _system.stop(connection.actor))
}
}
lockClusterSlots.single.compareAndSet(true, false)
()
}.recoverWith {
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/redis/actors/RedisWorkerIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ abstract class RedisWorkerIO(val address: InetSocketAddress, onConnectStatus: Bo

override def postStop() {
log.info("RedisWorkerIO stop")
if (tcpWorker != null) {
tcpWorker ! Close
}
}

def initConnectedBuffer() {
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/redis/api/Clusters.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package redis.api.clusters

import akka.util.ByteString
import redis.{MultiBulkConverter, RedisCommand, RedisCommandMultiBulk, RedisCommandStatusString}
import redis.{MultiBulkConverter, RedisCommand, RedisCommandMultiBulk, RedisCommandStatusString, RedisServer}
import redis.api.connection.Ping._
import redis.protocol.{DecodeResult, Bulk, MultiBulk, RedisProtocolReply, RedisReply}

import scala.math.Ordering



case class ClusterNode(host:String, port:Int, id:String)
case class ClusterNode(host:String, port:Int, id:String) {
def hostAndPort = RedisServer(host, port)
}
case class ClusterSlot(begin:Int, end:Int, master:ClusterNode, slaves:Seq[ClusterNode]) extends Comparable[ClusterSlot] {
override def compareTo(x: ClusterSlot): Int = {
this.begin.compare(x.begin)
Expand Down
59 changes: 59 additions & 0 deletions src/test/scala/redis/RedisClusterSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package redis

import akka.actor.ActorSystem
import akka.testkit.TestKit
import akka.testkit.TestProbe
import org.specs2.mutable.SpecificationLike
import redis.api.clusters.{ClusterNode, ClusterSlot}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt
import scala.concurrent.stm.Ref

class RedisClusterSpec extends TestKit(ActorSystem()) with SpecificationLike {

var _clusterSlots = Seq(clusterSlot("1", 6000, 0, 16383))

val redisCluster = new RedisCluster(Seq(RedisServer("127.0.0.1", 6000))) {
override def clusterSlots() = Future(_clusterSlots)
override def makeConnection(server: RedisServer) = (server, RedisConnection(TestProbe().ref, Ref(true)))
}

def clusterSlot(nodeId: String, port: Int, begin: Int, end: Int) =
ClusterSlot(begin, end, ClusterNode("127.0.0.1", port, "1"), Nil)

def checkSlotMaps() = {
redisCluster.redisServerConnections.keySet.toSet mustEqual _clusterSlots.map(_.master.hostAndPort).toSet
redisCluster.clusterSlotsRef.single.get.map(_.keySet.toSet) mustEqual Some(_clusterSlots.toSet)
_clusterSlots.foreach { slots =>
val connection = redisCluster.redisServerConnections.get(slots.master.hostAndPort)
(slots.begin to slots.end).foreach { slot =>
redisCluster.getClusterAndConnection(slot).map(_._2) mustEqual (connection)
}
}
success
}

"redis cluster" should {

"add new nodes" in {
_clusterSlots = Seq(
clusterSlot("1", 6000, 0, 4095),
clusterSlot("2", 6001, 4096, 8191),
clusterSlot("1", 6000, 8192, 12287),
clusterSlot("2", 6001, 12288, 16383)
)
Await.result(redisCluster.asyncRefreshClusterSlots(true), 10.seconds)
checkSlotMaps()
}

"remove unused nodes" in {
_clusterSlots = Seq(
clusterSlot("2", 6001, 0, 16383)
)
Await.result(redisCluster.asyncRefreshClusterSlots(true), 10.seconds)
Await.result(redisCluster.asyncRefreshClusterSlots(true), 10.seconds)
checkSlotMaps()
}
}
}

0 comments on commit cfeab77

Please sign in to comment.