Skip to content

Commit

Permalink
Parse responses to ClientInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
drmarjanovic committed Apr 9, 2023
1 parent d578ba6 commit ff85b9d
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 40 deletions.
18 changes: 17 additions & 1 deletion modules/redis/src/main/scala/zio/redis/Output.scala
Original file line number Diff line number Diff line change
Expand Up @@ -727,10 +727,26 @@ object Output {
}
}

case object ClientInfoOutput extends Output[ClientInfo] {
protected def tryDecode(respValue: RespValue): ClientInfo =
respValue match {
case RespValue.BulkString(s) => ClientInfo.from(s.asString)
case other => throw ProtocolError(s"$other isn't a bulk string")
}
}

case object ClientsInfoOutput extends Output[ClientsInfo] {
protected def tryDecode(respValue: RespValue): ClientsInfo =
respValue match {
case RespValue.BulkString(s) => ClientInfo.from(s.asString.split("\r\n").filter(_.nonEmpty))
case other => throw ProtocolError(s"$other isn't a bulk string")
}
}

case object ClientTrackingInfoOutput extends Output[ClientTrackingInfo] {
protected def tryDecode(respValue: RespValue): ClientTrackingInfo =
respValue match {
case RespValue.NullArray => throw ProtocolError(s"Array must not be empty")
case RespValue.NullArray => throw ProtocolError("Array must not be empty")
case RespValue.Array(values) if values.length % 2 == 0 =>
val fields = values.toList
.grouped(2)
Expand Down
8 changes: 4 additions & 4 deletions modules/redis/src/main/scala/zio/redis/api/Connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ trait Connection extends RedisEnvironment {
* @return
* a unique string composed of 'property=value' fields separated by a space character.
*/
final def clientInfo: IO[RedisError, String] = {
val command = RedisCommand(ClientInfo, NoInput, MultiStringOutput, executor)
final def clientInfo: IO[RedisError, ClientInfo] = {
val command = RedisCommand(ClientInfo, NoInput, ClientInfoOutput, executor)

command.run(())
}
Expand Down Expand Up @@ -158,12 +158,12 @@ trait Connection extends RedisEnvironment {
final def clientList(
clientType: Option[ClientType] = None,
clientIds: Option[(Long, List[Long])] = None
): IO[RedisError, String] = {
): IO[RedisError, ClientsInfo] = {
val command =
RedisCommand(
ClientList,
Tuple2(OptionalInput(ClientTypeInput), OptionalInput(IdsInput)),
MultiStringOutput,
ClientsInfoOutput,
executor
)

Expand Down
109 changes: 81 additions & 28 deletions modules/redis/src/main/scala/zio/redis/options/Connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,34 @@ trait Connection {
private[redis] final def asString: String = s"${ip.getHostAddress}:$port"
}

object Address {
private[redis] final def fromString(addr: String): Option[Address] =
addr.split(":").toList match {
case ip :: port :: Nil => Some(new Address(InetAddress.getByName(ip), port.toInt))
case _ => None
}
}

sealed case class ClientEvents(readable: Boolean = false, writable: Boolean = false)

sealed trait ClientFlag

object ClientFlag {
case object ToBeClosedAsap extends ClientFlag

case object Blocked extends ClientFlag

case object ToBeClosedAfterReply extends ClientFlag

case object WatchedKeysModified extends ClientFlag

case object IsMaster extends ClientFlag

case object MonitorMode extends ClientFlag

case object PubSub extends ClientFlag

case object ReadOnlyMode extends ClientFlag

case object Replica extends ClientFlag

case object Unblocked extends ClientFlag

case object UnixDomainSocket extends ClientFlag

case object MultiExecContext extends ClientFlag

case object KeysTrackingEnabled extends ClientFlag

case object Blocked extends ClientFlag
case object BroadcastTrackingMode extends ClientFlag
case object IsMaster extends ClientFlag
case object KeysTrackingEnabled extends ClientFlag
case object MonitorMode extends ClientFlag
case object MultiExecContext extends ClientFlag
case object PubSub extends ClientFlag
case object ReadOnlyMode extends ClientFlag
case object Replica extends ClientFlag
case object ToBeClosedAfterReply extends ClientFlag
case object ToBeClosedAsap extends ClientFlag
case object TrackingTargetClientInvalid extends ClientFlag

case object BroadcastTrackingMode extends ClientFlag
case object Unblocked extends ClientFlag
case object UnixDomainSocket extends ClientFlag
case object WatchedKeysModified extends ClientFlag
}

sealed case class ClientInfo(
Expand Down Expand Up @@ -87,6 +81,65 @@ trait Connection {
user: Option[String] = None
)

object ClientInfo {
private[redis] final def from(line: String): ClientInfo = {
def getFlags(flags: String): Set[ClientFlag] = {
def clientFlag(cf: Char): Option[ClientFlag] = cf match {
case 'A' => Some(ClientFlag.ToBeClosedAsap)
case 'b' => Some(ClientFlag.Blocked)
case 'B' => Some(ClientFlag.BroadcastTrackingMode)
case 'c' => Some(ClientFlag.ToBeClosedAfterReply)
case 'd' => Some(ClientFlag.WatchedKeysModified)
case 'M' => Some(ClientFlag.IsMaster)
case 'O' => Some(ClientFlag.MonitorMode)
case 'P' => Some(ClientFlag.PubSub)
case 'r' => Some(ClientFlag.ReadOnlyMode)
case 'R' => Some(ClientFlag.TrackingTargetClientInvalid)
case 'S' => Some(ClientFlag.Replica)
case 't' => Some(ClientFlag.KeysTrackingEnabled)
case 'u' => Some(ClientFlag.Unblocked)
case 'U' => Some(ClientFlag.UnixDomainSocket)
case 'x' => Some(ClientFlag.MultiExecContext)
case _ => None
}

flags.foldLeft(Set.empty[ClientFlag])((fs, f) => fs ++ clientFlag(f))
}

val data = line.trim.split(" ").map(_.split("=").toList).collect { case k :: v :: Nil => k -> v }.toMap
val events = data("events")
new ClientInfo(
id = data("id").toLong,
name = data.get("name"),
address = data.get("addr").flatMap(Address.fromString),
localAddress = data.get("laddr").flatMap(Address.fromString),
fileDescriptor = data.get("fd").map(_.toLong),
age = data.get("age").map(s => Duration.fromSeconds(s.toLong)),
idle = data.get("idle").map(s => Duration.fromSeconds(s.toLong)),
flags = data.get("flags").fold(Set.empty[ClientFlag])(getFlags),
databaseId = data.get("id").map(_.toLong),
subscriptions = data("sub").toInt,
patternSubscriptions = data("psub").toInt,
multiCommands = data("multi").toInt,
queryBufferLength = data.get("qbuf").map(_.toInt),
queryBufferFree = data.get("qbuf-free").map(_.toInt),
outputListLength = data.get("oll").map(_.toInt),
outputBufferMem = data.get("omem").map(_.toLong),
events = ClientEvents(readable = events.contains("r"), writable = events.contains("w")),
lastCommand = data.get("cmd"),
argvMemory = data.get("argv-mem").map(_.toLong),
totalMemory = data.get("total-mem").map(_.toLong),
redirectionClientId = data.get("redir").map(_.toLong),
user = data.get("user")
)
}

private[redis] final def from(lines: Array[String]): ClientsInfo =
lines.map(from).toList
}

type ClientsInfo = List[ClientInfo]

sealed trait ClientKillFilter

object ClientKillFilter {
Expand Down
16 changes: 9 additions & 7 deletions modules/redis/src/test/scala/zio/redis/ConnectionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ trait ConnectionSpec extends BaseSpec {
suite("clientInfo")(
test("get client info") {
for {
info <- ZIO.serviceWithZIO[Redis](_.clientInfo)
} yield assert(info)(isNonEmptyString)
redis <- ZIO.service[Redis]
id <- redis.clientId
info <- ZIO.serviceWithZIO[Redis](_.clientInfo)
} yield assert(info.id)(equalTo(id))
}
),
suite("clientKill")(
Expand All @@ -76,23 +78,23 @@ trait ConnectionSpec extends BaseSpec {
test("get clients' info") {
for {
info <- ZIO.serviceWithZIO[Redis](_.clientList())
} yield assert(info)(isNonEmptyString)
} yield assert(info)(isNonEmpty)
},
test("get clients' info filtered by type") {
for {
redis <- ZIO.service[Redis]
infoNormal <- redis.clientList(Some(ClientType.Normal))
infoPubSub <- redis.clientList(Some(ClientType.PubSub))
} yield assert(infoNormal)(isNonEmptyString) && assert(infoPubSub)(isEmptyString)
} yield assert(infoNormal)(isNonEmpty) && assert(infoPubSub)(isEmpty)
},
test("get clients' info filtered by client IDs") {
for {
redis <- ZIO.service[Redis]
id <- redis.clientId
nonExistingId = id + 1
info <- ZIO.serviceWithZIO[Redis](_.clientList(clientIds = Some((id, Nil))))
infoNonExisting <- ZIO.serviceWithZIO[Redis](_.clientList(clientIds = Some((nonExistingId, Nil))))
} yield assert(info)(isNonEmptyString) && assert(infoNonExisting)(isEmptyString)
info <- redis.clientList(clientIds = Some((id, Nil)))
infoNonExisting <- redis.clientList(clientIds = Some((nonExistingId, Nil)))
} yield assert(info)(isNonEmpty) && assert(info.head.id)(equalTo(id)) && assert(infoNonExisting)(isEmpty)
}
),
suite("clientGetRedir")(
Expand Down

0 comments on commit ff85b9d

Please sign in to comment.