Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connect to new nodes / disconnect from unused nodes #178

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions src/main/scala/redis/RedisCluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,32 @@ import redis.protocol.RedisReply
import redis.util.CRC16

import scala.annotation.tailrec
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
import scala.concurrent.stm.Ref
import scala.concurrent.{Await, Future, Promise}
import scala.util.control.NonFatal


case class RedisCluster(redisServers: Seq[RedisServer],
name: String = "RedisClientPool")
name: String = "RedisClientPool",
password: Option[String] = None)
(implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher
) extends RedisClientPoolLike(_system, redisDispatcher) with RedisCommands {

val log = Logging.getLogger(_system, this)

override val redisServerConnections = {
redisServers.map { server =>
makeRedisConnection(server, defaultActive = true)
} toMap
override val redisServerConnections = collection.mutable.Map {
redisServers.map(makeConnection): _*
}
refreshConnections()

def makeConnection(server: RedisServer) = {
makeRedisConnection(
server = server.copy(password = password, db = None),
defaultActive = true
)
}

def equalsHostPort(clusterNode:ClusterNode,server:RedisServer) = {
clusterNode.host == server.host && clusterNode.port == server.port
Expand All @@ -55,15 +60,17 @@ case class RedisCluster(redisServers: Seq[RedisServer],

val clusterSlotsRef:Ref[Option[Map[ClusterSlot, RedisConnection]]] = Ref(Option.empty[Map[ClusterSlot, RedisConnection]])
val lockClusterSlots = Ref(true)
Await.result(asyncRefreshClusterSlots(force=true), Duration(10,TimeUnit.SECONDS))
Await.result(asyncRefreshClusterSlots(force=true), 10.seconds)

def getClusterSlots(): Future[Map[ClusterSlot, RedisConnection]] = {

def resolveClusterSlots(retry:Int): Future[Map[ClusterSlot, RedisConnection]] = {
clusterSlots().map { clusterSlots =>
clusterSlots.flatMap { clusterSlot =>
val maybeServerConnection = redisServerConnections.find { case (server, _) => equalsHostPort(clusterSlot.master, server) }
maybeServerConnection.map { case (_, redisConnection) => (clusterSlot, redisConnection) }
clusterSlots.map { clusterSlot =>
val server = clusterSlot.master.hostAndPort
val connection = redisServerConnections
.getOrElseUpdate(server, makeConnection(server)._2)
(clusterSlot, connection)
}.toMap
}.recoverWith {
case e =>
Expand All @@ -83,6 +90,13 @@ case class RedisCluster(redisServers: Seq[RedisServer],
getClusterSlots().map { clusterSlot =>
log.info("refreshClusterSlots: " + clusterSlot.toString())
clusterSlotsRef.single.set(Some(clusterSlot))
val serverSet = clusterSlot.keysIterator.map(_.master.hostAndPort).toSet
redisServerConnections.keys.foreach { server =>
if (!serverSet.contains(server)) {
redisServerConnections.remove(server)
.map(connection => _system.scheduler.scheduleOnce(1.second)(_system.stop(connection.actor)))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the correct way to disconnect from a cluster node? I tried to do it without scheduling, but failed the clusterNodes test

}
}
lockClusterSlots.single.compareAndSet(true, false)
()
}.recoverWith {
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/redis/actors/RedisWorkerIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ abstract class RedisWorkerIO(val address: InetSocketAddress, onConnectStatus: Bo

override def postStop() {
log.info("RedisWorkerIO stop")
if (tcpWorker != null) {
tcpWorker ! Close
}
}

def initConnectedBuffer() {
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/redis/api/Clusters.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package redis.api.clusters

import akka.util.ByteString
import redis.{MultiBulkConverter, RedisCommand, RedisCommandMultiBulk, RedisCommandStatusString}
import redis.{MultiBulkConverter, RedisCommand, RedisCommandMultiBulk, RedisCommandStatusString, RedisServer}
import redis.api.connection.Ping._
import redis.protocol.{DecodeResult, Bulk, MultiBulk, RedisProtocolReply, RedisReply}

import scala.math.Ordering



case class ClusterNode(host:String, port:Int, id:String)
case class ClusterNode(host:String, port:Int, id:String) {
def hostAndPort = RedisServer(host, port)
}
case class ClusterSlot(begin:Int, end:Int, master:ClusterNode, slaves:Seq[ClusterNode]) extends Comparable[ClusterSlot] {
override def compareTo(x: ClusterSlot): Int = {
this.begin.compare(x.begin)
Expand Down
59 changes: 59 additions & 0 deletions src/test/scala/redis/RedisClusterSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package redis

import akka.actor.ActorSystem
import akka.testkit.TestKit
import akka.testkit.TestProbe
import org.specs2.mutable.SpecificationLike
import redis.api.clusters.{ClusterNode, ClusterSlot}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt
import scala.concurrent.stm.Ref

class RedisClusterSpec extends TestKit(ActorSystem()) with SpecificationLike {

var _clusterSlots = Seq(clusterSlot("1", 6000, 0, 16383))

val redisCluster = new RedisCluster(Seq(RedisServer("127.0.0.1", 6000))) {
override def clusterSlots() = Future(_clusterSlots)
override def makeConnection(server: RedisServer) = (server, RedisConnection(TestProbe().ref, Ref(true)))
}

def clusterSlot(nodeId: String, port: Int, begin: Int, end: Int) =
ClusterSlot(begin, end, ClusterNode("127.0.0.1", port, "1"), Nil)

def checkSlotMaps() = {
redisCluster.redisServerConnections.keySet.toSet mustEqual _clusterSlots.map(_.master.hostAndPort).toSet
redisCluster.clusterSlotsRef.single.get.map(_.keySet.toSet) mustEqual Some(_clusterSlots.toSet)
_clusterSlots.foreach { slots =>
val connection = redisCluster.redisServerConnections.get(slots.master.hostAndPort)
(slots.begin to slots.end).foreach { slot =>
redisCluster.getClusterAndConnection(slot).map(_._2) mustEqual (connection)
}
}
success
}

"redis cluster" should {

"add new nodes" in {
_clusterSlots = Seq(
clusterSlot("1", 6000, 0, 4095),
clusterSlot("2", 6001, 4096, 8191),
clusterSlot("1", 6000, 8192, 12287),
clusterSlot("2", 6001, 12288, 16383)
)
Await.result(redisCluster.asyncRefreshClusterSlots(true), 10.seconds)
checkSlotMaps()
}

"remove unused nodes" in {
_clusterSlots = Seq(
clusterSlot("2", 6001, 0, 16383)
)
Await.result(redisCluster.asyncRefreshClusterSlots(true), 10.seconds)
Await.result(redisCluster.asyncRefreshClusterSlots(true), 10.seconds)
checkSlotMaps()
}
}
}